In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, datediff, date_format, when, sum as spark_sum, avg, count, expr, coalesce

In [0]:
# Source data: TPC-H tables addressed as <RAW_CATALOG>.<RAW_SCHEMA>.<table>.
RAW_CATALOG = "samples"
RAW_SCHEMA = "tpch"

# Target location and table names for the gold outputs.
TARGET_CATALOG = "bootcamp_training"
TARGET_SCHEMA = "session2"
FINAL_TABLE = "unified_sales_fact"          # aggregated sales fact table
AGG_FINAL_TABLE = "unified_sales_fact_agg"  # customers-per-region rollup of the fact table

# Fully qualified three-part names (catalog.schema.table) used by saveAsTable.
FULL_TARGET_PATH = ".".join((TARGET_CATALOG, TARGET_SCHEMA, FINAL_TABLE))
FULL_TARGET_AGG_PATH = ".".join((TARGET_CATALOG, TARGET_SCHEMA, AGG_FINAL_TABLE))


# The catalog itself is NOT created here: TARGET_CATALOG must already exist, or
# CREATE SCHEMA below fails. If you are allowed to create catalogs, run
# `CREATE CATALOG IF NOT EXISTS <TARGET_CATALOG>` once beforehand.
_target_schema_fqn = f"{TARGET_CATALOG}.{TARGET_SCHEMA}"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {_target_schema_fqn}")

# --- Direct Read from Source Tables (No Staging) ---
_source_location = f"{RAW_CATALOG}.{RAW_SCHEMA}"
print(f"Reading TPC-H tables directly from {_source_location}...")

# Helper for reading source tables by their three-part (catalog.schema.table) name.
def get_table(name):
    """Return the Spark DataFrame for source table ``name``.

    The table is resolved as ``<RAW_CATALOG>.<RAW_SCHEMA>.<name>``; both prefix
    parts are read from the module-level settings at call time.
    """
    qualified_name = f"{RAW_CATALOG}.{RAW_SCHEMA}.{name}"
    return spark.table(qualified_name)

# Load every TPC-H source table used by the fact build, in a fixed order.
(
    orders_df,
    lineitem_df,
    customer_df,
    region_df,
    nation_df,
    part_df,
    supplier_df,
) = (
    get_table(table_name)
    for table_name in ("orders", "lineitem", "customer", "region", "nation", "part", "supplier")
)


# -------------------------------------------------------------------------
# UNIFIED GOLD FACT TABLE: AGGREGATED SALES FACT
# -------------------------------------------------------------------------

print(f"\n1. Creating Aggregated Final Fact Table: {FULL_TARGET_PATH}...")

# 2. Join the lineitem fact ("li") to each dimension, in this order:
#    orders -> customer -> customer nation -> customer region -> part -> supplier.
#    Every join is INNER, so a line item lacking a match in any dimension is dropped.
_fact_joins = [
    (orders_df.alias("o"), col("li.l_orderkey") == col("o.o_orderkey")),
    (customer_df.alias("c"), col("o.o_custkey") == col("c.c_custkey")),
    (nation_df.alias("cn"), col("c.c_nationkey") == col("cn.n_nationkey")),
    (region_df.alias("cr"), col("cn.n_regionkey") == col("cr.r_regionkey")),
    (part_df.alias("p"), col("li.l_partkey") == col("p.p_partkey")),
    (supplier_df.alias("s"), col("li.l_suppkey") == col("s.s_suppkey")),
]

unified_fact_base = lineitem_df.alias("li")
for _dim_df, _join_condition in _fact_joins:
    unified_fact_base = unified_fact_base.join(_dim_df, _join_condition, "inner")

# Drop loop scaffolding so it does not linger in the notebook namespace.
del _fact_joins, _dim_df, _join_condition

# 3. Grouping keys for the aggregate, as (source column, output name) pairs.
#    NOTE(review): assuming c_custkey / s_suppkey are unique in customer / supplier,
#    the customer attributes (segment, nation, region, balance) and the supplier
#    nation key do not change the grain. The grain is then one row per
#    (Order_Date, Customer_Key, Part_Brand, Part_Type, Supplier_Key).
_GROUPING_SPEC = [
    ("o.o_orderdate", "Order_Date"),
    ("c.c_custkey", "Customer_Key"),
    ("c.c_mktsegment", "Customer_Market_Segment"),
    ("cn.n_name", "Customer_Nation"),
    ("cr.r_name", "Customer_Region"),
    ("p.p_brand", "Part_Brand"),
    ("p.p_type", "Part_Type"),
    ("s.s_suppkey", "Supplier_Key"),
    ("s.s_nationkey", "Supplier_Nation_Key"),
    ("c.c_acctbal", "Customer_Account_Balance"),
]
GROUPING_COLS = [col(source).alias(output) for source, output in _GROUPING_SPEC]
del _GROUPING_SPEC

# 4. Aggregate the joined line items to the grain defined by GROUPING_COLS.
final_reporting_df = unified_fact_base.groupBy(*GROUPING_COLS).agg(
    # Gross quantity and gross (pre-discount) extended price per group.
    spark_sum(col("li.l_quantity")).alias("Total_Quantity_Sold"),
    spark_sum(col("li.l_extendedprice")).alias("Total_Extended_Price"),
    # Net revenue: each line's extended price after its discount, summed per group.
    spark_sum(col("li.l_extendedprice") * (1 - col("li.l_discount"))).alias("Net_Revenue"),
    # Number of joined line-item rows in the group. This is NOT a distinct-order
    # count: an order with several line items in the same group counts once per line.
    count(col("o.o_orderkey")).alias("Order_Line_Count"),
)

# 5. Derived metrics and the report-date filter.
# Account balance strictly above this value is flagged "High", otherwise "Standard".
HIGH_VALUE_BALANCE_THRESHOLD = 5000
# The published fact table is restricted to this single order date.
REPORT_ORDER_DATE = "1998-08-02"

final_reporting_df = final_reporting_df.select(
    "*",
    when(col("Customer_Account_Balance") > HIGH_VALUE_BALANCE_THRESHOLD, "High")
    .otherwise("Standard")
    .alias("Customer_Value_Flag"),
)

# Order_Date is a grouping key, so filtering after the aggregation keeps exactly the
# groups for that date. The result is the same as filtering orders before
# aggregating. Spark can usually push this predicate below the aggregate and
# joins; confirm with final_reporting_df.explain() if performance matters.
final_reporting_df = final_reporting_df.filter(col("Order_Date") == REPORT_ORDER_DATE)

# 6. Persist the fact table (overwrite) under its catalog.schema.table name.
final_reporting_df.write.mode("overwrite").saveAsTable(FULL_TARGET_PATH)

print("--- SUCCESS: Final Table Created ---")
print(f"Table Name: {FULL_TARGET_PATH}")

In [0]:
# Customers per region, computed from the persisted fact table.
# Reading the saved table back avoids re-executing the full multi-table join that
# backs the in-memory `final_reporting_df` plan. The temp view keeps its original
# name, so existing SQL against "final_reporting_df" still works.
spark.table(FULL_TARGET_PATH).createOrReplaceTempView("final_reporting_df")
Final_df = spark.sql(
    "SELECT Customer_Region, count(distinct Customer_Key) as Customer_Count "
    "FROM final_reporting_df GROUP BY Customer_Region"
)

Final_df.write.mode("overwrite").saveAsTable(FULL_TARGET_AGG_PATH)

print("--- SUCCESS: Final AGG Table Created ---")
print(f"Table Name: {FULL_TARGET_AGG_PATH}")


In [0]:
# Export the per-region customer counts (Final_df) as CSV with a header row.
# Despite its name, "final_reporting_df.csv" holds this aggregated result.
# DataFrameWriter.save() treats the path as an output directory of part files.
# coalesce(1) makes that a single part file.
# NOTE(review): "tbch_gold" is presumably a typo for "tpch_gold". It is kept as-is
# so existing consumers of this Volume path keep working.
EXPORT_VOLUME_DIR = f"/Volumes/{TARGET_CATALOG}/{TARGET_SCHEMA}/tbch_gold"
(
    Final_df.coalesce(1)
    .write.mode("overwrite")
    .format("csv")
    .option("header", True)
    .save(f"{EXPORT_VOLUME_DIR}/final_reporting_df.csv")
)

In [0]:
# Export the full (non-rolled-up) fact table as CSV with a header row.
# The data is read back from the persisted table. This avoids re-running the
# multi-table join behind the in-memory `final_reporting_df` plan.
# DataFrameWriter.save() writes a directory of part files at this path, and
# coalesce(1) makes that a single part file.
# NOTE(review): "tbch_gold" is presumably a typo for "tpch_gold"; kept for compatibility.
EXPORT_VOLUME_DIR = f"/Volumes/{TARGET_CATALOG}/{TARGET_SCHEMA}/tbch_gold"
(
    spark.table(FULL_TARGET_PATH)
    .coalesce(1)
    .write.mode("overwrite")
    .format("csv")
    .option("header", True)
    .save(f"{EXPORT_VOLUME_DIR}/final_reporting_df_without_agg.csv")
)