-
Notifications
You must be signed in to change notification settings - Fork 19
/
service.go
212 lines (177 loc) · 6.44 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
// Copyright (C) 2023 Gobalsky Labs Limited
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package epochtime
import (
"context"
"time"
"code.vegaprotocol.io/vega/core/events"
"code.vegaprotocol.io/vega/core/types"
"code.vegaprotocol.io/vega/logging"
"code.vegaprotocol.io/vega/protos/vega"
eventspb "code.vegaprotocol.io/vega/protos/vega/events/v1"
"code.vegaprotocol.io/vega/libs/proto"
)
// Broker is the event sink used by the epoch service: every epoch update
// produced by Svc is handed to Send as an epoch event.
type Broker interface {
// Send publishes the given event.
Send(e events.Event)
}
// Svc represents the Service managing epoch inside Vega.
//
// It tracks the current epoch, decides on each tick whether the epoch must be
// closed and a new one opened, and notifies the broker and the registered
// listeners whenever the epoch changes.
type Svc struct {
// config is the configuration the service was created with.
config Config

// length is the epoch duration; it is added to the start time of a new
// epoch to compute its expiry. Set through OnEpochLengthUpdate.
length time.Duration
// epoch is the epoch currently in progress.
epoch types.Epoch

listeners []func(context.Context, types.Epoch) // for when the epoch state changes
restoreListeners []func(context.Context, types.Epoch) // for when the epoch has been restored from a snapshot

log *logging.Logger
broker Broker

// readyToStartNewEpoch is raised by OnBlockEnd once the last block of the
// epoch has ended; the next OnTick then closes the epoch and opens a new one.
readyToStartNewEpoch bool
// readyToEndEpoch is raised by OnTick when the block time is past the
// expiry of the current epoch; it is consumed by OnBlockEnd.
readyToEndEpoch bool

// Snapshot state
state *types.EpochState
pl types.Payload
data []byte
// currentTime is the time of the last tick recorded by OnTick.
currentTime time.Time
// needsFastForward is raised by Load and cleared by the next effective
// OnTick, which replays the epochs that elapsed in between.
needsFastForward bool
}
// NewService builds an epochtime service wired to the given logger,
// configuration and broker.
//
// The snapshot payload is initialised so that it wraps the very same
// EpochState pointer that the service keeps in its state field.
func NewService(l *logging.Logger, conf Config, broker Broker) *Svc {
	epochState := &types.EpochState{}

	// Both readiness flags start out false, which is their zero value.
	return &Svc{
		config: conf,
		log:    l,
		broker: broker,
		state:  epochState,
		pl: types.Payload{
			Data: &types.PayloadEpoch{EpochState: epochState},
		},
	}
}
// ReloadConf reloads the configuration for the epochtime service.
//
// It is currently a no-op: the supplied conf is ignored and the configuration
// held by the service is left unchanged.
func (s *Svc) ReloadConf(conf Config) {
// intentionally empty: conf is not used for now
}
// OnBlockEnd is the callback invoked by the abci at the end of a block.
//
// When the block that just ended was flagged as the last one of the current
// epoch, the flag is consumed and the service is armed so that the next
// OnTick closes the epoch and opens a new one. Otherwise nothing happens.
func (s *Svc) OnBlockEnd(ctx context.Context) {
	if !s.readyToEndEpoch {
		return
	}

	s.readyToEndEpoch = false
	s.readyToStartNewEpoch = true
}
// OnTick drives the epoch life cycle from the block time t.
//
// Depending on the state of the service it will, in this order:
//   - ignore the tick when t is the zero time;
//   - replay missed epochs when a fast forward is pending (see fastForward);
//   - create the very first epoch (sequence 0) when no epoch has started yet;
//   - close the current epoch and open the next one when OnBlockEnd armed it;
//   - flag the current block as the last of the epoch once t is past the expiry.
//
// NB: An epoch is ended when the first block that exceeds the expiry of the current epoch ends. As OnTick is called from onBlockStart - to make epochs continuous
// and avoid a no man's epoch - once we get the first block past expiry we get ready to end the epoch. Once we get the on block end callback we set
// the flag to be ready to start a new epoch on the next OnTick (i.e. preceding the beginning of the next block). Once we get the next block's OnTick we close
// the epoch and notify on its end, then start a new epoch (with incremented sequence) and notify about it.
func (s *Svc) OnTick(ctx context.Context, t time.Time) {
if t.IsZero() {
// We haven't got a block time yet, ignore
return
}
// While a fast forward is pending, a tick repeating the already recorded
// time is skipped and the fast forward stays pending.
if s.needsFastForward && t.Equal(s.currentTime) {
s.log.Debug("onTick called with the same time again", logging.Time("tick-time", t))
return
}
s.currentTime = t
if s.needsFastForward {
s.log.Info("fast forwarding epoch starts", logging.Uint64("from-epoch", s.epoch.Seq), logging.Time("at", t))
// Clear the flag before replaying: fastForward re-enters OnTick and must
// not trigger another fast forward.
s.needsFastForward = false
s.fastForward(ctx)
// fastForward re-enters OnTick with intermediate times, which overwrite
// currentTime; make sure it holds the actual tick time again.
s.currentTime = t
s.log.Info("fast forwarding epochs ended", logging.Uint64("current-epoch", s.epoch.Seq))
}
if s.epoch.StartTime.IsZero() {
// First block so let's create our first epoch
s.epoch.Seq = 0
s.epoch.StartTime = t
s.epoch.ExpireTime = t.Add(s.length) // current time + epoch length
s.epoch.Action = vega.EpochAction_EPOCH_ACTION_START
// Send out new epoch event
s.notify(ctx, s.epoch)
return
}
if s.readyToStartNewEpoch {
// close previous epoch and send an event
s.epoch.EndTime = t
s.epoch.Action = vega.EpochAction_EPOCH_ACTION_END
s.notify(ctx, s.epoch)
// Move the epoch details forward
s.epoch.Seq++
s.readyToStartNewEpoch = false
// Create a new epoch
s.epoch.StartTime = t
s.epoch.ExpireTime = t.Add(s.length) // now + epoch length
s.epoch.EndTime = time.Time{}
s.epoch.Action = vega.EpochAction_EPOCH_ACTION_START
s.notify(ctx, s.epoch)
return
}
// if the block time is past the expiry - this is the last block to go into the epoch - when the block ends we end the epoch and start a new one
if s.epoch.ExpireTime.Before(t) {
// Set the flag to tell us to end the epoch when the block ends
s.readyToEndEpoch = true
return
}
}
// Name returns the checkpoint name under which the epoch service stores its
// checkpoint data, i.e. types.EpochCheckpoint.
func (*Svc) Name() types.CheckpointName {
return types.EpochCheckpoint
}
// Checkpoint serialises the current epoch, converted to its protobuf
// representation, and returns the resulting bytes or the marshalling error.
func (s *Svc) Checkpoint() ([]byte, error) {
	epochProto := s.epoch.IntoProto()
	return proto.Marshal(epochProto)
}
// Load restores the current epoch from checkpoint data, which is expected to
// be a marshalled epoch event.
//
// On success the restored epoch replaces the current one, the listeners are
// notified of it, and a fast forward is requested for the next tick. An
// unmarshalling error is returned as is and leaves the service untouched.
func (s *Svc) Load(ctx context.Context, data []byte) error {
	decoded := new(eventspb.EpochEvent)
	if err := proto.Unmarshal(data, decoded); err != nil {
		return err
	}

	restored := types.NewEpochFromProto(decoded)
	s.epoch = *restored

	// let the time end the epoch organically
	s.readyToStartNewEpoch, s.readyToEndEpoch = false, false

	s.notify(ctx, s.epoch)

	// the next tick has to catch up with the epochs elapsed since the checkpoint
	s.needsFastForward = true

	return nil
}
// fastForward replays time up to the current tick, expiring and starting every
// epoch that would have expired/started in between. Epoch events are emitted
// as they would be naturally, so this has the side effect of delegations
// getting promoted and rewards getting calculated and potentially paid.
func (s *Svc) fastForward(ctx context.Context) {
	target := s.currentTime

	// Keep simulating a block end followed by a tick one second past the
	// expiry for as long as the current epoch expires before the target time.
	for target.After(s.epoch.ExpireTime) {
		s.OnBlockEnd(ctx)

		simulatedTick := s.epoch.ExpireTime.Add(time.Second)
		s.OnTick(ctx, simulatedTick)
	}

	// Finally tick at the actual target time.
	s.OnTick(ctx, target)
}
// NotifyOnEpoch allows other services to register 2 callback functions.
// The first (f) will be called once we enter or leave a new epoch, and the second
// (r) will be called when the epoch service has been restored from a snapshot.
//
// Both callbacks are stored as given, without a nil check, and they are kept
// in registration order.
func (s *Svc) NotifyOnEpoch(f func(context.Context, types.Epoch), r func(context.Context, types.Epoch)) {
s.listeners = append(s.listeners, f)
s.restoreListeners = append(s.restoreListeners, r)
}
// notify publishes the epoch e as an epoch event on the broker and then calls
// every registered epoch listener, in registration order, with a copy of e.
func (s *Svc) notify(ctx context.Context, e types.Epoch) {
	// Push this updated epoch message onto the event bus first.
	epochEvent := events.NewEpochEvent(ctx, &e)
	s.broker.Send(epochEvent)

	for _, listener := range s.listeners {
		listener(ctx, e)
	}
}
// OnEpochLengthUpdate records l as the new epoch length.
//
// The new length is only used when the expiry of a newly started epoch is
// computed; the expiry of the epoch in progress is not recomputed here.
// It always returns nil.
func (s *Svc) OnEpochLengthUpdate(ctx context.Context, l time.Duration) error {
s.length = l
// @TODO down the line, we ought to send an event signaling a change in epoch length
return nil
}