[Consensus] Queue for synchronization.Engine
#910
Conversation
…ed implementing queue consuming in engine
…tributor. Updated all nodes
Looks good
```go
// setupRequestMessageHandler initializes the inbound queues and the MessageHandler for UNTRUSTED requests.
func (e *Engine) setupRequestMessageHandler() {
	e.pendingSyncRequests = NewRequestQueue(defaultSyncRequestQueueCapacity)
```
Suggested change:

```go
e.pendingSyncRequests = NewRequestQueue(defaultSyncRequestQueueCapacity)
```

becomes

```go
// RequestQueue deduplicates requests by keeping only one sync request for each requester.
e.pendingSyncRequests = NewRequestQueue(defaultSyncRequestQueueCapacity)
```
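For context, a minimal sketch of the deduplication behavior this comment describes. The internals here are illustrative assumptions, not the PR's actual `RequestQueue` implementation:

```go
import (
	"sync"

	"github.com/onflow/flow-go/engine"
	"github.com/onflow/flow-go/model/flow"
)

// RequestQueue (sketch only): requests are keyed by origin ID, so storing
// a second request from the same requester overwrites the first rather
// than growing the queue.
type RequestQueue struct {
	mu       sync.Mutex
	limit    uint
	requests map[flow.Identifier]*engine.Message // one slot per requester
}

func NewRequestQueue(limit uint) *RequestQueue {
	return &RequestQueue{
		limit:    limit,
		requests: make(map[flow.Identifier]*engine.Message),
	}
}
```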
… implemented by `badger.Headers`
• reduced locking in `synchronization.Core`
Comments & Suggestions:

- It would be great to fix this performance bottleneck: `block, err := e.blocks.ByHeight(height)`. In contrast to `badger.Headers`, which has a cache for heights, `badger.Blocks` does not have such a cache and instead always hits the database (see the first sketch after this list).
- I think we could inline method `processIncomingBlock` (flow-go/engine/common/synchronization/engine.go, lines 581 to 596 in 5fd2ff2):

```go
// processIncomingBlock processes an incoming block, so we can take into account the
// overlap between block IDs and heights.
func (e *Engine) processIncomingBlock(originID flow.Identifier, block *flow.Block) {
	shouldProcess := e.core.HandleBlock(block.Header)
	if !shouldProcess {
		return
	}

	synced := &events.SyncedBlock{
		OriginID: originID,
		Block:    block,
	}

	e.comp.SubmitLocal(synced)
}
```

- We might be able to reduce lock contention in `Core.HandleHeight`: we always lock right away, but on the happy path, when the node is up to date, this code would return right away (flow-go/module/synchronization/core.go, lines 102 to 105 in 5fd2ff2):

```go
// don't bother queueing anything if we're within tolerance
if c.WithinTolerance(final, height) {
	return
}
```

  `WithinTolerance` is fully concurrency safe without any locks (we also call it externally, here). Hence, we could move the lock further down into the `if` statement, where we actually update `Core`'s state (see the second sketch after this list).
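Regarding the first bullet, a minimal sketch of a read-through height cache in front of `ByHeight`. The cache shape and names are assumptions for illustration, not flow-go's actual cache implementation:

```go
import (
	"sync"

	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/storage"
)

// cachedBlocks (sketch only): a read-through cache keyed by height, so
// repeated ByHeight lookups for the same height skip the database.
type cachedBlocks struct {
	mu       sync.RWMutex
	byHeight map[uint64]*flow.Block
	backend  storage.Blocks // the uncached badger-backed storage
}

func (c *cachedBlocks) ByHeight(height uint64) (*flow.Block, error) {
	// fast path: serve from the cache under a read lock
	c.mu.RLock()
	if block, ok := c.byHeight[height]; ok {
		c.mu.RUnlock()
		return block, nil
	}
	c.mu.RUnlock()

	// slow path: hit the database, then populate the cache
	block, err := c.backend.ByHeight(height)
	if err != nil {
		return nil, err
	}
	c.mu.Lock()
	c.byHeight[height] = block
	c.mu.Unlock()
	return block, nil
}
```

In practice such a cache would need to be bounded, and only finalized heights are safe to cache, since the block at an unfinalized height can still change.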
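And for the last bullet, a sketch of `HandleHeight` with the lock moved past the tolerance check. The mutex name and the queueing helper are assumptions; the real `Core` may differ:

```go
// HandleHeight (sketch only): lock-free fast path, locking only when we
// actually mutate Core's state.
func (c *Core) HandleHeight(final *flow.Header, height uint64) {
	// happy path: don't bother queueing anything if we're within
	// tolerance; WithinTolerance takes no locks, so neither do we
	if c.WithinTolerance(final, height) {
		return
	}

	// slow path only: take the lock now that we will update Core's state
	c.mu.Lock()
	defer c.mu.Unlock()
	for h := final.Height + 1; h <= height; h++ {
		c.queueHeight(h) // hypothetical helper that records the height to request
	}
}
```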
Implemented these suggestions ☝️ in my PR #926.
The suggestions below (code comments) are not implemented in PR #926.
```go
// we keep reducing the cache size until we are at limit again
for len(q.requests) >= int(q.limit) {

	// eject first element using go map properties
	var key flow.Identifier
	for originID := range q.requests {
		key = originID
		break
	}

	delete(q.requests, key)
}
```
I feel we could write this much more concisely:
Suggested change:

```go
// we keep reducing the cache size until we are at limit again
for overCapacity := len(q.requests) - int(q.limit); overCapacity >= 0; overCapacity-- {
	// eject one (arbitrary) element using go map properties
	for originID := range q.requests {
		delete(q.requests, originID)
		break
	}
}
```
Did I miss something?
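As a side note on Go map semantics (a standalone demo, not code from the PR): deleting entries while ranging over a map is legal, but without a `break` the inner loop drains the whole map in a single pass of the outer loop, which is why the `break` in the suggestion above matters:

```go
package main

import "fmt"

func main() {
	m := map[string]int{"a": 1, "b": 2, "c": 3}

	// deleting while ranging is permitted by the Go spec, but this
	// removes every entry, not just one:
	for k := range m {
		delete(m, k)
	}
	fmt.Println(len(m)) // 0

	// breaking after the first delete ejects exactly one arbitrary entry:
	m = map[string]int{"a": 1, "b": 2, "c": 3}
	for k := range m {
		delete(m, k)
		break
	}
	fmt.Println(len(m)) // 2
}
```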
```go
// first try to eject if we are at max capacity; we need to do it this
// way to prevent a situation where the just-inserted item gets ejected
q.reduce()
```
Consider the following scenario:

- the queue is at max capacity
- it already contains an element from origin `A`
- we are attempting to put a new element from `A` into the queue

The result is that:

- the old message from `A` is overwritten by the new one (desired)
- but we also eject some other message (which seems unintuitive)
Suggestion:

```go
// first try to eject if we are at max capacity; we need to do it this
// way to prevent a situation where the just-inserted item gets ejected
q.reduce()
```

becomes

```go
if _, found := q.requests[message.OriginID]; !found {
	// if no message from the origin is stored, make sure we have room to store the new message:
	q.reduce()
}
```
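Putting the suggestion in context, a `Put` built this way might look like the following. This is a sketch under the same assumed `RequestQueue` shape as in the earlier sketch; `reduce` is the ejection helper discussed above:

```go
// Put (sketch only): overwrites any queued message from the same origin,
// and ejects another element only when the origin is genuinely new.
func (q *RequestQueue) Put(message *engine.Message) bool {
	q.mu.Lock()
	defer q.mu.Unlock()

	if _, found := q.requests[message.OriginID]; !found {
		// no message from this origin is stored yet: make room first, so
		// the item we are about to insert cannot be the one ejected
		q.reduce()
	}
	q.requests[message.OriginID] = message
	return true
}
```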
```go
	filter.Not(filter.HasNodeID(e.me.NodeID())),
))
if err != nil {
	return fmt.Errorf("could not send get consensus participants: %w", err)
```
Suggested change:

```go
return fmt.Errorf("could not send get consensus participants: %w", err)
```

becomes

```go
return fmt.Errorf("could not get consensus participants at latest finalized block: %w", err)
```
…queue_-_suggestions suggestions for PR 910
Codecov Report

```diff
@@            Coverage Diff             @@
##           master     #910      +/-   ##
==========================================
+ Coverage   56.56%   56.69%   +0.13%
==========================================
  Files         424      425       +1
  Lines       25008    25146     +138
==========================================
+ Hits        14145    14257     +112
- Misses       8941     8955      +14
- Partials     1922     1934      +12
```

Flags with carried forward coverage won't be shown.
Continue to review the full report at Codecov.
…hub.com/onflow/flow-go into yurii/5615-synchronization-engine-queue
dapperlabs/flow-go/issues/5615
Context

This PR implements support for an inbound queue for messages that are submitted by the network layer and/or other engines.
As described in the issue, this PR mainly implements message queueing logic as it is implemented in other consensus engines (sealing, matching, compliance). Access to the final protocol state is also currently quite inefficient; this PR contains changes for that as well.
In the proposed implementation we have two separate goroutines which handle requests and responses in parallel; maybe that's not needed, but I thought it was a good idea to distribute load more evenly (a sketch of the pattern follows).
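For illustration, a minimal sketch of the two-worker pattern described above. All names here (the `engineSketch` type, its channels, and the process functions) are hypothetical, not the engine's actual fields:

```go
package synchronization

// message is a simplified stand-in for the engine's queued message type.
type message struct {
	originID [32]byte
	payload  interface{}
}

// engineSketch (sketch only): two worker goroutines drain separate
// inbound queues, so requests and responses are processed in parallel.
type engineSketch struct {
	requests  chan *message // filled by the networking layer
	responses chan *message // filled by the networking layer
	quit      chan struct{}
}

// start launches one worker per queue; each runs until quit is closed.
func (e *engineSketch) start() {
	go e.loop(e.requests, e.processRequest)
	go e.loop(e.responses, e.processResponse)
}

func (e *engineSketch) loop(queue <-chan *message, process func(*message)) {
	for {
		select {
		case <-e.quit:
			return
		case msg := <-queue:
			process(msg)
		}
	}
}

func (e *engineSketch) processRequest(msg *message)  { /* handle sync-height / range / batch requests */ }
func (e *engineSketch) processResponse(msg *message) { /* handle block responses */ }
```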
Contributions of this PR