Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not block Agent#value when updating && other improvements #65

Merged
merged 5 commits into from
May 7, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 47 additions & 22 deletions lib/concurrent/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class Agent
# is given at initialization
TIMEOUT = 5

attr_reader :timeout
attr_reader :timeout, :executor

# Initialize a new Agent with the given initial value and provided options.
#
Expand All @@ -60,12 +60,12 @@ class Agent
# @option opts [String] :copy_on_deref (nil) call the given `Proc` passing the internal value and
# returning the value returned from the proc
def initialize(initial, opts = {})
@value = initial
@rescuers = []
@validator = Proc.new { |result| true }
@timeout = opts.fetch(:timeout, TIMEOUT).freeze
@value = initial
@rescuers = []
@validator = Proc.new { |result| true }
@timeout = opts.fetch(:timeout, TIMEOUT).freeze
self.observers = CopyOnWriteObserverSet.new
@executor = OptionsParser::get_executor_from(opts)
@executor = OneByOne.new OptionsParser::get_executor_from(opts)
init_mutex
set_deref_options(opts)
end
Expand Down Expand Up @@ -111,7 +111,11 @@ def rescue(clazz = StandardError, &block)
# @yieldparam [Object] value the result of the last update operation
# @yieldreturn [Boolean] true if the value is valid else false
def validate(&block)
@validator = block unless block.nil?
unless block.nil?
mutex.lock
@validator = block
mutex.unlock
end
self
end
alias_method :validates, :validate
Expand All @@ -124,8 +128,11 @@ def validate(&block)
# the new value
# @yieldparam [Object] value the current value
# @yieldreturn [Object] the new value
# @return [true, nil] nil when no block is given
def post(&block)
@executor.post{ work(&block) } unless block.nil?
return nil if block.nil?
@executor.post { work(&block) }
true
end

# Update the current value with the result of the given block operation
Expand All @@ -139,6 +146,16 @@ def <<(block)
self
end

# Waits/blocks until all the updates sent before this call are done.
#
# @param [Numeric] timeout the maximum time in second to wait.
# @return [Boolean] false on timeout, true otherwise
def await(timeout = nil)
done = Event.new
post { done.set }
done.wait timeout
end

private

# @!visibility private
Expand All @@ -147,33 +164,41 @@ def <<(block)
# @!visibility private
def try_rescue(ex) # :nodoc:
rescuer = mutex.synchronize do
@rescuers.find{|r| ex.is_a?(r.clazz) }
@rescuers.find { |r| ex.is_a?(r.clazz) }
end
rescuer.block.call(ex) if rescuer
rescue Exception => ex
# puts "#{ex} (#{ex.class})\n#{ex.backtrace.join("\n")}"
# supress
end

# @!visibility private
def work(&handler) # :nodoc:
validator, value = mutex.synchronize { [@validator, @value] }

begin
# FIXME creates second thread
result, valid = Concurrent::timeout(@timeout) do
[result = handler.call(value),
validator.call(result)]
end
rescue Exception => ex
exception = ex
end

should_notify = false
mutex.lock
should_notify = if !exception && valid
@value = result
true
end
mutex.unlock

mutex.synchronize do
result = Concurrent::timeout(@timeout) do
handler.call(@value)
end
if @validator.call(result)
@value = result
should_notify = true
end
end
if should_notify
time = Time.now
observers.notify_observers{ [time, self.value] } if should_notify
rescue Exception => ex
try_rescue(ex)
observers.notify_observers { [time, self.value] }
end

try_rescue(exception)
end
end
end
71 changes: 71 additions & 0 deletions lib/concurrent/executor/one_by_one.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
module Concurrent

# Ensures that jobs are passed to the underlying executor one by one,
# never running at the same time.
class OneByOne

attr_reader :executor

Job = Struct.new(:args, :block) do
def call
block.call *args
end
end

# @param [Executor] executor
def initialize(executor)
@executor = executor
@being_executed = false
@stash = []
@mutex = Mutex.new
end

# Submit a task to the executor for asynchronous processing.
#
# @param [Array] args zero or more arguments to be passed to the task
#
# @yield the asynchronous task to perform
#
# @return [Boolean] `true` if the task is queued, `false` if the executor
# is not running
#
# @raise [ArgumentError] if no task is given
def post(*args, &task)
return nil if task.nil?
job = Job.new args, task
@mutex.lock
post = if @being_executed
@stash << job
false
else
@being_executed = true
end
@mutex.unlock
@executor.post { work(job) } if post
true
end

# Submit a task to the executor for asynchronous processing.
#
# @param [Proc] task the asynchronous task to perform
#
# @return [self] returns itself
def <<(task)
post(&task)
self
end

private

# ensures next job is executed if any is stashed
def work(job)
job.call
ensure
@mutex.lock
job = @stash.shift || (@being_executed = false)
@mutex.unlock
@executor.post { work(job) } if job
end

end
end
1 change: 1 addition & 0 deletions lib/concurrent/executors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
require 'concurrent/executor/single_thread_executor'
require 'concurrent/executor/thread_pool_executor'
require 'concurrent/executor/timer_set'
require 'concurrent/executor/one_by_one'
Loading