This repository has been archived by the owner on May 22, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 23
/
registration.go
326 lines (296 loc) · 9.1 KB
/
registration.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
package client
import (
"context"
"fmt"
"time"
"github.com/keep-network/keep-common/pkg/chain/ethlike"
corechain "github.com/keep-network/keep-core/pkg/chain"
"github.com/keep-network/keep-ecdsa/pkg/chain"
)
const statusCheckIntervalBlocks = 100
// retryDelay defines the delay between retries related to the registration logic
// that do not have their own specific values (like for example `eligibilityRetryDelay`
// for sortition pool join eligibility checks).
const retryDelay = 1 * time.Second
// eligibilityRetryDelay defines the delay between checks whether the operator
// is eligible to join the sortition pool.
const eligibilityRetryDelay = 20 * time.Minute
// checkStatusAndRegisterForApplication checks whether the operator is
// registered as a member candidate for keep for the given application.
// If not, checks operators's eligibility and retries until the operator is
// eligible. Eventually, once the operator is eligible, it is registered
// as a keep member candidate.
// Also, once the client is confirmed as registered, it triggers the monitoring
// process to keep the operator's status up to date in the pool.
// If operator status in the pool cannot be monitored, e.g. when operator is
// removed from the pool it triggers the registration process from the begining.
func checkStatusAndRegisterForApplication(
ctx context.Context,
blockCounter corechain.BlockCounter,
application chain.BondedECDSAKeepApplicationHandle,
) {
RegistrationLoop:
for {
select {
case <-ctx.Done():
return
default:
isRegistered, err := application.IsRegisteredForApplication()
if err != nil {
logger.Errorf(
"failed to check if member is registered for application [%s]: [%v]",
application.ID(),
err,
)
time.Sleep(retryDelay) // TODO: #413 Replace with backoff.
continue RegistrationLoop
}
if !isRegistered {
// if the operator is not registered, we need to register it and
// wait until registration is confirmed
registerAsMemberCandidate(ctx, blockCounter, application)
waitUntilRegistered(ctx, blockCounter, application)
}
// once the registration is confirmed or if the client is already
// registered, we can start to monitor the status
if err := monitorSignerPoolStatus(ctx, blockCounter, application); err != nil {
logger.Errorf(
"failed on signer pool status monitoring; please inspect "+
"signer's unbonded value and stake: [%v]",
err,
)
time.Sleep(retryDelay) // TODO: #413 Replace with backoff.
continue RegistrationLoop
}
}
}
}
// registerAsMemberCandidate checks current operator's eligibility to become
// keep member candidate for the given application and if it is positive,
// registers the operator as a keep member candidate for the given application.
// If the operator is not eligible, it executes the check for each new mined
// block until the operator is finally eligible and can be registered.
func registerAsMemberCandidate(
parentCtx context.Context,
blockCounter corechain.BlockCounter,
application chain.BondedECDSAKeepApplicationHandle,
) {
// If the operator is eligible right now for registering as a member
// candidate for the application, we register the operator.
isEligible, err := application.IsEligibleForApplication()
if err != nil {
logger.Errorf(
"failed to check operator eligibility for application [%s]: [%v]",
application.ID(),
err,
)
}
if isEligible {
logger.Infof(
"registering member candidate for application [%s]",
application.ID(),
)
err := application.RegisterAsMemberCandidate()
if err != nil {
logger.Errorf(
"failed to register member candidate for application [%s]: [%v]",
application.ID(),
err,
)
} else {
return
}
}
// If the operator is not yet eligible to be registered as a member candidate
// for the application, we start monitoring eligibility each now block.
// We do the same in case the registration of eligible operator failed for
// some reason. As soon as the operator is eligible, we will proceed with
// the registration.
registerAsMemberCandidateWhenEligible(parentCtx, blockCounter, application)
}
// registerAsMemberCandidateWhenEligible for each new block checks the operator's
// eligibility to be registered as a keep member candidate for the application.
// As soon as the operator becomes eligible, function triggers the registration.
//
// TODO Move this to application.RegisterAsMemberCandidateWhenEligible() <-chan error?
func registerAsMemberCandidateWhenEligible(
parentCtx context.Context,
blockCounter corechain.BlockCounter,
application chain.BondedECDSAKeepApplicationHandle,
) {
ctx, cancel := context.WithCancel(parentCtx)
defer cancel()
newBlockChan := blockCounter.WatchBlocks(ctx)
for {
select {
case <-newBlockChan:
isEligible, err := application.IsEligibleForApplication()
if err != nil {
logger.Errorf(
"failed to check operator eligibility for application [%s]: [%v]",
application.ID(),
err,
)
time.Sleep(retryDelay) // TODO: #413 Replace with backoff.
continue
}
if !isEligible {
// if the operator is not yet eligible wait for the next
// block and execute the check again
logger.Warningf(
"operator is not eligible for application [%s]",
application.ID(),
)
time.Sleep(eligibilityRetryDelay) // TODO: #413 Replace with backoff.
continue
}
// if the operator is eligible, register it as a keep member
// candidate for this application
logger.Infof(
"registering member candidate for application [%s]",
application.ID(),
)
if err := application.RegisterAsMemberCandidate(); err != nil {
logger.Errorf(
"failed to register member candidate for application [%s]: [%v]",
application.ID(),
err,
)
time.Sleep(retryDelay) // TODO: #413 Replace with backoff.
continue
}
// we cancel the context in case the registration was successful,
// we don't want to do it again
cancel()
case <-ctx.Done():
return
}
}
}
// waitUntilRegistered blocks until the operator is registered as a keep member
// candidate for the given application.
func waitUntilRegistered(
ctx context.Context,
blockCounter corechain.BlockCounter,
application chain.BondedECDSAKeepApplicationHandle,
) {
newBlockChan := blockCounter.WatchBlocks(ctx)
for {
select {
case <-newBlockChan:
isRegistered, err := application.IsRegisteredForApplication()
if err != nil {
logger.Errorf(
"failed to check if member is registered for application [%s]: [%v]",
application.ID(),
err,
)
time.Sleep(retryDelay) // TODO: #413 Replace with backoff.
continue
}
if isRegistered {
logger.Infof(
"operator is registered for application [%s]",
application.ID(),
)
return
}
logger.Infof(
"operator is not yet registered for application [%s], waiting...",
application.ID(),
)
case <-ctx.Done():
return
}
}
}
// monitorSignerPoolStatus tracks operator's state in the signing pool
// (staking weight, bonding) and updates the status when it gets out of date.
func monitorSignerPoolStatus(
ctx context.Context,
blockCounter corechain.BlockCounter,
application chain.BondedECDSAKeepApplicationHandle,
) error {
logger.Infof(
"starting monitoring operatator status for application [%s]",
application.ID(),
)
startingBlock, err := blockCounter.CurrentBlock()
if err != nil {
return err
}
statusCheckTrigger, err := blockCounter.BlockHeightWaiter(
startingBlock + statusCheckIntervalBlocks,
)
if err != nil {
return err
}
for {
select {
case statusCheckBlock := <-statusCheckTrigger:
logger.Debugf(
"operator status check for application [%s] "+
"triggered at block [%v]",
application.ID(),
statusCheckBlock,
)
isUpToDate, err := application.IsStatusUpToDateForApplication()
if err != nil {
return fmt.Errorf(
"failed to check operator status for application [%s]: [%v]",
application.ID(),
err,
)
}
if isUpToDate {
logger.Debugf(
"operator status is up to date for application [%s]",
application.ID(),
)
} else {
logger.Infof(
"updating operator status for application [%s]",
application.ID(),
)
err := application.UpdateStatusForApplication()
if err != nil {
return fmt.Errorf(
"failed to update operator status for application [%s]: [%v]",
application.ID(),
err,
)
}
isRegistered, err := ethlike.WaitForBlockConfirmations(
blockCounter,
statusCheckBlock,
blockConfirmations,
func() (bool, error) {
return application.IsRegisteredForApplication()
},
)
if err != nil {
return fmt.Errorf(
"failed to confirm that operator is registered "+
"for application [%s]: [%v]",
application.ID(),
err,
)
}
if !isRegistered {
return fmt.Errorf(
"operator is no longer registered for application [%s]",
application.ID(),
)
}
}
statusCheckTrigger, err = blockCounter.BlockHeightWaiter(
statusCheckBlock + statusCheckIntervalBlocks,
)
if err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
}
}
}