Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Implement thread safety using a local UDPSocket

  • Loading branch information...
commit f6473a495a8373141fdab9771cf5a964a007a52f 1 parent 7d6bdaa
@raggi raggi authored
Showing with 53 additions and 27 deletions.
  1. +15 −2 lib/statsd.rb
  2. +38 −25 spec/statsd_spec.rb
View
17 lib/statsd.rb
@@ -13,6 +13,13 @@
# @example Create a namespaced statsd client and increment 'account.activate'
# statsd = Statsd.new('localhost').tap{|sd| sd.namespace = 'account'}
# statsd.increment 'activate'
+#
+# Statsd instances are thread safe for general usage, by using a thread local
+# UDPSocket and carrying no state. The attributes are stateful, and are not
+# mutexed, it is expected that users will not change these at runtime in
+# threaded environments. If users require such use cases, it is recommend that
+# users either mutex around their Statsd object, or create separate objects for
+# each namespace / host+port combination.
class Statsd
# A namespace to prepend to all statsd calls.
attr_reader :namespace
@@ -33,21 +40,23 @@ class << self
def initialize(host = '127.0.0.1', port = 8125)
self.host, self.port = host, port
@prefix = nil
- @socket = UDPSocket.new
end
# @attribute [w] namespace
+ # Writes are not thread safe.
def namespace=(namespace)
@namespace = namespace
@prefix = "#{namespace}."
end
# @attribute [w] host
+ # Writes are not thread safe.
def host=(host)
@host = host || '127.0.0.1'
end
# @attribute [w] port
+ # Writes are not thread safe.
def port=(port)
@port = port || 8125
end
@@ -134,9 +143,13 @@ def send_stats(stat, delta, type, sample_rate=1)
def send_to_socket(message)
self.class.logger.debug { "Statsd: #{message}" } if self.class.logger
- @socket.send(message, 0, @host, @port)
+ socket.send(message, 0, @host, @port)
rescue => boom
self.class.logger.error { "Statsd: #{boom.class} #{boom}" } if self.class.logger
nil
end
+
+ def socket
+ Thread.current[:statsd_socket] ||= UDPSocket.new
+ end
end
View
63 spec/statsd_spec.rb
@@ -2,16 +2,15 @@
describe Statsd do
class Statsd
- # we need to stub this
- attr_accessor :socket
+ public :socket
end
before do
@statsd = Statsd.new('localhost', 1234)
- @statsd.socket = FakeUDPSocket.new
+ @socket = Thread.current[:statsd_socket] = FakeUDPSocket.new
end
- after { @statsd.socket.clear }
+ after { Thread.current[:statsd_socket] = nil }
describe "#initialize" do
it "should set the host and port" do
@@ -53,14 +52,14 @@ class Statsd
describe "#increment" do
it "should format the message according to the statsd spec" do
@statsd.increment('foobar')
- @statsd.socket.recv.must_equal ['foobar:1|c']
+ @socket.recv.must_equal ['foobar:1|c']
end
describe "with a sample rate" do
before { class << @statsd; def rand; 0; end; end } # ensure delivery
it "should format the message according to the statsd spec" do
@statsd.increment('foobar', 0.5)
- @statsd.socket.recv.must_equal ['foobar:1|c|@0.5']
+ @socket.recv.must_equal ['foobar:1|c|@0.5']
end
end
end
@@ -68,14 +67,14 @@ class Statsd
describe "#decrement" do
it "should format the message according to the statsd spec" do
@statsd.decrement('foobar')
- @statsd.socket.recv.must_equal ['foobar:-1|c']
+ @socket.recv.must_equal ['foobar:-1|c']
end
describe "with a sample rate" do
before { class << @statsd; def rand; 0; end; end } # ensure delivery
it "should format the message according to the statsd spec" do
@statsd.decrement('foobar', 0.5)
- @statsd.socket.recv.must_equal ['foobar:-1|c|@0.5']
+ @socket.recv.must_equal ['foobar:-1|c|@0.5']
end
end
end
@@ -83,16 +82,16 @@ class Statsd
describe "#gauge" do
it "should send a message with a 'g' type, per the nearbuy fork" do
@statsd.gauge('begrutten-suffusion', 536)
- @statsd.socket.recv.must_equal ['begrutten-suffusion:536|g']
+ @socket.recv.must_equal ['begrutten-suffusion:536|g']
@statsd.gauge('begrutten-suffusion', -107.3)
- @statsd.socket.recv.must_equal ['begrutten-suffusion:-107.3|g']
+ @socket.recv.must_equal ['begrutten-suffusion:-107.3|g']
end
describe "with a sample rate" do
before { class << @statsd; def rand; 0; end; end } # ensure delivery
it "should format the message according to the statsd spec" do
@statsd.gauge('begrutten-suffusion', 536, 0.1)
- @statsd.socket.recv.must_equal ['begrutten-suffusion:536|g|@0.1']
+ @socket.recv.must_equal ['begrutten-suffusion:536|g|@0.1']
end
end
end
@@ -100,14 +99,14 @@ class Statsd
describe "#timing" do
it "should format the message according to the statsd spec" do
@statsd.timing('foobar', 500)
- @statsd.socket.recv.must_equal ['foobar:500|ms']
+ @socket.recv.must_equal ['foobar:500|ms']
end
describe "with a sample rate" do
before { class << @statsd; def rand; 0; end; end } # ensure delivery
it "should format the message according to the statsd spec" do
@statsd.timing('foobar', 500, 0.5)
- @statsd.socket.recv.must_equal ['foobar:500|ms|@0.5']
+ @socket.recv.must_equal ['foobar:500|ms|@0.5']
end
end
end
@@ -115,7 +114,7 @@ class Statsd
describe "#time" do
it "should format the message according to the statsd spec" do
@statsd.time('foobar') { sleep(0.001); 'test' }
- @statsd.socket.recv.must_equal ['foobar:1|ms']
+ @socket.recv.must_equal ['foobar:1|ms']
end
it "should return the result of the block" do
@@ -128,7 +127,7 @@ class Statsd
it "should format the message according to the statsd spec" do
result = @statsd.time('foobar', 0.5) { sleep(0.001); 'test' }
- @statsd.socket.recv.must_equal ['foobar:1|ms|@0.5']
+ @socket.recv.must_equal ['foobar:1|ms|@0.5']
end
end
end
@@ -138,7 +137,7 @@ class Statsd
before { class << @statsd; def rand; raise end; end }
it "should send" do
@statsd.timing('foobar', 500, 1)
- @statsd.socket.recv.must_equal ['foobar:500|ms']
+ @socket.recv.must_equal ['foobar:500|ms']
end
end
@@ -146,7 +145,7 @@ class Statsd
before { class << @statsd; def rand; 0; end; end } # ensure delivery
it "should send" do
@statsd.timing('foobar', 500, 0.5)
- @statsd.socket.recv.must_equal ['foobar:500|ms|@0.5']
+ @socket.recv.must_equal ['foobar:500|ms|@0.5']
end
end
@@ -161,7 +160,7 @@ class Statsd
before { class << @statsd; def rand; 0; end; end } # ensure delivery
it "should send" do
@statsd.timing('foobar', 500, 0.5)
- @statsd.socket.recv.must_equal ['foobar:500|ms|@0.5']
+ @socket.recv.must_equal ['foobar:500|ms|@0.5']
end
end
end
@@ -171,22 +170,22 @@ class Statsd
it "should add namespace to increment" do
@statsd.increment('foobar')
- @statsd.socket.recv.must_equal ['service.foobar:1|c']
+ @socket.recv.must_equal ['service.foobar:1|c']
end
it "should add namespace to decrement" do
@statsd.decrement('foobar')
- @statsd.socket.recv.must_equal ['service.foobar:-1|c']
+ @socket.recv.must_equal ['service.foobar:-1|c']
end
it "should add namespace to timing" do
@statsd.timing('foobar', 500)
- @statsd.socket.recv.must_equal ['service.foobar:500|ms']
+ @socket.recv.must_equal ['service.foobar:500|ms']
end
it "should add namespace to gauge" do
@statsd.gauge('foobar', 500)
- @statsd.socket.recv.must_equal ['service.foobar:500|g']
+ @socket.recv.must_equal ['service.foobar:500|g']
end
end
@@ -220,12 +219,12 @@ class Statsd
class Statsd::SomeClass; end
@statsd.increment(Statsd::SomeClass, 1)
- @statsd.socket.recv.must_equal ['Statsd.SomeClass:1|c']
+ @socket.recv.must_equal ['Statsd.SomeClass:1|c']
end
it "should replace statsd reserved chars in the stat name" do
@statsd.increment('ray@hostname.blah|blah.blah:blah', 1)
- @statsd.socket.recv.must_equal ['ray_hostname.blah_blah.blah_blah:1|c']
+ @socket.recv.must_equal ['ray_hostname.blah_blah.blah_blah:1|c']
end
end
@@ -233,7 +232,7 @@ class Statsd::SomeClass; end
before do
require 'stringio'
Statsd.logger = Logger.new(@log = StringIO.new)
- @statsd.socket.instance_eval { def send(*) raise SocketError end }
+ @socket.instance_eval { def send(*) raise SocketError end }
end
it "should ignore socket errors" do
@@ -245,6 +244,20 @@ class Statsd::SomeClass; end
@log.string.must_match 'Statsd: SocketError'
end
end
+
+ describe "thread safety" do
+
+ it "should use a thread local socket" do
+ Thread.current[:statsd_socket].must_equal @socket
+ @statsd.send(:socket).must_equal @socket
+ end
+
+ it "should create a new socket when used in a new thread" do
+ sock = @statsd.send(:socket)
+ Thread.new { Thread.current[:statsd_socket].wont_equal sock }.join
+ end
+
+ end
end
describe Statsd do
Please sign in to comment.
Something went wrong with that request. Please try again.