# **Setup**

In [7]:
! pip install pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('coding_challenge').getOrCreate()
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd

!pip install delta-spark


Collecting delta-spark
  Downloading delta_spark-3.2.0-py3-none-any.whl.metadata (2.0 kB)
Downloading delta_spark-3.2.0-py3-none-any.whl (21 kB)
Installing collected packages: delta-spark
Successfully installed delta-spark-3.2.0


# **Vehicle Maintenance Data Ingestion**

In [None]:
# Task 1

import os
from delta.tables import DeltaTable

csv_file_path = "/content/drive/MyDrive/DataEngineering/DataBricks_Coding_Challenge/vehicle_maintenance.csv"
delta_table_path = "/content/vehicle_maintenance"

# Fail fast when the source file is missing: the following vehicle-maintenance
# cells read `df`, so continuing would only surface later as a NameError (or
# silently reuse a `df` left over from an earlier run).
if not os.path.exists(csv_file_path):
    raise FileNotFoundError(f"CSV file does not exist: {csv_file_path}")

try:
    # Read the CSV with a header row and inferred column types
    df = spark.read.csv(csv_file_path, header=True, inferSchema=True)
    df.show()

    # Replace the contents of the raw Delta table with the CSV data
    df.write.format("delta").mode("overwrite").save(delta_table_path)
    print("CSV data ingested into Delta table.")
except Exception as e:
    # This block covers both the CSV read and the Delta write, so the message
    # names the whole ingestion step; re-raise so a Run All stops here.
    print(f"Error ingesting CSV data into Delta table: {e}")
    raise


In [None]:
# Task 2

# A record counts as valid only when ServiceCost and Mileage are both strictly positive
has_positive_cost = col("ServiceCost") > 0
has_positive_mileage = col("Mileage") > 0

# Keep the valid records, then collapse rows that share the same VehicleID and Date
cleaned_df = (
    df.where(has_positive_cost & has_positive_mileage)
    .dropDuplicates(["VehicleID", "Date"])
)

# Persist the cleaned records to their own Delta table, replacing earlier contents
cleaned_delta_table_path = '/content/delta/cleaned_vehicle_maintenance'
cleaned_writer = cleaned_df.write.format("delta").mode("overwrite")
cleaned_writer.save(cleaned_delta_table_path)

print("Data cleaned and saved to new Delta table.")


In [None]:
# Task 3

# Output locations for the two analysis results
total_cost_table_path = "/content/delta/vehicle_maintenance_total_cost"
high_mileage_table_path = "/content/delta/vehicle_high_mileage"

# Sum ServiceCost per VehicleID, then give the aggregate a readable column name
cost_per_vehicle = cleaned_df.groupBy("VehicleID").sum("ServiceCost")
total_cost_df = cost_per_vehicle.withColumnRenamed("sum(ServiceCost)", "TotalMaintenanceCost")

# Keep the cleaned records whose Mileage is above 30,000
high_mileage_df = cleaned_df.where(col("Mileage") > 30000)

# Write each result to its Delta table, replacing earlier contents
for result_df, result_path in (
    (total_cost_df, total_cost_table_path),
    (high_mileage_df, high_mileage_table_path),
):
    result_df.write.format("delta").mode("overwrite").save(result_path)

print("Analysis results saved to Delta tables.")


In [None]:
# Task 4

# Open the cleaned Delta table and vacuum it, keeping 168 hours (7 days) of files
retention_hours = 168
delta_table = DeltaTable.forPath(spark, cleaned_delta_table_path)
delta_table.vacuum(retentionHours=retention_hours)

# Show the commit history of the same table
history_query = f"DESCRIBE HISTORY delta.`{cleaned_delta_table_path}`"
spark.sql(history_query).show()

print("Data governance tasks completed.")


# **Movie Ratings Data Ingestion**

In [None]:
# Task 1

spark = SparkSession.builder \
    .appName("MovieRatings") \
    .getOrCreate()

# Upload CSV to DBFS
dbutils.fs.cp("file:/content/movie_ratings.csv", "dbfs:/Filestore/movie_ratings.csv")

# Read the CSV file into Spark DataFrame
try:
    movie_ratings_df = spark.read.format("csv").option("header", "true").load("dbfs:/Filestore/movie_ratings.csv")
    movie_ratings_df.show()

    # Check for missing or inconsistent data (ratings outside of 1-5).
    # The CSV is read without schema inference, so Rating is a string. Test the
    # *cast* value for NULL: that catches both missing ratings and non-numeric
    # text, which casts to NULL and would otherwise make the range comparisons
    # evaluate to NULL and pass unnoticed.
    rating_value = col("Rating").cast(IntegerType())
    invalid_data_df = movie_ratings_df.filter(rating_value.isNull() | (rating_value < 1) | (rating_value > 5))

    if invalid_data_df.count() > 0:
        print("Invalid or missing data found:")
        invalid_data_df.show()
    else:
        # Write valid data to Delta table
        delta_table_path = "dbfs:/Filestore/delta_movie_ratings"
        movie_ratings_df.write.format("delta").mode("overwrite").save(delta_table_path)
        print("Movie ratings ingested into Delta table.")
except Exception as e:
    # Report the failure, then re-raise: the next cell needs movie_ratings_df,
    # so a swallowed error here would only reappear later as a NameError.
    print(f"Error during ingestion: {e}")
    raise

In [None]:
# Task 2

# Keep rows whose Rating, read as an integer, falls in the inclusive range 1..5
rating_as_int = col("Rating").cast(IntegerType())
in_valid_range = (rating_as_int >= 1) & (rating_as_int <= 5)

# Apply the range check, then keep one row per (UserID, MovieID) pair
cleaned_df = (
    movie_ratings_df.where(in_valid_range)
    .dropDuplicates(["UserID", "MovieID"])
)

# Store the cleaned ratings in a separate Delta table, replacing earlier contents
cleaned_delta_table_path = "dbfs:/Filestore/cleaned_delta_movie_ratings"
cleaned_writer = cleaned_df.write.format("delta").mode("overwrite")
cleaned_writer.save(cleaned_delta_table_path)
print("Cleaned data saved to Delta table.")

In [None]:
# Task 3

from pyspark.sql.functions import avg

# Mean Rating per MovieID, exposed as AvgRating
mean_rating = avg("Rating").alias("AvgRating")
avg_ratings_df = cleaned_df.groupBy("MovieID").agg(mean_rating)

# Top row when sorted by AvgRating descending / ascending
highest_rated_movie = avg_ratings_df.sort("AvgRating", ascending=False).limit(1)
lowest_rated_movie = avg_ratings_df.sort("AvgRating", ascending=True).limit(1)

# Print each label followed by its single-row result
for label, movie_df in (
    ("Highest Rated Movie:", highest_rated_movie),
    ("Lowest Rated Movie:", lowest_rated_movie),
):
    print(label)
    movie_df.show()

# Persist the per-movie averages to a Delta table, replacing earlier contents
analysis_delta_table_path = "dbfs:/Filestore/movie_rating_analysis"
analysis_writer = avg_ratings_df.write.format("delta").mode("overwrite")
analysis_writer.save(analysis_delta_table_path)
print("Movie rating analysis saved to Delta table.")


In [None]:
# Task 4

from delta.tables import DeltaTable

ratings_table = DeltaTable.forPath(spark, cleaned_delta_table_path)

# Step 1: Update a few ratings
# Use an in-place Delta UPDATE so that only the M001 rows change. Overwriting the
# table with the filtered DataFrame would replace its whole contents with the
# M001 rows and drop every other movie's ratings. The cleaned ratings come from
# a CSV read without schema inference, so Rating is a string here; the
# expression casts to INT for the arithmetic and back to STRING so the column
# type is unchanged.
# NOTE(review): a rating of 5 becomes 6 after this update - confirm whether the
# result should be capped at 5.
ratings_table.update(
    condition="MovieID = 'M001'",
    set={"Rating": "CAST(CAST(Rating AS INT) + 1 AS STRING)"},
)

# Step 2: Time travel to the version just before the update
# Look the version number up in the table history instead of hard-coding 0,
# which is only the pre-update version the first time this notebook runs.
latest_version = ratings_table.history(1).select("version").first()[0]
previous_version = max(latest_version - 1, 0)
original_df = spark.read.format("delta").option("versionAsOf", previous_version).load(cleaned_delta_table_path)
print("Original Ratings:")
original_df.show()

# Step 3: View the history of changes
spark.sql(f"DESCRIBE HISTORY delta.`{cleaned_delta_table_path}`").show()


In [None]:
# Task 5

# Z-Ordering on MovieID
spark.sql(f"OPTIMIZE delta.`{cleaned_delta_table_path}` ZORDER BY MovieID")

# Compact data using OPTIMIZE
spark.sql(f"OPTIMIZE delta.`{cleaned_delta_table_path}`")

# Clean up old versions with VACUUM
# Keep 168 hours (7 days, Delta's default threshold). A shorter window such as
# 0 hours is rejected by Delta's retention-duration safety check unless that
# check is explicitly disabled, and it would delete the data files that time
# travel to earlier versions depends on.
spark.sql(f"VACUUM delta.`{cleaned_delta_table_path}` RETAIN 168 HOURS")

# **Data Ingestion - Reading Data from Various Formats**

In [None]:
# Task 1

# Source locations for each format
csv_path = "dbfs:/Filestore/student_info.csv"
json_path = "dbfs:/Filestore/city_info.json"
parquet_path = "dbfs:/Filestore/hospital_info.parquet"
delta_table_path = "dbfs:/Filestore/delta_hospital"

# Student information: CSV whose first row is the header
student_df = spark.read.csv(csv_path, header=True)
student_df.show()

# City information: JSON
city_df = spark.read.format("json").load(json_path)
city_df.show()

# Hospital information: Parquet
hospital_df = spark.read.format("parquet").load(parquet_path)
hospital_df.show()

# Hospital Delta table: report a read failure instead of raising
try:
    delta_df = spark.read.load(delta_table_path, format="delta")
    delta_df.show()
except Exception as e:
    print(f"Error reading Delta table: {e}")



In [None]:
# Task 2

# Student data -> CSV with a header row, replacing any earlier output
student_df.write.mode("overwrite").option("header", "true").csv("dbfs:/Filestore/student_output.csv")

# City data -> JSON, replacing any earlier output
city_df.write.mode("overwrite").json("dbfs:/Filestore/city_output.json")

# Hospital data -> Parquet, replacing any earlier output
hospital_df.write.mode("overwrite").parquet("dbfs:/Filestore/hospital_output.parquet")

# Hospital data -> Delta table, replacing any earlier output
hospital_df.write.mode("overwrite").format("delta").save("dbfs:/Filestore/delta_hospital_output")


## **Notebook A**

In [None]:
# Task 3

# Load the student CSV; the header row supplies the column names
csv_path = "dbfs:/Filestore/student_info.csv"
student_df = spark.read.option("header", "true").csv(csv_path)

# Cleaning step: drop rows that exactly duplicate another row
cleaned_student_df = student_df.dropDuplicates()

# Persist the de-duplicated rows as a Delta table, replacing earlier contents
cleaned_student_writer = cleaned_student_df.write.format("delta").mode("overwrite")
cleaned_student_writer.save("dbfs:/Filestore/delta_cleaned_student")


## **Notebook B**

In [None]:
# Reading Delta table and performing analysis
from pyspark.sql.functions import avg, col

delta_student_df = spark.read.format("delta").load("dbfs:/Filestore/delta_cleaned_student")

# Calculate average score per Class.
# - Score is cast to double first: Notebook A loads the student CSV without
#   schema inference, so the stored column is a string, and GroupedData.avg()
#   only accepts numeric columns.
# - The result is aliased to AvgScore: the default name "avg(Score)" contains
#   parentheses, which Delta rejects in column names unless column mapping is
#   enabled.
avg_score_df = delta_student_df.groupBy("Class").agg(
    avg(col("Score").cast("double")).alias("AvgScore")
)

# Save analysis results to Delta table
avg_score_df.write.format("delta").mode("overwrite").save("dbfs:/Filestore/delta_avg_score")


## **Running Notebook B from Notebook A**

In [None]:
# Notebook A - running Notebook B
notebook_b_path = "NotebookB"
timeout_seconds = 60  # timeout passed to the run, in seconds
dbutils.notebook.run(notebook_b_path, timeout_seconds)


# **Additional Tasks**

In [None]:
# Optimization, Z-Ordering, and Vacuum Tasks

# Optimize the Delta table
spark.sql("OPTIMIZE delta.`dbfs:/Filestore/delta_hospital_output`")

# Z-ordering on CityName column
spark.sql("OPTIMIZE delta.`dbfs:/Filestore/delta_hospital_output` ZORDER BY (CityName)")

# Vacuum Delta table to remove old files
# Retain 168 hours (7 days, Delta's default threshold). A 7-hour window is below
# that threshold and is rejected by Delta's retention-duration safety check
# unless the check is explicitly disabled.
spark.sql("VACUUM delta.`dbfs:/Filestore/delta_hospital_output` RETAIN 168 HOURS")


In [None]:
# Exercise 1: Creating a Complete ETL Pipeline using Delta Live Tables (DLT)

import dlt
from pyspark.sql.functions import col

# Raw Transactions Table
@dlt.table(name="raw_transactions")
def raw_transactions():
    # Read transactions.csv, taking the column names from its header row
    csv_reader = spark.read.format("csv").option("header", "true")
    return csv_reader.load("dbfs:/Filestore/transactions.csv")

# Transformed Transactions Table
@dlt.table(name="transformed_transactions")
def transformed_transactions():
    # TotalAmount is the per-row product of Quantity and Price
    total_amount = col("Quantity") * col("Price")
    raw_rows = dlt.read("raw_transactions")
    return raw_rows.withColumn("TotalAmount", total_amount)

# SQL version of the two tables.
# NOTE(review): these statements create tables with the same names as the DLT
# definitions in this cell - presumably the plain-SQL alternative to the DLT
# pipeline; confirm both are meant to live in the same notebook.

# Expose the CSV through a temporary view that honours the header row.
# Selecting straight from csv.`<path>` reads with header=false, which names the
# columns _c0, _c1, ... and keeps the header line as data, so Quantity and
# Price would not exist for the transformed table.
spark.sql("""
CREATE OR REPLACE TEMPORARY VIEW transactions_csv
USING csv
OPTIONS (path 'dbfs:/Filestore/transactions.csv', header 'true')
""")

# Create Raw Transactions Table
spark.sql("""
CREATE OR REPLACE TABLE raw_transactions
AS SELECT * FROM transactions_csv
""")

# Create Transformed Transactions Table
# The statement used to be prefixed with '#' inside the SQL string; '#' is not
# a SQL comment marker, so Spark failed to parse the text. It now runs as SQL.
spark.sql("""
CREATE OR REPLACE TABLE transformed_transactions
AS SELECT *, Quantity * Price AS TotalAmount
FROM raw_transactions
""")


In [None]:
# Exercise 2: Delta Lake Operations (Read, Write, Update, Delete, Merge)

transactions_path = "dbfs:/Filestore/delta_transactions"

# Read through the DataFrame API and preview five rows
df = spark.read.format("delta").load(transactions_path)
df.show(5)

# Read the same table through SQL
spark.sql(f"SELECT * FROM delta.`{transactions_path}` LIMIT 5").show()

# Append two new transactions, built with the schema of the table just read
new_data = [
    (6, "2024-09-06", "C005", "Keyboard", 4, 100),
    (7, "2024-09-07", "C006", "Mouse", 10, 20),
]
new_df = spark.createDataFrame(new_data, schema=df.schema)
new_df.write.format("delta").mode("append").save(transactions_path)

# Update, issued as SQL: set Price to 1300 on every Laptop row
spark.sql(f"UPDATE delta.`{transactions_path}` SET Price = 1300 WHERE Product = 'Laptop'")

# Merge source: TransactionID 1 and TransactionID 8, exposed to SQL as the
# temp view "updates"
merge_data = [
    (1, "2024-09-01", "C001", "Laptop", 1, 1250),
    (8, "2024-09-08", "C007", "Charger", 2, 30),
]
merge_df = spark.createDataFrame(merge_data, schema=df.schema)
merge_df.createOrReplaceTempView("updates")

# Upsert on TransactionID: matched rows are updated, unmatched rows are inserted
merge_sql = f"""
MERGE INTO delta.`{transactions_path}` AS t
USING updates AS u
ON t.TransactionID = u.TransactionID
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
spark.sql(merge_sql)


In [None]:
# Exercise 3: Delta Lake - History, Time Travel, and Vacuum

# View Delta table history
spark.sql("DESCRIBE HISTORY delta.`dbfs:/Filestore/delta_transactions`").show()

# Time travel using version
# NOTE(review): version 5 is hard-coded; the read fails if the table has fewer
# commits - confirm against the history printed above.
spark.read.format("delta").option("versionAsOf", 5).load("dbfs:/Filestore/delta_transactions").show()

# Time travel using timestamp
# NOTE(review): the timestamp is hard-coded and must fall within the table's
# commit history - confirm against the history printed above.
spark.read.format("delta").option("timestampAsOf", "2024-09-07 12:00:00").load("dbfs:/Filestore/delta_transactions").show()

# Vacuum with retention of 7 days
# 7 days is 168 hours. The previous value of 7 HOURS did not match this intent
# and is below Delta's default retention threshold, so VACUUM rejected it.
spark.sql("VACUUM delta.`dbfs:/Filestore/delta_transactions` RETAIN 168 HOURS")

# Convert Parquet to Delta
parquet_path = "dbfs:/Filestore/transactions_parquet"
delta_path = "dbfs:/Filestore/delta_transactions_from_parquet"

# Overwrite so the cell can be re-run: without an explicit mode the writer
# raises once the target Delta table already exists.
spark.read.format("parquet").load(parquet_path).write.format("delta").mode("overwrite").save(delta_path)


In [None]:
# Exercise 4: Incremental Load Pattern using Delta Lake

cutoff_date = "2024-09-03"
transaction_date = df["TransactionDate"]
incremental_path = "dbfs:/Filestore/delta_incremental_transactions"

# Initial load: rows dated on or before the cutoff replace the initial table
initial_data = df.where(transaction_date <= cutoff_date)
initial_data.write.format("delta").mode("overwrite").save("dbfs:/Filestore/delta_initial_transactions")

# New data: rows dated after the cutoff are appended to the incremental table
new_data = df.where(transaction_date > cutoff_date)
new_data.write.format("delta").mode("append").save(incremental_path)

# Read the incremental table back and display it
incremental_df = spark.read.format("delta").load(incremental_path)
incremental_df.show()

# Show the commit history of the incremental table
spark.sql(f"DESCRIBE HISTORY delta.`{incremental_path}`").show()
