Skip to content

Commit

Permalink
feat: add support for dynamic rate limiting
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Oct 25, 2022
1 parent 81f780a commit 2d51d2b
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 1 deletion.
41 changes: 41 additions & 0 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ export interface WorkerListener<
stalled: (jobId: string, prev: string) => void;
}

const RATE_LIMIT_ERROR = 'bullmq:rateLimitExceeded';

/**
*
* This class represents a worker that is able to process jobs from the queue.
Expand Down Expand Up @@ -161,6 +163,11 @@ export class Worker<
private blockingConnection: RedisConnection;

private processing: Set<Promise<void | Job<DataType, ResultType, NameType>>>;

static RateLimitError() {
return new Error(RATE_LIMIT_ERROR);
}

constructor(
name: string,
processor?: string | Processor<DataType, ResultType, NameType>,
Expand Down Expand Up @@ -435,6 +442,22 @@ export class Worker<
}
}

/**
* Overrides the rate limit to be active for the next jobs.
*
* @param expireTimeMs expire time in ms of this rate limit.
*/
async rateLimit(expireTimeMs: number) {
await this.client.then(client =>
client.set(
this.keys.limiter,
Number.MAX_SAFE_INTEGER,
'PX',
expireTimeMs,
),
);
}

protected async moveToActive(
token: string,
jobId?: string,
Expand Down Expand Up @@ -584,6 +607,11 @@ export class Worker<
const handleFailed = async (err: Error) => {
if (!this.connection.closing) {
try {
if (err.message == RATE_LIMIT_ERROR) {
this.limitUntil = await this.moveLimitedBackToWait(job);
return;
}

await job.moveToFailed(err, token);
this.emit('failed', job, err, 'active');
} catch (err) {
Expand All @@ -598,6 +626,7 @@ export class Worker<
this.emit('active', job, 'waiting');

lockExtender();

try {
const result = await this.callProcessJob(job, token);
return await handleCompleted(result);
Expand Down Expand Up @@ -786,4 +815,16 @@ export class Worker<
),
);
}

private async moveLimitedBackToWait(
job: Job<DataType, ResultType, NameType>,
) {
const multi = (await this.client).multi();
multi.pttl(this.keys.limiter);
multi.lrem(this.keys.active, 1, job.id);
multi.rpush(this.keys.wait, job.id);
multi.del(`${this.toKey(job.id)}:lock`);
const [[err, limitUntil]] = await multi.exec();
return <number>limitUntil;
}
}
62 changes: 61 additions & 1 deletion tests/test_rate_limiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,66 @@ describe('Rate Limiter', function () {
await worker.close();
});

it('should obey the rate limit with dynamic limit', async function () {
this.timeout(5000);

const numJobs = 10;
const dynamicLimit = 250;
const duration = 100;

const worker = new Worker(
queueName,
async job => {
if (job.attemptsMade === 1) {
await worker.rateLimit(dynamicLimit);
throw Worker.RateLimitError();
}
},
{
connection,
limiter: {
max: 1,
duration,
},
},
);

const result = new Promise<void>((resolve, reject) => {
queueEvents.on(
'completed',
// after every job has been completed
after(numJobs, async () => {
await worker.close();

try {
const timeDiff = new Date().getTime() - startTime;
expect(timeDiff).to.be.gte(
numJobs * dynamicLimit + numJobs * duration,
);
resolve();
} catch (err) {
reject(err);
}
}),
);

queueEvents.on('failed', async err => {
await worker.close();
reject(err);
});
});

const startTime = new Date().getTime();
const jobs = Array.from(Array(numJobs).keys()).map(() => ({
name: 'rate test',
data: {},
}));
await queue.addBulk(jobs);

await result;
await worker.close();
});

it('should obey the rate limit with workerDelay enabled', async function () {
this.timeout(20000);

Expand Down Expand Up @@ -200,7 +260,7 @@ describe('Rate Limiter', function () {
await worker.close();
});

it.skip('should obey priority', async function () {
it('should obey priority', async function () {
this.timeout(20000);

const numJobs = 10;
Expand Down

0 comments on commit 2d51d2b

Please sign in to comment.