<a href="https://colab.research.google.com/github/victormurcia/VCHAMPS/blob/main/VCHAMPS_Final_Scripts.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Section 1. Libraries needed

In [None]:
#General utilities
import glob,shutil,os,warnings,math,time,sys,re,math
from typing import List

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm  # Import tqdm for the progress bar
import dask.dataframe as dd

#For Slider viz
import ipywidgets as widgets
from IPython.display import display, clear_output,HTML

#DataPrep for Quick EDA
#!pip install --q dataprep
#from dataprep.eda import create_report

#Enable data to be extracted and downloaded from my Google Drive
from google.colab import drive, files
drive.mount('/content/drive')

#For performing UTC normalization on datetime columns based on the STATE column
import pytz

#For UUID generation
import uuid

#For ML stuff
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_curve, roc_auc_score, confusion_matrix, accuracy_score, classification_report,auc,f1_score,recall_score,precision_score,make_scorer
from sklearn.model_selection import train_test_split, GridSearchCV,train_test_split
from xgboost import XGBClassifier
#!pip install eli5
#import eli5
#import shap

# Section 2. Load CSVs

In [None]:
def load_csvs(path2data: str) -> List[str]:
  """
  Return the paths of the CSV files found directly inside ``path2data``.

  ``glob.glob`` yields entries in arbitrary, filesystem-dependent order, so the
  result is sorted. This keeps the order reproducible for any list built from
  it positionally.

  Args:
      path2data (str): The directory path containing the CSV files.

  Returns:
      List[str]: Sorted CSV file paths (non-recursive).
  """
  # os.path.join uses the host OS separator instead of a hard-coded '/'.
  pattern = os.path.join(path2data, '*.csv')
  return sorted(glob.glob(pattern))

def make_df_list(csv_files: List[str]) -> List[pd.DataFrame]:
  """
  Read each CSV path into its own pandas DataFrame.

  Args:
      csv_files (List[str]): Paths of the CSV files to read.

  Returns:
      List[pd.DataFrame]: One DataFrame per path, in the same order as
      ``csv_files``.
  """
  # pd.read_csv with default options; every file is loaded eagerly.
  return [pd.read_csv(path) for path in csv_files]

def clean_filenames(csv_files: List[str]) -> List[str]:
  """
  Reduce each file path to its bare name, without directory or extension.

  Args:
      csv_files (List[str]): File paths (e.g. the output of ``load_csvs``).

  Returns:
      List[str]: File stems, in the same order as ``csv_files``.
  """
  # basename() drops the directory part; splitext()[0] drops the last extension.
  return [os.path.splitext(os.path.basename(path))[0] for path in csv_files]

# Specify the path to the desired directory
# NOTE(review): this is a local Windows path, whereas Section 1 mounts Google
# Drive for Colab; presumably this cell is only run on the local machine.
directory_path = r'D:\VCHAMPS Data\Quality Check'

# Change the current working directory to the desired directory
os.chdir(directory_path)

# Verify the current working directory
current_directory = os.getcwd()
print(f"Current working directory: {current_directory}")
# Define data location (reuse directory_path so the two paths cannot drift apart)
path2data = directory_path
# Collect the paths of the .csv files in the data directory
csv_files = load_csvs(path2data)
print(csv_files)
# Read every CSV into a pandas DataFrame
df_list_csv = make_df_list(csv_files)
# Clean the names of .csv files (same order as csv_files / df_list_csv)
csv_file_names = clean_filenames(csv_files)

# Section 3. Conversion of CSVs to Parquet

In [None]:
def _parquet_output_size(path):
    """Return the on-disk size in bytes of the Parquet output at ``path``."""
    # The Dask writer produces a directory of ``*.parquet`` part files; a plain
    # single file is accepted as well.
    if os.path.isdir(path):
        return sum(
            os.path.getsize(os.path.join(path, name))
            for name in os.listdir(path)
            if name.endswith('.parquet')
        )
    return os.path.getsize(path)


def convert_csv_to_parquet(current_directory):
    """
    Convert every ``*.csv`` file in ``current_directory`` to Parquet with Dask.

    Each ``<name>.csv`` is written next to itself as ``<name>.parquet``. The
    size of the CSV and of its Parquet output is measured for each file
    converted in this call.

    Args:
        current_directory (str): Directory scanned (non-recursively) for CSVs.

    Returns:
        pd.DataFrame: Columns ``'Filename'`` (CSV name without extension),
        ``'CSV Size'`` and ``'Parquet Size'`` (both in bytes), one row per CSV.
    """
    # Columns forced to 'object' when reading; presumably these are columns
    # whose dtype Dask would otherwise guess inconsistently.
    dtype_overrides = {
        'Administered elsewhere': 'object',
        'Dose unit': 'object',
        'Result textual': 'object',
        'Administration end date': 'object',
        'Agentorangeflag': 'object',
        'Combatflag': 'object',
        'Ionizingradiationflag': 'object',
        'Swasiaconditionsflag': 'object',
        'Procedure code': 'object'
    }

    # Sorted so the processing order (and the report's row order) is reproducible.
    csv_files = sorted(file for file in os.listdir(current_directory) if file.endswith('.csv'))

    records = []
    for csv_file in csv_files:
        stem = os.path.splitext(csv_file)[0]
        input_file_path = os.path.join(current_directory, csv_file)
        output_file_path = os.path.join(current_directory, f"{stem}.parquet")

        csv_file_size = os.path.getsize(input_file_path)

        df = dd.read_csv(input_file_path, dtype=dtype_overrides)
        df.to_parquet(output_file_path, engine='pyarrow', write_index=False)

        # Measure this file's own output right away, so sizes can never be
        # attached to the wrong row (or fail on stale .parquet directories).
        records.append({
            'Filename': stem,
            'CSV Size': csv_file_size,
            'Parquet Size': _parquet_output_size(output_file_path),
        })

    print("CSV to Parquet conversion completed.")

    return pd.DataFrame(records, columns=['Filename', 'CSV Size', 'Parquet Size'])

def calculate_reduction(file_sizes):
    """
    Summarize the storage savings of Parquet over CSV per file.

    Args:
        file_sizes (pd.DataFrame): Must contain ``'Filename'``, ``'CSV Size'``
            and ``'Parquet Size'`` (bytes), such as the output of
            ``convert_csv_to_parquet``. It is not modified.

    Returns:
        pd.DataFrame: Columns ``'Filename'``, ``'CSV Size [MB]'``,
        ``'Parquet Size [MB]'`` and ``'%Reduction'``. ``'%Reduction'`` is NaN
        when the CSV size is 0 or either size is missing.
    """
    bytes_per_mb = 1024 * 1024
    # Sizes may arrive as object dtype (e.g. None placeholders); make them numeric.
    csv_bytes = pd.to_numeric(file_sizes['CSV Size'], errors='coerce')
    parquet_bytes = pd.to_numeric(file_sizes['Parquet Size'], errors='coerce')
    # Mask zero-byte CSVs so the ratio becomes NaN rather than +/-inf.
    safe_csv_bytes = csv_bytes.where(csv_bytes > 0)

    # Build a new frame instead of adding columns to the caller's DataFrame.
    return pd.DataFrame({
        'Filename': file_sizes['Filename'],
        'CSV Size [MB]': csv_bytes / bytes_per_mb,
        'Parquet Size [MB]': parquet_bytes / bytes_per_mb,
        '%Reduction': ((csv_bytes - parquet_bytes) / safe_csv_bytes) * 100,
    })

# Convert every CSV in the working directory, then summarize the size savings.
file_sizes = calculate_reduction(convert_csv_to_parquet(current_directory))

# Section 4. ICD Mapping Functions and Basic Data Cleanup

In [None]:
def read_icd_codes(path):
    """
    Load the ICD code lookup table from a CSV file.

    Args:
        path (str): Path to the ICD codes CSV.

    Returns:
        pd.DataFrame: The table without the ``'Unnamed: 0'`` column. pandas
        creates that column when a frame was saved with its index; it is
        dropped only if present.
    """
    # errors='ignore' keeps this working for files saved without an index column.
    return pd.read_csv(path).drop(columns='Unnamed: 0', errors='ignore')

def split_diagnoses(diagnoses):
    """
    Split a semicolon-separated diagnosis string into normalized entries.

    Each entry is lower-cased, stripped of commas and surrounding whitespace,
    so it can be compared against the lookup table's ``'description'`` values.

    Args:
        diagnoses (str): e.g. ``"Cholera, unspecified; Typhoid fever"``.

    Returns:
        list[str]: The normalized diagnoses. A missing (non-string) value
        such as NaN or None yields an empty list instead of raising
        ``AttributeError``.
    """
    if not isinstance(diagnoses, str):
        return []
    return [dx.lower().replace(",", "").strip() for dx in diagnoses.split(';')]

def process_diagnoses_dataframes(dataframes,icd_codes):
    """
    Apply ICD-code mapping to each frame in ``dataframes``.

    For each frame:
      - every column whose name contains ``'icd10'`` (case-insensitive) is
        parsed with ``split_diagnoses``;
      - the parsed lists are concatenated into one list-valued
        ``'diagnosis'`` column;
      - the frame is exploded to one row per diagnosis and left-merged with
        ``icd_codes`` on ``diagnosis == description``;
      - the icd10 columns and ``'description'`` are dropped.

    Args:
        dataframes (list): Dask (or pandas) DataFrames; they are not modified.
        icd_codes (pd.DataFrame): Lookup table with a ``'description'`` column.

    Returns:
        list: The processed frames, in input order.

    Raises:
        ValueError: If a frame has no column containing ``'icd10'``.
    """
    final_df_list = []
    for idx, df in enumerate(dataframes):
        diagnosis_columns = [col for col in df.columns if 'icd10' in col.lower()]
        print(idx, diagnosis_columns)
        if not diagnosis_columns:
            raise ValueError(f"DataFrame {idx} has no 'icd10' diagnosis columns")

        # Element-wise list concatenation across all diagnosis columns.
        combined = None
        for col in diagnosis_columns:
            parsed = df[col].apply(split_diagnoses)
            combined = parsed if combined is None else combined + parsed
        # assign() returns a new frame rather than adding a column to the caller's.
        df = df.assign(diagnosis=combined)

        # One row per individual diagnosis, then attach the ICD lookup columns.
        df = df.explode('diagnosis')
        df = df.merge(icd_codes, left_on='diagnosis', right_on='description', how='left')
        df = df.drop(columns=diagnosis_columns + ['description'])
        final_df_list.append(df)
    return final_df_list

def process_diagnoses_dataframe(df, icd_codes):
    """
    Explode a frame's ICD-10 diagnosis columns and attach ICD code data.

    Steps:
      - every column whose name contains ``'icd10'`` (case-insensitive) is
        parsed with ``split_diagnoses``;
      - the parsed lists are concatenated into one list-valued
        ``'diagnosis'`` column;
      - the frame is exploded to one row per diagnosis and left-merged with
        ``icd_codes`` on ``diagnosis == description``;
      - the icd10 columns and ``'description'`` are dropped.

    Args:
        df (dd.DataFrame | pd.DataFrame): Input frame; it is not modified.
        icd_codes (pd.DataFrame): Lookup table with a ``'description'`` column.

    Returns:
        The merged frame. Diagnoses without a match in ``icd_codes`` get NaN
        in the lookup columns (left merge).

    Raises:
        ValueError: If ``df`` has no column containing ``'icd10'``.
    """
    diagnosis_columns = [col for col in df.columns if 'icd10' in col.lower()]
    print(diagnosis_columns)
    if not diagnosis_columns:
        raise ValueError("DataFrame has no 'icd10' diagnosis columns")

    # Element-wise list concatenation across all diagnosis columns.
    combined = None
    for col in diagnosis_columns:
        parsed = df[col].apply(split_diagnoses)
        combined = parsed if combined is None else combined + parsed
    # assign() returns a new frame rather than adding a column to the caller's.
    df = df.assign(diagnosis=combined)

    # One row per individual diagnosis, then attach the ICD lookup columns.
    df = df.explode('diagnosis')
    df = df.merge(icd_codes, left_on='diagnosis', right_on='description', how='left')
    return df.drop(columns=diagnosis_columns + ['description'])



def load_parquet_files(folder_path):
    """
    Open every Parquet dataset in ``folder_path`` as a Dask DataFrame.

    Loose ``*.parquet`` files directly in ``folder_path`` come first. After
    them comes one DataFrame per sub-directory, read with the pattern
    ``<folder>/**/*.parquet``. Within each group the order is whatever
    ``os.scandir`` yields, which is filesystem-dependent. Callers that index
    the result positionally depend on that order.

    Args:
        folder_path (str): Directory to scan.

    Returns:
        list: One Dask DataFrame per dataset found.
    """
    parquet_files = []
    dataset_folders = []
    # Single scan; the context manager closes the directory handle promptly.
    with os.scandir(folder_path) as entries:
        for entry in entries:
            if entry.is_file():
                if entry.path.endswith('.parquet'):
                    parquet_files.append(entry.path)
            elif entry.is_dir():
                dataset_folders.append(entry.path)

    dataframes = [dd.read_parquet(path, engine='pyarrow') for path in parquet_files]
    # NOTE(review): every sub-directory is treated as a Parquet dataset; one
    # without *.parquet files would presumably make dd.read_parquet raise.
    for folder in dataset_folders:
        dataframes.append(dd.read_parquet(folder + '/**/*.parquet', engine='pyarrow'))

    return dataframes


# Specify the path to the desired directory
# (Google Drive folder with the training Parquet datasets, presumably the
# UTC-normalized ones; the double space before "UTC" is part of the name).
directory_path = r'/content/drive/MyDrive/VCHAMPS - Train Data -  UTC'

# Change the current working directory to the desired directory
os.chdir(directory_path)

# Verify the current working directory
cwd = os.getcwd()

print(f"Current working directory: {cwd}")
# Open every Parquet dataset in the folder. The code below indexes this list
# positionally (e.g. dataframes[0] must be the conditions data), so it depends
# on the order load_parquet_files returns.
dataframes = load_parquet_files(cwd)

# ICD lookup table used by process_diagnoses_dataframe(s) below.
icd_codes = read_icd_codes('/content/drive/MyDrive/icd_codes_cc.csv')

def preprocess_dataframe(df, col_name):
    """
    One-hot encode ``col_name`` of a Dask DataFrame and drop the original column.

    The column is cast to a categorical with known categories and expanded
    with ``dd.get_dummies`` into one boolean column per category. The new
    columns are named after the category values (no prefix) and appended
    after the remaining columns.

    Args:
        df (dd.DataFrame): Input frame; it is not modified.
        col_name (str): Name of the column to encode.

    Returns:
        dd.DataFrame: ``df`` without ``col_name``, followed by the dummy columns.
    """
    # Ensure known categories; cat.as_known() computes the category set.
    categorical = df[col_name].astype('category').cat.as_known()

    # One boolean indicator column per category, converted in a single call.
    dummies = dd.get_dummies(categorical).astype(bool)

    # Dummies derive from df itself, so a column-wise concat lines up row-for-row.
    return dd.concat([df.drop(columns=col_name), dummies], axis=1)

#Clean Up and Preprocess Conditions
col_name = 'Condition type'
# NOTE(review): positional lookup; relies on load_parquet_files' ordering.
df = dataframes[0] #This needs to be the condition dataframe!!
# One-hot encode 'Condition type' (dummy columns named after each category).
conditions_df = preprocess_dataframe(df,col_name)
# Keep only rows that carry a diagnosis sequence number / rank.
conditions_df = conditions_df.dropna(subset=['Diagnosis sequence number or rank'])
# Explode the icd10 columns to one row per diagnosis and left-merge ICD data.
conditions_df = process_diagnoses_dataframe(conditions_df, icd_codes)
conditions_df = conditions_df.drop(columns=['diagnosis','billable'])
# Drop rows missing 'code' / 'cc Status' (presumably diagnoses that found no
# match in icd_codes during the left merge).
conditions_df = conditions_df.dropna(subset=['code','cc Status'])
# Save the Dask DataFrame as Parquet
conditions_df.to_parquet('/content/drive/MyDrive/VCHAMPS - Train Cleaned/conditions_train.parquet', engine='pyarrow')


#Clean Up and Preprocess Demographics Event
# NOTE(review): positional lookup; relies on load_parquet_files' ordering.
df = dataframes[1]
# One-hot encode both columns; dummy columns are named after the category
# values with no prefix.
dem_event_df = preprocess_dataframe(df,'Marital status')
dem_event_df = preprocess_dataframe(dem_event_df,'Ruca category')
# Drop the dummy columns for placeholder categories (presumably produced by
# one or both encodings above; without a prefix, identical category names
# from the two columns would collide — confirm).
dem_event_df = dem_event_df.drop(columns=['Not specified (no value)','Not specified'])
# Save the Dask DataFrame as Parquet
dem_event_df.to_parquet('/content/drive/MyDrive/VCHAMPS - Train Cleaned/demographics_event_df.parquet', engine='pyarrow')

#Clean Up and Preprocess Demographics Static
# NOTE(review): positional lookup; relies on load_parquet_files' ordering.
dem_static_df = dataframes[2]
# Drop rows where 'Ethnicity' column contains 'Not specified'
dem_static_df = dem_static_df[dem_static_df['Ethnicity'] != 'Not specified']
# One-hot encode 'Gender' (dummy columns named after each category, no prefix).
dem_static_df = preprocess_dataframe(dem_static_df,'Gender')
# Drop rows where 'Races' column contains '(Censored)'
dem_static_df = dem_static_df[dem_static_df['Races'] != '(Censored)']
dem_static_df = preprocess_dataframe(dem_static_df,'Races')
# Define the value conversions
value_conversions = {'Yes': 1, 'No': 0}
# Encode 'Veteran flag' as 1/0; missing values (and any other value) end up NaN.
dem_static_df['Veteran flag'] = dem_static_df['Veteran flag'].map(value_conversions, na_action='ignore')
# Drop the dummy column for the placeholder category (presumably produced by
# one of the one-hot encodings above).
dem_static_df = dem_static_df.drop(columns=['Not specified (no value)'])
# Save the Dask DataFrame as Parquet
dem_static_df.to_parquet('/content/drive/MyDrive/VCHAMPS - Train Cleaned/demographics_static.parquet', engine='pyarrow')

# Cleaning Death
# NOTE(review): positional lookup; relies on load_parquet_files' ordering.
df = dataframes[3] #Death had no NaN values so I'll just keep it as is for now
# Save the Dask DataFrame as Parquet (written unchanged; no cleaning steps)
df.to_parquet('/content/drive/MyDrive/VCHAMPS - Train Cleaned/death.parquet', engine='pyarrow')

# Cleaning ED Visits
# NOTE(review): positional lookup; relies on load_parquet_files' ordering.
ed_visits_df = dataframes[4]
# Drop rows with NaN or NaT values from a specific column
ed_visits_df = ed_visits_df.dropna(subset=['Discharge date ed'])
# Explode the icd10 columns to one row per diagnosis and left-merge ICD data.
ed_visits_df = process_diagnoses_dataframe(ed_visits_df, icd_codes)
ed_visits_df = ed_visits_df.drop(columns=['diagnosis','billable'])
ed_visits_df = ed_visits_df.dropna(subset=['code','cc Status'])
# Encode Yes/No as 1/0. Dask's `meta` must be a (name, dtype) tuple. The dtype
# is float64 because any value other than 'Yes'/'No' maps to NaN; astype keeps
# every partition on that same dtype.
ed_visits_df['Died during ed visit'] = ed_visits_df['Died during ed visit'].map(
    {'Yes': 1, 'No': 0}, meta=('Died during ed visit', 'float64')
).astype('float64')
# Save the Dask DataFrame as Parquet
ed_visits_df.to_parquet('/content/drive/MyDrive/VCHAMPS - Train Cleaned/ed_visits.parquet', engine='pyarrow')

# Cleaning immunizations
# NOTE(review): positional lookup; relies on load_parquet_files' ordering.
df = dataframes[5]
df = df.drop(columns=['Dose quantity','Dose unit'])
df = df.dropna(subset=['Cvx code'])
df['Cvx code'] = df['Cvx code'].astype('int16')
df['Series doses'] = df['Series doses'].replace('Not specified (no value)', 'NS')
# One-hot encode with Dask itself. pd.get_dummies on a Dask Series does not
# produce a frame partitioned like df, so an index merge is not guaranteed to
# pair the right rows. dd.get_dummies needs known categories.
series_doses = df['Series doses'].astype('category').cat.as_known()
dummy_df = dd.get_dummies(series_doses, prefix='Series doses')
# dummy_df derives from df, so a column-wise concat lines up row-for-row.
df = dd.concat([df.drop(columns=['Series doses']), dummy_df], axis=1)
# Save the Dask DataFrame as Parquet
df.to_parquet('/content/drive/MyDrive/VCHAMPS - Train Cleaned/immunizations.parquet', engine='pyarrow')

#Cleaning inpatient admissions DF
# NOTE(review): positional lookup; relies on load_parquet_files' ordering.
inpatient_df = dataframes[6]
# Explode the icd10 columns to one row per diagnosis and left-merge ICD data.
inpatient_df = process_diagnoses_dataframe(inpatient_df, icd_codes)
inpatient_df = inpatient_df.drop(columns=['Serviceconnectedflag','billable'])
inpatient_df = inpatient_df.dropna(subset=['code','Outpatientreferralflag'])
# Cast only after the dropna above. NOTE(review): if 'CV diagnosis' comes from
# the left merge with icd_codes (confirm), unmatched rows hold NaN there, and
# NaN cannot be cast to int8.
inpatient_df['CV diagnosis'] = inpatient_df['CV diagnosis'].astype('int8')
# Fill missing Agent Orange flags with the most common value. Dask has no
# row-positional .iloc, so compute the (small) mode Series first.
mode = inpatient_df['Agentorangeflag'].mode().compute().iloc[0]
inpatient_df['Agentorangeflag'] = inpatient_df['Agentorangeflag'].fillna(mode)
inpatient_df = inpatient_df.dropna(subset=['Discharge date'])

# Drop rows with missing or infinite values in these columns.
for col in ['Died during admission', 'Outpatientreferralflag', 'Agentorangeflag']:
  inpatient_df = inpatient_df[~((inpatient_df[col].isna()) | (inpatient_df[col] == np.inf) | (inpatient_df[col] == -np.inf))]

# Encode the Yes/No flags as 0/1.
mapping = {'No': 0, 'Yes': 1}
for col in ['Outpatientreferralflag', 'Agentorangeflag']:
    print(col)
    inpatient_df[col] = inpatient_df[col].map(mapping).astype('int8')

# One-hot encode with Dask itself. pd.get_dummies plus an index merge is not
# guaranteed to align with a partitioned Dask frame. The dummies derive from
# inpatient_df, so a column-wise concat lines up row-for-row.
cc_status = inpatient_df['cc Status'].astype('category').cat.as_known()
dummy_df = dd.get_dummies(cc_status, prefix='cc Status')
inpatient_df = dd.concat([inpatient_df.drop(columns=['cc Status']), dummy_df], axis=1)
inpatient_df = inpatient_df[~((inpatient_df['Admitting unit service'] == '(Censored)') | (inpatient_df['Admitting unit service'] == 'Not specified'))]
inpatient_df = inpatient_df[~((inpatient_df['Discharging unit service'] == '(Censored)') | (inpatient_df['Discharging unit service'] == 'Not specified') | (inpatient_df['Discharging unit service'] == 'Not specified (no value)'))]
inpatient_df = inpatient_df[~(inpatient_df['Discharge disposition'] == 'Not specified (no value)')]
disposition = inpatient_df['Discharge disposition'].astype('category').cat.as_known()
dummy_df = dd.get_dummies(disposition, prefix='Discharge disposition')
inpatient_df = dd.concat([inpatient_df.drop(columns=['Discharge disposition']), dummy_df], axis=1)
# Save the Dask DataFrame as Parquet
inpatient_df.to_parquet('/content/drive/MyDrive/VCHAMPS - Train Cleaned/inpatient_admissions.parquet', engine='pyarrow')

# Cleaning Inpatient Location
# NOTE(review): positional lookup; relies on load_parquet_files' ordering.
inpatient_location_df = dataframes[7]
# Keep only stays with a known end date.
inpatient_location_df = inpatient_location_df.dropna(subset=['Location end date'])
# Remove censored / unspecified services.
inpatient_location_df = inpatient_location_df[~((inpatient_location_df['Service'] == '(Censored)') | (inpatient_location_df['Service'] == 'Not specified') | (inpatient_location_df['Service'] == 'Not specified (no value)'))]

# Encode Yes/No as 0/1. NOTE(review): any other value maps to NaN, and the
# int8 cast would then fail — confirm the column is strictly Yes/No.
mapping = {'No': 0, 'Yes': 1}
inpatient_location_df['Died at location'] = inpatient_location_df['Died at location'].map(mapping).astype('int8')
# Save the Dask DataFrame as Parquet
inpatient_location_df.to_parquet('/content/drive/MyDrive/VCHAMPS - Train Cleaned/inpatient_location.parquet', engine='pyarrow')

# Cleaning Inpatient Specialty
# NOTE(review): positional lookup; relies on load_parquet_files' ordering.
inpatient_specialty_df = dataframes[8]
# Keep only stays with a known end date.
inpatient_specialty_df = inpatient_specialty_df.dropna(subset=['Specialty end date'])
# Remove censored / unspecified specialties.
inpatient_specialty_df = inpatient_specialty_df[~((inpatient_specialty_df['Specialty'] == '(Censored)') | (inpatient_specialty_df['Specialty'] == 'Not specified') | (inpatient_specialty_df['Specialty'] == 'Not specified (no value)'))]
# Save the Dask DataFrame as Parquet
inpatient_specialty_df.to_parquet('/content/drive/MyDrive/VCHAMPS - Train Cleaned/inpatient_specialty.parquet', engine='pyarrow')

# Cleaning for Measurements DF
# NOTE(review): positional lookup; relies on load_parquet_files' ordering.
measurements_df = dataframes[9]
columns_list = ["Age at measurement", "Measurement date", "Measurement", "Result numeric"]
measurements_df = measurements_df.dropna(subset = columns_list)
measurements_df = measurements_df.drop(columns=['Result textual'])
# measurements_df is already a Dask DataFrame (dd.from_pandas only accepts
# pandas objects), so repartition it directly.
measurements_df = measurements_df.repartition(npartitions=100)
measurements_df['Measurement'] = measurements_df['Measurement'].astype('category')
measurements_df['Measurement'] = measurements_df['Measurement'].cat.as_known()
dummy_df = dd.get_dummies(measurements_df['Measurement'], prefix='Measurement')
# Column-wise concat: dummy_df derives from measurements_df, so rows line up.
# An index merge would instead multiply rows whenever index values repeat.
measurements_df = dd.concat([measurements_df.drop(columns=['Measurement']), dummy_df], axis=1)
# Save the Dask DataFrame as Parquet
measurements_df.to_parquet('/content/drive/MyDrive/VCHAMPS - Train Cleaned/measurements.parquet', engine='pyarrow')

# Cleaning Lab Results DF
lab_harm_df = pd.read_csv('/content/drive/MyDrive/labHarmonization.csv')
# Keep only harmonized labs (rows with a concept), largest sd first.
filtered_lab_harm_df = lab_harm_df[lab_harm_df['concept'].notna()].sort_values('sd', ascending=False).drop(columns=['Unnamed: 0']).reset_index(drop=True)
labs_to_keep = filtered_lab_harm_df['desc'].tolist()
# Units that occur only once in the harmonization table; rows with these
# units are removed later via remove_rows_with_values.
value_counts = filtered_lab_harm_df['unit'].value_counts()
values_with_count_1 = value_counts[value_counts == 1]
units_to_remove_list = values_with_count_1.index.tolist()

lab_results_df = dd.read_parquet('/content/drive/MyDrive/VCHAMPS - Train Data -  UTC/lab_results_train.parquet/*.parquet')
columns_list = ["Age at lab test", "Lab test date", "Lab test", "Result numeric",'Result units']
lab_results_df = lab_results_df.dropna(subset = columns_list)
filtered_lab_results_df = lab_results_df[lab_results_df['Lab test description'].isin(labs_to_keep)]
filtered_lab_results_df = filtered_lab_results_df.reset_index(drop=True)
dask_df = dd.from_pandas(filtered_lab_harm_df, npartitions=2)
# Attach the harmonization stats (e.g. 'Mean', 'sd') to each lab result.
merged_df = dd.merge(filtered_lab_results_df, dask_df, left_on='Lab test description', right_on='desc')
merged_df['Mean'] = merged_df['Mean'].round(3)
merged_df['sd'] = merged_df['sd'].round(3)
merged_df = merged_df.dropna(subset = ['Result units','Result range'])
# 'Result range' is parsed as "<min> - <max>".
split_columns = merged_df['Result range'].str.split(' - ')
merged_df['range_min'] = split_columns.str[0]
merged_df['range_max'] = split_columns.str[1]
merged_df['range_min'] = merged_df['range_min'].astype(float)
merged_df['range_max'] = merged_df['range_max'].astype(float)
merged_df = merged_df.drop(columns = ['Result range'])
merged_df = merged_df.drop(columns = ['Lab test description','Result units','check','Count'])
columns_to_drop = ['k', 'na', 'wbc', 'bicarb', 'pco2', 'hct', 'cr', 'hgb', 'lactate', 'ldh', 'ph', 'gfr', 'ast', 'alt', 'inr', 'a1c', 'ferritin', 'tropi', 'tropt', 'trophs', 'bnp', 'ntprobnp', 'tbili', 'ddimer', 'esr', 'crp', 'hscrp', 'meth_sc', 'meth_lvl', 'methadone_sc', 'methadone_lvl', 'cocaine_sc', 'cocaine_lvl']
merged_df = merged_df.drop(columns = columns_to_drop)
merged_df = merged_df.dropna()
# Outlier filter: keep results within Mean +/- 2 sd of their harmonized lab.
merged_df = merged_df[
    (merged_df['Result numeric'] >= merged_df['Mean'] - 2 * merged_df['sd'])
    & (merged_df['Result numeric'] <= merged_df['Mean'] + 2 * merged_df['sd'])
]
merged_df = merged_df.drop(columns = ['Mean','sd'])
def remove_rows_with_values(df, column_name, values_list):
    """
    Drop every row whose ``column_name`` value is one of ``values_list``.

    Uses only ``Series.isin`` and boolean indexing, so it applies to pandas
    and Dask frames alike. The index of the kept rows is unchanged.
    """
    unwanted = df[column_name].isin(values_list)
    return df[~unwanted]
# Drop results whose harmonized unit occurs only once in the harmonization table.
cleaned_df = remove_rows_with_values(merged_df, 'unit', units_to_remove_list)
cleaned_df = cleaned_df.repartition(npartitions=100)
# Store 'concept' as a categorical with known categories.
cleaned_df['concept'] = cleaned_df['concept'].astype('category')
cleaned_df['concept'] = cleaned_df['concept'].cat.as_known()
# Save the Dask DataFrame as Parquet
cleaned_df.to_parquet('/content/drive/MyDrive/VCHAMPS - Train Cleaned/lab_results.parquet', engine='pyarrow', row_group_size=100000, write_index=False)

#Cleaning Measurements BP DF
measurements_bp_df = dd.read_parquet('/content/drive/MyDrive/VCHAMPS - Train Data -  UTC/measurements_blood_pressure_train.parquet/*.parquet')
# Drop any row with a missing value, then load the result into pandas.
measurements_bp_df = measurements_bp_df.dropna()
measurements_bp_df = measurements_bp_df.compute()
measurements_bp_df['Diastolic bp'] = measurements_bp_df['Diastolic bp'].astype('int16')
measurements_bp_df['Systolic bp'] = measurements_bp_df['Systolic bp'].astype('int16')
# Save as Parquet. After .compute() this is a pandas DataFrame, so this is
# pandas' to_parquet (not the Dask writer).
measurements_bp_df.to_parquet('/content/drive/MyDrive/VCHAMPS - Train Cleaned/measurements_bp.parquet', engine='pyarrow')

# Cleaning Medication Administered DF
# Materialize the whole Parquet dataset as one pandas DataFrame.
medications_administered_df = dd.read_parquet('/content/drive/MyDrive/VCHAMPS - Train Data -  UTC/medications_administered_train.parquet/*.parquet').compute()
medications_administered_df = medications_administered_df[medications_administered_df['Administration status'] != 'Not specified (no value)']
# Inspection aid: frequency of each dose form before filtering.
dose_form_list = medications_administered_df['Dose form'].value_counts()
medications_administered_df = medications_administered_df[~medications_administered_df['Dose form'].isin(['miscellaneous', '*unknown at this time*'])]
medications_administered_df = medications_administered_df.dropna(subset = ['Dose administered','Dose unit administered'])
# Drop censored / unspecified ATC-5 codes.
medications_administered_df = medications_administered_df[~medications_administered_df['Administered medication atc 5'].isin(['(Censored)', 'Not specified'])]

magd = pd.read_csv('/content/drive/MyDrive/medications_administered_grouped_data.csv')
# Inspection aid: frequency of each dose unit before the unit filter below.
dose_unit_admin_list = medications_administered_df['Dose unit administered'].value_counts()
# Dose-unit values that are removed (vague amounts such as 'as ordered' or
# 'liberal amount', and unit strings like '.', 'a', 'p').
medications_administered_df = medications_administered_df[~medications_administered_df['Dose unit administered'].isin([
    '.', 'as ordered', 'moderate amount', 'small amt', 'mod amt',
    'as directed', 'ad', 'liberal amount', 'moderate', 'per order',
    'liberally', 'ok', 'ao', 'lib amt', 'liberal',
    'sm amount', 'a small amount', 'suff', 'some', 'as order',
    'as ord', 'a', 'ade', 'dose', 'as rx',
    'mod. amt.', 'suff amount', 'smamt', 'as', 'p',
    'small amnt', 'adequate amount', 'moderate amt.', 'sm amnt', 'aso',
])]
medications_administered_df['Administration status'] = medications_administered_df['Administration status'].astype('category')
# Hand the cleaned pandas frame back to Dask (100 partitions) for writing.
medications_administered_ddf  = dd.from_pandas(medications_administered_df, npartitions=100)
# Save the Dask DataFrame as Parquet
medications_administered_ddf.to_parquet('/content/drive/MyDrive/VCHAMPS - Train Cleaned/medications_administered.parquet', engine='pyarrow')

# Cleaning Medication Ordered DF
# Materialize the whole Parquet dataset as one pandas DataFrame.
medications_ordered_df = dd.read_parquet('/content/drive/MyDrive/VCHAMPS - Train Data -  UTC/medications_ordered_train.parquet/*.parquet').compute()
medications_ordered_df['Order status'] = medications_ordered_df['Order status'].astype('category')
# Drop censored / unspecified ATC-5 codes.
medications_ordered_df = medications_ordered_df[~medications_ordered_df['Ordered medication atc 5'].isin(['(Censored)', 'Not specified'])]
# Hand the cleaned pandas frame back to Dask (100 partitions) for writing.
medications_ordered_ddf = dd.from_pandas(medications_ordered_df, npartitions=100)
# Save the Dask DataFrame as Parquet
medications_ordered_ddf.to_parquet('/content/drive/MyDrive/VCHAMPS - Train Cleaned/medications_ordered.parquet', engine='pyarrow')

# Cleaning outpatient visits
outpatient_visits_df = dd.read_parquet('/content/drive/MyDrive/VCHAMPS - Train Data -  UTC/outpatient_visits_train.parquet/*.parquet')
# Flag columns become categoricals with known categories.
cols_to_convert = ['Agentorangeflag','Combatflag','Ionizingradiationflag','Serviceconnectedflag','Swasiaconditionsflag']
for col in cols_to_convert:
    outpatient_visits_df[col] = outpatient_visits_df[col].astype('category').cat.as_known()
# Explode the icd10 columns to one row per diagnosis with ICD data, drop
# 'billable' and rows lacking a code or diagnosis, then load into pandas.
outpatient_visits_df = (
    process_diagnoses_dataframe(outpatient_visits_df, icd_codes)
    .drop(columns = ['billable'])
    .dropna(subset = ['code','diagnosis'])
    .compute()
)
# Remove unusable stop codes.
outpatient_visits_df = outpatient_visits_df[~outpatient_visits_df['Stop code'].isin(['(Censored)', '100 NOT USED', 'Not specified (no value)'])]
outpatient_visits_df['cc Status'] = outpatient_visits_df['cc Status'].astype('category')
# Hand the cleaned pandas frame back to Dask (80 partitions) for writing.
outpatient_visits_ddf = dd.from_pandas(outpatient_visits_df, npartitions=80)
# Save the Dask DataFrame as Parquet
outpatient_visits_ddf.to_parquet('/content/drive/MyDrive/VCHAMPS - Train Cleaned/outpatient_visits.parquet', engine='pyarrow')

# Cleaning Up Procedures DF
# Load the dataset into pandas and drop every row with any missing value.
procedures_df = dd.read_parquet('/content/drive/MyDrive/VCHAMPS - Train Data -  UTC/procedures_train.parquet/*.parquet').compute().dropna()
# Hand the cleaned pandas frame back to Dask (80 partitions) for writing.
procedures_ddf = dd.from_pandas(procedures_df, npartitions=80)
# Save the Dask DataFrame as Parquet
procedures_ddf.to_parquet('/content/drive/MyDrive/VCHAMPS - Train Cleaned/procedures.parquet', engine='pyarrow')

# Section 5. Encounter Mapping

In [None]:
def map_encounter_id_vectorized(row, age_col, date_col):
    """
    Find the Encounter ID of the visit that covers a single record.

    The record is compared against three module-level visit tables in fixed
    priority order: ``ed_visits_df``, then ``inpatient_admissions_df``, then
    ``outpatient_visits_df``. A visit matches when it belongs to the same
    patient, its start date is on or before the record date, its end date is
    on or after the record date, and its age-at-visit does not exceed the
    record's age. The first table with any match wins, and within that table
    the first matching row (in table order) is used.

    Args:
        row (pandas.Series): The record to map. It must contain
            'Internalpatientid' plus the columns named by ``age_col`` and
            ``date_col``.
        age_col (str): Name of the entry in ``row`` holding the patient's age.
        date_col (str): Name of the entry in ``row`` holding the date to match.

    Returns:
        The matching 'Encounter ID' value, or a freshly generated UUID4
        string when no visit in any table matches.

    Note:
        Despite its name, this function is called once per row. Each call
        filters every visit table it consults by patient ID.
    """
    target_patient = row['Internalpatientid']
    target_age = row[age_col]
    target_date = row[date_col]

    def _visit_sources():
        # A generator so each global table is only looked up when the
        # higher-priority tables produced no match.
        yield ed_visits_df, 'Ed visit start date', 'Discharge date ed', 'Age at ed visit'
        yield inpatient_admissions_df, 'Admission date', 'Discharge date', 'Age at admission'
        yield outpatient_visits_df, 'Visit start date', 'Visit End Date', 'Age at visit'

    for table, start_col, end_col, visit_age_col in _visit_sources():
        same_patient = table[table['Internalpatientid'] == target_patient]
        started = same_patient[start_col] <= target_date
        not_ended = same_patient[end_col] >= target_date
        age_ok = same_patient[visit_age_col] <= target_age
        hits = started & not_ended & age_ok
        if hits.any():
            return same_patient.loc[hits, 'Encounter ID'].iloc[0]

    # No table had a covering visit: mint a brand-new encounter ID.
    return str(uuid.uuid4())

def process_and_save_chunks(df, save_path, df_name, age_col, date_col, chunk_size=100000):
    """
    Assign an Encounter ID to every row of ``df`` and save the rows in chunks.

    Rows are processed ``chunk_size`` at a time, by position. For each chunk,
    every row is mapped with ``map_encounter_id_vectorized(row, age_col,
    date_col)``. The results are stored in an 'Encounter ID' column on a copy
    of the chunk, which is written to ``<save_path>/<df_name><k>.parquet``,
    where ``k`` is the 1-based chunk number.

    Args:
        df (pandas.DataFrame): The data to process. Chunks are taken
            positionally, so the index labels do not matter.
        save_path (str): Directory for the Parquet files; created if missing.
        df_name (str): Filename prefix for the chunk files.
        age_col (str): Column of ``df`` holding the patient's age.
        date_col (str): Column of ``df`` holding the date to match.
        chunk_size (int): Rows per chunk/output file. Defaults to 100000.

    Returns:
        None

    Raises:
        ValueError: If ``chunk_size`` is less than 1.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")

    # The per-table output directories are not guaranteed to exist yet.
    os.makedirs(save_path, exist_ok=True)

    num_chunks = math.ceil(len(df) / chunk_size)

    for i in range(num_chunks):
        start_idx = i * chunk_size
        end_idx = start_idx + chunk_size

        # iloc makes the slice explicitly positional regardless of df's index type.
        chunk_df = df.iloc[start_idx:end_idx]

        # IDs are collected per chunk rather than accumulated for the whole frame.
        encounter_ids = [
            map_encounter_id_vectorized(row, age_col, date_col)
            for _, row in tqdm(chunk_df.iterrows(), total=chunk_df.shape[0],
                               desc=f"Processing Chunk {i+1}/{num_chunks}")
        ]

        chunk_results_df = chunk_df.copy()
        chunk_results_df['Encounter ID'] = encounter_ids

        # Save the results of the chunk to a Parquet file
        chunk_results_df.to_parquet(os.path.join(save_path, f'{df_name}{i+1}.parquet'), index=False)

# Specify the path to the desired directory
directory_path = r'/content/drive/MyDrive/VCHAMPS - Train Cleaned'

# Change the current working directory to the desired directory
os.chdir(directory_path)

# Verify the current working directory
cwd = os.getcwd()

print(f"Current working directory: {cwd}")

mapped_dir = '/content/drive/MyDrive/VCHAMPS - Train Cleaned-Mapped'

# Load the visit tables as pandas DataFrames. map_encounter_id_vectorized reads
# these three names as module-level globals, so they must be bound before any
# process_and_save_chunks call below.
ed_visits_df            = dd.read_parquet(mapped_dir + '/ed_visits.parquet').compute()
inpatient_admissions_df = dd.read_parquet(mapped_dir + '/inpatient_admissions.parquet').compute()
outpatient_visits_df    = dd.read_parquet(mapped_dir + '/outpatient_visits.parquet').compute()

# Map lab results
lab_results_df = dd.read_parquet(directory_path + '/lab_results.parquet/*.parquet').compute()
lab_results_df = lab_results_df.reset_index(drop=True)
save_path = mapped_dir + '/lab_results'
process_and_save_chunks(lab_results_df, save_path, 'lab_results', 'Age at lab test', 'Lab test date')

# Map conditions
conditions_df = dd.read_parquet(directory_path + '/conditions.parquet/*.parquet').compute()
conditions_df = conditions_df.reset_index(drop=True)
save_path = mapped_dir + '/conditions'
process_and_save_chunks(conditions_df, save_path, 'conditions', 'Age at condition', 'Condition documented date')

# Map demographics events.
# NOTE(review): assumes the cleaned demographics-event table was saved as
# 'demographics_event.parquet' under directory_path — confirm against the
# cleaning step that wrote it.
demographics_event_df = dd.read_parquet(directory_path + '/demographics_event.parquet/*.parquet').compute()
demographics_event_df = demographics_event_df.reset_index(drop=True)
save_path = mapped_dir + '/demographics_event'
process_and_save_chunks(demographics_event_df, save_path, 'demographics_event', 'Age at event', 'Event date')