# Data quality checks

As my work in the master's program progressed, it became evident that many datasets I wanted to use either had structural issues or contained information I did not know how to handle.  
Thomas Stoeger, a fellow lab member, had worked extensively with the Enterprise Data Warehousing (EDW) team to understand the tables in the project's unified database. He taught me to encode assumptions and insights about what each table does or does not represent as assertion functions. Once I learned a fact or quirk about a table, I would write code enforcing that expected behavior, so that future versions of the data could be checked against it.  
This made it easier to document, whenever something changed, whether the change was an error in the ETL process or a change in the dataset's specification.  
Thomas had already written extensive assertion code for many tables. As he gradually phased out of the project and I became interested in two particular datasets, I added functions to his large .py file. Both functions are shown below. They are long, but they are cleaned-up versions of code that took months to build because of the back-and-forth with the EDW team and clinicians, and others now use them routinely.
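
The general pattern can be sketched as follows. This is a minimal illustration, not Thomas's actual code: the helper `check` and its `known_issue` parameter are hypothetical. A violated assumption either raises (an unexpected change, possibly an ETL error) or, when it is already tracked in an issue, is reported without stopping the run.

```python
import pandas as pd


def check(ok, message, known_issue=None):
    """Hypothetical helper. `ok` is a boolean Series, True where the assumption holds.

    Unknown violations raise an AssertionError; violations already tracked
    in an issue (identified by `known_issue`, e.g. an issue URL) are only printed.
    Returns the number of violating rows.
    """
    n_bad = int((~ok).sum())
    if n_bad == 0:
        return 0
    if known_issue is None:
        raise AssertionError(f"{n_bad} record(s): {message}")
    print(f"{known_issue}\n{n_bad} record(s): {message}")
    return n_bad


# Toy table with made-up values and one known quirk
df = pd.DataFrame({"case_number": [1, 2, 3],
                   "los_days": [3, 0, -1]})

# Hard assumption: one row per case (passes here)
check(~df["case_number"].duplicated(), "case_number appears more than once")
# Tracked quirk: a negative LOS is reported, not fatal
n = check(df["los_days"] >= 0, "negative LOS", known_issue="(link to tracking issue)")
```

Keeping the two outcomes in one helper makes it explicit which assumptions are currently allowed to fail, and why.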

### Asserting the basic_endpoints table  
This table was hailed as the source of truth for dates. When I checked, I found several issues (ICU dates falling outside the hospitalization window, and inconsistent calculation of lengths of stay). The function below tests the initial assumptions about basic_endpoints as well as those added later, as others and I learned more about the table. Where an assumption is known not to hold, the function writes a report and prints a link to the pertinent GitHub issue instead of raising an error; such a link signals an ongoing, unresolved issue.
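
The ICU-window assumption reduces to comparing calendar dates while ignoring rows where a date is missing. A minimal sketch on made-up data, assuming datetime columns named as in basic_endpoints (`admission_datetime`, `discharge_datetime`, `index_icu_start`, `index_icu_stop`); the helper `icu_outside_window` is hypothetical:

```python
import pandas as pd


def icu_outside_window(df, icu_start, icu_stop):
    """Hypothetical helper: True where the ICU stay falls outside the
    hospitalization window, compared on calendar dates (time of day ignored)."""
    adm = df["admission_datetime"].dt.normalize()
    dis = df["discharge_datetime"].dt.normalize()
    start = df[icu_start].dt.normalize()
    stop = df[icu_stop].dt.normalize()
    # Comparisons involving NaT evaluate to False, so missing dates are never flagged
    return (start < adm) | (stop > dis)


df = pd.DataFrame({
    "admission_datetime": pd.to_datetime(["2020-04-01 10:00", "2020-04-03 08:00", "2020-04-05 09:00"]),
    "discharge_datetime": pd.to_datetime(["2020-04-10 12:00", "2020-04-09 17:00", None]),
    "index_icu_start": pd.to_datetime(["2020-04-01 23:00", "2020-04-02 22:00", "2020-04-06 01:00"]),
    "index_icu_stop": pd.to_datetime(["2020-04-08 06:00", "2020-04-07 10:00", "2020-04-07 10:00"]),
})

flags = icu_outside_window(df, "index_icu_start", "index_icu_stop")
print(flags.tolist())  # [False, True, False]
```

Only the second row is flagged: its ICU stay starts the day before admission. The third row has no discharge date, so its stop date is not compared.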

In [None]:
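import pandas as pd

# get_clean() and report() are assumed to be helpers defined elsewhere in the shared assertions .py file.


def assert_basic_endpoints(df, input_version, org, output_folder):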

    n_trans = 'clinical.clinical_metadata'
    df_trans = get_clean(n_trans, input_version, org)

    trans = df_trans[['case_number', 'discharge_disposition_name']].drop_duplicates()
    cis = df[['case_number', 'discharge_disposition_name']].drop_duplicates()

    tog = pd.merge(cis, trans, on='case_number',
                   how='outer', suffixes=('_cis', '_trans'))

    # basic_endpoints has "None" for empty disposition names
    # However, clinical_metadata has "unknown" for those entries
    tog = tog.replace({'unknown': None})

    tog[['discharge_disposition_name_cis', 'discharge_disposition_name_trans']] = tog[[
        'discharge_disposition_name_cis', 'discharge_disposition_name_trans']].fillna('')
    tog['discharge_disposition_name_cis'] = tog['discharge_disposition_name_cis'].str.strip()
    tog['discharge_disposition_name_trans'] = tog['discharge_disposition_name_trans'].str.strip()

    f = tog['discharge_disposition_name_cis'] != tog['discharge_disposition_name_trans']
    if any(f):

        report(
            tog[f],
            output_folder,
            'issue_158_basic_endpoints_disposition_differs_from_clinical_metadata')

        print("""
        https://github.com/NUSCRIPT/script_etl_eda/issues/158
        {} records with different discharge_disposition_name in basic_endpoints
        and clinical_metadata
        """.format(sum(f)))

    # One row per patient/case. (In the past, duplicate records differed only in the consent_dt column.)

    if df['case_number'].value_counts().max() > 1:
        raise AssertionError(
            "A single case_number/pt_study_id appears in more than one row (potential duplicate record)")

    f = df['discharge_disposition_name'] == 'Expired'
    expired = df.loc[f, ['death_date', 'discharge_disposition_name']]
    g = expired['death_date'].isnull()

    if sum(g) > 0:
        raise AssertionError(
            f"There are {sum(g)} 'Expired' cases without a death date")

    # Third intubation dates come after second intubation dates, and second after first

    df_two_intub = df.loc[df['Second_intub_start'].notnull()]
    f = df_two_intub['Second_intub_start'] >= df_two_intub['First_intub_stop']

    if not f.all():
        raise AssertionError(
            f"{(~f).sum()} second intubation start dates happened before the end of the first intubation")

    df_three_intub = df.loc[df['Third_intub_start'].notnull()]
    g = df_three_intub['Third_intub_start'] >= df_three_intub['Second_intub_stop']

    if not g.all():
        raise AssertionError(
            f"{(~g).sum()} third intubation start dates happened before the end of the second intubation")

    # If any of the hospital or ICU dates is NULL, then corresponding LOS should be NULL as well

    date_los_pairs = {'hospital_los_days': ['admission_datetime', 'discharge_datetime'],
                      'index_ICU_LOS_Days': ['index_icu_start', 'index_icu_stop']}

    for los, dates in date_los_pairs.items():
        for date in dates:
            # Rows where this date is missing must also have a missing LOS
            los_when_date_missing = df.loc[df[date].isnull(), los]
            if los_when_date_missing.notnull().any():
                raise AssertionError(
                    f"{los_when_date_missing.notnull().sum()} record(s) report {los} while {date} is NULL")

    # ICU start and end dates are within hospital dates

    df_start = df[['case_number', 'admission_datetime',
                   'index_icu_start']].dropna()
    f = df_start['index_icu_start'].dt.date >= df_start['admission_datetime'].dt.date
    if not all(f):
        p = (~f).sum()
        raise AssertionError(f"For {p} patient(s) the index ICU start is before hospital admission")

    df_start1 = df[['case_number',
                    'admission_datetime', 'icu_start2']].dropna()
    m = df_start1['icu_start2'].dt.date >= df_start1['admission_datetime'].dt.date
    if not all(m):
        p = (~m).sum()
        raise AssertionError(f"For {p} patient(s) the second ICU start is before hospital admission")

    df_stop = df[['case_number', 'discharge_datetime',
                  'index_icu_stop']].dropna()
    g = df_stop['index_icu_stop'].dt.date <= df_stop['discharge_datetime'].dt.date
    if not all(g):
        q = (~g).sum()
        raise AssertionError(f"For {q} patient(s) the index ICU stop date is after hospital discharge")

    df_stop1 = df[['case_number', 'discharge_datetime', 'icu_stop2']].dropna()
    n = df_stop1['icu_stop2'].dt.date <= df_stop1['discharge_datetime'].dt.date
    if not all(n):
        q = (~n).sum()
        raise AssertionError(f"For {q} patient(s) the second ICU stop date is after hospital discharge")

    # Length of stay (LOS) calculations follow a similar, agreed-upon convention

    conventions = []

    for los, dates in date_los_pairs.items():
        # Keep rows where both dates are present; copy to avoid SettingWithCopyWarning
        df_time = df.dropna(subset=dates).copy()

        number_of_valid_records = len(df_time)

        # Calendar-day difference between the two dates (time of day ignored)
        df_time['calculated_LOS'] = (df_time[dates[1]].dt.normalize() -
                                     df_time[dates[0]].dt.normalize()).dt.days

        # A consistent convention means one offset (reported - calculated) for every record
        difference = df_time[los] - df_time['calculated_LOS']
        c = difference.value_counts()

        if (c != number_of_valid_records).any():
            raise AssertionError("Inconsistent hospital/ICU LOS calculation")
        else:
            conventions.append(int(c.index[0]))

    if conventions[0] != conventions[1]:
        raise AssertionError(
            "Inconsistent LOS calculation convention between hospital LOS and ICU LOS\n"
            f"Hospital admission date is Day {conventions[0]}\n"
            f"ICU start date is Day {conventions[1]}")
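
The convention check at the end of assert_basic_endpoints rests on one idea: if LOS is computed consistently, then the reported LOS minus the calendar-day difference is the same constant for every record (0 if the first day counts as Day 0, 1 if it counts as Day 1). A minimal sketch on made-up data; `los_offsets` is a hypothetical helper, not part of the original module:

```python
import pandas as pd


def los_offsets(df, los_col, start_col, stop_col):
    """Hypothetical helper: sorted distinct values of (reported LOS - calendar-day
    difference), restricted to rows where both dates are present."""
    both = df.dropna(subset=[start_col, stop_col])
    calendar_days = (both[stop_col].dt.normalize() - both[start_col].dt.normalize()).dt.days
    return sorted((both[los_col] - calendar_days).unique().tolist())


df = pd.DataFrame({
    "admission_datetime": pd.to_datetime(["2020-04-01 22:00", "2020-04-03 08:00"]),
    "discharge_datetime": pd.to_datetime(["2020-04-02 09:00", "2020-04-09 17:00"]),
    "hospital_los_days": [2, 7],  # both rows count the admission day as Day 1
})

offsets = los_offsets(df, "hospital_los_days", "admission_datetime", "discharge_datetime")
print(offsets)  # [1]
```

A single offset means a consistent convention within one LOS column; more than one offset, or different offsets for hospital and ICU LOS, is what the function flags.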

### Asserting the SOFA scores table  
This dataset was difficult to pin down. Fortunately, problems surfaced one at a time, but each usually required researching the medical technicalities of the score. For example, to properly assess the cardiovascular subscore (coded as htn_points), an epinephrine or dobutamine dose had to be specified if the patient had a low mean arterial pressure. Even after accounting for that, the subscore still failed the checks. Then Anna from the EDW team realized she had neglected to specify that the dose had to be sustained for a full 60 minutes; otherwise it did not count toward the subscore. Remarkably, the physicians had not caught this technicality either. Once this was accounted for, the table passed all checks on the validity of the score calculations.
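
The cardiovascular rule described above, including the 60-minute requirement, can be sketched as a small scoring function. This is a hypothetical reference, not the EDW implementation: the dose thresholds (ug/kg/min) follow the checks in assert_sofa_scores, how doses between the tested ranges are scored is an assumption, and the 60-minute rule is applied to every drug as the text describes (the dobutamine check in the function does not filter on duration). Drugs other than dopamine, dobutamine, and epinephrine are ignored.

```python
def cardiovascular_points(map_mmhg, med_name=None, dose=None, minutes_on_med=0):
    """Hypothetical reference for the SOFA cardiovascular subscore (htn_points).

    A vasoactive dose counts only if it was sustained for >= 60 minutes;
    otherwise the score falls back to the mean arterial pressure (MAP) rule.
    """
    sustained = (med_name is not None and dose is not None
                 and dose > 0 and minutes_on_med >= 60)
    if sustained:
        name = med_name.lower()  # EHR names use mixed case, e.g. 'EPINEPHrine'
        if name == "dopamine":
            if dose > 15:
                return 4
            return 3 if dose > 5 else 2
        if name == "epinephrine":
            return 4 if dose > 0.1 else 3
        if name == "dobutamine":
            return 2  # any sustained dobutamine dose
    # No sustained vasoactive support: score on MAP alone
    return 1 if map_mmhg < 70 else 0


# Same low MAP and epinephrine dose, different durations (made-up values)
print(cardiovascular_points(65, "EPINEPHrine", 0.05, 45))  # 1: dose not sustained, MAP < 70
print(cardiovascular_points(65, "EPINEPHrine", 0.05, 90))  # 3
```

The two calls show why the duration mattered: the same dose moves the subscore from 1 to 3 once it is sustained for an hour.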

In [None]:
def assert_sofa_scores(df, output_folder):
    """
    Checking the following assumptions:
    - ICU starting and ending dates are within hospital admission and discharge dates.
    - Day bucket ends minus day bucket starts should define a single day. Not two days. Nor zero.
    - If measurements are present, they should conform to SOFA criteria
    - The total SOFA score is indeed the sum of scores from each organ system (where NULL values are assumed to be zero).
    """

    # ICU starting and ending dates are within hospital admission and discharge dates.
    f = df['icu_start_dt'].dt.date >= df['admission_datetime'].dt.date
    if not f.all():

        report(df.loc[~f, :][['icu_start_dt', 'admission_datetime']].drop_duplicates(),
               output_folder,
               'issue_130a_sofa_scores_icu_start_date_before_admission_date')

        print("""https://github.com/NUSCRIPT/script_etl_eda/issues/130
        {} records have an ICU start date that is before the hospital admission date""".format(sum(~f)))

    g = df['icu_stop_dt'].dt.date <= df['discharge_datetime'].dt.date
    if not g.all():

        report(df.loc[~g, :][['icu_stop_dt', 'discharge_datetime']].drop_duplicates(),
               output_folder,
               'issue_130b_sofa_scores_icu_stop_date_after_discharge_date')

        print("""https://github.com/NUSCRIPT/script_etl_eda/issues/130
        {} records have an ICU stop date that is after the hospital discharge date""".format(sum(~g)))

    # Day bucket ends minus day bucket starts should define a single day. Not two days. Nor zero.
    h = (df['day_bucket_ends'].dt.normalize() -
         df['day_bucket_starts'].dt.normalize()).dt.days == 1
    if not h.all():
        raise AssertionError(f"{(~h).sum()} day buckets do not span exactly one day")

    # If measurements are present, they should conform to SOFA criteria
    # Coagulation criteria/Platelets
    df_coag = df[df['Platelet'] > 150]
    f = df_coag['platelet_points'] == 0

    if not f.all():
        raise AssertionError(
            f"{sum(~f)} platelet count above 150 was not coded as 0")

    df_coag = df[(df['Platelet'] <= 150) & (df['Platelet'] > 100)]
    f = df_coag['platelet_points'] == 1

    if not f.all():
        raise AssertionError(
            f"{sum(~f)} platelet count between 100 and 150 was not coded as 1")

    df_coag = df[(df['Platelet'] <= 100) & (df['Platelet'] > 50)]
    f = df_coag['platelet_points'] == 2

    if not f.all():
        raise AssertionError(
            f"{sum(~f)} platelet count between 50 and 100 was not coded as 2")

    df_coag = df[(df['Platelet'] <= 50) & (df['Platelet'] > 20)]
    f = df_coag['platelet_points'] == 3

    if not f.all():
        raise AssertionError(
            f"{sum(~f)} platelet count between 20 and 50 was not coded as 3")

    df_coag = df[df['Platelet'] <= 20]
    f = df_coag['platelet_points'] == 4

    if not f.all():
        raise AssertionError(
            f"{sum(~f)} platelet count below 20 was not coded as 4")

    df_coag = df[df['Platelet'].isnull()]
    f = df_coag['platelet_points'].isnull()

    if not f.all():
        raise AssertionError(
            f"{sum(~f)} NULL platelet count does not correspond with NULL platelet points")

    # Respiratory criteria/PF ratio
    df_PF = df[df['PF_ratio'] > 400]
    f = df_PF['P_F_ratio_points'] == 0

    if not f.all():
        raise AssertionError(
            f"{sum(~f)} PF_ratio value above 400 mm Hg was not coded as 0")

    df_PF = df[(df['PF_ratio'] <= 400) & (df['PF_ratio'] > 300)]
    f = df_PF['P_F_ratio_points'] == 1

    if not f.all():
        raise AssertionError(
            f"{sum(~f)} PF_ratio value between 300 and 400 mm Hg was not coded as 1")

    df_PF = df[(df['PF_ratio'] <= 300) & (df['PF_ratio'] > 200)]
    f = df_PF['P_F_ratio_points'] == 2

    if not f.all():
        raise AssertionError(
            f"{sum(~f)} PF_ratio value between 200 and 300 mm Hg was not coded as 2")

    df_PF = df[(df['PF_ratio'] <= 200) & (
        df['PF_ratio'] > 100) & (df['intub_flag'] == 1)]
    f = df_PF['P_F_ratio_points'] == 3

    if not f.all():
        raise AssertionError(
            f"{sum(~f)} PF_ratio value between 100 and 200 mm Hg (with respiratory support) was not coded as 3")

    df_PF = df[(df['PF_ratio'] <= 100) & (df['intub_flag'] == 1)]
    f = df_PF['P_F_ratio_points'] == 4

    if not f.all():
        raise AssertionError(
            f"{sum(~f)} PF_ratio value below 100 mm Hg (with respiratory support) was not coded as 4")

    df_PF = df[df['PF_ratio'].isnull()]
    f = df_PF['P_F_ratio_points'].isnull()

    if not f.all():
        raise AssertionError(
            f"{sum(~f)} NULL P/F ratio value does not correspond with NULL PF ratio points")

    # Renal criteria: Creatinine or urine output
    df_renal = df[(df['creatinine'] < 1.2) & (
        df['HD_or_CRRT_flag'].isnull()) & (df['urine_output'] >= 500)]
    f = df_renal['renal_points'] == 0

    if not f.all():
        raise AssertionError(
            f"{sum(~f)} creatinine value below 1.2 mg/dL was not coded as 0")

    df_renal = df[(df['creatinine'] <= 1.9) & (df['creatinine'] >= 1.2) & (
        df['HD_or_CRRT_flag'].isnull()) & (df['urine_output'] >= 500)]
    f = df_renal['renal_points'] == 1

    if not f.all():
        raise AssertionError(
            f"{sum(~f)} creatinine value between 1.2 and 1.9 mg/dL was not coded as 1")

    df_renal = df[(df['creatinine'] <= 3.4) & (df['creatinine'] >= 2.0) & (
        df['HD_or_CRRT_flag'].isnull()) & (df['urine_output'] >= 500)]
    f = df_renal['renal_points'] == 2

    if not f.all():
        raise AssertionError(
            f"{sum(~f)} creatinine value between 2.0 and 3.4 mg/dL was not coded as 2")

    df_renal = df[((df['creatinine'] <= 4.9) & (df['creatinine'] >= 3.5) & (
        df['urine_output'] >= 200)) & (df['HD_or_CRRT_flag'].isnull())]
    f = df_renal['renal_points'] == 3

    if not f.all():
        raise AssertionError(
            f"{sum(~f)} creatinine value between 3.5 and 4.9 mg/dL, or urine output value between 200-500 mL/d was not coded as 3")

    df_renal = df[((df['creatinine'] >= 5.0) | (
        df['urine_output'] < 200)) & (df['HD_or_CRRT_flag'] == 1)]
    f = df_renal['renal_points'] == 4

    if not f.all():
        raise AssertionError(
            f"{sum(~f)} creatinine value greater than or equal to 5.0 mg/dL, or urine output value below 200 mL/d was not coded as 4")

    df_renal = df[df['urine_output'].isnull() & df['creatinine'].isnull()
                  & df['HD_or_CRRT_flag'].isnull()]
    f = df_renal['renal_points'].isnull()

    if not f.all():
        raise AssertionError(
            f"{sum(~f)} NULL value for urine output and creatinine does not correspond with NULL renal points")

    # Neurological criteria/Glasgow Coma Scale
    cols = ['eye_opening_score', 'best_motor_response_score',
            'best_verbal_response_score']
    df_for_gcs = df.dropna(subset=cols).copy()
    for col in cols:
        df_for_gcs[col] = df_for_gcs[col].astype(int)

    df_for_gcs['total_gcs_score'] = df_for_gcs['eye_opening_score'] + \
        df_for_gcs['best_motor_response_score'] + df_for_gcs['best_verbal_response_score']
    df_gcs = df_for_gcs[df_for_gcs['total_gcs_score'] == 15]
    f = df_gcs['gcs_points'] == 0

    if not f.all():
        raise AssertionError(f"{sum(~f)} total GCS score of 15 was not coded as 0")

    df_gcs = df_for_gcs[(df_for_gcs['total_gcs_score'] == 13) | (df_for_gcs['total_gcs_score'] == 14)]
    f = df_gcs['gcs_points'] == 1

    if not f.all():
        raise AssertionError(f"{sum(~f)} total GCS score between 13 and 14 (inclusive) was not coded as 1")

    df_gcs = df_for_gcs[(df_for_gcs['total_gcs_score'] <= 12) & (df_for_gcs['total_gcs_score'] >= 10)]
    f = df_gcs['gcs_points'] == 2

    if not f.all():
        raise AssertionError(f"{sum(~f)} total GCS score between 10 and 12 (inclusive) was not coded as 2")

    df_gcs = df_for_gcs[(df_for_gcs['total_gcs_score'] <= 9) & (df_for_gcs['total_gcs_score'] >= 6)]
    f = df_gcs['gcs_points'] == 3

    if not f.all():
        raise AssertionError(f"{sum(~f)} total GCS score between 6 and 9 (inclusive) was not coded as 3")

    df_gcs = df_for_gcs[df_for_gcs['total_gcs_score'] < 6]
    f = df_gcs['gcs_points'] == 4

    if not f.all():
        raise AssertionError(f"{sum(~f)} total GCS score less than 6 was not coded as 4")

    # Use the full table: df_for_gcs has already dropped rows with a NULL subscore
    df_gcs = df[df[cols].isnull().any(axis=1)]
    f = df_gcs['gcs_points'].isnull()

    if not f.all():
        raise AssertionError(f"{sum(~f)} NULL GCS subscore field does not correspond with NULL GCS points")

    # Cardiovascular criteria/Mean arterial pressure
    # Parentheses matter: `|` binds tighter than `==`
    df_htn = df[(df['map'] >= 70) & (df['dosage'].isnull() | (df['dosage'] == 0))]
    f = df_htn['htn_points'] == 0

    if not f.all():
        raise AssertionError(f"{sum(~f)} MAP value greater than or equal to 70 mm Hg was not coded as 0")

    df_htn = df[(df['map'] < 70) & (df['dosage'].isnull() | (df['dosage'] == 0))]
    f = df_htn['htn_points'] == 1

    if not f.all():
        raise AssertionError(f"{sum(~f)} MAP value smaller than 70 mm Hg was not coded as 1")

    g = df['htn_med_name'] == 'dopamine'
    h = df['htn_med_name'] == 'EPINEPHrine'
    j = df['htn_med_name'] == 'DOBUTamine'
    df_htn_dop = df.loc[g]
    df_htn_epi = df.loc[h]
    df_htn_dobu = df.loc[j]

    df_htn = df_htn_dop[(df_htn_dop['dosage'] > 0) & (df_htn_dop['dosage'] < 5) & (
        df_htn_dop['length_of_time_on_med_in_min'] >= 60)]
    f = df_htn['htn_points'] == 2

    if not f.all():
        raise AssertionError(f"{sum(~f)} dopamine value smaller than 5 ug/kg/min was not coded as 2")

    f = df_htn_dobu['htn_points'] == 2

    if not f.all():
        raise AssertionError(f"{sum(~f)} entry where dobutamine was administered was not coded as 2")

    df_htn = df_htn_dop[(df_htn_dop['dosage'] <= 15) & (
        df_htn_dop['dosage'] >= 5.1) & (df_htn_dop['length_of_time_on_med_in_min'] >= 60)]
    f = df_htn['htn_points'] == 3

    if not f.all():
        raise AssertionError(f"{sum(~f)} dopamine value between 5.1 and 15 ug/kg/min (inclusive) was not coded as 3")

    df_htn = df_htn_epi[(df_htn_epi['dosage'] > 0) & (df_htn_epi['dosage'] <= 0.1) & (
        df_htn_epi['length_of_time_on_med_in_min'] >= 60)]
    f = df_htn['htn_points'] == 3

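    if not f.all():
        # TODO: ask Felix to add a more specific error report for epinephrine
        print(f"""
        epinephrine encoding wrong: {sum(~f)} epinephrine dose(s) between 0 and 0.1 ug/kg/min
        (sustained for at least 60 min) not coded as 3
        """)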

    df_htn = df_htn_dop[(df_htn_dop['dosage'] > 15) & (
        df_htn_dop['length_of_time_on_med_in_min'] >= 60)]
    f = df_htn['htn_points'] == 4

    if not f.all():
        raise AssertionError(
            f"{sum(~f)} dopamine value greater than 15 ug/kg/min was not coded as 4")

    df_htn = df_htn_epi[(df_htn_epi['dosage'] > 0.1) & (
        df_htn_epi['length_of_time_on_med_in_min'] >= 60)]
    f = df_htn['htn_points'] == 4

    if not f.all():
        raise AssertionError(
            f"{sum(~f)} epinephrine value greater than 0.1 ug/kg/min was not coded as 4")

    # The total SOFA score is indeed the sum of scores from each organ system (where NULL values are assumed to be zero).
    individual_sums = df['P_F_ratio_points'].fillna(0) + df['platelet_points'].fillna(0) + df['bilirubin_points'].fillna(
        0) + df['htn_points'].fillna(0) + df['gcs_points'].fillna(0) + df['renal_points'].fillna(0)
    h = individual_sums == df['SOFA']

    if not h.all():
        raise AssertionError(
            f"{sum(~h)} SOFA score doesn't correspond with the sum of individual scores")
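
Most of assert_sofa_scores repeats one pattern: select rows whose measurement falls in a range, then check the coded points. A table-driven alternative can be sketched with `pd.cut`, using the same platelet boundaries as the function (right-closed bins, so 150 maps to 1 and 20 maps to 4). The helper `check_binned_points` is hypothetical and the data are made up:

```python
import numpy as np
import pandas as pd


def check_binned_points(df, value_col, points_col, bins, points):
    """Hypothetical helper: return rows whose coded points disagree with binning.

    `bins` are right-closed edges; points[i] is the score for (bins[i], bins[i + 1]].
    Rows with a missing measurement are skipped.
    """
    present = df.dropna(subset=[value_col])
    expected = pd.cut(present[value_col], bins=bins, labels=points, right=True).astype(int)
    return present[present[points_col] != expected]


toy = pd.DataFrame({
    "Platelet": [200, 150, 100, 35, 20, np.nan],
    "platelet_points": [0, 1, 2, 3, 3, np.nan],  # the count of 20 is miscoded on purpose
})

bad = check_binned_points(toy, "Platelet", "platelet_points",
                          bins=[-np.inf, 20, 50, 100, 150, np.inf],
                          points=[4, 3, 2, 1, 0])
print(bad)
```

This covers single-column rules such as platelets; criteria that depend on several columns (respiratory support, dialysis, vasopressors) would still need explicit masks.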