# Introduction to Delta Lake


Delta Lake is an open-source storage layer that brings ACID transactions, scalable metadata handling, and unified batch and streaming data processing to data lakes. It works on top of existing Apache Spark and data lake storage (e.g., ADLS, S3, HDFS) to provide reliability and performance improvements over traditional data lakes.

**Key features**
- ACID Transactions: Ensures data reliability with atomicity, consistency, isolation, and durability.
- Schema Enforcement: Prevents bad data from corrupting your tables.
- Schema Evolution: Allows automatic schema updates as data changes.
- Time Travel: Enables data versioning with the ability to query old data (VERSION AS OF or TIMESTAMP AS OF).
- Unified Batch & Streaming: Supports both types of workloads on the same data.
- Data Lineage & Audit: Every change is tracked in the transaction log (_delta_log), helping with traceability.

**How does Delta Lake achieve this?** <br>
Delta Lake uses a transaction log (stored in the `_delta_log` folder) to track changes to data files. Each write creates a new version of the table as an atomic commit.
- Data is stored in Parquet files.
- The `_delta_log` directory stores a JSON log file for every transaction.
- Delta tables can be queried just like regular tables using Spark SQL or PySpark.
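The way a reader rebuilds a table version from the log can be sketched in plain Python. This is a simplified model under stated assumptions, not Delta Lake's implementation: the `commits` list and the file names are hypothetical, and real commits also carry other action types (such as `metaData` and `commitInfo`) and periodic checkpoints.

```python
# Simplified model of Delta log replay; commits and file names are hypothetical.

def commit_file_name(version: int) -> str:
    """Commit files are named after the version, zero-padded to 20 digits."""
    return f"{version:020d}.json"

def live_files(commits, as_of_version=None):
    """Replay add/remove actions in commit order and return the data files
    that make up the table at the requested version (latest by default)."""
    if as_of_version is None:
        as_of_version = len(commits) - 1
    files = set()
    for version, actions in enumerate(commits):
        if version > as_of_version:
            break
        for action in actions:
            if "add" in action:
                files.add(action["add"]["path"])
            elif "remove" in action:
                files.discard(action["remove"]["path"])
    return files

commits = [
    [{"add": {"path": "part-0000.parquet"}}],        # version 0: initial write
    [{"add": {"path": "part-0001.parquet"}}],        # version 1: append
    [{"remove": {"path": "part-0000.parquet"}},      # version 2: part-0000 rewritten
     {"add": {"path": "part-0002.parquet"}}],
]

print(commit_file_name(1))                            # 00000000000000000001.json
print(sorted(live_files(commits)))                    # ['part-0001.parquet', 'part-0002.parquet']
print(sorted(live_files(commits, as_of_version=1)))   # ['part-0000.parquet', 'part-0001.parquet']
```

Because old data files are only marked as removed in the log, replaying up to an earlier version still finds them, which is what makes time travel possible.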

In [0]:
from pyspark.sql import Row

# Create a list of Rows
data = [
    Row(name="Alice", list_of_items=["apple", "banana", "cherry"]),
    Row(name="Bob", list_of_items=["orange", "grape", "melon"]),
    Row(name="Charlie", list_of_items=["pear", "kiwi", "mango"])
]

# Create a DataFrame
df = spark.createDataFrame(data)

# Display the DataFrame
display(df)

name,list_of_items
Alice,"List(apple, banana, cherry)"
Bob,"List(orange, grape, melon)"
Charlie,"List(pear, kiwi, mango)"


In [0]:
from pyspark.sql.functions import explode
new_df=df.select(df.name,explode(df.list_of_items))
display(new_df)

name,col
Alice,apple
Alice,banana
Alice,cherry
Bob,orange
Bob,grape
Bob,melon
Charlie,pear
Charlie,kiwi
Charlie,mango


# Volume
Volumes are Unity Catalog objects representing a logical volume of storage in a cloud object storage location. Volumes provide capabilities for accessing, storing, governing, and organizing files. While tables provide governance over tabular datasets, volumes add governance over non-tabular datasets. You can use volumes to store and access files in any format, including structured, semi-structured, and unstructured data.

**Saving the DataFrame in Delta format to a managed volume**

In [0]:
new_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save("/Volumes/workspace/default/new_volume")

**When you save a DataFrame in Delta format, Delta Lake stores data in a combination of Parquet files and a _delta_log folder inside the directory you specified.**

In [0]:
from pyspark.sql.functions import lit

nums = 10  # Define the variable nums
new_df = new_df.withColumn("nums", lit(nums))
new_df.show()

+-------+------+----+
|   name|   col|nums|
+-------+------+----+
|  Alice| apple|  10|
|  Alice|banana|  10|
|  Alice|cherry|  10|
|    Bob|orange|  10|
|    Bob| grape|  10|
|    Bob| melon|  10|
|Charlie|  pear|  10|
|Charlie|  kiwi|  10|
|Charlie| mango|  10|
+-------+------+----+



# The Delta log

**`_delta_log/` – Transaction Log (Delta Table Metadata)**

- This folder contains the transaction history and metadata of your Delta table.
- It is what enables ACID transactions, schema enforcement, time travel, and more.
- The JSON files keep track of changes to the data over time.
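Each commit file in `_delta_log/` is newline-delimited JSON with one action per line, keyed by action type (for example `commitInfo`, `metaData`, `add`). A minimal sketch of reading such a file without Spark follows. The `sample_commit` text is a trimmed, hypothetical stand-in for a real commit and omits most fields.

```python
import json

# Trimmed, hypothetical stand-in for the text of one commit file.
sample_commit = "\n".join([
    json.dumps({"commitInfo": {"operation": "WRITE",
                               "operationParameters": {"mode": "Append"}}}),
    json.dumps({"metaData": {"format": {"provider": "parquet"},
                             "partitionColumns": []}}),
    json.dumps({"add": {"path": "part-00000.snappy.parquet",
                        "size": 1141, "dataChange": True}}),
])

def actions_by_type(commit_text: str) -> dict:
    """Group the actions of one commit by their top-level key."""
    grouped = {}
    for line in commit_text.splitlines():
        if not line.strip():
            continue  # skip blank lines
        action = json.loads(line)
        for action_type, payload in action.items():
            grouped.setdefault(action_type, []).append(payload)
    return grouped

grouped = actions_by_type(sample_commit)
print(sorted(grouped))             # ['add', 'commitInfo', 'metaData']
print(grouped["add"][0]["path"])   # part-00000.snappy.parquet
```

Reading the same file with `spark.read.format("json")` yields one column per action type, with each row filled in for only one of them.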


In [0]:
spark.read.format("json").load("/Volumes/workspace/default/new_volume/_delta_log/00000000000000000001.json").display()

add,commitInfo,metaData
,"List(0711-072404-mijs91ih-v2n, Databricks-Runtime/16.4.x-aarch64-photon-scala2.12, true, WriteSerializable, WRITE, List(1, 1141, 9), List(Append, [], false), 0, List(true, false), 1752222138652, 76d863b7-4a9c-4f91-b13e-67ea2d5e1a3c, 3444117624069686, shaikmohammadthaheer@gmail.com)",
,,"List(List(true), 1752221537105, List(parquet), 9bc9e51a-77ed-407f-8910-36fe6fd26e9d, List(), {""type"":""struct"",""fields"":[{""name"":""name"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""col"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""nums"",""type"":""integer"",""nullable"":true,""metadata"":{}}]})"
"List(true, 1752222139000, part-00000-45681a2a-c446-4066-9efd-2029834a4f74.c000.snappy.parquet, 1141, {""numRecords"":9,""minValues"":{""name"":""Alice"",""col"":""apple"",""nums"":10},""maxValues"":{""name"":""Charlie"",""col"":""pear"",""nums"":10},""nullCount"":{""name"":0,""col"":0,""nums"":0},""tightBounds"":true}, List(1752222139000000, 1752222139000000, 1752222139000000, 268435456))",,


## 📚 Managed Tables vs External Tables in Delta Lake / Databricks

Delta tables can be created as either **Managed** or **External**, depending on how storage is controlled.

---

### 🏠 Managed Tables (Internal Tables)

- The **metastore fully manages** both the metadata and the data files.
- The data is stored in the default warehouse location (e.g., DBFS).
- When you **drop the table**, both the **metadata and the data files are deleted**.
- Suitable for quick development, internal workflows, and temporary data.

#### ✅ When to Use Managed Tables

- You don’t need to manage the physical data path.
- You want everything stored and cleaned up automatically by Databricks.
- You’re working on prototypes, internal projects, or sandbox environments.

---

### 📦 External Tables

- You **define the storage path** (e.g., S3, ADLS, or Volume).
- Only the **table metadata** is stored in the metastore.
- When you **drop the table**, only the metadata is deleted — the **actual data files remain**.
- Ideal for production systems, shared storage, or external data management.

#### ✅ When to Use External Tables

- You want full control over where and how the data is stored.
- You're using **cloud storage** like S3, ADLS, or Unity Catalog Volumes.
- You need to **preserve the data even after dropping the table**, or share it across workspaces or tools.

---

### 🔄 Summary Table

| Feature                  | Managed Table             | External Table                     |
|--------------------------|---------------------------|-------------------------------------|
| Data storage location    | Managed automatically      | You specify the path               |
| Data ownership           | Databricks/metastore       | You (S3, ADLS, Volumes, etc.)      |
| Drop table effect        | Deletes both data & metadata | Deletes metadata only             |
| Best for                 | Internal/temporary data    | External/long-term/shared storage  |



# Read Data from delta

In [0]:
spark.read.format("delta").load("/Volumes/workspace/default/new_volume").display()

name,col,nums
Alice,apple,10
Alice,banana,10
Alice,cherry,10
Bob,orange,10
Bob,grape,10
Bob,melon,10
Charlie,pear,10
Charlie,kiwi,10
Charlie,mango,10


Every time we run a CRUD or other operation, Delta Lake writes new Parquet files and a new JSON file in `_delta_log` to keep track of operations over time.

# 🔁 What is Versioning in Delta Lake?
When you write, update, or delete data in a Delta Lake table, Delta Lake creates a new version of the table (also called a snapshot). These versions are stored as metadata files in the _delta_log/ directory of the table.

Each commit (such as an insert or update) increases the version number by one.

In [0]:
%sql
SELECT * FROM delta.`/Volumes/workspace/default/new_volume` VERSION AS OF 4

name,col,nums
Alice,apple,10
Alice,banana,10
Alice,cherry,10
Bob,orange,10
Bob,grape,10
Bob,melon,10
Charlie,pear,10
Charlie,kiwi,10
Charlie,mango,10


# DML in Delta Lake

In [0]:
data=[(1,"crip",30550,),(2,"prick",40440),(3,"jack",400),(4,"loath",500)]
df=spark.createDataFrame(data,["id","course","fee"])
# df.write.format("delta").mode("append").save("/Volumes/workspace/default/DML")
spark.read.format("delta").load("/Volumes/workspace/default/DML").display()


id,course,fee
1,pip,3000
2,spark,4000
3,pyspark,4000
4,spark,5000
1,crip,30550
2,prick,40440
3,jack,400
4,loath,500


# Update in delta lake

In [0]:
%sql
update delta.`/Volumes/workspace/default/DML` 
set fee = 3005 where id = 1

num_affected_rows
2


In [0]:
%sql
update delta.`/Volumes/workspace/default/DML` 
set course = 'pipt' where course = 'pip'

num_affected_rows
1


# Deletion Vectors

Deletion Vectors (DVs) are a feature in Delta Lake that allow rows to be logically deleted without rewriting the entire data file (Parquet). Instead of physically deleting rows, Delta Lake tracks deleted rows separately using vector files — making deletes faster and more efficient, especially for large datasets.

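**Rows marked in a deletion vector are physically removed only when their data files are rewritten (for example by `OPTIMIZE` or `REORG TABLE ... APPLY (PURGE)`); `VACUUM` then deletes the old files once the retention period has passed.**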
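A deletion vector can be pictured as a set of row positions that readers skip. The sketch below models that idea in plain Python. It is not Delta Lake's on-disk format, and the `data_file` contents are hypothetical.

```python
# Simplified model of a deletion vector; not Delta Lake's on-disk format.
data_file = ["Alice", "Bob", "Charlie", "Dana"]  # rows of one immutable file (hypothetical)
deletion_vector = set()                           # positions of rows marked as deleted

def delete_where(predicate):
    """Mark matching rows as deleted without rewriting the data file."""
    for position, row in enumerate(data_file):
        if predicate(row):
            deletion_vector.add(position)

def read_rows():
    """Readers skip every row whose position is in the deletion vector."""
    return [row for position, row in enumerate(data_file)
            if position not in deletion_vector]

delete_where(lambda name: name.startswith("B"))
print(read_rows())      # ['Alice', 'Charlie', 'Dana']
print(len(data_file))   # 4, the file itself is unchanged
```

The delete only touches the small vector, which is why it is cheap; the deleted row still occupies space until the file is rewritten.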


_The `MERGE` operation in Delta Lake implements an upsert, which is also how a Type 1 Slowly Changing Dimension (SCD1) is maintained._<br>
**MERGE INTO sink USING source**<br>
**ON condition**<br>
**WHEN MATCHED THEN**<br>
 **UPDATE SET ...**<br>
**WHEN NOT MATCHED THEN**<br>
  **INSERT ...**

# Upsert in Python

In [0]:
data=[(1,'python',5000),(6,'scala',10000),(7,'java',15000)]
df=spark.createDataFrame(data,["id","course","fee"])

In [0]:
from delta.tables import DeltaTable
delta_table=DeltaTable.forPath(spark,"/Volumes/workspace/default/DML")
delta_table.alias("trgt").merge(df.alias("src"),"trgt.id=src.id").whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
%sql
select * from delta.`/Volumes/workspace/default/DML`

id,course,fee
2,spark,4000
3,pyspark,4000
4,spark,5000
2,prick,40440
3,jack,400
4,loath,500
1,python,5000
1,python,5000
6,scala,10000
7,java,15000
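The result above contains two identical rows for `id = 1` because the target already held two rows with that key and both matched the single source row. The matching logic can be sketched in plain Python. This is an illustration of the upsert semantics, not the Delta implementation: `merge_upsert` and `as_rows` are hypothetical helpers, the target rows are those the table held just before the merge, and the sketch assumes source keys are unique.

```python
COLUMNS = ("id", "course", "fee")

def as_rows(*tuples):
    """Turn plain tuples into dict rows."""
    return [dict(zip(COLUMNS, t)) for t in tuples]

def merge_upsert(target, source, key="id"):
    """Hypothetical helper mirroring whenMatchedUpdateAll / whenNotMatchedInsertAll."""
    source_by_key = {row[key]: row for row in source}
    target_keys = {row[key] for row in target}
    # WHEN MATCHED THEN UPDATE: every matching target row takes the source values.
    merged = [dict(source_by_key.get(row[key], row)) for row in target]
    # WHEN NOT MATCHED THEN INSERT: source rows with a new key are appended.
    merged += [dict(row) for row in source if row[key] not in target_keys]
    return merged

target = as_rows((1, "pipt", 3005), (1, "crip", 3005), (2, "spark", 4000),
                 (3, "pyspark", 4000), (4, "spark", 5000), (2, "prick", 40440),
                 (3, "jack", 400), (4, "loath", 500))
source = as_rows((1, "python", 5000), (6, "scala", 10000), (7, "java", 15000))

result = merge_upsert(target, source)
print(len(result))                                  # 10
print([row for row in result if row["id"] == 1])    # two identical rows for id 1
```

If duplicates in the target are not intended, they need to be removed before the merge; the merge condition alone does not deduplicate the target.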


# Table utility commands

**Check schema**

In [0]:
%sql
describe delta.`/Volumes/workspace/default/DML`

col_name,data_type,comment
id,bigint,
course,string,
fee,bigint,


**Check the table details**

In [0]:
%sql
describe detail delta.`/Volumes/workspace/default/DML`

format,id,name,description,location,createdAt,lastModified,partitionColumns,clusteringColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures,statistics,clusterByAuto
delta,bccb664e-976f-4f28-826e-03f627f3096f,,,dbfs:/Volumes/workspace/default/DML,2025-07-13T14:19:01.155Z,2025-07-13T15:15:30.000Z,List(),List(),1,1242,Map(delta.enableDeletionVectors -> true),3,7,"List(appendOnly, deletionVectors, invariants)","Map(numRowsDeletedByDeletionVectors -> 0, numDeletionVectors -> 0)",False


**For a more detailed view, use `EXTENDED`**

In [0]:
%sql
describe extended delta.`/Volumes/workspace/default/DML`

col_name,data_type,comment
id,bigint,
course,string,
fee,bigint,
,,
# Delta Statistics Columns,,
Column Names,"id, course, fee",
Column Selection Method,first-32,
,,
# Detailed Table Information,,
Catalog,new_catalog,


**Check what operations have been performed on each version**

In [0]:
%sql
describe history delta.`/Volumes/workspace/default/DML`

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
7,2025-07-13T15:15:30.000Z,3444117624069686,shaikmohammadthaheer@gmail.com,OPTIMIZE,"Map(predicate -> [], auto -> true, clusterBy -> [], zOrderBy -> [], batchId -> 0)",,,0713-150105-y8f49oi7-v2n,6.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 4, numRemovedBytes -> 4358, p25FileSize -> 1242, numDeletionVectorsRemoved -> 1, minFileSize -> 1242, numAddedFiles -> 1, maxFileSize -> 1242, p75FileSize -> 1242, p50FileSize -> 1242, numAddedBytes -> 1242)",,Databricks-Runtime/16.4.x-aarch64-photon-scala2.12
6,2025-07-13T15:15:27.000Z,3444117624069686,shaikmohammadthaheer@gmail.com,MERGE,"Map(predicate -> [""(id#11105L = id#11089L)""], clusterBy -> [], matchedPredicates -> [{""actionType"":""update""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,,0713-150105-y8f49oi7-v2n,5.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 3, numTargetBytesAdded -> 3152, numTargetBytesRemoved -> 0, numTargetDeletionVectorsAdded -> 1, numTargetRowsMatchedUpdated -> 2, executionTimeMs -> 3827, materializeSourceTimeMs -> 424, numTargetRowsInserted -> 2, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 1450, numTargetRowsUpdated -> 2, numOutputRows -> 4, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 3, numTargetFilesRemoved -> 0, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 1836)",,Databricks-Runtime/16.4.x-aarch64-photon-scala2.12
5,2025-07-13T15:01:39.000Z,3444117624069686,shaikmohammadthaheer@gmail.com,OPTIMIZE,"Map(predicate -> [], auto -> true, clusterBy -> [], zOrderBy -> [], batchId -> 0)",,,0713-150105-y8f49oi7-v2n,4.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 2, numRemovedBytes -> 2222, p25FileSize -> 1206, numDeletionVectorsRemoved -> 1, minFileSize -> 1206, numAddedFiles -> 1, maxFileSize -> 1206, p75FileSize -> 1206, p50FileSize -> 1206, numAddedBytes -> 1206)",,Databricks-Runtime/16.4.x-aarch64-photon-scala2.12
4,2025-07-13T15:01:35.000Z,3444117624069686,shaikmohammadthaheer@gmail.com,UPDATE,"Map(predicate -> [""(course#10459 = pip)""])",,,0713-150105-y8f49oi7-v2n,3.0,WriteSerializable,False,"Map(numRemovedFiles -> 0, numRemovedBytes -> 0, numCopiedRows -> 0, numDeletionVectorsAdded -> 1, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 8810, numDeletionVectorsUpdated -> 0, scanTimeMs -> 6243, numAddedFiles -> 1, numUpdatedRows -> 1, numAddedBytes -> 1016, rewriteTimeMs -> 2534)",,Databricks-Runtime/16.4.x-aarch64-photon-scala2.12
3,2025-07-13T14:30:50.000Z,3444117624069686,shaikmohammadthaheer@gmail.com,OPTIMIZE,"Map(predicate -> [], auto -> true, clusterBy -> [], zOrderBy -> [], batchId -> 0)",,,0713-141740-cor2s2lb-v2n,2.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 3, numRemovedBytes -> 3296, p25FileSize -> 1206, numDeletionVectorsRemoved -> 2, minFileSize -> 1206, numAddedFiles -> 1, maxFileSize -> 1206, p75FileSize -> 1206, p50FileSize -> 1206, numAddedBytes -> 1206)",,Databricks-Runtime/16.4.x-aarch64-photon-scala2.12
2,2025-07-13T14:30:47.000Z,3444117624069686,shaikmohammadthaheer@gmail.com,UPDATE,"Map(predicate -> [""(id#10966L = 1)""])",,,0713-141740-cor2s2lb-v2n,1.0,WriteSerializable,False,"Map(numRemovedFiles -> 0, numRemovedBytes -> 0, numCopiedRows -> 0, numDeletionVectorsAdded -> 2, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 0, executionTimeMs -> 3483, numDeletionVectorsUpdated -> 0, scanTimeMs -> 2057, numAddedFiles -> 1, numUpdatedRows -> 2, numAddedBytes -> 1080, rewriteTimeMs -> 1398)",,Databricks-Runtime/16.4.x-aarch64-photon-scala2.12
1,2025-07-13T14:20:59.000Z,3444117624069686,shaikmohammadthaheer@gmail.com,WRITE,"Map(mode -> Append, statsOnLoad -> false, partitionBy -> [])",,,0713-141740-cor2s2lb-v2n,0.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 4, numOutputBytes -> 1086)",,Databricks-Runtime/16.4.x-aarch64-photon-scala2.12
0,2025-07-13T14:19:07.000Z,3444117624069686,shaikmohammadthaheer@gmail.com,WRITE,"Map(mode -> ErrorIfExists, statsOnLoad -> false, partitionBy -> [])",,,0713-141740-cor2s2lb-v2n,,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 4, numOutputBytes -> 1130)",,Databricks-Runtime/16.4.x-aarch64-photon-scala2.12


**Time Travelling**

In [0]:
%sql
--current data
select * from delta.`/Volumes/workspace/default/DML`

id,course,fee
2,spark,4000
3,pyspark,4000
4,spark,5000
2,prick,40440
3,jack,400
4,loath,500
1,python,5000
1,python,5000
6,scala,10000
7,java,15000


In [0]:
%sql
-- restore the table to an earlier version
restore delta.`/Volumes/workspace/default/DML` to version as of 1

    

table_size_after_restore,num_of_files_after_restore,num_removed_files,num_restored_files,removed_files_size,restored_files_size
2216,2,1,2,1242,2216


In [0]:
%sql
-- data restored to version 1 
select * from delta.`/Volumes/workspace/default/DML`

id,course,fee
1,pip,3000
2,spark,4000
3,pyspark,4000
4,spark,5000
1,crip,30550
2,prick,40440
3,jack,400
4,loath,500


**Use version with select statement**

In [0]:
%sql
-- the table was restored to version 1, but SELECT can still read any other version
select * from delta.`/Volumes/workspace/default/DML`
version as of 5

id,course,fee
1,pipt,3005
1,crip,3005
2,spark,4000
3,pyspark,4000
4,spark,5000
2,prick,40440
3,jack,400
4,loath,500


**We can use a timestamp instead of a version**

In [0]:
%sql
select * from delta.`/Volumes/workspace/default/DML`
timestamp as of '2025-07-13T15:01:35.000+00:00'

id,course,fee
1,crip,3005
2,spark,4000
3,pyspark,4000
4,spark,5000
2,prick,40440
3,jack,400
4,loath,500
1,pipt,3005
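A timestamp query resolves to the latest version committed at or before the given time. The sketch below reproduces that lookup using the commit timestamps of versions 0 to 7 from the `describe history` output. The later `RESTORE` commit is left out because its timestamp is not shown, and `version_as_of` is a hypothetical helper, not a Delta API.

```python
from bisect import bisect_right
from datetime import datetime, timezone

def utc(hour, minute, second):
    """Build a UTC timestamp on 2025-07-13, the day of all commits in the history."""
    return datetime(2025, 7, 13, hour, minute, second, tzinfo=timezone.utc)

# Commit timestamps of versions 0-7, in version order.
commit_times = [
    utc(14, 19, 7),    # version 0: WRITE
    utc(14, 20, 59),   # version 1: WRITE
    utc(14, 30, 47),   # version 2: UPDATE
    utc(14, 30, 50),   # version 3: OPTIMIZE
    utc(15, 1, 35),    # version 4: UPDATE
    utc(15, 1, 39),    # version 5: OPTIMIZE
    utc(15, 15, 27),   # version 6: MERGE
    utc(15, 15, 30),   # version 7: OPTIMIZE
]

def version_as_of(timestamp: datetime) -> int:
    """Return the latest version committed at or before `timestamp`."""
    index = bisect_right(commit_times, timestamp) - 1
    if index < 0:
        raise ValueError("timestamp is earlier than the first commit")
    return index

query_time = datetime.fromisoformat("2025-07-13T15:01:35.000+00:00")
print(version_as_of(query_time))   # 4
```

Version 4 is the `UPDATE` that renamed `pip` to `pipt`, which matches the rows returned by the query above.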


# 🧹 Delta Lake: VACUUM and `_delta_log` Retention

Delta Lake provides features like **time travel**, **version history**, and **ACID transactions**. Two important mechanisms to manage storage and performance are:

- **`VACUUM`** – to clean up old **data files**
- **`_delta_log` retention** – to manage **transaction log history**

---

## ✅ 1. VACUUM in Delta Lake

### 🔍 What is VACUUM?

`VACUUM` **physically removes unused files** from the storage layer. These files become obsolete after operations like:

- `DELETE`
- `UPDATE`
- `MERGE`
- `OPTIMIZE`

Delta retains old files temporarily for time travel and rollback.

---

### ⏱️ Default Retention

- Delta Lake keeps unused data files for **7 days** by default.
- This allows time travel using older versions (within 7 days).
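The retention rule can be sketched as a simple cutoff check. This is a simplified model that assumes the only input is the time each file was removed from the table; the file names, timestamps, and the `vacuum_candidates` helper are hypothetical.

```python
from datetime import datetime, timedelta, timezone

RETENTION = timedelta(days=7)  # the default retention described above

def vacuum_candidates(removed_files, now, retention=RETENTION):
    """Return files that were removed from the table longer ago than the retention window."""
    cutoff = now - retention
    return sorted(path for path, removed_at in removed_files.items()
                  if removed_at < cutoff)

now = datetime(2025, 7, 21, tzinfo=timezone.utc)   # hypothetical "current time"
removed_files = {                                    # hypothetical files and removal times
    "part-old.parquet": datetime(2025, 7, 10, tzinfo=timezone.utc),     # removed 11 days ago
    "part-recent.parquet": datetime(2025, 7, 18, tzinfo=timezone.utc),  # removed 3 days ago
}

print(vacuum_candidates(removed_files, now))                      # ['part-old.parquet']
print(vacuum_candidates(removed_files, now, timedelta(hours=0)))  # both files
```

With a zero retention every removed file becomes a candidate, which is why shortening the window also shortens how far back time travel can reach.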

---

### 🧪 Syntax Examples

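```sql
-- Default behavior: keeps last 7 days' files
VACUUM my_table;

-- Delete all unreferenced files immediately (⚠️ time travel to older versions stops working).
-- Delta rejects a retention below the default unless
-- spark.databricks.delta.retentionDurationCheck.enabled is set to false.
VACUUM my_table RETAIN 0 HOURS;
```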

## ⏳ Default Log Retention

Delta keeps logs in `_delta_log/` for **30 days** by default.

Older logs may be cleaned automatically based on retention rules.

---

## 🛠️ Configure Log Retention

```sql
-- Change log retention to 10 days
ALTER TABLE my_table SET TBLPROPERTIES (
  'delta.logRetentionDuration' = 'interval 10 days'
);

-- Keep logs much longer, e.g. one year (not recommended for large tables)
ALTER TABLE my_table SET TBLPROPERTIES (
  'delta.logRetentionDuration' = 'interval 365 days'
);
```

# 📋 Cloning in Delta Lake

Delta Lake supports a powerful feature called **cloning**, which lets you create a copy of a table at a specific **point in time or version** — **even if the original table doesn’t have a timestamp column**.

---

## 🔍 What is a Clone?

Delta supports two types of clones:

| Type            | Description                                          |
|-----------------|------------------------------------------------------|
| `SHALLOW CLONE` | Creates a reference to the original data files (fast, storage-efficient) |
| `DEEP CLONE`    | Physically copies data and metadata (independent copy) |

---

## ✅ Syntax: Clone Current Table

```sql
-- Shallow clone
CREATE OR REPLACE TABLE my_clone SHALLOW CLONE my_table;

-- Deep clone
CREATE OR REPLACE TABLE my_deep_clone DEEP CLONE my_table;
```


In [0]:
%sql
create table workspace.default.cloning_tabel clone delta.`/Volumes/workspace/default/DML`
    


source_table_size,source_num_of_files,num_of_synced_transactions,num_removed_files,num_copied_files,removed_files_size,copied_files_size
2216,2,,0,2,0,2216


# 🚀 OPTIMIZE and Z-ORDER in Delta Lake

Delta Lake provides powerful commands to improve performance on large datasets. Two important features for file management and query optimization are:

- 🔧 `OPTIMIZE` – reduces small files by compaction
- 🧭 `Z-ORDER` – improves query performance by co-locating related data

---

## 🔧 OPTIMIZE in Delta Lake

### 📌 What It Does

`OPTIMIZE` compacts many small files in a Delta table into fewer large files. This improves:

- Query performance (less file scanning)
- Data skipping
- Cost efficiency on cloud storage
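The idea behind compaction can be sketched as grouping small files into bins up to a target size, each bin being rewritten as one larger file. The greedy grouping below is an illustration only and is not the algorithm Delta Lake uses; `plan_compaction`, the file sizes, and the target size are hypothetical.

```python
def plan_compaction(file_sizes, target_size):
    """Greedily group files into bins whose total size stays within target_size.
    Each bin would be rewritten as a single larger file."""
    bins, current, current_size = [], [], 0
    for size in sorted(file_sizes, reverse=True):
        if current and current_size + size > target_size:
            bins.append(current)          # current bin is full, start a new one
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        bins.append(current)
    return bins

small_files = [400, 300, 300, 200, 100]   # hypothetical file sizes in bytes
plan = plan_compaction(small_files, target_size=1000)

print(plan)                                        # [[400, 300, 300], [200, 100]]
print(f"{len(small_files)} files -> {len(plan)} files")
```

Fewer, larger files mean fewer file opens and less per-file metadata for a query to process.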

### 🧪 Syntax

```sql
-- Compact small files
OPTIMIZE table_name;

-- Compact and co-locate rows by customer_id
OPTIMIZE sales_data ZORDER BY (customer_id);
```

The `ZORDER BY` clause tells Delta Lake to reorganize the data so rows with similar `customer_id` values are stored near each other.


# 💧 Liquid Clustering in Delta Lake

Delta Lake introduces **Liquid Clustering** to overcome the limitations of traditional static partitioning and Z-Ordering.

---

## 🔍 What is Liquid Clustering?

**Liquid Clustering** allows you to **cluster your Delta table dynamically** based on one or more columns — without needing strict partitions or frequent reorganization via `OPTIMIZE`.

> Think of it as **"auto-optimized Z-ORDER + partitioning"** — but more flexible, efficient, and scalable.

---

## 🧠 Why Use It?

| Problem with Traditional Methods        | How Liquid Clustering Helps                     |
|----------------------------------------|--------------------------------------------------|
| Manual partitions can become skewed    | Clustering is dynamic and adaptive              |
| `OPTIMIZE ZORDER` is compute-intensive | Clustering is incremental, so `OPTIMIZE` rewrites only data that needs it |
| Hard to maintain over time             | Clustering columns can be changed later without rewriting existing data |

---

## ✅ How to Enable Liquid Clustering

You need to **set clustering columns** on the Delta table.

### 📌 Example:

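```sql
-- Enable liquid clustering on an existing table
ALTER TABLE orders CLUSTER BY (customer_id);

-- Or define clustering columns at creation time (table and columns are illustrative)
CREATE TABLE orders_clustered (order_id BIGINT, customer_id BIGINT)
CLUSTER BY (customer_id);

-- Cluster data that has already been written
OPTIMIZE orders;
```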
