# Working with Delta Tables in Databricks
Delta Lake provides reliability and performance improvements for big data workloads on Databricks.
This notebook covers:
- Creating Delta Tables
- Updating and Upserting Data
- Querying Delta Tables
- Time Travel in Delta Lake
- Best Practices

In [None]:
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = SparkSession.builder.appName('DeltaStudy') \
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension') \
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog') \
    .getOrCreate()

### Creating a Delta Table
This example creates a Delta table with sample data representing customer orders.

In [None]:
data = [
    (101, 1, 'Laptop', 1),
    (102, 2, 'Mouse', 2),
    (103, 1, 'Keyboard', 1),
    (104, 3, 'Monitor', 1),
]
columns = ['order_id', 'customer_id', 'product', 'quantity']

df = spark.createDataFrame(data, columns)

# Write as Delta Table
df.write.format('delta').mode('overwrite').save('/mnt/delta/orders')

### Upserting Data
This code demonstrates upserts (merge operations) to add new records and update existing ones.

In [None]:
delta_table = DeltaTable.forPath(spark, '/mnt/delta/orders')

new_data = [(105, 2, 'Laptop', 1), (102, 2, 'Mouse', 3)]
columns = ['order_id', 'customer_id', 'product', 'quantity']
new_df = spark.createDataFrame(new_data, columns)

delta_table.alias('old').merge(
    new_df.alias('new'),
    'old.order_id = new.order_id'
).whenMatchedUpdate(set={'quantity': 'new.quantity'})
 .whenNotMatchedInsertAll()
 .execute()

delta_table.toDF().show()

### Querying Delta Tables
Delta tables can be queried like any Spark DataFrame. Here’s an example retrieving all records from the orders table.

In [None]:
df = spark.read.format('delta').load('/mnt/delta/orders')
df.show()

### Time Travel with Delta Tables
Delta Lake supports time travel, which allows you to query data as it existed at a previous version.
Below is an example querying an older version of the orders table.

In [None]:
df_v0 = spark.read.format('delta').option('versionAsOf', 0).load('/mnt/delta/orders')
df_v0.show()

### Best Practices
- **Partition Delta Tables**: Partitioning helps improve performance for large tables.
- **Vacuum Unused Data**: Run the `VACUUM` command to remove old data files that are no longer needed.
- **Optimize with ZORDER**: Use ZORDER by frequently filtered columns to improve query speed.

These practices help maintain efficiency and reduce storage in Delta Lake environments.

## Summary
In this notebook, we explored Delta tables and covered:
- Creating and writing to Delta tables
- Upserting records in Delta tables
- Querying and using time travel
- Best practices for Delta Lake

Delta tables provide an efficient and powerful data management solution for big data environments, especially with Databricks.