diff --git a/src/classes/job.ts b/src/classes/job.ts index b8170dbacb..cfff431eb9 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -208,6 +208,7 @@ export class Job { * Moves a job to the completed queue. * Returned job to be used with Queue.prototype.nextJobFromJobData. * @param returnValue {string} The jobs success message. + * @param token {string} worker token used to acquire completed job. * @param fetchNext {boolean} True when wanting to fetch the next job * @returns {Promise} Returns the jobData of the next job in the waiting queue. */ diff --git a/src/classes/queue-scheduler.ts b/src/classes/queue-scheduler.ts index d1f1a47afd..7511d92379 100644 --- a/src/classes/queue-scheduler.ts +++ b/src/classes/queue-scheduler.ts @@ -29,8 +29,7 @@ export class QueueScheduler extends QueueBase { constructor(protected name: string, opts: QueueSchedulerOptions = {}) { super(name, { maxStalledCount: 1, stalledInterval: 30000, ...opts }); - // tslint:disable: no-floating-promises - this.run(); + this.run().catch(this.emit.bind(this)); } private async run() { @@ -163,7 +162,7 @@ export class QueueScheduler extends QueueBase { if (this.isBlocked) { this.closing = this.disconnect(); } else { - super.close(); + await super.close(); } return this.closing; } diff --git a/src/classes/queue.ts b/src/classes/queue.ts index be665d5c4b..7bacaee6b3 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -21,14 +21,15 @@ export class Queue extends QueueGetters { this.jobsOpts = get(opts, 'defaultJobOptions'); - // tslint:disable: no-floating-promises - this.waitUntilReady().then(client => { - client.hset( - this.keys.meta, - 'opts.maxLenEvents', - get(opts, 'streams.events.maxLen', 10000), - ); - }); + this.waitUntilReady() + .then(client => + client.hset( + this.keys.meta, + 'opts.maxLenEvents', + get(opts, 'streams.events.maxLen', 10000), + ), + ) + .catch(this.emit.bind(this)); } get defaultJobOptions() { @@ -66,11 +67,11 @@ export class Queue extends QueueGetters { } /** - Adds an array of jobs to the queue. - @method add - @param jobs: [] The array of jobs to add to the queue. Each job is defined by 3 - properties, 'name', 'data' and 'opts'. They follow the same signature as 'Queue.add'. -*/ + Adds an array of jobs to the queue. + @method add + @param jobs: [] The array of jobs to add to the queue. Each job is defined by 3 + properties, 'name', 'data' and 'opts'. They follow the same signature as 'Queue.add'. + */ async addBulk(jobs: { name: string; data: any; opts?: JobsOptions }[]) { return Job.createBulk( this, @@ -83,16 +84,16 @@ export class Queue extends QueueGetters { } /** - Pauses the processing of this queue globally. + Pauses the processing of this queue globally. - We use an atomic RENAME operation on the wait queue. Since - we have blocking calls with BRPOPLPUSH on the wait queue, as long as the queue - is renamed to 'paused', no new jobs will be processed (the current ones - will run until finalized). + We use an atomic RENAME operation on the wait queue. Since + we have blocking calls with BRPOPLPUSH on the wait queue, as long as the queue + is renamed to 'paused', no new jobs will be processed (the current ones + will run until finalized). - Adding jobs requires a LUA script to check first if the paused list exist - and in that case it will add it there instead of the wait list. - */ + Adding jobs requires a LUA script to check first if the paused list exist + and in that case it will add it there instead of the wait list. + */ async pause() { await Scripts.pause(this, true); this.emit('paused'); diff --git a/src/classes/worker.ts b/src/classes/worker.ts index bff0405855..c92fee2fdf 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -19,6 +19,7 @@ export const clientCommandMessageReg = /ERR unknown command '\s*client\s*'/; export class Worker extends QueueBase { opts: WorkerOptions; + tokens: string[]; private drained: boolean; private waiting = false; @@ -32,7 +33,7 @@ export class Worker extends QueueBase { private blockingConnection: RedisConnection; - private processing: Map | string>, string>; // { [index: number]: Promise } = {}; + private processing: Map | void>, string> = new Map(); constructor( name: string, processor: string | Processor, @@ -50,6 +51,9 @@ export class Worker extends QueueBase { this.opts.lockRenewTime = this.opts.lockRenewTime || this.opts.lockDuration / 2; + this.tokens = Array.from({ length: this.opts.concurrency }, () => + uuid.v4(), + ); this.blockingConnection = new RedisConnection( isRedisInstance(opts.connection) @@ -77,10 +81,7 @@ export class Worker extends QueueBase { } this.timerManager = new TimerManager(); - /* tslint:disable: no-floating-promises */ - this.run().catch(error => { - console.error(error); - }); + this.run().catch(this.emit.bind(this)); } get repeat() { @@ -116,13 +117,7 @@ export class Worker extends QueueBase { } } - const opts: WorkerOptions = this.opts; - - const processing = (this.processing = new Map()); - - const tokens: string[] = Array.from({ length: opts.concurrency }, () => - uuid.v4(), - ); + const { opts, processing, tokens } = this; while (!this.closing) { if (processing.size < opts.concurrency) { @@ -251,13 +246,15 @@ export class Worker extends QueueBase { this.opts.lockRenewTime, async () => { try { - const result = await Scripts.extendLock(this, job.id, token); - if (result && !timerStopped) { - lockExtender(); + if (!this.closing) { + const result = await Scripts.extendLock(this, job.id, token); + if (result && !timerStopped) { + lockExtender(); + } + // FIXME if result = 0 (missing lock), reject processFn promise to take next job? } - // FIXME if result = 0 (missing lock), reject processFn promise to take next job? } catch (error) { - console.error('Error extending lock ', error); + this.emit.bind(this)(error); // Somehow tell the worker this job should stop processing... } }, diff --git a/src/test/test_connection.ts b/src/test/test_connection.ts index 0836e8e011..810875b145 100644 --- a/src/test/test_connection.ts +++ b/src/test/test_connection.ts @@ -136,6 +136,8 @@ describe('connection', () => { */ it('should fail if redis connection fails', async () => { + await queue.waitUntilReady(); // queue can report error when closed (the test is too fast) + const queueFail = new Queue('connection fail port', { connection: { port: 1234, host: '127.0.0.1' }, }); diff --git a/src/test/test_sandboxed_process.ts b/src/test/test_sandboxed_process.ts index 381837a292..7a52a96c4e 100644 --- a/src/test/test_sandboxed_process.ts +++ b/src/test/test_sandboxed_process.ts @@ -265,6 +265,7 @@ describe('sandboxed process', () => { }); it('should error if processor file is missing', async () => { + await queue.waitUntilReady(); // queue can report error when closed (the test is too fast) let worker; let didThrow = false; try {