/
point.go
142 lines (116 loc) · 3.29 KB
/
point.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package writer
import (
"io"
"sync"
"time"
"github.com/influxdata/influxdb-client-go"
)
// defaultFlushInterval is the default period between scheduled flushes (one second).
const defaultFlushInterval = 1 * time.Second
// MetricsWriteFlusher is a type of metrics writer which is
// buffered and whose buffered metrics can be flushed on demand.
type MetricsWriteFlusher interface {
// Write writes the provided metrics and returns an int count and any error.
Write(m ...influxdb.Metric) (int, error)
// Available reports the remaining capacity; PointWriter.Write compares it
// against the number of metrics being written to predict whether the write
// will cause a flush.
Available() int
// Flush flushes the writer and returns any error encountered.
Flush() error
}
// PointWriter delegates calls to Write to an underlying flushing writer
// implementation. It also periodically calls Flush on the underlying writer and is safe
// to be called concurrently. As the flushing writer can also flush on calls to Write
// when the number of metrics being written exceeds the buffer capacity, PointWriter
// resets its timer in that scenario so as to avoid flushing again shortly afterwards.
type PointWriter struct {
// w is the underlying writer which Write and the scheduled flush delegate to
w MetricsWriteFlusher
// flushInterval is the period of the ticker which drives scheduled flushes
flushInterval time.Duration
// resetTick signals schedule to restart its ticker; Close closes it to make schedule exit
resetTick chan struct{}
// stopped is closed once schedule has exited
stopped chan struct{}
// err holds the error from the last delegated Write or scheduled Flush, or
// io.ErrClosedPipe once Close has been called; while it is non-nil Write
// returns it without delegating
err error
// mu guards err and serializes Write with the scheduled flush
mu sync.Mutex
}
// NewPointWriter configures and returns a *PointWriter which delegates to w.
//
// When flushInterval is greater than zero the new writer automatically begins
// scheduling periodic flushes of w, one every flushInterval.
//
// A non-positive flushInterval disables the periodic flush. Starting the
// scheduler with such a value would make time.NewTicker panic inside the
// background goroutine, where the caller cannot recover it. In that case w is
// flushed only by Close (and by whatever flushing w performs itself on Write).
func NewPointWriter(w MetricsWriteFlusher, flushInterval time.Duration) *PointWriter {
	writer := &PointWriter{
		w:             w,
		flushInterval: flushInterval,
		// buffer of one in order to not block writes
		resetTick: make(chan struct{}, 1),
		// stopped is closed once schedule has exited
		stopped: make(chan struct{}),
	}

	if flushInterval <= 0 {
		// there is no scheduler to wait for, so mark it as already stopped
		// in order to keep Close from blocking forever
		close(writer.stopped)
		return writer
	}

	go writer.schedule()

	return writer
}
// schedule runs the periodic flush loop until either a flush fails, an error
// has been recorded on the writer, or resetTick is closed by Close.
// It closes p.stopped on exit so that Close can wait for it, and it always
// stops the ticker it is currently using so the ticker's resources are released.
func (p *PointWriter) schedule() {
	defer close(p.stopped)

	ticker := time.NewTicker(p.flushInterval)
	// ticker is re-assigned whenever the interval is reset, so the deferred
	// call goes through a closure in order to stop whichever one is current
	defer func() { ticker.Stop() }()

	for {
		select {
		case <-ticker.C:
			if err := func() error {
				p.mu.Lock()
				defer p.mu.Unlock()

				// return if error is now not nil
				if p.err != nil {
					return p.err
				}

				// between the recv on the ticker and the lock obtain
				// the reset tick could've been triggered so we check
				// and skip the flush if it did
				select {
				case <-p.resetTick:
					return nil
				default:
				}

				p.err = p.w.Flush()

				return p.err
			}(); err != nil {
				return
			}
		case _, ok := <-p.resetTick:
			if !ok {
				// resetTick was closed by Close
				return
			}

			// a write is about to flush the underlying writer, so start
			// counting a fresh interval from now
			ticker.Stop()
			ticker = time.NewTicker(p.flushInterval)
		}
	}
}
// Write hands the metrics to the underlying metrics writer and returns its
// result. Once an error has been recorded on the PointWriter, Write returns
// that error with a count of zero and does not delegate.
// When the metrics do not fit in the space the underlying writer reports as
// available, the delegated call is going to cause a flush, so the scheduled
// periodic flush is signalled to reset its timer first.
func (p *PointWriter) Write(m ...influxdb.Metric) (int, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if recorded := p.err; recorded != nil {
		return 0, recorded
	}

	if willFlush := p.w.Available() < len(m); willFlush {
		// non-blocking send: if a reset is already pending there is
		// nothing more to signal
		select {
		case p.resetTick <- struct{}{}:
		default:
		}
	}

	written, err := p.w.Write(m...)
	p.err = err

	return written, err
}
// Close signals to stop flushing metrics and causes subsequent
// calls to Write to return a closed pipe error.
// Close returns once scheduled flushing has stopped.
// Close does a final flush on return and returns any
// error from the final flush if it occurs.
//
// Close may be called more than once. Only the first call closes the signal
// channel and performs the final flush; later calls wait for the scheduler to
// stop and return io.ErrClosedPipe instead of panicking on a second close of
// the channel.
func (p *PointWriter) Close() error {
	p.mu.Lock()

	// Probe resetTick without blocking. It is only ever closed here, under
	// the lock, so a receive reporting "closed" means Close already ran.
	// A pending reset signal consumed by this probe is not needed any more
	// since the scheduler is about to be stopped anyway.
	select {
	case _, open := <-p.resetTick:
		if !open {
			p.mu.Unlock()
			<-p.stopped

			return io.ErrClosedPipe
		}
	default:
	}

	// signal close
	close(p.resetTick)

	// return err io closed pipe for subsequent writes
	p.err = io.ErrClosedPipe

	// release lock so schedule may acknowledge and exit
	p.mu.Unlock()

	// wait until schedule exits
	<-p.stopped

	return p.w.Flush()
}