New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Messaging API support of core nodes #2402
Conversation
This is going to take quite some time to review given it changes so many core nodes. In future, please consider breaking up items like this into multiple PRs that are easier to review in small steps. |
My basic comment is that they all seem to just replace |
I agree. I'm sorry for causing you trouble.
From my understanding, we can safely assume that "input" callback is called with the correct signature, if we use a recent (1.0.0-beta3 or newer) runtime. |
To help me track progress through reviewing this PR, here's a checklist of all nodes it touches. I'll tick them off as I complete the review for each one.
|
}, delayvar); | ||
send(msg); | ||
done(); | ||
}, delayvar, () => done("queue cleared")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think clearing the delay queue should be considered an error - which calling done(...)
with an argument suggests. The queue is being cleared because the user has asked for that to happen.
(This applies to all the places you call done(...)
with a string argument).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok. I'll remove an argument on done(...) when clearing the queue:
}, node.timeout, () => done("queue cleared")); }, delayvar, () => done("queue cleared")); msgInfo.done("queue cleared"); msgInfo.done("queue cleared"); msgInfo.done("queue cleared");
@@ -354,9 +360,12 @@ module.exports = function(RED) { | |||
node.pending_count = pending_count; | |||
var max_msgs = maxKeptMsgsCount(node); | |||
if ((max_msgs > 0) && (pending_count > max_msgs)) { | |||
Object.values(node.pending).forEach(group => { | |||
group.msgs.forEach(mInfo => mInfo.done(RED._("join.too-many"))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you describe the logic here?
It looks like you now report the too-many
error against every single message, rather than just the most recent one. Not sure if that's the right behaviour - the error log will be flooded. But I think we need to discuss it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My intention is to notify which message is dropped due to buffer overflow.
But, yes, it will cause a flood of error messages.
Possible alternative is:
- call msgInfo.done()
for queued messages,
- call done(RED._("join.too-many"))
for emit an error message, and
- call node.trace("overflowed message ["+msgInfo.msg.id+"]")
to log the flashed messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same reply for the Batch node - I agree with point 1 and 2 - but not 3 (node.trace
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll also update for this as same as Batch node.
Object.values(node.pending).forEach(p_topic => { | ||
p_topic.groups.forEach(group => { | ||
group.msgs.forEach(msg => { | ||
msg.done("premature close"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this really an error? In other nodes the close function calls done()
with no error even though the message will be discarded.
We really need to be consistent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I forgot to comment to this. I removed all occurrences of "premature close"
argument in batch node on commit c23e71b.
Object.values(node.pending).forEach(p_topic => { | ||
Object.values(p_topic.groups).forEach(group => { | ||
group.msgs.forEach(msg => { | ||
msg.done(RED._("batch.too-many")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As with other comments - is it really the right thing to throw an error against every message in the group? That is going to cause a lot of noise in the log.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as commented in split.js
(#2402 (comment)), I propose following behaviour:
- call
msgInfo.done()
for queued messages, - call
done(RED._("batch.too-many"))
for emit an error message, and - call
node.trace("overflowed message ["+msgInfo.msg.id+"]")
to log the flashed messages
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with calling done()
on the queued messages and done(RED._("batch.too-many"))
on the message that caused the overflow.
I do not think we should node.trace
all of the flushed messages as again, that could be a huge list and there's little the user would be able to do with that information from the trace log.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with this. I'll replace done(RED._("batch.too-many"))
with done()
, and emit a single error on the message that caused the overflow.
We are almost there with this PR! I think there is just the issue with the Batch/Join nodes on what they do when the max message count is reached. If we can get that sorted, then we can get this merged. I do need to point out that the unit tests have not been updated for any of these changes. That is not ideal - especially as these are changes to such a core piece of functionality. I should have flagged that up when this PR was first raised. Can you confirm you have manually tested all of the nodes covered by this PR for all the different paths that have been changed? |
I'll make unit tests to check the logic. Could you give me some more time? |
At now, I'm looking delay nodes logic, and I found several oversights on calling Would you mind closing this PR? |
This is your PR - if you think closing it and opening smaller individual PRs is a better choice, then that's absolutely fine by me 👍 |
Proposed changes
Update core nodes to support new Node Messaging API.
Invocation of done() is deferred until the message has been removed from node's own message buffer/queue, or promise for the message processing has been fulfilled. For example, in delay node, done() is not called when a message has pushed to its queue. It is called after the message has been removed from the queue and sent to next nodes.
This PR is a preparation for Graceful Shutdown function, which needs to detect a completion of message processing.
Checklist
grunt
to verify the unit tests pass