Skip to content

Better API for future zipping #496

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 18, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 60 additions & 52 deletions lib/concurrent/edge/future.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,23 @@ def schedule(intended_time, default_executor = :io, &task)
ScheduledPromise.new(default_executor, intended_time).future.then(&task)
end

# Constructs new {Future} which is completed after all futures are complete. Its value is array
# of dependent future values. If there is an error it fails with the first one.
# @param [Event] futures
# Constructs new {Future} which is completed after all futures_and_or_events are complete. Its value is array
# of dependent future values. If there is an error it fails with the first one. Event does not
# have a value so it's represented by nil in the array of values.
# @param [Event] futures_and_or_events
# @return [Future]
def zip(*futures)
ZipPromise.new(futures, :io).future
def zip_futures(*futures_and_or_events)
ZipFuturesPromise.new(futures_and_or_events, :io).future
end

alias_method :zip, :zip_futures

# Constructs new {Event} which is completed after all futures_and_or_events are complete
# (Future is completed when Success or Failed).
# @param [Event] futures_and_or_events
# @return [Event]
def zip_events(*futures_and_or_events)
ZipEventsPromise.new(futures_and_or_events, :io).future
end

# Constructs new {Future} which is completed after first of the futures is complete.
Expand All @@ -95,6 +106,7 @@ def any(*futures)
# @return [Future]
def select(*channels)
future do
# noinspection RubyArgCount
Channel.select do |s|
channels.each do |ch|
s.take(ch) { |value| [value, ch] }
Expand Down Expand Up @@ -503,9 +515,9 @@ def apply(block)
# @!visibility private
class PartiallyFailed < CompletedWithResult
def initialize(value, reason)
super()
@Value = value
@Reason = reason
super()
end

def success?
Expand Down Expand Up @@ -670,7 +682,7 @@ def schedule(intended_time)
# Zips with selected value form the suplied channels
# @return [Future]
def then_select(*channels)
ZipPromise.new([self, Concurrent.select(*channels)], @DefaultExecutor).future
ZipFuturesPromise.new([self, Concurrent.select(*channels)], @DefaultExecutor).future
end

# Changes default executor for rest of the chain
Expand Down Expand Up @@ -987,12 +999,16 @@ class InnerPromise < AbstractPromise
# @abstract
# @!visibility private
class BlockedPromise < InnerPromise
def self.new(*args, &block)
promise = super(*args, &block)
promise.blocked_by.each { |f| f.add_callback :pr_callback_notify_blocked, promise }
promise
end

def initialize(future, blocked_by_futures, countdown)
super(future)
initialize_blocked_by(blocked_by_futures)
@Countdown = AtomicFixnum.new countdown

super(future)
@BlockedBy.each { |f| f.add_callback :pr_callback_notify_blocked, self }
end

# @api private
Expand Down Expand Up @@ -1053,9 +1069,9 @@ def on_completable(done_future)
class BlockedTaskPromise < BlockedPromise
def initialize(blocked_by_future, default_executor, executor, &task)
raise ArgumentError, 'no block given' unless block_given?
super Future.new(self, default_executor), blocked_by_future, 1
@Executor = executor
@Task = task
super Future.new(self, default_executor), blocked_by_future, 1
end

def executor
Expand Down Expand Up @@ -1203,8 +1219,8 @@ def on_completable(done_future)
# @!visibility private
class ZipFutureEventPromise < BlockedPromise
def initialize(future, event, default_executor)
@FutureResult = future
super Future.new(self, default_executor), [future, event], 2
@FutureResult = future
end

def on_completable(done_future)
Expand All @@ -1215,9 +1231,9 @@ def on_completable(done_future)
# @!visibility private
class ZipFutureFuturePromise < BlockedPromise
def initialize(future1, future2, default_executor)
super Future.new(self, default_executor), [future1, future2], 2
@Future1Result = future1
@Future2Result = future2
super Future.new(self, default_executor), [future1, future2], 2
end

def on_completable(done_future)
Expand Down Expand Up @@ -1256,62 +1272,54 @@ def on_completable(done_future)
end

# @!visibility private
class ZipPromise < BlockedPromise
class ZipFuturesPromise < BlockedPromise

private

def initialize(blocked_by_futures, default_executor)
klass = Event
blocked_by_futures.each do |f|
if f.is_a?(Future)
if klass == Event
klass = Future
break
end
end
end

# noinspection RubyArgCount
super(klass.new(self, default_executor), blocked_by_futures, blocked_by_futures.size)
super(Future.new(self, default_executor), blocked_by_futures, blocked_by_futures.size)

if blocked_by_futures.empty?
on_completable nil
end
on_completable nil if blocked_by_futures.empty?
end

def on_completable(done_future)
all_success = true
values = []
reasons = []

blocked_by.each do |future|
next unless future.is_a?(Future)
success, value, reason = future.result
values = Array.new(blocked_by.size)
reasons = Array.new(blocked_by.size)

unless success
all_success = false
blocked_by.each_with_index do |future, i|
if future.is_a?(Future)
success, values[i], reasons[i] = future.result
all_success &&= success
else
values[i] = reasons[i] = nil
end

values << value
reasons << reason
end

if all_success
if values.empty?
complete_with Event::COMPLETED
else
if values.size == 1
complete_with Future::Success.new(values.first)
else
complete_with Future::SuccessArray.new(values)
end
end
complete_with Future::SuccessArray.new(values)
else
complete_with Future::PartiallyFailed.new(values, reasons)
end
end
end

# @!visibility private
class ZipEventsPromise < BlockedPromise

private

def initialize(blocked_by_futures, default_executor)
super(Event.new(self, default_executor), blocked_by_futures, blocked_by_futures.size)

on_completable nil if blocked_by_futures.empty?
end

def on_completable(done_future)
complete_with Event::COMPLETED
end
end

# @!visibility private
class AnyPromise < BlockedPromise

Expand Down Expand Up @@ -1354,8 +1362,8 @@ def touch
private

def initialize(default_executor, value)
@Value = value
super Future.new(self, default_executor)
@Value = value
end
end

Expand All @@ -1373,6 +1381,8 @@ def inspect
private

def initialize(default_executor, intended_time)
super Event.new(self, default_executor)

@IntendedTime = intended_time

in_seconds = begin
Expand All @@ -1385,8 +1395,6 @@ def initialize(default_executor, intended_time)
[0, schedule_time.to_f - now.to_f].max
end

super Event.new(self, default_executor)

Concurrent.global_timer_set.post(in_seconds) do
@Future.complete_with Event::COMPLETED
end
Expand Down
39 changes: 36 additions & 3 deletions spec/concurrent/edge/future_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@
specify do
completable_future = Concurrent.future
one = completable_future.then(&:succ)
join = Concurrent.zip(completable_future).then { |v| v }
join = Concurrent.zip_futures(completable_future).then { |v| v }
expect(one.completed?).to be false
completable_future.success 0
expect(one.value!).to eq 1
Expand Down Expand Up @@ -138,25 +138,43 @@
end

describe '.zip' do
it 'continues on first result' do
it 'waits for all results' do
a = Concurrent.future { 1 }
b = Concurrent.future { 2 }
c = Concurrent.future { 3 }

z1 = a & b
z2 = Concurrent.zip a, b, c
z3 = Concurrent.zip a
z4 = Concurrent.zip

expect(z1.value!).to eq [1, 2]
expect(z2.value!).to eq [1, 2, 3]
expect(z3.value!).to eq [1]
expect(z4.value!).to eq []

q = Queue.new
z1.then { |*args| q << args }
expect(q.pop).to eq [1, 2]

z1.then { |a, b, c| q << [a, b, c] }
expect(q.pop).to eq [1, 2, nil]

z2.then { |a, b, c| q << [a, b, c] }
expect(q.pop).to eq [1, 2, 3]

z3.then { |a| q << a }
expect(q.pop).to eq 1

z3.then { |*a| q << a }
expect(q.pop).to eq [1]

z4.then { |a| q << a }
expect(q.pop).to eq nil

z4.then { |*a| q << a }
expect(q.pop).to eq []

expect(z1.then { |a, b| a+b }.value!).to eq 3
expect(z1.then { |a, b| a+b }.value!).to eq 3
expect(z1.then(&:+).value!).to eq 3
Expand Down Expand Up @@ -188,7 +206,22 @@
end

end
end

describe '.zip_events' do
it 'waits for all and returns event' do
a = Concurrent.succeeded_future 1
b = Concurrent.failed_future :any
c = Concurrent.event.complete

z2 = Concurrent.zip_events a, b, c
z3 = Concurrent.zip_events a
z4 = Concurrent.zip_events

expect(z2.completed?).to be_truthy
expect(z3.completed?).to be_truthy
expect(z4.completed?).to be_truthy
end
end

describe 'Future' do
Expand Down Expand Up @@ -340,7 +373,7 @@
f.wait 1
expect(f).to be_completed
expect(f).to be_failed
expect{ f.value! }.to raise_error(Exception, 'fail')
expect { f.value! }.to raise_error(Exception, 'fail')
end
end

Expand Down