#### Import libraries

In [112]:
import torch
import torch.nn as nn
import torch.nn.functional as F

from torch.utils.data import TensorDataset
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR, ReduceLROnPlateau
import pandas as pd
import os
import numpy as np
import datetime
import scipy
import seaborn as sns
import sys
from collections import Counter
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

from sklearn.utils import shuffle
from imblearn.over_sampling import SMOTE
from imblearn.combine import SMOTETomek, SMOTEENN
from sklearn.utils import resample
from sklearn.preprocessing import StandardScaler, MinMaxScaler, LabelEncoder, OneHotEncoder, OrdinalEncoder

from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.metrics import multilabel_confusion_matrix
from sklearn.metrics import f1_score, accuracy_score

from sklearn.svm import SVC
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
from sklearn.model_selection import train_test_split
from sklearn.model_selection import StratifiedKFold
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
import joblib
from sklearn.utils.class_weight import compute_class_weight
from skorch import NeuralNetClassifier
from skorch.callbacks import EarlyStopping, LRScheduler, Checkpoint
from skorch.helper import predefined_split
from skorch.dataset import Dataset
from skorch.callbacks import EpochScoring
from sklearn.metrics import RocCurveDisplay
from itertools import cycle
from sklearn.preprocessing import LabelBinarizer
from sklearn.metrics import auc, roc_curve

%load_ext autotime

time: 16 ms (started: 2024-04-03 10:53:05 +01:00)


In [None]:
# Fix the NumPy and PyTorch random seeds (42) so repeated runs draw the same random numbers
np.random.seed(42)
torch.manual_seed(42)

In [None]:
# Prefer the GPU when CUDA is usable, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

#### Functions and Tools

In [None]:
def get_timestamp():
    """
        Return the current local time formatted as ``YYYYMMDDTHHMMSS``
        (used in this file to build model file names)
    """

    now = datetime.datetime.now()
    return now.strftime("%Y%m%dT%H%M%S")

In [116]:
def _import_data(path, validation_size=None):
    """
        Load ``source/mitbih_{path}.csv`` (no header row) and split it into
        features and labels.

        Every column except the last is returned as the feature matrix; the last
        column is cast to int64 and returned as the labels.

        Returns ``(X, y)`` when ``validation_size`` is falsy, otherwise
        ``(X_train, y_train, X_val, y_val)`` from a ``train_test_split`` with
        ``test_size=validation_size`` and a fixed ``random_state`` of 42.
    """

    frame = pd.read_csv(f'source/mitbih_{path}.csv', header=None)

    # Last column holds the class label, everything before it is the signal
    features = frame.iloc[:, :-1].values
    labels = frame.iloc[:, -1].values.astype('int64')

    # No hold-out requested: hand back the full set
    if not validation_size:
        return features, labels

    X1, X2, y1, y2 = train_test_split(features, labels, test_size=validation_size, random_state=42)

    return X1, y1, X2, y2

time: 0 ns (started: 2024-04-03 10:53:05 +01:00)


In [117]:
def _gaussian_noise(X_train):
    """
        Return ``X_train`` plus element-wise Gaussian noise
        (mean 0, standard deviation 0.03) drawn from NumPy's global RNG.
        The input array is not modified.
    """

    return X_train + np.random.normal(0, 0.03, X_train.shape)

time: 16 ms (started: 2024-04-03 10:53:06 +01:00)


In [118]:
def _balancing(X, y, num_sample):
    """
        Resample every class to exactly ``num_sample`` records.

        Classes with more than ``num_sample`` records are down-sampled without
        replacement; classes with fewer are up-sampled with replacement
        (bootstrap). Classes that already have ``num_sample`` records are kept
        untouched. The result is the per-class pieces concatenated in the
        order of ``np.unique(y)``.

        Returns the balanced ``(X, y)`` pair.
    """

    classes, sizes = np.unique(y, return_counts=True)

    X_parts, y_parts = [], []

    for cls, size in zip(classes, sizes):
        mask = y == cls
        X_cls, y_cls = X[mask], y[mask]

        if size != num_sample:
            # Too many records -> sample without replacement,
            # too few -> bootstrap with replacement
            X_cls, y_cls = resample(X_cls, y_cls,
                                    replace=bool(size < num_sample),
                                    n_samples=num_sample,
                                    random_state=42)

        X_parts.append(X_cls)
        y_parts.append(y_cls)

    return np.concatenate(X_parts, axis=0), np.concatenate(y_parts, axis=0)

time: 0 ns (started: 2024-04-03 10:53:06 +01:00)


In [119]:
def _get_report(y_true, y_pred):
    """
        Print scikit-learn's classification report for the given
        true and predicted labels (nothing is returned)
    """

    print(classification_report(y_true, y_pred))

time: 0 ns (started: 2024-04-03 10:53:06 +01:00)


In [2]:
def _roc_curve(y_true, y_pred):
    """
    Plot per-class, micro-average and macro-average ROC curves for the 5 classes.

    ``y_true`` holds the class labels (they are one-hot encoded here);
    ``y_pred`` holds one score column per class.

    Code for generating ROC curve obtained from documentation :
    https://scikit-learn.org/stable/auto_examples/model_selection/plot_roc.html
    """

    # Fixed class set used throughout this file
    n_classes = 5
    class_labels = {0: 'N', 1: 'S', 2: 'V', 3: 'F', 4: 'Q'}
    palette = ["aqua", "darkorange", "cornflowerblue", "olive", "maroon"]

    # One-hot encode the true labels so each class has its own binary column
    y_onehot = LabelBinarizer().fit_transform(y_true)

    fpr, tpr, roc_auc = {}, {}, {}

    # Micro-average: pool every (sample, class) decision into one binary problem
    fpr["micro"], tpr["micro"], _ = roc_curve(y_onehot.ravel(), y_pred.ravel())
    roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

    # Macro-average: per-class curves, interpolated on a shared FPR grid and averaged
    fpr_grid = np.linspace(0.0, 1.0, 1000)
    tpr_sum = np.zeros_like(fpr_grid)
    for cls in range(n_classes):
        fpr[cls], tpr[cls], _ = roc_curve(y_onehot[:, cls], y_pred[:, cls])
        roc_auc[cls] = auc(fpr[cls], tpr[cls])
        tpr_sum += np.interp(fpr_grid, fpr[cls], tpr[cls])  # linear interpolation

    fpr["macro"] = fpr_grid
    tpr["macro"] = tpr_sum / n_classes
    roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

    fig, ax = plt.subplots(figsize=(8, 6))

    # Averaged curves first (micro, then macro), drawn as thick dotted lines
    for kind, colour in (("micro", "deeppink"), ("macro", "navy")):
        plt.plot(
            fpr[kind],
            tpr[kind],
            label=f"{kind}-average ROC curve (AUC = {roc_auc[kind]:.2f})",
            color=colour,
            linestyle=":",
            linewidth=4,
        )

    # Then one curve per class on the same axes
    for cls, colour in zip(range(n_classes), palette):
        RocCurveDisplay.from_predictions(
            y_onehot[:, cls],
            y_pred[:, cls],
            name=f"ROC curve for Class {class_labels[cls]}",
            color=colour,
            ax=ax,
        )

    ax.set(
        xlabel="False Positive Rate",
        ylabel="True Positive Rate",
        title=" Receiver Operation Curve",
    )

    plt.legend()
    plt.show()

In [None]:
def _get_confusion_matrix(y_true, y_pred, title=None):
    """
        Draw the confusion matrix as a heatmap.

        Cells are coloured by the row-wise percentage (share of each true
        class) and annotated with both the raw count and that percentage.
        Tick labels are the five class names N, S, V, F, Q.
    """

    class_labels = ['N', 'S', 'V', 'F', 'Q']
    n_labels = len(class_labels)

    counts = confusion_matrix(y_true, y_pred)

    # Row sums = number of samples per true class; normalise each row to percent
    row_totals = counts.sum(axis=1)
    percent = (counts.T / row_totals).T * 100

    plt.figure(figsize=(6, 6))

    # Heatmap without built-in annotations; text is placed manually below
    sns.heatmap(
        percent,
        annot=False,
        cmap="Blues",
        fmt='d',
        xticklabels=class_labels,
        yticklabels=class_labels,
        cbar=False,
        linewidths=1,
        linecolor='white',
    )

    text_style = dict(ha='center', va='center', color='black', fontsize=8)
    for row, col in np.ndindex(n_labels, n_labels):
        # Raw count just below the cell centre, percentage just above it
        plt.text(col + 0.5, row + 0.6, f'{counts[row, col]}', **text_style)
        plt.text(col + 0.5, row + 0.4, f'{percent[row, col]:.2f}%', **text_style)

    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.title(title)
    plt.grid(False)
    plt.show()

#### Models and Pipelines

##### CNN

In [131]:
def _convert_to_tensor(X, y):
    """
        Turn NumPy features/labels into torch tensors.

        ``X`` is reshaped to ``(-1, 1, X.shape[-1])`` (a singleton channel axis
        is inserted) and converted to float32; ``y`` is converted to int64.
    """

    features = torch.from_numpy(X.reshape(-1, 1, X.shape[-1])).float()
    labels = torch.from_numpy(y).long()

    return features, labels

time: 0 ns (started: 2024-04-03 10:53:30 +01:00)


In [133]:
def _load_cnn_model(cnn, fn):
    """
        Build a fresh ``cnn()`` instance, load the weights stored in
        ``models/cnn/{fn}.pth`` into it and return it in evaluation mode
        on the active device.

        Only checkpoint entries whose key exists in the model's state dict are
        copied; any other checkpoint entries are ignored.
    """

    saved_state = torch.load(f"models/cnn/{fn}.pth", map_location=device)

    net = cnn()

    # Start from the model's own state and overwrite the entries the checkpoint provides
    merged_state = net.state_dict()
    merged_state.update({name: value for name, value in saved_state.items() if name in merged_state})
    net.load_state_dict(merged_state)

    # Move to cuda (if available) and switch to inference behaviour
    net.to(device)
    net.eval()

    return net

time: 0 ns (started: 2024-04-03 10:53:30 +01:00)


In [134]:
def _evaluate_cnn(cnn, fn, subset='test', roc_curve=True):
    """
        Evaluate a saved CNN model on one subset of the source data

        Prints the prediction time and classification report, then shows the
        confusion matrix and (optionally) the ROC curves.

        Params:
        ------------------------------------------------------------------------------
            cnn - Model Class (CNN or ResCNN)
            fn - Filename of model to be evaluated
            subset - 'test' uses the test file, 'train' the training part of the
                     80/20 split of the train file, any other value the validation part
            roc_curve - Whether to show ROC curve

    """

    model = _load_cnn_model(cnn, fn)

    # Pick the raw arrays for the requested subset
    if subset == 'test':
        raw_X, raw_y = _import_data('test')
    else:
        X_train, y_train, X_val, y_val = _import_data('train', validation_size=0.2)
        raw_X, raw_y = (X_train, y_train) if subset == 'train' else (X_val, y_val)

    # No balancing / noise at evaluation time: only tensor conversion and device move
    X, y = _preprocess(raw_X, raw_y)

    # Time a single forward pass over the whole subset, without gradient tracking
    start = datetime.datetime.now()
    with torch.no_grad():
        outputs = model(X)
    end = datetime.datetime.now()
    print(f"Predicting time: {end-start}")

    # Predicted class = index of the highest score in each row
    y_pred = torch.max(outputs, 1).indices

    y_cpu, y_pred_cpu = y.cpu(), y_pred.cpu()
    _get_report(y_cpu, y_pred_cpu)
    _get_confusion_matrix(y_cpu, y_pred_cpu)

    if roc_curve:
        _roc_curve(y.detach().cpu().numpy(), outputs.detach().cpu().numpy())

time: 0 ns (started: 2024-04-03 10:53:30 +01:00)


In [136]:
def _initilise_cnn(model, **kwargs):
    """
        Wrap a CNN module class in a skorch ``NeuralNetClassifier``
        (100 epochs max, batch size 128, running on the active device).

        The network classes in this file (``CNN`` and ``ResCNN``) end their
        forward pass with a softmax, i.e. they output class probabilities.
        ``nn.CrossEntropyLoss`` expects unnormalised logits and applies
        log-softmax itself, so combining it with these modules normalised the
        outputs twice and flattened the loss gradients. ``nn.NLLLoss`` is used
        instead: for this criterion skorch takes the log of the module's
        probabilities before computing the loss, which is the pairing skorch
        documents for modules that return probabilities.
        ``criterion__weight`` keeps working because ``nn.NLLLoss`` accepts
        the same ``weight`` argument.

        Params:
        ------------------------------------------------------------------------------
            model - Module class to be trained
            **kwargs - Extra ``NeuralNetClassifier`` arguments (callbacks, optimizer,
                       train_split, module parameters, ...)

        Returns the (not yet initialised) ``NeuralNetClassifier``.
    """

    return NeuralNetClassifier(
        model,
        # Modules output probabilities, so use negative log-likelihood on them
        criterion=nn.NLLLoss,
        device=device,
        verbose=True,
        max_epochs=100,
        batch_size=128,
        **kwargs
    )

time: 0 ns (started: 2024-04-03 10:53:30 +01:00)


In [137]:
def _callbacks(earlystop_patience=10, lr_scheduler=None, checkpoint=True):
    """
        Build the list of skorch callbacks used for model training

        The list always contains early stopping on the validation loss and a
        training-accuracy scorer. The learning rate scheduler is included only
        when one is given, and the best-model checkpoint only when ``checkpoint``
        is truthy. (Previously the ``checkpoint`` flag was overwritten by the
        ``Checkpoint`` object, so checkpointing could never be disabled, and a
        missing scheduler put ``None`` into the list.)

        Params:
        ------------------------------------------------------------------------------
            earlystop_patience - Number of epochs without validation loss improvement
                                 before training stops
            lr_scheduler - Optional learning rate scheduler callback
            checkpoint - Whether to save the best model (by validation loss) during training

        Returns the callbacks in the order: early stopping, scheduler (if any),
        checkpoint (if enabled), training accuracy.
    """

    # Earlystopping to prevent overfitting: stop when validation loss stops improving
    callbacks = [EarlyStopping(monitor='valid_loss', patience=earlystop_patience)]

    if lr_scheduler is not None:
        callbacks.append(lr_scheduler)

    if checkpoint:
        # Save only the module parameters whenever the validation loss reaches a new best
        model_path = f'models/cnn/{get_timestamp()}.pth'
        callbacks.append(Checkpoint(
            f_params=model_path,
            monitor='valid_loss_best',
            f_optimizer=None,
            f_history=None,
            f_criterion=None
        ))

    # Compute and log training accuracy each epoch
    callbacks.append(EpochScoring(scoring='accuracy', name='train_acc', on_train=True))

    return callbacks

time: 0 ns (started: 2024-04-03 10:53:30 +01:00)


In [1]:
def _gridsearchcv(X_train, y_train, model, param_grid, cv=5, scoring='f1_macro'):
    """
        Tune hyper-parameters with a grid search over shuffled, stratified
        k-fold cross validation (fixed random_state of 42, all CPU cores).

        ``X_train`` and ``y_train`` are moved to the CPU before fitting.
        Returns the fitted ``GridSearchCV`` object.
    """

    folds = StratifiedKFold(n_splits=cv, shuffle=True, random_state=42)
    search = GridSearchCV(
        estimator=model,
        param_grid=param_grid,
        cv=folds,
        scoring=scoring,
        n_jobs=-1,
        verbose=0,
    )

    return search.fit(X_train.cpu(), y_train.cpu())

In [141]:
def _plot_history(history):
    """
        Plot training vs validation curves over epochs:
        loss on the left panel, accuracy on the right panel.

        ``history`` must support ``history[:, key]`` lookups for
        'train_loss', 'valid_loss', 'train_acc' and 'valid_acc'.
    """

    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(10, 4))

    # (axes, history key suffix, y-axis label) for each panel
    panels = ((loss_ax, 'loss', 'Loss'), (acc_ax, 'acc', 'Accuracy'))

    for ax, metric, ylabel in panels:
        ax.plot(history[:, f'train_{metric}'], label='Training')
        ax.plot(history[:, f'valid_{metric}'], label='Validation')
        ax.set_xlabel('Epoch')
        ax.set_ylabel(ylabel)
        ax.legend()

    plt.tight_layout()
    plt.show()

time: 0 ns (started: 2024-04-03 10:53:30 +01:00)


In [142]:
def _preprocess(X, y, balance=False, noise=False):
    """
    Prepare raw arrays for the CNN.

    Steps, in order: optional class balancing (``balance`` is passed on as the
    per-class record count), optional Gaussian-noise augmentation, conversion
    to tensors, and a move to the active device (CUDA if available).

    Returns the ``(X, y)`` tensors.
    """

    if balance:
        X, y = _balancing(X, y, balance)

    # Augment only when requested
    X = _gaussian_noise(X) if noise else X

    X_tensor, y_tensor = _convert_to_tensor(X, y)

    return X_tensor.to(device), y_tensor.to(device)

time: 0 ns (started: 2024-04-03 10:53:30 +01:00)


In [139]:
def _cnn_pipeline_with_gridsearch(cnn, param_grid, earlystop_patience=10, 
                                  checkpoint=False, balance=False, noise=False, **kwargs):
    """
        Encapsulated CNN pipeline for importing training data, 
        preprocessing, hyperparameter tuning, 
        and returning the best parameters

        Only the training part of the 80/20 split is used; the grid search runs
        its own stratified 5-fold cross validation on it.

        Params: 
        ------------------------------------------------------------------------------
            cnn - Model Class (CNN or ResCNN)
            param_grid - Dictionary of parameters to be selected through gridsearch process
            earlystop_patience - Early stopping threshold
            checkpoint - Whether to enable checkpoint to continuously save best model during training
                         (now forwarded to the callback builder; it used to be ignored)
            balance - Per-class record count to balance the dataset to (falsy = no balancing)
            noise - Whether to add noise to dataset or not
            **kwargs - Extra arguments passed to the classifier constructor

        Returns the best parameter combination found (a dict).
    """

    # Import training data; the validation part of the split is not needed here
    X_train, y_train, _, _ = _import_data('train', validation_size=0.2)

    # Preprocess the training set
    X_train, y_train = _preprocess(X_train, y_train, balance=balance, noise=noise)

    # Define scheduler to adjust learning rate during training
    lr_scheduler = LRScheduler(policy='ReduceLROnPlateau', mode='min', patience=5, factor=0.5, verbose=True)

    # Define callbacks for earlystopping, learning rate scheduler and (optionally) checkpointing
    callbacks = _callbacks(earlystop_patience=earlystop_patience, lr_scheduler=lr_scheduler,
                           checkpoint=checkpoint)

    # Initialize CNN model
    model = _initilise_cnn(cnn, callbacks=callbacks, optimizer=optim.SGD, optimizer__momentum=0.9, 
                           optimizer__weight_decay=0.0001, lr=0.05, **kwargs)
    model.initialize()

    # Move to cuda (if available)
    model.module_.to(device)

    # Perform gridsearch cross-validation
    grid_result = _gridsearchcv(X_train, y_train, model, param_grid, cv=5)

    # Report and return the best parameters
    best_params = grid_result.best_params_
    best_score = grid_result.best_score_
    print(f"Best score: {best_score:f} with {best_params}")

    return best_params

time: 0 ns (started: 2024-04-03 10:53:30 +01:00)


In [140]:
def _cnn_pipeline_with_best_param(cnn, params, earlystop_patience=10, 
                                  checkpoint=True, class_weight=False, balance=False, 
                                  noise=False, fn=None, optimizer=None, lr_scheduler=None):
    """
        Encapsulated CNN pipeline for training a model with the selected parameters,
        reporting train/validation metrics, plotting the learning curves and
        saving the final weights to ``models/cnn/{fn}.pth``

        Params: 
        ------------------------------------------------------------------------------
            cnn - Model Class (CNN or ResCNN)
            params - Dictionary of best parameters obtained from parameter selection process
            earlystop_patience - Early stopping threshold
            checkpoint - Whether to enable checkpoint to continuously save best model during training
            class_weight - Whether to weight the loss by class frequency
                           (only consulted when ``optimizer`` is None)
            balance - Per-class record count to balance the dataset to (falsy = no balancing)
            noise - Whether to add noise to dataset or not
            fn - Customized filename of final model will be saved to (timestamp if omitted)
            optimizer - None to use the default SGD settings; any other value means the
                        optimizer settings are expected inside ``params``
            lr_scheduler - Learning rate scheduler (ReduceLROnPlateau by default)

        Returns the fitted classifier.

        NOTE(review): ``balance`` and ``noise`` are applied to the validation split
        as well as the training split - confirm this is intended.
        NOTE(review): when ``optimizer`` is given, class weights are always applied,
        regardless of ``class_weight`` - confirm this is intended.
    """

    X_train, y_train, X_val, y_val = _import_data('train', validation_size=0.2)

    # Class weights from the (unbalanced) training label frequencies
    balanced_weights = compute_class_weight('balanced',
                                            classes=np.unique(y_train),
                                            y=y_train.flatten())
    weights = torch.tensor(balanced_weights, dtype=torch.float)

    # Same preprocessing options for both splits
    X_train, y_train = _preprocess(X_train, y_train, balance=balance, noise=noise)
    X_val, y_val = _preprocess(X_val, y_val, balance=balance, noise=noise)

    # Default scheduler: halve the learning rate after 5 epochs without improvement
    if lr_scheduler is None:
        lr_scheduler = LRScheduler(policy='ReduceLROnPlateau', mode='min', patience=5, factor=0.5, verbose=True)

    callbacks = _callbacks(earlystop_patience=earlystop_patience, lr_scheduler=lr_scheduler, checkpoint=checkpoint)

    # Arguments shared by every configuration: callbacks and the fixed validation split
    init_kwargs = {
        'callbacks': callbacks,
        'train_split': predefined_split(Dataset(X_val, y_val)),
    }

    if optimizer is None:
        # Default optimizer: SGD with momentum and weight decay
        init_kwargs.update(optimizer=optim.SGD, optimizer__momentum=0.9,
                           optimizer__weight_decay=0.0001, lr=0.05)
        if class_weight:
            init_kwargs['criterion__weight'] = weights
    else:
        # To train model with replicate structure from reference paper
        init_kwargs['criterion__weight'] = weights

    model = _initilise_cnn(cnn, **init_kwargs, **params)
    model.initialize()

    # Move model to cuda if available
    model.module_.to(device)

    model.fit(X_train, y_train)

    # Classification reports: train set first, then validation set
    for features, labels in ((X_train, y_train), (X_val, y_val)):
        predictions = model.predict(features)
        _get_report(labels.cpu().numpy(), predictions)

    # Learning curves (loss and accuracy) across epochs
    _plot_history(model.history)

    # Save the weights of the fitted module
    fn = fn or get_timestamp()
    fp = f'models/cnn/{fn}.pth'
    torch.save(model.module_.state_dict(), fp)

    print(f"Best model saved to {fp}")

    return model

time: 0 ns (started: 2024-04-03 10:53:30 +01:00)


In [None]:
class ConvBlock(nn.Module):
    """
        Building block: 1D convolution -> batch normalization -> activation -> max pooling

        Params:
        ------------------------------------------------------------------------------
            inputs - Number of input channels
            outputs - Number of output channels
            activation - Activation class (instantiated without arguments)
            kernel_size, padding - Convolution settings
            pool_kernel, pool_stride - Max pooling settings
    """

    def __init__(self, inputs, outputs, activation=nn.GELU, kernel_size=3, 
                 padding='same', pool_kernel=3, pool_stride=2):

        super().__init__()

        # Layers are created in the order they are applied in forward()
        self.conv = nn.Conv1d(inputs, outputs, kernel_size=kernel_size, padding=padding)
        self.bn = nn.BatchNorm1d(outputs)
        self.activation = activation()
        self.maxpool = nn.MaxPool1d(kernel_size=pool_kernel, stride=pool_stride)

    def forward(self, x):

        features = self.conv(x)
        features = self.bn(features)
        features = self.activation(features)

        # Downsample along the signal axis
        return self.maxpool(features)

    
class CNN(nn.Module):
    """
        Convolutional Neural Network: five ConvBlocks (1 -> 32 -> 64 -> 128 -> 256 -> 512
        channels), adaptive max pooling to a single value per channel, then a fully
        connected head that outputs softmax probabilities over 5 classes

        Params:
        ------------------------------------------------------------------------------
            neurons - Width of the hidden fully connected layer
            activation - Activation class used in every block and in the head
            dropout - Dropout probability applied before the output layer
    """

    def __init__(self, neurons=128, activation=nn.GELU, dropout=0.3):
        super().__init__()

        # Feature extractor: channel count doubles at every block
        self.conv1 = ConvBlock(1, 32, activation=activation)
        self.conv2 = ConvBlock(32, 64, activation=activation)
        self.conv3 = ConvBlock(64, 128, activation=activation)
        self.conv4 = ConvBlock(128, 256, activation=activation)
        self.conv5 = ConvBlock(256, 512, activation=activation)

        # Collapse the remaining signal axis to length 1
        self.pool = nn.AdaptiveMaxPool1d(1)

        self.activation = activation()

        # Classification head: 512 -> neurons -> 5
        self.fc1 = nn.Linear(512, neurons)
        self.bn = nn.BatchNorm1d(neurons)
        self.dropout = nn.Dropout(dropout)
        self.fc2 = nn.Linear(neurons, 5)

        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):

        # Run the five convolutional blocks in sequence
        for block in (self.conv1, self.conv2, self.conv3, self.conv4, self.conv5):
            x = block(x)

        # Pool to one value per channel and drop the singleton axis
        x = torch.flatten(self.pool(x), 1)

        # Hidden layer with batch norm, activation and dropout
        hidden = self.fc1(x)
        hidden = self.bn(hidden)
        hidden = self.activation(hidden)
        hidden = self.dropout(hidden)

        # Class probabilities
        return self.softmax(self.fc2(hidden))

In [None]:
class ResidualBlock(nn.Module):
    """
        Residual block: conv -> activation -> conv, add the block input back
        (skip connection), activation, then max pooling (kernel 5, stride 2).
        This structure is aimed to replicate the models done by M. Kachuee et al.
        Further details will be described in glossary.

        The skip connection adds the input directly, so the input and output
        channel counts must allow that addition.
    """

    def __init__(self, in_channels, out_channels, kernel_size=5, stride=1, padding='same', activation=nn.ReLU):
        super().__init__()

        # Two convolutions; the second keeps the channel count
        self.conv1 = nn.Conv1d(in_channels, out_channels,
                               kernel_size=kernel_size, stride=stride, padding=padding)
        self.conv2 = nn.Conv1d(out_channels, out_channels,
                               kernel_size=kernel_size, stride=stride, padding=padding)

        self.activation = activation()

        self.maxpool = nn.MaxPool1d(kernel_size=5, stride=2)

    def forward(self, x):

        # Main path: conv -> activation -> conv
        out = self.conv2(self.activation(self.conv1(x)))

        # Skip connection: add the untouched block input
        out += x

        # Activation followed by downsampling
        return self.maxpool(self.activation(out))

class ResCNN(nn.Module):
    """
        Residual Convolutional Neural Network: an initial convolution (1 -> 32 channels),
        five ResidualBlocks with 32 channels each, then three fully connected layers
        (64 -> 32 -> 32 -> 5) ending in softmax probabilities

        Params:
        ------------------------------------------------------------------------------
            activation - Activation class used in the residual blocks and after the
                         first fully connected layer
    """

    def __init__(self, activation=nn.ReLU):
        super().__init__()

        # Stem convolution
        self.conv1 = nn.Conv1d(1, 32, kernel_size=5, stride=1)

        # Five identical residual blocks applied one after another
        self.res_blocks = nn.Sequential(
            ResidualBlock(32, 32, activation=activation),
            ResidualBlock(32, 32, activation=activation),
            ResidualBlock(32, 32, activation=activation),
            ResidualBlock(32, 32, activation=activation),
            ResidualBlock(32, 32, activation=activation)
        )

        self.flatten = nn.Flatten()

        self.activation = activation()

        # Classification head; fc1 expects 64 flattened features
        self.fc1 = nn.Linear(64, 32)
        self.fc2 = nn.Linear(32, 32)
        self.fc3 = nn.Linear(32, 5)

    def forward(self, x):

        # Stem convolution, residual stack, then flatten to (batch, features)
        features = self.flatten(self.res_blocks(self.conv1(x)))

        # fc1 -> activation -> fc2 (no activation between fc2 and fc3)
        hidden = self.fc2(self.activation(self.fc1(features)))

        # Softmax activation for multiclass classification
        return F.softmax(self.fc3(hidden), dim=1)

##### SVM

In [17]:
def _svm_pipeline(balanced_sample=None, dimredc=None, 
                      n_components=None, n_folds=5, class_weight=None,
                      decision_function_shape='ovr', model_fn=None,
                      max_iter=-1, param_grid=None):
    """
        Encapsulated SVM pipeline to import data, preprocess, 
        tune hyper-parameters, train the model and save it to ``models/svm/``

        Params: 
        ------------------------------------------------------------------------------
            balanced_sample - Number of records per class after balancing (falsy = no balancing)
            dimredc - Feature reduction type ('pca', 'lda'); any other value adds no reduction step
            n_components - Number of components retained by the reduction step
            n_folds - Number of stratified folds used for cross-validation
            class_weight - Passed to ``SVC(class_weight=...)``
            decision_function_shape - Shape of the decision function (ovo, ovr)
            model_fn - Filename of final model (timestamp if omitted)
            max_iter - Maximum number of iteration
            param_grid - Parameter grid for the grid search. When omitted, the
                         notebook-level variable ``params`` is used, as before.

        Returns the best estimator found by the grid search (refit on the
        full training data).
    """

    # Backward compatibility: this function used to read the grid from the
    # global ``params`` only; keep that as the fallback
    if param_grid is None:
        param_grid = params

    # Import train data
    X_train, y_train = _import_data('train')

    # Balance data if requested
    if balanced_sample:
        X_train, y_train = _balancing(X_train, y_train, balanced_sample)
    print(np.unique(y_train, return_counts=True))

    # Optional feature reduction step
    steps = []
    if dimredc == 'pca':
        steps.append(('pca', PCA(n_components=n_components)))
    elif dimredc == 'lda':
        steps.append(('lda', LinearDiscriminantAnalysis(n_components=n_components)))

    # SVM is always the final step of the pipeline
    steps.append(('svm', SVC(decision_function_shape=decision_function_shape, 
                             max_iter=max_iter, 
                             verbose=1,
                             class_weight=class_weight)))
    pipeline = Pipeline(steps)

    # Perform stratified gridsearch cross validation, scored by macro F1
    grid_search = GridSearchCV(pipeline, param_grid, cv=StratifiedKFold(n_splits=n_folds),
                               n_jobs=-1, scoring='f1_macro')
    grid_search.fit(X_train, y_train)

    # Get best parameters
    print("Best Parameters:", grid_search.best_params_)

    # Final model with best parameters
    best_model = grid_search.best_estimator_

    # Save best model for further evaluation
    model_fp = f'models/svm/{model_fn if model_fn else get_timestamp()}.pkl'
    joblib.dump(best_model, model_fp)
    print(f"Model saved to {model_fp}")

    # Predict on train set, with classification report
    y_pred = best_model.predict(X_train)
    _get_report(y_train, y_pred)

    return best_model

time: 0 ns (started: 2024-03-27 13:59:12 +00:00)


In [None]:
def _evaluate_svm(fn, subset='test', roc_curve=True):
    """
        Evaluate a saved SVM model: prints the prediction time and classification
        report, shows the confusion matrix and (optionally) the ROC curves

        Params: 
        ------------------------------------------------------------------------------
            fn - Filename of model to be evaluated (``models/svm/{fn}.pkl``)
            subset - 'test' evaluates on the test file; any other value evaluates on
                     the full training file (no separate validation split is made here)
            roc_curve - Whether to show ROC curve

        For the ROC curves, class probabilities are used when the model offers
        ``predict_proba``. An ``SVC`` fitted without ``probability=True`` does not,
        in which case the ``decision_function`` scores are used instead of failing.
        NOTE(review): the fallback relies on one score column per class
        (``decision_function_shape='ovr'``) - confirm for models trained with 'ovo'.
    """

    # Load data
    X, y = _import_data('test' if subset == 'test' else 'train')

    # Load best model
    model = joblib.load(f"models/svm/{fn}.pkl")

    start = datetime.datetime.now()

    # Evaluate, and predict
    y_pred = model.predict(X)

    end = datetime.datetime.now()
    print(f"Predicting time: {end-start}")

    # Get classification report
    _get_report(y, y_pred)

    # Generate confusion matrix
    _get_confusion_matrix(y, y_pred)

    if roc_curve:

        # Per-class scores: probabilities when available, decision scores otherwise
        if hasattr(model, "predict_proba"):
            y_score = model.predict_proba(X)
        else:
            y_score = model.decision_function(X)

        # Plot ROC curve
        _roc_curve(y, y_score)