# 🚀 **Lab 4 - Performance Tuning, Optimization & Scaling**
In this lab, you'll explore key concepts and techniques for tuning performance, selecting optimal Delta configs, and other critical concepts to maintain a scalable lakehouse architecture. Let's dive in!  

## 🎯 What You'll Learn
- The Native Execution Engine
- Optimizing tables with zone and use case appropriate table features
- Table clustering
- Compute sizing
- Query optimization
- Table maintenance

## **4.1 - Why Spark & Delta Performance Tuning Matters** | 🕑 5 min.
#### 🧠 Why Performance Tuning?
Performance tuning is not just about speed—it’s about:
- 💰 **Reducing cost** (fewer resources, faster jobs)
- 📈 **Improving scalability** (handle more data, more users)
- 📊 **Delivering consistent SLAs** (avoid those surprise slowdowns)
- 🧘 **Creating a smoother dev & user experience** (debug less, wait less)

In Spark + Delta workloads, **every layer matters**, from the execution engine and data layout to the shape of your compute and how queries are written. Getting performance right means **pulling the right levers at the right time**.

![](https://github.com/voidfunction/FabCon25SparkWorkshop/blob/main/module-4-tuning-optimizing-scaling/_media/overview.excalidraw.png?raw=true)

## **4.2 - Going Native for Big Gains** | 🕑 15 min.
### **4.2.0 Why the Native Execution Engine?**
#### **4.2.0.1 Columnar vs. Row Memory**
![](https://github.com/voidfunction/FabCon25SparkWorkshop/blob/main/module-4-tuning-optimizing-scaling/_media/nee-vs-spark-jvm.excalidraw.png?raw=true)

<br>

#### **4.2.0.2 Vectorized vs. Scalar Hardware Processing**
![](https://github.com/voidfunction/FabCon25SparkWorkshop/blob/main/module-4-tuning-optimizing-scaling/_media/nee-vs-spark-jvm-simd.excalidraw.png?raw=true)

### 💪 **4.2.1 - The Power of the Native Execution Engine**

#### **4.2.1.1 - Create benchmark function**

In [1]:
def benchmark_native_engine(query):
    import time
    spark.conf.set("spark.synapse.vegas.useCache", "false")
    spark.conf.set("spark.native.enabled", "false")
    # Set description of the job
    spark.sparkContext.setJobDescription(f"Spark Query: {query}")
    # Record start time
    start_time_spark = time.time()
    # Execute query
    q = spark.sql(query).limit(1000)
    # Collect results (this triggers the query execution)
    if query.split()[0].lower() in ('with', 'select'):  # first token, split on any whitespace
        q.collect()
    # Record end time
    end_time_spark = time.time()
    duration_spark = (end_time_spark - start_time_spark) * 1000  # Convert to milliseconds
    print(f"Execution time w/ Spark: {duration_spark:.2f} ms")

    spark.conf.set("spark.native.enabled", "true")
    # Set description of the job
    spark.sparkContext.setJobDescription(f"Native Query: {query}")
    # Record start time
    start_time_native = time.time()
    # Execute query
    q = spark.sql(query).limit(1000)
    # Collect results (this triggers the query execution)
    if query.split()[0].lower() in ('with', 'select'):  # first token, split on any whitespace
        q.collect()
    # Record end time
    end_time_native = time.time()
    spark.sparkContext.setJobDescription(None)
    duration_native = (end_time_native - start_time_native) * 1000  # Convert to milliseconds

    # Get the execution plan
    execution_plan = q._jdf.queryExecution().executedPlan().toString()

    # Assert that the plan contains "Velox" to make sure that NEE was enabled and had at least 1 compatible operation
    assert "Velox" in execution_plan, f"Plan did not contain Velox: {execution_plan}"

    spark.conf.set("spark.synapse.vegas.useCache", "true")

    print(f"Execution time w/ Native: {duration_native:.2f} ms")
    times_faster = duration_spark/duration_native
    if duration_spark > duration_native:
        print(f"Native was \033[1;34m{times_faster:.1f}x faster\033[0m!!!")



#### **4.2.1.2 - Run Benchmark on 1.5B row parquet dataset**

In [None]:
taxi_df = spark.read.parquet(f"abfss://47938747-73b4-4f78-99dc-4ff2afa78142@onelake.dfs.fabric.microsoft.com/443d992d-01f1-4caa-9345-088e81dd81df/Files/nyctlc/yellow")
taxi_df.createOrReplaceTempView('nyc_yellow_taxi')


In [None]:
benchmark_native_engine("""
    SELECT 
        AVG(tripDistance), 
        VendorID 
    FROM nyc_yellow_taxi
    GROUP BY ALL
""")

📌 **Challenge:** write your own query and execute it with the benchmark function: `benchmark_native_engine()`. Reference the DataFrame via the temp view `nyc_yellow_taxi`, just as you would any other Lakehouse table or view.

💡 **Tip:** you can call `.printSchema()` on any DataFrame (e.g., `taxi_df`) to see which columns are available and their data types.

In [None]:
benchmark_native_engine("""
    SELECT 

    FROM nyc_yellow_taxi
""")

<details>
  <summary><strong>🔑 Answer:</strong>  Click to expand the answer if you need help with this task</summary>

~~~python
# Print the schema
taxi_df.printSchema()

# Write a SELECT statement
benchmark_native_engine("""
    SELECT *
    FROM nyc_yellow_taxi
    LIMIT 1000
""")
~~~
  
</details>

#### **4.2.1.3 - Compare Final Execution Plans**
- **Numbering (`*(1)`, `*(2)`) Represents Execution Order**
- **Caret symbol (`^`) Represents a Stage Executed via the Velox engine (NEE)**
- Graphical plans show native execution in <span style="color:green">green</span> and Spark in <span style="color:blue">blue</span>

📌 **Challenge:** open the Spark UI URL in a new tab and go to the _SQL / DataFrame_ tab. Then compare the SQL queries whose descriptions start with `Spark Query:` against those that start with `Native Query:` (the `benchmark_native_engine()` function sets these job descriptions).

💡 **Tip:** You can call `spark.sparkContext.uiWebUrl` to get the link to the Spark UI. 

In [None]:
spark.sparkContext.uiWebUrl

### **4.2.2 - Enable the Native Execution Engine**
We can enable any mutable Spark configuration via PySpark, Scala, or SparkSQL:

In [1]:
# PySpark / Scala
spark.conf.set("spark.native.enabled", "true")

# SparkSQL
spark.sql("SET spark.native.enabled = True")


#### **4.2.2.1 - Audit Session Configs**

In [None]:
# Verify that Spark config is set
spark.conf.get("spark.native.enabled")
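
To audit several settings at once, a small helper can compare expected values against the session. This is a minimal sketch: `audit_configs` and `FakeConf` are hypothetical names (not part of Spark), and the stand-in object exists only so the snippet runs outside a notebook. In a Fabric notebook you would pass `spark.conf` instead.

```python
def audit_configs(conf, expected):
    """Return {key: (expected, actual)} for every setting that differs.

    `conf` is any object with a `get(key, default)` method, such as `spark.conf`.
    """
    mismatches = {}
    for key, want in expected.items():
        actual = conf.get(key, None)  # None means the key is not set
        # Compare case-insensitively: values are stored as strings like "true" or "True"
        if actual is None or str(actual).lower() != str(want).lower():
            mismatches[key] = (want, actual)
    return mismatches


class FakeConf:
    """Hypothetical stand-in for `spark.conf`, used only to make this sketch runnable."""

    def __init__(self, values):
        self._values = dict(values)

    def get(self, key, default=None):
        return self._values.get(key, default)


session = FakeConf({"spark.native.enabled": "True"})
expected = {
    "spark.native.enabled": "true",
    "spark.synapse.vegas.useCache": "true",
}
report = audit_configs(session, expected)
print(report)
```

An empty result means every expected setting matched; anything else lists the expected value next to what the session actually holds.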

##### 🚨 Key Takeaway: **Not Using the Native Execution Engine is a Missed Opportunity**

- If you aren't using the Native Execution Engine, you are running on the traditional JVM-based Spark execution engine and **missing out on huge performance gains**: 2x–5x improvements in many cases.
- ✅ *Fix*: Validate engine usage via the Spark UI or logs. Ensure compatible functions and workloads are used to **unlock native execution**.

## **4.3 - Smart Feature Selection by Zone & Workload** | 🕑 10 min.
While the default Spark configurations suit the most common use cases, we sometimes need to modify them to optimize specific workloads. Here we will set Spark configurations to optimize typical bronze and silver zones.

![OptimizeWrite](https://milescole.dev/assets/img/posts/Optimized-Writes/optimized-write.excalidraw.png)

![Deletion Vectors](https://milescole.dev/assets/img/posts/Deletion-Vectors/deletion-vectors-tldr2.excalidraw.png)

#### **4.3.1.1 - Get Baseline Perf Before Config Modification**

In [None]:
%%sql
UPDATE gold.dbo.patientobservations SET deceasedDateTime = NULL WHERE first_name = 'Dion244' and last_name = 'Bergnaum523'

### **4.3.2 - Enable Session Configs**

In [16]:
spark.conf.set("spark.microsoft.delta.optimizeWrite.partitioned.enabled", "true") 
    # optimizeWrite is generally beneficial for partitioned tables
spark.conf.unset("spark.databricks.delta.optimizeWrite.enabled") 
    # unset so that `optimizeWrite.partitioned.enabled` is not overridden
spark.conf.set("spark.databricks.delta.optimizeWrite.binSize", "128m") 
    # appropriate target file size for small tables up to 10GB in size
spark.conf.unset("spark.sql.parquet.vorder.default") 
    # unset so that vorder is not enabled by default at the session level and can be controlled at the table level as needed
spark.conf.set("spark.databricks.delta.properties.defaults.enableDeletionVectors", "true") 
    # improve performance of performing updates and deletes
spark.conf.set("spark.databricks.delta.optimize.maxFileSize", "128m") 
    # appropriate target file size for small tables up to 10GB in size


#### **4.3.2.1 - Enable Delta Features on an Existing Table**
Now that we've tailored our Spark configurations to our specific workload, newly created Delta tables will inherit them. For existing tables, we can change the V-Order and Deletion Vector properties by altering the table:

In [17]:
%%sql
ALTER TABLE gold.dbo.patientobservations
SET TBLPROPERTIES (
    'delta.parquet.vorder.enabled' = 'false',
    'delta.enableDeletionVectors' = 'true'
)


#### **4.3.2.2 - Audit Delta Table Features**
To verify that our table has the desired features, we can run `DESCRIBE DETAIL` in SparkSQL to check which features are enabled on the table.

📌 **Challenge:** Use `DESCRIBE DETAIL` to verify that Deletion Vectors are enabled and that V-Order is either not specified or disabled on `gold.dbo.patientobservations`.

In [None]:
%%sql


<details>
  <summary><strong>🔑 Answer:</strong>  Click to expand the answer if you need help with this task</summary>

~~~sql
DESCRIBE DETAIL gold.dbo.patientobservations
~~~
  
</details>

#### **4.3.2.3 - Run Statement with Write Optimized Delta Features**

In [None]:
%%sql
UPDATE gold.dbo.patientobservations SET deceasedDateTime = NULL WHERE first_name = 'Dion244' and last_name = 'Bergnaum523'

The `UPDATE` statement should have run about 2x faster, largely because Deletion Vectors are now enabled.

📌 **Challenge:** Use `DESCRIBE HISTORY` on the table and inspect the `operationMetrics` column for the two `UPDATE` operations to see the difference with Deletion Vectors enabled.

In [None]:
%%sql


<details>
  <summary><strong>🔑 Answer:</strong>  Click to expand the answer if you need help with this task</summary>

~~~sql
DESCRIBE HISTORY gold.dbo.patientobservations
~~~
  
</details>

> ℹ️ _You can query Delta table history by creating a temp view and then querying it just like any table. Run the cell below to get a cleaner display comparing key write operation metrics._

In [None]:
spark.sql("DESCRIBE HISTORY gold.dbo.patientobservations").createOrReplaceTempView("po_history")
history_df = spark.sql("""
    SELECT 
        version, 
        operationMetrics, 
        operationMetrics.numUpdatedRows, 
        operationMetrics.numCopiedRows, 
        operationMetrics.numDeletionVectorsAdded, 
        operationMetrics.rewriteTimeMs 
    FROM po_history 
    WHERE operation = 'UPDATE'
""")
display(history_df)

## **4.4 - Getting the Most Out of Table Clustering** | 🕑 5 min.

## 📦 Table Clustering in Practice

> Clustering controls **how data is physically laid out**, which affects **read efficiency**, **file skipping**, and **query performance**.

---

### 🧱 Partitioning (Traditional)

✅ Best for **very large datasets** with **clear access patterns**.

- 📏 **Avoid partitioning under 1 TB** — small tables get worse performance due to file and metadata overhead.
- 🎯 **Pick low-cardinality columns** (e.g., `year`, `region`) — too many partitions = too many small files.
- 🚫 Avoid partitioning by high-cardinality fields (e.g., `user_id`, `uuid`, `timestamp`) unless absolutely needed.
- Optimized Write should generally be enabled when writing to partitioned tables to prevent too many small files.

> 🧠 Good partitioning reduces scan ranges — bad partitioning just bloats your file count and adds additional write and read overhead.
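
The guidance above can be turned into a rough screening check. This is a sketch under stated assumptions: `partitioning_advice` is a hypothetical helper, the 1 TB floor comes from the bullet list above, and the 1 GB minimum average partition size is an illustrative cutoff rather than a Delta or Fabric setting.

```python
TB = 1024 ** 4
GB = 1024 ** 3


def partitioning_advice(table_bytes, distinct_partition_values, min_partition_bytes=1 * GB):
    """Rough screen for whether partitioning is likely to help.

    `min_partition_bytes` is an assumed cutoff for illustration only.
    """
    if table_bytes < 1 * TB:
        return "skip: table is under 1 TB"
    avg_partition_bytes = table_bytes / distinct_partition_values
    if avg_partition_bytes < min_partition_bytes:
        return "skip: cardinality too high, partitions would be tiny"
    return "ok: candidate for partitioning"


print(partitioning_advice(200 * GB, 12))        # small table
print(partitioning_advice(5 * TB, 50_000_000))  # e.g. partitioning by user_id
print(partitioning_advice(5 * TB, 60))          # e.g. partitioning by year and region
```

Treat the result as a first filter only; actual query patterns still decide whether a partition column earns its overhead.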

---

### 🧮 Z-Order Clustering

✅ Designed for **multi-dimensional skipping**, especially for **interactive queries**.

- ⚠️ **Impractical for data production** — Z-Ordering is **not incremental**, and **not supported during write**.
- 📦 Requires **periodic post-processing (OPTIMIZE ZORDER BY)**, which is **expensive** and often incompatible with streaming or near-real-time workflows.
- 🧪 Use only if you're managing data for **heavy ad-hoc analytics workloads** and can afford the maintenance.

> 💬 *"Z-Order helps readers — but is very expensive for writers to maintain."*
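
To see why Z-Ordering enables skipping on more than one column, it helps to look at the underlying idea: interleave the bits of the clustering columns and sort by the result. The snippet below is a conceptual sketch only (`z_value` is a hypothetical helper, and Delta's actual implementation differs in detail). It sorts a 4x4 grid of points by Z-value and splits them into four "files".

```python
def z_value(x, y, bits=16):
    """Interleave the bits of x and y (x in even positions, y in odd positions)."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


# Sort a 4x4 grid of (x, y) points by their Z-value
points = [(x, y) for x in range(4) for y in range(4)]
ordered = sorted(points, key=lambda p: z_value(*p))

# Split the sorted points into 4 "files" of 4 points each and print each file's min/max
for i in range(0, len(ordered), 4):
    chunk = ordered[i:i + 4]
    xs = [p[0] for p in chunk]
    ys = [p[1] for p in chunk]
    print(f"file {i // 4}: x in [{min(xs)}, {max(xs)}], y in [{min(ys)}, {max(ys)}]")
```

Each file ends up with a narrow min/max range on both columns, so a filter on either `x` or `y` can skip files using file-level statistics. A plain sort on `x` alone would leave every file spanning the full range of `y`.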

---

### 🌊 Liquid Clustering

🚧 **Not yet recommended** in Microsoft Fabric (as of OSS Delta 3.2).

- ⚙️ OSS Delta lacks complete **write-time optimizations** (e.g., optimal clustering-aware writes, file grouping).
- 🔍 No **read-time skipping optimizations** yet, so clustered data doesn't give file skipping benefits.
- 🔁 It makes writes 20-25% slower and doesn't improve reads.

> 📌 Until both **write-side layout** and **reader skipping logic** are improved, liquid clustering gives limited benefit.

---

### ✨ Summary Guidelines

| Feature             | Use It When...                           | Avoid When...                                 |
|---------------------|------------------------------------------|-----------------------------------------------|
| **Partitioning**    | Table is >1TB, access is filterable      | Table is small or queries are random access   |
| **Z-Order**         | You control refresh cadence, filter by multiple columns | You have streaming/real-time pipelines |
| **Liquid Clustering** | (Not recommended yet in Fabric)         | (Not mature enough for benefit)               |

---

## 🔚 Rule of Thumb

> **Don’t cluster by default — cluster with purpose.**


## **4.5 - Right-Sizing Spark for the Job** | 🕑 5 min.

### ✅ **4.5.1 - General Guidance: Right-Sizing Spark Compute**

<br>

The key is to **match node size and count** to your workload’s shape and business goals — not just go big or small by default.

<br>

---

<br>

#### 💡 **Best Practices**

<br>

- 🧠 **Consider workload shape and SLAs**:  
  Are you optimizing for **cost**, **speed**, or a balance of both? If you need to hit tight SLAs, right-sizing means **spending smarter**, not just spending more.

<br>

- ⚖️ **Bigger clusters ≠ more expensive — if they finish proportionally faster**:  
  A cluster that is 4× bigger and finishes your job 4× faster will cost about the **same** — but delivers results sooner and frees up compute for other jobs.

<br>

- 🚀 **Larger nodes typically reduce shuffle & GC overhead**:  
  Fewer, larger executors often result in less shuffling, more efficient memory usage, and lower GC times.

<br>

- 🎛️ **Autoscale and Dynamic Allocation = flexibility & efficiency**  
  These features shine when **your jobs vary in shape** or **you don't know exactly what the workload will look like**. They let Spark request resources based on actual needs: starting as a single node, growing when busy, and shrinking when idle. Compared to other platforms, Autoscale tends to be somewhat _conservative_ in deciding when to scale.
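
The "bigger is not necessarily more expensive" point is simple arithmetic. The sketch below assumes billing is proportional to node-hours; `job_cost` is a hypothetical helper and `price_per_node_hour` is a placeholder unit price, not a real Fabric rate.

```python
def job_cost(nodes, hours, price_per_node_hour=1.0):
    """Cost of a run, assuming billing is proportional to node-hours."""
    return nodes * hours * price_per_node_hour


small = job_cost(nodes=2, hours=4.0)    # 2 nodes for 4 hours
big = job_cost(nodes=8, hours=1.0)      # 4x the nodes, finishing 4x faster
partial = job_cost(nodes=8, hours=2.0)  # 4x the nodes, but only 2x faster

print(small, big, partial)
```

The larger cluster only breaks even when the speedup is proportional to the added nodes; when the job scales sub-linearly (the third case), the same cluster costs more.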

<br>

---

<br>

#### 🔧 **Your Knobs**

| Knob | Options |
|------|---------|
| **Node Size** | Small (4 cores) → Medium (8 cores) → Large (16 cores) → XL (32 cores) → XXL (64 cores) |
| **# of Nodes** | 1 to N |
| **Autoscale Range** | 1 to N nodes |
| **Dynamic Allocation Range** | 1 to N nodes |

<br>

---

<br>

#### 💡 **When to Use What**

| Scenario | Guidance |
|----------|----------|
| **Transform-heavy jobs with shuffles & joins** | Use **larger nodes** (16–64 cores), fewer total nodes. Avoid tiny fragments. |
| **Bursty or unpredictable jobs** | Use **Autoscale + Dynamic Allocation** to let the cluster grow/shrink as needed. Works well when jobs vary in size. |
| **Many small parallel jobs (e.g., streaming or batch microjobs)** | Use **small/medium nodes**, more nodes. Set minimum nodes to avoid cold start lag. |
| **Small serially processed jobs or development work** | Use **small or medium nodes** in single node mode (driver and executor share 1 VM). |
| **Large jobs with known partitioning** | Pre-size the cluster manually: pick the node size and count based on data volume and shuffle stages. Disable DA if it isn't scaling up aggressively enough to meet your performance needs. |
| **ML or distributed training** | Use **many medium/large nodes** to maximize parallelism and distribute compute evenly. |

<br>

> ⚠️ **When using High-Concurrency Mode** the upper range of DA should be less than the maximum Autoscale range to prevent a single job from allocating all nodes in a long running operation.
> <br>
> <br>
>_Outside of High-Concurrency Mode, Autoscale and Dynamic Allocation should be set to the same range._

<br>

---

<br>

#### 🚩 **Red Flags That You're Mis-Sized**

- **Long GC times** → Node too small or too many small executors  
- **Shuffle spill to disk** → Not enough memory, or too many partitions per core  
- **Slow start / long scheduling delay** → DA starting too low or nodes scaling up too late  
- **Low CPU usage across nodes** → Too large, underutilized cluster or poorly parallelized job

<br>

---

<br>

#### ⚖️ **Key Trade-Offs**

- **Autoscale + DA = flexibility**, but may cause **warm-up lag** or inconsistent performance  
- **Static sizing = predictability**, but risks **wasting resources** on smaller jobs  
- **Larger nodes = better shuffle handling**, **fewer JVMs**, but less granular parallelism  
- **Smaller nodes = fine-grained parallelism**, but more driver overhead per executor  

<br>

---

<br>

#### ⚠️ **Things to Avoid!**

##### 🐢 Underpowered or Poorly Sized Compute

- **Small clusters** may save money up front—but if they cause excessive **shuffling, spilling, or GC**, they end up costing more in time and dollars.
- **Too many small executors** can be worse than **fewer large ones**, especially for shuffles and joins.
- ✅ *Fix*: Right-size your compute for the workload. Use **metrics like shuffle spill, memory usage, and GC time** to guide tuning.

<br>

##### 🔄 Too Much Caching / Wrong Things Cached

- Caching large DataFrames unnecessarily eats up executor memory and causes GC issues.
- Worse, caching things that change between actions results in stale or invalid data.
- ✅ *Fix*: Only cache when reuse is **frequent and beneficial**. Unpersist when done.


<br>

---

<br>

#### 🔑 **Key Takeaway**

Start with the **shape of your workload** and your **expected data processing SLAs**, not the shape of your cluster.


## **4.6 - Writing Smarter Queries** | 🕑 10 min.



### 🎲 **4.6.1. Avoid Relying on Schema Inference in Production**

- Schema inference slows down reads, **increases job execution time**, and can result in **unexpected schema drift**.
- Particularly painful in high-scale scenarios.
- ✅ *Fix*: Define schemas explicitly in production jobs, especially for semi-structured data like JSON and any structured files without a schema header (CSV, Excel, etc.).

👉 _Let's explore how this impacts execution times. First, let's start by allowing Spark to automatically infer the schema._

In [None]:
from pyspark.sql.functions import from_json,col
from pyspark.sql.types import *

observations_path = f"abfss://47938747-73b4-4f78-99dc-4ff2afa78142@onelake.dfs.fabric.microsoft.com/443d992d-01f1-4caa-9345-088e81dd81df/Files/observationsrawfull/"
# Load JSON data
observations_raw_df = spark.read.json(observations_path)

display(observations_raw_df)

We can get the underlying schema of any DataFrame via calling `.schema` on the DataFrame. 

📌 **Challenge:** Use the following cell to return the schema and then create a variable named `schema` with the schema that was returned.


<details>
  <summary><strong>🔑 Answer:</strong>  Click to expand the answer if you need help with this task</summary>

~~~python
# Print the schema
print(observations_raw_df.schema)

# Create a variable named schema
schema = observations_raw_df.schema
# OR
schema = StructType([StructField('category', ArrayType(StructType([StructField('coding', ArrayType(StructType([StructField('code', StringType(), True), StructField('display', StringType(), True), StructField('system', StringType(), True)]), True), True)]), True), True), StructField('code', StructType([StructField('coding', ArrayType(StructType([StructField('code', StringType(), True), StructField('display', StringType(), True), StructField('system', StringType(), True)]), True), True), StructField('text', StringType(), True)]), True), StructField('component', ArrayType(StructType([StructField('code', StructType([StructField('coding', ArrayType(StructType([StructField('code', StringType(), True), StructField('display', StringType(), True), StructField('system', StringType(), True)]), True), True), StructField('text', StringType(), True)]), True), StructField('valueCodeableConcept', StructType([StructField('coding', ArrayType(StructType([StructField('code', StringType(), True), StructField('display', StringType(), True), StructField('system', StringType(), True)]), True), True), StructField('text', StringType(), True)]), True), StructField('valueQuantity', StructType([StructField('code', StringType(), True), StructField('system', StringType(), True), StructField('unit', StringType(), True), StructField('value', DoubleType(), True)]), True), StructField('valueString', StringType(), True)]), True), True), StructField('effectiveDateTime', StringType(), True), StructField('encounter', StructType([StructField('reference', StringType(), True)]), True), StructField('id', StringType(), True), StructField('issued', StringType(), True), StructField('meta', StructType([StructField('profile', ArrayType(StringType(), True), True)]), True), StructField('resourceType', StringType(), True), StructField('status', StringType(), True), StructField('subject', StructType([StructField('reference', StringType(), True)]), True), StructField('valueCodeableConcept', 
StructType([StructField('coding', ArrayType(StructType([StructField('code', StringType(), True), StructField('display', StringType(), True), StructField('system', StringType(), True)]), True), True), StructField('text', StringType(), True)]), True), StructField('valueQuantity', StructType([StructField('code', StringType(), True), StructField('system', StringType(), True), StructField('unit', StringType(), True), StructField('value', DoubleType(), True)]), True)])
~~~
  
</details>


Now run the code below to read the JSON files into a DataFrame, but with the statically defined schema to see the performance improvement. You should see much better performance.

In [None]:
from pyspark.sql.functions import from_json,col
observations_path = f"abfss://47938747-73b4-4f78-99dc-4ff2afa78142@onelake.dfs.fabric.microsoft.com/443d992d-01f1-4caa-9345-088e81dd81df/Files/observationsrawfull/"
# Load JSON data
observations_raw_df = spark.read.json(observations_path, schema)

display(observations_raw_df)
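
When only a handful of fields are needed, a DDL string is a more compact way to state the schema than a full `StructType` literal, since `spark.read.json` accepts either form for its `schema` argument. The sketch below uses a hypothetical helper, `to_ddl`, and a subset of field names taken from the inferred schema; the Spark call is left as a comment so the snippet runs without a session.

```python
def to_ddl(fields):
    """Build a Spark DDL schema string such as 'id STRING, status STRING'."""
    return ", ".join(f"{name} {dtype}" for name, dtype in fields.items())


# Field names and types taken from the schema inferred earlier in this section
observation_fields = {
    "id": "STRING",
    "status": "STRING",
    "issued": "STRING",
    "effectiveDateTime": "STRING",
    "subject": "STRUCT<reference: STRING>",
}
ddl_schema = to_ddl(observation_fields)
print(ddl_schema)

# In the notebook (not executed here):
# observations_slim_df = spark.read.json(observations_path, schema=ddl_schema)
```

Fields that are absent from the supplied schema are simply not returned, so this form suits jobs that need only a few top-level attributes.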

### ❓ **4.6.2 - Avoid Overusing Repartition or Coalesce Without Understanding**

- Over-partitioning increases task overhead and shuffle volume. Under-partitioning can lead to skew and stragglers.
- `repartition()` is rarely needed but can be extremely helpful when used intentionally and in the right scenarios.
- ✅ *Fix*: Base partitioning decisions on **data size** and **number of output files**. Use the **Spark UI** to see partition/task counts. If you don't know what you are doing, don't repartition.
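
Basing the partition count on data size can be as simple as dividing by a target file size. This is a sketch: `target_partitions` is a hypothetical helper, and the 128 MB default mirrors the target file size used in section 4.3.2. Note that in-memory size and compressed on-disk size differ, so treat the result as a starting point.

```python
import math

MB = 1024 ** 2
GB = 1024 ** 3


def target_partitions(total_bytes, target_file_bytes=128 * MB):
    """Number of output partitions needed so each file lands near the target size."""
    return max(1, math.ceil(total_bytes / target_file_bytes))


n = target_partitions(10 * GB)
print(n)

# In the notebook (not executed here), with `df` being the DataFrame to write:
# df = df.repartition(n)
```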


### 🔄 **4.6.3 - Avoid Too Much Caching / Wrong Things Cached**

- Caching large DataFrames unnecessarily eats up executor memory and causes GC issues.
- Worse, caching things that change between actions results in stale or invalid data.
- ✅ *Fix*: Only cache when reuse is **frequent and beneficial**. Unpersist when done.
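
One way to make "unpersist when done" hard to forget is to scope the cache with a context manager. This is a minimal sketch: `cached` is a hypothetical helper, and `FakeDataFrame` is a stand-in that mimics the `cache()`, `unpersist()`, and `is_cached` members of a Spark DataFrame so the snippet runs without a session.

```python
from contextlib import contextmanager


@contextmanager
def cached(df):
    """Cache `df` for the duration of the block, then always unpersist it."""
    df.cache()
    try:
        yield df
    finally:
        df.unpersist()


class FakeDataFrame:
    """Hypothetical stand-in for a Spark DataFrame so the sketch runs anywhere."""

    def __init__(self):
        self.is_cached = False

    def cache(self):
        self.is_cached = True
        return self

    def unpersist(self):
        self.is_cached = False
        return self


df = FakeDataFrame()
with cached(df) as reused:
    inside = reused.is_cached  # cached while the block runs
after = df.is_cached           # released afterwards, even if the block raised
print(inside, after)
```

In a notebook, the body of the `with` block would hold the actions that reuse the DataFrame.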


### 🧊 **4.6.4 - Avoid Using Expensive UDFs in Hot Paths**

- Python, Scala, or Java UDFs are black boxes to the Catalyst optimizer.
- They **disable query optimizations**, and often run **single-threaded** per executor task.
- ✅ *Fix*: Rewrite logic using Spark SQL functions or **pandas_udf** (if applicable). Avoid UDFs in `WHERE`, `JOIN`, and `GROUP BY` clauses.


### 📥 **4.6.5 - Avoid Data Skew**

- When a small number of keys have a disproportionate number of rows, tasks processing those keys become **stragglers**, slowing down the job.
- ✅ *Fix*: Detect skew via Spark UI or metrics. Use **salting**, **adaptive skew join**, or data bucketing for heavily skewed keys.
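
The effect of salting can be shown without Spark by simulating a hash partitioner. This is an illustrative sketch: `partition_for` is a hypothetical stand-in for a shuffle's hash partitioning, and the key distribution is made up. In a real join, the other side must also be expanded with every salt value so that matching rows still meet.

```python
import random
import zlib
from collections import Counter

NUM_PARTITIONS = 8
SALT_BUCKETS = 8


def partition_for(key):
    """Deterministic stand-in for a shuffle's hash partitioner."""
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS


# Skewed input: one hot key holds 90% of the rows
keys = ["hot"] * 9000 + [f"k{i}" for i in range(1000)]

unsalted = Counter(partition_for(k) for k in keys)

rng = random.Random(42)  # fixed seed so the sketch is repeatable
salted = Counter(partition_for(f"{k}_{rng.randrange(SALT_BUCKETS)}") for k in keys)

print("largest partition without salt:", max(unsalted.values()))
print("largest partition with salt:   ", max(salted.values()))
```

Without the salt, every row for the hot key lands in one partition; with it, the hot key is spread across several partitions and the largest task shrinks accordingly.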


## **4.7 - Keeping Tables Healthy** | 🕑 5 min.

### **📉 4.7.1 - Avoiding Small File Problems**
- Delta Lake performance relies heavily on **file pruning** and **partition pruning**.
- If your table has thousands of tiny files (or large unpartitioned ones), query planning and scanning become inefficient.
- ✅ *Fix*: Use **Auto Compaction**, combined with **Optimized Write** where appropriate.
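
A quick way to judge whether a table has a small-file problem is to look at the distribution of file sizes. The sketch below is hypothetical throughout: `small_file_report`, the 25% cutoff, and the sample sizes are all assumptions for illustration. In a notebook the sizes could come from a file listing, assuming each entry exposes its size.

```python
MB = 1024 ** 2


def small_file_report(file_sizes, target_bytes=128 * MB, small_fraction=0.25):
    """Summarize how many files fall well below the target size.

    `small_fraction` is an illustrative cutoff: files under 25% of the target count as small.
    """
    threshold = target_bytes * small_fraction
    small = [s for s in file_sizes if s < threshold]
    return {
        "files": len(file_sizes),
        "small_files": len(small),
        "small_ratio": len(small) / len(file_sizes) if file_sizes else 0.0,
    }


# Made-up sizes: two well-sized files and eighteen tiny ones
sizes = [120 * MB, 130 * MB] + [2 * MB] * 18
report = small_file_report(sizes)
print(report)
```

A high ratio of small files suggests the table would benefit from compaction.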

<br>

![Auto Compaction](https://milescole.dev/assets/img/posts/Compaction/auto-compaction.excalidraw.png)

### 🔁 **4.7.2 - Preventing Excessive Storage Costs**
- Delta tables **accumulate metadata and files over time**. Without regular maintenance, the number of files keeps growing. Running `OPTIMIZE` prevents the performance issues caused by accumulating many files, while `VACUUM` helps prevent unnecessary storage costs.
- ✅ *Fix*: Set up recurring jobs to:
  - Clean up obsolete data (`VACUUM`) -> Prevents excessive storage costs


#### **4.7.2.1 - Run Vacuum to Clean Up Old Files**
Delta has a safety check that prevents `VACUUM` from deleting files written by a concurrent, long-running write operation that has not yet committed. If you are certain that no operation on this table takes longer than the retention interval you plan to specify, you can turn off this safety check by setting the Spark configuration property `spark.databricks.delta.retentionDurationCheck.enabled` to `false`.

In [27]:
# allow low retention vacuum
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")


_Before_ running `VACUUM`, let's get a count of parquet files in the table's data directory:

In [None]:
files = notebookutils.fs.ls(f"abfss://{notebookutils.runtime.context['currentWorkspaceName']}@{spark.conf.get('fs.defaultFS').split('@')[1]}gold.Lakehouse/Tables/dbo/patientobservations/")
all_files = [f.path for f in files if f.path.endswith('.parquet')]
print(len(all_files))

By default, `VACUUM` deletes inactive files from Delta versions older than 168 hours (7 days). You can customize the number of hours retained by adding `RETAIN N HOURS`.

📌 **Challenge:** Modify the `VACUUM` statement below to **retain 0 hours** of history.

In [None]:
%%sql
VACUUM gold.dbo.patientobservations

<details>
  <summary><strong>🔑 Answer:</strong>  Click to expand the answer if you need help with this task</summary>

~~~sql
VACUUM gold.dbo.patientobservations RETAIN 0 HOURS
~~~
  
</details>
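
The retention window is easiest to reason about as a cutoff timestamp: files that the current table version no longer references and that are older than the cutoff become eligible for removal. The sketch below is only that arithmetic (`vacuum_cutoff` is a hypothetical helper, not a Delta API), with a fixed clock so the result is repeatable.

```python
from datetime import datetime, timedelta, timezone

DEFAULT_RETAIN_HOURS = 168  # 7 days, the VACUUM default


def vacuum_cutoff(now, retain_hours=DEFAULT_RETAIN_HOURS):
    """Timestamp before which unreferenced files become removable."""
    return now - timedelta(hours=retain_hours)


now = datetime(2025, 4, 1, 12, 0, tzinfo=timezone.utc)  # fixed time for a repeatable example
print(vacuum_cutoff(now))                  # default retention
print(vacuum_cutoff(now, retain_hours=0))  # RETAIN 0 HOURS: the cutoff is "now"
```

With zero hours retained, every file outside the current version is eligible immediately, which also means time travel to earlier versions stops working. That trade-off is fine for a lab but deserves care in production.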

#### **4.7.2.2 - Verify Only Active Files Remain**
Now that `VACUUM` has been run, let's verify that files belonging to older table versions have been cleaned up.

In [None]:
files = notebookutils.fs.ls(f"abfss://{notebookutils.runtime.context['currentWorkspaceName']}@{spark.conf.get('fs.defaultFS').split('@')[1]}gold.Lakehouse/Tables/dbo/patientobservations/")
all_files = [f.path for f in files if f.path.endswith('.parquet')]
len(all_files)

To verify that the remaining files belong to the active Delta version, we can use `df.inputFiles()` to return the files read by a query against the table.

In [None]:
active_files = spark.sql(f"SELECT * FROM gold.dbo.patientobservations").inputFiles()
active_files

Now we'll convert the two lists to sets so that we can subtract one from the other and verify that all files from old versions have been vacuumed.

In [None]:
all_files = [f.path.split('/')[-1] for f in files if f.path.endswith('.parquet')]
active_files = [f.split('/')[-1] for f in active_files]
result = list(set(all_files) - set(active_files))
assert len(result) == 0


## **BONUS: Cheat Sheet - Top Reasons for Sub-Optimal Performance**
---
<br>

###### ❌ **1. Not Using the Native Execution Engine**

- If you aren't using the Native Execution Engine, you are running on the traditional JVM-based Spark execution engine and **missing out on huge performance gains**: 2x–5x improvements in some cases.
- ✅ *Fix*: Validate engine usage via the Spark UI or logs. Ensure compatible functions and workloads are used to **unlock native execution**.
- 👉 *Deep dive in Section 4.2*

<br>

---
<br>

###### ⚠️ **2. Using Features Without a Use Case**

- Features like **Deletion Vectors**, **Change Data Feed**, **Optimized Writes**, and **V-Order** are powerful—but they **add overhead**.
- When enabled on high-ingest or read-intensive tables **without a clear need**, they increase metadata size, write cost, and memory usage.
- ✅ *Fix*: Evaluate features **per table and per workload zone** based on workload requirements.
- 👉 *Best practices are covered in Section 4.3*

<br>

---
<br>

###### 🌀 **3. Mixing Execution Contexts**

- Combining **Spark, Pandas, Polars, DuckDB**, etc. in a single pipeline adds **serialization overhead**, breaks optimizations, and introduces **data duplication in memory**.
- While flexible, it reduces the optimizer’s ability to plan holistically.
- ✅ *Fix*: Stick to one processing engine per stage of the pipeline. Use cross-context conversions **sparingly and intentionally**.

<br>

---
<br>

###### 🐢 **4. Underpowered or Poorly Sized Compute**

- **Small clusters** may save money up front—but if they cause excessive **shuffling, spilling, or GC**, they end up costing more in time and dollars.
- **Too many small executors** can be worse than **fewer large ones**, especially for shuffles and joins.
- ✅ *Fix*: Right-size your compute for the workload. Use **metrics like shuffle spill, memory usage, and GC time** to guide tuning.
- 👉 *Discussed in Section 4.5*

<br>

---
<br>

###### 🎲 **5. Relying on Schema Inference in Production**

- Schema inference slows down reads, increases job startup time, and can result in **unexpected schema drift**.
- Particularly painful in high-scale scenarios.
- ✅ *Fix*: Define schemas explicitly in production jobs, especially for semi-structured data like JSON and any structured files without a schema header (CSV, Excel, etc.).
- 👉 *Discussed in Section 4.6*

<br>

---
<br>

###### 📉 **6. Poor File Layout and Small File Problem**

- Delta Lake performance relies heavily on **file pruning** and **partition pruning**.
- If your table has thousands of tiny files (or large unpartitioned ones), query planning and scanning become inefficient.
- ✅ *Fix*: Use **Auto Compaction**, combined with **Optimized Write** where appropriate.

<br>

---
<br>

###### 🔁 **7. No Table Maintenance Strategy**
- Delta tables **accumulate metadata and file churn**. Without regular maintenance, performance **gradually decays**.
- ✅ *Fix*: Set up recurring jobs to:
  - Compact files (`OPTIMIZE`) -> Maintains performance
  - Clean up obsolete data (`VACUUM`) -> Prevents excessive storage costs

<br>

---
<br>

###### ❓ **8. Overusing Repartition or Coalesce Without Understanding**

- Over-partitioning increases task overhead and shuffle volume. Under-partitioning can lead to skew and stragglers.
- `repartition()` is rarely needed but can be extremely helpful when used intentionally and in the right scenarios.
- ✅ *Fix*: Base partitioning decisions on **data size** and **number of output files**. Use the **Spark UI** to see partition/task counts. If you don't know what you are doing, don't repartition.

<br>

---
<br>

###### 📊 **9. Poor Partitioning Strategy**

- Partitioning is generally only beneficial for tables > 1 TB in compressed size. Partitioning below 1 TB generally hurts read performance and causes unnecessary write overhead.
- Partitioning on high-cardinality columns quickly results in **small-file problems**.
- ✅ *Fix*: Choose partition columns based on **query patterns**, **cardinality**, and **evolution over time**. Evaluate regularly.

<br>

---
<br>

###### 🔄 **10. Too Much Caching / Wrong Things Cached**

- Caching large DataFrames unnecessarily eats up executor memory and causes GC issues.
- Worse, caching things that change between actions results in stale or invalid data.
- ✅ *Fix*: Only cache when reuse is **frequent and beneficial**. Unpersist when done.

<br>

---
<br>

###### 🧊 **11. Using Expensive UDFs in Hot Paths**

- Python, Scala, or Java UDFs are black boxes to the Catalyst optimizer.
- They **disable query optimizations**, and often run **single-threaded** per executor task.
- ✅ *Fix*: Rewrite logic using Spark SQL functions or **pandas_udf** (if applicable). Avoid UDFs in `WHERE`, `JOIN`, and `GROUP BY` clauses.

<br>

---
<br>

###### 📥 **12. Lack of Data Skew Handling**

- When a small number of keys have a disproportionate number of rows, tasks processing those keys become **stragglers**, slowing down the job.
- ✅ *Fix*: Detect skew via Spark UI or metrics. Use **salting**, **adaptive skew join**, or data bucketing for heavily skewed keys.

<br>
