/
control.go
178 lines (158 loc) · 5.03 KB
/
control.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package node
import (
"context"
"time"
"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/identity"
consensus "github.com/oasisprotocol/oasis-core/go/consensus/api"
control "github.com/oasisprotocol/oasis-core/go/control/api"
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
storage "github.com/oasisprotocol/oasis-core/go/storage/api"
upgrade "github.com/oasisprotocol/oasis-core/go/upgrade/api"
"github.com/oasisprotocol/oasis-core/go/worker/registration"
)
var (
	// Compile-time checks that *Node satisfies the interfaces it is
	// registered under; a mismatch fails the build rather than at runtime.
	_ control.ControlledNode = (*Node)(nil)
	_ registration.Delegate  = (*Node)(nil)
)
// RegistrationStopped implements registration.Delegate.
//
// It is invoked by the registration worker once (de)registration has fully
// stopped, and reacts by shutting down the whole node.
func (n *Node) RegistrationStopped() {
	n.Stop()
}
// RequestShutdown implements control.ControlledNode.
//
// It asks the node to deregister and shut down, returning a channel that is
// closed once the caller can consider the shutdown request acknowledged.
func (n *Node) RequestShutdown() (<-chan struct{}, error) {
	if n.RegistrationWorker == nil {
		// Without a registration worker there is nothing to deregister, so
		// simply trigger an immediate shutdown. The channel is closed before
		// Stop runs so the caller is notified while it can still observe it.
		done := make(chan struct{})
		go func() {
			close(done)
			n.RegistrationStopped()
		}()
		return done, nil
	}

	if err := n.RegistrationWorker.RequestDeregistration(); err != nil {
		return nil, err
	}

	// Hand back the registration worker's quit channel instead of the node's
	// own: the node-wide one would only fire after everything — possibly
	// including the control gRPC server serving this request — is torn down.
	return n.RegistrationWorker.Quit(), nil
}
// Ready implements control.ControlledNode.
//
// The returned channel is closed once the node considers itself ready.
func (n *Node) Ready() <-chan struct{} {
	return n.readyCh
}
// GetIdentity implements control.ControlledNode.
//
// It returns the node's identity (its key material wrapper).
func (n *Node) GetIdentity() *identity.Identity {
	return n.Identity
}
// GetRegistrationStatus implements control.ControlledNode.
//
// It reports the node's registration status as seen by the registration
// worker; nodes without one (e.g. ones that never register) get an empty
// status rather than an error.
func (n *Node) GetRegistrationStatus(ctx context.Context) (*control.RegistrationStatus, error) {
	worker := n.RegistrationWorker
	if worker != nil {
		return worker.GetRegistrationStatus(ctx)
	}
	// No registration worker configured — report a zero-value status.
	return &control.RegistrationStatus{}, nil
}
// GetRuntimeStatus implements control.ControlledNode.
//
// For every runtime known to the runtime registry it assembles a best-effort
// status snapshot: registry descriptor, latest and genesis blocks as seen by
// this node's consensus view, the earliest locally retained block, and the
// committee/storage worker statuses. Individual lookup failures are logged
// and leave the corresponding status fields at their zero values rather than
// failing the whole call.
func (n *Node) GetRuntimeStatus(ctx context.Context) (map[common.Namespace]control.RuntimeStatus, error) {
	runtimes := make(map[common.Namespace]control.RuntimeStatus)

	// Seed node doesn't have a runtime registry.
	if n.RuntimeRegistry == nil {
		return runtimes, nil
	}

	for _, rt := range n.RuntimeRegistry.Runtimes() {
		var status control.RuntimeStatus

		// Fetch runtime registry descriptor. Do not wait too long for the descriptor to become
		// available as otherwise we may be blocked until the node is synced.
		dscCtx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
		dsc, err := rt.ActiveDescriptor(dscCtx)
		cancel()
		switch err {
		case nil:
			status.Descriptor = dsc
		case context.DeadlineExceeded:
			// The descriptor may not yet be available. It is fine if we use nil in this case.
		default:
			n.logger.Error("failed to fetch registry descriptor",
				"err", err,
				"runtime_id", rt.ID(),
			)
		}

		// Fetch latest block as seen by this node.
		blk, err := n.Consensus.RootHash().GetLatestBlock(ctx, &roothash.RuntimeRequest{
			RuntimeID: rt.ID(),
			Height:    consensus.HeightLatest,
		})
		switch err {
		case nil:
			status.LatestRound = blk.Header.Round
			status.LatestHash = blk.Header.EncodedHash()
			status.LatestTime = blk.Header.Timestamp
			// The state root is versioned by the block round.
			status.LatestStateRoot = storage.Root{
				Namespace: blk.Header.Namespace,
				Version:   blk.Header.Round,
				Type:      storage.RootTypeState,
				Hash:      blk.Header.StateRoot,
			}
		default:
			n.logger.Error("failed to fetch latest runtime block",
				"err", err,
				"runtime_id", rt.ID(),
			)
		}

		// Fetch latest genesis block as seen by this node.
		blk, err = n.Consensus.RootHash().GetGenesisBlock(ctx, &roothash.RuntimeRequest{
			RuntimeID: rt.ID(),
			Height:    consensus.HeightLatest,
		})
		switch err {
		case nil:
			status.GenesisRound = blk.Header.Round
			status.GenesisHash = blk.Header.EncodedHash()
		default:
			n.logger.Error("failed to fetch genesis runtime block",
				"err", err,
				"runtime_id", rt.ID(),
			)
		}

		// Fetch the oldest retained block from the local block history.
		blk, err = rt.History().GetEarliestBlock(ctx)
		switch err {
		case nil:
			status.LastRetainedRound = blk.Header.Round
			status.LastRetainedHash = blk.Header.EncodedHash()
		default:
			n.logger.Error("failed to fetch last retained runtime block",
				"err", err,
				"runtime_id", rt.ID(),
			)
		}

		// Fetch common committee worker status, if this node runs a committee
		// node for the runtime.
		if rtNode := n.CommonWorker.GetRuntime(rt.ID()); rtNode != nil {
			status.Committee, err = rtNode.GetStatus(ctx)
			if err != nil {
				n.logger.Error("failed to fetch common committee worker status",
					"err", err,
					"runtime_id", rt.ID(),
				)
			}
		}

		// Fetch storage worker status, if this node runs a storage node for
		// the runtime.
		if storageNode := n.StorageWorker.GetRuntime(rt.ID()); storageNode != nil {
			status.Storage, err = storageNode.GetStatus(ctx)
			if err != nil {
				n.logger.Error("failed to fetch storage worker status",
					"err", err,
					"runtime_id", rt.ID(),
				)
			}
		}

		runtimes[rt.ID()] = status
	}

	return runtimes, nil
}
// GetPendingUpgrades implements control.ControlledNode.
//
// It delegates to the upgrade backend to list upgrades that are scheduled but
// not yet applied.
func (n *Node) GetPendingUpgrades(ctx context.Context) ([]*upgrade.PendingUpgrade, error) {
	return n.Upgrader.PendingUpgrades(ctx)
}