# Citi Bike Data Pull

I've separated the data pull from the analyses as it takes around a 
minute per month of data.

## Notebook Setup & Data Pull

First we need to get installations and imports out of the way, as well as load the data.

In [1]:
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from io import BytesIO
import os
import pandas as pd
import requests
import time
from zipfile import ZipFile

## Data Retrieval, Formatting, and Memory Reduction
The data is available in S3 here: https://s3.amazonaws.com/tripdata/index.html

Each file in the S3 bucket is 1 month of data with the filename formatted like YYYYMM-citibike-tripdata.csv.zip.

While Pandas can sometimes handle reading zipped CSV files directly, we get a bunch of Unicode errors if we attempt it here. As such, we'll explicitly unzip the files, then read them into dataframes. 
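
As a rough illustration of the unzip-then-read approach, here is a minimal sketch that works on an in-memory archive. The helper name `read_csvs_from_zip_bytes` and the two tiny CSV files are made up for the example; they are not part of the notebook or the Citi Bike data.

```python
from io import BytesIO
from zipfile import ZipFile

import pandas as pd


def read_csvs_from_zip_bytes(zip_bytes: bytes) -> pd.DataFrame:
    """Read every .csv member of an in-memory zip archive into one dataframe."""
    frames = []
    with ZipFile(BytesIO(zip_bytes)) as archive:
        for name in archive.namelist():
            if name.endswith(".csv"):
                # archive.open() returns a file-like object that pandas can read
                with archive.open(name) as member:
                    frames.append(pd.read_csv(member))
    return pd.concat(frames, ignore_index=True)


# Build a tiny two-file archive in memory (made-up rows, not real trip data)
buffer = BytesIO()
with ZipFile(buffer, "w") as archive:
    archive.writestr("part1.csv", "ride_id,member_casual\nA1,member\n")
    archive.writestr("part2.csv", "ride_id,member_casual\nB2,casual\n")

demo = read_csvs_from_zip_bytes(buffer.getvalue())
print(demo)
```

Opening the archive explicitly also makes it possible to inspect the member names before deciding which ones to parse.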

Critically, each month of data is hundreds of MB, if not well over a GB. Pulling just a year of data will start burning through memory rather quickly. I want this to work out of the box on most computers, so we'll put particular emphasis on reducing memory consumption in this section. 

Below this markdown cell there's a global setting that skips the data retrieval. If data retrieval has been run before, the code will skip straight to reading the files from disk.
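
One way the skip-to-disk behaviour could be supported is sketched below. The helper `find_existing_month_files` is hypothetical (it is not defined anywhere in this notebook); it only assumes the `{month}_data.parquet` naming used when the monthly files are written.

```python
import os
import tempfile


def find_existing_month_files(months, data_directory):
    """Split months into (paths already on disk, months still to download)."""
    existing_paths, missing_months = [], []
    for month in months:
        path = os.path.join(data_directory, f"{month}_data.parquet")
        if os.path.exists(path):
            existing_paths.append(path)
        else:
            missing_months.append(month)
    return existing_paths, missing_months


# Demo against a throwaway directory holding one empty placeholder file
with tempfile.TemporaryDirectory() as tmp:
    open(os.path.join(tmp, "202404_data.parquet"), "w").close()
    found, missing = find_existing_month_files(["202404", "202405"], tmp)
    found_names = [os.path.basename(p) for p in found]

print(found_names, missing)
```

With something like this, the list of paths to read can be built from disk even when the download step is skipped.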

Firstly, we'll write a function that retrieves a single file.

In [2]:
FETCH_RAW_DATA = True

### Data Download 
This section holds the functions that download each month of data from S3, reduce its memory footprint, and write it to disk. 

In [5]:
def download_tripdata_file(yyyymm:str, sample_only=False):
    """
    Downloads a single CSV file from https://s3.amazonaws.com/tripdata.
    
    Args:
        yyyymm: Year and month for the target file.
        sample_only: If set to True, this will return only five rows of data, the aim being to retrieve the file structure
          while not committing the data in its entirety to a dataframe object. Note that the full raw file itself will be
          downloaded within the scope of this function, but discarded on function exit. 
        
    Returns:
        Pandas dataframe containing Citi Bike trip data, or None if the download fails.
    """
    # URL of the file
    csvzip_url = f'https://s3.amazonaws.com/tripdata/{yyyymm}-citibike-tripdata.csv.zip'
    zip_url = f'https://s3.amazonaws.com/tripdata/{yyyymm}-citibike-tripdata.zip'

    # Send a GET request to the URL
    # If .csv.zip doesn't work, remove the .csv and try again
    response = requests.get(csvzip_url)
    if not response.ok:
        response = requests.get(zip_url)

    # Proceed only if one of the requests succeeded
    if response.ok:
        # Read the content of the response as a zip file
        with BytesIO(response.content) as f:
            with ZipFile(f) as zipfile:
                # Extract the names of files in the zip file
                csv_files = [name for name in zipfile.namelist() if name.endswith('.csv')]
                if csv_files:
                    # Read every CSV file in the archive, or just five rows of the first one when sampling
                    if sample_only is False:
                        csv_datasets = []
                        for csv_file in csv_files:
                            df = pd.read_csv(
                                zipfile.open(csv_file),
                                low_memory=False
                            )
                            csv_datasets.append(df)
                        df = pd.concat(csv_datasets)
                        
                    elif sample_only is True:
                        df = pd.read_csv(
                            zipfile.open(csv_files[0]),
                            low_memory=False,
                            nrows=5
                        )
                        
                    else:
                        raise ValueError('sample_only must be a boolean value')

                else:
                    # Return None explicitly; otherwise `df` would be unbound at the return below
                    print("No CSV files found in the zip archive.")
                    return None
                    
                return df
            
    else:
        print("Failed to retrieve the file. Status code:", response.status_code)

In [6]:
# Get sample from the month before last so we can see it
month_before_last = (datetime.now() - timedelta(days=60)).strftime('%Y%m')
data_sample = download_tripdata_file(month_before_last, True)
data_sample

Unnamed: 0,ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
0,C943DA3BBC04DA57,classic_bike,2024-06-19 19:24:11.778,2024-06-19 19:35:37.667,E 81 St & York Ave,7084.12,E 84 St & Park Ave,7243.04,40.772838,-73.949892,40.778627,-73.957721,member
1,35F6F3D1D27DEF6D,electric_bike,2024-06-20 17:01:54.322,2024-06-20 17:14:34.198,E 81 St & York Ave,7084.12,Central Park West & W 72 St,7141.07,40.77294,-73.949694,40.775794,-73.976206,member
2,9B8071426046B632,classic_bike,2024-06-25 15:39:31.800,2024-06-25 15:44:36.811,Broadway & W 58 St,6948.1,Central Park West & W 72 St,7141.07,40.766953,-73.981693,40.775794,-73.976206,member
3,995E66CC32088A47,electric_bike,2024-06-26 15:29:28.377,2024-06-26 15:34:19.452,Broadway & W 58 St,6948.1,Central Park West & W 72 St,7141.07,40.766713,-73.9819,40.775794,-73.976206,member
4,1D16A382B788B03E,electric_bike,2024-06-21 19:42:04.451,2024-06-21 19:51:11.660,Banker St & Meserole Ave,5633.04,Metropolitan Ave & Bedford Ave,5308.04,40.726267,-73.956254,40.715348,-73.960241,casual


In [7]:
data_sample.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5 entries, 0 to 4
Data columns (total 13 columns):
 #   Column              Non-Null Count  Dtype  
---  ------              --------------  -----  
 0   ride_id             5 non-null      object 
 1   rideable_type       5 non-null      object 
 2   started_at          5 non-null      object 
 3   ended_at            5 non-null      object 
 4   start_station_name  5 non-null      object 
 5   start_station_id    5 non-null      float64
 6   end_station_name    5 non-null      object 
 7   end_station_id      5 non-null      float64
 8   start_lat           5 non-null      float64
 9   start_lng           5 non-null      float64
 10  end_lat             5 non-null      float64
 11  end_lng             5 non-null      float64
 12  member_casual       5 non-null      object 
dtypes: float64(6), object(7)
memory usage: 648.0+ bytes


From the dataframe sample and dataframe info above, we see the layout of our file and the data types we're dealing with. 

So, memory consumption. Data types are going to be important here because, apart from dropping columns outright, they're the primary way we'll be able to get the data size down. 
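
To get a feel for how much the data type matters, here is a small sketch comparing an object column of repeated labels against the same information stored as int8 codes. The series is synthetic, and `pd.factorize` is used only for the demonstration; it is not what the notebook's own functions use.

```python
import pandas as pd

# 100,000 synthetic labels stored as Python strings (object dtype)
labels = pd.Series(["member", "casual"] * 50_000, dtype="object", name="member_casual")

# factorize returns integer codes plus the unique labels they refer to
codes, uniques = pd.factorize(labels)
as_int8 = pd.Series(codes, dtype="int8")

object_bytes = labels.memory_usage(index=False, deep=True)
int8_bytes = as_int8.memory_usage(index=False, deep=True)
print(f"object: {object_bytes} bytes, int8: {int8_bytes} bytes")
```

The int8 column costs one byte per row, while the object column pays for a pointer plus a full Python string per row.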

What can we drop? There doesn't appear to be superfluous data; however, the station names, IDs, and locations are duplicated across the data. When we retrieve each file, we can create a dataframe of uniquely identified stations, then leave only the IDs in the data, removing 2 object columns and 4 float64 columns in the process. We'll then have ride data and a separate dataset of stations that we can merge in ad hoc.   

We can do similar with rideable type and member_casual, identifying unique rideables and unique membership types.
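
The idea of splitting repeated attributes into a lookup table and merging them back on demand can be sketched on toy data as follows. The three rides are invented for the example (the station names and IDs are borrowed from the sample above), and only the start station is handled to keep it short.

```python
import pandas as pd

rides = pd.DataFrame({
    "ride_id": ["r1", "r2", "r3"],
    "start_station_id": [7084.12, 6948.10, 7084.12],
    "start_station_name": ["E 81 St & York Ave", "Broadway & W 58 St", "E 81 St & York Ave"],
})

# Lookup table: one row per station
stations = (
    rides[["start_station_id", "start_station_name"]]
    .drop_duplicates("start_station_id")
    .rename(columns={"start_station_id": "station_id", "start_station_name": "station_name"})
    .reset_index(drop=True)
)

# Ride table keeps only the ID; the name can be recovered with a merge
slim_rides = rides.drop(columns=["start_station_name"])
restored = slim_rides.merge(
    stations, left_on="start_station_id", right_on="station_id", how="left"
)
print(restored)
```

The ride table shrinks because each station name is stored once, and a left merge brings the names back whenever an analysis needs them.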

Furthermore, we've got two columns with datetime strings. We can calculate the ride time in seconds using the difference of the two datetimes, then remove the end time, replacing a memory-hogging object dtype column with an integer column. 

That's a few different things to do, so we'll break it into functions. 

In [8]:
def identify_unique_stations(df):
    """
    Identifies unique Citi Bike stations using start stations and end stations. Memory conserved by casting lat longs as
      float32 and the generated station IDs as int16.
    
    Args:
        df: Dataframe of ride data as pulled from S3. 
        
    Returns:
        Dataframe containing all uniquely identified stations IDs, names, and locations. 
    """    
    # Get unique start stations
    start_station_cols = ['start_station_id', 'start_station_name', 'start_lat', 'start_lng']
    unique_start_stations = df.loc[:, start_station_cols].drop_duplicates()
    unique_start_stations.columns = ['station_id', 'station_name', 'lat', 'lng']
    
    # Get unique end stations
    end_station_cols = ['end_station_id', 'end_station_name', 'end_lat', 'end_lng']
    unique_end_stations = df.loc[:, end_station_cols].drop_duplicates()
    unique_end_stations.columns = ['station_id', 'station_name', 'lat', 'lng']
    
    # Concatenate unique start and end stations then drop duplicates and reset index
    unique_stations = pd.concat(
        [unique_start_stations, unique_end_stations]
    ).drop_duplicates('station_id').reset_index(drop=True)
    
    # Now we'll reset the index again, keeping it as a column, to create unique integer IDs for each station
    unique_stations = unique_stations.reset_index(names=['int_station_id'])   
    
    # ***Memory usage***
    # Cast lats and lngs as float32; float16 keeps only about 3 significant decimal digits, too few for coordinates
    for col in ['lat', 'lng']:
        unique_stations[col] = unique_stations[col].astype('float32')
    # We have to use int16, not int8 for station_id as there are over 127 Citi Bike stations
    unique_stations['int_station_id'] = unique_stations['int_station_id'].astype('int16')
    
    return unique_stations

To demo what this looks like, here's the unique station dataframe from the data sample.

In [9]:
identify_unique_stations(data_sample)

Unnamed: 0,int_station_id,station_id,station_name,lat,lng
0,0,7084.12,E 81 St & York Ave,40.772839,-73.94989
1,1,6948.1,Broadway & W 58 St,40.766953,-73.981697
2,2,5633.04,Banker St & Meserole Ave,40.726269,-73.956253
3,3,7243.04,E 84 St & Park Ave,40.778625,-73.957718
4,4,7141.07,Central Park West & W 72 St,40.775795,-73.976204
5,5,5308.04,Metropolitan Ave & Bedford Ave,40.715347,-73.960243


Now onto unique rideable types. 

In [10]:
def identify_unique_rideables(rideables):
    """
    Identifies unique rideable types, e.g. classic_bike. 
    
    Args:
        rideables: Series containing the rideable_type data.
        
    Returns:
        Dataframe containing unique rideables with a newly generated identifier
    """    
    # Get unique rideables
    unique_rideables = pd.DataFrame(rideables.drop_duplicates(keep='first'))
    
    # Now we'll use reset index to add a unique identifier
    # Reset twice as we dropped a bunch of dupes so the index is currently not sequential
    # Resetting with drop then resetting without creates a sequential set of IDs
    unique_rideables = unique_rideables.reset_index(drop=True).reset_index(names=['rideable_id'])
    
    # ***Memory usage***
    # Cast rideable_id as int8, as there are only a handful of rideable types
    unique_rideables['rideable_id'] = unique_rideables['rideable_id'].astype('int8')
    
    return unique_rideables

In [11]:
identify_unique_rideables(data_sample['rideable_type'])

Unnamed: 0,rideable_id,rideable_type
0,0,classic_bike
1,1,electric_bike


We can copy much the same code to do memberships as well.

In [12]:
def identify_unique_memberships(memberships):
    """
    Identifies unique membership types, e.g. member vs casual. 
    
    Args:
        memberships: Series containing the member_casual data.
        
    Returns:
        Dataframe containing unique memberships with a newly generated identifier
    """    
    # Get unique memberships
    unique_memberships = pd.DataFrame(memberships.drop_duplicates(keep='first'))
    
    # Now we'll use reset index to add a unique identifier
    # Reset twice as we dropped a bunch of dupes so the index is currently not sequential
    # Resetting with drop then resetting without creates a sequential set of IDs
    unique_memberships = unique_memberships.reset_index(drop=True).reset_index(names=['membership_id'])
    
    # Rename member_casual
    unique_memberships = unique_memberships.rename(columns={'member_casual': 'membership_type'})
    
    # ***Memory usage***
    # Cast membership_id as int8, as there are only a couple of membership types
    unique_memberships['membership_id'] = unique_memberships['membership_id'].astype('int8')
    
    return unique_memberships

In [13]:
identify_unique_memberships(data_sample['member_casual'])

Unnamed: 0,membership_id,membership_type
0,0,member
1,1,casual
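
One thing worth checking with this approach: the IDs are assigned in order of first appearance within each file, so the same label can end up with a different ID from one month to the next. The five-row sample gives `classic_bike` ID 0, while the three-month pull output later in this notebook gives `electric_bike` ID 0. A possible alternative, sketched here and not what the notebook does, is to encode against a fixed label list. `KNOWN_RIDEABLES` and `encode_rideables` are hypothetical names, and the list contents are an assumption based on the labels seen in the sample.

```python
import pandas as pd

# Assumed, fixed label order; extend it if other rideable types appear in the data
KNOWN_RIDEABLES = ["classic_bike", "electric_bike"]


def encode_rideables(rideables: pd.Series) -> pd.Series:
    """Map rideable labels to int8 codes that are identical for every month."""
    categorical = pd.Categorical(rideables, categories=KNOWN_RIDEABLES)
    # Labels missing from KNOWN_RIDEABLES are encoded as -1
    return pd.Series(categorical.codes, index=rideables.index, dtype="int8", name="rideable_id")


# Two made-up "months" whose labels first appear in a different order
april = pd.Series(["classic_bike", "electric_bike", "classic_bike"])
may = pd.Series(["electric_bike", "classic_bike"])

april_codes = encode_rideables(april).tolist()
may_codes = encode_rideables(may).tolist()
print(april_codes, may_codes)
```

Because the codes come from the fixed list rather than from row order, `classic_bike` maps to 0 in both months.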


Now we'll address creating the ride time column from the started_at and ended_at columns. 

In [14]:
def create_trip_duration_col(df):
    """
    Calculates a series of trip durations in seconds, then downcasts data before returning. Memory conserved by casting 
      trip duration as int16. 
    
    Args:
        df: started_at and ended_at columns from dataframe. 
        
    Returns:
        Pandas series of trip durations.
    """
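    # Use total_seconds(): .dt.seconds returns only the seconds component and ignores whole days
    trip_duration = (pd.to_datetime(df['ended_at']) - pd.to_datetime(df['started_at'])).dt.total_seconds()
    
    # ***Memory Usage***
    # We choose int16 as it holds values up to 32767, in practice roughly 9 hours of ride time
    # This is a reasonable ride time (accuracy) cutoff, as only up to 45 minutes are included in the "base price"
    # Clip first so longer (or negative) durations are capped rather than wrapping around on the cast
    trip_duration = trip_duration.clip(lower=0, upper=32767).astype('int16')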
    
    return trip_duration
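
Two details of the duration calculation are easy to get wrong, so here is a small check on made-up timestamps: `.dt.seconds` returns only the seconds component of a timedelta (whole days are dropped), whereas `.dt.total_seconds()` returns the full duration, and any value above 32767 needs clipping before it can be stored safely as int16.

```python
import pandas as pd

trips = pd.DataFrame({
    "started_at": ["2024-06-19 08:00:00", "2024-06-19 08:00:00"],
    "ended_at": ["2024-06-19 08:10:00", "2024-06-20 09:00:00"],  # 10 minutes, 25 hours
})
delta = pd.to_datetime(trips["ended_at"]) - pd.to_datetime(trips["started_at"])

# Seconds component only: the 25-hour trip looks like a 1-hour trip
seconds_component = delta.dt.seconds.tolist()

# Full duration in seconds
total_seconds = delta.dt.total_seconds().tolist()

# Cap at the int16 maximum before casting so long trips don't overflow
clipped = delta.dt.total_seconds().clip(lower=0, upper=32767).astype("int16").tolist()

print(seconds_component, total_seconds, clipped)
```

The clipped value for the long trip is no longer exact, but it is unambiguous: anything stored as 32767 means "at least about nine hours".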

We've completed our data formatting and ID generation processes, so now we need to actually build out the dataset. We'll be pulling multiple months of data, applying the formatting functions to each month as it's pulled. 

Below are some helper functions to reduce redundancy and make our final data build functions cleaner.

In [15]:
def merge_on_columns(df, ref_df, left_on, right_on, new_col_name):
    """
    Helper function to merge df with ref_df on specified columns and rename the new column.

    Args:
        df: The main DataFrame.
        ref_df: The reference DataFrame to merge.
        left_on: The column name in the main DataFrame to merge on.
        right_on: The column name in the reference DataFrame to merge on.
        new_col_name: The new column name after merging.

    Returns:
        Updated DataFrame after merge and column renaming.
    """
    return (df.merge(ref_df[['station_id', 'int_station_id']], left_on=left_on, right_on=right_on, how='left')
              .drop([right_on, left_on], axis=1)
              .rename(columns={'int_station_id': new_col_name}))

In [16]:
def concatenate_dataframes(dfs, drop_col=None):
    """
    Concatenates a list of dataframes, drops duplicates, and resets the index.

    Args:
        dfs: List of dataframes to concatenate.
        drop_col: Specific column to drop duplicates by, if needed.

    Returns:
        Concatenated dataframe.
    """
    if drop_col is None:
        return pd.concat(dfs).drop_duplicates().reset_index(drop=True)
    else:
        return pd.concat(dfs).drop_duplicates(drop_col).reset_index(drop=True)

In [17]:
def calculate_and_report_memory_usage(initial_memory_values, dataframes):
    """
    Calculates and reports the memory usage of the data processing.

    Args:
        initial_memory_values: List of memory usage values for each month's raw data.
        dataframes: List of dataframes to include in the final memory usage calculation.

    Returns:
        None
    """
    # Calculate initial and ending memory usage in MB (bytes / 1024 / 1024)
    initial_memory_usage = sum(initial_memory_values) / 1024 / 1024
    total_ending_memory_usage = sum(df.memory_usage(index=True, deep=True).sum() for df in dataframes) / 1024 / 1024
    memory_saved = initial_memory_usage - total_ending_memory_usage

    # Print memory usage information
    print(f'Initial memory usage: {round(initial_memory_usage, 2)} MB')
    print(f'Ending memory usage: {round(total_ending_memory_usage, 2)} MB')
    print(f'Memory saved: {round(memory_saved, 2)} MB')

Now we'll create a function that downloads a month of data and applies all the formatting work to that month. 

In [18]:
def create_formatted_month_rides(yyyymm):
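    """
    Downloads a trip data file and formats it: station, rideable, and membership values are replaced with
      compact integer IDs, a trip duration column is added, and the now-redundant columns are dropped.
      
    Args:
        yyyymm: Year and month of target file.
        
    Returns:
        A tuple containing the formatted dataframe of ride data, the unique stations, rideables, and
          memberships dataframes for the month, and the memory usage of the raw data in bytes.
    """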
    # Pull raw data
    month_data = download_tripdata_file(yyyymm)
    initial_memory_usage = month_data.memory_usage(index=True, deep=True).sum()

    # Get unique stations, rideables, and memberships
    unique_month_stations = identify_unique_stations(month_data)
    unique_rideables = identify_unique_rideables(month_data['rideable_type'])
    unique_memberships = identify_unique_memberships(month_data['member_casual'])

    # Merge and format data
    month_data = merge_on_columns(month_data, unique_month_stations, 'start_station_id', 'station_id', 'start_station_id')
    month_data = merge_on_columns(month_data, unique_month_stations, 'end_station_id', 'station_id', 'end_station_id')
    month_data = month_data.merge(unique_rideables, on='rideable_type', how='left')
    month_data = month_data.merge(unique_memberships, left_on='member_casual', right_on='membership_type', how='left')

    # Add trip duration
    month_data['trip_duration'] = create_trip_duration_col(month_data[['started_at', 'ended_at']])

    # Drop unnecessary columns
    drop_cols = [
        'ended_at', 
        'rideable_type', 
        'member_casual',
        'membership_type',
        'start_lat', 
        'end_lat', 
        'start_lng', 
        'end_lng', 
        'start_station_name', 
        'end_station_name'
    ]
    month_data = month_data.drop(drop_cols, axis=1)
    
    return month_data, unique_month_stations, unique_rideables, unique_memberships, initial_memory_usage

Finally, we use multiple iterations of create_formatted_month_rides to build a multi-month dataset with dramatically reduced memory consumption vs the raw files. 

In [19]:
def process_month_data(month, data_directory):
    """
    Process and save data for a single month to a parquet file.

    Args:
        month: The month to process in yyyymm format.
        data_directory: The directory where the parquet file will be saved.

    Returns:
        A tuple containing:
            - The path to the saved parquet file.
            - Unique stations DataFrame.
            - Unique rideables DataFrame.
            - Unique memberships DataFrame.
            - Memory usage of the processed month data.
    """
    month_data, unique_stations, unique_rideables, unique_memberships, memory_usage = create_formatted_month_rides(month)
    file_path = f'{data_directory}/{month}_data.parquet'
    month_data.to_parquet(file_path)

    return file_path, unique_stations, unique_rideables, unique_memberships, memory_usage

In [20]:
def get_all_data(months_to_pull):
    """
    Pulls requested trip data for each month in parallel, writes it to 
    parquet files in a sibling 'data' directory, and disposes of it from 
    memory. Memory conserved by replacing Citi Bike station IDs with 
    compact integer IDs and by not storing month data in memory. Unique 
    data for stations, rideables, and memberships are still aggregated 
    in memory. The unique data tables function as dimension tables and 
    are written to the data directory as such. 

    Args:
        months_to_pull: List of yyyymm format months to create data with.

    Returns: 
        A tuple containing:
            - A list of file paths to the parquet files containing the ride data.
            - DataFrames of all unique stations, rideables, and memberships.
    """
    data_directory = '../data'  # Adjust this path according to your directory structure
    if not os.path.exists(data_directory):
        os.makedirs(data_directory)

    with ThreadPoolExecutor(max_workers=len(months_to_pull)) as executor:
        results = executor.map(process_month_data, months_to_pull, [data_directory] * len(months_to_pull))

    # Unpack results
    data_file_paths, all_unique_stations, all_unique_rideables, all_unique_memberships, month_file_memory_values = zip(*results)

    # Concatenate and process unique dataframes
    all_unique_stations = concatenate_dataframes(all_unique_stations, 'station_name')
    all_unique_rideables = concatenate_dataframes(all_unique_rideables, 'rideable_type')
    all_unique_memberships = concatenate_dataframes(all_unique_memberships, 'membership_type')

    # Write unique tables to data directory for use as dimension tables
    for df, filename in zip([all_unique_stations, all_unique_rideables, all_unique_memberships], ['stations', 'rideable_types', 'membership_types']):
        filepath_no_type = os.path.join(data_directory, filename)
        df.to_parquet(f'{filepath_no_type}.parquet')

    # Calculate and report memory usage
    calculate_and_report_memory_usage(
        month_file_memory_values, 
        [all_unique_stations, all_unique_rideables, all_unique_memberships]
    )

    return data_file_paths, all_unique_stations, all_unique_rideables, all_unique_memberships
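
A note on the thread pool: with one worker per month, every month's raw data can be in memory at the same time, which works against the memory goal when many months are requested. A minimal sketch of capping the worker count follows. The cap of 2 is an arbitrary assumption, and `fake_process_month` is a stand-in for the real per-month work so the example runs without downloading anything.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def fake_process_month(month, data_directory):
    """Stand-in for the real per-month work; just builds the output path."""
    return f"{data_directory}/{month}_data.parquet"


months = ["202404", "202405", "202406"]

# At most 2 months in flight at once (arbitrary cap), and never fewer than 1 worker
max_workers = max(1, min(2, len(months)))

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    # executor.map returns results in the same order as the inputs
    worker = partial(fake_process_month, data_directory="../data")
    paths = list(executor.map(worker, months))

print(paths)
```

A lower cap trades some download speed for a smaller peak memory footprint, and the `max(1, ...)` guard avoids the error `ThreadPoolExecutor` raises when given zero workers.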


## Data Pull

In [21]:
# This variable is at the top of the notebook also, but including here for convenience
FETCH_RAW_DATA = True

# SET MONTHS TO PULL
months_to_pull = ['202404', '202405', '202406']

In [22]:
if FETCH_RAW_DATA is True:
    ride_data_paths, unique_stations, unique_rideables, unique_memberships = get_all_data(
        months_to_pull
    )
else:
    pass

UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe4 in position 99: invalid continuation byte
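
The traceback alone doesn't say which archive member failed to decode. One possible cause, offered as an assumption and not a confirmed diagnosis, is that archives created on macOS can include `__MACOSX/._*.csv` metadata entries: their names end in `.csv`, so the download function's name filter would pick them up, but their contents are binary. The sketch below shows a stricter filter; `real_csv_members` is a hypothetical helper and the archive is built in memory for the demo.

```python
from io import BytesIO
from zipfile import ZipFile


def real_csv_members(names):
    """Keep .csv entries, skipping macOS metadata entries (an assumed cause)."""
    return [
        name for name in names
        if name.endswith(".csv")
        and not name.startswith("__MACOSX/")
        and not name.rsplit("/", 1)[-1].startswith("._")
    ]


# In-memory archive with one real CSV and one binary metadata entry
buffer = BytesIO()
with ZipFile(buffer, "w") as archive:
    archive.writestr("202404-citibike-tripdata.csv", "ride_id\nA1\n")
    archive.writestr("__MACOSX/._202404-citibike-tripdata.csv", b"\x00\x05\xe4\x07")

with ZipFile(buffer) as archive:
    kept = real_csv_members(archive.namelist())

print(kept)
```

If the filter doesn't resolve the error, printing `zipfile.namelist()` for the failing month is a quick way to see what else is in the archive.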

Let's also output our unique lookup tables for reference. 

In [None]:
unique_stations.head(3) # sample as this has a few hundred stations in it

Unnamed: 0,int_station_id,station_id,station_name,lat,lng
0,0,6230.04,FDR Drive & E 35 St,40.743954,-73.97139
1,1,5382.07,Forsyth St & Grand St,40.717739,-73.993385
2,2,5971.08,E 20 St & 2 Ave,40.73579,-73.981689


In [None]:
unique_rideables.head(3)

Unnamed: 0,rideable_id,rideable_type
0,0,electric_bike
1,1,classic_bike


In [None]:
unique_memberships.head(3)

Unnamed: 0,membership_id,membership_type
0,0,member
1,1,casual


### Data Read

In [None]:
# Read every month's parquet file, then concatenate once rather than growing a dataframe in a loop
rides = pd.concat([pd.read_parquet(path) for path in ride_data_paths])

In [None]:
rides.info()

<class 'pandas.core.frame.DataFrame'>
Index: 5000639 entries, 0 to 783575
Data columns (total 7 columns):
 #   Column            Dtype 
---  ------            ----- 
 0   ride_id           object
 1   started_at        object
 2   start_station_id  int16 
 3   end_station_id    int16 
 4   rideable_id       int8  
 5   membership_id     int8  
 6   trip_duration     int16 
dtypes: int16(3), int8(2), object(2)
memory usage: 152.6+ MB
