#Exploration of Delta Lake

In [0]:
import pyspark
from delta import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Initialize Spark Session with Delta Lake support
spark = (SparkSession.builder
    .appName("DeltaLakeExploration")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate())


In [0]:

# Define Delta Lake path
delta_table_path = "/tmp/delta_table"

# Create a simple DataFrame
data = [(1, "Alice", 29), (2, "Bob", 34), (3, "Charlie", 23)]
columns = ["id", "name", "age"]
df = spark.createDataFrame(data, columns)

# Write DataFrame to Delta format
df.write.format("delta").mode("overwrite").save(delta_table_path)



In [0]:

# Read Delta Table
delta_df = spark.read.format("delta").load(delta_table_path)
delta_df.show()


+---+-------+---+------+
| id|   name|age|gender|
+---+-------+---+------+
|  3|Charlie| 23|  null|
|  1|  Alice| 29|  null|
|  2|    Bob| 34|  null|
+---+-------+---+------+



In [0]:
# Append new data
new_data = [(4, "David", 45), (5, "Emma", 31)]
new_df = spark.createDataFrame(new_data, columns)
new_df.write.format("delta").mode("append").save(delta_table_path)

In [0]:
# Read updated table
spark.read.format("delta").load(delta_table_path).show()

+---+-------+---+------+
| id|   name|age|gender|
+---+-------+---+------+
|  3|Charlie| 23|  null|
|  1|  Alice| 29|  null|
|  4|  David| 45|  null|
|  5|   Emma| 31|  null|
|  2|    Bob| 34|  null|
+---+-------+---+------+



In [0]:

# Time Travel: View previous versions of data
from delta.tables import *
delta_table = DeltaTable.forPath(spark, delta_table_path)
print("Available Versions:")
spark.sql(f"DESCRIBE HISTORY delta.`{delta_table_path}`").show()

Available Versions:
+-------+-------------------+----------------+--------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|          userId|            userName|operation| operationParameters| job|          notebook|           clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+----------------+--------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|      7|2025-03-13 14:07:42|4022047655635652|uninterested.cent...|    WRITE|{mode -> Append, ...|null|{3494345668537905}|0313-140107-z6ah7unc|          6|WriteSerializable|         true|{numFiles -> 2, n...|        null|Databricks-Runtim...|
|      6

In [0]:
# Read an older version
time_travel_df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
time_travel_df.show()

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  3|Charlie| 23|
|  1|  Alice| 29|
|  2|    Bob| 34|
+---+-------+---+



In [0]:

# Schema Evolution: Add a new column
new_schema_data = [(6, "Frank", 39, "M")]
new_columns = ["id", "name", "age", "gender"]
new_schema_df = spark.createDataFrame(new_schema_data, new_columns)
new_schema_df.write.format("delta").mode("append").option("mergeSchema", "true").save(delta_table_path)

In [0]:

# Read the updated schema
delta_df_updated = spark.read.format("delta").load(delta_table_path)
delta_df_updated.show()

+---+-------+---+------+
| id|   name|age|gender|
+---+-------+---+------+
|  6|  Frank| 39|     M|
|  3|Charlie| 23|  null|
|  1|  Alice| 29|  null|
|  4|  David| 45|  null|
|  5|   Emma| 31|  null|
|  2|    Bob| 34|  null|
+---+-------+---+------+



In [0]:
# Data Compaction: Optimize and Vacuum
delta_table.optimize()
delta_table.vacuum(retentionHours=168)  # Clean old versions after 7 days

Out[14]: DataFrame[]

In [0]:

# Concurrency Control: MERGE INTO for Upserts
merge_data = [(1, "Alice", 30), (7, "Grace", 27)]
merge_df = spark.createDataFrame(merge_data, columns)

delta_table.alias("t").merge(
    merge_df.alias("s"), "t.id = s.id"
).whenMatchedUpdate(set={"age": "s.age"}) \
.whenNotMatchedInsert(values={"id": "s.id", "name": "s.name", "age": "s.age"}) \
.execute()

display(spark.read.format("delta").load(delta_table_path))


id,name,age,gender
6,Frank,39,M
7,Grace,27,
1,Alice,30,
3,Charlie,23,
4,David,45,
5,Emma,31,
2,Bob,34,


In [0]:
# Z-Ordering for Query Optimization
spark.sql(f"OPTIMIZE delta.`{delta_table_path}` ZORDER BY id")
display(spark.read.format("delta").load(delta_table_path))

id,name,age,gender
6,Frank,39,M
7,Grace,27,
1,Alice,30,
3,Charlie,23,
4,David,45,
5,Emma,31,
2,Bob,34,


In [0]:
# Audit Logging & History Tracking
display(spark.sql(f"SELECT * FROM delta.`{delta_table_path}` VERSION AS OF 0"))


id,name,age
3,Charlie,23
1,Alice,29
2,Bob,34
