In [1]:
# Reload edited project modules (e.g. src/*.py) automatically before each cell runs.
%load_ext autoreload
%autoreload 2

In [2]:
from datetime import datetime, timedelta
import pandas as pd

# Hour we are predicting for: "now" in UTC, truncated to the start of the hour.
# pd.Timestamp.now(tz='UTC') replaces the deprecated (and naive) datetime.utcnow(),
# and 'h' is the hourly alias ('H' is deprecated since pandas 2.2).
current_date = pd.Timestamp.now(tz='UTC').floor('h')
current_date

current_date=Timestamp('2025-08-10 21:00:00+0000', tz='UTC')


Timestamp('2025-08-10 21:00:00+0000', tz='UTC')

In [4]:
# Robust feature loading: read the feature group directly, falling back to the feature view on failure
import pandas as pd
from datetime import datetime, timedelta
import hopsworks
import src.config as config

def load_features_robust(current_date):
    """Load the hourly time-series rows needed to predict demand at ``current_date``.

    Logs in to Hopsworks with the credentials in ``src/config.py`` and reads the
    feature group ``config.FEATURE_GROUP_NAME`` directly. It keeps only the rows
    whose ``pickup_hour`` falls in ``[current_date - 28 days, current_date - 1 hour]``,
    which is the same window that ``load_via_feature_view`` requests. If the
    direct read fails, ``load_via_feature_view(fs, current_date)`` is tried instead.

    Args:
        current_date: Timestamp of the hour to predict. Naive values are
            interpreted as UTC.

    Returns:
        pandas.DataFrame with the rows inside the window (index reset), or
        whatever ``load_via_feature_view`` returns on the fallback path.

    Raises:
        Exception: re-raised from the Hopsworks login / feature-group lookup,
            or from ``load_via_feature_view`` when the fallback also fails.
    """
    print(f"Loading features for: {current_date}")

    fs = _connect_feature_store()

    try:
        fg = fs.get_feature_group(
            name=config.FEATURE_GROUP_NAME,
            version=config.FEATURE_GROUP_VERSION
        )
        print(f"✅ Feature group found: {fg.name}")
        print(f"📊 Schema: {[f.name for f in fg.features][:10]}...")  # first 10 columns
    except Exception as fg_error:
        print(f"❌ Feature group error: {str(fg_error)}")
        raise

    try:
        return _read_feature_group_window(fg, current_date)
    except Exception as read_error:
        # For example, the offline table may not be materialised yet (see the
        # missing hoodie.properties error in this notebook's output).
        print(f"⚠️ Direct read failed: {str(read_error)}")
        # Errors raised by the fallback propagate unchanged, so they are not
        # mislabelled as feature-group or connection errors.
        return load_via_feature_view(fs, current_date)


def _connect_feature_store():
    """Log in to Hopsworks using ``src/config.py`` and return the project's feature store."""
    try:
        # Credentials come from config.py, not from environment variables.
        project = hopsworks.login(
            project=config.HOPSWORKS_PROJECT_NAME,
            api_key_value=config.HOPSWORKS_API_KEY
        )
        return project.get_feature_store()
    except Exception as e:
        print(f"❌ Connection error: {str(e)}")
        raise


def _read_feature_group_window(fg, current_date):
    """Read ``fg`` and keep the rows whose pickup_hour lies in the 28-day window before ``current_date``."""
    data = fg.read()
    print(f"✅ Read {len(data)} rows directly from feature group")
    print(f"Columns: {list(data.columns)}")

    current_date = pd.to_datetime(current_date, utc=True)
    window_start = current_date - timedelta(days=28)
    window_end = current_date - timedelta(hours=1)

    # Compare in UTC so that tz-aware and naive pickup_hour columns both work
    # (naive values are assumed to be UTC — confirm against the pipeline).
    pickup_hour = pd.to_datetime(data['pickup_hour'], utc=True)
    data = data[pickup_hour.between(window_start, window_end)].reset_index(drop=True)

    print(f"Kept {len(data)} rows between {window_start} and {window_end}")
    if len(data) > 0:
        print(f"Date range: {data['pickup_hour'].min()} to {data['pickup_hour'].max()}")
    else:
        print("⚠️ No rows inside the requested window")
    return data

def load_via_feature_view(fs, current_date):
    """Fallback loader: read the 28-day window before ``current_date`` through the feature view.

    Requests ``[current_date - 28 days, current_date - 1 hour]`` from
    ``fv.get_batch_data``, padded by one day on each side, and then trims the
    result back to the exact window. Each entry of ``configs`` is tried in turn
    as ``read_options`` until one succeeds.

    Args:
        fs: Hopsworks feature store handle (from ``project.get_feature_store()``).
        current_date: Timestamp of the hour to predict; naive values are treated as UTC.

    Returns:
        pandas.DataFrame with the rows whose ``pickup_hour`` lies in the window
        (index reset).

    Raises:
        Exception: "All feature view configs failed" when every attempt fails.
            The last underlying error is attached as ``__cause__``.
    """
    print("🔄 Trying feature view approach...")

    fv = fs.get_feature_view(
        name=config.FEATURE_VIEW_NAME,
        version=config.FEATURE_VIEW_VERSION
    )

    current_date = pd.to_datetime(current_date, utc=True)
    fetch_data_from = current_date - timedelta(days=28)
    fetch_data_to = current_date - timedelta(hours=1)

    print(f"Fetching from {fetch_data_from} to {fetch_data_to}")

    # `_batch_scoring_server` is a private attribute; read it defensively so a
    # missing or None server does not raise AttributeError.
    scoring_server = getattr(fv, '_batch_scoring_server', None)
    if scoring_server is not None and not getattr(scoring_server, '_serving_initialized', True):
        print("Initializing batch scoring...")
        fv.init_batch_scoring()

    # Read options to try, in order.
    configs = [
        {"arrow_flight_config": {"timeout": 300}},
        {"arrow_flight_config": {"timeout": 180}},
        {}
    ]

    last_error = None
    for i, read_options in enumerate(configs, start=1):
        try:
            print(f"Trying config {i}/{len(configs)}...")
            ts_data = fv.get_batch_data(
                start_time=fetch_data_from - timedelta(days=1),
                end_time=fetch_data_to + timedelta(days=1),
                read_options=read_options
            )
        except Exception as e:
            print(f"⚠️ Config {i} failed: {str(e)}")
            last_error = e
            continue

        print(f"✅ Success with config {i}: {len(ts_data)} rows")
        # The request was padded by a day on each side; trim back to the window.
        pickup_hour = pd.to_datetime(ts_data['pickup_hour'], utc=True)
        return ts_data[pickup_hour.between(fetch_data_from, fetch_data_to)].reset_index(drop=True)

    # Chain the last failure so the root cause survives in the traceback.
    raise Exception("All feature view configs failed") from last_error

# Run the loader. On failure, print troubleshooting hints and re-raise so that
# "Run All" stops here instead of continuing without features.
try:
    features = load_features_robust(current_date)
    print("\n🎯 FEATURES LOADED SUCCESSFULLY!")
    print(f"Shape: {features.shape}")
    print(f"Columns: {list(features.columns)[:10]}...")  # only the first 10 columns
except Exception as load_error:
    print(f"\n❌ FAILED TO LOAD FEATURES: {str(load_error)}")
    print("\n💡 NEXT STEPS:")
    print("\n".join([
        "1. Run the feature pipeline first: notebooks/12_feature_pipeline.ipynb",
        "2. Check if data exists in Hopsworks UI",
        "3. Verify your feature group schema matches your data",
    ]))
    raise

Loading features for: 2025-08-10 21:00:00+00:00
2025-08-10 23:08:13,662 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-08-10 23:08:13,666 INFO: Initializing external client
2025-08-10 23:08:13,667 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-08-10 23:08:14,763 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/100501
✅ Feature group found: time_series_hourly_feature_group
📊 Schema: ['pickup_hour', 'rides', 'pickup_location_id']...
2025-08-10 23:08:34,972 ERROR: [Errno 2] Opening HDFS file '/apps/hive/warehouse/bike_sharing_demand_featurestore.db/time_series_hourly_feature_group_1/.hoodie/hoodie.properties' failed. Detail: [errno 2] No such file or directory. Detail: Python exception: Traceback (most recent call last):
  File "/usr/src/app/src/server.py", line 142, in wrapper
    result = func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/src/app/src/server.py", line 166,

Exception: All feature view configs failed

In [None]:
# Summarise the loaded features: shape, columns, a preview, and the time and
# location coverage when those columns are present.
if 'features' not in locals():
    print("❌ Features not loaded")
else:
    print("📊 FEATURES SUMMARY:")
    print(f"Shape: {features.shape}")
    print(f"Columns: {list(features.columns)}")
    print("\nFirst few rows:")
    display(features.head())

    print("\nDate info:")
    if 'pickup_hour' in features.columns:
        print(f"Date range: {features['pickup_hour'].min()} to {features['pickup_hour'].max()}")
        # Counts distinct pickup_hour values (hours, not calendar dates).
        print(f"Unique dates: {features['pickup_hour'].nunique()}")

    if 'pickup_location_id' in features.columns:
        print(f"Unique locations: {features['pickup_location_id'].nunique()}")

In [None]:
# Load the production model and predict demand for `current_date`
# (only when features are available).
#
# The next cell writes `predictions` to the feature store. A model failure must
# therefore NOT silently produce fake predictions. Set this flag to True only
# for local testing, when placeholder predictions are acceptable.
ALLOW_DUMMY_PREDICTIONS = False

if 'features' in locals() and len(features) > 0:
    try:
        from src.model_registry_api import get_latest_model_from_registry
        from src.inference import get_model_predictions

        print("🤖 Loading model...")
        model = get_latest_model_from_registry(
            model_name='bike_demand_predictor_next_hour',
            status='Production'
        )

        print("🔮 Making predictions...")
        predictions = get_model_predictions(model, features)
        predictions['pickup_hour'] = current_date

        print(f"✅ Generated {len(predictions)} predictions")
        display(predictions.head())

    except Exception as e:
        print(f"❌ Model prediction failed: {str(e)}")
        if not ALLOW_DUMMY_PREDICTIONS:
            raise
        print("🔧 Creating dummy predictions for testing...")
        # One row per location: `features` holds many hourly rows per location,
        # and (pickup_location_id, pickup_hour) is the predictions primary key.
        locations = features['pickup_location_id'].drop_duplicates().to_numpy()
        predictions = pd.DataFrame({
            'pickup_location_id': locations,
            'predicted_demand': 1.0,  # placeholder value, not a model output
            'pickup_hour': current_date,
        })
        print(f"Created {len(predictions)} dummy predictions")
else:
    print("❌ Cannot make predictions without features")

In [None]:
# Persist the predictions from the previous cell to the predictions feature
# group. This is best-effort: failures are reported but not re-raised.
if 'predictions' not in locals():
    print("❌ No predictions to save")
else:
    try:
        from src.feature_store_api import get_feature_store
        import src.config as config

        print("💾 Saving predictions to feature store...")

        feature_group = get_feature_store().get_or_create_feature_group(
            name=config.FEATURE_GROUP_MODEL_PREDICTIONS,
            version=1,
            description="Predictions generated by our production model",
            primary_key=['pickup_location_id', 'pickup_hour'],
            event_time='pickup_hour',
        )

        # insert() is unpacked into two values; only the first (the job) is used.
        # wait_for_job=False returns without waiting for the ingestion job.
        job, _ = feature_group.insert(predictions, write_options={"wait_for_job": False})

        print("✅ Predictions saved successfully!")
        print(f"Job URL: {getattr(job, 'get_url', lambda: 'N/A')()}")

    except Exception as save_error:
        print(f"❌ Failed to save predictions: {str(save_error)}")
        print("⚠️ This might be OK for testing purposes")