Skip to content

Commit

Permalink
feat(bullmq): allow settings custom job ids from within job class
Browse files Browse the repository at this point in the history
  • Loading branch information
abenerd authored and Romakita committed Dec 4, 2023
1 parent 2314c8c commit 8796bbe
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 53 deletions.
28 changes: 28 additions & 0 deletions docs/tutorials/bullmq.md
Expand Up @@ -99,6 +99,34 @@ class OtherExampleJob implements JobMethods {
}
```

## Defining a custom job id within a job

The `JobMethods` interface has an optional method `jobId`, which when implemented instructs the dispatcher to use it when defining the id for the job.

The method will accept the payload of the job and because it is defined within the job class will also have access to all injected services.

```ts
import {JobController, JobMethods} from "@tsed/bullmq";

@JobController("example-with-custom-id")
class ExampleJobWithCustomId implements JobMethods {
public handle(payload: {num: number}) {
console.info("look at my awesome number: ", payload.num);
}

public jobId(payload: {num: number}): string {
return `very realistic job id #${payload.num}`;
}
}
```

Keep in mind tho, that when defining a job using the dispatcher when dispatching the job, the id defined using the dispatcher will take precedence!

```ts
this.dispatcher(ExampleJobWithCustomId, { num: 1 }); // id: 'very realistic job id #1'
this.dispatcher(ExampleJobWithCustomId, { num: 2 }, { jobId: 'I do my own thing!' }) // id: 'I do my own thing!'
```

## Defining a repeating job

Jobs that should be run regularly on a schedule can also easily defined using the `@JobController` decorator.
Expand Down
20 changes: 20 additions & 0 deletions packages/third-parties/bullmq/.npmignore
@@ -0,0 +1,20 @@
test
src/**/*.js
src/**/*.js.map
src/**/*.ts
src/**/*.ts.map
__mock__
*.spec.js
*.spec.ts
coverage
tsconfig.json
tsconfig.esm.json
tsconfig.compile.json
tslint.json
.eslintignore
.eslintrc.js
jest.config.js
.travis.yml
.gitignore
.idea
packages
2 changes: 1 addition & 1 deletion packages/third-parties/bullmq/jest.config.js
Expand Up @@ -2,7 +2,7 @@ module.exports = {
...require("@tsed/jest-config"),
coverageThreshold: {
global: {
branches: 89.58,
branches: 90.74,
functions: 100,
lines: 100,
statements: 100
Expand Down
2 changes: 2 additions & 0 deletions packages/third-parties/bullmq/src/contracts/JobMethods.ts
Expand Up @@ -2,4 +2,6 @@ import {Job} from "bullmq";

export interface JobMethods<DataType = unknown, ReturnType = unknown> {
handle(payload: DataType, job: Job<DataType, ReturnType>): ReturnType | Promise<ReturnType>;

jobId?(payload: DataType): string | Promise<string>;
}
112 changes: 66 additions & 46 deletions packages/third-parties/bullmq/src/dispatchers/JobDispatcher.spec.ts
Expand Up @@ -3,7 +3,16 @@ import {JobDispatcher} from "./JobDispatcher";
import {JobMethods} from "../contracts";
import {JobController} from "../decorators";
import {Queue} from "bullmq";
import {instance, mock, verify, when, objectContaining} from "ts-mockito";
import {instance, mock, verify, when, objectContaining, capture, anything, spy} from "ts-mockito";

@JobController("example-job-with-custom-id-from-job-methods")
class ExampleJobWithCustomJobIdFromJobMethods implements JobMethods {
handle(payload: string) {}

jobId(payload: string): string {
return payload.toUpperCase();
}
}

@JobController("example-job", "default", {
backoff: 69
Expand All @@ -18,25 +27,27 @@ class NotConfiguredQueueTestJob implements JobMethods {
}

describe("JobDispatcher", () => {
it("should throw an exception when a queue is not configured", () => {
const injector = mock(InjectorService);
when(injector.get("bullmq.queue.not-configured")).thenReturn();
let injector: InjectorService;
let queue: Queue;
let dispatcher: JobDispatcher;
beforeEach(() => {
injector = mock(InjectorService);
queue = mock(Queue);
when(queue.name).thenReturn("default");
when(injector.get("bullmq.queue.default")).thenReturn(instance(queue));
when(injector.get("bullmq.job.example-job")).thenReturn(new ExampleTestJob());

dispatcher = new JobDispatcher(instance(injector));
});

const dispatcher = new JobDispatcher(instance(injector));
it("should throw an exception when a queue is not configured", async () => {
when(injector.get("bullmq.queue.not-configured")).thenReturn(undefined);

expect(dispatcher.dispatch(NotConfiguredQueueTestJob)).rejects.toThrow(new Error("Queue(not-configured) not defined"));
await expect(dispatcher.dispatch(NotConfiguredQueueTestJob)).rejects.toThrow(new Error("Queue(not-configured) not defined"));
verify(injector.get("bullmq.queue.not-configured")).once();
});

it("should dispatch job as type", async () => {
const injector = mock(InjectorService);
const queue = mock(Queue);
when(queue.name).thenReturn("default");

when(injector.get("bullmq.queue.default")).thenReturn(instance(queue));

const dispatcher = new JobDispatcher(instance(injector));

await dispatcher.dispatch(ExampleTestJob, {msg: "hello test"});

verify(
Expand All @@ -51,13 +62,9 @@ describe("JobDispatcher", () => {
});

it("should dispatch job as options", async () => {
const injector = mock(InjectorService);
const queue = mock(Queue);
when(queue.name).thenReturn("special");

when(injector.get("bullmq.queue.special")).thenReturn(instance(queue));

const dispatcher = new JobDispatcher(instance(injector));
const specialQueue = mock(Queue);
when(specialQueue.name).thenReturn("special");
when(injector.get("bullmq.queue.special")).thenReturn(instance(specialQueue));

await dispatcher.dispatch(
{
Expand All @@ -67,32 +74,16 @@ describe("JobDispatcher", () => {
{msg: "hello test"}
);

verify(queue.add("some-name", objectContaining({msg: "hello test"}), objectContaining({}))).once();
verify(specialQueue.add("some-name", objectContaining({msg: "hello test"}), objectContaining({}))).once();
});

it("should dispatch job as string", async () => {
const injector = mock(InjectorService);
const queue = mock(Queue);
when(queue.name).thenReturn("default");

when(injector.get("bullmq.queue.default")).thenReturn(instance(queue));

const dispatcher = new JobDispatcher(instance(injector));

await dispatcher.dispatch("some-name", {msg: "hello test"});

verify(queue.add("some-name", objectContaining({msg: "hello test"}), objectContaining({}))).once();
});

it("should overwrite job options defined by the job", async () => {
const injector = mock(InjectorService);
const queue = mock(Queue);
when(queue.name).thenReturn("default");

when(injector.get("bullmq.queue.default")).thenReturn(instance(queue));

const dispatcher = new JobDispatcher(instance(injector));

await dispatcher.dispatch(ExampleTestJob, {msg: "hello test"}, {backoff: 42, jobId: "ffeeaa"});

verify(
Expand All @@ -108,14 +99,6 @@ describe("JobDispatcher", () => {
});

it("should keep existing options and add new ones", async () => {
const injector = mock(InjectorService);
const queue = mock(Queue);
when(queue.name).thenReturn("default");

when(injector.get("bullmq.queue.default")).thenReturn(instance(queue));

const dispatcher = new JobDispatcher(instance(injector));

await dispatcher.dispatch(ExampleTestJob, {msg: "hello test"}, {jobId: "ffeeaa"});

verify(
Expand All @@ -129,4 +112,41 @@ describe("JobDispatcher", () => {
)
).once();
});

describe("custom jobId", () => {
let job: ExampleJobWithCustomJobIdFromJobMethods;
beforeEach(() => {
job = new ExampleJobWithCustomJobIdFromJobMethods();
when(injector.get("bullmq.job.example-job-with-custom-id-from-job-methods")).thenReturn(job);
});

it("should allow setting the job id from within the job", async () => {
await dispatcher.dispatch(ExampleJobWithCustomJobIdFromJobMethods, "hello world");

verify(queue.add("example-job-with-custom-id-from-job-methods", "hello world", anything())).once();

const [, , opts] = capture(queue.add).last();
expect(opts).toMatchObject({
jobId: "HELLO WORLD"
});
});

it("should pass the payload to the jobId method", async () => {
const spyJob = spy(job);
await dispatcher.dispatch(ExampleJobWithCustomJobIdFromJobMethods, "hello world");

verify(spyJob.jobId("hello world")).once();
});

it("should choose the jobId provided to the dispatcher even when the method is implemented", async () => {
await dispatcher.dispatch(ExampleJobWithCustomJobIdFromJobMethods, "hello world", {
jobId: "I don't think so"
});

const [, , opts] = capture(queue.add).last();
expect(opts).toMatchObject({
jobId: "I don't think so"
});
});
});
});
29 changes: 23 additions & 6 deletions packages/third-parties/bullmq/src/dispatchers/JobDispatcher.ts
Expand Up @@ -14,10 +14,10 @@ export class JobDispatcher {
payload?: Parameters<T["handle"]>[0],
options?: JobsOptions
): Promise<BullMQJob>;
public async dispatch<T = any>(job: JobDispatcherOptions, payload?: T, options?: JobsOptions): Promise<BullMQJob>;
public async dispatch<T = any>(job: string, payload?: T, options?: JobsOptions): Promise<BullMQJob>;
public async dispatch(job: Type | JobDispatcherOptions | string, payload: any, options: JobsOptions = {}): Promise<BullMQJob> {
const {queueName, jobName, defaultJobOptions} = this.resolveDispatchArgs(job);
public async dispatch<P = unknown>(job: JobDispatcherOptions, payload?: P, options?: JobsOptions): Promise<BullMQJob>;
public async dispatch<P = unknown>(job: string, payload?: P, options?: JobsOptions): Promise<BullMQJob>;
public async dispatch(job: Type | JobDispatcherOptions | string, payload: unknown, options: JobsOptions = {}): Promise<BullMQJob> {
const {queueName, jobName, defaultJobOptions} = await this.resolveDispatchArgs(job, payload);

const queue = this.injector.get<Queue>(`bullmq.queue.${queueName}`);

Expand All @@ -31,7 +31,7 @@ export class JobDispatcher {
});
}

private resolveDispatchArgs(job: Type | JobDispatcherOptions | string) {
private async resolveDispatchArgs(job: Type | JobDispatcherOptions | string, payload: unknown) {
let queueName: string;
let jobName: string;
let defaultJobOptions: JobsOptions | undefined;
Expand All @@ -41,7 +41,7 @@ export class JobDispatcher {
const store = Store.from(job).get<JobStore>("bullmq");
queueName = store.queue;
jobName = store.name;
defaultJobOptions = store.opts;
defaultJobOptions = await this.retrieveJobOptionsFromClassBasedJob(store, payload);
} else if (typeof job === "object") {
// job is passed as JobDispatcherOptions
queueName = job.queue;
Expand All @@ -58,4 +58,21 @@ export class JobDispatcher {
defaultJobOptions
};
}

private async retrieveJobOptionsFromClassBasedJob(store: JobStore, payload: unknown): Promise<JobsOptions> {
const job = this.injector.get<JobMethods>(`bullmq.job.${store.name}`);
if (!job) {
return store.opts;
}

const jobId = await job.jobId?.(payload);
if (jobId === undefined) {
return store.opts;
}

return {
...store.opts,
jobId
};
}
}

0 comments on commit 8796bbe

Please sign in to comment.