[exiting] Fatal exception: Missing lock for job err #596

Closed
ianjkaplan opened this issue Jun 16, 2021 · 17 comments · Fixed by #697

@ianjkaplan

I have a worker server that was knocked down by an unhandled exception. Upon restart, it started pulling jobs from the queue and again began shutting down with: Fatal exception: Error: Missing lock for job 4189 failed

Please advise

@manast
Contributor

manast commented Jun 17, 2021

It is difficult to give any meaningful advice with so little context... That exception is rare. I would start by investigating whether your jobs are stalling (i.e. consuming a lot of CPU and running for a long time).
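To see why CPU-heavy jobs can stall, note that a Node.js timer cannot fire while synchronous code is running. The sketch below assumes that lock renewal is driven by a timer on the same event loop as the job (BullMQ exposes a lockDuration worker option, but the renewal mechanism modelled here is an assumption). ticksDuringBusyLoop is a hypothetical helper that counts how often an interval fires during a busy loop.

```typescript
// Counts how many times a periodic timer fires while a synchronous,
// CPU-bound loop occupies the event loop. The timer stands in for a
// lock-renewal timer (an assumption, not BullMQ's actual code).
function ticksDuringBusyLoop(busyMs: number, intervalMs = 5): number {
  let ticks = 0;
  const timer = setInterval(() => {
    ticks += 1;
  }, intervalMs);

  // Simulated CPU-heavy job: spin without ever yielding to the event loop.
  const end = Date.now() + busyMs;
  while (Date.now() < end) {
    // busy wait
  }

  // Read the counter before yielding: no timer callback could have run yet.
  const observed = ticks;
  clearInterval(timer);
  return observed;
}

console.log(ticksDuringBusyLoop(50)); // prints 0
```

Even though the 5 ms interval elapsed about ten times, the callback never ran. A renewal timer scheduled the same way would miss its deadline too, which is how a long synchronous job can end up treated as stalled.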

@ianjkaplan
Author

Yes, the jobs are likely stalled, because the worker crashed due to a separate unhandled exception and then, upon restart, began processing jobs that produced this error.

@allandiego

allandiego commented Jul 2, 2021

@manast I'm facing the same error. Do you have any clue what it could be?

I get the worker's log message, so it's executing the job, but then the error appears:

Processing job 8 of type job-broadcast, {
  "chatId": 2,
  "apiMethod": "sendMessage",
  "messageText": "test message"
}

Error: Missing lock for job 8. failed

This is my queue code:

import {
  Queue,
  JobsOptions,
  Worker,
  WorkerOptions,
  QueueScheduler,
  QueueOptions,
} from 'bullmq';

import {
  BroadcasterOptions,
  QueueJobData,
  BroadcasteMessage,
} from './types';

export class Broadcaster {
  private queue: Queue;
  private queueScheduler: QueueScheduler;
  private worker: Worker;
  private workerOptions: WorkerOptions;
  private queueOptions: QueueOptions;
  private jobsOptions: JobsOptions;
  private queueName: string;
  private jobsProcessed: number;
  private jobsAmount: number;

  constructor(options: BroadcasterOptions) {
    this.queueName = options.queueName;
    this.jobsProcessed = 0;
    this.jobsAmount = 0;
    this.jobsOptions = options.jobsOptions;
    this.queueOptions = options.queueOptions;
    this.workerOptions = options.workerOptions;
  }

  async init(): Promise<void> {
    this.queue = new Queue(this.queueName, this.queueOptions);

    this.queueScheduler = new QueueScheduler(this.queueName, {
      connection: this.queueOptions.connection,
    });

    this.worker = new Worker<QueueJobData>(
      this.queueName,
      async job => {
        // Optionally report some progress
        await job.updateProgress(this.jobsProcessed / this.jobsAmount);

        // const { apiMethod, chatId, ...restArguments } = job.data;

        console.log(
          `Processing job ${job.id} of type ${job.name}, ${JSON.stringify(
            job.data,
            null,
            2,
          )}`,
        );
      },
      this.workerOptions,
    );

    this.worker.on('drained', async () => {
      console.log('worker drained');
    });

    this.worker.on('completed', job => {
      this.jobsProcessed += 1;

      console.log(`Job (${job.id}) chat (${job.data.chat.id}) has completed!`);

      if (this.jobsProcessed === this.jobsAmount) {
        console.log('Broadcast finished');

        this.jobsProcessed = 0;
        this.jobsAmount = 0;
      }
    });

    this.worker.on('failed', (job: any, error: any) => {
      this.jobsProcessed += 1;

      console.log(
        `Job (${job.id}) chat (${job.data.chat.id}) has failed with ${error}`,
      );

      if (this.jobsProcessed === this.jobsAmount) {
        console.log('Broadcast finished');

        this.jobsProcessed = 0;
        this.jobsAmount = 0;
      }
    });

    this.worker.on('error', error => {
      console.error('worker error', error);
    });
  }

  async addBroadcastJob(
    chatIds: number[],
    jobData: BroadcasteMessage,
  ): Promise<void> {
    try {
      this.jobsAmount += chatIds.length;

      await Promise.all(
        chatIds.map(async chatId => {
          await this.queue.add(
            `job-broadcast`,
            { chatId, ...jobData },
            this.jobsOptions,
          );
        }),
      );
    } catch (error) {
      console.log(error);
    }
  }
}

Test queue:

async function run() {
  const bc = new Broadcaster({
    queueName: `queue-broadcast`,
    queueOptions: {
      connection: {
        host: 'localhost',
        port: 6379,
      },
    },
    jobsOptions: {
      attempts: 1,
    },
    workerOptions: {
      concurrency: 10,
      limiter: {
        max: 10,
        duration: 2 * 1000,
      },
      connection: {
        host: 'localhost',
        port: 6379,
      },
    },
  });

  await bc.init();

  await bc.addBroadcastJob([1, 2, 3], {
    apiMethod: 'sendMessage',
    messageText: 'test message',
  });

  await bc.addBroadcastJob([...Array(30).keys()], {
    apiMethod: 'sendMessage',
    messageText: 'test message 2',
  });
}

run();

@manast
Contributor

manast commented Jul 12, 2021

The one case I can think of is when a stalled job is still being processed by the worker that stalled. Then two workers are working on the same job, and when one of them completes it, the other gets this error. In that case, though, you will see that the job was actually completed (or failed).
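The scenario above can be modelled with a simple token check. LockTable is a hypothetical in-memory stand-in, not BullMQ's implementation (which keeps locks in Redis): each time a worker takes a job it receives a new token, and finishing the job requires presenting the token that currently holds the lock.

```typescript
// Hypothetical in-memory model of per-job locks (not BullMQ's code).
// Each worker that takes a job gets a fresh token; completing the job
// requires the token that currently owns the lock.
class LockTable {
  private locks = new Map<string, string>();
  private counter = 0;

  acquire(jobId: string): string {
    const token = `token-${++this.counter}`;
    this.locks.set(jobId, token); // overwrites any previous owner
    return token;
  }

  complete(jobId: string, token: string): void {
    if (this.locks.get(jobId) !== token) {
      throw new Error(`Missing lock for job ${jobId}.`);
    }
    this.locks.delete(jobId);
  }
}

const table = new LockTable();
const tokenA = table.acquire("8"); // worker A starts job 8, then stalls
const tokenB = table.acquire("8"); // stalled job is handed to worker B
table.complete("8", tokenB); // worker B finishes first: OK
try {
  table.complete("8", tokenA); // worker A finishes later
} catch (err) {
  console.log((err as Error).message); // prints "Missing lock for job 8."
}
```

The job itself finished successfully (worker B completed it); only the late worker sees the error, which matches the observation that the job shows up as completed or failed anyway.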

@allandiego

I'm using only one worker in the same process. I also tried running the worker in a separate file/process, and the result is the same.

@allandiego

After a bit more debugging, I found the problem: the error was caused by the log in the worker's 'completed' handler, which tried to access a non-existent job data property.

this.worker.on('completed', job => {
  console.log(`Job (${job.id}) chat (${job.data.chat.id}) has completed!`);  // << this causes the error
});
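One defensive pattern, sketched here under assumptions, is to type the job data so that a typo like chat.id fails at compile time, and to wrap the listener so an exception inside it is logged rather than escaping. BroadcastJobData, CompletedJob, and guardListener are hypothetical names introduced for this sketch; the data shape mirrors the chatId/apiMethod/messageText fields used in this thread.

```typescript
// Assumed shape of the job data in this thread's example (chatId, not chat.id).
interface BroadcastJobData {
  chatId: number;
  apiMethod: string;
  messageText: string;
}

// Minimal stand-in for a job object: only the fields the handler reads.
interface CompletedJob {
  id?: string;
  data: BroadcastJobData;
}

// Wraps a listener so that any exception it throws is logged instead of
// propagating back into the code that emitted the event.
function guardListener<A extends unknown[]>(
  name: string,
  listener: (...args: A) => void,
): (...args: A) => void {
  return (...args: A) => {
    try {
      listener(...args);
    } catch (err) {
      console.error(`listener "${name}" threw`, err);
    }
  };
}

const onCompleted = guardListener("completed", (job: CompletedJob) => {
  // With the typed job, `job.data.chat.id` here would be a compile-time error.
  console.log(`Job (${job.id}) chat (${job.data.chatId}) has completed!`);
});

// Hypothetical wiring: worker.on('completed', onCompleted);
onCompleted({ id: "8", data: { chatId: 2, apiMethod: "sendMessage", messageText: "hi" } });
// prints "Job (8) chat (2) has completed!"
```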

@manast
Contributor

manast commented Jul 13, 2021

@allandiego How can that possibly cause the error? It is just logging a string to the console.

@allandiego

allandiego commented Jul 14, 2021

@manast I have no idea. The property job.data.chat.id doesn't exist; the correct one is job.data.chatId. I have created this reproducible demo, check it out.
If you switch to the other log line, no error is thrown.

import { Queue, Worker, QueueScheduler } from 'bullmq';

async function run() {
  const queueName = 'myQueue';
  const queue = new Queue(queueName);
  const queueScheduler = new QueueScheduler(queueName);

  const worker = new Worker(
    queueName,
    async job => {
      console.log(
        `Processing job ${job.id} of type ${job.name}, ${JSON.stringify(
          job.data,
          null,
          2,
        )}`,
      );
    },
    {
      concurrency: 10,
      limiter: {
        max: 10,
        duration: 2 * 1000,
      },
    },
  );

  worker.on('drained', async () => {
    console.log('worker drained');
  });

  worker.on('completed', job => {
    // this causes the error: job.data.chat.id doesn't exist
    console.log(`Job (${job.id}) chat (${job.data.chat.id}) has completed!`);

    // correct property runs without error
    // console.log(`Job (${job.id}) chat (${job.data.chatId}) has completed!`);
  });

  worker.on('failed', (job: any, error: any) => {
    console.log(
      `Job (${job.id}) chat (${job.data.chatId}) has failed with ${error}`,
    );
  });

  worker.on('error', error => {
    console.error('worker error', error);
  });

  async function addBroadcastJob(
    chatIds: number[],
    jobData: any,
  ): Promise<void> {
    await Promise.all(
      chatIds.map(async chatId => {
        await queue.add(
          `job-broadcast`,
          { chatId, ...jobData },
          { attempts: 1 },
        );
      }),
    );
  }

  await addBroadcastJob([...Array(30).keys()], {
    apiMethod: 'sendMessage',
    messageText: 'test message',
  });
}

run();

@manast
Contributor

manast commented Jul 15, 2021

@allandiego I recommend that you simplify that test to its minimal expression; then we will be able to see where the problem comes from. For instance, start with the simplest possible version that works and then keep adding functionality.

@allandiego

It's working. The error only appears when I console.log a non-existent job.data property inside worker.on('completed').

@amirAlamian

I had the same issue; I realized it was caused by the keyPrefix option in the Redis connection. Just remove it.
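If you still want namespaced keys after removing keyPrefix, BullMQ's Queue and Worker options accept their own prefix setting. The sketch below uses a hypothetical helper, splitKeyPrefix, that moves an ioredis-style keyPrefix out of the connection object into a separate prefix value. It assumes BullMQ inserts the ':' separator after its prefix itself, so a trailing colon is dropped.

```typescript
// Subset of ioredis-style connection options used here (assumed shape).
interface RedisConnectionOptions {
  host: string;
  port: number;
  keyPrefix?: string;
}

// Hypothetical helper: strip keyPrefix from the connection and return it as
// a separate `prefix` value intended for BullMQ's Queue/Worker options.
function splitKeyPrefix(conn: RedisConnectionOptions): {
  connection: Omit<RedisConnectionOptions, "keyPrefix">;
  prefix?: string;
} {
  const { keyPrefix, ...connection } = conn;
  // Assumption: BullMQ adds ":" between its prefix and the queue name.
  const prefix = keyPrefix ? keyPrefix.replace(/:+$/, "") : undefined;
  return prefix ? { connection, prefix } : { connection };
}

const opts = splitKeyPrefix({ host: "localhost", port: 6379, keyPrefix: "myapp:" });
console.log(opts);
// Hypothetical wiring: new Queue("queue-broadcast", opts) and new Worker("queue-broadcast", processor, opts)
```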

@jamesholcomb

I ran into this recently when implementing a new feature and can confirm @allandiego's experience.

If an exception is thrown inside the completed event handler, it can cause the Missing lock for job error. I was able to reproduce it in a small test app that schedules jobs quickly (on the order of milliseconds) across 2 QueueSchedulers.

My app uses the delay queue, removeOnComplete: true, removeOnFail: true, and custom job IDs.

After the missing lock error, the queue is left in an indeterminate state. Additional jobs added with the same ID as the job that caused the error are not added, even though the queue.add() call returns as if they were, with no error.

@manast
Copy link
Contributor

manast commented Aug 18, 2021

@jamesholcomb Do you have an example code that reproduces what you mean?

@jamesholcomb

@manast
Contributor

manast commented Aug 20, 2021

@jamesholcomb I can reproduce the error, and I found the reason. Apparently, when you use an EventEmitter's "emit" method and any of the listeners throws an exception, the "emit" call itself also throws that exception. That behaviour basically blows my mind, so now we need to wrap all the calls to emit, since they can throw whenever a listener does not handle its own errors...
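The behaviour described above is standard Node.js EventEmitter semantics: emit() calls listeners synchronously, so an exception thrown in a listener propagates out of emit() to its caller. A minimal reproduction using only the built-in events module, with a listener containing the same chat.id bug as in this thread:

```typescript
import { EventEmitter } from "events";

const emitter = new EventEmitter();

emitter.on("completed", (job: { id: string; data: { chatId: number } }) => {
  // Bug from this thread: `chat` does not exist, so this throws a TypeError.
  console.log((job.data as any).chat.id);
});

try {
  // Listeners run synchronously inside emit(), so their exceptions
  // propagate to whoever called emit().
  emitter.emit("completed", { id: "8", data: { chatId: 2 } });
  console.log("emit returned normally");
} catch (err) {
  console.log("emit threw:", (err as Error).message);
}
```

In a queue worker, the code calling emit('completed') is the worker's own job-finishing path, so the listener's TypeError surfaces there instead of staying inside the listener.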

@manast
Copy link
Contributor

manast commented Aug 20, 2021

I find this behaviour really disturbing; the whole point of an event emitter is to decouple the emitter from the listeners...
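The thread does not show what the fix in #697 actually looks like. As one possible approach, assumed here rather than taken from BullMQ, an emitter can call each listener inside its own try/catch. SafeEmitter is a hypothetical name; it uses rawListeners() so once() listeners still remove themselves, and it falls back to the default emit() when there are no listeners so that an unhandled 'error' event still throws.

```typescript
import { EventEmitter } from "events";

// Hypothetical emitter that isolates listener failures. Each listener runs
// in its own try/catch, so a throwing listener neither makes emit() throw
// nor stops the remaining listeners from running.
class SafeEmitter extends EventEmitter {
  emit(event: string | symbol, ...args: any[]): boolean {
    // rawListeners() keeps the once() wrappers, so once-listeners still
    // remove themselves after being called.
    const listeners = this.rawListeners(event);
    if (listeners.length === 0) {
      // Preserve default behaviour, e.g. an unhandled 'error' event throws.
      return super.emit(event, ...args);
    }
    for (const listener of listeners) {
      try {
        listener.apply(this, args);
      } catch (err) {
        console.error(`listener for "${String(event)}" threw`, err);
      }
    }
    return true;
  }
}

const bus = new SafeEmitter();
const seen: string[] = [];
bus.on("completed", () => {
  throw new Error("bad listener");
});
bus.on("completed", (id: string) => seen.push(id));
bus.emit("completed", "8"); // does not throw; the second listener still runs
console.log(seen); // prints [ '8' ]
```

The trade-off is that listener errors are now only reported, not surfaced to the caller, so the catch block is where you would route them to your own logging.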

@github-actions

🎉 This issue has been resolved in version 1.42.1 🎉

The release is available on:

Your semantic-release bot 📦🚀


5 participants