In [0]:
"""
Databricks Random Forest Incident Prediction System
==================================================

This program reads incident data from a Databricks table, performs Random Forest
prediction for asset risk scoring, and writes results back to Databricks.

Requirements:
- databricks-connect or running in Databricks environment
- pandas, scikit-learn, numpy
- Access to source and target Databricks tables

Author: ML Prediction System
Date: 2025
"""

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import logging
from typing import Dict, List, Tuple, Optional

# Databricks/Spark imports
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, when, isnan, isnull, regexp_extract, to_date
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType

# ML imports
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, mean_absolute_error
import joblib

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class DatabricksRandomForestPredictor:
    """
    Random Forest predictor for IT incident risk analysis using Databricks.

    Pipeline stages (see ``run_full_pipeline``): read a Spark table, clean it
    into pandas, engineer features, train a classifier (risk level) and a
    regressor (risk score), predict, and write the results back to a table.

    NOTE(review): both training targets are derived from columns that are also
    model inputs (``Priority_Num``, ``Asset_Incident_Count``), so the reported
    accuracy / MAE largely measure how well the forest re-learns those rules.
    Confirm this is intended.
    """

    # Placeholder used for missing or previously unseen categorical values.
    UNKNOWN_LABEL = 'Unknown'
    # Base risk score per numeric priority; other priorities use the default.
    PRIORITY_RISK_SCORES = {1: 2.0, 2: 1.5, 3: 1.0, 4: 0.5}
    DEFAULT_PRIORITY_RISK_SCORE = 0.5
    # Contribution of each incident on the same asset to the risk score.
    ASSET_VOLUME_WEIGHT = 0.1
    # Numeric codes accepted by _generate_recommendation for callers that
    # still pass integers instead of the decoded string labels.
    _LEGACY_RISK_CODES = {2: 'High', 1: 'Medium', 0: 'Low'}

    def __init__(self, spark_session: Optional[SparkSession] = None):
        """
        Initialize the predictor with Spark session

        Args:
            spark_session: Optional SparkSession, creates new one if None
        """
        self.spark = spark_session or SparkSession.builder.appName("IncidentRiskPredictor").getOrCreate()
        self.label_encoders = {}
        self.scaler = StandardScaler()
        self.rf_classifier = None
        self.rf_regressor = None
        # Encoder fitted on Risk_Level during training; needed to turn the
        # classifier's integer output back into 'High' / 'Medium' / 'Low'.
        self.risk_label_encoder = None
        self.feature_columns = []
        self.feature_importance = {}

        logger.info("Databricks Random Forest Predictor initialized")

    def read_incident_data(self, table_name: str) -> DataFrame:
        """
        Read incident data from Databricks table

        Args:
            table_name: Name of the Databricks table (e.g., 'default.incidents')

        Returns:
            Spark DataFrame with incident data

        Raises:
            Exception: Any error raised by Spark is logged and re-raised.
        """
        try:
            logger.info(f"Reading data from table: {table_name}")

            # Read from Databricks table
            df = self.spark.table(table_name)

            # Log data info (count() triggers a Spark job)
            row_count = df.count()
            col_count = len(df.columns)
            logger.info(f"Loaded {row_count} rows, {col_count} columns from {table_name}")

            return df

        except Exception as e:
            logger.error(f"Error reading from table {table_name}: {str(e)}")
            raise

    def clean_and_prepare_data(self, df: DataFrame) -> pd.DataFrame:
        """
        Clean and prepare incident data for ML processing

        Normalizes column names, parses date columns, derives ``Priority_Num``,
        ``Asset_Name``, ``Location_Clean``, ``Resolution_Hours`` and calendar
        features, and assigns a rule-based ``Risk_Level`` label.

        Args:
            df: Raw Spark DataFrame

        Returns:
            Cleaned pandas DataFrame ready for ML
        """
        logger.info("Starting data cleaning and preparation")

        try:
            # Convert to pandas for easier processing
            pdf = df.toPandas()

            # Clean column names (remove spaces, special characters)
            pdf.columns = [name.strip().replace(' ', '_').replace('/', '_') for name in pdf.columns]

            # Parse dates; unparseable values become NaT
            for date_column in ('Created_Date', 'Last_Modified', 'Resolved_Date', 'SLA_Due'):
                if date_column in pdf.columns:
                    pdf[date_column] = pd.to_datetime(pdf[date_column], errors='coerce')

            # Extract priority numbers. Cast to str first so a numeric
            # Priority column does not break the .str accessor.
            if 'Priority' in pdf.columns:
                priority_digits = pdf['Priority'].astype(str).str.extract(r'(\d+)', expand=False)
                pdf['Priority_Num'] = priority_digits.astype(float).fillna(3)  # Default to moderate

            # Clean asset names
            if 'CI' in pdf.columns:
                pdf['Asset_Name'] = pdf['CI'].fillna(self.UNKNOWN_LABEL).astype(str).str.strip()

            # Clean locations
            if 'Location' in pdf.columns:
                pdf['Location_Clean'] = pdf['Location'].fillna(self.UNKNOWN_LABEL).astype(str).str.strip()

            # Calculate resolution times (hours); unresolved incidents get 0
            if 'Created_Date' in pdf.columns and 'Resolved_Date' in pdf.columns:
                elapsed = pdf['Resolved_Date'] - pdf['Created_Date']
                pdf['Resolution_Hours'] = (elapsed.dt.total_seconds() / 3600).fillna(0)

            # Create time-based features
            if 'Created_Date' in pdf.columns:
                created = pdf['Created_Date']
                pdf['Day_of_Week'] = created.dt.dayofweek
                pdf['Hour_of_Day'] = created.dt.hour
                pdf['Day_of_Month'] = created.dt.day
                pdf['Is_Weekend'] = pdf['Day_of_Week'].isin([5, 6]).astype(int)

            # Create risk labels based on priority and resolution time:
            # priority 1-2 -> High, priority 3 taking more than 24h -> Medium.
            pdf['Risk_Level'] = 'Low'
            if 'Priority_Num' in pdf.columns:
                priority = pdf['Priority_Num']
                pdf.loc[priority.isin([1, 2]), 'Risk_Level'] = 'High'
                if 'Resolution_Hours' in pdf.columns:
                    slow_moderate = (priority == 3) & (pdf['Resolution_Hours'] > 24)
                    pdf.loc[slow_moderate, 'Risk_Level'] = 'Medium'

            # Fill missing values
            for category_column in ('Category', 'Subcategory', 'Status', 'Assigned_Group', 'Incident_Type'):
                if category_column in pdf.columns:
                    pdf[category_column] = pdf[category_column].fillna(self.UNKNOWN_LABEL)

            logger.info(f"Data cleaning completed. Shape: {pdf.shape}")
            logger.info(f"Risk level distribution:\n{pdf['Risk_Level'].value_counts()}")

            return pdf

        except Exception as e:
            logger.error(f"Error in data cleaning: {str(e)}")
            raise

    def _encode_categorical(self, values: pd.Series, encoder_key: str, fit: bool = False) -> np.ndarray:
        """
        Label-encode one categorical column, tolerating unseen values.

        Args:
            values: Column to encode.
            encoder_key: Key under which the encoder is kept in
                ``self.label_encoders``.
            fit: When True, always fit a fresh encoder. Otherwise an existing
                encoder is reused and one is fitted only if none exists yet.

        Returns:
            Integer codes aligned with ``values``.
        """
        cleaned = values.fillna(self.UNKNOWN_LABEL).astype(str)

        encoder = self.label_encoders.get(encoder_key)
        if fit or encoder is None:
            encoder = LabelEncoder()
            # Always reserve a class for the fallback label so values first
            # seen at prediction time can be mapped onto it.
            encoder.fit(list(cleaned.unique()) + [self.UNKNOWN_LABEL])
            self.label_encoders[encoder_key] = encoder

        known_values = set(encoder.classes_)
        # Encoders supplied from outside this class may lack the fallback
        # label; use their first class rather than failing in transform().
        fallback = self.UNKNOWN_LABEL if self.UNKNOWN_LABEL in known_values else encoder.classes_[0]
        cleaned = cleaned.where(cleaned.isin(known_values), fallback)
        return encoder.transform(cleaned)

    def create_features(self, df: pd.DataFrame, fit: bool = False) -> Tuple[pd.DataFrame, List[str]]:
        """
        Create features for Random Forest model

        Args:
            df: Cleaned pandas DataFrame
            fit: When True, refit all label encoders on ``df`` (use when
                retraining). When False, existing encoders are reused and
                missing ones are fitted.

        Returns:
            Tuple of (feature DataFrame, list of feature column names)
        """
        logger.info("Creating features for Random Forest model")

        try:
            features_df = df.copy()

            # Asset-based features
            if 'Asset_Name' in df.columns:
                # Count incidents per asset
                asset_counts = df['Asset_Name'].value_counts()
                features_df['Asset_Incident_Count'] = features_df['Asset_Name'].map(asset_counts)
                features_df['Asset_Encoded'] = self._encode_categorical(
                    features_df['Asset_Name'], 'Asset_Name', fit
                )

            # Categorical feature encoding
            categorical_features = ['Category', 'Subcategory', 'Status', 'Assigned_Group', 'Location_Clean', 'Incident_Type']
            encoded_columns = []
            for feature in categorical_features:
                if feature in df.columns:
                    encoded_name = f'{feature}_Encoded'
                    features_df[encoded_name] = self._encode_categorical(features_df[feature], feature, fit)
                    encoded_columns.append(encoded_name)

            # Select feature columns for ML, keeping only those that exist
            candidate_columns = [
                'Priority_Num', 'Day_of_Week', 'Hour_of_Day', 'Day_of_Month', 'Is_Weekend',
                'Asset_Incident_Count', 'Asset_Encoded'
            ] + encoded_columns
            feature_columns = [name for name in candidate_columns if name in features_df.columns]

            # Fill any remaining NaN values
            features_df[feature_columns] = features_df[feature_columns].fillna(0)

            self.feature_columns = feature_columns
            logger.info(f"Created {len(feature_columns)} features: {feature_columns}")

            return features_df, feature_columns

        except Exception as e:
            logger.error(f"Error creating features: {str(e)}")
            raise

    def train_random_forest_models(self, df: pd.DataFrame, feature_columns: List[str]) -> Dict:
        """
        Train Random Forest models for classification and regression

        Side effects: sets ``rf_classifier``, ``rf_regressor``,
        ``risk_label_encoder`` and ``feature_importance`` on the instance and
        adds a ``Risk_Score`` column to ``df``.

        Args:
            df: DataFrame with features (must contain ``Risk_Level``)
            feature_columns: List of feature column names

        Returns:
            Dictionary with training results and metrics

        Raises:
            ValueError: If ``df`` has no rows.
        """
        logger.info("Training Random Forest models")

        try:
            if df.empty:
                raise ValueError("Cannot train models on an empty DataFrame.")

            X = df[feature_columns].values

            # Classification model (Risk Level)
            le_risk = LabelEncoder()
            y_class_encoded = le_risk.fit_transform(df['Risk_Level'])

            # A stratified split needs at least two rows per class and a test
            # set with room for every class; otherwise split without it.
            class_counts = np.bincount(y_class_encoded)
            can_stratify = (
                class_counts.min() >= 2
                and np.ceil(len(y_class_encoded) * 0.2) >= len(class_counts)
            )
            if not can_stratify:
                logger.warning("Too few samples per risk level for a stratified split; splitting randomly")

            X_train_class, X_test_class, y_train_class, y_test_class = train_test_split(
                X, y_class_encoded, test_size=0.2, random_state=42,
                stratify=y_class_encoded if can_stratify else None
            )

            # Train classification model
            self.rf_classifier = RandomForestClassifier(
                n_estimators=100,
                max_depth=10,
                min_samples_split=5,
                min_samples_leaf=2,
                random_state=42,
                class_weight='balanced'
            )
            self.rf_classifier.fit(X_train_class, y_train_class)
            # Keep the encoder so predictions can be decoded back to labels.
            self.risk_label_encoder = le_risk

            y_pred_class = self.rf_classifier.predict(X_test_class)
            class_accuracy = accuracy_score(y_test_class, y_pred_class)

            # Regression model (Risk Score)
            # Risk score = priority-based base score + incident volume factor.
            # Priorities outside the lookup table get the default score so
            # the regression target never contains NaN.
            if 'Priority_Num' in df.columns:
                base_score = df['Priority_Num'].map(self.PRIORITY_RISK_SCORES).fillna(
                    self.DEFAULT_PRIORITY_RISK_SCORE
                )
            else:
                base_score = pd.Series(self.DEFAULT_PRIORITY_RISK_SCORE, index=df.index)
            if 'Asset_Incident_Count' in df.columns:
                base_score = base_score + df['Asset_Incident_Count'] * self.ASSET_VOLUME_WEIGHT
            df['Risk_Score'] = base_score

            y_reg = df['Risk_Score']
            X_train_reg, X_test_reg, y_train_reg, y_test_reg = train_test_split(
                X, y_reg, test_size=0.2, random_state=42
            )

            # Train regression model
            self.rf_regressor = RandomForestRegressor(
                n_estimators=100,
                max_depth=10,
                min_samples_split=5,
                min_samples_leaf=2,
                random_state=42
            )
            self.rf_regressor.fit(X_train_reg, y_train_reg)
            y_pred_reg = self.rf_regressor.predict(X_test_reg)

            reg_mae = mean_absolute_error(y_test_reg, y_pred_reg)

            # Feature importance (from the classifier), highest first
            feature_importance = dict(zip(feature_columns, self.rf_classifier.feature_importances_))
            self.feature_importance = dict(sorted(feature_importance.items(), key=lambda x: x[1], reverse=True))

            results = {
                'classification_accuracy': class_accuracy,
                'regression_mae': reg_mae,
                'feature_importance': self.feature_importance,
                'risk_label_encoder': le_risk,
                'n_samples': len(df),
                'n_features': len(feature_columns)
            }

            logger.info("Model training completed:")
            logger.info(f"- Classification accuracy: {class_accuracy:.3f}")
            logger.info(f"- Regression MAE: {reg_mae:.3f}")
            logger.info(f"- Top 3 features: {list(self.feature_importance.keys())[:3]}")

            return results

        except Exception as e:
            logger.error(f"Error training models: {str(e)}")
            raise

    def make_predictions(self, df: pd.DataFrame, feature_columns: List[str]) -> pd.DataFrame:
        """
        Make predictions on new data

        Adds ``Predicted_Risk_Level``, ``Risk_Level_Confidence``,
        ``Predicted_Risk_Score``, ``Recommendation`` and
        ``Prediction_Timestamp`` columns to ``df`` in place.

        Args:
            df: DataFrame with features
            feature_columns: List of feature column names

        Returns:
            DataFrame with predictions added (the same object as ``df``)

        Raises:
            ValueError: If the models have not been trained yet.
        """
        logger.info("Making predictions with Random Forest models")

        try:
            if self.rf_classifier is None or self.rf_regressor is None:
                raise ValueError("Models not trained yet. Call train_random_forest_models first.")

            X = df[feature_columns].values

            # Classification predictions
            risk_pred_encoded = self.rf_classifier.predict(X)
            risk_pred_proba = self.rf_classifier.predict_proba(X)

            # Convert back to labels. The classifier is trained on
            # LabelEncoder codes, which follow sorted label order and so
            # carry no severity ranking of their own.
            if self.risk_label_encoder is not None:
                df['Predicted_Risk_Level'] = self.risk_label_encoder.inverse_transform(risk_pred_encoded)
            else:
                # No encoder available (e.g. models assigned externally).
                df['Predicted_Risk_Level'] = risk_pred_encoded
            df['Risk_Level_Confidence'] = np.max(risk_pred_proba, axis=1)

            # Regression predictions
            df['Predicted_Risk_Score'] = self.rf_regressor.predict(X)

            # Add recommendations based on predictions
            df['Recommendation'] = df.apply(self._generate_recommendation, axis=1)

            # Add prediction timestamp
            df['Prediction_Timestamp'] = datetime.now()

            logger.info(f"Predictions completed for {len(df)} records")

            return df

        except Exception as e:
            logger.error(f"Error making predictions: {str(e)}")
            raise

    def _generate_recommendation(self, row) -> str:
        """
        Generate recommendation based on predictions

        Args:
            row: Mapping-like object (dict or pandas Series) that may contain
                ``Predicted_Risk_Score`` and ``Predicted_Risk_Level``. The
                level is normally a string label ('High', 'Medium', 'Low');
                the integer codes 2 / 1 / 0 are still accepted.

        Returns:
            Recommendation text for the row.
        """
        risk_score = row.get('Predicted_Risk_Score', 0)
        if risk_score is None:
            risk_score = 0

        risk_level = row.get('Predicted_Risk_Level', 0)
        if isinstance(risk_level, str):
            risk_label = risk_level.strip().capitalize()
        else:
            try:
                risk_label = self._LEGACY_RISK_CODES.get(risk_level, 'Low')
            except TypeError:  # unhashable value: treat as unknown
                risk_label = 'Low'

        if risk_score >= 2.0 or risk_label == 'High':  # High risk
            return "Immediate monitoring and proactive maintenance required"
        if risk_score >= 1.5 or risk_label == 'Medium':  # Medium risk
            return "Enhanced monitoring and scheduled maintenance recommended"
        return "Standard monitoring sufficient"  # Low risk

    def save_predictions_to_databricks(self, df: pd.DataFrame, target_table: str, mode: str = "overwrite"):
        """
        Save predictions to Databricks table

        Args:
            df: DataFrame with predictions
            target_table: Target table name (e.g., 'default.incident_predictions')
            mode: Write mode ('overwrite', 'append')
        """
        logger.info(f"Saving predictions to Databricks table: {target_table}")

        try:
            # Identifier columns (when present) come first, followed by the
            # descriptive and prediction columns.
            output_columns = [
                'Incident_Number', 'Asset_Number',
                'Asset_Name', 'Category', 'Priority', 'Location_Clean',
                'Predicted_Risk_Level', 'Predicted_Risk_Score', 'Risk_Level_Confidence',
                'Recommendation', 'Prediction_Timestamp'
            ]

            # Filter to existing columns
            available_columns = [name for name in output_columns if name in df.columns]
            output_df = df[available_columns].copy()

            # Convert to Spark DataFrame
            spark_df = self.spark.createDataFrame(output_df)

            # Write to Databricks table
            spark_df.write.mode(mode).saveAsTable(target_table)

            logger.info(f"Successfully saved {len(output_df)} predictions to {target_table}")

            # Log sample of results
            logger.info("Sample predictions:")
            for _, row in output_df.head(5).iterrows():
                logger.info(f"Asset: {row.get('Asset_Name', 'N/A')}, "
                          f"Risk Score: {row.get('Predicted_Risk_Score', 0):.2f}, "
                          f"Level: {row.get('Predicted_Risk_Level', 'N/A')}")

        except Exception as e:
            logger.error(f"Error saving to Databricks: {str(e)}")
            raise

    def run_full_pipeline(self, source_table: str, target_table: str, retrain: bool = True,
                          mode: str = "overwrite"):
        """
        Run the complete ML pipeline

        Args:
            source_table: Source table with incident data
            target_table: Target table for predictions
            retrain: Whether to retrain models (also refits the label encoders)
            mode: Write mode passed to ``save_predictions_to_databricks``
        """
        logger.info("Starting full Random Forest prediction pipeline")

        try:
            # Step 1: Read data
            raw_df = self.read_incident_data(source_table)

            # Step 2: Clean and prepare data
            clean_df = self.clean_and_prepare_data(raw_df)

            # Step 3: Create features (encoders are refit only when retraining
            # so codes stay consistent with an already trained model)
            features_df, feature_columns = self.create_features(clean_df, fit=retrain)

            # Step 4: Train models (if needed)
            if retrain:
                training_results = self.train_random_forest_models(features_df, feature_columns)
                logger.info(f"Training results: {training_results}")

            # Step 5: Make predictions
            predictions_df = self.make_predictions(features_df, feature_columns)

            # Step 6: Save to Databricks
            self.save_predictions_to_databricks(predictions_df, target_table, mode=mode)

            # Step 7: Generate summary report
            self._generate_summary_report(predictions_df)

            logger.info("Pipeline completed successfully!")

        except Exception as e:
            logger.error(f"Pipeline failed: {str(e)}")
            raise

    def _generate_summary_report(self, df: pd.DataFrame):
        """
        Generate summary report of predictions

        Best-effort logging only: errors are logged and not re-raised so a
        reporting problem never fails a pipeline whose results are already
        saved.
        """
        logger.info("=== PREDICTION SUMMARY REPORT ===")

        try:
            has_assets = 'Asset_Name' in df.columns

            # Risk level distribution
            if 'Predicted_Risk_Level' in df.columns:
                risk_dist = df['Predicted_Risk_Level'].value_counts()
                logger.info(f"Risk Level Distribution: {dict(risk_dist)}")

            # Top risk assets
            if has_assets and 'Predicted_Risk_Score' in df.columns:
                top_risk = df.nlargest(5, 'Predicted_Risk_Score')[['Asset_Name', 'Predicted_Risk_Score']]
                logger.info("Top 5 Risk Assets:")
                for _, row in top_risk.iterrows():
                    logger.info(f"  - {row['Asset_Name']}: {row['Predicted_Risk_Score']:.2f}")

            # Feature importance
            if self.feature_importance:
                logger.info("Top 5 Feature Importance:")
                for feature, importance in list(self.feature_importance.items())[:5]:
                    logger.info(f"  - {feature}: {importance:.3f}")

            if has_assets:
                logger.info(f"Total assets analyzed: {df['Asset_Name'].nunique()}")
            logger.info(f"Total predictions made: {len(df)}")

        except Exception as e:
            logger.error(f"Error generating summary: {str(e)}")

def main():
    """
    Script entry point: build a predictor and run the full pipeline once.

    Reads incidents from the configured source table, retrains the models,
    and writes predictions to the configured target table. Any failure is
    reported on stdout and then re-raised.
    """
    # Fully qualified table names; replace with your own tables
    pipeline_tables = {
        "source_table": "sd_bdc_demo.default.service_now_only",
        "target_table": "sd_bdc_demo.default.service_now_rf_predictions",
    }

    try:
        DatabricksRandomForestPredictor().run_full_pipeline(retrain=True, **pipeline_tables)
        print("✅ Random Forest prediction pipeline completed successfully!")
    except Exception as e:
        print(f"❌ Pipeline failed: {str(e)}")
        raise

if __name__ == "__main__":
    main()

2025-06-12 23:07:29,623 - INFO - Databricks Random Forest Predictor initialized
2025-06-12 23:07:29,629 - INFO - Starting full Random Forest prediction pipeline
2025-06-12 23:07:29,629 - INFO - Reading data from table: sd_bdc_demo.default.service_now_only
2025-06-12 23:07:40,898 - INFO - Loaded 111 rows, 20 columns from sd_bdc_demo.default.service_now_only
2025-06-12 23:07:40,899 - INFO - Starting data cleaning and preparation
2025-06-12 23:07:44,657 - INFO - Data cleaning completed. Shape: (111, 29)
2025-06-12 23:07:44,659 - INFO - Risk level distribution:
Low       67
High      37
Medium     7
Name: Risk_Level, dtype: int64
2025-06-12 23:07:44,660 - INFO - Creating features for Random Forest model
2025-06-12 23:07:44,669 - INFO - Created 13 features: ['Priority_Num', 'Day_of_Week', 'Hour_of_Day', 'Day_of_Month', 'Is_Weekend', 'Asset_Incident_Count', 'Asset_Encoded', 'Category_Encoded', 'Subcategory_Encoded', 'Status_Encoded', 'Assigned_Group_Encoded', 'Location_Clean_Encoded', 'Incid

✅ Random Forest prediction pipeline completed successfully!


In [0]:
"""
Databricks Random Forest Incident Prediction System
==================================================

This program reads incident data from a Databricks table, performs Random Forest
prediction for asset risk scoring, and writes results back to Databricks.

Requirements:
- databricks-connect or running in Databricks environment
- pandas, scikit-learn, numpy
- Access to source and target Databricks tables

Author: ML Prediction System
Date: 2025
"""

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import logging
from typing import Dict, List, Tuple, Optional

# Databricks/Spark imports
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, when, isnan, isnull, regexp_extract, to_date
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType

# ML imports
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import (accuracy_score, classification_report, mean_absolute_error, 
                           mean_squared_error, r2_score, explained_variance_score)
import joblib

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class DatabricksRandomForestPredictor:
    """
    Random Forest predictor for IT incident risk analysis using Databricks
    """
    
    def __init__(self, spark_session: Optional[SparkSession] = None):
        """
        Set up the Spark handle and empty model state.

        Args:
            spark_session: Session to use; when omitted, one is obtained via
                ``SparkSession.builder`` under the app name
                "IncidentRiskPredictor".
        """
        if not spark_session:
            session_builder = SparkSession.builder.appName("IncidentRiskPredictor")
            spark_session = session_builder.getOrCreate()
        self.spark = spark_session

        # Models are created by train_random_forest_models
        self.rf_classifier = None
        self.rf_regressor = None

        # Preprocessing state
        self.scaler = StandardScaler()
        self.label_encoders = {}

        # Feature bookkeeping
        self.feature_columns = []
        self.feature_importance = {}

        logger.info("Databricks Random Forest Predictor initialized")
    
    def read_incident_data(self, table_name: str) -> DataFrame:
        """
        Load a Databricks table as a Spark DataFrame and log its size.

        Args:
            table_name: Table to read (e.g., 'default.incidents')

        Returns:
            The Spark DataFrame for ``table_name``

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            logger.info(f"Reading data from table: {table_name}")
            incidents = self.spark.table(table_name)

            # Row count first (runs a Spark job), then the column count
            row_count, col_count = incidents.count(), len(incidents.columns)
            logger.info(f"Loaded {row_count} rows, {col_count} columns from {table_name}")
        except Exception as e:
            logger.error(f"Error reading from table {table_name}: {str(e)}")
            raise
        else:
            return incidents
    
    def clean_and_prepare_data(self, df: DataFrame) -> pd.DataFrame:
        """
        Clean and prepare incident data for ML processing

        Normalizes column names, parses date columns, derives ``Priority_Num``,
        ``Asset_Name``, ``Location_Clean``, ``Resolution_Hours`` and calendar
        features, and assigns a rule-based ``Risk_Level`` label.

        Args:
            df: Raw Spark DataFrame

        Returns:
            Cleaned pandas DataFrame ready for ML
        """
        logger.info("Starting data cleaning and preparation")

        try:
            # Convert to pandas for easier processing
            pdf = df.toPandas()

            # Clean column names (remove spaces, special characters)
            pdf.columns = [name.strip().replace(' ', '_').replace('/', '_') for name in pdf.columns]

            # Parse dates; unparseable values become NaT
            for date_column in ('Created_Date', 'Last_Modified', 'Resolved_Date', 'SLA_Due'):
                if date_column in pdf.columns:
                    pdf[date_column] = pd.to_datetime(pdf[date_column], errors='coerce')

            # Extract priority numbers. Cast to str first so a numeric
            # Priority column does not break the .str accessor; expand=False
            # makes extract() return a Series rather than a one-column frame.
            if 'Priority' in pdf.columns:
                priority_digits = pdf['Priority'].astype(str).str.extract(r'(\d+)', expand=False)
                pdf['Priority_Num'] = priority_digits.astype(float).fillna(3)  # Default to moderate

            # Clean asset names
            if 'CI' in pdf.columns:
                pdf['Asset_Name'] = pdf['CI'].fillna('Unknown').astype(str).str.strip()

            # Clean locations
            if 'Location' in pdf.columns:
                pdf['Location_Clean'] = pdf['Location'].fillna('Unknown').astype(str).str.strip()

            # Calculate resolution times (hours); unresolved incidents get 0
            if 'Created_Date' in pdf.columns and 'Resolved_Date' in pdf.columns:
                elapsed = pdf['Resolved_Date'] - pdf['Created_Date']
                pdf['Resolution_Hours'] = (elapsed.dt.total_seconds() / 3600).fillna(0)

            # Create time-based features
            if 'Created_Date' in pdf.columns:
                created = pdf['Created_Date']
                pdf['Day_of_Week'] = created.dt.dayofweek
                pdf['Hour_of_Day'] = created.dt.hour
                pdf['Day_of_Month'] = created.dt.day
                pdf['Is_Weekend'] = pdf['Day_of_Week'].isin([5, 6]).astype(int)

            # Create risk labels based on priority and resolution time:
            # priority 1-2 -> High, priority 3 taking more than 24h -> Medium.
            pdf['Risk_Level'] = 'Low'
            if 'Priority_Num' in pdf.columns:
                priority = pdf['Priority_Num']
                pdf.loc[priority.isin([1, 2]), 'Risk_Level'] = 'High'
                if 'Resolution_Hours' in pdf.columns:
                    slow_moderate = (priority == 3) & (pdf['Resolution_Hours'] > 24)
                    pdf.loc[slow_moderate, 'Risk_Level'] = 'Medium'

            # Fill missing values
            for category_column in ('Category', 'Subcategory', 'Status', 'Assigned_Group', 'Incident_Type'):
                if category_column in pdf.columns:
                    pdf[category_column] = pdf[category_column].fillna('Unknown')

            logger.info(f"Data cleaning completed. Shape: {pdf.shape}")
            logger.info(f"Risk level distribution:\n{pdf['Risk_Level'].value_counts()}")

            return pdf

        except Exception as e:
            logger.error(f"Error in data cleaning: {str(e)}")
            raise
    
    def create_features(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, List[str]]:
        """
        Create features for Random Forest model

        Categorical columns are label-encoded. An encoder is fitted the first
        time a column is seen and reused afterwards; values the encoder does
        not know are mapped to 'Unknown', which is always included among the
        fitted classes so that the mapping cannot fail.

        Args:
            df: Cleaned pandas DataFrame

        Returns:
            Tuple of (feature DataFrame, list of feature column names)
        """
        logger.info("Creating features for Random Forest model")

        try:
            features_df = df.copy()

            def encode(values: pd.Series, encoder_key: str) -> np.ndarray:
                """Label-encode ``values`` with the encoder stored under ``encoder_key``."""
                cleaned = values.fillna('Unknown').astype(str)
                if encoder_key not in self.label_encoders:
                    encoder = LabelEncoder()
                    # Reserve a class for the fallback label up front.
                    encoder.fit(list(cleaned.unique()) + ['Unknown'])
                    self.label_encoders[encoder_key] = encoder
                encoder = self.label_encoders[encoder_key]
                known_values = set(encoder.classes_)
                # Encoders supplied from elsewhere may lack 'Unknown'; use
                # their first class instead of failing in transform().
                fallback = 'Unknown' if 'Unknown' in known_values else encoder.classes_[0]
                return encoder.transform(cleaned.where(cleaned.isin(known_values), fallback))

            # Asset-based features
            if 'Asset_Name' in df.columns:
                # Count incidents per asset
                asset_counts = df['Asset_Name'].value_counts()
                features_df['Asset_Incident_Count'] = features_df['Asset_Name'].map(asset_counts)
                features_df['Asset_Encoded'] = encode(features_df['Asset_Name'], 'Asset_Name')

            # Categorical feature encoding
            categorical_features = ['Category', 'Subcategory', 'Status', 'Assigned_Group', 'Location_Clean', 'Incident_Type']
            encoded_columns = []
            for feature in categorical_features:
                if feature in df.columns:
                    encoded_name = f'{feature}_Encoded'
                    features_df[encoded_name] = encode(features_df[feature], feature)
                    encoded_columns.append(encoded_name)

            # Select feature columns for ML, keeping only those that exist
            candidate_columns = [
                'Priority_Num', 'Day_of_Week', 'Hour_of_Day', 'Day_of_Month', 'Is_Weekend',
                'Asset_Incident_Count', 'Asset_Encoded'
            ] + encoded_columns
            feature_columns = [name for name in candidate_columns if name in features_df.columns]

            # Fill any remaining NaN values
            features_df[feature_columns] = features_df[feature_columns].fillna(0)

            self.feature_columns = feature_columns
            logger.info(f"Created {len(feature_columns)} features: {feature_columns}")

            return features_df, feature_columns

        except Exception as e:
            logger.error(f"Error creating features: {str(e)}")
            raise
    
    def train_random_forest_models(self, df: pd.DataFrame, feature_columns: List[str]) -> Dict:
        """
        Train Random Forest models for classification and regression

        The classifier learns the ``Risk_Level`` label. The regressor learns a
        synthetic ``Risk_Score`` that this method derives from priority, asset
        incident volume, resolution time, the weekend flag and the category.

        Side effects:
            - Adds ``Risk_Score`` (and ``Category_Risk_Multiplier`` when a
              ``Category`` column exists) to ``df`` in place.
            - Sets ``self.rf_classifier``, ``self.rf_regressor``,
              ``self.feature_importance`` and ``self.risk_label_encoder``
              (the encoder that maps classifier codes back to risk labels).

        Args:
            df: DataFrame with features; must contain ``Risk_Level``,
                ``Priority_Num`` and every column in ``feature_columns``
            feature_columns: List of feature column names

        Returns:
            Dictionary with training results and metrics including MSE and R²

        Raises:
            Exception: Any error raised during training is logged and re-raised.
        """
        logger.info("Training Random Forest models")

        try:
            # Imported locally: these metrics are not part of the module-level
            # sklearn.metrics import, so relying on it raises NameError.
            from sklearn.metrics import explained_variance_score, mean_squared_error, r2_score
            from sklearn.model_selection import cross_val_score

            X = df[feature_columns].values

            # ---- Classification model (Risk Level) ----
            le_risk = LabelEncoder()
            y_class_encoded = le_risk.fit_transform(df['Risk_Level'])

            # Stratified splitting needs at least two samples of every class;
            # fall back to a plain random split instead of failing outright.
            class_counts = np.bincount(y_class_encoded)
            can_stratify = class_counts.size > 0 and class_counts.min() >= 2
            if not can_stratify:
                logger.warning("A risk level has fewer than 2 samples; splitting without stratification")

            X_train_class, X_test_class, y_train_class, y_test_class = train_test_split(
                X, y_class_encoded, test_size=0.2, random_state=42,
                stratify=y_class_encoded if can_stratify else None
            )

            self.rf_classifier = RandomForestClassifier(
                n_estimators=100,
                max_depth=10,
                min_samples_split=5,
                min_samples_leaf=2,
                random_state=42,
                class_weight='balanced'
            )
            self.rf_classifier.fit(X_train_class, y_train_class)
            # Keep the encoder next to the classifier it belongs to so encoded
            # predictions can be decoded later.
            self.risk_label_encoder = le_risk

            y_pred_class = self.rf_classifier.predict(X_test_class)
            class_accuracy = accuracy_score(y_test_class, y_pred_class)

            # ---- Regression target (Risk Score) ----
            # Base score from priority (primary factor); unmapped priorities get 1.0
            priority_scores = {1: 3.0, 2: 2.0, 3: 1.0, 4: 0.5}
            df['Risk_Score'] = 0.0
            df['Risk_Score'] += df['Priority_Num'].map(priority_scores).fillna(1.0)

            # Incident volume factor, normalized by the busiest asset
            if 'Asset_Incident_Count' in df.columns:
                max_incidents = df['Asset_Incident_Count'].max()
                if max_incidents > 0:
                    df['Risk_Score'] += (df['Asset_Incident_Count'] / max_incidents) * 1.5

            # Resolution time factor (capped at 48 hours)
            if 'Resolution_Hours' in df.columns:
                normalized_hours = np.clip(df['Resolution_Hours'] / 48.0, 0, 1)
                df['Risk_Score'] += normalized_hours * 0.5

            # Weekend incidents are scored slightly riskier
            if 'Is_Weekend' in df.columns:
                df['Risk_Score'] += df['Is_Weekend'] * 0.2

            # Category-based multiplier, applied after all additive factors
            if 'Category' in df.columns:
                high_risk_categories = ['Security', 'Infrastructure', 'Network']
                df['Category_Risk_Multiplier'] = df['Category'].apply(
                    lambda x: 1.2 if x in high_risk_categories else 1.0
                )
                df['Risk_Score'] *= df['Category_Risk_Multiplier']

            # ---- Regression model ----
            y_reg = df['Risk_Score']
            X_train_reg, X_test_reg, y_train_reg, y_test_reg = train_test_split(
                X, y_reg, test_size=0.2, random_state=42
            )

            self.rf_regressor = RandomForestRegressor(
                n_estimators=150,
                max_depth=12,
                min_samples_split=3,
                min_samples_leaf=1,
                max_features='sqrt',
                random_state=42,
                n_jobs=-1  # Use all CPU cores
            )
            self.rf_regressor.fit(X_train_reg, y_train_reg)
            y_pred_reg = self.rf_regressor.predict(X_test_reg)

            # Hold-out regression metrics
            reg_mse = mean_squared_error(y_test_reg, y_pred_reg)
            reg_rmse = np.sqrt(reg_mse)
            reg_mae = mean_absolute_error(y_test_reg, y_pred_reg)
            reg_r2 = r2_score(y_test_reg, y_pred_reg)
            reg_explained_var = explained_variance_score(y_test_reg, y_pred_reg)

            y_mean = np.mean(y_test_reg)
            # Mean Absolute Percentage Error; the score built above is always
            # positive (the priority term contributes at least 0.5).
            reg_mape = np.mean(np.abs((y_test_reg - y_pred_reg) / y_test_reg)) * 100

            # Feature importance of the regressor, most important first
            feature_importance = dict(zip(feature_columns, self.rf_regressor.feature_importances_))
            self.feature_importance = dict(sorted(feature_importance.items(), key=lambda x: x[1], reverse=True))

            # Cross-validated R² over the full data set
            cv_r2_scores = cross_val_score(self.rf_regressor, X, y_reg, cv=5, scoring='r2')
            cv_r2_mean = np.mean(cv_r2_scores)
            cv_r2_std = np.std(cv_r2_scores)

            results = {
                # Classification metrics
                'classification_accuracy': class_accuracy,

                # Regression metrics
                'regression_mse': reg_mse,
                'regression_rmse': reg_rmse,
                'regression_mae': reg_mae,
                'regression_r2': reg_r2,
                'regression_explained_variance': reg_explained_var,
                'regression_mape': reg_mape,

                # Cross-validation metrics
                'cv_r2_mean': cv_r2_mean,
                'cv_r2_std': cv_r2_std,

                # Model details
                'feature_importance': self.feature_importance,
                'risk_label_encoder': le_risk,
                'n_samples': len(df),
                'n_features': len(feature_columns),
                'target_mean': y_mean,
                'target_std': np.std(y_reg)
            }

            # Log detailed metrics
            logger.info("Model training completed:")
            logger.info(f"- Classification accuracy: {class_accuracy:.4f}")
            logger.info(f"- Regression R²: {reg_r2:.4f} ({reg_r2*100:.2f}%)")
            logger.info(f"- Regression MSE: {reg_mse:.6f}")
            logger.info(f"- Regression RMSE: {reg_rmse:.4f}")
            logger.info(f"- Regression MAE: {reg_mae:.4f}")
            logger.info(f"- Cross-validation R² (mean±std): {cv_r2_mean:.4f}±{cv_r2_std:.4f}")
            logger.info(f"- Mean Absolute Percentage Error: {reg_mape:.2f}%")
            logger.info(f"- Explained Variance: {reg_explained_var:.4f}")
            logger.info(f"- Top 3 features: {list(self.feature_importance.keys())[:3]}")

            return results

        except Exception as e:
            logger.error(f"Error training models: {str(e)}")
            raise
    
    def make_predictions(self, df: pd.DataFrame, feature_columns: List[str]) -> pd.DataFrame:
        """
        Make predictions on new data

        Adds ``Predicted_Risk_Level``, ``Risk_Level_Confidence``,
        ``Predicted_Risk_Score``, ``Recommendation`` and
        ``Prediction_Timestamp`` to ``df`` in place and returns the same frame.

        ``Predicted_Risk_Level`` keeps the classifier's encoded class, so the
        column type is unchanged for existing consumers. When the label encoder
        fitted during training can be found (``self.risk_label_encoder`` or the
        ``'risk_label_encoder'`` entry of ``self._last_training_results``), the
        decoded label is also stored in ``Predicted_Risk_Label``. The
        recommendation is then driven by the decoded label rather than the raw
        code. Encoded codes follow the alphabetical order of the labels
        (High=0, Low=1, Medium=2), which is not an order of severity.

        Args:
            df: DataFrame with features
            feature_columns: List of feature column names

        Returns:
            DataFrame with predictions added

        Raises:
            ValueError: If the models have not been trained yet.
        """
        logger.info("Making predictions with Random Forest models")

        try:
            if self.rf_classifier is None or self.rf_regressor is None:
                raise ValueError("Models not trained yet. Call train_random_forest_models first.")

            X = df[feature_columns].values

            risk_pred_encoded = self.rf_classifier.predict(X)
            risk_pred_proba = self.rf_classifier.predict_proba(X)
            risk_scores = self.rf_regressor.predict(X)

            # Locate the encoder that produced the classifier's training targets
            encoder = getattr(self, 'risk_label_encoder', None)
            if encoder is None:
                last_results = getattr(self, '_last_training_results', None) or {}
                encoder = last_results.get('risk_label_encoder')

            if encoder is not None:
                risk_labels = encoder.inverse_transform(risk_pred_encoded)
                df['Predicted_Risk_Label'] = risk_labels
                # Translate labels into the severity codes the recommendation
                # helper expects (2 = high, 1 = medium, otherwise low).
                severity_by_label = {'High': 2, 'Medium': 1}
                recommendation_levels = [severity_by_label.get(label, 0) for label in risk_labels]
            else:
                # No encoder available: fall back to the raw encoded classes
                recommendation_levels = risk_pred_encoded

            df['Predicted_Risk_Level'] = risk_pred_encoded
            df['Risk_Level_Confidence'] = np.max(risk_pred_proba, axis=1)
            df['Predicted_Risk_Score'] = risk_scores

            # Add recommendations based on predictions
            df['Recommendation'] = [
                self._generate_recommendation(
                    {'Predicted_Risk_Score': score, 'Predicted_Risk_Level': level}
                )
                for score, level in zip(risk_scores, recommendation_levels)
            ]

            # Add prediction timestamp
            df['Prediction_Timestamp'] = datetime.now()

            logger.info(f"Predictions completed for {len(df)} records")

            return df

        except Exception as e:
            logger.error(f"Error making predictions: {str(e)}")
            raise
    
    def _generate_recommendation(self, row) -> str:
        """Generate recommendation based on predictions"""
        risk_score = row.get('Predicted_Risk_Score', 0)
        risk_level = row.get('Predicted_Risk_Level', 0)
        
        if risk_score >= 2.0 or risk_level == 2:  # High risk
            return "Immediate monitoring and proactive maintenance required"
        elif risk_score >= 1.5 or risk_level == 1:  # Medium risk
            return "Enhanced monitoring and scheduled maintenance recommended"
        else:  # Low risk
            return "Standard monitoring sufficient"
    
    def save_predictions_to_databricks(self, df: pd.DataFrame, target_table: str, mode: str = "overwrite"):
        """
        Write the prediction columns of ``df`` to a Databricks table

        Only columns that actually exist in ``df`` are written. Identifier
        columns (``Incident_Number``, ``Asset_Number``) lead the output when
        present. The first five written rows are logged as a sample.

        Args:
            df: DataFrame with predictions
            target_table: Target table name (e.g., 'default.incident_predictions')
            mode: Write mode ('overwrite', 'append')

        Raises:
            Exception: Any failure while converting or writing is logged and
                re-raised.
        """
        logger.info(f"Saving predictions to Databricks table: {target_table}")

        try:
            # Identifier columns come first, followed by the prediction payload
            leading_ids = [name for name in ('Incident_Number', 'Asset_Number') if name in df.columns]
            wanted_columns = leading_ids + [
                'Asset_Name', 'Category', 'Priority', 'Location_Clean',
                'Predicted_Risk_Level', 'Predicted_Risk_Score', 'Risk_Level_Confidence',
                'Recommendation', 'Prediction_Timestamp'
            ]

            # Keep only what the frame really has
            present_columns = [name for name in wanted_columns if name in df.columns]
            output_df = df[present_columns].copy()

            # pandas -> Spark, then persist as a managed table
            self.spark.createDataFrame(output_df).write.mode(mode).saveAsTable(target_table)

            logger.info(f"Successfully saved {len(output_df)} predictions to {target_table}")

            # Log sample of results
            logger.info("Sample predictions:")
            for _, record in output_df.head(5).iterrows():
                asset = record.get('Asset_Name', 'N/A')
                score = record.get('Predicted_Risk_Score', 0)
                level = record.get('Predicted_Risk_Level', 'N/A')
                logger.info(f"Asset: {asset}, "
                          f"Risk Score: {score:.2f}, "
                          f"Level: {level}")

        except Exception as e:
            logger.error(f"Error saving to Databricks: {str(e)}")
            raise
    
    def run_full_pipeline(self, source_table: str, target_table: str, retrain: bool = True):
        """
        Execute every pipeline stage in order: read, clean, featurize,
        (optionally) train, predict, save and report

        Args:
            source_table: Source table with incident data
            target_table: Target table for predictions
            retrain: Whether to retrain models

        Raises:
            Exception: Any stage failure is logged and re-raised.
        """
        logger.info("Starting full Random Forest prediction pipeline")

        try:
            # Stages 1-3: load the source table, clean it, derive model features
            spark_incidents = self.read_incident_data(source_table)
            prepared = self.clean_and_prepare_data(spark_incidents)
            features_df, feature_columns = self.create_features(prepared)

            # Stage 4: optional (re)training; metrics are kept for the summary report
            if retrain:
                self._last_training_results = self.train_random_forest_models(features_df, feature_columns)
                logger.info(f"Training results: {self._last_training_results}")

            # Stages 5-7: score, persist, summarize
            scored = self.make_predictions(features_df, feature_columns)
            self.save_predictions_to_databricks(scored, target_table)
            self._generate_summary_report(scored)

            logger.info("Pipeline completed successfully!")

        except Exception as e:
            logger.error(f"Pipeline failed: {str(e)}")
            raise
    
    def _generate_summary_report(self, df: pd.DataFrame):
        """
        Generate comprehensive summary report of predictions with model metrics

        Everything is written to the module logger; nothing is returned. The
        report is best-effort: an unexpected error is logged and swallowed so a
        reporting problem never fails the pipeline.

        Sections that depend on an optional column (``Asset_Name``,
        ``Predicted_Risk_Level``, ``Predicted_Risk_Score``) adapt when that
        column is missing instead of aborting the whole report. Without
        ``Asset_Name`` the asset total is reported as 0 and the top-risk rows
        are identified by their index label.

        Args:
            df: DataFrame with predictions
        """
        logger.info("=== COMPREHENSIVE PREDICTION SUMMARY REPORT ===")

        try:
            has_asset_names = 'Asset_Name' in df.columns
            total_assets = df['Asset_Name'].nunique() if has_asset_names else 0
            total_predictions = len(df)

            # Risk level distribution
            if 'Predicted_Risk_Level' in df.columns:
                risk_dist = df['Predicted_Risk_Level'].value_counts()
                logger.info(f"Risk Level Distribution: {dict(risk_dist)}")

            # Top risk assets
            if 'Predicted_Risk_Score' in df.columns:
                top_risk = df.nlargest(5, 'Predicted_Risk_Score')
                logger.info("Top 5 Risk Assets:")
                for row_label, row in top_risk.iterrows():
                    asset = row['Asset_Name'] if has_asset_names else row_label
                    logger.info(f"  - {asset}: {row['Predicted_Risk_Score']:.3f}")

                # Risk score statistics
                risk_stats = df['Predicted_Risk_Score'].describe()
                logger.info("Risk Score Statistics:")
                logger.info(f"  - Mean: {risk_stats['mean']:.3f}")
                logger.info(f"  - Std Dev: {risk_stats['std']:.3f}")
                logger.info(f"  - Min: {risk_stats['min']:.3f}")
                logger.info(f"  - Max: {risk_stats['max']:.3f}")

            # Feature importance
            if self.feature_importance:
                logger.info("Top 5 Feature Importance:")
                for feature, importance in list(self.feature_importance.items())[:5]:
                    logger.info(f"  - {feature}: {importance:.4f} ({importance*100:.2f}%)")

            # Model performance summary (only available after a training run
            # has stored its metrics on the instance)
            logger.info("=== MODEL PERFORMANCE METRICS ===")
            results = getattr(self, '_last_training_results', None)
            if results is not None:
                r2 = results.get('regression_r2', 0)
                logger.info(f"R-squared (R²): {r2:.4f} ({r2*100:.2f}%)")
                logger.info(f"Mean Squared Error (MSE): {results.get('regression_mse', 0):.6f}")
                logger.info(f"Root Mean Squared Error (RMSE): {results.get('regression_rmse', 0):.4f}")
                logger.info(f"Mean Absolute Error (MAE): {results.get('regression_mae', 0):.4f}")
                logger.info(f"Mean Absolute Percentage Error (MAPE): {results.get('regression_mape', 0):.2f}%")
                logger.info(f"Cross-validation R²: {results.get('cv_r2_mean', 0):.4f}±{results.get('cv_r2_std', 0):.4f}")
                logger.info(f"Classification Accuracy: {results.get('classification_accuracy', 0):.4f}")

                # Model interpretation: first band whose threshold R² exceeds
                if r2 > 0.9:
                    logger.info("✅ EXCELLENT model performance (R² > 0.9)")
                elif r2 > 0.8:
                    logger.info("✅ VERY GOOD model performance (R² > 0.8)")
                elif r2 > 0.7:
                    logger.info("✅ GOOD model performance (R² > 0.7)")
                elif r2 > 0.5:
                    logger.info("⚠️ MODERATE model performance (R² > 0.5)")
                else:
                    logger.info("❌ POOR model performance (R² < 0.5)")

                logger.info(f"The model explains {r2*100:.1f}% of the variance in risk scores")

            logger.info(f"\nTotal assets analyzed: {total_assets}")
            logger.info(f"Total predictions made: {total_predictions}")
            logger.info("=== END REPORT ===")

        except Exception as e:
            # Deliberately not re-raised: the report is informational only
            logger.error(f"Error generating summary: {str(e)}")

def main():
    """
    Entry point: build a predictor and run the whole pipeline once

    Reads from and writes to the hard-coded tables below, always retraining.
    Prints a success or failure line; a failure is re-raised after printing.
    """
    # Configuration
    source_table = "sd_bdc_demo.default.service_now_only"  # Replace with your table name
    target_table = "sd_bdc_demo.default.service_now_rf_predictions"

    try:
        DatabricksRandomForestPredictor().run_full_pipeline(
            source_table=source_table,
            target_table=target_table,
            retrain=True
        )
        print("✅ Random Forest prediction pipeline completed successfully!")

    except Exception as e:
        print(f"❌ Pipeline failed: {str(e)}")
        raise

# Run the pipeline only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    main()

2025-06-12 23:26:09,134 - INFO - Databricks Random Forest Predictor initialized
2025-06-12 23:26:09,135 - INFO - Starting full Random Forest prediction pipeline
2025-06-12 23:26:09,136 - INFO - Reading data from table: sd_bdc_demo.default.service_now_only
2025-06-12 23:26:10,028 - INFO - Loaded 111 rows, 20 columns from sd_bdc_demo.default.service_now_only
2025-06-12 23:26:10,030 - INFO - Starting data cleaning and preparation
2025-06-12 23:26:10,418 - INFO - Data cleaning completed. Shape: (111, 29)
2025-06-12 23:26:10,420 - INFO - Risk level distribution:
Low       67
High      37
Medium     7
Name: Risk_Level, dtype: int64
2025-06-12 23:26:10,422 - INFO - Creating features for Random Forest model
2025-06-12 23:26:10,432 - INFO - Created 13 features: ['Priority_Num', 'Day_of_Week', 'Hour_of_Day', 'Day_of_Month', 'Is_Weekend', 'Asset_Incident_Count', 'Asset_Encoded', 'Category_Encoded', 'Subcategory_Encoded', 'Status_Encoded', 'Assigned_Group_Encoded', 'Location_Clean_Encoded', 'Incid

✅ Random Forest prediction pipeline completed successfully!


In [0]:
"""
Databricks Random Forest Incident Prediction System - FIXED VERSION
==================================================================

This program reads incident data from a Databricks table, performs Random Forest
prediction for asset risk scoring, and writes results back to Databricks.

Requirements:
- databricks-connect or running in Databricks environment
- pandas, scikit-learn, numpy
- Access to source and target Databricks tables

Author: ML Prediction System
Date: 2025
"""

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import logging
from typing import Dict, List, Tuple, Optional

# Databricks/Spark imports
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, when, isnan, isnull, regexp_extract, to_date
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType

# ML imports
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import (accuracy_score, classification_report, mean_absolute_error, 
                           mean_squared_error, r2_score, explained_variance_score)
import joblib

# Configure logging
# Root logger at INFO with "timestamp - level - message" lines. basicConfig
# has no effect if the root logger already has handlers configured.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by every method below
logger = logging.getLogger(__name__)

class DatabricksRandomForestPredictor:
    """
    Random Forest predictor for IT incident risk analysis using Databricks
    """
    
    def __init__(self, spark_session: Optional[SparkSession] = None):
        """
        Set up the Spark session and the empty model state

        Args:
            spark_session: Session to reuse; when omitted, one is obtained from
                ``SparkSession.builder`` under the app name
                "IncidentRiskPredictor"
        """
        if spark_session:
            self.spark = spark_session
        else:
            self.spark = SparkSession.builder.appName("IncidentRiskPredictor").getOrCreate()

        # Models stay None until they are trained
        self.rf_classifier = None
        self.rf_regressor = None

        # Preprocessing and feature state, filled in by later stages
        self.label_encoders = {}
        self.scaler = StandardScaler()
        self.feature_columns = []
        self.feature_importance = {}

        logger.info("Databricks Random Forest Predictor initialized")
    
    def read_incident_data(self, table_name: str) -> DataFrame:
        """
        Load a Databricks table as a Spark DataFrame and log its size

        Args:
            table_name: Name of the Databricks table (e.g., 'default.incidents')

        Returns:
            Spark DataFrame with incident data

        Raises:
            Exception: Any read failure is logged and re-raised.
        """
        try:
            logger.info(f"Reading data from table: {table_name}")

            incidents = self.spark.table(table_name)

            # Size is logged for traceability; count() runs a Spark job
            n_rows, n_cols = incidents.count(), len(incidents.columns)
            logger.info(f"Loaded {n_rows} rows, {n_cols} columns from {table_name}")

            return incidents

        except Exception as e:
            logger.error(f"Error reading from table {table_name}: {str(e)}")
            raise
    
    def clean_and_prepare_data(self, df: DataFrame) -> pd.DataFrame:
        """
        Convert the raw Spark incidents to pandas and derive the ML columns

        Steps: normalize column names, parse date columns, extract
        ``Priority_Num``, clean ``Asset_Name`` (from ``CI``) and
        ``Location_Clean``, compute ``Resolution_Hours``, add calendar
        features, label ``Risk_Level`` and fill missing categoricals with
        'Unknown'. Every step is skipped when its source column is absent,
        except ``Risk_Level``, which is always created.

        Args:
            df: Raw Spark DataFrame

        Returns:
            Cleaned pandas DataFrame ready for ML

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        logger.info("Starting data cleaning and preparation")

        try:
            # Work in pandas from here on
            pdf = df.toPandas()

            # Normalize headers: trim, then spaces and slashes become underscores
            pdf.columns = [name.strip().replace(' ', '_').replace('/', '_') for name in pdf.columns]

            # Date columns: unparseable values become NaT
            for date_col in ('Created_Date', 'Last_Modified', 'Resolved_Date', 'SLA_Due'):
                if date_col not in pdf.columns:
                    continue
                pdf[date_col] = pd.to_datetime(pdf[date_col], errors='coerce')

            # Numeric priority: first run of digits, defaulting to 3 (moderate)
            if 'Priority' in pdf.columns:
                priority_digits = pdf['Priority'].str.extract(r'(\d+)').astype(float)
                pdf['Priority_Num'] = priority_digits
                pdf['Priority_Num'] = pdf['Priority_Num'].fillna(3)

            # Asset and location text: missing -> 'Unknown', then trimmed
            if 'CI' in pdf.columns:
                pdf['Asset_Name'] = pdf['CI'].fillna('Unknown').str.strip()
            if 'Location' in pdf.columns:
                pdf['Location_Clean'] = pdf['Location'].fillna('Unknown').str.strip()

            # Hours from creation to resolution; unresolved rows count as 0
            if 'Created_Date' in pdf.columns and 'Resolved_Date' in pdf.columns:
                elapsed = pdf['Resolved_Date'] - pdf['Created_Date']
                pdf['Resolution_Hours'] = elapsed.dt.total_seconds() / 3600
                pdf['Resolution_Hours'] = pdf['Resolution_Hours'].fillna(0)

            # Calendar features from the creation timestamp
            if 'Created_Date' in pdf.columns:
                created = pdf['Created_Date'].dt
                pdf['Day_of_Week'] = created.dayofweek
                pdf['Hour_of_Day'] = created.hour
                pdf['Day_of_Month'] = created.day
                pdf['Is_Weekend'] = pdf['Day_of_Week'].isin([5, 6]).astype(int)

            # Risk label: priority 1 or 2 -> High; priority 3 open for more
            # than 24 hours -> Medium; everything else -> Low
            pdf['Risk_Level'] = 'Low'
            if 'Priority_Num' in pdf.columns:
                priority = pdf['Priority_Num']
                resolution = pdf['Resolution_Hours'] if 'Resolution_Hours' in pdf.columns else 0
                pdf.loc[priority.isin([1, 2]), 'Risk_Level'] = 'High'
                pdf.loc[(priority == 3) & (resolution > 24), 'Risk_Level'] = 'Medium'

            # Missing categorical values become an explicit 'Unknown' category
            for cat_col in ('Category', 'Subcategory', 'Status', 'Assigned_Group', 'Incident_Type'):
                if cat_col in pdf.columns:
                    pdf[cat_col] = pdf[cat_col].fillna('Unknown')

            logger.info(f"Data cleaning completed. Shape: {pdf.shape}")
            logger.info(f"Risk level distribution:\n{pdf['Risk_Level'].value_counts()}")

            return pdf

        except Exception as e:
            logger.error(f"Error in data cleaning: {str(e)}")
            raise
    
    def create_features(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, List[str]]:
        """
        Create features for Random Forest model

        Adds ``Asset_Incident_Count`` and label-encoded ``*_Encoded`` columns
        to a copy of ``df``. Encoders are fitted the first time a column is
        seen and cached in ``self.label_encoders``; later calls reuse them and
        tolerate values that were not seen when the encoder was fitted.

        Args:
            df: Cleaned pandas DataFrame

        Returns:
            Tuple of (feature DataFrame, list of feature column names)

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        logger.info("Creating features for Random Forest model")

        try:
            features_df = df.copy()

            # Asset-based features
            if 'Asset_Name' in df.columns:
                # Count incidents per asset
                asset_counts = df['Asset_Name'].value_counts()
                features_df['Asset_Incident_Count'] = features_df['Asset_Name'].map(asset_counts)
                self._encode_with_unknown_fallback(
                    features_df, 'Asset_Name', 'Asset_Encoded', 'Asset_Name_Clean'
                )

            # Categorical feature encoding
            categorical_features = ['Category', 'Subcategory', 'Status', 'Assigned_Group', 'Location_Clean', 'Incident_Type']
            present_categoricals = [feature for feature in categorical_features if feature in df.columns]
            for feature in present_categoricals:
                self._encode_with_unknown_fallback(
                    features_df, feature, f'{feature}_Encoded', f'{feature}_Clean'
                )

            # Select feature columns for ML: numeric base features plus encodings
            feature_columns = [
                'Priority_Num', 'Day_of_Week', 'Hour_of_Day', 'Day_of_Month', 'Is_Weekend',
                'Asset_Incident_Count', 'Asset_Encoded'
            ]
            feature_columns.extend(f'{feature}_Encoded' for feature in present_categoricals)

            # Only keep columns that exist
            feature_columns = [name for name in feature_columns if name in features_df.columns]

            # Fill any remaining NaN values
            features_df[feature_columns] = features_df[feature_columns].fillna(0)

            self.feature_columns = feature_columns
            logger.info(f"Created {len(feature_columns)} features: {feature_columns}")

            return features_df, feature_columns

        except Exception as e:
            logger.error(f"Error creating features: {str(e)}")
            raise

    def _encode_with_unknown_fallback(self, features_df: pd.DataFrame, source_col: str,
                                      encoded_col: str, clean_col: str) -> None:
        """
        Label-encode ``source_col`` into ``encoded_col``, tolerating unseen values

        On first use the encoder is fitted on the column and cached under
        ``self.label_encoders[source_col]`` (only after a successful fit). On
        later calls, values the encoder has not seen are replaced by 'Unknown'
        in ``clean_col``. They receive the code of 'Unknown' when that was a
        fitted class, otherwise -1, rather than letting
        ``LabelEncoder.transform`` raise on an unseen label.

        Args:
            features_df: Frame that is modified in place
            source_col: Column holding the raw categorical values
            encoded_col: Column that receives the integer codes
            clean_col: Column that receives the values after the 'Unknown'
                substitution (written only when a cached encoder is reused)
        """
        encoder = self.label_encoders.get(source_col)
        if encoder is None:
            encoder = LabelEncoder()
            features_df[encoded_col] = encoder.fit_transform(features_df[source_col])
            self.label_encoders[source_col] = encoder
            return

        code_by_label = {label: code for code, label in enumerate(encoder.classes_)}
        raw_values = features_df[source_col]
        cleaned = raw_values.where(raw_values.isin(list(code_by_label)), 'Unknown')
        features_df[clean_col] = cleaned
        # 'Unknown' itself may be absent from the fitted classes -> code -1
        features_df[encoded_col] = cleaned.map(code_by_label).fillna(-1).astype(int)
    
    def train_random_forest_models(self, df: pd.DataFrame, feature_columns: List[str]) -> Dict:
        """
        Train Random Forest models for classification and regression
        
        Args:
            df: DataFrame with features
            feature_columns: List of feature column names
            
        Returns:
            Dictionary with training results and metrics including MSE and R²
        """
        logger.info("Training Random Forest models")
        
        try:
            X = df[feature_columns].values
            
            # Classification model (Risk Level)
            y_class = df['Risk_Level']
            le_risk = LabelEncoder()
            y_class_encoded = le_risk.fit_transform(y_class)
            
            X_train_class, X_test_class, y_train_class, y_test_class = train_test_split(
                X, y_class_encoded, test_size=0.2, random_state=42, stratify=y_class_encoded
            )
            
            # Train classification model
            self.rf_classifier = RandomForestClassifier(
                n_estimators=100,
                max_depth=10,
                min_samples_split=5,
                min_samples_leaf=2,
                random_state=42,
                class_weight='balanced'
            )
            
            self.rf_classifier.fit(X_train_class, y_train_class)
            y_pred_class = self.rf_classifier.predict(X_test_class)
            
            class_accuracy = accuracy_score(y_test_class, y_pred_class)
            
            # Regression model (Risk Score) - Enhanced for better R² calculation
            # Create more sophisticated risk scores
            df['Risk_Score'] = 0.0
            
            # Base score from priority (primary factor)
            priority_scores = {1: 3.0, 2: 2.0, 3: 1.0, 4: 0.5}
            df['Risk_Score'] += df['Priority_Num'].map(priority_scores).fillna(1.0)
            
            # Add incident volume factor (normalized)
            max_incidents = df['Asset_Incident_Count'].max() if 'Asset_Incident_Count' in df.columns else 1
            if max_incidents > 0:
                df['Risk_Score'] += (df.get('Asset_Incident_Count', 0) / max_incidents) * 1.5
            
            # Add resolution time factor
            if 'Resolution_Hours' in df.columns:
                # Normalize resolution hours (cap at 48 hours)
                normalized_hours = np.clip(df['Resolution_Hours'] / 48.0, 0, 1)
                df['Risk_Score'] += normalized_hours * 0.5
            
            # Add weekend/time factor
            if 'Is_Weekend' in df.columns:
                df['Risk_Score'] += df['Is_Weekend'] * 0.2  # Weekend incidents slightly riskier
            
            # Add category-based risk multiplier
            if 'Category' in df.columns:
                high_risk_categories = ['Security', 'Infrastructure', 'Network']
                df['Category_Risk_Multiplier'] = df['Category'].apply(
                    lambda x: 1.2 if x in high_risk_categories else 1.0
                )
                df['Risk_Score'] *= df['Category_Risk_Multiplier']
            
            y_reg = df['Risk_Score']
            X_train_reg, X_test_reg, y_train_reg, y_test_reg = train_test_split(
                X, y_reg, test_size=0.2, random_state=42
            )
            
            # Train regression model with optimized parameters
            self.rf_regressor = RandomForestRegressor(
                n_estimators=150,
                max_depth=12,
                min_samples_split=3,
                min_samples_leaf=1,
                max_features='sqrt',
                random_state=42,
                n_jobs=-1  # Use all CPU cores
            )
            
            self.rf_regressor.fit(X_train_reg, y_train_reg)
            y_pred_reg = self.rf_regressor.predict(X_test_reg)
            
            # Calculate comprehensive regression metrics
            reg_mse = mean_squared_error(y_test_reg, y_pred_reg)
            reg_rmse = np.sqrt(reg_mse)
            reg_mae = mean_absolute_error(y_test_reg, y_pred_reg)
            reg_r2 = r2_score(y_test_reg, y_pred_reg)
            reg_explained_var = explained_variance_score(y_test_reg, y_pred_reg)
            
            # Calculate additional metrics
            y_mean = np.mean(y_test_reg)
            reg_mape = np.mean(np.abs((y_test_reg - y_pred_reg) / y_test_reg)) * 100  # Mean Absolute Percentage Error
            
            # Feature importance
            feature_importance = dict(zip(feature_columns, self.rf_regressor.feature_importances_))
            self.feature_importance = dict(sorted(feature_importance.items(), key=lambda x: x[1], reverse=True))
            
            # Cross-validation score for more robust R²
            cv_r2_scores = cross_val_score(self.rf_regressor, X, y_reg, cv=5, scoring='r2')
            cv_r2_mean = np.mean(cv_r2_scores)
            cv_r2_std = np.std(cv_r2_scores)
            
            results = {
                # Classification metrics
                'classification_accuracy': class_accuracy,
                
                # Regression metrics
                'regression_mse': reg_mse,
                'regression_rmse': reg_rmse,
                'regression_mae': reg_mae,
                'regression_r2': reg_r2,
                'regression_explained_variance': reg_explained_var,
                'regression_mape': reg_mape,
                
                # Cross-validation metrics
                'cv_r2_mean': cv_r2_mean,
                'cv_r2_std': cv_r2_std,
                
                # Model details
                'feature_importance': self.feature_importance,
                'risk_label_encoder': le_risk,
                'n_samples': len(df),
                'n_features': len(feature_columns),
                'target_mean': y_mean,
                'target_std': np.std(y_reg)
            }
            
            # Log detailed metrics
            logger.info(f"Model training completed:")
            logger.info(f"- Classification accuracy: {class_accuracy:.4f}")
            logger.info(f"- Regression R²: {reg_r2:.4f} ({reg_r2*100:.2f}%)")
            logger.info(f"- Regression MSE: {reg_mse:.6f}")
            logger.info(f"- Regression RMSE: {reg_rmse:.4f}")
            logger.info(f"- Regression MAE: {reg_mae:.4f}")
            logger.info(f"- Cross-validation R² (mean±std): {cv_r2_mean:.4f}±{cv_r2_std:.4f}")
            logger.info(f"- Mean Absolute Percentage Error: {reg_mape:.2f}%")
            logger.info(f"- Explained Variance: {reg_explained_var:.4f}")
            logger.info(f"- Top 3 features: {list(self.feature_importance.keys())[:3]}")
            
            return results
            
        except Exception as e:
            logger.error(f"Error training models: {str(e)}")
            raise
    
    def make_predictions(self, df: pd.DataFrame, feature_columns: List[str]) -> pd.DataFrame:
        """
        Make predictions on new data
        
        Args:
            df: DataFrame with features
            feature_columns: List of feature column names
            
        Returns:
            DataFrame with predictions added
        """
        logger.info("Making predictions with Random Forest models")
        
        try:
            if self.rf_classifier is None or self.rf_regressor is None:
                raise ValueError("Models not trained yet. Call train_random_forest_models first.")
            
            X = df[feature_columns].values
            
            # Classification predictions
            risk_pred_encoded = self.rf_classifier.predict(X)
            risk_pred_proba = self.rf_classifier.predict_proba(X)
            
            # Get class labels
            risk_classes = self.rf_classifier.classes_
            
            # Convert back to labels
            df['Predicted_Risk_Level'] = risk_pred_encoded
            df['Risk_Level_Confidence'] = np.max(risk_pred_proba, axis=1)
            
            # Regression predictions
            df['Predicted_Risk_Score'] = self.rf_regressor.predict(X)
            
            # Add recommendations based on predictions
            df['Recommendation'] = df.apply(self._generate_recommendation, axis=1)
            
            # Add prediction timestamp
            df['Prediction_Timestamp'] = datetime.now()
            
            # Store predictions for metrics calculation
            self._last_predictions = df.copy()
            
            logger.info(f"Predictions completed for {len(df)} records")
            
            return df
            
        except Exception as e:
            logger.error(f"Error making predictions: {str(e)}")
            raise
    
    def _generate_recommendation(self, row) -> str:
        """Generate recommendation based on predictions"""
        risk_score = row.get('Predicted_Risk_Score', 0)
        risk_level = row.get('Predicted_Risk_Level', 0)
        
        if risk_score >= 2.0 or risk_level == 2:  # High risk
            return "Immediate monitoring and proactive maintenance required"
        elif risk_score >= 1.5 or risk_level == 1:  # Medium risk
            return "Enhanced monitoring and scheduled maintenance recommended"
        else:  # Low risk
            return "Standard monitoring sufficient"
    
    def save_predictions_to_databricks(self, df: pd.DataFrame, predictions_table: str, mode: str = "overwrite"):
        """
        Write the prediction columns of ``df`` to a Databricks table.

        Only columns that actually exist in ``df`` are written. Identifier
        columns (``Asset_Number``, ``Incident_Number``) are placed in front of
        the prediction columns when present. The first few saved rows are
        logged afterwards.

        Args:
            df: DataFrame with predictions
            predictions_table: Target table name (e.g., 'default.incident_predictions')
            mode: Write mode passed to the Spark writer ('overwrite', 'append')

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        logger.info(f"Saving predictions to Databricks table: {predictions_table}")

        try:
            prediction_fields = [
                'Asset_Name', 'Category', 'Priority', 'Location_Clean',
                'Predicted_Risk_Level', 'Predicted_Risk_Score', 'Risk_Level_Confidence',
                'Recommendation', 'Prediction_Timestamp'
            ]

            # Each identifier found is pushed to the front in turn, so the one
            # listed later ends up first in the output.
            found_ids = [name for name in ('Asset_Number', 'Incident_Number') if name in df.columns]
            wanted_fields = found_ids[::-1] + prediction_fields

            # Drop anything the frame does not carry, then detach from the input.
            present_fields = [name for name in wanted_fields if name in df.columns]
            output_df = df[present_fields].copy()

            # pandas -> Spark -> table
            self.spark.createDataFrame(output_df).write.mode(mode).saveAsTable(predictions_table)

            logger.info(f"Successfully saved {len(output_df)} predictions to {predictions_table}")

            # Log a small sample of what was written
            logger.info("Sample predictions:")
            for _, sample in output_df.head(5).iterrows():
                logger.info(f"Asset: {sample.get('Asset_Name', 'N/A')}, "
                          f"Risk Score: {sample.get('Predicted_Risk_Score', 0):.2f}, "
                          f"Level: {sample.get('Predicted_Risk_Level', 'N/A')}")

        except Exception as e:
            logger.error(f"Error saving predictions to Databricks: {str(e)}")
            raise
    
    def save_model_metrics_to_databricks(self, training_results: Dict, metrics_table: str):
        """
        Save comprehensive model performance metrics to Databricks table
        
        Args:
            training_results: Dictionary containing model training results and metrics
            metrics_table: Target table name for metrics (e.g., 'default.model_performance_metrics')
        """
        logger.info(f"Saving model metrics to Databricks table: {metrics_table}")
        
        try:
            # Create comprehensive metrics record
            metrics_record = {
                'model_run_id': f"rf_incident_prediction_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
                'model_type': 'Random Forest',
                'model_version': '1.0',
                'training_timestamp': datetime.now(),
                'dataset_size': training_results.get('n_samples', 0),
                'feature_count': training_results.get('n_features', 0),
                
                # Classification Metrics
                'classification_accuracy': training_results.get('classification_accuracy', 0.0),
                
                # Regression Metrics - Primary
                'regression_r2': training_results.get('regression_r2', 0.0),
                'regression_mse': training_results.get('regression_mse', 0.0),
                'regression_rmse': training_results.get('regression_rmse', 0.0),
                'regression_mae': training_results.get('regression_mae', 0.0),
                'regression_mape': training_results.get('regression_mape', 0.0),
                'regression_explained_variance': training_results.get('regression_explained_variance', 0.0),
                
                # Cross-Validation Metrics
                'cv_r2_mean': training_results.get('cv_r2_mean', 0.0),
                'cv_r2_std': training_results.get('cv_r2_std', 0.0),
                'cv_r2_min': training_results.get('cv_r2_mean', 0.0) - training_results.get('cv_r2_std', 0.0),
                'cv_r2_max': training_results.get('cv_r2_mean', 0.0) + training_results.get('cv_r2_std', 0.0),
                
                # Target Variable Statistics
                'target_mean': training_results.get('target_mean', 0.0),
                'target_std': training_results.get('target_std', 0.0),
                
                # Model Performance Classification
                'performance_grade': self._classify_model_performance(training_results.get('regression_r2', 0.0)),
                'model_quality_score': self._calculate_quality_score(training_results),
                
                # Feature Importance (Top 5)
                'top_feature_1': list(training_results.get('feature_importance', {}).keys())[0] if training_results.get('feature_importance') else None,
                'top_feature_1_importance': list(training_results.get('feature_importance', {}).values())[0] if training_results.get('feature_importance') else 0.0,
                'top_feature_2': list(training_results.get('feature_importance', {}).keys())[1] if len(training_results.get('feature_importance', {})) > 1 else None,
                'top_feature_2_importance': list(training_results.get('feature_importance', {}).values())[1] if len(training_results.get('feature_importance', {})) > 1 else 0.0,
                'top_feature_3': list(training_results.get('feature_importance', {}).keys())[2] if len(training_results.get('feature_importance', {})) > 2 else None,
                'top_feature_3_importance': list(training_results.get('feature_importance', {}).values())[2] if len(training_results.get('feature_importance', {})) > 2 else 0.0,
                
                # Model Hyperparameters
                'n_estimators': 150,
                'max_depth': 12,
                'min_samples_split': 3,
                'min_samples_leaf': 1,
                'max_features': 'sqrt',
                
                # Business Metrics
                'high_risk_assets_predicted': self._count_high_risk_predictions() if hasattr(self, '_last_predictions') else 0,
                'model_confidence_avg': self._calculate_avg_confidence() if hasattr(self, '_last_predictions') else 0.0,
                
                # Data Quality Indicators
                'missing_data_percentage': 0.0,  # Will be calculated if needed
                'data_quality_score': self._assess_data_quality(training_results),
                
                # Comments and Notes
                'model_notes': f"Random Forest model trained on {training_results.get('n_samples', 0)} incidents with {training_results.get('n_features', 0)} features",
                'performance_summary': self._generate_performance_summary(training_results)
            }
            
            # Convert to DataFrame
            metrics_df = pd.DataFrame([metrics_record])
            
            # Convert to Spark DataFrame
            spark_df = self.spark.createDataFrame(metrics_df)
            
            # Create table if it doesn't exist
            self._create_metrics_table_if_not_exists(metrics_table)
            
            # Write to Databricks table
            spark_df.write.mode("append").saveAsTable(metrics_table)
            
            logger.info(f"Successfully saved model metrics to {metrics_table}")
            logger.info(f"Model Run ID: {metrics_record['model_run_id']}")
            logger.info(f"R²: {metrics_record['regression_r2']:.4f}, MSE: {metrics_record['regression_mse']:.6f}")
            logger.info(f"Performance Grade: {metrics_record['performance_grade']}")
            
        except Exception as e:
            logger.error(f"Error saving model metrics to Databricks: {str(e)}")
            raise
    
    def _classify_model_performance(self, r2_score: float) -> str:
        """Classify model performance based on R² score"""
        if r2_score >= 0.95:
            return "EXCELLENT"
        elif r2_score >= 0.90:
            return "VERY_GOOD"
        elif r2_score >= 0.80:
            return "GOOD"
        elif r2_score >= 0.70:
            return "ACCEPTABLE"
        elif r2_score >= 0.50:
            return "MODERATE"
        else:
            return "POOR"
    
    def _calculate_quality_score(self, training_results: Dict) -> float:
        """Calculate overall model quality score (0-100)"""
        try:
            r2 = training_results.get('regression_r2', 0.0)
            classification_acc = training_results.get('classification_accuracy', 0.0)
            cv_r2_mean = training_results.get('cv_r2_mean', 0.0)
            cv_r2_std = training_results.get('cv_r2_std', 1.0)
            
            # Weighted quality score
            quality_score = (
                r2 * 40 +                          # R² contributes 40%
                classification_acc * 30 +          # Classification accuracy 30%
                cv_r2_mean * 20 +                  # CV performance 20%
                (1 - min(cv_r2_std, 0.5)) * 10    # Low variance bonus 10%
            ) * 100
            
            return min(100.0, max(0.0, quality_score))
            
        except Exception:
            return 0.0
    
    def _count_high_risk_predictions(self) -> int:
        """Count number of high-risk asset predictions"""
        try:
            if hasattr(self, '_last_predictions') and 'Predicted_Risk_Level' in self._last_predictions.columns:
                return int((self._last_predictions['Predicted_Risk_Level'] == 2).sum())  # High risk = 2
            return 0
        except Exception:
            return 0
    
    def _calculate_avg_confidence(self) -> float:
        """Calculate average prediction confidence"""
        try:
            if hasattr(self, '_last_predictions') and 'Risk_Level_Confidence' in self._last_predictions.columns:
                return float(self._last_predictions['Risk_Level_Confidence'].mean())
            return 0.0
        except Exception:
            return 0.0
    
    def _assess_data_quality(self, training_results: Dict) -> float:
        """Assess data quality score (0-100)"""
        try:
            n_samples = training_results.get('n_samples', 0)
            n_features = training_results.get('n_features', 0)
            
            # Basic data quality assessment
            sample_score = min(100, (n_samples / 100) * 50)  # 50 points for adequate samples
            feature_score = min(50, n_features * 5)          # 5 points per feature, max 50
            
            return min(100.0, sample_score + feature_score)
            
        except Exception:
            return 50.0  # Default moderate score
    
    def _generate_performance_summary(self, training_results: Dict) -> str:
        """Generate human-readable performance summary"""
        try:
            r2 = training_results.get('regression_r2', 0.0)
            mse = training_results.get('regression_mse', 0.0)
            acc = training_results.get('classification_accuracy', 0.0)
            
            return f"R²={r2:.3f} MSE={mse:.4f} ClassAcc={acc:.3f} - Model explains {r2*100:.1f}% of variance"
            
        except Exception:
            return "Performance summary unavailable"
    
    def _create_metrics_table_if_not_exists(self, metrics_table: str):
        """
        Ensure the metrics table exists, creating it with explicit DDL if needed.

        Existence is probed by reading one row from the table; any failure of
        that probe is treated as "table missing". Errors during creation are
        logged and swallowed so the caller can still attempt its write.
        """
        try:
            # Probe: a successful single-row read means the table is there.
            try:
                self.spark.table(metrics_table).limit(1).collect()
            except Exception:
                table_found = False
            else:
                table_found = True

            if table_found:
                logger.info(f"Metrics table {metrics_table} already exists")
            else:
                logger.info(f"Creating new metrics table: {metrics_table}")

                # NOTE(review): the PARTITIONED BY clause uses an expression
                # (DATE(...)); confirm the target runtime accepts this.
                ddl_statement = f"""
                CREATE TABLE IF NOT EXISTS {metrics_table} (
                    model_run_id STRING,
                    model_type STRING,
                    model_version STRING,
                    training_timestamp TIMESTAMP,
                    dataset_size INT,
                    feature_count INT,
                    
                    classification_accuracy DOUBLE,
                    
                    regression_r2 DOUBLE,
                    regression_mse DOUBLE,
                    regression_rmse DOUBLE,
                    regression_mae DOUBLE,
                    regression_mape DOUBLE,
                    regression_explained_variance DOUBLE,
                    
                    cv_r2_mean DOUBLE,
                    cv_r2_std DOUBLE,
                    cv_r2_min DOUBLE,
                    cv_r2_max DOUBLE,
                    
                    target_mean DOUBLE,
                    target_std DOUBLE,
                    
                    performance_grade STRING,
                    model_quality_score DOUBLE,
                    
                    top_feature_1 STRING,
                    top_feature_1_importance DOUBLE,
                    top_feature_2 STRING,
                    top_feature_2_importance DOUBLE,
                    top_feature_3 STRING,
                    top_feature_3_importance DOUBLE,
                    
                    n_estimators INT,
                    max_depth INT,
                    min_samples_split INT,
                    min_samples_leaf INT,
                    max_features STRING,
                    
                    high_risk_assets_predicted INT,
                    model_confidence_avg DOUBLE,
                    
                    missing_data_percentage DOUBLE,
                    data_quality_score DOUBLE,
                    
                    model_notes STRING,
                    performance_summary STRING
                )
                USING DELTA
                PARTITIONED BY (DATE(training_timestamp))
                """

                self.spark.sql(ddl_statement)
                logger.info(f"Successfully created metrics table: {metrics_table}")

        except Exception as e:
            logger.error(f"Error creating metrics table: {str(e)}")
            # Deliberately not re-raised: the caller's write may still succeed
            # by creating the table from the DataFrame's own schema.
    
    def run_full_pipeline(self, source_table: str, predictions_table: str, metrics_table: str = None, retrain: bool = True):
        """
        Run the complete ML pipeline with metrics tracking
        
        Args:
            source_table: Source table with incident data
            predictions_table: Target table for predictions
            metrics_table: Target table for model performance metrics
            retrain: Whether to retrain models
        """
        logger.info("Starting full Random Forest prediction pipeline with metrics tracking")
        
        try:
            # Step 1: Read data
            raw_df = self.read_incident_data(source_table)
            
            # Step 2: Clean and prepare data
            clean_df = self.clean_and_prepare_data(raw_df)
            
            # Step 3: Create features
            features_df, feature_columns = self.create_features(clean_df)
            
            # Step 4: Train models (if needed)
            if retrain:
                training_results = self.train_random_forest_models(features_df, feature_columns)
                self._last_training_results = training_results  # Store for summary report
                logger.info(f"Training completed with R²: {training_results.get('regression_r2', 0):.4f}")
                
                # Step 4.1: Save model metrics to Databricks (if metrics table specified)
                if metrics_table:
                    self.save_model_metrics_to_databricks(training_results, metrics_table)
            
            # Step 5: Make predictions
            predictions_df = self.make_predictions(features_df, feature_columns)
            
            # Step 6: Save predictions to Databricks
            self.save_predictions_to_databricks(predictions_df, predictions_table)
            
            # Step 7: Generate summary report
            self._generate_summary_report(predictions_df)
            
            logger.info("Pipeline completed successfully!")
            
            # Return summary for external use
            return {
                'status': 'success',
                'predictions_table': predictions_table,
                'metrics_table': metrics_table,
                'records_processed': len(predictions_df),
                'assets_analyzed': predictions_df['Asset_Name'].nunique(),
                'model_performance': self._last_training_results if hasattr(self, '_last_training_results') else None
            }
            
        except Exception as e:
            logger.error(f"Pipeline failed: {str(e)}")
            raise
    
    def _generate_summary_report(self, df: pd.DataFrame):
        """Generate comprehensive summary report of predictions with model metrics"""
        logger.info("=== COMPREHENSIVE PREDICTION SUMMARY REPORT ===")
        
        try:
            total_assets = df['Asset_Name'].nunique()
            total_predictions = len(df)
            
            # Risk level distribution
            if 'Predicted_Risk_Level' in df.columns:
                risk_dist = df['Predicted_Risk_Level'].value_counts()
                logger.info(f"Risk Level Distribution: {dict(risk_dist)}")
            
            # Top risk assets
            if 'Predicted_Risk_Score' in df.columns:
                top_risk = df.nlargest(5, 'Predicted_Risk_Score')[['Asset_Name', 'Predicted_Risk_Score']]
                logger.info("Top 5 Risk Assets:")
                for _, row in top_risk.iterrows():
                    logger.info(f"  - {row['Asset_Name']}: {row['Predicted_Risk_Score']:.3f}")
                
                # Risk score statistics
                risk_stats = df['Predicted_Risk_Score'].describe()
                logger.info(f"Risk Score Statistics:")
                logger.info(f"  - Mean: {risk_stats['mean']:.3f}")
                logger.info(f"  - Std Dev: {risk_stats['std']:.3f}")
                logger.info(f"  - Min: {risk_stats['min']:.3f}")
                logger.info(f"  - Max: {risk_stats['max']:.3f}")
            
            # Feature importance
            if self.feature_importance:
                logger.info("Top 5 Feature Importance:")
                for feature, importance in list(self.feature_importance.items())[:5]:
                    logger.info(f"  - {feature}: {importance:.4f} ({importance*100:.2f}%)")
            
            # Model performance summary
            logger.info("=== MODEL PERFORMANCE METRICS ===")
            if hasattr(self, '_last_training_results'):
                results = self._last_training_results
                logger.info(f"R-squared (R²): {results.get('regression_r2', 0):.4f} ({results.get('regression_r2', 0)*100:.2f}%)")
                logger.info(f"Mean Squared Error (MSE): {results.get('regression_mse', 0):.6f}")
                logger.info(f"Root Mean Squared Error (RMSE): {results.get('regression_rmse', 0):.4f}")
                logger.info(f"Mean Absolute Error (MAE): {results.get('regression_mae', 0):.4f}")
                logger.info(f"Mean Absolute Percentage Error (MAPE): {results.get('regression_mape', 0):.2f}%")
                logger.info(f"Cross-validation R²: {results.get('cv_r2_mean', 0):.4f}±{results.get('cv_r2_std', 0):.4f}")
                logger.info(f"Classification Accuracy: {results.get('classification_accuracy', 0):.4f}")
                
                # Model interpretation
                r2 = results.get('regression_r2', 0)
                if r2 > 0.9:
                    logger.info("✅ EXCELLENT model performance (R² > 0.9)")
                elif r2 > 0.8:
                    logger.info("✅ VERY GOOD model performance (R² > 0.8)")  
                elif r2 > 0.7:
                    logger.info("✅ GOOD model performance (R² > 0.7)")
                elif r2 > 0.5:
                    logger.info("⚠️ MODERATE model performance (R² > 0.5)")
                else:
                    logger.info("❌ POOR model performance (R² < 0.5)")
                
                logger.info(f"The model explains {r2*100:.1f}% of the variance in risk scores")
            
            logger.info(f"\nTotal assets analyzed: {total_assets}")
            logger.info(f"Total predictions made: {total_predictions}")
            logger.info("=== END REPORT ===")
            
        except Exception as e:
            logger.error(f"Error generating summary: {str(e)}")

def main():
    """
    Run the full prediction pipeline against the configured tables.

    Prints the pipeline result, then tries to read back and print the most
    recent row of the metrics table. A failure of that read-back only prints a
    warning; a failure of the pipeline itself is printed and re-raised.
    """
    # Configuration
    source_table = "sd_bdc_demo.default.service_now_only" # Replace with your table name
    predictions_table = "sd_bdc_demo.default.service_now_rf_predictions" # Replace with your predictions table
    metrics_table = "sd_bdc_demo.default.service_now_rf_model_performance_metrics"  # New metrics table

    try:
        predictor = DatabricksRandomForestPredictor()

        # Train, predict and persist both predictions and metrics
        summary = predictor.run_full_pipeline(
            source_table=source_table,
            predictions_table=predictions_table,
            metrics_table=metrics_table,
            retrain=True
        )

        print("✅ Random Forest prediction pipeline completed successfully!")
        print(f"📊 Results: {summary}")
        print(f"📈 Predictions saved to: {predictions_table}")
        print(f"📋 Model metrics saved to: {metrics_table}")

        # Read back the newest metrics row; purely informational
        try:
            latest_metrics_sql = f"""
                SELECT model_run_id, training_timestamp, regression_r2, regression_mse, 
                       performance_grade, model_quality_score
                FROM {metrics_table}
                ORDER BY training_timestamp DESC
                LIMIT 1
            """
            rows = predictor.spark.sql(latest_metrics_sql).collect()

            # At most one row is expected (LIMIT 1); nothing is printed if empty.
            for latest in rows[:1]:
                print(f"\n🎯 Latest Model Performance:")
                print(f"   - Run ID: {latest['model_run_id']}")
                print(f"   - R²: {latest['regression_r2']:.4f}")
                print(f"   - MSE: {latest['regression_mse']:.6f}")
                print(f"   - Grade: {latest['performance_grade']}")
                print(f"   - Quality Score: {latest['model_quality_score']:.1f}/100")

        except Exception as e:
            print(f"⚠️ Could not retrieve metrics summary: {str(e)}")

    except Exception as e:
        print(f"❌ Pipeline failed: {str(e)}")
        raise

# Run the pipeline only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    main()

2025-06-13 12:35:10,149 - INFO - Databricks Random Forest Predictor initialized
2025-06-13 12:35:10,153 - INFO - Starting full Random Forest prediction pipeline with metrics tracking
2025-06-13 12:35:10,155 - INFO - Reading data from table: sd_bdc_demo.default.service_now_only
2025-06-13 12:35:22,575 - INFO - Loaded 111 rows, 20 columns from sd_bdc_demo.default.service_now_only
2025-06-13 12:35:22,578 - INFO - Starting data cleaning and preparation
2025-06-13 12:35:26,119 - INFO - Data cleaning completed. Shape: (111, 29)
2025-06-13 12:35:26,122 - INFO - Risk level distribution:
Low       67
High      37
Medium     7
Name: Risk_Level, dtype: int64
2025-06-13 12:35:26,123 - INFO - Creating features for Random Forest model
2025-06-13 12:35:26,150 - INFO - Created 13 features: ['Priority_Num', 'Day_of_Week', 'Hour_of_Day', 'Day_of_Month', 'Is_Weekend', 'Asset_Incident_Count', 'Asset_Encoded', 'Category_Encoded', 'Subcategory_Encoded', 'Status_Encoded', 'Assigned_Group_Encoded', 'Location_

✅ Random Forest prediction pipeline completed successfully!
📊 Results: {'status': 'success', 'predictions_table': 'sd_bdc_demo.default.service_now_rf_predictions', 'metrics_table': 'sd_bdc_demo.default.service_now_rf_model_performance_metrics', 'records_processed': 111, 'assets_analyzed': 59, 'model_performance': {'classification_accuracy': 0.9565217391304348, 'regression_mse': 0.19279825659837393, 'regression_rmse': 0.4390879827533133, 'regression_mae': 0.36331281395854054, 'regression_r2': 0.7829665982348297, 'regression_explained_variance': 0.7908802677139908, 'regression_mape': 23.443231313917273, 'cv_r2_mean': 0.589838944050805, 'cv_r2_std': 0.08237502073601397, 'feature_importance': {'Priority_Num': 0.35249947369671075, 'Category_Encoded': 0.10496577873483534, 'Asset_Incident_Count': 0.09059702541180577, 'Incident_Type_Encoded': 0.0674771491695897, 'Subcategory_Encoded': 0.06472263085590493, 'Asset_Encoded': 0.06268476546723169, 'Assigned_Group_Encoded': 0.058541818209804766, 'Lo