Skip to content

Commit

Permalink
Merge pull request #7 from julianghionoiu/fix-long-timeouts-test
Browse files Browse the repository at this point in the history
Fix long timeouts test, adds performance test
  • Loading branch information
julianghionoiu committed Nov 30, 2017
2 parents 09326b8 + 90e7722 commit e516e3e
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 9 deletions.
2 changes: 1 addition & 1 deletion features/spec
Submodule spec updated 1 files
+13 −1 client.feature
16 changes: 15 additions & 1 deletion features/step_definitions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@
@client = TDL::Client.new(hostname: '111', port: STOMP_PORT, unique_id: 'broker')
end

Given(/^I receive 50 identical requests like:$/) do |table|
50.times do
table.hashes.each do |row|
@request_queue.send_text_message(row[:payload])
end
@request_count = table.hashes.count
end
end


Then(/^the time to wait for requests is (\d+)ms$/) do |expected_timeout|
assert_equal expected_timeout.to_i, @client.get_request_timeout_millis,
Expand All @@ -54,6 +63,11 @@
'Request queue has a different value.'
end

Then(/^the processing time should be lower than (\d+)ms$/) do |expected_value|
assert expected_value.to_i > @client.total_processing_time,
'Request queue has a different value.'
end

Given(/^I receive the following requests:$/) do |table|
table.hashes.each do |row|
@request_queue.send_text_message(row[:payload])
Expand All @@ -70,7 +84,7 @@
'some logic' => lambda {:value},
'increment number' => ->(x) {x + 1},
'echo the request' => ->(x) {x},
'work for 500ms' => ->(x) {sleep(0.01); 'OK'},
'work for 600ms' => ->(x) {sleep(0.6); 'OK'},
}

def as_implementation(call)
Expand Down
15 changes: 11 additions & 4 deletions lib/tdl/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,35 @@ def initialize(hostname:, port: 61613, unique_id:, request_timeout_millis: 500)
@unique_id = unique_id
@request_timeout_millis = request_timeout_millis
@logger = Logging.logger[self]
@total_processing_time = nil
end

def get_request_timeout_millis
@request_timeout_millis
end

def go_live_with(processing_rules)
time1 = Time.now.to_f
begin
@logger.info 'Starting client.'
remote_broker = RemoteBroker.new(@hostname, @port, @unique_id)
remote_broker = RemoteBroker.new(@hostname, @port, @unique_id, @request_timeout_millis)
remote_broker.subscribe(ApplyProcessingRules.new(processing_rules))

#DEBT: We should have no timeout here. We could put a special message in the queue
@logger.info 'Waiting for requests.'
remote_broker.join(@request_timeout_millis)
remote_broker.join
@logger.info 'Stopping client.'
remote_broker.close

rescue Exception => e
# raise e if ENV['TDL_ENV'] == 'test'
@logger.error "There was a problem processing messages. #{e.message}"
@logger.error e.backtrace.join("\n")
end
time2 = Time.now.to_f
@total_processing_time = (time2 - time1) * 1000.00
end

def total_processing_time
@total_processing_time
end


Expand Down
34 changes: 34 additions & 0 deletions lib/tdl/thread_timer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
class ThreadTimer
def initialize(timeout_millis, callback)
@timeout_millis=timeout_millis
@continue = true
@callback = callback
@timer_thread = nil
end

def start_timer
@continue = true
@timer_thread = Thread.new { start_timeout }
end

def stop_timer
@continue = false
@timer_thread.terminate unless @timer_thread.nil?
@timer_thread = nil
end

private

def start_timeout
total_millis = 0
interval_millis = 10
while @continue && total_millis < @timeout_millis
sleep interval_millis / 1000.00
total_millis += interval_millis
end

if total_millis >= @timeout_millis
@callback.call
end
end
end
16 changes: 13 additions & 3 deletions lib/tdl/transport/remote_broker.rb
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
require_relative '../thread_timer'

module TDL
class RemoteBroker
def initialize(hostname, port, unique_id)
def initialize(hostname, port, unique_id, request_timeout_millis)
@stomp_client = Stomp::Client.new('', '', hostname, port)
@unique_id = unique_id
@serialization_provider = JSONRPCSerializationProvider.new
@timer_thread = ThreadTimer.new(request_timeout_millis, lambda = ->() { close })
end

def subscribe(handling_strategy)
@stomp_client.subscribe("/queue/#{@unique_id}.req", {:ack => 'client-individual', 'activemq.prefetchSize' => 1}) do |msg|
@timer_thread.stop_timer
request = @serialization_provider.deserialize(msg)
handling_strategy.process_next_request_from(self, request)
@timer_thread.start_timer
end
end

Expand All @@ -19,12 +24,17 @@ def respond_to(request, response)
@stomp_client.acknowledge(request.original_message)
end

def join(limit_millis)
@stomp_client.join(limit_millis / 1000.00)
def join
@timer_thread.start_timer
@stomp_client.join
end

def close
@stomp_client.close
end

def closed?
@stomp_client.closed?
end
end
end

0 comments on commit e516e3e

Please sign in to comment.