# Problem 1: Daily Summaries of Key Metrics for NYC TLC Data

This notebook downloads monthly NYC TLC yellow-taxi trip data for 2021 (the target range is January to June; the run recorded here fetched January to March), combines the files into a single DataFrame, and calculates daily summaries.

## Requirements
1. Derive `dt_year`, `dt_month`, `dt_day` from `tpep_pickup_datetime`
2. Filter for year 2021
3. Calculate daily summaries:
   - Number of trips
   - Average trip_distance
   - Max mta_tax
   - 95th percentile of fare_amount
   - Min tip_amount
   - Average passenger_count (rounded up)
4. Sort by dt_year, dt_month, dt_day in descending order
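To pin down what each metric means before writing any Spark code, here is a small pure-Python sketch of the same requirements applied to a few hand-made trip records. The `trips` data and the `daily_summaries` function are hypothetical. The 95th percentile uses the nearest-rank definition, which is an assumption: Spark's `percentile_approx` is approximate and may differ slightly on large data.

```python
import math
from collections import defaultdict
from datetime import datetime

# Hypothetical trips: (pickup timestamp, trip_distance, mta_tax,
# fare_amount, tip_amount, passenger_count)
trips = [
    ("2021-03-01 00:22:02", 0.0, 0.5, 3.0, 0.0, 1.0),
    ("2021-03-01 00:24:48", 2.1, 0.5, 9.5, 1.5, 2.0),
    ("2021-03-02 08:10:00", 5.3, 0.5, 18.0, 3.0, 1.0),
    ("2020-12-31 23:59:00", 1.0, 0.5, 6.0, 0.0, 1.0),  # outside 2021, filtered out
]


def nearest_rank(values, q):
    """Smallest value with at least a fraction q of the data at or below it."""
    ordered = sorted(values)
    rank = max(1, math.ceil(q * len(ordered)))
    return ordered[rank - 1]


def daily_summaries(rows):
    groups = defaultdict(list)
    for ts, dist, tax, fare, tip, pax in rows:
        pickup = datetime.fromisoformat(ts)
        if pickup.year != 2021:  # Step 2: keep 2021 only
            continue
        # Step 1: (dt_year, dt_month, dt_day) grouping key
        groups[(pickup.year, pickup.month, pickup.day)].append((dist, tax, fare, tip, pax))

    result = []
    for key, items in groups.items():
        dists, taxes, fares, tips, paxes = zip(*items)
        result.append({
            "date": key,
            "num_trips": len(items),
            "mean_trip_distance": sum(dists) / len(dists),
            "max_mta_tax": max(taxes),
            "q95_fare_amount": nearest_rank(fares, 0.95),
            "min_tip_amount": min(tips),
            "mean_passenger_count": math.ceil(sum(paxes) / len(paxes)),  # rounded up
        })
    # Step 4: newest date first
    return sorted(result, key=lambda r: r["date"], reverse=True)


for row in daily_summaries(trips):
    print(row)
```

The December 31, 2020 record is dropped by the year filter, so only March 1 and March 2 appear, with March 2 first because of the descending sort.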

## Setup: Import Libraries

In [1]:
import os
import subprocess
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    year, month, dayofmonth,
    avg, count, max as spark_max, min as spark_min,
    expr, ceil, percentile_approx
)
import pandas as pd

## Configure Logging

In [2]:
# Configure logging with basicConfig
logging.basicConfig(
    level=logging.INFO,  # Set the log level to INFO
    # Define log message format
    format="%(asctime)s,p%(process)s,{%(filename)s:%(lineno)d},%(levelname)s,%(message)s",
)

logger = logging.getLogger(__name__)
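Because this format is comma-separated, the log lines can be parsed back into fields later. Two details matter: the default `asctime` (`YYYY-MM-DD HH:MM:SS,mmm`) already contains a comma, and messages can contain commas too, so the split has to be capped. A minimal sketch (the `parse_log_line` helper is hypothetical, not part of the notebook), tested on one of the log lines this notebook prints:

```python
import re


def parse_log_line(line):
    """Split a line written with the notebook's format string
    "%(asctime)s,p%(process)s,{%(filename)s:%(lineno)d},%(levelname)s,%(message)s"
    into named fields. maxsplit=5 keeps commas inside the message intact."""
    date_time, millis, pid, location, level, message = line.split(",", 5)
    match = re.fullmatch(r"\{(.+):(\d+)\}", location)
    if match is None:
        raise ValueError(f"unexpected location field: {location!r}")
    return {
        "asctime": f"{date_time},{millis}",
        "pid": int(pid.lstrip("p")),
        "filename": match.group(1),
        "lineno": int(match.group(2)),
        "level": level,
        "message": message,
    }


sample = ("2025-10-06 15:00:43,682,p17536,{3534718927.py:55},INFO,"
          "Download complete: 3 files successfully downloaded, total size: 70.1 MB")
record = parse_log_line(sample)
print(record["level"], record["lineno"], record["message"])
```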

## Create Spark Session

This cell creates a Spark session optimized for Problem 1 with:
- 4GB driver memory
- Local execution using all available cores
- Adaptive query execution enabled
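- Arrow optimization for Pandas conversion (requires the `pyarrow` package; without it Spark falls back to the slower non-Arrow path, as the warning in Step 6 shows)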

In [3]:
spark = (
    SparkSession.builder
    .appName("Problem1_DailySummaries")

    # Memory Configuration
    .config("spark.driver.memory", "4g")
    .config("spark.driver.maxResultSize", "2g")

    # Performance settings for local execution
    .config("spark.master", "local[*]")  # Use all available cores
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")

    # Serialization
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    # Arrow optimization for Pandas conversion
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")

    .getOrCreate()
)

logger.info("Spark session created successfully for Problem 1")
print("✅ Spark session created successfully!")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/06 15:00:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2025-10-06 15:00:28,638,p17536,{3812127214.py:23},INFO,Spark session created successfully for Problem 1


✅ Spark session created successfully!


## Download NYC TLC Data

This cell downloads the NYC TLC monthly files for 2021 listed in `months_to_download` from S3, using the AWS CLI (`aws s3 cp`).
Files are saved to the `data/` directory and are skipped if already downloaded.
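The S3 key and the local file name follow a fixed per-month pattern, taken from the download function in the next cell. A minimal sketch of that pattern as a standalone helper (`tlc_paths` is hypothetical, and it assumes the bucket layout used in the download cell holds for every month), which also makes it easy to request all six months:

```python
def tlc_paths(year, months, local_dir="data"):
    """Return (s3_path, local_path) pairs for the given months of yellow-taxi data."""
    base = f"s3://bigdatateaching/nyc_tlc/trip_data/yyyy={year}"
    pairs = []
    for month in months:
        name = f"yellow_tripdata_{year}-{month:02d}.parquet"  # zero-padded month
        pairs.append((f"{base}/{name}", f"{local_dir}/{name}"))
    return pairs


# January to June 2021, the full range stated at the top of the notebook
for s3_path, local_path in tlc_paths(2021, range(1, 7)):
    print(s3_path, "->", local_path)
```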

In [4]:
def download_monthly_data(months_to_download):
    """Download multiple months of NYC TLC data from S3."""

    logger.info(f"Starting download of {len(months_to_download)} months of NYC TLC data")
    print(f"\nDownloading {len(months_to_download)} months of NYC TLC data...")
    print("=" * 60)

    downloaded_files = []
    total_size = 0

    # Create data directory if it doesn't exist
    os.makedirs("data", exist_ok=True)

    for month_num in months_to_download:
        month_str = f"{month_num:02d}"
        s3_path = f"s3://bigdatateaching/nyc_tlc/trip_data/yyyy=2021/yellow_tripdata_2021-{month_str}.parquet"
        local_path = f"data/yellow_tripdata_2021-{month_str}.parquet"

        print(f"  Downloading month {month_str}/2021...")
        logger.debug(f"Processing month {month_str}: S3 path={s3_path}, local path={local_path}")

        try:
            # Check if file already exists
            if os.path.exists(local_path):
                file_size = os.path.getsize(local_path)
                logger.info(f"Month {month_str} file already exists locally: {file_size/1024/1024:.1f} MB")
                print(f"    ✅ Already exists: {file_size/1024/1024:.1f} MB")
                downloaded_files.append(local_path)
                total_size += file_size
                continue

            # Download from S3
            logger.info(f"Starting S3 download for month {month_str}")
            result = subprocess.run([
                "aws", "s3", "cp", s3_path, local_path
            ], capture_output=True, text=True, timeout=600)

            if result.returncode == 0:
                file_size = os.path.getsize(local_path)
                logger.info(f"Successfully downloaded month {month_str}: {file_size/1024/1024:.1f} MB")
                print(f"    ✅ Downloaded: {file_size/1024/1024:.1f} MB")
                downloaded_files.append(local_path)
                total_size += file_size
            else:
                logger.error(f"Failed to download month {month_str}: {result.stderr}")
                print(f"    ❌ Failed: {result.stderr}")

        except subprocess.TimeoutExpired:
            logger.error(f"Download timeout for month {month_str}")
            print(f"    ❌ Download timed out")
        except Exception as e:
            logger.error(f"Unexpected error downloading month {month_str}: {str(e)}")
            print(f"    ❌ Error: {str(e)}")

    logger.info(f"Download complete: {len(downloaded_files)} files successfully downloaded, total size: {total_size/1024/1024:.1f} MB")
    print(f"\n✅ Downloaded {len(downloaded_files)} files, total size: {total_size/1024/1024:.1f} MB")
    return downloaded_files


# Months to download (1 = January). This run fetches January to March 2021;
# use list(range(1, 7)) for the full January to June range.
months_to_download = [1, 2, 3]
data_files = download_monthly_data(months_to_download)

if len(data_files) == 0:
    print("❌ No data files available. Cannot proceed with analysis")
else:
    print(f"✅ Ready to process {len(data_files)} data files")

2025-10-06 15:00:39,230,p17536,{3534718927.py:4},INFO,Starting download of 3 months of NYC TLC data
2025-10-06 15:00:39,232,p17536,{3534718927.py:33},INFO,Starting S3 download for month 01



Downloading 3 months of NYC TLC data...
  Downloading month 01/2021...


2025-10-06 15:00:40,848,p17536,{3534718927.py:40},INFO,Successfully downloaded month 01: 20.7 MB
2025-10-06 15:00:40,849,p17536,{3534718927.py:33},INFO,Starting S3 download for month 02


    ✅ Downloaded: 20.7 MB
  Downloading month 02/2021...


2025-10-06 15:00:42,219,p17536,{3534718927.py:40},INFO,Successfully downloaded month 02: 20.8 MB
2025-10-06 15:00:42,220,p17536,{3534718927.py:33},INFO,Starting S3 download for month 03


    ✅ Downloaded: 20.8 MB
  Downloading month 03/2021...


2025-10-06 15:00:43,680,p17536,{3534718927.py:40},INFO,Successfully downloaded month 03: 28.6 MB
2025-10-06 15:00:43,682,p17536,{3534718927.py:55},INFO,Download complete: 3 files successfully downloaded, total size: 70.1 MB


    ✅ Downloaded: 28.6 MB

✅ Downloaded 3 files, total size: 70.1 MB
✅ Ready to process 3 data files
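The download cell skips any file that already exists, so if an interrupted run ever left a partial file at `local_path`, it would be reused silently. One cheap guard, sketched here as an assumption rather than part of the original notebook, is to check the 4-byte `PAR1` marker that the Parquet format places at both the start and the end of a file. The `looks_like_parquet` helper is hypothetical and does not validate the footer metadata itself.

```python
import os
import tempfile

PARQUET_MAGIC = b"PAR1"  # Parquet files begin and end with these 4 bytes


def looks_like_parquet(path):
    """Cheap completeness check: True if the file starts and ends with b"PAR1"."""
    if os.path.getsize(path) < 2 * len(PARQUET_MAGIC):
        return False
    with open(path, "rb") as f:
        head = f.read(len(PARQUET_MAGIC))
        f.seek(-len(PARQUET_MAGIC), os.SEEK_END)
        tail = f.read(len(PARQUET_MAGIC))
    return head == PARQUET_MAGIC and tail == PARQUET_MAGIC


# Demo on two fake files: one with both markers, one cut off before the footer
with tempfile.TemporaryDirectory() as tmp:
    complete = os.path.join(tmp, "complete.parquet")
    truncated = os.path.join(tmp, "truncated.parquet")
    with open(complete, "wb") as f:
        f.write(PARQUET_MAGIC + b"\x00" * 16 + PARQUET_MAGIC)
    with open(truncated, "wb") as f:
        f.write(PARQUET_MAGIC + b"\x00" * 16)
    complete_ok = looks_like_parquet(complete)
    truncated_ok = looks_like_parquet(truncated)

print(complete_ok, truncated_ok)  # True False
```

In the download loop, a file that fails this check could be deleted and fetched again instead of being counted as already downloaded.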


## Load Data into Spark DataFrame

Read all downloaded parquet files into a single Spark DataFrame.

In [5]:
logger.info(f"Reading {len(data_files)} parquet files into single DataFrame")
print("Reading all data files into a single DataFrame...")
nyc_tlc = spark.read.parquet(*data_files)

total_rows = nyc_tlc.count()
logger.info(f"Successfully loaded {total_rows:,} total rows from {len(data_files)} files")
print(f"✅ Loaded {total_rows:,} total rows from {len(data_files)} files")

2025-10-06 15:00:47,331,p17536,{1907117768.py:1},INFO,Reading 3 parquet files into single DataFrame


Reading all data files into a single DataFrame...


2025-10-06 15:00:55,153,p17536,{1907117768.py:6},INFO,Successfully loaded 4,666,630 total rows from 3 files


✅ Loaded 4,666,630 total rows from 3 files


## Explore the Data Schema

Let's examine the structure of the dataset before we begin our analysis.
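Before aggregating, it is worth confirming that every column the summary depends on is present. A plain-Python sketch using the column names printed by `printSchema()` below; in the notebook itself, `schema_columns` would simply be `nyc_tlc.columns`.

```python
# Column names as printed by nyc_tlc.printSchema() below
schema_columns = [
    "VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count",
    "trip_distance", "RatecodeID", "store_and_fwd_flag", "PULocationID",
    "DOLocationID", "payment_type", "fare_amount", "extra", "mta_tax",
    "tip_amount", "tolls_amount", "improvement_surcharge", "total_amount",
    "congestion_surcharge", "airport_fee",
]

# Columns the daily summary actually needs
required = {"tpep_pickup_datetime", "trip_distance", "mta_tax",
            "fare_amount", "tip_amount", "passenger_count"}

missing = sorted(required - set(schema_columns))
print("missing columns:", missing or "none")  # missing columns: none
```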

In [6]:
# Display the schema
nyc_tlc.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [7]:
# Show sample records
print("Sample records from the dataset:")
nyc_tlc.show(5, truncate=False)

Sample records from the dataset:



+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|2       |2021-03-01 00:22:02 |2021-03-01 00:23:22  |1.0            |0.0          |1.0       |N                 |264         |264         |2           |3.0        |0.5  |0.5    |0.0      


## Step 1: Derive Date Columns

Extract year, month, and day from the `tpep_pickup_datetime` column.
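The schema shows `tpep_pickup_datetime` as `timestamp_ntz` (a timestamp without time zone), so `year`, `month`, and `dayofmonth` should read the calendar fields as stored, with no session time-zone shift. The plain-Python equivalent, on a few pickup times from the sample output below:

```python
from datetime import datetime

# Pickup timestamps taken from the sample output below
pickups = ["2021-03-01 00:22:02", "2021-03-01 00:24:48", "2021-03-01 00:07:40"]

parts = []
for ts in pickups:
    dt = datetime.fromisoformat(ts)  # naive datetime, analogous to timestamp_ntz
    parts.append((dt.year, dt.month, dt.day))  # (dt_year, dt_month, dt_day)

print(parts)  # [(2021, 3, 1), (2021, 3, 1), (2021, 3, 1)]
```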

In [8]:
logger.info("Step 1: Deriving date columns from tpep_pickup_datetime")
print("Step 1: Deriving date columns from tpep_pickup_datetime...")

nyc_tlc = (nyc_tlc
    .withColumn("dt_year", year("tpep_pickup_datetime"))
    .withColumn("dt_month", month("tpep_pickup_datetime"))
    .withColumn("dt_day", dayofmonth("tpep_pickup_datetime"))
)

print("✅ Date columns added")
print("\nSample with date columns:")
nyc_tlc.select("tpep_pickup_datetime", "dt_year", "dt_month", "dt_day").show(5)

2025-10-06 15:01:02,786,p17536,{4229086798.py:1},INFO,Step 1: Deriving date columns from tpep_pickup_datetime


Step 1: Deriving date columns from tpep_pickup_datetime...
✅ Date columns added

Sample with date columns:
+--------------------+-------+--------+------+
|tpep_pickup_datetime|dt_year|dt_month|dt_day|
+--------------------+-------+--------+------+
| 2021-03-01 00:22:02|   2021|       3|     1|
| 2021-03-01 00:24:48|   2021|       3|     1|
| 2021-03-01 00:25:17|   2021|       3|     1|
| 2021-03-01 00:07:40|   2021|       3|     1|
| 2021-03-01 00:02:13|   2021|       3|     1|
+--------------------+-------+--------+------+
only showing top 5 rows


## Step 2: Filter for Year 2021

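Keep only trips whose pickup year is 2021. Rows whose pickup timestamp falls outside 2021 (or is null) are dropped: in this run, 4,666,630 rows were loaded and 4,666,578 kept, so 52 rows were removed.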

In [9]:
logger.info("Step 2: Filtering data for year 2021")
print("Step 2: Filtering data for year 2021...")

nyc_tlc_2021 = nyc_tlc.filter(nyc_tlc.dt_year == 2021)
filtered_rows = nyc_tlc_2021.count()

logger.info(f"Filtered dataset to {filtered_rows:,} rows for year 2021")
print(f"✅ Filtered to {filtered_rows:,} rows for year 2021")

2025-10-06 15:01:06,416,p17536,{216681216.py:1},INFO,Step 2: Filtering data for year 2021


Step 2: Filtering data for year 2021...


2025-10-06 15:01:09,597,p17536,{216681216.py:7},INFO,Filtered dataset to 4,666,578 rows for year 2021


✅ Filtered to 4,666,578 rows for year 2021


## Step 3: Calculate Daily Summaries

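Group by date and calculate the required metrics (Spark evaluates this lazily; the aggregation actually runs when an action such as `show()`, `count()`, or `toPandas()` is called):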
- Number of trips
- Average trip distance
- Maximum MTA tax
- 95th percentile of fare amount (approximate, computed with `percentile_approx`)
- Minimum tip amount
- Average passenger count (rounded up)
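Two details of these aggregations are easy to miss. First, `avg`, `max`, `min`, and `percentile_approx` skip nulls, while `count("*")` counts every row. Second, Spark documents `percentile_approx` as returning a value that occurs in the column (the smallest value such that at least the requested fraction of values is less than or equal to it, within an accuracy bound), whereas interpolating definitions can return a value between two observations. A plain-Python comparison on hypothetical fares, using nearest-rank as a stand-in for Spark's definition:

```python
import math
import statistics

# Hypothetical fares for one day
fares = [3.0, 6.5, 9.5, 10.0, 14.0, 22.5, 35.0]


def nearest_rank(values, q):
    """Smallest observed value with at least a fraction q of the data at or below it."""
    ordered = sorted(values)
    return ordered[max(1, math.ceil(q * len(ordered))) - 1]


# statistics.quantiles(n=20) returns 19 cut points; index 18 is the 95th percentile
interpolated = statistics.quantiles(fares, n=20, method="inclusive")[18]
observed = nearest_rank(fares, 0.95)

print(observed, interpolated)  # 35.0 31.25
```

On large days the two definitions usually land close together, but only the nearest-rank style guarantees a fare that some trip actually paid.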

In [10]:
logger.info("Step 3: Calculating daily summaries with aggregations")
print("Step 3: Calculating daily summaries...")

daily_averages = (nyc_tlc_2021
    .groupBy("dt_year", "dt_month", "dt_day")
    .agg(
        count("*").alias("num_trips"),
        avg("trip_distance").alias("mean_trip_distance"),
        spark_max("mta_tax").alias("max_mta_tax"),
        percentile_approx("fare_amount", 0.95).alias("q95_fare_amount"),
        spark_min("tip_amount").alias("min_tip_amount"),
        ceil(avg("passenger_count")).alias("mean_passenger_count")
    )
)

print("✅ Daily summaries calculated")

2025-10-06 15:01:10,586,p17536,{2923183392.py:1},INFO,Step 3: Calculating daily summaries with aggregations


Step 3: Calculating daily summaries...
✅ Daily summaries calculated


## Step 4: Sort by Date (Descending)

Sort the results by year, month, and day in descending order.
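`orderBy` with `ascending=[False, False, False]` sorts by year, then month, then day, each descending, which is the same as sorting the `(year, month, day)` tuple in reverse. Keeping the three parts numeric matters. A plain-Python illustration using the dates from the Step 5 output shows that sorting unpadded date strings would put May 5 ahead of May 27:

```python
# (dt_year, dt_month, dt_day) keys from the Step 5 output, in arbitrary order
keys = [(2021, 3, 31), (2021, 4, 1), (2021, 5, 5), (2021, 4, 14), (2021, 5, 27)]

# Descending on all three columns == reverse order of the whole tuple
newest_first = sorted(keys, reverse=True)
print(newest_first)

# Pitfall: unpadded strings compare character by character
as_text = sorted((f"{y}-{m}-{d}" for y, m, d in keys), reverse=True)
print(as_text)  # '2021-5-5' sorts before '2021-5-27'
```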

In [11]:
logger.info("Step 4: Sorting results by date in descending order")
print("Step 4: Sorting by date (descending)...")

daily_averages = daily_averages.orderBy(
    "dt_year", "dt_month", "dt_day",
    ascending=[False, False, False]
)

print("✅ Results sorted")

2025-10-06 15:01:16,383,p17536,{2655154960.py:1},INFO,Step 4: Sorting results by date in descending order


Step 4: Sorting by date (descending)...
✅ Results sorted


## Step 5: Display Results

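View the top daily summaries. Because the sort is descending, the latest dates come first. The four April and May days with only 3 to 5 trips come from a handful of records in the downloaded January to March files whose pickup timestamps lie in April or May, so they most likely reflect data-entry errors rather than real April or May coverage. Together with the 90 days from January 1 to March 31, they account for the 94 days reported.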

In [12]:
logger.info("Step 5: Displaying results")
print("\nTop 20 daily summaries (sorted in descending order):")
daily_averages.show(20)

# Get total number of days
total_days = daily_averages.count()
logger.info(f"Calculated summaries for {total_days} days")
print(f"\n✅ Calculated summaries for {total_days} days")

2025-10-06 15:01:19,545,p17536,{3530814441.py:1},INFO,Step 5: Displaying results



Top 20 daily summaries (sorted in descending order):



+-------+--------+------+---------+------------------+-----------+---------------+--------------+--------------------+
|dt_year|dt_month|dt_day|num_trips|mean_trip_distance|max_mta_tax|q95_fare_amount|min_tip_amount|mean_passenger_count|
+-------+--------+------+---------+------------------+-----------+---------------+--------------+--------------------+
|   2021|       5|    27|        3|2.7566666666666664|        0.5|           14.0|           0.0|                   1|
|   2021|       5|     5|        4|             1.245|        0.5|            9.5|           0.0|                   1|
|   2021|       4|    14|        3|1.3633333333333333|        0.5|           10.0|           0.0|                   1|
|   2021|       4|     1|        5|3.7840000000000003|        0.5|           22.5|           0.0|                   2|
|   2021|       3|    31|    67533|3.0777288140612673|        3.3|          35.06|        -30.56|                   2|
|   2021|       3|    30|    66702|2.92606488561

2025-10-06 15:01:32,147,p17536,{3530814441.py:7},INFO,Calculated summaries for 94 days



✅ Calculated summaries for 94 days
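The first rows of the output contain days with only 3 to 5 trips next to days with tens of thousands. A minimal sketch of flagging such days automatically, using a hypothetical rule (fewer than 1% of the median day's trips) and the trip counts from the first ten rows of the sorted results:

```python
import statistics

# (date, num_trips) pairs copied from the first ten rows of the sorted results
daily_trips = {
    (2021, 5, 27): 3, (2021, 5, 5): 4, (2021, 4, 14): 3, (2021, 4, 1): 5,
    (2021, 3, 31): 67533, (2021, 3, 30): 66702, (2021, 3, 29): 58127,
    (2021, 3, 28): 40404, (2021, 3, 27): 67392, (2021, 3, 26): 73535,
}

# Hypothetical rule: suspicious if under 1% of the median day's trip count
threshold = 0.01 * statistics.median(daily_trips.values())
sparse_days = sorted(day for day, n in daily_trips.items() if n < threshold)
print(sparse_days)
```

The 1% cutoff is arbitrary; on the full 94-day result the median would be computed over all days, which makes the threshold more stable.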


## Step 6: Convert to Pandas DataFrame

Convert the Spark DataFrame to Pandas for easier manipulation and export.
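The cell output below shows that PyArrow is not installed, so Spark falls back to the non-Arrow conversion even though `spark.sql.execution.arrow.pyspark.enabled` is set. A small standard-library check, run before `toPandas()`, can report whether the Arrow path is usable. The helper names are hypothetical, and the 11.0.0 minimum is taken from the warning text.

```python
import importlib.metadata
import importlib.util

# Minimum version quoted in the Spark warning printed by this cell
MIN_PYARROW = (11, 0, 0)


def version_tuple(version):
    """Turn '14.0.2' into (14, 0, 2); non-numeric parts such as 'dev0' are dropped."""
    return tuple(int(part) for part in version.split(".")[:3] if part.isdigit())


def pyarrow_status():
    """Return 'missing', 'too old', or 'ok' for the installed pyarrow package."""
    if importlib.util.find_spec("pyarrow") is None:
        return "missing"
    try:
        installed = importlib.metadata.version("pyarrow")
    except importlib.metadata.PackageNotFoundError:
        return "missing"
    return "ok" if version_tuple(installed) >= MIN_PYARROW else "too old"


print("pyarrow:", pyarrow_status())
```

If it reports `missing`, installing the `pyarrow` package into the kernel's environment should let the Arrow-based conversion configured earlier take effect.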

In [13]:
logger.info("Step 6: Converting Spark DataFrame to Pandas DataFrame")
print("Step 6: Converting to Pandas DataFrame...")

pandas_df = daily_averages.toPandas()
print("✅ Converted to Pandas")

# Display the Pandas DataFrame
pandas_df.head(10)

2025-10-06 15:01:41,602,p17536,{969615932.py:1},INFO,Step 6: Converting Spark DataFrame to Pandas DataFrame
  [PACKAGE_NOT_INSTALLED] PyArrow >= 11.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


Step 6: Converting to Pandas DataFrame...



✅ Converted to Pandas


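|   | dt_year | dt_month | dt_day | num_trips | mean_trip_distance | max_mta_tax | q95_fare_amount | min_tip_amount | mean_passenger_count |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 2021 | 5 | 27 | 3 | 2.756667 | 0.5 | 14.0 | 0.0 | 1 |
| 1 | 2021 | 5 | 5 | 4 | 1.245 | 0.5 | 9.5 | 0.0 | 1 |
| 2 | 2021 | 4 | 14 | 3 | 1.363333 | 0.5 | 10.0 | 0.0 | 1 |
| 3 | 2021 | 4 | 1 | 5 | 3.784 | 0.5 | 22.5 | 0.0 | 2 |
| 4 | 2021 | 3 | 31 | 67533 | 3.077729 | 3.3 | 35.06 | -30.56 | 2 |
| 5 | 2021 | 3 | 30 | 66702 | 2.926065 | 0.5 | 35.0 | -1.36 | 2 |
| 6 | 2021 | 3 | 29 | 58127 | 4.058761 | 0.5 | 35.95 | -0.86 | 2 |
| 7 | 2021 | 3 | 28 | 40404 | 3.386357 | 0.5 | 42.5 | -1.0 | 2 |
| 8 | 2021 | 3 | 27 | 67392 | 2.955325 | 3.3 | 32.5 | -0.88 | 2 |
| 9 | 2021 | 3 | 26 | 73535 | 4.345161 | 0.5 | 34.5 | -13.82 | 2 |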


## Summary Statistics

Display key statistics about the analysis.
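Taking the minimum or maximum of month and day separately can combine fields from different rows and report a date that never occurs. A plain-Python illustration with a few illustrative date keys:

```python
# Illustrative (year, month, day) keys: latest day, end of March, first day
days = [(2021, 5, 27), (2021, 3, 31), (2021, 1, 1)]

# Per-column max mixes May from one row with day 31 from another
naive_max = (max(d[0] for d in days), max(d[1] for d in days), max(d[2] for d in days))

# Comparing whole (year, month, day) tuples keeps each date intact
true_min, true_max = min(days), max(days)

print(naive_max, true_min, true_max)  # (2021, 5, 31) (2021, 1, 1) (2021, 5, 27)
```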

In [14]:
print("=" * 60)
print("PROBLEM 1 COMPLETED - Summary Statistics")
print("=" * 60)
print(f"Total rows processed: {filtered_rows:,}")
print(f"Days with data: {total_days}")
# Build real dates so the range is taken per row, not per column
dates = pd.to_datetime(
    pandas_df[["dt_year", "dt_month", "dt_day"]].rename(
        columns={"dt_year": "year", "dt_month": "month", "dt_day": "day"}
    )
)
print(f"Date range: {dates.min():%m/%d/%Y} to {dates.max():%m/%d/%Y}")
print(f"Total trips: {pandas_df['num_trips'].sum():,}")
print(f"Average daily trips: {pandas_df['num_trips'].mean():.0f}")

PROBLEM 1 COMPLETED - Summary Statistics
Total rows processed: 4,666,578
Days with data: 94
Date range: 01/01/2021 to 05/27/2021
Total trips: 4,666,578
Average daily trips: 49644


## Detailed Sample Results

View formatted details for the first few days.
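The `${value:.2f}` pattern in the cell below places the minus sign after the dollar sign for negative amounts (the output shows `$-30.56` for the minimum tip on March 31). A hypothetical helper that prints `-$30.56` instead, with thousands separators:

```python
def fmt_money(amount):
    """Format a dollar amount with the sign before the symbol, e.g. -$30.56."""
    sign = "-" if amount < 0 else ""
    return f"{sign}${abs(amount):,.2f}"


print(fmt_money(-30.56), fmt_money(35.06), fmt_money(0.0))  # -$30.56 $35.06 $0.00
```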

In [15]:
print("=" * 60)
print("Sample Results (first 5 rows):")
print("=" * 60)

sample_df = pandas_df.head(5)
for _, row in sample_df.iterrows():
    print(f"Year: {int(row['dt_year'])}, Month: {int(row['dt_month']):02d}, Day: {int(row['dt_day']):02d}")
    print(f"  Trips: {int(row['num_trips']):,}")
    print(f"  Avg Distance: {row['mean_trip_distance']:.2f} miles")
    print(f"  Max MTA Tax: ${row['max_mta_tax']:.2f}")
    print(f"  95th Percentile Fare: ${row['q95_fare_amount']:.2f}")
    print(f"  Min Tip: ${row['min_tip_amount']:.2f}")
    print(f"  Avg Passengers: {row['mean_passenger_count']:.0f}")
    print("-" * 40)

Sample Results (first 5 rows):
Year: 2021, Month: 05, Day: 27
  Trips: 3
  Avg Distance: 2.76 miles
  Max MTA Tax: $0.50
  95th Percentile Fare: $14.00
  Min Tip: $0.00
  Avg Passengers: 1
----------------------------------------
Year: 2021, Month: 05, Day: 05
  Trips: 4
  Avg Distance: 1.25 miles
  Max MTA Tax: $0.50
  95th Percentile Fare: $9.50
  Min Tip: $0.00
  Avg Passengers: 1
----------------------------------------
Year: 2021, Month: 04, Day: 14
  Trips: 3
  Avg Distance: 1.36 miles
  Max MTA Tax: $0.50
  95th Percentile Fare: $10.00
  Min Tip: $0.00
  Avg Passengers: 1
----------------------------------------
Year: 2021, Month: 04, Day: 01
  Trips: 5
  Avg Distance: 3.78 miles
  Max MTA Tax: $0.50
  95th Percentile Fare: $22.50
  Min Tip: $0.00
  Avg Passengers: 2
----------------------------------------
Year: 2021, Month: 03, Day: 31
  Trips: 67,533
  Avg Distance: 3.08 miles
  Max MTA Tax: $3.30
  95th Percentile Fare: $35.06
  Min Tip: $-30.56
  Avg Passengers: 2
----------------------------------------

## Cleanup: Stop Spark Session

Remember to stop the Spark session when you're done to free up resources.
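If an analysis step fails partway through, a `spark.stop()` call at the end may never run. In a script version of this notebook, one way to guarantee cleanup is a context manager that calls `stop()` in a `finally` block. The sketch below uses a stand-in object instead of a real SparkSession so it runs without Spark; `stopping` and `FakeSession` are hypothetical names.

```python
from contextlib import contextmanager


@contextmanager
def stopping(session):
    """Yield `session` and call its stop() method on exit, even after an error."""
    try:
        yield session
    finally:
        session.stop()


class FakeSession:
    """Stand-in for SparkSession so the sketch runs without Spark."""

    def __init__(self):
        self.stopped = False

    def stop(self):
        self.stopped = True


fake = FakeSession()
try:
    with stopping(fake):
        raise RuntimeError("simulated failure mid-analysis")
except RuntimeError:
    pass

print(fake.stopped)  # True
```

With a real session this would read `with stopping(spark): ...`, wrapping the load, aggregation, and export steps.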

In [16]:
spark.stop()
print("✅ Spark session stopped")

✅ Spark session stopped
