# Delta Lake Optimization Project

This notebook provides a hands-on guide to optimizing Delta Lake tables in Databricks. We will explore and compare various optimization techniques using synthetically generated sales data.

**Learning Objectives:**

- Understand and implement table partitioning.
- Apply Z-Ordering to improve data skipping.
- Perform manual and automatic compaction to optimize file sizes.
- Utilize Liquid Clustering for flexible data layout.
- Use the VACUUM command to manage table storage.
- Analyze the Spark UI to observe execution plans and performance improvements.

**Setup:**
First, we'll define the catalog and schema for our project. Make sure you have the necessary permissions to create catalogs and schemas in your Databricks workspace.

In [None]:
# Configuration
CATALOG_NAME = "delta_optimization_project"
SCHEMA_NAME = "sales_data"

# Create the catalog and schema if they don't exist
spark.sql(f"CREATE CATALOG IF NOT EXISTS {CATALOG_NAME}")
spark.sql(f"USE CATALOG {CATALOG_NAME}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA_NAME}")
spark.sql(f"USE SCHEMA {SCHEMA_NAME}")

## Step 1: Generate Synthetic Sales Data

We'll start by creating a synthetic dataset of sales transactions. This data will be the foundation for our optimization experiments. We'll include a variety of data types and cardinalities to make our tests realistic.

In [None]:
# Generate synthetic data at scale using Spark (executor-friendly, no pandas/Faker)
import pyspark.sql.functions as F

# Number of records to generate (adjust for your cluster)
num_records = 5_000_000

# Controls the number of output files (one file per output partition, roughly).
# Increase partitions to create more small files; decrease to reduce file count.
num_partitions = 1000

# Small list of countries used for sampling (kept on driver only)
countries = ['United States','United Kingdom','Germany','France','Canada','Australia','India','Brazil','Japan','Netherlands']

# Build the DataFrame using Spark primitives (scales across executors)
df = (
    spark.range(num_records)
    .repartition(num_partitions)
    .withColumnRenamed('id', 'seq')
    .withColumn(
        'transaction_id',
        F.concat(
            F.lit('txn_'),
            F.col('seq').cast('string'),
            F.lit('_'),
            # cast the double returned by rand() to string before md5 to avoid datatype mismatch
            F.substring(F.md5(F.rand(12345).cast('string')), 1, 8)
        )
    )
    .withColumn('customer_id', (F.floor(F.rand(42) * 1001) + 1000).cast('int'))
    .withColumn('product_id', (F.floor(F.rand(99) * 401) + 100).cast('int'))
    .withColumn('sale_date', F.date_sub(F.current_date(), F.floor(F.rand(7) * 730).cast('int')))
    .withColumn('quantity', (F.floor(F.rand(11) * 10) + 1).cast('int'))
    .withColumn('unit_price', F.round(F.rand(13) * 190.0 + F.lit(10.5), 2))
    .withColumn(
        'country',
        F.element_at(
            F.array(*[F.lit(c) for c in countries]),
            (F.floor(F.rand(21) * len(countries)) + 1).cast('int')
        )
    )
)
# Write the data to a Delta table. Use maxRecordsPerFile to force small files in a performant, distributed write.
# Tune num_partitions and maxRecordsPerFile to get desired file sizes/counts for your cluster.
(df.write
   .format('delta')
   .option('maxRecordsPerFile', 5000)
   .mode('overwrite')
   .saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.sales_raw")
)

## Step 2: Analyze the Unoptimized Table

Before we apply any optimizations, let's run a query on our raw table and establish a baseline for performance. We'll look for sales data for a specific customer and product.

**Instructions:**

1. Run the query below.
2. Open the Spark UI for the job that just ran.
3. Observe the number of files read and the time taken for the query to complete.

In [None]:
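# Baseline query: read the saved Delta table
from pyspark.sql.functions import col

# Keep df_raw as the full, unfiltered table; later steps reuse it as source data
df_raw = spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.sales_raw")

# Point lookup for one customer and one product
(df_raw
    .where((col("customer_id") == 1500) & (col("product_id") == 250))
    .show())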

## Step 3: Implementing Partitioning

Partitioning is a way to divide a table into smaller, more manageable parts based on the values of one or more columns. This is most effective on columns with low cardinality that are frequently used in filters.

Let's partition our sales data by `country`.

In [None]:
# Create a partitioned table
(df_raw.write.format("delta")
 .mode("overwrite")
 .partitionBy("country")
 .saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.sales_partitioned"))

In [None]:
# Run the same query on the partitioned table
(spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.sales_partitioned")
      .where((col("customer_id") == 1500) & (col("product_id") == 250))
      .show())

**Analysis of Partitioning:**

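Go to the Spark UI and compare the execution plan and scan metrics with the baseline. This query does not filter on the partition key (`country`), so Spark cannot prune any partitions and still reads files from every `country` directory. Partitioning can even increase the file count, because each write task may create one file per country it contains. If the query also filtered on `country` (for example, `country = 'Germany'`), Spark would prune every other partition directory and scan far less data.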
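To see why the missing `country` predicate matters, here is a toy, pure-Python model of partition pruning. It assumes a table is simply a dict of partition "directories" of rows. This is not how Spark or Delta implement pruning; it only illustrates the idea.

```python
from collections import defaultdict


def build_partitioned_layout(rows, partition_col):
    """Group rows into one 'directory' per partition value, like country=Germany/."""
    layout = defaultdict(list)
    for row in rows:
        layout[row[partition_col]].append(row)
    return dict(layout)


def scan(layout, predicate, partition_value=None):
    """Return (matching rows, number of partition directories read).

    If partition_value is given, only that directory is read, mimicking how a
    filter on the partition key lets the engine skip every other directory.
    """
    if partition_value is None:
        dirs = list(layout)
    else:
        dirs = [partition_value] if partition_value in layout else []
    matches = [row for d in dirs for row in layout[d] if predicate(row)]
    return matches, len(dirs)


def is_target(row):
    # The notebook's lookup: one customer, one product, no predicate on country
    return row["customer_id"] == 1500 and row["product_id"] == 250


rows = [
    {"customer_id": 1500, "product_id": 250, "country": "Germany"},
    {"customer_id": 1500, "product_id": 250, "country": "France"},
    {"customer_id": 1200, "product_id": 300, "country": "Germany"},
    {"customer_id": 1800, "product_id": 250, "country": "Japan"},
]
layout = build_partitioned_layout(rows, "country")

# No partition predicate: every directory must be read
hits_all, dirs_all = scan(layout, is_target)
# With country = 'Germany' added, only one directory is read
hits_de, dirs_de = scan(layout, lambda r: is_target(r) and r["country"] == "Germany", "Germany")
print(len(hits_all), dirs_all)  # 2 3
print(len(hits_de), dirs_de)    # 1 1
```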

## Step 4: Applying Z-Ordering

Z-Ordering co-locates related information in the same set of files by ordering rows along a Z-order curve over the chosen columns. It is particularly useful for high-cardinality columns that are often used in query predicates. We will apply Z-Ordering on `customer_id` and `product_id` to a copy of the unpartitioned raw table, so that the comparison with the baseline isolates the effect of Z-Ordering.
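The bit-interleaving idea behind Z-Ordering can be sketched in plain Python. This is a simplified Morton-code illustration that assumes small non-negative integer keys. Delta's `ZORDER BY` is more involved (for example, it normalizes column values before interleaving), so treat this as a model of the concept only.

```python
def interleave_bits(x: int, y: int, bits: int = 16) -> int:
    """Morton (Z-order) code: bit i of x goes to position 2*i, bit i of y to 2*i + 1.

    16 bits per column covers values up to 65535 (e.g. customer_id 1000-2000).
    """
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


# A 4 x 4 grid of (x, y) key pairs, sorted by Z-order key and cut into 4 "files"
points = [(x, y) for x in range(4) for y in range(4)]
z_sorted = sorted(points, key=lambda p: interleave_bits(*p))
files = [z_sorted[i:i + 4] for i in range(0, len(z_sorted), 4)]

# Per-file (min, max) ranges for each column
bounds = [
    ((min(p[0] for p in f), max(p[0] for p in f)), (min(p[1] for p in f), max(p[1] for p in f)))
    for f in files
]
for b in bounds:
    print(b)
# ((0, 1), (0, 1))
# ((2, 3), (0, 1))
# ((0, 1), (2, 3))
# ((2, 3), (2, 3))
```

Each simulated file covers a compact 2 x 2 block of both columns. A lookup on both columns touches exactly one file, and a lookup on either column alone still skips half of them. Sorting by `x` alone would give single-value `x` ranges but let every file span the full `y` range.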

In [None]:
from delta.tables import DeltaTable

# IMPORTANT: Do NOT apply Z-Ordering to the already-partitioned table here.
# Instead, create a z-ordered copy of the unpartitioned raw table for comparison.
# Drop the target table if it exists so this cell is idempotent in iterative runs.
spark.sql(f"DROP TABLE IF EXISTS {CATALOG_NAME}.{SCHEMA_NAME}.sales_raw_zorder")

# Create a separate table from sales_raw to apply Z-Ordering to (keeps sales_partitioned untouched)
spark.sql(f"CREATE TABLE {CATALOG_NAME}.{SCHEMA_NAME}.sales_raw_zorder USING DELTA AS SELECT * FROM {CATALOG_NAME}.{SCHEMA_NAME}.sales_raw")

# Convert the new table to a DeltaTable object
delta_table = DeltaTable.forName(spark, f"{CATALOG_NAME}.{SCHEMA_NAME}.sales_raw_zorder")

# Apply Z-Ordering on high-cardinality columns for the raw copy
delta_table.optimize().executeZOrderBy("customer_id", "product_id")

In [None]:
# Rerun the query to see the effect of Z-Ordering on the z-ordered raw table
(spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.sales_raw_zorder")
      .where((col("customer_id") == 1500) & (col("product_id") == 250))
      .show())

**Analysis of Z-Ordering:**

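In the Spark UI, open the SQL / DataFrame tab for this query and inspect the scan node's metrics, such as the number of files read and pruned. Compared with the baseline, you should see fewer files read. Delta's per-file min/max statistics let it skip files whose `customer_id` and `product_id` ranges cannot contain the requested values. Note that `OPTIMIZE` also compacts small files, so part of the improvement comes from reading fewer, larger files.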
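The file-skipping effect can be reproduced with a toy model of per-file min/max statistics; Delta records similar column statistics in its transaction log. Everything here is synthetic: 10,000 random `customer_id` values are split into 100 "files" of 100 rows. The split is done once in arrival order and once after sorting by the key, which stands in for clustering on a single column.

```python
import random


def split_into_files(rows, rows_per_file):
    """Chop a list of rows into consecutive 'files'."""
    return [rows[i:i + rows_per_file] for i in range(0, len(rows), rows_per_file)]


def min_max_stats(files, column):
    """Per-file (min, max) of `column`, similar in spirit to Delta's per-file statistics."""
    return [(min(r[column] for r in f), max(r[column] for r in f)) for f in files]


def files_to_read(stats, value):
    """A file can be skipped when `value` lies outside its [min, max] range."""
    return sum(1 for lo, hi in stats if lo <= value <= hi)


rng = random.Random(0)  # fixed seed for reproducibility
rows = [{"customer_id": rng.randint(1000, 2000)} for _ in range(10_000)]

layouts = {
    "arrival order": split_into_files(rows, 100),
    "clustered": split_into_files(sorted(rows, key=lambda r: r["customer_id"]), 100),
}
results = {}
for name, files in layouts.items():
    results[name] = files_to_read(min_max_stats(files, "customer_id"), 1500)
    print(f"{name}: read {results[name]} of {len(files)} files for customer_id = 1500")
```

In arrival order, nearly every file spans almost the whole 1000-2000 range and cannot be skipped. After sorting, only the one or two files whose range contains 1500 need to be opened.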

## Step 5: Manual Compaction (OPTIMIZE)

Over time, streaming and DML operations can create many small files in your Delta table, which can hurt read performance. The `OPTIMIZE` command compacts these small files into larger ones.

First, let's simulate the creation of many small files.

In [None]:
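# Simulate small-file creation with ten tiny appends.
# Drop the table first so rerunning this cell starts from the same state.
spark.sql(f"DROP TABLE IF EXISTS {CATALOG_NAME}.{SCHEMA_NAME}.sales_to_compact")
for _ in range(10):
    (df_raw.sample(fraction=0.001)
     .write.format("delta")
     .mode("append")
     .saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.sales_to_compact"))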

In [None]:
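# Count the data files in the current table version (DESCRIBE DETAIL reports numFiles).
# A raw listing of the storage location would also include _delta_log and files
# superseded by earlier operations, so it would overstate the active file count.
spark.sql(f"DESCRIBE DETAIL {CATALOG_NAME}.{SCHEMA_NAME}.sales_to_compact").select("numFiles", "sizeInBytes").show()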

In [None]:
# Now, perform manual compaction
delta_table_to_compact = DeltaTable.forName(spark, f"{CATALOG_NAME}.{SCHEMA_NAME}.sales_to_compact")
delta_table_to_compact.optimize().executeCompaction()

In [None]:
# Check the active file count again; numFiles should drop after OPTIMIZE
spark.sql(f"DESCRIBE DETAIL {CATALOG_NAME}.{SCHEMA_NAME}.sales_to_compact").select("numFiles", "sizeInBytes").show()

**Analysis of Compaction:**

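- Observe the reduction in the number of active files after running `OPTIMIZE`. Reads become more efficient because Spark opens and schedules fewer files. The replaced small files stay in storage (so time travel to earlier versions still works) until `VACUUM` deletes them, which means a raw directory listing does not shrink yet.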
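Compaction is essentially a bin-packing problem. The greedy sketch below groups small files into output files up to a target size. The 128 MB target and the file sizes are illustrative assumptions; the actual target size and grouping logic of `OPTIMIZE` may differ.

```python
def plan_compaction(file_sizes_mb, target_mb=128):
    """Greedily group small files into bins whose total size stays <= target_mb.

    Files already at or above the target are left untouched.
    Returns (bins, untouched), where each bin becomes one output file.
    """
    bins, current, current_size, untouched = [], [], 0, []
    for size in sorted(file_sizes_mb):
        if size >= target_mb:
            untouched.append(size)
            continue
        if current and current_size + size > target_mb:
            bins.append(current)          # close the current output file
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        bins.append(current)
    return bins, untouched


# Hypothetical table: 100 tiny 2 MB files from small appends plus one large 200 MB file
sizes = [2] * 100 + [200]
bins, untouched = plan_compaction(sizes)
print(f"{len(sizes)} files before -> {len(bins) + len(untouched)} files after")
# 101 files before -> 3 files after
```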

## Step 6: Auto Compaction

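Databricks can keep file sizes in check automatically through two table properties. Optimized writes (`delta.autoOptimize.optimizeWrite`) coalesce data into fewer, larger files at write time. Auto compaction (`delta.autoOptimize.autoCompact`) runs a small compaction after a write once enough small files have accumulated.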

Let's create a new table with auto-compaction enabled.

In [None]:
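# Create (or recreate) a table with optimized writes and auto compaction enabled
spark.sql(f"""
CREATE OR REPLACE TABLE {CATALOG_NAME}.{SCHEMA_NAME}.sales_auto_compact
USING DELTA
TBLPROPERTIES (
    'delta.autoOptimize.optimizeWrite' = 'true',
    'delta.autoOptimize.autoCompact' = 'true'
)
AS SELECT * FROM {CATALOG_NAME}.{SCHEMA_NAME}.sales_raw
""")

# Simulate small writes to this table
for _ in range(10):
    (df.sample(fraction=0.001)
     .write.format("delta")
     .mode("append")
     .saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.sales_auto_compact"))

# Inspect the table history for automatically triggered OPTIMIZE operations
display(spark.sql(f"DESCRIBE HISTORY {CATALOG_NAME}.{SCHEMA_NAME}.sales_auto_compact")
        .select("version", "timestamp", "operation", "operationMetrics"))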

**Analysis of Auto Compaction:**

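- After the writes complete, inspect the table history (`DESCRIBE HISTORY`) for `OPTIMIZE` entries that were triggered automatically. Auto compaction runs only once enough small files have accumulated, so you may not see one after every write. Together with optimized writes, it keeps file sizes healthy without scheduled manual `OPTIMIZE` jobs.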
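Auto compaction does not necessarily run after every write. The toy model below assumes a simple rule: compact once a minimum number of small files has accumulated. The threshold of 50 is an assumed value for illustration; the real trigger conditions are configurable and more nuanced. If optimized writes produce only a few files per small append, ten appends may never reach the threshold.

```python
def simulate_auto_compaction(files_per_write, num_writes, min_num_files=50):
    """Toy trigger model (hypothetical threshold): compact all accumulated small
    files once there are at least min_num_files of them.

    Returns (small files left, compacted files created, write numbers that triggered compaction).
    """
    small_files, compacted_files, triggers = 0, 0, []
    for write in range(1, num_writes + 1):
        small_files += files_per_write
        if small_files >= min_num_files:
            compacted_files += 1  # all small files rewritten into one larger file
            small_files = 0
            triggers.append(write)
    return small_files, compacted_files, triggers


print(simulate_auto_compaction(files_per_write=8, num_writes=10))  # (24, 1, [7])
print(simulate_auto_compaction(files_per_write=1, num_writes=10))  # (10, 0, [])
```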

## Step 7: Liquid Clustering

Liquid Clustering is a more flexible and adaptive way to organize data compared to partitioning and Z-Ordering. It's especially useful for tables with high-cardinality columns or evolving query patterns.
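Liquid Clustering is generally described as laying out data along a Hilbert curve rather than a Z-order curve. Treat that as background, not as a statement about Databricks internals. The sketch below uses the classic Hilbert `xy -> d` conversion to compare the two visiting orders on an 8 x 8 grid of key pairs. It illustrates the curves only and is not the clustering implementation.

```python
def hilbert_index(n: int, x: int, y: int) -> int:
    """Position of cell (x, y) along the Hilbert curve over an n x n grid (n a power of two)."""
    d = 0
    s = n // 2
    while s > 0:
        rx = 1 if x & s else 0
        ry = 1 if y & s else 0
        d += s * s * ((3 * rx) ^ ry)
        # Rotate/reflect so the next quadrant is walked in the canonical orientation
        if ry == 0:
            if rx == 1:
                x, y = n - 1 - x, n - 1 - y
            x, y = y, x
        s //= 2
    return d


def z_index(x: int, y: int, bits: int = 8) -> int:
    """Morton (Z-order) code: bits of x at even positions, bits of y at odd positions."""
    z = 0
    for i in range(bits):
        z |= (((x >> i) & 1) << (2 * i)) | (((y >> i) & 1) << (2 * i + 1))
    return z


def max_step(order):
    """Largest Manhattan distance between consecutive cells in a visiting order."""
    return max(abs(a[0] - b[0]) + abs(a[1] - b[1]) for a, b in zip(order, order[1:]))


n = 8
cells = [(x, y) for x in range(n) for y in range(n)]
hilbert_order = sorted(cells, key=lambda c: hilbert_index(n, *c))
z_order = sorted(cells, key=lambda c: z_index(*c))
print("Hilbert max step:", max_step(hilbert_order))  # 1
print("Z-order max step:", max_step(z_order))
```

Consecutive Hilbert positions are always neighboring cells, while the Z-order curve occasionally jumps across the grid. That locality is why a Hilbert ordering tends to keep each file's key ranges tight.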

**Note:** Liquid Clustering requires Databricks Runtime 13.3 LTS or above, and it cannot be combined with partitioning or Z-Ordering on the same table.

In [None]:
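# Create (or recreate) a table with Liquid Clustering on the lookup columns
spark.sql(f"""
CREATE OR REPLACE TABLE {CATALOG_NAME}.{SCHEMA_NAME}.sales_liquid_clustered (
  transaction_id STRING,
  customer_id INT,
  product_id INT,
  sale_date DATE,
  quantity INT,
  unit_price DOUBLE,
  country STRING
)
USING DELTA
CLUSTER BY (customer_id, product_id)
""")

# Insert data into the clustered table. Drop the helper `seq` column: it is not
# part of this table's schema, and Delta schema enforcement would reject the append.
(df.drop("seq")
   .write.format("delta")
   .mode("append")
   .saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.sales_liquid_clustered"))

# Not every write is clustered on arrival; OPTIMIZE clusters any unclustered data
spark.sql(f"OPTIMIZE {CATALOG_NAME}.{SCHEMA_NAME}.sales_liquid_clustered")

# Rerun the query on the liquid clustered table
(spark.read.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.sales_liquid_clustered")
      .where((col("customer_id") == 1500) & (col("product_id") == 250))
      .show())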

**Analysis of Liquid Clustering:**

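1.  Open the Spark UI for the query above and compare the number of files read with the baseline and Z-Ordered runs.
2.  Liquid Clustering provides data skipping benefits similar to Z-Ordering, but it adapts more easily over time. You can change the clustering keys with `ALTER TABLE ... CLUSTER BY` without rewriting existing data, and `OPTIMIZE` clusters new data incrementally.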

## Step 8: VACUUM

The `VACUUM` command removes old, unreferenced data files from your Delta table's storage. This is crucial for managing storage costs and cleaning up your data lake.

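**Important:** By default, `VACUUM` keeps unreferenced files for 7 days so that concurrent readers and time travel keep working. Delta rejects a shorter retention unless you disable a safety check. For this educational exercise, we disable that check and vacuum with a retention of 0 hours, so the small files replaced by `OPTIMIZE` in Step 5 are deleted immediately. Afterwards, you can no longer time travel to the table versions that used those files. **Do not do this in a production environment.**

In [None]:
# Disable the retention period check for demonstration purposes only
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

try:
    # VACUUM the compacted table: it still holds the small files replaced by OPTIMIZE.
    # retentionHours=0 deletes every unreferenced file regardless of age.
    delta_table_to_vacuum = DeltaTable.forName(spark, f"{CATALOG_NAME}.{SCHEMA_NAME}.sales_to_compact")
    delta_table_to_vacuum.vacuum(0)
finally:
    # Re-enable the retention check even if VACUUM fails
    spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "true")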

**Analysis of VACUUM:**

- The `VACUUM` command deletes data files that are no longer referenced by the current version of the table and are older than the retention threshold. To see the effect, list the table's storage location before and after running `VACUUM` on a table that has undergone several modifications.
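VACUUM's selection rule can be modeled in a few lines. This sketch assumes the retention clock starts when a file stops being referenced by the table (for example, when `OPTIMIZE` replaces it); the file names and timestamps are invented. It shows why the default 7-day (168-hour) retention leaves recently superseded files in place while a 0-hour retention removes them.

```python
from datetime import datetime, timedelta


def vacuum_candidates(removed_at, now, retention_hours=168):
    """Return the files eligible for deletion, sorted by name.

    removed_at maps file name -> time the file stopped being referenced by the
    table, or None if the current table version still uses it.
    """
    cutoff = now - timedelta(hours=retention_hours)
    return sorted(name for name, ts in removed_at.items() if ts is not None and ts < cutoff)


now = datetime(2024, 1, 31, 12, 0)
removed_at = {
    "part-000.parquet": None,                      # still in the current version
    "part-001.parquet": now - timedelta(days=10),  # superseded 10 days ago
    "part-002.parquet": now - timedelta(hours=1),  # superseded an hour ago
}
print(vacuum_candidates(removed_at, now))                     # ['part-001.parquet']
print(vacuum_candidates(removed_at, now, retention_hours=0))  # ['part-001.parquet', 'part-002.parquet']
```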

## Project Conclusion

In this project, we have explored several key techniques for optimizing Delta Lake tables in Databricks. We've seen how partitioning, Z-Ordering, compaction, and Liquid Clustering can significantly improve query performance by reducing the amount of data that needs to be scanned. We also learned how to maintain our tables using the `VACUUM` command.

**Key Takeaways:**

- **Partitioning:** Best for low-cardinality columns used frequently in filters.
- **Z-Ordering:** Effective for improving data skipping on high-cardinality columns.
- **Compaction:** Solves the "small file problem" and is crucial for read performance.
- **Liquid Clustering:** A modern, flexible approach to data layout that adapts to your data and queries.
- **VACUUM:** Essential for managing storage and cleaning up old data files.

## Cleanup

Run the following cell to drop the catalog and all associated tables created during this project.

In [None]:
# Drop the catalog and all its contents
spark.sql(f"DROP CATALOG IF EXISTS {CATALOG_NAME} CASCADE")