In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable
from pyspark.sql.window import Window
from pyspark.sql.functions import current_timestamp, lit
from pyspark.sql.utils import AnalysisException
from datetime import datetime


In [0]:
# Delta locations used by this notebook: the silver table read as input
# and the gold fact table that receives the enriched rows.
src_silver_path = "/Volumes/customer_360/customer_360_silver/silver_sales_volume"
gold_path = "/Volumes/customer_360/customer_360_gold/gold_sales_fact_volume"


In [0]:
# Get (or create) the Spark session, then load the silver Delta table as the
# starting DataFrame for every transformation below.
spark = SparkSession.builder.appName("read silver data").getOrCreate()
df = (
    spark.read
    .format("delta")
    .load(src_silver_path)
)

In [0]:
# Incremental-load watermark: the newest data_arrival_timestamp already stored
# in the gold table. None means there is nothing to compare against (table
# missing or empty), in which case every source record is loaded.
max_ts = None
if DeltaTable.isDeltaTable(spark, gold_path):
    gold_table = DeltaTable.forPath(spark, gold_path)
    # `max` is the Spark aggregate brought in by the star import, not the Python
    # builtin. Aggregating an empty table yields one row holding NULL -> None.
    max_ts = gold_table.toDF().select(max("data_arrival_timestamp")).collect()[0][0]
    if max_ts is None:
        print("Gold table is empty. Will load all records.")
else:
    print("Gold table not found. Will load all records.")

# Keep only source rows that arrived after the watermark. The explicit None
# check avoids treating a falsy-but-valid watermark value as "no watermark".
if max_ts is not None:
    df = df.filter(col("data_arrival_timestamp") > max_ts)

# count() is a Spark action: it executes the read + filter to report the batch size.
print(f"Number of records to load: {df.count()}")

In [0]:
# Show the rows selected for this load before any derived columns are added.
# NOTE(review): `DataFrame.display()` looks like the Databricks notebook helper —
# confirm it is available wherever this notebook is executed.
df.display()

In [0]:
# Derive calendar attributes from order_date.
# dayofweek() numbers the days 1 (Sunday) through 7 (Saturday).
df = (
    df
    .withColumn("order_quarter", quarter("order_date"))
    .withColumn("order_week", weekofyear("order_date"))
    .withColumn("day_of_week", dayofweek("order_date"))
)

In [0]:

# Flag orders whose day_of_week is 1 or 7 (Sunday / Saturday in dayofweek()
# numbering). A NULL day_of_week falls through to the otherwise() branch -> False.
weekend_flag = when(col("day_of_week").isin([1, 7]), True).otherwise(False)
df = df.withColumn("is_weekend", weekend_flag)

In [0]:
# Average price per unit = sales / quantity.
# The divisor is guarded explicitly: with ANSI mode enabled Spark raises a
# DIVIDE_BY_ZERO error for x / 0, while with ANSI mode disabled it returns NULL.
# A when() without otherwise() yields NULL for unmatched rows, so a zero (or
# NULL) quantity produces NULL in both modes instead of failing the job.
df = df.withColumn(
    "avg_price_per_unit",
    when(col("quantity") != 0, col("sales") / col("quantity"))
)

In [0]:
# Sales amount at or above which an order is flagged as high value.
high_value_threshold = 500  # Example: adjust as per business rule

# The comparison is inclusive (>=); rows with NULL sales end up False via otherwise().
meets_threshold = col("sales") >= high_value_threshold
df = df.withColumn("high_value_order", when(meets_threshold, True).otherwise(False))


In [0]:
# Bucket each order by its sales amount. Branches are tested top-down and the
# first match wins; anything not above 500 (including NULL sales) is "Small".
size_bucket = (
    when(col("sales") > 1000, "Large")
    .when(col("sales") > 500, "Medium")
    .otherwise("Small")
)
df = df.withColumn("order_size_category", size_bucket)

In [0]:
# Discount value as sales * discount.
# NOTE(review): assumes `discount` is stored as a fraction (e.g. 0.2), not a percentage — confirm.
discount_value = col("sales") * col("discount")
df = df.withColumn("discount_amount", discount_value)

In [0]:
# Days between placing the order and shipping it (ship_date - order_date).
# datediff(end, start) returns the same whole-day difference as subtracting two
# unix_date() values, but is available in every PySpark release (unix_date was
# only added to the Python API in 3.5) and matches how days_since_order is
# computed elsewhere in this notebook.
df = df.withColumn(
    "shipping_days",
    datediff(col("ship_date"), col("order_date"))
)

In [0]:
# Profit margin = profit / sales.
# The divisor is guarded explicitly: with ANSI mode enabled Spark raises a
# DIVIDE_BY_ZERO error for x / 0, while with ANSI mode disabled it returns NULL.
# A when() without otherwise() yields NULL for unmatched rows, so zero (or NULL)
# sales produce a NULL margin in both modes instead of failing the job.
df = df.withColumn(
    "profit_margin",
    when(col("sales") != 0, col("profit") / col("sales"))
)

# True only for strictly positive profit; zero, negative and NULL profit give False.
df = df.withColumn("profitable_order", when(col("profit") > 0, True).otherwise(False))

In [0]:
# Tier each order by profit_margin: above 0.3 -> "High", above 0.1 -> "Medium",
# everything else -> "Low". A NULL margin matches neither comparison, so it is
# also labelled "Low".
margin_tier = (
    when(col("profit_margin") > 0.3, "High")
    .when(col("profit_margin") > 0.1, "Medium")
    .otherwise("Low")
)
df = df.withColumn("profitability_category", margin_tier)

In [0]:
# Age of the order in days: current_date() minus order_date, evaluated when the
# query runs, so the stored value reflects the load date.
order_age_days = datediff(current_date(), col("order_date"))
df = df.withColumn("days_since_order", order_age_days)

In [0]:
# Running totals per customer: rows are ordered by order_date and the frame
# spans from the first row of the partition up to the current row.
# `sum` is the Spark aggregate brought in by the star import, not the builtin.
# NOTE(review): rows sharing a customer_id and order_date have no tie-breaker,
# so their relative order inside the frame is not fixed — confirm whether a
# secondary sort key is needed.
# NOTE(review): the totals cover only the rows present in df at this point; if
# df was narrowed to an incremental batch earlier, prior history is not included.
customer_window = (
    Window.partitionBy("customer_id")
    .orderBy("order_date")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

df = (
    df
    .withColumn("cumulative_sales_per_customer", sum("sales").over(customer_window))
    .withColumn("cumulative_profit_per_customer", sum("profit").over(customer_window))
)


In [0]:
# Stamp every row with validity metadata: start_date is the load timestamp,
# end_date is a NULL timestamp, and is_active is True.
load_started_at = current_timestamp()
open_ended = lit(None).cast(TimestampType())

df = (
    df
    .withColumn("start_date", load_started_at)
    .withColumn("end_date", open_ended)
    .withColumn("is_active", lit(True))
)

In [0]:
# Show the fully enriched rows right before they are written to the gold table.
# NOTE(review): `DataFrame.display()` looks like the Databricks notebook helper —
# confirm it is available wherever this notebook is executed.
df.display()

In [0]:
# Append the enriched batch to the gold Delta table, partitioned by
# order_year / order_month. mergeSchema lets columns that are new in this
# batch be added to the existing table schema.
# NOTE(review): order_year and order_month are not derived in this notebook —
# presumably they already exist in the silver table; confirm.
gold_writer = (
    df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .partitionBy("order_year", "order_month")
)
gold_writer.save(gold_path)



In [0]:
# Collect both audit metrics in a single aggregation. Each Spark action
# re-executes the whole lazy plan behind df (read, filter, window functions),
# so one pass is cheaper than a count() followed by a separate max() and
# guarantees both figures come from the same evaluation. On an empty batch the
# aggregate still returns one row: count 0 and a NULL (None) max timestamp.
# `max` / `count` are the Spark aggregates from the star import, not builtins.
audit_stats = df.agg(
    count(lit(1)).alias("records_loaded"),
    max("data_arrival_timestamp").alias("max_data_timestamp"),
).first()

records_count = audit_stats["records_loaded"]
# max timestamp of the batch (None when no rows were loaded)
max_data_ts_row = audit_stats["max_data_timestamp"]

# Use Python datetime for load_time (wall clock of the driver process)
load_time = datetime.now()

# Define schema explicitly so the NULL-able timestamp column keeps its type
# even when max_data_ts_row is None
schema = StructType([
    StructField("layer", StringType(), True),
    StructField("table_name", StringType(), True),
    StructField("load_time", TimestampType(), True),
    StructField("records_loaded", LongType(), True),
    StructField("max_data_timestamp", TimestampType(), True)
])

# Prepare audit data (a row is written even if 0 rows were loaded)
data = [("gold", "gold_sales", load_time, records_count, max_data_ts_row)]

# Create DataFrame
df_audit = spark.createDataFrame(data, schema)

# Append to audit table
df_audit.write.format("delta") \
    .mode("append") \
    .save("/Volumes/customer_360/audit/audit_volume/etl_audit")

print(f"Audit log updated successfully. Records loaded: {records_count}")