Skip to content

Commit

Permalink
pause/resume now works at this level! (yay!)
Browse files Browse the repository at this point in the history
  • Loading branch information
slyphon committed May 17, 2012
1 parent a31d5b4 commit 1d5060a
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 15 deletions.
4 changes: 3 additions & 1 deletion Guardfile
@@ -1,6 +1,8 @@
guard 'rspec' do

guard 'rspec', :version => 2, :cli => '-c -f progress --fail-fast' do
watch(%r{^spec/.+_spec.rb$})
watch(%r{^lib/(.+)\.rb$}) { |m| %w[spec/zookeeper_spec.rb spec/chrooted_connection_spec.rb] }
watch(%r{^ext/zookeeper_c.bundle}) { %w[spec/c_zookeeper_spec.rb] }
watch(%r{^ext/zookeeper_base.rb}) { "spec" }
end

4 changes: 2 additions & 2 deletions ext/c_zookeeper.rb
Expand Up @@ -139,13 +139,13 @@ def close
#
# requests may still be added during this time, but they will not be
# processed until you call resume
def pause
def pause_before_fork_in_parent
logger.debug { "#{self.class}##{__method__}" }
@mutex.synchronize { stop_event_thread }
end

# call this if 'pause' was previously called to start the event loop again
def resume
def resume_after_fork_in_parent
logger.debug { "#{self.class}##{__method__}" }

@mutex.synchronize do
Expand Down
25 changes: 24 additions & 1 deletion ext/zookeeper_base.rb
Expand Up @@ -32,7 +32,7 @@ class ClientShutdownException < StandardError; end


def_delegators :@czk, :get_children, :exists, :delete, :get, :set,
:set_acl, :get_acl, :client_id, :sync, :wait_until_connected, :pause, :resume
:set_acl, :get_acl, :client_id, :sync, :wait_until_connected

# some state methods need to be more paranoid about locking to ensure the correct
# state is returned
Expand Down Expand Up @@ -194,6 +194,29 @@ def closed?
@mutex.synchronize { !@czk or @czk.closed? }
end

def pause_before_fork_in_parent
@mutex.synchronize do
logger.debug { "ZookeeperBase#pause_before_fork_in_parent" }

# XXX: add anal-retentive state checking
raise "EXPLODERATE! @czk was nil!" unless @czk

@czk.pause_before_fork_in_parent
stop_dispatch_thread!
end
end

def resume_after_fork_in_parent
@mutex.synchronize do
logger.debug { "ZookeeperBase#resume_after_fork_in_parent" }

raise "EXPLODERATE! @czk was nil!" unless @czk

event_queue.open
setup_dispatch_thread!
@czk.resume_after_fork_in_parent
end
end

protected
# this is a hack: to provide consistency between the C and Java drivers when
Expand Down
6 changes: 3 additions & 3 deletions lib/zookeeper/common.rb
Expand Up @@ -94,7 +94,7 @@ def setup_dispatch_thread!
#
# @dispatcher will be nil when this method exits
#
def stop_dispatch_thread!
def stop_dispatch_thread!(timeout=2)
logger.debug { "#{self.class}##{__method__}" }

if @dispatcher
Expand All @@ -111,8 +111,8 @@ def stop_dispatch_thread!
#
@dispatch_shutdown_cond.wait

# wait for another 2 sec for the thread to join
until @dispatcher.join(2)
# wait for another timeout sec for the thread to join
until @dispatcher.join(timeout)
logger.error { "Dispatch thread did not join cleanly, waiting" }
end
@dispatcher = nil
Expand Down
1 change: 0 additions & 1 deletion lib/zookeeper/common/queue_with_pipe.rb
Expand Up @@ -84,7 +84,6 @@ def graceful_close!
def open
@mutex.lock
begin
return unless @closed
@closed = @graceful = false
@cond.broadcast
ensure
Expand Down
26 changes: 19 additions & 7 deletions spec/forked_connection_spec.rb
Expand Up @@ -14,8 +14,16 @@ def process_alive?(pid)
false
end

before do
LBORDER = ('-' * 35) << '< '
RBORDER = ' >' << ('-' * 35)


def mark(thing)
logger << "\n#{LBORDER}#{thing}#{RBORDER}\n\n"
end

before do
mark "BEFORE: START"
if defined?(::Rubinius)
pending("this test is currently broken in rbx")
# elsif ENV['TRAVIS']
Expand All @@ -24,11 +32,12 @@ def process_alive?(pid)
@zk = Zookeeper.new(connection_string)
rm_rf(@zk, path)
end
logger.debug { "----------------< BEFORE: END >-------------------" }
mark "BEFORE: END"
end

after do
logger.debug { "----------------< AFTER: BEGIN >-------------------" }
mark "AFTER: START"

if @pid and process_alive?(@pid)
begin
Process.kill('KILL', @pid)
Expand All @@ -39,6 +48,8 @@ def process_alive?(pid)

@zk.close if @zk and !@zk.closed?
with_open_zk(connection_string) { |z| rm_rf(z, path) }

mark "AFTER: END"
end

def wait_for_child_safely(pid, timeout=5)
Expand All @@ -56,7 +67,8 @@ def wait_for_child_safely(pid, timeout=5)
end

it %[should do the right thing and not fail] do
logger.debug { "----------------< TEST: BEGIN >-------------------" }
mark "TEST: START"

@zk.wait_until_connected

mkdir_p(@zk, pids_root)
Expand All @@ -75,9 +87,9 @@ def wait_for_child_safely(pid, timeout=5)

@zk.stat(:path => "#{pids_root}/child", :watcher => cb)

logger.debug { "-------------------> FORK <---------------------------" }
@zk.pause_before_fork_in_parent

@zk.pause
mark "FORK"

@pid = fork do
logger.debug { "reopening connection in child: #{$$}" }
Expand All @@ -91,7 +103,7 @@ def wait_for_child_safely(pid, timeout=5)
exit!(0)
end

@zk.resume
@zk.resume_after_fork_in_parent

event_waiter_th = Thread.new do
@latch.await(5) unless @event
Expand Down

0 comments on commit 1d5060a

Please sign in to comment.