Skip to content

Commit

Permalink
fix: waitUntilFinished improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Aug 24, 2019
1 parent 7ff72a7 commit 18d4afe
Showing 1 changed file with 27 additions and 39 deletions.
66 changes: 27 additions & 39 deletions src/classes/job.ts
Expand Up @@ -32,10 +32,11 @@ export class Job {
stacktrace: string[] = null;
timestamp: number;

private attemptsMade = 0;
private failedReason: string;
private finishedOn: number;
private processedOn: number;
attemptsMade = 0;
failedReason: string;
finishedOn: number;
processedOn: number;

private toKey: (type: string) => string;

private discarded: boolean;
Expand Down Expand Up @@ -190,15 +191,17 @@ export class Job {

this.returnvalue = returnValue || 0;

returnValue = tryCatch(JSON.stringify, JSON, [returnValue]);
if (returnValue === errorObject) {
const stringifiedReturnValue = tryCatch(JSON.stringify, JSON, [
returnValue,
]);
if (stringifiedReturnValue === errorObject) {
throw errorObject.value;
}

return Scripts.moveToCompleted(
this.queue,
this,
returnValue,
stringifiedReturnValue,
this.opts.removeOnComplete,
fetchNext,
);
Expand Down Expand Up @@ -307,11 +310,7 @@ export class Job {
/**
* Returns a promise the resolves when the job has finished. (completed or failed).
*/
async waitUntilFinished(
queueEvents: QueueEvents,
watchdog = 5000,
ttl?: number,
) {
async waitUntilFinished(queueEvents: QueueEvents, ttl?: number) {
await this.queue.waitUntilReady();

const jobId = this.id;
Expand All @@ -326,23 +325,18 @@ export class Job {
}
} else {
return new Promise((resolve, reject) => {
let interval: NodeJS.Timeout;
let timeout: NodeJS.Timeout;
if (ttl) {
timeout = setTimeout(() => onFailed('timedout'), ttl);
}

function onCompleted(args: any) {
let result: any = void 0;
try {
if (typeof args.returnvalue === 'string') {
result = JSON.parse(args.returnvalue);
}
} catch (err) {
//swallow exception because the resultValue got corrupted somehow.
debuglog(`corrupted resultValue: ${args.returnvalue}, ${err}`);
}
resolve(result);
resolve(args.returnvalue);
removeListeners();
}

function onFailed(args: any) {
reject(new Error(args.failedReason));
reject(new Error(args.failedReason || args));
removeListeners();
}

Expand All @@ -351,24 +345,14 @@ export class Job {

queueEvents.on(completedEvent, onCompleted);
queueEvents.on(failedEvent, onFailed);
this.queue.on('closing', onFailed);

function removeListeners() {
clearInterval(interval);
const removeListeners = () => {
clearInterval(timeout);
queueEvents.removeListener(completedEvent, onCompleted);
queueEvents.removeListener(failedEvent, onFailed);
}

//
// Watchdog
//
interval = setInterval(() => {
if (this.queue.closing) {
removeListeners();
reject(
new Error('cannot check if job is finished in a closing queue.'),
);
}
}, watchdog);
this.queue.removeListener('closing', onFailed);
};
});
}
}
Expand Down Expand Up @@ -418,6 +402,10 @@ export class Job {
}
}

discard() {
this.discarded = true;
}

private async isInZSet(set: string) {
const score = await this.queue.client.zscore(
this.queue.toKey(set),
Expand Down

0 comments on commit 18d4afe

Please sign in to comment.