Skip to content

Commit

Permalink
Merge pull request #82 from jbr/dollarsign-as-default-queue-event-id
Browse files Browse the repository at this point in the history
$ as default queue event id #76
  • Loading branch information
manast committed Dec 16, 2019
2 parents 5410d23 + 878fc0d commit 7a6b2ee
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 31 deletions.
3 changes: 1 addition & 2 deletions src/classes/compat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ export class Queue3<T = any> extends EventEmitter {
* The name of the queue
*/
name: string;
queueEvents: QueueEvents;

private opts: CommonOptions;

private readonly queue: Queue;
private queueEvents: QueueEvents;
private worker: Worker;
private queueScheduler: QueueScheduler;

Expand Down
2 changes: 1 addition & 1 deletion src/classes/queue-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export class QueueEvents extends QueueBase {
const opts: QueueEventsOptions = this.opts;

const key = this.keys.events;
let id = opts.lastEventId || '0-0';
let id = opts.lastEventId || '$';

while (!this.closing) {
try {
Expand Down
75 changes: 47 additions & 28 deletions src/test/test_compat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -369,23 +369,37 @@ describe('Compat', function() {
});
});

it('should listen to global events', function(done) {
let state: string;
queue.on('global:waiting', function() {
expect(state).to.be.undefined;
state = 'waiting';
});
queue.once('global:active', function() {
expect(state).to.be.equal('waiting');
state = 'active';
});
queue.once('global:completed', async function() {
expect(state).to.be.equal('active');
done();
});
it('should listen to global events with .once', async function() {
const events: string[] = [];
queue.once('global:waiting', () => events.push('waiting'));
queue.once('global:active', () => events.push('active'));
queue.once('global:completed', () => events.push('completed'));
await queue.queueEvents.waitUntilReady();
await queue.add('test', {});
await queue.add('test', {});
await queue.process(() => null);
await delay(50);
expect(events).to.eql(['waiting', 'active', 'completed']);
});

queue.add('test', {});
queue.process(async () => {});
it('should listen to global events with .on', async function() {
const events: string[] = [];
queue.on('global:waiting', () => events.push('waiting'));
queue.on('global:active', () => events.push('active'));
queue.on('global:completed', () => events.push('completed'));
await queue.queueEvents.waitUntilReady();
await queue.add('test', {});
await queue.add('test', {});
await queue.process(() => null);
await delay(50);
expect(events).to.eql([
'waiting',
'waiting',
'active',
'completed',
'active',
'completed',
]);
});
});

Expand Down Expand Up @@ -443,6 +457,17 @@ describe('Compat', function() {
isResumed = true,
first = true;

queue.on('global:paused', async () => {
isPaused = false;
await queue.resume();
});

queue.on('global:resumed', () => {
isResumed = true;
});

await queue.queueEvents.waitUntilReady();

const processPromise = new Promise((resolve, reject) => {
process = async (job: Job) => {
try {
Expand All @@ -468,15 +493,6 @@ describe('Compat', function() {
queue.add('test', { foo: 'paused' });
queue.add('test', { foo: 'paused' });

queue.on('global:paused', async () => {
isPaused = false;
await queue.resume();
});

queue.on('global:resumed', () => {
isResumed = true;
});

return processPromise;
});

Expand Down Expand Up @@ -619,9 +635,7 @@ describe('Compat', function() {
it('pauses fast when queue is drained', async function() {
await queue.process(async () => {});

await queue.add('test', {});

return new Promise((resolve, reject) => {
const promise = new Promise((resolve, reject) => {
queue.on('global:drained', async () => {
try {
const start = new Date().getTime();
Expand All @@ -635,6 +649,11 @@ describe('Compat', function() {
}
});
});

await queue.queueEvents.waitUntilReady();

await queue.add('test', {});
return promise;
});
});
});

0 comments on commit 7a6b2ee

Please sign in to comment.