Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

global: changes get_workspace method with workspace_path field #288

Merged
merged 2 commits into from
Feb 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ before_install:
- pip install -r requirements-dev.txt
- travis_retry pip install --upgrade pip setuptools py
- travis_retry pip install twine wheel coveralls
- travis_retry pip install "mock>=3"
- travis_retry pip install "mock>=3,<4"

install:
- travis_retry pip install -e .[all]
Expand Down
40 changes: 27 additions & 13 deletions reana_workflow_controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@
from __future__ import absolute_import

import json
import logging
import uuid
from datetime import datetime

import requests
from kubernetes.client.rest import ApiException
from reana_commons.consumer import BaseConsumer
from reana_commons.k8s.api_client import current_k8s_batchv1_api_client
from reana_commons.k8s.api_client import (current_k8s_batchv1_api_client,
current_k8s_corev1_api_client)
from reana_commons.k8s.secrets import REANAUserSecretsStore
from reana_commons.utils import (calculate_file_access_time,
calculate_hash_of_dir,
Expand Down Expand Up @@ -73,16 +76,19 @@ def on_message(self, body, message):

def _update_workflow_status(workflow_uuid, status, logs):
"""Update workflow status in DB."""
Workflow.update_workflow_status(Session, workflow_uuid,
status, logs, None)
workflow = Session.query(Workflow).filter_by(id_=workflow_uuid)\
.one_or_none()
if workflow.git_ref:
_update_commit_status(workflow, status)
alive_statuses = \
[WorkflowStatus.created, WorkflowStatus.running, WorkflowStatus.queued]
if status not in alive_statuses:
_delete_workflow_engine_pod(workflow_uuid)
if workflow.status != status:
Workflow.update_workflow_status(Session, workflow_uuid,
status, logs, None)
if workflow.git_ref:
_update_commit_status(workflow, status)
alive_statuses = \
[WorkflowStatus.created, WorkflowStatus.running,
WorkflowStatus.queued]
if status not in alive_statuses:
workflow.run_finished_at = datetime.now()
_delete_workflow_engine_pod(workflow)


def _update_commit_status(workflow, status):
Expand Down Expand Up @@ -189,19 +195,27 @@ def _update_job_cache(msg):
Session.add(cached_job)


def _delete_workflow_engine_pod(workflow_uuid):
def _delete_workflow_engine_pod(workflow):
"""Delete workflow engine pod."""
try:
jobs = current_k8s_batchv1_api_client.list_namespaced_job(
jobs = current_k8s_corev1_api_client.list_namespaced_pod(
namespace='default',
)
for job in jobs.items:
if workflow_uuid in job.metadata.name:
if str(workflow.id_) in job.metadata.name:
workflow.logs = \
current_k8s_corev1_api_client.read_namespaced_pod_log(
namespace=job.metadata.namespace,
name=job.metadata.name,
container='workflow-engine')
current_k8s_batchv1_api_client.delete_namespaced_job(
namespace='default',
propagation_policy="Background",
name=job.metadata.name)
name=job.metadata.labels['job-name'])
break
except ApiException as e:
raise REANAWorkflowControllerError(
"Workflow engine pod cound not be deleted {}.".format(e))
except Exception as e:
logging.error(traceback.format_exc())
logging.error("Unexpected error: {}".format(e))
24 changes: 12 additions & 12 deletions reana_workflow_controller/rest/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@
from datetime import datetime
from pathlib import Path

from kubernetes.client.rest import ApiException
from reana_commons.config import REANA_WORKFLOW_UMASK
from reana_commons.k8s.secrets import REANAUserSecretsStore
from reana_commons.utils import get_workflow_status_change_verb
from sqlalchemy.exc import SQLAlchemyError

import fs
from flask import current_app as app
from flask import jsonify
from git import Repo
from reana_commons.config import REANA_WORKFLOW_UMASK
from reana_commons.k8s.secrets import REANAUserSecretsStore
from reana_commons.utils import get_workflow_status_change_verb
from reana_db.database import Session
from reana_db.models import Job, JobCache, Workflow, WorkflowStatus
from sqlalchemy.exc import SQLAlchemyError
from kubernetes.client.rest import ApiException

from reana_workflow_controller.config import (REANA_GITLAB_HOST,
WORKFLOW_TIME_FORMAT)
from reana_workflow_controller.errors import (REANAExternalCallError,
Expand Down Expand Up @@ -163,7 +163,7 @@ def remove_workflow_jobs_from_cache(workflow):
jobs = Session.query(Job).filter_by(workflow_uuid=workflow.id_).all()
for job in jobs:
job_path = remove_upper_level_references(
os.path.join(workflow.get_workspace(),
os.path.join(workflow.workspace_path,
'..', 'archive',
str(job.id_)))
Session.query(JobCache).filter_by(job_id=job.id_).delete()
Expand All @@ -188,11 +188,11 @@ def delete_workflow(workflow,
Workflow.status != WorkflowStatus.running).all()
for workflow in to_be_deleted:
if hard_delete:
remove_workflow_workspace(workflow.get_workspace())
remove_workflow_workspace(workflow.workspace_path)
_delete_workflow_row_from_db(workflow)
else:
if workspace:
remove_workflow_workspace(workflow.get_workspace())
remove_workflow_workspace(workflow.workspace_path)
_mark_workflow_as_deleted_in_db(workflow)
remove_workflow_jobs_from_cache(workflow)

Expand Down Expand Up @@ -303,7 +303,7 @@ def mv_files(source, target, workflow):
"""Move files within workspace."""
absolute_workspace_path = os.path.join(
app.config['SHARED_VOLUME_PATH'],
workflow.get_workspace())
workflow.workspace_path)
absolute_source_path = os.path.join(
app.config['SHARED_VOLUME_PATH'],
absolute_workspace_path,
Expand Down Expand Up @@ -422,8 +422,8 @@ def get_workspace_diff(workflow_a, workflow_b, brief=False, context_lines=5):
:rtype: Dictionary with file paths and their sizes
unique to each workspace.
"""
workspace_a = workflow_a.get_workspace()
workspace_b = workflow_b.get_workspace()
workspace_a = workflow_a.workspace_path
workspace_b = workflow_b.workspace_path
reana_fs = fs.open_fs(app.config['SHARED_VOLUME_PATH'])
if reana_fs.exists(workspace_a) and reana_fs.exists(workspace_b):
diff_command = ['diff',
Expand Down
10 changes: 5 additions & 5 deletions reana_workflow_controller/rest/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from reana_workflow_controller.rest.utils import (
get_specification_diff, get_workflow_name)
from reana_workflow_controller.rest.utils import \
create_workflow_workspace, get_workspace_diff
create_workflow_workspace, get_workspace_diff


START = 'start'
Expand Down Expand Up @@ -181,9 +181,9 @@ def get_workflows(): # noqa
workflow.interactive_session
if verbose:
reana_fs = fs.open_fs(SHARED_VOLUME_PATH)
if reana_fs.exists(workflow.get_workspace()):
if reana_fs.exists(workflow.workspace_path):
absolute_workspace_path = reana_fs.getospath(
workflow.get_workspace())
workflow.workspace_path)
disk_usage_info = get_workspace_disk_usage(
absolute_workspace_path, block_size=block_size)
if disk_usage_info:
Expand Down Expand Up @@ -323,13 +323,13 @@ def create_workflow(): # noqa
Session.add(workflow)
Session.object_session(workflow).commit()
if git_ref:
create_workflow_workspace(workflow.get_workspace(),
create_workflow_workspace(workflow.workspace_path,
user_id=user.id_,
git_url=git_data['git_url'],
git_branch=git_data['git_branch'],
git_ref=git_ref)
else:
create_workflow_workspace(workflow.get_workspace())
create_workflow_workspace(workflow.workspace_path)
return jsonify({'message': 'Workflow workspace created',
'workflow_id': workflow.id_,
'workflow_name': get_workflow_name(workflow)}), 201
Expand Down
10 changes: 5 additions & 5 deletions reana_workflow_controller/rest/workflows_workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ def upload_file(workflow_id_or_name):
elif '..' in full_file_name.split("/"):
raise REANAUploadPathError('Path cannot contain "..".')
absolute_workspace_path = os.path.join(
current_app.config['SHARED_VOLUME_PATH'],
workflow.get_workspace())
current_app.config['SHARED_VOLUME_PATH'],
workflow.workspace_path)
if len(full_file_name.split("/")) > 1:
dirs = full_file_name.split("/")[:-1]
absolute_workspace_path = os.path.join(absolute_workspace_path,
Expand Down Expand Up @@ -219,7 +219,7 @@ def download_file(workflow_id_or_name, file_name): # noqa

absolute_workflow_workspace_path = os.path.join(
current_app.config['SHARED_VOLUME_PATH'],
workflow.get_workspace())
workflow.workspace_path)
return send_from_directory(absolute_workflow_workspace_path,
file_name,
mimetype='multipart/form-data',
Expand Down Expand Up @@ -304,7 +304,7 @@ def delete_file(workflow_id_or_name, file_name): # noqa
workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name,
user_uuid)
abs_path_to_workspace = os.path.join(
current_app.config['SHARED_VOLUME_PATH'], workflow.get_workspace())
current_app.config['SHARED_VOLUME_PATH'], workflow.workspace_path)
deleted = remove_files_recursive_wildcard(
abs_path_to_workspace, file_name)

Expand Down Expand Up @@ -404,7 +404,7 @@ def get_files(workflow_id_or_name): # noqa
user_uuid)
file_list = list_directory_files(os.path.join(
current_app.config['SHARED_VOLUME_PATH'],
workflow.get_workspace()))
workflow.workspace_path))
return jsonify(file_list), 200

except ValueError:
Expand Down
8 changes: 4 additions & 4 deletions reana_workflow_controller/workflow_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def _workflow_engine_command(self, overwrite_input_parameters=None,
return (WorkflowRunManager.engine_mapping[self.workflow.type_]
['command'].format(
id=self.workflow.id_,
workspace=self.workflow.get_workspace(),
workspace=self.workflow.workspace_path,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can now upgrade the minimal version of reana-db to 0.7.0.dev20200205 in setup.py.

workflow_json=base64.standard_b64encode(json.dumps(
self.workflow.get_specification()).encode()),
workflow_file=self.workflow.reana_specification.get(
Expand Down Expand Up @@ -283,7 +283,7 @@ def start_interactive_session(self, interactive_session_type, **kwargs):
self.workflow.interactive_session_name = workflow_run_name
kubernetes_objects = \
build_interactive_k8s_objects[interactive_session_type](
workflow_run_name, self.workflow.get_workspace(),
workflow_run_name, self.workflow.workspace_path,
access_path,
access_token=self.workflow.get_owner_access_token(),
**kwargs)
Expand Down Expand Up @@ -376,7 +376,7 @@ def _create_job_spec(self, name, command=None, image=None,
owner_id = str(self.workflow.owner_id)
command = format_cmd(command)
workspace_mount, workspace_volume = \
get_shared_volume(self.workflow.get_workspace())
get_shared_volume(self.workflow.workspace_path)
db_mount, _ = get_shared_volume('db')

workflow_metadata = client.V1ObjectMeta(
Expand Down Expand Up @@ -523,7 +523,7 @@ def _create_job_controller_startup_cmd(self, user=None):
chown_workspace_cmd = 'chown -R {}:{} {};'.format(
WORKFLOW_RUNTIME_USER_UID,
WORKFLOW_RUNTIME_USER_GID,
SHARED_VOLUME_PATH + '/' + self.workflow.get_workspace()
SHARED_VOLUME_PATH + '/' + self.workflow.workspace_path
)
run_app_cmd = 'su {} /bin/bash -c "{}"'.format(user, base_cmd)
full_cmd = add_group_cmd + add_user_cmd + chown_workspace_cmd + \
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
'marshmallow>2.13.0,<=2.20.1',
'packaging>=18.0',
'reana-commons[kubernetes]>=0.7.0.dev20200203,<0.8.0',
'reana-db>=0.7.0.dev20200129,<0.8.0',
'reana-db>=0.7.0.dev20200206,<0.8.0',
'requests==2.20.0',
'sqlalchemy-utils>=0.31.0',
'uwsgi-tools>=1.1.1',
Expand Down
12 changes: 6 additions & 6 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ def test_workspace_deletion(app,
hard_delete):
"""Test workspace deletion."""
workflow = sample_yadage_workflow_in_db
create_workflow_workspace(sample_yadage_workflow_in_db.get_workspace())
create_workflow_workspace(sample_yadage_workflow_in_db.workspace_path)
absolute_workflow_workspace = os.path.join(
tmp_shared_volume_path,
workflow.get_workspace())
workflow.workspace_path)

# create a job for the workflow
workflow_job = Job(id_=uuid.uuid4(), workflow_uuid=workflow.id_)
Expand Down Expand Up @@ -143,10 +143,10 @@ def test_deletion_of_workspace_of_an_already_deleted_workflow(
sample_yadage_workflow_in_db,
tmp_shared_volume_path):
"""Test workspace deletion of an already deleted workflow."""
create_workflow_workspace(sample_yadage_workflow_in_db.get_workspace())
create_workflow_workspace(sample_yadage_workflow_in_db.workspace_path)
absolute_workflow_workspace = os.path.join(
tmp_shared_volume_path,
sample_yadage_workflow_in_db.get_workspace())
sample_yadage_workflow_in_db.workspace_path)

# check that the workflow workspace exists
assert os.path.exists(absolute_workflow_workspace)
Expand Down Expand Up @@ -194,11 +194,11 @@ def test_workspace_permissions(app, session, default_user,
sample_yadage_workflow_in_db,
tmp_shared_volume_path):
"""Test workspace dir permissions."""
create_workflow_workspace(sample_yadage_workflow_in_db.get_workspace())
create_workflow_workspace(sample_yadage_workflow_in_db.workspace_path)
expeted_worspace_permissions = 'drwxrwxr-x'
absolute_workflow_workspace = os.path.join(
tmp_shared_volume_path,
sample_yadage_workflow_in_db.get_workspace())
sample_yadage_workflow_in_db.workspace_path)
workspace_permissions = \
stat.filemode(os.stat(absolute_workflow_workspace).st_mode)
assert os.path.exists(absolute_workflow_workspace)
Expand Down