In [57]:
%run imports.ipynb
import csv
import getpass
import cython

In [58]:
from dask_kubernetes import KubeCluster
cluster = KubeCluster(n_workers=30)
cluster

Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.


VBox(children=(HTML(value='<h2>KubeCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    .…

In [59]:
from dask.distributed import Client
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://10.32.1.188:43721  Dashboard: /user/carlo%20mariade%20marco1/proxy/34093/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


In [60]:
callset_pass = callset_biallel

In [61]:
allele_counts= zarr.open('data/phase2_biallel_allele_count.zarr')

In [62]:
outgroup_allele_counts= zarr.open('data/outgroup_alleles_phase2.zarr')

In [92]:
def outgroup_ascertainment(chrom, start, stop, outgroups):
    
    # locate region
    pos = allel.SortedIndex(callset_pass[chrom]['variants/POS'][:])
    locr = pos.locate_range(start, stop)
    
    # ascertain SNPs
    loca = np.zeros(pos.shape, dtype='b1')
    loca[locr] = True
    log('outgroup ascertainment, initial', nnz(loca))
    for s in outgroups:
        ac = allel.AlleleCountsArray(outgroup_allele_counts[chrom][s][:])
        # biallelic and non-missing
        locs = (ac.max_allele() <= 1) & (ac.sum(axis=1) > 0)
        loca &= locs
        log(s, nnz(loca))
        
    return loca
        

In [93]:
def ingroup_ascertainment(chrom, start, stop, segpops):

    # locate region
    pos = allel.SortedIndex(callset_pass[chrom]['variants']['POS'][:])
    locr = pos.locate_range(start, stop)

    # ascertain SNPs
    loca = np.zeros(pos.shape, dtype='b1')
    loca[locr] = True
    log('ingroup ascertainment, initial', nnz(loca))

    
    # require segregating
    for pop in segpops:
        ac = allel.AlleleCountsArray(allele_counts[chrom][pop][:, :2])
        loc_seg = (ac.min(axis=1) > 0)
        loca &= loc_seg
        log('after require segregating in', pop, nnz(loca))
        
    return loca
        

In [94]:
def ld_prune(pos, gn, size, step, threshold=.1, n_iter=1):
    for i in range(n_iter):
        loc_unlinked = allel.locate_unlinked(gn, size=size, step=step, threshold=threshold)
        n = np.count_nonzero(loc_unlinked)
        n_remove = gn.shape[0] - n
        log('iteration', i+1, 'retaining', n, 'removing', n_remove, 'variants')
        gn = gn.compress(loc_unlinked, axis=0)
        pos = pos.compress(loc_unlinked)
    return pos, gn

In [95]:
def to_treemix(acs, fn):
    pops = sorted(acs.keys())
    n_variants = acs[pops[0]].shape[0]
    n_alleles = acs[pops[0]].shape[1]
    assert n_alleles == 2, 'only biallelic variants supported'
    for pop in pops[1:]:
        assert n_variants == acs[pop].shape[0], 'bad number of variants for pop %s' % pop
        assert n_alleles == acs[pop].shape[1], 'bad number of alleles for pop %s' % pop
        
    with open(fn, 'wt', encoding='ascii') as f:
        print(' '.join(pops), file=f)
        for i in range(n_variants):
            print(' '.join([','.join(map(str, acs[pop][i])) for pop in pops]), file=f)


In [96]:
def downsample_and_prune(chrom, start, stop, loc_asc,
                         n=100000, ldp_size=500, ldp_step=250, ldp_threshold=.1, ldp_n_iter=1):

    # all variant positions
    pos = allel.SortedIndex(callset_pass[chrom]['variants/POS'][:])
    posa = pos[loc_asc]

    # randomly downsample
    if n < posa.shape[0]:
        posds = np.random.choice(posa, n, replace=False)
        posds.sort()
        posds = allel.SortedIndex(posds)
    else:
        # skip downsampling
        posds = posa
    locds = pos.locate_keys(posds)    

    # load genotype data
    genotype = allel.GenotypeChunkedArray(callset_pass[chrom]['calldata/GT'])
    gn = genotype.to_n_alt()

    # prune
    posu, gnu = ld_prune(posds, gn, size=ldp_size, step=ldp_step, threshold=ldp_threshold, n_iter=ldp_n_iter)
    locu = pos.locate_keys(posu)
    
    return locu

In [97]:
def run_analysis(rname, chrom, start, stop, outgroups, segpops,
                 n=100000, ldp_size=500, ldp_step=250, ldp_threshold=.1, ldp_n_iter=1):

    # initial ascertainment
    loc_og_asc = outgroup_ascertainment(chrom, start, stop, outgroups=outgroups)
    loc_ig_asc = ingroup_ascertainment(chrom, start, stop, segpops=segpops)
    loc_asc = loc_og_asc & loc_ig_asc
    log('initial ascertainment', nnz(loc_asc))
    
    # downsample and prune
    locu = downsample_and_prune(chrom, start, stop, loc_asc, 
                                n=n, ldp_size=ldp_size, ldp_step=ldp_step, 
                                ldp_threshold=ldp_threshold, ldp_n_iter=ldp_n_iter)
    
    # write allele counts
    acsu = dict()
    for pop in populations:
        acsu[pop] = allele_counts[chrom][pop][:, :2][locu]
    for pop in outgroups:
        acsu[pop] = outgroup_allele_counts[chrom][pop][:, :2][locu]

    outdir = 'data/treemix/seg_%s_og_%s_ldp_%s' % ('_'.join(segpops), '_'.join(outgroups), ldp_n_iter)
    !mkdir -pv {outdir}
    fn = os.path.join(outdir, '%s.allele_counts.txt' % rname)
    to_treemix(acsu, fn)
    !gzip -fv {fn}


In [98]:
def log(*msg):
    print(' '.join(map(str, msg)), file=sys.stdout)
    sys.stdout.flush()
nnz = np.count_nonzero

In [99]:
outgroups = ['arab', 'quad', 'meru', 'mela', 'chri']
segpops = ['BFcol', 'BFgam', 'AOcol', 'GAgam']
n = 200000
ldp_n_iter = 1
region_X_speciation = 'X-speciation', 'X', 15000000, 24000000 
region_X_free = 'X-free', 'X', 1, 14000000 
region_3L_free = '3L-free', '3L', 15000000, 41000000
region_3R_free = '3R-free', '3R', 28200000, 29200000 

In [100]:
list(outgroup_allele_counts['2L'])

['arab', 'chri', 'epir', 'mela', 'meru', 'quad']

In [None]:
import bcolz
rname, chrom, start, stop = region_3R_free
log(rname, chrom, start, stop)
run_analysis(rname, chrom, start, stop, outgroups, segpops, n=n, ldp_n_iter=ldp_n_iter)

3R-free 3R 28200000 29200000
outgroup ascertainment, initial 200873
arab 190848
quad 187297
meru 181770
mela 174043
chri 103089
ingroup ascertainment, initial 200873
after require segregating in BFcol 61068
after require segregating in BFgam 21946
after require segregating in AOcol 11411
after require segregating in GAgam 9458
initial ascertainment 4315


In [None]:
rname, chrom, start, stop = region_3L_free
log(rname, chrom, start, stop)
run_analysis(rname, chrom, start, stop, outgroups, segpops, n=n, ldp_n_iter=ldp_n_iter)

In [None]:
rname, chrom, start, stop = region_X_free
log(rname, chrom, start, stop)
run_analysis(rname, chrom, start, stop, outgroups, segpops, n=n, ldp_n_iter=ldp_n_iter)

In [None]:
rname, chrom, start, stop = region_X_speciation
log(rname, chrom, start, stop)
run_analysis(rname, chrom, start, stop, outgroups, segpops, n=n, ldp_n_iter=ldp_n_iter)