Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/integration tests #566

Merged
merged 21 commits into from
Apr 14, 2021
41 changes: 24 additions & 17 deletions .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ on:

jobs:
test:

runs-on: ubuntu-latest

services:
Expand All @@ -24,33 +23,41 @@ jobs:
- uses: actions/checkout@v2
with:
fetch-depth: 2
- name: Set up Python 3.6.13
- name: Set up Python 3.6
uses: actions/setup-python@v2
with:
python-version: 3.6.13
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install . --upgrade
pip install psutil
pip install hypothesis
pip install coverage
pip install codecov
pip install pytest
pip install pytest-xdist
pip install pytest-timeout
pip install timeout-decorator
python -m venv testenv
. testenv/bin/activate
python -m pip install --upgrade pip setuptools
python -m pip install --upgrade .
python -m pip install psutil
python -m pip install hypothesis
python -m pip install coverage
python -m pip install codecov
python -m pip install pytest
python -m pip install pytest-xdist
python -m pip install pytest-timeout
python -m pip install timeout-decorator
- name: Test with pytest
env:
RMQ_HOSTNAME: localhost
RMQ_PORT: ${{ job.services.rabbitmq.ports[5672] }} # get randomly assigned published port
RMQ_USERNAME: guest
RMQ_PASSWORD: guest
LOC: /opt/hostedtoolcache/Python/3.6.13/x64/lib/python3.6/site-packages
LOC: ./testenv/lib/python3.6/site-packages
run: |
coverage run --include=$LOC/radical/entk/* -m pytest -ra --timeout=600 -vvv --showlocals tests/test_component tests/test_utils/ tests/test_integration
source ./testenv/bin/activate
coverage run --include=$LOC/radical/entk/* -m pytest -ra --timeout=600 -vvv --showlocals tests/test_component tests/test_utils/ tests/test_integration
- name: Codecov
uses: codecov/codecov-action@v1.2.2
run: |
source ./testenv/bin/activate
coverage combine
coverage xml
coverage report
curl -s https://codecov.io/bash | bash


flake8:
Expand All @@ -62,7 +69,7 @@ jobs:
- name: Set up Python 3.6
uses: actions/setup-python@v2
with:
python-version: 3.6
python-version: 3.6.13
- name: Install dependencies
run: |
python -m pip install --upgrade pip
Expand All @@ -87,7 +94,7 @@ jobs:
- name: Set up Python 3.6
uses: actions/setup-python@v2
with:
python-version: 3.6
python-version: 3.6.13
- name: Install dependencies
run: |
python -m pip install --upgrade pip
Expand Down
15 changes: 0 additions & 15 deletions src/radical/entk/execman/base/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,40 +277,25 @@ def _heartbeat(self):
# no usable response
self._log.error('Heartbeat response no body')
return
# raise EnTKError('heartbeat timeout')

if corr_id != props.correlation_id:
# incorrect response
self._log.error('Heartbeat response wrong correlation')
return
# raise EnTKError('heartbeat timeout')

self._log.info('Received heartbeat response')
mq_channel.basic_ack(delivery_tag=method_frame.delivery_tag)

# Appease pika cos it thinks the connection is dead
# mq_connection.close()

except EnTKError as ex:
# make sure that timeouts did not race with termination
if 'heartbeat timeout' not in str(ex):
raise EnTKError(ex) from ex

if not self._hb_terminate.is_set():
raise EnTKError(ex) from ex

# we did indeed race with termination - exit gracefully
return

except KeyboardInterrupt as ex:
self._log.exception('Execution interrupted by user (probably '
' hit Ctrl+C), cancel tmgr gracefully...')
raise KeyboardInterrupt from ex

except Exception as ex:
self._log.exception('Heartbeat failed with error: %s', ex)
raise EnTKError(ex) from ex

finally:
try:
mq_connection.close()
Expand Down
2 changes: 1 addition & 1 deletion tests/test_component/test_tmgr_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def _tmgr_side_effect(amount):
time.sleep(amount)

tmgr._hb_thread = mt.Thread(target=_tmgr_side_effect,
name='test_tmgr', args=(1))
name='test_tmgr', args=(1,))
tmgr._hb_thread.start()

self.assertTrue(tmgr.check_heartbeat())
Expand Down
201 changes: 201 additions & 0 deletions tests/test_integration/test_tmgr_base/test_rmq_integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
# pylint: disable=protected-access, unused-argument
# pylint: disable=no-value-for-parameter
import os
import pika
import json
import time

from unittest import TestCase, mock
import threading as mt

import radical.utils as ru
from radical.entk import Task, Stage
from radical.entk.execman.base import Base_TaskManager as BaseTmgr


# ------------------------------------------------------------------------------
#
class TestTask(TestCase):

# --------------------------------------------------------------------------
#
@mock.patch.object(BaseTmgr, '__init__', return_value=None)
@mock.patch('radical.utils.Logger')
@mock.patch('radical.utils.Profiler')
def test_sync_with_master(self, mocked_init, mocked_Logger, mocked_Profiler):

# --------------------------------------------------------------------------
#
def component_execution(inputs, method, channel, conn_params, queue):

for obj_type, obj, in inputs:
method(obj, obj_type, channel, conn_params, queue)
return True


task = Task()
task.parent_stage = {'uid':'stage.0000', 'name': 'stage.0000'}
hostname = os.environ.get('RMQ_HOSTNAME', 'localhost')
port = int(os.environ.get('RMQ_PORT', '5672'))
username = os.environ.get('RMQ_USERNAME','guest')
password = os.environ.get('RMQ_PASSWORD','guest')
packets = [('Task', task)]
stage = Stage()
stage.parent_pipeline = {'uid':'pipe.0000', 'name': 'pipe.0000'}
packets.append(('Stage', stage))
credentials = pika.PlainCredentials(username, password)
rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port,
credentials=credentials)
mq_connection = pika.BlockingConnection(rmq_conn_params)
mq_channel = mq_connection.channel()
mq_channel.queue_declare(queue='master')
tmgr = BaseTmgr(None, None, None, None, None, None)
tmgr._log = mocked_Logger
tmgr._prof = mocked_Profiler

master_thread = mt.Thread(target=component_execution,
name='tmgr_sync',
args=(packets, tmgr._sync_with_master,
mq_channel, rmq_conn_params, 'master'))
master_thread.start()
time.sleep(0.1)
try:
while packets:
packet = packets.pop(0)
_, _, body = mq_channel.basic_get(queue='master')
msg = json.loads(body)
self.assertEqual(msg['object'], packet[1].to_dict())
self.assertEqual(msg['type'], packet[0])
except Exception as ex:
print(body)
print(json.loads(body))
master_thread.join()
mq_channel.queue_delete(queue='master')
mq_channel.close()
mq_connection.close()
raise ex
else:
master_thread.join()
mq_channel.queue_delete(queue='master')
mq_channel.close()
mq_connection.close()


# --------------------------------------------------------------------------
#
@mock.patch.object(BaseTmgr, '__init__', return_value=None)
@mock.patch('radical.utils.Profiler')
def test_heartbeat(self, mocked_init, mocked_Profiler):


hostname = os.environ.get('RMQ_HOSTNAME', 'localhost')
port = int(os.environ.get('RMQ_PORT', '5672'))
username = os.environ.get('RMQ_USERNAME','guest')
password = os.environ.get('RMQ_PASSWORD','guest')
credentials = pika.PlainCredentials(username, password)
rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port,
credentials=credentials)
mq_connection = pika.BlockingConnection(rmq_conn_params)
mq_channel = mq_connection.channel()
mq_channel.queue_declare(queue='heartbeat_rq')
mq_channel.queue_declare(queue='heartbeat_res')
tmgr = BaseTmgr(None, None, None, None, None, None)
tmgr._uid = 'tmgr.0000'
tmgr._rmq_conn_params = rmq_conn_params
tmgr._hb_request_q = 'heartbeat_rq'
tmgr._hb_response_q = 'heartbeat_res'
tmgr._hb_interval = 2
tmgr._hb_terminate = mt.Event()
tmgr._heartbeat_error = mt.Event()
tmgr._log = ru.Logger(name='radical.entk.taskmanager', ns='radical.entk')
tmgr._prof = mocked_Profiler

master_thread = mt.Thread(target=tmgr._heartbeat,
name='tmgr_heartbeat')
master_thread.start()
time.sleep(0.1)
i = 0
try:
while i < 20:
method_frame, props, body = mq_channel.basic_get(queue='heartbeat_rq')
i += 1
if body and body.decode('utf-8') == 'request':
nprops = pika.BasicProperties(correlation_id=props.correlation_id)
mq_channel.basic_publish(exchange='', routing_key='heartbeat_res',
properties=nprops, body='response')
mq_channel.basic_ack(delivery_tag=method_frame.delivery_tag)
self.assertTrue(master_thread.is_alive())
else:
self.assertTrue(master_thread.is_alive())
time.sleep(0.5)
except Exception as ex:
raise ex
finally:
tmgr._hb_terminate.set()
time.sleep(3)
master_thread.join()
mq_channel.queue_delete(queue='heartbeat_rq')
mq_channel.queue_delete(queue='heartbeat_res')
mq_channel.close()
mq_connection.close()


# --------------------------------------------------------------------------
#
@mock.patch.object(BaseTmgr, '__init__', return_value=None)
@mock.patch('radical.utils.Profiler')
def test_heartbeat_error(self, mocked_init, mocked_Profiler):


hostname = os.environ.get('RMQ_HOSTNAME', 'localhost')
port = int(os.environ.get('RMQ_PORT', '5672'))
username = os.environ.get('RMQ_USERNAME','guest')
password = os.environ.get('RMQ_PASSWORD','guest')
credentials = pika.PlainCredentials(username, password)
rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port,
credentials=credentials)
mq_connection = pika.BlockingConnection(rmq_conn_params)
mq_channel = mq_connection.channel()
mq_channel.queue_declare(queue='heartbeat_rq')
mq_channel.queue_declare(queue='heartbeat_res')
tmgr = BaseTmgr(None, None, None, None, None, None)
tmgr._uid = 'tmgr.0000'
tmgr._rmq_conn_params = rmq_conn_params
tmgr._hb_request_q = 'heartbeat_rq'
tmgr._hb_response_q = 'heartbeat_res'
tmgr._hb_interval = 2
tmgr._hb_terminate = mt.Event()
tmgr._log = ru.Logger(name='radical.entk.taskmanager', ns='radical.entk')
tmgr._prof = mocked_Profiler

master_thread = mt.Thread(target=tmgr._heartbeat,
name='tmgr_heartbeat')
master_thread.start()

body = None
while not body:
method_frame, _, body = mq_channel.basic_get(queue='heartbeat_rq')

time.sleep(3)
self.assertFalse(master_thread.is_alive())
master_thread.join()

master_thread = mt.Thread(target=tmgr._heartbeat,
name='tmgr_heartbeat')
master_thread.start()
body = None
while not body:
method_frame, _, body = mq_channel.basic_get(queue='heartbeat_rq')

nprops = pika.BasicProperties(correlation_id='wrong_id')
mq_channel.basic_publish(exchange='', routing_key='heartbeat_res',
properties=nprops, body='response')
mq_channel.basic_ack(delivery_tag=method_frame.delivery_tag)

time.sleep(3)
self.assertFalse(master_thread.is_alive())
master_thread.join()
mq_channel.queue_delete(queue='heartbeat_rq')
mq_channel.queue_delete(queue='heartbeat_res')
mq_channel.close()
mq_connection.close()