From 4f37d68c4c11d86f9c7625dd6312f9a09d02eb55 Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Tue, 26 Jan 2021 14:28:32 -0500 Subject: [PATCH 01/12] pika connection recreate --- src/radical/entk/execman/base/task_manager.py | 42 ++++++++++----- src/radical/entk/execman/rp/task_manager.py | 51 ++++++++++++------- 2 files changed, 63 insertions(+), 30 deletions(-) diff --git a/src/radical/entk/execman/base/task_manager.py b/src/radical/entk/execman/base/task_manager.py index 5b64b8046..555ed137e 100755 --- a/src/radical/entk/execman/base/task_manager.py +++ b/src/radical/entk/execman/base/task_manager.py @@ -136,7 +136,7 @@ def _tmgr(self, uid, rmgr, pending_queue, completed_queue, # -------------------------------------------------------------------------- # - def _sync_with_master(self, obj, obj_type, channel, queue): + def _sync_with_master(self, obj, obj_type, channel, conn_params, queue): corr_id = str(uuid.uuid4()) body = json.dumps({'object': obj.to_dict(), @@ -148,9 +148,15 @@ def _sync_with_master(self, obj, obj_type, channel, queue): self._prof.prof('pub_sync', state=obj.state, uid=obj.uid, msg=msg) self._log.debug('%s (%s) to sync with amgr', obj.uid, obj.state) - - channel.basic_publish(exchange='', routing_key=queue, body=body, + try: + channel.basic_publish(exchange='', routing_key=queue, body=body, properties=pika.BasicProperties(correlation_id=corr_id)) + except pika.exceptions.ConnectionClosed: + connection = pika.BlockingConnection(conn_params) + channel = connection.channel() + channel.basic_publish(exchange='', routing_key=queue, body=body, + properties=pika.BasicProperties(correlation_id=corr_id)) + # all queue name parts up to the last three are used as sid, the last # three parts are channel specifiers which need to be inversed to obtain @@ -190,7 +196,7 @@ def _sync_with_master(self, obj, obj_type, channel, queue): # -------------------------------------------------------------------------- # - def _advance(self, obj, obj_type, new_state, channel, queue): + def _advance(self, obj, obj_type, new_state, channel, conn_params, queue): try: old_state = obj.state @@ -203,14 +209,14 @@ def _advance(self, obj, obj_type, new_state, channel, queue): self._prof.prof('advance', uid=obj.uid, state=obj.state, msg=msg) self._log.info('Transition %s to %s', obj.uid, new_state) - self._sync_with_master(obj, obj_type, channel, queue) + self._sync_with_master(obj, obj_type, channel, conn_params, queue) except Exception as ex: self._log.exception('Transition %s to state %s failed, error: %s', obj.uid, new_state, ex) obj.state = old_state - self._sync_with_master(obj, obj_type, channel, queue) + self._sync_with_master(obj, obj_type, channel, conn_params, queue) raise EnTKError(ex) from ex @@ -240,14 +246,24 @@ def _heartbeat(self): while not self._hb_terminate.is_set(): corr_id = str(uuid.uuid4()) + try: + # Heartbeat request signal sent to task manager via rpc-queue + props = pika.BasicProperties(reply_to=self._hb_response_q, + correlation_id=corr_id) + mq_channel.basic_publish(exchange='', + routing_key=self._hb_request_q, + properties=props, + body='request') + except pika.exceptions.ConnectionClosed: + mq_connection = pika.BlockingConnection(self._rmq_conn_params) + mq_channel = mq_connection.channel() + props = pika.BasicProperties(reply_to=self._hb_response_q, + correlation_id=corr_id) + mq_channel.basic_publish(exchange='', + routing_key=self._hb_request_q, + properties=props, + body='request') - # Heartbeat request signal sent to task manager via rpc-queue - props = pika.BasicProperties(reply_to=self._hb_response_q, - correlation_id=corr_id) - mq_channel.basic_publish(exchange='', - routing_key=self._hb_request_q, - properties=props, - body='request') self._log.info('Sent heartbeat request') # Sleep for hb_interval and then check if tmgr responded diff --git a/src/radical/entk/execman/rp/task_manager.py b/src/radical/entk/execman/rp/task_manager.py index 2f0652bed..93304cc81 100644 --- a/src/radical/entk/execman/rp/task_manager.py +++ b/src/radical/entk/execman/rp/task_manager.py @@ -104,30 +104,39 @@ def _tmgr(self, uid, rmgr, pending_queue, completed_queue, try: # ------------------------------------------------------------------ - def heartbeat_response(mq_channel): + def heartbeat_response(mq_channel, conn_params): + channel = mq_channel try: # Get request from heartbeat-req for heartbeat response method_frame, props, body = \ - mq_channel.basic_get(queue=self._hb_request_q) + channel.basic_get(queue=self._hb_request_q) if not body: return self._log.info('Received heartbeat request') - - nprops = pika.BasicProperties( + try: + nprops = pika.BasicProperties( + correlation_id=props.correlation_id) + channel.basic_publish(exchange='', + routing_key=self._hb_response_q, + properties=nprops, + body='response') + except pika.exceptions.ConnectionClosed: + connection = pika.BlockingConnection(conn_params) + channel = connection.channel() + nprops = pika.BasicProperties( correlation_id=props.correlation_id) - mq_channel.basic_publish( - exchange='', - routing_key=self._hb_response_q, - properties=nprops, - body='response') + channel.basic_publish(exchange='', + routing_key=self._hb_response_q, + properties=nprops, + body='response') self._log.info('Sent heartbeat response') - mq_channel.basic_ack(delivery_tag=method_frame.delivery_tag) + channel.basic_ack(delivery_tag=method_frame.delivery_tag) except Exception as ex: self._log.exception('Failed to respond to heartbeat, ' + @@ -173,7 +182,7 @@ def heartbeat_response(mq_channel): mq_channel.basic_ack( delivery_tag=method_frame.delivery_tag) - heartbeat_response(mq_channel) + heartbeat_response(mq_channel, rmq_conn_params) except Exception as ex: self._log.exception('Error in task execution: %s', ex) @@ -248,16 +257,23 @@ def unit_state_cb(unit, state): task = create_task_from_cu(unit, self._prof) self._advance(task, 'Task', states.COMPLETED, - mq_channel, '%s-cb-to-sync' % self._sid) + mq_channel, rmq_conn_params, + '%s-cb-to-sync' % self._sid) load_placeholder(task, unit.uid) task_as_dict = json.dumps(task.to_dict()) + try: + mq_channel.basic_publish(exchange='', + routing_key='%s-completedq-1' % self._sid, + body=task_as_dict) + except pika.exceptions.ConnectionClosed: + connection = pika.BlockingConnection(rmq_conn_params) + mq_channel = connection.channel() + mq_channel.basic_publish(exchange='', + routing_key='%s-completedq-1' % self._sid, + body=task_as_dict) - mq_channel.basic_publish( - exchange='', - routing_key='%s-completedq-1' % self._sid, - body=task_as_dict) self._log.info('Pushed task %s with state %s to completed ' 'queue %s-completedq-1', @@ -311,7 +327,8 @@ def unit_state_cb(unit, state): task, placeholders, self._prof)) self._advance(task, 'Task', states.SUBMITTING, - mq_channel, '%s-tmgr-to-sync' % self._sid) + mq_channel, rmq_conn_params, + '%s-tmgr-to-sync' % self._sid) umgr.submit_units(bulk_cuds) mq_connection.close() From 061082bc3fb82c8547713d6c1c33469479299dd9 Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Tue, 26 Jan 2021 15:29:12 -0500 Subject: [PATCH 02/12] updating unit callback --- src/radical/entk/execman/rp/task_manager.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/radical/entk/execman/rp/task_manager.py b/src/radical/entk/execman/rp/task_manager.py index 93304cc81..6a02d4eb5 100644 --- a/src/radical/entk/execman/rp/task_manager.py +++ b/src/radical/entk/execman/rp/task_manager.py @@ -245,10 +245,10 @@ def load_placeholder(task, rts_uid): 'rts_uid': rts_uid} # ---------------------------------------------------------------------- - def unit_state_cb(unit, state): + def unit_state_cb(unit, state, mq_channel, conn_params): try: - + channel = mq_channel self._log.debug('Unit %s in state %s' % (unit.uid, unit.state)) if unit.state in rp.FINAL: @@ -257,20 +257,20 @@ def unit_state_cb(unit, state): task = create_task_from_cu(unit, self._prof) self._advance(task, 'Task', states.COMPLETED, - mq_channel, rmq_conn_params, + channel, conn_params, '%s-cb-to-sync' % self._sid) load_placeholder(task, unit.uid) task_as_dict = json.dumps(task.to_dict()) try: - mq_channel.basic_publish(exchange='', + channel.basic_publish(exchange='', routing_key='%s-completedq-1' % self._sid, body=task_as_dict) except pika.exceptions.ConnectionClosed: - connection = pika.BlockingConnection(rmq_conn_params) - mq_channel = connection.channel() - mq_channel.basic_publish(exchange='', + connection = pika.BlockingConnection(conn_params) + channel = connection.channel() + channel.basic_publish(exchange='', routing_key='%s-completedq-1' % self._sid, body=task_as_dict) From e0e7f08278fb0d7c05621dd1c4e777250802ce8d Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Tue, 26 Jan 2021 15:48:09 -0500 Subject: [PATCH 03/12] callback data --- src/radical/entk/execman/rp/task_manager.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/radical/entk/execman/rp/task_manager.py b/src/radical/entk/execman/rp/task_manager.py index 6a02d4eb5..8aabb41c2 100644 --- a/src/radical/entk/execman/rp/task_manager.py +++ b/src/radical/entk/execman/rp/task_manager.py @@ -245,10 +245,11 @@ def load_placeholder(task, rts_uid): 'rts_uid': rts_uid} # ---------------------------------------------------------------------- - def unit_state_cb(unit, state, mq_channel, conn_params): + def unit_state_cb(unit, state, cb_data): try: - channel = mq_channel + channel = cb_data['channel'] + conn_params = cb_data['params'] self._log.debug('Unit %s in state %s' % (unit.uid, unit.state)) if unit.state in rp.FINAL: @@ -295,7 +296,8 @@ def unit_state_cb(unit, state, mq_channel, conn_params): umgr = rp.UnitManager(session=rmgr._session) umgr.add_pilots(rmgr.pilot) - umgr.register_callback(unit_state_cb) + umgr.register_callback(unit_state_cb, cb_data={'channel': mq_channel, + 'params': rmq_conn_params}) try: From 55230ea55c4e801cf75235052572d4c9dba3a9b0 Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Tue, 26 Jan 2021 17:19:02 -0500 Subject: [PATCH 04/12] second exception occured --- src/radical/entk/execman/base/task_manager.py | 6 ++++-- src/radical/entk/execman/rp/task_manager.py | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/radical/entk/execman/base/task_manager.py b/src/radical/entk/execman/base/task_manager.py index 555ed137e..5cff84ced 100755 --- a/src/radical/entk/execman/base/task_manager.py +++ b/src/radical/entk/execman/base/task_manager.py @@ -151,7 +151,8 @@ def _sync_with_master(self, obj, obj_type, channel, conn_params, queue): try: channel.basic_publish(exchange='', routing_key=queue, body=body, properties=pika.BasicProperties(correlation_id=corr_id)) - except pika.exceptions.ConnectionClosed: + except (pika.exceptions.ConnectionClosed, + pika.exceptions.ChannelClosed): connection = pika.BlockingConnection(conn_params) channel = connection.channel() channel.basic_publish(exchange='', routing_key=queue, body=body, @@ -254,7 +255,8 @@ def _heartbeat(self): routing_key=self._hb_request_q, properties=props, body='request') - except pika.exceptions.ConnectionClosed: + except (pika.exceptions.ConnectionClosed, + pika.exceptions.ChannelClosed): mq_connection = pika.BlockingConnection(self._rmq_conn_params) mq_channel = mq_connection.channel() props = pika.BasicProperties(reply_to=self._hb_response_q, diff --git a/src/radical/entk/execman/rp/task_manager.py b/src/radical/entk/execman/rp/task_manager.py index 8aabb41c2..73d66b072 100644 --- a/src/radical/entk/execman/rp/task_manager.py +++ b/src/radical/entk/execman/rp/task_manager.py @@ -124,7 +124,8 @@ def heartbeat_response(mq_channel, conn_params): routing_key=self._hb_response_q, properties=nprops, body='response') - except pika.exceptions.ConnectionClosed: + except (pika.exceptions.ConnectionClosed, + pika.exceptions.ChannelClosed): connection = pika.BlockingConnection(conn_params) channel = connection.channel() nprops = pika.BasicProperties( @@ -268,7 +269,8 @@ def unit_state_cb(unit, state, cb_data): channel.basic_publish(exchange='', routing_key='%s-completedq-1' % self._sid, body=task_as_dict) - except pika.exceptions.ConnectionClosed: + except (pika.exceptions.ConnectionClosed, + pika.exceptions.ChannelClosed): connection = pika.BlockingConnection(conn_params) channel = connection.channel() channel.basic_publish(exchange='', From 15c1c7d7f76578845978c3990c34d7d2fdbeb622 Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Wed, 27 Jan 2021 12:39:12 -0500 Subject: [PATCH 05/12] trying to trigger exception handling --- tests/test_integration/test_tmgr_base/test_sync_with_master.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_integration/test_tmgr_base/test_sync_with_master.py b/tests/test_integration/test_tmgr_base/test_sync_with_master.py index 696c6c4a2..bced08128 100644 --- a/tests/test_integration/test_tmgr_base/test_sync_with_master.py +++ b/tests/test_integration/test_tmgr_base/test_sync_with_master.py @@ -65,6 +65,7 @@ def component_execution(inputs, method, channel, queue): msg = json.loads(body) self.assertEqual(msg['object'], packet[1].to_dict()) self.assertEqual(msg['type'], packet[0]) + time.sleep(200) except Exception as ex: print(body) print(json.loads(body)) From 2872ef554ae1e3af6bc6e8d884d9e9a42cb4277c Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Wed, 27 Jan 2021 12:47:05 -0500 Subject: [PATCH 06/12] trying to trigger exception handling --- .../test_tmgr_base/test_sync_with_master.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/test_integration/test_tmgr_base/test_sync_with_master.py b/tests/test_integration/test_tmgr_base/test_sync_with_master.py index bced08128..9ad5c9c6c 100644 --- a/tests/test_integration/test_tmgr_base/test_sync_with_master.py +++ b/tests/test_integration/test_tmgr_base/test_sync_with_master.py @@ -25,10 +25,12 @@ def test_sync_with_master(self, mocked_init, mocked_Logger, mocked_Profiler): # -------------------------------------------------------------------------- # - def component_execution(inputs, method, channel, queue): + def component_execution(inputs, method, channel, conn_params, queue): for obj_type, obj, in inputs: - method(obj, obj_type, channel, queue) + method(obj, obj_type, channel, conn_params, queue) + if channel.is_open: + channel.close() return True @@ -65,7 +67,6 @@ def component_execution(inputs, method, channel, queue): msg = json.loads(body) self.assertEqual(msg['object'], packet[1].to_dict()) self.assertEqual(msg['type'], packet[0]) - time.sleep(200) except Exception as ex: print(body) print(json.loads(body)) From cef9fe27f7b4f50e554e415603220318b9c3cff9 Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Wed, 27 Jan 2021 16:53:08 -0500 Subject: [PATCH 07/12] hopefully I trigger the exception --- tests/test_component/test_tmgr_base.py | 13 +++--- tests/test_component/test_tmgr_rp.py | 2 +- .../test_tmgr_base/test_sync_with_master.py | 44 +++++++++---------- 3 files changed, 30 insertions(+), 29 deletions(-) diff --git a/tests/test_component/test_tmgr_base.py b/tests/test_component/test_tmgr_base.py index e9f40ca7d..eec64051f 100755 --- a/tests/test_component/test_tmgr_base.py +++ b/tests/test_component/test_tmgr_base.py @@ -107,9 +107,9 @@ def test_advance(self, mocked_init, mocked_Logger, mocked_Profiler, global_syncs = [] - def _sync_side_effect(log_entry, uid, state, msg): + def _sync_side_effect(obj, obj_type, channel, conn_params, queue): nonlocal global_syncs - global_syncs.append([log_entry, uid, state, msg]) + global_syncs.append([obj, obj_type, channel, conn_params, queue]) tmgr._log = mocked_Logger tmgr._prof = mocked_Profiler @@ -120,12 +120,13 @@ def _sync_side_effect(log_entry, uid, state, msg): obj.parent_pipeline = {'uid': 'test_pipe'} obj.uid = 'test_object' obj.state = 'test_state' - tmgr._advance(obj, 'Task', None, 'channel','queue') - self.assertEqual(global_syncs[0],[obj, 'Task', 'channel','queue']) + tmgr._advance(obj, 'Task', None, 'channel','params','queue') + self.assertEqual(global_syncs[0],[obj, 'Task', 'channel', 'params', 'queue']) self.assertIsNone(obj.state) global_syncs = [] - tmgr._advance(obj, 'Stage', 'new_state', 'channel','queue') - self.assertEqual(global_syncs[0],[obj, 'Stage', 'channel','queue']) + tmgr._advance(obj, 'Stage', 'new_state', 'channel', 'params', 'queue') + self.assertEqual(global_syncs[0],[obj, 'Stage', 'channel', 'params', + 'queue']) self.assertEqual(obj.state, 'new_state') # ------------------------------------------------------------------------------ diff --git a/tests/test_component/test_tmgr_rp.py b/tests/test_component/test_tmgr_rp.py index 458f0dea0..7a22d50ef 100644 --- a/tests/test_component/test_tmgr_rp.py +++ b/tests/test_component/test_tmgr_rp.py @@ -66,7 +66,7 @@ def test_start_manager(self, mocked_init, mocked_Logger, mocked_Profiler): tmgr._prof = mocked_Profiler tmgr._uid = 'tmgr.0000' tmgr._rmgr = 'test_rmgr' - tmgr._rmq_conn_params = 'test_params' + tmgr._rmq_conn_params = rmq_params tmgr._pending_queue = ['pending_queues'] tmgr._completed_queue = ['completed_queues'] tmgr._tmgr = _tmgr_side_effect diff --git a/tests/test_integration/test_tmgr_base/test_sync_with_master.py b/tests/test_integration/test_tmgr_base/test_sync_with_master.py index 9ad5c9c6c..c6a4bb3ca 100644 --- a/tests/test_integration/test_tmgr_base/test_sync_with_master.py +++ b/tests/test_integration/test_tmgr_base/test_sync_with_master.py @@ -6,7 +6,7 @@ import time from unittest import TestCase, mock -import threading as mt +import multiprocessing as mp from radical.entk import Task, Stage from radical.entk.execman.base import Base_TaskManager as BaseTmgr @@ -22,44 +22,44 @@ class TestTask(TestCase): @mock.patch('radical.utils.Logger') @mock.patch('radical.utils.Profiler') def test_sync_with_master(self, mocked_init, mocked_Logger, mocked_Profiler): - + self.maxDiff = None # -------------------------------------------------------------------------- # - def component_execution(inputs, method, channel, conn_params, queue): - - for obj_type, obj, in inputs: - method(obj, obj_type, channel, conn_params, queue) - if channel.is_open: - channel.close() - return True + def component_execution(packets, conn_params, queue): + tmgr = BaseTmgr(None, None, None, None, None, None) + tmgr._log = mocked_Logger + tmgr._prof = mocked_Profiler + mq_connection = pika.BlockingConnection(rmq_conn_params) + mq_channel = mq_connection.channel() + for obj_type, obj, in packets: + tmgr._sync_with_master(obj, obj_type, mq_channel, conn_params, + queue) + if mq_channel.is_open: + mq_channel.close() task = Task() task.parent_stage = {'uid':'stage.0000', 'name': 'stage.0000'} - hostname = os.environ.get('RMQ_HOSTNAME', 'localhost') - port = int(os.environ.get('RMQ_PORT', '5672')) - username = os.environ.get('RMQ_USERNAME','guest') - password = os.environ.get('RMQ_PASSWORD','guest') packets = [('Task', task)] stage = Stage() stage.parent_pipeline = {'uid':'pipe.0000', 'name': 'pipe.0000'} packets.append(('Stage', stage)) + hostname = os.environ.get('RMQ_HOSTNAME', 'localhost') + port = int(os.environ.get('RMQ_PORT', '5672')) + username = os.environ.get('RMQ_USERNAME','guest') + password = os.environ.get('RMQ_PASSWORD','guest') credentials = pika.PlainCredentials(username, password) rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port, credentials=credentials) mq_connection = pika.BlockingConnection(rmq_conn_params) mq_channel = mq_connection.channel() mq_channel.queue_declare(queue='master') - tmgr = BaseTmgr(None, None, None, None, None, None) - tmgr._log = mocked_Logger - tmgr._prof = mocked_Profiler - - master_thread = mt.Thread(target=component_execution, + master_thread = mp.Process(target=component_execution, name='tmgr_sync', - args=(packets, tmgr._sync_with_master, - mq_channel, 'master')) + args=(packets, rmq_conn_params, 'master')) master_thread.start() - time.sleep(0.1) + + time.sleep(1) try: while packets: packet = packets.pop(0) @@ -68,7 +68,7 @@ def component_execution(inputs, method, channel, conn_params, queue): self.assertEqual(msg['object'], packet[1].to_dict()) self.assertEqual(msg['type'], packet[0]) except Exception as ex: - print(body) + print(ex) print(json.loads(body)) master_thread.join() mq_channel.queue_delete(queue='master') From ee31b8bf77af71680a01b0f8c7b35b1f4c39206a Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Thu, 28 Jan 2021 12:49:14 -0500 Subject: [PATCH 08/12] heartbeat integration test --- src/radical/entk/execman/base/task_manager.py | 2 ++ tests/test_integration/test_tmgr_base/test_sync_with_master.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/radical/entk/execman/base/task_manager.py b/src/radical/entk/execman/base/task_manager.py index 5cff84ced..1e0d932f7 100755 --- a/src/radical/entk/execman/base/task_manager.py +++ b/src/radical/entk/execman/base/task_manager.py @@ -275,11 +275,13 @@ def _heartbeat(self): queue=self._hb_response_q) if not body: # no usable response + self._log.error('Heartbeat response no body') return # raise EnTKError('heartbeat timeout') if corr_id != props.correlation_id: # incorrect response + self._log.error('Heartbeat response wrong correlation') return # raise EnTKError('heartbeat timeout') diff --git a/tests/test_integration/test_tmgr_base/test_sync_with_master.py b/tests/test_integration/test_tmgr_base/test_sync_with_master.py index c6a4bb3ca..44797018c 100644 --- a/tests/test_integration/test_tmgr_base/test_sync_with_master.py +++ b/tests/test_integration/test_tmgr_base/test_sync_with_master.py @@ -22,7 +22,7 @@ class TestTask(TestCase): @mock.patch('radical.utils.Logger') @mock.patch('radical.utils.Profiler') def test_sync_with_master(self, mocked_init, mocked_Logger, mocked_Profiler): - self.maxDiff = None + # -------------------------------------------------------------------------- # def component_execution(packets, conn_params, queue): From cf1e30112b230151a793d3473aeb44934de3e599 Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Thu, 28 Jan 2021 12:53:26 -0500 Subject: [PATCH 09/12] heartbeat integration test --- .../test_tmgr_base/test_heartbeat.py | 100 ++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 tests/test_integration/test_tmgr_base/test_heartbeat.py diff --git a/tests/test_integration/test_tmgr_base/test_heartbeat.py b/tests/test_integration/test_tmgr_base/test_heartbeat.py new file mode 100644 index 000000000..4cdee3dca --- /dev/null +++ b/tests/test_integration/test_tmgr_base/test_heartbeat.py @@ -0,0 +1,100 @@ +# pylint: disable=protected-access, unused-argument +# pylint: disable=no-value-for-parameter +import os +import pika +import json +import time + +from unittest import TestCase, mock + +import threading as mt +import radical.utils as ru + +from radical.entk.execman.base import Base_TaskManager as BaseTmgr + + +# ------------------------------------------------------------------------------ +# +class TestTask(TestCase): + + # -------------------------------------------------------------------------- + # + @mock.patch.object(BaseTmgr, '__init__', return_value=None) + @mock.patch('radical.utils.Profiler') + def test_heartbeat(self, mocked_init, mocked_Profiler): + + hostname = os.environ.get('RMQ_HOSTNAME', 'localhost') + port = int(os.environ.get('RMQ_PORT', '5672')) + username = os.environ.get('RMQ_USERNAME','guest') + password = os.environ.get('RMQ_PASSWORD','guest') + credentials = pika.PlainCredentials(username, password) + rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port, + credentials=credentials) + tmgr = BaseTmgr(None, None, None, None, None, None) + tmgr._uid = 'tmgr.0000' + tmgr._log = ru.Logger('radical.entk.manager.base', level='DEBUG') + tmgr._prof = mocked_Profiler + tmgr._hb_interval = 0.1 + tmgr._hb_terminate = mt.Event() + tmgr._hb_request_q = 'tmgr-hb-request' + tmgr._hb_response_q = 'tmgr-hb-response' + tmgr._rmq_conn_params = rmq_conn_params + mq_connection = pika.BlockingConnection(rmq_conn_params) + mq_channel = mq_connection.channel() + mq_channel.queue_declare(queue='tmgr-hb-request') + mq_channel.queue_declare(queue='tmgr-hb-response') + tmgr._log.info('Starting test') + master_thread = mt.Thread(target=tmgr._heartbeat, + name='tmgr_heartbeat') + + master_thread.start() + time.sleep(0.1) + body = None + try: + for i in range(5): + while body is None: + _, props, body = mq_channel.basic_get(queue='tmgr-hb-request') + self.assertEqual(body, b'request') + nprops = pika.BasicProperties(correlation_id=props.correlation_id) + mq_channel.basic_publish(exchange='', + routing_key='tmgr-hb-response', + properties=nprops, + body='response') + self.assertTrue(master_thread.is_alive()) + body = None + + time.sleep(0.5) + self.assertFalse(master_thread.is_alive()) + master_thread.join() + mq_channel.queue_delete(queue='tmgr-hb-request') + mq_channel.queue_delete(queue='tmgr-hb-response') + mq_channel.queue_declare(queue='tmgr-hb-request') + mq_channel.queue_declare(queue='tmgr-hb-response') + + master_thread = mt.Thread(target=tmgr._heartbeat, + name='tmgr_heartbeat') + master_thread.start() + body = None + while body is None: + _, props, body = mq_channel.basic_get(queue='tmgr-hb-request') + mq_channel.basic_publish(exchange='', + routing_key='tmgr-hb-response', + body='response') + time.sleep(0.2) + self.assertFalse(master_thread.is_alive()) + + except Exception as ex: + tmgr._hb_terminate.set() + master_thread.join() + mq_channel.queue_delete(queue='tmgr-hb-request') + mq_channel.queue_delete(queue='tmgr-hb-response') + mq_channel.close() + mq_connection.close() + raise ex + else: + tmgr._hb_terminate.set() + master_thread.join() + mq_channel.queue_delete(queue='tmgr-hb-request') + mq_channel.queue_delete(queue='tmgr-hb-response') + mq_channel.close() + mq_connection.close() From d08624714ba7cbbf17c3d337da50ab7719eeba53 Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Thu, 28 Jan 2021 13:06:20 -0500 Subject: [PATCH 10/12] threads instead of processes for coverage --- .../test_tmgr_base/test_heartbeat.py | 3 +-- .../test_tmgr_base/test_sync_with_master.py | 14 +++++++------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/tests/test_integration/test_tmgr_base/test_heartbeat.py b/tests/test_integration/test_tmgr_base/test_heartbeat.py index 4cdee3dca..d6dfccc7b 100644 --- a/tests/test_integration/test_tmgr_base/test_heartbeat.py +++ b/tests/test_integration/test_tmgr_base/test_heartbeat.py @@ -2,7 +2,6 @@ # pylint: disable=no-value-for-parameter import os import pika -import json import time from unittest import TestCase, mock @@ -51,7 +50,7 @@ def test_heartbeat(self, mocked_init, mocked_Profiler): time.sleep(0.1) body = None try: - for i in range(5): + for _ in range(5): while body is None: _, props, body = mq_channel.basic_get(queue='tmgr-hb-request') self.assertEqual(body, b'request') diff --git a/tests/test_integration/test_tmgr_base/test_sync_with_master.py b/tests/test_integration/test_tmgr_base/test_sync_with_master.py index 44797018c..ea404bec1 100644 --- a/tests/test_integration/test_tmgr_base/test_sync_with_master.py +++ b/tests/test_integration/test_tmgr_base/test_sync_with_master.py @@ -6,7 +6,7 @@ import time from unittest import TestCase, mock -import multiprocessing as mp +import threading as mt from radical.entk import Task, Stage from radical.entk.execman.base import Base_TaskManager as BaseTmgr @@ -30,13 +30,13 @@ def component_execution(packets, conn_params, queue): tmgr = BaseTmgr(None, None, None, None, None, None) tmgr._log = mocked_Logger tmgr._prof = mocked_Profiler - mq_connection = pika.BlockingConnection(rmq_conn_params) - mq_channel = mq_connection.channel() + mq_connection2 = pika.BlockingConnection(rmq_conn_params) + mq_channel2 = mq_connection.channel() for obj_type, obj, in packets: - tmgr._sync_with_master(obj, obj_type, mq_channel, conn_params, + tmgr._sync_with_master(obj, obj_type, mq_channel2, conn_params, queue) - if mq_channel.is_open: - mq_channel.close() + if mq_channel2.is_open: + mq_channel2.close() task = Task() task.parent_stage = {'uid':'stage.0000', 'name': 'stage.0000'} @@ -54,7 +54,7 @@ def component_execution(packets, conn_params, queue): mq_connection = pika.BlockingConnection(rmq_conn_params) mq_channel = mq_connection.channel() mq_channel.queue_declare(queue='master') - master_thread = mp.Process(target=component_execution, + master_thread = mt.Thread(target=component_execution, name='tmgr_sync', args=(packets, rmq_conn_params, 'master')) master_thread.start() From 802a7dcf521a276b283f33637c3e525c860f14c1 Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Thu, 28 Jan 2021 13:06:39 -0500 Subject: [PATCH 11/12] test bug fix --- tests/test_integration/test_tmgr_base/test_sync_with_master.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_integration/test_tmgr_base/test_sync_with_master.py b/tests/test_integration/test_tmgr_base/test_sync_with_master.py index ea404bec1..ebdfa6114 100644 --- a/tests/test_integration/test_tmgr_base/test_sync_with_master.py +++ b/tests/test_integration/test_tmgr_base/test_sync_with_master.py @@ -31,7 +31,7 @@ def component_execution(packets, conn_params, queue): tmgr._log = mocked_Logger tmgr._prof = mocked_Profiler mq_connection2 = pika.BlockingConnection(rmq_conn_params) - mq_channel2 = mq_connection.channel() + mq_channel2 = mq_connection2.channel() for obj_type, obj, in packets: tmgr._sync_with_master(obj, obj_type, mq_channel2, conn_params, queue) From 1baf30f40cd454d96f390e242211f44a8ded7c4d Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Thu, 28 Jan 2021 13:16:43 -0500 Subject: [PATCH 12/12] disabling patch as it is not required. --- .codecov.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.codecov.yml b/.codecov.yml index 798e806b4..25741db10 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -6,3 +6,4 @@ coverage: threshold: 1% paths: - "src" + patch: off