forked from celo-org/celo-blockchain
-
Notifications
You must be signed in to change notification settings - Fork 1
/
validator_assignment.go
228 lines (194 loc) · 8.45 KB
/
validator_assignment.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
// Copyright 2017 The celo Authors
// This file is part of the celo library.
//
// The celo library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The celo library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the celo library. If not, see <http://www.gnu.org/licenses/>.
package proxy
import (
"github.com/buraksezer/consistent"
"github.com/cespare/xxhash"
"github.com/keep-network/celo-blockchain/common"
"github.com/keep-network/celo-blockchain/log"
"github.com/keep-network/celo-blockchain/p2p/enode"
)
// valAssignments records which proxy each remote validator is assigned to,
// together with the reverse index from every proxy to its set of validators.
// A validator whose entry in valToProxy is nil has not been assigned yet.
//
// WARNING: none of the methods on this type are threadsafe; callers are
// responsible for synchronizing access.
type valAssignments struct {
	// valToProxy maps a validator address to the ID of its assigned proxy
	// (nil while the validator is unassigned).
	valToProxy map[common.Address]*enode.ID

	// proxyToVals maps a proxy ID to the set of validator addresses that
	// are currently assigned to it.
	proxyToVals map[enode.ID]map[common.Address]struct{}

	logger log.Logger
}
// newValAssignments returns a valAssignments with both lookup maps allocated
// (and empty) and a fresh logger attached.
func newValAssignments() *valAssignments {
	va := new(valAssignments)
	va.valToProxy = map[common.Address]*enode.ID{}
	va.proxyToVals = map[enode.ID]map[common.Address]struct{}{}
	va.logger = log.New()
	return va
}
// addValidators starts tracking the given validators in valToProxy without an
// assigned proxy (their entry is nil).
//
// Validators that are already tracked are left untouched: overwriting the
// entry of an assigned validator with nil would drop the forward mapping
// while its address stayed behind in proxyToVals, leaving the two maps
// inconsistent.
func (va *valAssignments) addValidators(vals []common.Address) {
	logger := va.logger.New("func", "addValidators")
	logger.Info("adding validators to val assignments", "new vals", common.ConvertToStringSlice(vals))

	for _, val := range vals {
		if _, tracked := va.valToProxy[val]; tracked {
			// Keep whatever assignment (or nil) is already recorded.
			continue
		}
		va.valToProxy[val] = nil
	}
}
// removeValidators stops tracking the given validators: each one is first
// detached from its proxy (if it has one) and then dropped from valToProxy.
func (va *valAssignments) removeValidators(vals []common.Address) {
	va.logger.New("func", "removeValidators").
		Info("removing validators from val assignments", "removed vals", common.ConvertToStringSlice(vals))

	for i := range vals {
		addr := vals[i]
		va.unassignValidator(addr)
		delete(va.valToProxy, addr)
	}
}
// assignValidator assigns the validator with address valAddress to the proxy
// with ID proxyID, updating both valToProxy and proxyToVals.
//
// If the validator is currently assigned to a different proxy, it is removed
// from that proxy's validator set first (and the set is dropped once empty),
// so that proxyToVals never lists a validator under more than one proxy.
func (va *valAssignments) assignValidator(valAddress common.Address, proxyID enode.ID) {
	// Detach from the previous proxy, if any, to keep the reverse index in sync.
	if prev := va.valToProxy[valAddress]; prev != nil && *prev != proxyID {
		if prevVals, ok := va.proxyToVals[*prev]; ok {
			delete(prevVals, valAddress)
			if len(prevVals) == 0 {
				delete(va.proxyToVals, *prev)
			}
		}
	}

	va.valToProxy[valAddress] = &proxyID

	// Lazily create the validator set for a proxy seen for the first time.
	vals, ok := va.proxyToVals[proxyID]
	if !ok {
		vals = make(map[common.Address]struct{})
		va.proxyToVals[proxyID] = vals
	}
	vals[valAddress] = struct{}{}
}
// unassignValidator detaches the validator with address valAddress from its
// proxy. The validator stays in valToProxy with a nil proxy; a proxy whose
// validator set becomes empty is removed from proxyToVals. Nothing happens
// when the validator has no proxy assigned.
func (va *valAssignments) unassignValidator(valAddress common.Address) {
	current := va.valToProxy[valAddress]
	if current == nil {
		return
	}
	va.valToProxy[valAddress] = nil

	owner := *current
	delete(va.proxyToVals[owner], valAddress)
	if len(va.proxyToVals[owner]) == 0 {
		delete(va.proxyToVals, owner)
	}
}
// getValidators returns the address of every validator present in
// valToProxy, regardless of whether it is currently assigned to a proxy.
// The order of the returned slice follows map iteration and is unspecified.
func (va *valAssignments) getValidators() []common.Address {
	addrs := make([]common.Address, len(va.valToProxy))
	next := 0
	for addr := range va.valToProxy {
		addrs[next] = addr
		next++
	}
	return addrs
}
// assignmentPolicy abstracts the strategy used to pair remote validators with
// proxies, so that several policies can be implemented. Every method receives
// the valAssignments it should operate on. For the consistentHashingPolicy
// implementation in this file, the returned bool reports whether any
// validator <-> proxy pairing changed.
type assignmentPolicy interface {
	// assignProxy makes proxy available to the policy.
	assignProxy(proxy *Proxy, valAssignments *valAssignments) bool

	// removeProxy withdraws proxy from the policy.
	removeProxy(proxy *Proxy, valAssignments *valAssignments) bool

	// assignRemoteValidators registers validators with the policy.
	assignRemoteValidators(validators []common.Address, valAssignments *valAssignments) bool

	// removeRemoteValidators withdraws validators from the policy.
	removeRemoteValidators(validators []common.Address, valAssignments *valAssignments) bool
}
// ==============================================
//
// Consistent hashing implementation of the assignment policy.

// hasher is the hash function handed to the consistent hashing library via
// consistent.Config.Hasher; it delegates to xxhash.
type hasher struct{}

// Sum64 returns the 64-bit xxhash digest of data.
func (hasher) Sum64(data []byte) uint64 {
	digest := xxhash.Sum64(data)
	return digest
}
// consistentHashingPolicy pairs validators with proxies through consistent
// hashing. All validator <-> proxy pairings are recomputed whenever a proxy
// or a validator is added or removed.
//
// WARNING: none of the methods on this type are threadsafe; callers are
// responsible for synchronizing access.
type consistentHashingPolicy struct {
	// c is the consistent hashing ring whose members are the proxies.
	c *consistent.Consistent

	logger log.Logger
}
// newConsistentHashingPolicy builds a consistentHashingPolicy backed by a
// consistent hasher with bounded loads:
// https://ai.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html
//
// Partitions are assigned to members (proxies in this case) using a hash
// ring. When a validator's proxy is looked up with `LocateKey`, the validator
// is mapped to a partition via hash(validator's address) % PartitionCount in
// constant time.
func newConsistentHashingPolicy() *consistentHashingPolicy {
	var cfg consistent.Config

	// A prime partition count distributes validators more uniformly; a
	// higher count generally gives a more even distribution.
	cfg.PartitionCount = 271

	// Number of replications of each proxy on the hash ring.
	cfg.ReplicationFactor = 40

	// Caps the number of partitions per proxy at
	// (PartitionCount / len(proxies)) * Load. A load closer to 1 makes the
	// per-proxy partition counts more uniform, while a higher load causes
	// fewer relocations when proxies are added or removed.
	cfg.Load = 1.2

	cfg.Hasher = hasher{}

	policy := new(consistentHashingPolicy)
	// nil is passed as the initial member list; proxies are added later.
	policy.c = consistent.New(nil, cfg)
	policy.logger = log.New()
	return policy
}
// assignProxy registers proxy as a member of the consistent hasher and then
// recomputes every validator assignment. It returns the result of that
// recomputation, i.e. whether any assignment changed.
func (ch *consistentHashingPolicy) assignProxy(proxy *Proxy, valAssignments *valAssignments) bool {
	member := proxy.ID()
	ch.c.Add(member)

	changed := ch.reassignValidators(valAssignments)
	return changed
}
// removeProxy drops proxy from the consistent hasher (members are identified
// by the string form of their ID) and then recomputes every validator
// assignment. It returns whether any assignment changed.
func (ch *consistentHashingPolicy) removeProxy(proxy *Proxy, valAssignments *valAssignments) bool {
	memberName := proxy.ID().String()
	ch.c.Remove(memberName)

	changed := ch.reassignValidators(valAssignments)
	return changed
}
// assignRemoteValidators records the given remote validators in
// valAssignments and then recomputes every validator assignment. It returns
// whether any assignment changed.
func (ch *consistentHashingPolicy) assignRemoteValidators(vals []common.Address, valAssignments *valAssignments) bool {
	valAssignments.addValidators(vals)

	changed := ch.reassignValidators(valAssignments)
	return changed
}
// removeRemoteValidators drops the given remote validators from
// valAssignments and then recomputes every remaining validator assignment.
// It returns whether any assignment changed.
func (ch *consistentHashingPolicy) removeRemoteValidators(vals []common.Address, valAssignments *valAssignments) bool {
	valAssignments.removeValidators(vals)

	changed := ch.reassignValidators(valAssignments)
	return changed
}
// reassignValidators recomputes the proxy of every validator tracked in
// valAssignments by looking its address up in the consistent hasher.
//
// A validator for which the hasher yields no proxy is unassigned; a validator
// whose located proxy differs from the recorded one (or that has none
// recorded) is moved to the located proxy. The return value is true when at
// least one validator went through either path, in which case the complete
// proxy -> validators mapping is also logged.
func (ch *consistentHashingPolicy) reassignValidators(valAssignments *valAssignments) bool {
	logger := ch.logger.New("func", "reassignValidators")

	changed := false
	// Only values of existing keys are modified while ranging over the map.
	for val, currentID := range valAssignments.valToProxy {
		located := ch.c.LocateKey(val.Bytes())

		switch {
		case located == nil:
			// The hasher has no owner for this validator.
			logger.Trace("Unassigning validator", "validator", val)
			valAssignments.unassignValidator(val)
			changed = true

		case currentID == nil || located.String() != currentID.String():
			// Capture the old proxy for logging before it is cleared.
			previous := "nil"
			if currentID != nil {
				previous = currentID.String()
			}
			target := located.String()
			logger.Trace("Reassigning validator", "validator", val, "original proxy", previous, "new proxy", target)
			valAssignments.unassignValidator(val)
			valAssignments.assignValidator(val, enode.HexID(target))
			changed = true
		}
	}

	if !changed {
		return false
	}

	// Flatten the reverse index into proxy -> []string purely for logging.
	summary := make(map[enode.ID][]string)
	for id, set := range valAssignments.proxyToVals {
		addrs := make([]common.Address, 0, len(set))
		for addr := range set {
			addrs = append(addrs, addr)
		}
		summary[id] = common.ConvertToStringSlice(addrs)
	}
	logger.Info("remote validator to proxy assignment has changed", "new assignment", summary)
	return true
}