Skip to content

Commit

Permalink
check chainstart (#7494)
Browse files Browse the repository at this point in the history
Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
  • Loading branch information
nisdas and rauljordan committed Oct 15, 2020
1 parent 1caf2ca commit 0b64a33
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 12 deletions.
7 changes: 6 additions & 1 deletion beacon-chain/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (s *Service) registerHandlers() {
time.Sleep(timeutils.Until(startTime))
}
log.WithField("starttime", startTime).Debug("Chain started in sync service")
s.chainStarted = true
s.markForChainStart()
}()
case statefeed.Synced:
_, ok := event.Data.(*statefeed.SyncedData)
Expand All @@ -277,6 +277,11 @@ func (s *Service) registerHandlers() {
}
}

// marks the chain as having started.
func (s *Service) markForChainStart() {
s.chainStarted = true
}

// Checker defines a struct which can verify whether a node is currently
// synchronizing a chain with the rest of peers in the network.
type Checker interface {
Expand Down
32 changes: 32 additions & 0 deletions beacon-chain/sync/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,38 @@ func TestSyncHandlers_WaitToSync(t *testing.T) {
require.Equal(t, true, r.chainStarted, "Did not receive chain start event.")
}

func TestSyncHandlers_WaitForChainStart(t *testing.T) {
p2p := p2ptest.NewTestP2P(t)
chainService := &mockChain.ChainService{
Genesis: time.Now(),
ValidatorsRoot: [32]byte{'A'},
}
r := Service{
ctx: context.Background(),
p2p: p2p,
chain: chainService,
stateNotifier: chainService.StateNotifier(),
initialSync: &mockSync.Sync{IsSyncing: false},
}

go r.registerHandlers()
time.Sleep(100 * time.Millisecond)
i := r.stateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{
StartTime: time.Now().Add(2 * time.Second),
},
})
if i == 0 {
t.Fatal("didn't send genesis time to subscribers")
}
require.Equal(t, false, r.chainStarted, "Chainstart was marked prematurely")

// wait for chainstart to be sent
time.Sleep(3 * time.Second)
require.Equal(t, true, r.chainStarted, "Did not receive chain start event.")
}

func TestSyncHandlers_WaitTillSynced(t *testing.T) {
p2p := p2ptest.NewTestP2P(t)
chainService := &mockChain.ChainService{
Expand Down
9 changes: 7 additions & 2 deletions beacon-chain/sync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *Service) subscribeWithBase(base proto.Message, topic string, validator
topic += s.p2p.Encoding().ProtocolSuffix()
log := log.WithField("topic", topic)

if err := s.p2p.PubSub().RegisterTopicValidator(wrapAndReportValidation(topic, validator)); err != nil {
if err := s.p2p.PubSub().RegisterTopicValidator(s.wrapAndReportValidation(topic, validator)); err != nil {
log.WithError(err).Error("Failed to register validator")
}

Expand Down Expand Up @@ -157,13 +157,18 @@ func (s *Service) subscribeWithBase(base proto.Message, topic string, validator

// Wrap the pubsub validator with a metric monitoring function. This function increments the
// appropriate counter if the particular message fails to validate.
func wrapAndReportValidation(topic string, v pubsub.ValidatorEx) (string, pubsub.ValidatorEx) {
func (s *Service) wrapAndReportValidation(topic string, v pubsub.ValidatorEx) (string, pubsub.ValidatorEx) {
return topic, func(ctx context.Context, pid peer.ID, msg *pubsub.Message) (res pubsub.ValidationResult) {
defer messagehandler.HandlePanic(ctx, msg)
res = pubsub.ValidationIgnore // Default: ignore any message that panics.
ctx, cancel := context.WithTimeout(ctx, pubsubMessageTimeout)
defer cancel()
messageReceivedCounter.WithLabelValues(topic).Inc()
// Reject any messages received before chainstart.
if !s.chainStarted {
messageFailedValidationCounter.WithLabelValues(topic).Inc()
return pubsub.ValidationReject
}
b := v(ctx, pid, msg)
if b == pubsub.ValidationReject {
messageFailedValidationCounter.WithLabelValues(topic).Inc()
Expand Down
35 changes: 26 additions & 9 deletions beacon-chain/sync/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestSubscribe_ReceivesValidMessage(t *testing.T) {
wg.Done()
return nil
})
r.chainStarted = true
r.markForChainStart()

p2p.ReceivePubSub(topic, &pb.SignedVoluntaryExit{Exit: &pb.VoluntaryExit{Epoch: 55}, Signature: make([]byte, 96)})

Expand Down Expand Up @@ -94,7 +94,7 @@ func TestSubscribe_ReceivesAttesterSlashing(t *testing.T) {
})
beaconState, privKeys := testutil.DeterministicGenesisState(t, 64)
chainService.State = beaconState
r.chainStarted = true
r.markForChainStart()
attesterSlashing, err := testutil.GenerateAttesterSlashingForValidator(
beaconState,
privKeys[1],
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestSubscribe_ReceivesProposerSlashing(t *testing.T) {
})
beaconState, privKeys := testutil.DeterministicGenesisState(t, 64)
chainService.State = beaconState
r.chainStarted = true
r.markForChainStart()
proposerSlashing, err := testutil.GenerateProposerSlashingForValidator(
beaconState,
privKeys[1],
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestSubscribe_HandlesPanic(t *testing.T) {
defer wg.Done()
panic("bad")
})
r.chainStarted = true
r.markForChainStart()
p.ReceivePubSub(topic, &pb.SignedVoluntaryExit{Exit: &pb.VoluntaryExit{Epoch: 55}, Signature: make([]byte, 96)})

if testutil.WaitTimeout(&wg, time.Second) {
Expand Down Expand Up @@ -251,23 +251,36 @@ func TestStaticSubnets(t *testing.T) {

func Test_wrapAndReportValidation(t *testing.T) {
type args struct {
topic string
v pubsub.ValidatorEx
pid peer.ID
msg *pubsub.Message
topic string
v pubsub.ValidatorEx
chainstarted bool
pid peer.ID
msg *pubsub.Message
}
tests := []struct {
name string
args args
want pubsub.ValidationResult
}{
{
name: "validator Before chainstart",
args: args{
topic: "foo",
v: func(ctx context.Context, id peer.ID, message *pubsub.Message) pubsub.ValidationResult {
return pubsub.ValidationAccept
},
chainstarted: false,
},
want: pubsub.ValidationReject,
},
{
name: "validator panicked",
args: args{
topic: "foo",
v: func(ctx context.Context, id peer.ID, message *pubsub.Message) pubsub.ValidationResult {
panic("oh no!")
},
chainstarted: true,
},
want: pubsub.ValidationIgnore,
},
Expand All @@ -278,13 +291,17 @@ func Test_wrapAndReportValidation(t *testing.T) {
v: func(ctx context.Context, id peer.ID, message *pubsub.Message) pubsub.ValidationResult {
return pubsub.ValidationAccept
},
chainstarted: true,
},
want: pubsub.ValidationAccept,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, v := wrapAndReportValidation(tt.args.topic, tt.args.v)
s := &Service{
chainStarted: tt.args.chainstarted,
}
_, v := s.wrapAndReportValidation(tt.args.topic, tt.args.v)
got := v(context.Background(), tt.args.pid, tt.args.msg)
if got != tt.want {
t.Errorf("wrapAndReportValidation() got = %v, want %v", got, tt.want)
Expand Down

0 comments on commit 0b64a33

Please sign in to comment.