/
consistent_hash.go
152 lines (134 loc) · 3.58 KB
/
consistent_hash.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package comm
import (
"errors"
"hash/crc32"
"sort"
"strconv"
"sync"
)
// 一致性哈希取值范围 [0,2^32-1]
type uint32slice []uint32
func (x uint32slice) Len() int {
return len(x)
}
// 比对两个数大小
func (x uint32slice) Less(i, j int) bool {
return x[i] < x[j]
}
// 切片中两个值交换
func (x uint32slice) Swap(i, j int) {
x[i], x[j] = x[j], x[i]
}
// 保存一致性hash信息
type Consistent struct {
// hash环 key为哈希值 值为节点信息
circleHashNodeMap map[uint32]string
// 已经排序的节点hash切片
sortedHashes uint32slice
// 虚拟节点个数 用来增加hash的平衡性 避免数据倾斜问题
virtualNodeCount int
// map 读写锁
sync.RWMutex
}
// 一致性哈希实例构造函数 设置默认节点数量
func NewConsistent() *Consistent {
return &Consistent{
// 初始化变量
circleHashNodeMap: make(map[uint32]string),
// 设置虚拟节点个数
virtualNodeCount: 20,
}
}
// 自动生成key
func (c *Consistent) generateKey(element string, index int) (key string) {
// 副本 key 生成逻辑
return element + strconv.Itoa(index)
}
// 获取hash位置
func (c *Consistent) hashKey(key string) (hash uint32) {
if len(key) < 64 {
// 声明一个字节数组长度为64
var scratch [64]byte
// 拷贝key数据到数组中
copy(scratch[:], key)
// 使用IEEE 多项式返回数据的CRC-32校验和 通过该函数计算哈希值
return crc32.ChecksumIEEE(scratch[:len(key)])
}
return crc32.ChecksumIEEE([]byte(key))
}
// 更新排序 方便查找
func (c *Consistent) updateSortedHashes() {
hashes := c.sortedHashes[:0]
// 判断切片容量 是否过大 如果过大则重置
if cap(c.sortedHashes)/(c.virtualNodeCount*4) > len(c.circleHashNodeMap) {
hashes = nil
}
// 添加 hash
// for hashKey := range c.circleHashNodeMap {
for hashKey, _ := range c.circleHashNodeMap {
hashes = append(hashes, hashKey)
}
// 对所有节点hash值进行排序 方便之后进行二分查找
sort.Sort(hashes)
// 重新赋值
c.sortedHashes = hashes
}
// 向hash环中添加1个节点
func (c *Consistent) Add(element string) {
// 在并发场景 对map的写操作需要加锁
// 加锁
c.Lock()
// 解锁
defer c.Unlock()
c.add(element)
}
func (c *Consistent) add(element string) {
// 循环虚拟节点设置副本
for i := 0; i < c.virtualNodeCount; i++ {
// 把虚拟节点映射到hash环中
c.circleHashNodeMap[c.hashKey(c.generateKey(element, i))] = element
}
// 更新排序
c.updateSortedHashes()
}
// 向hash环中删除1个节点
func (c *Consistent) Remove(element string) {
c.Lock()
defer c.Unlock()
c.remove(element)
}
func (c *Consistent) remove(element string) {
for i := 0; i < c.virtualNodeCount; i++ {
delete(c.circleHashNodeMap, c.hashKey(c.generateKey(element, i)))
}
c.updateSortedHashes()
}
// 根据数据标示获取最近的服务器节点信息
func (c *Consistent) Get(element string) (string, error) {
// 在并发场景 对map的读操作需要加读锁
// 加读锁
c.RLock()
// 解读锁
defer c.RUnlock()
if len(c.circleHashNodeMap) == 0 {
return "", errors.New("error : hash circle has no data")
}
// 计算hash值
key := c.hashKey(element)
i := c.search(key)
return c.circleHashNodeMap[c.sortedHashes[i]], nil
}
// 顺时针查找最近的服务端节点
func (c *Consistent) search(key uint32) int {
// 查找算法
f := func(x int) bool {
return c.sortedHashes[x] > key
}
// 使用 二分查找 来搜索指定切片满足条件的最小值
i := sort.Search(len(c.sortedHashes), f)
// 如果超出范围则设置i=0
if i >= len(c.sortedHashes) {
i = 0
}
return i
}