Skip to content

Commit

Permalink
etl redcap-det: store REDCap records as a list instead of single dict
Browse files Browse the repository at this point in the history
This supports REDCap projects with repeating instruments and/or events.
The REDCap API returns multiple instances per record id if there are
repeating instruments/events in the project. Group all of these instances
into a single list that then gets passed to the custom ETL.
How these multiple instances get processed will be completely decided
by the custom ETL since they are expected to be handled differently per
project.

Also remove `required_instrument` parameter from core.
By supporting multiple record instances per record, the core ETL
decorator no longer has oversight of whether required instruments are
complete. This responsibility is now pushed to the custom REDCap ETLs.
  • Loading branch information
joverlee521 committed Sep 8, 2020
1 parent c0a517f commit 4012200
Showing 1 changed file with 12 additions and 37 deletions.
49 changes: 12 additions & 37 deletions lib/id3c/cli/command/etl/redcap_det.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
import os
import click
import logging
from collections import defaultdict
from datetime import datetime, timezone
from functools import wraps
from textwrap import dedent
from typing import Callable, Iterable, Optional, Tuple, Dict, List, Any
from typing import Callable, Iterable, Optional, Tuple, Dict, List, Any, DefaultDict
from urllib.parse import urljoin
from id3c.cli.command import with_database_session
from id3c.cli.redcap import is_complete, Project
Expand Down Expand Up @@ -44,16 +45,15 @@ def command_for_project(name: str,
redcap_url: str,
project_id: int,
revision: int,
required_instruments: Iterable[str] = [],
include_incomplete: bool = False,
raw_coded_values: bool = False,
**kwargs) -> Callable[[Callable], click.Command]:
"""
Decorator to create REDCap DET ETL subcommands.
The decorated function should be an ETL routine for an individual DET and
REDCap record pair. It must take take two dictionaries, *det* and
*redcap_record*, as arguments. The function must return another dictionary
REDCap record pair. It must accept a dictionary *det* and a list
*redcap_record_instances* as arguments. The function must return another dictionary
which represents a FHIR document to insert into ``receiving.fhir``. If no
FHIR document is appropriate, the function should return ``None``.
Expand All @@ -69,9 +69,6 @@ def command_for_project(name: str,
for DETs where the instrument is marked incomplete. When false (default),
the ETL skips DETs where the instrument is marked incomplete.
*required_instruments* is an optional list of REDCap instrument names which
are required for the decorated routine to run.
*revision* is an integer specifying the version of the routine. If it
increments, previously processed DETs will be re-processed by the new
version of the routine.
Expand Down Expand Up @@ -169,20 +166,12 @@ def decorated(*args, db: DatabaseSession, log_output: bool, **kwargs):
LOG.info(f"Fetching {len(record_ids)} REDCap records from project {project.id}")

# Convert list of REDCap records to a dict so that
# records can be looked up by record id
# XXX TODO: Handle records with repeating instruments or longitudinal
# events.
redcap_records: Dict[str, dict] = {}
# records can be looked up by record id.
# Records with repeating instruments or longitudinal
# events will have multiple entries in the list.
redcap_records: DefaultDict[str, List[dict]] = defaultdict(list)
for record in project.records(ids = record_ids, raw = raw_coded_values):
if not redcap_records.get(record.id):
redcap_records[record.id] = record
else:
LOG.warning(dedent(f"""
Found duplicate record id «{record.id}» in project {project_id}.
Duplicate record ids are commonly due to repeating instruments/longitudinal events in REDCap,
which the redcap-det ETL is currently unable to handle.
If your REDCap project does not have repeating instruments or longitudinal events,
then this may be caused by a bug in REDCap."""))
redcap_records[record.id].append(record)

# Process all DETs in order of redcap_det_id
with pickled_cache(CACHE_FILE) as cache:
Expand All @@ -196,28 +185,14 @@ def decorated(*args, db: DatabaseSession, log_output: bool, **kwargs):
continue

received_det = first_complete_dets.pop(det["record_id"])
redcap_record = redcap_records.get(received_det.document["record"])
redcap_record_instances = redcap_records.get(received_det.document["record"])

if not redcap_record:
if not redcap_record_instances:
LOG.debug(f"REDCap record is missing or invalid. Skipping REDCap DET {received_det.id}")
mark_skipped(db, received_det.id, etl_id, "invalid REDCap record")
continue

# Only process REDCap record if all required instruments are complete
incomplete_instruments = {
instrument
for instrument
in required_instruments
if not is_complete(instrument, redcap_record)
}

if incomplete_instruments:
LOG.debug(f"The following required instruments «{incomplete_instruments}» are not yet marked complete. " + \
f"Skipping REDCap DET {received_det.id}")
mark_skipped(db, received_det.id, etl_id, "required instruments incomplete")
continue

bundle = routine(db = db, cache = cache, det = received_det, redcap_record = redcap_record)
bundle = routine(db = db, cache = cache, det = received_det, redcap_record_instances = redcap_record_instances)

if not bundle:
LOG.debug(f"Skipping REDCap DET {received_det.id} due to insufficient data in REDCap record.")
Expand Down

0 comments on commit 4012200

Please sign in to comment.