Skip to content

Commit

Permalink
feat(worker): replace Promise.race with efficient an async fifo
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Mar 2, 2023
1 parent 3a8293a commit 0d94e35
Show file tree
Hide file tree
Showing 6 changed files with 272 additions and 36 deletions.
96 changes: 96 additions & 0 deletions src/classes/async-fifo-queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* AsyncFifoQueue
*
* A minimal FIFO queue for asyncrhonous operations. Allows adding asynchronous operations
* and consume them in the order they are resolved.
*
*/

export class AsyncFifoQueue<T> {
private queue: T[] = [];

private nextPromise: Promise<T> | undefined;
private resolve: ((value: T | undefined) => void) | undefined;
private reject: ((reason?: any) => void) | undefined;
private pending = new Set<Promise<T>>();

constructor(private ignoreErrors = false) {
this.newPromise();
}

public add(promise: Promise<T>) {
this.pending.add(promise);

promise
.then(job => {
this.pending.delete(promise);

if (this.queue.length === 0) {
this.resolvePromise(job);
}
this.queue.push(job);
})
.catch(err => {
// Ignore errors
if (this.ignoreErrors) {
this.queue.push(undefined);
}
this.pending.delete(promise);
this.rejectPromise(err);
});
}

public async waitAll() {
await Promise.all(this.pending);
}

public numTotal() {
return this.pending.size + this.queue.length;
}

public numPending() {
return this.pending.size;
}

public numQueued() {
return this.queue.length;
}

private resolvePromise(job: T) {
this.resolve(job);
this.newPromise();
}

private rejectPromise(err: any) {
this.reject(err);
this.newPromise();
}

private newPromise() {
this.nextPromise = new Promise<T>((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});
}

private async wait() {
return this.nextPromise;
}

public async fetch(): Promise<T | void> {
if (this.pending.size === 0 && this.queue.length === 0) {
return;
}
while (this.queue.length === 0) {
try {
await this.wait();
} catch (err) {
// Ignore errors
if (!this.ignoreErrors) {
console.error('Unexpected Error in AsyncFifoQueue', err);
}
}
}
return this.queue.shift();
}
}
67 changes: 34 additions & 33 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import { ChildPool } from './child-pool';
import { Job } from './job';
import { RedisConnection } from './redis-connection';
import sandbox from './sandbox';
import { clearInterval } from 'timers';
import { AsyncFifoQueue } from './async-fifo-queue';

// 10 seconds is the maximum time a BRPOPLPUSH can block.
const maximumBlockTimeout = 10;
Expand Down Expand Up @@ -178,7 +178,11 @@ export class Worker<

private stalledCheckTimer: NodeJS.Timeout;

private processing: Set<Promise<void | Job<DataType, ResultType, NameType>>>;
private asyncFifoQueue: AsyncFifoQueue<void | Job<
DataType,
ResultType,
NameType
>>;

static RateLimitError(): Error {
return new Error(RATE_LIMIT_ERROR);
Expand Down Expand Up @@ -373,34 +377,37 @@ export class Worker<
const jobsInProgress = new Set<{ job: Job; ts: number }>();
await this.startLockExtenderTimer(jobsInProgress);

const processing = (this.processing = new Set()); //new Map());
const asyncFifoQueue = (this.asyncFifoQueue =
new AsyncFifoQueue<void | Job<DataType, ResultType, NameType>>());

let tokenPostfix = 0;

while (!this.closing) {
while (
!this.waiting &&
processing.size < this.opts.concurrency &&
(!this.limitUntil || processing.size == 0)
asyncFifoQueue.numTotal() < this.opts.concurrency &&
(!this.limitUntil || asyncFifoQueue.numTotal() == 0)
) {
const token = `${this.id}:${tokenPostfix++}`;
processing.add(
this.retryIfFailed<Job<DataType, ResultType, NameType>>(
asyncFifoQueue.add(
this.retryIfFailed<void | Job<DataType, ResultType, NameType>>(
() => this.getNextJob(token),
this.opts.runRetryDelay,
),
);
}

const job = await this.getFirstCompletedJob();
const job = await asyncFifoQueue.fetch();

if (job) {
const token = job.token;
processing.add(
asyncFifoQueue.add(
this.retryIfFailed<void | Job<DataType, ResultType, NameType>>(
() =>
this.processJob(
job,
token,
() => processing.size <= this.opts.concurrency,
() => asyncFifoQueue.numTotal() <= this.opts.concurrency,
jobsInProgress,
),
this.opts.runRetryDelay,
Expand All @@ -410,26 +417,14 @@ export class Worker<
}

this.running = false;
return Promise.all(processing);
// return Promise.all(processing);
return asyncFifoQueue.waitAll();
} catch (error) {
this.running = false;
throw error;
}
}

/*
* Get the first promise that completes
*/
private async getFirstCompletedJob() {
const promises = Array.from(this.processing);
const completedIdx = await Promise.race(
promises.map((p, idx) => p.then(() => idx)),
);
const completed = promises[completedIdx];
this.processing.delete(completed);
return completed;
}

/**
* Returns a promise that resolves to the next job in queue.
* @param token - worker token to be assigned to retrieved job
Expand Down Expand Up @@ -748,8 +743,8 @@ export class Worker<
}
return closePoolPromise;
})
.finally(() => clearInterval(this.extendLocksTimer))
.finally(() => clearInterval(this.stalledCheckTimer))
.finally(() => clearTimeout(this.extendLocksTimer))
.finally(() => clearTimeout(this.stalledCheckTimer))
.finally(() => client.disconnect())
.finally(() => this.connection.close())
.finally(() => this.emit('closed'));
Expand All @@ -770,19 +765,23 @@ export class Worker<
* @see {@link https://docs.bullmq.io/patterns/manually-fetching-jobs}
*/
async startStalledCheckTimer(): Promise<void> {
if (!this.stalledCheckTimer && !this.opts.skipStalledCheck) {
if (!this.opts.skipStalledCheck) {
clearTimeout(this.stalledCheckTimer);

await this.runStalledJobsCheck();
this.stalledCheckTimer = setInterval(() => {
this.runStalledJobsCheck();
this.stalledCheckTimer = setTimeout(async () => {
this.startStalledCheckTimer();
}, this.opts.stalledInterval);
}
}

private async startLockExtenderTimer(
jobsInProgress: Set<{ job: Job; ts: number }>,
): Promise<void> {
if (!this.extendLocksTimer) {
this.extendLocksTimer = setInterval(async () => {
if (!this.opts.skipLockRenewal) {
clearTimeout(this.extendLocksTimer);

this.extendLocksTimer = setTimeout(async () => {
// Get all the jobs whose locks expire in less than 1/2 of the lockRenewTime
const now = Date.now();
const jobsToExtend = [];
Expand All @@ -807,6 +806,8 @@ export class Worker<
} catch (err) {
this.emit('error', <Error>err);
}

this.startLockExtenderTimer(jobsInProgress);
}, this.opts.lockRenewTime / 2);
}
}
Expand All @@ -826,8 +827,8 @@ export class Worker<
reconnect = false;
}

if (this.processing) {
await Promise.all(this.processing.keys());
if (this.asyncFifoQueue) {
await this.asyncFifoQueue.waitAll();
}

reconnect && (await this.blockingConnection.reconnect());
Expand Down
9 changes: 9 additions & 0 deletions src/interfaces/worker-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ export interface WorkerOptions extends QueueBaseOptions {
*/
skipStalledCheck?: boolean;

/**
* Skip lock renewal for this worker. If set to true, the lock will expire
* after lockDuration and moved back to the wait queue (if the stalled check is
* not disabled)
*
* @default false
*/
skipLockRenewal?: boolean;

/**
*
* Number of seconds to long poll for jobs when the queue is empty.
Expand Down
121 changes: 121 additions & 0 deletions tests/test_async_fifo_queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import { expect } from 'chai';
import { AsyncFifoQueue } from '../src/classes/async-fifo-queue';

describe('AsyncFIFOQueue', () => {
it('add several promises and wait for them to complete', async () => {
const asyncFifoQueue = new AsyncFifoQueue<number>();
const promises = [1, 2, 3, 4, 5].map(
i =>
new Promise<number>(resolve => {
setTimeout(() => resolve(i), i * 100);
}),
);
promises.forEach(p => asyncFifoQueue.add(p));

expect(asyncFifoQueue.numPending()).to.be.eql(promises.length);
expect(asyncFifoQueue.numQueued()).to.be.eql(0);
expect(asyncFifoQueue.numTotal()).to.be.eql(promises.length);

await asyncFifoQueue.waitAll();
expect(asyncFifoQueue.numPending()).to.be.eql(0);
expect(asyncFifoQueue.numQueued()).to.be.eql(promises.length);
expect(asyncFifoQueue.numTotal()).to.be.eql(promises.length);
});

it('add several promises and wait for them to complete in order', async () => {
const asyncFifoQueue = new AsyncFifoQueue<number>();
const promises = [1, 2, 3, 4, 5].map(
i =>
new Promise<number>(resolve => {
setTimeout(() => resolve(i), i * 100);
}),
);
promises.forEach(p => asyncFifoQueue.add(p));

expect(asyncFifoQueue.numPending()).to.be.eql(promises.length);
expect(asyncFifoQueue.numQueued()).to.be.eql(0);
expect(asyncFifoQueue.numTotal()).to.be.eql(promises.length);

const results: number[] = [];
for (let i = 0; i < promises.length; i++) {
results.push((await asyncFifoQueue.fetch())!);
}

expect(results).to.be.eql([1, 2, 3, 4, 5]);
});

it('add several promises with random delays and wait for them to complete in order', async () => {
const asyncFifoQueue = new AsyncFifoQueue<number>();

const randomDelays = [250, 100, 570, 50, 400, 10, 300, 125, 460, 200];

const promises = randomDelays.map(
i =>
new Promise<number>(resolve => {
setTimeout(() => resolve(i), i);
}),
);
promises.forEach(p => asyncFifoQueue.add(p));

expect(asyncFifoQueue.numPending()).to.be.eql(promises.length);
expect(asyncFifoQueue.numQueued()).to.be.eql(0);
expect(asyncFifoQueue.numTotal()).to.be.eql(promises.length);

const results: number[] = [];
for (let i = 0; i < promises.length; i++) {
results.push((await asyncFifoQueue.fetch())!);
}

expect(results).to.be.eql(randomDelays.sort((a, b) => a - b));
});

it('add several promises while fetching them concurrently', async () => {
const asyncFifoQueue = new AsyncFifoQueue<number>();

const randomDelays = [
250, 100, 570, 50, 400, 10, 300, 125, 460, 200, 60, 100,
];
const results: number[] = [];
const concurrency = 3;

for (let i = 0; i < randomDelays.length; i++) {
const delay = randomDelays[i];
asyncFifoQueue.add(
new Promise<number>(resolve => {
setTimeout(() => resolve(delay), delay);
}),
);

if ((i + 1) % concurrency === 0) {
for (let j = 0; j < concurrency; j++) {
results.push((await asyncFifoQueue.fetch())!);
}
}
}

const expected = [100, 250, 570, 10, 50, 400, 125, 300, 460, 60, 100, 200];

expect(results).to.be.eql(expected);
});

it("should handle promises that get rejected and don't block the queue", async () => {
const asyncFifoQueue = new AsyncFifoQueue<number>(true);

const randomDelays = [250, 100, 570, 50, 400, 10, 300, 125, 460, 200];

for (let i = 0; i < randomDelays.length; i++) {
asyncFifoQueue.add(
new Promise<number>((resolve, reject) => {
setTimeout(() => reject(new Error(`${randomDelays[i]}`)), i);
}),
);
}

const results: number[] = [];
for (let i = 0; i < randomDelays.length; i++) {
results.push((await asyncFifoQueue.fetch())!);
}

expect(results).to.be.eql(randomDelays.map(() => void 0));
});
});
Loading

0 comments on commit 0d94e35

Please sign in to comment.