Skip to content

Commit

Permalink
Revert "consumer: removes unused task"
Browse files Browse the repository at this point in the history
This reverts commit 441e28b.

* We are correctly using the
  [TTLSecondsAfterFinished](https://kubernetes.io/docs/concepts/workloads/controllers/ttlafterfinished),
  and it really works, but the subtle detail is that RJC never
  finishes. One can see this by checking how many ready containers
  are inside the pod, Workflow Engine finishes and RJC still runs,
  see:
  ```console
  $ kubectl get pod \
            batch-serial-a0f3f1c4-161d-461c-94e0-c4dae1000b2f-nv8bb -o json | \
    jq '.status.containerStatuses[] | .name + " " + (.ready|tostring)'
      "job-controller true"
      "workflow-engine false"
  ```
  Because of this we need to manually delete the batch- containers
  with the task that was removed in the previous commit
  (closes #258).
  • Loading branch information
Diego Rodriguez committed Oct 17, 2019
1 parent da1442b commit 2840cba
Showing 1 changed file with 25 additions and 0 deletions.
25 changes: 25 additions & 0 deletions reana_workflow_controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import uuid

import requests
from kubernetes.client.rest import ApiException
from reana_commons.consumer import BaseConsumer
from reana_commons.k8s.api_client import current_k8s_batchv1_api_client
from reana_commons.k8s.secrets import REANAUserSecretsStore
from reana_commons.utils import (calculate_file_access_time,
calculate_hash_of_dir,
Expand All @@ -25,6 +27,7 @@

from reana_workflow_controller.config import (PROGRESS_STATUSES,
REANA_GITLAB_URL, REANA_URL)
from reana_workflow_controller.errors import REANAWorkflowControllerError


class JobStatusConsumer(BaseConsumer):
Expand Down Expand Up @@ -71,6 +74,10 @@ def _update_workflow_status(workflow_uuid, status, logs):
.one_or_none()
if workflow.git_ref:
_update_commit_status(workflow, status)
alive_statuses = \
[WorkflowStatus.created, WorkflowStatus.running, WorkflowStatus.queued]
if status not in alive_statuses:
_delete_workflow_engine_pod(workflow_uuid)


def _update_commit_status(workflow, status):
Expand Down Expand Up @@ -175,3 +182,21 @@ def _update_job_cache(msg):
cached_job.result_path = msg['caching_info'].get('result_path')
cached_job.workspace_hash = workspace_hash
Session.add(cached_job)


def _delete_workflow_engine_pod(workflow_uuid):
"""Delete workflow engine pod."""
try:
jobs = current_k8s_batchv1_api_client.list_namespaced_job(
namespace='default',
)
for job in jobs.items:
if workflow_uuid in job.metadata.name:
current_k8s_batchv1_api_client.delete_namespaced_job(
namespace='default',
propagation_policy="Background",
name=job.metadata.name)
break
except ApiException as e:
raise REANAWorkflowControllerError(
"Workflow engine pod cound not be deleted {}.".format(e))

0 comments on commit 2840cba

Please sign in to comment.