In [1]:
import polars as pl
import duckdb as db
import datetime
import os

# Ingestion Process
## Transforming Data
- Processes responsible for reading and cleaning data
    - Expects referral Excel files, which are converted into Apache Parquet files

In [2]:
def clean_referralfile(file_path: str) -> pl.DataFrame:
    """Read one referral Excel file and return it as a cleaned polars DataFrame.

    Args:
        file_path: Path of the Excel file to read. The same string is stored
            verbatim in the ``file_source`` column of the result.

    Returns:
        A polars DataFrame with renamed headers, an ``Update_DT`` column taken
        from the following row's ``Last Update`` value, the first
        comma-separated ``Diagnosis`` entry kept under ``Diagnosis``,
        upper-cased / stripped name and health-plan text, and ``User_Fname`` /
        ``User_LName`` columns derived from ``Last Update``.
    """
    
    # Dates are rendered as YYYY-MM-DD strings by the xlsx2csv conversion step.
    df = pl.read_excel(file=file_path, xlsx2csv_options={"dateformat": "%Y-%m-%d"})
    # Expand the truncated header names into their full labels.
    # NOTE(review): presumably the sheet header spans two rows, which is why the
    # names arrive cut off and one is suffixed `_duplicated_0` - confirm.
    df_renamed = df.rename(
        {
            'Referring':'Referring Provider'
            , 'Referring_duplicated_0': 'Referring Provider NPI'
            , 'Referral': 'Referral Date'
            , 'pat': 'pat Status'
            , 'Referred to': 'Referred to Specialist'
            , 'Specialist': 'Specialist NPI'
            , 'Visit': 'Visit Status'
            , 'Health': 'Health Plan'
        }
    )
    # Drop the first data row (presumably the second header line - TODO confirm).
    df_drop_rows = df_renamed[1:, :]
    # New column 'Update_DT': each row receives the 'Last Update' value of the
    # row directly below it (shift(-1)); the last row gets null.
    df_fill = df_drop_rows.with_columns(Update_DT = df_drop_rows['Last Update'].shift(-1))
    # Record which file every row came from.
    df_name = df_fill.with_columns(file_source = pl.lit(file_path))
    # Split 'Diagnosis' on ',' and turn the resulting list into a struct ...
    df_list = df_name.with_columns(pl.col('Diagnosis').str.split(',').arr.to_struct())
    # ... then unnest it; only the first element (field_0) is renamed back to
    # 'Diagnosis', any further elements stay as their generated field_N columns.
    df_expl = df_list.unnest('Diagnosis').rename({'field_0': 'Diagnosis'})
    # Upper-case and strip whitespace from text columns; 'Last Update' is
    # upper-cased and split on ',' into a list for the user-name extraction below.
    df_clean_text = df_expl.with_columns(
        (pl.col('Last Name').str.to_uppercase().str.strip())
        , (pl.col('First Name').str.to_uppercase().str.strip())
        , (pl.col('Last Update').str.to_uppercase().str.split(','))
        , (pl.col('Health Plan').str.to_uppercase().str.strip())
    )
    # Last list element -> User_Fname, first list element -> User_LName.
    df_user = df_clean_text.with_columns((pl.col('Last Update').arr.last().str.strip()).alias("User_Fname"), (pl.col('Last Update').arr.first().str.strip()).alias("User_LName"))
    # Re-join the split 'Last Update' list into a single ', '-separated string.
    df_recomb_user = df_user.with_columns(pl.col('Last Update').arr.join(', '))
    # Filter on 'Center' being null now that the data has been cleaned.
    # NOTE(review): `pl.all(...)` wraps a single boolean expression here; depending
    # on the polars version this may reduce the whole column to one value rather
    # than test each row - confirm that rows with a null 'Center' are really
    # removed (a per-row form would be `pl.col('Center').is_not_null()`).
    df_clean = df_recomb_user.filter(~pl.all(pl.col('Center').is_null()))
    return df_clean

In [3]:
def write_parquet(spark_df: pl.DataFrame, file_path: str):
    """Persist a DataFrame to disk in parquet format.

    Args:
        spark_df: The frame to write (annotated as a polars DataFrame).
        file_path: Destination handed to the frame's ``write_parquet`` method.
    """
    destination = file_path
    spark_df.write_parquet(file=destination)

In [4]:
def clean_write_file(inc_file_path: str, out_file_path):
    """Clean a single referral file and store the result as parquet.

    Args:
        inc_file_path: Path of the incoming file given to ``clean_referralfile``.
        out_file_path: Destination path given to ``write_parquet``.
    """
    # Clean first, then hand the cleaned frame straight to the writer.
    write_parquet(clean_referralfile(inc_file_path), out_file_path)

In [5]:
def clean_file_names(in_path: str):
    """Normalise the names of the files directly inside ``in_path``.

    For every regular file the stem (the name without its final extension) is
    upper-cased, ``' - '`` is collapsed to ``'-'``, ``'TEXAS'`` is shortened to
    ``'TX'`` and any remaining ``'.'`` becomes ``'_'``. The extension itself is
    kept unchanged.

    Args:
        in_path: Directory whose files are renamed in place. A trailing path
            separator is optional.

    Raises:
        OSError: If the directory cannot be listed or a rename fails.
    """

    for file_name in os.listdir(in_path):
        source = os.path.join(in_path, file_name)
        # Only files are renamed; sub-directories have no extension to preserve
        # and may be referenced by other tooling.
        if not os.path.isfile(source):
            continue
        # splitext handles extensions of any length (".xls", ".xlsx", none),
        # unlike slicing off a fixed five characters.
        f_name, f_ext = os.path.splitext(file_name)
        f_name = f_name.upper().replace(' - ', '-').replace('TEXAS', 'TX').replace('.', '_')
        new_name = f_name + f_ext
        if new_name != file_name:
            os.rename(source, os.path.join(in_path, new_name))

In [6]:
def iter_clean_write_files(in_path: str = 'data/raw_referrals/excels/', out_path: str = 'data/clean_referrals/'):
    """Normalise the file names in ``in_path``, then clean each file to parquet.

    Every regular file found in ``in_path`` after renaming is passed to
    ``clean_write_file``; the output is written to ``out_path`` under the same
    stem with a ``.parquet`` extension.

    Args:
        in_path: Directory holding the raw referral files. A trailing path
            separator is optional.
        out_path: Directory that receives the parquet files. A trailing path
            separator is optional.
    """

    clean_file_names(in_path)

    for file_name in os.listdir(in_path):
        source = os.path.join(in_path, file_name)
        # Sub-directories cannot be read as a referral file; skip them.
        if not os.path.isfile(source):
            continue
        # Strip the real extension, whatever its length, to build the output name.
        f_name, _ = os.path.splitext(file_name)
        clean_write_file(source, os.path.join(out_path, f_name + '.parquet'))

## Output Files

In [7]:
# Normalise the raw file names, then write one parquet file per input file.
iter_clean_write_files('../data/raw_referrals/excels/', '../data/clean_referrals/')

# Data Transformation -> Reporting Data
## Creating Database & Tables
- Processes responsible for creating or replacing database tables

In [8]:
def query_commit(query: str, db_path: str = '../data/referral.db'):
    """Run ``query`` against the DuckDB database file and commit the result.

    The statement is executed on the connection opened for ``db_path``. The
    module-level ``db.sql`` / ``db.commit`` helpers operate on DuckDB's default
    connection instead, so using them would leave the database file untouched.

    Args:
        query: SQL text to execute.
        db_path: Path of the DuckDB database file to open.
    """
    con = db.connect(db_path)
    try:
        con.sql(query=query)
        con.commit()
    finally:
        # Always release the file handle, even when the query fails.
        con.close()

In [9]:
def create_team():
    """Execute the SQL script ``queries/create_tables/user_list.sql``."""
    with open('queries/create_tables/user_list.sql', 'r') as sql_file:
        statement = sql_file.read()
    # The script text is fully read, so the file can be closed before running it.
    query_commit(query=statement)

In [10]:
def create_coc():
    """Execute the SQL script ``queries/create_tables/coc.sql``."""
    with open('queries/create_tables/coc.sql', 'r') as sql_file:
        statement = sql_file.read()
    # The script text is fully read, so the file can be closed before running it.
    query_commit(query=statement)

In [11]:
def create_hcpcs():
    """Execute the SQL script ``queries/create_tables/hcpcs.sql``."""
    with open('queries/create_tables/hcpcs.sql', 'r') as sql_file:
        statement = sql_file.read()
    # The script text is fully read, so the file can be closed before running it.
    query_commit(query=statement)

In [12]:
def create_icd10():
    """Execute the SQL script ``queries/create_tables/icd10cm.sql``."""
    with open('queries/create_tables/icd10cm.sql', 'r') as sql_file:
        statement = sql_file.read()
    # The script text is fully read, so the file can be closed before running it.
    query_commit(query=statement)

In [13]:
def create_referrals():
    """Execute the SQL script ``queries/create_tables/referrals.sql``."""
    with open('queries/create_tables/referrals.sql', 'r') as sql_file:
        statement = sql_file.read()
    # The script text is fully read, so the file can be closed before running it.
    query_commit(query=statement)

In [14]:
def transform_referrals():
    """Execute the SQL script ``queries/bi_referrals.sql``."""
    with open('queries/bi_referrals.sql', 'r') as sql_file:
        statement = sql_file.read()
    # The script text is fully read, so the file can be closed before running it.
    query_commit(query=statement)

In [15]:
def create_export():
    """Copy every row of ``bi_referrals`` into a date-stamped parquet file.

    The file is written to ``../data/report_data/<date>.parquet`` where
    ``<date>`` is today's date in ISO ``YYYY-MM-DD`` form.
    """
    stamp = datetime.date.today().isoformat()
    copy_statement = f"COPY (SELECT * FROM bi_referrals) TO '../data/report_data/{stamp}.parquet' (FORMAT 'parquet')"
    query_commit(query=copy_statement)

In [16]:
def initialize_tables():
    """Create (or replace) every source table used by the report.

    Runs each ``create_*`` step exactly once. The referrals table is built
    last, after all lookup tables exist.
    """
    create_team()
    create_hcpcs()
    create_icd10()
    # Previously create_icd10() was called a second time here and the coc
    # table was never created.
    create_coc()
    create_referrals()

In [17]:
def transform_export():
    """Build the reporting table, then export it."""
    # Step 1: run the bi_referrals transformation script.
    transform_referrals()
    # Step 2: dump the transformed table to a parquet file.
    create_export()

In [18]:
def create_report_data():
    """Run the full reporting pipeline: table setup, then transform and export."""
    # Source tables must exist before the transformation can read from them.
    initialize_tables()
    transform_export()

In [19]:
# Build the tables, run the bi_referrals transformation and export the dated parquet report.
create_report_data()

FloatProgress(value=0.0, layout=Layout(width='100%'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='100%'), style=ProgressStyle(bar_color='black'))