/
zap_writer.go
80 lines (64 loc) · 1.82 KB
/
zap_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// Inspired by github.com/ORBAT/krater/unsafe_writer.go
package logger
import (
"errors"
"sync"
"sync/atomic"
"syscall"
)
// ZapKafkaWriter is a zap WriteSyncer (io.Writer) that writes messages to Kafka.
//
// Instances are built by newZapKafkaWriter. Write forwards each byte slice to
// the producer, and Close flips the closed flag and waits for writes that are
// still in flight.
type ZapKafkaWriter struct {
// kp is the producer created by newZapKafkaWriter; Write sends through it.
kp *KafkaProducer
// ce is the CloudEvents value handed to newZapKafkaWriter. It is stored
// here but not read by any method visible in this part of the file.
ce *CloudEvents
closed int32 // Nonzero if closing, must access atomically
// pendingWg is incremented by Write around each send and waited on by Close.
pendingWg sync.WaitGroup // WaitGroup for pending messages
// closeMut is held by Close for its whole duration, serializing Close calls.
closeMut sync.Mutex
}
// newZapKafkaWriter builds a ZapKafkaWriter on top of a freshly created Kafka
// producer.
//
// The producer is obtained from newKafkaProducer using kpCfg, cloudEvents and
// ceCfg. If that call fails, its error is returned unchanged together with a
// nil writer. On success the writer keeps both the producer and cloudEvents.
func newZapKafkaWriter(
	kpCfg ProducerConfiguration, cloudEvents *CloudEvents,
	ceCfg CloudEventsConfiguration) (*ZapKafkaWriter, error) {
	producer, err := newKafkaProducer(kpCfg, cloudEvents, ceCfg)
	if err != nil {
		return nil, err
	}

	return &ZapKafkaWriter{kp: producer, ce: cloudEvents}, nil
}
// Sync is a no-op that always reports success. It exists so that
// ZapKafkaWriter satisfies the zapcore.WriteSyncer interface directly;
// wrapping the writer with zapcore.AddSync works as well.
func (zw *ZapKafkaWriter) Sync() error { return nil }
// Write sends msg to Kafka through the underlying producer and implements
// io.Writer.
//
// Return values:
//   - (len(msg), nil) when sendMessage accepted the message;
//   - (0, syscall.EINVAL) when the writer has been closed;
//   - (0, error) when no producer is configured;
//   - (0, err) when sendMessage failed, so callers never see a byte count
//     for a message that was not sent.
//
// Write might block if the Input() channel of the AsyncProducer is full.
func (zw *ZapKafkaWriter) Write(msg []byte) (int, error) {
	// Fast path: once the closed flag is set, reject without taking the lock.
	if zw.Closed() {
		return 0, syscall.EINVAL
	}
	if zw.kp == nil || zw.kp.producer == nil {
		return 0, errors.New("No producer defined")
	}

	// Re-check the flag and register the pending write while holding closeMut.
	// Close sets the flag and waits on pendingWg under the same mutex, so a
	// write is either counted before Close starts waiting or rejected here;
	// pendingWg.Add can no longer run concurrently with pendingWg.Wait.
	zw.closeMut.Lock()
	if zw.Closed() {
		zw.closeMut.Unlock()
		return 0, syscall.EINVAL
	}
	zw.pendingWg.Add(1)
	zw.closeMut.Unlock()
	// The mutex is released before sending so Close is never blocked on the
	// lock by a slow send; it only waits for Done below.
	defer zw.pendingWg.Done()

	if err := zw.kp.sendMessage(msg); err != nil {
		return 0, err
	}
	return len(msg), nil
}
// Closed reports whether the writer has been closed. The flag is read with an
// atomic load, so it may be called while other goroutines call Close.
func (zw *ZapKafkaWriter) Closed() bool {
	state := atomic.LoadInt32(&zw.closed)
	return state != 0
}
// Close marks the writer as closed and then blocks until every write counted
// in pendingWg has finished. It must be called when the writer is no longer
// needed.
//
// The first call returns nil. Any later call returns syscall.EINVAL.
// Concurrent calls are serialized by closeMut, which is held for the whole
// duration of the call.
func (zw *ZapKafkaWriter) Close() (err error) {
	zw.closeMut.Lock()
	defer zw.closeMut.Unlock()

	// Flip the flag from open (0) to closed (1); if it was already nonzero,
	// an earlier Close got here first.
	if !atomic.CompareAndSwapInt32(&zw.closed, 0, 1) {
		return syscall.EINVAL
	}

	zw.pendingWg.Wait()
	return nil
}