Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Add thread safe batching support #22

Merged
merged 2 commits into from

3 participants

@raggi
Collaborator

References #19. This implementation is much longer :sob: but it should also have sufficient thread safety semantics for almost all users :innocent:

It introduces a somewhat hacked up subclass, and depends on stdlib forwardable (which could be eradicated if we want to write more code).

Thoughts?

@raggi
Collaborator

That JRuby failure is a wobbler.

There's a couple of places I missed docs. I'll add those if everyones :+1:

@raggi
Collaborator

It's also worth noting here, that the thread safety of this batch object is only as safe as common code paths. The following may introduce problems:

$statsd.batch do |batch|
  10.times { Thread.new { 1000.times { batch.time('rand') { sleep(rand) } } } }
end

The race there would occur on the @backlog array, and could even result in dropped data.

Only a single batch object per thread should be used. This is more than likely the way everyone would consume this code, but the value of the API over creating more statsd instances, is that the block form, provided threads aren't created inside the block, should be sufficient to maintain an easy barrier.

@reinh
Owner

This looks really nice. :thumbsup:

I like the use of Forwardable. I'd be interested in seeing some benchmarks that show the performance implications of batched delivery, especially in tight loops.

@reinh
Owner

:thumbsup:

Travis, sudo rebuild the branch. :(

@raggi raggi Merge branch 'master' into thread_safe_batching
* master:
  Fix typo in README example server config
  Fixed Statsd.gauge parameter name in documentation.
  Removed an unused variable from the statsd spec
  Removed an unused variable from spec/helper
  Add basic usage to docs
  adding postfix

Conflicts:
	lib/statsd.rb
0274a41
@raggi raggi merged commit cb55d1a into from
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Sep 8, 2012
  1. @raggi
Commits on Dec 5, 2012
  1. @raggi

    Merge branch 'master' into thread_safe_batching

    raggi authored
    * master:
      Fix typo in README example server config
      Fixed Statsd.gauge parameter name in documentation.
      Removed an unused variable from the statsd spec
      Removed an unused variable from spec/helper
      Add basic usage to docs
      adding postfix
    
    Conflicts:
    	lib/statsd.rb
This page is out of date. Refresh to see the latest.
Showing with 134 additions and 9 deletions.
  1. +95 −9 lib/statsd.rb
  2. +39 −0 spec/statsd_spec.rb
View
104 lib/statsd.rb
@@ -1,4 +1,5 @@
require 'socket'
+require 'forwardable'
# = Statsd: A Statsd client (https://github.com/etsy/statsd)
#
@@ -21,6 +22,68 @@
# users either mutex around their Statsd object, or create separate objects for
# each namespace / host+port combination.
class Statsd
+
+ # = Batch: A batching statsd proxy
+ #
+ # @example Batch a set of instruments using Batch and manual flush:
+ # $statsd = Statsd.new 'localhost', 8125
+ # batch = Statsd::Batch.new($statsd)
+ # batch.increment 'garets'
+ # batch.timing 'glork', 320
+ # batch.gauge 'bork', 100
+ # batch.flush
+ #
+ # Batch is a subclass of Statsd, but with a constructor that proxies to a
+ # normal Statsd instance. It has it's own batch_size and namespace parameters
+ # (that inherit defaults from the supplied Statsd instance). It is recommended
+ # that some care is taken if setting very large batch sizes. If the batch size
+ # exceeds the allowed packet size for UDP on your network, communication
+ # troubles may occur and data will be lost.
+ class Batch < Statsd
+
+ extend Forwardable
+ def_delegators :@statsd,
+ :namespace, :namespace=, :host, :port, :prefix, :postfix
+
+ attr_accessor :batch_size
+
+ # @param [Statsd] requires a configured Statsd instance
+ def initialize(statsd)
+ @statsd = statsd
+ @batch_size = statsd.batch_size
+ @backlog = []
+ end
+
+ # @yields [Batch] yields itself
+ #
+ # A convenience method to ensure that data is not lost in the event of an
+ # exception being thrown. Batches will be transmitted on the parent socket
+ # as soon as the batch is full, and when the block finishes.
+ def easy
+ yield self
+ ensure
+ flush
+ end
+
+ def flush
+ unless @backlog.empty?
+ @statsd.send_to_socket @backlog.join("\n")
+ @backlog.clear
+ end
+ end
+
+ protected
+
+ def send_to_socket(message)
+ @backlog << message
+ if @backlog.size >= @batch_size
+ flush
+ end
+ end
+
+ end
+
+
# A namespace to prepend to all statsd calls.
attr_reader :namespace
@@ -30,6 +93,12 @@ class Statsd
# StatsD port. Defaults to 8125.
attr_reader :port
+ # StatsD namespace prefix, generated from #namespace
+ attr_reader :prefix
+
+ # The default batch size for new batches (default: 10)
+ attr_accessor :batch_size
+
# a postfix to append to all metrics
attr_reader :postfix
@@ -43,6 +112,7 @@ class << self
def initialize(host = '127.0.0.1', port = 8125)
self.host, self.port = host, port
@prefix = nil
+ @batch_size = 10
@postfix = nil
end
@@ -138,17 +208,22 @@ def time(stat, sample_rate=1)
result
end
- private
-
- def send_stats(stat, delta, type, sample_rate=1)
- if sample_rate == 1 or rand < sample_rate
- # Replace Ruby module scoping with '.' and reserved chars (: | @) with underscores.
- stat = stat.to_s.gsub('::', '.').tr(':|@', '_')
- rate = "|@#{sample_rate}" unless sample_rate == 1
- send_to_socket "#{@prefix}#{stat}#{@postfix}:#{delta}|#{type}#{rate}"
- end
+ # Creates and yields a Batch that can be used to batch instrument reports into
+ # larger packets. Batches are sent either when the packet is "full" (defined
+ # by batch_size), or when the block completes, whichever is the sooner.
+ #
+ # @yield [Batch] a statsd subclass that collects and batches instruments
+ # @example Batch two instument operations:
+ # $statsd.batch do |batch|
+ # batch.increment 'sys.requests'
+ # batch.gauge('user.count', User.count)
+ # end
+ def batch(&block)
+ Batch.new(self).easy &block
end
+ protected
+
def send_to_socket(message)
self.class.logger.debug { "Statsd: #{message}" } if self.class.logger
socket.send(message, 0, @host, @port)
@@ -157,6 +232,17 @@ def send_to_socket(message)
nil
end
+ private
+
+ def send_stats(stat, delta, type, sample_rate=1)
+ if sample_rate == 1 or rand < sample_rate
+ # Replace Ruby module scoping with '.' and reserved chars (: | @) with underscores.
+ stat = stat.to_s.gsub('::', '.').tr(':|@', '_')
+ rate = "|@#{sample_rate}" unless sample_rate == 1
+ send_to_socket "#{prefix}#{stat}#{postfix}:#{delta}|#{type}#{rate}"
+ end
+ end
+
def socket
Thread.current[:statsd_socket] ||= UDPSocket.new
end
View
39 spec/statsd_spec.rb
@@ -269,6 +269,45 @@ class Statsd::SomeClass; end
end
end
+ describe "batching" do
+ it "should have a default batch size of 10" do
+ @statsd.batch_size.must_equal 10
+ end
+
+ it "should have a modifiable batch size" do
+ @statsd.batch_size = 7
+ @statsd.batch_size.must_equal 7
+ @statsd.batch do |b|
+ b.batch_size.must_equal 7
+ end
+ end
+
+ it "should flush the batch at the batch size or at the end of the block" do
+ @statsd.batch do |b|
+ b.batch_size = 3
+
+ # The first three should flush, the next two will be flushed when the
+ # block is done.
+ 5.times { b.increment('foobar') }
+
+ @socket.recv.must_equal [(["foobar:1|c"] * 3).join("\n")]
+ end
+
+ @socket.recv.must_equal [(["foobar:1|c"] * 2).join("\n")]
+ end
+
+ it "should not flush to the socket if the backlog is empty" do
+ batch = Statsd::Batch.new(@statsd)
+ batch.flush
+ @socket.recv.must_be :nil?
+
+ batch.increment 'foobar'
+ batch.flush
+ @socket.recv.must_equal %w[foobar:1|c]
+ end
+
+ end
+
describe "thread safety" do
it "should use a thread local socket" do
Something went wrong with that request. Please try again.