/
sink.go
115 lines (97 loc) · 3.33 KB
/
sink.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// Package dpsink defines sinks that accept datapoints and events, along with helpers for chaining
// sinks together as middleware.
package dpsink
import (
"context"
"github.com/signalfx/golib/v3/datapoint"
"github.com/signalfx/golib/v3/event"
)
// DSink is an object that can accept datapoints and do something with them, like forward them
// to some endpoint.
type DSink interface {
// AddDatapoints hands a batch of datapoints to the sink and returns the sink's error, if any.
AddDatapoints(ctx context.Context, points []*datapoint.Datapoint) error
}
// ESink is an object that can accept events and do something with them, like forward them
// to some endpoint.
type ESink interface {
// AddEvents hands a batch of events to the sink and returns the sink's error, if any.
AddEvents(ctx context.Context, events []*event.Event) error
}
// Sink is an object that can accept both datapoints and events and do something with them, like
// forward them to some endpoint. It is the union of DSink and ESink.
type Sink interface {
DSink
ESink
}
// NextSink is a special case of a sink that forwards to another sink. Its methods mirror those
// of Sink but additionally receive the Sink that comes next in the chain.
type NextSink interface {
// AddDatapoints receives points together with next, the Sink to forward them to.
AddDatapoints(ctx context.Context, points []*datapoint.Datapoint, next Sink) error
// AddEvents receives events together with next, the Sink to forward them to.
AddEvents(ctx context.Context, events []*event.Event, next Sink) error
}
// MiddlewareConstructor builds a Sink from the Sink it should send to (sendTo). FromChain uses
// it to chain together a bunch of sinks that forward to each other.
type MiddlewareConstructor func(sendTo Sink) Sink
// FromChain builds a single Sink out of endSink and any number of middlewares, for things like
// counting points in between.
//
// The constructors are applied from last to first, so sinks[0] produces the outermost Sink and
// endSink is the innermost one. With no constructors, endSink is returned unchanged.
func FromChain(endSink Sink, sinks ...MiddlewareConstructor) Sink {
	chained := endSink
	for remaining := len(sinks); remaining > 0; remaining-- {
		wrap := sinks[remaining-1]
		chained = wrap(chained)
	}
	return chained
}
// nextWrapped adapts a NextSink to the Sink interface by remembering which Sink to pass along as
// the next argument.
type nextWrapped struct {
// forwardTo is the Sink handed to wrapping as its next argument on every call.
forwardTo Sink
// wrapping is the NextSink that receives each call.
wrapping NextSink
}
// AddDatapoints hands points to the wrapped NextSink, supplying forwardTo as the next Sink, and
// returns whatever the wrapped sink returns.
func (n *nextWrapped) AddDatapoints(ctx context.Context, points []*datapoint.Datapoint) error {
	target, next := n.wrapping, n.forwardTo
	return target.AddDatapoints(ctx, points, next)
}
// AddEvents hands events to the wrapped NextSink, supplying forwardTo as the next Sink, and
// returns whatever the wrapped sink returns.
func (n *nextWrapped) AddEvents(ctx context.Context, events []*event.Event) error {
	target, next := n.wrapping, n.forwardTo
	return target.AddEvents(ctx, events, next)
}
// IncludingDimensions returns a Sink that sits in front of sink and adds dims to each datapoint
// and event passing through it. When dims is empty there is nothing to add, so sink itself is
// returned without any wrapping.
func IncludingDimensions(dims map[string]string, sink Sink) Sink {
	if len(dims) == 0 {
		return sink
	}
	withDims := &WithDimensions{Dimensions: dims}
	wrap := NextWrap(withDims)
	return wrap(sink)
}
// NextWrap turns a NextSink into a MiddlewareConstructor. Each call of the returned constructor
// produces a new Sink that passes every call to wrapping, with sendTo supplied as the next Sink.
func NextWrap(wrapping NextSink) MiddlewareConstructor {
	return func(sendTo Sink) Sink {
		adapter := new(nextWrapped)
		adapter.wrapping = wrapping
		adapter.forwardTo = sendTo
		return adapter
	}
}
// WithDimensions adds dimensions on top of the datapoints and events that pass through it before
// they are handed to the next sink.
type WithDimensions struct {
// Dimensions holds the dimensions merged into every datapoint and event; when it is empty,
// datapoints and events are passed on untouched.
Dimensions map[string]string
}
// appendDimensions merges w.Dimensions into the Dimensions of every datapoint in dps using
// datapoint.AddMaps and returns the same slice. The datapoints are modified in place; when
// w.Dimensions is empty they are left untouched.
func (w *WithDimensions) appendDimensions(dps []*datapoint.Datapoint) []*datapoint.Datapoint {
	extra := w.Dimensions
	if len(extra) > 0 {
		for idx := range dps {
			point := dps[idx]
			point.Dimensions = datapoint.AddMaps(point.Dimensions, extra)
		}
	}
	return dps
}
// appendDimensionsEvents merges w.Dimensions into the Dimensions of every event in events using
// datapoint.AddMaps and returns the same slice. The events are modified in place; when
// w.Dimensions is empty they are left untouched.
func (w *WithDimensions) appendDimensionsEvents(events []*event.Event) []*event.Event {
	extra := w.Dimensions
	if len(extra) > 0 {
		for idx := range events {
			ev := events[idx]
			ev.Dimensions = datapoint.AddMaps(ev.Dimensions, extra)
		}
	}
	return events
}
// AddDatapoints adds the wrapped dimensions to each point and then passes the points on to next,
// returning next's error.
func (w *WithDimensions) AddDatapoints(ctx context.Context, points []*datapoint.Datapoint, next Sink) error {
	decorated := w.appendDimensions(points)
	return next.AddDatapoints(ctx, decorated)
}
// AddEvents adds the wrapped dimensions to each event and then passes the events on to next,
// returning next's error.
func (w *WithDimensions) AddEvents(ctx context.Context, events []*event.Event, next Sink) error {
	decorated := w.appendDimensionsEvents(events)
	return next.AddEvents(ctx, decorated)
}