Skip to content
This repository has been archived by the owner on Sep 2, 2021. It is now read-only.

Commit

Permalink
Adding support for proper exception handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
nanodeath committed Dec 15, 2012
1 parent 4c3d415 commit 9b60700
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 9 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
1.1.0
=====
Improved exception handling!

1.0.0
=====
No changes from 1.0.0.beta

1.0.0.beta
==========
Removing rcov and jeweler dependencies, fixing rspec at 1.x, adding some
Expand Down
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
threadz (1.0.0.beta)
threadz (1.1.0.rc1)

GEM
remote: http://rubygems.org/
Expand Down
2 changes: 1 addition & 1 deletion lib/threadz.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ def Threadz.dputs(string)

Threadz::dputs("Loading threadz")

['atomic_integer', 'sleeper', 'directive', 'batch', 'thread_pool'].each { |lib| require File.join(File.dirname(__FILE__), 'threadz', lib) }
['atomic_integer', 'sleeper', 'directive', 'batch', 'thread_pool', 'errors'].each { |lib| require File.join(File.dirname(__FILE__), 'threadz', lib) }

34 changes: 32 additions & 2 deletions lib/threadz/batch.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
['atomic_integer', 'sleeper'].each { |lib| require File.join(File.dirname(__FILE__), lib) }
['atomic_integer', 'sleeper', 'errors'].each { |lib| require File.join(File.dirname(__FILE__), lib) }

module Threadz
# A batch is a collection of jobs you care about that gets pushed off to
Expand All @@ -16,6 +16,12 @@ def initialize(threadpool, opts={})
@jobs_count = AtomicInteger.new(0)
@when_done_blocks = []
@sleeper = ::Threadz::Sleeper.new
@error_lock = Mutex.new
@errors = []
@error_handler = opts[:error_handler]
if @error_handler && !@error_handler.respond_to?(:call)
raise ArgumentError.new("ErrorHandler must respond to #call")
end

## Options

Expand Down Expand Up @@ -59,13 +65,24 @@ def wait_until_done(opts={})

timeout = opts.key?(:timeout) ? opts[:timeout] : 0
@sleeper.wait(timeout) unless completed?
errors = self.errors
if !errors.empty? && !@error_handler
raise JobError.new(errors)
end
end

# Returns true iff there are no jobs outstanding.
def completed?
return @jobs_count.value == 0
end

# Returns the list of errors that occurred
def errors
arr = nil
@error_lock.synchronize { arr = @errors.dup }
arr
end

# If this is a latent batch, start processing all of the jobs in the queue.
def start
@job_lock.synchronize { # in case another thread tries to push new jobs onto the queue while we're starting
Expand Down Expand Up @@ -98,7 +115,20 @@ def handle_done

def send_to_threadpool(job)
@threadpool.process do
job.call
control = Control.new
begin
control.reset_retry
job.call
rescue StandardError => e
@error_lock.synchronize { @errors << e }
control.errors << e
if @error_handler
@error_handler.call(e, control)
if control.retry?
retry
end
end
end
# Lock in case we get two threads at the "fork in the road" at the same time
# Note: locking here actually creates undesirable behavior. Still investigating why,
# seems like it should be useful.
Expand Down
25 changes: 25 additions & 0 deletions lib/threadz/control.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
module Threadz
# A control through which to manipulate an individual job
class Control
attr_reader :errors

def initialize
@errors = []
@retry = false
end

def try_again(error_limit=Infinity)
if @errors.size < error_limit
@retry = true
end
end

def retry?
@retry
end

def reset_retry
@retry = false
end
end
end
9 changes: 9 additions & 0 deletions lib/threadz/errors.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module Threadz
class JobError < StandardError
attr_reader :errors
def initialize(errors)
super("One or more jobs failed due to errors (see #errors)")
@errors = errors
end
end
end
6 changes: 4 additions & 2 deletions lib/threadz/thread_pool.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
require 'thread'
['control'].each { |lib| require File.join(File.dirname(__FILE__), lib) }


module Threadz

Expand Down Expand Up @@ -53,7 +55,7 @@ def thread_count
# finishes. If you care about when it finishes, use batches.
def process(callback = nil, &block)
callback ||= block
@queue << callback
@queue << {:control => Control.new, :callback => callback}
nil
end

Expand All @@ -76,7 +78,7 @@ def spawn_thread
end
Thread.pass
begin
x.call
x[:callback].call(x[:control])
rescue StandardError => e
$stderr.puts "Threadz: Error in thread, but restarting with next job: #{e.inspect}\n#{e.backtrace.join("\n")}"
end
Expand Down
2 changes: 1 addition & 1 deletion lib/threadz/version.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module Threadz
VERSION = "1.0.0"
VERSION = "1.1.0.rc1"
end

36 changes: 34 additions & 2 deletions spec/threadz_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@
b = @T.new_batch
b << lambda { i += 1 }
b << lambda { i -= 1 }
b << [lambda { i += 2}, lambda { sleep 0.01 while i < 10 }]
b << [lambda { i += 2}, lambda { sleep 0.1 while i < 10 }]

b.completed?.should be_false

Expand All @@ -145,7 +145,10 @@
b.completed?.should be_false

i = 10
sleep 0.1

5.times do
sleep 1 if !b.completed?
end

b.completed?.should be_true
end
Expand Down Expand Up @@ -218,6 +221,35 @@
b.completed?.should be_true
when_done_executed.should == 10
end

context "when exceptions occur" do
it "should throw on #wait_until_done if no exception handler" do
b = @T.new_batch
b << lambda { raise }
expect { b.wait_until_done }.to raise_error(Threadz::JobError)
end
it "should execute the exception handler when given (and not throw in #wait_until_done)" do
error = nil
b = @T.new_batch :error_handler => lambda { |e, ctrl| error = e }
b << lambda { raise }
b.wait_until_done
error.should_not be_nil
end
it "should retry up to the designated number of times" do
count = 0
b = @T.new_batch :error_handler => lambda { |e, ctrl| count += 1; ctrl.try_again(3) }
b << lambda { raise }
b.wait_until_done
count.should == 3
end
it "should stash exceptions in the #errors field" do
b = @T.new_batch
b.errors.should be_empty
b << lambda { raise }
expect { b.wait_until_done }.to raise_error(Threadz::JobError)
b.errors.should_not be_empty
end
end
end
end
end

0 comments on commit 9b60700

Please sign in to comment.