Not remove failed jobs from queue #230

Closed
ghost opened this issue Jun 24, 2020 · 8 comments

ghost commented Jun 24, 2020

I don't want failed jobs to be removed from the queue; instead, I want to let the user process them later. I have tried setting defaultJobOptions.removeOnFail to false, but they are still removed from the queue. I could not find anything about this in the docs either.

import { Job, Queue, Worker } from "bullmq";

const contactsQueue = new Queue("contacts", {
	connection: {
		host: "127.0.0.1",
		port: 6379,
	},
	// Keep failed jobs in the failed set instead of deleting them.
	defaultJobOptions: { removeOnFail: false },
});

// Enqueue one job per contact (inside an async function).
for (const contact of contacts) {
	await contactsQueue.add("contact:add", contact.email);
}

const worker = new Worker(
	"contacts",
	async (job) => {
		// Fail every job on purpose.
		throw new Error("foo bar");
	},
	{
		connection: {
			host: "127.0.0.1",
			port: 6379,
		},
	}
);

worker.on("completed", async (job) => {
	console.log("completed", job.data);
});

worker.on("failed", async (job: Job) => {
	await job.moveToFailed(new Error("111"), "0", true);
	console.log("failed", job.data);
	console.log("failed", await job.isFailed());
});

worker.on("drained", async (job) => {
	console.log("drained", job);
	await worker.close();
});

The logs print true for job.isFailed(), yet the jobs are still removed from the queue.

Author

ghost commented Jun 26, 2020

@manast Can you please tell me if this is broken? It works as expected in bull, but not in bullmq.

Contributor

manast commented Jun 27, 2020

What do you mean by "remove from queue" to begin with? Please explain, with code, what behaviour you expect.

Author

ghost commented Jun 27, 2020

Okay, so in the producer, I add data to the queue exactly 15 times.

const queueScheduler = new QueueScheduler("contacts", {
	connection: redisConnection,
});
const contactsQueue = new Queue("contacts", {
	connection: redisConnection,
	defaultJobOptions: {
		removeOnFail: false,
	},
});
console.log(await contactsQueue.getFailedCount());
console.log((await contactsQueue.getFailed()).length);
for (let i = 1; i <= 15; ++i) {
	await contactsQueue.add("add", i);
}

And in the consumer, I throw an error for each job:

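// Assumed helper; the original snippet does not show its definition.
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

const worker = new Worker(
	"contacts",
	async (job: Job) => {
		// Simulate some work, then fail every job on purpose.
		await sleep(200);
		throw new Error("foofoo");
	},
	{
		connection: redisConnection,
		limiter: {
			max: 20,
			duration: 1000,
		},
		concurrency: 10,
	}
);
worker.on("completed", async (job: Job) => {
	console.log("[completed]", job.data);
});
worker.on("failed", async (job: Job) => {
	console.log("[failed]", job.data);
});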

Now, as expected, all the jobs fail, and when I call console.log(await contactsQueue.getFailedCount()); the count is correct: 15 jobs. But when I call await contactsQueue.getFailed(), it returns only 2 jobs. Also, should we manually add these jobs to the failed queue, or is there a built-in mechanism to handle such tasks?
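On the question of a built-in mechanism, one possible approach is to read the failed set and retry each job. The sketch below assumes that bullmq's `Queue#getFailed` and `Job#retry` behave as their names suggest; `RetryableJob`, `FailedJobSource`, and `retryAllFailed` are hypothetical names introduced only for illustration, written as structural interfaces so the logic can be exercised without Redis.

```typescript
// Hypothetical structural stand-ins for the parts of BullMQ used here.
// A real bullmq `Job` and `Queue` are assumed to satisfy them.
interface RetryableJob {
	id?: string;
	retry(): Promise<void>;
}

interface FailedJobSource {
	getFailed(start?: number, end?: number): Promise<RetryableJob[]>;
}

// Ask every job currently reported as failed to retry itself.
// Returns the number of jobs for which retry() was called.
async function retryAllFailed(queue: FailedJobSource): Promise<number> {
	const failedJobs = await queue.getFailed();
	for (const job of failedJobs) {
		await job.retry();
	}
	return failedJobs.length;
}
```

With the queue from the snippets above, this would be called as `await retryAllFailed(contactsQueue)` once the worker has finished, for example at the start of the producer route.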

Contributor

manast commented Jun 27, 2020

In the code above you are calling getFailedCount and getFailed before the workers have had a chance to process the jobs.
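To illustrate the timing point, here is a minimal sketch of a helper that polls the failed count until the workers have caught up, instead of reading it immediately after adding jobs. It relies only on `getFailedCount()`, which the snippets above already call; `FailedCounter`, `waitForFailedCount`, and the timeout and interval defaults are assumptions made for this example, not part of bullmq.

```typescript
// Hypothetical structural stand-in; a bullmq `Queue` is assumed to satisfy it.
interface FailedCounter {
	getFailedCount(): Promise<number>;
}

const sleep = (ms: number): Promise<void> =>
	new Promise((resolve) => setTimeout(resolve, ms));

// Poll until at least `expected` jobs are in the failed set, or give up
// after `timeoutMs`. Resolves with the last count observed.
async function waitForFailedCount(
	queue: FailedCounter,
	expected: number,
	timeoutMs = 5000,
	intervalMs = 100,
): Promise<number> {
	const deadline = Date.now() + timeoutMs;
	let count = await queue.getFailedCount();
	while (count < expected && Date.now() < deadline) {
		await sleep(intervalMs);
		count = await queue.getFailedCount();
	}
	return count;
}
```

For the 15 jobs in the producer above, `await waitForFailedCount(contactsQueue, 15)` would return once all of them have been reported as failed, or after the timeout with whatever count was last seen.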

Author

ghost commented Jun 27, 2020

Yes, you are right, but when I call the producer again, I get the failed jobs from the previous run at that point.

Contributor

manast commented Jun 27, 2020

Send a code snippet that precisely reproduces what you mean.

Author

ghost commented Jun 27, 2020

Actually, I am using a producer-consumer design. There are 2 routes. One is for the producer, which adds 15 items to the queue and returns. The other is for the consumer, which asynchronously starts the worker to process only 20 items from the queue and then closes the worker.

Here is what I am expecting...

  1. Call producer (failed jobs are 0, contact queue now has 15 jobs)
  2. Call producer (failed jobs are 0, contact queue now has 30 jobs)
  3. Call consumer (works on 20 jobs and, let's say, 5 operations fail)
  4. Call producer (failed jobs are 5, contact queue now has 30 jobs (30 - 20 + 15 + 5), after adding the failed jobs back to the queue). :)

But what I see is...

  1. Call producer (failed jobs are 0, contact queue now has 15 jobs)
  2. Call producer (failed jobs are 0, contact queue now has 30 jobs)
  3. Call consumer (works on 20 jobs and, let's say, 5 operations fail)
  4. Call producer (failed jobs are 5, contact queue now has 27 jobs (30 - 20 + 15 + 2), after adding the failed jobs back to the queue). :(

As you can see, getFailed() returns only 2 jobs. The code pasted above is all I have; the rest of the code is for express and, I think, irrelevant.
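The arithmetic in the two lists can be written out as a small worked example. `queueSizeAfterRequeue` is a hypothetical helper that only restates the formula from step 4 (jobs before, minus jobs processed, plus jobs added, plus failed jobs added back); it does not touch bullmq.

```typescript
// Jobs left in the queue after one consumer run and one more producer run,
// assuming every failed job returned by getFailed() is added back.
function queueSizeAfterRequeue(
	before: number,
	processed: number,
	added: number,
	requeuedFailed: number,
): number {
	return before - processed + added + requeuedFailed;
}

// Expected: all 5 failed jobs come back. Observed: only 2 do.
const expectedSize = queueSizeAfterRequeue(30, 20, 15, 5); // 30
const observedSize = queueSizeAfterRequeue(30, 20, 15, 2); // 27
console.log({ expectedSize, observedSize, missing: expectedSize - observedSize });
```

The difference of 3 is the gap between the 5 failed jobs that were counted and the 2 that getFailed() returned.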

Contributor

manast commented Jun 27, 2020

The code above does not reproduce an issue. You need to be able to write a code snippet as a test case, where something does not work as expected. I would like to help you, but really I can't if you do not provide the code.

ghost closed this as completed Apr 13, 2021