/
upstream.go
412 lines (323 loc) · 12.9 KB
/
upstream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package types
import (
"context"
"net"
"sort"
"time"
metrics "github.com/rcrowley/go-metrics"
v2 "mosn.io/mosn/pkg/api/v2"
)
// Below is the basic relation between clusterManager, cluster, hostSet, and hosts:
//
// 1 * | 1 1 | 1 *
// clusterManager --------- cluster --------- --------- hostSet------hosts
// ClusterManager manages connection pools and load balancing for upstream clusters.
type ClusterManager interface {
// Add or update a cluster via API.
AddOrUpdatePrimaryCluster(cluster v2.Cluster) error
// Add Cluster health check callbacks
AddClusterHealthCheckCallbacks(name string, cb HealthCheckCb) error
// Get, use to get the snapshot of a cluster
GetClusterSnapshot(context context.Context, cluster string) ClusterSnapshot
// Deprecated: PutClusterSnapshot exists for historical compatibility and should not be used.
PutClusterSnapshot(ClusterSnapshot)
// UpdateClusterHosts used to update cluster's hosts
// temp interface todo: remove it
UpdateClusterHosts(cluster string, hosts []v2.Host) error
// AppendClusterHosts used to add cluster's hosts
AppendClusterHosts(clusterName string, hostConfigs []v2.Host) error
// Get or Create tcp conn pool for a cluster
TCPConnForCluster(balancerContext LoadBalancerContext, snapshot ClusterSnapshot) CreateConnectionData
// ConnPoolForCluster used to get protocol related conn pool
ConnPoolForCluster(balancerContext LoadBalancerContext, snapshot ClusterSnapshot, protocol Protocol) ConnectionPool
// RemovePrimaryCluster used to remove cluster from set
RemovePrimaryCluster(clusters ...string) error
// ClusterExist, used to check whether 'clusterName' exist or not
ClusterExist(clusterName string) bool
// RemoveClusterHosts, remove the host by address string
RemoveClusterHosts(clusterName string, hosts []string) error
// Destroy the cluster manager
Destroy()
}
// ClusterSnapshot is a thread-safe cluster snapshot
type ClusterSnapshot interface {
// HostSet returns the cluster snapshot's host set
HostSet() HostSet
// ClusterInfo returns the cluster snapshot's cluster info
ClusterInfo() ClusterInfo
// LoadBalancer returns the cluster snapshot's load balancer
LoadBalancer() LoadBalancer
// IsExistsHosts checks whether the metadata's subset contains host or not
// if metadata is nil, check the cluster snapshot contains host or not
IsExistsHosts(metadata MetadataMatchCriteria) bool
HostNum(metadata MetadataMatchCriteria) int
}
// Cluster is a group of upstream hosts
type Cluster interface {
// Snapshot returns the cluster snapshot, which contains cluster info, hostset and load balancer
Snapshot() ClusterSnapshot
// UpdateHosts updates the host set's hosts
UpdateHosts([]Host)
// Add health check callbacks in health checker
AddHealthCheckCallbacks(cb HealthCheckCb)
}
// HostPredicate checks wether the host is matched the metadata
type HostPredicate func(Host) bool
// HostSet is as set of hosts that contains all of the endpoints for a given
type HostSet interface {
// Hosts returns all hosts that make up the set at the current time.
Hosts() []Host
// HealthyHosts returns all healthy hosts
HealthyHosts() []Host
}
// HealthFlag type
type HealthFlag int
const (
// The host is currently failing active health checks.
FAILED_ACTIVE_HC HealthFlag = 0x1
// The host is currently considered an outlier and has been ejected.
FAILED_OUTLIER_CHECK HealthFlag = 0x02
)
// Host is an upstream host
type Host interface {
HostInfo
// Create a connection for this host.
CreateConnection(context context.Context) CreateConnectionData
// ClearHealthFlag clear the input flag
ClearHealthFlag(flag HealthFlag)
// ContainHealthFlag checks whether the heatlhy state contains the flag
ContainHealthFlag(flag HealthFlag) bool
// SetHealthFlag set the input flag
SetHealthFlag(flag HealthFlag)
// HealthFlag returns the current healthy flag
HealthFlag() HealthFlag
// Health checks whether the host is healthy or not
Health() bool
// Address returns the host's Addr structure
Address() net.Addr
}
// HostInfo defines a host's basic information
type HostInfo interface {
// Hostname returns the host's name
Hostname() string
// Metadata returns the host's meta data
Metadata() v2.Metadata
// ClusterInfo returns the cluster info
ClusterInfo() ClusterInfo
// AddressString retuens the host's address string
AddressString() string
// HostStats returns the host stats metrics
HostStats() HostStats
// Weight returns the host weight
Weight() uint32
// Config creates a host config by the host attributes
Config() v2.Host
// SupportTLS returns whether the host support tls connections or not
// If returns true, means support tls connection
SupportTLS() bool
// TODO: add deploy locality
}
// HostStats defines a host's statistics information
type HostStats struct {
UpstreamConnectionTotal metrics.Counter
UpstreamConnectionClose metrics.Counter
UpstreamConnectionActive metrics.Counter
UpstreamConnectionConFail metrics.Counter
UpstreamConnectionLocalClose metrics.Counter
UpstreamConnectionRemoteClose metrics.Counter
UpstreamConnectionLocalCloseWithActiveRequest metrics.Counter
UpstreamConnectionRemoteCloseWithActiveRequest metrics.Counter
UpstreamConnectionCloseNotify metrics.Counter
UpstreamRequestTotal metrics.Counter
UpstreamRequestActive metrics.Counter
UpstreamRequestLocalReset metrics.Counter
UpstreamRequestRemoteReset metrics.Counter
UpstreamRequestTimeout metrics.Counter
UpstreamRequestFailureEject metrics.Counter
UpstreamRequestPendingOverflow metrics.Counter
UpstreamRequestDuration metrics.Histogram
UpstreamRequestDurationTotal metrics.Counter
UpstreamResponseSuccess metrics.Counter
UpstreamResponseFailed metrics.Counter
}
// ClusterInfo defines a cluster's information
type ClusterInfo interface {
// Name returns the cluster name
Name() string
// LbType returns the cluster's load balancer type
LbType() LoadBalancerType
// ConnBufferLimitBytes returns the connection buffer limits
ConnBufferLimitBytes() uint32
// MaxRequestsPerConn returns a connection's max request
MaxRequestsPerConn() uint32
// Stats returns the cluster's stats metrics
Stats() ClusterStats
// ResourceManager returns the ResourceManager
ResourceManager() ResourceManager
// TLSMng returns the tls manager
TLSMng() TLSContextManager
// LbSubsetInfo returns the load balancer subset's config
LbSubsetInfo() LBSubsetInfo
// ConectTimeout returns the connect timeout
ConnectTimeout() time.Duration
}
// ResourceManager manages different types of Resource
type ResourceManager interface {
// Connections resource to count connections in pool. Only used by protocol which has a connection pool which has multiple connections.
Connections() Resource
// Pending request resource to count pending requests. Only used by protocol which has a connection pool and pending requests to assign to connections.
PendingRequests() Resource
// Request resource to count requests
Requests() Resource
// Retries resource to count retries
Retries() Resource
}
// Resource is a interface to statistics information
type Resource interface {
CanCreate() bool
Increase()
Decrease()
Max() uint64
}
// ClusterStats defines a cluster's statistics information
type ClusterStats struct {
UpstreamConnectionTotal metrics.Counter
UpstreamConnectionClose metrics.Counter
UpstreamConnectionActive metrics.Counter
UpstreamConnectionConFail metrics.Counter
UpstreamConnectionRetry metrics.Counter
UpstreamConnectionLocalClose metrics.Counter
UpstreamConnectionRemoteClose metrics.Counter
UpstreamConnectionLocalCloseWithActiveRequest metrics.Counter
UpstreamConnectionRemoteCloseWithActiveRequest metrics.Counter
UpstreamConnectionCloseNotify metrics.Counter
UpstreamBytesReadTotal metrics.Counter
UpstreamBytesWriteTotal metrics.Counter
UpstreamRequestTotal metrics.Counter
UpstreamRequestActive metrics.Counter
UpstreamRequestLocalReset metrics.Counter
UpstreamRequestRemoteReset metrics.Counter
UpstreamRequestRetry metrics.Counter
UpstreamRequestRetryOverflow metrics.Counter
UpstreamRequestTimeout metrics.Counter
UpstreamRequestFailureEject metrics.Counter
UpstreamRequestPendingOverflow metrics.Counter
UpstreamRequestDuration metrics.Histogram
UpstreamRequestDurationTotal metrics.Counter
UpstreamResponseSuccess metrics.Counter
UpstreamResponseFailed metrics.Counter
LBSubSetsFallBack metrics.Counter
LBSubsetsCreated metrics.Gauge
}
type CreateConnectionData struct {
Connection ClientConnection
HostInfo HostInfo
}
// SimpleCluster is a simple cluster in memory
type SimpleCluster interface {
UpdateHosts(newHosts []Host)
}
// ClusterConfigFactoryCb is a callback interface
type ClusterConfigFactoryCb interface {
UpdateClusterConfig(configs []v2.Cluster) error
}
type ClusterHostFactoryCb interface {
UpdateClusterHost(cluster string, hosts []v2.Host) error
}
type ClusterManagerFilter interface {
OnCreated(cccb ClusterConfigFactoryCb, chcb ClusterHostFactoryCb)
}
// RegisterUpstreamUpdateMethodCb is a callback interface
type RegisterUpstreamUpdateMethodCb interface {
TriggerClusterUpdate(clusterName string, hosts []v2.Host)
GetClusterNameByServiceName(serviceName string) string
}
type LBSubsetInfo interface {
// IsEnabled represents whether the subset load balancer is configured or not
IsEnabled() bool
// FallbackPolicy returns the fallback policy
FallbackPolicy() FallBackPolicy
// DefaultSubset returns the default subset's metadata configure
// it takes effects when the fallback policy is default subset
DefaultSubset() SubsetMetadata
// SubsetKeys returns the sorted subset keys
SubsetKeys() []SortedStringSetType
}
// SortedHosts is an implementation of sort.Interface
// a slice of host can be sorted as address string
type SortedHosts []Host
func (s SortedHosts) Len() int {
return len(s)
}
func (s SortedHosts) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s SortedHosts) Less(i, j int) bool {
return s[i].AddressString() < s[j].AddressString()
}
// SortedStringSetType is a sorted key collection with no duplicate
type SortedStringSetType struct {
keys []string
}
// InitSet returns a SortedStringSetType
// The input key will be sorted and deduplicated
func InitSet(input []string) SortedStringSetType {
var ssst SortedStringSetType
var keys []string
for _, keyInput := range input {
exist := false
for _, keyIn := range keys {
if keyIn == keyInput {
exist = true
break
}
}
if !exist {
keys = append(keys, keyInput)
}
}
ssst.keys = keys
sort.Sort(&ssst)
return ssst
}
// Keys is the keys in the collection
func (ss *SortedStringSetType) Keys() []string {
return ss.keys
}
// Len is the number of elements in the collection.
func (ss *SortedStringSetType) Len() int {
return len(ss.keys)
}
// Less reports whether the element with
// index i should sort before the element with index j.
func (ss *SortedStringSetType) Less(i, j int) bool {
return ss.keys[i] < ss.keys[j]
}
// Swap swaps the elements with indexes i and j.
func (ss *SortedStringSetType) Swap(i, j int) {
ss.keys[i], ss.keys[j] = ss.keys[j], ss.keys[i]
}
func init() {
ConnPoolFactories = make(map[Protocol]bool)
}
var ConnPoolFactories map[Protocol]bool
func RegisterConnPoolFactory(protocol Protocol, registered bool) {
//other
ConnPoolFactories[protocol] = registered
}