diff --git a/packages/node_modules/@node-red/nodes/core/sequence/19-batch.js b/packages/node_modules/@node-red/nodes/core/sequence/19-batch.js index 495862af8..77574f222 100644 --- a/packages/node_modules/@node-red/nodes/core/sequence/19-batch.js +++ b/packages/node_modules/@node-red/nodes/core/sequence/19-batch.js @@ -179,6 +179,11 @@ module.exports = function(RED) { } node.pending = []; this.on("input", function(msg) { + if (msg.hasOwnProperty("reset")) { + node.pending = []; + node.pending_count = 0; + return; + } var queue = node.pending; queue.push(msg); node.pending_count++; @@ -204,11 +209,26 @@ module.exports = function(RED) { var interval = Number(n.interval || "0") *1000; var allow_empty_seq = n.allowEmptySequence; node.pending = [] - var timer = setInterval(function() { + function msgHandler() { send_interval(node, allow_empty_seq); node.pending_count = 0; - }, interval); + } + var timer = undefined; + if (interval > 0) { + timer = setInterval(msgHandler, interval); + } this.on("input", function(msg) { + if (msg.hasOwnProperty("reset")) { + if (timer !== undefined) { + clearInterval(timer); + } + node.pending = []; + node.pending_count = 0; + if (interval > 0) { + timer = setInterval(msgHandler, interval); + } + return; + } node.pending.push(msg); node.pending_count++; var max_msgs = max_kept_msgs_count(node); @@ -219,7 +239,9 @@ module.exports = function(RED) { } }); this.on("close", function() { - clearInterval(timer); + if (timer !== undefined) { + clearInterval(timer); + } node.pending = []; node.pending_count = 0; }); @@ -230,6 +252,11 @@ module.exports = function(RED) { }); node.pending = {}; this.on("input", function(msg) { + if (msg.hasOwnProperty("reset")) { + node.pending = {}; + node.pending_count = 0; + return; + } concat_msg(node, msg); }); this.on("close", function() { diff --git a/packages/node_modules/@node-red/nodes/locales/en-US/sequence/19-batch.html b/packages/node_modules/@node-red/nodes/locales/en-US/sequence/19-batch.html index 0564957fc..dbf00d287 100644 --- a/packages/node_modules/@node-red/nodes/locales/en-US/sequence/19-batch.html +++ b/packages/node_modules/@node-red/nodes/locales/en-US/sequence/19-batch.html @@ -39,4 +39,5 @@

Storing messages

This node will buffer messages internally in order to work across sequences. The runtime setting nodeMessageBufferMaxLength can be used to limit how many messages nodes will buffer.

+

If a message is received with the msg.reset property set, the buffered messages are deleted and not sent.

diff --git a/packages/node_modules/@node-red/nodes/locales/ja/sequence/19-batch.html b/packages/node_modules/@node-red/nodes/locales/ja/sequence/19-batch.html index 03f2f3f17..1306bdfa7 100644 --- a/packages/node_modules/@node-red/nodes/locales/ja/sequence/19-batch.html +++ b/packages/node_modules/@node-red/nodes/locales/ja/sequence/19-batch.html @@ -31,4 +31,6 @@

詳細

メッセージの蓄積

このノードの処理ではメッセージ列の処理のためメッセージを内部に蓄積します。settings.jsnodeMessageBufferMaxLengthを指定することで蓄積するメッセージの最大値を制限することができます。

+

メッセージがmsg.resetプロパティを持つ場合、蓄積したメッセージを削除し送信を行いません。

+ diff --git a/test/nodes/core/sequence/19-batch_spec.js b/test/nodes/core/sequence/19-batch_spec.js index 3a40ebfb5..b2e1e6de2 100644 --- a/test/nodes/core/sequence/19-batch_spec.js +++ b/test/nodes/core/sequence/19-batch_spec.js @@ -107,13 +107,18 @@ describe('BATCH node', function() { } } - function delayed_send(receiver, index, count, delay) { + function delayed_send(receiver, index, count, delay, done) { if (index < count) { setTimeout(function() { receiver.receive({payload: index}); - delayed_send(receiver, index+1, count, delay); + delayed_send(receiver, index+1, count, delay, done); }, delay); } + else if(index === count) { + if (done) { + done(); + } + } } function check_interval(flow, results, delay, done) { @@ -198,10 +203,28 @@ describe('BATCH node', function() { }); }); + it('should handle reset', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "count", count: 2, overlap: 0, interval: 0, allowEmptySequence: false, topics: [], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + helper.load(batchNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + var results = [ + [0, 1], + [4, 5] + ]; + check_data(n1, n2, results, done); + n1.receive({payload:0}); + n1.receive({payload:1}); + n1.receive({payload:2}); + n1.receive({payload:3, reset: true}); + n1.receive({payload:4}); + n1.receive({payload:5}); + }); + }); }); describe('mode: interval', function() { - it('should create seq. with interval', function(done) { var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overlap: 0, interval: 1, allowEmptySequence: false, topics: [], wires:[["n2"]]}, {id:"n2", type:"helper"}]; @@ -265,10 +288,29 @@ describe('BATCH node', function() { }); }); + it('should handle reset', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "interval", count: 0, overlap: 0, interval: 1, allowEmptySequence: false, topics: [], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + helper.load(batchNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + var results = [ + [0, 1], + [4, 5] + ]; + check_data(n1, n2, results, done); + delayed_send(n1, 0, 3, 400, function () { + setTimeout(function () { + n1.receive({payload: "3", reset: true}); + delayed_send(n1, 4, 7, 400); + }, 10); + }); + }); + }); + }); describe('mode: concat', function() { - it('should concat two seq. (series)', function(done) { var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overlap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}], wires:[["n2"]]}, {id:"n2", type:"helper"}]; @@ -355,6 +397,58 @@ describe('BATCH node', function() { }); }); + it('should handle reset', function(done) { + var flow = [{id:"n1", type:"batch", name: "BatchNode", mode: "concat", count: 0, overlap: 0, interval: 1, allowEmptySequence: false, topics: [{topic: "TA"}, {topic: "TB"}], wires:[["n2"]]}, + {id:"n2", type:"helper"}]; + try { + helper.load(batchNode, flow, function() { + var n1 = helper.getNode("n1"); + var n2 = helper.getNode("n2"); + var results = [ + [2, 3, 0, 1] + ]; + check_data(n1, n2, results, done); + var inputs0 = [ + ["TB", 0, 0, 2], + ["TA", 1, 0, 2], + ]; + for(var data of inputs0) { + var msg = { + topic: data[0], + payload: data[1], + parts: { + id: data[0], + index: data[2], + count: data[3] + } + }; + n1.receive(msg); + } + n1.receive({payload: undefined, reset: true}); + var inputs1 = [ + ["TB", 0, 0, 2], + ["TB", 1, 1, 2], + ["TA", 2, 0, 2], + ["TA", 3, 1, 2] + ]; + for(var data of inputs1) { + var msg = { + topic: data[0], + payload: data[1], + parts: { + id: data[0], + index: data[2], + count: data[3] + } + }; + n1.receive(msg); + } + }); + } + catch (e) { + done(e); + } + }); }); });