Skip to content

Commit

Permalink
Merge 6944756 into c03aa75
Browse files Browse the repository at this point in the history
  • Loading branch information
Diego committed Oct 27, 2020
2 parents c03aa75 + 6944756 commit 13bf989
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 42 deletions.
68 changes: 42 additions & 26 deletions reana_db/models.py
Expand Up @@ -575,43 +575,59 @@ def can_transition_to(self, next_status):
current_transition = (self.status, next_status)
return current_transition in ALLOWED_WORKFLOW_STATUS_TRANSITIONS

def update_workflow_timestamp(self, new_status):
"""Update workflow timestamps according to new status."""
from .database import Session

if new_status in [
RunStatus.finished,
RunStatus.failed,
]:
self.run_finished_at = datetime.now()
elif new_status in [RunStatus.stopped]:
self.run_stopped_at = datetime.now()
elif new_status in [RunStatus.running]:
self.run_started_at = datetime.now()
Session.commit()


@event.listens_for(Workflow.status, "set")
def workflow_status_change_listener(workflow, new_status, old_status, initiator):
"""Workflow status change listener."""
from .database import Session

def _update_disk_quota(workflow):
update_users_disk_quota(user=workflow.owner)
store_workflow_disk_quota(workflow)

def _update_cpu_quota(workflow):
terminated_at = workflow.run_finished_at or workflow.run_stopped_at
if workflow.run_started_at and terminated_at:
cpu_time = terminated_at - workflow.run_started_at
cpu_milliseconds = int(cpu_time.total_seconds() * 1000)
cpu_resource = get_default_quota_resource(ResourceType.cpu.name)
workflow_resource = WorkflowResource(
workflow_id=workflow.id_,
resource_id=cpu_resource.id_,
quantity_used=cpu_milliseconds,
)
user_resource_quota = UserResource.query.filter_by(
user_id=workflow.owner_id, resource_id=cpu_resource.id_
).first()
user_resource_quota.quota_used += cpu_milliseconds
Session.add(workflow_resource)
Session.commit()

workflow.update_workflow_timestamp(new_status)
if new_status in [
RunStatus.finished,
RunStatus.failed,
RunStatus.stopped,
]:
workflow.run_finished_at = datetime.now()
elif new_status in [RunStatus.stopped]:
workflow.run_stopped_at = datetime.now()
elif new_status in [RunStatus.running]:
workflow.run_started_at = datetime.now()
_update_cpu_quota(workflow)
_update_disk_quota(workflow)
elif new_status in [RunStatus.deleted]:
update_users_disk_quota(user=workflow.owner)
return new_status

finished_at = workflow.run_finished_at or workflow.run_stopped_at
if workflow.run_started_at and finished_at:
cpu_time = finished_at - workflow.run_started_at
cpu_milliseconds = int(cpu_time.total_seconds() * 1000)
cpu_resource = get_default_quota_resource(ResourceType.cpu.name)
workflow_resource = WorkflowResource(
workflow_id=workflow.id_,
resource_id=cpu_resource.id_,
quantity_used=cpu_milliseconds,
)
user_resource_quota = UserResource.query.filter_by(
user_id=workflow.owner_id, resource_id=cpu_resource.id_
).first()
user_resource_quota.quota_used += cpu_milliseconds
Session.add(workflow_resource)
Session.commit()
update_users_disk_quota(user=workflow.owner)
store_workflow_disk_quota(workflow)
_update_disk_quota(workflow)

return new_status

Expand Down
15 changes: 11 additions & 4 deletions reana_db/utils.py
Expand Up @@ -10,6 +10,8 @@
import os
from uuid import UUID

from sqlalchemy import inspect


def build_workspace_path(user_id, workflow_id=None):
"""Build user's workspace relative path.
Expand Down Expand Up @@ -235,6 +237,7 @@ def store_workflow_disk_quota(workflow):
:param workflow: Workflow whose disk resource usage must be calculated.
:type workflow: reana_db.models.Workflow
"""
from reana_commons.errors import REANAMissingWorkspaceError
from reana_commons.utils import get_disk_usage

from reana_db.database import Session
Expand All @@ -246,12 +249,16 @@ def store_workflow_disk_quota(workflow):
.filter_by(workflow_id=workflow.id_, resource_id=disk_resource.id_)
.one_or_none()
)

disk_bytes = get_disk_usage(workflow.workspace_path, summarize=True, block_size="b")
disk_bytes = int(disk_bytes[0]["size"])
try:
disk_bytes = get_disk_usage(
workflow.workspace_path, summarize=True, block_size="b"
)
disk_bytes = int(disk_bytes[0]["size"])
except REANAMissingWorkspaceError as e:
disk_bytes = 0
if workflow_resource:
workflow_resource.quantity_used = disk_bytes
else:
elif inspect(workflow).persistent:
workflow_resource = WorkflowResource(
workflow_id=workflow.id_,
resource_id=disk_resource.id_,
Expand Down
25 changes: 13 additions & 12 deletions tests/conftest.py
Expand Up @@ -9,18 +9,13 @@
"""Pytest configuration for REANA-DB."""


from datetime import datetime
from time import sleep
from datetime import datetime, timedelta
from uuid import uuid4

import mock
import pytest

from reana_db.models import (
Resource,
User,
Workflow,
RunStatus,
)
from reana_db.models import Resource, RunStatus, User, Workflow


@pytest.fixture(scope="module")
Expand All @@ -33,7 +28,7 @@ def db():


@pytest.fixture
def new_user(session):
def new_user(session, db):
"""Create new user."""
user = User(
email="{}@reana.io".format(uuid4()), access_token="secretkey-{}".format(uuid4())
Expand Down Expand Up @@ -64,9 +59,15 @@ def _run_workflow(time_elapsed_seconds=0.5):
workflow.status = RunStatus.running
session.add(workflow)
session.commit()
# simulate time elapsed
sleep(time_elapsed_seconds)
Workflow.update_workflow_status(session, workflow.id_, RunStatus.finished)
termination_value = datetime.now() + timedelta(seconds=time_elapsed_seconds)

class MockDatetime(datetime):
@classmethod
def now(cls):
return termination_value

with mock.patch("reana_db.models.datetime", MockDatetime):
Workflow.update_workflow_status(session, workflow.id_, RunStatus.finished)
return workflow

return _run_workflow

0 comments on commit 13bf989

Please sign in to comment.