Skip to content

Commit

Permalink
Merge d5df6f5 into b4a7dfd
Browse files Browse the repository at this point in the history
  • Loading branch information
stansv committed Jan 15, 2020
2 parents b4a7dfd + d5df6f5 commit bfc8f66
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 41 deletions.
1 change: 1 addition & 0 deletions src/classes/job.ts
Expand Up @@ -208,6 +208,7 @@ export class Job<T = any, R = any> {
* Moves a job to the completed queue.
* Returned job to be used with Queue.prototype.nextJobFromJobData.
* @param returnValue {string} The jobs success message.
* @param token {string} worker token used to acquire completed job.
* @param fetchNext {boolean} True when wanting to fetch the next job
* @returns {Promise} Returns the jobData of the next job in the waiting queue.
*/
Expand Down
5 changes: 2 additions & 3 deletions src/classes/queue-scheduler.ts
Expand Up @@ -29,8 +29,7 @@ export class QueueScheduler extends QueueBase {
constructor(protected name: string, opts: QueueSchedulerOptions = {}) {
super(name, { maxStalledCount: 1, stalledInterval: 30000, ...opts });

// tslint:disable: no-floating-promises
this.run();
this.run().catch(this.emit.bind(this));
}

private async run() {
Expand Down Expand Up @@ -163,7 +162,7 @@ export class QueueScheduler extends QueueBase {
if (this.isBlocked) {
this.closing = this.disconnect();
} else {
super.close();
await super.close();
}
return this.closing;
}
Expand Down
43 changes: 22 additions & 21 deletions src/classes/queue.ts
Expand Up @@ -21,14 +21,15 @@ export class Queue<T = any> extends QueueGetters {

this.jobsOpts = get(opts, 'defaultJobOptions');

// tslint:disable: no-floating-promises
this.waitUntilReady().then(client => {
client.hset(
this.keys.meta,
'opts.maxLenEvents',
get(opts, 'streams.events.maxLen', 10000),
);
});
this.waitUntilReady()
.then(client =>
client.hset(
this.keys.meta,
'opts.maxLenEvents',
get(opts, 'streams.events.maxLen', 10000),
),
)
.catch(this.emit.bind(this));
}

get defaultJobOptions() {
Expand Down Expand Up @@ -66,11 +67,11 @@ export class Queue<T = any> extends QueueGetters {
}

/**
Adds an array of jobs to the queue.
@method add
@param jobs: [] The array of jobs to add to the queue. Each job is defined by 3
properties, 'name', 'data' and 'opts'. They follow the same signature as 'Queue.add'.
*/
Adds an array of jobs to the queue.
@method add
@param jobs: [] The array of jobs to add to the queue. Each job is defined by 3
properties, 'name', 'data' and 'opts'. They follow the same signature as 'Queue.add'.
*/
async addBulk(jobs: { name: string; data: any; opts?: JobsOptions }[]) {
return Job.createBulk(
this,
Expand All @@ -83,16 +84,16 @@ export class Queue<T = any> extends QueueGetters {
}

/**
Pauses the processing of this queue globally.
Pauses the processing of this queue globally.
We use an atomic RENAME operation on the wait queue. Since
we have blocking calls with BRPOPLPUSH on the wait queue, as long as the queue
is renamed to 'paused', no new jobs will be processed (the current ones
will run until finalized).
We use an atomic RENAME operation on the wait queue. Since
we have blocking calls with BRPOPLPUSH on the wait queue, as long as the queue
is renamed to 'paused', no new jobs will be processed (the current ones
will run until finalized).
Adding jobs requires a LUA script to check first if the paused list exist
and in that case it will add it there instead of the wait list.
*/
Adding jobs requires a LUA script to check first if the paused list exist
and in that case it will add it there instead of the wait list.
*/
async pause() {
await Scripts.pause(this, true);
this.emit('paused');
Expand Down
31 changes: 14 additions & 17 deletions src/classes/worker.ts
Expand Up @@ -19,6 +19,7 @@ export const clientCommandMessageReg = /ERR unknown command '\s*client\s*'/;

export class Worker<T = any> extends QueueBase {
opts: WorkerOptions;
tokens: string[];

private drained: boolean;
private waiting = false;
Expand All @@ -32,7 +33,7 @@ export class Worker<T = any> extends QueueBase {

private blockingConnection: RedisConnection;

private processing: Map<Promise<Job<T> | string>, string>; // { [index: number]: Promise<Job | void> } = {};
private processing: Map<Promise<Job<T> | void>, string> = new Map();
constructor(
name: string,
processor: string | Processor,
Expand All @@ -50,6 +51,9 @@ export class Worker<T = any> extends QueueBase {

this.opts.lockRenewTime =
this.opts.lockRenewTime || this.opts.lockDuration / 2;
this.tokens = Array.from({ length: this.opts.concurrency }, () =>
uuid.v4(),
);

this.blockingConnection = new RedisConnection(
isRedisInstance(opts.connection)
Expand Down Expand Up @@ -77,10 +81,7 @@ export class Worker<T = any> extends QueueBase {
}
this.timerManager = new TimerManager();

/* tslint:disable: no-floating-promises */
this.run().catch(error => {
console.error(error);
});
this.run().catch(this.emit.bind(this));
}

get repeat() {
Expand Down Expand Up @@ -116,13 +117,7 @@ export class Worker<T = any> extends QueueBase {
}
}

const opts: WorkerOptions = <WorkerOptions>this.opts;

const processing = (this.processing = new Map());

const tokens: string[] = Array.from({ length: opts.concurrency }, () =>
uuid.v4(),
);
const { opts, processing, tokens } = this;

while (!this.closing) {
if (processing.size < opts.concurrency) {
Expand Down Expand Up @@ -251,13 +246,15 @@ export class Worker<T = any> extends QueueBase {
this.opts.lockRenewTime,
async () => {
try {
const result = await Scripts.extendLock(this, job.id, token);
if (result && !timerStopped) {
lockExtender();
if (!this.closing) {
const result = await Scripts.extendLock(this, job.id, token);
if (result && !timerStopped) {
lockExtender();
}
// FIXME if result = 0 (missing lock), reject processFn promise to take next job?
}
// FIXME if result = 0 (missing lock), reject processFn promise to take next job?
} catch (error) {
console.error('Error extending lock ', error);
this.emit.bind(this)(error);
// Somehow tell the worker this job should stop processing...
}
},
Expand Down
2 changes: 2 additions & 0 deletions src/test/test_connection.ts
Expand Up @@ -136,6 +136,8 @@ describe('connection', () => {
*/

it('should fail if redis connection fails', async () => {
await queue.waitUntilReady(); // queue can report error when closed (the test is too fast)

const queueFail = new Queue('connection fail port', {
connection: { port: 1234, host: '127.0.0.1' },
});
Expand Down
1 change: 1 addition & 0 deletions src/test/test_sandboxed_process.ts
Expand Up @@ -265,6 +265,7 @@ describe('sandboxed process', () => {
});

it('should error if processor file is missing', async () => {
await queue.waitUntilReady(); // queue can report error when closed (the test is too fast)
let worker;
let didThrow = false;
try {
Expand Down

0 comments on commit bfc8f66

Please sign in to comment.