In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *
import re

# SILVER LAYER SCRIPT

### DATA LOADING

**Reading Data**

In [0]:
# Load the raw bronze-layer CSV data. The first row is treated as the header
# and column types are inferred by Spark rather than declared explicitly.
df_bronze = (
    spark.read
    .format('csv')
    .option('header', True)
    .option('inferSchema', True)
    .load("abfss://bronze@sthealthcarede.dfs.core.windows.net/Healthcare_Dataset")
)

**Data Profiling**

In [0]:
# Placeholder tokens counted as "missing"; values are compared after trimming
# and lower-casing, so the tokens here are all lower-case.
missing_values = ['nil', 'na', 'n/a', '-', 'unknown']

# Count missing values per column
invalid_counts = []
for c in df_bronze.columns:
    trimmed = trim(col(c))
    is_invalid = col(c).isNull() | (trimmed == '') | lower(trimmed).isin(missing_values)
    # when() without otherwise() yields NULL for rows that are not flagged, and
    # count() skips NULLs, so each aggregate counts only the flagged rows.
    invalid_counts.append(count(when(is_invalid, c)).alias(c + '_invalid'))

df_bronze.select(invalid_counts).display()

# Detect outliers/invalid data
age = col('Age')
admitted_on = col('Date of Admission')
discharged_on = col('Discharge Date')
billing = col('Billing Amount')

# Rows whose age falls outside the 0-120 range.
df_bronze.filter((age < 0) | (age > 120)).display()

# Earliest and latest admission dates present in the data.
df_bronze.select(
    min(admitted_on).alias('earliest_admission'),
    max(admitted_on).alias('latest_admission'),
).display()

# Minimum, maximum and average billing amount.
df_bronze.agg(
    min(billing).alias('billing_min'),
    max(billing).alias('billing_max'),
    avg(billing).alias('billing_avg'),
).display()

# Rows where the discharge date precedes the admission date.
df_bronze.filter(discharged_on < admitted_on).display()

# Unique values and frequency for categorical columns
categorical_cols = ['Gender', 'Blood Type', 'Admission Type']
for category in categorical_cols:
    # One result table per column: each distinct value with its row count.
    frequency = df_bronze.groupBy(category).count()
    frequency.display()

# Duplicates

# Rows that are identical across every column.
exact_duplicates = df_bronze.groupBy(df_bronze.columns).count().filter('count > 1')
exact_duplicates.display()

# Rows that share the same name, admission date and hospital.
key_columns = ['Name', 'Date of Admission', 'Hospital']
key_duplicates = df_bronze.groupBy(*key_columns).count().filter('count > 1')
key_duplicates.display()


### TRANSFORMATIONS

In [0]:
# Free-text columns to normalise.
text_cols = ['Name', 'Doctor', 'Hospital', 'Insurance Provider', 'Medical Condition', 'Medication', 'Test Results']

# Start the silver frame from bronze, then trim surrounding whitespace and
# apply initcap() to each text column in place.
df_silver = df_bronze
for text_col in text_cols:
    normalised = initcap(trim(col(text_col)))
    df_silver = df_silver.withColumn(text_col, normalised)

# Keep a single row per (Name, Date of Admission, Hospital) group: the row with
# the highest Age. Rows tied on Age within a group are not ordered any further,
# so which of them is kept is left to Spark.
w = Window.partitionBy('Name', 'Date of Admission', 'Hospital')
highest_age_first = w.orderBy(col('Age').desc())

ranked = df_silver.withColumn('rn', row_number().over(highest_age_first))
df_silver = ranked.filter(col('rn') == 1).drop('rn')

# Final silver-layer shaping: drop exact duplicates, cast, clean, derive, tag.
df_silver = (
    df_silver
    # Remove rows that are identical across every column
    .dropDuplicates()
    # Cast columns
    .withColumn('Billing Amount', col('Billing Amount').cast('decimal(10, 2)'))
    # Clean up hospital names: strip a stray leading "and"/"and," and a stray
    # trailing ","/"and"/", and". The \b word boundaries make sure only the
    # standalone word "and" is removed; without them the patterns also ate the
    # start of names such as "Anderson" and the end of names such as "Rowland".
    .withColumn(
        'hospital_clean',
        trim(regexp_replace(
            regexp_replace(col('Hospital'), r'(?i)^\s*and\b\s*,?\s*', ''),
            r'(?i)\s*,?\s*(?:\band)?\s*$', ''))
    )
    # Derived column: days between admission and discharge
    .withColumn('Length of stay', datediff(col('Discharge Date'), col('Date of Admission')))
    # Composite admission key for uniqueness: SHA-256 of name, admission date
    # and hospital joined with '||'
    .withColumn(
        'admission_key',
        sha2(concat_ws('||', col('Name'), col('Date of Admission').cast('string'), col('Hospital')), 256)
    )
    # Metadata
    .withColumn('ingestion_date', current_date())
    .withColumn('source_file', lit('Healthcare_Dataset.csv'))
)

def to_snake_case(col_name):
    """Convert a column name to lower-case snake_case.

    Every run of characters other than ASCII letters and digits acts as a
    single separator; the remaining pieces are joined with '_' and
    lower-cased, so the result never starts or ends with an underscore.

    Args:
        col_name: Original column name, e.g. 'Date of Admission'.

    Returns:
        The normalised name, e.g. 'date_of_admission'.
    """
    pieces = re.split(r'[^a-zA-Z0-9]+', col_name)
    # Separators at either end of the name leave empty pieces; skip them.
    return '_'.join(piece for piece in pieces if piece).lower()

# Rename every column to its snake_case form, keeping the column order.
snake_cased = [
    col(original).alias(to_snake_case(original))
    for original in df_silver.columns
]
df_silver = df_silver.select(*snake_cased)

In [0]:
# Preview the transformed silver DataFrame ahead of the Delta write below.
df_silver.display()

In [0]:
# Persist the silver DataFrame as a Delta table.
# This notebook re-reads the complete bronze dataset on every run, so writing
# with 'append' would add a full extra copy of the data each time it is re-run.
# 'overwrite' replaces the table contents instead, which keeps the run
# idempotent.
(
    df_silver.write
    .format('delta')
    .mode('overwrite')
    .option('path', 'abfss://silver@sthealthcarede.dfs.core.windows.net/Healthcare_Dataset')
    .save()
)

