In [48]:
import json

# Root folder of the project data. It is later passed to extract_transform,
# which walks its sub directories. The trailing slash is required because the
# dictionary path below is built by plain string concatenation.
data_directory = '/home/ehan/Documents/Code/Data-4999/BAC@MC 2024 Phase One Datasets/'

# Column-name lookup used by clean_dataframe: indexed by folder name, each
# entry maps a replacement column name to the regex pattern that selects it.
# The encoding is stated explicitly so the load does not depend on the
# platform's default text encoding.
with open(f'{data_directory}data-dictionary.json', 'r', encoding='utf-8') as file:
    data_dictionary = json.load(file)

In [49]:
def add_zero(prov_id):
    """
    Left-pad a provider ID with zeros so that it is at least 6 characters long.

    Parameters:
        prov_id (str): The provider ID as text.

    Returns:
        str: The ID padded on the left with '0' up to 6 characters. IDs that
        already have 6 or more characters are returned unchanged.
    """
    # Pad to the full width rather than adding a single '0', so IDs that lost
    # more than one leading zero also come back at 6 characters.
    return prov_id.rjust(6, '0')

In [50]:
def clean_rows(dataframe, folder_name):
    """
    Remove or repair unusable rows, using rules specific to the source folder.

    The input DataFrame is never modified; a new DataFrame is returned whenever
    rows are dropped or repaired.

    Parameters:
        dataframe (pandas.DataFrame): The DataFrame with rows to be cleaned.
        folder_name (str): The name of the folder which indicates what columns will be in this DataFrame.

    Returns:
        pandas.DataFrame: DataFrame with cleaned rows. For folder names without
        a rule (or when no row needs repair) the input is returned as is.
    """
    if folder_name == 'CostReports':
        # Drop rows where both fiscal_start and loc_type are NA
        return dataframe.dropna(subset=['fiscal_start', 'loc_type'], how='all')

    if folder_name == 'CovidVax':
        # Keep only rows whose 'pct_residents_primary_vax' is not the literal
        # text 'not available'
        return dataframe[dataframe['pct_residents_primary_vax'] != 'not available']

    if folder_name == 'HealthDeficiencies':
        # Rows to repair: those whose 'defstat' value is a single character.
        # NOTE(review): presumably these rows were parsed with their values one
        # column too far to the right after 'defstat' -- confirm against the raw files.
        mask = dataframe['defstat'].str.len() == 1
        if not mask.any():
            return dataframe

        # Work on a copy so the .loc assignments below do not write into the
        # caller's DataFrame.
        df = dataframe.copy()

        # Every column positioned after 'defstat' takes part in the shift
        defstat_index = df.columns.get_loc('defstat')
        columns_to_shift = df.columns[defstat_index + 1:].tolist()

        # Values that move one column to the left, cast to int.
        # NOTE(review): astype(int) raises if any of these cells is missing or
        # non-numeric -- confirm that cannot happen in the data.
        shifted_values = df.loc[mask, columns_to_shift[1:]].astype(int).values

        # Shift observations 1 to the left for rows where condition is met
        df.loc[mask, columns_to_shift[:-1]] = shifted_values

        # The last column has nothing left to receive, so blank it on those rows
        df.loc[mask, columns_to_shift[-1]] = None

        return df

    return dataframe

In [51]:
import re

def clean_dataframe(dataframe, folder_name):
    """
    Standardize one raw DataFrame: rename its columns, clean its rows and pad prov_id.

    Column names are matched (lower-cased and stripped) against the regex
    patterns stored in data_dictionary[folder_name]; the first matching pattern
    decides the new name. Columns that match no pattern are removed from the
    result. The input DataFrame's column labels are left untouched.

    Parameters:
        dataframe (pandas.DataFrame): The DataFrame with column names to be cleaned.
        folder_name (str): The name of the folder which is being concatenated in the extract_transform function

    Returns:
        pandas.DataFrame: DataFrame with cleaned column names and rows.

    Raises:
        KeyError: If folder_name is not in data_dictionary, or if no column was
            mapped to 'prov_id'.
    """
    lookup = data_dictionary[folder_name]

    # Decide every new label by position, from the ORIGINAL header only.
    # Renaming by label inside the loop could hit a column that had already
    # been renamed whenever a new name equals a later raw column name.
    new_labels = []
    for column_name in dataframe.columns:
        normalized = column_name.lower().strip()

        for replacement, pattern in lookup.items():
            if re.search(pattern, normalized):
                new_labels.append(replacement)
                break
        else:
            # Mark columns that are not in the data dictionary for removal
            new_labels.append('DROPCOLUMN')

    df = dataframe.set_axis(new_labels, axis=1)

    # Replace column names for 2021 CovidVax (labels that are absent are ignored)
    if folder_name == 'CovidVax':
        df = df.rename(columns={
            'residents_2021': 'pct_residents_primary_vax',
            'staff_2021': 'pct_staff_primary_vax',
        })

    # Drop columns with all NaN values
    df = df.dropna(axis=1, how='all')

    # Clean rows. The marked columns are still present at this point, exactly
    # as in the original ordering of steps, so clean_rows sees the same layout.
    df = clean_rows(df, folder_name)

    # Apply zeros for prov_id with less than 6 characters. assign() builds a
    # new frame instead of writing into what may be a filtered slice.
    df = df.assign(prov_id=df['prov_id'].astype(str).apply(add_zero))

    # Drop marked columns
    return df.drop(columns='DROPCOLUMN', errors='ignore')

In [52]:
import os
import pandas as pd

def extract_transform(folder_path):
    """
    Read every sub directory of folder_path, load its .csv files into pandas DataFrames, clean them and stack them into one DataFrame per sub directory.

    Sub directories and files are processed in sorted name order so the row
    order of the result is reproducible. Sub directories that contain no .csv
    file are skipped instead of failing the whole run.

    Parameters:
        folder_path (str): Path to directory containing sub directories with .csv files

    Returns:
        dict: Maps each sub directory name to its stacked DataFrame. String
        values are lower-cased, and the per-file row index is kept as a
        leading 'index' column produced by reset_index().
    """
    # Initialize an empty dict to store the DataFrames
    joined_dfs = {}

    # Iterate over all entries in the directory
    for folder_name in sorted(os.listdir(folder_path)):
        folder_full_path = os.path.join(folder_path, folder_name)

        # Only folders hold datasets; skip loose files such as the data dictionary
        if not os.path.isdir(folder_full_path):
            continue

        # Cleaned DataFrames, one per CSV file in this folder
        dfs = []

        for file_name in sorted(os.listdir(folder_full_path)):
            if not file_name.endswith('.csv'):
                continue

            file_full_path = os.path.join(folder_full_path, file_name)

            # Read the CSV file into a DataFrame, use latin1 encoding
            df = pd.read_csv(file_full_path, encoding='latin1', low_memory=False)

            # Rename columns and clean rows using the clean_dataframe function
            dfs.append(clean_dataframe(df, folder_name))

        # pd.concat raises ValueError on an empty list, so a folder without
        # CSV files is skipped
        if not dfs:
            continue

        # Stack all the dfs and convert strings to lowercase
        folder_df = (
            pd.concat(dfs, ignore_index=False)
            .map(lambda x: x.lower() if isinstance(x, str) else x)
            .reset_index()
        )

        # Store all folder_dfs in a dict
        joined_dfs[folder_name] = folder_df

    return joined_dfs
        

In [53]:
def upload_dataframe_to_mysql(df, table_name, engine, chunk_size= 50000):
    """
    Append a DataFrame to a SQL table in fixed-size chunks, reporting progress with print.

    An 'index' column, if present, is left out of the upload; the caller's
    DataFrame is not modified. A chunk that fails to upload is reported and
    skipped, and the remaining chunks are still attempted.

    Parameters:
        df (pandas.DataFrame): The data to upload.
        table_name (str): Name of the destination table; rows are appended to it.
        engine: Connection object handed to DataFrame.to_sql as `con`.
        chunk_size (int): Maximum number of rows sent per to_sql call.

    Returns:
        None

    Raises:
        ValueError: If chunk_size is not positive.
    """
    # A zero chunk size used to fail with ZeroDivisionError and a negative one
    # silently uploaded nothing; reject both up front.
    if chunk_size <= 0:
        raise ValueError(f'chunk_size must be a positive integer, got {chunk_size}')

    # Drop the 'index' column if it exists
    df = df.drop(columns=['index'], errors='ignore')

    total_rows = len(df)

    # Number of chunks, rounded up so a partial last chunk is counted
    num_chunks = (total_rows + chunk_size - 1) // chunk_size

    for i, start_idx in enumerate(range(0, total_rows, chunk_size)):
        # iloc clips the slice end at the last row
        df_chunk = df.iloc[start_idx:start_idx + chunk_size]

        try:
            # Upload chunk to MySQL table
            df_chunk.to_sql(table_name, con=engine, if_exists='append', index=False)
        except Exception as e:
            # Best effort: report the failure and continue with the next chunk
            print(f'Error uploading chunk {i+1}/{num_chunks} to MySQL table: {e}')
        else:
            print(f'Chunk {i+1}/{num_chunks} uploaded successfully to MySQL table: {table_name}')

In [54]:
import mysql.connector
from sqlalchemy import create_engine

from urllib.parse import quote_plus

# MySQL connection parameters
# NOTE(review): these credentials are hard-coded in the notebook; consider
# loading them from the environment or a config file kept out of version control.
host= 'localhost'
user= 'josh'
password= 'go$T4GS'
database= 'data_4999'

# Create MySQL connection. The user name and password are URL-escaped so that
# special characters in them (such as '@', ':' or '/') cannot corrupt the URL.
engine = create_engine(
    f"mysql+mysqlconnector://{quote_plus(user)}:{quote_plus(password)}@{host}/{database}"
)

try:
    # Run the stacking functions
    joined_dfs = extract_transform(data_directory)

    # Iterate over the dictionary of DataFrames and upload each DataFrame to MySQL
    for table_name, df in joined_dfs.items():
        upload_dataframe_to_mysql(df, table_name, engine)
finally:
    # Close the connection even when extraction or upload raises
    engine.dispose()

      prov_id                                    prov_name  \
0      015009                     BURNS NURSING HOME, INC.   
1      015010                COOSA VALLEY NURSING FACILITY   
2      015012                   HIGHLANDS HEALTH AND REHAB   
3      015014  EASTVIEW REHABILITATION & HEALTHCARE CENTER   
4      015015                PLANTATION MANOR NURSING HOME   
...       ...                                          ...   
15646  676393                      LAS VENTANAS DE SOCORRO   
15647  676394        WOODLANDS PLACE REHABILITATION SUITES   
15648  676395               THE HEALTHCARE RESORT OF PLANO   
15649  676396                       S.P.J.S.T. REST HOME 3   
15650  676398                        THE RIO AT FOX HOLLOW   

                                        address          city state    zip  \
0                          701 MONROE STREET NW  RUSSELLVILLE    AL  35653   
1                       315 WEST HICKORY STREET     SYLACAUGA    AL  35150   
2                    