-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.go
114 lines (93 loc) · 2.27 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package kafka
import (
"time"
)
type topicPartitionOffsetMap map[string]map[Partition]Offset
// These are slightly different than the java api. They encompass multiple topics
type KafkaStream struct {
offsets topicPartitionOffsetMap
c *SimpleConsumer
Ch FetchResponseChan
}
func NewKafkaStreamWithOffsets(c *SimpleConsumer, targets []TopicPartition, startTime OffsetTime) (s *KafkaStream, err error) {
newT := make([]TopicPartitionOffset, len(targets))
offReq := OffsetsRequest{
Time: startTime,
MaxNumber: 1,
}
resChans := make([]OffsetsResponseChan, len(targets))
// Let's pipeline these requests
for i, t := range targets {
offReq.TopicPartition = t
if resChans[i], err = c.Offsets(offReq); err != nil {
return nil, err
}
}
// Now read the offsets
for i, offC := range resChans {
offRes := <-offC
if offRes.Err != nil {
return nil, offRes.Err
}
newT[i] = offRes.Offsets[0]
}
return NewKafkaStream(c, newT)
}
func (s *KafkaStream) partCount() (i int) {
for _, pm := range s.offsets {
i += len(pm)
}
return
}
const pollTime time.Duration = 50 * time.Millisecond
func (s *KafkaStream) poll() (err error) {
a := time.After(pollTime)
mfr := make(MultiFetchRequest, 0, s.partCount())
fr := FetchRequest{}
fr.MaxSize = 1024 * 1024
var pm map[Partition]Offset
for fr.Topic, pm = range s.offsets {
for fr.Partition, fr.Offset = range pm {
mfr = append(mfr, fr)
}
}
resChan, err := s.c.MultiFetch(mfr)
if err != nil {
return err
}
for res := range resChan {
s.Ch <- res
if res.Err != nil {
close(s.Ch)
return res.Err
}
s.updatePartitionMap(res.TopicPartitionOffset)
}
<-a
return
}
func (s *KafkaStream) pollLoop() (err error) {
for ; err == nil; err = s.poll() {
}
return
}
func (s *KafkaStream) updatePartitionMap(offsets ...TopicPartitionOffset) {
for _, o := range offsets {
po := s.offsets[o.Topic]
if po == nil {
po = make(map[Partition]Offset)
s.offsets[o.Topic] = po
}
po[o.Partition] = o.Offset
}
}
func NewKafkaStream(c *SimpleConsumer, targets []TopicPartitionOffset) (s *KafkaStream, err error) {
s = &KafkaStream{
c: c,
offsets: make(topicPartitionOffsetMap),
Ch: make(FetchResponseChan),
}
s.updatePartitionMap(targets...)
go s.pollLoop()
return s, err
}