forked from go-kit/kit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
emitter.go
159 lines (142 loc) · 4.27 KB
/
emitter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package statsd
import (
"bytes"
"fmt"
"net"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/util/conn"
)
// Emitter collects metric observations and periodically writes them, one
// "key:value" line per observation, to a connection obtained from a
// conn.Manager.
type Emitter struct {
	// prefix is prepended to the key of every metric created by this Emitter.
	prefix string
	// keyVals carries observations from the metric instances to the run loop.
	keyVals chan keyVal
	// mgr supplies the connection that Flush writes to.
	mgr *conn.Manager
	// logger receives flush failures.
	logger log.Logger
	// quitc is how Stop asks the run loop to exit; the loop closes the
	// channel it receives once its final flush is done.
	quitc chan chan struct{}
}
// keyVal pairs a metric key with a single observation value. The run loop
// renders it as "key:val" followed by a newline.
type keyVal struct {
	key, val string
}
// stringToKeyVal returns a channel of values for one metric key. A goroutine
// receives every value sent on the returned channel, tags it with key, and
// forwards the resulting keyVal on keyVals. That goroutine runs until the
// returned channel is closed.
func stringToKeyVal(key string, keyVals chan keyVal) chan string {
	in := make(chan string)
	forward := func() {
		for {
			v, open := <-in
			if !open {
				return
			}
			keyVals <- keyVal{key: key, val: v}
		}
	}
	go forward()
	return in
}
// NewEmitter returns an Emitter whose metric names all start with
// metricsPrefix. It is NewEmitterDial with net.Dial as the dialer: the
// Emitter connects to the given network and address and posts buffered
// metrics in the statsd protocol every flushInterval.
func NewEmitter(network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
	e := NewEmitterDial(net.Dial, network, address, metricsPrefix, flushInterval, logger)
	return e
}
// NewEmitterDial behaves like NewEmitter but lets the caller supply the
// Dialer used to establish connections, which is mainly useful in tests.
//
// The returned Emitter's run loop is already started in its own goroutine.
// flushInterval is handed to time.NewTicker by that loop, so it must be
// positive.
func NewEmitterDial(dialer conn.Dialer, network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
	manager := conn.NewManager(dialer, network, address, time.After, logger)

	emitter := &Emitter{
		prefix:  metricsPrefix,
		keyVals: make(chan keyVal),
		mgr:     manager,
		logger:  logger,
		quitc:   make(chan chan struct{}),
	}

	go emitter.loop(flushInterval)
	return emitter
}
// NewCounter returns a Counter named with the Emitter's prefix followed by
// key. Its observations are sent to the Emitter, which buffers them until the
// next flush interval or until the buffer exceeds the maximum packet size,
// whichever happens first, and then emits them in the statsd protocol.
// Fields are ignored.
func (e *Emitter) NewCounter(key string) metrics.Counter {
	name := e.prefix + key
	c := &counter{
		key: name,
		c:   stringToKeyVal(name, e.keyVals),
	}
	return c
}
// NewHistogram returns a Histogram named with the Emitter's prefix followed
// by key. Its observations are sent to the Emitter, which buffers them until
// the next flush interval or until the buffer exceeds the maximum packet
// size, whichever happens first, and then emits them in the statsd protocol.
// Fields are ignored.
//
// The histogram is mapped to a statsd Timing, so observed values should be
// milliseconds. Observations made in nanoseconds can be converted with a
// ScaledHistogram:
//
//	NewScaledHistogram(histogram, time.Millisecond)
//
// The unit can also be enforced in a typesafe way with a millisecond
// TimeHistogram:
//
//	NewTimeHistogram(histogram, time.Millisecond)
//
// TODO: support for sampling.
func (e *Emitter) NewHistogram(key string) metrics.Histogram {
	name := e.prefix + key
	h := &histogram{
		key: name,
		h:   stringToKeyVal(name, e.keyVals),
	}
	return h
}
// NewGauge returns a Gauge named with the Emitter's prefix followed by key.
// Its values are sent to the Emitter, which buffers them until the next flush
// interval or until the buffer exceeds the maximum packet size, whichever
// happens first, and then emits them in the statsd protocol. Fields are
// ignored.
//
// TODO: support for sampling
func (e *Emitter) NewGauge(key string) metrics.Gauge {
	name := e.prefix + key
	g := &gauge{
		key: name,
		g:   stringToKeyVal(name, e.keyVals),
	}
	return g
}
// loop is the Emitter's run loop. It appends each incoming observation to a
// buffer as a "key:val" line and flushes that buffer when it grows past
// maxBufferSize, on every tick of a ticker with period d, and one last time
// when Stop asks the loop to exit.
func (e *Emitter) loop(d time.Duration) {
	var pending bytes.Buffer

	tick := time.NewTicker(d)
	defer tick.Stop()

	for {
		select {
		case obs := <-e.keyVals:
			fmt.Fprintf(&pending, "%s:%s\n", obs.key, obs.val)
			if pending.Len() <= maxBufferSize {
				continue
			}
			e.Flush(&pending)

		case <-tick.C:
			e.Flush(&pending)

		case done := <-e.quitc:
			// Final flush, then acknowledge to the waiting Stop call.
			e.Flush(&pending)
			close(done)
			return
		}
	}
}
// Stop asks the run loop to flush the buffered metrics and exit, and returns
// once the loop has acknowledged. Calling Stop more than once is a programmer
// error: after the loop has exited nothing receives the request, so the call
// never returns.
func (e *Emitter) Stop() {
	done := make(chan struct{})
	e.quitc <- done // hand the loop a channel to acknowledge on
	<-done          // closed by the loop after its final flush
}
// Flush writes the contents of buf to a connection taken from the Emitter's
// connection manager.
//
// A nil or empty buffer is a no-op: the run loop calls Flush on every tick,
// and without this guard an idle Emitter would issue a zero-length write (and
// log "connection unavailable" while disconnected) each interval.
//
// If no connection is available the failure is logged and buf is left
// untouched, so its contents are retried on the next flush. Once a write has
// been attempted buf is reset whether or not the write succeeded; a write
// error is logged and reported back to the connection manager via Put.
func (e *Emitter) Flush(buf *bytes.Buffer) {
	if buf == nil || buf.Len() == 0 {
		return
	}

	// Named c rather than conn so the imported conn package is not shadowed.
	c := e.mgr.Take()
	if c == nil {
		e.logger.Log("during", "flush", "err", "connection unavailable")
		return
	}

	_, err := c.Write(buf.Bytes())
	if err != nil {
		e.logger.Log("during", "flush", "err", err)
	}
	buf.Reset()

	// Return the connection together with the write result.
	e.mgr.Put(err)
}