-
Notifications
You must be signed in to change notification settings - Fork 0
/
batching.go
112 lines (87 loc) · 2.51 KB
/
batching.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package batching
import (
"time"
"github.com/sirupsen/logrus"
syslog "github.com/wolfeidau/go-syslog"
)
// DispatchFunc is the callback a Batcher invokes with the accumulated
// entries each time a non-empty batch is flushed.
type DispatchFunc func([]*LogEntry)
// LogEntry is a single decoded log message held in a batch.
type LogEntry struct {
// Message is the message text; Handler fills it from the "content" log part.
Message string
// Parts is the complete set of parsed log parts the entry was built from.
Parts map[string]interface{}
// MilliTimestamp is the entry time in milliseconds since the Unix epoch.
MilliTimestamp int64
}
// Batcher accumulates log entries and hands them to a DispatchFunc in
// batches. A batch is flushed when the summed message length reaches
// capacity or when the flush timer fires.
type Batcher struct {
// dispatchFunc receives each non-empty batch when it is flushed.
dispatchFunc DispatchFunc
// records holds the entries collected since the last flush.
records []*LogEntry
// flushTimer drives time-based flushes; it is created in Handler and
// replaced on every flush.
flushTimer *time.Timer
// size is the sum of len(Message) over records (bytes, not entry count).
size int
// capacity is the threshold, in message bytes, that size is compared against.
capacity int
// duration is the interval the flush timer is armed with.
duration time.Duration
}
// NewBatcher returns a Batcher that hands batches to dispatchFunc.
//
// capacity is the batch threshold measured in summed message bytes, and
// duration is the interval used for the time-based flush. The flush timer
// itself is not started here; Handler creates it.
func NewBatcher(capacity int, duration time.Duration, dispatchFunc DispatchFunc) *Batcher {
	batcher := new(Batcher)
	batcher.capacity = capacity
	batcher.duration = duration
	batcher.dispatchFunc = dispatchFunc
	// Start from an empty, non-nil slice, as flush does after each batch.
	batcher.records = []*LogEntry{}
	return batcher
}
// Handler reads parsed syslog messages from channel, turns each one into a
// LogEntry and appends it to the current batch.
//
// A batch is handed to the dispatch function when:
//   - adding the next message would push the summed message length past
//     capacity (the pending batch is flushed first, then the message is added),
//   - the summed message length reaches capacity after adding a message, or
//   - the flush timer fires.
//
// Handler blocks until channel is closed; on close it flushes whatever is
// still pending and returns. It is the only place that mutates the batch, so
// it is meant to run in a single goroutine.
func (b *Batcher) Handler(channel syslog.LogPartsChannel) {
	logrus.Info("handle ready")

	b.flushTimer = time.NewTimer(b.duration)

	for {
		select {
		case logParts, open := <-channel:
			if !open {
				// A closed channel yields nil messages forever; dispatch what
				// is pending, release the timer and stop instead of spinning.
				b.flush()
				b.flushTimer.Stop()
				return
			}

			// A missing or non-string content is logged and the entry is
			// still recorded with an empty message.
			content, ok := logParts["content"].(string)
			if !ok {
				logrus.WithField("content", logParts["content"]).Warn("missing field in logParts")
			}

			// An unchecked assertion here would panic on a missing or
			// mistyped timestamp; fall back to the receive time instead.
			timestamp, ok := logParts["timestamp"].(time.Time)
			if !ok {
				logrus.WithField("timestamp", logParts["timestamp"]).Warn("missing field in logParts")
				timestamp = time.Now()
			}

			logrus.WithField("logParts", logParts).Debug("received message")

			entry := &LogEntry{
				Message:        content,
				Parts:          logParts,
				MilliTimestamp: makeMilliTimestamp(timestamp),
			}

			// Flush first so the new entry starts a fresh batch rather than
			// pushing the current one over capacity.
			if b.willOverflow(len(entry.Message)) {
				logrus.Debugf("Batch flushed to prevent size overflow - size: %d, capacity: %v", b.size, b.capacity)
				b.flush()
			}

			b.records = append(b.records, entry)
			b.size += len(entry.Message)

			if b.isFullSize() {
				logrus.Debugf("Batch flushed due to batch size - size: %d, capacity: %v", b.size, b.capacity)
				b.flush()
			}

		case <-b.flushTimer.C:
			// Time-based flush; flush re-arms the timer for the next interval.
			b.flush()
		}
	}
}
// Length reports how many entries are waiting in the current batch.
// It reads the batch without any synchronization.
func (b *Batcher) Length() int {
	pending := len(b.records)
	return pending
}
// willOverflow reports whether adding a message of the given length would
// push the batch's summed message length strictly above capacity.
func (b *Batcher) willOverflow(size int) bool {
	projected := b.size + size
	return b.capacity < projected
}
// isFullSize reports whether the batch's summed message length has reached
// or exceeded capacity.
func (b *Batcher) isFullSize() bool {
	if b.size < b.capacity {
		return false
	}
	return true
}
// flush re-arms the flush timer, resets the batch and, if any entries were
// pending, passes them to the dispatch function.
//
// The batch state is reset before dispatching, and a fresh slice is
// installed because ownership of the old one passes to the dispatch function.
func (b *Batcher) flush() {
	// Size-triggered flushes arrive while the current timer is still running.
	// Stop it before replacing it so it does not linger in the runtime until
	// it fires (unreferenced running timers are only collected from Go 1.23).
	if b.flushTimer != nil {
		b.flushTimer.Stop()
	}
	b.flushTimer = time.NewTimer(b.duration)

	records := b.records
	b.records = []*LogEntry{}
	b.size = 0

	// Nothing to send when the timer fires on an empty batch.
	if len(records) != 0 {
		b.dispatchFunc(records)
	}
}
// makeMilliTimestamp converts input to milliseconds since the Unix epoch.
//
// The value is built from whole seconds plus the sub-second part instead of
// going through UnixNano, whose result is undefined for instants that do not
// fit in an int64 of nanoseconds (before 1678, after 2262, and the zero
// time.Time). The result does not depend on input's location.
func makeMilliTimestamp(input time.Time) int64 {
	const millisPerSecond = int64(time.Second / time.Millisecond)

	seconds := input.Unix()
	subSecondMillis := int64(input.Nanosecond()) / int64(time.Millisecond)

	return seconds*millisPerSecond + subSecondMillis
}