In [1]:
import numpy as np
from scipy.stats import norm, binom
import seaborn as sn
import handedness
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
import matplotlib.pyplot as plt

 


In [2]:
def create_fam_population(pldc, pl,c, size):

    population =[]
    parents = binom.rvs(p=pl,n=2, size=size)

    ### For each family generate children ###

    for i in range(size):
        fam_size = binom.rvs(p=0.36, n=4) + 1
        p_type = parents[i]
        prob = handedness.family_probs(c,pl,pldc, fam_size)[p_type*(1+fam_size):(1+p_type)*(1+fam_size)]
        l = np.random.choice(range(1+fam_size), p=prob)
        population.append(np.array([p_type,fam_size,l]))
       
    return np.array(population)

In [3]:
def generate_fam_data(datasets, type):
    data = []
    measured = []
    
    for dataset in datasets:

        measured.append(np.array([np.sum(dataset[:,2])/np.sum(dataset[:,1]), np.sum(dataset[:,0])/ (2 *len(dataset))]))
    
        if type == 1:
            ds = np.zeros(6) 
            for family in dataset:
                couple_type, fam_size, left_children = family   
                ds[2*couple_type] +=  fam_size - left_children
                ds[2*couple_type + 1] += left_children

                
        if type == 2:
            ds = np.zeros((5,18))
            for family in dataset:
                couple_type, fam_size, left_children = family   
                ds[fam_size-1,6*couple_type + left_children] +=1

            for size in range(5):
                for cnt in range(size+2,6):
                    ds[size, cnt] = np.nan
                    ds[size, 6 + cnt] = np.nan
                    ds[size, 12 + cnt] = np.nan
        data.append(ds)
    return np.array(data).reshape(-1,np.array(data).shape[-1]), np.array(measured)

In [4]:
def create_twins_population(pldc,pl, c, type, size):
    return np.random.choice([0,1,2], size, p= handedness.twins_prob(c,pldc,pl, type))

In [5]:
def generate_twins_data(datasets):
    
    table = []
    measured = []
    for dataset in datasets:
        table.append([len(dataset[dataset==i]) for i in range(3)])
        measured.append(np.sum(dataset)/( 2 * len(dataset)))
    
    return  np.array(table), np.array(measured)

In [6]:
def resample(data, func = lambda x: x):

    new_data = []
    
    for ds in data:
        size = len(ds)
        idxes = np.random.choice(range(size),size=size, replace=True)
        new_ds = ds[idxes]
        new_data.append(new_ds)
        
    return func(new_data)


In [7]:
def simulation(pldc, pl, C):
    np.random.seed()
    
    # Create triplets table:
    triplets_data = [create_fam_population(pldc,pl,C,np.random.randint(100,1200)) for _ in range(12)]

    triplets, triplets_measured = generate_fam_data(triplets_data, 1)



    # Create multi-children table:
    multi_data =  [create_fam_population(pldc,pl,C,np.random.randint(100,1200)) for _ in range(4)]
    multi, multi_measured = generate_fam_data(multi_data, 2)


    # Create MZ table:

    mono_data = [create_twins_population(pldc,pl,C,1,np.random.randint(60,300)) for _ in range(13)]
    mono, mono_measured = generate_twins_data(mono_data)
    
    # Create MZ table:

    dz_data = [create_twins_population(pldc,pl,C,2,np.random.randint(60,300)) for _ in range(13)]
    dz, dz_measured = generate_twins_data(dz_data)
    

    tables = [triplets, multi, mono, dz]
    measured = [triplets_measured, multi_measured, mono_measured, dz_measured]
    type_list =[1,1,2,3]
    rows = [1,5,1,1]
    pldc_mle, pl_mle = handedness.run_model(tables, type_list, rows, measured)

    runs  = 200
    PLDC = np.zeros(runs)
    PL = np.zeros(runs)

    for i in range(runs):

        triplets, triplets_measured =  resample(triplets_data, lambda data: generate_fam_data(data, 1))
        multi, multi_measured =  resample(multi_data, lambda data: generate_fam_data(data, 2))
        mono, mono_measured = resample(mono_data, lambda data: generate_twins_data(data))
        dz, dz_measured = resample(dz_data, lambda data: generate_twins_data(data))

        tables = [triplets, multi, mono, dz]
        measured = [triplets_measured, multi_measured, mono_measured, dz_measured]

        pldc_m, pl_m = handedness.run_model(tables, type_list, rows, measured)

        PL[i] = pl_m
        PLDC[i] = pldc_m 

    np.savetxt(f"res_{pldc}_{pl}_mle{pldc_mle}_{pl_mle}.txt", [PLDC,PL])

    # Check absolute error:
    diff_pl = abs(pl_mle- np.mean(PL))
    diff_pldc = abs(pldc_mle-np.mean(PLDC))
    
    # Check confidence
    PL.sort()
    PLDC.sort()
    conf_pl =[]
    conf_pldc= []

    for conf in [0.99, 0.95, 0.9, 0.8, 0.5]:
        upper_idx = int(runs*((1+conf)/2)-1)
        lower_idx = int(runs*((1-conf)/2))

        if PL[lower_idx] <= pl_mle <= PL[upper_idx]:
            conf_pl.append(1)
        else:
            conf_pl.append(0)

        if PLDC[lower_idx] <= pldc_mle <= PLDC[upper_idx]:
            conf_pldc.append(1)
        else:
            conf_pldc.append(0)





    return conf_pldc, conf_pl, diff_pldc, diff_pl

In [8]:
def coverage():
    returns = 100
    diff_PL =[]
    diff_PLDC= []
    conf_PL = np.zeros(5)
    conf_PLDC = np.zeros(5)

    futures =[]
    f = simulation
    cpus = multiprocessing.cpu_count() - 5 
  
    with ProcessPoolExecutor(cpus) as executor:
        for s_ in range(returns):
            pldc = np.random.random()*0.5
            pl= np.random.random()*0.18+0.02
            kwargs = dict(pldc= pldc ,pl=pl , C=handedness.calc_c(pl, pldc))
            fut = executor.submit(f, **kwargs)
            futures.append(fut)
    
    for i,future in enumerate(as_completed(futures)):
        r1, r2, r3, r4 = future.result()

        conf_PLDC += r1
        conf_PL += r2
        diff_PLDC.append(r3)
        diff_PL.append(r4)
        
    
    
    return conf_PLDC/100, conf_PL/100, diff_PLDC, diff_PL


In [13]:
res = simulation(0.25, 0.0775,0.155)