# GCBLANE

In [None]:
from tensorflow.keras.layers import Input, Dense, Conv1D, BatchNormalization, LSTM, Flatten, MultiHeadAttention, MaxPooling1D, Add, PReLU, SpatialDropout1D, Bidirectional, ZeroPadding1D, Multiply, Attention, AdditiveAttention, Concatenate

from tensorflow.keras.models import Sequential, Model
import keras
# Build and compile the two-input GCBLANE classifier: a CNN + attention + LSTM
# branch over the first input, concatenated with an LSTM summary of the second
# input, followed by a 2-unit softmax.
# Input layers
# NOTE(review): (101, 4) presumably is a one-hot encoded 101-position sequence and
# (12, 16) presumably the pooled GNN embeddings (12 clusters x 16 channels, matching
# MinCutPool(12) -> GCNConv(16) in GNNModelPooled) -- confirm against the data files.
input_layer_1 = Input(shape=(101, 4), name='Input_Layer_1')
input_layer_2 = Input(shape=(12, 16), name='Input_Layer_2')

# Second-input branch: a bidirectional LSTM whose two directions are summed
# (merge_mode="sum", so 64 features per step), then an LSTM that returns only its
# final state, i.e. one 16-feature vector per sample.
input_gnnbilstm_1 = Bidirectional(LSTM(64, return_sequences=True), merge_mode="sum")(input_layer_2)
input_gnnlstm_1 = LSTM(16, dropout=0.1)(input_gnnbilstm_1)


# Every convolutional block below applies, in order:
# Conv1D -> PReLU -> SpatialDropout1D -> MaxPooling1D -> BatchNormalization.
# Convolutional Block 0
convolution_layer_1 = Conv1D(filters = 256,kernel_size=8,padding="same",name='Conv_0')(input_layer_1)
convolution_layer_1 = PReLU(name='PReLU_0')(convolution_layer_1)
convolution_layer_1 = SpatialDropout1D(0.01, name='SpatialDropout_0')(convolution_layer_1)
# pool_size=1 leaves the sequence length unchanged (still 101).
convolution_layer_1 = MaxPooling1D(pool_size=1, name='MaxPooling_0')(convolution_layer_1)
convolution_layer_1 = BatchNormalization(name='BatchNormalization_0')(convolution_layer_1)

# Convolutional Block 1
convolution_layer_2 = Conv1D(filters=128, kernel_size=4, padding="same", name='Conv_1')(convolution_layer_1)
convolution_layer_2 = PReLU(name='PReLU_1')(convolution_layer_2)
convolution_layer_2 = SpatialDropout1D(0.01, name='SpatialDropout_1')(convolution_layer_2)
# pool_size=1 again: no down-sampling in this block.
convolution_layer_2 = MaxPooling1D(pool_size=1, name='MaxPooling_1')(convolution_layer_2)
convolution_layer_2 = BatchNormalization(name='BatchNormalization_1')(convolution_layer_2)

# Convolutional Block 2
convolution_layer_3 = Conv1D(filters=64, kernel_size=2, padding="same", name='Conv_2')(convolution_layer_2)
convolution_layer_3 = PReLU(name='PReLU_2')(convolution_layer_3)
convolution_layer_3 = SpatialDropout1D(0.01, name='SpatialDropout_2')(convolution_layer_3)
convolution_layer_3 = MaxPooling1D(pool_size=2, name='MaxPooling_2')(convolution_layer_3)
convolution_layer_3 = BatchNormalization(name='BatchNormalization_2')(convolution_layer_3)
# Sequence length after this block: 50 (101 halved by the pool_size=2 pooling).

# Convolutional Block 3
convolution_layer_4 = Conv1D(filters=64, kernel_size=2, padding="same", name='Conv_3')(convolution_layer_3)
convolution_layer_4 = PReLU(name='PReLU_3')(convolution_layer_4)
convolution_layer_4 = SpatialDropout1D(0.01, name='SpatialDropout_3')(convolution_layer_4)
convolution_layer_4 = MaxPooling1D(pool_size=2, name='MaxPooling_3')(convolution_layer_4)
convolution_layer_4 = BatchNormalization(name='BatchNormalization_3')(convolution_layer_4)
# Sequence length after this block: 25.

# The attention query is a further convolution of the block-3 features; keys and
# values are the block-3 features themselves.
Query = Conv1D(filters=64, padding="same", kernel_size=8, name=f'Query')(convolution_layer_4)

# Multi-Head Attention
heads = 8
# attention_scores is requested but not used anywhere else in this cell.
self_attention_layer, attention_scores = MultiHeadAttention(num_heads=heads,
                                                            key_dim=32,
                                                            value_dim=32,
                                                            name='MultiHeadAttention')(
                                                                query=Query,
                                                                key=convolution_layer_4,
                                                                value=convolution_layer_4,
                                                                return_attention_scores=True)

# Element-wise product of the attention output with the block-3 features.
self_attention_layer = Multiply(name='SelfAttention_Multiplication')([self_attention_layer, convolution_layer_4])

# Recurrent Block
# Same pattern as the second-input branch: summed bidirectional LSTM, then an LSTM
# that keeps only its final state (64 features).
bilstm_layer = Bidirectional(LSTM(64, return_sequences=True), merge_mode="sum", name='Bidirectional_LSTM')(self_attention_layer)
lstm_layer = LSTM(64, dropout=0.1, name='LSTM')(bilstm_layer)

# Join the two branches: 16 + 64 = 80 features per sample.
concatenated_features = Concatenate()([input_gnnlstm_1, lstm_layer])

# Output Block
# The concatenated tensor is already (batch, features), so Flatten does not change
# its shape; it is kept because the layer is part of the model's structure.
output_layer = Dense(2, activation="softmax", name='finalOutput')(Flatten(name='flattenOutput')(concatenated_features))

# Model definition
GCBLANE = Model(inputs=[input_layer_1,input_layer_2], outputs=output_layer)

# NOTE(review): this compiles with 'binary_crossentropy', while the per-dataset
# training loop later in this file recompiles the loaded model with
# 'categorical_crossentropy' -- confirm which setting is intended here.
GCBLANE.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy',keras.metrics.AUC()])

GCBLANE.summary()

In [None]:
import keras
from keras import backend as K
@keras.saving.register_keras_serializable()
def check_units(y_true, y_pred):
    """Reduce multi-column targets/predictions to column 1 only.

    When ``y_pred`` has a single column, both tensors are returned untouched.
    Otherwise column 1 of both ``y_true`` and ``y_pred`` is kept (as a
    one-column slice), so the metrics below score that column alone.

    Returns:
        The ``(y_true, y_pred)`` pair, possibly sliced.
    """
    if y_pred.shape[1] == 1:
        return y_true, y_pred
    # Slice (rather than index) so the result keeps its second axis.
    second_column = slice(1, 2)
    return y_true[:, second_column], y_pred[:, second_column]

@keras.saving.register_keras_serializable()
def precision(y_true, y_pred):
    """Batch-wise precision: true positives / predicted positives.

    Inputs are first passed through ``check_units``. Values are clipped to
    [0, 1] and rounded before counting; ``K.epsilon()`` in the denominator
    avoids a division by zero when nothing is predicted positive.
    """
    y_true, y_pred = check_units(y_true, y_pred)
    hit_count = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    flagged_count = K.sum(K.round(K.clip(y_pred, 0, 1)))
    return hit_count / (flagged_count + K.epsilon())

@keras.saving.register_keras_serializable()
def recall(y_true, y_pred):
    """Batch-wise recall: true positives / actual positives.

    Inputs are first passed through ``check_units``. Values are clipped to
    [0, 1] and rounded before counting; ``K.epsilon()`` in the denominator
    avoids a division by zero when the batch holds no positives.
    """
    y_true, y_pred = check_units(y_true, y_pred)
    hit_count = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    actual_count = K.sum(K.round(K.clip(y_true, 0, 1)))
    return hit_count / (actual_count + K.epsilon())

@keras.saving.register_keras_serializable()
def f1(y_true, y_pred):
    """Batch-wise F1 score: harmonic mean of ``precision`` and ``recall``.

    ``K.epsilon()`` is added to the denominator so the score is defined when
    both precision and recall are zero.
    """
    p = precision(y_true, y_pred)
    r = recall(y_true, y_pred)
    harmonic_half = (p * r) / (p + r + K.epsilon())
    return 2 * harmonic_half

In [None]:
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
import numpy as np
import keras

class TrainDataGenerator(tf.keras.utils.Sequence):
    """Batch provider for the training split.

    Loads three arrays fully into memory and serves them in aligned slices:

    * ``train_sequences`` from ``seq_features_path``
    * ``arr_0`` from ``gnn_features_path``
    * ``train_labels`` from ``labels_path``, one-hot encoded to 2 classes

    Args:
        seq_features_path: ``.npz`` archive holding ``train_sequences``.
        gnn_features_path: ``.npz`` archive holding ``arr_0``.
        labels_path: ``.npz`` archive holding ``train_labels``.
        batch_size: Number of samples per batch (the last batch may be smaller).
        **kwargs: Forwarded to the Keras base class.
    """

    def __init__(self, seq_features_path, gnn_features_path, labels_path, batch_size, **kwargs):
        # Initialise the Keras base class; newer Keras versions warn when a
        # Sequence/PyDataset subclass skips this call.
        super().__init__(**kwargs)
        # Context managers close each archive once its array has been read.
        with np.load(seq_features_path) as seq_archive:
            self.seq_features = seq_archive['train_sequences']
        with np.load(gnn_features_path) as gnn_archive:
            self.gnn_features = gnn_archive['arr_0']
        with np.load(labels_path) as label_archive:
            self.labels = to_categorical(label_archive['train_labels'], num_classes=2)
        self.batch_size = batch_size
        self.indexes = np.arange(len(self.seq_features))

    def __len__(self):
        """Return the number of batches per epoch (the last one may be partial)."""
        return int(np.ceil(len(self.seq_features) / self.batch_size))

    def __getitem__(self, index):
        """Return ``((seq_batch, gnn_batch), label_batch)`` for batch ``index``.

        Raises:
            IndexError: If ``index`` is outside ``range(len(self))``. Without
                this, plain sequence-protocol iteration would never stop,
                because out-of-range slices just come back empty.
        """
        if not 0 <= index < len(self):
            raise IndexError(f"batch index {index} out of range")
        start = index * self.batch_size
        end = (index + 1) * self.batch_size
        batch_seq_features = self.seq_features[start:end]
        batch_gnn_features = self.gnn_features[start:end]
        batch_labels = self.labels[start:end]
        return (batch_seq_features, batch_gnn_features), batch_labels

    def on_epoch_end(self):
        """Reorder all three arrays with one shared random permutation."""
        np.random.shuffle(self.indexes)
        # The same index array is applied to every array so rows stay aligned.
        self.seq_features = self.seq_features[self.indexes]
        self.gnn_features = self.gnn_features[self.indexes]
        self.labels = self.labels[self.indexes]

class testDataGenerator(tf.keras.utils.Sequence):
    """Batch provider for the test split.

    Loads three arrays fully into memory and serves them in aligned slices:

    * ``test_sequences`` from ``seq_features_path``
    * ``arr_0`` from ``gnn_features_path``
    * ``test_labels`` from ``labels_path``, one-hot encoded to 2 classes

    Args:
        seq_features_path: ``.npz`` archive holding ``test_sequences``.
        gnn_features_path: ``.npz`` archive holding ``arr_0``.
        labels_path: ``.npz`` archive holding ``test_labels``.
        batch_size: Number of samples per batch (the last batch may be smaller).
        **kwargs: Forwarded to the Keras base class.
    """

    def __init__(self, seq_features_path, gnn_features_path, labels_path, batch_size, **kwargs):
        # Initialise the Keras base class; newer Keras versions warn when a
        # Sequence/PyDataset subclass skips this call.
        super().__init__(**kwargs)
        # Context managers close each archive once its array has been read.
        with np.load(seq_features_path) as seq_archive:
            self.seq_features = seq_archive['test_sequences']
        with np.load(gnn_features_path) as gnn_archive:
            self.gnn_features = gnn_archive['arr_0']
        with np.load(labels_path) as label_archive:
            self.labels = to_categorical(label_archive['test_labels'], num_classes=2)
        self.batch_size = batch_size
        self.indexes = np.arange(len(self.seq_features))

    def __len__(self):
        """Return the number of batches per pass (the last one may be partial)."""
        return int(np.ceil(len(self.seq_features) / self.batch_size))

    def __getitem__(self, index):
        """Return ``((seq_batch, gnn_batch), label_batch)`` for batch ``index``.

        Raises:
            IndexError: If ``index`` is outside ``range(len(self))``. Without
                this, plain sequence-protocol iteration would never stop,
                because out-of-range slices just come back empty.
        """
        if not 0 <= index < len(self):
            raise IndexError(f"batch index {index} out of range")
        start = index * self.batch_size
        end = (index + 1) * self.batch_size
        batch_seq_features = self.seq_features[start:end]
        batch_gnn_features = self.gnn_features[start:end]
        batch_labels = self.labels[start:end]
        return (batch_seq_features, batch_gnn_features), batch_labels

    def on_epoch_end(self):
        """Reorder all three arrays with one shared random permutation."""
        np.random.shuffle(self.indexes)
        # The same index array is applied to every array so rows stay aligned.
        self.seq_features = self.seq_features[self.indexes]
        self.gnn_features = self.gnn_features[self.indexes]
        self.labels = self.labels[self.indexes]
        
class validationDataGenerator(tf.keras.utils.Sequence):
    """Batch provider for the validation split.

    Loads three arrays fully into memory and serves them in aligned slices:

    * ``validation_sequences`` from ``seq_features_path``
    * ``arr_0`` from ``gnn_features_path``
    * ``validation_labels`` from ``labels_path``, one-hot encoded to 2 classes

    Args:
        seq_features_path: ``.npz`` archive holding ``validation_sequences``.
        gnn_features_path: ``.npz`` archive holding ``arr_0``.
        labels_path: ``.npz`` archive holding ``validation_labels``.
        batch_size: Number of samples per batch (the last batch may be smaller).
        **kwargs: Forwarded to the Keras base class.
    """

    def __init__(self, seq_features_path, gnn_features_path, labels_path, batch_size, **kwargs):
        # Initialise the Keras base class; newer Keras versions warn when a
        # Sequence/PyDataset subclass skips this call.
        super().__init__(**kwargs)
        # Context managers close each archive once its array has been read.
        with np.load(seq_features_path) as seq_archive:
            self.seq_features = seq_archive['validation_sequences']
        with np.load(gnn_features_path) as gnn_archive:
            self.gnn_features = gnn_archive['arr_0']
        with np.load(labels_path) as label_archive:
            self.labels = to_categorical(label_archive['validation_labels'], num_classes=2)
        self.batch_size = batch_size
        self.indexes = np.arange(len(self.seq_features))

    def __len__(self):
        """Return the number of batches per pass (the last one may be partial)."""
        return int(np.ceil(len(self.seq_features) / self.batch_size))

    def __getitem__(self, index):
        """Return ``((seq_batch, gnn_batch), label_batch)`` for batch ``index``.

        Raises:
            IndexError: If ``index`` is outside ``range(len(self))``. Without
                this, plain sequence-protocol iteration would never stop,
                because out-of-range slices just come back empty.
        """
        if not 0 <= index < len(self):
            raise IndexError(f"batch index {index} out of range")
        start = index * self.batch_size
        end = (index + 1) * self.batch_size
        batch_seq_features = self.seq_features[start:end]
        batch_gnn_features = self.gnn_features[start:end]
        batch_labels = self.labels[start:end]
        return (batch_seq_features, batch_gnn_features), batch_labels

    def on_epoch_end(self):
        """Reorder all three arrays with one shared random permutation."""
        np.random.shuffle(self.indexes)
        # The same index array is applied to every array so rows stay aligned.
        self.seq_features = self.seq_features[self.indexes]
        self.gnn_features = self.gnn_features[self.indexes]
        self.labels = self.labels[self.indexes]

In [None]:
# Element structure handed to tf.data.Dataset.from_generator for the data
# generators: ((first_input, second_input), labels). The leading None is the
# variable batch dimension; the remaining dimensions mirror the model's two Input
# layers, (101, 4) and (12, 16), and the 2-column one-hot labels.
output_signature = (
    (
        tf.TensorSpec(shape=(None, 101, 4), dtype=tf.float32),
        tf.TensorSpec(shape=(None, 12, 16), dtype=tf.float32)
    ),
    tf.TensorSpec(shape=(None, 2), dtype=tf.float32)
)

In [None]:
import os
import tensorflow as tf
import keras
from keras.models import load_model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from keras.optimizers import Adam

# Fine-tune and evaluate GCBLANE separately on every dataset found on disk.
# For each dataset the saved "GCBLANE.keras" model is reloaded, trained with
# checkpointing and early stopping on ROC-AUC, and the best checkpoint is then
# evaluated; the evaluation results are collected in `metrics`, keyed by the
# dataset's position in the sorted file lists.
train_dir = "/content/content/final_data/train/"
test_dir = "/content/content/final_data/test/"


def _list_npz(directory, prefix):
    """Return the sorted full paths of the ``<prefix>*.npz`` files in *directory*."""
    return sorted(
        os.path.join(directory, file_name)
        for file_name in os.listdir(directory)
        if file_name.startswith(prefix) and file_name.endswith(".npz")
    )


gnn_train_dirs = _list_npz(train_dir, "gnn")
seq_train_dirs = _list_npz(train_dir, "seq")

gnn_test_dirs = _list_npz(test_dir, "gnn")
seq_test_dirs = _list_npz(test_dir, "seq")

# Files are paired purely by their position in the sorted lists, so the four
# lists must be the same length; a mismatch would silently pair the wrong files.
if len({len(gnn_train_dirs), len(seq_train_dirs), len(gnn_test_dirs), len(seq_test_dirs)}) != 1:
    raise ValueError(
        "Mismatched dataset file counts: "
        f"{len(gnn_train_dirs)} gnn/train, {len(seq_train_dirs)} seq/train, "
        f"{len(gnn_test_dirs)} gnn/test, {len(seq_test_dirs)} seq/test"
    )

metrics = {}

batch_size = 128

# Iterate over however many datasets are present instead of a fixed count.
for i, (seq_train_path, gnn_train_path, seq_test_path, gnn_test_path) in enumerate(
        zip(seq_train_dirs, gnn_train_dirs, seq_test_dirs, gnn_test_dirs)):

    # The "seq" archive is passed twice: once for the sequences and once as the
    # labels file.
    test_generator = testDataGenerator(seq_test_path, gnn_test_path, seq_test_path, batch_size)

    # The generator is bound as a default argument so the callable keeps
    # referring to this iteration's generator.
    test_dataset = tf.data.Dataset.from_generator(
        lambda gen=test_generator: gen,
        output_signature=output_signature
    ).repeat()

    train_generator = TrainDataGenerator(seq_train_path, gnn_train_path, seq_train_path, batch_size)

    train_dataset = tf.data.Dataset.from_generator(
        lambda gen=train_generator: gen,
        output_signature=output_signature
    ).repeat()

    # Start every dataset from the same saved model. "GCBLANE.keras" must
    # already exist on disk; nothing visible in this file writes it.
    GCBLANE = load_model("GCBLANE.keras")

    GCBLANE.compile(optimizer=Adam(0.001),
                    loss='categorical_crossentropy',
                    metrics=[
                        keras.metrics.CategoricalAccuracy(name='accuracy'),
                        precision,
                        recall,
                        keras.metrics.AUC(name='PRAUC', curve='PR'),
                        keras.metrics.AUC(name='ROCAUC', curve='ROC'),
                        f1
                    ])

    # Callbacks are created inside the loop so each dataset gets fresh ones.
    checkpoint_callback = ModelCheckpoint(
        filepath='best_model.keras',
        monitor='val_ROCAUC',
        save_best_only=True,
        mode='max',
        verbose=0
    )

    early_stopping_callback = EarlyStopping(
        monitor='val_ROCAUC',
        patience=3,
        mode='max',
        verbose=0
    )

    # Fit the model with the callbacks.
    # NOTE(review): the test split is used as validation data, so checkpoint
    # selection and early stopping both see the data that is evaluated below.
    hist = GCBLANE.fit(train_dataset,
                       steps_per_epoch=len(train_generator),
                       epochs=50,
                       validation_data=test_dataset,
                       validation_steps=len(test_generator),
                       callbacks=[checkpoint_callback, early_stopping_callback],
                       verbose=1
                       )

    # Evaluate the best checkpoint rather than the last-epoch weights.
    best_model = load_model('best_model.keras')
    metrics[i] = best_model.evaluate(test_dataset, steps=len(test_generator), verbose=1)
    print(metrics[i])

# GNN

In [None]:
import numpy as np

# Open the graph archives for the three splits. Each one is later handed to
# DNADataset, which reads its 'node_features', 'adj' and 'labels' entries.
train_graphs = np.load('/content/train_graph_data.npz')
validation_graphs = np.load('/content/validation_graph_data.npz')
test_graphs = np.load('/content/test_graph_data.npz')

In [None]:
from spektral.data import Graph, Dataset
class DNADataset(Dataset):
    """Spektral dataset built from one archive of per-graph arrays.

    ``graph_list`` must be indexable by the keys ``'node_features'``, ``'adj'``
    and ``'labels'``; entry ``i`` of each describes graph ``i``.
    """

    def __init__(self, graph_list):
        # Store the arrays before calling the base constructor: read() needs
        # them, and spektral's Dataset constructor is what invokes read().
        self.node_features = graph_list['node_features']
        self.adj = graph_list['adj']
        self.labels = graph_list['labels']
        super().__init__()

    def read(self):
        """Return one ``Graph`` per sample, with features and adjacency cast to float16."""
        return [
            Graph(
                x=features.astype(np.float16),
                a=self.adj[position].astype(np.float16),
                y=self.labels[position],
            )
            for position, features in enumerate(self.node_features)
        ]

In [None]:
# Wrap each loaded graph archive in a DNADataset (one per split).
# NOTE: train_dataset and test_dataset rebind names that the GCBLANE training
# loop earlier in this file uses for its tf.data pipelines.
train_dataset = DNADataset(train_graphs)
validation_dataset = DNADataset(validation_graphs)
test_dataset = DNADataset(test_graphs)

In [None]:
import tensorflow as tf
import keras
from tensorflow.keras.layers import Dense, Dropout, Flatten
from tensorflow.keras.models import Model, Sequential
from spektral.data import BatchLoader, PackedBatchLoader
from spektral.layers import GCNConv, GlobalMaxPool, MinCutPool
from tensorflow.keras.utils import register_keras_serializable

@register_keras_serializable()
class GNNModel(Model):
    """Graph-level binary classifier.

    Pipeline: GCNConv(128) -> MinCutPool(40) -> GCNConv(64) -> MinCutPool(12)
    -> GCNConv(16) -> GlobalMaxPool -> Dense(32, relu) -> Dense(1, sigmoid).

    Args:
        **kwargs: Standard Keras model arguments (e.g. ``name``), forwarded to
            ``Model``. Accepting them lets ``from_config`` rebuild the model
            from a config that contains those base keys.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # The creation order of these layers matters: later code in this file
        # copies weights by position through `.layers[0]` ... `.layers[4]`.
        self.conv1 = GCNConv(128, activation='relu')
        self.mincutpool1 = MinCutPool(40)
        self.conv2 = GCNConv(64, activation='relu')
        self.mincutpool2 = MinCutPool(12)
        self.conv3 = GCNConv(16, activation='relu')

        self.maxpool = GlobalMaxPool()
        self.dense1 = Dense(32, activation='relu')
        self.dense2 = Dense(1, activation='sigmoid')

    def call(self, inputs):
        """Map ``(node_features, adjacency)`` to one sigmoid output per graph."""
        x, a = inputs
        x = self.conv1([x, a])

        # Each MinCutPool returns a coarsened feature matrix and adjacency.
        x, a = self.mincutpool1([x, a])
        x = self.conv2([x, a])

        x, a = self.mincutpool2([x, a])
        x = self.conv3([x, a])

        # Collapse the node axis into a single vector per graph.
        x = self.maxpool(x)

        x = self.dense1(x)
        x = self.dense2(x)

        return x

    def get_config(self):
        """Return a copy of the base-class config (the model adds no own arguments)."""
        config = super().get_config().copy()
        return config

In [None]:
# Instantiate the graph classifier.
GNN_Model = GNNModel()

# Checkpoint callback: when supplied to fit(), it writes 'gnn_model.keras' each
# time the monitored validation accuracy reaches a new maximum.
model_checkpoint = keras.callbacks.ModelCheckpoint(filepath='gnn_model.keras',
                                                      monitor='val_accuracy',
                                                      mode='max',
                                                      save_best_only=True)


# Binary cross-entropy matches the model's single sigmoid output unit.
GNN_Model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss=tf.keras.losses.BinaryCrossentropy(),
    metrics=['accuracy']
)


In [None]:
# Batch loaders for the three graph splits.
train_loader = BatchLoader(train_dataset, batch_size=128)
validation_loader = BatchLoader(validation_dataset, batch_size=128)
test_loader = BatchLoader(test_dataset, batch_size=128)

# Train the GNN. The `model_checkpoint` callback defined alongside the model is
# passed here; a callback that is only constructed and never handed to fit()
# does nothing, so the best-validation-accuracy model would never be saved.
GNN_Model.fit(train_loader.load(),
              steps_per_epoch=train_loader.steps_per_epoch,
              epochs=20,
              validation_data=validation_loader.load(),
              validation_steps=validation_loader.steps_per_epoch,
              callbacks=[model_checkpoint]
              )

# Report loss and accuracy on the held-out test split.
GNN_Model.evaluate(test_loader.load(),
                   steps=test_loader.steps_per_epoch)

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Model
from spektral.layers import GCNConv, GlobalMaxPool, MinCutPool
from tensorflow.keras.utils import register_keras_serializable

@register_keras_serializable()
class GNNModelPooled(Model):
    """GNN feature extractor: the convolution/pooling stack without a classifier head.

    Pipeline: GCNConv(128) -> MinCutPool(40) -> GCNConv(64) -> MinCutPool(12)
    -> GCNConv(16). The output of the last convolution is returned directly.

    Args:
        model: Accepted for backward compatibility and not used; the layers
            are always created fresh. It is optional so the model can be
            rebuilt by ``from_config``, whose config has no ``model`` entry.
        **kwargs: Standard Keras model arguments (e.g. ``name``), forwarded to
            ``Model`` so a config containing those base keys can be restored.
    """

    def __init__(self, model=None, **kwargs):
        super().__init__(**kwargs)

        # The creation order of these layers matters: later code in this file
        # assigns weights by position through `.layers[0]` ... `.layers[4]`.
        self.conv1 = GCNConv(128, activation='relu')
        self.mincutpool1 = MinCutPool(40)
        self.conv2 = GCNConv(64, activation='relu')
        self.mincutpool2 = MinCutPool(12)
        self.conv3 = GCNConv(16, activation='relu')

    def call(self, inputs):
        """Map ``(node_features, adjacency)`` to the features produced by conv3."""
        x, a = inputs

        x = self.conv1([x, a])
        # Each MinCutPool returns a coarsened feature matrix and adjacency.
        x, a = self.mincutpool1([x, a])
        x = self.conv2([x, a])
        x, a = self.mincutpool2([x, a])
        x = self.conv3([x, a])

        return x

    def get_config(self):
        """Return a copy of the base-class config (no extra arguments are stored)."""
        config = super().get_config().copy()
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the model from ``config`` by passing its entries to the constructor."""
        return cls(**config)

In [None]:
# Build the head-less feature extractor and copy the trained GNN's first five
# layers (conv1, mincutpool1, conv2, mincutpool2, conv3) into it.
GCBLANEGNN = GNNModelPooled(GNNModel())

# NOTE(review): the checkpoint callback configured for GNN training targets
# 'gnn_model.keras', whereas this loads 'gnn.keras' -- confirm which file holds
# the trained weights.
GNN_Model = load_model("gnn.keras")

GCBLANEGNN.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss=tf.keras.losses.BinaryCrossentropy(),
    metrics=['accuracy']
)

# Presumably this one-epoch fit exists only to make Keras create the layer
# weights so that set_weights() below has something to assign to. A failure here
# is tolerated, as before, but it is now reported instead of silently discarded,
# and KeyboardInterrupt/SystemExit are no longer swallowed.
try:
    GCBLANEGNN.fit(validation_loader.load(),
                   steps_per_epoch=validation_loader.steps_per_epoch,
                   epochs=1)
except Exception as build_error:
    print(f"Ignoring error from the weight-building fit call: {build_error!r}")

# Transfer weights layer by layer; both models create these five layers in the
# same order, so matching by position pairs up the corresponding layers.
for layer_position in range(5):
    GCBLANEGNN.layers[layer_position].set_weights(
        GNN_Model.layers[layer_position].get_weights()
    )