-
Notifications
You must be signed in to change notification settings - Fork 279
/
state.go
322 lines (277 loc) · 8.57 KB
/
state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
// Licensed to LinDB under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. LinDB licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package models
import (
"encoding/json"
"strconv"
"time"
"github.com/jedib0t/go-pretty/v6/table"
"github.com/lindb/lindb/config"
"github.com/lindb/lindb/pkg/encoding"
"github.com/lindb/lindb/pkg/timeutil"
)
// ShardStateType represents the state of a shard.
type ShardStateType int
// Shard states, numbered sequentially from 0 via iota.
const (
// UnknownShard is the zero value of ShardStateType.
UnknownShard ShardStateType = iota
NewShard
OnlineShard
OfflineShard
NonExistentShard
)
// NoLeader is the NodeID value -1.
// NOTE(review): presumably a sentinel meaning that no leader is assigned;
// this is inferred from the name only — confirm against callers.
const NoLeader NodeID = -1
// NodeStateType represents node state type.
type NodeStateType int
// Node states start at 1 (iota + 1), so the zero value of NodeStateType
// matches neither NodeOnline nor NodeOffline.
const (
NodeOnline NodeStateType = iota + 1
NodeOffline
)
// ClusterStatus represents the current config status of a cluster.
type ClusterStatus int

// Known cluster statuses; ClusterStatusUnknown is the zero value.
const (
	ClusterStatusUnknown ClusterStatus = iota
	ClusterStatusInitialize
	ClusterStatusReady
)

// String returns the display name of the ClusterStatus. Every value other
// than ClusterStatusInitialize and ClusterStatusReady is reported as "Unknown".
func (s ClusterStatus) String() string {
	if s == ClusterStatusInitialize {
		return "Initialize"
	}
	if s == ClusterStatusReady {
		return "Ready"
	}
	return "Unknown"
}

// MarshalJSON encodes the cluster status as a JSON string holding its
// display name (see String).
func (s ClusterStatus) MarshalJSON() ([]byte, error) {
	return json.Marshal(s.String())
}

// UnmarshalJSON decodes the cluster status from the quoted display name.
// Input that is neither "Initialize" nor "Ready" yields ClusterStatusUnknown;
// the method never returns an error.
func (s *ClusterStatus) UnmarshalJSON(value []byte) error {
	decoded := ClusterStatusUnknown
	switch string(value) {
	case `"Initialize"`:
		decoded = ClusterStatusInitialize
	case `"Ready"`:
		decoded = ClusterStatusReady
	}
	*s = decoded
	return nil
}
// Broker represents broker cluster config and state.
type Broker struct {
// BrokerCluster is embedded, so its fields are promoted onto Broker.
config.BrokerCluster
// Status is the cluster status, serialized under the "status" JSON key.
Status ClusterStatus `json:"status"`
}
// Brokers represents a list of Broker entries.
type Brokers []Broker
// ToTable renders the broker list as a table with one row per broker
// (namespace, status, configuration). It returns the number of rows and the
// rendered table; an empty list yields (0, "").
func (s Brokers) ToTable() (rows int, tableStr string) {
	rows = len(s)
	if rows == 0 {
		return 0, ""
	}

	formatter := NewTableFormatter()
	formatter.AppendHeader(table.Row{"Namespace", "Status", "Configuration"})
	for _, broker := range s {
		formatter.AppendRow(table.Row{
			broker.Config.Namespace,
			broker.Status.String(),
			broker.Config.String(),
		})
	}
	tableStr = formatter.Render()
	return rows, tableStr
}
// Storages represents a list of Storage entries.
type Storages []Storage
// ToTable renders the storage list as a table with the columns Namespace,
// Status and Configuration, one row per storage. It returns the row count and
// the rendered table, or (0, "") when the list has no entries.
func (s Storages) ToTable() (rows int, tableStr string) {
	if len(s) == 0 {
		return 0, ""
	}

	formatter := NewTableFormatter()
	formatter.AppendHeader(table.Row{"Namespace", "Status", "Configuration"})
	for _, storage := range s {
		row := table.Row{
			storage.Config.Namespace,
			storage.Status.String(),
			storage.Config.String(),
		}
		formatter.AppendRow(row)
	}
	return len(s), formatter.Render()
}
// Storage represents storage cluster config and state.
type Storage struct {
// StorageCluster is embedded, so its fields are promoted onto Storage.
config.StorageCluster
// Status is the cluster status, serialized under the "status" JSON key.
Status ClusterStatus `json:"status"`
}
// ReplicaState represents the relationship for a replica: a leader/follower
// node pair for one family of a database shard.
type ReplicaState struct {
// Database is the database name.
Database string `json:"database"`
// ShardID identifies the shard within the database.
ShardID ShardID `json:"shardId"`
// Leader is the node labelled "from(leader)" by String.
Leader NodeID `json:"leader"`
// Follower is the node labelled "to(follower)" by String.
Follower NodeID `json:"follower"`
// FamilyTime is the family timestamp; String formats it with
// timeutil.FormatTimestamp.
// NOTE(review): unit is presumably milliseconds — confirm against timeutil.
FamilyTime int64 `json:"familyTime"`
}
// String returns a bracketed, comma-separated description of the replica
// relationship: database, shard, formatted family time, leader and follower.
func (r ReplicaState) String() string {
	shard := strconv.Itoa(int(r.ShardID))
	family := timeutil.FormatTimestamp(r.FamilyTime, timeutil.DataTimeFormat4)
	leader := strconv.Itoa(int(r.Leader))
	follower := strconv.Itoa(int(r.Follower))

	return "[database:" + r.Database +
		",shard:" + shard +
		",family:" + family +
		",from(leader):" + leader +
		",to(follower):" + follower +
		"]"
}
// ShardState represents current state of shard.
type ShardState struct {
// ID identifies the shard.
ID ShardID `json:"id"`
// State is the shard's current ShardStateType.
State ShardStateType `json:"state"`
// Leader is the NodeID of the shard's leader.
// NOTE(review): presumably NoLeader when no leader is assigned — confirm.
Leader NodeID `json:"leader"`
// Replica holds the shard's replica information (type declared elsewhere).
Replica Replica `json:"replica"`
}
// FamilyState represents current state of shard's family.
type FamilyState struct {
// Database is the database name.
Database string `json:"database"`
// Shard is the state of the shard the family belongs to.
Shard ShardState `json:"shard"`
// FamilyTime is the family timestamp.
// NOTE(review): unit is presumably milliseconds — confirm.
FamilyTime int64 `json:"familyTime"`
}
// BrokerState represents broker cluster state.
// NOTICE: it is not safe for concurrent use.
type BrokerState struct {
Name string `json:"name"` // ref Namespace
// LiveNodes maps a node ID to the live node registered under that ID.
LiveNodes map[string]StatelessNode `json:"liveNodes"`
}
// NewBrokerState creates the broker cluster state for the given name, with an
// empty (non-nil) live node map.
func NewBrokerState(name string) *BrokerState {
	state := new(BrokerState)
	state.Name = name
	state.LiveNodes = map[string]StatelessNode{}
	return state
}
// GetLiveNodes returns every live node as a slice. The order follows map
// iteration and is therefore unspecified; the result is nil when there are
// no live nodes.
func (b *BrokerState) GetLiveNodes() (rs []StatelessNode) {
	for id := range b.LiveNodes {
		rs = append(rs, b.LiveNodes[id])
	}
	return rs
}
// NodeOnline adds a live node into the node list under nodeID, overwriting any
// entry already stored for that ID.
//
// The live node map is created on demand, so the call also works on a
// BrokerState that was not built by NewBrokerState (a zero value, or one
// decoded from JSON without "liveNodes") instead of panicking on a nil map.
func (b *BrokerState) NodeOnline(nodeID string, node StatelessNode) {
	if b.LiveNodes == nil {
		b.LiveNodes = make(map[string]StatelessNode)
	}
	b.LiveNodes[nodeID] = node
}
// NodeOffline removes the node registered under nodeID from the live node
// list; it is a no-op when no such node is present.
func (b *BrokerState) NodeOffline(nodeID string) {
	delete(b.LiveNodes, nodeID)
}
// StorageState represents storage cluster state.
// NOTICE: it is not safe for concurrent use.
// TODO: need concurrent safe????
type StorageState struct {
Name string `json:"name"` // ref Namespace
// LiveNodes maps a node's ID to the live node with that ID.
LiveNodes map[NodeID]StatefulNode `json:"liveNodes"`
// TODO remove??
ShardAssignments map[string]*ShardAssignment `json:"shardAssignments"` // database's name => shard assignment
ShardStates map[string]map[ShardID]ShardState `json:"shardStates"` // database's name => shard ID => shard state
}
// NewStorageState creates the storage cluster state for the given name, with
// all of its maps initialized to empty (non-nil) values.
func NewStorageState(name string) *StorageState {
	state := &StorageState{Name: name}
	state.LiveNodes = map[NodeID]StatefulNode{}
	state.ShardAssignments = map[string]*ShardAssignment{}
	state.ShardStates = map[string]map[ShardID]ShardState{}
	return state
}
// LeadersOnNode returns, per database name, the IDs of the shards whose
// leader is nodeID according to ShardStates. Databases without such a shard
// are absent from the result; shard ID order is unspecified (map iteration).
func (s *StorageState) LeadersOnNode(nodeID NodeID) map[string][]ShardID {
	leaders := make(map[string][]ShardID)
	for database, states := range s.ShardStates {
		for id := range states {
			if states[id].Leader != nodeID {
				continue
			}
			leaders[database] = append(leaders[database], id)
		}
	}
	return leaders
}
// ReplicasOnNode returns, per database name, the IDs of the shards whose
// replica set contains nodeID according to ShardAssignments. Databases
// without such a shard are absent from the result; shard ID order is
// unspecified (map iteration).
func (s *StorageState) ReplicasOnNode(nodeID NodeID) map[string][]ShardID {
	replicasByDB := make(map[string][]ShardID)
	for database, assignment := range s.ShardAssignments {
		for id, replica := range assignment.Shards {
			if !replica.Contain(nodeID) {
				continue
			}
			replicasByDB[database] = append(replicasByDB[database], id)
		}
	}
	return replicasByDB
}
// DropDatabase removes both the shard assignment and the shard states kept
// for the database with the given name; unknown names are ignored.
func (s *StorageState) DropDatabase(name string) {
	delete(s.ShardAssignments, name)
	delete(s.ShardStates, name)
}
// NodeOnline adds a live node into the node list keyed by node.ID, overwriting
// any entry already stored for that ID.
//
// The live node map is created on demand, so the call also works on a
// StorageState that was not built by NewStorageState (a zero value, or one
// decoded from JSON without "liveNodes") instead of panicking on a nil map.
func (s *StorageState) NodeOnline(node StatefulNode) {
	if s.LiveNodes == nil {
		s.LiveNodes = make(map[NodeID]StatefulNode)
	}
	s.LiveNodes[node.ID] = node
}
// NodeOffline removes the node with the given ID from the live node list; it
// is a no-op when no such node is present.
func (s *StorageState) NodeOffline(nodeID NodeID) {
	delete(s.LiveNodes, nodeID)
}
// String returns the storage state as the string form of the bytes produced
// by encoding.JSONMarshal.
func (s *StorageState) String() string {
	data := encoding.JSONMarshal(s)
	return string(data)
}
// StateMachineInfo represents state machine register info.
type StateMachineInfo struct {
// Path is serialized under the "path" JSON key.
// NOTE(review): presumably the path the state machine is registered on — confirm.
Path string `json:"path"`
// CreateState is a factory function returning a state value; it is excluded
// from JSON (tag "-").
CreateState func() interface{} `json:"-"`
}
// StateMetric represents internal state metric: a set of tags plus the
// fields recorded for them.
type StateMetric struct {
// Tags holds the metric's tag key/value pairs.
Tags map[string]string `json:"tags"`
// Fields holds the metric's field values.
Fields []StateField `json:"fields"`
}
// StateField represents internal state value: one named, typed field of a
// StateMetric.
type StateField struct {
// Name is the field name.
Name string `json:"name"`
// Type is the field type, kept as a string.
// NOTE(review): the set of valid type names is not visible here — confirm.
Type string `json:"type"`
// Value is the field value.
Value float64 `json:"value"`
}
// DataFamilyState represents the state of data family.
type DataFamilyState struct {
// ShardID identifies the shard the family belongs to.
ShardID ShardID `json:"shardId"`
// FamilyTime is kept as a string here, unlike the int64 FamilyTime of
// ReplicaState and FamilyState.
// NOTE(review): presumably an already formatted timestamp — confirm.
FamilyTime string `json:"familyTime"`
// AckSequences maps an int32 key to an int64 sequence.
// NOTE(review): the key is presumably a node/leader ID — confirm.
AckSequences map[int32]int64 `json:"ackSequences"`
// ReplicaSequences maps an int32 key to an int64 sequence.
// NOTE(review): the key is presumably a node/leader ID — confirm.
ReplicaSequences map[int32]int64 `json:"replicaSequences"`
// MemoryDatabases lists the states of the family's memory databases.
MemoryDatabases []MemoryDatabaseState `json:"memoryDatabases"`
}
// MemoryDatabaseState represents the state of memory database.
type MemoryDatabaseState struct {
// State is the memory database's state, kept as a string.
State string `json:"state"`
// Uptime is a time.Duration, so encoding/json writes it as an integer
// number of nanoseconds.
Uptime time.Duration `json:"uptime"`
// MemSize is the memory size.
// NOTE(review): unit is presumably bytes — confirm.
MemSize int64 `json:"memSize"`
// NumOfMetrics is the number of metrics.
NumOfMetrics int `json:"numOfMetrics"`
// NumOfSeries is the number of series.
NumOfSeries int `json:"numOfSeries"`
}