-
Notifications
You must be signed in to change notification settings - Fork 0
/
kafka.go
39 lines (35 loc) · 934 Bytes
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package ioc
import (
"github.com/IBM/sarama"
"github.com/spf13/viper"
"github.com/xiaoshanjiang/my-geektime/webook/internal/events"
"github.com/xiaoshanjiang/my-geektime/webook/internal/events/article"
)
// InitKafka creates the sarama client used by the rest of the application.
//
// The broker addresses are read from the "kafka" key of the viper
// configuration. The function panics if that key cannot be decoded or if
// sarama fails to create the client.
func InitKafka() sarama.Client {
	// Only the broker address list is taken from configuration.
	// NOTE(review): viper decodes through mapstructure, so the yaml tag is
	// presumably not what binds "addrs" to this field — confirm.
	type kafkaConfig struct {
		Addrs []string `yaml:"addrs"`
	}

	var settings kafkaConfig
	if err := viper.UnmarshalKey("kafka", &settings); err != nil {
		panic(err)
	}

	conf := sarama.NewConfig()
	// Have producers report successful deliveries; NewSyncProducer in this
	// file builds a synchronous producer on top of the returned client.
	conf.Producer.Return.Successes = true

	cli, err := sarama.NewClient(settings.Addrs, conf)
	if err != nil {
		panic(err)
	}
	return cli
}
// NewSyncProducer builds a sarama.SyncProducer that reuses the given client.
//
// It panics if sarama cannot construct the producer from the client.
func NewSyncProducer(client sarama.Client) sarama.SyncProducer {
	producer, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		panic(err)
	}
	return producer
}
// NewConsumers collects the application's event consumers into one slice.
//
// The known drawback remains: every Consumer has to be registered here by
// hand, as an extra parameter appended to the returned slice.
func NewConsumers(c1 *article.InteractiveReadEventConsumer) []events.Consumer {
	consumers := make([]events.Consumer, 0, 1)
	consumers = append(consumers, c1)
	return consumers
}