This repository has been archived by the owner on Feb 14, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 26
/
g2s.go
199 lines (175 loc) · 5.5 KB
/
g2s.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
package g2s
import (
"fmt"
"io"
"log"
"math/rand"
"net"
"strings"
"time"
)
const maxPacketSize = 65536 - 8 - 20 // 8-byte UDP header, 20-byte IP header
// Statter collects the ways clients can send observations to a StatsD server.
type Statter interface {
Counter(sampleRate float32, bucket string, n ...int)
Timing(sampleRate float32, bucket string, d ...time.Duration)
Gauge(sampleRate float32, bucket string, value ...string)
}
// Statsd implements the Statter interface by writing StatsD formatted messages
// to the underlying writer.
type Statsd struct {
w io.Writer
prefix string
}
// Dial takes the same parameters as net.Dial, ie. a transport protocol
// (typically "udp") and an endpoint. It returns a new Statsd structure,
// ready to use.
//
// Note that g2s currently performs no management on the connection it creates.
func Dial(proto, endpoint string) (*Statsd, error) {
c, err := net.DialTimeout(proto, endpoint, 2*time.Second)
if err != nil {
return nil, err
}
return New(c, "")
}
// Variant of Dial that accepts a prefix that will be prepended to the name of
// all buckets sent (so it can be used with services that require an API key)
func DialWithPrefix(proto, endpoint string, prefix string) (*Statsd, error) {
c, err := net.DialTimeout(proto, endpoint, 2*time.Second)
if err != nil {
return nil, err
}
return New(c, prefix)
}
// New constructs a Statsd structure which will write statsd-protocol messages
// into the given io.Writer. New is intended to be used by consumers who want
// nonstandard behavior: for example, they may pass an io.Writer which performs
// buffering and aggregation of statsd-protocol messages.
//
// Note that g2s provides no synchronization. If you pass an io.Writer which
// is not goroutine-safe, for example a bytes.Buffer, you must make sure you
// synchronize your calls to the Statter methods.
func New(w io.Writer, prefix string) (*Statsd, error) {
if len(prefix) > 0 && !strings.HasSuffix(prefix, ".") {
prefix = fmt.Sprintf("%s.", prefix)
}
return &Statsd{w: w, prefix: prefix}, nil
}
// bufferize folds the slice of sendables into a slice of byte-buffers,
// each of which shall be no larger than max bytes.
func bufferize(sendables []sendable, max int) [][]byte {
bN := [][]byte{}
b1, b1sz := []byte{}, 0
for _, sendable := range sendables {
buf := []byte(sendable.Message())
if b1sz+len(buf) > max {
bN = append(bN, b1)
b1, b1sz = []byte{}, 0
}
b1 = append(b1, buf...)
b1sz += len(buf)
}
if len(b1) > 0 {
bN = append(bN, b1[0:len(b1)-1])
}
return bN
}
// publish folds the slice of sendables into one or more packets, each of which
// will be no larger than maxPacketSize. It then writes them, one by one,
// into the Statsd io.Writer.
func (s *Statsd) publish(msgs []sendable) {
for _, buf := range bufferize(msgs, maxPacketSize) {
// In the base case, when the Statsd struct is backed by a net.Conn,
// "Multiple goroutines may invoke methods on a Conn simultaneously."
// -- http://golang.org/pkg/net/#Conn
// Otherwise, Bring Your Own Synchronization™.
n, err := s.w.Write(buf)
if err != nil {
log.Printf("g2s: publish: %s", err)
} else if n != len(buf) {
log.Printf("g2s: publish: short send: %d < %d", n, len(buf))
}
}
}
// maybeSample returns a sampling structure and true if a pseudorandom number
// in the range 0..1 is less than or equal to the passed rate.
//
// As a special case, if r >= 1.0, maybeSample will return an uninitialized
// sampling structure and true. The uninitialized sampling structure implies
// enabled == false, which tells statsd that the value is unsampled.
func maybeSample(r float32) (sampling, bool) {
if r >= 1.0 {
return sampling{}, true
}
if rand.Float32() > r {
return sampling{}, false
}
return sampling{
enabled: true,
rate: r,
}, true
}
// Counter sends one or more counter statistics to StatsD.
//
// Application code should call it for every potential invocation of a
// statistic; it uses the sampleRate to determine whether or not to send or
// squelch the data, on an aggregate basis.
func (s *Statsd) Counter(sampleRate float32, bucket string, n ...int) {
samp, ok := maybeSample(sampleRate)
if !ok {
return
}
msgs := make([]sendable, len(n))
for i, ni := range n {
msgs[i] = &counterUpdate{
prefix: s.prefix,
bucket: bucket,
n: ni,
sampling: samp,
}
}
s.publish(msgs)
}
// Timing sends one or more timing statistics to StatsD.
//
// Application code should call it for every potential invocation of a
// statistic; it uses the sampleRate to determine whether or not to send or
// squelch the data, on an aggregate basis.
func (s *Statsd) Timing(sampleRate float32, bucket string, d ...time.Duration) {
samp, ok := maybeSample(sampleRate)
if !ok {
return
}
msgs := make([]sendable, len(d))
for i, di := range d {
msgs[i] = &timingUpdate{
prefix: s.prefix,
bucket: bucket,
ms: int(di.Nanoseconds() / 1e6),
sampling: samp,
}
}
s.publish(msgs)
}
// Gauge sends one or more gauge statistics to Statsd.
//
// Application code should call it for every potential invocation of a
// statistic; it uses the sampleRate to determine whether or not to send or
// squelch the data, on an aggregate basis.
func (s *Statsd) Gauge(sampleRate float32, bucket string, v ...string) {
samp, ok := maybeSample(sampleRate)
if !ok {
return
}
msgs := make([]sendable, len(v))
for i, vi := range v {
msgs[i] = &gaugeUpdate{
prefix: s.prefix,
bucket: bucket,
val: vi,
sampling: samp,
}
}
s.publish(msgs)
}