Skip to content

Commit

Permalink
Merge pull request #270 from ruby-concurrency/ivar-promise
Browse files Browse the repository at this point in the history
Promise extends IVar
  • Loading branch information
jdantonio committed Apr 24, 2015
2 parents c27d304 + 1314eda commit cd68976
Show file tree
Hide file tree
Showing 20 changed files with 554 additions and 397 deletions.
13 changes: 1 addition & 12 deletions lib/concurrent/agent.rb
Expand Up @@ -24,18 +24,7 @@ class Agent
#
# @param [Object] initial the initial value
#
# @!macro [attach] executor_and_deref_options
#
# @param [Hash] opts the options used to define the behavior at update and deref
# and to specify the executor on which to perform actions
# @option opts [Executor] :executor when set use the given `Executor` instance.
# Three special values are also supported: `:task` returns the global task pool,
# `:operation` returns the global operation pool, and `:immediate` returns a new
# `ImmediateExecutor` object.
# @option opts [Boolean] :dup_on_deref (false) call `#dup` before returning the data
# @option opts [Boolean] :freeze_on_deref (false) call `#freeze` before returning the data
# @option opts [Proc] :copy_on_deref (nil) call the given `Proc` passing
# the internal value and returning the value returned from the proc
# @!macro executor_and_deref_options
def initialize(initial, opts = {})
@value = initial
@rescuers = []
Expand Down
7 changes: 2 additions & 5 deletions lib/concurrent/async.rb
Expand Up @@ -82,14 +82,11 @@ def method_missing(method, *args, &block)
self.define_singleton_method(method) do |*args2|
Async::validate_argc(@delegate, method, *args2)
ivar = Concurrent::IVar.new
value, reason = nil, nil
@serializer.post(@executor.value) do
begin
value = @delegate.send(method, *args2, &block)
ivar.set(@delegate.send(method, *args2, &block))
rescue => reason
# caught
ensure
ivar.complete(reason.nil?, value, reason)
ivar.fail(reason)
end
end
ivar.value if @blocking
Expand Down
26 changes: 11 additions & 15 deletions lib/concurrent/atomic/copy_on_notify_observer_set.rb
Expand Up @@ -33,40 +33,38 @@ def add_observer(observer=nil, func=:update, &block)
begin
@mutex.lock
@observers[observer] = func
observer
ensure
@mutex.unlock
end

observer
end

# @param [Object] observer the observer to remove
# @return [Object] the deleted observer
def delete_observer(observer)
@mutex.lock
@observers.delete(observer)
@mutex.unlock

observer
ensure
@mutex.unlock
end

# Deletes all observers
# @return [CopyOnWriteObserverSet] self
def delete_observers
@mutex.lock
@observers.clear
@mutex.unlock

self
ensure
@mutex.unlock
end

# @return [Integer] the observers count
def count_observers
@mutex.lock
result = @observers.count
@observers.count
ensure
@mutex.unlock

result
end

# Notifies all registered observers with optional args
Expand All @@ -75,7 +73,6 @@ def count_observers
def notify_observers(*args, &block)
observers = duplicate_observers
notify_to(observers, *args, &block)

self
end

Expand All @@ -86,7 +83,6 @@ def notify_observers(*args, &block)
def notify_and_delete_observers(*args, &block)
observers = duplicate_and_clear_observers
notify_to(observers, *args, &block)

self
end

Expand All @@ -96,17 +92,17 @@ def duplicate_and_clear_observers
@mutex.lock
observers = @observers.dup
@observers.clear
@mutex.unlock

observers
ensure
@mutex.unlock
end

def duplicate_observers
@mutex.lock
observers = @observers.dup
@mutex.unlock

observers
ensure
@mutex.unlock
end

def notify_to(observers, *args)
Expand Down
4 changes: 2 additions & 2 deletions lib/concurrent/channel/buffered_channel.rb
Expand Up @@ -40,7 +40,7 @@ def select(probe)
@probe_set.put(probe)
true
else
shift_buffer if probe.set_unless_assigned(peek_buffer, self)
shift_buffer if probe.try_set([peek_buffer, self])
end

end
Expand Down Expand Up @@ -76,7 +76,7 @@ def set_probe_or_push_into_buffer(value)
push_into_buffer(value)
true
else
@probe_set.take.set_unless_assigned(value, self)
@probe_set.take.try_set([value, self])
end
end
end
Expand Down
29 changes: 2 additions & 27 deletions lib/concurrent/channel/channel.rb
Expand Up @@ -3,37 +3,12 @@
module Concurrent
module Channel

class Probe < Concurrent::IVar

def initialize(value = NO_VALUE, opts = {})
super(value, opts)
end

def set_unless_assigned(value, channel)
mutex.synchronize do
return false if [:fulfilled, :rejected].include? @state

set_state(true, [value, channel], nil)
event.set
true
end
end

alias_method :composite_value, :value

def value
composite_value.nil? ? nil : composite_value[0]
end

def channel
composite_value.nil? ? nil : composite_value[1]
end
end
Probe = IVar

def self.select(*channels)
probe = Probe.new
channels.each { |channel| channel.select(probe) }
result = probe.composite_value
result = probe.value
channels.each { |channel| channel.remove_probe(probe) }
result
end
Expand Down
3 changes: 1 addition & 2 deletions lib/concurrent/channel/unbuffered_channel.rb
Expand Up @@ -12,8 +12,7 @@ def probe_set_size
end

def push(value)
# TODO set_unless_assigned define on IVar as #set_state? or #try_set_state
until @probe_set.take.set_unless_assigned(value, self)
until @probe_set.take.try_set([value, self])
end
end

Expand Down
55 changes: 34 additions & 21 deletions lib/concurrent/delay.rb
Expand Up @@ -3,6 +3,7 @@
require 'concurrent/obligation'
require 'concurrent/executor/executor_options'
require 'concurrent/executor/immediate_executor'
require 'concurrent/synchronization'

module Concurrent

Expand Down Expand Up @@ -37,7 +38,7 @@ module Concurrent
# execute on the given executor, allowing the call to timeout.
#
# @see Concurrent::Dereferenceable
class Delay
class Delay < Synchronization::Object
include Obligation
include ExecutorOptions

Expand All @@ -51,15 +52,27 @@ class Delay

# Create a new `Delay` in the `:pending` state.
#
# @yield the delayed operation to perform
# @!macro [attach] executor_and_deref_options
#
# @param [Hash] opts the options used to define the behavior at update and deref
# and to specify the executor on which to perform actions
# @option opts [Executor] :executor when set use the given `Executor` instance.
# Three special values are also supported: `:task` returns the global task pool,
# `:operation` returns the global operation pool, and `:immediate` returns a new
# `ImmediateExecutor` object.
# @option opts [Boolean] :dup_on_deref (false) call `#dup` before returning the data
# @option opts [Boolean] :freeze_on_deref (false) call `#freeze` before returning the data
# @option opts [Proc] :copy_on_deref (nil) call the given `Proc` passing
# the internal value and returning the value returned from the proc
#
# @!macro executor_and_deref_options
# @yield the delayed operation to perform
#
# @raise [ArgumentError] if no block is given
def initialize(opts = {}, &block)
raise ArgumentError.new('no block given') unless block_given?

init_obligation
super()
init_obligation(self)
set_deref_options(opts)
@task_executor = get_executor_from(opts)

Expand Down Expand Up @@ -145,16 +158,15 @@ def wait(timeout = nil)
# @yield the delayed operation to perform
# @return [true, false] if success
def reconfigure(&block)
mutex.lock
raise ArgumentError.new('no block given') unless block_given?
unless @computing
@task = block
true
else
false
mutex.synchronize do
raise ArgumentError.new('no block given') unless block_given?
unless @computing
@task = block
true
else
false
end
end
ensure
mutex.unlock
end

private
Expand All @@ -163,10 +175,11 @@ def reconfigure(&block)
def execute_task_once # :nodoc:
# this function has been optimized for performance and
# should not be modified without running new benchmarks
mutex.lock
execute = @computing = true unless @computing
task = @task
mutex.unlock
execute = task = nil
mutex.synchronize do
execute = @computing = true unless @computing
task = @task
end

if execute
@task_executor.post do
Expand All @@ -176,10 +189,10 @@ def execute_task_once # :nodoc:
rescue => ex
reason = ex
end
mutex.lock
set_state(success, result, reason)
event.set
mutex.unlock
mutex.synchronize do
set_state(success, result, reason)
event.set
end
end
end
end
Expand Down
34 changes: 13 additions & 21 deletions lib/concurrent/dereferenceable.rb
Expand Up @@ -39,24 +39,17 @@ module Dereferenceable
#
# @return [Object] the current value of the object
def value
mutex.lock
apply_deref_options(@value)
ensure
mutex.unlock
mutex.synchronize { apply_deref_options(@value) }
end

alias_method :deref, :value

protected

# Set the internal value of this object
#
# @param [Object] val the new value
def value=(val)
mutex.lock
@value = val
ensure
mutex.unlock
# @param [Object] value the new value
def value=(value)
mutex.synchronize{ @value = value }
end

# A mutex lock used for synchronizing thread-safe operations. Methods defined
Expand All @@ -74,8 +67,8 @@ def mutex
# @note This method *must* be called from within the constructor of the including class.
#
# @see #mutex
def init_mutex
@mutex = Mutex.new
def init_mutex(mutex = Mutex.new)
@mutex = mutex
end

# Set the options which define the operations #value performs before
Expand All @@ -91,14 +84,13 @@ def init_mutex
# @option opts [String] :copy_on_deref (nil) call the given `Proc` passing
# the internal value and returning the value returned from the proc
def set_deref_options(opts = {})
mutex.lock
@dup_on_deref = opts[:dup_on_deref] || opts[:dup]
@freeze_on_deref = opts[:freeze_on_deref] || opts[:freeze]
@copy_on_deref = opts[:copy_on_deref] || opts[:copy]
@do_nothing_on_deref = !(@dup_on_deref || @freeze_on_deref || @copy_on_deref)
nil
ensure
mutex.unlock
mutex.synchronize do
@dup_on_deref = opts[:dup_on_deref] || opts[:dup]
@freeze_on_deref = opts[:freeze_on_deref] || opts[:freeze]
@copy_on_deref = opts[:copy_on_deref] || opts[:copy]
@do_nothing_on_deref = !(@dup_on_deref || @freeze_on_deref || @copy_on_deref)
nil
end
end

# @!visibility private
Expand Down
13 changes: 12 additions & 1 deletion lib/concurrent/future.rb
Expand Up @@ -76,7 +76,18 @@ def self.execute(opts = {}, &block)
Future.new(opts, &block).execute
end

protected :set, :fail, :complete
# @!macro ivar_set_method
def set(value = IVar::NO_VALUE, &block)
check_for_block_or_value!(block_given?, value)
mutex.synchronize do
if @state != :unscheduled
raise MultipleAssignmentError
else
@task = block || Proc.new { value }
end
end
execute
end

private

Expand Down

0 comments on commit cd68976

Please sign in to comment.