Skip to content

Commit

Permalink
Testing multiple Exchanger implementations.
Browse files Browse the repository at this point in the history
  • Loading branch information
jdantonio committed Mar 29, 2015
1 parent 3735bb1 commit 308f2c5
Showing 1 changed file with 94 additions and 13 deletions.
107 changes: 94 additions & 13 deletions lib/concurrent/exchanger.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module Concurrent
# @see Concurrent::MVar
# @see Concurrent::Dereferenceable
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Exchanger.html java.util.concurrent.Exchanger
class Exchanger
class MVarExchanger

EMPTY = Object.new

Expand All @@ -34,23 +34,104 @@ def initialize(opts = {})
# thread. nil (default value) means no timeout
# @return [Object] the value exchanged by the other thread; nil if timed out
def exchange(value, timeout = nil)
first = @first.take(timeout)
if first == MVar::TIMEOUT
nil
elsif first == EMPTY
@first.put value
second = @second.take timeout
if second == MVar::TIMEOUT
nil

# Both threads modify the first variable
first_result = @first.modify(timeout) do |first|
# Does it currently contain the special empty value?
if first == EMPTY
# If so, modify it to contain our value
value
else
second
# Otherwise, modify it back to the empty state
EMPTY
end
end

# If that timed out, the whole operation timed out
return nil if first_result == MVar::TIMEOUT

# What was in @first before we modified it?
if first_result == EMPTY
# We stored our object - someone else will turn up with the second
# object at some point in the future

# Wait for the second object to appear
second_result = @second.take(timeout)

# If that timed out, the whole operation timed out
return nil if second_result == MVar::TIMEOUT

# BUT HOW DO WE CANCEL OUR RESULT BEING AVAILABLE IN @first?

# Return that second object
second_result
else
@first.put EMPTY
@second.put value
first
# We reset @first to be empty again - so the other value is in
# first_result and we need to tell the other thread about our value

# Tell the other thread about our object
second_result = @second.put(value, timeout)

# If that timed out, the whole operation timed out
return nil if second_result == MVar::TIMEOUT

# We already have its object
first_result
end
end
end

class SlotExchanger

def initialize
@mutex = Mutex.new
@condition = Condition.new
@slot = new_slot
end

def exchange(value, timeout = nil)
@mutex.synchronize do

replace_slot_if_fulfilled

slot = @slot

if slot.state == :empty
slot.value_1 = value
slot.state = :waiting
wait_for_value(slot, timeout)
slot.value_2
else
slot.value_2 = value
slot.state = :fulfilled
@condition.broadcast
slot.value_1
end
end
end

Slot = Struct.new(:value_1, :value_2, :state)

private_constant :Slot

private

def replace_slot_if_fulfilled
@slot = new_slot if @slot.state == :fulfilled
end

def wait_for_value(slot, timeout)
remaining = Condition::Result.new(timeout)
while slot.state == :waiting && remaining.can_wait?
remaining = @condition.wait(@mutex, remaining.remaining_time)
end
end

def new_slot
Slot.new(nil, nil, :empty)
end
end

Exchanger = SlotExchanger

end

0 comments on commit 308f2c5

Please sign in to comment.