Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix 104 improve error handling #105

Closed
wants to merge 10 commits into from
1 change: 1 addition & 0 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ export class Job<T = any, R = any> {
* Moves a job to the completed queue.
* Returned job to be used with Queue.prototype.nextJobFromJobData.
* @param returnValue {string} The jobs success message.
* @param token {string} worker token used to acquire completed job.
* @param fetchNext {boolean} True when wanting to fetch the next job
* @returns {Promise} Returns the jobData of the next job in the waiting queue.
*/
Expand Down
7 changes: 4 additions & 3 deletions src/classes/queue-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ export class QueueScheduler extends QueueBase {
constructor(protected name: string, opts: QueueSchedulerOptions = {}) {
super(name, { maxStalledCount: 1, stalledInterval: 30000, ...opts });

// tslint:disable: no-floating-promises
this.run();
this.run().catch(error => {
console.error(error);
stansv marked this conversation as resolved.
Show resolved Hide resolved
});
}

private async run() {
Expand Down Expand Up @@ -163,7 +164,7 @@ export class QueueScheduler extends QueueBase {
if (this.isBlocked) {
this.closing = this.disconnect();
} else {
super.close();
await super.close();
}
return this.closing;
}
Expand Down
45 changes: 24 additions & 21 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@ export class Queue<T = any> extends QueueGetters {

this.jobsOpts = get(opts, 'defaultJobOptions');

// tslint:disable: no-floating-promises
this.waitUntilReady().then(client => {
client.hset(
this.keys.meta,
'opts.maxLenEvents',
get(opts, 'streams.events.maxLen', 10000),
);
});
this.waitUntilReady()
.then(client =>
client.hset(
this.keys.meta,
'opts.maxLenEvents',
get(opts, 'streams.events.maxLen', 10000),
),
)
.catch(error => {
console.error(error);
stansv marked this conversation as resolved.
Show resolved Hide resolved
});
}

get defaultJobOptions() {
Expand Down Expand Up @@ -66,11 +69,11 @@ export class Queue<T = any> extends QueueGetters {
}

/**
Adds an array of jobs to the queue.
@method add
@param jobs: [] The array of jobs to add to the queue. Each job is defined by 3
properties, 'name', 'data' and 'opts'. They follow the same signature as 'Queue.add'.
*/
Adds an array of jobs to the queue.
@method add
@param jobs: [] The array of jobs to add to the queue. Each job is defined by 3
properties, 'name', 'data' and 'opts'. They follow the same signature as 'Queue.add'.
*/
async addBulk(jobs: { name: string; data: any; opts?: JobsOptions }[]) {
return Job.createBulk(
this,
Expand All @@ -83,16 +86,16 @@ export class Queue<T = any> extends QueueGetters {
}

/**
Pauses the processing of this queue globally.
Pauses the processing of this queue globally.

We use an atomic RENAME operation on the wait queue. Since
we have blocking calls with BRPOPLPUSH on the wait queue, as long as the queue
is renamed to 'paused', no new jobs will be processed (the current ones
will run until finalized).
We use an atomic RENAME operation on the wait queue. Since
we have blocking calls with BRPOPLPUSH on the wait queue, as long as the queue
is renamed to 'paused', no new jobs will be processed (the current ones
will run until finalized).

Adding jobs requires a LUA script to check first if the paused list exist
and in that case it will add it there instead of the wait list.
*/
Adding jobs requires a LUA script to check first if the paused list exist
and in that case it will add it there instead of the wait list.
*/
async pause() {
await Scripts.pause(this, true);
this.emit('paused');
Expand Down
25 changes: 12 additions & 13 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export const clientCommandMessageReg = /ERR unknown command '\s*client\s*'/;

export class Worker<T = any> extends QueueBase {
opts: WorkerOptions;
tokens: string[];

private drained: boolean;
private waiting = false;
Expand All @@ -32,7 +33,7 @@ export class Worker<T = any> extends QueueBase {

private blockingConnection: RedisConnection;

private processing: Map<Promise<Job<T> | string>, string>; // { [index: number]: Promise<Job | void> } = {};
private processing: Map<Promise<Job<T> | void>, string> = new Map();
constructor(
name: string,
processor: string | Processor,
Expand All @@ -50,6 +51,9 @@ export class Worker<T = any> extends QueueBase {

this.opts.lockRenewTime =
this.opts.lockRenewTime || this.opts.lockDuration / 2;
this.tokens = Array.from({ length: this.opts.concurrency }, () =>
uuid.v4(),
);

this.blockingConnection = new RedisConnection(
isRedisInstance(opts.connection)
Expand Down Expand Up @@ -77,7 +81,6 @@ export class Worker<T = any> extends QueueBase {
}
this.timerManager = new TimerManager();

/* tslint:disable: no-floating-promises */
this.run().catch(error => {
console.error(error);
stansv marked this conversation as resolved.
Show resolved Hide resolved
});
Expand Down Expand Up @@ -116,13 +119,7 @@ export class Worker<T = any> extends QueueBase {
}
}

const opts: WorkerOptions = <WorkerOptions>this.opts;

const processing = (this.processing = new Map());

const tokens: string[] = Array.from({ length: opts.concurrency }, () =>
uuid.v4(),
);
const { opts, processing, tokens } = this;

while (!this.closing) {
if (processing.size < opts.concurrency) {
Expand Down Expand Up @@ -251,11 +248,13 @@ export class Worker<T = any> extends QueueBase {
this.opts.lockRenewTime,
async () => {
try {
const result = await Scripts.extendLock(this, job.id, token);
if (result && !timerStopped) {
lockExtender();
if (!this.closing) {
const result = await Scripts.extendLock(this, job.id, token);
if (result && !timerStopped) {
lockExtender();
}
// FIXME if result = 0 (missing lock), reject processFn promise to take next job?
}
// FIXME if result = 0 (missing lock), reject processFn promise to take next job?
} catch (error) {
stansv marked this conversation as resolved.
Show resolved Hide resolved
console.error('Error extending lock ', error);
// Somehow tell the worker this job should stop processing...
Expand Down
2 changes: 2 additions & 0 deletions src/test/test_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ describe('connection', () => {
*/

it('should fail if redis connection fails', async () => {
await queue.waitUntilReady(); // queue can report error when closed (the test is too fast)

const queueFail = new Queue('connection fail port', {
connection: { port: 1234, host: '127.0.0.1' },
});
Expand Down
1 change: 1 addition & 0 deletions src/test/test_sandboxed_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ describe('sandboxed process', () => {
});

it('should error if processor file is missing', async () => {
await queue.waitUntilReady(); // queue can report error when closed (the test is too fast)
let worker;
let didThrow = false;
try {
Expand Down