# Sales Data Delta Lake Project

This project demonstrates how to work with Delta Lake on Databricks Community Edition.
It covers data ingestion, Delta table creation, and DML operations (**UPDATE, DELETE, MERGE**, with **INSERT** handled by the MERGE step),
along with Delta-specific features such as **time travel** and **history tracking**.


## Step 1: Import Libraries
We start by importing the required libraries.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # namespaced import avoids shadowing built-ins such as sum and max

## Step 2: Initialize Spark Session
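We obtain a Spark session. On Databricks a Delta-enabled session already exists as `spark`, so `getOrCreate()` simply returns it; outside Databricks, the Delta Lake package and its session extensions have to be configured separately.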

In [None]:
spark = SparkSession.builder \
    .appName("SalesDataDeltaLakeProject") \
    .getOrCreate()

## Step 3: Load Sample Data
We create a small DataFrame with order data.

In [None]:
data = [
    (1, '2025-09-01', 'C001', 'Mobile', 2, 500),
    (2, '2025-09-02', 'C002', 'Tablet', 1, 300),
    (3, '2025-09-03', 'C003', 'Headphones', 5, 50)
]

columns = ["order_id", "order_date", "customer_id", "product", "quantity", "price"]
df_spark = spark.createDataFrame(data, columns)
df_spark.show()
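
`createDataFrame` infers column types from the Python values, so the quoted dates above end up in a string column. If a real date column is wanted, one option is to convert the strings before building the DataFrame. The following is a minimal plain-Python sketch of that conversion; `parse_rows` and `typed_data` are hypothetical names, not part of the original notebook.

```python
from datetime import date

# Same sample rows as in the cell above.
data = [
    (1, '2025-09-01', 'C001', 'Mobile', 2, 500),
    (2, '2025-09-02', 'C002', 'Tablet', 1, 300),
    (3, '2025-09-03', 'C003', 'Headphones', 5, 50),
]

def parse_rows(rows):
    """Return copies of the rows with the ISO date string (index 1) as datetime.date."""
    return [(r[0], date.fromisoformat(r[1]), *r[2:]) for r in rows]

typed_data = parse_rows(data)
print(typed_data[0])
```

Passing `typed_data` instead of `data` to `spark.createDataFrame(..., columns)` should then give `order_date` a date type rather than a string type.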

## Step 4: Write Data as Delta Table
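We save the DataFrame as a managed Delta table named `sales_data_delta`. `mode("overwrite")` replaces any existing contents, so the cell can be rerun; each rerun adds a new version to the table's history.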

In [None]:
df_spark.write.format("delta").mode("overwrite").saveAsTable("sales_data_delta")

## Step 5: Run Queries
We can query the Delta table using SQL.

In [None]:
spark.sql("SELECT * FROM sales_data_delta").show()

## Step 6: Perform Updates
We raise the price of every `Mobile` order by 50.

In [None]:
spark.sql("""
UPDATE sales_data_delta
SET price = price + 50
WHERE product = 'Mobile'
""")

## Step 7: Perform Deletes
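We delete all rows for a given product. The sample data contains no `Laptop` orders, so this statement removes nothing as written; point the predicate at a product that exists in the table to see rows disappear.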

In [None]:
spark.sql("""
DELETE FROM sales_data_delta
WHERE product = 'Laptop'
""")

## Step 8: Merge Operation (Upsert)
We use MERGE to upsert new data: rows whose `order_id` already exists in the table are updated, and all other rows are inserted.

In [None]:
from delta.tables import DeltaTable

delta_table = DeltaTable.forName(spark, "sales_data_delta")

new_data = [(2, '2025-09-02', 'C002', 'Tablet', 2, 350),
            (4, '2025-09-05', 'C004', 'Charger', 3, 20)]

df_new = spark.createDataFrame(new_data, columns)

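(
    delta_table.alias("t")                                # t = target table
    .merge(df_new.alias("s"), "t.order_id = s.order_id")  # s = source rows, matched on order_id
    .whenMatchedUpdateAll()                               # existing order_id: overwrite all columns
    .whenNotMatchedInsertAll()                            # new order_id: insert the row
    .execute()
)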
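
To make the expected outcome concrete, here is a plain-Python model of the upsert applied to the sample rows. It only mimics the row-level semantics for a source with unique keys and is not the Delta Lake API; `upsert`, `target`, and `source` are hypothetical names, and the target assumes the Step 6 price update has already run.

```python
# Plain-Python model of the MERGE above, keyed on order_id (not Delta Lake code).
target = {
    1: ("2025-09-01", "C001", "Mobile", 2, 550),      # price after the Step 6 update
    2: ("2025-09-02", "C002", "Tablet", 1, 300),
    3: ("2025-09-03", "C003", "Headphones", 5, 50),
}
source = {
    2: ("2025-09-02", "C002", "Tablet", 2, 350),      # order_id already in the table
    4: ("2025-09-05", "C004", "Charger", 3, 20),      # order_id not yet in the table
}

def upsert(target, source):
    """Matched keys take the source row; unmatched source keys are added."""
    merged = dict(target)   # copy so the original target is left untouched
    merged.update(source)
    return merged

merged = upsert(target, source)
for order_id in sorted(merged):
    print(order_id, merged[order_id])
```

Under these assumptions the table ends with four orders: order 2 carries the new quantity and price, order 4 is new, and orders 1 and 3 are unchanged.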

## Step 9: Time Travel
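Each committed write creates a new table version, and we can query any version that is still retained. If the table did not exist before Step 4, version 0 is the initial write.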

In [None]:
spark.sql("SELECT * FROM sales_data_delta VERSION AS OF 0").show()
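
Delta SQL also accepts `TIMESTAMP AS OF` in place of `VERSION AS OF`. The sketch below builds either form as a query string; `time_travel_query` is a hypothetical helper, and the timestamp in the usage line is a made-up value that should be replaced with one taken from the table's history.

```python
def time_travel_query(table, version=None, timestamp=None):
    """Build a time-travel SELECT for exactly one of `version` or `timestamp`.

    The table name is interpolated as-is, so pass only trusted identifiers.
    """
    if (version is None) == (timestamp is None):
        raise ValueError("pass exactly one of version or timestamp")
    if version is not None:
        return f"SELECT * FROM {table} VERSION AS OF {int(version)}"
    return f"SELECT * FROM {table} TIMESTAMP AS OF '{timestamp}'"

print(time_travel_query("sales_data_delta", version=0))
# Hypothetical timestamp, for illustration only.
print(time_travel_query("sales_data_delta", timestamp="2025-09-05 00:00:00"))
```

The resulting string can be passed to `spark.sql(...).show()` like the query above. A timestamp outside the table's retained history may cause the query to fail.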

## Step 10: History
`DESCRIBE HISTORY` lists the table's commits, including the version number, timestamp, and operation of each.

In [None]:
spark.sql("DESCRIBE HISTORY sales_data_delta").show(truncate=False)
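
The full history output is wide. A possible way to narrow it to the columns most useful for this walkthrough is sketched below; `history_summary` is a hypothetical helper that expects the notebook's existing `spark` session.

```python
def history_summary(spark, table):
    """Return the table history reduced to a few key columns.

    `spark` is an active SparkSession; `table` is a trusted table name.
    """
    return spark.sql(f"DESCRIBE HISTORY {table}").select(
        "version", "timestamp", "operation", "operationMetrics"
    )

# Usage inside the notebook, where `spark` is already defined:
# history_summary(spark, "sales_data_delta").show(truncate=False)
```

The `operationMetrics` column reports per-commit counts, which helps confirm how many rows each operation touched.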