executor: poll job-controller for job statuses
- poll job-controller for job statuses instead of checking .jobfinished/.jobfailed files.

closes #33
Vladyslav Moisieienkov committed Jan 10, 2022
1 parent 7269f5a commit 552b1e4
Showing 4 changed files with 128 additions and 66 deletions.
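The essence of the change: a REANA job no longer signals completion by touching .jobfinished/.jobfailed marker files that Snakemake's GenericClusterExecutor watches on disk; instead, the executor periodically asks the REANA job-controller for each job's status and fires the Snakemake callbacks itself. A minimal, self-contained sketch of that polling pattern (illustrative only: FakeJobController and its check_status return shape stand in for the real job-controller client):

import time
from types import SimpleNamespace


class FakeJobController:
    """Stand-in for the REANA job-controller client (illustration only)."""

    def __init__(self, statuses):
        self._statuses = iter(statuses)

    def check_status(self, job_id):
        # The real client returns an object exposing a `status` attribute.
        return SimpleNamespace(status=next(self._statuses))


def poll_until_done(client, job_id, sleep_seconds=10):
    """Poll the controller until the job reaches a terminal state."""
    while True:
        status = client.check_status(job_id).status
        if status in ("finished", "failed"):
            return status
        time.sleep(sleep_seconds)


client = FakeJobController(["queued", "running", "finished"])
print(poll_until_done(client, "job-1", sleep_seconds=0))  # -> "finished"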
1 change: 1 addition & 0 deletions AUTHORS.rst
@@ -7,3 +7,4 @@ The list of contributors in alphabetical order:
 - `Marco Vidal <https://orcid.org/0000-0002-9363-4971>`_
 - `Sinclert Perez <https://www.linkedin.com/in/sinclert>`_
 - `Tibor Simko <https://orcid.org/0000-0001-7202-5803>`_
+- `Vladyslav Moisieienkov <https://orcid.org/0000-0001-9717-0775>`_
5 changes: 5 additions & 0 deletions CHANGES.rst
@@ -1,6 +1,11 @@
 Changes
 =======

+Version 0.8.1 (UNRELEASED)
+---------------------------
+
+- Adds polling of job-controller to determine job statuses instead of checking files.
+
 Version 0.8.0 (2021-11-22)
 ---------------------------

37 changes: 37 additions & 0 deletions reana_workflow_engine_snakemake/config.py
@@ -9,6 +9,7 @@
 """REANA Workflow Engine Snakemake configuration."""

 import os
+from enum import Enum

 MOUNT_CVMFS = os.getenv("REANA_MOUNT_CVMFS", "false")

@@ -20,3 +21,39 @@

 SNAKEMAKE_MAX_PARALLEL_JOBS = 100
 """Snakemake maximum number of jobs that can run in parallel."""
+
+POLL_JOBS_STATUS_SLEEP_IN_SECONDS = 10
+"""Time to sleep between polling for job status."""
+
+
+# defined in reana-db component, in reana_db/models.py file as JobStatus
+class JobStatus(Enum):
+    """Enumeration of job statuses.
+    Example:
+        JobStatus.started.name == "started"  # True
+    """
+
+    # FIXME: this state is not defined in reana-db but returned by r-job-controller
+    started = 6
+
+    created = 0
+    running = 1
+    finished = 2
+    failed = 3
+    stopped = 4
+    queued = 5
+
+
+# defined in reana-db component, in reana_db/models.py file as RunStatus
+class RunStatus(Enum):
+    """Enumeration of possible run statuses of a workflow."""
+
+    created = 0
+    running = 1
+    finished = 2
+    failed = 3
+    deleted = 4
+    stopped = 5
+    queued = 6
+    pending = 7
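The two enums serve the two halves of the status round-trip: statuses arriving from the job-controller are plain strings and get compared against JobStatus names, while workflow-level updates are published using RunStatus values. A small runnable sketch of that convention, using trimmed copies of the enums above (the finished-to-running and failed-to-failed pairing mirrors handle_job_success and handle_job_error in executor.py below):

from enum import Enum


class JobStatus(Enum):
    # subset of the JobStatus enum above, for a runnable example
    running = 1
    finished = 2
    failed = 3


class RunStatus(Enum):
    # subset of the RunStatus enum above
    running = 1
    failed = 3


# Controller responses are strings, so comparisons go through `.name`:
assert "finished" == JobStatus.finished.name

# The workflow status publisher receives the integer `.value`:
assert RunStatus.running.value == 1

# Terminal job state -> workflow status published alongside it:
JOB_TO_WORKFLOW = {
    JobStatus.finished: RunStatus.running,  # a job finishing keeps the workflow running
    JobStatus.failed: RunStatus.failed,  # a job failing fails the workflow
}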
151 changes: 85 additions & 66 deletions reana_workflow_engine_snakemake/executor.py
@@ -1,19 +1,20 @@
 # -*- coding: utf-8 -*-
 #
 # This file is part of REANA.
-# Copyright (C) 2021 CERN.
+# Copyright (C) 2021, 2022 CERN.
 #
 # REANA is free software; you can redistribute it and/or modify it
 # under the terms of the MIT License; see LICENSE file for more details.

 """REANA-Workflow-Engine-Snakemake executor."""

 import os
-import subprocess
 import logging
+import time
 from collections import namedtuple
 from typing import Callable

+from bravado.exception import HTTPNotFound
 from reana_commons.config import REANA_DEFAULT_SNAKEMAKE_ENV_IMAGE
 from reana_commons.utils import build_progress_message
 from snakemake import snakemake
@@ -26,20 +27,19 @@
     LOGGING_MODULE,
     MOUNT_CVMFS,
     SNAKEMAKE_MAX_PARALLEL_JOBS,
+    POLL_JOBS_STATUS_SLEEP_IN_SECONDS,
+    JobStatus,
+    RunStatus,
 )
 from reana_workflow_engine_snakemake.utils import (
     publish_job_submission,
     publish_workflow_start,
 )


 log = logging.getLogger(LOGGING_MODULE)


-REANAClusterJob = namedtuple(
-    "REANAClusterJob",
-    "job jobid callback error_callback jobscript jobfinished jobfailed",
-)
+REANAClusterJob = namedtuple("REANAClusterJob", "job callback error_callback")


 class REANAClusterExecutor(GenericClusterExecutor):
@@ -54,16 +54,6 @@ def run(
     ):
         """Override GenericClusterExecutor run method."""
         super()._run(job)
-        jobid = job.jobid

-        # Files needed for Snakemake (`GenericClusterExecutor._wait_for_jobs`)
-        # to check if a job finished successfully.
-        jobscript = self.get_jobscript(job)
-        jobfinished = os.path.join(self.tmpdir, "{}.jobfinished".format(jobid))
-        jobfailed = os.path.join(self.tmpdir, "{}.jobfailed".format(jobid))
-        self.write_jobscript(
-            job, jobscript, jobfinished=jobfinished, jobfailed=jobfailed
-        )
-
         workflow_workspace = os.getenv("workflow_workspace", "default")
         workflow_uuid = os.getenv("workflow_uuid", "default")
@@ -79,7 +69,7 @@ def run(
         job_request_body = {
             "workflow_uuid": workflow_uuid,
             "image": container_image,
-            "cmd": f"cd {workflow_workspace} && {job.shellcmd} && touch {jobfinished} || (touch {jobfailed}; exit 1)",
+            "cmd": f"cd {workflow_workspace} && {job.shellcmd}",
             "prettified_cmd": job.shellcmd,
             "workflow_workspace": workflow_workspace,
             "job_name": job.name,
@@ -112,40 +102,12 @@ def run(
             log.error(f"Error submitting job {job.name}: {excep}")
             error_callback(job)
             return
-        # We don't need to call `submit_callback(job)` manually since
-        # it would immediately check if the output files are present
-        # and fail otherwise (3 sec timeout).

-        if job.is_norun:
-            job_id = "all"
-            # Manually create the jobfinished for the root rule (`all`)
-            # to mark it as successful.
-            try:
-                subprocess.check_output(
-                    f"touch {jobfinished}", shell=True,
-                )
-            except subprocess.CalledProcessError as ex:
-                log.error(
-                    "Error creating `all` jobfinished file (exit code {}):\n{}".format(
-                        ex.returncode, ex.output.decode()
-                    )
-                )
-                error_callback(job)
-                return
-
         with self.lock:
-            self.active_jobs.append(
-                REANAClusterJob(
-                    job,
-                    job_id,
-                    callback,
-                    error_callback,
-                    jobscript,
-                    jobfinished,
-                    jobfailed,
-                )
-            )
+            self.active_jobs.append(REANAClusterJob(job, callback, error_callback))

-    def _get_container_image(self, job: Job) -> str:
+    @staticmethod
+    def _get_container_image(job: Job) -> str:
         if job.container_img_url:
             container_image = job.container_img_url.replace("docker://", "")
             log.info(f"Environment: {container_image}")
@@ -154,55 +116,112 @@ def _get_container_image(self, job: Job) -> str:
             log.info(f"No environment specified, falling back to: {container_image}")
         return container_image

-    def _handle_job_status(self, job, status):
+    def _handle_job_status(
+        self, job: Job, job_status: JobStatus, workflow_status: RunStatus
+    ) -> None:
         workflow_uuid = os.getenv("workflow_uuid", "default")
         job_id = job.reana_job_id
-        log.info(f"{status} job: {job_id}")
+        log.info(f"{job_status} job: {job_id}")
         message = {
             "progress": build_progress_message(
-                **{status: {"total": 1, "job_ids": [job_id]}}
+                **{job_status.name: {"total": 1, "job_ids": [job_id]}}
             )
         }
-        status_running = 1
-        status_failed = 3
-        status_mapping = {"finished": status_running, "failed": status_failed}
         self.publisher.publish_workflow_status(
-            workflow_uuid, status_mapping[status], message=message
+            workflow_uuid, workflow_status.value, message=message
         )

-    def handle_job_success(self, job):
+    def handle_job_success(self, job: Job) -> None:
         """Override job success method to publish job status."""
         # override handle_touch = True, to enable `touch()` in Snakefiles
         # `touch()` is responsible for checking output files existence
         super(ClusterExecutor, self).handle_job_success(
             job, upload_remote=False, handle_log=False, handle_touch=True
         )

-        self._handle_job_status(job, "finished")
+        self._handle_job_status(
+            job, job_status=JobStatus.finished, workflow_status=RunStatus.running
+        )

-    def handle_job_error(self, job):
+    def handle_job_error(self, job: Job) -> None:
         """Override job error method to publish job status."""
         super().handle_job_error(job)

-        self._handle_job_status(job, "failed")
+        self._handle_job_status(
+            job, job_status=JobStatus.failed, workflow_status=RunStatus.failed
+        )
+
+    def _get_job_status_from_controller(self, job_id: str) -> str:
+        """Get job status from controller.
+        If an error occurs, return the `failed` status.
+        """
+        try:
+            response = self.rjc_api_client.check_status(job_id)
+        except HTTPNotFound:
+            log.error(
+                f"Job {job_id} was not found in job-controller. Returning failed status."
+            )
+            return JobStatus.failed.name
+        except Exception as exception:
+            log.error(
+                f"Error getting status of job with id {job_id}. Returning failed status. Details: {exception}"
+            )
+            return JobStatus.failed.name
+
+        try:
+            return response.status
+        except AttributeError:
+            log.error(
+                f"job-controller response for job {job_id} does not contain a 'status' field. Returning failed status. "
+                f"Response: {response}"
+            )
+            return JobStatus.failed.name
+
+    def _wait_for_jobs(self):
+        """Override _wait_for_jobs method to poll job-controller for job statuses.
+        The original GenericClusterExecutor._wait_for_jobs checks success/failure via .jobfinished/.jobfailed files.
+        """
+        while True:
+            with self.lock:
+                if not self.wait:
+                    return
+                active_jobs = self.active_jobs
+                self.active_jobs = []
+                still_running = []
+
+            for active_job in active_jobs:
+                job_id = active_job.job.reana_job_id
+
+                status = self._get_job_status_from_controller(job_id)
+
+                if status == JobStatus.finished.name or active_job.job.is_norun:
+                    active_job.callback(active_job.job)
+                elif status == JobStatus.failed.name:
+                    active_job.error_callback(active_job.job)
+                else:
+                    still_running.append(active_job)
+
+            with self.lock:
+                self.active_jobs = still_running
+
+            time.sleep(POLL_JOBS_STATUS_SLEEP_IN_SECONDS)


 def submit_job(rjc_api_client, publisher, job_request_body):
     """Submit job to REANA Job Controller."""
     response = rjc_api_client.submit(**job_request_body)
     job_id = str(response["job_id"])

-    log.info("submitted job:{0}".format(job_id))
+    log.info(f"submitted job: {job_id}")
     publish_job_submission(
         workflow_uuid=job_request_body["workflow_uuid"],
         publisher=publisher,
         reana_job_id=job_id,
     )
     return job_id

-    # FIXME: Call `job_status = poll_job_status(rjc_api_client, job_id)` instead of
-    # checking job success/failure via `jobfinished`/`jobfailed` files in .snakemake?
-    # In that case we would probably need to implement our own `_wait_for_jobs` method.


 def run_jobs(
     rjc_api_client,
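One property of the new _get_job_status_from_controller is worth spelling out: it never raises. A job unknown to the controller, an unexpected client error, and a response without a status field all degrade to the failed status, so _wait_for_jobs can treat the returned string uniformly. A hypothetical test sketching that contract (the stub setup is illustrative and not part of this commit; object.__new__ bypasses the executor's heavyweight initialization):

from unittest import mock

from bravado.exception import HTTPNotFound

from reana_workflow_engine_snakemake.executor import REANAClusterExecutor


def test_controller_errors_degrade_to_failed():
    executor = object.__new__(REANAClusterExecutor)  # skip __init__ on purpose
    executor.rjc_api_client = mock.Mock()

    # Job unknown to the controller -> "failed".
    executor.rjc_api_client.check_status.side_effect = HTTPNotFound(mock.Mock())
    assert executor._get_job_status_from_controller("job-1") == "failed"

    # Response lacking a `status` attribute -> "failed".
    executor.rjc_api_client.check_status.side_effect = None
    executor.rjc_api_client.check_status.return_value = object()
    assert executor._get_job_status_from_controller("job-1") == "failed"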