# Data Preprocessing

In [1]:
import math
from typing import List
import requests
import re
import os
import bs4
from bs4 import BeautifulSoup
import pyarrow.parquet as pq
import numpy as np
import pandas as pd
import geopandas as gpd
import warnings
warnings.filterwarnings("ignore")

## Downloading: programmatically download the Yellow Taxi & High-Volume For-Hire Vehicle (HVFHV) trip data 

In [2]:
TAXI_URL: str = "https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page"

def get_taxi_html() -> str:
    response = requests.get(TAXI_URL)
    html = response.content
    return html

def find_taxi_parquet_links() -> List[str]:
    ### BEGIN SOLUTION
    html = get_taxi_html()
    soup = bs4.BeautifulSoup(html, "html.parser")
    HVFHV_a_tags = soup.find_all("a", attrs={"title": "High Volume For-Hire Vehicle Trip Records"})
    yellow_a_tags = soup.find_all("a", attrs={"title": "Yellow Taxi Trip Records"})
    all_a_tags = HVFHV_a_tags + yellow_a_tags
    return [a["href"] for a in all_a_tags]
    ### END SOLUTION

In [3]:
def filter_urls(urls, start_year, start_month, end_year, end_month):
    filtered_urls = []
    for url in urls:
        match = re.search(r'(\d{4})-(\d{2})', url)
        if match:
            year, month = int(match.group(1)), int(match.group(2))
            if (start_year < year < end_year) or \
               (year == start_year and month >= start_month) or \
               (year == end_year and month <= end_month):
                filtered_urls.append(url.strip())
    return filtered_urls

# Filtering URLs from January 2020 to August 2024
urls = find_taxi_parquet_links()
filtered_urls = filter_urls(urls, 2020, 1, 2024, 8)

# Display the result
urls = sorted(filtered_urls)
urls

['https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2020-01.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2020-02.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2020-03.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2020-04.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2020-05.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2020-06.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2020-07.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2020-08.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2020-09.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2020-10.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2020-11.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2020-12.parquet',
 'ht

In [None]:
urls = urls
# Directory to save the downloaded files
output_directory = "Dataset"

# Create the directory if it doesn't exist
os.makedirs(output_directory, exist_ok=True)

# Function to download a file
def download_parquet_file(i, url, output_directory):
    try:
        response = requests.get(url, stream=True)
        response.raise_for_status()  # Raise an error for bad HTTP responses
        
        # Extract file name from the URL
        file_name = url.split("/")[-1]
        file_path = os.path.join(output_directory, file_name)
        
        # Save the file
        with open(file_path, "wb") as file:
            for chunk in response.iter_content(chunk_size=1024):
                file.write(chunk)
        print(f"File {i} Downloaded: {file_name}")
    except requests.RequestException as e:
        print(f"File {i} Failed to download {url}. Error: {e}")

# Download all files
for i, url in enumerate(urls):
    download_parquet_file(i, url, output_directory)

## Sampling

In [4]:
def cochran_sample_size(N, p=0.5, e=0.05, confidence=0.95):
    """
    Calculate Cochran's sample size for a finite population.
    
    Parameters:
    N (int): Population size
    p (float): Proportion of population (default=0.5 for max variability)
    e (float): Margin of error (default=0.05 for ±5%)
    confidence (float): Confidence level (default=0.95 for 95%)
    
    Returns:
    int: Sample size
    """
    # Z-score for the given confidence level
    Z = {
        0.90: 1.645,
        0.95: 1.96,
        0.99: 2.576
    }.get(confidence, 1.96)  # Default to 95% confidence if not specified
    
    # Step 1: Cochran's formula for infinite population
    n0 = (Z**2 * p * (1 - p)) / (e**2)
    
    # Step 2: Adjust for finite population
    n = n0 / (1 + (n0 - 1) / N)
    
    return math.ceil(n)

def stable_sample_size(monthly_populations, p=0.5, e=0.05, confidence=0.99, method='max'):
    """
    Calculate a stable sample size across multiple months.
    
    Parameters:
    monthly_populations (list): List of population sizes for each month
    p (float): Proportion of population (default=0.5 for max variability)
    e (float): Margin of error (default=0.05 for ±5%)
    confidence (float): Confidence level (default=0.95 for 95%)
    method (str): Aggregation method ('max', 'average', 'safety')
    
    Returns:
    int: Stable sample size
    """
    sample_sizes = [cochran_sample_size(N, p, e, confidence) for N in monthly_populations]
    print(monthly_populations)
    
    if method == 'max':
        return max(sample_sizes)
    elif method == 'average':
        return math.ceil(sum(sample_sizes) / len(sample_sizes))
    elif method == 'safety':
        return math.ceil(max(sample_sizes) * 1.1)  # Add a 10% safety margin
    else:
        raise ValueError("Invalid method. Choose from 'max', 'average', or 'safety'.")

## Yellow Taxi
yellow_file_folder = 'Dataset/yellow_tripdata/'
yellow_file_names = [f for f in os.listdir(yellow_file_folder) if os.path.isfile(os.path.join(yellow_file_folder, f))]

monthly_populations_yellow = []
for file in yellow_file_names:
    parquet_file = pq.ParquetFile(yellow_file_folder + file)
    monthly_populations_yellow.append(parquet_file.metadata.num_rows)
    
# Example monthly population sizes
stable_size_yellow = stable_sample_size(monthly_populations_yellow, method='average')
print(f"Yellow Taxi Stable sample size: {stable_size_yellow}")

## Uber
fhvhv_file_folder = 'Dataset/fhvhv_tripdata/'
fhvhv_file_names = [f for f in os.listdir(fhvhv_file_folder) if os.path.isfile(os.path.join(fhvhv_file_folder, f))]

monthly_populations_uber = []
for file in fhvhv_file_names:
    parquet_file = pq.ParquetFile(fhvhv_file_folder + file)
    monthly_populations_uber.append(parquet_file.metadata.num_rows)
    
# Example monthly population sizes
stable_size_uber = stable_sample_size(monthly_populations_uber, method='average')
print(f"Uber Stable sample size: {stable_size_uber}")


[6405008, 6299367, 3007687, 238073, 348415, 549797, 800412, 1007286, 1341017, 1681132, 1509000, 1461898, 1369769, 1371709, 1925152, 2171187, 2507109, 2834264, 2821746, 2788757, 2963793, 3463504, 3472949, 3214369, 2463931, 2979431, 3627882, 3599920, 3588295, 3558124, 3174394, 3152677, 3183767, 3675411, 3252717, 3399549, 3066766, 2913955, 3403766, 3288250, 3513649, 3307234, 2907108, 2824209, 2846722, 3522285, 3339715, 3376567, 2964624, 3007526, 3582628, 3514289, 3723833, 3539193, 3076903, 2979183]
Yellow Taxi Stable sample size: 664
[20569368, 21725100, 13392928, 4312909, 6089999, 7555193, 9958454, 11096852, 12106669, 13268411, 11596865, 11637123, 11908468, 11613942, 14227393, 14111371, 14719171, 14961892, 15027174, 14499696, 14886055, 16545356, 16041639, 16054495, 14751591, 16019283, 18453548, 17752561, 18157335, 17780075, 17464619, 17185687, 17793551, 19306090, 18085896, 19665847, 18479031, 17960971, 20413539, 19144903, 19847676, 19366619, 19132131, 18322150, 19851123, 20186330, 192692

## Cleaning & Filtering

In [5]:
shapefile_path = 'taxi_zones.shp'
zones_gdf = gpd.read_file(shapefile_path)
if zones_gdf.crs is None:
    zones_gdf.set_crs(epsg=2263, inplace=True)  # Example: NY State Plane (EPSG:2263)
zones_gdf = zones_gdf.to_crs(epsg=4326) # Reproject to WGS84 (Latitude/Longitude)

# Calculate centroids for each zone (polygon)
zones_gdf['centroid'] = zones_gdf.geometry.centroid
zones_gdf['latitude'] = zones_gdf['centroid'].y
zones_gdf['longitude'] = zones_gdf['centroid'].x

# Retain only relevant columns: location ID, latitude, and longitude
zones_df = zones_gdf[['LocationID', 'latitude', 'longitude']]

### Yellow Taxi Filtering and Sampling

In [6]:
output_directory_yellow = "Clean_Sampled_Dataset/Yellow/"
os.makedirs(output_directory_yellow, exist_ok=True)

In [None]:
for i, file in enumerate(yellow_file_names[:]):
    trips_df = pd.read_parquet(yellow_file_folder + file)
    print('Current Processing:', i, file)

    # Merge trip data with zone centroids for pickups and dropoffs
    trips_with_pickup = trips_df.merge(
        zones_df,
        how='left',
        left_on='PULocationID',
        right_on='LocationID'
    ).rename(columns={'latitude': 'pickup_latitude', 'longitude': 'pickup_longitude'})

    trips_with_locations = trips_with_pickup.merge(
        zones_df,
        how='left',
        left_on='DOLocationID',
        right_on='LocationID',
        suffixes=('', '_dropoff')
    ).rename(columns={'latitude': 'dropoff_latitude', 'longitude': 'dropoff_longitude'})

    # Filter out trips with invalid location IDs
    valid_trips = trips_with_locations.dropna(subset=['pickup_latitude', 'dropoff_latitude'])

    # Delete records that start_pos or end_pos is out of range
    LAT_MIN, LAT_MAX = 40.560445, 40.908524
    LON_MIN, LON_MAX = -74.242330, -73.717047
    
    valid_trips = valid_trips[
        (valid_trips['pickup_latitude'].between(LAT_MIN, LAT_MAX)) &
        (valid_trips['pickup_longitude'].between(LON_MIN, LON_MAX)) &
        (valid_trips['dropoff_latitude'].between(LAT_MIN, LAT_MAX)) &
        (valid_trips['dropoff_longitude'].between(LON_MIN, LON_MAX))
    ]
    ## Delete original locationID columns
    valid_trips.drop(['PULocationID','DOLocationID','LocationID','LocationID_dropoff'],axis=1,inplace=True)
    
    valid_trips.columns = valid_trips.columns.str.lower()
    
    ## Delete records that trip_distance is missing or trip_distance <= 0, and convert datatype into Float
    valid_trips = valid_trips.dropna(subset=['trip_distance'])
    valid_trips = valid_trips[valid_trips['trip_distance']>0]
    valid_trips['trip_distance'] = valid_trips['trip_distance'].astype(float)
    
    ## Delete records that passenger_count is missing or passenger_count <= 0, and convert datatype into Integer
    valid_trips = valid_trips.dropna(subset=['passenger_count'])
    valid_trips = valid_trips[valid_trips['passenger_count']>0]
    valid_trips['passenger_count'] = valid_trips['passenger_count'].astype(int)
    
    ## Delete records that where Fare_amount, Total_amount, or Tolls_amount are negative.
    valid_trips = valid_trips[
        (valid_trips['fare_amount']>=0) &
        (valid_trips['total_amount']>=0) &
        (valid_trips['tolls_amount']>=0)
        ]
    
    ## Delete records that Payment_type not in the valid range (1-6), and convert datatype into Integer
    valid_trips['payment_type'] = valid_trips['payment_type'].astype(int)
    valid_trips = valid_trips[valid_trips['payment_type'].between(1,6)]
    
    ## Delete records that RateCodeID not in the valid range (1-6), and convert datatype into Integer
    valid_trips['ratecodeid'] = valid_trips['ratecodeid'].astype(int)
    valid_trips = valid_trips[valid_trips['ratecodeid'].between(1,6)]
    valid_trips = valid_trips.rename(columns={'ratecodeid':'RateCodeID',})
    
    ## Convert store_and_fwd_flag into 0 and 1
    valid_trips['store_and_fwd_flag'] = valid_trips['store_and_fwd_flag'].map({'Y':1,'N':0}).fillna(0)
    
    ## Convert airport_fee into Float
    valid_trips['airport_fee'] = pd.to_numeric(valid_trips['airport_fee'], errors='coerce').fillna(0)
    
    ## Rename: extra -> Miscellaneous_Extras, tpep_pickup_datetime → pickup_datetime, tpep_dropoff_datetime → dropoff_datetime
    valid_trips = valid_trips.rename(
        columns={'extra':'Miscellaneous_Extras','tpep_pickup_datetime':'pickup_datetime','tpep_dropoff_datetime':'dropoff_datetime'})
    
    ## Delete records that dropoff_datetime is earlier than pickup_datetime.
    valid_trips = valid_trips[valid_trips['dropoff_datetime'] >= valid_trips['pickup_datetime']]
    
    ## Sampling & Save to parquet file
    valid_trips = valid_trips.sample(n=stable_size_yellow, random_state=42).reset_index(drop=True)
    valid_trips.to_parquet(output_directory_yellow + file)


In [7]:
yellow_sampled_records = pd.DataFrame()
yellow_sampled_file_names = [f for f in os.listdir(output_directory_yellow) if os.path.isfile(os.path.join(output_directory_yellow, f))]

for file in yellow_sampled_file_names:
    sampled_df = pd.read_parquet(output_directory_yellow + file)
    yellow_sampled_records = pd.concat([yellow_sampled_records,sampled_df],axis=0)
    
output_directory_final = "Clean_Sampled_Dataset/Final/"
yellow_sampled_records.to_parquet(output_directory_final + 'Yellow_all.parquet')

### Uber Sampling & Filtering

In [8]:
output_directory_uber = "Clean_Sampled_Dataset/Uber/"
os.makedirs(output_directory_uber, exist_ok=True)

In [None]:
for i, file in enumerate(fhvhv_file_names[:]):
    trips_df = pd.read_parquet(fhvhv_file_folder + file)
    print('Current Processing:', i, file)
    
    ## Retain records that are Uber rides
    trips_df = trips_df[trips_df['hvfhs_license_num'] == 'HV0003']

    # Merge trip data with zone centroids for pickups and dropoffs
    trips_with_pickup = trips_df.merge(
        zones_df,
        how='left',
        left_on='PULocationID',
        right_on='LocationID'
    ).rename(columns={'latitude': 'pickup_latitude', 'longitude': 'pickup_longitude'})

    trips_with_locations = trips_with_pickup.merge(
        zones_df,
        how='left',
        left_on='DOLocationID',
        right_on='LocationID',
        suffixes=('', '_dropoff')
    ).rename(columns={'latitude': 'dropoff_latitude', 'longitude': 'dropoff_longitude'})

    # Filter out trips with invalid location IDs
    valid_trips = trips_with_locations.dropna(subset=['pickup_latitude', 'dropoff_latitude'])

    # Delete records that start_pos or end_pos is out of range
    LAT_MIN, LAT_MAX = 40.560445, 40.908524
    LON_MIN, LON_MAX = -74.242330, -73.717047
    
    valid_trips = valid_trips[
        (valid_trips['pickup_latitude'].between(LAT_MIN, LAT_MAX)) &
        (valid_trips['pickup_longitude'].between(LON_MIN, LON_MAX)) &
        (valid_trips['dropoff_latitude'].between(LAT_MIN, LAT_MAX)) &
        (valid_trips['dropoff_longitude'].between(LON_MIN, LON_MAX))
    ]
    ## Delete original locationID columns
    valid_trips.drop(['PULocationID','DOLocationID','LocationID','LocationID_dropoff'],axis=1,inplace=True)
    
    valid_trips.columns = valid_trips.columns.str.lower()
    
    ## Delete records that trip_distance is missing or trip_distance <= 0, and convert datatype into Float
    valid_trips = valid_trips.dropna(subset=['trip_miles'])
    valid_trips = valid_trips[valid_trips['trip_miles']>0]
    valid_trips['trip_miles'] = valid_trips['trip_miles'].astype(float)
    
    ## Delete records that trip_time is missing or trip_distance <= 0, and convert datatype into Float
    valid_trips = valid_trips.dropna(subset=['trip_time'])
    valid_trips = valid_trips[valid_trips['trip_time']>0]
    valid_trips['trip_time'] = valid_trips['trip_time'].astype(float)
    
    ## Delete records that where base_passenger_fare, tolls, sales_tax, bcf, tips, congestion_surcharge or driver_pay are negative.
    valid_trips = valid_trips[
        (valid_trips['base_passenger_fare']>=0) &
        (valid_trips['tolls']>=0) &
        (valid_trips['sales_tax']>=0) &
        (valid_trips['bcf']>=0) &
        (valid_trips['tips']>=0) &
        (valid_trips['congestion_surcharge']>=0) &
        (valid_trips['driver_pay']>=0) 
        ]
    
    ## Convert shared_request_flag, shared_match_flag, access_a_ride_flag, wav_request_flag, wav_match_flag into 0 and 1
    valid_trips['shared_request_flag'] = valid_trips['shared_request_flag'].map({'Y':1,'N':0}).fillna(0)
    valid_trips['shared_match_flag'] = valid_trips['shared_match_flag'].map({'Y':1,'N':0}).fillna(0)
    valid_trips['access_a_ride_flag'] = valid_trips['access_a_ride_flag'].map({'Y':1,'N':0}).fillna(0)
    valid_trips['wav_request_flag'] = valid_trips['wav_request_flag'].map({'Y':1,'N':0}).fillna(0)
    valid_trips['wav_match_flag'] = valid_trips['wav_match_flag'].map({'Y':1,'N':0}).fillna(0)
    
    ## Delete records that dropoff_datetime is earlier than pickup_datetime.
    valid_trips = valid_trips[valid_trips['dropoff_datetime'] >= valid_trips['pickup_datetime']]
    
    ## Delete records that on_scene_datetime is earlier than request_datetime.
    valid_trips = valid_trips[valid_trips['on_scene_datetime'] >= valid_trips['request_datetime']]
    
    ## Rename: bcf -> Black_Car_Fund
    valid_trips = valid_trips.rename(
        columns={'bcf':'Black_Car_Fund',})
    
    ## Delete useless columns: dispatching_base_num, Hvfhs_license_num, originating_base_num
    valid_trips = valid_trips.drop(['hvfhs_license_num','dispatching_base_num','originating_base_num'], axis=1)
    
    ## Sampling & Save to parquet file
    valid_trips = valid_trips.sample(n=stable_size_uber, random_state=42).reset_index(drop=True)
    valid_trips.to_parquet(output_directory_uber + file)


In [9]:
uber_sampled_records = pd.DataFrame()
uber_sampled_file_names = [f for f in os.listdir(output_directory_uber) if os.path.isfile(os.path.join(output_directory_uber, f))]

for file in uber_sampled_file_names:
    sampled_df = pd.read_parquet(output_directory_uber + file)
    uber_sampled_records = pd.concat([uber_sampled_records,sampled_df],axis=0)
    
output_directory_final = "Clean_Sampled_Dataset/Final/"
uber_sampled_records.to_parquet(output_directory_final + 'Uber_all.parquet')

### Weather data preprocessing: Hourly

In [143]:
weather_data_file_folder = 'Dataset/weather_data/'
weather_data_files = ['2020_weather.csv','2021_weather.csv','2022_weather.csv','2023_weather.csv','2024_weather.csv']

weather_data_hourly_df = pd.DataFrame()
for file in weather_data_files:
    print("Current Processing:", file)
    weather_data = pd.read_csv(weather_data_file_folder + file)

    ## Split Date and Hour
    weather_data[['Date', 'Hour']] = weather_data['DATE'].str.split('T', expand=True)
    
    weather_data_hourly = weather_data[weather_data['Hour'] !='23:59:00']
    weather_data_hourly['Hour'] = weather_data_hourly['Hour'].str.split(':').str[0]
    weather_data_hourly = weather_data_hourly.groupby(['Date','Hour']).first().reset_index()

    original_columns = list(weather_data_hourly.columns)
    columns_to_drop = ['DATE','ELEVATION','STATION','NAME','LATITUDE','LONGITUDE','NormalsCoolingDegreeDay','NormalsHeatingDegreeDay','Sunrise', 'Sunset','WindEquipmentChangeDate'] \
    + ['AWND','CDSD','CLDD','DSNW','HDSD','HTDD','DYTS','DYHF',] \
    + ['HourlyPresentWeatherType','HourlySkyConditions','REM','HourlyWindDirection'] \
    + [col for col in original_columns if col.startswith('Daily')] \
    + [col for col in original_columns if col.startswith('Monthly')] \
    + [col for col in original_columns if col.startswith('Backup')] \
    + [col for col in original_columns if col.startswith('ShortDuration')]
    weather_data_hourly = weather_data_hourly.drop(columns_to_drop,axis=1)

    ## Transform data type & Fill missing values
    weather_data_hourly['Hour'] = weather_data_hourly['Hour'].astype(int)
    weather_data_hourly['HourlyAltimeterSetting'] = pd.to_numeric(weather_data_hourly['HourlyAltimeterSetting'], errors='coerce')
    weather_data_hourly['HourlyAltimeterSetting'] = weather_data_hourly['HourlyAltimeterSetting'].fillna(weather_data_hourly['HourlyAltimeterSetting'].mean())
    weather_data_hourly['HourlyDewPointTemperature'] = pd.to_numeric(weather_data_hourly['HourlyDewPointTemperature'], errors='coerce')
    weather_data_hourly['HourlyDewPointTemperature'] = weather_data_hourly['HourlyDewPointTemperature'].fillna(weather_data_hourly['HourlyDewPointTemperature'].mean())
    weather_data_hourly['HourlyDryBulbTemperature'] = pd.to_numeric(weather_data_hourly['HourlyDryBulbTemperature'], errors='coerce')
    weather_data_hourly['HourlyDryBulbTemperature'] = weather_data_hourly['HourlyDryBulbTemperature'].fillna(weather_data_hourly['HourlyDryBulbTemperature'].mean())
    weather_data_hourly['HourlySeaLevelPressure'] = pd.to_numeric(weather_data_hourly['HourlySeaLevelPressure'], errors='coerce')
    weather_data_hourly['HourlySeaLevelPressure'] = weather_data_hourly['HourlySeaLevelPressure'].fillna(weather_data_hourly['HourlySeaLevelPressure'].mean())
    weather_data_hourly['HourlyStationPressure'] = pd.to_numeric(weather_data_hourly['HourlyStationPressure'], errors='coerce')
    weather_data_hourly['HourlyStationPressure'] = weather_data_hourly['HourlyStationPressure'].fillna(weather_data_hourly['HourlyStationPressure'].mean())

    weather_data_hourly['HourlyVisibility'] = weather_data_hourly['HourlyVisibility'].str.extract(r'(\d+\.\d+)', expand=False)
    weather_data_hourly['HourlyVisibility'] = pd.to_numeric(weather_data_hourly['HourlyVisibility'], errors='coerce')
    weather_data_hourly['HourlyVisibility'] = weather_data_hourly['HourlyVisibility'].fillna(weather_data_hourly['HourlyVisibility'].mean())

    weather_data_hourly['HourlyPrecipitation'] = weather_data_hourly['HourlyPrecipitation'].replace('T', 0.0005)
    weather_data_hourly['HourlyPrecipitation'] = weather_data_hourly['HourlyPrecipitation'].replace(['M', ''], np.nan)
    weather_data_hourly['HourlyPrecipitation'] = weather_data_hourly['HourlyPrecipitation'].fillna(0)
    weather_data_hourly['HourlyPrecipitation'] = pd.to_numeric(weather_data_hourly['HourlyPrecipitation'], errors='coerce')
    weather_data_hourly['HourlyPrecipitation'] = weather_data_hourly['HourlyPrecipitation'].fillna(0)

    weather_data_hourly['HourlyPressureChange'] = pd.to_numeric(weather_data_hourly['HourlyPressureChange'], errors='coerce')
    weather_data_hourly['HourlyPressureChange'] = weather_data_hourly['HourlyPressureChange'].fillna(0)
    weather_data_hourly['HourlyPressureTendency'] = pd.to_numeric(weather_data_hourly['HourlyPressureTendency'], errors='coerce')
    weather_data_hourly['HourlyPressureTendency'] = weather_data_hourly['HourlyPressureTendency'].fillna(0)
    weather_data_hourly['HourlyRelativeHumidity'] = pd.to_numeric(weather_data_hourly['HourlyPrecipitation'], errors='coerce')
    weather_data_hourly['HourlyRelativeHumidity'] = weather_data_hourly['HourlyRelativeHumidity'].fillna(0)
    weather_data_hourly['HourlyWetBulbTemperature'] = pd.to_numeric(weather_data_hourly['HourlyWetBulbTemperature'], errors='coerce')
    weather_data_hourly['HourlyWetBulbTemperature'] = weather_data_hourly['HourlyWetBulbTemperature'].fillna(weather_data_hourly['HourlyWetBulbTemperature'].mean())
    weather_data_hourly['HourlyWindGustSpeed'] = weather_data_hourly['HourlyWindGustSpeed'].fillna(weather_data_hourly['HourlyWindGustSpeed'].mean())
    weather_data_hourly['HourlyWindSpeed'] = weather_data_hourly['HourlyWindSpeed'].fillna(weather_data_hourly['HourlyWindSpeed'].mean())

    weather_data_hourly_df = pd.concat([weather_data_hourly_df,weather_data_hourly],axis=0)
    
output_directory_final = "Clean_Sampled_Dataset/Final/"
weather_data_hourly_df.to_parquet(output_directory_final + 'Weather_hourly.parquet')

Current Processing: 2020_weather.csv
Current Processing: 2021_weather.csv
Current Processing: 2022_weather.csv
Current Processing: 2023_weather.csv
Current Processing: 2024_weather.csv


In [144]:
weather_data_hourly_df.head()

Unnamed: 0,Date,Hour,REPORT_TYPE,SOURCE,HourlyAltimeterSetting,HourlyDewPointTemperature,HourlyDryBulbTemperature,HourlyPrecipitation,HourlyPressureChange,HourlyPressureTendency,HourlyRelativeHumidity,HourlySeaLevelPressure,HourlyStationPressure,HourlyVisibility,HourlyWetBulbTemperature,HourlyWindGustSpeed,HourlyWindSpeed
0,2020-01-01,0,FM-15,7,29.66,26.0,40.0,0.0,-0.01,3.0,0.0,29.64,29.49,10.0,35.0,21.300203,8.0
1,2020-01-01,1,FM-15,7,29.67,27.0,39.0,0.0,0.0,0.0,0.0,29.65,29.5,10.0,34.0,17.0,8.0
2,2020-01-01,2,FM-15,7,29.68,26.0,39.0,0.0,0.0,0.0,0.0,29.66,29.51,10.0,34.0,23.0,14.0
3,2020-01-01,3,FM-15,7,29.7,24.0,39.0,0.0,-0.03,3.0,0.0,29.67,29.53,10.0,33.0,23.0,11.0
4,2020-01-01,4,FM-15,7,29.7,23.0,38.0,0.0,0.0,0.0,0.0,29.67,29.53,10.0,32.0,20.0,6.0


### Weather data preprocessing: Daily

In [145]:
weather_data_file_folder = 'Dataset/weather_data/'
weather_data_files = ['2020_weather.csv','2021_weather.csv','2022_weather.csv','2023_weather.csv','2024_weather.csv']

weather_data_daily_df = pd.DataFrame()
for file in weather_data_files:
    print("Current Processing:", file)
    weather_data = pd.read_csv(weather_data_file_folder + '2020_weather.csv')

    ## Split Date and Hour
    weather_data[['Date', 'Hour']] = weather_data['DATE'].str.split('T', expand=True)

    weather_data_daily = weather_data[weather_data['Hour'] =='23:59:00']
    weather_data_daily['Hour'] = weather_data_daily['Hour'].str.split(':').str[0]
    weather_data_daily = weather_data_daily[weather_data_daily['REPORT_TYPE'] == 'SOD  ']
    weather_data_daily = weather_data_daily[['Date','Hour'] + [col for col in list(weather_data_daily.columns) if col not in ['Date','Hour']]]
    weather_data_daily

    original_columns = list(weather_data_daily.columns)
    columns_to_drop = ['DATE','ELEVATION','Hour','STATION','NAME','LATITUDE','LONGITUDE','WindEquipmentChangeDate','DailyWeather'] \
        + ['AWND','CDSD','CLDD','DSNW','HDSD','HTDD','DYTS','DYHF',] \
        + ['HourlyPresentWeatherType','HourlySkyConditions','REM','NormalsCoolingDegreeDay','NormalsHeatingDegreeDay','DailySustainedWindDirection'] \
        + [col for col in original_columns if col.startswith('Hourly')] \
        + [col for col in original_columns if col.startswith('Monthly')] \
        + [col for col in original_columns if col.startswith('Backup')] \
        + [col for col in original_columns if col.startswith('ShortDuration')]
    weather_data_daily = weather_data_daily.drop(columns_to_drop,axis=1)

    special_treat_col1 = ['DailySnowfall','DailyPrecipitation','DailySnowDepth']
    for col in special_treat_col1:
        weather_data_daily[col] = weather_data_daily[col].replace('T', 0.0005)
        weather_data_daily[col] = weather_data_daily[col].replace(['M', ''], np.nan)
        weather_data_daily[col] = weather_data_daily[col].fillna(0)
        weather_data_daily[col] = pd.to_numeric(weather_data_daily[col], errors='coerce')
        weather_data_daily[col] = weather_data_daily[col].fillna(0)
        
    special_treat_col2 = ['DailyAverageDewPointTemperature','DailyAverageDryBulbTemperature','DailyAverageRelativeHumidity','DailyAverageSeaLevelPressure',
    'DailyAverageStationPressure','DailyAverageWetBulbTemperature','DailyAverageWindSpeed',
    'DailyCoolingDegreeDays',
    'DailyDepartureFromNormalAverageTemperature',
    'DailyHeatingDegreeDays',
    'DailyCoolingDegreeDays',
    'DailyMaximumDryBulbTemperature',
    'DailyMinimumDryBulbTemperature',
    'DailySustainedWindSpeed',
    ]
    for col in special_treat_col2:
        weather_data_daily[col] = pd.to_numeric(weather_data_daily[col], errors='coerce')
        weather_data_daily[col] = weather_data_daily[col].fillna(weather_data_daily[col].mean())

    weather_data_daily_df = pd.concat([weather_data_daily_df,weather_data_daily])
    
output_directory_final = "Clean_Sampled_Dataset/Final/"
weather_data_daily_df.to_parquet(output_directory_final + 'Weather_daily.parquet')

Current Processing: 2020_weather.csv
Current Processing: 2021_weather.csv
Current Processing: 2022_weather.csv
Current Processing: 2023_weather.csv
Current Processing: 2024_weather.csv


# Storing Data

In [157]:
from sqlalchemy import create_engine, Column, Integer, Float, String, Date, DateTime, MetaData, Table

# Create SQLite database
engine = create_engine('sqlite:///transport_weather.db')
metadata = MetaData()

In [158]:
uber_sampled_records_file = pq.ParquetFile(output_directory_final + 'Uber_all.parquet')
uber_sampled_records_file.schema.names

['request_datetime',
 'on_scene_datetime',
 'pickup_datetime',
 'dropoff_datetime',
 'trip_miles',
 'trip_time',
 'base_passenger_fare',
 'tolls',
 'Black_Car_Fund',
 'sales_tax',
 'congestion_surcharge',
 'airport_fee',
 'tips',
 'driver_pay',
 'shared_request_flag',
 'shared_match_flag',
 'access_a_ride_flag',
 'wav_request_flag',
 'wav_match_flag',
 'pickup_latitude',
 'pickup_longitude',
 'dropoff_latitude',
 'dropoff_longitude',
 '__index_level_0__']

In [159]:
# Define Uber trips table schema
uber_trips_table = Table(
    'uber_trips', metadata,
    Column('id', Integer, primary_key=True, autoincrement=True),
    Column('request_datetime', DateTime),
    Column('on_scene_datetime', DateTime),
    Column('pickup_datetime', DateTime),
    Column('dropoff_datetime', DateTime),
    Column('trip_miles', Float),
    Column('trip_time', Float),
    Column('base_passenger_fare', Float),
    Column('tolls', Float),
    Column('Black_Car_Fund', Float),
    Column('sales_tax', Float),
    Column('congestion_surcharge', Float),
    Column('airport_fee', Float),
    Column('tips', Float),
    Column('driver_pay', Float),
    Column('shared_request_flag', Integer),  # Y/N
    Column('shared_match_flag', Integer),    # Y/N
    Column('access_a_ride_flag', Integer),   # Y/N
    Column('wav_request_flag', Integer),     # Y/N
    Column('wav_match_flag', Integer),       # Y/N
    Column('pickup_latitude', Float),
    Column('pickup_longitude', Float),
    Column('dropoff_latitude', Float),
    Column('dropoff_longitude', Float),
)

In [160]:
yellow_sampled_records_file = pq.ParquetFile(output_directory_final + 'Yellow_all.parquet')
yellow_sampled_records_file.schema.names

['vendorid',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RateCodeID',
 'store_and_fwd_flag',
 'payment_type',
 'fare_amount',
 'Miscellaneous_Extras',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'airport_fee',
 'pickup_latitude',
 'pickup_longitude',
 'dropoff_latitude',
 'dropoff_longitude',
 '__index_level_0__']

In [161]:
# Define Yellow Taxi trips table schema
yellow_trips_table = Table(
    'yellow_taxi_trips', metadata,
    Column('id', Integer, primary_key=True, autoincrement=True),
    Column('vendorid', Integer),  # Can be TEXT or INTEGER depending on data
    Column('pickup_datetime', DateTime),
    Column('dropoff_datetime', DateTime),
    Column('passenger_count', Integer),
    Column('trip_distance', Float),
    Column('RateCodeID', Integer),
    Column('store_and_fwd_flag', Integer),  # Y/N
    Column('payment_type', Integer),
    Column('fare_amount', Float),
    Column('Miscellaneous_Extras', Float),
    Column('mta_tax', Float),
    Column('tip_amount', Float),
    Column('tolls_amount', Float),
    Column('improvement_surcharge', Float),
    Column('total_amount', Float),
    Column('congestion_surcharge', Float),
    Column('airport_fee', Float),
    Column('pickup_latitude', Float),
    Column('pickup_longitude', Float),
    Column('dropoff_latitude', Float),
    Column('dropoff_longitude', Float),
)

In [162]:
weather_hourly_records_file = pq.ParquetFile(output_directory_final + 'Weather_hourly.parquet')
weather_hourly_records_file.schema.names

['Date',
 'Hour',
 'REPORT_TYPE',
 'SOURCE',
 'HourlyAltimeterSetting',
 'HourlyDewPointTemperature',
 'HourlyDryBulbTemperature',
 'HourlyPrecipitation',
 'HourlyPressureChange',
 'HourlyPressureTendency',
 'HourlyRelativeHumidity',
 'HourlySeaLevelPressure',
 'HourlyStationPressure',
 'HourlyVisibility',
 'HourlyWetBulbTemperature',
 'HourlyWindGustSpeed',
 'HourlyWindSpeed',
 '__index_level_0__']

In [163]:
# Define weather hourly table schema
weather_hourly_table = Table(
    'weather_hourly', metadata,
    Column('id', Integer, primary_key=True, autoincrement=True),
    Column('Date', Date),
    Column('Hour', Integer),
    Column('REPORT_TYPE', String),
    Column('SOURCE', String),
    Column('HourlyAltimeterSetting', Float),
    Column('HourlyDewPointTemperature', Float),
    Column('HourlyDryBulbTemperature', Float),
    Column('HourlyPrecipitation', Float),
    Column('HourlyPressureChange', Float),
    Column('HourlyPressureTendency', String),
    Column('HourlyRelativeHumidity', Float),
    Column('HourlySeaLevelPressure', Float),
    Column('HourlyStationPressure', Float),
    Column('HourlyVisibility', Float),
    Column('HourlyWetBulbTemperature', Float),
    Column('HourlyWindGustSpeed', Float),
    Column('HourlyWindSpeed', Float),
)

In [164]:
weather_daily_records_file = pq.ParquetFile(output_directory_final + 'Weather_daily.parquet')
weather_daily_records_file.schema.names

['Date',
 'REPORT_TYPE',
 'SOURCE',
 'Sunrise',
 'Sunset',
 'DailyAverageDewPointTemperature',
 'DailyAverageDryBulbTemperature',
 'DailyAverageRelativeHumidity',
 'DailyAverageSeaLevelPressure',
 'DailyAverageStationPressure',
 'DailyAverageWetBulbTemperature',
 'DailyAverageWindSpeed',
 'DailyCoolingDegreeDays',
 'DailyDepartureFromNormalAverageTemperature',
 'DailyHeatingDegreeDays',
 'DailyMaximumDryBulbTemperature',
 'DailyMinimumDryBulbTemperature',
 'DailyPeakWindDirection',
 'DailyPeakWindSpeed',
 'DailyPrecipitation',
 'DailySnowDepth',
 'DailySnowfall',
 'DailySustainedWindSpeed',
 '__index_level_0__']

In [165]:
# Define weather daily table schema
weather_daily_table = Table(
    'weather_daily', metadata,
    Column('id', Integer, primary_key=True, autoincrement=True),
    Column('Date', Date),
    Column('REPORT_TYPE', String),
    Column('SOURCE', String),
    Column('Sunrise', String),
    Column('Sunset', String),
    Column('DailyAverageDewPointTemperature', Float),
    Column('DailyAverageDryBulbTemperature', Float),
    Column('DailyAverageRelativeHumidity', Float),
    Column('DailyAverageSeaLevelPressure', Float),
    Column('DailyAverageStationPressure', Float),
    Column('DailyAverageWetBulbTemperature', Float),
    Column('DailyAverageWindSpeed', Float),
    Column('DailyCoolingDegreeDays', Float),
    Column('DailyDepartureFromNormalAverageTemperature', Float),
    Column('DailyHeatingDegreeDays', Float),
    Column('DailyMaximumDryBulbTemperature', Float),
    Column('DailyMinimumDryBulbTemperature', Float),
    Column('DailyPeakWindDirection', String),
    Column('DailyPeakWindSpeed', Float),
    Column('DailyPrecipitation', Float),
    Column('DailySnowDepth', Float),
    Column('DailySnowfall', Float),
    Column('DailySustainedWindSpeed', Float),
)



In [166]:
metadata.create_all(engine)

In [167]:
yellow_df = pd.read_parquet(output_directory_final + 'Yellow_all.parquet')
yellow_df.to_sql('yellow_taxi_trips', engine, if_exists='replace', index=False)

uber_df = pd.read_parquet(output_directory_final + 'Uber_all.parquet')
uber_df.to_sql('uber_trips', engine, if_exists='replace', index=False)

weather_hourly_df = pd.read_parquet(output_directory_final + 'Weather_hourly.parquet')
weather_hourly_df.to_sql('weather_hourly', engine, if_exists='replace', index=False)

weather_daily_df = pd.read_parquet(output_directory_final + 'Weather_daily.parquet')
weather_daily_df.to_sql('weather_daily', engine, if_exists='replace', index=False)

1830

### Try to load data from database

In [None]:
import sqlite3
# Connect to the SQLite database
conn = sqlite3.connect('transport_weather.db')

# Query the database
query = "SELECT * FROM weather_daily LIMIT 5"  # Retrieve first 5 rows
result = pd.read_sql(query, conn)

print(result)
conn.close()

         Date REPORT_TYPE  SOURCE  Sunrise  Sunset  \
0  2020-01-01       SOD         6    720.0  1639.0   
1  2020-01-02       SOD         6    720.0  1640.0   
2  2020-01-03       SOD         6    720.0  1641.0   
3  2020-01-04       SOD         6    720.0  1642.0   
4  2020-01-05       SOD         6    720.0  1643.0   

   DailyAverageDewPointTemperature  DailyAverageDryBulbTemperature  \
0                             21.0                            38.0   
1                             25.0                            41.0   
2                             41.0                            47.0   
3                             45.0                            46.0   
4                             20.0                            39.0   

   DailyAverageRelativeHumidity  DailyAverageSeaLevelPressure  \
0                          52.0                         29.76   
1                          52.0                         29.91   
2                          82.0                         29.

### Create SQL Schema

In [170]:
# Connect to the SQLite database
db_path = 'transport_weather.db' # Replace with your actual database file
conn = sqlite3.connect(db_path)

# Query the sqlite_master table to get schema definitions
cursor = conn.cursor()
cursor.execute("SELECT name, sql FROM sqlite_master WHERE type='table';")

# Open a file to write the schema
with open("schema.sql", "w") as file:
    for table_name, schema in cursor.fetchall():
        if schema:  # Exclude views or invalid entries
            file.write(schema + ";\n\n")

conn.close()
print("Schema.sql file has been successfully generated.")

Schema.sql file has been successfully generated.


# Understanding Data

# Visualizing Data