# Gold Layer – Business & ML Ready Data
This notebook is part of a Medallion Architecture (Bronze → Silver → Gold).

Purpose:
- Convert Silver layer data into business-friendly and ML-ready datasets
- Create risk segmentation and aggregated summaries for analytics and dashboards

In [0]:
# # Load cleaned and feature-engineered Silver data
# Why: Gold layer is always built on top of Silver (AI-ready data)
# Used for: Risk segmentation, ML input, and dashboard analytics

silver_df = spark.read.table("silver_patient_features")
silver_df.display()

In [0]:
# Import functions for conditional column creation and column reference

from pyspark.sql.functions import when, col

In [0]:
# Create risk_bucket column based on utilization_score and treatment_changed

gold_features_df = silver_df.withColumn(
    "risk_bucket",
    when((col("utilization_score") >= 5) & (col("treatment_changed") == 1), "High")
    .when(col("utilization_score") >= 3, "Medium")
    .otherwise("Low")
)

In [0]:
from pyspark.sql.functions import col, when, lit

# Load Silver table
silver_df = spark.read.table("silver_patient_features")

# -------------------------
# SAFE CHECK: does 'change' column exist?
# -------------------------
if "change" in silver_df.columns:
    silver_df = silver_df.withColumn(
        "treatment_changed",
        when(col("change") == "Ch", 1)
        .when(col("change") == "No", 0)
        .otherwise(None)
    ).drop("change")
else:
    # If column doesn't exist, create a default safe column
    silver_df = silver_df.withColumn(
        "treatment_changed",
        lit(None).cast("int")
    )

# -------------------------
# Save back to Silver (overwrite cleanly)
# -------------------------
(
    silver_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("silver_patient_features")
)

In [0]:
# Load silver_patient_features table into a DataFrame

gold_features_df = spark.table("silver_patient_features")

In [0]:
# Add utilization_score and risk_bucket features to gold_features_df

from pyspark.sql.functions import col, when

gold_features_df = gold_features_df.withColumn(
    "utilization_score",
    col("number_inpatient") + col("number_emergency") + col("number_outpatient")
)

gold_features_df = gold_features_df.withColumn(
    "risk_bucket",
    when((col("utilization_score") >= 5) & (col("treatment_changed") == 1), "High")
    .when(col("utilization_score") >= 3, "Medium")
    .otherwise("Low")
)

In [0]:
# Save gold_features_df as Delta table gold_patient_features (overwrite mode with schema update)

gold_features_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("gold_patient_features")

In [0]:
# Create aggregated business summary
# Purpose: Executive-friendly KPIs (no patient-level noise)
from pyspark.sql.functions import count
gold_summary_df = gold_features_df.groupBy(
    "age_bucket", "risk_bucket"
).agg(
    count("*").alias("patient_count")
)

gold_summary_df.display()


Databricks visualization. Run in Databricks to view.

In [0]:
# Save business summary Gold table
# Used for: Dashboards and high-level reporting

gold_summary_df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold_readmission_summary")


In [0]:
%sql
-- Validate risk distribution
SELECT risk_bucket, COUNT(*) AS patients
FROM gold_patient_features
GROUP BY risk_bucket;


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
-- Validate age-wise summary
SELECT *
FROM gold_readmission_summary
ORDER BY age_bucket, risk_bucket;


In [0]:
%sql
-- Create or replace view with readmission status label

CREATE OR REPLACE VIEW gold_readmission_status_view AS
SELECT
    gold_patient_features.*,
    CASE
        WHEN readmit_30d = 1 THEN 'Readmitted'
        ELSE 'Not Readmitted'
    END AS readmit_status
FROM gold_patient_features;

In [0]:

%sql
-- Count patients grouped by readmission status


SELECT
  readmit_status,
  COUNT(*) AS patient_count
FROM gold_readmission_status_view
GROUP BY readmit_status;

In [0]:
%sql
-- Create or replace view for readmission status labels

CREATE OR REPLACE VIEW dashboard_readmission_base AS
SELECT
  CASE 
    WHEN readmit_30d = 1 THEN 'Readmitted'
    ELSE 'Not Readmitted'
  END AS readmit_status
FROM gold_patient_features;

In [0]:
#silver_to_gold
df_silver = spark.table("default.silver_patient_features")

In [0]:
#gold_feature_engineering
from pyspark.sql.functions import when, col

df_gold = (
    df_silver
    .withColumn(
        "ai_risk_bucket",
        when(col("readmission_probability") >= 0.7, "High Risk")
        .when(col("readmission_probability") >= 0.4, "Medium Risk")
        .otherwise("Low Risk")
    )
    .withColumn(
        "readmit_status",
        when(col("readmitted") == 1, "Readmitted")
        .otherwise("Not Readmitted")
    )
)

In [0]:
# # Load the silver_patient_features table into a DataFrame and print its schema to verify column types

df_silver = spark.table("default.silver_patient_features")
df_silver.printSchema()

In [0]:
from pyspark.sql.functions import when, col

In [0]:
# # Add ai_risk_bucket column to classify patients into High/Medium/Low risk groups based on utilization and inpatient visits
# # Add readmit_status column to label patients as Readmitted or Not Readmitted for easier interpretation



df_gold = (
    df_silver
    .withColumn(
        "ai_risk_bucket",
        when(
            (col("utilization_score") >= 60) | (col("number_inpatient") >= 2),
            "High Risk"
        )
        .when(
            (col("utilization_score") >= 30),
            "Medium Risk"
        )
        .otherwise("Low Risk")
    )
    .withColumn(
        "readmit_status",
        when(col("readmitted") == 1, "Readmitted")
        .otherwise("Not Readmitted")
    )
)

In [0]:
%sql
DROP TABLE IF EXISTS default.gold_patient_features;

In [0]:
%sql
DROP TABLE IF EXISTS default.gold_patient_features;

In [0]:
# Import functions for column operations and data types

from pyspark.sql.functions import col, when
from pyspark.sql.types import IntegerType, DoubleType

In [0]:
# # Create target label (readmit_30d) for ML classification
# # Cast utilization columns to Integer for safe numeric operations
# # Compute utilization_score as total healthcare usage
# # Add ai_risk_bucket to classify patients into High/Medium/Low risk groups

df_gold_clean = (
    df_silver
    # target label
    .withColumn(
        "readmit_30d",
        when(col("readmitted") == "<30", 1).otherwise(0).cast(IntegerType())
    )

    # utilization metrics (SAFE CAST)
    .withColumn("number_outpatient", col("number_outpatient").cast(IntegerType()))
    .withColumn("number_emergency", col("number_emergency").cast(IntegerType()))
    .withColumn("number_inpatient", col("number_inpatient").cast(IntegerType()))

    # utilization score
    .withColumn(
        "utilization_score",
        (
            col("number_outpatient")
            + col("number_emergency")
            + col("number_inpatient")
        ).cast(IntegerType())
    )

    # AI risk bucket (STRING — NO CAST!)
    .withColumn(
        "ai_risk_bucket",
        when(col("utilization_score") >= 5, "High Risk")
        .when(col("utilization_score") >= 2, "Medium Risk")
        .otherwise("Low Risk")
    )
)

In [0]:
from pyspark.sql.functions import col

# 1️⃣ Silver table load (actual existing table)
df_silver = spark.read.table("silver_patient_features")

# 2️⃣ readmit_30d ensure numeric (এই column আগেই আছে)
df_gold_clean = df_silver.withColumn(
    "readmit_30d",
    col("readmit_30d").cast("int")
)

# 3️⃣ Write GOLD table to correct schema
(
    df_gold_clean
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("default.gold_patient_features")
)

In [0]:
%sql
SELECT * FROM default.gold_readmission_summary_view;

In [0]:
%sql
describe gold_patient_features;

In [0]:
%sql
-- Create or replace gold_patient_features table with risk_bucket classification
CREATE OR REPLACE TABLE gold_patient_features AS
SELECT
  *,
  CASE
    WHEN utilization_score >= 8 OR readmit_30d = 1 THEN 'High Risk'
    WHEN utilization_score BETWEEN 5 AND 7 THEN 'Medium Risk'
    ELSE 'Low Risk'
  END AS risk_bucket
FROM silver_patient_features;