In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, to_date
from delta.tables import DeltaTable

# ---------------------------------------------------------------------------
# Spark session
# ---------------------------------------------------------------------------
spark = (
    SparkSession.builder
    .appName("RetailSalesDataPipeline")
    .getOrCreate()
)

# ---------------------------------------------------------------------------
# Sales transactions (sample rows)
# ---------------------------------------------------------------------------
sales_columns = [
    "transaction_id",
    "product_id",
    "customer_id",
    "sale_amount",
    "discount",
    "sale_date",
]
sales_data = [
    (1, 101, 1, 50.0, 5.0, "2024-01-01 10:00:00"),
    (2, 102, 2, 150.0, 10.0, "2024-01-02 11:00:00"),
    (3, 103, 1, 200.0, 20.0, "2024-01-03 12:00:00"),
    (4, 101, 3, 75.0, 5.0, "2024-01-04 13:00:00"),
    (5, 104, 4, 125.0, 7.5, "2024-01-05 14:00:00"),
    (6, 105, 5, 250.0, 12.5, "2024-01-06 15:00:00"),
]
# sale_date is supplied as text, so it is parsed into a timestamp column.
sales_df = spark.createDataFrame(sales_data, sales_columns).withColumn(
    "sale_date", to_timestamp("sale_date", "yyyy-MM-dd HH:mm:ss")
)

# ---------------------------------------------------------------------------
# Product lookup (sample rows)
# ---------------------------------------------------------------------------
products_columns = ["product_id", "product_name", "price", "category"]
products_data = [
    (101, "Product A", 50.0, "Category1"),
    (102, "Product B", 150.0, "Category2"),
    (103, "Product C", 200.0, "Category3"),
    (104, "Product D", 125.0, "Category1"),
    (105, "Product E", 250.0, "Category2"),
]
products_df = spark.createDataFrame(products_data, products_columns)

# ---------------------------------------------------------------------------
# Customer lookup (sample rows)
# ---------------------------------------------------------------------------
customers_columns = ["customer_id", "customer_name", "email", "signup_date"]
customers_data = [
    (1, "Alice", "alice@example.com", "2023-12-15"),
    (2, "Bob", "bob@example.com", "2023-11-20"),
    (3, "Charlie", "charlie@example.com", "2023-10-05"),
    (4, "David", "david@example.com", "2023-09-12"),
    (5, "Eve", "eve@example.com", "2023-08-18"),
]
# signup_date is supplied as text, so it is parsed into a date column.
customers_df = spark.createDataFrame(customers_data, customers_columns).withColumn(
    "signup_date", to_date("signup_date", "yyyy-MM-dd")
)

# ---------------------------------------------------------------------------
# Cleaning: a missing discount is replaced with 0
# ---------------------------------------------------------------------------
sales_df = sales_df.fillna({"discount": 0})

# ---------------------------------------------------------------------------
# Enrichment: attach product and customer attributes to every sale.
# Both joins are inner joins, so a sale is kept only when its product_id and
# its customer_id are present in the lookup frames.
# ---------------------------------------------------------------------------
sales_with_products_df = sales_df.join(products_df, on="product_id", how="inner")
enriched_sales_df = sales_with_products_df.join(
    customers_df, on="customer_id", how="inner"
)

# ---------------------------------------------------------------------------
# Persist the enriched data
# ---------------------------------------------------------------------------
# 1) Path-based Delta table; any existing data at the path is overwritten.
# NOTE(review): Spark paths on Databricks are usually spelled "dbfs:/...";
# confirm that the "/dbfs/..." prefix used here is the intended location.
(
    enriched_sales_df.write
    .format("delta")
    .mode("overwrite")
    .save("/dbfs/delta/sales_enriched")
)

# 2) Managed Delta table registered under the name "sales_enriched".
# A fresh writer is used on purpose rather than reusing the one above.
(
    enriched_sales_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("sales_enriched")
)

enriched_sales_df.show()




+-----------+----------+--------------+-----------+--------+-------------------+------------+-----+---------+-------------+-------------------+-----------+
|customer_id|product_id|transaction_id|sale_amount|discount|          sale_date|product_name|price| category|customer_name|              email|signup_date|
+-----------+----------+--------------+-----------+--------+-------------------+------------+-----+---------+-------------+-------------------+-----------+
|          1|       103|             3|      200.0|    20.0|2024-01-03 12:00:00|   Product C|200.0|Category3|        Alice|  alice@example.com| 2023-12-15|
|          1|       101|             1|       50.0|     5.0|2024-01-01 10:00:00|   Product A| 50.0|Category1|        Alice|  alice@example.com| 2023-12-15|
|          2|       102|             2|      150.0|    10.0|2024-01-02 11:00:00|   Product B|150.0|Category2|          Bob|    bob@example.com| 2023-11-20|
|          3|       101|             4|       75.0|     5.0|2024

In [0]:
# Upsert a small change set into the enriched sales Delta table.
#
# This cell relies on names created by the previous cell: `spark`,
# `products_df`, `customers_df`, and the imports `col`, `to_timestamp`
# and `DeltaTable`.

# Location of the path-based Delta table written by the previous cell.
SALES_ENRICHED_PATH = "/dbfs/delta/sales_enriched"

# Sample data for updates_df: the first row has the same product_id and
# sale_date as an existing sale, the second row does not.
updates_data = [
    (1, 101, 1, 60.0, 6.0, "2024-01-01 10:00:00"),  # Update existing transaction
    (7, 106, 3, 300.0, 15.0, "2024-01-07 16:00:00")  # New transaction
]

updates_columns = ["transaction_id", "product_id", "customer_id", "sale_amount", "discount", "sale_date"]
updates_df = (
    spark.createDataFrame(updates_data, updates_columns)
    .withColumn("sale_date", to_timestamp("sale_date", "yyyy-MM-dd HH:mm:ss"))
    # Same cleaning rule as the initial load: a missing discount becomes 0.
    .na.fill({"discount": 0})
)

# The target table stores enriched rows (sale + product + customer columns),
# so the updates are enriched the same way before merging. Left joins are
# used so an update is never dropped when its product or customer is not in
# the lookup frames; the corresponding attributes are simply NULL.
enriched_updates_df = (
    updates_df
    .join(products_df, on="product_id", how="left")
    .join(customers_df, on="customer_id", how="left")
)

# Column groups used to build the merge clauses.
key_fields = ["product_id", "sale_date"]
sale_fields = ["sale_amount", "discount", "transaction_id", "customer_id"]
product_fields = ["product_name", "price", "category"]
customer_fields = ["customer_name", "email", "signup_date"]

# On a match the sale values are replaced. customer_id may change, so the
# customer attributes are refreshed with it to keep the row self-consistent.
# Product attributes are left alone because product_id is part of the match.
matched_updates = {
    f"tgt.{name}": col(f"src.{name}") for name in sale_fields + customer_fields
}

# A new row receives every column of the enriched schema.
new_row_values = {
    name: col(f"src.{name}")
    for name in key_fields + sale_fields + product_fields + customer_fields
}

# Perform the merge (upsert) operation
delta_table = DeltaTable.forPath(spark, SALES_ENRICHED_PATH)

# NOTE(review): rows are matched on (product_id, sale_date), not on
# transaction_id. Confirm this pair identifies a sale uniquely; Delta MERGE
# fails when several source rows match the same target row.
(
    delta_table.alias("tgt")
    .merge(
        enriched_updates_df.alias("src"),
        "tgt.product_id = src.product_id AND tgt.sale_date = src.sale_date",
    )
    .whenMatchedUpdate(set=matched_updates)
    .whenNotMatchedInsert(values=new_row_values)
    .execute()
)

# Show the updated Delta table
updated_df = spark.read.format("delta").load(SALES_ENRICHED_PATH)
updated_df.show()

+-----------+----------+--------------+-----------+--------+-------------------+------------+-----+---------+-------------+-------------------+-----------+
|customer_id|product_id|transaction_id|sale_amount|discount|          sale_date|product_name|price| category|customer_name|              email|signup_date|
+-----------+----------+--------------+-----------+--------+-------------------+------------+-----+---------+-------------+-------------------+-----------+
|          3|       101|             4|       75.0|     5.0|2024-01-04 13:00:00|   Product A| 50.0|Category1|      Charlie|charlie@example.com| 2023-10-05|
|          4|       104|             5|      125.0|     7.5|2024-01-05 14:00:00|   Product D|125.0|Category1|        David|  david@example.com| 2023-09-12|
|          1|       101|             1|       60.0|     6.0|2024-01-01 10:00:00|   Product A| 50.0|Category1|        Alice|  alice@example.com| 2023-12-15|
|          1|       103|             3|      200.0|    20.0|2024

Explanation of the Merge Operation
Load the Delta Table:

Load the Delta table you want to perform upserts on.
Merge Operation:

Use the merge operation to upsert data from the updates_df DataFrame into the Delta table.
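Match Condition: The condition used to pair rows of the target Delta table (aliased as tgt) with rows of the source DataFrame (aliased as src): "tgt.product_id = src.product_id AND tgt.sale_date = src.sale_date". A source row that satisfies it for some target row is "matched"; otherwise it is "not matched".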
When Matched:

Update: Specify which columns of the matched row in the target table are overwritten, and with which values from the source DataFrame. Columns that are not listed keep their current values.
When Not Matched:

Insert: Specify the columns of the new row in the target table and the source DataFrame values that populate them. Target columns that are not listed are set to NULL in the inserted row.