In [None]:
import itertools as it

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scikit_posthocs as sp
import seaborn as sns
from memoization import cached

In [None]:
# Disk-backed memoization (joblib).
# `memory` is the shared cache object used by the `@memory.cache` decorators
# further down; cached results are stored under `location`.
from joblib import Memory
location = './cachedir'
memory = Memory(location, verbose=0)

# n_hold_out_validation = memory.cache(n_hold_out_validation) 

# k_cross_validation = memory.cache(k_cross_validation) 

# nk_rep_cross_validation = memory.cache(nk_rep_cross_validation) 

In [None]:
@cached
def order_df(df, metric_name, ascending=True):
    """Reorder the columns of `df` by their mean value for one metric.

    Parameters
    ----------
    df : DataFrame whose rows can be selected with ``df.loc[metric_name]``.
    metric_name : row label of the metric used to rank the columns.
    ascending : sort direction of the column means (default: smallest first).

    Returns
    -------
    A reindexed copy of `df` containing the same columns in ranked order.

    NOTE(review): the result is memoized with `memoization.cached`; confirm
    that the cache key it derives from a DataFrame argument distinguishes
    frames that differ only in their values.
    """
    column_means = df.loc[metric_name].mean()
    ranked_columns = column_means.sort_values(ascending=ascending).index
    return df.reindex(ranked_columns, axis=1)

In [None]:


def plot_swarm_metrics(df_results, metric_name, 
                       order_cols=True, title_extra='', 
                       ref_values = None,
                       **kwargs):
    """Draw a swarm plot (plus mean/CI markers) of one metric per method.

    Parameters
    ----------
    df_results : DataFrame of validation results; ``df_results.loc[metric_name]``
        must yield one column per method.
    metric_name : metric to plot; also used as the y-axis label and in the title.
    order_cols : when True, columns are first ranked with `order_df`.
    title_extra : text appended to the plot title.
    ref_values : optional dict ``{label: value}``; each value is drawn as a
        dashed horizontal reference line. Anything that is not a dict is ignored.
    **kwargs : forwarded to `order_df` (only used when `order_cols` is True).
    """
    sns.set(context='talk', style='whitegrid', font_scale=0.8)

    if order_cols:
        df_results = order_df(df_results, metric_name, **kwargs)

    df_melted = pd.melt(df_results.loc[metric_name], var_name='method', value_name=metric_name)

    fig, ax = plt.subplots(figsize=(12,6))
    # Both layers are bound to the axes created above instead of relying on
    # pyplot's implicit "current axes".
    sns.swarmplot(data=df_melted, x='method', y=metric_name,
                  size=6, ax=ax)
    sns.pointplot(data=df_melted, x='method', y=metric_name, dodge=True,
                  ci=95, ax=ax, join=False, color='black', capsize=0.4)
    ax.set(
           xlabel='VS Method')
    ax.set_title(f'ML vs CS results: {metric_name.replace("_", " ").upper()}_{title_extra}', 
                 fontweight='bold', fontsize=18)
    if metric_name == 'roc_auc':
        ax.set_ylim(0.3, 1.001)
        # 0.5 is drawn as a dashed reference line for ROC-AUC
        ax.axhline(0.5, ls='--')
    else:
        ax.set_ylim(0.0, 1.001)
    # plot horizontal lines with reference values
    # (iterating the dict directly would yield only its keys, so the values
    # are taken explicitly)
    if isinstance(ref_values, dict):
        for value in ref_values.values():
            ax.axhline(value, ls='--', c='powderblue')


In [None]:
from sklearn.model_selection import StratifiedKFold, StratifiedShuffleSplit, RepeatedStratifiedKFold

def _validate(clf, clf_name, 
                X_test, y_test,
                metric_name, metric_params):
    """Score one estimator on a test split and return a single metric value.

    The estimator kind is decided by the prefix of `clf_name`:
    ``'ml_'`` -> `clf.predict_proba(X_test)[:, 1]` is used as the score;
    ``'cs_'`` -> `clf` is called on ``pd.DataFrame(X_test)``.
    Any other prefix prints a message and returns None.

    `metric_name` is accepted but not used here: per the original comment,
    `metric_params` already carries the metric name and is forwarded as
    keyword arguments to `PlotMetric.format_metric_results`.
    """
    is_ml = clf_name.startswith('ml_')
    is_cs = clf_name.startswith('cs_')

    # Guard: unknown estimator families are skipped
    if not (is_ml or is_cs):
        print(clf_name, 'not found. Ommited.')
        return None

    if is_ml:
        # Score from the second column of predict_proba
        scores = clf.predict_proba(X_test)[:, 1]
    else:
        # Consensus functions are called on a DataFrame
        scores = clf(pd.DataFrame(X_test))

    # Evaluate the single, unnamed prediction vector
    evaluator = PlotMetric(y_test, {'': scores}, decreasing=False)
    results = evaluator.format_metric_results(rounded=5, **metric_params)

    return results.values[0][0]


def _train_cfl(clf, X_train, y_train):
    """Fit `clf` on the given training data and return that same object.

    The return value of `clf.fit` is ignored; the estimator passed in is
    what gets returned.
    """
    fitted = clf
    fitted.fit(X_train, y_train)
    return fitted
    

def _do_replicates(splits, 
                   estimators, X, y,
                   metrics,
                   random_vars=False,
                   k_random_vars=0
                  ):
    """Train/evaluate every estimator on every split and collect the metrics.

    Parameters
    ----------
    splits : sequence of ``(train_idx, test_idx)`` pairs.
    estimators : dict ``{name: estimator}``. Names starting with ``'ml_'`` are
        fitted once per split; other names are only evaluated (see `_validate`).
    X, y : feature matrix and labels, indexed positionally (``X[train]``).
    metrics : dict ``{metric_name: metric_params}`` forwarded to `_validate`.
    random_vars, k_random_vars : when `random_vars` is truthy and
        `k_random_vars` > 0, each ``'ml_'`` fit uses `k_random_vars` columns
        drawn at random (without replacement) from the full `X`.

    Returns
    -------
    dict ``{name: [metric values]}``; values are ordered split by split and,
    within a split, in the iteration order of `metrics`.

    Notes
    -----
    The column subset is held in a per-fold local. The previous code rebound
    `X` itself, so the first subset leaked into every later fold and every
    later estimator, which made the results depend on the order of
    `estimators`.
    """
    subset_columns = bool(random_vars) and k_random_vars > 0
    results = {}

    for clf_name, clf in estimators.items():
        is_ml = clf_name.startswith('ml_')
        folds = []

        for train, test in splits:
            # Data seen by this fold; the full matrix unless a subset is drawn
            X_fold = X

            if is_ml:
                # Subset k random variables, always sampling from the full X
                if subset_columns:
                    vars_index = np.random.choice(
                            a = X.shape[1],
                            size = k_random_vars,
                            replace = False
                        )
                    X_fold = X[:, vars_index]

                # Fit the ml classifier once per fold (fitted in place)
                _train_cfl(clf, X_fold[train], y[train])

            # One value per metric, evaluated on the same columns used to fit
            for metric_name, metric_params in metrics.items():
                metric = _validate(
                    clf, clf_name,
                    X_fold[test], y[test],
                    metric_name, metric_params
                )
                folds.append(metric)

        # Add to the results dictionary
        results[clf_name] = folds

    return results


@memory.cache
def _format_results_to_df(metrics, results, n):
    """Arrange the per-estimator result lists into a sorted DataFrame.

    Parameters
    ----------
    metrics : dict whose keys are the metric names.
    results : dict ``{estimator_name: values}`` where the values are ordered
        replicate by replicate and, within a replicate, metric by metric.
    n : number of replicates (folds/repetitions).

    Returns
    -------
    DataFrame with one column per estimator and a two-level row index
    ``(metric_name, replicate_number)``, sorted by that index.
    """
    metric_names = list(metrics.keys())

    # Row labels follow the layout of `results`: all metrics of replicate 0,
    # then all metrics of replicate 1, and so on.
    row_labels = [(name, rep) for rep in range(n) for name in metric_names]

    frame = pd.DataFrame(results,
                         index=pd.MultiIndex.from_tuples(row_labels))
    return frame.sort_index()

@memory.cache
def k_cross_validation(
          estimators, X, y,
          metrics,
          n_splits=5, 
          random_state=None, 
          shuffle=True):
    """Evaluate the estimators with stratified k-fold cross-validation.

    The folds come from `StratifiedKFold(n_splits, shuffle, random_state)`;
    every estimator is run on every fold by `_do_replicates` and the values
    are tabulated by `_format_results_to_df` (one block of rows per metric,
    `n_splits` rows each). Calls are memoized through `memory.cache`.
    """
    splitter = StratifiedKFold(n_splits=n_splits,
                               shuffle=shuffle,
                               random_state=random_state)
    # Materialize the folds so they can be reused for every estimator
    fold_indices = list(splitter.split(X, y))

    fold_scores = _do_replicates(fold_indices, estimators, X, y, metrics)

    return _format_results_to_df(metrics, fold_scores, n=n_splits)

@memory.cache
def n_hold_out_validation(
          estimators, X, y,
          metrics,
          n_reps=5, test_size=0.25,
          random_state=None, **kwargs):
    """Evaluate the estimators on repeated stratified hold-out splits.

    `StratifiedShuffleSplit` produces `n_reps` train/test partitions with the
    given `test_size`. Extra keyword arguments are forwarded to
    `_do_replicates` (e.g. `random_vars`, `k_random_vars`). The result is
    tabulated by `_format_results_to_df` with `n_reps` rows per metric.
    Calls are memoized through `memory.cache`.
    """
    splitter = StratifiedShuffleSplit(n_splits=n_reps,
                                      test_size=test_size,
                                      random_state=random_state)
    # Materialize the repetitions so they can be reused for every estimator
    rep_indices = list(splitter.split(X, y))

    rep_scores = _do_replicates(rep_indices, estimators, X, y,
                                metrics, **kwargs)

    return _format_results_to_df(metrics, rep_scores, n=n_reps)

@memory.cache
def nk_rep_cross_validation(
          estimators, X, y,
          metrics,
          n_splits=2, 
          n_repeats=5,
          random_state=None, 
          shuffle=True):
    """Evaluate the estimators with repeated stratified k-fold validation.

    `RepeatedStratifiedKFold` yields ``n_splits * n_repeats`` train/test
    partitions; that product is also the number of rows per metric in the
    returned DataFrame. Calls are memoized through `memory.cache`.

    `shuffle` is accepted for signature parity with `k_cross_validation`
    but is not used by this function.
    """
    splitter = RepeatedStratifiedKFold(n_splits=n_splits,
                                       n_repeats=n_repeats,
                                       random_state=random_state)
    # Materialize the partitions so they can be reused for every estimator
    partition_indices = list(splitter.split(X, y))

    partition_scores = _do_replicates(partition_indices, estimators, X, y,
                                      metrics)

    total_partitions = n_splits*n_repeats
    return _format_results_to_df(metrics, partition_scores, n=total_partitions)

In [None]:
# To get the best and the mean single score
@cached
def get_best_single_performance(y_true, X, metric_params, decreasing=True):
    """Return the mean and the maximum metric value over the columns of `X`.

    Each column of `X` is treated as an independent prediction vector and
    keyed by its position. `metric_params` is forwarded as keyword arguments
    to `PlotMetric.format_metric_results`.

    Returns
    -------
    (mean, maximum) : first element of the column-wise mean and of the
        column-wise max of the formatted results.
    """
    # {column position: column values}
    column_scores = dict(zip(range(X.shape[1]), X.T))

    evaluator = PlotMetric(y_true, column_scores, decreasing=decreasing)
    performances = evaluator.format_metric_results(rounded=5, **metric_params)

    average = performances.mean()
    best = performances.max()
    return average.values[0], best.values[0]

@cached
def n_hold_out_single_performance(
          X, y,
          metric,
          n_reps=5, test_size=0.25,
          random_state=None, decreasing=True):
    """Mean and best single-column performance over repeated hold-out splits.

    `StratifiedShuffleSplit` generates `n_reps` partitions; only the test part
    of each is scored with `get_best_single_performance`.

    Returns
    -------
    (final_mean, final_max) : the average of the per-repetition means and the
        largest of the per-repetition maxima.
    """
    splitter = StratifiedShuffleSplit(n_splits=n_reps,
                                      test_size=test_size,
                                      random_state=random_state)

    # The training indices are irrelevant here; only test subsets are scored
    per_rep = [
        get_best_single_performance(y[test], X[test],
                                    metric, decreasing=decreasing)
        for _, test in list(splitter.split(X, y))
    ]
    rep_mean = [mean for mean, _ in per_rep]
    rep_max = [maximum for _, maximum in per_rep]

    # Collapse the repetitions: one mean and one maximum
    return np.mean(rep_mean), np.max(rep_max)

In [None]:
from sklearn.metrics import auc
from sklearn.metrics import plot_roc_curve
from sklearn.model_selection import StratifiedKFold

@cached()
def plot_roc_cv(classifier, X, y, random_state=None,
               n_folds=5, ax=None, name=''):
    """Plot per-fold and mean ROC curves from stratified k-fold validation.

    Parameters
    ----------
    classifier : estimator; it is re-fitted in place on every training fold.
    X, y : data and labels, indexed positionally.
    random_state : seed for the shuffled `StratifiedKFold`.
    n_folds : number of folds.
    ax : axes to draw on; a new figure is created when omitted.
    name : text appended to the plot title.

    Returns
    -------
    The matplotlib axes containing the fold curves, the diagonal "Random"
    line, the mean ROC curve and a +/- 1 standard deviation band.

    NOTE(review): the function is wrapped by `memoization.cached`, so a
    repeated call with arguments that map to the same cache key presumably
    returns the stored axes without drawing again -- confirm this is intended.
    NOTE(review): `plot_roc_curve` is imported outside this function; newer
    scikit-learn releases no longer provide it.
    """
    sns.set(style='whitegrid', font_scale=1.2)

    cv = StratifiedKFold(n_splits=n_folds, random_state=random_state, shuffle=True)

    tprs = []
    aucs = []
    # Common FPR grid on which every fold's TPR is interpolated
    mean_fpr = np.linspace(0, 1, 100)

    # Identity check: `== None` would defer to the object's own __eq__
    if ax is None:
        _, ax = plt.subplots()
    for i, (train, test) in enumerate(cv.split(X, y)):
        classifier.fit(X[train], y[train])
        viz = plot_roc_curve(classifier, X[test], y[test],
                             name='ROC fold {}'.format(i),
                             alpha=0.3, lw=1, ax=ax)
        interp_tpr = np.interp(mean_fpr, viz.fpr, viz.tpr)
        # Force the interpolated curve to start at the origin
        interp_tpr[0] = 0.0
        tprs.append(interp_tpr)
        aucs.append(viz.roc_auc)

    ax.plot([0, 1], [0, 1], linestyle='--', lw=2, color='r',
            label='Random', alpha=.8)

    mean_tpr = np.mean(tprs, axis=0)
    # Force the mean curve to end at (1, 1)
    mean_tpr[-1] = 1.0
    mean_auc = auc(mean_fpr, mean_tpr)
    std_auc = np.std(aucs)
    ax.plot(mean_fpr, mean_tpr, color='b',
            label=r'Mean ROC (AUC = %0.2f $\pm$ %0.2f)' % (mean_auc, std_auc),
            lw=2, alpha=.8)

    # Variability band, clipped to the [0, 1] range
    std_tpr = np.std(tprs, axis=0)
    tprs_upper = np.minimum(mean_tpr + std_tpr, 1)
    tprs_lower = np.maximum(mean_tpr - std_tpr, 0)
    ax.fill_between(mean_fpr, tprs_lower, tprs_upper, color='grey', alpha=.2,
                    label=r'$\pm$ 1 std. dev.')

    ax.set(xlim=[-0.05, 1.05], ylim=[-0.05, 1.05])
    ax.set_title(label=f"{n_folds}-fold CV ROC curve: {name}", fontsize=16, fontweight='bold')
    ax.legend(loc="lower right")
    return ax

In [None]:
from scipy.stats import shapiro, bartlett, levene

def norm_test(x, alpha=0.05):
    """Run the Shapiro-Wilk test on `x` and print the decision.

    The null hypothesis is reported as 'rejected' when the p-value is
    strictly below `alpha`, otherwise as 'accepted'. Nothing is returned.
    """
    statistic, p_value = shapiro(x)
    if p_value < alpha:
        verdict = 'rejected'
    else:
        verdict = 'accepted'
    print(f'The H0 is {verdict} => (W={round(statistic, 3)}, p={round(p_value, 3)})')
    
def homovar_test(x, y, alpha=0.05):
    """Run Bartlett's test on the two samples and print the decision.

    The null hypothesis is reported as 'rejected' when the p-value is
    strictly below `alpha`, otherwise as 'accepted'. Nothing is returned.
    """
    statistic, p_value = bartlett(x, y)
    if p_value < alpha:
        verdict = 'rejected'
    else:
        verdict = 'accepted'
    print(f'The H0 is {verdict} => (W={round(statistic, 3)}, p={round(p_value, 3)})')
    
def multi_norm_test(df, metric='roc_auc', alpha=0.05):
    """Shapiro-Wilk normality flag for every column of one metric.

    Parameters
    ----------
    df : results DataFrame; ``df.loc[metric]`` must yield one column per method.
    metric : row label of the metric to test.
    alpha : significance level.

    Returns
    -------
    One-row DataFrame (row label 'Normality') with, for each column, 0 when
    the p-value is below `alpha` and 1 otherwise.
    """
    scores = df.loc[metric]
    # The test is run column by column rather than through DataFrame.apply:
    # depending on the pandas version, apply either keeps the (W, p) tuples
    # or expands them into rows, and the latter breaks the p-value lookup.
    flags = [
        0 if shapiro(column)[1] < alpha else 1
        for _, column in scores.items()
    ]
    return pd.DataFrame(flags,
             index=df.columns, columns=['Normality']).T

def multi_homovar_test(df, metric='roc_auc', alpha=0.05, as_df=True):
    """Bartlett's test across all columns of one metric.

    Each column of ``df.loc[metric]`` is passed to `bartlett` as one sample.
    With `as_df` the result is a one-row DataFrame (row 'Bartlett', columns
    'statistic' and 'p'); otherwise the raw `bartlett` result is returned.
    `alpha` is accepted but not used by this function.
    """
    per_method_samples = df.loc[metric].values.T
    outcome = bartlett(*per_method_samples)

    if not as_df:
        return outcome

    table = pd.DataFrame(outcome, columns=['Bartlett'],
                         index=['statistic', 'p'])
    return table.T

In [1]:
import Orange

def plot_cd(df, width=12):
    """Print the critical difference and draw the Orange rank diagram.

    `df` holds one column per method; its row count is passed to
    `compute_CD` as the number of datasets and the average ranks come from
    `get_R(df)`. The alpha level is fixed to the string '0.05'.
    """
    method_names = df.columns
    n_rows = df.shape[0]
    average_ranks = get_R(df)

    critical_difference = Orange.evaluation.compute_CD(
        average_ranks, n_rows, alpha='0.05')
    print('Critical Difference:', critical_difference)

    Orange.evaluation.graph_ranks(average_ranks, method_names,
                                  cd=critical_difference, width=width,
                                  textspace=1.)
    plt.show()

In [None]:

def plot_p_heatmap(df):
    """Show a significance heatmap of the pairwise Nemenyi p-values.

    The p-values are the first element returned by `pairwise_nemenyi(df)`.
    The upper triangle (diagonal included) is masked, and each visible cell
    is annotated with its p-value.
    """
    cmap = ['1', '#fb6a4a',  '#08306b',  '#4292c6', '#c6dbef']
    p_values_nemenyi = pairwise_nemenyi(df)[0]
    # Builtin `bool`: the `np.bool` alias is no longer available in NumPy
    mask = np.zeros_like(p_values_nemenyi, dtype=bool)
    mask[np.triu_indices_from(mask)] = True

    # The plotted values are shifted down by 1e-9; the annotations keep the
    # original p-values.
    ax, _ = sp.sign_plot(p_values_nemenyi - 0.000000001, clip_on= False, cmap=cmap,
                     linewidths= 1, linecolor= 'white', mask=mask,
                     annot=p_values_nemenyi, fmt='.2f')
    plt.show()

In [None]:
from statannot import add_stat_annotation

def plot_box_signif(df, metric_name, **kwargs):
    """Box + swarm plot of one metric with significance annotations.

    Columns are ranked with `order_df` (which receives `**kwargs`), pairwise
    Nemenyi p-values are taken from `pairwise_nemenyi`, and every pair of
    methods with p <= 0.001 is annotated with stars via `add_stat_annotation`.
    """
    scores = order_df(df, metric_name, **kwargs).loc[metric_name]
    nemenyi_p = pairwise_nemenyi(scores)[0]

    df_melted = pd.melt(scores,
                        var_name='method', value_name='score')

    # Keep only the method pairs whose p-value passes the threshold
    scored_pairs = [(pair, nemenyi_p.loc[pair])
                    for pair in it.combinations(scores.columns, 2)]
    significant = [(pair, p) for pair, p in scored_pairs if p <= 0.001]
    sig_box_pairs = [pair for pair, _ in significant]
    sig_p_values = [p for _, p in significant]

    fig, ax = plt.subplots(figsize=(15,8))
    ax = sns.boxplot(data=df_melted, x='method', y='score', ax=ax)
    ax = sns.swarmplot(data=df_melted, x='method', y='score', size=8, ax=ax)

    # Restyle the boxes as black outlines on white
    for box_number, box in enumerate(ax.artists):
        box.set_edgecolor('black')
        box.set_facecolor('white')

        # each box owns six consecutive entries of ax.lines
        first_line = 6*box_number
        for line_number in range(first_line, first_line + 6):
            ax.lines[line_number].set_color('black')

    if metric_name == 'roc_auc':
        ax.set_ylim(0.3, 1.001)
        ax.axhline(0.5, ls='--')
    else:
        ax.set_ylim(0.0, 1.001)

    # p-values are supplied directly, so no test is run by statannot
    add_stat_annotation(ax, data=df_melted,
                        x='method', y='score',
                        box_pairs=sig_box_pairs,
                        perform_stat_test=False,
                        pvalues=sig_p_values,
                        text_format='star', verbose=0, loc='outside')

In [None]:
def randomize_y_labels(y_target, random_chi=0.1):
    '''Randomly flip a fraction chi of the label vector, in a stratified way.

    ``floor(random_chi * n_actives)`` active labels (1) are switched to
    inactive (0) and the same number of inactive labels are switched to
    active, visiting the positions in random order.

    Parameters
    ----------
    y_target : label vector supporting ``.copy()``, ``.sum()`` and integer
        indexing; it is not modified.
    random_chi : fraction of the number of actives to flip in each class.

    Returns
    -------
    A copy of `y_target` with the flipped labels. If one class has fewer
    members than the requested number of flips, all of its members are flipped.
    '''

    # Work on a copy of the vector that was passed in (not on a global)
    y_copy = y_target.copy()

    # Get the number of actives inside the y_target vector
    n_actives = y_target.sum()
    random_size = int(np.floor(random_chi * n_actives))
    # Remaining flips for each class
    act_count = random_size
    inact_count = random_size

    # Create the randomized list of indexes
    n_labels = len(y_target)
    idx_shuffled = np.random.choice(range(n_labels), n_labels, replace=False)
    # Iterate over idx_shuffled until BOTH counters reach zero
    for idx in idx_shuffled:
        if act_count <= 0 and inact_count <= 0:
            break
        if y_copy[idx] == 1:
            # Active: switch it to inactive while flips remain
            if act_count > 0:
                y_copy[idx] = 0
                act_count -= 1
        elif inact_count > 0:
            # Inactive: switch it to active while flips remain
            y_copy[idx] = 1
            inact_count -= 1
    return y_copy

In [None]:
import scipy.stats as st

def get_ci(x):
    """95% Student-t interval around the mean of `x`.

    Uses ``len(x) - 1`` degrees of freedom, the sample mean as location and
    the standard error of the mean as scale. Returns ``(lower, upper)``.
    """
    degrees_of_freedom = len(x) - 1
    return st.t.interval(0.95, degrees_of_freedom,
                         loc=np.mean(x), scale=st.sem(x))

def compute_stats(r):
    """Return a Series with the 'mean' and 'std' of `r` (via its own methods)."""
    summary = {
        'mean': r.mean(),
        'std': r.std(),
    }
    return pd.Series(summary)

def get_group_stats(dict_results, metric_name, index_values=(100, 75, 50, 25, 0)):
    """Summarise the mean and std of one metric for every group of results.

    Parameters
    ----------
    dict_results : dict ``{group_key: results DataFrame}``; each frame must
        support ``.loc[metric_name]`` and yield one column per method.
    metric_name : row label of the metric to summarise.
    index_values : labels assigned, positionally and in the iteration order
        of `dict_results`, to the groups in the output 'index' column. The
        default reproduces the previously hard-coded ``[100, 75, 50, 25, 0]``.

    Returns
    -------
    Long-format DataFrame with columns 'index', 'method', 'mean' and 'std'
    (one row per group/method combination).

    Raises
    ------
    ValueError
        If `index_values` and `dict_results` differ in length.
    """
    index_values = list(index_values)
    if len(index_values) != len(dict_results):
        raise ValueError(
            f'index_values has {len(index_values)} labels but dict_results '
            f'has {len(dict_results)} groups'
        )

    # Per-group table with rows 'mean'/'std' and one column per method
    per_group = [
        dict_results[key].loc[metric_name].agg(['mean', 'std'])
        for key in dict_results
    ]
    # Stack once; the row index becomes (group_key, statistic)
    stacked = pd.concat(per_group, keys=list(dict_results))

    means = stacked.xs('mean', level=1)
    stds = stacked.xs('std', level=1)

    # Set index
    means.index = index_values
    stds.index = index_values

    df_results = pd.melt(means.reset_index(), id_vars='index', var_name='method', value_name='mean')
    df_results['std'] = pd.melt(stds.reset_index(), id_vars='index', var_name='method', value_name='std')['std']

    return df_results

In [None]:
def pROC_auc_rand(N, n_a, normalized=True):
    """Closed-form area ``(10**0 - 10**log10(1/N)) / ln(10)``.

    With `normalized` the area is divided by ``-log10(1/N)``.
    `n_a` is accepted but not used in the computation.

    NOTE(review): presumably the expected semi-log ROC (pROC) area of a
    random ranking over N items -- confirm with the caller.
    """
    log_lower = np.log10(1/N)
    log_upper = 0
    ln10 = np.log(10)

    area = (10**log_upper) / ln10 - (10**log_lower) / ln10

    return area / (-log_lower) if normalized else area

In [None]:
from scipy.stats import ttest_ind, shapiro, bartlett, mannwhitneyu 

def compare_two_distributions(conf):
    """Compare the `conf` score distributions of actives and inactives.

    Reads the notebook-level DataFrame `X_tian` (rows split on its 'activity'
    column, scores taken from column `conf`) and the notebook-level `auc_all`
    table. It displays the Shapiro-Wilk results for both groups, the t-test,
    Bartlett's test, the `auc_all` entry for `conf` and the Mann-Whitney U
    test, then draws a box plot above overlaid histograms of both groups.

    Nothing is returned; all output is displayed/plotted.
    """
    actives = X_tian.query('activity == 1')[conf]
    inactives = X_tian.query('activity == 0')[conf]
    dk_scores = X_tian[[conf, 'activity']]

    # Normality test
    print('Actives Normality')
    display(shapiro(actives))
    print('Inactives Normality')
    display(shapiro(inactives))
    # T-test
    print('t-student test')
    display(ttest_ind(actives, inactives))
    # Bartlett
    print('Bartlett test')
    display(bartlett(actives, inactives))
    # AUC_ROC
    print('AUC-ROC')
    display(auc_all.loc[conf])
    # Mann-Whitney U test (shown without a printed header)
    display(mannwhitneyu(actives, inactives))
    
    # Medians are drawn as vertical dashed lines on the histogram below
    med_actives = np.median(actives)
    med_inactives = np.median(inactives)


    # Two stacked panels sharing the x axis: a thin box plot over the histogram
    fig, (box, hist) = plt.subplots(2, sharex=True, figsize=(10, 8),
                                   gridspec_kw= {"height_ratios": (0.2, 1)})
    
    box.set(title='Active/Inactive docking scores distribution')
#     sns.boxplot(x=inactives, ax=box)
    sns.boxplot(x=conf, y='activity', data=dk_scores, ax=box, orient='h')
    box.set(xlabel=None, ylabel=None)
    
    # No `ax` is passed here, so seaborn draws on pyplot's current axes and
    # `hist` is rebound to whatever axes distplot returns.
    # NOTE(review): presumably that is the lower panel -- confirm.
    hist = sns.distplot(inactives, 
                      label='Decoys/Inactives',hist_kws={'linewidth': 0}, norm_hist=True)
    sns.distplot(actives, ax=hist, bins=35,
                 label='Actives', hist_kws={'linewidth': 0}, norm_hist=True)
    hist.axvline(med_actives, linestyle='--', c='#BD5D19')
    hist.axvline(med_inactives, linestyle='--')
    hist.set(xlabel=f'Conformation: {conf}', ylabel='Density')
    
    hist.legend()
    plt.show()