In [None]:
!pip install statannotations scikit-learn-intelex

In [None]:
import itertools

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import matplotlib as mpl
import seaborn as sns

from sklearnex import patch_sklearn
patch_sklearn()

import sklearn

import sklearn.pipeline 
import sklearn.model_selection
import sklearn.metrics
import sklearn.neural_network

from sklearn.preprocessing import StandardScaler

import gensim.models 

from scipy import stats
import statsmodels.stats.multicomp as mc

from statannotations.Annotator import Annotator

In [None]:
mpl.rcParams.update({'xtick.labelsize': 14, 'ytick.labelsize': 14, 
                     'axes.titlesize':14, 'axes.labelsize':16}) #default font sizes for plots

In [None]:
!wget -O data.tar 'https://drive.google.com/uc?export=download&id=12ekhGbgMPGYSX-t6mFR5TEgxGF46-sQ_'
!tar -xvf data.tar

In [None]:
data_dir = './'

# MPRA data

We are going to predict data from a massively parallel reporter assay (MPRA) study (Griesemer et al. 2021):

<img src="https://pbs.twimg.com/media/Ers7U2YVgAI-BDG?format=jpg&name=large" width=70% />

We will treat Ref and Alt seqeunces independently and predict reporter expression in HMEC cells for each seqeunce using regression methods.

In [None]:
mpra_df = pd.read_csv(data_dir + 'mpra_HMEC.csv') #sequence info

In [None]:
mpra_df.head()

In [None]:
len(mpra_df)

In [None]:
ax = mpra_df.Expression.hist(bins='auto',figsize=(6,3))

ax.set_xlabel('MPRA expression Log2FC')
ax.set_ylabel('counts')

The predictions will be based on embeddings, obtained for each seqeunce via a masked language model (MLM) (Gankin et al. 2023). MLM is an unsupervised model which we specifically retrained to reconstruct masked nucleotides in DNA seqeunces of mammalian 3'UTR regions. 

<img src="https://pbs.twimg.com/media/FntnrCzXgAEUMh3?format=jpg&name=medium" width=70% />

In [None]:
mlm_embeddings = np.load(data_dir + "mpra_MLM.npy") #masked language model embeddings

# Compare regression algorithms

We will first compare a few common machine learning regression algorithms: Ridge (L2) regression, multilayer perceptron (MLP), and support vector regression (SVR). 

Nested Cross-Validation is a common approach to compare different machine-learning algorithms. 
The outer loop serves to estimate the algorithm performance and the inner loop is used to tune hyper-parameters.
When the best performing algorithm is chosen,  hyperparameter tuning is performed again in a single CV loop over the whole available data. The final model is then obtained via training with the resulting hyperparamaters on all the data. To report the estimated performance, the score obtained at the 1st step with the Nested CV can be used.

<img src="https://hackingmaterials.lbl.gov/automatminer/_images/cv_nested.png" width=70% />

In [None]:
class GroupNestedCV():
    
    '''
    Group Nested Cross-Validation
    
    Parameters:
    clf: sklearn-compatible classifier
    hpp_search_grid: dictionary of parameter values for hyperparameter search
    
    if hpp_search_grid is None, just perform Group k-fold CV
    '''
    
    def __init__(self, clf, hpp_search_grid=None, n_splits = 10):
        
        self.clf = clf
        self.hpp_search_grid = hpp_search_grid
        self.group_kfold = sklearn.model_selection.GroupKFold(n_splits=n_splits)
        
    def run(self, X, y, groups):
    
        '''
        Iterates over  self.group_kfold folds and performs hyperparameter search within each fold

        Returns:
        Predictions for all folds
        '''

        kfold_scores = [] #predictions in all folds

        #outer loop
        for fold_idx, (train_idx, test_idx) in enumerate(self.group_kfold.split(X, y, groups)):

            X_train, y_train, groups_train = X[train_idx,:], y[train_idx], groups[train_idx]

            X_test, y_test = X[test_idx,:], y[test_idx]

            if self.hpp_search_grid!=None:

                print(f'Hyperparameter search in fold {fold_idx}')

                gs = sklearn.pipeline.make_pipeline(StandardScaler(),
                                                sklearn.model_selection.GridSearchCV(self.clf, self.hpp_search_grid, cv=3))

                gs.fit(X_train, y_train, gridsearchcv__groups = groups_train)

                best_params = gs['gridsearchcv'].best_params_
                print(f'Best hyperparameters: {best_params}')

            #train 
            pipe = sklearn.pipeline.make_pipeline(...)
            pipe.fit(X_train, y_train)
            
            #inference
            y_pred = pipe.predict(X_test)

            kfold_scores.append(np.vstack(([fold_idx]*len(y_test),y_pred,y_test))) #add predictions for the current fold

        return kfold_scores

In [None]:
algs = {} #algorithms to test
hpp_search_space = {} #hyperparameter search space for each model

algs['ridge'] = sklearn.linear_model.Ridge()
hpp_search_space['ridge'] = {'alpha':10.**np.arange(-10,10)}
              
algs['MLP'] = sklearn.neural_network.MLPRegressor(hidden_layer_sizes=(64,32,16,), alpha=10, 
                 batch_size=1000, learning_rate_init=5e-4, max_iter=300, shuffle=False)

algs['SVR'] = sklearn.svm.SVR(C=4, epsilon=0.1)

In [None]:
X = mlm_embeddings #MLM embeddings
y = mpra_df['Expression'].values #dependent variable
groups = mpra_df['group'].values #groups (genes)

In [None]:
preds_mpra = []

for model, alg in algs.items():
    
    print(f'MODEL: {model}')
    
    group_nested_cv = GroupNestedCV(alg, hpp_search_space.get(model,None), )
    
    scores = group_nested_cv.run(X, y, groups)
    
    scores = pd.DataFrame(np.hstack(scores).T, columns=['fold','y_pred','y_test']) #numpy array to DataFrame
    
    scores['model'] = model #add model column
    
    preds_mpra.append(scores)#stack dataframes of different models

preds_mpra = pd.concat(preds_mpra)#concatenate dataframes

In [None]:
preds_mpra

In [None]:
def r2_score(y_test, y_pred):
    ...

In [None]:
per_fold_scores = preds_mpra.groupby(['model','fold']).apply(lambda x: r2_score(x.y_test,x.y_pred)).rename('score').reset_index()

In [None]:
fig, ax = plt.subplots(figsize=(4,4))

ax = sns.swarmplot(data=per_fold_scores, x="model", y="score") #scatter plot
ax = sns.boxplot(data=per_fold_scores, x="model", y="score", boxprops={'facecolor':'None'})

box_pairs=[ ("MLP", "SVR"), ("MLP", "ridge"), ("SVR", 'ridge')]

annotator = Annotator(ax, box_pairs, data=per_fold_scores, x="model", y="score")
annotator.configure(test='Wilcoxon', text_format='star', loc='inside')

annotator.apply_and_annotate()

ax.set_xlabel("")
ax.set_ylabel("score")
ax.tick_params(rotation=30)
ax.grid()

An alternatlive way to compare models: pool predictions from all CV folds and pretend that they are obtained from a single test set.
Then the error for each model and each observation is computed. Afterwards, a statistical test is preformed to determine if the average error from one model is greater than the average error from another model.

In [None]:
preds_mpra['error'] = abs(preds_mpra.y_pred-preds_mpra.y_test) #absolute errors for each observation

In [None]:
preds_mpra.groupby('model').error.mean()

In [None]:
comp1 = mc.MultiComparison(preds_mpra['error'], preds_mpra['model'])
tbl, a1, a2 = comp1.allpairtest(stats.wilcoxon, method= "bonf")

tbl

Despite being very popular, both approaches violate the sample independence assumption: each sample is used to train 9 out of 10 models (for 10-fold CV), so neither per-fold scores nor single pooled observations are completely independent.

See hte followig paper for more details:

Dietterich, Thomas G. "Approximate statistical tests for comparing supervised classification learning algorithms." Neural computation 10.7 (1998): 1895-1923.

# Comparing with other embeddings

We shall also try to predict MPRA expression from alternative embeddings: 4-mer counts and a Word2Vec model.

For this, we need to define some utility functions.

In [None]:
class Kmerizer:
    '''
    Helper class to generate k-mers and Word2Vec embeddings
    '''
    
    def __init__(self, k):
        
        self.k = k
        
        #generate all possible k-mers out of 4 characters, e.g. ACGTC, GTACC, etc.. for k=5
        
        ...

        self.kmers = {kmer:idx for idx,kmer in enumerate(kmers)}
        
    def kmerize(self, seq):
        '''
        Count all k-mers in the sequence 
        Returns:
        A list with counts corresponding to each possible k-mer from self.kmers
        e.g. for k=2 and seq='ACTAC'
        > [0, 2, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0]
        '''
        counts = [0]*4**self.k
        for i in range(len(seq) - self.k + 1): 
            kmer = seq[i:i+self.k]
            counts[self.kmers[kmer]] += 1
        return counts
    
    def tokenize(self, seq):
        '''
        Get all k-mers in the sequence
        Returns:
        A list of all k-mers
        e.g. for 2-mers and seq='ACTAC' 
        > ['AC', 'CT', 'TA', 'AC']
        '''
        kmers = []
        
        ...
        
        return kmers

In [None]:
def word2vec_model(mpra_df):
    
    '''
    Word2Vec model
    
    k-mers are defined through their context: 
    k-mers with similar context will have similar embeddings
    '''
    
    kmerizer_w2v = Kmerizer(k=4)
    
    w2v_model = gensim.models.Word2Vec(sentences=mpra_df.seq.apply(lambda x: kmerizer_w2v.tokenize(x)), 
                         vector_size=128, window=5, min_count=1, workers=4, sg=1) #default: CBOW

    word2vec_emb = mpra_df.seq.apply(
        lambda x: np.mean([w2v_model.wv[x]  for x in kmerizer_w2v.tokenize(x)],axis=0)) #average embedding of all 4-mers in the sequence

    X = np.stack(word2vec_emb,axis=0)
    
    return X

In [None]:
data_matrices = {} #embeddings dictionary

data_matrices['MLM'] = mlm_embeddings

kmerizer4 = Kmerizer(k=4)
data_matrices['4-mers'] = np.stack(mpra_df.seq.apply(lambda x: kmerizer4.kmerize(x))) 

data_matrices['Word2Vec'] = word2vec_model(mpra_df)

y = mpra_df['Expression'].values
groups = mpra_df['group'].values

In [None]:
alg = sklearn.svm.SVR(C=4, epsilon=0.1)

preds_mpra = []

for model, X in data_matrices.items():
    
    print(f'MODEL: {model}')
    
    group_nested_cv = GroupNestedCV(alg)
    
    scores = group_nested_cv.run(X, y, groups)
    
    scores = pd.DataFrame(np.hstack(scores).T, columns=['fold','y_pred','y_test']) #numpy array to DataFrame
    
    scores['model'] = model #add model column
    
    preds_mpra.append(scores)#stack dataframes of different models

preds_mpra = pd.concat(preds_mpra)

In [None]:
per_fold_scores = preds_mpra.groupby(['model','fold']).apply(lambda x: sklearn.metrics.r2_score(x.y_test,x.y_pred)).rename('score').reset_index()

In [None]:
fig, ax = plt.subplots(figsize=(4,4))

ax = sns.swarmplot(data=per_fold_scores, x="model", y="score") #scatter plot
ax = sns.boxplot(data=per_fold_scores, x="model", y="score", boxprops={'facecolor':'None'})

box_pairs=[ ("MLM", "4-mers"), ("MLM", "Word2Vec"), ("4-mers", "Word2Vec")]

annotator = Annotator(ax, box_pairs, data=per_fold_scores, x="model", y="score")
annotator.configure(test='Wilcoxon', text_format='star', loc='inside', comparisons_correction="BH")
#annotator.configure(test='t-test_paired', text_format='star', loc='inside', comparisons_correction="BH")

annotator.apply_and_annotate()

ax.set_xlabel("")
ax.set_ylabel("score")
ax.tick_params(rotation=30)
ax.grid()

# Assessing model stability

For a stable model, predictions are robust with respect to small changes in the train set (e.g. when including/excluding individual train instances).

To assess model stability, one performs repeated Cross-Validation: at each round the dataset is split into the same number of folds, but the exact fold composition is different. Then variance in predictions for each test point are estimated.

Example for classification (https://stats.stackexchange.com/questions/551242):

<img src="img/stability.png" width=70% />

In [None]:
X = mlm_embeddings
y = mpra_df['Expression'].values
groups = mpra_df['group'].values

pipe = sklearn.pipeline.make_pipeline(sklearn.preprocessing.StandardScaler(), 
                     sklearn.linear_model.Ridge(alpha=100)) 

#pipe = sklearn.pipeline.make_pipeline(sklearn.preprocessing.StandardScaler(), 
#                     sklearn.svm.SVR(C=4, epsilon=0.1))

#pipe = sklearn.pipeline.make_pipeline(sklearn.preprocessing.StandardScaler(), 
#                sklearn.neural_network.MLPRegressor(hidden_layer_sizes=(64,32,16,), alpha=10, batch_size=1000, learning_rate_init=5e-4, max_iter=500, shuffle=False))

N_rounds = 50 #number of CV rounds
N_splits = 10 #number of CV splits in each round

N_instances = len(y) #total number of test points equals dataset size

cv_res = np.zeros((N_rounds*N_splits,N_instances)) #CV predictions for each point
cv_res[:] = np.NaN 

cv_scores = [] # score for each fold in each round, N_rounds X N_splits

for round_idx in range(N_rounds):
    
    print(f'CV round {round_idx}')

    gss = sklearn.model_selection.GroupShuffleSplit(n_splits=N_splits, train_size=.9, random_state = round_idx) #10-fold CV

    for fold_idx, (train_idx, test_idx) in enumerate(gss.split(X, y, groups)):
        
        X_train, X_test, y_train, y_test = X[train_idx,:],X[test_idx,:],y[train_idx],y[test_idx]
        
        pipe.fit(X_train,y_train)
        
        y_pred = pipe.predict(X_test)
            
        cv_res[round_idx*N_splits+fold_idx,test_idx] = y_pred #predictions for test instances in this fold in this round
        
        cv_scores.append((sklearn.metrics.r2_score(y_test,y_pred), round_idx)) #score for this fold in this round
        
cv_scores = pd.DataFrame(cv_scores, columns=['score', 'CV round'])

In [None]:
fig, ax = plt.subplots(figsize=(4,4))

cv_scores['model'] = 'MLM'

ax = sns.swarmplot(data=cv_scores[cv_scores['CV round']<10],y='score',x='model',hue="CV round", palette="deep")
ax = sns.boxplot(data=cv_scores,y='score',x='model', boxprops={'facecolor':'None'})
sns.move_legend(ax, "upper left", bbox_to_anchor=(1, 1))

In [None]:
fig, axes = plt.subplots(1,5, figsize=(10,2))

axes = axes.flatten()

samples = np.random.choice(a=range(len(y)),size=5) #choose 5 random test points

for ax,sample_idx in zip(axes,samples):
    #sns.kdeplot(x=cv_res[:,sample_idx], ax=ax)
    sns.swarmplot(x=cv_res[:,sample_idx], ax=ax)
    sns.boxplot(x=cv_res[:,sample_idx], ax=ax, boxprops={'facecolor':'None'})
    ax.set_ylabel('')
    ax.set_yticks([])
    ax.tick_params(axis='x', which='major', labelsize=8)
    ylims = ax.get_ylim()
    ax.plot(y[sample_idx]*np.ones((50,)),np.linspace(*ylims), linestyle='--', color='tab:orange') #y_true
    ax.set_title(f'sample {sample_idx}')
    ax.set_xlabel('y_pred')
    
fig.tight_layout()

We can evaluate model instability by computing the average coefficient of variation over test points:

In [None]:
cv = ... #coefficient of variation of each test point due to change in train set distribution

In [None]:
abs(cv).mean() #measure of model instability

When choosing models, we prefer stable ones.

How to fight instability?

- add regularization
- use stable algorithms
- reduce the number of features through feature engineering

How to estimate generalization performance of an unstable model?

In [None]:
score_per_round = cv_scores.groupby('CV round')['score'].mean()#mean score in each round

CV_rounds = np.arange(1,N_rounds+1)

average_score = np.cumsum(score_per_round)/CV_rounds #cumulative average

In [None]:
fig, ax = plt.subplots()

ax.plot(CV_rounds, score_per_round, marker='o', markersize=2, linestyle='') 

ax.plot(CV_rounds, average_score, markersize=2) 

ax.set_xlabel('CV rounds')
ax.set_ylabel('score')
ax.grid()

To estimate generalization performance of an unstable model, one performs repeated K-fold CV. The score is then averaged over all folds and all rounds. The number of repeats can be determined by plotting the averaged performance metric vs the number of rounds.