/
service.go
227 lines (208 loc) · 6.78 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
// Package initialsync includes all initial block download and processing
// logic for the beacon node, using a round robin strategy and a finite-state-machine
// to handle edge-cases in a beacon node's sync status.
package initialsync
import (
"context"
"time"
"github.com/paulbellamy/ratecounter"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/abool"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/timeutils"
"github.com/sirupsen/logrus"
)
var _ shared.Service = (*Service)(nil)
// blockchainService defines the interface for interaction with block chain service.
type blockchainService interface {
blockchain.BlockReceiver
blockchain.ChainInfoFetcher
}
// Config to set up the initial sync service.
type Config struct {
P2P p2p.P2P
DB db.ReadOnlyDatabase
Chain blockchainService
StateNotifier statefeed.Notifier
BlockNotifier blockfeed.Notifier
}
// Service service.
type Service struct {
cfg *Config
ctx context.Context
cancel context.CancelFunc
synced *abool.AtomicBool
chainStarted *abool.AtomicBool
counter *ratecounter.RateCounter
genesisChan chan time.Time
}
// NewService configures the initial sync service responsible for bringing the node up to the
// latest head of the blockchain.
func NewService(ctx context.Context, cfg *Config) *Service {
ctx, cancel := context.WithCancel(ctx)
s := &Service{
cfg: cfg,
ctx: ctx,
cancel: cancel,
synced: abool.New(),
chainStarted: abool.New(),
counter: ratecounter.NewRateCounter(counterSeconds * time.Second),
genesisChan: make(chan time.Time),
}
go s.waitForStateInitialization()
return s
}
// Start the initial sync service.
func (s *Service) Start() {
// Wait for state initialized event.
genesis := <-s.genesisChan
if genesis.IsZero() {
log.Debug("Exiting Initial Sync Service")
return
}
if flags.Get().DisableSync {
s.markSynced(genesis)
log.WithField("genesisTime", genesis).Info("Due to Sync Being Disabled, entering regular sync immediately.")
return
}
if genesis.After(timeutils.Now()) {
s.markSynced(genesis)
log.WithField("genesisTime", genesis).Info("Genesis time has not arrived - not syncing")
return
}
currentSlot := helpers.SlotsSince(genesis)
if helpers.SlotToEpoch(currentSlot) == 0 {
log.WithField("genesisTime", genesis).Info("Chain started within the last epoch - not syncing")
s.markSynced(genesis)
return
}
s.chainStarted.Set()
log.Info("Starting initial chain sync...")
// Are we already in sync, or close to it?
if helpers.SlotToEpoch(s.cfg.Chain.HeadSlot()) == helpers.SlotToEpoch(currentSlot) {
log.Info("Already synced to the current chain head")
s.markSynced(genesis)
return
}
s.waitForMinimumPeers()
if err := s.roundRobinSync(genesis); err != nil {
if errors.Is(s.ctx.Err(), context.Canceled) {
return
}
panic(err)
}
log.Infof("Synced up to slot %d", s.cfg.Chain.HeadSlot())
s.markSynced(genesis)
}
// Stop initial sync.
func (s *Service) Stop() error {
s.cancel()
return nil
}
// Status of initial sync.
func (s *Service) Status() error {
if s.synced.IsNotSet() && s.chainStarted.IsSet() {
return errors.New("syncing")
}
return nil
}
// Syncing returns true if initial sync is still running.
func (s *Service) Syncing() bool {
return s.synced.IsNotSet()
}
// Initialized returns true if initial sync has been started.
func (s *Service) Initialized() bool {
return s.chainStarted.IsSet()
}
// Synced returns true if initial sync has been completed.
func (s *Service) Synced() bool {
return s.synced.IsSet()
}
// Resync allows a node to start syncing again if it has fallen
// behind the current network head.
func (s *Service) Resync() error {
headState, err := s.cfg.Chain.HeadState(s.ctx)
if err != nil || headState == nil || headState.IsNil() {
return errors.Errorf("could not retrieve head state: %v", err)
}
// Set it to false since we are syncing again.
s.synced.UnSet()
defer func() { s.synced.Set() }() // Reset it at the end of the method.
genesis := time.Unix(int64(headState.GenesisTime()), 0)
s.waitForMinimumPeers()
if err = s.roundRobinSync(genesis); err != nil {
log = log.WithError(err)
}
log.WithField("slot", s.cfg.Chain.HeadSlot()).Info("Resync attempt complete")
return nil
}
func (s *Service) waitForMinimumPeers() {
required := params.BeaconConfig().MaxPeersToSync
if flags.Get().MinimumSyncPeers < required {
required = flags.Get().MinimumSyncPeers
}
for {
_, peers := s.cfg.P2P.Peers().BestNonFinalized(flags.Get().MinimumSyncPeers, s.cfg.Chain.FinalizedCheckpt().Epoch)
if len(peers) >= required {
break
}
log.WithFields(logrus.Fields{
"suitable": len(peers),
"required": required,
}).Info("Waiting for enough suitable peers before syncing")
time.Sleep(handshakePollingInterval)
}
}
// waitForStateInitialization makes sure that beacon node is ready to be accessed: it is either
// already properly configured or system waits up until state initialized event is triggered.
func (s *Service) waitForStateInitialization() {
// Wait for state to be initialized.
stateChannel := make(chan *feed.Event, 1)
stateSub := s.cfg.StateNotifier.StateFeed().Subscribe(stateChannel)
defer stateSub.Unsubscribe()
log.Info("Waiting for state to be initialized")
for {
select {
case event := <-stateChannel:
if event.Type == statefeed.Initialized {
data, ok := event.Data.(*statefeed.InitializedData)
if !ok {
log.Error("Event feed data is not type *statefeed.InitializedData")
continue
}
log.WithField("starttime", data.StartTime).Debug("Received state initialized event")
s.genesisChan <- data.StartTime
return
}
case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")
// Send a zero time in the event we are exiting.
s.genesisChan <- time.Time{}
return
case err := <-stateSub.Err():
log.WithError(err).Error("Subscription to state notifier failed")
// Send a zero time in the event we are exiting.
s.genesisChan <- time.Time{}
return
}
}
}
// markSynced marks node as synced and notifies feed listeners.
func (s *Service) markSynced(genesis time.Time) {
s.synced.Set()
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.Synced,
Data: &statefeed.SyncedData{
StartTime: genesis,
},
})
}