## DS-2002 Project 2: Healthcare Data Lakehouse with PySpark

### Overview
This notebook implements a multi-source data integration pipeline using PySpark and the Bronze-Silver-Gold medallion architecture. The pipeline processes healthcare appointment data from multiple sources and creates a dimensional data mart optimized for analytical queries.

### Data Sources
1. **MongoDB Atlas** - Doctors and Clinics (NoSQL)
2. **MySQL** - Patients and Date Dimension (Relational)
3. **JSON Files** - Appointments in 3 batches (File System)

### Architecture
- **Bronze Layer:** Raw appointment data from JSON batches
- **Silver Layer:** Enriched appointments joined with dimension tables
- **Gold Layer:** Final fact table with surrogate keys

### Technologies
- PySpark for distributed data processing
- MongoDB for NoSQL data storage
- MySQL for dimensional data mart
- Pandas for data manipulation
- SQLAlchemy for database connections

### Business Value
The data mart enables analysis of:
- Revenue and appointment trends by department
- Doctor performance metrics
- Monthly appointment patterns

#### Setup: Create Directory Structure and Split Appointments into Batches

In [1]:
import os

base_dir = os.path.join(os.getcwd(), 'project_data')
data_dir = os.path.join(base_dir, 'healthcare')
stream_dir = os.path.join(data_dir, 'streaming')
appointments_stream_dir = os.path.join(stream_dir, 'appointments')

os.makedirs(appointments_stream_dir, exist_ok=True)

In [2]:
# split appointments into 3 JSON batches
import pandas as pd
from sqlalchemy import create_engine, text
from dotenv import load_dotenv
from urllib.parse import quote_plus

load_dotenv()

mysql_args = {
    "uid": "root",
    "pwd": quote_plus(os.getenv("MYSQL_PWD")),
    "hostname": "localhost",
    "dbname": "healthcare_src"
}

def get_sql_dataframe(sql_query, **args):
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(text(sql_query), connection)
    connection.close()
    return dframe

sql_appts = "SELECT * FROM healthcare_src.appointments_src;"
df_appointments = get_sql_dataframe(sql_appts, **mysql_args)

batch_1 = df_appointments.iloc[0:2].copy()
batch_2 = df_appointments.iloc[2:4].copy()
batch_3 = df_appointments.iloc[4:6].copy()

for batch in [batch_1, batch_2, batch_3]:
    batch['appointment_ts'] = batch['appointment_ts'].astype(str)

batch_1.to_json(os.path.join(appointments_stream_dir, 'appointments_batch_1.json'), orient='records', lines=True)
batch_2.to_json(os.path.join(appointments_stream_dir, 'appointments_batch_2.json'), orient='records', lines=True)
batch_3.to_json(os.path.join(appointments_stream_dir, 'appointments_batch_3.json'), orient='records', lines=True)

#### Import Libraries and Configure PySpark Session

In [3]:
import findspark
findspark.init()
print(findspark.find())

import certifi
import pymongo
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window as W

C:\spark-4.5.4-bin-hadoop3\spark-3.5.4-bin-hadoop3


In [4]:
# set configuration variables
mongodb_args = {
    "user_name": os.getenv("MONGODB_USERNAME"),
    "password": os.getenv("MONGODB_PWD"),
    "cluster_name": "Cluster0",
    "cluster_subnet": "egia6sc",
    "cluster_location": "atlas",
    "db_name": "healthcare"
}

mysql_mart_conn_args = {
    "uid": "root",
    "pwd": quote_plus(os.getenv("MYSQL_PWD")),
    "hostname": "localhost",
    "dbname": "healthcare_mart"
}

dest_database = "healthcare_dlh"
sql_warehouse_dir = os.path.abspath('spark-warehouse')
dest_database_dir = f"{dest_database}.db"
database_dir = os.path.join(sql_warehouse_dir, dest_database_dir)

appointments_output_bronze = os.path.join(database_dir, 'fact_appointments', 'bronze')
appointments_output_silver = os.path.join(database_dir, 'fact_appointments', 'silver')
appointments_output_gold = os.path.join(database_dir, 'fact_appointments', 'gold')

In [5]:
# create Spark Session
worker_threads = f"local[{int(os.cpu_count()/2)}]"
shuffle_partitions = int(os.cpu_count())

spark = SparkSession.builder \
    .appName('Healthcare Streaming Data Lakehouse') \
    .master(worker_threads) \
    .config('spark.driver.memory', '4g') \
    .config('spark.executor.memory', '2g') \
    .config('spark.sql.shuffle.partitions', shuffle_partitions) \
    .config('spark.sql.warehouse.dir', sql_warehouse_dir) \
    .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.33") \
    .getOrCreate()

#### Extract: Load Dimension Tables from Multiple Sources (MongoDB & MySQL)

In [6]:
# mongoDB helper functions
def get_mongo_client(**args):
    if args["cluster_location"] == "atlas":
        connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
        client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
    return client

def get_mongo_dataframe(mongo_client, db_name, collection, query):
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    return dframe

In [7]:
# load doctors and clinics
client = get_mongo_client(**mongodb_args)
df_doctors_pandas = get_mongo_dataframe(client, mongodb_args["db_name"], "doctors", {})
df_dim_doctor = spark.createDataFrame(df_doctors_pandas) \
    .withColumnRenamed("id", "doctor_id") \
    .select("doctor_id", "first_name", "last_name", "specialty", "department", "years_experience", "job_title") \
    .withColumn("is_senior", col("years_experience") >= 10)

client = get_mongo_client(**mongodb_args)
df_clinics_pandas = get_mongo_dataframe(client, mongodb_args["db_name"], "clinics", {})
df_dim_clinic = spark.createDataFrame(df_clinics_pandas) \
    .select("clinic_id", "name", "department", "city", "state") \
    .withColumn("location", concat_ws(", ", col("city"), col("state"))) \
    .select("clinic_id", "name", "department", "location")

In [8]:
# load patients and date dimension
sql_patients = "SELECT * FROM healthcare_mart.dimpatient;"
df_patients_pandas = get_sql_dataframe(sql_patients, **mysql_mart_conn_args)
df_dim_patient = spark.createDataFrame(df_patients_pandas)

sql_date = "SELECT DateKey, DateValue FROM healthcare_mart.dim_date;"
df_date_pandas = get_sql_dataframe(sql_date, **mysql_mart_conn_args)
df_dim_date = spark.createDataFrame(df_date_pandas)

#### Bronze Layer: (Raw Data Ingestion) Ingest appointment JSON batches as 3 micro-batches

In [9]:
appointments_schema = StructType([
    StructField("appointment_id", IntegerType(), True),
    StructField("doctor_id", IntegerType(), True),
    StructField("patient_id", IntegerType(), True),
    StructField("room_id", IntegerType(), True),
    StructField("insurance_id", IntegerType(), True),
    StructField("appointment_ts", StringType(), True),
    StructField("visit_type", StringType(), True),
    StructField("duration_minutes", IntegerType(), True),
    StructField("cost_usd", DecimalType(10, 2), True)
])


checkpoint_dir = os.path.join(database_dir, "_checkpoints", "appointments_bronze")
bronze_stream_out = appointments_output_bronze 

df_appointments_stream = (
    spark.readStream
        .schema(appointments_schema)
        .option("maxFilesPerTrigger", 1)  # 1 file per interval
        .json(appointments_stream_dir)
)

# write Bronze as parquet
bronze_query = (
    df_appointments_stream.writeStream
        .format("parquet")
        .option("path", bronze_stream_out)
        .option("checkpointLocation", checkpoint_dir)
        .trigger(availableNow=True) 
        .outputMode("append")
        .start()
)

bronze_query.awaitTermination()

df_appointments_all = spark.read.parquet(bronze_stream_out)

print(f"Bronze accumulated rows: {df_appointments_all.count()}")


Bronze accumulated rows: 6


#### Silver Layer (Cleaned & Validated Data): Transform and Join with Dimension Tables

In [10]:
# join appointments with dimensions
df_appointments_silver = df_appointments_all \
    .withColumn("appointment_ts", to_timestamp(col("appointment_ts"))) \
    .withColumn("DateKey", date_format(col("appointment_ts"), "yyyyMMdd").cast("int")) \
    .join(df_dim_doctor.select("doctor_id", "department"), "doctor_id", "left") \
    .join(df_dim_clinic.select("clinic_id", "department"), "department", "left") \
    .select("appointment_id", "DateKey", "doctor_id", "patient_id", "clinic_id", 
            "duration_minutes", "cost_usd")

#### Gold Layer (Analytics-Ready Data): Create Final Fact Table

In [None]:
# create Gold layer with window function
windowSpec = W.orderBy("appointment_id")
df_appointments_gold = df_appointments_silver \
    .withColumn("AppointmentKey", row_number().over(windowSpec)) \
    .select("AppointmentKey", "DateKey", "doctor_id", "patient_id", "clinic_id", "duration_minutes", "cost_usd")

# create temporary view (works with batch DataFrames)
df_appointments_gold.createOrReplaceTempView("gold_appointments_temp")

In [12]:
# get the base appointment data as pandas
import json
all_appointments = []
for i in range(1, 4):
    batch_file = os.path.join(appointments_stream_dir, f'appointments_batch_{i}.json')
    with open(batch_file, 'r') as f:
        for line in f:
            all_appointments.append(json.loads(line))

df_appts = pd.DataFrame(all_appointments)

# convert timestamp and create DateKey
df_appts['appointment_ts'] = pd.to_datetime(df_appts['appointment_ts'])
df_appts['DateKey'] = df_appts['appointment_ts'].dt.strftime('%Y%m%d').astype(int)

# join with doctors
df_appts = df_appts.merge(
    df_doctors_pandas[['id', 'department']].rename(columns={'id': 'doctor_id'}),
    on='doctor_id',
    how='left'
)

# join with clinics 
df_appts = df_appts.merge(
    df_clinics_pandas[['clinic_id', 'department']],
    on='department',
    how='left'
)

# create Gold fact table
df_gold_pandas = df_appts[['appointment_id', 'DateKey', 'doctor_id', 'patient_id', 
                            'clinic_id', 'duration_minutes', 'cost_usd']].copy()
df_gold_pandas.insert(0, 'AppointmentKey', range(1, len(df_gold_pandas) + 1))

print(f"Gold Table: {len(df_gold_pandas)} records")

# write to MySQL
conn_str = f"mysql+pymysql://{mysql_mart_conn_args['uid']}:{mysql_mart_conn_args['pwd']}@{mysql_mart_conn_args['hostname']}/{mysql_mart_conn_args['dbname']}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect()

df_gold_pandas.to_sql("factappointment_v2", con=connection, index=False, if_exists='replace')

from sqlalchemy import text
connection.execute(text("ALTER TABLE factappointment_v2 ADD PRIMARY KEY (AppointmentKey);"))
connection.close()

df_gold_pandas

Gold Table: 6 records


Unnamed: 0,AppointmentKey,appointment_id,DateKey,doctor_id,patient_id,clinic_id,duration_minutes,cost_usd
0,1,1,20250103,101,201,1.0,30,120.0
1,2,2,20250103,101,202,1.0,20,85.0
2,3,3,20250214,102,203,2.0,45,200.0
3,4,4,20250301,103,204,,30,115.0
4,5,5,20250315,102,205,2.0,25,95.0
5,6,6,20250422,101,206,1.0,60,260.0


### Analytical Queries

In [13]:
# Query 1: Total revenue and appointments by department
query1 = """
SELECT 
    c.department,
    COUNT(f.AppointmentKey) AS NumAppointments,
    SUM(f.cost_usd) AS TotalRevenue,
    AVG(f.cost_usd) AS AvgCost,
    AVG(f.duration_minutes) AS AvgDuration
FROM healthcare_mart.factappointment_v2 f
JOIN healthcare_mart.dimclinic c ON f.clinic_id = c.clinic_id
GROUP BY c.department
ORDER BY TotalRevenue DESC;
"""

df_q1 = get_sql_dataframe(query1, **mysql_mart_conn_args)
print("REVENUE AND APPOINTMENTS BY DEPARTMENT:")
print(df_q1)

REVENUE AND APPOINTMENTS BY DEPARTMENT:
   department  NumAppointments  TotalRevenue  AvgCost  AvgDuration
0  Cardiology                3         465.0    155.0      36.6667
1   Neurology                2         295.0    147.5      35.0000


In [14]:
# Query 2: Doctor performance metrics
query2 = """
SELECT 
    d.first_name,
    d.last_name,
    d.specialty,
    d.is_senior,
    COUNT(f.AppointmentKey) AS NumAppointments,
    SUM(f.cost_usd) AS TotalRevenue,
    AVG(f.duration_minutes) AS AvgDuration
FROM healthcare_mart.factappointment_v2 f
JOIN healthcare_mart.dimdoctor d ON f.doctor_id = d.doctor_id
GROUP BY d.first_name, d.last_name, d.specialty, d.is_senior
ORDER BY TotalRevenue DESC;
"""

df_q2 = get_sql_dataframe(query2, **mysql_mart_conn_args)
print("DOCTOR PERFORMANCE METRICS:")
print(df_q2)

DOCTOR PERFORMANCE METRICS:
  first_name last_name    specialty  is_senior  NumAppointments  TotalRevenue  \
0      Alice     Patel   Cardiology          1                3         465.0   
1   Benjamin       Lee    Neurology          0                2         295.0   
2      Carla     Gomez  Orthopedics          1                1         115.0   

   AvgDuration  
0      36.6667  
1      35.0000  
2      30.0000  


In [15]:
# Query 3: Monthly appointment trends
query3 = """
SELECT 
    dt.MonthName,
    dt.YearNum,
    COUNT(f.AppointmentKey) AS NumAppointments,
    SUM(f.cost_usd) AS TotalRevenue,
    AVG(f.cost_usd) AS AvgCost
FROM healthcare_mart.factappointment_v2 f
JOIN healthcare_mart.dim_date dt ON f.DateKey = dt.DateKey
GROUP BY dt.MonthName, dt.YearNum, dt.MonthNum
ORDER BY dt.YearNum, dt.MonthNum;
"""

df_q3 = get_sql_dataframe(query3, **mysql_mart_conn_args)
print("MONTHLY APPOINTMENT TRENDS:")
print(df_q3)

MONTHLY APPOINTMENT TRENDS:
  MonthName  YearNum  NumAppointments  TotalRevenue  AvgCost
0   January     2025                2         205.0    102.5
1  February     2025                1         200.0    200.0
2     March     2025                2         210.0    105.0
3     April     2025                1         260.0    260.0
