In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split, GridSearchCV, TimeSeriesSplit
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from sklearn.metrics import (accuracy_score, classification_report, confusion_matrix,
                             precision_score, recall_score, f1_score, roc_auc_score,
                             roc_curve, precision_recall_curve)
from imblearn.over_sampling import SMOTE
from sklearn.feature_selection import SelectFromModel
import joblib
import warnings
warnings.filterwarnings('ignore')

class KubernetesFailurePrediction:
    def __init__(self, data_paths, log_path="prediction_logs.txt"):
        """
        Initialize the Kubernetes Failure Prediction model.

        Parameters:
        data_paths (dict): Dictionary containing paths to the data files
        log_path (str): Path to save logs
        """
        self.data_paths = data_paths
        self.log_path = log_path
        self.model = None
        self.scaler = None
        self.feature_names = None
        self.feature_importances = None
        self.label_mapping = {"benign": 0, "normal": 0, "operational": 0,
                             "malicious": 1, "failure": 1, "disruption": 1,
                             "resource_exhaustion": 2, "network_issue": 3}

        # Open log file
        self.log_file = open(self.log_path, 'w')
        self.log("Kubernetes Failure Prediction Model initialized")

    def log(self, message):
        """Log a message to both console and log file"""
        print(message)
        self.log_file.write(message + "\n")
        self.log_file.flush()

    def load_data(self):
        """Load and combine datasets from provided paths"""
        self.log("Loading datasets...")

        dataframes = []
        for name, path in self.data_paths.items():
            try:
                df = pd.read_csv(path, on_bad_lines='skip', low_memory=False)
                self.log(f"Loaded {name}: {df.shape[0]} rows, {df.shape[1]} columns")
                dataframes.append(df)
            except Exception as e:
                self.log(f"Error loading {name}: {str(e)}")

        # Combine all dataframes
        self.df_combined = pd.concat(dataframes, ignore_index=True)
        self.log(f"Combined dataset shape: {self.df_combined.shape}")

        return self.df_combined

    def preprocess_data(self):
        """Clean and preprocess the combined dataset"""
        self.log("Preprocessing data...")

        df = self.df_combined

        # Clean up column names
        df.columns = df.columns.str.strip().str.lower()

        # Drop unnecessary columns
        cols_to_drop = ["_source_flow_id", "_source_flow_final", "_source_source_ip", "_source_destination_ip"]
        df.drop(columns=cols_to_drop, inplace=True, errors="ignore")

        # Handle labels - map to Kubernetes failure types
        if "label" in df.columns:
            df.rename(columns={"label": "failure_label"}, inplace=True)

        # Map labels to Kubernetes failure categories
        if "failure_label" in df.columns:
            df["failure_label"] = df["failure_label"].map(lambda x: self.label_mapping.get(str(x).lower(), 0))
        else:
            self.log("Warning: 'failure_label' column not found. Creating synthetic labels for demonstration.")
            df["failure_label"] = 0  # Default to normal

            # Create synthetic labels based on network patterns
            if "_source_network_bytes" in df.columns:
                network_threshold = df["_source_network_bytes"].quantile(0.95)
                df.loc[df["_source_network_bytes"] > network_threshold, "failure_label"] = 3  # Network issue

            if "_source_event_duration" in df.columns:
                duration_threshold = df["_source_event_duration"].quantile(0.9)
                df.loc[df["_source_event_duration"] > duration_threshold, "failure_label"] = 2  # Resource exhaustion

            if "_source_network_transport" in df.columns:
                df.loc[df["_source_network_transport"].astype(str) == "icmp", "failure_label"] = 1  # Pod failure

        # Ensure failure_label is numeric
        df["failure_label"] = pd.to_numeric(df["failure_label"], errors="coerce")

        # Handle missing values
        df.fillna(0, inplace=True)

        # Convert categorical features
        categorical_cols = df.select_dtypes(include=['object']).columns
        encoder = LabelEncoder()

        for col in categorical_cols:
            df[col] = df[col].astype(str)
            df[col] = encoder.fit_transform(df[col])

        # Convert numeric features
        numeric_cols = df.select_dtypes(include=['float64', 'int64']).columns
        for col in numeric_cols:
            df[col] = pd.to_numeric(df[col], errors="coerce")
            df[col].fillna(0, inplace=True)

        # Create new features for Kubernetes-specific metrics
        if "_source_event_duration" in df.columns:
            df["simulated_cpu_usage"] = df["_source_event_duration"] / df["_source_event_duration"].max() * 100

        if "_source_network_bytes" in df.columns:
            df["simulated_memory_usage"] = df["_source_network_bytes"] / df["_source_network_bytes"].max() * 100

        # Simulate disk usage
        df["simulated_disk_usage"] = np.random.uniform(10, 95, size=df.shape[0])

        # Simulate pod restart count
        df["simulated_pod_restarts"] = np.random.randint(0, 5, size=df.shape[0])
        df.loc[df["failure_label"] == 1, "simulated_pod_restarts"] = np.random.randint(2, 10,
                                                                                       size=df[df["failure_label"] == 1].shape[0])

        self.log(f"Data preprocessing completed. Final shape: {df.shape}")
        self.preprocessed_data = df
        return df

    def select_features(self):
        """Select relevant features for the model"""
        self.log("Selecting features...")

        df = self.preprocessed_data

        # Define base features
        base_features = [
            "_source_network_bytes",
            "_source_event_duration",
            "_source_network_transport"
        ]

        # Add simulated Kubernetes metrics
        k8s_features = [
            "simulated_cpu_usage",
            "simulated_memory_usage",
            "simulated_pod_restarts"  # Removed 'simulated_disk_usage' as it has low importance
        ]

        # Combine all features
        all_features = []
        for feature in base_features + k8s_features:
            if feature in df.columns:
                all_features.append(feature)

        self.log(f"Selected features: {all_features}")
        self.feature_names = all_features

        # Create feature matrix and target vector
        X = df[all_features]
        y = df["failure_label"]

        # Scale features
        self.scaler = StandardScaler()
        X_scaled = self.scaler.fit_transform(X)

        self.log(f"Feature matrix shape: {X.shape}, Target vector shape: {y.shape}")

        return X_scaled, y

    def train_test_chronological_split(self, X, y, test_size=0.2):
        """Split data chronologically for time series data"""
        self.log(f"Splitting data chronologically with test_size={test_size}")

        # For time series data, we split chronologically
        split_idx = int(len(X) * (1 - test_size))
        X_train, X_test = X[:split_idx], X[split_idx:]
        y_train, y_test = y[:split_idx], y[split_idx:]

        self.log(f"Training set: {X_train.shape[0]} samples, Test set: {X_test.shape[0]} samples")

        return X_train, X_test, y_train, y_test

    def handle_imbalanced_data(self, X_train, y_train):
        """Apply SMOTE to handle class imbalance"""
        self.log("Applying SMOTE to handle class imbalance...")

        # Get class distribution before SMOTE
        class_dist_before = pd.Series(y_train).value_counts(normalize=True)
        self.log(f"Class distribution before SMOTE: {class_dist_before.to_dict()}")

        # Apply SMOTE
        smote = SMOTE(random_state=42)
        X_train_balanced, y_train_balanced = smote.fit_resample(X_train, y_train)

        # Get class distribution after SMOTE
        class_dist_after = pd.Series(y_train_balanced).value_counts(normalize=True)
        self.log(f"Class distribution after SMOTE: {class_dist_after.to_dict()}")

        return X_train_balanced, y_train_balanced

    def train_model(self, X_train, y_train):
        """Train the model with XGBoost"""
        self.log("Training XGBoost model...")

        # Initialize and train the model
        model = XGBClassifier(
            n_estimators=100,
            max_depth=5,
            learning_rate=0.1,
            scale_pos_weight=10,  # Adjust for class imbalance
            random_state=42
        )

        # Train the model
        model.fit(X_train, y_train)

        self.model = model
        self.log("Model training completed")

        # Extract feature importances
        self.feature_importances = pd.DataFrame({
            'Feature': self.feature_names,
            'Importance': model.feature_importances_
        }).sort_values('Importance', ascending=False)

        self.log("\nFeature Importance:")
        self.log(self.feature_importances.to_string())

        return model

    def evaluate_model(self, X_test, y_test, threshold=0.3):
        """Evaluate the trained model on test data"""
        self.log("Evaluating model...")

        # Make predictions with adjusted threshold
        y_pred_proba = self.model.predict_proba(X_test)[:, 1]
        y_pred = (y_pred_proba > threshold).astype(int)

        # Calculate metrics
        accuracy = accuracy_score(y_test, y_pred)
        precision = precision_score(y_test, y_pred, average='weighted')
        recall = recall_score(y_test, y_pred, average='weighted')
        f1 = f1_score(y_test, y_pred, average='weighted')

        # Print metrics
        self.log(f"\nModel Evaluation Metrics:")
        self.log(f"Accuracy: {accuracy:.4f}")
        self.log(f"Precision: {precision:.4f}")
        self.log(f"Recall: {recall:.4f}")
        self.log(f"F1-Score: {f1:.4f}")

        # Print classification report
        class_report = classification_report(y_test, y_pred)
        self.log("\nClassification Report:")
        self.log(class_report)

        # Print confusion matrix
        cm = confusion_matrix(y_test, y_pred)
        self.log("\nConfusion Matrix:")
        self.log(str(cm))

        # Plot confusion matrix
        self.plot_confusion_matrix(y_test, y_pred)

        # Plot ROC curve
        self.plot_roc_curve(y_test, y_pred_proba)

        # Return evaluation metrics as a dictionary
        eval_metrics = {
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'classification_report': class_report,
            'confusion_matrix': cm
        }

        return eval_metrics

    def plot_confusion_matrix(self, y_test, y_pred, output_path="confusion_matrix.png"):
        """Plot confusion matrix and save to file"""
        self.log("Plotting confusion matrix...")

        plt.figure(figsize=(8, 6))
        cm = confusion_matrix(y_test, y_pred)
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                   xticklabels=['Normal', 'Pod Failure', 'Resource Exhaustion', 'Network Issue'],
                   yticklabels=['Normal', 'Pod Failure', 'Resource Exhaustion', 'Network Issue'])
        plt.xlabel('Predicted Label')
        plt.ylabel('True Label')
        plt.title('Confusion Matrix for Kubernetes Failure Prediction')
        plt.tight_layout()
        plt.savefig(output_path)
        plt.close()

        self.log(f"Confusion matrix plot saved to {output_path}")

    def plot_roc_curve(self, y_test, y_pred_proba, output_path="roc_curve.png"):
        """Plot ROC curve for binary classification and save to file"""
        if len(np.unique(y_test)) == 2:
            self.log("Plotting ROC curve...")

            fpr, tpr, _ = roc_curve(y_test, y_pred_proba)
            roc_auc = roc_auc_score(y_test, y_pred_proba)

            plt.figure(figsize=(8, 6))
            plt.plot(fpr, tpr, color='darkorange', lw=2,
                    label=f'ROC curve (area = {roc_auc:.2f})')
            plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
            plt.xlim([0.0, 1.0])
            plt.ylim([0.0, 1.05])
            plt.xlabel('False Positive Rate')
            plt.ylabel('True Positive Rate')
            plt.title('ROC Curve for Kubernetes Failure Prediction')
            plt.legend(loc="lower right")
            plt.savefig(output_path)
            plt.close()

            self.log(f"ROC curve plot saved to {output_path}")

    def save_model(self, model_path="kubernetes_failure_prediction_model.joblib"):
        """Save the trained model to disk"""
        self.log(f"Saving model to {model_path}...")

        model_data = {
            'model': self.model,
            'scaler': self.scaler,
            'feature_names': self.feature_names,
            'feature_importances': self.feature_importances,
            'label_mapping': self.label_mapping
        }

        joblib.dump(model_data, model_path)
        self.log("Model saved successfully")

    def close(self):
        """Close the log file"""
        if hasattr(self, 'log_file') and self.log_file:
            self.log("Closing log file")
            self.log_file.close()


# Example usage
if __name__ == "__main__":
    # Define paths to data files
    data_paths = {
        "feb2022": "elastic_february2022_data.csv",
        "may2021_benign": "elastic_may2021_benign_data.csv",
        "may2021_malicious": "elastic_may2021_malicious_data.csv",
        "may2022": "elastic_may2022_data.csv"
    }

    # Initialize the predictor
    predictor = KubernetesFailurePrediction(data_paths)

    # Run the complete pipeline
    try:
        # Load and preprocess data
        predictor.load_data()
        predictor.preprocess_data()

        # Select features
        X, y = predictor.select_features()

        # Split data
        X_train, X_test, y_train, y_test = predictor.train_test_chronological_split(X, y)

        # Handle imbalanced data
        X_train_balanced, y_train_balanced = predictor.handle_imbalanced_data(X_train, y_train)

        # Train model
        predictor.train_model(X_train_balanced, y_train_balanced)

        # Evaluate model with adjusted threshold
        eval_metrics = predictor.evaluate_model(X_test, y_test, threshold=0.3)

        # Save model
        predictor.save_model()

        print("\nPipeline completed successfully!")
        print(f"Model accuracy: {eval_metrics['accuracy']:.4f}")
        print(f"Model F1-score: {eval_metrics['f1']:.4f}")
    except Exception as e:
        print(f"Error during pipeline execution: {str(e)}")
    finally:
        predictor.close()

Kubernetes Failure Prediction Model initialized
Loading datasets...
Loaded feb2022: 398414 rows, 11 columns
Loaded may2021_benign: 1777238 rows, 11 columns
Loaded may2021_malicious: 1954212 rows, 11 columns
Loaded may2022: 961046 rows, 1 columns
Combined dataset shape: (5090910, 12)
Preprocessing data...
Data preprocessing completed. Final shape: (5090910, 12)
Selecting features...
Selected features: ['_source_network_bytes', '_source_event_duration', '_source_network_transport', 'simulated_cpu_usage', 'simulated_memory_usage', 'simulated_pod_restarts']
Feature matrix shape: (5090910, 6), Target vector shape: (5090910,)
Splitting data chronologically with test_size=0.2
Training set: 4072728 samples, Test set: 1018182 samples
Applying SMOTE to handle class imbalance...
Class distribution before SMOTE: {0: 0.9485966163220328, 1: 0.0514033836779672}
Class distribution after SMOTE: {0: 0.5, 1: 0.5}
Training XGBoost model...
Model training completed

Feature Importance:
                    

In [20]:

import random
import pandas as pd

# Reverse label mapping for better readability
reverse_label_mapping = {v: k for k, v in {
    "benign": 0, "normal": 0, "operational": 0,
    "malicious": 1, "failure": 1, "disruption": 1,
    "resource_exhaustion": 2, "network_issue": 3
}.items()}

def stratified_sample_predictions(predictor, X_test, y_test, samples_per_class=3):
    """Ensure that test samples cover all label categories."""

    # Convert test data to DataFrame
    test_df = pd.DataFrame(X_test, columns=predictor.feature_names)
    test_df["actual_label"] = y_test.values

    # Create a new column with actual category names
    test_df["actual_category"] = test_df["actual_label"].map(reverse_label_mapping)

    # Collect samples from each class
    sampled_data = test_df.groupby("actual_label").apply(
        lambda x: x.sample(min(samples_per_class, len(x)), random_state=42)
    ).reset_index(drop=True)

    # Get predictions
    sampled_features = sampled_data[predictor.feature_names]
    sampled_preds = predictor.model.predict(sampled_features)  # Predict class labels

    # Add predictions to the DataFrame
    sampled_data["predicted_label"] = sampled_preds
    sampled_data["predicted_category"] = sampled_data["predicted_label"].map(reverse_label_mapping)

    print("\nStratified Sampled Predictions (Actual vs Predicted):")
    print(sampled_data[["actual_category", "predicted_category"]])

    return sampled_data

# Run sampling function with at least 3 samples per class
sampled_results = stratified_sample_predictions(predictor, X_test, y_test, samples_per_class=3)



Stratified Sampled Predictions (Actual vs Predicted):
  actual_category predicted_category
0     operational        operational
1     operational        operational
2     operational        operational
3      disruption         disruption
4      disruption         disruption
5      disruption         disruption
