###                                                                  Connect and Read Clean Hospital Data Records
                                                          


In [0]:
spark



<pyspark.sql.connect.session.SparkSession at 0x7f547c3ca3f0>

In [0]:
dbutils.library.restartPython()


In [0]:
spark.conf.set(
    "fs.azure.account.key.cleanrecords.dfs.core.windows.net",
    "<your-storage-account-key>"
)


In [0]:
## Display storage path, name and size

display(dbutils.fs.ls("abfss://clean-records-container@cleanrecords.dfs.core.windows.net/"))


path,name,size,modificationTime
abfss://clean-records-container@cleanrecords.dfs.core.windows.net/Cleaned_Hospital_Data,Cleaned_Hospital_Data,181875,1762354567000


In [0]:
## Read Patient Records Data

PatientData = spark.read.parquet(
    "abfss://clean-records-container@cleanrecords.dfs.core.windows.net/Cleaned_Hospital_Data"
)


### Quality Check

In [0]:
## Check if any columns have special characters that need to be removed

from functools import reduce
from pyspark.sql.functions import col

columns_to_check = ["PatientLastName", "PatientFirstName", "PatientMaiden"]
pattern = "[^a-zA-Z0-9 '_]"

# Build a combined OR condition

filter_condition = reduce(lambda a, b: a|b, [col(c).rlike(pattern) for c in columns_to_check])
special_char = PatientData.filter(filter_condition)

Columns_with_SpecialChar = special_char.select(columns_to_check)
Columns_with_SpecialChar.show(50)


+---------------+----------------+-------------+
|PatientLastName|PatientFirstName|PatientMaiden|
+---------------+----------------+-------------+
|        Delr‚àö‚â†o|           Juana|     Sandoval|
|        Delr‚àö‚â†o|           Juana|     Sandoval|
|        Delr‚àö‚â†o|           Juana|     Sandoval|
|        Delr‚àö‚â†o|           Juana|     Sandoval|
|        Delr‚àö‚â†o|           Juana|     Sandoval|
|        Delr‚àö‚â†o|           Juana|     Sandoval|
|        Delr‚àö‚â†o|           Juana|     Sandoval|
|          Ponce|            NULL|      Far‚àö‚â†as|
|      De Jes‚àö‚à´s|          Ramiro|         NULL|
|         Griego|   Miguel ‚àö√•ngel|         NULL|
|         Griego|   Miguel ‚àö√•ngel|         NULL|
|         Griego|   Miguel ‚àö√•ngel|         NULL|
|         Griego|   Miguel ‚àö√•ngel|         NULL|
|     Delgadillo|      Jos Mar‚àö‚â†a|         NULL|
|     Delgadillo|      Jos Mar‚àö‚â†a|         NULL|
|         Mar‚àö‚â†n|      Jos Emilio|         NULL|
|        

In [0]:
from pyspark.sql.functions import regexp_replace, col, trim

## Remove all special characters from the columns

pattern1 = "[^a-zA-Z0-9 '_]" 

columns_to_clean = ["PatientLastName", "PatientFirstName", "PatientMaiden"]

for c in columns_to_clean:
    PatientData = PatientData.withColumn(c, trim(regexp_replace(col(c), pattern, "")))





In [0]:
## Check if special characters were removed from columns

columns_to_check = ["PatientLastName", "PatientFirstName", "PatientMaiden"]
pattern = "[^a-zA-Z0-9 '_]"

# Build a combined OR condition

filter_condition = reduce(lambda a, b: a|b, [col(c).rlike(pattern) for c in columns_to_check])
special_char = PatientData.filter(filter_condition)

Columns_with_SpecialChar = special_char.select(columns_to_check)
Columns_with_SpecialChar.show(100)


+---------------+----------------+-------------+
|PatientLastName|PatientFirstName|PatientMaiden|
+---------------+----------------+-------------+
+---------------+----------------+-------------+



In [0]:
## Accuracy checks for data types and format validation

PatientData.printSchema()


root
 |-- PatientID: string (nullable = true)
 |-- PatientBirthday: date (nullable = true)
 |-- PatientDeathdate: date (nullable = true)
 |-- PatientPrefix: string (nullable = true)
 |-- PatientSuffix: string (nullable = true)
 |-- PatientMaiden: string (nullable = true)
 |-- PatientMarital: string (nullable = true)
 |-- PatientRace: string (nullable = true)
 |-- PatientEthnicity: string (nullable = true)
 |-- PatientGender: string (nullable = true)
 |-- PatientBirthplace: string (nullable = true)
 |-- PatientAddress: string (nullable = true)
 |-- PatientCity: string (nullable = true)
 |-- PatientState: string (nullable = true)
 |-- PatientCounty: string (nullable = true)
 |-- PatientZip: string (nullable = true)
 |-- PatientFirstName: string (nullable = true)
 |-- PatientLastName: string (nullable = true)
 |-- EncountersID: string (nullable = true)
 |-- EncounterStartTime: timestamp (nullable = true)
 |-- EncounterStopTime: timestamp (nullable = true)
 |-- PatientEncounterID: string (

In [0]:
## Get the total # of columns and rows

print(f" Number of rows : {PatientData.count()}")
print(f"Number of columns : {len(PatientData.columns)}")

 Number of rows : 10196
Number of columns : 48


In [0]:
## Count the # of null values for each column

from pyspark.sql import functions as F
null_counts_df = PatientData.select(*[F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in ["PatientID", "ID","PatientFirstName","PatientLastName", "PatientProcedureID","Payer", "PatientEncounterID", "PatientBirthday"]])

null_counts_df.show()

+---------+---+----------------+---------------+------------------+-----+------------------+---------------+
|PatientID| ID|PatientFirstName|PatientLastName|PatientProcedureID|Payer|PatientEncounterID|PatientBirthday|
+---------+---+----------------+---------------+------------------+-----+------------------+---------------+
|       26|783|              59|            269|              1028|  783|               783|            113|
+---------+---+----------------+---------------+------------------+-----+------------------+---------------+



In [0]:
## find rows where PatientID is missing and at least one demographic field is missing

from pyspark.sql.functions import col, isnull, trim

# helper: treat blanks and nulls as missing
def is_blank(c):
    return isnull(col(c)) | (trim(col(c)) == "")

# Condition: PatientID missing AND at least one demographic field missing
cond = (
    is_blank("PatientID") &
    (
        is_blank("PatientFirstName") |
        is_blank("PatientLastName") |
        isnull(col("PatientBirthday"))
    )
)

# Filter and return those rows
PatientData_missingID = PatientData.filter(cond) \
    .select("PatientID", "PatientFirstName", "PatientLastName", "PatientBirthday")

display(PatientData_missingID)




PatientID,PatientFirstName,PatientLastName,PatientBirthday
,,,
,,,
,,,
,,,
,,,
,,,
,,,
,,,
,,,
,,,


In [0]:
## Drop rows that have null values for PatientID

PatientDataClean = PatientData.dropna(subset=["PatientID"])



In [0]:
before = PatientData.count()
after = PatientDataClean.count()

print(f"Before cleaning: {before:,} rows")
print(f"After cleaning:  {after:,} rows")
print(f"Rows removed:    {before - after:,}")


Before cleaning: 10,196 rows
After cleaning:  10,170 rows
Rows removed:    26


In [0]:
## Check for outliers for columns with numeric data types

from pyspark.sql.functions import col
from pyspark.sql import DataFrame

def detect_outliers_iqr(PatientDataClean, columns: list, factor: float = 1.5):
    """
    Detect outliers for multiple numeric columns in a PySpark DataFrame using IQR.
    
    Args:
        df (DataFrame): Input PySpark DataFrame
        columns (list): List of numeric column names
        factor (float): Multiplier for IQR (default 1.5)
    
    Returns:
        dict: Dictionary where keys are column names and values are DataFrames of outliers
    """
    outlier_PatientDataClean = {}
    
    for col_name in columns:
        # Compute Q1 and Q3
        q1, q3 = PatientDataClean.approxQuantile(col_name, [0.25, 0.75], 0.05)
        iqr = q3 - q1

        lower_bound = q1 - factor * iqr
        upper_bound = q3 + factor * iqr

        # Filter outliers for this column
        outlier_PatientDataClean[col_name] = PatientDataClean.filter((col(col_name) < lower_bound) | (col(col_name) > upper_bound))

        print(f"‚úÖ Column: {col_name}")
        print(f"   Q1={q1}, Q3={q3}, IQR={iqr}")
        print(f"   Lower={lower_bound}, Upper={upper_bound}")
        print(f"   Outlier count: {outlier_PatientDataClean[col_name].count()}\n")

    return outlier_PatientDataClean
detect_outliers_iqr(PatientDataClean, ["Base_Encounter_Cost","Total_Claim_Cost", "Payer_Coverage", "ProcedureBaseCost"]).show()


‚úÖ Column: Base_Encounter_Cost
   Q1=85.55, Q3=142.58, IQR=57.030000000000015
   Lower=0.004999999999981242, Upper=228.12500000000003
   Outlier count: 0

‚úÖ Column: Total_Claim_Cost
   Q1=142.58, Q3=1182.42, IQR=1039.8400000000001
   Lower=-1417.1800000000003, Upper=2742.1800000000003
   Outlier count: 1692

‚úÖ Column: Payer_Coverage
   Q1=0.0, Q3=155.77, IQR=155.77
   Lower=-233.65500000000003, Upper=389.42500000000007
   Outlier count: 1622

‚úÖ Column: ProcedureBaseCost
   Q1=431.0, Q3=781.0, IQR=350.0
   Lower=-94.0, Upper=1306.0
   Outlier count: 1175



[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
File [0;32m<command-5837991487584938>, line 37[0m
[1;32m     34[0m         [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;124m   Outlier count: [39m[38;5;132;01m{[39;00moutlier_PatientDataClean[col_name][38;5;241m.[39mcount()[38;5;132;01m}[39;00m[38;5;130;01m\n[39;00m[38;5;124m"[39m)
[1;32m     36[0m     [38;5;28;01mreturn[39;00m outlier_PatientDataClean
[0;32m---> 37[0m detect_outliers_iqr(PatientDataClean, [[38;5;124m"[39m[38;5;124mBase_Encounter_Cost[39m[38;5;124m"[39m,[38;5;124m"[39m[38;5;124mTotal_Claim_Cost[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mPayer_Coverage[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mProcedureBaseCost[39m[38;5;124m"[39m])[38;5;241m.[39mshow()

[0;31mAttributeError[0m: 'dict' object has no attribute 'show'

In [0]:
## Check the columns that do have outliers to see if they make sense

from pyspark.sql.functions import col

# Numeric columns you want to inspect
numeric_cols = ["Total_Claim_Cost", "Payer_Coverage", "ProcedureBaseCost"]

# Descriptive columns to include for context
context_cols = ["ReasonDescription", "ProcedureDescription"]

# Loop through each numeric column and display top 10
for c in numeric_cols:
    print(f"\nüîº Top 10 rows for column: {c}")
    PatientDataClean \
        .select(*(context_cols + numeric_cols)) \
        .filter(col(c).isNotNull()) \
        .orderBy(col(c).desc()) \
        .show(10, truncate=False)



üîº Top 10 rows for column: Total_Claim_Cost
+-----------------+-------------------------------------------------------------------------------+----------------+--------------+-----------------+
|ReasonDescription|ProcedureDescription                                                           |Total_Claim_Cost|Payer_Coverage|ProcedureBaseCost|
+-----------------+-------------------------------------------------------------------------------+----------------+--------------+-----------------+
|Normal Pregnancy |Assessment Of Health And Social Care Needs Procedure                           |63714.32        |0.00          |431.00           |
|Normal Pregnancy |Assessment Of Substance Use Procedure                                          |63714.32        |0.00          |431.00           |
|Normal Pregnancy |Screening For Domestic Abuse Procedure                                         |63714.32        |0.00          |431.00           |
|Normal Pregnancy |Colonoscopy                       

In [0]:
## Check if any of the procedure or encounter stop times are before the start time

invalid_times = PatientDataClean.filter(
    (col("ProcedureStopTime") < col("ProcedureStartTime")) |
    (col("EncounterStopTime") < col("EncounterStartTime"))
)

invalid_times.select(
    "ReasonDescription",
    "Description",
    "ProcedureStartTime",
    "ProcedureStopTime",
    "EncounterStartTime",
    "EncounterStopTime"
).show(10, truncate=False)



+-----------------+-----------+------------------+-----------------+------------------+-----------------+
|ReasonDescription|Description|ProcedureStartTime|ProcedureStopTime|EncounterStartTime|EncounterStopTime|
+-----------------+-----------+------------------+-----------------+------------------+-----------------+
+-----------------+-----------+------------------+-----------------+------------------+-----------------+



In [0]:
## check if there are any outliers for the procedure datetime and ecounters datetime

from pyspark.sql.functions import unix_timestamp, round

PatientDataDurations = PatientDataClean.withColumn(
    "ProcedureDuration_Hours",
    round((unix_timestamp(col("ProcedureStopTime")) - unix_timestamp(col("ProcedureStartTime"))) / 3600, 2)
).withColumn(
    "EncounterDuration_Hours",
    round((unix_timestamp(col("EncounterStopTime")) - unix_timestamp(col("EncounterStartTime"))) / 3600, 2)
)

def detect_outliers_iqr(df, column):
    q1, q3 = df.approxQuantile(column, [0.25, 0.75], 0.05)
    iqr = q3 - q1
    lower = q1 - 1.5 * iqr
    upper = q3 + 1.5 * iqr

    outliers = df.filter((col(column) < lower) | (col(column) > upper))
    print(f"‚úÖ {column}: Q1={q1}, Q3={q3}, IQR={iqr}, Lower={lower}, Upper={upper}, Outliers={outliers.count()}")
    return outliers

# Apply to both durations
procedure_outliers = detect_outliers_iqr(PatientDataDurations, "ProcedureDuration_Hours")
encounter_outliers = detect_outliers_iqr(PatientDataDurations, "EncounterDuration_Hours")



‚úÖ ProcedureDuration_Hours: Q1=0.25, Q3=2.05, IQR=1.7999999999999998, Lower=-2.4499999999999997, Upper=4.75, Outliers=0
‚úÖ EncounterDuration_Hours: Q1=0.25, Q3=0.25, IQR=0.0, Lower=0.25, Upper=0.25, Outliers=2507


In [0]:
## View the outliers

procedure_outliers.select(
    "ReasonDescription", "Description",
    "ProcedureStartTime", "ProcedureStopTime", "ProcedureDuration_Hours"
).orderBy(col("ProcedureDuration_Hours").desc()).show(10, truncate=False)

encounter_outliers.select(
    "ReasonDescription", "Description",
    "EncounterStartTime", "EncounterStopTime", "EncounterDuration_Hours"
).orderBy(col("EncounterDuration_Hours").desc()).show(10, truncate=False)



+-----------------+-----------+------------------+-----------------+-----------------------+
|ReasonDescription|Description|ProcedureStartTime|ProcedureStopTime|ProcedureDuration_Hours|
+-----------------+-----------+------------------+-----------------+-----------------------+
+-----------------+-----------+------------------+-----------------+-----------------------+

+-------------------------------------+-------------------------------------+-------------------+-------------------+-----------------------+
|ReasonDescription                    |Description                          |EncounterStartTime |EncounterStopTime  |EncounterDuration_Hours|
+-------------------------------------+-------------------------------------+-------------------+-------------------+-----------------------+
|NULL                                 |NULL                                 |2013-12-03 19:23:12|2013-12-04 19:23:12|24.0                   |
|NULL                                 |NULL                

### **Analysis**

In [0]:
## What are the top 5 most common patient encounters Greenville Hospital recieve

from pyspark.sql.functions import count, desc, col, trim

# Filter out rows where Description is null or blank
PatientDataClean_nonull = PatientDataClean.filter(
    (col("Description").isNotNull()) & (trim(col("Description")) != "")
)

# Group, count, and sort
top_reasons_df = (
    PatientDataClean_nonull.groupBy("Description")
    .agg(count("*").alias("Encounter_Count"))
    .sort(desc("Encounter_Count"))
    .limit(5)
)

# Show full text in columns (no truncation)
top_reasons_df.show(truncate=False)


+----------------------------------------+---------------+
|Description                             |Encounter_Count|
+----------------------------------------+---------------+
|Encounter For Problem Procedure         |1446           |
|Urgent Care Clinic Procedure            |1132           |
|Encounter For Check Up Procedure        |1130           |
|General Examination Of Patient Procedure|981            |
|Telemedicine Consultation With Patient  |680            |
+----------------------------------------+---------------+



In [0]:
## What is the average duration of the top 10 most common patient encounters Greenville Hospital recieve

from pyspark.sql.functions import count, desc, col, trim, avg, unix_timestamp

# 1Ô∏è‚É£ Filter out rows where Description is null or blank
PatientDataClean_nonull = PatientDataClean.filter(
    (col("Description").isNotNull()) & (trim(col("Description")) != "")
)

# 2Ô∏è‚É£ Compute top 10 most frequent encounter reasons
top_reasons_df = (
    PatientDataClean_nonull.groupBy("Description")
    .agg(count("*").alias("Encounter_Count"))
    .sort(desc("Encounter_Count"))
    .limit(10)
)

# 3Ô∏è‚É£ Join back to the main dataset to calculate average encounter duration
# Convert start/stop time columns to timestamps and compute duration in minutes
PatientData_with_duration = (
    PatientDataClean_nonull
    .withColumn("EncounterStartTimeTS", unix_timestamp(col("EncounterStartTime")))
    .withColumn("EncounterStopTimeTS", unix_timestamp(col("EncounterStopTime")))
    .withColumn("Encounter_Duration_Minutes", 
                (col("EncounterStopTimeTS") - col("EncounterStartTimeTS")) / 60)
)

# 4Ô∏è‚É£ Filter to only include the top 10 encounter reasons
PatientData_top10 = PatientData_with_duration.join(
    top_reasons_df.select("Description"),
    on="Description",
    how="inner"
)

# 5Ô∏è‚É£ Compute average encounter duration per description
avg_duration_df = (
    PatientData_top10.groupBy("Description")
    .agg(
        count("*").alias("Encounter_Count"),
        avg(col("Encounter_Duration_Minutes")).alias("Avg_Encounter_Duration_Minutes")
    )
    .sort(desc("Encounter_Count"))
)

# 6Ô∏è‚É£ Display the full results
avg_duration_df.show(truncate=False)


+----------------------------------------+---------------+------------------------------+
|Description                             |Encounter_Count|Avg_Encounter_Duration_Minutes|
+----------------------------------------+---------------+------------------------------+
|Encounter For Problem Procedure         |1446           |102.46304163126592            |
|Urgent Care Clinic Procedure            |1132           |11.996221662468514            |
|Encounter For Check Up Procedure        |1130           |13.88888888888889             |
|General Examination Of Patient Procedure|981            |14.59119496855346             |
|Telemedicine Consultation With Patient  |680            |12.90566037735849             |
|Followup Encounter                      |529            |14.948096885813149            |
|Patient Encounter Procedure Procedure   |496            |53.1130896226414              |
|Encounter For Symptom                   |378            |15.303378378378408            |
|Encounter

In [0]:

## Retrieve each encounter classification by the most # count of visits to the least

from pyspark.sql.functions import col, count, trim, desc

# 1Ô∏è‚É£ Filter for 'Noinsurance' encounters only
noinsurance_df = PatientDataClean.filter(
    (col("InsuranceName").isNotNull()) &
    (trim(col("InsuranceName")).isin("Noinsurance", "NOINSURANCE", "noinsurance")) &
    (col("EncounterClass").isNotNull()) &
    (trim(col("EncounterClass")) != "")
)

# 2Ô∏è‚É£ Group by EncounterClass and count visits
encounterclass_counts_df = (
    noinsurance_df.groupBy("EncounterClass")
    .agg(count("*").alias("Visit_Count"))
    .sort(desc("Visit_Count"))
)

# 3Ô∏è‚É£ Show results (full text)
encounterclass_counts_df.show(truncate=False)


+--------------+-----------+
|EncounterClass|Visit_Count|
+--------------+-----------+
|Ambulatory    |1376       |
|Outpatient    |952        |
|Urgentcare    |590        |
|Wellness      |418        |
|Emergency     |211        |
|Inpatient     |137        |
+--------------+-----------+



In [0]:
## FInd the top 3 payers by total payments

from pyspark.sql.functions import col, sum as spark_sum, desc, trim, format_number, concat, lit

# 1Ô∏è‚É£ Filter out null, blank, or 'Noinsurance' payers
PatientDataClean_nonull_payers = PatientDataClean.filter(
    (col("InsuranceName").isNotNull()) &
    (trim(col("InsuranceName")) != "") &
    (trim(col("InsuranceName")) != "Noinsurance")
)

# 2Ô∏è‚É£ Group by payer and sum the total payments
payers_total_df = (
    PatientDataClean_nonull_payers.groupBy("InsuranceName")
    .agg(spark_sum(col("Total_Claim_Cost")).alias("Total_Payments"))
    .sort(desc("Total_Payments"))
    .limit(3)
)

# 3Ô∏è‚É£ Format currency (e.g., $12,345.67)
payers_formatted_df = payers_total_df.select(
    col("InsuranceName"),
    concat(lit("$"), format_number(col("Total_Payments"), 2)).alias("Total_Payments_USD")
)

# 4Ô∏è‚É£ Show results without truncation
payers_formatted_df.show(truncate=False)


+----------------+------------------+
|InsuranceName   |Total_Payments_USD|
+----------------+------------------+
|Unitedhealthcare|$17,287,483.78    |
|Medicare        |$3,498,745.36     |
|Humana          |$124,356.94       |
+----------------+------------------+



In [0]:

## What is the total amount of medical expenses not paid by insurance

from pyspark.sql.functions import col, sum as spark_sum, desc, trim, format_number, concat, lit

# 1Ô∏è‚É£ Filter only 'Noinsurance' rows (case-insensitive, clean spacing)
noinsurance_df = PatientDataClean.filter(
    (col("InsuranceName").isNotNull()) &
    (trim(col("InsuranceName")).isin("Noinsurance", "NOINSURANCE", "noinsurance"))
)

# 2Ô∏è‚É£ Group by payer (Noinsurance) and sum the total payments
noinsurance_total_df = (
    noinsurance_df.groupBy("InsuranceName")
    .agg(spark_sum(col("Total_Claim_Cost")).alias("Total_Payments"))
    .sort(desc("Total_Payments"))
)

# 3Ô∏è‚É£ Format the total as currency
noinsurance_formatted_df = noinsurance_total_df.select(
    col("InsuranceName"),
    concat(lit("$"), format_number(col("Total_Payments"), 2)).alias("Total_Payments_USD")
)

# 4Ô∏è‚É£ Show the result (full text, no truncation)
noinsurance_formatted_df.show(truncate=False)


+-------------+------------------+
|InsuranceName|Total_Payments_USD|
+-------------+------------------+
|Noinsurance  |$10,915,952.23    |
+-------------+------------------+



In [0]:
## What demographic pays the most in medical expenses

from pyspark.sql.functions import (
    col, sum as spark_sum, trim, current_date, datediff, floor,
    concat, lit, format_number
)

# 1Ô∏è‚É£ Filter only 'Noinsurance' records
noinsurance_df = PatientDataClean.filter(
    (col("InsuranceName").isNotNull()) &
    (trim(col("InsuranceName")).isin("Noinsurance", "NOINSURANCE", "noinsurance"))
)

# 2Ô∏è‚É£ Compute Patient Age (in years)
noinsurance_with_age = noinsurance_df.withColumn(
    "Patient_Age",
    floor(datediff(current_date(), col("PatientBirthday")) / 365.25)
)

# 3Ô∏è‚É£ Group by age and demographics, then sum total claim cost
noinsurance_by_demo = (
    noinsurance_with_age.groupBy(
        "Patient_Age",
        "PatientMarital",
        "PatientRace",
        "PatientEthnicity",
        "PatientGender"
    )
    .agg(spark_sum(col("Total_Claim_Cost")).alias("Total_Noinsurance_Payments"))
    .sort(col("Total_Noinsurance_Payments").desc())  # üî• order by descending total
)

# 4Ô∏è‚É£ Format total payments as currency for display
noinsurance_formatted = noinsurance_by_demo.select(
    "Patient_Age",
    "PatientMarital",
    "PatientRace",
    "PatientEthnicity",
    "PatientGender",
    concat(lit("$"), format_number(col("Total_Noinsurance_Payments"), 2)).alias("Total_Payments_USD")
)

# 5Ô∏è‚É£ Show results
noinsurance_formatted.show(truncate=False)



+-----------+--------------+-----------+----------------+-------------+------------------+
|Patient_Age|PatientMarital|PatientRace|PatientEthnicity|PatientGender|Total_Payments_USD|
+-----------+--------------+-----------+----------------+-------------+------------------+
|64         |M             |Black      |Hispanic        |F            |$9,555,786.36     |
|102        |S             |White      |Nonhispanic     |M            |$398,277.30       |
|92         |M             |White      |Nonhispanic     |F            |$308,119.68       |
|86         |M             |White      |Nonhispanic     |F            |$255,282.56       |
|83         |M             |White      |Nonhispanic     |NULL         |$110,630.76       |
|46         |M             |White      |Nonhispanic     |M            |$79,822.92        |
|94         |M             |White      |Nonhispanic     |F            |$56,308.10        |
|38         |S             |White      |Nonhispanic     |F            |$34,833.74        |

In [0]:
## For each demographic, what procedure is most common?

from pyspark.sql import Window
from pyspark.sql.functions import (
    col, count, trim, current_date, datediff, floor,
    row_number, desc
)

# 1Ô∏è‚É£ Filter for 'Noinsurance' and valid ProcedureDescription
noinsurance_df = PatientDataClean.filter(
    (col("InsuranceName").isNotNull()) &
    (trim(col("InsuranceName")).isin("Noinsurance", "NOINSURANCE", "noinsurance")) &
    (col("ProcedureDescription").isNotNull()) &
    (trim(col("ProcedureDescription")) != "")
)

# 2Ô∏è‚É£ Compute Patient Age (in years)
noinsurance_with_age = noinsurance_df.withColumn(
    "Patient_Age",
    floor(datediff(current_date(), col("PatientBirthday")) / 365.25)
)

# 3Ô∏è‚É£ Group by demographics, procedure, and reason ‚Äî count # of visits
agg_df = (
    noinsurance_with_age.groupBy(
        "Patient_Age",
        "PatientMarital",
        "PatientRace",
        "PatientEthnicity",
        "PatientGender",
        "ProcedureDescription",
        "ProcedureReasonDescription"
    )
    .agg(count("*").alias("Visit_Count"))
)

# 4Ô∏è‚É£ Rank procedures by Visit_Count within each demographic group
window_spec = Window.partitionBy(
    "Patient_Age",
    "PatientMarital",
    "PatientRace",
    "PatientEthnicity",
    "PatientGender"
).orderBy(desc("Visit_Count"))

ranked_df = agg_df.withColumn("rank", row_number().over(window_spec))

# 5Ô∏è‚É£ Keep only the top 1 procedure per demographic group
top_proc_df = ranked_df.filter(col("rank") == 1)

# 6Ô∏è‚É£ Display ordered results by Visit_Count (descending)
final_df = top_proc_df.select(
    "Patient_Age",
    "PatientMarital",
    "PatientRace",
    "PatientEthnicity",
    "PatientGender",
    "ProcedureDescription",
    "Visit_Count"
).orderBy(col("Visit_Count").desc())

# 7Ô∏è‚É£ Show all columns (no truncation)
final_df.show(truncate=False)



+-----------+--------------+-----------+----------------+-------------+-------------------------------------------------------------------------------+-----------+
|Patient_Age|PatientMarital|PatientRace|PatientEthnicity|PatientGender|ProcedureDescription                                                           |Visit_Count|
+-----------+--------------+-----------+----------------+-------------+-------------------------------------------------------------------------------+-----------+
|64         |M             |Black      |Hispanic        |F            |Renal Dialysis Procedure                                                       |2622       |
|94         |M             |White      |Nonhispanic     |F            |Combined Chemotherapy And Radiation Therapy Procedure                          |30         |
|92         |M             |White      |Nonhispanic     |F            |High Resolution Computed Tomography Of Chest Without Contrast Procedure        |16         |
|83         |M  

In [0]:
## What is the average age of patients who visits the hospital

from pyspark.sql.functions import col, trim, current_date, datediff, floor, avg

# 1Ô∏è‚É£ Filter only 'Noinsurance' rows with valid (non-null, non-empty) ProcedureReasonDescription
noinsurance_df = PatientDataClean.filter(
    (col("InsuranceName").isNotNull()) &
    (trim(col("InsuranceName")).isin("Noinsurance", "NOINSURANCE", "noinsurance")) &
    (col("ProcedureReasonDescription").isNotNull()) &
    (trim(col("ProcedureReasonDescription")) != "")
)

# 2Ô∏è‚É£ Compute Patient Age (in years)
noinsurance_with_age = noinsurance_df.withColumn(
    "Patient_Age",
    floor(datediff(current_date(), col("PatientBirthday")) / 365.25)
)

# 3Ô∏è‚É£ Calculate the overall average age
avg_age_df = noinsurance_with_age.select(avg(col("Patient_Age")).alias("Average_Age"))

# 4Ô∏è‚É£ Display the result
avg_age_df.show()


+-----------+
|Average_Age|
+-----------+
|       89.5|
+-----------+



In [0]:
import requests
requests.get("https://ifconfig.me").text


'172.202.17.203'

In [0]:
server_name = "medical-records.database.windows.net"
database_name = "Patient_Records"

jdbc_url = (
    f"jdbc:sqlserver://{server_name}:1433;"
    f"database={database_name};"
    "encrypt=true;trustServerCertificate=false;"
    "hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
)

PatientData.write \
    .format("jdbc") \
    .mode("overwrite") \
    .option("url", jdbc_url) \
    .option("dbtable", "dbo.Patient_Medical_Records") \
    .option("user", "databricks_loader") \
    .option("password", "N$60engD06") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .save()




In [0]:
# -----------------------------------------------
# üöÄ Load PatientDataClean into Azure SQL Database
# -----------------------------------------------

# Define your SQL Server and database
server_name = "medical-records.database.windows.net"
database_name = "Patient_Records"

# Build JDBC connection string
jdbc_url = (
    f"jdbc:sqlserver://{server_name}:1433;"
    f"database={database_name};"
    "encrypt=true;trustServerCertificate=false;"
    "hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
)

# Table name to write to
table_name = "dbo.Patient_Medical_Records"

# ‚úÖ Write the data
(
    PatientDataClean.write
    .format("jdbc")
    .mode("overwrite")   # Use "append" if you want to add to existing rows instead
    .option("url", jdbc_url)
    .option("dbtable", table_name)
    .option("user", "databricks_loader")
    .option("password", "N$60engD06")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .save()
)

print("‚úÖ Data successfully loaded into Azure SQL table [dbo].[Patient_Medical_Records]")


‚úÖ Data successfully loaded into Azure SQL table [dbo].[Patient_Medical_Records]
