[ADR - 43] blockchain riri-org #3753
Conversation
Codecov Report
```diff
@@            Coverage Diff             @@
##           master    #3753      +/-   ##
==========================================
- Coverage   64.06%   64.03%   -0.03%
==========================================
  Files         242      241       -1
  Lines       19968    19966       -2
==========================================
- Hits        12792    12785       -7
- Misses       6137     6143       +6
+ Partials     1039     1038       -1
```
👍
Co-Authored-By: Marko <marbar3778@yahoo.com>
🌟
```go
type Processor struct {
	height ...
	state  ...
	blocks [height]*Block
```
how many blocks will be stored in the worst case?
Good point. The worst case would be unbounded. We can look to provide back pressure by counting unprocessed messages and sending `stopScheduling` messages to the scheduler when we exceed some threshold. Was this your concern?
Yeah. I was thinking about whether there are any ways for a malicious peer (or group of peers) to exploit this.
We shouldn't be using `len()` on channels to "provide backpressure". `len(ch)` should only be used in certain cases: (1) when you're the only reader routine and you want to ensure that something exists in the channel before trying something, but otherwise not pull from it (even then, `len(ch) == 0` does not mean that a pull will fail); (2) for gathering statistics that do not affect the logic. Even if there were a safe way to ensure that there is space in a channel before pushing to it, such a method would not be safe in the face of concurrent pushes to that channel. In other words, even if it works today, it is brittle and can easily break for a variety of reasons in the future.
https://groups.google.com/forum/#!topic/golang-nuts/yQw1Wx6BoUU
The only good general way to deal with fullness of channels is to use `select { default: }`, but this implies that either (1) you end up losing the thing you are trying to push, or (2) you need some complicated method of dealing with that lost data. A common antipattern here is to launch a new goroutine to push the value asynchronously, which has two problems: (a) unlimited blowup of goroutines and (b) loss of order.
Seems like this ADR is incomplete... I would like a complete idea of how to deal with full channels.
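As a minimal sketch of the `select { default: }` technique mentioned above (the function name and types are illustrative, not from the ADR):

```go
package main

import "fmt"

// trySend attempts a non-blocking send on ch. Instead of blocking when
// the buffer is full, it returns false and the caller decides what to
// do with the dropped value.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 2)
	fmt.Println(trySend(ch, 1)) // true
	fmt.Println(trySend(ch, 2)) // true
	fmt.Println(trySend(ch, 3)) // false: buffer full, value not sent
}
```

As the comment notes, the hard part is not detecting fullness but deciding what to do with the value when `trySend` returns false.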
This design has evolved with the implementation to resolve a number of these open questions. In particular, we explicitly use a [priority queue](https://github.com/tendermint/tendermint/blob/master/blockchain/v2/routine.go#L22) within each routine, which is now bounded.
I'm not seeing anything that is bounding the size of the queue. `bufferSize` is used as a hint in constructing a priority queue. The `// XXX backpressure` comment and issue still exist in reactor.go.
Also, #4142
```go
msg := <-ioMsgs
switch msg := msg.(type) {
case scBlockRequestMessage:
	r.sendBlockToPeer(...)
```
What if IO is slow? Won't this slow down `demuxRoutine` in effect?
IIRC this results in a `peer.TrySend`, which queues messages to be sent later until the queue fills up. Similar to the comment above, we can use the return value from `TrySend` to send `stopScheduling` messages to the scheduler until the queue flushes.
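A hedged sketch of what using the `TrySend` return value for backpressure might look like. The `Peer` interface, `blockchainChannel` constant, and `stopScheduling` signal here are assumptions for illustration, not the ADR's final API:

```go
package main

import "fmt"

// Peer abstracts the p2p peer; TrySend is assumed to return false
// (without blocking) when the peer's outbound queue is full.
type Peer interface {
	TrySend(chID byte, msg []byte) bool
}

// sendBlockToPeer forwards a block and, if the peer's queue is full,
// reports that a hypothetical stopScheduling message should be sent so
// the scheduler pauses new requests until the queue flushes.
func sendBlockToPeer(p Peer, block []byte) (stopScheduling bool) {
	const blockchainChannel = 0x40 // assumed channel ID
	if ok := p.TrySend(blockchainChannel, block); !ok {
		return true // queue full: ask the scheduler to back off
	}
	return false
}

// fullPeer is a stub whose queue is always full, for illustration.
type fullPeer struct{}

func (fullPeer) TrySend(byte, []byte) bool { return false }

func main() {
	fmt.Println(sendBlockToPeer(fullPeer{}, []byte("block"))) // true
}
```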
Looks good! Not sure what level of detail you want to have in this document wrt different scenarios, but in general I think it would help to clarify a bit more the responsibilities of the different modules (especially the processor and scheduler).
I have some comments on some of the code snippets; I know they are there to give hints, but just to make them correct :)
```go
	}
	return events
}
```
Not clear why the processor needs to keep track of peers. Maybe more details on the responsibilities of the different modules would help.
From my understanding, the processor is responsible for removing/rescheduling all unprocessed blocks received from an errored peer. Is that true?
IMHO we need to clarify this. It looks like both the processor and the scheduler maintain some information about the fast-sync peers and blocks. They may both detect errors and trigger peer removal and, eventually, the rescheduling of requests. Crossing messages may cause their views of peers and blocks to get out of sync. At this point the scheduler struct is not filled in, and how the actual scheduling works is missing. I think we need more details on the scheduler before we can make the analysis.
> The processor is responsible for removing/rescheduling all unprocessed blocks received from an errored peer. Is that true?

One way to think about the processor is that it should order, verify and execute blocks. If it fails to do this for some block, it will notify the scheduler (and io) about the error so they can take corresponding actions.
## Schedule

The scheduler (previously the pool) is responsible for squeding peer status requests and block request responses.
```diff
- The scheduler (previously the pool) is responsible for squeding peer status requests and block request responses.
+ The scheduler (previously the pool) is responsible for handling status and block requests and responses.
```
(?) I think the scheduler is responsible for handling all fast-sync messages: statusRequest, statusResponse, blockRequest, blockResponse...
IMO we should add more detail here to clarify responsibilities.
```go
// clean peer list

events = []
for peerID := range schedule.peersTouchedSince(time) {
```
```diff
- for peerID := range schedule.peersTouchedSince(time) {
+ for peerID := range schedule.peersNotTouchedSince(time) {
```
(?) Not sure I understand what this loop does.
This loop is intended to iterate over the peers that have timed out, change their state to timed out, and reschedule blocks pending from those timed-out peers to other peers.
```go
	}
}
```
Do we still have a monitor to detect slow peers?
Yes, I think the `schedule.peersNotTouchedSince` loop could be done with `schedule.peersSlowerThan(minSpeed)` to prune peers in a more sophisticated way. It could be as simple as that, or we may eventually need something more sophisticated: we could have a sort of `PeerPruning` strategy which could be applied to the schedule. Not sure if that's needed now, or if we need to emit events indicating why the peer was pruned 🤔.
Both v0 and v1 have a monitor and remove slow peers. Events are sent to the switch to remove the slow peer. It is probably ok if we want to leave this aside initially for this refactor.
👍 Yeah I think we can do this without much trouble.
```go
msg := <-input
switch msg := msg.(type) {
case bcBlockRequestMessage:
	output <- processor.handleBlockRequest(msg, time.Now())
```
Why isn't the current time appended to a message by the demuxRoutine? This might be relevant, as we had a scenario where a msg was received by a reactor but its processing was delayed for quite some time. It might also be interesting for debugging and testing purposes.
This is a very good point! It seems like the demuxRoutine should be the sole component responsible for time, ensuring a consistent view across routines. One consequence of this change is that we need to ensure that the channels are never blocked. If the demuxRoutine is responsible for dispatching correctly timed messages to other channels, a blocked channel would starve the other channels of messages. We can do this by having a soft limit on processes that institutes back pressure before the hard limit (the channel size) is reached. I've added some inline comments to ensure we don't miss this during implementation.
For some concrete hints about how we can handle this, one technique we can use (which we also used in v1) is an overflowing channel.
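One possible reading of "overflowing channel" is a buffered channel that evicts the oldest element when full, so the newest data always gets in. A hedged sketch (the function name is invented, and this is only safe with a single writer goroutine):

```go
package main

import "fmt"

// sendOrDropOldest pushes v onto ch; if ch is full it evicts the
// oldest buffered element first. Assumes a single writer goroutine,
// the usual caveat with this pattern.
func sendOrDropOldest(ch chan int, v int) (dropped bool) {
	select {
	case ch <- v:
		return false
	default:
		// Buffer full: evict one element, then push.
		select {
		case <-ch:
			dropped = true
		default:
			// A reader drained the channel in the meantime; nothing to evict.
		}
		ch <- v
		return dropped
	}
}

func main() {
	ch := make(chan int, 2)
	sendOrDropOldest(ch, 1)
	sendOrDropOldest(ch, 2)
	sendOrDropOldest(ch, 3) // buffer full: evicts 1
	fmt.Println(<-ch, <-ch) // 2 3
}
```

This trades loss of the oldest data for the guarantee that the writer never blocks, which fits the demuxRoutine's requirement above that dispatch must not stall.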
```go
func (r *BlockChainReactor) Start() {
	r.msgs = make(chan Message, maxInFlight)
```
What does this message buffer correspond to?
This will be the channel which funnels messages from the `Receive` method to the `demuxRoutine`.
```go
func (r *BlockchainReactor) AddPeer(peer p2p.Peer) {
	...
	r.msgs <- bcAddPeerEv{peer.ID}
```
Why do we write here to a buffer which is used as an outgoing buffer for routines?
Sorry if that is not clear. This "buffer" would be used as the input for the routines. Out of curiosity, what is the motivation for calling this a buffer here instead of a channel?
### Processor internals

The processor will be responsible for validating and processing blocks. The processor will maintain an internal cursor `height` of the last processed block. As blocks arrive unordered, the processor will check if it has the block at `height+1` necessary to process next. The processor also maintains a map `blockPeers` from height to peer, to keep track of which peer provided the block at each `height`. `blockPeers` can be used in `handleRemovePeer(...)` to reschedule all unprocessed blocks provided by a peer who has errored.
`height` seems to be the next block that should be processed and `height-1` the last processed block. Typo: two "the" in the second sentence.
```go
delete(blocks, height)
height++
lastTouch = time
return pcBlockProcessed{height}
```
What if there are several blocks that can be processed? Should we have a loop here, or an external mechanism to trigger this check?
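A hedged sketch of the loop this comment suggests: after a block arrives, keep processing while `blocks[height+1]` is available. The function name is invented; the `blocks`, `height`, and `pcBlockProcessed` names follow the ADR snippets, with verification and execution elided:

```go
package main

import "fmt"

type pcBlockProcessed struct{ height int64 }

// processContiguous advances height while the next contiguous block is
// available, emitting one pcBlockProcessed event per processed block.
func processContiguous(height int64, blocks map[int64][]byte) (int64, []pcBlockProcessed) {
	var events []pcBlockProcessed
	for {
		if _, ok := blocks[height+1]; !ok {
			break // next block not yet received
		}
		// ... verify and execute blocks[height+1] here
		delete(blocks, height+1)
		height++
		events = append(events, pcBlockProcessed{height})
	}
	return height, events
}

func main() {
	blocks := map[int64][]byte{1: nil, 2: nil, 4: nil}
	h, evs := processContiguous(0, blocks)
	fmt.Println(h, len(evs)) // 2 2 — height 4 waits until 3 arrives
}
```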
In general looks good to me. I think it gives sufficient detail on how to proceed. Some details or concerns can then be addressed along the way when we focus on a concrete routine.
Could you add a section that covers, at a high level, a few scenarios where fast sync terminates and the switch to consensus happens?
```go
	return pcBlockProcessed{height}
} else {
	... // Delete all unprocessed blocks from the peer
	return pcBlockProcessError{peerID, height}
```
We should remove the peer for height+1 as well. I was wondering if an error when processing height (H) is enough to be sent to the scheduler. Probably not; the processor should include the height and the peers for both H and H+1.
In one case the scheduler would still have the blocks at H and H+1 and the same peers as the processor. But there might be a race condition where the scheduler has:
- detected errors from P1 after it received H but before the processor has processed H, and/or
- detected errors from P2 after it received H+1 but before the processor has processed H
- rescheduled H and H+1 with P1' and P2' respectively
- received new blocks H and H+1 and sent them to the processor
- now the processor tries to execute the block at H (from P1), detects an error and sends it to the scheduler
- the scheduler receives the error for H. It should not remove P1' and P2' and should not reschedule the blocks; therefore it needs the peerIDs for H and H+1.
Alternatively maybe call handleRemovePeer(peerID) below twice, for H and H+1?
In general I still have concerns about the scheduler and processor getting out of sync and the accumulation of events on the processor side. In the example above the processor will have in its queue:
`block(H, P1), block(H+1, P2), ..., error(P1), error(P2), block(H, P1'), block(H+1, P2'), ...`
### Processor internals

The processor will be responsible for validating and processing blocks. The processor will maintain an internal cursor `height` of the last processed block. As blocks arrive unordered, the processor will check if it has the block at `height+1` necessary to process next. The processor also maintains a map `blockPeers` from height to peer, to keep track of which peer provided the block at each `height`. `blockPeers` can be used in `handleRemovePeer(...)` to reschedule all unprocessed blocks provided by a peer who has errored.
Could you clarify whether the processor receives the blocks from the scheduler (after scheduler validation) or directly from the demux routine? I thought it was the former, but the diagram shows them coming from the demux (I think), and this paragraph is not clear.
```go
func (sc *schedule) numBlockInState(state blockState) uint32 {
	num := 0
	for i := sc.minHeight(); i <= sc.maxHeight(); i++ {
```
How is minHeight() computed? What about maxHeight()?
```go
func (sc *schedule) popSchedule(maxRequest int) []scBlockRequestMessage {
	// We only want to schedule requests such that we have less than sc.targetPending and sc.targetReceived
	// This ensures we don't saturate the network or flood the processor with unprocessed blocks
	todo := min(sc.targetPending-sc.numBlockInState(blockStatePending), sc.numBlockInState(blockStateReceived))
```
Not sure this works: it looks like `todo` starts at 0 (there should be 0 received blocks initially), so the loop will break immediately. It is also not clear that it protects from growing the block pool if we receive blocks faster than we process them (which is the case). But I think we can figure this out during coding, where any issues will become obvious.
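One possible fix for the concern above, as a hedged sketch (this is an assumed alternative, not the ADR's final design): cap the budget by the remaining headroom under *both* targets, so the budget is nonzero at startup while still bounding in-flight work.

```go
package main

import "fmt"

func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// scheduleBudget computes how many new block requests may be issued.
// Using headroom (target - current) for both pending and received
// avoids the startup problem where the received count is 0, while
// still bounding the pool when blocks arrive faster than they are
// processed. Parameter names mirror the ADR's targetPending/targetReceived.
func scheduleBudget(targetPending, pending, targetReceived, received int) int {
	todo := min(targetPending-pending, targetReceived-received)
	if todo < 0 {
		return 0
	}
	return todo
}

func main() {
	// At startup nothing is pending or received, so the budget is the
	// smaller of the two targets rather than 0.
	fmt.Println(scheduleBudget(10, 0, 10, 0)) // 10
	fmt.Println(scheduleBudget(10, 8, 10, 9)) // 1
}
```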
```go
	...
}

func handlePeerError(peerID) {
```
When is this called? On errors from the processor?
Should we also have a handlePeerRemove()? It is not clear how the switch sending PeerRemove is handled; should it be handled in both the processor and the scheduler?
```go
if todo == 0 {
	break
}
if blockStates[i] == blockStateNew {
```
How is the `blockStates` slice initialized?
```go
// Delete all unprocessed blocks from peerID
for i = height; i < len(blocks); i++ {
	if blockPeers[i] == peerID {
		events = append(events, pcBlockReschedule{height})
```
Should the event include the peerID for the height? This is for similar reasons as above, where the scheduler may have already rescheduled this (if it also processes RemovePeer events).
Co-Authored-By: Anca Zamfir <ancazamfir@users.noreply.github.com>
Documented the pending concern about processing invalidated blocks in #3777. Looks like this is ready to merge 👍
The blockchain reactor has multiple high-level responsibilities that would be easier to understand and test if separated. This ADR contains a proposal for splitting up block processing and the scheduling of requests for fast sync, in order to achieve the API proposed in ADR-40.