-
Notifications
You must be signed in to change notification settings - Fork 58
/
context.go
285 lines (257 loc) · 7.5 KB
/
context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
/**
* Tencent is pleased to support the open source community by making polaris-go available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package model
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/polarismesh/polaris-go/pkg/clock"
)
const (
// 主流程引擎的上下文key
// SDK初始化后,会将主流程引擎对象放入上下文中,供插件按需使用
ContextKeyEngine = "engine"
// SDK的唯一标识id
ContextKeyToken = "SDKToken"
// sdkcontext上面的pluginManager
ContextKeyPlugins = "plugins"
// sdkContext创建开始时间
ContextKeyTakeEffectTime = "SDKTakeEffectTime"
// sdkContext创建结束时间
ContextKeyFinishInitTime = "SDKFinishInitTime"
)
// sdkContext的唯一标识
type SDKToken struct {
IP string
PID int32
UID string
Client string
Version string
PodName string
HostName string
}
// LocationInfo 地域信息
type LocationInfo interface {
// 获取地域明细
GetLocation() *Location
// 在地域信息获取过程中的错误信息
GetLastError() SDKError
// 获取地域信息状态
GetStatus() uint32
// 查看地域信息是否已初始化状态
IsLocationInitialized() bool
// 查看地域信息是否ready状态
IsLocationReady() bool
}
// ValueContext 用于主流程传递kv数据的上下文对象,线程安全
type ValueContext interface {
// 设置kv值
SetValue(key string, value interface{})
// 获取kv值
GetValue(key string) (interface{}, bool)
// 获取当前节点地域信息
GetCurrentLocation() LocationInfo
// 获取客户端ID
GetClientId() string
// 获取引擎接口
GetEngine() Engine
// 等待location是否达到locationStatus
WaitLocationInfo(ctx context.Context, locationStatus uint32) bool
// 设置当前节点地域信息
// 返回是否由非ready转换为ready
SetCurrentLocation(*Location, SDKError) bool
// 获取当前时间戳
Now() time.Time
// 计算时间间隔
Since(time.Time) time.Duration
}
// NewValueContext 创建kv上下文对象
func NewValueContext() ValueContext {
ctx := &valueContext{
coreMap: &sync.Map{},
}
ctx.clock = clock.GetClock()
ctx.currentLocation.Store(&locationInfo{
locationStatus: LocationInit,
})
ctx.locationInitializedNotify.Context, ctx.locationInitializedNotify.cancel = context.WithCancel(context.Background())
ctx.locationReadyNotify.Context, ctx.locationReadyNotify.cancel = context.WithCancel(context.Background())
return ctx
}
const (
// 地域信息初始化,未获取到地域信息
LocationInit uint32 = iota
// 地域信息获取失败,出现异常
LocationError
// 地域信息获取成功
LocationReady
// 地域信息获取成功,但是是空的,即没有在cmdb上面发现地域信息
LocationEmpty
)
// locationInfo 地域信息包装类型,含控制及状态信息
type locationInfo struct {
// 地域详情
location *Location
// 上一次获取失败
lastErr SDKError
// 地域信息状态
locationStatus uint32
}
// GetLocation 获取地域明细
func (l *locationInfo) GetLocation() *Location {
return l.location
}
// GetLastError 在地域信息获取过程中的错误信息
func (l *locationInfo) GetLastError() SDKError {
return l.lastErr
}
// GetStatus 获取地域信息状态
func (l *locationInfo) GetStatus() uint32 {
return l.locationStatus
}
// IsLocationInitialized 查看地域信息是否已经初始化过
func (l *locationInfo) IsLocationInitialized() bool {
return l.GetStatus() > LocationInit
}
// IsLocationReady 查看地域信息是否ready状态
func (l *locationInfo) IsLocationReady() bool {
return l.GetStatus() == LocationReady
}
// contextNotifier 通过context.Done通知一个事件的发生
type contextNotifier struct {
context.Context
cancel context.CancelFunc
onceNotify sync.Once
}
// Notify 通知事件发生
func (c *contextNotifier) Notify() {
c.onceNotify.Do(func() {
c.cancel()
})
}
// valueContext ValueContext的实现类
type valueContext struct {
// 当前地域信息
currentLocation atomic.Value
// 用于查看location是否initialized
locationInitializedNotify contextNotifier
// 用于查看location是否ready
locationReadyNotify contextNotifier
// 当前时间戳,存放类型为*time.Time
currentTimestamp atomic.Value
// 时钟,用于获取当前时间戳
clock clock.Clock
// 使用线程安全的map进行值的存储
coreMap *sync.Map
}
// WaitLocationInfo 等待地域信息状态
func (v *valueContext) WaitLocationInfo(ctx context.Context, locationStatus uint32) bool {
switch locationStatus {
case LocationInit:
<-v.locationInitializedNotify.Done()
return true
case LocationError:
<-v.locationInitializedNotify.Done()
return v.GetCurrentLocation().GetStatus() == LocationError
case LocationReady:
for {
select {
case <-ctx.Done():
return false
case <-v.locationReadyNotify.Done():
return true
}
}
}
return false
}
// SetValue 设置kv值
func (v *valueContext) SetValue(key string, value interface{}) {
v.coreMap.Store(key, value)
}
// GetValue 获取kv值
func (v *valueContext) GetValue(key string) (interface{}, bool) {
return v.coreMap.Load(key)
}
// GetCurrentLocation 获取当前节点地域信息
func (v *valueContext) GetCurrentLocation() LocationInfo {
value := v.currentLocation.Load()
return value.(LocationInfo)
}
// SetCurrentLocation 设置当前节点地域信息
func (v *valueContext) SetCurrentLocation(location *Location, lastErr SDKError) bool {
locInfo := &locationInfo{
location: location,
lastErr: lastErr,
}
if nil != lastErr {
locInfo.locationStatus = LocationError
} else if location != nil && !location.IsEmpty() {
locInfo.locationStatus = LocationReady
} else {
locInfo.locationStatus = LocationEmpty
}
lastLocationStatus := v.currentLocation.Load().(LocationInfo).GetStatus()
var becomeReady bool
switch lastLocationStatus {
case LocationReady:
// LocationReady状态,只能接受ready的更新
if locInfo.locationStatus == LocationReady {
v.currentLocation.Store(locInfo)
}
becomeReady = false
default:
// 其他2个状态可接受任意更新
v.currentLocation.Store(locInfo)
becomeReady = locInfo.locationStatus == LocationReady
}
v.locationInitializedNotify.Notify()
if becomeReady {
v.locationReadyNotify.Notify()
}
return becomeReady
}
// Now 获取当前时间戳
func (v *valueContext) Now() time.Time {
return v.clock.Now()
}
// Since 计算时间间隔
func (v *valueContext) Since(startTime time.Time) time.Duration {
return v.Now().Sub(startTime)
}
// IsLocationReady 查看地域信息是否ready状态
func (v *valueContext) IsLocationReady() bool {
return v.GetCurrentLocation().IsLocationReady()
}
// GetClientId 获取客户端ID
func (v *valueContext) GetClientId() string {
tokenValue, ok := v.GetValue(ContextKeyToken)
if !ok {
return ""
}
sdkToken := tokenValue.(SDKToken)
return sdkToken.UID
}
// GetEngine 获取客户端ID
func (v *valueContext) GetEngine() Engine {
value, ok := v.GetValue(ContextKeyEngine)
if !ok {
return nil
}
return value.(Engine)
}