-
Notifications
You must be signed in to change notification settings - Fork 1
/
producer.go
115 lines (92 loc) · 2.34 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package kafka
import (
"context"
"fmt"
"os"
"strconv"
"sync"
"time"
kafka "github.com/segmentio/kafka-go"
)
func getBatchTimeout() time.Duration {
amount := 2000
if amountStr, present := os.LookupEnv("KAFKA_BATCH_TIMEOUT"); present {
amount, _ = strconv.Atoi(amountStr)
}
return time.Duration(amount) * time.Millisecond
}
func getBatchSize() int {
size := 100
if sizeStr, present := os.LookupEnv("KAFKA_BATCH_SIZE"); present {
size, _ = strconv.Atoi(sizeStr)
}
return size
}
func GetWriterTopic() string {
if topic, present := os.LookupEnv("KAFKA_WRITER_TOPIC"); present {
return topic
}
return ""
}
func MustProduce() bool {
return GetWriterTopic() != ""
}
func getKafkaWriter(kafkaURL string) *kafka.Writer {
if topic := GetWriterTopic(); topic != "" {
return &kafka.Writer{
Logger: kl,
Addr: kafka.TCP(kafkaURL),
Topic: topic,
Balancer: &kafka.LeastBytes{},
BatchSize: getBatchSize(),
}
}
return nil
}
func closeWriter(writer *kafka.Writer) {
kl.Printf("closing writer")
if err := writer.Close(); err != nil {
kl.Fatal("failed to close writer:", fields{"err": err})
}
}
func NewProducer(done context.Context, cancelSend context.CancelFunc, kafkaURL string, in <-chan kafka.Message, wg *sync.WaitGroup) {
writer := getKafkaWriter(kafkaURL)
if writer == nil {
wg.Done()
return
}
kl.Printf("Producer setup (topic: %v)", writer.Topic)
go func() {
batchCtx, batchDone := context.WithCancel(done)
defer func() {
kl.Printf("closing producer...")
batchDone() // Indicates batch to stop sending
cancelSend() // Indicates upstream to stop sending
closeWriter(writer)
kl.Printf("producer closed, signaling work group...")
wg.Done()
}()
batches := BatchMessages(batchCtx, in, getBatchSize(), getBatchTimeout())
for messages := range batches {
if err := writeMessage(done, writer, messages); err != nil {
return
}
}
}()
}
func writeMessage(ctx context.Context, writer *kafka.Writer, messages []kafka.Message) error {
err := writer.WriteMessages(
ctx,
messages...,
)
if err != nil {
kl.Error("Producer - Failed to write messages", fields{"err": err})
return err
}
kl.Info("Producer - wrote messages to kafka", fields{
"topic": writer.Topic,
"count": len(messages),
"messages": fmt.Sprintf("%+v", MessagesSimplified(messages)),
})
return nil
}