Skip to content

Commit

Permalink
models: calculate disk workflow resource usage
Browse files Browse the repository at this point in the history
closes #93
  • Loading branch information
mvidalgarcia committed Oct 19, 2020
1 parent f2360fa commit eb76b1f
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 30 deletions.
74 changes: 53 additions & 21 deletions reana_db/models.py
Expand Up @@ -16,9 +16,6 @@
from datetime import datetime

from reana_commons.utils import get_disk_usage
from reana_db.config import DB_SECRET_KEY, DEFAULT_QUOTA_RESOURCES
from reana_db.utils import build_workspace_path, update_users_disk_quota
from requests.sessions import session
from sqlalchemy import (
BigInteger,
Boolean,
Expand All @@ -40,6 +37,14 @@
from sqlalchemy_utils.models import Timestamp
from sqlalchemy_utils.types.encrypted.encrypted_type import AesEngine

from reana_db.config import DB_SECRET_KEY, DEFAULT_QUOTA_RESOURCES
from reana_db.utils import (
build_workspace_path,
store_workflow_disk_quota,
get_default_quota_resource,
update_users_disk_quota,
)

Base = declarative_base()


Expand Down Expand Up @@ -508,7 +513,36 @@ def get_full_workflow_name(self):

def get_workspace_disk_usage(self, summarize=False, block_size=None):
"""Retrieve disk usage information of a workspace."""
return get_disk_usage(self.workspace_path, summarize, block_size)
from .database import Session

def _format_disk_usage(bytes_, block_size):
"""Format disk usage according to passed args."""
if block_size:
# TODO: Change when https://github.com/reanahub/reana-server/issues/296
# gets implemented.
size = (
f"{bytes_}B" if block_size == "b" else f"{round(bytes_/1024, 2)}K"
)
else:
size = str(bytes_)
return [{"name": "", "size": size}]

if not summarize:
# size break down per directory so we can't query DB (`r-client du`)
return get_disk_usage(self.workspace_path, summarize, block_size)

disk_resource = get_default_quota_resource(ResourceType.disk.name)
disk_bytes = (
Session.query(WorkflowResource.quantity_used)
.filter_by(workflow_id=self.id_, resource_id=disk_resource.id_)
.scalar()
)
if disk_bytes:
return _format_disk_usage(disk_bytes, block_size)

# recalculate disk workflow resource
workflow_resource = store_workflow_disk_quota(self)
return _format_disk_usage(workflow_resource.quantity_used, block_size)

@staticmethod
def update_workflow_status(
Expand Down Expand Up @@ -545,6 +579,8 @@ def can_transition_to(self, next_status):
@event.listens_for(Workflow.status, "set")
def workflow_status_change_listener(workflow, new_status, old_status, initiator):
"""Workflow status change listener."""
from .database import Session

if new_status in [
RunStatus.finished,
RunStatus.failed,
Expand All @@ -562,24 +598,20 @@ def workflow_status_change_listener(workflow, new_status, old_status, initiator)
if workflow.run_started_at and finished_at:
cpu_time = finished_at - workflow.run_started_at
cpu_milliseconds = int(cpu_time.total_seconds() * 1000)
cpu_resource = Resource.query.filter_by(
name=DEFAULT_QUOTA_RESOURCES["cpu"]
).one_or_none()
if cpu_resource:
from .database import Session

workflow_resource = WorkflowResource(
workflow_id=workflow.id_,
resource_id=cpu_resource.id_,
quantity_used=cpu_milliseconds,
)
user_resource_quota = UserResource.query.filter_by(
user_id=workflow.owner_id, resource_id=cpu_resource.id_
).first()
user_resource_quota.quota_used += cpu_milliseconds
Session.add(workflow_resource)
Session.commit()
cpu_resource = get_default_quota_resource(ResourceType.cpu.name)
workflow_resource = WorkflowResource(
workflow_id=workflow.id_,
resource_id=cpu_resource.id_,
quantity_used=cpu_milliseconds,
)
user_resource_quota = UserResource.query.filter_by(
user_id=workflow.owner_id, resource_id=cpu_resource.id_
).first()
user_resource_quota.quota_used += cpu_milliseconds
Session.add(workflow_resource)
Session.commit()
update_users_disk_quota(user=workflow.owner)
store_workflow_disk_quota(workflow)

return new_status

Expand Down
50 changes: 50 additions & 0 deletions reana_db/utils.py
Expand Up @@ -210,3 +210,53 @@ def update_users_disk_quota(user=None):
).first()
user_resource_quota.quota_used = disk_usage_bytes
Session.commit()


def get_default_quota_resource(resource_type):
"""
Get default quota resource by given resource type.
:param resource_type: Resource type corresponding to default resource to get.
:type resource_type: reana_db.models.ResourceType
"""
from reana_db.config import DEFAULT_QUOTA_RESOURCES
from reana_db.models import Resource

if resource_type not in DEFAULT_QUOTA_RESOURCES.keys():
raise Exception(f"Default resource of type {resource_type} does not exist.")

return Resource.query.filter_by(name=DEFAULT_QUOTA_RESOURCES[resource_type]).one()


def store_workflow_disk_quota(workflow):
"""
Update or create disk workflow resource.
:param workflow: Workflow whose disk resource usage must be calculated.
:type workflow: reana_db.models.Workflow
"""
from reana_commons.utils import get_disk_usage

from reana_db.database import Session
from reana_db.models import ResourceType, WorkflowResource

disk_resource = get_default_quota_resource(ResourceType.disk.name)
workflow_resource = (
Session.query(WorkflowResource)
.filter_by(workflow_id=workflow.id_, resource_id=disk_resource.id_)
.one_or_none()
)

disk_bytes = get_disk_usage(workflow.workspace_path, summarize=True, block_size="b")
disk_bytes = int(disk_bytes[0]["size"])
if workflow_resource:
workflow_resource.quantity_used = disk_bytes
else:
workflow_resource = WorkflowResource(
workflow_id=workflow.id_,
resource_id=disk_resource.id_,
quantity_used=disk_bytes,
)
Session.add(workflow_resource)
Session.commit()
return workflow_resource
4 changes: 0 additions & 4 deletions tests/conftest.py
Expand Up @@ -15,13 +15,9 @@

import pytest

from reana_db.config import DEFAULT_QUOTA_RESOURCES
from reana_db.models import (
Resource,
ResourceType,
ResourceUnit,
User,
UserResource,
Workflow,
RunStatus,
)
Expand Down
9 changes: 4 additions & 5 deletions tests/test_models.py
Expand Up @@ -13,19 +13,20 @@
import pytest
from mock import patch

from reana_db.config import DEFAULT_QUOTA_RESOURCES
from reana_db.models import (
ALLOWED_WORKFLOW_STATUS_TRANSITIONS,
AuditLogAction,
Resource,
ResourceUnit,
ResourceType,
UserTokenStatus,
UserTokenType,
Workflow,
WorkflowResource,
RunStatus,
)

from reana_db.utils import get_default_quota_resource


def test_workflow_run_number_assignment(db, session, new_user):
"""Test workflow run number assignment."""
Expand Down Expand Up @@ -197,9 +198,7 @@ def test_workflow_cpu_quota_usage_update(db, session, run_workflow):
"""Test quota usage update once workflow is finished/stopped/failed."""
time_elapsed_seconds = 0.5
workflow = run_workflow(time_elapsed_seconds=time_elapsed_seconds)
cpu_resource = Resource.query.filter_by(
name=DEFAULT_QUOTA_RESOURCES["cpu"]
).one_or_none()
cpu_resource = get_default_quota_resource(ResourceType.cpu.name)
cpu_milliseconds = (
WorkflowResource.query.filter_by(
workflow_id=workflow.id_, resource_id=cpu_resource.id_
Expand Down

0 comments on commit eb76b1f

Please sign in to comment.