Skip to content

Commit

Permalink
fix: elaborate exported types
Browse files Browse the repository at this point in the history
  • Loading branch information
yujiosaka committed Oct 24, 2023
1 parent e49455f commit b31a782
Show file tree
Hide file tree
Showing 15 changed files with 85 additions and 41 deletions.
24 changes: 17 additions & 7 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,18 @@ import type BaseJobStore from "./job-store/base";
/**
* @public
*/
export type CronyxOptions<T> = {
jobStore: BaseJobStore<T>;
export enum Source {
Mongodb = "mongodb",
Redis = "redis",
Mysql = "mysql",
Postgres = "Postgres",
}

/**
* @public
*/
export type CronyxOptions<S extends Source> = {
jobStore: BaseJobStore<S>;
timezone?: string;
};

Expand All @@ -29,16 +39,16 @@ export type RequestJobOptions =
/**
* @public
*/
export default class Cronyx<T> {
#jobStore: BaseJobStore<T>;
export default class Cronyx<S extends Source> {
#jobStore: BaseJobStore<S>;
#timezone: string | undefined;

constructor(options: CronyxOptions<T>) {
constructor(options: CronyxOptions<S>) {
this.#jobStore = options.jobStore;
this.#timezone = options.timezone;
}

async requestJobExec(options: RequestJobOptions, task: (job: Job<T>) => Promise<void>): Promise<void> {
async requestJobExec(options: RequestJobOptions, task: (job: Job<S>) => Promise<void>): Promise<void> {
const jobRunner = new JobRunner(this.#jobStore, options.jobName, options.jobInterval, {
timezone: this.#timezone,
requiredJobNames: options.requiredJobNames,
Expand All @@ -50,7 +60,7 @@ export default class Cronyx<T> {
return await jobRunner.requestJobExec(task);
}

async requestJobStart(options: RequestJobOptions): Promise<Job<T> | null> {
async requestJobStart(options: RequestJobOptions): Promise<Job<S> | null> {
const jobRunner = new JobRunner(this.#jobStore, options.jobName, options.jobInterval, {
timezone: this.#timezone,
requiredJobNames: options.requiredJobNames,
Expand Down
16 changes: 16 additions & 0 deletions src/job-lock/base.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
import type { Types } from "mongoose";
import type { Source } from "..";

/**
* @public
*/
export type JobLockId<S extends Source> = S extends Source.Mongodb
? Types.ObjectId
: S extends Source.Redis
? string
: S extends Source.Mysql
? string
: S extends Source.Postgres
? string
: never;

/**
* @public
*/
Expand Down
2 changes: 1 addition & 1 deletion src/job-lock/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/**
* @public
*/
export type { default as BaseJobLock } from "./base";
export type { default as BaseJobLock, JobLockId } from "./base";

/**
* @public
Expand Down
20 changes: 11 additions & 9 deletions src/job-runner.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import type { Duration } from "date-fns";
import { differenceInMilliseconds } from "date-fns";
import type { Source } from ".";
import { CronyxError } from "./error";
import Job from "./job";
import type BaseJobLock from "./job-lock/base";
import type { JobLockId } from "./job-lock/base";
import MockJobLock from "./job-lock/mock";
import type BaseJobStore from "./job-store/base";
import { addInterval, getLastDeactivatedJobIntervalEndedAt, log, subInterval } from "./util";
Expand All @@ -19,8 +21,8 @@ type JobRunnerOptions = {
/**
* @internal
*/
export default class JobRunner<T> {
#jobStore: BaseJobStore<T>;
export default class JobRunner<S extends Source> {
#jobStore: BaseJobStore<S>;
#timezone: string;
#jobName: string;
#jobInterval: Duration | string | number;
Expand All @@ -31,7 +33,7 @@ export default class JobRunner<T> {
#jobIntervalStartedAt: Date | undefined;

constructor(
jobStore: BaseJobStore<T>,
jobStore: BaseJobStore<S>,
jobName: string,
jobInterval: Duration | string | number,
options?: JobRunnerOptions,
Expand All @@ -47,7 +49,7 @@ export default class JobRunner<T> {
this.#jobIntervalStartedAt = options?.jobIntervalStartedAt;
}

async requestJobExec(task: (job: Job<T>) => Promise<void>): Promise<void> {
async requestJobExec(task: (job: Job<S>) => Promise<void>): Promise<void> {
const job = await this.requestJobStart();
if (!job) return;

Expand All @@ -60,7 +62,7 @@ export default class JobRunner<T> {
}
}

async requestJobStart(): Promise<Job<T> | null> {
async requestJobStart(): Promise<Job<S> | null> {
if (this.#jobIntervalStartedAt) {
if (!this.#noLock) throw new CronyxError("Should enable `noLock` when `jobIntervalStartedAt` is passed");

Expand Down Expand Up @@ -97,7 +99,7 @@ export default class JobRunner<T> {
return new Job(this.#jobStore, jobLock);
}

let jobLock: BaseJobLock<T> | null;
let jobLock: BaseJobLock<JobLockId<S>> | null;
try {
jobLock = await this.#jobStore.activateJobLock(this.#jobName, jobInterval, jobIntervalEndedAt, retryIntervalStartedAt);
} catch (error) {
Expand All @@ -112,8 +114,8 @@ export default class JobRunner<T> {
return new Job(this.#jobStore, jobLock);
}

async #ensureLastJobLock(requestedAt: Date): Promise<BaseJobLock<T>> {
let lastJobLock: BaseJobLock<T> | null;
async #ensureLastJobLock(requestedAt: Date): Promise<BaseJobLock<JobLockId<S>>> {
let lastJobLock: BaseJobLock<JobLockId<S>> | null;
try {
lastJobLock = await this.#jobStore.fetchLastJobLock(this.#jobName);
} catch (error) {
Expand All @@ -133,7 +135,7 @@ export default class JobRunner<T> {

async #areRequiredJobsFulfilled(jobIntervalEndedAt: Date): Promise<boolean> {
for (const requiredJobName of this.#requiredJobNames) {
let requiredJobLock: BaseJobLock<T> | null;
let requiredJobLock: BaseJobLock<JobLockId<S>> | null;

try {
requiredJobLock = await this.#jobStore.fetchLastJobLock(requiredJobName);
Expand Down
12 changes: 7 additions & 5 deletions src/job-store/base.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import type { Source } from "..";
import type BaseJobLock from "../job-lock/base";
import type { JobLockId } from "../job-lock/base";

/**
* @public
*/
export default interface BaseJobStore<T> {
export default interface BaseJobStore<S extends Source> {
close(): Promise<void>;
fetchLastJobLock(jobName: string): Promise<BaseJobLock<T> | null>;
fetchLastJobLock(jobName: string): Promise<BaseJobLock<JobLockId<S>> | null>;
activateJobLock(
jobName: string,
jobInterval: number,
jobIntervalEndedAt: Date,
retryIntervalStartedAt: Date,
): Promise<BaseJobLock<T> | null>;
deactivateJobLock(jobName: string, jobId: T): Promise<BaseJobLock<T>>;
removeJobLock(jobName: string, jobId: T): Promise<void>;
): Promise<BaseJobLock<JobLockId<S>> | null>;
deactivateJobLock(jobName: string, jobId: JobLockId<S>): Promise<BaseJobLock<JobLockId<S>>>;
removeJobLock(jobName: string, jobId: JobLockId<S>): Promise<void>;
}
3 changes: 2 additions & 1 deletion src/job-store/mongodb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ import { MongoError } from "mongodb";
import type { MongoClientOptions } from "mongodb";
import { createConnection } from "mongoose";
import type { Connection, Model, Types } from "mongoose";
import type { Source } from "..";
import type MongodbJobLock from "../job-lock/mongodb";
import { mongodbJobLockSchema } from "../job-lock/mongodb";
import type BaseJobStore from "./base";

/**
* @public
*/
export default class MongodbJobStore implements BaseJobStore<Types.ObjectId> {
export default class MongodbJobStore implements BaseJobStore<Source.Mongodb> {
#conn: Connection;
#model: Model<MongodbJobLock>;

Expand Down
3 changes: 2 additions & 1 deletion src/job-store/redis.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { isEqual } from "date-fns";
import { createClient, RedisClientOptions, WatchError } from "redis";
import type { Source } from "..";
import RedisJobLock from "../job-lock/redis";
import type BaseJobStore from "./base";

Expand All @@ -11,7 +12,7 @@ type RedisClientType = ReturnType<typeof createClient>;
/**
* @public
*/
export default class RedisJobStore implements BaseJobStore<string> {
export default class RedisJobStore implements BaseJobStore<Source.Redis> {
#client: RedisClientType;

/**
Expand Down
3 changes: 2 additions & 1 deletion src/job-store/typeorm/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { Repository } from "typeorm";
import { DataSource } from "typeorm";
import type { Source } from "../..";
import { TypeormJobLockEntity } from "../../job-lock/typeorm";
import type TypeormJobLock from "../../job-lock/typeorm";
import { hasErrorCode } from "../../util";
Expand All @@ -8,7 +9,7 @@ import type BaseJobStore from "../base";
/**
* @public
*/
export default abstract class TypeormJobStore implements BaseJobStore<string> {
export default abstract class TypeormJobStore implements BaseJobStore<Source.Mysql | Source.Postgres> {
protected abstract uniqueConstraintErrorCode: string;
#dataSource: DataSource;
#repository: Repository<TypeormJobLockEntity>;
Expand Down
4 changes: 3 additions & 1 deletion src/job-store/typeorm/mysql.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import { DataSource } from "typeorm";
import type { AuroraMysqlConnectionOptions } from "typeorm/driver/aurora-mysql/AuroraMysqlConnectionOptions.js";
import type { MysqlConnectionOptions } from "typeorm/driver/mysql/MysqlConnectionOptions.js";
import type { BaseJobStore } from "..";
import type { Source } from "../..";
import { CronyxError } from "../../error";
import { TypeormJobLockEntity } from "../../job-lock/typeorm";
import TypeormJobStore from "./";

/**
* @public
*/
export default class MysqlJobStore extends TypeormJobStore {
export default class MysqlJobStore extends TypeormJobStore implements BaseJobStore<Source.Mysql> {
protected uniqueConstraintErrorCode: string = "ER_DUP_ENTRY";

static async connect(options: MysqlConnectionOptions | AuroraMysqlConnectionOptions): Promise<TypeormJobStore> {
Expand Down
4 changes: 3 additions & 1 deletion src/job-store/typeorm/postgres.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import { DataSource } from "typeorm";
import type { AuroraPostgresConnectionOptions } from "typeorm/driver/aurora-postgres/AuroraPostgresConnectionOptions.js";
import type { PostgresConnectionOptions } from "typeorm/driver/postgres/PostgresConnectionOptions.js";
import type { BaseJobStore } from "..";
import type { Source } from "../..";
import { CronyxError } from "../../error";
import { TypeormJobLockEntity } from "../../job-lock/typeorm";
import TypeormJobStore from "./";

/**
* @public
*/
export default class PostgresJobStore extends TypeormJobStore {
export default class PostgresJobStore extends TypeormJobStore implements BaseJobStore<Source.Postgres> {
protected uniqueConstraintErrorCode: string = "23505";

static async connect(options: PostgresConnectionOptions | AuroraPostgresConnectionOptions): Promise<TypeormJobStore> {
Expand Down
12 changes: 7 additions & 5 deletions src/job.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
import { subMilliseconds } from "date-fns";
import type { Source } from ".";
import { CronyxError } from "./error";
import type BaseJobLock from "./job-lock/base";
import type { JobLockId } from "./job-lock/base";
import type BaseJobStore from "./job-store/base";
import { log } from "./util";

/**
* @public
*/
export default class Job<T> {
export default class Job<S extends Source> {
#jobName: string;
#jobStore: BaseJobStore<T>;
#jobLock: BaseJobLock<T> | null;
#jobStore: BaseJobStore<S>;
#jobLock: BaseJobLock<JobLockId<S>> | null;
#pendingPromise: Promise<void> | null = null;

/**
* @internal
*/
constructor(jobStore: BaseJobStore<T>, jobLock: BaseJobLock<T>) {
constructor(jobStore: BaseJobStore<S>, jobLock: BaseJobLock<JobLockId<S>>) {
this.#jobName = jobLock.jobName;
this.#jobStore = jobStore;
this.#jobLock = jobLock;
}

get id(): T | null {
get id(): JobLockId<S> | null {
if (!this.#jobLock || !this.#jobLock.isActive) throw new CronyxError(`Job is not active for ${this.#jobName}`);

return this.#jobLock._id;
Expand Down
7 changes: 4 additions & 3 deletions test/integrations/shared.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { afterEach, beforeEach, expect, Mock, mock, setSystemTime, test } from "bun:test";
import { add, sub } from "date-fns";
import Cronyx from "../../src";
import type { Source } from "../../src";
import type BaseJobStore from "../../src/job-store/base";

export function testBehavesLikeCronyx<T>(getJobStore: () => BaseJobStore<T>) {
export function testBehavesLikeCronyx<S extends Source>(getJobStore: () => BaseJobStore<S>) {
const requestedAt = new Date("2023-02-03T15:00:00.000Z");
const firstJobIntervalStartedAt = sub(requestedAt, { days: 1 });
const firstJobIntervalEndedAt = requestedAt;
Expand All @@ -21,8 +22,8 @@ export function testBehavesLikeCronyx<T>(getJobStore: () => BaseJobStore<T>) {
jobInterval: "0 0 * * * *", // hourly
};

let jobStore: BaseJobStore<T>;
let cronyx: Cronyx<T>;
let jobStore: BaseJobStore<S>;
let cronyx: Cronyx<S>;
let jobTask: Mock<() => Promise<void>>;
let requiredJobTask: Mock<() => Promise<void>>;

Expand Down
3 changes: 2 additions & 1 deletion test/job-runner.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { afterEach, beforeEach, describe, expect, Mock, mock, spyOn, test } from "bun:test";
import { addMilliseconds, subMilliseconds } from "date-fns";
import type { Source } from "../src";
import Job from "../src/job";
import RedisJobLock from "../src/job-lock/redis";
import JobRunner from "../src/job-runner";
Expand Down Expand Up @@ -28,7 +29,7 @@ describe("JobRunner", () => {
const unfulfilledJobLock = { ...fulfilledJobLock, jobName: "unfilfilledJobName", jobIntervalEndedAt };
const fulfilledActiveJobLock = { ...fulfilledJobLock, jobName: "fulfilledActiveJobName", isActive: true };

let jobStore: BaseJobStore<string>;
let jobStore: BaseJobStore<Source>;
let failureTask: Mock<() => Promise<void>>;
let successTask: Mock<() => Promise<void>>;
let finish: Mock<() => Promise<void>>;
Expand Down
8 changes: 5 additions & 3 deletions test/job-store/shared.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import { afterEach, beforeEach, describe, expect, setSystemTime, test } from "bun:test";
import { addHours, subMilliseconds } from "date-fns";
import type { Source } from "../../src";
import type BaseJobLock from "../../src/job-lock/base";
import type { JobLockId } from "../../src/job-lock/base";
import type BaseJobStore from "../../src/job-store/base";

export function testBehavesLikeJobStore<T>(getJobStore: () => BaseJobStore<T>) {
export function testBehavesLikeJobStore<S extends Source>(getJobStore: () => BaseJobStore<S>) {
const jobName = "jobName";
const jobIntervalStartedAt = new Date();
const jobIntervalEndedAt = addHours(jobIntervalStartedAt, 1);
const jobInterval = 1000 * 60 * 60; // 1 hour
const retryInterval = 1000 * 60 * 60 * 2; // 2 hours

let jobStore: BaseJobStore<T>;
let jobStore: BaseJobStore<S>;
let retryIntervalStartedAt: Date;

beforeEach(() => {
Expand All @@ -29,7 +31,7 @@ export function testBehavesLikeJobStore<T>(getJobStore: () => BaseJobStore<T>) {
});

describe("after activating a job lock", () => {
let jobLock: BaseJobLock<T>;
let jobLock: BaseJobLock<JobLockId<S>>;

beforeEach(async () => {
jobLock = (await jobStore.activateJobLock(jobName, jobInterval, jobIntervalEndedAt, retryIntervalStartedAt))!;
Expand Down
5 changes: 3 additions & 2 deletions test/job.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { beforeEach, describe, expect, mock, test } from "bun:test";
import { addMilliseconds, subMilliseconds } from "date-fns";
import type { Source } from "../src";
import Job from "../src/job";
import RedisJobLock from "../src/job-lock/redis";
import type BaseJobStore from "../src/job-store/base";
Expand All @@ -19,8 +20,8 @@ describe.each([[false], [true]])("Job", (noLock) => {
const activatedJobLock = { ...lastJobLock, jobIntervalEndedAt, isActive: true, _id: noLock ? null : lastJobLock._id };
const deactivatedJobLock = { ...activatedJobLock, isActive: false, updatedAt: addMilliseconds(now, 1) };

let jobStore: BaseJobStore<string>;
let job: Job<string>;
let jobStore: BaseJobStore<Source>;
let job: Job<Source>;

beforeEach(() => {
jobStore = {
Expand Down

0 comments on commit b31a782

Please sign in to comment.