-
Notifications
You must be signed in to change notification settings - Fork 111
/
kafka.go
97 lines (85 loc) · 2.17 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package activity
import (
"context"
"github.com/confluentinc/confluent-kafka-go/kafka"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// KafkaSink sinks events to a Kafka cluster.
type KafkaSink struct {
producer *kafka.Producer
topic string
logger *zap.Logger
logChan chan kafka.LogEvent
}
func NewKafkaSink(brokers, topic string, logger *zap.Logger) (*KafkaSink, error) {
logChan := make(chan kafka.LogEvent, 100)
producer, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": brokers,
"go.logs.channel.enable": true,
"go.logs.channel": logChan,
})
if err != nil {
return nil, err
}
go forwardKafkaLogEventToLogger(logChan, logger)
return &KafkaSink{
producer: producer,
topic: topic,
logger: logger,
logChan: logChan,
}, nil
}
// Sink doesn't wait till all events are delivered to Kafka
func (s *KafkaSink) Sink(_ context.Context, events []Event) error {
for _, event := range events {
message, err := event.Marshal()
if err != nil {
return err
}
err = s.producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &s.topic, Partition: kafka.PartitionAny},
Value: message,
}, nil)
if err != nil {
return err
}
}
return nil
}
func (s *KafkaSink) Close() error {
s.producer.Flush(100)
s.producer.Close()
close(s.logChan)
return nil
}
func forwardKafkaLogEventToLogger(logChan chan kafka.LogEvent, logger *zap.Logger) {
for logEvent := range logChan {
zapLevel := kafkaLogLevelToZapLevel(logEvent.Level)
if logger.Core().Enabled(zapLevel) {
fields := []zapcore.Field{
zap.String("kafka.producer.client.name", logEvent.Name),
zap.String("kafka.producer.tag", logEvent.Tag),
}
logger.Log(zapLevel, logEvent.Message, fields...)
}
}
}
// Log syslog level, lower is more critical
// https://en.wikipedia.org/wiki/Syslog#Severity_level
func kafkaLogLevelToZapLevel(level int) zapcore.Level {
switch level {
case 0, 1, 2:
return zap.FatalLevel
case 3:
return zap.ErrorLevel
case 4:
return zap.WarnLevel
case 5, 6:
return zap.InfoLevel
case 7:
return zap.DebugLevel
default:
return zap.DebugLevel // Default to debug for unrecognized levels
}
}