# Understanding Hired Rides in NYC

_[Project prompt](https://docs.google.com/document/d/1VERPjEZcC1XSs4-02aM-DbkNr_yaJVbFjLJxaYQswqA/edit#)_

Done by Group 7: Jiacong Yuan (jy3459) and Wenjie Lin (wl2792)

## Project Setup

In [1]:
# All import statements needed for the project

import os

import bs4
import matplotlib.pyplot as plt
import pandas as pd
import requests
import sqlalchemy as db
import geopandas as gpd
import sqlite3
import numpy as np
import math
from bs4 import BeautifulSoup
import re
from functools import partial
from sqlalchemy import text

In [2]:
# Project-wide constants; the directory paths are filled in later,
# in the cells that use them

TLC_URL = "https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page"

TAXI_ZONES_DIR = ""
TAXI_ZONES_SHAPEFILE = f"{TAXI_ZONES_DIR}/taxi_zones.shp"
WEATHER_CSV_DIR = ""

CRS = 4326  # coordinate reference system

# (lat, lon)
NEW_YORK_BOX_COORDS = ((40.560445, -74.242330), (40.908524, -73.717047))
LGA_BOX_COORDS = ((40.763589, -73.891745), (40.778865, -73.854838))
JFK_BOX_COORDS = ((40.639263, -73.795642), (40.651376, -73.766264))
EWR_BOX_COORDS = ((40.686794, -74.194028), (40.699680, -74.165205))

DATABASE_URL = "sqlite:///project.db"
DATABASE_SCHEMA_FILE = "schema.sql"
QUERY_DIRECTORY = "queries"

In [3]:
# Make sure the QUERY_DIRECTORY exists.
# exist_ok=True makes this a no-op when the directory is already there,
# and avoids reading e.errno from exceptions that may not define it.
os.makedirs(QUERY_DIRECTORY, exist_ok=True)

## Part 1: Data Preprocessing

### Load Taxi Zones and Look Up Coordinates
This section loads a geospatial dataset of NYC taxi zones and looks up the centroid coordinates (latitude and longitude) of a taxi zone from its unique `LocationID`.



### Step 1: Load the Taxi Zones Data

#### **What Happens in This Step**

1. **Input File Path**  
   - The file path for the shapefile is specified. It should contain a map of NYC taxi zones, each associated with a unique `LocationID`.

2. **Check for File Existence**  
   - The program first verifies if the shapefile exists at the specified path. If the file is missing, it raises an error and stops the execution.

3. **Load the Shapefile**  
   - The file is loaded into a GeoDataFrame using GeoPandas, which is specialized for working with spatial data.

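4. **Validate the Data**  
   - The function checks that the shapefile contains the required column `LocationID` (a unique identifier for each taxi zone).  

5. **Normalize the Coordinate Reference System**  
   - If the data is not already in EPSG:4326 (latitude/longitude), it is reprojected so that centroids can be read directly as coordinates.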


In [7]:
def load_taxi_zones(shapefile_path):
    """
    Load the Taxi Zone shapefile into a GeoDataFrame.

    Parameters:
        shapefile_path (str): Path to the Taxi Zone shapefile.

    Returns:
        gpd.GeoDataFrame: A GeoDataFrame containing Taxi Zone data.

    Raises:
        FileNotFoundError: If the shapefile does not exist.
        RuntimeError: If there is an error loading the shapefile.
    """
    if not os.path.exists(shapefile_path):
        raise FileNotFoundError(f"Shapefile not found at: {shapefile_path}")

    try:
        taxi_zones_gdf = gpd.read_file(shapefile_path)

        # Ensure required columns exist
        if 'LocationID' not in taxi_zones_gdf.columns:
            raise ValueError("Shapefile must contain 'LocationID' column.")

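        # Ensure CRS is EPSG:4326. A GeoDataFrame without a CRS cannot be
        # reprojected, so fail with a clear message instead of calling to_crs.
        if taxi_zones_gdf.crs is None:
            raise ValueError("Shapefile has no CRS; cannot reproject to EPSG:4326.")
        if taxi_zones_gdf.crs.to_epsg() != CRS:
            taxi_zones_gdf = taxi_zones_gdf.to_crs(epsg=CRS)

        return taxi_zones_gdf

    except Exception as e:
        # Chain the original exception so the root cause stays in the traceback
        raise RuntimeError(f"Failed to load shapefile: {e}") from e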


In [8]:
TAXI_ZONES_DIR = r"C:\Users\wenji\OneDrive\Project\taxi_zones"
TAXI_ZONES_SHAPEFILE = f"{TAXI_ZONES_DIR}/taxi_zones.shp"
if not os.path.exists(TAXI_ZONES_SHAPEFILE):
    raise FileNotFoundError(f"Taxi Zone Shapefile not found at: {TAXI_ZONES_SHAPEFILE}")
else:
    print(f"Taxi Zone Shapefile found at: {TAXI_ZONES_SHAPEFILE}")

Taxi Zone Shapefile found at: C:\Users\wenji\OneDrive\Project\taxi_zones/taxi_zones.shp


In [9]:
try:
    taxi_zones_gdf = load_taxi_zones(TAXI_ZONES_SHAPEFILE)
    print(taxi_zones_gdf.head())
except FileNotFoundError as e:
    print(f"File not found: {e}")
except RuntimeError as e:
    print(f"Error loading shapefile: {e}")

   OBJECTID  Shape_Leng  Shape_Area                     zone  LocationID  \
0         1    0.116357    0.000782           Newark Airport           1   
1         2    0.433470    0.004866              Jamaica Bay           2   
2         3    0.084341    0.000314  Allerton/Pelham Gardens           3   
3         4    0.043567    0.000112            Alphabet City           4   
4         5    0.092146    0.000498            Arden Heights           5   

         borough                                           geometry  
0            EWR  POLYGON ((-74.18445 40.695, -74.18449 40.6951,...  
1         Queens  MULTIPOLYGON (((-73.82338 40.63899, -73.82277 ...  
2          Bronx  POLYGON ((-73.84793 40.87134, -73.84725 40.870...  
3      Manhattan  POLYGON ((-73.97177 40.72582, -73.97179 40.725...  
4  Staten Island  POLYGON ((-74.17422 40.56257, -74.17349 40.562...  


### Step 2: Look Up Coordinates for a Taxi Zone

#### **What Happens in This Step**

1. **Input Location ID**  
   - The user provides a `LocationID`, a unique identifier for a specific taxi zone.

2. **Filter the Data**  
   - The program searches the GeoDataFrame to find the zone with the matching `LocationID`.

3. **Extract the Geometry**  
   - Once the zone is identified, its shape (geometry) is used to calculate the centroid (center point).

4. **Output Coordinates**  
   - The program returns the coordinates (latitude and longitude) of the centroid.  
   - If the `LocationID` is not found in the data, it returns `None`.


In [11]:
def lookup_coords_for_taxi_zone_id(zone_loc_id, loaded_taxi_zones):
    """
    Lookup the latitude and longitude for a given Taxi Zone Location ID.

    Parameters:
        zone_loc_id (int): Taxi Zone Location ID.
        loaded_taxi_zones (gpd.GeoDataFrame): GeoDataFrame containing Taxi Zone data.

    Returns:
        tuple: (latitude, longitude) of the centroid of the zone geometry, 
               or None if the Location ID is not found.
    """
    try:
        # Filter GeoDataFrame for the given Location ID
        zone = loaded_taxi_zones[loaded_taxi_zones['LocationID'] == zone_loc_id]
        
        if zone.empty:
            return None  # Return None if the Location ID is not found
        
        # Extract the geometry and calculate the centroid
        centroid = zone.iloc[0].geometry.centroid
        
        # Return the centroid's coordinates as (latitude, longitude)
        return centroid.y, centroid.x  # (latitude, longitude)
    
    except Exception as e:
        raise RuntimeError(f"Error looking up coordinates for Zone ID {zone_loc_id}: {e}")



In [19]:
# Lookup coordinates for a specific Taxi Zone ID
zone_id = 263
coords = lookup_coords_for_taxi_zone_id(zone_id, taxi_zones_gdf)

if coords:
    print(f"Coordinates for Zone ID {zone_id}: Latitude = {coords[0]}, Longitude = {coords[1]}")
else:
    print(f"Zone ID {zone_id} not found in the Taxi Zone data.")

Coordinates for Zone ID 263: Latitude = 40.77876585543437, Longitude = -73.951009874818


### Calculate Sample Size
This section demonstrates how to calculate the appropriate sample size for analyzing a dataset using Cochran's formula, specifically tailored for finite populations like taxi trip data.



To calculate the required sample size, we use **Cochran's formula** with a finite population correction. The inputs are the population size, the desired confidence level, the margin of error, and the estimated proportion.

1. **Input Parameters**  
   - Population size: Total number of records in the dataset.  
   - Confidence level: Typically 90%, 95%, or 99% (e.g., 95% confidence).  
   - Margin of error: The acceptable error range for the results (e.g., ±5%).  
   - Proportion: Estimated proportion of the population with the desired attribute (default is 0.5 for maximum variability).

2. **Cochran's Formula for Infinite Populations**  
   - The formula calculates the sample size assuming an infinite population:
     $$
     n_0 = \frac{Z^2 \cdot p \cdot (1 - p)}{E^2}
     $$
     Where:
     - $n_0$: Initial sample size for an infinite population.
     - $Z$: Z-score based on the confidence level (e.g., 1.96 for 95% confidence).
     - $p$: Estimated proportion of the population with the attribute (default 0.5).
     - $E$: Margin of error (e.g., 0.05 for ±5%).

3. **Adjust for Finite Population Size**  
   - If the population is finite, the sample size is adjusted using the correction formula:
     $$
     n = \frac{n_0}{1 + \frac{n_0 - 1}{N}}
     $$
     Where:
     - $n$: Final sample size for the finite population.
     - $N$: Total population size.

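4. **Output**  
   - The final sample size is rounded up with `math.ceil`, so the result is a whole number that is never smaller than the formula requires.  
   - If the population is smaller than the infinite-population estimate, the implementation below uses the entire population as the sample.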
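
As a worked check of the two formulas above, the following minimal sketch plugs in the defaults (95% confidence, ±5% margin, proportion 0.5) and the January 2020 Yellow Taxi population size reported later in this notebook. The helper name `cochran_sample_size` is hypothetical and exists only for this illustration; it is not the notebook's `calculate_sample_size`.

```python
import math


def cochran_sample_size(population, z=1.96, p=0.5, e=0.05):
    """Return (n_0, n): the infinite-population estimate and the
    finite-population sample size, rounded up."""
    n0 = (z ** 2) * p * (1 - p) / (e ** 2)   # Cochran's formula
    n = n0 / (1 + (n0 - 1) / population)     # finite population correction
    return n0, math.ceil(n)


# 6,405,008 is the row count of yellow_tripdata_2020-01.parquet
n0, n = cochran_sample_size(6_405_008)
print(round(n0, 2), n)  # 384.16 385

# A much smaller population shrinks the required sample noticeably
print(cochran_sample_size(1_000)[1])  # 278
```

For populations in the millions the correction barely changes the result, which is why every monthly file in this project ends up with a sample of 385 rows.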


In [23]:
def calculate_sample_size(population, confidence_level=0.95, margin_of_error=0.05, proportion=0.5):
    """
    Calculate the sample size using Cochran's formula for finite populations.

    Parameters:
        population (int): The size of the population (e.g., number of trips in a month).
        confidence_level (float): The confidence level (default is 0.95 for 95% confidence).
        margin_of_error (float): The margin of error (default is 0.05 for ±5% error).
        proportion (float): The estimated proportion of the population with the desired attribute (default is 0.5).

    Returns:
        int: The calculated sample size.
    """
    # Z-score for the given confidence level
    z_scores = {0.9: 1.645, 0.95: 1.96, 0.99: 2.576}
    if confidence_level not in z_scores:
        raise ValueError("Unsupported confidence level. Use 0.9, 0.95, or 0.99.")
    Z = z_scores[confidence_level]
    
    # Cochran's formula for an infinite population
    n_0 = (Z**2 * proportion * (1 - proportion)) / (margin_of_error**2)
    
    # Adjust for finite population size
    if population < n_0:
        sample_size = population  # If the population is small, the entire dataset is needed
    else:
        sample_size = n_0 / (1 + (n_0 - 1) / population)
    
    return math.ceil(sample_size)

In [25]:
yellow_taxi_df = pd.read_parquet(r"C:\Users\wenji\OneDrive\Project\downloads yellow taxi\yellow_tripdata_2020-01.parquet")

population_size = len(yellow_taxi_df)
print(f"Population size: {population_size}")

Population size: 6405008


### Common Functions

### Step 1: Define the URL Extraction Function

The `get_all_urls_from_tlc_page` function extracts all URLs from a specified webpage.

1. **Input**:  
   - The function takes a webpage URL (`taxi_page`) as input.

2. **Request the Webpage**:  
   - Using the `requests` library, the HTML content of the page is fetched.

3. **Parse the HTML**:  
   - The HTML content is parsed using `BeautifulSoup` from the `bs4` library.

4. **Extract URLs**:  
   - All anchor (`<a>`) tags with `href` attributes are located, and their links are extracted into a list.

5. **Output**:  
   - A list of extracted URLs is returned.
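
The extraction step can be tried offline on a small HTML snippet. The sketch below is an illustration only: it swaps `requests` and `BeautifulSoup` for the standard library's `html.parser`, and the class name `LinkCollector` and the `SAMPLE_HTML` string are made up for the example. It also shows two details that matter for real pages: `href` values may carry stray whitespace, and relative links need `urljoin` to become absolute.

```python
from html.parser import HTMLParser
from urllib.parse import urljoin

BASE_URL = "https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page"


class LinkCollector(HTMLParser):
    """Collect the href of every <a> tag that has one."""

    def __init__(self):
        super().__init__()
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag == "a":
            href = dict(attrs).get("href")
            if href is not None:
                self.links.append(href.strip())  # drop stray whitespace


# Hypothetical page fragment: a relative link, an anchor, a tag without
# href, and a data link with a trailing space inside the attribute
SAMPLE_HTML = """
<a href="/site/tlc/index.page">Home</a>
<a href="#">Top</a>
<a>no href</a>
<a href="https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet ">Yellow</a>
"""

collector = LinkCollector()
collector.feed(SAMPLE_HTML)

# Resolve relative paths against the page URL
absolute = [urljoin(BASE_URL, link) for link in collector.links]
print(collector.links)
print(absolute[0])  # https://www1.nyc.gov/site/tlc/index.page
```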

In [29]:
def get_all_urls_from_tlc_page(taxi_page):
    """
    Extract all URLs from the TLC trip record data page.

    Parameters:
        taxi_page (str): URL of the TLC webpage.

    Returns:
        list: A list of all extracted URLs from the webpage.
    """
    try:
        # Request the HTML page; the timeout keeps the call from hanging indefinitely
        response = requests.get(taxi_page, timeout=30)
        response.raise_for_status()  # Check if the request was successful

        # Parse the HTML content
        soup = BeautifulSoup(response.text, 'html.parser')

        # Find all links on the page
        links = [a['href'] for a in soup.find_all('a', href=True)]

        return links
    except Exception as e:
        raise RuntimeError(f"Error fetching or parsing the TLC page: {e}")



In [31]:
# TLC Trip Record Data Page URL
URL = "https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page"

# Extract all URLs
try:
    all_urls = get_all_urls_from_tlc_page(URL)
    print(f"Extracted {len(all_urls)} URLs:")
    for url in all_urls:  # Print all links in  URL
        print(url)
except RuntimeError as e:
    print(e)


Extracted 563 URLs:
http://www1.nyc.gov
/311/index.page
/home/search/index.page
#
/site/tlc/index.page
http://www1.nyc.gov/home/text-size.page
#
/site/tlc/index.page
/site/tlc/about/about-tlc.page
/site/tlc/passengers/your-ride.page
/site/tlc/drivers/get-a-drivers-license.page
/site/tlc/vehicles/get-a-vehicle-license.page
/site/tlc/businesses/yellow-cab.page
/site/tlc/tlc-online/tlc-online.page
/site/tlc/about/about-tlc.page
/site/tlc/about/data-and-research.page
/site/tlc/about/tlc-initiatives.page
/site/tlc/about/contact-tlc.page
/site/tlc/about/data.page
/site/tlc/about/pilot-programs.page
/site/tlc/about/industry-reports.page
/site/tlc/about/tlc-trip-record-data.page
/site/tlc/about/request-data.page
#
#
#
#
#
javascript:expandAll();
javascript:collapseAll();
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet 
https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2024-01.parquet 
https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2024

### Step 2: Define the URL Filter Function

The `filter_parquet_urls` function uses regular expressions to match and filter URLs for the Yellow Taxi and HVFHV datasets.

1. **Input**:  
   - A list of URLs (`all_urls`) extracted from the TLC page.

2. **Regular Expressions**:  
   - Patterns are defined to match Parquet files for:
     - Yellow Taxi datasets (`yellow_tripdata`).
     - HVFHV datasets (`fhvhv_tripdata`).
   - The date ranges included are:
     - 2020 (January to December).
     - 2021–2023 (all months).
     - 2024 (January to August).

3. **Filter URLs**:  
   - URLs matching the patterns are extracted into separate lists.

4. **Output**:  
   - Two lists: one for Yellow Taxi dataset links, and another for HVFHV dataset links.
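
The date-range filter can be checked against a handful of file names before running it on the full page. The sketch below is one possible consolidation, not the notebook's exact patterns: the helper names `build_pattern` and `filter_links` are hypothetical, and the three alternations are folded into a single expression per dataset that accepts 2020 to 2023 (all months) and 2024 (January to August).

```python
import re


def build_pattern(prefix):
    """Pattern for <prefix>_tripdata_YYYY-MM.parquet within the project range."""
    return re.compile(
        rf"{prefix}_tripdata_(202[0-3]-(0[1-9]|1[0-2])|2024-0[1-8])\.parquet"
    )


def filter_links(urls):
    yellow = build_pattern("yellow")
    fhvhv = build_pattern("fhvhv")
    return (
        [u for u in urls if yellow.search(u)],
        [u for u in urls if fhvhv.search(u)],
    )


base = "https://d37ci6vzurychx.cloudfront.net/trip-data/"
sample_urls = [
    base + "yellow_tripdata_2024-08.parquet ",  # in range (trailing space kept)
    base + "yellow_tripdata_2024-09.parquet",   # after August 2024
    base + "yellow_tripdata_2019-12.parquet",   # before 2020
    base + "fhvhv_tripdata_2021-06.parquet",    # in range
    base + "fhv_tripdata_2021-06.parquet",      # different dataset (FHV)
    base + "green_tripdata_2022-03.parquet",    # different dataset (green)
]

yellow_links, fhvhv_links_demo = filter_links(sample_urls)
print(len(yellow_links), len(fhvhv_links_demo))  # 1 1
```

Because the prefix is part of the pattern, `fhv_tripdata` files are not mistaken for `fhvhv_tripdata` files.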

In [34]:
def filter_parquet_urls(all_urls):
    """
    Filter Yellow Taxi and HVFHV Parquet file URLs for the specified date range.

    Parameters:
        all_urls (list): A list of all URLs extracted from the TLC page.

    Returns:
        tuple: Two lists containing filtered URLs for Yellow Taxi and HVFHV datasets.
    """
    # Compile regular expressions to match the desired Parquet files
    yellow_taxi_pattern = re.compile(
        r'yellow_tripdata_2020-(0[1-9]|1[0-2])\.parquet|'
        r'yellow_tripdata_202[1-3]-\d{2}\.parquet|'
        r'yellow_tripdata_2024-(0[1-8])\.parquet'
    )
    fhvhv_pattern = re.compile(
        r'fhvhv_tripdata_2020-(0[1-9]|1[0-2])\.parquet|'
        r'fhvhv_tripdata_202[1-3]-\d{2}\.parquet|'
        r'fhvhv_tripdata_2024-(0[1-8])\.parquet'
    )

    # Filter URLs using the patterns
    yellow_taxi_links = [url for url in all_urls if yellow_taxi_pattern.search(url)]
    fhvhv_links = [url for url in all_urls if fhvhv_pattern.search(url)]

    return yellow_taxi_links, fhvhv_links


In [36]:
# Example: Extract URLs from the TLC page
all_urls = get_all_urls_from_tlc_page("https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page")

# Filter the Parquet file URLs
yellow_taxi_links, fhvhv_links = filter_parquet_urls(all_urls)

# Print the filtered links
print("Yellow Taxi Links:")
for link in yellow_taxi_links:
    print(link)

print("\nFHVHV Links:")
for link in fhvhv_links:
    print(link)
    

Yellow Taxi Links:
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet 
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-02.parquet 
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-03.parquet 
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-04.parquet
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-05.parquet
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-06.parquet
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-07.parquet
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-08.parquet
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-02.parquet
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-03.parquet
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-04.parquet
https://d37ci6vzur

### Step 3: Define the Download Function

The `download_files` function takes a list of file URLs and downloads them to a specified directory.

1. **Input Parameters**:
   - `file_links`: A list of URLs for the files to download.
   - `save_dir`: Path to the directory where files will be saved.

2. **Ensure Save Directory Exists**:
   - The function creates the directory if it does not already exist using `os.makedirs`.

3. **Download Files**:
   - For each URL in `file_links`:
     - The file name is extracted from the URL.
     - The file is downloaded using `requests` and saved locally in the specified directory.
     - The function prints the status of each download.

4. **Handle Errors**:
   - If any error occurs during the download, an appropriate error message is printed.

5. **Use the Download Function**:
   - We call `download_files` once with `yellow_taxi_links` and once with `fhvhv_links` to download the Parquet files for each dataset.
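
The file-name extraction in step 3 is worth testing on its own, because some links on the TLC page end with a trailing space. The following is a small sketch using only the standard library; the helper name `file_name_from_url` is hypothetical and not part of the notebook.

```python
import posixpath
from urllib.parse import urlsplit


def file_name_from_url(url):
    """Return the last path component of a URL, ignoring surrounding
    whitespace and any query string."""
    path = urlsplit(url.strip()).path
    return posixpath.basename(path)


url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet "
print(repr(file_name_from_url(url)))  # 'yellow_tripdata_2024-01.parquet'
```

Parsing the URL instead of splitting on `/` also keeps a query string, if one is ever present, out of the saved file name.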



In [39]:
def download_files(file_links, save_dir):
    """
    Downloads files from a list of URLs and saves them to a specified directory.

    Args:
        file_links (list): List of URLs for the files to download.
        save_dir (str): Path to the directory where files will be saved.

    Returns:
        None
    """
    # Ensure the save directory exists
    os.makedirs(save_dir, exist_ok=True)

    for link in file_links:
        try:
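            # Extract file name from the URL. Some hrefs on the TLC page end
            # with whitespace, so strip the link before using it.
            link = link.strip()
            file_name = link.split("/")[-1]
            file_path = os.path.join(save_dir, file_name)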
            
            # Download the file
            print(f"Downloading {file_name}...")
            response = requests.get(link, stream=True)
            response.raise_for_status()  # Raise an error for bad status codes

            # Save the file locally
            with open(file_path, 'wb') as file:
                for chunk in response.iter_content(chunk_size=8192):
                    file.write(chunk)
            print(f"Downloaded: {file_name}")
        
        except requests.exceptions.RequestException as e:
            print(f"Failed to download {link}: {e}")



In [None]:
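# Example usage

# yellow_taxi_links is already a list of URLs, so pass it directly
# (wrapping it in another list would make download_files iterate over a list, not URLs)
file_links = yellow_taxi_links

# Raw string so the backslashes in the Windows path are not parsed as escapes
save_dir = r"C:\Users\wenji\OneDrive\Project\downloads yellow taxi"
download_files(file_links, save_dir)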

In [None]:
file_links = fhvhv_links  # Use the list directly, assuming fhvhv_links is already a list of URLs
# Raw string so the backslashes in the Windows path are not parsed as escapes
save_dir = r"C:\Users\wenji\OneDrive\Project\downloads_HVFHV"

download_files(file_links, save_dir)


### Step 4: Define the Sampling Function for a Single Parquet File

The `sample_parquet_file` function loads a Parquet file, calculates the required sample size using Cochran's formula, extracts a random sample of rows, and saves the sampled data to a new Parquet file.

1. **Process**:
   - Loads the dataset from the specified Parquet file.
   - Calculates the sample size using the `calculate_sample_size` function.
   - Extracts a random sample of rows if the sample size is less than the population size. Otherwise, uses the full dataset.
   - Saves the sampled data as a new Parquet file in the output directory.

2. **Output**:
   - A new Parquet file containing the sampled dataset.
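
The sampling rule itself (take the whole dataset when the requested size is at least the population, otherwise draw a reproducible random sample without replacement) can be sketched without pandas. This is a stand-in that uses the standard library's `random` module on a plain list; it is not what `DataFrame.sample` does internally, and `sample_rows` is a hypothetical name.

```python
import random


def sample_rows(rows, sample_size, seed=42):
    """Return all rows if sample_size covers the population; otherwise a
    seeded random sample without replacement."""
    if sample_size >= len(rows):
        return list(rows)
    rng = random.Random(seed)  # fixed seed makes the draw repeatable
    return rng.sample(rows, sample_size)


rows = list(range(1000))         # stand-in for 1,000 trip records
sample = sample_rows(rows, 278)  # 278 is Cochran's size for N = 1,000
print(len(sample), len(set(sample)))  # 278 278
```

Fixing the seed, as the notebook does with `random_state=42`, means rerunning the cell selects the same rows each time.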

In [41]:
def sample_parquet_file(parquet_file, output_directory="sampled yellow taxi", confidence_level=0.95, margin_of_error=0.05, proportion=0.5):
    """
    Load a Parquet file, calculate a sample size, extract a random sample,
    and save it to a new Parquet file in the 'sampled yellow taxi' folder.

    Parameters:
        parquet_file (str): Path to the input Parquet file.
        output_directory (str): Directory to save the sampled Parquet file.
        confidence_level (float): Confidence level for sample size calculation (default is 0.95).
        margin_of_error (float): Margin of error for sample size calculation (default is 0.05).
        proportion (float): Estimated proportion of the population with the desired attribute (default is 0.5).
    """
    # Ensure the output directory exists
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)

    # Load the dataset
    print(f"Loading Parquet file: {parquet_file}")
    df = pd.read_parquet(parquet_file)

    # Calculate sample size
    population_size = len(df)
    sample_size = calculate_sample_size(population_size, confidence_level, margin_of_error, proportion)
    print(f"Population size: {population_size}, Sample size: {sample_size}")

    # If the sample size is larger than the population, use the full dataset
    if sample_size >= population_size:
        print("Sample size is larger than or equal to the population. Using the entire dataset.")
        sampled_df = df
    else:
        # Extract random sample
        print(f"Sampling {sample_size} rows from the dataset...")
        sampled_df = df.sample(n=sample_size, random_state=42)

    # Save the sampled dataset
    file_name = os.path.basename(parquet_file)
    output_file = os.path.join(output_directory, os.path.splitext(file_name)[0] + "_sample.parquet")
    print(f"Saving sampled data to: {output_file}")
    sampled_df.to_parquet(output_file, index=False)


### Step 5: Apply Sampling to All Files in a Directory

The `sample_all_parquet_files` function applies the `sample_parquet_file` function to all Parquet files in a specified directory.

1. **Process**:
   - Ensures the `output_directory` exists or creates it.
   - Iterates through all files in the `input_directory`.
   - Identifies Parquet files by their `.parquet` extension.
   - Applies the `sample_parquet_file` function to each Parquet file.

2. **Output**:
   - Sampled datasets are saved as new Parquet files in the `output_directory`, with filenames suffixed by `_sample.parquet`.

In [44]:
def sample_all_parquet_files(input_directory, output_directory="sampled yellow taxi", confidence_level=0.95, margin_of_error=0.05, proportion=0.5):
    """
    Apply sample_parquet_file to all Parquet files in a directory.

    Parameters:
        input_directory (str): Directory containing the Parquet files.
        output_directory (str): Directory to save the sampled Parquet files.
        confidence_level (float): Confidence level for sample size calculation (default is 0.95).
        margin_of_error (float): Margin of error for sample size calculation (default is 0.05).
        proportion (float): Estimated proportion of the population with the desired attribute (default is 0.5).
    """
    # Ensure the output directory exists
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)

    # Iterate through all files in the input directory
    for file_name in os.listdir(input_directory):
        if file_name.endswith(".parquet"):
            # Construct the full file path
            file_path = os.path.join(input_directory, file_name)
            print(f"Processing file: {file_path}")
            
            # Apply sampling to each file
            try:
                sample_parquet_file(file_path, output_directory, confidence_level, margin_of_error, proportion)
            except Exception as e:
                print(f"Failed to process file {file_name}: {e}")


In [46]:
input_directory = r"C:\Users\wenji\OneDrive\Project\downloads yellow taxi"
output_directory = r"C:\Users\wenji\OneDrive\Project\sampled yellow taxi"

# Apply sampling to all Parquet files in the input directory
sample_all_parquet_files(input_directory, output_directory)


Processing file: C:\Users\wenji\OneDrive\Project\downloads yellow taxi\yellow_tripdata_2020-01.parquet
Loading Parquet file: C:\Users\wenji\OneDrive\Project\downloads yellow taxi\yellow_tripdata_2020-01.parquet
Population size: 6405008, Sample size: 385
Sampling 385 rows from the dataset...
Saving sampled data to: C:\Users\wenji\OneDrive\Project\sampled yellow taxi\yellow_tripdata_2020-01_sample.parquet
Processing file: C:\Users\wenji\OneDrive\Project\downloads yellow taxi\yellow_tripdata_2020-02.parquet
Loading Parquet file: C:\Users\wenji\OneDrive\Project\downloads yellow taxi\yellow_tripdata_2020-02.parquet
Population size: 6299367, Sample size: 385
Sampling 385 rows from the dataset...
Saving sampled data to: C:\Users\wenji\OneDrive\Project\sampled yellow taxi\yellow_tripdata_2020-02_sample.parquet
Processing file: C:\Users\wenji\OneDrive\Project\downloads yellow taxi\yellow_tripdata_2020-03.parquet
Loading Parquet file: C:\Users\wenji\OneDrive\Project\downloads yellow taxi\yellow_

### Process Taxi Data

### Step 1: Define the Cleaning Function

The `clean_parquet_columns` function reads a Parquet file, selects specific columns, and returns the cleaned DataFrame.

1. **Input**:
   - `parquet_file`: Path to the Parquet file that needs cleaning.

2. **Process**:
   - Reads the Parquet file into a Pandas DataFrame.
   - Specifies the relevant columns to keep:
     - `'tpep_pickup_datetime'`: Pickup time.
     - `'tpep_dropoff_datetime'`: Dropoff time.
     - `'passenger_count'`: Number of passengers.
     - `'trip_distance'`: Distance of the trip.
     - `'PULocationID'`: Pickup location ID.
     - `'DOLocationID'`: Dropoff location ID.
     - `'tip_amount'`: Amount of tip given.
     - `'total_amount'`: Total amount charged.
     - `'congestion_surcharge'`: Additional congestion surcharge.
   - Filters the DataFrame to include only these columns.

3. **Output**:
   - A cleaned DataFrame containing only the specified columns.

4. **Error Handling**:
   - If an error occurs during processing, the function prints the error message and returns `None`.

In [49]:
def clean_parquet_columns(parquet_file):
    """
    Read a Parquet file, keep only relevant columns, and return the cleaned DataFrame.

    Parameters:
    - parquet_file (str): Path to the Parquet file.

    Returns:
    - pd.DataFrame: The cleaned DataFrame.
    """
    try:
        # Read the Parquet file
        df = pd.read_parquet(parquet_file)

        # Specify the columns to keep
        columns_to_keep = [
            'tpep_pickup_datetime', 'tpep_dropoff_datetime',
            'passenger_count', 'trip_distance',
            'PULocationID', 'DOLocationID', 'tip_amount', 'total_amount', 'congestion_surcharge'
        ]

        # Filter the DataFrame to keep only the specified columns
        df_cleaned = df[columns_to_keep]
        return df_cleaned

    except Exception as e:
        print(f"Error processing the Parquet file: {e}")
        return None


In [50]:
file= r"C:\Users\wenji\OneDrive\Project\sampled yellow taxi\yellow_tripdata_2020-02_sample.parquet"
clean = clean_parquet_columns(file)
print(clean.head(10))

  tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  trip_distance  \
0  2020-02-19 19:37:08   2020-02-19 19:42:59              1.0           1.24   
1  2020-02-27 12:17:32   2020-02-27 12:26:00              1.0           0.86   
2  2020-02-06 14:38:29   2020-02-06 14:44:46              1.0           1.40   
3  2020-02-01 12:10:41   2020-02-01 12:22:25              1.0           1.80   
4  2020-02-14 19:18:29   2020-02-14 19:37:45              3.0           1.85   
5  2020-02-17 16:02:28   2020-02-17 16:09:40              1.0           1.90   
6  2020-02-08 12:38:37   2020-02-08 12:51:12              1.0           1.74   
7  2020-02-23 19:40:40   2020-02-23 19:48:55              1.0           1.31   
8  2020-02-01 04:43:14   2020-02-01 04:53:06              1.0           2.10   
9  2020-02-06 17:27:39   2020-02-06 17:43:09              1.0           3.13   

   PULocationID  DOLocationID  tip_amount  total_amount  congestion_surcharge  
0           237           236        0.

### Step 2: Define the Normalization Function

The `normalize_column_names` function modifies column names in place, applying these steps in order:
1. **Strip Leading and Trailing Spaces**:
   - Removes any extra whitespace around column names.
2. **Convert to Lowercase**:
   - Ensures all column names are in lowercase for uniformity.
3. **Remove Special Characters**:
   - Deletes every character that is not a letter, digit, underscore, or whitespace.
4. **Replace Spaces with Underscores**:
   - Converts each run of whitespace inside a name to a single underscore.

#### **Input**:
- `df`: A Pandas DataFrame with column names to be normalized.

#### **Output**:
- A DataFrame with normalized column names.
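
To see what these rules do to individual names, the sketch below applies the same two regular expressions with the standard `re` module to plain strings. It is a pure-Python equivalent for illustration; `normalize_name` is a hypothetical helper, and the second and third inputs are invented examples rather than real TLC column names.

```python
import re


def normalize_name(name):
    """Strip, lowercase, remove special characters, then turn whitespace
    runs into single underscores."""
    name = name.strip().lower()
    name = re.sub(r"[^\w\s]", "", name)  # keep letters, digits, _, whitespace
    name = re.sub(r"\s+", "_", name)     # whitespace run -> one underscore
    return name


for raw in ["PULocationID", " Trip Distance (mi) ", "tip-amount"]:
    print(repr(raw), "->", repr(normalize_name(raw)))
# 'PULocationID' -> 'pulocationid'
# ' Trip Distance (mi) ' -> 'trip_distance_mi'
# 'tip-amount' -> 'tipamount'
```

Note that special characters are deleted rather than replaced, so `tip-amount` becomes `tipamount`, not `tip_amount`.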


In [55]:
def normalize_column_names(df):
    """
    Normalize column names in a DataFrame (in place) by:
    - Stripping surrounding whitespace and converting to lowercase
    - Removing special characters
    - Replacing whitespace with underscores

    Parameters:
    - df (pd.DataFrame): The input DataFrame.

    Returns:
    - pd.DataFrame: The DataFrame with normalized column names.
    """
    # Normalize column names
    df.columns = (
        df.columns
        .str.strip()               # Remove leading/trailing spaces
        .str.lower()               # Convert to lowercase
        .str.replace(r'[^\w\s]', '', regex=True)  # Remove special characters
        .str.replace(r'\s+', '_', regex=True)     # Replace spaces with underscores
    )
    return df


In [57]:
clean1 = normalize_column_names(clean)
print(clean1.head(10))

  tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  trip_distance  \
0  2020-02-19 19:37:08   2020-02-19 19:42:59              1.0           1.24   
1  2020-02-27 12:17:32   2020-02-27 12:26:00              1.0           0.86   
2  2020-02-06 14:38:29   2020-02-06 14:44:46              1.0           1.40   
3  2020-02-01 12:10:41   2020-02-01 12:22:25              1.0           1.80   
4  2020-02-14 19:18:29   2020-02-14 19:37:45              3.0           1.85   
5  2020-02-17 16:02:28   2020-02-17 16:09:40              1.0           1.90   
6  2020-02-08 12:38:37   2020-02-08 12:51:12              1.0           1.74   
7  2020-02-23 19:40:40   2020-02-23 19:48:55              1.0           1.31   
8  2020-02-01 04:43:14   2020-02-01 04:53:06              1.0           2.10   
9  2020-02-06 17:27:39   2020-02-06 17:43:09              1.0           3.13   

   pulocationid  dolocationid  tip_amount  total_amount  congestion_surcharge  
0           237           236        0.

### Step 3: Define the Processing Function
This section processes a taxi trip dataset by normalizing column names, mapping location IDs to geographic coordinates, and filtering the data by a bounding box. The `process_taxi_data_from_df` function performs the following steps:

1. **Processing Steps**:
   - **Normalize Column Names**:
     - Standardizes column names to lowercase, removes special characters, and replaces whitespace with underscores.
   - **Filter Invalid Location IDs**:
     - Removes rows where `PULocationID` or `DOLocationID` is missing or outside the valid range (1–263).
   - **Map Location IDs to Coordinates**:
     - Maps pickup and dropoff location IDs to their respective latitude and longitude using the Taxi Zone GeoDataFrame.
   - **Split Coordinates**:
     - Adds separate columns for pickup and dropoff latitude/longitude.
   - **Filter Rows by Bounding Box**:
     - Keeps rows with valid pickup and dropoff coordinates within the defined geographic bounding box.

2. **Output**:
   - A cleaned and filtered DataFrame ready for further analysis.
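
The bounding-box check in the last processing step reduces to two range comparisons. The sketch below shows it for a single (latitude, longitude) pair using the `NEW_YORK_BOX_COORDS` constant from the setup cell; the helper `in_bounding_box` is hypothetical, and treating a missing coordinate as outside the box is an assumption made for this illustration.

```python
# (lat, lon) of the south-west and north-east corners, as in the setup cell
NEW_YORK_BOX_COORDS = ((40.560445, -74.242330), (40.908524, -73.717047))


def in_bounding_box(lat, lon, box=NEW_YORK_BOX_COORDS):
    """True if the point lies inside the box (edges included)."""
    if lat is None or lon is None:
        return False  # assumption: unmapped coordinates count as outside
    (lat_min, lon_min), (lat_max, lon_max) = box
    return lat_min <= lat <= lat_max and lon_min <= lon <= lon_max


# Centroid of zone 263, as printed earlier in this notebook
print(in_bounding_box(40.77876585543437, -73.951009874818))  # True
# A point well outside New York City (approximate Philadelphia coordinates)
print(in_bounding_box(39.9526, -75.1652))  # False
```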

In [60]:
def process_taxi_data_from_df(df, loaded_taxi_zones, lat_min=40.560445, lon_min=-74.242330, lat_max=40.908524, lon_max=-73.717047):
    """
    Process a DataFrame by cleaning and filtering based on PULocationID and DOLocationID.

    Parameters:
        df (pd.DataFrame): Input DataFrame.
        loaded_taxi_zones (gpd.GeoDataFrame): GeoDataFrame containing Taxi Zone data.
        lat_min (float): Minimum latitude for the bounding box.
        lon_min (float): Minimum longitude for the bounding box.
        lat_max (float): Maximum latitude for the bounding box.
        lon_max (float): Maximum longitude for the bounding box.

    Returns:
        pd.DataFrame: Filtered DataFrame with valid coordinates.
    """
    # Copy the DataFrame to avoid warnings
    df = df.copy()
    print(f"Processing DataFrame with {len(df)} rows.")

    # Normalize column names
    df = normalize_column_names(df)

    # Step 1: Drop rows where PULocationID or DOLocationID is NaN or not in range 1-263
    valid_ids = range(1, 264)
    df = df[
        df['pulocationid'].isin(valid_ids) &
        df['dolocationid'].isin(valid_ids)
    ]
    print(f"After removing invalid location IDs: {len(df)} rows remaining.")

    # Step 2: Map PULocationID and DOLocationID to coordinates
    def lookup_coords_for_taxi_zone_id(zone_loc_id, loaded_taxi_zones):
        """Return the (latitude, longitude) of a zone's centroid, or (NaN, NaN) if the ID is unknown."""
        try:
            zone = loaded_taxi_zones[loaded_taxi_zones['LocationID'] == zone_loc_id]
            if zone.empty:
                # Return a NaN pair so the value can still be split into two columns and dropped later
                return (np.nan, np.nan)
            centroid = zone.iloc[0].geometry.centroid
            return centroid.y, centroid.x
        except Exception as e:
            raise RuntimeError(f"Error looking up coordinates for Zone ID {zone_loc_id}: {e}") from e

    df.loc[:, 'pickup_coords'] = df['pulocationid'].apply(partial(lookup_coords_for_taxi_zone_id, loaded_taxi_zones=loaded_taxi_zones))
    df.loc[:, 'dropoff_coords'] = df['dolocationid'].apply(partial(lookup_coords_for_taxi_zone_id, loaded_taxi_zones=loaded_taxi_zones))

    # Split coordinates into separate latitude and longitude columns.
    # Reuse df.index: the ID filter above leaves gaps in the index, and pd.concat(axis=1)
    # aligns on index labels, so a fresh 0..n-1 index would attach coordinates to the wrong rows.
    pickup_coords_df = pd.DataFrame(df['pickup_coords'].tolist(), index=df.index, columns=['pickup_latitude', 'pickup_longitude'])
    dropoff_coords_df = pd.DataFrame(df['dropoff_coords'].tolist(), index=df.index, columns=['dropoff_latitude', 'dropoff_longitude'])

    df = pd.concat([df, pickup_coords_df, dropoff_coords_df], axis=1)

    # Drop rows where mapping failed (NaN coordinates)
    df = df.dropna(subset=['pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude'])
    print(f"After mapping location IDs to coordinates: {len(df)} rows remaining.")

    # Step 3: Filter rows based on the bounding box
    df = df[
        (df['pickup_latitude'].between(lat_min, lat_max)) &
        (df['pickup_longitude'].between(lon_min, lon_max)) &
        (df['dropoff_latitude'].between(lat_min, lat_max)) &
        (df['dropoff_longitude'].between(lon_min, lon_max))
    ]
    print(f"After filtering by bounding box: {len(df)} rows remaining.")

    # Drop intermediate columns if not needed
    df = df.drop(columns=['pickup_coords', 'dropoff_coords'])

    print("Processing completed.")
    return df
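
The per-row lookup above filters the zones GeoDataFrame once for every trip, which may become slow on full monthly files. One possible alternative, sketched here under assumptions, is to build a `LocationID` to centroid table once and apply it with `Series.map`. The `zone_centroids` dictionary, its coordinate values, and the helper name `add_coords` are hypothetical stand-ins and not part of the project code. In practice the dictionary would be derived from the Taxi Zone GeoDataFrame.

```python
import pandas as pd

# Hypothetical stand-in for centroids computed once from the taxi zones GeoDataFrame:
# LocationID -> (latitude, longitude). The values are illustrative, not real centroids.
zone_centroids = {
    1: (40.70, -74.00),
    2: (40.75, -73.98),
}


def add_coords(df, id_col, prefix, centroids):
    """Add `<prefix>_latitude` and `<prefix>_longitude` columns via a vectorized map."""
    lat = {zone_id: latlon[0] for zone_id, latlon in centroids.items()}
    lon = {zone_id: latlon[1] for zone_id, latlon in centroids.items()}
    out = df.copy()
    # Series.map yields NaN for IDs that are missing from the dictionary
    out[f"{prefix}_latitude"] = out[id_col].map(lat)
    out[f"{prefix}_longitude"] = out[id_col].map(lon)
    return out


trips = pd.DataFrame({"pulocationid": [1, 2, 999]}, index=[10, 11, 12])
trips = add_coords(trips, "pulocationid", "pickup", zone_centroids)
print(trips)
```

Because the new columns are assigned directly on the frame, they inherit its index, and unknown IDs end up as NaN that a later `dropna` can remove.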




### Step 4: Define the `total_clean_taxi_data` Function

The `total_clean_taxi_data` function combines previously defined steps into a streamlined process for data cleaning and filtering.

#### **Processing Steps**:
1. **Clean Parquet Columns**:
   - Reads the Parquet file and retains only the relevant columns, such as pickup/dropoff timestamps, passenger count, trip distance, and location IDs.
2. **Normalize Column Names**:
   - Converts column names to lowercase and replaces spaces/special characters with underscores for consistency.
3. **Filter and Map Data**:
   - Maps location IDs to geographic coordinates, filters invalid rows, and applies a bounding box to keep rows within the NYC area.

#### **Output**:
- A cleaned and filtered DataFrame ready for analysis.


In [63]:
# Wrapper that chains the three functions defined in the steps above into a single call.
def total_clean_taxi_data(file, taxi_zones_gdf):
    """
    Process a Parquet file by cleaning, normalizing, and filtering data.

    Parameters:
        file (str): Path to the Parquet file.
        taxi_zones_gdf (gpd.GeoDataFrame): GeoDataFrame containing Taxi Zone data.

    Returns:
        pd.DataFrame: Processed and filtered DataFrame.
    """
    try:
        print(f"Processing file: {file}")

        # Step 1: Clean the Parquet columns
        cleaned_df = clean_parquet_columns(file)
        if cleaned_df is None:
            raise ValueError("Cleaning Parquet columns failed.")

        # Step 2: Normalize column names
        normalized_df = normalize_column_names(cleaned_df)

        # Step 3: Process the taxi data
        processed_df = process_taxi_data_from_df(normalized_df, taxi_zones_gdf)

        print("Processing completed.")
        return processed_df

    except Exception as e:
        print(f"Error during processing: {e}")
        return None


In [65]:
file = r"C:\Users\wenji\OneDrive\Project\sampled yellow taxi\yellow_tripdata_2021-09_sample.parquet"
ok = total_clean_taxi_data(file, taxi_zones_gdf)
print(ok.head(10))

Processing file: C:\Users\wenji\OneDrive\Project\sampled yellow taxi\yellow_tripdata_2021-09_sample.parquet
Processing DataFrame with 385 rows.
After removing invalid location IDs: 382 rows remaining.
After mapping location IDs to coordinates: 382 rows remaining.
After filtering by bounding box: 381 rows remaining.
Processing completed.
Processing completed.
  tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  trip_distance  \
0  2021-09-03 13:12:41   2021-09-03 13:19:43              1.0           1.01   
1  2021-09-14 17:57:35   2021-09-14 18:19:34              1.0           2.67   
2  2021-09-18 06:47:25   2021-09-18 06:52:03              1.0           1.04   
3  2021-09-20 10:32:05   2021-09-20 10:40:46              1.0           1.04   
4  2021-09-25 11:59:50   2021-09-25 12:00:08              1.0           0.00   
5  2021-09-19 10:56:31   2021-09-19 11:07:40              2.0           2.00   
6  2021-09-01 16:14:18   2021-09-01 16:22:33              1.0           1.00   



### Step 5: Define the File-Merging Function `process_and_merge_files`

The `process_and_merge_files` function combines multiple DataFrames generated from taxi trip data files into a single consolidated DataFrame.


#### **Processing Steps**:
1. **Iterate Through Files**:
   - Lists every entry in the directory.
   - Skips entries that are not regular files (for example, subdirectories).
2. **Apply Cleaning and Filtering**:
   - Uses the `total_clean_taxi_data` function to clean and filter each file.
   - Appends the processed data to a list.
3. **Merge DataFrames**:
   - Combines all processed DataFrames into a single DataFrame.
   - Ensures unique indices by resetting the index during concatenation.
4. **Output**:
   - Returns the merged DataFrame containing all processed rows.

---


In [68]:
# Merge all DataFrames generated from the Parquet files
def process_and_merge_files(directory, taxi_zones_gdf):
    """
    Process all files in a directory using total_clean_taxi_data
    and concatenate the resulting DataFrames, so that each processed
    file contributes its cleaned rows to the final DataFrame.
    
    Parameters:
        directory (str): Path to the directory containing the files.
        taxi_zones_gdf (GeoDataFrame): The taxi zones GeoDataFrame required by total_clean_taxi_data.
    
    Returns:
        pd.DataFrame: A merged DataFrame with unique indices.
    """
    all_data = []  # List to collect processed DataFrames
    
    for file_name in os.listdir(directory):
        file_path = os.path.join(directory, file_name)
        if os.path.isfile(file_path):  # Ensure it's a file
            try:
                # Process the file using total_clean_taxi_data
                processed_data = total_clean_taxi_data(file_path, taxi_zones_gdf)
                if isinstance(processed_data, pd.DataFrame):
                    # Add the resulting DataFrame with reset index to avoid duplicate indices
                    all_data.append(processed_data.reset_index(drop=True))
            except Exception as e:
                print(f"Error processing file {file_name}: {e}")
    
    # Combine all DataFrames into a single DataFrame
    if all_data:
        final_df = pd.concat(all_data, ignore_index=True)  # Ensure final DataFrame has unique indices
    else:
        final_df = pd.DataFrame()  # Return an empty DataFrame if no files were processed
    
    return final_df
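
Note that `os.listdir` returns entries in arbitrary order and includes any non-Parquet files that happen to be in the folder. A minimal sketch of one way to restrict the loop to `.parquet` files in a stable, name-sorted order is shown here. The helper name `list_parquet_files` and the file names in the demonstration are hypothetical.

```python
from pathlib import Path
import tempfile


def list_parquet_files(directory):
    """Return the .parquet files directly inside `directory`, sorted by name."""
    return sorted(p for p in Path(directory).glob("*.parquet") if p.is_file())


# Demonstration on a throwaway directory with hypothetical file names
with tempfile.TemporaryDirectory() as tmp:
    for name in [
        "yellow_tripdata_2020-02_sample.parquet",
        "yellow_tripdata_2020-01_sample.parquet",
        "notes.txt",
    ]:
        (Path(tmp) / name).touch()
    found = [p.name for p in list_parquet_files(tmp)]

print(found)
```

Since the monthly files are named by year and month, sorting by name also puts them in chronological order.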



In [70]:
directory = r"C:\Users\wenji\OneDrive\Project\sampled yellow taxi"
taxi_data = process_and_merge_files(directory, taxi_zones_gdf)

Processing file: C:\Users\wenji\OneDrive\Project\sampled yellow taxi\yellow_tripdata_2020-01_sample.parquet
Processing DataFrame with 385 rows.
After removing invalid location IDs: 377 rows remaining.
After mapping location IDs to coordinates: 377 rows remaining.
After filtering by bounding box: 377 rows remaining.
Processing completed.
Processing completed.
Processing file: C:\Users\wenji\OneDrive\Project\sampled yellow taxi\yellow_tripdata_2020-02_sample.parquet
Processing DataFrame with 385 rows.
After removing invalid location IDs: 382 rows remaining.
After mapping location IDs to coordinates: 382 rows remaining.
After filtering by bounding box: 382 rows remaining.
Processing completed.
Processing completed.
Processing file: C:\Users\wenji\OneDrive\Project\sampled yellow taxi\yellow_tripdata_2020-03_sample.parquet
Processing DataFrame with 385 rows.
After removing invalid location IDs: 380 rows remaining.
After mapping location IDs to coordinates: 380 rows remaining.
After filterin

In [71]:
print(taxi_data.head(100))
total_rows = taxi_data.shape[0]
print(f"Total number of rows: {total_rows}")

   tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  trip_distance  \
0   2020-01-25 10:49:58   2020-01-25 11:07:35              1.0           3.28   
1   2020-01-15 07:30:08   2020-01-15 07:40:01              1.0           1.75   
2   2020-01-09 06:29:09   2020-01-09 06:35:44              1.0           0.87   
3   2020-01-26 12:24:04   2020-01-26 12:29:15              2.0           0.98   
4   2020-01-30 07:57:53   2020-01-30 08:10:19              1.0           1.30   
..                  ...                   ...              ...            ...   
95  2020-01-02 18:44:02   2020-01-02 19:04:20              1.0           1.98   
96  2020-01-06 15:20:04   2020-01-06 15:25:53              1.0           0.80   
97  2020-01-30 14:18:00   2020-01-30 14:58:52              1.0          10.76   
98  2020-01-02 08:19:56   2020-01-02 08:30:49              5.0           1.62   
99  2020-01-21 21:51:49   2020-01-21 21:53:33              1.0           0.28   

    pulocationid  dolocatio

In [74]:
# Save the taxi_data DataFrame as a CSV file
taxi_data.to_csv("taxi_data.csv", index=False)
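
Saving to CSV stores the timestamp columns as plain text, so reading the file back with default settings yields strings rather than datetimes. The following is a minimal sketch of a round trip that restores them with `parse_dates`. It assumes an in-memory buffer in place of `taxi_data.csv` and uses a single sample row taken from the output above.

```python
import io

import pandas as pd

# One sample row; the real DataFrame has more columns
sample = pd.DataFrame(
    {
        "tpep_pickup_datetime": pd.to_datetime(["2020-01-25 10:49:58"]),
        "tpep_dropoff_datetime": pd.to_datetime(["2020-01-25 11:07:35"]),
        "trip_distance": [3.28],
    }
)

# Write to an in-memory buffer instead of a file on disk
buffer = io.StringIO()
sample.to_csv(buffer, index=False)
buffer.seek(0)

# parse_dates converts the named columns back to datetime64 on load
restored = pd.read_csv(
    buffer,
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
)
print(restored.dtypes)
```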