In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=f4d6adfba2818bb13a925eab364db0ea8300a60d4f368fc553edf64a95c31109
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd

# Create the notebook-wide Spark session, or reuse one that is already running.
session_builder = SparkSession.builder.appName("coding_assesment")
spark = session_builder.getOrCreate()

In [None]:
# Stage the raw vehicle CSV in DBFS, persist it as Delta files, then read the
# Delta files back to confirm they are loadable.
VEHICLE_CSV_PATH = "dbfs:/Filestore/vehicle.csv"
VEHICLE_DELTA_PATH = "dbfs:/Filestore/delta_vehicle"

dbutils.fs.cp("file:/content/vehicle.csv", VEHICLE_CSV_PATH)

# inferSchema gives numeric columns real numeric types instead of strings, so
# the comparisons and sums in later cells operate on numbers.
vehicle_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(VEHICLE_CSV_PATH)
)

# overwriteSchema lets a re-run replace files written earlier with a
# different (e.g. all-string) schema.
(
    vehicle_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(VEHICLE_DELTA_PATH)
)

try:
    vehicle_df = spark.read.format("delta").load(VEHICLE_DELTA_PATH)
except Exception as e:
    # Report the failure instead of hiding it; vehicle_df then still holds the CSV data.
    print(e)



In [None]:
# Register the Delta files as a metastore table. `%sql` is only valid as the
# first line of a cell, so the DDL is issued through spark.sql instead;
# IF NOT EXISTS keeps the cell re-runnable.
spark.sql(
    "CREATE TABLE IF NOT EXISTS delta_vehicle "
    "USING DELTA LOCATION 'dbfs:/Filestore/delta_vehicle'"
)

# Read the registered Delta table
df = spark.read.table("delta_vehicle")

# Keep only rows with a positive service cost and mileage, then keep a single
# row per (VehicleID, Date) pair.
df_cleaned = (
    df.filter((df["ServiceCost"] > 0) & (df["Mileage"] > 0))
    .dropDuplicates(["VehicleID", "Date"])
)

# Save the cleaned data to a new Delta table
df_cleaned.write.format("delta").mode("overwrite").saveAsTable("delta_vehicle_cleaned")


In [None]:
# Mileage above which a vehicle is flagged in the analysis output.
MILEAGE_THRESHOLD = 3000

# Analyse the cleaned table rather than whatever `df` an earlier cell left
# behind, so the cell works on a fresh kernel and ignores invalid rows.
vehicle_cleaned = spark.read.table("delta_vehicle_cleaned")

# Calculate total maintenance cost for each vehicle
df_total_cost = vehicle_cleaned.groupBy("VehicleID").agg(
    sum("ServiceCost").alias("TotalMaintenanceCost")
)

# Identify vehicles exceeding the mileage threshold. Aggregating to one row per
# vehicle (its highest reading) keeps the join below from duplicating the
# total-cost rows when a vehicle has several readings over the threshold.
# The cast guards against Mileage having been loaded as a string.
mileage_value = col("Mileage").cast("double")
df_exceeding_mileage = (
    vehicle_cleaned.filter(mileage_value > MILEAGE_THRESHOLD)
    .groupBy("VehicleID")
    .agg(max(mileage_value).alias("Mileage"))
)

# Join total cost and exceeding mileage data; Mileage is null for vehicles
# that never exceeded the threshold.
df_analysis = df_total_cost.join(df_exceeding_mileage, on="VehicleID", how="left")

# Save the analysis results to a new Delta table
(
    df_analysis.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("delta_vehicle_analysis")
)

In [None]:
# Table maintenance for the cleaned vehicle table: run OPTIMIZE, then VACUUM
# with a 168-hour retention window.
vehicle_cleaned_table = "delta_vehicle_cleaned"
spark.sql(f"optimize {vehicle_cleaned_table}")
spark.sql(f"vacuum {vehicle_cleaned_table} retain 168 hours")

**Task-2**

In [None]:
# Stage the ratings CSV in DBFS, persist it as Delta files, verify the files
# are readable, and register them as the table `delta_ratings`.
RATINGS_CSV_PATH = "dbfs:/filestore/ratings.csv"
RATINGS_DELTA_PATH = "dbfs:/filestore/delta_ratings"

# The destination needs the "dbfs:/" scheme; "dbfs/filestore/..." is a
# different, relative path from the one read below.
dbutils.fs.cp("file:/content/ratings.csv", RATINGS_CSV_PATH)

# inferSchema makes `rating` numeric so later range filters and max/min
# compare numbers rather than strings.
ratings_df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(RATINGS_CSV_PATH)
)
(
    ratings_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(RATINGS_DELTA_PATH)
)

try:
    ratings_df = spark.read.format("delta").load(RATINGS_DELTA_PATH)
except Exception as err:
    # Include the cause so the failure can be diagnosed.
    print(f"An error Occured: {err}")

# registering as a table (`%sql` is only valid as the first line of a cell)
spark.sql(
    "CREATE TABLE IF NOT EXISTS delta_ratings "
    "USING DELTA LOCATION 'dbfs:/filestore/delta_ratings'"
)


In [None]:
# Cleaning: keep ratings inside the valid 1-5 range and a single row per
# (userId, movieId) pair.
df = spark.read.table("delta_ratings")

# DataFrames are immutable and the method is dropDuplicates (there is no
# removeDuplicates), so the deduplicated result must be kept.
df_cleaned = (
    df.filter((df["rating"] >= 1) & (df["rating"] <= 5))
    .dropDuplicates(["userId", "movieId"])
)

# writing into a new delta table
df_cleaned.write.format("delta").mode("overwrite").saveAsTable("delta_ratings_cleaned")

In [None]:
# Per-movie rating statistics, computed from the cleaned table instead of the
# raw `df` an earlier cell happened to leave in the kernel.
ratings_cleaned = spark.read.table("delta_ratings_cleaned")

# Cast so max/min compare numerically even if `rating` was loaded as a string.
rating_value = col("rating").cast("double")

# A single aggregation yields the same columns as three groupBys joined on movieId.
df_aggregated = ratings_cleaned.groupBy("movieId").agg(
    avg(rating_value).alias("average_rating"),
    max(rating_value).alias("highest_rating"),
    min(rating_value).alias("lowest_rating"),
)

(
    df_aggregated.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("delta_ratings_aggregated")
)

In [None]:
# Cap perfect scores at 4.5. Column names must not be wrapped in single
# quotes: 'rating' is a string literal in SQL, not a column reference.
spark.sql('''
update delta_ratings_cleaned
set rating=4.5
where rating=5
''')

# Time travel to the first version of the table. delta_ratings_cleaned was
# created with saveAsTable, so it is addressed by name rather than by a path.
# NOTE(review): version 0 is the very first write; after repeated runs it is
# not necessarily the version immediately before the update above.
df_previous = spark.sql("select * from delta_ratings_cleaned version as of 0")
df_previous.show()

# Show the commit history (the query result is otherwise discarded).
spark.sql("Describe History delta_ratings_cleaned").show(truncate=False)

# Co-locate rows by movie, then vacuum with a 168-hour retention window.
spark.sql("optimize delta_ratings_cleaned zorder by (movieId)")
spark.sql("vacuum delta_ratings_cleaned retain 168 hours")

**Task-3**

In [None]:
# Load the three Task-3 sources (student CSV, city CSV, hospital data built
# in-notebook) and persist each one as Delta files.
dbutils.fs.cp("file:/content/student.csv","dbfs:/filestore/student.csv")
dbutils.fs.cp("file:/content/city.csv","dbfs:/filestore/city.csv")
df_student=spark.read.format("csv").option("header","true").load("dbfs:/filestore/student.csv")
# city.csv is a CSV file (and "header" is a CSV-only option), so it is read
# with the csv reader rather than the json reader.
# NOTE(review): if the city source is really JSON, change the file name and format together.
df_city=spark.read.format("csv").option("header","true").load("dbfs:/filestore/city.csv")
hospital_data = {
    "HospitalID": [101, 102, 103, 104],
    "HospitalName": ["City Hospital", "Green Valley Clinic", "Sunshine Medical", "Downtown Health Center"],
    "City": ["New York", "Los Angeles", "Chicago", "Houston"],
    "Capacity": [250, 100, 300, 200],
    "EmergencyServices": [True, False, True, True]
}
# Keep the pandas frame under its own name; a pandas DataFrame has no `.write`
# attribute, so it is converted to a Spark DataFrame before the Delta write.
hospital_pdf = pd.DataFrame(hospital_data)
hospital_pdf.to_parquet("hospital_data.parquet")
df_hospital = spark.createDataFrame(hospital_pdf)

df_student.write.format("delta").mode("overwrite").save("dbfs:/filestore/delta_student")
df_city.write.format("delta").mode("overwrite").save("dbfs:/filestore/delta_city")
df_hospital.write.format("delta").mode("overwrite").save("dbfs:/filestore/delta_hospital")

# Register each Delta location as a metastore table. `%sql` is only valid as
# the first line of a cell, so the DDL goes through spark.sql; IF NOT EXISTS
# keeps the cell re-runnable.
for table_name in ("delta_student", "delta_city", "delta_hospital"):
    spark.sql(
        f"CREATE TABLE IF NOT EXISTS {table_name} "
        f"USING DELTA LOCATION 'dbfs:/filestore/{table_name}'"
    )

def clean_and_save(source_table, cleaned_table):
    """Drop duplicate rows and rows containing nulls, then save the result.

    Args:
        source_table: name of the registered table to read.
        cleaned_table: name of the Delta table to (over)write.

    Returns:
        The cleaned Spark DataFrame that was written.
    """
    # Spark DataFrames are immutable: the method is dropDuplicates (not
    # removeDuplicates) and dropna returns a new frame (there is no inplace=).
    cleaned = spark.read.table(source_table).dropDuplicates().dropna()
    cleaned.write.format("delta").mode("overwrite").saveAsTable(cleaned_table)
    return cleaned


hospital_cleaned = clean_and_save("delta_hospital", "delta_hospital_cleaned")
student_cleaned = clean_and_save("delta_student", "delta_student_cleaned")
city_cleaned = clean_and_save("delta_city", "delta_city_cleaned")

# Run the downstream notebook with an 80-second timeout. SparkSession has no
# `notebook` attribute; notebook workflows are started via dbutils.notebook.run.
# NOTE(review): dbutils.notebook.run expects a workspace notebook path; the
# Colab URL below is kept from the original and must be replaced with one.
dbutils.notebook.run("https://colab.research.google.com/drive/1HKA7y2u2u1ZGZGVm-Oli70LWYHv9R35i#scrollTo=n1PSDRg7jnt5",80)

In [None]:
#Additional commands
# Table maintenance for the cleaned student table: run OPTIMIZE, then VACUUM
# with a 168-hour retention window.
student_cleaned_table = "delta_student_cleaned"
spark.sql(f"optimize {student_cleaned_table}")
spark.sql(f"vacuum {student_cleaned_table} retain 168 hours")

**Task-4**

In [None]:
# Stage the transactions CSV in DBFS and land it, unchanged, as Delta files.
csv_location = "dbfs:/filestore/transactions.csv"
delta_location = "dbfs:/filestore/delta_transactions"
dbutils.fs.cp("file:/content/transactions.csv", csv_location)

df = spark.read.format("csv").option("header", "true").load(csv_location)
raw_writer = df.write.format("delta").mode("overwrite")
raw_writer.save(delta_location)

# Read the Delta files back, derive the per-row total (quantity * price), and
# save the enriched rows as the table `delta_transactions`.
line_total = col("quantity") * col("price")
delta_transactions = (
    spark.read.format("delta")
    .load(delta_location)
    .withColumn("total_amount", line_total)
)
enriched_writer = delta_transactions.write.format("delta").mode("overwrite")
enriched_writer.saveAsTable("delta_transactions")


In [None]:
# Python module names are case-sensitive; the Delta Live Tables module is `dlt`.
import dlt

In [None]:
import dlt
from pyspark.sql.functions import col


@dlt.table
def raw_transactions():
    """Load the staged transactions CSV with a header row and an inferred schema."""
    return spark.read.csv("dbfs:/filestore/transactions.csv", header=True, inferSchema=True)


@dlt.table
def transformed_transactions():
    """Return raw_transactions with an added total_amount = Quantity * Price column."""
    return (
        dlt.read("raw_transactions")
        .withColumn("total_amount", col("Quantity") * col("Price"))
    )


# NOTE: the two spark.sql("CREATE OR REFRESH LIVE TABLE ...") statements that
# used to follow were removed. They redefined raw_transactions and
# transformed_transactions, which the functions above already declare, and read
# from the placeholder path "path_to_csv_file/transactions.csv". If a SQL
# version of the pipeline is wanted, put it in a separate SQL notebook instead.


In [None]:
# Inspect the table before modifying it.
spark.read.table("delta_transactions").show()

# Stage the new batch and derive total_amount so its columns match the target.
dbutils.fs.cp("file:/content/new_transactions.csv","dbfs:/filestore/new_transactions.csv")
new_transactions = (
    spark.read.format("csv")
    .option("header", "true")
    .load("dbfs:/filestore/new_transactions.csv")
    .withColumn("total_amount", col("quantity") * col("price"))
)

# Append the new batch to the existing table. saveAsTable targets the table
# that is already registered as `delta_transactions`, so the separate
# `create table ... location` statement that used to follow is unnecessary
# (and `%sql` is only valid as the first line of a cell).
new_transactions.write.format("delta").mode("append").saveAsTable("delta_transactions")

# Column names must not be quoted with ' or ": both produce string literals in
# Spark SQL, which makes the UPDATE invalid and makes the DELETE predicate
# compare the text "Quantity" with 3 (matching no rows).
# total_amount is derived from price, so it is refreshed in the same statement.
spark.sql('''
update delta_transactions
set price=1300, total_amount=quantity*1300
where product='Laptop'
''')

# Remove small orders (previously issued twice; once is sufficient).
spark.sql('''
delete from delta_transactions
where Quantity<3
''')

In [None]:
# Write new data to a temporary Delta location, then upsert it into
# delta_transactions keyed on TransactionID.
NEW_TRANSACTIONS_DELTA_PATH = "/filestore/delta/new_transactions"

df_new_transactions=spark.read.format("csv").option("header","true").load("dbfs:/filestore/new_transactions.csv")
df_new_transactions.write.format("delta").mode("overwrite").save(NEW_TRANSACTIONS_DELTA_PATH)

# The target is the registered table `delta_transactions`, and the source is
# the Delta location written above (addressed as delta.`<path>`); no tables
# named `transactions` or `new_transactions` are created in this notebook.
# total_amount is maintained alongside Quantity and Price so that upserted
# rows stay consistent with the rest of the table.
spark.sql(f'''
MERGE INTO delta_transactions AS existing
USING delta.`{NEW_TRANSACTIONS_DELTA_PATH}` AS updates
ON existing.TransactionID = updates.TransactionID
WHEN MATCHED THEN
    UPDATE SET
        existing.TransactionDate = updates.TransactionDate,
        existing.CustomerID = updates.CustomerID,
        existing.Product = updates.Product,
        existing.Quantity = updates.Quantity,
        existing.Price = updates.Price,
        existing.total_amount = updates.Quantity * updates.Price
WHEN NOT MATCHED THEN
    INSERT (TransactionID, TransactionDate, CustomerID, Product, Quantity, Price, total_amount)
    VALUES (updates.TransactionID, updates.TransactionDate, updates.CustomerID, updates.Product, updates.Quantity, updates.Price, updates.Quantity * updates.Price)
''')

In [None]:
# Table maintenance: run OPTIMIZE, then VACUUM with a 168-hour retention window.
spark.sql("optimize delta_transactions")
spark.sql("vacuum delta_transactions retain 168 hours")

# Convert the source CSV to Parquet with pandas, stage it in DBFS, and load it
# with Spark. Explicit /content paths keep the pandas read/write and the
# dbutils copy pointing at the same files regardless of the working directory.
transactions_pdf = pd.read_csv("/content/transactions.csv")
# to_parquet writes the file and returns None, so its result is not assigned.
transactions_pdf.to_parquet("/content/paraquet_transactions.parquet")
dbutils.fs.cp("file:/content/paraquet_transactions.parquet","dbfs:/filestore/paraquet_transactions.parquet")

transactions_pqt=spark.read.format("parquet").load("dbfs:/filestore/paraquet_transactions.parquet")


In [None]:
# Incremental load: keep only transactions dated after the cut-off and derive
# total_amount so the rows match the target table's columns.
df_new_incremental = (
    df_new_transactions
    .filter("TransactionDate > '2024-09-03'")
    .withColumn("total_amount", col("Quantity") * col("Price"))
)

# Append new transactions to the existing Delta table. The original wrote to
# the placeholder path "path_to_delta_table/transactions"; the registered
# table `delta_transactions` is used instead.
# NOTE(review): confirm delta_transactions is the intended target.
df_new_incremental.write.format("delta").mode("append").saveAsTable("delta_transactions")

# History is a property of the table, not of the DataFrame variable, so it is
# queried by table name (this also avoids the never-imported DeltaTable class).
history_df = spark.sql("describe history delta_transactions")
history_df.show(truncate=False)

# Check the latest version of the Delta table
latest_version = history_df.orderBy(col("version").desc()).first()["version"]
print(f"Latest version: {latest_version}")

# Read the Delta table for verification
df_transactions = spark.read.table("delta_transactions")
df_transactions.show()