In [1]:
!pip install xgboost statsmodels matplotlib seaborn plotly mlflow boto3
print("Required packages installed")

Collecting mlflow
  Downloading mlflow-2.21.3-py3-none-any.whl.metadata (30 kB)
Collecting boto3
  Downloading boto3-1.37.37-py3-none-any.whl.metadata (6.7 kB)
Collecting mlflow-skinny==2.21.3 (from mlflow)
  Downloading mlflow_skinny-2.21.3-py3-none-any.whl.metadata (31 kB)
Collecting alembic!=1.10.0,<2 (from mlflow)
  Downloading alembic-1.15.2-py3-none-any.whl.metadata (7.3 kB)
Collecting docker<8,>=4.0.0 (from mlflow)
  Downloading docker-7.1.0-py3-none-any.whl.metadata (3.8 kB)
Collecting graphene<4 (from mlflow)
  Downloading graphene-3.4.3-py2.py3-none-any.whl.metadata (6.9 kB)
Collecting gunicorn<24 (from mlflow)
  Downloading gunicorn-23.0.0-py3-none-any.whl.metadata (4.4 kB)
Collecting databricks-sdk<1,>=0.20.0 (from mlflow-skinny==2.21.3->mlflow)
  Downloading databricks_sdk-0.50.0-py3-none-any.whl.metadata (38 kB)
Collecting fastapi<1 (from mlflow-skinny==2.21.3->mlflow)
  Downloading fastapi-0.115.12-py3-none-any.whl.metadata (27 kB)
Collecting uvicorn<1 (from mlflow-skinn

In [2]:
import json
import logging
import os
import pickle
import warnings
from datetime import datetime
from io import BytesIO, StringIO

import boto3
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import xgboost as xgb
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.preprocessing import StandardScaler
from statsmodels.tsa.holtwinters import ExponentialSmoothing

# --- Plot appearance -------------------------------------------------------
plt.style.use('seaborn-v0_8-whitegrid')   # grid-backed base style
sns.set_palette("muted")                  # softer default colour cycle
sns.set_context("talk")                   # larger fonts/lines for presentation

# --- Logging ---------------------------------------------------------------
# Root logging at INFO with timestamped records; `logger` is this notebook's named logger.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# --- Warnings --------------------------------------------------------------
# Silence every warning to keep cell output short.
# NOTE(review): this also hides pandas/sklearn deprecation warnings; consider narrowing the filter.
warnings.filterwarnings("ignore")

print("Libraries imported successfully\nEnvironment setup complete")

Libraries imported successfully
Environment setup complete


In [15]:
class S3Connector:
    """Utility class for connecting to S3 and retrieving files.

    All public methods are best-effort: errors are printed and signalled via
    the return value (``None`` for reads, ``False`` for writes) rather than raised.
    """

    # MIME types for figure formats that are not simply "image/<format>".
    # In particular "jpg" must be sent as "image/jpeg"; "image/jpg" is not a registered type.
    _FIGURE_CONTENT_TYPES = {
        'jpg': 'image/jpeg',
        'jpeg': 'image/jpeg',
        'tif': 'image/tiff',
        'tiff': 'image/tiff',
        'svg': 'image/svg+xml',
        'svgz': 'image/svg+xml',
        'pdf': 'application/pdf',
        'ps': 'application/postscript',
        'eps': 'application/postscript',
    }

    def __init__(self,
                 aws_access_key_id=None,
                 aws_secret_access_key=None,
                 region_name='us-east-1'):
        """
        Initialize S3 client.

        Prefer leaving both keys as None and letting boto3 resolve credentials
        rather than hardcoding secrets in the notebook.

        Args:
            aws_access_key_id (str, optional): AWS access key ID
            aws_secret_access_key (str, optional): AWS secret access key
            region_name (str, optional): AWS region name
        """
        self.s3_client = boto3.client(
            's3',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=region_name
        )
        self.region_name = region_name
        print(f"✅ S3 client initialized for region: {region_name}")

    def read_csv_from_s3(self, bucket_name, key):
        """
        Read CSV file from S3 bucket.

        Args:
            bucket_name (str): S3 bucket name
            key (str): Object key (file path in bucket)

        Returns:
            pandas.DataFrame | None: DataFrame containing CSV data, or None on any error
        """
        try:
            print(f"Reading file from S3: s3://{bucket_name}/{key}")
            response = self.s3_client.get_object(Bucket=bucket_name, Key=key)
            # 'utf-8-sig' decodes plain UTF-8 unchanged but also strips a leading BOM,
            # which would otherwise be glued onto the first column name (e.g. '\ufeffdate').
            csv_content = response['Body'].read().decode('utf-8-sig')
            df = pd.read_csv(StringIO(csv_content))
            print(f"✅ Successfully read CSV from S3 with {len(df):,} rows")
            return df
        except Exception as e:
            print(f"❌ Error reading file from S3: {str(e)}")
            return None

    def save_file_to_s3(self, local_file_path, bucket_name, key):
        """
        Save a local file to S3 bucket.

        Args:
            local_file_path (str): Local file path
            bucket_name (str): S3 bucket name
            key (str): Object key (file path in bucket)

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            print(f"Uploading file to S3: s3://{bucket_name}/{key}")
            self.s3_client.upload_file(local_file_path, bucket_name, key)
            print(f"✅ Successfully uploaded file to S3")
            return True
        except Exception as e:
            print(f"❌ Error uploading file to S3: {str(e)}")
            return False

    def save_data_to_s3(self, data, bucket_name, key, content_type=None):
        """
        Save data directly to S3 without creating a local file first.

        Args:
            data: bytes/bytearray (sent as-is), str (UTF-8 encoded), or any
                JSON-serializable object (sent as JSON; content type defaults
                to 'application/json')
            bucket_name (str): S3 bucket name
            key (str): Object key (file path in bucket)
            content_type (str, optional): Content type of the data

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            print(f"Saving data to S3: s3://{bucket_name}/{key}")

            if isinstance(data, (bytes, bytearray)):
                body = bytes(data)
            elif isinstance(data, str):
                body = data.encode('utf-8')
            else:
                # Anything else (dicts, lists, numbers...) is serialized as JSON.
                try:
                    body = json.dumps(data).encode('utf-8')
                except (TypeError, ValueError):
                    # TypeError: unserializable type; ValueError: e.g. circular reference.
                    print(f"❌ Error: Data type not supported for direct S3 upload")
                    return False
                if not content_type:
                    content_type = 'application/json'

            params = {
                'Bucket': bucket_name,
                'Key': key,
                'Body': body
            }
            if content_type:
                params['ContentType'] = content_type

            self.s3_client.put_object(**params)
            print(f"✅ Successfully saved data to S3")
            return True

        except Exception as e:
            print(f"❌ Error saving data to S3: {str(e)}")
            return False

    def save_pickle_to_s3(self, obj, bucket_name, key):
        """
        Pickle an object and save it directly to S3.

        Unpickling executes code, so only load these objects back from a
        bucket you trust.

        Args:
            obj: Python object to pickle
            bucket_name (str): S3 bucket name
            key (str): Object key (file path in bucket)

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            print(f"Saving pickled object to S3: s3://{bucket_name}/{key}")

            buffer = BytesIO()
            pickle.dump(obj, buffer)
            buffer.seek(0)

            self.s3_client.upload_fileobj(buffer, bucket_name, key)
            print(f"✅ Successfully saved pickled object to S3")
            return True

        except Exception as e:
            print(f"❌ Error pickling object to S3: {str(e)}")
            return False

    def save_figure_to_s3(self, figure, bucket_name, key, dpi=300, format='png'):
        """
        Save a matplotlib figure directly to S3.

        Args:
            figure: Matplotlib figure object
            bucket_name (str): S3 bucket name
            key (str): Object key (file path in bucket)
            dpi (int): DPI for the figure
            format (str): Format to save the figure ('png', 'jpg', etc.)

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            print(f"Saving figure to S3: s3://{bucket_name}/{key}")

            buffer = BytesIO()
            figure.savefig(buffer, format=format, dpi=dpi)
            buffer.seek(0)

            # Map to a registered MIME type; fall back to "image/<format>".
            fmt = str(format).lower()
            content_type = self._FIGURE_CONTENT_TYPES.get(fmt, f"image/{fmt}")

            self.s3_client.upload_fileobj(
                buffer,
                bucket_name,
                key,
                ExtraArgs={'ContentType': content_type}
            )
            print(f"✅ Successfully saved figure to S3")
            return True

        except Exception as e:
            print(f"❌ Error saving figure to S3: {str(e)}")
            return False

    def check_s3_path_exists(self, bucket_name, prefix):
        """
        Check if an S3 "directory" (prefix) contains at least one object.

        Args:
            bucket_name (str): S3 bucket name
            prefix (str): Path prefix to check; a trailing '/' is appended if missing

        Returns:
            bool: True if path exists, False otherwise (including on error)
        """
        try:
            # Treat the prefix as a directory so "data" does not match "data_old/..."
            if prefix and not prefix.endswith('/'):
                prefix += '/'

            response = self.s3_client.list_objects_v2(
                Bucket=bucket_name,
                Prefix=prefix,
                MaxKeys=1
            )

            return 'Contents' in response
        except Exception as e:
            print(f"❌ Error checking S3 path: {str(e)}")
            return False

    def create_s3_directory(self, bucket_name, directory):
        """
        Create a directory structure in S3 (by adding an empty object with trailing slash).

        Args:
            bucket_name (str): S3 bucket name
            directory (str): Directory path to create

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            if not directory.endswith('/'):
                directory += '/'

            # S3 has no real directories; an empty "key/" object acts as a folder marker.
            self.s3_client.put_object(
                Bucket=bucket_name,
                Key=directory,
                Body=''
            )

            print(f"✅ Created S3 directory: s3://{bucket_name}/{directory}")
            return True
        except Exception as e:
            print(f"❌ Error creating S3 directory: {str(e)}")
            return False

In [4]:
class DataUtils:
    """Utility class for data loading and preprocessing operations."""

    # Date column names tried in order; matches the lookup used by DataPreprocessor.
    _DATE_CANDIDATES = ('date', 'date_x', 'Date')

    @staticmethod
    def load_data(s3_connector, bucket_name, key):
        """
        Load a CSV from S3 and print a short overview of it.

        Args:
            s3_connector (S3Connector): S3 connector instance (only
                ``read_csv_from_s3`` is used)
            bucket_name (str): S3 bucket name
            key (str): Object key (file path in bucket)

        Returns:
            pandas.DataFrame | None: Loaded data, or None if the read failed.
            The overview never fails on a missing/differently named date
            column or an empty frame.
        """
        print(f"Loading data from S3: s3://{bucket_name}/{key}")
        data = s3_connector.read_csv_from_s3(bucket_name, key)

        if data is None:
            print(f"❌ Error loading data from S3")
            return None

        print(f"✅ Successfully loaded {len(data):,} records with {len(data.columns)} columns")

        # Display sample data
        print("\n📊 Data Preview:")
        print(data.head())

        # Display data info; column labels may be non-strings, so stringify before joining.
        print("\n📋 Data Information:")
        print(f"Shape: {data.shape}")
        print(f"Columns: {', '.join(map(str, data.columns))}")

        date_col = next((col for col in DataUtils._DATE_CANDIDATES if col in data.columns), None)
        if date_col is not None:
            # Unparseable dates become NaT so the overview never aborts the load.
            dates = pd.to_datetime(data[date_col], errors='coerce')
            print(f"Date range: {dates.min()} to {dates.max()}")
        else:
            print("Date range: no date column found")

        # Check for missing values (guard the share against an empty frame)
        missing_values = int(data.isnull().sum().sum())
        missing_share = missing_values / data.size if data.size else 0.0
        print(f"Missing values: {missing_values:,} ({missing_share:.2%} of all data)")

        return data

    @staticmethod
    def create_output_dirs(output_dir):
        """
        Create output directories for models and plots.

        Existing directories are reused (``exist_ok=True``).

        Args:
            output_dir (str): Base output directory

        Returns:
            tuple: Paths to model registry and plots directories
        """
        print(f"Creating output directories in: {output_dir}")

        # Create main output directory
        os.makedirs(output_dir, exist_ok=True)

        # Create subdirectories
        model_registry_dir = os.path.join(output_dir, "model_registry")
        plots_dir = os.path.join(output_dir, "plots")

        os.makedirs(model_registry_dir, exist_ok=True)
        os.makedirs(plots_dir, exist_ok=True)

        print(f"✅ Created directory structure:")
        print(f"  - 📁 {output_dir} (main)")
        print(f"  - 📁 {model_registry_dir} (models)")
        print(f"  - 📁 {plots_dir} (visualizations)")

        return model_registry_dir, plots_dir


In [5]:
class DataPreprocessor:
    """Class for preprocessing wheat price data."""

    # Column names tried, in order, when locating the raw date column.
    DATE_CANDIDATES = ('date', 'date_x', 'Date')

    def __init__(self):
        """Initialize the preprocessor."""
        # Raw date column picked by the most recent preprocess_retail_prices call.
        self.date_col = None

    def preprocess_retail_prices(self, raw_data):
        """
        Preprocess retail price data.

        Works on a copy: the caller's ``raw_data`` is never modified, so
        re-running the calling cell gives the same result.

        Steps: MSP quintal->kg conversion, retail filter, ``price_per_KG``
        initialisation, dropping rows without a state, date parsing, time
        features, sorting, missing-value imputation and a final wheat filter.

        Args:
            raw_data (pandas.DataFrame): Raw input data

        Returns:
            pandas.DataFrame | None: Preprocessed wheat retail data, or None
            when no date column ('date', 'date_x' or 'Date') is present.
        """
        print("Starting data preprocessing...")

        # Copy first so columns added below never leak into the caller's frame.
        raw_data = raw_data.copy()

        # Convert MSP from quintal (100 kg) to KG
        if 'MSP_Wheat' in raw_data.columns:
            raw_data['MSP_Wheat_KG'] = raw_data['MSP_Wheat'] / 100
            print("Converted MSP from quintal to KG")
        else:
            raw_data['MSP_Wheat_KG'] = 0
            print("MSP_Wheat column not found, using 0 as default")

        # Keep only retail prices if price type available
        if 'pricetype' in raw_data.columns:
            df_retail = raw_data[raw_data['pricetype'].str.lower() == 'retail'].copy()
            print(f"Filtered {len(df_retail):,} retail price records")
        else:
            df_retail = raw_data
            print(f"No price type column found. Using all {len(df_retail):,} records")

        # Initialize price_per_KG column
        if 'price' in df_retail.columns:
            df_retail['price_per_KG'] = df_retail['price']
            print("Using 'price' column as price_per_KG")
        else:
            df_retail['price_per_KG'] = 0
            print("No price column found, using 0 as default")

        # Drop rows with missing or blank state values
        if 'state' in df_retail.columns:
            pre_count = len(df_retail)
            states = df_retail['state']
            # astype(str) keeps .str usable for non-string state labels; NaN is excluded by notna().
            has_state = states.notna() & (states.astype(str).str.strip() != '')
            df_retail = df_retail[has_state].copy()
            dropped = pre_count - len(df_retail)
            dropped_share = dropped / pre_count if pre_count else 0.0
            print(f"Dropped {dropped:,} rows with missing states ({dropped_share:.2%})")
            print(f"Remaining records: {len(df_retail):,}")

        # Convert date to datetime
        self.date_col = self._find_date_column(df_retail)
        if not self.date_col:
            print("❌ No date column found. Cannot proceed with time-based modeling")
            return None

        print(f"Converting '{self.date_col}' to datetime")
        df_retail['date'] = pd.to_datetime(df_retail[self.date_col])

        # Extract time features
        print("Extracting time features...")
        df_retail = self._add_time_features(df_retail)

        # Sort so forward-fills below run in time order within each state.
        sort_keys = [col for col in ('state', 'date') if col in df_retail.columns]
        df_retail = df_retail.sort_values(sort_keys)
        print(f"Sorted data by {' and '.join(sort_keys)}")

        # Handle missing values
        print("Handling missing values...")
        df_retail = self._handle_missing_values(df_retail)

        # Filter for wheat only if commodity column exists
        if 'commodity' in df_retail.columns:
            pre_count = len(df_retail)
            wheat_data = df_retail[df_retail['commodity'].str.lower() == 'wheat'].copy()
            print(f"Filtered for wheat commodity: {len(wheat_data):,} records from {pre_count:,}")
        else:
            wheat_data = df_retail.copy()
            print(f"No commodity column found. Using all {len(wheat_data):,} records")

        print("✅ Preprocessing complete")

        # Display summary statistics
        print("\n📊 Summary Statistics for Preprocessed Data:")
        print(wheat_data.describe())

        return wheat_data

    def _find_date_column(self, df):
        """Return the first name in DATE_CANDIDATES present in ``df``, or None."""
        return next((col for col in self.DATE_CANDIDATES if col in df.columns), None)

    def _add_time_features(self, df):
        """
        Add calendar features derived from the parsed 'date' column.

        Adds year, month_num, day, quarter plus sine/cosine encodings of month
        (period 12) and quarter (period 4), so e.g. December sits next to January.
        Columns are assigned on ``df`` directly, which is also returned.
        """
        df['year'] = df['date'].dt.year
        df['month_num'] = df['date'].dt.month
        df['day'] = df['date'].dt.day
        df['quarter'] = df['date'].dt.quarter

        df['month_sin'] = np.sin(2 * np.pi * df['month_num'] / 12)
        df['month_cos'] = np.cos(2 * np.pi * df['month_num'] / 12)
        df['quarter_sin'] = np.sin(2 * np.pi * df['quarter'] / 4)
        df['quarter_cos'] = np.cos(2 * np.pi * df['quarter'] / 4)

        print("Added time features: year, month, day, quarter")
        print("Added cyclical features: month_sin, month_cos, quarter_sin, quarter_cos")

        return df

    @staticmethod
    def _ffill_within_state(df, col):
        """
        Forward-fill ``col`` without carrying a value across a state boundary.

        Falls back to a plain forward-fill when ``df`` has no 'state' column.
        Assumes ``df`` is already sorted by date within each state.
        """
        if 'state' in df.columns:
            return df.groupby('state')[col].ffill()
        return df[col].ffill()

    def _handle_missing_values(self, df):
        """
        Impute missing values on ``df`` and return it.

        - Rainfall: mean of the same state (if present) and calendar month.
        - Diesel Price: forward-filled within each state into a new 'diesel_price' column.
        - Diesel ROC / Wheat ROC: coerced to numeric, then forward-filled within each state.
        - 'Diesel / Wheat Price Ratio': recomputed as diesel_price / price_per_KG (zero price -> NaN).

        Expects ``df`` sorted by state and date and to contain 'month_num'
        whenever 'Rainfall' is present.
        """
        state_keys = ['state'] if 'state' in df.columns else []

        # Fill Rainfall using the state/month group mean
        if 'Rainfall' in df.columns:
            missing_before = df['Rainfall'].isna().sum()
            group_means = df.groupby(state_keys + ['month_num'])['Rainfall'].transform('mean')
            df['Rainfall'] = df['Rainfall'].fillna(group_means)
            missing_after = df['Rainfall'].isna().sum()
            print(f"Filled {missing_before - missing_after:,} missing Rainfall values")

        # Fill diesel price forward within each state
        if 'Diesel Price' in df.columns:
            missing_before = df['Diesel Price'].isna().sum()
            df['diesel_price'] = self._ffill_within_state(df, 'Diesel Price')
            missing_after = df['diesel_price'].isna().sum()
            print(f"Filled {missing_before - missing_after:,} missing Diesel Price values")

        # ROC columns: coerce to numeric, then forward-fill per state so one state's
        # last value never fills the first rows of the next state.
        for roc_col in ('Diesel ROC', 'Wheat ROC'):
            if roc_col in df.columns:
                df[roc_col] = pd.to_numeric(df[roc_col], errors='coerce')
                missing_before = df[roc_col].isna().sum()
                df[roc_col] = self._ffill_within_state(df, roc_col)
                missing_after = df[roc_col].isna().sum()
                print(f"Filled {missing_before - missing_after:,} missing {roc_col} values")

        # Recomputed on every run; an existing column with this name is overwritten.
        # NOTE(review): this ratio uses the current price_per_KG, which ModelTrainer also uses
        # as its target — confirm it is not leaking the target into the features.
        if 'diesel_price' in df.columns and 'price_per_KG' in df.columns:
            df['Diesel / Wheat Price Ratio'] = df['diesel_price'] / df['price_per_KG'].replace(0, np.nan)
            print("Calculated 'Diesel / Wheat Price Ratio'")

        # Report remaining missing values
        missing_counts = df.isnull().sum()
        if missing_counts.sum() > 0:
            print("\nRemaining missing values:")
            for col in missing_counts[missing_counts > 0].index:
                print(f"- {col}: {missing_counts[col]:,} missing values")
        else:
            print("No missing values remain in the dataset")

        return df


In [6]:
class FeatureEngineer:
    """Class for feature engineering on time series data."""

    def engineer_features(self, df_state, lags=(1, 3, 6, 12), windows=(3, 6), roc_periods=(1, 3)):
        """
        Add lag, rolling-window, rate-of-change and MSP-ratio features for ``price_per_KG``.

        Apart from the plain lags, every derived feature at row t is computed
        from the price series shifted by one row, i.e. only from prices before
        row t. Rolling statistics, rates of change or ratios that include the
        row's own price would contain the value the models are trained to
        predict (target leakage) and make test metrics meaningless.

        Shifts and windows count rows, so the "m" (month) suffix in the column
        names is accurate only when there is one row per month — TODO confirm
        for the input data.

        Args:
            df_state (pandas.DataFrame): State data with a DatetimeIndex or a
                'date' column, plus 'price_per_KG' (and optionally 'MSP_Wheat_KG').
            lags (iterable of int): Row shifts for the ``lag_<n>m`` columns.
            windows (iterable of int): Window sizes for ``rolling_mean_<n>m`` /
                ``rolling_std_<n>m``, computed over prior prices.
            roc_periods (iterable of int): Periods for ``roc_<n>m``, the
                fractional change of the previous price over ``n`` rows.

        Returns:
            pandas.DataFrame: A new frame with the added features; the input
            frame is left unmodified.
        """
        print("Adding time series features...")

        # Work on a copy so the caller's frame never gains feature columns as a side effect.
        df_state = df_state.copy()

        # Set date as index if not already
        if not isinstance(df_state.index, pd.DatetimeIndex) and 'date' in df_state.columns:
            df_state = df_state.set_index('date', drop=False).sort_index()
            print("Set date as index and sorted data")

        price = df_state['price_per_KG']
        # Price as of the previous row: the only price information a forecast for row t may use.
        prev_price = price.shift(1)

        for lag in lags:
            lag_col = f'lag_{lag}m'
            df_state[lag_col] = price.shift(lag)
            print(f"Added {lag_col} feature")

        for window in windows:
            mean_col = f'rolling_mean_{window}m'
            df_state[mean_col] = prev_price.rolling(window=window).mean()
            print(f"Added {mean_col} feature")

            std_col = f'rolling_std_{window}m'
            df_state[std_col] = prev_price.rolling(window=window).std()
            print(f"Added {std_col} feature")

        for period in roc_periods:
            roc_col = f'roc_{period}m'
            # Zero base prices (the preprocessor's default when no price column exists)
            # become NaN instead of inf, which StandardScaler would reject.
            base_price = price.shift(1 + period).replace(0, np.nan)
            df_state[roc_col] = prev_price / base_price - 1
            print(f"Added {roc_col} feature")

        if 'MSP_Wheat_KG' in df_state.columns:
            df_state['MSP_to_retail_ratio'] = df_state['MSP_Wheat_KG'] / prev_price.replace(0, np.nan)
            print("Added MSP_to_retail_ratio feature")

        numeric_df_state = df_state.select_dtypes(include=['number'])
        print(f"✅ Feature engineering complete. Dataset now has {numeric_df_state.shape[1]} numeric features")

        # Display correlations with target
        print("\n📊 Correlation with price_per_KG:")
        correlations = numeric_df_state.corr()['price_per_KG'].sort_values(ascending=False)
        print(correlations.head(10))

        return df_state


In [16]:
class ModelTrainer:
    """Class for training and evaluating forecasting models."""

    def __init__(self, s3_connector, s3_bucket, output_prefix):
        """
        Initialize the model trainer with S3 storage.

        Args:
            s3_connector (S3Connector): S3 connector instance
            s3_bucket (str): S3 bucket for saving models and plots
            output_prefix (str): Prefix path in the S3 bucket
        """
        self.s3_connector = s3_connector
        self.s3_bucket = s3_bucket
        self.output_prefix = output_prefix
        self.feature_cols = None

        # Create S3 directory structure
        self.model_registry_prefix = f"{output_prefix}/model_registry"
        self.plots_prefix = f"{output_prefix}/plots"

        # Ensure directories exist in S3
        self.s3_connector.create_s3_directory(s3_bucket, self.model_registry_prefix)
        self.s3_connector.create_s3_directory(s3_bucket, self.plots_prefix)

        print(f"✅ Set up S3 directory structure:")
        print(f"  - 📁 s3://{s3_bucket}/{self.model_registry_prefix} (models)")
        print(f"  - 📁 s3://{s3_bucket}/{self.plots_prefix} (visualizations)")

    def train_models_for_state(self, state, df_state, split_date):
        """
        Train models for a specific state, pick the best by test RMSE and persist artifacts to S3.

        Trains Random Forest and XGBoost on standardized features and a
        Holt-Winters model on the raw price series. Saves every model, the
        fitted feature scaler (``scaler.pkl``) and the best model to S3. The
        tree models only accept scaled inputs, so the scaler must be saved
        with them to be usable for inference.

        Args:
            state (str): State name
            df_state (pandas.DataFrame): Feature-engineered data for the state,
                indexed by date (or with a 'date' column)
            split_date (str): Rows before this date train, rows on/after it test

        Returns:
            dict | None: State model results. Keys: 'best_model', 'model',
            'metrics', 'feature_importance', 'model_s3_path', 'scaler',
            'scaler_s3_path', 'feature_cols'. Returns None when the state is
            skipped or on any error (errors are printed, not raised).
        """
        print(f"\n{'='*80}")
        print(f"📌 Processing state: {state.title()}")
        print(f"{'='*80}")

        try:
            # Skip states with insufficient data
            if len(df_state) < 24:
                print(f"⚠️ Skipping {state.title()} - insufficient data (only {len(df_state)} records)")
                return None

            print(f"Found {len(df_state):,} records for {state.title()}")

            # Define feature columns based on available columns
            self.feature_cols = self._get_feature_columns(df_state)
            if not self.feature_cols:
                print(f"⚠️ Skipping {state.title()} - no feature columns available")
                return None

            train_data, test_data = self._split_train_test(df_state, split_date)

            if len(train_data) < 12 or len(test_data) < 4:
                print(f"⚠️ Skipping {state.title()} - insufficient data after cleaning")
                return None

            # Scaler statistics come from the training period only (no look-ahead).
            scaler_features = StandardScaler()
            X_train = pd.DataFrame(
                scaler_features.fit_transform(train_data[self.feature_cols]),
                columns=self.feature_cols,
                index=train_data.index
            )
            y_train = train_data['price_per_KG'].values

            X_test = pd.DataFrame(
                scaler_features.transform(test_data[self.feature_cols]),
                columns=self.feature_cols,
                index=test_data.index
            )
            y_test = test_data['price_per_KG'].values

            print(f"Final feature matrix shapes - X_train: {X_train.shape}, X_test: {X_test.shape}")

            models = {}
            model_metrics = {}
            importances = {}

            # 1. Random Forest
            print("\n🔹 Training Random Forest model...")
            rf_model, rf_metrics, rf_importance = self._train_random_forest(X_train, y_train, X_test, y_test)
            models['random_forest'] = rf_model
            model_metrics['random_forest'] = rf_metrics
            importances['random_forest'] = rf_importance

            # 2. XGBoost
            print("\n🔹 Training XGBoost model...")
            xgb_model, xgb_metrics, xgb_importance = self._train_xgboost(X_train, y_train, X_test, y_test)
            models['xgboost'] = xgb_model
            model_metrics['xgboost'] = xgb_metrics
            importances['xgboost'] = xgb_importance

            # 3. Holt-Winters (may return None when it cannot be fitted)
            print("\n🔹 Training Holt-Winters model...")
            hw_fit, hw_metrics = self._train_holt_winters(train_data, test_data)
            if hw_fit is not None:
                models['holt_winters'] = hw_fit
                model_metrics['holt_winters'] = hw_metrics

            # Determine best model
            print("\n🔹 Determining best model...")
            best_model_name = min(model_metrics, key=lambda name: model_metrics[name]['test_rmse'])
            best_model = models[best_model_name]
            best_metrics = model_metrics[best_model_name]

            print(f"Best model for {state.title()}: {best_model_name}")
            print(f"Best model metrics:")
            for metric, value in best_metrics.items():
                print(f"   - {metric}: {value:.4f}")

            # Create visualizations and save directly to S3
            self._create_forecast_visualization(
                state,
                train_data,
                test_data,
                models,
                best_model_name,
                split_date
            )

            best_model_key, scaler_key = self._save_state_artifacts(
                state, models, best_model_name, scaler_features
            )

            # Holt-Winters has no feature importances.
            importance_data = importances.get(best_model_name)

            result = {
                'best_model': best_model_name,
                'model': best_model,
                'metrics': best_metrics,
                'feature_importance': importance_data.to_dict() if importance_data is not None else None,
                'model_s3_path': f"s3://{self.s3_bucket}/{best_model_key}",
                # Everything needed to rebuild the tree models' inputs at inference time.
                'scaler': scaler_features,
                'scaler_s3_path': f"s3://{self.s3_bucket}/{scaler_key}",
                'feature_cols': list(self.feature_cols),
            }

            print(f"✅ Successfully processed {state.title()}")
            return result

        except Exception as e:
            print(f"❌ Error processing state {state}: {str(e)}")
            import traceback
            traceback.print_exc()
            return None

    def _split_train_test(self, df_state, split_date):
        """
        Split ``df_state`` at ``split_date`` and drop rows unusable for training.

        Rows with a missing target, a missing feature or an infinite feature
        value are dropped; StandardScaler rejects both NaN and inf.

        Returns:
            tuple: (train_data, test_data) as independent copies.
        """
        # Date-based splitting needs a DatetimeIndex; fall back to the 'date' column if present.
        if not isinstance(df_state.index, pd.DatetimeIndex) and 'date' in df_state.columns:
            df_state = df_state.set_index('date', drop=False).sort_index()

        print(f"Preparing train/test split using date {split_date}")

        train_data = df_state[df_state.index < split_date].copy()
        test_data = df_state[df_state.index >= split_date].copy()

        total = len(df_state)
        print(f"Train data: {len(train_data):,} records ({len(train_data)/total:.1%})")
        print(f"Test data: {len(test_data):,} records ({len(test_data)/total:.1%})")

        required = ['price_per_KG'] + [col for col in self.feature_cols if col != 'price_per_KG']

        def _clean(frame):
            frame[required] = frame[required].replace([np.inf, -np.inf], np.nan)
            return frame.dropna(subset=required)

        train_data = _clean(train_data)
        test_data = _clean(test_data)

        print(f"After dropping NaN values - Train: {len(train_data):,}, Test: {len(test_data):,}")
        return train_data, test_data

    def _save_state_artifacts(self, state, models, best_model_name, scaler):
        """
        Pickle every model, the fitted feature scaler and the best model to S3.

        Returns:
            tuple: (best_model_key, scaler_key) S3 object keys.
        """
        state_model_prefix = f"{self.model_registry_prefix}/{state}"
        self.s3_connector.create_s3_directory(self.s3_bucket, state_model_prefix)

        for model_name, model in models.items():
            s3_model_key = f"{state_model_prefix}/{model_name}.pkl"
            self.s3_connector.save_pickle_to_s3(model, self.s3_bucket, s3_model_key)
            print(f"Saved {model_name} model to s3://{self.s3_bucket}/{s3_model_key}")

        # The tree models were fit on scaled features; without this scaler they cannot score raw data.
        scaler_key = f"{state_model_prefix}/scaler.pkl"
        self.s3_connector.save_pickle_to_s3(scaler, self.s3_bucket, scaler_key)
        print(f"Saved feature scaler to s3://{self.s3_bucket}/{scaler_key}")

        best_model_key = f"{self.model_registry_prefix}/{state}_best_model.pkl"
        self.s3_connector.save_pickle_to_s3(models[best_model_name], self.s3_bucket, best_model_key)
        print(f"Saved best model ({best_model_name}) to s3://{self.s3_bucket}/{best_model_key}")

        return best_model_key, scaler_key

    def _get_feature_columns(self, df):
        """Get feature columns that exist in the dataframe."""
        all_feature_cols = [
            # Economic indicators
            'MSP_Wheat_KG', 'CPI', 'diesel_price',
            'Diesel ROC', 'Wheat ROC', 'Diesel / Wheat Price Ratio',

            # External factors
            'Rainfall',

            # Time components
            'year', 'month_num', 'quarter',
            'month_sin', 'month_cos', 'quarter_sin', 'quarter_cos',

            # Lagged features
            'lag_1m', 'lag_3m', 'lag_6m', 'lag_12m',

            # Rolling statistics
            'rolling_mean_3m', 'rolling_mean_6m',
            'rolling_std_3m', 'rolling_std_6m',

            # Rate of change
            'roc_1m', 'roc_3m',

            # Price ratios
            'MSP_to_retail_ratio'
        ]

        # Filter to columns that exist in the dataframe
        feature_cols = [col for col in all_feature_cols if col in df.columns]
        print(f"Using {len(feature_cols)} features: {', '.join(feature_cols)}")

        return feature_cols

    def _train_random_forest(self, X_train, y_train, X_test, y_test):
        """Fit a random forest regressor on the training split and score it.

        Args:
            X_train (pd.DataFrame): training features; its column names label the
                importances table.
            y_train: training targets.
            X_test (pd.DataFrame): held-out features.
            y_test: held-out targets.

        Returns:
            tuple: (fitted RandomForestRegressor, metrics dict produced by
            ``_calculate_metrics``, DataFrame of 'Feature'/'Importance' rows sorted
            by importance, highest first).
        """
        hyperparams = dict(
            n_estimators=200,
            max_depth=15,
            min_samples_split=5,
            random_state=42,
            n_jobs=-1,
        )
        print(f"RF Parameters: {hyperparams}")

        forest = RandomForestRegressor(**hyperparams)
        forest.fit(X_train, y_train)

        # Arguments evaluate left to right: train predictions first, then test.
        scores = self._calculate_metrics(
            y_train, forest.predict(X_train),
            y_test, forest.predict(X_test),
        )

        importance_table = (
            pd.DataFrame({'Feature': X_train.columns,
                          'Importance': forest.feature_importances_})
            .sort_values('Importance', ascending=False)
        )

        print(f"Random Forest metrics - Test RMSE: {scores['test_rmse']:.4f}, R²: {scores['test_r2']:.4f}")
        print("Top 5 important features:")
        for _, top in importance_table.head(5).iterrows():
            print(f"   - {top['Feature']}: {top['Importance']:.4f}")

        return forest, scores, importance_table

    def _train_xgboost(self, X_train, y_train, X_test, y_test):
        """Fit an XGBoost regressor on the training split and score it.

        Args:
            X_train (pd.DataFrame): training features; its column names label the
                importances table.
            y_train: training targets.
            X_test (pd.DataFrame): held-out features.
            y_test: held-out targets.

        Returns:
            tuple: (fitted XGBRegressor, metrics dict produced by
            ``_calculate_metrics``, DataFrame of 'Feature'/'Importance' rows sorted
            by importance, highest first).
        """
        booster_config = dict(
            n_estimators=1000,
            learning_rate=0.03,
            max_depth=6,
            colsample_bytree=0.8,
            subsample=0.9,
            gamma=0.1,
            reg_alpha=0.1,
            reg_lambda=0.5,
            random_state=42,
        )
        print(f"XGBoost Parameters: {booster_config}")

        regressor = xgb.XGBRegressor(**booster_config)
        regressor.fit(X_train, y_train)

        fitted_train = regressor.predict(X_train)
        fitted_test = regressor.predict(X_test)
        evaluation = self._calculate_metrics(y_train, fitted_train, y_test, fitted_test)

        ranked = pd.DataFrame(
            {'Feature': X_train.columns, 'Importance': regressor.feature_importances_}
        ).sort_values('Importance', ascending=False)

        print(f"XGBoost metrics - Test RMSE: {evaluation['test_rmse']:.4f}, R²: {evaluation['test_r2']:.4f}")
        print("Top 5 important features:")
        for _, entry in ranked.head(5).iterrows():
            print(f"   - {entry['Feature']}: {entry['Importance']:.4f}")

        return regressor, evaluation, ranked

    def _train_holt_winters(self, train_data, test_data):
        """Fit a Holt-Winters exponential smoothing model on 'price_per_KG' and score it.

        The model uses an additive trend and multiplicative seasonality with
        ``seasonal_periods=12``. The test forecast is re-indexed onto
        ``test_data.index`` so it lines up with the actual test values.

        Args:
            train_data (pd.DataFrame): training rows with a 'price_per_KG' column.
            test_data (pd.DataFrame): test rows with a 'price_per_KG' column; its
                length sets the forecast horizon.

        Returns:
            tuple: (fitted statsmodels results object, metrics dict with keys
            'train_rmse', 'test_rmse', 'train_mae', 'test_mae', 'train_mape',
            'test_mape', 'train_r2', 'test_r2'), or (None, None) if fitting,
            forecasting or scoring raises. The R² entries are NaN when
            ``r2_score`` rejects its input.
        """
        # Re-wrap the target columns as plain Series on each frame's own index.
        train_series = pd.Series(train_data['price_per_KG'].values, index=train_data.index)
        test_series = pd.Series(test_data['price_per_KG'].values, index=test_data.index)

        hw_params = {
            'trend': 'add',
            'seasonal': 'mul',
            'seasonal_periods': 12  # Monthly data with yearly seasonality
        }

        print(f"Holt-Winters Parameters: {hw_params}")

        try:
            hw_model = ExponentialSmoothing(train_series, **hw_params)
            hw_fit = hw_model.fit()

            hw_train_pred = hw_fit.fittedvalues
            hw_test_pred = hw_fit.forecast(steps=len(test_series))

            # Overwrite the forecast's own index with the test dates so the
            # element-wise comparisons below align with the actual values.
            hw_test_pred.index = test_series.index

            hw_train_rmse = np.sqrt(mean_squared_error(train_series, hw_train_pred))
            hw_test_rmse = np.sqrt(mean_squared_error(test_series, hw_test_pred))
            hw_train_mae = mean_absolute_error(train_series, hw_train_pred)
            hw_test_mae = mean_absolute_error(test_series, hw_test_pred)

            # Floor each denominator at 0.01 to avoid division by zero in MAPE.
            hw_train_mape = np.mean(np.abs((train_series - hw_train_pred) /
                                           np.maximum(0.01, np.abs(train_series)))) * 100
            hw_test_mape = np.mean(np.abs((test_series - hw_test_pred) /
                                          np.maximum(0.01, np.abs(test_series)))) * 100

            # r2_score raises ValueError on invalid input (e.g. NaN/inf); report
            # NaN instead of failing the whole model. A bare `except:` here would
            # also have swallowed KeyboardInterrupt/SystemExit.
            try:
                hw_train_r2 = r2_score(train_series, hw_train_pred)
                hw_test_r2 = r2_score(test_series, hw_test_pred)
            except ValueError:
                hw_train_r2 = np.nan
                hw_test_r2 = np.nan

            hw_metrics = {
                'train_rmse': hw_train_rmse,
                'test_rmse': hw_test_rmse,
                'train_mae': hw_train_mae,
                'test_mae': hw_test_mae,
                'train_mape': hw_train_mape,
                'test_mape': hw_test_mape,
                'train_r2': hw_train_r2,
                'test_r2': hw_test_r2
            }

            print(f"Holt-Winters metrics - Test RMSE: {hw_test_rmse:.4f}")

            return hw_fit, hw_metrics

        except Exception as e:
            # Deliberate best-effort: a state whose series cannot be modelled
            # (e.g. rejected by statsmodels) is reported and skipped.
            print(f"❌ Error training Holt-Winters model: {str(e)}")
            return None, None

    def _calculate_metrics(self, y_train, train_pred, y_test, test_pred):
        """Score train and test predictions with RMSE, MAE, MAPE (percent) and R².

        MAPE floors each denominator at 0.01 so zero targets cannot divide by zero.

        Returns:
            dict: 'train_rmse', 'test_rmse', 'train_mae', 'test_mae',
            'train_mape', 'test_mape', 'train_r2', 'test_r2' (in that order).
        """
        def _pct_error(actual, predicted):
            # Mean absolute percentage error with a floored denominator.
            return np.mean(np.abs((actual - predicted) / np.maximum(0.01, np.abs(actual)))) * 100

        return {
            'train_rmse': np.sqrt(mean_squared_error(y_train, train_pred)),
            'test_rmse': np.sqrt(mean_squared_error(y_test, test_pred)),
            'train_mae': mean_absolute_error(y_train, train_pred),
            'test_mae': mean_absolute_error(y_test, test_pred),
            'train_mape': _pct_error(y_train, train_pred),
            'test_mape': _pct_error(y_test, test_pred),
            'train_r2': r2_score(y_train, train_pred),
            'test_r2': r2_score(y_test, test_pred),
        }

    def _create_forecast_visualization(self, state, train_data, test_data, models, best_model_name, split_date):
        """Plot actual vs. predicted prices for one state's best model and upload the figure to S3.

        Args:
            state (str): state name; used in the title and the S3 key.
            train_data (pd.DataFrame): training rows; the index is the x-axis and
                'price_per_KG' is the actual series.
            test_data (pd.DataFrame): test rows with the same layout as ``train_data``.
            models (dict): fitted models keyed by name; must contain ``best_model_name``.
            best_model_name (str): 'holt_winters' uses the statsmodels results
                object; any other name is treated as a model with ``predict`` over
                ``self.feature_cols``.
            split_date: train/test boundary, drawn as a vertical line.

        Side effects:
            Uploads ``{plots_prefix}/{state}_forecast.png`` through
            ``self.s3_connector``. For 'random_forest' or 'xgboost' it also calls
            ``_create_feature_importance_plot``.
        """
        print("\n🔹 Creating visualization...")

        split_ts = pd.to_datetime(split_date)
        fig, ax = plt.subplots(figsize=(14, 8))
        try:
            # Actual prices
            ax.plot(train_data.index, train_data['price_per_KG'], 'b-',
                    label='Actual (Train)', alpha=0.7, linewidth=2)
            ax.plot(test_data.index, test_data['price_per_KG'], 'k-',
                    label='Actual (Test)', alpha=0.7, linewidth=2)

            # Best model's predictions
            if best_model_name == 'holt_winters':
                hw_fit = models['holt_winters']
                hw_train_pred = hw_fit.fittedvalues
                hw_test_pred = hw_fit.forecast(steps=len(test_data))
                # Put the forecast on the test dates so it plots against them.
                hw_test_pred.index = test_data.index

                ax.plot(hw_train_pred.index, hw_train_pred, 'g--',
                        label=f'Predicted (Train - {best_model_name})', alpha=0.7, linewidth=2)
                ax.plot(hw_test_pred.index, hw_test_pred, 'r--',
                        label=f'Predicted (Test - {best_model_name})', alpha=0.7, linewidth=2)
            else:
                # Feature-based models (RF / XGB) predict from the engineered columns.
                best_model = models[best_model_name]
                train_pred = best_model.predict(train_data[self.feature_cols])
                test_pred = best_model.predict(test_data[self.feature_cols])

                ax.plot(train_data.index, train_pred, 'g--',
                        label=f'Predicted (Train - {best_model_name})', alpha=0.7, linewidth=2)
                ax.plot(test_data.index, test_pred, 'r--',
                        label=f'Predicted (Test - {best_model_name})', alpha=0.7, linewidth=2)

            ax.set_title(f'Wheat Price Forecast for {state.title()} - {best_model_name.title()}', fontsize=16)
            ax.set_xlabel('Date', fontsize=14)
            ax.set_ylabel('Price per KG (₹)', fontsize=14)
            ax.grid(True, alpha=0.3)
            ax.legend(loc='best', fontsize=12)

            # Vertical marker at the train/test split
            ax.axvline(split_ts, color='gray', linestyle='--', alpha=0.7)
            ax.text(split_ts, ax.get_ylim()[0], 'Train-Test Split',
                    rotation=90, verticalalignment='bottom', fontsize=12)

            fig.tight_layout()

            s3_plot_key = f"{self.plots_prefix}/{state}_forecast.png"
            self.s3_connector.save_figure_to_s3(fig, self.s3_bucket, s3_plot_key)
        finally:
            # Always release the figure, even if prediction or the upload raises.
            # This runs once per state, so leaked figures would accumulate.
            plt.close(fig)

        print(f"✅ Saved visualization to s3://{self.s3_bucket}/{s3_plot_key}")

        if best_model_name in ['random_forest', 'xgboost']:
            self._create_feature_importance_plot(state, best_model_name, models[best_model_name], self.feature_cols)


    def _create_feature_importance_plot(self, state, model_name, model, feature_cols, top_n=15):
        """Plot the most important features of a tree model and upload the plot and data to S3.

        Args:
            state (str): state name; used in the title and the S3 keys.
            model_name (str): model identifier; used in the title and the S3 keys.
            model: fitted estimator exposing ``feature_importances_``, with one value
                per entry of ``feature_cols`` in the same order.
            feature_cols (list[str]): feature names matching the model's inputs.
            top_n (int): maximum number of bars to draw (default 15). The JSON
                upload always contains every feature.

        Side effects:
            Uploads ``{plots_prefix}/{state}_{model_name}_importance.png`` and
            ``{plots_prefix}/{state}_{model_name}_importance.json`` through
            ``self.s3_connector``.
        """
        feature_importance = pd.DataFrame({
            'Feature': feature_cols,
            'Importance': model.feature_importances_
        }).sort_values('Importance', ascending=False)

        top_features = feature_importance.head(top_n)
        # The title reports how many bars are actually drawn, which may be fewer than top_n.
        shown = len(top_features)

        fig, ax = plt.subplots(figsize=(12, 8))
        try:
            sns.barplot(x='Importance', y='Feature', data=top_features, palette='viridis', ax=ax)

            ax.set_title(f'Top {shown} Feature Importances for {state.title()} - {model_name.title()}', fontsize=16)
            ax.set_xlabel('Importance', fontsize=14)
            ax.set_ylabel('Feature', fontsize=14)
            ax.grid(True, alpha=0.3, axis='x')

            fig.tight_layout()

            s3_plot_key = f"{self.plots_prefix}/{state}_{model_name}_importance.png"
            self.s3_connector.save_figure_to_s3(fig, self.s3_bucket, s3_plot_key)
        finally:
            # Release the figure even if plotting or the upload raises.
            plt.close(fig)

        print(f"✅ Saved feature importance plot to s3://{self.s3_bucket}/{s3_plot_key}")

        # Also save the full (not truncated) importance table as JSON records.
        importance_data = feature_importance.to_dict('records')
        s3_importance_key = f"{self.plots_prefix}/{state}_{model_name}_importance.json"
        self.s3_connector.save_data_to_s3(
            importance_data,
            self.s3_bucket,
            s3_importance_key,
            content_type='application/json'
        )
        print(f"✅ Saved feature importance data to s3://{self.s3_bucket}/{s3_importance_key}")


In [17]:
class ModelAggregator:
    """Bundle per-state best models into one pickle on S3, plus JSON metadata.

    NOTE(review): the next notebook cell defines a second class with this same
    name, which shadows this one after Run All. Keep a single definition.
    """

    def __init__(self, s3_connector, s3_bucket, output_prefix):
        """
        Initialize the model aggregator with S3 storage.

        Args:
            s3_connector (S3Connector): S3 connector instance
            s3_bucket (str): S3 bucket for saving models
            output_prefix (str): Prefix path in the S3 bucket
        """
        self.s3_connector = s3_connector
        self.s3_bucket = s3_bucket
        self.output_prefix = output_prefix

    def aggregate_models(self, all_states_models, feature_cols):
        """
        Combine all state models into a single file with metadata and save to S3.

        States whose value is None are skipped. Three objects are written under
        ``{output_prefix}/models/``: a timestamped pickle, a "latest" copy of the
        same pickle, and a timestamped JSON metadata file.

        Args:
            all_states_models (dict): state -> result dict (reads 'model',
                'best_model', 'metrics', 'feature_importance' and optionally
                'model_s3_path'), or None for states that failed.
            feature_cols (list): Feature columns used in modeling

        Returns:
            str or None: S3 URI of the timestamped combined model file, or None
            when there is no valid model to save or the combined upload fails.
        """
        print("\n" + "="*80)
        print("📦 Aggregating models across all states to S3")
        print("="*80)

        valid_models = {state: data for state, data in all_states_models.items() if data is not None}

        print(f"Aggregating models for {len(valid_models)}/{len(all_states_models)} states")

        if not valid_models:
            # An empty bundle would also replace the "latest" copy, so save nothing.
            print("❌ No valid state models to aggregate; nothing saved to S3")
            return None

        # Read the clock once so 'creation_date' and the key timestamp always agree.
        now = datetime.now()
        timestamp = now.strftime('%Y%m%d_%H%M%S')

        # NOTE(review): if 'feature_importance' is a pandas DataFrame, JSON
        # serialisation of this metadata may fail inside save_data_to_s3 - confirm.
        metadata = {
            'creation_date': now.strftime('%Y-%m-%d %H:%M:%S'),
            'states_included': list(valid_models.keys()),
            'total_states': len(valid_models),
            'model_details': {
                state: {
                    'best_model': data['best_model'],
                    'metrics': data['metrics'],
                    'feature_importance': data['feature_importance'],
                    's3_model_path': data.get('model_s3_path', f"s3://{self.s3_bucket}/{self.output_prefix}/model_registry/{state}_best_model.pkl")
                } for state, data in valid_models.items()
            },
            'feature_cols': feature_cols
        }

        combined_data = {
            'models': {state: data['model'] for state, data in valid_models.items()},
            'metadata': metadata
        }

        s3_combined_key = f"{self.output_prefix}/models/all_states_best_model_{timestamp}.pkl"
        s3_latest_key = f"{self.output_prefix}/models/all_states_best_model_latest.pkl"
        s3_metadata_key = f"{self.output_prefix}/models/all_states_metadata_{timestamp}.json"

        if not self.s3_connector.save_pickle_to_s3(combined_data, self.s3_bucket, s3_combined_key):
            print("❌ Failed to save combined model to S3")
            # Do not hand callers a URI for an object that was never written.
            return None
        print(f"✅ Saved combined model to S3: s3://{self.s3_bucket}/{s3_combined_key}")

        # Same helper as the combined upload, so a falsy result means failure here too.
        if self.s3_connector.save_pickle_to_s3(combined_data, self.s3_bucket, s3_latest_key):
            print(f"✅ Saved latest version to S3: s3://{self.s3_bucket}/{s3_latest_key}")
        else:
            print(f"❌ Failed to save latest version to S3: s3://{self.s3_bucket}/{s3_latest_key}")

        self.s3_connector.save_data_to_s3(
            metadata,
            self.s3_bucket,
            s3_metadata_key,
            content_type='application/json'
        )
        print(f"✅ Saved metadata to S3: s3://{self.s3_bucket}/{s3_metadata_key}")

        return f"s3://{self.s3_bucket}/{s3_combined_key}"

    @staticmethod
    def _format_metric(value, spec, suffix=''):
        """Format a numeric value (Python or NumPy scalar) with ``spec`` + ``suffix``; otherwise return ``str(value)``."""
        # NumPy float32/int scalars are not subclasses of float/int, so list them explicitly.
        if isinstance(value, (int, float, np.integer, np.floating)):
            return f"{value:{spec}}{suffix}"
        return str(value)

    def print_summary_table(self, all_states_models):
        """
        Print a summary table of model results.

        Args:
            all_states_models (dict): state -> result dict (reads 'best_model' and
                'metrics'), or None for states that failed; None entries are omitted
                from the table but counted in the final total.
        """
        print("\n" + "="*80)
        print("📊 Summary of Results")
        print("="*80)

        valid_models = {state: data for state, data in all_states_models.items() if data is not None}

        print(f"{'State':<20} {'Best Model':<15} {'Test RMSE':<12} {'Test MAPE':<12} {'R²':<8}")
        print(f"{'-'*80}")

        for state, details in sorted(valid_models.items()):
            metrics = details['metrics']
            rmse_str = self._format_metric(metrics.get('test_rmse', 'N/A'), '.4f')
            mape_str = self._format_metric(metrics.get('test_mape', 'N/A'), '.2f', '%')
            r2_str = self._format_metric(metrics.get('test_r2', 'N/A'), '.4f')
            best_model = str(details['best_model'])

            print(f"{state.title():<20} {best_model:<15} {rmse_str:<12} {mape_str:<12} {r2_str:<8}")

        print(f"{'='*80}")
        print(f"✅ Successfully processed {len(valid_models)}/{len(all_states_models)} states")

In [18]:
class ModelAggregator:
    """Bundle per-state best models into one pickle on S3, plus JSON metadata.

    NOTE(review): this duplicates the ModelAggregator class defined in the
    previous notebook cell and shadows it after Run All. Keep a single definition.
    """

    def __init__(self, s3_connector, s3_bucket, output_prefix):
        """
        Initialize the model aggregator with S3 storage.

        Args:
            s3_connector (S3Connector): S3 connector instance
            s3_bucket (str): S3 bucket for saving models
            output_prefix (str): Prefix path in the S3 bucket
        """
        self.s3_connector = s3_connector
        self.s3_bucket = s3_bucket
        self.output_prefix = output_prefix

    def aggregate_models(self, all_states_models, feature_cols):
        """
        Combine all state models into a single file with metadata and save to S3.

        States whose value is None are skipped. Three objects are written under
        ``{output_prefix}/models/``: a timestamped pickle, a "latest" copy of the
        same pickle, and a timestamped JSON metadata file.

        Args:
            all_states_models (dict): state -> result dict (reads 'model',
                'best_model', 'metrics', 'feature_importance' and optionally
                'model_s3_path'), or None for states that failed.
            feature_cols (list): Feature columns used in modeling

        Returns:
            str or None: S3 URI of the timestamped combined model file, or None
            when there is no valid model to save or the combined upload fails.
        """
        print("\n" + "="*80)
        print("📦 Aggregating models across all states to S3")
        print("="*80)

        valid_models = {state: data for state, data in all_states_models.items() if data is not None}

        print(f"Aggregating models for {len(valid_models)}/{len(all_states_models)} states")

        if not valid_models:
            # An empty bundle would also replace the "latest" copy, so save nothing.
            print("❌ No valid state models to aggregate; nothing saved to S3")
            return None

        # Read the clock once so 'creation_date' and the key timestamp always agree.
        now = datetime.now()
        timestamp = now.strftime('%Y%m%d_%H%M%S')

        # NOTE(review): if 'feature_importance' is a pandas DataFrame, JSON
        # serialisation of this metadata may fail inside save_data_to_s3 - confirm.
        metadata = {
            'creation_date': now.strftime('%Y-%m-%d %H:%M:%S'),
            'states_included': list(valid_models.keys()),
            'total_states': len(valid_models),
            'model_details': {
                state: {
                    'best_model': data['best_model'],
                    'metrics': data['metrics'],
                    'feature_importance': data['feature_importance'],
                    's3_model_path': data.get('model_s3_path', f"s3://{self.s3_bucket}/{self.output_prefix}/model_registry/{state}_best_model.pkl")
                } for state, data in valid_models.items()
            },
            'feature_cols': feature_cols
        }

        combined_data = {
            'models': {state: data['model'] for state, data in valid_models.items()},
            'metadata': metadata
        }

        s3_combined_key = f"{self.output_prefix}/models/all_states_best_model_{timestamp}.pkl"
        s3_latest_key = f"{self.output_prefix}/models/all_states_best_model_latest.pkl"
        s3_metadata_key = f"{self.output_prefix}/models/all_states_metadata_{timestamp}.json"

        if not self.s3_connector.save_pickle_to_s3(combined_data, self.s3_bucket, s3_combined_key):
            print("❌ Failed to save combined model to S3")
            # Do not hand callers a URI for an object that was never written.
            return None
        print(f"✅ Saved combined model to S3: s3://{self.s3_bucket}/{s3_combined_key}")

        # Same helper as the combined upload, so a falsy result means failure here too.
        if self.s3_connector.save_pickle_to_s3(combined_data, self.s3_bucket, s3_latest_key):
            print(f"✅ Saved latest version to S3: s3://{self.s3_bucket}/{s3_latest_key}")
        else:
            print(f"❌ Failed to save latest version to S3: s3://{self.s3_bucket}/{s3_latest_key}")

        self.s3_connector.save_data_to_s3(
            metadata,
            self.s3_bucket,
            s3_metadata_key,
            content_type='application/json'
        )
        print(f"✅ Saved metadata to S3: s3://{self.s3_bucket}/{s3_metadata_key}")

        return f"s3://{self.s3_bucket}/{s3_combined_key}"

    @staticmethod
    def _format_metric(value, spec, suffix=''):
        """Format a numeric value (Python or NumPy scalar) with ``spec`` + ``suffix``; otherwise return ``str(value)``."""
        # NumPy float32/int scalars are not subclasses of float/int, so list them explicitly.
        if isinstance(value, (int, float, np.integer, np.floating)):
            return f"{value:{spec}}{suffix}"
        return str(value)

    def print_summary_table(self, all_states_models):
        """
        Print a summary table of model results.

        Args:
            all_states_models (dict): state -> result dict (reads 'best_model' and
                'metrics'), or None for states that failed; None entries are omitted
                from the table but counted in the final total.
        """
        print("\n" + "="*80)
        print("📊 Summary of Results")
        print("="*80)

        valid_models = {state: data for state, data in all_states_models.items() if data is not None}

        print(f"{'State':<20} {'Best Model':<15} {'Test RMSE':<12} {'Test MAPE':<12} {'R²':<8}")
        print(f"{'-'*80}")

        for state, details in sorted(valid_models.items()):
            metrics = details['metrics']
            rmse_str = self._format_metric(metrics.get('test_rmse', 'N/A'), '.4f')
            mape_str = self._format_metric(metrics.get('test_mape', 'N/A'), '.2f', '%')
            r2_str = self._format_metric(metrics.get('test_r2', 'N/A'), '.4f')
            best_model = str(details['best_model'])

            print(f"{state.title():<20} {best_model:<15} {rmse_str:<12} {mape_str:<12} {r2_str:<8}")

        print(f"{'='*80}")
        print(f"✅ Successfully processed {len(valid_models)}/{len(all_states_models)} states")


class WheatPriceForecaster:
    """Main class for wheat price forecasting process with S3 storage.

    ``run_full_process`` performs six steps:
    1. create the S3 output "directories";
    2. load the CSV from S3;
    3. preprocess it;
    4. engineer features and train models per state;
    5. aggregate the per-state results into one bundle on S3;
    6. print a summary.
    """

    def __init__(self,
                 s3_bucket,
                 s3_key,
                 output_prefix="wheat_forecaster",
                 split_date="2020-01-01",
                 mlflow_uri=None,
                 aws_access_key_id=None,
                 aws_secret_access_key=None,
                 aws_region='us-east-1'):
        """
        Initialize the wheat price forecasting process with S3 data source and storage.

        Args:
            s3_bucket (str): S3 bucket containing the CSV data file and for storing outputs
            s3_key (str): S3 key (path) to the CSV data file
            output_prefix (str): Prefix path in the S3 bucket for all outputs
            split_date (str): Date to split train/test data
            mlflow_uri (str, optional): MLflow tracking URI
            aws_access_key_id (str, optional): AWS access key ID
            aws_secret_access_key (str, optional): AWS secret access key
            aws_region (str, optional): AWS region name
        """
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.output_prefix = output_prefix
        self.split_date = split_date
        self.mlflow_uri = mlflow_uri

        self.s3_connector = S3Connector(
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=aws_region
        )

        # Populated by run_full_process
        self.raw_data = None
        self.wheat_data = None
        self.all_states = None
        self.all_state_models = {}
        self.feature_cols = None

        print(f"🚀 Initializing Wheat Price Forecaster with S3 storage")
        print(f"📄 Input file: s3://{self.s3_bucket}/{self.s3_key}")
        print(f"📁 Output location: s3://{self.s3_bucket}/{self.output_prefix}/")
        print(f"📅 Train/Test split date: {self.split_date}")
        if self.mlflow_uri:
            print(f"📊 MLflow tracking URI: {self.mlflow_uri}")

            # Imported lazily so mlflow is only required when a URI is given.
            import mlflow
            mlflow.set_tracking_uri(self.mlflow_uri)

    @staticmethod
    def _print_step(title):
        """Print ``title`` between two 80-character '=' rules, preceded by a blank line."""
        print("\n" + "="*80)
        print(title)
        print("="*80)

    def _process_states(self, state_keys):
        """Engineer features and train models for every state in ``self.all_states``.

        Args:
            state_keys (pd.Series): lower-cased 'state' column of ``self.wheat_data``
                (same index), used to select each state's rows.

        Returns:
            list[str]: states that produced no model, either because training
            returned None or because processing raised. Successful results are
            stored in ``self.all_state_models``.
        """
        feature_engineer = FeatureEngineer()
        model_trainer = ModelTrainer(
            s3_connector=self.s3_connector,
            s3_bucket=self.s3_bucket,
            output_prefix=self.output_prefix
        )

        failed_states = []
        total = len(self.all_states)
        for state_idx, state in enumerate(self.all_states, start=1):
            print(f"\nProcessing state {state_idx}/{total}: {state.title()}")

            df_state = self.wheat_data[state_keys == state].copy()

            try:
                df_state = feature_engineer.engineer_features(df_state)
                state_result = model_trainer.train_models_for_state(state, df_state, self.split_date)
            except Exception as exc:
                # One bad state should not discard the models already trained for the others.
                print(f"❌ Error processing {state.title()}: {type(exc).__name__}: {exc}")
                state_result = None

            if state_result is None:
                failed_states.append(state)
                continue

            # Feature columns are taken from the first successful state.
            if self.feature_cols is None:
                self.feature_cols = model_trainer.feature_cols
            self.all_state_models[state] = state_result

        return failed_states

    def run_full_process(self):
        """Run the complete wheat price forecasting process with S3 storage.

        Returns:
            dict or None: None if loading or preprocessing fails. Otherwise a dict
            with 'combined_model_s3_path', 'all_state_models', 'feature_cols' and
            's3_location'.
        """
        # Reset per-run state so calling this twice (e.g. re-running the cell)
        # does not mix in results or feature columns from a previous run.
        self.all_state_models = {}
        self.feature_cols = None

        self._print_step("Step 1: Setting up S3 directory structure")
        for subdir in ("", "model_registry/", "plots/", "models/"):
            self.s3_connector.create_s3_directory(self.s3_bucket, f"{self.output_prefix}/{subdir}")
        print(f"✅ Created S3 directory structure in s3://{self.s3_bucket}/{self.output_prefix}/")

        self._print_step("Step 2: Loading data from S3")
        self.raw_data = self.s3_connector.read_csv_from_s3(self.s3_bucket, self.s3_key)
        if self.raw_data is None:
            print("❌ Failed to load data from S3. Exiting process.")
            return None

        self._print_step("Step 3: Preprocessing data")
        preprocessor = DataPreprocessor()
        self.wheat_data = preprocessor.preprocess_retail_prices(self.raw_data)
        if self.wheat_data is None:
            print("❌ Preprocessing failed. Exiting process.")
            return None

        # Lower-case once. Rows with a missing state cannot belong to any state
        # model, and a NaN state would otherwise crash `.title()` below.
        state_keys = self.wheat_data['state'].str.lower()
        self.all_states = state_keys.dropna().unique().tolist()
        print(f"\nFound {len(self.all_states)} states with wheat price data:")
        for i, state in enumerate(self.all_states, start=1):
            print(f"   {i}. {state.title()}")

        self._print_step("Step 4: Processing individual states")
        failed_states = self._process_states(state_keys)

        self._print_step("Step 5: Aggregating models to S3")
        aggregator = ModelAggregator(
            s3_connector=self.s3_connector,
            s3_bucket=self.s3_bucket,
            output_prefix=self.output_prefix
        )
        combined_s3_path = aggregator.aggregate_models(self.all_state_models, self.feature_cols)

        self._print_step("Step 6: Results summary")
        aggregator.print_summary_table(self.all_state_models)
        if failed_states:
            print(f"⚠️ No model produced for {len(failed_states)} state(s): "
                  + ", ".join(s.title() for s in failed_states))

        print("\n" + "="*80)
        if combined_s3_path is None:
            print("⚠️ Wheat price forecasting process finished, but the combined model was not saved to S3")
        else:
            print("✅ Wheat price forecasting process completed successfully")
            print(f"✅ All outputs saved to S3: s3://{self.s3_bucket}/{self.output_prefix}/")
        print("="*80)

        return {
            'combined_model_s3_path': combined_s3_path,
            'all_state_models': self.all_state_models,
            'feature_cols': self.feature_cols,
            's3_location': f"s3://{self.s3_bucket}/{self.output_prefix}/"
        }

In [21]:
def main(s3_bucket,
         s3_key,
         output_prefix="wheat_forecaster",
         split_date="2020-01-01",
         mlflow_uri=None,
         aws_access_key_id=None,
         aws_secret_access_key=None,
         aws_region='us-east-1'):
    """
    Entry point: configure a WheatPriceForecaster and execute the whole pipeline.

    Input is read from S3, and every artifact the pipeline produces is written back to S3.

    Args:
        s3_bucket (str): bucket holding the input CSV; also receives all outputs
        s3_key (str): key of the input CSV inside ``s3_bucket``
        output_prefix (str): key prefix under which outputs are written
        split_date (str): boundary date between train and test data
        mlflow_uri (str, optional): MLflow tracking server URI
        aws_access_key_id (str, optional): AWS access key ID
        aws_secret_access_key (str, optional): AWS secret access key
        aws_region (str, optional): AWS region name

    Returns:
        dict or None: whatever ``WheatPriceForecaster.run_full_process`` returns.
        This is a dict of results and S3 locations, or None if loading or
        preprocessing failed.
    """
    settings = dict(
        s3_bucket=s3_bucket,
        s3_key=s3_key,
        output_prefix=output_prefix,
        split_date=split_date,
        mlflow_uri=mlflow_uri,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        aws_region=aws_region,
    )
    pipeline = WheatPriceForecaster(**settings)
    return pipeline.run_full_process()

In [23]:
# Credentials come from the environment instead of being hard-coded in the notebook.
# SECURITY: the AWS access key that was previously pasted into this cell is exposed
# in the notebook's history and must be deactivated/rotated in IAM.
# When a variable is unset, None is passed through to S3Connector. Presumably it
# then falls back to boto3's default credential chain; confirm in S3Connector.
results = main(
    s3_bucket="foundation-project-data",
    s3_key="data/wheat_prices_merged.csv",
    output_prefix="wheat_forecaster",
    split_date="2020-01-01",
    mlflow_uri=os.environ.get(
        "MLFLOW_TRACKING_URI",
        "http://ec2-51-21-244-174.eu-north-1.compute.amazonaws.com:5000/",
    ),
    aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
    aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
    aws_region=os.environ.get("AWS_DEFAULT_REGION", "eu-north-1"),
)

✅ S3 client initialized for region: eu-north-1
🚀 Initializing Wheat Price Forecaster with S3 storage
📄 Input file: s3://foundation-project-data/data/wheat_prices_merged.csv
📁 Output location: s3://foundation-project-data/wheat_forecaster/
📅 Train/Test split date: 2020-01-01
📊 MLflow tracking URI: http://ec2-51-21-244-174.eu-north-1.compute.amazonaws.com:5000/

Step 1: Setting up S3 directory structure
✅ Created S3 directory: s3://foundation-project-data/wheat_forecaster/
✅ Created S3 directory: s3://foundation-project-data/wheat_forecaster/model_registry/
✅ Created S3 directory: s3://foundation-project-data/wheat_forecaster/plots/
✅ Created S3 directory: s3://foundation-project-data/wheat_forecaster/models/
✅ Created S3 directory structure in s3://foundation-project-data/wheat_forecaster/

Step 2: Loading data from S3
Reading file from S3: s3://foundation-project-data/data/wheat_prices_merged.csv
✅ Successfully read CSV from S3 with 5,658 rows

Step 3: Preprocessing data
Starting data 