# Load Silver Table to Gold Table - Product

## Overview
Load Product data from Silver lakehouse table to Gold lakehouse table.

## Data Flow
- **Source**: MAAG_LH_Silver.shared.product (Silver lakehouse table)
- **Target**: MAAG_LH_Gold.shared.product (Gold lakehouse - attached as default)
- **Process**: Read Silver table, apply transformations, load to Gold Delta table

---

In [None]:
# STEP 1: Import Libraries and test source data 

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, sum as spark_sum, current_timestamp
import os

# Configuration - Silver to Gold data flow
WORKSPACE_NAME = "Fabric_MAAG"
SOURCE_LAKEHOUSE_NAME = "MAAG_LH_Silver"
SOURCE_SCHEMA = "shared"
SOURCE_TABLE = "product"

# Source: Absolute path to Silver lakehouse table
SOURCE_TABLE_PATH = f"abfss://{WORKSPACE_NAME}@onelake.dfs.fabric.microsoft.com/{SOURCE_LAKEHOUSE_NAME}.Lakehouse/Tables/{SOURCE_SCHEMA}/{SOURCE_TABLE}"

# Target: Gold lakehouse (attached as default)
TARGET_SCHEMA = "shared"
TARGET_TABLE = "product_test"
TARGET_FULL_PATH = f"{TARGET_SCHEMA}.{TARGET_TABLE}"

print(f"🔄 Loading Product from Silver to Gold")
print(f"📂 Source: {SOURCE_TABLE_PATH}")
print(f"🎯 Target: {TARGET_FULL_PATH}")
print("="*50)

# Read from Silver lakehouse table
df = spark.read.format("delta").load(SOURCE_TABLE_PATH)

print(f"✅ Data loaded from Silver table")
print(f"📊 Records: {df.count()}")
print(f"📋 Columns: {df.columns}")

# Display sample data
print(f"\n📖 Sample data from Silver:")
df.show(5, truncate=False)

In [None]:
# STEP 2: Transformations for Gold layer

# Apply Gold layer transformations and data quality
print(f"🔧 Applying Gold layer transformations...")

# Add audit columns for Gold layer
df_gold = df.withColumn("GoldLoadTimestamp", current_timestamp())

# Data quality checks for Gold layer
print(f"\n🔍 Gold layer data quality validation...")

# Check for duplicates
duplicate_count = df_gold.groupBy("ProductID").count().filter(col("count") > 1).count()
if duplicate_count > 0:
    print(f"⚠️ Found {duplicate_count} duplicate ProductIDs")
else:
    print(f"✅ No duplicates found")

# Check for nulls in key fields (ProductID, ProductName)
null_checks = df_gold.select(
    spark_sum(col("ProductID").isNull().cast("int")).alias("null_ids"),
    spark_sum(col("ProductName").isNull().cast("int")).alias("null_names")
).collect()[0]

if null_checks["null_ids"] > 0 or null_checks["null_names"] > 0:
    print(f"⚠️ Found nulls: IDs={null_checks['null_ids']}, Names={null_checks['null_names']}")
else:
    print(f"✅ No nulls in key fields")

# Data mapping: Ensure columns match Gold schema (case and type)
gold_columns = [
    "ProductID", "ProductName", "ProductDescription", "BrandName", "ProductNumber", "Color", "ProductModel",
    "ProductCategoryID", "CategoryName", "ListPrice", "StandardCost", "Weight", "WeightUom",
    "ProductStatus", "CreatedDate", "SellStartDate", "SellEndDate", "IsoCurrencyCode", "UpdatedDate", "CreatedBy", "UpdatedBy"
]

# Rename columns if needed (example: 'Name' -> 'ProductName')
if "Name" in df_gold.columns and "ProductName" not in df_gold.columns:
    df_gold = df_gold.withColumnRenamed("Name", "ProductName")

# Add missing columns with default values
from pyspark.sql import functions as F
for col_name in gold_columns:
    if col_name not in df_gold.columns:
        df_gold = df_gold.withColumn(col_name, F.lit(None))

# Select and order columns to match Gold schema
df_gold = df_gold.select(gold_columns + [c for c in df_gold.columns if c not in gold_columns])

# Cast columns to correct types (example for key columns)
from pyspark.sql.types import StringType, DecimalType, DateType

# Cast all columns to match Gold schema
df_gold = (
    df_gold
    .withColumn("ProductID", col("ProductID").cast(StringType()))
    .withColumn("ProductName", col("ProductName").cast(StringType()))
    .withColumn("ProductDescription", col("ProductDescription").cast(StringType()))
    .withColumn("BrandName", col("BrandName").cast(StringType()))
    .withColumn("ProductNumber", col("ProductNumber").cast(StringType()))
    .withColumn("Color", col("Color").cast(StringType()))
    .withColumn("ProductModel", col("ProductModel").cast(StringType()))
    .withColumn("ProductCategoryID", col("ProductCategoryID").cast(StringType()))
    .withColumn("CategoryName", col("CategoryName").cast(StringType()))
    .withColumn("ListPrice", col("ListPrice").cast(DecimalType(18,2)))
    .withColumn("StandardCost", col("StandardCost").cast(DecimalType(18,2)))
    .withColumn("Weight", col("Weight").cast(DecimalType(18,3)))
    .withColumn("WeightUom", col("WeightUom").cast(StringType()))
    .withColumn("ProductStatus", col("ProductStatus").cast(StringType()))
    .withColumn("CreatedDate", col("CreatedDate").cast(DateType()))
    .withColumn("SellStartDate", col("SellStartDate").cast(DateType()))
    .withColumn("SellEndDate", col("SellEndDate").cast(DateType()))
    .withColumn("IsoCurrencyCode", col("IsoCurrencyCode").cast(StringType()))
    .withColumn("UpdatedDate", col("UpdatedDate").cast(DateType()))
    .withColumn("CreatedBy", col("CreatedBy").cast(StringType()))
    .withColumn("UpdatedBy", col("UpdatedBy").cast(StringType()))
)

print(f"\n📖 Sample Gold data:")
df_gold.show(5, truncate=False)

In [None]:
# STEP 3 Load data to Gold table
print(f"💾 Loading data to Gold table: {TARGET_FULL_PATH}")

# Troubleshooting: Check if DataFrame is empty before writing
print(f"✅ Gold DataFrame record count: {df_gold.count()}")
df_gold.show(5, truncate=False)

try:
    # Write to Gold Delta table (default lakehouse)
    # Use "overwrite" mode to ensure data is written if table is empty or append fails
    df_gold.write \
      .format("delta") \
      .mode("overwrite") \
      .saveAsTable(TARGET_FULL_PATH)

    print(f"✅ Data loaded successfully to Gold table")
    print(f"🎉 Silver to Gold data load complete!")
    print(f"✅ All steps completed successfully.")

except Exception as e:
    print(f"❌ Error loading data to Gold table: {str(e)}")
    raise