/
buffered_writer.go
50 lines (43 loc) · 958 Bytes
/
buffered_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package out
import (
"gopkg.in/raintank/schema.v1"
"time"
)
// funFlush is the signature of the callback that receives a batch of buffered
// metrics when a BufferedWriter flushes.
type funFlush func([]*schema.MetricData)
// BufferedWriter collects metrics received on a channel and hands them in
// batches to a flush callback from its Loop method.
type BufferedWriter struct {
// FlushInterval is the period of Loop's flush ticker, in milliseconds.
FlushInterval int
// MetricsPerFlush is the number of metrics Loop buffers before it stops
// reading input and waits for the next tick to flush.
MetricsPerFlush int
// FlushCB is called on every flush with the metrics buffered so far.
FlushCB funFlush
// WriteBufferSize is the capacity of the input channel created by GetChan.
WriteBufferSize int
// in is the input channel; it stays nil until GetChan creates it.
in chan *schema.MetricData
// buffer holds metrics waiting for the next flush; Loop allocates it.
buffer []*schema.MetricData
// bufferPos is the number of metrics currently held in buffer, and therefore
// the index at which the next metric is stored.
bufferPos int
}
// GetChan returns the channel on which metrics are submitted to the writer.
// The channel is created on the first call, buffered to WriteBufferSize
// entries, and the same channel is returned on every later call.
func (b *BufferedWriter) GetChan() chan *schema.MetricData {
	if b.in != nil {
		return b.in
	}
	b.in = make(chan *schema.MetricData, b.WriteBufferSize)
	return b.in
}
// Loop runs the writer's event loop: it buffers metrics received on the input
// channel and hands them to FlushCB on every tick of a ticker whose period is
// FlushInterval milliseconds.
//
// The input channel is read from b.in directly and is not created here, so
// GetChan must have been called for any metric to arrive; a receive on a nil
// channel blocks forever, leaving only the ticker case to fire.
//
// When the buffer is full, Loop stops reading input until the next tick and
// only then flushes, so a full buffer pushes back on senders instead of
// triggering an early flush.
//
// Loop blocks until the input channel is closed, at which point it flushes any
// buffered metrics and returns. time.NewTicker panics if FlushInterval is not
// positive. A MetricsPerFlush below 1 is treated as 1.
func (b *BufferedWriter) Loop() {
	ticker := time.NewTicker(time.Duration(b.FlushInterval) * time.Millisecond)
	defer ticker.Stop()

	// A zero-length buffer would make the store below index out of range, and
	// a negative length would panic in make, so always keep at least one slot.
	size := b.MetricsPerFlush
	if size < 1 {
		size = 1
	}
	b.bufferPos = 0
	b.buffer = make([]*schema.MetricData, size)

	for {
		select {
		case <-ticker.C:
			b.flush()
		case metric, ok := <-b.in:
			if !ok {
				// A closed channel yields nil forever; deliver what is
				// pending and stop instead of buffering nil metrics.
				if b.bufferPos > 0 {
					b.flush()
				}
				return
			}
			if b.bufferPos >= len(b.buffer) {
				// Buffer is full: hold off until the next tick so output
				// stays paced by the flush interval.
				<-ticker.C
				b.flush()
			}
			b.buffer[b.bufferPos] = metric
			b.bufferPos++
		}
	}
}
// flush passes the metrics buffered so far to FlushCB and then marks the
// buffer as empty. The callback is invoked even when nothing is buffered, in
// which case it receives an empty slice.
//
// The slice handed to FlushCB is a view of the writer's internal buffer, whose
// slots are overwritten by later metrics, so the callback should not hold on
// to it after returning.
func (b *BufferedWriter) flush() {
	pending := b.buffer[:b.bufferPos]
	b.FlushCB(pending)
	b.bufferPos = 0
}