Skip to content

Commit

Permalink
Merge pull request #198 from ruby-concurrency/promise-all
Browse files Browse the repository at this point in the history
Initial implementation of Promises.all? and Promise.any? class methods.
  • Loading branch information
jdantonio committed Dec 11, 2014
2 parents 67d6631 + 4fb4b7d commit 29cd69a
Show file tree
Hide file tree
Showing 3 changed files with 336 additions and 124 deletions.
123 changes: 0 additions & 123 deletions doc/promise.md

This file was deleted.

220 changes: 219 additions & 1 deletion lib/concurrent/promise.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,167 @@

module Concurrent

# {include:file:doc/promise.md}
PromiseExecutionError = Class.new(StandardError)

# Promises are inspired by the JavaScript [Promises/A](http://wiki.commonjs.org/wiki/Promises/A)
# and [Promises/A+](http://promises-aplus.github.io/promises-spec/) specifications.
#
# > A promise represents the eventual value returned from the single completion of an operation.
#
# Promises are similar to futures and share many of the same behaviours. Promises are far more
# robust, however. Promises can be chained in a tree structure where each promise may have zero
# or more children. Promises are chained using the `then` method. The result of a call to `then`
# is always another promise. Promises are resolved asynchronously (with respect to the main thread)
# but in a strict order: parents are guaranteed to be resolved before their children, children
# before their younger siblings. The `then` method takes two parameters: an optional block to
# be executed upon parent resolution and an optional callable to be executed upon parent failure.
# The result of each promise is passed to each of its children upon resolution. When a promise
# is rejected all its children will be summarily rejected and will receive the reason.
#
# Promises have four possible states: *unscheduled*, *pending*, *rejected*, and *fulfilled*. A
# Promise created using `.new` will be *unscheduled*. It is scheduled by calling the `execute`
# method. Upon execution the Promise and all its children will be set to *pending*. When a promise
# is *pending* it will remain in that state until processing is complete. A completed Promise is
# either *rejected*, indicating that an exception was thrown during processing, or *fulfilled*,
# indicating it succeeded. If a Promise is *fulfilled* its `value` will be updated to reflect
# the result of the operation. If *rejected* the `reason` will be updated with a reference to
# the thrown exception. The predicate methods `unscheduled?`, `pending?`, `rejected?`, and
# `fulfilled?` can be called at any time to obtain the state of the Promise, as can the `state`
# method, which returns a symbol. A Promise created using `.execute` will be *pending*, a Promise
# created using `.fulfill(value)` will be *fulfilled* with the given value and a Promise created
# using `.reject(reason)` will be *rejected* with the given reason.
#
# Retrieving the value of a promise is done through the `value` (alias: `deref`) method. Obtaining
# the value of a promise is a potentially blocking operation. When a promise is *rejected* a call
# to `value` will return `nil` immediately. When a promise is *fulfilled* a call to `value` will
# immediately return the current value. When a promise is *pending* a call to `value` will block
# until the promise is either *rejected* or *fulfilled*. A *timeout* value can be passed to `value`
# to limit how long the call will block. If `nil` the call will block indefinitely. If `0` the call
# will not block. Any other integer or float value will indicate the maximum number of seconds to block.
#
# Promises run on the global thread pool.
#
# ### Examples
#
# Start by requiring promises
#
# ```ruby
# require 'concurrent'
# ```
#
# Then create one
#
# ```ruby
# p = Concurrent::Promise.execute do
# # do something
# 42
# end
# ```
#
# Promises can be chained using the `then` method. The `then` method accepts a block, to be executed
# on fulfillment, and a callable argument to be executed on rejection. The result of the each promise
# is passed as the block argument to chained promises.
#
# ```ruby
# p = Concurrent::Promise.new{10}.then{|x| x * 2}.then{|result| result - 10 }.execute
# ```
#
# And so on, and so on, and so on...
#
# ```ruby
# p = Concurrent::Promise.fulfill(20).
# then{|result| result - 10 }.
# then{|result| result * 3 }.
# then{|result| result % 5 }.execute
# ```
#
# The initial state of a newly created Promise depends on the state of its parent:
# - if parent is *unscheduled* the child will be *unscheduled*
# - if parent is *pending* the child will be *pending*
# - if parent is *fulfilled* the child will be *pending*
# - if parent is *rejected* the child will be *pending* (but will ultimately be *rejected*)
#
# Promises are executed asynchronously from the main thread. By the time a child Promise finishes
# nitialization it may be in a different state that its parent (by the time a child is created its parent
# may have completed execution and changed state). Despite being asynchronous, however, the order of
# execution of Promise objects in a chain (or tree) is strictly defined.
#
# There are multiple ways to create and execute a new `Promise`. Both ways provide identical behavior:
#
# ```ruby
# # create, operate, then execute
# p1 = Concurrent::Promise.new{ "Hello World!" }
# p1.state #=> :unscheduled
# p1.execute
#
# # create and immediately execute
# p2 = Concurrent::Promise.new{ "Hello World!" }.execute
#
# # execute during creation
# p3 = Concurrent::Promise.execute{ "Hello World!" }
# ```
#
# Once the `execute` method is called a `Promise` becomes `pending`:
#
# ```ruby
# p = Concurrent::Promise.execute{ "Hello, world!" }
# p.state #=> :pending
# p.pending? #=> true
# ```
#
# Wait a little bit, and the promise will resolve and provide a value:
#
# ```ruby
# p = Concurrent::Promise.execute{ "Hello, world!" }
# sleep(0.1)
#
# p.state #=> :fulfilled
# p.fulfilled? #=> true
# p.value #=> "Hello, world!"
# ```
#
# If an exception occurs, the promise will be rejected and will provide
# a reason for the rejection:
#
# ```ruby
# p = Concurrent::Promise.execute{ raise StandardError.new("Here comes the Boom!") }
# sleep(0.1)
#
# p.state #=> :rejected
# p.rejected? #=> true
# p.reason #=> "#<StandardError: Here comes the Boom!>"
# ```
#
# #### Rejection
#
# When a promise is rejected all its children will be rejected and will receive the rejection `reason`
# as the rejection callable parameter:
#
# ```ruby
# p = [ Concurrent::Promise.execute{ Thread.pass; raise StandardError } ]
#
# c1 = p.then(Proc.new{ |reason| 42 })
# c2 = p.then(Proc.new{ |reason| raise 'Boom!' })
#
# sleep(0.1)
#
# c1.state #=> :rejected
# c2.state #=> :rejected
# ```
#
# Once a promise is rejected it will continue to accept children that will receive immediately rejection
# (they will be executed asynchronously).
#
# #### Aliases
#
# The `then` method is the most generic alias: it accepts a block to be executed upon parent fulfillment
# and a callable to be executed upon parent rejection. At least one of them should be passed. The default
# block is `{ |result| result }` that fulfills the child with the parent value. The default callable is
# `{ |reason| raise reason }` that rejects the child with the parent reason.
#
# - `on_success { |result| ... }` is the same as `then {|result| ... }`
# - `rescue { |reason| ... }` is the same as `then(Proc.new { |reason| ... } )`
# - `rescue` is aliased by `catch` and `on_error`
class Promise
# TODO unify promise and future to single class, with dataflow
include Obligation
Expand Down Expand Up @@ -168,8 +328,66 @@ def zip(*others)
self.class.zip(self, *others)
end

# Aggregates a collection of promises and executes the `then` condition
# if all aggregated promises succeed. Executes the `rescue` handler with
# a `Concurrent::PromiseExecutionError` if any of the aggregated promises
# fail. Upon execution will execute any of the aggregate promises that
# were not already executed.
#
# @!macro [attach] promise_self_aggregate
#
# The returned promise will not yet have been executed. Additional `#then`
# and `#rescue` handlers may still be provided. Once the returned promise
# is execute the aggregate promises will be also be executed (if they have
# not been executed already). The results of the aggregate promises will
# be checked upon completion. The necessary `#then` and `#rescue` blocks
# on the aggregating promise will then be executed as appropriate. If the
# `#rescue` handlers are executed the raises exception will be
# `Concurrent::PromiseExecutionError`.
#
# @param [Array] promises Zero or more promises to aggregate
# @return [Promise] an unscheduled (not executed) promise that aggregates
# the promises given as arguments
def self.all?(*promises)
aggregate(:all?, *promises)
end

# Aggregates a collection of promises and executes the `then` condition
# if any aggregated promises succeed. Executes the `rescue` handler with
# a `Concurrent::PromiseExecutionError` if any of the aggregated promises
# fail. Upon execution will execute any of the aggregate promises that
# were not already executed.
#
# @!macro promise_self_aggregate
def self.any?(*promises)
aggregate(:any?, *promises)
end

protected

# Aggregate a collection of zero or more promises under a composite promise,
# execute the aggregated promises and collect them into a standard Ruby array,
# call the given Ruby `Ennnumerable` predicate (such as `any?`, `all?`, `none?`,
# or `one?`) on the collection checking for the success or failure of each,
# then executing the composite's `#then` handlers if the predicate returns
# `true` or executing the composite's `#rescue` handlers if the predicate
# returns false.
#
# @!macro promise_self_aggregate
def self.aggregate(method, *promises)
composite = Promise.new do
completed = promises.collect do |promise|
promise.execute if promise.unscheduled?
promise.wait
promise
end
unless completed.empty? || completed.send(method){|promise| promise.fulfilled? }
raise PromiseExecutionError
end
end
composite
end

def set_pending
mutex.synchronize do
@state = :pending
Expand Down
Loading

0 comments on commit 29cd69a

Please sign in to comment.