## Task 1

In [None]:
! pip install Pyspark

Collecting Pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: Pyspark
  Building wheel for Pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for Pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=84b873268497fbf2a923a2d55db8a58d92df4c736103835b738ed82bddfc2f9a
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built Pyspark
Installing collected packages: Pyspark
Successfully installed Pyspark-3.5.2


In [None]:
# Stage the workspace CSV in DBFS so the later cells can read it from there.
dbutils.fs.cp(
    "file:/Workspace/Shared/customer_transaction.csv",
    "dbfs:/FileStore/streaming/input/customer_transaction.csv",
)

Create an ETL Pipeline using DLT (Python)

In [None]:
import dlt
from pyspark.sql.functions import col

@dlt.table
def raw_transactions():
    """Ingest the staged customer-transaction CSV as the raw DLT table.

    The file is read with a header row and with schema inference enabled, so
    columns are typed from their contents instead of all being strings. This
    matches how the same CSV layout is loaded with ``inferSchema`` elsewhere
    in this notebook.

    Returns:
        DataFrame holding the contents of the CSV file.
    """
    return spark.read.csv(
        "dbfs:/FileStore/streaming/input/customer_transaction.csv",
        header=True,
        inferSchema=True,
    )

@dlt.table
def transformed_transactions():
    """Add a TotalAmount column and keep only rows with Quantity above 1.

    Returns:
        DataFrame read from the ``raw_transactions`` DLT table, extended with
        ``TotalAmount`` (``Quantity * Price``) and filtered to ``Quantity > 1``.
    """
    raw = dlt.read("raw_transactions")
    total_amount = col("Quantity") * col("Price")
    with_total = raw.withColumn("TotalAmount", total_amount)
    return with_total.filter(col("Quantity") > 1)


 Create an ETL Pipeline using DLT (SQL)

In [None]:
## Step 1: Load the staged CSV and persist it as the Delta table read by the SQL cell

# The file staged in DBFS earlier in this notebook is named
# customer_transaction.csv (singular); "customer_transactions.csv" is never created.
df_transactions = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("dbfs:/FileStore/streaming/input/customer_transaction.csv")
)

# Overwrite so that re-running the cell always rebuilds the table from the CSV.
df_transactions.write.format("delta").mode("overwrite").save("/delta/customer_transactions")





In [None]:
%sql
-- Build the transformed table from the Delta files at /delta/customer_transactions,
-- adding TotalAmount = Quantity * Price.
-- A path-based Delta reference must be quoted with backticks, not double quotes.
CREATE OR REFRESH LIVE TABLE transformed_transactions AS
SELECT
    TransactionID,
    TransactionDate,
    CustomerID,
    Product,
    Quantity,
    Price,
    Quantity * Price AS TotalAmount
FROM delta.`/delta/customer_transactions`;



## Task 2

 Delta Lake Operations - Read, Write, Update, Delete, Merge

1. Read Data from Delta Lake:

In [None]:
# Python: load the Delta table by path and preview its first five rows
df = (
    spark.read
    .format("delta")
    .load("/delta/customer_transactions")
)
df.show(5)



In [None]:
%sql
-- Preview five rows of the Delta table; the path must be quoted with
-- backticks (single quotes would make it a string literal, not an identifier).
SELECT * FROM delta.`/delta/customer_transactions` LIMIT 5;

2. Write Data to Delta Lake:

In [None]:
# Two extra transactions to append to the existing Delta table
new_data = [
    (6, "2024-09-06", "C005", "Keyboard", 4, 100),
    (7, "2024-09-07", "C006", "Mouse", 10, 20),
]

new_transactions_df = spark.createDataFrame(
    new_data,
    schema=[
        "TransactionID",
        "TransactionDate",
        "CustomerID",
        "Product",
        "Quantity",
        "Price",
    ],
)

# Append mode adds the rows without touching the data already stored
(
    new_transactions_df.write
    .format("delta")
    .mode("append")
    .save("/delta/customer_transactions")
)


3. Update Data in Delta Lake:

In [None]:
## Python API

from delta.tables import DeltaTable

# Handle to the Delta table stored at this path
delta_table = DeltaTable.forPath(spark, "/delta/customer_transactions")

# Set Price to 1300 on every row whose Product is 'Laptop'
delta_table.update(condition="Product = 'Laptop'", set={"Price": "1300"})

# Re-read the table and display the Laptop rows to confirm the change
df_updated = (
    spark.read
    .format("delta")
    .load("/delta/customer_transactions")
)
df_updated.where("Product = 'Laptop'").show()


In [None]:
%sql
-- SQL equivalent of the Python update: set Price to 1300 for Laptop rows.
-- The cell must start with %sql to be run as SQL, and the path must match the
-- table written earlier (/delta/customer_transactions, plural).
UPDATE delta.`/delta/customer_transactions`
SET Price = 1300
WHERE Product = 'Laptop';

4. Delete Data from Delta Lake:

In [None]:
## Python API: remove every row whose Quantity is below 3
delta_table.delete(condition="Quantity < 3")

# Re-read the table to show what remains after the delete
df_after_delete = (
    spark.read
    .format("delta")
    .load("/delta/customer_transactions")
)
df_after_delete.show()


In [None]:
%sql
-- SQL equivalent of the Python delete: remove rows with Quantity below 3.
-- The cell must start with %sql to be run as SQL.
DELETE FROM delta.`/delta/customer_transactions`
WHERE Quantity < 3;

5. Merge Data into Delta Lake:

In [None]:
# Source rows for the merge: TransactionID 1 and TransactionID 8
merge_data = [
    (1, "2024-09-01", "C001", "Laptop", 1, 1250),
    (8, "2024-09-08", "C007", "Charger", 2, 30),
]

merge_df = spark.createDataFrame(
    merge_data,
    schema=[
        "TransactionID",
        "TransactionDate",
        "CustomerID",
        "Product",
        "Quantity",
        "Price",
    ],
)


In [None]:
## using python

# Expose the merge rows as a temp view named "updates" for SQL access.
merge_df.createOrReplaceTempView("updates")

# Upsert keyed on TransactionID: matched rows get Price, Quantity and
# TransactionDate from the source; unmatched source rows are inserted whole.
# The merge source must be a DataFrame object: "updates" is only a temp-view
# name, not a Python variable, so merge_df is passed directly.
# No extra condition is given to whenMatchedUpdate because it would only
# repeat the merge condition.
(
    delta_table.alias("target")
    .merge(
        merge_df.alias("source"),
        "target.TransactionID = source.TransactionID",
    )
    .whenMatchedUpdate(
        set={
            "Price": "source.Price",
            "Quantity": "source.Quantity",
            "TransactionDate": "source.TransactionDate",
        }
    )
    .whenNotMatchedInsertAll()
    .execute()
)


In [None]:
%sql
-- SQL equivalent of the Python merge (the %sql magic must be the first line
-- of the cell). Upsert keyed on TransactionID: matched rows are updated,
-- unmatched source rows are inserted.
MERGE INTO delta.`/delta/customer_transactions` AS target
USING (SELECT * FROM VALUES
    (1, '2024-09-01', 'C001', 'Laptop', 1, 1250),
    (8, '2024-09-08', 'C007', 'Charger', 2, 30)
) AS source (TransactionID, TransactionDate, CustomerID, Product, Quantity, Price)
ON target.TransactionID = source.TransactionID
WHEN MATCHED THEN
    UPDATE SET target.Price = source.Price, target.Quantity = source.Quantity, target.TransactionDate = source.TransactionDate
WHEN NOT MATCHED THEN
    INSERT *


## Task 3

Delta Lake - History, Time Travel, and Vacuum

1. View Delta Table History:

In [None]:
# PySpark: show the table's history
delta_table.history().show()

# SQL through spark.sql: show the table's DESCRIBE DETAIL output
spark.sql(
    "DESCRIBE DETAIL delta.`/delta/customer_transactions`"
).show()

2. Perform Time Travel:

In [None]:
# Load the table as of version 5 (versionAsOf takes an absolute version
# number, not an offset back from the current version)
df_version_5 = (
    spark.read
    .format("delta")
    .option("versionAsOf", 5)
    .load("/delta/customer_transactions")
)
df_version_5.show()


In [None]:
# Load the table as it was at a given point in time
timestamp = "2024-09-01T12:00:00"
df_at_time = (
    spark.read
    .format("delta")
    .option("timestampAsOf", timestamp)
    .load("/delta/customer_transactions")
)
df_at_time.show()


In [None]:
%sql
-- SQL time travel on the same table the Python cells use. The %sql magic must
-- be the first line of the cell and appear only once; /delta/orders is never
-- written by this notebook, so the queries target /delta/customer_transactions.

-- Table contents as of version 5
SELECT * FROM delta.`/delta/customer_transactions` VERSION AS OF 5;

-- Table contents as of a timestamp
SELECT * FROM delta.`/delta/customer_transactions` TIMESTAMP AS OF '2024-09-01T12:00:00';


3. Vacuum the Delta Table:

In [None]:

# Vacuum with a retention of 168 hours (7 days x 24 hours)
delta_table.vacuum(retentionHours=7 * 24)


In [None]:
# Same vacuum, issued as a SQL statement through spark.sql
spark.sql(
    "VACUUM delta.`/delta/customer_transactions` RETAIN 168 HOURS"
)

 Converting Parquet Files to Delta Files

In [None]:

# Read the staged CSV and write it out as Parquet so it can be converted below.
# The file staged in DBFS earlier in this notebook is named
# customer_transaction.csv (singular); "customer_transactions.csv" is never created.
df_csv = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("dbfs:/FileStore/streaming/input/customer_transaction.csv")
)

# Overwrite so that re-running the cell replaces any earlier Parquet output.
df_csv.write.format("parquet").mode("overwrite").save("/parquet/customer_transactions")


In [None]:
# Read the Parquet files and rewrite them as a Delta table at /delta/orders_converted
(
    spark.read
    .format("parquet")
    .load("/parquet/customer_transactions/")
    .write
    .format("delta")
    .mode("overwrite")
    .save("/delta/orders_converted")
)


In [None]:
%sql
-- Convert the Parquet directory to Delta using SQL
-- (the %sql magic must be the first line of the cell).
CONVERT TO DELTA parquet.`/parquet/customer_transactions`;


## Task 4

Implementing Incremental Load Pattern using Delta Lake

In [None]:

# Initial batch of transactions
initial_data = [
    (1, "2024-09-01", "C001", "Laptop", 1, 1200),
    (2, "2024-09-02", "C002", "Tablet", 2, 300),
    (3, "2024-09-03", "C001", "Headphones", 5, 50),
]

# Column names for the transaction DataFrames
columns = [
    "TransactionID",
    "TransactionDate",
    "CustomerID",
    "Product",
    "Quantity",
    "Price",
]
df_initial = spark.createDataFrame(initial_data, columns)

# Overwrite mode replaces whatever is already stored at the target path
(
    df_initial.write
    .format("delta")
    .mode("overwrite")
    .save("/delta/transactions")
)


Incremental data

In [None]:

# Later batch of transactions (IDs 4-7), not yet written to the table
incremental_data = [
    (4, "2024-09-04", "C003", "Smartphone", 1, 800),
    (5, "2024-09-05", "C004", "Smartwatch", 3, 200),
    (6, "2024-09-06", "C005", "Keyboard", 4, 100),
    (7, "2024-09-07", "C006", "Mouse", 10, 20),
]

df_incremental = spark.createDataFrame(data=incremental_data, schema=columns)


Implement Incremental Load

In [None]:

# Incremental load: append only rows newer than the latest TransactionDate
# already stored in the target table. Deriving the watermark from the table
# (instead of hard-coding "2024-09-03") means re-running this cell does not
# append the same rows a second time.
target_path = "/delta/transactions"
last_loaded_date = (
    spark.read.format("delta")
    .load(target_path)
    .agg({"TransactionDate": "max"})
    .first()[0]
)

# An empty target table has no watermark (None); in that case load every row.
if last_loaded_date is None:
    new_transactions = df_incremental
else:
    new_transactions = df_incremental.filter(col("TransactionDate") > last_loaded_date)

new_transactions.write.format("delta").mode("append").save(target_path)


Monitor Incremental Load

In [None]:

%sql
-- Show the history of the Delta table at /delta/transactions
DESCRIBE HISTORY delta.`/delta/transactions`
