# Citi Bike Data Processing

This notebook provides a workflow for processing and analyzing Citi Bike trip data. It discovers the trip CSV files, loads them into DuckDB while mapping the different historical CSV layouts onto one common set of columns, and produces a summary table with the number of trips per day across all available files. It then checks the summary for missing or duplicated days and saves it to a CSV file.

In [1]:
import duckdb
import pandas as pd
import os
import zipfile
import time
import tempfile
import shutil
import glob
import re
from io import BytesIO

In [69]:
# Root directory that find_all_csvs searches recursively for trip CSV files.
root_dir = "data/citi_bike/"

In [70]:
def find_all_csvs(root_dir):
    """Recursively collect every ``.csv`` file located under ``root_dir``.

    Args:
        root_dir: Directory to search; all nested sub-directories are searched too.

    Returns:
        A sorted list of paths (each starting with ``root_dir``) to regular
        files whose name matches ``*.csv``. The list is empty when nothing
        matches or ``root_dir`` does not exist.
    """
    pattern = os.path.join(root_dir, '**', '*.csv')
    # '*.csv' can never match a name ending in '.zip', so no extra suffix
    # filter is needed. Directories whose name happens to end in '.csv' are
    # skipped because they cannot be read as data files. Sorting makes the
    # result independent of filesystem enumeration order.
    return sorted(
        path for path in glob.glob(pattern, recursive=True) if os.path.isfile(path)
    )

# Discover the trip CSVs under root_dir and report how many were found.
csv_files = find_all_csvs(root_dir)
print(f"Found {len(csv_files)} CSV files.")

Found 321 CSV files.


In [None]:
# Order the paths in reverse lexicographic order of their strings
csv_files = sorted(csv_files, reverse=True)
csv_files

['data/citi_bike/202504-citibike-tripdata/202504-citibike-tripdata_4.csv',
 'data/citi_bike/202504-citibike-tripdata/202504-citibike-tripdata_3.csv',
 'data/citi_bike/202504-citibike-tripdata/202504-citibike-tripdata_2.csv',
 'data/citi_bike/202504-citibike-tripdata/202504-citibike-tripdata_1.csv',
 'data/citi_bike/202503-citibike-tripdata.csv',
 'data/citi_bike/202502-citibike-tripdata/202502-citibike-tripdata_3.csv',
 'data/citi_bike/202502-citibike-tripdata/202502-citibike-tripdata_2.csv',
 'data/citi_bike/202502-citibike-tripdata/202502-citibike-tripdata_1.csv',
 'data/citi_bike/202501-citibike-tripdata/202501-citibike-tripdata_3.csv',
 'data/citi_bike/202501-citibike-tripdata/202501-citibike-tripdata_2.csv',
 'data/citi_bike/202501-citibike-tripdata/202501-citibike-tripdata_1.csv',
 'data/citi_bike/202412-citibike-tripdata/202412-citibike-tripdata_3.csv',
 'data/citi_bike/202412-citibike-tripdata/202412-citibike-tripdata_2.csv',
 'data/citi_bike/202412-citibike-tripdata/202412-cit

In [4]:
# def collect_csv_paths(folder_path):
#     csv_paths = []

#     def process_zip(zip_file_obj, zip_path=""):
#         for info in zip_file_obj.infolist():
#             if info.is_dir():
#                 continue

#             filename = info.filename.lower()

#             if filename.endswith(".csv"):
#                 csv_paths.append(f"{zip_path}:{info.filename}")

#             elif filename.endswith(".zip"):
#                 # Read inner ZIP into memory
#                 with zip_file_obj.open(info.filename) as inner_file:
#                     inner_bytes = BytesIO(inner_file.read())
#                     try:
#                         with zipfile.ZipFile(inner_bytes) as inner_zip:
#                             # Recursive call for nested zip
#                             process_zip(inner_zip, f"{zip_path}:{info.filename}")
#                     except zipfile.BadZipFile:
#                         print(f"Skipping corrupt zip: {zip_path}:{info.filename}")

#     for root, _, files in os.walk(folder_path):
#         for fname in files:
#             full_path = os.path.join(root, fname)
#             if fname.lower().endswith(".csv"):
#                 csv_paths.append(full_path)
#             elif fname.lower().endswith(".zip"):
#                 with zipfile.ZipFile(full_path, 'r') as outer_zip:
#                     process_zip(outer_zip, full_path)

#     return csv_paths


In [85]:
def is_file_2020_or_later(file_path):
    """Tell whether the path's ``YYYYMM-citibike-tripdata`` stamp is in 2020 or later.

    Only the first six-digit stamp found in ``file_path`` is considered, and
    only its four leading digits (the year) are compared. Paths without such
    a stamp yield ``False``.
    """
    stamp = re.search(r'(\d{6})-citibike-tripdata', file_path)
    if stamp is None:
        return False
    return int(stamp.group(1)[:4]) >= 2020

def is_file_april_2017_or_later(file_path):
    """Tell whether the path's ``YYYYMM-citibike-tripdata`` stamp is 201704 or later.

    The first six-digit stamp found in ``file_path`` is compared as an
    integer. Paths without such a stamp yield ``False``.
    """
    found = re.search(r'(\d{6})-citibike-tripdata', file_path)
    return found is not None and int(found.group(1)) >= 201704

def is_file_october_2016_or_later(file_path):
    """Tell whether the path's ``YYYYMM-citibike-tripdata`` stamp is 201610 or later.

    The first six-digit stamp found in ``file_path`` is compared as an
    integer. Paths without such a stamp yield ``False``.
    """
    hit = re.search(r'(\d{6})-citibike-tripdata', file_path)
    return int(hit.group(1)) >= 201610 if hit else False

def is_august_2014_or_earlier(file_path):
    """Tell whether the path's ``YYYYMM-citibike-tripdata`` stamp is 201408 or earlier.

    The first six-digit stamp found in ``file_path`` is compared as an
    integer. Paths without such a stamp yield ``False``.
    """
    hit = re.search(r'(\d{6})-citibike-tripdata', file_path)
    if not hit:
        return False
    month_stamp = int(hit.group(1))
    return not month_stamp > 201408

In [80]:
# Files that create_trips_table special-cases: their starttime/stoptime values
# are parsed with the minute-resolution pattern '%m/%d/%Y %H:%M' (no seconds).
# Membership is tested on the exact path string, so entries must match the
# discovered paths character for character.
csv_files_hm_format = ["data/citi_bike/2015-citibike-tripdata/1_January/201501-citibike-tripdata_1.csv",
                       "data/citi_bike/2015-citibike-tripdata/2_February/201502-citibike-tripdata_1.csv", 
                       "data/citi_bike/2015-citibike-tripdata/3_March/201503-citibike-tripdata_1.csv", 
                       "data/citi_bike/2015-citibike-tripdata/6_June/201506-citibike-tripdata_1.csv"]

In [87]:
# Columns of the ``trips`` staging table, in INSERT order. These are the
# columns selected for 2020-and-later files. Every column is VARCHAR because
# all CSVs are read with ``all_varchar=1``.
_TRIPS_COLUMNS = (
    "ride_id", "rideable_type", "started_at", "ended_at",
    "start_station_name", "start_station_id",
    "end_station_name", "end_station_id",
    "start_lat", "start_lng", "end_lat", "end_lng",
    "member_casual",
)

# NOTE(review): in every INSERT below the ``NOT LIKE 'SYS%'`` predicate also
# drops rows whose start station id is NULL (NULL NOT LIKE ... evaluates to
# NULL, which WHERE treats as false). Kept as in the original; confirm that
# this is intended.

# INSERT for files whose headers already use the ``trips`` column names.
_INSERT_CURRENT_HEADERS = """
    INSERT INTO trips
    SELECT
        ride_id,
        rideable_type,
        started_at,
        ended_at,
        start_station_name,
        start_station_id,
        end_station_name,
        end_station_id,
        start_lat,
        start_lng,
        end_lat,
        end_lng,
        member_casual
    FROM read_csv_auto({path}, all_varchar=1)
    WHERE start_station_id NOT LIKE 'SYS%'
"""

# INSERT for files with lower-case, space-separated headers. Only the
# timestamp expressions and the optional extra reader options vary.
_INSERT_LOWERCASE_HEADERS = """
    INSERT INTO trips
    SELECT
        NULL AS ride_id,
        NULL AS rideable_type,
        {started_at} AS started_at,
        {ended_at} AS ended_at,
        "start station name" AS start_station_name,
        "start station id" AS start_station_id,
        "end station name" AS end_station_name,
        "end station id" AS end_station_id,
        "start station latitude" AS start_lat,
        "start station longitude" AS start_lng,
        "end station latitude" AS end_lat,
        "end station longitude" AS end_lng,
        usertype AS member_casual
    FROM read_csv_auto({path}, all_varchar=1{reader_options})
    WHERE "start station id" NOT LIKE 'SYS%'
"""

# INSERT for files with Title Case headers ("Start Time", "User Type", ...).
_INSERT_TITLECASE_HEADERS = """
    INSERT INTO trips
    SELECT
        NULL AS ride_id,
        NULL AS rideable_type,
        try_cast("Start Time" AS DATE) AS started_at,
        try_cast("Stop Time" AS DATE) AS ended_at,
        "Start Station Name" AS start_station_name,
        "Start Station ID" AS start_station_id,
        "End Station Name" AS end_station_name,
        "End Station ID" AS end_station_id,
        "Start Station Latitude" AS start_lat,
        "Start Station Longitude" AS start_lng,
        "End Station Latitude" AS end_lat,
        "End Station Longitude" AS end_lng,
        "User Type" AS member_casual
    FROM read_csv_auto({path}, all_varchar=1)
    WHERE "Start Station ID" NOT LIKE 'SYS%'
"""


def _lowercase_insert(path_literal, time_expr, reader_options=""):
    """Fill the lower-case-header INSERT; ``time_expr`` uses ``{col}`` for the source column."""
    return _INSERT_LOWERCASE_HEADERS.format(
        path=path_literal,
        reader_options=reader_options,
        started_at=time_expr.format(col="starttime"),
        ended_at=time_expr.format(col="stoptime"),
    )


def _insert_sql_for(file):
    """Return the INSERT statement that maps one CSV file onto the ``trips`` columns."""
    # Double embedded single quotes so the path is a valid SQL string literal.
    path_literal = "'" + file.replace("'", "''") + "'"

    # The order of these checks matters: the hand-listed minute-resolution
    # files are tested first, then the date-stamp ranges from newest to oldest.
    if file in csv_files_hm_format:
        return _lowercase_insert(path_literal, "STRPTIME({col}, '%m/%d/%Y %H:%M')")
    if is_file_2020_or_later(file):
        return _INSERT_CURRENT_HEADERS.format(path=path_literal)
    if is_file_april_2017_or_later(file):
        return _lowercase_insert(
            path_literal, "try_cast({col} AS DATE)", reader_options=", quote='\"'"
        )
    if is_file_october_2016_or_later(file):
        return _INSERT_TITLECASE_HEADERS.format(path=path_literal)
    if is_august_2014_or_earlier(file):
        return _lowercase_insert(path_literal, "STRPTIME({col}, '%Y-%m-%d %H:%M:%S')")
    # Everything else (stamps between 201409 and 201609 not listed in
    # csv_files_hm_format, or paths without a stamp).
    return _lowercase_insert(path_literal, "STRPTIME({col}, '%m/%d/%Y %H:%M:%S')")


def create_trips_table(folder_path: "str | list[str]", df=None) -> pd.DataFrame:
    """Count Citi Bike trips per day across a set of trip CSV files.

    Each file is appended to a ``trips`` table in a fresh DuckDB connection,
    using the column mapping and timestamp parsing that matches the file's
    layout. The table is then grouped by ``CAST(started_at AS DATE)``.
    Nothing is written to disk by this function.

    Args:
        folder_path: Either an iterable of CSV file paths (this is how the
            notebook calls it) or a single directory path, in which case the
            ``*.csv`` files directly inside that directory are used.
        df: Optional DataFrame of earlier results. When given, the new
            summary rows are appended after it with a fresh integer index.

    Returns:
        A DataFrame with columns ``day`` and ``number_of_trips`` ordered by
        day, preceded by the rows of ``df`` when ``df`` is not ``None``. An
        empty input yields an empty summary.
    """
    start_time = time.time()

    if isinstance(folder_path, (str, os.PathLike)):
        csv_files = sorted(glob.glob(os.path.join(folder_path, "*.csv")))
    else:
        csv_files = [os.fspath(path) for path in folder_path]

    con = duckdb.connect()
    try:
        con.execute("PRAGMA max_temp_directory_size='50GB';")

        # Create the table with an explicit schema instead of cloning the
        # first CSV's header, so the result does not depend on which file
        # happens to be first in the list.
        column_defs = ", ".join(f"{name} VARCHAR" for name in _TRIPS_COLUMNS)
        con.execute(f"CREATE TABLE IF NOT EXISTS trips ({column_defs})")

        for file in csv_files:
            print(f"Processing file: {file}")
            con.execute(_insert_sql_for(file))

        con.execute("VACUUM;")

        # Summarize by day
        summary_df = con.execute("""
            SELECT 
                CAST(started_at AS DATE) AS day,
                COUNT(*) AS number_of_trips
            FROM trips
            GROUP BY day
            ORDER BY day
        """).fetchdf()
    finally:
        # Only close here: running further statements in ``finally`` could
        # raise, hide the original error and leave the connection open.
        con.close()

    if df is not None:
        combined_df = pd.concat([df, summary_df], ignore_index=True)
    else:
        combined_df = summary_df

    duration = time.time() - start_time
    print(f"Process completed in {duration:.2f} seconds")

    return combined_df

In [88]:
# No earlier summary exists, so the optional `df` argument keeps its default (None).
all_data = create_trips_table(csv_files)

Processing file: data/citi_bike/202504-citibike-tripdata/202504-citibike-tripdata_4.csv
Processing file: data/citi_bike/202504-citibike-tripdata/202504-citibike-tripdata_3.csv
Processing file: data/citi_bike/202504-citibike-tripdata/202504-citibike-tripdata_2.csv
Processing file: data/citi_bike/202504-citibike-tripdata/202504-citibike-tripdata_1.csv
Processing file: data/citi_bike/202503-citibike-tripdata.csv
Processing file: data/citi_bike/202502-citibike-tripdata/202502-citibike-tripdata_3.csv
Processing file: data/citi_bike/202502-citibike-tripdata/202502-citibike-tripdata_2.csv
Processing file: data/citi_bike/202502-citibike-tripdata/202502-citibike-tripdata_1.csv
Processing file: data/citi_bike/202501-citibike-tripdata/202501-citibike-tripdata_3.csv
Processing file: data/citi_bike/202501-citibike-tripdata/202501-citibike-tripdata_2.csv
Processing file: data/citi_bike/202501-citibike-tripdata/202501-citibike-tripdata_1.csv
Processing file: data/citi_bike/202412-citibike-tripdata/20

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Processing file: data/citi_bike/2021-citibike-tripdata/202109-citibike-tripdata/202109-citibike-tripdata_1.csv
Processing file: data/citi_bike/2021-citibike-tripdata/202108-citibike-tripdata/202108-citibike-tripdata_4.csv
Processing file: data/citi_bike/2021-citibike-tripdata/202108-citibike-tripdata/202108-citibike-tripdata_3.csv
Processing file: data/citi_bike/2021-citibike-tripdata/202108-citibike-tripdata/202108-citibike-tripdata_2.csv
Processing file: data/citi_bike/2021-citibike-tripdata/202108-citibike-tripdata/202108-citibike-tripdata_1.csv
Processing file: data/citi_bike/2021-citibike-tripdata/202107-citibike-tripdata/202107-citibike-tripdata_4.csv
Processing file: data/citi_bike/2021-citibike-tripdata/202107-citibike-tripdata/202107-citibike-tripdata_3.csv
Processing file: data/citi_bike/2021-citibike-tripdata/202107-citibike-tripdata/202107-citibike-tripdata_2.csv
Processing file: data/citi_bike/2021-citibike-tripdata/202107-citibike-tripdata/202107-citibike-tripdata_1.csv
P

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Processing file: data/citi_bike/2014-citibike-tripdata/3_March/201403-citibike-tripdata_1.csv
Processing file: data/citi_bike/2014-citibike-tripdata/2_February/201402-citibike-tripdata_1.csv
Processing file: data/citi_bike/2014-citibike-tripdata/1_January/201401-citibike-tripdata_1.csv
Processing file: data/citi_bike/2014-citibike-tripdata/12_December/201412-citibike-tripdata_1.csv
Processing file: data/citi_bike/2014-citibike-tripdata/11_November/201411-citibike-tripdata_1.csv
Processing file: data/citi_bike/2014-citibike-tripdata/10_October/201410-citibike-tripdata_1.csv


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Process completed in 288.70 seconds


In [90]:
all_data.head(10)

Unnamed: 0,day,number_of_trips
0,2014-01-01,6059
1,2014-01-02,8600
2,2014-01-03,1144
3,2014-01-04,2292
4,2014-01-05,2678
5,2014-01-06,9510
6,2014-01-07,6267
7,2014-01-08,9246
8,2014-01-09,13354
9,2014-01-10,9847


In [91]:
all_data.tail(10)

Unnamed: 0,day,number_of_trips
4119,2025-04-21,116298
4120,2025-04-22,158681
4121,2025-04-23,164428
4122,2025-04-24,171454
4123,2025-04-25,168961
4124,2025-04-26,129606
4125,2025-04-27,121701
4126,2025-04-28,152958
4127,2025-04-29,169718
4128,2025-04-30,174174


In [None]:
# Normalise 'day' to datetimes; values that cannot be parsed become NaT
all_data['day'] = pd.to_datetime(all_data['day'], errors='coerce')

# Daily calendar spanning the first to the last observed day
start_date, end_date = all_data['day'].min(), all_data['day'].max()
full_range = pd.date_range(start_date, end_date, freq='D')

In [93]:
start_date, end_date

(Timestamp('2014-01-01 00:00:00'), Timestamp('2025-04-30 00:00:00'))

In [94]:
# Calendar days in full_range that have no row in the summary
observed_days = all_data['day'].dropna().unique()
missing_dates = full_range.difference(observed_days)

print(f"Missing {len(missing_dates)} dates:")
print(missing_dates)

Missing 9 dates:
DatetimeIndex(['2016-01-23', '2016-01-24', '2016-01-25', '2016-01-26',
               '2017-02-09', '2017-03-14', '2017-03-15', '2017-03-16',
               '2021-02-02'],
              dtype='datetime64[ns]', freq=None)


In [96]:
all_data.shape

(4129, 2)

In [101]:
# Rows that repeat an earlier row in every column
is_repeated_row = all_data.duplicated()
duplicates = all_data[is_repeated_row]
duplicates

Unnamed: 0,day,number_of_trips


In [102]:
# Rows whose 'day' value already appeared in an earlier row
is_repeated_day = all_data.duplicated(subset=['day'])
duplicates_subset = all_data[is_repeated_day]
duplicates_subset

Unnamed: 0,day,number_of_trips


In [None]:
# all_data['day'] = pd.to_datetime(all_data['day'])

# start_date = pd.to_datetime("2023-01-01")
# end_date = pd.to_datetime("2023-12-31")

# filtered_data = all_data[(all_data['day'] >= start_date) & (all_data['day'] <= end_date)]

# filtered_data = filtered_data.reset_index(drop=True)

In [105]:
# Persist the per-day summary; the DataFrame index is not written.
all_data.to_csv("data/citibike.csv", index=False)

In [2]:
# Reload the saved summary, parsing the 'day' column as datetimes.
df = pd.read_csv("data/citibike.csv", parse_dates=['day'])

In [3]:
df['number_of_trips'].sum()

272212231