In [None]:
import os
import pickle
import math
import numpy as np
import tensorflow as tf
from time import time
from sklearn import metrics
from tensorflow.keras import layers, Model
from tensorflow.keras.utils import Sequence

# =========================
#  GPU MEMORY CONFIGURATION
# =========================
# Turn on memory growth for every visible GPU. A RuntimeError raised while
# doing so is reported and otherwise ignored so the rest of the script runs.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        print("Could not set GPU memory growth:", e)
    else:
        # Only reached when every device was configured without error.
        print("Enabled memory growth on GPUs")

# =========================
#  PARAMETERS
# =========================
# Sequence-window geometry: MAXSEQ is derived as 2 * NUM_DEPENDENT + 1
# (presumably NUM_DEPENDENT positions on each side of a centre position).
NUM_DEPENDENT = 7
MAXSEQ = 2 * NUM_DEPENDENT + 1

# Model hyper-parameters.
NUM_FEATURE = 1024                      # size of the last input axis
NUM_FILTER = 128                        # filters per convolution branch
NUM_HIDDEN = 1000                       # units in the hidden dense layer
WINDOW_SIZES = list(range(4, 13, 2))    # kernel widths: [4, 6, 8, 10, 12]
NUM_CLASSES = 2

# Training hyper-parameters.
BATCH_SIZE = 32
EPOCHS = 10

# Directory that receives the pickled ROC curves; created here if missing.
ROC_SAVE_DIR = "C:/jupyter/Malik/hERG/Hussain/DNA/output/Roc"
os.makedirs(ROC_SAVE_DIR, exist_ok=True)

# =========================
#  DATA LOADING
# =========================
# Files are listed in the order: train features, train labels,
# test features, test labels.
# NOTE(review): allow_pickle=True lets np.load unpickle Python objects, so
# these files must come from a trusted source.
_DATASET_FILES = (
    "C:/jupyter/Malik/hERG/Hussain/DNA/output/TR573_RAG.npy",
    "C:/jupyter/Malik/hERG/Hussain/DNA/output/TR573_label.npy",
    "C:/jupyter/Malik/hERG/Hussain/DNA/output/TE129_RAG.npy",
    "C:/jupyter/Malik/hERG/Hussain/DNA/output/TE129_label.npy",
)
x_train, y_train, x_test, y_test = (
    np.load(path, allow_pickle=True) for path in _DATASET_FILES
)

# Report the array shapes as a quick sanity check.
print(f"Training set: X={x_train.shape}, y={y_train.shape}")
print(f"   Testing set: X={x_test.shape},  y={y_test.shape}")

# =========================
#  DATA GENERATOR
# =========================
class DataGenerator(Sequence):
    """Keras ``Sequence`` that serves ``(X, y)`` mini-batches from arrays.

    Args:
        X: Samples; must support ``len()`` and indexing with an integer
            index array (e.g. a NumPy array).
        y: Targets aligned with ``X`` along the first axis.
        batch_size: Number of samples per batch; must be positive.
        shuffle: When True, the sample order is permuted at construction and
            again in ``on_epoch_end``. Defaults to False, which keeps the
            samples in their stored order.

    Raises:
        ValueError: If ``batch_size`` is not positive or ``X`` and ``y`` have
            different lengths.
    """

    def __init__(self, X, y, batch_size, shuffle=False):
        # Let the Keras base class initialise its own state.
        super().__init__()
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")
        if len(X) != len(y):
            raise ValueError(
                f"X and y must have the same length, got {len(X)} and {len(y)}"
            )
        self.X = X
        self.y = y
        self.bs = batch_size
        self.shuffle = shuffle
        self.indexes = np.arange(len(X))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __len__(self):
        """Return the number of batches; the last one may be partial."""
        return math.ceil(len(self.X) / self.bs)

    def __getitem__(self, idx):
        """Return batch ``idx`` as an ``(X_batch, y_batch)`` tuple.

        Raises:
            IndexError: If ``idx`` is outside ``[0, len(self))``. Without
                this, an out-of-range index would silently produce an empty
                batch and plain ``for`` iteration might never stop.
        """
        if not 0 <= idx < len(self):
            raise IndexError(f"batch index {idx} out of range")
        batch_idxs = self.indexes[idx * self.bs:(idx + 1) * self.bs]
        return self.X[batch_idxs], self.y[batch_idxs]

    def on_epoch_end(self):
        """Re-permute the sample order when shuffling is enabled."""
        if self.shuffle:
            np.random.shuffle(self.indexes)

# =========================
#  MODEL DEFINITION
# =========================
class DeepScan(Model):
    """Multi-window separable-convolution classifier.

    For every window size ``w`` a ``SeparableConv2D`` with a ``(1, w)``
    kernel and 'valid' padding is applied to the input. Its output is
    max-pooled with a window of width ``seq_len - w + 1`` (the width left
    after the convolution), flattened, and concatenated with the other
    branches. The result goes through dropout, a ReLU dense layer and a
    softmax output layer with ``NUM_CLASSES`` units.

    Args:
        input_shape: Per-sample input shape ``(1, seq_len, num_features)``.
            Only ``seq_len`` (index 1) is used, to size the pooling windows.
        window_sizes: Kernel widths, one convolution branch per entry.
        num_filters: Number of filters in each convolution branch.
        num_hidden: Number of units in the hidden dense layer.

    Raises:
        ValueError: If a window size is not within ``1..seq_len``.
    """

    def __init__(self,
                 input_shape=(1, MAXSEQ, NUM_FEATURE),
                 window_sizes=WINDOW_SIZES,
                 num_filters=NUM_FILTER,
                 num_hidden=NUM_HIDDEN):
        super().__init__()

        # Size the pooling from the declared input shape rather than the
        # global MAXSEQ so a non-default input_shape stays consistent.
        seq_len = input_shape[1]

        # Depthwise separable convolution + pooling for each window size.
        self.convs = []
        self.pools = []
        for w in window_sizes:
            if not 1 <= w <= seq_len:
                raise ValueError(
                    f"window size {w} must be between 1 and {seq_len}"
                )
            pooled_width = seq_len - w + 1
            self.convs.append(layers.SeparableConv2D(
                filters=num_filters,
                kernel_size=(1, w),
                activation='relu',
                padding='valid'
            ))
            self.pools.append(layers.MaxPooling2D(
                pool_size=(1, pooled_width),
                strides=(1, pooled_width)
            ))

        self.flatten = layers.Flatten()
        self.dropout = layers.Dropout(0.7)
        self.dense1  = layers.Dense(num_hidden, activation='relu')
        self.dense2  = layers.Dense(NUM_CLASSES, activation='softmax',
                                    kernel_regularizer=tf.keras.regularizers.l2(1e-3))

    def call(self, x, training=False):
        """Run the forward pass and return the softmax class scores.

        Args:
            x: Input batch fed to every convolution branch.
            training: Forwarded to the dropout layer.
        """
        features = [
            self.flatten(pool(conv(x)))
            for conv, pool in zip(self.convs, self.pools)
        ]
        x = tf.concat(features, axis=1)
        x = self.dropout(x, training=training)
        x = self.dense1(x)
        return self.dense2(x)

# =========================
#  ROC SAVE & EVALUATION
# =========================
def save_roc(fpr, tpr, auc, model_name="DeepScan"):
    """Pickle one ROC curve into ``ROC_SAVE_DIR``.

    The file is named ``<model_name>_<unix seconds>.pkl`` and holds a dict
    with the keys ``"fpr"``, ``"tpr"`` and ``"auc"``. The destination path
    is printed once the file has been written.
    """
    timestamp = int(time())
    payload = {"fpr": fpr, "tpr": tpr, "auc": auc}
    filename = os.path.join(ROC_SAVE_DIR, f"{model_name}_{timestamp}.pkl")
    with open(filename, 'wb') as out:
        out.write(pickle.dumps(payload))
    print(f"Saved ROC data to: {filename}")

def model_test(model, X, y, model_name="DeepScan"):
    """Evaluate a two-class model and report threshold-based metrics.

    Column 1 of both ``y`` and the model predictions is treated as the
    positive class. The ROC curve is computed and saved via ``save_roc``,
    the decision threshold that maximises the geometric mean of sensitivity
    and specificity is selected, and the metrics at that threshold are
    printed and returned.

    NOTE(review): the threshold is chosen on the same data it is evaluated
    on, so the threshold-dependent metrics are optimistic.

    Args:
        model: Object exposing ``predict(X, batch_size=...)`` that returns
            an array with at least two columns.
        X: Evaluation inputs passed to ``model.predict``.
        y: Targets with at least two columns (presumably one-hot).
        model_name: Label used in the printed report and the ROC file name.

    Returns:
        dict with keys TP, FP, TN, FN, Sensitivity, Specificity, Accuracy,
        Precision, F1, MCC and AUC.
    """
    preds = model.predict(X, batch_size=BATCH_SIZE)
    y_true = y[:, 1]
    y_score = preds[:, 1]

    fpr, tpr, thresholds = metrics.roc_curve(y_true, y_score)
    auc_value = metrics.auc(fpr, tpr)
    save_roc(fpr, tpr, auc_value, model_name)

    # Pick the threshold with the best sensitivity/specificity balance.
    gmeans = np.sqrt(tpr * (1 - fpr))
    ix = np.argmax(gmeans)
    best_thresh, best_g = thresholds[ix], gmeans[ix]

    y_pred = (y_score >= best_thresh).astype(int)
    # labels=[0, 1] forces a 2x2 matrix even when only one class occurs;
    # otherwise the four-way unpacking below raises ValueError.
    TN, FP, FN, TP = metrics.confusion_matrix(
        y_true, y_pred, labels=[0, 1]
    ).ravel()

    total = TP + TN + FP + FN
    Sens  = TP / (TP + FN) if (TP + FN) > 0 else 0.0
    Spec  = TN / (TN + FP) if (TN + FP) > 0 else 0.0
    Acc   = (TP + TN) / total if total > 0 else 0.0
    Prec  = TP / (TP + FP) if (TP + FP) > 0 else 0.0
    MCC   = metrics.matthews_corrcoef(y_true, y_pred)
    F1    = metrics.f1_score(y_true, y_pred)

    print(f"\n=== {model_name} Evaluation ===")
    print(f"Best thresh: {best_thresh:.4f} (G-Mean={best_g:.4f}), AUC={auc_value:.4f}")
    print(f"TP={TP}, FP={FP}, TN={TN}, FN={FN}")
    print(f"Sensitivity={Sens:.4f}, Specificity={Spec:.4f}")
    print(f"Accuracy={Acc:.4f}, Precision={Prec:.4f}, F1={F1:.4f}, MCC={MCC:.4f}\n")

    return {
        "TP": TP, "FP": FP, "TN": TN, "FN": FN,
        "Sensitivity": Sens, "Specificity": Spec,
        "Accuracy": Acc, "Precision": Prec,
        "F1": F1, "MCC": MCC, "AUC": auc_value
    }

# =========================
#  TRAIN & EVALUATE
# =========================
if __name__ == "__main__":
    # Serve the training arrays to Keras in mini-batches.
    train_gen = DataGenerator(x_train, y_train, BATCH_SIZE)

    model = DeepScan()
    # Build with an unspecified batch dimension; passing x_train.shape
    # would declare the whole dataset size as the batch size.
    model.build(input_shape=(None, *x_train.shape[1:]))
    model.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    model.summary()

    model.fit(
        train_gen,
        epochs=EPOCHS,
        shuffle=True
    )

    # Free memory: drop the training data before evaluating on the test set.
    del x_train, y_train, train_gen
    import gc; gc.collect()

    results = model_test(model, x_test, y_test, model_name="DeepScan_RAG")


In [1]:
import os
import pickle
import math
import numpy as np
import tensorflow as tf
from time import time
from sklearn import metrics
from tensorflow.keras import layers, Model
from tensorflow.keras.utils import Sequence

# =========================
#  GPU MEMORY CONFIGURATION
# =========================
# Turn on memory growth for every visible GPU. A RuntimeError raised while
# doing so is reported and otherwise ignored so the rest of the script runs.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        print("Could not set GPU memory growth:", e)
    else:
        # Only reached when every device was configured without error.
        print("Enabled memory growth on GPUs")

# =========================
#  PARAMETERS
# =========================
# Sequence-window geometry: MAXSEQ is derived as 2 * NUM_DEPENDENT + 1
# (presumably NUM_DEPENDENT positions on each side of a centre position).
NUM_DEPENDENT = 7
MAXSEQ = 2 * NUM_DEPENDENT + 1

# Model hyper-parameters.
NUM_FEATURE = 1024                      # size of the last input axis
NUM_FILTER = 128                        # filters per convolution branch
NUM_HIDDEN = 1000                       # units in the hidden dense layer
WINDOW_SIZES = list(range(4, 13, 2))    # kernel widths: [4, 6, 8, 10, 12]
NUM_CLASSES = 2

# Training hyper-parameters.
BATCH_SIZE = 32
EPOCHS = 10

# Directory that receives the pickled ROC curves; created here if missing.
ROC_SAVE_DIR = "C:/jupyter/Malik/hERG/Hussain/DNA/output/Roc"
os.makedirs(ROC_SAVE_DIR, exist_ok=True)

# =========================
#  DATA LOADING
# =========================
# Files are listed in the order: train features, train labels,
# test features, test labels.
# NOTE(review): allow_pickle=True lets np.load unpickle Python objects, so
# these files must come from a trusted source.
_DATASET_FILES = (
    "C:/jupyter/Malik/hERG/Hussain/DNA/TR495_data.npy",
    "C:/jupyter/Malik/hERG/Hussain/DNA/TR495_label.npy",
    "C:/jupyter/Malik/hERG/Hussain/DNA/TE117_data.npy",
    "C:/jupyter/Malik/hERG/Hussain/DNA/TE117_label.npy",
)
x_train, y_train, x_test, y_test = (
    np.load(path, allow_pickle=True) for path in _DATASET_FILES
)

# Report the array shapes as a quick sanity check.
print(f"Training set: X={x_train.shape}, y={y_train.shape}")
print(f"Testing set: X={x_test.shape}, y={y_test.shape}")

# =========================
#  DATA GENERATOR
# =========================
class DataGenerator(Sequence):
    """Keras ``Sequence`` that serves ``(X, y)`` mini-batches from arrays.

    Args:
        X: Samples; must support ``len()`` and indexing with an integer
            index array (e.g. a NumPy array).
        y: Targets aligned with ``X`` along the first axis.
        batch_size: Number of samples per batch; must be positive.
        shuffle: When True, the sample order is permuted at construction and
            again in ``on_epoch_end``. Defaults to False, which keeps the
            samples in their stored order.

    Raises:
        ValueError: If ``batch_size`` is not positive or ``X`` and ``y`` have
            different lengths.
    """

    def __init__(self, X, y, batch_size, shuffle=False):
        # Let the Keras base class initialise its own state.
        super().__init__()
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")
        if len(X) != len(y):
            raise ValueError(
                f"X and y must have the same length, got {len(X)} and {len(y)}"
            )
        self.X = X
        self.y = y
        self.bs = batch_size
        self.shuffle = shuffle
        self.indexes = np.arange(len(X))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __len__(self):
        """Return the number of batches; the last one may be partial."""
        return math.ceil(len(self.X) / self.bs)

    def __getitem__(self, idx):
        """Return batch ``idx`` as an ``(X_batch, y_batch)`` tuple.

        Raises:
            IndexError: If ``idx`` is outside ``[0, len(self))``. Without
                this, an out-of-range index would silently produce an empty
                batch and plain ``for`` iteration might never stop.
        """
        if not 0 <= idx < len(self):
            raise IndexError(f"batch index {idx} out of range")
        batch_idxs = self.indexes[idx * self.bs:(idx + 1) * self.bs]
        return self.X[batch_idxs], self.y[batch_idxs]

    def on_epoch_end(self):
        """Re-permute the sample order when shuffling is enabled."""
        if self.shuffle:
            np.random.shuffle(self.indexes)

# =========================
#  STATE SPACE LAYER
# =========================
class StateSpaceLayer(tf.keras.layers.Layer):
    """Recurrent encoder that reduces a (B, 1, seq, feat) input to (B, units).

    Despite its name, the layer is a thin wrapper around a Keras ``GRU``
    that returns only its final output; no separate state-space model is
    implemented here.

    Args:
        units: Output size of the wrapped GRU.
        **kwargs: Standard ``Layer`` keyword arguments (e.g. ``name``),
            forwarded to the base class.
    """

    def __init__(self, units, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.gru = tf.keras.layers.GRU(units, return_sequences=False)

    def call(self, x):
        """Drop the singleton axis 1 and run the GRU over the sequence."""
        x = tf.squeeze(x, axis=1)  # from (B, 1, seq, feat) to (B, seq, feat)
        return self.gru(x)         # (B, units)

    def get_config(self):
        """Return the layer configuration, including ``units``."""
        config = super().get_config()
        config.update({"units": self.units})
        return config

# =========================
#  DEEPSCAN + SSM MODEL
# =========================
class DeepScanSSM(Model):
    """Multi-window separable-convolution classifier with a recurrent branch.

    The convolution path applies, for every window size ``w``, a
    ``SeparableConv2D`` with a ``(1, w)`` kernel and 'valid' padding, then a
    max-pool of width ``seq_len - w + 1`` (the width left after the
    convolution), and flattens the result. In parallel the raw input is fed
    to a ``StateSpaceLayer`` with 128 units. Both feature sets are
    concatenated and passed through dropout, a ReLU dense layer and a
    softmax output layer with ``NUM_CLASSES`` units.

    Args:
        input_shape: Per-sample input shape ``(1, seq_len, num_features)``.
            Only ``seq_len`` (index 1) is used, to size the pooling windows.
        window_sizes: Kernel widths, one convolution branch per entry.
        num_filters: Number of filters in each convolution branch.
        num_hidden: Number of units in the hidden dense layer.

    Raises:
        ValueError: If a window size is not within ``1..seq_len``.
    """

    def __init__(self,
                 input_shape=(1, MAXSEQ, NUM_FEATURE),
                 window_sizes=WINDOW_SIZES,
                 num_filters=NUM_FILTER,
                 num_hidden=NUM_HIDDEN):
        super().__init__()

        # Size the pooling from the declared input shape rather than the
        # global MAXSEQ so a non-default input_shape stays consistent.
        seq_len = input_shape[1]

        self.convs = []
        self.pools = []
        for w in window_sizes:
            if not 1 <= w <= seq_len:
                raise ValueError(
                    f"window size {w} must be between 1 and {seq_len}"
                )
            pooled_width = seq_len - w + 1
            self.convs.append(layers.SeparableConv2D(
                filters=num_filters,
                kernel_size=(1, w),
                activation='relu',
                padding='valid'
            ))
            self.pools.append(layers.MaxPooling2D(
                pool_size=(1, pooled_width),
                strides=(1, pooled_width)
            ))

        self.flatten = layers.Flatten()
        self.ssm     = StateSpaceLayer(units=128)
        self.dropout = layers.Dropout(0.7)
        self.dense1  = layers.Dense(num_hidden, activation='relu')
        self.dense2  = layers.Dense(NUM_CLASSES, activation='softmax',
                                    kernel_regularizer=tf.keras.regularizers.l2(1e-3))

    def call(self, x, training=False):
        """Run the forward pass and return the softmax class scores.

        Args:
            x: Input batch fed to every convolution branch and to the
                recurrent branch.
            training: Forwarded to the dropout layer.
        """
        features = [
            self.flatten(pool(conv(x)))
            for conv, pool in zip(self.convs, self.pools)
        ]
        cnn_output = tf.concat(features, axis=1)
        ssm_output = self.ssm(x)
        x = tf.concat([cnn_output, ssm_output], axis=1)
        x = self.dropout(x, training=training)
        x = self.dense1(x)
        return self.dense2(x)

# =========================
#  ROC SAVE & EVALUATION
# =========================
def save_roc(fpr, tpr, auc, model_name="DeepScan"):
    """Pickle one ROC curve into ``ROC_SAVE_DIR``.

    The file is named ``<model_name>_<unix seconds>.pkl`` and holds a dict
    with the keys ``"fpr"``, ``"tpr"`` and ``"auc"``. The destination path
    is printed once the file has been written.
    """
    timestamp = int(time())
    payload = {"fpr": fpr, "tpr": tpr, "auc": auc}
    filename = os.path.join(ROC_SAVE_DIR, f"{model_name}_{timestamp}.pkl")
    with open(filename, 'wb') as out:
        out.write(pickle.dumps(payload))
    print(f"Saved ROC data to: {filename}")

def model_test(model, X, y, model_name="DeepScan"):
    """Evaluate a two-class model and report threshold-based metrics.

    Column 1 of both ``y`` and the model predictions is treated as the
    positive class. The ROC curve is computed and saved via ``save_roc``,
    the decision threshold that maximises the geometric mean of sensitivity
    and specificity is selected, and the metrics at that threshold are
    printed and returned.

    NOTE(review): the threshold is chosen on the same data it is evaluated
    on, so the threshold-dependent metrics are optimistic.

    Args:
        model: Object exposing ``predict(X, batch_size=...)`` that returns
            an array with at least two columns.
        X: Evaluation inputs passed to ``model.predict``.
        y: Targets with at least two columns (presumably one-hot).
        model_name: Label used in the printed report and the ROC file name.

    Returns:
        dict with keys TP, FP, TN, FN, Sensitivity, Specificity, Accuracy,
        Precision, F1, MCC and AUC.
    """
    preds = model.predict(X, batch_size=BATCH_SIZE)
    y_true = y[:, 1]
    y_score = preds[:, 1]

    fpr, tpr, thresholds = metrics.roc_curve(y_true, y_score)
    auc_value = metrics.auc(fpr, tpr)
    save_roc(fpr, tpr, auc_value, model_name)

    # Pick the threshold with the best sensitivity/specificity balance.
    gmeans = np.sqrt(tpr * (1 - fpr))
    ix = np.argmax(gmeans)
    best_thresh, best_g = thresholds[ix], gmeans[ix]

    y_pred = (y_score >= best_thresh).astype(int)
    # labels=[0, 1] forces a 2x2 matrix even when only one class occurs;
    # otherwise the four-way unpacking below raises ValueError.
    TN, FP, FN, TP = metrics.confusion_matrix(
        y_true, y_pred, labels=[0, 1]
    ).ravel()

    total = TP + TN + FP + FN
    Sens  = TP / (TP + FN) if (TP + FN) > 0 else 0.0
    Spec  = TN / (TN + FP) if (TN + FP) > 0 else 0.0
    Acc   = (TP + TN) / total if total > 0 else 0.0
    Prec  = TP / (TP + FP) if (TP + FP) > 0 else 0.0
    MCC   = metrics.matthews_corrcoef(y_true, y_pred)
    F1    = metrics.f1_score(y_true, y_pred)

    print(f"\n=== {model_name} Evaluation ===")
    print(f"Best thresh: {best_thresh:.4f} (G-Mean={best_g:.4f}), AUC={auc_value:.4f}")
    print(f"TP={TP}, FP={FP}, TN={TN}, FN={FN}")
    print(f"Sensitivity={Sens:.4f}, Specificity={Spec:.4f}")
    print(f"Accuracy={Acc:.4f}, Precision={Prec:.4f}, F1={F1:.4f}, MCC={MCC:.4f}\n")

    return {
        "TP": TP, "FP": FP, "TN": TN, "FN": FN,
        "Sensitivity": Sens, "Specificity": Spec,
        "Accuracy": Acc, "Precision": Prec,
        "F1": F1, "MCC": MCC, "AUC": auc_value
    }

# =========================
#  TRAIN & EVALUATE
# =========================
if __name__ == "__main__":
    # Serve the training arrays to Keras in mini-batches.
    train_gen = DataGenerator(x_train, y_train, BATCH_SIZE)

    model = DeepScanSSM()
    # Build with an unspecified batch dimension; passing x_train.shape
    # would declare the whole dataset size as the batch size.
    model.build(input_shape=(None, *x_train.shape[1:]))
    model.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    model.summary()

    model.fit(
        train_gen,
        epochs=EPOCHS,
        shuffle=True
    )

    # Free memory: drop the training data before evaluating on the test set.
    del x_train, y_train, train_gen
    import gc; gc.collect()

    results = model_test(model, x_test, y_test, model_name="DeepScanSSM")


Enabled memory growth on GPUs
Training set: X=(32071, 1, 15, 1024), y=(32071, 2)
Testing set: X=(4424, 1, 15, 1024), y=(4424, 2)
Model: "deep_scan_ssm"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 separable_conv2d (Separable  multiple                 135296    
 Conv2D)                                                         
                                                                 
 separable_conv2d_1 (Separab  multiple                 137344    
 leConv2D)                                                       
                                                                 
 separable_conv2d_2 (Separab  multiple                 139392    
 leConv2D)                                                       
                                                                 
 separable_conv2d_3 (Separab  multiple                 141440    
 leConv2D)                                              