/
producer.go
122 lines (112 loc) · 2.37 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package kafka
import (
"library/service"
log "github.com/sirupsen/logrus"
"github.com/Shopify/sarama"
"sync"
)
const (
	// isClose is a bit flag recorded in Producer.status once Close has
	// been called; SendAll checks it to refuse messages after shutdown.
	isClose = 1 << iota
)
// Producer forwards table change events to kafka through an async
// sarama producer. It implements service.Service.
type Producer struct {
	service.Service                          // embedded service interface
	AccessLogProducer sarama.AsyncProducer   // underlying async kafka producer; nil when disabled
	enable            bool                   // false when kafka is turned off in config; all methods become no-ops
	topic             string                 // kafka topic messages are published to
	filter            []string               // table-name filters; only matching tables are sent (see SendAll)
	status            int                    // bit flags (isClose); guarded by lock
	lock              *sync.Mutex            // guards status; nil when enable is false
}
// Compile-time assertion that *Producer satisfies service.Service.
var _ service.Service = &Producer{}
// NewProducer builds the kafka producer service from the current
// configuration. When kafka is disabled in config — or the config
// cannot be read — a disabled no-op producer is returned so callers
// can still invoke Start/Close safely.
func NewProducer() service.Service {
	config, err := getConfig()
	if err != nil {
		// BUG FIX: the error was previously discarded with `_`; a nil
		// config would panic below on config.Enable. Fall back to a
		// disabled producer instead.
		log.Errorf("kafka config error: %v", err)
		return &Producer{enable: false}
	}
	if !config.Enable {
		return &Producer{enable: false}
	}
	log.Debugf("kafka config: %+v", *config)
	return &Producer{
		AccessLogProducer: newAccessLogProducer(config.Brokers),
		enable:            true,
		topic:             config.Topic,
		filter:            config.Filter,
		status:            0,
		lock:              new(sync.Mutex),
	}
}
// SendAll publishes one row of data for the given table to kafka.
// It returns true only when the message was handed to the async
// producer; false when the service is disabled, already closed, or the
// table does not match the configured filters.
func (r *Producer) SendAll(table string, data []byte) bool {
	if !r.enable {
		return false
	}
	// BUG FIX: check the filter before taking the lock and before
	// building the message, so filtered tables do no unnecessary work.
	if !service.MatchFilters(r.filter, table) {
		log.Debugf("table(%v) does not match filter", table)
		return false
	}
	r.lock.Lock()
	if r.status&isClose > 0 {
		r.lock.Unlock()
		return false
	}
	r.lock.Unlock()
	log.Debugf("##########push to kafka: %v", data)
	// The table name is used as the message key so all entries for the
	// same table end up on the same partition. (The previous comment
	// incorrectly said the client's IP address was the key.)
	r.AccessLogProducer.Input() <- &sarama.ProducerMessage{
		Topic: r.topic,
		Key:   sarama.StringEncoder(table),
		Value: &accessLogEntry{Data: data},
	}
	return true
}
// Start launches the background goroutine that drains the async
// producer's error channel. It is a no-op when the service is disabled.
func (r *Producer) Start() {
	if !r.enable {
		log.Infof("kafka service is disable")
		return
	}
	go func() {
		// BUG FIX: the original `select` received a single error and
		// then the goroutine exited, leaving Errors() undrained.
		// sarama's AsyncProducer requires the Errors channel to be
		// consumed continuously, otherwise the producer eventually
		// blocks. Range until the channel is closed by Close().
		for e := range r.AccessLogProducer.Errors() {
			if e.Msg != nil {
				log.Errorf("kafka error: %+v", *e.Msg)
			}
			log.Errorf("kafka error: %+v", e.Err)
		}
	}()
}
// Close marks the producer as shut down and closes the underlying
// sarama producer. Safe to call more than once; subsequent calls are
// no-ops. No-op when the service is disabled.
func (r *Producer) Close() {
	if !r.enable {
		return
	}
	r.lock.Lock()
	if r.status&isClose > 0 {
		// BUG FIX: already closed — the original fell through and
		// closed the sarama producer a second time.
		r.lock.Unlock()
		return
	}
	r.status |= isClose
	r.lock.Unlock()
	if err := r.AccessLogProducer.Close(); err != nil {
		// Use leveled logging, consistent with the rest of the file
		// (the original used log.Println).
		log.Errorf("Failed to shut down access log producer cleanly: %v", err)
	}
}
// Reload re-reads the configuration and rebuilds the async producer.
// On config error the current producer is left untouched. Honors a
// config that disables kafka, and re-enables sending after a prior
// Close by clearing the isClose flag.
func (r *Producer) Reload() {
	log.Debugf("kafka service reload")
	config, err := getConfig()
	if err != nil {
		// BUG FIX: the error was discarded; a nil config would panic
		// below. Keep the existing producer running instead.
		log.Errorf("kafka config error: %v", err)
		return
	}
	if r.AccessLogProducer != nil {
		if err := r.AccessLogProducer.Close(); err != nil {
			log.Errorf("close old producer: %v", err)
		}
		r.AccessLogProducer = nil
	}
	if !config.Enable {
		// BUG FIX: the original unconditionally set enable=true, even
		// when the new config turned kafka off (inconsistent with
		// NewProducer).
		r.enable = false
		return
	}
	// BUG FIX: a Producer constructed while disabled has a nil lock;
	// enabling it here without allocating one made SendAll panic.
	if r.lock == nil {
		r.lock = new(sync.Mutex)
	}
	r.lock.Lock()
	// Clear isClose so SendAll works again after a Close+Reload cycle.
	r.status = 0
	r.lock.Unlock()
	r.AccessLogProducer = newAccessLogProducer(config.Brokers)
	r.enable = true
	r.topic = config.Topic
	r.filter = config.Filter
}
// Name reports the identifier under which this service is registered.
func (r *Producer) Name() string {
	const serviceName = "kafka"
	return serviceName
}