Skip to content

Commit

Permalink
fix: fix a bug of job interval ended up set in future
Browse files Browse the repository at this point in the history
  • Loading branch information
yujiosaka committed Oct 31, 2023
1 parent 05d20dd commit c4d1f01
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 54 deletions.
19 changes: 14 additions & 5 deletions src/job-runner.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Duration } from "date-fns";
import { differenceInMilliseconds } from "date-fns";
import { differenceInMilliseconds, min } from "date-fns";
import { CronyxArgumentError, CronyxError } from "./error";
import Job from "./job";
import type BaseJobLock from "./job-lock";
Expand Down Expand Up @@ -61,10 +61,19 @@ export default class JobRunner<I> {
}

async requestJobStart(): Promise<Job<I> | null> {
const requestedAt = new Date();
const bufferedRequestedAt = subInterval(requestedAt, this.#startBuffer, this.#timezone);

if (this.#jobIntervalStartedAt) {
if (!this.#noLock) throw new CronyxArgumentError("Should enable `noLock` when `jobIntervalStartedAt` is passed");

const jobIntervalEndedAt = addInterval(this.#jobIntervalStartedAt, this.#jobInterval, this.#timezone);
if (bufferedRequestedAt < this.#jobIntervalStartedAt) {
log(`Job is not reached to start time for ${this.#jobName}`);
return null;
}

const maxJobIntervalEndedAt = addInterval(this.#jobIntervalStartedAt, this.#jobInterval, this.#timezone);
const jobIntervalEndedAt = min([maxJobIntervalEndedAt, bufferedRequestedAt]);
const jobInterval = differenceInMilliseconds(jobIntervalEndedAt, this.#jobIntervalStartedAt);
const jobLock = MockJobLock.parse({ jobName: this.#jobName, jobInterval, jobIntervalEndedAt });

Expand All @@ -76,20 +85,20 @@ export default class JobRunner<I> {
return new Job(this.#jobStore, jobLock);
}

const requestedAt = new Date();
const retryIntervalStartedAt = subInterval(requestedAt, this.#retryInterval ?? requestedAt.getTime(), this.#timezone);
const lastJobLock = await this.#ensureLastJobLock(requestedAt);
if (lastJobLock.isActive && lastJobLock.updatedAt > retryIntervalStartedAt) {
return null;
}

const jobIntervalStartedAt = getLastDeactivatedJobIntervalEndedAt(lastJobLock);
const jobIntervalEndedAt = addInterval(jobIntervalStartedAt, this.#jobInterval, this.#timezone);
if (lastJobLock._id !== null && subInterval(requestedAt, this.#startBuffer, this.#timezone) < jobIntervalEndedAt) {
const maxJobIntervalEndedAt = addInterval(jobIntervalStartedAt, this.#jobInterval, this.#timezone);
if (lastJobLock._id !== null && bufferedRequestedAt < maxJobIntervalEndedAt) {
log(`Job is not reached to start time for ${this.#jobName}`);
return null;
}

const jobIntervalEndedAt = min([maxJobIntervalEndedAt, bufferedRequestedAt]);
const areRequiredJobsFulfilled = await this.#areRequiredJobsFulfilled(jobIntervalEndedAt);
if (!areRequiredJobsFulfilled) {
return null;
Expand Down
File renamed without changes.
File renamed without changes.
104 changes: 55 additions & 49 deletions test/integrations/shared.ts → test/integration/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ export function testBehavesLikeCronyx<S extends BaseJobStore<I>, I = JobLockId<S
const secondJobIntervalEndedAt = add(requestedAt, { days: 1 });
const timezone = "Asia/Tokyo";
const jobName = "jobName";
const jobInterval = 1000 * 60 * 60 * 24; // 1 day
const jobOptions = {
jobName,
jobInterval: "0 0 0 * * *", // daily
startBuffer: { minutes: 30 },
retryInterval: { days: 1 },
};
const requiredJobOptions = {
jobName: "requiredJobName",
Expand All @@ -43,32 +42,60 @@ export function testBehavesLikeCronyx<S extends BaseJobStore<I>, I = JobLockId<S
setSystemTime();
});

test("reruns job after interruption", async () => {
// first job with interruption
const firstJobWithInterruption = await cronyx.requestJobStart(jobOptions);
expect(firstJobWithInterruption?.id).not.toBe(null);
expect(firstJobWithInterruption?.name).toBe(jobName);
expect(firstJobWithInterruption?.interval).toBe(jobInterval);
expect(firstJobWithInterruption?.intervalStartedAt.getTime()).toBe(firstJobIntervalStartedAt.getTime());
expect(firstJobWithInterruption?.intervalEndedAt.getTime()).toBe(firstJobIntervalEndedAt.getTime());
expect(firstJobWithInterruption?.isActive).toBe(true);
expect(firstJobWithInterruption?.createdAt).toBeDate();
expect(firstJobWithInterruption?.updatedAt).toBeDate();
await firstJobWithInterruption!.interrupt();

// first job
const firstJob = await cronyx.requestJobStart(jobOptions);
expect(firstJob?.id).not.toBe(null);
expect(firstJob?.name).toBe(jobName);
expect(firstJob?.interval).toBe(jobInterval);
expect(firstJob?.intervalStartedAt.getTime()).toBe(firstJobIntervalStartedAt.getTime());
expect(firstJob?.intervalEndedAt.getTime()).toBe(firstJobIntervalEndedAt.getTime());
expect(firstJob?.isActive).toBe(true);
expect(firstJob?.createdAt).toBeDate();
expect(firstJob?.updatedAt).toBeDate();
await firstJob!.finish();
});

test("does not run next job before start buffer", async () => {
// first job
const firstJob = await cronyx.requestJobStart(jobOptions);
expect(firstJob?.id).not.toBe(null);
expect(firstJob?.name).toBe(jobName);
expect(firstJob?.interval).toBe(1000 * 60 * 60 * 24); // 1 day
expect(firstJob?.interval).toBe(jobInterval);
expect(firstJob?.intervalStartedAt.getTime()).toBe(firstJobIntervalStartedAt.getTime());
expect(firstJob?.intervalEndedAt.getTime()).toBe(firstJobIntervalEndedAt.getTime());
expect(firstJob?.isActive).toBe(true);
expect(firstJob?.createdAt).toBeDate();
expect(firstJob?.updatedAt).toBeDate();
await firstJob!.finish();

const jobOptionsWithStartBuffer = { ...jobOptions, startBuffer: { minutes: 30 } } as const;

setSystemTime(add(requestedAt, { days: 1 }));

// second job before start buffer
const secondJobBeforeStartBuffer = await cronyx.requestJobStart(jobOptions);
const secondJobBeforeStartBuffer = await cronyx.requestJobStart(jobOptionsWithStartBuffer);
expect(secondJobBeforeStartBuffer).toBe(null);

setSystemTime(add(requestedAt, { days: 1, minutes: 30 }));

// second job after start buffer
const secondJobAfterStartBuffer = await cronyx.requestJobStart(jobOptions);
const secondJobAfterStartBuffer = await cronyx.requestJobStart(jobOptionsWithStartBuffer);
expect(secondJobAfterStartBuffer?.id).not.toBe(null);
expect(secondJobAfterStartBuffer?.name).toBe(jobName);
expect(secondJobAfterStartBuffer?.interval).toBe(1000 * 60 * 60 * 24); // 1 day
expect(secondJobAfterStartBuffer?.interval).toBe(jobInterval);
expect(secondJobAfterStartBuffer?.intervalStartedAt.getTime()).toBe(secondJobIntervalStartedAt.getTime());
expect(secondJobAfterStartBuffer?.intervalEndedAt.getTime()).toBe(secondJobIntervalEndedAt.getTime());
expect(secondJobAfterStartBuffer?.isActive).toBe(true);
Expand All @@ -77,55 +104,31 @@ export function testBehavesLikeCronyx<S extends BaseJobStore<I>, I = JobLockId<S
await secondJobAfterStartBuffer!.finish();
});

test("reruns job after interruption", async () => {
// first job with interruption
const firstJobWithInterruption = await cronyx.requestJobStart(jobOptions);
expect(firstJobWithInterruption?.id).not.toBe(null);
expect(firstJobWithInterruption?.name).toBe(jobName);
expect(firstJobWithInterruption?.interval).toBe(1000 * 60 * 60 * 24); // 1 day
expect(firstJobWithInterruption?.intervalStartedAt.getTime()).toBe(firstJobIntervalStartedAt.getTime());
expect(firstJobWithInterruption?.intervalEndedAt.getTime()).toBe(firstJobIntervalEndedAt.getTime());
expect(firstJobWithInterruption?.isActive).toBe(true);
expect(firstJobWithInterruption?.createdAt).toBeDate();
expect(firstJobWithInterruption?.updatedAt).toBeDate();
await firstJobWithInterruption!.interrupt();

// first job
const firstJob = await cronyx.requestJobStart(jobOptions);
expect(firstJob?.id).not.toBe(null);
expect(firstJob?.name).toBe(jobName);
expect(firstJob?.interval).toBe(1000 * 60 * 60 * 24); // 1 day
expect(firstJob?.intervalStartedAt.getTime()).toBe(firstJobIntervalStartedAt.getTime());
expect(firstJob?.intervalEndedAt.getTime()).toBe(firstJobIntervalEndedAt.getTime());
expect(firstJob?.isActive).toBe(true);
expect(firstJob?.createdAt).toBeDate();
expect(firstJob?.updatedAt).toBeDate();
await firstJob!.finish();
});

test("reruns job after retry interval", async () => {
// first job without finish
const firstJobWithoutFinish = await cronyx.requestJobStart(jobOptions);
expect(firstJobWithoutFinish?.id).not.toBe(null);
expect(firstJobWithoutFinish?.name).toBe(jobName);
expect(firstJobWithoutFinish?.interval).toBe(1000 * 60 * 60 * 24); // 1 day
expect(firstJobWithoutFinish?.interval).toBe(jobInterval);
expect(firstJobWithoutFinish?.intervalStartedAt.getTime()).toBe(firstJobIntervalStartedAt.getTime());
expect(firstJobWithoutFinish?.intervalEndedAt.getTime()).toBe(firstJobIntervalEndedAt.getTime());
expect(firstJobWithoutFinish?.isActive).toBe(true);
expect(firstJobWithoutFinish?.createdAt).toBeDate();
expect(firstJobWithoutFinish?.updatedAt).toBeDate();

const jobOptionsWithRetryInterval = { ...jobOptions, retryInterval: { days: 1 } } as const;

// first job before retry interval
const firstJobBeforeRetryInterval = await cronyx.requestJobStart(jobOptions);
const firstJobBeforeRetryInterval = await cronyx.requestJobStart(jobOptionsWithRetryInterval);
expect(firstJobBeforeRetryInterval).toBe(null);

setSystemTime(add(requestedAt, { days: 2 }));

// first job after retry interval
const firstJobAfterRetryInterval = await cronyx.requestJobStart(jobOptions);
const firstJobAfterRetryInterval = await cronyx.requestJobStart(jobOptionsWithRetryInterval);
expect(firstJobAfterRetryInterval?.id).not.toBe(null);
expect(firstJobAfterRetryInterval?.name).toBe(jobName);
expect(firstJobAfterRetryInterval?.interval).toBe(1000 * 60 * 60 * 24); // 1 day
expect(firstJobAfterRetryInterval?.interval).toBe(jobInterval);
expect(firstJobAfterRetryInterval?.intervalStartedAt.getTime()).toBe(firstJobIntervalStartedAt.getTime());
expect(firstJobAfterRetryInterval?.intervalEndedAt.getTime()).toBe(firstJobIntervalEndedAt.getTime());
expect(firstJobAfterRetryInterval?.isActive).toBe(true);
Expand All @@ -135,57 +138,60 @@ export function testBehavesLikeCronyx<S extends BaseJobStore<I>, I = JobLockId<S
});

test("runs job with no lock", async () => {
const jobOptionsWithNoLock = { ...jobOptions, noLock: true } as const;

// first job without finish
const firstJobWithoutFinish = await cronyx.requestJobStart({ ...jobOptions, noLock: true });
const firstJobWithoutFinish = await cronyx.requestJobStart(jobOptionsWithNoLock);
expect(firstJobWithoutFinish?.id).toBe(null);
expect(firstJobWithoutFinish?.name).toBe(jobName);
expect(firstJobWithoutFinish?.interval).toBe(1000 * 60 * 60 * 24); // 1 day
expect(firstJobWithoutFinish?.interval).toBe(jobInterval);
expect(firstJobWithoutFinish?.intervalStartedAt.getTime()).toBe(firstJobIntervalStartedAt.getTime());
expect(firstJobWithoutFinish?.intervalEndedAt.getTime()).toBe(firstJobIntervalEndedAt.getTime());
expect(firstJobWithoutFinish?.isActive).toBe(true);
expect(firstJobWithoutFinish?.createdAt).toBeDate();
expect(firstJobWithoutFinish?.updatedAt).toBeDate();

// first job with interruption
const firstJobWithFinish = await cronyx.requestJobStart({ ...jobOptions, noLock: true });
const firstJobWithFinish = await cronyx.requestJobStart(jobOptionsWithNoLock);
expect(firstJobWithFinish?.id).toBe(null);
expect(firstJobWithFinish?.name).toBe(jobName);
expect(firstJobWithFinish?.interval).toBe(1000 * 60 * 60 * 24); // 1 day
expect(firstJobWithFinish?.interval).toBe(jobInterval);
expect(firstJobWithFinish?.intervalStartedAt.getTime()).toBe(firstJobIntervalStartedAt.getTime());
expect(firstJobWithFinish?.intervalEndedAt.getTime()).toBe(firstJobIntervalEndedAt.getTime());
expect(firstJobWithFinish?.isActive).toBe(true);
expect(firstJobWithFinish?.createdAt).toBeDate();
expect(firstJobWithFinish?.updatedAt).toBeDate();
await firstJobWithFinish!.interrupt();

// second job
// second job with specified job interval started at
const secondJob = await cronyx.requestJobStart({
...jobOptions,
noLock: true,
...jobOptionsWithNoLock,
jobIntervalStartedAt: secondJobIntervalStartedAt,
});
expect(secondJob?.id).toBe(null);
expect(secondJob?.name).toBe(jobName);
expect(secondJob?.interval).toBe(1000 * 60 * 60 * 24); // 1 day
expect(secondJob?.interval).toBe(0);
expect(secondJob?.intervalStartedAt.getTime()).toBe(secondJobIntervalStartedAt.getTime());
expect(secondJob?.intervalEndedAt.getTime()).toBe(secondJobIntervalEndedAt.getTime());
expect(secondJob?.intervalEndedAt.getTime()).toBe(secondJobIntervalStartedAt.getTime());
expect(secondJob?.isActive).toBe(true);
expect(secondJob?.createdAt).toBeDate();
expect(secondJob?.updatedAt).toBeDate();
await secondJob!.finish();
});

test("runs job after required jobs fulfilled", async () => {
const jobOptionsWithRequiredJobNames = { ...jobOptions, requiredJobNames: ["requiredJobName"] };

// first job with required job not found
await cronyx.requestJobExec({ ...jobOptions, requiredJobNames: ["requiredJobName"] }, jobTask);
await cronyx.requestJobExec(jobOptionsWithRequiredJobNames, jobTask);
expect(jobTask).not.toHaveBeenCalled();

// first required job
await cronyx.requestJobExec(requiredJobOptions, requiredJobTask);
expect(requiredJobTask).toHaveBeenCalledTimes(1);

// first job with required job fulfilled
await cronyx.requestJobExec({ ...jobOptions, requiredJobNames: ["requiredJobName"] }, jobTask);
await cronyx.requestJobExec(jobOptionsWithRequiredJobNames, jobTask);
expect(jobTask).toHaveBeenCalledTimes(1);

setSystemTime(add(requestedAt, { days: 1, minutes: 30 }));
Expand All @@ -197,15 +203,15 @@ export function testBehavesLikeCronyx<S extends BaseJobStore<I>, I = JobLockId<S
}

// second job with required job unfullfilled
await cronyx.requestJobExec({ ...jobOptions, requiredJobNames: ["requiredJobName"] }, jobTask);
await cronyx.requestJobExec(jobOptionsWithRequiredJobNames, jobTask);
expect(jobTask).toHaveBeenCalledTimes(1);

// required jobs 24 hours
await cronyx.requestJobExec(requiredJobOptions, requiredJobTask);
expect(requiredJobTask).toHaveBeenCalledTimes(25);

// second job with required job fulfilled
await cronyx.requestJobExec({ ...jobOptions, requiredJobNames: ["requiredJobName"] }, jobTask);
await cronyx.requestJobExec(jobOptionsWithRequiredJobNames, jobTask);
expect(jobTask).toHaveBeenCalledTimes(2);
});
}
File renamed without changes.

0 comments on commit c4d1f01

Please sign in to comment.