In [46]:
# Reg fetch new batch of features and compute predictions and save to feature store
# 

In [47]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [48]:
import sys
import os

# Add the parent directory to the Python path
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))
import src.config as config

In [49]:
from src.inference import get_feature_store, load_model_from_registry, get_model_predictions
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import src.config as config
import lightgbm as lgb

try:
    # Step 1: Setup and get feature store data
    current_date = pd.Timestamp.now(tz='Etc/UTC')
    feature_store = get_feature_store()
    
    fetch_data_to = current_date - timedelta(hours=1)
    fetch_data_from = fetch_data_to - timedelta(days=30)
    
    print(f"Fetching data from {fetch_data_from} to {fetch_data_to}")
    
    feature_view = feature_store.get_feature_view(
        name=config.FEATURE_VIEW_NAME,
        version=config.FEATURE_VIEW_VERSION
    )
    
    # Get and prepare initial data
    ts_data = feature_view.get_batch_data(
        start_time=fetch_data_from,
        end_time=fetch_data_to
    )
    
    # Step 2: Prepare continuous time series data
    ts_data['pickup_hour'] = ts_data['pickup_hour'].dt.tz_localize(None)
    
    # Create full date range
    date_range = pd.date_range(
        start=ts_data.pickup_hour.min(),
        end=ts_data.pickup_hour.max(),
        freq='H'
    )
    
    # Process each location to ensure continuous data
    processed_locations = []
    
    for location_id in ts_data.pickup_location_id.unique():
        loc_data = ts_data[ts_data.pickup_location_id == location_id].copy()
        
        # Create continuous series for location
        location_series = pd.DataFrame({
            'pickup_hour': date_range,
            'pickup_location_id': location_id
        })
        
        # Merge with actual data
        location_series = location_series.merge(
            loc_data[['pickup_hour', 'rides']], 
            on='pickup_hour', 
            how='left'
        )
        
        # Fill missing values with 0
        location_series['rides'] = location_series['rides'].fillna(0)
        
        # Only keep locations with sufficient data
        if len(location_series) >= 672:  # 28 days * 24 hours
            processed_locations.append(location_series)
    
    # Combine processed locations
    if not processed_locations:
        raise ValueError("No locations with sufficient data found")
    
    ts_data_processed = pd.concat(processed_locations)
    print(f"\nProcessed {len(processed_locations)} locations with sufficient data")
    
    # Step 3: Generate features
    from src.data_utils import transform_ts_data_info_features
    features, _ = transform_ts_data_info_features(
        ts_data,
        window_size=504,  # 21 days
        step_size=24
    )

    
    # Add missing column
    features['rides_t-672'] = 0
    
    # Step 4: Load and prepare model
    model = load_model_from_registry()
    if isinstance(model, lgb.Booster):
        model.params['predict_disable_shape_check'] = True
    elif hasattr(model, 'steps') and isinstance(model.steps[-1][1], lgb.LGBMRegressor):
        model.steps[-1][1].set_params(predict_disable_shape_check=True)
    
    # Step 5: Generate predictions
    predictions = get_model_predictions(model, features)
    
    if predictions is not None and not predictions.empty:
        predictions['pickup_hour'] = current_date.ceil('h')
        
        # Save to feature store
        feature_group = feature_store.get_or_create_feature_group(
            name=config.FEATURE_GROUP_MODEL_PREDICTION,
            version=1,
            description="Predictions from LGBM Model",
            primary_key=['pickup_location_id', 'pickup_hour'],
            event_time='pickup_hour',
        )
        
        feature_group.insert(predictions, write_options={"wait_for_job": False})
        
        print(f"\nSaved {len(predictions)} predictions to feature store")
        print("\nTop 10 locations by predicted demand:")
        print(predictions.sort_values('predicted_demand', ascending=False)[
            ['pickup_location_id', 'predicted_demand']
        ].head(10))

except Exception as e:
    print(f"Error: {str(e)}")
    print("\nDebug Info:")
    if 'ts_data_processed' in locals():
        print(f"Processed data shape: {ts_data_processed.shape}")
        print(f"Locations processed: {ts_data_processed.pickup_location_id.nunique()}")
    
predictions if 'predictions' in locals() else None

2025-03-04 12:23:51,990 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-03-04 12:23:52,026 INFO: Initializing external client
2025-03-04 12:23:52,028 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-03-04 12:23:52,655 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1215673
Fetching data from 2025-02-02 16:23:51.990130+00:00 to 2025-03-04 16:23:51.990130+00:00
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (13.20s) 

Processed 251 locations with sufficient data
Error: cannot import name 'transform_ts_data_info_features' from 'src.data_utils' (d:\EAS-500\sp25_taxi-main\src\data_utils.py)

Debug Info:
Processed data shape: (171935, 3)
Locations processed: 251


Unnamed: 0,pickup_location_id,predicted_demand,pickup_hour
0,79,162.0,2025-03-04 18:00:00+00:00
1,87,29.0,2025-03-04 18:00:00+00:00
2,70,7.0,2025-03-04 18:00:00+00:00
3,143,54.0,2025-03-04 18:00:00+00:00
4,96,0.0,2025-03-04 18:00:00+00:00
...,...,...,...
246,253,0.0,2025-03-04 18:00:00+00:00
247,83,0.0,2025-03-04 18:00:00+00:00
248,207,0.0,2025-03-04 18:00:00+00:00
249,35,1.0,2025-03-04 18:00:00+00:00


In [50]:
from src.inference import load_model_from_registry

model = load_model_from_registry()

2025-03-04 12:24:15,605 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-03-04 12:24:15,628 INFO: Initializing external client
2025-03-04 12:24:15,631 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-03-04 12:24:16,255 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1215673
Downloading model artifact (0 dirs, 1 files)... DONE

In [51]:
from src.inference import get_feature_store, load_model_from_registry, get_model_predictions
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import src.config as config
import lightgbm as lgb

try:
    # Step 1: Setup time window
    current_date = pd.Timestamp.now(tz='Etc/UTC')
    fetch_data_to = current_date - timedelta(hours=1)
    fetch_data_from = fetch_data_to - timedelta(days=30)
    
    print(f"Fetching data from {fetch_data_from} to {fetch_data_to}")
    
    # Step 2: Get feature store data
    feature_store = get_feature_store()
    feature_view = feature_store.get_feature_view(
        name=config.FEATURE_VIEW_NAME,
        version=config.FEATURE_VIEW_VERSION
    )
    
    ts_data = feature_view.get_batch_data(
        start_time=fetch_data_from,
        end_time=fetch_data_to
    )
    
    ts_data['pickup_hour'] = ts_data['pickup_hour'].dt.tz_localize(None)
    ts_data = ts_data.sort_values(['pickup_location_id', 'pickup_hour'])
    
    print(f"Data loaded: {len(ts_data)} records")
    
    # Step 3: Generate initial features
    from src.data_utils import transform_ts_data_info_features_and_target  # or _loop if needed
    features = transform_ts_data_info_features(
        ts_data,
        window_size=504,  # 21 days
        step_size=24
    )
    
    # Step 4: Add all required time lag columns
    max_lag = 672  # Maximum required lag
    current_lags = set(col for col in features.columns if col.startswith('rides_t-'))
    
    # Add missing time lag columns with zeros
    for lag in range(1, max_lag + 1):
        col_name = f'rides_t-{lag}'
        if col_name not in current_lags:
            features[col_name] = 0
    
    # Step 5: Load and prepare model
    model = load_model_from_registry()
    
    # Modify model parameters to handle shape mismatch
    if isinstance(model, lgb.Booster):
        model.params['predict_disable_shape_check'] = True
    elif hasattr(model, 'steps') and isinstance(model.steps[-1][1], lgb.LGBMRegressor):
        model.steps[-1][1].set_params(predict_disable_shape_check=True)
    
    # Step 6: Generate predictions
    predictions = get_model_predictions(model, features)
    
    if predictions is not None and not predictions.empty:
        # Add timestamp and save predictions
        predictions['pickup_hour'] = current_date.ceil('h')
        
        feature_group = feature_store.get_or_create_feature_group(
            name=config.FEATURE_GROUP_MODEL_PREDICTION,
            version=1,
            description="Predictions from LGBM Model",
            primary_key=['pickup_location_id', 'pickup_hour'],
            event_time='pickup_hour',
        )
        
        feature_group.insert(predictions, write_options={"wait_for_job": False})
        
        print(f"\nSaved {len(predictions)} predictions to feature store")
        print("\nTop 10 locations by predicted demand:")
        print(predictions.sort_values('predicted_demand', ascending=False)[
            ['pickup_location_id', 'predicted_demand']
        ].head(10))

except Exception as e:
    print(f"Error: {str(e)}")
    print("\nDebug Info:")
    if 'features' in locals():
        print(f"Features shape: {features.shape}")
        print(f"Number of time lag columns: {len([c for c in features.columns if c.startswith('rides_t-')])}")
    
# Display predictions
predictions if 'predictions' in locals() else None

Fetching data from 2025-02-02 16:24:19.231284+00:00 to 2025-03-04 16:24:19.231284+00:00
2025-03-04 12:24:19,231 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-03-04 12:24:19,241 INFO: Initializing external client
2025-03-04 12:24:19,241 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-03-04 12:24:19,831 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1215673
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (9.99s) 
Data loaded: 171935 records
Error: name 'transform_ts_data_info_features' is not defined

Debug Info:
Features shape: (251, 675)
Number of time lag columns: 672


Unnamed: 0,pickup_location_id,predicted_demand,pickup_hour
0,79,162.0,2025-03-04 18:00:00+00:00
1,87,29.0,2025-03-04 18:00:00+00:00
2,70,7.0,2025-03-04 18:00:00+00:00
3,143,54.0,2025-03-04 18:00:00+00:00
4,96,0.0,2025-03-04 18:00:00+00:00
...,...,...,...
246,253,0.0,2025-03-04 18:00:00+00:00
247,83,0.0,2025-03-04 18:00:00+00:00
248,207,0.0,2025-03-04 18:00:00+00:00
249,35,1.0,2025-03-04 18:00:00+00:00


In [52]:
predictions["pickup_hour"] = current_date.ceil('h')
predictions

Unnamed: 0,pickup_location_id,predicted_demand,pickup_hour
0,79,162.0,2025-03-04 18:00:00+00:00
1,87,29.0,2025-03-04 18:00:00+00:00
2,70,7.0,2025-03-04 18:00:00+00:00
3,143,54.0,2025-03-04 18:00:00+00:00
4,96,0.0,2025-03-04 18:00:00+00:00
...,...,...,...
246,253,0.0,2025-03-04 18:00:00+00:00
247,83,0.0,2025-03-04 18:00:00+00:00
248,207,0.0,2025-03-04 18:00:00+00:00
249,35,1.0,2025-03-04 18:00:00+00:00


In [53]:
from src.inference import get_feature_store

feature_group = get_feature_store().get_or_create_feature_group(
    name=config.FEATURE_GROUP_MODEL_PREDICTION,
    version=1,
    description="Predictions from LGBM Model",
    primary_key=["pickup_location_id", "pickup_hour"],
    event_time="pickup_hour",
)

2025-03-04 12:24:37,334 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-03-04 12:24:37,349 INFO: Initializing external client
2025-03-04 12:24:37,351 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-03-04 12:24:37,870 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1215673


In [54]:
feature_group.insert(predictions, write_options={"wait_for_job": False})

Uploading Dataframe: 100.00% |██████████| Rows 251/251 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: taxi_hourly_model_prediction_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1215673/jobs/named/taxi_hourly_model_prediction_1_offline_fg_materialization/executions


(Job('taxi_hourly_model_prediction_1_offline_fg_materialization', 'SPARK'),
 None)