diff --git a/features/queue_steps.rb b/features/queue_steps.rb index c9e4924..b1078e3 100644 --- a/features/queue_steps.rb +++ b/features/queue_steps.rb @@ -19,20 +19,25 @@ # Broker client definition STOMP_PORT = 21613 +REQUEST_QUEUE_NAME = 'some-user-req' +RESPONSE_QUEUE_NAME = 'some-user-resp' + # ~~~~~ Setup -Given(/^I start with a clean broker and a client for user "([^\"]*)"$/) do |username| - unique_id = username - @request_queue = BROKER.add_queue("#{unique_id}.req") +Given(/^I start with a clean broker having a request and a response queue$/) do || + @request_queue = BROKER.add_queue(REQUEST_QUEUE_NAME) @request_queue.purge - @response_queue = BROKER.add_queue("#{unique_id}.resp") + @response_queue = BROKER.add_queue(RESPONSE_QUEUE_NAME) @response_queue.purge +end +Given(/^a client that connects to the queues$/) do config = TDL::ImplementationRunnerConfig.new() .set_hostname(HOSTNAME) .set_port(STOMP_PORT) - .set_unique_id(unique_id) + .set_request_queue_name(REQUEST_QUEUE_NAME) + .set_response_queue_name(RESPONSE_QUEUE_NAME) @queueBasedImplementationRunnerBuilder = TDL::QueueBasedImplementationRunnerBuilder.new() .set_config(config) @@ -43,7 +48,8 @@ config = TDL::ImplementationRunnerConfig.new() .set_hostname('111') .set_port(STOMP_PORT) - .set_unique_id('X') + .set_request_queue_name('X') + .set_response_queue_name('Y') @queueBasedImplementationRunnerBuilder = TDL::QueueBasedImplementationRunnerBuilder.new() .set_config(config); @@ -116,8 +122,7 @@ def as_implementation(call) @queueBasedImplementationRunnerBuilder .with_solution_for( row[:method], - as_implementation(row[:call]), - as_action(row[:action])) + as_implementation(row[:call])) end @queueBasedImplementationRunner = @queueBasedImplementationRunnerBuilder.create @@ -165,6 +170,16 @@ def as_implementation(call) 'The response queue has different size. Messages have been published' end +Then(/^the client should consume one request$/) do + assert_equal @request_count - 1, @request_queue.get_size, + 'The request queue has different size. Messages have been consumed' +end + +Then(/^the client should publish one response$/) do + assert_equal @request_count - 2, @response_queue.get_size, + 'The response queue has different size. Messages have been published' +end + Then(/^I should get no exception$/) do #OBS if you get here there were no exceptions end diff --git a/lib/tdl/queue/implementation_runner_config.rb b/lib/tdl/queue/implementation_runner_config.rb index f606fc9..82b650d 100644 --- a/lib/tdl/queue/implementation_runner_config.rb +++ b/lib/tdl/queue/implementation_runner_config.rb @@ -8,6 +8,8 @@ def initialize @port = 61613 @time_to_wait_for_requests = 500 @audit_stream = ConsoleAuditStream.new + @request_queue_name = '' + @response_queue_name = '' end def set_hostname(hostname) @@ -20,8 +22,13 @@ def set_port(port) self end - def set_unique_id(unique_id) - @unique_id = unique_id + def set_request_queue_name(queue_name) + @request_queue_name = queue_name + self + end + + def set_response_queue_name(queue_name) + @response_queue_name = queue_name self end @@ -43,8 +50,12 @@ def get_port @port end - def get_unique_id - @unique_id + def get_request_queue_name + @request_queue_name + end + + def get_response_queue_name + @response_queue_name end def get_time_to_wait_for_requests diff --git a/lib/tdl/queue/queue_based_implementation_runner.rb b/lib/tdl/queue/queue_based_implementation_runner.rb index 47f3c99..03a31a4 100644 --- a/lib/tdl/queue/queue_based_implementation_runner.rb +++ b/lib/tdl/queue/queue_based_implementation_runner.rb @@ -19,7 +19,8 @@ def run remote_broker = RemoteBroker.new( @config.get_hostname, @config.get_port, - @config.get_unique_id, + @config.get_request_queue_name, + @config.get_response_queue_name, @config.get_time_to_wait_for_requests) remote_broker.subscribe(ApplyProcessingRules.new(@deploy_processing_rules)) @logger.info 'Waiting for requests.' @@ -61,15 +62,13 @@ def process_next_request_from(remote_broker, request) @audit.log(response) # Act - if response = fatal_error_response - @audit.end_line + if response.instance_of? FatalErrorResponse remote_broker.close - return + @audit.end_line + else + remote_broker.respond_to(request, response) end - - remote_broker.respond_to(request, response) end - end end end diff --git a/lib/tdl/queue/transport/remote_broker.rb b/lib/tdl/queue/transport/remote_broker.rb index 1f4b95b..96887f2 100644 --- a/lib/tdl/queue/transport/remote_broker.rb +++ b/lib/tdl/queue/transport/remote_broker.rb @@ -4,11 +4,10 @@ module TDL class RemoteBroker - def initialize(hostname, port, unique_id, request_timeout_millis) + def initialize(hostname, port, request_queue_name, response_queue_name, request_timeout_millis) @stomp_client = Stomp::Client.new('', '', hostname, port) - @unique_id = unique_id - @request_queue = "/queue/#{@unique_id}.req" - @response_queue = "/queue/#{@unique_id}.resp" + @request_queue = "/queue/#{request_queue_name}" + @response_queue = "/queue/#{response_queue_name}" @serialization_provider = JSONRPCSerializationProvider.new @timer = ThreadTimer.new(request_timeout_millis, lambda = ->() { close unless closed? }) @timer.start