/* Copyright (c) 2014-2016 Richard Rodger, MIT License */
+ /* Copyright (c) 2014-2019 Richard Rodger, MIT License */
'use strict'
@@ -70,7 +70,7 @@ gate-executor.js
function make_GateExecutor (options) {
+ function make_GateExecutor(options) {
options = options || {}
options.interval = null == options.interval ? 111 : options.interval
options.timeout = null == options.timeout ? 2222 : options.timeout
@@ -102,7 +102,7 @@ gate-executor.js
- function GateExecutor (options, instance_counter) {
+ function GateExecutor(options, instance_counter) {
var self = this
Assert('object' === typeof options)
@@ -314,7 +314,7 @@ gate-executor.js
- function process () {
+ function processor() {
@@ -342,13 +342,13 @@ gate-executor.js
- Process the next work item, returning true
if there was one.
+ The timeout interval check is stopped and started only as needed.
- function next () {
- var res = false
- var work = null
+ if (!self.isclear() && !s.tm_in) {
+ s.tm_in = setInterval(timeout_check, options.interval)
+ }
@@ -359,6 +359,23 @@ gate-executor.js
+ Process the next work item, returning true
if there was one.
+
+
+
+ do {
+ var next = false
+ var work = null
+
+
+
+
+
+
+
+
+ ¶
+
Remove next work item from the front of the work queue.
@@ -375,11 +392,11 @@ gate-executor.js
-
+
Add work item to the work-in-progress set.
@@ -391,11 +408,11 @@ gate-executor.js
-
+
If work item is a gate, set the state of the instance as
gated. This work item will need to complete before later
@@ -408,11 +425,11 @@
gate-executor.js
-
+
Call the work item function (which does the real work),
passing a callback. This callback has no arguments
@@ -422,30 +439,15 @@
gate-executor.js
- work.fn(function () {
-
-
-
-
-
-
-
-
- ¶
-
- Remove the work item from the work-in-progress set. As
-work items may complete out of order, prune the history
-from the front until the first incomplete work
-item. Later complete work items will eventually be
-reached on another processing round.
+ work.start = Date.now()
+ work.callback = make_work_fn_callback(work)
-
-
- progress.lookup[work.id].done = true
- delete progress.lookup[work.id]
- while (progress.history[0] && progress.history[0].done) {
- progress.history.shift()
- }
+ timeout_checklist.push(work)
+ work.fn(work.callback)
+
+ next = true
+ }
+ } while (next)
@@ -456,15 +458,11 @@ gate-executor.js
- If the work item was a gate, it is now complete, and the
-instance can be ungated, allowing later work items in the
-queue to be processed.
+ Keep processing work items until none are left or a gate is reached.
- if (work.gate) {
- s.gate = false
- }
+ }
@@ -475,25 +473,15 @@ gate-executor.js
- If work queue and work-in-progress set are empty, then
-call the registered clear functions.
+ Create the callback for the work function
- if (0 === q.length && 0 === progress.history.length) {
- clearInterval(s.tm_in)
- s.tm_in = null
-
- if (s.firstclear) {
- var fc = s.firstclear
- s.firstclear = null
- fc()
- }
-
- if (s.clear) {
- s.clear()
- }
- }
+ function make_work_fn_callback(work) {
+ return function work_fn_callback() {
+ if (work.done) {
+ return
+ }
@@ -504,19 +492,24 @@ gate-executor.js
- Process each work item on next tick to avoid lockups.
+ Remove the work item from the work-in-progress set. As
+work items may complete out of order, prune the history
+from the front until the first incomplete work
+item. Later complete work items will eventually be
+reached on another processing round.
- setImmediate(function () {
- process()
- })
- })
+ work.done = true
+ delete progress.lookup[work.id]
- res = true
+ while (progress.history[0] && progress.history[0].done) {
+ progress.history.shift()
}
- return res
- }
+
+ while (timeout_checklist[0] && timeout_checklist[0].done) {
+ timeout_checklist.shift()
+ }
@@ -527,12 +520,15 @@ gate-executor.js
- Keep processing work items until none are left or a gate is reached.
+ If the work item was a gate, it is now complete, and the
+instance can be ungated, allowing later work items in the
+queue to be processed.
- while (next()) {}
- }
+ if (work.gate) {
+ s.gate = false
+ }
@@ -543,25 +539,25 @@ gate-executor.js
- Wrapper function to construct the timeout check.
+ If work queue and work-in-progress set are empty, then
+call the registered clear functions.
function timeout (work) {
- Assert('object' === typeof work)
- Assert('function' === typeof work.fn)
-
- work.finished = false
- work.orig_fn = work.fn
+ if (0 === q.length && 0 === progress.history.length) {
+ clearInterval(s.tm_in)
+ s.tm_in = null
- var timeout_fn = function (callback) {
- Assert('function' === typeof callback)
-
- work.callback = callback
- work.start = Date.now()
- timeout_checklist.push(work)
+ if (s.firstclear) {
+ var fc = s.firstclear
+ s.firstclear = null
+ fc()
+ }
- work.orig_fn(function () {
+ if (s.clear) {
+ s.clear()
+ }
+ }
gate-executor.js
-Absorb multiple callbacks.
+Process each work item on next tick to avoid lockups.
if (work.finished) {
- return
- }
- work.finished = true
- work.callback()
- })
+ setImmediate(processor)
}
-
- return timeout_fn
}
@@ -597,34 +586,29 @@ gate-executor.js
¶
To be run periodically via setInterval. For timed out work items,
-calls the done callback to allow work queue to proceed, and makes
+calls the done callback to allow work queue to proceed, and marks
the work item as finished. Work items can receive notification of
-timeouts by providing an on_timeout
callback property in the
+timeouts by providing an ontm
callback property in the
work definition object. Work items must handle timeout errors
themselves, gate-executor cares only for the fact that a timeout
happened, so it can continue processing.
function timeout_check () {
+ function timeout_check() {
var now = Date.now()
var work = null
for (var i = 0; i < timeout_checklist.length; ++i) {
work = timeout_checklist[i]
- if (!work.gate && !work.finished && work.tm < now - work.start) {
- work.finished = true
- work.callback()
-
+ if (!work.gate && !work.done && work.tm < now - work.start) {
if (work.ontm) {
- work.ontm()
+ work.ontm(work.tm,work.start,now)
}
- }
- }
- while (timeout_checklist[0] && timeout_checklist[0].finished) {
- work = timeout_checklist.shift()
+ work.callback()
+ }
}
}
@@ -644,7 +628,7 @@ gate-executor.js
self.start = function (firstclear) {
+ self.start = function(firstclear) {
Assert(null == firstclear || 'function' === typeof firstclear)
@@ -660,31 +644,14 @@ gate-executor.js
setImmediate(function () {
+ setImmediate(function() {
s.running = true
if (firstclear) {
s.firstclear = firstclear
- }
-
-
-
-
-
The timeout interval check is stopped and started only as needed.
- - if (!s.tm_in) {
- s.tm_in = setInterval(timeout_check, options.interval)
}
- process()
+ processor()
})
return self
@@ -693,11 +660,11 @@ gate-executor.js
-
Pause the processing of work items. Newly added items, and items not yet started, will not proceed, but items already in progress @@ -706,18 +673,18 @@
gate-executor.js
self.pause = function () {
+ self.pause = function() {
s.running = false
}
-
Submit a function that will be called each time there are no more work items to process. Multiple calls to this method will replace @@ -725,7 +692,7 @@
gate-executor.js
self.clear = function (done) {
+ self.clear = function(done) {
Assert('function' === typeof done)
s.clear = done
return self
@@ -734,28 +701,28 @@ gate-executor.js
-
+
- self.isclear = function () {
- return (0 === q.length && 0 === progress.history.length)
+ self.isclear = function() {
+ return 0 === q.length && 0 === progress.history.length
}
-
+
Add a work item. This is an object with fields:
@@ -768,13 +735,14 @@ gate-executor.js
id
(string): identifier for the work item. Optional.
tm
(integer): millisecond timeout specific to this work item,
overrides general timeout. Optional.
+ontm
(function): callback to indicate work item timeout. Optional.
dn
(string): description of the work item, used in the
state description. Optional.
- self.add = function (work) {
+ self.add = function(work) {
Assert('object' === typeof work)
Assert('function' === typeof work.fn)
Assert(null == work.id || 'string' === typeof work.id)
@@ -785,10 +753,24 @@ gate-executor.js
work.id = work.id || '' + s.work_counter
work.ge = self.id
work.tm = null == work.tm ? options.timeout : work.tm
- work.dn = work.dn || work.fn.name || '' + Date.now()
- work.fn = timeout(work)
+ work.dn = work.dn || work.fn.name || '' + Date.now()
+
+
+
+
+
+
+
+
+ ¶
+
+ Used by calling code to store additional context.
+
+
+ work.ctxt = {}
+
q.push(work)
if (s.running) {
@@ -808,9 +790,7 @@ gate-executor.js
- setImmediate(function () {
- process()
- })
+ setImmediate(processor)
}
return self
@@ -837,10 +817,10 @@ gate-executor.js
- self.gate = function () {
+ self.gate = function() {
var ge = new GateExecutor(options, instance_counter)
- var fn = function gate (done) {
+ var fn = function gate(done) {
@@ -860,7 +840,7 @@ gate-executor.js
ge.start(done)
}
- self.add({gate: ge, fn: fn})
+ self.add({ gate: ge, fn: fn })
return ge
}
@@ -880,8 +860,7 @@ gate-executor.js
-
- self.state = function () {
+ self.state = function() {
var out = []
@@ -900,7 +879,7 @@ gate-executor.js
for (var hI = 0; hI < progress.history.length; ++hI) {
var pe = progress.history[hI]
if (!pe.done) {
- out.push({s: 'a', ge: pe.ge, d: pe.dn, id: pe.id})
+ out.push({ s: 'a', ge: pe.ge, dn: pe.dn, id: pe.id })
}
}
@@ -935,10 +914,16 @@ gate-executor.js
out.push(qe.gate.state())
+ } else {
+ out.push({ s: 'w', ge: qe.ge, dn: qe.dn, id: qe.id })
}
- else {
- out.push({s: 'w', ge: qe.ge, d: qe.dn, id: qe.id})
- }
+ }
+
+ out.internal = {
+ qlen: q.length,
+ hlen: progress.history.length,
+ klen: Object.keys(progress.lookup).length,
+ tlen: timeout_check.length
}
return out