Navigation Menu

Skip to content

Commit

Permalink
feat(queue-events): launch without launching process (#750)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Sep 16, 2021
1 parent e1cd8bb commit 23a2360
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 7 deletions.
1 change: 1 addition & 0 deletions .eslintrc.json
Expand Up @@ -48,6 +48,7 @@
}
],
"arrow-parens": [2, "as-needed", { "requireForBlockBody": false }],
"curly": "error",
"no-async-promise-executor": 0,
"no-extraneous-class": 0,
"@typescript-eslint/no-inferrable-types": 2,
Expand Down
30 changes: 26 additions & 4 deletions src/classes/queue-events.ts
Expand Up @@ -182,7 +182,12 @@ export declare interface QueueEvents {
*
*/
export class QueueEvents extends QueueBase {
constructor(name: string, { connection, ...opts }: QueueEventsOptions = {}) {
private running = false;

constructor(
name: string,
{ connection, autorun = true, ...opts }: QueueEventsOptions = {},
) {
super(name, {
...opts,
connection: isRedisInstance(connection)
Expand All @@ -197,12 +202,28 @@ export class QueueEvents extends QueueBase {
this.opts,
);

this.consumeEvents().catch(err => this.emit('error', err));
if (autorun) {
this.run().catch(error => this.emit('error', error));
}
}

private async consumeEvents() {
const client = await this.client;
async run(): Promise<void> {
if (!this.running) {
try {
this.running = true;
const client = await this.client;

await this.consumeEvents(client);
} catch (error) {
this.running = false;
throw error;
}
} else {
throw new Error('Queue Events is already running.');
}
}

private async consumeEvents(client: RedisClient) {
const opts: QueueEventsOptions = this.opts;

const key = this.keys.events;
Expand Down Expand Up @@ -253,6 +274,7 @@ export class QueueEvents extends QueueBase {
if (isNotConnectionError(err)) {
throw err;
}

await delay(DELAY_TIME_5);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/classes/redis-connection.ts
Expand Up @@ -118,7 +118,7 @@ export class RedisConnection extends EventEmitter {
if (client.status !== 'end') {
let _resolve, _reject;

const disconnecting = new Promise((resolve, reject) => {
const disconnecting = new Promise<void>((resolve, reject) => {
client.once('end', resolve);
client.once('error', reject);
_resolve = resolve;
Expand Down
1 change: 1 addition & 0 deletions src/interfaces/queue-options.ts
Expand Up @@ -63,6 +63,7 @@ export interface QueueOptions extends QueueBaseOptions {
* Options for QueueEvents
*/
export interface QueueEventsOptions extends QueueBaseOptions {
autorun?: boolean;
/**
* Last event Id. If provided it is possible to continue
* consuming events from a known Id instead of from the last
Expand Down
48 changes: 47 additions & 1 deletion src/test/test_events.ts
Expand Up @@ -6,7 +6,7 @@ import { FlowProducer, Queue, QueueEvents, Worker } from '../classes';
import { delay, removeAllQueueData } from '../utils';

describe('events', function() {
this.timeout(4000);
this.timeout(6000);
let queue: Queue;
let queueEvents: QueueEvents;
let queueName: string;
Expand All @@ -24,6 +24,52 @@ describe('events', function() {
await removeAllQueueData(new IORedis(), queueName);
});

describe('when autorun option is provided as false', function() {
it('emits waiting when a job has been added', async () => {
const queueName2 = `test-${v4()}`;
const queue2 = new Queue(queueName2);
const queueEvents2 = new QueueEvents(queueName2, { autorun: false });
await queueEvents2.waitUntilReady();

const waiting = new Promise(resolve => {
queue2.on('waiting', resolve);
});

const running = queueEvents2.run();

await queue2.add('test', { foo: 'bar' });

await waiting;

await queue2.close();
await queueEvents2.close();
await expect(running).to.have.been.fulfilled;
await removeAllQueueData(new IORedis(), queueName2);
});

describe('when run method is called when queueEvent is running', function() {
it('throws error', async () => {
const queueName2 = `test-${v4()}`;
const queue2 = new Queue(queueName2);
const queueEvents2 = new QueueEvents(queueName2, { autorun: false });
await queueEvents2.waitUntilReady();

const running = queueEvents2.run();

await queue2.add('test', { foo: 'bar' });

await expect(queueEvents2.run()).to.be.rejectedWith(
'Queue Events is already running.',
);

await queue2.close();
await queueEvents2.close();
await expect(running).to.have.been.fulfilled;
await removeAllQueueData(new IORedis(), queueName2);
});
});
});

it('should emit waiting when a job has been added', async function() {
const waiting = new Promise(resolve => {
queue.on('waiting', resolve);
Expand Down
2 changes: 1 addition & 1 deletion src/test/test_worker.ts
Expand Up @@ -518,7 +518,7 @@ describe('workers', function() {
});

describe('when autorun option is provided as false', function() {
it('process several jobs serially using process option as false', async () => {
it('processes several jobs serially using process option as false', async () => {
let counter = 1;
const maxJobs = 10;

Expand Down

0 comments on commit 23a2360

Please sign in to comment.