In [0]:
from datetime import datetime
from os import path
from urllib.parse import urlparse

import matplotlib.pyplot as plt
from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql import functions as F, types as T

# Delta-enabled Spark session (works in Colab & Databricks).
# The two settings below register Delta's SQL extension and make Delta the
# implementation behind the default `spark_catalog`.
_DELTA_SESSION_CONF = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}

builder = SparkSession.builder.appName("DeltaRideHailing")
for _conf_key, _conf_value in _DELTA_SESSION_CONF.items():
    builder = builder.config(_conf_key, _conf_value)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# ---------- Inline Data ----------
# Schema of one trip record; field order matches the row tuples used below.
trip_schema = (
    T.StructType()
    .add("trip_id", T.IntegerType())
    .add("rider_id", T.StringType())
    .add("driver_id", T.StringType())
    .add("city", T.StringType())
    .add("distance_km", T.DoubleType())
    .add("fare", T.DoubleType())
    .add("tip", T.DoubleType())
    .add("ts", T.TimestampType())
)

# Schema of one driver record; `driver_id` is the join key shared with trips.
driver_schema = (
    T.StructType()
    .add("driver_id", T.StringType())
    .add("driver_name", T.StringType())
    .add("rating", T.DoubleType())
    .add("vehicle", T.StringType())
)

# Sample data
# Rows handed to spark.createDataFrame must contain plain Python values.
# A TimestampType field therefore takes a datetime, not a Column expression
# such as F.to_timestamp(F.lit(...)), which createDataFrame cannot serialize.
trips_rows = [
    (1001, "R001", "D010", "Bengaluru", 12.4, 320.0, 20.0, datetime(2025, 8, 8, 8, 5, 0)),
    (1002, "R002", "D011", "Hyderabad", 6.2, 150.0, 10.0, datetime(2025, 8, 8, 8, 15, 0)),
    (1003, "R003", "D012", "Pune", 3.5, 90.0, 0.0, datetime(2025, 8, 8, 8, 20, 0)),
    (1004, "R001", "D010", "Bengaluru", 18.9, 480.0, 25.0, datetime(2025, 8, 8, 8, 45, 0)),
    (1005, "R004", "D013", "Chennai", 10.0, 260.0, 15.0, datetime(2025, 8, 8, 9, 5, 0)),
    (1006, "R005", "D012", "Pune", 2.2, 70.0, 0.0, datetime(2025, 8, 8, 9, 10, 0)),
]
# (driver_id, driver_name, rating, vehicle)
drivers_rows = [
    ("D010", "Anil", 4.8, "WagonR"),
    ("D011", "Sana", 4.6, "i20"),
    ("D012", "Rakesh", 4.4, "Swift"),
    ("D013", "Meera", 4.9, "Ciaz")
]



# Storage layout for the seeded Delta tables.
BASE = "/tmp/delta/ride_hailing"
TRIPS_PATH = f"{BASE}/trips"
DRIVERS_PATH = f"{BASE}/drivers"

# Materialize the inline rows as DataFrames with explicit schemas.
trips_df = spark.createDataFrame(trips_rows, schema=trip_schema)
drivers_df = spark.createDataFrame(drivers_rows, schema=driver_schema)

# Overwrite mode replaces whatever an earlier run left at each path.
for _seed_df, _seed_path in ((trips_df, TRIPS_PATH), (drivers_df, DRIVERS_PATH)):
    _seed_df.write.format("delta").mode("overwrite").save(_seed_path)

print("Seeded:")
print(" Trips ->", TRIPS_PATH)
print(" Drivers ->", DRIVERS_PATH)

In [0]:

# Managed table (trips_managed)
# A managed table must be created WITHOUT a LOCATION clause: the catalog then
# owns the data files and DROP TABLE deletes them. Adding LOCATION would make
# the table external, and a CTAS pointed at the already-populated TRIPS_PATH
# would be writing into a non-empty directory. The data is therefore copied
# from the path-based table into catalog-managed storage.
spark.sql("DROP TABLE IF EXISTS trips_managed")  # keeps the cell re-runnable
spark.sql(f"""
CREATE TABLE trips_managed USING DELTA
AS SELECT * FROM delta.`{TRIPS_PATH}`
""")

# Unmanaged table (drivers_ext)
# The LOCATION clause is what makes a table external; the catalog only stores
# a pointer to the existing files, so dropping the table leaves them in place.
# NOTE(review): the EXTERNAL keyword was removed because, as far as I recall,
# some Spark releases reject CREATE EXTERNAL TABLE combined with USING — confirm
# against the target runtime.
spark.sql("DROP TABLE IF EXISTS drivers_ext")  # keeps the cell re-runnable
spark.sql(f"""
CREATE TABLE drivers_ext USING DELTA LOCATION '{DRIVERS_PATH}'
""")

# Verify with DESCRIBE DETAIL (the `location` column shows where each table lives)
spark.sql("DESCRIBE DETAIL trips_managed").show(truncate=False)
spark.sql("DESCRIBE DETAIL drivers_ext").show(truncate=False)

In [0]:

# Load both Delta tables straight from their storage paths.
trips_df = spark.read.format("delta").load(TRIPS_PATH)
drivers_df = spark.read.format("delta").load(DRIVERS_PATH)

# Show schemas first, then the top 10 rows of each table.
for _frame in (trips_df, drivers_df):
    _frame.printSchema()
for _frame in (trips_df, drivers_df):
    _frame.show(10)

# Derive total_amount = fare + tip and list the 5 highest-value trips.
_total_amount = F.col("fare") + F.col("tip")
trips_df = trips_df.withColumn("total_amount", _total_amount)
trips_df.orderBy(F.col("total_amount").desc()).show(5)

In [0]:


# Increase tip by 5 for trips in Bengaluru where distance_km > 15
bengaluru_long_trips = (F.col("city") == "Bengaluru") & (F.col("distance_km") > 15)

# Rows that are about to change (shown before the update is committed).
trips_before_update = trips_df.filter(bengaluru_long_trips)
trips_before_update.show()

# Perform the update on the Delta table itself. A withColumn on the DataFrame
# only rewrites the in-memory query plan: nothing is committed, so the change
# would be invisible to DESCRIBE HISTORY, time travel and any later read.
# NOTE: every run of this cell adds another 5; re-seed the table to reset.
DeltaTable.forPath(spark, TRIPS_PATH).update(
    condition=bengaluru_long_trips,
    set={"tip": F.col("tip") + 5},
)

# Re-read the table and re-derive total_amount so it reflects the new tip.
trips_df = (
    spark.read.format("delta")
    .load(TRIPS_PATH)
    .withColumn("total_amount", F.col("fare") + F.col("tip"))
)

# Show updated rows
trips_after_update = trips_df.filter(bengaluru_long_trips)
trips_after_update.show()

In [0]:
# Keep only trips with a strictly positive fare and a strictly positive distance.
_has_positive_fare = F.col("fare") > 0
_has_positive_distance = F.col("distance_km") > 0
trips_df = trips_df.where(_has_positive_fare & _has_positive_distance)

# Number of trips that survive the filter (displayed as the cell output).
trips_df.count()

In [0]:

# Upsert batch: trip 1004 already exists (new fare/tip), trip 1007 is new.
# Timestamps are datetime values because createDataFrame needs plain Python
# objects in local rows, not Column expressions.
new_trip_rows = [
    (1004, "R001", "D010", "Bengaluru", 18.9, 500.0, 30.0, datetime(2025, 8, 8, 8, 45, 0)),
    (1007, "R006", "D013", "Mumbai", 15.0, 350.0, 20.0, datetime(2025, 8, 8, 10, 0, 0))
]
new_trip_df = spark.createDataFrame(new_trip_rows, schema=trip_schema)

# Merge into the existing trips table.
# - Matched trip_ids only get their fare and tip refreshed.
# - Unmatched rows are inserted. MERGE's INSERT action needs an explicit column
#   list before VALUES (or INSERT *); a bare "INSERT VALUES (...)" does not parse.
# The target is addressed through TRIPS_PATH rather than a second copy of the
# literal path, so the two cannot drift apart.
new_trip_df.createOrReplaceTempView("new_trips")
spark.sql(f"""
MERGE INTO delta.`{TRIPS_PATH}` AS trips
USING new_trips AS updates
ON trips.trip_id = updates.trip_id
WHEN MATCHED THEN UPDATE SET trips.fare = updates.fare, trips.tip = updates.tip
WHEN NOT MATCHED THEN INSERT (trip_id, rider_id, driver_id, city, distance_km, fare, tip, ts)
  VALUES (updates.trip_id, updates.rider_id, updates.driver_id, updates.city,
          updates.distance_km, updates.fare, updates.tip, updates.ts)
""")

# Verify the merge results
spark.sql(f"SELECT * FROM delta.`{TRIPS_PATH}` WHERE trip_id IN (1004, 1007)").show()


In [0]:

# Gold view: one row per trip, enriched with the driver's name and rating.
# The inner join on driver_id keeps only trips whose driver is in drivers_df.
_gold_columns = [
    "trip_id",
    "city",
    "driver_name",
    "rating",
    "distance_km",
    (F.col("fare") + F.col("tip")).alias("total_amount"),
    "ts",
]
gold_view = trips_df.join(drivers_df, "driver_id").select(*_gold_columns)

# City-wise total revenue and average driver rating (averaged over trip rows).
_city_summary = gold_view.groupBy("city").agg(
    F.sum("total_amount").alias("total_revenue"),
    F.avg("rating").alias("avg_driver_rating"),
)
_city_summary.show()

# Driver-wise trip count and revenue; display the 3 highest-earning drivers.
_driver_summary = gold_view.groupBy("driver_name").agg(
    F.count("trip_id").alias("total_trips"),
    F.sum("total_amount").alias("total_revenue"),
)
_driver_summary.orderBy(F.col("total_revenue").desc()).show(3)

In [0]:

# Commit log of the trips table: one row per committed version.
spark.sql("DESCRIBE HISTORY delta.`/tmp/delta/ride_hailing/trips`").show()

# Time travel: load version 0 next to the latest version of the table.
trips_version_0 = spark.read.format("delta").option("versionAsOf", 0).load(TRIPS_PATH)
trips_df = spark.read.format("delta").load(TRIPS_PATH)

# Compare row counts, then preview a few rows of each version.
_versions = (
    ("Version 0 row count:", trips_version_0),
    ("Latest version row count:", trips_df),
)
for _label, _frame in _versions:
    print(_label, _frame.count())
for _label, _frame in _versions:
    _frame.show(5)



In [0]:
# Write a copy of the trips table partitioned by city.
PARTITIONED_PATH = "/tmp/delta/partitioned_trips"
_partitioned_writer = trips_df.write.format("delta").mode("overwrite").partitionBy("city")
_partitioned_writer.save(PARTITIONED_PATH)



In [0]:

# Create a small incremental batch
# Timestamps are datetime values because createDataFrame needs plain Python
# objects in local rows, not Column expressions.
incremental_batch = [
    (1008, "R007", "D011", "Mumbai", 8.0, 200.0, 12.0, datetime(2025, 8, 8, 11, 0, 0)),
    (1009, "R008", "D010", "Mumbai", 10.5, 250.0, 15.0, datetime(2025, 8, 8, 11, 5, 0))
]
incremental_df = spark.createDataFrame(incremental_batch, schema=trip_schema)

# Append the batch to the trips table as a new commit.
_append_writer = incremental_df.write.format("delta").mode("append")
_append_writer.save(TRIPS_PATH)

# Rebuild the gold view from the table on disk so the appended rows are included.
_latest_trips = spark.read.format("delta").load(TRIPS_PATH)
_trip_total = (F.col("fare") + F.col("tip")).alias("total_amount")
gold_view = _latest_trips.join(drivers_df, "driver_id").select(
    "trip_id", "city", "driver_name", "rating", "distance_km", _trip_total, "ts"
)

# Re-run the city-wise revenue aggregation.
_city_summary = gold_view.groupBy("city").agg(
    F.sum("total_amount").alias("total_revenue"),
    F.avg("rating").alias("avg_driver_rating"),
)
_city_summary.show()



In [0]:

# Stream the trips table to the console sink, polling every 5 seconds.
streaming_df = spark.readStream.format("delta").load(TRIPS_PATH)
query = (
    streaming_df.writeStream
    .outputMode("append")
    .format("console")
    .trigger(processingTime="5 seconds")
    .start()
)

try:
    # Simulate appending a new batch while the stream runs.
    # The timestamp is a datetime value because createDataFrame needs plain
    # Python objects in local rows, not Column expressions.
    streaming_batch = [
        (1010, "R009", "D011", "Mumbai", 7.5, 180.0, 10.0, datetime(2025, 8, 8, 12, 0, 0))
    ]
    streaming_batch_df = spark.createDataFrame(streaming_batch, schema=trip_schema)
    streaming_batch_df.write.format("delta").mode("append").save(TRIPS_PATH)

    # Block until the stream has consumed everything currently in the table,
    # so the appended row is emitted before the query is shut down.
    query.processAllAvailable()
finally:
    # Always stop the query; otherwise it keeps polling in the background for
    # the lifetime of the kernel.
    query.stop()





In [0]:


# City-wise revenue (bar chart). Aggregation happens in Spark; only the small
# per-city result is collected to pandas for plotting.
# matplotlib.pyplot is imported as plt in the notebook's top import cell.
city_revenue_df = gold_view.groupBy("city").agg(
    F.sum("total_amount").alias("total_revenue")
).toPandas()

city_revenue_df.plot(kind="bar", x="city", y="total_revenue")
plt.title("City-wise Revenue")
plt.ylabel("Total Revenue")
plt.show()

# Revenue by hour (line chart). groupBy returns rows in no particular order,
# and a line plot joins points in row order, so sort by hour before collecting.
hourly_revenue_df = (
    gold_view.withColumn("hour", F.hour("ts"))
    .groupBy("hour")
    .agg(F.sum("total_amount").alias("hourly_revenue"))
    .orderBy("hour")
    .toPandas()
)
plt.plot(hourly_revenue_df["hour"], hourly_revenue_df["hourly_revenue"])
plt.title("Revenue by Hour")
plt.xlabel("Hour")
plt.ylabel("Hourly Revenue")
plt.show()



In [0]:


def _local_table_path(table_name):
    """Return the filesystem path holding `table_name`'s data, or None.

    None is returned when the table is not registered in the current database.
    The path is taken from the `location` column of DESCRIBE DETAIL, which is a
    URI (for example file:/...); only its path component is kept.
    """
    if not any(tbl.name == table_name for tbl in spark.catalog.listTables()):
        return None
    location = spark.sql(f"DESCRIBE DETAIL {table_name}").first()["location"]
    return urlparse(location).path


# The location must be captured BEFORE the drop: afterwards the catalog no
# longer knows where the managed table's files lived. TRIPS_PATH is only the
# source the managed table was populated from, so checking it would say nothing
# about whether the managed data was deleted.
managed_path = _local_table_path("trips_managed")

spark.sql("DROP TABLE IF EXISTS trips_managed")
spark.sql("DROP TABLE IF EXISTS drivers_ext")

# path.exists inspects the driver's local filesystem only.
if managed_path is None:
    print("Is managed trips data removed? (trips_managed was not registered; nothing to check)")
else:
    print("Is managed trips data removed?", not path.exists(managed_path))
print("Is unmanaged drivers data still there?", path.exists(DRIVERS_PATH))




In [0]:

# Batch with one invalid record (negative tip) and one valid record.
# Timestamps are datetime values because createDataFrame needs plain Python
# objects in local rows, not Column expressions.
bad_trip_rows = [
    (1011, "R010", "D014", "Mumbai", 10.0, 200.0, -5.0, datetime(2025, 8, 8, 12, 10, 0)),  # Negative tip
    (1012, "R011", "D015", "Chennai", 7.5, 150.0, 0.0, datetime(2025, 8, 8, 12, 20, 0))
]
bad_trip_df = spark.createDataFrame(bad_trip_rows, schema=trip_schema)

# Delta only rejects rows that violate the schema or a declared constraint.
# Without a CHECK constraint the "bad" append below would succeed: it would
# store the negative tip and insert trip 1012 a second time. Declare the rule
# on the table so the write is actually refused.
# DROP ... IF EXISTS first keeps the cell re-runnable, because the constraint
# is stored in table metadata and is not removed by an overwrite of the data.
spark.sql(f"ALTER TABLE delta.`{TRIPS_PATH}` DROP CONSTRAINT IF EXISTS tip_non_negative")
spark.sql(f"ALTER TABLE delta.`{TRIPS_PATH}` ADD CONSTRAINT tip_non_negative CHECK (tip >= 0)")

# Rows that satisfy the rule are appended normally.
valid_trip_df = bad_trip_df.filter(F.col("tip") >= 0)
valid_trip_df.write.format("delta").mode("append").save(TRIPS_PATH)

# The unfiltered batch violates the constraint; the commit fails as a whole,
# so neither of its rows is written. The exception is the demonstration here,
# hence it is caught and displayed rather than propagated.
try:
    bad_trip_df.write.format("delta").mode("append").save(TRIPS_PATH)
except Exception as e:
    print("Error writing bad data:", e)




In [0]:

# Step 1: write a small subset of trips as plain Parquet.
parquet_path = "/tmp/delta/ride_hailing_parquet"
subset_trips_df = trips_df.limit(5)  # Take a small subset of trips
subset_trips_df.write.format("parquet").mode("overwrite").save(parquet_path)

# Step 2: read the Parquet files back and rewrite them as a Delta table.
delta_path = "/tmp/delta/ride_hailing_delta_from_parquet"
_parquet_df = spark.read.parquet(parquet_path)
_parquet_df.write.format("delta").mode("overwrite").save(delta_path)

# Step 3: load version 0 and the latest version of the new Delta table.
_delta_reader = spark.read.format("delta")
delta_df_v0 = _delta_reader.option("versionAsOf", 0).load(delta_path)
delta_df = spark.read.format("delta").load(delta_path)

# Preview version 0 first, then the latest version.
for _frame in (delta_df_v0, delta_df):
    _frame.show(5)

# Single-row batch to merge into the converted table.
# The timestamp is a datetime value because createDataFrame needs plain Python
# objects in local rows, not Column expressions.
new_batch_df = spark.createDataFrame(
    [(1006, "R009", "D013", "Mumbai", 8.0, 150.0, 10.0, datetime(2025, 8, 8, 12, 25, 0))],
    schema=trip_schema,
)

new_batch_df.createOrReplaceTempView("new_batch")

# Upsert by trip_id.
# - Matched rows only get their fare and tip refreshed.
# - Unmatched rows are inserted. MERGE's INSERT action needs an explicit column
#   list before VALUES (or INSERT *); a bare "INSERT VALUES (...)" does not parse.
# The target is addressed through delta_path rather than a second copy of the
# literal path, so the two cannot drift apart.
spark.sql(f"""
MERGE INTO delta.`{delta_path}` AS trips
USING new_batch AS updates
ON trips.trip_id = updates.trip_id
WHEN MATCHED THEN UPDATE SET trips.fare = updates.fare, trips.tip = updates.tip
WHEN NOT MATCHED THEN INSERT (trip_id, rider_id, driver_id, city, distance_km, fare, tip, ts)
  VALUES (updates.trip_id, updates.rider_id, updates.driver_id, updates.city,
          updates.distance_km, updates.fare, updates.tip, updates.ts)
""")

spark.read.format("delta").load(delta_path).show()




In [0]:
# Convert to Pandas for plotting. Aggregation happens in Spark; only the small
# grouped results are collected to the driver.
city_revenue_df = gold_view.groupBy("city").agg(
    F.sum("total_amount").alias("total_revenue")
).toPandas()

# Plotting (Revenue per City - Bar Chart)
city_revenue_df.plot(kind="bar", x="city", y="total_revenue")
plt.title("City-wise Revenue")
plt.ylabel("Total Revenue")
plt.xlabel("City")
plt.show()

# Plotting (Top Drivers by Revenue - Bar Chart)
driver_revenue_df = gold_view.groupBy("driver_name").agg(
    F.sum("total_amount").alias("total_revenue")
).orderBy(F.desc("total_revenue")).limit(10).toPandas()

driver_revenue_df.plot(kind="bar", x="driver_name", y="total_revenue")
plt.title("Top 10 Drivers by Revenue")
plt.ylabel("Total Revenue")
plt.xlabel("Driver Name")
plt.show()

# Plotting (Revenue by Hour - Line Chart)
# groupBy returns rows in no particular order, and a line plot joins points in
# row order, so sort by hour before collecting.
hourly_revenue_df = (
    gold_view.withColumn("hour", F.hour("ts"))
    .groupBy("hour")
    .agg(F.sum("total_amount").alias("hourly_revenue"))
    .orderBy("hour")
    .toPandas()
)

plt.plot(hourly_revenue_df["hour"], hourly_revenue_df["hourly_revenue"])
plt.title("Revenue by Hour")
plt.xlabel("Hour")
plt.ylabel("Hourly Revenue")
plt.show()


