-
Notifications
You must be signed in to change notification settings - Fork 0
/
consume.go
100 lines (87 loc) · 2.16 KB
/
consume.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package kafka
import (
"encoding/json"
"io"
"time"
"github.com/Shopify/sarama"
"github.com/garyburd/redigo/redis"
"github.com/lovego/fs"
"github.com/lovego/logger"
)
// Consume consumes every partition of a single Kafka topic, hands each
// message to Handler, and writes one JSON log line per processed message.
type Consume struct {
	// Client is the sarama client used to create the consumer and to list
	// the partitions of Topic.
	Client sarama.Client
	// consumer is created from Client when Start runs.
	consumer sarama.Consumer

	// Topic is the name of the topic to consume.
	Topic string
	// Group is used, together with Topic, to build the default OffsetsKey.
	Group string

	// Handler is invoked once per message. It also receives the log record
	// of that message, which is written out after the handler returns.
	Handler func(*sarama.ConsumerMessage, map[string]interface{}) error

	// RedisPool is not referenced in this part of the file.
	// NOTE(review): presumably backs the partition offset storage - confirm.
	RedisPool *redis.Pool
	// OffsetsKey defaults to "kafka-offsets-<Topic>-<Group>" when Start
	// finds it empty.
	OffsetsKey string

	// LogPath is the path handed to fs.NewLogFile to open the message log.
	LogPath string
	// Logger reports setup failures, handler errors and log write errors.
	Logger *logger.Logger
	// logWriter receives the per-message JSON log lines; opened by Start.
	logWriter io.Writer
}
// Start creates the consumer and the message log, fills in the default
// OffsetsKey when none was given, and launches one goroutine per partition
// of Topic. Setup errors are reported through Logger.Panic. Start never
// returns: it blocks forever after the partition goroutines are launched.
func (c *Consume) Start() {
	consumer, err := sarama.NewConsumerFromClient(c.Client)
	if err != nil {
		c.Logger.Panic(err)
	} else {
		c.consumer = consumer
	}

	logFile, err := fs.NewLogFile(c.LogPath)
	if err != nil {
		c.Logger.Panic(err)
	} else {
		c.logWriter = logFile
	}

	if c.OffsetsKey == "" {
		c.OffsetsKey = "kafka-offsets-" + c.Topic + "-" + c.Group
	}

	partitions, err := c.Client.Partitions(c.Topic)
	if err != nil {
		c.Logger.Panic(err)
	}
	for i := range partitions {
		go c.startPartition(partitions[i])
	}

	// Block the calling goroutine forever; the work happens in the
	// partition goroutines.
	select {}
}
// startPartition consumes partition n of Topic, beginning at the offset
// returned by getPartitionOffset, and processes messages one at a time until
// the partition consumer's message channel is closed. A failure to open the
// partition consumer is reported through Logger.Panic.
func (c *Consume) startPartition(n int32) {
	startOffset := c.getPartitionOffset(n)
	partitionConsumer, err := c.consumer.ConsumePartition(c.Topic, n, startOffset)
	if err != nil {
		c.Logger.Panic(err)
	}
	defer partitionConsumer.Close()

	messages := partitionConsumer.Messages()
	for {
		msg, ok := <-messages
		if !ok {
			// Channel closed: nothing more to consume on this partition.
			return
		}
		c.process(partitionConsumer, n, msg)
	}
}
// process handles a single message from partition n: it builds the log
// record, runs the handler, stores the message offset for the partition and
// finally writes the log record.
//
// callHandler does not return an error, so the offset is stored and the log
// line is written whether or not the handler succeeded.
func (c *Consume) process(pc sarama.PartitionConsumer, n int32, message *sarama.ConsumerMessage) {
	record := make(map[string]interface{}, 5)
	record["partition"] = n
	record["at"] = time.Now().Format(time.RFC3339)
	record["now"] = message.Offset
	record["max"] = pc.HighWaterMarkOffset()
	record["bytes"] = len(message.Value)

	// The handler receives the record and may add its own entries before
	// the record is written out.
	c.callHandler(message, record)
	c.setPartitionOffset(n, message.Offset)
	c.writeLog(record)
}
// callHandler runs Handler for one message and logs any error it returns.
// Logger.Recover is deferred around the call.
// NOTE(review): presumably Logger.Recover recovers and logs a handler panic
// so that one bad message does not stop the partition loop - confirm.
func (c *Consume) callHandler(message *sarama.ConsumerMessage, logMap map[string]interface{}) {
	defer c.Logger.Recover()

	err := c.Handler(message, logMap)
	if err == nil {
		return
	}
	c.Logger.Printf("consume handler error: %v", err)
}
// writeLog serializes m as JSON and writes it to logWriter as a single
// newline-terminated line. Marshal and write failures are logged through
// Logger and otherwise ignored.
func (c *Consume) writeLog(m map[string]interface{}) {
	line, err := json.Marshal(m)
	if err != nil {
		c.Logger.Printf("marshal log err: %v", err)
		return
	}

	if _, err := c.logWriter.Write(append(line, '\n')); err != nil {
		c.Logger.Printf("write log err: %v", err)
	}
}