-
Notifications
You must be signed in to change notification settings - Fork 18
/
basicsink.go
124 lines (111 loc) · 2.97 KB
/
basicsink.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package dptest
import (
"sync"
"context"
"github.com/signalfx/golib/datapoint"
"github.com/signalfx/golib/event"
"github.com/signalfx/golib/trace"
)
// BasicSink is a testing sink: batches handed to AddDatapoints, AddEvents, and AddSpans are sent
// on the matching channel so a test can receive and inspect them.
type BasicSink struct {
// RetErr, when non-nil, is returned by AddDatapoints, AddEvents, and AddSpans instead of
// forwarding their input to a channel.
RetErr error
// PointsChan carries the slices passed to AddDatapoints; Next receives from it.
PointsChan chan []*datapoint.Datapoint
// EventsChan carries the slices passed to AddEvents; NextEvent receives from it.
EventsChan chan []*event.Event
// TracesChan carries the slices passed to AddSpans; NextSpan receives from it.
TracesChan chan []*trace.Span
// mu is locked by the Add* methods, RetError, and Resize. Next, NextEvent, and NextSpan read
// the channel fields without taking it.
mu sync.Mutex
}
// Next receives the next batch from PointsChan, blocking until one is available, and returns its
// only datapoint. It panics when the batch does not contain exactly one point.
func (f *BasicSink) Next() *datapoint.Datapoint {
	batch := <-f.PointsChan
	if len(batch) == 1 {
		return batch[0]
	}
	panic("Expect a single point")
}
// NextEvent receives the next batch from EventsChan, blocking until one is available, and returns
// its only event. It panics when the batch does not contain exactly one event.
func (f *BasicSink) NextEvent() *event.Event {
	batch := <-f.EventsChan
	if len(batch) == 1 {
		return batch[0]
	}
	panic("Expect a single event")
}
// NextSpan receives the next batch from TracesChan, blocking until one is available, and returns
// its only span. It panics when the batch does not contain exactly one span.
func (f *BasicSink) NextSpan() *trace.Span {
	batch := <-f.TracesChan
	if len(batch) == 1 {
		return batch[0]
	}
	panic("Expect a single span")
}
// AddDatapoints sends points on PointsChan. If RetErr is set it is returned immediately and
// nothing is sent. Otherwise the call blocks until the send succeeds (returning nil) or ctx is
// done (returning ctx.Err()). The sink's mutex is held for the whole call, including while the
// send is blocked.
func (f *BasicSink) AddDatapoints(ctx context.Context, points []*datapoint.Datapoint) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	if err := f.RetErr; err != nil {
		return err
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case f.PointsChan <- points:
		return nil
	}
}
// AddEvents sends points (a batch of events) on EventsChan. If RetErr is set it is returned
// immediately and nothing is sent. Otherwise the call blocks until the send succeeds (returning
// nil) or ctx is done (returning ctx.Err()). The sink's mutex is held for the whole call,
// including while the send is blocked.
func (f *BasicSink) AddEvents(ctx context.Context, points []*event.Event) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	if err := f.RetErr; err != nil {
		return err
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case f.EventsChan <- points:
		return nil
	}
}
// AddSpans sends points (a batch of spans) on TracesChan. If RetErr is set it is returned
// immediately and nothing is sent. Otherwise the call blocks until the send succeeds (returning
// nil) or ctx is done (returning ctx.Err()). The sink's mutex is held for the whole call,
// including while the send is blocked.
func (f *BasicSink) AddSpans(ctx context.Context, points []*trace.Span) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	if err := f.RetErr; err != nil {
		return err
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case f.TracesChan <- points:
		return nil
	}
}
// RetError stores err in RetErr under the sink's mutex. While RetErr is non-nil, AddDatapoints,
// AddEvents, and AddSpans return it instead of forwarding their input; passing nil restores
// forwarding.
func (f *BasicSink) RetError(err error) {
	f.mu.Lock()
	f.RetErr = err
	f.mu.Unlock()
}
// Resize replaces PointsChan, EventsChan, and TracesChan with new channels that each buffer up to
// size batches (size 0 yields unbuffered channels).
//
// It panics with "can only resize when empty" if any of the three channels still holds buffered
// batches: all three are replaced, so leftover points, events, or spans would otherwise be
// silently discarded. A negative size makes the underlying make call panic.
func (f *BasicSink) Resize(size int) {
	f.mu.Lock()
	defer f.mu.Unlock()

	// Every channel is swapped out below, so every one of them must be drained first.
	// len on a nil channel is 0, so a zero-value BasicSink passes this check.
	if len(f.PointsChan) != 0 || len(f.EventsChan) != 0 || len(f.TracesChan) != 0 {
		panic("can only resize when empty")
	}

	f.PointsChan = make(chan []*datapoint.Datapoint, size)
	f.EventsChan = make(chan []*event.Event, size)
	f.TracesChan = make(chan []*trace.Span, size)
}
// NewBasicSink returns a BasicSink whose PointsChan, EventsChan, and TracesChan are all
// unbuffered. AddDatapoints, AddEvents, and AddSpans therefore block until a receiver takes the
// batch from the matching channel or their context is done.
func NewBasicSink() *BasicSink {
	sink := new(BasicSink)
	sink.PointsChan = make(chan []*datapoint.Datapoint)
	sink.EventsChan = make(chan []*event.Event)
	sink.TracesChan = make(chan []*trace.Span)
	return sink
}