In [1]:
%reload_ext autoreload
%autoreload 2

In [2]:
import pandas as pd
import numpy as np
import shapefile
from sqlalchemy import create_engine
import requests
import zipfile
from datetime import datetime

from src.paths import TRIP_DATA
from src.paths import SQL_DB_DIR

In [3]:
from pathlib import Path
from typing import Optional

def download_parquet_files(start_year: int = 2023, start_month: int = 1) -> None:
    """
    Downloads NYC Yellow Taxi trip data parquet files for a specified range of months within the year 2023.

    This function iterates through months starting from `start_month` of `start_year` (2023) and
    attempts to download the corresponding parquet file if it does not already exist in the storage.
    The downloads are limited to the year 2023 to prevent including data from future years.

    Parameters:
    - start_year (int, optional): The year to start downloading data from, defaults to 2023.
    - start_month (int, optional): The month to start downloading data from, defaults to January (1).

    Returns:
    - None: Files are downloaded to the specified `TRIP_DATA` path, and no value is returned.

    Raises:
    - Exception: If the file cannot be downloaded due to the URL being unavailable.
    """
    # Set end_year to 2023 to ensure we do not go beyond this year.
    end_year = 2023
    current_month = datetime.now().month
    current_year = datetime.now().year
    
    # Adjust the range to be within 2023 only, considering the current year and month.
    for year in range(start_year, end_year + 1):
        for month in range(start_month if year == start_year else 1, 
                           current_month + 1 if year == current_year and current_year <= end_year else 13):
            path = TRIP_DATA / Path(f'rides_{year}_{month:02d}.parquet')
            url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year}-{month:02d}.parquet"
            if not path.exists():
                try:
                    print(f'Downloading file {year}_{month:02d}')
                    r = requests.get(url)
                    if r.status_code == 200:
                        with open(path, 'wb') as f:
                            f.write(r.content)
                    else:
                        raise Exception(f"{url} is not available.")
                except Exception as e:
                    print(f'Error downloading file for {year}_{month:02d}')
                    continue
            else:
                print(f'File {year}_{month:02d} already in storage.')

In [4]:
# Run the download function.
download_parquet_files()

File 2023_01 already in storage.
File 2023_02 already in storage.
File 2023_03 already in storage.
File 2023_04 already in storage.
File 2023_05 already in storage.
File 2023_06 already in storage.
File 2023_07 already in storage.
File 2023_08 already in storage.
File 2023_09 already in storage.
File 2023_10 already in storage.
File 2023_11 already in storage.
File 2023_12 already in storage.


In [5]:
test_df = pd.read_parquet(TRIP_DATA/f'rides_2023_01.parquet')
test_df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,2,2023-01-01 00:32:10,2023-01-01 00:40:36,1.0,0.97,1.0,N,161,141,2,9.30,1.00,0.5,0.00,0.0,1.0,14.30,2.5,0.00
1,2,2023-01-01 00:55:08,2023-01-01 01:01:27,1.0,1.10,1.0,N,43,237,1,7.90,1.00,0.5,4.00,0.0,1.0,16.90,2.5,0.00
2,2,2023-01-01 00:25:04,2023-01-01 00:37:49,1.0,2.51,1.0,N,48,238,1,14.90,1.00,0.5,15.00,0.0,1.0,34.90,2.5,0.00
3,1,2023-01-01 00:03:48,2023-01-01 00:13:25,0.0,1.90,1.0,N,138,7,1,12.10,7.25,0.5,0.00,0.0,1.0,20.85,0.0,1.25
4,2,2023-01-01 00:10:29,2023-01-01 00:21:19,1.0,1.43,1.0,N,107,79,1,11.40,1.00,0.5,3.28,0.0,1.0,19.68,2.5,0.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3066723,2,2023-01-31 23:58:34,2023-02-01 00:12:33,,3.05,,,107,48,0,15.80,0.00,0.5,3.96,0.0,1.0,23.76,,
3066724,2,2023-01-31 23:31:09,2023-01-31 23:50:36,,5.80,,,112,75,0,22.43,0.00,0.5,2.64,0.0,1.0,29.07,,
3066725,2,2023-01-31 23:01:05,2023-01-31 23:25:36,,4.67,,,114,239,0,17.61,0.00,0.5,5.32,0.0,1.0,26.93,,
3066726,2,2023-01-31 23:40:00,2023-01-31 23:53:00,,3.15,,,230,79,0,18.15,0.00,0.5,4.43,0.0,1.0,26.58,,


In [6]:
from typing import List

def validate_and_clean_data(file_paths: List[Path], start_datetime: str, end_datetime: str) -> None:
    """
    Validates and cleans NYC Yellow Taxi trip data by removing rows with pickup_datetime values
    outside the specified valid range.

    Parameters:
    - file_paths (List[Path]): A list of Paths to the .parquet files to be processed.
    - start_datetime (str): The start of the valid datetime range in 'YYYY-MM-DD HH:MM:SS' format.
    - end_datetime (str): The end of the valid datetime range in 'YYYY-MM-DD HH:MM:SS' format.

    Returns:
    - None: The function directly modifies the files on disk, removing invalid rows.

    Note: This function assumes the 'tpep_pickup_datetime' column is in the correct datetime format. Adjust
    the date parsing as necessary based on the actual data format.
    """
    for file_path in file_paths:
        # Read the parquet file
        df = pd.read_parquet(file_path)

        # Convert 'pickup_datetime' column to datetime if not already
        df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])

        # Filter the DataFrame based on the valid datetime range
        valid_data = df[(df['tpep_pickup_datetime'] >= start_datetime) & (df['tpep_pickup_datetime'] <= end_datetime)]

        # Save the cleaned data back to disk, overwriting the original file
        valid_data.to_parquet(file_path, index=False)

        print(f"Processed and cleaned data in {file_path}.")


In [7]:
file_paths =[TRIP_DATA/f'rides_2023_{month:02d}.parquet' for month in range(1, 13)]

# Define the valid datetime range
start_datetime = '2023-01-01 00:00:00'
end_datetime = '2023-12-31 23:59:59'

validate_and_clean_data(file_paths=file_paths,
                        start_datetime=start_datetime,
                        end_datetime=end_datetime)

Processed and cleaned data in /Users/olanrewajuoladele/Desktop/nyc_taxi_analysis/data/trip_data/rides_2023_01.parquet.
Processed and cleaned data in /Users/olanrewajuoladele/Desktop/nyc_taxi_analysis/data/trip_data/rides_2023_02.parquet.
Processed and cleaned data in /Users/olanrewajuoladele/Desktop/nyc_taxi_analysis/data/trip_data/rides_2023_03.parquet.
Processed and cleaned data in /Users/olanrewajuoladele/Desktop/nyc_taxi_analysis/data/trip_data/rides_2023_04.parquet.
Processed and cleaned data in /Users/olanrewajuoladele/Desktop/nyc_taxi_analysis/data/trip_data/rides_2023_05.parquet.
Processed and cleaned data in /Users/olanrewajuoladele/Desktop/nyc_taxi_analysis/data/trip_data/rides_2023_06.parquet.
Processed and cleaned data in /Users/olanrewajuoladele/Desktop/nyc_taxi_analysis/data/trip_data/rides_2023_07.parquet.
Processed and cleaned data in /Users/olanrewajuoladele/Desktop/nyc_taxi_analysis/data/trip_data/rides_2023_08.parquet.
Processed and cleaned data in /Users/olanrewajuo

In [8]:
# Define the engine with the path to the nyc_taxi_data.db file in the database folder
engine = create_engine(f'sqlite:///{SQL_DB_DIR}/nyc_taxi_data.db')

def load_data_to_sqlite():
    """
    Loads NYC Yellow Taxi data from .parquet files into an SQLite database.

    Iterates through .parquet files starting from January for each year including and following 2023,
    reading each file into a pandas DataFrame and appending it to the 'yellow_taxi_data' table in
    the specified SQLite database. This process is repeated for each month up to the current month in the current year.

    Note: Ensure the SQL_DB_DIR variable is correctly set to the directory containing the SQLite database.
    """
    current_month = datetime.now().month
    current_year = datetime.now().year
    for year in range(2023, current_year + 1):
        for month in range(1, current_month + 1 if year == current_year else 13):
            file_name = f"rides_{year}_{month:02d}.parquet"
            try:
                # Update TRIP_DATA to the correct path where your parquet files are stored
                df = pd.read_parquet(TRIP_DATA / file_name)
                df.to_sql('yellow_taxi_data', engine, if_exists='append', index=False)
                print(f"Loaded {file_name} into SQLite database.")
            except Exception as e:
                print(e)
                print(f"Failed to load {file_name}.")


In [9]:
# Running the function to load data into SQLite.
load_data_to_sqlite()

Loaded rides_2023_01.parquet into SQLite database.
Loaded rides_2023_02.parquet into SQLite database.
Loaded rides_2023_03.parquet into SQLite database.
Loaded rides_2023_04.parquet into SQLite database.
Loaded rides_2023_05.parquet into SQLite database.
Loaded rides_2023_06.parquet into SQLite database.
Loaded rides_2023_07.parquet into SQLite database.
Loaded rides_2023_08.parquet into SQLite database.
Loaded rides_2023_09.parquet into SQLite database.
Loaded rides_2023_10.parquet into SQLite database.
Loaded rides_2023_11.parquet into SQLite database.
Loaded rides_2023_12.parquet into SQLite database.
[Errno 2] No such file or directory: '/Users/olanrewajuoladele/Desktop/nyc_taxi_analysis/data/trip_data/rides_2024_01.parquet'
Failed to load rides_2024_01.parquet.
[Errno 2] No such file or directory: '/Users/olanrewajuoladele/Desktop/nyc_taxi_analysis/data/trip_data/rides_2024_02.parquet'
Failed to load rides_2024_02.parquet.
[Errno 2] No such file or directory: '/Users/olanrewajuola

In [10]:
from src.paths import TAXI_ZONES_DIR

In [11]:
# URL from where to download the ZIP file
url = "https://d37ci6vzurychx.cloudfront.net/misc/taxi_zones.zip"

# Define the local path where the ZIP file will be saved
# TAXI_ZONES_DIR is a Path object pointing to the desired directory
local_zip_path = TAXI_ZONES_DIR / "taxi_zones.zip"

# Define the folder where to extract the contents of the ZIP file
# Here, we're extracting directly into the TAXI_ZONES_DIR
extract_to_folder = TAXI_ZONES_DIR

# Start the download
response = requests.get(url)
if response.status_code == 200:
    with open(local_zip_path, "wb") as file:
        file.write(response.content)
    print("Download successful.")
else:
    print(f"Failed to download file. Status code: {response.status_code}")

# Extract the ZIP file
try:
    with zipfile.ZipFile(local_zip_path, "r") as zip_ref:
        zip_ref.extractall(extract_to_folder)
    print("Extraction complete.")
except zipfile.BadZipFile:
    print("Error: The downloaded file is not a valid ZIP file.")

Download successful.
Extraction complete.


In [12]:
try:
    # Read the .shp file from the 'TAXI_ZONES_DIR'
    sf = shapefile.Reader(TAXI_ZONES_DIR/'taxi_zones.shp')
    print("Successfully read the shapefile.")
    # Optional: Print out the shapes or records to confirm
    print(sf.shapes())
except Exception as e:
    print("Failed to read the shapefile:", e)


Successfully read the shapefile.
Shapes: [Shape #0: POLYGON, Shape #1: POLYGON, Shape #2: POLYGON, Shape #3: POLYGON, Shape #4: POLYGON, Shape #5: POLYGON, Shape #6: POLYGON, Shape #7: POLYGON, Shape #8: POLYGON, Shape #9: POLYGON, Shape #10: POLYGON, Shape #11: POLYGON, Shape #12: POLYGON, Shape #13: POLYGON, Shape #14: POLYGON, Shape #15: POLYGON, Shape #16: POLYGON, Shape #17: POLYGON, Shape #18: POLYGON, Shape #19: POLYGON, Shape #20: POLYGON, Shape #21: POLYGON, Shape #22: POLYGON, Shape #23: POLYGON, Shape #24: POLYGON, Shape #25: POLYGON, Shape #26: POLYGON, Shape #27: POLYGON, Shape #28: POLYGON, Shape #29: POLYGON, Shape #30: POLYGON, Shape #31: POLYGON, Shape #32: POLYGON, Shape #33: POLYGON, Shape #34: POLYGON, Shape #35: POLYGON, Shape #36: POLYGON, Shape #37: POLYGON, Shape #38: POLYGON, Shape #39: POLYGON, Shape #40: POLYGON, Shape #41: POLYGON, Shape #42: POLYGON, Shape #43: POLYGON, Shape #44: POLYGON, Shape #45: POLYGON, Shape #46: POLYGON, Shape #47: POLYGON, Shape #4

In [13]:
fields = sf.fields
# Printing the fields
print("Fields in the shapefile:")
for field in fields[1:]:  # Skip the first element as it's a deletion flag field
    print(field)

Fields in the shapefile:
['OBJECTID', 'N', 9, 0]
['Shape_Leng', 'F', 19, 11]
['Shape_Area', 'F', 19, 11]
['zone', 'C', 254, 0]
['LocationID', 'N', 4, 0]
['borough', 'C', 254, 0]


In [14]:
import fiona
from shapely.geometry import shape

def calculate_centroids(filepath: str) -> list:
    """
    Calculates the centroids of geographic features from a shapefile.

    This function opens a shapefile specified by the filepath, iterates through each geographic
    feature, calculates the centroid of the feature, and collects the centroid's coordinates
    along with the feature's LocationID. It returns a list of dictionaries, each containing
    the LocationID, latitude, and longitude of a feature's centroid.

    Parameters:
    - filepath (str): The path to the shapefile from which to read the geographic features.

    Returns:
    - list: A list of dictionaries. Each dictionary represents a feature's centroid and contains
      'LocationID', 'Latitude', and 'Longitude' keys.
    
    Example of returned list item:
    - {'LocationID': '1', 'Latitude': 40.123456, 'Longitude': -74.123456}
    
    Note:
    This function relies on Fiona to read shapefiles and Shapely to calculate geometric properties.
    Ensure these libraries are installed and the shapefile contains a 'LocationID' field.
    """
    
    # Initialize a list to hold centroid information
    centroids = []

    # Use Fiona to open the shapefile
    with fiona.open(filepath, 'r') as shp:
        # Iterate over each feature in the shapefile
        for item in shp:
            # Convert the geometry part of the feature into a Shapely Geometry object
            geom = shape(item['geometry'])
            
            # Calculate the centroid of the geometry
            centroid = geom.centroid
            
            # Append a dictionary with LocationID, latitude, and longitude of the centroid
            centroids.append({
                'LocationID': item['properties']['LocationID'],  # Extract LocationID from feature properties
                'Latitude': centroid.y,  # Latitude (y coordinate) of the centroid
                'Longitude': centroid.x  # Longitude (x coordinate) of the centroid
            })

    # Return the list of centroids
    return centroids


In [15]:
# Calculate centroids for the taxi_zones shapefile
centroids_data = calculate_centroids(TAXI_ZONES_DIR/'taxi_zones.shp')

# Convert the list of centroids to a DataFrame
centroids_df = pd.DataFrame(centroids_data)

# Display the DataFrame
centroids_df.head()

Unnamed: 0,LocationID,Latitude,Longitude
0,1,191376.749531,935996.8
1,2,164018.754403,1031086.0
2,3,254265.478659,1026453.0
3,4,202959.782391,990634.0
4,5,140681.351376,931871.4


In [16]:
# Check for missing LocationID.
expected_ids = set(range(1, 264))
actual_ids = set(centroids_df['LocationID'].unique())
missing_ids = expected_ids - actual_ids

print(f"Missing LocationID: {missing_ids}")

Missing LocationID: {104, 57, 105}


In [23]:
from pyproj import Transformer

def calculate_transformed_centroids(filepath: str) -> pd.DataFrame:
    """
    Calculates and transforms the centroids of geographic features from a shapefile
    to WGS 84 (EPSG:4326) coordinate reference system and returns them as a pandas DataFrame.

    This function opens a shapefile, iterates through each geographic feature to calculate
    the centroid, transforms the centroid coordinates from the source CRS (EPSG:2263 by default)
    to WGS 84 (EPSG:4326), and collects these transformed coordinates along with the feature's
    LocationID. The results are returned in a pandas DataFrame.

    Parameters:
    - filepath (str): The path to the shapefile from which to read the geographic features.

    Returns:
    - pd.DataFrame: A DataFrame with columns 'LocationID', 'Latitude', and 'Longitude'
      for each feature's centroid in WGS 84 coordinates.
    
    Note:
    - This function assumes the shapefile uses the EPSG:2263 (NAD83/New York Long Island) CRS.
      If your data uses a different source CRS, adjust the `from_crs` parameter of the Transformer accordingly.
    - The function relies on Fiona for reading shapefiles, Shapely for geometric operations,
      PyProj for CRS transformation, and pandas for data structuring. Ensure these libraries are installed.
    """
    # Initialize a transformer to convert from EPSG:2263 to EPSG:4326, ensuring longitude comes before latitude
    transformer = Transformer.from_crs("epsg:2263", "epsg:4326", always_xy=True)
    
    centroids = []  # List to store centroid information
    with fiona.open(filepath, 'r') as shp:  # Open the shapefile
        for item in shp:  # Iterate over each feature
            geom = shape(item['geometry'])  # Convert geometry to Shapely object
            centroid = geom.centroid  # Calculate centroid
            # Transform the centroid coordinates to WGS 84
            lon, lat = transformer.transform(centroid.x, centroid.y)
            # Append the transformed coordinates with LocationID to the list
            centroids.append({
                'LocationID': item['properties']['LocationID'],  # LocationID from feature properties
                'Latitude': lat,  # Transformed latitude
                'Longitude': lon  # Transformed longitude
            })
    
    # Convert the list of centroids to a pandas DataFrame and return
    return pd.DataFrame(centroids)


In [24]:
# Calculate and transform centroids
transformed_centroids_df = calculate_transformed_centroids(TAXI_ZONES_DIR/'taxi_zones.shp')

# Display the DataFrame
transformed_centroids_df.head()

Unnamed: 0,LocationID,Latitude,Longitude
0,1,40.69183,-74.174002
1,2,40.616746,-73.8313
2,3,40.864474,-73.847422
3,4,40.723752,-73.976968
4,5,40.552659,-74.188485


In [25]:
# Check for duplicates based on 'LocationID'
duplicates = transformed_centroids_df[transformed_centroids_df.duplicated('LocationID', keep=False)]

# Print out the duplicates
print(duplicates)

     LocationID   Latitude  Longitude
55           56  40.741407 -73.858845
56           56  40.751819 -73.853582
102         103  40.689860 -74.045288
103         103  40.698769 -74.040771
104         103  40.688784 -74.019073


In [26]:
# Step 1: Replace the second occurrence of '56' with '57'
# Convert LocationID column to a list for manipulation
location_ids = transformed_centroids_df['LocationID'].tolist()
second_occurrence_index = [index for index, value in enumerate(location_ids) if value == 56][1]
transformed_centroids_df.at[second_occurrence_index, 'LocationID'] = 57

# Step 2:Replace the second and third occurence of '103 with '104 nd 105' respectively.
occurrences_103_indices = [index for index, value in enumerate(location_ids) if value == 103]
print(occurrences_103_indices)

[102, 103, 104]


In [29]:
# Safety check to ensure there are at least three occurrences
if len(occurrences_103_indices) >= 3:
    # Step 2: Update the second and third occurrences of '103' to '104' and '105'
    transformed_centroids_df.at[occurrences_103_indices[1], 'LocationID'] = 104
    transformed_centroids_df.at[occurrences_103_indices[2], 'LocationID'] = 105
else:
    print("Not enough occurrences of '103' to update.")

print(transformed_centroids_df.loc[transformed_centroids_df['LocationID'].isin([102, 103, 104, 105, 106])])

     LocationID   Latitude  Longitude
101         102  40.703546 -73.875737
102         103  40.689860 -74.045288
103         104  40.698769 -74.040771
104         105  40.688784 -74.019073
105         106  40.673513 -73.990648


In [30]:
transformed_centroids_df.LocationID.duplicated().sum()

0

In [31]:
# Extract field names directly, skipping the first 'DeletionFlag' field
fields_name = [field[0] for field in sf.fields[1:]]

attributes = [dict(zip(fields_name, attr)) for attr in sf.records()]

# Create a DataFrame directly from the list of attribute dictionaries
df_attributes = pd.DataFrame(attributes)

In [34]:
df_location = pd.merge(df_attributes, transformed_centroids_df, on='LocationID', how='left')

In [35]:
df_location.head()

Unnamed: 0,OBJECTID,Shape_Leng,Shape_Area,zone,LocationID,borough,Latitude,Longitude
0,1,0.116357,0.000782,Newark Airport,1,EWR,40.69183,-74.174002
1,2,0.43347,0.004866,Jamaica Bay,2,Queens,40.616746,-73.8313
2,3,0.084341,0.000314,Allerton/Pelham Gardens,3,Bronx,40.864474,-73.847422
3,4,0.043567,0.000112,Alphabet City,4,Manhattan,40.723752,-73.976968
4,5,0.092146,0.000498,Arden Heights,5,Staten Island,40.552659,-74.188485


In [36]:
df_location.tail()

Unnamed: 0,OBJECTID,Shape_Leng,Shape_Area,zone,LocationID,borough,Latitude,Longitude
258,259,0.12675,0.000395,Woodlawn/Wakefield,259,Bronx,40.897932,-73.852215
259,260,0.133514,0.000422,Woodside,260,Queens,40.744234,-73.906307
260,261,0.02712,3.4e-05,World Trade Center,261,Manhattan,40.709139,-74.013023
261,262,0.049064,0.000122,Yorkville East,262,Manhattan,40.775932,-73.94651
262,263,0.037017,6.6e-05,Yorkville West,263,Manhattan,40.778766,-73.95101


In [None]:
df_location.to_parquet(TAXI_ZONES_DIR/'taxi_zones_df.parquet')