In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.metrics import roc_curve, auc
from keras import layers, models
from sklearn.preprocessing import scale
import keras.backend as K
from keras import layers, models
from keras.layers import Input, Dense, Layer, Reshape, Flatten, BatchNormalization, Dropout, Lambda, Activation, Multiply, LeakyReLU
from keras.models import Model
from keras.optimizers import Adam
import tensorflow as tf
import keras.backend as K

path = "/content/drive/MyDrive/Watashara_Projects/NIA/"


shapewx = [1, input_dimwx, 1]
labels = pd.read_csv(path + 'Features/AP2D.csv')
labels['Activity'] = labels['Class'].map({'Active': 1, 'Inactive': 0})
label = labels['Activity'].values

def get_shuffle(data, label):
    index = np.arange(len(label))
    np.random.shuffle(index)
    return data[index], label[index]

(datasetwx, label) = get_shuffle(datasetwx, label)

def scale_mean_var(input_arr, axis=0):
    mean_ = np.mean(input_arr, axis=0)
    scale_ = np.std(input_arr, axis=0)
    output_arr = input_arr - mean_
    scale_[scale_ == 0.0] = 1.0
    output_arr /= scale_
    return output_arr

# Define Capsule Network
def squash(vectors, axis=-1):
    s_squared_norm = tf.reduce_sum(tf.square(vectors), axis, keepdims=True)
    scale = s_squared_norm / (1 + s_squared_norm) / tf.sqrt(s_squared_norm + K.epsilon())
    return scale * vectors

def build_capsule_network(input_shape):
    input_layer = Input(shape=input_shape)

    # Convolutional layer
    x = layers.Conv2D(64, (1, 9), strides=2, padding='valid', name='conv1')(input_layer)
    x = LeakyReLU()(x)
    x = BatchNormalization(momentum=0.8)(x)

    # Primary Capsule Layer
    x = layers.Conv2D(32, (1, 3), strides=2, padding='valid', name='primarycap_conv2')(x)
    [_, _, num_caps, _] = x.shape
    num_caps = int(num_caps)
    x = Reshape(target_shape=[-1, num_caps], name='primarycap_reshape')(x)

    # Apply squash function
    x = Lambda(squash, output_shape=(x.shape[1], num_caps), name='primarycap_squash')(x)
    x = BatchNormalization(momentum=0.8)(x)

    # Digit Capsule Layer with routing mechanism
    x = Flatten()(x)
    uhat = Dense(128, kernel_initializer='he_normal', bias_initializer='zeros', name='uhat_digitcaps')(x)
    c = Activation('softmax', name='softmax_digitcaps1')(uhat)
    c = Dense(128)(c)
    x = Multiply()([uhat, c])
    s_j = LeakyReLU()(x)

    # Repeat routing process
    for _ in range(3):
        c = Activation('softmax')(s_j)
        c = Dense(128)(c)
        x = Multiply()([uhat, c])
        s_j = LeakyReLU()(x)

    # Output layer with Dropout
    s_j = Reshape((-1, 128, 1))(s_j)
    cbam = layers.GlobalAveragePooling2D()(s_j)
    cbam = Dropout(0.5)(cbam)  # Added Dropout for regularization
    cbam = Dense(2, activation='sigmoid')(cbam)

    return Model(input_layer, cbam)

# Train and evaluate the Capsule Network
def categorical_probas_to_classes(p):
    return np.argmax(p, axis=1)


ytest = np.ones((1, 2)) * 0.5
yscore = np.ones((1, 2)) * 0.5
sepscores = []

# Build and compile the Capsule Network
capsule_network = build_capsule_network((1, input_dimwx, 1))
capsule_network.compile(loss='binary_crossentropy', optimizer=Adam(0.0002, 0.5), metrics=['accuracy'])

skf = StratifiedKFold(n_splits=5)

if not os.path.exists(path + (f'Results/{fld_name}/')):
    os.makedirs(path + f'Results/{fld_name}/')

X_train, X_ind, y_train, y_ind = train_test_split(datasetwx, label, test_size=0.2, random_state=42)

for fold_index, (train, test) in enumerate(skf.split(X_train, y_train)):
    label_train = to_categorical(label[train])
    X = datasetwx

    # Reshape the input data
    X_train_fold = X[train].reshape(-1, 1, input_dimwx, 1)
    X_test_fold = X[test].reshape(-1, 1, input_dimwx, 1)

    # Initialize a new model for each fold
    capsule_network_fold = build_capsule_network((1, input_dimwx, 1))
    capsule_network_fold.compile(loss='binary_crossentropy', optimizer=Adam(), metrics=['accuracy'])#0.0002, 0.5

    # Early stopping
    early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

    # Learning rate adjustment
    lr_reduction = tf.keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=2,
        min_lr=1e-6,
        verbose=1
    )

    history = capsule_network_fold.fit(
        X_train_fold,
        label_train,
        epochs=30,
        validation_data=(X_test_fold, to_categorical(label[test])),
        callbacks=[early_stopping, lr_reduction]
    )

    # Save the model after training on each fold with a unique filename
    model_save_path = path + f'Results/{fld_name}/capsule_network_model_fold_{fold_index + 1}.h5'
    capsule_network_fold.save(model_save_path)

    y_test = to_categorical(label[test])
    ytest = np.vstack((ytest, y_test))
    y_score = capsule_network_fold.predict(X_test_fold)
    yscore = np.vstack((yscore, y_score))
    fpr, tpr, _ = roc_curve(y_test[:, 0], y_score[:, 0])
    roc_auc = auc(fpr, tpr)
    y_class = categorical_probas_to_classes(y_score)
    acc, precision, npv, sensitivity, specificity, mcc, f1 = calculate_performance(len(y_class), y_class, label[test])
    sepscores.append([acc, precision, npv, sensitivity, specificity, mcc, f1, roc_auc])

    print(f'Fold {fold_index + 1}: acc={acc},  sensitivity={sensitivity}, specificity={specificity}, mcc={mcc}, f1={f1}, roc_auc={roc_auc}')

scores = np.array(sepscores)

result1 = np.mean(scores, axis=0)
H1 = result1.tolist()
sepscores.append(H1)
result = sepscores




In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import scale
from sklearn.metrics import roc_curve, auc, average_precision_score, precision_recall_curve
from tensorflow.keras.models import load_model
import matplotlib.pyplot as plt
import tensorflow as tf  # Make sure to import TensorFlow for the squash function

# Define the path and folder name
path = "/content/drive/MyDrive/Watashara_Projects/NIA/"
# fld_name = 'Caps_all_feat'

# Define your custom squash function
def squash(x):
    norm = tf.norm(x, axis=-1, keepdims=True)
    squared_norm = tf.square(norm)
    scale = squared_norm / (1 + squared_norm) / norm
    return scale * x

# Load your test data
Xt = X_ind  # Ensure X_ind is defined in your environment
yt = y_ind  # Ensure y_ind is defined in your environment

# Reshape Xt to the appropriate shape for the model
[sample_num, input_dim] = np.shape(Xt)
Xt = Xt.reshape(-1, 1, input_dim, 1)  # Assign the reshaped array back to Xt

sepscores = []
ytest = np.ones((1, 2)) * 0.5
yscore = np.ones((1, 2)) * 0.5

for i in range(5):
    loaded_model = load_model(
        path + f'Results/{fld_name}/capsule_network_model_fold_{i + 1}.h5',
        custom_objects={'squash': squash}  # Register the custom function here
    )
    print("Loaded model from disk")

    # Evaluate loaded model on test data
    y_score = loaded_model.predict(Xt)
    y_class = categorical_probas_to_classes(y_score)

    y_test = to_categorical(yt)
    ytest = np.vstack((ytest, y_test))
    yscore = np.vstack((yscore, y_score))

    acc, precision, npv, sensitivity, specificity, mcc, f1 = calculate_performance(len(y_class), y_class, yt)

    fpr, tpr, _ = roc_curve(y_test[:, 1], y_score[:, 1])
    roc_auc = auc(fpr, tpr)  # Calculate ROC AUC

    # Calculate AUPR
    aupr = average_precision_score(y_test[:, 1], y_score[:, 1])

    # Calculate Precision-Recall AUC
    precision_vals, recall, _ = precision_recall_curve(y_test[:, 1], y_score[:, 1])
    pr_auc = auc(recall, precision_vals)

    sepscores.append([acc, sensitivity, specificity, mcc, f1, roc_auc, pr_auc])
    print(f'Fold {i + 1}: acc={acc:.6f}, sensitivity={sensitivity:.6f}, specificity={specificity:.6f}, '
          f'mcc={mcc:.6f}, f1={f1:.6f}, roc_auc={roc_auc:.6f}, pr_auc={pr_auc:.6f}')

# Prepare results for saving
row = ytest.shape[0]
ytest = ytest[np.array(range(1, row)), :]
ytest_sum = pd.DataFrame(data=ytest)
ytest_sum.to_csv(path + f'Results/{fld_name}/Caps_ytest_test.csv')

yscore_ = yscore[np.array(range(1, row)), :]
yscore_sum = pd.DataFrame(data=yscore_)
yscore_sum.to_csv(path + f'Results/{fld_name}/Caps_yscore_test.csv')

# Calculate mean results
scores = np.array(sepscores)
result1 = np.mean(scores, axis=0)
H1 = result1.tolist()
sepscores.append(H1)
result = sepscores

data_csv = pd.DataFrame(data=result)
data_csv.to_csv(path + f'Results/{fld_name}/Caps_allfeat_test_results.csv')




Loaded model from disk
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 69ms/step
Fold 1: acc=0.936416, sensitivity=0.957447, specificity=0.911392, mcc=0.872016, f1=0.942408, roc_auc=0.966402, pr_auc=0.960335




Loaded model from disk
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 72ms/step
Fold 2: acc=0.959538, sensitivity=0.946809, specificity=0.974684, mcc=0.919267, f1=0.962162, roc_auc=0.971115, pr_auc=0.960232




Loaded model from disk
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 116ms/step
Fold 3: acc=0.965318, sensitivity=0.978723, specificity=0.949367, mcc=0.930222, f1=0.968421, roc_auc=0.975761, pr_auc=0.971071




Loaded model from disk
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 116ms/step
Fold 4: acc=0.942197, sensitivity=0.936170, specificity=0.949367, mcc=0.883991, f1=0.946237, roc_auc=0.973270, pr_auc=0.962334




Loaded model from disk
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 69ms/step
Fold 5: acc=0.971098, sensitivity=0.978723, specificity=0.962025, mcc=0.941764, f1=0.973545, roc_auc=0.976569, pr_auc=0.971659
