/
upstream.go
448 lines (353 loc) · 14.8 KB
/
upstream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package types
import (
"context"
"net"
"sort"
"time"
metrics "github.com/rcrowley/go-metrics"
"mosn.io/api"
v2 "mosn.io/mosn/pkg/config/v2"
)
// Below is the basic relation between clusterManager, cluster, hostSet, and hosts:
//
// 1 * | 1 1 | 1 *
// clusterManager --------- cluster --------- --------- hostSet------hosts
// CDS Handler for cluster manager
type ClusterUpdateHandler func(oldCluster, newCluster Cluster)
// EDS Handler for cluster manager
type HostUpdateHandler func(cluster Cluster, hostConfigs []v2.Host)
// ClusterManager manages connection pools and load balancing for upstream clusters.
type ClusterManager interface {
// Add or update a cluster via API.
AddOrUpdatePrimaryCluster(cluster v2.Cluster) error
// AddOrUpdateClusterAndHost
AddOrUpdateClusterAndHost(cluster v2.Cluster, hosts []v2.Host) error
// Cluster Update functions, keep AddOrUpdatePrimaryCluster and AddOrUpdateClusterAndHost for compatible
UpdateCluster(cluster v2.Cluster, clusterHandler ClusterUpdateHandler) error
// Add Cluster health check callbacks
AddClusterHealthCheckCallbacks(name string, cb HealthCheckCb) error
// Get, use to get the snapshot of a cluster
GetClusterSnapshot(context context.Context, cluster string) ClusterSnapshot
// Deprecated: PutClusterSnapshot exists for historical compatibility and should not be used.
PutClusterSnapshot(ClusterSnapshot)
// UpdateClusterHosts used to update cluster's hosts
// temp interface todo: remove it
UpdateClusterHosts(cluster string, hosts []v2.Host) error
// AppendClusterHosts used to add cluster's hosts
AppendClusterHosts(clusterName string, hostConfigs []v2.Host) error
// Host Update functions, keep UpdateClusterHosts and AppendClusterHosts for compatible
UpdateHosts(clusterName string, hostConfigs []v2.Host, hostHandler HostUpdateHandler) error
// Get or Create tcp conn pool for a cluster
TCPConnForCluster(balancerContext LoadBalancerContext, snapshot ClusterSnapshot) CreateConnectionData
// Get or Create tcp conn pool for a cluster
UDPConnForCluster(balancerContext LoadBalancerContext, snapshot ClusterSnapshot) CreateConnectionData
// ConnPoolForCluster used to get protocol related conn pool
ConnPoolForCluster(balancerContext LoadBalancerContext, snapshot ClusterSnapshot, protocol api.ProtocolName) (ConnectionPool, Host)
// RemovePrimaryCluster used to remove cluster from set
RemovePrimaryCluster(clusters ...string) error
// ClusterExist, used to check whether 'clusterName' exist or not
ClusterExist(clusterName string) bool
// RemoveClusterHosts, remove the host by address string
RemoveClusterHosts(clusterName string, hosts []string) error
// TLSManager is used to cluster tls config
GetTLSManager() TLSClientContextManager
// UpdateTLSManager updates the tls manager which is used to cluster tls config
UpdateTLSManager(*v2.TLSConfig)
// ShutdownConnectionPool shutdown the connection pool by address and ProtocolName
// If ProtocolName is not specified, remove the addr's connection pool of all protocols
ShutdownConnectionPool(proto ProtocolName, addr string)
// Destroy the cluster manager
Destroy()
}
// ClusterSnapshot is a thread-safe cluster snapshot
type ClusterSnapshot interface {
// HostSet returns the cluster snapshot's host set
HostSet() HostSet
// ClusterInfo returns the cluster snapshot's cluster info
ClusterInfo() ClusterInfo
// LoadBalancer returns the cluster snapshot's load balancer
LoadBalancer() LoadBalancer
// IsExistsHosts checks whether the metadata's subset contains host or not
// if metadata is nil, check the cluster snapshot contains host or not
IsExistsHosts(metadata api.MetadataMatchCriteria) bool
HostNum(metadata api.MetadataMatchCriteria) int
}
// Cluster is a group of upstream hosts
type Cluster interface {
// Snapshot returns the cluster snapshot, which contains cluster info, hostset and load balancer
Snapshot() ClusterSnapshot
// UpdateHosts updates the host set's hosts
UpdateHosts(HostSet)
// Add health check callbacks in health checker
AddHealthCheckCallbacks(cb HealthCheckCb)
// Shutdown the healthcheck routine, if exists
StopHealthChecking()
}
// HostPredicate checks whether the host is matched the metadata
type HostPredicate func(Host) bool
// HostSet is as set of hosts that contains all the endpoints for a given
type HostSet interface {
// Size return len(hosts) in hostSet
Size() int
// Get get hosts[i] in hostSet
// The value range of i should be [0, len(hosts) )
Get(i int) Host
// Range iterates each host in hostSet
Range(func(Host) bool)
}
// Host is an upstream host
type Host interface {
api.HostInfo
// HostStats returns the host stats metrics
HostStats() *HostStats
// ClusterInfo returns the cluster info
ClusterInfo() ClusterInfo
// SetClusterInfo updates the host's cluster info
SetClusterInfo(info ClusterInfo)
// TLSHashValue TLS HashValue effects the host support tls state
TLSHashValue() *HashValue
// CreateConnection a connection for this host.
CreateConnection(context context.Context) CreateConnectionData
// CreateUDPConnection an udp connection for this host.
CreateUDPConnection(context context.Context) CreateConnectionData
// Address returns the host's Addr structure
Address() net.Addr
// Config creates a host config by the host attributes
Config() v2.Host
// LastHealthCheckPassTime returns the timestamp when host has translated from unhealthy to healthy state
LastHealthCheckPassTime() time.Time
// SetLastHealthCheckPassTime updates the timestamp when host has translated from unhealthy to healthy state,
// or translated from other host
SetLastHealthCheckPassTime(lastHealthCheckPassTime time.Time)
}
// ClusterInfo defines a cluster's information
type ClusterInfo interface {
// Name returns the cluster name
Name() string
// ClusterType returns the cluster type
ClusterType() v2.ClusterType
// LbType returns the cluster's load balancer type
LbType() LoadBalancerType
// ConnBufferLimitBytes returns the connection buffer limits
ConnBufferLimitBytes() uint32
// MaxRequestsPerConn returns a connection's max request
MaxRequestsPerConn() uint32
Mark() uint32
// Stats returns the cluster's stats metrics
Stats() *ClusterStats
// ResourceManager returns the ResourceManager
ResourceManager() ResourceManager
// TLSMng returns the tls manager
TLSMng() TLSClientContextManager
// LbSubsetInfo returns the load balancer subset's config
LbSubsetInfo() LBSubsetInfo
// ConnectTimeout returns the connect timeout
ConnectTimeout() time.Duration
// IdleTimeout returns the idle timeout
IdleTimeout() time.Duration
// LbOriDstInfo returns the load balancer oridst config
LbOriDstInfo() LBOriDstInfo
// Optional configuration for the load balancing algorithm selected by
LbConfig() *v2.LbConfig
// Optional configuration for some cluster description
SubType() string
// SlowStart returns the slow start configurations
SlowStart() SlowStart
// IsClusterPoolEnable returns the cluster pool enable or not
IsClusterPoolEnable() bool
}
// ResourceManager manages different types of Resource
type ResourceManager interface {
// Connections resource to count connections in pool. Only used by protocol which has a connection pool which has multiple connections.
Connections() Resource
// Pending request resource to count pending requests. Only used by protocol which has a connection pool and pending requests to assign to connections.
PendingRequests() Resource
// Request resource to count requests
Requests() Resource
// Retries resource to count retries
Retries() Resource
}
// Resource is an interface to statistics information
type Resource interface {
CanCreate() bool
Increase()
Decrease()
Max() uint64
Cur() int64
UpdateCur(int64)
}
// HostStats defines a host's statistics information
type HostStats struct {
UpstreamConnectionTotal metrics.Counter
UpstreamConnectionClose metrics.Counter
UpstreamConnectionActive metrics.Counter
UpstreamConnectionConFail metrics.Counter
UpstreamConnectionLocalClose metrics.Counter
UpstreamConnectionRemoteClose metrics.Counter
UpstreamConnectionLocalCloseWithActiveRequest metrics.Counter
UpstreamConnectionRemoteCloseWithActiveRequest metrics.Counter
UpstreamConnectionCloseNotify metrics.Counter
UpstreamRequestTotal metrics.Counter
UpstreamRequestActive metrics.Counter
UpstreamRequestLocalReset metrics.Counter
UpstreamRequestRemoteReset metrics.Counter
UpstreamRequestTimeout metrics.Counter
UpstreamRequestFailureEject metrics.Counter
UpstreamRequestPendingOverflow metrics.Counter
UpstreamRequestDuration metrics.Histogram
UpstreamRequestDurationEWMA metrics.EWMA
UpstreamRequestDurationTotal metrics.Counter
UpstreamResponseSuccess metrics.Counter
UpstreamResponseFailed metrics.Counter
UpstreamResponseTotalEWMA metrics.EWMA
UpstreamResponseClientErrorEWMA metrics.EWMA
UpstreamResponseServerErrorEWMA metrics.EWMA
}
// ClusterStats defines a cluster's statistics information
type ClusterStats struct {
UpstreamConnectionTotal metrics.Counter
UpstreamConnectionClose metrics.Counter
UpstreamConnectionActive metrics.Counter
UpstreamConnectionConFail metrics.Counter
UpstreamConnectionRetry metrics.Counter
UpstreamConnectionLocalClose metrics.Counter
UpstreamConnectionRemoteClose metrics.Counter
UpstreamConnectionLocalCloseWithActiveRequest metrics.Counter
UpstreamConnectionRemoteCloseWithActiveRequest metrics.Counter
UpstreamConnectionCloseNotify metrics.Counter
UpstreamBytesReadTotal metrics.Counter
UpstreamBytesWriteTotal metrics.Counter
UpstreamRequestTotal metrics.Counter
UpstreamRequestActive metrics.Counter
UpstreamRequestLocalReset metrics.Counter
UpstreamRequestRemoteReset metrics.Counter
UpstreamRequestRetry metrics.Counter
UpstreamRequestRetryOverflow metrics.Counter
UpstreamRequestTimeout metrics.Counter
UpstreamRequestFailureEject metrics.Counter
UpstreamRequestPendingOverflow metrics.Counter
UpstreamRequestDuration metrics.Histogram
UpstreamRequestDurationEWMA metrics.EWMA
UpstreamRequestDurationTotal metrics.Counter
UpstreamResponseSuccess metrics.Counter
UpstreamResponseFailed metrics.Counter
LBSubSetsFallBack metrics.Counter
LBSubsetsCreated metrics.Gauge
}
type CreateConnectionData struct {
Connection ClientConnection
Host Host
}
type SlowStart struct {
Mode SlowStartMode
SlowStartDuration time.Duration
Aggression float64
MinWeightPercent float64
}
// SimpleCluster is a simple cluster in memory
type SimpleCluster interface {
UpdateHosts(newHosts []Host)
}
// ClusterConfigFactoryCb is a callback interface
type ClusterConfigFactoryCb interface {
UpdateClusterConfig(configs []v2.Cluster) error
}
type ClusterHostFactoryCb interface {
UpdateClusterHost(cluster string, hosts []v2.Host) error
}
type ClusterManagerFilter interface {
OnCreated(cccb ClusterConfigFactoryCb, chcb ClusterHostFactoryCb)
}
// RegisterUpstreamUpdateMethodCb is a callback interface
type RegisterUpstreamUpdateMethodCb interface {
TriggerClusterUpdate(clusterName string, hosts []v2.Host)
GetClusterNameByServiceName(serviceName string) string
}
type LBSubsetInfo interface {
// IsEnabled represents whether the subset load balancer is configured or not
IsEnabled() bool
// FallbackPolicy returns the fallback policy
FallbackPolicy() FallBackPolicy
// DefaultSubset returns the default subset's metadata configure
// it takes effects when the fallback policy is default subset
DefaultSubset() SubsetMetadata
// SubsetKeys returns the sorted subset keys
SubsetKeys() []SortedStringSetType
}
type LBOriDstInfo interface {
// Check use host header
IsEnabled() bool
// GET header name
GetHeader() string
IsReplaceLocal() bool
}
// SortedHosts is an implementation of sort.Interface
// a slice of host can be sorted as address string
type SortedHosts []Host
func (s SortedHosts) Len() int {
return len(s)
}
func (s SortedHosts) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s SortedHosts) Less(i, j int) bool {
return s[i].AddressString() < s[j].AddressString()
}
// SortedStringSetType is a sorted key collection with no duplicate
type SortedStringSetType struct {
keys []string
}
// InitSet returns a SortedStringSetType
// The input key will be sorted and deduplicated
func InitSet(input []string) SortedStringSetType {
var ssst SortedStringSetType
var keys []string
for _, keyInput := range input {
exist := false
for _, keyIn := range keys {
if keyIn == keyInput {
exist = true
break
}
}
if !exist {
keys = append(keys, keyInput)
}
}
ssst.keys = keys
sort.Sort(&ssst)
return ssst
}
// Keys is the keys in the collection
func (ss *SortedStringSetType) Keys() []string {
return ss.keys
}
// Len is the number of elements in the collection.
func (ss *SortedStringSetType) Len() int {
return len(ss.keys)
}
// Less reports whether the element with
// index i should sort before the element with index j.
func (ss *SortedStringSetType) Less(i, j int) bool {
return ss.keys[i] < ss.keys[j]
}
// Swap swaps the elements with indexes i and j.
func (ss *SortedStringSetType) Swap(i, j int) {
ss.keys[i], ss.keys[j] = ss.keys[j], ss.keys[i]
}