From 4b77b3371830ad2573de58b2449d96526c8d0de4 Mon Sep 17 00:00:00 2001 From: Rokas Maciulaitis Date: Mon, 27 Jan 2020 14:53:21 +0100 Subject: [PATCH 1/2] global: changes get_workspace method with workspace_path field * It allows reuse of workspace. * Addresses reanahub/reana-workflow-controller/issues/289 --- reana_workflow_controller/consumer.py | 2 ++ reana_workflow_controller/rest/utils.py | 24 +++++++++---------- reana_workflow_controller/rest/workflows.py | 10 ++++---- .../rest/workflows_workspace.py | 10 ++++---- .../workflow_run_manager.py | 8 +++---- tests/test_utils.py | 12 +++++----- tests/test_views.py | 24 +++++++++---------- 7 files changed, 46 insertions(+), 44 deletions(-) diff --git a/reana_workflow_controller/consumer.py b/reana_workflow_controller/consumer.py index 743f6f06..60082280 100644 --- a/reana_workflow_controller/consumer.py +++ b/reana_workflow_controller/consumer.py @@ -12,6 +12,7 @@ import json import uuid +from datetime import datetime import requests from kubernetes.client.rest import ApiException @@ -82,6 +83,7 @@ def _update_workflow_status(workflow_uuid, status, logs): alive_statuses = \ [WorkflowStatus.created, WorkflowStatus.running, WorkflowStatus.queued] if status not in alive_statuses: + workflow.run_finished_at = datetime.now() _delete_workflow_engine_pod(workflow_uuid) diff --git a/reana_workflow_controller/rest/utils.py b/reana_workflow_controller/rest/utils.py index 521584ef..39c484e8 100644 --- a/reana_workflow_controller/rest/utils.py +++ b/reana_workflow_controller/rest/utils.py @@ -16,18 +16,18 @@ from datetime import datetime from pathlib import Path +from kubernetes.client.rest import ApiException +from reana_commons.config import REANA_WORKFLOW_UMASK +from reana_commons.k8s.secrets import REANAUserSecretsStore +from reana_commons.utils import get_workflow_status_change_verb +from sqlalchemy.exc import SQLAlchemyError + import fs from flask import current_app as app from flask import jsonify from git import Repo -from reana_commons.config import REANA_WORKFLOW_UMASK -from reana_commons.k8s.secrets import REANAUserSecretsStore -from reana_commons.utils import get_workflow_status_change_verb from reana_db.database import Session from reana_db.models import Job, JobCache, Workflow, WorkflowStatus -from sqlalchemy.exc import SQLAlchemyError -from kubernetes.client.rest import ApiException - from reana_workflow_controller.config import (REANA_GITLAB_HOST, WORKFLOW_TIME_FORMAT) from reana_workflow_controller.errors import (REANAExternalCallError, @@ -163,7 +163,7 @@ def remove_workflow_jobs_from_cache(workflow): jobs = Session.query(Job).filter_by(workflow_uuid=workflow.id_).all() for job in jobs: job_path = remove_upper_level_references( - os.path.join(workflow.get_workspace(), + os.path.join(workflow.workspace_path, '..', 'archive', str(job.id_))) Session.query(JobCache).filter_by(job_id=job.id_).delete() @@ -188,11 +188,11 @@ def delete_workflow(workflow, Workflow.status != WorkflowStatus.running).all() for workflow in to_be_deleted: if hard_delete: - remove_workflow_workspace(workflow.get_workspace()) + remove_workflow_workspace(workflow.workspace_path) _delete_workflow_row_from_db(workflow) else: if workspace: - remove_workflow_workspace(workflow.get_workspace()) + remove_workflow_workspace(workflow.workspace_path) _mark_workflow_as_deleted_in_db(workflow) remove_workflow_jobs_from_cache(workflow) @@ -303,7 +303,7 @@ def mv_files(source, target, workflow): """Move files within workspace.""" absolute_workspace_path = os.path.join( app.config['SHARED_VOLUME_PATH'], - workflow.get_workspace()) + workflow.workspace_path) absolute_source_path = os.path.join( app.config['SHARED_VOLUME_PATH'], absolute_workspace_path, @@ -422,8 +422,8 @@ def get_workspace_diff(workflow_a, workflow_b, brief=False, context_lines=5): :rtype: Dictionary with file paths and their sizes unique to each workspace. """ - workspace_a = workflow_a.get_workspace() - workspace_b = workflow_b.get_workspace() + workspace_a = workflow_a.workspace_path + workspace_b = workflow_b.workspace_path reana_fs = fs.open_fs(app.config['SHARED_VOLUME_PATH']) if reana_fs.exists(workspace_a) and reana_fs.exists(workspace_b): diff_command = ['diff', diff --git a/reana_workflow_controller/rest/workflows.py b/reana_workflow_controller/rest/workflows.py index d128784d..ad22c7ea 100644 --- a/reana_workflow_controller/rest/workflows.py +++ b/reana_workflow_controller/rest/workflows.py @@ -26,7 +26,7 @@ from reana_workflow_controller.rest.utils import ( get_specification_diff, get_workflow_name) from reana_workflow_controller.rest.utils import \ - create_workflow_workspace, get_workspace_diff + create_workflow_workspace, get_workspace_diff START = 'start' @@ -181,9 +181,9 @@ def get_workflows(): # noqa workflow.interactive_session if verbose: reana_fs = fs.open_fs(SHARED_VOLUME_PATH) - if reana_fs.exists(workflow.get_workspace()): + if reana_fs.exists(workflow.workspace_path): absolute_workspace_path = reana_fs.getospath( - workflow.get_workspace()) + workflow.workspace_path) disk_usage_info = get_workspace_disk_usage( absolute_workspace_path, block_size=block_size) if disk_usage_info: @@ -323,13 +323,13 @@ def create_workflow(): # noqa Session.add(workflow) Session.object_session(workflow).commit() if git_ref: - create_workflow_workspace(workflow.get_workspace(), + create_workflow_workspace(workflow.workspace_path, user_id=user.id_, git_url=git_data['git_url'], git_branch=git_data['git_branch'], git_ref=git_ref) else: - create_workflow_workspace(workflow.get_workspace()) + create_workflow_workspace(workflow.workspace_path) return jsonify({'message': 'Workflow workspace created', 'workflow_id': workflow.id_, 'workflow_name': get_workflow_name(workflow)}), 201 diff --git a/reana_workflow_controller/rest/workflows_workspace.py b/reana_workflow_controller/rest/workflows_workspace.py index 6ec92b3e..d8af10e8 100644 --- a/reana_workflow_controller/rest/workflows_workspace.py +++ b/reana_workflow_controller/rest/workflows_workspace.py @@ -123,8 +123,8 @@ def upload_file(workflow_id_or_name): elif '..' in full_file_name.split("/"): raise REANAUploadPathError('Path cannot contain "..".') absolute_workspace_path = os.path.join( - current_app.config['SHARED_VOLUME_PATH'], - workflow.get_workspace()) + current_app.config['SHARED_VOLUME_PATH'], + workflow.workspace_path) if len(full_file_name.split("/")) > 1: dirs = full_file_name.split("/")[:-1] absolute_workspace_path = os.path.join(absolute_workspace_path, @@ -219,7 +219,7 @@ def download_file(workflow_id_or_name, file_name): # noqa absolute_workflow_workspace_path = os.path.join( current_app.config['SHARED_VOLUME_PATH'], - workflow.get_workspace()) + workflow.workspace_path) return send_from_directory(absolute_workflow_workspace_path, file_name, mimetype='multipart/form-data', @@ -304,7 +304,7 @@ def delete_file(workflow_id_or_name, file_name): # noqa workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, user_uuid) abs_path_to_workspace = os.path.join( - current_app.config['SHARED_VOLUME_PATH'], workflow.get_workspace()) + current_app.config['SHARED_VOLUME_PATH'], workflow.workspace_path) deleted = remove_files_recursive_wildcard( abs_path_to_workspace, file_name) @@ -404,7 +404,7 @@ def get_files(workflow_id_or_name): # noqa user_uuid) file_list = list_directory_files(os.path.join( current_app.config['SHARED_VOLUME_PATH'], - workflow.get_workspace())) + workflow.workspace_path)) return jsonify(file_list), 200 except ValueError: diff --git a/reana_workflow_controller/workflow_run_manager.py b/reana_workflow_controller/workflow_run_manager.py index 148a10bb..db7d6d06 100644 --- a/reana_workflow_controller/workflow_run_manager.py +++ b/reana_workflow_controller/workflow_run_manager.py @@ -179,7 +179,7 @@ def _workflow_engine_command(self, overwrite_input_parameters=None, return (WorkflowRunManager.engine_mapping[self.workflow.type_] ['command'].format( id=self.workflow.id_, - workspace=self.workflow.get_workspace(), + workspace=self.workflow.workspace_path, workflow_json=base64.standard_b64encode(json.dumps( self.workflow.get_specification()).encode()), workflow_file=self.workflow.reana_specification.get( @@ -283,7 +283,7 @@ def start_interactive_session(self, interactive_session_type, **kwargs): self.workflow.interactive_session_name = workflow_run_name kubernetes_objects = \ build_interactive_k8s_objects[interactive_session_type]( - workflow_run_name, self.workflow.get_workspace(), + workflow_run_name, self.workflow.workspace_path, access_path, access_token=self.workflow.get_owner_access_token(), **kwargs) @@ -376,7 +376,7 @@ def _create_job_spec(self, name, command=None, image=None, owner_id = str(self.workflow.owner_id) command = format_cmd(command) workspace_mount, workspace_volume = \ - get_shared_volume(self.workflow.get_workspace()) + get_shared_volume(self.workflow.workspace_path) db_mount, _ = get_shared_volume('db') workflow_metadata = client.V1ObjectMeta( @@ -523,7 +523,7 @@ def _create_job_controller_startup_cmd(self, user=None): chown_workspace_cmd = 'chown -R {}:{} {};'.format( WORKFLOW_RUNTIME_USER_UID, WORKFLOW_RUNTIME_USER_GID, - SHARED_VOLUME_PATH + '/' + self.workflow.get_workspace() + SHARED_VOLUME_PATH + '/' + self.workflow.workspace_path ) run_app_cmd = 'su {} /bin/bash -c "{}"'.format(user, base_cmd) full_cmd = add_group_cmd + add_user_cmd + chown_workspace_cmd + \ diff --git a/tests/test_utils.py b/tests/test_utils.py index aba65afd..47cb9a09 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -101,10 +101,10 @@ def test_workspace_deletion(app, hard_delete): """Test workspace deletion.""" workflow = sample_yadage_workflow_in_db - create_workflow_workspace(sample_yadage_workflow_in_db.get_workspace()) + create_workflow_workspace(sample_yadage_workflow_in_db.workspace_path) absolute_workflow_workspace = os.path.join( tmp_shared_volume_path, - workflow.get_workspace()) + workflow.workspace_path) # create a job for the workflow workflow_job = Job(id_=uuid.uuid4(), workflow_uuid=workflow.id_) @@ -143,10 +143,10 @@ def test_deletion_of_workspace_of_an_already_deleted_workflow( sample_yadage_workflow_in_db, tmp_shared_volume_path): """Test workspace deletion of an already deleted workflow.""" - create_workflow_workspace(sample_yadage_workflow_in_db.get_workspace()) + create_workflow_workspace(sample_yadage_workflow_in_db.workspace_path) absolute_workflow_workspace = os.path.join( tmp_shared_volume_path, - sample_yadage_workflow_in_db.get_workspace()) + sample_yadage_workflow_in_db.workspace_path) # check that the workflow workspace exists assert os.path.exists(absolute_workflow_workspace) @@ -194,11 +194,11 @@ def test_workspace_permissions(app, session, default_user, sample_yadage_workflow_in_db, tmp_shared_volume_path): """Test workspace dir permissions.""" - create_workflow_workspace(sample_yadage_workflow_in_db.get_workspace()) + create_workflow_workspace(sample_yadage_workflow_in_db.workspace_path) expeted_worspace_permissions = 'drwxrwxr-x' absolute_workflow_workspace = os.path.join( tmp_shared_volume_path, - sample_yadage_workflow_in_db.get_workspace()) + sample_yadage_workflow_in_db.workspace_path) workspace_permissions = \ stat.filemode(os.stat(absolute_workflow_workspace).st_mode) assert os.path.exists(absolute_workflow_workspace) diff --git a/tests/test_views.py b/tests/test_views.py index 81e42557..bc16d52f 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -108,7 +108,7 @@ def test_create_workflow_with_name(app, session, default_user, # Check that the workflow workspace exists absolute_workflow_workspace = os.path.join( - tmp_shared_volume_path, workflow.get_workspace()) + tmp_shared_volume_path, workflow.workspace_path) assert os.path.exists(absolute_workflow_workspace) @@ -146,7 +146,7 @@ def test_create_workflow_without_name(app, session, default_user, # Check that the workflow workspace exists absolute_workflow_workspace = os.path.join( - tmp_shared_volume_path, workflow.get_workspace()) + tmp_shared_volume_path, workflow.workspace_path) assert os.path.exists(absolute_workflow_workspace) @@ -219,7 +219,7 @@ def test_download_file(app, session, default_user, # files absolute_path_workflow_workspace = \ os.path.join(tmp_shared_volume_path, - workflow.get_workspace()) + workflow.workspace_path) file_path = os.path.join(absolute_path_workflow_workspace, file_name) # because outputs directory doesn't exist by default @@ -260,7 +260,7 @@ def test_download_file_with_path(app, session, default_user, # files absolute_path_workflow_workspace = \ os.path.join(tmp_shared_volume_path, - workflow.get_workspace()) + workflow.workspace_path) file_path = os.path.join(absolute_path_workflow_workspace, file_name) # because outputs directory doesn't exist by default os.makedirs(os.path.dirname(file_path), exist_ok=True) @@ -294,7 +294,7 @@ def test_get_files(app, session, default_user, # create file absolute_path_workflow_workspace = \ os.path.join(tmp_shared_volume_path, - workflow.get_workspace()) + workflow.workspace_path) fs_ = fs.open_fs(absolute_path_workflow_workspace) test_files = [] for i in range(5): @@ -631,7 +631,7 @@ def test_upload_file(app, session, default_user, input_stream=io.BytesIO(file_binary_content)) assert res.status_code == 200 # remove workspace directory from path - workflow_workspace = workflow.get_workspace() + workflow_workspace = workflow.workspace_path # we use `secure_filename` here because # we use it in server side when adding @@ -666,10 +666,10 @@ def test_delete_file(app, default_user, sample_serial_workflow_in_db): """Test delete file.""" # Move to fixture from flask import current_app - create_workflow_workspace(sample_serial_workflow_in_db.get_workspace()) + create_workflow_workspace(sample_serial_workflow_in_db.workspace_path) abs_path_workspace = os.path.join( current_app.config['SHARED_VOLUME_PATH'], - sample_serial_workflow_in_db.get_workspace()) + sample_serial_workflow_in_db.workspace_path) file_name = 'dataset.csv' file_binary_content = b'1,2,3,4\n5,6,7,8' abs_path_to_file = os.path.join(abs_path_workspace, file_name) @@ -943,7 +943,7 @@ def test_workspace_deletion(app, absolute_workflow_workspace = os.path.join( tmp_shared_volume_path, - workflow.get_workspace()) + workflow.workspace_path) # create a job for the workflow workflow_job = Job(id_=uuid.uuid4(), workflow_uuid=workflow.id_) @@ -997,7 +997,7 @@ def test_deletion_of_workspace_of_an_already_deleted_workflow( absolute_workflow_workspace = os.path.join( tmp_shared_volume_path, - workflow.get_workspace()) + workflow.workspace_path) # check that the workflow workspace exists assert os.path.exists(absolute_workflow_workspace) @@ -1064,9 +1064,9 @@ def test_get_workspace_diff(app, default_user, """Test get workspace differences.""" # create the workspaces for the two workflows workspace_path_a = next(sample_workflow_workspace( - str(sample_serial_workflow_in_db.id_))) + str(sample_serial_workflow_in_db.workspace_path))) workspace_path_b = next(sample_workflow_workspace( - str(sample_yadage_workflow_in_db.id_))) + str(sample_yadage_workflow_in_db.workspace_path))) sample_serial_workflow_in_db.get_workspace = lambda: str( sample_serial_workflow_in_db.id_) From 88ea799679ad53cf10842533fe321ab5350af08a Mon Sep 17 00:00:00 2001 From: Rokas Maciulaitis Date: Thu, 30 Jan 2020 14:06:06 +0100 Subject: [PATCH 2/2] consumer: saves workflow engine container logs * Saves workflow engine container logs and updates WF status if it is only needed. --- .travis.yml | 2 +- reana_workflow_controller/consumer.py | 40 +++++++++++++++++---------- setup.py | 2 +- 3 files changed, 28 insertions(+), 16 deletions(-) diff --git a/.travis.yml b/.travis.yml index 248c050d..272759ee 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,7 +24,7 @@ before_install: - pip install -r requirements-dev.txt - travis_retry pip install --upgrade pip setuptools py - travis_retry pip install twine wheel coveralls - - travis_retry pip install "mock>=3" + - travis_retry pip install "mock>=3,<4" install: - travis_retry pip install -e .[all] diff --git a/reana_workflow_controller/consumer.py b/reana_workflow_controller/consumer.py index 60082280..b335397c 100644 --- a/reana_workflow_controller/consumer.py +++ b/reana_workflow_controller/consumer.py @@ -11,13 +11,15 @@ from __future__ import absolute_import import json +import logging import uuid from datetime import datetime import requests from kubernetes.client.rest import ApiException from reana_commons.consumer import BaseConsumer -from reana_commons.k8s.api_client import current_k8s_batchv1_api_client +from reana_commons.k8s.api_client import (current_k8s_batchv1_api_client, + current_k8s_corev1_api_client) from reana_commons.k8s.secrets import REANAUserSecretsStore from reana_commons.utils import (calculate_file_access_time, calculate_hash_of_dir, @@ -74,17 +76,19 @@ def on_message(self, body, message): def _update_workflow_status(workflow_uuid, status, logs): """Update workflow status in DB.""" - Workflow.update_workflow_status(Session, workflow_uuid, - status, logs, None) workflow = Session.query(Workflow).filter_by(id_=workflow_uuid)\ .one_or_none() - if workflow.git_ref: - _update_commit_status(workflow, status) - alive_statuses = \ - [WorkflowStatus.created, WorkflowStatus.running, WorkflowStatus.queued] - if status not in alive_statuses: - workflow.run_finished_at = datetime.now() - _delete_workflow_engine_pod(workflow_uuid) + if workflow.status != status: + Workflow.update_workflow_status(Session, workflow_uuid, + status, logs, None) + if workflow.git_ref: + _update_commit_status(workflow, status) + alive_statuses = \ + [WorkflowStatus.created, WorkflowStatus.running, + WorkflowStatus.queued] + if status not in alive_statuses: + workflow.run_finished_at = datetime.now() + _delete_workflow_engine_pod(workflow) def _update_commit_status(workflow, status): @@ -191,19 +195,27 @@ def _update_job_cache(msg): Session.add(cached_job) -def _delete_workflow_engine_pod(workflow_uuid): +def _delete_workflow_engine_pod(workflow): """Delete workflow engine pod.""" try: - jobs = current_k8s_batchv1_api_client.list_namespaced_job( + jobs = current_k8s_corev1_api_client.list_namespaced_pod( namespace='default', ) for job in jobs.items: - if workflow_uuid in job.metadata.name: + if str(workflow.id_) in job.metadata.name: + workflow.logs = \ + current_k8s_corev1_api_client.read_namespaced_pod_log( + namespace=job.metadata.namespace, + name=job.metadata.name, + container='workflow-engine') current_k8s_batchv1_api_client.delete_namespaced_job( namespace='default', propagation_policy="Background", - name=job.metadata.name) + name=job.metadata.labels['job-name']) break except ApiException as e: raise REANAWorkflowControllerError( "Workflow engine pod cound not be deleted {}.".format(e)) + except Exception as e: + logging.error(traceback.format_exc()) + logging.error("Unexpected error: {}".format(e)) diff --git a/setup.py b/setup.py index faef4a9f..dedb322c 100644 --- a/setup.py +++ b/setup.py @@ -52,7 +52,7 @@ 'marshmallow>2.13.0,<=2.20.1', 'packaging>=18.0', 'reana-commons[kubernetes]>=0.7.0.dev20200203,<0.8.0', - 'reana-db>=0.7.0.dev20200129,<0.8.0', + 'reana-db>=0.7.0.dev20200206,<0.8.0', 'requests==2.20.0', 'sqlalchemy-utils>=0.31.0', 'uwsgi-tools>=1.1.1',