In [0]:
from pyspark.sql.functions import col, lit, current_timestamp, sum as _sum
from delta.tables import DeltaTable
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult
import os

In [0]:
# Print the version of Spark currently being used in the environment
print(os.environ['SPARK_VERSION'])

# Retrieve job parameter 'current_date' from Databricks widgets (user input or runtime parameter)
# This is a flexible way to pass dynamic parameters during job execution
# Example: Widgets are often set via dbutils.widgets.text("current_date", "20241123")
#date_str = dbutils.widgets.get("current_date")

date_str = dbutils.widgets.get("current_date")  # Fetch 'current_date' from widget
#date_str = "20241123"  # Hardcoded fallback for testing purposes (commented out)

# Define the file path dynamically based on the input date parameter
# The file naming convention is assumed to be zoom_car_bookings_yyyymmdd.json
booking_data = f"dbfs:/FileStore/zoomcar/bookings/zoom_car_bookings_{date_str}.json"

# Check if the file exists in the given path using dbutils.fs.ls
# If the file does not exist, an exception will be raised
try:
    dbutils.fs.ls(booking_data)  # List files in the specified path to validate existence
except Exception as e:
    # Raise a FileNotFoundError with a clear message if the file is not found
    raise FileNotFoundError(f"File {booking_data} not found!") from e


3.5.0


In [0]:
# Read booking data
booking_df = spark.read \
    .format("json") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("quote", "\"") \
    .option("multiLine", "true") \
    .load(booking_data)

booking_df.printSchema()
display(booking_data)


root
 |-- booking_date: string (nullable = true)
 |-- booking_id: string (nullable = true)
 |-- car_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- end_time: string (nullable = true)
 |-- start_time: string (nullable = true)
 |-- status: string (nullable = true)
 |-- total_amount: double (nullable = true)



'dbfs:/FileStore/zoomcar/bookings/zoom_car_bookings_20241123.json'

In [0]:
from pyspark.sql.functions import col, to_timestamp, to_date  

# Drop rows with null values in critical columns: booking_id, customer_id, car_id, and booking_date
booking_df = booking_df.na.drop(subset=["booking_id", "customer_id", "car_id", "booking_date"])

# Define allowed statuses for bookings
allowed_statuses = ["completed", "cancelled", "pending"]

# Filter rows where the 'status' column contains only allowed statuses
booking_df = booking_df.filter(col("status").isin(allowed_statuses))

# Convert the 'booking_date' column to date format (yyyy-MM-dd)
booking_df = booking_df.withColumn("booking_date", to_date(col("booking_date"), "yyyy-MM-dd"))

# Convert 'start_time' and 'end_time' columns to timestamp format with the given pattern
booking_df = booking_df.withColumn("start_time", to_timestamp(col("start_time"), "yyyy-MM-dd'T'HH:mm:ss'Z'")).withColumn("end_time", to_timestamp(col("end_time"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))

# Filter out rows where 'booking_date' is null after conversion
booking_df = booking_df.filter(col("booking_date").isNotNull())

In [0]:
display(booking_df)

booking_date,booking_id,car_id,customer_id,end_time,start_time,status,total_amount
2024-07-09,B001,CAR244,C001,2024-07-09T04:41:44Z,2024-07-09T03:41:44Z,completed,57.83
2024-07-12,B002,CAR432,C002,2024-07-13T01:10:58Z,2024-07-12T19:10:58Z,completed,159.36
2024-04-23,B003,CAR443,C003,2024-04-23T09:54:58Z,2024-04-23T04:54:58Z,completed,59.16
2024-11-13,B004,CAR524,C004,2024-11-13T09:55:57Z,2024-11-13T07:55:57Z,cancelled,176.5
2024-10-18,B005,CAR414,C005,2024-10-19T01:07:47Z,2024-10-18T21:07:47Z,cancelled,129.11
2024-11-06,B006,CAR591,C006,2024-11-07T05:27:53Z,2024-11-06T22:27:53Z,completed,152.93
2024-05-15,B007,CAR183,C007,2024-05-15T12:54:28Z,2024-05-15T09:54:28Z,pending,196.48
2024-09-21,B008,CAR985,C008,2024-09-21T11:26:24Z,2024-09-21T05:26:24Z,completed,79.75
2024-05-02,B009,CAR646,C009,2024-05-02T15:01:22Z,2024-05-02T13:01:22Z,pending,105.63
2024-03-10,B010,CAR811,C010,2024-03-10T14:12:42Z,2024-03-10T06:12:42Z,completed,75.15


In [0]:

# Drop rows with null values in critical columns: booking_id, customer_id, car_id, and booking_date
booking_df = booking_df.na.drop(subset=["booking_id", "customer_id", "car_id", "booking_date"])

# Define allowed statuses for bookings
allowed_statuses = ["completed", "cancelled", "pending"]

# Filter rows where the 'status' column contains only allowed statuses
booking_df = booking_df.filter(col("status").isin(allowed_statuses))

# Convert the 'booking_date' column to date format (yyyy-MM-dd)
booking_df = booking_df.withColumn("booking_date", to_date(col("booking_date"), "yyyy-MM-dd"))

# Convert 'start_time' and 'end_time' columns to timestamp format with the given pattern
booking_df = booking_df.withColumn("start_time", to_timestamp(col("start_time"), "yyyy-MM-dd'T'HH:mm:ss'Z'")).withColumn("end_time", to_timestamp(col("end_time"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))

# Filter out rows where 'booking_date' is null after conversion
booking_df = booking_df.filter(col("booking_date").isNotNull())

# Perform data quality checks on the booking data
# Initialize a Check for validating booking data
check_bookings = Check(spark, CheckLevel.Error, "Booking Data Check") \
    .hasSize(lambda x: x > 0)  # Ensure that the dataset has more than 0 rows
    .isUnique("booking_id", hint="Booking ID is not unique throughout")  # Check if 'booking_id' is unique
    .isComplete("customer_id", hint="Customer ID is missing")  # Ensure 'customer_id' column has no null values
    .isComplete("car_id", hint="Car ID is missing")  # Ensure 'car_id' column has no null values
    .isComplete("booking_date", hint="Booking date is missing")  # Ensure 'booking_date' column has no null values
    .isComplete("start_time", hint="Start time is missing")  # Ensure 'start_time' column has no null values
    .isComplete("end_time", hint="End time is missing")  # Ensure 'end_time' column has no null values
    .isNonNegative("total_amount", hint="Total amount cannot be negative")  # Ensure 'total_amount' is non-negative
    .isContainedIn("status", allowed_statuses, hint="Status value is invalid")  # Check if 'status' values are valid

# Run the data quality verification suite on the booking data
booking_dq_check = VerificationSuite(spark) \
    .onData(booking_df) \
    .addCheck(check_bookings) \
    .run()

# Convert the verification results into a DataFrame for visualization
bookings_dq_check_df = VerificationResult.checkResultsAsDataFrame(spark, booking_dq_check)

# Display the DataFrame containing the results of the data quality checks
display(bookings_dq_check_df)

# Check if the verification passed or failed
if booking_dq_check.status != "Success":
    raise ValueError("Data Quality Checks Failed for Booking Data")  # Raise an error if checks fail




check,check_level,check_status,constraint,constraint_status,constraint_message
Booking Data Check,Error,Success,SizeConstraint(Size(None)),Success,
Booking Data Check,Error,Success,"UniquenessConstraint(Uniqueness(List(booking_id),None,None))",Success,
Booking Data Check,Error,Success,"CompletenessConstraint(Completeness(customer_id,None,None))",Success,
Booking Data Check,Error,Success,"CompletenessConstraint(Completeness(car_id,None,None))",Success,
Booking Data Check,Error,Success,"CompletenessConstraint(Completeness(booking_date,None,None))",Success,
Booking Data Check,Error,Success,"CompletenessConstraint(Completeness(start_time,None,None))",Success,
Booking Data Check,Error,Success,"CompletenessConstraint(Completeness(end_time,None,None))",Success,
Booking Data Check,Error,Success,"ComplianceConstraint(Compliance(total_amount is non-negative,COALESCE(CAST(total_amount AS DECIMAL(20,10)), 0.0) >= 0,None,List(total_amount),None))",Success,
Booking Data Check,Error,Success,"ComplianceConstraint(Compliance(status contained in completed,cancelled,pending,`status` IS NULL OR `status` IN ('completed','cancelled','pending'),None,List(status),None))",Success,


In [0]:
from pyspark.sql.functions import col, unix_timestamp, expr

booking_df = booking_df.withColumnRenamed("status", "book_status")

# Calculate trip time in minutes
booking_df = booking_df.withColumn(
    "duration_book",
    (unix_timestamp(col("end_time")) - unix_timestamp(col("start_time"))) / 60
)


In [0]:
%sql
CREATE DATABASE IF NOT EXISTS zoom

In [0]:
booking_table_name = "databricks2.zoom.zoom_staging_bookings_delta"

# Check if the table exists
if not spark.catalog._jcatalog.tableExists(booking_table_name):
    # If the table does not exist, write the DataFrame as a new Delta table
    booking_df.write.format("delta").saveAsTable(booking_table_name)
else:
    # If the table exists, append the new data
    booking_df.write.format("delta").mode("append").saveAsTable(booking_table_name)



In [0]:
%sql
--%sql

--DROP TABLE IF EXISTS databricks2.default.zoom_staging_bookings_delta