From e973f323db82a8299df5568af1e0c77d446ad1d3 Mon Sep 17 00:00:00 2001
From: Diego Rodriguez
Date: Wed, 7 Oct 2020 17:50:13 +0200
Subject: [PATCH] slurm: fix race condition when consulting job state

* Because job IDs are reused in the Slurm DB, consulting the state of a
  job too quickly returns the state of an old job with the same ID,
  which is typically `COMPLETED`. This makes REANA think the job is done
  and try to find its output files, which do not exist because the
  actual job has not finished. Technically speaking, `sacct` queries
  `slurmdbd`, but the job we just submitted is still only in memory
  (`slurmctld`) and not yet committed to the DB (`slurmdbd`), where the
  old job ID lies with its old information. This commit fixes the
  problem by using `scontrol show job`, which queries `slurmctld`
  directly and therefore has the latest data (closes #272).
---
 reana_job_controller/job_monitor.py | 33 +++++++++++++--------------------
 1 file changed, 13 insertions(+), 20 deletions(-)

diff --git a/reana_job_controller/job_monitor.py b/reana_job_controller/job_monitor.py
index 0beede1e..23d345bd 100644
--- a/reana_job_controller/job_monitor.py
+++ b/reana_job_controller/job_monitor.py
@@ -210,10 +210,12 @@ def get_job_logs(self, job_pod):
         logging.info("Grabbing pod {} logs ...".format(job_pod.metadata.name))
         for container in container_statuses:
             if container.state.terminated:
-                container_log = current_k8s_corev1_api_client.read_namespaced_pod_log(
-                    namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
-                    name=job_pod.metadata.name,
-                    container=container.name,
+                container_log = (
+                    current_k8s_corev1_api_client.read_namespaced_pod_log(
+                        namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
+                        name=job_pod.metadata.name,
+                        container=container.name,
+                    )
                 )
                 pod_logs += "{}: :\n {}\n".format(container.name, container_log)
             elif container.state.waiting:
@@ -418,14 +420,6 @@ def __init__(self, app=None):
         self.job_manager_cls = COMPUTE_BACKENDS["slurmcern"]()
         super(__class__, self).__init__(thread_name="slurm_job_monitor")

-    def format_slurm_job_query(self, backend_job_ids):
-        """Format Slurm job query."""
-        cmd = (
-            "sacct --jobs {} --noheader --allocations --parsable "
-            "--format State,JobID".format(",".join(backend_job_ids))
-        )
-        return cmd
-
     def watch_jobs(self, job_db, app=None):
         """Use SSH connection to slurm submitnode to monitor jobs.
@@ -448,20 +442,19 @@ def watch_jobs(self, job_db, app=None):
                     ):
                         slurm_jobs[job_dict["backend_job_id"]] = id
                 if not slurm_jobs.keys():
-                    logging.error("No slurm jobs")
                     continue
-                slurm_query_cmd = self.format_slurm_job_query(slurm_jobs.keys())
-                stdout = slurm_connection.exec_command(slurm_query_cmd)
-                for item in stdout.rstrip().split("\n"):
-                    slurm_job_status = item.split("|")[0]
-                    slurm_job_id = item.split("|")[1]
+
+                for slurm_job_id, job_dict in slurm_jobs.items():
+                    slurm_job_status = slurm_connection.exec_command(
+                        f"scontrol show job {slurm_job_id} -o | tr ' ' '\n' | grep JobState | cut -f2 -d '='"
+                    ).rstrip()
                     job_id = slurm_jobs[slurm_job_id]
                     if slurm_job_status in slurmJobStatus["succeeded"]:
                         self.job_manager_cls.get_outputs()
                         job_db[job_id]["status"] = "succeeded"
                         job_db[job_id]["deleted"] = True
                         job_db[job_id]["log"] = self.job_manager_cls.get_logs(
-                            backend_job_id=job_dict["backend_job_id"],
+                            backend_job_id=slurm_job_id,
                             workspace=job_db[job_id]["obj"].workflow_workspace,
                         )
                         store_logs(logs=job_db[job_id]["log"], job_id=job_id)
@@ -470,7 +463,7 @@ def watch_jobs(self, job_db, app=None):
                         job_db[job_id]["status"] = "failed"
                         job_db[job_id]["deleted"] = True
                         job_db[job_id]["log"] = self.job_manager_cls.get_logs(
-                            backend_job_id=job_dict["backend_job_id"],
+                            backend_job_id=slurm_job_id,
                             workspace=job_db[job_id]["obj"].workflow_workspace,
                         )
                         store_logs(logs=job_db[job_id]["log"], job_id=job_id)
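
Note on the new query: the monitoring loop above extracts `JobState` by piping
`scontrol show job <id> -o` through `tr`, `grep` and `cut` on the Slurm submit
node. For illustration only, the sketch below shows how the same one-line
`KEY=VALUE` output of `scontrol show job -o` could instead be parsed on the
Python side; `parse_scontrol_job_state` and the sample record are hypothetical
and not part of reana-job-controller or of this patch.

# Illustration only, assuming `scontrol show job <id> -o` prints the whole
# job record on one line as space-separated KEY=VALUE pairs, e.g.
#   "JobId=12345 JobName=reana JobState=RUNNING RunTime=00:01:02 ...".
# `parse_scontrol_job_state` is a hypothetical helper, not part of this patch.


def parse_scontrol_job_state(scontrol_output):
    """Return the JobState value from one-line `scontrol show job -o` output."""
    for token in scontrol_output.split():
        if token.startswith("JobState="):
            return token.split("=", 1)[1]
    raise ValueError("JobState not found in scontrol output")


if __name__ == "__main__":
    sample = "JobId=12345 JobName=reana JobState=RUNNING RunTime=00:01:02"
    print(parse_scontrol_job_state(sample))  # prints: RUNNING

Parsing in Python would avoid relying on `tr`, `grep` and `cut` in the remote
shell, at the cost of transferring the full job record over the SSH connection.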