-
Notifications
You must be signed in to change notification settings - Fork 47
/
datapoint_writer.gen.go
323 lines (269 loc) · 10 KB
/
datapoint_writer.gen.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
// Code generated by genny. DO NOT EDIT.
// This file was automatically generated by genny.
// Any changes will be lost if this file is regenerated.
// see https://github.com/mauricelam/genny
package writer
import (
"context"
"sync/atomic"
"github.com/signalfx/golib/v3/datapoint"
"github.com/signalfx/golib/v3/sfxclient"
)
// nolint: dupl

// DatapointPreprocessor is used to filter out or otherwise change datapoints
// before being sent. If the return value is false, the datapoint won't be
// sent; the writer counts it in TotalFilteredOut instead.
type DatapointPreprocessor func(*datapoint.Datapoint) bool
// DatapointSender is what sends a slice of datapoints. It should block until
// the datapoints have been sent, or an error has occurred. The slice must not
// be retained after the call returns: its backing array is recycled by the
// writer for subsequent batches.
type DatapointSender func(context.Context, []*datapoint.Datapoint) error
// Defaults applied by Start to any sizing field left at its zero value.
const (
	// DefaultDatapointMaxBuffered is the default for DatapointWriter.MaxBuffered.
	DefaultDatapointMaxBuffered = 10000
	// DefaultDatapointMaxRequests is the default for DatapointWriter.MaxRequests.
	DefaultDatapointMaxRequests = 10
	// DefaultDatapointMaxBatchSize is the default for DatapointWriter.MaxBatchSize.
	DefaultDatapointMaxBatchSize = 1000
)
// DatapointWriter is an abstraction that accepts a bunch of datapoints, buffers
// them in a circular buffer and sends them out in concurrent batches. This
// prioritizes newer Datapoints at the expense of older ones, which is generally
// desirable from a monitoring standpoint.
//
// You must call the non-blocking method Start on a created writer for it to
// do anything.
type DatapointWriter struct {
	// InputChan carries slices of datapoints into the writer's run loop.
	// This must be provided by the user of this writer.
	InputChan chan []*datapoint.Datapoint
	// PreprocessFunc can be used for filtering or modifying datapoints before
	// being sent. If PreprocessFunc returns false, the datapoint will not be
	// sent. PreprocessFunc can be left nil, in which case all datapoints will
	// be sent.
	PreprocessFunc DatapointPreprocessor
	// SendFunc must be provided as the writer is useless without it. SendFunc
	// should synchronously process/send the Datapoints passed to it and not
	// return until they have been dealt with. The slice passed to SendFunc
	// should not be used after the function returns, as its backing array
	// might get reused.
	SendFunc DatapointSender
	// OverwriteFunc can be set to a function that will be called
	// whenever an Add call to the underlying ring buffer results in the
	// overwriting of an unprocessed datapoint.
	OverwriteFunc func()
	// MaxBuffered is the maximum number of Datapoints that this writer will
	// hold before overwriting. You must set this before calling Start.
	MaxBuffered int
	// MaxRequests is the maximum number of concurrent calls to SendFunc that
	// can be active at a given time. You must set this before calling Start.
	MaxRequests int
	// MaxBatchSize is the biggest batch of Datapoints the writer will emit to
	// SendFunc at once. You must set this before calling Start.
	MaxBatchSize int

	// shutdownFlag is created by Start and closed when run returns; it is
	// what WaitForShutdown blocks on.
	shutdownFlag chan struct{}
	// buff is the circular buffer of datapoints awaiting send.
	buff *DatapointRingBuffer
	// requestDoneCh receives the size of each completed send request.
	requestDoneCh chan int64
	// chunkSliceCache holds up to MaxRequests slices that can be used to copy
	// in datapoint pointers, to avoid reusing the backing array of the ring
	// buffer and risking overwriting in the middle of sending.
	chunkSliceCache chan []*datapoint.Datapoint
	// NOTE(review): presumably padding so the int64 fields below stay 8-byte
	// aligned for sync/atomic on 32-bit platforms — confirm before removing.
	_ int32
	// Number of send goroutines currently outstanding (compared against
	// MaxRequests before dispatching a new batch).
	requestsActive int64
	// Datapoints waiting to be sent but are blocked due to MaxRequests limit
	totalWaiting int64

	// Purely internal metrics. If accessing any of these externally, use
	// atomic.LoadInt64!
	TotalReceived     int64
	TotalFilteredOut  int64
	TotalInFlight     int64
	TotalSent         int64
	TotalFailedToSend int64
	TotalOverwritten  int64
}
// WaitForShutdown blocks until all of the elements inserted to the writer
// have been processed and the run loop has exited. It panics if the writer
// was never started.
func (w *DatapointWriter) WaitForShutdown() {
	done := w.shutdownFlag
	if done == nil {
		panic("should not wait for writer shutdown when not running")
	}
	<-done
}
// getChunkSlice returns a scratch slice of length size for staging a batch.
// The backing arrays are recycled through chunkSliceCache, so only MaxRequests
// slices are ever allocated for the lifetime of the writer. Benchmarks showed
// this improves performance by ~5% and reduces allocations within the writer
// to almost zero.
func (w *DatapointWriter) getChunkSlice(size int) []*datapoint.Datapoint {
	scratch := <-w.chunkSliceCache
	// Clear any pointers at index size or beyond so truncated elements get
	// GCed promptly: re-slicing with scratch[:size] alone would leave them
	// reachable through the backing array. When batches are roughly the same
	// size this loop does little work.
	for i := len(scratch) - 1; i >= size; i-- {
		scratch[i] = nil
	}
	return scratch[:size]
}
// tryToSendChunk tries to send the next batch in the buffer, if a request
// slot is available. It is only ever called from the run goroutine, but
// requestsActive and totalWaiting are updated with atomics because
// InternalMetrics reads them (via atomic.LoadInt64) from other goroutines;
// the original plain writes were a data race under the Go memory model.
func (w *DatapointWriter) tryToSendChunk(ctx context.Context) {
	totalUnprocessed := w.buff.UnprocessedCount()
	if atomic.LoadInt64(&w.requestsActive) >= int64(w.MaxRequests) {
		// Record the backlog; the request done handler will notice that
		// there are datapoints waiting to be sent and will call this
		// method again.
		atomic.StoreInt64(&w.totalWaiting, int64(totalUnprocessed))
		return
	}

	chunk := w.buff.NextBatch(w.MaxBatchSize)
	count := int64(len(chunk))
	if count == 0 {
		return
	}

	atomic.AddInt64(&w.TotalInFlight, count)
	atomic.AddInt64(&w.requestsActive, 1)

	// Make a copy of the slice in the buffer so that it is safe against
	// being overwritten by wrap around (NextBatch returns a slice against
	// the original backing array of the buffer).
	chunkCopy := w.getChunkSlice(len(chunk))
	copy(chunkCopy, chunk)
	// Nil out existing elements in the buffer so they get GCd.
	for i := range chunk {
		chunk[i] = nil
	}

	go func() {
		err := w.SendFunc(ctx, chunkCopy)
		if err != nil {
			// Use atomic so that internal metrics method doesn't have to
			// run in the same goroutine.
			atomic.AddInt64(&w.TotalFailedToSend, count)
		} else {
			atomic.AddInt64(&w.TotalSent, count)
		}
		// Return the scratch slice for reuse, then report completion to
		// the run goroutine.
		w.chunkSliceCache <- chunkCopy
		w.requestDoneCh <- count
	}()

	atomic.StoreInt64(&w.totalWaiting, int64(w.buff.UnprocessedCount()))
}
// processInput runs each incoming datapoint through the preprocessor, adds
// the survivors to the ring buffer, and opportunistically flushes batches and
// reaps finished requests along the way.
func (w *DatapointWriter) processInput(ctx context.Context, insts []*datapoint.Datapoint) {
	atomic.AddInt64(&w.TotalReceived, int64(len(insts)))

	for _, dp := range insts {
		if w.PreprocessFunc != nil && !w.PreprocessFunc(dp) {
			atomic.AddInt64(&w.TotalFilteredOut, 1)
			continue
		}

		if overwrote := w.buff.Add(dp); overwrote {
			atomic.AddInt64(&w.TotalOverwritten, 1)
			if w.OverwriteFunc != nil {
				w.OverwriteFunc()
			}
		}

		// Reap any finished request without blocking, and flush once the
		// buffer holds a full batch, so that big input slices
		// (len(insts) > MaxBuffered) don't overflow the ring buffer.
		select {
		case count := <-w.requestDoneCh:
			w.handleRequestDone(ctx, count)
		default:
			// No request has finished; carry on.
		}
		if w.buff.UnprocessedCount() >= w.MaxBatchSize {
			w.tryToSendChunk(ctx)
		}
	}
}
// Start launches the writer processing loop in a new goroutine and returns
// immediately. Zero-valued sizing fields are replaced with their defaults.
func (w *DatapointWriter) Start(ctx context.Context) {
	// Initialize the writer fields in the same goroutine as the one calling
	// Start to avoid data races when calling WaitForShutdown.
	w.shutdownFlag = make(chan struct{})

	applyDefault := func(field *int, def int) {
		if *field == 0 {
			*field = def
		}
	}
	applyDefault(&w.MaxBuffered, DefaultDatapointMaxBuffered)
	applyDefault(&w.MaxRequests, DefaultDatapointMaxRequests)
	applyDefault(&w.MaxBatchSize, DefaultDatapointMaxBatchSize)

	w.buff = NewDatapointRingBuffer(w.MaxBuffered)

	go func() {
		w.run(ctx)
		close(w.shutdownFlag)
	}()
}
// handleRequestDone finalizes a completed send of count datapoints: it
// releases the request slot, decrements the in-flight gauge, and — if
// datapoints were blocked on the MaxRequests limit — tries to dispatch the
// next chunk. requestsActive and totalWaiting are accessed with atomics
// because InternalMetrics reads them (atomic.LoadInt64) from other
// goroutines; the original plain decrement/read raced with those loads.
func (w *DatapointWriter) handleRequestDone(ctx context.Context, count int64) {
	atomic.AddInt64(&w.requestsActive, -1)
	atomic.AddInt64(&w.TotalInFlight, -count)
	if atomic.LoadInt64(&w.totalWaiting) > 0 {
		w.tryToSendChunk(ctx)
	}
}
// run waits for Datapoints to come in on the provided channel and gives them to
// sendFunc in batches. This function blocks until the provided context is
// canceled, at which point it drains the input channel and waits for all
// outstanding send requests before returning.
//nolint: dupl
func (w *DatapointWriter) run(ctx context.Context) {
	// Make the slice copy cache and prime it with preallocated slices; these
	// MaxRequests slices are the only batch-staging storage ever allocated.
	w.chunkSliceCache = make(chan []*datapoint.Datapoint, w.MaxRequests)
	for i := 0; i < w.MaxRequests; i++ {
		w.chunkSliceCache <- make([]*datapoint.Datapoint, 0, w.MaxBatchSize)
	}
	w.requestDoneCh = make(chan int64, w.MaxRequests)
	// waitForRequests blocks until every outstanding send goroutine has
	// reported back on requestDoneCh.
	waitForRequests := func() {
		for w.requestsActive > 0 {
			count := <-w.requestDoneCh
			w.handleRequestDone(ctx, count)
		}
	}
	// drainInput consumes whatever is immediately available on InputChan,
	// then (via the defers, in order) flushes one last chunk and waits for
	// all in-flight requests. Used to shut down after context cancellation.
	drainInput := func() {
		defer waitForRequests()
		defer w.tryToSendChunk(ctx)
		for {
			select {
			case insts := <-w.InputChan:
				w.processInput(ctx, insts)
			default:
				return
			}
		}
	}
	// The main loop. The basic technique is to pull as many Datapoints from
	// the input channel as possible until the channel is exhausted, at which
	// point the Datapoints are attempted to be sent. All of the request
	// finalization is also handled here so that everything is done within a
	// single goroutine and does not require explicit locking.
	for {
		select {
		case <-ctx.Done():
			drainInput()
			return
		case insts := <-w.InputChan:
			w.processInput(ctx, insts)
		case count := <-w.requestDoneCh:
			w.handleRequestDone(ctx, count)
		default:
			// The input chan is exhausted, try to send whatever was there.
			w.tryToSendChunk(ctx)
			// Duplicate the cases from above (without the default) to avoid
			// hot looping and using unnecessary CPU.
			select {
			case <-ctx.Done():
				drainInput()
				return
			case count := <-w.requestDoneCh:
				w.handleRequestDone(ctx, count)
			case insts := <-w.InputChan:
				w.processInput(ctx, insts)
			}
		}
	}
}
// InternalMetrics reports the writer's own counters and gauges as datapoints,
// each metric name prepended with the given prefix.
func (w *DatapointWriter) InternalMetrics(prefix string) []*datapoint.Datapoint {
	dps := make([]*datapoint.Datapoint, 0, 10)
	// Cumulative counters (read via pointer by sfxclient).
	dps = append(dps,
		sfxclient.CumulativeP(prefix+"datapoints_sent", nil, &w.TotalSent),
		sfxclient.CumulativeP(prefix+"datapoints_failed", nil, &w.TotalFailedToSend),
		sfxclient.CumulativeP(prefix+"datapoints_filtered", nil, &w.TotalFilteredOut),
		sfxclient.CumulativeP(prefix+"datapoints_received", nil, &w.TotalReceived),
		sfxclient.CumulativeP(prefix+"datapoints_overwritten", nil, &w.TotalOverwritten),
	)
	// Point-in-time gauges; int64 fields written outside this goroutine are
	// read atomically.
	dps = append(dps,
		sfxclient.Gauge(prefix+"datapoints_buffered", nil, int64(w.buff.UnprocessedCount())),
		sfxclient.Gauge(prefix+"datapoints_max_buffered", nil, int64(w.buff.Size())),
		sfxclient.Gauge(prefix+"datapoints_in_flight", nil, atomic.LoadInt64(&w.TotalInFlight)),
		sfxclient.Gauge(prefix+"datapoints_waiting", nil, atomic.LoadInt64(&w.totalWaiting)),
		sfxclient.Gauge(prefix+"datapoint_requests_active", nil, atomic.LoadInt64(&w.requestsActive)),
	)
	return dps
}