In [1]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.model_selection import train_test_split
import time
import os
from icecream import ic

ModuleNotFoundError: No module named 'numpy'

In [9]:
def read_fasta(filename):
    sequences = []
    current_seq = []
    with open(filename, 'r') as f:
        for line in f:
            line = line.strip()
            if line.startswith('>'):
                # If there's a current sequence, save it and reset
                if current_seq:
                    sequences.append(''.join(current_seq))
                    current_seq = []
            else:
                current_seq.append(line)
        # Append the last sequence if present
        if current_seq:
            sequences.append(''.join(current_seq))
    return sequences

# Example usage:
fasta_file = '/mnt/storageG1/lwang/Projects/TB-AMP-design/Benchmarks-Sidorczuk2022/train_AMP_3268.fasta'
seq_list = read_fasta(fasta_file)
df = pd.DataFrame(seq_list, columns=['Sequences'])

print(seq_list)

max_length = max(len(seq) for seq in seq_list)
print(f"The maximum length of the sequences is: {max_length}")


['AACSDRAHGHICESFKSFCKDSGRNGVKLRANCKKTCGLC', 'AAEFPDFYDSEEQMGPHQEAEDEKDRADQRVLTEEEKKELENLAAMDLELQKIAEKFSQR', 'AAFFAQQKGLPTQQQNQVSPKAVSMIVNLEGCVRNPYKCPADVWTNGVGNTHNVDKTKILTIDEVATDLRRNIKEAENCINTYFNGEKMNQGQYDAMVSLAFNVGCGNIKTYYSKTQGKRVATTIYRAAQAENWILMCNRIEDFNKSGGRVLKGLQNRRAKEKALCLGE', 'AAFRGCWTKNYSPKPCL', 'AAGMGFFGAR', 'AAGNPSETGGAVATYSTAVGSFLDGTVKVVATGGASRVPGNCGTAAVLECDNPESFDGTRAWGDLSADQGTGEDAPPETASLIFAVN', 'AAKNKKEKGKKGASDCTEWTWGSCIPNSKDCGAGTREGTCKEETRKLKCKIPCNWKKAFGADCKYKFENWGECNATTGQKVRSGTLKKALYNADCQQTVEATKPCSLKTKSKSKGKKGKGKE', 'AAKNKKEKGKKGASDCTEWTWGSCIPNSKDCGAGTREGTCKEETRKLKCKIPCNWKKDFGADCKYKFENWGECNATTGQKVRSGTLKKALYNADCQQTVEAAKPCSLKTKSKSKGKKGKGKE', 'AAKPMGITCDLLSLWKVGHAACAAHCLVLGDVGGYCTKEGLCVCKE', 'AALKGCWTKSIPPKPCFGKR', 'AALKGCWTKSIPPKPCSGKR', 'AALRGCWTKSIPPKPCPGKR', 'AALRGCWTKSIPPKPCSGKR', 'AANFGPSVFTPEVHETWQKFLNVVVAALGKQYH', 'AAPRGGKGFFCKLFKDC', 'AATAKKGAKKADAPAKPKKATKPKSPKKAAKKAGAKKGVKRAGKKGAKKTTKAKK', 'ACAAHCLLRGNRGGYCNGKG', 'ACDTATCVTHRLAGLLSRSGGVVKNNFVPTNVGSKAF', 'ACHAHCQSVGR

In [100]:
unique_letters = set(''.join(df['Sequences']))
print(len(unique_letters))

20


In [110]:


# Define One-Hot Encoding Function for Sequences in PyTorch
def one_hot_torch(seq: str, dtype=torch.float32):
    amino_acids = "ACDEFGHIKLMNPQRSTVWY"  # 20 standard amino acids
    seq_bytes = torch.ByteTensor(list(bytes(seq, "utf-8")))
    aa_bytes = torch.ByteTensor(list(bytes(amino_acids, "utf-8")))
    arr = torch.zeros(len(amino_acids), len(seq_bytes), dtype=dtype)  # One-hot encoded matrix
    for i, aa in enumerate(aa_bytes):
        arr[i, seq_bytes == aa] = 1
    return arr

def ordinal_encoding_torch(seq: str, dtype=torch.long):
    amino_acids = "ACDEFGHIKLMNPQRSTVWY"  # 20 standard amino acids
    aa_to_index = {aa: i for i, aa in enumerate(amino_acids)}
    seq_indices = torch.tensor([aa_to_index[aa] for aa in seq], dtype=dtype)
    return seq_indices

def preprocess_sequence(full_sequence, max_length):
    """
    Prepares input and target sequences while avoiding padding ('X') during training.
    
    Args:
        full_sequence (str): The original sequence including 'X' padding.
        max_length (int): Maximum sequence length (for padding).
    
    Returns:
        Tuple of (input sequence, target sequence) after removing padding.
    """
    # Remove trailing 'X' (padding) before slicing
    trimmed_sequence = full_sequence.rstrip('X')

    # Ensure there's at least 2 valid residues left for training
    if len(trimmed_sequence) < 2:
        return None, None  # Skip sequences that are too short to predict anything

    # Create input & target sequences (without padding)
    input_seq = trimmed_sequence[:-1]  # Exclude last valid residue
    target_seq = trimmed_sequence[-1]  # Shifted by one position

    # Pad back to maintain fixed-length format
    input_seq = input_seq.ljust(max_length - 1, 'X')  # Pad to max_length - 1
    # target_seq = target_seq.ljust(max_length - 1, 'X')  # Target also gets 'X' padding

    return input_seq, target_seq

# Custom Dataset for Generative Model
class GenerativeSequenceDataset(Dataset):
    def __init__(self, sequences, max_length, one_hot_dtype=torch.float32):
        self.sequences = sequences
        self.max_length = max_length
        self.one_hot_dtype = one_hot_dtype

    def __len__(self):
        return len(self.sequences)

    def __getitem__(self, idx):
        full_sequence = self.sequences.iloc[idx]  # Get sequence

        # Process input & target while handling padding correctly
        input_seq, target_seq = preprocess_sequence(full_sequence, self.max_length)
        # print(input_seq, target_seq)
        if input_seq is None or target_seq is None:
            return None  # Skip sequences that are too short

        # Convert to one-hot encoding
        input_one_hot = one_hot_torch(input_seq, dtype=self.one_hot_dtype)
        target_one_hot = one_hot_torch(target_seq, dtype=self.one_hot_dtype)
        # target_one_hot = ordinal_encoding_torch(target_seq, dtype=torch.long)

        return input_one_hot, target_one_hot
    

# Split into train (70%), validation (15%), test (15%)
X_train_val, X_test = train_test_split(df["Sequences"], test_size=0.15, random_state=42)
X_train, X_val = train_test_split(X_train_val, test_size=0.15, random_state=42)  

# Convert to PyTorch Dataset
train_dataset = GenerativeSequenceDataset(X_train, max_length)
val_dataset = GenerativeSequenceDataset(X_val, max_length)
test_dataset = GenerativeSequenceDataset(X_test, max_length)

# Define DataLoaders
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, drop_last=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, drop_last=True)

# Display dataset sizes
dataset_sizes = {
    "Train": len(train_dataset),
    "Validation": len(val_dataset),
    "Test": len(test_dataset)
}
print(dataset_sizes)

{'Train': 2360, 'Validation': 417, 'Test': 491}


In [None]:
for x, y in train_loader:
    print(x, y) 
    break

## model

In [134]:


class LSTM_Generative(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size, dropout_rate=0.2):
        """
        LSTM Generative Model with:
         - Two LSTM layers (num_layers=2) with total hidden size of 512.
         - Dropout applied to LSTM outputs.
         - An intermediate fully connected layer with LeakyReLU activation.
         - A final fully connected layer with Sigmoid activation.
        
        Args:
            input_size (int): Dimension of input features (e.g., 20 for one-hot encoded amino acids)
            hidden_size (int): Hidden state size for LSTM (set to 512)
            num_layers (int): Number of LSTM layers (2)
            output_size (int): Dimension of output (e.g., number of classes; for next-residue prediction, typically 20)
            dropout_rate (float): Dropout probability (default 0.2)
        """
        super(LSTM_Generative, self).__init__()
        # LSTM layers (using dropout in between layers)
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, 
                            batch_first=True, dropout=dropout_rate)
        # Dropout layer applied to LSTM output before dense mapping
        self.dropout = nn.Dropout(dropout_rate)
        
        # First dense layer with Leaky ReLU activation
        self.fc1 = nn.Linear(hidden_size, hidden_size)
        self.leaky_relu = nn.LeakyReLU(negative_slope=0.01)
        
        # Final dense layer with Sigmoid activation for output probabilities
        self.fc2 = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # ic(x.shape)  # For debugging
        # x shape: (batch_size, seq_length, input_size)
        lstm_out, _ = self.lstm(x)
        # lstm_out shape: (batch_size, seq_length, hidden_size)
        # For generation, typically use the output from the last timestep
        last_step = lstm_out[:, -1, :]  # (batch_size, hidden_size)
        last_step = self.dropout(last_step)
        # ic(last_step.shape)
        x = self.fc1(last_step)
        x = self.leaky_relu(x)
        x = self.fc2(x)
        # ic(x.shape)
        # x = self.sigmoid(x)
        return x  # Returns probabilities in range [0,1]



In [137]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.tensorboard import SummaryWriter

def train_lstm(model, train_loader, val_loader, criterion, optimizer, tensorboard_log_dir, num_epochs=25):
    writer = SummaryWriter(tensorboard_log_dir)

    for epoch in range(num_epochs):
        model.train()
        train_losses = []

        for input_seq, target_residue in train_loader:
            optimizer.zero_grad()
            outputs = model(input_seq)  # Forward pass

            # Ensure outputs are shaped correctly
            ic(outputs.shape)  # Expected: (batch_size, num_classes)
            # ic(target_residue.shape)  # Expected: (batch_size,)
            # Ensure target_residue is a 1D tensor (batch_size,)
            # Ensure target_residue contains class indices instead of one-hot encoding
            # ic(target_residue)

            target_residue = torch.argmax(target_residue, dim=1)  # Converts (batch_size, 20) → (batch_size,)
            target_residue = target_residue.squeeze()
            # Compute loss (CrossEntropyLoss expects class indices as targets)
            # ic(target_residue.shape)  # Expected: (batch_size,)
            # ic(target_residue)
            loss = criterion(outputs, target_residue)
            loss.backward()
            optimizer.step()
            train_losses.append(loss.item())

        avg_train_loss = np.mean(train_losses)
        writer.add_scalar('Loss/Train', avg_train_loss, epoch)

        # Validation Step
        model.eval()
        val_losses = []
        with torch.no_grad():
            for input_seq, target_residue in val_loader:
                target_residue = torch.argmax(target_residue, dim=1)  # Converts (batch_size, 20) → (batch_size,)
                target_residue = target_residue.squeeze()
                outputs = model(input_seq)
                loss = criterion(outputs, target_residue)
                val_losses.append(loss.item())

        avg_val_loss = np.mean(val_losses)
        writer.add_scalar('Loss/Val', avg_val_loss, epoch)

        if epoch % 5 == 0:
            print(f'Epoch {epoch+1}/{num_epochs}, TrainLoss: {avg_train_loss:.4f}, ValLoss: {avg_val_loss:.4f}')

    writer.close()


In [189]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.tensorboard import SummaryWriter
from torch.optim.lr_scheduler import ReduceLROnPlateau
from icecream import ic

def train_lstm(model, train_loader, val_loader, criterion, optimizer, tensorboard_log_dir, num_epochs=25):
    writer = SummaryWriter(tensorboard_log_dir)
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3, verbose=True)

    for epoch in range(num_epochs):
        model.train()
        train_losses = []
        for input_seq, target_residue in train_loader:
            optimizer.zero_grad()
            outputs = model(input_seq)  # Forward pass

            # Convert one-hot target to class indices
            target_residue = torch.argmax(target_residue, dim=1)  # (batch_size, 20) → (batch_size,)
            target_residue = target_residue.squeeze()
            ic(outputs.shape)  # Expected: (batch_size, num_classes)
            ic(target_residue.shape)  # Expected: (batch_size,)

            loss = criterion(outputs, target_residue)
            loss.backward()
            optimizer.step()
            train_losses.append(loss.item())

        avg_train_loss = np.mean(train_losses)
        writer.add_scalar('Loss/Train', avg_train_loss, epoch)

        # Validation Step
        model.eval()
        val_losses = []
        with torch.no_grad():
            for input_seq, target_residue in val_loader:
                target_residue = torch.argmax(target_residue, dim=1)  # Converts (batch_size, 20) → (batch_size,)
                target_residue = target_residue.squeeze()
                outputs = model(input_seq)
                loss = criterion(outputs, target_residue)
                val_losses.append(loss.item())

        avg_val_loss = np.mean(val_losses)
        writer.add_scalar('Loss/Val', avg_val_loss, epoch)
        
        # Step the scheduler based on validation loss
        scheduler.step(avg_val_loss)

        if epoch % 5 == 0:
            print(f'Epoch {epoch+1}/{num_epochs}, TrainLoss: {avg_train_loss:.4f}, ValLoss: {avg_val_loss:.4f}')

    writer.close()


In [190]:
# Train the Model
ic.enable()  # Enable icecream debugging
ic.disable()  # Enable icecream debugging
# Example usage:
# Assume one-hot encoded inputs have size 20, and we have 20 output classes (for each amino acid).
input_size = max_length-1            # one-hot encoded vector length for each residue
hidden_size = 512          # total hidden size of the LSTM
num_layers = 2             # two LSTM layers
output_size = 20           # output predictions for each amino acid (e.g., next residue probability)
dropout_rate = 0.5
learning_rate = 1e-4
tensorboard_log_dir = f"runs/generative_lstm/experiment_{time.strftime('%Y-%m-%d_%H-%M-%S')}"
num_epochs = 50
model = LSTM_Generative(input_size, hidden_size, num_layers, output_size, dropout_rate)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

train_lstm(model, train_loader, val_loader, criterion, optimizer, tensorboard_log_dir, num_epochs)




Epoch 1/50, TrainLoss: 2.8750, ValLoss: 2.7260
Epoch 6/50, TrainLoss: 2.7518, ValLoss: 2.7100
Epoch 11/50, TrainLoss: 2.5501, ValLoss: 2.5022
Epoch 16/50, TrainLoss: 2.4372, ValLoss: 2.4952
Epoch 21/50, TrainLoss: 2.3439, ValLoss: 2.4290
Epoch 26/50, TrainLoss: 2.2583, ValLoss: 2.4106
Epoch 31/50, TrainLoss: 2.1850, ValLoss: 2.4265
Epoch 36/50, TrainLoss: 2.0844, ValLoss: 2.4282
Epoch 41/50, TrainLoss: 2.0110, ValLoss: 2.4520
Epoch 46/50, TrainLoss: 1.9870, ValLoss: 2.4530


In [169]:
# Define the set of valid amino acids
valid_amino_acids = set("ACDEFGHIKLMNPQRSTVWY")

# Concatenate all sequences into a single string
all_sequences = ''.join(df['Sequences'])

# Find unique characters in the concatenated string
unique_characters = set(all_sequences)

# Find characters that are not in the set of valid amino acids
invalid_characters = unique_characters - valid_amino_acids

print(f"Invalid characters found: {invalid_characters}")

Invalid characters found: set()


In [188]:
import torch
import numpy as np
from sklearn.metrics import accuracy_score
ic.disable()

def decode_prediction(outputs):
    """
    Decodes the predicted output tensor into an amino acid sequence.

    Args:
        outputs (torch.Tensor): The output tensor from the model of shape (batch_size, num_classes).

    Returns:
        str: The decoded amino acid sequence.
    """
    amino_acids = "ACDEFGHIKLMNPQRSTVWY"  # 20 standard amino acids
    sigmoid_outputs = torch.sigmoid(outputs)  # Apply sigmoid to the outputs
    # print(predicted_indices)

    predicted_indices = torch.argmax(sigmoid_outputs, dim=0).tolist()  # Get highest probability indices
    # predicted_sequence = ''.join([amino_acids[idx] for idx in predicted_indices])
    predicted_sequence = amino_acids[predicted_indices]
    # print(predicted_sequence)
    return predicted_sequence


def one_hot_to_sequence(one_hot_tensor):
    """
    Converts a one-hot encoded tensor back into an amino acid sequence,
    handling fully zero vectors (padding 'X').

    Args:
        one_hot_tensor (torch.Tensor): One-hot encoded tensor of shape (num_amino_acids, seq_length)

    Returns:
        str: Decoded amino acid sequence
    """
    amino_acids = "ACDEFGHIKLMNPQRSTVWY"  # 20 standard amino acids
    seq_length = one_hot_tensor.shape[1]  # Get sequence length
    
    decoded_sequence = ""
    for i in range(seq_length):
        column = one_hot_tensor[:, i]  # Get one-hot vector for residue
        
        if torch.sum(column) == 0:  # If it's fully zero, assume it's padding
            decoded_sequence += "X"
        else:
            amino_acid_index = torch.argmax(column).item()  # Get highest probability index
            decoded_sequence += amino_acids[amino_acid_index]  # Convert index to amino acid
    
    return decoded_sequence

def evaluate_lstm(model, test_loader, criterion):
    model.eval()  # Set model to evaluation mode
    
    total_loss = 0
    total_accuracy = 0
    total_perplexity = 0
    num_samples = 0

    with torch.no_grad():
        for input_seq, target_seq in test_loader:
            print('input_seq')
            # print(one_hot_to_sequence(input_seq[0]))
            outputs = model(input_seq)  # Predictions (batch_size, seq_length, num_classes)
            print('pred_seq')
            # print(outputs[0])
            print(decode_prediction(outputs[0]))
            print('target_seq')
            # print(target_seq[0])  
            # print(target_seq[0].shape)
            target_seq_ = target_seq[0].squeeze()
            # print(target_seq_)
            target_seq_ = torch.argmax(target_seq_, dim = 0)  # Converts (batch_size, 20) → (batch_size,)
            print(target_seq_)
            print(decode_prediction(target_seq[0].squeeze()))
            print('---------------------------------------------')
            
            
            target_seq = torch.argmax(target_seq, dim=1)  # Converts (batch_size, 20) → (batch_size,)
            target_seq = target_seq.squeeze()
            loss = criterion(outputs, target_seq)            
            total_loss += loss.item()

            # Compute accuracy (compare predicted residues with ground truth)
            predicted_indices = torch.argmax(outputs, dim=-1)  # Get highest probability residues
            target_indices = torch.argmax(target_seq, dim=-1)  # Get actual residues
            batch_accuracy = (predicted_indices == target_indices).float().mean().item()
            total_accuracy += batch_accuracy

            # Compute perplexity (exp of cross-entropy loss)
            batch_perplexity = torch.exp(loss).item()
            total_perplexity += batch_perplexity

            num_samples += 1
            # break
    
    # Calculate averages
    avg_loss = total_loss / num_samples
    avg_accuracy = total_accuracy / num_samples
    avg_perplexity = total_perplexity / num_samples

    print(f"Test Loss: {avg_loss:.4f}")
    print(f"Test Accuracy: {avg_accuracy:.4f}")
    print(f"Test Perplexity: {avg_perplexity:.4f}")

    return avg_loss, avg_accuracy, avg_perplexity
# Run evaluation
test_loss, test_accuracy, test_perplexity = evaluate_lstm(model, test_loader, criterion)
print('test_loss:',test_loss)
print('test_accuracy:',test_accuracy)
print('test_perplexity:',test_perplexity)



input_seq


pred_seq
T
target_seq
tensor(16)
T
---------------------------------------------
input_seq
pred_seq
C
target_seq
tensor(1)
C
---------------------------------------------
input_seq
pred_seq
L
target_seq
tensor(8)
K
---------------------------------------------
input_seq
pred_seq
Y
target_seq
tensor(9)
L
---------------------------------------------
input_seq
pred_seq
C
target_seq
tensor(15)
S
---------------------------------------------
input_seq
pred_seq
G
target_seq
tensor(5)
G
---------------------------------------------
input_seq
pred_seq
C
target_seq
tensor(7)
I
---------------------------------------------
input_seq
pred_seq
K
target_seq
tensor(14)
R
---------------------------------------------
input_seq
pred_seq
W
target_seq
tensor(5)
G
---------------------------------------------
input_seq
pred_seq
Q
target_seq
tensor(1)
C
---------------------------------------------
input_seq
pred_seq
D
target_seq
tensor(17)
V
---------------------------------------------
input_seq
pred_s