# AMATO Production - Forecasting Batch Inference Pipeline

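This notebook runs batch inference for revenue and click-through-rate (CTR) forecasting: it loads the trained models, scores the recent customer dataset, and uploads the predictions, summary reports, and charts to S3.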

**Author:** Data Science Team  
**Date:** 2024

In [4]:
# Import required libraries
import pandas as pd
import numpy as np
import yaml
import logging
import os
import sys
import joblib
from pathlib import Path
from datetime import datetime, timedelta
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Add project root to path for imports.
# The notebook may be started from the project root or from a nested
# directory, so probe the working directory and its ancestors for the
# `utils` package.
possible_paths = [
    Path.cwd(),  # Current working directory
    Path.cwd().parent,  # Parent of current directory
    Path.cwd().parent.parent,  # Grandparent of current directory
]
if '__file__' in globals():
    # `__file__` only exists when this code runs as a script, not in a notebook kernel.
    possible_paths.append(Path(__file__).parent.parent.parent)

# The first candidate that contains a `utils` folder wins; otherwise fall
# back to the current directory and let the import below report the problem.
project_root = next(
    (path for path in possible_paths if (path / 'utils').exists()),
    Path.cwd(),
)

# Guard the append so that re-running this cell does not keep growing sys.path.
if str(project_root) not in sys.path:
    sys.path.append(str(project_root))
print(f"🔧 Using project root: {project_root}")

try:
    from utils.s3_utils import get_s3_manager
    print("✅ Successfully imported utils.s3_utils")
except ImportError as e:
    print(f"❌ Failed to import utils.s3_utils: {e}")
    print("🔧 Trying alternative import...")
    try:
        # Retry with the kernel's working directory on the path
        if '.' not in sys.path:
            sys.path.append('.')
        from utils.s3_utils import get_s3_manager
        print("✅ Successfully imported with relative path")
    except ImportError as e2:
        print(f"❌ Alternative import also failed: {e2}")
        # Nothing below can run without the S3 manager, so surface the error.
        raise

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

🔧 Using project root: /Users/priyankmavani/Desktop/apps/amato
✅ Successfully imported utils.s3_utils


## Forecasting Batch Inference Class

In [5]:
class ForecastingBatchInference:
    """Batch-inference pipeline for the revenue and CTR forecasting models.

    Workflow (see ``run_batch_inference``): load the pickled models and their
    optional scalers, load the recent customer dataset, build one feature
    matrix per model, predict, then upload the predictions, a YAML report and
    two Plotly charts through the manager returned by ``get_s3_manager()``.

    Attributes:
        models: model name ('revenue' / 'ctr') -> loaded estimator.
        scalers: model name -> loaded scaler, or None when no scaler file exists.
        metadata: model name -> descriptive dict assembled at load time.
    """

    # Local cache directory for model artifacts and the S3 prefix they are fetched from.
    MODELS_DIR = 'models/forecasting'
    S3_MODELS_PREFIX = 'amato_pm/models/forecasting'
    # Key prefix under which predictions, reports and charts are uploaded.
    RESULTS_PREFIX = 'models/forecasting/inference_results'
    # Parquet file read by load_inference_data when no path is supplied.
    DEFAULT_DATA_PATH = 'data_pipelines/unified_dataset/output/recent_customer_dataset.parquet'
    # Labels for the five equal-width prediction buckets, lowest to highest.
    CATEGORY_LABELS = ['Very Low', 'Low', 'Medium', 'High', 'Very High']

    # Per-model wording used in log messages / chart titles, plus the feature
    # list recorded in `metadata`.
    # NOTE(review): these metadata feature lists differ from the columns that
    # prepare_features() actually selects; confirm which set the models were
    # trained on.
    _MODEL_SPECS = {
        'revenue': {
            'title': 'Revenue',
            'inline': 'revenue',
            'metadata_features': [
                'rfm_score', 'frequency', 'avg_order_value', 'days_since_last_purchase',
                'day_of_week', 'month', 'quarter', 'year', 'revenue_lag_1',
                'revenue_lag_7', 'revenue_lag_30', 'revenue_ma_7', 'revenue_ma_30',
                'customer_age_days'
            ],
        },
        'ctr': {
            'title': 'CTR',
            'inline': 'CTR',
            'metadata_features': [
                'rfm_score', 'frequency', 'avg_order_value', 'day_of_week',
                'month', 'quarter', 'year', 'ctr_lag_1', 'ctr_ma_7',
                'customer_age_days'
            ],
        },
    }

    # Columns selected from the inference dataset; both targets use the same set.
    _CUSTOMER_FEATURES = [
        'recency_days', 'frequency', 'monetary_value',
        'avg_order_value', 'total_orders', 'days_since_first_order',
        'customer_lifetime_value', 'avg_days_between_orders',
        'order_count_30d', 'order_count_90d', 'order_count_365d',
        'revenue_30d', 'revenue_90d', 'revenue_365d'
    ]
    _INFERENCE_FEATURES = {
        'revenue': _CUSTOMER_FEATURES,
        'ctr': _CUSTOMER_FEATURES,
    }

    def __init__(self):
        """Initialize the Forecasting Batch Inference Pipeline"""
        self.models = {}
        self.scalers = {}
        self.metadata = {}

    # ------------------------------------------------------------------
    # Model loading
    # ------------------------------------------------------------------
    @staticmethod
    def _download_if_missing(s3_manager, s3_key, local_path, description):
        """Fetch one artifact from S3 unless it is already cached locally.

        Download errors are logged and swallowed on purpose: the caller
        decides what to do by checking whether the file exists afterwards.
        """
        if os.path.exists(local_path):
            return
        logger.info(f"📥 Downloading {description} from S3...")
        try:
            s3_manager.download_file(s3_key, local_path)
            logger.info(f"✅ Downloaded {description} from S3")
        except Exception as e:
            logger.warning(f"⚠️  Failed to download {description} from S3: {e}")

    def _load_model(self, s3_manager, model_name, spec):
        """Download (if needed) and load one model, its scaler and its metadata."""
        display = f"{spec['title']} Forecasting"
        model_file = f'{model_name}_forecasting_model.pkl'
        scaler_file = f'{model_name}_forecasting_scaler.pkl'
        model_path = f'{self.MODELS_DIR}/{model_file}'
        scaler_path = f'{self.MODELS_DIR}/{scaler_file}'

        self._download_if_missing(
            s3_manager, f'{self.S3_MODELS_PREFIX}/{model_file}', model_path, f'{display} model'
        )
        self._download_if_missing(
            s3_manager, f'{self.S3_MODELS_PREFIX}/{scaler_file}', scaler_path, f'{display} scaler'
        )

        if not os.path.exists(model_path):
            logger.warning(f"⚠️  {display} model not available")
            return

        # SECURITY: joblib.load unpickles the file and can execute arbitrary
        # code; only load artifacts that come from a trusted bucket.
        self.models[model_name] = joblib.load(model_path)
        # The scaler is optional; None means "predict on unscaled features".
        self.scalers[model_name] = joblib.load(scaler_path) if os.path.exists(scaler_path) else None

        self.metadata[model_name] = {
            'feature_columns': list(spec['metadata_features']),
            'model_type': 'RandomForestRegressor',
            # NOTE: this records when the model was loaded, not when it was trained.
            'training_date': datetime.now().isoformat()
        }
        logger.info(f"✅ Loaded {display} model")

    def load_trained_models(self):
        """Load trained forecasting models from S3

        Populates ``self.models``, ``self.scalers`` and ``self.metadata`` for
        every model whose pickle is available locally or can be downloaded.

        Raises:
            RuntimeError: if no model at all could be loaded.
            Exception: any other error from the S3 manager or from unpickling
                is logged and re-raised.
        """
        logger.info("📥 Loading trained forecasting models...")

        try:
            s3_manager = get_s3_manager()

            # Create models directory if it doesn't exist
            os.makedirs(self.MODELS_DIR, exist_ok=True)

            for model_name, spec in self._MODEL_SPECS.items():
                self._load_model(s3_manager, model_name, spec)

            logger.info(f"✅ Loaded {len(self.models)} models")

            if not self.models:
                logger.error("❌ No models loaded. Please ensure models are available in S3.")
                raise RuntimeError("No models available for inference")

        except Exception as e:
            logger.error(f"❌ Failed to load models: {e}")
            raise

    # ------------------------------------------------------------------
    # Data loading and feature preparation
    # ------------------------------------------------------------------
    def load_inference_data(self, data_path=None):
        """Load recent inference data for forecasting

        Asks the S3 manager to sync the recent inference data, then reads the
        parquet file at ``data_path``.

        Args:
            data_path: parquet file to read; defaults to ``DEFAULT_DATA_PATH``.

        Returns:
            The loaded DataFrame, or None when the file is missing or any step
            fails (the error is logged, not raised).
        """
        logger.info("📊 Loading recent inference data...")

        try:
            # Load recent inference data from S3
            logger.info("🔍 Loading recent inference data from S3...")
            s3_manager = get_s3_manager()
            s3_manager.load_inference_data_from_s3()
            logger.info("✅ Recent inference data loaded from S3")

            if data_path is None:
                data_path = self.DEFAULT_DATA_PATH

            if not os.path.exists(data_path):
                logger.error(f"❌ Recent inference data not found at {data_path}")
                return None

            df = pd.read_parquet(data_path)
            logger.info(f"✅ Loaded recent inference data: {len(df)} customers")
            return df

        except Exception as e:
            logger.error(f"❌ Failed to load recent inference data: {e}")
            return None

    def prepare_features(self, df, target_col):
        """Prepare features for forecasting inference

        Selects the expected feature columns that exist in ``df``, fills
        missing values with the column median and clips every column to
        ``[Q1 - 1.5*IQR, Q3 + 1.5*IQR]``.

        Args:
            df: customer-level DataFrame.
            target_col: 'revenue' or 'ctr'.

        Returns:
            A new DataFrame of prepared features, or None when ``target_col``
            is unknown or none of the expected columns are present (the
            caller treats None as "skip this model").
        """
        logger.info(f"🔧 Preparing features for {target_col} inference...")

        feature_columns = self._INFERENCE_FEATURES.get(target_col)
        if feature_columns is None:
            logger.error(f"❌ Unknown target column: {target_col}")
            return None

        # Filter available features, and say which ones are absent: the fitted
        # model expects a fixed set of columns, so a silent drop would only
        # surface later as an opaque shape error.
        available_features = [col for col in feature_columns if col in df.columns]
        missing_features = [col for col in feature_columns if col not in df.columns]
        if missing_features:
            logger.warning(
                f"⚠️  {len(missing_features)} expected features missing for {target_col}: {missing_features}"
            )

        if not available_features:
            # A zero-column frame still has len(df) rows and would slip past
            # the caller's emptiness check, then fail inside predict().
            logger.error(f"❌ None of the expected features are available for {target_col}")
            return None

        if len(available_features) < 5:
            logger.warning(f"⚠️  Only {len(available_features)} features available for {target_col}")

        X = df[available_features].copy()

        # Handle missing values
        X = X.fillna(X.median())

        # A column that is entirely NaN has a NaN median, so the fill above
        # leaves it untouched; use 0 so the estimator does not receive NaN.
        unfilled = [col for col in X.columns if X[col].isna().any()]
        if unfilled:
            logger.warning(f"⚠️  Columns with no usable values filled with 0 for {target_col}: {unfilled}")
            X[unfilled] = X[unfilled].fillna(0)

        # Cap outliers using the IQR method (values are clipped, rows are kept).
        # NOTE(review): the bounds come from the inference batch itself, not
        # from the training data; confirm this matches the training pipeline.
        for col in X.columns:
            Q1 = X[col].quantile(0.25)
            Q3 = X[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            X[col] = X[col].clip(lower_bound, upper_bound)

        logger.info(f"✅ Prepared {len(X)} customers with {len(X.columns)} features for {target_col}")
        return X

    # ------------------------------------------------------------------
    # Inference
    # ------------------------------------------------------------------
    def _perform_forecasting(self, df_features, model_name):
        """Shared prediction routine behind the two public ``perform_*`` methods.

        Returns a copy of ``df_features`` with ``predicted_<model_name>`` and
        ``<model_name>_category`` columns appended, or None when the model is
        not loaded.
        """
        spec = self._MODEL_SPECS[model_name]
        logger.info(f"🎯 Performing {spec['inline']} forecasting inference...")

        if model_name not in self.models:
            logger.error(f"❌ {spec['title']} Forecasting model not loaded")
            return None

        # Scale features if a scaler exists; .get() tolerates a model that was
        # registered without any scaler entry.
        scaler = self.scalers.get(model_name)
        X_scaled = scaler.transform(df_features) if scaler is not None else df_features

        predictions = self.models[model_name].predict(X_scaled)

        results = df_features.copy()
        results[f'predicted_{model_name}'] = predictions
        # Five equal-width buckets spanning this batch's min..max, so the
        # labels are relative to the batch rather than absolute thresholds.
        results[f'{model_name}_category'] = pd.cut(
            predictions, bins=5, labels=self.CATEGORY_LABELS
        )

        logger.info(f"✅ {spec['title']} forecasting completed: {len(results)} predictions")
        return results

    def perform_revenue_forecasting(self, df_features):
        """Perform revenue forecasting inference

        Returns ``df_features`` plus ``predicted_revenue`` and
        ``revenue_category`` columns, or None if the model is not loaded.
        """
        return self._perform_forecasting(df_features, 'revenue')

    def perform_ctr_forecasting(self, df_features):
        """Perform CTR forecasting inference

        Returns ``df_features`` plus ``predicted_ctr`` and ``ctr_category``
        columns, or None if the model is not loaded.
        """
        return self._perform_forecasting(df_features, 'ctr')

    # ------------------------------------------------------------------
    # Persistence and reporting
    # ------------------------------------------------------------------
    def save_inference_results(self, results, model_name):
        """Save inference results directly to S3

        Uploads the predictions as parquet and the report from
        ``generate_inference_report`` as YAML.

        Returns:
            ``(results_key, report_key)`` when both uploads report success,
            otherwise ``(None, None)``. Errors are logged, not raised.
        """
        logger.info(f"💾 Saving {model_name} inference results...")

        try:
            s3_manager = get_s3_manager()

            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            results_key = f'{self.RESULTS_PREFIX}/{model_name}_inference_results_{timestamp}.parquet'
            report_key = f'{self.RESULTS_PREFIX}/{model_name}_inference_report_{timestamp}.yaml'

            # to_parquet() without a path returns the file content as bytes
            results_bytes = results.to_parquet(index=False)
            results_success = s3_manager.upload_bytes_direct(
                results_bytes, results_key, 'application/octet-stream'
            )

            # Generate and save report
            report = self.generate_inference_report(results, model_name)
            report_success = s3_manager.upload_bytes_direct(
                yaml.dump(report, default_flow_style=False).encode('utf-8'),
                report_key, 'text/yaml'
            )

            if results_success and report_success:
                logger.info(f"✅ {model_name} results uploaded directly to S3")
                return results_key, report_key

            logger.warning(f"⚠️  Some {model_name} results failed to upload to S3")
            return None, None

        except Exception as e:
            logger.error(f"❌ Failed to save {model_name} results: {e}")
            return None, None

    def generate_inference_report(self, results, model_name):
        """Generate inference report

        Args:
            results: DataFrame produced by one of the ``perform_*`` methods.
            model_name: 'revenue' or 'ctr'; any other name yields a report
                without prediction statistics.

        Returns:
            A dict of plain Python values (safe to dump as YAML) with the
            model name, timestamp, row count, prediction statistics, category
            distribution and a column-type summary.
        """
        logger.info(f"📋 Generating {model_name} inference report...")

        report = {
            'model_name': model_name,
            'inference_date': datetime.now().isoformat(),
            'total_customers': len(results),
        }

        if model_name in self._MODEL_SPECS:
            predictions = results[f'predicted_{model_name}']
            report[f'predicted_{model_name}_stats'] = {
                'mean': float(predictions.mean()),
                'median': float(predictions.median()),
                'std': float(predictions.std())
            }
            # Force native str/int so the YAML never carries pandas or numpy
            # scalar objects.
            category_counts = results[f'{model_name}_category'].value_counts()
            report[f'{model_name}_category_distribution'] = {
                str(label): int(count) for label, count in category_counts.items()
            }

        report['feature_summary'] = {
            'total_features': len(results.columns),
            'numeric_features': len(results.select_dtypes(include=[np.number]).columns),
            # pd.cut yields a `category` dtype column, which an object-only
            # filter would not count.
            'categorical_features': len(results.select_dtypes(include=['object', 'category']).columns)
        }

        return report

    def create_inference_visualizations(self, results, model_name):
        """Create inference visualizations and upload directly to S3

        Builds a histogram of the predictions and a pie chart of the
        prediction categories, and uploads both as HTML. Failures are logged,
        never raised; nothing is returned.
        """
        logger.info(f"📊 Creating {model_name} inference visualizations...")

        spec = self._MODEL_SPECS.get(model_name)
        if spec is None:
            logger.warning(f"⚠️  No visualizations defined for model: {model_name}")
            return

        try:
            s3_manager = get_s3_manager()
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

            metric = spec['title']
            title_prefix = model_name.replace("_", " ").title()
            prediction_col = f'predicted_{model_name}'

            # Predicted value distribution
            fig1 = px.histogram(
                results, x=prediction_col,
                title=f'{title_prefix} Predicted {metric} Distribution',
                labels={prediction_col: f'Predicted {metric}', 'count': 'Customer Count'}
            )

            # Category distribution
            category_counts = results[f'{model_name}_category'].value_counts()
            fig2 = px.pie(
                values=category_counts.values,
                names=category_counts.index,
                title=f'{title_prefix} {metric} Category Distribution'
            )

            html1_key = f'{self.RESULTS_PREFIX}/{model_name}_{model_name}_distribution_{timestamp}.html'
            html2_key = f'{self.RESULTS_PREFIX}/{model_name}_{model_name}_categories_{timestamp}.html'

            # Convert figures to HTML and upload
            html1_bytes = fig1.to_html().encode('utf-8')
            html2_bytes = fig2.to_html().encode('utf-8')

            # The return value is read as a success flag, the same way
            # save_inference_results reads it.
            upload1_ok = s3_manager.upload_bytes_direct(html1_bytes, html1_key, 'text/html')
            upload2_ok = s3_manager.upload_bytes_direct(html2_bytes, html2_key, 'text/html')

            if upload1_ok and upload2_ok:
                logger.info(f"✅ {model_name} visualizations uploaded directly to S3")
            else:
                logger.warning(f"⚠️  Some {model_name} visualizations failed to upload to S3")

        except Exception as e:
            logger.error(f"❌ Failed to create {model_name} visualizations: {e}")

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------
    def run_batch_inference(self, data_path=None, models=None):
        """Run batch inference for all models

        Args:
            data_path: optional parquet path passed to ``load_inference_data``.
            models: model names to run (a list, or a single name); defaults
                to every model that was loaded.

        Returns:
            Dict mapping model name -> results DataFrame for every model that
            produced predictions.

        Raises:
            Exception: anything raised while loading models or data or while
                predicting is logged and re-raised.
        """
        logger.info("🚀 Starting Forecasting Batch Inference...")

        try:
            # Load models
            self.load_trained_models()

            # Load data
            df = self.load_inference_data(data_path)
            if df is None:
                raise RuntimeError("Failed to load inference data")

            # Determine which models to run
            if models is None:
                models = list(self.models.keys())
            elif isinstance(models, str):
                # A bare string would otherwise be iterated character by character.
                models = [models]

            forecasters = {
                'revenue': self.perform_revenue_forecasting,
                'ctr': self.perform_ctr_forecasting,
            }
            all_results = {}

            for model_name in models:
                if model_name not in self.models:
                    logger.warning(f"⚠️ Model {model_name} not found, skipping...")
                    continue

                # Prepare features
                df_features = self.prepare_features(df, model_name)

                if df_features is None or len(df_features) == 0:
                    logger.warning(f"⚠️  No features prepared for {model_name}, skipping...")
                    continue

                # Perform inference
                forecaster = forecasters.get(model_name)
                if forecaster is None:
                    logger.warning(f"⚠️  Unknown model: {model_name}")
                    continue
                results = forecaster(df_features)

                if results is not None:
                    # Saving and plotting log their own failures and never
                    # raise, so the predictions are kept either way.
                    self.save_inference_results(results, model_name)
                    self.create_inference_visualizations(results, model_name)

                    all_results[model_name] = results

                    logger.info(f"✅ {model_name} batch inference completed")

            logger.info("=" * 60)
            logger.info("🎉 BATCH INFERENCE COMPLETED!")
            logger.info("=" * 60)
            logger.info(f"📊 Processed {len(df)} customers")
            logger.info(f"🎯 Ran inference for {len(all_results)} models")

            return all_results

        except Exception as e:
            logger.error(f"❌ Error in batch inference: {e}")
            raise

## Run the Pipeline

In [6]:
# Initialize and run the pipeline.
# In a notebook kernel `__name__` is "__main__", so this cell runs on
# execution but stays inert if the code is imported as a module.
if __name__ == "__main__":
    inference = ForecastingBatchInference()
    results = inference.run_batch_inference()

    # run_batch_inference returns one entry per model that produced
    # predictions; an empty dict means every model was skipped.
    if results:
        print("\n🎉 Forecasting Batch Inference completed successfully!")
        print("📊 Results saved to models/forecasting/inference_results/")
        print("📈 Ready for revenue and CTR forecasting analysis!")
    else:
        print("\n⚠️  Forecasting Batch Inference finished, but no model produced results.")
        print("🔍 Check the log output above for skipped models.")

INFO:__main__:🚀 Starting Forecasting Batch Inference...
INFO:__main__:📥 Loading trained forecasting models...
INFO:__main__:✅ Loaded Revenue Forecasting model
INFO:__main__:✅ Loaded CTR Forecasting model
INFO:__main__:✅ Loaded 2 models
INFO:__main__:📊 Loading recent inference data...
INFO:__main__:🔍 Loading recent inference data from S3...
INFO:utils.s3_utils:Loading recent inference data from S3 (last 3 months)...
INFO:utils.s3_utils:Loading data newer than 2025-06-03
INFO:utils.s3_utils:Downloading s3://nuscale-data-services-public/amato_pm/data_pipelines/unified_dataset/output//unified_customer_dataset.parquet to data_pipelines/unified_dataset/output/unified_customer_dataset.parquet
INFO:utils.s3_utils:Downloading s3://nuscale-data-services-public/amato_pm/data_pipelines/unified_dataset/output//unified_dataset_report.yaml to data_pipelines/unified_dataset/output/unified_dataset_report.yaml
INFO:utils.s3_utils:Downloading s3://nuscale-data-services-public/amato_pm/data_pipelines/unif


🎉 Forecasting Batch Inference completed successfully!
📊 Results saved to models/forecasting/inference_results/
📈 Ready for revenue and CTR forecasting analysis!
