Skip to content

Commit

Permalink
Register Subscribers After Node Is Synced (#7468)
Browse files Browse the repository at this point in the history
* wait for synced

* fix again

* add test

* fix all

* fixes deepsource reported issue

Co-authored-by: Victor Farazdagi <simple.square@gmail.com>
  • Loading branch information
nisdas and farazdagi committed Oct 10, 2020
1 parent 4c09e59 commit 43765b5
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 66 deletions.
1 change: 1 addition & 0 deletions beacon-chain/sync/initial-sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ go_test(
"//beacon-chain/sync:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/event:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/params:go_default_library",
Expand Down
6 changes: 3 additions & 3 deletions beacon-chain/sync/initial-sync/round_robin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package initialsync

import (
"context"
"fmt"
"testing"

eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
Expand Down Expand Up @@ -333,6 +332,7 @@ func TestService_processBlock(t *testing.T) {
Epoch: 0,
},
},
StateNotifier: &mock.MockStateNotifier{},
})
ctx := context.Background()
genesis := makeGenesisTime(32)
Expand Down Expand Up @@ -392,6 +392,7 @@ func TestService_processBlockBatch(t *testing.T) {
Epoch: 0,
},
},
StateNotifier: &mock.MockStateNotifier{},
})
ctx := context.Background()
genesis := makeGenesisTime(32)
Expand Down Expand Up @@ -439,8 +440,7 @@ func TestService_processBlockBatch(t *testing.T) {
ctx context.Context, blocks []*eth.SignedBeaconBlock, blockRoots [][32]byte) error {
return nil
})
expectedErr := fmt.Sprintf("no good blocks in batch")
assert.ErrorContains(t, expectedErr, err)
assert.ErrorContains(t, "no good blocks in batch", err)

var badBatch2 []*eth.SignedBeaconBlock
for i, b := range batch2 {
Expand Down
33 changes: 18 additions & 15 deletions beacon-chain/sync/initial-sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,33 @@ type Service struct {
stateNotifier statefeed.Notifier
counter *ratecounter.RateCounter
lastProcessedSlot uint64
genesisChan chan time.Time
}

// NewService configures the initial sync service responsible for bringing the node up to the
// latest head of the blockchain.
func NewService(ctx context.Context, cfg *Config) *Service {
ctx, cancel := context.WithCancel(ctx)
return &Service{
s := &Service{
ctx: ctx,
cancel: cancel,
chain: cfg.Chain,
p2p: cfg.P2P,
db: cfg.DB,
stateNotifier: cfg.StateNotifier,
counter: ratecounter.NewRateCounter(counterSeconds * time.Second),
genesisChan: make(chan time.Time),
}
go s.waitForStateInitialization()
return s
}

// Start the initial sync service.
func (s *Service) Start() {
genesis, err := s.waitForStateInitialization()
if err != nil {
// Wait for state initialized event.
genesis := <-s.genesisChan
if genesis.IsZero() {
log.Debug("Exiting Initial Sync Service")
return
}
if flags.Get().DisableSync {
Expand Down Expand Up @@ -169,15 +175,7 @@ func (s *Service) waitForMinimumPeers() {

// waitForStateInitialization makes sure that beacon node is ready to be accessed: it is either
// already properly configured or system waits up until state initialized event is triggered.
func (s *Service) waitForStateInitialization() (time.Time, error) {
headState, err := s.chain.HeadState(s.ctx)
if err != nil {
return time.Time{}, err
}
if headState != nil {
return time.Unix(int64(headState.GenesisTime()), 0), nil
}

func (s *Service) waitForStateInitialization() {
// Wait for state to be initialized.
stateChannel := make(chan *feed.Event, 1)
stateSub := s.stateNotifier.StateFeed().Subscribe(stateChannel)
Expand All @@ -193,14 +191,19 @@ func (s *Service) waitForStateInitialization() (time.Time, error) {
continue
}
log.WithField("starttime", data.StartTime).Debug("Received state initialized event")
return data.StartTime, nil
s.genesisChan <- data.StartTime
return
}
case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")
return time.Time{}, errors.New("context closed")
// Send a zero time in the event we are exiting.
s.genesisChan <- time.Time{}
return
case err := <-stateSub.Err():
log.WithError(err).Error("Subscription to state notifier failed")
return time.Time{}, err
// Send a zero time in the event we are exiting.
s.genesisChan <- time.Time{}
return
}
}
}
Expand Down
134 changes: 97 additions & 37 deletions beacon-chain/sync/initial-sync/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
p2pt "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
Expand All @@ -32,6 +33,7 @@ func TestService_InitStartStop(t *testing.T) {
tests := []struct {
name string
assert func()
methodRuns func(fd *event.Feed)
chainService func() *mock.ChainService
}{
{
Expand All @@ -45,36 +47,55 @@ func TestService_InitStartStop(t *testing.T) {
chainService: func() *mock.ChainService {
// Set to future time (genesis time hasn't arrived yet).
st := testutil.NewBeaconState()
require.NoError(t, st.SetGenesisTime(uint64(time.Unix(4113849600, 0).Unix())))

return &mock.ChainService{
State: st,
FinalizedCheckPoint: &eth.Checkpoint{
Epoch: 0,
},
}
},
methodRuns: func(fd *event.Feed) {
// Send valid event.
fd.Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{
StartTime: time.Unix(4113849600, 0),
GenesisValidatorsRoot: make([]byte, 32),
},
})
},
assert: func() {
assert.LogsContain(t, hook, "Genesis time has not arrived - not syncing")
assert.LogsDoNotContain(t, hook, "Waiting for state to be initialized")
assert.LogsContain(t, hook, "Waiting for state to be initialized")
},
},
{
name: "zeroth epoch",
chainService: func() *mock.ChainService {
// Set to nearby slot.
st := testutil.NewBeaconState()
require.NoError(t, st.SetGenesisTime(uint64(time.Now().Add(-5*time.Minute).Unix())))
return &mock.ChainService{
State: st,
FinalizedCheckPoint: &eth.Checkpoint{
Epoch: 0,
},
}
},
methodRuns: func(fd *event.Feed) {
// Send valid event.
fd.Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{
StartTime: time.Now().Add(-5 * time.Minute),
GenesisValidatorsRoot: make([]byte, 32),
},
})
},
assert: func() {
assert.LogsContain(t, hook, "Chain started within the last epoch - not syncing")
assert.LogsDoNotContain(t, hook, "Genesis time has not arrived - not syncing")
assert.LogsDoNotContain(t, hook, "Waiting for state to be initialized")
assert.LogsContain(t, hook, "Waiting for state to be initialized")
},
},
{
Expand All @@ -83,7 +104,6 @@ func TestService_InitStartStop(t *testing.T) {
// Set to some future slot, and then make sure that current head matches it.
st := testutil.NewBeaconState()
futureSlot := uint64(27354)
require.NoError(t, st.SetGenesisTime(uint64(makeGenesisTime(futureSlot).Unix())))
require.NoError(t, st.SetSlot(futureSlot))
return &mock.ChainService{
State: st,
Expand All @@ -92,19 +112,33 @@ func TestService_InitStartStop(t *testing.T) {
},
}
},
methodRuns: func(fd *event.Feed) {
futureSlot := uint64(27354)
// Send valid event.
fd.Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{
StartTime: makeGenesisTime(futureSlot),
GenesisValidatorsRoot: make([]byte, 32),
},
})
},
assert: func() {
assert.LogsContain(t, hook, "Starting initial chain sync...")
assert.LogsContain(t, hook, "Already synced to the current chain head")
assert.LogsDoNotContain(t, hook, "Chain started within the last epoch - not syncing")
assert.LogsDoNotContain(t, hook, "Genesis time has not arrived - not syncing")
assert.LogsDoNotContain(t, hook, "Waiting for state to be initialized")
assert.LogsContain(t, hook, "Waiting for state to be initialized")
},
},
}

p := p2pt.NewTestP2P(t)
connectPeers(t, p, []*peerData{}, p.Peers())
for _, tt := range tests {
for i, tt := range tests {
if i == 0 {
continue
}
t.Run(tt.name, func(t *testing.T) {
defer hook.Reset()
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -114,27 +148,34 @@ func TestService_InitStartStop(t *testing.T) {
if tt.chainService != nil {
mc = tt.chainService()
}
// Initialize feed
notifier := &mock.MockStateNotifier{}
s := NewService(ctx, &Config{
P2P: p,
Chain: mc,
StateNotifier: mc.StateNotifier(),
StateNotifier: notifier,
})
time.Sleep(500 * time.Millisecond)
assert.NotNil(t, s)
if tt.methodRuns != nil {
tt.methodRuns(notifier.StateFeed())
}

wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
s.Start()
wg.Done()
}()

go func() {
// Allow to exit from test (on no head loop waiting for head is started).
// In most tests, this is redundant, as Start() already exited.
time.AfterFunc(500*time.Millisecond, func() {
time.AfterFunc(3*time.Second, func() {
cancel()
})
}()
if testutil.WaitTimeout(wg, time.Second*2) {
if testutil.WaitTimeout(wg, time.Second*4) {
t.Fatalf("Test should have exited by now, timed out")
}
tt.assert()
Expand All @@ -153,28 +194,6 @@ func TestService_waitForStateInitialization(t *testing.T) {
return s
}

t.Run("head state exists", func(t *testing.T) {
defer hook.Reset()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mc := &mock.ChainService{
State: testutil.NewBeaconState(),
FinalizedCheckPoint: &eth.Checkpoint{
Epoch: 0,
},
}
s := newService(ctx, mc)

expectedGenesisTime := time.Unix(25000, 0)
var receivedGenesisTime time.Time
require.NoError(t, mc.State.SetGenesisTime(uint64(expectedGenesisTime.Unix())))
receivedGenesisTime, err := s.waitForStateInitialization()
assert.NoError(t, err)
assert.Equal(t, expectedGenesisTime, receivedGenesisTime)
assert.LogsDoNotContain(t, hook, "Waiting for state to be initialized")
})

t.Run("no state and context close", func(t *testing.T) {
defer hook.Reset()
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -184,8 +203,9 @@ func TestService_waitForStateInitialization(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
_, err := s.waitForStateInitialization()
assert.ErrorContains(t, "context closed", err)
go s.waitForStateInitialization()
currTime := <-s.genesisChan
assert.Equal(t, true, currTime.IsZero())
wg.Done()
}()
go func() {
Expand Down Expand Up @@ -213,9 +233,9 @@ func TestService_waitForStateInitialization(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
var err error
receivedGenesisTime, err = s.waitForStateInitialization()
assert.NoError(t, err)
go s.waitForStateInitialization()
receivedGenesisTime = <-s.genesisChan
assert.Equal(t, false, receivedGenesisTime.IsZero())
wg.Done()
}()
go func() {
Expand Down Expand Up @@ -245,6 +265,46 @@ func TestService_waitForStateInitialization(t *testing.T) {
assert.LogsContain(t, hook, "Received state initialized event")
assert.LogsDoNotContain(t, hook, "Context closed, exiting goroutine")
})

t.Run("no state and state init event received and service start", func(t *testing.T) {
defer hook.Reset()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s := newService(ctx, &mock.ChainService{})
// Initialize mock feed
_ = s.stateNotifier.StateFeed()

expectedGenesisTime := time.Now().Add(60 * time.Second)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
s.waitForStateInitialization()
wg.Done()
}()

wg.Add(1)
go func() {
time.AfterFunc(500*time.Millisecond, func() {
// Send valid event.
s.stateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.Initialized,
Data: &statefeed.InitializedData{
StartTime: expectedGenesisTime,
GenesisValidatorsRoot: make([]byte, 32),
},
})
})
s.Start()
wg.Done()
}()

if testutil.WaitTimeout(wg, time.Second*3) {
t.Fatalf("Test should have exited by now, timed out")
}
assert.LogsContain(t, hook, "Waiting for state to be initialized")
assert.LogsContain(t, hook, "Received state initialized event")
assert.LogsDoNotContain(t, hook, "Context closed, exiting goroutine")
})
}

func TestService_markSynced(t *testing.T) {
Expand Down

0 comments on commit 43765b5

Please sign in to comment.