/
utils.py
262 lines (208 loc) · 8.7 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2018, 2019 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""REANA-DB utils."""
import os
from uuid import UUID
def build_workspace_path(user_id, workflow_id=None):
    """Build the relative path of a user's (or a workflow's) workspace.

    :param user_id: Owner of the workspace.
    :param workflow_id: Optional parameter; when truthy, the returned path
        points to that workflow's workspace instead of the user's
        ``workflows`` directory.
    :return: String that represents the workspace relative path,
        i.e. users/0000/workflows/0034
    """
    parts = ["users", str(user_id), "workflows"]
    # A falsy workflow id (None, "", 0) means "user-level path only".
    if workflow_id:
        parts.append(str(workflow_id))
    return os.path.join(*parts)
def _get_workflow_with_uuid_or_name(uuid_or_name, user_uuid):
    """Get Workflow from database with uuid or name.

    :param uuid_or_name: String representing either a workflow UUID or a
        workflow name, optionally followed by ``.<run_number>``. Only ASCII
        input is accepted. The string is split on its *first* dot. If the
        part after that dot parses as a finite number, it is used as the run
        number (e.g. ``'myanalysis.12'`` or ``'myanalysis.1.2'``). Otherwise
        the whole string is treated as the workflow name (e.g.
        ``'reana.workflow.123'``). When no run number is given, the workflow
        with the latest run number of all the user's workflows with that
        name is returned.
    :param user_uuid: Owner id used to restrict name-based lookups.
    :type uuid_or_name: String
    :rtype: reana-db.models.Workflow
    :raises ValueError: If ``uuid_or_name`` is empty or not ASCII, or if no
        matching workflow exists.
    """
    import math

    from reana_db.models import Workflow

    # Check existence
    if not uuid_or_name:
        raise ValueError("No Workflow was specified.")

    # Check validity
    try:
        uuid_or_name.encode("ascii")
    except UnicodeEncodeError:
        # `uuid_or_name` contains something else than just ASCII.
        raise ValueError("Workflow name {} is not valid.".format(uuid_or_name))

    # Check whether the input parses as a UUID. Note that `version=4` makes
    # `UUID` overwrite the version bits rather than validate them, so any
    # well-formed 32-hex-digit UUID string is accepted here.
    try:
        is_uuid = UUID("{" + uuid_or_name + "}", version=4)
    except (TypeError, ValueError):
        is_uuid = None

    if is_uuid is not None:
        # UUIDs are expected to be unique, so search with it directly.
        # NOTE(review): this lookup does not filter by `user_uuid`; confirm
        # that callers verify ownership of the returned workflow.
        return _get_workflow_by_uuid(uuid_or_name)

    # Not a UUID: expect `<name>` or `<name>.<run_number>`. Splitting on the
    # first dot keeps dotted run numbers (e.g. 'name.1.2') intact:
    #   'workflow'          -> no dot, search by name 'workflow'
    #   'workflow.'         -> empty run number, search by name 'workflow'
    #   'workflow.123'      -> name 'workflow', run number 123
    #   'workflow.1.2'      -> name 'workflow', run number 1.2
    #   'workflow.name.123' -> 'name.123' is not a number, search by the
    #                          full name 'workflow.name.123'
    workflow_name, separator, run_number = uuid_or_name.partition(".")
    if not separator:
        return _get_workflow_by_name(uuid_or_name, user_uuid)
    if not run_number:
        return _get_workflow_by_name(workflow_name, user_uuid)

    try:
        parsed_run_number = float(run_number)
    except ValueError:
        parsed_run_number = None
    # `float()` also accepts 'nan', 'inf' and 'infinity'; those are never run
    # numbers, so treat the whole dot-separated string as a workflow name.
    if parsed_run_number is None or not math.isfinite(parsed_run_number):
        return _get_workflow_by_name(uuid_or_name, user_uuid)

    # Name + run number + owner identify a single workflow.
    workflow = Workflow.query.filter(
        Workflow.name == workflow_name,
        Workflow.run_number == parsed_run_number,
        Workflow.owner_id == user_uuid,
    ).one_or_none()
    if not workflow:
        # Report the full value the user supplied, including the run number.
        raise ValueError(
            "REANA_WORKON is set to {0}, but "
            "that workflow does not exist. "
            "Please set your REANA_WORKON environment "
            "variable appropriately.".format(uuid_or_name)
        )
    return workflow
def _get_workflow_by_name(workflow_name, user_uuid):
    """Return the user's workflow named `workflow_name` with the highest run number.

    Only use when you are sure that `workflow_name` is not a UUIDv4.

    :param workflow_name: Name shared by the workflow runs to search.
    :param user_uuid: Owner id; only this user's workflows are considered.
    :rtype: reana-db.models.Workflow
    :raises ValueError: If the user owns no workflow with this name.
    """
    from reana_db.models import Workflow

    same_name_runs = Workflow.query.filter(
        Workflow.name == workflow_name, Workflow.owner_id == user_uuid
    )
    latest_run = same_name_runs.order_by(Workflow.run_number.desc()).first()
    if latest_run:
        return latest_run
    raise ValueError(
        "REANA_WORKON is set to {0}, but "
        "that workflow does not exist. "
        "Please set your REANA_WORKON environment "
        "variable appropriately.".format(workflow_name)
    )
def _get_workflow_by_uuid(workflow_uuid):
    """Fetch the Workflow whose id equals `workflow_uuid`.

    The lookup filters on the id only; it does not check the owner.

    :param workflow_uuid: UUIDv4 of a Workflow.
    :type workflow_uuid: String representing a valid UUIDv4.
    :rtype: reana-db.models.Workflow
    :raises ValueError: If no workflow has this id.
    """
    from reana_db.models import Workflow

    match = Workflow.query.filter(Workflow.id_ == workflow_uuid).first()
    if match:
        return match
    raise ValueError(
        "REANA_WORKON is set to {0}, but "
        "that workflow does not exist. "
        "Please set your REANA_WORKON environment "
        "variable appropriately.".format(workflow_uuid)
    )
def update_users_disk_quota(user=None):
    """Update users disk quota usage.

    For each user, measures the size in bytes of the user's workspace and
    stores it in ``quota_used`` of the user's ``UserResource`` row for the
    default disk resource, committing after each user. If the default disk
    resource is not registered, nothing is measured or updated.

    :param user: User whose disk quota will be updated. If None, applies to all users.
    :type user: reana_db.models.User
    """
    from reana_commons.utils import get_disk_usage
    from reana_db.config import DEFAULT_QUOTA_RESOURCES
    from reana_db.database import Session
    from reana_db.models import Resource, User, UserResource

    # The disk resource is the same for every user, so look it up once.
    # Without it there is nothing to update, so skip the (potentially
    # expensive) disk usage computation altogether.
    disk_resource = Resource.query.filter_by(
        name=DEFAULT_QUOTA_RESOURCES["disk"]
    ).one_or_none()
    if not disk_resource:
        return

    users = [user] if user else User.query.all()
    for u in users:
        workspace_path = u.get_user_workspace()
        disk_usage = get_disk_usage(workspace_path, summarize=True, block_size="b")
        # With `summarize=True` the first entry holds the total size.
        disk_usage_bytes = int(disk_usage[0]["size"])
        user_resource_quota = UserResource.query.filter_by(
            user_id=u.id_, resource_id=disk_resource.id_
        ).first()
        # NOTE(review): `.first()` may return None if the user has no disk
        # UserResource row, which makes the next line raise AttributeError;
        # confirm every user is guaranteed to have one.
        user_resource_quota.quota_used = disk_usage_bytes
        Session.commit()
def get_default_quota_resource(resource_type):
    """Get default quota resource by given resource type.

    :param resource_type: Name of the resource type whose default resource
        should be returned, i.e. a key of ``DEFAULT_QUOTA_RESOURCES`` such as
        ``ResourceType.disk.name``.
    :type resource_type: str
    :return: The ``Resource`` named after the configured default for
        ``resource_type``.
    :rtype: reana_db.models.Resource
    :raises ValueError: If no default resource is configured for
        ``resource_type``. (``.one()`` additionally raises if the configured
        resource is missing from, or duplicated in, the database —
        presumably SQLAlchemy's ``NoResultFound``/``MultipleResultsFound``.)
    """
    from reana_db.config import DEFAULT_QUOTA_RESOURCES
    from reana_db.models import Resource

    if resource_type not in DEFAULT_QUOTA_RESOURCES:
        # ValueError subclasses Exception, so existing `except Exception`
        # handlers keep working.
        raise ValueError(f"Default resource of type {resource_type} does not exist.")
    return Resource.query.filter_by(name=DEFAULT_QUOTA_RESOURCES[resource_type]).one()
def store_workflow_disk_quota(workflow):
    """Record the current disk usage of a workflow's workspace.

    Sets ``quantity_used`` of the workflow's ``WorkflowResource`` row for the
    default disk resource to the workspace size in bytes. If the row does not
    exist yet it is created. The session is committed in both cases.

    :param workflow: Workflow whose disk resource usage must be calculated.
    :type workflow: reana_db.models.Workflow
    :return: The updated or newly created disk workflow resource.
    :rtype: reana_db.models.WorkflowResource
    """
    from reana_commons.utils import get_disk_usage
    from reana_db.database import Session
    from reana_db.models import ResourceType, WorkflowResource

    disk = get_default_quota_resource(ResourceType.disk.name)
    record = (
        Session.query(WorkflowResource)
        .filter_by(workflow_id=workflow.id_, resource_id=disk.id_)
        .one_or_none()
    )
    usage = get_disk_usage(workflow.workspace_path, summarize=True, block_size="b")
    used_bytes = int(usage[0]["size"])

    if not record:
        record = WorkflowResource(
            workflow_id=workflow.id_,
            resource_id=disk.id_,
            quantity_used=used_bytes,
        )
        Session.add(record)
    else:
        record.quantity_used = used_bytes
    Session.commit()
    return record