Skip to content

Commit

Permalink
expanded resolve_placeholders for multiple placeholders
Browse files Browse the repository at this point in the history
  • Loading branch information
Ioannis Paraskevakos committed Mar 10, 2021
1 parent b266c75 commit 91ad1f9
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 61 deletions.
101 changes: 52 additions & 49 deletions src/radical/entk/execman/rp/task_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,61 +37,64 @@ def resolve_placeholders(path, placeholders):
if '$' not in path:
return path

path_placeholders = []
# Extract placeholder from path
if len(path.split('>')) == 1:
placeholder = path.split('/')[0]
path_placeholders.append(path.split('/')[0])
else:
if path.split('>')[0].strip().startswith('$'):
placeholder = path.split('>')[0].strip().split('/')[0]
else:
placeholder = path.split('>')[1].strip().split('/')[0]

# SHARED
if placeholder == "$SHARED":
return path.replace(placeholder, 'pilot://')

# Expected placeholder format:
# $Pipeline_{pipeline.uid}_Stage_{stage.uid}_Task_{task.uid}

elems = placeholder.split('/')[0].split('_')

if not len(elems) == 6:

expected = '$Pipeline_(pipeline_name)_' \
'Stage_(stage_name)_' \
'Task_(task_name) or $SHARED',
raise ree.ValueError(obj='placeholder', attribute='task',
expected_value=expected, actual_value=elems)

pname = elems[1]
sname = elems[3]
tname = elems[5]
resolved = None

if pname in placeholders:
if sname in placeholders[pname]:
if tname in placeholders[pname][sname]:
resolved = path.replace(placeholder,
placeholders[pname][sname][tname]['path'])
path_placeholders.append(path.split('>')[0].strip().split('/')[0])
if path.split('>')[1].strip().startswith('$'):
path_placeholders.append(path.split('>')[1].strip().split('/')[0])

resolved = path

for placeholder in path_placeholders:

# SHARED
if placeholder == "$SHARED":
resolved = resolved.replace(placeholder, 'pilot://')
continue

# Expected placeholder format:
# $Pipeline_{pipeline.uid}_Stage_{stage.uid}_Task_{task.uid}
elems = placeholder.split('/')[0].split('_')
if not len(elems) == 6:

expected = '$Pipeline_(pipeline_name)_' \
'Stage_(stage_name)_' \
'Task_(task_name) or $SHARED',
raise ree.ValueError(obj='placeholder', attribute='task',
expected_value=expected, actual_value=elems)

pname = elems[1]
sname = elems[3]
tname = elems[5]

if pname in placeholders:
if sname in placeholders[pname]:
if tname in placeholders[pname][sname]:
resolved = resolved.replace(placeholder,
placeholders[pname][sname][tname]['path'])
else:
logger.warning('%s not assigned to any task in Stage %s Pipeline %s' %
(tname, sname, pname))
else:
logger.warning('%s not assigned to any task in Stage %s Pipeline %s' %
(tname, sname, pname))
logger.warning('%s not assigned to any Stage in Pipeline %s' % (
sname, pname))
else:
logger.warning('%s not assigned to any Stage in Pipeline %s' % (
sname, pname))
else:
logger.warning('%s not assigned to any Pipeline' % (pname))

if not resolved:
logger.warning('No placeholder could be found for task name %s \
stage name %s and pipeline name %s. Please be sure to \
use object names and not uids in your references,i.e, \
$Pipeline_(pipeline_name)_Stage_(stage_name)_Task_(task_name)')
expected = '$Pipeline_(pipeline_name)_' \
'Stage_(stage_name)_' \
'Task_(task_name) or $SHARED'
raise ree.ValueError(obj='placeholder', attribute='task',
expected_value=expected, actual_value=elems)
logger.warning('%s not assigned to any Pipeline' % (pname))

if not resolved:
logger.warning('No placeholder could be found for task name %s \
stage name %s and pipeline name %s. Please be sure to \
use object names and not uids in your references,i.e, \
$Pipeline_(pipeline_name)_Stage_(stage_name)_Task_(task_name)')
expected = '$Pipeline_(pipeline_name)_' \
'Stage_(stage_name)_' \
'Task_(task_name) or $SHARED'
raise ree.ValueError(obj='placeholder', attribute='task',
expected_value=expected, actual_value=elems)

return resolved

Expand Down
39 changes: 27 additions & 12 deletions tests/test_component/test_tproc_rp.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,23 +316,38 @@ def test_get_input_list_from_task(self, mocked_Logger):
with self.assertRaises(ree.TypeError):
get_input_list_from_task(task, '')

task = mock.MagicMock(spec=radical.entk.Task)
task.link_input_data = ['test_file > $SHARED/test_file']
task.upload_input_data = ['test_file > $SHARED/test_file']
task.copy_input_data = ['test_file > $SHARED/test_file']
task.move_input_data = ['test_file > $SHARED/test_file']
test = get_input_list_from_task(task, {})
pipeline_name = 'p1'
stage_name = 's1'
t1_name = 't1'

input_list = [{'source': 'test_file',
'target': 'pilot:///test_file',
placeholders = {
pipeline_name: {
stage_name: {
t1_name: {
'path' : '/home/vivek/t1',
'rts_uid': 'unit.0002'
}
}
}
}

task = mock.MagicMock(spec=radical.entk.Task)
task.link_input_data = ['$SHARED/test_folder/test_file > test_folder/test_file']
task.upload_input_data = ['$SHARED/test_folder/test_file > test_file']
task.copy_input_data = ['$Pipeline_p1_Stage_s1_Task_t1/test_file > $SHARED/test_file']
task.move_input_data = ['test_file > test_file']
test = get_input_list_from_task(task, placeholders)

input_list = [{'source': 'pilot:///test_folder/test_file',
'target': 'test_folder/test_file',
'action': 'Link'},
{'source': 'test_file',
'target': 'pilot:///test_file'},
{'source': 'test_file',
{'source': 'pilot:///test_folder/test_file',
'target': 'test_file'},
{'source': '/home/vivek/t1/test_file',
'target': 'pilot:///test_file',
'action': 'Copy'},
{'source': 'test_file',
'target': 'pilot:///test_file',
'target': 'test_file',
'action': 'Move'}]

self.assertEqual(test[0], input_list[0])
Expand Down

0 comments on commit 91ad1f9

Please sign in to comment.