Skip to content

Commit

Permalink
Fix handling of too many pending messages in SORT node (#1514)
Browse files Browse the repository at this point in the history
* initial support of SORT node

minor fix of sort node

fixed error message of sort node

fixed error handling of SORT node

add test case for SORT node

make limit of messages count computed once in SORT node

* update type in message & info description

* fix handling of pending messages in SORT node
  • Loading branch information
HiroyasuNishiyama authored and dceejay committed Dec 6, 2017
1 parent b98d121 commit d7c8adf
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 6 deletions.
3 changes: 2 additions & 1 deletion nodes/core/locales/en-US/messages.json
Expand Up @@ -857,6 +857,7 @@
"descending" : "descending",
"as-number" : "as number",
"invalid-exp" : "invalid JSONata expression in sort node",
"too-many" : "too many pending messages in sort node"
"too-many" : "too many pending messages in sort node",
"clear" : "clear pending message in sort node"
}
}
41 changes: 36 additions & 5 deletions nodes/core/logic/18-sort.js
Expand Up @@ -57,6 +57,7 @@ module.exports = function(RED) {
var node = this;
var pending = get_context_val(node, 'pending', {})
var pending_count = 0;
var pending_id = 0;
var order = n.order || "ascending";
var as_num = n.as_num || false;
var key_is_payload = (n.keyType === 'payload');
Expand Down Expand Up @@ -134,6 +135,32 @@ module.exports = function(RED) {
return false;
}

function clear_pending() {
for(var key in pending) {
node.log(RED._("sort.clear"), pending[key].msgs[0]);
delete pending[key];
}
pending_count = 0;
}

function remove_oldest_pending() {
var oldest = undefined;
var oldest_key = undefined;
for(var key in pending) {
var item = pending[key];
if((oldest === undefined) ||
(oldest.seq_no > item.seq_no)) {
oldest = item;
oldest_key = key;
}
}
if(oldest !== undefined) {
delete pending[oldest_key];
return oldest.msgs.length;
}
return 0;
}

function process_msg(msg) {
if (!msg.hasOwnProperty("parts")) {
if (sort_payload(msg)) {
Expand All @@ -149,7 +176,8 @@ module.exports = function(RED) {
if (!pending.hasOwnProperty(gid)) {
pending[gid] = {
count: undefined,
msgs: []
msgs: [],
seq_no: pending_id++
};
}
var group = pending[gid];
Expand All @@ -166,15 +194,18 @@ module.exports = function(RED) {
}
var max_msgs = max_kept_msgs_count(node);
if ((max_msgs > 0) && (pending_count > max_msgs)) {
pending = {};
pending_count = 0;
node.error(RED._("sort.too-many"),msg);
pending_count -= remove_oldest_pending();
node.error(RED._("sort.too-many"), msg);
}
}

this.on("input", function(msg) {
process_msg(msg);
});

this.on("close", function() {
clear_pending();
})
}

RED.nodes.registerType("sort", SortNode);
Expand Down
22 changes: 22 additions & 0 deletions test/nodes/core/logic/18-sort_spec.js
Expand Up @@ -207,4 +207,26 @@ describe('SORT node', function() {
});
});

it('should clear pending messages on close', function(done) {
var flow = [{id:"n1", type:"sort", order:"ascending", as_num:false, keyType:"payload", wires:[["n2"]]},
{id:"n2", type:"helper"}];
helper.load(sortNode, flow, function() {
var n1 = helper.getNode("n1");
setTimeout(function() {
var logEvents = helper.log().args.filter(function (evt) {
return evt[0].type == "sort";
});
var evt = logEvents[0][0];
evt.should.have.property('id', "n1");
evt.should.have.property('type', "sort");
evt.should.have.property('msg', "sort.clear");
done();
}, 150);
var msg = { payload: 0,
parts: { id: "X", index: 0, count: 2} };
n1.receive(msg);
n1.close();
});
});

});

0 comments on commit d7c8adf

Please sign in to comment.