In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, expr, sum as spark_sum, avg, max, min
from pyspark.sql.types import *

## Data Cleaning


In [2]:
spark = SparkSession.builder \
    .appName("NYC Taxi Data") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

In [4]:
df = spark.read.parquet("hdfs://namenode:9000/data/merged_data.parquet")

In [4]:
df.printSchema()

root
 |-- vendorid: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- ratecodeid: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pulocationid: long (nullable = true)
 |-- dolocationid: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



We notice that `passenger_count`, `ratecodeid` and `store_and_fwd_flag` are not numeric.

In [5]:
print("Dataset shape:")
print(f"Rows: {df.count()}")
print(f"Columns: {len(df.columns)}")

Dataset shape:
Rows: 77966324
Columns: 19


In [6]:
df.select("fare_amount").describe().show()

+-------+------------------+
|summary|       fare_amount|
+-------+------------------+
|  count|          77966324|
|   mean|14.863480149198923|
| stddev|15924.270135537783|
|    min|     -1.33391414E8|
|    max|         401092.32|
+-------+------------------+



The minimum fare is -133 million and max is 401k, with a huge standard deviation.


In [7]:
from pyspark.sql import functions as F

issue_counts = df.select(
    F.sum(F.when(F.col("fare_amount") < 0, 1).otherwise(0)).alias("negative_fares"),
    F.sum(F.when(F.col("fare_amount") == 0, 1).otherwise(0)).alias("zero_fares"),
    F.sum(F.when(F.col("fare_amount") > 1000, 1).otherwise(0)).alias("fares_over_1000"),
    F.sum(F.when(F.col("fare_amount") > 100, 1).otherwise(0)).alias("fares_over_100"),
    F.sum(F.when(F.col("fare_amount").isNull(), 1).otherwise(0)).alias("null_fares")
).collect()[0]

total_rows = df.count()

print(f"Total rows: {total_rows:,}")
print(f"Negative fares: {issue_counts['negative_fares']:,} ({issue_counts['negative_fares']/total_rows*100:.2f}%)")
print(f"Zero fares: {issue_counts['zero_fares']:,} ({issue_counts['zero_fares']/total_rows*100:.2f}%)")
print(f"Fares over $100: {issue_counts['fares_over_100']:,} ({issue_counts['fares_over_100']/total_rows*100:.2f}%)")
print(f"Fares over $1000: {issue_counts['fares_over_1000']:,} ({issue_counts['fares_over_1000']/total_rows*100:.2f}%)")
print(f"Null fares: {issue_counts['null_fares']:,} ({issue_counts['null_fares']/total_rows*100:.2f}%)")

Total rows: 77,966,324
Negative fares: 634,550 (0.81%)
Zero fares: 30,127 (0.04%)
Fares over $100: 179,373 (0.23%)
Fares over $1000: 108 (0.00%)
Null fares: 0 (0.00%)


In [9]:
print("5 most negative fares:")
df.select("fare_amount", "trip_distance", "total_amount", "payment_type") \
  .filter(F.col("fare_amount") < 0) \
  .orderBy("fare_amount") \
  .show(5)

5 most negative fares:
+--------------+-------------+------------+------------+
|   fare_amount|trip_distance|total_amount|payment_type|
+--------------+-------------+------------+------------+
| -1.33391414E8|          0.0|      -47.17|           0|
|-4.446376892E7|          0.0|       21.08|           0|
|       -2564.0|        68.07|     -2567.8|           3|
|       -1633.3|        29.03|     -1635.8|           2|
|       -1311.5|        57.53|     -1314.8|           2|
+--------------+-------------+------------+------------+
only showing top 5 rows



In [10]:
print("5 highest fares:")
df.select("fare_amount", "trip_distance", "total_amount", "payment_type") \
  .orderBy(F.desc("fare_amount")) \
  .show(5)

5 highest fares:
+-----------+-------------+------------+------------+
|fare_amount|trip_distance|total_amount|payment_type|
+-----------+-------------+------------+------------+
|  401092.32|          3.3|   401095.62|           4|
|  395844.94|          1.2|   395848.24|           3|
|  386983.63|          1.5|   386987.63|           2|
|  187502.96|         21.3|    187513.9|           2|
|  187444.96|          3.5|   187448.26|           4|
+-----------+-------------+------------+------------+
only showing top 5 rows



In [11]:
print("5 zero fare examples:")
df.select("fare_amount", "trip_distance", "total_amount", "payment_type") \
  .filter(F.col("fare_amount") == 0) \
  .show(5)

5 zero fare examples:
+-----------+-------------+------------+------------+
|fare_amount|trip_distance|total_amount|payment_type|
+-----------+-------------+------------+------------+
|        0.0|         12.0|        1.55|           4|
|        0.0|          2.8|        35.3|           1|
|        0.0|          0.0|         0.0|           1|
|        0.0|          0.0|         0.0|           3|
|        0.0|          6.1|       12.05|           2|
+-----------+-------------+------------+------------+
only showing top 5 rows



The extreme values are likely data entry errors or system glitches.

In [6]:
from pyspark.sql import functions as F

min_reasonable_fare = 0.50  
max_reasonable_fare = 500.0

original_count = df.count()

df_cleaned = df.filter(
    (F.col("fare_amount") >= min_reasonable_fare) & 
    (F.col("fare_amount") <= max_reasonable_fare)
)

cleaned_count = df_cleaned.count()
removed_count = original_count - cleaned_count

print(f"Records after cleaning: {cleaned_count:,}")
print(f"Records removed: {removed_count:,} ({removed_count/original_count*100:.2f}%)")

print(f"\nNew fare_amount statistics:")
df_cleaned.select("fare_amount").describe().show()

df = df_cleaned
print("✓ Extreme fare values removed")

Records after cleaning: 77,293,425
Records removed: 672,899 (0.86%)

New fare_amount statistics:
+-------+------------------+
|summary|       fare_amount|
+-------+------------------+
|  count|          77293425|
|   mean|17.409338565861248|
| stddev|16.379684819213573|
|    min|               0.5|
|    max|             500.0|
+-------+------------------+

✓ Extreme fare values removed


In [13]:
print("trip_distance statistics:")
df.select("trip_distance").describe().show()

distance_issues = df.select(
    F.sum(F.when(F.col("trip_distance") < 0, 1).otherwise(0)).alias("negative_distance"),
    F.sum(F.when(F.col("trip_distance") == 0, 1).otherwise(0)).alias("zero_distance"),
    F.sum(F.when(F.col("trip_distance") > 100, 1).otherwise(0)).alias("distance_over_100"),
    F.sum(F.when(F.col("trip_distance") > 500, 1).otherwise(0)).alias("distance_over_500"),
    F.sum(F.when(F.col("trip_distance").isNull(), 1).otherwise(0)).alias("null_distance")
).collect()[0]

total_rows = df.count()
print(f"\ntrip_distance issues:")
print(f"Negative distances: {distance_issues['negative_distance']:,}")
print(f"Zero distances: {distance_issues['zero_distance']:,} ({distance_issues['zero_distance']/total_rows*100:.2f}%)")
print(f"Distances over 100 miles: {distance_issues['distance_over_100']:,}")
print(f"Distances over 500 miles: {distance_issues['distance_over_500']:,}")
print(f"Null distances: {distance_issues['null_distance']:,}")

trip_distance statistics:
+-------+------------------+
|summary|     trip_distance|
+-------+------------------+
|  count|          77293425|
|   mean| 5.041580006449956|
| stddev|458.21388927919924|
|    min|               0.0|
|    max|         389678.46|
+-------+------------------+


trip_distance issues:
Negative distances: 0
Zero distances: 1,252,864 (1.62%)
Distances over 100 miles: 2,549
Distances over 500 miles: 2,017
Null distances: 0


The trip_distance also has extreme outliers! A max of 389,678 miles is impossible for a taxi trip.

In [14]:
print("Top 5 longest distances:")
df.select("trip_distance", "fare_amount", "total_amount", "tpep_pickup_datetime") \
  .orderBy(F.desc("trip_distance")) \
  .show(5)

print("\n5 examples of zero distance trips:")
df.select("trip_distance", "fare_amount", "total_amount", "payment_type") \
  .filter(F.col("trip_distance") == 0) \
  .show(5)

print("\nSample of trips over 100 miles:")
df.select("trip_distance", "fare_amount", "total_amount") \
  .filter(F.col("trip_distance") > 100) \
  .orderBy(F.desc("trip_distance")) \
  .show(5)

Top 5 longest distances:
+-------------+-----------+------------+--------------------+
|trip_distance|fare_amount|total_amount|tpep_pickup_datetime|
+-------------+-----------+------------+--------------------+
|    389678.46|      12.11|       18.83| 2022-10-28 05:19:00|
|    357192.65|      12.46|       20.13| 2022-05-15 18:45:00|
|    348798.53|       20.6|        27.9| 2022-02-15 18:24:00|
|    345729.44|      13.27|       17.27| 2023-08-15 04:02:00|
|    344408.48|      63.12|       73.97| 2022-05-19 17:00:00|
+-------------+-----------+------------+--------------------+
only showing top 5 rows


5 examples of zero distance trips:
+-------------+-----------+------------+------------+
|trip_distance|fare_amount|total_amount|payment_type|
+-------------+-----------+------------+------------+
|          0.0|        2.5|         6.3|           4|
|          0.0|        2.5|         6.3|           2|
|          0.0|        2.5|         6.3|           2|
|          0.0|        2.5|     

These extreme distances are clearly data errors - 389,678 miles with only a \$12 fare is impossible! Zero distance trips might be legitimate (waiting time, cancellations), but the extreme distances need to be removed. Let's clean `trip_distance`:


In [7]:
original_count = df.count()
print(f"Records before distance cleaning: {original_count:,}")

max_reasonable_distance = 100.0

df_cleaned = df.filter(F.col("trip_distance") <= max_reasonable_distance)

cleaned_count = df_cleaned.count()
removed_count = original_count - cleaned_count

print(f"Records after distance cleaning: {cleaned_count:,}")
print(f"Records removed: {removed_count:,} ({removed_count/original_count*100:.3f}%)")

print(f"\nNew trip_distance statistics:")
df_cleaned.select("trip_distance").describe().show()

df = df_cleaned
print("✓ Extreme trip distances removed")

Records before distance cleaning: 77,293,425
Records after distance cleaning: 77,290,876
Records removed: 2,549 (0.003%)

New trip_distance statistics:
+-------+-----------------+
|summary|    trip_distance|
+-------+-----------------+
|  count|         77290876|
|   mean|3.484159909637998|
| stddev|4.498495606382115|
|    min|              0.0|
|    max|            99.92|
+-------+-----------------+

✓ Extreme trip distances removed


In [16]:
df.select("passenger_count").distinct().show(truncate=False)
df.select("ratecodeid").distinct().show(truncate=False)
df.select("store_and_fwd_flag").distinct().show(truncate=False)

+---------------+
|passenger_count|
+---------------+
|7              |
|3              |
|8              |
|0              |
|5              |
|6              |
|9              |
|1              |
|4              |
|2              |
|NULL           |
+---------------+

+----------+
|ratecodeid|
+----------+
|3         |
|99        |
|5         |
|6         |
|1         |
|4         |
|2         |
|NULL      |
+----------+

+------------------+
|store_and_fwd_flag|
+------------------+
|Y                 |
|N                 |
|NULL              |
+------------------+



In [8]:
df = df.withColumn("passenger_count", col("passenger_count").cast("integer"))
df = df.filter(
    (df.passenger_count.isNotNull()) &
    (df.passenger_count.between(1, 6))
)

In [9]:
df = df.withColumn("RatecodeID", col("RatecodeID").cast("integer"))

df = df.filter(
    df.RatecodeID.isNull() | df.RatecodeID.between(1, 6)
)

In [20]:
df.filter(col("store_and_fwd_flag").isNull()).count()

0

In [22]:
df.select("payment_type").distinct().show()

+------------+
|payment_type|
+------------+
|           5|
|           1|
|           3|
|           2|
|           4|
+------------+



In [23]:
df.select("VendorID").distinct().show()

+--------+
|VendorID|
+--------+
|       1|
|       2|
+--------+



In [24]:
df.select("payment_type").distinct().show()

+------------+
|payment_type|
+------------+
|           5|
|           1|
|           3|
|           2|
|           4|
+------------+



In [25]:
df.select("store_and_fwd_flag").distinct().show()

+------------------+
|store_and_fwd_flag|
+------------------+
|                 Y|
|                 N|
+------------------+



In [10]:
df = df.withColumn(
    "store_and_fwd_flag",
    when(col("store_and_fwd_flag") == "Y", 1)
    .when(col("store_and_fwd_flag") == "N", 0)
    .otherwise(None)
)

In [26]:
df.select(
  F.sum(F.when(F.col("PULocationID").isNull(), 1).otherwise(0)).alias("null_pu"),
  F.sum(F.when(F.col("DOLocationID").isNull(), 1).otherwise(0)).alias("null_do")
).show()


+-------+-------+
|null_pu|null_do|
+-------+-------+
|      0|      0|
+-------+-------+



In [29]:
df.select(
    "extra", "mta_tax", "tip_amount", "tolls_amount",
    "improvement_surcharge", "congestion_surcharge",
    "airport_fee"
).describe().show()

+-------+------------------+-------------------+------------------+-----------------+---------------------+--------------------+-------------------+
|summary|             extra|            mta_tax|        tip_amount|     tolls_amount|improvement_surcharge|congestion_surcharge|        airport_fee|
+-------+------------------+-------------------+------------------+-----------------+---------------------+--------------------+-------------------+
|  count|          72933592|           72933592|          72933592|         72933592|             72933592|            72933592|           72933592|
|   mean|1.3122974545117687|0.49531238856299076|3.1734939658555334|0.564894463305165|   0.6539529905657337|  2.3210445407652487|0.12225553692734617|
| stddev| 1.590529466535159| 0.0502140356667076|3.7336321153269836|2.100107551285682|   0.3500765907040568|  0.6444855616210668| 0.4084229381395421|
|    min|            -22.18|                0.0|               0.0|              0.0|                  0.0

In [11]:
cols_to_clean = ["extra", "mta_tax", "tip_amount", "tolls_amount", "improvement_surcharge", "congestion_surcharge", "airport_fee"]

for c in cols_to_clean:
    df = df.withColumn(c, when(col(c) < 0, 0).otherwise(col(c)))

In [37]:
null_counts = df.select([F.sum(col(c).isNull().cast("int")).alias(c + "_nulls") for c in cols_to_clean])
null_counts.show()

+-----------+-------------+----------------+------------------+---------------------------+--------------------------+-----------------+
|extra_nulls|mta_tax_nulls|tip_amount_nulls|tolls_amount_nulls|improvement_surcharge_nulls|congestion_surcharge_nulls|airport_fee_nulls|
+-----------+-------------+----------------+------------------+---------------------------+--------------------------+-----------------+
|          0|            0|               0|                 0|                          0|                         0|                0|
+-----------+-------------+----------------+------------------+---------------------------+--------------------------+-----------------+



In [12]:
df= df_cleaned
datetime_nulls = df.select(
    F.sum(F.when(F.col("tpep_pickup_datetime").isNull(), 1).otherwise(0)).alias("null_pickup"),
    F.sum(F.when(F.col("tpep_dropoff_datetime").isNull(), 1).otherwise(0)).alias("null_dropoff")
).collect()[0]

print(f"Null pickup datetimes: {datetime_nulls['null_pickup']:,}")
print(f"Null dropoff datetimes: {datetime_nulls['null_dropoff']:,}")

date_ranges = df.select(
    F.min("tpep_pickup_datetime").alias("earliest_pickup"),
    F.max("tpep_pickup_datetime").alias("latest_pickup"),
    F.min("tpep_dropoff_datetime").alias("earliest_dropoff"),
    F.max("tpep_dropoff_datetime").alias("latest_dropoff")
).collect()[0]

print(f"\nDate ranges:")
print(f"Pickup dates: {date_ranges['earliest_pickup']} to {date_ranges['latest_pickup']}")
print(f"Dropoff dates: {date_ranges['earliest_dropoff']} to {date_ranges['latest_dropoff']}")
df_with_duration = df.withColumn(
    "trip_duration_seconds", 
    F.unix_timestamp("tpep_dropoff_datetime") - F.unix_timestamp("tpep_pickup_datetime")
)

duration_issues = df_with_duration.select(
    F.sum(F.when(F.col("trip_duration_seconds") < 0, 1).otherwise(0)).alias("negative_duration"),
    F.sum(F.when(F.col("trip_duration_seconds") == 0, 1).otherwise(0)).alias("zero_duration"),
    F.sum(F.when(F.col("trip_duration_seconds") > 86400, 1).otherwise(0)).alias("over_24_hours")
).collect()[0]

total_rows = df.count()
print(f"\nTrip duration issues:")
print(f"Negative duration (dropoff before pickup): {duration_issues['negative_duration']:,}")
print(f"Zero duration: {duration_issues['zero_duration']:,}")
print(f"Over 24 hours: {duration_issues['over_24_hours']:,} ({duration_issues['over_24_hours']/total_rows*100:.3f}%)")

Null pickup datetimes: 0
Null dropoff datetimes: 0

Date ranges:
Pickup dates: 2001-01-01 00:03:14 to 2024-01-03 19:42:57
Dropoff dates: 1970-01-20 10:16:32 to 2024-01-03 20:15:55

Trip duration issues:
Negative duration (dropoff before pickup): 16,070
Zero duration: 29,090
Over 24 hours: 535 (0.001%)


In [13]:
df_cleaned = df_with_duration.filter(
    (F.col("trip_duration_seconds") > 0) & 
    (F.col("trip_duration_seconds") <= 86400)  # 24 hours
)


In [None]:
df_cleaned.write.parquet("hdfs://namenode:9000/data/cleaned_data.parquet")