### Part 1: Data Preprocessing

In [1]:
import os
import re
import csv
import zipfile
import requests
import pyarrow
import pandas as pd 
import geopandas as gpd
from datetime import datetime
from bs4 import BeautifulSoup
from sqlalchemy import create_engine, Column, Integer, String, Float, Date, Time, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker


def fetch_yellow_taxi_links(base_url):
    resp = requests.get(base_url)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, 'html.parser')
    return soup.find_all('a', href=re.compile(r'^(?=.*yellow_tripdata)(?=.*(\d{4}-\d{2}\.parquet|\.zip)).*$'))


def fetch_taxi_data(links, start_date, end_date): 
    print('Check availablity of retrieved data ...')
    if not os.path.exists(retrieved_files_dir):
        os.makedirs(retrieved_files_dir)
    
    for link in links:
        url = link['href']
        file_name = url.split('/')[-1]
        date_str = file_name.split('_')[-1].split('.')[0]
        date_obj = datetime.strptime(date_str, '%Y-%m')
        flag_load_file = True

        if start_date <= date_obj <= end_date:
            file_path = os.path.join(retrieved_files_dir, file_name)
            
            
            if not os.path.exists(file_path):
                print(f'Downloading {file_name}...')
                flag_load_file = False # retrieved file unavailable
                response = requests.get(url)
                response.raise_for_status()

                with open(file_path, 'wb') as file:
                    file.write(response.content) 
                
            if file_name.endswith('.zip'):
                csv_file_name = file_name.replace('.zip', '.csv')
                csv_file_path = os.path.join(retrieved_files_dir, csv_file_name)

                if not os.path.exists(csv_file_path):
                    print(f'Extracting {file_name}...')
                    with zipfile.ZipFile(file_path, 'r') as zip_file:
                        zip_file.extractall(retrieved_files_diroutput_dir)

                os.remove(file_path)
    if flag_load_file:
        print("Retrieved data found!")
    else:
        print('Data fetching completed.')


def clean_and_sample_data(data: pd.DataFrame, columns_to_keep: list, columns_to_rename: dict,
                          down_threshold: float, up_threshold: float,
                          left_threshold: float, right_threshold: float, sample_size: int, year: int) -> pd.DataFrame:
    data = data[columns_to_keep].copy() 
    data.rename(columns={old_name: new_name for old_name, new_name in zip(columns_to_keep, columns_to_rename)}, inplace=True)

    if year < 2011:
        data = data[(data['Start_Lat'] <= up_threshold) & (data['End_Lat'] <= up_threshold) & (data['Start_Lat'] >= down_threshold) & (
            data['End_Lat'] >= down_threshold) & (data['Start_Lon'] <= right_threshold) & (data['End_Lon'] <= right_threshold) & (
                    data['Start_Lon'] >= left_threshold) & (data['End_Lon'] >= left_threshold)]
    else:
        pass

    if data.empty:
        return data
    else:
        return data.sample(sample_size, random_state=42)
 
    return data.sample(sample_size, random_state=42)


def compile_and_clean_taxi_data() -> pd.DataFrame:
    yellow_taxi_data = pd.DataFrame()

    down_threshold = 40.560445
    up_threshold = 40.908524
    left_threshold = -74.242330
    right_threshold = -73.717047
    sample_size = 2500
    columns_to_rename = ['Pickup_Datetime', 'Dropoff_Datetime', "Trip_Distance",
                                     "Start_Lon", "Start_Lat", "End_Lon", "End_Lat", "Fare_Amt", "Tip_Amt"]
    print('Check availablity of sampled data ...')
    if not os.path.exists(sampled_files_dir):
        os.makedirs(sampled_files_dir)
        
    for year in range(2009, 2016):
        checked = False
        for month in range(1, 13):
            
            file_name = f"sampled_data_{year}.csv"
            file_path = os.path.join(sampled_files_dir, file_name)
            
            if os.path.exists(file_path):
                # If the file exists, read in the data
                yellow_taxi_data = pd.read_csv(file_path)
                if not checked:
                    print(f"Sampled data from {file_name} loaded successfully!") 
                checked = True
            else:
                # process the data as needed...
                print(f"Sampling {year}-{month:02d}")   
                if year == 2009:
                    columns_to_keep = ['Trip_Pickup_DateTime', 'Trip_Dropoff_DateTime', "Trip_Distance",
                                       "Start_Lon", "Start_Lat", "End_Lon", "End_Lat", "Fare_Amt", "Tip_Amt"] 
                elif year == 2010:
                    columns_to_keep = ['pickup_datetime', 'dropoff_datetime', "trip_distance", "pickup_longitude", "pickup_latitude",
                                       "dropoff_longitude", "dropoff_latitude", "fare_amount", "tip_amount"] 
                if year >= 2011:
                    columns_to_keep = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'trip_distance',
                                       'Start_Lon', 'Start_Lat', 'End_Lon', 'End_Lat', 'fare_amount', 'tip_amount']


                data = pd.read_parquet(f"{retrieved_files_dir}/yellow_tripdata_{year}-{month:02d}.parquet")
                if year >= 2011:
                    #data = data[data['PULocationID'] >263]
                    #data = data[data["DOLocationID"] >263] 
                    data = data.merge(taxi_zones, left_on='PULocationID', right_on='LocationID', how='left') \
                        .rename(columns={'Lon': 'Start_Lon', 'Lat': 'Start_Lat'}) \
                        .drop(columns=['PULocationID', 'LocationID'])

                    data = data.merge(taxi_zones, left_on='DOLocationID', right_on='LocationID', how='left') \
                        .rename(columns={'Lon': 'End_Lon', 'Lat': 'End_Lat'}) \
                        .drop(columns=['DOLocationID', 'LocationID']) 

                sampled_data = clean_and_sample_data(data, columns_to_keep, columns_to_rename,
                                                     down_threshold, up_threshold, left_threshold, right_threshold,
                                                     sample_size, year)

                yellow_taxi_data = yellow_taxi_data.append(sampled_data, ignore_index=True)

        yellow_taxi_data.to_csv(file_path, index=False)
        print(f"Sampled data saved as {file_path}") 
        if year == 2015 and month >= 6:
            yellow_taxi_data.to_csv(file_path, index=False)
            break
            
    yellow_taxi_data.to_csv("sampled_taxi_data_2009_2015.csv", index=False)
    return yellow_taxi_data


def load_taxi_zones():
    zones = gpd.read_file('taxi_zone_data/taxi_zones.shp')
    zones = zones.to_crs(epsg=2263)  
    zones['Lon'] = zones.centroid.x
    zones['Lat'] = zones.centroid.y
    zones = zones.drop(columns=['OBJECTID', "Shape_Leng", 'Shape_Area', "zone", 'borough', 'geometry'])
    return zones


if __name__ == "__main__":
    main_url = 'https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page'
    start_date = datetime(2009, 1, 1)
    end_date = datetime(2015, 6, 30)
    retrieved_files_dir = 'monthly_data'
    sampled_files_dir = 'sampled_data'

    yellow_taxi_links = fetch_yellow_taxi_links(main_url)
    fetch_taxi_data(yellow_taxi_links, start_date, end_date)
    taxi_zones = load_taxi_zones()
    compiled_taxi_data = compile_and_clean_taxi_data()

Check availablity of retrieved data ...
Retrieved data found!
Check availablity of sampled data ...
Sampling 2009-01
Sampling 2009-02
Sampling 2009-03
Sampling 2009-04
Sampling 2009-05
Sampling 2009-06
Sampling 2009-07
Sampling 2009-08
Sampling 2009-09
Sampling 2009-10
Sampling 2009-11
Sampling 2009-12
Sampled data saved as sampled_data/sampled_data_2009.csv
Sampling 2010-01
Sampling 2010-02
Sampling 2010-03
Sampling 2010-04
Sampling 2010-05
Sampling 2010-06
Sampling 2010-07
Sampling 2010-08
Sampling 2010-09
Sampling 2010-10
Sampling 2010-11
Sampling 2010-12
Sampled data saved as sampled_data/sampled_data_2010.csv
Sampling 2011-01
Sampling 2011-02
Sampling 2011-03
Sampling 2011-04
Sampling 2011-05
Sampling 2011-06
Sampling 2011-07
Sampling 2011-08
Sampling 2011-09
Sampling 2011-10
Sampling 2011-11
Sampling 2011-12
Sampled data saved as sampled_data/sampled_data_2011.csv
Sampling 2012-01
Sampling 2012-02
Sampling 2012-03
Sampling 2012-04
Sampling 2012-05
Sampling 2012-06
Sampling 2012-0

FileNotFoundError: [Errno 2] No such file or directory: 'monthly_data/yellow_tripdata_2015-07.parquet'

In [None]:
### Calculate distance

In [None]:
from math import radians, sin, cos, sqrt, atan2

def add_trip_distance_column(df):
    def calculate_distance_from_coordinate(from_coord, to_coord):
        R = 6371.0
        lat1, lon1 = radians(from_coord[0]), radians(from_coord[1])
        lat2, lon2 = radians(to_coord[0]), radians(to_coord[1])
        dlon = lon2 - lon1
        dlat = lat2 - lat1
        a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
        c = 2 * atan2(sqrt(a), sqrt(1-a))
        distance = R * c
        return distance

    df['trip_distance'] = df.apply(lambda r: calculate_distance_from_coordinate(r['Start_Lat'], r['Start_Lon'], r['End_Lat'], r['End_Lon']), axis=1)
    df['trip_distance'] = df['trip_distance'].apply(lambda d: '{:.2f}'.format(d))

    return df

In [None]:
###Define the table classes:

In [None]:
from sqlalchemy import create_engine, Column, Integer, String, Float, Date, Time, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
engine = create_engine('sqlite:///project.db')
Base = declarative_base()


In [None]:
class YellowTaxi(Base):
    __tablename__ = 'yellow_taxi'
    id = Column(Integer, primary_key=True)
    Date = Column(Date)
    Pickup = Column(DateTime)
    Pickup_Time = Column(Time)
    Trip_Distance = Column(Float)
    Start_Lon = Column(Float)
    Start_Lat = Column(Float)
    End_Lon = Column(Float)
    End_Lat = Column(Float)
    Fare_Amt = Column(Float)
    Tip_Amt = Column(Float)
    Year = Column(Integer)
    Month = Column(Integer)
    Day = Column(Integer)
    Time = Column(Integer)
    DayofWeek = Column(Integer)

# Define classes for Uber, HourlyWeather, and DailyWeather tables similarly


In [None]:
Base.metadata.create_all(engine)

# Load the preprocessed datasets into DataFrames:
yellow_taxi_data = pd.read_csv("cleaned_yellow_taxi_data_2009_2015.csv")
weather_2009 = pd.read_csv("2009_weather.csv")
weather_2010 = pd.read_csv("2010_weather.csv")
weather_2011 = pd.read_csv("2011_weather.csv")
weather_2012 = pd.read_csv("2012_weather.csv")
weather_2013 = pd.read_csv("2013_weather.csv")
weather_2014 = pd.read_csv("2014_weather.csv")
weather_2015 = pd.read_csv("2015_weather.csv")





