diff --git a/reana_job_controller/job_monitor.py b/reana_job_controller/job_monitor.py index 0beede1e..23d345bd 100644 --- a/reana_job_controller/job_monitor.py +++ b/reana_job_controller/job_monitor.py @@ -210,10 +210,12 @@ def get_job_logs(self, job_pod): logging.info("Grabbing pod {} logs ...".format(job_pod.metadata.name)) for container in container_statuses: if container.state.terminated: - container_log = current_k8s_corev1_api_client.read_namespaced_pod_log( - namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, - name=job_pod.metadata.name, - container=container.name, + container_log = ( + current_k8s_corev1_api_client.read_namespaced_pod_log( + namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, + name=job_pod.metadata.name, + container=container.name, + ) ) pod_logs += "{}: :\n {}\n".format(container.name, container_log) elif container.state.waiting: @@ -418,14 +420,6 @@ def __init__(self, app=None): self.job_manager_cls = COMPUTE_BACKENDS["slurmcern"]() super(__class__, self).__init__(thread_name="slurm_job_monitor") - def format_slurm_job_query(self, backend_job_ids): - """Format Slurm job query.""" - cmd = ( - "sacct --jobs {} --noheader --allocations --parsable " - "--format State,JobID".format(",".join(backend_job_ids)) - ) - return cmd - def watch_jobs(self, job_db, app=None): """Use SSH connection to slurm submitnode to monitor jobs. @@ -448,20 +442,19 @@ def watch_jobs(self, job_db, app=None): ): slurm_jobs[job_dict["backend_job_id"]] = id if not slurm_jobs.keys(): - logging.error("No slurm jobs") continue - slurm_query_cmd = self.format_slurm_job_query(slurm_jobs.keys()) - stdout = slurm_connection.exec_command(slurm_query_cmd) - for item in stdout.rstrip().split("\n"): - slurm_job_status = item.split("|")[0] - slurm_job_id = item.split("|")[1] + + for slurm_job_id, job_dict in slurm_jobs.items(): + slurm_job_status = slurm_connection.exec_command( + f"scontrol show job {slurm_job_id} -o | tr ' ' '\n' | grep JobState | cut -f2 -d '='" + ).rstrip() job_id = slurm_jobs[slurm_job_id] if slurm_job_status in slurmJobStatus["succeeded"]: self.job_manager_cls.get_outputs() job_db[job_id]["status"] = "succeeded" job_db[job_id]["deleted"] = True job_db[job_id]["log"] = self.job_manager_cls.get_logs( - backend_job_id=job_dict["backend_job_id"], + backend_job_id=slurm_job_id, workspace=job_db[job_id]["obj"].workflow_workspace, ) store_logs(logs=job_db[job_id]["log"], job_id=job_id) @@ -470,7 +463,7 @@ def watch_jobs(self, job_db, app=None): job_db[job_id]["status"] = "failed" job_db[job_id]["deleted"] = True job_db[job_id]["log"] = self.job_manager_cls.get_logs( - backend_job_id=job_dict["backend_job_id"], + backend_job_id=slurm_job_id, workspace=job_db[job_id]["obj"].workflow_workspace, ) store_logs(logs=job_db[job_id]["log"], job_id=job_id)