-
Notifications
You must be signed in to change notification settings - Fork 8
/
main.go
68 lines (56 loc) · 1.38 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package main
import (
"encoding/json"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"io/ioutil"
"log"
)
func main() {
example()
}
func example() {
conf := readConf()
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": conf.Servers})
if err != nil {
panic(err)
}
defer p.Close()
// Delivery report handler for produced messages
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
} else {
fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
}
}
}
}()
// Produce messages to topic (asynchronously)
topic := "myTopic"
for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(word),
}, nil)
}
// Wait for message deliveries before shutting down
p.Flush(15 * 1000)
}
func readConf() *Config {
data, err := ioutil.ReadFile("../config.json")
if err != nil {
log.Fatalf("ioutil.ReadFile error:%v", err)
}
var config = &Config{}
if err := json.Unmarshal(data, config); err != nil {
log.Fatalf("json.Unmarshal error:%v", err)
}
return config
}
type Config struct {
Servers string `json:"servers"`
}