# NCDR LAAO v1.3 & v1.4 Data Merge
This notebook uses the raw data exports from the NCDR system and merges the v1.3 & v1.4 versions together and create delta tables to push into a structured warehouse environment.

## Required Packages

In [355]:
import pandas as pd
import numpy as np
import os
import re
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType, LongType, TimestampType, DoubleType


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 357, Finished, Available)

# v1.3 Ingestion

### In-hospital Ingestion

In [356]:
# import from Lakehouse file upload
# seperate each XLSX sheet
# add facility column with NCDR PID & custom column with hardocded natural key (Pat_ID)
# append multiple sheets into dataframes in a dictionary 

def read_excel_and_process_sheets(file_path):
    facility_code = file_path.split('/')[-1][4:10]
    sheets_dict = pd.read_excel(file_path, sheet_name=None, engine='openpyxl')
    mrn_columns = ['OtherID']
    
    for sheet_name, df in sheets_dict.items():
        df['facility'] = facility_code
        
        for column in mrn_columns:
            if column in df.columns:
                df[column] = df[column].apply(str)
                
        # Ensure 'ArrivalDate' is in datetime format before formatting
        if 'ArrivalDate' in df.columns:
            df['ArrivalDate'] = pd.to_datetime(df['ArrivalDate'], errors='coerce')  # Convert to datetime, coercing errors
            df['ArrivalDate'] = df['ArrivalDate'].dt.strftime('%Y-%m-%d')  # Then format to string
        
        # Ensure 'NCDRPatientID' is string
        if 'NCDRPatientID' in df.columns:
            df['NCDRPatientID'] = df['NCDRPatientID'].astype(str)
        
        # Create 'Pat_ID'
        if 'NCDRPatientID' in df.columns and 'ArrivalDate' in df.columns:
            df['Pat_ID'] = df['NCDRPatientID'] + '_' + df['ArrivalDate'] + '_' + df['facility']
        
        sheets_dict[sheet_name] = df
    
    return sheets_dict

def append_sheets_from_multiple_files(file_paths):
    all_sheets = {}
    
    for file_path in file_paths:
        sheets_dict = read_excel_and_process_sheets(file_path)
        
        for sheet_name, df in sheets_dict.items():
            if sheet_name in all_sheets:
                all_sheets[sheet_name] = pd.concat([all_sheets[sheet_name], df], ignore_index=True)
            else:
                all_sheets[sheet_name] = df
                
    return all_sheets

# Example usage
file_paths = ["/lakehouse/default/Files/LAAO/Version_1_3/LAAO136983-In-hospital-112017PAS.xlsx", 
                "/lakehouse/default/Files/LAAO/Version_1_3/LAAO197501-In-hospital-112017SHY.xlsx",
                "/lakehouse/default/Files/LAAO/Version_1_3/LAAO410079-In-hospital-112017HAM.xlsx", 
                "/lakehouse/default/Files/LAAO/Version_1_3/LAAO468377-In-hospital-112017ALT.xlsx", 
                "/lakehouse/default/Files/LAAO/Version_1_3/LAAO697322-In-hospital-112017PUH.xlsx",
                "/lakehouse/default/Files/LAAO/Version_1_3/LAAO799953-In-hospital-112017PIN.xlsx"]

all_sheets_appended = append_sheets_from_multiple_files(file_paths)


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 358, Finished, Available)

### Follow up Ingestion

In [357]:
# import from Lakehouse file upload
# seperate each XLSX sheet
# add facility column with NCDR PID & custom column with hardocded natural key (Pat_ID)
# append multiple sheets into dataframes in a dictionary 

def read_excel_and_process_sheets(file_path):
    """
    Reads each sheet from an Excel file into a pandas DataFrame,
    adds a 'facility' column based on the file title, and stores them in a dictionary.
    """
    # Extract facility code from the file title
    facility_code = file_path.split('/')[-1][4:10]  # Adjust based on your filenames
    
    # Read the Excel file into a dict of DataFrames
    sheets_dict = pd.read_excel(file_path, sheet_name=None, engine='openpyxl')
    
    # Process each sheet
    for sheet_name, df in sheets_dict.items():
        df['facility'] = facility_code
        sheets_dict[sheet_name] = df

        # Ensure 'ArrivalDate' is in datetime format before formatting
        if 'ArrivalDate' in df.columns:
            df['ArrivalDate'] = pd.to_datetime(df['ArrivalDate'], errors='coerce')  # Convert to datetime, coercing errors
            df['ArrivalDate'] = df['ArrivalDate'].dt.strftime('%Y-%m-%d')  # Then format to string
        
        # Ensure 'NCDRPatientID' is string
        if 'NCDRPatientID' in df.columns:
            df['NCDRPatientID'] = df['NCDRPatientID'].astype(str)
        
        # Create 'Pat_ID'
        if 'NCDRPatientID' in df.columns and 'ArrivalDate' in df.columns:
            df['Pat_ID'] = df['NCDRPatientID'] + '_' + df['ArrivalDate'] + '_' + df['facility']

    return sheets_dict

def append_sheets_from_multiple_files(file_paths_FU):
    """
    Processes multiple Excel files, appending sheets with the same name across files.
    """
    all_sheets = {}  # Dictionary to store appended sheets
    
    for file_path in file_paths_FU:
        # Process each file
        sheets_dict = read_excel_and_process_sheets(file_path)
        
        # Append sheets with the same name
        for sheet_name, df in sheets_dict.items():
            if sheet_name in all_sheets:
                all_sheets[sheet_name] = pd.concat([all_sheets[sheet_name], df], ignore_index=True)
            else:
                all_sheets[sheet_name] = df
                
    return all_sheets


# Example usage
file_paths_FU = ["/lakehouse/default/Files/LAAO/Version_1_3/LAAO136983-Follow-up-112017PAS.xlsx", 
                "/lakehouse/default/Files/LAAO/Version_1_3/LAAO197501-Follow-up-112017SHY.xlsx",
                "/lakehouse/default/Files/LAAO/Version_1_3/LAAO410079-Follow-up-112017HAM.xlsx", 
                "/lakehouse/default/Files/LAAO/Version_1_3/LAAO468377-Follow-up-112017ALT.xlsx", 
                "/lakehouse/default/Files/LAAO/Version_1_3/LAAO697322-Follow-up-112017PUH.xlsx",
                "/lakehouse/default/Files/LAAO/Version_1_3/LAAO799953-Follow-up-112017PIN.xlsx"]

all_sheets_appended_FU = append_sheets_from_multiple_files(file_paths_FU)



StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 359, Finished, Available)

## Feature Engineering
To align with v1.4 structure

### ProcedureStartDateTime In-Hospital

In [358]:
# ProcedureStartDateTime (merges ProcedureStartDate and ProcedureStartTime across all dataframes)

def combine_date_time(row):
    try:
        # Ensure the date is in datetime format
        date = pd.to_datetime(row['ProcedureStartDate'])
        
        # Explicitly format ProcedureStartTime as a string in case it's not
        time_str = str(row['ProcedureStartTime'])
        
        # Convert the time string to timedelta
        time_delta = pd.to_timedelta(time_str)
        
        # Combine the date and time into a single datetime object
        return date + time_delta
    except Exception as e:
        # Print row information to help diagnose the issue
        print(f"Error processing row: {row}")
        print(f"Exception: {e}")
        # Return a default value or handle the error as appropriate
        return pd.NaT

# Apply the conversion and update the DataFrame
for sheet_name, df in all_sheets_appended.items():
    if 'ProcedureStartDate' in df.columns and 'ProcedureStartTime' in df.columns:
        df['ProcedureStartDateTime'] = df.apply(combine_date_time, axis=1)
        df.drop(columns=['ProcedureStartDate', 'ProcedureStartTime'], inplace=True)
        all_sheets_appended[sheet_name] = df


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 360, Finished, Available)

### ProcedureStopDateTime In-Hospital

In [359]:
# ProcedureStopDateTime (merges ProcedureStopDate and ProcedureStopTime across all dataframes)

def combine_date_time(row):
    try:
        # Ensure the date is in datetime format
        date = pd.to_datetime(row['ProcedureStopDate'])
        
        # Explicitly format ProcedureStartTime as a string in case it's not
        time_str = str(row['ProcedureStopTime'])
        
        # Convert the time string to timedelta
        time_delta = pd.to_timedelta(time_str)
        
        # Combine the date and time into a single datetime object
        return date + time_delta
    except Exception as e:
        # Print row information to help diagnose the issue
        print(f"Error processing row: {row}")
        print(f"Exception: {e}")
        # Return a default value or handle the error as appropriate
        return pd.NaT

# Apply the conversion and update the DataFrame
for sheet_name, df in all_sheets_appended.items():
    if 'ProcedureStopDate' in df.columns and 'ProcedureStopTime' in df.columns:
        df['ProcedureStopDateTime'] = df.apply(combine_date_time, axis=1)
        df.drop(columns=['ProcedureStopDate', 'ProcedureStopTime'], inplace=True)
        all_sheets_appended[sheet_name] = df


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 361, Finished, Available)

### ProcedureStartDate & PrcocedureStartTime Follow-up

In [360]:
# ProcedureStartDateTime (merges ProcedureStartDate and ProcedureStartTime across all dataframes)

def combine_date_time(row):
    try:
        # Ensure the date is in datetime format
        date = pd.to_datetime(row['ProcedureStartDate'])
        
        # Explicitly format ProcedureStartTime as a string in case it's not
        time_str = str(row['ProcedureStartTime'])
        
        # Convert the time string to timedelta
        time_delta = pd.to_timedelta(time_str)
        
        # Combine the date and time into a single datetime object
        return date + time_delta
    except Exception as e:
        # Print row information to help diagnose the issue
        print(f"Error processing row: {row}")
        print(f"Exception: {e}")
        # Return a default value or handle the error as appropriate
        return pd.NaT

# Apply the conversion and update the DataFrame
for sheet_name, df in all_sheets_appended_FU.items():
    if 'ProcedureStartDate' in df.columns and 'ProcedureStartTime' in df.columns:
        df['RefProcStartDateTime'] = df.apply(combine_date_time, axis=1)
        df.drop(columns=['ProcedureStartDate', 'ProcedureStartTime'], inplace=True)
        all_sheets_appended_FU[sheet_name] = df

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 362, Finished, Available)

### ProcedureStopDateTime Follow-up

In [361]:
# ProcedureStopDateTime (merges ProcedureStopDate and ProcedureStopTime across all dataframes)

def combine_date_time(row):
    try:
        # Ensure the date is in datetime format
        date = pd.to_datetime(row['ProcedureStopDate'])
        
        # Explicitly format ProcedureStartTime as a string in case it's not
        time_str = str(row['ProcedureStopTime'])
        
        # Convert the time string to timedelta
        time_delta = pd.to_timedelta(time_str)
        
        # Combine the date and time into a single datetime object
        return date + time_delta
    except Exception as e:
        # Print row information to help diagnose the issue
        print(f"Error processing row: {row}")
        print(f"Exception: {e}")
        # Return a default value or handle the error as appropriate
        return pd.NaT

# Apply the conversion and update the DataFrame
for sheet_name, df in all_sheets_appended_FU.items():
    if 'ProcedureStopDate' in df.columns and 'ProcedureStopTime' in df.columns:
        df['ProcedureStopDateTime'] = df.apply(combine_date_time, axis=1)
        df.drop(columns=['ProcedureStopDate', 'ProcedureStopTime'], inplace=True)
        all_sheets_appended_FU[sheet_name] = df


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 363, Finished, Available)

### Add HispOrig

In [362]:
# HisOrig (converting 'HispEthnicityMexican','HispEthnicityPuertoRico', 'HispEthnicityCuban', 
#'HispEthnicityOtherOrigin' columns into v1.4 compatability)

def HispOrig(row):
    if row['HispEthnicityMexican'] == 'Yes':
        return 'Yes'
    elif row['HispEthnicityPuertoRico'] == 'Yes':
        return 'Yes'
    elif row ['HispEthnicityCuban'] == 'Yes':
        return 'Yes'
    elif row ['HispEthnicityOtherOrigin'] == 'Yes':
        return 'Yes'
    else:
        return 'No'

#apply the function to each row to create a new column
all_sheets_appended['In-hospital']['HispOrig'] = all_sheets_appended['In-hospital'].apply(HispOrig, axis=1)

# Drop the old columns from the 'In-hospital' DataFrame
all_sheets_appended['In-hospital'] = all_sheets_appended['In-hospital']

# Display the head of the DataFrame to verify the new column
print(all_sheets_appended['In-hospital'].head())

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 364, Finished, Available)

  NCDRPatientID  LastName FirstName MidName    OtherID          SSN SSNNA  \
0       1185318     SMITH     Basil       J  784565935  197422513.0    No   
1       3903583    PEALER   MICHAEL       L  785314254          NaN   Yes   
2       4914526    DISHER       JON       E  980510965          NaN   Yes   
3       6087603  Stafford    Lonnie       L  970579558          NaN   Yes   
4       6087684  Grealish    Robert       J  784976049          NaN   Yes   

         DOB     Sex RaceWhite  ... ADJ_BleedDevRelated  \
0 1951-03-21    Male       Yes  ...                 NaN   
1 1949-07-04    Male       Yes  ...                 NaN   
2 1944-01-18    Male       Yes  ...                 NaN   
3 1943-10-11  Female       Yes  ...                 NaN   
4 1949-09-10    Male       Yes  ...                 NaN   

  ADJ_SysThromboAdjStatus ADJ_SysThromboDeathDate ADJ_SysThromboDeathCause  \
0                     NaN                     NaN                      NaN   
1                     NaN 

### DevSucdep

In [363]:
# DevSucdep (converting 'DeviceOutcome' column into v1.4 compatability)

def devsucdep(row):
    if row['DeviceOutcome'] == 'Deployed, not released':
        return 'No'
    elif row['DeviceOutcome'] == 'Device retrieved':
        return 'No'
    elif row ['DeviceOutcome'] == 'Not Deployed':
        return 'No'
    elif row ['DeviceOutcome'] == 'Deployed, released':
        return 'Yes'
    else:
        return 'No'

#apply the function to each row to create a new column
all_sheets_appended['AccessSysID']['DevSucdep'] = all_sheets_appended['AccessSysID'].apply(devsucdep, axis=1)

# Drop the old columns from the 'AccessSysID' DataFrame
all_sheets_appended['AccessSysID'] = all_sheets_appended['AccessSysID']

# Display the head of the DataFrame to verify the new column
print(all_sheets_appended['AccessSysID'].head())

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 365, Finished, Available)

  NCDRPatientID LastName FirstName MidName    OtherID ArrivalDate     DCDate  \
0       1185318    SMITH     Basil       J  784565935  2021-02-09 2021-02-10   
1       1185318    SMITH     Basil       J  784565935  2021-02-09 2021-02-10   
2       1185318    SMITH     Basil       J  784565935  2021-02-09 2021-02-10   
3       3903583   PEALER   MICHAEL       L  785314254  2022-07-19 2022-07-20   
4       4914526   DISHER       JON       E  980510965  2020-12-01 2020-12-02   

   AccessSysCounter                                      AccessSysID  \
0                 1  WATCHMAN TruSeal Double Curve 14F Access System   
1                 1  WATCHMAN TruSeal Double Curve 14F Access System   
2                 1  WATCHMAN TruSeal Double Curve 14F Access System   
3                 1  WATCHMAN TruSeal Double Curve 14F Access System   
4                 1  WATCHMAN TruSeal Double Curve 14F Access System   

   DevCounter                               LAADevID  Dev_UDIDirectID  \
0           1

### OutDevUnsucDepl

In [364]:
# OutDevUnsucDepl (converting 'DeviceOutcome' column into v1.4 compatability)

def OutDevUnsucDepl(row):
    if row['DeviceOutcome'] == 'Deployed, not released':
        return 'Deployed, not released'
    elif row['DeviceOutcome'] == 'Device retrieved':
        return 'Device retrieved'
    elif row ['DeviceOutcome'] == 'Not Deployed':
        return 'Not Deployed'
    else:
        return np.nan

#apply the function to each row to create a new column
all_sheets_appended['AccessSysID']['OutDevUnsucDepl'] = all_sheets_appended['AccessSysID'].apply(OutDevUnsucDepl, axis=1)

# Drop the old columns from the 'AccessSysID' DataFrame
all_sheets_appended['AccessSysID'] = all_sheets_appended['AccessSysID']


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 366, Finished, Available)

## Rename Columns to Match v1.4
Includes labeling columns that are retired by noting which version they were retired with (i.e. "1_3" for version 1.3)

In [365]:
# Mapping of old column names to new column names
# Mappings for renaming
column_name_mapping_canceled = {
    'Anatomy not conducive for implant': 'ProcCanceledReason - Anatomy not conducive for implant',
    'Appendage too large (for device implant)': 'ProcCanceledReason - Appendage too large (for device implant)',
    'Appendage too small (for device implant)': 'ProcCanceledReason - Appendage too small (for device implant)',
    'Catheterization challenge': 'ProcCanceledReason - Catherization challenge',
    'Decompensation in patient condition': 'ProcCanceledReason - Decompensation in patient condition',
    'Epicardial access issue': 'ProcCanceledReason - Epicardial access issue',
    'Thrombus detected': 'ProcCanceledReason - Thrombus detected',
    'Unanticipated patient condition': 'ProcCanceledReason - Unanticipated patient condition',
    'Patient/Family choice': 'ProcCanceledReason -  Patient/Family choice'
}

column_name_mapping_aborted = {
    'Anatomy not conducive for implant': 'ProcAbortedReason - Anatomy not conducive for implant',
    'Appendage too large (for device implant)': 'ProcAbortedReason - Appendage too large (for device implant)',
    'Appendage too small (for device implant)': 'ProcAbortedReason - Appendage too small (for device implant)',
    'Catheterization challenge': 'ProcAbortedReason - Catherization challenge',
    'Decompensation in patient condition': 'ProcAbortedReason - Decompensation in patient condition',
    'Device related': 'ProcAbortedReason -  Device related',
    'Transcatheter device retrieval': 'ProcAbortedReason - Transcatheter device retrieval',
    'Device release criteria not met': 'ProcAbortedReason - Device release criteria not met',
    'Epicardial access issue': 'ProcAbortedReason - Epicardial access issue',
    'Surgical device retrieval': 'ProcAbortedReason - Surgical device retrieval',
    'Device associated thrombus developed during procedure': 'ProcAbortedReason - Device associated thrombus developed during procedure',
    'Unanticipated patient condition': 'ProcAbortedReason - Unanticipated patient condition',
    'Patient/Family choice': 'ProcAbortedReason - Patient/Family choice'
}

other_mapping = {
    'LAAOAdmission': 'LAAO_Adm',
    'IncreasedFallRisk': 'IncrFallRisk',
    'ConAntiCoagTherapy': 'ConAntiCoagTx',
    'AtrialFib': 'AFibInd',
    'AFibPriorAblStrategy': 'AFibPriorAblStrategyCode',
    'StrucIntervention': 'CardStrucInterv',
    'StrucInterventionType': 'CardStrucIntervType',
    'LAAIntervention': 'LAAOInterv',
    'LAAInterventionType': 'LAAOType',
    'EpicardialApproach': 'EpicardialAppCons',
    'EpiADLupus': 'LupusCons',
    'IEPerformed': 'ICEPerf',
    'AlbuminND': 'Albumin_ND',
    'PLTCount': 'PlateletCt',
    'PLTCountND': 'PlateletCtND',
    'RankinScaleNA': 'PostProc_RankinScaleNA',
    'MedAdmin': 'PreMedAdmin',
    'ProcTEEPerf': 'TEEPerfLAAO',
    'ProcTEEDate': 'TEEDateLAAO',
    'LAAOrificeMaxWidth': 'LAAO_OrWid',
    'ProcLocation': 'ProcedureLocation',
    'OperA_LastName': 'OperA_LastName2',
    'OperA_FirstName': 'OperA_FirstName2',
    'OperA_MidName': 'OperA_MidName2',
    'OperA_NPI': 'OperA_NPI2',
    'FluoroDoseDAP': 'FluoroDoseDAP2',
    'ProcHeparin': 'ProcHeparin2',
    'ProcHeparinInitAdmin': 'ProcHeparinInitAdminTime',
    'ProcBivalirudin': 'ProcBivalirudin2',
    'ProcOtherAnticoag': 'ProcOtherAnticoag2',
    'ProcEventOccurred': 'PostProcOccurred',
    'EventID': 'ProcEvents',
    'EventDate': 'IntraPostProcEventDate',
    'PostProcHgb': 'PostProcHgb2',
    'PostProcHgbND': 'PostProcHgbND2',
    'PostProcCreat': 'PostProcCreat2',
    'PostProcCreatND': 'PostProcCreatND2',
    'EOCSurgery': 'Sx_F',
    'F_FollowupInterval': 'FUInterv',
    'F_LVEFAssessed': 'FU_LVEF',
    'F_TTEPerf': 'TTEPerfFU',
    'F_TTEDate': 'TTEDate_F',
    'F_ResidualLeak': 'ResidualLeakFU',
    'F_ResidualLeakNA': 'ResidualLeakNAFU',
    'F_Creat': 'Creat_FU',
    'F_Hgb': 'LowHgbValue_F',
    'F_HgbND': 'HGBND_FU',
    'F_RankinScale': 'F_RankinScore',
    'F_RankinScaleNA': 'F_mRS_NA',
    'F_BIEToiletUse': 'F_BIEToilet',
    'F_MedAdmin': 'F_MedAdmin2',
    'F_MedDose': 'F_MedDose2',
    'F_NOACTherapyDiscontinued': 'F_DOACTherapyDiscontinued',
    'F_NOACTherapyDiscontinuedDate': 'F_DOACTherapyDiscontinuedDate',
    'F_NOACTherapyResumed': 'F_DOACTherapyResumed',
    'F_NOACTherapyResumedDate': 'F_DOACTherapyResumedDate',
    'F_FollowupEventOccurred': 'FupEvOccurred',
    'F_EventID': 'F_Event',
    'F_EventDate': 'FupEventDate',
    'F_ADJ_NeuroSxOnset': 'FU_ADJ_NeuroSxOnset',
    'F_ADJ_NeuroDeficit': 'FU_ADJ_NeuroDeficit',
    'F_ADJ_NeuroClinicPresent': 'FU_ADJ_NeuroClinicPresent',
    'F_ADJ_NeuroDxConfirmed': 'FU_ADJ_NeuroDxConfirmed',
    'F_ADJ_NeuroBrainImaging': 'FU_ADJ_NeuroBrainImaging',
    'F_ADJ_NeuroBrainImagingType': 'FU_ADJ_NeuroBrainImagingType',
    'F_ADJ_NeuroDeficitType': 'FU_ADJ_NeuroDeficitType',
    'F_ADJ_NeuroIntracranType': 'FU_ADJ_NeuroIntracranType',
    'F_ADJ_NeuroIVrTPA': 'FU_ADJ_NeuroIVrTPA',
    'F_ADJ_NeuroEndoTheraInter': 'FU_ADJ_NeuroEndoTheraInter',
    'F_ADJ_NeuroSxDuration': 'FU_ADJ_NeuroSxDuration',
    'F_ADJ_NeuroTrauma': 'FU_ADJ_NeuroTrauma',
    'F_ADJ_RankinScale': 'FU_ADJ_RankinScale',
    'F_ADJ_RankinScaleNA': 'FU_ADJ_RankinScaleNA',
    'F_ADJ_NeuroProcRelated': 'FU_ADJ_NeuroProcRelated',
    'F_ADJ_BleedDeathDate': 'FU_ADJ_BleedDeathDate',
    'F_ADJ_BleedInvInter': 'FU_ADJ_BleedInvInter',
    'F_ADJ_BleedRBCTransfusion': 'FU_ADJ_BleedRBCTransfusion',
    'F_ADJ_BleedRBCUnits': 'FU_ADJ_BleedRBCUnits',
    'F_ADJ_BleedPreTransHgb': 'FU_ADJ_BleedPreTransHgb',
    'F_ADJ_BleedImagePerf': 'FU_ADJ_BleedImagePerf',
    'F_ADJ_BleedEndOrganDamage': 'FU_ADJ_BleedEndOrganDamage',
    'F_ADJ_BleedPrimaryDC': 'FU_BleedReadm',
    'F_ADJ_BleedMajorSurgery': 'FU_ADJ_BleedMajorSurgery',
    'F_ADJ_BleedPCI': 'FU_ADJ_BleedPCI',
    'F_ADJ_BleedProcRelated': 'FU_ADJ_BleedProcRelated',
    'F_ADJ_BleedDevRelated': 'FU_ADJ_BleedDevRelated',
    'F_ADJ_MedID': 'FU_ADJ_MedID',
    'F_ADJ_MedAdmin': 'FU_ADJ_MedAdmin',
    'F_ADJ_SysThromboAdjStatus': 'FU_ADJ_SysThromboAdjStatus',
    'F_ADJ_SysThromboHypoperfusion': 'FU_ADJ_SysThromboHypoperfusion',
    'F_ADJ_SysThromboImagEvidence': 'FU_ADJ_SysThromboImagEvidence',
    'F_ADJ_SysThromboImagMethod': 'FU_ADJ_SysThromboImagMethod',
    'F_ADJ_SysThromboTheraInterv': 'FU_ADJ_SysThromboTheraInterv',
    'F_ADJ_SysThromboIntervType': 'FU_ADJ_SysThromboIntervType',
    'F_AspirinDiscontinued': 'F_AspirinTherapyDiscontinued',
    'F_AspirinDiscontinuedDate': 'F_AspirinTherapyDiscontinuedDate',
    'F_AspirinResumed': 'F_AspirinTherapyResumed',
    'F_AspirinResumedDate': 'F_AspirinTherapyResumedDate',
    'F_P2Y12Discontinued': 'F_P2Y12TherapyDiscontinued',
    'F_P2Y12DiscontinuedDate': 'F_P2Y12TherapyDiscontinuedDate',
    'F_P2Y12Resumed': 'F_P2Y12TherapyResumed',
    'F_P2Y12Date': 'F_P2Y12TherapyResumedDate',
    'RegistryVer': 'SchemaVersion',
    'RaceAsianIndian': 'RaceAsianIndian_RETIRED_1_3',
    'RaceChinese': 'RaceChinese_RETIRED_1_3',
    'RaceFilipino': 'RaceFilipino_RETIRED_1_3',
    'RaceJapanese': 'RaceJapanese_RETIRED_1_3',
    'RaceKorean': 'RaceKorean_RETIRED_1_3',
    'RaceVietnamese': 'RaceVietnamese_RETIRED_1_3',
    'RaceAsianOther': 'RaceAsianOther_RETIRED_1_3',
    'RaceNativeHawaii': 'RaceNativeHawaii_RETIRED_1_3',
    'RaceGuamChamorro': 'RaceGuamChamorro_RETIRED_1_3',
    'RaceSamoan': 'RaceSamoan_RETIRED_1_3',
    'RacePacificIslandOther': 'RacePacificIslandOther_RETIRED_1_3',
    'HispEthnicityMexican': 'HispEthnicityMexican_RETIRED_1_3',
    'HispEthnicityPuertoRico': 'HispEthnicityPuertoRico_RETIRED_1_3',
    'HispEthnicityCuban': 'HispEthnicityCuban_RETIRED_1_3',
    'HispEthnicityOtherOrigin': 'HispEthnicityOtherOrigin_RETIRED_1_3',
    'HIC': 'HIC_RETIRED_1_3',
    'PtRestriction': 'PtRestriction_RETIRED_1_3',
    'FluoroDoseDAP_Units': 'FluoroDoseDAP2_Units',
    'EventOccurred': 'EventOccurred_RETIRED_1_3',
    'PostProcMedStrategy_MedID': 'PostProcMedStrategy_MedID_RETIRED_1_3',
    'PostProcMedPlanStrategy': 'PostProcMedPlanStrategy_RETIRED_1_3',
    'PostProcMedStrategyReason': 'PostProcMedStrategyReason_RETIRED_1_3',
    'F_EventOccurred': 'F_EventOccurred_RETIRED_1_3',
    'Private Health Insurance': 'HIPS - Private Health Insurance',
    'Military Health Care': 'HIPS - Military Health Care',
    'State-Specific Plan (non-Medicaid)': 'HIPS - State-Specific Plan (non-Medicaid)',
    'Indian Health Service': 'HIPS - Indian Health Service',
    'Non-US Insurance': 'HIPS - Non-US Insurance',
    'Medicare Advantage': 'HIPS - Medicare Advantage',
    'Medicare': 'HIPS - Medicare',
    'Medicaid': 'HIPS - Medicaid',
    'Myocardial Infarction (MI)': 'PriorVD - Prior Myocardial Infarction (MI)',
    'Known Aortic Plaque': 'PriorVD - Known Aortic Plaque',
    'Peripheral arterial occlusive disease (PAD)': 'PriorVD - Peripheral Arterial Occlusive Disease (PAD)',
    'Coronary artery disease (CAD)': 'PriorVD - Coronary Artery Disease (CAD)*',
    'Percutaneous coronary intervention (PCI)': 'PriorVD - Percutaneous Coronary Intervention (PCI)*',
    'Coronary artery bypass graft (CABG)': 'PriorVD - Coronary Artery Bypass Graft (CABG)*',
    'Carotid disease': 'PriorVD - Carotid Artery Disease*',
    'BleedEventType (Intracranial)': 'BleedEventType - Intracranial Bleed',
    'BleedEventType (Epistaxis)': 'BleedEventType - Epistaxis',
    'BleedEventType (Gastrointestinal)': 'BleedEventType - Gastrointestinal Bleed',
    'BleedEventType (Other)': 'BleedEventType - Other',
    'Complex Fractionated Atrial Electrogram': 'AFibPriorAblStrategyCode - Complex Fractionated Atrial Electrogram',
    'Cryoablation': 'AFibPriorAblStrategyCode - Convergent Procedure',
    'Convergent Procedure': 'AFibPriorAblStrategyCode - Cryoablation',
    'Empiric LA Linear Lesions': 'AFibPriorAblStrategyCode - Empiric LA Linear Lesions',
    'Focal Ablation': 'AFibPriorAblStrategyCode - Focal Ablation',
    'Ganglion Plexus Ablation': 'AFibPriorAblStrategyCode - Ganglion Plexus Ablation',
    'Pulmonary Vein Isolation': 'AFibPriorAblStrategyCode - Pulmonary Vein Isolation',
    'Segmental PV Ablation': 'AFibPriorAblStrategyCode - Segmental PV Ablation',
    'Rotor Based Mapping': 'AFibPriorAblStrategyCode - Rotor Based Mapping',
    'Wide Area Circumferential Ablation': 'AFibPriorAblStrategyCode - Wide Area Circumferential Ablation',
    'Aortic Balloon Valvuloplasty': 'CardStrucIntervType - Aortic Balloon Valvuloplasty',
    'Transcatheter Aortic Valve Replacement (TAVR)': 'CardStrucIntervType - Transcatheter Aortic Valve Replacement (TAVR)',
    'Aortic Valve Replacement – Surgical': 'CardStrucIntervType - AV Replacement - Surgical',
    'Aortic Valve Repair – Surgical': 'CardStrucIntervType - AV Repair - Surgical',
    'Mitral Balloon Valvuloplasty': 'CardStrucIntervType - Mitral Balloon Valvuloplasty',
    'Transcatheter Mitral Valve Repair (TMVR)': 'CardStrucIntervType -  Transcatheter Mitral Valve Repair (TMVR)',
    'Mitral Valve Replacement – Surgical': 'CardStrucIntervType - MV Replacement - Surgical',
    'Mitral Valve Repair – Surgical': 'CardStrucIntervType - MV Repair - Surgical',
    'Mitral Annuloplasty Ring – Surgical': 'CardStrucIntervType - Mitral Annuloplasty Ring - Surgical',
    'Mitral Transcatheter – Valve-in-Valve': 'CardStrucIntervType - Mitral Transcatheter - Valve-in-valve',
    'Atrial Septal Defect Closure': 'CardStrucIntervType - ASD Closure',
    'Patent Foramen Ovale Closure': 'CardStrucIntervType - PFO Closure',
    'Pulmonic Replacement': 'CardStrucIntervType - Pulmonic Replacement',
    'Pulmonic Repair': 'CardStrucIntervType - Pulmonic Repair',
    'Tricuspid Replacement': 'CardStrucIntervType - Tricuspid Replacement',
    'Tricuspid Repair': 'CardStrucIntervType - Tricuspid Repair',
    'Epicardial Ligation': 'LAAOType - Epicardial Ligation',
    'Percutaneous Occlusion': 'LAAOType - Surgical Amputation',
    'Surgical Amputation': 'LAAOType - Surgical Ligation',
    'Surgical Closure Device': 'LAAOType - Percutaneous Occlusion',
    'Surgical Ligation': 'LAAOType - Surgical Closure Device',
    'Surgical Stapling': 'LAAOType - Surgical Stapling',
    'Non-ischemic cardiomyopathy': 'PriorCMType - Non-ischemic cardiomyopathy',
    'Ischemic cardiomyopathy': 'PriorCMType - Other cardiomyopathy type',
    'Restrictive cardiomyopathy': 'PriorCMType - Restrictive cardiomyopathy',
    'Hypertrophic cardiomyopathy': 'PriorCMType - Ischemic cardiomyopathy',
    'Other Cardiomyopathy Type': 'PriorCMType - Hypertrophic cardiomyopathy',
    'HemorrhagicStroke': 'HBStrokeType - Hemorrhagic Stroke',
    'IschemicStroke': 'HBStrokeType - Ischemic Stroke',
    'UndeterminedStroke': 'HBStrokeType - Undetermined Stroke',
    'EpiCardiacSurgery': 'MedCond - Cardiac Surgery', 
    'EpiPericarditis': 'MedCond - Pericarditis', 
    'EpiAccess': 'MedCond - Epicardial Access', 
    'EpiAutoimmuneDisease': 'MedCond - Autoimmune Disease',
    'EpiThorRadTherapy': 'MedCond - Thoracic Radiation Therapy', 
    'EpiPectusExcavatum': 'MedCond - Pectus Excavatum', 
    'EpiEpigastricSurg': 'MedCond - Epigastric Surgery',
    'EpiHepatomegaly': 'MedCond - Hepatomegaly', 
    'EpiHiatalHernia': 'MedCond - Hiatal Hernia',
    'Sinus node rhythm': 'AtrialRhythm - Sinus node rhythm', 
    'Atrial fibrillation': 'AtrialRhythm - Atrial fibrillation', 
    'Atrial tachycardia': 'AtrialRhythm - Atrial tachycardia', 
    'Atrial flutter': 'AtrialRhythm - Atrial flutter', 
    'Sinus arrest': 'AtrialRhythm - Sinus arrest', 
    'Atrial paced': 'AtrialRhythm - Atrial paced', 
    'Undocumented atrial rhythm': 'AtrialRhythm - Not Documented',
    'ProcedureStopDateTime' : 'ProcedureEndDateTime',
    'High fall risk': 'ProcLAAOInd - High fall risk',
    'History of major bleed': 'ProcLAAOInd - History of major bleed',
    'Increased thromboembolic stroke risk': 'ProcLAAOInd - Increased thromboembolic stroke risk',
    'Labile INR': 'ProcLAAOInd - Labile INR',
    'Non-compliance with anticoagulation therapy': 'ProcLAAOInd - Non-compliance with anticoagulation therapy',
    'Patient preference': 'ProcLAAOInd - Patient preference',
    'Fluoroscopy': 'GuidanceMethodID - Fluoroscopy',
    'Intracardiac three dimensional echocardiography': 'GuidanceMethodID - Intracardiac three dimensional echocardiography',
    'Electro Anatomic Mapping': 'GuidanceMethodID - Electro Anatomic Mapping',
    'Transesophageal Echocardiography (TEE) ': 'GuidanceMethodID - Transesophageal Echocardiogram (TEE)',
    'AFib Ablation': 'ConcomitantProcType - AFib Ablation',
    'ICD': 'ConcomitantProcType - ICD',
    'PCI': 'ConcomitantProcType - PCI',
    'TAVR': 'ConcomitantProcType - TAVR',
    'TMVR': 'ConcomitantProcType - TMVR',
    'ASD Closure Congenital': 'ConcomitantProcType - ASD Closure Congenital',
    'ASD Closure Iatrogenic': 'ConcomitantProcType - ASD Closure Iatrogenic',
    'PFO Closure Congenital': 'ConcomitantProcType - PFO Closure Congenital',
    'F_Method_Office': 'F_Method - Office Visit',
    'F_Method_MedRecord': 'F_Method - Medical Records',
    'F_Method_MedProvider': 'F_Method - Letter from Medical Provider',
    'F_Method_Phone': 'F_Method - Phone Call',
    'F_Method_SSFile': 'F_Method - Social Security Death Master File',
    'F_Method_Hospital': 'F_Method - Hospitalized',
    'F_Method_Other': 'F_Method - Other'
}

def rename_columns(df, column_mapping):
    """Function to rename columns for a single DataFrame."""
    return df.rename(columns=column_mapping)

# Example logic for determining which mapping to apply
def determine_mapping(sheet_name):
    """Determine which column mapping to use based on sheet_name or other logic."""
    if "canceled" in sheet_name.lower():
        return column_name_mapping_canceled
    elif "aborted" in sheet_name.lower():
        return column_name_mapping_aborted
    # Add additional conditions as needed
    else:
        return other_mapping  # Or return a default mapping if applicable

# Apply renaming logic to each DataFrame in the dictionary
for sheet_name, df in all_sheets_appended.items():
    mapping_to_use = determine_mapping(sheet_name)
    if mapping_to_use is not None:
        all_sheets_appended[sheet_name] = rename_columns(df, mapping_to_use)

# Repeat for any other dictionaries of DataFrames as needed
for sheet_name, df in all_sheets_appended_FU.items():
    mapping_to_use = determine_mapping(sheet_name)
    if mapping_to_use is not None:
        all_sheets_appended_FU[sheet_name] = rename_columns(df, mapping_to_use)



StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 367, Finished, Available)

### Rename Specific Followup Columns Seperately

In [366]:
# Rename Followup ArrivalDate and DischargeDate
# seperate because only want follow up tables renamed

# Mapping of old column names to new column names
column_name_mapping = {
    'ArrivalDate': 'FURefArrivalDate', 
    'DCDate': 'FU_RefDischargeDate'
    }

# Loop through all DataFrames in the second dictionary and rename columns
for sheet_name, df in all_sheets_appended_FU.items():
    all_sheets_appended_FU[sheet_name] = rename_columns(df, column_name_mapping)

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 368, Finished, Available)

## Write DFs back to files

### Adjust any columns that run into conflict with Parquet conversion to string type

In [367]:
def adjust_column_data_types(df, columns_to_adjust):
    """
    Adjusts the data types of specified columns to string.
    
    Parameters:
    - df: DataFrame to process.
    - columns_to_adjust: List of column names to adjust to string type.
    
    Returns:
    - DataFrame with adjusted column types.
    """
    for column in columns_to_adjust:
        if column in df.columns:
            df[column] = df[column].astype(str)
    return df

# Specify columns that you know should be treated as strings
columns_to_adjust = ['HIC_RETIRED_1_3', 'MBI']

# Before writing to Parquet, adjust the data types of specified columns
for sheet_name, df in all_sheets_appended.items():
    df = adjust_column_data_types(df, columns_to_adjust)  # Adjust data types


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 369, Finished, Available)

In [368]:
LAKEHOUSE_PATH = "/lakehouse/default/Files/LAAO/Version_1_3/Merged"

# Function to write DataFrame to parquet
def write_dataframe_to_parquet(df, path, filename):
    # Construct the full path
    full_path = os.path.join(path, filename)
    # Write the DataFrame to Parquet
    df.to_parquet(full_path, index=False, compression='snappy')

# Loop through all DataFrames in the first dictionary and write them to Parquet
for sheet_name, df in all_sheets_appended.items():
    filename = f"{sheet_name}.parquet"  # Construct a filename based on the sheet name
    write_dataframe_to_parquet(df, LAKEHOUSE_PATH, filename)

# Loop through all DataFrames in the second dictionary and write them to Parquet
for sheet_name, df in all_sheets_appended_FU.items():
    filename = f"{sheet_name}.parquet"  # Construct a filename with a suffix for follow-up sheets
    write_dataframe_to_parquet(df, LAKEHOUSE_PATH, filename)

# Now, all DataFrames from both dictionaries should be written to your lakehouse in Parquet format.


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 370, Finished, Available)

## Clean Data for v1.4 Merge

### Clean In-Hospital

In [369]:
import pandas as pd

# Load the data
in_hospital_df = pd.read_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/In-hospital.parquet")
afib_strategy_df = pd.read_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/AFibPriorAblStrategy.parquet")

# Perform a left join on the keys: 'facility', 'OtherID', 'NCDRPatientID'
merged_df = pd.merge(in_hospital_df, afib_strategy_df[['facility', 'OtherID', 'NCDRPatientID', 'PrevAFibTermCA']],
                     on=['facility', 'OtherID', 'NCDRPatientID'], how='left')

# Convert "LVEF" from "68" to "0.68"
merged_df['LVEF'] = merged_df['LVEF'].astype(float) / 100

# Drop the specified columns from the merged DataFrame
columns_to_drop = ["PrevAFLTerm.1", "PrevAFibTermDC.1"]
merged_df = merged_df.drop(columns=columns_to_drop, errors='ignore')

# Correctly evaluate the conditions using pandas' `.any(axis=1)` method
merged_df['PrevAFibTerm'] = 'No'  # Default value
# Combine all conditions using the `|` operator, which acts as an OR operator on pandas Series
condition = (
    (merged_df['PrevAFibTermPC'] == 'Yes') |
    (merged_df['PrevAFibTermDC'] == 'Yes') |
    (merged_df['PrevAFibTermSA'] == 'Yes') |
    (merged_df['PrevAFibTermCA'] == 'Yes')
)
# Apply the condition for each row using `.any(axis=1)`
merged_df.loc[condition, 'PrevAFibTerm'] = 'Yes'  # Set to 'Yes' if any condition is met


# Remove duplicates based on the natural key while keeping the row with the most non-null values (original file ingested had duplicates)
def keep_max_non_null_row(df):
    return df.loc[df.notnull().sum(axis=1).idxmax()]

# Assuming 'ArrivalDate' is a column in your DataFrame. If not, adjust the column names as necessary.
merged_df = merged_df.groupby(['NCDRPatientID', 'facility', 'ArrivalDate']).apply(keep_max_non_null_row).reset_index(drop=True)

# List of columns to update
columns_to_update = [
    "ProcOtherAnticoag2",
    "ProcBivalirudin2",
    "ProcHeparin2",
    "ProcedureLocation",
    "FluoroDoseDAP2_Units",
    "Sex"
]

# Loop through each column and replace 'Y' with 'Yes' and 'N' with 'No'
for column in columns_to_update:
    merged_df[column] = merged_df[column].replace({'Yes': 'Yes - Prescribed', 'No': 'No - Not Prescribed', 'Cath Lab': 'Cardiac Catheterization Laboratory',
    'Hybrid OR': 'Hybrid Operating Room Suite', 'Hybrid Cath Lab': 'Hybrid Catheterization Laboratory Suite',
    'cGy-cm2': 'cGy·cm²', 'Gy-cm2': 'Gy·cm²', 'mGy-cm2': 'mGy·cm²', 'dGy-cm2': 'dGy·cm²', 'µGy-M2': 'µGy·cm²', 'Female': 'F', 'Male': 'M'})

# Overwrite the original file with the modified data
merged_df.to_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/In-hospital.parquet", index=False)

print("File has been successfully modified.")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 371, Finished, Available)

File has been successfully modified.


### Clean HIPS

In [370]:
# Convert 'Y' & 'N' to "Yes" & "No"

# Load the data
df = pd.read_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/HIPS.parquet")

# List of columns to update
columns_to_update = [
    "HIPS - Private Health Insurance",
    "HIPS - Medicare",
    "HIPS - Medicare Advantage",
    "HIPS - Medicaid",
    "HIPS - Military Health Care",
    "HIPS - State-Specific Plan (non-Medicaid)",
    "HIPS - Indian Health Service",
    "HIPS - Non-US Insurance"
]

# Loop through each column and replace 'Y' with 'Yes' and 'N' with 'No'
for column in columns_to_update:
    df[column] = df[column].replace({'Y': 'Yes', 'N': 'No', 'NULL': 'No'})

# Overwrite the original file with the modified data
df.to_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/HIPS.parquet", index=False)

print("File has been successfully overwritten")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 372, Finished, Available)

File has been successfully overwritten


### Clean BleedEventType

In [371]:
# Convert 'Y' & 'N' to "Yes" & "No"

# Load the data
df = pd.read_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/BleedEventType.parquet")

# List of columns to update
columns_to_update = [
    "BleedEventType - Intracranial Bleed",
    "BleedEventType - Epistaxis",
    "BleedEventType - Gastrointestinal Bleed",
    "BleedEventType - Other"
]

# Loop through each column and replace 'Y' with 'Yes' and 'N' with 'No'
for column in columns_to_update:
    df[column] = df[column].replace({'Y': 'Yes', 'N': 'No', 'NULL': 'No'})

# Overwrite the original file with the modified data
df.to_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/BleedEventType.parquet", index=False)

print("File has been successfully overwritten")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 373, Finished, Available)

File has been successfully overwritten


### Clean AtrialRhythm

In [372]:
# Convert 'Y' & 'N' to "Yes" & "No"

# Load the data
df = pd.read_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/AtrialRhythm.parquet")

# List of columns to update
columns_to_update = [
    'AtrialRhythm - Sinus node rhythm', 
    'AtrialRhythm - Atrial fibrillation', 
    'AtrialRhythm - Atrial tachycardia', 
    'AtrialRhythm - Atrial flutter', 
    'AtrialRhythm - Sinus arrest', 
    'AtrialRhythm - Atrial paced', 
    'AtrialRhythm - Not Documented'
]

# Loop through each column and replace 'Y' with 'Yes' and 'N' with 'No'
for column in columns_to_update:
    df[column] = df[column].replace({'Y': 'Yes', 'N': 'No', 'NULL': 'No'})

# Overwrite the original file with the modified data
df.to_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/AtrialRhythm.parquet", index=False)

print("File has been successfully overwritten")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 374, Finished, Available)

File has been successfully overwritten


###  Clean StrucInterventionType

In [373]:
# Convert 'Y' & 'N' to "Yes" & "No"

# Load the data
df = pd.read_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/StrucInterventionType.parquet")

# List of columns to update
columns_to_update = [
    'CardStrucIntervType - Aortic Balloon Valvuloplasty', 
    'CardStrucIntervType - Transcatheter Aortic Valve Replacement (TAVR)', 
    'CardStrucIntervType - AV Replacement - Surgical', 
    'CardStrucIntervType - AV Repair - Surgical', 
    'CardStrucIntervType - Mitral Balloon Valvuloplasty', 
    'CardStrucIntervType -  Transcatheter Mitral Valve Repair (TMVR)', 
    'CardStrucIntervType - MV Replacement - Surgical', 
    'CardStrucIntervType - MV Repair - Surgical', 
    'CardStrucIntervType - Mitral Annuloplasty Ring - Surgical', 
    'CardStrucIntervType - Mitral Transcatheter - Valve-in-valve', 
    'CardStrucIntervType - ASD Closure', 
    'CardStrucIntervType - PFO Closure', 
    'CardStrucIntervType - Pulmonic Replacement', 
    'CardStrucIntervType - Pulmonic Repair', 
    'CardStrucIntervType - Tricuspid Replacement', 
    'CardStrucIntervType - Tricuspid Repair', 

]

# Loop through each column and replace 'Y' with 'Yes' and 'N' with 'No'
for column in columns_to_update:
    df[column] = df[column].replace({'Y': 'Yes', 'N': 'No', 'NULL': 'No'})

# Overwrite the original file with the modified data
df.to_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/StrucInterventionType.parquet", index=False)

print("File has been successfully overwritten")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 375, Finished, Available)

File has been successfully overwritten


### Clean PriorVD

In [374]:

# Convert 'Y' & 'N' to "Yes" & "No"

# Load the data
df = pd.read_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/PriorVD.parquet")

# List of columns to update
columns_to_update = [
    'PriorVD - Prior Myocardial Infarction (MI)', 
    'PriorVD - Peripheral Arterial Occlusive Disease (PAD)', 
    'PriorVD - Known Aortic Plaque', 
    'PriorVD - Coronary Artery Disease (CAD)*', 
    'PriorVD - Percutaneous Coronary Intervention (PCI)*', 
    'PriorVD - Coronary Artery Bypass Graft (CABG)*', 
    'PriorVD - Carotid Artery Disease*'
]

# Loop through each column and replace 'Y' with 'Yes' and 'N' with 'No'
for column in columns_to_update:
    df[column] = df[column].replace({'Y': 'Yes', 'N': 'No', 'NULL': 'No'})

# Overwrite the original file with the modified data
df.to_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/PriorVD.parquet", index=False)

print("File has been successfully overwritten")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 376, Finished, Available)

File has been successfully overwritten


### Clean PriorCMType

In [375]:

# Convert 'Y' & 'N' to "Yes" & "No"

# Load the data
df = pd.read_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/PriorCMType.parquet")

# List of columns to update
columns_to_update = [
    'PriorCMType - Non-ischemic cardiomyopathy', 
    'PriorCMType - Ischemic cardiomyopathy', 
    'PriorCMType - Restrictive cardiomyopathy', 
    'PriorCMType - Hypertrophic cardiomyopathy', 
    'PriorCMType - Other cardiomyopathy type'
]

# Loop through each column and replace 'Y' with 'Yes' and 'N' with 'No'
for column in columns_to_update:
    df[column] = df[column].replace({'Y': 'Yes', 'N': 'No', 'NULL': 'No'})

# Overwrite the original file with the modified data
df.to_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/PriorCMType.parquet", index=False)

print("File has been successfully overwritten")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 377, Finished, Available)

File has been successfully overwritten


### Clean LAAInterventionType

In [376]:

# Convert 'Y' & 'N' to "Yes" & "No"

# Load the data
df = pd.read_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/LAAInterventionType.parquet")

# List of columns to update
columns_to_update = [
    'LAAOType - Epicardial Ligation', 
    'LAAOType - Surgical Amputation', 
    'LAAOType - Surgical Ligation', 
    'LAAOType - Percutaneous Occlusion', 
    'LAAOType - Surgical Closure Device', 
    'LAAOType - Surgical Stapling'
]

# Loop through each column and replace 'Y' with 'Yes' and 'N' with 'No'
for column in columns_to_update:
    df[column] = df[column].replace({'Y': 'Yes', 'N': 'No', 'NULL': 'No'})

# Overwrite the original file with the modified data
df.to_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/LAAInterventionType.parquet", index=False)

print("File has been successfully overwritten")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 378, Finished, Available)

File has been successfully overwritten


### ProcCanceledReason

In [377]:

# Convert 'Y' & 'N' to "Yes" & "No"

# Load the data
df = pd.read_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/ProcCanceledReason.parquet")

# List of columns to update
columns_to_update = [
    'ProcCanceledReason - Anatomy not conducive for implant', 
    'ProcCanceledReason - Appendage too large (for device implant)', 
    'ProcCanceledReason - Appendage too small (for device implant)', 
    'ProcCanceledReason - Catherization challenge', 
    'ProcCanceledReason - Decompensation in patient condition', 
    'ProcCanceledReason - Epicardial access issue', 
    'ProcCanceledReason - Thrombus detected', 
    'ProcCanceledReason - Unanticipated patient condition', 
    'ProcCanceledReason -  Patient/Family choice'

]

# Loop through each column and replace 'Y' with 'Yes' and 'N' with 'No'
for column in columns_to_update:
    df[column] = df[column].replace({'Y': 'Yes', 'N': 'No', 'NULL': 'No'})

# Remove duplicates based on 'NCDRPatientID', 'facility', 'ArrivalDate'
# Keep the first occurrence of each duplicate
df = df.drop_duplicates(subset=['NCDRPatientID', 'facility', 'ArrivalDate'], keep='first')

# Overwrite the original parque file with the modified data
df.to_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/ProcCanceledReason.parquet", index=False)

print("File has been successfully overwritten")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 379, Finished, Available)

File has been successfully overwritten


### Clean ProcAbortedReason

In [378]:

# Convert 'Y' & 'N' to "Yes" & "No"

# Load the data
df = pd.read_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/ProcAbortedReason.parquet")

# List of columns to update
columns_to_update = [
    'ProcAbortedReason - Anatomy not conducive for implant', 
    'ProcAbortedReason - Appendage too large (for device implant)', 
    'ProcAbortedReason - Appendage too small (for device implant)', 
    'ProcAbortedReason - Catherization challenge', 
    'ProcAbortedReason - Decompensation in patient condition', 
    'ProcAbortedReason -  Device related', 
    'ProcAbortedReason - Transcatheter device retrieval', 
    'ProcAbortedReason - Device release criteria not met', 
    'ProcAbortedReason - Epicardial access issue', 
    'ProcAbortedReason - Surgical device retrieval', 
    'ProcAbortedReason - Device associated thrombus developed during procedure', 
    'ProcAbortedReason - Unanticipated patient condition', 
    'ProcAbortedReason - Patient/Family choice'


]

# Loop through each column and replace 'Y' with 'Yes' and 'N' with 'No'
for column in columns_to_update:
    df[column] = df[column].replace({'Y': 'Yes', 'N': 'No', 'NULL': 'No'})

# Remove duplicates based on 'NCDRPatientID', 'facility', 'ArrivalDate'
# Keep the first occurrence of each duplicate
df = df.drop_duplicates("Pat_ID", keep='first')

# Overwrite the original parquet file with the modified data
df.to_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/ProcAbortedReason.parquet", index=False)

print("File has been successfully overwritten")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 380, Finished, Available)

File has been successfully overwritten


### Clean AfibPriorAblStrategy

In [379]:

# Convert 'Y' & 'N' to "Yes" & "No"

# Load the data
df = pd.read_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/AFibPriorAblStrategy.parquet")

# List of columns to update
columns_to_update = [
    'AFibPriorAblStrategyCode - Complex Fractionated Atrial Electrogram', 
    'AFibPriorAblStrategyCode - Convergent Procedure', 
    'AFibPriorAblStrategyCode - Cryoablation', 
    'AFibPriorAblStrategyCode - Empiric LA Linear Lesions', 
    'AFibPriorAblStrategyCode - Focal Ablation', 
    'AFibPriorAblStrategyCode - Ganglion Plexus Ablation', 
    'AFibPriorAblStrategyCode - Pulmonary Vein Isolation', 
    'AFibPriorAblStrategyCode - Segmental PV Ablation', 
    'AFibPriorAblStrategyCode - Rotor Based Mapping', 
    'AFibPriorAblStrategyCode - Wide Area Circumferential Ablation' 
]

# Loop through each column and replace 'Y' with 'Yes' and 'N' with 'No'
for column in columns_to_update:
    df[column] = df[column].replace({'Y': 'Yes', 'N': 'No', 'NULL': 'No'})

# Overwrite the original parquet file with the modified data
df.to_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/AFibPriorAblStrategy.parquet", index=False)

print("File has been successfully overwritten")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 381, Finished, Available)

File has been successfully overwritten


### Clean ConcomitantProcType

In [380]:

# Convert 'Y' & 'N' to "Yes" & "No"

# Load the data
df = pd.read_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/ConcomitantProcType.parquet")

# List of columns to update
columns_to_update = [
    'ConcomitantProcPerf', 
    'ConcomitantProcType - AFib Ablation', 
    'ConcomitantProcType - ICD', 
    'ConcomitantProcType - PCI', 
    'ConcomitantProcType - TAVR', 
    'ConcomitantProcType - TMVR', 
    'ConcomitantProcType - ASD Closure Congenital', 
    'ConcomitantProcType - ASD Closure Iatrogenic', 
    'ConcomitantProcType - PFO Closure Congenital'

]

# Loop through each column and replace 'Y' with 'Yes' and 'N' with 'No'
for column in columns_to_update:
    df[column] = df[column].replace({'Y': 'Yes', 'N': 'No', 'NULL': 'No'})

# Remove duplicates based on 'NCDRPatientID', 'facility', 'ArrivalDate'
# Keep the first occurrence of each duplicate
df = df.drop_duplicates(subset=['NCDRPatientID', 'facility', 'ArrivalDate'], keep='first')

# Overwrite the original parquet file with the modified data
df.to_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/ConcomitantProcType.parquet", index=False)

print("File has been successfully overwritten")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 382, Finished, Available)

File has been successfully overwritten


### Clean ProcLAAOInd

In [381]:
# Convert 'Y' & 'N' to "Yes" & "No"

# Load the data
df = pd.read_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/ProcLAAOInd.parquet")

# List of columns to update
columns_to_update = [
    'ProcLAAOInd - High fall risk', 
    'ProcLAAOInd - History of major bleed', 
    'ProcLAAOInd - Increased thromboembolic stroke risk', 
    'ProcLAAOInd - Labile INR', 
    'ProcLAAOInd - Non-compliance with anticoagulation therapy', 
    'ProcLAAOInd - Patient preference'
 
]

# Loop through each column and replace 'Y' with 'Yes' and 'N' with 'No'
for column in columns_to_update:
    df[column] = df[column].replace({'Y': 'Yes', 'N': 'No', 'NULL': 'No'})

# Remove duplicates based on 'NCDRPatientID', 'facility', 'ArrivalDate'
# Keep the first occurrence of each duplicate
df = df.drop_duplicates(subset=['NCDRPatientID', 'facility', 'ArrivalDate'], keep='first')


# Overwrite the original parquet file with the modified data
df.to_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/ProcLAAOInd.parquet", index=False)

print("File has been successfully overwritten")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 383, Finished, Available)

File has been successfully overwritten


### Clean GuidanceMethodID

In [382]:
# Convert 'Y' & 'N' to "Yes" & "No"

# Load the data
df = pd.read_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/GuidanceMethodID.parquet")

# List of columns to update
columns_to_update = [
    'GuidanceMethodID - Intracardiac three dimensional echocardiography', 
    'GuidanceMethodID - Electro Anatomic Mapping', 
    'GuidanceMethodID - Fluoroscopy',
    'GuidanceMethodID - Transesophageal Echocardiogram (TEE)'
]

# Loop through each column and replace 'Y' with 'Yes' and 'N' with 'No'
for column in columns_to_update:
    df[column] = df[column].replace({'Y': 'Yes', 'N': 'No', 'NULL': 'No'})

# Overwrite the original parquet file with the modified data
df.to_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/GuidanceMethodID.parquet", index=False)

print("File has been successfully overwritten")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 384, Finished, Available)

File has been successfully overwritten


### Clean Follow-up

In [383]:
# Convert LVEF form "xx" to "0.xx"
import pandas as pd

# Load the data
df = pd.read_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/Follow-up.parquet")

# Remove duplicates based on 'NCDRPatientID', 'facility', 'FURefArrivalDate', 'F_AssessmentDate'
# multiple duplicates exist in original export form NCDR for same follow up visit 
# duplicates were generated by entering readmissions into the follow up section
# Keep the first occurrence of each duplicate
df = df.drop_duplicates(subset=['NCDRPatientID', 'facility', 'FURefArrivalDate', 'F_AssessmentDate'], keep='first')

# Remove the '%' sign and convert to numeric
df['F_LVEF'] = df['F_LVEF'].astype(float) / 100

# List of columns to update
columns_to_update = [
    'Sex', 'F_WarfarinDiscontinued', 'F_DOACTherapyDiscontinued', 'F_AspirinTherapyDiscontinued', 'F_P2Y12TherapyDiscontinued'
]

# Loop through each column and replace 'Y' with 'Yes' and 'N' with 'No'
for column in columns_to_update:
    df[column] = df[column].replace({'Female': 'F', 'Male': 'M', 'No': 'No - Not Discontinued', 'Yes': 'Yes - Discontinued'})

# Overwrite the original parquet file with the modified data
df.to_parquet("/lakehouse/default/Files/LAAO/Version_1_3/Merged/Follow-up.parquet", index=False)

print("File has been successfully overwritten")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 385, Finished, Available)

File has been successfully overwritten


# v1.4 Ingestion

## In-hospital Ingestion

In [384]:
# import from Lakehouse file upload
# seperate each XLSX sheet
# add facility column with NCDR PID & custom column with hardocded natural key (Pat_ID)
# append multiple sheets into one into dataframes in a dictionary 

def read_excel_and_process_sheets(file_path):
    """
    Reads each sheet from an Excel file into a pandas DataFrame,
    adds a 'facility' column based on the file title, and converts ArrivalDate and DCDate to datetime format.
    """
    # Extract facility code from the file title
    facility_code = file_path.split('/')[-1][4:10]  # Adjust based on your filenames
    
    # Read the Excel file into a dict of DataFrames
    sheets_dict = pd.read_excel(file_path, sheet_name=None, engine='openpyxl')

    # List of date columns to convert
    date_columns = [
        'ArrivalDate', 'DCDate', 'AFibCathAblDate', 'AFibSurgAblDate',
        'AFibFlutterCathAblDate', 'TTEDate', 'CTImagingDate', 'MRDate',
        'ICEDate', 'ProcedureStartDateTime', 'TEEDateLAAO',
        'ProcedureEndDateTime', 'IntraPostProcEventDate', 'AJ_EventDate',
        'ADJ_DeathDate', 'ADJ_BleedDeathDate', 'ADJ_SysThromboDeathDate', 'DOB'
    ]
    
    #list columns to convert to string for parquet formatting issues
    string_columns = ['OtherID']

    # Process each sheet
    for sheet_name, df in sheets_dict.items():
        # Add 'facility' column
        df['facility'] = facility_code
        
        # Convert specified date columns to datetime if they exist
        for column in date_columns:
            if column in df.columns:
                df[column] = pd.to_datetime(df[column])

        for column in string_columns:
            if column in df.columns:
                df[column] = df[column].astype(str)

        # Ensure 'ArrivalDate' is in datetime format before formatting
        if 'ArrivalDate' in df.columns:
            df['ArrivalDate'] = pd.to_datetime(df['ArrivalDate'], errors='coerce')  # Convert to datetime, coercing errors
            df['ArrivalDate'] = df['ArrivalDate'].dt.strftime('%Y-%m-%d')  # Then format to string
        
        # Ensure 'NCDRPatientID' is string
        if 'NCDRPatientID' in df.columns:
            df['NCDRPatientID'] = df['NCDRPatientID'].astype(str)
        
        # Create 'Pat_ID'
        if 'NCDRPatientID' in df.columns and 'ArrivalDate' in df.columns:
            df['Pat_ID'] = df['NCDRPatientID'] + '_' + df['ArrivalDate'] + '_' + df['facility']
        
        sheets_dict[sheet_name] = df
    
    return sheets_dict

def append_sheets_from_multiple_files(file_paths):
    """
    Processes multiple Excel files, appending sheets with the same name across files.
    """
    all_sheets = {}  # Dictionary to store appended sheets
    
    for file_path in file_paths:
        # Process each file
        sheets_dict = read_excel_and_process_sheets(file_path)
        
        # Append sheets with the same name
        for sheet_name, df in sheets_dict.items():
            if sheet_name in all_sheets:
                all_sheets[sheet_name] = pd.concat([all_sheets[sheet_name], df], ignore_index=True)
            else:
                all_sheets[sheet_name] = df
                
    return all_sheets


file_paths = ["/lakehouse/default/Files/LAAO/Version_1_4/LAAO136983-BasePAS.xlsx", 
                "/lakehouse/default/Files/LAAO/Version_1_4/LAAO197501-BaseSHY.xlsx",
                "/lakehouse/default/Files/LAAO/Version_1_4/LAAO410079-BaseHAM.xlsx", 
                "/lakehouse/default/Files/LAAO/Version_1_4/LAAO468377-BaseALT.xlsx", 
                "/lakehouse/default/Files/LAAO/Version_1_4/LAAO488281-BaseWMD.xlsx",
                "/lakehouse/default/Files/LAAO/Version_1_4/LAAO697322-BasePUH.xlsx",
                "/lakehouse/default/Files/LAAO/Version_1_4/LAAO799953-BasePIN.xlsx"]
# Here, replace the file_paths list with the actual paths of your files

all_sheets_appended = append_sheets_from_multiple_files(file_paths)

# At this point, `all_sheets_appended` contains a DataFrame for each unique sheet name
# across all processed Excel files, with sheets having the same name appended together




StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 386, Finished, Available)

## FollowUp Ingestion

In [385]:
# import from Lakehouse file upload
# seperate each XLSX sheet
# add facility column with NCDR PID & custom column with hardocded natural key (Pat_ID)
# append multiple sheets into one into dataframes in a dictionary 

def read_excel_and_process_sheets(file_path):
    """
    Reads each sheet from an Excel file into a pandas DataFrame,
    adds a 'facility' column based on the file title, and stores them in a dictionary.
    """
    # Extract facility code from the file title
    facility_code = file_path.split('/')[-1][4:10]  # Adjust based on your filenames
    
    # Read the Excel file into a dict of DataFrames
    sheets_dict = pd.read_excel(file_path, sheet_name=None, engine='openpyxl')

    # Date columns to convert
    date_columns = [
        'FURefArrivalDate', 'FU_RefDischargeDate', 'RefProcStartDateTime',
        'F_AssessmentDate', 'F_DeathDate', 'TTEDate_F', 'F_TEEDate',
        'F_CardiacCTDate', 'F_CardiacMRIDate', 'F_ICEDate',
        'F_WarfarinDiscontinuedDate', 'F_WarfarinResumedDate',
        'F_DOACTherapyDiscontinuedDate', 'F_DOACTherapyResumedDate',
        'F_AspirinTherapyDiscontinuedDate', 'F_AspirinTherapyResumedDate',
        'F_P2Y12TherapyDiscontinuedDate', 'F_P2Y12TherapyResumedDate',
        'FupEventDate', 'F_AJ_EventDate', 'F_ADJ_DeathDate',
        'FU_ADJ_BleedDeathDate', 'F_ADJ_SysThromboDeathDate'
    ]

    #list columns to convert to string for parquet formatting issues
    string_columns = ['OtherID']

    
    # Process each sheet
    for sheet_name, df in sheets_dict.items():
        df['facility'] = facility_code
        

        # Convert specified date columns to datetime if they exist
        for column in date_columns:
            if column in df.columns:
                df[column] = pd.to_datetime(df[column]) #, errors='coerce')  # Using 'coerce' to handle invalid dates
        
        for column in string_columns:
            if column in df.columns:
                df[column] = df[column].astype(str)
        
        if 'FURefArrivalDate' in df.columns:
            df['FURefArrivalDate'] = pd.to_datetime(df['FURefArrivalDate'], errors='coerce')  # Convert to datetime, coercing errors
            df['FURefArrivalDate'] = df['FURefArrivalDate'].dt.strftime('%Y-%m-%d')  # Then format to string
        
        # Ensure 'NCDRPatientID' is string
        if 'NCDRPatientID' in df.columns:
            df['NCDRPatientID'] = df['NCDRPatientID'].astype(str)
        
        # Create 'Pat_ID'
        if 'NCDRPatientID' in df.columns and 'FURefArrivalDate' in df.columns:
            df['Pat_ID'] = df['NCDRPatientID'] + '_' + df['FURefArrivalDate'] + '_' + df['facility']
        
        sheets_dict[sheet_name] = df


        sheets_dict[sheet_name] = df
    
    return sheets_dict

def append_sheets_from_multiple_files(file_paths_FU):
    """
    Processes multiple Excel files, appending sheets with the same name across files.
    """
    all_sheets = {}  # Dictionary to store appended sheets
    
    for file_path in file_paths_FU:
        # Process each file
        sheets_dict = read_excel_and_process_sheets(file_path)
        
        # Append sheets with the same name
        for sheet_name, df in sheets_dict.items():
            if sheet_name in all_sheets:
                all_sheets[sheet_name] = pd.concat([all_sheets[sheet_name], df], ignore_index=True)
            else:
                all_sheets[sheet_name] = df
                
    return all_sheets


# Example usage
file_paths_FU = ["/lakehouse/default/Files/LAAO/Version_1_4/LAAO136983-FollowUpPAS.xlsx", 
                "/lakehouse/default/Files/LAAO/Version_1_4/LAAO197501-FollowUpSHY.xlsx",
                "/lakehouse/default/Files/LAAO/Version_1_4/LAAO410079-FollowUpHAM.xlsx", 
                "/lakehouse/default/Files/LAAO/Version_1_4/LAAO468377-FollowUpALT.xlsx", 
                "/lakehouse/default/Files/LAAO/Version_1_4/LAAO488281-FollowUpWMD.xlsx",
                "/lakehouse/default/Files/LAAO/Version_1_4/LAAO697322-FollowUpPUH.xlsx",
                "/lakehouse/default/Files/LAAO/Version_1_4/LAAO799953-FollowUpPIN.xlsx"]
# Here, replace the file_paths_FU list with the actual paths of your files

all_sheets_appended_FU = append_sheets_from_multiple_files(file_paths_FU)

# At this point, `all_sheets_appended_FU` contains a DataFrame for each unique sheet name
# across all processed Excel files, with sheets having the same name appended together


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 387, Finished, Available)

## Write DFs back to files

In [386]:
LAKEHOUSE_PATH = "/lakehouse/default/Files/LAAO/Version_1_4/Merged"

# Function to write DataFrame to parquet without including the index
def write_dataframe_to_parquet(df, path, filename):
    # Construct the full path
    full_path = os.path.join(path, filename)
    # Write the DataFrame to parquet without the index
    df.to_parquet(full_path, index=False)

# Loop through all DataFrames in the first dictionary and write them to parquet
for sheet_name, df in all_sheets_appended.items():
    filename = f"{sheet_name}.parquet"  # Construct a filename based on the sheet name
    write_dataframe_to_parquet(df, LAKEHOUSE_PATH, filename)

# Loop through all DataFrames in the second dictionary and write them to parquet
for sheet_name, df in all_sheets_appended_FU.items():
    # Check if sheet name is 'Demographics' or 'Export_Criteria' and add '_FU' suffix only to them
    if sheet_name in ['Demographics', 'Export Criteria']:
        filename = f"{sheet_name}_FU.parquet"  # Add '_FU' suffix to these specific sheets
    else:
        filename = f"{sheet_name}.parquet"  # Keep original name for other sheets
    write_dataframe_to_parquet(df, LAKEHOUSE_PATH, filename)

# Now, all DataFrames from both dictionaries should be written to your lakehouse in parquet format without the index column.


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 388, Finished, Available)

## Data Cleaning

### Clean IPPEvents

In [387]:
# Load data 
df = pd.read_parquet("/lakehouse/default/" + "Files/LAAO/Version_1_4/Merged/IPPEvents.parquet")

# Function to remove "(Complete Adjudication)" and trim the string
def clean_event(event_str):
    cleaned_str = event_str.replace("(Complete Adjudication)", "")  # Remove "(Complete Adjudication)"
    return cleaned_str.strip()  # Trim leading and trailing whitespace

# Apply the cleaning function to the 'F_Event' column
df['ProcEvents'] = df['ProcEvents'].apply(clean_event)

# Overwrite the original parquet file with the cleaned data
df.to_parquet("/lakehouse/default/Files/LAAO/Version_1_4/Merged/IPPEvents.parquet", index=False)

print("File has been successfully overwritten")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 389, Finished, Available)

File has been successfully overwritten


### Clean PreProcLabs

In [388]:
# Remove measurement labels

# Load data
df = pd.read_parquet("/lakehouse/default/Files/LAAO/Version_1_4/Merged/PreProcLabs.parquet")

# List of columns to clean
columns_to_clean = ['Height', 'Weight', 'Pulse', 'SystolicBP', 'DiastolicBP', 'HGB', 'PT', 'PreProcCreat', 'Albumin', 'PlateletCt']

# Clean the columns
for column in columns_to_clean:
    # Split the value and label, keep only the numeric value (first part)
    df[column] = df[column].str.split(' ').str[0]
    # Convert the cleaned values back to numeric type, in case they were not
    df[column] = pd.to_numeric(df[column], errors='coerce')  # 'coerce' will set invalid parsing to NaN

# Overwrite the original parquet file with the cleaned data
df.to_parquet("/lakehouse/default/Files/LAAO/Version_1_4/Merged/PreProcLabs.parquet", index=False)

# If you print or display anything after this, it should show that the operation was successful
print("File has been successfully overwritten")




StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 390, Finished, Available)

File has been successfully overwritten


### Clean PostProcLabs

In [389]:
# Remove measurement Labels

# Load data
df = pd.read_parquet("/lakehouse/default/Files/LAAO/Version_1_4/Merged/PostProcLabs.parquet")

# List of columns to clean
columns_to_clean = ['PostProcPeakCreat', 'PostProcCreat2', 'PostProcHgb2']

# Clean the columns
for column in columns_to_clean:
    # Split the value and label, keep only the numeric value (first part)
    df[column] = df[column].str.split(' ').str[0]
    # Convert the cleaned values back to numeric type, in case they were not
    df[column] = pd.to_numeric(df[column], errors='coerce')  # 'coerce' will set invalid parsing to NaN

# Overwrite the original parquet file with the cleaned data
df.to_parquet("/lakehouse/default/Files/LAAO/Version_1_4/Merged/PostProcLabs.parquet", index=False)

# If you print or display anything after this, it should show that the operation was successful
print("File has been successfully overwritten")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 391, Finished, Available)

File has been successfully overwritten


### Clean LVEF

In [390]:
# Convert LVEF form "xx %" to "0.xx"

# Load the data
df = pd.read_parquet("/lakehouse/default/Files/LAAO/Version_1_4/Merged/Diagnostics.parquet")

# Remove the '%' sign and convert to numeric
df['LVEF'] = df['LVEF'].str.replace('%', '').astype(float) / 100

# Overwrite the original parquet file with the modified data
df.to_parquet("/lakehouse/default/Files/LAAO/Version_1_4/Merged/Diagnostics.parquet", index=False)

print("File has been successfully overwritten")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 392, Finished, Available)

File has been successfully overwritten


### Clean FollowUp (Labs & LVEF)

In [391]:
# Remove measurement Labels & Convert LVEF form "xx %" to "0.xx"

# Load data
df = pd.read_parquet("/lakehouse/default/Files/LAAO/Version_1_4/Merged/FollowUp.parquet")

# List of columns to clean
columns_to_clean = ['ResidualLeakFU', 'Creat_FU', 'LowHgbValue_F']

# Clean the columns
for column in columns_to_clean:
    # Ensure the column is of string type before splitting
    df[column] = df[column].astype(str).str.split(' ').str[0]
    # Convert the cleaned values back to numeric type, in case they were not
    df[column] = pd.to_numeric(df[column], errors='coerce')  # 'coerce' will set invalid parsing to NaN

# For the 'F_LVEF' column, ensure it is a string, remove '%' signs, and safely convert to float
if 'F_LVEF' in df.columns:
    # Replace '%' with an empty string and convert 'None' or any non-numeric strings to NaN
    df['F_LVEF'] = df['F_LVEF'].astype(str).str.replace('%', '').apply(pd.to_numeric, errors='coerce') / 100

# Overwrite the original parquet file with the cleaned data
df.to_parquet("/lakehouse/default/Files/LAAO/Version_1_4/Merged/FollowUp.parquet", index=False)

# If you print or display anything after this, it should show that the operation was successful
print("File has been successfully overwritten")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 393, Finished, Available)

File has been successfully overwritten


### Clean ProcInfo

In [392]:
# Load data
df = pd.read_parquet("/lakehouse/default/Files/LAAO/Version_1_4/Merged/ProcInfo.parquet")

# List of columns to clean
columns_to_clean = ['LAAO_OrWid', 'ResidualLeak', 'FluoroDoseKerm', 'ContrastVol', 'FluoroDoseDAP2']

# Clean the columns
for column in columns_to_clean:
    # Check if the column is one of those needing units extracted
    if column in ['FluoroDoseKerm', 'FluoroDoseDAP2']:
        # Split into two parts: value and unit, ensuring at least two columns are returned
        split_data = df[column].astype(str).str.split(' ', expand=True)
        df[column] = pd.to_numeric(split_data[0], errors='coerce')  # Convert value part back to numeric
        unit_column_name = f"{column}_Units"
        if split_data.shape[1] > 1:  # Check if there's a second part
            df[unit_column_name] = split_data[1]  # Assign unit part to the new column
        else:
            df[unit_column_name] = pd.NA  # Fill with NA or a placeholder if no unit is present
    else:
        # For other columns, just clean as before
        df[column] = pd.to_numeric(df[column].astype(str).str.split(' ').str[0], errors='coerce')

# Overwrite the original parquet file with the cleaned data
df.to_parquet("/lakehouse/default/Files/LAAO/Version_1_4/Merged/ProcInfo.parquet", index=False)

print("File has been successfully overwritten")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 394, Finished, Available)

File has been successfully overwritten


### Clean FUEvents

In [393]:
# Load data 
df = pd.read_parquet("/lakehouse/default/" + "Files/LAAO/Version_1_4/Merged/FUEVENTS.parquet")

# Function to remove "(Complete Adjudication)" and trim the string
def clean_event(event_str):
    cleaned_str = event_str.replace("(Complete Adjudication)", "")  # Remove "(Complete Adjudication)"
    return cleaned_str.strip()  # Trim leading and trailing whitespace

# Apply the cleaning function to the 'F_Event' column
df['F_Event'] = df['F_Event'].apply(clean_event)

# Overwrite the original parquet file with the cleaned data
df.to_parquet("/lakehouse/default/Files/LAAO/Version_1_4/Merged/FUEVENTS.parquet", index=False)

print("File has been successfully overwritten")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 395, Finished, Available)

File has been successfully overwritten


# Version Merge Process

Granularity
- Base/Initial Encounter tables are at procedures level if not noted as long tables below
- FU are at the follow up visit level, there should up to 4 (45 day, 6m, 1y, 2y) unless noted in the long tables below

Long Tables
- PreProcMeds
- AccessSystems
- Devices
- IPPEvents
- DischargeMeds
- FUMEDS

Did not bring in ADJ tables

## Demographics

In [394]:
# demographics from v1.4
# Load data into pandas DataFrame
file_path = "/lakehouse/default/Files/LAAO/Version_1_4/Merged/Demographics.parquet"
demographics = pd.read_parquet(file_path)

# Convert the specified columns to string after loading
columns_to_convert = ['ZipCode', 'SSN', 'NCDRPatientID']
for column in columns_to_convert:
    demographics[column] = demographics[column].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Apply the function to the ZipCode column
demographics['ZipCode'] = demographics['ZipCode'].apply(remove_decimal_point)

# Convert 'facility' to int
demographics['facility'] = pd.to_numeric(demographics['facility']).astype('Int64')

# Convert 'DOB' column to datetime format
demographics['DOB'] = pd.to_datetime(demographics['DOB'], format='%Y-%m-%d')

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 396, Finished, Available)

Success


In [395]:
# load in_hospital from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/In-hospital.parquet"
in_hospital = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['ZipCode', 'OtherID', 'SSN', 'NCDRPatientID', 'OperA_NPI2']:
    in_hospital[col] = in_hospital[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
in_hospital['facility'] = pd.to_numeric(in_hospital['facility']).astype('Int64')

# Apply the function to the specified columns
in_hospital['OtherID'] = in_hospital['OtherID'].apply(remove_decimal_point)
in_hospital['SSN'] = in_hospital['SSN'].apply(remove_decimal_point)
in_hospital['ZipCode'] = in_hospital['ZipCode'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'DOB': '%Y-%m-%d',
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'AFibSurgAblDate': '%Y-%m-%d',  # Use errors='coerce' for columns without a specific format
    'AFibFlutterCathAblDate': '%Y-%m-%d',
    'ProcedureStartDateTime': '%Y-%m-%d %H:%M:%S',
    'ProcedureEndDateTime': '%Y-%m-%d %H:%M:%S',
    'TEEDateLAAO': '%Y-%m-%d'
}

for col, fmt in date_columns.items():
    if fmt:
        in_hospital[col] = pd.to_datetime(in_hospital[col], format=fmt)
    else:
        in_hospital[col] = pd.to_datetime(in_hospital[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 397, Finished, Available)

Success


In [396]:
# Identifying matching columns
matching_columns = demographics.columns.intersection(in_hospital.columns)

# Appending matching columns from in_hospital to demographics
combined_df = pd.concat([demographics, in_hospital[matching_columns]], axis=0, ignore_index=True)

# Show the DataFrames
print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 398, Finished, Available)

Success


In [397]:
schema = StructType([
    StructField("LastName", StringType(), True),
    StructField("FirstName", StringType(), True),
    StructField("MidName", StringType(), True),
    StructField("SSN", StringType(), True),
    StructField("SSNA", StringType(), True),
    StructField("NCDRPatientID", StringType(), True),
    StructField("OtherID", StringType(), True),
    StructField("DOB", DateType(), True),
    StructField("Sex", StringType(), True),
    StructField("ZipCode", StringType(), True),
    StructField("ZipCodeNA", StringType(), True),
    StructField("RaceWhite", StringType(), True),
    StructField("RaceBlack", StringType(), True),
    StructField("RaceAsian", StringType(), True),
    StructField("RaceAmIndian", StringType(), True),
    StructField("RaceNatHaw", StringType(), True),
    StructField("HispOrig", StringType(), True),
    StructField("facility", IntegerType(), True)
])

# Convert the pandas DataFrame to a Spark DataFrame with the defined schema
combined_spark_df = spark.createDataFrame(combined_df, schema=schema)

combined_spark_df.write.format("delta").mode("overwrite").save("abfss://918a39aa-48f9-4458-8609-67a29814b9c8@onelake.dfs.fabric.microsoft.com/83d6ecf5-3497-4c8f-9baf-192fa2d057ab/Tables/laao_demographics")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 399, Finished, Available)

## Episode

In [398]:
# load episode from v1.4

# Load data from a Parquet file
file_path = "/lakehouse/default/Files/LAAO/Version_1_4/Merged/Episode.parquet"
episode = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
columns_as_str = ['NCDRPatientID']
for col in columns_as_str:
    episode[col] = episode[col].astype(str)

# Convert 'Facility' to integer type, handling missing or invalid values by converting them to NaN
# Note: pd.to_numeric is used for safe conversion
episode['facility'] = pd.to_numeric(episode['facility'])

# Convert date columns to datetime format
date_columns = ['ArrivalDate', 'DCDate']
for date_col in date_columns:
    episode[date_col] = pd.to_datetime(episode[date_col], format='%Y-%m-%d')

print("Success")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 400, Finished, Available)

Success


In [399]:
# Load HIPs from v 1.3

# Load data from a Parquet file
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/HIPS.parquet"
HIPS = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
columns_as_str = ['NCDRPatientID']
for col in columns_as_str:
    HIPS[col] = HIPS[col].astype(str)

# Convert 'Facility' to integer type, handling missing or invalid values by converting them to NaN
# Note: pd.to_numeric is used for safe conversion
HIPS['facility'] = pd.to_numeric(HIPS['facility'])

# Convert date columns to datetime format
date_columns = ['ArrivalDate', 'DCDate']
for date_col in date_columns:
    HIPS[date_col] = pd.to_datetime(HIPS[date_col], format='%Y-%m-%d')

print("Success")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 401, Finished, Available)

Success


In [400]:
# Identifying matching columns
matching_columns = episode.columns.intersection(HIPS.columns)

# Appending matching columns from episode to hips
combined_df = pd.concat([episode, HIPS[matching_columns]], axis=0, ignore_index=True)

# Show the DataFrames
print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 402, Finished, Available)

Success


In [401]:
# load in_hospital from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/In-hospital.parquet"
in_hospital = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['ZipCode', 'OtherID', 'SSN', 'NCDRPatientID', 'OperA_NPI2']:
    in_hospital[col] = in_hospital[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
in_hospital['facility'] = pd.to_numeric(in_hospital['facility'])

# Apply the function to the specified columns
in_hospital['OtherID'] = in_hospital['OtherID'].apply(remove_decimal_point)
in_hospital['SSN'] = in_hospital['SSN'].apply(remove_decimal_point)
in_hospital['ZipCode'] = in_hospital['ZipCode'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'DOB': '%Y-%m-%d',
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'AFibSurgAblDate': '%Y-%m-%d',
    'AFibFlutterCathAblDate': '%Y-%m-%d',
    'ProcedureStartDateTime': '%Y-%m-%d %H:%M:%S',
    'ProcedureEndDateTime': '%Y-%m-%d %H:%M:%S',
    'TEEDateLAAO': '%Y-%m-%d'
}

for col, fmt in date_columns.items():
    if fmt:
        in_hospital[col] = pd.to_datetime(in_hospital[col], format=fmt)
    else:
        in_hospital[col] = pd.to_datetime(in_hospital[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 403, Finished, Available)

Success


In [402]:
#Bring in needed columns to combined_df from in_hospital 1.3 df


# List of columns from 'in_hospital' to bring into 'combined_df'
specified_columns = [
    'MBI', 'EnrolledStudy', 'LAAO_Adm'
]

# Set the index of both dataframes to the key columns for alignment
combined_df.set_index(['NCDRPatientID', 'facility', 'ArrivalDate'], inplace=True)
in_hospital.set_index(['NCDRPatientID', 'facility', 'ArrivalDate'], inplace=True)

# Update only the specified columns in combined_df with the values from in_hospital
for column in specified_columns:
    if column in combined_df.columns:
        combined_df[column].update(in_hospital[column])

# Reset the index to return to the original structure
combined_df.reset_index(inplace=True)

print("Success")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 404, Finished, Available)

Success


In [403]:
# rename to be delta table friendly

# Here's an example of renaming columns:

columns_mapping = {
    "HIPS - Private Health Insurance": "HIPS_Private_Health_Insurance",
    "HIPS - Medicare": "HIPS_Medicare",
    "HIPS - Medicare Advantage": "HIPS_Medicare_Advantage",
    "HIPS - Medicaid": "HIPS_Medicaid",
    "HIPS - Military Health Care": "HIPS_Military_Health_Care",
    "HIPS - State-Specific Plan (non-Medicaid)": "HIPS_State_Specific_Plan_non_Medicaid",
    "HIPS - Indian Health Service": "HIPS_Indian_Health_Service",
    "HIPS - Non-US Insurance": "HIPS_Non_US_Insurance"
}

# Rename the columns
combined_df_renamed = combined_df.rename(columns=columns_mapping)

print("Success")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 405, Finished, Available)

Success


In [404]:
# Initialize SparkSession
spark = SparkSession.builder.appName("ExplicitSchemaExample").getOrCreate()

# Define the schema explicitly with adjusted column names
schema = StructType([
    StructField("NCDRPatientID", StringType(), True),
    StructField("facility", IntegerType(), True),
    StructField("ArrivalDate", DateType(), True),
    StructField("LastName", StringType(), True),
    StructField("FirstName", StringType(), True),
    StructField("MidName", StringType(), True),
    StructField("OtherID", StringType(), True),
    StructField("DCDate", DateType(), True),
    StructField("HealthIns", StringType(), True),
    StructField("HIPS_Private_Health_Insurance", StringType(), True),
    StructField("HIPS_Medicare", StringType(), True),
    StructField("HIPS_Medicare_Advantage", StringType(), True),
    StructField("HIPS_Medicaid", StringType(), True),
    StructField("HIPS_Military_Health_Care", StringType(), True),
    StructField("HIPS_State_Specific_Plan_non_Medicaid", StringType(), True),
    StructField("HIPS_Indian_Health_Service", StringType(), True),
    StructField("HIPS_Non_US_Insurance", StringType(), True),
    StructField("MBI", StringType(), True),
    StructField("EnrolledStudy", StringType(), True),
    StructField("LAAO_Adm", StringType(), True),
    StructField("Pat_ID", StringType(), True),
])

# Convert the pandas DataFrame to a Spark DataFrame with the defined schema
combined_spark_df = spark.createDataFrame(combined_df_renamed, schema=schema)

# Now try writing the DataFrame to the Delta table again
combined_spark_df.write.format("delta").mode("overwrite").save("abfss://918a39aa-48f9-4458-8609-67a29814b9c8@onelake.dfs.fabric.microsoft.com/83d6ecf5-3497-4c8f-9baf-192fa2d057ab/Tables/laao_episode")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 406, Finished, Available)

## Research

No data in either. Wrote a blank table to future use based on v1.4 structure.

In [405]:
# load research from v1.4

# Load data from a Parquet file
file_path = "/lakehouse/default/Files/LAAO/Version_1_4/Merged/Research.parquet"
research = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
columns_as_str = ['NCDRPatientID']
for col in columns_as_str:
    research[col] = research[col].astype(str)

# Convert 'Facility' to integer type, handling missing or invalid values by converting them to NaN
# Note: pd.to_numeric is used for safe conversion
research['facility'] = pd.to_numeric(research['facility']).astype('Int64')

# Convert date columns to datetime format
date_columns = ['ArrivalDate', 'DCDate']
for date_col in date_columns:
    research[date_col] = pd.to_datetime(research[date_col], format='%Y-%m-%d')

print("Success")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 407, Finished, Available)

Success


In [406]:
# Define the schema based on the provided column information
schema = StructType([
    StructField("NCDRPatientID", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("FirstName", StringType(), True),
    StructField("MidName", StringType(), True),
    StructField("OtherID", StringType(), True),
    StructField("ArrivalDate", DateType(), True),
    StructField("DCDate", DateType(), True),
    StructField("StudyName", StringType(), True),
    StructField("StudyPtID", StringType(), True),
    StructField("facility", IntegerType(), True),
    StructField("Pat_ID", StringType(), True),
])

# Convert the pandas DataFrame to a Spark DataFrame with the defined schema
spark_df = spark.createDataFrame(research, schema=schema)

# Write the Spark DataFrame to the Delta table
spark_df.write.format("delta").mode("overwrite").save("abfss://918a39aa-48f9-4458-8609-67a29814b9c8@onelake.dfs.fabric.microsoft.com/83d6ecf5-3497-4c8f-9baf-192fa2d057ab/Tables/laao_research")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 408, Finished, Available)

## HistoryAndRisk

### Define Dataframes Needed

In [407]:
# load historyandrisk from v1.4
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_4/Merged/HistoryAndRisk.parquet"
historyandrisk = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    historyandrisk[col] = historyandrisk[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
historyandrisk['facility'] = pd.to_numeric(historyandrisk['facility'])

# Apply the function to the specified columns
historyandrisk['OtherID'] = historyandrisk['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'AFibSurgAblDate': '%Y-%m-%d',  # Use errors='coerce' for columns without a specific format
    'AFibFlutterCathAblDate': '%Y-%m-%d'
}

for col, fmt in date_columns.items():
    if fmt:
        historyandrisk[col] = pd.to_datetime(historyandrisk[col], format=fmt)
    else:
        historyandrisk[col] = pd.to_datetime(historyandrisk[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 409, Finished, Available)

Success


In [408]:
# load in_hospital from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/In-hospital.parquet"
in_hospital = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['ZipCode', 'OtherID', 'SSN', 'NCDRPatientID', 'OperA_NPI2']:
    in_hospital[col] = in_hospital[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
in_hospital['facility'] = pd.to_numeric(in_hospital['facility'])

# Apply the function to the specified columns
in_hospital['OtherID'] = in_hospital['OtherID'].apply(remove_decimal_point)
in_hospital['SSN'] = in_hospital['SSN'].apply(remove_decimal_point)
in_hospital['ZipCode'] = in_hospital['ZipCode'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'DOB': '%Y-%m-%d',
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'AFibSurgAblDate': '%Y-%m-%d', 
    'AFibFlutterCathAblDate': '%Y-%m-%d',
    'ProcedureStartDateTime': '%Y-%m-%d %H:%M:%S',
    'ProcedureEndDateTime': '%Y-%m-%d %H:%M:%S',
    'TEEDateLAAO': '%Y-%m-%d'
}

for col, fmt in date_columns.items():
    if fmt:
        in_hospital[col] = pd.to_datetime(in_hospital[col], format=fmt)
    else:
        in_hospital[col] = pd.to_datetime(in_hospital[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 410, Finished, Available)

Success


In [409]:
# load bleedeventtype from v1.3

# Load data from a Parquet file
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/BleedEventType.parquet"
bleedeventtype = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
columns_as_str = ['NCDRPatientID', 'OtherID']
for col in columns_as_str:
    bleedeventtype[col] = bleedeventtype[col].astype(str)

# Convert 'Facility' to integer type
bleedeventtype['facility'] = pd.to_numeric(bleedeventtype['facility'])

# Convert date columns to datetime format
date_columns = ['ArrivalDate', 'DCDate']
for date_col in date_columns:
    bleedeventtype[date_col] = pd.to_datetime(bleedeventtype[date_col], format='%Y-%m-%d')

print("Success")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 411, Finished, Available)

Success


In [410]:
# load strucinterventiontype from v1.3

# Load data from a Parquet file
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/StrucInterventionType.parquet"
strucinterventiontype = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
columns_as_str = ['NCDRPatientID', 'OtherID']
for col in columns_as_str:
    strucinterventiontype[col] = strucinterventiontype[col].astype(str)

# Convert 'Facility' to integer type
strucinterventiontype['facility'] = pd.to_numeric(strucinterventiontype['facility'])

# Convert date columns to datetime format
date_columns = ['ArrivalDate', 'DCDate']
for date_col in date_columns:
    strucinterventiontype[date_col] = pd.to_datetime(strucinterventiontype[date_col], format='%Y-%m-%d')

print("Success")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 412, Finished, Available)

Success


In [411]:
# load priorvd from v1.3

# Load data from a Parquet file
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/PriorVD.parquet"
priorvd = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
columns_as_str = ['NCDRPatientID', 'OtherID']
for col in columns_as_str:
    priorvd[col] = priorvd[col].astype(str)

# Convert 'Facility' to integer type
priorvd['facility'] = pd.to_numeric(priorvd['facility'])

# Convert date columns to datetime format
date_columns = ['ArrivalDate', 'DCDate']
for date_col in date_columns:
    priorvd[date_col] = pd.to_datetime(priorvd[date_col], format='%Y-%m-%d')

print("Success")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 413, Finished, Available)

Success


In [412]:
# load priorcmtype from v1.3

# Load data from a Parquet file
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/PriorCMType.parquet"
priorcmtype = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
columns_as_str = ['NCDRPatientID', 'OtherID']
for col in columns_as_str:
    priorcmtype[col] = priorcmtype[col].astype(str)

# Convert 'Facility' to integer type
priorcmtype['facility'] = pd.to_numeric(priorcmtype['facility'])

# Convert date columns to datetime format
date_columns = ['ArrivalDate', 'DCDate']
for date_col in date_columns:
    priorcmtype[date_col] = pd.to_datetime(priorcmtype[date_col], format='%Y-%m-%d')

print("Success")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 414, Finished, Available)

Success


In [413]:
# load laainterventiontype from v1.3

# Load data from a Parquet file
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/LAAInterventionType.parquet"
laainterventiontype = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
columns_as_str = ['NCDRPatientID', 'OtherID']
for col in columns_as_str:
    laainterventiontype[col] = laainterventiontype[col].astype(str)

# Convert 'Facility' to integer type
laainterventiontype['facility'] = pd.to_numeric(laainterventiontype['facility'])

# Convert date columns to datetime format
date_columns = ['ArrivalDate', 'DCDate']
for date_col in date_columns:
    laainterventiontype[date_col] = pd.to_datetime(laainterventiontype[date_col], format='%Y-%m-%d')

print("Success")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 415, Finished, Available)

Success


In [414]:
# load afibpriorablstrategy from v1.3

# Load data from a Parquet file
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/AFibPriorAblStrategy.parquet"
afibpriorablstrategy = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
columns_as_str = ['NCDRPatientID', 'OtherID']
for col in columns_as_str:
    afibpriorablstrategy[col] = afibpriorablstrategy[col].astype(str)

# Convert 'Facility' to integer type
afibpriorablstrategy['facility'] = pd.to_numeric(afibpriorablstrategy['facility'])

# Convert date columns to datetime format
date_columns = ['ArrivalDate', 'DCDate']
for date_col in date_columns:
    afibpriorablstrategy[date_col] = pd.to_datetime(afibpriorablstrategy[date_col], format='%Y-%m-%d')

print("Success")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 416, Finished, Available)

Success


### Begin Append and Merging Dataframes

In [415]:
#start by appending in_hospital 

# List of columns to append from 'in_hospital'
columns_to_append = [
    'Pat_ID',
    'NCDRPatientID', 
    'LastName', 
    'FirstName', 
    'MidName', 
    'OtherID', 
    'ArrivalDate', 
    'DCDate', 
    'facility', 
    'ChadCHF', 
    'NYHA', 
    'ChadLVDysf', 
    'ChadHypertCont', 
    'ChadDM', 
    'ChadStroke', 
    'ChadTIA', 
    'ChadTE', 
    'HBHyperUncont', 
    'HBAbnRenal', 
    'HBAbnLiver', 
    'HBStroke', 
    'HBStrokeType - Hemorrhagic Stroke', 
    'HBStrokeType - Ischemic Stroke', 
    'HBStrokeType - Undetermined Stroke', 
    'HBBleed', 
    'HBLabINR', 
    'HBAlcohol', 
    'HBDrugAP', 
    'HBDrugNSAID', 
    'IncrFallRisk', 
    'GeneticCoag', 
    'ConAntiCoagTx', 
    'AFibInd', 
    'AFibClass', 
    'ValvularAF', 
    'HxRHVD', 
    'HxMVReplace', 
    'MechValveMitPos', 
    'HxMVRepair', 
    'PrevAFibTerm', 
    'PrevAFibTermPC', 
    'PrevAFibTermDC', 
    'PrevAFibTermSA', 
    'AFibSurgAblDate', 
    'AFlutter', 
    'AFlutterType', 
    'PrevAFLTerm', 
    'PrevAFLTermPC', 
    'PrevAFLTermCA', 
    'AFibFlutterCathAblDate', 
    'ChronicLungDisease', 
    'CAD', 
    'SleepApnea', 
    'SleepApneaRxFollowed', 
    'EpicardialAppCons', 
    'MedCond - Cardiac Surgery', 
    'MedCond - Pericarditis', 
    'MedCond - Epicardial Access', 
    'MedCond - Thoracic Radiation Therapy', 
    'MedCond - Pectus Excavatum', 
    'MedCond - Epigastric Surgery', 
    'MedCond - Autoimmune Disease', 
    'MedCond - Hepatomegaly', 
    'MedCond - Hiatal Hernia', 
    'LupusCons'
]

# Create a new DataFrame with only the specified columns
new_data_to_append = in_hospital[columns_to_append].copy()

# Make sure that all other columns are filled with NaN
additional_columns = set(historyandrisk.columns) - set(new_data_to_append.columns)
for column in additional_columns:
    new_data_to_append[column] = np.nan

# Reorder the columns in the new data to match the 'historyandrisk' DataFrame
new_data_to_append = new_data_to_append[historyandrisk.columns]

# Use concat instead of append
combined_df = pd.concat([historyandrisk, new_data_to_append], ignore_index=True)


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 417, Finished, Available)

In [416]:
#Bring in needed columns to combined_df from structinterventiontype 1.3 df


# List of columns from 'strucinterventiontype' to bring into 'combined_df'
specified_columns = [
    'CardStrucInterv', 
    'CardStrucIntervType - Aortic Balloon Valvuloplasty', 
    'CardStrucIntervType - Transcatheter Aortic Valve Replacement (TAVR)', 
    'CardStrucIntervType - AV Replacement - Surgical', 
    'CardStrucIntervType - AV Repair - Surgical', 
    'CardStrucIntervType - Mitral Balloon Valvuloplasty', 
    'CardStrucIntervType -  Transcatheter Mitral Valve Repair (TMVR)', 
    'CardStrucIntervType - MV Replacement - Surgical', 
    'CardStrucIntervType - MV Repair - Surgical', 
    'CardStrucIntervType - Mitral Annuloplasty Ring - Surgical', 
    'CardStrucIntervType - Mitral Transcatheter - Valve-in-valve', 
    'CardStrucIntervType - ASD Closure', 
    'CardStrucIntervType - PFO Closure', 
    'CardStrucIntervType - Pulmonic Replacement', 
    'CardStrucIntervType - Pulmonic Repair', 
    'CardStrucIntervType - Tricuspid Replacement', 
    'CardStrucIntervType - Tricuspid Repair'
]

# Set the index of both dataframes to 'Pat_ID'
combined_df.set_index('Pat_ID', inplace=True)
strucinterventiontype.set_index('Pat_ID', inplace=True)

# Update only the specified columns in combined_df with the values from strucinterventiontype
for column in specified_columns:
    if column in combined_df.columns:
        combined_df[column].update(strucinterventiontype[column])

# Reset the index to return to the original structure, if needed
combined_df.reset_index(inplace=True)

print("Success")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 418, Finished, Available)

Success


In [417]:

#Bring in needed columns to combined_df from priorvd 1.3 df

# List of columns from 'priorvd' to bring into 'combined_df'
specified_columns = [
    'PriorVD - Prior Myocardial Infarction (MI)', 
    'PriorVD - Peripheral Arterial Occlusive Disease (PAD)', 
    'PriorVD - Known Aortic Plaque', 
    'PriorVD - Coronary Artery Disease (CAD)*', 
    'PriorVD - Percutaneous Coronary Intervention (PCI)*', 
    'PriorVD - Coronary Artery Bypass Graft (CABG)*', 
    'PriorVD - Carotid Artery Disease*'
]

# Set the index of both dataframes to the key columns for alignment
combined_df.set_index('Pat_ID', inplace=True)
priorvd.set_index('Pat_ID', inplace=True)

# Update only the specified columns in combined_df with the values from priorvd
for column in specified_columns:
    if column in combined_df.columns:
        combined_df[column].update(priorvd[column])

# Reset the index to return to the original structure
combined_df.reset_index(inplace=True)

print("Success")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 419, Finished, Available)

Success


In [418]:

#Bring in needed columns to combined_df from priorcmtype 1.3 df

# List of columns from 'priorcmtype' to bring into 'combined_df'
specified_columns = [
    'CM', 
    'PriorCMType - Non-ischemic cardiomyopathy', 
    'PriorCMType - Ischemic cardiomyopathy', 
    'PriorCMType - Restrictive cardiomyopathy', 
    'PriorCMType - Hypertrophic cardiomyopathy', 
    'PriorCMType - Other cardiomyopathy type'
]

# Set the index of both dataframes to the key columns for alignment
combined_df.set_index('Pat_ID', inplace=True)
priorcmtype.set_index('Pat_ID', inplace=True)

# Update only the specified columns in combined_df with the values from priorcmtype
for column in specified_columns:
    if column in combined_df.columns:
        combined_df[column].update(priorcmtype[column])

# Reset the index to return to the original structure
combined_df.reset_index(inplace=True)

print("Success")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 420, Finished, Available)

Success


In [419]:

#Bring in needed columns to combined_df from laainterventiontype 1.3 df

# List of columns from 'laainterventiontype' to bring into 'combined_df'
specified_columns = [
    'LAAOInterv', 
    'LAAOType - Epicardial Ligation', 
    'LAAOType - Surgical Amputation', 
    'LAAOType - Surgical Ligation', 
    'LAAOType - Percutaneous Occlusion', 
    'LAAOType - Surgical Closure Device', 
    'LAAOType - Surgical Stapling'
]

# Set the index of both dataframes to the key columns for alignment
combined_df.set_index('Pat_ID', inplace=True)
laainterventiontype.set_index('Pat_ID', inplace=True)

# Update only the specified columns in combined_df with the values from laainterventiontype
for column in specified_columns:
    if column in combined_df.columns:
        combined_df[column].update(laainterventiontype[column])

# Reset the index to return to the original structure
combined_df.reset_index(inplace=True)

print("Success")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 421, Finished, Available)

Success


In [420]:

#Bring in needed columns to combined_df from bleedeventtype 1.3 df

# List of columns from 'bleedeventtype' to bring into 'combined_df'
specified_columns = [
    'ClinicBleedEvent', 
    'BleedEventType - Intracranial Bleed', 
    'BleedEventType - Epistaxis', 
    'BleedEventType - Gastrointestinal Bleed', 
    'BleedEventType - Other'
]

# Set the index of both dataframes to the key columns for alignment
combined_df.set_index('Pat_ID', inplace=True)
bleedeventtype.set_index('Pat_ID', inplace=True)

# Update only the specified columns in combined_df with the values from bleedeventtype
for column in specified_columns:
    if column in combined_df.columns:
        combined_df[column].update(bleedeventtype[column])

# Reset the index to return to the original structure
combined_df.reset_index(inplace=True)


print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 422, Finished, Available)

Success


In [421]:

#Bring in needed columns to combined_df from afibpriorablstrategy 1.3 df

# List of columns from 'afibpriorablstrategy' to bring into 'combined_df'
specified_columns = [
    'PrevAFibTermCA', 
    'AFibCathAblDate', 
    'AFibPriorAblStrategyCode - Complex Fractionated Atrial Electrogram', 
    'AFibPriorAblStrategyCode - Convergent Procedure', 
    'AFibPriorAblStrategyCode - Cryoablation', 
    'AFibPriorAblStrategyCode - Empiric LA Linear Lesions', 
    'AFibPriorAblStrategyCode - Focal Ablation', 
    'AFibPriorAblStrategyCode - Ganglion Plexus Ablation', 
    'AFibPriorAblStrategyCode - Pulmonary Vein Isolation', 
    'AFibPriorAblStrategyCode - Segmental PV Ablation', 
    'AFibPriorAblStrategyCode - Rotor Based Mapping', 
    'AFibPriorAblStrategyCode - Wide Area Circumferential Ablation'
]

# Set the index of both dataframes to the key columns for alignment
combined_df.set_index("Pat_ID", inplace=True)
afibpriorablstrategy.set_index("Pat_ID", inplace=True)

# Update only the specified columns in combined_df with the values from afibpriorablstrategy
for column in specified_columns:
    if column in combined_df.columns:
        combined_df[column].update(afibpriorablstrategy[column])

# Reset the index to return to the original structure
combined_df.reset_index(inplace=True)


print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 423, Finished, Available)

Success


In [422]:
# Rename columns to be compatible with Delta table
columns_mapping = {
    'HBStrokeType - Hemorrhagic Stroke': 'HBStrokeType_Hemorrhagic_Stroke', 
    'HBStrokeType - Ischemic Stroke': 'HBStrokeType_Ischemic_Stroke', 
    'HBStrokeType - Undetermined Stroke': 'HBStrokeType_Undetermined_Stroke',
    'MedCond - Cardiac Surgery': 'MedCond_Cardiac_Surgery', 
    'MedCond - Pericarditis': 'MedCond_Pericarditis', 
    'MedCond - Epicardial Access': 'MedCond_Epicardial_Access', 
    'MedCond - Thoracic Radiation Therapy': 'MedCond_Thoracic_Radiation_Therapy', 
    'MedCond - Pectus Excavatum': 'MedCond_Pectus_Excavatum', 
    'MedCond - Epigastric Surgery': 'MedCond_Epigastric_Surgery', 
    'MedCond - Autoimmune Disease': 'MedCond_Autoimmune_Disease', 
    'MedCond - Hepatomegaly': 'MedCond_Hepatomegaly', 
    'MedCond - Hiatal Hernia': 'MedCond_Hiatal_Hernia',
    'CardStrucIntervType - Aortic Balloon Valvuloplasty': 'CardStrucIntervType_Aortic_Balloon_Valvuloplasty', 
    'CardStrucIntervType - Transcatheter Aortic Valve Replacement (TAVR)': 'CardStrucIntervType_Transcatheter_Aortic_Valve_Replacement_TAVR', 
    'CardStrucIntervType - AV Replacement - Surgical': 'CardStrucIntervType_AV_Replacement_Surgical', 
    'CardStrucIntervType - AV Repair - Surgical': 'CardStrucIntervType_AV_Repair_Surgical', 
    'CardStrucIntervType - Mitral Balloon Valvuloplasty': 'CardStrucIntervType_Mitral_Balloon_Valvuloplasty', 
    'CardStrucIntervType - Transcatheter Mitral Valve Repair (TMVR)': 'CardStrucIntervType_Transcatheter_Mitral_Valve_Repair_TMVR', 
    'CardStrucIntervType - MV Replacement - Surgical': 'CardStrucIntervType_MV_Replacement_Surgical', 
    'CardStrucIntervType - MV Repair - Surgical': 'CardStrucIntervType_MV_Repair_Surgical', 
    'CardStrucIntervType - Mitral Annuloplasty Ring - Surgical': 'CardStrucIntervType_Mitral_Annuloplasty_Ring_Surgical', 
    'CardStrucIntervType - Mitral Transcatheter - Valve-in-valve': 'CardStrucIntervType_Mitral_Transcatheter_Valve_in_valve', 
    'CardStrucIntervType - ASD Closure': 'CardStrucIntervType_ASD_Closure', 
    'CardStrucIntervType - PFO Closure': 'CardStrucIntervType_PFO_Closure', 
    'CardStrucIntervType - Pulmonic Replacement': 'CardStrucIntervType_Pulmonic_Replacement', 
    'CardStrucIntervType - Pulmonic Repair': 'CardStrucIntervType_Pulmonic_Repair', 
    'CardStrucIntervType - Tricuspid Replacement': 'CardStrucIntervType_Tricuspid_Replacement', 
    'CardStrucIntervType - Tricuspid Repair': 'CardStrucIntervType_Tricuspid_Repair',
    'PriorVD - Prior Myocardial Infarction (MI)': 'PriorVD_Prior_Myocardial_Infarction_MI', 
    'PriorVD - Peripheral Arterial Occlusive Disease (PAD)': 'PriorVD_Peripheral_Arterial_Occlusive_Disease_PAD', 
    'PriorVD - Known Aortic Plaque': 'PriorVD_Known_Aortic_Plaque', 
    'PriorVD - Coronary Artery Disease (CAD)*': 'PriorVD_Coronary_Artery_Disease_CAD', 
    'PriorVD - Percutaneous Coronary Intervention (PCI)*': 'PriorVD_Percutaneous_Coronary_Intervention_PCI', 
    'PriorVD - Coronary Artery Bypass Graft (CABG)*': 'PriorVD_Coronary_Artery_Bypass_Graft_CABG', 
    'PriorVD - Carotid Artery Disease': 'PriorVD_Carotid_Artery_Disease',
    'PriorCMType - Non-ischemic cardiomyopathy': 'PriorCMType_Non_ischemic_cardiomyopathy', 
    'PriorCMType - Ischemic cardiomyopathy': 'PriorCMType_Ischemic_cardiomyopathy', 
    'PriorCMType - Restrictive cardiomyopathy': 'PriorCMType_Restrictive_cardiomyopathy', 
    'PriorCMType - Hypertrophic cardiomyopathy': 'PriorCMType_Hypertrophic_cardiomyopathy', 
    'PriorCMType - Other cardiomyopathy type': 'PriorCMType_Other_cardiomyopathy_type',
    'LAAOType - Epicardial Ligation': 'LAAOType_Epicardial_Ligation', 
    'LAAOType - Surgical Amputation': 'LAAOType_Surgical_Amputation', 
    'LAAOType - Surgical Ligation': 'LAAOType_Surgical_Ligation', 
    'LAAOType - Percutaneous Occlusion': 'LAAOType_Percutaneous_Occlusion', 
    'LAAOType - Surgical Closure Device': 'LAAOType_Surgical_Closure_Device', 
    'LAAOType - Surgical Stapling': 'LAAOType_Surgical_Stapling',
    'BleedEventType - Intracranial Bleed': 'BleedEventType_Intracranial_Bleed', 
    'BleedEventType - Epistaxis': 'BleedEventType_Epistaxis', 
    'BleedEventType - Gastrointestinal Bleed': 'BleedEventType_Gastrointestinal_Bleed', 
    'BleedEventType - Other': 'BleedEventType_Other',
    'AFibPriorAblStrategyCode - Complex Fractionated Atrial Electrogram': 'AFibPriorAblStrategyCode_Complex_Fractionated_Atrial_Electrogram', 
    'AFibPriorAblStrategyCode - Convergent Procedure': 'AFibPriorAblStrategyCode_Convergent_Procedure', 
    'AFibPriorAblStrategyCode - Cryoablation': 'AFibPriorAblStrategyCode_Cryoablation', 
    'AFibPriorAblStrategyCode - Empiric LA Linear Lesions': 'AFibPriorAblStrategyCode_Empiric_LA_Linear_Lesions', 
    'AFibPriorAblStrategyCode - Focal Ablation': 'AFibPriorAblStrategyCode_Focal_Ablation', 
    'AFibPriorAblStrategyCode - Ganglion Plexus Ablation': 'AFibPriorAblStrategyCode_Ganglion_Plexus_Ablation', 
    'AFibPriorAblStrategyCode - Pulmonary Vein Isolation': 'AFibPriorAblStrategyCode_Pulmonary_Vein_Isolation', 
    'AFibPriorAblStrategyCode - Segmental PV Ablation': 'AFibPriorAblStrategyCode_Segmental_PV_Ablation', 
    'AFibPriorAblStrategyCode - Rotor Based Mapping': 'AFibPriorAblStrategyCode_Rotor_Based_Mapping', 
    'AFibPriorAblStrategyCode - Wide Area Circumferential Ablation': 'AFibPriorAblStrategyCode_Wide_Area_Circumferential_Ablation',
    'PriorVD - Carotid Artery Disease*': 'PriorVD_Carotid_Artery_Disease',
    'CardStrucIntervType -  Transcatheter Mitral Valve Repair (TMVR)': 'CardStrucIntervType_Transcatheter_Mitral_Valve_Repair_TMVR'
}

# Rename the columns
combined_df_renamed = combined_df.rename(columns=columns_mapping)

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 424, Finished, Available)

Success


In [423]:
# Define the schema explicitly for laao_historyandrisk
schema = StructType([
    StructField("Pat_ID", StringType(), True),
    StructField("NCDRPatientID", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("FirstName", StringType(), True),
    StructField("MidName", StringType(), True),
    StructField("OtherID", StringType(), True),
    StructField("ArrivalDate", DateType(), True),
    StructField("DCDate", DateType(), True),
    StructField("ChadCHF", StringType(), True),
    StructField("NYHA", StringType(), True),
    StructField("ChadLVDysf", StringType(), True),
    StructField("ChadHypertCont", StringType(), True),
    StructField("ChadDM", StringType(), True),
    StructField("ChadStroke", StringType(), True),
    StructField("ChadTIA", StringType(), True),
    StructField("ChadTE", StringType(), True),
    StructField("ChadVascDis", StringType(), True),
    StructField("PriorVD_Prior_Myocardial_Infarction_MI", StringType(), True),
    StructField("PriorVD_Peripheral_Arterial_Occlusive_Disease_PAD", StringType(), True),
    StructField("PriorVD_Known_Aortic_Plaque", StringType(), True),
    StructField("PriorVD_Coronary_Artery_Disease_CAD", StringType(), True),
    StructField("PriorVD_Percutaneous_Coronary_Intervention_PCI", StringType(), True),
    StructField("PriorVD_Coronary_Artery_Bypass_Graft_CABG", StringType(), True),
    StructField("PriorVD_Carotid_Artery_Disease", StringType(), True),
    StructField("HBHyperUncont", StringType(), True),
    StructField("HBAbnRenal", StringType(), True),
    StructField("HBAbnLiver", StringType(), True),
    StructField("HBStroke", StringType(), True),
    StructField("HBStrokeType_Hemorrhagic_Stroke", StringType(), True),
    StructField("HBStrokeType_Ischemic_Stroke", StringType(), True),
    StructField("HBStrokeType_Undetermined_Stroke", StringType(), True),
    StructField("HBBleed", StringType(), True),
    StructField("HBLabINR", StringType(), True),
    StructField("HBAlcohol", StringType(), True),
    StructField("HBDrugAP", StringType(), True),
    StructField("HBDrugNSAID", StringType(), True),
    StructField("IncrFallRisk", StringType(), True),
    StructField("ClinicBleedEvent", StringType(), True),
    StructField("BleedEventType_Intracranial_Bleed", StringType(), True),
    StructField("BleedEventType_Epistaxis", StringType(), True),
    StructField("BleedEventType_Gastrointestinal_Bleed", StringType(), True),
    StructField("BleedEventType_Other", StringType(), True),
    StructField("GeneticCoag", StringType(), True),
    StructField("ConAntiCoagTx", StringType(), True),
    StructField("AFibInd", StringType(), True),
    StructField("AFibClass", StringType(), True),
    StructField("ValvularAF", StringType(), True),
    StructField("HxRHVD", StringType(), True),
    StructField("HxMVReplace", StringType(), True),
    StructField("MechValveMitPos", StringType(), True),
    StructField("HxMVRepair", StringType(), True),
    StructField("PrevAFibTerm", StringType(), True),
    StructField("PrevAFibTermPC", StringType(), True),
    StructField("PrevAFibTermDC", StringType(), True),
    StructField("PrevAFibTermCA", StringType(), True),
    StructField("AFibCathAblDate", DateType(), True),  
    StructField("AFibPriorAblStrategyCode_Complex_Fractionated_Atrial_Electrogram", StringType(), True),
    StructField("AFibPriorAblStrategyCode_Convergent_Procedure", StringType(), True),
    StructField("AFibPriorAblStrategyCode_Cryoablation", StringType(), True),
    StructField("AFibPriorAblStrategyCode_Empiric_LA_Linear_Lesions", StringType(), True),
    StructField("AFibPriorAblStrategyCode_Focal_Ablation", StringType(), True),
    StructField("AFibPriorAblStrategyCode_Ganglion_Plexus_Ablation", StringType(), True),
    StructField("AFibPriorAblStrategyCode_Pulmonary_Vein_Isolation", StringType(), True),
    StructField("AFibPriorAblStrategyCode_Segmental_PV_Ablation", StringType(), True),
    StructField("AFibPriorAblStrategyCode_Rotor_Based_Mapping", StringType(), True),
    StructField("AFibPriorAblStrategyCode_Wide_Area_Circumferential_Ablation", StringType(), True),
    StructField("PrevAFibTermSA", StringType(), True),
    StructField("AFibSurgAblDate", DateType(), True),
    StructField("AFlutter", StringType(), True),
    StructField("AFlutterType", StringType(), True),
    StructField("PrevAFLTerm", StringType(), True),
    StructField("PrevAFLTermPC", StringType(), True),
    StructField("PrevAFLTermDC", StringType(), True),
    StructField("PrevAFLTermCA", StringType(), True),
    StructField("AFibFlutterCathAblDate", DateType(), True), 
    StructField("CardStrucInterv", StringType(), True),
    StructField("CardStrucIntervType_Aortic_Balloon_Valvuloplasty", StringType(), True),
    StructField("CardStrucIntervType_Transcatheter_Aortic_Valve_Replacement_TAVR", StringType(), True),
    StructField("CardStrucIntervType_AV_Replacement_Surgical", StringType(), True),
    StructField("CardStrucIntervType_AV_Repair_Surgical", StringType(), True),
    StructField("CardStrucIntervType_Mitral_Balloon_Valvuloplasty", StringType(), True),
    StructField("CardStrucIntervType_Transcatheter_Mitral_Valve_Repair_TMVR", StringType(), True),
    StructField("CardStrucIntervType_MV_Replacement_Surgical", StringType(), True),
    StructField("CardStrucIntervType_MV_Repair_Surgical", StringType(), True),
    StructField("CardStrucIntervType_Mitral_Annuloplasty_Ring_Surgical", StringType(), True),
    StructField("CardStrucIntervType_Mitral_Transcatheter_Valve_in_valve", StringType(), True),
    StructField("CardStrucIntervType_ASD_Closure", StringType(), True),
    StructField("CardStrucIntervType_PFO_Closure", StringType(), True),
    StructField("CardStrucIntervType_Pulmonic_Replacement", StringType(), True),
    StructField("CardStrucIntervType_Pulmonic_Repair", StringType(), True),
    StructField("CardStrucIntervType_Tricuspid_Replacement", StringType(), True),
    StructField("CardStrucIntervType_Tricuspid_Repair", StringType(), True),
    StructField("LAAOInterv", StringType(), True),
    StructField("LAAOType_Epicardial_Ligation", StringType(), True),
    StructField("LAAOType_Surgical_Amputation", StringType(), True),
    StructField("LAAOType_Surgical_Ligation", StringType(), True),
    StructField("LAAOType_Percutaneous_Occlusion", StringType(), True),
    StructField("LAAOType_Surgical_Closure_Device", StringType(), True),
    StructField("LAAOType_Surgical_Stapling", StringType(), True),
    StructField("CM", StringType(), True),
    StructField("PriorCMType_Non_ischemic_cardiomyopathy", StringType(), True),
    StructField("PriorCMType_Ischemic_cardiomyopathy", StringType(), True),
    StructField("PriorCMType_Restrictive_cardiomyopathy", StringType(), True),
    StructField("PriorCMType_Hypertrophic_cardiomyopathy", StringType(), True),
    StructField("PriorCMType_Other_cardiomyopathy_type", StringType(), True),
    StructField("ChronicLungDisease", StringType(), True),
    StructField("CAD", StringType(), True),
    StructField("SleepApnea", StringType(), True),
    StructField("SleepApneaRxFollowed", StringType(), True),
    StructField("EpicardialAppCons", StringType(), True),
    StructField("MedCond_Cardiac_Surgery", StringType(), True),
    StructField("MedCond_Pericarditis", StringType(), True),
    StructField("MedCond_Epicardial_Access", StringType(), True),
    StructField("MedCond_Thoracic_Radiation_Therapy", StringType(), True),
    StructField("MedCond_Pectus_Excavatum", StringType(), True),
    StructField("MedCond_Epigastric_Surgery", StringType(), True),
    StructField("MedCond_Autoimmune_Disease", StringType(), True),
    StructField("MedCond_Hepatomegaly", StringType(), True),
    StructField("MedCond_Hiatal_Hernia", StringType(), True),
    StructField("LupusCons", FloatType(), True),
    StructField("facility", IntegerType(), True)
])

# Convert the pandas DataFrame to a Spark DataFrame with the defined schema
laao_historyandrisk_df = spark.createDataFrame(combined_df_renamed, schema=schema)

# Write the DataFrame to the Delta table
laao_historyandrisk_df.write.format("delta").mode("overwrite").save("abfss://918a39aa-48f9-4458-8609-67a29814b9c8@onelake.dfs.fabric.microsoft.com/83d6ecf5-3497-4c8f-9baf-192fa2d057ab/Tables/laao_historyandrisk")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 425, Finished, Available)

## Diagnostics

In [424]:
# load diagnostics from v1.4
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_4/Merged/Diagnostics.parquet"
diagnostics = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    diagnostics[col] = diagnostics[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
diagnostics['facility'] = pd.to_numeric(diagnostics['facility'])

# Apply the function to the specified columns
diagnostics['OtherID'] = diagnostics['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'TTEDate': '%Y-%m-%d',
    'CTImagingDate': '%Y-%m-%d',
    'MRDate': '%Y-%m-%d',
    'ICEDate':  '%Y-%m-%d'
}

for col, fmt in date_columns.items():
    if fmt:
        diagnostics[col] = pd.to_datetime(diagnostics[col], format=fmt)
    else:
        diagnostics[col] = pd.to_datetime(diagnostics[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 426, Finished, Available)

Success


In [425]:
# load in_hospital from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/In-hospital.parquet"
in_hospital = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['ZipCode', 'OtherID', 'SSN', 'NCDRPatientID', 'OperA_NPI2']:
    in_hospital[col] = in_hospital[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
in_hospital['facility'] = pd.to_numeric(in_hospital['facility'])

# Apply the function to the specified columns
in_hospital['OtherID'] = in_hospital['OtherID'].apply(remove_decimal_point)
in_hospital['SSN'] = in_hospital['SSN'].apply(remove_decimal_point)
in_hospital['ZipCode'] = in_hospital['ZipCode'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'DOB': '%Y-%m-%d',
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'AFibSurgAblDate': '%Y-%m-%d', 
    'AFibFlutterCathAblDate': '%Y-%m-%d',
    'ProcedureStartDateTime': '%Y-%m-%d %H:%M:%S',
    'ProcedureEndDateTime': '%Y-%m-%d %H:%M:%S',
    'TEEDateLAAO': '%Y-%m-%d'
}

for col, fmt in date_columns.items():
    if fmt:
        in_hospital[col] = pd.to_datetime(in_hospital[col], format=fmt)
    else:
        in_hospital[col] = pd.to_datetime(in_hospital[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 427, Finished, Available)

Success


In [426]:
# load atrialrhythm from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/AtrialRhythm.parquet"
atrialrhythm = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in [ 'OtherID', 'NCDRPatientID']:
    atrialrhythm[col] = atrialrhythm[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
atrialrhythm['facility'] = pd.to_numeric(atrialrhythm['facility'])

# Apply the function to the specified columns
atrialrhythm['OtherID'] = atrialrhythm['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    # 'AFibSurgAblDate': '%Y-%m-%d', 
    # 'AFibFlutterCathAblDate': '%Y-%m-%d',
    # 'ProcedureStartDateTime': '%Y-%m-%d %H:%M:%S',
    # 'ProcedureEndDateTime': '%Y-%m-%d %H:%M:%S',
    # 'TEEDateLAAO': '%Y-%m-%d'
}

for col, fmt in date_columns.items():
    if fmt:
        atrialrhythm[col] = pd.to_datetime(atrialrhythm[col], format=fmt)
    else:
        atrialrhythm[col] = pd.to_datetime(atrialrhythm[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 428, Finished, Available)

Success


In [427]:
#start by appending atrialrhythm

# List of columns to append from 'atrialrhythm'
columns_to_append = [
    'Pat_ID',
    'NCDRPatientID', 
    'LastName', 
    'FirstName', 
    'MidName', 
    'OtherID', 
    'ArrivalDate', 
    'DCDate', 
    'AtrialRhythm - Sinus node rhythm', 
    'AtrialRhythm - Atrial fibrillation', 
    'AtrialRhythm - Atrial tachycardia', 
    'AtrialRhythm - Atrial flutter', 
    'AtrialRhythm - Sinus arrest', 
    'AtrialRhythm - Atrial paced', 
    'AtrialRhythm - Not Documented', 
    'facility'
]

# Create a new DataFrame with only the specified columns
new_data_to_append = atrialrhythm[columns_to_append].copy()

# Make sure that all other columns are filled with NaN
additional_columns = set(diagnostics.columns) - set(new_data_to_append.columns)
for column in additional_columns:
    new_data_to_append[column] = np.nan

# Reorder the columns in the new data to match the 'diagnostics' DataFrame
new_data_to_append = new_data_to_append[diagnostics.columns]

# Use concat instead of append
combined_df = pd.concat([diagnostics, new_data_to_append], ignore_index=True)

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 429, Finished, Available)

Success


In [428]:

#Bring in needed columns to combined_df from in_hospital 1.3 df

# List of columns from 'in_hospital' to bring into 'combined_df'
specified_columns = [
    'LVEFAssessed', 
    'LVEF', 
    'TTEPerf', 
    'TTEDate', 
    'BaselineImagingPerf', 
    'CTPerformed', 
    'CTImagingDate', 
    'MRPerformed', 
    'MRDate', 
    'ICEPerf', 
    'ICEDate'

]

# Set the index of both dataframes to the key columns for alignment
combined_df.set_index(['NCDRPatientID', 'facility', 'ArrivalDate'], inplace=True)
in_hospital.set_index(['NCDRPatientID', 'facility', 'ArrivalDate'], inplace=True)

# Update only the specified columns in combined_df with the values from in_hospital
for column in specified_columns:
    if column in combined_df.columns:
        combined_df[column].update(in_hospital[column])

# Reset the index to return to the original structure
combined_df.reset_index(inplace=True)

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 430, Finished, Available)

Success


In [429]:
# Rename columns to be compatible with Delta table
columns_mapping = {
    'AtrialRhythm - Sinus node rhythm': 'AtrialRhythm_Sinus_node_rhythm', 
    'AtrialRhythm - Atrial fibrillation': 'AtrialRhythm_Atrial_fibrillation', 
    'AtrialRhythm - Atrial tachycardia': 'AtrialRhythm_Atrial_tachycardia',
    'AtrialRhythm - Atrial flutter': 'AtrialRhythm_Atrial_flutter', 
    'AtrialRhythm - Sinus arrest': 'AtrialRhythm_Sinus_arrest', 
    'AtrialRhythm - Atrial paced': 'AtrialRhythm_Atrial_paced', 
    'AtrialRhythm - Not Documented': 'AtrialRhythm_Not_Documented'
}

# Rename the columns
combined_df_renamed = combined_df.rename(columns=columns_mapping)

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 431, Finished, Available)

Success


In [430]:
# Initialize SparkSession
spark = SparkSession.builder.appName("LaaoDiagnosticsIngestion").getOrCreate()

# Define the schema explicitly based on the laao_diagnostics table columns
schema = StructType([
    StructField("NCDRPatientID", StringType(), True),
    StructField("facility", IntegerType(), True),
    StructField("ArrivalDate", DateType(), True),
    StructField("LastName", StringType(), True),
    StructField("FirstName", StringType(), True),
    StructField("MidName", StringType(), True),
    StructField("OtherID", StringType(), True),
    StructField("DCDate", DateType(), True),
    StructField("AtrialRhythm_Sinus_node_rhythm", StringType(), True),
    StructField("AtrialRhythm_Atrial_fibrillation", StringType(), True),
    StructField("AtrialRhythm_Atrial_tachycardia", StringType(), True),
    StructField("AtrialRhythm_Atrial_flutter", StringType(), True),
    StructField("AtrialRhythm_Sinus_arrest", StringType(), True),
    StructField("AtrialRhythm_Atrial_paced", StringType(), True),
    StructField("AtrialRhythm_Not_Documented", StringType(), True),
    StructField("LVEFAssessed", StringType(), True),
    StructField("LVEF", FloatType(), True),
    StructField("TTEPerf", StringType(), True),
    StructField("TTEDate", DateType(), True),
    StructField("BaselineImagingPerf", StringType(), True),
    StructField("CTPerformed", StringType(), True),
    StructField("CTImagingDate", DateType(), True),
    StructField("MRPerformed", StringType(), True),
    StructField("MRDate", DateType(), True),
    StructField("ICEPerf", StringType(), True),
    StructField("ICEDate", DateType(), True),
    StructField("Pat_ID", StringType(), True),
])

# Convert the pandas DataFrame to a Spark DataFrame with the defined schema
combined_spark_df = spark.createDataFrame(combined_df_renamed, schema=schema)

# Write the DataFrame to the Delta table with the defined schema
combined_spark_df.write.format("delta").mode("overwrite").save("abfss://918a39aa-48f9-4458-8609-67a29814b9c8@onelake.dfs.fabric.microsoft.com/83d6ecf5-3497-4c8f-9baf-192fa2d057ab/Tables/laao_diagnostics")

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 432, Finished, Available)

Success


## PreProcLabs

In [431]:
# load preproclabs from v1.4
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_4/Merged/PreProcLabs.parquet"
preproclabs = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    preproclabs[col] = preproclabs[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
preproclabs['facility'] = pd.to_numeric(preproclabs['facility'])

# Apply the function to the specified columns
preproclabs['OtherID'] = preproclabs['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
}

for col, fmt in date_columns.items():
    if fmt:
        preproclabs[col] = pd.to_datetime(preproclabs[col], format=fmt)
    else:
        preproclabs[col] = pd.to_datetime(preproclabs[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 433, Finished, Available)

Success


In [432]:
# load in_hospital from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/In-hospital.parquet"
in_hospital = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['ZipCode', 'OtherID', 'SSN', 'NCDRPatientID', 'OperA_NPI2']:
    in_hospital[col] = in_hospital[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
in_hospital['facility'] = pd.to_numeric(in_hospital['facility'])

# Apply the function to the specified columns
in_hospital['OtherID'] = in_hospital['OtherID'].apply(remove_decimal_point)
in_hospital['SSN'] = in_hospital['SSN'].apply(remove_decimal_point)
in_hospital['ZipCode'] = in_hospital['ZipCode'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'DOB': '%Y-%m-%d',
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'AFibSurgAblDate': '%Y-%m-%d', 
    'AFibFlutterCathAblDate': '%Y-%m-%d',
    'ProcedureStartDateTime': '%Y-%m-%d %H:%M:%S',
    'ProcedureEndDateTime': '%Y-%m-%d %H:%M:%S',
    'TEEDateLAAO': '%Y-%m-%d'
}

for col, fmt in date_columns.items():
    if fmt:
        in_hospital[col] = pd.to_datetime(in_hospital[col], format=fmt)
    else:
        in_hospital[col] = pd.to_datetime(in_hospital[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 434, Finished, Available)

Success


In [433]:
#start by appending in_hospital to get natrual key columns

# List of columns to append from 'in_hospital'
columns_to_append = [
    'Pat_ID',
    'NCDRPatientID', 
    'LastName', 
    'FirstName', 
    'MidName', 
    'OtherID', 
    'ArrivalDate', 
    'DCDate', 
    'Height', 
    'Weight', 
    'Pulse', 
    'SystolicBP', 
    'DiastolicBP', 
    'HGB', 
    'HGBND', 
    'PT', 
    'PTND', 
    'INR', 
    'INRND', 
    'PreProcCreat', 
    'PreProcCreatND', 
    'Albumin', 
    'Albumin_ND', 
    'PlateletCt', 
    'PlateletCtND', 
    'RankinScale', 
    'PostProc_RankinScaleNA', 
    'facility'
]

# Create a new DataFrame with only the specified columns
new_data_to_append = in_hospital[columns_to_append].copy()

# Make sure that all other columns are filled with NaN
additional_columns = set(preproclabs.columns) - set(new_data_to_append.columns)
for column in additional_columns:
    new_data_to_append[column] = np.nan

# Reorder the columns in the new data to match the 'preproclabs' DataFrame
new_data_to_append = new_data_to_append[preproclabs.columns]

# Use concat instead of append
combined_df = pd.concat([preproclabs, new_data_to_append], ignore_index=True)

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 435, Finished, Available)

Success


In [434]:
combined_df.info()

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 436, Finished, Available)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3598 entries, 0 to 3597
Data columns (total 28 columns):
 #   Column                  Non-Null Count  Dtype         
---  ------                  --------------  -----         
 0   NCDRPatientID           3598 non-null   object        
 1   LastName                3598 non-null   object        
 2   FirstName               3598 non-null   object        
 3   MidName                 2894 non-null   object        
 4   OtherID                 3598 non-null   object        
 5   ArrivalDate             3598 non-null   datetime64[ns]
 6   DCDate                  3598 non-null   datetime64[ns]
 7   Height                  3592 non-null   float64       
 8   Weight                  3595 non-null   float64       
 9   Pulse                   3588 non-null   float64       
 10  SystolicBP              3590 non-null   float64       
 11  DiastolicBP             3590 non-null   float64       
 12  HGB                     3109 non-null   float64 

In [435]:
# Initialize SparkSession
spark = SparkSession.builder.appName("LAAPreprocLabsIngestion").getOrCreate()

# Define the schema explicitly for the laao_preproclabs table
schema = StructType([
    StructField("NCDRPatientID", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("FirstName", StringType(), True),
    StructField("MidName", StringType(), True),
    StructField("OtherID", StringType(), True),
    StructField("ArrivalDate", DateType(), True),
    StructField("DCDate", DateType(), True),
    StructField("Height", FloatType(), True),
    StructField("Weight", FloatType(), True),
    StructField("Pulse", FloatType(), True),
    StructField("SystolicBP", FloatType(), True),
    StructField("DiastolicBP", FloatType(), True),
    StructField("HGB", FloatType(), True),
    StructField("HGBND", StringType(), True),
    StructField("PT", FloatType(), True),
    StructField("PTND", StringType(), True),
    StructField("INR", FloatType(), True),
    StructField("INRND", StringType(), True),
    StructField("PreProcCreat", FloatType(), True),
    StructField("PreProcCreatND", StringType(), True),
    StructField("Albumin", FloatType(), True),
    StructField("Albumin_ND", StringType(), True),
    StructField("PlateletCt", FloatType(), True),
    StructField("PlateletCtND", StringType(), True),
    StructField("RankinScale", StringType(), True),
    StructField("PostProc_RankinScaleNA", StringType(), True),
    StructField("facility", IntegerType(), True),
    StructField("Pat_ID", StringType(), True)
])

# Convert the pandas DataFrame to a Spark DataFrame with the defined schema
preproclabs_spark_df = spark.createDataFrame(combined_df, schema=schema)

# Write the DataFrame to the Delta table
preproclabs_spark_df.write.format("delta").mode("overwrite").save("abfss://918a39aa-48f9-4458-8609-67a29814b9c8@onelake.dfs.fabric.microsoft.com/83d6ecf5-3497-4c8f-9baf-192fa2d057ab/Tables/laao_preproclabs")

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 437, Finished, Available)

Success


## PreProcMeds

In [436]:
# load preprocmeds from v1.4
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_4/Merged/PreProcMeds.parquet"
preprocmeds = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    preprocmeds[col] = preprocmeds[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
preprocmeds['facility'] = pd.to_numeric(preprocmeds['facility'])

# Apply the function to the specified columns
preprocmeds['OtherID'] = preprocmeds['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
}

for col, fmt in date_columns.items():
    if fmt:
        preprocmeds[col] = pd.to_datetime(preprocmeds[col], format=fmt)
    else:
        preprocmeds[col] = pd.to_datetime(preprocmeds[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 438, Finished, Available)

Success


In [437]:
# load medid from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/MedID.parquet"
medid = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    medid[col] = medid[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
medid['facility'] = pd.to_numeric(medid['facility'])

# Apply the function to the specified columns
medid['OtherID'] = medid['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
}

for col, fmt in date_columns.items():
    if fmt:
        medid[col] = pd.to_datetime(medid[col], format=fmt)
    else:
        medid[col] = pd.to_datetime(medid[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 439, Finished, Available)

Success


In [438]:
# pivot (melt) medid to make it longform

# Rename the columns to match v 1.4 medication names
MedID_renamed = medid.rename(columns={
    'Fondaparinux': 'Fondaparinux',
    'Heparin Derivative': 'Heparin Derivative',
    'Low Molecular Wt Heparin': 'Low Molecular Weight Heparin',
    'Unfractionated Heparin': 'Unfractionated Heparin',
    'Warfarin': 'Warfarin',
    'Aspirin (81-100 mg)': 'Aspirin 81 to 100 mg',
    'Aspirin (101-324 mg)': 'Aspirin 101 to 324 mg',
    'Aspirin (325 mg)': 'Aspirin 325 mg',
    'Aspirin/Dipyridamole': 'Aspirin/Dipyridamole',
    'Vorapaxar': 'Vorapaxar',
    'Apixaban': 'Apixaban',
    'Dabigatran': 'Dabigatran',
    'Edoxaban': 'Edoxaban',
    'Rivaroxaban': 'Rivaroxaban',
    'Cangrelor': 'Cangrelor',
    'Clopidogrel': 'Clopidogrel',
    'Other P2Y12 Inhibitor': 'Other P2Y12',
    'Prasugrel': 'Prasugrel',
    'Ticagrelor': 'Ticagrelor',
    'Ticlopidine': 'Ticlopidine',
    'Aggrenox': 'Aggrenox'
})

# Now proceed to melt the dataframe as before
id_vars = ['NCDRPatientID', 'LastName', 'FirstName', 'MidName', 'OtherID', 'ArrivalDate', 'DCDate', 'facility', 'Pat_ID']

# Use the melt function to pivot the dataframe
MedID_melted = pd.melt(frame=MedID_renamed, id_vars=id_vars, var_name='MedID', value_name='PreMedAdmin')

print("Success")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 440, Finished, Available)

Success


In [439]:
# Reorder MedID_melted columns to match preprocmeds
MedID_melted = MedID_melted[preprocmeds.columns]

# Then concatenate
combined_df = pd.concat([preprocmeds, MedID_melted], axis=0, ignore_index=True)

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 441, Finished, Available)

Success


In [440]:
combined_df.info()

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 442, Finished, Available)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 77140 entries, 0 to 77139
Data columns (total 11 columns):
 #   Column         Dtype         
---  ------         -----         
 0   NCDRPatientID  object        
 1   LastName       object        
 2   FirstName      object        
 3   MidName        object        
 4   OtherID        object        
 5   ArrivalDate    datetime64[ns]
 6   DCDate         datetime64[ns]
 7   MedID          object        
 8   PreMedAdmin    object        
 9   facility       int64         
 10  Pat_ID         object        
dtypes: datetime64[ns](2), int64(1), object(8)
memory usage: 6.5+ MB


In [441]:
# Initialize SparkSession
spark = SparkSession.builder.appName("LaaoPreprocmedsIngestion").getOrCreate()

# Define the schema explicitly based on the new table requirements
schema = StructType([
    StructField("NCDRPatientID", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("FirstName", StringType(), True),
    StructField("MidName", StringType(), True),
    StructField("OtherID", StringType(), True),
    StructField("ArrivalDate", DateType(), True),
    StructField("DCDate", DateType(), True),
    StructField("MedID", StringType(), True),
    StructField("PreMedAdmin", StringType(), True),
    StructField("facility", IntegerType(), True),
    StructField("Pat_ID", StringType(), True)
])

# Assuming `preprocmeds_df` is your pandas DataFrame containing the data
preprocmeds_spark_df = spark.createDataFrame(combined_df, schema=schema)

# Write the DataFrame to the Delta table
preprocmeds_spark_df.write.format("delta").mode("overwrite").save("abfss://918a39aa-48f9-4458-8609-67a29814b9c8@onelake.dfs.fabric.microsoft.com/83d6ecf5-3497-4c8f-9baf-192fa2d057ab/Tables/laao_preprocmeds")

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 443, Finished, Available)

Success


## ProcInfo

### Define Dataframes needed

In [442]:
# load procinfo from v1.4
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_4/Merged/ProcInfo.parquet"
procinfo = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID', 'OHSConversionReason']:
    procinfo[col] = procinfo[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
procinfo['facility'] = pd.to_numeric(procinfo['facility'])

# Apply the function to the specified columns
procinfo['OtherID'] = procinfo['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'ProcedureStartDateTime': '%Y-%m-%d %H:%M:%S',
    'TEEDateLAAO': '%Y-%m-%d',
    'ProcedureEndDateTime': '%Y-%m-%d %H:%M:%S',

}

for col, fmt in date_columns.items():
    if fmt:
        procinfo[col] = pd.to_datetime(procinfo[col], format=fmt)
    else:
        procinfo[col] = pd.to_datetime(procinfo[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 444, Finished, Available)

Success


In [443]:
# load in_hospital from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/In-hospital.parquet"
in_hospital = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['ZipCode', 'OtherID', 'SSN', 'NCDRPatientID', 'OperA_NPI2']:
    in_hospital[col] = in_hospital[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
in_hospital['facility'] = pd.to_numeric(in_hospital['facility'])

# Apply the function to the specified columns
in_hospital['OtherID'] = in_hospital['OtherID'].apply(remove_decimal_point)
in_hospital['SSN'] = in_hospital['SSN'].apply(remove_decimal_point)
in_hospital['ZipCode'] = in_hospital['ZipCode'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'DOB': '%Y-%m-%d',
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'AFibSurgAblDate': '%Y-%m-%d', 
    'AFibFlutterCathAblDate': '%Y-%m-%d',
    'ProcedureStartDateTime': '%Y-%m-%d %H:%M:%S',
    'ProcedureEndDateTime': '%Y-%m-%d %H:%M:%S',
    'TEEDateLAAO': '%Y-%m-%d'
}

for col, fmt in date_columns.items():
    if fmt:
        in_hospital[col] = pd.to_datetime(in_hospital[col], format=fmt)
    else:
        in_hospital[col] = pd.to_datetime(in_hospital[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 445, Finished, Available)

Success


In [444]:
# load proclaaoind from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/ProcLAAOInd.parquet"
proclaaoind = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    proclaaoind[col] = proclaaoind[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
proclaaoind['facility'] = pd.to_numeric(proclaaoind['facility'])

# Apply the function to the specified columns
proclaaoind['OtherID'] = proclaaoind['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'ProcedureStartDateTime': '%Y-%m-%d %H:%M:%S',
    'ProcedureEndDateTime': '%Y-%m-%d %H:%M:%S'
}

for col, fmt in date_columns.items():
    if fmt:
        proclaaoind[col] = pd.to_datetime(proclaaoind[col], format=fmt)
    else:
        proclaaoind[col] = pd.to_datetime(proclaaoind[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 446, Finished, Available)

Success


In [445]:
# load proccanceledreason from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/ProcCanceledReason.parquet"
proccanceledreason = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    proccanceledreason[col] = proccanceledreason[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
proccanceledreason['facility'] = pd.to_numeric(proccanceledreason['facility'])

# Apply the function to the specified columns
proccanceledreason['OtherID'] = proccanceledreason['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'ProcedureStartDateTime': '%Y-%m-%d %H:%M:%S',
    'ProcedureStopDateTime': '%Y-%m-%d %H:%M:%S'
}

for col, fmt in date_columns.items():
    if fmt:
        proccanceledreason[col] = pd.to_datetime(proccanceledreason[col], format=fmt)
    else:
        proccanceledreason[col] = pd.to_datetime(proccanceledreason[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 447, Finished, Available)

Success


In [446]:
# load procabortedreason from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/ProcAbortedReason.parquet"
procabortedreason = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    procabortedreason[col] = procabortedreason[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
procabortedreason['facility'] = pd.to_numeric(procabortedreason['facility'])

# Apply the function to the specified columns
procabortedreason['OtherID'] = procabortedreason['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'ProcedureStartDateTime': '%Y-%m-%d %H:%M:%S',
    'ProcedureStopDateTime': '%Y-%m-%d %H:%M:%S'
}

for col, fmt in date_columns.items():
    if fmt:
        procabortedreason[col] = pd.to_datetime(procabortedreason[col], format=fmt)
    else:
        procabortedreason[col] = pd.to_datetime(procabortedreason[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 448, Finished, Available)

Success


In [447]:
# load guidancemethodid from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/GuidanceMethodID.parquet"
guidancemethodid = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    guidancemethodid[col] = guidancemethodid[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
guidancemethodid['facility'] = pd.to_numeric(guidancemethodid['facility'])

# Apply the function to the specified columns
guidancemethodid['OtherID'] = guidancemethodid['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'ProcedureStartDateTime': '%Y-%m-%d %H:%M:%S',
    'ProcedureEndDateTime': '%Y-%m-%d %H:%M:%S'
}

for col, fmt in date_columns.items():
    if fmt:
        guidancemethodid[col] = pd.to_datetime(guidancemethodid[col], format=fmt)
    else:
        guidancemethodid[col] = pd.to_datetime(guidancemethodid[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 449, Finished, Available)

Success


In [448]:
# load concomitantproctype from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/ConcomitantProcType.parquet"
concomitantproctype = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    concomitantproctype[col] = concomitantproctype[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
concomitantproctype['facility'] = pd.to_numeric(concomitantproctype['facility'])

# Apply the function to the specified columns
concomitantproctype['OtherID'] = concomitantproctype['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'ProcedureStartDateTime': '%Y-%m-%d %H:%M:%S',
    'ProcedureEndDateTime': '%Y-%m-%d %H:%M:%S'
}

for col, fmt in date_columns.items():
    if fmt:
        concomitantproctype[col] = pd.to_datetime(concomitantproctype[col], format=fmt)
    else:
        concomitantproctype[col] = pd.to_datetime(concomitantproctype[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 450, Finished, Available)

Success


### Begin Append and Merging Dataframes

In [449]:
# List of columns to append from 'in_hospital'
columns_to_append = [
    'Pat_ID',
    'NCDRPatientID', 
    'LastName', 
    'FirstName', 
    'MidName', 
    'OtherID', 
    'ArrivalDate', 
    'ProcedureStartDateTime', 
    'ProcedureLocation', 
    'Anesthesia', 
    'DCDate', 
    'TEEPerfLAAO', 
    'TEEDateLAAO', 
    'ProcAtrialThromDetect', 
    'LAAO_OrWid', 
    'ProcedureEndDateTime', 
    'ResidualLeak', 
    'ResidualLeakNA', 
    'OHSConversion', 
    'OHSConversionReason', 
    'FluoroDoseKerm', 
    'ContrastVol', 
    'FluoroDoseDAP2', 
    'IntraProcAnticoag', 
    'Warfarin', 
    'ProcHeparin2', 
    'ProcHeparinInitAdminTime', 
    'ProcBivalirudin2', 
    'ProcOtherAnticoag2', 
    'AnticoagReversal', 
    'facility', 
    'FluoroDoseDAP2_Units', 
    'FluoroDoseKerm_Units'
]

# Create a new DataFrame with only the specified columns
new_data_to_append = in_hospital[columns_to_append].copy()

# Make sure that all other columns are filled with NaN
additional_columns = set(procinfo.columns) - set(new_data_to_append.columns)
for column in additional_columns:
    new_data_to_append[column] = np.nan

# Reorder the columns in the new data to match the 'procinfo' DataFrame
new_data_to_append = new_data_to_append[procinfo.columns]

# Use concat instead of append
combined_df = pd.concat([procinfo, new_data_to_append], ignore_index=True)

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 451, Finished, Available)

Success


In [450]:
#Bring in needed columns to combined_df from proclaaoind 1.3 df


# List of columns from 'proclaaoind' to bring into 'combined_df'
specified_columns = [
    'ProcLAAOInd - High fall risk', 
    'ProcLAAOInd - History of major bleed', 
    'ProcLAAOInd - Increased thromboembolic stroke risk', 
    'ProcLAAOInd - Labile INR', 
    'ProcLAAOInd - Non-compliance with anticoagulation therapy', 
    'ProcLAAOInd - Patient preference'
]

# Set the index of both dataframes to the key columns for alignment
combined_df.set_index('Pat_ID', inplace=True)
proclaaoind.set_index('Pat_ID', inplace=True)

# Update only the specified columns in combined_df with the values from proclaaoind
for column in specified_columns:
    if column in combined_df.columns:
        combined_df[column].update(proclaaoind[column])

# Reset the index to return to the original structure
combined_df.reset_index(inplace=True)

print("Success")


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 452, Finished, Available)

Success


In [451]:
#Bring in needed columns to combined_df from proccanceledreason 1.3 df

# List of columns from 'proccanceledreason' to bring into 'combined_df'
specified_columns = [
    'ProcCanceled', 
    'ProcCanceledReason - Anatomy not conducive for implant', 
    'ProcCanceledReason - Appendage too large (for device implant)', 
    'ProcCanceledReason - Appendage too small (for device implant)', 
    'ProcCanceledReason - Catherization challenge', 
    'ProcCanceledReason - Decompensation in patient condition', 
    'ProcCanceledReason - Epicardial access issue', 
    'ProcCanceledReason - Thrombus detected', 
    'ProcCanceledReason - Unanticipated patient condition', 
    'ProcCanceledReason -  Patient/Family choice'

]

# Set the index of both dataframes to the key columns for alignment
combined_df.set_index('Pat_ID', inplace=True)
proccanceledreason.set_index('Pat_ID', inplace=True)

# Update only the specified columns in combined_df with the values from proccanceledreason
for column in specified_columns:
    if column in combined_df.columns:
        combined_df[column].update(proccanceledreason[column])

# Reset the index to return to the original structure
combined_df.reset_index(inplace=True)


print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 453, Finished, Available)

Success


In [452]:
#Bring in needed columns to combined_df from procabortedreason 1.3 df


# List of columns from 'procabortedreason' to bring into 'combined_df'
specified_columns = [
    'ProcAborted', 
    'ProcAbortedReason - Anatomy not conducive for implant', 
    'ProcAbortedReason - Appendage too large (for device implant)', 
    'ProcAbortedReason - Appendage too small (for device implant)', 
    'ProcAbortedReason - Catherization challenge', 
    'ProcAbortedReason - Decompensation in patient condition', 
    'ProcAbortedReason -  Device related', 
    'ProcAbortedReason - Transcatheter device retrieval', 
    'ProcAbortedReason - Device release criteria not met', 
    'ProcAbortedReason - Epicardial access issue', 
    'ProcAbortedReason - Surgical device retrieval', 
    'ProcAbortedReason - Device associated thrombus developed during procedure', 
    'ProcAbortedReason - Unanticipated patient condition', 
    'ProcAbortedReason - Patient/Family choice'


]

# Set the index of both dataframes to the key columns for alignment
combined_df.set_index('Pat_ID', inplace=True)
procabortedreason.set_index('Pat_ID', inplace=True)

# Update only the specified columns in combined_df with the values from procabortedreason
for column in specified_columns:
    if column in combined_df.columns:
        combined_df[column].update(procabortedreason[column])

# Reset the index to return to the original structure
combined_df.reset_index(inplace=True)


print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 454, Finished, Available)

Success


In [453]:
#Bring in needed columns to combined_df from guidancemethodid 1.3 df


# List of columns from 'guidancemethodid' to bring into 'combined_df'
specified_columns = [
    'GuidanceMethodID - Intracardiac three dimensional echocardiography', 
    'GuidanceMethodID - Electro Anatomic Mapping', 
    'GuidanceMethodID - Fluoroscopy', 
    'GuidanceMethodID - Transesophageal Echocardiogram (TEE)'
]

# Set the index of both dataframes to the key columns for alignment
combined_df.set_index('Pat_ID', inplace=True)
guidancemethodid.set_index('Pat_ID', inplace=True)

# Update only the specified columns in combined_df with the values from guidancemethodid
for column in specified_columns:
    if column in combined_df.columns:
        combined_df[column].update(guidancemethodid[column])

# Reset the index to return to the original structure
combined_df.reset_index(inplace=True)


print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 455, Finished, Available)

Success


In [454]:
#Bring in needed columns to combined_df from concomitantproctype 1.3 df


# List of columns from 'concomitantproctype' to bring into 'combined_df'
specified_columns = [
    'ConcomitantProcPerf', 
    'ConcomitantProcType - AFib Ablation', 
    'ConcomitantProcType - ICD', 
    'ConcomitantProcType - PCI', 
    'ConcomitantProcType - TAVR', 
    'ConcomitantProcType - TMVR', 
    'ConcomitantProcType - ASD Closure Congenital', 
    'ConcomitantProcType - ASD Closure Iatrogenic', 
    'ConcomitantProcType - PFO Closure Congenital' 

]

# Set the index of both dataframes to the key columns for alignment
combined_df.set_index('Pat_ID', inplace=True)
concomitantproctype.set_index('Pat_ID', inplace=True)

# Update only the specified columns in combined_df with the values from concomitantproctype
for column in specified_columns:
    if column in combined_df.columns:
        combined_df[column].update(concomitantproctype[column])

# Reset the index to return to the original structure
combined_df.reset_index(inplace=True)

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 456, Finished, Available)

Success


In [455]:
columns_mapping = {
    'ProcLAAOInd - High fall risk': 'ProcLAAOInd_High_Fall_Risk',
    'ProcLAAOInd - History of major bleed': 'ProcLAAOInd_History_Of_Major_Bleed',
    'ProcLAAOInd - Increased thromboembolic stroke risk': 'ProcLAAOInd_Increased_Thromboembolic_Stroke_Risk',
    'ProcLAAOInd - Labile INR': 'ProcLAAOInd_Labile_INR',
    'ProcLAAOInd - Non-compliance with anticoagulation therapy': 'ProcLAAOInd_Non_Compliance_With_Anticoagulation_Therapy',
    'ProcLAAOInd - Patient preference': 'ProcLAAOInd_Patient_Preference',
    'ProcCanceledReason - Anatomy not conducive for implant': 'ProcCanceledReason_Anatomy_Not_Conducive_For_Implant',
    'ProcCanceledReason - Appendage too large (for device implant)': 'ProcCanceledReason_Appendage_Too_Large_For_Device_Implant',
    'ProcCanceledReason - Appendage too small (for device implant)': 'ProcCanceledReason_Appendage_Too_Small_For_Device_Implant',
    'ProcCanceledReason - Catherization challenge': 'ProcCanceledReason_Catherization_Challenge',
    'ProcCanceledReason - Decompensation in patient condition': 'ProcCanceledReason_Decompensation_In_Patient_Condition',
    'ProcCanceledReason - Epicardial access issue': 'ProcCanceledReason_Epicardial_Access_Issue',
    'ProcCanceledReason - Thrombus detected': 'ProcCanceledReason_Thrombus_Detected',
    'ProcCanceledReason - Unanticipated patient condition': 'ProcCanceledReason_Unanticipated_Patient_Condition',
    'ProcCanceledReason -  Patient/Family choice': 'ProcCanceledReason_Patient_Family_Choice',
    'ProcAbortedReason - Anatomy not conducive for implant': 'ProcAbortedReason_Anatomy_Not_Conducive_For_Implant',
    'ProcAbortedReason - Appendage too large (for device implant)': 'ProcAbortedReason_Appendage_Too_Large_For_Device_Implant',
    'ProcAbortedReason - Appendage too small (for device implant)': 'ProcAbortedReason_Appendage_Too_Small_For_Device_Implant',
    'ProcAbortedReason - Catherization challenge': 'ProcAbortedReason_Catherization_Challenge',
    'ProcAbortedReason - Decompensation in patient condition': 'ProcAbortedReason_Decompensation_In_Patient_Condition',
    'ProcAbortedReason -  Device related': 'ProcAbortedReason_Device_Related',
    'ProcAbortedReason - Transcatheter device retrieval': 'ProcAbortedReason_Transcatheter_Device_Retrieval',
    'ProcAbortedReason - Device release criteria not met': 'ProcAbortedReason_Device_Release_Criteria_Not_Met',
    'ProcAbortedReason - Epicardial access issue': 'ProcAbortedReason_Epicardial_Access_Issue',
    'ProcAbortedReason - Surgical device retrieval': 'ProcAbortedReason_Surgical_Device_Retrieval',
    'ProcAbortedReason - Device associated thrombus developed during procedure': 'ProcAbortedReason_Device_Associated_Thrombus_Developed_During_Procedure',
    'ProcAbortedReason - Unanticipated patient condition': 'ProcAbortedReason_Unanticipated_Patient_Condition',
    'ProcAbortedReason - Patient/Family choice': 'ProcAbortedReason_Patient_Family_Choice',
    'GuidanceMethodID - Intracardiac three dimensional echocardiography': 'GuidanceMethodID_Intracardiac_Three_Dimensional_Echocardiography',
    'GuidanceMethodID - Electro Anatomic Mapping': 'GuidanceMethodID_Electro_Anatomic_Mapping',
    'GuidanceMethodID - Fluoroscopy': 'GuidanceMethodID_Fluoroscopy',
    'GuidanceMethodID - Transesophageal Echocardiogram (TEE)': 'GuidanceMethodID_Transesophageal_Echocardiogram_TEE',
    'ConcomitantProcType - AFib Ablation': 'ConcomitantProcType_AFib_Ablation',
    'ConcomitantProcType - ICD': 'ConcomitantProcType_ICD',
    'ConcomitantProcType - PCI': 'ConcomitantProcType_PCI',
    'ConcomitantProcType - TAVR': 'ConcomitantProcType_TAVR',
    'ConcomitantProcType - TMVR': 'ConcomitantProcType_TMVR',
    'ConcomitantProcType - ASD Closure Congenital': 'ConcomitantProcType_ASD_Closure_Congenital',
    'ConcomitantProcType - ASD Closure Iatrogenic': 'ConcomitantProcType_ASD_Closure_Iatrogenic',
    'ConcomitantProcType - PFO Closure Congenital': 'ConcomitantProcType_PFO_Closure_Congenital',
}

# Rename the columns
combined_df_renamed = combined_df.rename(columns=columns_mapping)

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 457, Finished, Available)

Success


In [456]:
# Initialize SparkSession
spark = SparkSession.builder.appName("LaaoProcInfoIngestion").getOrCreate()

# Define the schema explicitly with all column names and types
schema = StructType([
    StructField("Pat_ID", StringType(), True),
    StructField("NCDRPatientID", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("FirstName", StringType(), True),
    StructField("MidName", StringType(), True),
    StructField("OtherID", StringType(), True),
    StructField("ArrivalDate", DateType(), True),
    StructField("ProcedureStartDateTime", TimestampType(), True),
    StructField("DCDate", DateType(), True),
    StructField("TEEPerfLAAO", StringType(), True),
    StructField("TEEDateLAAO", DateType(), True),
    StructField("ProcAtrialThromDetect", StringType(), True),
    StructField("LAAO_OrWid", DoubleType(), True),
    StructField("ProcedureEndDateTime", TimestampType(), True),
    StructField("SDM_Proc", StringType(), True),
    StructField("SDM_Tool", StringType(), True),
    StructField("SDM_Tool_Name", StringType(), True),
    StructField("ProcedureLocation", StringType(), True),
    StructField("Anesthesia", StringType(), True),
    StructField("ProcLAAOInd_High_Fall_Risk", StringType(), True),
    StructField("ProcLAAOInd_History_Of_Major_Bleed", StringType(), True),
    StructField("ProcLAAOInd_Clinically_significant_bleeding_risk_Other_than_those_listed_here", StringType(), True),
    StructField("ProcLAAOInd_Increased_Thromboembolic_Stroke_Risk", StringType(), True),
    StructField("ProcLAAOInd_Labile_INR", StringType(), True),
    StructField("ProcLAAOInd_Non_Compliance_With_Anticoagulation_Therapy", StringType(), True),
    StructField("ProcLAAOInd_Patient_Preference", StringType(), True),
    StructField("ProcCanceled", StringType(), True),
    StructField("ProcCanceledReason_Anatomy_Not_Conducive_For_Implant", StringType(), True),
    StructField("ProcCanceledReason_Appendage_Too_Large_For_Device_Implant", StringType(), True),
    StructField("ProcCanceledReason_Appendage_Too_Small_For_Device_Implant", StringType(), True),
    StructField("ProcCanceledReason_Catherization_Challenge", StringType(), True),
    StructField("ProcCanceledReason_Decompensation_In_Patient_Condition", StringType(), True),
    StructField("ProcCanceledReason_Epicardial_Access_Issue", StringType(), True),
    StructField("ProcCanceledReason_Thrombus_Detected", StringType(), True),
    StructField("ProcCanceledReason_Unanticipated_Patient_Condition", StringType(), True),
    StructField("ProcCanceledReason_Patient_Family_Choice", StringType(), True),
    StructField("ProcAborted", StringType(), True),
    StructField("ProcAbortedReason_Anatomy_Not_Conducive_For_Implant", StringType(), True),
    StructField("ProcAbortedReason_Appendage_Too_Large_For_Device_Implant", StringType(), True),
    StructField("ProcAbortedReason_Appendage_Too_Small_For_Device_Implant", StringType(), True),
    StructField("ProcAbortedReason_Catherization_Challenge", StringType(), True),
    StructField("ProcAbortedReason_Decompensation_In_Patient_Condition", StringType(), True),
    StructField("ProcAbortedReason_Device_Related", StringType(), True),
    StructField("ProcAbortedReason_Transcatheter_Device_Retrieval", StringType(), True),
    StructField("ProcAbortedReason_Device_Release_Criteria_Not_Met", StringType(), True),
    StructField("ProcAbortedReason_Epicardial_Access_Issue", StringType(), True),
    StructField("ProcAbortedReason_Surgical_Device_Retrieval", StringType(), True),
    StructField("ProcAbortedReason_Device_Associated_Thrombus_Developed_During_Procedure", StringType(), True),
    StructField("ProcAbortedReason_Unanticipated_Patient_Condition", StringType(), True),
    StructField("ProcAbortedReason_Patient_Family_Choice", StringType(), True),
    StructField("ResidualLeak", DoubleType(), True),
    StructField("ResidualLeakNA", StringType(), True),
    StructField("GuidanceMethodID_Intracardiac_Three_Dimensional_Echocardiography", StringType(), True),
    StructField("GuidanceMethodID_Electro_Anatomic_Mapping", StringType(), True),
    StructField("GuidanceMethodID_Fluoroscopy", StringType(), True),
    StructField("GuidanceMethodID_Transesophageal_Echocardiogram_TEE", StringType(), True),
    StructField("OHSConversion", StringType(), True),
    StructField("OHSConversionReason", StringType(), True),
    StructField("ConcomitantProcPerf", StringType(), True),
    StructField("ConcomitantProcType_AFib_Ablation", StringType(), True),
    StructField("ConcomitantProcType_ICD", StringType(), True),
    StructField("ConcomitantProcType_PCI", StringType(), True),
    StructField("ConcomitantProcType_TAVR", StringType(), True),
    StructField("ConcomitantProcType_TMVR", StringType(), True),
    StructField("ConcomitantProcType_ASD_Closure_Congenital", StringType(), True),
    StructField("ConcomitantProcType_ASD_Closure_Iatrogenic", StringType(), True),
    StructField("ConcomitantProcType_PFO_Closure_Congenital", StringType(), True),
    StructField("FluoroDoseKerm", DoubleType(), True),
    StructField("ContrastVol", DoubleType(), True),
    StructField("FluoroDoseDAP2", DoubleType(), True),
    StructField("IntraProcAnticoag", StringType(), True),
    StructField("Warfarin", StringType(), True),
    StructField("ProcHeparin2", StringType(), True),
    StructField("ProcHeparinInitAdminTime", StringType(), True),
    StructField("ProcBivalirudin2", StringType(), True),
    StructField("ProcOtherAnticoag2", StringType(), True),
    StructField("AnticoagReversal", StringType(), True),
    StructField("facility", IntegerType(), True),
    StructField("FluoroDoseKerm_Units", StringType(), True),
    StructField("FluoroDoseDAP2_Units", StringType(), True),

])

# Convert the pandas DataFrame to a Spark DataFrame with the defined schema
laao_procinfo_df = spark.createDataFrame(combined_df_renamed, schema=schema)

# Now write the DataFrame to the Delta table
laao_procinfo_df.write.format("delta").mode("overwrite").save("abfss://918a39aa-48f9-4458-8609-67a29814b9c8@onelake.dfs.fabric.microsoft.com/83d6ecf5-3497-4c8f-9baf-192fa2d057ab/Tables/laao_procinfo")

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 458, Finished, Available)

Success


## Operators

In [457]:
# load operators from v1.4
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_4/Merged/Operators.parquet"
operators = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    operators[col] = operators[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
operators['facility'] = pd.to_numeric(operators['facility'])

# Apply the function to the specified columns
operators['OtherID'] = operators['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
}

for col, fmt in date_columns.items():
    if fmt:
        operators[col] = pd.to_datetime(operators[col], format=fmt)
    else:
        operators[col] = pd.to_datetime(operators[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 459, Finished, Available)

Success


In [458]:
# load in_hospital from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/In-hospital.parquet"
in_hospital = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['ZipCode', 'OtherID', 'SSN', 'NCDRPatientID', 'OperA_NPI2']:
    in_hospital[col] = in_hospital[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
in_hospital['facility'] = pd.to_numeric(in_hospital['facility'])

# Apply the function to the specified columns
in_hospital['OtherID'] = in_hospital['OtherID'].apply(remove_decimal_point)
in_hospital['SSN'] = in_hospital['SSN'].apply(remove_decimal_point)
in_hospital['ZipCode'] = in_hospital['ZipCode'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'DOB': '%Y-%m-%d',
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'AFibSurgAblDate': '%Y-%m-%d', 
    'AFibFlutterCathAblDate': '%Y-%m-%d',
    'ProcedureStartDateTime': '%Y-%m-%d %H:%M:%S',
    'ProcedureEndDateTime': '%Y-%m-%d %H:%M:%S',
    'TEEDateLAAO': '%Y-%m-%d'
}

for col, fmt in date_columns.items():
    if fmt:
        in_hospital[col] = pd.to_datetime(in_hospital[col], format=fmt)
    else:
        in_hospital[col] = pd.to_datetime(in_hospital[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 460, Finished, Available)

Success


In [459]:
#start by appending in_hospital

# List of columns to append from 'in_hospital'
columns_to_append = [
    'Pat_ID',
    'NCDRPatientID', 
    'LastName', 
    'FirstName', 
    'MidName', 
    'OtherID', 
    'ArrivalDate', 
    'ProcedureStartDateTime', 
    'DCDate', 
    'OperA_LastName2', 
    'OperA_FirstName2', 
    'OperA_MidName2', 
    'OperA_NPI2', 
    'facility'
]

# Create a new DataFrame with only the specified columns
new_data_to_append = in_hospital[columns_to_append].copy()

# Make sure that all other columns are filled with NaN
additional_columns = set(operators.columns) - set(new_data_to_append.columns)
for column in additional_columns:
    new_data_to_append[column] = np.nan

# Reorder the columns in the new data to match the 'operators' DataFrame
new_data_to_append = new_data_to_append[operators.columns]

# Use concat instead of append
combined_df = pd.concat([operators, new_data_to_append], ignore_index=True)

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 461, Finished, Available)

Success


In [460]:
combined_df.info()

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 462, Finished, Available)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3610 entries, 0 to 3609
Data columns (total 14 columns):
 #   Column                  Non-Null Count  Dtype         
---  ------                  --------------  -----         
 0   NCDRPatientID           3610 non-null   object        
 1   LastName                3610 non-null   object        
 2   FirstName               3610 non-null   object        
 3   MidName                 2906 non-null   object        
 4   OtherID                 3610 non-null   object        
 5   ArrivalDate             3610 non-null   datetime64[ns]
 6   ProcedureStartDateTime  3610 non-null   datetime64[ns]
 7   DCDate                  3610 non-null   datetime64[ns]
 8   OperA_LastName2         3610 non-null   object        
 9   OperA_FirstName2        3610 non-null   object        
 10  OperA_MidName2          774 non-null    object        
 11  OperA_NPI2              3610 non-null   object        
 12  facility                3610 non-null   int64   

In [461]:
# Initialize SparkSession
spark = SparkSession.builder.appName("LaaoOperatorsIngestion").getOrCreate()

# Define the schema explicitly based on the given columns
schema = StructType([
    StructField("NCDRPatientID", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("FirstName", StringType(), True),
    StructField("MidName", StringType(), True),
    StructField("OtherID", StringType(), True),
    StructField("ArrivalDate", DateType(), True),
    StructField("ProcedureStartDateTime", TimestampType(), True),
    StructField("DCDate", DateType(), True),
    StructField("OperA_LastName2", StringType(), True),
    StructField("OperA_FirstName2", StringType(), True),
    StructField("OperA_MidName2", StringType(), True),
    StructField("OperA_NPI2", StringType(), True),
    StructField("facility", IntegerType(), True),
    StructField("Pat_ID", StringType(), True),
])

# Convert the pandas DataFrame to a Spark DataFrame with the defined schema
combined_spark_df = spark.createDataFrame(combined_df, schema=schema)

# Now try writing the DataFrame to the Delta table again
combined_spark_df.write.format("delta").mode("overwrite").save("abfss://918a39aa-48f9-4458-8609-67a29814b9c8@onelake.dfs.fabric.microsoft.com/83d6ecf5-3497-4c8f-9baf-192fa2d057ab/Tables/laao_operators")

print('Success')

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 463, Finished, Available)

  Expected bytes, got a 'int' object
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


Success


## Fellows
New data for v1.4, no data available for v1.3.

In [462]:
# load fellows from v1.4
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_4/Merged/Fellows.parquet"
fellows = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    fellows[col] = fellows[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
fellows['facility'] = pd.to_numeric(fellows['facility'])

# Apply the function to the specified columns
fellows['OtherID'] = fellows['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
}

for col, fmt in date_columns.items():
    if fmt:
        fellows[col] = pd.to_datetime(fellows[col], format=fmt)
    else:
        fellows[col] = pd.to_datetime(fellows[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 464, Finished, Available)

Success


In [463]:
# Initialize SparkSession
spark = SparkSession.builder.appName("LaaoFellowsIngestion").getOrCreate()

# Define the schema explicitly for laao_fellows table
schema = StructType([
    StructField("NCDRPatientID", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("FirstName", StringType(), True),
    StructField("MidName", StringType(), True),
    StructField("OtherID", StringType(), True),
    StructField("ArrivalDate", DateType(), True),
    StructField("ProcedureStartDateTime", TimestampType(), True),
    StructField("DCDate", DateType(), True),
    StructField("FITProgID", IntegerType(), True),
    StructField("FIT_LastName", StringType(), True),
    StructField("FIT_FirstName", StringType(), True),
    StructField("FIT_MidName", StringType(), True),
    StructField("FIT_NPI", StringType(), True),
    StructField("facility", IntegerType(), True),
    StructField("Pat_ID", StringType(), True),
    # Add any additional fields here as needed
])

# Convert the pandas DataFrame to a Spark DataFrame using the defined schema
laao_fellows_spark_df = spark.createDataFrame(fellows, schema=schema)

# Write the DataFrame to the Delta table
laao_fellows_spark_df.write.format("delta").mode("overwrite").save("abfss://918a39aa-48f9-4458-8609-67a29814b9c8@onelake.dfs.fabric.microsoft.com/83d6ecf5-3497-4c8f-9baf-192fa2d057ab/Tables/laao_fellows")

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 465, Finished, Available)

  Expected a string or bytes dtype, got int64
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


Success


## AccessSystems

In [464]:
# load accesssystems from v1.4
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_4/Merged/AccessSystems.parquet"
accesssystems = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    accesssystems[col] = accesssystems[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
accesssystems['facility'] = pd.to_numeric(accesssystems['facility'])

# Apply the function to the specified columns
accesssystems['OtherID'] = accesssystems['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'ProcedureStartDateTime': '%Y-%m-%d %H:%M:%S'
}

for col, fmt in date_columns.items():
    if fmt:
        accesssystems[col] = pd.to_datetime(accesssystems[col], format=fmt)
    else:
        accesssystems[col] = pd.to_datetime(accesssystems[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 466, Finished, Available)

Success


In [465]:
# load accesssysid from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/AccessSysID.parquet"
accesssysid = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    accesssysid[col] = accesssysid[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
accesssysid['facility'] = pd.to_numeric(accesssysid['facility'])

# Apply the function to the specified columns
accesssysid['OtherID'] = accesssysid['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'ProcedureStartDateTime': '%Y-%m-%d %H:%M:%S'
}

for col, fmt in date_columns.items():
    if fmt:
        accesssysid[col] = pd.to_datetime(accesssysid[col], format=fmt)
    else:
        accesssysid[col] = pd.to_datetime(accesssysid[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 467, Finished, Available)

Success


In [466]:
#start by appending accesssysid

# List of columns to append from 'accesssysid'
columns_to_append = [
    'Pat_ID',
    'NCDRPatientID', 
    'LastName', 
    'FirstName', 
    'MidName', 
    'OtherID', 
    'ArrivalDate', 
    'ProcedureStartDateTime', 
    'DCDate', 
    'AccessSysCounter', 
    'AccessSysID', 
    'facility'
]

# Create a new DataFrame with only the specified columns
new_data_to_append = accesssysid[columns_to_append].copy()

# Make sure that all other columns are filled with NaN
additional_columns = set(accesssystems.columns) - set(new_data_to_append.columns)
for column in additional_columns:
    new_data_to_append[column] = np.nan

# Reorder the columns in the new data to match the 'accesssystems' DataFrame
new_data_to_append = new_data_to_append[accesssystems.columns]

# Use concat instead of append
combined_df = pd.concat([accesssystems, new_data_to_append], ignore_index=True)


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 468, Finished, Available)

In [467]:
# Initialize SparkSession
spark = SparkSession.builder.appName("LA_AccessSystems_Ingestion").getOrCreate()

# Define the schema explicitly for the laao_accesssystems table
schema = StructType([
    StructField("NCDRPatientID", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("FirstName", StringType(), True),
    StructField("MidName", StringType(), True),
    StructField("OtherID", StringType(), True),
    StructField("ArrivalDate", DateType(), True),
    StructField("ProcedureStartDateTime", TimestampType(), True),
    StructField("DCDate", DateType(), True),
    StructField("AccessSysCounter", IntegerType(), True),
    StructField("AccessSysID", StringType(), True),
    StructField("facility", IntegerType(), True),
    StructField("Pat_ID", StringType(), True),
])

# Convert the pandas DataFrame to a Spark DataFrame with the defined schema
laao_accesssystems_df = spark.createDataFrame(combined_df, schema=schema)

# Write the DataFrame to the Delta table
laao_accesssystems_df.write.format("delta").mode("overwrite").save("abfss://918a39aa-48f9-4458-8609-67a29814b9c8@onelake.dfs.fabric.microsoft.com/83d6ecf5-3497-4c8f-9baf-192fa2d057ab/Tables/laao_accesssystems")

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 469, Finished, Available)

Success


## Devices

In [468]:
# load devices from v1.4
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_4/Merged/Devices.parquet"
devices = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    devices[col] = devices[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
devices['facility'] = pd.to_numeric(devices['facility'])

# Apply the function to the specified columns
devices['OtherID'] = devices['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'ProcedureStartDateTime': '%Y-%m-%d %H:%M:%S'
}

for col, fmt in date_columns.items():
    if fmt:
        devices[col] = pd.to_datetime(devices[col], format=fmt)
    else:
        devices[col] = pd.to_datetime(devices[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 470, Finished, Available)

Success


In [469]:
# load accesssysid from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/AccessSysID.parquet"
accesssysid = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    accesssysid[col] = accesssysid[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
accesssysid['facility'] = pd.to_numeric(accesssysid['facility'])

# Apply the function to the specified columns
accesssysid['OtherID'] = accesssysid['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'ProcedureStartDateTime': '%Y-%m-%d %H:%M:%S'
}

for col, fmt in date_columns.items():
    if fmt:
        accesssysid[col] = pd.to_datetime(accesssysid[col], format=fmt)
    else:
        accesssysid[col] = pd.to_datetime(accesssysid[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 471, Finished, Available)

Success


In [470]:
#start by appending accesssysid

# List of columns to append from 'accesssysid'
columns_to_append = [
    'Pat_ID',
    'NCDRPatientID', 
    'LastName', 
    'FirstName', 
    'MidName', 
    'OtherID', 
    'ArrivalDate', 
    'ProcedureStartDateTime', 
    'DCDate', 
    'DevCounter', 
    'LAADevID', 
    'Dev_UDIDirectID', 
    'LAAIsolationApproach', 
    'DevSucdep', 
    'OutDevUnsucDepl', 
    'facility' 

]

# Create a new DataFrame with only the specified columns
new_data_to_append = accesssysid[columns_to_append].copy()

# Make sure that all other columns are filled with NaN
additional_columns = set(devices.columns) - set(new_data_to_append.columns)
for column in additional_columns:
    new_data_to_append[column] = np.nan

# Reorder the columns in the new data to match the 'devices' DataFrame
new_data_to_append = new_data_to_append[devices.columns]

# Use concat instead of append
combined_df = pd.concat([devices, new_data_to_append], ignore_index=True)

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 472, Finished, Available)

Success


In [471]:
# Initialize SparkSession
spark = SparkSession.builder.appName("LAAODevicesIngestion").getOrCreate()

# Define the schema explicitly to match the laao_accesssystems table
schema = StructType([
    StructField("NCDRPatientID", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("FirstName", StringType(), True),
    StructField("MidName", StringType(), True),
    StructField("OtherID", StringType(), True),
    StructField("ArrivalDate", DateType(), True),
    StructField("ProcedureStartDateTime", TimestampType(), True),
    StructField("DCDate", DateType(), True),
    StructField("DevCounter", IntegerType(), True),
    StructField("LAADevID", StringType(), True),
    StructField("Dev_UDIDirectID", FloatType(), True),
    StructField("LAAIsolationApproach", StringType(), True),
    StructField("DevSucdep", StringType(), True),
    StructField("OutDevUnsucDepl", StringType(), True),
    StructField("facility", IntegerType(), True),
    StructField("Pat_ID", StringType(), True),
])

# Convert the pandas DataFrame to a Spark DataFrame using the defined schema
spark_df = spark.createDataFrame(combined_df, schema=schema)

# Write the DataFrame to the Delta table
spark_df.write.format("delta").mode("overwrite").save("abfss://918a39aa-48f9-4458-8609-67a29814b9c8@onelake.dfs.fabric.microsoft.com/83d6ecf5-3497-4c8f-9baf-192fa2d057ab/Tables/laao_devices")

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 473, Finished, Available)

Success


## IPPEvents

In [472]:
# load ippevents from v1.4
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_4/Merged/IPPEvents.parquet"
ippevents = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    ippevents[col] = ippevents[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
ippevents['facility'] = pd.to_numeric(ippevents['facility'])

# Apply the function to the specified columns
ippevents['OtherID'] = ippevents['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'ProcedureStartDateTime': '%Y-%m-%d %H:%M:%S',
    'IntraPostProcEventDate': '%Y-%m-%d'
}

for col, fmt in date_columns.items():
    if fmt:
        ippevents[col] = pd.to_datetime(ippevents[col], format=fmt)
    else:
        ippevents[col] = pd.to_datetime(ippevents[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 474, Finished, Available)

Success


In [473]:
# load event_id from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/Event ID.parquet"
event_id = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    event_id[col] = event_id[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
event_id['facility'] = pd.to_numeric(event_id['facility'])

# Apply the function to the specified columns
event_id['OtherID'] = event_id['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'ProcedureStartDateTime': '%Y-%m-%d %H:%M:%S',
    'IntraPostProcEventDate': '%Y-%m-%d'
}

for col, fmt in date_columns.items():
    if fmt:
        event_id[col] = pd.to_datetime(event_id[col], format=fmt)
    else:
        event_id[col] = pd.to_datetime(event_id[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 475, Finished, Available)

Success


In [474]:
#start by appending event_id

# List of columns to append from 'event_id'
columns_to_append = [
    'Pat_ID',
    'NCDRPatientID', 
    'LastName', 
    'FirstName', 
    'MidName', 
    'OtherID', 
    'ArrivalDate', 
    'ProcedureStartDateTime', 
    'DCDate', 
    'ProcEvents', 
    'PostProcOccurred', 
    'IntraPostProcEventDate', 
    'facility' 
]

# Create a new DataFrame with only the specified columns
new_data_to_append = event_id[columns_to_append].copy()

# Make sure that all other columns are filled with NaN
additional_columns = set(ippevents.columns) - set(new_data_to_append.columns)
for column in additional_columns:
    new_data_to_append[column] = np.nan

# Reorder the columns in the new data to match the 'ippevents' DataFrame
new_data_to_append = new_data_to_append[ippevents.columns]

# Use concat instead of append
combined_df = pd.concat([ippevents, new_data_to_append], ignore_index=True)

print('Success')

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 476, Finished, Available)

Success


In [475]:
# Initialize SparkSession
spark = SparkSession.builder.appName("LAOOIPPEventsIngestion").getOrCreate()

# Define the schema explicitly based on the `laao_ippevents` table structure
schema = StructType([
    StructField("NCDRPatientID", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("FirstName", StringType(), True),
    StructField("MidName", StringType(), True),
    StructField("OtherID", StringType(), True),
    StructField("ArrivalDate", DateType(), True),
    StructField("ProcedureStartDateTime", TimestampType(), True),
    StructField("DCDate", DateType(), True),
    StructField("ProcEvents", StringType(), True),
    StructField("PostProcOccurred", StringType(), True),
    StructField("IntraPostProcEventDate", DateType(), True),
    StructField("facility", IntegerType(), True),
    StructField("Pat_ID", StringType(), True),   
])

# Convert the pandas DataFrame to a Spark DataFrame with the defined schema
combined_spark_df = spark.createDataFrame(combined_df, schema=schema)

#write to delta table
combined_spark_df.write.format("delta").mode("overwrite").save("abfss://918a39aa-48f9-4458-8609-67a29814b9c8@onelake.dfs.fabric.microsoft.com/83d6ecf5-3497-4c8f-9baf-192fa2d057ab/Tables/laao_ippevents")

print('Success')

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 477, Finished, Available)

Success


## PostProcLabs

In [476]:
# load postproclabs from v1.4
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_4/Merged/PostProcLabs.parquet"
postproclabs = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    postproclabs[col] = postproclabs[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
postproclabs['facility'] = pd.to_numeric(postproclabs['facility'])

# Apply the function to the specified columns
postproclabs['OtherID'] = postproclabs['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'ProcedureStartDateTime': '%Y-%m-%d %H:%M:%S'
}

for col, fmt in date_columns.items():
    if fmt:
        postproclabs[col] = pd.to_datetime(postproclabs[col], format=fmt)
    else:
        postproclabs[col] = pd.to_datetime(postproclabs[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 478, Finished, Available)

Success


In [477]:
# load in_hospital from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/In-hospital.parquet"
in_hospital = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['ZipCode', 'OtherID', 'SSN', 'NCDRPatientID', 'OperA_NPI2']:
    in_hospital[col] = in_hospital[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
in_hospital['facility'] = pd.to_numeric(in_hospital['facility'])

# Apply the function to the specified columns
in_hospital['OtherID'] = in_hospital['OtherID'].apply(remove_decimal_point)
in_hospital['SSN'] = in_hospital['SSN'].apply(remove_decimal_point)
in_hospital['ZipCode'] = in_hospital['ZipCode'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'DOB': '%Y-%m-%d',
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'AFibSurgAblDate': '%Y-%m-%d', 
    'AFibFlutterCathAblDate': '%Y-%m-%d',
    'ProcedureStartDateTime': '%Y-%m-%d %H:%M:%S',
    'ProcedureEndDateTime': '%Y-%m-%d %H:%M:%S',
    'TEEDateLAAO': '%Y-%m-%d'
}

for col, fmt in date_columns.items():
    if fmt:
        in_hospital[col] = pd.to_datetime(in_hospital[col], format=fmt)
    else:
        in_hospital[col] = pd.to_datetime(in_hospital[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 479, Finished, Available)

Success


In [478]:
#start by appending in_hospital

# List of columns to append from 'in_hospital'
columns_to_append = [
    'Pat_ID',
    'NCDRPatientID', 
    'LastName', 
    'FirstName', 
    'MidName', 
    'OtherID', 
    'ArrivalDate', 
    'ProcedureStartDateTime', 
    'DCDate', 
    'PostProcPeakCreat', 
    'PostProcPeakCreatND', 
    'PostProcCreat2', 
    'PostProcCreatND2', 
    'PostProcHgb2', 
    'PostProcHgbND2', 
    'facility'

]

# Create a new DataFrame with only the specified columns
new_data_to_append = in_hospital[columns_to_append].copy()

# Make sure that all other columns are filled with NaN
additional_columns = set(postproclabs.columns) - set(new_data_to_append.columns)
for column in additional_columns:
    new_data_to_append[column] = np.nan

# Reorder the columns in the new data to match the 'postproclabs' DataFrame
new_data_to_append = new_data_to_append[postproclabs.columns]

# Use concat instead of append
combined_df = pd.concat([postproclabs, new_data_to_append], ignore_index=True)

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 480, Finished, Available)

Success


In [479]:
# Initialize SparkSession
spark = SparkSession.builder.appName("LAAOPostProcLabsIngestion").getOrCreate()

# Define the schema explicitly
schema = StructType([
    StructField("NCDRPatientID", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("FirstName", StringType(), True),
    StructField("MidName", StringType(), True),
    StructField("OtherID", StringType(), True),
    StructField("ArrivalDate", DateType(), True),
    StructField("ProcedureStartDateTime", TimestampType(), True),
    StructField("DCDate", DateType(), True),
    StructField("PostProcPeakCreat", DoubleType(), True),
    StructField("PostProcPeakCreatND", StringType(), True),
    StructField("PostProcCreat2", DoubleType(), True),
    StructField("PostProcCreatND2", StringType(), True),
    StructField("PostProcHgb2", DoubleType(), True),
    StructField("PostProcHgbND2", StringType(), True),
    StructField("facility", IntegerType(), True),
    StructField("Pat_ID", StringType(), True),
])

# Convert the pandas DataFrame to a Spark DataFrame with the defined schema
laao_postprocmeds_df = spark.createDataFrame(combined_df, schema=schema)

# Now, write the DataFrame to the Delta table
laao_postprocmeds_df.write.format("delta").mode("overwrite").save("abfss://918a39aa-48f9-4458-8609-67a29814b9c8@onelake.dfs.fabric.microsoft.com/83d6ecf5-3497-4c8f-9baf-192fa2d057ab/Tables/laao_postproclabs")

print('Success')

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 481, Finished, Available)

Success


## Discharge

In [480]:
# load discharge from v1.4
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_4/Merged/Discharge.parquet"
discharge = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    discharge[col] = discharge[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
discharge['facility'] = pd.to_numeric(discharge['facility'])

# Apply the function to the specified columns
discharge['OtherID'] = discharge['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d'
}

for col, fmt in date_columns.items():
    if fmt:
        discharge[col] = pd.to_datetime(discharge[col], format=fmt)
    else:
        discharge[col] = pd.to_datetime(discharge[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 482, Finished, Available)

Success


In [481]:
# load in_hospital from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/In-hospital.parquet"
in_hospital = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['ZipCode', 'OtherID', 'SSN', 'NCDRPatientID', 'OperA_NPI2']:
    in_hospital[col] = in_hospital[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
in_hospital['facility'] = pd.to_numeric(in_hospital['facility'])

# Apply the function to the specified columns
in_hospital['OtherID'] = in_hospital['OtherID'].apply(remove_decimal_point)
in_hospital['SSN'] = in_hospital['SSN'].apply(remove_decimal_point)
in_hospital['ZipCode'] = in_hospital['ZipCode'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'DOB': '%Y-%m-%d',
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d',
    'AFibSurgAblDate': '%Y-%m-%d', 
    'AFibFlutterCathAblDate': '%Y-%m-%d',
    'ProcedureStartDateTime': '%Y-%m-%d %H:%M:%S',
    'ProcedureEndDateTime': '%Y-%m-%d %H:%M:%S',
    'TEEDateLAAO': '%Y-%m-%d'
}

for col, fmt in date_columns.items():
    if fmt:
        in_hospital[col] = pd.to_datetime(in_hospital[col], format=fmt)
    else:
        in_hospital[col] = pd.to_datetime(in_hospital[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 483, Finished, Available)

Success


In [482]:
#start by appending in_hospital to get natrual key columns

# List of columns to append from 'in_hospital'
columns_to_append = [
    'Pat_ID',
    'NCDRPatientID', 
    'LastName', 
    'FirstName', 
    'MidName', 
    'OtherID', 
    'ArrivalDate', 
    'DCDate', 
    'Sx_F', 
    'PCIOther', 
    'DCStatus', 
    'DCLocation', 
    'DCHospice', 
    'DeathProcedure', 
    'DeathCause', 
    'facility'
]

# Create a new DataFrame with only the specified columns
new_data_to_append = in_hospital[columns_to_append].copy()

# Make sure that all other columns are filled with NaN
additional_columns = set(discharge.columns) - set(new_data_to_append.columns)
for column in additional_columns:
    new_data_to_append[column] = np.nan

# Reorder the columns in the new data to match the 'discharge' DataFrame
new_data_to_append = new_data_to_append[discharge.columns]

# Use concat instead of append
combined_df = pd.concat([discharge, new_data_to_append], ignore_index=True)

print('Success')

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 484, Finished, Available)

Success


In [483]:
# Initialize SparkSession
spark = SparkSession.builder.appName("DischargeIngestion").getOrCreate()

# Define the schema explicitly for laao_discharge
schema = StructType([
    StructField("NCDRPatientID", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("FirstName", StringType(), True),
    StructField("MidName", StringType(), True),
    StructField("OtherID", StringType(), True),
    StructField("ArrivalDate", DateType(), True),
    StructField("DCDate", DateType(), True),
    StructField("Sx_F", StringType(), True),
    StructField("PCIOther", StringType(), True),
    StructField("DCStatus", StringType(), True),
    StructField("DCLocation", StringType(), True),
    StructField("DCHospice", StringType(), True),
    StructField("DeathProcedure", StringType(), True),
    StructField("DeathCause", StringType(), True),
    StructField("facility", IntegerType(), True),
    StructField("Pat_ID", StringType(), True),
])

# Convert the pandas DataFrame to a Spark DataFrame with the defined schema
discharge_spark_df = spark.createDataFrame(combined_df, schema=schema)

# Writing the DataFrame to the Delta table
discharge_spark_df.write.format("delta").mode("overwrite").save("abfss://918a39aa-48f9-4458-8609-67a29814b9c8@onelake.dfs.fabric.microsoft.com/83d6ecf5-3497-4c8f-9baf-192fa2d057ab/Tables/laao_discharge")

print('Success')

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 485, Finished, Available)

Success


## DischargeMeds

In [484]:
# load dischargemeds from v1.4
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_4/Merged/DischargeMeds.parquet"
dischargemeds = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    dischargemeds[col] = dischargemeds[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
dischargemeds['facility'] = pd.to_numeric(dischargemeds['facility'])

# Apply the function to the specified columns
dischargemeds['OtherID'] = dischargemeds['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d'
}

for col, fmt in date_columns.items():
    if fmt:
        dischargemeds[col] = pd.to_datetime(dischargemeds[col], format=fmt)
    else:
        dischargemeds[col] = pd.to_datetime(dischargemeds[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 486, Finished, Available)

Success


In [485]:
# load postprocmedstrategy from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/PostProcMedStrategy_MedID.parquet"
postprocmedstrategy = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    postprocmedstrategy[col] = postprocmedstrategy[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
postprocmedstrategy['facility'] = pd.to_numeric(postprocmedstrategy['facility'])

# Apply the function to the specified columns
postprocmedstrategy['OtherID'] = postprocmedstrategy['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'ArrivalDate': '%Y-%m-%d',
    'DCDate': '%Y-%m-%d'
}

for col, fmt in date_columns.items():
    if fmt:
        postprocmedstrategy[col] = pd.to_datetime(postprocmedstrategy[col], format=fmt)
    else:
        postprocmedstrategy[col] = pd.to_datetime(postprocmedstrategy[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 487, Finished, Available)

Success


In [486]:
# Step 1: Melting the DataFrame to create 'DC_MedID'
melted_df = pd.melt(postprocmedstrategy, 
                    id_vars=['NCDRPatientID', 'LastName', 'FirstName', 'MidName', 'OtherID', 'ArrivalDate', 'DCDate', 'facility', 'Pat_ID'], 
                    value_vars=['Aspirin (81-100 mg)', 'Aspirin (101-324 mg)', 'Aspirin 325 mg', 'DOAC', 'P2Y12', 'Warfarin'], 
                    var_name='DC_MedID', value_name='Value')

# Step 2: Generating 'DC_MedAdmin'
melted_df['DC_MedAdmin'] = np.where(
    (melted_df['Value'].notna()) & (melted_df['Value'].str.contains('Implant', na=False)), 
    'Yes', 
    'No - No Reason'
)
#Step 3: Create DC_MedDose
# Using a lambda function to check if 'Aspirin' is in 'DC_MedID', then clean up the text
melted_df['DC_MedDose'] = melted_df.apply(
    lambda row: re.sub(r'\(|\)', '', row['DC_MedID']) if 'Aspirin' in row['DC_MedID'] and 
    pd.notna(row['Value']) and 'Implant' in str(row['Value']) else np.nan, axis=1
)

# Step for updating DC_MedID for Aspirin entries
melted_df['DC_MedID'] = melted_df.apply(
    lambda x: 'Aspirin' if 'Aspirin' in x['DC_MedID'] else x['DC_MedID'], axis=1
)

# Step to remove rows where DC_MedID is 'Aspirin' and DC_MedDose is null
melted_df = melted_df[~((melted_df['DC_MedID'] == 'Aspirin') & (melted_df['DC_MedDose'].isnull()))]

melted_df.drop('Value', axis = 1, inplace = True)

print('Success')

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 488, Finished, Available)

Success


In [487]:
#start by appending melted_df

# List of columns to append from 'melted_df'
columns_to_append = [
    'Pat_ID',
    'NCDRPatientID', 
    'LastName', 
    'FirstName', 
    'MidName', 
    'OtherID',
    'ArrivalDate', 
    'DCDate', 
    'facility', 
    'DC_MedID', 
    'DC_MedAdmin',
    'DC_MedDose'
]

# Create a new DataFrame with only the specified columns
new_data_to_append = melted_df[columns_to_append].copy()

# Make sure that all other columns are filled with NaN
additional_columns = set(dischargemeds.columns) - set(new_data_to_append.columns)
for column in additional_columns:
    new_data_to_append[column] = np.nan

# Reorder the columns in the new data to match the 'dischargemeds' DataFrame
new_data_to_append = new_data_to_append[dischargemeds.columns]

# Use concat instead of append
combined_df = pd.concat([dischargemeds, new_data_to_append], ignore_index=True)

print('Success')

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 489, Finished, Available)

Success


In [488]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

# Assuming you have a pandas DataFrame named `dischargemeds_df` with the data for ingestion

# Initialize SparkSession
spark = SparkSession.builder.appName("LaaoDischargemedsIngestion").getOrCreate()

# Define the schema explicitly for the laao_dischargemeds table
schema = StructType([
    StructField("NCDRPatientID", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("FirstName", StringType(), True),
    StructField("MidName", StringType(), True),
    StructField("OtherID", StringType(), True),
    StructField("ArrivalDate", DateType(), True),
    StructField("DCDate", DateType(), True),
    StructField("DC_MedID", StringType(), True),
    StructField("DC_MedAdmin", StringType(), True),
    StructField("DC_MedDose", StringType(), True),
    StructField("facility", IntegerType(), True),
    StructField("Pat_ID", StringType(), True)
])

# Convert the pandas DataFrame to a Spark DataFrame with the defined schema
laao_dischargemeds_spark_df = spark.createDataFrame(combined_df, schema=schema)

# Write the DataFrame to the Delta table
laao_dischargemeds_spark_df.write.format("delta").mode("overwrite").save("abfss://918a39aa-48f9-4458-8609-67a29814b9c8@onelake.dfs.fabric.microsoft.com/83d6ecf5-3497-4c8f-9baf-192fa2d057ab/Tables/laao_dischargemeds")

print('Success')

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 490, Finished, Available)

Success


## FollowUp

In [489]:
# load followup from v1.4
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_4/Merged/FollowUp.parquet"
followup = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    followup[col] = followup[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
followup['facility'] = pd.to_numeric(followup['facility'])

# Apply the function to the specified columns
followup['OtherID'] = followup['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'FURefArrivalDate': '%Y-%m-%d',
    'FU_RefDischargeDate': '%Y-%m-%d',
    'RefProcStartDateTime':'%Y-%m-%d %H:%M:%S',
    'F_AssessmentDate':'%Y-%m-%d',
    'F_DeathDate':'%Y-%m-%d',
    'TTEDate_F':'%Y-%m-%d',
    'F_TEEDate':'%Y-%m-%d',
    'F_CardiacCTDate':'%Y-%m-%d',
    'F_CardiacMRIDate':'%Y-%m-%d',
    'F_ICEDate':'%Y-%m-%d'
}

for col, fmt in date_columns.items():
    if fmt:
        followup[col] = pd.to_datetime(followup[col], format=fmt)
    else:
        followup[col] = pd.to_datetime(followup[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 491, Finished, Available)

Success


In [490]:
# load follow_up from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/Follow-up.parquet"
follow_up = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    follow_up[col] = follow_up[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
follow_up['facility'] = pd.to_numeric(follow_up['facility'])

# Apply the function to the specified columns
follow_up['OtherID'] = follow_up['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'FURefArrivalDate': '%Y-%m-%d',
    'FU_RefDischargeDate': '%Y-%m-%d',
    'RefProcStartDateTime':'%Y-%m-%d %H:%M:%S',
    'F_AssessmentDate':'%Y-%m-%d',
    'F_DeathDate':'%Y-%m-%d',
    'TTEDate_F':'%Y-%m-%d',
    'F_TEEDate':'%Y-%m-%d',
    'F_CardiacCTDate':'%Y-%m-%d',
    'F_CardiacMRIDate':'%Y-%m-%d',
    'F_ICEDate':'%Y-%m-%d'
}

for col, fmt in date_columns.items():
    if fmt:
        follow_up[col] = pd.to_datetime(follow_up[col], format=fmt)
    else:
        follow_up[col] = pd.to_datetime(follow_up[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 492, Finished, Available)

Success


In [491]:
#start by appending follow_up

# List of columns to append from 'follow_up'
columns_to_append = [
    'Pat_ID',
    'NCDRPatientID', 
    'LastName', 
    'FirstName', 
    'MidName', 
    'OtherID', 
    'FURefArrivalDate', 
    'FU_RefDischargeDate', 
    'RefProcStartDateTime', 
    'F_AssessmentDate', 
    'FUInterv', 
    'F_Method - Office Visit', 
    'F_Method - Medical Records', 
    'F_Method - Letter from Medical Provider', 
    'F_Method - Phone Call', 
    'F_Method - Social Security Death Master File', 
    'F_Method - Hospitalized', 
    'F_Method - Other', 
    'F_Status', 
    'F_DeathDate', 
    'F_DeathCause', 
    'FU_LVEF', 
    'F_LVEF', 
    'TTEPerfFU', 
    'TTEDate_F', 
    'F_TEEPerf', 
    'F_TEEDate', 
    'F_CardiacCTPerf', 
    'F_CardiacCTDate', 
    'F_CardiacMRIPerf', 
    'F_CardiacMRIDate', 
    'F_ICEPerformed', 
    'F_ICEDate', 
    'F_AtrialThromDetect', 
    'ResidualLeakFU', 
    'ResidualLeakNAFU', 
    'Creat_FU', 
    'F_CreatND', 
    'LowHgbValue_F', 
    'HGBND_FU', 
    'F_RankinScore', 
    'F_mRS_NA', 
    'F_BIEPerf', 
    'F_BIEFeeding', 
    'F_BIEBathing', 
    'F_BIEGrooming', 
    'F_BIEDressing', 
    'F_BIEBowels', 
    'F_BIEBladder', 
    'F_BIEToilet', 
    'F_BIETransfers', 
    'F_BIEMobility', 
    'F_BIEStairs', 
    'facility'

]

# Create a new DataFrame with only the specified columns
new_data_to_append = follow_up[columns_to_append].copy()

# Make sure that all other columns are filled with NaN
additional_columns = set(followup.columns) - set(new_data_to_append.columns)
for column in additional_columns:
    new_data_to_append[column] = np.nan

# Reorder the columns in the new data to match the 'followup' DataFrame
new_data_to_append = new_data_to_append[followup.columns]

# Use concat instead of append
combined_df = pd.concat([followup, new_data_to_append], ignore_index=True)


StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 493, Finished, Available)

In [492]:
# Rename columns to be compatible with Delta table
columns_mapping = {
    'F_Method - Office Visit': 'F_Method_Office_Visit',
    'F_Method - Medical Records': 'F_Method_Medical_Records',
    'F_Method - Letter from Medical Provider': 'F_Method_Letter_From_Medical_Provider',
    'F_Method - Phone Call': 'F_Method_Phone_Call',
    'F_Method - Social Security Death Master File': 'F_Method_Social_Security_Death_Master_File',
    'F_Method - Hospitalized': 'F_Method_Hospitalized',
    'F_Method - Other': 'F_Method_Other'
}

# Rename the columns
combined_df_renamed = combined_df.rename(columns=columns_mapping)

print('Success')

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 494, Finished, Available)

Success


In [493]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, TimestampType, FloatType

# Initialize SparkSession
spark = SparkSession.builder.appName("IngestionToDeltaTable").getOrCreate()

# Define the schema explicitly based on the provided columns
schema = StructType([
    StructField("NCDRPatientID", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("FirstName", StringType(), True),
    StructField("MidName", StringType(), True),
    StructField("OtherID", StringType(), True),
    StructField("FURefArrivalDate", DateType(), True),
    StructField("FU_RefDischargeDate", DateType(), True),
    StructField("RefProcStartDateTime", TimestampType(), True),
    StructField("F_AssessmentDate", DateType(), True),
    StructField("FUInterv", StringType(), True),
    StructField("F_Method_Office_Visit", StringType(), True),
    StructField("F_Method_Medical_Records", StringType(), True),
    StructField("F_Method_Letter_From_Medical_Provider", StringType(), True),
    StructField("F_Method_Phone_Call", StringType(), True),
    StructField("F_Method_Social_Security_Death_Master_File", StringType(), True),
    StructField("F_Method_Hospitalized", StringType(), True),
    StructField("F_Method_Other", StringType(), True),
    StructField("F_Status", StringType(), True),
    StructField("F_DeathDate", DateType(), True),
    StructField("F_DeathCause", StringType(), True),
    StructField("FU_LVEF", StringType(), True),
    StructField("F_LVEF", FloatType(), True),
    StructField("TTEPerfFU", StringType(), True),
    StructField("TTEDate_F", DateType(), True),
    StructField("F_TEEPerf", StringType(), True),
    StructField("F_TEEDate", DateType(), True),
    StructField("F_CardiacCTPerf", StringType(), True),
    StructField("F_CardiacCTDate", DateType(), True),
    StructField("F_CardiacMRIPerf", StringType(), True),
    StructField("F_CardiacMRIDate", DateType(), True),
    StructField("F_ICEPerformed", StringType(), True),
    StructField("F_ICEDate", DateType(), True),
    StructField("F_AtrialThromDetect", StringType(), True),
    StructField("ResidualLeakFU", FloatType(), True),
    StructField("ResidualLeakNAFU", StringType(), True),
    StructField("Creat_FU", FloatType(), True),
    StructField("F_CreatND", StringType(), True),
    StructField("LowHgbValue_F", FloatType(), True),
    StructField("HGBND_FU", StringType(), True),
    StructField("F_RankinScore", StringType(), True),
    StructField("F_mRS_NA", StringType(), True),
    StructField("F_BIEPerf", StringType(), True),
    StructField("F_BIEFeeding", StringType(), True),
    StructField("F_BIEBathing", StringType(), True),
    StructField("F_BIEGrooming", StringType(), True),
    StructField("F_BIEDressing", StringType(), True),
    StructField("F_BIEBowels", StringType(), True),
    StructField("F_BIEBladder", StringType(), True),
    StructField("F_BIEToilet", StringType(), True),
    StructField("F_BIETransfers", StringType(), True),
    StructField("F_BIEMobility", StringType(), True),
    StructField("F_BIEStairs", StringType(), True),
    StructField("facility", IntegerType(), True),
    StructField("Pat_ID", StringType(), True),
])

# Convert the pandas DataFrame to a Spark DataFrame with the defined schema
laao_followup_df = spark.createDataFrame(combined_df_renamed, schema=schema)

# Write the DataFrame to the Delta table
laao_followup_df.write.format("delta").mode("overwrite").save("abfss://918a39aa-48f9-4458-8609-67a29814b9c8@onelake.dfs.fabric.microsoft.com/83d6ecf5-3497-4c8f-9baf-192fa2d057ab/Tables/laao_followup")

print('Success')

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 495, Finished, Available)

Success


## FMEDS

In [494]:
# load fmeds from v1.4
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_4/Merged/FMEDS.parquet"
fmeds = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    fmeds[col] = fmeds[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
fmeds['facility'] = pd.to_numeric(fmeds['facility'])

# Apply the function to the specified columns
fmeds['OtherID'] = fmeds['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'FURefArrivalDate': '%Y-%m-%d',
    'FU_RefDischargeDate': '%Y-%m-%d',
    'RefProcStartDateTime':'%Y-%m-%d %H:%M:%S',
    'F_AssessmentDate':'%Y-%m-%d',
    'F_WarfarinDiscontinuedDate':'%Y-%m-%d',
    'F_WarfarinResumedDate':'%Y-%m-%d',
    'F_DOACTherapyDiscontinuedDate':'%Y-%m-%d',
    'F_DOACTherapyResumedDate':'%Y-%m-%d',
    'F_AspirinTherapyDiscontinuedDate':'%Y-%m-%d',
    'F_AspirinTherapyResumedDate':'%Y-%m-%d',
    'F_P2Y12TherapyDiscontinuedDate':'%Y-%m-%d',
    'F_P2Y12TherapyResumedDate':'%Y-%m-%d'
}

for col, fmt in date_columns.items():
    if fmt:
        fmeds[col] = pd.to_datetime(fmeds[col], format=fmt)
    else:
        fmeds[col] = pd.to_datetime(fmeds[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 496, Finished, Available)

Success


In [495]:
# load follow_up from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/Follow-up.parquet"
follow_up = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    follow_up[col] = follow_up[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
follow_up['facility'] = pd.to_numeric(follow_up['facility'])

# Apply the function to the specified columns
follow_up['OtherID'] = follow_up['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'FURefArrivalDate': '%Y-%m-%d',
    'FU_RefDischargeDate': '%Y-%m-%d',
    'RefProcStartDateTime':'%Y-%m-%d %H:%M:%S',
    'F_AssessmentDate':'%Y-%m-%d',
    'F_DeathDate':'%Y-%m-%d',
    'TTEDate_F':'%Y-%m-%d',
    'F_TEEDate':'%Y-%m-%d',
    'F_CardiacCTDate':'%Y-%m-%d',
    'F_CardiacMRIDate':'%Y-%m-%d',
    'F_ICEDate':'%Y-%m-%d',
    'F_WarfarinDiscontinuedDate':'%Y-%m-%d',
    'F_WarfarinResumedDate':'%Y-%m-%d',
    'F_DOACTherapyDiscontinuedDate':'%Y-%m-%d',
    'F_DOACTherapyResumedDate':'%Y-%m-%d',
    'F_AspirinTherapyDiscontinuedDate':'%Y-%m-%d',
    'F_AspirinTherapyResumedDate':'%Y-%m-%d',
    'F_P2Y12TherapyDiscontinuedDate':'%Y-%m-%d',
    'F_P2Y12TherapyResumedDate':'%Y-%m-%d'
}

for col, fmt in date_columns.items():
    if fmt:
        follow_up[col] = pd.to_datetime(follow_up[col], format=fmt)
    else:
        follow_up[col] = pd.to_datetime(follow_up[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 497, Finished, Available)

Success


In [496]:
#start by appending follow_up

# List of columns to append from 'follow_up'
columns_to_append = [
    'Pat_ID',
    'NCDRPatientID', 
    'LastName', 
    'FirstName', 
    'MidName', 
    'OtherID', 
    'FURefArrivalDate', 
    'FU_RefDischargeDate', 
    'RefProcStartDateTime', 
    'F_AssessmentDate', 
    'F_WarfarinDiscontinued', 
    'F_WarfarinDiscontinuedDate', 
    'F_WarfarinResumed', 
    'F_WarfarinResumedDate', 
    'F_DOACTherapyDiscontinued', 
    'F_DOACTherapyDiscontinuedDate', 
    'F_DOACTherapyResumed', 
    'F_DOACTherapyResumedDate', 
    'F_AspirinTherapyDiscontinued', 
    'F_AspirinTherapyDiscontinuedDate', 
    'F_AspirinTherapyResumed', 
    'F_AspirinTherapyResumedDate', 
    'F_P2Y12TherapyDiscontinued', 
    'F_P2Y12TherapyDiscontinuedDate', 
    'F_P2Y12TherapyResumed', 
    'F_P2Y12TherapyResumedDate', 
    'facility' 
]

# Create a new DataFrame with only the specified columns
new_data_to_append = follow_up[columns_to_append].copy()

# Make sure that all other columns are filled with NaN
additional_columns = set(fmeds.columns) - set(new_data_to_append.columns)
for column in additional_columns:
    new_data_to_append[column] = np.nan

# Reorder the columns in the new data to match the 'fmeds' DataFrame
new_data_to_append = new_data_to_append[fmeds.columns]

# Use concat instead of append
combined_df = pd.concat([fmeds, new_data_to_append], ignore_index=True)

print('Success')

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 498, Finished, Available)

Success


In [497]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, TimestampType

# Initialize SparkSession
spark = SparkSession.builder.appName("LaaoFmedsIngestion").getOrCreate()

# Define the schema explicitly
schema = StructType([
    StructField("NCDRPatientID", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("FirstName", StringType(), True),
    StructField("MidName", StringType(), True),
    StructField("OtherID", StringType(), True),
    StructField("FURefArrivalDate", DateType(), True),
    StructField("FU_RefDischargeDate", DateType(), True),
    StructField("RefProcStartDateTime", TimestampType(), True),
    StructField("F_AssessmentDate", DateType(), True),
    StructField("F_WarfarinDiscontinued", StringType(), True),
    StructField("F_WarfarinDiscontinuedDate", DateType(), True),
    StructField("F_WarfarinResumed", StringType(), True),
    StructField("F_WarfarinResumedDate", DateType(), True),
    StructField("F_DOACTherapyDiscontinued", StringType(), True),
    StructField("F_DOACTherapyDiscontinuedDate", DateType(), True),
    StructField("F_DOACTherapyResumed", StringType(), True),
    StructField("F_DOACTherapyResumedDate", DateType(), True),
    StructField("F_AspirinTherapyDiscontinued", StringType(), True),
    StructField("F_AspirinTherapyDiscontinuedDate", DateType(), True),
    StructField("F_AspirinTherapyResumed", StringType(), True),
    StructField("F_AspirinTherapyResumedDate", DateType(), True),
    StructField("F_P2Y12TherapyDiscontinued", StringType(), True),
    StructField("F_P2Y12TherapyDiscontinuedDate", DateType(), True),
    StructField("F_P2Y12TherapyResumed", StringType(), True),
    StructField("F_P2Y12TherapyResumedDate", DateType(), True),
    StructField("facility", IntegerType(), True),
    StructField("Pat_ID", StringType(), True)
])

# Convert the pandas DataFrame to a Spark DataFrame with the defined schema
laao_fmeds_df = spark.createDataFrame(combined_df, schema=schema)

# Write the DataFrame to the Delta table
laao_fmeds_df.write.format("delta").mode("overwrite").save("abfss://918a39aa-48f9-4458-8609-67a29814b9c8@onelake.dfs.fabric.microsoft.com/83d6ecf5-3497-4c8f-9baf-192fa2d057ab/Tables/laao_fmeds")

print('Success')

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 499, Finished, Available)

Success


## FUMEDS

In [498]:
# load FUMEDS from v1.4
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_4/Merged/FUMEDS.parquet"
fumeds = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    fumeds[col] = fumeds[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
fumeds['facility'] = pd.to_numeric(fumeds['facility'])

# Apply the function to the specified columns
fumeds['OtherID'] = fumeds['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'FURefArrivalDate': '%Y-%m-%d',
    'FU_RefDischargeDate': '%Y-%m-%d',
    'RefProcStartDateTime':'%Y-%m-%d %H:%M:%S',
    'F_AssessmentDate':'%Y-%m-%d',
}

for col, fmt in date_columns.items():
    if fmt:
        fumeds[col] = pd.to_datetime(fumeds[col], format=fmt)
    else:
        fumeds[col] = pd.to_datetime(fumeds[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 500, Finished, Available)

Success


In [499]:
# load f_medid from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/F_MedID.parquet"
f_medid = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    f_medid[col] = f_medid[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
f_medid['facility'] = pd.to_numeric(f_medid['facility'])

# Apply the function to the specified columns
f_medid['OtherID'] = f_medid['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'FURefArrivalDate': '%Y-%m-%d',
    'FU_RefDischargeDate': '%Y-%m-%d',
    'RefProcStartDateTime':'%Y-%m-%d %H:%M:%S',
    'F_AssessmentDate':'%Y-%m-%d',
    'ProcedureEndDateTime': '%Y-%m-%d %H:%M:%S'
}

for col, fmt in date_columns.items():
    if fmt:
        f_medid[col] = pd.to_datetime(f_medid[col], format=fmt)
    else:
        f_medid[col] = pd.to_datetime(f_medid[col])

#rename to align with v1.4 convention
f_medid = f_medid.rename(columns={
    'Low Molecular Wt Heparin': 'Low Molecular Weight Heparin',
    'Other P2Y12 Inhibitor': 'Other P2Y12'
})

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 501, Finished, Available)

Success


In [500]:
# Step 1: Melting the DataFrame to create 'F_MedID' & 'F_MedAdmin2'
melted_df = pd.melt(f_medid, 
                    id_vars=['NCDRPatientID', 'LastName', 'FirstName', 'MidName', 'OtherID',
                    'FURefArrivalDate', 'FU_RefDischargeDate', 'F_AssessmentDate', 'facility', 
                    'RefProcStartDateTime', 'ProcedureEndDateTime', 'Dose', 'Pat_ID'], 
                    value_vars=['Aggrenox', 'Aspirin (81 mg)', 'Aspirin (325 mg)', 'Aspirin', 'Dose',
                    'Aspirin/Dipyridamole', 'Durlaza', 'Vorapaxar', 'Cangrelor',
                    'Clopidogrel', 'Prasugrel', 'Ticlopidine', 'Ticagrelor',
                    'Other P2Y12', 'Warfarin', 'Apixaban', 'Dabigatran',
                    'Rivaroxaban', 'Edoxaban', 'Unfractionated Heparin', 'Fondaparinux',
                    'Low Molecular Weight Heparin', 'Heparin Derivative'], 
                    var_name='F_MedID', value_name='F_MedAdmin2')

#Step 2: Create F_MedDose2
# Using a lambda function to check if 'Aspirin' is in 'F_MedID', then clean up the text
melted_df['F_MedDose2'] = melted_df.apply(
    lambda row: re.sub(r'\(|\)', '', row['F_MedID']) 
    if 'Aspirin' in row['F_MedID'] 
    and pd.notna(row['F_MedAdmin2']) 
    and 'Yes' in str(row['F_MedAdmin2']) 
    and '(' in row['F_MedID'] and ')' in row['F_MedID']  # Check for both parentheses
    else np.nan, 
    axis=1
)

# Step for updating F_MedID for Aspirin entries
melted_df['F_MedID'] = melted_df.apply(
    lambda x: 'Aspirin' if 'Aspirin (' in x['F_MedID'] else x['F_MedID'], axis=1
)

# Replace "blank" values with NaN
melted_df.replace('', np.nan, inplace=True)  # Replace empty strings with np.nan
melted_df.replace(' ', np.nan, inplace=True)  # Replace spaces with np.nan if necessary

# Drop rows where 'F_MedAdmin2' is now NaN after replacements
melted_df = melted_df.dropna(subset=['F_MedAdmin2'])


print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 502, Finished, Available)

Success


In [501]:
#start by appending melted_df

# List of columns to append from 'melted_df'
columns_to_append = [
    'Pat_ID',
    'NCDRPatientID', 
    'LastName', 
    'FirstName', 
    'MidName', 
    'OtherID', 
    'FURefArrivalDate', 
    'FU_RefDischargeDate', 
    'RefProcStartDateTime', 
    'F_AssessmentDate', 
    'F_MedID', 
    'F_MedAdmin2', 
    'F_MedDose2', 
    'facility'
]

# Create a new DataFrame with only the specified columns
new_data_to_append = melted_df[columns_to_append].copy()

# Make sure that all other columns are filled with NaN
additional_columns = set(fumeds.columns) - set(new_data_to_append.columns)
for column in additional_columns:
    new_data_to_append[column] = np.nan

# Reorder the columns in the new data to match the 'fumeds' DataFrame
new_data_to_append = new_data_to_append[fumeds.columns]

# Use concat instead of append
combined_df = pd.concat([fumeds, new_data_to_append], ignore_index=True)

print('Success')

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 503, Finished, Available)

Success


In [502]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, TimestampType

# Initialize SparkSession
spark = SparkSession.builder.appName("LaaoFumedsIngestion").getOrCreate()

# Define the schema explicitly for the laao_fumeds table
schema = StructType([
    StructField("NCDRPatientID", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("FirstName", StringType(), True),
    StructField("MidName", StringType(), True),
    StructField("OtherID", StringType(), True),
    StructField("FURefArrivalDate", DateType(), True),
    StructField("FU_RefDischargeDate", DateType(), True),
    StructField("RefProcStartDateTime", TimestampType(), True),
    StructField("F_AssessmentDate", DateType(), True),
    StructField("F_MedID", StringType(), True),
    StructField("F_MedAdmin2", StringType(), True),
    StructField("F_MedDose2", StringType(), True),
    StructField("facility", IntegerType(), True),
    StructField("Pat_ID", StringType(), True),
])

# Convert the pandas DataFrame to a Spark DataFrame with the defined schema
spark_df = spark.createDataFrame(combined_df, schema=schema)

spark_df.write.format("delta").mode("overwrite").save("abfss://918a39aa-48f9-4458-8609-67a29814b9c8@onelake.dfs.fabric.microsoft.com/83d6ecf5-3497-4c8f-9baf-192fa2d057ab/Tables/laao_fumeds")

print('Success')

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 504, Finished, Available)

Success


## FUEVENTS

In [503]:
# load fuevents from v1.4
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_4/Merged/FUEVENTS.parquet"
fuevents = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    fuevents[col] = fuevents[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
fuevents['facility'] = pd.to_numeric(fuevents['facility'])

# Apply the function to the specified columns
fuevents['OtherID'] = fuevents['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'FURefArrivalDate': '%Y-%m-%d',
    'FU_RefDischargeDate': '%Y-%m-%d',
    'RefProcStartDateTime':'%Y-%m-%d %H:%M:%S',
    'F_AssessmentDate':'%Y-%m-%d',
    'FupEventDate': '%Y-%m-%d'
}

for col, fmt in date_columns.items():
    if fmt:
        fuevents[col] = pd.to_datetime(fuevents[col], format=fmt)
    else:
        fuevents[col] = pd.to_datetime(fuevents[col])

print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 505, Finished, Available)

Success


In [504]:
# load f_eventid from v1.3
# Load data into pandas DataFrame from Parquet
file_path = "/lakehouse/default/Files/LAAO/Version_1_3/Merged/F_EventID.parquet"
f_eventid = pd.read_parquet(file_path)

# Ensure specified columns are treated as strings
for col in ['OtherID', 'NCDRPatientID']:
    f_eventid[col] = f_eventid[col].astype(str)

# Function to remove '.0' from string representations of numbers
def remove_decimal_point(value):
    if isinstance(value, str) and value.endswith('.0'):
        return value[:-2]  # Remove the last two characters '.0'
    return value

# Convert 'facility' to int
f_eventid['facility'] = pd.to_numeric(f_eventid['facility'])

# Apply the function to the specified columns
f_eventid['OtherID'] = f_eventid['OtherID'].apply(remove_decimal_point)

# Convert date columns to datetime format
date_columns = {
    'FURefArrivalDate': '%Y-%m-%d',
    'FU_RefDischargeDate': '%Y-%m-%d',
    'RefProcStartDateTime':'%Y-%m-%d %H:%M:%S',
    'F_AssessmentDate':'%Y-%m-%d',
    'FupEventDate': '%Y-%m-%d'
}

for col, fmt in date_columns.items():
    if fmt:
        f_eventid[col] = pd.to_datetime(f_eventid[col], format=fmt)
    else:
        f_eventid[col] = pd.to_datetime(f_eventid[col])


print("Success")

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 506, Finished, Available)

Success


In [505]:
#start by appending f_eventid

# List of columns to append from 'f_eventid'
columns_to_append = [
    'Pat_ID',
    'NCDRPatientID', 
    'LastName', 
    'FirstName', 
    'MidName', 
    'OtherID', 
    'FURefArrivalDate', 
    'FU_RefDischargeDate', 
    'RefProcStartDateTime', 
    'F_AssessmentDate', 
    'F_Event', 
    'FupEvOccurred', 
    'FupEventDate', 
    'facility'
]

# Create a new DataFrame with only the specified columns
new_data_to_append = f_eventid[columns_to_append].copy()

# Make sure that all other columns are filled with NaN
additional_columns = set(fuevents.columns) - set(new_data_to_append.columns)
for column in additional_columns:
    new_data_to_append[column] = np.nan

# Reorder the columns in the new data to match the 'fuevents' DataFrame
new_data_to_append = new_data_to_append[fuevents.columns]

# Use concat instead of append
combined_df = pd.concat([fuevents, new_data_to_append], ignore_index=True)

print('Success')

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 507, Finished, Available)

Success


In [506]:
# Initialize SparkSession
spark = SparkSession.builder.appName("IngestionIntoDeltaTable").getOrCreate()

# Define the schema explicitly for laao_fuevents
schema = StructType([
    StructField("NCDRPatientID", StringType(), True),
    StructField("LastName", StringType(), True),
    StructField("FirstName", StringType(), True),
    StructField("MidName", StringType(), True),
    StructField("OtherID", StringType(), True),
    StructField("FURefArrivalDate", DateType(), True),
    StructField("FU_RefDischargeDate", DateType(), True),
    StructField("RefProcStartDateTime", TimestampType(), True),
    StructField("F_AssessmentDate", DateType(), True),
    StructField("F_Event", StringType(), True),
    StructField("FupEvOccurred", StringType(), True),
    StructField("FupEventDate", DateType(), True),
    StructField("facility", IntegerType(), True),
    StructField("Pat_ID", StringType(), True),
])

# Convert the pandas DataFrame to a Spark DataFrame with the defined schema
spark_df = spark.createDataFrame(combined_df, schema=schema)

# Write the DataFrame to the Delta table
spark_df.write.format("delta").mode("overwrite").save("abfss://918a39aa-48f9-4458-8609-67a29814b9c8@onelake.dfs.fabric.microsoft.com/83d6ecf5-3497-4c8f-9baf-192fa2d057ab/Tables/laao_fuevents")

print('Success')

StatementMeta(, 981f704b-223d-4977-ab99-9eb6122ab346, 508, Finished, Available)

Success
