Import statements

In [4]:
# Import necessary libraries
from pathlib import Path
import requests
import pandas as pd
import zipfile
import io

In [10]:
def fetch_raw_data(year: int, month: int) -> str:
    """
    Fetches raw Citi Bike trip data for a given year and month from the S3 bucket,
    extracts the ZIP file, combines CSVs if multiple, and saves as a parquet file.

    Args:
        year (int): The year to fetch data for (e.g., 2024).
        month (int): The month to fetch data for (e.g., 1 for January).

    Returns:
        str: Path to the saved parquet file.

    Raises:
        Exception: If the URL is not available or data cannot be processed.
    """
    # Construct the URL for the Citi Bike data (monthly files from 2024 onward)
    url = f"https://s3.amazonaws.com/tripdata/{year}{month:02}-citibike-tripdata.csv.zip"
    
    # Send a request to download the ZIP file
    response = requests.get(url)
    if response.status_code != 200:
        raise Exception(f"{url} is not available")

    # Create the directory for raw data if it doesn't exist
    raw_data_dir = Path("..") / "data" / "raw"
    raw_data_dir.mkdir(parents=True, exist_ok=True)

    # Path to save the combined data as parquet
    output_path = raw_data_dir / f"rides_{year}_{month:02}.parquet"

    try:
        # Expected columns based on Citi Bike schema
        expected_columns = [
            'ride_id', 'rideable_type', 'started_at', 'ended_at',
            'start_station_name', 'start_station_id', 'end_station_name', 'end_station_id',
            'start_lat', 'start_lng', 'end_lat', 'end_lng', 'member_casual'
        ]

        # Extract the ZIP file content
        with zipfile.ZipFile(io.BytesIO(response.content)) as z:
            # List all CSV files in the ZIP
            csv_files = [f for f in z.namelist() if f.endswith('.csv')]
            if not csv_files:
                raise Exception(f"No CSV files found in {url}")

            # Read and combine all CSVs into a single DataFrame
            dfs = []
            for csv_file in csv_files:
                with z.open(csv_file) as f:
                    # Use 'latin1' encoding to handle special characters; skip bad lines if necessary
                    df = pd.read_csv(f, encoding='latin1', on_bad_lines='skip')
                    # Keep only the expected columns, drop any extras like 'Unnamed: 0', 'Unnamed: 1'
                    df = df[[col for col in expected_columns if col in df.columns]]
                    dfs.append(df)

            # Combine all CSVs into one DataFrame
            combined_df = pd.concat(dfs, ignore_index=True)

        # Inspect the DataFrame for debugging
        print("DataFrame info before cleaning:")
        print(combined_df.info())
        print("\nUnique values in start_station_id:")
        print(combined_df['start_station_id'].unique()[:10])  # Show first 10 unique values

        # Clean the data: Convert start_station_id and end_station_id to strings, handle NaN
        combined_df['start_station_id'] = combined_df['start_station_id'].astype(str).replace('nan', '')
        combined_df['end_station_id'] = combined_df['end_station_id'].astype(str).replace('nan', '')

        # Convert started_at and ended_at to datetime
        combined_df['started_at'] = pd.to_datetime(combined_df['started_at'], errors='coerce')
        combined_df['ended_at'] = pd.to_datetime(combined_df['ended_at'], errors='coerce')

        # Ensure other columns are of appropriate types
        combined_df['start_lat'] = combined_df['start_lat'].astype(float, errors='ignore')
        combined_df['start_lng'] = combined_df['start_lng'].astype(float, errors='ignore')
        combined_df['end_lat'] = combined_df['end_lat'].astype(float, errors='ignore')
        combined_df['end_lng'] = combined_df['end_lng'].astype(float, errors='ignore')
        combined_df['member_casual'] = combined_df['member_casual'].astype(str)

        # Inspect the DataFrame after cleaning
        print("\nDataFrame info after cleaning:")
        print(combined_df.info())

        # Save the combined DataFrame as a parquet file
        combined_df.to_parquet(output_path, engine="pyarrow", index=False)
        print(f"Successfully fetched and saved: {str(output_path)}")
        return str(output_path)

    except Exception as e:
        raise Exception(f"Error processing data from {url}: {str(e)}")


Function Test

In [11]:
# Test the function for January 2024
fetch_raw_data(2024, 1)

  df = pd.read_csv(f, encoding='latin1', on_bad_lines='skip')


DataFrame info before cleaning:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1888085 entries, 0 to 1888084
Data columns (total 13 columns):
 #   Column              Dtype  
---  ------              -----  
 0   ride_id             object 
 1   rideable_type       object 
 2   started_at          object 
 3   ended_at            object 
 4   start_station_name  object 
 5   start_station_id    object 
 6   end_station_name    object 
 7   end_station_id      object 
 8   start_lat           float64
 9   start_lng           float64
 10  end_lat             float64
 11  end_lng             float64
 12  member_casual       object 
dtypes: float64(4), object(9)
memory usage: 187.3+ MB
None

Unique values in start_station_id:
['7954.12' '6771.13' '5659.11' '7443.01' '4339.01' '4422.05' '7650.05'
 '6490.02' '5440.05' '7918.12']

DataFrame info after cleaning:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1888085 entries, 0 to 1888084
Data columns (total 13 columns):
 #   Column      

'..\\data\\raw\\rides_2024_01.parquet'

In [12]:
df = pd.read_parquet("../data/raw/rides_2024_01.parquet")
df.head()

Unnamed: 0,ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
0,5078F3D302000BD2,electric_bike,2024-01-22 18:43:19.012,2024-01-22 18:48:10.708,Frederick Douglass Blvd & W 145 St,7954.12,St Nicholas Ave & W 126 St,7756.1,40.823072,-73.941738,40.811432,-73.951878,member
1,814337105D37302A,electric_bike,2024-01-11 19:19:18.721,2024-01-11 19:47:36.007,W 54 St & 6 Ave,6771.13,E 74 St & 1 Ave,6953.08,40.761822,-73.977036,40.768974,-73.954823,member
2,A33A920E2B10710C,electric_bike,2024-01-30 19:17:41.693,2024-01-30 19:32:49.857,E 11 St & Ave B,5659.11,W 10 St & Washington St,5847.06,40.727592,-73.979751,40.733424,-74.008515,casual
3,A3A5FC0DD7D34D74,electric_bike,2024-01-27 11:27:01.759,2024-01-27 11:38:01.213,W 54 St & 6 Ave,6771.13,E 74 St & 1 Ave,6953.08,40.761779,-73.977144,40.768974,-73.954823,member
4,6F96728ECEFBDAA4,electric_bike,2024-01-16 15:15:41.000,2024-01-16 15:29:26.156,Madison Ave & E 99 St,7443.01,E 74 St & 1 Ave,6953.08,40.789808,-73.952214,40.768974,-73.954823,member
