diff --git a/bench.js b/bench.js new file mode 100644 index 0000000..e04bbc7 --- /dev/null +++ b/bench.js @@ -0,0 +1,43 @@ +'use strict'; + +const Benchmark = require('benchmark'); +const PQueue = require('./'); + +const suite = new Benchmark.Suite(); + +suite + .add('baseline', deferred => { + const queue = new PQueue(); + for (let i = 0; i < 100; i++) { + queue.add(() => Promise.resolve()); + } + queue.onEmpty().then(() => deferred.resolve()); + }) + .add('operation with random priority', deferred => { + const queue = new PQueue(); + for (let i = 0; i < 100; i++) { + queue.add(() => Promise.resolve(), { + priority: Math.random() * 100 | 0 + }); + } + queue.onEmpty().then(() => deferred.resolve()); + }) + .add('operation with increasing priority', deferred => { + const queue = new PQueue(); + for (let i = 0; i < 100; i++) { + queue.add(() => Promise.resolve(), { + priority: i + }); + } + queue.onEmpty().then(() => deferred.resolve()); + }) + .on('cycle', event => { + console.log(String(event.target)); + }) + .on('complete', function () { + console.log('Fastest is ' + this.filter('fastest').map('name')); + }) +.run({ + defer: true, + async: true +}); diff --git a/index.js b/index.js index ed2f71c..36df598 100644 --- a/index.js +++ b/index.js @@ -1,16 +1,63 @@ 'use strict'; +// Port of lower_bound from http://en.cppreference.com/w/cpp/algorithm/lower_bound +// Used to compute insertion index to keep queue sorted after insertion +function lowerBound(array, value, comp) { + let first = 0; + let count = array.length; + while (count > 0) { + const step = (count / 2) | 0; + let it = first + step; + if (comp(array[it], value) <= 0) { + first = ++it; + count -= step + 1; + } else { + count = step; + } + } + + return first; +} + +class PriorityQueue { + constructor() { + this._queue = []; + } + enqueue(run, opts) { + opts = Object.assign({ + priority: 0 + }, opts); + + const element = {priority: opts.priority, run}; + + if (this.size && this._queue[this.size - 1].priority >= opts.priority) { + this._queue.push(element); + return; + } + + const index = lowerBound(this._queue, element, (a, b) => b.priority - a.priority); + this._queue.splice(index, 0, element); + } + dequeue() { + return this._queue.shift().run; + } + get size() { + return this._queue.length; + } +} + class PQueue { constructor(opts) { opts = Object.assign({ - concurrency: Infinity + concurrency: Infinity, + queueClass: PriorityQueue }, opts); if (opts.concurrency < 1) { throw new TypeError('Expected `concurrency` to be a number from 1 and up'); } - this.queue = []; + this.queue = new opts.queueClass(); // eslint-disable-line new-cap this._pendingCount = 0; this._concurrency = opts.concurrency; this._resolveEmpty = () => {}; @@ -18,13 +65,13 @@ class PQueue { _next() { this._pendingCount--; - if (this.queue.length > 0) { - this.queue.shift()(); + if (this.queue.size > 0) { + this.queue.dequeue()(); } else { this._resolveEmpty(); } } - add(fn) { + add(fn, opts) { return new Promise((resolve, reject) => { const run = () => { this._pendingCount++; @@ -44,7 +91,7 @@ class PQueue { if (this._pendingCount < this._concurrency) { run(); } else { - this.queue.push(run); + this.queue.enqueue(run, opts); } }); } @@ -58,7 +105,7 @@ class PQueue { }); } get size() { - return this.queue.length; + return this.queue.size; } get pending() { return this._pendingCount; diff --git a/package.json b/package.json index 7a8051d..a40f95a 100644 --- a/package.json +++ b/package.json @@ -13,7 +13,8 @@ "node": ">=4" }, "scripts": { - "test": "xo && ava" + "test": "xo && ava", + "bench": "node bench" }, "files": [ "index.js" @@ -40,6 +41,7 @@ ], "devDependencies": { "ava": "*", + "benchmark": "^2.1.2", "delay": "^1.3.1", "in-range": "^1.0.0", "random-int": "^1.0.0", diff --git a/readme.md b/readme.md index 76861bd..a1ff40e 100644 --- a/readme.md +++ b/readme.md @@ -54,11 +54,17 @@ Minimum: `1` Concurrency limit. +##### queueClass + +Type: `Function` + +Class with `enqueue`, `dequeue` method and `size` getter. See [Custom QueueClass](#custom-queueclass) section. + ### queue `PQueue` instance. -#### .add(fn) +#### .add(fn, [options]) Returns the promise returned by calling `fn`. @@ -68,6 +74,17 @@ Type: `Function` Promise-returning/async function. +#### options + +Type: `Object` + +##### priority + +Type: `number`
+Default: `0` + +Priority of operation. Operations with greater priority will be scheduled first. + #### .onEmpty() Returns a promise that settles when the queue becomes empty. @@ -141,6 +158,29 @@ $ node example.js 12. Queue is empty again ``` +## Custom QueueClass + +For implementing more complex scheduling policies, you can provide a QueueClass in the options: + +```js +class QueueClass { + constructor() { + this._queue = []; + } + enqueue(run, options) { + this._queue.push(run); + } + dequeue() { + return this._queue.shift(); + } + get size() { + return this._queue.length; + } +} +``` + +`p-queue` will call corresponding methods to put and get operations from this queue. + ## Related diff --git a/test.js b/test.js index 36ff709..7e2c134 100644 --- a/test.js +++ b/test.js @@ -58,6 +58,17 @@ test('.add() - concurrency: 5', async t => { await Promise.all(input); }); +test('.add() - priority', async t => { + const result = []; + const queue = new PQueue({concurrency: 1}); + queue.add(async () => result.push(0), {priority: 0}); + queue.add(async () => result.push(1), {priority: 1}); + queue.add(async () => result.push(2), {priority: 1}); + queue.add(async () => result.push(3), {priority: 2}); + await queue.onEmpty(); + t.deepEqual(result, [0, 3, 1, 2]); +}); + test('.onEmpty()', async t => { const queue = new PQueue({concurrency: 1});