diff --git a/src/shared/acknowledge.lua b/src/shared/acknowledge.lua index 03e83d3..96835be 100644 --- a/src/shared/acknowledge.lua +++ b/src/shared/acknowledge.lua @@ -12,7 +12,15 @@ redis.call("ZREM", "processing", jobQueue .. ":" .. jobId) redis.call("PUBLISH", jobQueue .. ":" .. jobId, "acknowledged") redis.call("PUBLISH", "acknowledged", jobQueue .. ":" .. jobId) -if rescheduleFor == '' then +local wasOverridenDuringExecution = false +if redis.call("ZSCORE", "queue", jobQueue .. ":" .. jobId) then + wasOverridenDuringExecution = true +elseif redis.call("ZSCORE", "blocked:" .. jobQueue, jobId) then + wasOverridenDuringExecution = true +end + +if wasOverridenDuringExecution then +elseif rescheduleFor == '' then redis.call("DEL", jobTableJobKey) redis.call("SREM", "queues:" .. jobQueue, jobId) else diff --git a/test/functional/repro-override-during-execution.test.ts b/test/functional/repro-override-during-execution.test.ts new file mode 100644 index 0000000..ebf26c4 --- /dev/null +++ b/test/functional/repro-override-during-execution.test.ts @@ -0,0 +1,75 @@ +import { expect } from "chai"; +import { makeWorkerEnv } from "./support"; +import { delay, describeAcrossBackends, makeSignal } from "../util"; + +describeAcrossBackends("Repro: Override during Execution", (backend) => { + const executionStarted = makeSignal(); + const jobWasOverriden = makeSignal(); + const secondJobWasCalled = makeSignal(); + const env = makeWorkerEnv(backend, async (job) => { + if (job.payload === "wait") { + executionStarted.signal(); + await jobWasOverriden; + } + + if (job.payload === "enqueue-second") { + await env.producer.enqueue({ + queue: job.queue, + id: job.id, + payload: "second", + exclusive: true, + override: true, + }); + } + + if (job.payload === "second") { + secondJobWasCalled.signal(); + } + + return false; + }); + beforeEach(env.setup); + afterEach(env.teardown); + + describe("when job is overriden while being executed", () => { + it("is executed along the new (overriden) schedule", async () => { + const queue = "override-during"; + const id = "foo"; + await env.producer.enqueue({ + queue, + id, + schedule: { type: "every", meta: "100" }, + payload: "wait", + }); + + await executionStarted; + + const newRunAt = new Date(Date.now() + 10000000); + await env.producer.enqueue({ + queue, + id, + runAt: newRunAt, + override: true, + payload: "", + }); + + jobWasOverriden.signal(); + await delay(10); + + const job = await env.producer.findById(queue, id); + expect(+job.runAt).to.equal(+newRunAt); + }); + it("doesnt stop next job from being executed (repro quirrel#739)", async () => { + const queue = "doesntstopnext"; + const id = "foo"; + await env.producer.enqueue({ + queue, + id, + payload: "enqueue-second", + exclusive: true, + }); + + await secondJobWasCalled + }); + }); +}); diff --git a/test/functional/support.ts b/test/functional/support.ts index 7eac39f..ac005f8 100644 --- a/test/functional/support.ts +++ b/test/functional/support.ts @@ -72,7 +72,7 @@ export function makeProducerEnv( return env; } -type WorkerFailPredicate = (job: Job) => boolean; +type WorkerFailPredicate = (job: Job) => boolean | Promise; type JobListener = (job: Job) => void; @@ -126,7 +126,7 @@ export function makeWorkerEnv( await delay(1); } - if (fail(job)) { + if (await fail(job)) { await workerEnv.worker.acknowledger.reportFailure( ackDescriptor, job,