In [1]:
import os
import json
import certifi
import pandas as pd

import pymongo
from sqlalchemy import create_engine, text
from dotenv import load_dotenv
from urllib.parse import quote_plus

In [2]:
load_dotenv()

mysql_args = {
    "uid" : "root",
    "pwd" : quote_plus(os.getenv("MYSQL_PWD")),
    "hostname" : "localhost",
    "dbname" : "healthcare_src"
}

mongodb_args = {
    "user_name" : os.getenv("MONGODB_USERNAME"),
    "password" : os.getenv("MONGODB_PWD"),
    "cluster_name" : "Cluster0",
    "cluster_subnet" : "egia6sc",
    "cluster_location" : "atlas",
    "db_name" : "healthcare"
}

#### Define Functions for Getting Data From and Setting Data Into Databases

In [3]:
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(text(sql_query), connection);
    connection.close()
    
    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

#### Populate MongoDB with Source Data

In [4]:
client = get_mongo_client(**mongodb_args)

data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"doctors" : 'doctors.json',
              "clinics" : 'clinics.json'}

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)         

#### Extract Data from the Source MongoDB Collections Into DataFrames

In [5]:
client = get_mongo_client(**mongodb_args)

query = {}
collection = "doctors"

df_doctors = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_doctors

Unnamed: 0,id,first_name,last_name,specialty,department,years_experience,job_title
0,101,Alice,Patel,Cardiology,Cardiology,12,Attending Physician
1,102,Benjamin,Lee,Neurology,Neurology,9,Consultant
2,103,Carla,Gomez,Orthopedics,Orthopedics,15,Senior Surgeon
3,104,David,Nguyen,Cardiology,Cardiology,6,Resident
4,105,Emily,Chen,Pediatrics,Pediatrics,8,Pediatrician
5,106,Farah,Hassan,Dermatology,Dermatology,11,Consultant
6,107,George,Ibrahim,Family Medicine,Primary Care,5,General Practitioner
7,108,Hannah,Rogers,Endocrinology,Endocrinology,10,Endocrinologist


In [6]:
client = get_mongo_client(**mongodb_args)

query = {}
collection = "clinics"

df_clinics = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_clinics

Unnamed: 0,clinic_id,name,department,city,state
0,1,UVA Cardiology Center,Cardiology,Charlottesville,VA
1,2,UVA Neurology Center,Neurology,Charlottesville,VA
2,3,UVA Pediatrics,Pediatrics,Charlottesville,VA


#### Get Data from Date Dimension Table

In [7]:
sql_dim_date = "SELECT DateKey, DateValue FROM healthcare_mart.dimdate;"
df_dim_date = get_sql_dataframe(sql_dim_date, **mysql_args)
df_dim_date.DateValue = df_dim_date.DateValue.astype('datetime64[ns]').dt.date
df_dim_date.head()

Unnamed: 0,DateKey,DateValue
0,20250101,2025-01-01
1,20250102,2025-01-02
2,20250103,2025-01-03
3,20250104,2025-01-04
4,20250105,2025-01-05


#### Convert CSV to SQL Table

In [8]:
df_patients = pd.read_csv("data/patients.csv")
set_dataframe(
    df=df_patients,
    table_name="patients_src",
    pk_column="patient_id",
    db_operation="insert",
    **mysql_args
)

In [9]:
sql_patients = "SELECT * FROM healthcare_src.patients_src;"
df_dim_patients = get_sql_dataframe(sql_patients, **mysql_args)
df_dim_patients

Unnamed: 0,patient_id,first_name,last_name,gender,date_of_birth,city,state,insurance_type
0,201,Olivia,Johnson,Female,1985-03-12,Charlottesville,VA,BlueCross PPO
1,202,Liam,Smith,Male,1978-09-22,Richmond,VA,Aetna EPO
2,203,Emma,Williams,Female,1990-11-05,Norfolk,VA,UnitedHealth HMO
3,204,Noah,Brown,Male,1982-07-18,Roanoke,VA,BlueCross PPO
4,205,Ava,Jones,Female,1995-01-29,Charlottesville,VA,UnitedHealth HMO
5,206,Elijah,Garcia,Male,1988-04-09,Virginia Beach,VA,Aetna PPO
6,207,Sophia,Miller,Female,1993-02-15,Newport News,VA,BlueCross PPO
7,208,James,Davis,Male,1980-10-30,Harrisonburg,VA,Aetna EPO
8,209,Isabella,Martinez,Female,1998-06-12,Charlottesville,VA,UnitedHealth HMO
9,210,Lucas,Lopez,Male,1986-05-23,Fredericksburg,VA,BlueCross PPO


#### Populate Healthcare Data Mart

In [10]:
mysql_args_mart = dict(mysql_args)
mysql_args_mart["dbname"] = "healthcare_mart"

In [11]:
# dimdoctor (from MongoDB)
df_dim_doctor = df_doctors.rename(columns={"id": "doctor_id"})[["doctor_id", "first_name", "last_name", "specialty", "department", "years_experience", "job_title"]]
df_dim_doctor["is_senior"] = df_dim_doctor["years_experience"] >= 10
df_dim_doctor = df_dim_doctor[[
    "doctor_id", "first_name", "last_name", "specialty",
    "department", "years_experience", "is_senior", "job_title"
]]

set_dataframe(
    df=df_dim_doctor,
    table_name="dimdoctor",
    pk_column="doctor_id",          
    db_operation="insert",
    **mysql_args_mart
)

# dimclinic (from MongoDB)
df_dim_clinic = df_clinics[["clinic_id", "name", "department", "city", "state"]]
df_dim_clinic["location"] = df_dim_clinic["city"] + ", " + df_dim_clinic["state"] 
df_dim_clinic = df_dim_clinic[["clinic_id","name","department","location"]]

set_dataframe(
    df=df_dim_clinic,
    table_name="dimclinic",
    pk_column="clinic_id",
    db_operation="insert",
    **mysql_args_mart
)

# dimpatient
df_dim_patient = df_dim_patients[[
    "patient_id", "first_name", "last_name", "gender",
    "date_of_birth", "city", "state", "insurance_type"
]]
today = pd.to_datetime("today").normalize()
dob = pd.to_datetime(df_dim_patient["date_of_birth"], errors="coerce")
df_dim_patient["age"] = ((today - dob).dt.days // 365).astype("Int64")

df_dim_patient = df_dim_patient[["patient_id","first_name","last_name","gender",
                                 "age","city","state","insurance_type"]]

set_dataframe(
    df=df_dim_patient,
    table_name="dimpatient",
    pk_column="patient_id",
    db_operation="insert",
    **mysql_args_mart
)

#### Create Fact Appointment Table and Add to Data Mart

In [12]:
sql_appts = "SELECT * FROM healthcare_src.appointments_src;"
df_appts = get_sql_dataframe(sql_appts, **mysql_args)

df_appts["DateKey"] = pd.to_datetime(df_appts["appointment_ts"]).dt.strftime("%Y%m%d").astype(int)

sql_doc_keys = "SELECT doctor_id, department FROM healthcare_mart.dimdoctor;"
df_doc_dept = get_sql_dataframe(sql_doc_keys, **mysql_args_mart)

df_appts = df_appts.merge(df_doc_dept, on="doctor_id", how="left")

sql_clinics = "SELECT clinic_id, department FROM healthcare_mart.dimclinic;"
df_clinic_map = get_sql_dataframe(sql_clinics, **mysql_args_mart)

df_appts = df_appts.merge(df_clinic_map, on="department", how="left")

df_fact_final = df_appts[["DateKey", "doctor_id", "patient_id", "clinic_id", "duration_minutes", "cost_usd"]].copy()

df_fact_final.insert(0, "AppointmentKey", range(1, len(df_fact_final) + 1))

set_dataframe(
    df=df_fact_final,
    table_name="factappointment",
    pk_column="AppointmentKey",
    db_operation="insert",
    **mysql_args_mart
)

#### SQL SELECT Statements (Aggregation)

In [13]:
# total cost and average duration of appointments by department and month
query1 = """
SELECT c.department,
       d.MonthName,
       SUM(f.cost_usd) AS TotalCost,
       AVG(f.duration_minutes) AS AvgDuration
FROM healthcare_mart.FactAppointment f
JOIN healthcare_mart.DimClinic AS c ON f.clinic_id = c.clinic_id
JOIN healthcare_mart.dim_date AS d ON f.DateKey   = d.DateKey
GROUP BY c.department, d.MonthName
ORDER BY d.MonthName, c.department;
"""
df_1 = get_sql_dataframe(query1, **mysql_args_mart)
df_1

Unnamed: 0,department,MonthName,TotalCost,AvgDuration
0,Cardiology,April,260.0,60.0
1,Neurology,February,200.0,45.0
2,Cardiology,January,205.0,25.0
3,Neurology,March,95.0,25.0


In [14]:
# total revenue and number of appointments per doctor specialty and month
query2 = """
SELECT d.specialty,
       dt.MonthName,
       COUNT(f.AppointmentKey) AS NumAppointments,
       SUM(f.cost_usd) AS TotalRevenue
FROM healthcare_mart.factappointment AS f
JOIN healthcare_mart.dimdoctor AS d ON f.doctor_id = d.doctor_id
JOIN healthcare_mart.dim_date AS dt ON f.DateKey = dt.DateKey
GROUP BY d.specialty, dt.MonthName
ORDER BY dt.MonthName, TotalRevenue DESC;
"""
df_2 = get_sql_dataframe(query2, **mysql_args_mart)
df_2

Unnamed: 0,specialty,MonthName,NumAppointments,TotalRevenue
0,Cardiology,April,1,260.0
1,Neurology,February,1,200.0
2,Cardiology,January,2,205.0
3,Orthopedics,March,1,115.0
4,Neurology,March,1,95.0


In [15]:
# average appointment cost and duration by insurance type and department
query3 = """
SELECT p.insurance_type,
       c.department,
       ROUND(AVG(f.cost_usd), 2) AS AvgCost,
       ROUND(AVG(f.duration_minutes), 1) AS AvgDuration,
       COUNT(f.AppointmentKey) AS NumAppointments
FROM healthcare_mart.factappointment AS f
JOIN healthcare_mart.dimpatient AS p ON f.patient_id = p.patient_id
JOIN healthcare_mart.dimclinic AS c ON f.clinic_id = c.clinic_id
GROUP BY p.insurance_type, c.department
ORDER BY p.insurance_type, c.department;
"""
df_3 = get_sql_dataframe(query3, **mysql_args_mart)
df_3

Unnamed: 0,insurance_type,department,AvgCost,AvgDuration,NumAppointments
0,Aetna EPO,Cardiology,85.0,20.0,1
1,Aetna PPO,Cardiology,260.0,60.0,1
2,BlueCross PPO,Cardiology,120.0,30.0,1
3,UnitedHealth HMO,Neurology,147.5,35.0,2
