Skip to content

Commit

Permalink
Merge pull request #4 from sean256/fix/stop_multiple_done_when_empty_…
Browse files Browse the repository at this point in the history
…queue

Fix/stop multiple done when empty queue
  • Loading branch information
sean256 committed Mar 7, 2018
2 parents 53e8888 + 6f79995 commit aa70616
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 10 deletions.
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "partition-queue",
"version": "1.0.3",
"version": "1.0.4",
"description": "A dead simple partitioned asynchronous queue with adjustable concurrency. Jobs with the same key are guaranteed to be processed in order.",
"main": "index.js",
"scripts": {
Expand Down
25 changes: 17 additions & 8 deletions partition-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ class PartitionQueue extends EventEmitter {
this.startPromiseResolve = null;
}

done() {
this.emit('done');
if (this.startPromiseResolve) {
this.startPromiseResolve();
this.startPromiseResolve = null;
}
}

/**
* Add a job
* @param {string} key The partition string to use. May be a non string when
Expand Down Expand Up @@ -42,6 +50,11 @@ class PartitionQueue extends EventEmitter {
const { queues } = this;
return new Promise((resolve) => {
this.startPromiseResolve = resolve;
if (!this.remaining) {
// empty queue
this.done();
return;
}
queues.forEach((queue, queueNumber) => {
if (!queue.running) {
this.next(queueNumber);
Expand All @@ -54,8 +67,8 @@ class PartitionQueue extends EventEmitter {
const { queues, timeout: timeoutMs } = this;
const queue = queues[queueNumber];
const job = queue.shift();
queue.running = true;
if (job) {
queue.running = true;
let timeout;
let doneCalled = false;
const done = (error, result) => {
Expand Down Expand Up @@ -84,14 +97,10 @@ class PartitionQueue extends EventEmitter {
done(error);
}
} else {
queue.running = false;
if (this.remaining === 0) {
this.emit('done');
if (this.startPromiseResolve) {
this.startPromiseResolve();
this.startPromiseResolve = null;
}
if (queue.running && this.remaining === 0) {
this.done();
}
queue.running = false;
}
}
}
Expand Down
23 changes: 23 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,12 @@ describe('PartitionQueue', () => {
const q = new PartitionQueue({ autostart: true, timeout: 50 });
const key = 'some key';
const job = () => {};
let timeoutCalled = false;
q.on('timeout', () => {
timeoutCalled = true;
});
q.on('error', () => {
assert.equal(timeoutCalled, true);
done();
});
q.push(key, job);
Expand All @@ -221,6 +226,9 @@ describe('PartitionQueue', () => {
q.on('timeout', () => {
done();
});
q.on('error', () => {
// to prevent an uncaught error
});
q.push(key, job);
});

Expand Down Expand Up @@ -392,4 +400,19 @@ describe('PartitionQueue', () => {

q.push(key, job);
});

it('Empty queue with concurrency > 1 does not emit done more than once', (done) => {
const q = new PartitionQueue({ concurrency: 5 });
let doneCalls = 0;
q.on('done', () => {
doneCalls += 1;
setImmediate(() => {
assert.equal(doneCalls, 1);
done();
});
});
setImmediate(() => {
q.start();
});
});
});

0 comments on commit aa70616

Please sign in to comment.