# Citi Bike Trip Data Pipeline 🚲
End-to-end data pipeline for:
- Downloading 12 months of Citi Bike data
- Extracting the CSVs and filtering to the 3 busiest start stations
- Cleaning and engineering features
- Uploading to Hopsworks Feature Store


In [1]:
# Core Libraries
import os
import requests
import zipfile
import pandas as pd
import shutil
from datetime import datetime
from dateutil.relativedelta import relativedelta
import pytz
from io import BytesIO
from collections import Counter
from dotenv import load_dotenv

# Hopsworks
import hopsworks



In [2]:
# Configurable Constants
TEMP_FOLDER = "tripdata_temp"             # scratch dir for extracted CSVs (removed after the run)
OUTPUT_FILE = "top3_stations_output.csv"  # filtered rows for the top-3 start stations
CHUNK_SIZE = 500_000                      # rows per pandas read_csv chunk

# Columns kept from each raw trip CSV, in output order.
TARGET_COLS = [
    # Trip identity and bike type
    "ride_id",
    "rideable_type",
    # Trip timestamps
    "started_at",
    "ended_at",
    # Start / end stations
    "start_station_name",
    "start_station_id",
    "end_station_name",
    "end_station_id",
    # Coordinates
    "start_lat",
    "start_lng",
    "end_lat",
    "end_lng",
    # Rider category
    "member_casual",
]


## ⏳ Get the Last 12 Complete Months (US/Eastern Time, current month excluded)


In [3]:
def get_last_12_months_est(n_months=12, now=None):
    """Return the ``n_months`` calendar months preceding the current month.

    The current (in-progress) month is excluded. Months are formatted as
    ``YYYYMM`` strings, most recent first, which is the prefix used in the
    Citi Bike archive filenames built by ``download_zip_to_memory``.

    Args:
        n_months: How many months to return (default 12).
        now: Reference datetime. Defaults to the current time in US/Eastern;
            pass an explicit value for reproducible runs or tests.

    Returns:
        List of ``YYYYMM`` strings, newest first.
    """
    if now is None:
        now = datetime.now(pytz.timezone("US/Eastern"))

    # Count months on a single integer axis so stepping back across a year
    # boundary is plain arithmetic (only year/month matter, never the day).
    current_index = now.year * 12 + (now.month - 1)
    result = []
    for offset in range(1, n_months + 1):
        year, month_zero_based = divmod(current_index - offset, 12)
        result.append(f"{year:04d}{month_zero_based + 1:02d}")
    return result

# Months to process as YYYYMM strings (newest first); shown as the cell output.
months = list(get_last_12_months_est())
months


['202504',
 '202503',
 '202502',
 '202501',
 '202412',
 '202411',
 '202410',
 '202409',
 '202408',
 '202407',
 '202406',
 '202405']

## 🌐 Download Citi Bike ZIPs
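Tries both the `.zip` and `.csv.zip` filename patterns, because the archive name varies by month (for example, March 2025 is only available as `.csv.zip`).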


In [4]:
def download_zip_to_memory(ym, base_url="https://s3.amazonaws.com/tripdata/", timeout=20):
    """Download one month's Citi Bike archive fully into memory.

    Two filename patterns are tried in order (``.zip`` then ``.csv.zip``);
    the first HTTP 200 response wins.

    Args:
        ym: Month as a ``YYYYMM`` string.
        base_url: Prefix the candidate filenames are appended to.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Returns:
        A ``BytesIO`` holding the archive bytes, or ``None`` if no candidate
        could be downloaded.
    """
    candidates = (f"{ym}-citibike-tripdata.zip", f"{ym}-citibike-tripdata.csv.zip")
    for fname in candidates:
        url = base_url + fname
        print(f"🌐 Trying: {url}")
        try:
            response = requests.get(url, timeout=timeout)
        except requests.RequestException as e:
            # Network-level failure (DNS, timeout, connection reset): try the next name.
            print(f"⚠️ Error downloading {url}: {e}")
            continue
        if response.status_code == 200:
            print(f"✅ Downloaded: {fname}")
            return BytesIO(response.content)
        # Report the real status: a missing S3 key may surface as 403 rather
        # than 404, and 5xx errors are not "not found" at all.
        print(f"❌ Not found: {url} (HTTP {response.status_code})")
    return None


## 🗂️ Extract CSVs (including nested ZIPs)


In [5]:
def _is_archive_junk(member_name):
    """Return True for macOS zip metadata entries (``__MACOSX/``, ``._*``) that are not data."""
    parts = member_name.split("/")
    return "__MACOSX" in parts or parts[-1].startswith("._")


def _extract_csvs_from_open_zip(zf, extract_to, nested):
    """Extract CSV members of an open ``ZipFile``, recursing into inner ``.zip`` members."""
    for member in zf.namelist():
        if _is_archive_junk(member):
            continue
        lowered = member.lower()
        if lowered.endswith(".zip"):
            try:
                with zipfile.ZipFile(BytesIO(zf.read(member))) as inner_zf:
                    _extract_csvs_from_open_zip(inner_zf, extract_to, nested=True)
            except zipfile.BadZipFile as e:
                # A corrupt inner archive should not abort the remaining members.
                print(f"⚠️ Error extracting nested zip {member}: {e}")
        elif lowered.endswith(".csv"):
            label = "📦 Extracting nested CSV" if nested else "📁 Extracting CSV"
            print(f"{label}: {member}")
            zf.extract(member, extract_to)


def extract_all_csvs(zip_bytes_io, extract_to):
    """Extract every CSV from a zip archive (and any zips nested inside it).

    macOS metadata entries (``__MACOSX/`` folders and ``._*`` sidecar files)
    are skipped so they are not mistaken for trip CSVs. Extension matching
    is case-insensitive. Member paths are kept relative to ``extract_to``.

    Args:
        zip_bytes_io: File-like object (e.g. ``BytesIO``) containing the archive.
        extract_to: Destination directory.

    Errors opening or reading the outer archive are printed, not raised, so
    a single bad month does not stop the surrounding download loop.
    """
    try:
        with zipfile.ZipFile(zip_bytes_io) as zf:
            _extract_csvs_from_open_zip(zf, extract_to, nested=False)
    except Exception as e:
        print(f"⚠️ Error extracting zip: {e}")


## 📁 Flatten CSV Paths from Folder


In [6]:
def flatten_csvs_folder(root_folder):
    """Recursively collect the paths of all CSV files under ``root_folder``.

    The result is sorted because ``os.walk`` order depends on the filesystem.
    Downstream, the file order decides both the row order of the filtered
    output and how ``Counter.most_common`` breaks ties between equally busy
    stations. Sorting makes runs reproducible. macOS ``._*`` sidecar files
    are skipped, and the ``.csv`` match is case-insensitive.

    Args:
        root_folder: Directory to scan.

    Returns:
        Sorted list of CSV file paths (empty if none are found).
    """
    csv_paths = []
    for dirpath, _dirnames, filenames in os.walk(root_folder):
        for fname in filenames:
            if fname.startswith("._"):
                continue
            if fname.lower().endswith(".csv"):
                csv_paths.append(os.path.join(dirpath, fname))
    return sorted(csv_paths)


## 📊 Identify Top 3 Most Frequent Start Stations


In [7]:
def get_top3_station_names(filepaths, *, chunksize=None, n=3):
    """Return the most frequent ``start_station_name`` values across CSV files.

    Files are streamed in chunks so the full year never has to fit in memory.
    Missing station names are ignored. Files that cannot be read or lack the
    column are reported and skipped.

    Args:
        filepaths: Iterable of CSV paths.
        chunksize: Rows per chunk; defaults to the module-level ``CHUNK_SIZE``.
        n: How many stations to return (default 3).

    Returns:
        Up to ``n`` station names, most frequent first.
    """
    if chunksize is None:
        chunksize = CHUNK_SIZE
    freq = Counter()
    for path in filepaths:
        try:
            # The context manager closes the file even if a chunk fails to parse.
            with pd.read_csv(
                path,
                usecols=["start_station_name"],
                dtype={"start_station_name": str},
                chunksize=chunksize,
            ) as reader:
                for chunk in reader:
                    # value_counts tallies in vectorized code and drops NaN by
                    # default, replacing a per-row Python loop over the Series.
                    freq.update(chunk["start_station_name"].value_counts().to_dict())
        except (OSError, ValueError) as e:
            # OSError: missing/unreadable file. ValueError covers pandas parse
            # errors, empty files, decode errors and a missing column.
            print(f"⚠️ Skipping {path}: {e}")
    return [name for name, _ in freq.most_common(n)]


## 📝 Write Filtered Rows (Only Top 3 Stations)


In [8]:
def write_top3_data(filepaths, top3, output=OUTPUT_FILE):
    """Write every row whose ``start_station_name`` is in ``top3`` to ``output``.

    Input files are streamed in ``CHUNK_SIZE`` chunks, keeping only
    ``TARGET_COLS``. The first matching chunk creates ``output`` with a
    header; later chunks are appended. Unreadable files are reported and
    skipped.

    Args:
        filepaths: Iterable of CSV paths to scan.
        top3: Station names to keep.
        output: Destination CSV path.

    Returns:
        Total number of rows written (0 if nothing matched).
    """
    # Remove output left over from a previous run. Otherwise, when nothing
    # matches this time, the stale file would survive and look like fresh data.
    if os.path.exists(output):
        os.remove(output)

    wanted = set(top3)
    # Station IDs are labels, not numbers. If pandas parses a chunk of them as
    # floats, an ID like "6948.10" would be written back as "6948.1".
    id_dtypes = {"start_station_id": str, "end_station_id": str}
    total_written = 0
    for path in filepaths:
        try:
            with pd.read_csv(
                path,
                usecols=TARGET_COLS,
                dtype=id_dtypes,
                chunksize=CHUNK_SIZE,
                low_memory=False,
            ) as reader:
                for chunk in reader:
                    chunk = chunk.dropna(subset=["start_station_name"])
                    filtered = chunk[chunk["start_station_name"].isin(wanted)]
                    if filtered.empty:
                        continue
                    is_first = total_written == 0
                    filtered.to_csv(output, index=False, mode="w" if is_first else "a", header=is_first)
                    total_written += len(filtered)
                    print(f"✅ Written {len(filtered)} rows from {path}")
        except (OSError, ValueError) as e:
            # OSError: file/IO problems. ValueError covers pandas parse
            # errors and files missing some of TARGET_COLS.
            print(f"⚠️ Skipping {path}: {e}")
    return total_written


## 🧼 Clean Data and Engineer Features
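- Parse `started_at` / `ended_at` and drop rows whose timestamps cannot be parsed
- Drop rows missing critical fields; fill missing station names/IDs with placeholders (`Unknown` / `-1`)
- Add `ride_duration_mins` (dropping rides with non-positive duration), `day_of_week`, `hour_of_day`, `month`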


In [9]:
def clean_and_engineer_features(file_path):
    """Load the filtered trip CSV, drop unusable rows, and add time features.

    Steps:
      * normalise column names (strip whitespace, lower-case);
      * parse ``started_at`` / ``ended_at``; unparseable values become NaT and
        those rows are dropped, as are rows missing any critical column;
      * fill missing station names with ``'Unknown'`` and IDs with ``'-1'``;
      * add ``ride_duration_mins`` and keep only rides with positive duration;
      * add ``day_of_week`` (name), ``hour_of_day`` and ``month`` (int32).

    Args:
        file_path: Path to the CSV produced by ``write_top3_data``.

    Returns:
        The cleaned DataFrame.
    """
    print("🧼 Cleaning and engineering features...")
    # Station IDs are labels: reading them as floats would turn an ID like
    # "6948.10" into "6948.1" once it is re-stringified below.
    df = pd.read_csv(file_path, dtype={"start_station_id": str, "end_station_id": str})

    df.columns = df.columns.str.strip().str.lower()
    df['started_at'] = pd.to_datetime(df['started_at'], errors='coerce')
    df['ended_at'] = pd.to_datetime(df['ended_at'], errors='coerce')

    critical_cols = ['ride_id', 'rideable_type', 'start_lat', 'start_lng', 'end_lat', 'end_lng', 'member_casual']
    # One dropna over the union is equivalent to the two sequential drops.
    # The explicit copy makes the column assignments below act on an
    # independent frame rather than a filtered view (no SettingWithCopyWarning).
    df = df.dropna(subset=['started_at', 'ended_at', *critical_cols]).copy()

    df['start_station_name'] = df['start_station_name'].fillna('Unknown')
    df['end_station_name'] = df['end_station_name'].fillna('Unknown')
    df['start_station_id'] = df['start_station_id'].fillna('-1').astype(str)
    df['end_station_id'] = df['end_station_id'].fillna('-1').astype(str)

    df['rideable_type'] = df['rideable_type'].astype('category')
    df['member_casual'] = df['member_casual'].astype('category')

    df['ride_duration_mins'] = (df['ended_at'] - df['started_at']).dt.total_seconds() / 60
    # Zero/negative durations are treated as bad records. Copy again before
    # adding more columns to the filtered frame.
    df = df[df['ride_duration_mins'] > 0].copy()

    df['day_of_week'] = df['started_at'].dt.day_name()
    df['hour_of_day'] = df['started_at'].dt.hour.astype('int32')
    df['month'] = df['started_at'].dt.month.astype('int32')

    print(f"✅ Cleaned dataset: {df.shape[0]:,} rows × {df.shape[1]} columns")
    return df


## 🔐 Connect and Push to Hopsworks Feature Store


In [10]:
def connect_to_hopsworks():
    """Log in to Hopsworks and return the project's feature store handle.

    ``load_dotenv()`` is called first, then the project name and API key are
    read from the ``HOPSWORKS_PROJECT`` and ``HOPSWORKS_API_KEY`` environment
    variables and passed to ``hopsworks.login``.
    """
    print("🔐 Connecting to Hopsworks...")
    load_dotenv()
    login_kwargs = {
        "project": os.getenv("HOPSWORKS_PROJECT"),
        "api_key_value": os.getenv("HOPSWORKS_API_KEY"),
    }
    project = hopsworks.login(**login_kwargs)
    feature_store = project.get_feature_store()
    return feature_store

def push_to_feature_store(df, fs):
    """Replace the ``citi_bike_trips`` (version 1) feature group with ``df``.

    Any existing version-1 feature group is deleted first. A new one is then
    created with ``ride_id`` as primary key and ``started_at`` as event time,
    and ``df`` is inserted. The insert blocks until the Hopsworks job
    finishes (``wait_for_job``).

    NOTE(review): delete-then-create is not atomic. If creation or the insert
    fails, the previous feature group's data is already gone.

    Args:
        df: Cleaned trip DataFrame.
        fs: Feature store handle, e.g. from ``connect_to_hopsworks()``.
    """
    print("🚀 Pushing to Hopsworks Feature Store...")

    try:
        # Depending on the client version, a missing feature group may either
        # raise or be returned as None; handle None explicitly so it is not
        # reported as an AttributeError on `.delete()`.
        existing_fg = fs.get_feature_group("citi_bike_trips", version=1)
        if existing_fg is not None:
            existing_fg.delete()
            print("🗑️ Existing feature group deleted.")
        else:
            print("ℹ️ Feature group does not exist; nothing to delete.")
    except Exception as e:
        # Best-effort cleanup: lookup/delete failures are logged, not raised.
        print(f"ℹ️ Feature group does not exist or could not be deleted: {e}")

    # Recreate feature group
    fg = fs.create_feature_group(
        name="citi_bike_trips",
        version=1,
        description="Citi Bike data from top 3 stations in last 12 months",
        primary_key=["ride_id"],
        event_time="started_at"
    )

    fg.insert(df, write_options={"wait_for_job": True})
    print("✅ Feature group created and data inserted.")


## ⚙️ Run the Full ETL Pipeline


In [11]:
# RUN THIS TO EXECUTE THE FULL WORKFLOW
# Start from a clean slate: leftover extracted CSVs or a previous run's output
# file would otherwise leak into this run (a stale OUTPUT_FILE would also
# satisfy the "data was written" check below).
if os.path.exists(TEMP_FOLDER):
    shutil.rmtree(TEMP_FOLDER)
os.makedirs(TEMP_FOLDER, exist_ok=True)
if os.path.exists(OUTPUT_FILE):
    os.remove(OUTPUT_FILE)

try:
    print("🚀 Starting download + extraction...")
    for ym in months:
        zip_mem = download_zip_to_memory(ym)
        if zip_mem is not None:
            extract_all_csvs(zip_mem, TEMP_FOLDER)

    all_csvs = flatten_csvs_folder(TEMP_FOLDER)
    if not all_csvs:
        raise RuntimeError("❌ No CSV files were downloaded or extracted.")

    print("🔍 Counting top 3 stations...")
    top3 = get_top3_station_names(all_csvs)
    print(f"🏆 Top 3 Stations: {top3}")

    print("📤 Writing filtered data...")
    write_top3_data(all_csvs, top3)

    if not os.path.exists(OUTPUT_FILE) or os.path.getsize(OUTPUT_FILE) == 0:
        raise RuntimeError("❌ No data written. Check filters or input data.")
finally:
    # Remove the extracted CSVs even when a step above raised.
    print("🧹 Cleaning up temp files...")
    shutil.rmtree(TEMP_FOLDER, ignore_errors=True)
    print("✅ Temp cleanup done.")

df = clean_and_engineer_features(OUTPUT_FILE)
fs = connect_to_hopsworks()
push_to_feature_store(df, fs)


🚀 Starting download + extraction...
🌐 Trying: https://s3.amazonaws.com/tripdata/202504-citibike-tripdata.zip
✅ Downloaded: 202504-citibike-tripdata.zip
📁 Extracting CSV: 202504-citibike-tripdata_3.csv
📁 Extracting CSV: 202504-citibike-tripdata_2.csv
📁 Extracting CSV: 202504-citibike-tripdata_1.csv
📁 Extracting CSV: 202504-citibike-tripdata_4.csv
🌐 Trying: https://s3.amazonaws.com/tripdata/202503-citibike-tripdata.zip
❌ Not found: https://s3.amazonaws.com/tripdata/202503-citibike-tripdata.zip
🌐 Trying: https://s3.amazonaws.com/tripdata/202503-citibike-tripdata.csv.zip
✅ Downloaded: 202503-citibike-tripdata.csv.zip
📁 Extracting CSV: 202503-citibike-tripdata.csv
🌐 Trying: https://s3.amazonaws.com/tripdata/202502-citibike-tripdata.zip
✅ Downloaded: 202502-citibike-tripdata.zip
📁 Extracting CSV: 202502-citibike-tripdata_3.csv
📁 Extracting CSV: 202502-citibike-tripdata_2.csv
📁 Extracting CSV: 202502-citibike-tripdata_1.csv
🌐 Trying: https://s3.amazonaws.com/tripdata/202501-citibike-tripdata.



🗑️ Existing feature group deleted.
Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1228957/fs/1213523/fg/1454726


Uploading Dataframe: 100.00% |██████████| Rows 39994/39994 | Elapsed Time: 00:15 | Remaining Time: 00:00


Launching job: citi_bike_trips_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1228957/jobs/named/citi_bike_trips_1_offline_fg_materialization/executions
2025-05-12 06:44:03,246 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-05-12 06:44:06,341 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-05-12 06:46:22,575 INFO: Waiting for execution to finish. Current state: SUCCEEDING. Final status: UNDEFINED
2025-05-12 06:46:31,843 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-05-12 06:46:31,942 INFO: Waiting for log aggregation to finish.
2025-05-12 06:46:40,333 INFO: Execution finished successfully.
✅ Feature group created and data inserted.
