Skip to content

Commit

Permalink
Implemented Future.execute and Future#execute
Browse files Browse the repository at this point in the history
  • Loading branch information
jdantonio committed Feb 8, 2014
1 parent 7a8a852 commit 3904512
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 47 deletions.
31 changes: 24 additions & 7 deletions lib/concurrent/future.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@ class Future

def initialize(*args, &block)
init_mutex
unless block_given?
@state = :fulfilled
if block_given?
@args = args
@state = :unscheduled
@task = block
else
@value = nil
@state = :pending
Future.thread_pool.post(*args) do
work(*args, &block)
end
@state = :fulfilled
end
end

# Is the future still unscheduled?
# @return [Boolean]
def unscheduled?() return(@state == :unscheduled); end

def add_observer(observer, func = :update)
val = self.value
mutex.synchronize do
Expand All @@ -38,6 +40,21 @@ def add_observer(observer, func = :update)
return func
end

def execute
mutex.synchronize do
return unless @state == :unscheduled
@state = :pending
end
Future.thread_pool.post(*@args) do
work(*@args, &@task)
end
return self
end

def self.execute(&block)
return Future.new(&block).execute
end

private

# @private
Expand Down
10 changes: 3 additions & 7 deletions spec/concurrent/event_machine_defer_proxy_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,10 @@ module Concurrent
it 'supports fulfillment' do

EventMachine.run do

@a = @b = @c = nil
f = Future.new(1, 2, 3) do |a, b, c|
@a, @b, @c = a, b, c
end
@expected = false
f = Future.execute{ @expected = true }
sleep(0.1)
[@a, @b, @c].should eq [1, 2, 3]

@expected.should be_true
sleep(0.1)
EventMachine.stop
end
Expand Down
99 changes: 66 additions & 33 deletions spec/concurrent/future_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ module Concurrent
let!(:rejected_reason) { StandardError.new('mojo jojo') }

let(:pending_subject) do
Future.new{ sleep(3); fulfilled_value }
Future.new{ sleep(3); fulfilled_value }.execute
end

let(:fulfilled_subject) do
Future.new{ fulfilled_value }.tap{ sleep(0.1) }
Future.new{ fulfilled_value }.execute.tap{ sleep(0.1) }
end

let(:rejected_subject) do
Future.new{ raise rejected_reason }.tap{ sleep(0.1) }
Future.new{ raise rejected_reason }.execute.tap{ sleep(0.1) }
end

before(:each) do
Expand All @@ -37,16 +37,18 @@ module Concurrent

context '#initialize' do

it 'sets the state to :unscheduled'
it 'sets the state to :unscheduled' do
Future.new{ nil }.should be_unscheduled
end

it 'does not spawn a new thread when a block is given' do
Future.thread_pool.should_not_receive(:post).once.with(any_args)
Future.thread_pool.should_not_receive(:post).with(any_args)
Thread.should_not_receive(:new).with(any_args)
Future.new{ nil }
end

it 'does not spawn a new thread when no block given' do
Future.thread_pool.should_not_receive(:post).once.with(any_args)
Future.thread_pool.should_not_receive(:post).with(any_args)
Thread.should_not_receive(:new).with(any_args)
Future.new
end
Expand All @@ -62,25 +64,55 @@ module Concurrent

context 'instance #execute' do

it 'does nothing unless the state is :unscheduled'

it 'spawns a new thread when a block was given on construction'
it 'does nothing unless the state is :unscheduled' do
Future.should_not_receive(:thread_pool).with(any_args)
future = Future.new{ nil }
future.instance_variable_set(:@state, :pending)
future.execute
future.instance_variable_set(:@state, :rejected)
future.execute
future.instance_variable_set(:@state, :fulfilled)
future.execute
end

it 'sets the sate to :pending'
it 'spawns a new thread when a block was given on construction' do
Future.thread_pool.should_receive(:post).with(any_args)
future = Future.new{ nil }
future.execute
end

it 'returns self'
it 'sets the sate to :pending' do
future = Future.new{ nil }
future.execute
future.should be_pending
end

it 'returns self' do
future = Future.new{ nil }
future.execute.should eq future
end
end

context 'class #execute' do

it 'creates a new Future'

it 'passes the block to Future'
it 'creates a new Future' do
future = Future.execute{ nil }
future.should be_a(Future)
end

it 'calls #execute on the new Future'
it 'passes the block to the new Future' do
@expected = false
future = Future.execute{ @expected = true }
sleep(0.1)
@expected.should be_true
end

it 'returns the new Future'
it 'calls #execute on the new Future' do
future = Future.new{ nil }
Future.stub(:new).with(any_args).and_return(future)
future.should_receive(:execute).with(no_args)
Future.execute{ nil }
end
end

context 'fulfillment' do
Expand All @@ -90,32 +122,33 @@ module Concurrent
end

it 'passes all arguments to handler' do
result = nil

Future.new(1, 2, 3) do |a, b, c|
result = [a, b, c]
end

result.should eq [1, 2, 3]
@expected = nil
Future.new(1, 2, 3){|a, b, c| @expected = [a, b, c] }.execute
sleep(0.1)
@expected.should eq [1, 2, 3]
end

it 'sets the value to the result of the handler' do
f = Future.new(10){ |a| a * 2 }
f = Future.new(10){ |a| a * 2 }.execute
sleep(0.1)
f.value.should eq 20
end

it 'sets the state to :fulfilled when the block completes' do
f = Future.new(10){ |a| a * 2 }
f = Future.new(10){ |a| a * 2 }.execute
sleep(0.1)
f.should be_fulfilled
end

it 'sets the value to nil when the handler raises an exception' do
f = Future.new{ raise StandardError }
f = Future.new{ raise StandardError }.execute
sleep(0.1)
f.value.should be_nil
end

it 'sets the state to :rejected when the handler raises an exception' do
f = Future.new{ raise StandardError }
f = Future.new{ raise StandardError }.execute
sleep(0.1)
f.should be_rejected
end

Expand Down Expand Up @@ -149,7 +182,7 @@ module Concurrent
let(:observer) { clazz.new }

it 'notifies all observers on fulfillment' do
future = Future.new{ sleep(0.1); 42 }
future = Future.new{ sleep(0.1); 42 }.execute
future.add_observer(observer)
future.value.should == 42
future.reason.should be_nil
Expand All @@ -159,7 +192,7 @@ module Concurrent
end

it 'notifies all observers on rejection' do
future = Future.new{ sleep(0.1); raise StandardError }
future = Future.new{ sleep(0.1); raise StandardError }.execute
future.add_observer(observer)
future.value.should be_nil
future.reason.should be_a(StandardError)
Expand All @@ -169,7 +202,7 @@ module Concurrent
end

it 'notifies an observer added after fulfillment' do
future = Future.new{ 42 }
future = Future.new{ 42 }.execute
sleep(0.1)
future.value.should == 42
future.add_observer(observer)
Expand All @@ -178,7 +211,7 @@ module Concurrent
end

it 'notifies an observer added after rejection' do
future = Future.new{ raise StandardError }
future = Future.new{ raise StandardError }.execute
sleep(0.1)
future.reason.should be_a(StandardError)
future.add_observer(observer)
Expand All @@ -187,7 +220,7 @@ module Concurrent
end

it 'does not notify existing observers when a new observer added after fulfillment' do
future = Future.new{ 42 }
future = Future.new{ 42 }.execute
future.add_observer(observer)
sleep(0.1)
future.value.should == 42
Expand All @@ -202,7 +235,7 @@ module Concurrent
end

it 'does not notify existing observers when a new observer added after rejection' do
future = Future.new{ raise StandardError }
future = Future.new{ raise StandardError }.execute
future.add_observer(observer)
sleep(0.1)
future.reason.should be_a(StandardError)
Expand Down

0 comments on commit 3904512

Please sign in to comment.