# DELTA LAKE CONCEPTS

In [None]:
# Stage the three source files from the workspace into DBFS, then load them.
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

employee_csv_path = 'file:/Workspace/Shared/assignment17sep/employees.csv'
new_employee_csv_path = 'file:/Workspace/Shared/assignment17sep/NewEmployeeData.csv'
products_path = 'file:/Workspace/Shared/assignment17sep/products.json'

dbutils.fs.cp(employee_csv_path, 'dbfs:/FileStore/assignment17sep/employees.csv')
dbutils.fs.cp(new_employee_csv_path, 'dbfs:/FileStore/assignment17sep/NewEmployeeData.csv')
dbutils.fs.cp(products_path, 'dbfs:/FileStore/assignment17sep/products.json')

# Common DBFS prefix reused by the following cells.
location = 'dbfs:/FileStore/assignment17sep/'

# Employees: CSV with a header row; column types are inferred by Spark.
employees_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(f"{location}employees.csv")
)

# Products: JSON read with an explicit schema (all fields nullable).
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("ProductID", StringType()),
        ("ProductName", StringType()),
        ("Category", StringType()),
        ("Price", DoubleType()),
    )
])
products_df = spark.read.json(f"{location}products.json", schema=schema)

employees_df.show()
products_df.show()

+----------+------------+-----------+-----------+------+
|EmployeeID|EmployeeName| Department|JoiningDate|Salary|
+----------+------------+-----------+-----------+------+
|       101|        John|         HR| 2023-01-10| 50000|
|       102|       Alice|    Finance| 2023-02-15| 70000|
|       103|        Mark|Engineering| 2023-03-20| 85000|
|       104|        Emma|      Sales| 2023-04-01| 55000|
|       105|        Liam|  Marketing| 2023-05-12| 60000|
+----------+------------+-----------+-----------+------+

+---------+-----------+-----------+------+
|ProductID|ProductName|   Category| Price|
+---------+-----------+-----------+------+
|     P101|     Laptop|Electronics|1200.0|
|     P102|      Phone|Electronics| 800.0|
|     P103|     Tablet|Electronics| 600.0|
|     P104|    Monitor|Electronics| 300.0|
|     P105|      Mouse|Accessories|  25.0|
+---------+-----------+-----------+------+



In [None]:
# Persist both DataFrames as Delta tables under {location}delta/.
# mode("overwrite") makes the cell re-runnable: with the default mode
# (error if exists) a second run fails because the target paths already exist.
employees_df.write.format("delta").mode("overwrite").save(f"{location}delta/employees")
products_df.write.format("delta").mode("overwrite").save(f"{location}delta/products")

In [None]:
# Load the new employee rows (header row present, types inferred).
new_employee_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(f"{location}NewEmployeeData.csv")
)

# Append them to the employees Delta table.
(
    new_employee_df.write
    .format("delta")
    .mode("append")
    .save(f"{location}delta/employees")
)
print("New data appended to Delta table successfully.")

# Expose the same rows to SQL as the MERGE source.
new_employee_df.createOrReplaceTempView("new_employee_data")

print("Merging new data into Delta table...")

# Upsert keyed on EmployeeID: matched rows get the source Salary,
# unmatched rows are inserted in full.
# NOTE(review): the same rows were already appended above, so EmployeeIDs that
# existed before the append presumably end up duplicated - confirm this is intended.
delta_table_path = 'dbfs:/FileStore/assignment17sep/delta/employees'
spark.sql(f"""
MERGE INTO delta.`{delta_table_path}` AS target
USING new_employee_data AS source
ON target.EmployeeID = source.EmployeeID
WHEN MATCHED THEN UPDATE SET
    target.Salary = source.Salary
WHEN NOT MATCHED THEN INSERT (EmployeeID,EmployeeName,Department,JoiningDate,Salary)
    VALUES (source.EmployeeID,source.EmployeeName,source.Department,source.JoiningDate,source.Salary)
""")

print("Data merged successfully.")

New data appended to Delta table successfully.
Merging new data into Delta table...
Data merged successfully.


In [None]:
# --- History: show the table's commit log without truncating columns ---
print("Viewing Delta table history...")
history_df = spark.sql(
    f"DESCRIBE HISTORY delta.`{delta_table_path}`"
)
history_df.show(truncate=False)

# --- Time travel: load the table as of version 0 ---
print("table before the previous merge operation.")
path = 'dbfs:/FileStore/assignment17sep/delta/employees'
df_time_travel = (
    spark.read
    .format("delta")
    .option("versionAsOf", 0)
    .load(path)
)
df_time_travel.show(truncate=False)

# --- Vacuum with a 168-hour (7-day) retention window ---
print("Vacuuming old files...")
spark.sql(
    f"VACUUM delta.`{delta_table_path}` RETAIN 168 HOURS"
)

print("Delta Table operations completed.")


Viewing Delta table history...
+-------+-------------------+----------------+----------------------------------+---------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+------------------+--------------------+-----------+-----------------+-------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Register the Delta files at delta/employees under a table name.
spark.sql(
    "CREATE TABLE IF NOT EXISTS delta_employee_table "
    "USING DELTA LOCATION 'dbfs:/FileStore/assignment17sep/delta/employees'"
)

# Plain OPTIMIZE pass.
spark.sql(
    "OPTIMIZE delta_employee_table"
)

# OPTIMIZE again, this time Z-ordering on Department.
spark.sql(
    "OPTIMIZE delta_employee_table ZORDER BY Department"
)

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

In [None]:
# List the table's versions.
(
    spark.sql("DESCRIBE HISTORY delta_employee_table;")
    .show()
)

# Show the table contents as they were at version 0.
(
    spark.sql("SELECT * FROM delta_employee_table VERSION AS OF 0;")
    .show()
)


+-------+-------------------+----------------+--------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|          userId|            userName|operation| operationParameters| job|          notebook|           clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+----------------+--------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|     11|2024-09-13 04:17:28|8379095214579684|azuser2109_mml.lo...| OPTIMIZE|{predicate -> [],...|NULL|{3714719545832430}|0911-102451-o8x6anfh|         10|SnapshotIsolation|        false|{numRemovedFiles ...|        NULL|Databricks-Runtim...|
|     10|2024-09-13 04:17:24

In [None]:
# Vacuum with a retention window of 168 hours (7 days).
spark.sql(
    "VACUUM delta_employee_table RETAIN 168 HOURS"
)

DataFrame[path: string]

# STRUCTURED STREAMING AND TRANSFORMATION ON STREAMS

In [None]:
# Copy transactions.csv into the DBFS directory used as the streaming source.
streaming_path ='dbfs:/FileStore/assignment17sep/streaming/input/'
csv_path = 'file:/Workspace/Shared/assignment17sep/transactions.csv'
dbutils.fs.cp(
    csv_path,
    f"{streaming_path}transactions.csv",
)

True

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Obtain a session (an already-running one is reused by getOrCreate).
spark = (
    SparkSession.builder
    .appName("Streaming")
    .getOrCreate()
)

# Streaming file sources need the schema up front; it is given here as a DDL string.
transaction_schema = "TransactionID STRING, TransactionDate DATE, ProductID STRING, Quantity INT, Price DOUBLE"

# Stream CSV files (with header rows) from the input directory.
transaction_stream_df = (
    spark.readStream
    .schema(transaction_schema)
    .option("header", True)
    .csv("dbfs:/FileStore/assignment17sep/streaming/input/")
)

transaction_stream_df.printSchema()

root
 |-- TransactionID: string (nullable = true)
 |-- TransactionDate: date (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: double (nullable = true)



In [None]:
# Add TotalAmount = Quantity * Price and keep only rows with Quantity > 1.
transformed_stream_df = (
    transaction_stream_df
    .withColumn(
        "TotalAmount",
        transaction_stream_df["Quantity"] * transaction_stream_df["Price"],
    )
    .filter(transaction_stream_df["Quantity"] > 1)
)

# A query name can only be active once per session; stop a leftover query from
# a previous run of this cell so the cell can be re-executed.
for active_query in spark.streams.active:
    if active_query.name == "transformed_data":
        active_query.stop()

# Stream into an in-memory table named "transformed_data".
query = transformed_stream_df.writeStream \
    .format("memory") \
    .queryName("transformed_data") \
    .outputMode("append") \
    .start()

# start() returns immediately; selecting right away races the first micro-batch
# and shows an empty table. Block until the currently available input is processed.
query.processAllAvailable()

# To view the data from memory
spark.sql("SELECT * FROM transformed_data").show()


+-------------+---------------+---------+--------+-----+-----------+
|TransactionID|TransactionDate|ProductID|Quantity|Price|TotalAmount|
+-------------+---------------+---------+--------+-----+-----------+
+-------------+---------------+---------+--------+-----+-----------+



In [None]:
# Import Spark's sum under an alias: a bare `sum` import would shadow Python's
# built-in sum() for every later cell in the notebook.
from pyspark.sql.functions import col, to_timestamp
from pyspark.sql.functions import sum as spark_sum

# Convert TransactionDate to a timestamp so it can serve as the event-time column.
transaction_stream_df = transaction_stream_df.withColumn("TransactionDate", to_timestamp(col("TransactionDate")))

# Total sales (Quantity * Price) per product, with a 1-day watermark on TransactionDate.
# NOTE(review): the grouping key does not include TransactionDate or a window over it,
# so the watermark presumably does not bound the aggregation state - confirm intent.
aggregated_stream_df = transaction_stream_df \
    .withWatermark("TransactionDate", "1 day") \
    .groupBy("ProductID") \
    .agg(spark_sum(col("Quantity") * col("Price")).alias("TotalSales"))

# Emit changed aggregate rows to the console on each micro-batch.
query = aggregated_stream_df.writeStream \
    .format("console") \
    .outputMode("update") \
    .start()


In [None]:
def _overwrite_sales_snapshot(batch_df, batch_id):
    """Replace the parquet output with the full aggregate result of one micro-batch.

    Args:
        batch_df: the complete aggregated result table for this trigger.
        batch_id: micro-batch id supplied by Spark (unused).
    """
    batch_df.write.mode("overwrite").parquet("dbfs:/FileStore/assignment17sep/streaming/output/parquet/")


# The parquet file sink only supports append mode, and append mode is rejected for
# an aggregation whose grouping key has no watermarked event-time column (here the
# key is just ProductID). Writing each complete result through foreachBatch keeps
# the same output path and checkpoint while remaining a valid query.
# NOTE(review): if the checkpoint directory holds state from an earlier attempt
# with a different sink, it may need to be cleared before this starts - confirm.
query = aggregated_stream_df.writeStream \
    .foreachBatch(_overwrite_sales_snapshot) \
    .option("checkpointLocation", "dbfs:/FileStore/assignment17sep/streaming/output/checkpoint/") \
    .outputMode("complete") \
    .start()


In [None]:
# Products get their own source directory. Dropping products.csv into the
# transactions input directory would make the transaction stream parse product rows
# with the transaction schema, and the product stream parse transaction rows.
csv_path = 'file:/Workspace/Shared/assignment17sep/products.csv'
product_stream_path = 'dbfs:/FileStore/assignment17sep/streaming/products/'
dbutils.fs.cp(csv_path, f"{product_stream_path}products.csv")

# Stream the product CSV files (with header rows) using an explicit DDL schema.
product_schema = "ProductID STRING, ProductName STRING, Category STRING"
product_stream_df = spark.readStream.format("csv").option("header", True).schema(product_schema) \
    .load(product_stream_path)

# Inner join of the two streams on ProductID.
joined_stream = transaction_stream_df.join(product_stream_df, "ProductID")

# Print joined rows to the console as they arrive.
query = joined_stream.writeStream \
    .format("console") \
    .outputMode("append") \
    .start()


In [None]:
# Stop every running streaming query, not only the one last bound to `query`:
# the parquet writer below reuses a checkpoint location, and two live queries
# must not share one checkpoint.
for active_query in spark.streams.active:
    active_query.stop()


def _overwrite_parquet_snapshot(batch_df, batch_id):
    """Replace the parquet output with the full aggregate result of one micro-batch.

    Args:
        batch_df: the complete aggregated result table for this trigger.
        batch_id: micro-batch id supplied by Spark (unused).
    """
    batch_df.write.mode("overwrite").parquet("dbfs:/FileStore/assignment17sep/streaming/output/parquet/")


# Append mode is rejected for this aggregation (its grouping key, ProductID, carries
# no watermark) and the parquet file sink accepts nothing but append, so the complete
# result is written through foreachBatch instead.
query = aggregated_stream_df.writeStream \
    .foreachBatch(_overwrite_parquet_snapshot) \
    .option("checkpointLocation", "dbfs:/FileStore/assignment17sep/streaming/output/checkpoint/") \
    .outputMode("complete") \
    .start()

# CREATING A COMPLETE ETL PIPELINE USING DELTA LIVE TABLES

In [None]:
# Copy orders.csv from the workspace into DBFS for the pipeline below.
orders_csv_path = 'file:/Workspace/Shared/assignment17sep/orders.csv'
dbutils.fs.cp(
    orders_csv_path,
    "dbfs:/FileStore/assignment17sep/orders.csv",
)

True

In [None]:
import dlt
from pyspark.sql.functions import col, expr


@dlt.table
def orders_raw():
    """Raw orders read from the staged CSV file (header row used for column names).

    No schema is supplied or inferred, so every column is read as a string.
    """
    return spark.read.format("csv").option("header", True).load("dbfs:/FileStore/assignment17sep/orders.csv")


@dlt.table
def orders_transformed():
    """Orders with TotalAmount = Quantity * Price, restricted to Quantity > 1."""
    df = dlt.read("orders_raw")
    df = df.withColumn("TotalAmount", col("Quantity") * col("Price"))
    return df.filter(col("Quantity") > 1)


# Dataset functions should only return a DataFrame; the pipeline performs the write.
# The storage location that used to be written by hand inside the function is now
# passed through the decorator's `path` argument.
# NOTE(review): `path` applies to Hive-metastore pipelines; confirm the pipeline type.
@dlt.table(path="dbfs:/FileStore/assignment17sep/delta/orders_final")
def orders_final():
    """Final orders table, materialised from orders_transformed."""
    return dlt.read("orders_transformed")


Name,Type
OrderID,string
OrderDate,string
CustomerID,string
Product,string
Quantity,string
Price,string
TotalAmount,double


In [None]:
# python
from pyspark.sql.functions import col, when

# Read with header=True so the first line supplies the column names instead of
# becoming a data row under _c0.._c5, and infer numeric types for Quantity/Price.
orders_csv_df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("dbfs:/FileStore/assignment17sep/orders.csv")
)

# overwriteSchema lets this replace a table previously written with _c0.._c5 columns.
(
    orders_csv_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("dbfs:/FileStore/assignment17sep/delta/orders")
)

# Read data from Delta Table
orders_delta_df = spark.read.format("delta").load("dbfs:/FileStore/assignment17sep/delta/orders")
orders_delta_df.show()

# Insert new record. The literal row is cast column-by-column to the table's
# types so the union cannot silently widen or re-type any column.
new_order_df = spark.createDataFrame(
    [(106, "2024-01-12", "C006", "Keyboard", 3, 50)],
    ["OrderID", "OrderDate", "CustomerID", "Product", "Quantity", "Price"],
)
new_order_df = new_order_df.select(
    [col(field.name).cast(field.dataType) for field in orders_delta_df.schema.fields]
)
orders_inserted_df = orders_delta_df.unionByName(new_order_df)
orders_inserted_df.show()

# Update prices (increase Laptop price by 10%). A conditional expression keeps
# every other row unchanged; filtering here would drop all non-Laptop rows.
orders_updated_df = orders_inserted_df.withColumn(
    "Price",
    when(col("Product") == "Laptop", col("Price") * 1.1).otherwise(col("Price")),
)
orders_updated_df.show()

# Delete rows where Quantity < 2
orders_kept_df = orders_updated_df.filter(col("Quantity") >= 2)
orders_kept_df.show()


+-------+----------+----------+-------+--------+-----+
|    _c0|       _c1|       _c2|    _c3|     _c4|  _c5|
+-------+----------+----------+-------+--------+-----+
|OrderID| OrderDate|CustomerID|Product|Quantity|Price|
|    101|2024-01-01|      C001| Laptop|       2| 1000|
|    102|2024-01-02|      C002|  Phone|       1|  500|
|    103|2024-01-03|      C003| Tablet|       3|  300|
|    104|2024-01-04|      C004|Monitor|       1|  150|
|    105|2024-01-05|      C005|  Mouse|       5|   20|
+-------+----------+----------+-------+--------+-----+

+-------+----------+----------+--------+--------+-----+
|    _c0|       _c1|       _c2|     _c3|     _c4|  _c5|
+-------+----------+----------+--------+--------+-----+
|OrderID| OrderDate|CustomerID| Product|Quantity|Price|
|    101|2024-01-01|      C001|  Laptop|       2| 1000|
|    102|2024-01-02|      C002|   Phone|       1|  500|
|    103|2024-01-03|      C003|  Tablet|       3|  300|
|    104|2024-01-04|      C004| Monitor|       1|  150|
|

In [None]:
# SQL

# Register the Delta files at delta/orders_final under a table name.
spark.sql(
    "CREATE TABLE IF NOT EXISTS delta_orders_table "
    "USING DELTA LOCATION 'dbfs:/FileStore/assignment17sep/delta/orders_final'"
)

# Raise the price of Laptop rows by 10% and show the affected-row count.
(
    spark.sql("UPDATE delta_orders_table SET Price = Price * 1.1 WHERE Product = 'Laptop'")
    .show()
)

# Remove rows whose Quantity is below 2 and show the affected-row count.
(
    spark.sql("DELETE FROM delta_orders_table WHERE Quantity < 2")
    .show()
)

# Add one new order row.
spark.sql(
    "INSERT INTO delta_orders_table (OrderID, OrderDate, CustomerID, Product, Quantity, Price) "
    "VALUES (106, '2024-01-12', 'C006', 'Keyboard', 3, 50)"
)

+-----------------+
|num_affected_rows|
+-----------------+
|                0|
+-----------------+

+-----------------+
|num_affected_rows|
+-----------------+
|                0|
+-----------------+



DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [None]:
# Incoming rows: OrderID 101 and OrderID 106.
data = [
    (101, '2024-01-10', 'C001', 'Laptop', 2, 1200),
    (106, '2024-01-12', 'C006', 'Keyboard', 3, 50),
]
schema = ["OrderID", "OrderDate", "CustomerID", "Product", "Quantity", "Price"]

# Build the source DataFrame and expose it to SQL as the MERGE source.
new_orders_df = spark.createDataFrame(data, schema)
new_orders_df.createOrReplaceTempView("new_orders_data")

print("Merging new data into Delta table...")

# (Re)create the merge target from the staged CSV: header row, inferred types.
orders_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("dbfs:/FileStore/assignment17sep/orders.csv")
)
(
    orders_df.write
    .format("delta")
    .mode("overwrite")
    .save("dbfs:/FileStore/assignment17sep/delta/orders1")
)

# Upsert keyed on OrderID: matched rows take the source Quantity and Price,
# unmatched rows are inserted in full.
dbfs_path = 'dbfs:/FileStore/assignment17sep/delta/orders1'
spark.sql(f"""
MERGE INTO delta.`{dbfs_path}` AS target
USING new_orders_data AS source
ON target.OrderID = source.OrderID
WHEN MATCHED THEN UPDATE SET
    target.Quantity = source.Quantity, target.Price = source.Price
WHEN NOT MATCHED THEN INSERT (OrderID, OrderDate, CustomerID, Product, Quantity, Price) 
VALUES (source.OrderID, source.OrderDate, source.CustomerID, source.Product, source.Quantity, source.Price)
""")

print("New data merged successfully!")


Merging new data into Delta table...
New data merged successfully!


In [None]:
%sql
-- Commit history of the Delta table stored at delta/orders.
DESCRIBE HISTORY delta.`dbfs:/FileStore/assignment17sep/delta/orders`;

-- Table-level metadata for the same path.
-- NOTE(review): the saved output holds only this statement's result; run the
-- history statement in its own cell if its output is also needed.
DESCRIBE DETAIL delta.`dbfs:/FileStore/assignment17sep/delta/orders`;


format,id,name,description,location,createdAt,lastModified,partitionColumns,clusteringColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures,statistics
delta,e81d9a0a-eb4b-4ed5-9e98-227a4ab6321a,,,dbfs:/FileStore/assignment17sep/delta/orders,2024-09-17T08:11:32.971Z,2024-09-17T08:27:18Z,List(),List(),6,9354,Map(delta.enableDeletionVectors -> true),3,7,List(deletionVectors),"Map(numRowsDeletedByDeletionVectors -> 0, numDeletionVectors -> 0)"


In [None]:
%sql
-- Time travel: read the Delta table at delta/orders as of version 0.
-- NOTE(review): the MERGE in this notebook targets delta/orders1, not this path,
-- so this is not a "before the merge" view - confirm which table was intended.
SELECT * FROM delta.`dbfs:/FileStore/assignment17sep/delta/orders` VERSION AS OF 0;


_c0,_c1,_c2,_c3,_c4,_c5
OrderID,OrderDate,CustomerID,Product,Quantity,Price
101,2024-01-01,C001,Laptop,2,1000
102,2024-01-02,C002,Phone,1,500
103,2024-01-03,C003,Tablet,3,300
104,2024-01-04,C004,Monitor,1,150
105,2024-01-05,C005,Mouse,5,20


In [None]:
# Run OPTIMIZE on the orders table.
# NOTE(review): no ZORDER BY clause is given, so no Z-ordering is requested here;
# add "ZORDER BY <column>" if Z-ordering was intended.
spark.sql("OPTIMIZE delta.`dbfs:/FileStore/assignment17sep/delta/orders`")

# Vacuum the table with a retention window of 168 hours (7 days)
spark.sql("VACUUM delta.`dbfs:/FileStore/assignment17sep/delta/orders` RETAIN 168 HOURS")


DataFrame[path: string]

# CREATING AND SCHEDULING A JOB

In [None]:
from pyspark.sql.functions import col

csv_path = 'file:/Workspace/Shared/assignment17sep/orders.csv'

# "FileStore" (capital S) matches every other DBFS path in this notebook; the
# lower-case spelling "Filestore" names a different DBFS directory.
dbfs_path = 'dbfs:/FileStore/assignment17sep/orders.csv'

# Stage the CSV in DBFS.
dbutils.fs.cp(csv_path, dbfs_path)

# Load CSV data (header row supplies the column names)
df = spark.read.format("csv").option("header", True).load(dbfs_path)

# Add TotalAmount column and filter records (I used greater than 2 according to data)
transformed_df = df.withColumn("TotalAmount", col("Quantity") * col("Price")).filter(col("Quantity") > 2)

# Write to Delta table
# NOTE(review): this target is under dbfs:/Workspace/..., unlike the other Delta
# outputs in this notebook (dbfs:/FileStore/assignment17sep/delta/...) - confirm.
transformed_df.write.format("delta").mode("overwrite")\
    .save("dbfs:/Workspace/Shared/assignment17sep/orders_transformed")
