Skip to content

Commit

Permalink
feat(worker): simplify lock extension to one call independent of conc…
Browse files Browse the repository at this point in the history
…urrency
  • Loading branch information
manast committed Feb 22, 2023
1 parent cc35401 commit ebf1aeb
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 107 deletions.
3 changes: 2 additions & 1 deletion src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,9 @@ export class Scripts {
jobId: string,
token: string,
duration: number,
client?: RedisClient | ChainableCommander,
): Promise<number> {
const client = await this.queue.client;
client = client || (await this.queue.client);
const args = [
this.queue.toKey(jobId) + ':lock',
this.queue.keys.stalled,
Expand Down
211 changes: 105 additions & 106 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ export class Worker<
private childPool: ChildPool;
protected timerManager: TimerManager;

private extendLocksTimer: NodeJS.Timeout | null = null;

private blockingConnection: RedisConnection;

private processing: Map<
Expand Down Expand Up @@ -355,77 +357,84 @@ export class Worker<
}

async run() {
if (this.processFn) {
if (!this.running) {
try {
this.running = true;
if (!this.processFn) {
throw new Error('No process function is defined.');
}

if (this.closing) {
return;
}
if (this.running) {
throw new Error('Worker is already running.');
}

this.runStalledJobsCheck();

const processing = (this.processing = new Map());
let tokenPostfix = 0;

while (!this.closing) {
if (
!this.waiting &&
processing.size < this.opts.concurrency &&
(!this.limitUntil || processing.size == 0)
) {
const restProcesses = this.opts.concurrency - processing.size;
for (let i = 0; i < restProcesses; i++) {
const token = `${this.id}:${tokenPostfix++}`;
processing.set(
this.retryIfFailed<Job<DataType, ResultType, NameType>>(
() => this.getNextJob(token),
this.opts.runRetryDelay,
),
token,
);
}
}

/*
* Get the first promise that completes
*/
const promises = [...processing.keys()];
const completedIdx = await Promise.race(
promises.map((p, idx) => p.then(() => idx)),
try {
this.running = true;

if (this.closing) {
return;
}

this.runStalledJobsCheck();

const jobsInProgress = {};
this.extendLocksTimer = setInterval(async () => {
this.extendLocks(jobsInProgress);
}, this.opts.lockRenewTime);

const processing = (this.processing = new Map());
let tokenPostfix = 0;

while (!this.closing) {
if (
!this.waiting &&
processing.size < this.opts.concurrency &&
(!this.limitUntil || processing.size == 0)
) {
const restProcesses = this.opts.concurrency - processing.size;
for (let i = 0; i < restProcesses; i++) {
const token = `${this.id}:${tokenPostfix++}`;
processing.set(
this.retryIfFailed<Job<DataType, ResultType, NameType>>(
() => this.getNextJob(token),
this.opts.runRetryDelay,
),
token,
);
const completed = promises[completedIdx];
const job = await completed;
if (job) {
const token = processing.get(completed);
processing.set(
this.retryIfFailed<void | Job<DataType, ResultType, NameType>>(
() =>
this.processJob(
job,
token,
() => processing.size <= this.opts.concurrency,
),
this.opts.runRetryDelay,
),
token,
);
}
processing.delete(completed);
}
this.running = false;
return Promise.all([...processing.keys()]);
} catch (error) {
this.running = false;
}

throw error;
/*
* Get the first promise that completes
*/
const promises = [...processing.keys()];
const completedIdx = await Promise.race(
promises.map((p, idx) => p.then(() => idx)),
);
const completed = promises[completedIdx];
const job = await completed;
if (job) {
const token = processing.get(completed);
processing.set(
this.retryIfFailed<void | Job<DataType, ResultType, NameType>>(
() =>
this.processJob(
job,
token,
() => processing.size <= this.opts.concurrency,
jobsInProgress,
),
this.opts.runRetryDelay,
),
token,
);
}
} else {
throw new Error('Worker is already running.');
processing.delete(completed);
}
} else {
throw new Error('No process function is defined.');
this.running = false;
return Promise.all([...processing.keys()]);
} catch (error) {
this.running = false;
throw error;
} finally {
clearInterval(this.extendLocksTimer);
}
}

Expand Down Expand Up @@ -599,49 +608,12 @@ export class Worker<
job: Job<DataType, ResultType, NameType>,
token: string,
fetchNextCallback = () => true,
jobsInProgress: { [key: string]: string } = {},
): Promise<void | Job<DataType, ResultType, NameType>> {
if (!job || this.closing || this.paused) {
return;
}

//
// There are two cases to take into consideration regarding locks.
// 1) The lock renewer fails to renew a lock, this should make this job
// unable to complete, since some other worker is also working on it.
// 2) The lock renewer is called more seldom than the check for stalled
// jobs, so we can assume the job has been stalled and is already being processed
// by another worker. See https://github.com/OptimalBits/bull/issues/308
//
// TODO: Have only 1 timer that extends all the locks instead of one timer
// per concurrency setting.
let lockRenewId: string;
let timerStopped = false;
const lockExtender = () => {
lockRenewId = this.timerManager.setTimer(
'lockExtender',
this.opts.lockRenewTime,
async () => {
try {
const result = await job.extendLock(token, this.opts.lockDuration);
if (result && !timerStopped) {
lockExtender();
}
// FIXME if result = 0 (missing lock), reject processFn promise to take next job?
} catch (error) {
console.error('Error extending lock ', error);
// Somehow tell the worker this job should stop processing...
}
},
);
};

const stopTimer = () => {
timerStopped = true;
this.timerManager.clearTimer(lockRenewId);
};

// end copy-paste from Bull3

const handleCompleted = async (result: ResultType) => {
if (!this.connection.closing) {
const completed = await job.moveToCompleted(
Expand All @@ -668,23 +640,22 @@ export class Worker<
} catch (err) {
this.emit('error', <Error>err);
// It probably means that the job has lost the lock before completion
// The QueueScheduler will (or already has) moved the job back
// A worker will (or already has) moved the job back
// to the waiting list (as stalled)
}
}
};

this.emit('active', job, 'waiting');

lockExtender();

try {
jobsInProgress[job.id] = token;
const result = await this.callProcessJob(job, token);
return await handleCompleted(result);
} catch (err) {
return handleFailed(<Error>err);
} finally {
stopTimer();
delete jobsInProgress[job.id];
}
}

Expand Down Expand Up @@ -821,6 +792,34 @@ export class Worker<
} while (retry);
}

private async extendLocks(jobsInProgress: { [key: string]: string }) {
try {
const multi = (await this.client).multi();
const jobIds = Object.keys(jobsInProgress);
for (const jobId of jobIds) {
await this.scripts.extendLock(
jobId,
jobsInProgress[jobId],
this.opts.lockDuration,
multi,
);
}
const result = (await multi.exec()) as [Error, string][];

for (const [err, jobId] of result) {
if (err) {
delete jobsInProgress[jobId];
this.emit(
'error',
new Error(`could not renew lock for job ${jobId}`),
);
}
}
} catch (err) {
this.emit('error', <Error>err);
}
}

private async runStalledJobsCheck() {
try {
if (!this.closing) {
Expand Down

0 comments on commit ebf1aeb

Please sign in to comment.