forked from QubitProducts/bamboo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
g2s.go
181 lines (158 loc) · 4.75 KB
/
g2s.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package g2s
import (
"io"
"log"
"math/rand"
"net"
"time"
)
const (
MAX_PACKET_SIZE = 65536 - 8 - 20 // 8-byte UDP header, 20-byte IP header
)
type Statter interface {
Counter(sampleRate float32, bucket string, n ...int)
Timing(sampleRate float32, bucket string, d ...time.Duration)
Gauge(sampleRate float32, bucket string, value ...string)
}
type statsd struct {
w io.Writer
}
// Dial takes the same parameters as net.Dial, ie. a transport protocol
// (typically "udp") and an endpoint. It returns a new Statsd structure,
// ready to use.
//
// Note that g2s currently performs no management on the connection it creates.
func Dial(proto, endpoint string) (Statter, error) {
c, err := net.DialTimeout(proto, endpoint, 2*time.Second)
if err != nil {
return nil, err
}
return New(c)
}
// New constructs a Statsd structure which will write statsd-protocol messages
// into the given io.Writer. New is intended to be used by consumers who want
// nonstandard behavior: for example, they may pass an io.Writer which performs
// buffering and aggregation of statsd-protocol messages.
//
// Note that g2s provides no synchronization. If you pass an io.Writer which
// is not goroutine-safe, for example a bytes.Buffer, you must make sure you
// synchronize your calls to the Statter methods.
func New(w io.Writer) (Statter, error) {
return &statsd{
w: w,
}, nil
}
// bufferize folds the slice of sendables into a slice of byte-buffers,
// each of which shall be no larger than max bytes.
func bufferize(sendables []sendable, max int) [][]byte {
bN := [][]byte{}
b1, b1sz := []byte{}, 0
for _, sendable := range sendables {
buf := []byte(sendable.Message())
if b1sz+len(buf) > max {
bN = append(bN, b1)
b1, b1sz = []byte{}, 0
}
b1 = append(b1, buf...)
b1sz += len(buf)
}
if len(b1) > 0 {
bN = append(bN, b1[0:len(b1)-1])
}
return bN
}
// publish folds the slice of sendables into one or more packets, each of which
// will be no larger than MAX_PACKET_SIZE. It then writes them, one by one,
// into the Statsd io.Writer.
func (s *statsd) publish(msgs []sendable) {
for _, buf := range bufferize(msgs, MAX_PACKET_SIZE) {
// In the base case, when the Statsd struct is backed by a net.Conn,
// "Multiple goroutines may invoke methods on a Conn simultaneously."
// -- http://golang.org/pkg/net/#Conn
// Otherwise, Bring Your Own Synchronization™.
n, err := s.w.Write(buf)
if err != nil {
log.Printf("g2s: publish: %s", err)
} else if n != len(buf) {
log.Printf("g2s: publish: short send: %d < %d", n, len(buf))
}
}
}
// maybeSample returns a sampling structure and true if a pseudorandom number
// in the range 0..1 is less than or equal to the passed rate.
//
// As a special case, if r >= 1.0, maybeSample will return an uninitialized
// sampling structure and true. The uninitialized sampling structure implies
// enabled == false, which tells statsd that the value is unsampled.
func maybeSample(r float32) (sampling, bool) {
if r >= 1.0 {
return sampling{}, true
}
if rand.Float32() > r {
return sampling{}, false
}
return sampling{
enabled: true,
rate: r,
}, true
}
// Counter sends one or more counter statistics to statsd.
//
// Application code should call it for every potential invocation of a
// statistic; it uses the sampleRate to determine whether or not to send or
// squelch the data, on an aggregate basis.
func (s *statsd) Counter(sampleRate float32, bucket string, n ...int) {
samp, ok := maybeSample(sampleRate)
if !ok {
return
}
msgs := make([]sendable, len(n))
for i, ni := range n {
msgs[i] = &counterUpdate{
bucket: bucket,
n: ni,
sampling: samp,
}
}
s.publish(msgs)
}
// Timing sends one or more timing statistics to statsd.
//
// Application code should call it for every potential invocation of a
// statistic; it uses the sampleRate to determine whether or not to send or
// squelch the data, on an aggregate basis.
func (s *statsd) Timing(sampleRate float32, bucket string, d ...time.Duration) {
samp, ok := maybeSample(sampleRate)
if !ok {
return
}
msgs := make([]sendable, len(d))
for i, di := range d {
msgs[i] = &timingUpdate{
bucket: bucket,
ms: int(di.Nanoseconds() / 1e6),
sampling: samp,
}
}
s.publish(msgs)
}
// Gauge sends one or more gauge statistics to statsd.
//
// Application code should call it for every potential invocation of a
// statistic; it uses the sampleRate to determine whether or not to send or
// squelch the data, on an aggregate basis.
func (s *statsd) Gauge(sampleRate float32, bucket string, v ...string) {
samp, ok := maybeSample(sampleRate)
if !ok {
return
}
msgs := make([]sendable, len(v))
for i, vi := range v {
msgs[i] = &gaugeUpdate{
bucket: bucket,
val: vi,
sampling: samp,
}
}
s.publish(msgs)
}