In [0]:
spark.version

'4.0.0'

## Load data

In [0]:
import pyspark.sql.functions as F

RAW_2019 = "/Volumes/workspace/default/nyc_taxi/2019"
LOOKUP   = "/Volumes/workspace/default/nyc_taxi/lookup/taxi_zone_lookup.csv"

# total bytes
total_bytes = (spark.read.format("binaryFile")
               .load("dbfs:/Volumes/workspace/default/nyc_taxi/2019/**")
               .agg(F.sum("length").alias("bytes"))
               .first()["bytes"])
print("Trips 2019 size:", total_bytes, "bytes")
print("≈ GB:", total_bytes/1024/1024/1024)

# load data
df = spark.read.parquet(RAW_2019)
lookup = spark.read.option("header", True).csv(LOOKUP)

print("Rows:", df.count())
df.printSchema()
lookup.show(5, truncate=False)


Trips 2019 size: 1243532931 bytes
≈ GB: 1.1581302909180522
Rows: 84598444
root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)

+----------+-------------+-----------------------+-----

## Transformation

In [0]:
df_clean = (
    df
    # Filter out invalid/exceptional values
    .filter((F.col("fare_amount") > 0) & (F.col("trip_distance") > 0))
    .filter((F.col("passenger_count") >= 1) & (F.col("passenger_count") <= 6))
    # Column transformation: Extract time features and derive indicators
    .withColumn("pickup_ts", F.col("tpep_pickup_datetime").cast("timestamp"))
    .withColumn("dropoff_ts", F.col("tpep_dropoff_datetime").cast("timestamp"))
    .withColumn("year", F.year("pickup_ts"))
    .withColumn("month", F.month("pickup_ts"))
    .withColumn("pickup_hour", F.hour("pickup_ts"))
    .withColumn("trip_mins", (F.unix_timestamp("dropoff_ts") - F.unix_timestamp("pickup_ts"))/60.0)
    .withColumn("tip_pct", F.when(F.col("total_amount") > 0, F.col("tip_amount")/F.col("total_amount")).otherwise(F.lit(0.0)))
    .withColumn("is_rush_hour", F.col("pickup_hour").between(16, 19))
)

bad_fare_dist = df.filter((F.col("fare_amount") <= 0) | (F.col("trip_distance") <= 0)).count()
bad_passenger = df.filter(~((F.col("passenger_count") >= 1) & (F.col("passenger_count") <= 6))).count()
print("Invalid fare/distance:", bad_fare_dist)
print("Invalid passenger    :", bad_passenger)

display(df_clean.limit(100))

Invalid fare/distance: 915064
Invalid passenger    : 1526716


VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,pickup_ts,dropoff_ts,year,month,pickup_hour,trip_mins,tip_pct,is_rush_hour
1,2019-03-01T00:25:27.000,2019-03-01T00:36:37.000,2.0,3.7,1.0,N,95,130,1,13.0,0.5,0.5,0.7,0.0,0.3,15.0,0.0,,2019-03-01T00:25:27.000Z,2019-03-01T00:36:37.000Z,2019,3,0,11.166666666666666,0.0466666666666666,False
1,2019-03-01T00:05:21.000,2019-03-01T00:38:23.000,1.0,14.1,1.0,N,249,28,1,41.0,3.0,0.5,10.1,5.76,0.3,60.66,2.5,,2019-03-01T00:05:21.000Z,2019-03-01T00:38:23.000Z,2019,3,0,33.03333333333333,0.1665018133860863,False
1,2019-03-01T00:48:55.000,2019-03-01T01:06:03.000,1.0,9.6,1.0,N,138,98,2,27.0,0.5,0.5,0.0,0.0,0.3,28.3,0.0,,2019-03-01T00:48:55.000Z,2019-03-01T01:06:03.000Z,2019,3,0,17.133333333333333,0.0,False
1,2019-03-01T00:11:42.000,2019-03-01T00:16:40.000,1.0,0.8,1.0,N,48,48,1,5.5,3.0,0.5,3.0,0.0,0.3,12.3,2.5,,2019-03-01T00:11:42.000Z,2019-03-01T00:16:40.000Z,2019,3,0,4.966666666666667,0.2439024390243902,False
1,2019-03-01T00:45:03.000,2019-03-01T00:49:38.000,1.0,1.2,1.0,N,246,48,2,6.0,3.0,0.5,0.0,0.0,0.3,9.8,2.5,,2019-03-01T00:45:03.000Z,2019-03-01T00:49:38.000Z,2019,3,0,4.583333333333333,0.0,False
1,2019-03-01T00:02:37.000,2019-03-01T00:07:30.000,1.0,0.6,1.0,Y,239,238,2,5.5,3.0,0.5,0.0,0.0,0.3,9.3,2.5,,2019-03-01T00:02:37.000Z,2019-03-01T00:07:30.000Z,2019,3,0,4.883333333333334,0.0,False
2,2019-02-28T19:52:45.000,2019-02-28T20:01:54.000,1.0,5.65,1.0,N,132,197,2,17.0,0.5,0.5,0.0,0.0,0.3,18.3,0.0,,2019-02-28T19:52:45.000Z,2019-02-28T20:01:54.000Z,2019,2,19,9.15,0.0,True
2,2019-03-01T00:14:23.000,2019-03-01T00:19:17.000,1.0,1.16,1.0,N,229,170,1,6.0,0.5,0.5,2.45,0.0,0.3,12.25,2.5,,2019-03-01T00:14:23.000Z,2019-03-01T00:19:17.000Z,2019,3,0,4.9,0.2,False
2,2019-03-01T00:19:09.000,2019-03-01T00:23:30.000,1.0,0.71,1.0,N,137,234,1,5.0,0.5,0.5,1.0,0.0,0.3,9.8,2.5,,2019-03-01T00:19:09.000Z,2019-03-01T00:23:30.000Z,2019,3,0,4.35,0.1020408163265306,False
2,2019-03-01T00:06:46.000,2019-03-01T00:17:09.000,5.0,2.63,1.0,N,231,246,1,10.5,0.5,0.5,2.86,0.0,0.3,17.16,2.5,,2019-03-01T00:06:46.000Z,2019-03-01T00:17:09.000Z,2019,3,0,10.383333333333333,0.1666666666666666,False


In [0]:
#Join
lookup_small = lookup.select(
    F.col("LocationID").cast("int").alias("PULocationID"),
    "Borough", "Zone"
).withColumnRenamed("Borough", "PU_Borough").withColumnRenamed("Zone", "PU_Zone")

df_enriched = df_clean.join(F.broadcast(lookup_small), on="PULocationID", how="left")

print("enriched columns:", df_enriched.columns)
df_enriched.select(
    "tpep_pickup_datetime","PULocationID","PU_Borough","PU_Zone",
    "trip_distance","fare_amount","tip_pct"
).show(10, truncate=False)

enriched columns: ['PULocationID', 'VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'airport_fee', 'pickup_ts', 'dropoff_ts', 'year', 'month', 'pickup_hour', 'trip_mins', 'tip_pct', 'is_rush_hour', 'PU_Borough', 'PU_Zone']
+--------------------+------------+----------+-----------------------------+-------------+-----------+-------------------+
|tpep_pickup_datetime|PULocationID|PU_Borough|PU_Zone                      |trip_distance|fare_amount|tip_pct            |
+--------------------+------------+----------+-----------------------------+-------------+-----------+-------------------+
|2019-03-01 00:25:27 |95          |Queens    |Forest Hills                 |3.7          |13.0       |0.04666666666666666|
|2019-03-01 00:05:21 |249         |Manh

In [0]:
# Aggregate
agg_by_month = (
    df_enriched
    .groupBy("year", "month", "PU_Borough")
    .agg(
        F.count("*").alias("trips"),
        F.avg("trip_distance").alias("avg_dist"),
        F.avg("tip_pct").alias("avg_tip_pct"),
        F.sum(F.col("total_amount")).alias("sum_total_amt")
    )
    .orderBy("year", "month", "PU_Borough")
)

agg_by_month.show(10, truncate=False)

+----+-----+----------+-----+------------------+--------------------+------------------+
|year|month|PU_Borough|trips|avg_dist          |avg_tip_pct         |sum_total_amt     |
+----+-----+----------+-----+------------------+--------------------+------------------+
|2001|1    |Manhattan |2    |1.8250000000000002|0.0                 |21.6              |
|2002|2    |Manhattan |11   |2.9309090909090902|0.0                 |196.92000000000002|
|2008|8    |Manhattan |2    |0.7849999999999999|0.08333333333333333 |20.26             |
|2008|12   |Bronx     |1    |7.16              |0.0                 |24.8              |
|2008|12   |Brooklyn  |1    |4.06              |0.16666666666666666 |26.76             |
|2008|12   |Manhattan |144  |2.2690972222222223|0.039084316046120865|2188.2100000000005|
|2008|12   |Queens    |41   |12.71292682926829 |0.04138012016089551 |1888.58           |
|2008|12   |Unknown   |2    |5.11              |0.0                 |46.22             |
|2009|1    |Brooklyn 

## SQL queries

In [0]:
df_enriched.createOrReplaceTempView("taxi")
spark.sql("""
    SELECT year, month, PU_Borough,
           COUNT(*) AS trips,
           AVG(trip_distance) AS avg_dist
    FROM taxi
    WHERE is_rush_hour = true
    GROUP BY year, month, PU_Borough
    ORDER BY year, month, PU_Borough
""").show(10)

spark.sql("""
    SELECT PU_Borough,
           percentile_approx(tip_pct, 0.5) AS p50_tip_pct,
           percentile_approx(tip_pct, 0.9) AS p90_tip_pct
    FROM taxi
    GROUP BY PU_Borough
    ORDER BY PU_Borough
""").show(10)

+----+-----+----------+-----+------------------+
|year|month|PU_Borough|trips|          avg_dist|
+----+-----+----------+-----+------------------+
|2008|   12| Manhattan|    2|              0.77|
|2009|    1| Manhattan|    9| 2.698888888888889|
|2009|    1|   Unknown|    1|              2.46|
|2018|   12|     Bronx|    1|              3.22|
|2018|   12|  Brooklyn|    4|            6.8125|
|2018|   12| Manhattan|  125| 2.908319999999999|
|2018|   12|    Queens|    6|10.443333333333333|
|2019|    1|     Bronx| 2289|  6.44893840104849|
|2019|    1|  Brooklyn|13923| 4.046517991812094|
|2019|    1|       EWR|   42| 7.748333333333335|
+----+-----+----------+-----+------------------+
only showing top 10 rows
+-------------+--------------------+-------------------+
|   PU_Borough|         p50_tip_pct|        p90_tip_pct|
+-------------+--------------------+-------------------+
|        Bronx|                 0.0|                0.0|
|     Brooklyn|0.046948356807511735| 0.1982758620689655|
|   

## Optimization

In [0]:
df_base = (
    df
    .filter((F.col("fare_amount") > 0) & (F.col("trip_distance") > 0))
    .withColumn("pickup_ts", F.col("tpep_pickup_datetime").cast("timestamp"))
    .withColumn("year",  F.year("pickup_ts"))
    .withColumn("month", F.month("pickup_ts"))
    # Write filters early
    .filter(F.col("month") == 11)
    # Projection
    .select("PULocationID","year","month","trip_distance","fare_amount","tip_amount","tpep_pickup_datetime")
)

df_base.explain("formatted")

== Physical Plan ==
* ColumnarToRow (5)
+- PhotonResultStage (4)
   +- PhotonProject (3)
      +- PhotonProject (2)
         +- PhotonScan parquet  (1)


(1) PhotonScan parquet 
Output [5]: [tpep_pickup_datetime#11305, trip_distance#11308, PULocationID#11311L, fare_amount#11314, tip_amount#11317]
DictionaryFilters: [(trip_distance#11308 > 0.0), (fare_amount#11314 > 0.0)]
Location: InMemoryFileIndex [dbfs:/Volumes/workspace/default/nyc_taxi/2019]
ReadSchema: struct<tpep_pickup_datetime:timestamp_ntz,trip_distance:double,PULocationID:bigint,fare_amount:double,tip_amount:double>
RequiredDataFilters: [isnotnull(fare_amount#11314), isnotnull(trip_distance#11308), isnotnull(tpep_pickup_datetime#11305), (fare_amount#11314 > 0.0), (trip_distance#11308 > 0.0), (month(cast(cast(tpep_pickup_datetime#11305 as timestamp) as date)) = 11)]

(2) PhotonProject
Input [5]: [tpep_pickup_datetime#11305, trip_distance#11308, PULocationID#11311L, fare_amount#11314, tip_amount#11317]
Arguments: [tpep_pickup_d

In [0]:
# ensure one-to-one mapping
lk = lookup.select(
    F.col("LocationID").cast("int").alias("PULocationID"),
    F.col("Borough").alias("PU_Borough")
).dropDuplicates(["PULocationID"])

# Join with broadcasted dimension table after early filtering
df_joined = df_base.join(F.broadcast(lk), on="PULocationID", how="left")

# Repartition by aggregation keys
df_part = df_joined.repartition("year","month","PU_Borough")

agg_by_month = (df_part
    .groupBy("year","month","PU_Borough")
    .agg(
        F.count("*").alias("trips"),
        F.avg("trip_distance").alias("avg_dist"),
        F.avg( (F.col("tip_amount")/F.col("fare_amount")).cast("double") ).alias("avg_tip_pct")
    )
)

agg_by_month.explain("formatted")

== Physical Plan ==
AdaptiveSparkPlan (24)
+- == Initial Plan ==
   ColumnarToRow (23)
   +- PhotonResultStage (22)
      +- PhotonGroupingAgg (21)
         +- PhotonShuffleExchangeSource (20)
            +- PhotonShuffleMapStage (19)
               +- PhotonShuffleExchangeSink (18)
                  +- PhotonProject (17)
                     +- PhotonBroadcastHashJoin LeftOuter (16)
                        :- PhotonProject (3)
                        :  +- PhotonProject (2)
                        :     +- PhotonScan parquet  (1)
                        +- PhotonShuffleExchangeSource (15)
                           +- PhotonShuffleMapStage (14)
                              +- PhotonShuffleExchangeSink (13)
                                 +- PhotonGroupingAgg (12)
                                    +- PhotonShuffleExchangeSource (11)
                                       +- PhotonShuffleMapStage (10)
                                          +- PhotonShuffleExchangeSink (9)
       

## Output: Parquet (partitioned by year, month) + read-back validation

In [0]:
OUT = "/Volumes/workspace/default/nyc_taxi/out_base"

(agg_by_month
 .write.mode("overwrite")
 .partitionBy("year","month")
 .parquet(OUT))

subset = spark.read.parquet(OUT).where("year=2019 AND month BETWEEN 9 AND 12")
subset.explain("formatted")
print("base rows:", spark.read.parquet(OUT).count())

== Physical Plan ==
* ColumnarToRow (3)
+- PhotonResultStage (2)
   +- PhotonScan parquet  (1)


(1) PhotonScan parquet 
Output [6]: [PU_Borough#12333, trips#12334L, avg_dist#12335, avg_tip_pct#12336, year#12337, month#12338]
Location: InMemoryFileIndex [dbfs:/Volumes/workspace/default/nyc_taxi/out_base]
PartitionFilters: [isnotnull(year#12337), isnotnull(month#12338), (year#12337 = 2019), (month#12338 >= 9), (month#12338 <= 12)]
ReadSchema: struct<PU_Borough:string,trips:bigint,avg_dist:double,avg_tip_pct:double>

(2) PhotonResultStage
Input [6]: [PU_Borough#12333, trips#12334L, avg_dist#12335, avg_tip_pct#12336, year#12337, month#12338]

(3) ColumnarToRow [codegen id : 1]
Input [6]: [PU_Borough#12333, trips#12334L, avg_dist#12335, avg_tip_pct#12336, year#12337, month#12338]


== Photon Explanation ==
The query is fully supported by Photon.
base rows: 10


## Caching optimization

In [0]:
dfX = df_enriched

In [0]:
from time import perf_counter

q = df_enriched.groupBy("PU_Borough").agg(F.avg("trip_distance").alias("avg_dist"))

t0 = perf_counter()
q.count()             # first operation (not cached)
t1 = perf_counter()

df_enriched.cache()
df_enriched.count()   # materialize cache

t2 = perf_counter()
q.count()             # second operation (reusing after caching)
t3 = perf_counter()

print(f"First run:  {t1 - t0:.2f}s")
print(f"After cache: {t3 - t2:.2f}s")

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-8712686228892471>, line 9[0m
[1;32m      6[0m q[38;5;241m.[39mcount()             [38;5;66;03m# first operation (not cached)[39;00m
[1;32m      7[0m t1 [38;5;241m=[39m perf_counter()
[0;32m----> 9[0m df_enriched[38;5;241m.[39mcache()
[1;32m     10[0m df_enriched[38;5;241m.[39mcount()   [38;5;66;03m# materialize cache[39;00m
[1;32m     12[0m t2 [38;5;241m=[39m perf_counter()

File [0;32m/databricks/python/lib/python3.12/site-packages/pyspark/sql/connect/dataframe.py:2093[0m, in [0;36mDataFrame.cache[0;34m(self)[0m
[1;32m   2092[0m [38;5;28;01mdef[39;00m [38;5;21mcache[39m([38;5;28mself[39m) [38;5;241m-[39m[38;5;241m>[39m ParentDataFrame:
[0;32m-> 2093[0m     [38;5;28;01mreturn[39;00m [38;5;28mself[39m[38;5;241m.[39mpersist()

File [0;32m/databri

 This notebook currently runs on a Serverless SQL Warehouse, which does not support DataFrame/table caching APIs (df.cache()/persist(), CLEAR CACHE).
 So the canonical .cache() demo is skipped.

## Actions vs Transformations

In [0]:
# Transformations are lazy: building a logical plan only
lazy_df = df_enriched.select("PU_Borough", "trip_distance")
lazy_df.explain("formatted") # no execution yet

== Physical Plan ==
AdaptiveSparkPlan (14)
+- == Initial Plan ==
   ColumnarToRow (13)
   +- PhotonResultStage (12)
      +- PhotonProject (11)
         +- PhotonBroadcastHashJoin LeftOuter (10)
            :- PhotonProject (2)
            :  +- PhotonScan parquet  (1)
            +- PhotonShuffleExchangeSource (9)
               +- PhotonShuffleMapStage (8)
                  +- PhotonShuffleExchangeSink (7)
                     +- PhotonProject (6)
                        +- PhotonFilter (5)
                           +- PhotonRowToColumnar (4)
                              +- Scan csv  (3)


(1) PhotonScan parquet 
Output [4]: [passenger_count#11041, trip_distance#11042, PULocationID#11045L, fare_amount#11048]
DictionaryFilters: [((passenger_count#11041 >= 1.0) AND (passenger_count#11041 <= 6.0)), (trip_distance#11042 > 0.0), (fare_amount#11048 > 0.0)]
Location: InMemoryFileIndex [dbfs:/Volumes/workspace/default/nyc_taxi/2019]
ReadSchema: struct<passenger_count:double,trip_distance:d

In [0]:
# Actions are eager: trigger execution
print("Action count:", lazy_df.count())

Action count: 81750701


In [0]:
# Another action, triggers again
lazy_df.limit(5).show()

+----------+-------------+
|PU_Borough|trip_distance|
+----------+-------------+
|    Queens|          3.7|
| Manhattan|         14.1|
|    Queens|          9.6|
| Manhattan|          0.8|
| Manhattan|          1.2|
+----------+-------------+



## MLlib: Linear Regression for predicting tip_amount

In [0]:
# === Baseline: LR on numeric features (pipeline + train/test metrics) ===
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

ml_df = (df_enriched
         .select("tip_amount","trip_distance","passenger_count","pickup_hour","is_rush_hour")
         .na.fill(0))

train_df, test_df = ml_df.randomSplit([0.8, 0.2], seed=42)

assembler = VectorAssembler(
    inputCols=["trip_distance","passenger_count","pickup_hour","is_rush_hour"],
    outputCol="features"
)
lr = LinearRegression(labelCol="tip_amount", featuresCol="features",
                      maxIter=20, regParam=0.1, elasticNetParam=0.0)

pipe = Pipeline(stages=[assembler, lr])
model = pipe.fit(train_df)

def eval_reg(split_name, pred_df, label="tip_amount", predictionCol="prediction"):
    rmse = RegressionEvaluator(labelCol=label, predictionCol=predictionCol, metricName="rmse").evaluate(pred_df)
    r2   = RegressionEvaluator(labelCol=label, predictionCol=predictionCol, metricName="r2").evaluate(pred_df)
    print(f"{split_name.upper():<5}  RMSE={rmse:.4f}  R2={r2:.4f}")

pred_train = model.transform(train_df)
pred_test  = model.transform(test_df)
eval_reg("train", pred_train)
eval_reg("test",  pred_test)

lr_stage = model.stages[-1]
print("Coefficients:", lr_stage.coefficients.toArray().tolist(), "Intercept:", lr_stage.intercept)

pred_test.select("tip_amount","trip_distance","passenger_count","pickup_hour","is_rush_hour","prediction").show(5, truncate=False)


TRAIN  RMSE=2.2883  R2=0.2948
TEST   RMSE=2.9111  R2=-0.1032
Coefficients: [0.3643443692077242, -0.0187768575369093, 0.01146704126462563, 0.07652011843449306] Intercept: 0.9520979841075484
+----------+-------------+---------------+-----------+------------+------------------+
|tip_amount|trip_distance|passenger_count|pickup_hour|is_rush_hour|prediction        |
+----------+-------------+---------------+-----------+------------+------------------+
|0.0       |0.01         |1.0            |0          |false       |0.9369645702627164|
|0.0       |0.01         |1.0            |0          |false       |0.9369645702627164|
|0.0       |0.01         |1.0            |0          |false       |0.9369645702627164|
|0.0       |0.01         |1.0            |0          |false       |0.9369645702627164|
|0.0       |0.01         |1.0            |0          |false       |0.9369645702627164|
+----------+-------------+---------------+-----------+------------+------------------+
only showing top 5 rows
