In [0]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
import xgboost as xgb
import catboost as cb
import joblib
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

class PricePredictionModel:
    """Train, compare and persist regression models that predict ``MODAL_PRICE``.

    Typical use is :meth:`run`, which loads data through Spark SQL, derives
    calendar features, holds out the most recent 20% of rows (by
    ``DATE_OF_PRICING``), tunes every candidate model with ``GridSearchCV``
    and keeps the pipeline with the lowest hold-out RMSE.
    """

    def __init__(self, data_query):
        """Store the Spark SQL query and configure the candidate models.

        Args:
            data_query: Spark SQL text whose result must include
                ``DATE_OF_PRICING``, ``MODAL_PRICE``, ``MAXIMUM_PRICE`` and
                ``MINIMUM_PRICE`` alongside the feature columns.
        """
        self.data_query = data_query
        # Candidate estimators; the keys double as the "Model" name in the
        # results table and as keys into ``param_grids``.
        self.models = {
            "LinearRegression": LinearRegression(),
            "RandomForest": RandomForestRegressor(random_state=42),
            "XGBoost": xgb.XGBRegressor(random_state=42),
            "CatBoost": cb.CatBoostRegressor(verbose=0, random_state=42)
        }
        # Grid keys carry the ``model__`` prefix because each estimator is the
        # step named "model" inside a Pipeline.
        self.param_grids = {
            "LinearRegression": {
                "model__fit_intercept": [True, False]
            },
            "RandomForest": {
                "model__n_estimators": [100, 200],
                "model__max_depth": [5, 10]
            },
            "XGBoost": {
                "model__n_estimators": [100, 200],
                "model__max_depth": [3, 5],
                "model__learning_rate": [0.01, 0.1]
            },
            "CatBoost": {
                "model__iterations": [100, 200],
                "model__depth": [4, 6],
                "model__learning_rate": [0.01, 0.1]
            }
        }
        self.target = 'MODAL_PRICE'
        self.results = []
        self.best_model = None
        self.best_model_name = None
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None
        self.preprocessor = None

    def preprocess_data(self, spark):
        """Load the query result, build features and make a chronological split.

        Populates ``X_train``/``X_test``/``y_train``/``y_test`` (the latest
        20% of rows by ``DATE_OF_PRICING`` are held out) and an unfitted
        ``preprocessor`` that standard-scales numeric columns and one-hot
        encodes categorical ones.

        Args:
            spark: Active ``SparkSession`` used to execute ``data_query``.

        Returns:
            ``self``, to allow chaining.
        """
        df = spark.sql(self.data_query).toPandas()
        X, y = self._engineer_features(df)

        # Rows are date-sorted, so an unshuffled split holds out the most
        # recent observations as the test set.
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X, y, test_size=0.2, shuffle=False
        )

        numerical_cols, categorical_cols = self._split_column_types(X)

        numeric_transformer = Pipeline(steps=[
            ("scaler", StandardScaler())
        ])

        categorical_transformer = Pipeline(steps=[
            # Categories first seen in the test split encode as all zeros.
            ("onehot", OneHotEncoder(handle_unknown='ignore'))
        ])

        self.preprocessor = ColumnTransformer(transformers=[
            ("num", numeric_transformer, numerical_cols),
            ("cat", categorical_transformer, categorical_cols)
        ])

        return self

    def _engineer_features(self, df):
        """Return ``(X, y)`` from a raw pricing frame, ordered by ``DATE_OF_PRICING``.

        Adds ``month``, ``dayofweek`` and ``year`` columns, drops metadata and
        unused columns, removes rows with any missing value, and excludes
        ``MAXIMUM_PRICE`` / ``MINIMUM_PRICE`` from the features (presumably
        they describe the same observation as the target and would leak it).
        The input frame is not modified.
        """
        df = df.copy()
        df['DATE_OF_PRICING'] = pd.to_datetime(df['DATE_OF_PRICING'])
        # Spark SQL gives no row-order guarantee; sort so the unshuffled
        # train/test split in preprocess_data is genuinely chronological.
        df = df.sort_values('DATE_OF_PRICING', kind='stable')
        df['month'] = df['DATE_OF_PRICING'].dt.month
        df['dayofweek'] = df['DATE_OF_PRICING'].dt.dayofweek
        df['year'] = df['DATE_OF_PRICING'].dt.year

        columns_to_drop = ['DATE_OF_PRICING', 'lakehouse_inserted_date',
                           'lakehouse_updated_date', 'ROW_ID',
                           'TEMPARATURE_UNIT', 'MARKET_MAX_TEMPARATURE', 'MARKET_POPULATION']
        df = df.drop(columns=columns_to_drop, errors='ignore').dropna()

        features = df.columns.drop([self.target, 'MAXIMUM_PRICE', 'MINIMUM_PRICE'])
        return df[features], df[self.target]

    @staticmethod
    def _split_column_types(X):
        """Return ``(numerical_cols, categorical_cols)`` for the feature frame ``X``.

        ``'number'`` covers every numeric width (int32 from ``.dt`` accessors
        and Spark IntegerType included); listing only float64/int64 would let
        the ColumnTransformer silently drop those columns.
        """
        numerical_cols = X.select_dtypes(include='number').columns.tolist()
        categorical_cols = X.select_dtypes(
            include=['object', 'category', 'bool', 'string']
        ).columns.tolist()
        return numerical_cols, categorical_cols

    def train_and_evaluate(self):
        """Tune every candidate model and record its train/test metrics.

        Each estimator is wrapped in a Pipeline behind ``preprocessor`` and
        tuned with 3-fold ``GridSearchCV`` on the training split (scored by
        RMSE). The refit best pipeline is scored on both splits, a row is
        appended to ``results``, and the pipeline with the lowest test RMSE is
        stored in ``best_model`` / ``best_model_name``.

        Raises:
            ValueError: if :meth:`preprocess_data` has not been run.
        """
        if self.preprocessor is None:
            raise ValueError("No preprocessed data found. Please run preprocess_data first.")

        # Reset state so repeated calls neither duplicate rows nor compare
        # against a best model left over from an earlier run.
        self.results = []
        self.best_model = None
        self.best_model_name = None
        best_test_rmse = np.inf

        for name, model in self.models.items():
            pipeline = Pipeline(steps=[
                ("preprocessor", self.preprocessor),
                ("model", model)
            ])

            grid = GridSearchCV(
                pipeline,
                param_grid=self.param_grids[name],
                cv=3,
                scoring='neg_root_mean_squared_error',
                n_jobs=-1
            )
            grid.fit(self.X_train, self.y_train)

            # With the default refit=True this is retrained on all of X_train.
            best_pipeline = grid.best_estimator_

            train_pred = best_pipeline.predict(self.X_train)
            test_pred = best_pipeline.predict(self.X_test)

            # sqrt(MSE) instead of squared=False: that keyword was deprecated
            # in scikit-learn 1.4 and removed in 1.6.
            train_rmse = np.sqrt(mean_squared_error(self.y_train, train_pred))
            test_rmse = np.sqrt(mean_squared_error(self.y_test, test_pred))
            train_r2 = r2_score(self.y_train, train_pred)
            test_r2 = r2_score(self.y_test, test_pred)

            fit_status = self._check_fit_status(train_rmse, test_rmse, train_r2, test_r2)

            self.results.append({
                "Model": name,
                "Train_RMSE": train_rmse,
                "Test_RMSE": test_rmse,
                "Train_R2": train_r2,
                "Test_R2": test_r2,
                "MAE": mean_absolute_error(self.y_test, test_pred),
                "Fit_Status": fit_status,
                "Best_Params": grid.best_params_
            })

            # The first model is always accepted; later ones must beat it.
            if self.best_model is None or test_rmse < best_test_rmse:
                best_test_rmse = test_rmse
                self.best_model = best_pipeline
                self.best_model_name = name

    def _check_fit_status(self, train_rmse, test_rmse, train_r2, test_r2):
        """Classify a model as "Overfitting", "Underfitting" or "Good Fit".

        Overfitting: test RMSE exceeds train RMSE by more than 20% of the
        train RMSE (any positive test RMSE counts when train RMSE is 0).
        Underfitting: both R² scores are below 0.5. Overfitting is checked
        first.
        """
        if train_rmse > 0:
            relative_gap = (test_rmse - train_rmse) / train_rmse
        else:
            # A perfect training fit with any test error is infinitely worse.
            relative_gap = np.inf if test_rmse > 0 else 0.0

        if test_rmse > train_rmse and relative_gap > 0.2:
            return "Overfitting"
        if train_r2 < 0.5 and test_r2 < 0.5:
            return "Underfitting"
        return "Good Fit"

    def save_best_model(self, filename="best_price_prediction_model.joblib"):
        """Persist ``best_model`` (the whole fitted pipeline) with joblib.

        Raises:
            ValueError: if no best model has been selected yet.
        """
        if self.best_model is None:
            raise ValueError("No best model found. Please run train_and_evaluate first.")
        joblib.dump(self.best_model, filename)
        print(f"Best model ({self.best_model_name}) saved to {filename}")

    def get_results(self):
        """Return one row per evaluated model, sorted by ascending ``Test_RMSE``.

        An empty frame with the usual columns is returned when no model has
        been evaluated yet.
        """
        if not self.results:
            return pd.DataFrame(columns=["Model", "Train_RMSE", "Test_RMSE", "Train_R2",
                                         "Test_R2", "MAE", "Fit_Status", "Best_Params"])
        return pd.DataFrame(self.results).sort_values("Test_RMSE")

    def run(self, spark, save_model=True, model_filename="best_price_prediction_model.joblib"):
        """Run preprocessing, training and (optionally) saving; return the results table."""
        self.preprocess_data(spark)
        self.train_and_evaluate()
        if save_model:
            self.save_best_model(model_filename)
        return self.get_results()

# Usage example
if __name__ == "__main__":
    # Script entry point: train every candidate model on the gold pricing
    # table, save the best pipeline and print the comparison table.
    from pyspark.sql import SparkSession

    spark_session = (
        SparkSession.builder
        .appName("PricePrediction")
        .getOrCreate()
    )
    predictor = PricePredictionModel(
        "SELECT * FROM pricing_analytics.gold.datalake_price_prediction_gold"
    )
    summary = predictor.run(spark_session, save_model=True)
    print("\nFinal Results:")
    print(summary)