executor, add job status polling #34

Merged (1 commit) on Jan 11, 2022

1 change: 1 addition & 0 deletions AUTHORS.rst
@@ -7,3 +7,4 @@ The list of contributors in alphabetical order:
- `Marco Vidal <https://orcid.org/0000-0002-9363-4971>`_
- `Sinclert Perez <https://www.linkedin.com/in/sinclert>`_
- `Tibor Simko <https://orcid.org/0000-0001-7202-5803>`_
- `Vladyslav Moisieienkov <https://orcid.org/0000-0001-9717-0775>`_
5 changes: 5 additions & 0 deletions CHANGES.rst
@@ -1,6 +1,11 @@
Changes
=======

Version 0.8.1 (UNRELEASED)
---------------------------

- Adds polling of the job-controller to determine job statuses instead of checking files.

Version 0.8.0 (2021-11-22)
---------------------------

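To illustrate the changelog entry above: instead of waiting for `.jobfinished`/`.jobfailed` marker files, the executor now asks the REANA job-controller for the job status. A minimal sketch of one polling step, assuming an already-configured job-controller client (`rjc_api_client`, as used in the executor diff below); the helper name `poll_once` is illustrative only:

def poll_once(rjc_api_client, job_id):
    """One polling step: ask the job-controller for the job's status (sketch)."""
    response = rjc_api_client.check_status(job_id)
    if response.status == "finished":
        return "success"        # the executor would call the job's callback here
    elif response.status == "failed":
        return "failure"        # the executor would call the error callback here
    return "still-running"      # the executor keeps polling after a short sleep
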
37 changes: 37 additions & 0 deletions reana_workflow_engine_snakemake/config.py
@@ -9,6 +9,7 @@
"""REANA Workflow Engine Snakemake configuration."""

import os
from enum import Enum

MOUNT_CVMFS = os.getenv("REANA_MOUNT_CVMFS", "false")

@@ -20,3 +21,39 @@

SNAKEMAKE_MAX_PARALLEL_JOBS = 100
"""Snakemake maximum number of jobs that can run in parallel."""

POLL_JOBS_STATUS_SLEEP_IN_SECONDS = 10
"""Time to sleep between polling for job status."""


# defined in reana-db component, in reana_db/models.py file as JobStatus
class JobStatus(Enum):
"""Enumeration of job statuses.

Example:
JobStatus.started.name == "started" # True
"""

# FIXME: this state is not defined in reana-db but returned by r-job-controller
started = 6

created = 0
running = 1
finished = 2
failed = 3
stopped = 4
queued = 5


# defined in reana-db component, in reana_db/models.py file as RunStatus
class RunStatus(Enum):
"""Enumeration of possible run statuses of a workflow."""

created = 0
running = 1
finished = 2
failed = 3
deleted = 4
stopped = 5
queued = 6
pending = 7
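
A short usage sketch (illustrative, not part of the diff): job statuses reported by r-job-controller are compared by enum member name, while workflow statuses are published by their integer value, which is why the executor below uses both `JobStatus.<state>.name` and `RunStatus.<state>.value`:

from reana_workflow_engine_snakemake.config import JobStatus, RunStatus

controller_status = "finished"                        # status string returned by r-job-controller
assert controller_status == JobStatus.finished.name   # compared by name in the executor
assert RunStatus.running.value == 1                   # integer value published with workflow status messages
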
151 changes: 85 additions & 66 deletions reana_workflow_engine_snakemake/executor.py
@@ -1,19 +1,20 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2021 CERN.
# Copyright (C) 2021, 2022 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""REANA-Workflow-Engine-Snakemake executor."""

import os
import subprocess
import logging
import time
from collections import namedtuple
from typing import Callable

from bravado.exception import HTTPNotFound
from reana_commons.config import REANA_DEFAULT_SNAKEMAKE_ENV_IMAGE
from reana_commons.utils import build_progress_message
from snakemake import snakemake
@@ -26,20 +27,19 @@
LOGGING_MODULE,
MOUNT_CVMFS,
SNAKEMAKE_MAX_PARALLEL_JOBS,
POLL_JOBS_STATUS_SLEEP_IN_SECONDS,
JobStatus,
RunStatus,
)
from reana_workflow_engine_snakemake.utils import (
publish_job_submission,
publish_workflow_start,
)


log = logging.getLogger(LOGGING_MODULE)


REANAClusterJob = namedtuple(
"REANAClusterJob",
"job jobid callback error_callback jobscript jobfinished jobfailed",
)
REANAClusterJob = namedtuple("REANAClusterJob", "job callback error_callback")


class REANAClusterExecutor(GenericClusterExecutor):
@@ -54,16 +54,6 @@ def run(
):
"""Override GenericClusterExecutor run method."""
super()._run(job)
jobid = job.jobid

# Files needed for Snakemake (`GenericClusterExecutor._wait_for_jobs`)
# to check if a job finished successfully.
jobscript = self.get_jobscript(job)
jobfinished = os.path.join(self.tmpdir, "{}.jobfinished".format(jobid))
jobfailed = os.path.join(self.tmpdir, "{}.jobfailed".format(jobid))
self.write_jobscript(
job, jobscript, jobfinished=jobfinished, jobfailed=jobfailed
)

workflow_workspace = os.getenv("workflow_workspace", "default")
workflow_uuid = os.getenv("workflow_uuid", "default")
@@ -79,7 +69,7 @@ def run(
job_request_body = {
"workflow_uuid": workflow_uuid,
"image": container_image,
"cmd": f"cd {workflow_workspace} && {job.shellcmd} && touch {jobfinished} || (touch {jobfailed}; exit 1)",
"cmd": f"cd {workflow_workspace} && {job.shellcmd}",
"prettified_cmd": job.shellcmd,
"workflow_workspace": workflow_workspace,
"job_name": job.name,
@@ -112,40 +102,12 @@ def run(
log.error(f"Error submitting job {job.name}: {excep}")
error_callback(job)
return
# We don't need to call `submit_callback(job)` manually since
# it would immediately check if the output files are present
# and fail otherwise (3 sec timeout).

if job.is_norun:
job_id = "all"
# Manually create the jobfinished for the root rule (`all`)
# to mark it as successful.
try:
subprocess.check_output(
f"touch {jobfinished}", shell=True,
)
except subprocess.CalledProcessError as ex:
log.error(
"Error creating `all` jobfinished file (exit code {}):\n{}".format(
ex.returncode, ex.output.decode()
)
)
error_callback(job)
return

with self.lock:
self.active_jobs.append(
REANAClusterJob(
job,
job_id,
callback,
error_callback,
jobscript,
jobfinished,
jobfailed,
)
)
self.active_jobs.append(REANAClusterJob(job, callback, error_callback))

def _get_container_image(self, job: Job) -> str:
@staticmethod
def _get_container_image(job: Job) -> str:
if job.container_img_url:
container_image = job.container_img_url.replace("docker://", "")
log.info(f"Environment: {container_image}")
@@ -154,55 +116,112 @@ def _get_container_image(self, job: Job) -> str:
log.info(f"No environment specified, falling back to: {container_image}")
return container_image

def _handle_job_status(self, job, status):
def _handle_job_status(
self, job: Job, job_status: JobStatus, workflow_status: RunStatus
) -> None:
workflow_uuid = os.getenv("workflow_uuid", "default")
job_id = job.reana_job_id
log.info(f"{status} job: {job_id}")
log.info(f"{job.name} job is {job_status.name}. job_id: {job_id}")
message = {
"progress": build_progress_message(
**{status: {"total": 1, "job_ids": [job_id]}}
**{job_status.name: {"total": 1, "job_ids": [job_id]}}
)
}
status_running = 1
status_failed = 3
status_mapping = {"finished": status_running, "failed": status_failed}
self.publisher.publish_workflow_status(
workflow_uuid, status_mapping[status], message=message
workflow_uuid, workflow_status.value, message=message
)

def handle_job_success(self, job):
def handle_job_success(self, job: Job) -> None:
"""Override job success method to publish job status."""
# override handle_touch = True, to enable `touch()` in Snakefiles
# `touch()` is responsible for checking output files existence
Review thread on the comment above:

Member: Not really, touch() is just the Unix touch that creates empty files, so it can denote that a certain task was completed.

Author (@VMois, Jan 11, 2022): If I disable touch() (handle_touch=True), then the CMS-h4l demo fails, so I think it is checking output files.

Member: I meant the docstring; it doesn't seem 100% accurate to me. No comments about the functionality; we need to keep handle_touch=True as you've described.

Author (@VMois): So, what do you propose, remove the comment or rewrite it to be more accurate?
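
For context on the thread above, a hypothetical Snakefile rule (illustrative only, not from this repository) using Snakemake's touch() output flag: on success, Snakemake creates the empty marker file declared as output, and the presence of that file is what marks the task as completed:

rule mark_step_done:
    output:
        touch("results/step.done")  # empty marker file created when the rule succeeds
    shell:
        "./run_step.sh"             # hypothetical command producing the real results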

super(ClusterExecutor, self).handle_job_success(
job, upload_remote=False, handle_log=False, handle_touch=True
)

self._handle_job_status(job, "finished")
self._handle_job_status(
job, job_status=JobStatus.finished, workflow_status=RunStatus.running
)

def handle_job_error(self, job):
def handle_job_error(self, job: Job) -> None:
"""Override job error method to publish job status."""
super().handle_job_error(job)

self._handle_job_status(job, "failed")
self._handle_job_status(
job, job_status=JobStatus.failed, workflow_status=RunStatus.failed
)

def _get_job_status_from_controller(self, job_id: str) -> str:
"""Get job status from controller.

If error occurs, return `failed` status.
"""
try:
response = self.rjc_api_client.check_status(job_id)
except HTTPNotFound:
log.error(
f"Job {job_id} was not found in job-controller. Return job failed status."
)
return JobStatus.failed.name
except Exception as exception:
log.error(
f"Error getting status of job with id {job_id}. Return job failed status. Details: {exception}"
)
return JobStatus.failed.name

try:
return response.status
except AttributeError:
log.error(
f"job-controller response for job {job_id} does not contain 'status' field. Return job failed status."
f"Response: {response}"
)
return JobStatus.failed.name

def _wait_for_jobs(self):
"""Override _wait_for_jobs method to poll job-controller for job statuses.

Original GenericClusterExecutor._wait_for_jobs method checks success/failure via .jobfinished or .jobfailed files.
"""
while True:
with self.lock:
if not self.wait:
return
active_jobs = self.active_jobs
self.active_jobs = []
still_running = []

for active_job in active_jobs:
job_id = active_job.job.reana_job_id

status = self._get_job_status_from_controller(job_id)

if status == JobStatus.finished.name or active_job.job.is_norun:
active_job.callback(active_job.job)
elif status == JobStatus.failed.name:
active_job.error_callback(active_job.job)
else:
still_running.append(active_job)

with self.lock:
self.active_jobs = still_running

time.sleep(POLL_JOBS_STATUS_SLEEP_IN_SECONDS)


def submit_job(rjc_api_client, publisher, job_request_body):
"""Submit job to REANA Job Controller."""
response = rjc_api_client.submit(**job_request_body)
job_id = str(response["job_id"])

log.info("submitted job:{0}".format(job_id))
log.info(f"submitted job: {job_id}")
publish_job_submission(
workflow_uuid=job_request_body["workflow_uuid"],
publisher=publisher,
reana_job_id=job_id,
)
return job_id

# FIXME: Call `job_status = poll_job_status(rjc_api_client, job_id)` instead of
# checking job success/failure via `jobfinished`/`jobfailed` files in .snakemake?
# In that case we would probably need to implement our own `_wait_for_jobs` method.


def run_jobs(
rjc_api_client,