## Delta Lake: A Powerful Tool for Data Lakes

**Delta Lake** is an open-source storage layer that brings ACID transactions to data lakes. It stores table data as Apache Parquet files and adds a transaction log on top, which makes the same files usable as a reliable table for big data analytics.

**Key Features of Delta Lake:**

1. **ACID Transactions:**
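   * Every write is committed atomically through the transaction log, so readers never see a partially written result.
   * Concurrent writers are coordinated with optimistic concurrency control: a conflicting commit fails and can be retried instead of corrupting the table.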
2. **Schema Evolution:**
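   * Lets the table schema change over time. New columns can be added on write (for example with the `mergeSchema` option), while renaming or dropping columns requires column mapping, as the examples later in this notebook show.
   * Existing data files are not rewritten for these changes, which keeps schema changes cheap.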
3. **Time Travel:**
   * Allows you to query historical versions of your data.
   * Facilitates data auditing and rollback.
4. **Unified Batch and Streaming:**
   * Supports both batch and streaming data ingestion and processing.
   * Simplifies data pipelines and enables real-time analytics.
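5. **Failure Recovery:**
   * A failed or interrupted write leaves no partial data visible, because new files become part of the table only once their commit is recorded in the transaction log.
   * Earlier versions stay available until they are cleaned up (for example by `VACUUM`), so a bad write can be rolled back.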
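
One way to picture how concurrent writers are kept from overwriting each other is a "create only if absent" rule on commit files. The sketch below is a simplified model in plain Python, not Delta Lake's actual implementation: `try_commit` is a hypothetical helper, and a temporary directory stands in for the table's `_delta_log` folder. It assumes a file system where exclusive file creation is atomic. The 20-digit zero-padded file name follows the naming Delta Lake uses for its commit files.

```python
import json
import os
import tempfile


def try_commit(log_dir, version, actions):
    """Try to write commit `version`; return False if another writer got there first."""
    path = os.path.join(log_dir, f"{version:020d}.json")
    try:
        # O_EXCL makes creation fail if the file already exists, so at most
        # one writer can claim a given version number.
        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
    except FileExistsError:
        return False
    with os.fdopen(fd, "w") as f:
        for action in actions:
            f.write(json.dumps(action) + "\n")
    return True


# Two writers race to create version 0; only the first one succeeds.
log_dir = tempfile.mkdtemp()
first = try_commit(log_dir, 0, [{"add": {"path": "part-0.parquet"}}])
second = try_commit(log_dir, 0, [{"add": {"path": "part-1.parquet"}}])
committed_files = sorted(os.listdir(log_dir))
```

In this model the losing writer would re-read the log, check whether its changes still apply, and retry with the next version number.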

**How Delta Lake Works:**

1. **Data Storage:** Delta Lake stores data in Parquet format, which is a highly efficient columnar format.
2. **Transaction Log:** It maintains a transaction log that records all changes made to the table.
3. **Read and Write Operations:** A reader replays the transaction log to determine which data files make up the current version of the table. A writer adds new data files and then records them in a new commit in the log.
4. **Time Travel:** You can query historical versions of the table by specifying a specific version number or timestamp.
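
The steps above can be sketched as a replay over a list of commits. This is a minimal illustration in plain Python under simplifying assumptions: the log is an in-memory list, each action is reduced to `add` or `remove` with a file name, and the function name `snapshot` and the file names are hypothetical.

```python
def snapshot(log, version=None):
    """Return the sorted data files visible at `version` (latest if None)."""
    if version is None:
        version = len(log) - 1
    files = set()
    # Replay commits 0..version in order, applying each action.
    for commit in log[: version + 1]:
        for action in commit:
            if "add" in action:
                files.add(action["add"])
            elif "remove" in action:
                files.discard(action["remove"])
    return sorted(files)


# Hypothetical log: version 0 creates the table, version 1 rewrites one file
# (as an UPDATE would), version 2 appends a new file.
log = [
    [{"add": "part-0.parquet"}, {"add": "part-1.parquet"}],
    [{"remove": "part-0.parquet"}, {"add": "part-2.parquet"}],
    [{"add": "part-3.parquet"}],
]

latest = snapshot(log)
as_of_v0 = snapshot(log, version=0)
```

Reading an older version only means stopping the replay earlier, which is why time travel needs no separate copy of the data as long as the old files still exist.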

**Benefits of Using Delta Lake:**

* **Reliability and Consistency:** Ensures data integrity and consistency through ACID transactions.
* **Scalability:** Handles large-scale datasets and high-throughput workloads.
* **Flexibility:** Supports schema evolution and time travel.
* **Performance:** Keeps per-file statistics in the transaction log so that queries can skip files that cannot contain matching rows (data skipping).
* **Simplified Data Pipelines:** Unifies batch and streaming processing.

By leveraging Delta Lake, you can build robust, scalable, and reliable data pipelines and data lakes.


## Real-world Examples of ACID Transactions

**ACID** transactions are fundamental to ensuring data integrity and consistency in database systems. Here are some real-world examples of ACID transactions:

### Financial Transactions:
* **Online Banking:** When transferring money between accounts, the transaction must be atomic: either both accounts are updated successfully, or neither is. 
* **ATM Withdrawals:** An ATM withdrawal involves multiple steps: deducting money from the account balance, dispensing cash, and updating the transaction log. All these steps must be completed successfully to ensure data consistency.
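
The money transfer case can be tried out with Python's built-in `sqlite3` module. The sketch below is only an illustration of atomicity in a relational database, not Delta Lake code. The `accounts` table, the `transfer` helper, and the balances are made up for the example, and the `CHECK` constraint stands in for the rule that an account cannot go negative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE accounts (name TEXT PRIMARY KEY, balance INTEGER CHECK (balance >= 0))"
)
conn.executemany("INSERT INTO accounts VALUES (?, ?)", [("alice", 100), ("bob", 50)])
conn.commit()


def transfer(conn, src, dst, amount):
    """Move `amount` from src to dst; return True on commit, False on rollback."""
    try:
        with conn:  # commits if the block succeeds, rolls back on an exception
            conn.execute(
                "UPDATE accounts SET balance = balance + ? WHERE name = ?", (amount, dst)
            )
            conn.execute(
                "UPDATE accounts SET balance = balance - ? WHERE name = ?", (amount, src)
            )
        return True
    except sqlite3.IntegrityError:
        # The debit violated the CHECK constraint, so the credit was undone too.
        return False


ok = transfer(conn, "alice", "bob", 30)       # enough funds: both updates commit
failed = transfer(conn, "alice", "bob", 500)  # not enough funds: neither update survives
balances = dict(conn.execute("SELECT name, balance FROM accounts"))
```

The second call credits `bob` before the debit fails, yet the final balances show no trace of that credit, which is the "all or nothing" behavior described above.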

### E-commerce Transactions:
* **Order Processing:** When a customer places an order, a transaction is initiated to update inventory levels, process payments, and generate shipping labels. All these steps must be completed successfully to prevent inconsistencies.
* **Checkout Process:** During checkout, multiple operations, such as updating cart items, processing payments, and generating invoices, are performed as a single transaction.

### Airline Reservations:
* **Seat Booking:** When a customer books a flight, a transaction is initiated to update the seat availability and generate a booking confirmation. The transaction must be atomic to prevent overbooking.

### Inventory Management:
* **Stock Adjustments:** When a product is added to or removed from inventory, a transaction is initiated to update the inventory levels. This ensures that the inventory count is accurate.

**Key Points to Remember:**

* **Atomicity:** All operations within a transaction must be treated as a single unit. Either all operations are committed, or none are.
* **Consistency:** A transaction must ensure that the database state is valid before and after the transaction.
* **Isolation:** Concurrent transactions must not interfere with each other.
* **Durability:** Once a transaction is committed, its effects must be permanent, even in the event of system failures.

By adhering to the ACID properties, database systems can maintain data integrity and reliability, even in complex and high-traffic scenarios.


# Create Delta table

In [0]:
# Create a DataFrame
data = [
    ("Alice", 30),
    ("Bob", 25),
    ("Charlie", 35)
]

df = spark.createDataFrame(data, ["Name", "Age"])

# Write the DataFrame as a Delta table
df.write.format("delta").option("delta.columnMapping.mode", "name").saveAsTable("my_delta_table")

In [0]:
%sql
select * from my_delta_table

Name,Age
Charlie,35
Alice,30
Bob,25


# Update Delta table

In [0]:
%sql
update my_delta_table set Age=31 where Name='Alice'

num_affected_rows
1


In [0]:
%sql
select * from my_delta_table

Name,Age
Charlie,35
Alice,31
Bob,25


In [0]:
%sql
DESCRIBE DETAIL my_delta_table;

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures,statistics
delta,277c61fb-ad3d-407f-91e5-dd730faa834b,spark_catalog.default.my_delta_table,,dbfs:/user/hive/warehouse/my_delta_table,2024-12-17T03:22:21.388+0000,2024-12-17T03:22:28.000+0000,List(),3,3651,"Map(delta.columnMapping.mode -> name, delta.columnMapping.maxColumnId -> 2)",2,5,"List(appendOnly, changeDataFeed, checkConstraints, columnMapping, generatedColumns, invariants)",Map()


# Read the table using versionAsOf

In [0]:
# Read the table at version 1 (version 0 is the initial write, version 1 is the UPDATE above)
df = spark.read.format("delta").option("versionAsOf", 1).load("dbfs:/user/hive/warehouse/my_delta_table")
df.show()

+-------+---+
|   Name|Age|
+-------+---+
|Charlie| 35|
|  Alice| 31|
|    Bob| 25|
+-------+---+



# Read the table using timestampAsOf

In [0]:
# Read the table as of a given timestamp (it must fall within the table's commit history)
#df = spark.read.format("delta").option("timestampAsOf", "2024-12-17T00:42:14.000+00:00").load("dbfs:/user/hive/warehouse/my_delta_table")
#df.show()
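
A timestamp read has to be resolved to a version first. The sketch below models that lookup in plain Python as "the latest commit at or before the requested time". It is a simplification under stated assumptions: `version_as_of` is a hypothetical helper, the commit times are example values, and the real lookup has additional rules (for instance for timestamps after the latest commit).

```python
from bisect import bisect_right
from datetime import datetime, timezone

# Hypothetical commit times for versions 0, 1 and 2 (list index == version).
commit_times = [
    datetime(2024, 12, 17, 3, 22, 21, tzinfo=timezone.utc),
    datetime(2024, 12, 17, 3, 22, 28, tzinfo=timezone.utc),
    datetime(2024, 12, 17, 3, 30, 0, tzinfo=timezone.utc),
]


def version_as_of(commit_times, ts):
    """Return the latest version committed at or before `ts`."""
    idx = bisect_right(commit_times, ts) - 1
    if idx < 0:
        raise ValueError("timestamp is before the first commit")
    return idx


v = version_as_of(commit_times, datetime(2024, 12, 17, 3, 25, tzinfo=timezone.utc))

# A timestamp earlier than the table's first commit cannot be resolved.
try:
    version_as_of(commit_times, datetime(2024, 12, 17, 0, 42, 14, tzinfo=timezone.utc))
    too_early_rejected = False
except ValueError:
    too_early_rejected = True
```

This is one reason a `timestampAsOf` read can fail: the timestamp has to lie within the history the table still retains.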

# Schema Evolution in Delta Lake

In [0]:
%sql
-- Make sure delta.columnMapping.mode is set to 'name'
SHOW TBLPROPERTIES my_delta_table;

key,value
delta.columnMapping.maxColumnId,5
delta.columnMapping.mode,name
delta.minReaderVersion,2
delta.minWriterVersion,5


## Add a new column

In [0]:
from pyspark.sql.functions import lit

# Add a new column with a constant value
df_with_city = df.withColumn("city", lit("New York"))

# Overwrite the Delta table with the new DataFrame; overwriteSchema replaces the table schema as well as the data
df_with_city.write.mode("overwrite").option("overwriteSchema", "true").format("delta").saveAsTable("my_delta_table")

In [0]:
%sql
select * from my_delta_table

Name,Age,city
Charlie,35,New York
Alice,31,New York
Bob,25,New York


## Rename the column

In [0]:
%sql
-- Enable column mapping (already set when the table was created; repeated here for completeness)
ALTER TABLE my_delta_table SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name');

-- Rename the column
ALTER TABLE my_delta_table RENAME COLUMN Name TO Ename;

In [0]:
%sql
select * from my_delta_table

Ename,Age,city
Charlie,35,New York
Alice,31,New York
Bob,25,New York
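
With column mapping in `name` mode, a rename is a metadata change: the table keeps a mapping from logical column names to the physical names stored in the data files. The sketch below illustrates the idea in plain Python. The physical names (`col-a1`, `col-b2`) and the helpers `rename_column` and `read_rows` are hypothetical and only stand in for what Delta Lake tracks in its table metadata.

```python
# Rows as stored in the data files, keyed by hypothetical physical column names.
data_file_rows = [
    {"col-a1": "Charlie", "col-b2": 35},
    {"col-a1": "Alice", "col-b2": 31},
]

# Table metadata: logical column name -> physical column name.
mapping = {"Name": "col-a1", "Age": "col-b2"}


def rename_column(mapping, old, new):
    """Return a new mapping with `old` renamed to `new`; stored data is untouched."""
    return {(new if logical == old else logical): physical
            for logical, physical in mapping.items()}


def read_rows(rows, mapping):
    """Project stored rows through the logical-to-physical mapping."""
    return [
        {logical: row[physical] for logical, physical in mapping.items()}
        for row in rows
    ]


renamed = rename_column(mapping, "Name", "Ename")
result = read_rows(data_file_rows, renamed)
```

Only the mapping changed, and `data_file_rows` is exactly as before. Dropping a column works the same way in this model: the entry is removed from the mapping while the stored values stay in place.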


## Drop the column

In [0]:
%sql
ALTER TABLE my_delta_table DROP COLUMN city

In [0]:
%sql
select * from my_delta_table

Ename,Age
Charlie,35
Alice,31
Bob,25
