Skip to content

Commit

Permalink
workflow-run-manager: centralise int. wf. naming
Browse files Browse the repository at this point in the history
  • Loading branch information
Diego Rodriguez committed Mar 25, 2020
1 parent 274aed1 commit 79a64bc
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 48 deletions.
14 changes: 0 additions & 14 deletions reana_workflow_controller/config.py
Expand Up @@ -15,20 +15,6 @@

from reana_workflow_controller.version import __version__

BROKER_URL = os.getenv('RABBIT_MQ_URL',
'message-broker.default.svc.cluster.local')

BROKER_USER = os.getenv('RABBIT_MQ_USER', 'test')

BROKER_PASS = os.getenv('RABBIT_MQ_PASS', '1234')

BROKER = os.getenv('RABBIT_MQ', 'amqp://{0}:{1}@{2}//'.format(BROKER_USER,
BROKER_PASS,
BROKER_URL))

BROKER_PORT = os.getenv('RABBIT_MQ_PORT', 5672)


SQLALCHEMY_TRACK_MODIFICATIONS = False
"""Track modifications flag."""

Expand Down
1 change: 1 addition & 0 deletions reana_workflow_controller/k8s.py
Expand Up @@ -53,6 +53,7 @@ def __init__(self, deployment_name, workspace, image, port, path,
self.cvmfs_repos = cvmfs_repos or []
metadata = client.V1ObjectMeta(
name=deployment_name,
labels={'reana_workflow_mode': 'session'},
)
self.kubernetes_objects = {
"ingress": self._build_ingress(metadata),
Expand Down
68 changes: 34 additions & 34 deletions reana_workflow_controller/workflow_run_manager.py
Expand Up @@ -18,14 +18,17 @@
INTERACTIVE_SESSION_TYPES,
K8S_CERN_EOS_AVAILABLE,
K8S_REANA_SERVICE_ACCOUNT_NAME,
REANA_COMPONENT_NAMING_SCHEME,
REANA_COMPONENT_PREFIX,
REANA_STORAGE_BACKEND, SHARED_VOLUME_PATH,
WORKFLOW_RUNTIME_USER_GID,
WORKFLOW_RUNTIME_USER_NAME,
WORKFLOW_RUNTIME_USER_UID)
from reana_commons.k8s.api_client import current_k8s_batchv1_api_client
from reana_commons.k8s.secrets import REANAUserSecretsStore
from reana_commons.k8s.volumes import get_shared_volume
from reana_commons.utils import (create_cvmfs_persistent_volume_claim,
from reana_commons.utils import (build_unique_component_name,
create_cvmfs_persistent_volume_claim,
create_cvmfs_storage_class, format_cmd)
from reana_db.config import SQLALCHEMY_DATABASE_URI
from reana_db.database import Session
Expand Down Expand Up @@ -96,29 +99,13 @@ def __init__(self, workflow):
"""
self.workflow = workflow

def _workflow_run_name_generator(self, mode, type=None):
def _workflow_run_name_generator(self, mode):
"""Generate the name to be given to a workflow run.
In the case of Kubernetes, this should allow administrators to be able
to easily find workflow runs i.e.:
.. code-block:: console
$ kubectl get pods | grep batch
batch-serial-64594f48ff-5mgbz 0/1 Running 0 1m
batch-cwl-857bb969bb-john-64f97d955d-jklcw 0/1 Running 0 16h
$ kubectl get pods | grep interactive
interactive-yadage-7fbb558577-xdxn8 0/1 Running 0 30m
:param mode: Mode in which the workflow runs, like ``batch`` or
``interactive``.
:param mode: Mode in which the workflow runs: ``workflow`` or
``session``.
"""
type_ = type or self.workflow.type_
return 'reana-{mode}-{workflow_type}-{workflow_id}'.format(
mode=mode,
workflow_id=self.workflow.id_,
workflow_type=type_,
)
return build_unique_component_name(f'run-{mode}', self.workflow.id_)

def _generate_interactive_workflow_path(self):
"""Generate the path to access the interactive workflow."""
Expand Down Expand Up @@ -281,8 +268,7 @@ def start_interactive_session(self, interactive_session_type, **kwargs):
self.workflow.interactive_session_type = interactive_session_type
self.workflow.interactive_session = access_path
workflow_run_name = \
self._workflow_run_name_generator(
'interactive', type=interactive_session_type)
self._workflow_run_name_generator('session')
self.workflow.interactive_session_name = workflow_run_name
kubernetes_objects = \
build_interactive_k8s_objects[interactive_session_type](
Expand Down Expand Up @@ -412,17 +398,25 @@ def _create_job_spec(self, name, command=None, image=None,
volume_mounts=[],
command=['/bin/bash', '-c'],
args=command)
job_controller_address = [
workflow_engine_env_vars.extend([
{
'name': 'REANA_JOB_CONTROLLER_SERVICE_PORT_HTTP',
'value':
str(current_app.config['JOB_CONTROLLER_CONTAINER_PORT'])
},
{
'name': 'REANA_JOB_CONTROLLER_SERVICE_HOST',
'value': 'localhost'}
]
workflow_engine_env_vars.extend(job_controller_address)
'value': 'localhost'
},
{
'name': 'REANA_COMPONENT_PREFIX',
'value': REANA_COMPONENT_PREFIX
},
{
'name': 'REANA_COMPONENT_NAMING_SCHEME',
'value': REANA_COMPONENT_NAMING_SCHEME,
},
])
workflow_engine_container.env.extend(workflow_engine_env_vars)
workflow_engine_container.security_context = \
client.V1SecurityContext(
Expand Down Expand Up @@ -466,20 +460,26 @@ def _create_job_spec(self, name, command=None, image=None,
{
'name': 'IMAGE_PULL_SECRETS',
'value': ','.join(IMAGE_PULL_SECRETS)
}
])
job_controller_container.env.extend(job_controller_env_vars)
job_controller_container.env.extend(job_controller_env_secrets)
job_controller_container.env.extend([
},
{
'name': 'REANA_SQLALCHEMY_DATABASE_URI',
'value': SQLALCHEMY_DATABASE_URI
},
{
'name': 'REANA_STORAGE_BACKEND',
'value': REANA_STORAGE_BACKEND
}
])
},
{
'name': 'REANA_COMPONENT_PREFIX',
'value': REANA_COMPONENT_PREFIX
},
{
'name': 'REANA_COMPONENT_NAMING_SCHEME',
'value': REANA_COMPONENT_NAMING_SCHEME,
},
])
job_controller_container.env.extend(job_controller_env_vars)
job_controller_container.env.extend(job_controller_env_secrets)

secrets_volume_mount = \
secrets_store.get_secrets_volume_mount_as_k8s_spec()
Expand Down

0 comments on commit 79a64bc

Please sign in to comment.