Skip to content

Commit

Permalink
Merge bcb2098 into acc656f
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolas-van committed Oct 19, 2021
2 parents acc656f + bcb2098 commit 616eda7
Show file tree
Hide file tree
Showing 14 changed files with 504 additions and 231 deletions.
206 changes: 42 additions & 164 deletions src/Queue.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import assert from 'nanoassert'
import Deferred from './Deferred.mjs'
import asyncWrap from './asyncWrap.mjs'
import CancelledError from './CancelledError.mjs'
import queueMicrotask from './queueMicrotask.mjs'

/**
* A class representing a queue.
Expand Down Expand Up @@ -47,11 +48,9 @@ class Queue {
assert(Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY,
'concurrency must be an integer or positive infinity')
assert(concurrency > 0, 'concurrency must be greater than 0')
if (concurrency !== Number.POSITIVE_INFINITY) {
this._queue = new _InternalQueuePriority(concurrency)
} else {
this._queue = new _InternalInfinityQueue()
}
this._concurrency = concurrency
this._iqueue = []
this._running = 0
}

/**
Expand All @@ -62,7 +61,7 @@ class Queue {
* @returns {number} ignore
*/
get concurrency () {
return this._queue.concurrency
return this._concurrency
}

/**
Expand All @@ -73,7 +72,7 @@ class Queue {
* @returns {number} ignore
*/
get running () {
return this._queue.running
return this._running
}

/**
Expand All @@ -84,7 +83,7 @@ class Queue {
* @returns {number} ignore
*/
get pending () {
return this._queue.pending
return this._iqueue.length - this.running
}

/**
Expand All @@ -99,7 +98,7 @@ class Queue {
* than the promise returned by the call to `fct`.
*/
async exec (fct, priority = 0) {
return this._queue.exec(fct, priority)
return this.execCancellable(fct, priority)[0]
}

/**
Expand All @@ -123,79 +122,17 @@ class Queue {
* `false` in any other case.
*/
execCancellable (fct, priority = 0) {
return this._queue.execCancellable(fct, priority)
}

/**
* Cancels all pending tasks. Their corresponding promises will be rejected with a `CancelledError`. This method will
* not alter tasks that are already running.
*
* @returns {number} The number of pending tasks that were effectively cancelled.
*/
cancelAllPending () {
return this._queue.cancelAllPending()
}
}

export default Queue

/**
* @ignore
*/
class _InternalQueuePriority {
/**
* @ignore
*
* @param {number} concurrency ignore
*/
constructor (concurrency) {
this._concurrency = concurrency
this._iqueue = []
this._running = 0
}

/**
* @ignore
* @returns {number} ignore
*/
get concurrency () {
return this._concurrency
}

/**
* @ignore
* @returns {number} ignore
*/
get running () {
return this._running
}

/**
* @ignore
* @returns {number} ignore
*/
get pending () {
return this._iqueue.length - this.running
}

/**
* @ignore
*
* @param {*} fct ignored
* @param {*} priority ignored
* @returns {*} ignored
*/
async exec (fct, priority) {
return this.execCancellable(fct, priority)[0]
return this._execCancellableInternal(fct, priority)
}

/**
* @ignore
* @param {*} fct ignore
* @param {*} priority ignore
* @param {*} CancelledErrorClass ignore
* @returns {*} ignore
*/
execCancellable (fct, priority) {
_execCancellableInternal (fct, priority, CancelledErrorClass = CancelledError) {
assert(typeof fct === 'function', 'fct must be a function')
assert(typeof priority === 'number', 'priority must be a number')
const deferred = new Deferred()
Expand All @@ -210,23 +147,22 @@ class _InternalQueuePriority {
const task = {
asyncFct: asyncWrap(fct),
deferred,
running: false,
priority
priority,
state: 'pending',
CancelledErrorClass
}
this._iqueue.splice(i, 0, task)
this._checkQueue()
return [deferred.promise, () => {
if (task.running) {
if (task.state !== 'pending' && task.state !== 'scheduled') {
return false
} else {
task.state = 'cancelled'
const filtered = this._iqueue.filter((v) => v !== task)
if (filtered.length < this._iqueue.length) {
this._iqueue = filtered
deferred.reject(new CancelledError())
return true
} else {
return false
}
assert(filtered.length < this._iqueue.length)
this._iqueue = filtered
deferred.reject(new task.CancelledErrorClass())
return true
}
}]
}
Expand All @@ -241,99 +177,41 @@ class _InternalQueuePriority {
if (this.running === this.concurrency) {
return
}
const task = this._iqueue.find((v) => !v.running)
const task = this._iqueue.find((v) => v.state === 'pending')
if (task === undefined) {
return
}
task.running = true
task.state = 'scheduled'
this._running += 1
task.asyncFct().finally(() => {
this._running -= 1
this._iqueue = this._iqueue.filter((v) => v !== task)
this._checkQueue()
}).then(task.deferred.resolve, task.deferred.reject)
queueMicrotask(() => {
if (task.state === 'cancelled') {
return
}
task.state = 'running'
task.asyncFct().finally(() => {
this._running -= 1
this._iqueue = this._iqueue.filter((v) => v !== task)
}).then(task.deferred.resolve, task.deferred.reject).then(() => {
this._checkQueue()
})
})
}
}

/**
* @ignore
* @returns {*} ignore
* Cancels all pending tasks. Their corresponding promises will be rejected with a `CancelledError`. This method will
* not alter tasks that are already running.
*
* @returns {number} The number of pending tasks that were effectively cancelled.
*/
cancelAllPending () {
const toCancel = this._iqueue.filter((task) => !task.running)
this._iqueue = this._iqueue.filter((task) => task.running)
const toCancel = this._iqueue.filter((task) => task.state === 'pending' || task.state === 'scheduled')
this._iqueue = this._iqueue.filter((task) => task.state !== 'pending' && task.state !== 'scheduled')
toCancel.forEach((task) => {
task.deferred.reject(new CancelledError())
task.deferred.reject(new task.CancelledErrorClass())
})
return toCancel.length
}
}

/**
* @ignore
*/
class _InternalInfinityQueue {
/**
* @ignore
*/
constructor () {
this._running = 0
}

/**
* @ignore
* @returns {number} ignore
*/
get concurrency () {
return Number.POSITIVE_INFINITY
}

/**
* @ignore
* @returns {number} ignore
*/
get running () {
return this._running
}

/**
* @ignore
* @returns {number} ignore
*/
get pending () {
return 0
}

/**
* @ignore
*
* @param {Function} fct ignore
* @returns {Promise} ignore
*/
async exec (fct) {
return this.execCancellable(fct)[0]
}

/**
* @ignore
*
* @param {*} fct ignore
* @returns {*} ignore
*/
execCancellable (fct) {
this._running += 1
const asyncFct = asyncWrap(fct)
const p = asyncFct()
return [p.finally(() => {
this._running -= 1
}), () => false]
}

/**
* @ignore
* @returns {*} ignore
*/
cancelAllPending () {
return 0
}
}
export default Queue
Loading

0 comments on commit 616eda7

Please sign in to comment.