In [2]:
import os
import json
from metrics import *
from sklearn.metrics import make_scorer, cohen_kappa_score, balanced_accuracy_score
import numpy as np
from sklearn.model_selection import StratifiedKFold, RandomizedSearchCV
from sklearn.svm import SVC

In [11]:
model_name = 'svm'

In [3]:
# Load features and labels
X = np.load('../../features.npy')
y = np.load('../../labels.npy')

X.shape, y.shape

((8724, 2048), (8724,))

In [4]:
# Count the number of samples in each class
print('Number of samples in each class:')
print(np.unique(y, return_counts=True))

# Define class names
class_names = ["Few", "Many", "None"]
print('Class names:')
print(class_names)

Number of samples in each class:
(array([0, 1, 2]), array([6232,  256, 2236]))
Class names:
['Few', 'Many', 'None']


In [5]:
# Divide the data into 2 classes only (Normal and Abnormal)
y_binary = np.zeros(y.shape)
y_binary[y != 2] = 0 # Few and Many
y_binary[y == 2] = 1 # None

# Check the number of samples in each class
print('Number of samples in each class:')
print(np.unique(y_binary, return_counts=True))

# Define the new class names
binary_class_names = ["Abormal", "Normal"]
print('Binary class names:')
print(binary_class_names)

Number of samples in each class:
(array([0., 1.]), array([6488, 2236]))
Binary class names:
['Abormal', 'Normal']


In [6]:
# Define the parameter grid for SVM
param_grid = {
    'C': [0.1, 1, 10, 100],
    'gamma': ['scale', 'auto', 0.001, 0.01, 0.1, 1],
    'kernel': ['linear', 'rbf', 'poly', 'sigmoid'],
    'class_weight': ['balanced', None]
}

custom_score = custom_score = make_scorer(custom_scorer, greater_is_better=True, 
                           response_method='predict_proba'
                           )

In [7]:
# Initialize the KNN classifier
svm = SVC(probability=True, random_state=42)

# Define the cross-validation strategy
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

# Initialize RandomizedSearchCV
random_search = RandomizedSearchCV(
    estimator=svm, 
    param_distributions=param_grid,
    n_iter=100, 
    cv=cv, 
    verbose=2, 
    random_state=42, 
    n_jobs=-1,
    scoring=custom_score
)

In [8]:
random_search.fit(X, y)

Fitting 5 folds for each of 100 candidates, totalling 500 fits
[CV] END ..C=10, class_weight=None, gamma=0.1, kernel=linear; total time= 1.0min
[CV] END ..C=10, class_weight=None, gamma=0.1, kernel=linear; total time= 1.8min
[CV] END ..C=10, class_weight=None, gamma=0.1, kernel=linear; total time= 1.8min
[CV] END ..C=1, class_weight=None, gamma=auto, kernel=linear; total time= 1.8min
[CV] END ..C=10, class_weight=None, gamma=0.1, kernel=linear; total time= 1.9min
[CV] END ..C=10, class_weight=None, gamma=0.1, kernel=linear; total time= 1.9min
[CV] END ..C=1, class_weight=None, gamma=auto, kernel=linear; total time= 2.1min
[CV] END ..C=1, class_weight=None, gamma=auto, kernel=linear; total time= 1.7min
[CV] END ..C=1, class_weight=None, gamma=auto, kernel=linear; total time= 1.0min
[CV] END ......C=0.1, class_weight=None, gamma=1, kernel=rbf; total time= 3.6min
[CV] END ..C=1, class_weight=None, gamma=auto, kernel=linear; total time= 1.7min
[CV] END ......C=0.1, class_weight=None, gamma

In [9]:
# Save best parameters
with open('dd_normal_vs_abnormal_best_params.json', 'w') as f:
    json.dump(random_search.best_params_, f, indent=4)

In [10]:
# Get the best parameters
best_params = random_search.best_params_
print("Best parameters:", best_params)

# Load the best parameters from a json file
# with open('dd_normal_vs_abnormal_best_params.json', 'r') as f:
#     best_params = json.load(f)

# Perform 10-fold cross-validation using the best model
best_clsfr_nor_abn = SVC(**best_params, probability=True, random_state=42)

Best parameters: {'kernel': 'rbf', 'gamma': 0.1, 'class_weight': 'balanced', 'C': 1}


In [12]:
fold_results = []

for fold, (train_index, val_index) in enumerate(cv.split(X, y_binary), 1):
    print(f"Fold {fold}")
    X_train, X_val = X[train_index], X[val_index]
    y_train, y_val = y_binary[train_index], y_binary[val_index]
    
    best_clsfr_nor_abn.fit(X_train, y_train)
    y_pred = best_clsfr_nor_abn.predict(X_val)
    y_prob = best_clsfr_nor_abn.predict_proba(X_val)
    
    # Calculate metrics
    accuracy, class_metrics, auc, f1, cm, avg_sensitivity, avg_specificity = calculate_metrics(y_val, y_pred, y_prob, num_classes=2)

    # Log metrics for this fold
    metrics = {
        'fold': fold,
        'val_accuracy': accuracy,
        'val_auc': auc,
        'val_f1': f1,
        'avg_sensitivity': avg_sensitivity,
        'avg_specificity': avg_specificity,
        **{f'class_{binary_class_names[i]}_sensitivity': metrics["sensitivity"] for i, metrics in enumerate(class_metrics)},
        **{f'class_{binary_class_names[i]}_specificity': metrics["specificity"] for i, metrics in enumerate(class_metrics)},
        **{f'class_{binary_class_names[i]}_f1': 2 * metrics["sensitivity"] * metrics["specificity"] / (metrics["sensitivity"] + metrics["specificity"]) for i, metrics in enumerate(class_metrics)},
    }
    
    custom_log(metrics, model_name=f'dd_svm', log_dir='logs_dd/nor_vs_abn/')
    print("Metrics for this fold:")
    print(metrics)

    # Plot confusion matrix for this fold
    plot_confusion_matrix(cm, class_names=binary_class_names, epoch_num=0, model_name='dd_svm', fold_num=fold, save_dir='figures_dd/nor_vs_abn/')
    
    fold_results.append(metrics)

# Calculate and print average results across all folds
avg_results = {key: np.mean([fold[key] for fold in fold_results if key in fold]) 
               for key in fold_results[0].keys() if key != 'fold'}

print("Average results across all folds:")
for key, value in avg_results.items(): 
    print(f"{key}: {value}")

# Log average results
custom_log(avg_results, model_name='dd_svm', log_dir='logs_dd/nor_vs_abn/')

Fold 1
Metrics for this fold:
{'fold': 1, 'val_accuracy': 0.9742120343839542, 'val_auc': 0.9934040668314357, 'val_f1': 0.9494949494949495, 'avg_sensitivity': 0.9650649941572476, 'avg_specificity': 0.9650649941572476, 'class_Abormal_sensitivity': 0.9838212634822804, 'class_Normal_sensitivity': 0.9463087248322147, 'class_Abormal_specificity': 0.9463087248322147, 'class_Normal_specificity': 0.9838212634822804, 'class_Abormal_f1': 0.9647004615701958, 'class_Normal_f1': 0.9647004615701958}
Fold 2
Metrics for this fold:
{'fold': 2, 'val_accuracy': 0.966189111747851, 'val_auc': 0.9944795469195424, 'val_f1': 0.9362162162162162, 'avg_sensitivity': 0.9670056841880297, 'avg_specificity': 0.9670056841880297, 'class_Abormal_sensitivity': 0.9653312788906009, 'class_Normal_sensitivity': 0.9686800894854586, 'class_Abormal_specificity': 0.9686800894854586, 'class_Normal_specificity': 0.9653312788906009, 'class_Abormal_f1': 0.967002784894731, 'class_Normal_f1': 0.967002784894731}
Fold 3
Metrics for this

In [13]:
# Predict the class Abnormal for each sample using the best model from the first stage
y_pred = best_clsfr_nor_abn.predict(X)
abnormal_indices = np.where(y_pred == 0)[0]
print(f"Number of predicted abnormal samples: {len(abnormal_indices)}")

Number of predicted abnormal samples: 6393


In [14]:
# Filter the dataset to include only the predicted Abnormal sample
X_pred_abnormal = X[abnormal_indices]
y_pred_abnormal = y[abnormal_indices]

# Check the number of samples in each class
print('Number of samples in each class:')
len(y_pred_abnormal), len(X_pred_abnormal)

Number of samples in each class:


(6393, 6393)

In [15]:
print(np.unique(y_pred_abnormal, return_counts=True))
normal_in_abnormal = y_pred_abnormal == 2  # 2 is the label for Normal
len(y_pred_abnormal[normal_in_abnormal])

(array([0, 1, 2]), array([6060,  252,   81]))


81

In [16]:
X_abnormal = X_pred_abnormal[~normal_in_abnormal]
y_abnormal = y_pred_abnormal[~normal_in_abnormal]

# Check the number of samples in each class
print('Number of samples in each class:')
np.unique(y_abnormal, return_counts=True)

Number of samples in each class:


(array([0, 1]), array([6060,  252]))

In [17]:
# Initialize RandomizedSearchCV
random_search_abn = RandomizedSearchCV(
    estimator=svm, 
    param_distributions=param_grid,
    n_iter=100, 
    cv=cv, 
    verbose=2, 
    random_state=42, 
    n_jobs=-1,
    scoring=custom_score
)

In [18]:
random_search_abn.fit(X_abnormal, y_abnormal)

Fitting 5 folds for each of 100 candidates, totalling 500 fits
[CV] END ..C=10, class_weight=None, gamma=0.1, kernel=linear; total time=   6.8s
[CV] END ..C=1, class_weight=None, gamma=auto, kernel=linear; total time=   6.9s
[CV] END ..C=10, class_weight=None, gamma=0.1, kernel=linear; total time=   7.0s
[CV] END ..C=10, class_weight=None, gamma=0.1, kernel=linear; total time=   7.1s
[CV] END ..C=10, class_weight=None, gamma=0.1, kernel=linear; total time=   7.2s
[CV] END ..C=1, class_weight=None, gamma=auto, kernel=linear; total time=   8.0s
[CV] END ..C=10, class_weight=None, gamma=0.1, kernel=linear; total time=   9.3s
[CV] END ..C=1, class_weight=None, gamma=auto, kernel=linear; total time=   5.5s
[CV] END ..C=1, class_weight=None, gamma=auto, kernel=linear; total time=   5.8s
[CV] END ..C=1, class_weight=None, gamma=auto, kernel=linear; total time=   5.8s
[CV] END .C=10, class_weight=balanced, gamma=0.1, kernel=rbf; total time=  24.9s
[CV] END ......C=0.1, class_weight=None, gamma

In [19]:
# Save best parameters few vs many
with open('dd_few_vs_many_best_params.json', 'w') as f:
    json.dump(random_search_abn.best_params_, f, indent=4)

In [20]:
# Load the best parameters from a json file
with open('dd_few_vs_many_best_params.json', 'r') as f:
    best_params_abn = json.load(f)

best_clsfr_few_many = SVC(**best_params_abn, probability=True, random_state=42)

abn_class_names = ["Few", "Many"]

In [21]:
fold_results = []

for fold, (train_index, val_index) in enumerate(cv.split(X_abnormal, y_abnormal), 1):
    print(f"Fold {fold}")
    X_train, X_val = X_abnormal[train_index], X_abnormal[val_index]
    y_train, y_val = y_abnormal[train_index], y_abnormal[val_index]
    
    best_clsfr_few_many.fit(X_train, y_train)
    y_pred = best_clsfr_few_many.predict(X_val)
    y_prob = best_clsfr_few_many.predict_proba(X_val)
    
    # Calculate metrics
    accuracy, class_metrics, auc, f1, cm, avg_sensitivity, avg_specificity = calculate_metrics(y_val, y_pred, y_prob, num_classes=2)

    # Log metrics for this fold
    metrics = {
        'fold': fold,
        'val_accuracy': accuracy,
        'val_auc': auc,
        'val_f1': f1,
        'avg_sensitivity': avg_sensitivity,
        'avg_specificity': avg_specificity,
        **{f'class_{abn_class_names[i]}_sensitivity': metrics["sensitivity"] for i, metrics in enumerate(class_metrics)},
        **{f'class_{abn_class_names[i]}_specificity': metrics["specificity"] for i, metrics in enumerate(class_metrics)},
        **{f'class_{abn_class_names[i]}_f1': 2 * metrics["sensitivity"] * metrics["specificity"] / (metrics["sensitivity"] + metrics["specificity"]) for i, metrics in enumerate(class_metrics)},
    }
    
    custom_log(metrics, model_name=f'dd_{model_name}_{fold}', log_dir='logs_dd/few_vs_many/')
    print("Metrics for this fold:")
    print(metrics)

    # Plot confusion matrix for this fold
    plot_confusion_matrix(cm, class_names=abn_class_names, epoch_num=0, model_name=f'dd_{model_name}', fold_num=fold, save_dir='figures_dd/few_vs_many/')
    
    fold_results.append(metrics)

# Calculate and print average results across all folds
avg_results = {key: np.mean([fold[key] for fold in fold_results if key in fold]) 
               for key in fold_results[0].keys() if key != 'fold'}

print("Average results across all folds:")
for key, value in avg_results.items(): 
    print(f"{key}: {value}")

# Log average results
custom_log(avg_results, model_name=f'dd_{model_name}_average', log_dir='logs_dd/few_vs_many/')

Fold 1
Metrics for this fold:
{'fold': 1, 'val_accuracy': 0.9968329374505146, 'val_auc': 0.9893224616579305, 'val_f1': 0.9615384615384616, 'avg_sensitivity': 0.9889584546689962, 'avg_specificity': 0.9889584546689962, 'class_Few_sensitivity': 0.9975247524752475, 'class_Many_sensitivity': 0.9803921568627451, 'class_Few_specificity': 0.9803921568627451, 'class_Many_specificity': 0.9975247524752475, 'class_Few_f1': 0.9888842539199568, 'class_Many_f1': 0.9888842539199568}
Fold 2
Metrics for this fold:
{'fold': 2, 'val_accuracy': 1.0, 'val_auc': 1.0, 'val_f1': 1.0, 'avg_sensitivity': 1.0, 'avg_specificity': 1.0, 'class_Few_sensitivity': 1.0, 'class_Many_sensitivity': 1.0, 'class_Few_specificity': 1.0, 'class_Many_specificity': 1.0, 'class_Few_f1': 1.0, 'class_Many_f1': 1.0}
Fold 3
Metrics for this fold:
{'fold': 3, 'val_accuracy': 0.9960380348652932, 'val_auc': 0.9987128712871287, 'val_f1': 0.9494949494949495, 'avg_sensitivity': 0.9691749174917492, 'avg_specificity': 0.9691749174917492, 'cla

In [22]:
def combine_predictions(normal_abnormal_pred, few_many_pred):
    # Initialize final_pred with the same shape as normal_abnormal_pred, filled with 2 (Normal)
    final_pred = np.full(normal_abnormal_pred.shape, 2)
    
    # Create a mask where normal_abnormal_pred is 0 (Abnormal)
    abnormal_mask = normal_abnormal_pred == 0
    
    # Update final_pred where the mask is True with values from few_many_pred
    final_pred[abnormal_mask] = few_many_pred
    
    # Return the final combined predictions
    return final_pred

def combine_probabilities(normal_abnormal_prob, few_many_prob, abnormal_mask):
    # Initialize with probabilities for Normal class
    combined_prob = np.column_stack((np.zeros_like(normal_abnormal_prob[:, 0]), 
                                     np.zeros_like(normal_abnormal_prob[:, 0]), 
                                     normal_abnormal_prob[:, 1]))
    
    # Update probabilities for Abnormal (Few and Many) classes
    if few_many_prob.size > 0:
        combined_prob[abnormal_mask, 0] = few_many_prob[:, 0] * normal_abnormal_prob[abnormal_mask, 0]
        combined_prob[abnormal_mask, 1] = few_many_prob[:, 1] * normal_abnormal_prob[abnormal_mask, 0]
        combined_prob[abnormal_mask, 2] = 0  # Probability of being Normal for predicted Abnormal samples
    
    # Normalize probabilities
    row_sums = combined_prob.sum(axis=1)
    combined_prob /= row_sums[:, np.newaxis]
    
    return combined_prob

In [23]:
overall_true = []
overall_pred = []
overall_prob = []

for fold, (train_index, val_index) in enumerate(cv.split(X, y), 1):
    print(f"Fold {fold}")
    X_train, X_val = X[train_index], X[val_index]
    y_train, y_val = y[train_index], y[val_index]
    
    # Stage 1: Normal vs Abnormal
    y_train_binary = (y_train == 2).astype(int)
    best_clsfr_nor_abn.fit(X_train, y_train_binary)
    y_pred_binary = best_clsfr_nor_abn.predict(X_val)
    y_prob_binary = best_clsfr_nor_abn.predict_proba(X_val)
    
    # Stage 2: Few vs Many (only for predicted Abnormal samples)
    abnormal_mask_train = y_train != 2
    X_train_abnormal = X_train[abnormal_mask_train]
    y_train_abnormal = y_train[abnormal_mask_train]
    
    best_clsfr_few_many.fit(X_train_abnormal, y_train_abnormal)
    
    abnormal_mask_val = y_pred_binary == 0
    X_val_abnormal = X_val[abnormal_mask_val]
    
    if len(X_val_abnormal) > 0:
        y_pred_few_many = best_clsfr_few_many.predict(X_val_abnormal)
        y_prob_few_many = best_clsfr_few_many.predict_proba(X_val_abnormal)
    else:
        y_pred_few_many = np.array([])
        y_prob_few_many = np.array([])
    
    # Combine predictions and probabilities
    y_pred_combined = combine_predictions(y_pred_binary, y_pred_few_many)
    y_prob_combined = combine_probabilities(y_prob_binary, y_prob_few_many, abnormal_mask_val)
    
    overall_true.extend(y_val)
    overall_pred.extend(y_pred_combined)
    overall_prob.extend(y_prob_combined)
    
    # Calculate and log metrics for this fold
    accuracy, class_metrics, auc, f1, cm, avg_sensitivity, avg_specificity = calculate_metrics(y_val, y_pred_combined, y_prob_combined, num_classes=3)
     
    metrics = {
        'fold': fold,
        'val_accuracy': accuracy,
        'val_auc': auc,
        'val_f1': f1,
        'avg_sensitivity': avg_sensitivity,
        'avg_specificity': avg_specificity,
        **{f'class_{class_names[i]}_sensitivity': metrics["sensitivity"] for i, metrics in enumerate(class_metrics)},
        **{f'class_{class_names[i]}_specificity': metrics["specificity"] for i, metrics in enumerate(class_metrics)},
        **{f'class_{class_names[i]}_f1': 2 * metrics["sensitivity"] * metrics["specificity"] / (metrics["sensitivity"] + metrics["specificity"]) for i, metrics in enumerate(class_metrics)},
    }
    
    custom_log(metrics, model_name=f'dd_combined_{model_name}_{fold}', log_dir='logs_dd/combined/')
    print("Metrics for this fold:")
    print(metrics)
    
    # Plot confusion matrix for this fold
    plot_confusion_matrix(cm, class_names=class_names, epoch_num=0, model_name=f'dd_combined_{model_name}', fold_num=fold, save_dir='figures_dd/combined/')

# Calculate overall metrics
overall_true = np.array(overall_true)
overall_pred = np.array(overall_pred)
overall_prob = np.array(overall_prob)

accuracy, class_metrics, auc, f1, cm, avg_sensitivity, avg_specificity = calculate_metrics(overall_true, overall_pred, overall_prob, num_classes=3)

overall_metrics = {
    'overall_accuracy': accuracy,
    'overall_auc': auc,
    'overall_f1': f1,
    'overall_avg_sensitivity': avg_sensitivity,
    'overall_avg_specificity': avg_specificity,
    **{f'overall_class_{class_names[i]}_sensitivity': metrics["sensitivity"] for i, metrics in enumerate(class_metrics)},
    **{f'overall_class_{class_names[i]}_specificity': metrics["specificity"] for i, metrics in enumerate(class_metrics)},
    **{f'overall_class_{class_names[i]}_f1': 2 * metrics["sensitivity"] * metrics["specificity"] / (metrics["sensitivity"] + metrics["specificity"]) for i, metrics in enumerate(class_metrics)},
}

print("Overall metrics:")
for key, value in overall_metrics.items():
    print(f"{key}: {value}")

custom_log(overall_metrics, model_name=f'dd_combined_{model_name}_overall', log_dir='logs_dd/combined/')

# Plot overall confusion matrix
plot_confusion_matrix(cm, class_names=class_names, epoch_num=0, model_name=f'dd_combined_{model_name}_overall', save_dir='figures_dd/combined/') 

Fold 1
Metrics for this fold:
{'fold': 1, 'val_accuracy': 0.9787965616045845, 'val_auc': 0.9855139926329484, 'val_f1': 0.9789799035717778, 'avg_sensitivity': 0.9877175766208776, 'avg_specificity': 0.9887957033612073, 'class_Few_sensitivity': 0.9743384121892542, 'class_Many_sensitivity': 1.0, 'class_None_sensitivity': 0.9888143176733781, 'class_Few_specificity': 0.9899598393574297, 'class_Many_specificity': 0.9964580873671782, 'class_None_specificity': 0.9799691833590138, 'class_Few_f1': 0.9820870097004446, 'class_Many_f1': 0.9982259018332348, 'class_None_f1': 0.9843718812921299}
Fold 2
Metrics for this fold:
{'fold': 2, 'val_accuracy': 0.9782234957020057, 'val_auc': 0.9844761119278884, 'val_f1': 0.9784024025439811, 'avg_sensitivity': 0.9874502684145154, 'avg_specificity': 0.9882987683496344, 'class_Few_sensitivity': 0.9735364875701684, 'class_Many_sensitivity': 1.0, 'class_None_sensitivity': 0.9888143176733781, 'class_Few_specificity': 0.9899598393574297, 'class_Many_specificity': 0.99