Skip to content
Browse files

Add thread safe batching support

  • Loading branch information...
1 parent d99be81 commit 3d4d86cf9a12cbb5ea71b8cfd7d4fcc1faf55a26 @raggi raggi committed
Showing with 133 additions and 9 deletions.
  1. +94 −9 lib/statsd.rb
  2. +39 −0 spec/statsd_spec.rb
View
103 lib/statsd.rb
@@ -1,4 +1,5 @@
require 'socket'
+require 'forwardable'
# = Statsd: A Statsd client (https://github.com/etsy/statsd)
#
@@ -21,6 +22,67 @@
# users either mutex around their Statsd object, or create separate objects for
# each namespace / host+port combination.
class Statsd
+
+ # = Batch: A batching statsd proxy
+ #
+ # @example Batch a set of instruments using Batch and manual flush:
+ # $statsd = Statsd.new 'localhost', 8125
+ # batch = Statsd::Batch.new($statsd)
+ # batch.increment 'garets'
+ # batch.timing 'glork', 320
+ # batch.gauge 'bork', 100
+ # batch.flush
+ #
+ # Batch is a subclass of Statsd, but with a constructor that proxies to a
+ # normal Statsd instance. It has it's own batch_size and namespace parameters
+ # (that inherit defaults from the supplied Statsd instance). It is recommended
+ # that some care is taken if setting very large batch sizes. If the batch size
+ # exceeds the allowed packet size for UDP on your network, communication
+ # troubles may occur and data will be lost.
+ class Batch < Statsd
+
+ extend Forwardable
+ def_delegators :@statsd, :namespace, :namespace=, :host, :port, :prefix
+
+ attr_accessor :batch_size
+
+ # @param [Statsd] requires a configured Statsd instance
+ def initialize(statsd)
+ @statsd = statsd
+ @batch_size = statsd.batch_size
+ @backlog = []
+ end
+
+ # @yields [Batch] yields itself
+ #
+ # A convenience method to ensure that data is not lost in the event of an
+ # exception being thrown. Batches will be transmitted on the parent socket
+ # as soon as the batch is full, and when the block finishes.
+ def easy
+ yield self
+ ensure
+ flush
+ end
+
+ def flush
+ unless @backlog.empty?
+ @statsd.send_to_socket @backlog.join("\n")
+ @backlog.clear
+ end
+ end
+
+ protected
+
+ def send_to_socket(message)
+ @backlog << message
+ if @backlog.size >= @batch_size
+ flush
+ end
+ end
+
+ end
+
+
# A namespace to prepend to all statsd calls.
attr_reader :namespace
@@ -30,6 +92,12 @@ class Statsd
# StatsD port. Defaults to 8125.
attr_reader :port
+ # StatsD namespace prefix, generated from #namespace
+ attr_reader :prefix
+
+ # The default batch size for new batches (default: 10)
+ attr_accessor :batch_size
+
class << self
# Set to a standard logger instance to enable debug logging.
attr_accessor :logger
@@ -40,6 +108,7 @@ class << self
def initialize(host = '127.0.0.1', port = 8125)
self.host, self.port = host, port
@prefix = nil
+ @batch_size = 10
end
# @attribute [w] namespace
@@ -130,17 +199,22 @@ def time(stat, sample_rate=1)
result
end
- private
-
- def send_stats(stat, delta, type, sample_rate=1)
- if sample_rate == 1 or rand < sample_rate
- # Replace Ruby module scoping with '.' and reserved chars (: | @) with underscores.
- stat = stat.to_s.gsub('::', '.').tr(':|@', '_')
- rate = "|@#{sample_rate}" unless sample_rate == 1
- send_to_socket "#{@prefix}#{stat}:#{delta}|#{type}#{rate}"
- end
+ # Creates and yields a Batch that can be used to batch instrument reports into
+ # larger packets. Batches are sent either when the packet is "full" (defined
+ # by batch_size), or when the block completes, whichever is the sooner.
+ #
+ # @yield [Batch] a statsd subclass that collects and batches instruments
+ # @example Batch two instument operations:
+ # $statsd.batch do |batch|
+ # batch.increment 'sys.requests'
+ # batch.gauge('user.count', User.count)
+ # end
+ def batch(&block)
+ Batch.new(self).easy &block
end
+ protected
+
def send_to_socket(message)
self.class.logger.debug { "Statsd: #{message}" } if self.class.logger
socket.send(message, 0, @host, @port)
@@ -149,6 +223,17 @@ def send_to_socket(message)
nil
end
+ private
+
+ def send_stats(stat, delta, type, sample_rate=1)
+ if sample_rate == 1 or rand < sample_rate
+ # Replace Ruby module scoping with '.' and reserved chars (: | @) with underscores.
+ stat = stat.to_s.gsub('::', '.').tr(':|@', '_')
+ rate = "|@#{sample_rate}" unless sample_rate == 1
+ send_to_socket "#{prefix}#{stat}:#{delta}|#{type}#{rate}"
+ end
+ end
+
def socket
Thread.current[:statsd_socket] ||= UDPSocket.new
end
View
39 spec/statsd_spec.rb
@@ -245,6 +245,45 @@ class Statsd::SomeClass; end
end
end
+ describe "batching" do
+ it "should have a default batch size of 10" do
+ @statsd.batch_size.must_equal 10
+ end
+
+ it "should have a modifiable batch size" do
+ @statsd.batch_size = 7
+ @statsd.batch_size.must_equal 7
+ @statsd.batch do |b|
+ b.batch_size.must_equal 7
+ end
+ end
+
+ it "should flush the batch at the batch size or at the end of the block" do
+ @statsd.batch do |b|
+ b.batch_size = 3
+
+ # The first three should flush, the next two will be flushed when the
+ # block is done.
+ 5.times { b.increment('foobar') }
+
+ @socket.recv.must_equal [(["foobar:1|c"] * 3).join("\n")]
+ end
+
+ @socket.recv.must_equal [(["foobar:1|c"] * 2).join("\n")]
+ end
+
+ it "should not flush to the socket if the backlog is empty" do
+ batch = Statsd::Batch.new(@statsd)
+ batch.flush
+ @socket.recv.must_be :nil?
+
+ batch.increment 'foobar'
+ batch.flush
+ @socket.recv.must_equal %w[foobar:1|c]
+ end
+
+ end
+
describe "thread safety" do
it "should use a thread local socket" do

0 comments on commit 3d4d86c

Please sign in to comment.
Something went wrong with that request. Please try again.