Skip to content
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ source 'https://rubygems.org'
gemspec name: 'concurrent-ruby'

group :development do
gem 'rake', '~> 10.3.2'
gem 'rake', '~> 10.4.2'
gem 'rake-compiler', '~> 0.9.5'
gem 'gem-compiler', '~> 0.3.0'
end
Expand Down
21 changes: 21 additions & 0 deletions lib/concurrent/exchanger.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,34 @@
module Concurrent

# A synchronization point at which threads can pair and swap elements within
# pairs. Each thread presents some object on entry to the exchange method,
# matches with a partner thread, and receives its partner's object on return.
#
# Uses `MVar` to manage synchronization of the individual elements.
# Since `MVar` is also a `Dereferenceable`, the exchanged values support all
# dereferenceable options. The constructor options hash will be passed to
# the `MVar` constructors.
#
# @see Concurrent::MVar
# @see Concurrent::Dereferenceable
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Exchanger.html java.util.concurrent.Exchanger
class Exchanger

EMPTY = Object.new

# Create a new `Exchanger` object.
#
# @param [Hash] opts the options controlling how the managed references
# will be processed
def initialize(opts = {})
@first = MVar.new(EMPTY, opts)
@second = MVar.new(MVar::EMPTY, opts)
end

# Waits for another thread to arrive at this exchange point (unless the
# current thread is interrupted), and then transfers the given object to
# it, receiving its object in return.
#
# @param [Object] value the value to exchange with an other thread
# @param [Numeric] timeout the maximum time in second to wait for one other
# thread. nil (default value) means no timeout
Expand Down
10 changes: 5 additions & 5 deletions lib/concurrent/executor/executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,15 @@ def auto_terminate?
def enable_at_exit_handler!(opts = {})
if opts.fetch(:stop_on_exit, true)
@auto_terminate = true
if RUBY_PLATFORM == 'java'
create_java_at_exit_handler!(self)
if RUBY_PLATFORM == 'ruby'
create_mri_at_exit_handler!(self.object_id)
else
create_ruby_at_exit_handler!(self.object_id)
create_at_exit_handler!(self)
end
end
end

def create_ruby_at_exit_handler!(id)
def create_mri_at_exit_handler!(id)
at_exit do
if Concurrent.auto_terminate_all_executors?
this = ObjectSpace._id2ref(id)
Expand All @@ -86,7 +86,7 @@ def create_ruby_at_exit_handler!(id)
end
end

def create_java_at_exit_handler!(this)
def create_at_exit_handler!(this)
at_exit do
this.kill if Concurrent.auto_terminate_all_executors?
end
Expand Down
16 changes: 16 additions & 0 deletions lib/concurrent/utility/monotonic_time.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,22 @@ def monotonic_time
GLOBAL_MONOTONIC_CLOCK.get_time
end
module_function :monotonic_time

# Runs the given block and returns the number of seconds that elapsed.
#
# @yield the block to run and time
# @return [Float] the number of seconds the block took to run
#
# @raise [ArgumentError] when no block given
#
# @!macro monotonic_clock_warning
def monotonic_interval
raise ArgumentError.new('no block given') unless block_given?
start_time = GLOBAL_MONOTONIC_CLOCK.get_time
yield
GLOBAL_MONOTONIC_CLOCK.get_time - start_time
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method may not return correct results on MRI if there was time shift, it may be better kept just in test helper.

module_function :monotonic_interval
end

__END__
Expand Down
19 changes: 9 additions & 10 deletions spec/concurrent/agent_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ module Concurrent

subject { Agent.new(0, executor: executor) }

after(:each) do
executor.kill
end

let(:observer) do
Class.new do
attr_reader :value
Expand Down Expand Up @@ -196,17 +200,12 @@ def trigger_observable(observable)
context '#await' do

it 'waits until already sent updates are done' do
fn = false
subject.post { fn = true; sleep 0.1 }
subject.await
expect(fn).to be_truthy
end

it 'does not waits until updates sent after are done' do
fn = false
actual = Concurrent::AtomicBoolean.new(false)
latch = Concurrent::CountDownLatch.new
subject.post { latch.count_down; sleep(0.1); actual.make_true }
latch.wait(1)
subject.await
subject.post { fn = true; sleep 0.1 }
expect(fn).to be_falsey
expect(actual.value).to be true
end

it 'does not alter the value' do
Expand Down
Loading