Skip to content

Commit

Permalink
Merge fe83314 into 7695192
Browse files Browse the repository at this point in the history
  • Loading branch information
Skn0tt committed Aug 20, 2021
2 parents 7695192 + fe83314 commit 2472fec
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 3 deletions.
10 changes: 9 additions & 1 deletion src/shared/acknowledge.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,15 @@ redis.call("ZREM", "processing", jobQueue .. ":" .. jobId)
redis.call("PUBLISH", jobQueue .. ":" .. jobId, "acknowledged")
redis.call("PUBLISH", "acknowledged", jobQueue .. ":" .. jobId)

if rescheduleFor == '' then
local wasOverridenDuringExecution = false
if redis.call("ZSCORE", "queue", jobQueue .. ":" .. jobId) then
wasOverridenDuringExecution = true
elseif redis.call("ZSCORE", "blocked:" .. jobQueue, jobId) then
wasOverridenDuringExecution = true
end

if wasOverridenDuringExecution then
elseif rescheduleFor == '' then
redis.call("DEL", jobTableJobKey)
redis.call("SREM", "queues:" .. jobQueue, jobId)
else
Expand Down
75 changes: 75 additions & 0 deletions test/functional/repro-override-during-execution.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { expect } from "chai";
import { makeWorkerEnv } from "./support";
import { delay, describeAcrossBackends, makeSignal } from "../util";

describeAcrossBackends("Repro: Override during Execution", (backend) => {
const executionStarted = makeSignal();
const jobWasOverriden = makeSignal();
const secondJobWasCalled = makeSignal();
const env = makeWorkerEnv(backend, async (job) => {
if (job.payload === "wait") {
executionStarted.signal();
await jobWasOverriden;
}

if (job.payload === "enqueue-second") {
await env.producer.enqueue({
queue: job.queue,
id: job.id,
payload: "second",
exclusive: true,
override: true,
});
}

if (job.payload === "second") {
secondJobWasCalled.signal();
}

return false;
});
beforeEach(env.setup);
afterEach(env.teardown);

describe("when job is overriden while being executed", () => {
it("is executed along the new (overriden) schedule", async () => {
const queue = "override-during";
const id = "foo";
await env.producer.enqueue({
queue,
id,
schedule: { type: "every", meta: "100" },
payload: "wait",
});

await executionStarted;

const newRunAt = new Date(Date.now() + 10000000);
await env.producer.enqueue({
queue,
id,
runAt: newRunAt,
override: true,
payload: "",
});

jobWasOverriden.signal();
await delay(10);

const job = await env.producer.findById(queue, id);
expect(+job.runAt).to.equal(+newRunAt);
});
it("doesnt stop next job from being executed (repro quirrel#739)", async () => {
const queue = "doesntstopnext";
const id = "foo";
await env.producer.enqueue({
queue,
id,
payload: "enqueue-second",
exclusive: true,
});

await secondJobWasCalled
});
});
});
4 changes: 2 additions & 2 deletions test/functional/support.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export function makeProducerEnv(
return env;
}

type WorkerFailPredicate = (job: Job<string>) => boolean;
type WorkerFailPredicate = (job: Job<string>) => boolean | Promise<boolean>;

type JobListener = (job: Job<string>) => void;

Expand Down Expand Up @@ -126,7 +126,7 @@ export function makeWorkerEnv(
await delay(1);
}

if (fail(job)) {
if (await fail(job)) {
await workerEnv.worker.acknowledger.reportFailure(
ackDescriptor,
job,
Expand Down

0 comments on commit 2472fec

Please sign in to comment.