In [None]:
import pandas as pd
import numpy as np
import os
import re
#import lightgbm as lgb
import matplotlib.pyplot as plt
from Bio import SeqIO
import h5py

# from scipy.stats import chi2_contingency
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import make_scorer, roc_auc_score
from sklearn.feature_selection import GenericUnivariateSelect, f_regression, SelectKBest
from sklearn.preprocessing import OneHotEncoder, LabelEncoder,StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.decomposition import PCA
from transformers import logging

# from transformers import T5Tokenizer, T5EncoderModel
import torch

# Load model directly
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, T5Tokenizer

# from transformers import T5Tokenizer

# from transformers import pipeline

# pipe = pipeline("translation", model="Rostlab/ProstT5")

# tokenizer = T5Tokenizer.from_pretrained("Rostlab/prot_t5_xl_uniref50", do_lower_case=False)

# tokenizer = T5Tokenizer.from_pretrained("Rostlab/prot_t5_xl_uniref50")#, use_fast=False)
# model = AutoModelForSeq2SeqLM.from_pretrained("Rostlab/prot_t5_xl_uniref50")

In [None]:
# things to study

# huggingface transformers
https://huggingface.co/learn/llm-course/chapter1/1?fw=pt

In [None]:
# Notes

# first use ProtT5 encoder only, half precision model (for speed, no k-mer/window size )
# feed seqs in 1 at a time (batch size 1)

# 1. identify test, train data
    # get embeddings for whole protein
# 2. build models w/ embeddings as input
    # 
# 3. evaluate models

# https://github.com/CalvinRusley/peapod/blob/main/README.md
# https://github.com/WRiegs/Squidly/blob/main/squidly/squidly.py#L265



In [None]:
# Definitions

## logits = raw/unnormalized predictions generated by model
## softmax = converts logits to probabilities that sum to one
## for classification tasks, softmax function generates a vector of (normalized) probabilities with one value for each possible class


In [None]:
from transformers import T5Tokenizer, T5EncoderModel
import torch
import re

device = torch.accelerator.current_accelerator().type if torch.accelerator.is_available() else "cpu"
print(f"Using {device} device")

# Load the tokenizer
tokenizer = T5Tokenizer.from_pretrained('Rostlab/prot_t5_xl_half_uniref50-enc', do_lower_case=False)

# Load the model
model = T5EncoderModel.from_pretrained("Rostlab/prot_t5_xl_half_uniref50-enc").to(device)

# gc.collect()

# # only GPUs support half-precision currently; if you want to run on CPU use full-precision (not recommended, much slower)
# if device == torch.device("cpu"):
#     model.to(torch.float32)

# only GPUs support half-precision currently; if you want to run on CPU use full-precision (not recommended, much slower)
model.full() if device=='cpu' else model.half()

# prepare your protein sequences as a dict {accession: sequence})
nif = list(SeqIO.parse("example.fasta", "fasta"))
nif_header = [seq_record.id for seq_record in nif]
nif_label = [seq_record.description.split("|")[1] for seq_record in nif]
nif_sequences = [str(seq_record.seq) for seq_record in nif]

# replace all rare/ambiguous amino acids by X and introduce white-space between all amino acids
nif_sequences = [" ".join(list(re.sub(r"[UZOB]", "X", sequence))) for sequence in nif_sequences]

nif_dict = dict(zip(nif_header, nif_sequences))

# tokenize sequences and pad up to the longest sequence in the batch
#ids = tokenizer(sequence_examples, add_special_tokens=True, padding="longest")
ids = tokenizer.batch_encode_plus(nif_sequences, add_special_tokens=True, padding="longest",return_tensors='pt')
input_ids = torch.tensor(ids['input_ids']).to(device)
attention_mask = torch.tensor(ids['attention_mask']).to(device)

# generate embeddings
with torch.no_grad():
    embedding = model(input_ids=input_ids, attention_mask=attention_mask)

embedding = embedding.last_hidden_state.cpu().numpy()
emb_0_per_protein = emb_0.mean(dim=0)


# save embeddings as a dict (without special tokens and padding)
nif_embeddings = {}
for i, seq_id in enumerate(nif_dict.keys()):
    seq_len = (attention_mask[i] == 1).sum()
    nif_embeddings[seq_id] = embedding[i][:seq_len]


In [None]:
# load dependencies
from pathlib import Path
from Bio import SeqIO
import pandas as pd
from transformers import T5Tokenizer, T5EncoderModel
import torch
import re
import os

In [None]:
# set up model

device = torch.accelerator.current_accelerator().type if torch.accelerator.is_available() else "cpu"
print(f"Using {device} device")

# Load the tokenizer
tokenizer = T5Tokenizer.from_pretrained('Rostlab/prot_t5_xl_half_uniref50-enc', do_lower_case=False)

# Load the model
model = T5EncoderModel.from_pretrained("Rostlab/prot_t5_xl_half_uniref50-enc").to(device)

# gc.collect()

# # only GPUs support half-precision currently; if you want to run on CPU use full-precision (not recommended, much slower)
# if device == torch.device("cpu"):
#     model.to(torch.float32)

# only GPUs support half-precision currently; if you want to run on CPU use full-precision (not recommended, much slower)
model.full() if device=='cpu' else model.half()

In [None]:
# load fasta files

def process_sequence(seq):
    return " ".join(list(re.sub(r"[UZOB]", "X", seq)))

def load_fasta_by_class(fasta_dir):
    samples = []
    class_to_idx = {}

    for i, fasta_file in enumerate(sorted(Path(fasta_dir).glob("*.fasta"))):
        class_name = fasta_file.stem
        class_to_idx[class_name] = i

        for record in SeqIO.parse(fasta_file, "fasta"):
            samples.append({
                "id": record.id,
                "sequence": process_sequence(str(record.seq)),
                "label": i,
                "class": class_name
            })

    return samples, class_to_idx

# generagte embeddings
def embed_sequence(seq):

    ids = tokenizer.batch_encode_plus(nif_sequences, add_special_tokens=True, padding="longest",return_tensors='pt')
    input_ids = torch.tensor(ids['input_ids']).to(device)
    attention_mask = torch.tensor(ids['attention_mask']).to(device)

    # generate embeddings
    with torch.no_grad():
        embedding = model(input_ids=input_ids, attention_mask=attention_mask)

    embedding = embedding.last_hidden_state.cpu().numpy()

    # Remove special tokens
    embedding = embedding[1:-1]  # shape: (L, d)

    return embedding.cpu()


# save embeddings

def embed_and_save(samples, out_dir):
    os.makedirs(out_dir, exist_ok=True)
    metadata = []

    for s in samples:
        emb = embed_sequence(s["sequence"])

        out_file = Path(out_dir) / f"{s['id']}.pt"
        torch.save(emb, out_file)

        metadata.append({
            "id": s["id"],
            "label": s["label"],
            "class": s["class"],
            "length": emb.shape[0],
            "embedding_path": str(out_file)
        })

    return metadata


samples, class_map = load_fasta_by_class("data/fasta")

metadata = embed_and_save(
    samples,
    out_dir="data/embeddings/prostt5"
)

df = pd.DataFrame(metadata)
df.to_csv("data/metadata.tsv", sep="\t", index=False)

In [None]:
nif_Yhat = pred_model(emb.mean(dim=0,keepdims=True))
results["mem"][identifier] = torch.max(nif_Yhat, dim=1)[1].detach().cpu().numpy().squeeze()

In [None]:
# predict on new seqs

input_fasta_file = "input_sequences.fasta"

# create results dataframe
results_df = pd.DataFrame(columns = ['prot_desc', 'position','site_residue', 'probability', 'prediction'])

# load model
combined_model = load_model(model_path)

for seq_record in tqdm(SeqIO.parse(input_fasta_file, "fasta")):
    prot_id = seq_record.id
    sequence = str(seq_record.seq)
    
    positive_predicted = []
    negative_predicted = []
    
    # extract protT5 for full sequence and store in temporary dataframe 
    pt5_all = get_protT5_features(sequence)
    
    # generate embedding features and window for each amino acid in sequence
    for index, amino_acid in enumerate(sequence):
        
        # check if AA is 'K'
        if amino_acid in ['K']:
            
            # we consider site one more than index, as index starts from 0
            site = index + 1
            
            # extract window
            window = extract_one_windows_position(sequence, site)
            
            # extract embedding_encoding
            X_test_embedding = get_input_for_embedding(window)
            
            # get ProtT5 features extracted above
            X_test_pt5 = pt5_all[index]
            
            # prediction results           
            y_pred = combined_model.predict([X_test_embedding.reshape(1, win_size), np.array(X_test_pt5.reshape(1,1024))], verbose = 0)[0][0]
            
            # append results to results_df
            results_df.loc[len(results_df)] = [prot_id, site, amino_acid, y_pred, int(y_pred > cutoff_threshold)]


In [None]:
# evaluate model on test data

# load test data
test_positive_pt5 = pd.read_csv("data/test/features/test_positive_ProtT5-XL-UniRef50.csv", header = None).iloc[:,2:]
test_negative_pt5 = pd.read_csv("data/test/features/test_negative_ProtT5-XL-UniRef50.csv", header = None).iloc[:,2:]

# create labels
test_positive_labels = np.ones(test_positive_pt5.shape[0])
test_negative_labels = np.zeros(test_negative_pt5.shape[0])

# stack positive and negative data together
X_test_pt5 = np.vstack((test_positive_pt5,test_negative_pt5))
y_test = np.concatenate((test_positive_labels, test_negative_labels), axis = 0)

# shuffle X and y together
# X_train_pt5, y_train_pt5 = shuffle(X_train_pt5, y_train_pt5)
# X_test_pt5, y_test_pt5 = shuffle(X_test_pt5, y_test_pt5)

# convert sequences to integer encoding, for embedding
test_positive_embedding = get_input_for_embedding('data/test/fasta/test_positive_sites.fasta')
test_negative_embedding = get_input_for_embedding('data/test/fasta/test_negative_sites.fasta')

# stack positive and negative data together
X_test_embedding = np.vstack((test_positive_embedding,test_negative_embedding))

# load saved model
combined_model = load_model(model_path)

# predict test data
y_pred = combined_model.predict([X_test_embedding,X_test_pt5]).reshape(y_test.shape[0],)
y_pred = (y_pred > cutoff_threshold)
y_pred = [int(i) for i in y_pred]
y_test = np.array(y_test)
y_pred = np.array(y_pred)

# calculate performance metrics
cm = confusion_matrix(y_test, y_pred)
mcc = matthews_corrcoef(y_test, y_pred)
acc = accuracy_score(y_test, y_pred)

sn = cm[1][1]/(cm[1][1]+cm[1][0])
sp = cm[0][0]/(cm[0][0]+cm[0][1])

print("\n %s, %s, %s, %s, %s \n" %(str(acc), str(mcc), str(sn), str(sp), cm))

In [None]:
# Visualize correlations between numerical features and the target variable
correlation_matrix = df_train[numerical_cols].corr()
plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', fmt=".2f")
plt.title('Correlation Matrix')
plt.show()

Models

In [None]:
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
import xgboost as xgb

# RF
rf_clf = RandomForestClassifier(max_depth=20, max_features='sqrt', min_samples_split=5, n_estimators=10)
rf_clf.fit(X_train_pt5, y_train_pt5)

# SVM
svm_clf = SVC(C = 3, gamma = 0.01, kernel = 'rbf', probability=True)
svm_clf.fit(X_train_pt5, y_train_pt5)

#XGB
xgb_clf = xgb.XGBRegressor(seed = 321, max_depth =  10, learning_rate = 0.01, n_estimators = 100, colsample_bytree = 0.3)
xgb_clf.fit(X_train_pt5, y_train_pt5)

In [None]:
# Other

Preprocessing
1. Import Data 
2. Subset data (reduce runtime, improve mafft performance)
2. Align sequences (mafft)
3. Encode categorical values (Optional, AA, gene annotations)
4. Collapse AA into functional groups (Optional)

In [None]:
# align sequences (mafft)
os.system(f"cat in1.fasta in2.fasta > in-merged.fasta")
os.system(f"mafft --auto --quiet in.fasta > out.aln")

# convert to dataframe, add Y (gene annotation)
df = pd.read_csv('out.aln', sep="\t")  # example

Profile/Probabilistic Methods

Statistical Methods
1. ANOVA

In [None]:
# 1. ANOVA
import statsmodels.api as sm
import statsmodels.formula.api as smf
import statsmodels.stats.api as sms

def OLS_sm(df,
           dependant_var='price',
           numeric_features=[],
           categorical_features=[],
           verbose=False,
           show_summary=True,
           show_plots=True,
           target_is_dollar=True):
    """
    ### Uses formula based statsmodels regression for OLS. ###
    
    Displays a statsmodels.iolib.summary.Summary object containing summary of OLS analysis. 
    Returns a statsmodels.regression.linear_model.RegressionResultsWrapper which can be used to access other options available.

    Parameters:
    ===========
        df = pandas.DataFrame; no default. 
                Input dataset to use for OLS.
        dependant_var = str; default: 'price'. 
                Dependent variable.
        numeric_features = list; default = []. 
                Identify numeric features.
        categorical_features = list; default = []. 
                Identify categorical features.
        verbose = boolean; default: False. 
                Shows some formula used and drop information.
                    `True` shows information.
                    `False` does not show information.
        show_summary = boolean; default: False. 
                Shows summary report.
                    `True` shows information.
                    `False` does not show information.
        show_plots = boolean; default: True. 
                Shows summary and Homoscedasticity information.
                    `True` shows information.
                    `False` does not show information.
        target_is_dollar = boolean; default: True. 
                Modify chart axis label.
                    `True` shows information.
                    `False` does not show information.    
    Under-The-Hood:
    =============
    --{Major Steps}--
        
        ## Regression
        cate = ' + '.join([f'C({x})' for x in categorical_features])
        nume = ' + '.join([f'{x}' for x in numeric_features])
        formula = f'{dependant_var} ~ {nume} + {cate}'
        
        ## plots
        # plot on the left
        sm.qqplot(multiple_regression.resid,
                  dist=stats.norm,
                  line='45',
                  fit=True,
                  ax=ax1)
        # plot on the right
        ax2.scatter(x=multiple_regression.fittedvalues,
                    y=multiple_regression.resid,
                    s=4,
                    color='gold')
    
    Note:
    =====
        Make sure that every column in the DataFrame has the correct dtype.
        Numeric values stored as str (i.e, object) will make stats model assume that those are categorical variable.
        If Erros, check df to see if the passed feature is available in the DataFrame.
    
    Issues:
    =======
        - Output control is not clear.
    
    Changelog:
    ==========
        - changed `resid`, was using `resid_pearson`.
    
    -- ver: 1.3 --
    """
    cate = ' + '.join([f'C({x})' for x in categorical_features])
    nume = ' + '.join([f'{x}' for x in numeric_features])
    if len(cate)==0:
        formula = f'{dependant_var} ~ {nume}'
    else:
        formula = f'{dependant_var} ~ {nume} + {cate}'
    print('Formula for the OLS model: ', formula)
    # OLS regressor
    multiple_regression = smf.ols(formula=formula, data=df).fit()

    if verbose:
        show_summary = True
        show_plots = True

    if show_summary:
        display(multiple_regression.summary())
    if show_plots:
        # plotting
        # plot 1
        fig, (ax1,
              ax2) = plt.subplots(ncols=2,
                                  figsize=(10, 5),
                                  gridspec_kw={'width_ratios': [0.6, 0.4]})
        sm.qqplot(multiple_regression.resid,
                  dist=stats.norm,
                  line='45',
                  fit=True,
                  ax=ax1)
        ax1.set_title('Q-Q Plot', fontdict={"size": 15})
        # plot 2
        # uses The predicted values for the original (unwhitened) design.
        ax2.scatter(x=multiple_regression.fittedvalues, 
                    y=multiple_regression.resid,
                    s=4,
                    color='gold')
        if target_is_dollar:
            ax2.yaxis.set_major_formatter(format_number)
        ax2.set(xlabel='Predicted', ylabel='Residuals')
        ax2.axhline(y=0, c='r', lw=4, ls='--')
        ax2.set_title('Predicted VS Residuals', fontdict={"size": 15})
        plt.suptitle('Visual Check of Residuals for Homoscedasticity',
                     ha='center',
                     va='bottom',
                     fontdict={"size": 25})
        plt.tight_layout()
    if verbose == False and show_summary == False and show_plots == True:
        print('r_sq:', round(multiple_regression.rsquared, 4))
    return multiple_regression

# Univariate ANOVA
''' package can handle categorical variables directly, no need to encode'''

stat_list = []
for idx, column in enumerate(df.columns):
    regression_target = 'annot'
    # for dealing with categorical variables
    temp_df = df.copy()
    for column in df.columns:
        f = f'{regression_target} ~ C({column})'
    model = smf.ols(formula=f, data=temp_df).fit()
    temp_dict = {
        'name': column,
        'r_sq': model.rsquared,
        'intercept': model.params[0],
        'beta': model.params[1],
        'p_val': model.pvalues[1],
        'Jarque-Bera': sms.jarque_bera(model.resid)[0] 
    }
    stat_list.append(temp_dict)
df_stat = pd.DataFrame(stat_list).set_index('name')

# Multivariate ANOVA
OLS_sm(df=df,
       numeric_features=[],
       dependant_var='price',
       categorical_features=df.columns.tolist()[:-1],
       show_summary=False)

ML-based methods (Wrapper?)