-
Notifications
You must be signed in to change notification settings - Fork 4
/
statsd.go
125 lines (108 loc) · 2.61 KB
/
statsd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package metrics
import (
"io"
"strconv"
"time"
)
// StatsDPackMax is the maximum packet size, in bytes, for batches.
// Be careful not to exceed the network's MTU. See
// https://github.com/etsy/statsd/blob/master/docs/metric_types.md#multi-metric-packets
//
// The writer goroutine started by NewStatsD reads this value when it
// allocates its batch buffer and again each time it adds a message to a batch.
var StatsDPackMax = 1432
// statsD formats metrics as StatsD messages and hands them to the writer
// goroutine started by NewStatsD.
type statsD struct {
// prefix is prepended to every key by Seen and Took. It is set by KeyPrefix.
prefix string
// pool holds buffers to prevent mallocs at runtime.
pool chan []byte
// queue holds the pending messages.
queue chan []byte
}
// NewStatsD returns a new Register with a StatsD implementation.
// Generally speaking, you want to connect over UDP.
//
//	conn, err := net.DialTimeout("udp", "localhost:8125", 4*time.Second)
//	if err != nil {
//		...
//	}
//	stats := metrics.NewStatsD(conn, time.Second)
//
// Batches are limited by the flushInterval span. A flushInterval of 0 implies no buffering.
//
// Pending messages are joined with newlines and written to conn in batches.
// A batch is written before it would grow beyond StatsDPackMax bytes. A single
// message that is itself larger than StatsDPackMax is not split; it is written
// as a batch of its own.
//
// Errors returned by conn.Write are ignored. The goroutine started here never
// exits, so each call to NewStatsD keeps one goroutine alive for the rest of
// the process.
func NewStatsD(conn io.Writer, flushInterval time.Duration) Register {
	// size bounds both the number of pending messages and the number of
	// reusable message buffers.
	const size = 1000

	d := &statsD{
		queue: make(chan []byte, size),
		pool:  make(chan []byte, size),
	}
	for i := 0; i < size; i++ {
		d.pool <- make([]byte, 0, 40)
	}

	// write statsD.queue to conn
	go func() {
		buf := make([]byte, 0, StatsDPackMax)
		var batchStart time.Time

		// flush writes the current batch and empties it. Write errors are
		// ignored; there is no caller left to report them to.
		flush := func() {
			conn.Write(buf)
			buf = buf[:0]
		}

		for {
			// flush on interval timeout
			if len(buf) > 0 && time.Since(batchStart) >= flushInterval {
				flush()
				continue
			}

			// Poll rather than block, so that the interval check above
			// keeps running while the queue is empty.
			var next []byte
			select {
			case next = <-d.queue:
			default:
				time.Sleep(time.Millisecond)
				continue
			}

			// Flush first when the size limit would be exceeded. An empty
			// batch is never flushed: that would emit a zero-length write
			// (an empty datagram on UDP) ahead of an oversized message.
			if len(buf) > 0 && len(buf)+1+len(next) > StatsDPackMax {
				flush()
			}

			if len(buf) != 0 {
				buf = append(buf, '\n')
			} else {
				// first message of a new batch starts the interval clock
				batchStart = time.Now()
			}
			buf = append(buf, next...)

			// reuse buffer
			d.pool <- next[:0]
		}
	}()

	return d
}
// Seen queues a StatsD counter message of the form "<prefix><key>:<n>|c",
// where <prefix> is the value set with KeyPrefix.
//
// The message is built in a buffer taken from d.pool and handed to d.queue,
// so the call blocks while no pooled buffer is available or while the queue
// is full.
func (d *statsD) Seen(key string, n int) {
	buf := <-d.pool

	buf = append(buf, d.prefix...)
	buf = append(buf, key...)
	buf = append(buf, ':')
	// AppendInt writes the decimal digits straight into buf, so no temporary
	// string is created for any value of n.
	buf = strconv.AppendInt(buf, int64(n), 10)
	buf = append(buf, '|', 'c')

	d.queue <- buf
}
// Took queues a StatsD timing message of the form "<prefix><key>:<ms>|ms",
// where <ms> is the time elapsed since the given instant, truncated to whole
// milliseconds, and <prefix> is the value set with KeyPrefix.
//
// The message is built in a buffer taken from d.pool and handed to d.queue,
// so the call blocks while no pooled buffer is available or while the queue
// is full.
func (d *statsD) Took(key string, since time.Time) {
	buf := <-d.pool

	buf = append(buf, d.prefix...)
	buf = append(buf, key...)
	buf = append(buf, ':')
	// AppendInt writes the decimal digits straight into buf, so no temporary
	// string is created for any elapsed time.
	elapsed := int64(time.Since(since) / time.Millisecond)
	buf = strconv.AppendInt(buf, elapsed, 10)
	buf = append(buf, '|', 'm', 's')

	d.queue <- buf
}
// KeyPrefix sets the string that Seen and Took prepend to every key.
// An empty string means keys are sent unchanged.
//
// The field is assigned without any synchronization.
// NOTE(review): presumably meant to be called once during setup, before
// Seen or Took run on other goroutines — confirm.
func (d *statsD) KeyPrefix(s string) {
d.prefix = s
}