In [4]:
import requests
from bs4 import BeautifulSoup
import pandas as pd
import geopandas as gpd
import re
import os
import zipfile
import duckdb

## Retrieving the data links

In [5]:
# make a request to the website - NYC Taxi & Limousine Commission TLC Trip Record Data
url = "https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page"
r = requests.get(url)
html = r.content
soup = BeautifulSoup(html, "html.parser")
links = soup.find_all("a")

In [6]:
# extract all the yellow taxi link from the webpage
taxi_links = list()
for link in links:
    cur_link = link.get("href")
    data_link = re.findall(r".+\.parquet$", cur_link)
    if data_link:
        cur_link2 = data_link[0]
        is_yellow = re.findall(r".*yellow.*", cur_link2)
        if is_yellow:
            taxi_links.append(is_yellow[0])

## Viewing sample data

In [None]:
# sample data
df1 = pd.read_parquet("./temp-data/taxi-2021-1.parquet", engine="pyarrow")

In [None]:
df1.info()

In [None]:
df1.head()

## Data Analysis using DuckDB

### References:
1. [PySpark @ RealPython](https://realpython.com/pyspark-intro/)
2. [NYC Data Dictionary](https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf)
3. [DuckDB - Query Syntax](https://duckdb.org/docs/sql/functions/timestamp.html)

- Questions:
    - What is the yearly/daily/hourly taxi traffic for each taxi zone in NYC?
    - When is a driver more likely to get higher amount of tip?
    - When/where a driver is more likely to get a customer(s)?
    - For each taxi zone, how far a taxi driver is likely to drive? Is there any particular zone with particularly higher overall trip distance?

### Reading in Each Month's Data (parquet files)

1. Thought about compressing parquet files, but [it is inefficient](https://stackoverflow.com/questions/60774906/how-to-write-a-parquet-bytes-object-as-zipfile-to-disk).
2. Tried combining all of them into a single SQL database, but it takes up too much storage.
3. Tried reading and analyzing monthly data individually from 2011 to 2022.
   - Changed the scope to from 2016 to 2022 due to column specifiation (DOLocationID & PULocationID)
4. Thought about using API, but it's much more time consuming (working with a generator with millions of rows).
5. **Currently trying to work with `pandas` and `duckdb`**
    - `dask` does not work when trying to read parquet files from the web due to "encoding error."

## Preparing Analysis

In [None]:
taxi_links[:5]

In [7]:
# update taxi_links: only select data between 2016 and 2022
taxi_links_11_22 = list()
for link in taxi_links:
    cur_year = link.split("_")[-1].split("-")[0]
    if 2011 <= int(cur_year) and int(cur_year) <= 2022:
        taxi_links_11_22.append(link)
taxi_links_11_22[-5:]

['https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2011-08.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2011-09.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2011-10.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2011-11.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2011-12.parquet']

In [None]:
print(len(taxi_links), ",", len(taxi_links_11_22))

### Efficiently Load Data from Large Parquet Files

1. Load less data by specifying desired columns
2. Change data type into more efficient formats
3. Use `duckdb` to query each dataframe

In [None]:
columns_to_use = ["tpep_pickup_datetime", "tpep_dropoff_datetime", "PULocationID", "total_amount", "tip_amount", "trip_distance"]
jan_2011 = pd.read_parquet("https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2011-01.parquet", columns = columns_to_use)
jan_2011

In [None]:
jan_2011.memory_usage(deep=True)

In [None]:
jan_2011[["PULocationID", "total_amount", "tip_amount", "trip_distance"]] = jan_2011[["PULocationID", "total_amount", "tip_amount", "trip_distance"]].apply(pd.to_numeric, downcast='float')

In [None]:
jan_2011.memory_usage(deep=True)

In [None]:
jan_2011.head()

## Understand the data

In order to understand the data more in depth, it is crucial to know how they are recorded. First, make sure to read the data description from the [NYC TLC data dictionary](https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf)
and study how [taximeter works](https://www.staxi.nl/en/how-taximeters-work/#:~:text=How%20a%20taximeter%20works,taxi%20travels%20a%20certain%20distance.). There are few irregular rows and deciding what to do with them heavily depends on these domain knowledge. 

## Clean Data

Filtered out the followings rows:

1. Rides with `trip_distance` of 0.
2. Rides with `trip_time` <= 1.
3. Rides that would require the speed of more than 100 mph.

Let's create a new column `trip_time` which indicates the total time spent per ride. Notice there are few rides with negative `trip_time` which does not make sense. Let's filter and take a look at those rows.

In [None]:
jan_2011 = duckdb.query("""
    SELECT *, DATE_DIFF('minute', tpep_pickup_datetime, tpep_dropoff_datetime) AS trip_time
    FROM jan_2011
""").df()

In [None]:
duckdb.query("""
    SELECT *
    FROM jan_2011
    WHERE trip_time < 0
""").df()

Notice nearly all of them are from `PULocatonID` of 264 which is unknown borough according to [NYC Taxi Zone Code](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page).

In [None]:
duckdb.query("""
    SELECT PULocationID, COUNT(*) AS cnt
    FROM jan_2011
    WHERE DATE_DIFF('minute', tpep_pickup_datetime, tpep_dropoff_datetime) < 0
    GROUP BY PULocationID
""").df()

There is a single row whose `PULocationID` is not 264 (Unknown location doe) and has a negative `trip_time`. Notice its `trip_distance` is also 0.

In [None]:
duckdb.query("""
    SELECT *
    FROM jan_2011
    WHERE DATE_DIFF('minute', tpep_pickup_datetime, tpep_dropoff_datetime) < 0 AND PULocationID = 145.0
""").df()

Let's see how many rows are of `trip_distance` 0. There are quite a few of them (n=76093). 

In [None]:
duckdb.query("""
    SELECT *, DATE_DIFF('minute', tpep_pickup_datetime, tpep_dropoff_datetime) AS trip_time
    FROM jan_2011
    WHERE trip_distance = 0
""").df()

What are the `trip_time` of those trips whose `trip_distance` is 0?

- Notice many for them have `trip_time` of 0, 1, or negative values. Let's filter those rows out.

In [None]:
duckdb.query("""
    SELECT trip_time, COUNT(*) AS cnt
    FROM
        (SELECT *, DATE_DIFF('minute', tpep_pickup_datetime, tpep_dropoff_datetime) AS trip_time
        FROM jan_2011
        WHERE trip_distance = 0) AS t1
    GROUP BY trip_time
""").df()

In [None]:
jan_2011 = duckdb.query("""
    SELECT *
    FROM jan_2011
    WHERE trip_distance != 0
""").df()
jan_2011.head()

Rides with `trip_time` less than 0, 0, or 1 does not make any sense. Let's filter those rides out too.

In [None]:
duckdb.query("""
    SELECT *
    FROM jan_2011
    WHERE trip_time <= 0
""").df().head()

In [None]:
jan_2011 = duckdb.query("""
    SELECT *
    FROM jan_2011
    WHERE trip_time > 0
""").df()

jan_2011.head()

Few rows have really short `trip_time` but long `trip_distance`. Let's filter those rows out too.

1. Convert `trip_time` to hour. [(Do the conversion first)](https://stackoverflow.com/questions/34504497/division-not-giving-my-answer-in-postgresql).
2. Calculate the `avg_mph` column (the average speed of each ride).
3. Filter out rows with `avg_mph` greater than 100.

In [None]:
duckdb.query("""
    SELECT *
    FROM
        (SELECT
            *,
            ROUND(CAST(trip_time AS decimal) / 60, 2) AS trip_time_hour,
            trip_distance / trip_time_hour AS avg_mph
        FROM jan_2011) AS t1
    WHERE avg_mph >= 100
    ORDER BY avg_mph 
""").df()

Let's also filter out rides with extremely slow speed (less than 1).

In [None]:
duckdb.query("""
    SELECT *
    FROM
        (SELECT
            *,
            ROUND(CAST(trip_time AS decimal) / 60, 2) AS trip_time_hour,
            trip_distance / trip_time_hour AS avg_mph
        FROM jan_2011) AS t1
    WHERE avg_mph < 1
    ORDER BY avg_mph 
""").df().head()

In [None]:
jan_2011 = duckdb.query("""
    SELECT *
    FROM
        (SELECT
            *,
            ROUND(CAST(trip_time AS decimal) / 60, 2) AS trip_time_hour,
            trip_distance / trip_time_hour AS avg_mph
        FROM jan_2011) AS t1
    WHERE avg_mph > 1 AND avg_mph < 100
    ORDER BY tpep_pickup_datetime
""").df()

jan_2011.head()

### Create new columns
    1. split pick up datetime into year, month, day, and hour

In [None]:
jan_2011_v2 = duckdb.query("""
    SELECT *, DATE_TRUNC('hour', tpep_pickup_datetime) AS PUDate
    FROM jan_2011
""").df()

In [None]:
# change data type into more efficient ones for newly created columns
jan_2011_v2[["trip_time"]] = jan_2011_v2[["trip_time"]].apply(pd.to_numeric, downcast='float')

In [None]:
jan_2011_v2.head()

In [None]:
# drop unused columns
jan_2011_v3 = duckdb.query("""
    SELECT PUDate, PULocationID, total_amount, tip_amount, trip_time, trip_distance
    FROM jan_2011_v2
""").df()

In [None]:
jan_2011_v3.head()

In [None]:
jan_2011_v3.memory_usage()

## Trend

For NYC TLC (New York City Taxi and Limousine Commission), it would be important to know the overall traffic trend of yellow taxi by several time periods: hourly, daily, monhtly, and yearly.
Let's analyze the overall traffic, total fare amount, and tip amount for each time period using previously created columns: `pickup_year`, `pickup_month`, `pickup_wday`, `pickup_hour`






In [None]:
jan_2011_v4 = duckdb.query("""
    SELECT
        PUDate, PULocationID,
        COUNT(*) AS total_rides,
        AVG(total_amount) AS avg_total_fare,
        AVG(tip_amount) AS avg_tip,
        AVG(trip_time) AS avg_trip_time,
        AVG(trip_distance) AS avg_trip_distance
    FROM jan_2011_v3
    GROUP BY PUDate, PULocationID
    ORDER BY PULocationID ASC, PUDate ASC
""").df()

jan_2011_v4.head()

## Spatial Data for Visualization

Certain columns are numerically coded including `PULocationID` and `pickup_wday`. Let's use another dataframe that contain geospatial data
and Tableau to convert those values properly.

In [46]:
# https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
location_shape = gpd.read_file("./taxi_zones/taxi_zones.shp")
location_shape.head()

Unnamed: 0,OBJECTID,Shape_Leng,Shape_Area,zone,LocationID,borough,geometry
0,1,0.116357,0.000782,Newark Airport,1,EWR,"POLYGON ((933100.918 192536.086, 933091.011 19..."
1,2,0.43347,0.004866,Jamaica Bay,2,Queens,"MULTIPOLYGON (((1033269.244 172126.008, 103343..."
2,3,0.084341,0.000314,Allerton/Pelham Gardens,3,Bronx,"POLYGON ((1026308.770 256767.698, 1026495.593 ..."
3,4,0.043567,0.000112,Alphabet City,4,Manhattan,"POLYGON ((992073.467 203714.076, 992068.667 20..."
4,5,0.092146,0.000498,Arden Heights,5,Staten Island,"POLYGON ((935843.310 144283.336, 936046.565 14..."


---

## Iterating Processing Procedure for All Data from 2011 to 2022

### Read Data

In [8]:
def read_data(link):
    
    columns_to_use = ["tpep_pickup_datetime", "tpep_dropoff_datetime", "PULocationID", "total_amount", "tip_amount", "trip_distance"]
    
    df = pd.read_parquet(link, columns = columns_to_use)
    
    df[["PULocationID", "total_amount", "tip_amount", "trip_distance"]] = df[["PULocationID", "total_amount", "tip_amount", "trip_distance"]].apply(pd.to_numeric, downcast='float')
    
    return df

### Clean Data

In [9]:
def clean_data(df):
    # New column : trip_time
    # Filter out rows with `trip_distance` = 0
    # Filter out rows with `trip_time` <= 0
    df1 = duckdb.query("""
        SELECT *, DATE_DIFF('minute', tpep_pickup_datetime, tpep_dropoff_datetime) AS trip_time
        FROM df
        WHERE trip_distance != 0 AND trip_time > 0
    """).df()
    
    # Filter out rides with average mph > 100 or mph < 1
    df2 = duckdb.query("""
        SELECT *
        FROM
            (SELECT
                *,
                ROUND(CAST(trip_time AS decimal) / 60, 2) AS trip_time_hour,
                trip_distance / trip_time_hour AS avg_mph
            FROM df1) AS t1
        WHERE avg_mph > 1 AND avg_mph < 100
        ORDER BY tpep_pickup_datetime
    """).df()
    
    # New Columns : Convert datetime to date for group analysis
    df3 = duckdb.query("""
        SELECT *, date_trunc('hour', tpep_pickup_datetime) as PUDate
        FROM df2
    """).df()

    df3[["trip_time"]] = df3[["trip_time"]].apply(pd.to_numeric, downcast='float')
    
    # Drop unused columns
    df4 = duckdb.query("""
        SELECT PUDate, PULocationID, total_amount, tip_amount, trip_time, trip_distance
        FROM df3
    """).df()
    
    return df4

## Trend

- Analysis Grouped by `LocationID` and `PUDate` (pickup date)

In [10]:
def trend(df):
    df1 = duckdb.query("""
        SELECT
            PUDate, PULocationID,
            COUNT(*) AS total_rides,
            AVG(total_amount) AS avg_total_fare,
            AVG(tip_amount) AS avg_tip,
            AVG(trip_time) AS avg_trip_time,
            AVG(trip_distance) AS avg_trip_distance
        FROM df
        GROUP BY PUDate, PULocationID
        ORDER BY PULocationID ASC, PUDate ASC
    """).df()
    
    return df1

In [17]:
# first few rows of the links of the data
taxi_links_11_22[:5]

['https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-01.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-02.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-03.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-04.parquet',
 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-05.parquet']

In [16]:
# specifying required columns for the data analysis
data_list = list()
total_row = 0
for link in reversed(taxi_links_11_22):
    cur_year = link.split("_")[-1].split("-")[0]
    cur_month = link.split("-")[-1].split(".")[0]
    
    # read raw data with columns specified
    data = read_data(link)
    
    # clean data
    cleaned_df = clean_data(data)
    
    # trend analysis per location_id
    trend_df = trend(cleaned_df)
    
    data_list.append(trend_df)
    
    total_row += trend_df.shape[0]
    
    print(f">> {cur_year}-{cur_month} Data Processed, Total Row: ", total_row)

>> 2011-12 Data Processed, Total Row:  104695
>> 2011-11 Data Processed, Total Row:  206923
>> 2011-10 Data Processed, Total Row:  315818
>> 2011-09 Data Processed, Total Row:  427225
>> 2011-08 Data Processed, Total Row:  534977
>> 2011-07 Data Processed, Total Row:  644151
>> 2011-06 Data Processed, Total Row:  750436
>> 2011-05 Data Processed, Total Row:  859713
>> 2011-04 Data Processed, Total Row:  962151
>> 2011-03 Data Processed, Total Row:  1069789
>> 2011-02 Data Processed, Total Row:  1166291
>> 2011-01 Data Processed, Total Row:  1268030
>> 2012-12 Data Processed, Total Row:  1365748
>> 2012-11 Data Processed, Total Row:  1460889
>> 2012-10 Data Processed, Total Row:  1558664
>> 2012-09 Data Processed, Total Row:  1659004
>> 2012-08 Data Processed, Total Row:  1764154
>> 2012-07 Data Processed, Total Row:  1868176
>> 2012-06 Data Processed, Total Row:  1970560
>> 2012-05 Data Processed, Total Row:  2073919
>> 2012-04 Data Processed, Total Row:  2172068
>> 2012-03 Data Proces

In [47]:
# temporarily saving the current dataframe
if not os.path.exists(os.path.join(".", "data")):
    os.mkdir(os.path.join(".", "data"))
temp_taxi_df = pd.concat(data_list, ignore_index=True)
temp_taxi_df.to_csv(os.path.join(".", "data", "temp_taxi.csv"))

Notice there are few rows with a suspicious pick up date. Let's filter out those rows.

In [28]:
duckdb.query("""
    SELECT *
    FROM  temp_taxi_df
    WHERE YEAR(PUDate) < 2011 OR YEAR(PUDate) > 2022
""").df()

Unnamed: 0,PUDate,PULocationID,total_rides,avg_total_fare,avg_tip,avg_trip_time,avg_trip_distance
0,2008-12-31 23:00:00,13.0,1,9.80,0.00,11.0,1.33
1,2009-01-01 01:00:00,41.0,1,54.36,8.80,28.0,13.44
2,2009-01-01 23:00:00,50.0,1,10.80,0.00,18.0,1.97
3,2008-12-31 11:00:00,70.0,1,49.27,8.21,40.0,9.53
4,2009-01-01 01:00:00,79.0,1,21.80,0.00,31.0,3.89
...,...,...,...,...,...,...,...
1608,2008-12-31 23:00:00,41.0,1,5.80,0.00,3.0,0.72
1609,2009-01-01 00:00:00,137.0,1,9.30,0.00,5.0,1.02
1610,2009-01-01 09:00:00,140.0,1,8.80,0.00,9.0,0.64
1611,2008-12-31 23:00:00,161.0,1,10.80,0.00,7.0,1.55


In [31]:
temp_taxi_v2 = duckdb.query("""
    SELECT *
    FROM temp_taxi_df
    WHERE YEAR(PUDate) >= 2011 AND YEAR(PUDate) <= 2022
""").df()

# type conversion for Tableau (using PULocationID as a key to join the spatial)
# turns out PULocationID can't be used as a key to joinning if it's float data type
temp_taxi_v2[["PULocationID"]] = temp_taxi_v2[["PULocationID"]].astype(int)
temp_taxi_v2.to_csv(os.path.join(".", "data", "temp_taxi_v2.csv"), index=False)

## Add 0 to Pickup Datetime & PULocation Without Any Ride

I exported grouped data which summarizes it beforehand. This is to save storage space. Thus, I need to manually add 0 values to rows with pickup datetime
that has no rides at all for each `PULocationID` for more precise hourly analysis.

Notice the last date data being recorded is Sunday of October, 2022. Let's trim the `final_df` accordingly.

In [38]:
duckdb.query("""
    SELECT *
    FROM temp_taxi_v2
    ORDER BY PUDate
""").df().tail(5)

Unnamed: 0,PUDate,PULocationID,total_rides,avg_total_fare,avg_tip,avg_trip_time,avg_trip_distance
12687855,2022-10-01,229,1,11.8,0.0,10.0,1.55
12687856,2022-10-01,234,1,15.8,2.0,14.0,1.62
12687857,2022-10-01,237,1,8.76,1.46,2.0,0.54
12687858,2022-10-01,263,1,9.3,0.0,5.0,1.16
12687859,2022-10-01,4,1,25.8,0.0,2.0,0.27


In [43]:
date_template = duckdb.query("""
    SELECT t1.generate_series AS gen_PUDate, t2.generate_series AS PULocationID
        FROM generate_series(
               (date '2011-01-01')::timestamp,
               (date '2022-10-01')::timestamp,
               interval '1 hour'
               ) AS t1
        CROSS JOIN generate_series(1,265,1) AS t2
""").df()

taxi_df = duckdb.query("""
    SELECT gen_PUDate AS PUDate, t1.PULocationID, total_rides, avg_total_fare, avg_tip, avg_trip_time, avg_trip_distance
    FROM
        (SELECT *
        FROM date_template AS t1
        LEFT JOIN temp_taxi_v2 AS t2
        ON t1.gen_PUDate = t2.PUDate AND
           t1.PULocationID = t2.PULocationID) AS t1
    ORDER BY gen_PUDate, t1.PULocationID
""").df().fillna(0)

In [44]:
taxi_df.head(5)

Unnamed: 0,PUDate,PULocationID,total_rides,avg_total_fare,avg_tip,avg_trip_time,avg_trip_distance
0,2011-01-01,1,0.0,0.0,0.0,0.0,0.0
1,2011-01-01,2,0.0,0.0,0.0,0.0,0.0
2,2011-01-01,3,0.0,0.0,0.0,0.0,0.0
3,2011-01-01,4,78.0,11.189231,0.559744,15.038462,2.919231
4,2011-01-01,5,0.0,0.0,0.0,0.0,0.0


In [45]:
# RUN ONLY ONCE : Writing a huge data set (1.8GB).
# taxi_df.to_csv(os.path.join(".", "data", "taxi_df.csv"), index=False)

---

# More Efficiently Load Data - Using Dask / S3

For the future practice with big datasets, `dask` might be useful.

- [How to connect to s3 bucket using `boto3`](https://stackoverflow.com/questions/30249069/listing-contents-of-a-bucket-with-boto3)
- [How to set up the credentials for `boto3`](https://www.youtube.com/watch?v=tW3HoYRnABs&ab_channel=KGPTalkie)

In [None]:
import dask.dataframe as dd
import boto3

In [None]:
s3 = boto3.resource("s3")
bucket = s3.Bucket("nyc-tlc")

In [None]:
for obj in bucket.objects.all():
    print(obj)
    break

In [None]:
s3_links = list()
for link in taxi_links_11_22:
    s3_links.append("s3://nyc-tlc/trip data/" + link.split("/")[-1])
s3_links[:5]

In [None]:
columns_to_use

In [None]:
# temp data
temp_link = s3_links[0]
temp_link

In [None]:
# filter out the rides with trip distance 0 when loading => filtering within partition is possible when using pyarrow engine
# https://docs.dask.org/en/stable/generated/dask.dataframe.read_parquet.html
filters=[('trip_distance', '>', 1)]

# df = dd.read_parquet(s3_links,
#                        columns = columns_to_use,
#                        filters = filters,
#                        engine="pyarrow")

# TEMP
df = dd.read_parquet(temp_link,
                  columns = columns_to_use,
                  filters = filters,
                  engine="pyarrow")

In [None]:
# filter out the rows with trip time <= 0
df["trip_time"] = (df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]).dt.total_seconds() / 60.0
df = df[df["trip_time"] > 0]

In [None]:
# Filter out rides that would require the speed of more than 100 mph.
df["avg_mph"] = df["trip_distance"] / (df["trip_time"] / 60.0)
df = df[df["avg_mph"] < 100]

In [None]:
# split `tpep_pickup_datetime` into year, month, and hour
df["pickup_year"] = df["tpep_pickup_datetime"].dt.year
df["pickup_month"] = df["tpep_pickup_datetime"].dt.month
df["pickup_wday"] = df["tpep_pickup_datetime"].dt.weekday
df["pickup_hour"] = df["tpep_pickup_datetime"].dt.hour

In [None]:
trend_df = df.groupby(["PULocationID", "pickup_year", "pickup_month", "pickup_wday", "pickup_hour"]).agg({"total_amount":"mean", "tip_amount":"mean", "trip_time":"mean"})
trend_df["count"] = df.groupby(["PULocationID", "pickup_year", "pickup_month", "pickup_wday", "pickup_hour"]).size()

---