# Fetch every row from the upstream pathology view.
# NOTE: the query has no ORDER BY clause, so rows that belong to the
# same lab test are not guaranteed to arrive next to each other.
GET_ALL_RESULTS = """
    SELECT * FROM tQuest.Pathology_Result_View
"""

# Intermediate files, written to the current working directory.
RESULTS_CSV = "results.csv"
LABTEST_CSV = "lab_tests.csv"
OBSERVATIONS_CSV = "observations.csv"

# Upstream columns that feed our lab test rows.
LAB_TEST_COLUMNS = [
    "Relevant_Clinical_Info",  # Becomes clinical info
    # Datetime ordered is observation date
    # if there is no observation date, we
    # use request date
    "Observation_date",
    "Request_Date",
    "Result_ID",  # Lab number
    "Specimen_Site",  # Site
    # Status, this comes through as
    # F for complete
    # There are also empty strings, Nones
    # and 'I' that we store as Pending
    "OBX_Status",  # TODO is there more than one status per test number, test_code
    "OBR_exam_code_ID",  # test_code
    "OBR_exam_code_Text",  # test_name
    "Encounter_Consultant_Name",  # encounter_consultant_name
    "Encounter_Location_Code",  # encounter_location_code
    "Encounter_Location_Name",  # encounter_location_name
    "Accession_number",  # accession_number
    "Department",  # department_int
]

# Upstream columns that feed our observation rows.
OBSERVATION_COLUMNS = [
    "last_updated",  # last_updated
    # observation_datetime is Observation_date
    # if this isn't populated its Request_Date
    "Observation_date",
    "Request_Date",
    "Reported_date",  # reported_datetime
    "Result_Range",  # reference_range
    "OBX_exam_code_Text",  # observation_name
    "OBX_id",  # observation_number
    "Result_Value",  # observation_value
    "Result_Units",  # units
]

# Every upstream column we keep, in first-seen order. Some columns are
# needed by both lab tests and observations; dict.fromkeys drops the
# repeats so the results csv does not carry the same column twice.
COLUMNS = list(
    dict.fromkeys(["Patient_Number"] + LAB_TEST_COLUMNS + OBSERVATION_COLUMNS)
)
def cast_to_observation_dict(row, lab_test_id):
    """
    Creates a dictionary from an upstream row with keys, values
    of what we want to save in our observation model.

    row is a mapping keyed by the upstream column names and
    lab_test_id is stored under "test_id".

    observation_datetime is the upstream Observation_date, if that
    is empty we use Request_Date instead.
    """
    result = {"test_id": lab_test_id}
    result["last_updated"] = row["last_updated"]
    # Fall back to the request date when there is no observation date.
    # Request_Date is only looked up when the fallback is needed.
    result["observation_datetime"] = (
        row["Observation_date"] or row["Request_Date"]
    )
    result["reported_datetime"] = row["Reported_date"]
    result["reference_range"] = row["Result_Range"]
    result["observation_number"] = row["OBX_id"]
    result["observation_name"] = row["OBX_exam_code_Text"]
    result["observation_value"] = row["Result_Value"]
    result["units"] = row["Result_Units"]
    result["created_at"] = timezone.now()
    return result
@timing
def write_observation_csv():
    """
    Reads the results csv where the data exists as it exists
    in the upstream table.

    Writes the observation csv where the headers match our observation fields
    and the data is formatted into what we would save to our observation.
    It also adds the test_id column with the elcid lab test id in it.

    The results csv is read in a single pass. The header is taken from the
    keys of the first observation, so if the results csv has no data rows
    the observation csv is left empty.

    Raises a KeyError if a row's (MRN, lab number, test name) does not
    match an existing lab test.
    """
    key_to_test_id = get_mrn_lab_number_test_name_to_test_id()
    # newline="" is what the csv module asks for so that line endings
    # inside quoted fields are passed through untouched.
    with open(RESULTS_CSV, newline="") as results_file:
        with open(OBSERVATIONS_CSV, "w", newline="") as observations_file:
            writer = None
            for row in csv.DictReader(results_file):
                lab_test_id = key_to_test_id[get_key(row)]
                obs_dict = cast_to_observation_dict(row, lab_test_id)
                if writer is None:
                    writer = csv.DictWriter(
                        observations_file, fieldnames=list(obs_dict)
                    )
                    writer.writeheader()
                writer.writerow(obs_dict)
def call_db_command(sql):
    """
    Calls a command on our database via psql.

    sql is handed to psql's -c option as a single argument. No shell is
    involved, so quotes inside the sql (for example the file path in a
    \\copy command) reach psql exactly as written.

    Raises subprocess.CalledProcessError if psql exits with a non-zero
    status, so a failed step stops the load instead of being ignored.
    """
    subprocess.check_call(
        [
            "psql",
            "--echo-all",
            "-d",
            settings.DATABASES["default"]["NAME"],
            "-c",
            sql,
        ]
    )
class Command(BaseCommand):
    """
    Replaces our lab tests and observations with a fresh copy of the
    upstream results, going via csv files and psql.
    """

    @timing
    def handle(self, *args, **options):
        # The steps run strictly in this order.
        steps = (
            # Make sure we can run commands on our database at all
            check_db_command,
            # Dump the upstream results, as they exist upstream, to a csv
            write_results,
            # Write a csv of lab tests shaped like our lab test table
            write_lab_test_csv,
            # Empty our lab test table, this also removes FK relationships
            delete_existing_lab_tests,
            # Load the lab test csv into our lab test table
            copy_lab_tests,
            # Empty our observation table. Not strictly necessary but
            # safer if the process errored out for any reason (probably
            # disk space) and was restarted.
            delete_existing_observations,
            # Write a csv of observations shaped like our observation table
            write_observation_csv,
            # Load the observation csv into our observation table
            copy_observations,
        )
        for step in steps:
            step()

        n_lab_tests = lab_models.LabTest.objects.all().count()
        n_observations = lab_models.Observation.objects.all().count()
        write = self.stdout.write
        write(f"{n_lab_tests} lab tests loaded")
        write(f"{n_observations} observations loaded")
u'CRS_ADDRESS_LINE2': u'39 Winston Terrace', + u'CRS_ADDRESS_LINE3': u'LONDON', + u'CRS_ADDRESS_LINE4': u'', + u'CRS_DOB': datetime.datetime(1980, 10, 10, 0, 0), + u'CRS_Date_of_Death': datetime.datetime(1900, 1, 1, 0, 0), + u'CRS_Deceased_Flag': u'ALIVE', + u'CRS_EMAIL': u'', + u'CRS_Ethnic_Group': u'D', + u'CRS_Forename1': u'TEST', + u'CRS_Forename2': u'', + u'CRS_GP_NATIONAL_CODE': u'G1004756', + u'CRS_GP_PRACTICE_CODE': u'H84012', + u'CRS_HOME_TELEPHONE': u'0111111111', + u'CRS_MAIN_LANGUAGE': u'', + u'CRS_MARITAL_STATUS': u'', + u'CRS_MOBILE_TELEPHONE': u'', + u'CRS_NATIONALITY': u'GBR', + u'CRS_NHS_Number': u'', + u'CRS_NOK_ADDRESS1': u'', + u'CRS_NOK_ADDRESS2': u'', + u'CRS_NOK_ADDRESS3': u'', + u'CRS_NOK_ADDRESS4': u'', + u'CRS_NOK_FORENAME1': u'', + u'CRS_NOK_FORENAME2': u'', + u'CRS_NOK_HOME_TELEPHONE': u'', + u'CRS_NOK_MOBILE_TELEPHONE': u'', + u'CRS_NOK_POST_CODE': u'', + u'CRS_NOK_RELATIONSHIP': u'', + u'CRS_NOK_SURNAME': u'', + u'CRS_NOK_TYPE': u'', + u'CRS_NOK_WORK_TELEPHONE': u'', + u'CRS_Postcode': u'N6 P12', + u'CRS_Religion': u'', + u'CRS_SEX': u'F', + u'CRS_Surname': u'ZZZTEST', + u'CRS_Title': u'', + u'CRS_WORK_TELEPHONE': u'', + u'DOB': datetime.datetime(1964, 1, 1, 0, 0), + u'Date_Last_Obs_Normal': datetime.datetime(2015, 7, 18, 16, 26), + u'Date_of_the_Observation': datetime.datetime(2015, 7, 18, 16, 26), + u'Department': u'9', + u'Encounter_Consultant_Code': u'C2754019', + u'Encounter_Consultant_Name': u'DR. M. 
class CastToLabTestDictTestCase(OpalTestCase):
    """
    Checks that an upstream pathology row is translated into the keyword
    arguments we use to create our lab tests and observations.
    """

    def _freeze_now(self, patched_now):
        # Pin the patched timezone.now to a fixed aware datetime
        frozen = timezone.make_aware(
            datetime.datetime(2023, 1, 30, 13, 00)
        )
        patched_now.return_value = frozen
        return frozen

    def _assert_copied(self, result, mappings):
        # Each of our keys should hold the upstream value untouched
        for ours, theirs in mappings.items():
            self.assertEqual(
                result[ours], FAKE_PATHOLOGY_DATA[theirs]
            )

    @mock.patch(
        'intrahospital_api.management.commands.load_all_lab_tests.timezone.now'
    )
    def test_cast_to_lab_test_dict(self, now_value):
        now = self._freeze_now(now_value)
        result = load_all_lab_tests.cast_to_lab_test_dict(
            FAKE_PATHOLOGY_DATA, 1
        )
        self.assertEqual(result["patient_id"], 1)
        self._assert_copied(result, {
            "clinical_info": "Relevant_Clinical_Info",
            "datetime_ordered": "Observation_date",
            "lab_number": "Result_ID",
            "test_code": "OBR_exam_code_ID",
            "test_name": "OBR_exam_code_Text",
            "encounter_consultant_name": "Encounter_Consultant_Name",
            "encounter_location_code": "Encounter_Location_Code",
            "encounter_location_name": "Encounter_Location_Name",
            "accession_number": "Accession_number",
        })
        for stamp in ("created_at", "updated_at"):
            self.assertEqual(result[stamp], now)
        self.assertEqual(result["department_int"], 9)
        self.assertEqual(result["status"], 'complete')
        self.assertEqual(result["site"], "^& ^")

        # Sanity check in case we have changed the fields on the model
        patient, _ = self.new_patient_and_episode_please()
        result["patient_id"] = patient.id
        lab_models.LabTest.objects.create(**result)

    @mock.patch(
        'intrahospital_api.management.commands.load_all_lab_tests.timezone.now'
    )
    def test_cast_to_observation_dict(self, now_value):
        self._freeze_now(now_value)
        result = load_all_lab_tests.cast_to_observation_dict(
            FAKE_PATHOLOGY_DATA, 1
        )
        self.assertEqual(result["test_id"], 1)
        self._assert_copied(result, {
            "last_updated": "last_updated",
            "observation_datetime": "Observation_date",
            "reported_datetime": "Reported_date",
            "reference_range": "Result_Range",
            "observation_number": "OBX_id",
            "observation_name": "OBX_exam_code_Text",
            "observation_value": "Result_Value",
            "units": "Result_Units",
        })

        # Make sure the dict can be used to create an observation
        patient, _ = self.new_patient_and_episode_please()
        lab_test = patient.lab_tests.create()
        result['test_id'] = lab_test.id
        lab_models.Observation.objects.create(**result)