In [2]:
import os
import sys
import pandas as pd
from datetime import datetime
import mlflow

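# 🔐 Set explicit MLflow tracking to YOUR DagsHub account
mlflow.set_tracking_uri("https://dagshub.com/ryallavinuthnareddy/citibikeproject.mlflow")
os.environ["MLFLOW_TRACKING_USERNAME"] = "ryallavinuthnareddy"
# Never hard-code the access token in the notebook: export MLFLOW_TRACKING_PASSWORD
# in the shell (or load it from an untracked .env file) before starting Jupyter.
if "MLFLOW_TRACKING_PASSWORD" not in os.environ:
    raise RuntimeError("Set MLFLOW_TRACKING_PASSWORD to your DagsHub token before running this cell")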

# Ensure your source code path is accessible
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))

from src.data_utils import load_and_process_citibike_data, transform_raw_data_into_ts_data

# 🗓️ Load Citi Bike data (2023)
from_year = 2023
to_year = 2023
print(f"Loading Citi Bike data from {from_year} to {to_year}")

chunks = []
for year in range(from_year, to_year + 1):
    rides_one_year = load_and_process_citibike_data(year)
    chunks.append(rides_one_year)

rides = pd.concat(chunks, ignore_index=True)
print("✅ Citi Bike ride data loaded:", rides.shape)

# 🧹 Rename columns
rides.rename(columns={"start_station_id": "pickup_location_id", "started_at": "pickup_datetime"}, inplace=True)

# 🔄 Transform to hourly time series
ts_data = transform_raw_data_into_ts_data(rides)
print("✅ Transformed to time-series format:", ts_data.shape)
ts_data.info()

# ✅ Optional: Log data info to MLflow
mlflow.set_experiment("citibikeproject-experiment")
with mlflow.start_run() as run:
    mlflow.log_param("year_loaded", from_year)
    mlflow.log_metric("raw_rows", rides.shape[0])
    mlflow.log_metric("ts_data_rows", ts_data.shape[0])

    # The run already carries its experiment id, so no lookup by name is needed
    run_id = run.info.run_id
    exp_id = run.info.experiment_id
    print(f"🏃 View run at: {mlflow.get_tracking_uri()}/#/experiments/{exp_id}/runs/{run_id}")



Loading Citi Bike data from 2023 to 2023

📁 Looking for monthly zips in: /Users/Vinuthna/Downloads/Final_Project/Final_Project/data/raw/2023-citibike-tripdata/2023-citibike-tripdata

📁 Checking for CSVs in: /Users/Vinuthna/Downloads/Final_Project/Final_Project/data/raw/2023-citibike-tripdata/2023-citibike-tripdata/202301-citibike-tripdata
🗂️ Reading 202301-citibike-tripdata_1.csv


  df = pd.read_csv(csv_file)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['start_station_id'] = df['start_station_id'].astype(str)


🗂️ Reading 202301-citibike-tripdata_2.csv


  df = pd.read_csv(csv_file)


✅ Finished processing for 2023-01

📁 Checking for CSVs in: /Users/Vinuthna/Downloads/Final_Project/Final_Project/data/raw/2023-citibike-tripdata/2023-citibike-tripdata/202302-citibike-tripdata
🗂️ Reading 202302-citibike-tripdata_2.csv
🗂️ Reading 202302-citibike-tripdata_1.csv
✅ Finished processing for 2023-02

📁 Checking for CSVs in: /Users/Vinuthna/Downloads/Final_Project/Final_Project/data/raw/2023-citibike-tripdata/2023-citibike-tripdata/202303-citibike-tripdata
🗂️ Reading 202303-citibike-tripdata_1.csv
🗂️ Reading 202303-citibike-tripdata_2.csv
🗂️ Reading 202303-citibike-tripdata_3.csv
✅ Finished processing for 2023-03

📁 Checking for CSVs in: /Users/Vinuthna/Downloads/Final_Project/Final_Project/data/raw/2023-citibike-tripdata/2023-citibike-tripdata/202304-citibike-tripdata
🗂️ Reading 202304-citibike-tripdata_3.csv
🗂️ Reading 202304-citibike-tripdata_2.csv
🗂️ Reading 202304-citibike-tripdata_1.csv
✅ Finished processing for 2023-04

📁 Checking for CSVs in: /Users/Vinuthna/Downloads/Final_Project/Final_Project/data/raw/2023-citibike-tripdata/2023-citibike-tripdata/202305-citibike-tripdata
🗂️ Reading 202305-citibike-tripdata_4.csv
🗂️ Reading 202305-citibike-tripdata_1.csv
🗂️ Reading 202305-citibike-tripdata_2.csv
🗂️ Reading 202305-citibike-tripdata_3.csv
✅ Finished processing for 2023-05

📁 Checking for CSVs in: /Users/Vinuthna/Downloads/Final_Project/Final_Project/data/raw/2023-citibike-tripdata/2023-citibike-tripdata/202306-citibike-tripdata
🗂️ Reading 202306-citibike-tripdata_2.csv
🗂️ Reading 202306-citibike-tripdata_3.csv
🗂️ Reading 202306-citibike-tripdata_1.csv
🗂️ Reading 202306-citibike-tripdata_4.csv
✅ Finished processing for 2023-06

📁 Checking for CSVs in: /Users/Vinuthna/Downloads/Final_Project/Final_Project/data/raw/2023-citibike-tripdata/2023-citibike-tripdata/202307-citibike-tripdata
🗂️ Reading 202307-citibike-tripdata_4.csv
🗂️ Reading 202307-citibike-tripdata_1.csv
🗂️ Reading 202307-citibike-tripdata_3.csv
🗂️ Reading 202307-citibike-tripdata_2.csv
✅ Finished processing for 2023-07

📁 Checking for CSVs in: /Users/Vinuthna/Downloads/Final_Project/Final_Project/data/raw/2023-citibike-tripdata/2023-citibike-tripdata/202308-citibike-tripdata
🗂️ Reading 202308-citibike-tripdata_4.csv
🗂️ Reading 202308-citibike-tripdata_1.csv
🗂️ Reading 202308-citibike-tripdata_2.csv
🗂️ Reading 202308-citibike-tripdata_3.csv
✅ Finished processing for 2023-08

📁 Checking for CSVs in: /Users/Vinuthna/Downloads/Final_Project/Final_Project/data/raw/2023-citibike-tripdata/2023-citibike-tripdata/202309-citibike-tripdata
🗂️ Reading 202309-citibike-tripdata_3.csv
🗂️ Reading 202309-citibike-tripdata_2.csv
🗂️ Reading 202309-citibike-tripdata_1.csv
🗂️ Reading 202309-citibike-tripdata_4.csv
✅ Finished processing for 2023-09

📁 Checking for CSVs in: /Users/Vinuthna/Downloads/Final_Project/Final_Project/data/raw/2023-citibike-tripdata/2023-citibike-tripdata/202310-citibike-tripdata
🗂️ Reading 202310-citibike-tripdata_3.csv
🗂️ Reading 202310-citibike-tripdata_2.csv
🗂️ Reading 202310-citibike-tripdata_1.csv
🗂️ Reading 202310-citibike-tripdata_4.csv
✅ Finished processing for 2023-10

📁 Checking for CSVs in: /Users/Vinuthna/Downloads/Final_Project/Final_Project/data/raw/2023-citibike-tripdata/2023-citibike-tripdata/202311-citibike-tripdata
🗂️ Reading 202311-citibike-tripdata_1.csv
🗂️ Reading 202311-citibike-tripdata_2.csv
🗂️ Reading 202311-citibike-tripdata_3.csv
✅ Finished processing for 2023-11

📁 Checking for CSVs in: /Users/Vinuthna/Downloads/Final_Project/Final_Project/data/raw/2023-citibike-tripdata/2023-citibike-tripdata/202312-citibike-tripdata
🗂️ Reading 202312-citibike-tripdata_2.csv
🗂️ Reading 202312-citibike-tripdata_3.csv
🗂️ Reading 202312-citibike-tripdata_1.csv
✅ Finished processing for 2023-12

✅ All data loaded. Total records: 316,816
✅ Citi Bike ride data loaded: (316816, 15)
✅ Transformed to time-series format: (26535, 3)
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 26535 entries, 0 to 26534
Data columns (total 3 columns):
 #   Column              Non-Null Count  Dtype         
---  ------              --------------  -----         
 0   pickup_hour         26535 non-null  datetime64[ns]
 1   pickup_location_id  26535 non-null  object        
 2   rides               26535 non-null  int64         
dtypes: datetime64[ns](1), int64(1), object(1)
memory usage: 622.0+ KB
🏃 View run at: https://dagshub.com/ryallavinuthnareddy/citibikeproject.mlflow/#/experiments/4/runs/21f153754f7e47c0b426630eb305e41b
🏃 View run casual-lamb-30 at: https://dagshub.com/ryallavinuthnareddy/citibikeproject.mlflow/#/experiments/4/runs/21f153754f7e47c0b426630eb305e41b
🧪 View experiment at: https://dagshub.com/ryallavinuthnareddy/citibikeproject.mlflow/#/experim
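
The pandas warning in the output above ("A value is trying to be set on a copy of a slice from a DataFrame") is raised at `df['start_station_id'] = df['start_station_id'].astype(str)` inside the loader. The loader source is not shown in this notebook, so the following is only a sketch, assuming that `df` is the result of filtering a larger frame. The helper name `clean_station_ids` and the `notna()` filter are hypothetical. Taking an explicit `.copy()` after the filter is one common way to make the assignment unambiguous.

```python
import pandas as pd


def clean_station_ids(raw: pd.DataFrame) -> pd.DataFrame:
    """Drop rows without a start station and cast the id to str.

    Hypothetical helper: the real filter used in src.data_utils is not shown.
    """
    # .copy() makes the filtered frame independent of `raw`, so the
    # assignment below no longer writes to a possible view of `raw`.
    df = raw[raw["start_station_id"].notna()].copy()
    df["start_station_id"] = df["start_station_id"].astype(str)
    return df


# Toy input: one row has no station id and should be dropped.
sample = pd.DataFrame({"start_station_id": [5329.03, None, 6140.05]})
cleaned = clean_station_ids(sample)
print(cleaned["start_station_id"].tolist())
```

The input frame is left untouched, which also keeps the function safe to call more than once on the same raw data.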

In [3]:
ts_data.head()


Unnamed: 0,pickup_hour,pickup_location_id,rides
0,2022-12-28 11:00:00,5329.03,1
1,2022-12-28 12:00:00,5329.03,0
2,2022-12-28 13:00:00,5329.03,0
3,2022-12-28 14:00:00,5329.03,0
4,2022-12-28 15:00:00,5329.03,0
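
The preview contains hours with `rides == 0`, which suggests that `transform_raw_data_into_ts_data` fills in hours without any pickups. Its implementation lives in `src/data_utils.py` and is not shown here, so the sketch below is an assumption about the general technique (floor to the hour, count, reindex over the full hour × station grid), not the project's actual code. The function name `to_hourly_counts` is hypothetical.

```python
import pandas as pd


def to_hourly_counts(rides: pd.DataFrame) -> pd.DataFrame:
    """Count rides per (hour, station) and fill idle hours with 0."""
    rides = rides.copy()
    rides["pickup_hour"] = rides["pickup_datetime"].dt.floor("h")
    counts = (
        rides.groupby(["pickup_hour", "pickup_location_id"])
        .size()
        .rename("rides")
    )
    # Build the full hour x station grid so hours without rides appear as 0.
    hours = pd.date_range(
        rides["pickup_hour"].min(), rides["pickup_hour"].max(), freq="h"
    )
    stations = rides["pickup_location_id"].unique()
    grid = pd.MultiIndex.from_product(
        [hours, stations], names=["pickup_hour", "pickup_location_id"]
    )
    return counts.reindex(grid, fill_value=0).reset_index()


# Toy input: two rides in hour 00, none in hour 01, one in hour 02.
sample = pd.DataFrame({
    "pickup_datetime": pd.to_datetime(
        ["2023-01-01 00:15", "2023-01-01 00:45", "2023-01-01 02:05"]
    ),
    "pickup_location_id": ["5329.03", "5329.03", "5329.03"],
})
hourly = to_hourly_counts(sample)
print(hourly)
```

Filling the gaps matters for time-series models: without the zero rows, lag features would silently skip over idle hours.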


In [4]:
import hopsworks

project = hopsworks.login()  # or hopsworks.login(api_key_value="your_api_key")
feature_store = project.get_feature_store()

2025-05-11 00:12:06,882 INFO: Initializing external client
2025-05-11 00:12:06,882 INFO: Base URL: https://c.app.hopsworks.ai:443




To ensure compatibility please install the latest bug fix release matching the minor version of your backend (4.2) by running 'pip install hopsworks==4.2.*'


2025-05-11 00:12:08,205 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1215676


In [5]:
from hsfs.feature import Feature
import numpy as np

# Define the schema for the feature group
features = [
    Feature(name="pickup_hour", type="timestamp"),
    Feature(name="pickup_location_id", type="string"),
    Feature(name="rides", type="bigint"),
]

# Create or retrieve the feature group
feature_group = feature_store.get_or_create_feature_group(
    name="citibike_hourly_features",
    version=1,
    description="Time-series Citi Bike rides aggregated by hour and location",
    primary_key=["pickup_location_id", "pickup_hour"],
    event_time="pickup_hour",
    features=features
)

# ✅ Ensure the 'rides' column has the correct dtype (int64 for 'bigint')
ts_data["rides"] = ts_data["rides"].astype(np.int64)

# Insert data into the feature group
feature_group.insert(ts_data, write_options={"wait_for_job": False})


Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1215676/fs/1203303/fg/1458577


Uploading Dataframe: 100.00% |█| Rows 26535/26535 | Elapsed Time: 00:02 | Remain


Launching job: citibike_hourly_features_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1215676/jobs/named/citibike_hourly_features_1_offline_fg_materialization/executions


(Job('citibike_hourly_features_1_offline_fg_materialization', 'SPARK'), None)
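
Because the insert above returns before the materialization job finishes (`wait_for_job` is `False`), a dtype mismatch may only surface later in the job logs. A minimal local pre-check, sketched below, can catch it earlier. It uses only pandas; the mapping of the feature types `timestamp`, `string` and `bigint` to the pandas checks is an assumption based on the `ts_data.info()` output, and `schema_problems` is a hypothetical helper name.

```python
import pandas as pd
from pandas.api.types import (
    is_datetime64_any_dtype,
    is_integer_dtype,
    is_string_dtype,
)

# Assumed correspondence between feature-group types and pandas dtypes.
CHECKS = {
    "pickup_hour": ("timestamp", is_datetime64_any_dtype),
    "pickup_location_id": ("string", is_string_dtype),
    "rides": ("bigint", is_integer_dtype),
}


def schema_problems(df: pd.DataFrame) -> list:
    """Return human-readable mismatches; an empty list means the frame fits."""
    problems = []
    for col, (feature_type, check) in CHECKS.items():
        if col not in df.columns:
            problems.append(f"missing column: {col}")
        elif not check(df[col]):
            problems.append(f"{col}: expected {feature_type}, got {df[col].dtype}")
    return problems


# Toy frames: `good` matches the schema, `bad` has two float columns.
good = pd.DataFrame({
    "pickup_hour": pd.to_datetime(["2023-01-01 00:00"]),
    "pickup_location_id": ["5329.03"],
    "rides": [1],
})
bad = good.assign(rides=[1.0], pickup_location_id=[5329.03])

print(schema_problems(good))
print(schema_problems(bad))
```

In the notebook this would be called as `schema_problems(ts_data)` right before `feature_group.insert(...)`.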

In [6]:
# Load the feature group (if not already loaded)
feature_group = feature_store.get_feature_group(
    name="citibike_hourly_features",
    version=1
)

# Read data from offline storage
ts_data_from_hopsworks = feature_group.read()

# Preview the data
ts_data_from_hopsworks.head()


Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (6.97s) 


Unnamed: 0,pickup_hour,pickup_location_id,rides
0,2023-05-24 16:00:00+00:00,6140.05,42
1,2023-07-01 20:00:00+00:00,6948.1,12
2,2023-10-22 14:00:00+00:00,6948.1,38
3,2023-08-07 18:00:00+00:00,6140.05,57
4,2023-01-11 23:00:00+00:00,6948.1,0
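
Two things differ from the local `ts_data`: the rows come back in no particular order, and `pickup_hour` is now timezone-aware (`+00:00`), whereas the local column is a naive `datetime64[ns]`. pandas refuses to merge or compare naive and aware timestamps, so one side has to be converted first. The sketch below assumes the naive local timestamps are meant as UTC, which this notebook does not confirm. `as_utc` is a hypothetical helper and the two frames are toy stand-ins.

```python
import pandas as pd


def as_utc(ts: pd.Series) -> pd.Series:
    """Return tz-aware UTC timestamps; naive values are ASSUMED to be UTC."""
    if ts.dt.tz is None:
        return ts.dt.tz_localize("UTC")
    return ts.dt.tz_convert("UTC")


# Toy stand-ins for ts_data (naive) and ts_data_from_hopsworks (UTC-aware).
local = pd.DataFrame({
    "pickup_hour": pd.to_datetime(["2023-05-24 16:00:00"]),
    "pickup_location_id": ["6140.05"],
    "rides": [42],
})
remote = pd.DataFrame({
    "pickup_hour": pd.to_datetime(["2023-05-24 16:00:00+00:00"]),
    "pickup_location_id": ["6140.05"],
    "rides": [42],
})

keys = ["pickup_hour", "pickup_location_id"]
local["pickup_hour"] = as_utc(local["pickup_hour"])
remote["pickup_hour"] = as_utc(remote["pickup_hour"])

# Join on the primary key and list rows whose counts disagree.
merged = local.merge(remote, on=keys, suffixes=("_local", "_remote"))
mismatches = merged[merged["rides_local"] != merged["rides_remote"]]
print(len(merged), len(mismatches))
```

Sorting the read-back frame with `sort_values(["pickup_location_id", "pickup_hour"])` restores a stable order before building lag features.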


In [7]:
# notebooks/create_full_feature_group.py
import sys
import os

# Add root path to sys.path (adjust if needed)
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))

from datetime import datetime, timedelta
import logging
import pandas as pd
import hopsworks

from src.data_utils import (
    load_and_process_citibike_data_from_local,
    transform_raw_data_into_ts_data,
)
import src.config as config
from hsfs.feature import Feature

# ─────────────────────────────────────────────────────
# Logging setup
# ─────────────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s  %(levelname)s  %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)

# ─────────────────────────────────────────────────────
# 1. Load raw Citi Bike data (2023 only)
# ─────────────────────────────────────────────────────
logger.info("📅 Loading Citi Bike data from Jan to Dec 2023 …")
raw_rides = load_and_process_citibike_data_from_local(
    year=2023,
    months=list(range(1, 13)),
    base_path=config.LOCAL_CITIBIKE_DATA_PATH,
)
logger.info(f"✅ Loaded {len(raw_rides):,} rows of 2023 ride data.")

# ─────────────────────────────────────────────────────
# 2. Transform raw data into hourly time-series
# ─────────────────────────────────────────────────────
logger.info("🧮 Aggregating to hourly time-series format …")
ts_data = transform_raw_data_into_ts_data(raw_rides)

# ─────────────────────────────────────────────────────
# 3. Log in to Hopsworks
# ─────────────────────────────────────────────────────
logger.info("🔐 Logging in to Hopsworks …")
project = hopsworks.login(
    project=config.HOPSWORKS_PROJECT_NAME,
    api_key_value=config.HOPSWORKS_API_KEY,
)
fs = project.get_feature_store()

# ─────────────────────────────────────────────────────
# 4. Define feature group schema (fixed schema)
# ─────────────────────────────────────────────────────
fg_schema = [
    Feature("pickup_hour", "timestamp"),
    Feature("pickup_location_id", "string"),
    Feature("rides", "bigint"),  # ← FIXED: now matches Hopsworks expected type
]
# ─────────────────────────────────────────────────────
# 5. Create or update Hopsworks feature group
# ─────────────────────────────────────────────────────
logger.info("📦 Writing to Hopsworks feature group …")
hourly_fg = fs.get_or_create_feature_group(
    name="citibike_hourly_features",
    version=1,
    description="Hourly Citi Bike rides per location (2023)",
    primary_key=["pickup_hour", "pickup_location_id"],
    event_time="pickup_hour",
    online_enabled=False,
    features=fg_schema,
)

# ✅ Ensure types match the schema
ts_data["pickup_location_id"] = ts_data["pickup_location_id"].astype(str)
ts_data["rides"] = ts_data["rides"].astype("int64")  # ← FIXED: bigint = int64 in pandas

# ─────────────────────────────────────────────────────
# 6. Insert into feature store
# ─────────────────────────────────────────────────────
hourly_fg.insert(ts_data, write_options={"wait_for_job": True})
logger.info("✅ Done uploading data to Hopsworks!")


2025-05-11 00:34:44,435 INFO: 📅 Loading Citi Bike data from Jan to Dec 2023 …




2025-05-11 00:36:29,839 INFO: ✅ Loaded 410,344 rows of 2023 ride data.
2025-05-11 00:36:29,839 INFO: 🧮 Aggregating to hourly time-series format …
2025-05-11 00:36:30,080 INFO: 🔐 Logging in to Hopsworks …
2025-05-11 00:36:30,080 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-05-11 00:36:30,088 INFO: Initializing external client
2025-05-11 00:36:30,089 INFO: Base URL: https://c.app.hopsworks.ai:443




To ensure compatibility please install the latest bug fix release matching the minor version of your backend (4.2) by running 'pip install hopsworks==4.2.*'


2025-05-11 00:36:31,008 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1215676
2025-05-11 00:36:31,975 INFO: 📦 Writing to Hopsworks feature group …


Uploading Dataframe: 100.00% |█| Rows 26535/26535 | Elapsed Time: 00:03 | Remain


Launching job: citibike_hourly_features_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1215676/jobs/named/citibike_hourly_features_1_offline_fg_materialization/executions
2025-05-11 00:37:08,087 INFO: Waiting for execution to finish. Current state: INITIALIZING. Final status: UNDEFINED
2025-05-11 00:37:11,247 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-05-11 00:37:14,387 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-05-11 00:38:45,501 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-05-11 00:38:45,617 INFO: Waiting for log aggregation to finish.
2025-05-11 00:38:57,317 INFO: Execution finished successfully.
2025-05-11 00:38:57,321 INFO: ✅ Done uploading data to Hopsworks!
