In [None]:
r"""
Production-Ready One-Class SVM for Anomaly Detection with Visualizations

This notebook demonstrates a complete anomaly detection pipeline using One-Class SVM.
The pipeline includes:
    - Loading and preprocessing a feature-engineered equipment anomaly dataset.
    - Training a One-Class SVM model on normal data (non-faulty samples).
    - Hyperparameter tuning via grid search with a custom F1-score metric.
    - Evaluating the model and generating visualizations such as:
         • Confusion matrix heatmap.
         • Distribution of decision function scores.
         • PCA 2D scatter plot with the decision boundary.
    - Saving the model, predictions, and evaluation reports for production deployment.

File paths used:
    - Dataset: "C:\Users\Ken Ira Talingting\Desktop\anomaly-detection-project\data\processed\equipment_anomaly_data_feature_engineered.csv"
    - Output directory: "C:\Users\Ken Ira Talingting\Desktop\anomaly-detection-project\data\processed_results"
"""

# Import standard libraries for file operations, logging, and time handling
import os
import numpy as np
import pandas as pd
from datetime import datetime
import joblib  # For saving the model
import logging

# Import libraries for plotting and visualizations
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.decomposition import PCA

# Import libraries from scikit-learn for preprocessing, modeling, and evaluation
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.metrics import classification_report, confusion_matrix, f1_score, make_scorer
from sklearn.model_selection import GridSearchCV

# Configure logging for the notebook: records at INFO level and above are emitted,
# each formatted as "<timestamp> - <level name> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


In [None]:
# Define file paths and directories for data input and result output.
# The defaults reproduce the original project layout. Set the environment
# variables ANOMALY_DATA_PATH and/or ANOMALY_RESULTS_DIR to run the notebook
# on another machine without editing this cell.
DATA_PATH = os.environ.get(
    "ANOMALY_DATA_PATH",
    r"C:\Users\Ken Ira Talingting\Desktop\anomaly-detection-project\data\processed\equipment_anomaly_data_feature_engineered.csv",
)
RESULTS_DIR = os.environ.get(
    "ANOMALY_RESULTS_DIR",
    r"C:\Users\Ken Ira Talingting\Desktop\anomaly-detection-project\data\processed_results",
)

# Every artifact written by the pipeline lives directly under RESULTS_DIR
MODEL_PATH = os.path.join(RESULTS_DIR, "one_class_svm_model.pkl")
PREDICTIONS_PATH = os.path.join(RESULTS_DIR, "predictions.csv")
REPORT_PATH = os.path.join(RESULTS_DIR, "evaluation_report.txt")
CM_PLOT_PATH = os.path.join(RESULTS_DIR, "confusion_matrix.png")
SCORE_DIST_PLOT_PATH = os.path.join(RESULTS_DIR, "decision_scores_distribution.png")
PCA_PLOT_PATH = os.path.join(RESULTS_DIR, "pca_decision_boundary.png")

# Create the results directory if it does not exist to ensure that outputs can be saved
os.makedirs(RESULTS_DIR, exist_ok=True)


In [None]:
def load_data(filepath: str) -> pd.DataFrame:
    """
    Read the CSV file at ``filepath`` into a DataFrame.

    The path being read is logged at INFO level before the file is opened.

    Parameters:
        filepath (str): Location of the CSV file to read.

    Returns:
        pd.DataFrame: The parsed contents of the file, as returned by ``pd.read_csv``.
    """
    logging.info(f"Loading data from {filepath}")
    return pd.read_csv(filepath)

# Example usage:
# df = load_data(DATA_PATH)


In [None]:
def preprocess_data(df: pd.DataFrame):
    """
    Split the dataset into a scaled feature matrix and an integer label vector.

    Steps:
      - Reads the 'faulty' column and casts it to int to form the labels.
      - Treats every other column as a feature.
      - Imputes missing feature values with the column mean.
      - Standardizes the imputed features with StandardScaler.

    The input frame is not modified: the integer labels are returned as a new
    Series rather than written back into ``df``.

    Notes:
      - The One-Class SVM is trained exclusively on the normal class (faulty == 0);
        selecting those rows is done by the caller, not here.
      - NOTE(review): the imputer and the scaler are both fit on all rows passed
        in, including anomalous ones.
      - NOTE(review): only the scaler is returned. The fitted imputer is discarded,
        so new data containing missing values must be imputed separately before
        the returned scaler is applied.

    Parameters:
        df (pd.DataFrame): Input dataset; must contain a 'faulty' column.

    Returns:
        X_scaled (np.ndarray): Scaled feature matrix.
        y (pd.Series): Integer target labels (0 for normal, 1 for anomaly).
        scaler (StandardScaler): Fitted scaler (for future transformations).

    Raises:
        KeyError: If ``df`` has no 'faulty' column.
    """
    # Derive the integer labels as a new Series so the caller's frame is left untouched
    y = df['faulty'].astype(int)  # 0 for normal, 1 for anomaly

    # Everything except the label column is a feature; drop() returns a new frame
    X = df.drop(columns='faulty')

    # Impute missing values using the mean of each column
    imputer = SimpleImputer(strategy="mean")
    X_imputed = imputer.fit_transform(X)
    logging.info("Missing values have been imputed using the mean strategy.")

    # Scale features to standardize the data distribution
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X_imputed)
    logging.info("Feature scaling completed using StandardScaler.")

    return X_scaled, y, scaler

# Example usage:
# X_scaled, y, scaler = preprocess_data(df)


In [None]:
def get_training_data(X, y):
    """
    Return the rows of ``X`` whose label is 0.

    One-Class SVM is fit on normal (non-faulty) samples only, so this filters
    the feature matrix down to those rows.

    Parameters:
        X (np.ndarray): Feature matrix.
        y (pd.Series): Labels (0: normal, 1: anomaly).

    Returns:
        X_train (np.ndarray): The rows of X where the matching label equals 0.
    """
    is_normal = (y == 0)
    return X[is_normal]

# Example usage:
# X_train = get_training_data(X_scaled, y)


In [None]:
def custom_score(y_true, y_pred):
    """
    F1 score for One-Class SVM output against 0/1 ground-truth labels.

    One-Class SVM predicts 1 for inliers (normal) and -1 for outliers (anomalies).
    Before scoring, the predictions are translated into the label convention of
    the ground truth:
        1        -> 0 (normal)
        anything else (i.e. -1) -> 1 (anomaly)

    Parameters:
        y_true (np.ndarray or pd.Series): True labels.
        y_pred (np.ndarray): Raw predictions from One-Class SVM.

    Returns:
        float: F1 score computed by ``f1_score`` on the translated predictions.
    """
    # Flag every prediction that is not the inlier value 1 as an anomaly
    anomaly_flags = np.where(y_pred != 1, 1, 0)
    return f1_score(y_true, anomaly_flags)

# Example usage:
# score = custom_score(true_labels, svm_predictions)


In [None]:
def tune_hyperparameters(X_train, X_full, y_full):
    """
    Select One-Class SVM hyperparameters with a label-aware grid search.

    Every combination of 'nu' and 'gamma' in the grid is fit on ``X_train``
    (normal samples only) and then scored with ``custom_score`` (F1 on the
    anomaly class) using its predictions for ``X_full`` against ``y_full``.
    The highest-scoring combination wins; ties keep the combination tried first.

    The search is an explicit loop rather than GridSearchCV because the model
    is fit on unlabeled normal-only data, so a cross-validated search fit on
    ``X_train`` has no labels to hand to the F1 scorer.

    NOTE(review): the normal rows of ``X_full`` are the same rows the candidates
    are fit on, so the selection score is partly in-sample.

    Parameters:
        X_train (np.ndarray): Training data (normal samples only).
        X_full (np.ndarray): Entire feature matrix (used for evaluation).
        y_full (pd.Series): Full set of labels (0: normal, 1: anomaly).

    Returns:
        best_model (OneClassSVM): Model fit on X_train with the best hyperparameters.
        best_params (dict): Best hyperparameters found during tuning.
    """
    # Define the hyperparameter grid
    param_grid = {
        'nu': [0.01, 0.05, 0.1, 0.2],
        'gamma': ['scale', 'auto']
    }

    best_model, best_params, best_score = None, None, None
    for nu in param_grid['nu']:
        for gamma in param_grid['gamma']:
            params = {'nu': nu, 'gamma': gamma}

            # Fit on normal data only, then judge the candidate with the known labels
            candidate = OneClassSVM(kernel='rbf', **params)
            candidate.fit(X_train)
            score = custom_score(y_full, candidate.predict(X_full))
            logging.info(f"Candidate {params}: F1 = {score:.4f}")

            # The first candidate is always accepted so a model is returned
            # even if no later score compares greater
            if best_model is None or score > best_score:
                best_model, best_params, best_score = candidate, params, score

    logging.info(f"Best hyperparameters found: {best_params}")

    # The winning candidate is already fit on X_train, so no refit is needed
    return best_model, best_params

# Example usage:
# best_model, best_params = tune_hyperparameters(X_train, X_scaled, y)


In [None]:
def evaluate_model(model, X, y):
    """
    Evaluate the trained One-Class SVM model on the given samples.

    The model's predictions are mapped from {1, -1} to {0 (normal), 1 (anomaly)}
    and compared with the true labels to build a confusion matrix and a
    classification report.

    The confusion matrix is always computed over the label set [0, 1], so it
    stays 2x2 (rows: actual, columns: predicted) even when one of the classes
    is absent from both the true labels and the predictions.

    Parameters:
        model (OneClassSVM): Trained One-Class SVM model.
        X (np.ndarray): Feature matrix for evaluation.
        y (pd.Series): True labels (0: normal, 1: anomaly).

    Returns:
        y_pred_mapped (np.ndarray): Remapped predictions.
        cm (np.ndarray): Confusion matrix over labels [0, 1].
        report (str): Detailed classification report.
    """
    # Generate predictions using the model
    y_pred = model.predict(X)

    # Map predictions to the expected label format
    y_pred_mapped = np.where(y_pred == 1, 0, 1)

    # Pin the label order so the matrix shape does not depend on which classes occur
    cm = confusion_matrix(y, y_pred_mapped, labels=[0, 1])
    report = classification_report(y, y_pred_mapped)
    logging.info("Model evaluation completed.")
    return y_pred_mapped, cm, report

# Example usage:
# y_pred, cm, report = evaluate_model(best_model, X_scaled, y)


In [None]:
def plot_confusion_matrix(cm):
    """
    Render the confusion matrix as an annotated heatmap and write it to CM_PLOT_PATH.

    The figure is closed after saving, so nothing is displayed inline.

    Parameters:
        cm (np.ndarray): Confusion matrix (integer counts).
    """
    fig, ax = plt.subplots(figsize=(6, 4))

    # fmt="d" prints each cell as an integer count
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", ax=ax)
    ax.set_title("Confusion Matrix")
    ax.set_xlabel("Predicted Label")
    ax.set_ylabel("Actual Label")

    fig.tight_layout()
    fig.savefig(CM_PLOT_PATH)
    plt.close(fig)
    logging.info(f"Confusion matrix plot saved to {CM_PLOT_PATH}")

# Example usage:
# plot_confusion_matrix(cm)


In [None]:
def plot_decision_score_distribution(model, X, y):
    """
    Save a histogram of the model's decision-function scores, split by true label.

    Samples labelled 1 are shown as 'Anomaly' and all others as 'Normal'. The
    plot is written to SCORE_DIST_PLOT_PATH and the figure is closed afterwards.

    Parameters:
        model (OneClassSVM): Trained model.
        X (np.ndarray): Feature matrix.
        y (pd.Series): True labels.
    """
    # One row per sample: its decision score and a readable class name
    df_scores = pd.DataFrame({
        "Score": model.decision_function(X),
        "Label": np.where(y == 1, 'Anomaly', 'Normal'),
    })

    fig, ax = plt.subplots(figsize=(8, 5))
    sns.histplot(data=df_scores, x="Score", bins=30, kde=True, hue="Label", palette="viridis", ax=ax)
    ax.set_title("Distribution of Decision Function Scores")
    ax.set_xlabel("Decision Score")
    ax.set_ylabel("Frequency")

    fig.tight_layout()
    fig.savefig(SCORE_DIST_PLOT_PATH)
    plt.close(fig)
    logging.info(f"Decision function score distribution plot saved to {SCORE_DIST_PLOT_PATH}")

# Example usage:
# plot_decision_score_distribution(best_model, X_scaled, y)


In [None]:
def plot_pca_decision_boundary(model, X, y, scaler):
    """
    Use PCA to project the high-dimensional data into 2D and visualize the decision boundary.

    A 200x200 grid is laid over the 2D PCA projection of ``X``, mapped back to
    the original feature space with the inverse PCA transform, and scored with
    the model's decision function. The region with negative scores is shaded,
    the zero level is drawn as the boundary, and the samples are overlaid by
    true label. The plot is written to PCA_PLOT_PATH.

    The shading and the boundary line are each drawn only when the grid scores
    actually cover the required range; otherwise a warning is logged and the
    scatter plot is still saved.

    Parameters:
        model (OneClassSVM): Trained model.
        X (np.ndarray): Feature matrix (in the space the model was fit on).
        y (pd.Series): True labels (0: normal, 1: anomaly).
        scaler (StandardScaler): Accepted for interface compatibility; not used here.
    """
    # Reduce the dimensionality to 2 principal components
    pca = PCA(n_components=2)
    X_pca = pca.fit_transform(X)

    # Define the mesh grid for plotting the decision boundary (1 unit of padding per side)
    x_min, x_max = X_pca[:, 0].min() - 1, X_pca[:, 0].max() + 1
    y_min, y_max = X_pca[:, 1].min() - 1, X_pca[:, 1].max() + 1
    xx, yy = np.meshgrid(np.linspace(x_min, x_max, 200),
                         np.linspace(y_min, y_max, 200))

    # Convert grid points back to original feature space using inverse PCA transform
    grid = np.c_[xx.ravel(), yy.ravel()]
    grid_original = pca.inverse_transform(grid)

    # Evaluate the decision function over the grid
    Z = model.decision_function(grid_original)
    Z = Z.reshape(xx.shape)
    z_min, z_max = Z.min(), Z.max()

    fig, ax = plt.subplots(figsize=(8, 6))

    # Shade the negative-score region. The levels run from z_min up to 0 and
    # must be strictly increasing, which only holds when z_min is below 0.
    if z_min < 0:
        ax.contourf(xx, yy, Z, levels=np.linspace(z_min, 0, 7), cmap=plt.cm.PuBu)
    else:
        logging.warning("Decision function is non-negative over the whole PCA grid; "
                        "skipping the outlier-region shading.")

    # Draw the boundary only if the zero level is actually crossed on the grid
    if z_min <= 0 <= z_max:
        ax.contour(xx, yy, Z, levels=[0], linewidths=2, colors='darkred')
    else:
        logging.warning("Decision function does not cross zero on the PCA grid; "
                        "no decision boundary drawn.")

    # Overlay the original data points in the PCA space; a plain array is used
    # for the masks so selection is purely positional
    labels = np.asarray(y)
    normal_mask = labels == 0
    anomaly_mask = labels == 1
    ax.scatter(X_pca[normal_mask, 0], X_pca[normal_mask, 1], c='blue', s=20, label='Normal')
    ax.scatter(X_pca[anomaly_mask, 0], X_pca[anomaly_mask, 1], c='orange', s=20, label='Anomaly')
    ax.legend()
    ax.set_title("PCA Projection with One-Class SVM Decision Boundary")
    ax.set_xlabel("Principal Component 1")
    ax.set_ylabel("Principal Component 2")

    fig.tight_layout()
    fig.savefig(PCA_PLOT_PATH)
    plt.close(fig)
    logging.info(f"PCA decision boundary plot saved to {PCA_PLOT_PATH}")

# Example usage:
# plot_pca_decision_boundary(best_model, X_scaled, y, scaler)


In [None]:
def save_results(y_pred, cm, report, scaler, model, best_params):
    """
    Persist the pipeline's outputs: model bundle, predictions, and a text report.

    Writes three files:
      - MODEL_PATH: a joblib dump of a dict with keys 'model', 'scaler', 'best_params'.
      - PREDICTIONS_PATH: a CSV with a single 'prediction' column and no index.
      - REPORT_PATH: a plain-text report with a timestamp, the confusion matrix,
        the classification report, and the best hyperparameters.

    Parameters:
        y_pred (np.ndarray): Final predictions (mapped to {0,1}).
        cm (np.ndarray): Confusion matrix.
        report (str): Detailed classification report.
        scaler (StandardScaler): Fitted scaler for data preprocessing.
        model (OneClassSVM): Trained One-Class SVM model.
        best_params (dict): Best hyperparameters obtained from grid search.
    """
    # Model, scaler, and the chosen hyperparameters go into one file
    joblib.dump({'model': model, 'scaler': scaler, 'best_params': best_params}, MODEL_PATH)
    logging.info(f"Model bundle saved to {MODEL_PATH}")

    # One prediction per row, in input order
    pd.DataFrame({'prediction': y_pred}).to_csv(PREDICTIONS_PATH, index=False)
    logging.info(f"Predictions saved to {PREDICTIONS_PATH}")

    # Assemble the report pieces in order and write them out in one call
    with open(REPORT_PATH, 'w') as f:
        f.writelines([
            "Evaluation Report\n",
            f"Generated on: {datetime.now()}\n\n",
            "Confusion Matrix:\n",
            np.array2string(cm),
            "\n\nClassification Report:\n",
            report,
            "\n\nBest Hyperparameters:\n",
            str(best_params),
        ])
    logging.info(f"Evaluation report saved to {REPORT_PATH}")

# Example usage:
# save_results(y_pred, cm, report, scaler, best_model, best_params)


In [None]:
def main():
    """
    Run the One-Class SVM anomaly detection pipeline end to end.

    Order of operations:
      1. Load the dataset from DATA_PATH and preprocess it.
      2. Keep only the normal samples as training data.
      3. Tune hyperparameters and obtain the trained model.
      4. Evaluate the model on the full dataset.
      5. Save the model bundle, predictions, and evaluation report.
      6. Generate and save the three diagnostic plots.
    """
    # Load, preprocess, and carve out the normal-only training subset
    raw_df = load_data(DATA_PATH)
    X_scaled, y, scaler = preprocess_data(raw_df)
    X_normal = get_training_data(X_scaled, y)

    # Search the hyperparameter grid and get the fitted model back
    model, params = tune_hyperparameters(X_normal, X_scaled, y)

    # Score the model against every labelled sample
    predictions, cm, report = evaluate_model(model, X_scaled, y)

    # Write the model bundle, predictions, and report to disk
    save_results(predictions, cm, report, scaler, model, params)

    # Produce the diagnostic figures
    plot_confusion_matrix(cm)
    plot_decision_score_distribution(model, X_scaled, y)
    plot_pca_decision_boundary(model, X_scaled, y, scaler)

    logging.info("One-Class SVM pipeline executed successfully.")

# Kick off the pipeline when this cell runs as the top-level module
if __name__ == "__main__":
    main()
