In [0]:
%python
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col

# Obtain the Spark session for this job (getOrCreate reuses an existing one if present)
spark = (
    SparkSession.builder
    .appName("Silver Layer Processing")
    .getOrCreate()
)

# Snowflake connection options, passed to every connector read and write.
#
# Credentials are taken from the environment so that no secret lives in the
# notebook source:
#   SNOWFLAKE_PASSWORD - required for a successful login
#   SNOWFLAKE_USER     - optional; defaults to the account used so far
# NOTE(review): the password that was previously committed in this file
# should be rotated, since it remains in version history.
# NOTE(review): on Databricks a secret scope (dbutils.secrets.get) is an
# alternative source for these values - confirm which the team prefers.
sfOptions = {
    "sfURL": "https://wsxmajg-ms55786.snowflakecomputing.com",
    "sfUser": os.environ.get("SNOWFLAKE_USER", "KAICY3108"),
    # An unset variable yields an empty password instead of a committed
    # fallback; authentication is then expected to fail.
    "sfPassword": os.environ.get("SNOWFLAKE_PASSWORD", ""),
    "sfDatabase": "AARAMBH",
    "sfSchema": "BRONZE",
    "sfWarehouse": "COMPUTE_WH",
}

# Snowflake read function
def read_from_snowflake(table_name):
    """Load a Snowflake table through the Spark "snowflake" data source.

    Connection settings come from the module-level ``sfOptions`` dict as it
    is at call time, so changes made to that dict affect later reads.

    Args:
        table_name: Value passed to the connector's ``dbtable`` option.

    Returns:
        The DataFrame returned by the reader's ``load()``.
    """
    reader = spark.read.format("snowflake")
    reader = reader.options(**sfOptions)
    reader = reader.option("dbtable", table_name)
    return reader.load()

# Load each Bronze table, drop rows that repeat a customer id, and replace
# nulls in selected columns with placeholder values.
demographic_df = (
    read_from_snowflake("bronze.demographic")
    .dropDuplicates(["cust_id"])
    .fillna({"age": -1, "name": "Unknown"})
)

credit_bureau_df = (
    read_from_snowflake("bronze.credit_bureau")
    .dropDuplicates(["cust_id"])
    .fillna({"credit_score": -1, "income": 0})
)

# The offer table identifies customers by customer_id; after cleaning it is
# renamed to cust_id so all three frames share the same join key.
offer_df = (
    read_from_snowflake("bronze.offer")
    .dropDuplicates(["customer_id"])
    .fillna({"offer_id": "Unknown"})
    .withColumnRenamed("customer_id", "cust_id")
)

# Inner-join the three cleaned frames on cust_id (only customers present in
# all of them remain) and keep the columns needed downstream.
final_df = (
    demographic_df
    .join(credit_bureau_df, on="cust_id", how="inner")
    .join(offer_df, on="cust_id", how="inner")
    .select(
        "cust_id",
        "name",
        "age",
        "credit_score",
        "income",
        "offer_type",
        "offer_status",
    )
)

# Derive income_category: above 50000 is "High", above 20000 is "Medium",
# anything else is "Low".
final_df = final_df.withColumn(
    "income_category",
    when(col("income") > 50000, "High")
    .when(col("income") > 20000, "Medium")
    .otherwise("Low"),
)

# Show the result in the notebook
display(final_df)

# Snowflake write function
def write_to_snowflake(df, table_name):
    """Write a DataFrame to a Snowflake table in "overwrite" mode.

    Connection settings come from the module-level ``sfOptions`` dict as it
    is at call time.

    Args:
        df: DataFrame whose ``write`` interface is used.
        table_name: Value passed to the connector's ``dbtable`` option.
    """
    writer = df.write.format("snowflake").options(**sfOptions)
    writer = writer.option("dbtable", table_name).mode("overwrite")
    writer.save()

# Point the shared connection options at the SILVER schema; every later
# read and write picks this up because the dict is modified in place.
sfOptions.update(sfSchema="SILVER")

# Load the cleaned, joined data into the Silver layer
write_to_snowflake(final_df, "silver.silver_cleaned_data")

# Read the table back from the Silver layer and display it for verification
silver_df = read_from_snowflake("silver.silver_cleaned_data")
display(silver_df)