package scheduler
import (
"bytes"
"crypto"
"fmt"
"math/rand"
"sort"
"github.com/tendermint/tendermint/abci/types"
beacon "github.com/oasisprotocol/oasis-core/go/beacon/api"
"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/crypto/drbg"
"github.com/oasisprotocol/oasis-core/go/common/crypto/mathrand"
"github.com/oasisprotocol/oasis-core/go/common/crypto/signature"
"github.com/oasisprotocol/oasis-core/go/common/logging"
"github.com/oasisprotocol/oasis-core/go/common/node"
"github.com/oasisprotocol/oasis-core/go/common/quantity"
"github.com/oasisprotocol/oasis-core/go/consensus/api/transaction"
"github.com/oasisprotocol/oasis-core/go/consensus/tendermint/api"
beaconapp "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/apps/beacon"
beaconState "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/apps/beacon/state"
governanceApi "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/apps/governance/api"
registryapp "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/apps/registry"
registryState "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/apps/registry/state"
schedulerApi "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/apps/scheduler/api"
schedulerState "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/apps/scheduler/state"
stakingapp "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/apps/staking"
stakingState "github.com/oasisprotocol/oasis-core/go/consensus/tendermint/apps/staking/state"
registry "github.com/oasisprotocol/oasis-core/go/registry/api"
scheduler "github.com/oasisprotocol/oasis-core/go/scheduler/api"
staking "github.com/oasisprotocol/oasis-core/go/staking/api"
)
var (
_ api.Application = (*schedulerApplication)(nil)
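// Domain separation contexts for the election RNG. Each is mixed into the
// DRBG (see GetPerm and the elect* helpers below) so that entropy derived
// for one kind of election cannot be reused for another.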
RNGContextExecutor = []byte("EkS-ABCI-Compute")
RNGContextValidators = []byte("EkS-ABCI-Validators")
RNGContextEntities = []byte("EkS-ABCI-Entities")
RNGContextRoleWorker = []byte("Worker")
RNGContextRoleBackupWorker = []byte("Backup-Worker")
)
type schedulerApplication struct {
state api.ApplicationState
md api.MessageDispatcher
}
func (app *schedulerApplication) Name() string {
return AppName
}
func (app *schedulerApplication) ID() uint8 {
return AppID
}
func (app *schedulerApplication) Methods() []transaction.MethodName {
return nil
}
func (app *schedulerApplication) Blessed() bool {
return true
}
func (app *schedulerApplication) Dependencies() []string {
return []string{beaconapp.AppName, registryapp.AppName, stakingapp.AppName}
}
func (app *schedulerApplication) OnRegister(state api.ApplicationState, md api.MessageDispatcher) {
app.state = state
app.md = md
// Subscribe to messages emitted by other apps.
md.Subscribe(governanceApi.MessageChangeParameters, app)
md.Subscribe(governanceApi.MessageValidateParameterChanges, app)
}
func (app *schedulerApplication) OnCleanup() {}
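// BeginBlock runs the election logic: when the epoch changes (or stake has
// been slashed), it elects the validator set and the per-runtime executor
// committees for the new epoch.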
func (app *schedulerApplication) BeginBlock(ctx *api.Context, request types.RequestBeginBlock) error {
// Check if any stake slashing has occurred in the staking layer.
// NOTE: This will NOT trigger for any slashing that happens as part of
// any transactions being submitted to the chain.
slashed := ctx.HasEvent(stakingapp.AppName, &staking.TakeEscrowEvent{})
// Check if epoch has changed.
// TODO: We'll later have this for each type of committee.
epochChanged, epoch := app.state.EpochChanged(ctx)
if epochChanged || slashed {
// Notify applications that we are going to schedule committees.
_, err := app.md.Publish(ctx, schedulerApi.MessageBeforeSchedule, epoch)
if err != nil {
return fmt.Errorf("tendermint/scheduler: before schedule notification failed: %w", err)
}
// The 0th epoch will not have suitable entropy for elections, nor
// will it have useful node registrations.
baseEpoch, err := app.state.GetBaseEpoch()
if err != nil {
return fmt.Errorf("tendermint/scheduler: couldn't get base epoch: %w", err)
}
if epoch == baseEpoch {
ctx.Logger().Info("system in bootstrap period, skipping election",
"epoch", epoch,
)
return nil
}
state := schedulerState.NewMutableState(ctx.State())
params, err := state.ConsensusParameters(ctx)
if err != nil {
ctx.Logger().Error("failed to fetch consensus parameters",
"err", err,
)
return err
}
beaconState := beaconState.NewMutableState(ctx.State())
beaconParameters, err := beaconState.ConsensusParameters(ctx)
if err != nil {
return fmt.Errorf("tendermint/scheduler: couldn't get beacon parameters: %w", err)
}
// If weak alphas are allowed then skip the eligibility check as
// well, because the byzantine node and associated tests are extremely
// fragile and break in hard-to-debug ways if timekeeping isn't
// exactly what they expect.
filterCommitteeNodes := beaconParameters.Backend == beacon.BackendVRF && !params.DebugAllowWeakAlpha
regState := registryState.NewMutableState(ctx.State())
registryParameters, err := regState.ConsensusParameters(ctx)
if err != nil {
return fmt.Errorf("tendermint/scheduler: couldn't get registry parameters: %w", err)
}
runtimes, err := regState.Runtimes(ctx)
if err != nil {
return fmt.Errorf("tendermint/scheduler: couldn't get runtimes: %w", err)
}
allNodes, err := regState.Nodes(ctx)
if err != nil {
return fmt.Errorf("tendermint/scheduler: couldn't get nodes: %w", err)
}
// Filter nodes.
var (
nodes []*node.Node
committeeNodes []*nodeWithStatus
)
for _, node := range allNodes {
var status *registry.NodeStatus
status, err = regState.NodeStatus(ctx, node.ID)
if err != nil {
return fmt.Errorf("tendermint/scheduler: couldn't get node status: %w", err)
}
// Nodes which are currently frozen cannot be scheduled.
if status.IsFrozen() {
continue
}
// Expired nodes cannot be scheduled (nodes can be expired and not yet removed).
if node.IsExpired(uint64(epoch)) {
continue
}
nodes = append(nodes, node)
if !filterCommitteeNodes || (status.ElectionEligibleAfter != beacon.EpochInvalid && epoch > status.ElectionEligibleAfter) {
committeeNodes = append(committeeNodes, &nodeWithStatus{node, status})
}
}
var stakeAcc *stakingState.StakeAccumulatorCache
if !params.DebugBypassStake {
stakeAcc, err = stakingState.NewStakeAccumulatorCache(ctx)
if err != nil {
return fmt.Errorf("tendermint/scheduler: failed to create stake accumulator cache: %w", err)
}
defer stakeAcc.Discard()
}
var entitiesEligibleForReward map[staking.Address]bool
if epochChanged {
// For elections on epoch changes, distribute rewards to entities with any eligible nodes.
entitiesEligibleForReward = make(map[staking.Address]bool)
}
// Handle the validator election first, because having no consensus
// validators is catastrophic, while failing to elect other committees is not.
var validatorEntities map[staking.Address]bool
if validatorEntities, err = app.electValidators(
ctx,
app.state,
beaconState,
beaconParameters,
stakeAcc,
entitiesEligibleForReward,
nodes,
params,
); err != nil {
// It is unclear what the behavior should be if the validator
// election fails. The system can not ensure integrity, so
// presumably manual intervention is required...
return fmt.Errorf("tendermint/scheduler: couldn't elect validators: %w", err)
}
kinds := []scheduler.CommitteeKind{
scheduler.KindComputeExecutor,
}
for _, kind := range kinds {
if err = app.electAllCommittees(
ctx,
app.state,
params,
beaconState,
beaconParameters,
registryParameters,
stakeAcc,
entitiesEligibleForReward,
validatorEntities,
runtimes,
committeeNodes,
kind,
); err != nil {
return fmt.Errorf("tendermint/scheduler: couldn't elect %s committees: %w", kind, err)
}
}
ctx.EmitEvent(api.NewEventBuilder(app.Name()).TypedAttribute(&scheduler.ElectedEvent{Kinds: kinds}))
var kindNames []string
for _, kind := range kinds {
kindNames = append(kindNames, kind.String())
}
var runtimeIDs []string
for _, rt := range runtimes {
runtimeIDs = append(runtimeIDs, rt.ID.String())
}
ctx.Logger().Debug("finished electing committees",
"epoch", epoch,
"kinds", kindNames,
"runtimes", runtimeIDs,
)
if entitiesEligibleForReward != nil {
accountAddrs := stakingAddressMapToSortedSlice(entitiesEligibleForReward)
stakingSt := stakingState.NewMutableState(ctx.State())
if err = stakingSt.AddRewards(ctx, epoch, &params.RewardFactorEpochElectionAny, accountAddrs); err != nil {
return fmt.Errorf("tendermint/scheduler: failed to add rewards: %w", err)
}
}
}
return nil
}
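// ExecuteMessage handles messages published by other applications; currently
// these are the governance parameter-change validation and application
// messages subscribed to in OnRegister.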
func (app *schedulerApplication) ExecuteMessage(ctx *api.Context, kind, msg interface{}) (interface{}, error) {
switch kind {
case governanceApi.MessageValidateParameterChanges:
// A change parameters proposal is about to be submitted. Validate changes.
return app.changeParameters(ctx, msg, false)
case governanceApi.MessageChangeParameters:
// A change parameters proposal has just been accepted and closed. Validate and apply
// changes.
return app.changeParameters(ctx, msg, true)
default:
return nil, fmt.Errorf("tendermint/scheduler: unexpected message")
}
}
func (app *schedulerApplication) ExecuteTx(ctx *api.Context, tx *transaction.Transaction) error {
return fmt.Errorf("tendermint/scheduler: unexpected transaction")
}
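// diffValidators computes the Tendermint validator updates needed to go from
// the current validator set to the pending one: removed validators get their
// power set to 0, new or re-weighted validators get their new power.
//
// Illustrative sketch (hypothetical keys and powers): with
// current = {A: 10, B: 20} and pending = {B: 25, C: 5}, the returned updates
// set A to 0, B to 25 and C to 5.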
func diffValidators(logger *logging.Logger, current, pending map[signature.PublicKey]int64) []types.ValidatorUpdate {
var updates []types.ValidatorUpdate
for v := range current {
if _, ok := pending[v]; !ok {
// Existing validator is not part of the new set, reduce its
// voting power to 0, to indicate removal.
logger.Debug("removing existing validator from validator set",
"id", v,
)
updates = append(updates, api.PublicKeyToValidatorUpdate(v, 0))
}
}
for v, newPower := range pending {
if curPower, ok := current[v]; ok && curPower == newPower {
logger.Debug("keeping existing validator in the validator set",
"id", v,
)
continue
}
// We're adding this validator or changing its power.
logger.Debug("upserting validator to validator set",
"id", v,
"power", newPower,
)
updates = append(updates, api.PublicKeyToValidatorUpdate(v, newPower))
}
return updates
}
func (app *schedulerApplication) EndBlock(ctx *api.Context, req types.RequestEndBlock) (types.ResponseEndBlock, error) {
var resp types.ResponseEndBlock
state := schedulerState.NewMutableState(ctx.State())
pendingValidators, err := state.PendingValidators(ctx)
if err != nil {
return resp, fmt.Errorf("scheduler/tendermint: failed to query pending validators: %w", err)
}
if pendingValidators == nil {
// No validator updates to apply.
return resp, nil
}
currentValidators, err := state.CurrentValidators(ctx)
if err != nil {
return resp, fmt.Errorf("scheduler/tendermint: failed to query current validators: %w", err)
}
// Clear out the pending validator update.
if err = state.PutPendingValidators(ctx, nil); err != nil {
return resp, fmt.Errorf("scheduler/tendermint: failed to clear validators: %w", err)
}
// Tendermint expects a vector of ValidatorUpdate that expresses
// the difference between the current validator set (tracked manually
// from InitChain), and the new validator set, which is a huge pain
// in the ass.
resp.ValidatorUpdates = diffValidators(ctx.Logger(), currentValidators, pendingValidators)
// Stash the updated validator set.
if err = state.PutCurrentValidators(ctx, pendingValidators); err != nil {
return resp, fmt.Errorf("scheduler/tendermint: failed to set validators: %w", err)
}
return resp, nil
}
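// isSuitableExecutorWorker reports whether the given node can be elected into
// the executor committee for the given runtime: it must advertise the compute
// role, register the runtime's active deployment version, not be suspended,
// and (for TEE runtimes) carry a verifiable TEE attestation.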
func (app *schedulerApplication) isSuitableExecutorWorker(
ctx *api.Context,
n *nodeWithStatus,
rt *registry.Runtime,
epoch beacon.EpochTime,
registryParams *registry.ConsensusParameters,
) bool {
if !n.node.HasRoles(node.RoleComputeWorker) {
return false
}
activeDeployment := rt.ActiveDeployment(epoch)
if activeDeployment == nil {
return false
}
for _, nrt := range n.node.Runtimes {
if !nrt.ID.Equal(&rt.ID) {
continue
}
if nrt.Version.ToU64() != activeDeployment.Version.ToU64() {
continue
}
if n.status.IsSuspended(rt.ID, epoch) {
return false
}
switch rt.TEEHardware {
case node.TEEHardwareInvalid:
if nrt.Capabilities.TEE != nil {
return false
}
return true
default:
if nrt.Capabilities.TEE == nil {
return false
}
if nrt.Capabilities.TEE.Hardware != rt.TEEHardware {
return false
}
if err := nrt.Capabilities.TEE.Verify(
registryParams.TEEFeatures,
ctx.Now(),
uint64(ctx.BlockHeight()),
activeDeployment.TEE,
n.node.ID,
); err != nil {
ctx.Logger().Warn("failed to verify node TEE attestaion",
"err", err,
"node_id", n.node.ID,
"timestamp", ctx.Now(),
"runtime", rt.ID,
)
return false
}
return true
}
}
return false
}
// GetPerm generates a permutation that we use to choose nodes from a list of eligible nodes to elect.
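//
// Illustrative usage (hypothetical values): given per-epoch entropy and a
// runtime with 5 eligible nodes,
//
//	perm, err := GetPerm(entropy, rtID, RNGContextExecutor, 5)
//
// might yield perm = [3 0 4 1 2]; callers then walk the node list in that
// order when filling committee seats.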
func GetPerm(beacon []byte, runtimeID common.Namespace, rngCtx []byte, nrNodes int) ([]int, error) {
drbg, err := drbg.New(crypto.SHA512, beacon, runtimeID[:], rngCtx)
if err != nil {
return nil, fmt.Errorf("tendermint/scheduler: couldn't instantiate DRBG: %w", err)
}
rng := rand.New(mathrand.New(drbg))
return rng.Perm(nrNodes), nil
}
// electAllCommittees elects a committee of the given kind for each of the
// given runtimes. Operates on consensus connection.
func (app *schedulerApplication) electAllCommittees(
ctx *api.Context,
appState api.ApplicationQueryState,
schedulerParameters *scheduler.ConsensusParameters,
beaconState *beaconState.MutableState,
beaconParameters *beacon.ConsensusParameters,
registryParameters *registry.ConsensusParameters,
stakeAcc *stakingState.StakeAccumulatorCache,
entitiesEligibleForReward map[staking.Address]bool,
validatorEntities map[staking.Address]bool,
runtimes []*registry.Runtime,
nodeList []*nodeWithStatus,
kind scheduler.CommitteeKind,
) error {
for _, runtime := range runtimes {
if err := app.electCommittee(
ctx,
appState,
schedulerParameters,
beaconState,
beaconParameters,
registryParameters,
stakeAcc,
entitiesEligibleForReward,
validatorEntities,
runtime,
nodeList,
kind,
); err != nil {
return err
}
}
return nil
}
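// electValidators elects the consensus validator set for the epoch and
// returns the set of entity addresses that had a validator elected. The
// validator-set change itself is only staged here (via PutPendingValidators)
// and applied in EndBlock.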
func (app *schedulerApplication) electValidators(
ctx *api.Context,
appState api.ApplicationQueryState,
beaconState *beaconState.MutableState,
beaconParameters *beacon.ConsensusParameters,
stakeAcc *stakingState.StakeAccumulatorCache,
entitiesEligibleForReward map[staking.Address]bool,
nodes []*node.Node,
params *scheduler.ConsensusParameters,
) (map[staking.Address]bool, error) {
// Filter the node list based on eligibility and minimum required
// entity stake.
var nodeList []*node.Node
entities := make(map[staking.Address]bool)
for _, n := range nodes {
if !n.HasRoles(node.RoleValidator) {
continue
}
entAddr := staking.NewAddress(n.EntityID)
if stakeAcc != nil {
if err := stakeAcc.CheckStakeClaims(entAddr); err != nil {
continue
}
}
nodeList = append(nodeList, n)
entities[entAddr] = true
}
// Sort all of the entities that are actually running eligible validator
// nodes by descending stake.
weakEntropy, err := beaconState.Beacon(ctx)
if err != nil {
return nil, fmt.Errorf("tendermint/scheduler: couldn't get beacon: %w", err)
}
sortedEntities, err := stakingAddressMapToSliceByStake(entities, stakeAcc, weakEntropy)
if err != nil {
return nil, err
}
// Shuffle the node list.
shuffledNodes, err := shuffleValidators(ctx, appState, params, beaconState, beaconParameters, nodeList)
if err != nil {
return nil, err
}
// Gather all of the entities' nodes. If an entity has more than one node,
// their ordering will be deterministically random due to the shuffle.
entityNodesMap := make(map[staking.Address][]*node.Node)
for i := range shuffledNodes {
n := shuffledNodes[i] // This is due to the use of append.
entityAddress := staking.NewAddress(n.EntityID)
entityNodes := entityNodesMap[entityAddress]
entityNodes = append(entityNodes, n)
entityNodesMap[entityAddress] = entityNodes
}
// Go down the list of entities running nodes by stake, picking one node
// to act as a validator till the maximum is reached.
validatorEntities := make(map[staking.Address]bool)
newValidators := make(map[signature.PublicKey]int64)
electLoop:
for _, entAddr := range sortedEntities {
nodes := entityNodesMap[entAddr]
// This is usually a maximum of 1, but if more are allowed,
// like in certain test scenarios, then pick as many nodes
// as the entity's stake allows
for i := 0; i < params.MaxValidatorsPerEntity; i++ {
if i >= len(nodes) {
break
}
n := nodes[i]
// If the entity gets a validator elected, it is eligible
// for rewards, but only once regardless of the number
// of validators owned by the entity in the set.
if entitiesEligibleForReward != nil {
entitiesEligibleForReward[entAddr] = true
}
var power int64
if stakeAcc == nil {
// In simplified no-stake deployments, make validators have flat voting power.
power = 1
} else {
var stake *quantity.Quantity
stake, err = stakeAcc.GetEscrowBalance(entAddr)
if err != nil {
return nil, fmt.Errorf("failed to fetch escrow balance for account %s: %w", entAddr, err)
}
power, err = scheduler.VotingPowerFromStake(stake)
if err != nil {
return nil, fmt.Errorf("computing voting power for account %s with balance %v: %w",
entAddr, stake, err,
)
}
}
validatorEntities[entAddr] = true
newValidators[n.Consensus.ID] = power
if len(newValidators) >= params.MaxValidators {
break electLoop
}
}
}
if len(newValidators) == 0 {
return nil, fmt.Errorf("tendermint/scheduler: failed to elect any validators")
}
if len(newValidators) < params.MinValidators {
return nil, fmt.Errorf("tendermint/scheduler: insufficient validators")
}
// Set the new pending validator set in the ABCI state. It needs to be
// applied in EndBlock.
state := schedulerState.NewMutableState(ctx.State())
if err = state.PutPendingValidators(ctx, newValidators); err != nil {
return nil, fmt.Errorf("failed to set pending validators: %w", err)
}
return validatorEntities, nil
}
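// stakingAddressMapToSliceByStake converts a set of entity addresses into a
// slice ordered by descending escrow balance, using the provided entropy to
// break ties between entities with equal stake. If no stake accumulator is
// given, the entropy-based shuffle order is returned as-is.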
func stakingAddressMapToSliceByStake(
entMap map[staking.Address]bool,
stakeAcc *stakingState.StakeAccumulatorCache,
beacon []byte,
) ([]staking.Address, error) {
// Convert the map of entities' staking account addresses to a lexicographically
// sorted slice (i.e. make it deterministic).
entities := stakingAddressMapToSortedSlice(entMap)
// Shuffle the sorted slice to make tie-breaks "random".
drbg, err := drbg.New(crypto.SHA512, beacon, nil, RNGContextEntities)
if err != nil {
return nil, fmt.Errorf("tendermint/scheduler: couldn't instantiate DRBG: %w", err)
}
rngSrc := mathrand.New(drbg)
rng := rand.New(rngSrc)
rng.Shuffle(len(entities), func(i, j int) {
entities[i], entities[j] = entities[j], entities[i]
})
if stakeAcc == nil {
return entities, nil
}
// Stable-sort the shuffled slice by descending escrow balance.
var balanceErr error
sort.SliceStable(entities, func(i, j int) bool {
iBal, err := stakeAcc.GetEscrowBalance(entities[i])
if err != nil {
balanceErr = err
return false
}
jBal, err := stakeAcc.GetEscrowBalance(entities[j])
if err != nil {
balanceErr = err
return false
}
return iBal.Cmp(jBal) == 1 // Note: Not -1 to get a reversed sort.
})
if balanceErr != nil {
return nil, fmt.Errorf("failed to fetch escrow balance: %w", balanceErr)
}
return entities, nil
}
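// stakingAddressMapToSortedSlice converts a set of entity addresses into a
// slice sorted lexicographically by address bytes, making the otherwise
// random map iteration order deterministic.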
func stakingAddressMapToSortedSlice(m map[staking.Address]bool) []staking.Address {
sorted := make([]staking.Address, 0, len(m))
for mk := range m {
sorted = append(sorted, mk)
}
sort.Slice(sorted, func(i, j int) bool {
return bytes.Compare(sorted[i][:], sorted[j][:]) < 0
})
return sorted
}
// New constructs a new scheduler application instance.
func New() api.Application {
return &schedulerApplication{}
}