Skip to content

Commit

Permalink
headers: remove with cancel (#1640)
Browse files Browse the repository at this point in the history
* remove with cancel

* remove with cancel

* remove with cancel
  • Loading branch information
AskAlexSharov committed Apr 6, 2021
1 parent 885ed5a commit 3350f06
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 125 deletions.
198 changes: 80 additions & 118 deletions cmd/headers/download/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,35 +87,19 @@ func Download(sentryAddr string, coreAddr string, db ethdb.Database, timeout, wi
Forks: controlServer.forks,
},
}
callCtx, cancelFn := context.WithCancel(ctx)
if _, err = sentryClient.SetStatus(callCtx, statusMsg, &grpc.EmptyCallOption{}); err != nil {
cancelFn()

if _, err = sentryClient.SetStatus(ctx, statusMsg, &grpc.EmptyCallOption{}); err != nil {
return fmt.Errorf("setting initial status message: %w", err)
}
cancelFn()

go func() {
for {
select {
case <-ctx.Done():
return
default:
}
callCtx, cancelFn = context.WithCancel(ctx)
receiveClient, err2 := sentryClient.ReceiveMessages(callCtx, &empty.Empty{}, &grpc.EmptyCallOption{})
if err2 != nil {
log.Error("Receive messages failed", "error", err2)
} else {
inreq, err := receiveClient.Recv()
for ; err == nil; inreq, err = receiveClient.Recv() {
if err1 := controlServer.handleInboundMessage(ctx, inreq); err1 != nil {
log.Error("Handling incoming message", "error", err1)
}
}
if err != nil && !errors.Is(err, io.EOF) {
log.Error("Receive loop terminated", "error", err)
}
}
cancelFn()
recvMessage(ctx, sentryClient, controlServer)
// Wait before trying to reconnect to prevent log flooding
time.Sleep(2 * time.Second)
}
Expand All @@ -127,22 +111,7 @@ func Download(sentryAddr string, coreAddr string, db ethdb.Database, timeout, wi
return
default:
}
callCtx, cancelFn = context.WithCancel(ctx)
receiveUploadClient, err3 := sentryClient.ReceiveUploadMessages(callCtx, &empty.Empty{}, &grpc.EmptyCallOption{})
if err3 != nil {
log.Error("Receive upload messages failed", "error", err3)
} else {
inreq, err := receiveUploadClient.Recv()
for ; err == nil; inreq, err = receiveUploadClient.Recv() {
if err1 := controlServer.handleInboundMessage(ctx, inreq); err1 != nil {
log.Error("Handling incoming message", "error", err1)
}
}
if err != nil && !errors.Is(err, io.EOF) {
log.Error("Receive upload loop terminated", "error", err)
}
}
cancelFn()
recvUploadMessage(ctx, sentryClient, controlServer)
// Wait before trying to reconnect to prevent log flooding
time.Sleep(2 * time.Second)
}
Expand All @@ -167,6 +136,56 @@ func Download(sentryAddr string, coreAddr string, db ethdb.Database, timeout, wi
return nil
}

func recvUploadMessage(ctx context.Context, sentryClient proto_sentry.SentryClient, controlServer *ControlServerImpl) {
streamCtx, cancel := context.WithCancel(ctx)
defer cancel()

receiveUploadClient, err3 := sentryClient.ReceiveUploadMessages(streamCtx, &empty.Empty{}, &grpc.EmptyCallOption{})
if err3 != nil {
log.Error("Receive upload messages failed", "error", err3)
return
}

for inreq, err := receiveUploadClient.Recv(); ; inreq, err = receiveUploadClient.Recv() {
if err != nil {
if !errors.Is(err, io.EOF) {
log.Error("Receive upload loop terminated", "error", err)
return
}
return
}

if err = controlServer.handleInboundMessage(ctx, inreq); err != nil {
log.Error("Handling incoming message", "error", err)
}
}
}

func recvMessage(ctx context.Context, sentryClient proto_sentry.SentryClient, controlServer *ControlServerImpl) {
streamCtx, cancel := context.WithCancel(ctx)
defer cancel()

receiveClient, err2 := sentryClient.ReceiveMessages(streamCtx, &empty.Empty{}, &grpc.EmptyCallOption{})
if err2 != nil {
log.Error("Receive messages failed", "error", err2)
return
}

for inreq, err := receiveClient.Recv(); ; inreq, err = receiveClient.Recv() {
if err != nil {
if !errors.Is(err, io.EOF) {
log.Error("Receive loop terminated", "error", err)
return
}
return
}

if err = controlServer.handleInboundMessage(ctx, inreq); err != nil {
log.Error("Handling incoming message", "error", err)
}
}
}

// Combined creates and starts sentry and downloader in the same process
func Combined(natSetting string, port int, staticPeers []string, discovery bool, netRestrict string, db ethdb.Database, timeout, window int, chain string) error {
ctx := rootContext()
Expand Down Expand Up @@ -195,48 +214,13 @@ func Combined(natSetting string, port int, staticPeers []string, discovery bool,
Forks: controlServer.forks,
},
}
callCtx, cancelFn := context.WithCancel(ctx)
if _, err = sentryClient.SetStatus(callCtx, statusMsg, &grpc.EmptyCallOption{}); err != nil {
cancelFn()

if _, err := sentryClient.SetStatus(ctx, statusMsg, &grpc.EmptyCallOption{}); err != nil {
return fmt.Errorf("setting initial status message: %w", err)
}
cancelFn()
callCtx, cancelFn = context.WithCancel(ctx)
receiveClient, err2 := sentryClient.ReceiveMessages(callCtx, &empty.Empty{}, &grpc.EmptyCallOption{})
if err2 != nil {
cancelFn()
return fmt.Errorf("receive messages failed: %w", err2)
}
cancelFn()
go func() {
inreq, err := receiveClient.Recv()
for ; err == nil; inreq, err = receiveClient.Recv() {
if err1 := controlServer.handleInboundMessage(ctx, inreq); err1 != nil {
log.Error("Handling incoming message", "error", err1)
}
}
if err != nil && !errors.Is(err, io.EOF) {
log.Error("Receive loop terminated", "error", err)
}
}()
callCtx, cancelFn = context.WithCancel(ctx)
receiveUploadClient, err3 := sentryClient.ReceiveUploadMessages(callCtx, &empty.Empty{}, &grpc.EmptyCallOption{})
if err3 != nil {
cancelFn()
return fmt.Errorf("receive upload messages failed: %w", err3)
}
cancelFn()
go func() {
inreq, err := receiveUploadClient.Recv()
for ; err == nil; inreq, err = receiveUploadClient.Recv() {
if err1 := controlServer.handleInboundMessage(ctx, inreq); err1 != nil {
log.Error("Handling incoming message", "error", err1)
}
}
if err != nil && !errors.Is(err, io.EOF) {
log.Error("Receive loop terminated", "error", err)
}
}()

go recvMessage(ctx, sentryClient, controlServer)
go recvUploadMessage(ctx, sentryClient, controlServer)

if err := stages.StageLoop(
ctx,
Expand Down Expand Up @@ -346,9 +330,7 @@ func (cs *ControlServerImpl) updateHead(ctx context.Context, height uint64, hash
Forks: cs.forks,
},
}
callCtx, cancel := context.WithCancel(ctx)
defer cancel()
if _, err := cs.sentryClient.SetStatus(callCtx, statusMsg, &grpc.EmptyCallOption{}); err != nil {
if _, err := cs.sentryClient.SetStatus(ctx, statusMsg, &grpc.EmptyCallOption{}); err != nil {
log.Error("Update status message for the sentry", "error", err)
}
}
Expand Down Expand Up @@ -377,21 +359,15 @@ func (cs *ControlServerImpl) newBlockHashes(ctx context.Context, inreq *proto_se
Data: b,
},
}
if err := sendMessageById(ctx, cs.sentryClient, &outreq); err != nil {

if _, err = cs.sentryClient.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{}); err != nil {
return fmt.Errorf("send header request: %v", err)
}
}
}
return nil
}

func sendMessageById(ctx context.Context, sentryClient proto_sentry.SentryClient, outreq *proto_sentry.SendMessageByIdRequest) error {
callCtx, cancel := context.WithCancel(ctx)
defer cancel()
_, err := sentryClient.SendMessageById(callCtx, outreq, &grpc.EmptyCallOption{})
return err
}

func (cs *ControlServerImpl) blockHeaders(ctx context.Context, inreq *proto_sentry.InboundMessage) error {
rlpStream := rlp.NewStream(bytes.NewReader(inreq.Data), uint64(len(inreq.Data)))
_, err := rlpStream.List()
Expand All @@ -400,12 +376,16 @@ func (cs *ControlServerImpl) blockHeaders(ctx context.Context, inreq *proto_sent
}
var headersRaw [][]byte
var headerRaw []byte
for headerRaw, err = rlpStream.Raw(); err == nil; headerRaw, err = rlpStream.Raw() {
for headerRaw, err = rlpStream.Raw(); ; headerRaw, err = rlpStream.Raw() {
if err != nil {
if !errors.Is(err, rlp.EOL) {
return fmt.Errorf("decode BlockHeaders: %w", err)
}
break
}

headersRaw = append(headersRaw, headerRaw)
}
if err != nil && !errors.Is(err, rlp.EOL) {
return fmt.Errorf("decode BlockHeaders: %w", err)
}
headers := make([]*types.Header, len(headersRaw))
var heighestBlock uint64
var i int
Expand All @@ -429,9 +409,7 @@ func (cs *ControlServerImpl) blockHeaders(ctx context.Context, inreq *proto_sent
PeerId: inreq.PeerId,
Penalty: proto_sentry.PenaltyKind_Kick, // TODO: Extend penalty kinds
}
callCtx, cancel := context.WithCancel(ctx)
defer cancel()
if _, err1 := cs.sentryClient.PenalizePeer(callCtx, &outreq, &grpc.EmptyCallOption{}); err1 != nil {
if _, err1 := cs.sentryClient.PenalizePeer(ctx, &outreq, &grpc.EmptyCallOption{}); err1 != nil {
log.Error("Could not send penalty", "err", err1)
}
}
Expand All @@ -442,9 +420,7 @@ func (cs *ControlServerImpl) blockHeaders(ctx context.Context, inreq *proto_sent
PeerId: inreq.PeerId,
MinBlock: heighestBlock,
}
callCtx, cancel := context.WithCancel(ctx)
defer cancel()
if _, err1 := cs.sentryClient.PeerMinBlock(callCtx, &outreq, &grpc.EmptyCallOption{}); err1 != nil {
if _, err1 := cs.sentryClient.PeerMinBlock(ctx, &outreq, &grpc.EmptyCallOption{}); err1 != nil {
log.Error("Could not send min block for peer", "err", err1)
}
//log.Info("HeadersMsg processed")
Expand Down Expand Up @@ -479,9 +455,7 @@ func (cs *ControlServerImpl) newBlock(ctx context.Context, inreq *proto_sentry.I
PeerId: inreq.PeerId,
Penalty: proto_sentry.PenaltyKind_Kick, // TODO: Extend penalty kinds
}
callCtx, cancel := context.WithCancel(ctx)
defer cancel()
if _, err1 := cs.sentryClient.PenalizePeer(callCtx, &outreq, &grpc.EmptyCallOption{}); err1 != nil {
if _, err1 := cs.sentryClient.PenalizePeer(ctx, &outreq, &grpc.EmptyCallOption{}); err1 != nil {
log.Error("Could not send penalty", "err", err1)
}
}
Expand All @@ -493,9 +467,7 @@ func (cs *ControlServerImpl) newBlock(ctx context.Context, inreq *proto_sentry.I
PeerId: inreq.PeerId,
MinBlock: request.Block.NumberU64(),
}
callCtx, cancel := context.WithCancel(ctx)
defer cancel()
if _, err1 := cs.sentryClient.PeerMinBlock(callCtx, &outreq, &grpc.EmptyCallOption{}); err1 != nil {
if _, err1 := cs.sentryClient.PeerMinBlock(ctx, &outreq, &grpc.EmptyCallOption{}); err1 != nil {
log.Error("Could not send min block for peer", "err", err1)
}
log.Info(fmt.Sprintf("NewBlockMsg{blockNumber: %d} from [%s]", request.Block.NumberU64(), gointerfaces.ConvertH512ToBytes(inreq.PeerId)))
Expand Down Expand Up @@ -665,9 +637,7 @@ func (cs *ControlServerImpl) getBlockHeaders(ctx context.Context, inreq *proto_s
Data: b,
},
}
callCtx, cancel := context.WithCancel(ctx)
defer cancel()
_, err = cs.sentryClient.SendMessageById(callCtx, &outreq, &grpc.EmptyCallOption{})
_, err = cs.sentryClient.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{})
if err != nil {
return fmt.Errorf("send header response: %v", err)
}
Expand Down Expand Up @@ -721,9 +691,7 @@ func (cs *ControlServerImpl) getBlockBodies(ctx context.Context, inreq *proto_se
Data: b,
},
}
callCtx, cancel := context.WithCancel(ctx)
defer cancel()
_, err = cs.sentryClient.SendMessageById(callCtx, &outreq, &grpc.EmptyCallOption{})
_, err = cs.sentryClient.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{})
if err != nil {
return fmt.Errorf("send bodies response: %v", err)
}
Expand Down Expand Up @@ -787,9 +755,7 @@ func (cs *ControlServerImpl) sendHeaderRequest(ctx context.Context, req *headerd
Data: bytes,
},
}
callCtx, cancel := context.WithCancel(ctx)
defer cancel()
sentPeers, err1 := cs.sentryClient.SendMessageByMinBlock(callCtx, &outreq, &grpc.EmptyCallOption{})
sentPeers, err1 := cs.sentryClient.SendMessageByMinBlock(ctx, &outreq, &grpc.EmptyCallOption{})
if err1 != nil {
log.Error("Could not send header request", "err", err1)
return nil
Expand All @@ -816,9 +782,7 @@ func (cs *ControlServerImpl) sendBodyRequest(ctx context.Context, req *bodydownl
Data: bytes,
},
}
callCtx, cancel := context.WithCancel(ctx)
defer cancel()
sentPeers, err1 := cs.sentryClient.SendMessageByMinBlock(callCtx, &outreq, &grpc.EmptyCallOption{})
sentPeers, err1 := cs.sentryClient.SendMessageByMinBlock(ctx, &outreq, &grpc.EmptyCallOption{})
if err1 != nil {
log.Error("Could not send block bodies request", "err", err1)
return nil
Expand All @@ -831,9 +795,7 @@ func (cs *ControlServerImpl) sendBodyRequest(ctx context.Context, req *bodydownl

func (cs *ControlServerImpl) penalise(ctx context.Context, peer []byte) {
penalizeReq := proto_sentry.PenalizePeerRequest{PeerId: gointerfaces.ConvertBytesToH512(peer), Penalty: proto_sentry.PenaltyKind_Kick}
callCtx, cancel := context.WithCancel(ctx)
defer cancel()
if _, err := cs.sentryClient.PenalizePeer(callCtx, &penalizeReq, &grpc.EmptyCallOption{}); err != nil {
if _, err := cs.sentryClient.PenalizePeer(ctx, &penalizeReq, &grpc.EmptyCallOption{}); err != nil {
log.Error("Could not penalise", "peer", peer, "error", err)
}
}
19 changes: 12 additions & 7 deletions cmd/headers/download/sentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ func runPeer(
return fmt.Errorf("handshake to peer %s: %v", peerID, err)
}
// Read handshake message
msg, err := rw.ReadMsg()
if err != nil {
return err
msg, err1 := rw.ReadMsg()
if err1 != nil {
return err1
}

if msg.Code != eth.StatusMsg {
Expand All @@ -193,9 +193,9 @@ func runPeer(
}
// Decode the handshake and make sure everything matches
var status eth.StatusPacket
if err = msg.Decode(&status); err != nil {
if err1 = msg.Decode(&status); err1 != nil {
msg.Discard()
return fmt.Errorf("decode message %v: %v", msg, err)
return fmt.Errorf("decode message %v: %v", msg, err1)
}
msg.Discard()
if status.NetworkID != networkID {
Expand All @@ -207,12 +207,17 @@ func runPeer(
if status.Genesis != genesisHash {
return fmt.Errorf("genesis hash does not match: theirs %x, ours %x", status.Genesis, genesisHash)
}
if err = forkFilter(status.ForkID); err != nil {
return fmt.Errorf("%v", err)
if err1 = forkFilter(status.ForkID); err1 != nil {
return fmt.Errorf("%v", err1)
}
//log.Info(fmt.Sprintf("[%s] Received status message OK", peerID), "name", peer.Name())

for {
var err error
if err = common.Stopped(ctx.Done()); err != nil {
return err
}

if _, ok := peerRwMap.Load(peerID); !ok {
return fmt.Errorf("peer has been penalized")
}
Expand Down

0 comments on commit 3350f06

Please sign in to comment.