From 91ad1f91cee77e9245a321ac10c605802bf1e008 Mon Sep 17 00:00:00 2001 From: Ioannis Paraskevakos Date: Wed, 10 Mar 2021 12:28:04 -0500 Subject: [PATCH] expanded resolve_placeholders for multiple placeholders --- src/radical/entk/execman/rp/task_processor.py | 101 +++++++++--------- tests/test_component/test_tproc_rp.py | 39 ++++--- 2 files changed, 79 insertions(+), 61 deletions(-) diff --git a/src/radical/entk/execman/rp/task_processor.py b/src/radical/entk/execman/rp/task_processor.py index 2c89cc5c..f601564d 100644 --- a/src/radical/entk/execman/rp/task_processor.py +++ b/src/radical/entk/execman/rp/task_processor.py @@ -37,61 +37,64 @@ def resolve_placeholders(path, placeholders): if '$' not in path: return path + path_placeholders = [] # Extract placeholder from path if len(path.split('>')) == 1: - placeholder = path.split('/')[0] + path_placeholders.append(path.split('/')[0]) else: if path.split('>')[0].strip().startswith('$'): - placeholder = path.split('>')[0].strip().split('/')[0] - else: - placeholder = path.split('>')[1].strip().split('/')[0] - - # SHARED - if placeholder == "$SHARED": - return path.replace(placeholder, 'pilot://') - - # Expected placeholder format: - # $Pipeline_{pipeline.uid}_Stage_{stage.uid}_Task_{task.uid} - - elems = placeholder.split('/')[0].split('_') - - if not len(elems) == 6: - - expected = '$Pipeline_(pipeline_name)_' \ - 'Stage_(stage_name)_' \ - 'Task_(task_name) or $SHARED', - raise ree.ValueError(obj='placeholder', attribute='task', - expected_value=expected, actual_value=elems) - - pname = elems[1] - sname = elems[3] - tname = elems[5] - resolved = None - - if pname in placeholders: - if sname in placeholders[pname]: - if tname in placeholders[pname][sname]: - resolved = path.replace(placeholder, - placeholders[pname][sname][tname]['path']) + path_placeholders.append(path.split('>')[0].strip().split('/')[0]) + if path.split('>')[1].strip().startswith('$'): + path_placeholders.append(path.split('>')[1].strip().split('/')[0]) + + resolved = path + + for placeholder in path_placeholders: + + # SHARED + if placeholder == "$SHARED": + resolved = resolved.replace(placeholder, 'pilot://') + continue + + # Expected placeholder format: + # $Pipeline_{pipeline.uid}_Stage_{stage.uid}_Task_{task.uid} + elems = placeholder.split('/')[0].split('_') + if not len(elems) == 6: + + expected = '$Pipeline_(pipeline_name)_' \ + 'Stage_(stage_name)_' \ + 'Task_(task_name) or $SHARED', + raise ree.ValueError(obj='placeholder', attribute='task', + expected_value=expected, actual_value=elems) + + pname = elems[1] + sname = elems[3] + tname = elems[5] + + if pname in placeholders: + if sname in placeholders[pname]: + if tname in placeholders[pname][sname]: + resolved = resolved.replace(placeholder, + placeholders[pname][sname][tname]['path']) + else: + logger.warning('%s not assigned to any task in Stage %s Pipeline %s' % + (tname, sname, pname)) else: - logger.warning('%s not assigned to any task in Stage %s Pipeline %s' % - (tname, sname, pname)) + logger.warning('%s not assigned to any Stage in Pipeline %s' % ( + sname, pname)) else: - logger.warning('%s not assigned to any Stage in Pipeline %s' % ( - sname, pname)) - else: - logger.warning('%s not assigned to any Pipeline' % (pname)) - - if not resolved: - logger.warning('No placeholder could be found for task name %s \ - stage name %s and pipeline name %s. Please be sure to \ - use object names and not uids in your references,i.e, \ - $Pipeline_(pipeline_name)_Stage_(stage_name)_Task_(task_name)') - expected = '$Pipeline_(pipeline_name)_' \ - 'Stage_(stage_name)_' \ - 'Task_(task_name) or $SHARED' - raise ree.ValueError(obj='placeholder', attribute='task', - expected_value=expected, actual_value=elems) + logger.warning('%s not assigned to any Pipeline' % (pname)) + + if not resolved: + logger.warning('No placeholder could be found for task name %s \ + stage name %s and pipeline name %s. Please be sure to \ + use object names and not uids in your references,i.e, \ + $Pipeline_(pipeline_name)_Stage_(stage_name)_Task_(task_name)') + expected = '$Pipeline_(pipeline_name)_' \ + 'Stage_(stage_name)_' \ + 'Task_(task_name) or $SHARED' + raise ree.ValueError(obj='placeholder', attribute='task', + expected_value=expected, actual_value=elems) return resolved diff --git a/tests/test_component/test_tproc_rp.py b/tests/test_component/test_tproc_rp.py index 6ea3ccfd..68f65455 100755 --- a/tests/test_component/test_tproc_rp.py +++ b/tests/test_component/test_tproc_rp.py @@ -316,23 +316,38 @@ def test_get_input_list_from_task(self, mocked_Logger): with self.assertRaises(ree.TypeError): get_input_list_from_task(task, '') - task = mock.MagicMock(spec=radical.entk.Task) - task.link_input_data = ['test_file > $SHARED/test_file'] - task.upload_input_data = ['test_file > $SHARED/test_file'] - task.copy_input_data = ['test_file > $SHARED/test_file'] - task.move_input_data = ['test_file > $SHARED/test_file'] - test = get_input_list_from_task(task, {}) + pipeline_name = 'p1' + stage_name = 's1' + t1_name = 't1' - input_list = [{'source': 'test_file', - 'target': 'pilot:///test_file', + placeholders = { + pipeline_name: { + stage_name: { + t1_name: { + 'path' : '/home/vivek/t1', + 'rts_uid': 'unit.0002' + } + } + } + } + + task = mock.MagicMock(spec=radical.entk.Task) + task.link_input_data = ['$SHARED/test_folder/test_file > test_folder/test_file'] + task.upload_input_data = ['$SHARED/test_folder/test_file > test_file'] + task.copy_input_data = ['$Pipeline_p1_Stage_s1_Task_t1/test_file > $SHARED/test_file'] + task.move_input_data = ['test_file > test_file'] + test = get_input_list_from_task(task, placeholders) + + input_list = [{'source': 'pilot:///test_folder/test_file', + 'target': 'test_folder/test_file', 'action': 'Link'}, - {'source': 'test_file', - 'target': 'pilot:///test_file'}, - {'source': 'test_file', + {'source': 'pilot:///test_folder/test_file', + 'target': 'test_file'}, + {'source': '/home/vivek/t1/test_file', 'target': 'pilot:///test_file', 'action': 'Copy'}, {'source': 'test_file', - 'target': 'pilot:///test_file', + 'target': 'test_file', 'action': 'Move'}] self.assertEqual(test[0], input_list[0])