Skip to content

Commit

Permalink
fix(worker): abort rate-limit delay when closing worker
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Aug 17, 2023
1 parent d82fdcc commit 264a81c
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 7 deletions.
15 changes: 11 additions & 4 deletions src/classes/worker.ts
Expand Up @@ -166,6 +166,7 @@ export class Worker<
private running = false;
private blockUntil = 0;
private limitUntil = 0;
private abortDelayController: AbortController | null = null;

protected processFn: Processor<DataType, ResultType, NameType>;

Expand Down Expand Up @@ -474,8 +475,9 @@ export class Worker<
}
} else {
if (this.limitUntil) {
// TODO: We need to be able to break this delay when we are closing the worker.
await this.delay(this.limitUntil);
this.abortDelayController?.abort();
this.abortDelayController = new AbortController();
await this.delay(this.limitUntil, this.abortDelayController);
}
return this.moveToActive(token);
}
Expand Down Expand Up @@ -567,8 +569,11 @@ export class Worker<
*
* This function is exposed only for testing purposes.
*/
async delay(milliseconds?: number): Promise<void> {
await delay(milliseconds || DELAY_TIME_1);
async delay(
milliseconds?: number,
abortController?: AbortController,
): Promise<void> {
await delay(milliseconds || DELAY_TIME_1, abortController);
}

protected async nextJobFromJobData(
Expand Down Expand Up @@ -742,6 +747,8 @@ export class Worker<
this.closing = (async () => {
this.emit('closing', 'closing queue');

this.abortDelayController?.abort();

const client = await this.blockingConnection.client;

this.resume();
Expand Down
14 changes: 12 additions & 2 deletions src/utils.ts
Expand Up @@ -46,9 +46,19 @@ export function array2obj(arr: string[]): Record<string, string> {
return obj;
}

export function delay(ms: number): Promise<void> {
export function delay(
ms: number,
abortController?: AbortController,
): Promise<void> {
return new Promise(resolve => {
setTimeout(() => resolve(), ms);
let timeout: ReturnType<typeof setTimeout> | undefined;
const callback = () => {
abortController?.signal.removeEventListener('abort', callback);
clearTimeout(timeout);
resolve();
};
timeout = setTimeout(callback, ms);
abortController?.signal.addEventListener('abort', callback);
});
}

Expand Down
11 changes: 11 additions & 0 deletions tests/test_rate_limiter.ts
Expand Up @@ -113,6 +113,17 @@ describe('Rate Limiter', function () {
await worker.close();
});

it('should quickly close a worker even with slow rate-limit', async function () {
const limiter = { max: 1, duration: 60 * 1000 };
const worker = new Worker(queueName, async () => {}, {
connection: { host: 'localhost' },
limiter,
});
await queue.add('test', 1);
await delay(500);
await worker.close();
});

describe('when queue is paused between rate limit', () => {
it('should add active jobs to paused', async function () {
this.timeout(20000);
Expand Down
2 changes: 1 addition & 1 deletion tsconfig.json
Expand Up @@ -20,7 +20,7 @@
"paths": {
"@src/*": ["src/*"]
},
"lib": ["esnext"]
"lib": ["esnext", "DOM"]
},
"include": ["src"],
"exclude": ["node_modules", "dist", "tests/*"]
Expand Down

0 comments on commit 264a81c

Please sign in to comment.