Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset Streams Properly #6066

Merged
merged 2 commits into from
Jun 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions beacon-chain/p2p/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
)

const (
// The time to wait before disconnecting a peer.
flushDuration = 50 * time.Millisecond
// The time to wait for a status request.
timeForStatus = 10 * time.Second
)
Expand Down Expand Up @@ -85,9 +83,6 @@ func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer
if err := goodbyeFunc(context.Background(), remotePeer); err != nil {
log.WithError(err).Trace("Unable to send goodbye message to peer")
}
// Add a short delay to allow the stream to flush before closing the connection.
// There is still a chance that the peer won't receive the message.
time.Sleep(flushDuration)
disconnectFromPeer()
return
}
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/sync/initial-sync/blocks_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,8 @@ func (f *blocksFetcher) requestBlocks(
return nil, err
}
defer func() {
if err := stream.Close(); err != nil {
log.WithError(err).Error("Failed to close stream")
if err := stream.Reset(); err != nil {
log.WithError(err).Errorf("Failed to close stream with protocol %s", stream.Protocol())
}
}()

Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/sync/rpc_beacon_blocks_by_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ func (r *Service) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots
if err != nil {
return err
}
defer func() {
if err := stream.Reset(); err != nil {
log.WithError(err).Errorf("Failed to reset stream with protocol %s", stream.Protocol())
}
}()
for i := 0; i < len(blockRoots); i++ {
blk, err := ReadChunkedBlock(stream, r.p2p)
if err == io.EOF {
Expand Down
11 changes: 8 additions & 3 deletions beacon-chain/sync/rpc_goodbye.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ func (r *Service) sendGoodByeAndDisconnect(ctx context.Context, code uint64, id
"peer": id,
}).Debug("Could not send goodbye message to peer")
}
// Add a short delay to allow the stream to flush before closing the connection.
// There is still a chance that the peer won't receive the message.
time.Sleep(50 * time.Millisecond)
if err := r.p2p.Disconnect(id); err != nil {
return err
}
Expand All @@ -68,8 +65,16 @@ func (r *Service) sendGoodByeMessage(ctx context.Context, code uint64, id peer.I
if err != nil {
return err
}
defer func() {
if err := stream.Reset(); err != nil {
log.WithError(err).Errorf("Failed to reset stream with protocol %s", stream.Protocol())
}
}()
log := log.WithField("Reason", goodbyeMessage(code))
log.WithField("peer", stream.Conn().RemotePeer()).Debug("Sending Goodbye message to peer")
// Add a short delay to allow the stream to flush before resetting it.
// There is still a chance that the peer won't receive the message.
time.Sleep(50 * time.Millisecond)
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/sync/rpc_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func (r *Service) sendMetaDataRequest(ctx context.Context, id peer.ID) (*pb.Meta
// metadata requests send no payload, so closing the
// stream early leads it to a reset.
defer func() {
if err := stream.Close(); err != nil {
log.WithError(err).Error("Failed to close stream")
if err := stream.Reset(); err != nil {
log.WithError(err).Errorf("Failed to reset stream for protocol %s", stream.Protocol())
}
}()
code, errMsg, err := ReadStatusCode(stream, r.p2p.Encoding())
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/sync/rpc_ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ func (r *Service) sendPingRequest(ctx context.Context, id peer.ID) error {
if err != nil {
return err
}
defer func() {
if err := stream.Reset(); err != nil {
log.WithError(err).Errorf("Failed to reset stream with protocol %s", stream.Protocol())
}
}()

code, errMsg, err := ReadStatusCode(stream, r.p2p.Encoding())
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/sync/rpc_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ func (r *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error {
if err != nil {
return err
}
defer func() {
if err := stream.Reset(); err != nil {
log.WithError(err).Errorf("Failed to reset stream with protocol %s", stream.Protocol())
}
}()

code, errMsg, err := ReadStatusCode(stream, r.p2p.Encoding())
if err != nil {
Expand Down