# Store Dimension - End-to-End Pipeline

## Overview
In this notebook, we implement the complete data pipeline for the **Store Dimension**. Unlike the Date dimension, this pipeline ingests data from external **CSV files**.

## Architecture Flow
1.  **Source:** `store_{rundate}.csv` file in the Landing Bucket.
2.  **Landing Layer:** 
    *   Read CSV.
    *   Cast to String.
    *   Add Audit Columns.
    *   Write to Delta Table (`dim_store_ld`).
    *   Archive the source file.
3.  **Staging Layer:**
    *   Read incremental data from Landing.
    *   De-duplication based on Natural Key (`store_id`).
    *   Type casting.
    *   Write to Delta Table (`dim_store_stg`).
4.  **Dimension Layer (SCD Type 1):**
    *   Read from Staging.
    *   Generate Surrogate Keys (`row_wid`) using UUID.
    *   **Upsert (Merge):** Update existing stores, Insert new stores.
    *   Generate Symlink Manifest for Athena.

In [None]:
# Import necessary libraries
import pyspark
import uuid
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import *

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Store Dimension Load") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Job Parameters
run_date = "20220101"
schema_name = "pyspark_warehouse"
landing_file_name = f"store_{run_date}.csv"

# Define Paths (Simulated for this notebook)
base_path = "s3a://warehouse/" # Update with your bucket
source_path = f"{base_path}landing/source/store/{landing_file_name}"
archive_path = f"{base_path}archive/store/"

print(f"Processing Run Date: {run_date}")

In [None]:
# --- SIMULATION: Create Dummy Source CSV File ---
# In a real scenario, this file arrives from the source system.
# We create it here to make this notebook runnable.

data = [
    ("5001", "Pet House KA", "123 Main St", "Anytown", "KA", "12345", "91-99999-00001"),
    ("5002", "Pet House MH", "456 Elm St", "Othertown", "MH", "67890", "91-99999-00002"),
    ("5003", "Pet House TN", "789 Oak Ave", "BigCity", "TN", "11223", "91-99999-00003")
]
columns = ["store_id", "store_name", "address", "city", "state", "zip", "phone"]

df_source = spark.createDataFrame(data, columns)

# Write as CSV to simulate the landing file.
# Note: Spark writes a directory of part files at the target path, so we write to
# 'source_path' itself; the landing read then resolves that directory as one dataset.
# Ensure your Spark setup supports the scheme used in 'source_path'.
df_source.coalesce(1).write.mode("overwrite").option("header", "true").csv(source_path)

print(f"Simulated Source File Created at: {source_path}")

## 1. Landing Layer Load
We read the CSV file for the given `run_date`, keep every column as a String so that unexpected source values cannot break the load, and add the audit columns `insert_dt` and `rundate`.
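
To illustrate why the landing layer keeps values as strings, here is a minimal plain-Python sketch (not Spark). The helper `land_rows` and the sample ZIP code `01234` are hypothetical and exist only for this illustration. Like Spark's CSV reader without `inferSchema`, `csv.DictReader` returns every field as a string.

```python
import csv
import io
from datetime import datetime, timezone

def land_rows(csv_text, run_date):
    """Read CSV text as strings and attach audit columns (illustration only)."""
    load_time = datetime.now(timezone.utc).isoformat()
    landed = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        record = dict(row)                # every value is still a string
        record["insert_dt"] = load_time   # audit: when the row was landed
        record["rundate"] = run_date      # audit: which run delivered it
        landed.append(record)
    return landed

# Hypothetical sample with a ZIP code that starts with zero
sample = "store_id,store_name,zip\n5001,Pet House KA,01234\n"
rows = land_rows(sample, "20220101")

zip_as_text = rows[0]["zip"]         # leading zero survives as a string
zip_as_number = int(rows[0]["zip"])  # eager numeric typing drops it
print(zip_as_text, zip_as_number)
```

Typing decisions are deferred to the Staging layer, where they can be made column by column.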

In [None]:
# --- LANDING LOAD ---

# 1. Read CSV
df_raw = spark.read \
    .option("header", "true") \
    .option("delimiter", ",") \
    .csv(source_path)

# 2. Cast to String & Add Audit Cols
# It is best practice to cast raw input to String in the Landing layer
df_landing = df_raw.select([col(c).cast("string") for c in df_raw.columns]) \
    .withColumn("insert_dt", current_timestamp()) \
    .withColumn("rundate", lit(run_date))

print("Landing Data Preview:")
df_landing.show(truncate=False)

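# 3. Write to Landing Delta Table
# Create the schema first so saveAsTable does not fail on a fresh metastore
spark.sql(f"CREATE DATABASE IF NOT EXISTS {schema_name}")
landing_table = f"{schema_name}.dim_store_ld"
df_landing.write.format("delta").mode("append").saveAsTable(landing_table)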

print(f"Data loaded to {landing_table}")

# 4. Archival (Logic only - requires boto3 for actual S3 move)
print(f"TODO: Move {source_path} to {archive_path}")
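
The archival step is only a placeholder in the cell above. One possible shape for it is sketched here, assuming the source is a single S3 object and that a boto3 S3 client is passed in. The helper names `plan_archive_move` and `archive_file` are hypothetical. S3 has no rename operation, so a move is a copy followed by a delete.

```python
from urllib.parse import urlparse

def plan_archive_move(source_uri, archive_uri):
    """Split s3a:// URIs into the bucket/key pairs that an S3 client expects."""
    src = urlparse(source_uri)
    dst = urlparse(archive_uri)
    file_name = src.path.rsplit("/", 1)[-1]
    return {
        "source_bucket": src.netloc,
        "source_key": src.path.lstrip("/"),
        "target_bucket": dst.netloc,
        "target_key": dst.path.strip("/") + "/" + file_name,
    }

def archive_file(s3_client, source_uri, archive_uri):
    """Copy the object to the archive prefix, then delete the original."""
    plan = plan_archive_move(source_uri, archive_uri)
    s3_client.copy_object(
        Bucket=plan["target_bucket"],
        Key=plan["target_key"],
        CopySource={"Bucket": plan["source_bucket"], "Key": plan["source_key"]},
    )
    # Delete only after the copy call has returned without raising
    s3_client.delete_object(Bucket=plan["source_bucket"], Key=plan["source_key"])
    return plan

plan = plan_archive_move(
    "s3a://warehouse/landing/source/store/store_20220101.csv",
    "s3a://warehouse/archive/store/",
)
print(plan["target_key"])
```

With boto3 installed and credentials configured, the call would look like `archive_file(boto3.client("s3"), source_path, archive_path)`. The simulated file in this notebook is a Spark output directory rather than a single object, so archiving it would require listing and moving every key under that prefix.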

## 2. Staging Layer Load
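We read from the Landing table. In a production incremental run, we would filter by `insert_dt > max_timestamp`, that is, pick up only rows landed after the last processed load. Here, we read the full table and deduplicate on `store_id`, keeping the most recent row per store.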
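
The two ideas in this step, an incremental watermark filter and keep-latest deduplication, can be sketched in plain Python (not Spark). The helper `latest_per_key`, its `after` parameter, and the sample rows are hypothetical. Timestamps are ISO strings, which compare correctly as text.

```python
def latest_per_key(rows, key="store_id", order_by="insert_dt", after=None):
    """Keep the newest row per key, optionally only rows newer than `after`."""
    latest = {}
    for row in rows:
        if after is not None and row[order_by] <= after:
            continue  # already handled by an earlier run
        current = latest.get(row[key])
        if current is None or row[order_by] > current[order_by]:
            latest[row[key]] = row
    return list(latest.values())

landing = [
    {"store_id": "5001", "store_name": "Pet House KA", "insert_dt": "2022-01-01T06:00:00"},
    {"store_id": "5001", "store_name": "Pet House Karnataka", "insert_dt": "2022-01-02T06:00:00"},
    {"store_id": "5002", "store_name": "Pet House MH", "insert_dt": "2022-01-01T06:00:00"},
]

full = latest_per_key(landing)                                       # one row per store
incremental = latest_per_key(landing, after="2022-01-01T06:00:00")   # only newer rows
print(len(full), len(incremental))
```

In this sketch a tie on `insert_dt` keeps the first row seen. With Spark's `row_number()`, the winner of a tie is not guaranteed, so a secondary ordering column helps when duplicates can arrive in the same load.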

In [None]:
# --- STAGING LOAD ---

from pyspark.sql.window import Window

# 1. Read from Landing
df_ld = spark.read.table(landing_table)

# 2. Deduplication Logic (Natural Key: store_id)
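# We keep the latest record based on insert_dt. Rows landed by the same load share
# one insert_dt, so duplicates within a single file are resolved arbitrarily.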
window_spec = Window.partitionBy("store_id").orderBy(col("insert_dt").desc())

df_deduped = df_ld.withColumn("rn", row_number().over(window_spec)) \
                  .filter(col("rn") == 1) \
                  .drop("rn")

# 3. Type Casting
# Convert strings to appropriate types. The ZIP code stays a string
# (a numeric type would drop leading zeros) and is only renamed to zip_code.
df_stg = df_deduped \
    .withColumn("store_id", col("store_id").cast("integer")) \
    .withColumnRenamed("zip", "zip_code") \
    .withColumn("update_dt", current_timestamp())

# Select specific columns
stg_cols = ["store_id", "store_name", "address", "city", "state", "zip_code", "phone", "insert_dt", "update_dt", "rundate"]
df_stg_final = df_stg.select(stg_cols)

# 4. Write to Staging (Overwrite)
staging_table = f"{schema_name}.dim_store_stg"
df_stg_final.write.format("delta").mode("overwrite").saveAsTable(staging_table)

print(f"Data loaded to {staging_table}")
df_stg_final.show(5)

## 3. Dimension Load (SCD Type 1)
We move data from Staging to the final Dimension.
*   **Surrogate Key:** Generated using a Python UUID function.
*   **Merge:** Upsert logic based on `store_id`.
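
The upsert semantics described above can be illustrated with a plain-Python sketch. This is not the Delta Lake API, only a model of what an SCD Type 1 merge does to the data. The helper `scd1_upsert` and the `PRESERVED` tuple are hypothetical names. Matched rows are overwritten in place and keep their surrogate key and original `insert_dt`. Unmatched rows are inserted with a new UUID.

```python
import uuid

# Columns the merge never overwrites on an existing row
PRESERVED = ("row_wid", "store_id", "insert_dt")

def scd1_upsert(dimension, staged_rows):
    """Apply SCD Type 1 semantics to a dict keyed by store_id (illustration only)."""
    for row in staged_rows:
        existing = dimension.get(row["store_id"])
        if existing is None:
            # Not matched: insert the row with a fresh surrogate key
            dimension[row["store_id"]] = {"row_wid": str(uuid.uuid4()), **row}
        else:
            # Matched: overwrite attributes in place, no history is kept
            for name, value in row.items():
                if name not in PRESERVED:
                    existing[name] = value
    return dimension

dim_store = {}
scd1_upsert(dim_store, [
    {"store_id": 5001, "store_name": "Pet House KA",
     "insert_dt": "2022-01-01", "update_dt": "2022-01-01"},
])
first_key = dim_store[5001]["row_wid"]

# Second run: store 5001 is renamed, store 5002 is new
scd1_upsert(dim_store, [
    {"store_id": 5001, "store_name": "Pet House Karnataka",
     "insert_dt": "2022-01-02", "update_dt": "2022-01-02"},
    {"store_id": 5002, "store_name": "Pet House MH",
     "insert_dt": "2022-01-02", "update_dt": "2022-01-02"},
])
print(dim_store[5001]["store_name"], dim_store[5001]["row_wid"] == first_key)
```

Keeping `row_wid` stable across updates matters because fact tables reference the dimension through that key.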

In [None]:
# --- DIMENSION LOAD ---

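# 1. Define UDF for UUID (Surrogate Key)
# Note: monotonically_increasing_id() is usually cheaper than a Python UDF, but its
# values are unique only within a single job; UUIDs stay unique across runs.
# The UDF is marked non-deterministic so Spark does not assume that repeated
# evaluations return the same value.
uuid_udf = udf(lambda: str(uuid.uuid4()), StringType()).asNondeterministic()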

# 2. Read Staging
df_stage_data = spark.read.table(staging_table)

# 3. Generate Surrogate Key
df_dim_prep = df_stage_data.withColumn("row_wid", uuid_udf())

# 4. Target Table Definition
dim_table = f"{schema_name}.dim_store"

# 5. SCD Type 1 Merge (Upsert)
# Check the metastore instead of a hard-coded warehouse path
# (spark.catalog.tableExists is available from Spark 3.3)
if spark.catalog.tableExists("dim_store", schema_name):
    delta_target = DeltaTable.forName(spark, dim_table)
    
    delta_target.alias("tgt").merge(
        df_dim_prep.alias("src"),
        "tgt.store_id = src.store_id"
    ).whenMatchedUpdate(set={
        "store_name": col("src.store_name"),
        "address": col("src.address"),
        "city": col("src.city"),
        "state": col("src.state"),
        "zip_code": col("src.zip_code"),
        "phone": col("src.phone"),
        "update_dt": col("src.update_dt"),
        "rundate": col("src.rundate")
    }).whenNotMatchedInsert(values={
        "row_wid": col("src.row_wid"),
        "store_id": col("src.store_id"),
        "store_name": col("src.store_name"),
        "address": col("src.address"),
        "city": col("src.city"),
        "state": col("src.state"),
        "zip_code": col("src.zip_code"),
        "phone": col("src.phone"),
        "insert_dt": col("src.insert_dt"),
        "update_dt": col("src.update_dt"),
        "rundate": col("src.rundate")
    }).execute()
    print("Merge Completed.")
else:
    # First Run: Create table
    print("Table does not exist. Creating new table...")
    df_dim_prep.write.format("delta").saveAsTable(dim_table)

# 6. Generate Manifest
spark.sql(f"GENERATE symlink_format_manifest FOR TABLE {dim_table}")
print("Manifest Generated.")

In [None]:
# Final Validation
spark.sql(f"SELECT * FROM {dim_table}").show(truncate=False)