Add message queue to transaction ingest engine #2035
Conversation
ingest engine:
* implement Component
* use LifecycleManager
Codecov Report
```
@@            Coverage Diff             @@
##           master    #2035      +/-   ##
==========================================
- Coverage   56.99%   56.96%    -0.03%
==========================================
  Files         635      635
  Lines       36978    37038       +60
==========================================
+ Hits        21076    21100       +24
- Misses      13250    13278       +28
- Partials     2652     2660        +8
```
Looks good
```go
queue, err := fifoqueue.NewFifoQueue(
	fifoqueue.WithCapacity(int(config.MaxMessageQueueSize)),
	fifoqueue.WithLengthObserver(func(len int) {
		mempoolMetrics.MempoolEntries(metrics.ResourceTransactionIngestQueue, uint(len))
	}),
)
```
General question: is there any reason we seem to prefer `fifoqueue` in the codebase for this sort of thing, aside from maybe the fact that it allows you to register a length observer?

I think in most cases buffered channels serve this purpose just as well, if not better (they allow multiple consumers to consume from them at once), and they are simpler because you don't have to manually trigger the `Notifier`.
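For comparison, here is a sketch of the buffered-channel approach; all names (`message`, `maxQueueSize`, `workerCount`) are illustrative assumptions, not existing flow-go code:

```go
package main

import (
	"fmt"
	"sync"
)

// message is a stand-in for whatever the engine would queue.
type message struct {
	payload string
}

func main() {
	const maxQueueSize = 100
	const workerCount = 3

	// the queue is just a buffered channel
	queue := make(chan message, maxQueueSize)

	// consumers: every worker reads from the same channel; no Notifier is
	// needed, because the channel receive itself blocks until work arrives
	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for msg := range queue {
				fmt.Printf("worker %d processing %s\n", id, msg.payload)
			}
		}(i)
	}

	// producer: a non-blocking send that drops the message when the buffer
	// is full, mirroring a bounded FIFO queue's behaviour at capacity
	for i := 0; i < 5; i++ {
		select {
		case queue <- message{payload: fmt.Sprintf("tx-%d", i)}:
		default:
			// queue full, drop the message
		}
	}

	close(queue) // let the workers drain the queue and exit
	wg.Wait()
}
```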
The main benefit I see of using `fifoqueue` is that it could allow a potentially unbounded queue, meaning we wouldn't need to drop requests upon reaching capacity. However, we eliminate this benefit if we restrict the capacity using `WithCapacity`.
The length observer is one reason, and it is also more flexible in terms of using different message stores (e.g. a priority queue). We initially used buffered channels when first adding a per-engine message queue, and ultimately decided against them: that implementation duplicated a lot of channel-management boilerplate in each engine, which was difficult to reason about if you're not already quite familiar with channels.

Another alternative we talked about (maybe worth revisiting) is to have the `MessageStore` expose a channel for getting the next message to process, rather than using the notifier. The `MessageStore` then needs to maintain a goroutine to shovel messages into that channel, and to be startable/stoppable.
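A rough sketch of that alternative, using assumed names (`channelMessageStore`, `Put`, `Next`) rather than the real flow-go types:

```go
package msgstore

import "sync"

// channelMessageStore is a hypothetical message store that exposes a
// channel of messages instead of a Notifier.
type channelMessageStore struct {
	mu    sync.Mutex
	queue []interface{}    // backing store; could be a priority queue instead
	wake  chan struct{}    // internal signal that the store is non-empty
	out   chan interface{} // consumers read the next message from here
	done  chan struct{}    // makes the store stoppable
}

func newChannelMessageStore() *channelMessageStore {
	return &channelMessageStore{
		wake: make(chan struct{}, 1), // one pending wake signal is enough
		out:  make(chan interface{}),
		done: make(chan struct{}),
	}
}

// Put stores a message and wakes the shovelling goroutine if it is idle.
func (s *channelMessageStore) Put(msg interface{}) {
	s.mu.Lock()
	s.queue = append(s.queue, msg)
	s.mu.Unlock()
	select {
	case s.wake <- struct{}{}:
	default: // a wake signal is already pending
	}
}

// Next returns the channel consumers block on, replacing the Notifier.
func (s *channelMessageStore) Next() <-chan interface{} { return s.out }

// run shovels stored messages into the out channel; it must be started
// and stopped with the component's lifecycle.
func (s *channelMessageStore) run() {
	for {
		s.mu.Lock()
		var msg interface{}
		if len(s.queue) > 0 {
			msg, s.queue = s.queue[0], s.queue[1:]
		}
		s.mu.Unlock()

		if msg == nil {
			select { // queue empty: wait for new messages or shutdown
			case <-s.wake:
				continue
			case <-s.done:
				return
			}
		}

		select { // deliver the message, or abandon it on shutdown
		case s.out <- msg:
		case <-s.done:
			return
		}
	}
}
```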
I remember a while back we discovered that the `Notifier` had a subtle bug. I don't remember the exact details, but basically if multiple messages are put into the queue concurrently, there's a race condition that could cause only a single notification to be emitted from the notifier. @AlexHentschel @durkmurder

So if we want to keep using this, we should either fix the `Notifier` or expose the channel as you suggested.
> if multiple messages are put into the queue concurrently, there's a race condition that could cause only a single notification to be emitted from the notifier

This sounds like expected behaviour of the `Notifier` to me. If routines A and B attempt to `Notify` before routine C wakes to read the notification, one of the `Notify` calls from A or B will be a no-op, and C will only observe one notification.
That is not the problem. The problem is when there are multiple consumers, e.g. C1, C2, and C3. They could all be idle and waiting for a notification, but with the existing `Notifier` implementation it is possible that a producer inserts 20 items into the queue and calls `Notify` 20 times, yet only one of the consumers is woken up. This is undesirable: if there are 3 workers available, we obviously want to wake all of them.
Basically, the root of the issue with the existing `Notifier` implementation is that in Go, a send on a buffered channel completes asynchronously: it returns as soon as the value lands in the buffer, even when a consumer is already blocked on a read at the point the send is initiated.

Therefore, in the scenario above, it's possible that when the producer sends notifications 2 through 20, notification 1 has still not been received by any receiver, even though all 3 receivers were already blocked on a channel read before the first notification was sent. This results in notifications 2 through 20 being dropped on the floor.
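For reference, this coalescing falls directly out of the usual notifier pattern: a buffered channel of capacity one plus a non-blocking send. A sketch of the pattern (not necessarily the exact flow-go implementation):

```go
package notifier

// Notifier sketch: at most one notification can ever be pending.
type Notifier struct {
	ch chan struct{}
}

func NewNotifier() Notifier {
	return Notifier{ch: make(chan struct{}, 1)}
}

// Notify performs a non-blocking send. If a notification is already
// buffered and no consumer has been scheduled to receive it yet, the
// new notification is silently dropped, no matter how many consumers
// are blocked on Channel(). Calling Notify 20 times in a tight loop
// can therefore wake only a single worker.
func (n Notifier) Notify() {
	select {
	case n.ch <- struct{}{}:
	default:
	}
}

// Channel returns the channel consumers block on.
func (n Notifier) Channel() <-chan struct{} {
	return n.ch
}
```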
You're right that one `Notifier` does not work for notifying multiple consumers. It is intended to send a "wake" signal to a worker when some work is ready, not to send one signal per unit of work. That is why the consumer, when awoken, processes all the work available.

I think exposing a channel as the mechanism for reading from a message queue is a better approach than what we have here.
Also, just tracking #2046 (comment) here, since it also needs to be updated in this PR.
Replace ProcessLocal with ProcessTransaction, check for shutdown signal in Process.
(has the same effect as checking context cancellation, which we already do)
this makes it consistent with the Access RPC engine, and more obvious that this component belongs to the access node
previously the logger was never initialized
we have determined that the network thread blocking issue is not caused by this engine; therefore, removing the additional debug logging
engine/collection/ingest/engine.go
Outdated
```go
// onTransaction handles receipt of a new transaction. This can be submitted
// from outside the system or routed from another collection node.
func (e *Engine) onTransaction(originID flow.Identifier, tx *flow.TransactionBody) error {

	txID := tx.ID()
	defer e.engMetrics.MessageHandled(metrics.EngineCollectionIngest, metrics.MessageTransaction)
```
This function is enormous; would it be possible to split it up a bit?
Split up in b2d3de8
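One possible shape of such a split, with hypothetical helper names (not necessarily what b2d3de8 actually does):

```go
// onTransaction handles receipt of a new transaction by delegating to
// smaller, focused helpers.
func (e *Engine) onTransaction(originID flow.Identifier, tx *flow.TransactionBody) error {
	txID := tx.ID()
	defer e.engMetrics.MessageHandled(metrics.EngineCollectionIngest, metrics.MessageTransaction)

	// validateTransaction (hypothetical) checks the transaction against
	// the reference block and the node's validation config
	if err := e.validateTransaction(tx); err != nil {
		return fmt.Errorf("invalid transaction (%x): %w", txID, err)
	}

	// routeTransaction (hypothetical) stores the transaction in our own
	// cluster's mempool or forwards it to the responsible cluster
	return e.routeTransaction(originID, tx)
}
```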
engine/collection/ingest/engine.go
Outdated
```go
func (e *Engine) Process(channel network.Channel, originID flow.Identifier, event interface{}) error {
	select {
	case <-e.ComponentManager.ShutdownSignal():
		return component.ErrComponentShutdown
```
Do we actually handle this sentinel anywhere? How about we just return `nil` in that case?
I would rather return an error than `nil`, since the message is not being processed. We shouldn't assume the caller knows the component is shut down. Even if the error isn't handled, it at least gives visibility that the component is not in a healthy state (e.g. in the network layer logs, which is where this would currently surface).

If we return `nil`, and for some reason the node is not imminently shutting down, it's more difficult to see that there is a problem with this component.
I can't see where it would be useful. Based on the FLIP, in future iterations we won't return any error to the network layer, since it doesn't know what to do with one.
What about `ProcessTransaction`, as another example? If another component calls `ProcessTransaction` and the component has been shut down, should we return an error or `nil`? I think we should return an error, for the same reasons mentioned in the last comment.

(Currently we don't handle this at all and just process the transaction normally, which I'll fix.)
I would say that in the case of `ProcessTransaction` it makes sense to return an error, since it's called from RPC and the error can be reported to the user. Based on the FLIP, I am mainly concerned about `Process`, which is called directly from the network layer; as I said, the network layer has no clue what to do with an error, so it's better to omit it.
Updated in 2b6cd4f
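Per the commit summary further down (log a warning and return `nil` in `Process`, return the sentinel in `ProcessTransaction`), the resolution might look roughly like this sketch; the internal handler names are assumptions:

```go
// Process receives events from the network layer, which cannot act on an
// error, so after shutdown we log a warning and drop the message.
func (e *Engine) Process(channel network.Channel, originID flow.Identifier, event interface{}) error {
	select {
	case <-e.ComponentManager.ShutdownSignal():
		e.log.Warn().Msg("received message after shutdown")
		return nil
	default:
	}
	return e.process(originID, event) // hypothetical internal handler
}

// ProcessTransaction is called from the RPC layer, which can report the
// sentinel error back to the user.
func (e *Engine) ProcessTransaction(tx *flow.TransactionBody) error {
	select {
	case <-e.ComponentManager.ShutdownSignal():
		return component.ErrComponentShutdown
	default:
	}
	return e.enqueue(tx) // hypothetical internal handler
}
```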
```go
err := e.processAvailableMessages(ctx)
if err != nil {
	// if an error reaches this point, it is unexpected
	ctx.Throw(err)
```
Generally speaking, `Throw` should terminate the current goroutine, but I would still add a `return` after it to be safe.
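That is, something like this:

```go
err := e.processAvailableMessages(ctx)
if err != nil {
	// if an error reaches this point, it is unexpected
	ctx.Throw(err)
	// Throw should never return, but return defensively in case a
	// non-crashing (e.g. mocked) context implementation is in use
	return
}
```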
```diff
@@ -12,6 +12,9 @@ import (
 	"github.com/onflow/flow-go/module/util"
 )
 
+// ErrComponentShutdown is returned by a component which has already been shut down.
+var ErrComponentShutdown = fmt.Errorf("component has already shut down")
```
Again, I don't believe we handle it anywhere, and I don't see a practical use for it. Even if we did handle it, we would most likely just ignore it anyway?
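For illustration, handling the sentinel at an RPC boundary could look like the following, where `ingestEngine` is hypothetical and `status`/`codes` are the standard gRPC packages; nothing in this PR currently does this:

```go
// hypothetical gRPC handler consuming the sentinel
err := ingestEngine.ProcessTransaction(tx)
if errors.Is(err, component.ErrComponentShutdown) {
	// the engine is shutting down; surface a retriable error to the client
	return status.Error(codes.Unavailable, "node is shutting down")
}
```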
LGTM. Clean code, thank you. Added a few minor comments.

You did some refactoring of the RPC ingestion engine; maybe we should move it to `ComponentManager` as well? Could be done in a separate PR.
* in Process, log a warning and return nil
* in ProcessTransaction, return sentinel
This PR adds a message queue to the `ingest` engine to avoid blocking network layer threads when processing transactions.

* use `ComponentManager` in the `ingest` engine
* replace `ProcessLocal` with a typed `ProcessTransaction` for accepting transactions from a local source
* move the `ingress` RPC server component to `engine/collection/rpc`, for consistency with the Access node RPC engine
* update `Backend` to depend on the transaction processor (the `ingest` engine) rather than a generic `network.Engine`