partitioner.go
package kgo

import (
	"math/rand"
	"time"
)

// Partitioner creates topic partitioners to determine which partition
// messages should be sent to.
type Partitioner interface {
	// ForTopic returns a partitioner for an individual topic. It is
	// guaranteed that only one record will use an individual topic's
	// TopicPartitioner at a time, meaning partitioning within a topic does
	// not require locks.
	ForTopic(string) TopicPartitioner
}

// TopicPartitioner partitions records in an individual topic.
type TopicPartitioner interface {
	// OnNewBatch is called when producing a record if that record would
	// trigger a new batch on its current partition.
	OnNewBatch()
	// RequiresConsistency returns true if a record must hash to the same
	// partition even if a partition is down. If true, a record may hash to
	// a partition that cannot be written to and will error until the
	// partition comes back.
	RequiresConsistency(*Record) bool
	// Partition determines, among a set of n partitions, which index
	// should be chosen for r.
	Partition(r *Record, n int) int
}
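
// Illustrative sketch, not part of the original file: a minimal custom
// Partitioner/TopicPartitioner pair that round-robins across partitions and
// ignores record keys. The names roundRobinPartitioner and roundRobinTopic
// are hypothetical.
//
//	type roundRobinPartitioner struct{}
//
//	func (*roundRobinPartitioner) ForTopic(string) TopicPartitioner {
//		return new(roundRobinTopic)
//	}
//
//	type roundRobinTopic struct{ next int }
//
//	func (*roundRobinTopic) OnNewBatch()                      {}
//	func (*roundRobinTopic) RequiresConsistency(*Record) bool { return false }
//
//	func (t *roundRobinTopic) Partition(_ *Record, n int) int {
//		p := t.next % n
//		t.next++
//		return p
//	}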

// StickyPartitioner is the same as StickyKeyPartitioner, but with no logic to
// consistently hash keys. That is, this only partitions according to the
// sticky partition strategy.
func StickyPartitioner() Partitioner {
	return new(stickyPartitioner)
}

type stickyPartitioner struct{}

func (*stickyPartitioner) ForTopic(string) TopicPartitioner {
	p := newStickyTopicPartitioner()
	return &p
}

func newStickyTopicPartitioner() stickyTopicPartitioner {
	return stickyTopicPartitioner{
		lastPart: -1,
		onPart:   -1,
		rng:      rand.New(rand.NewSource(time.Now().UnixNano())),
	}
}

type stickyTopicPartitioner struct {
	lastPart int
	onPart   int
	rng      *rand.Rand
}

func (p *stickyTopicPartitioner) OnNewBatch() { p.lastPart, p.onPart = p.onPart, -1 }

func (*stickyTopicPartitioner) RequiresConsistency(*Record) bool { return false }

func (p *stickyTopicPartitioner) Partition(_ *Record, n int) int {
	if p.onPart == -1 || p.onPart >= n {
		p.onPart = p.rng.Intn(n)
		if p.onPart == p.lastPart {
			p.onPart = (p.onPart + 1) % n
		}
	}
	return p.onPart
}
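
// Illustrative usage sketch, not part of the original file: the sticky
// partitioner keeps returning the same partition until OnNewBatch signals
// that the pinned partition rolled over to a new batch, after which a
// different partition is chosen. The topic name "events" and the partition
// count 8 are arbitrary example values.
//
//	tp := StickyPartitioner().ForTopic("events")
//	p1 := tp.Partition(nil, 8) // some partition in [0, 8)
//	p2 := tp.Partition(nil, 8) // same as p1: the partition is pinned
//	tp.OnNewBatch()            // the pinned partition's batch rolled over
//	p3 := tp.Partition(nil, 8) // a partition different from p2
//	_, _, _ = p1, p2, p3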

// StickyKeyPartitioner mirrors the default Java partitioner from Kafka's
// 2.4.0 release (see KAFKA-8601).
//
// This is the same "hash the key consistently; if no key, choose a random
// partition" strategy that the Java partitioner has always used, but rather
// than always choosing a random partition, the partitioner pins a partition
// to produce to until that partition rolls over to a new batch. Only when
// rolling to new batches does this partitioner switch partitions.
//
// The benefit of this pinning is less CPU utilization on Kafka brokers: over
// time the random distribution is the same, but the brokers handle larger
// batches on average.
//
// overrideHasher is optional; if nil, this returns a partitioner that
// partitions exactly how Kafka does. Specifically, the partitioner will use
// murmur2 to hash keys, will mask out the 32nd bit, and then will mod by the
// number of potential partitions.
func StickyKeyPartitioner(overrideHasher PartitionerHasher) Partitioner {
	if overrideHasher == nil {
		overrideHasher = KafkaHasher(murmur2)
	}
	return &keyPartitioner{overrideHasher}
}
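
// Illustrative usage sketch, not part of the original file: with a nil
// PartitionerHasher the partitioner hashes keys exactly like Kafka's Java
// client, so keyed records map to the same partitions the Java producer
// would pick. The topic "orders", key "user-42", and partition count 12 are
// arbitrary example values.
//
//	part := StickyKeyPartitioner(nil) // nil picks KafkaHasher(murmur2)
//	tp := part.ForTopic("orders")
//	keyed := &Record{Key: []byte("user-42")}
//	p := tp.Partition(keyed, 12) // deterministic for this key and count
//	unkeyed := &Record{}
//	q := tp.Partition(unkeyed, 12) // falls back to the sticky strategy
//	_, _ = p, q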

// PartitionerHasher returns a partition to use given the input data and
// number of partitions.
type PartitionerHasher func([]byte, int) int

// KafkaHasher returns a PartitionerHasher using hashFn that mirrors how
// Kafka partitions after hashing data.
func KafkaHasher(hashFn func([]byte) uint32) PartitionerHasher {
	return func(key []byte, n int) int {
		// https://github.com/apache/kafka/blob/d91a94e/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L59
		// https://github.com/apache/kafka/blob/d91a94e/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L865-L867
		// Masking before or after the int conversion makes no difference.
		return int(hashFn(key)&0x7fffffff) % n
	}
}
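
// Illustrative sketch, not part of the original file, spelling out the same
// arithmetic step by step. No concrete hash value is assumed; the point is
// the mask-then-mod order. The key "user-42" and 12 partitions are arbitrary
// example values.
//
//	h := murmur2([]byte("user-42")) // 32-bit hash of the key
//	masked := h & 0x7fffffff        // clear the sign bit, as Kafka's toPositive does
//	partition := int(masked) % 12   // mod by the partition count
//	_ = partition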

// SaramaHasher returns a PartitionerHasher using hashFn that mirrors how
// Sarama partitions after hashing data.
func SaramaHasher(hashFn func([]byte) uint32) PartitionerHasher {
	return func(key []byte, n int) int {
		p := int(hashFn(key)) % n
		if p < 0 {
			p = -p
		}
		return p
	}
}
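
// Illustrative sketch, not part of the original file: Kafka masks the hash
// before taking the modulus, while this Sarama-style hasher takes the
// modulus first and then negates a negative result, so the two can choose
// different partitions for the same key. When producing alongside other
// clients, pick the hasher that matches them. The key and partition count
// below are arbitrary example values.
//
//	kafkaStyle := KafkaHasher(murmur2)
//	saramaStyle := SaramaHasher(murmur2)
//	pk := kafkaStyle([]byte("user-42"), 10)
//	ps := saramaStyle([]byte("user-42"), 10)
//	_, _ = pk, ps // pk and ps are not guaranteed to be equal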

type keyPartitioner struct {
	hasher PartitionerHasher
}

func (k *keyPartitioner) ForTopic(string) TopicPartitioner {
	return &stickyKeyTopicPartitioner{k.hasher, newStickyTopicPartitioner()}
}

type stickyKeyTopicPartitioner struct {
	hasher PartitionerHasher
	stickyTopicPartitioner
}

func (*stickyKeyTopicPartitioner) RequiresConsistency(r *Record) bool { return r.Key != nil }

func (p *stickyKeyTopicPartitioner) Partition(r *Record, n int) int {
	if r.Key != nil {
		return p.hasher(r.Key, n)
	}
	return p.stickyTopicPartitioner.Partition(r, n)
}

// Straight from the C++ code and from the Java code duplicating it.
// https://github.com/apache/kafka/blob/d91a94e/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L383-L421
// https://github.com/aappleby/smhasher/blob/61a0530f/src/MurmurHash2.cpp#L37-L86
//
// The Java code uses ints but with unsigned shifts; we do not need to.
func murmur2(b []byte) uint32 {
	const (
		seed uint32 = 0x9747b28c
		m    uint32 = 0x5bd1e995
		r           = 24
	)

	h := seed ^ uint32(len(b))
	for len(b) >= 4 {
		k := uint32(b[3])<<24 + uint32(b[2])<<16 + uint32(b[1])<<8 + uint32(b[0])
		b = b[4:]
		k *= m
		k ^= k >> r
		k *= m

		h *= m
		h ^= k
	}

	switch len(b) {
	case 3:
		h ^= uint32(b[2]) << 16
		fallthrough
	case 2:
		h ^= uint32(b[1]) << 8
		fallthrough
	case 1:
		h ^= uint32(b[0])
		h *= m
	}

	h ^= h >> 13
	h *= m
	h ^= h >> 15

	return h
}