From 3350f061b246884233e13680f319995de8853e62 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Thu, 1 Apr 2021 09:18:11 +0700 Subject: [PATCH] headers: remove with cancel (#1640) * remove with cancel * remove with cancel * remove with cancel --- cmd/headers/download/downloader.go | 198 ++++++++++++----------------- cmd/headers/download/sentry.go | 19 ++- 2 files changed, 92 insertions(+), 125 deletions(-) diff --git a/cmd/headers/download/downloader.go b/cmd/headers/download/downloader.go index c32a190bf82..d87a948f88c 100644 --- a/cmd/headers/download/downloader.go +++ b/cmd/headers/download/downloader.go @@ -87,12 +87,11 @@ func Download(sentryAddr string, coreAddr string, db ethdb.Database, timeout, wi Forks: controlServer.forks, }, } - callCtx, cancelFn := context.WithCancel(ctx) - if _, err = sentryClient.SetStatus(callCtx, statusMsg, &grpc.EmptyCallOption{}); err != nil { - cancelFn() + + if _, err = sentryClient.SetStatus(ctx, statusMsg, &grpc.EmptyCallOption{}); err != nil { return fmt.Errorf("setting initial status message: %w", err) } - cancelFn() + go func() { for { select { @@ -100,22 +99,7 @@ func Download(sentryAddr string, coreAddr string, db ethdb.Database, timeout, wi return default: } - callCtx, cancelFn = context.WithCancel(ctx) - receiveClient, err2 := sentryClient.ReceiveMessages(callCtx, &empty.Empty{}, &grpc.EmptyCallOption{}) - if err2 != nil { - log.Error("Receive messages failed", "error", err2) - } else { - inreq, err := receiveClient.Recv() - for ; err == nil; inreq, err = receiveClient.Recv() { - if err1 := controlServer.handleInboundMessage(ctx, inreq); err1 != nil { - log.Error("Handling incoming message", "error", err1) - } - } - if err != nil && !errors.Is(err, io.EOF) { - log.Error("Receive loop terminated", "error", err) - } - } - cancelFn() + recvMessage(ctx, sentryClient, controlServer) // Wait before trying to reconnect to prevent log flooding time.Sleep(2 * time.Second) } @@ -127,22 +111,7 @@ func Download(sentryAddr string, coreAddr string, db ethdb.Database, timeout, wi return default: } - callCtx, cancelFn = context.WithCancel(ctx) - receiveUploadClient, err3 := sentryClient.ReceiveUploadMessages(callCtx, &empty.Empty{}, &grpc.EmptyCallOption{}) - if err3 != nil { - log.Error("Receive upload messages failed", "error", err3) - } else { - inreq, err := receiveUploadClient.Recv() - for ; err == nil; inreq, err = receiveUploadClient.Recv() { - if err1 := controlServer.handleInboundMessage(ctx, inreq); err1 != nil { - log.Error("Handling incoming message", "error", err1) - } - } - if err != nil && !errors.Is(err, io.EOF) { - log.Error("Receive upload loop terminated", "error", err) - } - } - cancelFn() + recvUploadMessage(ctx, sentryClient, controlServer) // Wait before trying to reconnect to prevent log flooding time.Sleep(2 * time.Second) } @@ -167,6 +136,56 @@ func Download(sentryAddr string, coreAddr string, db ethdb.Database, timeout, wi return nil } +func recvUploadMessage(ctx context.Context, sentryClient proto_sentry.SentryClient, controlServer *ControlServerImpl) { + streamCtx, cancel := context.WithCancel(ctx) + defer cancel() + + receiveUploadClient, err3 := sentryClient.ReceiveUploadMessages(streamCtx, &empty.Empty{}, &grpc.EmptyCallOption{}) + if err3 != nil { + log.Error("Receive upload messages failed", "error", err3) + return + } + + for inreq, err := receiveUploadClient.Recv(); ; inreq, err = receiveUploadClient.Recv() { + if err != nil { + if !errors.Is(err, io.EOF) { + log.Error("Receive upload loop terminated", "error", err) + return + } + return + } + + if err = controlServer.handleInboundMessage(ctx, inreq); err != nil { + log.Error("Handling incoming message", "error", err) + } + } +} + +func recvMessage(ctx context.Context, sentryClient proto_sentry.SentryClient, controlServer *ControlServerImpl) { + streamCtx, cancel := context.WithCancel(ctx) + defer cancel() + + receiveClient, err2 := sentryClient.ReceiveMessages(streamCtx, &empty.Empty{}, &grpc.EmptyCallOption{}) + if err2 != nil { + log.Error("Receive messages failed", "error", err2) + return + } + + for inreq, err := receiveClient.Recv(); ; inreq, err = receiveClient.Recv() { + if err != nil { + if !errors.Is(err, io.EOF) { + log.Error("Receive loop terminated", "error", err) + return + } + return + } + + if err = controlServer.handleInboundMessage(ctx, inreq); err != nil { + log.Error("Handling incoming message", "error", err) + } + } +} + // Combined creates and starts sentry and downloader in the same process func Combined(natSetting string, port int, staticPeers []string, discovery bool, netRestrict string, db ethdb.Database, timeout, window int, chain string) error { ctx := rootContext() @@ -195,48 +214,13 @@ func Combined(natSetting string, port int, staticPeers []string, discovery bool, Forks: controlServer.forks, }, } - callCtx, cancelFn := context.WithCancel(ctx) - if _, err = sentryClient.SetStatus(callCtx, statusMsg, &grpc.EmptyCallOption{}); err != nil { - cancelFn() + + if _, err := sentryClient.SetStatus(ctx, statusMsg, &grpc.EmptyCallOption{}); err != nil { return fmt.Errorf("setting initial status message: %w", err) } - cancelFn() - callCtx, cancelFn = context.WithCancel(ctx) - receiveClient, err2 := sentryClient.ReceiveMessages(callCtx, &empty.Empty{}, &grpc.EmptyCallOption{}) - if err2 != nil { - cancelFn() - return fmt.Errorf("receive messages failed: %w", err2) - } - cancelFn() - go func() { - inreq, err := receiveClient.Recv() - for ; err == nil; inreq, err = receiveClient.Recv() { - if err1 := controlServer.handleInboundMessage(ctx, inreq); err1 != nil { - log.Error("Handling incoming message", "error", err1) - } - } - if err != nil && !errors.Is(err, io.EOF) { - log.Error("Receive loop terminated", "error", err) - } - }() - callCtx, cancelFn = context.WithCancel(ctx) - receiveUploadClient, err3 := sentryClient.ReceiveUploadMessages(callCtx, &empty.Empty{}, &grpc.EmptyCallOption{}) - if err3 != nil { - cancelFn() - return fmt.Errorf("receive upload messages failed: %w", err3) - } - cancelFn() - go func() { - inreq, err := receiveUploadClient.Recv() - for ; err == nil; inreq, err = receiveUploadClient.Recv() { - if err1 := controlServer.handleInboundMessage(ctx, inreq); err1 != nil { - log.Error("Handling incoming message", "error", err1) - } - } - if err != nil && !errors.Is(err, io.EOF) { - log.Error("Receive loop terminated", "error", err) - } - }() + + go recvMessage(ctx, sentryClient, controlServer) + go recvUploadMessage(ctx, sentryClient, controlServer) if err := stages.StageLoop( ctx, @@ -346,9 +330,7 @@ func (cs *ControlServerImpl) updateHead(ctx context.Context, height uint64, hash Forks: cs.forks, }, } - callCtx, cancel := context.WithCancel(ctx) - defer cancel() - if _, err := cs.sentryClient.SetStatus(callCtx, statusMsg, &grpc.EmptyCallOption{}); err != nil { + if _, err := cs.sentryClient.SetStatus(ctx, statusMsg, &grpc.EmptyCallOption{}); err != nil { log.Error("Update status message for the sentry", "error", err) } } @@ -377,7 +359,8 @@ func (cs *ControlServerImpl) newBlockHashes(ctx context.Context, inreq *proto_se Data: b, }, } - if err := sendMessageById(ctx, cs.sentryClient, &outreq); err != nil { + + if _, err = cs.sentryClient.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{}); err != nil { return fmt.Errorf("send header request: %v", err) } } @@ -385,13 +368,6 @@ func (cs *ControlServerImpl) newBlockHashes(ctx context.Context, inreq *proto_se return nil } -func sendMessageById(ctx context.Context, sentryClient proto_sentry.SentryClient, outreq *proto_sentry.SendMessageByIdRequest) error { - callCtx, cancel := context.WithCancel(ctx) - defer cancel() - _, err := sentryClient.SendMessageById(callCtx, outreq, &grpc.EmptyCallOption{}) - return err -} - func (cs *ControlServerImpl) blockHeaders(ctx context.Context, inreq *proto_sentry.InboundMessage) error { rlpStream := rlp.NewStream(bytes.NewReader(inreq.Data), uint64(len(inreq.Data))) _, err := rlpStream.List() @@ -400,12 +376,16 @@ func (cs *ControlServerImpl) blockHeaders(ctx context.Context, inreq *proto_sent } var headersRaw [][]byte var headerRaw []byte - for headerRaw, err = rlpStream.Raw(); err == nil; headerRaw, err = rlpStream.Raw() { + for headerRaw, err = rlpStream.Raw(); ; headerRaw, err = rlpStream.Raw() { + if err != nil { + if !errors.Is(err, rlp.EOL) { + return fmt.Errorf("decode BlockHeaders: %w", err) + } + break + } + headersRaw = append(headersRaw, headerRaw) } - if err != nil && !errors.Is(err, rlp.EOL) { - return fmt.Errorf("decode BlockHeaders: %w", err) - } headers := make([]*types.Header, len(headersRaw)) var heighestBlock uint64 var i int @@ -429,9 +409,7 @@ func (cs *ControlServerImpl) blockHeaders(ctx context.Context, inreq *proto_sent PeerId: inreq.PeerId, Penalty: proto_sentry.PenaltyKind_Kick, // TODO: Extend penalty kinds } - callCtx, cancel := context.WithCancel(ctx) - defer cancel() - if _, err1 := cs.sentryClient.PenalizePeer(callCtx, &outreq, &grpc.EmptyCallOption{}); err1 != nil { + if _, err1 := cs.sentryClient.PenalizePeer(ctx, &outreq, &grpc.EmptyCallOption{}); err1 != nil { log.Error("Could not send penalty", "err", err1) } } @@ -442,9 +420,7 @@ func (cs *ControlServerImpl) blockHeaders(ctx context.Context, inreq *proto_sent PeerId: inreq.PeerId, MinBlock: heighestBlock, } - callCtx, cancel := context.WithCancel(ctx) - defer cancel() - if _, err1 := cs.sentryClient.PeerMinBlock(callCtx, &outreq, &grpc.EmptyCallOption{}); err1 != nil { + if _, err1 := cs.sentryClient.PeerMinBlock(ctx, &outreq, &grpc.EmptyCallOption{}); err1 != nil { log.Error("Could not send min block for peer", "err", err1) } //log.Info("HeadersMsg processed") @@ -479,9 +455,7 @@ func (cs *ControlServerImpl) newBlock(ctx context.Context, inreq *proto_sentry.I PeerId: inreq.PeerId, Penalty: proto_sentry.PenaltyKind_Kick, // TODO: Extend penalty kinds } - callCtx, cancel := context.WithCancel(ctx) - defer cancel() - if _, err1 := cs.sentryClient.PenalizePeer(callCtx, &outreq, &grpc.EmptyCallOption{}); err1 != nil { + if _, err1 := cs.sentryClient.PenalizePeer(ctx, &outreq, &grpc.EmptyCallOption{}); err1 != nil { log.Error("Could not send penalty", "err", err1) } } @@ -493,9 +467,7 @@ func (cs *ControlServerImpl) newBlock(ctx context.Context, inreq *proto_sentry.I PeerId: inreq.PeerId, MinBlock: request.Block.NumberU64(), } - callCtx, cancel := context.WithCancel(ctx) - defer cancel() - if _, err1 := cs.sentryClient.PeerMinBlock(callCtx, &outreq, &grpc.EmptyCallOption{}); err1 != nil { + if _, err1 := cs.sentryClient.PeerMinBlock(ctx, &outreq, &grpc.EmptyCallOption{}); err1 != nil { log.Error("Could not send min block for peer", "err", err1) } log.Info(fmt.Sprintf("NewBlockMsg{blockNumber: %d} from [%s]", request.Block.NumberU64(), gointerfaces.ConvertH512ToBytes(inreq.PeerId))) @@ -665,9 +637,7 @@ func (cs *ControlServerImpl) getBlockHeaders(ctx context.Context, inreq *proto_s Data: b, }, } - callCtx, cancel := context.WithCancel(ctx) - defer cancel() - _, err = cs.sentryClient.SendMessageById(callCtx, &outreq, &grpc.EmptyCallOption{}) + _, err = cs.sentryClient.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{}) if err != nil { return fmt.Errorf("send header response: %v", err) } @@ -721,9 +691,7 @@ func (cs *ControlServerImpl) getBlockBodies(ctx context.Context, inreq *proto_se Data: b, }, } - callCtx, cancel := context.WithCancel(ctx) - defer cancel() - _, err = cs.sentryClient.SendMessageById(callCtx, &outreq, &grpc.EmptyCallOption{}) + _, err = cs.sentryClient.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{}) if err != nil { return fmt.Errorf("send bodies response: %v", err) } @@ -787,9 +755,7 @@ func (cs *ControlServerImpl) sendHeaderRequest(ctx context.Context, req *headerd Data: bytes, }, } - callCtx, cancel := context.WithCancel(ctx) - defer cancel() - sentPeers, err1 := cs.sentryClient.SendMessageByMinBlock(callCtx, &outreq, &grpc.EmptyCallOption{}) + sentPeers, err1 := cs.sentryClient.SendMessageByMinBlock(ctx, &outreq, &grpc.EmptyCallOption{}) if err1 != nil { log.Error("Could not send header request", "err", err1) return nil @@ -816,9 +782,7 @@ func (cs *ControlServerImpl) sendBodyRequest(ctx context.Context, req *bodydownl Data: bytes, }, } - callCtx, cancel := context.WithCancel(ctx) - defer cancel() - sentPeers, err1 := cs.sentryClient.SendMessageByMinBlock(callCtx, &outreq, &grpc.EmptyCallOption{}) + sentPeers, err1 := cs.sentryClient.SendMessageByMinBlock(ctx, &outreq, &grpc.EmptyCallOption{}) if err1 != nil { log.Error("Could not send block bodies request", "err", err1) return nil @@ -831,9 +795,7 @@ func (cs *ControlServerImpl) sendBodyRequest(ctx context.Context, req *bodydownl func (cs *ControlServerImpl) penalise(ctx context.Context, peer []byte) { penalizeReq := proto_sentry.PenalizePeerRequest{PeerId: gointerfaces.ConvertBytesToH512(peer), Penalty: proto_sentry.PenaltyKind_Kick} - callCtx, cancel := context.WithCancel(ctx) - defer cancel() - if _, err := cs.sentryClient.PenalizePeer(callCtx, &penalizeReq, &grpc.EmptyCallOption{}); err != nil { + if _, err := cs.sentryClient.PenalizePeer(ctx, &penalizeReq, &grpc.EmptyCallOption{}); err != nil { log.Error("Could not penalise", "peer", peer, "error", err) } } diff --git a/cmd/headers/download/sentry.go b/cmd/headers/download/sentry.go index a2b7915a2e7..6ccb795d814 100644 --- a/cmd/headers/download/sentry.go +++ b/cmd/headers/download/sentry.go @@ -178,9 +178,9 @@ func runPeer( return fmt.Errorf("handshake to peer %s: %v", peerID, err) } // Read handshake message - msg, err := rw.ReadMsg() - if err != nil { - return err + msg, err1 := rw.ReadMsg() + if err1 != nil { + return err1 } if msg.Code != eth.StatusMsg { @@ -193,9 +193,9 @@ func runPeer( } // Decode the handshake and make sure everything matches var status eth.StatusPacket - if err = msg.Decode(&status); err != nil { + if err1 = msg.Decode(&status); err1 != nil { msg.Discard() - return fmt.Errorf("decode message %v: %v", msg, err) + return fmt.Errorf("decode message %v: %v", msg, err1) } msg.Discard() if status.NetworkID != networkID { @@ -207,12 +207,17 @@ func runPeer( if status.Genesis != genesisHash { return fmt.Errorf("genesis hash does not match: theirs %x, ours %x", status.Genesis, genesisHash) } - if err = forkFilter(status.ForkID); err != nil { - return fmt.Errorf("%v", err) + if err1 = forkFilter(status.ForkID); err1 != nil { + return fmt.Errorf("%v", err1) } //log.Info(fmt.Sprintf("[%s] Received status message OK", peerID), "name", peer.Name()) for { + var err error + if err = common.Stopped(ctx.Done()); err != nil { + return err + } + if _, ok := peerRwMap.Load(peerID); !ok { return fmt.Errorf("peer has been penalized") }