<a href="https://colab.research.google.com/github/showmiya-velusamy/Domain-Specific-Training/blob/main/ASSIGNMENT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=c11e61b31355c6dc29fb87d7170101b35f49bd0c922a04909561615c3fc7e90b
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [None]:
#assignment 1

from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder \
    .appName("DeltaLakeAssignment") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Import Delta Lake packages
from delta.tables import *

#Task 1: Creating Delta Table using Three Methods
# Load the CSV dataset (Employees)
employees_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/path_to/employees.csv")

# Load the JSON dataset (Products)
products_df = spark.read.format("json").load("/path_to/products.json")

# Write the DataFrame as a Delta Table (Employees)
employees_df.write.format("delta").mode("overwrite").save("/path_to/delta/employees_delta")

# Write the DataFrame as a Delta Table (Products)
products_df.write.format("delta").mode("overwrite").save("/path_to/delta/products_delta")

# Create Delta table for Employees using SQL
spark.sql("""
    CREATE TABLE delta_employees
    USING DELTA
    AS SELECT * FROM parquet./path_to/delta/employees_delta
""")

# Create Delta table for Products using SQL
spark.sql("""
    CREATE TABLE delta_products
    USING DELTA
    AS SELECT * FROM parquet./path_to/delta/products_delta
""")


# Task 2: Merge and Upsert (Slowly Changing Dimension - SCD)
#Load the Delta table for employees created in Task 1.

employee_df_updates = spark.read.format("csv").option("header", "true").load("/FileStore/employee_updates.csv")
employee_df_updates.write.format("delta").mode("overwrite").save("/delta/employee_updates")

employee_df_updates = spark.read.format("delta").load("/delta/employee_updates")
spark.sql("""
    CREATE TABLE delta_employee_updates
    USING DELTA
    AS SELECT * FROM parquet./path_to/delta/employees_updates
""")

#Merge the new employee data into the employees Delta table.
spark.sql("""
    MERGE INTO delta_employees AS target
    USING employee_updates AS source
    ON target.EmployeeID = source.EmployeeID
    WHEN MATCHED THEN UPDATE SET target.Salary = source.Salary, target.Department = source.Department
    WHEN NOT MATCHED THEN INSERT (EmployeeID, Name, Department, JoiningDate, Salary)
    VALUES (source.EmployeeID, source.Name, source.Department, source.JoiningDate, source.Salary)
""")

#Task 3: Internals of Delta Table
#Check the transaction history of the table.
spark.sql("DESCRIBE HISTORY delta_employee").show(truncate=False)

#Perform Time Travel and retrieve the table before the previous merge operation.

version_before_merge = delta_employee_table.history().filter("operation = 'MERGE'").select("version").first()[0] - 1

# Time Travel: Retrieve data before the previous merge
previous_version_df = spark.read.format("delta").option("versionAsOf", version_before_merge).load("/path_to/delta/employees_delta")
previous_version_df.show()


#Task 4: Optimize Delta Table
#Optimize the employees Delta table for better performance.
spark.sql("OPTIMIZE delta_employee_table")

#Use Z-ordering on the Department column for improved query performance.
spark.sql("""
OPTIMIZE delta_employee_table ZORDER BY Department """)

#Task 5: Time Travel with Delta Table
#Retrieve the employees Delta table as it was before the last merge.
time_travel_df = spark.read.format("delta").option("versionAsOf", version_before_merge).load("/path_to/delta/employees_delta")
time_travel_df.show()

#Task 6: Vacuum Delta Table
#Use the vacuum operation on the employees Delta table to remove old versions and free up disk space.
#Set the retention period to 7 days and ensure that old files are deleted.
spark.sql("""
VACUUM delta_employee_table RETAIN 168 HOURS """)

In [2]:
#ASSIGNMENT 2

dbutils.fs.cp("file:/content/sample_data/sales_data.csv", "dbfs:/FileStore/streaming/input/sales_data.csv")
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from pyspark.sql.functions import col
from pyspark.sql import SparkSession

#Initialize SparkSession

spark =SparkSession.builder\
     .appName("StructuredStreamingExample") \
     .getOrCreate()

#TASK 1
#Define the schema for the CSV data
sales_schema = "TransactionID INT, TransactionDate STRING, ProductID INT, Quantity INT, Price DOUBLE"

#Read streaming data from CSV files
df_sales_stream =spark.readStream\
          .format("csv") \
          .option("header", "true") \
          .schema (sales_schema)\
          .load("dbfs:/Filestore/streaming/input/")

query = spark.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
query.awaitTermination()

#TASK 2
transformed_df = spark.withColumn("TotalAmount", col("Quantity") * col("Price")) \
                   .filter(col("Quantity") > 1)

memory_query = transformed_df.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("transformed_data") \
    .start()

memory_query.awaitTermination()
spark.sql("SELECT * FROM transformed_data").show()

#TASK 3
aggregated_df = transformed_df.groupBy("ProductID") \
                              .agg({"TotalAmount": "sum"}) \
                              .withColumnRenamed("sum(TotalAmount)", "TotalSales")

aggregated_query = aggregated_df.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()
aggregated_query.awaitTermination()

#TASK 4
query_to_parquet = (aggregated_query.writeStream
                                .format("parquet")
                                .outputMode("append")
                                .option("path", "/output/stream_parquet/")
                                .option("checkpointLocation", "/output/checkpoints/")
                                .start())

#TASK 5
watermark_df = transformed_df.withWatermark("TransactionDate", "1 day")
watermarked_aggregated_df = watermark_df.groupBy("ProductID") \
                                        .agg({"TotalAmount": "sum"}) \
                                        .withColumnRenamed("sum(TotalAmount)", "TotalSales")

#TASK 6
product_schema = StructType([
    StructField("ProductID", StringType(), True),
    StructField("ProductName", StringType(), True),
    StructField("Category", StringType(), True)
])

products_df = spark.readStream \
    .schema(product_schema) \
    .csv("file:/content/sample_data/sales_data.csv")

joined_df = transformed_df.join(products_df, on="ProductID", how="inner")

joined_query = joined_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
joined_query.awaitTermination()

#TASK 7
query.stop()
query_restarted = (df_sales_stream.writeStream
                               .format("parquet")
                               .outputMode("append")
                               .option("path", "/output/stream_restarted/")
                               .option("checkpointLocation", "/output/checkpoints/")
                               .start())





NameError: name 'dbutils' is not defined

In [None]:
#ASSIGNMENT 3

# Task 1: Create an ETL Pipeline using DLT (Python)

from pyspark.sql.functions import col

# Read the source data from CSV/Parquet
source_df = spark.read.format("csv").option("header", "true").load("file:/content/sample_data/orders_data.csv")

# Transform the data
# Add a new column 'TotalAmount' by multiplying 'Quantity' by 'Price'
transformed_df = source_df.withColumn("TotalAmount", col("Quantity") * col("Price"))

# Filter where Quantity is greater than 1
transformed_filtered_df = transformed_df.filter(col("Quantity") > 1)

# Write the transformed data to a Delta table
transformed_filtered_df.write.format("delta").mode("overwrite").save("file:/content/sample_data/delta_orders_table")

# Task 2: Create an ETL Pipeline using DLT (SQL)
# Create a table from the source CSV data
spark.sql("""
CREATE OR REPLACE TABLE orders_raw
USING CSV
OPTIONS (path "file:/content/sample_data/orders_data.csv", header "true");
""")

# 2: Transform the data by adding TotalAmount and filtering
spark.sql("""
CREATE OR REPLACE TABLE orders_transformed AS
SELECT *, (Quantity * Price) AS TotalAmount
FROM orders_raw
WHERE Quantity > 1;
""")

# 3: Write the transformed data into a Delta table
spark.sql("""
CREATE OR REPLACE TABLE delta_orders_table AS
SELECT * FROM orders_transformed;
""")

# Task 3: Perform Read, Write, Update, and Delete Operations on Delta Table
# 1. Reading the data from the Delta table (PySpark)
df = spark.read.format("delta").load("file:/content/sample_data/delta_orders_table")
df.show()

# 2. Update the table (Increase the price of laptops by 10%)
spark.sql("""
    UPDATE delta_orders_table
    SET Price = Price * 1.10
    WHERE Product = 'Laptop'
""")

# 3. Delete rows where quantity is less than 2
spark.sql("""
    DELETE FROM delta_orders_table
    WHERE Quantity < 2
""")

# 4. Insert a new record
spark.sql("""
    INSERT INTO delta_orders_table (OrderID, OrderDate, CustomerID, Product, Quantity, Price, TotalAmount)
    VALUES (106, '2024-01-06', 'C006', 'Keyboard', 3, 50, 150)
""")

# Task 4: Merge Data (SCD Type 2)
spark.sql("""MERGE INTO delta_orders_table AS target
USING (SELECT * FROM new_orders_data) AS source
ON target.OrderID = source.OrderID
WHEN MATCHED THEN
  UPDATE SET target.Quantity = source.Quantity,
             target.Price = source.Price,
             target.TotalAmount = source.Quantity * source.Price
WHEN NOT MATCHED THEN
  INSERT (OrderID, OrderDate, CustomerID, Product, Quantity, Price, TotalAmount)
  VALUES (source.OrderID, source.OrderDate, source.CustomerID, source.Product, source.Quantity, source.Price, source.Quantity * source.Price);
""")

# Task 5: Explore Delta Table Internals
#View the transaction history of the Delta table
spark.sql("""
DESCRIBE HISTORY delta_orders_table;
""")

#View file size and modification times
spark.sql("""
DESCRIBE DETAIL delta_orders_table;
""")

# Task 6: Time Travel in Delta Tables
#Query the table at a previous version
df_time_travel = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df_time_travel.show(truncate=False)

#Query the table using a timestamp
("""
SELECT * FROM delta_orders_table TIMESTAMP AS OF '2024-01-10 00:00:00';
""")

# Task 7: Optimize Delta Table
#Optimize the Delta table using Z-order on the Product column
spark.sql("""OPTIMIZE delta_orders_table
ZORDER BY (Product);
""")

#Vacuum the table to remove old files (older than 7 days)
spark.sql("""
VACUUM delta_orders_table RETAIN 7 HOURS;
""")

# Task 8: Converting Parquet Files to Delta Format
# Read the Parquet file
parquet_df = spark.read.format("parquet").load("file:/content/sample_data/historical_orders_parquet/")

# Convert the Parquet file to Delta
parquet_df.write.format("delta").save("file:/content/sample_data/historical_orders_delta/")

# Verify by querying the Delta table
delta_df = spark.read.format("delta").load("file:/content/sample_data/to/historical_orders_delta/")
delta_df.show()

In [6]:
pip install pyspark
pip install delta

SyntaxError: invalid syntax (<ipython-input-6-49f37b97164a>, line 1)

In [5]:
#ASSIGNMENT 4

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from delta import *

# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()

# Read the CSV file
df = spark.read.csv("file:/content/sample_data/orders_data.csv", header=True, inferSchema=True)

# Perform transformation
transformed_df = df.withColumn("TotalAmount", col("Quantity") * col("Price")) \
                   .filter(col("Quantity") > 5)

# Write the transformed data to a Delta table
transformed_df.write.format("delta").mode("overwrite").saveAsTable("transformed_orders")

# Read from the Delta table
df = spark.table("transformed_orders")

# Perform aggregation
aggregated_df = df.groupBy("Product").agg({"Quantity": "sum"})

# Write the aggregated data to a Delta table
aggregated_df.write.format("delta").mode("overwrite").saveAsTable("aggregated_orders")


ModuleNotFoundError: No module named 'delta'