Skip to content

Commit

Permalink
htcondor: fixes bash characters escaping & not found job log files
Browse files Browse the repository at this point in the history
* Refactors SLURM job mintor &
  Closes reanahub/reana-demo-root6-roofit#36
  Closes reanahub/reana-client#343
  • Loading branch information
Rokas Maciulaitis committed Jan 9, 2020
1 parent 3238032 commit 8829be1
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 28 deletions.
7 changes: 5 additions & 2 deletions etc/job_wrapper.sh
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
# temporary wrapper file.
tmpjob=$(mktemp -p .)
chmod +x $tmpjob
echo "command to execute:" $@
echo "$@" > $tmpjob
echo "command to execute:"
# $@ input is base64 encoded string of command to execute
eval $@

echo "$@" "|bash" > $tmpjob
bash $tmpjob
res=$?
rm $tmpjob
Expand Down
33 changes: 14 additions & 19 deletions reana_job_controller/htcondorcern_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,12 @@ def execute(self):
clusterid = future.result()
return clusterid

def _replace_absolute_paths_with_relative(self, base_64_enconded_cmd):
def _replace_absolute_paths_with_relative(self, cmd):
"""Replace absolute with relative path."""
relative_paths_command = None
decoded_cmd = \
base64.b64decode(base_64_enconded_cmd).decode('utf-8')
if self.workflow_workspace in decoded_cmd:
decoded_cmd = \
decoded_cmd.replace(self.workflow_workspace + '/', '')
if self.workflow_workspace in cmd:
relative_paths_command = \
base64.b64encode(
decoded_cmd.encode('utf-8')).decode('utf-8')
cmd.replace(self.workflow_workspace + '/', '')
return relative_paths_command

def _format_arguments(self):
Expand All @@ -133,24 +128,24 @@ def _format_arguments(self):
--outputfile \"results/greetings.txt\" --sleeptime 0
"""
if self.workflow.type_ == 'serial':
arguments = re.sub(r'"', '\\"', " ".join(self.cmd[2].split()[3:]))
base_cmd = " ".join(self.cmd[2].split()[3:])
elif self.workflow.type_ == 'cwl':
arguments = self.cmd[2].replace(self.workflow_workspace,
'$_CONDOR_JOB_IWD')
base_cmd = self.cmd[2].replace(self.workflow_workspace,
'$_CONDOR_JOB_IWD')
elif self.workflow.type_ == 'yadage':
if 'base64' in ' '.join(self.cmd):
base_64_encoded_cmd = self.cmd[2].split('|')[0].split()[1]
base_64_encoded_cmd = \
decoded_cmd = \
base64.b64decode(base_64_encoded_cmd).decode('utf-8')
base_cmd = \
self._replace_absolute_paths_with_relative(
base_64_encoded_cmd) or base_64_encoded_cmd
arguments = \
'echo {}|base64 -d|bash'.format(base_64_encoded_cmd)
decoded_cmd) or decoded_cmd
else:
if self.workflow_workspace in self.cmd[2]:
arguments = \
self.cmd[2].replace(self.workflow_workspace + '/', '')
arguments = re.sub(r'"', '\"', arguments)
return "{}".format(arguments)
base_cmd = self._replace_absolute_paths_with_relative(
self.cmd[2]) or self.cmd[2]
return 'echo {}|base64 -d'.format(
base64.b64encode(base_cmd.encode('utf-8')).decode('utf-8'))

def _format_env_vars(self):
"""Return job env vars in job description format."""
Expand Down
20 changes: 13 additions & 7 deletions reana_job_controller/job_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,13 @@ def watch_jobs(self, job_db, app):
'failed'.format(job_id,
condor_job['ClusterId']))
job_db[job_id]['status'] = 'failed'
job_db[job_id]['log'] = \
HTCondorJobManagerCERN.get_logs(
backend_job_id=job_dict['backend_job_id'],
workspace=job_db[
job_id]['obj'].workflow_workspace)
job_logs = app.htcondor_executor.submit(
HTCondorJobManagerCERN.get_logs,
job_dict['backend_job_id'],
job_db[job_id]['obj'].workflow_workspace)
job_db[job_id]['log'] = job_logs.result()
store_logs(logs=job_db[job_id]['log'], job_id=job_id)

job_db[job_id]['deleted'] = True
elif (condor_job['JobStatus'] ==
condorJobStatus['Held'] and
Expand Down Expand Up @@ -324,11 +325,16 @@ def watch_jobs(self, job_db, app=None):
SlurmJobManagerCERN.get_outputs()
job_db[job_id]['status'] = 'succeeded'
job_db[job_id]['deleted'] = True
job_db[job_id]['log'] = \
SlurmJobManagerCERN.get_logs(
backend_job_id=job_dict['backend_job_id'],
workspace=job_db[
job_id]['obj'].workflow_workspace)
store_logs(logs=job_db[job_id]['log'], job_id=job_id)
if slurm_job_status in slurmJobStatus['failed']:
SlurmJobManagerCERN.get_outputs()
job_db[job_id]['status'] = 'failed'
job_db[job_id]['deleted'] = True
if slurm_job_status in slurmJobStatus['failed'] or \
slurm_job_status in slurmJobStatus['succeeded']:
job_db[job_id]['log'] = \
SlurmJobManagerCERN.get_logs(
backend_job_id=job_dict['backend_job_id'],
Expand Down

0 comments on commit 8829be1

Please sign in to comment.