Skip to content

Commit

Permalink
Add PHSKC result_value to FHIR Bundle under an Observation
Browse files Browse the repository at this point in the history
This change mimics how the UW Retrospective ETL handles external test results and
includes functionality to add the `result_value` field for PHSKC data into Observation
resources consistent with the FHIR format. Slight modifications were made to the existing
functions used for this pipeline in the `fhir` module for readability.
  • Loading branch information
bencap committed May 19, 2022
1 parent 7a3304f commit d9580c5
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 10 deletions.
4 changes: 2 additions & 2 deletions lib/seattleflu/id3c/cli/command/etl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def ethnicity(ethnicity: str) -> Optional[bool]:
>>> ethnicity("NOT HISPANIC OR LATINO")
False
>>> ethnicity("NULL") == None
True
Expand All @@ -163,7 +163,7 @@ def ethnicity(ethnicity: str) -> Optional[bool]:
else:
ethnicity = standardize_whitespace(ethnicity.lower())

# Leaving this code here to be implemented later. My original approach was to use FHIR
# Leaving this code here to be implemented later. My original approach was to use FHIR
# coding for ethnicity, which would be preffered, but for consistency with other ETLs
# I switched to ingesting ethnicity as a boolean. To transition to FHIR codes across all projects will
# require updating multiple ETLs, shipping views, and re-ingesting data. A card has been added
Expand Down
78 changes: 77 additions & 1 deletion lib/seattleflu/id3c/cli/command/etl/clinical.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
create_specimen,
find_sample_origin_by_barcode,
UnknownSampleOrigin,
UnknownTestResult,
create_encounter_class,
create_encounter_status,
)
Expand Down Expand Up @@ -43,7 +44,7 @@
UnknownAdmitICUResponseError,

)
from . import race, ethnicity
from . import race, ethnicity, standardize_whitespace
from .fhir import *
from .redcap_map import map_sex

Expand Down Expand Up @@ -277,6 +278,45 @@ def create_resident_locations(record: dict) -> Optional[tuple]:
return location_entries, location_references


def create_clinical_result_observation_resource(record: dict) -> Optional[List[dict]]:
"""
Determine the clinical results based on responses in *record* and
create observation resources for each result following the FHIR format
(http://www.hl7.org/implement/standards/fhir/observation.html)
"""
cov19_snomed_code = {
'system': 'http://snomed.info/sct',
'code': '871562009',
'display': 'Detection of SARS-CoV-2',
}

result = virology_test_result(record)

clinical_observation = observation_resource('clinical')
clinical_observation['id'] = 'result-1'
clinical_observation['code']['coding'] = [cov19_snomed_code]
clinical_observation['valueBoolean'] = result

return [clinical_observation] or None


def virology_test_result(record: dict) -> Optional[bool]:
"""
Given a *record* with a UW Virology test result, returns
boolean indicating the result of the rest
"""
result = standardize_whitespace(record['result_value']).lower()

if result == 'det':
return True
elif result == 'ndet':
return False
elif result == 'incon':
return None
else:
raise UnknownTestResult(result)


def create_questionnaire_response(record: dict, patient_reference: dict, encounter_reference: dict) -> Optional[dict]:
""" Returns a FHIR Questionnaire Response resource entry """
response_items = determine_questionnaire_items(record)
Expand Down Expand Up @@ -357,6 +397,19 @@ def generate_fhir_bundle(db: DatabaseSession, record: dict) -> Optional[dict]:

specimen_observation_entry = create_specimen_observation_entry(specimen_reference, patient_reference, encounter_reference)

diagnostic_code = create_codeable_concept(
system = f'{SFS}/presence-absence-panel',
code = test_coding(record['site'])
)

diagnostic_report_resource_entry = create_diagnostic_report(
record,
patient_reference,
specimen_reference,
diagnostic_code,
create_clinical_result_observation_resource,
)

resource_entries = [
patient_entry,
specimen_entry,
Expand All @@ -368,13 +421,36 @@ def generate_fhir_bundle(db: DatabaseSession, record: dict) -> Optional[dict]:
if location_entries:
resource_entries.extend(location_entries)

if diagnostic_report_resource_entry:
resource_entries.append(diagnostic_report_resource_entry)

return create_bundle_resource(
bundle_id = str(uuid4()),
timestamp = datetime.now().astimezone().isoformat(),
source = f"{record['_provenance']['filename']},row:{record['_provenance']['row']}" ,
entries = list(filter(None, resource_entries))
)

def test_coding(site_name: str) -> str:
"""
Given a *site_name*, returns the code associated with the external test run
on samples from this site.
"""
if not site_name:
LOG.debug("No site name found")
return "Unknown"

site_name = site_name.upper()

code_map = {
"PHSKC": "phskc-retrospective"
}

if site_name not in code_map:
raise UnknownSiteError(f"Unknown site name «{site_name}»")

return code_map[site_name]

def site_identifier(site_name: str) -> str:
"""
Given a *site_name*, returns its matching site identifier.
Expand Down
15 changes: 8 additions & 7 deletions lib/seattleflu/id3c/cli/command/etl/fhir.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ def create_patient_resource(patient_identifier: List[dict],
return patient_resource


def create_diagnostic_report(redcap_record:dict,
def create_diagnostic_report(record:dict,
patient_reference: dict,
specimen_reference: dict,
diagnostic_code: dict,
create_device_result_observation_resource: Callable) -> Optional[dict]:
create_device_result_observation_resources: Callable) -> Optional[dict]:
"""
Create FHIR diagnostic report from given *redcap_record*.
Create FHIR diagnostic report from given *record*.
Links the generated diagnostic report to a specific *patient_reference* and
*specimen_reference*.
Expand All @@ -77,7 +77,7 @@ def create_diagnostic_report(redcap_record:dict,
*create_device_result_observation_resource* function which attaches
observation resources to the diagnostic report.
"""
clinical_results = create_device_result_observation_resource(redcap_record)
clinical_results = create_device_result_observation_resources(record)
if not clinical_results:
return None

Expand All @@ -89,7 +89,7 @@ def create_diagnostic_report(redcap_record:dict,
)
diagnostic_result_references.append(reference)

collection_datetime = redcap_record['collection_date']
collection_datetime = record['collection_date']

diagnostic_report_resource = create_diagnostic_report_resource(
datetime = collection_datetime,
Expand Down Expand Up @@ -348,6 +348,7 @@ def create_specimen_observation(specimen_reference: dict,
"specimen": specimen_reference
}


def create_immunization_resource(patient_reference: dict,
immunization_identifier: List[dict],
immunization_date: str,
Expand All @@ -357,7 +358,7 @@ def create_immunization_resource(patient_reference: dict,
Create an immunization resource following the FHIR format
(https://www.hl7.org/fhir/immunization.html)
"""

return({
"resourceType": "Immunization",
"identifier": immunization_identifier,
Expand All @@ -369,7 +370,7 @@ def create_immunization_resource(patient_reference: dict,
}
})


def create_questionnaire_response_resource(patient_reference: dict,
encounter_reference: dict,
items: List[dict]) -> dict:
Expand Down

0 comments on commit d9580c5

Please sign in to comment.