<a href="https://colab.research.google.com/github/quang-m-nguyen/DeepPGD/blob/main/main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [42]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [43]:
import time

import numpy as np
import pandas as pd
import tensorflow as tf
from keras.callbacks import Callback

# from keras_self_attention import SeqSelfAttention
# from keras.utils import to_categorical
from keras.layers import (
    Bidirectional,
    Conv1D,
    Dense,
    Dropout,
    Embedding,
    Flatten,
    Input,
    LayerNormalization,
    concatenate,
    LSTM
)
from keras.models import Model
from tensorflow.keras.preprocessing.text import Tokenizer
from sklearn.metrics import matthews_corrcoef, roc_auc_score
from tensorflow.keras import initializers, layers
from tensorflow.keras.preprocessing.sequence import pad_sequences

In [44]:
# global x_test, y_test, x_train, y_train
# global best_acc
# global accuracy_best_list

# # make all of these variable global
# # global x_test, y_test, x_train, y_train
# # global best_acc
# # global accuracy_best_list



In [45]:
def prepare_training_data():
    train_filename = '/content/drive/MyDrive/deepPGD/4mC/4mC_C.equisetifolia/train.tsv'
    test_filename = '/content/drive/MyDrive/deepPGD/4mC/4mC_F.vesca/test.tsv'

    x_test = np.array([])
    y_test = np.array([])
    x_train = np.array([])
    y_train = np.array([])

    test_labels = []
    three_er_list = []

    K_MER = 3


    train_data = pd.read_csv(train_filename,header = None, sep = "\t")
    test_data  = pd.read_csv(test_filename,header = None, sep = "\t")

    pro_x_train = train_data[2][1:]
    y_train =  train_data[1][:]
    pos_train_len = len(pro_x_train)

    pro_x_test =  test_data[2][1:]
    y_test = test_data[1][:]

    pro_x_data  = pd.concat([pro_x_train,pro_x_test],ignore_index= True )
    pro_y_data  = pd.concat([y_train,y_test],ignore_index= True )


    for i in range(1, len(pro_y_data)):
      if(pro_y_data[i] == "1"):
          test_labels.append([1,0])
      elif(pro_y_data[i] == "0"):
          test_labels.append([0,1])
      else:
          continue

    # K-mer Encoding for DNA Sequences
    #
    # Purpose:
    # - Transform variable-length DNA sequences into fixed-length feature representations
    # - Capture local sequence patterns that may be relevant to DNA methylation sites
    # - Create a suitable input format for machine learning models
    #
    # Functionality:
    # 1. Set k-mer size (K=3 in this case)
    # 2. For each DNA sequence:
    #    a. Convert to string and remove any extra characters
    #    b. Generate all possible k-mers (substrings of length K)
    #    c. Store k-mers for each sequence in a list
    # 3. Collect k-mer lists for all sequences in str_array
    #
    # Benefits:
    # - Captures local sequence context
    # - Provides fixed-length representation for variable-length sequences
    # - Reduces sequence complexity while retaining important features
    # - Facilitates efficient sequence comparison and analysis
    # - Improves feature extraction for machine learning models
    #
    # Example:
    # Input DNA sequence: "ATCGATCG"
    # Resulting k-mers (K=3): ["ATC", "TCG", "CGA", "GAT", "ATC", "TCG"]
    #
    # Data structure:
    # str_array = [
    #     ["ATC", "TCG", "CGA", "GAT", "ATC", "TCG"],
    # ]

    for i in pro_x_data:
        seq_str = str(i)
        seq_str = seq_str.strip('[]\'')
        t=0
        l=[]
        for index in range(len(seq_str)):
            t=seq_str[index:index+K_MER]
            if (len(t))==K_MER:
                l.append(t)
        three_er_list.append(l)



    # DNA Sequence Preprocessing
    # Purpose: Turn DNA chunks into number lists for machine learning
    # Steps:
    # 1. Assign a unique number to each DNA chunk (up to 30,000 most common chunks)
    # 2. Convert each DNA sequence to a list of these numbers tokens
    # 3. Make all lists the same length (48) by adding zeros at the end if needed

    # Example:
    # Input DNA sequences:
    #   ["ATCG", "CGTA", "ATCGATCG"]
    #
    # After assigning numbers:
    #   ATCG -> 1, CGTA -> 2, GATC -> 3
    #
    # Converted to number lists:
    #   [1, 2]
    #   [2, 1]
    #   [1, 3, 1]
    #
    # Final output (padded to length 48):
    #   [1, 2, 0, 0, ..., 0]  (46 zeros)
    #   [2, 1, 0, 0, ..., 0]  (46 zeros)
    #   [1, 3, 1, 0, ..., 0]  (45 zeros)

    tokenizer = Tokenizer(num_words = 30000)
    tokenizer.fit_on_texts(three_er_list)
    sequences = tokenizer.texts_to_sequences(three_er_list)
    sequences = pad_sequences(sequences, maxlen = 48, padding = "post")
    sequences = np.array(sequences)


    x_train,x_test = sequences[:pos_train_len],sequences[pos_train_len:]
    # print(x_train)

    y_train,y_test = test_labels[:pos_train_len],test_labels[pos_train_len:]

    return x_train, x_test, y_train, y_test





In [46]:
def create_masked_data(x_train, mask_percentage):
    """
    Create a masked version of the input data.

    Args:
    x_train (numpy.ndarray): Input data to be masked.
    mask_percentage (float): Percentage of data to be masked, between 0 and 1.

    Returns:
    numpy.ndarray: Masked version of the input data.
    """
    # Validate mask_percentage range
    if mask_percentage < 0.0 or mask_percentage > 1.0:
        raise ValueError("mask_percentage must be between 0 and 1.")

    # Create a boolean mask: True with probability mask_percentage
    mask = np.random.random(x_train.shape) < mask_percentage

    # Apply the mask: keep original values where mask is False, set to 0 where mask is True
    masked_data = np.where(mask, 0.0, x_train)

    return np.array(masked_data)

In [47]:
def make_model_architecture(masked_data):
    t = time.time()
    my_time = int(round(t * 1000)) % 2147483648
    np.random.seed(my_time)

    sequence_input = Input(shape=(48,))
    embedding_layer = Embedding(30000, 16)
    embedded_sequences = embedding_layer(sequence_input)


    stem = LayerNormalization(epsilon=1e-6)(embedded_sequences)
    stem = Dropout(0.2)(stem)

    MLP_1 = Dense(
        int(16 * 2.0),
        name="Dense_0",
        kernel_initializer=initializers.GlorotUniform(),
        bias_initializer=initializers.RandomNormal(stddev=1e-6),
    )(stem)
    Activation_1 = layers.Activation("gelu")(MLP_1)
    Activation_1 = LayerNormalization(epsilon=1e-6)(Activation_1)
    Dropout_2 = Dropout(0.2)(Activation_1)

    MLP_2 = Dense(
        int(16 * 1.0),
        name="Dense_1",
        kernel_initializer=initializers.GlorotUniform(),
        bias_initializer=initializers.RandomNormal(stddev=1e-6),
    )(Dropout_2)
    MLP_2 = tf.keras.layers.MultiHeadAttention(num_heads=4, key_dim=16)(MLP_2, stem)
    stem = stem + MLP_2
    stem = Dropout(0.2)(stem)

    lstm = Bidirectional(LSTM(16, return_sequences=True))(stem)
    lstm = layers.Activation("gelu")(lstm)
    lstm = LayerNormalization(epsilon=1e-6)(lstm)
    lstm = Dropout(0.5)(lstm)

    lstm1 = Bidirectional(LSTM(16, return_sequences=True))(lstm)
    lstm1 = layers.Activation("gelu")(lstm1)
    lstm1 = LayerNormalization(epsilon=1e-6)(lstm1)
    lstm1 = Dropout(0.2)(lstm1)
    lstm1 = lstm1 + lstm

    lstm2 = Bidirectional(LSTM(16, return_sequences=True))(lstm1)
    lstm2 = layers.Activation("gelu")(lstm2)
    lstm2 = LayerNormalization(epsilon=1e-6)(lstm2)
    lstm2 = Dropout(0.2)(lstm2)

    lstm2 = tf.keras.layers.MultiHeadAttention(num_heads=4, key_dim=16)(lstm2, lstm)
    lstm2 = lstm2 + lstm

    cnn1_24 = Conv1D(
        filters=32, kernel_size=9, strides=1, activation="gelu", padding="causal"
    )(stem)
    cnn1_16 = Conv1D(
        filters=64, kernel_size=8, strides=1, activation="gelu", padding="causal"
    )(stem)
    cnn1_10 = Conv1D(
        filters=32, kernel_size=5, strides=1, activation="gelu", padding="causal"
    )(stem)
    merge_fla_1 = concatenate([cnn1_24, cnn1_16, cnn1_10], axis=2)
    merge_fla_1 = LayerNormalization(epsilon=1e-6)(merge_fla_1)
    merge_fla_1 = Dropout(0.2)(merge_fla_1)

    cnn2_24 = Conv1D(
        filters=32, kernel_size=9, strides=1, activation="gelu", padding="causal"
    )(merge_fla_1)
    cnn2_16 = Conv1D(
        filters=64, kernel_size=8, strides=1, activation="gelu", padding="causal"
    )(merge_fla_1)
    cnn2_10 = Conv1D(
        filters=32, kernel_size=5, strides=1, activation="gelu", padding="causal"
    )(merge_fla_1)

    merge_fla_2 = concatenate([cnn2_24, cnn2_16, cnn2_10], axis=2)
    merge_fla_2 = merge_fla_2 + merge_fla_1
    merge_fla_2 = LayerNormalization(epsilon=1e-6)(merge_fla_2)
    merge_fla_2 = Dropout(0.2)(merge_fla_2)

    cnn3_24 = Conv1D(
        filters=32, kernel_size=9, strides=1, activation="gelu", padding="causal"
    )(merge_fla_2)
    cnn3_16 = Conv1D(
        filters=64, kernel_size=8, strides=1, activation="gelu", padding="causal"
    )(merge_fla_2)
    cnn3_10 = Conv1D(
        filters=32, kernel_size=5, strides=1, activation="gelu", padding="causal"
    )(merge_fla_2)

    merge_fla_3 = concatenate([cnn3_24, cnn3_16, cnn3_10], axis=2)
    merge_fla_3 = merge_fla_2 + merge_fla_3
    merge_fla_3 = LayerNormalization(epsilon=1e-6)(merge_fla_3)
    merge_fla_3 = Dropout(0.2)(merge_fla_3)

    cnn4_24 = Conv1D(
        filters=32, kernel_size=9, strides=1, activation="gelu", padding="causal"
    )(merge_fla_3)
    cnn4_16 = Conv1D(
        filters=64, kernel_size=8, strides=1, activation="gelu", padding="causal"
    )(merge_fla_3)
    cnn4_10 = Conv1D(
        filters=32, kernel_size=5, strides=1, activation="gelu", padding="causal"
    )(merge_fla_3)

    merge_fla_4 = concatenate([cnn4_24, cnn4_16, cnn4_10], axis=2)
    merge_fla_4 = tf.keras.layers.MultiHeadAttention(num_heads=4, key_dim=16)(
        merge_fla_1, merge_fla_4
    )
    merge_fla_4 = merge_fla_1 + merge_fla_4
    merge_fla_4 = LayerNormalization(epsilon=1e-6)(merge_fla_4)
    merge_fla_4 = Dropout(0.2)(merge_fla_4)
    lstm = concatenate([merge_fla_4, lstm2])

    lstm = Flatten()(lstm)
    # merge_fla = Dropout(0.2)(lstm)

    merge = Dense(200, activation="sigmoid")(lstm)
    merge = Dropout(0.5)(merge)

    preds = Dense(2, activation="softmax")(merge)
    model = Model(sequence_input, preds)

    model.summary()

    model.compile(
        loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"]
    )

    return model


In [48]:
class AUCMCCCallback(Callback):
    def __init__(self, validation_data):
        super().__init__()
        self.validation_data = validation_data
        self.last_epoch_logs = {}
        self.best_val_accuracy = 0.0

    def on_epoch_end(self, epoch, logs={}):
        self.last_epoch_logs = logs.copy()

    def on_epoch_begin(self, epoch, logs={}):
        if epoch < 1:
            return

        x_val, y_val = self.validation_data
        y_pred = self.model.predict(x_val)

        # Calculate metrics using TensorFlow operations
        auc = roc_auc_score(y_val, y_pred)

        # Convert predictions and labels to binary for MCC
        y_pred_binary = tf.argmax(y_pred, axis=1)
        y_val_binary = tf.argmax(y_val, axis=1)
        mcc = matthews_corrcoef(y_val_binary.numpy(), y_pred_binary.numpy())  # MCC needs NumPy arrays

        # Calculate accuracy with TensorFlow
        accuracy = tf.reduce_mean(tf.cast(tf.equal(y_pred_binary, y_val_binary), tf.float32))

        print(f"\nValidation AUC: {auc:.4f} - MCC: {mcc:.4f} - ACC: {accuracy.numpy():.4f}")

        # Track and print best validation accuracy
        val_accuracy = self.last_epoch_logs.get("val_accuracy", 0)
        if val_accuracy > self.best_val_accuracy:
            self.best_val_accuracy = val_accuracy
        print("epoch[", epoch, "].val_accuracy:", val_accuracy)
        print("epoch[", epoch, "].best_accuracy:", self.best_val_accuracy)

In [49]:
# Usage
x_train, x_test, y_train, y_test = prepare_training_data()
# Convert NumPy arrays to TensorFlow Tensors
x_train = tf.convert_to_tensor(x_train)
x_test = tf.convert_to_tensor(x_test)
y_train = tf.convert_to_tensor(y_train)
y_test = tf.convert_to_tensor(y_test)
# print(x_train)
mask_percentage = 0.15  # Mask 15% of the data
x_train = create_masked_data(x_train, mask_percentage)
model = make_model_architecture(x_train)

auc_mcc_callback = AUCMCCCallback(validation_data=(x_test, y_test))
model.fit(
    x_train,
    y_train,
    batch_size=30,
    epochs=100,
    validation_data=(x_test, y_test),
    callbacks=[auc_mcc_callback],
)

# accuracy_best_list[number] = best_acc

# for i in accuracy_best_list.keys():
#     print("best_acc[", i, "] = ", accuracy_best_list[i])
# avg = float(sum(accuracy_best_list.values())) / len(accuracy_best_list)
# print()
# print("best_acc[avg] = ", avg)

Epoch 1/100
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m84s[0m 4s/step - accuracy: 0.5460 - loss: 0.9688 - val_accuracy: 0.6119 - val_loss: 0.7967
[1m494/494[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m50s[0m 97ms/step

Validation AUC: 0.6587 - MCC: 0.2296 - ACC: 0.6119
epoch[ 1 ].val_accuracy: 0.6119270920753479
epoch[ 1 ].best_accuracy: 0.6119270920753479
Epoch 2/100
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m125s[0m 7s/step - accuracy: 0.7828 - loss: 0.4610 - val_accuracy: 0.6317 - val_loss: 0.8394
[1m475/494[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m1s[0m 91ms/step

KeyboardInterrupt: 

In [None]:
# # Draw architecture

# from tensorflow.keras.utils import plot_model

# plot_model(
#     model,
#     to_file='model_plot.png',         # Save the plot to a file
#     show_shapes=True,                 # Show input and output shapes
#     show_layer_names=True,            # Show layer names
#     dpi=96,                           # Set the resolution of the plot
# )
