In [0]:
%python
# Load the raw trip table that every later cell builds on.
SOURCE_TABLE = "main.default.yellow_tripdata_large"
df_txt = spark.table(SOURCE_TABLE)

# Preview: print the first five rows, then the column names and types.
df_txt.show(n=5)
df_txt.printSchema()


+--------------------+---------------------+---------------+-------------+-----------+-----+-------+----------+------------+------------+------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|total_amount|payment_type|
+--------------------+---------------------+---------------+-------------+-----------+-----+-------+----------+------------+------------+------------+
|     1/11/2023 11:22|      1/15/2023 22:00|              2|         5.67|      23.46|  0.5|    0.5|      2.43|         0.0|       26.89|           1|
|      1/11/2023 4:22|      1/19/2023 12:53|              1|         9.04|      32.52|  1.0|    0.5|      3.97|         0.0|       37.99|           1|
|       1/1/2023 2:59|      1/30/2023 15:33|              2|        10.77|      37.97|  1.0|    0.5|      2.76|         0.0|       42.23|           2|
|     1/14/2023 13:56|      1/27/2023 18:47|              2|         4.27|        8.5|  0.5|  

In [0]:
%python
from pyspark.sql.functions import col, expr, to_date, hour, unix_timestamp, when

# ============================
# Parse timestamps
# ============================

# Pattern for the raw pickup/dropoff values (the preview shows text such as
# "1/11/2023 11:22"). The same pattern is applied to both columns.
TS_PATTERN = "M/d/yyyy H:mm"

df_txt = (
    df_txt
    .withColumn("pickup_ts", expr(f"try_to_timestamp(tpep_pickup_datetime, '{TS_PATTERN}')"))
    .withColumn("dropoff_ts", expr(f"try_to_timestamp(tpep_dropoff_datetime, '{TS_PATTERN}')"))
    # Calendar date and hour-of-day are both derived from the parsed pickup time.
    .withColumn("pickup_date", to_date("pickup_ts"))
    .withColumn("pickup_hour", hour("pickup_ts"))
)

# Keep only rows where both timestamps parsed (a failed parse leaves NULL).
df_txt = df_txt.where(col("pickup_ts").isNotNull()).where(col("dropoff_ts").isNotNull())


# ============================
# Trip duration in minutes
# ============================

# Elapsed time between pickup and dropoff: epoch-second difference, then
# divided by 60 to express it in minutes.
elapsed_seconds = unix_timestamp("dropoff_ts") - unix_timestamp("pickup_ts")
df_txt = df_txt.withColumn("trip_duration_min", elapsed_seconds / 60)

# Discard trips whose dropoff is at or before the pickup.
df_txt = df_txt.where(col("trip_duration_min") > 0)


# ============================
# Average speed (mph)
# ============================

# Upper bound (mph) above which a computed speed is treated as bad data.
MAX_PLAUSIBLE_SPEED_MPH = 100

# distance / minutes * 60 = distance per hour (assumes trip_distance is in
# miles, as the column name "avg_speed_mph" implies -- TODO confirm).
# trip_duration_min has already been filtered to > 0, so the divisor is never zero.
df_txt = df_txt.withColumn(
    "avg_speed_mph",
    (col("trip_distance") / col("trip_duration_min")) * 60
).filter(
    # Remove unrealistic speeds in both directions: a negative trip_distance
    # yields a negative speed, which an upper bound alone would let through.
    # Rows with a NULL speed (NULL trip_distance) are dropped by this predicate too.
    col("avg_speed_mph").between(0, MAX_PLAUSIBLE_SPEED_MPH)
)


# ============================
# Tip percentage
# ============================

# Tip as a percentage of total_amount.
# The division is guarded so that a row with total_amount = 0 yields NULL
# instead of failing the whole job with DIVIDE_BY_ZERO when ANSI mode is
# enabled. For every non-zero total the computed value is unchanged.
df_txt = df_txt.withColumn(
    "tip_pct",
    when(
        col("total_amount") != 0,
        (col("tip_amount") / col("total_amount")) * 100
    )  # no otherwise(): rows with a zero or NULL total get NULL
)


# ============================
# Trip distance category
# ============================

# Bucket trips by distance: up to 2 -> "short", above 2 and up to 5 -> "medium",
# anything else (including a NULL distance) -> "long".
# when() branches are tested in order, so the second branch only ever sees
# distances that already failed the "<= 2" test.
distance = col("trip_distance")
df_txt = df_txt.withColumn(
    "trip_distance_category",
    when(distance <= 2, "short").when(distance <= 5, "medium").otherwise("long"),
)


# ============================
# Time of day
# ============================

# Label each trip by the hour it started. pickup_hour is an integer hour of
# day, so the inclusive ranges below cover 05-11, 12-16 and 17-20; every
# other hour (21-04) falls through to "night".
pickup_hr = col("pickup_hour")
time_of_day = (
    when(pickup_hr.between(5, 11), "morning")
    .when(pickup_hr.between(12, 16), "afternoon")
    .when(pickup_hr.between(17, 20), "evening")
    .otherwise("night")
)
df_txt = df_txt.withColumn("time_of_day", time_of_day)


# ============================
# High fare flag
# ============================

# 1 when fare_amount is strictly greater than 40, otherwise 0
# (a NULL fare does not satisfy the comparison and therefore also maps to 0).
exceeds_fare_threshold = col("fare_amount") > 40
df_txt = df_txt.withColumn("high_fare_flag", when(exceeds_fare_threshold, 1).otherwise(0))


# ============================
# Save as curated table
# ============================

# Fully qualified name of the curated output table.
CURATED_TABLE = "main.default.taxi_trips_curated_txt"

# Materialize the curated DataFrame, replacing any previous contents of the table.
df_txt.write.mode("overwrite").saveAsTable(CURATED_TABLE)

print(f"Curated taxi trips table created: {CURATED_TABLE}")


# ============================
# Create temporary view for SQL queries
# ============================

# Back the view with the table that was just written rather than with df_txt:
# a view over df_txt would re-run the entire parse/filter/derive lineage
# against the raw table on every query and could drift from the saved data.
spark.table(CURATED_TABLE).createOrReplaceTempView("taxi_trips_view_txt")


Curated taxi trips table created: main.default.taxi_trips_curated_txt


In [0]:
%sql


-- 1. Trips per hour
--    Counts curated trips by the hour of day in which they were picked up.
SELECT
  t.pickup_hour,
  COUNT(*) AS trip_count
FROM main.default.taxi_trips_curated_txt AS t
GROUP BY t.pickup_hour
ORDER BY t.pickup_hour ASC;




pickup_hour,trip_count
0,2146
1,2193
2,2161
3,2104
4,2124
5,2114
6,2026
7,2012
8,2104
9,2067


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
-- 2. Average fare per hour
--    Note: the average is taken over total_amount (not fare_amount),
--    rounded to 2 decimals and reported as avg_fare.
SELECT
  t.pickup_hour,
  ROUND(AVG(t.total_amount), 2) AS avg_fare
FROM main.default.taxi_trips_curated_txt AS t
GROUP BY t.pickup_hour
ORDER BY t.pickup_hour ASC;



pickup_hour,avg_fare
0,34.4
1,35.63
2,34.71
3,34.86
4,35.01
5,35.16
6,35.42
7,35.1
8,35.34
9,35.39


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
-- 3. Daily revenue
--    Sum of total_amount per pickup date, rounded to 2 decimals, oldest day first.
SELECT
  t.pickup_date,
  ROUND(SUM(t.total_amount), 2) AS daily_revenue
FROM main.default.taxi_trips_curated_txt AS t
GROUP BY t.pickup_date
ORDER BY t.pickup_date ASC;



pickup_date,daily_revenue
2023-01-01,114257.58
2023-01-02,113013.73
2023-01-03,105877.41
2023-01-04,103220.14
2023-01-05,99130.22
2023-01-06,94432.26
2023-01-07,93785.12
2023-01-08,91477.86
2023-01-09,83274.99
2023-01-10,78423.8


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
-- 4. Trips by time of day
--    Trip count per time_of_day bucket, busiest bucket first.
SELECT
  t.time_of_day,
  COUNT(*) AS trips
FROM main.default.taxi_trips_curated_txt AS t
GROUP BY t.time_of_day
ORDER BY trips DESC;



time_of_day,trips
night,16850
morning,14544
afternoon,10296
evening,8110


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
-- 5. Average speed per trip distance category
--    Mean of avg_speed_mph within each distance bucket, rounded to 2 decimals,
--    fastest bucket first.
SELECT
  t.trip_distance_category,
  ROUND(AVG(t.avg_speed_mph), 2) AS avg_speed
FROM main.default.taxi_trips_curated_txt AS t
GROUP BY t.trip_distance_category
ORDER BY avg_speed DESC;

trip_distance_category,avg_speed
long,0.21
medium,0.1
short,0.02


Databricks visualization. Run in Databricks to view.