setup

In [1]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = 'all'

import warnings
warnings.filterwarnings('ignore')
warnings.simplefilter('ignore')
import time

import os
import findspark

# Set environment variables (local paths)
os.environ["JAVA_HOME"] = "D:/Programs/Java"
os.environ["HADOOP_HOME"] = "D:/Programs/hadoop"
os.environ["SPARK_HOME"] = "D:/Programs/spark/spark-3.5.6-bin-hadoop3"  # Adjust if different

# Initialize findspark
findspark.init("D:/Programs/spark/spark-3.5.6-bin-hadoop3")

# Create Spark session
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType

In [2]:
from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable
import deltalake
import levi

### **Step 1: Setup Spark with Delta Lake**

👉 We create a **Spark session with Delta extensions**.

- This allows Spark to understand Delta’s ACID transactions.
- We also enable the `low shuffle` optimization so MERGE will avoid unnecessary shuffling of data.

In [3]:
builder = SparkSession \
    .builder \
    .appName("DeltaLake Spark Session") \
    .master("local[4]") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.databricks.delta.merge.enableLowShuffle", "true")  # optimization flag

spark = configure_spark_with_delta_pip(builder).getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("ERROR")

spark

### **Step 2: Create the Target Table (Big Transactions Data)**

👉 We generate **5 million transaction records** spanning years `2019–2024`.

Each row has:

- `txn_id` (unique transaction ID)
- `year` (for partitioning)
- `user_id` (customer ID)
- `amount` (transaction amount)

👉 We save it as a **partitioned Delta table**.

- Partitioning by `year` means Spark can **prune partitions** (scan only the years we need during MERGE).

In [4]:
import random
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Generate target data (big dataset)
target_data = [(i, random.randint(2019, 2024), f"user_{random.randint(1, 1000)}", random.randint(10, 1000))
               for i in range(1, 5_000_001)]  # 5 million rows

target_schema = StructType([
    StructField("txn_id", IntegerType(), True),
    StructField("year", IntegerType(), True),
    StructField("user_id", StringType(), True),
    StructField("amount", IntegerType(), True)
])

target_df = spark.createDataFrame(target_data, schema=target_schema)

# Save as partitioned Delta table
target_path = "D:/Internship/delta-lake/mergeTableData"
(target_df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("year")
    .save(target_path))


### **Step 3: Create the Source Table (Daily Updates)**

👉 We create **100k updates** for the year **2024**.

- Some of these `txn_id`s already exist → they will be updated.
- Some are new → they will be inserted.

👉 Save this as another **Delta table** representing new incoming data.

In [5]:
source_data = [(i, 2024, f"user_{random.randint(1, 1000)}", random.randint(500, 2000))
               for i in range(4_900_001, 5_000_001)]  # 100k rows

source_df = spark.createDataFrame(source_data, schema=target_schema)

source_path = "D:/Internship/delta-lake/mergeTableData2"
(source_df.write
    .format("delta")
    .mode("overwrite")
    .save(source_path))


### **Step 4: Perform MERGE**

👉 We run `MERGE INTO`:

- **Match condition:** `txn_id` (primary key)
- **If matched:** update all columns
- **If not matched:** insert as new row

This simulates an **upsert** (update + insert).

With **low shuffle enabled**, Spark will only shuffle the subset of data it really needs.

### **Step 5: Measure Performance**

👉 We compare **execution time** with and without low shuffle.

This shows how much faster MERGE becomes when shuffle is reduced.

In [None]:
import time

from delta.tables import DeltaTable

# Load Delta target table
deltaTable = DeltaTable.forPath(spark, target_path)

# Load updates
updatesDF = spark.read.format("delta").load(source_path)

# Disable low shuffle
spark.conf.set("spark.databricks.delta.merge.enableLowShuffle", "false")

start = time.time()
deltaTable.alias("target").merge(
    updatesDF.alias("source"),
    "target.txn_id = source.txn_id"
).whenMatchedUpdateAll(
).whenNotMatchedInsertAll(
).execute()
print("Merge without Low Shuffle took:", time.time() - start, "seconds")

Merge without Low Shuffle took: 20.686091423034668 seconds


In [None]:
import time

# Enable low shuffle
spark.conf.set("spark.databricks.delta.merge.enableLowShuffle", "true")

start = time.time()
deltaTable.alias("target").merge(
    updatesDF.alias("source"),
    "target.txn_id = source.txn_id"
).whenMatchedUpdateAll(
).whenNotMatchedInsertAll(
).execute()
print("Merge with Low Shuffle took:", time.time() - start, "seconds")

Merge with Low Shuffle took: 11.725181818008423 seconds


### **Step 7: Verify Final Data**

👉 Finally, we check that:

- Updates for 2024 were applied.
- New rows got inserted.

In [9]:
final_df = spark.read.format("delta").load(target_path)
final_df.filter(col("year") == 2024).show(10, truncate=False)

+-------+----+--------+------+
|txn_id |year|user_id |amount|
+-------+----+--------+------+
|3749891|2024|user_635|380   |
|3749901|2024|user_196|26    |
|3749931|2024|user_158|449   |
|3749932|2024|user_798|276   |
|3749942|2024|user_13 |517   |
|3749943|2024|user_510|808   |
|3749955|2024|user_863|759   |
|3750030|2024|user_320|317   |
|3750064|2024|user_541|884   |
|3750107|2024|user_561|774   |
+-------+----+--------+------+
only showing top 10 rows

