Skip to content

Commit

Permalink
Simple PriorityQueue (#4)
Browse files Browse the repository at this point in the history
Fixes #1
  • Loading branch information
floatdrop authored and sindresorhus committed Nov 17, 2016
1 parent e02801b commit 44a340b
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 9 deletions.
43 changes: 43 additions & 0 deletions bench.js
@@ -0,0 +1,43 @@
'use strict';

const Benchmark = require('benchmark');
const PQueue = require('./');

const suite = new Benchmark.Suite();

suite
.add('baseline', deferred => {
const queue = new PQueue();
for (let i = 0; i < 100; i++) {
queue.add(() => Promise.resolve());
}
queue.onEmpty().then(() => deferred.resolve());
})
.add('operation with random priority', deferred => {
const queue = new PQueue();
for (let i = 0; i < 100; i++) {
queue.add(() => Promise.resolve(), {
priority: Math.random() * 100 | 0
});
}
queue.onEmpty().then(() => deferred.resolve());
})
.add('operation with increasing priority', deferred => {
const queue = new PQueue();
for (let i = 0; i < 100; i++) {
queue.add(() => Promise.resolve(), {
priority: i
});
}
queue.onEmpty().then(() => deferred.resolve());
})
.on('cycle', event => {
console.log(String(event.target));
})
.on('complete', function () {
console.log('Fastest is ' + this.filter('fastest').map('name'));
})
.run({
defer: true,
async: true
});
61 changes: 54 additions & 7 deletions index.js
@@ -1,30 +1,77 @@
'use strict';

// Port of lower_bound from http://en.cppreference.com/w/cpp/algorithm/lower_bound
// Used to compute insertion index to keep queue sorted after insertion
function lowerBound(array, value, comp) {
let first = 0;
let count = array.length;
while (count > 0) {
const step = (count / 2) | 0;
let it = first + step;
if (comp(array[it], value) <= 0) {
first = ++it;
count -= step + 1;
} else {
count = step;
}
}

return first;
}

class PriorityQueue {
constructor() {
this._queue = [];
}
enqueue(run, opts) {
opts = Object.assign({
priority: 0
}, opts);

const element = {priority: opts.priority, run};

if (this.size && this._queue[this.size - 1].priority >= opts.priority) {
this._queue.push(element);
return;
}

const index = lowerBound(this._queue, element, (a, b) => b.priority - a.priority);
this._queue.splice(index, 0, element);
}
dequeue() {
return this._queue.shift().run;
}
get size() {
return this._queue.length;
}
}

class PQueue {
constructor(opts) {
opts = Object.assign({
concurrency: Infinity
concurrency: Infinity,
queueClass: PriorityQueue
}, opts);

if (opts.concurrency < 1) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}

this.queue = [];
this.queue = new opts.queueClass(); // eslint-disable-line new-cap
this._pendingCount = 0;
this._concurrency = opts.concurrency;
this._resolveEmpty = () => {};
}
_next() {
this._pendingCount--;

if (this.queue.length > 0) {
this.queue.shift()();
if (this.queue.size > 0) {
this.queue.dequeue()();
} else {
this._resolveEmpty();
}
}
add(fn) {
add(fn, opts) {
return new Promise((resolve, reject) => {
const run = () => {
this._pendingCount++;
Expand All @@ -44,7 +91,7 @@ class PQueue {
if (this._pendingCount < this._concurrency) {
run();
} else {
this.queue.push(run);
this.queue.enqueue(run, opts);
}
});
}
Expand All @@ -58,7 +105,7 @@ class PQueue {
});
}
get size() {
return this.queue.length;
return this.queue.size;
}
get pending() {
return this._pendingCount;
Expand Down
4 changes: 3 additions & 1 deletion package.json
Expand Up @@ -13,7 +13,8 @@
"node": ">=4"
},
"scripts": {
"test": "xo && ava"
"test": "xo && ava",
"bench": "node bench"
},
"files": [
"index.js"
Expand All @@ -40,6 +41,7 @@
],
"devDependencies": {
"ava": "*",
"benchmark": "^2.1.2",
"delay": "^1.3.1",
"in-range": "^1.0.0",
"random-int": "^1.0.0",
Expand Down
42 changes: 41 additions & 1 deletion readme.md
Expand Up @@ -54,11 +54,17 @@ Minimum: `1`

Concurrency limit.

##### queueClass

Type: `Function`

Class with `enqueue`, `dequeue` method and `size` getter. See [Custom QueueClass](#custom-queueclass) section.

### queue

`PQueue` instance.

#### .add(fn)
#### .add(fn, [options])

Returns the promise returned by calling `fn`.

Expand All @@ -68,6 +74,17 @@ Type: `Function`

Promise-returning/async function.

#### options

Type: `Object`

##### priority

Type: `number`<br>
Default: `0`

Priority of operation. Operations with greater priority will be scheduled first.

#### .onEmpty()

Returns a promise that settles when the queue becomes empty.
Expand Down Expand Up @@ -141,6 +158,29 @@ $ node example.js
12. Queue is empty again
```

## Custom QueueClass

For implementing more complex scheduling policies, you can provide a QueueClass in the options:

```js
class QueueClass {
constructor() {
this._queue = [];
}
enqueue(run, options) {
this._queue.push(run);
}
dequeue() {
return this._queue.shift();
}
get size() {
return this._queue.length;
}
}
```

`p-queue` will call corresponding methods to put and get operations from this queue.


## Related

Expand Down
11 changes: 11 additions & 0 deletions test.js
Expand Up @@ -58,6 +58,17 @@ test('.add() - concurrency: 5', async t => {
await Promise.all(input);
});

test('.add() - priority', async t => {
const result = [];
const queue = new PQueue({concurrency: 1});
queue.add(async () => result.push(0), {priority: 0});
queue.add(async () => result.push(1), {priority: 1});
queue.add(async () => result.push(2), {priority: 1});
queue.add(async () => result.push(3), {priority: 2});
await queue.onEmpty();
t.deepEqual(result, [0, 3, 1, 2]);
});

test('.onEmpty()', async t => {
const queue = new PQueue({concurrency: 1});

Expand Down

0 comments on commit 44a340b

Please sign in to comment.