In [0]:
from pyspark.sql.functions import sum, col,to_date, date_format,unix_timestamp
import pandas as pd
import os

# Load the raw booking file for the run date passed in through the
# "arrival_date" widget, then show its inferred schema and the rows.
date_str = dbutils.widgets.get("arrival_date")
bookings_path = f"/Volumes/zoom_car/default/data/car_booking/zoom_car_bookings_{date_str}.json"

bookings_df = spark.read.json(bookings_path)

bookings_df.printSchema()
display(bookings_df)

In [0]:
# Data Cleaning & Transformation
#
# Drops booking rows that:
#   - lack a critical identifier or date,
#   - have a booking_date/start_time/end_time that did not parse,
#   - carry a status outside `valid_status`, or
#   - end before they start.
# It then derives the calendar date, the time of day and the duration in
# minutes, and overwrites the staging Delta table with the result.

STAGING_TABLE = "zoom_car.default.staging_bookings_delta"

critical_fields = ['booking_id', 'customer_id', 'car_id', 'booking_date']
valid_status = ['cancelled', 'completed', 'pending']

bookings_cleaned = (
    bookings_df
    .dropna(subset=critical_fields)
    # NOTE(review): these parses presumably yield null on malformed input (so the
    # isNotNull filters below discard those rows) when spark.sql.ansi.enabled is
    # false; under ANSI mode they may raise instead -- confirm the cluster setting.
    .withColumn("booking_date", to_date("booking_date", "yyyy-MM-dd"))
    .withColumn("start_time", col("start_time").cast("timestamp"))
    .withColumn("end_time", col("end_time").cast("timestamp"))
    .filter(col("booking_date").isNotNull())
    .filter(col("start_time").isNotNull())
    .filter(col("end_time").isNotNull())
    # isin() evaluates to null for a null status, so such rows are dropped as well.
    .filter(col("status").isin(valid_status))
    # A booking that ends before it starts is invalid and would otherwise
    # produce a negative total_duration downstream.
    .filter(col("end_time") >= col("start_time"))
)

bookings_transformed = (
    bookings_cleaned
    .withColumn("start_date", to_date("start_time"))
    .withColumn("end_date", to_date("end_time"))
    .withColumn("start_time_only", date_format("start_time", "HH:mm:ss"))
    .withColumn("end_time_only", date_format("end_time", "HH:mm:ss"))
    # unix_timestamp is in seconds; dividing by 60 gives the duration in minutes.
    .withColumn("total_duration", (unix_timestamp("end_time") - unix_timestamp("start_time")) / 60)
)

# Full overwrite of the staging table; overwriteSchema lets column changes through.
(
    bookings_transformed.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(STAGING_TABLE)
)

print("Table created successfully!")
display(bookings_transformed)
