In [1]:
import allel
import pandas 
from datetime import date
import numpy
import dask
import dask.dataframe as dd
import dask.array as da
from pathlib import Path
dask.config.set(**{'array.slicing.split_large_chunks': True})
import sys


def log(*msg):
    """Print the given values to standard output and flush straight away.

    Flushing after every message keeps progress lines visible while a
    long-running cell is still executing.
    """
    print(*msg, file=sys.stdout, flush=True)
    

# NOTE(review): presumably tools.py supplies the names the cells below use but this
# notebook never imports under those names (np, pd, zarr, load_arrays_and_metadata) — confirm.
%run tools.py

### Make a Zarr to VCF function

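This makes use of pandas and a `write_vcf_header()` function, defined below (cf. the function of the same name in scikit-allel), which writes the VCF meta-information lines.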

In [2]:
def write_vcf_header(vcf_file):
    """Write the VCF meta-information (``##``) lines to an open text stream.

    The lines emitted are, in order: the file format (VCFv4.1), today's date
    as YYYYMMDD, a source tag containing the installed scikit-allel version,
    the reference FASTA path, one contig line each for 2R, 3R, 2L, 3L and X,
    and the GT FORMAT declaration. The ``#CHROM`` column-header line is not
    written here.

    Parameters
    ----------
    vcf_file : file-like
        Writable text stream; every line is emitted with ``print``.
    """
    # (contig ID, length) pairs, in the order they appear in the header.
    contig_lengths = (
        ('2R', 61545105),
        ('3R', 53200684),
        ('2L', 49364325),
        ('3L', 41963435),
        ('X', 24393108),
    )

    header_lines = [
        '##fileformat=VCFv4.1',
        # today's date
        '##fileDate=%s' % date.today().strftime('%Y%m%d'),
        # source
        '##source=scikit-allel-%s + write_vcf_gaardian.py' % allel.__version__,
        # reference and contigs
        '##reference=resources/reference/Anopheles-gambiae-PEST_CHROMOSOMES_AgamP4.fa',
    ]
    header_lines.extend(
        f'##contig=<ID={contig},length={length}>' for contig, length in contig_lengths
    )
    header_lines.append('##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">')

    for header_line in header_lines:
        print(header_line, file=vcf_file)


def _chunk_boundaries(n_rows, nchunks):
    """Return increasing, de-duplicated integer row offsets from 0 to n_rows (at most nchunks slices)."""
    edges = np.linspace(0, n_rows, int(nchunks) + 1).round().astype(int)
    # np.unique drops the repeated offsets that rounding produces when
    # n_rows < nchunks, so no empty slices are generated.
    return np.unique(edges).tolist()


def gn_to_pd_to_vcf(vcf_file, sample_set, chrom, nchunks=50, snpfilter = "segregating"):
    """
    Write the genotypes of one chromosome to an uncompressed VCF, chunk by chunk.

    Genotypes, positions and sample metadata come from
    ``load_arrays_and_metadata``; REF/ALT alleles are read from the zarr site
    arrays under ``../resources/snp_genotypes/all/sites/{chrom}/variants`` and
    aligned to the loaded positions. Sites are then filtered, converted to
    pandas DataFrames in row chunks and appended to ``vcf_file``.

    Parameters
    ----------
    vcf_file : str or path-like
        Output path of the (uncompressed) VCF. If ``{vcf_file}.gz`` already
        exists the function prints a notice and returns without doing any work.
        NOTE(review): nothing here creates the .gz file — presumably the VCF
        is compressed by a later step; confirm.
    sample_set : str
        Sample set identifier passed through to ``load_arrays_and_metadata``.
    chrom : str
        Chromosome name; used for the zarr paths and the ``#CHROM`` column.
    nchunks : int, default 50
        Maximum number of row chunks converted and written one after another.
    snpfilter : {"segregating", "biallelic01"}, default "segregating"
        ``"segregating"`` keeps segregating sites and writes all three ALT
        alleles comma-separated; ``"biallelic01"`` keeps sites flagged by
        ``is_biallelic_01()`` and writes only the first ALT allele.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If ``snpfilter`` is not one of the supported values or ``nchunks`` is
        smaller than 1. Both are checked before any data is loaded.
    """
    valid_filters = ("segregating", "biallelic01")
    # Validate up front: an assert after the (slow) array loading would waste
    # the work and is silently skipped under ``python -O``.
    if snpfilter not in valid_filters:
        raise ValueError(f"incorrect snpfilter value {snpfilter!r}; expected one of {valid_filters}")
    if nchunks < 1:
        raise ValueError(f"nchunks must be at least 1, got {nchunks!r}")

    #if file exists ignore and skip
    myfile = Path(f"{vcf_file}.gz")
    if myfile.is_file():
        print(f"File {vcf_file}.gz Exists...")
        return

    log(f"Loading array for {chrom}...")
    geno, pos, metadata = load_arrays_and_metadata("../resources", sample_set, chrom)

    # Locate the loaded positions inside the full site list so that REF/ALT
    # can be subset to exactly the same rows as the genotypes.
    sites_dir = f"../resources/snp_genotypes/all/sites/{chrom}/variants"
    allpos = allel.SortedIndex(zarr.open_array(f"{sites_dir}/POS/")[:])
    ref_alt_filter = allpos.locate_intersection(pos)[0]

    refs = zarr.open_array(f"{sites_dir}/REF/")[:][ref_alt_filter]
    alts = zarr.open_array(f"{sites_dir}/ALT/")[:][ref_alt_filter]

    if snpfilter == "segregating":
        log("Find segregating sites...")
        flt = geno.count_alleles().is_segregating()
    else:
        log("Finding biallelic01 sites...")
        flt = geno.count_alleles().is_biallelic_01()

    geno = geno.compress(flt, axis=0)
    positions = pos[flt]
    refs = refs[flt].astype(str)
    alt_rows = alts[flt].astype(str)
    if snpfilter == "segregating":
        # all three alternate alleles, comma-separated
        alts = [a + "," + b + "," + c for a, b, c in alt_rows]
    else:
        # only the first alternate allele is written for biallelic01 sites
        alts = [a for a, _b, _c in alt_rows]

    log("calculating chunks sizes...")
    n_variants = geno.shape[0]
    if n_variants == 0:
        # Nothing to write; the previous arange-based chunking crashed here.
        log(f"No {snpfilter} sites found for {chrom}; no VCF written.")
        return
    bounds = _chunk_boundaries(n_variants, nchunks)

    for idx, (start, stop) in enumerate(zip(bounds[:-1], bounds[1:])):

        gn = geno[start:stop].compute()

        # Construct SNP info DF (scalars are broadcast to the chunk length)
        vcf_df = pd.DataFrame({'#CHROM': chrom,
                 'POS': positions[start:stop],
                 'ID': '.',
                 'REF': refs[start:stop],
                 'ALT': alts[start:stop],
                 'QUAL': '.',
                 'FILTER': '.',
                 'INFO':'.',
                'FORMAT': 'GT'})

        log(f"Pandas SNP info DataFrame constructed...{idx}")

        # Geno to VCF
        vcf = pd.DataFrame(gn.to_gt().astype(str), columns=metadata['partner_sample_id'])
        log("Concatenating info and genotype dataframes...")
        vcf = pd.concat([vcf_df, vcf], axis=1)

        log(f"Pandas Genotype data constructed...{idx}")

        if idx == 0:
            # (Re)create the file with the meta-information lines; every
            # chunk, including this one, is appended after them.
            with open(f"{vcf_file}", 'w') as vcfheader:
                write_vcf_header(vcfheader)

        log("Writing to .vcf")

        # The column-header line (#CHROM ...) is written with the first chunk only.
        # `lineterminator` is the pandas >= 1.5 spelling; the old
        # `line_terminator` keyword was removed in pandas 2.0.
        vcf.to_csv(vcf_file,
                   sep="\t",
                   index=False,
                   mode='a',
                   header=(idx == 0),
                   lineterminator="\n")

### Write multiallelic (segregating SNPs only) VCF files

In [4]:
# Write one VCF per chromosome listed in `chroms`, keeping segregating sites only.
# Chromosomes whose .vcf.gz already exists are skipped by gn_to_pd_to_vcf.
sample_set = "1244-VO-GH-YAWSON-VMF00149"
chroms = ["2L", "2R", "3L", "3R", "X"]

for chrom in chroms:
    gn_to_pd_to_vcf(
        f"../resources/vcfs/ag3_gaardian_{chrom}.vcf",
        sample_set,
        chrom,
        snpfilter="segregating",  # the function's default, spelled out
    )
    

File ../resources/vcfs/ag3_gaardian_2L.vcf.gz Exists...
File ../resources/vcfs/ag3_gaardian_2R.vcf.gz Exists...
File ../resources/vcfs/ag3_gaardian_3L.vcf.gz Exists...
Loading array for 3R...
Find segregating sites...
calculating chunks sizes...
Pandas SNP info DataFrame constructed...0
Concatenating info and genotype dataframes...
Pandas Genotype data constructed...0
Writing to .vcf
Pandas SNP info DataFrame constructed...1


KeyboardInterrupt: 

### Write biallelic VCF file

In [3]:
# Write one VCF per chromosome listed in `chroms`, keeping biallelic01 sites only.
# Chromosomes whose .biallelic.vcf.gz already exists are skipped by gn_to_pd_to_vcf.
sample_set = "1244-VO-GH-YAWSON-VMF00149"
chroms = ["2L", "2R", "3L", "3R", "X"]

for chrom in chroms:
    gn_to_pd_to_vcf(
        f"../resources/vcfs/ag3_gaardian_{chrom}.biallelic.vcf",
        sample_set,
        chrom,
        snpfilter="biallelic01",
    )

File ../resources/vcfs/ag3_gaardian_2L.biallelic.vcf.gz Exists...
File ../resources/vcfs/ag3_gaardian_2R.biallelic.vcf.gz Exists...
File ../resources/vcfs/ag3_gaardian_3L.biallelic.vcf.gz Exists...
File ../resources/vcfs/ag3_gaardian_3R.biallelic.vcf.gz Exists...
File ../resources/vcfs/ag3_gaardian_X.biallelic.vcf.gz Exists...
