In [3]:
import os
import math
import numpy as np
import random
import logging

# Bring in PyTorch
import torch
import torch.nn as nn
import torch.optim as optim
# Most of the examples have typing on the signatures for readability
from typing import Optional, Callable, List, Tuple
from Bio import SeqIO
# For data loading
from torch.utils.data import Dataset, IterableDataset, TensorDataset, DataLoader
import json
import glob
import gzip
import bz2

# For progress and timing
from tqdm import tqdm
import time
import shutil
from Bio.PDB import PDBList
from Bio.PDB.MMCIFParser import MMCIFParser

In [4]:
rom Bio.PDB import PDBList

#def pdb_retriever(file_name):
file_name = 'AF-A0A1D8PD42-F1-model_v4'
#file_name = '3goe'

pdbl = PDBList()
#pdbl.retrieve_pdb_file("3goe", file_format='mmCif', pdir=".")
pdbl.retrieve_pdb_file(file_name, file_format='mmCif', pdir=".")
# import the needed class
from Bio.PDB.MMCIFParser import MMCIFParser

# instantiate the class to prepare the parser
cif_parser = MMCIFParser()
# load the structure to an object
structure = cif_parser.get_structure(file_name, f"{file_name}.cif")
#structure = cif_parser.get_structure("3goe", "3goe.cif")

model0 = structure[0]

#model0 = structure[1] - error due to there only being one model
chain_A = model0['A']  # and we get chain A
# dictionary converting 3-letter codes to 1-letter codes
# this is a very common need in bioinformatics of proteins
d3to1 = {'CYS': 'C', 'ASP': 'D', 'SER': 'S', 'GLN': 'Q', 'LYS': 'K',
 'ILE': 'I', 'PRO': 'P', 'THR': 'T', 'PHE': 'F', 'ASN': 'N',
 'GLY': 'G', 'HIS': 'H', 'LEU': 'L', 'ARG': 'R', 'TRP': 'W',
 'ALA': 'A', 'VAL':'V', 'GLU': 'E', 'TYR': 'Y', 'MET': 'M'}

seq = []
for residue in chain_A:
    # for simplicity we can use X for heteroatoms (ions and water)
    seq.append(d3to1.get(residue.get_resname(), 'X'))  #converts water and ions to X
print(''.join(seq))

SyntaxError: invalid syntax (171553503.py, line 1)

In [4]:
#define standard amino acids
AAs= "ACDEFGHIKLMNPQRSTVWY"

#define additional tokens for special purposes
Additional_Tokens= ['<OTHER>', '<START>', '<END>', '<PAD>']

#number of additional tokens added to each sequence (start and end)
added_tokens_per_seq = 2

#number of amino acids
n_AAs= len(AAs)

#create a dictionary mapping each amino acid to a unique index
aa_to_token_index= {aa: i for i, aa in enumerate(AAs)}

#create a dictionary mapping each additional token to a unique index starting after the amino acids
additional_token_to_index= {token: i + n_AAs for i, token in enumerate(Additional_Tokens)}

#combine the two dictionaries into one for easy tokens indexing
token_to_index= {**aa_to_token_index, **additional_token_to_index}

#create a reverse dictionary mapping indices to tokens
index_to_token= {index: token for token, index in token_to_index.items()}

#total number of tokens (amino acids + additional tokens)
n_token= len(token_to_index)

def tokenize_seq(seq):
    """
    Convert a sequence into token indices.

    :param seq: The input sequence (string, bytes, or SeqRecord)
    :return: A tensor of token indices
    """
    #Index for the <OTHER> token used for unknown amino acids
    other_token_index= additional_token_to_index["<OTHER>"]

    #Tokenize the sequence with start and end tokens
    tokenized_seq= [additional_token_to_index["<START>"]] + [aa_to_token_index.get(aa, other_token_index) for aa in parse_seq(seq)] + [additional_token_to_index["<END>"]]
    
    #convert tokenized sequence into a tensor
    return torch.tenosor(tokenized_seq)

def parse_seq(seq):
    """
    Parse the input sequence into a string.

    :param seq: The input sequence which can be a string, bytes, or SeqRecord
    :return: A string representation of the sequence
    """
    if isinstance(seq, str):
        # If the sequence is already a string, return it as is
        return seq
    elif isinstance(seq, bytes):
        # If the sequence is in bytes, decode it to a string using utf-8 encoding
        return seq.decode("utf8")
    elif isinstance(seq, SeqIO.SeqRecord):
        # If the sequence is a SeqRecord, extract the sequence part as a string
        return str(seq.seq)
    else:
        # Raise a TypeError if the input sequence type is not recognized
        raise TypeError("unexpected sequence type: %s' % type(seq)")

In [5]:
class Embedding(nn.Module):
    """
    Embedding layer that combines tokens embeddings with positional encodings
    """
    def __init__(self, n_token, embedding_dim):
        """
        :param n_token: total number of unique token
        :param embedding_dim: dimension of the embedding space
        """
        super(Embedding, self).__init__()
        # Token embedding layer
        self.token_embedding= nn.Embedding(n_token, embedding_dim)
        # positional encoding layer
        self.positional_encoding= PositionalEncoding(embedding_dim)
    
    def forward(self, input_seq):
        """
        Forward pass for the embedding layer.

        :param input_seq: Input sequence of token indices
        :return: Combined embeddings with positional encodings
        """
        token_embeddings= self.token_embedding(input_seq)
        positional_encodings= self.positional_encoding(input_seq)
        embeddings= token_embeddings + positional_encodings
        return embeddings

class PositionalEncoding(nn.Module):
    """
    Positional encoding layer to add positional information to token embeddings
    """
    def __init__(self, embedding_dim, max_len= 100):
        """
        Initializes the positional encoding layer.

        :param embedding_dim: Dimension of the embedding space
        :param max_len: Maximum length of the sequences to be encoded
        """
        super(PositionalEncoding, self).__init__()
        #create a matrix of position (0 to max_len-1) with shape [max_len, 1]
        position= torch.arange(0, max_len).unsqueeze(1)
        #compute the division term for the position encodings
        div_term = torch.exp(torch.arange(0, embedding_dim, 2) * -(np.log(10000.0) / embedding_dim))
        #initialize a matrix for positional encodings with shape [max_len, 1, embedding_dim]
        pe = torch.zeros(max_len, 1, embedding_dim)
        #apply sine to even indices in the array
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        #apply cosine to odd indices in the array
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, input_seq):
        """
        forward pass for the positional encoding layer
        :param input_seq: input tensor of shape [batch_size, seq_len]
        :return: positional encodings corresponding to the input sequence length
        """
        #get the positional encodings up to the length of the input sequence
        return self.pe[:input_seq.size(1), :].transpose(0, 1)
    

In [None]:
class MultiheadAttention(nn.Module):
    def __init__(
        self,
        embedding_dim,
        num_heads,
    ):
        """
        :param embedding_dim: dimension of input embeddings
        :param num_heads: number of attention heads
        """
        super(MultiheadAttention, self).__init__()
        assert embedding_dim % num_heads == 0 #embedding dimension must be divisible by nember of heads
        self.embedding_dim = embedding_dim
        self.num_heads= num_heads
        self.d_k = embedding_dim // num_heads #dimension of each attention head

        #linear layers to project input embeddings to queries, keys, and values
        self.w_q = nn.Linear(embedding_dim, embedding_dim)
        self.w_k = nn.Linear(embedding_dim, embedding_dim)
        self.w_v = nn.Linear(embedding_dim, embedding_dim)
        self.w_o = nn.Linear(embedding_dim, embedding_dim) #linear layer to combine the heads' outputs

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """
        :param Q: query tensor
        :param K: key tensor
        :param V: value tensor
        :param mask: optional mask tensor 
        :return: attention output tensor
        """
        #calculate the attention score
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        #apply softmax to egt the attention probabilities
        attn_probs = torch.softmax(attn_scores, dim=-1)
        #compute the attention output as a weighted sum of the values
        output = torch.matmul(attn_probs, V)
        return output
    
    def split_heads(self, x):
        """
        split the input tensor into multiple heads.

        :param x: input tensor of shape (batch_size, input_seq, embedding_dim)
        :return: tensor of shape (batch_size, num_heads, input_seq, d_k)
        """
        batch_size, input_seq, embedding_dim = x.size()
        #reshape and transpose to seperate heads 
        return x.view(batch_size, input_seq, self.num_heads, self.d_k).transpose(1, 2)
    
    def combine_heads(self, x):
        """
        combine multiple heads back into a single tensor 
        
        :param x: input tensor of shape (batch_size, num_heads, input_seq, d_k)
        :return: tensor of shape (batch_size, input_seq, embedding_dim)
        """
        batch_size, _, input_seq, d_k = x.size()
        #transpose and reshape to combine heads
        return x.transpose(1, 2).contiguous().view(batch_size, input_seq, self.embedding_dim)
    
    def forward(self, Q, K, V, mask= None):
        """
        Forward pass for the MultiheadAttention module.

        :param Q: Query tensor
        :param K: Key tensor
        :param V: Value tensor
        :param mask: Optional mask tensor
        :return: Output tensor of the multihead attention mechanism
        """
        Q= self.split_heads(self.w_q(Q))
        K= self.split_heads(self.w_k(K))
        V= self.split_heads(self.w_v(V))

        attn_output = self.scaled_dot_product_attention(Q, K, V)
        output= self.w_o(self.combine_heads(attn_output))
        return output

In [6]:
class SingleHeadAttention(nn.Module):
    def __init__(
        self, 
        inp_ch: int,
        inp_seq: int, 
        key_dim: list, 
        que_dim: list, 
        out_ch: int, 
        out_seq: int, 
        d_k: int, 
        device, 
        verbose=False) -> None:
        """
        Initializes the SingleHeadAttention module.
        
        :param inp_ch: Number of input channels (N)
        :param inp_seq: Input sequence length (T)
        :param key_dim: List containing dimensions for the key projection matrix (k, s)
        :param que_dim: List containing dimensions for the query projection matrix (k, u)
        :param out_ch: Number of output channels (m)
        :param out_seq: Output sequence length (o)
        :param d_k: Dimension for scaling in dot-product attention
        :param device: Device to perform computations on (CPU/GPU)
        :param verbose: Flag for printing intermediate shapes for debugging
        """
        super(SingleHeadAttention, self).__init__()  

        self.N = inp_ch #number of input channels
        self.T = inp_seq #input sequence length
        self.k, self.s = key_dim #dimensions for the key projection matrix
        self.k, self.u = que_dim #dimensions for the query projectoin matrix
        self.m = out_ch #number of output channels
        self.o = out_seq #output sequence length
        self.d_k = d_k #dimension for scaling in dot-product attention
        self.device = device #device to perform computations on
        
        #initialize learnable weight parameters for q, k, and v matrices
        self.w_q = nn.Parameter(torch.randn([self.s, self.T]))
        self.w_k = nn.Parameter(torch.randn([self.u, self.o]))
        self.w_v = nn.Parameter(torch.randn([self.m, self.N]))
        self.verbos = verbose #flag for printing intermediate shapes for debugging
     

    def forward(self, Q, K, V):
        """
        Forward pass for the SingleHeadAttention module.
        
        :param Q: Query tensor
        :param K: Key tensor
        :param V: Value tensor
        :return: Tuple of (context, attention weights)
        """
        #project the input tensors q, k, and v using the corresponding weight matrices
        Q = torch.matmul(Q, self.w_q.to(self.device))
        K = torch.matmul(K, self.w_k.to(self.device))
        V = torch.matmul(V, self.w_v.to(self.device))
        
        #compute the attention scores by by scaling the dot product of q and k
        scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(self.d_k)
        #apply the softmax function to obtain the attention weights 
        attn = torch.softmax(scores, dim=-1)  
        # Compute the context vector as the weighted sum of the values V
        context = torch.matmul(attn, V)
        return context, attn


In [7]:
class TransformerEncoderLayer(nn.Module):
    def __init__(
        self,
        n_token, 
        embedding_dim,
        output_dim = 2, 
        num_heads, 
        feed_forward_dim, 
        dropout_rate= 0.1,
        max_len= 100):
        super(TransformerEncoderLayer, self).__init__()
        #Embedding layer: maps each token to a dense vector of fixed size (embedding dim)
        self.embedding= nn.Embedding(n_token, embedding_dim) 
        #positional encoding layer: adds positional information to the token embeddings 
        self.positional_encoding = PositionalEncoding(embedding_dim, max_len)
        #single head self attention layer: computee attention scores and context vectors 
        self.self_attention = SingleHeadAttention(
            inp_ch=embedding_dim, 
            inp_seq=max_len,
            key_dim=[embedding_dim // num_heads, embedding_dim],
            que_dim=[embedding_dim // num_heads, embedding_dim],
            out_ch=embedding_dim,
            out_seq=max_len,
            d_k=embedding_dim // num_heads,
            device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
            verbose=False
        )
        #layer normalization layers: normalize the outputs of each sub-layer
        self.layer_norm1= nn.LayerNorm(embedding_dim)
        self.layer_norm2= nn.LayerNorm(embedding_dim)
        
        #feed-forward network: three linear layers with ReLU activation in between
        self.feed_forward = nn.Sequential(
            nn.Linear(embedding_dim, feed_forward_dim),
            nn.ReLU(),
            nn.Linear(feed_forward_dim, feed_forward_dim),
            nn.ReLU(),
            nn.Linear(feed_forward_dim, output_dim)
        )
        #dropout layer: randomly zeroes some of the elements of the input tensor with probability dropout_rate
        self.dropout = nn.Dropout(dropout_rate)
    
    def forward(self, input_seq):
        #compute token embeddings
        token_embeddings = self.embedding(input_seq)
        #compute positional encodings
        positional_encoding= self.positional_encoding(input_seq)
        embeddings = token_embeddings + positional_encoding

        #self_attention
        attn_output, _ = self.self_attention(embeddings, embeddings, embeddings)
        #apply dropout to attention output 
        attn_output = self.dropout(attn_output)
        #add residual connection and apply layer normalizaiton
        out1 = self.layer_norm1(embeddings + attn_output)

        #apply feed-forward network 
        ffn_output = self.feed_forward(out1)
        #apply dropout to feed-foward output
        ffn_output = self.dropout(ffn_output)
        #add residual connection and apply layer normalization
        out2 = self.layer_norm2(out1 + ffn_output)

        return out2

In [8]:
class TransformerEncoder(nn.Module):
    def __init__(
        self,
        n_token,
        embedding_dim,
        output_dim,
        num_heads,
        feed_forward_dim,
        num_layers=1,
        dropout_rate=0.1,
        max_len= 100
    ):
        super(TransformerEncoder, self).__init__()
        self.embedding = nn.Embedding(n_token, embedding_dim)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model = embedding_dim,
            nhead = num_heads,
            dim_feedforward=feed_forward_dim,
            dropout=dropout_rate
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc_out = nn.Linear(embedding_dim, output_dim)
        self.dropout = nn.Dropout(dropout_rate)
        #stack multiple transformer encoder layers
        #each layer is an instance of the TransformerEncoderLayer class
        #self.layers = nn.ModuleList([TransformerEncoderLayer(n_token, embedding_dim, output_dim, num_heads, feed_forward_dim, dropout_rate, max_len) for _ in range(num_layers)])

    def forward(self, input_seq):
        embedded = self.embedding(input_seq)
        embedded = self.dropout(embedded)
        encoded = self.transformer_encoder(embedded)
        output = self.fc_out(encoded)
        return output

In [None]:
embedding_dim = 1024  # Dimensionality of the input embedding for each amino acid
feed_forward_dim = 512  # Hidden size of the transformer 
num_layers = 6  # Number of encoder layers 
num_heads = 8  # Number of attention heads 
output_dim = 2  # Dimensionality of the output (phi and psi angles)
dropout_rate= 0.01
max_len = 128

model = TransformerEncoder(n_token, embedding_dim, output_dim, num_heads, feed_forward_dim, num_layers, dropout_rate, max_len)
model = model.to(device)

#define the loss function and optimizer
criterion = nn.CrossEntropyLoss(ignore_index=additional_token_to_index["<PAD>"]) #use <PAD> token index as ignore index
optimizer = optim.Adam(model.parameter(), lr=0.001)

def train(model, test_loader, criterion, optimizer, device):
    model.train()
    total_loss = 0

    for input_seq, target_seq in test_loader:
        input_seq, target_seq = input_seq.to(device), target_seq.to(device)

        #clear gradients w.r.t. parameters
        optimizer.zero_grad()

        output_seq = model(input_seq)

        #reshape output and target to calculate loss
        output_seq = output_seq.view(-1, n_token)
        target_seq = target_seq.view(-1)

        #calculate loss: softmax --> cross entropy loss
        loss = criterion(output_seq, target_seq)

        #backward pass and optimize
        loss.backward()
        optimizer.step()

        total_loss +=loss.item()
    
    return total_loss / len(test_loader)

for epoch in range(num_epochs):
    epoch_loss = train(model, test_loader, criterion, optimizer, device)
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}') 

In [None]:
n_token = N
embedding_dim = D
output_dim = 2
num_heads = 1
feed_forward_dim = 512
num_layers = 2
dropout_rate = 0.001
max_len = 128
model = TransformerEncoder(n_token, embedding_dim, output_dim, num_heads, feed_forward_dim, num_layers, dropout_rate, max_len)
# model = TransformerEncoder(N, D, 2, 1, 200)
model = model.to(device)
num_epochs = 100

#define the loss function and optimizer
criterion = nn.MSELoss()#nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001) #model.parameter?

def train(model, data, criterion, optimizer, device):
    model.train()
    total_loss = 0

    for input_seq, target_seq in data:
        input_seq, target_seq = input_seq.to(device), target_seq.to(device)

        #clear gradients w.r.t. parameters
        optimizer.zero_grad()

        output_seq = model(input_seq)

        #reshape output and target to calculate loss
        output_seq = output_seq.view(-1, N)
        target_seq = target_seq.view(-1)

        #calculate loss: softmax --> cross entropy loss
        loss = criterion(output_seq, target_seq)

        #backward pass and optimize
        loss.backward()
        optimizer.step()

        total_loss +=loss.item()

    return total_loss / len(data)

for epoch in range(num_epochs):
    epoch_loss = train(model, data, criterion, optimizer, device)
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}')