-
Notifications
You must be signed in to change notification settings - Fork 153
/
statsd.rb
400 lines (344 loc) · 11.3 KB
/
statsd.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
require 'socket'
require 'forwardable'
require 'json'
# = Statsd: A Statsd client (https://github.com/etsy/statsd)
#
# @example Set up a global Statsd client for a server on localhost:8125
# $statsd = Statsd.new 'localhost', 8125
# @example Set up a global Statsd client for a server on IPv6 port 8125
# $statsd = Statsd.new '::1', 8125
# @example Send some stats
# $statsd.increment 'garets'
# $statsd.timing 'glork', 320
# $statsd.gauge 'bork', 100
# @example Use {#time} to time the execution of a block
# $statsd.time('account.activate') { @account.activate! }
# @example Create a namespaced statsd client and increment 'account.activate'
# statsd = Statsd.new('localhost').tap{|sd| sd.namespace = 'account'}
# statsd.increment 'activate'
#
# Statsd instances are thread safe for general usage, by using a thread local
# UDPSocket and carrying no state. The attributes are stateful and are not
# mutexed; it is expected that users will not change these at runtime in
# threaded environments. If users require such use cases, it is recommended
# that users either mutex around their Statsd object, or create separate
# objects for each namespace / host+port combination.
class Statsd
  # = Batch: A batching statsd proxy
  #
  # @example Batch a set of instruments using Batch and manual flush:
  #   $statsd = Statsd.new 'localhost', 8125
  #   batch = Statsd::Batch.new($statsd)
  #   batch.increment 'garets'
  #   batch.timing 'glork', 320
  #   batch.gauge 'bork', 100
  #   batch.flush
  #
  # Batch is a subclass of Statsd, but with a constructor that proxies to a
  # normal Statsd instance. It has its own batch_size (initialised from the
  # supplied Statsd instance); namespace, host, port, prefix and postfix are
  # delegated to the supplied instance, so writing them through a Batch
  # changes the wrapped Statsd object. It is recommended that some care is
  # taken if setting very large batch sizes. If the batch size exceeds the
  # allowed packet size for UDP on your network, communication troubles may
  # occur and data will be lost.
  class Batch < Statsd
    extend Forwardable

    # Readers and writers are forwarded in pairs so that a write made through
    # the batch is visible to the reader the batch itself uses when building
    # stat names.
    def_delegators :@statsd,
                   :namespace, :namespace=,
                   :host, :host=,
                   :port, :port=,
                   :prefix,
                   :postfix, :postfix=

    # Number of queued messages that triggers an automatic flush.
    attr_accessor :batch_size

    # @param [Statsd] statsd a configured Statsd instance that will perform
    #   the actual transmission
    def initialize(statsd)
      @statsd = statsd
      @batch_size = statsd.batch_size
      @backlog = []
    end

    # A convenience method to ensure that data is not lost in the event of an
    # exception being thrown. Batches will be transmitted on the parent socket
    # as soon as the batch is full, and when the block finishes.
    #
    # @yield [Batch] yields itself
    def easy
      yield self
    ensure
      flush
    end

    # Sends all queued messages to the wrapped Statsd instance as a single
    # newline-joined payload and empties the queue. Does nothing when the
    # queue is empty.
    def flush
      return if @backlog.empty?

      @statsd.send_to_socket @backlog.join("\n")
      @backlog.clear
    end

    protected

    # Queues the message instead of transmitting it, flushing once the queue
    # holds at least batch_size messages.
    #
    # @param [String] message a formatted statsd line
    def send_to_socket(message)
      @backlog << message
      flush if @backlog.size >= @batch_size
    end
  end

  # Client for the StatsD TCP admin interface.
  #
  # The connection is cached in Thread.current under a single key
  # (:statsd_admin_socket), so every Admin instance used from the same thread
  # shares one socket regardless of its host and port.
  class Admin
    # StatsD host. Defaults to 127.0.0.1.
    attr_reader :host

    # StatsD admin port. Defaults to 8126.
    attr_reader :port

    class << self
      # Set to a standard logger instance to enable debug logging.
      attr_accessor :logger
    end

    # @attribute [w] host
    #   Writes are not thread safe. A nil value selects the default host.
    def host=(host)
      @host = host || '127.0.0.1'
    end

    # @attribute [w] port
    #   Writes are not thread safe. A nil value selects the default port.
    def port=(port)
      @port = port || 8126
    end

    # @param [String] host your statsd host
    # @param [Integer] port your statsd admin port
    def initialize(host = '127.0.0.1', port = 8126)
      self.host, self.port = host, port
    end

    # Reads all gauges from StatsD.
    def gauges
      read_metric :gauges
    end

    # Reads all timers from StatsD.
    def timers
      read_metric :timers
    end

    # Reads all counters from StatsD.
    def counters
      read_metric :counters
    end

    # Deletes one or more gauges. Wildcards are allowed.
    #
    # @param [String] item
    def delgauges(item)
      delete_metric :gauges, item
    end

    # Deletes one or more timers. Wildcards are allowed.
    #
    # @param [String] item
    def deltimers(item)
      delete_metric :timers, item
    end

    # Deletes one or more counters. Wildcards are allowed.
    #
    # @param [String] item
    def delcounters(item)
      delete_metric :counters, item
    end

    # Issues the "stats" command and parses the "key: value" reply lines.
    #
    # @return [Hash{String => Integer}] each value converted with #to_i
    def stats
      # the format of "stats" isn't JSON, who knows why
      send_to_socket "stats"
      read_from_socket.split("\n").each_with_object({}) do |line, items|
        key, val = line.chomp.split(": ")
        items[key] = val.to_i
      end
    end

    private

    # Sends the metric name as a command and parses the reply as JSON.
    def read_metric(name)
      send_to_socket name
      reply = read_from_socket
      # for some reason, the reply looks like JSON, but isn't, quite
      JSON.parse reply.gsub("'", "\"")
    end

    # Sends a "del<name> <item>" command.
    #
    # @return [Array<String>] the text after the last ": " of each reply line
    def delete_metric(name, item)
      send_to_socket "del#{name} #{item}"
      read_from_socket.split("\n").map { |line| line.chomp.split(": ").last }
    end

    # Writes one newline-terminated command. Errors are logged (when a logger
    # is configured) and swallowed; nil is returned in that case.
    def send_to_socket(message)
      self.class.logger.debug { "Statsd: #{message}" } if self.class.logger
      socket.write(message.to_s + "\n")
    rescue => boom
      self.class.logger.error { "Statsd: #{boom.class} #{boom}" } if self.class.logger
      nil
    end

    # Reads reply lines up to, but excluding, the "END" terminator line.
    def read_from_socket
      buffer = ''.dup # mutable even if string literals are frozen
      loop do
        line = socket.readline
        break if line == "END\n"
        buffer << line # append in place rather than reallocating per line
      end
      socket.readline # clear the closing newline out of the socket
      buffer
    end

    # Lazily opened TCP connection, cached in Thread.current.
    def socket
      Thread.current[:statsd_admin_socket] ||= TCPSocket.new(host, port)
    end
  end

  # A namespace to prepend to all statsd calls.
  attr_reader :namespace

  # StatsD host. Defaults to 127.0.0.1.
  attr_reader :host

  # StatsD port. Defaults to 8125.
  attr_reader :port

  # StatsD namespace prefix, generated from #namespace
  attr_reader :prefix

  # The default batch size for new batches (default: 10)
  attr_accessor :batch_size

  # a postfix to append to all metrics
  attr_reader :postfix

  class << self
    # Set to a standard logger instance to enable debug logging.
    attr_accessor :logger
  end

  # @param [String] host your statsd host
  # @param [Integer] port your statsd port
  def initialize(host = '127.0.0.1', port = 8125)
    self.host, self.port = host, port
    @namespace = nil
    @prefix = nil
    @batch_size = 10
    @postfix = nil
  end

  # @attribute [w] namespace
  #   Writes are not thread safe. The prefix becomes "<namespace>."; a blank
  #   value (nil, false or '') resets the prefix to nil rather than to '.'.
  def namespace=(namespace)
    @namespace = namespace
    @prefix = case namespace
              when nil, false, '' then nil
              else "#{namespace}."
              end
  end

  # @attribute [w] postfix
  #   A value to be appended to the stat name after a '.'. If the value is
  #   blank then the postfix will be reset to nil (rather than to '.').
  def postfix=(pf)
    @postfix = case pf
               when nil, false, '' then nil
               else ".#{pf}"
               end
  end

  # @attribute [w] host
  #   Writes are not thread safe. A nil value selects the default host.
  def host=(host)
    @host = host || '127.0.0.1'
  end

  # @attribute [w] port
  #   Writes are not thread safe. A nil value selects the default port.
  def port=(port)
    @port = port || 8125
  end

  # Sends an increment (count = 1) for the given stat to the statsd server.
  #
  # @param [String] stat stat name
  # @param [Numeric] sample_rate sample rate, 1 for always
  # @see #count
  def increment(stat, sample_rate=1)
    count stat, 1, sample_rate
  end

  # Sends a decrement (count = -1) for the given stat to the statsd server.
  #
  # @param [String] stat stat name
  # @param [Numeric] sample_rate sample rate, 1 for always
  # @see #count
  def decrement(stat, sample_rate=1)
    count stat, -1, sample_rate
  end

  # Sends an arbitrary count for the given stat to the statsd server.
  #
  # @param [String] stat stat name
  # @param [Integer] count count
  # @param [Numeric] sample_rate sample rate, 1 for always
  def count(stat, count, sample_rate=1)
    send_stats stat, count, :c, sample_rate
  end

  # Sends an arbitrary gauge value for the given stat to the statsd server.
  #
  # This is useful for recording things like available disk space,
  # memory usage, and the like, which have different semantics than
  # counters.
  #
  # @param [String] stat stat name.
  # @param [Numeric] value gauge value.
  # @param [Numeric] sample_rate sample rate, 1 for always
  # @example Report the current user count:
  #   $statsd.gauge('user.count', User.count)
  def gauge(stat, value, sample_rate=1)
    send_stats stat, value, :g, sample_rate
  end

  # Sends an arbitrary set value for the given stat to the statsd server.
  #
  # This is for recording counts of unique events, which are useful to
  # see on graphs to correlate to other values. For example, a deployment
  # might get recorded as a set, and be drawn as annotations on a CPU history
  # graph.
  #
  # @param [String] stat stat name.
  # @param [Numeric] value event value.
  # @param [Numeric] sample_rate sample rate, 1 for always
  # @example Report a deployment happening:
  #   $statsd.set('deployment', DEPLOYMENT_EVENT_CODE)
  def set(stat, value, sample_rate=1)
    send_stats stat, value, :s, sample_rate
  end

  # Sends a timing (in ms) for the given stat to the statsd server. The
  # sample_rate determines what percentage of the time this report is sent. The
  # statsd server then uses the sample_rate to correctly track the average
  # timing for the stat.
  #
  # @param [String] stat stat name
  # @param [Integer] ms timing in milliseconds
  # @param [Numeric] sample_rate sample rate, 1 for always
  def timing(stat, ms, sample_rate=1)
    send_stats stat, ms, :ms, sample_rate
  end

  # Reports execution time of the provided block using {#timing}. Nothing is
  # reported if the block raises.
  #
  # @param [String] stat stat name
  # @param [Numeric] sample_rate sample rate, 1 for always
  # @yield The operation to be timed
  # @return the value returned by the block
  # @see #timing
  # @example Report the time (in ms) taken to activate an account
  #   $statsd.time('account.activate') { @account.activate! }
  def time(stat, sample_rate=1)
    # The monotonic clock is immune to wall-clock adjustments, which could
    # otherwise yield negative or wildly wrong durations.
    start = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
    result = yield
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) - start
    timing(stat, elapsed.round, sample_rate)
    result
  end

  # Creates and yields a Batch that can be used to batch instrument reports into
  # larger packets. Batches are sent either when the packet is "full" (defined
  # by batch_size), or when the block completes, whichever is the sooner.
  #
  # @yield [Batch] a statsd subclass that collects and batches instruments
  # @example Batch two instrument operations:
  #   $statsd.batch do |batch|
  #     batch.increment 'sys.requests'
  #     batch.gauge('user.count', User.count)
  #   end
  def batch(&block)
    Batch.new(self).easy(&block)
  end

  protected

  # Transmits one payload to the configured host and port. Errors are logged
  # (when a logger is configured) and swallowed; nil is returned in that case.
  #
  # @param [String] message a formatted statsd payload
  def send_to_socket(message)
    self.class.logger.debug { "Statsd: #{message}" } if self.class.logger
    socket.send(message, 0, @host, @port)
  rescue => boom
    self.class.logger.error { "Statsd: #{boom.class} #{boom}" } if self.class.logger
    nil
  end

  private

  # Formats a stat line and hands it to #send_to_socket, subject to sampling:
  # the line is sent when sample_rate is 1 or when rand is below sample_rate.
  #
  # @param [#to_s] stat stat name
  # @param [Numeric] delta value to report
  # @param [Symbol] type statsd type code (:c, :g, :s or :ms)
  # @param [Numeric] sample_rate sample rate, 1 for always
  def send_stats(stat, delta, type, sample_rate=1)
    return unless sample_rate == 1 || rand < sample_rate

    # Replace Ruby module scoping with '.' and reserved chars (: | @) with underscores.
    stat = stat.to_s.gsub('::', '.').tr(':|@', '_')
    rate = "|@#{sample_rate}" unless sample_rate == 1
    send_to_socket "#{prefix}#{stat}#{postfix}:#{delta}|#{type}#{rate}"
  end

  # Lazily created UDP socket, cached in Thread.current under a single key
  # (:statsd_socket). The address family is therefore fixed by whichever
  # instance creates the socket first on a given thread.
  def socket
    Thread.current[:statsd_socket] ||= UDPSocket.new addr_family
  end

  # Address family (IPv6 or IPv4) matching the configured host.
  def addr_family
    Addrinfo.udp(@host, @port).ipv6? ? Socket::AF_INET6 : Socket::AF_INET
  end
end