Skip to content

Commit

Permalink
Merge 9df6ae8 into ac4ae18
Browse files Browse the repository at this point in the history
  • Loading branch information
Dinos Kousidis committed Jan 21, 2019
2 parents ac4ae18 + 9df6ae8 commit ce84610
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 34 deletions.
74 changes: 68 additions & 6 deletions reana_workflow_controller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

BROKER_PORT = os.getenv('RABBIT_MQ_PORT', 5672)

SHARED_VOLUME_PATH = os.getenv('SHARED_VOLUME_PATH', '/reana')
SHARED_VOLUME_PATH = os.getenv('SHARED_VOLUME_PATH', '/var/reana')


SQLALCHEMY_TRACK_MODIFICATIONS = False
Expand All @@ -47,25 +47,87 @@
'serial': 'serial-default-queue'}

REANA_STORAGE_BACKEND = os.getenv('REANA_STORAGE_BACKEND', 'local')
"""Type of storage attached to the engines, one of ['local', 'ceph']."""
"""Type of storage attached to the engines, one of ['local', 'cephfs']."""

MANILA_CEPHFS_PVC = 'manila-cephfs-pvc'
"""If CEPH storage backend is used, this represents the name of the
Kubernetes persistent volume claim."""

MOUNT_CVMFS = os.getenv('REANA_MOUNT_CVMFS', False)
"""Option to mount CVMFS volumes in workflow engines and jobs."""

SHARED_FS_MAPPING = {
'MOUNT_SOURCE_PATH': os.getenv("SHARED_VOLUME_PATH_ROOT", '/reana'),
'MOUNT_SOURCE_PATH': os.getenv("SHARED_VOLUME_PATH_ROOT",
SHARED_VOLUME_PATH),
# Root path in the underlying shared file system to be mounted inside
# workflow engines.
'MOUNT_DEST_PATH': os.getenv("SHARED_VOLUME_PATH", '/reana'),
'MOUNT_DEST_PATH': os.getenv("SHARED_VOLUME_PATH",
SHARED_VOLUME_PATH),
# Mount path for the shared file system volume inside workflow engines.
}
"""Mapping from the shared file system backend to the job file system."""

WORKFLOW_ENGINE_VERSION = parse(__version__).base_version if \
os.getenv("REANA_DEPLOYMENT_TYPE", 'local') != 'local' else 'latest'
WORKFLOW_ENGINE_VERSION = 'cvmfs-16'
"""CWL workflow engine version."""

WORKFLOW_ENGINE_COMMON_ENV_VARS = [
{
'name': 'ZMQ_PROXY_CONNECT',
'value': 'tcp://zeromq-msg-proxy.default.svc.cluster.local:8666'
},
{
'name': 'SHARED_VOLUME_PATH',
'value': SHARED_VOLUME_PATH
},
{
'name': 'REANA_MOUNT_CVMFS',
'value': MOUNT_CVMFS
}
]
"""Common to all workflow engines environment variables."""

WORKFLOW_ENGINE_COMMON_ENV_VARS_DEBUG = ({'name': 'WDB_SOCKET_SERVER',
'value': 'wdb'},
{'name': 'WDB_NO_BROWSER_AUTO_OPEN',
'value': 'True'})
"""Common to all workflow engines environment variables for debug mode."""

TTL_SECONDS_AFTER_FINISHED = 60
"""Threshold in seconds to clean up terminated (either Complete or Failed)
jobs."""


CVMFS_VOLUME_CONFIGURATION = {
'alice': {
'name': 'alice-cvmfs-volume',
'persistentVolumeClaim': {
'claimName': 'csi-cvmfs-alice-pvc',
'readOnly': True
},
'mountPath': '/cvmfs/alice.cern.ch'
},
'cms': {
'name': 'cms-cvmfs-volume',
'persistentVolumeClaim': {
'claimName': 'csi-cvmfs-cms-pvc',
'readOnly': True
},
'mountPath': '/cvmfs/cms.cern.ch'
},
'lhcb': {
'name': 'lhcb-cvmfs-volume',
'persistentVolumeClaim': {
'claimName': 'csi-cvmfs-lhcb-pvc',
'readOnly': True
},
'mountPath': '/cvmfs/lhcb.cern.ch'
},
'atlas': {
'name': 'atlas-cvmfs-volume',
'persistentVolumeClaim': {
'claimName': 'csi-cvmfs-atlas-pvc',
'readOnly': True
},
'mountPath': '/cvmfs/atlas.cern.ch'
}
}
43 changes: 17 additions & 26 deletions reana_workflow_controller/workflow_run_managers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,56 +9,47 @@
import json
import os

from reana_workflow_controller.config import WORKFLOW_ENGINE_VERSION
from reana_workflow_controller.config import (
MOUNT_CVMFS,
WORKFLOW_ENGINE_COMMON_ENV_VARS,
WORKFLOW_ENGINE_COMMON_ENV_VARS_DEBUG,
WORKFLOW_ENGINE_VERSION)


class WorkflowRunManager():
"""Interface which specifies how to manage workflow runs."""

common_env_variables = [
{
'name': 'ZMQ_PROXY_CONNECT',
'value': 'tcp://zeromq-msg-proxy.default.svc.cluster.local:8666'
},
{
'name': 'SHARED_VOLUME_PATH',
'value': '/reana'
},
]
"""Common to all workflow engines environment variables."""

if os.getenv('FLASK_ENV') == 'development':
common_env_variables.extend(({'name': 'WDB_SOCKET_SERVER',
'value': 'wdb'},
{'name': 'WDB_NO_BROWSER_AUTO_OPEN',
'value': 'True'}))
WORKFLOW_ENGINE_COMMON_ENV_VARS.extend(
WORKFLOW_ENGINE_COMMON_ENV_VARS_DEBUG)

engine_mapping = {
'cwl': {'image': 'reanahub/reana-workflow-engine-cwl:{}'.format(
WORKFLOW_ENGINE_VERSION),
'cwl': {'image': 'dinossimpson/reana-workflow-engine-cwl:{}'.
format(WORKFLOW_ENGINE_VERSION),
'command': ("run-cwl-workflow "
"--workflow-uuid {id} "
"--workflow-workspace {workspace} "
"--workflow-json '{workflow_json}' "
"--workflow-parameters '{parameters}' "
"--operational-options '{options}' "),
'environment_variables': common_env_variables},
'yadage': {'image': 'reanahub/reana-workflow-engine-yadage:{}'.format(
WORKFLOW_ENGINE_VERSION),
'environment_variables': WORKFLOW_ENGINE_COMMON_ENV_VARS},
'yadage': {'image': 'dinossimpson/reana-workflow-engine-yadage:{}'.
format(WORKFLOW_ENGINE_VERSION),
'command': ("run-yadage-workflow "
"--workflow-uuid {id} "
"--workflow-workspace {workspace} "
"--workflow-json '{workflow_json}' "
"--workflow-parameters '{parameters}' "),
'environment_variables': common_env_variables},
'serial': {'image': 'reanahub/reana-workflow-engine-serial:{}'.format(
WORKFLOW_ENGINE_VERSION),
'environment_variables': WORKFLOW_ENGINE_COMMON_ENV_VARS},
'serial': {'image': 'dinossimpson/reana-workflow-engine-serial:{}'.
format(WORKFLOW_ENGINE_VERSION),
'command': ("run-serial-workflow "
"--workflow-uuid {id} "
"--workflow-workspace {workspace} "
"--workflow-json '{workflow_json}' "
"--workflow-parameters '{parameters}' "
"--operational-options '{options}' "),
'environment_variables': common_env_variables},
'environment_variables': WORKFLOW_ENGINE_COMMON_ENV_VARS},
}
"""Mapping between engines and their basis configuration."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
from kubernetes import client

from reana_commons.k8s.api_client import current_k8s_batchv1_api_client
from reana_workflow_controller.config import (MANILA_CEPHFS_PVC,
from reana_workflow_controller.config import (CVMFS_VOLUME_CONFIGURATION,
MANILA_CEPHFS_PVC,
MOUNT_CVMFS,
SHARED_FS_MAPPING,
REANA_STORAGE_BACKEND,
TTL_SECONDS_AFTER_FINISHED)
Expand All @@ -20,7 +22,7 @@ class KubernetesWorkflowRunManager(WorkflowRunManager):
"""Implementation of WorkflowRunManager for Kubernetes."""

k8s_shared_volume = {
'ceph': {
'cephfs': {
'name': 'default-shared-volume',
'persistentVolumeClaim': {
'claimName': MANILA_CEPHFS_PVC,
Expand All @@ -36,6 +38,15 @@ class KubernetesWorkflowRunManager(WorkflowRunManager):
}
"""Configuration to connect to the different storage backends."""

cvmfs_volumes = []
cvmfs_volume_mounts = []
for cvmfs_mount in CVMFS_VOLUME_CONFIGURATION.values():
cvmfs_volumes.append({'name': cvmfs_mount['name'],
'persistentVolumeClaim':
cvmfs_mount['persistentVolumeClaim']})
cvmfs_volume_mounts.append({'name': cvmfs_mount['name'],
'mountPath': cvmfs_mount['mountPath']})

def start_batch_workflow_run(self):
"""Start a batch workflow run."""
namespace = 'default'
Expand Down Expand Up @@ -88,6 +99,11 @@ def _create_job_spec(self, name, command=None, image=None,
KubernetesWorkflowRunManager.k8s_shared_volume
[REANA_STORAGE_BACKEND]
]
if MOUNT_CVMFS:
container.volume_mounts.extend(KubernetesWorkflowRunManager.
cvmfs_volume_mounts)
spec.template.spec.volumes.extend(KubernetesWorkflowRunManager.
cvmfs_volumes)
job.spec = spec
job.spec.template.spec.restart_policy = 'Never'
job.spec.ttl_seconds_after_finished = TTL_SECONDS_AFTER_FINISHED
Expand Down

0 comments on commit ce84610

Please sign in to comment.