Skip to content

Commit

Permalink
Fix a race condition by unsubscribing from the requests queue
Browse files Browse the repository at this point in the history
  • Loading branch information
julianghionoiu committed Nov 30, 2017
1 parent e10a162 commit 1972343
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 10 deletions.
1 change: 1 addition & 0 deletions features/step_definitions.rb
Expand Up @@ -121,6 +121,7 @@ def as_action(actionName)
@captured_io = capture_subprocess_io do
@client.go_live_with(processing_rules)
end
@captured_io.each { |x| puts x }
end


Expand Down
1 change: 0 additions & 1 deletion lib/tdl/client.rb
Expand Up @@ -33,7 +33,6 @@ def go_live_with(processing_rules)
@logger.info 'Waiting for requests.'
remote_broker.join
@logger.info 'Stopping client.'
remote_broker.close unless remote_broker.closed?

rescue Exception => e
# raise e if ENV['TDL_ENV'] == 'test'
Expand Down
6 changes: 3 additions & 3 deletions lib/tdl/thread_timer.rb
Expand Up @@ -6,15 +6,15 @@ def initialize(timeout_millis, callback)
@timer_thread = nil
end

def start_timer
def start
@continue = true
@timer_thread = Thread.new { start_timeout }
end

def stop_timer
@continue = false
def stop
@timer_thread.terminate unless @timer_thread.nil?
@timer_thread = nil
@continue = false
end

private
Expand Down
15 changes: 9 additions & 6 deletions lib/tdl/transport/remote_broker.rb
Expand Up @@ -5,31 +5,34 @@ class RemoteBroker
def initialize(hostname, port, unique_id, request_timeout_millis)
@stomp_client = Stomp::Client.new('', '', hostname, port)
@unique_id = unique_id
@request_queue = "/queue/#{@unique_id}.req"
@response_queue = "/queue/#{@unique_id}.resp"
@serialization_provider = JSONRPCSerializationProvider.new
@timer_thread = ThreadTimer.new(request_timeout_millis, lambda = ->() { close })
@timer = ThreadTimer.new(request_timeout_millis, lambda = ->() { close unless closed? })
@timer.start
end

def subscribe(handling_strategy)
@stomp_client.subscribe("/queue/#{@unique_id}.req", {:ack => 'client-individual', 'activemq.prefetchSize' => 1}) do |msg|
@timer_thread.stop_timer
@stomp_client.subscribe(@request_queue, {:ack => 'client-individual', 'activemq.prefetchSize' => 1}) do |msg|
@timer.stop
request = @serialization_provider.deserialize(msg)
handling_strategy.process_next_request_from(self, request)
@timer_thread.start_timer
@timer.start
end
end

def respond_to(request, response)
serialized_response = @serialization_provider.serialize(response)
@stomp_client.publish("/queue/#{@unique_id}.resp", serialized_response)
@stomp_client.publish(@response_queue, serialized_response)
@stomp_client.acknowledge(request.original_message)
end

def join
@timer_thread.start_timer
@stomp_client.join
end

def close
@stomp_client.unsubscribe(@request_queue)
@stomp_client.close
end

Expand Down

0 comments on commit 1972343

Please sign in to comment.