/
service.go
155 lines (132 loc) · 5.17 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// Copyright © 2021 Weald Technology Trading.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package standard
import (
"context"
eth2client "github.com/attestantio/go-eth2-client"
apiv1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/pkg/errors"
"github.com/rs/zerolog"
zerologger "github.com/rs/zerolog/log"
"github.com/wealdtech/chaind/services/chaindb"
"github.com/wealdtech/chaind/services/chaintime"
"golang.org/x/sync/semaphore"
)
// Service is a sync committee service.
type Service struct {
eventsProvider eth2client.EventsProvider
syncCommitteesProvider eth2client.SyncCommitteesProvider
chainDB chaindb.Service
syncCommitteesSetter chaindb.SyncCommitteesSetter
chainTime chaintime.Service
activitySem *semaphore.Weighted
epochsPerSyncCommitteePeriod uint64
}
// module-wide log.
var log zerolog.Logger
// New creates a new service.
func New(ctx context.Context, params ...Parameter) (*Service, error) {
parameters, err := parseAndCheckParameters(params...)
if err != nil {
return nil, errors.Wrap(err, "problem with parameters")
}
// Set logging.
log = zerologger.With().Str("service", "synccommittees").Str("impl", "standard").Logger().Level(parameters.logLevel)
if err := registerMetrics(ctx, parameters.monitor); err != nil {
return nil, errors.New("failed to register metrics")
}
spec, err := parameters.specProvider.ChainSpec(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to obtain spec")
}
var epochsPerSyncCommitteePeriod uint64
if tmp, exists := spec["EPOCHS_PER_SYNC_COMMITTEE_PERIOD"]; exists {
tmp2, ok := tmp.(uint64)
if !ok {
return nil, errors.New("EPOCHS_PER_SYNC_COMMITTEE_PERIOD of unexpected type")
}
epochsPerSyncCommitteePeriod = tmp2
}
if epochsPerSyncCommitteePeriod == 0 {
log.Debug().Msg("Beacon chain node does not support Altair; not obtaining sync committees")
return nil, nil
}
syncCommitteesSetter, isSyncCommitteesSetter := parameters.chainDB.(chaindb.SyncCommitteesSetter)
if !isSyncCommitteesSetter {
return nil, errors.New("chain DB does not support sync committee setting")
}
s := &Service{
eventsProvider: parameters.eth2Client.(eth2client.EventsProvider),
syncCommitteesProvider: parameters.eth2Client.(eth2client.SyncCommitteesProvider),
chainDB: parameters.chainDB,
syncCommitteesSetter: syncCommitteesSetter,
chainTime: parameters.chainTime,
activitySem: semaphore.NewWeighted(1),
epochsPerSyncCommitteePeriod: epochsPerSyncCommitteePeriod,
}
// Update to current epoch (synchronous, as sync committee information is needed by blocks).
s.updateAfterRestart(ctx, parameters.startPeriod)
return s, nil
}
func (s *Service) updateAfterRestart(ctx context.Context, startPeriod int64) {
// Work out the period from which to start.
md, err := s.getMetadata(ctx)
if err != nil {
log.Fatal().Err(err).Msg("Failed to obtain metadata before catchup")
}
if startPeriod >= 0 {
// Explicit requirement to start at a given slot.
md.LatestPeriod = startPeriod - 1
}
if s.chainTime.AltairInitialSyncCommitteePeriod() > uint64(md.LatestPeriod) {
md.LatestPeriod = int64(s.chainTime.AltairInitialSyncCommitteePeriod()) - 1
}
log.Info().Int64("period", md.LatestPeriod).Msg("Catching up from period")
s.catchup(ctx, md)
log.Info().Msg("Caught up")
// Set up the handler for new chain head updates.
if err := s.eventsProvider.Events(ctx, []string{"head"}, func(event *apiv1.Event) {
eventData := event.Data.(*apiv1.HeadEvent)
s.OnBeaconChainHeadUpdated(ctx, eventData.Slot)
}); err != nil {
log.Fatal().Err(err).Msg("Failed to add sync chain head updated handler")
}
}
func (s *Service) catchup(ctx context.Context, md *metadata) {
for period := uint64(md.LatestPeriod + 1); period <= s.chainTime.CurrentSyncCommitteePeriod(); period++ {
log := log.With().Uint64("period", period).Logger()
// Each update goes in to its own transaction, to make the data available sooner.
ctx, cancel, err := s.chainDB.BeginTx(ctx)
if err != nil {
log.Error().Err(err).Msg("Failed to begin transaction on update after restart")
return
}
if err := s.updateSyncCommitteeForPeriod(ctx, period); err != nil {
log.Warn().Err(err).Msg("Failed to update sync committee")
cancel()
return
}
md.LatestPeriod = int64(period)
if err := s.setMetadata(ctx, md); err != nil {
log.Error().Err(err).Msg("Failed to set metadata")
cancel()
return
}
if err := s.chainDB.CommitTx(ctx); err != nil {
log.Error().Err(err).Msg("Failed to commit transaction")
cancel()
return
}
log.Trace().Msg("Added sync committee")
}
}