From 26ae8887fd5d955c5bb4d4e834403ca355539a66 Mon Sep 17 00:00:00 2001 From: Jerry D'Antonio Date: Wed, 26 Nov 2014 13:35:28 -0500 Subject: [PATCH 1/3] Initial implementation of Promises.all, .any, .none, and .one class methods. --- doc/promise.md | 123 ----------------- lib/concurrent/promise.rb | 206 ++++++++++++++++++++++++++++- spec/concurrent/promise_spec.rb | 227 ++++++++++++++++++++++++++++++++ 3 files changed, 432 insertions(+), 124 deletions(-) delete mode 100644 doc/promise.md diff --git a/doc/promise.md b/doc/promise.md deleted file mode 100644 index 87005a9b6..000000000 --- a/doc/promise.md +++ /dev/null @@ -1,123 +0,0 @@ -Promises are inspired by the JavaScript [Promises/A](http://wiki.commonjs.org/wiki/Promises/A) and [Promises/A+](http://promises-aplus.github.io/promises-spec/) specifications. - -> A promise represents the eventual value returned from the single completion of an operation. - -Promises are similar to futures and share many of the same behaviours. Promises are far more robust, however. Promises can be chained in a tree structure where each promise may have zero or more children. Promises are chained using the `then` method. The result of a call to `then` is always another promise. Promises are resolved asynchronously (with respect to the main thread) but in a strict order: parents are guaranteed to be resolved before their children, children before their younger siblings. The `then` method takes two parameters: an optional block to be executed upon parent resolution and an optional callable to be executed upon parent failure. The result of each promise is passed to each of its children upon resolution. When a promise is rejected all its children will be summarily rejected and will receive the reason. - -Promises have four possible states: *unscheduled*, *pending*, *rejected*, and *fulfilled*. A Promise created using `.new` will be *unscheduled*. It is scheduled by calling the `execute` method. Upon execution the Promise and all its children will be set to *pending*. When a promise is *pending* it will remain in that state until processing is complete. A completed Promise is either *rejected*, indicating that an exception was thrown during processing, or *fulfilled*, indicating it succeeded. If a Promise is *fulfilled* its `value` will be updated to reflect the result of the operation. If *rejected* the `reason` will be updated with a reference to the thrown exception. The predicate methods `unscheduled?`, `pending?`, `rejected?`, and `fulfilled?` can be called at any time to obtain the state of the Promise, as can the `state` method, which returns a symbol. A Promise created using `.execute` will be *pending*, a Promise created using `.fulfill(value)` will be *fulfilled* with the given value and a Promise created using `.reject(reason)` will be *rejected* with the given reason. - -Retrieving the value of a promise is done through the `value` (alias: `deref`) method. Obtaining the value of a promise is a potentially blocking operation. When a promise is *rejected* a call to `value` will return `nil` immediately. When a promise is *fulfilled* a call to `value` will immediately return the current value. When a promise is *pending* a call to `value` will block until the promise is either *rejected* or *fulfilled*. A *timeout* value can be passed to `value` to limit how long the call will block. If `nil` the call will block indefinitely. If `0` the call will not block. Any other integer or float value will indicate the maximum number of seconds to block. - -Promises run on the global thread pool. - -### Examples - -Start by requiring promises - -```ruby -require 'concurrent' -``` - -Then create one - -```ruby -p = Promise.execute do - # do something - 42 - end -``` - -Promises can be chained using the `then` method. The `then` method accepts a block, to be executed on fulfillment, and a callable argument to be executed on rejection. The result of the each promise is passed as the block argument to chained promises. - -```ruby -p = Concurrent::Promise.new{10}.then{|x| x * 2}.then{|result| result - 10 }.execute -``` - -And so on, and so on, and so on... - -```ruby -p = Concurrent::Promise.fulfill(20). - then{|result| result - 10 }. - then{|result| result * 3 }. - then{|result| result % 5 }.execute -``` - -The initial state of a newly created Promise depends on the state of its parent: -- if parent is *unscheduled* the child will be *unscheduled* -- if parent is *pending* the child will be *pending* -- if parent is *fulfilled* the child will be *pending* -- if parent is *rejected* the child will be *pending* (but will ultimately be *rejected*) - -Promises are executed asynchronously from the main thread. By the time a child Promise finishes initialization it may be in a different state that its parent (by the time a child is created its parent may have completed execution and changed state). Despite being asynchronous, however, the order of execution of Promise objects in a chain (or tree) is strictly defined. - -There are multiple ways to create and execute a new `Promise`. Both ways provide identical behavior: - -```ruby -# create, operate, then execute -p1 = Concurrent::Promise.new{ "Hello World!" } -p1.state #=> :unscheduled -p1.execute - -# create and immediately execute -p2 = Concurrent::Promise.new{ "Hello World!" }.execute - -# execute during creation -p3 = Concurrent::Promise.execute{ "Hello World!" } -``` - -Once the `execute` method is called a `Promise` becomes `pending`: - -```ruby -p = Concurrent::Promise.execute{ "Hello, world!" } -p.state #=> :pending -p.pending? #=> true -``` - -Wait a little bit, and the promise will resolve and provide a value: - -```ruby -p = Concurrent::Promise.execute{ "Hello, world!" } -sleep(0.1) - -p.state #=> :fulfilled -p.fulfilled? #=> true -p.value #=> "Hello, world!" -``` - -If an exception occurs, the promise will be rejected and will provide -a reason for the rejection: - -```ruby -p = Concurrent::Promise.execute{ raise StandardError.new("Here comes the Boom!") } -sleep(0.1) - -p.state #=> :rejected -p.rejected? #=> true -p.reason #=> "#" -``` - -#### Rejection - -When a promise is rejected all its children will be rejected and will receive the rejection `reason` as the rejection callable parameter: - -```ruby -p = [ Concurrent::Promise.execute{ Thread.pass; raise StandardError } ] - -c1 = p.then(Proc.new{ |reason| 42 }) -c2 = p.then(Proc.new{ |reason| raise 'Boom!' }) - -sleep(0.1) - -c1.state #=> :rejected -c2.state #=> :rejected -``` - -Once a promise is rejected it will continue to accept children that will receive immediately rejection (they will be executed asynchronously). - -#### Aliases - -The `then` method is the most generic alias: it accepts a block to be executed upon parent fulfillment and a callable to be executed upon parent rejection. At least one of them should be passed. The default block is `{ |result| result }` that fulfills the child with the parent value. The default callable is `{ |reason| raise reason }` that rejects the child with the parent reason. - -- `on_success { |result| ... }` is the same as `then {|result| ... }` -- `rescue { |reason| ... }` is the same as `then(Proc.new { |reason| ... } )` -- `rescue` is aliased by `catch` and `on_error` diff --git a/lib/concurrent/promise.rb b/lib/concurrent/promise.rb index eeba72583..acc6a8daf 100644 --- a/lib/concurrent/promise.rb +++ b/lib/concurrent/promise.rb @@ -5,7 +5,131 @@ module Concurrent - # {include:file:doc/promise.md} + PromiseExecutionError = Class.new(StandardError) + + # Promises are inspired by the JavaScript [Promises/A](http://wiki.commonjs.org/wiki/Promises/A) and [Promises/A+](http://promises-aplus.github.io/promises-spec/) specifications. + # + # > A promise represents the eventual value returned from the single completion of an operation. + # + # Promises are similar to futures and share many of the same behaviours. Promises are far more robust, however. Promises can be chained in a tree structure where each promise may have zero or more children. Promises are chained using the `then` method. The result of a call to `then` is always another promise. Promises are resolved asynchronously (with respect to the main thread) but in a strict order: parents are guaranteed to be resolved before their children, children before their younger siblings. The `then` method takes two parameters: an optional block to be executed upon parent resolution and an optional callable to be executed upon parent failure. The result of each promise is passed to each of its children upon resolution. When a promise is rejected all its children will be summarily rejected and will receive the reason. + # + # Promises have four possible states: *unscheduled*, *pending*, *rejected*, and *fulfilled*. A Promise created using `.new` will be *unscheduled*. It is scheduled by calling the `execute` method. Upon execution the Promise and all its children will be set to *pending*. When a promise is *pending* it will remain in that state until processing is complete. A completed Promise is either *rejected*, indicating that an exception was thrown during processing, or *fulfilled*, indicating it succeeded. If a Promise is *fulfilled* its `value` will be updated to reflect the result of the operation. If *rejected* the `reason` will be updated with a reference to the thrown exception. The predicate methods `unscheduled?`, `pending?`, `rejected?`, and `fulfilled?` can be called at any time to obtain the state of the Promise, as can the `state` method, which returns a symbol. A Promise created using `.execute` will be *pending*, a Promise created using `.fulfill(value)` will be *fulfilled* with the given value and a Promise created using `.reject(reason)` will be *rejected* with the given reason. + # + # Retrieving the value of a promise is done through the `value` (alias: `deref`) method. Obtaining the value of a promise is a potentially blocking operation. When a promise is *rejected* a call to `value` will return `nil` immediately. When a promise is *fulfilled* a call to `value` will immediately return the current value. When a promise is *pending* a call to `value` will block until the promise is either *rejected* or *fulfilled*. A *timeout* value can be passed to `value` to limit how long the call will block. If `nil` the call will block indefinitely. If `0` the call will not block. Any other integer or float value will indicate the maximum number of seconds to block. + # + # Promises run on the global thread pool. + # + # ### Examples + # + # Start by requiring promises + # + # ```ruby + # require 'concurrent' + # ``` + # + # Then create one + # + # ```ruby + # p = Promise.execute do + # # do something + # 42 + # end + # ``` + # + # Promises can be chained using the `then` method. The `then` method accepts a block, to be executed on fulfillment, and a callable argument to be executed on rejection. The result of the each promise is passed as the block argument to chained promises. + # + # ```ruby + # p = Concurrent::Promise.new{10}.then{|x| x * 2}.then{|result| result - 10 }.execute + # ``` + # + # And so on, and so on, and so on... + # + # ```ruby + # p = Concurrent::Promise.fulfill(20). + # then{|result| result - 10 }. + # then{|result| result * 3 }. + # then{|result| result % 5 }.execute + # ``` + # + # The initial state of a newly created Promise depends on the state of its parent: + # - if parent is *unscheduled* the child will be *unscheduled* + # - if parent is *pending* the child will be *pending* + # - if parent is *fulfilled* the child will be *pending* + # - if parent is *rejected* the child will be *pending* (but will ultimately be *rejected*) + # + # Promises are executed asynchronously from the main thread. By the time a child Promise finishes initialization it may be in a different state that its parent (by the time a child is created its parent may have completed execution and changed state). Despite being asynchronous, however, the order of execution of Promise objects in a chain (or tree) is strictly defined. + # + # There are multiple ways to create and execute a new `Promise`. Both ways provide identical behavior: + # + # ```ruby + # # create, operate, then execute + # p1 = Concurrent::Promise.new{ "Hello World!" } + # p1.state #=> :unscheduled + # p1.execute + # + # # create and immediately execute + # p2 = Concurrent::Promise.new{ "Hello World!" }.execute + # + # # execute during creation + # p3 = Concurrent::Promise.execute{ "Hello World!" } + # ``` + # + # Once the `execute` method is called a `Promise` becomes `pending`: + # + # ```ruby + # p = Concurrent::Promise.execute{ "Hello, world!" } + # p.state #=> :pending + # p.pending? #=> true + # ``` + # + # Wait a little bit, and the promise will resolve and provide a value: + # + # ```ruby + # p = Concurrent::Promise.execute{ "Hello, world!" } + # sleep(0.1) + # + # p.state #=> :fulfilled + # p.fulfilled? #=> true + # p.value #=> "Hello, world!" + # ``` + # + # If an exception occurs, the promise will be rejected and will provide + # a reason for the rejection: + # + # ```ruby + # p = Concurrent::Promise.execute{ raise StandardError.new("Here comes the Boom!") } + # sleep(0.1) + # + # p.state #=> :rejected + # p.rejected? #=> true + # p.reason #=> "#" + # ``` + # + # #### Rejection + # + # When a promise is rejected all its children will be rejected and will receive the rejection `reason` as the rejection callable parameter: + # + # ```ruby + # p = [ Concurrent::Promise.execute{ Thread.pass; raise StandardError } ] + # + # c1 = p.then(Proc.new{ |reason| 42 }) + # c2 = p.then(Proc.new{ |reason| raise 'Boom!' }) + # + # sleep(0.1) + # + # c1.state #=> :rejected + # c2.state #=> :rejected + # ``` + # + # Once a promise is rejected it will continue to accept children that will receive immediately rejection (they will be executed asynchronously). + # + # #### Aliases + # + # The `then` method is the most generic alias: it accepts a block to be executed upon parent fulfillment and a callable to be executed upon parent rejection. At least one of them should be passed. The default block is `{ |result| result }` that fulfills the child with the parent value. The default callable is `{ |reason| raise reason }` that rejects the child with the parent reason. + # + # - `on_success { |result| ... }` is the same as `then {|result| ... }` + # - `rescue { |reason| ... }` is the same as `then(Proc.new { |reason| ... } )` + # - `rescue` is aliased by `catch` and `on_error` class Promise # TODO unify promise and future to single class, with dataflow include Obligation @@ -168,6 +292,86 @@ def zip(*others) self.class.zip(self, *others) end + # Aggregate a collection of zero or more promises under a composite promise, + # execute the aggregated promises and collect them into a standard Ruby array, + # call the given Ruby `Ennnumerable` predicate (such as `any?`, `all?`, `none?`, + # or `one?`) on the collection checking for the success or failure of each, + # then executing the composite's `#then` handlers if the predicate returns + # `true` or executing the composite's `#rescue` handlers if the predicate + # returns false. + # + # @!macro [attach] promise_self_aggregate + # + # The returned promise will not yet have been executed. Additional `#then` + # and `#rescue` handlers may still be provided. Once the returned promise + # is execute the aggregate promises will be also be executed (if they have + # not been executed already). The results of the aggregate promises will + # be checked upon completion. The necessary `#then` and `#rescue` blocks + # on the aggregating promise will then be executed as appropriate. If the + # `#rescue` handlers are executed the raises exception will be + # `Concurrent::PromiseExecutionError`. + # + # @param [Array] promises Zero or more promises to aggregate + # @return [Promise] an unscheduled (not executed) promise that aggregates + # the promises given as arguments + def self.aggregate(method, *promises) + composite = Promise.new do + completed = promises.collect do |promise| + promise.execute if promise.unscheduled? + promise.wait + promise + end + unless completed.empty? || completed.send(method){|promise| promise.fulfilled? } + raise PromiseExecutionError + end + end + composite + end + + # Aggregates a collection of promises and executes the `then` condition + # if all aggregated promises succeed. Executes the `rescue` handler with + # a `Concurrent::PromiseExecutionError` if any of the aggregated promises + # fail. Upon execution will execute any of the aggregate promises that + # were not already executed. + # + # @!macro promise_self_aggregate + def self.all(*promises) + aggregate(:all?, *promises) + end + + # Aggregates a collection of promises and executes the `then` condition + # if any aggregated promises succeed. Executes the `rescue` handler with + # a `Concurrent::PromiseExecutionError` if any of the aggregated promises + # fail. Upon execution will execute any of the aggregate promises that + # were not already executed. + # + # @!macro promise_self_aggregate + def self.any(*promises) + aggregate(:any?, *promises) + end + + # Aggregates a collection of promises and executes the `then` condition + # if all aggregated promises fail. Executes the `rescue` handler with + # a `Concurrent::PromiseExecutionError` if any of the aggregated promises + # succeed. Upon execution will execute any of the aggregate promises that + # were not already executed. + # + # @!macro promise_self_aggregate + def self.none(*promises) + aggregate(:none?, *promises) + end + + # Aggregates a collection of promises and executes the `then` condition + # if one and only one of the aggregated promises succeeds. Executes the + # `rescue` handler with a `Concurrent::PromiseExecutionError` more than one + # of the aggregated promises succeed. Upon execution will execute any of + # the aggregate promises that were not already executed. + # + # @!macro promise_self_aggregate + def self.one(*promises) + aggregate(:one?, *promises) + end + protected def set_pending diff --git a/spec/concurrent/promise_spec.rb b/spec/concurrent/promise_spec.rb index b62f6bef4..52e8fa5ed 100644 --- a/spec/concurrent/promise_spec.rb +++ b/spec/concurrent/promise_spec.rb @@ -333,6 +333,233 @@ module Concurrent end end + describe 'aggregators' do + + let(:promise1) { Promise.new(executor: executor) { 1 } } + let(:promise2) { Promise.new(executor: executor) { 2 } } + let(:promise3) { Promise.new(executor: executor) { [3] } } + + describe '.all' do + + it 'returns a new Promise' do + composite = Promise.all(promise1, promise2, promise3).execute + expect(composite).to be_a Concurrent::Promise + end + + it 'does not execute the returned Promise' do + composite = Promise.all(promise1, promise2, promise3) + expect(composite).to be_unscheduled + end + + it 'executes the #then condition when all components succeed' do + counter = Concurrent::AtomicFixnum.new(0) + latch = Concurrent::CountDownLatch.new(1) + + composite = Promise.all(promise1, promise2, promise3). + then { counter.up; latch.count_down }. + rescue { counter.down; latch.count_down }. + execute + + latch.wait(1) + + expect(counter.value).to eq 1 + end + + it 'executes the #then condition when no promises are given' do + counter = Concurrent::AtomicFixnum.new(0) + latch = Concurrent::CountDownLatch.new(1) + + composite = Promise.all. + then { counter.up; latch.count_down }. + rescue { counter.down; latch.count_down }. + execute + + latch.wait(1) + + expect(counter.value).to eq 1 + end + + it 'executes the #rescue handler if even one component fails' do + counter = Concurrent::AtomicFixnum.new(0) + latch = Concurrent::CountDownLatch.new(1) + + composite = Promise.all(promise1, promise2, rejected_subject, promise3). + then { counter.up; latch.count_down }. + rescue { counter.down; latch.count_down }. + execute + + latch.wait(1) + + expect(counter.value).to eq -1 + end + end + + describe '.any' do + + it 'returns a new Promise' do + composite = Promise.any(promise1, promise2, promise3).execute + expect(composite).to be_a Concurrent::Promise + end + + it 'does not execute the returned Promise' do + composite = Promise.any(promise1, promise2, promise3) + expect(composite).to be_unscheduled + end + + it 'executes the #then condition when any components succeed' do + counter = Concurrent::AtomicFixnum.new(0) + latch = Concurrent::CountDownLatch.new(1) + + composite = Promise.any(promise1, promise2, rejected_subject, promise3). + then { counter.up; latch.count_down }. + rescue { counter.down; latch.count_down }. + execute + + latch.wait(1) + + expect(counter.value).to eq 1 + end + + it 'executes the #then condition when no promises are given' do + counter = Concurrent::AtomicFixnum.new(0) + latch = Concurrent::CountDownLatch.new(1) + + composite = Promise.any. + then { counter.up; latch.count_down }. + rescue { counter.down; latch.count_down }. + execute + + latch.wait(1) + + expect(counter.value).to eq 1 + end + + it 'executes the #rescue handler if all componenst fail' do + counter = Concurrent::AtomicFixnum.new(0) + latch = Concurrent::CountDownLatch.new(1) + + composite = Promise.any(rejected_subject, rejected_subject, rejected_subject, rejected_subject). + then { counter.up; latch.count_down }. + rescue { counter.down; latch.count_down }. + execute + + latch.wait(1) + + expect(counter.value).to eq -1 + end + end + + describe '.none' do + + it 'returns a new Promise' do + composite = Promise.none(promise1, promise2, promise3).execute + expect(composite).to be_a Concurrent::Promise + end + + it 'does not execute the returned Promise' do + composite = Promise.none(promise1, promise2, promise3) + expect(composite).to be_unscheduled + end + + it 'executes the #then condition when all components fail' do + counter = Concurrent::AtomicFixnum.new(0) + latch = Concurrent::CountDownLatch.new(1) + + composite = Promise.none(rejected_subject, rejected_subject, rejected_subject, rejected_subject). + then { counter.up; latch.count_down }. + rescue { counter.down; latch.count_down }. + execute + + latch.wait(1) + + expect(counter.value).to eq 1 + end + + it 'executes the #then condition when no promises are given' do + counter = Concurrent::AtomicFixnum.new(0) + latch = Concurrent::CountDownLatch.new(1) + + composite = Promise.none. + then { counter.up; latch.count_down }. + rescue { counter.down; latch.count_down }. + execute + + latch.wait(1) + + expect(counter.value).to eq 1 + end + + it 'executes the #rescue handler if even one component succeeds' do + counter = Concurrent::AtomicFixnum.new(0) + latch = Concurrent::CountDownLatch.new(1) + + composite = Promise.none(promise1, promise2, rejected_subject, promise3). + then { counter.up; latch.count_down }. + rescue { counter.down; latch.count_down }. + execute + + latch.wait(1) + + expect(counter.value).to eq -1 + end + end + + describe '.one' do + + it 'returns a new Promise' do + composite = Promise.one(promise1, promise2, promise3).execute + expect(composite).to be_a Concurrent::Promise + end + + it 'does not execute the returned Promise' do + composite = Promise.one(promise1, promise2, promise3) + expect(composite).to be_unscheduled + end + + it 'executes the #then condition when only one component succeeds' do + counter = Concurrent::AtomicFixnum.new(0) + latch = Concurrent::CountDownLatch.new(1) + + composite = Promise.one(promise1, rejected_subject, rejected_subject, rejected_subject). + then { counter.up; latch.count_down }. + rescue { counter.down; latch.count_down }. + execute + + latch.wait(1) + + expect(counter.value).to eq 1 + end + + it 'executes the #then condition when no promises are given' do + counter = Concurrent::AtomicFixnum.new(0) + latch = Concurrent::CountDownLatch.new(1) + + composite = Promise.one. + then { counter.up; latch.count_down }. + rescue { counter.down; latch.count_down }. + execute + + latch.wait(1) + + expect(counter.value).to eq 1 + end + + it 'executes the #rescue handler if two or more components succeed' do + counter = Concurrent::AtomicFixnum.new(0) + latch = Concurrent::CountDownLatch.new(1) + + composite = Promise.one(promise1, promise2, rejected_subject, promise3). + then { counter.up; latch.count_down }. + rescue { counter.down; latch.count_down }. + execute + + latch.wait(1) + + expect(counter.value).to eq -1 + end + end + end + context 'fulfillment' do it 'passes the result of each block to all its children' do From 2fe40b906ee16de2d34f65e84de7a4717f310dee Mon Sep 17 00:00:00 2001 From: Jerry D'Antonio Date: Sat, 6 Dec 2014 23:09:55 -0500 Subject: [PATCH 2/3] Renamed new Promise methods to all?, any?, none?, and on? (added question marks). --- lib/concurrent/promise.rb | 124 ++++++++++++++++++++------------ spec/concurrent/promise_spec.rb | 48 ++++++------- 2 files changed, 104 insertions(+), 68 deletions(-) diff --git a/lib/concurrent/promise.rb b/lib/concurrent/promise.rb index acc6a8daf..e7d162b43 100644 --- a/lib/concurrent/promise.rb +++ b/lib/concurrent/promise.rb @@ -7,15 +7,41 @@ module Concurrent PromiseExecutionError = Class.new(StandardError) - # Promises are inspired by the JavaScript [Promises/A](http://wiki.commonjs.org/wiki/Promises/A) and [Promises/A+](http://promises-aplus.github.io/promises-spec/) specifications. + # Promises are inspired by the JavaScript [Promises/A](http://wiki.commonjs.org/wiki/Promises/A) + # and [Promises/A+](http://promises-aplus.github.io/promises-spec/) specifications. # # > A promise represents the eventual value returned from the single completion of an operation. # - # Promises are similar to futures and share many of the same behaviours. Promises are far more robust, however. Promises can be chained in a tree structure where each promise may have zero or more children. Promises are chained using the `then` method. The result of a call to `then` is always another promise. Promises are resolved asynchronously (with respect to the main thread) but in a strict order: parents are guaranteed to be resolved before their children, children before their younger siblings. The `then` method takes two parameters: an optional block to be executed upon parent resolution and an optional callable to be executed upon parent failure. The result of each promise is passed to each of its children upon resolution. When a promise is rejected all its children will be summarily rejected and will receive the reason. - # - # Promises have four possible states: *unscheduled*, *pending*, *rejected*, and *fulfilled*. A Promise created using `.new` will be *unscheduled*. It is scheduled by calling the `execute` method. Upon execution the Promise and all its children will be set to *pending*. When a promise is *pending* it will remain in that state until processing is complete. A completed Promise is either *rejected*, indicating that an exception was thrown during processing, or *fulfilled*, indicating it succeeded. If a Promise is *fulfilled* its `value` will be updated to reflect the result of the operation. If *rejected* the `reason` will be updated with a reference to the thrown exception. The predicate methods `unscheduled?`, `pending?`, `rejected?`, and `fulfilled?` can be called at any time to obtain the state of the Promise, as can the `state` method, which returns a symbol. A Promise created using `.execute` will be *pending*, a Promise created using `.fulfill(value)` will be *fulfilled* with the given value and a Promise created using `.reject(reason)` will be *rejected* with the given reason. - # - # Retrieving the value of a promise is done through the `value` (alias: `deref`) method. Obtaining the value of a promise is a potentially blocking operation. When a promise is *rejected* a call to `value` will return `nil` immediately. When a promise is *fulfilled* a call to `value` will immediately return the current value. When a promise is *pending* a call to `value` will block until the promise is either *rejected* or *fulfilled*. A *timeout* value can be passed to `value` to limit how long the call will block. If `nil` the call will block indefinitely. If `0` the call will not block. Any other integer or float value will indicate the maximum number of seconds to block. + # Promises are similar to futures and share many of the same behaviours. Promises are far more + # robust, however. Promises can be chained in a tree structure where each promise may have zero + # or more children. Promises are chained using the `then` method. The result of a call to `then` + # is always another promise. Promises are resolved asynchronously (with respect to the main thread) + # but in a strict order: parents are guaranteed to be resolved before their children, children + # before their younger siblings. The `then` method takes two parameters: an optional block to + # be executed upon parent resolution and an optional callable to be executed upon parent failure. + # The result of each promise is passed to each of its children upon resolution. When a promise + # is rejected all its children will be summarily rejected and will receive the reason. + # + # Promises have four possible states: *unscheduled*, *pending*, *rejected*, and *fulfilled*. A + # Promise created using `.new` will be *unscheduled*. It is scheduled by calling the `execute` + # method. Upon execution the Promise and all its children will be set to *pending*. When a promise + # is *pending* it will remain in that state until processing is complete. A completed Promise is + # either *rejected*, indicating that an exception was thrown during processing, or *fulfilled*, + # indicating it succeeded. If a Promise is *fulfilled* its `value` will be updated to reflect + # the result of the operation. If *rejected* the `reason` will be updated with a reference to + # the thrown exception. The predicate methods `unscheduled?`, `pending?`, `rejected?`, and + # `fulfilled?` can be called at any time to obtain the state of the Promise, as can the `state` + # method, which returns a symbol. A Promise created using `.execute` will be *pending*, a Promise + # created using `.fulfill(value)` will be *fulfilled* with the given value and a Promise created + # using `.reject(reason)` will be *rejected* with the given reason. + # + # Retrieving the value of a promise is done through the `value` (alias: `deref`) method. Obtaining + # the value of a promise is a potentially blocking operation. When a promise is *rejected* a call + # to `value` will return `nil` immediately. When a promise is *fulfilled* a call to `value` will + # immediately return the current value. When a promise is *pending* a call to `value` will block + # until the promise is either *rejected* or *fulfilled*. A *timeout* value can be passed to `value` + # to limit how long the call will block. If `nil` the call will block indefinitely. If `0` the call + # will not block. Any other integer or float value will indicate the maximum number of seconds to block. # # Promises run on the global thread pool. # @@ -30,13 +56,15 @@ module Concurrent # Then create one # # ```ruby - # p = Promise.execute do + # p = Concurrent::Promise.execute do # # do something # 42 # end # ``` # - # Promises can be chained using the `then` method. The `then` method accepts a block, to be executed on fulfillment, and a callable argument to be executed on rejection. The result of the each promise is passed as the block argument to chained promises. + # Promises can be chained using the `then` method. The `then` method accepts a block, to be executed + # on fulfillment, and a callable argument to be executed on rejection. The result of the each promise + # is passed as the block argument to chained promises. # # ```ruby # p = Concurrent::Promise.new{10}.then{|x| x * 2}.then{|result| result - 10 }.execute @@ -57,7 +85,10 @@ module Concurrent # - if parent is *fulfilled* the child will be *pending* # - if parent is *rejected* the child will be *pending* (but will ultimately be *rejected*) # - # Promises are executed asynchronously from the main thread. By the time a child Promise finishes initialization it may be in a different state that its parent (by the time a child is created its parent may have completed execution and changed state). Despite being asynchronous, however, the order of execution of Promise objects in a chain (or tree) is strictly defined. + # Promises are executed asynchronously from the main thread. By the time a child Promise finishes + # nitialization it may be in a different state that its parent (by the time a child is created its parent + # may have completed execution and changed state). Despite being asynchronous, however, the order of + # execution of Promise objects in a chain (or tree) is strictly defined. # # There are multiple ways to create and execute a new `Promise`. Both ways provide identical behavior: # @@ -107,7 +138,8 @@ module Concurrent # # #### Rejection # - # When a promise is rejected all its children will be rejected and will receive the rejection `reason` as the rejection callable parameter: + # When a promise is rejected all its children will be rejected and will receive the rejection `reason` + # as the rejection callable parameter: # # ```ruby # p = [ Concurrent::Promise.execute{ Thread.pass; raise StandardError } ] @@ -121,11 +153,15 @@ module Concurrent # c2.state #=> :rejected # ``` # - # Once a promise is rejected it will continue to accept children that will receive immediately rejection (they will be executed asynchronously). + # Once a promise is rejected it will continue to accept children that will receive immediately rejection + # (they will be executed asynchronously). # # #### Aliases # - # The `then` method is the most generic alias: it accepts a block to be executed upon parent fulfillment and a callable to be executed upon parent rejection. At least one of them should be passed. The default block is `{ |result| result }` that fulfills the child with the parent value. The default callable is `{ |reason| raise reason }` that rejects the child with the parent reason. + # The `then` method is the most generic alias: it accepts a block to be executed upon parent fulfillment + # and a callable to be executed upon parent rejection. At least one of them should be passed. The default + # block is `{ |result| result }` that fulfills the child with the parent value. The default callable is + # `{ |reason| raise reason }` that rejects the child with the parent reason. # # - `on_success { |result| ... }` is the same as `then {|result| ... }` # - `rescue { |reason| ... }` is the same as `then(Proc.new { |reason| ... } )` @@ -292,13 +328,11 @@ def zip(*others) self.class.zip(self, *others) end - # Aggregate a collection of zero or more promises under a composite promise, - # execute the aggregated promises and collect them into a standard Ruby array, - # call the given Ruby `Ennnumerable` predicate (such as `any?`, `all?`, `none?`, - # or `one?`) on the collection checking for the success or failure of each, - # then executing the composite's `#then` handlers if the predicate returns - # `true` or executing the composite's `#rescue` handlers if the predicate - # returns false. + # Aggregates a collection of promises and executes the `then` condition + # if all aggregated promises succeed. Executes the `rescue` handler with + # a `Concurrent::PromiseExecutionError` if any of the aggregated promises + # fail. Upon execution will execute any of the aggregate promises that + # were not already executed. # # @!macro [attach] promise_self_aggregate # @@ -314,28 +348,7 @@ def zip(*others) # @param [Array] promises Zero or more promises to aggregate # @return [Promise] an unscheduled (not executed) promise that aggregates # the promises given as arguments - def self.aggregate(method, *promises) - composite = Promise.new do - completed = promises.collect do |promise| - promise.execute if promise.unscheduled? - promise.wait - promise - end - unless completed.empty? || completed.send(method){|promise| promise.fulfilled? } - raise PromiseExecutionError - end - end - composite - end - - # Aggregates a collection of promises and executes the `then` condition - # if all aggregated promises succeed. Executes the `rescue` handler with - # a `Concurrent::PromiseExecutionError` if any of the aggregated promises - # fail. Upon execution will execute any of the aggregate promises that - # were not already executed. - # - # @!macro promise_self_aggregate - def self.all(*promises) + def self.all?(*promises) aggregate(:all?, *promises) end @@ -346,7 +359,7 @@ def self.all(*promises) # were not already executed. # # @!macro promise_self_aggregate - def self.any(*promises) + def self.any?(*promises) aggregate(:any?, *promises) end @@ -357,7 +370,7 @@ def self.any(*promises) # were not already executed. # # @!macro promise_self_aggregate - def self.none(*promises) + def self.none?(*promises) aggregate(:none?, *promises) end @@ -368,12 +381,35 @@ def self.none(*promises) # the aggregate promises that were not already executed. # # @!macro promise_self_aggregate - def self.one(*promises) + def self.one?(*promises) aggregate(:one?, *promises) end protected + # Aggregate a collection of zero or more promises under a composite promise, + # execute the aggregated promises and collect them into a standard Ruby array, + # call the given Ruby `Ennnumerable` predicate (such as `any?`, `all?`, `none?`, + # or `one?`) on the collection checking for the success or failure of each, + # then executing the composite's `#then` handlers if the predicate returns + # `true` or executing the composite's `#rescue` handlers if the predicate + # returns false. + # + # @!macro promise_self_aggregate + def self.aggregate(method, *promises) + composite = Promise.new do + completed = promises.collect do |promise| + promise.execute if promise.unscheduled? + promise.wait + promise + end + unless completed.empty? || completed.send(method){|promise| promise.fulfilled? } + raise PromiseExecutionError + end + end + composite + end + def set_pending mutex.synchronize do @state = :pending diff --git a/spec/concurrent/promise_spec.rb b/spec/concurrent/promise_spec.rb index 52e8fa5ed..f09f1be9b 100644 --- a/spec/concurrent/promise_spec.rb +++ b/spec/concurrent/promise_spec.rb @@ -339,15 +339,15 @@ module Concurrent let(:promise2) { Promise.new(executor: executor) { 2 } } let(:promise3) { Promise.new(executor: executor) { [3] } } - describe '.all' do + describe '.all?' do it 'returns a new Promise' do - composite = Promise.all(promise1, promise2, promise3).execute + composite = Promise.all?(promise1, promise2, promise3).execute expect(composite).to be_a Concurrent::Promise end it 'does not execute the returned Promise' do - composite = Promise.all(promise1, promise2, promise3) + composite = Promise.all?(promise1, promise2, promise3) expect(composite).to be_unscheduled end @@ -355,7 +355,7 @@ module Concurrent counter = Concurrent::AtomicFixnum.new(0) latch = Concurrent::CountDownLatch.new(1) - composite = Promise.all(promise1, promise2, promise3). + composite = Promise.all?(promise1, promise2, promise3). then { counter.up; latch.count_down }. rescue { counter.down; latch.count_down }. execute @@ -369,7 +369,7 @@ module Concurrent counter = Concurrent::AtomicFixnum.new(0) latch = Concurrent::CountDownLatch.new(1) - composite = Promise.all. + composite = Promise.all?. then { counter.up; latch.count_down }. rescue { counter.down; latch.count_down }. execute @@ -383,7 +383,7 @@ module Concurrent counter = Concurrent::AtomicFixnum.new(0) latch = Concurrent::CountDownLatch.new(1) - composite = Promise.all(promise1, promise2, rejected_subject, promise3). + composite = Promise.all?(promise1, promise2, rejected_subject, promise3). then { counter.up; latch.count_down }. rescue { counter.down; latch.count_down }. execute @@ -394,15 +394,15 @@ module Concurrent end end - describe '.any' do + describe '.any?' do it 'returns a new Promise' do - composite = Promise.any(promise1, promise2, promise3).execute + composite = Promise.any?(promise1, promise2, promise3).execute expect(composite).to be_a Concurrent::Promise end it 'does not execute the returned Promise' do - composite = Promise.any(promise1, promise2, promise3) + composite = Promise.any?(promise1, promise2, promise3) expect(composite).to be_unscheduled end @@ -410,7 +410,7 @@ module Concurrent counter = Concurrent::AtomicFixnum.new(0) latch = Concurrent::CountDownLatch.new(1) - composite = Promise.any(promise1, promise2, rejected_subject, promise3). + composite = Promise.any?(promise1, promise2, rejected_subject, promise3). then { counter.up; latch.count_down }. rescue { counter.down; latch.count_down }. execute @@ -424,7 +424,7 @@ module Concurrent counter = Concurrent::AtomicFixnum.new(0) latch = Concurrent::CountDownLatch.new(1) - composite = Promise.any. + composite = Promise.any?. then { counter.up; latch.count_down }. rescue { counter.down; latch.count_down }. execute @@ -438,7 +438,7 @@ module Concurrent counter = Concurrent::AtomicFixnum.new(0) latch = Concurrent::CountDownLatch.new(1) - composite = Promise.any(rejected_subject, rejected_subject, rejected_subject, rejected_subject). + composite = Promise.any?(rejected_subject, rejected_subject, rejected_subject, rejected_subject). then { counter.up; latch.count_down }. rescue { counter.down; latch.count_down }. execute @@ -449,15 +449,15 @@ module Concurrent end end - describe '.none' do + describe '.none?' do it 'returns a new Promise' do - composite = Promise.none(promise1, promise2, promise3).execute + composite = Promise.none?(promise1, promise2, promise3).execute expect(composite).to be_a Concurrent::Promise end it 'does not execute the returned Promise' do - composite = Promise.none(promise1, promise2, promise3) + composite = Promise.none?(promise1, promise2, promise3) expect(composite).to be_unscheduled end @@ -465,7 +465,7 @@ module Concurrent counter = Concurrent::AtomicFixnum.new(0) latch = Concurrent::CountDownLatch.new(1) - composite = Promise.none(rejected_subject, rejected_subject, rejected_subject, rejected_subject). + composite = Promise.none?(rejected_subject, rejected_subject, rejected_subject, rejected_subject). then { counter.up; latch.count_down }. rescue { counter.down; latch.count_down }. execute @@ -479,7 +479,7 @@ module Concurrent counter = Concurrent::AtomicFixnum.new(0) latch = Concurrent::CountDownLatch.new(1) - composite = Promise.none. + composite = Promise.none?. then { counter.up; latch.count_down }. rescue { counter.down; latch.count_down }. execute @@ -493,7 +493,7 @@ module Concurrent counter = Concurrent::AtomicFixnum.new(0) latch = Concurrent::CountDownLatch.new(1) - composite = Promise.none(promise1, promise2, rejected_subject, promise3). + composite = Promise.none?(promise1, promise2, rejected_subject, promise3). then { counter.up; latch.count_down }. rescue { counter.down; latch.count_down }. execute @@ -504,15 +504,15 @@ module Concurrent end end - describe '.one' do + describe '.one?' do it 'returns a new Promise' do - composite = Promise.one(promise1, promise2, promise3).execute + composite = Promise.one?(promise1, promise2, promise3).execute expect(composite).to be_a Concurrent::Promise end it 'does not execute the returned Promise' do - composite = Promise.one(promise1, promise2, promise3) + composite = Promise.one?(promise1, promise2, promise3) expect(composite).to be_unscheduled end @@ -520,7 +520,7 @@ module Concurrent counter = Concurrent::AtomicFixnum.new(0) latch = Concurrent::CountDownLatch.new(1) - composite = Promise.one(promise1, rejected_subject, rejected_subject, rejected_subject). + composite = Promise.one?(promise1, rejected_subject, rejected_subject, rejected_subject). then { counter.up; latch.count_down }. rescue { counter.down; latch.count_down }. execute @@ -534,7 +534,7 @@ module Concurrent counter = Concurrent::AtomicFixnum.new(0) latch = Concurrent::CountDownLatch.new(1) - composite = Promise.one. + composite = Promise.one?. then { counter.up; latch.count_down }. rescue { counter.down; latch.count_down }. execute @@ -548,7 +548,7 @@ module Concurrent counter = Concurrent::AtomicFixnum.new(0) latch = Concurrent::CountDownLatch.new(1) - composite = Promise.one(promise1, promise2, rejected_subject, promise3). + composite = Promise.one?(promise1, promise2, rejected_subject, promise3). then { counter.up; latch.count_down }. rescue { counter.down; latch.count_down }. execute From 4fb4b7dfc2be56698bc8a35f1f6653130fb0bb4b Mon Sep 17 00:00:00 2001 From: Jerry D'Antonio Date: Wed, 10 Dec 2014 22:41:30 -0500 Subject: [PATCH 3/3] Removed Promise.one? and Promise.none? The original request was for .any? and .all? methods. We added .none? and .one? methods for synergy with arrays, but didn't like that these methods supressed exceptions. So we decided to add only the requested methods and consider other methods in the future. --- lib/concurrent/promise.rb | 22 ------- spec/concurrent/promise_spec.rb | 110 -------------------------------- 2 files changed, 132 deletions(-) diff --git a/lib/concurrent/promise.rb b/lib/concurrent/promise.rb index 723f66a80..e33fb53ff 100644 --- a/lib/concurrent/promise.rb +++ b/lib/concurrent/promise.rb @@ -363,28 +363,6 @@ def self.any?(*promises) aggregate(:any?, *promises) end - # Aggregates a collection of promises and executes the `then` condition - # if all aggregated promises fail. Executes the `rescue` handler with - # a `Concurrent::PromiseExecutionError` if any of the aggregated promises - # succeed. Upon execution will execute any of the aggregate promises that - # were not already executed. - # - # @!macro promise_self_aggregate - def self.none?(*promises) - aggregate(:none?, *promises) - end - - # Aggregates a collection of promises and executes the `then` condition - # if one and only one of the aggregated promises succeeds. Executes the - # `rescue` handler with a `Concurrent::PromiseExecutionError` more than one - # of the aggregated promises succeed. Upon execution will execute any of - # the aggregate promises that were not already executed. - # - # @!macro promise_self_aggregate - def self.one?(*promises) - aggregate(:one?, *promises) - end - protected # Aggregate a collection of zero or more promises under a composite promise, diff --git a/spec/concurrent/promise_spec.rb b/spec/concurrent/promise_spec.rb index f09f1be9b..fa7bd87c5 100644 --- a/spec/concurrent/promise_spec.rb +++ b/spec/concurrent/promise_spec.rb @@ -448,116 +448,6 @@ module Concurrent expect(counter.value).to eq -1 end end - - describe '.none?' do - - it 'returns a new Promise' do - composite = Promise.none?(promise1, promise2, promise3).execute - expect(composite).to be_a Concurrent::Promise - end - - it 'does not execute the returned Promise' do - composite = Promise.none?(promise1, promise2, promise3) - expect(composite).to be_unscheduled - end - - it 'executes the #then condition when all components fail' do - counter = Concurrent::AtomicFixnum.new(0) - latch = Concurrent::CountDownLatch.new(1) - - composite = Promise.none?(rejected_subject, rejected_subject, rejected_subject, rejected_subject). - then { counter.up; latch.count_down }. - rescue { counter.down; latch.count_down }. - execute - - latch.wait(1) - - expect(counter.value).to eq 1 - end - - it 'executes the #then condition when no promises are given' do - counter = Concurrent::AtomicFixnum.new(0) - latch = Concurrent::CountDownLatch.new(1) - - composite = Promise.none?. - then { counter.up; latch.count_down }. - rescue { counter.down; latch.count_down }. - execute - - latch.wait(1) - - expect(counter.value).to eq 1 - end - - it 'executes the #rescue handler if even one component succeeds' do - counter = Concurrent::AtomicFixnum.new(0) - latch = Concurrent::CountDownLatch.new(1) - - composite = Promise.none?(promise1, promise2, rejected_subject, promise3). - then { counter.up; latch.count_down }. - rescue { counter.down; latch.count_down }. - execute - - latch.wait(1) - - expect(counter.value).to eq -1 - end - end - - describe '.one?' do - - it 'returns a new Promise' do - composite = Promise.one?(promise1, promise2, promise3).execute - expect(composite).to be_a Concurrent::Promise - end - - it 'does not execute the returned Promise' do - composite = Promise.one?(promise1, promise2, promise3) - expect(composite).to be_unscheduled - end - - it 'executes the #then condition when only one component succeeds' do - counter = Concurrent::AtomicFixnum.new(0) - latch = Concurrent::CountDownLatch.new(1) - - composite = Promise.one?(promise1, rejected_subject, rejected_subject, rejected_subject). - then { counter.up; latch.count_down }. - rescue { counter.down; latch.count_down }. - execute - - latch.wait(1) - - expect(counter.value).to eq 1 - end - - it 'executes the #then condition when no promises are given' do - counter = Concurrent::AtomicFixnum.new(0) - latch = Concurrent::CountDownLatch.new(1) - - composite = Promise.one?. - then { counter.up; latch.count_down }. - rescue { counter.down; latch.count_down }. - execute - - latch.wait(1) - - expect(counter.value).to eq 1 - end - - it 'executes the #rescue handler if two or more components succeed' do - counter = Concurrent::AtomicFixnum.new(0) - latch = Concurrent::CountDownLatch.new(1) - - composite = Promise.one?(promise1, promise2, rejected_subject, promise3). - then { counter.up; latch.count_down }. - rescue { counter.down; latch.count_down }. - execute - - latch.wait(1) - - expect(counter.value).to eq -1 - end - end end context 'fulfillment' do