In [1]:
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm
import csv
import json
import ast
from numpy import random
#import os
#from utils.config import CFG
from src.utils.dataset_utils import *

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional, BatchNormalization
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix
from focal_loss import SparseCategoricalFocalLoss
import matplotlib.pyplot as plt
import seaborn as sns

np.random.seed(42)
tf.random.set_seed(42)

In [17]:
def create_embeddings_sequences(df, df_3grams):
    # dict with the embedding of each 3gram
    embedding_dict = {row['words']: row[1:].tolist() for _, row in df_3grams.iterrows()}
    # add the column with the list of the 3grams embeddings for each sequence
    df['sequence_embeddings'] = None
    for i, row in tqdm(df.iterrows(), total=len(df)):
        sequence_embedding = []
        protein_seq = row['sequence']

        # Create the list of 3grams embeddings of the protein sequence
        for j in range(len(protein_seq) - 2):
            trigram = protein_seq[j:j + 3]

            if trigram in embedding_dict:
                embedding = embedding_dict[trigram]
            else:
                embedding = embedding_dict['<unk>']

            sequence_embedding.append(embedding)

        df.at[i, 'sequence_embeddings'] = np.array(sequence_embedding)

    return df

In [50]:
def load_data(file_path_seq, file_path_meta, file_path_3grams):
    """
    Load protein data from CSV and prepare it for the LSTM model.

    Parameters:
    file_path (str): Path to the CSV file containing protein data

    Returns:
    X (list): List of 3-gram embedding sequences for each protein
    y (array): Array of encoded protein family labels
    label_encoder: The LabelEncoder used for converting family names to integers
    """
    df_final = rawdata_to_df(file_path_seq, file_path_meta)
    df_3grams = pd.read_csv(file_path_3grams, sep='\t')

    n = 5
    df_final = preprocess_data(df_final, n)
    top_families = list(df_final['label'].value_counts()[:n+1].index)
    top_families.remove('other')
    print(top_families)
    mask = df_final['label'].isin(top_families)
    print(df_final.head())
    df_topfamilies = df_final[mask]
    df_others = df_final[~mask]
    #df_others.reset_index(drop=True, inplace=True)
    print(df_topfamilies.shape, df_others.shape)
    df_others = df_others.sample(n=len(df_topfamilies), random_state=CFG['project']['seed'])

    df = pd.concat([df_topfamilies, df_others], axis=0)
    df.reset_index(drop=True, inplace=True)
    df = create_embeddings_sequences(df, df_3grams)

    # Process sequences and embeddings
    X = []  # Will contain list of embedding matrices (one per protein)
    y = []  # Will contain family labels

    for _, row in df.iterrows():
        embedding_matrix = row['sequence_embeddings']
        X.append(embedding_matrix)
        y.append(row['label'])

    # Encode the family labels
    label_encoder = LabelEncoder()
    y_encoded = label_encoder.fit_transform(y)

    print(f"Loaded {len(X)} protein sequences")
    print(f"Number of unique families: {len(label_encoder.classes_)}")
    print(f"Family distribution: {np.bincount(y_encoded)}")

    return X, y_encoded, label_encoder

In [51]:
# Function to pad sequences to the same length
def pad_embedding_sequences(X, max_length=None):
    """
    Pad sequences of embeddings to the same length.

    Parameters:
    X (list): List of embedding matrices for each protein
    max_length (int, optional): Maximum sequence length. If None, calculated from data.

    Returns:
    padded_X (array): Padded sequences
    """
    if max_length is None:
        # Calculate the sequence length at 95th percentile to avoid excessive padding
        lengths = [len(seq) for seq in X]
        max_length = int(np.percentile(lengths, 95))
        print(f"Using max_length of {max_length} (95th percentile of sequence lengths)")

    # Create a padding mask - sequences shorter than max_length will be padded with zeros
    # Sequences longer than max_length will be truncated
    padded_X = []
    for seq in tqdm(X, total=len(X)):
        # Trim
        if len(seq) > max_length:
            padded_X.append(seq[:max_length])
        else:
            # Pad with zeros
            padding = np.zeros((max_length - len(seq), seq.shape[1]))
            padded_X.append(np.vstack([seq, padding]))
    
    return np.array(padded_X)

In [52]:
# Function to build the LSTM model
def build_lstm_model(input_shape, num_classes):
    """
    Build and compile an LSTM model for protein classification.

    Parameters:
    input_shape (tuple): Shape of input data (sequence_length, embedding_dim)
    num_classes (int): Number of protein families to classify

    Returns:
    model: Compiled Keras model
    """
    model = Sequential([
        # Bidirectional LSTM layer to capture context from both directions
        Bidirectional(LSTM(64, return_sequences=True), input_shape=input_shape),
        BatchNormalization(),
        Dropout(0.3),

        # Second LSTM layer
        Bidirectional(LSTM(32)),
        BatchNormalization(),
        Dropout(0.3),
        # Dense layer
        #Dense(64, activation='relu'),
        #BatchNormalization(),
        #Dropout(0,3),
        # Output layer
        Dense(num_classes, activation='softmax')
    ])

    # Compile the model
    model.compile(
        optimizer= tf.keras.optimizers.Adam(learning_rate=0.0005),
        #loss='sparse_categorical_crossentropy',
        loss=SparseCategoricalFocalLoss(gamma=2.0),
        metrics=['accuracy']
    )

    return model



In [53]:
# Function to train the model with early stopping
def train_model(model, X_train, y_train, X_val, y_val, batch_size=32, epochs=50):
    """
    Train the LSTM model with early stopping.

    Parameters:
    model: Compiled Keras model
    X_train, y_train: Training data and labels
    X_val, y_val: Validation data and labels
    batch_size (int): Batch size for training
    epochs (int): Maximum number of epochs

    Returns:
    history: Training history
    """
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=10,
        restore_best_weights=True
    )
    
    # Reduce learning rate when validation loss plateaus
    reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=5,
        min_lr=1e-7,
        verbose=1
    )
    
    # Model checkpoint to save best model
    model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
        'best_model.keras',
        monitor='val_loss',
        save_best_only=True,
        verbose=1
    )

    history = model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        batch_size=batch_size,
        epochs=epochs,
        callbacks=[early_stopping, reduce_lr, model_checkpoint],
        verbose=1
    )

    return history


In [54]:
# Function to evaluate the model
def evaluate_model(model, X_test, y_test, label_encoder):
    """
    Evaluate the trained model on test data.

    Parameters:
    model: Trained Keras model
    X_test, y_test: Test data and labels
    label_encoder: LabelEncoder used for converting labels

    Returns:
    None (prints evaluation metrics and plots confusion matrix)
    """
    # Predict on test data
    y_pred_proba = model.predict(X_test)
    y_pred = np.argmax(y_pred_proba, axis=1)

    # Print classification report
    print("\nClassification Report:")
    print(classification_report(
        y_test, y_pred,
        target_names=label_encoder.classes_
    ))

    # Plot confusion matrix
    plt.figure(figsize=(10, 8))
    cm = confusion_matrix(y_test, y_pred)
    sns.heatmap(
        cm,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=label_encoder.classes_,
        yticklabels=label_encoder.classes_
    )
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.tight_layout()
    plt.savefig('confusion_matrix.png')
    plt.close()

    # Plot accuracy and loss curves
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(history.history['accuracy'])
    plt.plot(history.history['val_accuracy'])
    plt.title('Model Accuracy')
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc='lower right')

    plt.subplot(1, 2, 2)
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('Model Loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc='upper right')

    plt.tight_layout()
    plt.savefig('training_history.png')
    plt.close()

In [55]:
# File path to your dataset
#file_path = "..\\data\\list_protein_sequences_embeddings.csv"
file_path_seq = "..\\data\\family_classification_sequences.csv"
file_path_meta = "..\\data\\family_classification_metadata.xlsx"
file_path_3grams = "..\\data\\protVec_100d_3grams.csv"

# 1. Load and preprocess data
X, y, label_encoder = load_data(file_path_seq, file_path_meta, file_path_3grams)

Reading raw dataset files...


  warn(msg)


Preprocessing data...
['MMR_HSR1', 'ATP-synt_ab', '7tm_1', 'Helicase_C', 'tRNA-synt_1']
                                                 sequence  label
137318  AAAAAVSGAKRSLRAELKQRLRAISAEERLRCQRLLTQKVIAHRQY...  other
288090  AAAAGDSAASDLLGDNILRSEDPPMSIDLTFHMLRNMIHMAKMEGE...  other
251153                AAAAPGAAGGAQLPLGNRERKAGCKNFFWKTFSSC  other
7525    AAAATQAVPAPNQQPEVFYNQIFINNEWHDAVSKKTFPTVNPSTGE...  other
133640  AAAAVPRRGPRGGPGRSYTADAGYAPATPAAAGAAAGKATTEEQKL...  other
(7797, 2) (230207, 2)
Loaded 15594 protein sequences
Number of unique families: 6
Family distribution: [1639 1845 1287 1846 7797 1180]


In [None]:
# 2. Split the data into training, validation, and test sets
X_train_val, X_test, y_train_val, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y, shuffle=True
)

X_train, X_val, y_train, y_val = train_test_split(
    X_train_val, y_train_val, test_size=0.2, random_state=42, stratify=y_train_val, shuffle=True
)

In [None]:
# 3. Determine suitable max_length for padding
sequence_lengths = [len(seq) for seq in X_train]
max_length = int(np.percentile(sequence_lengths, 95))  # 95th percentile
print(f"Max sequence length (95th percentile): {max_length}")

In [None]:
# 4. Pad sequences
X_train_padded = pad_embedding_sequences(X_train, max_length)
X_val_padded = pad_embedding_sequences(X_val, max_length)
X_test_padded = pad_embedding_sequences(X_test, max_length)

print(f"Padded training data shape: {X_train_padded.shape}")

In [None]:
# 5. Build the model
embedding_dim = X_train[0].shape[1]  # Get embedding dimension from the data
num_classes = len(np.unique(y))

model = build_lstm_model(
    input_shape=(max_length, embedding_dim),
    num_classes=num_classes
)

model.summary()

In [None]:
# 6. Train the model
history = train_model(
    model, X_train_padded, y_train, X_val_padded, y_val,
    batch_size=32,
    epochs=50
)

In [None]:
# 7. Evaluate the model
model.load_weights("best_model.h5")
evaluate_model(model, X_test_padded, y_test, label_encoder)

# 8. Save the model
model.save('protein_family_classifier_other.h5')
print("Model saved as 'protein_family_classifier_other.h5'")

In [22]:
#dropout
#dense
#l2 reg

array([[-0.104897,  0.014198, -0.053189, ...,  0.065978, -0.132549,
         0.165701],
       [-0.101278,  0.080609, -0.126381, ...,  0.008599,  0.066099,
         0.112817],
       [-0.132686,  0.073024, -0.226585, ..., -0.019128,  0.122064,
         0.096697],
       ...,
       [ 0.      ,  0.      ,  0.      , ...,  0.      ,  0.      ,
         0.      ],
       [ 0.      ,  0.      ,  0.      , ...,  0.      ,  0.      ,
         0.      ],
       [ 0.      ,  0.      ,  0.      , ...,  0.      ,  0.      ,
         0.      ]])