-
Notifications
You must be signed in to change notification settings - Fork 270
/
async_logger.go
99 lines (82 loc) · 2.45 KB
/
async_logger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package logevents
import (
"bytes"
"encoding/json"
"fmt"
"github.com/jitsucom/jitsu/server/logging"
"github.com/jitsucom/jitsu/server/queue"
"github.com/jitsucom/jitsu/server/safego"
"go.uber.org/atomic"
"io"
)
// AsyncLogger writes events as JSON lines to an underlying writer.
// Producers hand events over through Consume / ConsumeAny, which push them
// into queue; observers started by NewAsyncLogger pop them and write them out.
//
// Fields:
//   - writer: destination of the serialized events; closed by Close.
//   - queue: buffer between producers and the observers.
//   - showInGlobalLogger: when true, every event is additionally logged in
//     indented form via logging.Info.
//   - closed: set to true by Close; observers check it before each Pop.
type AsyncLogger struct {
writer io.WriteCloser
queue queue.Queue
showInGlobalLogger bool
closed *atomic.Bool
}
// NewAsyncLogger builds an AsyncLogger around writer and starts poolSize
// observers (see startObserver) through safego.RunWithRestart.
//
// showInGlobalLogger makes every written event also appear in the global
// logger. The in-memory queue is created with a fixed argument of 100_000
// (presumably its capacity - confirm against queue.NewInMemory).
//
// NOTE(review): a poolSize of zero or less starts no observers, so pushed
// events would never be written.
func NewAsyncLogger(writer io.WriteCloser, showInGlobalLogger bool, poolSize int) *AsyncLogger {
	events := queue.NewInMemory(100_000)
	al := &AsyncLogger{
		writer:             writer,
		queue:              events,
		showInGlobalLogger: showInGlobalLogger,
		closed:             atomic.NewBool(false),
	}

	for remaining := poolSize; remaining > 0; remaining-- {
		safego.RunWithRestart(al.startObserver)
	}

	return al
}
// startObserver pops events from the queue and writes them until the logger
// is marked closed. A failed Pop is logged and the loop retries.
//
// The closed flag is only checked between iterations, so an observer that is
// currently inside Pop notices Close only after Pop returns.
func (al *AsyncLogger) startObserver() {
	for !al.closed.Load() {
		item, popErr := al.queue.Pop()
		if popErr == nil {
			al.write(item)
			continue
		}

		logging.Errorf("Error reading event from queue in async logger: %v", popErr)
	}
}
// write serializes event to JSON and writes it, followed by a newline, to the
// underlying writer in a single Write call. Events that cannot be marshaled
// are logged and dropped; a failed Write is logged as well.
//
// When showInGlobalLogger is set, an indented rendering of the same JSON is
// also sent to logging.Info before the event is written.
func (al *AsyncLogger) write(event interface{}) {
	bts, err := json.Marshal(event)
	if err != nil {
		logging.Errorf("Error marshaling event to json in async logger: %v", err)
		return
	}

	if al.showInGlobalLogger {
		// Indent the bytes we already have instead of marshaling the event a
		// second time; json.MarshalIndent is Marshal followed by Indent, so
		// the output is the same.
		var pretty bytes.Buffer
		if err := json.Indent(&pretty, bts, " ", " "); err != nil {
			logging.Errorf("Error indenting event json in async logger: %v", err)
		} else {
			logging.Info(pretty.String())
		}
	}

	// bts is owned by this call, so the line terminator can be appended in place.
	if _, err := al.writer.Write(append(bts, '\n')); err != nil {
		logging.Errorf("Error writing event to log file: %v", err)
	}
}
// Consume pushes event into the queue for asynchronous writing. If the push
// fails, the event is reported through logging.SystemErrorf together with the
// error. tokenID is not used by this implementation.
func (al *AsyncLogger) Consume(event map[string]interface{}, tokenID string) {
	pushErr := al.queue.Push(event)
	if pushErr == nil {
		return
	}

	// Best effort: the serialized event is only used for the diagnostic message.
	serialized, _ := json.Marshal(event)
	logging.SystemErrorf("error pushing event [%s] into the queue in async logger.Consume: %v", string(serialized), pushErr)
}
// ConsumeAny pushes an arbitrary object into the queue for asynchronous
// writing. If the push fails, the object is reported through
// logging.SystemErrorf together with the error.
func (al *AsyncLogger) ConsumeAny(object interface{}) {
	pushErr := al.queue.Push(object)
	if pushErr == nil {
		return
	}

	// Best effort: the serialized object is only used for the diagnostic message.
	serialized, _ := json.Marshal(object)
	logging.SystemErrorf("error pushing event [%s] into the queue in async logger.ConsumeAny: %v", string(serialized), pushErr)
}
// Close marks the logger as closed, so observers stop at their next loop
// check, and then closes the underlying writer. Events still sitting in the
// queue are not drained by this method.
//
// It returns the writer's Close error wrapped with %w, so callers can inspect
// the cause with errors.Is / errors.As; the message text is unchanged.
func (al *AsyncLogger) Close() (resultErr error) {
	al.closed.Store(true)

	if err := al.writer.Close(); err != nil {
		return fmt.Errorf("Error closing writer: %w", err)
	}

	return nil
}