In [3]:
import numpy as np
import torch
from transformers import T5EncoderModel, T5Tokenizer
import re

# Load model
tokenizer = T5Tokenizer.from_pretrained("Rostlab/prot_t5_xl_uniref50", do_lower_case=False)
model = T5EncoderModel.from_pretrained("Rostlab/prot_t5_xl_uniref50")

Some weights of the model checkpoint at Rostlab/prot_t5_xl_uniref50 were not used when initializing T5EncoderModel: ['decoder.block.22.layer.1.EncDecAttention.o.weight', 'decoder.block.16.layer.1.EncDecAttention.k.weight', 'decoder.block.11.layer.0.layer_norm.weight', 'decoder.block.18.layer.0.SelfAttention.o.weight', 'decoder.block.22.layer.2.layer_norm.weight', 'decoder.block.15.layer.1.EncDecAttention.k.weight', 'decoder.block.3.layer.1.EncDecAttention.k.weight', 'decoder.block.5.layer.1.EncDecAttention.o.weight', 'decoder.block.22.layer.1.EncDecAttention.q.weight', 'decoder.block.5.layer.0.layer_norm.weight', 'decoder.block.21.layer.2.layer_norm.weight', 'decoder.block.22.layer.0.layer_norm.weight', 'decoder.block.18.layer.1.EncDecAttention.q.weight', 'decoder.block.17.layer.2.DenseReluDense.wo.weight', 'decoder.block.0.layer.2.layer_norm.weight', 'decoder.block.0.layer.0.layer_norm.weight', 'decoder.block.16.layer.0.SelfAttention.k.weight', 'decoder.block.17.layer.0.SelfAttention.

# INITIAL LOCAL ALIGNMENT TESTING

In [2]:
# Initialize protein sequences (taken from two PH domain proteins)
seq1 = 'AGFISVISKKQGEYLEDEWY'
seq2 = 'QVLDKFGS'
#seq1 = 'MRSSASRLSSFSSRDSLWNRMPDQISVSEFIAETTEDYNSPTTSSFTTRLHNCRNTVTLLEEALDQDRTALQKVKKSVKAIYNSGQDHVQNEENYAQVLDKFGSNFLSRDNPDLGTAFVKFSTLTKELSTLLKNLLQGLSHNVIFTLDSLLKGDLKGVKGDLKKPFDKAWKDYETKFTKIEKEKREHAKQHGMIRTEITGAEIAEEMEKERRLFQLQMCEYLIKVNEIKTKKGVDLLQNLIKYYHAQCNFFQDGLKTADKLKQYIEKLAADLYNIKQTQDEEKKQLTALRDLIKSSLQLDQKEDSQSRQGGYSMHQLQGN'
#seq2 = 'NTVTLLEEALDQDRTALQKVKKSVKAIYNSGQDHVQNEENYAQVLDKFGSNFLSRDNPDLGTAF'

In [27]:
def vec_seqs(seq1, seq2):
    """=============================================================================================
    This function accepts two protein sequences and returns a two lists of vectors, one list for
    each protein where each vector represents one amino acid. Vectorization performed by the Rostlab
    ProtT5-XL_UniRef50 model.

    :param seq1: first protein sequence
    :param seq2: second protein sequence
    return: lists of vectorized amino acids
    ============================================================================================="""

    # Tokenize, encode, and load sequences
    sequences = [' '.join([*seq1]), ' '.join([*seq2])]  # Add spaces between each amino acid
    sequences = [re.sub(r"[UZOB]", "X", seq) for seq in sequences]
    ids = tokenizer.batch_encode_plus(sequences, add_special_tokens=True, padding=True)
    input_ids = torch.tensor(ids['input_ids']).to(device)
    attention_mask = torch.tensor(ids['attention_mask']).to(device)

    # Extract sequence features
    with torch.no_grad():
        embedding = model(input_ids=input_ids,attention_mask=attention_mask)
    embedding = embedding.last_hidden_state.cpu().numpy()

    # Remove padding and special tokens
    features = [] 
    for seq_num in range(len(embedding)):
        seq_len = (attention_mask[seq_num] == 1).sum()
        seq_emd = embedding[seq_num][:seq_len-1]
        features.append(seq_emd)

    # Return lists of vectors
    seq1 = features[0]
    seq2 = features[1]
    return seq1, seq2

In [42]:
def SW_align(seq1, seq2, vecs1, vecs2):
    """=============================================================================================
    This function accepts two sequences, creates a matrix corresponding to their lengths, and  
    calculates the score of the alignments for each index. A second matrix is scored so that the
    best alignment can be tracebacked.

    :param seq1: first sequence
    :param seq2: second sequence
    :param vecs1: list of vectorized amino acids for first sequence
    :param vecs2: list of vectorized amino acids for second sequence
    return: scoring and traceback matrices of optimal scores for the SW-alignment of sequences
    ============================================================================================="""

    # NCBI default gap costs
    gap_open = -11
    gap_ext = -1
    gap = False

    # Protein alphabet
    chars = 'ACDEFGHIKLMNPQRSTVWY'

    # Initialize scoring and traceback matrix based on sequence lengths
    row_length = len(seq1)+1
    col_length = len(seq2)+1
    score_m = np.full((row_length, col_length), 0)
    trace_m = np.full((row_length, col_length), 0)

    # Score matrix by moving through each index
    for i in range(len(seq1)):
        seq1_char = seq1[i]  # Character in 1st sequence
        seq1_vec = vecs1[i]  # Corresponding amino acid vector
        for j in range(len(seq2)):
            seq2_char = seq2[j]
            
            # Preceding scoring matrix values
            diagonal = score_m[i][j]
            horizontal = score_m[i+1][j]
            vertical = score_m[i][j+1]

            # Score residues based off cosine similarity between vectors
            print(seq1_vec, seq2_vec)
            seq2_vec = vecs2[j]  # Corresponding amino acid vector
            cos_sim = np.dot(seq1_vec,seq2_vec)/(np.linalg.norm(seq1_vec)*np.linalg.norm(seq2_vec))

            '''TRYING OUT DIFFERENT COSINE SIMILARITY VALUE MODIFIERS'''
            cos_sim = (cos_sim*10)

            # Add to matrix values via scoring method
            diagonal += cos_sim
            if gap is False:  # Apply gap_open penalty if there is no gap
                horizontal += gap_open
                vertical += gap_open
            if gap is True:  # Apply gap_extension penalty if there is a gap
                horizontal += gap_ext
                vertical += gap_ext

            # Update gap status
            score = max(diagonal, horizontal, vertical)
            if score == horizontal: gap = True
            if score == vertical: gap = True
            if score == diagonal: gap = False

            # Assign value to traceback matrix
            if score == diagonal:
                trace_m[i+1][j+1] = 0
            if score == horizontal:
                trace_m[i+1][j+1] = -1
            if score == vertical:
                trace_m[i+1][j+1] = -1

            # Assign max value to scoring matrix
            score_m[i+1][j+1] = max(score, 0)

    return score_m, trace_m

In [28]:
# Call vec_seqs() to get list of vectorized amino acids
seq1_vecs, seq2_vecs = vec_seqs(seq1, seq2)

In [41]:
seq1_vecs

array([[-0.09522017, -0.14406678, -0.08793196, ...,  0.11808879,
        -0.24785922,  0.00921392],
       [ 0.21217601,  0.01319041, -0.05396945, ...,  0.43071347,
        -0.18787555,  0.02300165],
       [ 0.30360776, -0.11321972, -0.18153799, ...,  0.02647195,
         0.12702596, -0.02317703],
       [ 0.1496497 , -0.08891518, -0.12834446, ...,  0.02811868,
         0.05552136,  0.17308137],
       [ 0.00992799, -0.01431641,  0.22271894, ...,  0.1056217 ,
        -0.01820021, -0.22538273]], dtype=float32)

In [44]:
# Call SW_align()
score_m, trace_m = SW_align(seq1, seq2, seq1_vecs, seq2_vecs)

In [46]:
def traceback(score_m, trace_m, seq1, seq2):
    """=============================================================================================
    This function accepts a scoring and a traceback matrix and two sequences and returns the highest 
    scoring local alignment between the two sequences

    :param score_m: scoring matrix
    :param trace_m: traceback matrix
    :param seq1: first sequence
    :param seq2: second sequence
    return: highest scoring local alignment of the two sequences
    ============================================================================================="""

    # Find index of highest score in scoring matrix, start traceback at this matrix
    high_score_ind = np.unravel_index(np.argmax(score_m, axis=None), score_m.shape)

    # Reverse strings and convert to lists so gaps can be inserted
    rev_seq1 = list(seq1[::-1])
    rev_seq2 = list(seq2[::-1])

    # Move through matrix starting at bottom right
    index = [high_score_ind[0], high_score_ind[1]]
    count = 0
    while (index[0] and index[1]) != 0:
        val = trace_m[index[0], index[1]]

        if val == 1:  # If cell is equal to 1, insert a gap into the second sequence
            index[0] = index[0] - 1
            rev_seq2.insert(count, '_')
        if val == -1:  # If cell is equal to -1, insert a gap into the first sequence
            index[1] = index[1] - 1
            rev_seq1.insert(count, '_')
        if val == 0:  # If cell is equal to 0, there is no gap
            index[0] = index[0] - 1
            index[1] = index[1] - 1
        count += 1

    # Join lists and reverse strings again
    seq1 = ''.join(rev_seq1)
    seq2 = ''.join(rev_seq2)

    # Need to store alignment data somehow, but until then we are printing spaces
    space = ' '
    print(index[1]*space+seq1[::-1])
    print(index[0]*space+seq2[::-1]) 

In [47]:
traceback(score_m, trace_m, seq1, seq2)

AGFISVISKKQGEYLEDEWY
            QVLDKFGS


# TESTING ON GLOBAL ALIGNMENT

In [4]:
from Bio import SeqIO

def parse_fasta(filename):
    """=============================================================================================
    This function accepts a fasta file name and returns the sequence.

    :param filename: name of file
    return: sequence
    ============================================================================================="""

    # Parse fasta file
    seq = ''
    with open(filename, 'r', encoding='utf8') as file:
        for seq in SeqIO.parse(file, 'fasta'):
            seq = str(seq.seq)
    return seq

In [39]:
def embed_seq(seq, tokenizer, encoder):
    """=============================================================================================
    This function accepts a protein sequence and returns a list of vectors, each vector representing
    a single amino acid.

    :param seq: protein sequence
    :param: tokenizer: tokenizer model
    :param encoder: encoder model
    return: np array of vectors
    ============================================================================================="""

    # Add space after each amino acid so each residue is vectorized
    print(seq)
    seq = [' '.join([*seq])]
    print(seq)

    # Tokenize, encode, and load sequence
    ids = tokenizer.batch_encode_plus(seq, add_special_tokens=True, padding=True)
    input_ids = torch.tensor(ids['input_ids'])
    attention_mask = torch.tensor(ids['attention_mask'])

    # Extract sequence features
    with torch.no_grad():
        embedding = encoder(input_ids=input_ids,attention_mask=attention_mask)
    embedding = embedding.last_hidden_state.cpu().numpy()

    # Remove padding and special tokens
    features = [] 
    for seq_num in range(len(embedding)):
        seq_len = (attention_mask[seq_num] == 1).sum()
        seq_emd = embedding[seq_num][:seq_len-1]
        features.append(seq_emd)
    return features[0]  # Returns as a np array

In [6]:
import numpy as np

def global_align(seq1, seq2, vecs1, vecs2, gopen, gext):
    """=============================================================================================
    This function accepts two sequences, creates a matrix corresponding to their lengths, and
    calculates the score of the alignments for each index. A second matrix is scored so that the
    best alignment can be tracebacked.

    :param seq1: first sequence
    :param seq2: second sequence
    :param vecs1: list of vectors for first sequence
    :param vecs2: list of vectors for second sequence
    :param gopen: gap penalty for opening a new gap
    :param gext: gap penalty for extending a gap
    return: traceback matrix
    ============================================================================================="""

    # Initialize scoring and traceback matrix based on sequence lengths
    row_length = len(seq1)+1
    col_length = len(seq2)+1
    score_m = np.full((row_length, col_length), 0)
    trace_m = np.full((row_length, col_length), 0)

    # Initialize first row and column with gap values for scoring matrix
    for i in range(1, len(score_m[0])):
        score_m[0][i] = gopen+gext*i+1  # +1 to offset i starting at 1
    for i in range(1, len(score_m.T[0])):
        score_m.T[0][i] = gopen+gext*i+1

    # Score matrix by moving through each index
    gap = False
    for i, char in enumerate(seq1):
        seq1_vec = vecs1[i]  # Corresponding amino acid vector
        for j, char in enumerate(seq2):

            # Preceding scoring matrix values
            diagonal = score_m[i][j]
            horizontal = score_m[i+1][j]
            vertical = score_m[i][j+1]

            # Score residues based off cosine similarity between vectors
            seq2_vec = vecs2[j]  # Corresponding amino acid vector
            print(seq1_vec, seq2_vec)
            print(seq1_vec.shape, seq2_vec.shape)
            cos_sim = np.dot(seq1_vec,seq2_vec)/(np.linalg.norm(seq1_vec)*np.linalg.norm(seq2_vec))

            '''NOT SETTLED ON WEIGHT OF COSINE SIMILARITY YET'''
            cos_sim = (cos_sim*10)

            # Add to matrix values via scoring method
            diagonal += cos_sim
            if gap is False:  # Apply gap_open penalty if there is no gap
                horizontal += gopen
                vertical += gopen
            if gap is True:  # Apply gap_extension penalty if there is a gap
                horizontal += gext
                vertical += gext

            # Update gap status
            score = max(diagonal, horizontal, vertical)
            if score == horizontal:
                gap = True
            if score == vertical:
                gap = True
            if score == diagonal:
                gap = False

            # Assign value to traceback matrix
            if score == diagonal:
                trace_m[i+1][j+1] = 0
            if score == horizontal:
                trace_m[i+1][j+1] = -1
            if score == vertical:
                trace_m[i+1][j+1] = 1

            # Assign value to scoring matrix
            score_m[i+1][j+1] = score

    return trace_m

In [7]:
def write_align(seq1, seq2):
    """=============================================================================================
    This function accepts two sequences after gaps have been introduced and writes them to a file
    in no particular format (yet).

    :param seq1: first aligned sequence
    :param seq2: second aligned sequence
    ============================================================================================="""

    # Add space every 10 characters
    seq1 = [seq1[i:i+10] for i in range(0, len(seq1), 10)]
    seq1 = ' '.join(seq1)
    seq2 = [seq2[i:i+10] for i in range(0, len(seq2), 10)]
    seq2 = ' '.join(seq2)

    # Split sequences every 50 characters
    seq1_split = [seq1[i:i+55] for i in range(0, len(seq1), 55)]
    seq2_split = [seq2[i:i+55] for i in range(0, len(seq2), 55)]

    # Find max length sequence and write to file based on its length
    name1 = 'seque1'
    name2 = 'seque2'
    with open('PEbA_alignment.txt', 'w', encoding='utf8') as file:
        file.write('PileUp\n\n\n')
        file.write(f'   MSF:  {len(seq1)}  Type:  P\n\n')
        file.write(f'Name: {name1} oo  Len:  {len(seq1)}\n')
        file.write(f'Name: {name2} oo  Len:  {len(seq2)}\n\n//\n\n\n\n')
        for i in range(len(seq1_split)):
            file.write(f'{name1}      {seq1_split[i]}\n')
            file.write(f'{name2}      {seq2_split[i]}\n\n')

In [8]:
def traceback(trace_m, seq1, seq2):
    """=============================================================================================
    This function accepts a scoring and a traceback matrix and two sequences and returns global
    alignment between the two sequences

    :param trace_m: traceback matrix
    :param seq1: first sequence
    :param seq2: second sequence
    return: highest scoring local alignment of the two sequences
    ============================================================================================="""

    # Reverse strings and convert to lists so gaps can be inserted
    rev_seq1 = list(seq1[::-1])
    rev_seq2 = list(seq2[::-1])

    # Move through matrix starting at bottom right
    rows, cols = trace_m.shape
    index = [rows-1, cols-1]
    count = 0
    while index != [0, 0]:
        val = trace_m[index[0], index[1]]
        if val == 1:  # If cell is equal to 1, insert a gap into the second sequence
            index[0] = max(index[0] - 1, 0)  # Taking max of new index and 0 so index never below 0
            rev_seq2.insert(count, '.')
        if val == -1:  # If cell is equal to -1, insert a gap into the first sequence
            index[1] = max(index[1] - 1, 0)
            rev_seq1.insert(count, '.')
        if val == 0:  # If cell is equal to 0, there is no gap
            index[0] = max(index[0] - 1, 0)
            index[1] = max(index[1] - 1, 0)
        count += 1

    # Join lists and reverse strings again
    seq1 = ''.join(rev_seq1)
    seq2 = ''.join(rev_seq2)
    seq1 = seq1[::-1]
    seq2 = seq2[::-1]

    # Introduce gaps at end of either sequence based off length of other sequence
    seq1 = seq1+"."*max(0, len(seq2)-len(seq1))
    seq2 = seq2+"."*max(0, len(seq1)-len(seq2))
    write_align(seq1, seq2)

In [12]:
# Parse fasta files
seq1 = parse_fasta('test1.fa')
seq2 = parse_fasta('test2.fa')

KKSVK
KVKKSVKAIYKVKKKSVKAIY


In [40]:
# Vectorize sequences
vecs1 = embed_seq(seq1, tokenizer, model)
vecs2 = embed_seq(seq2, tokenizer, model)

KKSVK
['K K S V K']
[array([[-0.09522031, -0.14406678, -0.08793188, ...,  0.11808877,
        -0.2478591 ,  0.00921371],
       [ 0.21217607,  0.01319035, -0.05396955, ...,  0.4307134 ,
        -0.18787546,  0.0230017 ],
       [ 0.30360746, -0.11321975, -0.18153802, ...,  0.02647187,
         0.12702607, -0.02317699],
       [ 0.14964975, -0.08891524, -0.12834471, ...,  0.02811862,
         0.0555214 ,  0.17308147],
       [ 0.00992806, -0.01431647,  0.22271892, ...,  0.10562176,
        -0.01820003, -0.22538298]], dtype=float32)]
KVKKSVKAIYKVKKKSVKAIY
['K V K K S V K A I Y K V K K K S V K A I Y']
[array([[-8.48566666e-02,  2.06543244e-02, -1.97138995e-01, ...,
         5.74776828e-01,  9.62849483e-02,  2.74258345e-01],
       [ 9.24015641e-02,  2.07972765e-01, -2.83893049e-01, ...,
         1.81381553e-01,  1.26026258e-01, -9.12059322e-02],
       [-3.66858184e-01,  4.48539332e-02, -6.46745265e-02, ...,
         1.05602242e-01,  2.25269645e-01, -2.63850943e-05],
       ...,
       [ 

In [43]:
print(vecs1[0].shape)

(5, 1024)


In [25]:
# Call global_align() to get traceback matrix
trace_m = global_align(seq1, seq2, vecs1, vecs2, -3, -1)

[[ 0.08304115 -0.21574347 -0.02853072 ... -0.0790941  -0.49132112
  -0.23327847]] [[ 0.08304115 -0.21574347 -0.02853072 ... -0.0790941  -0.49132112
  -0.23327847]]
(1, 1024) (1, 1024)


ValueError: shapes (1,1024) and (1,1024) not aligned: 1024 (dim 1) != 1 (dim 0)

In [None]:
# Call traceback() to get global alignment between seq1 and seq2
traceback(trace_m, seq1, seq2)