PeerManager rewrite #2539
Codecov Report
❌ Your patch status has failed because the patch coverage (63.34%) is below the target coverage (70.00%). You can increase the patch coverage or adjust the target coverage.
@@ Coverage Diff @@
## main #2539 +/- ##
==========================================
- Coverage 43.34% 43.20% -0.14%
==========================================
Files 1576 1579 +3
Lines 137842 137345 -497
==========================================
- Hits 59744 59343 -401
+ Misses 72678 72624 -54
+ Partials 5420 5378 -42
@@ -540,26 +522,6 @@
Check warning
Code scanning / CodeQL
Calling the system time Warning
for _, r := range byNodeID {
    byLastConnected.ReplaceOrInsert(r)
}
Check warning
Code scanning / CodeQL
Iteration over map Warning
if err := m.store.Set(peer); err != nil {
    return err
// Record the failure time.
now := time.Now()
Check warning
Code scanning / CodeQL
Calling the system time Warning
for id := range s.last {
    if _, ok := conns.Get(id); !ok {
        delete(s.last, id)
        update = PeerUpdate{
            NodeID: id,
            Status: PeerStatusDown,
        }
        return true
    }
if _, ok := m.dynamicPrivatePeers[nodeAddr.NodeID]; ok {
    continue
}
Check warning
Code scanning / CodeQL
Iteration over map Warning
for id := range inner.persistentAddrs {
    ids = append(ids, id)
}
Check warning
Code scanning / CodeQL
Iteration over map Warning
for addr := range pa.addrs {
    addrs = append(addrs, addr)
}
Check warning
Code scanning / CodeQL
Iteration over map Warning
for {
    for db := range r.peerDB.Lock() {
        // Mark connections as still available.
        now := time.Now()
Check warning
Code scanning / CodeQL
Calling the system time Warning
if !ok || m == 0 {
    return 0
}
return float64(r.peerManager.Conns().Len()) / float64(m)
Check notice
Code scanning / CodeQL
Floating point arithmetic Note
BlockSyncPeers string `mapstructure:"blocksync-peers"`

// UPNP port forwarding
// UPNP port forwarding. UNUSED
Since this is UNUSED, do we want to do something about it (e.g. remove it), or will that affect existing network compatibility? If we want to do so down the road, let's add a TODO here.
Just removing it will break config parsing, as far as I understand. I don't have a long-term deprecation strategy here; I just wanted to document the fact for clarity.
// * allow inbound conn to override outbound iff peerID > selfID.
// This resolves the situation when peers try to connect to each other
// at the same time.
oldDir := old.Info().DialAddr.IsPresent()
Shouldn't this be oldAddr or something instead of oldDir? (Same for newDir.)
Dir is an abbreviation of "direction" (see comment above). I'll expand the names to oldDirection/newDirection to make it clearer.
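For readers following this thread, the tie-break rule quoted in the code comment can be sketched roughly as below. This is only an illustration, not the PR's implementation: the `direction` type, the `shouldReplace` helper, and comparing node IDs as plain strings are all assumptions, and only the single rule quoted above is modelled (every other case keeps the old connection here).

```go
package main

import "fmt"

// direction is a hypothetical stand-in for "did we dial this connection?".
type direction bool

const (
	inbound  direction = false
	outbound direction = true
)

// shouldReplace reports whether a new connection to peerID should replace an
// existing one. It models only the rule quoted in the review: an inbound
// connection overrides an outbound one iff peerID > selfID. Keeping the old
// connection in every other case is an assumption of this sketch.
func shouldReplace(selfID, peerID string, oldDirection, newDirection direction) bool {
	if oldDirection == outbound && newDirection == inbound {
		return peerID > selfID
	}
	return false
}

func main() {
	// Nodes "aa" and "bb" dial each other at the same time.
	// On "aa": the inbound connection from "bb" wins over aa's own dial.
	fmt.Println(shouldReplace("aa", "bb", outbound, inbound))
	// On "bb": the inbound connection from "aa" does not replace bb's own dial.
	fmt.Println(shouldReplace("bb", "aa", outbound, inbound))
}
```

In this sketch both nodes end up keeping the connection dialed by the node with the larger ID, which is the point of making the rule asymmetric.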
// Add new peerAddrs if missing.
if !ok {
    // Prune some peer if maxPeers limit has been reached.
    if len(i.addrs) == i.options.maxPeers() {
How do we intend this to behave if len(i.addrs) == maxPeers (e.g. 128) but none of the peers have failed? In this case we would just drop this peer; I just want to make sure this is the intended behavior.
Yes, we just drop it. The rough idea here is that we want to dial every address we accept at least once. This doesn't give us any nice network properties; it just makes the semantics predictable (it bounds the churn). I would like to move to a more useful algorithm soon (once I have time to implement it).
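A minimal sketch of the behavior discussed in this thread, under stated assumptions: `addrBook`, its `failed` map, and the choice of which failed peer to prune are all hypothetical and are not taken from the PR. It only shows the two outcomes described above: a full table with a failed entry makes room, and a full table with no failed entries drops the new address.

```go
package main

import "fmt"

// addrBook is a hypothetical, simplified address table.
type addrBook struct {
	maxPeers int
	// failed maps peer ID -> whether a dial to that peer has already failed.
	failed map[string]bool
}

// add tries to track id and reports whether id is tracked afterwards.
// When the table is full, it prunes one peer whose dial has failed; if no
// peer has failed yet, the new address is dropped.
func (b *addrBook) add(id string) bool {
	if _, ok := b.failed[id]; ok {
		return true // already tracked
	}
	if len(b.failed) >= b.maxPeers {
		victim, found := "", false
		for candidate, hasFailed := range b.failed {
			// Pick the smallest failed ID so the choice is deterministic
			// (an arbitrary policy chosen for this sketch).
			if hasFailed && (!found || candidate < victim) {
				victim, found = candidate, true
			}
		}
		if !found {
			return false // nothing to prune: drop the new address
		}
		delete(b.failed, victim)
	}
	b.failed[id] = false
	return true
}

func main() {
	b := &addrBook{maxPeers: 2, failed: map[string]bool{}}
	fmt.Println(b.add("a"), b.add("b")) // both fit
	fmt.Println(b.add("c"))             // table full, nothing failed: dropped
	b.failed["a"] = true                // a dial to "a" fails
	fmt.Println(b.add("c"))             // "a" is pruned to make room
}
```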
func NewAtomicSend[T any](value T) (w AtomicSend[T]) {
    w.ptr.Store(newVersion(value))
    // nolint:nakedret
    return
Why do we want the naked return and the nolint?
AtomicSend is (intentionally) a no-copy object, because it embeds atomic.Pointer. A naked return is the canonical way of initializing no-copy objects (return w would already be a copy of w).
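The pattern described in this reply can be reproduced in a small standalone program. This is a sketch, not the PR's code: the `version` type and the `Load` method are assumptions added so the example compiles on its own (the PR's `newVersion` helper is not shown in this conversation).

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// version is a hypothetical payload wrapper used only in this sketch.
type version[T any] struct{ value T }

// AtomicSend holds an atomic.Pointer, which is marked no-copy, so values of
// this type should not be copied after first use.
type AtomicSend[T any] struct {
	ptr atomic.Pointer[version[T]]
}

// NewAtomicSend initializes the named result w in place. The naked return
// avoids writing "return w", which names a variable holding a no-copy value
// in a return expression.
func NewAtomicSend[T any](value T) (w AtomicSend[T]) {
	w.ptr.Store(&version[T]{value: value})
	// nolint:nakedret
	return
}

// Load returns the currently stored value (hypothetical accessor).
func (w *AtomicSend[T]) Load() T { return w.ptr.Load().value }

func main() {
	w := NewAtomicSend(42)
	fmt.Println(w.Load())
}
```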
// OnStop implements service.Service.
func (r *Reactor) OnStop() {}

func (r *Reactor) sendRoutine(ctx context.Context) error {
Do we have any concerns about whether this will cause the network adapter to be utilized/blocked as we broadcast to a batch of peers (e.g. 100)? Is it possible that every 10 seconds we cause a slowdown to other network communications (e.g. consensus voting; maybe this specific message type isn't as problematic because of priority), or should it be fine?
There are a couple of aspects to that:
- sendRoutine just puts msgs into peers' outbound queues, which is non-blocking and doesn't do any IO.
- pex msgs have lower priority than consensus messages, so they will be deprioritized anyway.
- pex requests are very small, so it actually doesn't matter at all.
- pex responses are desynchronized due to different roundtrip latency to different peers.
- eventually I would like to get rid of pex requests altogether (make peers proactively push pex responses) and make pex response sizes negligible (limit the size, perhaps sending just diffs).
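The first point in the list above (enqueueing without blocking or doing IO) can be illustrated with a buffered channel and a `select` with a `default` branch. This is a generic sketch, not the PR's queue: `peerQueue`, `trySend`, and `broadcast` are hypothetical names, and dropping a message when a queue is full is an assumption made here to keep the example short. Message priorities are not modelled.

```go
package main

import "fmt"

// peerQueue is a hypothetical per-peer outbound queue. A separate goroutine
// (not shown) would drain it and perform the actual network writes.
type peerQueue struct {
	out chan string
}

// trySend enqueues msg without blocking and reports whether it was queued.
// If the queue is full the message is dropped (an assumption of this sketch).
func (q *peerQueue) trySend(msg string) bool {
	select {
	case q.out <- msg:
		return true
	default:
		return false
	}
}

// broadcast enqueues msg for every peer and returns how many queues took it.
// It never waits on a slow peer and performs no IO itself.
func broadcast(peers []*peerQueue, msg string) (queued int) {
	for _, p := range peers {
		if p.trySend(msg) {
			queued++
		}
	}
	return queued
}

func main() {
	idle := &peerQueue{out: make(chan string, 1)}
	busy := &peerQueue{out: make(chan string, 1)}
	busy.out <- "earlier message" // this peer's queue is already full

	fmt.Println(broadcast([]*peerQueue{idle, busy}, "pex request"))
}
```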
udpatil
left a comment
Overall LGTM, just had a few comments to improve my understanding further.
…/sei-chain into gprusak-disconnects
* main: PeerManager rewrite (#2539)
Rewrite of the PeerManager logic to make it simple, predictable and race-condition resistant. There are further improvements to be done, but we need to keep compatibility with the current p2p implementation. With this PR: