From 264a81ca5f4e4f88c361d507312324b5f6c3225c Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Wed, 16 Aug 2023 17:58:11 +0200 Subject: [PATCH] fix(worker): abort rate-limit delay when closing worker --- src/classes/worker.ts | 15 +++++++++++---- src/utils.ts | 14 ++++++++++++-- tests/test_rate_limiter.ts | 11 +++++++++++ tsconfig.json | 2 +- 4 files changed, 35 insertions(+), 7 deletions(-) diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 649ad91588..c44fd527e8 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -166,6 +166,7 @@ export class Worker< private running = false; private blockUntil = 0; private limitUntil = 0; + private abortDelayController: AbortController | null = null; protected processFn: Processor; @@ -474,8 +475,9 @@ export class Worker< } } else { if (this.limitUntil) { - // TODO: We need to be able to break this delay when we are closing the worker. - await this.delay(this.limitUntil); + this.abortDelayController?.abort(); + this.abortDelayController = new AbortController(); + await this.delay(this.limitUntil, this.abortDelayController); } return this.moveToActive(token); } @@ -567,8 +569,11 @@ export class Worker< * * This function is exposed only for testing purposes. */ - async delay(milliseconds?: number): Promise { - await delay(milliseconds || DELAY_TIME_1); + async delay( + milliseconds?: number, + abortController?: AbortController, + ): Promise { + await delay(milliseconds || DELAY_TIME_1, abortController); } protected async nextJobFromJobData( @@ -742,6 +747,8 @@ export class Worker< this.closing = (async () => { this.emit('closing', 'closing queue'); + this.abortDelayController?.abort(); + const client = await this.blockingConnection.client; this.resume(); diff --git a/src/utils.ts b/src/utils.ts index 6a7fddc178..74e9277abb 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -46,9 +46,19 @@ export function array2obj(arr: string[]): Record { return obj; } -export function delay(ms: number): Promise { +export function delay( + ms: number, + abortController?: AbortController, +): Promise { return new Promise(resolve => { - setTimeout(() => resolve(), ms); + let timeout: ReturnType | undefined; + const callback = () => { + abortController?.signal.removeEventListener('abort', callback); + clearTimeout(timeout); + resolve(); + }; + timeout = setTimeout(callback, ms); + abortController?.signal.addEventListener('abort', callback); }); } diff --git a/tests/test_rate_limiter.ts b/tests/test_rate_limiter.ts index 700097f5d3..6e387864aa 100644 --- a/tests/test_rate_limiter.ts +++ b/tests/test_rate_limiter.ts @@ -113,6 +113,17 @@ describe('Rate Limiter', function () { await worker.close(); }); + it('should quickly close a worker even with slow rate-limit', async function () { + const limiter = { max: 1, duration: 60 * 1000 }; + const worker = new Worker(queueName, async () => {}, { + connection: { host: 'localhost' }, + limiter, + }); + await queue.add('test', 1); + await delay(500); + await worker.close(); + }); + describe('when queue is paused between rate limit', () => { it('should add active jobs to paused', async function () { this.timeout(20000); diff --git a/tsconfig.json b/tsconfig.json index 6167b27daf..1d744aba84 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -20,7 +20,7 @@ "paths": { "@src/*": ["src/*"] }, - "lib": ["esnext"] + "lib": ["esnext", "DOM"] }, "include": ["src"], "exclude": ["node_modules", "dist", "tests/*"]