From 1224e75ba1af59d5ed074db766d0bd4e9b77ea64 Mon Sep 17 00:00:00 2001 From: Ivan Martinez Date: Sat, 18 Apr 2020 06:45:20 -0400 Subject: [PATCH] Add WaitForSynced to beacon node for validator startup (#5366) * Add WaitForSynced to beacon-chain * Fix build issues * Fix comment * Merge branch 'master' of https://github.com/prysmaticlabs/prysm into simplify-validator * Fix tests * Merge branch 'master' of https://github.com/prysmaticlabs/prysm into simplify-validator * Merge branch 'master' of https://github.com/prysmaticlabs/prysm into simplify-validator * Merge branch 'master' into simplify-validator * Merge branch 'master' into simplify-validator * Merge branch 'master' into simplify-validator --- beacon-chain/core/feed/state/events.go | 8 + .../beacon_node_validator_service_mock.go | 204 ++++++++++++++---- beacon-chain/rpc/validator/server.go | 47 +++- beacon-chain/rpc/validator/server_test.go | 119 ++++++++++ beacon-chain/sync/initial-sync/service.go | 18 ++ 5 files changed, 349 insertions(+), 47 deletions(-) diff --git a/beacon-chain/core/feed/state/events.go b/beacon-chain/core/feed/state/events.go index 6789267002ac..f1b5caee1559 100644 --- a/beacon-chain/core/feed/state/events.go +++ b/beacon-chain/core/feed/state/events.go @@ -9,6 +9,8 @@ const ( ChainStarted // Initialized is sent when the internal beacon node's state is ready to be accessed. Initialized + // Synced is sent when the beacon node has completed syncing and is ready to participate in the network. + Synced ) // BlockProcessedData is the data sent with BlockProcessed events. @@ -27,6 +29,12 @@ type ChainStartedData struct { StartTime time.Time } +// SyncedData is the data sent with Synced events. +type SyncedData struct { + // StartTime is the time at which the chain started. + StartTime time.Time +} + // InitializedData is the data sent with Initialized events. type InitializedData struct { // StartTime is the time at which the chain started. diff --git a/beacon-chain/rpc/testing/beacon_node_validator_service_mock.go b/beacon-chain/rpc/testing/beacon_node_validator_service_mock.go index 56b2070e483e..36642b5802d9 100644 --- a/beacon-chain/rpc/testing/beacon_node_validator_service_mock.go +++ b/beacon-chain/rpc/testing/beacon_node_validator_service_mock.go @@ -1,42 +1,160 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/prysmaticlabs/ethereumapis/eth/v1alpha1 (interfaces: BeaconNodeValidator_WaitForActivationServer,BeaconNodeValidator_WaitForChainStartServer) +// Source: github.com/prysmaticlabs/ethereumapis/eth/v1alpha1 (interfaces: BeaconNodeValidator_WaitForSyncedServer,BeaconNodeValidator_WaitForActivationServer,BeaconNodeValidator_WaitForChainStartServer) -// Package testing is a generated GoMock package. +// Package mock is a generated GoMock package. package testing import ( context "context" - reflect "reflect" - gomock "github.com/golang/mock/gomock" - v1alpha1 "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" metadata "google.golang.org/grpc/metadata" + reflect "reflect" ) -// MockBeaconNodeValidator_WaitForActivationServer is a mock of BeaconNodeValidator_WaitForActivationServer interface +// MockBeaconNodeValidator_WaitForSyncedServer is a mock of BeaconNodeValidator_WaitForSyncedServer interface. +type MockBeaconNodeValidator_WaitForSyncedServer struct { + ctrl *gomock.Controller + recorder *MockBeaconNodeValidator_WaitForSyncedServerMockRecorder +} + +// MockBeaconNodeValidator_WaitForSyncedServerMockRecorder is the mock recorder for MockBeaconNodeValidator_WaitForSyncedServer. +type MockBeaconNodeValidator_WaitForSyncedServerMockRecorder struct { + mock *MockBeaconNodeValidator_WaitForSyncedServer +} + +// NewMockBeaconNodeValidator_WaitForSyncedServer creates a new mock instance. +func NewMockBeaconNodeValidator_WaitForSyncedServer(ctrl *gomock.Controller) *MockBeaconNodeValidator_WaitForSyncedServer { + mock := &MockBeaconNodeValidator_WaitForSyncedServer{ctrl: ctrl} + mock.recorder = &MockBeaconNodeValidator_WaitForSyncedServerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockBeaconNodeValidator_WaitForSyncedServer) EXPECT() *MockBeaconNodeValidator_WaitForSyncedServerMockRecorder { + return m.recorder +} + +// Context mocks base method. +func (m *MockBeaconNodeValidator_WaitForSyncedServer) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockBeaconNodeValidator_WaitForSyncedServerMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockBeaconNodeValidator_WaitForSyncedServer)(nil).Context)) +} + +// RecvMsg mocks base method. +func (m *MockBeaconNodeValidator_WaitForSyncedServer) RecvMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecvMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. +func (mr *MockBeaconNodeValidator_WaitForSyncedServerMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockBeaconNodeValidator_WaitForSyncedServer)(nil).RecvMsg), arg0) +} + +// Send mocks base method. +func (m *MockBeaconNodeValidator_WaitForSyncedServer) Send(arg0 *eth.SyncedResponse) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Send", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Send indicates an expected call of Send. +func (mr *MockBeaconNodeValidator_WaitForSyncedServerMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockBeaconNodeValidator_WaitForSyncedServer)(nil).Send), arg0) +} + +// SendHeader mocks base method. +func (m *MockBeaconNodeValidator_WaitForSyncedServer) SendHeader(arg0 metadata.MD) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendHeader", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendHeader indicates an expected call of SendHeader. +func (mr *MockBeaconNodeValidator_WaitForSyncedServerMockRecorder) SendHeader(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendHeader", reflect.TypeOf((*MockBeaconNodeValidator_WaitForSyncedServer)(nil).SendHeader), arg0) +} + +// SendMsg mocks base method. +func (m *MockBeaconNodeValidator_WaitForSyncedServer) SendMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockBeaconNodeValidator_WaitForSyncedServerMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockBeaconNodeValidator_WaitForSyncedServer)(nil).SendMsg), arg0) +} + +// SetHeader mocks base method. +func (m *MockBeaconNodeValidator_WaitForSyncedServer) SetHeader(arg0 metadata.MD) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetHeader", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SetHeader indicates an expected call of SetHeader. +func (mr *MockBeaconNodeValidator_WaitForSyncedServerMockRecorder) SetHeader(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHeader", reflect.TypeOf((*MockBeaconNodeValidator_WaitForSyncedServer)(nil).SetHeader), arg0) +} + +// SetTrailer mocks base method. +func (m *MockBeaconNodeValidator_WaitForSyncedServer) SetTrailer(arg0 metadata.MD) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetTrailer", arg0) +} + +// SetTrailer indicates an expected call of SetTrailer. +func (mr *MockBeaconNodeValidator_WaitForSyncedServerMockRecorder) SetTrailer(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTrailer", reflect.TypeOf((*MockBeaconNodeValidator_WaitForSyncedServer)(nil).SetTrailer), arg0) +} + +// MockBeaconNodeValidator_WaitForActivationServer is a mock of BeaconNodeValidator_WaitForActivationServer interface. type MockBeaconNodeValidator_WaitForActivationServer struct { ctrl *gomock.Controller recorder *MockBeaconNodeValidator_WaitForActivationServerMockRecorder } -// MockBeaconNodeValidator_WaitForActivationServerMockRecorder is the mock recorder for MockBeaconNodeValidator_WaitForActivationServer +// MockBeaconNodeValidator_WaitForActivationServerMockRecorder is the mock recorder for MockBeaconNodeValidator_WaitForActivationServer. type MockBeaconNodeValidator_WaitForActivationServerMockRecorder struct { mock *MockBeaconNodeValidator_WaitForActivationServer } -// NewMockBeaconNodeValidator_WaitForActivationServer creates a new mock instance +// NewMockBeaconNodeValidator_WaitForActivationServer creates a new mock instance. func NewMockBeaconNodeValidator_WaitForActivationServer(ctrl *gomock.Controller) *MockBeaconNodeValidator_WaitForActivationServer { mock := &MockBeaconNodeValidator_WaitForActivationServer{ctrl: ctrl} mock.recorder = &MockBeaconNodeValidator_WaitForActivationServerMockRecorder{mock} return mock } -// EXPECT returns an object that allows the caller to indicate expected use +// EXPECT returns an object that allows the caller to indicate expected use. func (m *MockBeaconNodeValidator_WaitForActivationServer) EXPECT() *MockBeaconNodeValidator_WaitForActivationServerMockRecorder { return m.recorder } -// Context mocks base method +// Context mocks base method. func (m *MockBeaconNodeValidator_WaitForActivationServer) Context() context.Context { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Context") @@ -44,13 +162,13 @@ func (m *MockBeaconNodeValidator_WaitForActivationServer) Context() context.Cont return ret0 } -// Context indicates an expected call of Context +// Context indicates an expected call of Context. func (mr *MockBeaconNodeValidator_WaitForActivationServerMockRecorder) Context() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockBeaconNodeValidator_WaitForActivationServer)(nil).Context)) } -// RecvMsg mocks base method +// RecvMsg mocks base method. func (m *MockBeaconNodeValidator_WaitForActivationServer) RecvMsg(arg0 interface{}) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RecvMsg", arg0) @@ -58,27 +176,27 @@ func (m *MockBeaconNodeValidator_WaitForActivationServer) RecvMsg(arg0 interface return ret0 } -// RecvMsg indicates an expected call of RecvMsg +// RecvMsg indicates an expected call of RecvMsg. func (mr *MockBeaconNodeValidator_WaitForActivationServerMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockBeaconNodeValidator_WaitForActivationServer)(nil).RecvMsg), arg0) } -// Send mocks base method -func (m *MockBeaconNodeValidator_WaitForActivationServer) Send(arg0 *v1alpha1.ValidatorActivationResponse) error { +// Send mocks base method. +func (m *MockBeaconNodeValidator_WaitForActivationServer) Send(arg0 *eth.ValidatorActivationResponse) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Send", arg0) ret0, _ := ret[0].(error) return ret0 } -// Send indicates an expected call of Send +// Send indicates an expected call of Send. func (mr *MockBeaconNodeValidator_WaitForActivationServerMockRecorder) Send(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockBeaconNodeValidator_WaitForActivationServer)(nil).Send), arg0) } -// SendHeader mocks base method +// SendHeader mocks base method. func (m *MockBeaconNodeValidator_WaitForActivationServer) SendHeader(arg0 metadata.MD) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SendHeader", arg0) @@ -86,13 +204,13 @@ func (m *MockBeaconNodeValidator_WaitForActivationServer) SendHeader(arg0 metada return ret0 } -// SendHeader indicates an expected call of SendHeader +// SendHeader indicates an expected call of SendHeader. func (mr *MockBeaconNodeValidator_WaitForActivationServerMockRecorder) SendHeader(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendHeader", reflect.TypeOf((*MockBeaconNodeValidator_WaitForActivationServer)(nil).SendHeader), arg0) } -// SendMsg mocks base method +// SendMsg mocks base method. func (m *MockBeaconNodeValidator_WaitForActivationServer) SendMsg(arg0 interface{}) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SendMsg", arg0) @@ -100,13 +218,13 @@ func (m *MockBeaconNodeValidator_WaitForActivationServer) SendMsg(arg0 interface return ret0 } -// SendMsg indicates an expected call of SendMsg +// SendMsg indicates an expected call of SendMsg. func (mr *MockBeaconNodeValidator_WaitForActivationServerMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockBeaconNodeValidator_WaitForActivationServer)(nil).SendMsg), arg0) } -// SetHeader mocks base method +// SetHeader mocks base method. func (m *MockBeaconNodeValidator_WaitForActivationServer) SetHeader(arg0 metadata.MD) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SetHeader", arg0) @@ -114,48 +232,48 @@ func (m *MockBeaconNodeValidator_WaitForActivationServer) SetHeader(arg0 metadat return ret0 } -// SetHeader indicates an expected call of SetHeader +// SetHeader indicates an expected call of SetHeader. func (mr *MockBeaconNodeValidator_WaitForActivationServerMockRecorder) SetHeader(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHeader", reflect.TypeOf((*MockBeaconNodeValidator_WaitForActivationServer)(nil).SetHeader), arg0) } -// SetTrailer mocks base method +// SetTrailer mocks base method. func (m *MockBeaconNodeValidator_WaitForActivationServer) SetTrailer(arg0 metadata.MD) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetTrailer", arg0) } -// SetTrailer indicates an expected call of SetTrailer +// SetTrailer indicates an expected call of SetTrailer. func (mr *MockBeaconNodeValidator_WaitForActivationServerMockRecorder) SetTrailer(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTrailer", reflect.TypeOf((*MockBeaconNodeValidator_WaitForActivationServer)(nil).SetTrailer), arg0) } -// MockBeaconNodeValidator_WaitForChainStartServer is a mock of BeaconNodeValidator_WaitForChainStartServer interface +// MockBeaconNodeValidator_WaitForChainStartServer is a mock of BeaconNodeValidator_WaitForChainStartServer interface. type MockBeaconNodeValidator_WaitForChainStartServer struct { ctrl *gomock.Controller recorder *MockBeaconNodeValidator_WaitForChainStartServerMockRecorder } -// MockBeaconNodeValidator_WaitForChainStartServerMockRecorder is the mock recorder for MockBeaconNodeValidator_WaitForChainStartServer +// MockBeaconNodeValidator_WaitForChainStartServerMockRecorder is the mock recorder for MockBeaconNodeValidator_WaitForChainStartServer. type MockBeaconNodeValidator_WaitForChainStartServerMockRecorder struct { mock *MockBeaconNodeValidator_WaitForChainStartServer } -// NewMockBeaconNodeValidator_WaitForChainStartServer creates a new mock instance +// NewMockBeaconNodeValidator_WaitForChainStartServer creates a new mock instance. func NewMockBeaconNodeValidator_WaitForChainStartServer(ctrl *gomock.Controller) *MockBeaconNodeValidator_WaitForChainStartServer { mock := &MockBeaconNodeValidator_WaitForChainStartServer{ctrl: ctrl} mock.recorder = &MockBeaconNodeValidator_WaitForChainStartServerMockRecorder{mock} return mock } -// EXPECT returns an object that allows the caller to indicate expected use +// EXPECT returns an object that allows the caller to indicate expected use. func (m *MockBeaconNodeValidator_WaitForChainStartServer) EXPECT() *MockBeaconNodeValidator_WaitForChainStartServerMockRecorder { return m.recorder } -// Context mocks base method +// Context mocks base method. func (m *MockBeaconNodeValidator_WaitForChainStartServer) Context() context.Context { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Context") @@ -163,13 +281,13 @@ func (m *MockBeaconNodeValidator_WaitForChainStartServer) Context() context.Cont return ret0 } -// Context indicates an expected call of Context +// Context indicates an expected call of Context. func (mr *MockBeaconNodeValidator_WaitForChainStartServerMockRecorder) Context() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockBeaconNodeValidator_WaitForChainStartServer)(nil).Context)) } -// RecvMsg mocks base method +// RecvMsg mocks base method. func (m *MockBeaconNodeValidator_WaitForChainStartServer) RecvMsg(arg0 interface{}) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RecvMsg", arg0) @@ -177,27 +295,27 @@ func (m *MockBeaconNodeValidator_WaitForChainStartServer) RecvMsg(arg0 interface return ret0 } -// RecvMsg indicates an expected call of RecvMsg +// RecvMsg indicates an expected call of RecvMsg. func (mr *MockBeaconNodeValidator_WaitForChainStartServerMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockBeaconNodeValidator_WaitForChainStartServer)(nil).RecvMsg), arg0) } -// Send mocks base method -func (m *MockBeaconNodeValidator_WaitForChainStartServer) Send(arg0 *v1alpha1.ChainStartResponse) error { +// Send mocks base method. +func (m *MockBeaconNodeValidator_WaitForChainStartServer) Send(arg0 *eth.ChainStartResponse) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Send", arg0) ret0, _ := ret[0].(error) return ret0 } -// Send indicates an expected call of Send +// Send indicates an expected call of Send. func (mr *MockBeaconNodeValidator_WaitForChainStartServerMockRecorder) Send(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockBeaconNodeValidator_WaitForChainStartServer)(nil).Send), arg0) } -// SendHeader mocks base method +// SendHeader mocks base method. func (m *MockBeaconNodeValidator_WaitForChainStartServer) SendHeader(arg0 metadata.MD) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SendHeader", arg0) @@ -205,13 +323,13 @@ func (m *MockBeaconNodeValidator_WaitForChainStartServer) SendHeader(arg0 metada return ret0 } -// SendHeader indicates an expected call of SendHeader +// SendHeader indicates an expected call of SendHeader. func (mr *MockBeaconNodeValidator_WaitForChainStartServerMockRecorder) SendHeader(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendHeader", reflect.TypeOf((*MockBeaconNodeValidator_WaitForChainStartServer)(nil).SendHeader), arg0) } -// SendMsg mocks base method +// SendMsg mocks base method. func (m *MockBeaconNodeValidator_WaitForChainStartServer) SendMsg(arg0 interface{}) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SendMsg", arg0) @@ -219,13 +337,13 @@ func (m *MockBeaconNodeValidator_WaitForChainStartServer) SendMsg(arg0 interface return ret0 } -// SendMsg indicates an expected call of SendMsg +// SendMsg indicates an expected call of SendMsg. func (mr *MockBeaconNodeValidator_WaitForChainStartServerMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockBeaconNodeValidator_WaitForChainStartServer)(nil).SendMsg), arg0) } -// SetHeader mocks base method +// SetHeader mocks base method. func (m *MockBeaconNodeValidator_WaitForChainStartServer) SetHeader(arg0 metadata.MD) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SetHeader", arg0) @@ -233,19 +351,19 @@ func (m *MockBeaconNodeValidator_WaitForChainStartServer) SetHeader(arg0 metadat return ret0 } -// SetHeader indicates an expected call of SetHeader +// SetHeader indicates an expected call of SetHeader. func (mr *MockBeaconNodeValidator_WaitForChainStartServerMockRecorder) SetHeader(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHeader", reflect.TypeOf((*MockBeaconNodeValidator_WaitForChainStartServer)(nil).SetHeader), arg0) } -// SetTrailer mocks base method +// SetTrailer mocks base method. func (m *MockBeaconNodeValidator_WaitForChainStartServer) SetTrailer(arg0 metadata.MD) { m.ctrl.T.Helper() m.ctrl.Call(m, "SetTrailer", arg0) } -// SetTrailer indicates an expected call of SetTrailer +// SetTrailer indicates an expected call of SetTrailer. func (mr *MockBeaconNodeValidator_WaitForChainStartServerMockRecorder) SetTrailer(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTrailer", reflect.TypeOf((*MockBeaconNodeValidator_WaitForChainStartServer)(nil).SetTrailer), arg0) diff --git a/beacon-chain/rpc/validator/server.go b/beacon-chain/rpc/validator/server.go index 648662c61576..5fb68764e653 100644 --- a/beacon-chain/rpc/validator/server.go +++ b/beacon-chain/rpc/validator/server.go @@ -7,6 +7,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/state/stategen" "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/params" ptypes "github.com/gogo/protobuf/types" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" @@ -90,7 +91,8 @@ func (vs *Server) WaitForActivation(req *ethpb.ValidatorActivationRequest, strea for { select { - case <-time.After(6 * time.Second): + // Pinging every slot for activation. + case <-time.After(time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second): activeValidatorExists, validatorStatuses, err := vs.multipleValidatorStatus(stream.Context(), req.PublicKeys) if err != nil { return status.Errorf(codes.Internal, "Could not fetch validator status: %v", err) @@ -196,7 +198,44 @@ func (vs *Server) WaitForChainStart(req *ptypes.Empty, stream ethpb.BeaconNodeVa } } -// WaitForSynced is to be implemented. -func (vs *Server) WaitForSynced(_ *ptypes.Empty, stream ethpb.BeaconNodeValidator_WaitForSyncedServer) error { - return status.Error(codes.Unimplemented, "not implemented") +// WaitForSynced subscribes to the state channel and ends the stream when the state channel +// indicates the beacon node has been initialized and is ready +func (vs *Server) WaitForSynced(req *ptypes.Empty, stream ethpb.BeaconNodeValidator_WaitForSyncedServer) error { + head, err := vs.HeadFetcher.HeadState(context.Background()) + if err != nil { + return status.Errorf(codes.Internal, "Could not retrieve head state: %v", err) + } + if head != nil && !vs.SyncChecker.Syncing() { + res := ðpb.SyncedResponse{ + Synced: true, + GenesisTime: head.GenesisTime(), + } + return stream.Send(res) + } + + stateChannel := make(chan *feed.Event, 1) + stateSub := vs.StateNotifier.StateFeed().Subscribe(stateChannel) + defer stateSub.Unsubscribe() + for { + select { + case event := <-stateChannel: + if event.Type == statefeed.Synced { + data, ok := event.Data.(*statefeed.SyncedData) + if !ok { + return errors.New("event data is not type *statefeed.SyncedData") + } + log.WithField("starttime", data.StartTime).Debug("Received sync completed event") + log.Info("Sending genesis time notification to connected validator clients") + res := ðpb.SyncedResponse{ + Synced: true, + GenesisTime: uint64(data.StartTime.Unix()), + } + return stream.Send(res) + } + case <-stateSub.Err(): + return status.Error(codes.Aborted, "Subscriber closed, exiting goroutine") + case <-vs.Ctx.Done(): + return status.Error(codes.Canceled, "Context canceled") + } + } } diff --git a/beacon-chain/rpc/validator/server_test.go b/beacon-chain/rpc/validator/server_test.go index 8277d00b98b8..6638b005f4a4 100644 --- a/beacon-chain/rpc/validator/server_test.go +++ b/beacon-chain/rpc/validator/server_test.go @@ -22,6 +22,7 @@ import ( internal "github.com/prysmaticlabs/prysm/beacon-chain/rpc/testing" mockRPC "github.com/prysmaticlabs/prysm/beacon-chain/rpc/testing" stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state" + mockSync "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync/testing" pbp2p "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/bls" "github.com/prysmaticlabs/prysm/shared/event" @@ -333,3 +334,121 @@ func TestWaitForChainStart_NotStartedThenLogFired(t *testing.T) { exitRoutine <- true testutil.AssertLogsContain(t, hook, "Sending genesis time") } + +func TestWaitForSynced_ContextClosed(t *testing.T) { + db := dbutil.SetupDB(t) + defer dbutil.TeardownDB(t, db) + ctx := context.Background() + + ctx, cancel := context.WithCancel(context.Background()) + chainService := &mockChain.ChainService{} + Server := &Server{ + Ctx: ctx, + ChainStartFetcher: &mockPOW.FaultyMockPOWChain{ + ChainFeed: new(event.Feed), + }, + StateNotifier: chainService.StateNotifier(), + BeaconDB: db, + HeadFetcher: chainService, + } + + exitRoutine := make(chan bool) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockStream := mockRPC.NewMockBeaconNodeValidator_WaitForSyncedServer(ctrl) + go func(tt *testing.T) { + if err := Server.WaitForSynced(&ptypes.Empty{}, mockStream); err == nil || !strings.Contains(err.Error(), "Context canceled") { + tt.Errorf("Could not call RPC method: %v", err) + } + <-exitRoutine + }(t) + cancel() + exitRoutine <- true +} + +func TestWaitForSynced_AlreadySynced(t *testing.T) { + db := dbutil.SetupDB(t) + defer dbutil.TeardownDB(t, db) + ctx := context.Background() + headBlockRoot := [32]byte{0x01, 0x02} + trie := testutil.NewBeaconState() + if err := trie.SetSlot(3); err != nil { + t.Fatal(err) + } + if err := db.SaveState(ctx, trie, headBlockRoot); err != nil { + t.Fatal(err) + } + if err := db.SaveHeadBlockRoot(ctx, headBlockRoot); err != nil { + t.Fatal(err) + } + + chainService := &mockChain.ChainService{State: trie} + Server := &Server{ + Ctx: context.Background(), + ChainStartFetcher: &mockPOW.POWChain{ + ChainFeed: new(event.Feed), + }, + BeaconDB: db, + StateNotifier: chainService.StateNotifier(), + HeadFetcher: chainService, + SyncChecker: &mockSync.Sync{IsSyncing: false}, + } + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockStream := mockRPC.NewMockBeaconNodeValidator_WaitForSyncedServer(ctrl) + mockStream.EXPECT().Send( + ðpb.SyncedResponse{ + Synced: true, + GenesisTime: uint64(time.Unix(0, 0).Unix()), + }, + ).Return(nil) + if err := Server.WaitForSynced(&ptypes.Empty{}, mockStream); err != nil { + t.Errorf("Could not call RPC method: %v", err) + } +} + +func TestWaitForSynced_NotStartedThenLogFired(t *testing.T) { + db := dbutil.SetupDB(t) + defer dbutil.TeardownDB(t, db) + + hook := logTest.NewGlobal() + chainService := &mockChain.ChainService{} + Server := &Server{ + Ctx: context.Background(), + ChainStartFetcher: &mockPOW.FaultyMockPOWChain{ + ChainFeed: new(event.Feed), + }, + BeaconDB: db, + StateNotifier: chainService.StateNotifier(), + HeadFetcher: chainService, + } + exitRoutine := make(chan bool) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockStream := mockRPC.NewMockBeaconNodeValidator_WaitForSyncedServer(ctrl) + mockStream.EXPECT().Send( + ðpb.SyncedResponse{ + Synced: true, + GenesisTime: uint64(time.Unix(0, 0).Unix()), + }, + ).Return(nil) + go func(tt *testing.T) { + if err := Server.WaitForSynced(&ptypes.Empty{}, mockStream); err != nil { + tt.Errorf("Could not call RPC method: %v", err) + } + <-exitRoutine + }(t) + + // Send in a loop to ensure it is delivered (busy wait for the service to subscribe to the state feed). + for sent := 0; sent == 0; { + sent = Server.StateNotifier.StateFeed().Send(&feed.Event{ + Type: statefeed.Synced, + Data: &statefeed.SyncedData{ + StartTime: time.Unix(0, 0), + }, + }) + } + + exitRoutine <- true + testutil.AssertLogsContain(t, hook, "Sending genesis time") +} diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 6ade46221e30..1fa04e8adffd 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -120,6 +120,12 @@ func (s *Service) Start() { if helpers.SlotToEpoch(currentSlot) == 0 { log.Info("Chain started within the last epoch - not syncing") s.synced = true + s.stateNotifier.StateFeed().Send(&feed.Event{ + Type: statefeed.Synced, + Data: &statefeed.SyncedData{ + StartTime: genesis, + }, + }) return } log.Info("Starting initial chain sync...") @@ -127,6 +133,12 @@ func (s *Service) Start() { if helpers.SlotToEpoch(s.chain.HeadSlot()) == helpers.SlotToEpoch(currentSlot) { log.Info("Already synced to the current chain head") s.synced = true + s.stateNotifier.StateFeed().Send(&feed.Event{ + Type: statefeed.Synced, + Data: &statefeed.SyncedData{ + StartTime: genesis, + }, + }) return } s.waitForMinimumPeers() @@ -135,6 +147,12 @@ func (s *Service) Start() { } log.Infof("Synced up to slot %d", s.chain.HeadSlot()) s.synced = true + s.stateNotifier.StateFeed().Send(&feed.Event{ + Type: statefeed.Synced, + Data: &statefeed.SyncedData{ + StartTime: genesis, + }, + }) } // Stop initial sync.