From e7099a323d2ebfc8ba44044074df0f0fe2bb44af Mon Sep 17 00:00:00 2001 From: EinfachHans Date: Sat, 18 Nov 2023 14:16:39 +0100 Subject: [PATCH] feat(bullmq): extend configuration --- docs/tutorials/bullmq.md | 49 ++- packages/third-parties/bullmq/README.md | 37 +- packages/third-parties/bullmq/jest.config.js | 2 +- .../bullmq/src/BullMQModule.spec.ts | 346 +++++++++++------- .../third-parties/bullmq/src/BullMQModule.ts | 31 +- .../third-parties/bullmq/src/config/config.ts | 46 ++- .../bullmq/src/contracts/JobMethods.ts | 6 +- 7 files changed, 351 insertions(+), 166 deletions(-) diff --git a/docs/tutorials/bullmq.md b/docs/tutorials/bullmq.md index 3ce2b68e55d..60eb42cd08a 100644 --- a/docs/tutorials/bullmq.md +++ b/docs/tutorials/bullmq.md @@ -12,7 +12,7 @@ meta: ## Feature -The [BullMQ](https://bullmq.io) Module for Ts.ED allows you to decorate a class using the `@Job` decorator and implement the `Job` interface provided by the module. +The [BullMQ](https://bullmq.io) Module for Ts.ED allows you to decorate a class using the `@JobController` decorator and implement the `JobMethods` interface provided by the module. Repeatable Jobs can also be defined using this decorator. For more information about BullMQ look at the documentation [here](https://docs.bullmq.io/); @@ -35,16 +35,32 @@ import "@tsed/bullmq"; // import bullmq ts.ed module @Configuration({ bullmq: { - queues: ["default", "special"]; + // Specify queue name's to create + queues: ["default", "special"], connection: { // redisio connection options - }; - // optional default job options - defaultJobOptions: {}; - disableWorker: false; - // optionally specify for which queues to start a worker for - // in case not all queues should be worked on - workerQueues: ["default"]; + }, + defaultQueueOptions: { + // Default queue options which are applied to every queue + // Can be extended/overridden by `queueOptions` + }, + queueOptions: { + special: { + // Specify additional queue options by queue name + } + }, + // Specify for which queues to start a worker for. + // Defaultly for every queue added in the `queues` parameter + workerQueues: ["default"], + defaultWorkerOptions: { + // Default worker options which are applied to every worker + // Can be extended/overridden by `workerOptions` + }, + workerOptions: { + special: { + // Specify additional worker options by queue name + } + } } }) export class Server {} @@ -52,7 +68,7 @@ export class Server {} ## Define a Job -A job is defined as a class decorated with the `@Job` decorator and implementing the `Job` interface of the `@tsed/bullmq` package +A job is defined as a class decorated with the `@JobController` decorator and implementing the `Job` interface of the `@tsed/bullmq` package ```ts import {JobController, JobMethods} from "@tsed/bullmq"; @@ -82,7 +98,8 @@ class OtherExampleJob implements JobMethods { ## Defining a repeating job -Jobs that should be run regularly on a schedule can also easily defined using the `@Job` decorator +Jobs that should be run regularly on a schedule can also easily defined using the `@JobController` decorator. +Doing so will automatically dispatch it without any data. ```ts import {JobController, JobMethods} from "@tsed/bullmq"; @@ -146,9 +163,13 @@ class MyService { constructor(private readonly dispatcher: JobDispatcher) {} public async doingSomething() { - await this.dispatcher.dispatch(ExampleJob, {msg: "this message is part of the payload for the job"}, { - delay: 600_000 // 10 minutes in milliseconds - }); + await this.dispatcher.dispatch( + ExampleJob, + {msg: "this message is part of the payload for the job"}, + { + delay: 600_000 // 10 minutes in milliseconds + } + ); console.info("I just dispatched a job!"); } diff --git a/packages/third-parties/bullmq/README.md b/packages/third-parties/bullmq/README.md index f254831d0c9..f411abc5459 100644 --- a/packages/third-parties/bullmq/README.md +++ b/packages/third-parties/bullmq/README.md @@ -33,7 +33,7 @@ A package of Ts.ED framework. See website: https://tsed.io ## Feature -The `@tsed/bullmq` package allows you to define jobs using the `@Job` decorator and the `JobMethods` interface and have them picked up by the `BullMQ` worker. +The `@tsed/bullmq` package allows you to define jobs using the `@JobController` decorator and the `JobMethods` interface and have them picked up by the `BullMQ` worker. ## Installation @@ -53,16 +53,32 @@ import "@tsed/bullmq"; // import bullmq ts.ed module @Configuration({ bullmq: { - queues: ["default", "special"]; + // Specify queue name's to create + queues: ["default", "special"], connection: { // redisio connection options - }; - // optional default job options - defaultJobOptions: {}; - disableWorker: false; - // optionally specify for which queues to start a worker for - // in case not all queues should be worked on - workerQueues: ["default"]; + }, + defaultQueueOptions: { + // Default queue options which are applied to every queue + // Can be extended/overridden by `queueOptions` + }, + queueOptions: { + special: { + // Specify additional queue options by queue name + } + }, + // Specify for which queues to start a worker for. + // Defaultly for every queue added in the `queues` parameter + workerQueues: ["default"], + defaultWorkerOptions: { + // Default worker options which are applied to every worker + // Can be extended/overridden by `workerOptions` + }, + workerOptions: { + special: { + // Specify additional worker options by queue name + } + } } }) export class Server {} @@ -100,7 +116,8 @@ class OtherExampleJob implements JobMethods { ## Defining a repeating job -Jobs that should be run regularly on a schedule can also easily defined using the `@AsJob` decorator +Jobs that should be run regularly on a schedule can also easily defined using the `@JobController` decorator. +Doing so will automatically dispatch it without any data ```ts import {JobController, JobMethods} from "@tsed/bullmq"; diff --git a/packages/third-parties/bullmq/jest.config.js b/packages/third-parties/bullmq/jest.config.js index 24dcca87ccd..c80fd1e6d21 100644 --- a/packages/third-parties/bullmq/jest.config.js +++ b/packages/third-parties/bullmq/jest.config.js @@ -2,7 +2,7 @@ module.exports = { ...require("@tsed/jest-config"), coverageThreshold: { global: { - branches: 82.85, + branches: 86.48, functions: 100, lines: 100, statements: 100 diff --git a/packages/third-parties/bullmq/src/BullMQModule.spec.ts b/packages/third-parties/bullmq/src/BullMQModule.spec.ts index d5c966d4641..791f4ba4eb0 100644 --- a/packages/third-parties/bullmq/src/BullMQModule.spec.ts +++ b/packages/third-parties/bullmq/src/BullMQModule.spec.ts @@ -1,7 +1,7 @@ -import {InjectorService, PlatformTest} from "@tsed/common"; +import {PlatformTest} from "@tsed/common"; import {catchAsyncError} from "@tsed/core"; import {Queue, Worker} from "bullmq"; -import {anyString, anything, instance, mock, verify, when} from "ts-mockito"; +import {anything, instance, mock, verify, when} from "ts-mockito"; import "./BullMQModule"; import {BullMQModule} from "./BullMQModule"; @@ -10,33 +10,26 @@ import {JobMethods} from "./contracts"; import {JobController} from "./decorators"; import {JobDispatcher} from "./dispatchers"; +const queueConstructorSpy = jest.fn(); +const workerConstructorSpy = jest.fn(); + jest.mock("bullmq", () => { return { Queue: class { - public name: string; - - constructor(name: string) { - this.name = name; + constructor(...args: any[]) { + queueConstructorSpy(...args); } close() {} }, Worker: class { - public name: string; - - constructor(name: string) { - this.name = name; + constructor(...args: any[]) { + workerConstructorSpy(...args); } close() {} } }; }); -const bullmq = { - queues: ["default", "foo", "bar"], - connection: {}, - workerQueues: ["default", "foo"] -} as BullMQConfig; - @JobController("cron", "default", { repeat: { pattern: "* * * * *" @@ -51,154 +44,255 @@ class RegularJob { handle() {} } -describe("module", () => { +describe("BullMQModule", () => { let dispatcher: JobDispatcher; - beforeEach(async () => { + + beforeEach(() => { dispatcher = mock(JobDispatcher); when(dispatcher.dispatch(CustomCronJob)).thenResolve(); - - await PlatformTest.create({ - bullmq, - imports: [ - { - token: JobDispatcher, - use: instance(dispatcher) - } - ] - }); }); - beforeEach(() => {}); + afterEach(PlatformTest.reset); - describe("cronjobs", () => { - it("should dispatch cron jobs", () => { - verify(dispatcher.dispatch(CustomCronJob)).once(); + describe("configuration", () => { + beforeEach(() => { + queueConstructorSpy.mockClear(); + workerConstructorSpy.mockClear(); }); - }); - - describe("queues", () => { - it("should get default", () => { - const instance = PlatformTest.get("bullmq.queue.default"); - expect(instance).toBeInstanceOf(Queue); - expect(instance.name).toBe("default"); - }); + describe("merges config correctly", () => { + beforeEach(async () => { + await PlatformTest.create({ + bullmq: { + queues: ["default", "special"], + connection: { + connectionName: "defaultConnectionName" + }, + defaultQueueOptions: { + defaultJobOptions: { + delay: 100 + }, + blockingConnection: true + }, + queueOptions: { + special: { + connection: { + connectionName: "specialConnectionName" + }, + defaultJobOptions: { + attempts: 9 + } + } + }, + defaultWorkerOptions: { + connection: { + connectTimeout: 123 + }, + concurrency: 50 + }, + workerOptions: { + special: { + concurrency: 1, + lockDuration: 2 + } + } + }, + imports: [ + { + token: JobDispatcher, + use: instance(dispatcher) + } + ] + }); + }); - it.each(bullmq.queues)("should register queue(%s)", (queue) => { - const instance = PlatformTest.get(`bullmq.queue.${queue}`); + it("queue", () => { + expect(queueConstructorSpy).toHaveBeenCalledTimes(2); + + expect(queueConstructorSpy).toHaveBeenNthCalledWith(1, "default", { + connection: { + connectionName: "defaultConnectionName" + }, + defaultJobOptions: { + delay: 100 + }, + blockingConnection: true + }); + + expect(queueConstructorSpy).toHaveBeenNthCalledWith(2, "special", { + connection: { + connectionName: "specialConnectionName" + }, + defaultJobOptions: { + attempts: 9, + delay: 100 + }, + blockingConnection: true + }); + }); - expect(instance).toBeInstanceOf(Queue); - expect(instance.name).toBe(queue); + it("worker", () => { + expect(workerConstructorSpy).toHaveBeenCalledTimes(2); + + expect(workerConstructorSpy).toHaveBeenNthCalledWith(1, "default", expect.any(Function), { + connection: { + connectTimeout: 123 + }, + concurrency: 50 + }); + + expect(workerConstructorSpy).toHaveBeenNthCalledWith(2, "special", expect.any(Function), { + connection: { + connectTimeout: 123 + }, + concurrency: 1, + lockDuration: 2 + }); + }); }); - it("should not allow direct injection of the queue", () => { - expect(PlatformTest.get(Queue)).not.toBeInstanceOf(Queue); + describe("without", () => { + it("skips initialization", async () => { + await PlatformTest.create({ + imports: [ + { + token: JobDispatcher, + use: instance(dispatcher) + } + ] + }); + + expect(queueConstructorSpy).not.toHaveBeenCalled(); + verify(dispatcher.dispatch(anything())).never(); + }); }); }); - describe("workers", () => { - it("should get default", () => { - const instance = PlatformTest.get("bullmq.worker.default"); + describe("functionality", () => { + const config = { + queues: ["default", "foo", "bar"], + connection: {}, + workerQueues: ["default", "foo"] + } as BullMQConfig; + + beforeEach(async () => { + await PlatformTest.create({ + bullmq: config, + imports: [ + { + token: JobDispatcher, + use: instance(dispatcher) + } + ] + }); + }); - expect(instance).toBeInstanceOf(Worker); - expect(instance.name).toBe("default"); + describe("cronjobs", () => { + it("should dispatch cron jobs automatically", () => { + verify(dispatcher.dispatch(CustomCronJob)).once(); + }); }); - it.each(bullmq.workerQueues)("should register worker(%s)", (queue) => { - const instance = PlatformTest.get(`bullmq.worker.${queue}`); + describe("queues", () => { + it("should get default", () => { + const instance = PlatformTest.get("bullmq.queue.default"); - expect(instance).toBeInstanceOf(Worker); - expect(instance.name).toBe(queue); - }); + expect(instance).toBeInstanceOf(Queue); + }); - it("should not register unspecified worker queue", () => { - expect(PlatformTest.get("bullmq.worker.bar")).toBeUndefined(); - }); + it.each(config.queues)("should register queue(%s)", (queue) => { + const instance = PlatformTest.get(`bullmq.queue.${queue}`); - it("should not allow direct injection of the worker", () => { - expect(PlatformTest.get(Worker)).not.toBeInstanceOf(Worker); + expect(instance).toBeInstanceOf(Queue); + }); + + it("should not allow direct injection of the queue", () => { + expect(PlatformTest.get(Queue)).not.toBeInstanceOf(Queue); + }); }); - it("should run worker and execute processor", async () => { - const bullMQModule = PlatformTest.get(BullMQModule); - const worker = PlatformTest.get("bullmq.job.default.regular"); - const job = { - name: "regular", - queueName: "default", - data: {test: "test"} - }; + describe("workers", () => { + it("should get default", () => { + const instance = PlatformTest.get("bullmq.worker.default"); - jest.spyOn(worker, "handle").mockResolvedValueOnce(undefined as never); + expect(instance).toBeInstanceOf(Worker); + }); - await (bullMQModule as any).onProcess(job); + it.each(config.workerQueues)("should register worker(%s)", (queue) => { + const instance = PlatformTest.get(`bullmq.worker.${queue}`); - expect(worker.handle).toHaveBeenCalledWith({test: "test"}, job); - }); + expect(instance).toBeInstanceOf(Worker); + }); + + it("should not register unspecified worker queue", () => { + expect(PlatformTest.get("bullmq.worker.bar")).toBeUndefined(); + }); - it("should log warning when the worker doesn't exists", async () => { - const bullMQModule = PlatformTest.get(BullMQModule); + it("should not allow direct injection of the worker", () => { + expect(PlatformTest.get(Worker)).not.toBeInstanceOf(Worker); + }); - jest.spyOn(PlatformTest.injector.logger, "warn"); + it("should run worker and execute processor", async () => { + const bullMQModule = PlatformTest.get(BullMQModule); + const worker = PlatformTest.get("bullmq.job.default.regular"); + const job = { + name: "regular", + queueName: "default", + data: {test: "test"} + }; - const job = { - name: "regular", - queueName: "toto", - data: {test: "test"} - }; + jest.spyOn(worker, "handle").mockResolvedValueOnce(undefined as never); - await (bullMQModule as any).onProcess(job); + await (bullMQModule as any).onProcess(job); - expect(PlatformTest.injector.logger.warn).toHaveBeenCalledWith({ - event: "BULLMQ_JOB_NOT_FOUND", - message: "Job regular toto not found" + expect(worker.handle).toHaveBeenCalledWith({test: "test"}, job); }); - }); - it("should run worker, execute processor and handle error", async () => { - const bullMQModule = PlatformTest.get(BullMQModule); - const worker = PlatformTest.get("bullmq.job.default.regular"); - const job = { - name: "regular", - queueName: "default", - data: {test: "test"} - }; + it("should log warning when the worker doesn't exists", async () => { + const bullMQModule = PlatformTest.get(BullMQModule); - jest.spyOn(PlatformTest.injector.logger, "error"); + jest.spyOn(PlatformTest.injector.logger, "warn"); - jest.spyOn(worker, "handle").mockRejectedValue(new Error("error") as never); + const job = { + name: "regular", + queueName: "toto", + data: {test: "test"} + }; - const error = await catchAsyncError(() => (bullMQModule as any).onProcess(job)); + await (bullMQModule as any).onProcess(job); + + expect(PlatformTest.injector.logger.warn).toHaveBeenCalledWith({ + event: "BULLMQ_JOB_NOT_FOUND", + message: "Job regular toto not found" + }); + }); - expect(worker.handle).toHaveBeenCalledWith({test: "test"}, job); - expect(PlatformTest.injector.logger.error).toHaveBeenCalledWith({ - duration: expect.any(Number), - event: "BULLMQ_JOB_ERROR", - message: "error", - reqId: expect.any(String), - stack: expect.any(String), - time: expect.any(Object) + it("should run worker, execute processor and handle error", async () => { + const bullMQModule = PlatformTest.get(BullMQModule); + const worker = PlatformTest.get("bullmq.job.default.regular"); + const job = { + name: "regular", + queueName: "default", + data: {test: "test"} + }; + + jest.spyOn(PlatformTest.injector.logger, "error"); + + jest.spyOn(worker, "handle").mockRejectedValue(new Error("error") as never); + + const error = await catchAsyncError(() => (bullMQModule as any).onProcess(job)); + + expect(worker.handle).toHaveBeenCalledWith({test: "test"}, job); + expect(PlatformTest.injector.logger.error).toHaveBeenCalledWith({ + duration: expect.any(Number), + event: "BULLMQ_JOB_ERROR", + message: "error", + reqId: expect.any(String), + stack: expect.any(String), + time: expect.any(Object) + }); + expect(error?.message).toEqual("error"); }); - expect(error?.message).toEqual("error"); }); }); }); - -it('skips initialization when no config is provided', async () => { - const injector = mock(InjectorService) - const dispatcher = mock(JobDispatcher); - await PlatformTest.create({ - imports: [ - { - token: InjectorService, - use: instance(injector), - }, - { - token: JobDispatcher, - use: instance(dispatcher) - } - ] - }) - - verify(injector.add(anyString(), anything())).never(); - verify(dispatcher.dispatch(anything())).never() -}); diff --git a/packages/third-parties/bullmq/src/BullMQModule.ts b/packages/third-parties/bullmq/src/BullMQModule.ts index e91af8e66b4..fb5370e3854 100644 --- a/packages/third-parties/bullmq/src/BullMQModule.ts +++ b/packages/third-parties/bullmq/src/BullMQModule.ts @@ -1,7 +1,8 @@ import {BeforeInit, DIContext, runInContext} from "@tsed/common"; import {Constant, InjectorService, Module} from "@tsed/di"; +import {deepMerge} from "@tsed/core"; import {getComputedType} from "@tsed/schema"; -import {Job, Queue, Worker} from "bullmq"; +import {Job, Queue, QueueOptions, Worker, WorkerOptions} from "bullmq"; import {v4} from "uuid"; import {BullMQConfig} from "./config/config"; import {JobMethods} from "./contracts"; @@ -27,13 +28,19 @@ export class BullMQModule implements BeforeInit { private buildQueues() { this.bullmq.queues.forEach((queue) => { + const ops = deepMerge( + { + connection: this.bullmq.connection, + defaultJobOptions: this.bullmq.defaultJobOptions, + ...this.bullmq.defaultQueueOptions + }, + this.bullmq.queueOptions?.[queue] + )!; + this.injector .add(`bullmq.queue.${queue}`, { type: "bullmq:queue", - useValue: new Queue(queue, { - connection: this.bullmq.connection, - defaultJobOptions: this.bullmq.defaultJobOptions - }), + useValue: new Queue(queue, ops), hooks: { $onDestroy: (queue) => queue.close() } @@ -44,12 +51,18 @@ export class BullMQModule implements BeforeInit { private buildWorkers() { (this.bullmq.workerQueues ?? this.bullmq.queues).forEach((queue) => { + const ops = deepMerge( + { + connection: this.bullmq.connection, + ...this.bullmq.defaultWorkerOptions + }, + this.bullmq.workerOptions?.[queue] + )!; + this.injector .add(`bullmq.worker.${queue}`, { type: "bullmq:worker", - useValue: new Worker(queue, this.onProcess.bind(this), { - connection: this.bullmq.connection - }), + useValue: new Worker(queue, this.onProcess.bind(this), ops), hooks: { $onDestroy: (worker) => worker.close() } @@ -59,7 +72,7 @@ export class BullMQModule implements BeforeInit { } private getJob(name: string, queueName: string) { - return this.injector.get(`bullmq.job.${queueName}.${name}`); + return this.injector.get(`bullmq.job.${queueName}.${name}`); } private async onProcess(job: Job) { diff --git a/packages/third-parties/bullmq/src/config/config.ts b/packages/third-parties/bullmq/src/config/config.ts index ca7a469218b..c164101dee3 100644 --- a/packages/third-parties/bullmq/src/config/config.ts +++ b/packages/third-parties/bullmq/src/config/config.ts @@ -1,13 +1,51 @@ -import {type ConnectionOptions, type DefaultJobOptions} from "bullmq"; +import type {ConnectionOptions, DefaultJobOptions, QueueOptions, WorkerOptions} from "bullmq"; export type BullMQConfig = { + /** + * Specify queue name's to create + */ queues: string[]; + + /** + * Default connection to use for queue's and worker's + */ connection: ConnectionOptions; + + /** + * @deprecated Use defaultQueueOptions instead. Will be removed in the next major release + */ defaultJobOptions?: DefaultJobOptions; - disableWorker?: boolean; - // optionally specify for which queues to start a worker for - // in case not all queues should be worked on + + /** + * Default queue options which are applied to every queue + * + * Can be extended/overridden by `queueOptions` + */ + defaultQueueOptions?: QueueOptions; + + /** + * Specify additional queue options by queue name + */ + queueOptions?: Record; + + /** + * Specify for which queues to start a worker for. + * + * Defaultly for every queue added in the `queues` parameter + */ workerQueues?: string[]; + + /** + * Default worker options which are applied to every worker + * + * Can be extended/overridden by `workerOptions` + */ + defaultWorkerOptions?: WorkerOptions; + + /** + * Specify additional worker options by queue name + */ + workerOptions?: Record; }; declare global { diff --git a/packages/third-parties/bullmq/src/contracts/JobMethods.ts b/packages/third-parties/bullmq/src/contracts/JobMethods.ts index 3ff828e166f..96be7c8233b 100644 --- a/packages/third-parties/bullmq/src/contracts/JobMethods.ts +++ b/packages/third-parties/bullmq/src/contracts/JobMethods.ts @@ -1,3 +1,5 @@ -export interface JobMethods { - handle(payload: unknown): unknown; +import {Job} from "bullmq"; + +export interface JobMethods { + handle(payload: DataType, job: Job): ReturnType | Promise; }