# Understanding Hired Rides in NYC

## Project Setup

In [2]:
# all import statements needed for the project

import os
import bs4
from bs4 import BeautifulSoup
import matplotlib.pyplot as plt
import pandas as pd
import requests
import sqlalchemy as db
import re
import geopandas as gpd
from math import ceil
from urllib.parse import unquote
import glob
from sqlalchemy import text
import folium
from folium.plugins import HeatMap

In [3]:

TLC_URL = "https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page"

dataset_directory = "/Users/shaoziheng/Desktop/4501/project/datasets"
TAXI_ZONES_DIR = "/Users/shaoziheng/Desktop/4501/project/datasets/taxi_zones"
TAXI_ZONES_SHAPEFILE = f"{TAXI_ZONES_DIR}/taxi_zones.shp"
WEATHER_CSV_DIR = "/Users/shaoziheng/Desktop/4501/project/datasets/weather"

CRS = 4326  # coordinate reference system

# (lat, lon)
NEW_YORK_BOX_COORDS = ((40.560445, -74.242330), (40.908524, -73.717047))
LGA_BOX_COORDS = ((40.763589, -73.891745), (40.778865, -73.854838))
JFK_BOX_COORDS = ((40.639263, -73.795642), (40.651376, -73.766264))
EWR_BOX_COORDS = ((40.686794, -74.194028), (40.699680, -74.165205))

DATABASE_URL = "sqlite:///project.db"
DATABASE_SCHEMA_FILE = "schema.sql"
QUERY_DIRECTORY = "queries"

In [4]:
# Make sure the QUERY_DIRECTORY exists
try:
    os.mkdir(QUERY_DIRECTORY)
except Exception as e:
    if e.errno == 17:
        # the directory already exists
        pass
    else:
        raise

## Part 1: Data Preprocessing

### Load Taxi Zones

In [5]:
def load_taxi_zones(shapefile):
    """
    Load and preprocess a shapefile containing taxi zone data.

    Args:
        shapefile (str): Path to the shapefile containing taxi zone boundaries. 
                         The shapefile must include `LocationID` or similar 
                         geographic attributes.
                        
    Returns:
        A GeoDataFrame with added `longitude` and 
        `latitude` columns corresponding to the centroids
        of the taxi zones.
    """
    g = gpd.read_file(shapefile)
    g = g.to_crs(4326)
    g['longitude'] = g.centroid.x
    g['latitude'] = g.centroid.y
    return g

### Web scraping links for downloading files

In [6]:
# Fetch URL from the TLC page
def get_all_urls_from_tlc_page(taxi_page):
    """
    Fetch the HTML content from the provided TLC page URL and parse it with BeautifulSoup.

    Args:
        taxi_page (str): The URL of the TLC webpage containing the data links.

    Returns:
        BeautifulSoup: A parsed BeautifulSoup object containing the HTML content 
                       of the webpage for further processing.
    """
    response = requests.get(taxi_page)
    if response.status_code != 200:
        raise Exception(f"Failed to fetch page content: {response.status_code}")
    soup = BeautifulSoup(response.text, "html.parser")
    return soup

In [7]:
# Extract URLs for yellow taxi and HVFHV data from the TLC page

def filter_parquet_urls(soup):
    """
    Extract URLs for yellow taxi and HVFHV trip data in Parquet format from the TLC webpage.

    Args:
        soup (BeautifulSoup): A BeautifulSoup object containing the parsed HTML of the TLC webpage.

    Returns:
        tuple: A tuple containing two lists:
            - yellow_taxi_links (list): List of URLs for yellow taxi Parquet files.
            - hvfhv_links (list): List of URLs for HVFHV Parquet files.

    Description:
        - Identifies links that match the naming pattern for yellow taxi and HVFHV trip data.
        - Decodes encoded characters (e.g., `%20` -> space) in the URLs.
        - Filters links based on file naming conventions for years 2020-2024.
    """
    yellow_taxi_links = []
    yellow_links = soup.find_all('a', {'href': re.compile(r"yellow_tripdata_202[0-3]-\d{2}\.parquet|yellow_tripdata_2024-(0[1-8])\.parquet")})
    for link in yellow_links:
        url = link['href'].strip()  # Remove leading/trailing spaces
        url = unquote(url)  # Decode any encoded characters like %20
        yellow_taxi_links.append(url)
        
    hvfhv_links = []
    hvfhv_links_soup = soup.find_all('a', {'href': re.compile(r"fhvhv_tripdata_202[0-3]-\d{2}\.parquet|fhvhv_tripdata_2024-(0[1-8])\.parquet")})
    for link in hvfhv_links_soup:
        url = link['href'].strip()  # Remove leading/trailing spaces
        url = unquote(url)  # Decode any encoded characters like %20
        hvfhv_links.append(url)
        
    return yellow_taxi_links, hvfhv_links


In [8]:
# download  the Yellow Taxi & High-Volume For-Hire Vehicle (HVFHV) trip data parquet files and save them to directory

def download_parquet_file(urls, output_directory):
    """
    Downloads Parquet files from a list of URLs and saves them to the specified directory.

    Args:
        urls (list): List of URLs to download.
        output_directory (str): Path to the directory where files will be saved.

    Raises:
        Exception: If a file fails to download.
    """
    # Ensure the output directory exists
    os.makedirs(output_directory, exist_ok=True)

    for url in urls:
        file_name = os.path.basename(url)
        output_path = os.path.join(output_directory, file_name)

        try:
            print(f"Downloading {url}...")
            response = requests.get(url, stream=True)
            response.raise_for_status()  # Raise an error for failed requests

            # Write the file content to disk
            with open(output_path, 'wb') as file:
                for chunk in response.iter_content(chunk_size=1024):
                    if chunk:  # Filter out keep-alive chunks
                        file.write(chunk)

            print(f"Saved to {output_path}")
        except Exception as e:
            print(f"Failed to download {url}: {e}")

In [9]:
soup=get_all_urls_from_tlc_page(TLC_URL)
yellow_taxi_links, hvfhv_links = filter_parquet_urls(soup)

In [26]:
# Download Yellow Taxi files
download_parquet_file(yellow_taxi_links, os.path.join(dataset_directory, "yellow_tripdata"))

# Download Uber HVFHV files
download_parquet_file(hvfhv_links, os.path.join(dataset_directory, "fhvhv_tripdata"))


Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet...
Saved to /Users/shaoziheng/Desktop/4501/project/datasets/yellow_tripdata/yellow_tripdata_2024-01.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-02.parquet...
Saved to /Users/shaoziheng/Desktop/4501/project/datasets/yellow_tripdata/yellow_tripdata_2024-02.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-03.parquet...
Saved to /Users/shaoziheng/Desktop/4501/project/datasets/yellow_tripdata/yellow_tripdata_2024-03.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-04.parquet...
Saved to /Users/shaoziheng/Desktop/4501/project/datasets/yellow_tripdata/yellow_tripdata_2024-04.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-05.parquet...
Saved to /Users/shaoziheng/Desktop/4501/project/datasets/yellow_tripdata/yellow_tripdata_2024-05.parquet


Saved to /Users/shaoziheng/Desktop/4501/project/datasets/yellow_tripdata/yellow_tripdata_2021-10.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-11.parquet...
Saved to /Users/shaoziheng/Desktop/4501/project/datasets/yellow_tripdata/yellow_tripdata_2021-11.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-12.parquet...
Saved to /Users/shaoziheng/Desktop/4501/project/datasets/yellow_tripdata/yellow_tripdata_2021-12.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2020-01.parquet...
Saved to /Users/shaoziheng/Desktop/4501/project/datasets/yellow_tripdata/yellow_tripdata_2020-01.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2020-02.parquet...
Saved to /Users/shaoziheng/Desktop/4501/project/datasets/yellow_tripdata/yellow_tripdata_2020-02.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2020-03.parquet...


Saved to /Users/shaoziheng/Desktop/4501/project/datasets/fhvhv_tripdata/fhvhv_tripdata_2022-08.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-09.parquet...
Saved to /Users/shaoziheng/Desktop/4501/project/datasets/fhvhv_tripdata/fhvhv_tripdata_2022-09.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-10.parquet...
Saved to /Users/shaoziheng/Desktop/4501/project/datasets/fhvhv_tripdata/fhvhv_tripdata_2022-10.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-11.parquet...
Saved to /Users/shaoziheng/Desktop/4501/project/datasets/fhvhv_tripdata/fhvhv_tripdata_2022-11.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2022-12.parquet...
Saved to /Users/shaoziheng/Desktop/4501/project/datasets/fhvhv_tripdata/fhvhv_tripdata_2022-12.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-01.parquet...
Saved to /Users

### Calculate Sample Size

In [10]:
# calculate sample size using Cochran's formula with 95% CI and 5% marginal error.

def calculate_sample_size(population, confidence_level=0.95, margin_of_error=0.05):
    """
    Calculate the sample size using Cochran's formula, considering finite population correction.

    Args:
        population (int): Total population size for which the sample size needs to be calculated.
        confidence_level (float): Desired confidence level (default is 0.95 for 95% CI).
        margin_of_error (float): Allowable margin of error (default is 0.05 for 5%).

    Returns:
        int: The calculated sample size, rounded up to the nearest integer.

    Formula:
        Cochran's formula for infinite population:
            n0 = (Z^2 * p * (1 - p)) / e^2
        Where:
            - Z: Z-score corresponding to the confidence level.
            - p: Estimated proportion of the population (default: 0.5 for maximum variability).
            - e: Margin of error.

        Finite population correction for population size N:
            n = n0 / (1 + (n0 - 1) / N)
    """
    Z = {0.9: 1.645, 0.95: 1.96, 0.99: 2.576}[confidence_level]
    p = 0.5
    e = margin_of_error
    sample_size = (Z**2 * p * (1 - p)) / e**2
    if population < 1e6:  # Finite population correction
        sample_size = sample_size / (1 + (sample_size - 1) / population)
    return ceil(sample_size)

### Process Taxi Data

In [11]:
def apply_cleaning_and_combine_taxi(input_directory, output_file):
    """
    Apply the `clean_taxi_month` function to all Parquet files in the input directory
    and combine the cleaned and sampled files into a single large Parquet file.

    Args:
        input_directory (str): Path to the directory containing the Parquet files.
        output_file (str): Path to save the combined cleaned Parquet file.

    Returns:
        None
    """
    # Get all Parquet files in the directory
    parquet_files = glob.glob(f"{input_directory}/*.parquet")
    
    all_cleaned_data = []  # To store all cleaned DataFrames

    # Apply cleaning to each file
    for file_path in parquet_files:
        print(f"Processing file: {file_path}")
        
        # load taxi zone data
        taxi_zones = load_taxi_zones(TAXI_ZONES_SHAPEFILE)
        taxi_zones = taxi_zones[['LocationID', 'longitude', 'latitude', 'zone', 'borough']]
        df = pd.read_parquet(file_path)
        lat_map = dict(zip(taxi_zones['LocationID'], taxi_zones['latitude']))
        lon_map = dict(zip(taxi_zones['LocationID'], taxi_zones['longitude']))
        
        # Apply the transformations from clean_taxi_month
        df['pickup_latitude'] = df['PULocationID'].map(lat_map)
        df['pickup_longitude'] = df['PULocationID'].map(lon_map)
        df['dropoff_latitude'] = df['DOLocationID'].map(lat_map)
        df['dropoff_longitude'] = df['DOLocationID'].map(lon_map)

        # remove invalid location IDs - drop rows where any of the values are missing 
        df = df.dropna(subset=['pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude'])
        
        # Filter out invalid data points (non-positive fare, tip, passenger, trip distance)
        df = df[(df['fare_amount'] > 0) & (df['tip_amount'] >= 0) & 
                (df['total_amount'] >= 0) & (df['passenger_count'] > 0) & 
                (df['trip_distance'] > 0)]
        

        # remove rows where pickup and dropoff latitudes/longitudes are the same or very close
        df = df[(df['pickup_latitude'] != df['dropoff_latitude']) & 
                (df['pickup_longitude'] != df['dropoff_longitude']) & 
                (abs(df['pickup_longitude'] - df['dropoff_longitude']) > 0.001)]
        
        # Normalize column names
        df.columns = [col.lower().strip().replace(' ', '_') for col in df.columns]

        # Remove trips that start and/or end outside of the NEW_YORK_BOX
        NEW_YORK_BOX_COORDS = [(40.560445, -74.242330), (40.908524, -73.717047)]
        df = df[(df['pickup_latitude'] >= NEW_YORK_BOX_COORDS[0][0]) & 
                (df['pickup_latitude'] <= NEW_YORK_BOX_COORDS[1][0]) & 
                (df['pickup_longitude'] >= NEW_YORK_BOX_COORDS[0][1]) & 
                (df['pickup_longitude'] <= NEW_YORK_BOX_COORDS[1][1])]
        df = df[(df['dropoff_latitude'] >= NEW_YORK_BOX_COORDS[0][0]) & 
                (df['dropoff_latitude'] <= NEW_YORK_BOX_COORDS[1][0]) & 
                (df['dropoff_longitude'] >= NEW_YORK_BOX_COORDS[0][1]) & 
                (df['dropoff_longitude'] <= NEW_YORK_BOX_COORDS[1][1])]
        
        # remove unnecessary columns 
        df = df[['tpep_pickup_datetime','tpep_dropoff_datetime','fare_amount','tip_amount', 'extra', 'improvement_surcharge',
                  'congestion_surcharge', 'airport_fee', 'mta_tax', 'tolls_amount', 'pickup_latitude','pickup_longitude',
                 'dropoff_latitude','dropoff_longitude', 'trip_distance'
                ]]
        
        # Convert data types after filtering
        df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'], errors='coerce')
        df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'], errors='coerce')
        df['fare_amount'] = df['fare_amount'].astype(float, errors='ignore')
        df['tip_amount'] = df['tip_amount'].astype(float, errors='ignore')
        df['extra'] = df['extra'].astype(float, errors='ignore')
        df['improvement_surcharge'] = df['improvement_surcharge'].astype(float, errors='ignore')
        df['congestion_surcharge'] = df['congestion_surcharge'].astype(float, errors='ignore')
        df['airport_fee'] = df['airport_fee'].fillna(0).astype(float, errors='ignore')
        df['mta_tax'] = df['mta_tax'].astype(float, errors='ignore')
        df['tolls_amount'] = df['tolls_amount'].astype(float, errors='ignore')
        df['pickup_latitude'] = df['pickup_latitude'].astype(float, errors='ignore')
        df['pickup_longitude'] = df['pickup_longitude'].astype(float, errors='ignore')
        df['dropoff_latitude'] = df['dropoff_latitude'].astype(float, errors='ignore')
        df['dropoff_longitude'] = df['dropoff_longitude'].astype(float, errors='ignore')
        df['trip_distance'] = df['trip_distance'].astype(float, errors='ignore')
        
        
        
        # calculate sample size and generate sampled_df
        population_size = len(df)
        sample_size = calculate_sample_size(population_size, confidence_level=0.95, margin_of_error=0.05)
        sampled_df = df.sample(n=sample_size, random_state=42)

        # Append cleaned and sampled data to the list
        all_cleaned_data.append(sampled_df)
        print(f"Finished processing file: {file_path}")
    
    # Combine all cleaned DataFrames
    combined_df = pd.concat(all_cleaned_data, ignore_index=True)
    print(f"Combined all cleaned data into a single DataFrame with {len(combined_df)} rows.")

    # Save the combined DataFrame to a Parquet file
    combined_df.to_parquet(output_file, index=False)
    print(f"Saved combined data to {output_file}")


In [19]:
taxi_input_directory = f"{dataset_directory}/yellow_tripdata"
taxi_output_file = f"{dataset_directory}/combined_taxi_data.parquet"
apply_cleaning_and_combine_taxi(taxi_input_directory, taxi_output_file)

In [13]:
taxi_data=pd.read_parquet(taxi_output_file)

In [14]:
taxi_data

Unnamed: 0,tpep_pickup_datetime,tpep_dropoff_datetime,fare_amount,tip_amount,extra,improvement_surcharge,congestion_surcharge,airport_fee,mta_tax,tolls_amount,pickup_latitude,pickup_longitude,dropoff_latitude,dropoff_longitude,trip_distance
0,2023-06-27 16:50:19,2023-06-27 16:55:31,6.5,1.00,2.5,1.0,2.5,0.0,0.5,0.00,40.756687,-73.972356,40.758027,-73.977698,0.70
1,2023-06-28 14:01:22,2023-06-28 14:16:21,19.8,0.00,0.0,1.0,2.5,0.0,0.5,0.00,40.706808,-74.007496,40.734575,-74.002875,3.73
2,2023-06-09 20:04:55,2023-06-09 20:12:42,9.3,3.16,2.5,1.0,2.5,0.0,0.5,0.00,40.748427,-73.999917,40.735035,-74.008984,1.06
3,2023-06-04 01:03:00,2023-06-04 01:08:15,6.5,2.00,3.5,1.0,2.5,0.0,0.5,0.00,40.734575,-74.002875,40.723888,-74.001537,0.70
4,2023-06-30 08:07:36,2023-06-30 08:13:14,7.9,1.00,2.5,1.0,2.5,0.0,0.5,0.00,40.780436,-73.957011,40.790010,-73.945750,0.80
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
21550,2023-03-01 22:21:47,2023-03-01 22:36:55,19.8,6.27,1.0,1.0,2.5,0.0,0.5,6.55,40.753512,-73.988786,40.729506,-73.949540,3.94
21551,2023-03-01 15:57:02,2023-03-01 16:06:07,10.0,0.00,0.0,1.0,0.0,0.0,0.5,0.00,40.818257,-73.940771,40.801169,-73.937345,0.88
21552,2023-03-06 17:46:31,2023-03-06 17:53:27,8.6,0.00,2.5,1.0,2.5,0.0,0.5,0.00,40.732579,-73.994305,40.742278,-73.996971,1.07
21553,2023-03-22 17:44:43,2023-03-22 17:55:28,10.7,0.00,5.0,1.0,2.5,0.0,0.5,0.00,40.775965,-73.987645,40.762252,-73.989844,1.10


In [15]:
taxi_data.head()

Unnamed: 0,tpep_pickup_datetime,tpep_dropoff_datetime,fare_amount,tip_amount,extra,improvement_surcharge,congestion_surcharge,airport_fee,mta_tax,tolls_amount,pickup_latitude,pickup_longitude,dropoff_latitude,dropoff_longitude,trip_distance
0,2023-06-27 16:50:19,2023-06-27 16:55:31,6.5,1.0,2.5,1.0,2.5,0.0,0.5,0.0,40.756687,-73.972356,40.758027,-73.977698,0.7
1,2023-06-28 14:01:22,2023-06-28 14:16:21,19.8,0.0,0.0,1.0,2.5,0.0,0.5,0.0,40.706808,-74.007496,40.734575,-74.002875,3.73
2,2023-06-09 20:04:55,2023-06-09 20:12:42,9.3,3.16,2.5,1.0,2.5,0.0,0.5,0.0,40.748427,-73.999917,40.735035,-74.008984,1.06
3,2023-06-04 01:03:00,2023-06-04 01:08:15,6.5,2.0,3.5,1.0,2.5,0.0,0.5,0.0,40.734575,-74.002875,40.723888,-74.001537,0.7
4,2023-06-30 08:07:36,2023-06-30 08:13:14,7.9,1.0,2.5,1.0,2.5,0.0,0.5,0.0,40.780436,-73.957011,40.79001,-73.94575,0.8


In [16]:
taxi_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 21555 entries, 0 to 21554
Data columns (total 15 columns):
 #   Column                 Non-Null Count  Dtype         
---  ------                 --------------  -----         
 0   tpep_pickup_datetime   21555 non-null  datetime64[ns]
 1   tpep_dropoff_datetime  21555 non-null  datetime64[ns]
 2   fare_amount            21555 non-null  float64       
 3   tip_amount             21555 non-null  float64       
 4   extra                  21555 non-null  float64       
 5   improvement_surcharge  21555 non-null  float64       
 6   congestion_surcharge   21555 non-null  float64       
 7   airport_fee            21555 non-null  float64       
 8   mta_tax                21555 non-null  float64       
 9   tolls_amount           21555 non-null  float64       
 10  pickup_latitude        21555 non-null  float64       
 11  pickup_longitude       21555 non-null  float64       
 12  dropoff_latitude       21555 non-null  float64       
 13  d

In [17]:
taxi_data.describe()

Unnamed: 0,tpep_pickup_datetime,tpep_dropoff_datetime,fare_amount,tip_amount,extra,improvement_surcharge,congestion_surcharge,airport_fee,mta_tax,tolls_amount,pickup_latitude,pickup_longitude,dropoff_latitude,dropoff_longitude,trip_distance
count,21555,21555,21555.0,21555.0,21555.0,21555.0,21555.0,21555.0,21555.0,21555.0,21555.0,21555.0,21555.0,21555.0,21555.0
mean,2022-05-02 02:22:55.033170688,2022-05-02 02:40:03.177824256,15.49832,2.81735,1.273732,0.55325,2.326374,0.090478,0.498056,0.46921,40.753237,-73.967072,40.755621,-73.97146,3.319831
min,2020-01-01 03:06:55,2020-01-01 03:13:30,0.01,0.0,0.0,0.0,0.0,0.0,0.0,0.0,40.576961,-74.174,40.571769,-74.174,0.01
25%,2021-03-01 17:26:52,2021-03-01 17:35:34,7.5,0.18,0.0,0.3,2.5,0.0,0.5,0.0,40.740438,-73.989844,40.740337,-73.989844,1.16
50%,2022-05-01 11:01:50,2022-05-01 11:21:35,11.0,2.26,1.0,0.3,2.5,0.0,0.5,0.0,40.758027,-73.977698,40.758027,-73.977698,1.87
75%,2023-07-01 00:33:29.500000,2023-07-01 00:48:51.500000,17.0,3.56,2.5,1.0,2.5,0.0,0.5,0.0,40.773633,-73.965146,40.774375,-73.959635,3.32
max,2024-08-31 21:58:30,2024-08-31 22:21:51,159.8,99.99,11.75,1.0,2.5,1.75,0.8,40.0,40.897932,-73.739473,40.899529,-73.735555,57.7
std,,,13.730091,3.239298,1.530403,0.336887,0.63556,0.361685,0.031472,1.891135,0.031175,0.045508,0.032247,0.035536,4.106375


### Processing Uber Data

In [21]:
def apply_cleaning_and_combine_uber(input_directory, intermediate_directory, output_file):
    """
    Process each Parquet file individually and save cleaned data as intermediate files.
    Then combine all intermediate files into a single large Parquet file.

    Args:
        input_directory (str): Path to the directory containing the Parquet files.
        intermediate_directory (str): Path to save intermediate cleaned files.
        output_file (str): Path to save the final combined Parquet file.
        confidence_level (float): Confidence level for sample size calculation.
        margin_of_error (float): Margin of error for sample size calculation.

    Returns:
        None
    """
    # Ensure the intermediate directory exists
    os.makedirs(intermediate_directory, exist_ok=True)

    # Get all Parquet files in the directory
    parquet_files = glob.glob(f"{input_directory}/*.parquet")
    
    for file_path in parquet_files:
        try:
            print(f"Processing file: {file_path}")
            
            # Load the file
            df = pd.read_parquet(file_path)
            
            # Filter Uber rides
            df = df[df['hvfhs_license_num'] == 'HV0003']

            # Load taxi zones for mapping
            taxi_zones = load_taxi_zones(TAXI_ZONES_SHAPEFILE)
            taxi_zones = taxi_zones[['LocationID', 'longitude', 'latitude', 'zone', 'borough']]
            lat_map = dict(zip(taxi_zones['LocationID'], taxi_zones['latitude']))
            lon_map = dict(zip(taxi_zones['LocationID'], taxi_zones['longitude']))

            # Map latitude and longitude
            df['pickup_latitude'] = df['PULocationID'].map(lat_map)
            df['pickup_longitude'] = df['PULocationID'].map(lon_map)
            df['dropoff_latitude'] = df['DOLocationID'].map(lat_map)
            df['dropoff_longitude'] = df['DOLocationID'].map(lon_map)

            # Drop rows with missing latitude and longitude
            df = df.dropna(subset=['pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude'])

            # Filter invalid data points
            df = df[(df['trip_miles'] > 0) & (df['trip_time'] > 0) &
                    (df['base_passenger_fare'] >= 0) & (df['tolls'] >= 0) &
                    (df['bcf'] >= 0) & (df['sales_tax'] >= 0) & (df['tips'] >= 0) &
                    (df['driver_pay'] >= 0) & (df['congestion_surcharge'] >= 0)]

            # Remove trips that start and/or end outside of the NEW_YORK_BOX
            NEW_YORK_BOX_COORDS = [(40.560445, -74.242330), (40.908524, -73.717047)]
            df = df[(df['pickup_latitude'] >= NEW_YORK_BOX_COORDS[0][0]) & 
                    (df['pickup_latitude'] <= NEW_YORK_BOX_COORDS[1][0]) & 
                    (df['pickup_longitude'] >= NEW_YORK_BOX_COORDS[0][1]) & 
                    (df['pickup_longitude'] <= NEW_YORK_BOX_COORDS[1][1])]
            df = df[(df['dropoff_latitude'] >= NEW_YORK_BOX_COORDS[0][0]) & 
                    (df['dropoff_latitude'] <= NEW_YORK_BOX_COORDS[1][0]) & 
                    (df['dropoff_longitude'] >= NEW_YORK_BOX_COORDS[0][1]) & 
                    (df['dropoff_longitude'] <= NEW_YORK_BOX_COORDS[1][1])]

             # Normalize column names
            df.columns = [col.lower().strip().replace(' ', '_') for col in df.columns]

            # Drop unnecessary columns
            df = df[['pickup_datetime',
                     'dropoff_datetime',
                     'trip_miles',
                     'trip_time',
                     'base_passenger_fare',
                     'tolls',
                     'bcf',
                     'sales_tax',
                     'congestion_surcharge',
                     'airport_fee',
                     'tips',
                     'pickup_latitude',
                     'pickup_longitude',
                     'dropoff_latitude',
                     'dropoff_longitude']]
            
            # convert datatype after filtering
            df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'], errors='coerce')
            df['dropoff_datetime'] = pd.to_datetime(df['dropoff_datetime'], errors='coerce')
            df['trip_miles'] = df['trip_miles'].astype(float)
            df['trip_time'] = df['trip_time'].astype(float)
            df['base_passenger_fare'] = df['base_passenger_fare'].astype(float)
            df['tolls'] = df['tolls'].astype(float)
            df['bcf'] = df['bcf'].astype(float)
            df['sales_tax'] = df['sales_tax'].astype(float)
            df['congestion_surcharge'] = df['congestion_surcharge'].astype(float)
            df['airport_fee'] = df['airport_fee'].fillna(0).astype(float)
            df['tips'] = df['tips'].astype(float)
            df['pickup_latitude'] = df['pickup_latitude'].astype(float)
            df['pickup_longitude'] = df['pickup_longitude'].astype(float)
            df['dropoff_latitude'] = df['dropoff_latitude'].astype(float)
            df['dropoff_longitude'] = df['dropoff_longitude'].astype(float)

            # calculate sample size and generate sampled_df
            population_size = len(df)
            sample_size = calculate_sample_size(population_size, confidence_level=0.95, margin_of_error=0.05)
            sampled_df = df.sample(n=sample_size, random_state=42)

            # Save cleaned file as intermediate Parquet
            intermediate_file = os.path.join(intermediate_directory, os.path.basename(file_path))
            sampled_df.to_parquet(intermediate_file, index=False)
            print(f"Saved intermediate cleaned data to: {intermediate_file}")
        
        except Exception as e:
            print(f"Error processing file {file_path}: {e}")

    # Combine all intermediate files into the final Parquet file
    intermediate_files = glob.glob(f"{intermediate_directory}/*.parquet")
    all_dataframes = []
    
    for intermediate_file in intermediate_files:
        try:
            print(f"Loading intermediate file: {intermediate_file}")
            df = pd.read_parquet(intermediate_file)
            all_dataframes.append(df)
        except Exception as e:
            print(f"Error loading intermediate file {intermediate_file}: {e}")


    # Concatenate all cleaned intermediate files and save as one large file
    if all_dataframes:
        combined_df = pd.concat(all_dataframes, ignore_index=True)
        combined_df.to_parquet(output_file, index=False)
        print(f"Saved combined data to: {output_file}")
    else:
        print("No valid data to combine.")


In [22]:
uber_input_directory = f"{dataset_directory}/fhvhv_tripdata"
uber_intermediate_directory = f"{dataset_directory}/intermediate_cleaned_uber"
uber_output_file = f"{dataset_directory}/combined_uber_data.parquet"

apply_cleaning_and_combine_uber(uber_input_directory, uber_intermediate_directory, uber_output_file)

In [23]:
uber_data = pd.read_parquet(uber_output_file)

In [24]:
uber_data.head()

Unnamed: 0,pickup_datetime,dropoff_datetime,trip_miles,trip_time,base_passenger_fare,tolls,bcf,sales_tax,congestion_surcharge,airport_fee,tips,pickup_latitude,pickup_longitude,dropoff_latitude,dropoff_longitude
0,2021-03-10 07:37:01,2021-03-10 07:48:59,2.15,718.0,10.71,0.0,0.32,0.95,0.0,0.0,0.0,40.688721,-73.855767,40.694542,-73.830924
1,2021-03-18 08:21:36,2021-03-18 08:33:29,1.34,713.0,9.79,0.0,0.0,0.0,2.75,0.0,2.0,40.736823,-73.984052,40.753512,-73.988786
2,2021-03-17 07:48:16,2021-03-17 08:02:38,4.56,862.0,18.28,0.0,0.55,1.62,0.0,0.0,0.0,40.742671,-73.754622,40.783332,-73.785972
3,2021-03-14 10:52:12,2021-03-14 11:03:28,2.69,676.0,15.16,0.0,0.45,1.35,0.0,0.0,0.0,40.876512,-73.89562,40.882403,-73.910665
4,2021-03-28 11:50:29,2021-03-28 11:57:33,0.68,424.0,14.15,0.0,0.42,1.26,2.75,0.0,0.0,40.717772,-74.00788,40.717772,-74.00788


In [26]:
uber_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 21560 entries, 0 to 21559
Data columns (total 15 columns):
 #   Column                Non-Null Count  Dtype         
---  ------                --------------  -----         
 0   pickup_datetime       21560 non-null  datetime64[ns]
 1   dropoff_datetime      21560 non-null  datetime64[ns]
 2   trip_miles            21560 non-null  float64       
 3   trip_time             21560 non-null  float64       
 4   base_passenger_fare   21560 non-null  float64       
 5   tolls                 21560 non-null  float64       
 6   bcf                   21560 non-null  float64       
 7   sales_tax             21560 non-null  float64       
 8   congestion_surcharge  21560 non-null  float64       
 9   airport_fee           21560 non-null  float64       
 10  tips                  21560 non-null  float64       
 11  pickup_latitude       21560 non-null  float64       
 12  pickup_longitude      21560 non-null  float64       
 13  dropoff_latitude

In [25]:
uber_data.describe()

Unnamed: 0,pickup_datetime,dropoff_datetime,trip_miles,trip_time,base_passenger_fare,tolls,bcf,sales_tax,congestion_surcharge,airport_fee,tips,pickup_latitude,pickup_longitude,dropoff_latitude,dropoff_longitude
count,21560,21560,21560.0,21560.0,21560.0,21560.0,21560.0,21560.0,21560.0,21560.0,21560.0,21560.0,21560.0,21560.0,21560.0
mean,2022-05-02 01:18:57.684322816,2022-05-02 01:36:50.816744192,4.379178,1073.391141,21.124263,0.628066,0.616259,1.883285,1.048782,0.133291,0.79258,40.73792,-73.934695,40.737407,-73.934451
min,2020-01-01 00:21:55,2020-01-01 00:40:45,0.04,68.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,40.561994,-74.186419,40.561994,-74.174
25%,2021-03-01 02:25:26.750000128,2021-03-01 02:43:57.500000,1.55,560.0,10.56,0.0,0.29,0.92,0.0,0.0,0.0,40.691201,-73.984196,40.691201,-73.984052
50%,2022-04-30 22:29:27,2022-04-30 22:48:34.500000,2.82,880.0,16.59,0.0,0.47,1.46,0.0,0.0,0.0,40.737698,-73.948789,40.737698,-73.947442
75%,2023-07-01 00:01:51.249999872,2023-07-01 00:14:48.500000,5.57,1378.0,26.19,0.0,0.76,2.36,2.75,0.0,0.0,40.774375,-73.899735,40.775932,-73.898956
max,2024-08-31 23:07:13,2024-08-31 23:19:20,42.78,8862.0,213.74,46.21,6.14,19.81,2.75,5.0,50.0,40.899529,-73.726656,40.899529,-73.726656
std,,,4.285147,740.388956,15.781332,2.525454,0.500672,1.435071,1.330371,0.566746,2.399464,0.068442,0.064763,0.068879,0.067801
