Skip to content

Commit

Permalink
feat(worker): add support to disable stalled checks
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Feb 23, 2023
1 parent a185d5e commit 49e860c
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 7 deletions.
17 changes: 10 additions & 7 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { Job } from './job';
import { RedisConnection } from './redis-connection';
import sandbox from './sandbox';
import { TimerManager } from './timer-manager';
import { clearInterval } from 'timers';

// 10 seconds is the maximum time a BRPOPLPUSH can block.
const maximumBlockTimeout = 10;
Expand Down Expand Up @@ -175,6 +176,8 @@ export class Worker<

private blockingConnection: RedisConnection;

private stalledCheckTimer: NodeJS.Timeout;

private processing: Map<
Promise<void | Job<DataType, ResultType, NameType>>,
string
Expand Down Expand Up @@ -354,6 +357,12 @@ export class Worker<
}

async run() {
if (!this.stalledCheckTimer && !this.opts.skipStalledCheck) {
this.stalledCheckTimer = setInterval(() => {
this.runStalledJobsCheck();
}, this.opts.stalledInterval);
}

if (this.processFn) {
if (!this.running) {
try {
Expand All @@ -363,8 +372,6 @@ export class Worker<
return;
}

this.runStalledJobsCheck();

const processing = (this.processing = new Map());
let tokenPostfix = 0;

Expand Down Expand Up @@ -775,6 +782,7 @@ export class Worker<
return closePoolPromise;
})
.finally(() => client.disconnect())
.finally(() => clearInterval(this.stalledCheckTimer))
.finally(() => this.timerManager && this.timerManager.clearAllTimers())
.finally(() => this.connection.close())
.finally(() => this.emit('closed'));
Expand Down Expand Up @@ -824,11 +832,6 @@ export class Worker<
try {
if (!this.closing) {
await this.checkConnectionError(() => this.moveStalledJobsToWait());
this.timerManager.setTimer(
'checkStalledJobs',
this.opts.stalledInterval,
() => this.runStalledJobsCheck(),
);
}
} catch (err) {
this.emit('error', <Error>err);
Expand Down
7 changes: 7 additions & 0 deletions src/interfaces/worker-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ export interface WorkerOptions extends QueueBaseOptions {
*/
stalledInterval?: number;

/**
* Skip stalled check for this worker. Note that other workers could still
* perform stalled checkd and move jobs back to wait for jobs being processed
* by this worker.
*/
skipStalledCheck?: boolean;

skipDelayCheck?: boolean;
drainDelay?: number;
lockDuration?: number;
Expand Down
31 changes: 31 additions & 0 deletions tests/test_stalled_jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,37 @@ describe('stalled jobs', function () {
await worker2.close();
});

it("don't process stalled jobs when starting a queue with skipStalledCheck", async function () {
const concurrency = 4;

const worker = new Worker(
queueName,
async () => {
return delay(1000);
},
{
connection,
stalledInterval: 50,
skipStalledCheck: true,
concurrency,
},
);

const allCompleted = new Promise(resolve => {
worker.on('completed', after(concurrency, resolve));
});

await Promise.all([
queue.add('test', { bar: 'baz' }),
queue.add('test', { bar1: 'baz1' }),
queue.add('test', { bar2: 'baz2' }),
queue.add('test', { bar3: 'baz3' }),
]);

await allCompleted;
await worker.close();
});

it('fail stalled jobs that stall more than allowable stalled limit', async function () {
this.timeout(6000);

Expand Down

0 comments on commit 49e860c

Please sign in to comment.