/
scheduler.go
136 lines (111 loc) · 3.71 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package byzantine
import (
"bytes"
"context"
"fmt"
tmtypes "github.com/tendermint/tendermint/types"
"github.com/oasislabs/oasis-core/go/common"
"github.com/oasislabs/oasis-core/go/common/cbor"
"github.com/oasislabs/oasis-core/go/common/crypto/signature"
"github.com/oasislabs/oasis-core/go/common/node"
schedulerapp "github.com/oasislabs/oasis-core/go/consensus/tendermint/apps/scheduler"
"github.com/oasislabs/oasis-core/go/consensus/tendermint/service"
scheduler "github.com/oasislabs/oasis-core/go/scheduler/api"
"github.com/oasislabs/oasis-core/go/worker/common/p2p"
)
func schedulerNextElectionHeight(svc service.TendermintService, kind scheduler.CommitteeKind) (int64, error) {
sub, err := svc.Subscribe("script", schedulerapp.QueryApp)
if err != nil {
return 0, fmt.Errorf("Tendermint Subscribe error: %w", err)
}
defer svc.Unsubscribe("script", schedulerapp.QueryApp) // nolint: errcheck
for {
ev := (<-sub.Out()).Data().(tmtypes.EventDataNewBlock)
for _, tmEv := range ev.ResultBeginBlock.GetEvents() {
if tmEv.GetType() != schedulerapp.EventType {
continue
}
for _, pair := range tmEv.GetAttributes() {
if bytes.Equal(pair.GetKey(), schedulerapp.KeyElected) {
var kinds []scheduler.CommitteeKind
if err := cbor.Unmarshal(pair.GetValue(), &kinds); err != nil {
return 0, fmt.Errorf("CBOR Unmarshal kinds error: %w", err)
}
for _, k := range kinds {
if k == kind {
return ev.Block.Header.Height, nil
}
}
}
}
}
}
}
func schedulerGetCommittee(ht *honestTendermint, height int64, kind scheduler.CommitteeKind, runtimeID common.Namespace) (*scheduler.Committee, error) {
committees, err := ht.service.Scheduler().GetCommittees(context.Background(), &scheduler.GetCommitteesRequest{
RuntimeID: runtimeID,
Height: height,
})
if err != nil {
return nil, fmt.Errorf("Scheduler GetCommittees() error: %w", err)
}
for _, committee := range committees {
if committee.Kind != kind {
continue
}
if !committee.RuntimeID.Equal(&runtimeID) {
continue
}
return committee, nil
}
return nil, fmt.Errorf("query didn't return a committee for our runtime")
}
func schedulerCheckScheduled(committee *scheduler.Committee, nodeID signature.PublicKey, role scheduler.Role) error {
for _, member := range committee.Members {
if !member.PublicKey.Equal(nodeID) {
continue
}
if member.Role != role {
return fmt.Errorf("we're scheduled as %s, expected %s", member.Role, role)
}
// All good.
return nil
}
return fmt.Errorf("we're not scheduled")
}
func schedulerCheckNotScheduled(committee *scheduler.Committee, nodeID signature.PublicKey) error {
for _, member := range committee.Members {
if !member.PublicKey.Equal(nodeID) {
continue
}
return fmt.Errorf("we're scheduled as %s", member.Role)
}
// All good.
return nil
}
func schedulerForRoleInCommittee(ht *honestTendermint, height int64, committee *scheduler.Committee, role scheduler.Role, fn func(*node.Node) error) error {
for _, member := range committee.Members {
if member.Role != role {
continue
}
n, err := registryGetNode(ht, height, member.PublicKey)
if err != nil {
return fmt.Errorf("registry get node %s error: %w", member.PublicKey, err)
}
if err = fn(n); err != nil {
// Forward callback error to caller verbatim.
return err
}
}
return nil
}
func schedulerPublishToCommittee(ht *honestTendermint, height int64, committee *scheduler.Committee, role scheduler.Role, ph *p2pHandle, message *p2p.Message) error {
if err := schedulerForRoleInCommittee(ht, height, committee, role, func(n *node.Node) error {
ph.service.Publish(ph.context, n, message)
return nil
}); err != nil {
return err
}
ph.service.Flush()
return nil
}