# Imports

In [1]:
!pip install blosum
!pip install Bio
!pip3 install torch torchvision torchaudio transformers sentencepiece accelerate --extra-index-url https://download.pytorch.org/whl/cu116
!pip install protein-bert
!pip install biopython biotite
!pip3 install torch torchvision torchaudio transformers sentencepiece accelerate --extra-index-url https://download.pytorch.org/whl/cu116
!pip install fair-esm

Collecting blosum
  Downloading blosum-2.0.2-py3-none-any.whl (21 kB)
Installing collected packages: blosum
Successfully installed blosum-2.0.2
Collecting Bio
  Downloading bio-1.5.9-py3-none-any.whl (276 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m276.4/276.4 kB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting biopython>=1.80 (from Bio)
  Downloading biopython-1.81-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m13.1 MB/s[0m eta [36m0:00:00[0m
Collecting mygene (from Bio)
  Downloading mygene-3.2.2-py2.py3-none-any.whl (5.4 kB)
Collecting gprofiler-official (from Bio)
  Downloading gprofiler_official-1.0.0-py3-none-any.whl (9.3 kB)
Collecting biothings-client>=0.2.6 (from mygene->Bio)
  Downloading biothings_client-0.3.0-py2.py3-none-any.whl (29 kB)
Installing collected packages: biopython, gprofiler-official, biothings-client, mygene, Bio
Succes

In [2]:
import scipy as sp
import matplotlib.pyplot as plt
import seaborn as sns; sns.set()
import pandas as pd
import numpy as np
from scipy.stats import t
import blosum as bl
from Bio import SeqIO
import random
from scipy import stats
import torch
import esm
import re
import os
from tqdm import tqdm
import seaborn as sns
import re
import random
import pickle
import statistics

from transformers import T5Tokenizer, T5EncoderModel
from transformers import AlbertModel, AlbertTokenizer
from transformers import BertModel, BertTokenizer
from transformers import XLNetModel, XLNetTokenizer
import time
from zipfile import ZipFile

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print("Using device: {}".format(device))

torch.set_grad_enabled(False)

import warnings

warnings.filterwarnings("ignore")

Using device: cuda:0


# Models

## T5

In [3]:
def ProtT5_initialize():

  print("ProtT5 Initialize : ")
  transformer_link = "Rostlab/prot_t5_xl_half_uniref50-enc"
  print("Loading: {}".format(transformer_link))
  T5 = T5EncoderModel.from_pretrained(transformer_link)
  T5.full() if device=='cpu' else T5.half() # only cast to full-precision if no GPU is available
  T5 = T5.to(device)
  T5 = T5.eval()
  T5_tokenizer = T5Tokenizer.from_pretrained(transformer_link, do_lower_case=False)

  return T5 , T5_tokenizer

In [4]:
def get_embs_T5(T5, tokenizer, sequences, n):
  sequence_examples = sequences[:n]

  # this will replace all rare/ambiguous amino acids by X and introduce white-space between all amino acids
  sequence_examples = [" ".join(list(re.sub(r"[UZOB]", "X", sequence))) for sequence in sequence_examples]

  # tokenize sequences and pad up to the longest sequence in the batch
  ids = tokenizer.batch_encode_plus(sequence_examples, add_special_tokens=True, padding= True)
  input_ids = torch.tensor(ids['input_ids']).to(device)
  attention_mask = torch.tensor(ids['attention_mask']).to(device)

  # generate embeddings
  with torch.no_grad():
      embedding_repr = T5(input_ids=input_ids, attention_mask=attention_mask)

  last_layer_repr = embedding_repr.last_hidden_state
  final_embs = []
  for i in range(len(last_layer_repr)):
    final_embs.append(last_layer_repr[i , :len(sequences[i])])

  return final_embs

## ESM1b

In [5]:
def ESM1b_initialize():
  # Load ESM-1b model
  print("ESM1b Initialize : ")
  ESM1b, alphabet = esm.pretrained.esm1b_t33_650M_UR50S()
  batch_converter = alphabet.get_batch_converter()
  ESM1b.eval()  # disables dropout for deterministic results

  return ESM1b, batch_converter

In [6]:
def get_embs_ESM1b(ESM1b, batch_converter, sequences, n):
  sequences = sequences[:n]
  data = [("" , sequences[0])]

  batch_labels, batch_strs, batch_tokens = batch_converter(data)

  # Extract per-residue representations
  with torch.no_grad():
      results = ESM1b(batch_tokens, repr_layers=[33], return_contacts= False)
  token_representations = results["representations"][33]

  final_embs = []
  for i in range(len(token_representations)):
    final_embs.append(token_representations[i][1:-1])

  return final_embs

## ESM2

In [7]:
def ESM2_initialize():
  # Load ESM-2 model
  print("ESM2 Initialize : ")
  ESM2, alphabet = esm.pretrained.esm2_t33_650M_UR50D()
  batch_converter = alphabet.get_batch_converter()
  ESM2.eval()  # disables dropout for deterministic results

  return ESM2, batch_converter

In [8]:
def get_embs_ESM2(ESM2, batch_converter, sequences, n):
  sequences = sequences[:n]
  data = [("" , sequences[0])]

  batch_labels, batch_strs, batch_tokens = batch_converter(data)

  # Extract per-residue representations
  with torch.no_grad():
      results = ESM2(batch_tokens, repr_layers=[33], return_contacts= False)
  token_representations = results["representations"][33]

  final_embs = []
  for i in range(len(token_representations)):
    final_embs.append(token_representations[i][1:-1])

  return final_embs

## Bert Model

In [9]:
def ProtBert_initialize():

  print("ProtBert Initialize : ")

  Bert_tokenizer = BertTokenizer.from_pretrained("Rostlab/prot_bert", do_lower_case=False)
  Bert = BertModel.from_pretrained("Rostlab/prot_bert")

  Albert = Bert.to(device)
  Albert = Bert.eval()

  return Bert, Bert_tokenizer

In [10]:
def get_embs_ProtBert(Bert , tokenizer , sequences , n):
  sequence_examples = sequences[:n]

  # this will replace all rare/ambiguous amino acids by X and introduce white-space between all amino acids
  sequence_examples = [" ".join(list(re.sub(r"[UZOB]", "X", sequence))) for sequence in sequence_examples]

  ids = tokenizer.batch_encode_plus(sequence_examples, add_special_tokens=True, pad_to_max_length=True)
  input_ids = torch.tensor(ids['input_ids']).to(device)
  attention_mask = torch.tensor(ids['attention_mask']).to(device)

  with torch.no_grad():
    embedding = Bert(input_ids=input_ids,attention_mask=attention_mask)[0]

  final_embs = []
  for seq_num in range(len(embedding)):
      seq_len = (attention_mask[seq_num] == 1).sum()
      seq_emd = embedding[seq_num][1:seq_len-1]
      final_embs.append(seq_emd)

  return final_embs

## Albert Model

In [11]:
def ProtAlbert_initialize():

  print("ProtAlbert Initialize : ")

  Albert_tokenizer = AlbertTokenizer.from_pretrained("Rostlab/prot_albert", do_lower_case=False)
  Albert = AlbertModel.from_pretrained("Rostlab/prot_albert")

  Albert = Albert.to(device)
  Albert = Albert.eval()

  return Albert, Albert_tokenizer

In [12]:
def get_embs_ProtAlbert(Albert, Albert_tokenizer, sequences, n):

  sequences = [" ".join(re.sub(r"[UZOB]", "X", sequence)) for sequence in sequences]
  ids = Albert_tokenizer.batch_encode_plus(sequences, add_special_tokens=True, padding = 'longest')
  input_ids = torch.tensor(ids['input_ids']).to(device)
  attention_mask = torch.tensor(ids['attention_mask']).to(device)

  with torch.no_grad():
      embedding = Albert(input_ids = input_ids , attention_mask = attention_mask)[0]

  features = []
  for seq_num in range(len(embedding)):
      seq_len = (attention_mask[seq_num] == 1).sum()
      seq_emd = embedding[seq_num][1 : seq_len - 1]
      features.append(seq_emd)

  return features

## XLNet

In [13]:
def XLNet_initialize():

  print("XLNet Initialize : ")

  XLNet_tokenizer = XLNetTokenizer.from_pretrained("Rostlab/prot_xlnet" , do_lower_case=False)
  XLNet = XLNetModel.from_pretrained("Rostlab/prot_xlnet" , mem_len= 1024)

  XLNet = XLNet.to(device)
  XLNet = XLNet.eval()

  return XLNet, XLNet_tokenizer

In [14]:
def get_embs_XLNet(XLNet, XLNet_tokenizer, sequences, n):

  sequences = [" ".join(re.sub(r"[UZOBX]" , "<unk>", sequence)) for sequence in sequences]
  ids = XLNet_tokenizer.batch_encode_plus(sequences, add_special_tokens = True, padding = 'longest')
  input_ids = torch.tensor(ids['input_ids']).to(device)
  attention_mask = torch.tensor(ids['attention_mask']).to(device)

  with torch.no_grad():
      output = XLNet(input_ids = input_ids , attention_mask = attention_mask)
      embedding = output.last_hidden_state

  features = []
  for seq_num in range(len(embedding)):
      seq_len = (attention_mask[seq_num] == 1).sum()
      padded_seq_len = len(attention_mask[seq_num])
      seq_emd = embedding[seq_num][padded_seq_len - seq_len : padded_seq_len - 2]
      features.append(seq_emd)


  return features

# Alignment Algorithms

## Global

In [15]:
def affine_global_dp(seq_1, seq_2, g_open, g_ext,
                     scoring="ProtT5", Model=None, Model_tokenizer=None):

    MODELS_LIST = ["ProtT5", "ProtBert", "ProtAlbert", "XLNet", "ESM1b", "ESM2"]

    # initialize the matrix
    m = len(seq_1);
    n = len(seq_2)
    M = np.zeros([m + 1, n + 1])
    M[0, 1:] = g_open + g_ext * np.arange(0, n, 1)
    M[1:, 0] = g_open + g_ext * np.arange(0, m, 1)
    L = np.copy(M);
    U = np.copy(M)
    L[1:, 0] = L[1:, 0] + g_open;
    U[0, 1:] = U[0, 1:] + g_open  # avoiding Gotoh's error

    # fill up
    tracer = np.zeros([np.shape(M)[0], np.shape(M)[1], 7])

    if scoring == "ProtT5":
        emb1 = get_embs_T5(Model, Model_tokenizer, [seq_1], 1)[0].cpu().numpy()
        emb2 = get_embs_T5(Model, Model_tokenizer, [seq_2], 1)[0].cpu().numpy()
        cos = torch.nn.CosineSimilarity(dim=0)

    if scoring == "ProtBert":
        emb1 = get_embs_ProtBert(Model, Model_tokenizer, [seq_1], 1)[0].cpu().numpy()
        emb2 = get_embs_ProtBert(Model, Model_tokenizer, [seq_2], 1)[0].cpu().numpy()
        cos = torch.nn.CosineSimilarity(dim=0)

    if scoring == "ProtAlbert":
        emb1 = get_embs_ProtAlbert(Model, Model_tokenizer, [seq_1], 1)[0].cpu().numpy()
        emb2 = get_embs_ProtAlbert(Model, Model_tokenizer, [seq_2], 1)[0].cpu().numpy()
        cos = torch.nn.CosineSimilarity(dim=0)

    if scoring == "ProtXLNet":
        emb1 = get_embs_XLNet(Model, Model_tokenizer, [seq_1], 1)[0].cpu().numpy()
        emb2 = get_embs_XLNet(Model, Model_tokenizer, [seq_2], 1)[0].cpu().numpy()
        cos = torch.nn.CosineSimilarity(dim=0)

    if scoring == "ESM1b":
        emb1 = get_embs_ESM1b(Model, Model_tokenizer, [seq_1], 1)[0].cpu().numpy()
        emb2 = get_embs_ESM1b(Model, Model_tokenizer, [seq_2], 1)[0].cpu().numpy()
        cos = torch.nn.CosineSimilarity(dim=0)

    if scoring == "ESM2":
        emb1 = get_embs_ESM2(Model, Model_tokenizer, [seq_1], 1)[0].cpu().numpy()
        emb2 = get_embs_ESM2(Model, Model_tokenizer, [seq_2], 1)[0].cpu().numpy()
        cos = torch.nn.CosineSimilarity(dim=0)

    for i in range(1, m + 1):
        for j in range(1, n + 1):
            l_arr = np.array([M[i, j - 1] + g_open, L[i, j - 1] + g_ext])
            L[i, j] = np.max(l_arr)
            l_where = l_arr == np.max(l_arr)

            u_arr = np.array([M[i - 1, j] + g_open, U[i - 1, j] + g_ext])
            U[i, j] = np.max(u_arr)
            u_where = u_arr == np.max(u_arr)

            if scoring in MODELS_LIST:
                sim = cos(torch.tensor(emb1[i - 1], dtype=torch.float32)
                          , torch.tensor(emb2[j - 1], dtype=torch.float32)).item()

                m_arr = np.array([M[i - 1, j - 1] + sim, U[i, j], L[i, j]])

            M[i, j] = np.max(m_arr)
            m_where = m_arr == np.max(m_arr)

            idx = np.hstack([m_where, u_where, l_where])
            tracer[i, j, idx] = 1

    # traceback

    alignment = []
    alignment.append(traceback_g(tracer, seq_1, seq_2, affine= True, roadmap=0))

    alignment = list(set(map(tuple, alignment)))

    return M, L, U, tracer, alignment


def traceback_g(tracer, seq_1, seq_2, mat=None, affine=False, roadmap=0):
    # get sequence lengths
    m = len(seq_1);
    n = len(seq_2)

    # convert to numpy arrays
    x = np.array(list(seq_1), dtype='object')
    y = np.array(list(seq_2), dtype='object')

    # set start location
    st = [m + 1, n + 1]

    st_lv = 0  # start in midgard

    while ((st[0] > 1) & (st[1] > 1)):

        B = np.zeros([2, 2])  # define 2x2 box which specifies which way to move

        if affine is True:
            Tr = np.zeros([7])  # define a 7x1 Tr array (will store arrows at each step)
        else:
            Tr = np.zeros([3])  # define a 3x1 Tr array (will store arrows at each step)

        if affine is False:
            Tr[0] = np.copy(tracer[st[0] - 1, st[1] - 1, 0])
            Tr[1] = np.copy(tracer[st[0] - 1, st[1] - 1, 1])
            Tr[2] = np.copy(tracer[st[0] - 1, st[1] - 1, 2])

        else:
            # tracer
            Tr[0] = np.copy(tracer[st[0] - 1, st[1] - 1, 0])
            Tr[1] = np.copy(tracer[st[0] - 1, st[1] - 1, 1])
            Tr[2] = np.copy(tracer[st[0] - 1, st[1] - 1, 2])
            Tr[3] = np.copy(tracer[st[0] - 1, st[1] - 1, 3])
            Tr[4] = np.copy(tracer[st[0] - 1, st[1] - 1, 4])
            Tr[5] = np.copy(tracer[st[0] - 1, st[1] - 1, 5])
            Tr[6] = np.copy(tracer[st[0] - 1, st[1] - 1, 6])

        # bifurcations
        if affine is True:
            levels = [[2, 0, 1], [4, 3], [6, 5]]
        else:
            levels = [[2, 0, 1]]
        for l in levels:
            if np.sum(Tr[l]) > 1:
                choose = np.where(Tr[l] == 1)[0]
                Tr[l] = 0
                if roadmap == 0:
                    r = np.random.choice(choose, 1)[0]  # random turning
                elif roadmap == 1:
                    r = choose[-1]  # highroad
                elif roadmap == 2:
                    r = choose[0]  # lowroad
                else:
                    raise Exception("roadmap only accepts 0: random turning, 1: highroad, 2: lowroad")
                Tr[l[r]] = 1

        # level up-down
        if ((Tr[0] == 1) & (st_lv == 0)):  # diagonal
            B[0, 0] = 1

        if ((Tr[1] == 1) & (st_lv == 0)):
            if affine is True:
                st_lv = 1  # level up
            else:
                B[0, 1] = 1

        if ((Tr[2] == 1) & (st_lv == 0)):
            if affine is True:
                st_lv = 2  # level down
            else:
                B[1, 0] = 1

        # affine gaps allow for level shifts
        if affine is True:
            if ((Tr[4] == 1) & (st_lv == 1)):  # move up
                B[0, 1] = 1

            if ((Tr[3] == 1) & (st_lv == 1)):  # move up back to main
                st_lv = 0
                B[0, 1] = 1

            if ((Tr[6] == 1) & (st_lv == 2)):  # move left
                B[1, 0] = 1

            if ((Tr[5] == 1) & (st_lv == 2)):  # move left back to main
                st_lv = 0
                B[1, 0] = 1

        # movements
        if B[0, 1] == 1:  # upward
            y = np.insert(y, st[1] - 1, '-')  # add a gap
            st[0] = st[0] - 1

        if B[1, 0] == 1:  # leftward
            x = np.insert(x, st[0] - 1, '-')  # add a gap
            st[1] = st[1] - 1

        if B[0, 0] == 1:  # diagonal
            st[1] = st[1] - 1
            st[0] = st[0] - 1

    # some end gaps are left when you hit the upper/lower end of the matrix or a 0
    end_size = (np.size(x) - np.size(y))  # how many gaps and for which sequence
    end_gap = (['-'] * abs(end_size))
    if end_size > 0:
        y = np.insert(y, 0, end_gap)
    elif end_size < 0:
        x = np.insert(x, 0, end_gap)

    # check no overlapping gaps
    x = np.where(((x == '-') & (y == '-')), None, x)
    y = np.where((x == None), '', y)
    x = np.where((x == None), '', x)

    return np.sum(x), np.sum(y)


def traceback_iterator_g(tracer, seq_1, seq_2,
                         affine=False):
    alignment = []
    alignment.append(traceback_g(tracer, seq_1, seq_2, affine=affine, roadmap=0))

    return list(set(map(tuple, alignment)))


## Prefix/Suffix

In [16]:
def affine_semi_global_dp(seq_1, seq_2, g_open, g_ext,
                          high_low=False, scoring="ProtT5", Model=None, Model_tokenizer=None):
    MODELS_LIST = ["ProtT5", "ProtBert", "ProtAlbert", "ProtXLNet", "ESM1b", "ESM2"]

    # initialize the matrix
    m = len(seq_1);
    n = len(seq_2)
    M = np.zeros([m + 1, n + 1])
    M[0, 1:] = 0
    M[1:, 0] = 0
    L = np.copy(M);
    U = np.copy(M)
    L[1:, 0] = 0;
    U[0, 1:] = 0

    # fill up
    tracer = np.zeros([np.shape(M)[0], np.shape(M)[1], 7])

    if scoring == "ProtT5":
        emb1 = get_embs_T5(Model, Model_tokenizer, [seq_1], 1)[0].cpu().numpy()
        emb2 = get_embs_T5(Model, Model_tokenizer, [seq_2], 1)[0].cpu().numpy()
        cos = torch.nn.CosineSimilarity(dim=0)

    if scoring == "ProtBert":
        emb1 = get_embs_ProtBert(Model, Model_tokenizer, [seq_1], 1)[0].cpu().numpy()
        emb2 = get_embs_ProtBert(Model, Model_tokenizer, [seq_2], 1)[0].cpu().numpy()
        cos = torch.nn.CosineSimilarity(dim=0)

    if scoring == "ProtAlbert":
        emb1 = get_embs_ProtAlbert(Model, Model_tokenizer, [seq_1], 1)[0].cpu().numpy()
        emb2 = get_embs_ProtAlbert(Model, Model_tokenizer, [seq_2], 1)[0].cpu().numpy()
        cos = torch.nn.CosineSimilarity(dim=0)

    if scoring == "ProtXLNet":
        emb1 = get_embs_XLNet(Model, Model_tokenizer, [seq_1], 1)[0].cpu().numpy()
        emb2 = get_embs_XLNet(Model, Model_tokenizer, [seq_2], 1)[0].cpu().numpy()
        cos = torch.nn.CosineSimilarity(dim=0)

    if scoring == "ESM1b":
        emb1 = get_embs_ESM1b(Model, Model_tokenizer, [seq_1], 1)[0].cpu().numpy()
        emb2 = get_embs_ESM1b(Model, Model_tokenizer, [seq_2], 1)[0].cpu().numpy()
        cos = torch.nn.CosineSimilarity(dim=0)

    if scoring == "ESM2":
        emb1 = get_embs_ESM2(Model, Model_tokenizer, [seq_1], 1)[0].cpu().numpy()
        emb2 = get_embs_ESM2(Model, Model_tokenizer, [seq_2], 1)[0].cpu().numpy()
        cos = torch.nn.CosineSimilarity(dim=0)

    for i in range(1, m + 1):
        for j in range(1, n + 1):
            l_arr = np.array([M[i, j - 1] + g_open, L[i, j - 1] + g_ext])
            L[i, j] = np.max(l_arr)
            l_where = l_arr == np.max(l_arr)

            u_arr = np.array([M[i - 1, j] + g_open, U[i - 1, j] + g_ext])
            U[i, j] = np.max(u_arr)
            u_where = u_arr == np.max(u_arr)

            if scoring in MODELS_LIST:
                sim = cos(torch.tensor(emb1[i - 1], dtype=torch.float32)
                          , torch.tensor(emb2[j - 1], dtype=torch.float32)).item()

                m_arr = np.array([M[i - 1, j - 1] + sim, U[i, j], L[i, j]])

            M[i, j] = np.max(m_arr)
            m_where = m_arr == np.max(m_arr)

            idx = np.hstack([m_where, u_where, l_where])
            tracer[i, j, idx] = 1


    alignment = []
    alignment.append(traceback_sg(tracer, seq_1, seq_2, mat=M, affine=True,
                                  local= True, roadmap=0))
    alignment = list(set(map(tuple, alignment)))

    return M, L, U, tracer, alignment


def traceback_sg(tracer, seq_1, seq_2, mat=None, local=False, affine=False, roadmap=0):

    m = len(seq_1);
    n = len(seq_2)

    x = np.array(list(seq_1), dtype='object')
    y = np.array(list(seq_2), dtype='object')

    # set start location
    if roadmap == 0:
        r = np.random.choice(range(np.size(np.where(mat == np.max(mat))[0])), 1)[0]  # random maxima
    elif roadmap == 1:
        r = -1
    elif roadmap == 2:
        r = 0

    st = [(np.where(mat == np.max(mat))[0][r]) + 1, (np.where(mat == np.max(mat))[1][r]) + 1]

    # set starting gaps based on the start location
    start_size = ((m - st[0]) - (n - st[1]))  # how many gaps and for which sequence
    start_gap = (['-'] * abs(start_size))
    if start_size > 0:
        y = np.append(y, start_gap)
    elif start_size < 0:
        x = np.append(x, start_gap)

    st_lv = 0  # start in midgard

    while ((st[0] > 1) & (st[1] > 1)):

        B = np.zeros([2, 2])  # define 2x2 box which specifies which way to move

        if affine is True:
            Tr = np.zeros([7])  # define a 7x1 Tr array (will store arrows at each step)
        else:
            Tr = np.zeros([3])  # define a 3x1 Tr array (will store arrows at each step)


        if affine is False:
            Tr[0] = np.copy(tracer[st[0] - 1, st[1] - 1, 0])
            Tr[1] = np.copy(tracer[st[0] - 1, st[1] - 1, 1])
            Tr[2] = np.copy(tracer[st[0] - 1, st[1] - 1, 2])

        else:
            # tracer
            Tr[0] = np.copy(tracer[st[0] - 1, st[1] - 1, 0])
            Tr[1] = np.copy(tracer[st[0] - 1, st[1] - 1, 1])
            Tr[2] = np.copy(tracer[st[0] - 1, st[1] - 1, 2])
            Tr[3] = np.copy(tracer[st[0] - 1, st[1] - 1, 3])
            Tr[4] = np.copy(tracer[st[0] - 1, st[1] - 1, 4])
            Tr[5] = np.copy(tracer[st[0] - 1, st[1] - 1, 5])
            Tr[6] = np.copy(tracer[st[0] - 1, st[1] - 1, 6])

        # bifurcations
        if affine is True:
            levels = [[2, 0, 1], [4, 3], [6, 5]]
        else:
            levels = [[2, 0, 1]]
        for l in levels:
            if np.sum(Tr[l]) > 1:
                choose = np.where(Tr[l] == 1)[0]
                Tr[l] = 0
                if roadmap == 0:
                    r = np.random.choice(choose, 1)[0]  # random turning
                elif roadmap == 1:
                    r = choose[-1]  # highroad
                elif roadmap == 2:
                    r = choose[0]  # lowroad
                else:
                    raise Exception("roadmap only accepts 0: random turning, 1: highroad, 2: lowroad")
                Tr[l[r]] = 1

        # level up-down
        if ((Tr[0] == 1) & (st_lv == 0)):  # diagonal
            B[0, 0] = 1

        if ((Tr[1] == 1) & (st_lv == 0)):
            if affine is True:
                st_lv = 1  # level up
            else:
                B[0, 1] = 1

        if ((Tr[2] == 1) & (st_lv == 0)):
            if affine is True:
                st_lv = 2  # level down
            else:
                B[1, 0] = 1

        # affine gaps allow for level shifts
        if affine is True:
            if ((Tr[4] == 1) & (st_lv == 1)):  # move up
                B[0, 1] = 1

            if ((Tr[3] == 1) & (st_lv == 1)):  # move up back to main
                st_lv = 0
                B[0, 1] = 1

            if ((Tr[6] == 1) & (st_lv == 2)):  # move left
                B[1, 0] = 1

            if ((Tr[5] == 1) & (st_lv == 2)):  # move left back to main
                st_lv = 0
                B[1, 0] = 1

        if local is True:
            if (mat[st[0] - 1, st[1] - 1] == 0):
                break

        # movements
        if B[0, 1] == 1:  # upward
            y = np.insert(y, st[1] - 1, '-')  # add a gap
            st[0] = st[0] - 1

        if B[1, 0] == 1:  # leftward
            x = np.insert(x, st[0] - 1, '-')  # add a gap
            st[1] = st[1] - 1

        if B[0, 0] == 1:  # diagonal
            st[1] = st[1] - 1
            st[0] = st[0] - 1

    # some end gaps are left when you hit the upper/lower end of the matrix or a 0
    end_size = (np.size(x) - np.size(y))  # how many gaps and for which sequence
    end_gap = (['-'] * abs(end_size))
    if end_size > 0:
        y = np.insert(y, 0, end_gap)
    elif end_size < 0:
        x = np.insert(x, 0, end_gap)

    # check no overlapping gaps
    x = np.where(((x == '-') & (y == '-')), None, x)
    y = np.where((x == None), '', y)
    x = np.where((x == None), '', x)

    return np.sum(x), np.sum(y)

# Aux Funx

In [17]:
def load_fasta(path):
    fasta_sequences = SeqIO.parse(open(path),'fasta')
    sequences = []

    for fasta in fasta_sequences:
        name, sequence = fasta.id, str(fasta.seq)
        sequences.append((name, sequence.upper()))

    return sequences

def just_seqs(seqs):
    final_seqs = []
    for seq in seqs:
        final_seqs.append(seq[1])

    return final_seqs

def aligned_to_indexed(seqs):
  no_dash = []
  positions = []
  for seq in seqs:
    no_dash.append(seq.replace("-" , ""))
    pos = []
    for i , char in enumerate(seq):
      if char != "-":
        pos.append(i)
    positions.append(pos)

  return no_dash, positions

def length_matcher(x , y , place = ""):
  length = 5

  if len(x) < length:
    spaces = abs(len(x) - length)

    if place == "Back":
      x = " " * spaces + x
    if place == "Front":
      x = x + " " * spaces

  if len(y) < length:
    spaces = abs(len(y) - length)

    if place == "Back":
      y = " " * spaces + y
    if place == "Front":
      y = y + " " * spaces

  return x, y

# Alignment Computations

In [18]:
def get_alignments(prot1, prot2, gap_penalty = 0, gap_extension_penalty = 0 ,
                   scoring = "ProtT5" , alignment_type = "Global-regular" , Model = "" , Model_Tokenizer = ""):

    if alignment_type == "Global-regular":
      M, L, U , tracer , alignment= affine_global_dp(prot1, prot2, gap_penalty, gap_extension_penalty
                                                    ,scoring = scoring , Model = Model, Model_tokenizer = Model_Tokenizer)
      max_score = np.max(M)

    if alignment_type == "Global-end-gap-free" or alignment_type == "End-Gap-Free":
      M, L, U , tracer , alignment= affine_semi_global_dp(prot1, prot2, gap_penalty, gap_extension_penalty
                                                    ,scoring = scoring , Model = Model, Model_tokenizer = Model_Tokenizer)

      max_score = max(M[-1,-1],L[-1,-1],U[-1,-1])

    aligned1 = alignment[0][0]
    aligned2 = alignment[0][1]

    return aligned1, aligned2, max_score

In [19]:
def get_visualization(prot1, prot2 , score , Type = "" , Model = "" , Model_Tokenizer = ""):

  MODELS_LIST = ["ProtT5" , "ProtBert" , "ProtAlbert" , "ProtXLNet" , "ESM1b" , "ESM2"]
  cos = torch.nn.CosineSimilarity(dim=0)

  seqs = [prot1 , prot2]
  no_dash , positions = aligned_to_indexed(seqs)

  if Type == "ProtT5":
    model = Model
    tokenizer = Model_Tokenizer
    p1_emb = get_embs_T5(model, tokenizer, [no_dash[0]] , 1)
    p2_emb = get_embs_T5(model, tokenizer, [no_dash[1]] , 1)

  if Type == "ProtBert":
    model = Model
    tokenizer = Model_Tokenizer
    p1_emb = get_embs_ProtBert(model, tokenizer, [no_dash[0]] , 1)
    p2_emb = get_embs_ProtBert(model, tokenizer, [no_dash[1]] , 1)

  if Type == "ProtAlbert":
    model = Model
    tokenizer = Model_Tokenizer
    p1_emb = get_embs_ProtAlbert(model, tokenizer, [no_dash[0]] , 1)
    p2_emb = get_embs_ProtAlbert(model, tokenizer, [no_dash[1]] , 1)

  if Type == "ProtXLNet":
    model = Model
    tokenizer = Model_Tokenizer
    p1_emb = get_embs_XLNet(model, tokenizer, [no_dash[0]] , 1)
    p2_emb = get_embs_XLNet(model, tokenizer, [no_dash[1]] , 1)

  if Type == "ESM1b":
    model = Model
    tokenizer = Model_Tokenizer
    p1_emb = get_embs_ESM1b(model, tokenizer, [no_dash[0]] , 1)
    p2_emb = get_embs_ESM1b(model, tokenizer, [no_dash[1]] , 1)

  if Type == "ESM2":
    model = Model
    tokenizer = Model_Tokenizer
    p1_emb = get_embs_ESM2(model, tokenizer, [no_dash[0]] , 1)
    p2_emb = get_embs_ESM2(model, tokenizer, [no_dash[1]] , 1)

  p1_revived = ""
  p2_revived = ""
  aligned_info = ""

  for i in range(len(prot1)):

    if i in positions[0]:
      p1_revived += prot1[i]
    else:
      p1_revived += "-"

    if i in positions[1]:
      p2_revived += prot2[i]
    else:
      p2_revived += "-"


    if p1_revived[-1] == p2_revived[-1]:
      aligned_info += p1_revived[-1]

    elif p1_revived[-1] == "-" or p2_revived[-1] == "-":
      aligned_info += " "

    elif p1_revived[-1] != p2_revived[-1]:

      if Type in MODELS_LIST:
        sim = cos(torch.tensor(p1_emb[0][positions[0].index(i)] , dtype = torch.float32) ,
                  torch.tensor(p2_emb[0][positions[1].index(i)] , dtype = torch.float32)).item()

        aligned_info += " "

  del model
  del tokenizer

  return p1_revived , aligned_info, p2_revived, score

# Get Alignment For 2 Sequences

In [20]:
def alignment_file_TXT(saving_add, seqs_path, scoring, alignment_type,
                      gap_penalty, gap_extension_penalty):

  if scoring == "ProtT5":
    Model , Model_Tokenizer = ProtT5_initialize()

  if scoring == "ProtBert":
    Model , Model_Tokenizer = ProtBert_initialize()

  if scoring == "ProtAlbert":
    Model , Model_Tokenizer = ProtAlbert_initialize()

  if scoring == "ProtXLNet":
    Model , Model_Tokenizer = XLNet_initialize()

  if scoring == "ESM1b":
    Model , Model_Tokenizer = ESM1b_initialize()

  if scoring == "ESM2":
    Model , Model_Tokenizer = ESM2_initialize()

  seqs = load_fasta(seqs_path)

  prot1 = seqs[0][1]
  prot2 = seqs[1][1]

  name1 = seqs[0][0]
  name2 = seqs[1][0]

  reference_al, query_al, al_score = get_alignments(prot1, prot2, gap_penalty = gap_penalty,
                    gap_extension_penalty = gap_extension_penalty ,
                                              scoring = scoring , alignment_type = alignment_type,
                                              Model = Model , Model_Tokenizer = Model_Tokenizer)

  p1_al , aligned_info , p2_al , al_score = get_visualization(reference_al , query_al, al_score , Type = scoring,
                                                              Model = Model, Model_Tokenizer = Model_Tokenizer)

  full_al_1 = p1_al
  full_al_2 = p2_al

  FOLDER = saving_add

  if not os.path.exists(FOLDER):
   os.makedirs(FOLDER)

  file_name = FOLDER + seqs_path.split("/")[-1].split(".")[-2] + "_" + scoring + "_" + alignment_type + "_"
  file_name += str(gap_penalty) + "_" + str(gap_extension_penalty) + "_"+ "Alignment" + ".txt"
  f = open(file_name, "w")

  f.write("Seq 1 \n")
  f.write(">" + name1)
  f.write("\n")
  f.write(reference_al.replace("-" , ""))
  f.write("\n")
  f.write("Seq 2 \n")
  f.write(">" + name2)
  f.write("\n")
  f.write(query_al.replace("-" , ""))
  f.write("\n")
  f.write("\n")
  f.write("Alignment Type : " + alignment_type)
  f.write("\n")
  f.write("\n")

  f.write("Opening Gap Penalty : " + str(gap_penalty))
  f.write("\n")
  f.write("Extension Gap Penalty : " + str(gap_extension_penalty))
  f.write("\n")
  f.write("Scoring System : " + scoring)
  f.write("\n")
  f.write("Score : "  + str(al_score))

  f.write("\n")
  f.write("\n")

  p1_pos = 1
  p2_pos = 1
  aligned_gaps = ""

  for j in range(int(len(p1_al) / 60) + 1):
    p1_posix = p1_al[j * 60: (j + 1) * 60]
    p2_posix = p2_al[j * 60: (j + 1) * 60]
    p1_back_str, p2_back_str = length_matcher(str(p1_pos) , str(p2_pos) , place = "Front")

    for k in range(len(p1_posix)):
      if p1_posix[k] != "-":
        p1_pos += 1
      if p2_posix[k] != "-":
        p2_pos += 1

    p1_end_str, p2_end_str = length_matcher(str(p1_pos - 1) , str(p2_pos - 1) , place = "Back")
    aligned_gaps = " " * len(p1_back_str)

    f.write("Seq 1 : " + p1_back_str + " " + p1_al[j * 60: (j + 1) * 60] + " " + p1_end_str)
    f.write("\n")
    f.write("        "  +  aligned_gaps + " " + aligned_info[j * 60: (j + 1) * 60])
    f.write("\n")
    f.write("Seq 2 : "  + p2_back_str + " " + p2_al[j * 60: (j + 1) * 60] + " " + p2_end_str)
    f.write("\n")
    f.write("\n")

  print("Alignment Computation is Done!")

  del Model
  del Model_Tokenizer

In [21]:
def user_guide(MODELS_LIST , ALIGNMENT_TYPES):
  print("Parameters & Descriptions : ")
  print()
  print("saving_add : the path to the directory for the output")
  print("seqs : the path of the directory for the FASTA file with two protein sequences")
  print("scoring_type : the embedding method used to produce the embedding vectors;"  +
        " allowed values are: " , end = "")
  for model_name in MODELS_LIST[:-1] : print(model_name + ", " , end = "")
  print(MODELS_LIST[-1])

  print("alignment_type: Global-regular or Global-end-gap-free")
  print("gap_penalty = -1 (default); Recommended Values: -4, -3, -2, -1.5, -1, -0.5")
  print("gap_expension_penalty = -0.2 (default); Recommended Values: -1, -0.8, -0.5, -0.3, -0.2, -0.1")

# Tests

In [22]:
MODELS_LIST = ["ProtT5" , "ESM2" , "ProtBert" , "ProtAlbert" , "ESM1b" ,"ProtXLNet"]
ALIGNMENT_TYPES = ["Global-regular" , "Global-end-gap-free"]

user_guide(MODELS_LIST , ALIGNMENT_TYPES)

Parameters & Descriptions : 

saving_add : the path to the directory for the output
seqs : the path of the directory for the FASTA file with two protein sequences
scoring_type : the embedding method used to produce the embedding vectors; allowed values are: ProtT5, ESM2, ProtBert, ProtAlbert, ESM1b, ProtXLNet
alignment_type: Global-regular or Global-end-gap-free
gap_penalty = -1 (default); Recommended Values: -4, -3, -2, -1.5, -1, -0.5
gap_expension_penalty = -0.2 (default); Recommended Values: -1, -0.8, -0.5, -0.3, -0.2, -0.1


In [23]:
saving_add =  "/content/"
seqs_path = "Test1.fasta"
scoring = MODELS_LIST[0] #ProtT5
alignment_type = ALIGNMENT_TYPES[0] #Global-regular
gap_penalty = -1
gap_extension_penalty = -0.2

alignment_file_TXT(saving_add = saving_add , seqs_path = seqs_path, scoring = scoring, alignment_type = alignment_type,
                      gap_penalty = gap_penalty, gap_extension_penalty = gap_extension_penalty)

ProtT5 Initialize : 
Loading: Rostlab/prot_t5_xl_half_uniref50-enc


Downloading (…)lve/main/config.json:   0%|          | 0.00/656 [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/2.42G [00:00<?, ?B/s]

Downloading (…)ve/main/spiece.model:   0%|          | 0.00/238k [00:00<?, ?B/s]

Downloading (…)cial_tokens_map.json:   0%|          | 0.00/1.79k [00:00<?, ?B/s]

Downloading (…)okenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

You are using the default legacy behaviour of the <class 'transformers.models.t5.tokenization_t5.T5Tokenizer'>. If you see this, DO NOT PANIC! This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=True`. This should only be set if you understand what it means, and thouroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565


Alignment Computation is Done!
