-
Notifications
You must be signed in to change notification settings - Fork 158
/
sync.go
239 lines (186 loc) · 6.54 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
/*
Copyright NetFoundry Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package env
import (
"github.com/openziti/channel/v2"
"github.com/openziti/foundation/v2/versions"
"github.com/openziti/ziti/common/pb/edge_ctrl_pb"
"github.com/openziti/ziti/controller/db"
"github.com/openziti/ziti/controller/model"
"github.com/openziti/ziti/controller/network"
"sync"
)
// RouterSyncStrategyType is a defined string type that names a router synchronization strategy.
// It is the value reported by RouterSyncStrategy.Type.
type RouterSyncStrategyType string
// RouterSyncStatus is a defined string type describing where a router is in the synchronization
// life cycle. The known values are the RouterSync* constants declared in this file.
type RouterSyncStatus string
// Router synchronization status values, followed by the message header constants used by
// sync strategies.
const (
RouterSyncNew RouterSyncStatus = "SYNC_NEW" // connection accepted, but no strategy actions have been taken yet
RouterSyncQueued RouterSyncStatus = "SYNC_QUEUED" // connection handed to the strategy, but not yet processed
RouterSyncHello RouterSyncStatus = "SYNC_HELLO" // connection is beginning the hello cycle
RouterSyncHelloWait RouterSyncStatus = "SYNC_HELLO_WAIT" // hello received from the router, but too many routers are currently synchronizing
RouterSyncResyncWait RouterSyncStatus = "SYNC_RESYNC_WAIT" // router requested a resync and is waiting in the queue
// NOTE(review): the identifier below is spelled "RouterSyn..." (missing the "c" of "Sync").
// It is exported, so renaming it would break existing callers.
RouterSynInProgress RouterSyncStatus = "SYNC_IN_PROGRESS" // hello finished, starting to send state
RouterSyncDone RouterSyncStatus = "SYNC_DONE" // initial state sent
// Error states
RouterSyncUnknown RouterSyncStatus = "SYNC_UNKNOWN" // the router is currently unknown
RouterSyncDisconnected RouterSyncStatus = "SYNC_DISCONNECTED" // strategy was disconnected before finishing
RouterSyncHelloTimeout RouterSyncStatus = "SYNC_HELLO_TIMEOUT" // sync failed due to a hello timeout
RouterSyncError RouterSyncStatus = "SYNC_ERROR" // sync failed due to an unexpected error
// Message headers. These are untyped integer constants; presumably they are used as header
// keys on channel messages -- confirm against the strategy implementations.
SyncStrategyTypeHeader = 1013
SyncStrategyStateHeader = 1014
SyncStrategyLastIndex = 1015
)
// RouterSyncStrategy handles the life cycle of an Edge Router connecting to the controller, synchronizing
// any upfront state and then maintaining state after that.
//
// It combines RouterConnectionHandler (connect/disconnect handling) and
// RouterSynchronizerEventHandler (API session and session change events) with the
// strategy-level methods declared directly below.
type RouterSyncStrategy interface {
// Type returns the RouterSyncStrategyType naming this strategy.
Type() RouterSyncStrategyType
// GetEdgeRouterState returns the RouterStateValues for the edge router with the given id.
// What is returned for an unrecognized id is implementation-defined.
GetEdgeRouterState(id string) RouterStateValues
// Stop presumably shuts the strategy down; exact semantics are implementation-defined.
Stop()
// GetPublicKeys returns public keys as a string-keyed map. The meaning of the map key is not
// visible from this file -- confirm against the implementation.
GetPublicKeys() map[string]*edge_ctrl_pb.DataState_PublicKey
RouterConnectionHandler
RouterSynchronizerEventHandler
}
// RouterConnectionHandler is responsible for handling router connect/disconnect events so that state
// can be synchronized. It is intended for API Session state, but additional state is possible.
// Implementations may bind additional handlers to the channel.
type RouterConnectionHandler interface {
// RouterConnected is the hook for a router connecting; it receives both the edge router model
// and the network-level router.
RouterConnected(edgeRouter *model.EdgeRouter, router *network.Router)
// RouterDisconnected is the hook for a router disconnecting.
RouterDisconnected(router *network.Router)
// GetReceiveHandlers returns the typed receive handlers supplied by the implementation,
// presumably to be bound to the router's channel -- confirm against the caller.
GetReceiveHandlers() []channel.TypedReceiveHandler
}
// RouterSynchronizerEventHandler is responsible for keeping Edge Routers up to date on API Sessions.
// Each method is an event hook that receives the affected database entity.
type RouterSynchronizerEventHandler interface {
// ApiSessionAdded is the hook for a newly added API session.
ApiSessionAdded(apiSession *db.ApiSession)
// ApiSessionUpdated is the hook for an updated API session; it also receives an API session
// certificate. Whether apiSessionCert may be nil is not visible here -- confirm against callers.
ApiSessionUpdated(apiSession *db.ApiSession, apiSessionCert *db.ApiSessionCertificate)
// ApiSessionDeleted is the hook for a deleted API session.
ApiSessionDeleted(apiSession *db.ApiSession)
// SessionDeleted is the hook for a deleted session.
SessionDeleted(session *db.Session)
}
// RouterState provides a thread-safe mechanism to access and set router status information that may
// be in flux due to router connection/disconnection.
//
// Each field of RouterStateValues has a setter/getter pair here; Values returns all of them at once.
type RouterState interface {
// SetIsOnline / IsOnline store and report the online flag.
SetIsOnline(isOnline bool)
IsOnline() bool
// SetHostname / Hostname store and report the hostname.
SetHostname(hostname string)
Hostname() string
// SetProtocols / Protocols store and report the string-to-string protocol map.
SetProtocols(protocols map[string]string)
Protocols() map[string]string
// SetSyncStatus / SyncStatus store and report the synchronization status.
SetSyncStatus(status RouterSyncStatus)
SyncStatus() RouterSyncStatus
// SetVersionInfo / GetVersionInfo store and report the version information.
SetVersionInfo(versionInfo versions.VersionInfo)
GetVersionInfo() versions.VersionInfo
// Values returns every field as a single RouterStateValues.
Values() RouterStateValues
}
// Compile-time check that *LockingRouterState implements RouterState.
var _ RouterState = (*LockingRouterState)(nil)
// RouterStateValues is a plain value holder for the status fields exposed through RouterState.
// It carries no locking of its own.
type RouterStateValues struct {
IsOnline bool // online flag
Hostname string // hostname
Protocols map[string]string // string-to-string protocol map; key/value meaning is not visible in this file
SyncStatus RouterSyncStatus // one of the RouterSync* status constants
VersionInfo versions.VersionInfo // version information
}
// NewRouterStatusValues returns the default RouterStateValues: offline, an empty hostname, an
// empty but non-nil protocol map, a sync status of RouterSyncUnknown, and a zero-value VersionInfo.
func NewRouterStatusValues() RouterStateValues {
	// Fields not listed here (IsOnline, Hostname, VersionInfo) keep their zero values.
	values := RouterStateValues{
		SyncStatus: RouterSyncUnknown,
	}
	// Allocate the map explicitly so the default is an empty map rather than nil.
	values.Protocols = make(map[string]string)
	return values
}
// LockingRouterState is the RouterState implementation in this file. Every accessor acquires lock
// before touching internal. Because it contains a sync.Mutex it must not be copied after first
// use; all methods use pointer receivers.
type LockingRouterState struct {
internal RouterStateValues // the guarded values; only accessed while lock is held
lock sync.Mutex // guards internal
}
// NewLockingRouterStatus returns a LockingRouterState whose values are initialized with
// NewRouterStatusValues. The mutex needs no explicit initialization; its zero value is unlocked.
func NewLockingRouterStatus() *LockingRouterState {
	state := new(LockingRouterState)
	state.internal = NewRouterStatusValues()
	return state
}
// Values returns a snapshot of all stored values, taken under the lock. The Protocols map in the
// snapshot is a fresh copy (never nil), so the caller may modify it without affecting the stored
// state.
func (r *LockingRouterState) Values() RouterStateValues {
	r.lock.Lock()
	defer r.lock.Unlock()

	protocols := make(map[string]string, len(r.internal.Protocols))
	for key, value := range r.internal.Protocols {
		protocols[key] = value
	}

	// Copy the struct by value, then swap in the detached map.
	snapshot := r.internal
	snapshot.Protocols = protocols
	return snapshot
}
// SetIsOnline stores the online flag while holding the lock.
func (r *LockingRouterState) SetIsOnline(isOnline bool) {
	r.lock.Lock()
	r.internal.IsOnline = isOnline
	r.lock.Unlock()
}
// IsOnline returns the stored online flag, read while holding the lock.
func (r *LockingRouterState) IsOnline() bool {
	r.lock.Lock()
	online := r.internal.IsOnline
	r.lock.Unlock()
	return online
}
// SetHostname stores the hostname while holding the lock.
func (r *LockingRouterState) SetHostname(hostname string) {
	r.lock.Lock()
	r.internal.Hostname = hostname
	r.lock.Unlock()
}
// Hostname returns the stored hostname, read while holding the lock.
func (r *LockingRouterState) Hostname() string {
	r.lock.Lock()
	name := r.internal.Hostname
	r.lock.Unlock()
	return name
}
// SetProtocols stores a copy of protocols, so later changes the caller makes to its own map do
// not affect the stored state. A nil argument is stored as an empty, non-nil map.
func (r *LockingRouterState) SetProtocols(protocols map[string]string) {
	// Build the private copy first; reading the caller's map does not need the lock.
	cloned := make(map[string]string, len(protocols))
	for key, value := range protocols {
		cloned[key] = value
	}

	r.lock.Lock()
	r.internal.Protocols = cloned
	r.lock.Unlock()
}
// Protocols returns a copy of the stored protocol map, taken while holding the lock. The result
// is always a non-nil map (empty when nothing is stored), and the caller may modify it without
// affecting the stored state.
func (r *LockingRouterState) Protocols() map[string]string {
	r.lock.Lock()
	defer r.lock.Unlock()

	// Always allocate, so the result is an empty map rather than nil. The local is deliberately
	// not named "copy", which would shadow the predeclared builtin of that name.
	protocols := make(map[string]string, len(r.internal.Protocols))
	for key, value := range r.internal.Protocols {
		protocols[key] = value
	}
	return protocols
}
// SetSyncStatus stores the synchronization status while holding the lock.
func (r *LockingRouterState) SetSyncStatus(syncStatus RouterSyncStatus) {
	r.lock.Lock()
	r.internal.SyncStatus = syncStatus
	r.lock.Unlock()
}
// SyncStatus returns the stored synchronization status, read while holding the lock.
func (r *LockingRouterState) SyncStatus() RouterSyncStatus {
	r.lock.Lock()
	status := r.internal.SyncStatus
	r.lock.Unlock()
	return status
}
// SetVersionInfo stores the version information while holding the lock.
func (r *LockingRouterState) SetVersionInfo(versionInfo versions.VersionInfo) {
	r.lock.Lock()
	r.internal.VersionInfo = versionInfo
	r.lock.Unlock()
}
// GetVersionInfo returns the stored version information, read while holding the lock.
func (r *LockingRouterState) GetVersionInfo() versions.VersionInfo {
	r.lock.Lock()
	info := r.internal.VersionInfo
	r.lock.Unlock()
	return info
}