In [2]:
# Install necessary libraries
!pip install pyspark pandas numpy

# Import required libraries
import pandas as pd
import numpy as np
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, to_date

# Generate Sample CSV Data for hospitals
def generate_sample_csv():
    data_1 = {
        "PatientID": [101, 102, 103, 104, 105],
        "Name": ["Alice", "Bob", "Charlie", "David", "Eve"],
        "Age": [25, 40, np.nan, 55, 30],
        "Diagnosis": ["Flu", "COVID-19", "Malaria", np.nan, "Cold"],
        "Doctor": ["Dr. A", "Dr. B", "Dr. C", "Dr. D", np.nan],
        "AdmissionDate": ["2025-03-01", "2025-03-02", "2025-03-03", "2025-03-04", "2025-03-05"],
        "DischargeDate": ["2025-03-05", "2025-03-07", "2025-03-10", "2025-03-12", np.nan]
    }

    data_2 = {
        "PatientID": [106, 107, 108, 109, 110],
        "Name": ["Frank", "Grace", "Hannah", "Ian", "Jack"],
        "Age": [45, np.nan, 38, 60, 22],
        "Diagnosis": ["Dengue", "Typhoid", np.nan, "COVID-19", "Flu"],
        "Doctor": ["Dr. E", "Dr. F", "Dr. G", np.nan, "Dr. H"],
        "AdmissionDate": ["2025-03-06", "2025-03-08", "2025-03-10", "2025-03-11", "2025-03-12"],
        "DischargeDate": ["2025-03-09", "2025-03-14", np.nan, "2025-03-18", "2025-03-20"]
    }

    df1 = pd.DataFrame(data_1)
    df2 = pd.DataFrame(data_2)

    df1.to_csv("hospital_1.csv", index=False)
    df2.to_csv("hospital_2.csv", index=False)

# Generate the CSV files
generate_sample_csv()

# Initialize Spark session
spark = SparkSession.builder.appName("PatientDataETL").getOrCreate()

# List of generated CSV files
hospital_files = ["hospital_1.csv", "hospital_2.csv"]

# Function to read and merge multiple CSVs
def extract_data(files):
    dfs = [spark.read.csv(file, header=True, inferSchema=True) for file in files]
    merged_df = dfs[0]
    for df in dfs[1:]:
        merged_df = merged_df.union(df)
    return merged_df

# Extracting data
raw_patient_data = extract_data(hospital_files)

# Transform: Handling missing values
def clean_data(df):
    df = df.dropDuplicates()  # Remove duplicates
    df = df.fillna({"Age": 0, "Diagnosis": "Unknown", "Doctor": "Not Assigned"})  # Fill missing values
    df = df.withColumn("AdmissionDate", to_date(col("AdmissionDate"), "yyyy-MM-dd"))  # Standardize date format
    df = df.withColumn("DischargeDate", to_date(col("DischargeDate"), "yyyy-MM-dd"))
    return df

# Cleaning the extracted data
cleaned_data = clean_data(raw_patient_data)

# Load: Save the cleaned data to a centralized CSV file
cleaned_data.write.csv("cleaned_patient_data.csv", header=True, mode="overwrite")

# Display cleaned data
cleaned_data.show()


+---------+-------+----+---------+------------+-------------+-------------+
|PatientID|   Name| Age|Diagnosis|      Doctor|AdmissionDate|DischargeDate|
+---------+-------+----+---------+------------+-------------+-------------+
|      102|    Bob|40.0| COVID-19|       Dr. B|   2025-03-02|   2025-03-07|
|      101|  Alice|25.0|      Flu|       Dr. A|   2025-03-01|   2025-03-05|
|      110|   Jack|22.0|      Flu|       Dr. H|   2025-03-12|   2025-03-20|
|      106|  Frank|45.0|   Dengue|       Dr. E|   2025-03-06|   2025-03-09|
|      104|  David|55.0|  Unknown|       Dr. D|   2025-03-04|   2025-03-12|
|      103|Charlie| 0.0|  Malaria|       Dr. C|   2025-03-03|   2025-03-10|
|      105|    Eve|30.0|     Cold|Not Assigned|   2025-03-05|         NULL|
|      108| Hannah|38.0|  Unknown|       Dr. G|   2025-03-10|         NULL|
|      109|    Ian|60.0| COVID-19|Not Assigned|   2025-03-11|   2025-03-18|
|      107|  Grace| 0.0|  Typhoid|       Dr. F|   2025-03-08|   2025-03-14|
+---------+-