In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import pickle
import warnings
from pathlib import Path
from typing import Tuple, Dict, Any, Optional
import logging

# Machine Learning imports
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import PolynomialFeatures
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import Lasso
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
from sklearn.impute import SimpleImputer
from xgboost import XGBRegressor

In [None]:
"""
Optimized Gold Price Prediction using Machine Learning
=====================================================

This module provides a comprehensive implementation for predicting gold price
"""


# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Suppress warnings
warnings.filterwarnings("ignore")

# Set plotting style
sns.set_style("darkgrid", {"grid.color": ".6", "grid.linestyle": ":"})
plt.rcParams['figure.figsize'] = (12, 8)


class GoldPricePredictor:
    """
    A comprehensive class for gold price prediction using machine learning.
    
    This class handles data loading, preprocessing, feature engineering,
    model training, and evaluation with multiple algorithms.
    """
    
    def __init__(self, data_path: str = "gold_price_data.csv"):
        """
        Initialize the GoldPricePredictor.
        
        Args:
            data_path (str): Path to the CSV file containing gold price data
        """
        self.data_path = data_path
        self.dataset = None
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None
        self.X_train_scaled = None
        self.X_test_scaled = None
        self.scaler = StandardScaler()
        self.imputer = SimpleImputer(strategy='mean')
        self.models = {}
        self.feature_names = None
        
    def load_data(self) -> pd.DataFrame:
        """
        Load and validate the dataset.
        
        Returns:
            pd.DataFrame: Loaded dataset
        """
        try:
            logger.info(f"Loading data from {self.data_path}")
            self.dataset = pd.read_csv(self.data_path, parse_dates=["Date"])
            
            # Basic validation
            if self.dataset.empty:
                raise ValueError("Dataset is empty")
            
            logger.info(f"Dataset loaded successfully. Shape: {self.dataset.shape}")
            logger.info(f"Columns: {list(self.dataset.columns)}")
            
            return self.dataset
            
        except FileNotFoundError:
            logger.error(f"Data file not found: {self.data_path}")
            raise
        except Exception as e:
            logger.error(f"Error loading data: {str(e)}")
            raise
    
    def explore_data(self) -> Dict[str, Any]:
        """
        Perform exploratory data analysis.
        
        Returns:
            Dict[str, Any]: Summary statistics and insights
        """
        logger.info("Performing exploratory data analysis...")
        
        # Basic info
        info = {
            'shape': self.dataset.shape,
            'columns': list(self.dataset.columns),
            'dtypes': self.dataset.dtypes.to_dict(),
            'missing_values': self.dataset.isna().sum().to_dict(),
            'skewness': self.dataset.drop("Date", axis=1).skew().to_dict()
        }
        
        # Display basic info
        logger.info(f"Dataset shape: {info['shape']}")
        logger.info(f"Missing values: {info['missing_values']}")
        logger.info(f"Skewness: {info['skewness']}")
        
        return info
    
    def create_correlation_heatmap(self, save_path: Optional[str] = None):
        """Create and display correlation heatmap."""
        correlation = self.dataset.corr()
        
        plt.figure(figsize=(10, 8))
        sns.heatmap(correlation, cmap='coolwarm', center=0, annot=True, fmt='.2f')
        plt.title('Correlation Matrix Heatmap')
        plt.tight_layout()
        
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.show()
    
    def preprocess_data(self, drop_slv: bool = True) -> pd.DataFrame:
        """
        Preprocess the data including feature engineering and outlier removal.
        
        Args:
            drop_slv (bool): Whether to drop the SLV column
            
        Returns:
            pd.DataFrame: Preprocessed dataset
        """
        logger.info("Starting data preprocessing...")
        
        # Create a copy to avoid modifying original data
        data = self.dataset.copy()
        
        # Drop SLV column if requested
        if drop_slv and 'SLV' in data.columns:
            data = data.drop("SLV", axis=1)
            logger.info("Dropped SLV column")
        
        # Set Date as index
        data.set_index("Date", inplace=True)
        
        # Create price trend feature using rolling mean
        data["price_trend"] = data["EUR/USD"].rolling(window=20).mean()
        
        # Reset index to include Date as column
        data.reset_index(inplace=True)
        
        # Handle skewed data with square root transformation
        if data['USO'].skew() > 1.0:
            data["USO"] = data["USO"].apply(lambda x: np.sqrt(x))
            logger.info("Applied square root transformation to USO column")
        
        # Remove outliers using percentile capping
        numeric_columns = ['SPX', 'GLD', 'USO', 'EUR/USD']
        for col in numeric_columns:
            if col in data.columns:
                data[col] = self._remove_outliers(data[col])
        
        logger.info("Data preprocessing completed")
        return data
    
    def _remove_outliers(self, column: pd.Series) -> pd.Series:
        """
        Remove outliers using percentile capping.
        
        Args:
            column (pd.Series): Input column
            
        Returns:
            pd.Series: Column with outliers capped
        """
        upper_limit = column.quantile(0.95)
        lower_limit = column.quantile(0.05)
        
        column_capped = column.copy()
        column_capped.loc[column_capped > upper_limit] = upper_limit
        column_capped.loc[column_capped < lower_limit] = lower_limit
        
        return column_capped
    
    def prepare_features(self, target_column: str = 'EUR/USD') -> Tuple[pd.DataFrame, pd.Series]:
        """
        Prepare features and target for modeling.
        
        Args:
            target_column (str): Name of the target column
            
        Returns:
            Tuple[pd.DataFrame, pd.Series]: Features and target
        """
        logger.info("Preparing features and target...")
        
        # Select features and target
        X = self.dataset.drop(['Date', target_column], axis=1)
        y = self.dataset[target_column]
        
        self.feature_names = X.columns.tolist()
        logger.info(f"Features: {self.feature_names}")
        logger.info(f"Target: {target_column}")
        
        return X, y
    
    def split_and_scale_data(self, X: pd.DataFrame, y: pd.Series, 
                           test_size: float = 0.2, random_state: int = 42) -> None:
        """
        Split data into train/test sets and scale features.
        
        Args:
            X (pd.DataFrame): Feature matrix
            y (pd.Series): Target variable
            test_size (float): Proportion of data for testing
            random_state (int): Random seed for reproducibility
        """
        logger.info("Splitting and scaling data...")
        
        # Split data
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state
        )
        
        # Scale features
        self.X_train_scaled = self.scaler.fit_transform(self.X_train)
        self.X_test_scaled = self.scaler.transform(self.X_test)
        
        # Handle missing values
        self.X_train_scaled = self.imputer.fit_transform(self.X_train_scaled)
        self.X_test_scaled = self.imputer.transform(self.X_test_scaled)
        
        logger.info(f"Training set shape: {self.X_train.shape}")
        logger.info(f"Test set shape: {self.X_test.shape}")
    
    def train_lasso_model(self) -> Dict[str, Any]:
        """
        Train Lasso regression with polynomial features and hyperparameter tuning.
        
        Returns:
            Dict[str, Any]: Model results and metrics
        """
        logger.info("Training Lasso model with polynomial features...")
        
        # Create pipeline
        poly = PolynomialFeatures(degree=2)
        lasso = Lasso()
        pipeline = make_pipeline(poly, lasso)
        
        # Define parameter grid
        param_grid = {
            'lasso__alpha': [1e-4, 1e-3, 1e-2, 1e-1, 1, 5, 10, 20, 30, 40]
        }
        
        # Grid search
        lasso_grid_search = GridSearchCV(
            pipeline, param_grid, scoring='r2', cv=3, n_jobs=-1
        )
        lasso_grid_search.fit(self.X_train_scaled, self.y_train)
        
        # Predictions
        y_train_pred = lasso_grid_search.predict(self.X_train_scaled)
        y_test_pred = lasso_grid_search.predict(self.X_test_scaled)
        
        # Metrics
        train_r2 = r2_score(self.y_train, y_train_pred)
        test_r2 = r2_score(self.y_test, y_test_pred)
        test_rmse = np.sqrt(mean_squared_error(self.y_test, y_test_pred))
        test_mae = mean_absolute_error(self.y_test, y_test_pred)
        
        results = {
            'model': lasso_grid_search,
            'train_r2': train_r2,
            'test_r2': test_r2,
            'test_rmse': test_rmse,
            'test_mae': test_mae,
            'best_params': lasso_grid_search.best_params_,
            'best_score': lasso_grid_search.best_score_
        }
        
        self.models['lasso'] = results
        
        logger.info(f"Lasso Model - Train R²: {train_r2:.4f}, Test R²: {test_r2:.4f}")
        logger.info(f"Lasso Model - Test RMSE: {test_rmse:.4f}, Test MAE: {test_mae:.4f}")
        
        return results
    
    def train_random_forest_model(self) -> Dict[str, Any]:
        """
        Train Random Forest model with hyperparameter tuning.
        
        Returns:
            Dict[str, Any]: Model results and metrics
        """
        logger.info("Training Random Forest model...")
        
        # Define parameter grid
        param_grid = {
            'n_estimators': [50, 80, 100, 150],
            'max_depth': [3, 5, 7, 10],
            'min_samples_split': [2, 5, 10],
            'min_samples_leaf': [1, 2, 4]
        }
        
        # Grid search
        rf = RandomForestRegressor(random_state=42)
        rf_grid_search = GridSearchCV(
            rf, param_grid, scoring='r2', cv=3, n_jobs=-1
        )
        rf_grid_search.fit(self.X_train_scaled, self.y_train)
        
        # Predictions
        y_train_pred = rf_grid_search.predict(self.X_train_scaled)
        y_test_pred = rf_grid_search.predict(self.X_test_scaled)
        
        # Metrics
        train_r2 = r2_score(self.y_train, y_train_pred)
        test_r2 = r2_score(self.y_test, y_test_pred)
        test_rmse = np.sqrt(mean_squared_error(self.y_test, y_test_pred))
        test_mae = mean_absolute_error(self.y_test, y_test_pred)
        
        results = {
            'model': rf_grid_search,
            'train_r2': train_r2,
            'test_r2': test_r2,
            'test_rmse': test_rmse,
            'test_mae': test_mae,
            'best_params': rf_grid_search.best_params_,
            'best_score': rf_grid_search.best_score_,
            'feature_importance': rf_grid_search.best_estimator_.feature_importances_
        }
        
        self.models['random_forest'] = results
        
        logger.info(f"Random Forest - Train R²: {train_r2:.4f}, Test R²: {test_r2:.4f}")
        logger.info(f"Random Forest - Test RMSE: {test_rmse:.4f}, Test MAE: {test_mae:.4f}")
        
        return results
    
    def train_xgboost_model(self) -> Dict[str, Any]:
        """
        Train XGBoost model with hyperparameter tuning.
        
        Returns:
            Dict[str, Any]: Model results and metrics
        """
        logger.info("Training XGBoost model...")
        
        # Define parameter grid
        param_grid = {
            'n_estimators': [100, 200, 300],
            'max_depth': [3, 5, 7],
            'learning_rate': [0.01, 0.1, 0.2],
            'subsample': [0.8, 0.9, 1.0],
            'colsample_bytree': [0.8, 0.9, 1.0]
        }
        
        # Grid search
        xgb = XGBRegressor(random_state=42, n_jobs=-1)
        xgb_grid_search = GridSearchCV(
            xgb, param_grid, scoring='r2', cv=3, n_jobs=-1
        )
        xgb_grid_search.fit(self.X_train_scaled, self.y_train)
        
        # Predictions
        y_train_pred = xgb_grid_search.predict(self.X_train_scaled)
        y_test_pred = xgb_grid_search.predict(self.X_test_scaled)
        
        # Metrics
        train_r2 = r2_score(self.y_train, y_train_pred)
        test_r2 = r2_score(self.y_test, y_test_pred)
        test_rmse = np.sqrt(mean_squared_error(self.y_test, y_test_pred))
        test_mae = mean_absolute_error(self.y_test, y_test_pred)
        
        results = {
            'model': xgb_grid_search,
            'train_r2': train_r2,
            'test_r2': test_r2,
            'test_rmse': test_rmse,
            'test_mae': test_mae,
            'best_params': xgb_grid_search.best_params_,
            'best_score': xgb_grid_search.best_score_,
            'feature_importance': xgb_grid_search.best_estimator_.feature_importances_
        }
        
        self.models['xgboost'] = results
        
        logger.info(f"XGBoost - Train R²: {train_r2:.4f}, Test R²: {test_r2:.4f}")
        logger.info(f"XGBoost - Test RMSE: {test_rmse:.4f}, Test MAE: {test_mae:.4f}")
        
        return results
    
    def plot_feature_importance(self, model_name: str = 'random_forest', 
                              save_path: Optional[str] = None):
        """
        Plot feature importance for tree-based models.
        
        Args:
            model_name (str): Name of the model to plot importance for
            save_path (str): Path to save the plot
        """
        if model_name not in self.models:
            logger.error(f"Model {model_name} not found")
            return
        
        if 'feature_importance' not in self.models[model_name]:
            logger.error(f"Feature importance not available for {model_name}")
            return
        
        importances = self.models[model_name]['feature_importance']
        indices = np.argsort(importances)
        
        plt.figure(figsize=(10, 6))
        plt.title(f'Feature Importance - {model_name.replace("_", " ").title()}')
        plt.barh(range(len(indices)), importances[indices], color='red', align='center')
        plt.yticks(range(len(indices)), [self.feature_names[i] for i in indices])
        plt.xlabel('Relative Importance')
        plt.tight_layout()
        
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.show()
    
    def compare_models(self) -> pd.DataFrame:
        """
        Compare performance of all trained models.
        
        Returns:
            pd.DataFrame: Comparison table
        """
        comparison_data = []
        
        for model_name, results in self.models.items():
            comparison_data.append({
                'Model': model_name.replace('_', ' ').title(),
                'Train R²': f"{results['train_r2']:.4f}",
                'Test R²': f"{results['test_r2']:.4f}",
                'Test RMSE': f"{results['test_rmse']:.4f}",
                'Test MAE': f"{results['test_mae']:.4f}",
                'Best CV Score': f"{results['best_score']:.4f}"
            })
        
        comparison_df = pd.DataFrame(comparison_data)
        logger.info("Model Comparison:")
        logger.info(f"\n{comparison_df.to_string(index=False)}")
        
        return comparison_df
    
    def save_best_model(self, model_name: str = 'xgboost', filepath: str = 'best_model.pkl'):
        """
        Save the best performing model.
        
        Args:
            model_name (str): Name of the model to save
            filepath (str): Path to save the model
        """
        if model_name not in self.models:
            logger.error(f"Model {model_name} not found")
            return
        
        try:
            model = self.models[model_name]['model'].best_estimator_
            with open(filepath, 'wb') as f:
                pickle.dump({
                    'model': model,
                    'scaler': self.scaler,
                    'imputer': self.imputer,
                    'feature_names': self.feature_names,
                    'model_name': model_name,
                    'performance': self.models[model_name]
                }, f)
            
            logger.info(f"Best model saved to {filepath}")
            
        except Exception as e:
            logger.error(f"Error saving model: {str(e)}")
    
    def predict(self, new_data: pd.DataFrame, model_name: str = 'xgboost') -> np.ndarray:
        """
        Make predictions on new data.
        
        Args:
            new_data (pd.DataFrame): New data for prediction
            model_name (str): Name of the model to use
            
        Returns:
            np.ndarray: Predictions
        """
        if model_name not in self.models:
            raise ValueError(f"Model {model_name} not found")
        
        # Preprocess new data
        new_data_scaled = self.scaler.transform(new_data)
        new_data_scaled = self.imputer.transform(new_data_scaled)
        
        # Make predictions
        model = self.models[model_name]['model'].best_estimator_
        predictions = model.predict(new_data_scaled)
        
        return predictions
    
    def create_visualizations(self, save_dir: Optional[str] = None):
        """
        Create comprehensive visualizations for the analysis.
        
        Args:
            save_dir (str): Directory to save plots
        """
        logger.info("Creating visualizations...")
        
        # 1. Price trend over time
        plt.figure(figsize=(12, 6))
        self.dataset.set_index('Date')['EUR/USD'].plot()
        plt.title('Gold Price Trend Over Time')
        plt.xlabel('Date')
        plt.ylabel('Price (EUR/USD)')
        plt.xticks(rotation=45)
        plt.tight_layout()
        
        if save_dir:
            plt.savefig(f"{save_dir}/price_trend.png", dpi=300, bbox_inches='tight')
        plt.show()
        
        # 2. Distribution plots
        fig, axes = plt.subplots(2, 3, figsize=(15, 10))
        numeric_cols = self.dataset.drop('Date', axis=1).columns
        
        for i, col in enumerate(numeric_cols):
            row, col_idx = i // 3, i % 3
            sns.histplot(data=self.dataset, x=col, kde=True, ax=axes[row, col_idx])
            axes[row, col_idx].set_title(f'Distribution of {col}')
        
        plt.tight_layout()
        if save_dir:
            plt.savefig(f"{save_dir}/distributions.png", dpi=300, bbox_inches='tight')
        plt.show()
        
        # 3. Box plots
        fig, axes = plt.subplots(2, 3, figsize=(15, 10))
        
        for i, col in enumerate(numeric_cols):
            row, col_idx = i // 3, i % 3
            sns.boxplot(data=self.dataset, x=col, ax=axes[row, col_idx], color='violet')
            axes[row, col_idx].set_title(f'Box Plot of {col}')
        
        plt.tight_layout()
        if save_dir:
            plt.savefig(f"{save_dir}/boxplots.png", dpi=300, bbox_inches='tight')
        plt.show()
        
        # 4. Model comparison
        if self.models:
            model_names = list(self.models.keys())
            test_r2_scores = [self.models[name]['test_r2'] for name in model_names]
            
            plt.figure(figsize=(10, 6))
            bars = plt.bar(model_names, test_r2_scores, color=['red', 'blue', 'green'])
            plt.title('Model Performance Comparison (Test R²)')
            plt.ylabel('R² Score')
            plt.ylim(0, 1)
            
            # Add value labels on bars
            for bar, score in zip(bars, test_r2_scores):
                plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01,
                        f'{score:.4f}', ha='center', va='bottom')
            
            plt.tight_layout()
            if save_dir:
                plt.savefig(f"{save_dir}/model_comparison.png", dpi=300, bbox_inches='tight')
            plt.show()


def main():
    """
    Main function to run the complete gold price prediction pipeline.
    """
    try:
        # Initialize predictor
        predictor = GoldPricePredictor("gold_price_data.csv")
        
        # Load and explore data
        predictor.load_data()
        predictor.explore_data()
        
        # Create initial visualizations
        predictor.create_correlation_heatmap()
        
        # Preprocess data
        predictor.dataset = predictor.preprocess_data()
        
        # Prepare features
        X, y = predictor.prepare_features()
        
        # Split and scale data
        predictor.split_and_scale_data(X, y)
        
        # Train models
        predictor.train_lasso_model()
        predictor.train_random_forest_model()
        predictor.train_xgboost_model()
        
        # Compare models
        comparison = predictor.compare_models()
        
        # Create visualizations
        predictor.create_visualizations()
        
        # Plot feature importance for best model
        predictor.plot_feature_importance('random_forest')
        predictor.plot_feature_importance('xgboost')
        
        # Save best model
        predictor.save_best_model('xgboost', 'gold_price_model.pkl')
        
        logger.info("Gold price prediction pipeline completed successfully!")
        
    except Exception as e:
        logger.error(f"Error in main pipeline: {str(e)}")
        raise


if __name__ == "__main__":
    main()
