forked from dropbox/godropbox
-
Notifications
You must be signed in to change notification settings - Fork 0
/
load_balanced_pool.go
233 lines (203 loc) · 6.64 KB
/
load_balanced_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
package http2
import (
"math/rand"
"net/http"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/dropbox/godropbox/errors"
)
const (
	// Number of attempts to try to connect to a target host.
	// Used by Do() when a dial to the chosen instance fails.
	connectionAttempts = 3
	// Default instance mark down duration: how long an instance stays
	// flagged as down (UNIX-second granularity) after an error or a 500.
	markDownDuration = 10 * time.Second
)
// LBStrategy selects how the pool distributes requests across instances.
type LBStrategy int

const (
	// In 'RoundRobin' load balancing strategy requests are sent to
	// different hosts in round robin fashion.
	LBRoundRobin LBStrategy = 0
	// In 'Fixed' load balancing strategy requests are routed to same host,
	// others are used only in case of failover.
	LBFixed LBStrategy = 1
)
// LoadBalancedPool is a collection of per-instance HTTP connection pools
// that spreads requests over them according to the configured LBStrategy,
// temporarily marking failing instances as down.
type LoadBalancedPool struct {
	// Guards instances, instanceList and markDownUntil.
	lock sync.RWMutex
	// Maps "host:port" -> instancePool.
	instances    map[string]*instancePool
	instanceList instancePoolSlice
	// Atomic counter that is used for round robining instances
	// from instanceList.
	instanceIdx uint64
	// UNIX epoch time in seconds that represents time till address is considered
	// as down and unusuable. Indexed in parallel with instanceList.
	markDownUntil []int64
	params        ConnectionParams // Parameters for creating SimplePool-s.
	strategy      LBStrategy       // Load balancing strategy.
}
// instancePool is a SimplePool tagged with the id of the backend instance
// it connects to.
type instancePool struct {
	SimplePool
	instanceId int
}

// instancePoolSlice implements sort.Interface over *instancePool elements.
type instancePoolSlice []*instancePool

func (s instancePoolSlice) Len() int      { return len(s) }
func (s instancePoolSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

// instancePoolSlice sorts by instanceId in descending order.
func (s instancePoolSlice) Less(i, j int) bool { return s[i].instanceId > s[j].instanceId }

// LBPoolInstanceInfo identifies one backend instance by id and
// "host:port" address.
type LBPoolInstanceInfo struct {
	InstanceId int
	Addr       string
}
// NewLoadBalancedPool creates an empty pool that will use params for every
// underlying SimplePool. The strategy defaults to LBRoundRobin; instances
// must be added via Update before the pool can serve requests.
func NewLoadBalancedPool(params ConnectionParams) *LoadBalancedPool {
	pool := new(LoadBalancedPool)
	pool.instances = make(map[string]*instancePool)
	pool.instanceList = make(instancePoolSlice, 0)
	pool.markDownUntil = make([]int64, 0)
	pool.params = params
	pool.strategy = LBRoundRobin
	return pool
}
// Sets Load Balancing strategy. Must be called before pool is actually put to use.
// NOTE(review): this write is not synchronized with pool.lock — calling it
// concurrently with Do/Get/Update would be a data race; confirm callers only
// invoke it during setup.
func (pool *LoadBalancedPool) SetStrategy(strategy LBStrategy) {
	pool.strategy = strategy
}
// newInstancePool constructs an instancePool for the given instance,
// wrapping a fresh SimplePool built from the pool's connection parameters.
func (pool *LoadBalancedPool) newInstancePool(info LBPoolInstanceInfo) *instancePool {
	inner := NewSimplePool(info.Addr, pool.params)
	instance := &instancePool{
		SimplePool: *inner,
		instanceId: info.InstanceId,
	}
	return instance
}
// Update replaces the pool's set of backend instances with instanceInfos.
//
// SimplePools for addresses already in the pool are reused; pools for
// addresses that disappeared are closed. The new instance list is ordered
// according to the load balancing strategy, and all mark-down state is reset.
//
// NOTE(review): a reused pool keeps its original instanceId even if the
// incoming info carries a different id for the same address — confirm this
// staleness is intended (it affects LBFixed ordering and GetInstancePool).
func (pool *LoadBalancedPool) Update(instanceInfos []LBPoolInstanceInfo) {
	pool.lock.Lock()
	defer pool.lock.Unlock()
	newInstances := make(map[string]*instancePool)
	newInstanceList := make(instancePoolSlice, len(instanceInfos))
	for i, instanceInfo := range instanceInfos {
		// Reuse the existing pool for this address when possible so live
		// connections are not torn down on every Update.
		instance, ok := pool.instances[instanceInfo.Addr]
		if !ok {
			instance = pool.newInstancePool(instanceInfo)
		}
		newInstances[instanceInfo.Addr] = instance
		newInstanceList[i] = instance
	}
	switch pool.strategy {
	case LBRoundRobin:
		// In RoundRobin strategy, InstanceList is a randomly shuffled list
		// of instances (Fisher-Yates shuffle).
		for i := range newInstanceList {
			randIdx := rand.Intn(i + 1)
			newInstanceList.Swap(i, randIdx)
		}
	case LBFixed:
		// In Fixed strategy, InstanceList is a sorted list, sorted by instanceId.
		sort.Sort(newInstanceList)
	}
	// Close out all InstancePools that are not needed anymore.
	for addr, oldInstance := range pool.instances {
		if _, ok := newInstances[addr]; !ok {
			oldInstance.Close()
		}
	}
	pool.instances = newInstances
	pool.instanceList = newInstanceList
	// Fresh zeroed slice: no instance in the new list is marked down.
	pool.markDownUntil = make([]int64, len(newInstanceList))
}
//
// Pool interface methods
//

// Do issues an HTTP request through the pool, retrying failed dials on
// other instances up to connectionAttempts times.
//
// Any transport error, and any 500 response, marks the chosen instance down
// for markDownDuration. Marking down only steers future instance selection;
// even with every instance marked down, requests keep flowing round robin.
func (pool *LoadBalancedPool) Do(req *http.Request) (*http.Response, error) {
	for i := 0; ; i++ {
		idx, instance, err := pool.getInstance()
		if err != nil {
			return nil, errors.Wrap(err, "can't get HTTP connection")
		}
		resp, err := instance.Do(req)
		if err != nil || resp.StatusCode == http.StatusInternalServerError {
			// 500s are also treated as service being down momentarily,
			// note that even if all servers get marked down LBPool continues
			// to send requests in round robin manner, thus this provides extra
			// protection when service may still be up but have higher rate of
			// 500s for whatever reason.
			pool.markInstanceDown(
				idx, instance, time.Now().Add(markDownDuration).Unix())
		}
		if err != nil {
			if _, ok := err.(DialError); !ok {
				// Not a dial failure: the request may have reached the
				// server, so it is not safe to blindly retry it.
				return resp, err
			}
			if (i + 1) < connectionAttempts {
				continue // dial failed; retry on the next instance
			}
		}
		return resp, err
	}
}
// Get checks out an HTTP client from an instance pool chosen by the
// configured load balancing strategy.
func (pool *LoadBalancedPool) Get() (*http.Client, error) {
	_, instance, err := pool.getInstance()
	if err != nil {
		return nil, errors.Wrap(err, "can't get HTTP connection")
	}
	client, err := instance.Get()
	if err != nil {
		return nil, errors.Wrap(err, "couldn't Get from LoadBalancedPool")
	}
	return client, nil
}
// Returns instance that isn't marked down, if all instances are
// marked as down it will just choose a next one.
// Also returns the instance's index into instanceList, which callers pass
// back to markInstanceDown.
func (pool *LoadBalancedPool) getInstance() (int, *instancePool, error) {
	pool.lock.RLock()
	defer pool.lock.RUnlock()
	if len(pool.instanceList) == 0 {
		return 0, nil, errors.Newf("no available instances")
	}
	now := time.Now().Unix()
	var idx int
	// Try at most len(instanceList) candidates; if every one is marked
	// down, the loop exits normally and the last candidate is used.
	for i := 0; i < len(pool.instanceList); i++ {
		switch pool.strategy {
		case LBRoundRobin:
			// In RoundRobin strategy instanceIdx keeps changing, to
			// achieve round robin load balancing. Note the counter is
			// advanced once per candidate, including skipped (marked
			// down) ones.
			instanceIdx := atomic.AddUint64(&pool.instanceIdx, 1)
			idx = int(instanceIdx % uint64(len(pool.instanceList)))
		case LBFixed:
			// In Fixed strategy instances are always traversed in same
			// exact order.
			idx = i
		}
		// Accept the first candidate whose mark-down window has expired.
		if pool.markDownUntil[idx] < now {
			break
		}
	}
	return idx, pool.instanceList[idx], nil
}
// GetInstancePool returns the SimplePool for the given instanceId, or an
// error if it does not exist.
// TODO(zviad): right now this scans all instances, thus if there are a lot of
// instances per partition it can become very slow. If it becomes a problem, fix it!
func (pool *LoadBalancedPool) GetInstancePool(instanceId int) (*SimplePool, error) {
	pool.lock.RLock()
	defer pool.lock.RUnlock()
	for idx := range pool.instanceList {
		candidate := pool.instanceList[idx]
		if candidate.instanceId == instanceId {
			return &candidate.SimplePool, nil
		}
	}
	return nil, errors.Newf("InstanceId: %v not found in the pool", instanceId)
}
// markInstanceDown marks the instance at idx as unusable until downUntil
// (UNIX epoch seconds). The instance pointer is re-checked so that a stale
// index left over from before a concurrent Update is ignored.
func (pool *LoadBalancedPool) markInstanceDown(
	idx int, instance *instancePool, downUntil int64) {
	pool.lock.Lock()
	defer pool.lock.Unlock()
	if idx >= len(pool.instanceList) || pool.instanceList[idx] != instance {
		// The pool was updated since this instance was chosen; drop the mark.
		return
	}
	pool.markDownUntil[idx] = downUntil
}
// Close shuts down every instance pool owned by this LoadBalancedPool.
func (pool *LoadBalancedPool) Close() {
	pool.lock.Lock()
	defer pool.lock.Unlock()
	for _, ip := range pool.instances {
		ip.Close()
	}
}