diff --git a/.pylintrc b/.pylintrc index 215697ae7..5f3e6228e 100644 --- a/.pylintrc +++ b/.pylintrc @@ -139,7 +139,8 @@ disable=raising-format-tuple, R, relative-import, bare-except, - W0212 + W0212, + W0622 # Enable the message, report, category or checker with the given id(s). You can # either give multiple identifier separated by comma (,) or put this option diff --git a/.travis.yml b/.travis.yml index ced555dc2..30b986d7a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,16 +6,14 @@ os: env: global: - - RADICAL_PILOT_DBURL="mongodb://rct:rct_test@138.201.86.166/rct_test" - CODECOV_TOKEN="790b223f-07e4-4707-bb97-3abe58e29cd8" - LOG=`git log -n 1 | grep Merge` - OLD=`echo $LOG | cut -d ' ' -f2` - NEW=`echo $LOG | cut -d ' ' -f3` - DIFF=`git diff --name-only --diff-filter=b $OLD...$NEW` - DIFF=$(echo $DIFF | grep -o -e '\b[^ ]*.py\b') - - PYTEST="coverage run -m pytest -ra --timeout=600 -vvv --showlocals --forked --numprocesses=3" - - CMD_PYTEST_UNITTESTS=" $PYTEST tests/test_component/ ; - $PYTEST tests/test_utils/ " + - PYTEST="coverage run -m pytest -ra --timeout=600 -vvv --showlocals --forked" + - CMD_PYTEST_UNITTESTS=" $PYTEST tests/test_component tests/test_utils/ " - CMD_PYTEST_INTEGRATION="$PYTEST tests/test_integration/" - CMD_PYTEST_ISSUES=" $PYTEST tests/test_issues/" - CMD_FLAKE8="test -z \"$DIFF\" && echo 'nothing to flake8' || flake8 $DIFF" @@ -79,4 +77,5 @@ after_success: services: - rabbitmq + - mongod diff --git a/CHANGES.md b/CHANGES.md index bca8d7529..8128b81ca 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -6,6 +6,21 @@ https://github.com/radical-cybertools/radical.entk/ \ issues?q=is%3Aissue+is%3Aopen+ + +1.5.0 Release 2020-08-24 +-------------------------------------------------------------------------------- + + - CI tests updated, PR #471 + - Task sync enhancement, PR #466 + + +1.4.1 Release 2020-07-17 +-------------------------------------------------------------------------------- + + - Documentation updated, PR #453, #451, #450, #446 + - Shared data fix #449 + + 1.4.0 Release 2020-05-18 -------------------------------------------------------------------------------- diff --git a/VERSION b/VERSION index 88c5fb891..3b2e79fab 100755 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.4.0 +1.4.1.post1 diff --git a/docs/install.rst b/docs/install.rst index d16ff179f..57a7a88d7 100644 --- a/docs/install.rst +++ b/docs/install.rst @@ -164,17 +164,40 @@ be printed. RabbitMQ ======== -Ensemble Toolkit relies on RabbitMQ for message transfers. RabbitMQ needs to be -configured or it can be installed on the same machine as EnTK is installed. -Installation instructions can be found at -. At the end of the installation run -```rabbitmq-server``` to start the server. - -The following configuration defines a default server and port number to communicate. +Ensemble Toolkit relies on RabbitMQ for message transfers. Users have three +choices: (1) self-deploying and using a local RabbiMQ server; (2) self-deploying +and using a remote RabbitMQ server that is accessible from the target HPC +machine; (3) use a local or remote RabbitMQ server provided by the HPC +organization or by an external partner. Note that most HPC infrastructures +forbid executing servers on their login nodes. If you have no other option, +please open an issue on the `EnTK GitHub repository +`_and we will provide +you with a testing account on our RabbitMQ server. + +In case, installation instructions can be found at +. At the end of the installation, do not +forget to run ```rabbitmq-server``` to start the server. + +The following configuration defines a default server and port number to +communicate. Note that remote RabbitMQ servers may require username and +password. If you are using one of the RADICAL servers, username and password +are mandatory. .. code-block:: bash - export RMQ_HOSTNAME=two.radical-project.org; export RMQ_PORT=33239 + export RMQ_HOSTNAME={IP ADDRESS}; + export RMQ_PORT={PORT NUMBER}; + export RMQ_USERNAME={USERNAME}; + export RMQ_PASSWORD={PASSWORD}; + +.. note:: {} sections need to be replaced with actual values, and EnTK + administrators are able to provide these information. + +RMQ Account +----------- + +Open a new ticket asking a new RMQ account: +https://github.com/radical-cybertools/radical.entk/issues .. comments diff --git a/docs/user_guide/get_started.rst b/docs/user_guide/get_started.rst index d45cf20a1..9abc6d515 100644 --- a/docs/user_guide/get_started.rst +++ b/docs/user_guide/get_started.rst @@ -82,6 +82,13 @@ To run the script, simply execute the following from the command line: .. code-block:: bash python get_started.py + +.. warning:: The first run may fail for different reasons, most of which + related to setting up the execution environment or requesting the correct + resources. Upon failure, Python may incorrectly raise the exception + ``KeyboardInterrupt``. This may be confusion because it is reported even when + no keyboard interrupt has been issued. Currently, we did not find a way to + avoid to raise that exception. And that's it! That's all the steps in this example. You can generate more verbose output diff --git a/examples/simple/eop.py b/examples/simple/eop.py index a40d020f8..839162e57 100755 --- a/examples/simple/eop.py +++ b/examples/simple/eop.py @@ -45,7 +45,7 @@ def generate_pipeline(): t2.executable = '/bin/bash' t2.arguments = ['-l', '-c', 'grep -o . output.txt | sort | uniq -c > ccount.txt'] # Copy data from the task in the first stage to the current task's location - t2.copy_input_data = ['$Pipline_%s_Stage_%s_Task_%s/output.txt' % (p.uid, s1.uid, t1.uid)] + t2.copy_input_data = ['$Pipline_%s_Stage_%s_Task_%s/output.txt' % (p.name, s1.name, t1.name)] # Add the Task to the Stage s2.add_tasks(t2) @@ -61,7 +61,7 @@ def generate_pipeline(): t3.executable = '/bin/bash' t3.arguments = ['-l', '-c', 'sha1sum ccount.txt > chksum.txt'] # Copy data from the task in the first stage to the current task's location - t3.copy_input_data = ['$Pipline_%s_Stage_%s_Task_%s/ccount.txt' % (p.uid, s2.uid, t2.uid)] + t3.copy_input_data = ['$Pipline_%s_Stage_%s_Task_%s/ccount.txt' % (p.name, s2.name, t2.name)] # Download the output of the current task to the current location t3.download_output_data = ['chksum.txt > chksum_%s.txt' % cnt] diff --git a/examples/user_guide/add_data.py b/examples/user_guide/add_data.py index 1410ccf10..7422c857d 100755 --- a/examples/user_guide/add_data.py +++ b/examples/user_guide/add_data.py @@ -16,6 +16,8 @@ # this script. hostname = os.environ.get('RMQ_HOSTNAME', 'localhost') port = os.environ.get('RMQ_PORT', 5672) +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') if __name__ == '__main__': @@ -57,7 +59,8 @@ p.add_stages(s2) # Create Application Manager - appman = AppManager(hostname=hostname, port=port) + appman = AppManager(hostname=hostname, port=port, username=username, + password=password) # Assign the workflow as a set or list of Pipelines to the Application Manager appman.workflow = set([p]) diff --git a/examples/user_guide/add_pipelines.py b/examples/user_guide/add_pipelines.py index b830f577d..fa7769873 100755 --- a/examples/user_guide/add_pipelines.py +++ b/examples/user_guide/add_pipelines.py @@ -16,7 +16,8 @@ # this script. hostname = os.environ.get('RMQ_HOSTNAME', 'localhost') port = os.environ.get('RMQ_PORT', 5672) - +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') def generate_pipeline(name, stages): @@ -56,7 +57,8 @@ def generate_pipeline(name, stages): p2 = generate_pipeline(name='Pipeline 2', stages=2) # Create Application Manager - appman = AppManager(hostname=hostname, port=port) + appman = AppManager(hostname=hostname, port=port, username=username, + password=password) # Assign the workflow as a set or list of Pipelines to the Application Manager # Note: The list order is not guaranteed to be preserved diff --git a/examples/user_guide/add_shared_data.py b/examples/user_guide/add_shared_data.py index 35ddcefc1..b5be6e583 100755 --- a/examples/user_guide/add_shared_data.py +++ b/examples/user_guide/add_shared_data.py @@ -13,6 +13,9 @@ hostname = os.environ.get('RMQ_HOSTNAME','localhost') port = int(os.environ.get('RMQ_PORT',5672)) +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') + cur_dir = os.path.dirname(os.path.abspath(__file__)) def generate_pipeline(): @@ -59,7 +62,8 @@ def generate_pipeline(): } # Create Application Manager - appman = AppManager(hostname=hostname, port=port) + appman = AppManager(hostname=hostname, port=port, username=username, + password=password) # Assign resource manager to the Application Manager appman.resource_desc = res_dict diff --git a/examples/user_guide/add_stages.py b/examples/user_guide/add_stages.py index 132d9b3c9..6cf0724ab 100755 --- a/examples/user_guide/add_stages.py +++ b/examples/user_guide/add_stages.py @@ -16,6 +16,8 @@ # this script. hostname = os.environ.get('RMQ_HOSTNAME', 'localhost') port = os.environ.get('RMQ_PORT', 5672) +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') if __name__ == '__main__': @@ -61,7 +63,8 @@ # Create Application Manager - appman = AppManager(hostname=hostname, port=port) + appman = AppManager(hostname=hostname, port=port, username=username, + password=password) # Create a dictionary describe four mandatory keys: # resource, walltime, and cpus diff --git a/examples/user_guide/add_tasks.py b/examples/user_guide/add_tasks.py index 66943f6d5..4c0496396 100755 --- a/examples/user_guide/add_tasks.py +++ b/examples/user_guide/add_tasks.py @@ -16,6 +16,8 @@ # this script. hostname = os.environ.get('RMQ_HOSTNAME', 'localhost') port = os.environ.get('RMQ_PORT', 5672) +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') if __name__ == '__main__': @@ -40,7 +42,8 @@ p.add_stages(s) # Create Application Manager - appman = AppManager(hostname=hostname, port=port) + appman = AppManager(hostname=hostname, port=port, username=username, + password=password) # Create a dictionary describe four mandatory keys: # resource, walltime, and cpus diff --git a/examples/user_guide/change_target.py b/examples/user_guide/change_target.py index 4f932f51e..86ad3c716 100755 --- a/examples/user_guide/change_target.py +++ b/examples/user_guide/change_target.py @@ -16,6 +16,8 @@ # this script. hostname = os.environ.get('RMQ_HOSTNAME', 'localhost') port = os.environ.get('RMQ_PORT', 5672) +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') if __name__ == '__main__': @@ -45,7 +47,8 @@ t2.executable = '/bin/bash' t2.arguments = ['-l', '-c', 'grep -o . output.txt | sort | uniq -c > ccount.txt'] # Copy data from the task in the first stage to the current task's location - t2.copy_input_data = ['$Pipline_%s_Stage_%s_Task_%s/output.txt' % (p.uid, s1.uid, t1.uid)] + t2.copy_input_data = ['$Pipline_%s_Stage_%s_Task_%s/output.txt' % (p.name, + s1.name, t1.name)] # Download the output of the current task to the current location t2.download_output_data = ['ccount.txt'] @@ -56,7 +59,8 @@ p.add_stages(s2) # Create Application Manager - appman = AppManager(hostname=hostname, port=port) + appman = AppManager(hostname=hostname, port=port, username=username, + password=password) # Assign the workflow as a set or list of Pipelines to the Application Manager appman.workflow = set([p]) diff --git a/examples/user_guide/get_started.py b/examples/user_guide/get_started.py index 5b62bfc5b..9967c69b1 100755 --- a/examples/user_guide/get_started.py +++ b/examples/user_guide/get_started.py @@ -16,6 +16,8 @@ # this script. hostname = os.environ.get('RMQ_HOSTNAME', 'localhost') port = int(os.environ.get('RMQ_PORT', 5672)) +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') if __name__ == '__main__': @@ -38,7 +40,8 @@ p.add_stages(s) # Create Application Manager - appman = AppManager(hostname=hostname, port=port) + appman = AppManager(hostname=hostname, port=port, username=username, + password=password) # Create a dictionary describe four mandatory keys: # resource, walltime, and cpus diff --git a/src/radical/entk/appman/appmanager.py b/src/radical/entk/appman/appmanager.py index 80d44ed55..795a6dcc2 100644 --- a/src/radical/entk/appman/appmanager.py +++ b/src/radical/entk/appman/appmanager.py @@ -16,6 +16,7 @@ from ..pipeline import Pipeline from ..task import Task +from radical.entk import states from ..utils import write_session_description from ..utils import write_workflows @@ -126,7 +127,7 @@ def __init__(self, # Setup rabbitmq queues self._setup_mqs() - self._rmq_ping_interval = os.getenv('RMQ_PING_INTERVAL', 10) + self._rmq_ping_interval = int(os.getenv('RMQ_PING_INTERVAL', "10")) self._logger.info('Application Manager initialized') self._prof.prof('amgr_created', uid=self._uid) @@ -762,7 +763,46 @@ def _run_workflow(self): # -------------------------------------------------------------------------- # - def _task_update(self, msg, reply_to, corr_id, mq_channel, method_frame): + def _get_message_to_sync(self, mq_channel, qname): + ''' + Reads a message from the queue, and exchange the message to where it + was published by `update_task` + ''' + + # -------------------------------------------------------------- + # Messages between tmgr Main thread and synchronizer -- only + # Task objects + method_frame, props, body = mq_channel.basic_get(queue=qname) + tmp = qname.split("-") + q_sid = ''.join(tmp[:-3]) + q_from = tmp[-3] + q_to = tmp[-1] + return_queue_name = f"{q_sid}-{q_to}-to-{q_from}" + + # The message received is a JSON object with the following + # structure: + # msg = { + # 'type': 'Pipeline'/'Stage'/'Task', + # 'object': json/dict + # } + if body: + + msg = json.loads(body) + uid = msg['object']['uid'] + state = msg['object']['state'] + + self._prof.prof('sync_recv_obj_state_%s' % state, uid=uid) + self._logger.debug('recv %s in state %s (sync)' % (uid, state)) + + if msg['type'] == 'Task': + self._update_task(msg, return_queue_name, props.correlation_id, + mq_channel, method_frame) + + + # -------------------------------------------------------------------------- + # + def _update_task(self, msg, reply_to, corr_id, mq_channel, method_frame): + # pylint: disable=W0612,W0613 completed_task = Task() completed_task.from_dict(msg['object']) @@ -792,6 +832,13 @@ def _task_update(self, msg, reply_to, corr_id, mq_channel, method_frame): completed_task.state == task.state: continue + self._logger.debug(('Found task %s in state (%s)' + ' changing to %s ==') % + (task.uid, task.state, completed_task.state)) + if task.state in [states.DONE, states.FAILED]: + self._logger.debug(('No change on task state %s ' + 'in state %s') % (task.uid, task.state)) + break task.state = str(completed_task.state) self._logger.debug('Found task %s in state %s' % (task.uid, task.state)) @@ -799,12 +846,12 @@ def _task_update(self, msg, reply_to, corr_id, mq_channel, method_frame): if completed_task.path: task.path = str(completed_task.path) - mq_channel.basic_publish( - exchange='', - routing_key=reply_to, - properties=pika.BasicProperties( - correlation_id=corr_id), - body='%s-ack' % task.uid) + # mq_channel.basic_publish( + # exchange='', + # routing_key=reply_to, + # properties=pika.BasicProperties( + # correlation_id=corr_id), + # body='%s-ack' % task.uid) state = msg['object']['state'] self._prof.prof('pub_ack_state_%s' % state, @@ -904,54 +951,9 @@ def _synchronizer_work(self): while not self._terminate_sync.is_set(): - # -------------------------------------------------------------- - # Messages between tmgr Main thread and synchronizer -- only - # Task objects - method_frame, props, body = mq_channel.basic_get(queue=qname_t2s) - - # The message received is a JSON object with the following - # structure: - # msg = { - # 'type': 'Pipeline'/'Stage'/'Task', - # 'object': json/dict - # } - if body: - - msg = json.loads(body) - uid = msg['object']['uid'] - state = msg['object']['state'] - - self._prof.prof('sync_recv_obj_state_%s' % state, uid=uid) - self._logger.debug('recv %s in state %s (sync)' % (uid, state)) - - if msg['type'] == 'Task': - self._task_update(msg, '%s-sync-to-tmgr' % self._sid, - props.correlation_id, mq_channel, method_frame) - - - # -------------------------------------------------------------- - # Messages between callback thread and synchronizer -- only - # Task objects - method_frame, props, body = mq_channel.basic_get(queue=qname_c2s) - - # The message received is a JSON object with the following structure: - # msg = { - # 'type': 'Pipeline'/'Stage'/'Task', - # 'object': json/dict - # } - if body: - - msg = json.loads(body) - uid = msg['object']['uid'] - state = msg['object']['state'] - - self._prof.prof('sync_recv_obj_state_%s' % state, uid=uid) - self._logger.debug('recv %s in state %s (sync)' % (uid, state)) - - if msg['type'] == 'Task': - self._task_update(msg, '%s-sync-to-cb' % self._sid, - props.correlation_id, mq_channel, method_frame) - + # wrapper to call `_update_task()` + self._get_message_to_sync(mq_channel, qname_t2s) + self._get_message_to_sync(mq_channel, qname_c2s) # Appease pika cos it thinks the connection is dead now = time.time() diff --git a/src/radical/entk/execman/base/task_manager.py b/src/radical/entk/execman/base/task_manager.py index 4ede72aab..423673610 100755 --- a/src/radical/entk/execman/base/task_manager.py +++ b/src/radical/entk/execman/base/task_manager.py @@ -158,24 +158,33 @@ def _sync_with_master(self, obj, obj_type, channel, queue): reply_queue = '-'.join(list(reversed(qname))) reply_queue = sid + '-' + reply_queue - while True: + # The `while` loop is diabled with PR #466, and the explanation is: + # The task manager and app manager continue to have ack semantics with + # this PR. They exchange messages to sync through two channels, + # sync-to-tmgr and sync-to-cb, these have not been touched. The ack + # that located here is to send an acknowledgment from the app manager + # to the task manager although it doesn't take any further action. + # This is redundant and doesn't break or reduce reliability by + # commenting out. - # FIXME: is this a busy loop? + # while True: - method_frame, props, body = channel.basic_get(queue=reply_queue) + # # FIXME: is this a busy loop? - if not body: - continue + # method_frame, props, body = channel.basic_get(queue=reply_queue) - if corr_id != props.correlation_id: - continue + # if not body: + # continue - channel.basic_ack(delivery_tag=method_frame.delivery_tag) + # if corr_id != props.correlation_id: + # continue - self._prof.prof('sync', state=obj.state, uid=obj.uid, msg=msg) - self._log.debug('%s (%s) synced with amgr', obj.uid, obj.state) + # channel.basic_ack(delivery_tag=method_frame.delivery_tag) - break + # self._prof.prof('sync', state=obj.state, uid=obj.uid, msg=msg) + # self._log.debug('%s (%s) synced with amgr', obj.uid, obj.state) + + # break # -------------------------------------------------------------------------- # diff --git a/src/radical/entk/execman/rp/task_manager.py b/src/radical/entk/execman/rp/task_manager.py index 0c631847e..c8495adef 100644 --- a/src/radical/entk/execman/rp/task_manager.py +++ b/src/radical/entk/execman/rp/task_manager.py @@ -286,7 +286,7 @@ def unit_state_cb(unit, state): umgr.register_callback(unit_state_cb) try: - + while not self._tmgr_terminate.is_set(): body = None @@ -306,6 +306,9 @@ def unit_state_cb(unit, state): bulk_tasks = list() bulk_cuds = list() + mq_connection = pika.BlockingConnection(rmq_conn_params) + mq_channel = mq_connection.channel() + for msg in body: task = Task() @@ -314,14 +317,11 @@ def unit_state_cb(unit, state): bulk_cuds.append(create_cud_from_task( task, placeholders, self._prof)) - mq_connection = pika.BlockingConnection(rmq_conn_params) - mq_channel = mq_connection.channel() - self._advance(task, 'Task', states.SUBMITTING, mq_channel, '%s-tmgr-to-sync' % self._sid) - mq_connection.close() umgr.submit_units(bulk_cuds) + mq_connection.close() self._log.debug('Exited RTS main loop. TMGR terminating') except KeyboardInterrupt: self._log.exception('Execution interrupted (probably by Ctrl+C), ' diff --git a/src/radical/entk/stage/stage.py b/src/radical/entk/stage/stage.py index 810572b46..72c25a5bb 100644 --- a/src/radical/entk/stage/stage.py +++ b/src/radical/entk/stage/stage.py @@ -1,8 +1,7 @@ import radical.utils as ru -from radical.entk.exceptions import * +from radical.entk.exceptions import ValueError, TypeError, EnTKError, MissingError from radical.entk.task.task import Task from radical.entk import states -from collections import Iterable class Stage(object): @@ -273,6 +272,7 @@ def from_dict(self, d): if d['state'] in list(states._stage_state_values.keys()): self._state = d['state'] else: + value = d['state'] raise ValueError(obj=self._uid, attribute='state', expected_value=list(states._stage_state_values.keys()), diff --git a/tests/test_component/test_amgr.py b/tests/test_component/test_amgr.py index 984ed1492..9dcadc32d 100755 --- a/tests/test_component/test_amgr.py +++ b/tests/test_component/test_amgr.py @@ -22,8 +22,8 @@ host = os.environ.get('RMQ_HOSTNAME', 'localhost') port = int(os.environ.get('RMQ_PORT', 5672)) -user = 'guest' -passwd = 'guest' +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') # Hypothesis settings settings.register_profile("travis", max_examples=100, deadline=None) @@ -35,12 +35,12 @@ def test_amgr_rmq_auth(): amgr_name = ru.generate_id('test.amgr.%(item_counter)04d', ru.ID_CUSTOM) - amgr = Amgr(hostname=host, port=port, username=user, password=passwd, - name=amgr_name) + amgr = Amgr(hostname=host, port=port, username=username, + password=password, name=amgr_name) assert(amgr._rmq_conn_params.credentials) - assert(amgr._rmq_conn_params.credentials.username == user) - assert(amgr._rmq_conn_params.credentials.password == passwd) + assert(amgr._rmq_conn_params.credentials.username == username) + assert(amgr._rmq_conn_params.credentials.password == password) # ------------------------------------------------------------------------------ @@ -48,7 +48,8 @@ def test_amgr_rmq_auth(): def test_amgr_initialization(): amgr_name = ru.generate_id('test.amgr.%(item_counter)04d', ru.ID_CUSTOM) - amgr = Amgr(hostname=host, port=port, name=amgr_name) + amgr = Amgr(hostname=host, port=port, username=username, + password=password, name=amgr_name) assert amgr._name.split('.') == amgr_name.split('.') assert amgr._sid.split('.') == amgr_name.split('.') @@ -84,9 +85,9 @@ def test_amgr_initialization(): assert amgr._cur_attempt == 1 assert isinstance(amgr.shared_data, list) - amgr = Amgr(hostname=host, port=port) + amgr = Amgr(hostname=host, port=port, username=username, password=password) - assert amgr._uid.split('.') == ['appmanager', '0000'] + assert amgr._uid.split('.') == ['appmanager', '0001'] assert isinstance(amgr._logger, ru.Logger) assert isinstance(amgr._prof, ru.Profiler) assert isinstance(amgr._report, ru.Reporter) @@ -123,7 +124,7 @@ def test_amgr_initialization(): # def test_amgr_read_config(): - amgr = Amgr(hostname=host, port=port) + amgr = Amgr(hostname=host, port=port, username=username, password=password) assert amgr._reattempts == 3 @@ -141,8 +142,8 @@ def test_amgr_read_config(): d = {"hostname" : "radical.two", "port" : 25672, - "username" : user, - "password" : passwd, + "username" : username, + "password" : password, "reattempts" : 5, "resubmit_failed": True, "autoterminate" : False, @@ -192,13 +193,15 @@ def test_amgr_resource_description_assignment(): 'cpus' : 1000, 'project' : 'TG-MCB090174'} - amgr = Amgr(rts='radical.pilot') + amgr = Amgr(rts='radical.pilot', hostname=host, port=port, + username=username, password=password) amgr.resource_desc = res_dict from radical.entk.execman.rp import ResourceManager as RM_RP assert isinstance(amgr._rmgr, RM_RP) - amgr = Amgr(rts='mock') + amgr = Amgr(rts='mock', hostname=host, port=port, username=username, + password=password) amgr.resource_desc = res_dict from radical.entk.execman.mock import ResourceManager as RM_MOCK @@ -209,7 +212,8 @@ def test_amgr_resource_description_assignment(): # def test_amgr_assign_workflow(): - amgr = Amgr() + amgr = Amgr(hostname=host, port=port, username=username, + password=password) with pytest.raises(TypeError): amgr.workflow = [1, 2, 3] @@ -229,7 +233,8 @@ def test_amgr_assign_workflow(): # def test_amgr_assign_shared_data(): - amgr = Amgr(rts='radical.pilot', hostname=host, port=port) + amgr = Amgr(rts='radical.pilot', hostname=host, port=port, + username=username, password=password) res_dict = {'resource': 'xsede.supermic', 'walltime': 30, @@ -246,7 +251,7 @@ def test_amgr_assign_shared_data(): # def test_amgr_run(): - amgr = Amgr(hostname=host, port=port) + amgr = Amgr(hostname=host, port=port, username=username, password=password) with pytest.raises(MissingError): amgr.run() @@ -272,7 +277,7 @@ def test_amgr_run_mock(): t.name = 'simulation' t.executable = '/bin/date' - s.tasks = t + s.add_tasks(t) p.add_stages(s) res_dict = {'resource': 'local.localhost', @@ -280,7 +285,8 @@ def test_amgr_run_mock(): 'cpus' : 1, 'project' : ''} - appman = Amgr(hostname=host, port=port, rts="mock") + appman = Amgr(hostname=host, port=port, username=username, + password=password, rts="mock") appman.resource_desc = res_dict appman.workflow = [p] @@ -298,7 +304,8 @@ def test_amgr_resource_terminate(): from radical.entk.execman.rp import TaskManager - amgr = Amgr(rts='radical.pilot', hostname=host, port=port) + amgr = Amgr(rts='radical.pilot', hostname=host, port=port, + username=username, password=password) amgr.resource_desc = res_dict amgr._setup_mqs() @@ -324,7 +331,8 @@ def test_amgr_terminate(): from radical.entk.execman.rp import TaskManager - amgr = Amgr(rts='radical.pilot', hostname=host, port=port) + amgr = Amgr(rts='radical.pilot', hostname=host, port=port, + username=username, password=password) amgr.resource_desc = res_dict amgr._setup_mqs() @@ -343,14 +351,16 @@ def test_amgr_terminate(): # def test_amgr_setup_mqs(): - amgr = Amgr(hostname=host, port=port) + amgr = Amgr(hostname=host, port=port, username=username, password=password) amgr._setup_mqs() assert len(amgr._pending_queue) == 1 assert len(amgr._completed_queue) == 1 + credentials = pika.PlainCredentials(amgr._username, amgr._password) mq_connection = pika.BlockingConnection(pika.ConnectionParameters( - host=amgr._hostname, port=amgr._port)) + host=amgr._hostname, port=amgr._port, + credentials=credentials)) mq_channel = mq_connection.channel() qs = ['%s-tmgr-to-sync' % amgr._sid, @@ -368,14 +378,16 @@ def test_amgr_setup_mqs(): # def test_amgr_cleanup_mqs(): - amgr = Amgr(hostname=host, port=port) + amgr = Amgr(hostname=host, port=port, username=username, password=password) sid = amgr._sid amgr._setup_mqs() amgr._cleanup_mqs() + credentials = pika.PlainCredentials(username, password) mq_connection = pika.BlockingConnection(pika.ConnectionParameters( - host=host, port=port)) + host=host, port=port, + credentials=credentials)) qs = ['%s-tmgr-to-sync' % sid, '%s-cb-to-sync' % sid, @@ -396,8 +408,10 @@ def func_for_synchronizer_test(sid, p, tmgr): # FIXME: what is tested / asserted here? + credentials = pika.PlainCredentials(username, password) mq_connection = pika.BlockingConnection(pika.ConnectionParameters( - host=host, port=port)) + host=host, port=port, + credentials=credentials)) mq_channel = mq_connection.channel() for t in p.stages[0].tasks: @@ -414,7 +428,7 @@ def func_for_synchronizer_test(sid, p, tmgr): # def test_amgr_synchronizer(): - amgr = Amgr(hostname=host, port=port) + amgr = Amgr(hostname=host, port=port, username=username, password=password) amgr._setup_mqs() p = Pipeline() @@ -464,8 +478,11 @@ def test_amgr_synchronizer(): proc.start() proc.join() - amgr._terminate_sync.set() - sync_thread.join() + + # Wait for AppManager to finish the message exchange + # no need to set *)terminate_sync* but a timeout instead + # amgr._terminate_sync.set() + sync_thread.join(15) for t in p.stages[0].tasks: assert t.state == states.COMPLETED @@ -477,7 +494,7 @@ def test_sid_in_mqs(): # FIXME: what is tested / asserted here? - appman = Amgr(hostname=host, port=port) + appman = Amgr(hostname=host, port=port, username=username, password=password) sid = appman._sid appman._setup_mqs() @@ -486,8 +503,10 @@ def test_sid_in_mqs(): '%s-sync-to-tmgr' % sid, '%s-sync-to-cb' % sid] + credentials = pika.PlainCredentials(username, password) mq_connection = pika.BlockingConnection(pika.ConnectionParameters( - host=host, port=port)) + host=host, port=port, + credentials=credentials)) mq_channel = mq_connection.channel() def callback(): @@ -523,7 +542,6 @@ def create_single_task(): s = Stage() s.name = 's1' - s.tasks = create_single_task() s.add_tasks(create_single_task()) p1.add_stages(s) @@ -535,7 +553,7 @@ def create_single_task(): os.environ['RP_ENABLE_OLD_DEFINES'] = 'True' - appman = Amgr(hostname=host, port=port) + appman = Amgr(hostname=host, port=port, username=username, password=password) appman.resource_desc = res_dict appman.workflow = [p1] @@ -547,10 +565,6 @@ def create_single_task(): s_state_hist = p1.stages[0].state_history assert s_state_hist == ['DESCRIBED', 'SCHEDULING', 'SCHEDULED', 'DONE'] - for t in p1.stages[0].tasks: - assert t.state_history == ['DESCRIBED', 'SCHEDULING', 'SCHEDULED', - 'SUBMITTING', 'EXECUTED', 'DONE'] - # ------------------------------------------------------------------------------ # diff --git a/tests/test_component/test_pipeline.py b/tests/test_component/test_pipeline.py index ca1d5e947..4b8d41fc5 100644 --- a/tests/test_component/test_pipeline.py +++ b/tests/test_component/test_pipeline.py @@ -7,7 +7,7 @@ from radical.entk import Pipeline, Stage, Task from radical.entk import states -from radical.entk.exceptions import * +from radical.entk.exceptions import TypeError, ValueError, MissingError, EnTKError # ------------------------------------------------------------------------------ @@ -17,20 +17,21 @@ settings.register_profile("travis", max_examples=100, deadline=None) settings.load_profile("travis") + def test_pipeline_initialization(): p = Pipeline() - assert p.uid == None - assert p.name == None + assert p.uid == 'pipeline.0000' + assert p.name is None assert p.stages == list() assert p.state == states.INITIAL assert p.state_history == [states.INITIAL] assert p._stage_count == 0 assert p.current_stage == 0 - assert type(p.lock) == type(threading.Lock()) - assert type(p._completed_flag) == type(threading.Event()) - assert p.completed == False + assert isinstance(p.lock, type(threading.Lock())) + assert isinstance(p._completed_flag, type(threading.Event())) + assert p.completed is False # ------------------------------------------------------------------------------ @@ -128,7 +129,7 @@ def test_pipeline_to_dict(): p = Pipeline() d = p.to_dict() - assert d == {'uid': None, + assert d == {'uid': 'pipeline.0000', 'name': None, 'state': states.INITIAL, 'state_history': [states.INITIAL], @@ -172,17 +173,17 @@ def test_pipeline_increment_stage(): assert p._stage_count == 2 assert p._cur_stage == 1 - assert p._completed_flag.is_set() == False + assert p._completed_flag.is_set() is False p._increment_stage() assert p._stage_count == 2 assert p._cur_stage == 2 - assert p._completed_flag.is_set() == False + assert p._completed_flag.is_set() is False p._increment_stage() assert p._stage_count == 2 assert p._cur_stage == 2 - assert p._completed_flag.is_set() == True + assert p._completed_flag.is_set() is True # ------------------------------------------------------------------------------ @@ -204,17 +205,17 @@ def test_pipeline_decrement_stage(): p._increment_stage() assert p._stage_count == 2 assert p._cur_stage == 2 - assert p._completed_flag.is_set() == True + assert p._completed_flag.is_set() is True p._decrement_stage() assert p._stage_count == 2 assert p._cur_stage == 1 - assert p._completed_flag.is_set() == False + assert p._completed_flag.is_set() is False p._decrement_stage() assert p._stage_count == 2 assert p._cur_stage == 0 - assert p._completed_flag.is_set() == False + assert p._completed_flag.is_set() is False # ------------------------------------------------------------------------------ @@ -265,7 +266,7 @@ def test_pipeline_assign_uid(): import shutil import os home = os.environ.get('HOME','/home') - test_fold = glob.glob('%s/.radical/utils/test*'%home) + test_fold = glob.glob('%s/.radical/utils/test*' % home) for f in test_fold: shutil.rmtree(f) except: @@ -274,26 +275,6 @@ def test_pipeline_assign_uid(): assert p.uid == 'pipeline.0000' -# ------------------------------------------------------------------------------ -# -def test_pipeline_pass_uid(): - - p = Pipeline() - p._uid = 'test' - p.name = 'p1' - - s1 = Stage() - s2 = Stage() - p.add_stages([s1,s2]) - - p._pass_uid() - - assert s1.parent_pipeline['uid'] == p.uid - assert s1.parent_pipeline['name'] == p.name - assert s2.parent_pipeline['uid'] == p.uid - assert s2.parent_pipeline['name'] == p.name - - # ------------------------------------------------------------------------------ # def test_pipeline_suspend_resume(): diff --git a/tests/test_component/test_stage.py b/tests/test_component/test_stage.py index ee83e0c26..994266a8c 100644 --- a/tests/test_component/test_stage.py +++ b/tests/test_component/test_stage.py @@ -4,9 +4,9 @@ from hypothesis import given, settings import hypothesis.strategies as st -from radical.entk import Pipeline, Stage, Task +from radical.entk import Stage, Task from radical.entk import states -from radical.entk.exceptions import * +from radical.entk.exceptions import TypeError, ValueError, MissingError # ------------------------------------------------------------------------------ @@ -16,6 +16,7 @@ settings.register_profile("travis", max_examples=100, deadline=None) settings.load_profile("travis") + def test_stage_initialization(): """ ***Purpose***: Test if all attributes have, thus expect, the @@ -24,15 +25,15 @@ def test_stage_initialization(): s = Stage() - assert s.uid == None - assert s.name == None + assert s.uid == 'stage.0000' + assert s.name is None assert s.tasks == set() assert s.state == states.INITIAL assert s.state_history == [states.INITIAL] assert s._task_count == 0 - assert s.parent_pipeline['uid'] == None - assert s.parent_pipeline['name'] == None - assert s.post_exec == None + assert s.parent_pipeline['uid'] is None + assert s.parent_pipeline['name'] is None + assert s.post_exec is None # ------------------------------------------------------------------------------ @@ -188,7 +189,7 @@ def test_stage_to_dict(): s = Stage() d = s.to_dict() - assert d == {'uid': None, + assert d == {'uid': 'stage.0000', 'name': None, 'state': states.INITIAL, 'state_history': [states.INITIAL], @@ -247,9 +248,9 @@ def test_stage_check_complete(): t2.executable = '/bin/date' s.add_tasks([t1, t2]) - assert s._check_stage_complete() == False + assert s._check_stage_complete() is False s._set_tasks_state(states.DONE) - assert s._check_stage_complete() == True + assert s._check_stage_complete() is True # ------------------------------------------------------------------------------ @@ -300,7 +301,7 @@ def test_stage_assign_uid(): import shutil import os home = os.environ.get('HOME','/home') - test_fold = glob.glob('%s/.radical/utils/test*'%home) + test_fold = glob.glob('%s/.radical/utils/test*' % home) for f in test_fold: shutil.rmtree(f) except: @@ -309,32 +310,3 @@ def test_stage_assign_uid(): assert s.uid == 'stage.0000' -# ------------------------------------------------------------------------------ -# -def test_stage_pass_uid(): - - s = Stage() - s._uid = 's' - s.name = 's1' - s.parent_pipeline['uid'] = 'p' - s.parent_pipeline['name'] = 'p1' - - t1 = Task() - t2 = Task() - s.add_tasks([t1,t2]) - - s._pass_uid() - - assert t1.parent_stage['uid'] == s.uid - assert t1.parent_stage['name'] == s.name - assert t1.parent_pipeline['uid'] == s.parent_pipeline['uid'] - assert t1.parent_pipeline['name'] == s.parent_pipeline['name'] - - assert t2.parent_stage['uid'] == s.uid - assert t2.parent_stage['name'] == s.name - assert t2.parent_pipeline['uid'] == s.parent_pipeline['uid'] - assert t2.parent_pipeline['name'] == s.parent_pipeline['name'] - - -# ------------------------------------------------------------------------------ - diff --git a/tests/test_component/test_task.py b/tests/test_component/test_task.py index 560d4c8d4..d9021b4af 100755 --- a/tests/test_component/test_task.py +++ b/tests/test_component/test_task.py @@ -28,13 +28,13 @@ def test_task_initialization(): t = Task() - assert t._uid is None - assert t.name is None + assert t._uid == 'task.0000' + assert t.name == '' assert t.state == states.INITIAL assert t.state_history == [states.INITIAL] - assert t.executable is None + assert t.executable == '' assert t.arguments == list() assert t.pre_exec == list() assert t.post_exec == list() @@ -57,8 +57,8 @@ def test_task_initialization(): assert t.move_input_data == list() assert t.download_output_data == list() - assert t.stdout is None - assert t.stderr is None + assert t.stdout == '' + assert t.stderr == '' assert t.exit_code is None assert t.tag is None assert t.path is None @@ -207,12 +207,12 @@ def test_task_to_dict(): t = Task() d = t.to_dict() - assert d == {'uid' : None, - 'name' : None, + assert d == {'uid' : 'task.0000', + 'name' : '', 'state' : states.INITIAL, 'state_history' : [states.INITIAL], 'pre_exec' : [], - 'executable' : None, + 'executable' : '', 'arguments' : [], 'post_exec' : [], 'cpu_reqs' : {'processes' : 1, @@ -227,12 +227,14 @@ def test_task_to_dict(): 'upload_input_data' : [], 'copy_input_data' : [], 'link_input_data' : [], + 'link_output_data' : [], 'move_input_data' : [], 'copy_output_data' : [], 'move_output_data' : [], 'download_output_data' : [], - 'stdout' : None, - 'stderr' : None, + 'sandbox' : '', + 'stdout' : '', + 'stderr' : '', 'exit_code' : None, 'path' : None, 'tag' : None, @@ -288,10 +290,12 @@ def test_task_to_dict(): 'upload_input_data' : ['test1'], 'copy_input_data' : ['test2'], 'link_input_data' : ['test3'], + 'link_output_data' : [], 'move_input_data' : ['test4'], 'copy_output_data' : ['test5'], 'move_output_data' : ['test6'], 'download_output_data' : ['test7'], + 'sandbox' : '', 'stdout' : 'out', 'stderr' : 'err', 'exit_code' : 1, @@ -324,10 +328,12 @@ def test_task_to_dict(): 'upload_input_data' : ['test1'], 'copy_input_data' : ['test2'], 'link_input_data' : ['test3'], + 'link_output_data' : [], 'move_input_data' : ['test4'], 'copy_output_data' : ['test5'], 'move_output_data' : ['test6'], 'download_output_data' : ['test7'], + 'sandbox' : '', 'stdout' : 'out', 'stderr' : 'err', 'exit_code' : 1, @@ -445,18 +451,3 @@ def test_task_validate(): t._validate() -# ------------------------------------------------------------------------------ -# -if __name__ == '__main__': - - test_task_initialization() - test_task_exceptions() - test_dict_to_task() - test_task_to_dict() - test_task_from_dict() - test_task_assign_uid() - test_task_validate() - - -# ------------------------------------------------------------------------------ - diff --git a/tests/test_component/test_tmgr.py b/tests/test_component/test_tmgr.py index 258a2a3c9..aeacdb370 100644 --- a/tests/test_component/test_tmgr.py +++ b/tests/test_component/test_tmgr.py @@ -27,6 +27,8 @@ hostname = os.environ.get('RMQ_HOSTNAME', 'localhost') port = int(os.environ.get('RMQ_PORT', 5672)) +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') # Hypothesis settings settings.register_profile("travis", max_examples=100, deadline=None) @@ -47,8 +49,9 @@ def test_tmgr_base_initialization(): shutil.rmtree(f) except: pass - - rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port) + credentials = pika.PlainCredentials(username, password) + rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port, + credentials=credentials) sid = 'test.0001' rmgr = BaseRmgr({}, sid, None, {}) tmgr = BaseTmgr(sid=sid, @@ -85,7 +88,9 @@ def test_tmgr_base_assignment_exceptions(s, l, i, b, se, di): sid = 'test.0002' rmgr = BaseRmgr({}, sid, None, {}) - rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port) + credentials = pika.PlainCredentials(username, password) + rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port, + credentials=credentials) data_type = [s, l, i, b, se, di] @@ -108,10 +113,13 @@ def test_tmgr_base_assignment_exceptions(s, l, i, b, se, di): # ------------------------------------------------------------------------------ # -def func_for_heartbeat_test(mq_hostname, mq_port, hb_request_q, hb_response_q): +def func_for_heartbeat_test(mq_hostname, mq_port, mq_username, mq_password, hb_request_q, hb_response_q): + mq_credentials = pika.PlainCredentials(mq_username, mq_password) mq_connection = pika.BlockingConnection(pika.ConnectionParameters( - host=mq_hostname, port=mq_port)) + host=mq_hostname, + port=mq_port, + credentials=mq_credentials)) mq_channel = mq_connection.channel() while True: @@ -136,7 +144,9 @@ def func_for_heartbeat_test(mq_hostname, mq_port, hb_request_q, hb_response_q): # def test_tmgr_base_heartbeat(): - rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port) + credentials = pika.PlainCredentials(username, password) + rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port, + credentials=credentials) sid = 'test.0003' rmgr = BaseRmgr({}, sid, None, {}) tmgr = BaseTmgr(sid=sid, @@ -151,7 +161,7 @@ def test_tmgr_base_heartbeat(): tmgr._hb_thread.start() proc = mp.Process(target=func_for_heartbeat_test, - args=(hostname, port, tmgr._hb_request_q, + args=(hostname, port, username, password, tmgr._hb_request_q, tmgr._hb_response_q)) proc.start() proc.join() @@ -162,7 +172,10 @@ def test_tmgr_base_heartbeat(): # def test_tmgr_base_start_heartbeat(): - rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port) + credentials = pika.PlainCredentials(username, password) + rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port, + credentials=credentials) + sid = 'test.0004' rmgr = BaseRmgr({}, sid, None, {}) tmgr = BaseTmgr(sid=sid, @@ -187,7 +200,10 @@ def test_tmgr_base_start_heartbeat(): # def test_tmgr_base_terminate_heartbeat(): - rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port) + credentials = pika.PlainCredentials(username, password) + rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port, + credentials=credentials) + sid = 'test.0005' rmgr = BaseRmgr({}, sid, None, {}) tmgr = BaseTmgr(sid=sid, @@ -214,7 +230,10 @@ def test_tmgr_base_terminate_heartbeat(): # def test_tmgr_base_terminate_manager(): - rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port) + credentials = pika.PlainCredentials(username, password) + rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port, + credentials=credentials) + sid = 'test.0006' rmgr = BaseRmgr({}, sid, None, {}) tmgr = BaseTmgr(sid=sid, @@ -237,7 +256,10 @@ def test_tmgr_base_terminate_manager(): # def test_tmgr_base_check_heartbeat(): - rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port) + credentials = pika.PlainCredentials(username, password) + rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port, + credentials=credentials) + sid = 'test.0007' rmgr = BaseRmgr({}, sid, None, {}) tmgr = BaseTmgr(sid=sid, @@ -260,7 +282,10 @@ def test_tmgr_base_check_heartbeat(): # def test_tmgr_base_check_manager(): - rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port) + credentials = pika.PlainCredentials(username, password) + rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port, + credentials=credentials) + sid = 'test.0008' rmgr = BaseRmgr({}, sid, None, {}) tmgr = BaseTmgr(sid=sid, @@ -283,7 +308,10 @@ def test_tmgr_base_check_manager(): # def test_tmgr_base_methods(): - rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port) + credentials = pika.PlainCredentials(username, password) + rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port, + credentials=credentials) + sid = 'test.0009' rmgr = BaseRmgr({}, sid, None, {}) tmgr = BaseTmgr(sid=sid, @@ -308,7 +336,10 @@ def test_tmgr_base_methods(): # def test_tmgr_mock_initialization(): - rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port) + credentials = pika.PlainCredentials(username, password) + rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port, + credentials=credentials) + sid = 'test.0010' rmgr = MockRmgr(resource_desc={}, sid=sid) tmgr = MockTmgr(sid=sid, @@ -333,10 +364,13 @@ def test_tmgr_mock_initialization(): # ------------------------------------------------------------------------------ # -def func_for_mock_tmgr_test(mq_hostname, mq_port, pending_queue, completed_queue): +def func_for_mock_tmgr_test(mq_hostname, mq_port, mq_username, mq_password, pending_queue, completed_queue): + mq_credentials = pika.PlainCredentials(mq_username, mq_password) mq_connection = pika.BlockingConnection(pika.ConnectionParameters( - host=mq_hostname, port=mq_port)) + host=mq_hostname, + port=mq_port, + credentials=mq_credentials)) mq_channel = mq_connection.channel() tasks = list() @@ -380,7 +414,9 @@ def test_tmgr_mock_tmgr(): 'cpus' : 20, 'project' : 'Random'} - rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port) + credentials = pika.PlainCredentials(username, password) + rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port, + credentials=credentials) rmgr = MockRmgr(resource_desc=res_dict, sid='test.0018') tmgr = MockTmgr(sid='test.0019', pending_queue=['pendingq-1'], @@ -391,7 +427,7 @@ def test_tmgr_mock_tmgr(): tmgr.start_manager() proc = mp.Process(target=func_for_mock_tmgr_test, - args=(hostname, port, tmgr._pending_queue[0], + args=(hostname, port, username, password, tmgr._pending_queue[0], tmgr._completed_queue[0])) proc.start() proc.join() @@ -406,7 +442,10 @@ def test_tmgr_rp_initialization(): cfg = {"sandbox_cleanup": False, "db_cleanup" : False} - rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port) + credentials = pika.PlainCredentials(username, password) + rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port, + credentials=credentials) + rmgr = RPRmgr({}, sid, cfg) tmgr = RPTmgr(sid=sid, pending_queue=['pending'], diff --git a/tests/test_component/test_tmgr_2.py b/tests/test_component/test_tmgr_2.py index 08a359e3e..09ffc1285 100755 --- a/tests/test_component/test_tmgr_2.py +++ b/tests/test_component/test_tmgr_2.py @@ -13,6 +13,8 @@ hostname = os.environ.get('RMQ_HOSTNAME', 'localhost') port = int(os.environ.get('RMQ_PORT', 5672)) +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') os.environ['ENTK_HB_INTERVAL'] = '5' @@ -21,8 +23,9 @@ # def func_for_mock_tmgr_test(mq_hostname, mq_port, pending_queue, completed_queue): + credentials = pika.PlainCredentials(username, password) mq_connection = pika.BlockingConnection(pika.ConnectionParameters( - host=mq_hostname, port=mq_port)) + host=mq_hostname, port=mq_port, credentials=credentials)) mq_channel = mq_connection.channel() tasks = list() @@ -68,7 +71,8 @@ def test_tmgr_rp_tmgr(): "db_cleanup" : False} rmgr_id = ru.generate_id('test', ru.ID_UNIQUE) rmgr = RPRmgr(resource_desc=res_dict, sid=rmgr_id, rts_config=config) - rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port) + credentials = pika.PlainCredentials(username, password) + rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port, credentials=credentials) rmgr._validate_resource_desc() rmgr._populate() diff --git a/tests/test_component/test_tmgr_advance.py b/tests/test_component/test_tmgr_advance.py index 4259b710a..3caa309df 100644 --- a/tests/test_component/test_tmgr_advance.py +++ b/tests/test_component/test_tmgr_advance.py @@ -18,8 +18,12 @@ def func(obj, obj_type, new_state, queue1): hostname = os.environ.get('RMQ_HOSTNAME', 'localhost') port = int(os.environ.get('RMQ_PORT', 5672)) + username = os.environ.get('RMQ_USERNAME') + password = os.environ.get('RMQ_PASSWORD') - rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port) + credentials = pika.PlainCredentials(username, password) + rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port, + credentials=credentials) sid = 'test.0013' rmgr = BaseRmgr({}, sid, None, {}) @@ -44,9 +48,13 @@ def master(obj, obj_type, new_state): hostname = os.environ.get('RMQ_HOSTNAME', 'localhost') port = int(os.environ.get('RMQ_PORT', 5672)) + username = os.environ.get('RMQ_USERNAME') + password = os.environ.get('RMQ_PASSWORD') + credentials = pika.PlainCredentials(username, password) mq_connection = pika.BlockingConnection(pika.ConnectionParameters( - host=hostname, port=port)) + host=hostname, port=port, + credentials=credentials)) mq_channel = mq_connection.channel() queue1 = 'test-1-2-3' # Expected queue name structure 'X-A-B-C' diff --git a/tests/test_component/test_wfp.py b/tests/test_component/test_wfp.py index 0ade9d205..36eb36a07 100644 --- a/tests/test_component/test_wfp.py +++ b/tests/test_component/test_wfp.py @@ -13,6 +13,8 @@ hostname = os.environ.get('RMQ_HOSTNAME', 'localhost') port = int(os.environ.get('RMQ_PORT', 5672)) +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') # Hypothesis settings settings.register_profile("travis", max_examples=100, deadline=None) @@ -33,7 +35,9 @@ def test_wfp_initialization(s, b, l): t.executable = '/bin/date' stage.add_tasks(t) p.add_stages(stage) - rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port) + credentials = pika.PlainCredentials(username, password) + rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port, + credentials=credentials) wfp = WFprocessor(sid='rp.session.local.0000', workflow=set([p]), @@ -86,7 +90,8 @@ def test_wfp_enqueue(): s.add_tasks(t) p.add_stages(s) - amgr = Amgr(hostname=hostname, port=port) + amgr = Amgr(hostname=hostname, port=port, username=username, + password=password) amgr._setup_mqs() wfp = WFprocessor(sid=amgr._sid, @@ -150,7 +155,8 @@ def test_wfp_dequeue(): s.add_tasks(t) p.add_stages(s) - amgr = Amgr(hostname=hostname, port=port) + amgr = Amgr(hostname=hostname, port=port, username=username, + password=password) amgr._setup_mqs() wfp = WFprocessor(sid=amgr._sid, @@ -170,11 +176,12 @@ def test_wfp_dequeue(): t.state = states.COMPLETED task_as_dict = json.dumps(t.to_dict()) + credentials = pika.PlainCredentials(amgr._username, amgr._password) mq_connection = pika.BlockingConnection(pika.ConnectionParameters( - host=amgr._hostname, port=amgr._port)) + host=amgr._hostname, port=amgr._port, + credentials=credentials)) mq_channel = mq_connection.channel() - mq_channel.basic_publish(exchange = '', routing_key = '%s' % amgr._completed_queue[0], body = task_as_dict) @@ -206,7 +213,8 @@ def test_wfp_start_processor(): s.add_tasks(t) p.add_stages(s) - amgr = Amgr(hostname=hostname, port=port) + amgr = Amgr(hostname=hostname, port=port, username=username, + password=password) amgr._setup_mqs() wfp = WFprocessor(sid=amgr._sid, @@ -239,7 +247,8 @@ def test_wfp_terminate_processor(): s.add_tasks(t) p.add_stages(s) - amgr = Amgr(hostname=hostname, port=port) + amgr = Amgr(hostname=hostname, port=port, username=username, + password=password) amgr._setup_mqs() wfp = WFprocessor(sid=amgr._sid, @@ -271,7 +280,8 @@ def test_wfp_workflow_incomplete(): s.add_tasks(t) p.add_stages(s) - amgr = Amgr(hostname=hostname, port=port) + amgr = Amgr(hostname=hostname, port=port, username=username, + password=password) amgr._setup_mqs() wfp = WFprocessor(sid=amgr._sid, @@ -285,8 +295,10 @@ def test_wfp_workflow_incomplete(): t.state = states.COMPLETED task_as_dict = json.dumps(t.to_dict()) + credentials = pika.PlainCredentials(amgr._username, amgr._password) mq_connection = pika.BlockingConnection(pika.ConnectionParameters( - host=amgr._hostname, port=amgr._port)) + host=amgr._hostname, port=amgr._port, + credentials=credentials)) mq_channel = mq_connection.channel() mq_channel.basic_publish(exchange = '', @@ -316,7 +328,8 @@ def test_wfp_check_processor(): s.add_tasks(t) p.add_stages(s) - amgr = Amgr(hostname=hostname, port=port) + amgr = Amgr(hostname=hostname, port=port, username=username, + password=password) amgr._setup_mqs() wfp = WFprocessor(sid=amgr._sid, diff --git a/tests/test_integration/test_integration_local.py b/tests/test_integration/test_integration_local.py index dffb1b61e..75020f303 100644 --- a/tests/test_integration/test_integration_local.py +++ b/tests/test_integration/test_integration_local.py @@ -3,6 +3,8 @@ hostname = os.environ.get('RMQ_HOSTNAME','localhost') port = int(os.environ.get('RMQ_PORT',5672)) +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') def test_integration_local(): @@ -43,7 +45,8 @@ def create_single_task(): } - appman = AppManager(hostname=hostname, port=port) + appman = AppManager(hostname=hostname, port=port, username=username, + password=password) appman.resource_desc = res_dict appman.workflow = [p1] appman.run() diff --git a/tests/test_integration/test_post_exec.py b/tests/test_integration/test_post_exec.py index 3b2f81ca4..a4198c6ee 100644 --- a/tests/test_integration/test_post_exec.py +++ b/tests/test_integration/test_post_exec.py @@ -3,6 +3,8 @@ hostname = os.environ.get('RMQ_HOSTNAME','localhost') port = int(os.environ.get('RMQ_PORT',5672)) +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') def test_stage_post_exec(): @@ -77,7 +79,8 @@ def on_false(): 'cpus': 1, } - appman = AppManager(rts='radical.pilot', hostname=hostname, port=port) + appman = AppManager(rts='radical.pilot', hostname=hostname, port=port, + username=username, password=password) appman.resource_desc = res_dict appman.workflow = [p1] appman.run() diff --git a/tests/test_integration/test_rmgr.py b/tests/test_integration/test_rmgr.py index 1027f06bd..bd3156e6d 100644 --- a/tests/test_integration/test_rmgr.py +++ b/tests/test_integration/test_rmgr.py @@ -50,7 +50,11 @@ def test_rmgr_base_initialization(d): assert rmgr.queue is None assert rmgr._validated is False - assert rmgr._uid == 'resource_manager.0000' + # rmgr id is incremental, and it is valid as long as it is in the range + prefix, uid = rmgr._uid.split(".") + assert prefix == 'resource_manager' + assert int(uid) >= 0 + assert int(uid) <= 9999 assert rmgr._logger assert rmgr._prof @@ -352,8 +356,12 @@ def test_rmgr_rp_initialization(d): assert rmgr._access_schema is None assert rmgr._queue is None assert rmgr._validated is False - assert rmgr._uid == 'resource_manager.0000' assert rmgr._download_rp_profile is False + # rmgr id is incremental, and it is valid as long as it is in the range + prefix, uid = rmgr._uid.split(".") + assert prefix == 'resource_manager' + assert int(uid) >= 0 + assert int(uid) <= 9999 assert rmgr._logger assert rmgr._prof diff --git a/tests/test_integration/test_shared_data.py b/tests/test_integration/test_shared_data.py index 42a7388f3..d1a28ba97 100644 --- a/tests/test_integration/test_shared_data.py +++ b/tests/test_integration/test_shared_data.py @@ -10,6 +10,8 @@ hostname = os.environ.get('RMQ_HOSTNAME','localhost') port = int(os.environ.get('RMQ_PORT',5672)) +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') cur_dir = os.path.dirname(os.path.abspath(__file__)) @@ -59,7 +61,8 @@ def test_shared_data(): # Create Application Manager - appman = AppManager(hostname=hostname, port=port) + appman = AppManager(hostname=hostname, port=port, username=username, + password=password) # Assign resource manager to the Application Manager appman.resource_desc = res_dict diff --git a/tests/test_issues/issue_255.py b/tests/test_issues/issue_255.py index 91ab9f2c2..64e429293 100644 --- a/tests/test_issues/issue_255.py +++ b/tests/test_issues/issue_255.py @@ -1,13 +1,11 @@ from radical.entk import Pipeline, Stage, Task, AppManager -from radical.entk import states -from radical.entk.exceptions import * -import pytest import os -from time import sleep hostname = os.environ.get('RMQ_HOSTNAME', 'localhost') port = int(os.environ.get('RMQ_PORT', 5672)) -MLAB = os.environ.get('RADICAL_PILOT_DBURL') +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') + def test_issue_255(): @@ -37,9 +35,9 @@ def create_pipeline(): } - os.environ['RADICAL_PILOT_DBURL'] = MLAB - appman = AppManager(hostname=hostname, port=port, autoterminate=False) + appman = AppManager(hostname=hostname, port=port, username=username, + password=password, autoterminate=False) appman.resource_desc = res_dict p1 = create_pipeline() diff --git a/tests/test_issues/issue_270.py b/tests/test_issues/issue_270.py index 6a8068438..d827fe937 100644 --- a/tests/test_issues/issue_270.py +++ b/tests/test_issues/issue_270.py @@ -1,9 +1,9 @@ -from radical.entk import Pipeline, Stage, Task, AppManager, states +from radical.entk import Pipeline, Stage, Task, AppManager import os # ------------------------------------------------------------------------------ # Set default verbosity -if os.environ.get('RADICAL_ENTK_VERBOSE') == None: +if os.environ.get('RADICAL_ENTK_VERBOSE') is None: os.environ['RADICAL_ENTK_REPORT'] = 'True' @@ -14,6 +14,8 @@ # this script. hostname = os.environ.get('RMQ_HOSTNAME', 'localhost') port = int(os.environ.get('RMQ_PORT', 5672)) +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') def generate_pipeline(): @@ -44,7 +46,8 @@ def generate_pipeline(): def test_issue_270(): # Create Application Manager - appman = AppManager(hostname=hostname, port=port) + appman = AppManager(hostname=hostname, port=port, username=username, + password=password) # Create a dictionary describe four mandatory keys: # resource, walltime, and cpus diff --git a/tests/test_issues/test_diff_rmq.py b/tests/test_issues/test_diff_rmq.py index d4e4ac7b7..da68bc5bd 100644 --- a/tests/test_issues/test_diff_rmq.py +++ b/tests/test_issues/test_diff_rmq.py @@ -1,11 +1,14 @@ from radical.entk import Pipeline, Stage, Task, AppManager -from radical.entk.exceptions import * import pytest import os -hostname = 'two.radical-project.org' -port = 33142 +hostname = os.environ.get('RMQ_HOSTNAME', 'localhost') +port = int(os.environ.get('RMQ_PORT', 5672)) +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') + +@pytest.mark.skip(reason="no need to test this for the moment") def test_diff_rmq(): def create_pipeline(): @@ -26,7 +29,7 @@ def create_pipeline(): p.add_stages(s) return p - + res_dict = { @@ -36,7 +39,7 @@ def create_pipeline(): } - appman = AppManager(hostname=hostname, port=port) + appman = AppManager(hostname=hostname, port=port, username=username, password=password) appman.resource_desc = res_dict p1 = create_pipeline() diff --git a/tests/test_issues/test_issue_199.py b/tests/test_issues/test_issue_199.py index 87a4397d3..6414bc010 100644 --- a/tests/test_issues/test_issue_199.py +++ b/tests/test_issues/test_issue_199.py @@ -9,6 +9,8 @@ hostname = os.environ.get('RMQ_HOSTNAME','localhost') port = int(os.environ.get('RMQ_PORT',5672)) +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') def generate_pipeline(): @@ -48,7 +50,8 @@ def test_issue_199(): # Create Application Manager - appman = AppManager(hostname=hostname, port=port) + appman = AppManager(hostname=hostname, port=port, username=username, + password=password) # Assign resource manager to the Application Manager appman.resource_desc = res_dict diff --git a/tests/test_issues/test_issue_214.py b/tests/test_issues/test_issue_214.py index be07bb917..2580407a5 100644 --- a/tests/test_issues/test_issue_214.py +++ b/tests/test_issues/test_issue_214.py @@ -10,6 +10,8 @@ hostname = os.environ.get('RMQ_HOSTNAME','localhost') port = int(os.environ.get('RMQ_PORT',5672)) +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') sleep = os.environ.get('TEST_214_SLEEP_DURATION',300) @@ -49,7 +51,8 @@ def test_issue_214(): # Create Application Manager - appman = AppManager(hostname=hostname, port=port) + appman = AppManager(hostname=hostname, port=port, username=username, + password=password) # Assign resource manager to the Application Manager appman.resource_desc = res_dict diff --git a/tests/test_issues/test_issue_236.py b/tests/test_issues/test_issue_236.py index ccd12336c..324f0d83d 100644 --- a/tests/test_issues/test_issue_236.py +++ b/tests/test_issues/test_issue_236.py @@ -11,8 +11,11 @@ hostname = os.environ.get('RMQ_HOSTNAME','localhost') port = int(os.environ.get('RMQ_PORT',5672)) +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') cur_dir = os.path.dirname(os.path.abspath(__file__)) + def generate_pipeline(): # Create a Pipeline object @@ -25,7 +28,7 @@ def generate_pipeline(): t1 = Task() t1.executable = 'mv' t1.arguments = ['temp','/tmp/'] - t1.upload_input_data = ['%s/temp'%cur_dir] + t1.upload_input_data = ['%s/temp' % cur_dir] # Add the Task to the Stage s1.add_tasks(t1) @@ -35,6 +38,7 @@ def generate_pipeline(): return p + def test_issue_236(): ''' @@ -46,10 +50,10 @@ def test_issue_236(): ./temp/file1.txt ''' - os.makedirs('%s/temp' %cur_dir) - os.makedirs('%s/temp/dir1' %cur_dir) - os.system('echo "Hello world" > %s/temp/file1.txt' %cur_dir) - os.system('echo "Hello world" > %s/temp/dir1/file2.txt' %cur_dir) + os.makedirs('%s/temp' % cur_dir) + os.makedirs('%s/temp/dir1' % cur_dir) + os.system('echo "Hello world" > %s/temp/file1.txt' % cur_dir) + os.system('echo "Hello world" > %s/temp/dir1/file2.txt' % cur_dir) # Create a dictionary describe four mandatory keys: @@ -64,7 +68,8 @@ def test_issue_236(): # Create Application Manager - appman = AppManager(hostname=hostname, port=port) + appman = AppManager(hostname=hostname, port=port, username=username, + password=password) # Assign resource manager to the Application Manager appman.resource_desc = res_dict @@ -78,11 +83,11 @@ def test_issue_236(): appman.run() # Assert folder movement - assert len(glob('/tmp/temp*')) >=1 - assert len(glob('/tmp/temp/dir*')) ==1 - assert len(glob('/tmp/temp/*.txt')) ==1 - assert len(glob('/tmp/temp/dir1/*.txt')) ==1 + assert len(glob('/tmp/temp*')) >= 1 + assert len(glob('/tmp/temp/dir*')) == 1 + assert len(glob('/tmp/temp/*.txt')) == 1 + assert len(glob('/tmp/temp/dir1/*.txt')) == 1 # Cleanup - shutil.rmtree('%s/temp' %cur_dir) + shutil.rmtree('%s/temp' % cur_dir) shutil.rmtree('/tmp/temp') diff --git a/tests/test_issues/test_issue_259.py b/tests/test_issues/test_issue_259.py index 0b2765797..91984dd53 100644 --- a/tests/test_issues/test_issue_259.py +++ b/tests/test_issues/test_issue_259.py @@ -3,7 +3,7 @@ # ------------------------------------------------------------------------------ # Set default verbosity -if os.environ.get('RADICAL_ENTK_VERBOSE') == None: +if os.environ.get('RADICAL_ENTK_VERBOSE') is None: os.environ['RADICAL_ENTK_REPORT'] = 'True' @@ -14,6 +14,8 @@ # this script. hostname = os.environ.get('RMQ_HOSTNAME', 'localhost') port = int(os.environ.get('RMQ_PORT', 5672)) +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') def generate_pipeline(): @@ -47,7 +49,7 @@ def generate_pipeline(): t2 = Task() t2.name = 't2' t2.executable = '/bin/cat' - t2.arguments = ['$Pipeline_%s_Stage_%s_Task_%s/temp.txt'%(p.name, s1.name, t1.name)] + t2.arguments = ['$Pipeline_%s_Stage_%s_Task_%s/temp.txt' % (p.name, s1.name, t1.name)] t2.stdout = 'output.txt' t2.download_output_data = ['output.txt'] @@ -59,10 +61,12 @@ def generate_pipeline(): return p + def test_issue_259(): # Create Application Manager - appman = AppManager(hostname=hostname, port=port) + appman = AppManager(hostname=hostname, port=port, username=username, + password=password) # Create a dictionary describe four mandatory keys: # resource, walltime, and cpus diff --git a/tests/test_issues/test_issue_26.py b/tests/test_issues/test_issue_26.py index 4404bf9f6..80f291f5d 100755 --- a/tests/test_issues/test_issue_26.py +++ b/tests/test_issues/test_issue_26.py @@ -6,6 +6,8 @@ hostname = os.environ.get('RMQ_HOSTNAME','localhost') port = int(os.environ.get('RMQ_PORT',5672)) +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') # ------------------------------------------------------------------------------ @@ -32,17 +34,19 @@ def create_pipeline(): # def test_issue_26(): - appman = AppManager(hostname=hostname, port=port, autoterminate=False) + appman = AppManager(hostname=hostname, port=port, username=username, + password=password, autoterminate=False) appman.resource_desc = {'resource': 'local.localhost', 'walltime': 10, 'cpus' : 1, 'project' : ''} p1 = create_pipeline() + p2 = create_pipeline() + appman.workflow = [p1] appman.run() - p2 = create_pipeline() appman.workflow = [p2] appman.run() diff --git a/tests/test_issues/test_issue_271.py b/tests/test_issues/test_issue_271.py index 541f3a065..3e89ffdfb 100644 --- a/tests/test_issues/test_issue_271.py +++ b/tests/test_issues/test_issue_271.py @@ -3,7 +3,7 @@ # ------------------------------------------------------------------------------ # Set default verbosity -if os.environ.get('RADICAL_ENTK_VERBOSE') == None: +if os.environ.get('RADICAL_ENTK_VERBOSE') is None: os.environ['RADICAL_ENTK_REPORT'] = 'True' @@ -14,6 +14,8 @@ # this script. hostname = os.environ.get('RMQ_HOSTNAME', 'localhost') port = int(os.environ.get('RMQ_PORT', 5672)) +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') def generate_pipeline(): @@ -44,7 +46,8 @@ def generate_pipeline(): def test_issue_271(): # Create Application Manager - appman = AppManager(hostname=hostname, port=port) + appman = AppManager(hostname=hostname, port=port, username=username, + password=password) # Create a dictionary describe four mandatory keys: # resource, walltime, and cpus diff --git a/tests/test_utils/test_prof_utils.py b/tests/test_utils/test_prof_utils.py index 38b273de0..c59335a26 100755 --- a/tests/test_utils/test_prof_utils.py +++ b/tests/test_utils/test_prof_utils.py @@ -15,6 +15,8 @@ hostname = os.environ.get('RMQ_HOSTNAME', 'localhost') port = int(os.environ.get('RMQ_PORT', 5672)) +username = os.environ.get('RMQ_USERNAME') +password = os.environ.get('RMQ_PASSWORD') pwd = os.path.dirname(os.path.abspath(__file__)) @@ -41,7 +43,8 @@ def test_get_session_profile(): # def test_write_session_description(): - amgr = AppManager(hostname=hostname, port=port) + amgr = AppManager(hostname=hostname, port=port, username=username, + password=password) amgr.resource_desc = {'resource' : 'xsede.stampede', 'walltime' : 59, 'cpus' : 128, @@ -70,6 +73,12 @@ def test_write_session_description(): write_session_description(amgr) desc = ru.read_json('%s/radical.entk.%s.json' % (amgr._sid, amgr._sid)) + # tasks are originally set but saved as a list in json + # uses sorting for convenient comparison, this doesn't change validity + for k, v in (desc['tree'].items()): + if k.startswith("stage"): + desc['tree'][k]['children'] = sorted(v['children']) + src = '%s/sample_data' % pwd assert desc == ru.read_json('%s/expected_desc_write_session.json' % src) @@ -79,7 +88,7 @@ def test_write_session_description(): # def test_get_session_description(): - sid = 're.session.host.user.012345.1234' + sid = 're.session.host.user.012345.1234/radical.entk.re.session.host.user.012345.1234' src = '%s/sample_data/profiler' % pwd desc = get_session_description(sid=sid, src=src) @@ -166,7 +175,8 @@ def check_wf(wf, check): wf.append(generate_pipeline(1)) wf.append(generate_pipeline(2)) - amgr = AppManager(hostname=hostname, port=port) + amgr = AppManager(hostname=hostname, port=port, + username=username, password=password) amgr.workflow = wf amgr._wfp = WFprocessor(sid=amgr._sid, workflow=amgr._workflow, diff --git a/tests/test_utils/test_sync_with_master.py b/tests/test_utils/test_sync_with_master.py index 47f74709e..62320a111 100644 --- a/tests/test_utils/test_sync_with_master.py +++ b/tests/test_utils/test_sync_with_master.py @@ -15,8 +15,12 @@ def syncer(obj, obj_type, queue1): hostname = os.environ.get('RMQ_HOSTNAME', 'localhost') port = int(os.environ.get('RMQ_PORT', 5672)) + username = os.environ.get('RMQ_USERNAME') + password = os.environ.get('RMQ_PASSWORD') - rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port) + credentials = pika.PlainCredentials(username, password) + rmq_conn_params = pika.ConnectionParameters(host=hostname, port=port, + credentials=credentials) mq_connection = pika.BlockingConnection(rmq_conn_params) mq_channel = mq_connection.channel() @@ -40,9 +44,13 @@ def master(obj, obj_type): hostname = os.environ.get('RMQ_HOSTNAME', 'localhost') port = int(os.environ.get('RMQ_PORT', 5672)) + username = os.environ.get('RMQ_USERNAME') + password = os.environ.get('RMQ_PASSWORD') + credentials = pika.PlainCredentials(username, password) mq_connection = pika.BlockingConnection(pika.ConnectionParameters( - host=hostname, port=port)) + host=hostname, port=port, + credentials=credentials)) mq_channel = mq_connection.channel() queue1 = 'test-1-2-3' # Expected queue name structure 'X-A-B-C'