In [None]:
import numpy as np
import numpy.random as nrand
from scipy.stats import norm, rv_continuous
import matplotlib.pyplot as plt
import pandas as pd
import itertools

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import Pipeline


def get_natural_cubic_spline_model(x, y, minval=None, maxval=None, n_knots=None, knots=None):
    """
    Get a natural cubic spline model for the data.

    For the knots, give (a) `knots` (as an array) or (b) minval, maxval and n_knots.

    If the knots are not directly specified, the resulting knots are equally
    space within the *interior* of (max, min).  That is, the endpoints are
    *not* included as knots.

    Parameters
    ----------
    x: np.array of float
        The input data
    y: np.array of float
        The outpur data
    minval: float 
        Minimum of interval containing the knots.
    maxval: float 
        Maximum of the interval containing the knots.
    n_knots: positive integer 
        The number of knots to create.
    knots: array or list of floats 
        The knots.

    Returns
    --------
    model: a model object
        The returned model will have following method:
        - predict(x):
            x is a numpy array. This will return the predicted y-values.
    """

    if knots:
        spline = NaturalCubicSpline(knots=knots)
    else:
        spline = NaturalCubicSpline(max=maxval, min=minval, n_knots=n_knots)

    p = Pipeline([
        ('nat_cubic', spline),
        ('regression', LinearRegression(fit_intercept=True))
    ])

    p.fit(x, y)

    return p


class AbstractSpline(BaseEstimator, TransformerMixin):
    """Base class for all spline basis expansions."""

    def __init__(self, max=None, min=None, n_knots=None, n_params=None, knots=None):
        if knots is None:
            if not n_knots:
                n_knots = self._compute_n_knots(n_params)
            knots = np.linspace(min, max, num=(n_knots + 2))[1:-1]
            max, min = np.max(knots), np.min(knots)
        self.knots = np.asarray(knots)

    @property
    def n_knots(self):
        return len(self.knots)

    def fit(self, *args, **kwargs):
        return self


class NaturalCubicSpline(AbstractSpline):
    """Apply a natural cubic basis expansion to an array.
    The features created with this basis expansion can be used to fit a
    piecewise cubic function under the constraint that the fitted curve is
    linear *outside* the range of the knots..  The fitted curve is continuously
    differentiable to the second order at all of the knots.
    This transformer can be created in two ways:
      - By specifying the maximum, minimum, and number of knots.
      - By specifying the cutpoints directly.  

    If the knots are not directly specified, the resulting knots are equally
    space within the *interior* of (max, min).  That is, the endpoints are
    *not* included as knots.
    Parameters
    ----------
    min: float 
        Minimum of interval containing the knots.
    max: float 
        Maximum of the interval containing the knots.
    n_knots: positive integer 
        The number of knots to create.
    knots: array or list of floats 
        The knots.
    """

    def _compute_n_knots(self, n_params):
        return n_params

    @property
    def n_params(self):
        return self.n_knots - 1

    def transform(self, X, **transform_params):
        X_spl = self._transform_array(X)
        if isinstance(X, pd.Series):
            col_names = self._make_names(X)
            X_spl = pd.DataFrame(X_spl, columns=col_names, index=X.index)
        return X_spl

    def _make_names(self, X):
        first_name = "{}_spline_linear".format(X.name)
        rest_names = ["{}_spline_{}".format(X.name, idx)
                      for idx in range(self.n_knots - 2)]
        return [first_name] + rest_names

    def _transform_array(self, X, **transform_params):
        X = X.squeeze()
        try:
            X_spl = np.zeros((X.shape[0], self.n_knots - 1))
        except IndexError: # For arrays with only one element
            X_spl = np.zeros((1, self.n_knots - 1))
        X_spl[:, 0] = X.squeeze()

        def d(knot_idx, x):
            def ppart(t): return np.maximum(0, t)

            def cube(t): return t*t*t
            numerator = (cube(ppart(x - self.knots[knot_idx]))
                         - cube(ppart(x - self.knots[self.n_knots - 1])))
            denominator = self.knots[self.n_knots - 1] - self.knots[knot_idx]
            return numerator / denominator

        for i in range(0, self.n_knots - 2):
            X_spl[:, i+1] = (d(i, X) - d(self.n_knots - 2, X)).squeeze()
        return X_spl

In [None]:
def average_PCC(env_mut_fit_list):
    N = env_mut_fit_list.shape[0]
    if N == 1:
        return False
    else:
        PCC_matrix = np.corrcoef(env_mut_fit_list)
        return PCC_matrix[np.triu_indices(N,k=1)].mean()

def average_CV(env_mut_fit_list):
    N = env_mut_fit_list.shape[0]
    if N == 1:
        return False
    else:
        std = env_mut_fit_list.std(axis=0,ddof=1)
        mean = env_mut_fit_list.mean(axis=0)
        return np.mean(std/mean)

In [None]:
## mut_fit_list: list of fitness value
## N_env: number of enviornment
## env_variance: fitness variance due to change of enviornments
# The function first ordered the original fitness list, so the index of each element in list
# correspond to their ranking in the list. Then add normal distributed noise
# to each of fitness value in the list, then get the fitness ranking of each fitness, 
# according to this ranking, we shuffled the original ordered fitness list, and get the new
# fitness list in a new environment while maintaining the same shape of distribution.
# This process is repeated N_env times, then the average fitness will be calculated.

def average_env_fit(mut_fit_list,N_env,env_variance):
    env_mut_fit_list = []
    mut_fit_list.sort()
    mut_fit_list = np.array(mut_fit_list)
    N = len(mut_fit_list)
    for i in range(N_env):
        tmp_array = mut_fit_list + nrand.normal(scale=env_variance,size=N)
        order = tmp_array.argsort()
        ranks = order.argsort()
        env_mut_fit_list.append(mut_fit_list[ranks])
    env_mut_fit_list = np.array(env_mut_fit_list)
    #PCC = average_PCC(env_mut_fit_list)
    CV = average_CV(env_mut_fit_list)    
    #mean = np.exp(np.log(env_mut_fit_list).mean(axis=0)) # changed to use geometric mean.
    mean = env_mut_fit_list.min(axis=0) # use the minimum
    return mean, CV, env_mut_fit_list

In [None]:
# get_env_given_CV will do a automatic search to fit the calculated CV to the target CV,
# and return a list of across-environments-averaged fitness distribution with preset replication.

def get_env_given_CV(mut_fit_list,N_env,target_CV,cutoff = 0.0001, rep=1):
    env_variance = 0.01 # initial value of env_variance. the value will be updated during the searching
    step_size = 0.005 # setp_size of searching
    direction_flag_list = [] # 1 for CV > target_CV, -1 for CV < target_CV
    rep_list = []
    CV_list = []
    env_mut_fit_list_list = []
    while True:
        mean, CV, env_mut_fit_list = average_env_fit(mut_fit_list,N_env,env_variance)
        
        if CV - target_CV > cutoff:
            # calculated CV is greater than target CV
            direction_flag_list.append(-1)
        
        elif CV - target_CV < -cutoff:
            # calculated CV is lower than target CV
            direction_flag_list.append(1)
        
        else:
            # calculated CV is within the cutoff
            direction_flag_list.append(0)
            rep_list.append(mean)
            env_mut_fit_list_list.append(env_mut_fit_list)
            CV_list.append(CV)
            if len(rep_list) == rep:
                break
        
        if len(direction_flag_list) >= 2 and direction_flag_list[-1]*direction_flag_list[-2] == -1:
            # update step_size according to the state of last two direction flag
            step_size = step_size/2
        
        # update env_variance according to step_size and direction flag
        env_variance += step_size*direction_flag_list[-1]
    
    return np.array(rep_list), CV_list, env_mut_fit_list_list
        
def add_env_given_CV(mut_fit_list, env_mut_fit_list_list, N_env, target_CV, cutoff = 0.0001, rep=1):
    env_variance = 0.01 # initial value of env_variance. the value will be updated during the searching
    step_size = 0.005 # setp_size of searching
    direction_flag_list = [] # 1 for CV > target_CV, -1 for CV < target_CV
    rep_list = []
    CV_list = []
    additional_N_env = N_env - len(env_mut_fit_list_list[0])
    new_env_mut_fit_list_list = []
    ii = 0
    while True:
        _, _, env_mut_fit_list = average_env_fit(mut_fit_list,additional_N_env,env_variance)
        
        new_env_mut_fit_list = np.concatenate([env_mut_fit_list_list[ii],env_mut_fit_list])
        
        CV = average_CV(new_env_mut_fit_list)
        
        if CV - target_CV > cutoff:
            # calculated CV is greater than target CV
            direction_flag_list.append(-1)
        
        elif CV - target_CV < -cutoff:
            # calculated CV is lower than target CV
            direction_flag_list.append(1)
        
        else:
            # calculated CV is within the cutoff
            direction_flag_list.append(0)
            rep_list.append(new_env_mut_fit_list.min(axis=0))
            CV_list.append(CV)
            new_env_mut_fit_list_list.append(new_env_mut_fit_list)
            ii += 1
            if len(rep_list) == rep:
                break
        
        if len(direction_flag_list) >= 2 and direction_flag_list[-1]*direction_flag_list[-2] == -1:
            # update step_size according to the state of last two direction flag
            step_size = step_size/2
        
        # update env_variance according to step_size and direction flag
        env_variance += step_size*direction_flag_list[-1]
        if env_variance < 0: env_variance = 0
    
    return np.array(rep_list), CV_list, new_env_mut_fit_list_list

In [None]:
df_empirical  = pd.read_csv('All_mutations_four_env_SNF6_two_replicates-fitness.csv')

In [None]:
df_empirical

In [None]:
syn_fit_list = non_fit_list = df_empirical['YPD-fitness'].to_numpy().flatten()
syn_fit_list.sort()
non_fit_list.sort()

In [None]:
#### parameters need to tune ####

replication = 10 #10
N_env_list = list(range(1,10))+list(range(10,40,2))+list(range(40,100,10))+list(range(100,201,100)) # number of environment
target_CV_list = [[0.008,0.006],[0.008,0.005],[0.008,0.004],[0.008,0.003]] # target CV
cutoff_list = [0.98,0.99,0.995] # fitness cutoff for purging mutations
res_dict = {'N_env':[],'target_non_CV':[],'target_syn_CV':[],'rep':[],'cutoff':[],'dN':[],'dS':[]}

##################################

for target_CV in target_CV_list:
    print(target_CV)
    if target_CV[1] in res_dict['target_syn_CV']:
        continue
    for N_env in N_env_list:
        #print(N_env)
        if N_env == 1:
            non_mean_list = np.array([non_fit_list]*replication)
            syn_mean_list = np.array([syn_fit_list]*replication)
        else:
            # get_env_given_CV will do a automatic search to fit the calculated CV to the target CV,
            # and return a list of across-environments-averaged fitness distribution with preset replication.
            non_mean_list, CV_list, non_env_mut_fit_list_list = get_env_given_CV(
                non_fit_list,
                N_env,target_CV[0],
                cutoff = 0.0001,
                rep=replication
            )
            syn_mean_list, CV_list, syn_env_mut_fit_list_list = get_env_given_CV(
                syn_fit_list,
                N_env,target_CV[1],
                cutoff = 0.0001,
                rep=replication
            )

        for cutoff in cutoff_list:
            dN_list = np.sum(non_mean_list>cutoff,axis=1)/len(non_fit_list)
            dS_list = np.sum(syn_mean_list>cutoff,axis=1)/len(syn_fit_list)
            res_dict['N_env'] += [N_env]*replication
            res_dict['target_non_CV'] += [target_CV[0]]*replication
            res_dict['target_syn_CV'] += [target_CV[1]]*replication
            res_dict['rep'] += list(range(replication))
            res_dict['cutoff'] += [cutoff]*replication
            res_dict['dN'] += list(dN_list)
            res_dict['dS'] += list(dS_list)
            

res_df = pd.DataFrame(res_dict)
res_df['dNdS'] = res_df.dN/res_df.dS

In [None]:
plot_dict = {'N_env':[],'target_non_CV':[],'target_syn_CV':[],'cutoff':[],'dNdS_mean':[],'dNdS_std':[]}
for target_CV in target_CV_list:
    for N_env in N_env_list:
        for cutoff in cutoff_list:
            idx = (res_df.target_syn_CV == target_CV[1]) & (res_df.cutoff == cutoff)  & (res_df.N_env == N_env)
            dN_list = res_df[idx].dN
            dS_list = res_df[idx].dS
            dNdS_mean = (dN_list/dS_list).mean()
            dNdS_std = (dN_list/dS_list).std()
            plot_dict['N_env'].append(N_env)
            plot_dict['target_non_CV'].append(target_CV[0])
            plot_dict['target_syn_CV'].append(target_CV[1])
            plot_dict['cutoff'].append(cutoff)
            plot_dict['dNdS_mean'].append(dNdS_mean)
            plot_dict['dNdS_std'].append(dNdS_std)

In [None]:
plot_df = pd.DataFrame(plot_dict)

In [None]:
target_CV_list_plot=[[0.008, 0.003],
                     [0.008, 0.004],
                     [0.008, 0.005],
                     [0.008, 0.006]]
cutoff_list_plot = [0.98,0.99]

In [None]:
cmap = ['r','b','g']
fig,axes = plt.subplots(1,4,figsize=[22,5])
for j,target_CV in enumerate(target_CV_list_plot):
    ax = axes[j]
    ax.set_title(f'Nonsynonymous $CV$={target_CV[0]}\n Synonymous $CV$={target_CV[1]}',size=15)
    color=['red', 'green', 'blue']
    for i,cutoff in enumerate(cutoff_list_plot):
        idx = (plot_df.target_syn_CV == target_CV[1]) & (plot_df.cutoff == cutoff)
        x = plot_df[idx].N_env
        y = plot_df[idx].dNdS_mean
        x_plot = np.linspace(1,101,100)
        model_6 = get_natural_cubic_spline_model(x, y, minval=min(x), maxval=max(x), n_knots=20, 
                                                 knots = [1,4,7,10,20,30,40,100])
        y_plot = model_6.predict(x_plot)
        ax.plot(x, y, lw=2, label = f'Fitness cutoff={cutoff}')
        ax.set_ylim(-0.1,1.1)
        ax.set_xlim(-5,200)
        ax.fill_between(plot_df[idx].N_env,
                        plot_df[idx].dNdS_mean-plot_df[idx].dNdS_std*1.96,
                        plot_df[idx].dNdS_mean+plot_df[idx].dNdS_std*1.96, alpha=.3)
        ax.set_xlabel("Number of different environments",size=15)
        ax.set_ylabel("Expected dN/dS",size=15)
        ax.legend()
#plt.savefig('simulation.pdf')