slurm: race condition when consulting job state
* Because job IDs are reused in the Slurm DB, when we consult the state
  of a job very quickly we may get the state of an old job with the same
  ID, which typically is `COMPLETED`. This causes REANA to think the job
  is done and then try to find its output files (which do not exist,
  because the actual job hasn't finished yet).

  Technically speaking, `sacct` queries `slurmdbd`, but the job we just
  submitted still exists only in memory (`slurmctld`) and is not yet
  committed to the DB (`slurmdbd`), where the old job ID lies (with its
  old information).

  This commit fixes the problem by using `scontrol show job` instead,
  which queries `slurmctld` directly and therefore has the latest data
  (closes #272).
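
As an illustration, here is a minimal sketch (assuming a hypothetical,
recycled job ID `12345`; the two command strings are the ones used before
and after this commit) of why the two queries can disagree right after
submission:

```python
# Hypothetical, recycled Slurm job ID.
backend_job_id = "12345"

# Query used before this commit: `sacct` reads from slurmdbd, so for a
# recycled ID it may still return the previous job's COMPLETED record.
sacct_cmd = (
    "sacct --jobs {} --noheader --allocations --parsable "
    "--format State,JobID".format(backend_job_id)
)

# Query used after this commit: `scontrol show job` asks slurmctld,
# which knows about the job that was just submitted.
scontrol_cmd = (
    f"scontrol show job {backend_job_id} -o "
    "| tr ' ' '\n' | grep JobState | cut -f2 -d '='"
)

# Either command would be run over the SSH connection to the Slurm
# submit node, e.g. slurm_connection.exec_command(scontrol_cmd).
```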
Diego Rodriguez committed Oct 9, 2020
1 parent a8d47d2 commit e973f32
Showing 1 changed file with 13 additions and 20 deletions.
33 changes: 13 additions & 20 deletions reana_job_controller/job_monitor.py
@@ -210,10 +210,12 @@ def get_job_logs(self, job_pod):
logging.info("Grabbing pod {} logs ...".format(job_pod.metadata.name))
for container in container_statuses:
if container.state.terminated:
container_log = current_k8s_corev1_api_client.read_namespaced_pod_log(
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
name=job_pod.metadata.name,
container=container.name,
container_log = (
current_k8s_corev1_api_client.read_namespaced_pod_log(
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
name=job_pod.metadata.name,
container=container.name,
)
)
pod_logs += "{}: :\n {}\n".format(container.name, container_log)
elif container.state.waiting:
@@ -418,14 +420,6 @@ def __init__(self, app=None):
self.job_manager_cls = COMPUTE_BACKENDS["slurmcern"]()
super(__class__, self).__init__(thread_name="slurm_job_monitor")

def format_slurm_job_query(self, backend_job_ids):
"""Format Slurm job query."""
cmd = (
"sacct --jobs {} --noheader --allocations --parsable "
"--format State,JobID".format(",".join(backend_job_ids))
)
return cmd

def watch_jobs(self, job_db, app=None):
"""Use SSH connection to slurm submitnode to monitor jobs.
@@ -448,20 +442,19 @@ def watch_jobs(self, job_db, app=None):
):
slurm_jobs[job_dict["backend_job_id"]] = id
if not slurm_jobs.keys():
logging.error("No slurm jobs")
continue
slurm_query_cmd = self.format_slurm_job_query(slurm_jobs.keys())
stdout = slurm_connection.exec_command(slurm_query_cmd)
for item in stdout.rstrip().split("\n"):
slurm_job_status = item.split("|")[0]
slurm_job_id = item.split("|")[1]

for slurm_job_id, job_dict in slurm_jobs.items():
slurm_job_status = slurm_connection.exec_command(
f"scontrol show job {slurm_job_id} -o | tr ' ' '\n' | grep JobState | cut -f2 -d '='"
).rstrip()
job_id = slurm_jobs[slurm_job_id]
if slurm_job_status in slurmJobStatus["succeeded"]:
self.job_manager_cls.get_outputs()
job_db[job_id]["status"] = "succeeded"
job_db[job_id]["deleted"] = True
job_db[job_id]["log"] = self.job_manager_cls.get_logs(
backend_job_id=job_dict["backend_job_id"],
backend_job_id=slurm_job_id,
workspace=job_db[job_id]["obj"].workflow_workspace,
)
store_logs(logs=job_db[job_id]["log"], job_id=job_id)
@@ -470,7 +463,7 @@
job_db[job_id]["status"] = "failed"
job_db[job_id]["deleted"] = True
job_db[job_id]["log"] = self.job_manager_cls.get_logs(
backend_job_id=job_dict["backend_job_id"],
backend_job_id=slurm_job_id,
workspace=job_db[job_id]["obj"].workflow_workspace,
)
store_logs(logs=job_db[job_id]["log"], job_id=job_id)
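
As a rough illustration of how the `JobState` string returned by
`scontrol` is consumed, here is a hedged sketch of a status mapping in
the spirit of the `slurmJobStatus` dictionary referenced above; the exact
entries in REANA may differ:

```python
# Illustrative mapping only; the real `slurmJobStatus` used by REANA may
# contain different entries.
slurmJobStatus = {
    "succeeded": ["COMPLETED"],
    "failed": ["FAILED", "CANCELLED", "TIMEOUT", "OUT_OF_MEMORY", "NODE_FAIL"],
    "running": ["RUNNING", "PENDING", "CONFIGURING"],
}


def reana_status(slurm_job_state):
    """Translate a raw Slurm JobState string into a REANA job status."""
    for status, slurm_states in slurmJobStatus.items():
        if slurm_job_state in slurm_states:
            return status
    # Unknown or transient states are treated as still in progress.
    return "running"


# Example: a freshly submitted job reported as PENDING by slurmctld.
assert reana_status("PENDING") == "running"
```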
