/
producer.go
141 lines (127 loc) · 3.77 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package kafka
import (
"context"
"errors"
"fmt"
"hash"
"strings"
"sync"
"github.com/Shopify/sarama"
"github.com/aviddiviner/go-murmur"
"github.com/davecgh/go-spew/spew"
messaging "github.com/veritone/go-messaging-lib"
)
// Strategy names the partitioning rule that Producer uses to route messages
// to partitions. The supported values are the Strategy* constants.
type Strategy string
// producer publishes messages to a single Kafka topic through a sarama
// AsyncProducer that is built on top of the embedded client.
type producer struct {
// Client is the underlying sarama client. Embedding it promotes its methods;
// Close relies on the promoted Closed method to detect a closed client.
sarama.Client
// asyncProducer is created from Client and carries the messages sent by
// Produce.
asyncProducer sarama.AsyncProducer
// config is the sarama configuration the client was created with.
config *sarama.Config
// topic is the destination topic set on every produced message.
topic string
// Mutex guards Close so that concurrent calls do not shut down twice.
*sync.Mutex
}
// Partitioning strategies accepted by Producer.
const (
// StrategyRoundRobin distributes writes evenly across partitions.
StrategyRoundRobin Strategy = "RoundRobin"
// StrategyLeastBytes distributes writes to nodes with the least amount of
// traffic. Producer currently rejects it with "balancer is not available".
StrategyLeastBytes Strategy = "LeastBytes"
// StrategyHash distributes writes based on a hash of the message key, so
// messages with the same key are routed to the same partition. Producer maps
// it (and any unrecognized strategy) to sarama.NewHashPartitioner, which
// sarama documents as using the 32-bit FNV-1a hash.
StrategyHash Strategy = "Hash"
// StrategyHashMurmur2 uses the same strategy for assigning partitions as the
// Java client.
// https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244
StrategyHashMurmur2 Strategy = "HashMurmur2"
)
// Producer initializes a producer client for publishing messages to topic,
// starting from the package default configuration (GetDefaultConfig) and
// installing the partitioner selected by strategy:
//
//   - StrategyRoundRobin: sarama.NewRoundRobinPartitioner
//   - StrategyHashMurmur2: a custom hash partitioner seeded like the Java client
//   - StrategyLeastBytes: not available; an error is returned
//   - anything else (including StrategyHash): sarama.NewHashPartitioner
//
// It returns an error if the default configuration cannot be loaded, if the
// strategy is unavailable, or if the sarama client or async producer cannot
// be created for brokers.
func Producer(topic string, strategy Strategy, brokers ...string) (messaging.Producer, error) {
	config, err := GetDefaultConfig()
	if err != nil {
		// Without a usable config the partitioner assignment below would
		// dereference a nil pointer, so fail early instead.
		return nil, err
	}

	var balancer sarama.PartitionerConstructor
	switch strategy {
	case StrategyRoundRobin:
		balancer = sarama.NewRoundRobinPartitioner
	case StrategyLeastBytes:
		return nil, errors.New("balancer is not available")
	case StrategyHashMurmur2:
		// Same seed as the Java client:
		// https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L246
		seed := uint32(0x9747b28c)
		balancer = sarama.NewCustomHashPartitioner(func() hash.Hash32 { return murmur.New32(seed) })
	default:
		balancer = sarama.NewHashPartitioner
	}
	config.Producer.Partitioner = balancer

	client, err := sarama.NewClient(brokers, config)
	if err != nil {
		return nil, err
	}

	asyncProducer, err := sarama.NewAsyncProducerFromClient(client)
	if err != nil {
		// The client is already connected at this point; release it so a
		// failed construction does not leak broker connections. The creation
		// error is the one worth reporting, so the close error is dropped.
		_ = client.Close()
		return nil, err
	}

	return &producer{
		Client:        client,
		asyncProducer: asyncProducer,
		config:        config,
		Mutex:         new(sync.Mutex),
		topic:         topic,
	}, nil
}
// NewProducer initializes a new client for publishing messages to topic using
// the caller-supplied sarama configuration.
//
// Produce waits on the async producer's Successes and Errors channels, so
// config should enable both Producer.Return.Successes and
// Producer.Return.Errors; sarama only delivers on those channels when the
// corresponding flag is set.
//
// It returns an error if the sarama client or the async producer cannot be
// created for brokers.
func NewProducer(topic string, config *sarama.Config, brokers ...string) (messaging.Producer, error) {
	client, err := sarama.NewClient(brokers, config)
	if err != nil {
		return nil, err
	}

	asyncProducer, err := sarama.NewAsyncProducerFromClient(client)
	if err != nil {
		// The client is already connected at this point; release it so a
		// failed construction does not leak broker connections. The creation
		// error is the one worth reporting, so the close error is dropped.
		_ = client.Close()
		return nil, err
	}

	return &producer{
		Client:        client,
		asyncProducer: asyncProducer,
		config:        config,
		Mutex:         new(sync.Mutex),
		topic:         topic,
	}, nil
}
// Produce publishes msg to the producer's topic and blocks until the async
// producer reports either a success or an error.
//
// msg.Message() must yield a *Message; any other payload is rejected. After
// the call the message's Partition, Offset and Topic are copied back from the
// sarama message.
//
// ctx is honored up to the point where the message is handed to the async
// producer: an already-cancelled context, or a cancellation while the input
// channel is full, returns ctx.Err() without enqueueing. Once the message is
// enqueued the call waits for its acknowledgement regardless of ctx, because
// abandoning the wait would leave that acknowledgement on the shared channel
// for the next caller to misread. A nil ctx is treated as
// context.Background().
//
// NOTE(review): acknowledgements are read from channels shared by all
// callers, so concurrent Produce calls on the same producer may observe each
// other's results.
func (p *producer) Produce(ctx context.Context, msg messaging.Messager, _ ...messaging.Event) error {
	kafkaMsg, ok := msg.Message().(*Message)
	if !ok {
		return fmt.Errorf("unsupported Kafka message: %s", spew.Sprint(msg))
	}

	// Earlier versions ignored the context entirely, so tolerate nil.
	if ctx == nil {
		ctx = context.Background()
	}
	if err := ctx.Err(); err != nil {
		return err
	}

	saramaMsg := &sarama.ProducerMessage{
		Topic: p.topic,
		Key:   sarama.ByteEncoder(kafkaMsg.Key),
		Value: sarama.ByteEncoder(kafkaMsg.Value),
	}

	select {
	case p.asyncProducer.Input() <- saramaMsg:
	case <-ctx.Done():
		return ctx.Err()
	}

	var err error
	select {
	case _, open := <-p.asyncProducer.Successes():
		if !open {
			// A closed channel is not an acknowledgement.
			err = errors.New("kafka: producer closed before the message was acknowledged")
		}
	case perr, open := <-p.asyncProducer.Errors():
		switch {
		case !open:
			err = errors.New("kafka: producer closed before the message was acknowledged")
		case perr != nil:
			// Assign only a non-nil pointer: storing a nil *ProducerError in
			// the error interface would produce a non-nil error value.
			err = perr
		}
	}

	kafkaMsg.Partition = saramaMsg.Partition
	kafkaMsg.Offset = saramaMsg.Offset
	kafkaMsg.Topic = saramaMsg.Topic
	return err
}
// Close shuts down the async producer and then the underlying client.
//
// The call is guarded by the producer's mutex and is a no-op when the client
// already reports itself closed, so it is safe to call more than once. Both
// shutdown steps are always attempted; if either fails, the returned error
// lists every failure message.
func (p *producer) Close() error {
	p.Lock()
	defer p.Unlock()

	if p.Closed() {
		return nil
	}

	var errorStrs []string
	if err := p.asyncProducer.Close(); err != nil {
		errorStrs = append(errorStrs, err.Error())
	}
	// The async producer was built from the client, which therefore has to be
	// closed separately.
	if err := p.Client.Close(); err != nil {
		errorStrs = append(errorStrs, err.Error())
	}

	if len(errorStrs) == 0 {
		return nil
	}
	return fmt.Errorf(
		"(%d) errors while closing producer: %s",
		len(errorStrs),
		strings.Join(errorStrs, "\n"))
}