Skip to content

Commit

Permalink
wip: First implementation of Runnable mixin. Need to review exception…
Browse files Browse the repository at this point in the history
… handling.
  • Loading branch information
jdantonio committed Oct 5, 2013
1 parent 07030b8 commit fcbcb99
Show file tree
Hide file tree
Showing 5 changed files with 298 additions and 7 deletions.
1 change: 1 addition & 0 deletions lib/concurrent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
require 'concurrent/goroutine'
require 'concurrent/obligation'
require 'concurrent/promise'
require 'concurrent/runnable'
require 'concurrent/supervisor'
require 'concurrent/utilities'

Expand Down
77 changes: 77 additions & 0 deletions lib/concurrent/runnable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
require 'thread'
require 'functional'

behavior_info(:runnable,
run: 0,
stop: 0,
running?: 0)

module Concurrent

module Runnable
behavior(:runnable)

Context = Struct.new(:runner, :thread)
LifecycleError = Class.new(StandardError)

def run
mutex.synchronize do
raise LifecycleError.new('already running') if @running == true
raise NotImplementedError.new('#on_task') unless self.respond_to?(:on_task)
@running = true
on_run if respond_to?(:on_run)
end

begin
loop do
break unless @running
on_task
break unless @running
Thread.pass
end

return true
rescue => ex
@running = false
return false
end
end

def stop
return true unless @running
mutex.synchronize do
on_stop if respond_to?(:on_stop)
@running = false
end
return true
rescue => ex
return false
end

def running?
return @running == true
end

def self.included(base)
class << base
def run!(*args)
context = Context.new
context.runner = self.new(*args)
context.thread = Thread.new(context.runner) do |runner|
Thread.abort_on_exception = false
runner.run
end
return context
rescue => ex
return nil
end
end
end

protected

def mutex
@mutex ||= Mutex.new
end
end
end
5 changes: 1 addition & 4 deletions lib/concurrent/supervisor.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
require 'thread'
require 'functional'

behavior_info(:runnable,
run: 0,
stop: 0,
running?: 0)
require 'concurrent/runnable'

module Concurrent

Expand Down
218 changes: 218 additions & 0 deletions spec/concurrent/runnable_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
require 'spec_helper'

module Concurrent

describe Runnable do

let(:runnable_without_callbacks) do
Class.new {
include Runnable
attr_reader :thread
def on_task
@thread = Thread.current
sleep(0.1)
end
}
end

let(:runnable_with_callbacks) do
Class.new(runnable_without_callbacks) do
def on_run() return true; end
def on_stop() return true; end
end
end

subject { runnable_without_callbacks.new }

after(:each) do
@thread.kill unless @thread.nil?
end

context '#run' do

it 'starts the (blocking) runner on the current thread when stopped' do
@thread = Thread.new { subject.run }
@thread.join(1).should be_nil
end

it 'calls #on_run when implemented' do
runner = runnable_with_callbacks.new
runner.should_receive(:on_run).with(no_args())
@thread = Thread.new { runner.run }
sleep(0.1)
end

it 'does not attempt to call #on_run when not implemented' do
runner = runnable_without_callbacks.new
@thread = Thread.new do
expect {
runner.run
}.not_to raise_error
end
sleep(0.1)
end

it 'raises an exception when already running' do
@thread = Thread.new { subject.run }
sleep(0.1)
expect {
subject.run
}.to raise_error(Runnable::LifecycleError)
end

it 'returns true when stopped normally' do
@expected = false
@thread = Thread.new { @expected = subject.run }
sleep(0.1)
subject.stop
sleep(0.1)
@expected.should be_true
end

it 'raises an exception if the #on_task callback is not implemented' do
runner = Class.new { include Runnable }.new
expect {
runner.run
}.to raise_error(NotImplementedError)
end

it 'calls #on_task in an infinite loop' do
subject.should_receive(:on_task).with(no_args()).at_least(1)
@thread = Thread.new { subject.run }
@thread.join(1)
end

it 'returns false when the task loop raises an exception' do
@expected = false
subject.stub(:on_task).and_raise(StandardError)
@thread = Thread.new { @expected = subject.run }
sleep(0.1)
@expected.should be_false
end

it 'returns false when stopped abnormally' do
@expected = false
subject.stub(:on_stop).and_raise(StandardError)
@thread = Thread.new { @expected = subject.run }
sleep(0.1)
@expected.should be_false
end

it 'raises an exception when it fails to run' do
subject.stub(:on_run).and_raise(StandardError)
@thread = Thread.new { subject.run }
sleep(0.1)
expect {
subject.run
}.to raise_error(StandardError)
end
end

context '#stop' do

it 'calls #on_stop when implemented' do
runner = runnable_with_callbacks.new
runner.should_receive(:on_stop).with(no_args())
@thread = Thread.new { runner.run }
sleep(0.1)
runner.stop
sleep(0.1)
end

it 'does not attempt to call #on_stop when not implemented' do
runner = runnable_without_callbacks.new
@thread = Thread.new { runner.run }
sleep(0.1)
expect {
runner.stop
}.not_to raise_error
end

it 'returns true when not running' do
subject.stop.should be_true
end

it 'returns true when successfully stopped' do
@thread = Thread.new { subject.run }
sleep(0.1)
subject.stop.should be_true
end

it 'returns false when it fails to stop' do
subject.stub(:on_stop).and_raise(StandardError)
@thread = Thread.new { subject.run }
sleep(0.1)
subject.stop.should be_false
end
end

context '#running?' do

it 'returns true when running' do
@thread = Thread.new { subject.run }
sleep(0.1)
subject.should be_running
end

it 'returns false when not running' do
subject.should_not be_running
end

it 'returns false if runner abends' do
subject.stub(:on_task).and_raise(StandardError)
@thread = Thread.new { subject.run }
sleep(0.1)
subject.should_not be_running
end
end

context '#run!' do

let(:runnable) { runnable_without_callbacks }

after(:each) do
@context.runner.stop if @context && @context.runner
@context.thread.kill if @context && @context.thread
end

it 'creates a new runner' do
runnable.should_receive(:new).once.with(no_args())
@context = runnable.run!
sleep(0.1)
end

it 'passes all args to the runner constructor' do
args = [1, 2, :three, :four]
runnable.should_receive(:new).once.with(*args)
@context = runnable.run!(*args)
sleep(0.1)
end

it 'creates a new thread' do
Thread.should_receive(:new).with(any_args()).and_return(nil)
@context = runnable.run!
sleep(0.1)
end

it 'runs the runner on the new thread' do
@context = runnable.run!
sleep(0.1)
@context.runner.thread.should_not eq Thread.current
@context.runner.thread.should eq @context.thread
end

it 'returns a context object on success' do
@context = runnable.run!
sleep(0.1)
@context.should be_a(Runnable::Context)
end

it 'returns nil on failure' do
Thread.stub(:new).with(any_args()).and_raise(StandardError)
@context = runnable.run!
sleep(0.1)
@context.should be_nil
end
end
end
end
4 changes: 1 addition & 3 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@
require 'concurrent'
require 'concurrent/functions'

require 'functional'

# import all the support files
Dir[File.join(File.dirname(__FILE__), 'support/**/*.rb')].each { |f| require File.expand_path(f) }

RSpec.configure do |config|
config.order = 'random'
#config.order = 'random'

config.before(:suite) do
end
Expand Down

0 comments on commit fcbcb99

Please sign in to comment.