Skip to content

Commit

Permalink
Adapting the broker, queue step and rest of the implementation to use…
Browse files Browse the repository at this point in the history
… request_queue_name and response_queue_name, injected via config rather than building it from user-name as per previous configuration
  • Loading branch information
neomatrix369 committed Oct 29, 2018
1 parent 8585a28 commit b2226dc
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 23 deletions.
31 changes: 23 additions & 8 deletions features/queue_steps.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,25 @@
# Broker client definition
STOMP_PORT = 21613

REQUEST_QUEUE_NAME = 'some-user-req'
RESPONSE_QUEUE_NAME = 'some-user-resp'

# ~~~~~ Setup

Given(/^I start with a clean broker and a client for user "([^\"]*)"$/) do |username|
unique_id = username
@request_queue = BROKER.add_queue("#{unique_id}.req")
Given(/^I start with a clean broker having a request and a response queue$/) do ||
@request_queue = BROKER.add_queue(REQUEST_QUEUE_NAME)
@request_queue.purge

@response_queue = BROKER.add_queue("#{unique_id}.resp")
@response_queue = BROKER.add_queue(RESPONSE_QUEUE_NAME)
@response_queue.purge
end

Given(/^a client that connects to the queues$/) do
config = TDL::ImplementationRunnerConfig.new()
.set_hostname(HOSTNAME)
.set_port(STOMP_PORT)
.set_unique_id(unique_id)
.set_request_queue_name(REQUEST_QUEUE_NAME)
.set_response_queue_name(RESPONSE_QUEUE_NAME)

@queueBasedImplementationRunnerBuilder = TDL::QueueBasedImplementationRunnerBuilder.new()
.set_config(config)
Expand All @@ -43,7 +48,8 @@
config = TDL::ImplementationRunnerConfig.new()
.set_hostname('111')
.set_port(STOMP_PORT)
.set_unique_id('X')
.set_request_queue_name('X')
.set_response_queue_name('Y')

@queueBasedImplementationRunnerBuilder = TDL::QueueBasedImplementationRunnerBuilder.new()
.set_config(config);
Expand Down Expand Up @@ -116,8 +122,7 @@ def as_implementation(call)
@queueBasedImplementationRunnerBuilder
.with_solution_for(
row[:method],
as_implementation(row[:call]),
as_action(row[:action]))
as_implementation(row[:call]))
end

@queueBasedImplementationRunner = @queueBasedImplementationRunnerBuilder.create
Expand Down Expand Up @@ -165,6 +170,16 @@ def as_implementation(call)
'The response queue has different size. Messages have been published'
end

Then(/^the client should consume one request$/) do
assert_equal @request_count - 1, @request_queue.get_size,
'The request queue has different size. Messages have been consumed'
end

Then(/^the client should publish one response$/) do
assert_equal @request_count - 2, @response_queue.get_size,
'The response queue has different size. Messages have been published'
end

Then(/^I should get no exception$/) do
#OBS if you get here there were no exceptions
end
19 changes: 15 additions & 4 deletions lib/tdl/queue/implementation_runner_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ def initialize
@port = 61613
@time_to_wait_for_requests = 500
@audit_stream = ConsoleAuditStream.new
@request_queue_name = ''
@response_queue_name = ''
end

def set_hostname(hostname)
Expand All @@ -20,8 +22,13 @@ def set_port(port)
self
end

def set_unique_id(unique_id)
@unique_id = unique_id
def set_request_queue_name(queue_name)
@request_queue_name = queue_name
self
end

def set_response_queue_name(queue_name)
@response_queue_name = queue_name
self
end

Expand All @@ -43,8 +50,12 @@ def get_port
@port
end

def get_unique_id
@unique_id
def get_request_queue_name
@request_queue_name
end

def get_response_queue_name
@response_queue_name
end

def get_time_to_wait_for_requests
Expand Down
13 changes: 6 additions & 7 deletions lib/tdl/queue/queue_based_implementation_runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ def run
remote_broker = RemoteBroker.new(
@config.get_hostname,
@config.get_port,
@config.get_unique_id,
@config.get_request_queue_name,
@config.get_response_queue_name,
@config.get_time_to_wait_for_requests)
remote_broker.subscribe(ApplyProcessingRules.new(@deploy_processing_rules))
@logger.info 'Waiting for requests.'
Expand Down Expand Up @@ -61,15 +62,13 @@ def process_next_request_from(remote_broker, request)
@audit.log(response)

# Act
if response = fatal_error_response
@audit.end_line
if response.instance_of? FatalErrorResponse
remote_broker.close
return
@audit.end_line
else
remote_broker.respond_to(request, response)
end

remote_broker.respond_to(request, response)
end

end
end
end
Expand Down
7 changes: 3 additions & 4 deletions lib/tdl/queue/transport/remote_broker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@

module TDL
class RemoteBroker
def initialize(hostname, port, unique_id, request_timeout_millis)
def initialize(hostname, port, request_queue_name, response_queue_name, request_timeout_millis)
@stomp_client = Stomp::Client.new('', '', hostname, port)
@unique_id = unique_id
@request_queue = "/queue/#{@unique_id}.req"
@response_queue = "/queue/#{@unique_id}.resp"
@request_queue = "/queue/#{request_queue_name}"
@response_queue = "/queue/#{response_queue_name}"
@serialization_provider = JSONRPCSerializationProvider.new
@timer = ThreadTimer.new(request_timeout_millis, lambda = ->() { close unless closed? })
@timer.start
Expand Down

0 comments on commit b2226dc

Please sign in to comment.