In [1]:
#!/usr/bin/env python3
"""
clean_preprocess_to_hopsworks.py

1) Loads raw Citibike CSVs,  
2) Cleans & feature-engineers them,  
3) Filters to the top-K busiest start stations,  
4) Writes out a local Parquet, AND  
5) Pushes the DataFrame into Hopsworks Feature Store.
"""

import os
import glob
import pandas as pd
import hopsworks

# ──────────────────────────────────────────────────────────────────────────────
# LOCAL CONFIGURATION — update these to your paths
# Directory that main() hands to load_all_csvs(); every "*.csv" in it is read.
CSV_DIR     = "/Users/kaushalshivaprakash/Desktop/project3/data/processed/raw_citibike_csvs"
# Directory for the cleaned output; main() creates it if it does not exist.
OUTPUT_DIR  = "/Users/kaushalshivaprakash/Desktop/project3/data/processed/cleaned_citibike"
# Parquet file written by main().
# NOTE(review): the file name hard-codes "top3" (as does the feature group
# name in store_to_hopsworks); it does not follow TOP_K if that is changed.
OUTPUT_FILE = os.path.join(OUTPUT_DIR, "citibike_2023_top3.parquet")
# Number of busiest start stations kept by filter_top_stations().
TOP_K       = 3
# ──────────────────────────────────────────────────────────────────────────────

# Columns requested from each raw CSV; passed as `usecols` to pd.read_csv in
# load_all_csvs(), so only these columns are loaded.
COLUMNS = [
    "ride_id","rideable_type","started_at","ended_at",
    "start_station_name","start_station_id",
    "end_station_name","end_station_id",
    "start_lat","start_lng","end_lat","end_lng",
    "member_casual",
]

def load_all_csvs(csv_dir: str) -> pd.DataFrame:
    """Read every ``*.csv`` file in *csv_dir* and stack them into one DataFrame.

    Only the columns listed in ``COLUMNS`` are loaded. Files are read in
    sorted path order so that the row order of the result (and therefore
    which row survives a later "keep first" de-duplication) is reproducible
    from run to run.

    Args:
        csv_dir: Directory containing the raw CSV files.

    Returns:
        A single DataFrame with a fresh 0..n-1 index holding the rows of all
        files, in sorted-file order.

    Raises:
        ValueError: If *csv_dir* contains no ``*.csv`` files.
    """
    # glob returns matches in arbitrary, filesystem-dependent order; sort to
    # make the concatenation order deterministic.
    paths = sorted(glob.glob(os.path.join(csv_dir, "*.csv")))
    if not paths:
        # Fail early with the directory name instead of letting pd.concat
        # raise an opaque "No objects to concatenate".
        raise ValueError(f"No CSV files found in directory: {csv_dir}")

    # IDs are kept as strings; low-cardinality labels are stored as categories.
    dtypes = {
        "ride_id": str,
        "rideable_type": "category",
        "start_station_id": str,
        "end_station_id": str,
        "member_casual": "category",
    }

    dfs = []
    for p in paths:
        df = pd.read_csv(p, usecols=COLUMNS, dtype=dtypes)
        print(f"Loaded {len(df):,} rows from {os.path.basename(p)}")
        dfs.append(df)

    combined = pd.concat(dfs, ignore_index=True)
    print(f"Total rows after concat: {len(combined):,}")
    return combined

def parse_and_engineer(df: pd.DataFrame) -> pd.DataFrame:
    """Parse the trip timestamps and derive duration / time-of-day features.

    The frame is modified in place and the same object is returned.

    Columns written:
        started_at, ended_at: converted with ``pd.to_datetime``.
        trip_duration_min: ``ended_at - started_at`` in minutes; negative
            values are clipped to 0.
        start_hour: hour component of ``started_at``.
        start_dayofweek: ``dt.dayofweek`` of ``started_at``.

    Args:
        df: Trips with ``started_at`` and ``ended_at`` columns.

    Returns:
        The same DataFrame, with the columns above added or overwritten.
    """
    for ts_col in ("started_at", "ended_at"):
        df[ts_col] = pd.to_datetime(df[ts_col])

    elapsed = df["ended_at"] - df["started_at"]
    minutes = elapsed.dt.total_seconds() / 60
    # A ride that "ends" before it starts gets duration 0 rather than a
    # negative number.
    df["trip_duration_min"] = minutes.clip(lower=0)

    start_parts = df["started_at"].dt
    df["start_hour"] = start_parts.hour
    df["start_dayofweek"] = start_parts.dayofweek
    return df

def filter_top_stations(df: pd.DataFrame, k: int) -> pd.DataFrame:
    """Keep only the trips that start at one of the *k* busiest stations.

    "Busiest" is measured by the number of rows per ``start_station_name``.
    The selected station names are printed.

    Args:
        df: Trips with a ``start_station_name`` column.
        k: Number of stations to keep.

    Returns:
        A copy of the matching rows, with their original index labels.
    """
    rides_per_station = df["start_station_name"].value_counts()
    busiest = rides_per_station.nlargest(k).index.tolist()

    separator = "\n  "
    print(f"Top {k} stations:\n  " + separator.join(busiest))

    starts_at_busiest = df["start_station_name"].isin(busiest)
    return df.loc[starts_at_busiest].copy()

def clean_df(df: pd.DataFrame) -> pd.DataFrame:
    """Remove duplicate, incomplete, and zero-length trips.

    Steps, in order:
        1. Drop rows whose ``ride_id`` repeats an earlier row.
        2. Drop rows missing any timestamp, station name, or duration.
        3. Keep only rows with ``trip_duration_min`` strictly above 0.

    The total number of removed rows is printed.

    Args:
        df: Trips with ``trip_duration_min`` already computed.

    Returns:
        The filtered DataFrame (original index labels are kept).
    """
    required_columns = [
        "started_at", "ended_at",
        "start_station_name", "end_station_name",
        "trip_duration_min",
    ]
    rows_in = len(df)

    cleaned = (
        df.drop_duplicates(subset=["ride_id"])
        .dropna(subset=required_columns)
        .loc[lambda frame: frame["trip_duration_min"] > 0]
    )

    rows_removed = rows_in - len(cleaned)
    print(f"Dropped {rows_removed:,} invalid/duplicate rows")
    return cleaned

def store_to_hopsworks(df: pd.DataFrame):
    """Insert *df* into the ``citibike_top3_trips`` feature group (version 1).

    Logs in with ``hopsworks.login()``, fetches the project's feature store,
    gets or creates the feature group keyed on ``ride_id``, and inserts the
    DataFrame.

    NOTE(review): credentials presumably come from the HOPSWORKS_HOST and
    HOPSWORKS_API_KEY environment variables — confirm for your setup.
    NOTE(review): the insert is issued with ``wait_for_job=False``, so the
    success message printed below presumably does not confirm that the
    materialization job has finished — verify in the Hopsworks UI.

    Args:
        df: Cleaned trips to upload.
    """
    feature_store = hopsworks.login().get_feature_store()

    # Reuses the feature group if it already exists, otherwise defines it.
    feature_group = feature_store.get_or_create_feature_group(
        name="citibike_top3_trips",
        version=1,
        primary_key=["ride_id"],
        description="Cleaned Citibike 2023 trips for top-3 busiest start stations",
    )

    insert_options = {"wait_for_job": False}
    feature_group.insert(df, write_options=insert_options)
    print("✅ Data inserted to Hopsworks feature store 'citibike_top3_trips'")

def main():
    """Run the pipeline: load, engineer, filter, clean, save, upload.

    Reads the CSVs under ``CSV_DIR``, writes the cleaned trips for the
    ``TOP_K`` busiest start stations to ``OUTPUT_FILE`` as Parquet, then
    pushes the same DataFrame to Hopsworks.
    """
    # Make sure the output location exists before any heavy work starts.
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # Load every CSV and add the derived time features.
    trips = parse_and_engineer(load_all_csvs(CSV_DIR))

    # Narrow to the busiest stations first, then drop invalid/duplicate rows.
    trips = clean_df(filter_top_stations(trips, TOP_K))

    # Local Parquet copy.
    trips.to_parquet(OUTPUT_FILE, index=False)
    row_count = len(trips)
    print(f"\n✓ Saved cleaned data ({row_count:,} rows) to {OUTPUT_FILE}\n")

    # Feature-store upload.
    store_to_hopsworks(trips)

# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

Loaded 1,000,000 rows from 202312-citibike-tripdata_2.csv
Loaded 1,000,000 rows from 202301-citibike-tripdata_1.csv
Loaded 204,874 rows from 202312-citibike-tripdata_3.csv
Loaded 453,152 rows from 202305-citibike-tripdata_4.csv
Loaded 1,000,000 rows from 202312-citibike-tripdata_1.csv
Loaded 795,412 rows from 202301-citibike-tripdata_2.csv
Loaded 1,000,000 rows from 202305-citibike-tripdata_1.csv
Loaded 1,000,000 rows from 202305-citibike-tripdata_2.csv
Loaded 1,000,000 rows from 202305-citibike-tripdata_3.csv
Loaded 1,000,000 rows from 202306-citibike-tripdata_2.csv
Loaded 1,000,000 rows from 202309-citibike-tripdata_3.csv
Loaded 1,000,000 rows from 202309-citibike-tripdata_2.csv
Loaded 1,000,000 rows from 202306-citibike-tripdata_3.csv
Loaded 1,000,000 rows from 202306-citibike-tripdata_1.csv
Loaded 1,000,000 rows from 202309-citibike-tripdata_1.csv
Loaded 451,549 rows from 202306-citibike-tripdata_4.csv
Loaded 1,000,000 rows from 202311-citibike-tripdata_1.csv
Loaded 696,171 rows fr

Uploading Dataframe: 100.00% |█| Rows 365696/365696 | Elapsed Time: 00:09 | Rema


Launching job: citibike_top3_trips_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1213683/jobs/named/citibike_top3_trips_1_offline_fg_materialization/executions
✅ Data inserted to Hopsworks feature store 'citibike_top3_trips'
