In [22]:
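from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip  # provided by the delta-spark pip package

# Spark loads extra jars only when the JVM starts: if a session already exists,
# getOrCreate() reuses it and settings such as spark.jars.packages are ignored,
# so restart the kernel after changing these settings.
builder = (
    SparkSession.builder.appName("MySparkApp")
    .master("local[2]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# Adds the Delta jar matching the installed delta-spark version instead of hard-coding
# delta-core_2.12:2.4.0, which was built for Spark 3.4 / Scala 2.12 only
spark = configure_spark_with_delta_pip(builder).getOrCreate()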

spark.sparkContext.setLogLevel("ERROR")

1. What is the difference between Narrow and Wide Transformations?

✅ Narrow Transformations
	•	Each output partition is computed from a single input partition.
	•	No shuffling of data across partitions.
	•	Examples: map(), filter(), flatMap().

✅ Wide Transformations
	•	Each output partition may need data from many input partitions.
	•	Involves a shuffle: records are redistributed across the network, which is expensive.
	•	Examples: groupByKey(), reduceByKey(), join() (unless both RDDs are already co-partitioned).

In [None]:
# Narrow Transformation
rdd = spark.sparkContext.parallelize([1, 2, 3, 4])
mapped_rdd = rdd.map(lambda x: x * 2)
print(mapped_rdd.collect())  # Output: [2, 4, 6, 8]

# Wide Transformation (shuffle)
rdd1 = spark.sparkContext.parallelize([("a", 1), ("b", 1)])
rdd2 = spark.sparkContext.parallelize([("a", 2), ("b", 2)])
joined = rdd1.join(rdd2)
print(joined.collect())  # Order is not guaranteed after a shuffle, e.g. [('b', (1, 2)), ('a', (1, 2))]


[2, 4, 6, 8]
[('b', (1, 2)), ('a', (1, 2))]


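Output of the previous cell:

- The first output, [2, 4, 6, 8], shows each element doubled by map() (narrow transformation: each partition is processed independently).
- The second output, [('b', (1, 2)), ('a', (1, 2))], shows the two RDDs joined on their keys (wide transformation). Because join() shuffles records between partitions, the order of the result is not guaranteed.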
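
To see why a join needs a shuffle, here is a plain-Python sketch (not Spark code) that models each partition as a list. The function names and the CRC32-based partitioner are hypothetical stand-ins for Spark's internals; the point is that map() touches one partition at a time, while grouping by key pulls records from every input partition into the partition that owns each key.

```python
import zlib
from collections import defaultdict

def narrow_map(partitions, fn):
    # Narrow: output partition i is built only from input partition i (no data movement)
    return [[fn(x) for x in part] for part in partitions]

def partition_for(key, num_partitions):
    # Deterministic stand-in for Spark's hash partitioner (hypothetical)
    return zlib.crc32(str(key).encode()) % num_partitions

def group_by_key(partitions, num_partitions):
    # Wide: any input partition may send records to any output partition (a shuffle)
    buckets = [defaultdict(list) for _ in range(num_partitions)]
    sources = [set() for _ in range(num_partitions)]  # input partitions feeding each output
    for i, part in enumerate(partitions):
        for key, value in part:
            p = partition_for(key, num_partitions)
            buckets[p][key].append(value)
            sources[p].add(i)
    return [dict(b) for b in buckets], sources

print(narrow_map([[1, 2], [3, 4]], lambda x: x * 2))  # [[2, 4], [6, 8]]

grouped, sources = group_by_key([[("a", 1), ("b", 1)], [("a", 2), ("b", 2)]], 2)
print(grouped)  # every key lands in exactly one output partition
print(sources)  # the partition holding "a" needed records from inputs 0 and 1
```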

2. How is optimization done in Delta Lake?

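Delta Lake provides optimizations like:
	•	Z-Ordering: co-locates related values in the same set of files, so data skipping can prune more files.
	•	Data Skipping: uses per-file statistics (min/max values, null counts) stored in the transaction log to skip files that cannot match a query's filter.
	•	File Compaction (OPTIMIZE): rewrites many small files into fewer, larger ones for faster reads.
	•	Caching: frequently accessed data can be cached (e.g., with Spark's .cache(), or the disk cache on Databricks).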
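
A minimal plain-Python sketch of data skipping, assuming each data file carries min/max statistics for customer_id. The file names and values are made up for illustration; in Delta Lake these statistics live in the transaction log and are consulted automatically. Note how the overlapping range of part-2 forces an extra file read, which is the problem Z-ordering tries to reduce.

```python
from dataclasses import dataclass

@dataclass
class FileStats:
    # Hypothetical per-file statistics, similar in spirit to Delta's min/max stats
    path: str
    min_customer_id: int
    max_customer_id: int

def files_to_scan(files, customer_id):
    # A file is skipped when the filter value lies outside its [min, max] range
    return [f.path for f in files if f.min_customer_id <= customer_id <= f.max_customer_id]

files = [
    FileStats("part-0.parquet", 1, 100),
    FileStats("part-1.parquet", 101, 200),
    FileStats("part-2.parquet", 50, 150),
]
print(files_to_scan(files, 120))  # ['part-1.parquet', 'part-2.parquet']
```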

In [27]:
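# Create dummy sales_data and large_table Delta tables

# delta-spark must be installed (and the Delta jars loaded) BEFORE the SparkSession is created;
# installing it here does not add Delta to an already-running session, so restart the kernel
# afterwards. Choose a delta-spark release compatible with your installed PySpark version.
%pip install delta-spark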

# Create a DataFrame for sales_data
sales_df = spark.createDataFrame([
    (1, "2025-07-10", 100.0),
    (2, "2025-07-11", 150.0),
    (1, "2025-07-12", 200.0),
    (3, "2025-07-13", 120.0),
    (2, "2025-07-14", 180.0)
], ["customer_id", "sale_date", "amount"])

# Save as Delta table
sales_df.write.format("delta").mode("overwrite").saveAsTable("sales_data")

# Create a DataFrame for large_table
large_df = spark.createDataFrame([
    (1, "A"), (2, "B"), (3, "C"), (4, "D"), (5, "E")
], ["id", "value"])

# Save as Delta table
large_df.write.format("delta").mode("overwrite").saveAsTable("large_table")

Note: you may need to restart the kernel to use updated packages.


Py4JJavaError: An error occurred while calling o732.saveAsTable.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: delta. Make sure the provider name is correct and the package is properly registered and compatible with your Spark version. SQLSTATE: 42K02
Caused by: java.lang.ClassNotFoundException: delta.DefaultSource


In [None]:
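# %sql is a Databricks notebook magic; in a local Jupyter kernel, run the statements via spark.sql()

# Z-Ordering: co-locate rows with similar customer_id values in the same files
spark.sql("OPTIMIZE sales_data ZORDER BY (customer_id)")

# File compaction: rewrite small files into fewer, larger files
spark.sql("OPTIMIZE large_table")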
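
Z-ordering sorts rows by an interleaved-bit (Morton) key so that rows close in several columns end up in the same file. The plain-Python sketch below uses the textbook Morton code on a tiny 4 x 4 grid and pretends every four consecutive rows form one file; Delta's real implementation differs in details, so treat this only as an illustration of why per-file min/max ranges become tighter on both columns.

```python
def interleave_bits(x, y, bits=16):
    # Morton (Z-order) key: bit i of x goes to position 2*i, bit i of y to 2*i + 1
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z

def chunk_ranges(rows, size):
    # Pretend every `size` consecutive rows are written to one file; record min/max per column
    files = []
    for start in range(0, len(rows), size):
        chunk = rows[start:start + size]
        xs = [r[0] for r in chunk]
        ys = [r[1] for r in chunk]
        files.append(((min(xs), max(xs)), (min(ys), max(ys))))
    return files

def files_hit(files, col, value):
    # Count files whose [min, max] range on column `col` (0 = x, 1 = y) could contain `value`
    return sum(1 for f in files if f[col][0] <= value <= f[col][1])

points = [(x, y) for x in range(4) for y in range(4)]
linear = chunk_ranges(sorted(points), 4)  # sorted by x, then y
zorder = chunk_ranges(sorted(points, key=lambda p: interleave_bits(*p)), 4)

for name, files in [("linear", linear), ("z-order", zorder)]:
    print(name, "x=0:", files_hit(files, 0, 0), "y=0:", files_hit(files, 1, 0))
# linear x=0: 1 y=0: 4
# z-order x=0: 2 y=0: 2
```

Sorting by x alone gives perfect skipping on x but none on y; the Z-order layout gives up a little skipping on x in exchange for much better skipping on y.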

3. What is the difference between Full Load and Incremental Load?

🔁 Full Load
	•	Definition: Reloading all data from the source into the destination each time.
	•	When used:
		•	Small datasets
		•	No way to identify changes
		•	When simplicity is more important than efficiency

Pros:
	•	Simple to implement
	•	No risk of missing updates

Cons:
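	•	High compute and I/O cost: every run reprocesses the entire source
	•	Overwrites the target on every run; a failed load can leave it empty or partial unless the overwrite is transactional (as it is with Delta Lake)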
	•	Not scalable for large datasets

⸻

📥 Incremental Load
	•	Definition: Only new or changed records (the change set) are extracted and loaded.

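Two common approaches:
	1.	Timestamp-based: select rows whose last_updated_ts is later than the latest value already loaded (the watermark)
	2.	CDC (Change Data Capture): capture every insert, update, and delete from the source, e.g., from the database transaction log or Delta Lake's Change Data Feed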
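
A minimal sketch of how CDC events are applied to a target, assuming a hypothetical event format of (operation, key, row) tuples in commit order. Real CDC feeds (database log readers, or Delta Lake's Change Data Feed) use their own schemas, but the idea is the same: unlike a timestamp filter, deletes arrive as explicit events.

```python
def apply_changes(target, changes):
    # target: dict primary_key -> row; changes: (op, key, row) events in commit order
    for op, key, row in changes:
        if op in ("insert", "update"):
            target[key] = row
        elif op == "delete":
            target.pop(key, None)
        else:
            raise ValueError(f"unknown operation: {op}")
    return target

target = {1: {"amount": 100.0}, 2: {"amount": 150.0}}
changes = [
    ("update", 1, {"amount": 120.0}),
    ("insert", 3, {"amount": 80.0}),
    ("delete", 2, None),
]
print(apply_changes(target, changes))
# {1: {'amount': 120.0}, 3: {'amount': 80.0}}
```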

Pros:
	•	Efficient and faster
	•	Scalable for large data
	•	Low resource consumption

Cons:
	•	Needs logic to track changes
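	•	Risk of missing changes if timestamps aren’t reliable (e.g., late-arriving rows or clock skew)
	•	Timestamp-based loads cannot detect hard deletes in the source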
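
The timestamp-based approach and its main weakness can be sketched in plain Python. The row layout and the extract_incremental() helper are hypothetical; the sketch shows a row whose last_updated_ts is not later than the stored watermark being skipped for good.

```python
from datetime import datetime

def extract_incremental(rows, watermark):
    # Keep rows modified strictly after the watermark (None means first run: take everything)
    new_rows = [r for r in rows if watermark is None or r["last_updated_ts"] > watermark]
    # Advance the watermark to the newest timestamp seen, or keep the old one
    new_watermark = max((r["last_updated_ts"] for r in new_rows), default=watermark)
    return new_rows, new_watermark

t1 = datetime(2025, 7, 10, 12, 0)
t2 = datetime(2025, 7, 11, 12, 0)

source = [{"id": 1, "last_updated_ts": t1}, {"id": 2, "last_updated_ts": t2}]
batch1, wm = extract_incremental(source, None)

# A row committed late, but stamped with a time not after the watermark
source.append({"id": 3, "last_updated_ts": t2})
batch2, wm = extract_incremental(source, wm)

print([r["id"] for r in batch1])  # [1, 2]
print([r["id"] for r in batch2])  # [] -> row 3 is silently missed
```

A common mitigation is to re-read a small overlap window before the watermark and rely on an idempotent MERGE to absorb the duplicates.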

⸻

🔧 PySpark Code: Fetch Only New or Changed Records (Incremental Load)

Let’s assume you have:
	•	source_table: a raw Delta table with a primary_key column and a last_updated_ts column.
	•	target_table: the curated/staging Delta table with the same schema.
	•	Goal: upsert only new or updated records into the target table (insert new keys, update existing ones).

⸻

🛠️ Step-by-step with PySpark + Delta (Recommended Method)

In [None]:
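from pyspark.sql import Window
from pyspark.sql import functions as F
from delta.tables import DeltaTable

# Define paths (example Databricks paths; adjust to your storage layout)
source_path = "dbfs:/mnt/raw/source_table"
target_path = "dbfs:/mnt/curated/target_table"

# Load source data
df_source = spark.read.format("delta").load(source_path)

if DeltaTable.isDeltaTable(spark, target_path):
    delta_target = DeltaTable.forPath(spark, target_path)

    # Watermark: latest timestamp already present in the target
    latest_ts = delta_target.toDF().agg(F.max("last_updated_ts")).collect()[0][0]

    # Filter only new or updated records from source
    # (if the target is empty, max() returns None, so take everything)
    if latest_ts is None:
        df_incremental = df_source
    else:
        df_incremental = df_source.filter(F.col("last_updated_ts") > latest_ts)

    # MERGE fails if several source rows match the same target row,
    # so keep only the most recent version of each key
    latest_per_key = Window.partitionBy("primary_key").orderBy(F.col("last_updated_ts").desc())
    df_incremental = (
        df_incremental
        .withColumn("_rn", F.row_number().over(latest_per_key))
        .filter("_rn = 1")
        .drop("_rn")
    )

    # Merge into target table (UPSERT: update matched keys, insert new ones)
    (
        delta_target.alias("target")
        .merge(df_incremental.alias("source"), "target.primary_key = source.primary_key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First run: the target doesn't exist yet, so do a one-time full load
    df_source.write.format("delta").mode("overwrite").save(target_path)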
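
The MERGE semantics can be mirrored with an in-memory dict: rows whose key already exists are updated (whenMatchedUpdateAll), and the rest are inserted (whenNotMatchedInsertAll). This is a hypothetical plain-Python model, not the Delta API. Delta's MERGE fails when several source rows match the same target row; this sketch is stricter and rejects any duplicate key in the source.

```python
def merge_upsert(target, source_rows, key="primary_key"):
    # Hypothetical in-memory model of whenMatchedUpdateAll() + whenNotMatchedInsertAll()
    keys = [r[key] for r in source_rows]
    if len(keys) != len(set(keys)):
        raise ValueError("multiple source rows share the same key")
    for row in source_rows:
        target[row[key]] = dict(row)  # update if the key exists, insert otherwise
    return target

target = {1: {"primary_key": 1, "amount": 100.0}}
source = [
    {"primary_key": 1, "amount": 120.0},  # matched -> updated
    {"primary_key": 2, "amount": 80.0},   # not matched -> inserted
]
result = merge_upsert(target, source)
print(result)
# {1: {'primary_key': 1, 'amount': 120.0}, 2: {'primary_key': 2, 'amount': 80.0}}
```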

✅ Benefits of This Approach:
	•	Uses MERGE for an upsert, so keys already in the target are updated rather than duplicated
	•	Scales to large tables: MERGE rewrites only the files that contain matched rows
	•	Processes only records newer than the latest last_updated_ts already in the target (incremental)
	•	Suits scheduled batch workflows

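🔍 Optimizations for Performance:
✅ Z-Ordering - Cluster files on columns used in filters or merge conditions (e.g., last_updated_ts or primary_key) so more files can be skipped
✅ Partitioning by Date - Derive year/month columns and write with partitionBy("year", "month") so date filters prune whole partitions
✅ Auto Optimize + OPTIMIZE - Enable optimized writes and auto compaction where your Delta/Databricks version supports them, and run OPTIMIZE periodically to compact small files
✅ Caching - Call .cache() on a DataFrame that several actions reuse, and unpersist() it when done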
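
Partitioning helps because a filter on the partition columns lets Spark skip entire directories (partition pruning). Below is a plain-Python sketch, assuming Hive-style year=<y>/month=<m> directories like those partitionBy("year", "month") writes; the helper names are hypothetical.

```python
from collections import defaultdict
from datetime import date

def partition_key(d):
    # Hive-style directory name for a (year, month) partition
    return f"year={d.year}/month={d.month}"

def write_partitioned(rows):
    # Group rows into one "directory" per (year, month)
    dirs = defaultdict(list)
    for row in rows:
        dirs[partition_key(row["sale_date"])].append(row)
    return dict(dirs)

def read_with_filter(dirs, year, month):
    # Partition pruning: directories for other months are never opened
    return dirs.get(partition_key(date(year, month, 1)), [])

rows = [
    {"id": 1, "sale_date": date(2025, 6, 30)},
    {"id": 2, "sale_date": date(2025, 7, 10)},
    {"id": 3, "sale_date": date(2025, 7, 14)},
]
dirs = write_partitioned(rows)
print(sorted(dirs))  # ['year=2025/month=6', 'year=2025/month=7']
print([r["id"] for r in read_with_filter(dirs, 2025, 7)])  # [2, 3]
```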