Skip to content

Commit

Permalink
Cancelling thread start while unit tests running
Browse files Browse the repository at this point in the history
This change modifies the Radware driver and its unit testing code
to not start operations completion thread while unit tests are running.

The driver initialization changed not to start the operations completion thread,
the thread is started only when operation completion item is inserted into the queue
for the first time.
The operation completion functionality was moved to a new function which
is called by the operations completion thread run() function.
The run() function still have the functionality of popping operation completion
items out of the queue and push failed items back.

Unit testing code mocks the operation completion items queue
by calling the operations completion hanler new function when item
is added.

Start() and join() functions of the thread were mocked to do nothing.

All sleep() entrances were removed from the unit testing code.
All unnecessary mock_reset() calls were removed.

Change-Id: I72380bf223be690831aba1fc29c3dca910245516
Closes-Bug: #1245208
  • Loading branch information
evgenyfedoruk committed Mar 28, 2014
1 parent b76c9e8 commit d52f84d
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 106 deletions.
94 changes: 54 additions & 40 deletions neutron/services/loadbalancer/drivers/radware/driver.py
Expand Up @@ -177,7 +177,7 @@ def __init__(self, plugin):
plugin)
self.workflow_templates_exists = False
self.completion_handler.setDaemon(True)
self.completion_handler.start()
self.completion_handler_started = False

def create_vip(self, context, vip):
LOG.debug(_('create_vip. vip: %s'), str(vip))
Expand Down Expand Up @@ -340,6 +340,12 @@ def _get_vip_network_id(self, context, extended_vip):
context, extended_vip['subnet_id'])
return subnet['network_id']

def _start_completion_handling_thread(self):
if not self.completion_handler_started:
LOG.info(_('Starting operation completion handling thread'))
self.completion_handler.start()
self.completion_handler_started = True

@call_log.log
def _update_workflow(self, wf_name, action,
wf_params, context,
Expand Down Expand Up @@ -371,6 +377,8 @@ def _update_workflow(self, wf_name, action,
entity_id,
delete=delete)
LOG.debug(_('Pushing operation %s to the queue'), oper)

self._start_completion_handling_thread()
self.queue.put_nowait(oper)

def _remove_workflow(self, ids, context):
Expand All @@ -391,6 +399,8 @@ def _remove_workflow(self, ids, context):
ids['vip'],
delete=True)
LOG.debug(_('Pushing operation %s to the queue'), oper)

self._start_completion_handling_thread()
self.queue.put_nowait(oper)

def _remove_service(self, service_name):
Expand Down Expand Up @@ -619,24 +629,52 @@ def __init__(self, queue, rest_client, plugin):
self.stoprequest = threading.Event()
self.opers_to_handle_before_rest = 0

def _get_db_status(self, operation, success, messages=None):
"""Get the db_status based on the status of the vdirect operation."""
if not success:
# we have a failure - log it and set the return ERROR as DB state
msg = ', '.join(messages) if messages else "unknown"
error_params = {"operation": operation, "msg": msg}
LOG.error(_('Operation %(operation)s failed. Reason: %(msg)s'),
error_params)
return constants.ERROR
if operation.delete:
return None
else:
return constants.ACTIVE

def join(self, timeout=None):
self.stoprequest.set()
super(OperationCompletionHandler, self).join(timeout)

def handle_operation_completion(self, oper):
result = self.rest_client.call('GET',
oper.operation_url,
None,
None)
completed = result[RESP_DATA]['complete']
reason = result[RESP_REASON],
description = result[RESP_STR]
if completed:
# operation is done - update the DB with the status
# or delete the entire graph from DB
success = result[RESP_DATA]['success']
sec_to_completion = time.time() - oper.creation_time
debug_data = {'oper': oper,
'sec_to_completion': sec_to_completion,
'success': success}
LOG.debug(_('Operation %(oper)s is completed after '
'%(sec_to_completion)d sec '
'with success status: %(success)s :'),
debug_data)
db_status = None
if not success:
# failure - log it and set the return ERROR as DB state
if reason or description:
msg = 'Reason:%s. Description:%s' % (reason, description)
else:
msg = "unknown"
error_params = {"operation": oper, "msg": msg}
LOG.error(_('Operation %(operation)s failed. Reason: %(msg)s'),
error_params)
db_status = constants.ERROR
else:
if oper.delete:
_remove_object_from_db(self.plugin, oper)
else:
db_status = constants.ACTIVE

if db_status:
_update_vip_graph_status(self.plugin, oper, db_status)

return completed

def run(self):
oper = None
while not self.stoprequest.isSet():
Expand All @@ -653,31 +691,7 @@ def run(self):
str(oper))
# check the status - if oper is done: update the db ,
# else push the oper again to the queue
result = self.rest_client.call('GET',
oper.operation_url,
None,
None)
completed = result[RESP_DATA]['complete']
if completed:
# operation is done - update the DB with the status
# or delete the entire graph from DB
success = result[RESP_DATA]['success']
sec_to_completion = time.time() - oper.creation_time
debug_data = {'oper': oper,
'sec_to_completion': sec_to_completion,
'success': success}
LOG.debug(_('Operation %(oper)s is completed after '
'%(sec_to_completion)d sec '
'with success status: %(success)s :'),
debug_data)
db_status = self._get_db_status(oper, success)
if db_status:
_update_vip_graph_status(
self.plugin, oper, db_status)
else:
_remove_object_from_db(
self.plugin, oper)
else:
if not self.handle_operation_completion(oper):
LOG.debug(_('Operation %s is not completed yet..') % oper)
# Not completed - push to the queue again
self.queue.put_nowait(oper)
Expand Down

0 comments on commit d52f84d

Please sign in to comment.