From a391f83633f9772562a42fdf96bc56f1db9fe70b Mon Sep 17 00:00:00 2001 From: hobbyquaker Date: Sat, 2 Feb 2019 13:39:55 +0100 Subject: [PATCH] setValueQueued --- nodes/ccu-connection.js | 97 ++++++++++++++++++++++++++++++----------- 1 file changed, 72 insertions(+), 25 deletions(-) diff --git a/nodes/ccu-connection.js b/nodes/ccu-connection.js index bb0468b..16a986d 100644 --- a/nodes/ccu-connection.js +++ b/nodes/ccu-connection.js @@ -311,6 +311,7 @@ module.exports = function (RED) { this.setValueThrottle = 500; this.setValueTimers = {}; this.setValueCache = {}; + this.setValueQueue = []; this.lastEvent = {}; this.rxCounters = {}; @@ -1983,46 +1984,92 @@ module.exports = function (RED) { }); } - setValue(iface, address, datapoint, value, burst) { - const id = `${iface}.${address}.${datapoint}`; - value = this.paramCast(iface, address, 'VALUES', datapoint, value); - const params = [address, datapoint, value]; - if (iface === 'BidCos-RF' && burst) { - params.push(burst); + setValueQueued(iface, address, datapoint, value, burst) { + return new Promise((resolve, reject) => { + this.setValueQueue = this.setValueQueue.filter(el => { + return el.iface !== iface || el.address !== address || el.datapoint !== datapoint; + }); + this.setValueQueue.push({iface, address, datapoint, value, burst, resolve, reject}); + this.setValueShiftQueue(); + }); + } + + setValueShiftQueue() { + if (this.setValuePending || this.setValueQueue.length === 0) { + return; } + this.setValuePending = true; + const {iface, address, datapoint, value, burst, resolve, reject} = this.setValueQueue.shift(); + let timeout; + + this.setValuePendingTimeout = setTimeout(() => { + timeout = true; + reject(new Error('setValueQueued timeout')); + this.setValuePending = false; + this.setValueShiftQueue(); + }, 7500); + + this.setValue(iface, address, datapoint, value, burst) + .then(() => { + if (!timeout) { + resolve(); + } + }) + .catch(error => { + if (!timeout) { + reject(error); + } + }) + .finally(() => { + clearTimeout(this.setValuePendingTimeout); + if (!timeout) { + this.setValuePending = false; + setTimeout(() => { + this.setValueShiftQueue(); + }, 750); + } + }); + } - if (this.setValueTimers[id]) { - // Return new Promise(resolve => { - this.setValueCache[id] = params; - this.logger.debug('deferred', id); - // Resolve(); - // }); - } else { - if (iface === 'BidCos-RF') { - this.setValueTimers[id] = setTimeout(() => { - delete this.setValueTimers[id]; - this.setValueDeferred(id); - }, this.setValueThrottle); + setValue(iface, address, datapoint, value, burst) { + return new Promise((resolve, reject) => { + const id = `${iface}.${address}.${datapoint}`; + value = this.paramCast(iface, address, 'VALUES', datapoint, value); + const params = [address, datapoint, value]; + if (iface === 'BidCos-RF' && burst) { + params.push(burst); } - return new Promise((resolve, reject) => { + if (this.setValueTimers[id]) { + if (this.setValueCache[id] && typeof this.setValueCache[id].reject === 'function') { + this.setValueCache[id].reject(new Error('overwritten')); + } + this.setValueCache[id] = {params, resolve, reject}; + this.logger.debug('deferred', id); + } else { + if (iface !== 'BidCos-Wired') { + this.setValueTimers[id] = setTimeout(() => { + delete this.setValueTimers[id]; + this.setValueDeferred(id); + }, this.setValueThrottle); + } this.methodCall(iface, 'setValue', params).then(resolve).catch(err => { this.logger.error('rpc >', iface, 'setValue', JSON.stringify(params), '<', err); reject(err); }); - }); - } - return new Promise(resolve => resolve()); + } + }); } setValueDeferred(id) { if (this.setValueCache[id]) { - this.logger.debug('setValueDeferred', id, this.setValueCache[id]); + this.logger.debug('setValueDeferred', id, this.setValueCache[id].params); const [iface] = id.split('.'); - const params = this.setValueCache[id]; + const {params, resolve, reject} = this.setValueCache[id]; delete this.setValueCache[id]; - return this.methodCall(iface, 'setValue', params).catch(err => { + return this.methodCall(iface, 'setValue', params).then(resolve).catch(err => { this.logger.error('rpc >', iface, 'setValue', JSON.stringify(params), '<', err); + reject(err); }); } }