/
datastore.py
470 lines (394 loc) · 17.9 KB
/
datastore.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
"""
Datastore abstraction for our database.
"""
import logging
import psycopg2
from functools import wraps
from psycopg2 import DataError, DatabaseError, IntegrityError, ProgrammingError
from psycopg2.errors import InsufficientPrivilege
from typing import Any
from uuid import UUID
from werkzeug.exceptions import Forbidden, NotFound
from .. import db
from ..db import find_identifier, upsert_sample
from ..db.session import DatabaseSession
from .exceptions import AuthenticationRequired, BadRequest
from .utils import export
# Module-level logger, named after this module via ``__name__``.
LOG = logging.getLogger(__name__)
def catch_permission_denied(function):
    """
    Decorator which converts database "insufficient privilege" errors into
    :class:`~werkzeug.exceptions.Forbidden` exceptions.

    When the wrapped *function* raises
    :class:`psycopg2.errors.InsufficientPrivilege`, the error is logged at
    error level and a bare ``Forbidden()`` is raised in its place.  Normal
    return values and all other exceptions pass through unchanged.
    """
    @wraps(function)
    def decorated(*args, **kwargs):
        try:
            outcome = function(*args, **kwargs)
        except InsufficientPrivilege as error:
            LOG.error("Forbidden: %s", error)
            raise Forbidden()
        return outcome

    return decorated
@export
def login(username: str, password: str) -> DatabaseSession:
    """
    Open a database session authenticated with *username* and *password*.

    The returned :class:`DatabaseSession` is the opaque session object that
    the other functions in this module take as their first argument.

    Raises :class:`AuthenticationRequired` if constructing the session fails
    with a :class:`psycopg2.DatabaseError`; the original database error is
    deliberately not chained onto the raised exception.
    """
    LOG.debug(f"Logging into PostgreSQL database as '{username}'")

    try:
        session = DatabaseSession(username = username, password = password)
    except DatabaseError:
        raise AuthenticationRequired() from None

    return session
@export
@catch_permission_denied
def store_enrollment(session: DatabaseSession, document: str) -> None:
    """
    Insert the enrollment JSON *document* (a **string**) into
    ``receiving.enrollment`` using *session*.

    Raises a :class:`BadRequestDatabaseError` exception when the insert fails
    with a :class:`psycopg2.DataError` or :class:`psycopg2.IntegrityError`,
    and a :class:`Forbidden` exception if the database reports a
    `permission denied` error.
    """
    insert_sql = "INSERT INTO receiving.enrollment (document) VALUES (%s)"

    with session:
        with session.cursor() as cursor:
            try:
                cursor.execute(insert_sql, (document,))
            except (DataError, IntegrityError) as db_error:
                raise BadRequestDatabaseError(db_error) from None
@export
@catch_permission_denied
def store_presence_absence(session: DatabaseSession, document: str) -> None:
    """
    Insert the presence/absence *document* (a **string**) into
    ``receiving.presence_absence`` using *session*.

    Raises a :class:`BadRequestDatabaseError` exception when the insert fails
    with a :class:`psycopg2.DataError` or :class:`psycopg2.IntegrityError`,
    and a :class:`Forbidden` exception if the database reports a
    `permission denied` error.
    """
    insert_sql = "insert into receiving.presence_absence (document) VALUES (%s)"

    with session:
        with session.cursor() as cursor:
            try:
                cursor.execute(insert_sql, (document,))
            except (DataError, IntegrityError) as db_error:
                raise BadRequestDatabaseError(db_error) from None
@export
@catch_permission_denied
def store_sequence_read_set(session: DatabaseSession, document: str) -> None:
    """
    Insert the sequence read set *document* (a **string**) into
    ``receiving.sequence_read_set`` using *session*.

    Raises a :class:`BadRequestDatabaseError` exception when the insert fails
    with a :class:`psycopg2.DataError` or :class:`psycopg2.IntegrityError`,
    and a :class:`Forbidden` exception if the database reports a
    `permission denied` error.
    """
    insert_sql = "insert into receiving.sequence_read_set (document) values (%s)"

    with session:
        with session.cursor() as cursor:
            try:
                cursor.execute(insert_sql, (document,))
            except (DataError, IntegrityError) as db_error:
                raise BadRequestDatabaseError(db_error) from None
@export
@catch_permission_denied
def store_consensus_genome(session: DatabaseSession, document: str) -> None:
    """
    Insert the consensus genome *document* (a **string**) into
    ``receiving.consensus_genome`` using *session*.

    Raises a :class:`BadRequestDatabaseError` exception when the insert fails
    with a :class:`psycopg2.DataError` or :class:`psycopg2.IntegrityError`,
    and a :class:`Forbidden` exception if the database reports a
    `permission denied` error.
    """
    insert_sql = "insert into receiving.consensus_genome (document) values (%s)"

    with session:
        with session.cursor() as cursor:
            try:
                cursor.execute(insert_sql, (document,))
            except (DataError, IntegrityError) as db_error:
                raise BadRequestDatabaseError(db_error) from None
@export
@catch_permission_denied
def store_redcap_det(session: DatabaseSession, document: str) -> None:
    """
    Insert the REDCap DET *document* (a **string**) into
    ``receiving.redcap_det`` using *session*.

    Raises a :class:`BadRequestDatabaseError` exception when the insert fails
    with a :class:`psycopg2.DataError` or :class:`psycopg2.IntegrityError`,
    and a :class:`Forbidden` exception if the database reports a
    `permission denied` error.
    """
    insert_sql = "insert into receiving.redcap_det (document) values (%s)"

    with session:
        with session.cursor() as cursor:
            try:
                cursor.execute(insert_sql, (document,))
            except (DataError, IntegrityError) as db_error:
                raise BadRequestDatabaseError(db_error) from None
@export
@catch_permission_denied
def store_fhir(session: DatabaseSession, document: str) -> None:
    """
    Insert the FHIR *document* (a **string**) into ``receiving.fhir`` using
    *session*.

    Raises a :class:`BadRequestDatabaseError` exception when the insert fails
    with a :class:`psycopg2.DataError` or :class:`psycopg2.IntegrityError`,
    and a :class:`Forbidden` exception if the database reports a
    `permission denied` error.
    """
    insert_sql = "insert into receiving.fhir (document) values (%s)"

    with session:
        with session.cursor() as cursor:
            try:
                cursor.execute(insert_sql, (document,))
            except (DataError, IntegrityError) as db_error:
                raise BadRequestDatabaseError(db_error) from None
@export
@catch_permission_denied
def verify_barcode_use_list(session: DatabaseSession, barcode_use_list: list) -> Any:
    """
    Check the given *barcode_use_list* containing objects with ``barcode`` and
    ``use`` keys to verify that each barcode exists in the backing database
    and that the given use matches the stored use.

    Returns the rows produced by the verification query, one per input
    object, each with the ``barcode`` and ``use`` being verified,
    ``barcode_found`` (boolean) indicating whether the given barcode exists,
    and ``use_match`` (boolean) indicating whether the given use matches the
    stored use.  ``use_match`` is `null` when no identifier set use is joined
    for the barcode, which includes the case where the barcode does not exist.

    An empty (or ``None``) *barcode_use_list* returns an empty list without
    touching the database.

    Raises a :class:`KeyError` if an input object lacks a ``barcode`` or
    ``use`` key, and a :class:`Forbidden` exception if the database reports a
    `permission denied` error.
    """
    # ``from (values ) as q`` is not valid SQL, so with nothing to verify the
    # query cannot be built; answer directly instead of erroring out.
    # NOTE(review): assumes session.fetch_all() returns a list — confirm.
    if not barcode_use_list:
        return []

    barcode_use_tuples = [(bu["barcode"], bu["use"]) for bu in barcode_use_list]

    # One ``%s`` per (barcode, use) pair; each pair is passed as a single
    # tuple parameter and so fills one row of the VALUES list.
    args_str = ','.join(['%s'] * len(barcode_use_tuples))

    sql = "select q.barcode, q.use, \
        case \
        when identifier.barcode is not null then true else false \
        end as barcode_found, \
        case \
        when identifier_set.use IS NULL then null \
        when q.use::citext=identifier_set.use then true \
        else false \
        end as use_match \
        from (values {}) as q (barcode, use) \
        left join warehouse.identifier on q.barcode::citext = identifier.barcode \
        left join warehouse.identifier_set using (identifier_set_id)".format(args_str)

    return session.fetch_all(sql, tuple(barcode_use_tuples))
@export
@catch_permission_denied
def fetch_identifier(session: DatabaseSession, id: str) -> Any:
    """
    Fetch the identifier *id* from the backing database using *session*.

    *id* may be a full UUID or shortened barcode.  Anything that
    :class:`uuid.UUID` can parse is treated as a UUID and matched against the
    ``uuid`` column in its canonical hyphenated form; everything else is
    matched verbatim against the ``barcode`` column.

    Returns the matching row, with ``uuid``, ``barcode``, ``generated``,
    ``set``, and ``use`` columns.  If the identifier doesn't exist, raises a
    :class:`~werkzeug.exceptions.NotFound` exception.  Raises a
    :class:`Forbidden` exception if the database reports a
    `permission denied` error.
    """
    try:
        # Query with the canonical form: Python's UUID parser accepts
        # spellings (e.g. a "urn:uuid:" prefix) that PostgreSQL's uuid input
        # does not, so sending the raw string could fail inside the database.
        lookup_value = str(UUID(id))
        id_field = "uuid"
    except ValueError:
        lookup_value = id
        id_field = "barcode"

    # id_field is one of the two literals chosen above, never caller input,
    # so interpolating it into the SQL text is safe.
    with session:
        identifier = session.fetch_row(f"""
            select uuid, barcode, generated, identifier_set.name as set, identifier_set.use
              from warehouse.identifier
              join warehouse.identifier_set using (identifier_set_id)
             where {id_field} = %s
            """, (lookup_value,))

    if not identifier:
        # Report the value exactly as the caller supplied it.
        LOG.error(f"Identifier {id_field} «{id}» not found")
        raise NotFound(f"Identifier {id_field} «{id}» not found")

    return identifier
@export
@catch_permission_denied
def fetch_identifier_sets(session: DatabaseSession) -> Any:
    """
    Fetch every row of ``warehouse.identifier_set`` using *session*.

    Returns a list of rows, each carrying the ``name``, ``description``, and
    ``use`` columns.  Raises a :class:`Forbidden` exception if the database
    reports a `permission denied` error.
    """
    query = """
        select name, description, use
          from warehouse.identifier_set
        """

    with session:
        with session.cursor() as cursor:
            cursor.execute(query)
            identifier_sets = list(cursor)

    return identifier_sets
@export
@catch_permission_denied
def fetch_identifier_set(session: DatabaseSession, name: str) -> Any:
    """
    Look up the identifier set called *name* in the backing database using
    *session*.

    Returns the matching row with ``name``, ``description``, and ``use``
    columns.  If no such set exists, the miss is logged and a
    :class:`~werkzeug.exceptions.NotFound` exception is raised.  Raises a
    :class:`Forbidden` exception if the database reports a
    `permission denied` error.
    """
    query = """
        select name, description, use
          from warehouse.identifier_set
         where name = %s
        """

    with session:
        identifier_set = session.fetch_row(query, (name,))

    if identifier_set:
        return identifier_set

    LOG.error(f"Identifier set «{name}» not found")
    raise NotFound(f"Identifier set «{name}» not found")
@export
@catch_permission_denied
def make_identifier_set(session: DatabaseSession, name: str, **fields) -> bool:
    """
    Upsert the identifier set *name* in the backing database using *session*:
    create it if it is missing, otherwise update it when something differs.

    The optional keyword arguments *use* and *description* select which
    columns are written.  An empty-string *description* is stored as null.
    Because *use* is a required field in the target table, when it is not
    given the statement reuses the value from an existing record of the same
    name.

    Returns ``True`` when exactly one row was inserted or updated and
    ``False`` when the set already existed and nothing needed changing.

    Raises a :class:`BadRequestDatabaseError` exception if the statement
    fails with a :class:`psycopg2.DataError` or
    :class:`psycopg2.IntegrityError`, and a :class:`Forbidden` exception if
    the database reports a `permission denied` error.
    """
    has_use = "use" in fields
    has_description = "description" in fields

    # Pick the statement and its parameters first; all four variants are then
    # executed and error-handled the same way below.
    if has_use and has_description:
        statement = """
            insert into warehouse.identifier_set (name, use, description)
                values (%s, %s, nullif(%s, ''))
                on conflict (name) do update
                    set use = excluded.use,
                        description = excluded.description
                    where identifier_set.use <> excluded.use
                        or coalesce(identifier_set.description,'') <> coalesce(excluded.description,'')
            """
        parameters = (name, fields["use"], fields["description"])
    elif has_use:
        statement = """
            insert into warehouse.identifier_set (name, use)
                values (%s, %s)
                on conflict (name) do update
                    set use = excluded.use
                    where identifier_set.use <> excluded.use
            """
        parameters = (name, fields["use"])
    elif has_description:
        # No use supplied: borrow it from the existing row, if there is one.
        statement = """
            insert into warehouse.identifier_set (name, use, description)
                select s.name, t.use, s.description
                from (values(%s, nullif(%s,''))) s(name, description)
                left join (
                    select name, use
                    FROM warehouse.identifier_set WHERE name = %s
                ) t using (name)
                on conflict (name) do update
                    set use = excluded.use, description = excluded.description
                    where identifier_set.use <> excluded.use
                        or coalesce(identifier_set.description,'') <> coalesce(excluded.description,'')
            """
        parameters = (name, fields["description"], name)
    else:
        # Neither field supplied: borrow use from the existing row, if any.
        statement = """
            insert into warehouse.identifier_set (name, use)
                select s.name, t.use
                from (values(%s)) s(name)
                left join (
                    select name, use
                    FROM warehouse.identifier_set WHERE name = %s
                ) t using (name)
                on conflict (name) do update
                    set use = excluded.use
                    where identifier_set.use <> excluded.use
            """
        parameters = (name, name)

    with session:
        with session.cursor() as cursor:
            try:
                cursor.execute(statement, parameters)
            except (DataError, IntegrityError) as db_error:
                raise BadRequestDatabaseError(db_error) from None

            # Read the row count while the cursor is still open.
            changed = cursor.rowcount == 1

    return changed
@export
@catch_permission_denied
def fetch_identifier_set_uses(session: DatabaseSession) -> Any:
    """
    Fetch every row of ``warehouse.identifier_set_use`` using *session*.

    Returns a list of rows, each carrying the ``use`` and ``description``
    columns.  Raises a :class:`Forbidden` exception if the database reports a
    `permission denied` error.
    """
    query = """
        select use, description
          from warehouse.identifier_set_use
        """

    with session:
        with session.cursor() as cursor:
            cursor.execute(query)
            uses = list(cursor)

    return uses
@export
@catch_permission_denied
def store_sample(session: DatabaseSession, sample: dict) -> Any:
    """
    Validate the given *sample* and insert or update it in the backing
    database using *session*.

    The ``sample_id`` and ``collection_id`` entries of *sample* are treated as
    barcodes and looked up with ``find_identifier``; all remaining entries,
    plus a ``date`` entry copied from ``collection_date``, are passed to
    ``upsert_sample`` as additional details.  The caller's *sample* dict is
    not modified.

    Returns a dict with these keys:

    * ``sample_barcode`` and ``collection_barcode``: the barcodes taken from
      *sample* (``None`` when absent).
    * ``status``: ``"validation_failed"`` when a barcode is unknown or has
      the wrong use type, ``"upsert_error"`` when ``upsert_sample`` raised,
      otherwise the status value returned by ``upsert_sample``.
    * ``details``: the reason, present only for the two failure statuses.
    * ``sample``: the record returned by ``upsert_sample``, present only on
      success.

    Raises a :class:`Forbidden` exception if the database reports a
    `permission denied` error during the barcode lookups.
    """
    # Work on a shallow copy so popping the barcode keys and adding "date"
    # below never leaks back into the caller's dict.
    details = dict(sample)

    with session:
        sample_barcode = details.pop("sample_id", None)
        sample_identifier = find_identifier(session, sample_barcode) if sample_barcode else None

        collection_barcode = details.pop("collection_id", None)
        collection_identifier = find_identifier(session, collection_barcode) if collection_barcode else None

        result = {
            "sample_barcode": sample_barcode,
            "collection_barcode": collection_barcode
        }

        # Validate barcodes; only the first problem found is reported.
        if sample_barcode and not sample_identifier:
            result["status"] = "validation_failed"
            result["details"] = f"sample barcode «{sample_barcode}» not found"
        elif sample_identifier and sample_identifier.set_use != 'sample':
            result["status"] = "validation_failed"
            result["details"] = f"barcode «{sample_barcode}» has use type «{sample_identifier.set_use}» instead of expected use type «sample»"
        elif collection_barcode and not collection_identifier:
            result["status"] = "validation_failed"
            result["details"] = f"collection barcode «{collection_barcode}» not found"
        elif collection_identifier and collection_identifier.set_use != 'collection':
            result["status"] = "validation_failed"
            result["details"] = f"barcode «{collection_barcode}» has use type «{collection_identifier.set_use}» instead of expected use type «collection»"

        if result.get("status") == "validation_failed":
            LOG.debug(f"Validation failed for {details} with details: {result.get('details')}")
            return result

        collected_date = details.get("collection_date", None)

        # Add date to sample so that it gets written to the 'details' column in warehouse.sample
        details["date"] = collected_date

        # When updating an existing row, update the identifiers only if the record has both
        # the 'sample_barcode' and 'collection_barcode' keys
        should_update_identifiers = bool(sample_identifier and collection_identifier)

        try:
            upserted_sample, status = upsert_sample(session,
                update_identifiers      = should_update_identifiers,
                identifier              = sample_identifier.uuid if sample_identifier else None,
                collection_identifier   = collection_identifier.uuid if collection_identifier else None,
                collection_date         = collected_date,
                encounter_id            = None,
                additional_details      = details)

            result["sample"] = upserted_sample
            result["status"] = status

        except Exception as e:
            # Deliberately broad: any upsert failure is reported back in the
            # result instead of propagating to the caller.
            result["status"] = "upsert_error"
            result["details"] = f"error upserting sample record: {str(e)}"
            LOG.debug(f"Error on upsert_sample: {str(e)}")

    return result
@export
class BadRequestDatabaseError(BadRequest):
    """
    A :class:`id3c.api.exceptions.BadRequest` built from a
    :class:`psycopg2.DatabaseError`.

    The database's primary message becomes the error text, and the
    diagnostic detail and context are attached as extra information.  Only
    these diagnostics fields are passed along; the failing query itself is
    intentionally never exposed.
    """
    def __init__(self, error: DatabaseError) -> None:
        diagnostics = error.diag

        super().__init__(
            error = diagnostics.message_primary,
            extra = {
                "detail": diagnostics.message_detail,
                "context": diagnostics.context,
            },
        )