Skip to content

Commit

Permalink
trying to get client pool tests working with 1.9.2
Browse files Browse the repository at this point in the history
  • Loading branch information
slyphon committed Jun 7, 2011
1 parent 3b43c06 commit bcec16f
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 52 deletions.
4 changes: 4 additions & 0 deletions .rspec
@@ -0,0 +1,4 @@
--color
--require ./spec/support/logging_progress_bar_formatter.rb
--format Motionbox::LoggingProgressBarFormatter

2 changes: 1 addition & 1 deletion lib/z_k/event_handler.rb
Expand Up @@ -43,7 +43,7 @@ def initialize(zookeeper_client)
# @see ZooKeeper::WatcherEvent
# @see ZooKeeper::EventHandlerSubscription
def register(path, &block)
logger.debug { "EventHandler#register path=#{path.inspect}" }
# logger.debug { "EventHandler#register path=#{path.inspect}" }
EventHandlerSubscription.new(self, path, block).tap do |subscription|
synchronize { @callbacks[path] << subscription }
end
Expand Down
19 changes: 12 additions & 7 deletions lib/z_k/pool.rb
Expand Up @@ -154,12 +154,12 @@ def initialize(host, opts={})
# returns the current number of allocated clients in the pool (not
# available clients)
def size
@connections.length
synchronize { @connections.length }
end

# clients available for checkout (at time of call)
def available_size
@pool.length
synchronize { @pool.length }
end

def checkin(connection)
Expand Down Expand Up @@ -205,6 +205,11 @@ def checkout(blocking=true)
end
end

# @private
def can_grow_pool?
synchronize { @connections.size < @max_clients }
end

protected
def synchronize_with_waiter_count
synchronize do
Expand All @@ -225,15 +230,15 @@ def add_connection!
synchronize do
cnx = create_connection
@connections << cnx
logger.debug { "added connection #{cnx.object_id} to @connections" }

cnx.on_connected { checkin(cnx) }
cnx.on_connected do
logger.debug { "on connected called for cnx #{cnx.object_id}" }
checkin(cnx)
end
end
end

def can_grow_pool?
synchronize { @connections.size < @max_clients }
end

def create_connection
ZK.new(@host, @connection_timeout, @connection_args)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/z_k/threadpool.rb
Expand Up @@ -92,7 +92,7 @@ def spawn_threadpool #:nodoc:
while @running
begin
op = @threadqueue.pop
$stderr.puts "thread #{Thread.current.inspect} got #{op.inspect}"
# $stderr.puts "thread #{Thread.current.inspect} got #{op.inspect}"
break if op == KILL_TOKEN
op.call
rescue Exception => e
Expand Down
67 changes: 24 additions & 43 deletions spec/client_pool_spec.rb
Expand Up @@ -57,33 +57,32 @@
@about_to_block = false

open_th = Thread.new do
@connection_pool.with_connection do |cnx|
@about_to_block = true
# wait for signal to release our connection
release_q.pop
end
@cnx = @connection_pool.checkout(true)
@about_to_block = true
# wait for signal to release our connection
release_q.pop
end

wait_until(2) { @about_to_block }
wait_until(5) { @about_to_block }
@about_to_block.should be_true

release_q.num_waiting.should == 1

closing_th = Thread.new do
@connection_pool.close_all!
end

wait_until(2) { @connection_pool.closing? }
wait_until(5) { @connection_pool.closing? }
@connection_pool.should be_closing
logger.debug { "connection pool is closing" }

lambda { @connection_pool.with_connection { |c| } }.should raise_error(ZK::Exceptions::PoolIsShuttingDownException)

release_q << :ok_let_go

open_th.join(2).should == open_th
open_th.join(5).should == open_th

wait_until(2) { @connection_pool.closed? }
wait_until(5) { @connection_pool.closed? }
$stderr.puts "@connection_pool.pool_state: #{@connection_pool.pool_state.inspect}"

@connection_pool.should be_closed

lambda do
Expand Down Expand Up @@ -180,44 +179,26 @@
# end

it %[should grow if it can] do
q1 = Queue.new

@connection_pool.size.should == 1

th1 = Thread.new do
@connection_pool.with_connection do |cnx|
Thread.current[:cnx] = cnx
q1.pop # block here
end
end

th1.join_until(2) { th1[:cnx] }
th1[:cnx].should_not be_nil

th2 = Thread.new do
@connection_pool.with_connection do |cnx|
Thread.current[:cnx] = cnx
q1.pop
end
end
th = Thread.new do
wait_until(2) { @connection_pool.available_size > 0 }
@connection_pool.available_size.should > 0

th2.join_until(2) { th2[:cnx] }
@connection_pool.size.should == 1

th2[:cnx].should_not be_nil
th2[:cnx].should be_connected
@cnx1 = @connection_pool.checkout
@cnx1.should_not be_false

@connection_pool.size.should == 2
@connection_pool.available_size.should be_zero
@connection_pool.can_grow_pool?.should be_true

2.times { q1.enq(:release_cnx) }
@cnx2 = @connection_pool.checkout
@cnx2.should_not be_false
@cnx2.should be_connected

lambda do
th1.join(1).should == th1
th2.join(1).should == th2
end.should_not raise_error
[@cnx1, @cnx2].each { |c| @connection_pool.checkin(c) }
end

@connection_pool.size.should == 2
@connection_pool.available_size.should == 2

th.join
end

it %[should not grow past max_clients and block] do
Expand Down
14 changes: 14 additions & 0 deletions spec/support/logging_progress_bar_formatter.rb
@@ -0,0 +1,14 @@
require 'rspec/core/formatters/progress_formatter'

module Motionbox
# essentially a monkey-patch to the ProgressBarFormatter, outputs
# '== #{example_proxy.description} ==' in the logs before each test. makes it
# easier to match up tests with the SQL they produce
class LoggingProgressBarFormatter < RSpec::Core::Formatters::ProgressFormatter
def example_started(example)
ZK.logger.info(yellow("\n=====<([ #{example.full_description} ])>=====\n"))
super
end
end
end

0 comments on commit bcec16f

Please sign in to comment.