In [None]:
import copy
import itertools
import os
import pickle
from itertools import combinations

import matplotlib.pyplot as plt
import numpy as np
import numpy.random as nrand
import pandas as pd
from matplotlib.ticker import FormatStrFormatter
from scipy.optimize import curve_fit
from sklearn.linear_model import LinearRegression

In [None]:
# Change parameters to select fitness landscape and metric for analysis.
# These three values are interpolated into the input landscape path and the
# output pickle name, and they decide which index files and which ruggedness
# function are used further down.

N = 10  # number of sites; index files are only loaded for N in (5, 10, 15)
model = 'Polynomial' # one of ['NK','RMF','Polynomial']
metric = 'open_ratio' # one of ['N_max','epi','r_s','open_ratio']

In [None]:
# Load the landscapes for the selected model / N / metric.
# NOTE(review): pickle.load can execute arbitrary code; only open trusted files.
with open(f'../FL_data_3X10/{model}{N}_{metric}_landscape_3X10.pkl', 'rb') as f:
    landscape_dict = pickle.load(f)

# Index files per supported N: (epistasis motifs, pathways, neighbour table).
# The neighbour table only exists for N == 15, so `neighbor_list` stays
# undefined for the other sizes.
_index_files = {
    5: ('../index_file/epi_list_5s_all.pkl',
        '../index_file/pathway_list_5s_all.pkl',
        None),
    10: ('../index_file/epi_list_10s_all.pkl',
         '../index_file/pathway_list_10s_120000.pkl',
         None),
    15: ('../index_file/epi_list_15s_200000.pkl',
         '../index_file/pathway_list_15s_240000.pkl',
         '../index_file/N_max_list_15s_all.pkl'),
}

# For any other N nothing is loaded here (same as before).
if N in _index_files:
    _epi_path, _pathway_path, _neighbor_path = _index_files[N]
    with open(_epi_path, 'rb') as f:
        epi_list = pickle.load(f)
    with open(_pathway_path, 'rb') as f:
        pathway_list = pickle.load(f)
    if _neighbor_path is not None:
        with open(_neighbor_path, 'rb') as f:
            neighbor_list = pickle.load(f)




In [None]:
# Place values 2**(N-1), ..., 2, 1 for reading an N-bit genotype as a binary
# number with the first site as the most significant bit.
Power_key = 2 ** np.arange(N - 1, -1, -1)

In [None]:
def Add_Error(landscape,std):
    """Return a noisy copy of `landscape`; the input array is left untouched.

    One independent draw from Normal(0, std) is added to column ``N`` of every
    row. The draws come from NumPy's global random state.
    """
    noisy = copy.deepcopy(landscape)
    row_count = noisy.shape[0]
    noise = np.random.normal(0, std, row_count)
    noisy[:, N] += noise
    return noisy

In [None]:
def get_N_max(landscape, n_sites=None):
    """Count the local fitness maxima of a binary landscape.

    A genotype is counted when none of its single-site neighbours has a
    strictly higher fitness, so ties count as maxima.

    Parameters
    ----------
    landscape : ndarray
        One row per genotype: the first `n_sites` columns hold the 0/1 sites
        and column `n_sites` holds the fitness. The row at position ``k`` must
        be the genotype whose sites, read as a binary number with the first
        site most significant, equal ``k`` (neighbours are looked up by that
        index).
    n_sites : int, optional
        Number of sites. Defaults to the notebook-level ``N``.

    Returns
    -------
    int
        Number of local maxima.
    """
    if n_sites is None:
        n_sites = N
    landscape = np.asarray(landscape)

    # Binary place value of each site (first site = most significant bit).
    bit_weights = 2 ** np.arange(n_sites - 1, -1, -1)

    # Index of every genotype, then of each of its one-mutation neighbours:
    # flipping site i toggles exactly the bit with weight bit_weights[i].
    genotype_idx = landscape[:, :n_sites].astype(int) @ bit_weights
    neighbor_idx = genotype_idx[:, None] ^ bit_weights[None, :]

    fitness = landscape[:, n_sites]
    neighbor_fitness = landscape[neighbor_idx, n_sites]

    # Same test as the scalar `fit < fit_` check: a genotype is disqualified
    # only by a strictly fitter neighbour.
    has_fitter_neighbor = np.any(fitness[:, None] < neighbor_fitness, axis=1)
    return int(np.count_nonzero(~has_fitter_neighbor))

def get_N_max_15(landscape):
    """Count rows of `neighbor_list` whose first entry attains the row maximum.

    For every row of the module-level `neighbor_list`, the last-column value
    (fitness) of all listed genotypes is compared with that of the genotype in
    column 0; the row is counted when the two are equal.
    Presumably column 0 is the focal genotype and the remaining columns are
    its neighbours -- confirm against the index file.
    """
    group_fitness = landscape[neighbor_list][:, :, -1]
    focal_fitness = landscape[neighbor_list[:, 0]][:, -1]
    best_in_group = np.max(group_fitness, axis=1)
    return np.sum(best_in_group == focal_fitness)

def cal_epi(landscape):
    """Fraction of `epi_list` motifs whose entries 0 and 3 beat entries 1 and 2.

    Each row of the module-level `epi_list` indexes four genotypes. A row is
    counted when the fitness (last column) at positions 0 and 3 is strictly
    greater than the fitness at both positions 1 and 2.
    Presumably this is the reciprocal-sign-epistasis pattern -- confirm
    against how `epi_list` was built.
    """
    motif_fitness = landscape[epi_list][:, :, -1]
    outer = motif_fitness[:, [0, 0, 3, 3]]
    inner = motif_fitness[:, [1, 2, 1, 2]]
    # All four pairwise comparisons must hold for the motif to count.
    all_four_hold = np.sum(outer > inner, axis=1) == 4
    return np.sum(all_four_hold) / len(motif_fitness)

def cal_r_s(landscape):
    """Roughness-to-slope ratio of a landscape.

    An ordinary least-squares model is fitted to the fitness (last column)
    from the first ``N`` columns. Roughness is the root-mean-square residual
    of that fit, slope is the mean absolute fitted coefficient, and the
    returned value is roughness divided by slope.
    """
    sites = landscape[:, :N]
    fitness = landscape[:, -1]
    additive_model = LinearRegression().fit(sites, fitness)
    residuals = fitness - additive_model.predict(sites)
    rms_residual = np.sqrt(np.mean(np.square(residuals)))
    mean_abs_coef = np.mean(np.abs(additive_model.coef_))
    return rms_residual / mean_abs_coef

def cal_open_ratio(landscape, pathways=None):
    """Fraction of pathways that are monotonic in fitness.

    A pathway is counted when every step is non-decreasing in fitness
    (``<=`` holds for all consecutive pairs) or when every step is strictly
    decreasing (``<=`` holds for none).

    The comparison covers all consecutive pairs of the pathway, whatever its
    length. The previous hard-coded ``0:4`` / ``1:5`` slices only agreed with
    the ``shape[1] - 1`` step count for pathways of exactly five genotypes.

    Parameters
    ----------
    landscape : ndarray
        Rows are genotypes; the last column is the fitness.
    pathways : array-like of int, optional
        One row of genotype row-indices per pathway. Defaults to the
        notebook-level ``pathway_list``.

    Returns
    -------
    float
        Number of monotonic pathways divided by the number of pathways.
    """
    if pathways is None:
        pathways = pathway_list
    pathways = np.asarray(pathways)

    pathway_fit = landscape[pathways][:, :, -1]
    n_steps = pathway_fit.shape[1] - 1

    # Number of non-decreasing steps along each pathway (computed once).
    uphill_steps = np.sum(pathway_fit[:, :-1] <= pathway_fit[:, 1:], axis=1)

    # Open in either direction: all steps uphill, or no step uphill.
    is_open = (uphill_steps == n_steps) | (uphill_steps == 0)
    return np.sum(is_open) / len(pathways)


In [None]:
# Select the ruggedness function for the configured metric.
# 'N_max' uses the precomputed-neighbour implementation only for N == 15,
# the only size for which `neighbor_list` is loaded.
_ruggedness_functions = {
    'N_max': get_N_max_15 if N == 15 else get_N_max,
    'r_s': cal_r_s,
    'epi': cal_epi,
    'open_ratio': cal_open_ratio,
}

# Fail loudly on an unknown metric: otherwise `get_ruggedness` would be left
# undefined, or silently keep the function chosen in a previous run of this
# cell while the output file is named after the new metric.
if metric not in _ruggedness_functions:
    raise ValueError(
        f"Unknown metric {metric!r}; expected one of {sorted(_ruggedness_functions)}"
    )

get_ruggedness = _ruggedness_functions[metric]

In [None]:
def get_distribution(landscape,measurement_std,rep):
    """Ruggedness of noisy replicates, singly and averaged in every subset.

    `rep` noisy copies of `landscape` are drawn with `Add_Error`. For each
    subset size k from 1 to `rep`, `get_ruggedness` is evaluated on every
    k-subset of the copies after averaging their fitness column (column
    ``N``); for k == 1 the individual noisy copies are used directly.

    Returns
    -------
    (ground_truth, ruggedness_dict)
        `ground_truth` is `get_ruggedness(landscape)` on the noise-free input;
        `ruggedness_dict` maps each k to the list of values for all k-subsets,
        in `itertools.combinations` order.
    """
    noisy_copies = np.array([Add_Error(landscape, measurement_std) for _ in range(rep)])
    copy_ids = list(range(rep))

    # Scratch landscape whose fitness column is overwritten for each subset.
    averaged = copy.deepcopy(landscape)

    ruggedness_dict = {}
    for subset_size in range(1, rep + 1):
        if subset_size == 1:
            values = [get_ruggedness(noisy_copies[k]) for k in copy_ids]
        else:
            values = []
            for subset in combinations(copy_ids, subset_size):
                averaged[:, N] = np.mean(noisy_copies[list(subset), :, N], axis=0)
                values.append(get_ruggedness(averaged))
        ruggedness_dict[subset_size] = values

    return get_ruggedness(landscape), ruggedness_dict

In [None]:
# Column-oriented result store: one list per field, appended to once per
# (landscape, noise level, repetition, replicate count) run.
res_dict = {
    'ground_truth': [],
    'N_site': [],
    'model': [],
    'replication': [],
    'metric': [],
    'ruggedness': [],
    'measurement_std': [],
    'landscape_rep': [],
    'error_rep': [],
    'raw': [],
}

In [None]:
# Sweep over ruggedness level, landscape replicate, measurement noise,
# noise repetition and number of experimental replicates, and store one
# record per combination in `res_dict`.
# NOTE: the lists in `res_dict` are appended to, so re-running this cell
# without re-creating `res_dict` first duplicates every record.
# NOTE(review): the noise is drawn from NumPy's global random state and no
# seed is set in this notebook, so results differ between runs.
for ruggedness in range(1,4):
    for landscape_rep in range(3):
        landscape = landscape_dict[ruggedness][landscape_rep]
        for measurement_std in [0.1/3*i for i in [1,2,3,4]]:
            for error_rep in range(3):
                for rep in [3,4,5,6]:
                    # Progress line: one field per loop variable
                    # (error_rep used to be printed twice).
                    print(f'{ruggedness}:{landscape_rep}:{measurement_std}:{error_rep}:{rep}')
                    ground_truth,raw = get_distribution(landscape,measurement_std,rep)
                    record = {
                        'ground_truth': ground_truth,
                        'N_site': N,
                        'model': model,
                        'replication': rep,
                        'metric': metric,
                        'ruggedness': ruggedness,
                        'measurement_std': measurement_std,
                        'landscape_rep': landscape_rep,
                        'error_rep': error_rep,
                        'raw': raw,
                    }
                    # Append all fields together so the columns stay aligned.
                    for column, value in record.items():
                        res_dict[column].append(value)

In [None]:
# Persist the raw results. The output directory is created first so that a
# missing `raw_data/` folder cannot make the write fail after the sweep has
# already been computed.
os.makedirs('raw_data', exist_ok=True)
with open(f'raw_data/{model}{N}_{metric}_raw.pkl', 'wb') as f:
    pickle.dump(res_dict,f)