From 8796bbed3faf159be0e640d6a97e6890bbdd031e Mon Sep 17 00:00:00 2001 From: Abenet Tamiru Date: Mon, 4 Dec 2023 09:23:07 +0100 Subject: [PATCH] feat(bullmq): allow settings custom job ids from within job class --- docs/tutorials/bullmq.md | 28 +++++ packages/third-parties/bullmq/.npmignore | 20 ++++ packages/third-parties/bullmq/jest.config.js | 2 +- .../bullmq/src/contracts/JobMethods.ts | 2 + .../src/dispatchers/JobDispatcher.spec.ts | 112 +++++++++++------- .../bullmq/src/dispatchers/JobDispatcher.ts | 29 ++++- 6 files changed, 140 insertions(+), 53 deletions(-) create mode 100644 packages/third-parties/bullmq/.npmignore diff --git a/docs/tutorials/bullmq.md b/docs/tutorials/bullmq.md index fbe0277966f..2552630fe73 100644 --- a/docs/tutorials/bullmq.md +++ b/docs/tutorials/bullmq.md @@ -99,6 +99,34 @@ class OtherExampleJob implements JobMethods { } ``` +## Defining a custom job id within a job + +The `JobMethods` interface has an optional method `jobId`, which when implemented instructs the dispatcher to use it when defining the id for the job. + +The method will accept the payload of the job and because it is defined within the job class will also have access to all injected services. + +```ts +import {JobController, JobMethods} from "@tsed/bullmq"; + +@JobController("example-with-custom-id") +class ExampleJobWithCustomId implements JobMethods { + public handle(payload: {num: number}) { + console.info("look at my awesome number: ", payload.num); + } + + public jobId(payload: {num: number}): string { + return `very realistic job id #${payload.num}`; + } +} +``` + +Keep in mind tho, that when defining a job using the dispatcher when dispatching the job, the id defined using the dispatcher will take precedence! + +```ts +this.dispatcher(ExampleJobWithCustomId, { num: 1 }); // id: 'very realistic job id #1' +this.dispatcher(ExampleJobWithCustomId, { num: 2 }, { jobId: 'I do my own thing!' }) // id: 'I do my own thing!' +``` + ## Defining a repeating job Jobs that should be run regularly on a schedule can also easily defined using the `@JobController` decorator. diff --git a/packages/third-parties/bullmq/.npmignore b/packages/third-parties/bullmq/.npmignore new file mode 100644 index 00000000000..fdefbeb377c --- /dev/null +++ b/packages/third-parties/bullmq/.npmignore @@ -0,0 +1,20 @@ +test +src/**/*.js +src/**/*.js.map +src/**/*.ts +src/**/*.ts.map +__mock__ +*.spec.js +*.spec.ts +coverage +tsconfig.json +tsconfig.esm.json +tsconfig.compile.json +tslint.json +.eslintignore +.eslintrc.js +jest.config.js +.travis.yml +.gitignore +.idea +packages diff --git a/packages/third-parties/bullmq/jest.config.js b/packages/third-parties/bullmq/jest.config.js index 4daeae05f9e..3a67139191f 100644 --- a/packages/third-parties/bullmq/jest.config.js +++ b/packages/third-parties/bullmq/jest.config.js @@ -2,7 +2,7 @@ module.exports = { ...require("@tsed/jest-config"), coverageThreshold: { global: { - branches: 89.58, + branches: 90.74, functions: 100, lines: 100, statements: 100 diff --git a/packages/third-parties/bullmq/src/contracts/JobMethods.ts b/packages/third-parties/bullmq/src/contracts/JobMethods.ts index 96be7c8233b..2eeec56d9c6 100644 --- a/packages/third-parties/bullmq/src/contracts/JobMethods.ts +++ b/packages/third-parties/bullmq/src/contracts/JobMethods.ts @@ -2,4 +2,6 @@ import {Job} from "bullmq"; export interface JobMethods { handle(payload: DataType, job: Job): ReturnType | Promise; + + jobId?(payload: DataType): string | Promise; } diff --git a/packages/third-parties/bullmq/src/dispatchers/JobDispatcher.spec.ts b/packages/third-parties/bullmq/src/dispatchers/JobDispatcher.spec.ts index 03e4de04c49..d8f18e17e4e 100644 --- a/packages/third-parties/bullmq/src/dispatchers/JobDispatcher.spec.ts +++ b/packages/third-parties/bullmq/src/dispatchers/JobDispatcher.spec.ts @@ -3,7 +3,16 @@ import {JobDispatcher} from "./JobDispatcher"; import {JobMethods} from "../contracts"; import {JobController} from "../decorators"; import {Queue} from "bullmq"; -import {instance, mock, verify, when, objectContaining} from "ts-mockito"; +import {instance, mock, verify, when, objectContaining, capture, anything, spy} from "ts-mockito"; + +@JobController("example-job-with-custom-id-from-job-methods") +class ExampleJobWithCustomJobIdFromJobMethods implements JobMethods { + handle(payload: string) {} + + jobId(payload: string): string { + return payload.toUpperCase(); + } +} @JobController("example-job", "default", { backoff: 69 @@ -18,25 +27,27 @@ class NotConfiguredQueueTestJob implements JobMethods { } describe("JobDispatcher", () => { - it("should throw an exception when a queue is not configured", () => { - const injector = mock(InjectorService); - when(injector.get("bullmq.queue.not-configured")).thenReturn(); + let injector: InjectorService; + let queue: Queue; + let dispatcher: JobDispatcher; + beforeEach(() => { + injector = mock(InjectorService); + queue = mock(Queue); + when(queue.name).thenReturn("default"); + when(injector.get("bullmq.queue.default")).thenReturn(instance(queue)); + when(injector.get("bullmq.job.example-job")).thenReturn(new ExampleTestJob()); + + dispatcher = new JobDispatcher(instance(injector)); + }); - const dispatcher = new JobDispatcher(instance(injector)); + it("should throw an exception when a queue is not configured", async () => { + when(injector.get("bullmq.queue.not-configured")).thenReturn(undefined); - expect(dispatcher.dispatch(NotConfiguredQueueTestJob)).rejects.toThrow(new Error("Queue(not-configured) not defined")); + await expect(dispatcher.dispatch(NotConfiguredQueueTestJob)).rejects.toThrow(new Error("Queue(not-configured) not defined")); verify(injector.get("bullmq.queue.not-configured")).once(); }); it("should dispatch job as type", async () => { - const injector = mock(InjectorService); - const queue = mock(Queue); - when(queue.name).thenReturn("default"); - - when(injector.get("bullmq.queue.default")).thenReturn(instance(queue)); - - const dispatcher = new JobDispatcher(instance(injector)); - await dispatcher.dispatch(ExampleTestJob, {msg: "hello test"}); verify( @@ -51,13 +62,9 @@ describe("JobDispatcher", () => { }); it("should dispatch job as options", async () => { - const injector = mock(InjectorService); - const queue = mock(Queue); - when(queue.name).thenReturn("special"); - - when(injector.get("bullmq.queue.special")).thenReturn(instance(queue)); - - const dispatcher = new JobDispatcher(instance(injector)); + const specialQueue = mock(Queue); + when(specialQueue.name).thenReturn("special"); + when(injector.get("bullmq.queue.special")).thenReturn(instance(specialQueue)); await dispatcher.dispatch( { @@ -67,32 +74,16 @@ describe("JobDispatcher", () => { {msg: "hello test"} ); - verify(queue.add("some-name", objectContaining({msg: "hello test"}), objectContaining({}))).once(); + verify(specialQueue.add("some-name", objectContaining({msg: "hello test"}), objectContaining({}))).once(); }); it("should dispatch job as string", async () => { - const injector = mock(InjectorService); - const queue = mock(Queue); - when(queue.name).thenReturn("default"); - - when(injector.get("bullmq.queue.default")).thenReturn(instance(queue)); - - const dispatcher = new JobDispatcher(instance(injector)); - await dispatcher.dispatch("some-name", {msg: "hello test"}); verify(queue.add("some-name", objectContaining({msg: "hello test"}), objectContaining({}))).once(); }); it("should overwrite job options defined by the job", async () => { - const injector = mock(InjectorService); - const queue = mock(Queue); - when(queue.name).thenReturn("default"); - - when(injector.get("bullmq.queue.default")).thenReturn(instance(queue)); - - const dispatcher = new JobDispatcher(instance(injector)); - await dispatcher.dispatch(ExampleTestJob, {msg: "hello test"}, {backoff: 42, jobId: "ffeeaa"}); verify( @@ -108,14 +99,6 @@ describe("JobDispatcher", () => { }); it("should keep existing options and add new ones", async () => { - const injector = mock(InjectorService); - const queue = mock(Queue); - when(queue.name).thenReturn("default"); - - when(injector.get("bullmq.queue.default")).thenReturn(instance(queue)); - - const dispatcher = new JobDispatcher(instance(injector)); - await dispatcher.dispatch(ExampleTestJob, {msg: "hello test"}, {jobId: "ffeeaa"}); verify( @@ -129,4 +112,41 @@ describe("JobDispatcher", () => { ) ).once(); }); + + describe("custom jobId", () => { + let job: ExampleJobWithCustomJobIdFromJobMethods; + beforeEach(() => { + job = new ExampleJobWithCustomJobIdFromJobMethods(); + when(injector.get("bullmq.job.example-job-with-custom-id-from-job-methods")).thenReturn(job); + }); + + it("should allow setting the job id from within the job", async () => { + await dispatcher.dispatch(ExampleJobWithCustomJobIdFromJobMethods, "hello world"); + + verify(queue.add("example-job-with-custom-id-from-job-methods", "hello world", anything())).once(); + + const [, , opts] = capture(queue.add).last(); + expect(opts).toMatchObject({ + jobId: "HELLO WORLD" + }); + }); + + it("should pass the payload to the jobId method", async () => { + const spyJob = spy(job); + await dispatcher.dispatch(ExampleJobWithCustomJobIdFromJobMethods, "hello world"); + + verify(spyJob.jobId("hello world")).once(); + }); + + it("should choose the jobId provided to the dispatcher even when the method is implemented", async () => { + await dispatcher.dispatch(ExampleJobWithCustomJobIdFromJobMethods, "hello world", { + jobId: "I don't think so" + }); + + const [, , opts] = capture(queue.add).last(); + expect(opts).toMatchObject({ + jobId: "I don't think so" + }); + }); + }); }); diff --git a/packages/third-parties/bullmq/src/dispatchers/JobDispatcher.ts b/packages/third-parties/bullmq/src/dispatchers/JobDispatcher.ts index 948ac0df21a..9eab0bd3499 100644 --- a/packages/third-parties/bullmq/src/dispatchers/JobDispatcher.ts +++ b/packages/third-parties/bullmq/src/dispatchers/JobDispatcher.ts @@ -14,10 +14,10 @@ export class JobDispatcher { payload?: Parameters[0], options?: JobsOptions ): Promise; - public async dispatch(job: JobDispatcherOptions, payload?: T, options?: JobsOptions): Promise; - public async dispatch(job: string, payload?: T, options?: JobsOptions): Promise; - public async dispatch(job: Type | JobDispatcherOptions | string, payload: any, options: JobsOptions = {}): Promise { - const {queueName, jobName, defaultJobOptions} = this.resolveDispatchArgs(job); + public async dispatch

(job: JobDispatcherOptions, payload?: P, options?: JobsOptions): Promise; + public async dispatch

(job: string, payload?: P, options?: JobsOptions): Promise; + public async dispatch(job: Type | JobDispatcherOptions | string, payload: unknown, options: JobsOptions = {}): Promise { + const {queueName, jobName, defaultJobOptions} = await this.resolveDispatchArgs(job, payload); const queue = this.injector.get(`bullmq.queue.${queueName}`); @@ -31,7 +31,7 @@ export class JobDispatcher { }); } - private resolveDispatchArgs(job: Type | JobDispatcherOptions | string) { + private async resolveDispatchArgs(job: Type | JobDispatcherOptions | string, payload: unknown) { let queueName: string; let jobName: string; let defaultJobOptions: JobsOptions | undefined; @@ -41,7 +41,7 @@ export class JobDispatcher { const store = Store.from(job).get("bullmq"); queueName = store.queue; jobName = store.name; - defaultJobOptions = store.opts; + defaultJobOptions = await this.retrieveJobOptionsFromClassBasedJob(store, payload); } else if (typeof job === "object") { // job is passed as JobDispatcherOptions queueName = job.queue; @@ -58,4 +58,21 @@ export class JobDispatcher { defaultJobOptions }; } + + private async retrieveJobOptionsFromClassBasedJob(store: JobStore, payload: unknown): Promise { + const job = this.injector.get(`bullmq.job.${store.name}`); + if (!job) { + return store.opts; + } + + const jobId = await job.jobId?.(payload); + if (jobId === undefined) { + return store.opts; + } + + return { + ...store.opts, + jobId + }; + } }