-
Notifications
You must be signed in to change notification settings - Fork 128
/
policy.go
155 lines (127 loc) · 3.71 KB
/
policy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package loadbalancer
import (
"bufio"
"fmt"
"math/rand"
"net"
"net/http"
"sync"
"github.com/buraksezer/consistent"
istioapi "istio.io/client-go/pkg/apis/networking/v1alpha3"
"k8s.io/klog/v2"
"github.com/kubeedge/edgemesh/pkg/apis/config/v1alpha1"
)
// Names of the load-balancing policies and of the consistent-hash key types
// used in this file.
const (
// RoundRobin is the name reported by RoundRobinPolicy.Name.
RoundRobin = "ROUND_ROBIN"
// Random is the name reported by RandomPolicy.Name.
Random = "RANDOM"
// ConsistentHash is the name reported by ConsistentHashPolicy.Name.
ConsistentHash = "CONSISTENT_HASH"
// HttpHeader is the hash key type for which ConsistentHashPolicy.Pick hashes
// the value of an HTTP request header.
HttpHeader = "HTTP_HEADER"
// UserSourceIP is the hash key type for which ConsistentHashPolicy.Pick hashes
// the string form of the source address.
UserSourceIP = "USER_SOURCE_IP"
)
// Policy is the interface implemented by every load-balancing policy in this
// file.
type Policy interface {
// Name returns the policy's identifier, e.g. RoundRobin, Random or
// ConsistentHash.
Name() string
// Update hands the policy the previous and the current DestinationRule so it
// can refresh any settings derived from the rule.
Update(oldDr, dr *istioapi.DestinationRule)
// Pick chooses an endpoint for a connection. The implementations in this file
// return a nil *http.Request unless they parsed one from the connection in
// order to make the choice.
Pick(endpoints []string, srcAddr net.Addr, tcpConn net.Conn) (string, *http.Request, error)
// Sync hands the policy the current endpoint list.
Sync(endpoints []string)
// Release lets the policy drop any state it holds.
Release()
}
// RoundRobinPolicy is the default policy. It has no fields and serves only as
// a placeholder: its Pick always returns an error, and its Update, Sync and
// Release methods do nothing.
type RoundRobinPolicy struct {
}
// NewRoundRobinPolicy returns a pointer to a new RoundRobinPolicy.
func NewRoundRobinPolicy() *RoundRobinPolicy {
	policy := new(RoundRobinPolicy)
	return policy
}
// Name returns the RoundRobin policy identifier.
func (*RoundRobinPolicy) Name() string { return RoundRobin }
// Update does nothing; both arguments are ignored.
func (*RoundRobinPolicy) Update(oldDr, dr *istioapi.DestinationRule) {
	// Intentionally empty.
}
// Pick always fails with an error and never selects an endpoint; all
// arguments are ignored.
func (*RoundRobinPolicy) Pick(endpoints []string, srcAddr net.Addr, netConn net.Conn) (string, *http.Request, error) {
	// This type is only a placeholder: the round-robin selection itself is
	// carried out by the outer round-robin policy, so calling Pick here is
	// reported as an error.
	err := fmt.Errorf("call RoundRobinPolicy is forbidden")
	return "", nil, err
}
// Sync does nothing; the endpoint list is ignored.
func (*RoundRobinPolicy) Sync(endpoints []string) {
	// Intentionally empty.
}
// Release does nothing; RoundRobinPolicy has no state to drop.
func (*RoundRobinPolicy) Release() {
	// Intentionally empty.
}
// RandomPolicy picks an endpoint pseudo-randomly from the list passed to Pick.
type RandomPolicy struct {
// lock is held by Pick while it generates the random index.
lock sync.Mutex
}
// NewRandomPolicy returns a pointer to a new, zero-valued RandomPolicy.
func NewRandomPolicy() *RandomPolicy {
	policy := new(RandomPolicy)
	return policy
}
// Name returns the Random policy identifier.
func (*RandomPolicy) Name() string { return Random }
// Update does nothing; both arguments are ignored.
func (*RandomPolicy) Update(oldDr, dr *istioapi.DestinationRule) {
	// Intentionally empty.
}
// Pick returns a pseudo-randomly chosen element of endpoints.
//
// srcAddr and netConn are ignored, and the returned *http.Request is always
// nil because this policy never reads from the connection. An error is
// returned when endpoints is empty.
func (rd *RandomPolicy) Pick(endpoints []string, srcAddr net.Addr, netConn net.Conn) (string, *http.Request, error) {
	// Guard against an empty list: computing an index into it would otherwise
	// panic (modulo by zero before, rand.Intn(0) now).
	if len(endpoints) == 0 {
		return "", nil, fmt.Errorf("no endpoints available for %s policy", Random)
	}

	rd.lock.Lock()
	// rand.Intn yields an index in [0, len(endpoints)) without the modulo bias
	// of rand.Int() % n.
	k := rand.Intn(len(endpoints))
	rd.lock.Unlock()

	return endpoints[k], nil, nil
}
// Sync does nothing; RandomPolicy works directly on the list passed to Pick.
func (*RandomPolicy) Sync(endpoints []string) {
	// Intentionally empty.
}
// Release does nothing; RandomPolicy holds no endpoint state to drop.
func (*RandomPolicy) Release() {
	// Intentionally empty.
}
// ConsistentHashPolicy picks endpoints by looking a per-connection key up in a
// consistent hash ring.
type ConsistentHashPolicy struct {
// Config is the consistent-hash configuration passed to newHashRing when the
// ring is built.
Config *v1alpha1.ConsistentHash
// lock is taken by Update, Pick, Sync and Release, which read or write
// hashRing and hashKey.
lock sync.Mutex
// hashRing is the ring that Pick queries with LocateKey; Sync creates it when
// it is nil.
hashRing *consistent.Consistent
// hashKey selects what Pick hashes: its Type is compared against HttpHeader
// and UserSourceIP, and its Key is used as the HTTP header name.
hashKey HashKey
}
// NewConsistentHashPolicy builds a ConsistentHashPolicy: it stores config,
// creates the hash ring from config and endpoints via newHashRing, and derives
// the hash key from dr via getConsistentHashKey.
func NewConsistentHashPolicy(config *v1alpha1.ConsistentHash, dr *istioapi.DestinationRule, endpoints []string) *ConsistentHashPolicy {
	policy := new(ConsistentHashPolicy)
	policy.Config = config
	policy.hashRing = newHashRing(config, endpoints)
	policy.hashKey = getConsistentHashKey(dr)
	return policy
}
// Name returns the ConsistentHash policy identifier.
func (*ConsistentHashPolicy) Name() string { return ConsistentHash }
// Update re-derives the hash key from the new DestinationRule dr. oldDr is not
// used.
func (ch *ConsistentHashPolicy) Update(oldDr, dr *istioapi.DestinationRule) {
	ch.lock.Lock()
	// Deferred so the lock is released even if getConsistentHashKey panics,
	// matching how Pick handles the same lock.
	defer ch.lock.Unlock()

	ch.hashKey = getConsistentHashKey(dr)
}
// Pick selects the endpoint that the hash ring maps to this connection's key.
//
// The key depends on the configured hash key type:
//   - HttpHeader: an HTTP request is read from netConn and the value of the
//     configured header is used. The parsed request is returned alongside the
//     endpoint (presumably so the caller can forward what was consumed from
//     the connection; confirm against the caller).
//   - UserSourceIP: the string form of srcAddr is used, falling back to
//     netConn.RemoteAddr() when srcAddr is nil.
//   - any other type: an error is logged and the empty key is used.
//
// The endpoints argument is not consulted; the ring is populated by
// NewConsistentHashPolicy and Sync.
//
// An error is returned when the HTTP request cannot be read, when no source
// address can be determined, when the ring has not been created, or when the
// ring yields no member for the key.
func (ch *ConsistentHashPolicy) Pick(endpoints []string, srcAddr net.Addr, netConn net.Conn) (endpoint string, req *http.Request, err error) {
	// Snapshot the hash key and release the lock before touching the network:
	// http.ReadRequest blocks until the peer has sent a request header, and
	// holding ch.lock for that long would stall every other Pick, Update, Sync
	// and Release on this policy.
	ch.lock.Lock()
	hashKey := ch.hashKey
	ch.lock.Unlock()

	var keyValue string
	switch hashKey.Type {
	case HttpHeader:
		if netConn == nil {
			err = fmt.Errorf("read http request err: connection is nil")
			klog.Errorf("%v", err)
			return "", nil, err
		}
		req, err = http.ReadRequest(bufio.NewReader(netConn))
		if err != nil {
			klog.Errorf("read http request err: %v", err)
			return "", nil, err
		}
		keyValue = req.Header.Get(hashKey.Key)
	case UserSourceIP:
		if srcAddr == nil && netConn != nil {
			srcAddr = netConn.RemoteAddr()
		}
		// Neither the caller nor the connection supplied an address; calling
		// String on the nil interface would panic.
		if srcAddr == nil {
			err = fmt.Errorf("can't determine source address for consistent hash")
			klog.Errorf("%v", err)
			return "", nil, err
		}
		keyValue = srcAddr.String()
	default:
		klog.Errorf("Failed to get hash key value")
		keyValue = ""
	}
	klog.Infof("get key value: %s", keyValue)

	ch.lock.Lock()
	defer ch.lock.Unlock()

	// Sync treats a nil ring as a valid "not created yet" state, so guard
	// against it here rather than dereferencing it.
	if ch.hashRing == nil {
		err = fmt.Errorf("hash ring is not initialized")
		klog.Errorf("%v", err)
		return "", req, err
	}

	member := ch.hashRing.LocateKey([]byte(keyValue))
	if member == nil {
		errMsg := fmt.Errorf("can't find a endpoint by given key: %s", keyValue)
		klog.Errorf("%v", errMsg)
		return "", req, errMsg
	}
	return member.String(), req, nil
}
// Sync brings the hash ring in line with endpoints: it creates the ring with
// newHashRing when none exists yet, and otherwise passes the existing ring to
// updateHashRing.
func (ch *ConsistentHashPolicy) Sync(endpoints []string) {
	ch.lock.Lock()
	// Deferred so the lock is released even if building or updating the ring
	// panics, matching how Pick handles the same lock.
	defer ch.lock.Unlock()

	if ch.hashRing == nil {
		ch.hashRing = newHashRing(ch.Config, endpoints)
		return
	}
	updateHashRing(ch.hashRing, endpoints)
}
// Release passes the policy's hash ring to clearHashRing while holding the
// policy lock.
func (ch *ConsistentHashPolicy) Release() {
	ch.lock.Lock()
	// Deferred so the lock is released even if clearHashRing panics, matching
	// how Pick handles the same lock.
	defer ch.lock.Unlock()

	clearHashRing(ch.hashRing)
}