-
-
Notifications
You must be signed in to change notification settings - Fork 157
/
consumer_direct.go
224 lines (197 loc) · 6.03 KB
/
consumer_direct.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
package kgo
import "regexp"
// DirectConsumeOpt is an option to configure direct topic / partition consuming.
type DirectConsumeOpt interface {
apply(*directConsumer)
}
type directConsumeOpt struct {
fn func(cfg *directConsumer)
}
func (opt directConsumeOpt) apply(cfg *directConsumer) { opt.fn(cfg) }
// ConsumeTopics sets topics to consume directly and the offsets to start
// consuming partitions from in those topics.
//
// If a metadata update sees partitions added to a topic, the client will
// automatically begin consuming from those new partitions.
func ConsumeTopics(offset Offset, topics ...string) DirectConsumeOpt {
return directConsumeOpt{func(cfg *directConsumer) {
cfg.topics = make(map[string]Offset, len(topics))
for _, topic := range topics {
cfg.topics[topic] = offset
}
}}
}
// ConsumePartitions sets partitions to consume from directly and the offsets
// to start consuming those partitions from.
//
// Offsets from option have higher precedence than ConsumeTopics. If a topic's
// partition is set in this option and that topic is also set in ConsumeTopics,
// offsets on partitions in this option are used in favor of the more general
// topic offset from ConsumeTopics.
func ConsumePartitions(partitions map[string]map[int32]Offset) DirectConsumeOpt {
return directConsumeOpt{func(cfg *directConsumer) { cfg.partitions = partitions }}
}
// ConsumeTopicsRegex sets all topics in ConsumeTopics to be parsed as regular
// expressions.
func ConsumeTopicsRegex() DirectConsumeOpt {
return directConsumeOpt{func(cfg *directConsumer) { cfg.regexTopics = true }}
}
type directConsumer struct {
topics map[string]Offset
partitions map[string]map[int32]Offset
regexTopics bool
reTopics map[string]Offset
reIgnore map[string]struct{}
using map[string]map[int32]struct{}
}
// AssignPartitions assigns an exact set of partitions for the client to
// consume from. Any prior direct assignment or group assignment is
// invalidated.
//
// This takes ownership of any assignments.
func (cl *Client) AssignPartitions(opts ...DirectConsumeOpt) {
c := &cl.consumer
c.mu.Lock()
defer c.mu.Unlock()
if c.typ != consumerTypeUnset {
c.unassignPrior()
}
d := &directConsumer{
topics: make(map[string]Offset),
partitions: make(map[string]map[int32]Offset),
reTopics: make(map[string]Offset),
reIgnore: make(map[string]struct{}),
using: make(map[string]map[int32]struct{}),
}
for _, opt := range opts {
opt.apply(d)
}
if len(d.topics) == 0 && len(d.partitions) == 0 || c.dead {
c.typ = consumerTypeUnset
return
}
c.typ = consumerTypeDirect
c.direct = d
defer cl.triggerUpdateMetadata()
if d.regexTopics {
return
}
cl.topicsMu.Lock()
defer cl.topicsMu.Unlock()
clientTopics := cl.cloneTopics()
for topic := range d.topics {
if _, exists := clientTopics[topic]; !exists {
clientTopics[topic] = newTopicPartitions(topic)
}
}
for topic := range d.partitions {
if _, exists := clientTopics[topic]; !exists {
clientTopics[topic] = newTopicPartitions(topic)
}
}
cl.topics.Store(clientTopics)
}
// findNewAssignments returns new partitions to consume at given offsets
// based off the current topics.
func (d *directConsumer) findNewAssignments(
topics map[string]*topicPartitions,
) map[string]map[int32]Offset {
// First, we build everything we could theoretically want to consume.
toUse := make(map[string]map[int32]Offset, 10)
for topic, topicPartitions := range topics {
var useTopic bool
var useOffset Offset
// If we are using regex topics, we have to check all
// topic regexes to see if any match on this topic.
if d.regexTopics {
// If we have already matched this topic prior,
// we do not need to check all regexes.
if offset, exists := d.reTopics[topic]; exists {
useTopic = true
useOffset = offset
} else if _, exists := d.reIgnore[topic]; exists {
// skip
} else {
for reTopic, offset := range d.topics {
if match, _ := regexp.MatchString(reTopic, topic); match {
useTopic = true
useOffset = offset
d.reTopics[topic] = offset
break
}
}
if !useTopic {
d.reIgnore[topic] = struct{}{}
}
}
} else {
// If we are not using regex, we can just lookup.
useOffset, useTopic = d.topics[topic]
}
// If the above detected that we want to keep this topic, we
// set all partitions as usable.
if useTopic {
partitions := topicPartitions.load()
if d.regexTopics && partitions.isInternal {
continue
}
toUseTopic := make(map[int32]Offset, len(partitions.partitions))
for _, partition := range partitions.partitions {
toUseTopic[partition] = useOffset
}
toUse[topic] = toUseTopic
}
// Lastly, if this topic has some specific partitions pinned,
// we set those.
for partition, offset := range d.partitions[topic] {
toUseTopic, exists := toUse[topic]
if !exists {
toUseTopic = make(map[int32]Offset, 10)
toUse[topic] = toUseTopic
}
toUseTopic[partition] = offset
}
}
// With everything we want to consume, remove what we are already.
for topic, partitions := range d.using {
toUseTopic, exists := toUse[topic]
if !exists {
continue // forgotten topic
}
if len(partitions) == len(toUseTopic) {
delete(toUse, topic)
continue
}
for partition := range partitions {
delete(toUseTopic, partition)
}
}
if len(toUse) == 0 {
return nil
}
// Finally, toUse contains new partitions that we must consume.
// Add them to our using map and assign them.
for topic, partitions := range toUse {
topicUsing, exists := d.using[topic]
if !exists {
topicUsing = make(map[int32]struct{})
d.using[topic] = topicUsing
}
for partition := range partitions {
topicUsing[partition] = struct{}{}
}
}
return toUse
}
// deleteUsing is for deleting a specific partition from the consumer; this
// is called sequentially at the end of a metadata update.
func (d *directConsumer) deleteUsing(topic string, partition int32) {
if d.using == nil {
return
}
partitions := d.using[topic]
if partitions == nil {
return
}
delete(partitions, partition)
}