-
Notifications
You must be signed in to change notification settings - Fork 1
/
producer.go
109 lines (99 loc) · 2.45 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package main
import (
"github.com/confluentinc/confluent-kafka-go/kafka"
log "github.com/sirupsen/logrus"
"sync"
"time"
)
// Producer periodically publishes sequenced timestamp messages to a single
// Kafka topic and services the Kafka client's event channel in the background.
// Create one with NewProducer, start it with run, and shut it down with Stop.
type Producer struct {
// producer is the underlying Kafka client created in NewProducer.
producer *kafka.Producer
// topic is the name of the topic every message is produced to.
topic string
// quit is closed by Stop to end the ticker goroutine started by run.
quit chan struct{}
// consumerThreadQuit is closed by Stop to end eventConsumerThread.
consumerThreadQuit chan struct{}
// waiter counts the background goroutines; Stop waits on it before closing
// the Kafka client.
waiter sync.WaitGroup
// watcher is told about each sequence number (via sent) before the
// corresponding message is produced.
watcher *StateWatcher
// fetcher supplies the OAuth bearer token when the Kafka client asks for a
// token refresh.
fetcher TokenFetcher
}
// NewProducer creates a Kafka client from configMap and wraps it in a Producer
// that publishes to topic, reports sent sequences to watcher, and obtains
// OAuth tokens from fetcher. No goroutines are started here.
//
// It returns the error from kafka.NewProducer unchanged if the client cannot
// be created.
func NewProducer(configMap *kafka.ConfigMap, topic string, watcher *StateWatcher, fetcher TokenFetcher) (*Producer, error) {
	client, err := kafka.NewProducer(configMap)
	if err != nil {
		return nil, err
	}

	p := new(Producer)
	p.producer = client
	p.topic = topic
	p.watcher = watcher
	p.fetcher = fetcher
	// Both channels are pure signals: Stop closes them to end the goroutines.
	p.quit = make(chan struct{})
	p.consumerThreadQuit = make(chan struct{})
	return p, nil
}
// Stop shuts the producer down: it signals the event-consumer goroutine and
// then the ticker goroutine to exit, waits for the goroutines tracked by
// p.waiter, and finally closes the Kafka client. It always returns nil.
//
// Stop closes both quit channels, so it must be called at most once.
func (p *Producer) Stop() error {
	// Close order matches the original: consumer thread first, then ticker.
	for _, done := range []chan struct{}{p.consumerThreadQuit, p.quit} {
		close(done)
	}

	// The client is closed only after the goroutines using it have exited.
	p.waiter.Wait()
	p.producer.Close()
	return nil
}
// run starts the producer's background work and returns immediately. It
// launches two goroutines:
//
//   - the event consumer (eventConsumerThread), which drains the Kafka
//     client's event channel until p.consumerThreadQuit is closed;
//   - the ticker loop, which calls sendTimestamp once per period with an
//     incrementing sequence number (starting at 0) until p.quit is closed.
//
// Both goroutines are registered on p.waiter before they are started, so a
// Stop that races with startup still waits for them.
func (p *Producer) run(period time.Duration) {
	// Created in the caller's goroutine so that time.NewTicker's panic on a
	// non-positive period surfaces at the call site.
	ticker := time.NewTicker(period)

	// Take a count for each goroutine up front. eventConsumerThread also
	// does its own Add/Done internally; holding a count here for the whole
	// lifetime of that goroutine guarantees the WaitGroup counter cannot be
	// observed as zero before the goroutine has been scheduled.
	p.waiter.Add(2)

	go func() {
		defer p.waiter.Done()
		p.eventConsumerThread()
	}()

	go func() {
		defer p.waiter.Done()
		defer ticker.Stop()

		var sequence uint64
		for {
			select {
			case <-ticker.C:
				// The send error is deliberately dropped: the loop keeps
				// ticking and the sequence advances regardless of the outcome.
				_ = p.sendTimestamp(sequence)
				sequence++
			case <-p.quit:
				return
			}
		}
	}()
}
// eventConsumerThread drains the Kafka client's event channel until
// p.consumerThreadQuit is closed. It is meant to run in its own goroutine.
//
// Handled events:
//   - *kafka.Message: delivery reports; failures are logged as warnings.
//   - kafka.OAuthBearerTokenRefresh: a token is fetched from p.fetcher and
//     handed to the client. If fetching fails, the failure is reported to the
//     client instead and no token is set.
//   - anything else is logged at info level.
//
// The function registers itself on p.waiter on entry and releases that count
// when it returns.
func (p *Producer) eventConsumerThread() {
	p.waiter.Add(1)
	defer p.waiter.Done()

	for {
		select {
		case e := <-p.producer.Events():
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					log.Warnf("Failed to deliver message: %v\n", ev.TopicPartition)
				}
			case kafka.OAuthBearerTokenRefresh:
				log.Infof("sending token for producer")
				token, err := p.fetcher.Fetch()
				if err != nil {
					log.Errorf("Failed to fetch token: %v", err)
					// Do not dereference token after a failed fetch. Tell the
					// client the refresh failed so it can arrange a retry
					// (per the confluent-kafka-go documentation).
					if failErr := p.producer.SetOAuthBearerTokenFailure(err.Error()); failErr != nil {
						log.Errorf("Failed to report token failure: %v", failErr)
					}
					continue
				}
				err = p.producer.SetOAuthBearerToken(*token)
				if err != nil {
					log.Errorf("Failed to set set bearer token: %v", err)
				}
			default:
				log.Infof("Got event from events consumer thread: %s", ev)
			}
		case <-p.consumerThreadQuit:
			return
		}
	}
}
// sendTimestamp encodes the current wall-clock time (Unix milliseconds)
// together with sequence, records the sequence with the watcher, produces the
// encoded payload to p.topic on any partition, and then flushes the client for
// up to FlushTimeout.
//
// It returns the encoding error if the payload cannot be built (in which case
// nothing is recorded or produced), otherwise the error from Produce, which is
// nil on success.
func (p *Producer) sendTimestamp(sequence uint64) error {
	message, err := BytesFromTimestamp(time.Now().UnixMilli(), sequence)
	if err != nil {
		// Without a payload there is nothing to send; bail out before
		// dereferencing message.
		return err
	}

	// The watcher is informed before the message is handed to the client.
	p.watcher.sent(sequence)

	err = p.producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &p.topic, Partition: kafka.PartitionAny},
		Key:            []byte{},
		Value:          *message,
	}, nil)

	// Flush runs even when Produce failed, as before. Its return value is
	// ignored here.
	_ = p.producer.Flush(int(FlushTimeout.Milliseconds()))

	return err
}