In [None]:
## !pip install pyspark

In [1]:
# Section 1 — Setup & imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, date_trunc, hour, dayofweek, month, year, when, unix_timestamp,
    count, avg, corr, first
)
from pyspark.sql.types import TimestampType

# Create Spark session (adjust memory/config as needed)
spark = SparkSession.builder \
    .appName("NYC Taxi Weather Analysis") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

print("Spark session created:", spark)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/03 18:45:30 WARN Utils: Your hostname, Lilys-MacBook-Pro.local, resolves to a loopback address: 127.0.0.1; using 192.168.1.140 instead (on interface en0)
25/12/03 18:45:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/03 18:45:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/12/03 18:45:31 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Spark session created: <pyspark.sql.session.SparkSession object at 0x10cf32120>


In [2]:
# Section 2 — File paths (UPDATE if different)
weather_path = "./Weather_Data/weather_2019_2020.csv"
taxi_glob_2019 = r"./Taxi_Data/yellow_tripdata_2019-*.csv"
taxi_glob_2020 = r"./Taxi_Data/yellow_tripdata_2020-*.csv"

print("Weather:", weather_path)
print("Taxi 2019 pattern:", taxi_glob_2019)
print("Taxi 2020 pattern:", taxi_glob_2020)


Weather: ./Weather_Data/weather_2019_2020.csv
Taxi 2019 pattern: ./Taxi_Data/yellow_tripdata_2019-*.csv
Taxi 2020 pattern: ./Taxi_Data/yellow_tripdata_2020-*.csv


In [3]:
# Section 3 — Load data into Spark DataFrames
weather_df = spark.read.option("header", True).option("inferSchema", True).csv(weather_path)

print("Weather schema:")
weather_df.printSchema()
weather_df.show(3, truncate=False)

taxi_df_2019 = spark.read.option("header", True).option("inferSchema", True).csv(taxi_glob_2019)
taxi_df_2020 = spark.read.option("header", True).option("inferSchema", True).csv(taxi_glob_2020)
taxi_df = taxi_df_2019.unionByName(taxi_df_2020)

print("Taxi schema:")
taxi_df.printSchema()
taxi_df.select('tpep_pickup_datetime','tpep_dropoff_datetime','PULocationID','DOLocationID','fare_amount').show(5, truncate=False)
print('Total taxi rows (approx):', taxi_df.count())


Weather schema:
root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- time_only: timestamp (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- temperature_2m (°C): double (nullable = true)
 |-- precipitation (mm): double (nullable = true)
 |-- rain (mm): double (nullable = true)
 |-- cloudcover (%): double (nullable = true)
 |-- cloudcover_low (%): double (nullable = true)
 |-- cloudcover_mid (%): double (nullable = true)
 |-- cloudcover_high (%): double (nullable = true)
 |-- windspeed_10m (km/h): double (nullable = true)
 |-- winddirection_10m (°): double (nullable = true)

+----+-----+---+-------------------+-------------------+-------------------+------------------+---------+--------------+------------------+------------------+-------------------+--------------------+---------------------+
|year|month|day|time_only          |time               |temperature_2m (°C)|precipitation (mm)|rain (mm)|cloudcover (%

25/12/03 18:46:24 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: ./Taxi_Data/yellow_tripdata_2019-*.csv.
java.io.FileNotFoundException: File Taxi_Data/yellow_tripdata_2019-*.csv does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:917)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1238)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:907)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:56)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:381)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
	at org.apache.spark

Taxi schema:
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

+--------------------+---------------------+------------+------------+-----------+
|tpep_pickup_datetime|tpep_dropoff_datetime|PULocationID|DOLocationID|f



Total taxi rows (approx): 101246797


                                                                                

In [4]:
# Section 4 — Normalize/rename weather columns to safe names (no spaces/special chars)
# Update these renames if your CSV uses slightly different headers.
rename_map = {
    'time': 'weather_time',
    'temperature_2m (°C)': 'temperature_c',
    'precipitation (mm)': 'precip_mm',
    'rain (mm)': 'rain_mm',
    'cloudcover (%)': 'cloudcover',
    'cloudcover_low (%)': 'cloudcover_low',
    'cloudcover_mid (%)': 'cloudcover_mid',
    'cloudcover_high (%)': 'cloudcover_high',
    'windspeed_10m (km/h)': 'windspeed_kmh',
    'winddirection_10m': 'winddir'
}

for old, new in rename_map.items():
    if old in weather_df.columns:
        weather_df = weather_df.withColumnRenamed(old, new)

# Cast weather_time to timestamp
weather_df = weather_df.withColumn('weather_time', col('weather_time').cast(TimestampType()))

print("Renamed weather cols:")
print(weather_df.columns)


Renamed weather cols:
['year', 'month', 'day', 'time_only', 'weather_time', 'temperature_c', 'precip_mm', 'rain_mm', 'cloudcover', 'cloudcover_low', 'cloudcover_mid', 'cloudcover_high', 'windspeed_kmh', 'winddirection_10m (°)']


In [5]:
# Section 5 — Prepare taxi timestamps and create pickup_hour
taxi_df = taxi_df.withColumn("tpep_pickup_datetime", col("tpep_pickup_datetime").cast(TimestampType())) \
                 .withColumn("tpep_dropoff_datetime", col("tpep_dropoff_datetime").cast(TimestampType()))

# Create pickup_hour (rounded/truncated to hour)
taxi_df = taxi_df.withColumn("pickup_hour", date_trunc("hour", col("tpep_pickup_datetime")))

# Quick check
taxi_df.select('tpep_pickup_datetime','pickup_hour').show(5, truncate=False)

+--------------------+-------------------+
|tpep_pickup_datetime|pickup_hour        |
+--------------------+-------------------+
|2019-01-01 00:46:40 |2019-01-01 00:00:00|
|2019-01-01 00:59:47 |2019-01-01 00:00:00|
|2018-12-21 13:48:30 |2018-12-21 13:00:00|
|2018-11-28 15:52:25 |2018-11-28 15:00:00|
|2018-11-28 15:56:57 |2018-11-28 15:00:00|
+--------------------+-------------------+
only showing top 5 rows


In [6]:
# Section 6 — Join taxi with weather (left join to keep all taxi records)
combined_df = taxi_df.join(weather_df, taxi_df.pickup_hour == weather_df.weather_time, how='left') \
                     .drop('weather_time')

print("Combined schema:")
combined_df.printSchema()

combined_df.select('pickup_hour','PULocationID','DOLocationID','fare_amount','temperature_c','rain_mm').show(5, truncate=False)

Combined schema:
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- pickup_hour: timestamp (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = 

In [7]:
# Section 7 — Feature engineering
combined_df = combined_df.withColumn('hour', hour(col('pickup_hour'))) \
    .withColumn('day_of_week', dayofweek(col('pickup_hour'))) \
    .withColumn('month', month(col('pickup_hour'))) \
    .withColumn('year', year(col('pickup_hour'))) \
    .withColumn('is_rainy', when(col('rain_mm') > 0, 1).otherwise(0)) \
    .withColumn('is_cold', when(col('temperature_c') < 5, 1).otherwise(0))

# Trip duration in minutes
combined_df = combined_df.withColumn('duration_min', (unix_timestamp(col('tpep_dropoff_datetime')) - unix_timestamp(col('tpep_pickup_datetime'))) / 60)
combined_df.select('pickup_hour','hour','day_of_week','PULocationID','is_rainy','is_cold','duration_min').show(5, truncate=False)

+-------------------+----+-----------+------------+--------+-------+------------------+
|pickup_hour        |hour|day_of_week|PULocationID|is_rainy|is_cold|duration_min      |
+-------------------+----+-----------+------------+--------+-------+------------------+
|2019-01-01 00:00:00|0   |3          |151         |1       |0      |6.666666666666667 |
|2019-01-01 00:00:00|0   |3          |239         |1       |0      |19.2              |
|2018-12-21 13:00:00|13  |6          |236         |0       |0      |4.166666666666667 |
|2018-11-28 15:00:00|15  |4          |193         |0       |0      |3.3333333333333335|
|2018-11-28 15:00:00|15  |4          |193         |0       |0      |1.6               |
+-------------------+----+-----------+------------+--------+-------+------------------+
only showing top 5 rows


In [8]:
# Section 8 — Aggregations: hourly overall and hourly by zone

# City-wide hourly demand
city_hourly = combined_df.groupBy('pickup_hour','hour') \
    .agg(count('*').alias('trip_count'), avg('fare_amount').alias('avg_fare')) \
    .orderBy('pickup_hour')

city_hourly.show(5)

# Hourly demand by pickup zone
zone_hourly = combined_df.groupBy('pickup_hour','hour','PULocationID') \
    .agg(
        count('*').alias('trip_count'), 
        avg('fare_amount').alias('avg_fare'), 
        avg('duration_min').alias('avg_duration'), 
        avg('temperature_c').alias('avg_temp'), 
        avg('rain_mm').alias('avg_rain')
    ).orderBy('pickup_hour','PULocationID')

zone_hourly.show(5)

                                                                                

+-------------------+----+----------+------------------+
|        pickup_hour|hour|trip_count|          avg_fare|
+-------------------+----+----------+------------------+
|2001-01-01 00:00:00|   0|         2|               8.5|
|2001-02-02 14:00:00|  14|         1|               2.5|
|2002-02-02 01:00:00|   1|        11|13.954545454545455|
|2002-12-31 22:00:00|  22|         1|               0.0|
|2002-12-31 23:00:00|  23|         1|               3.0|
+-------------------+----+----------+------------------+
only showing top 5 rows


[Stage 23:>                                                         (0 + 8) / 9]

+-------------------+----+------------+----------+--------+------------------+--------+--------+
|        pickup_hour|hour|PULocationID|trip_count|avg_fare|      avg_duration|avg_temp|avg_rain|
+-------------------+----+------------+----------+--------+------------------+--------+--------+
|2001-01-01 00:00:00|   0|          48|         1|    13.5|            390.25|    NULL|    NULL|
|2001-01-01 00:00:00|   0|         151|         1|     3.5|              57.9|    NULL|    NULL|
|2001-02-02 14:00:00|  14|         193|         1|     2.5|12.333333333333334|    NULL|    NULL|
|2002-02-02 01:00:00|   1|          48|         4|  10.625| 293.9791666666667|    NULL|    NULL|
|2002-02-02 01:00:00|   1|          79|         1|    15.0|1374.9666666666667|    NULL|    NULL|
+-------------------+----+------------+----------+--------+------------------+--------+--------+
only showing top 5 rows


                                                                                

In [None]:
#Section 9 : save cleaned dataframes to parquet
combined_df.write.option("compression", "snappy").mode("overwrite").parquet("Cleaned_Parquet/combined/")
zone_hourly.write.option("compression", "snappy").mode("overwrite").parquet("Cleaned_Parquet/zone_hourly/")
city_hourly.write.option("compression", "snappy").mode("overwrite").parquet("Cleaned_Parquet/city_hourly/")
taxi_df.write.option("compression", "snappy").mode("overwrite").parquet("Cleaned_Parquet/taxi_clean/")
weather_df.write.option("compression", "snappy").mode("overwrite").parquet("Cleaned_Parquet/weather_clean/")





25/12/03 18:56:00 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/12/03 18:56:03 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/12/03 18:56:05 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/12/03 18:56:06 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/12/03 18:56:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/12/03 18:56:09 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/12/03 18:56:11 WARN MemoryManager: Total allocation exceeds 95.00% 