In [0]:
import pyspark.sql.functions as F
from delta.tables import DeltaTable

In [0]:
# Read the `samples.tpch.customer` table into a DataFrame.
df_customers = spark.read.table("samples.tpch.customer")

In [0]:
# Read the `samples.tpch.orders` table into a DataFrame.
df_orders = spark.read.table("samples.tpch.orders")

### Customers who have already placed orders

In [0]:
# Left-join customers to orders on the customer key. Every customer column is
# kept, along with the order status, total price and order date; customers
# without a matching order get nulls in the three order columns.
df_customer_placed_order = df_customers.join(
    df_orders,
    on=(df_customers.c_custkey == df_orders.o_custkey),
    how="left",
).select(
    df_customers["*"],
    df_orders.o_orderstatus,
    df_orders.o_totalprice,
    df_orders.o_orderdate,
)

In [0]:
# Keep only the joined rows that carry an order status, i.e. rows where the
# left join found an order for the customer.
df_customer_who_placed_order = df_customer_placed_order.where(
    F.col("o_orderstatus").isNotNull()
)

### Customers Placed Orders

In [0]:
# Render the customer/order rows whose o_orderstatus is not null.
# NOTE(review): `.display()` is not part of the core PySpark DataFrame API;
# presumably this notebook runs on Databricks, which provides it — confirm.
df_customer_who_placed_order.display()

### Customers who never placed orders

In [0]:
# Anti-join: keep only the customers whose c_custkey has no matching
# o_custkey in the orders table. Only customer columns are returned.
df_customers_never_orders = df_customers.join(
    df_orders,
    on=(df_customers.c_custkey == df_orders.o_custkey),
    how="leftanti",
)

### Deduplicate records of customers who never ordered

In [0]:
# Keep a single row per (c_custkey, c_name, c_address) combination.
df_customers_never_orders_dedup = (
    df_customers_never_orders
    .dropDuplicates(["c_custkey", "c_name", "c_address"])
)

In [0]:
# Load the deduplicated "never ordered" customers into the Silver table
# `dev.silver.customers_never_ordered`:
#   * table exists   -> insert-only MERGE (rows already present are left as is)
#   * table missing  -> create it from the DataFrame
if spark.catalog.tableExists("dev.silver.customers_never_ordered"):
    print("ℹ️ Table exists...")
    # Reference the Silver table as a DeltaTable
    silver_table = DeltaTable.forName(spark, "dev.silver.customers_never_ordered")
    print("⏳ Processing the Silver table...")
    # DeltaTable.merge(source, condition) accepts exactly ONE condition, so the
    # per-column comparisons are combined into a single AND predicate. Passing
    # them as separate positional arguments raises a TypeError.
    silver_table.alias("silver").merge(
        df_customers_never_orders_dedup.alias("new"),
        "silver.c_custkey = new.c_custkey "
        "AND silver.c_name = new.c_name "
        "AND silver.c_address = new.c_address",
    ).whenNotMatchedInsertAll().execute()
    print("✅ Merge completed!")

else:
    # Table doesn't exist → create it
    print("⏳ Creating new Silver table...")
    (
        df_customers_never_orders_dedup.write.format("delta")
        .mode("overwrite")
        .saveAsTable("dev.silver.customers_never_ordered")
    )

    print("✅ Data Ingested Successfully!")


In [0]:
# Load the customers that placed orders into the Silver table
# `dev.silver.customers_placed_orders`:
#   * table exists   -> insert-only MERGE (rows already present are left as is)
#   * table missing  -> create it from the DataFrame
if spark.catalog.tableExists("dev.silver.customers_placed_orders"):
    print("ℹ️ Table exists...")
    # Reference the Silver table as a DeltaTable
    silver_table = DeltaTable.forName(spark, "dev.silver.customers_placed_orders")
    print("⏳ Processing the Silver table...")
    # DeltaTable.merge(source, condition) accepts exactly ONE condition, so the
    # per-column comparisons are combined into a single AND predicate. Passing
    # them as separate positional arguments raises a TypeError.
    silver_table.alias("silver").merge(
        df_customer_who_placed_order.alias("new"),
        "silver.c_custkey = new.c_custkey "
        "AND silver.o_orderdate = new.o_orderdate "
        "AND silver.o_orderstatus = new.o_orderstatus "
        "AND silver.o_totalprice = new.o_totalprice",
    ).whenNotMatchedInsertAll().execute()
    print("✅ Merge completed!")

else:
    # Table doesn't exist → create it
    print("⏳ Creating new Silver table...")
    # Write the placed-orders DataFrame (the same one the merge branch uses),
    # not the never-ordered one: otherwise the table is created with the wrong
    # rows and without the o_* columns the merge condition relies on.
    (
        df_customer_who_placed_order.write.format("delta")
        .mode("overwrite")
        .saveAsTable("dev.silver.customers_placed_orders")
    )

    print("✅ Data Ingested Successfully!")
