-
Notifications
You must be signed in to change notification settings - Fork 48
/
span_buffer.gen.go
120 lines (99 loc) · 3.51 KB
/
span_buffer.gen.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// Code generated by genny. DO NOT EDIT.
// This file was automatically generated by genny.
// Any changes will be lost if this file is regenerated.
// see https://github.com/mauricelam/genny
package writer
import "github.com/signalfx/golib/v3/trace"
// nolint: dupl
// SpanRingBuffer is a fixed-capacity ring buffer of *trace.Span values that
// is filled one element at a time with Add and drained in contiguous chunks
// with NextBatch.
//
// It is NOT thread-safe. Batches handed out by NextBatch are not copies; they
// are slices of the buffer's own backing array. Once the writer wraps around,
// the contents of a previously returned batch will change underneath the
// caller, and reading a batch from another goroutine is subject to the usual
// rules of the Go memory model.
type SpanRingBuffer struct {
	// Backing storage for the elements.
	buffer []*trace.Span
	// Capacity of the buffer, kept in its own field as an optimization so it
	// does not have to be recomputed.
	bufferLen int
	// Index of the slot the next Add will write to.
	nextIdx int
	// Number of times the write position has wrapped from the end of the
	// buffer back to index 0.
	writtenCircuits int64
	// Exclusive upper bound of the most recently read range, i.e. one past
	// the index of the last element read, so that it can be used directly as
	// the high bound of a Go slice expression.
	readHigh int
	// Number of elements that have been added but not yet handed out for
	// processing. It could be derived from readHigh and nextIdx, but it is
	// read often, so Add and NextBatch keep it up to date instead. Tracking
	// it explicitly also makes it possible to tell when the buffer has been
	// completely overwritten since the last read.
	unprocessed int
}
// NewSpanRingBuffer returns an empty SpanRingBuffer with room for size
// elements. The backing slice is allocated at its full length up front.
//
// size is expected to be positive: a buffer of length zero has no slot for
// Add to write into.
func NewSpanRingBuffer(size int) *SpanRingBuffer {
	b := new(SpanRingBuffer)
	b.bufferLen = size
	b.buffer = make([]*trace.Span, size)
	return b
}
// Add stores inst in the next write slot of the buffer. When the buffer is
// already full of unprocessed elements, the oldest one is overwritten and the
// read cursor is moved past it. The result reports whether such an
// overwrite of an unprocessed element took place.
func (b *SpanRingBuffer) Add(inst *trace.Span) (isOverwrite bool) {
	isOverwrite = b.unprocessed >= b.bufferLen
	if isOverwrite {
		// The slot about to be written still holds an unread element, so the
		// read cursor has to move forward with it. readHigh is an exclusive
		// bound: stepping past the end of the buffer lands on 1, which
		// covers the element at index 0.
		if b.readHigh >= b.bufferLen {
			b.readHigh = 1
		} else {
			b.readHigh++
		}
	} else {
		b.unprocessed++
	}

	slot := b.nextIdx
	b.buffer[slot] = inst
	if slot+1 == b.bufferLen {
		// Reached the end of the backing array: start over at the front and
		// count the completed lap.
		b.nextIdx = 0
		b.writtenCircuits++
	} else {
		b.nextIdx = slot + 1
	}
	return isOverwrite
}
// Size reports the capacity of the buffer, i.e. the maximum number of
// elements it can hold at the same time.
func (b *SpanRingBuffer) Size() int { return b.bufferLen }
// UnprocessedCount reports how many elements have been added to the buffer
// and not yet been handed out by NextBatch.
func (b *SpanRingBuffer) UnprocessedCount() int { return b.unprocessed }
// NextBatch returns the next run of unprocessed elements, oldest first, and
// marks them as processed. At most maxSize elements are returned.
//
// The result is a slice of the buffer's own backing array, not a copy, so
// later calls to Add may overwrite its contents. Because a slice cannot wrap
// around, a batch never extends past the end of the backing array; when the
// unprocessed elements straddle the end, the remainder is returned by the
// following call.
//
// It returns nil when there are no unprocessed elements. A maxSize of zero
// or less consumes nothing and yields an empty slice.
func (b *SpanRingBuffer) NextBatch(maxSize int) []*trace.Span {
	if b.unprocessed == 0 {
		return nil
	}
	if maxSize < 0 {
		// Without this clamp a negative size would move readHigh backwards
		// and inflate unprocessed before the inverted slice range below
		// panics, leaving the cursors corrupted for a caller that recovers.
		maxSize = 0
	}

	prevReadHigh := b.readHigh
	if prevReadHigh == b.bufferLen {
		// The previous read ended exactly at the end of the backing array,
		// so this one starts over at index 0.
		prevReadHigh = 0
	}

	targetSize := b.unprocessed
	if targetSize > maxSize {
		targetSize = maxSize
	}

	b.readHigh = prevReadHigh + targetSize
	if b.readHigh > b.bufferLen {
		// The requested range would run off the end of the backing array.
		// Stop at the end so the batch is a single contiguous slice.
		b.readHigh = b.bufferLen
	}

	// Only count the elements actually handed out, which may be fewer than
	// targetSize after the clamp above.
	b.unprocessed -= b.readHigh - prevReadHigh
	return b.buffer[prevReadHigh:b.readHigh]
}