# Environment

In [1]:
# clone our github repo
!git clone https://github.com/MicroResearchLab/AMP-potency-prediction-EvoGradient.git
%cd AMP-potency-prediction-EvoGradient/

Cloning into 'AMP-potency-prediction-EvoGradient'...
remote: Enumerating objects: 97, done.[K
remote: Counting objects: 100% (97/97), done.[K
remote: Compressing objects: 100% (68/68), done.[K
remote: Total 97 (delta 30), reused 88 (delta 26), pack-reused 0 (from 0)[K
Receiving objects: 100% (97/97), 17.16 MiB | 24.07 MiB/s, done.
Resolving deltas: 100% (30/30), done.
/content/AMP-potency-prediction-EvoGradient


In [2]:
!pip install biopython==1.81 # Based on the default Colab environment, you only need to install Biopython to run our code.

Collecting biopython==1.81
  Downloading biopython-1.81-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (13 kB)
Downloading biopython-1.81-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/3.1 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m3.1/3.1 MB[0m [31m132.5 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m73.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: biopython
Successfully installed biopython-1.81


# AMP-CLIP

In [3]:
import torch
import numpy as np
import pandas as pd
from torch import nn
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torch.autograd import Variable
import numpy as np
import math
import argparse
from Bio import SeqIO

# Parse command-line arguments
parser = argparse.ArgumentParser(description="AMP Classification")
parser.add_argument("--testPath", type=str, default='./data/classification/demo.fasta', help="Path to the test dataset")
parser.add_argument("--savePath", type=str, default='output/classification_result.csv', help="Path to save the results")
# args = parser.parse_args()
args, unknown = parser.parse_known_args()

testPath = args.testPath
savePath = args.savePath

# Data paths
trainPath = "./data/classification/train.csv"
validatePath = "./data/classification/test.csv"

# Configuration parameters
batch_size = 256
embedding_size = 20
num_tokens = 100
num_classes = 2
num_heads = 4

# Model paths
model_list = {
    "CNN": "./model/classification/CNN.pth",
    "Transformer": "./model/classification/Transformer.pth",
    "Attention": "./model/classification/Attention.pth",
    "LSTM": "./model/classification/LSTM.pth",
}
nameList = model_list.keys()

# Sequence to numerical mapping
mydict = {"A": 0, "C": 1, "D": 2, "E": 3, "F": 4, "G": 5, "H": 6, "I": 7, "K": 8, "L": 9, "M": 10, "N": 11, "P": 12, "Q": 13, "R": 14, "S": 15, "T": 16, "V": 17, "W": 18, "Y": 19}

softmax = nn.functional.softmax


def fasta_to_csv(fasta_path, csv_path):
    """
    Convert a FASTA file to a CSV file.

    Parameters:
    fasta_path (str): Path to the input FASTA file.
    csv_path (str): Path to the output CSV file.

    Returns:
    str: Path to the output CSV file.
    """
    sequences = []
    lengths = []

    # Parse the FASTA file and extract sequences and their lengths
    for record in SeqIO.parse(fasta_path, "fasta"):
        sequences.append(str(record.seq))
        lengths.append(len(record.seq))

    # Create a DataFrame with sequences and their lengths
    df = pd.DataFrame({"Sequence": sequences, "Length": lengths})

    # Save the DataFrame to a CSV file
    print(csv_path)
    df.to_csv(csv_path, index=False)
    return csv_path


# Transform the test FASTA file to CSV
testPath = fasta_to_csv(testPath, testPath[:-5] + ".csv")


def dataProcessPipeline(seq):
    """
    Process a sequence into a padded one-hot encoded tensor and a mask.

    Parameters:
    seq (str): The input sequence to process.

    Returns:
    tuple: A tuple containing the padded one-hot encoded tensor and the mask tensor.
    """
    testest = seq
    num_seq = [mydict[character.upper()] for character in seq]

    seq = np.array(num_seq, dtype=int)
    len = seq.shape[0]
    torch_seq = torch.tensor(seq)

    if torch.sum(torch_seq[torch_seq < 0]) != 0:
        print(torch_seq[torch_seq < 0])
        print("wrong seq:", seq)
        print(testest)

    onehotSeq = torch.nn.functional.one_hot(torch_seq, num_classes=20)
    # Pad the sequence to a length of 100
    pad = torch.nn.ZeroPad2d(padding=(0, 0, 0, 100 - len))
    mask = np.zeros(100, dtype=int)
    mask[len:] = 1
    mask = torch.tensor(mask)
    pad_seq = pad(onehotSeq)

    return pad_seq, mask


# train dataset
class TrainDataset(Dataset):
    def __init__(self, data_path):
        df = pd.read_csv(data_path, header=0)
        df = df[df["Length"] <= 100]
        self.seqs = list(df["Sequence"])
        self.labels = list(df["label"])

    def __getitem__(self, index):
        seq = self.seqs[index]
        num_seq, mask = dataProcessPipeline(seq)
        label = self.labels[index]
        return num_seq, mask, label

    def __len__(self):
        return len(self.seqs)


# test dataset
class TestDataset(Dataset):
    def __init__(self, data_path):
        df = pd.read_csv(data_path, header=0).reset_index()
        self.seqs = df["Sequence"]

    def __getitem__(self, index):
        seq = self.seqs[index]
        num_seq, mask = dataProcessPipeline(seq)
        return num_seq, mask, seq

    def __len__(self):
        return len(self.seqs)


class FastaDataset(Dataset):
    def __init__(self, data_path, transform=dataProcessPipeline):
        """
        Initialize the dataset from a FASTA file.

        Parameters:
        data_path (str): Path to the FASTA file.
        transform (function): Function to process the sequences.
        """
        self.seqs = [record.seq for record in SeqIO.parse(data_path, "fasta")]
        self.transform = transform

    def __getitem__(self, index):
        seq = str(self.seqs[index])
        num_seq, mask = self.transform(seq)
        return num_seq, mask, seq

    def __len__(self):
        return len(self.seqs)


class PositionalEncoding(nn.Module):
    def __init__(self, length, d_model=20):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(length, d_model)
        position = torch.arange(0, length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer("pe", pe.unsqueeze(0))

    def forward(self, x):
        x = x + self.pe
        return x


"""attention model"""


class AttentionNetwork(nn.Module):

    def __init__(self, batch_size=128, embedding_size=20, num_tokens=100, num_classes=2, num_heads=4):

        super(AttentionNetwork, self).__init__()
        self.pe = PositionalEncoding(len=num_tokens, d_model=embedding_size)

        self.batch_size = batch_size
        self.embedding_size = embedding_size
        self.num_tokens = num_tokens
        self.num_classes = num_classes
        self.num_heads = num_heads

        self.hidden1 = 20
        self.hidden2 = 60
        self.hidden3 = 20
        self.dropout = 0.5

        self.relu = nn.ReLU()
        self.LN = nn.LayerNorm(normalized_shape=self.hidden1)
        self.fc1 = nn.Linear(self.embedding_size, self.hidden1)
        self.multihead_att = nn.MultiheadAttention(embed_dim=self.hidden1, num_heads=self.num_heads, batch_first=1, dropout=self.dropout)
        self.flatten = nn.Flatten()
        self.fc2 = nn.Linear(self.hidden1 * self.num_tokens, self.hidden2)
        self.fc3 = nn.Linear(self.hidden2, self.hidden3)
        self.fc4 = nn.Linear(self.hidden3, self.num_classes)
        self.dropout = nn.Dropout(self.dropout)
        self.softmax = nn.functional.softmax

    def forward(self, x, mask):
        x = self.pe(x)
        x = self.fc1(x)

        mask = mask.to(torch.bool)
        x, _ = self.multihead_att.forward(x, x, x, key_padding_mask=mask)
        x = self.flatten(x)
        x = self.fc2(x)
        x = self.dropout(x)

        x = self.relu(x)
        x = self.fc3(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc4(x)

        return x


trainData = TrainDataset(data_path=trainPath)
validateData = TrainDataset(data_path=validatePath)
testData = TestDataset(data_path=testPath)

train_loader = DataLoader(dataset=trainData, batch_size=batch_size, shuffle=True, num_workers=2)
test_loader = DataLoader(dataset=testData, batch_size=batch_size, shuffle=False)
validate_loader = DataLoader(dataset=validateData, batch_size=batch_size, shuffle=False)

loss_function = nn.MSELoss()

result_df = pd.read_csv(testPath, header=0)

model_out = {}
# process with all the models
for modelName in nameList:
    modelPath = model_list[modelName]
    id = modelPath.split("/")[-2]
    model_out[modelName] = []

    t_model = torch.load(modelPath, weights_only=False, map_location='cpu')
    t_model.cpu()
    if modelName == "Transformer":
        t_model.postion_embedding.device = "cpu"

    # evaluate models
    def score(test_loader):
        t_model.eval()
        epi = 0.000001
        tp = 0
        tn = 0
        fp = 0
        fn = 0
        total = 0
        count = 0

        for data in test_loader:
            inputs, masks, labels = data
            inputs = inputs.float()
            masks = masks.float()
            inputs, masks, labels = Variable(inputs), Variable(masks), Variable(labels)

            inputs = inputs.cpu()
            masks = masks.cpu()

            if modelName != "Attention" and modelName != "Transformer2":
                out = t_model(inputs)
            else:
                out = t_model(inputs, masks)
            out = torch.squeeze(out)

            out = torch.argmax(out, -1)
            out = out.cpu()
            for i, pre in enumerate(out):
                total += 1
                if pre == labels[i]:
                    count += 1
                    if pre == 0:
                        tn += 1
                    else:
                        tp += 1
                if pre != labels[i]:
                    if pre == 0:
                        fn += 1
                    else:
                        fp += 1

        print("AMP classification result:")
        print("Precision:", np.round(tp / (tp + fp + epi), 3))
        print("Recall:", np.round(tp / (tp + fn + epi), 3))
        print("Specificity:", np.round(tn / (tn + fp + epi), 3))
        print("F1:", np.round(2 * tp / (2 * tp + fp + fn + epi), 3))
        print("Accuracy：", np.round(count / total, 3))
        print()

    print()
    print("Model:", modelName)
    score(validate_loader)

    # use model to predict test data
    for i, data in enumerate(test_loader):
        inputs, masks, seqs = data
        inputs = inputs.float()
        masks = masks.float()

        t_model.eval()
        inputs = inputs.cpu()
        masks = masks.cpu()
        if modelName != "Attention":
            out = t_model(inputs)
        else:
            out = t_model(inputs, masks)

        out = out.cpu()
        if "LSTM" in modelName:
            out = out.unsqueeze(0)
        out_ori = torch.squeeze(out)

        out_ori = torch.squeeze(out)
        out_soft = softmax(out_ori, -1)
        out_soft_AMP = out_soft[:, 1]

        out_soft_numpy = list(out_soft_AMP.detach().numpy())
        out_soft_numpy = [round(v, 3) for v in out_soft_numpy]
        model_out[modelName] = list(model_out[modelName]) + out_soft_numpy

# summarize the results
for k, v in model_out.items():
    result_df[k] = v

result_df = result_df[["Sequence", "CNN", "Transformer", "Attention", "LSTM"]]

y = (result_df["CNN"] > 0.5) * (result_df["Transformer"] > 0.5) * (result_df["LSTM"] > 0.5) * (result_df["Attention"] > 0.5)
result_df["Ensemble"] = y

result_df.to_csv(savePath, index=0)
print(result_df)
print(f"Test result is saved to ./{savePath} ")


./data/classification/demo..csv

Model: CNN
AMP classification result:
Precision: 0.982
Recall: 0.843
Specificity: 0.985
F1: 0.908
Accuracy： 0.916


Model: Transformer
AMP classification result:
Precision: 0.975
Recall: 0.845
Specificity: 0.98
F1: 0.906
Accuracy： 0.913


Model: Attention
AMP classification result:
Precision: 0.975
Recall: 0.85
Specificity: 0.979
F1: 0.908
Accuracy： 0.916


Model: LSTM
AMP classification result:
Precision: 0.979
Recall: 0.865
Specificity: 0.982
F1: 0.919
Accuracy： 0.925

                                               Sequence    CNN  Transformer  \
0                             FIHHIIGGLFSAGKAIHRLIRRRRR  0.725        0.816   
1                                    MSTNPKPQRKTKRNTNRR  0.381        0.637   
2     SDSHLGDLHKKAVPCKDLVPVVVDILVEHFGAARREREEDEEEEQLGGN  0.283        0.213   
3     LIDHLGAPRWAVDTILGAIAVGNLASWVLALVPGPGWAVKAGLATA...  0.416        0.521   
4     MSGRGKTGGKARAKAKTRSSRAGLQFPVGRVHRLLRKGNYAHRVGA...  0.418        0.499   
...              

# AMP-READ

In [4]:
import torch
import numpy as np
import pandas as pd
from torch import nn
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torch.autograd import Variable
import numpy as np
import pandas as pd
import math
import argparse
from Bio import SeqIO

# Parse command-line arguments
parser = argparse.ArgumentParser(description="AMP Classification")
parser.add_argument("--testPath", type=str, default="./data/regression/demo.fasta", help="Path to the test dataset")
parser.add_argument("--savePath", type=str, default="output/regression_result.csv", help="Path to save the results")
# args = parser.parse_args()
args, unknown = parser.parse_known_args()

testPath = args.testPath
savePath = args.savePath

# Data paths
trainPath = "./data/regression/train.csv"
validatePath = "./data/regression/test.csv"


# Configuration parameters

batch_size = 256
MAX_MIC = math.log10(8192)
My_MAX_MIC = math.log10(600)


# Model paths
model_list = {
    #
    "CNN": "./model/regression/CNN.pth",
    "Transformer": "./model/regression/Transformer.pth",
    "Attention": "./model/regression/Attention.pth",
    "LSTM": "./model/regression/LSTM.pth",
}

nameList = model_list.keys()
weight = {"CNN": 0.25000594, "Transformer": 0.2500046, "Attention": 0.25000825, "LSTM": 0.24998219}

mydict = {"A": 0, "C": 1, "D": 2, "E": 3, "F": 4, "G": 5, "H": 6, "I": 7, "K": 8, "L": 9, "M": 10, "N": 11, "P": 12, "Q": 13, "R": 14, "S": 15, "T": 16, "V": 17, "W": 18, "Y": 19}
myInvDict = dict([val, key] for key, val in mydict.items())


def fasta_to_csv(fasta_path, csv_path):

    sequences = []
    lengths = []

    for record in SeqIO.parse(fasta_path, "fasta"):
        sequences.append(str(record.seq))
        lengths.append(len(record.seq))

    df = pd.DataFrame({"Sequence": sequences, "Length": lengths})

    print(csv_path)
    df.to_csv(csv_path, index=False)
    return csv_path


# transform fasta to csv
testPath = fasta_to_csv(testPath, testPath[:-5] + ".csv")


def dataProcessPipeline(seq):

    # this function first transform peptide sequences into numerical sequence,
    # transformer it into onehot vector and padding them into a fix length
    # returning the padding vector and mask

    testest = seq
    num_seq = [mydict[character.upper()] for character in seq]

    seq = np.array(num_seq, dtype=int)
    len = seq.shape[0]
    torch_seq = torch.tensor(seq)
    if torch.sum(torch_seq[torch_seq < 0]) != 0:
        print(torch_seq[torch_seq < 0])
        print("wrong seq:", seq)
        print(testest)
    onehotSeq = torch.nn.functional.one_hot(torch_seq, num_classes=20)
    # onehotSeq = torch.nn.functional.one_hot(c
    pad = torch.nn.ZeroPad2d(padding=(0, 0, 0, 100 - len))
    mask = np.zeros(100, dtype=int)
    mask[len:] = 1
    mask = torch.tensor(mask)

    pad_seq = pad(onehotSeq)

    return pad_seq, mask


class TrainDataset(Dataset):
    def __init__(self, data_path, transform=dataProcessPipeline):
        df = pd.read_csv(data_path, header=0)

        self.df = df

        self.seqs = list(self.df["Sequence"])
        self.values = self.df["value"]

        self.values[self.values > MAX_MIC] = MAX_MIC
        self.values = list(self.values)
        self.transform = transform

    def __getitem__(self, idex):
        seq = self.seqs[idex]
        num_seq, mask = self.transform(seq)
        label = self.values[idex]

        return num_seq, mask, label

    def __len__(self):
        return len(self.seqs)


class TestDataset(Dataset):
    def __init__(self, data_path, transform=dataProcessPipeline):
        df = pd.read_csv(data_path, header=0)

        self.df = df

        self.seqs = self.df["Sequence"]
        self.transform = transform

    def __getitem__(self, idex):
        seq = self.seqs[idex]
        num_seq, mask = self.transform(seq)

        return num_seq, mask, seq

    def __len__(self):
        return len(self.seqs)


class FastaDataset(Dataset):
    def __init__(self, fasta_path, transform=dataProcessPipeline):
        self.seqs = [record.seq for record in SeqIO.parse(fasta_path, "fasta")]
        self.transform = transform

    def __getitem__(self, index):
        seq = str(self.seqs[index])
        num_seq, mask = self.transform(seq)
        return num_seq, mask, seq

    def __len__(self):
        return len(self.seqs)


class PositionalEncoding(nn.Module):
    def __init__(self, len, d_model=20, dropout=0):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(len, d_model)
        position = torch.arange(0, len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        # pe = pe.unsqueeze(0).transpose(0, 1)
        pe = pe.unsqueeze(0)
        self.register_buffer("pe", pe)

    def forward(self, x):
        x = x + self.pe
        # x = x + self.pe[:,:x.size(0), :]
        return x


class AttentionNetwork(nn.Module):

    def __init__(self, batch_size=128, embedding_size=20, num_tokens=100, num_classes=1, num_heads=4):
        super(AttentionNetwork, self).__init__()
        self.pe = PositionalEncoding(len=num_tokens, d_model=embedding_size)
        self.batch_size = batch_size
        self.embedding_size = embedding_size
        self.num_tokens = num_tokens
        self.num_classes = num_classes
        self.hidden1 = 20
        self.hidden2 = 60
        self.hidden3 = 20
        self.dropout = 0.2
        self.relu = nn.ReLU()

        self.LN = nn.LayerNorm(normalized_shape=self.hidden1)
        self.fc1 = nn.Linear(self.embedding_size, self.hidden1)

        self.multihead_att = nn.MultiheadAttention(embed_dim=self.hidden1, num_heads=self.num_heads, batch_first=1, dropout=self.dropout)
        self.flatten = nn.Flatten()
        self.fc2 = nn.Linear(self.hidden1 * self.num_tokens, self.hidden2)
        self.fc3 = nn.Linear(self.hidden2, self.hidden3)
        self.new_fc4 = nn.Linear(self.hidden3, self.num_classes)

        self.dropout = nn.Dropout(self.dropout)

    def forward(self, x, mask):
        x = self.pe(x)
        x = self.fc1(x)

        mask = mask.to(torch.bool)
        x, w1 = self.multihead_att.forward(x, x, x, key_padding_mask=mask)

        x = self.flatten(x)
        x = self.fc2(x)
        x = self.dropout(x)

        x = self.relu(x)
        x = self.fc3(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.new_fc4(x)

        return x


trainData = TrainDataset(data_path=trainPath)
validateData = TrainDataset(data_path=validatePath)
testData = TestDataset(data_path=testPath)

train_loader = DataLoader(dataset=trainData, batch_size=batch_size, shuffle=True, num_workers=4)
test_loader = DataLoader(dataset=testData, batch_size=batch_size, shuffle=False)
validate_loader = DataLoader(dataset=validateData, batch_size=batch_size, shuffle=False)

testData1 = TestDataset(data_path=testPath)
test_loader1 = DataLoader(dataset=testData1, batch_size=batch_size, shuffle=0, num_workers=4)


frames = []

result_df = pd.read_csv(testPath, header=0)

model_out = {}
for modelName in nameList:

    modelPath = model_list[modelName]
    modelPreName = modelPath.split("/")[-1][:-4]
    id = modelPath.split("/")[-2]
    model_out[modelName] = []

    t_model = torch.load(modelPath, weights_only=False, map_location='cpu')
    t_model.cpu()
    if modelName == "Transformer":
        t_model.postion_embedding.device = "cpu"

    t_model.zero_grad()

    def test_eval(test_loader):
        t_model.eval()
        total_loss = []
        loss_function = nn.MSELoss()
        for i, data in enumerate(test_loader):
            inputs, masks, labels = data
            inputs = inputs.float()
            masks = masks.float()
            labels = labels.float()

            inputs = inputs.cpu()
            masks = masks.cpu()
            if modelName != "Attention":
                out = t_model(inputs)
            else:
                out = t_model(inputs, masks)
            out = torch.squeeze(out)

            out = out.cpu()
            loss = loss_function(out, labels)
            total_loss.append(loss.detach().numpy())

        ave = np.mean(total_loss)
        return ave

    loss0 = test_eval(validate_loader)
    print(modelName, " MSE loss in validation set:", str(loss0))

    for i, data in enumerate(test_loader1):
        inputs, masks, seqs = data
        inputs = inputs.float()
        masks = masks.float()

        t_model.eval()
        inputs = inputs.cpu()
        masks = masks.cpu()
        if modelName != "Attention":
            out = t_model(inputs)
        else:
            out = t_model(inputs, masks)
        out = out.cpu()
        out_ori = torch.squeeze(out)

        out_numpy = list(out_ori.detach().numpy())
        out_numpy = [round(v, 3) for v in out_numpy]
        model_out[modelName] = list(model_out[modelName]) + out_numpy

# summarize the results
for k, v in model_out.items():
    result_df[k] = v

result_df["Length"] = [len(v) for v in result_df["Sequence"]]
result_df = result_df[["Sequence", "Length", "CNN", "Transformer", "Attention", "LSTM"]]
df = result_df
y = result_df["CNN"] * weight["CNN"] + result_df["Transformer"] * weight["Transformer"] + result_df["Attention"] * weight["Attention"] + result_df["LSTM"] * weight["LSTM"]

df["Ensemble"] = [round(v, 3) for v in y]

df = df[["Sequence", "Ensemble", "CNN", "Transformer", "Attention", "LSTM"]]

print(df)

df.to_csv(savePath, index=0)
print(f"Regression test result is saved to ./{savePath} ")

./data/regression/demo..csv


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self.values[self.values > MAX_MIC] = MAX_MIC
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self.values[self.values > MAX_MIC] = MAX_MIC


CNN  MSE loss in validation set: 0.6175737




Transformer  MSE loss in validation set: 0.58181876
Attention  MSE loss in validation set: 0.53963906
LSTM  MSE loss in validation set: 0.605843
                                               Sequence  Ensemble    CNN  \
0       AMDPTKYYGNGVYCNSKKCWVDWGQGSGCIGQTVVGGWLGGAIPGKC    -0.493 -0.674   
1     SIEERVKKIIVDQLGAKAEDVKPETSFIEDLGADSLDTVELVMALE...     3.502  2.033   
2                        RSVFTKKNGKVFLYVVLLVLAAWRGYALAD     3.297  3.879   
3                                 GLLGSLFGAGKKVACALSGLC     1.118  1.348   
4                          AGIGTCCGGCMYTTAGGTCCSGIPICAK     1.679  1.889   
...                                                 ...       ...    ...   
1307    KAVTVVGMGDEGCPGLSSIAANAVAKAQILAGGKRHLDFFLNSPEKK     3.118  2.604   
1308                      WHSRVSPGVPYNCKSDQPQPRHMGVSCGV     3.526  2.484   
1309                       GGLRSLGRKILRAWKKYGPIIVPIIRIG     0.609  0.872   
1310             GLMSLFRGGVLKTAGKHIFKNVGGSLLDQAKCKITGEC     0.678  0.595   
1311            DNN

# EvoGradient

In [5]:
%cd EvoGradient

/content/AMP-potency-prediction-EvoGradient/EvoGradient


In [7]:
import os
import torch
import numpy as np
import pandas as pd
from torch import nn
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
import math
import warnings
import argparse

warnings.filterwarnings("ignore")

# Set up argument parser
parser = argparse.ArgumentParser(description="EvoGradient")
parser.add_argument("--peptide", type=str, default="RPLIKLRSTAGTGYTYVTRK", help="Peptide to optimize")
# args = parser.parse_args()
args, unknown = parser.parse_known_args()
to_opt = args.peptide


# Dictionary to map amino acids to numerical values
mydict = {"A": 0, "C": 1, "D": 2, "E": 3, "F": 4, "G": 5, "H": 6, "I": 7, "K": 8, "L": 9, "M": 10, "N": 11, "P": 12, "Q": 13, "R": 14, "S": 15, "T": 16, "V": 17, "W": 18, "Y": 19}
# Inverse dictionary to map numerical values back to amino acids
myInvDict = dict([val, key] for key, val in mydict.items())
MAX_MIC = math.log10(8192)
max_mic_buffer = 0.1
My_MAX_MIC = math.log10(600)


def num2seq(narr, len):
    """
    Convert a numerical array to a sequence of amino acids.

    Parameters:
    narr (numpy array): Array of numerical values representing amino acids.
    len (int): Length of the sequence to return.

    Returns:
    list: Sequence of amino acids.
    """
    numlist = np.argmax(narr, axis=1)
    seq = [myInvDict[value] for value in numlist]
    seq = seq[:len]
    return seq


def colorstr(*input):
    """
    Colors a string using ANSI escape codes.

    Parameters:
    *input (str): Colors and the string to color.

    Returns:
    str: Colored string.
    """
    # Colors a string https://en.wikipedia.org/wiki/ANSI_escape_code, i.e.  colorstr('blue', 'hello world')
    *args, string = input if len(input) > 1 else ("blue", "bold", input[0])  # color arguments, string
    colors = {
        "black": "\033[30m",  # basic colors
        "red": "\033[31m",
        "green": "\033[32m",
        "yellow": "\033[33m",
        "blue": "\033[34m",
        "magenta": "\033[35m",
        "cyan": "\033[36m",
        "white": "\033[37m",
        "bright_black": "\033[90m",  # bright colors
        "bright_red": "\033[91m",
        "bright_green": "\033[92m",
        "bright_yellow": "\033[93m",
        "bright_blue": "\033[94m",
        "bright_magenta": "\033[95m",
        "bright_cyan": "\033[96m",
        "bright_white": "\033[97m",
        "end": "\033[0m",  # misc
        "bold": "\033[1m",
        "underline": "\033[4m",
    }
    return "".join(colors[x] for x in args) + f"{string}" + colors["end"]


def standout(seq1, seq2):
    """
    Compare two sequences and highlight differences.

    Parameters:
    seq1 (str): First sequence.
    seq2 (str): Second sequence.

    Returns:
    list: Second sequence with differences highlighted.
    """
    # compare bewteen two seqs
    index = [1 if seq1[j] == seq2[j] else 0 for j in range(len(seq1))]
    newSeq2 = list(seq2)
    for i in range(len(seq1)):
        if index[i] == 0:
            newSeq2[i] = colorstr("blue", seq2[i])

    newSeq2 = "".join(newSeq2)

    return seq1, newSeq2


def colorPrint(ls):
    """
    Print the list with colored strings.

    Parameters:
    ls (list): List of sequences to print.
    """
    newls = [ls[0]]
    for i in range(len(ls) - 1):
        s1, s2 = standout(ls[i], ls[i + 1])
        newls.append(s2)

    for i in newls:
        print(i)


def colorShow(ls):
    """
    Highlight the changes with colored strings.

    Parameters:
    ls (list): List of sequences to compare and highlight.
    """
    length = len(ls[0])
    colorls = [ls[0]]
    flag = [0 for i in range(length)]  # record if the position was changed
    for i in range(len(ls) - 1):
        index = [1 if ls[i][j] == ls[i + 1][j] else 0 for j in range(length)]
        for k in range(length):
            if index[k] == 0:
                flag[k] = 1

        colorSeq = list(ls[i + 1])
        for k in range(length):
            if flag[k] == 1:
                colorSeq[k] = colorstr("blue", colorSeq[k])

        colorSeq = "".join(colorSeq)
        colorls.append(colorSeq)

    for seq in colorls:
        print(seq)


def dataProcessPipeline(seq):
    """
    Process a sequence into a padded one-hot encoded tensor and a mask.

    Parameters:
    seq (str): The input sequence to process.

    Returns:
    tuple: A tuple containing the padded one-hot encoded tensor and the mask tensor.
    """
    testest = seq
    num_seq = [mydict[character.upper()] for character in seq]

    seq = np.array(num_seq, dtype=int)
    len = seq.shape[0]
    torch_seq = torch.tensor(seq)
    if torch.sum(torch_seq[torch_seq < 0]) != 0:
        print(torch_seq[torch_seq < 0])
        print("wrong seq:", seq)
        print(testest)
    onehotSeq = torch.nn.functional.one_hot(torch_seq, num_classes=20)
    pad = torch.nn.ZeroPad2d(padding=(0, 0, 0, 100 - len))
    mask = np.zeros(100, dtype=int)
    mask[len:] = 1
    mask = torch.tensor(mask)

    pad_seq = pad(onehotSeq)

    return pad_seq, mask


def num2onehot(array2d):
    """
    Convert a numerical array to a one-hot encoded tensor.

    Parameters:
    array2d (torch.Tensor): The input numerical array.

    Returns:
    torch.Tensor: The one-hot encoded tensor.
    """
    result = torch.zeros_like(array2d)
    index = torch.argmax(array2d, dim=-1)
    for i in range(index.shape[0]):
        result[i, index[i]] = 1

    return result


# Define the train dataset class
class TrainDataset(Dataset):
    def __init__(self, data_path, transform=dataProcessPipeline):
        """
        Initialize the dataset.

        Parameters:
        data_path (str): Path to the CSV file containing the data.
        transform (function): Function to process the sequences.
        """
        df = pd.read_csv(data_path, header=0)
        df = df[df["Length"] <= 100]
        self.df = df
        self.seqs = list(self.df["Sequence"])
        self.values = self.df["value"]
        self.values[self.values > MAX_MIC] = MAX_MIC
        self.values = list(self.values)

        self.transform = transform

    def __getitem__(self, idex):
        """
        Get an item from the dataset.

        Parameters:
        idex (int): Index of the item to retrieve.

        Returns:
        tuple: A tuple containing the processed sequence, mask, label, and original sequence.
        """
        seq = self.seqs[idex]
        num_seq, mask = self.transform(seq)
        label = self.values[idex]

        return num_seq, mask, label, seq

    def __len__(self):
        return len(self.seqs)


# Define the test dataset class
class TestDataset(Dataset):
    def __init__(self, data_path, transform=dataProcessPipeline):
        self.df = pd.read_csv(data_path, header=0)
        self.seqs = self.df["Sequence"]

        self.transform = transform

    def __getitem__(self, idex):
        seq = self.seqs[idex]
        num_seq, mask = self.transform(seq)

        return num_seq, mask, seq

    def __len__(self):
        return len(self.seqs)


class PositionalEncoding(nn.Module):
    def __init__(self, len, d_model=20, dropout=0):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(len, d_model)
        position = torch.arange(0, len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer("pe", pe)

    def forward(self, x):
        x = x + self.pe
        return x


pe = PositionalEncoding(len=100, d_model=20)


class AttentionNetwork(nn.Module):

    def __init__(self, batch_size=128, embedding_size=20, num_tokens=100, num_classes=1, num_heads=4):
        super(AttentionNetwork, self).__init__()
        self.pe = PositionalEncoding(len=num_tokens, d_model=embedding_size)
        self.batch_size = batch_size
        self.embedding_size = embedding_size
        self.num_tokens = num_tokens
        self.num_classes = num_classes
        self.num_heads = num_heads
        # self.hidden1 = 20
        self.hidden1 = 20
        self.hidden2 = 60
        self.hidden3 = 20
        self.dropout = 0.2
        self.relu = nn.ReLU()
        self.LN = nn.LayerNorm(normalized_shape=self.hidden1)
        self.fc1 = nn.Linear(self.embedding_size, self.hidden1)

        self.multihead_att = nn.MultiheadAttention(embed_dim=self.hidden1, num_heads=self.num_heads, batch_first=1, dropout=self.dropout)
        self.flatten = nn.Flatten()
        self.fc2 = nn.Linear(self.hidden1 * self.num_tokens, self.hidden2)
        self.fc3 = nn.Linear(self.hidden2, self.hidden3)
        self.new_fc4 = nn.Linear(self.hidden3, self.num_classes)
        self.dropout = nn.Dropout(self.dropout)
        self.softmax = nn.functional.softmax

    def forward(self, x, mask):
        x = self.pe(x)
        x = self.fc1(x)

        mask = mask.to(torch.bool)
        x, w1 = self.multihead_att.forward(x, x, x, key_padding_mask=mask)

        x = self.flatten(x)
        x = self.fc2(x)
        x = self.dropout(x)

        x = self.relu(x)
        x = self.fc3(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.new_fc4(x)

        return x

# use attention model to optimize the sequence for demostration
model_list = {
    "Attention": "../model/regression/Attention.pth",
}

# models used to evaluate the optimized sequences
test_model_list = {"CNN": "../model/regression/CNN.pth", "Transformer": "../model/regression/Transformer.pth", "Attention": "../model/regression/Attention.pth", "LSTM": "../model/regression/LSTM.pth"}

opt_seqls = [to_opt]

# Set the number of iterations and learning rate for each model
iter_dict = {"CNN": 500, "Transformer": 500, "Attention": 500, "RCNN": 500}
lr_dict = {"CNN": 0.01, "Transformer": 0.0005, "Attention": 0.005, "RCNN": 0.001}

for seq in opt_seqls:
    tseq = seq
    ModelNameList = model_list.keys()

    oriseq = tseq

    # Create a DataFrame to store the sequences, their lengths, and labels
    df = pd.DataFrame(columns=["Sequence", "Length", "label"])
    items = [{"Sequence": oriseq, "Length": len(oriseq)}]

    # use _append or append for different pandas version
    # df = df.append(items, ignore_index=1)
    df = df._append(items, ignore_index=1)

    df.to_csv("./EvoResult/" + oriseq + ".csv", index=False)
    SeqPath = "./EvoResult/" + oriseq + ".csv"

    testData1 = TrainDataset(data_path=r"../data/regression/test.csv")
    test_loader1 = DataLoader(dataset=testData1, batch_size=4, drop_last=True)

    # Iterate over the models
    for modelName in ModelNameList:
        alpha = lr_dict[modelName]
        iters = iter_dict[modelName]

        iternum = 0

        testData = TestDataset(data_path=SeqPath)
        test_loader = DataLoader(dataset=testData, batch_size=1)
        t_model = torch.load(model_list[modelName], weights_only=False, map_location='cpu')
        if modelName == "Transformer":
            t_model.postion_embedding.device = "cpu"

        t_model.zero_grad()

        print("Using", modelName, " to optimize this sequence:")
        flag = 1

        for data in test_loader:
            resultList = []
            # ensamble_values = []
            resultSeq = [oriseq]
            outMIC = []
            # t_model.zero_grad()
            inputs, masks, seqs = data

            inputs = inputs.float()
            masks = masks.float()

            inputs = inputs.cpu()
            inputs.requires_grad = True
            masks = masks.cpu()
            print(seqs[0])

            t_model.eval()
            for iter in range(iters):
                t_model.zero_grad()
                inputs.retain_grad = True

                if modelName != "Attention":
                    out = t_model(inputs)
                else:
                    out = t_model(inputs, masks)

                # out = torch.squeeze(out)
                out = out.cpu()
                conloss = out
                conloss.backward()
                grad = inputs.grad

                colindex = masks[0] == 1
                grad[0][masks[0] == 1] = 0
                mylen = 100 - colindex.sum()

                ori_onehot = num2onehot(inputs[0].cpu())
                result = inputs[0] - alpha * grad[0]
                result[mylen:, :] = 0
                tempt_onehot = num2onehot(result.cpu())

                if (tempt_onehot == ori_onehot).all():  # if no AAs (after projection) was changed
                    flag = 0
                else:
                    result = tempt_onehot
                    flag = 1
                with torch.no_grad():
                    inputs[0] = result

                result = result.cpu().detach().numpy()
                seq = num2seq(result, len=mylen)
                seq = "".join(seq)
                if flag == 1:
                    resultSeq.append(seq)

        print()
        colorShow(resultSeq)
        print()

        optSeqDir = "./EvoResult/" + oriseq
        if not os.path.exists(optSeqDir):
            os.makedirs(optSeqDir)
        optSeqSavePath = optSeqDir + "/" + modelName + ".csv"

        result_df = pd.DataFrame(columns=["Sequence", "label", "Length"])
        items = []
        for seq in resultSeq:
            item = {"Sequence": seq, "Length": len(seq)}
            items.append(item)

        # use _append or append for different pandas version
        # result_df = result_df.append(items)
        result_df = result_df._append(items)

        result_df.to_csv(optSeqSavePath)

        result_soli = []

        testModelNameList = test_model_list.keys()

        preList = {}

        mylen = result_df.shape[0]
        numslist = []

        model_out = {}
        for testModelName in testModelNameList:
            ls = []
            model_out[testModelName] = []
            testData = TestDataset(data_path=optSeqSavePath)
            test_loader = DataLoader(dataset=testData, batch_size=64)

            testData1 = TrainDataset(data_path=r"../data/regression/test.csv")
            test_loader1 = DataLoader(dataset=testData1, batch_size=4, drop_last=True)

            t_model = torch.load(test_model_list[testModelName], weights_only=False, map_location='cpu')
            if testModelName == "Transformer":
                t_model.postion_embedding.device = "cpu"

            t_model.cpu()
            t_model.zero_grad()
            t_model.eval()

            for data in test_loader:
                resultList = []
                t_model.zero_grad()
                inputs, masks, seqs = data
                inputs = inputs.float()
                masks = masks.float()

                inputs = inputs.cpu()
                masks = masks.cpu()
                t_model.zero_grad()

                if testModelName != "Attention":
                    out = t_model(inputs)
                else:
                    out = t_model(inputs, masks)

                out = out.cpu()
                if len(out.shape) > 0:
                    out_ori = torch.squeeze(out)
                else:
                    out_ori = out.unsqueeze(0)

                out_numpy = list(out_ori.detach().numpy())
                out_numpy = [round(v, 3) for v in out_numpy]
                model_out[testModelName] = list(model_out[testModelName]) + out_numpy

        # summarize the results
        for k, v in model_out.items():
            result_df[k] = v
        resultPath = optSeqSavePath[:-4] + "_result.csv"

        ensamble_values = [(result_df["Attention"][k] + result_df["Transformer"][k] + result_df["CNN"][k] + result_df["LSTM"][k]) / 4 for k in range(result_df.shape[0])]
        ensamble_values = [round(v, 3) for v in ensamble_values]
        result_df["Ensemble"] = ensamble_values

        result_df = result_df[["Sequence", "Ensemble", "Length", "CNN", "Transformer", "Attention", "LSTM"]]

        result_df.to_csv(resultPath, index=0)
        print(f"Optimization result is saved to ./{resultPath} ")

Using Attention  to optimize this sequence:
RPLIKLRSTAGTGYTYVTRK

RPLIKLRSTAGTGYTYVTRK
RPLIKLR[34mW[0mTAGTGYTYVTRK
R[34mK[0mLIKLR[34mW[0mTAGTGYTYVTRK
R[34mK[0mL[34mK[0mKLR[34mW[0mTAGTGYTYVTRK
R[34mK[0mL[34mK[0mKLR[34mW[0m[34mR[0mAGTGYTYVTRK
R[34mK[0mL[34mK[0mKLR[34mW[0m[34mR[0mAGT[34mM[0mYTYVTRK
R[34mK[0mL[34mK[0mKLR[34mW[0m[34mR[0mAGT[34mM[0mY[34mK[0mYVTRK
R[34mK[0mL[34mK[0mKLR[34mW[0m[34mR[0mAG[34mM[0m[34mM[0mY[34mK[0mYVTRK
R[34mK[0mL[34mK[0mKLR[34mW[0m[34mR[0mAG[34mM[0m[34mM[0mY[34mK[0mYVT[34mL[0mK
R[34mK[0mL[34mK[0mKLR[34mW[0m[34mR[0mAG[34mM[0m[34mM[0mY[34mK[0mYV[34mK[0m[34mL[0mK
[34mT[0m[34mK[0mL[34mK[0mKLR[34mW[0m[34mR[0mAG[34mM[0m[34mM[0mY[34mK[0mYV[34mK[0m[34mL[0mK

Optimization result is saved to ././EvoResult/RPLIKLRSTAGTGYTYVTRK/Attention_result.csv 
