Skip to content

Commit

Permalink
Merge 38f2258 into cc84399
Browse files Browse the repository at this point in the history
  • Loading branch information
roksys committed Jul 10, 2019
2 parents cc84399 + 38f2258 commit a93d66b
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 19 deletions.
15 changes: 15 additions & 0 deletions reana_workflow_controller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,18 @@

WORKFLOW_ENGINE_NAME = 'workflow-engine'
"""Default workflow engine container name."""

BATCH_CONTAINER_USER_NAME = os.getenv(
'BATCH_CONTAINER_USER_NAME',
'reana')
"""Default OS user name for running job controller."""

BATCH_CONTAINER_USER_ID = os.getenv(
'BATCH_CONTAINER_USER_ID',
1000)
"""Default user id for running job controller and workflow engine apps."""

BATCH_CONTAINER_GROUP_ID = os.getenv(
'BATCH_CONTAINER_GROUP_ID',
0)
"""Default group id for running job controller and workflow engine apps."""
75 changes: 56 additions & 19 deletions reana_workflow_controller/workflow_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"""Workflow run manager interface."""
import base64
import json
import logging
import os

from flask import current_app
Expand All @@ -17,9 +18,8 @@
from reana_commons.k8s.api_client import current_k8s_batchv1_api_client
from reana_commons.k8s.secrets import REANAUserSecretsStore
from reana_commons.utils import (create_cvmfs_persistent_volume_claim,
create_cvmfs_storage_class)
create_cvmfs_storage_class, format_cmd)
from reana_db.database import Session
from reana_db.models import User

from reana_workflow_controller.errors import (REANAInteractiveSessionError,
REANAWorkflowControllerError)
Expand All @@ -35,6 +35,7 @@
REANA_WORKFLOW_ENGINE_IMAGE_SERIAL,
REANA_WORKFLOW_ENGINE_IMAGE_YADAGE,
SHARED_FS_MAPPING,
SHARED_VOLUME_PATH,
TTL_SECONDS_AFTER_FINISHED,
WORKFLOW_ENGINE_COMMON_ENV_VARS,
DEBUG_ENV_VARS)
Expand Down Expand Up @@ -206,8 +207,10 @@ def start_batch_workflow_run(self):
namespace=KubernetesWorkflowRunManager.default_namespace,
body=job)
except ApiException as e:
raise REANAWorkflowControllerError(
"Workflow engine pod cound not be created {}.".format(e))
msg = 'Workflow engine/job controller pod ' \
'creation failed {}'.format(e)
logging.error(msg, exc_info=True)
raise REANAWorkflowControllerError(e)

def start_interactive_session(self, interactive_session_type, **kwargs):
"""Start an interactive workflow run.
Expand Down Expand Up @@ -311,14 +314,10 @@ def _create_job_spec(self, name, command=None, image=None,
"""
image = image or self._workflow_engine_image()
command = command or self._workflow_engine_command()
env_vars = env_vars or self._workflow_engine_env_vars()
owner_id = str(self.workflow.owner_id)
workflow_engine_env_vars = env_vars or self._workflow_engine_env_vars()
job_controller_env_vars = []
if isinstance(command, str):
command = [command]
elif not isinstance(command, list):
raise ValueError('Command should be a list or a string and not {}'
.format(type(command)))
owner_id = str(self.workflow.owner_id)
command = format_cmd(command)

workflow_metadata = client.V1ObjectMeta(name=name)
job = client.V1Job()
Expand All @@ -328,6 +327,7 @@ def _create_job_spec(self, name, command=None, image=None,
spec = client.V1JobSpec(
template=client.V1PodTemplateSpec())
spec.template.metadata = workflow_metadata

workflow_enginge_container = client.V1Container(
name=current_app.config['WORKFLOW_ENGINE_NAME'],
image=image,
Expand All @@ -336,7 +336,7 @@ def _create_job_spec(self, name, command=None, image=None,
volume_mounts=[],
command=['/bin/bash', '-c'],
args=command)
job_controller_vars = [
job_controller_address = [
{
'name': 'JOB_CONTROLLER_SERVICE_PORT_HTTP',
'value':
Expand All @@ -346,13 +346,18 @@ def _create_job_spec(self, name, command=None, image=None,
'name': 'JOB_CONTROLLER_SERVICE_HOST',
'value': 'localhost'}
]
env_vars.extend(job_controller_vars)
workflow_enginge_container.env.extend(env_vars)
workflow_engine_env_vars.extend(job_controller_address)
workflow_enginge_container.env.extend(workflow_engine_env_vars)
workflow_enginge_container.security_context = \
client.V1SecurityContext(
run_as_group=current_app.config['BATCH_CONTAINER_GROUP_ID'],
run_as_user=current_app.config['BATCH_CONTAINER_USER_ID']
)
workflow_enginge_container.volume_mounts = [
{
'name': 'default-shared-volume',
'mountPath': SHARED_FS_MAPPING['MOUNT_DEST_PATH'],
},
}
]

job_controller_container = client.V1Container(
Expand All @@ -361,14 +366,26 @@ def _create_job_spec(self, name, command=None, image=None,
image_pull_policy='IfNotPresent',
env=[],
volume_mounts=[],
command=['/bin/bash', '-c'],
args=self._create_job_controller_startup_cmd(
current_app.config['BATCH_CONTAINER_USER_NAME']),
ports=[])

if os.getenv('FLASK_ENV') == 'development':
job_controller_env_vars.extend(
current_app.config['DEBUG_ENV_VARS'])
job_controller_env_vars.extend([{
'name': 'REANA_USER_ID',
'value': owner_id
}])
job_controller_env_vars.extend([
{
'name': 'REANA_USER_ID',
'value': owner_id
}, {
'name': 'CERN_USER',
'value': current_app.config['BATCH_CONTAINER_USER_NAME']
}, {
'name': 'USER', # Required by HTCondor
'value': current_app.config['BATCH_CONTAINER_USER_NAME']
}
])

secrets_store = REANAUserSecretsStore(owner_id)
user_secrets = secrets_store.get_secrets()
Expand All @@ -390,6 +407,7 @@ def _create_job_spec(self, name, command=None, image=None,

job_controller_container.env.extend(job_controller_env_vars)
job_controller_container.env.extend(job_controller_env_secrets)

job_controller_container.volume_mounts = [
{
'name': 'default-shared-volume',
Expand Down Expand Up @@ -418,8 +436,27 @@ def _create_job_spec(self, name, command=None, image=None,
}
},
]

job.spec = spec
job.spec.template.spec.restart_policy = 'Never'
job.spec.ttl_seconds_after_finished = TTL_SECONDS_AFTER_FINISHED
job.spec.backoff_limit = 0
return job

def _create_job_controller_startup_cmd(self, user=None):
"""Create job controller startup cmd."""
base_cmd = 'flask run -h 0.0.0.0;'
if user:
add_user_cmd = 'useradd -u {} -g {} -M {};'.format(
current_app.config['BATCH_CONTAINER_USER_ID'],
current_app.config['BATCH_CONTAINER_GROUP_ID'],
user)
chown_workspace_cmd = 'chown -R {} {};'.format(
current_app.config['BATCH_CONTAINER_USER_ID'],
SHARED_VOLUME_PATH + '/' + self.workflow.get_workspace()
)
run_app_cmd = 'su {} /bin/bash -c "{}"'.format(user, base_cmd)
full_cmd = add_user_cmd + chown_workspace_cmd + run_app_cmd
return [full_cmd]
else:
return base_cmd.split()

0 comments on commit a93d66b

Please sign in to comment.