forked from project-iris/iris
-
Notifications
You must be signed in to change notification settings - Fork 0
/
balancer.go
157 lines (139 loc) · 4.21 KB
/
balancer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// Iris - Decentralized cloud messaging
// Copyright (c) 2013 Project Iris. All rights reserved.
//
// Iris is dual licensed: you can redistribute it and/or modify it under the
// terms of the GNU General Public License as published by the Free Software
// Foundation, either version 3 of the License, or (at your option) any later
// version.
//
// The framework is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// Alternatively, the Iris framework may be used in accordance with the terms
// and conditions contained in a signed written agreement between you and the
// author(s).
// Package balancer implements a capacity based load balancer where each entity
// periodically reports its actual processing capacity and the balancer issues
// requests based on those numbers.
package balancer
import (
"fmt"
"math/big"
"math/rand"
"sort"
"sync"
)
// The load balancer for a single topic.
type Balancer struct {
members entitySlice // Entries to which to balace to
capacity int // Total message capacity of the topic
lock sync.RWMutex // Mutex to allow reentrant balancing
}
// Creates a new - empty - load balancer.
func New() *Balancer {
return &Balancer{
members: []*entity{},
}
}
// Registers an entity to load balance to (no duplicate checks are done).
func (b *Balancer) Register(id *big.Int) {
b.lock.Lock()
defer b.lock.Unlock()
b.members = append(b.members, &entity{id: id, cap: 1})
sort.Sort(b.members)
b.capacity += 1
}
// Unregisters an entity from the possible balancing destinations.
func (b *Balancer) Unregister(id *big.Int) {
b.lock.Lock()
defer b.lock.Unlock()
idx := b.members.Search(id)
if idx < len(b.members) && b.members[idx].id.Cmp(id) == 0 {
// Update total system capacity
b.capacity -= b.members[idx].cap
// Swap with last element
last := len(b.members) - 1
b.members[idx] = b.members[last]
b.members = b.members[:last]
// Get back to sorted order
sort.Sort(b.members)
} else {
panic("trying to remove non-registered entity")
}
}
// Updates an entry's capacity to cap.
func (b *Balancer) Update(id *big.Int, cap int) error {
b.lock.Lock()
defer b.lock.Unlock()
// Zero capacity is not allowed
if cap <= 0 {
cap = 1
}
idx := b.members.Search(id)
if idx < len(b.members) && b.members[idx].id.Cmp(id) == 0 {
// Update total system capacity
b.capacity -= b.members[idx].cap
b.capacity += cap
// Update local capacity
b.members[idx].cap = cap
} else {
return fmt.Errorf("non-registered entity: %v", id)
}
return nil
}
// Returns an id to which to send the next message to. The optional ex (can be
// nil) is used to exclude an entity from balancing to (if it's the only one
// available then this guarantee will be forfeit).
func (b *Balancer) Balance(ex *big.Int) (*big.Int, error) {
b.lock.RLock()
defer b.lock.RUnlock()
// Make sure there is actually somebody to balance to
if b.capacity == 0 {
return nil, fmt.Errorf("no capacity to balance")
}
// Calculate the available capacity with ex excluded
available := b.capacity
exclude := -1
if ex != nil {
idx := b.members.Search(ex)
if idx < len(b.members) && b.members[idx].id.Cmp(ex) == 0 {
if available != b.members[idx].cap {
available -= b.members[idx].cap
exclude = idx
}
}
}
// Generate a uniform random capacity and send to the associated entity
cap := rand.Intn(available)
for i, m := range b.members {
// Skip the excluded source entity
if i == exclude {
continue
}
cap -= m.cap
if cap < 0 {
return m.id, nil
}
}
// Just in case to prevent bugs
panic("balanced out of bounds")
}
// Returns the total capacity that the balancer can handle, optionally with ex
// excluded from the count.
func (b *Balancer) Capacity(ex *big.Int) int {
b.lock.RLock()
defer b.lock.RUnlock()
// If nothing's excluded, return total capacity
if ex == nil {
return b.capacity
}
// Otherwise find the excluded node and return reduced capacity
idx := b.members.Search(ex)
if idx < len(b.members) && b.members[idx].id.Cmp(ex) == 0 {
return b.capacity - b.members[idx].cap
} else {
return b.capacity
}
}