## Replicate QC

This notebook takes a merged sampleset, i.e. all GT arrays from a set that have been merged by the `combine-zarr-callset` pipeline, and computes pairwise distances between the samples.

Each contig is handled separately, and the dimensions of the resulting outputs are: contigs x npairs.

Three arrays are written: one with euclidean distance, one with cityblock distance and one with the number of comparable (i.e. called) sites.

NB: We restrict to biallelic positions in phase 2.

NB: Efficiency could be improved markedly by chunking the genotypes as (X, 1, 2), which would make reading in the relevant data cheaper.
(I'm currently recreating on the cluster)

In [1]:
import zarr
import allel
import pandas as pd

In [2]:
!pip install dask-distance
import dask_distance as dadist
import scipy.spatial.distance as dist
import os

[33mYou are using pip version 19.0, however version 19.0.3 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [3]:
import dask.array as da
import numpy as np

In [4]:
# Name of the sample set being processed; it is interpolated into the name of
# the output file written at the end of the notebook.
sampleset = "AG1000G-UG"

# I wonder if used memory will scale better with chunk width = 1??
# NOTE(review): `chunksize` is not referenced by any cell visible in this
# notebook — confirm whether it is still needed.
chunksize = 30000

In [5]:
# Path of the merged callset zarr store; handed to the GCS mapping that backs
# `calldata`.
storage_path = 'ag1000g-release/observatory/callset.zarr'

In [6]:
def trans_d(block, metric="euclidean"):
    """Pairwise distances between the rows of `block`, returned as a column.

    Thin wrapper around scipy's `pdist` so that it can be used with dask's
    `map_blocks`, which needs a 2D result per block.

    Parameters
    ----------
    block : array-like, 2D
        One observation per row.
    metric : str or callable
        Passed straight through to `pdist`.

    Returns
    -------
    ndarray of shape (n_pairs, 1)
        The condensed distance vector from `pdist` as a single column.
    """
    condensed = dist.pdist(block, metric=metric)
    return condensed.reshape((len(condensed), 1))

In [48]:
def count_nmissing(X1, X2):
    """Count the positions at which both inputs hold a non-negative value.

    Negative entries are treated as missing, so despite the name this is the
    number of *comparable* (non-missing in both) positions, not the number of
    missing ones. Intended for use as a `pdist` metric callable.

    Parameters
    ----------
    X1, X2 : array-like
        Vectors of equal length.

    Returns
    -------
    integer
        Number of positions where `X1 >= 0` and `X2 >= 0`.
    """
    first, second = np.array(X1), np.array(X2)

    # a position is usable only when neither vector is missing there
    both_called = np.logical_and(first >= 0, second >= 0)

    return both_called.sum()

In [7]:
def cib_dist_nmissing(X1, X2):
    """Cityblock (L1) distance over positions that are non-missing in both inputs.

    Negative entries are treated as missing; any position where either vector
    is negative is dropped before the distance is taken. Intended for use as
    a `pdist` metric callable.

    Parameters
    ----------
    X1, X2 : array-like
        Vectors of equal length.

    Returns
    -------
    number
        `scipy.spatial.distance.cityblock` of the retained entries.
    """
    first, second = np.array(X1), np.array(X2)

    # keep only positions where neither vector is missing
    both_called = np.logical_and(first >= 0, second >= 0)

    return dist.cityblock(first[both_called], second[both_called])

In [8]:
# GCS configuration
import gcsfs

# Filesystem handle for Google Cloud Storage, requested with an anonymous
# token and read-only access.
gcs_bucket_fs = gcsfs.GCSFileSystem(
    project='malariagen-jupyterhub', token='anon', access='read_only')

# Key/value mapping over `storage_path` on that filesystem, used as the zarr
# store; `check=False, create=False` are passed so the mapping is built
# without the existence check / creation options enabled.
store = gcsfs.mapping.GCSMap(
    storage_path, gcs=gcs_bucket_fs, check=False, create=False)

In [9]:
# Root zarr group of the merged callset; genotypes are later read from it as
# calldata[contig]["calldata/GT"].
calldata = zarr.Group(store)

In [10]:
# Manifest table; its "sample_name" column supplies the sample list.
# NOTE(review): read with pandas' default CSV settings — confirm the manifest
# is comma-separated with a header row.
df = pd.read_csv("/gcs/observatory/manifest")

In [11]:
# assume this is ok for now. Normally use the manifest
# NOTE(review): only len(samples) is used downstream (to enumerate pairs), so
# this presumes the manifest has exactly one row per sample in the genotype
# arrays — confirm.
samples = df["sample_name"].tolist()

In [12]:
from dask_kubernetes import KubeCluster

In [13]:
# Request a Dask cluster on Kubernetes with 40 workers; the bare expression
# displays the cluster widget.
cluster = KubeCluster(n_workers=40)
cluster

VBox(children=(HTML(value='<h2>KubeCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    .…

In [14]:
from dask.distributed import Client, progress
# Connect a distributed client to the cluster; presumably this makes it the
# scheduler used by the later `.compute()` calls — TODO confirm.
client = Client(cluster)

In [15]:
# Display the client summary (scheduler address, dashboard, worker count).
client

0,1
Client  Scheduler: tcp://10.8.78.8:34327  Dashboard: /user/nicholasharding/proxy/8787/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


In [16]:
# Sanity check: number of samples taken from the manifest.
len(samples)

361

In [17]:
# Reference datasets, both opened read-only: the phase 2 callset (used to pick
# biallelic sites and their alleles) and the table of called sites (POS / REF /
# ALT for the positions in the merged callset).
# mode="r" is given explicitly for the phase 2 callset as well; without it
# zarr's open_group falls back to a read/write mode, which is not wanted for
# reference data that this notebook only reads.
phase2_callset = zarr.open_group(
    "/gcs/phase2/AR1/variation/main/zarr2/ag1000g.phase2.ar1", mode="r")
called_sites = zarr.open_group("/gcs/observatory/ag.allsites.nonN.zarr.zip", mode="r")

In [18]:
# find biallelic sites
def find_phase2_bialleleic_sites(chrom):
    """Select phase 2 sites whose genotype calls never exceed allele index 1.

    A site is kept when the maximum over axes 1 and 2 of the phase 2 genotype
    array is <= 1 (presumably samples and ploidy — TODO confirm). Note that
    this condition also keeps sites where the maximum is 0 or negative.

    Parameters
    ----------
    chrom : str
        Key of the contig group inside `phase2_callset`.

    Returns
    -------
    tuple of dask arrays
        `(POS, ALT, REF)` — in that order — each restricted along axis 0 to
        the selected sites.
    """
    g = allel.GenotypeDaskArray(phase2_callset[chrom]["calldata"]["genotype"])

    # TO DO PASS ONLY

    # materialise the site mask once; it is reused for every variants field
    biallelic = (g.max(axis=[1,2]) <= 1).compute()

    d = {}
    for x in "POS", "REF", "ALT":
        v = phase2_callset[chrom]["variants"][x]
        # `chunks` is the keyword from_zarr documents; the previous
        # `chunksize=` spelling is not a from_zarr parameter
        dav = da.from_zarr(v, chunks=v.chunks)
        d[x] = da.compress(biallelic, dav, axis=0)

    return d["POS"], d["ALT"], d["REF"]

In [19]:
# Contigs to process, in the order that defines the rows of the output arrays.
config = {"pwd_contigs": ["3L", "3R", "2L", "2R", "X"]}
contigs = config["pwd_contigs"]

In [20]:
from itertools import combinations

In [21]:
# Every unordered pair (i, j), i < j, of sample indices. `npairs` sets the
# column count of the output arrays.
# NOTE(review): the order produced by combinations() should match the order
# of scipy pdist's condensed output used in the loop — confirm.
pairs = list(combinations(range(len(samples)), 2))
npairs = len(pairs)

In [49]:
# Result matrices, one row per contig and one column per sample pair; each row
# is filled in by the per-contig loop.
# h     : summed cityblock distance
# denom : summed count of sites called in both samples
h = np.zeros((len(contigs), npairs))
denom = np.zeros((len(contigs), npairs))

In [31]:
# NOTE(review): leftover scratch cell. `count_alts` is only assigned inside the
# per-contig loop in a later cell, so on a fresh kernel (Restart & Run All)
# this cell raises NameError. The loop assigns `ca` and `ratio` itself, so this
# cell looks safe to delete — confirm and remove.
ca = count_alts.T
ratio = ca.shape[0] / ca.chunksize[0] 

In [51]:
# Per-contig pairwise comparison. For each contig this fills row `cix` of `h`
# (cityblock distance summed over sites) and of `denom` (number of sites
# called in both samples), one value per sample pair.
# The per-contig alt-count arrays are also collected in `alt_list`.
alt_list = []
for cix, contig in enumerate(contigs):

    # positions of the called sites, and the phase 2 biallelic POS/ALT/REF
    sites_pos = allel.SortedIndex(called_sites[contig]["variants/POS"])
    bial_pos, bial_alt, bial_ref = find_phase2_bialleleic_sites(contig)
    # used below as the condition for compress, so presumably a boolean mask
    # over `sites_pos` marking the phase 2 biallelic positions — TODO confirm
    loc = sites_pos.locate_keys(bial_pos)

    # phase 2 alleles per site: REF as the first column, followed by ALT
    alleles=da.hstack((bial_ref.reshape((-1, 1)), bial_alt))

    # reduce to biallelic sites all samples still
    print(contig, "compressing")
    gt_a = allel.GenotypeDaskArray(calldata[contig]["calldata/GT"]).compress(loc)

    # map this callset's allele indices onto the phase 2 allele ordering
    print(contig, "remapping")
    mapping = allel.create_allele_mapping(
        ref=np.compress(loc, called_sites[contig]["variants/REF"]),
        alt=np.compress(loc, called_sites[contig]["variants/ALT"]),
        alleles=alleles)

    # alt-allele count per call, with fill=-1; the distance helpers treat
    # negative values as missing
    count_alts = gt_a.map_alleles(mapping).to_n_alt(fill=-1)
    
    alt_list.append(count_alts)

    # transpose and rechunk for scipy dist object
    # After rechunking a single chunk spans all of axis 0, so every block
    # handed to pdist holds all rows; the axis-1 chunk width is divided by
    # `ratio` (axis-0 length / axis-0 chunk size).
    # NOTE(review): int(ca.chunksize[1] / ratio) evaluates to 0 when `ratio`
    # exceeds the axis-1 chunk size — confirm that cannot happen here.
    ca = count_alts.T
    ratio = ca.shape[0] / ca.chunksize[0]
    newchunks = (ca.shape[0], int(ca.chunksize[1] / ratio))
    ca = ca.rechunk(chunks=newchunks)
    nchunks = len(ca.chunks[1])

    # Apply trans_d to every block, once per metric.
    # NOTE(review): trans_d returns an (n_pairs, 1) column per block, while the
    # declared output chunks describe 1 x 1 blocks; the `.sum(axis=1)` below
    # relies on the computed array having one row per pair — confirm dask does
    # not validate the declared chunk shape.
    D = ca.map_blocks(
        trans_d, 
        metric=cib_dist_nmissing,
        chunks=((1,), tuple(np.repeat(1, nchunks))), 
        dtype=float, 
        drop_axis=(0, ), 
        new_axis=(0, ))
    
    X = ca.map_blocks(
        trans_d, 
        metric=count_nmissing,
        chunks=((1,), tuple(np.repeat(1, nchunks))), 
        dtype=float, 
        drop_axis=(0, ), 
        new_axis=(0, ))
    
    # sum the per-block columns to get one total per pair for this contig
    # NOTE(review): D and X are computed in separate calls, so the shared
    # upstream work is presumably executed twice — consider computing them
    # together.
    h[cix] = D.compute().sum(axis=1)
    denom[cix] = X.compute().sum(axis=1)

3L compressing
3L remapping
3R compressing
3R remapping
2L compressing
2L remapping
2R compressing
2R remapping
X compressing
X remapping


In [52]:
# Write both result matrices (contigs x pairs) to a single compressed archive
# named after the sample set, under the keys "cityblock" and "nsites".
output_stem = "replicate-qc-{sset}".format(sset=sampleset)
np.savez_compressed(output_stem, cityblock=h, nsites=denom)