## Variant annotations in hail 

In [None]:
%%capture
! pip install firecloud
from firecloud import fiss
import pandas as pd
pd.set_option('display.max_row', 10000)
import io
import numpy as np
from pprint import pprint

In [None]:
samples_sets = fiss.fapi.get_entities('topmed-shared','topmed-shared', 'sample_set').json()
print 'Sample set name:', samples_sets[0]['name']
print 'Sample set fields:', ', '.join(samples_sets[0]['attributes'].keys())
vcf_files = samples_sets[0]['attributes']['vcf']['items']
print '# of vcf files:', len(vcf_files)

In [1]:
from hail import *
hc = HailContext(log="/topmed.log")
from pprint import pprint

### Import vcf to vds and split any multi allelic sites

In [2]:
vds = hc.import_vcf('gs://dataproc-0a502eb9-92d4-4031-a99d-7b98ab92717b-us/freeze.5b.chr10.phased.pass.minDP0.remDuplicates.vcf.bgz',min_partitions = 1000).split_multi()

### Import wgsa annotations as a key table. This line does the following:
    1: import full csv
    2: make a _variant_ type field
    3: key by the variant
    4: remove unneeded fields
    5: split the vep annotations into an array for each variant (there can be multiple annotations)
    6: expand each variant row to 1 row per vep annotation for ease of filtering

In [3]:
# kt = hc.import_table('freezes_2a_3a_4.snp_indel.annotated.general20170422.subset.gz.chr20.csv',delimiter='\t').annotate("v = chr+':'+pos+':'+ref+':'+alt").annotate('v = Variant(v)').key_by('v').drop(['pos','alt','chr','ref','wgsa_version']).annotate("VEP_ensembl_Consequence = VEP_ensembl_Consequence.split(',')").explode('VEP_ensembl_Consequence')

kt = (hc.import_table('gs://dataproc-0a502eb9-92d4-4031-a99d-7b98ab92717b-us/annotations/freeze.5.chr10.wgsa.noncoding.vep.tsv',delimiter='\t')
      .annotate("chr = 'chr'+CHROM")
      .annotate("v = chr+':'+POS+':'+REF+':'+ALT")
      .annotate('v = Variant(v)')
      .key_by('v')
      .drop(['POS','ALT','chr','REF','CHROM','wgsa_version'])
      .annotate("VEP_ensembl_Consequence = VEP_ensembl_Consequence.split(',')")
      .explode('VEP_ensembl_Consequence')
     )

In [4]:
pprint(kt.take(1))

[{u'CADD_phred': u'4.946',
  u'CADD_raw': u'0.225505',
  u'DANN_score': u'0.50821951740435234',
  u'Eigen_PC_phred': u'4.16501',
  u'Eigen_PC_raw': u'-0.159546417332822',
  u'Eigen_coding_or_noncoding': u'n',
  u'Eigen_phred': u'1.30879',
  u'Eigen_raw': u'-0.303227552104844',
  u'GenoCanyon_score': u'2.78746919547125E-4',
  u'RegulomeDB_score': u'6',
  u'VEP_ensembl_Consequence': u'downstream_gene_variant',
  u'VEP_ensembl_Gene_ID': u'ENSG00000260370',
  u'VEP_ensembl_Transcript_ID': u'ENST00000566940',
  u'fathmm_MKL_non_coding_pred': u'.',
  u'funseq2_noncoding_score': u'0',
  u'funseq_noncoding_score': u'.',
  u'integrated_fitCons_score': u'0.061011',
  u'v': Variant(contig=chr10, start=10023, ref=C, alts=[AltAllele(ref=C, alt=A)])}]


### Annotated vds with key table

In [5]:
vds = vds.annotate_variants_table(kt, root='va.wgsa')

### Load expression data -> gs://dataproc-0a502eb9-92d4-4031-a99d-7b98ab92717b-us/annotations

In [6]:
islet_trans = KeyTable.import_bed("gs://dataproc-0a502eb9-92d4-4031-a99d-7b98ab92717b-us/annotations/t2dreamdb_rnaseq_avgFPKM_greater2_v2.bed")

In [7]:
pprint(islet_trans.take(1))

[{u'interval': Interval(start=Locus(contig=chr1, position=203795655), end=Locus(contig=chr1, position=203855000)),
  u'target': u'ENSG00000058673'}]


In [8]:
islet_trans_pad = KeyTable.import_bed("gs://dataproc-0a502eb9-92d4-4031-a99d-7b98ab92717b-us/annotations/t2dreamdb_rnaseq_avgFPKM_greater2_padding_v2.bed")


### Load the islet data

In [9]:
islet_states = KeyTable.import_bed('gs://dataproc-0a502eb9-92d4-4031-a99d-7b98ab92717b-us/annotations/Islets.chromatinStates.hg38.v2.bed').filter('target == "10_Active_enhancer_2" || target == "9_Active_enhancer_1"')                           




### Load islet tfbs data

In [10]:
islet_tfbs_file = 'gs://dataproc-0a502eb9-92d4-4031-a99d-7b98ab92717b-us/annotations/all_tfbs_chr10.enhancer.hg38.bed'
islet_tfbs = KeyTable.import_bed(islet_tfbs_file)
# islet_tfbs = hc.import_table(islet_tfbs_file,delimiter=',').annotate("i = V1+':'+V2+'-'+V3" ).annotate('i = Interval(i)').key_by('i')


### Find variants that fall within expressed genes or islet states or ptv

In [11]:
vds = (vds.variant_qc()
       .cache()
       .filter_variants_expr('va.qc.AF < 0.01')
       .annotate_variants_table(islet_trans_pad,root='va.gene')
       .annotate_variants_table(islet_states,root='va.chr_state')
       .annotate_variants_table(islet_tfbs,root='va.tfbs')
      )

In [None]:
# vds = vds.filter_variants_expr('va.chr_state == "10_Active_enhancer_2" || va.chr_state == "9_Active_enhancer_1" || isDefined(va.tfbs) ||  va.wgsa.VEP_ensembl_Consequence == "splice_acceptor_variant"  || va.wgsa.VEP_ensembl_Consequence == "splice_donor_variant" || va.wgsa.VEP_ensembl_Consequence== "splice_region_variant" || va.wgsa.VEP_ensembl_Consequence == "stop_gained" || va.wgsa.VEP_ensembl_Consequence == "stop_lost" || va.wgsa.VEP_ensembl_Consequence == "start_gained" || va.wgsa.VEP_ensembl_Consequence == "start_lost" || va.wgsa.VEP_ensembl_Consequence == "frameshift_variant"')

In [12]:
vds_all_in_gene_pad = vds.filter_variants_table(islet_trans_pad)


In [13]:
vds_all_in_gene = vds.filter_variants_table(islet_trans)

In [14]:
# vds_ptv = vds_all_in_gene.filter_variants_expr('va.wgsa.VEP_ensembl_Consequence.forall(tc => tc.toSet.contains("splice_acceptor_variant")) || va.wgsa.VEP_ensembl_Consequence.forall(tc => tc.toSet.contains("splice_donor_variant")) || va.wgsa.VEP_ensembl_Consequence.forall(tc => tc.toSet.contains("splice_region_variant")) || va.wgsa.VEP_ensembl_Consequence.forall(tc => tc.toSet.contains("stop_gained")) || va.wgsa.VEP_ensembl_Consequence.forall(tc => tc.toSet.contains("stop_lost")) || va.wgsa.VEP_ensembl_Consequence.forall(tc => tc.toSet.contains("start_gained")) || va.wgsa.VEP_ensembl_Consequence.forall(tc => tc.toSet.contains("start_lost")) || va.wgsa.VEP_ensembl_Consequence.forall(tc => tc.toSet.contains("frameshift_variant"))')

vds_tfbs = vds_all_in_gene_pad.filter_variants_table(islet_tfbs)
vds_states = vds_all_in_gene_pad.filter_variants_table(islet_states)


In [15]:
vds_ptv = vds_all_in_gene.filter_variants_expr('va.wgsa.VEP_ensembl_Consequence == "splice_acceptor_variant"  || va.wgsa.VEP_ensembl_Consequence == "splice_donor_variant" || va.wgsa.VEP_ensembl_Consequence== "splice_region_variant" || va.wgsa.VEP_ensembl_Consequence == "stop_gained" || va.wgsa.VEP_ensembl_Consequence == "stop_lost" || va.wgsa.VEP_ensembl_Consequence == "start_gained" || va.wgsa.VEP_ensembl_Consequence == "start_lost" || va.wgsa.VEP_ensembl_Consequence == "frameshift_variant"')
vds_ptv.count()

(54035L, 10225L)

In [None]:
vds_tfbs.write('gs://dataproc-0a502eb9-92d4-4031-a99d-7b98ab92717b-us/testing-5b-chr10/chr10_tfbs.vds')

In [None]:
vds_states.write('gs://dataproc-0a502eb9-92d4-4031-a99d-7b98ab92717b-us/testing-5b-chr10/chr10_states.vds')

In [None]:
vds_ptv.write('gs://dataproc-0a502eb9-92d4-4031-a99d-7b98ab92717b-us/testing-5b-chr10/chr10_ptv.vds')


In [None]:
union_vds = hc.read(['gs://dataproc-0a502eb9-92d4-4031-a99d-7b98ab92717b-us/testing-5b-chr10/chr10_tfbs.vds', 'gs://dataproc-0a502eb9-92d4-4031-a99d-7b98ab92717b-us/testing-5b-chr10/chr10_states.vds','gs://dataproc-0a502eb9-92d4-4031-a99d-7b98ab92717b-us/testing-5b-chr10/chr10_ptv.vds'])
union_vds.variant_schema

### 

In [None]:
(union_vds
 .export_variants('gs://dataproc-0a502eb9-92d4-4031-a99d-7b98ab92717b-us/testing-5b-chr10/freeze5b_chr10_rarevar_v1.tsv', 'group_id = va.gene, chromosome = v.locus.contig, variant_id = va.rsid, position=v.locus.position, ref = v.ref, alt=v.alt, VEP_ensembl_Consequence = va.wgsa.VEP_ensembl_Consequence, chromatin_state = va.chr_state, tfbs = va.tfbs, allele_count = va.qc.AC, CADD_phred = va.wgsa.CADD_phred, DANN_score = va.wgsa.DANN_score, Eigen_PC_phred = va.wgsa.Eigen_PC_phred, Eigen_phred=va.wgsa.Eigen_phred, Eigen_coding_or_noncoding = va.wgsa.Eigen_coding_or_noncoding, GenoCanyon_score = va.wgsa.GenoCanyon_score, RegulomeDB_score = va.wgsa.RegulomeDB_score, fathmm_MKL_non_coding_pred = va.wgsa.fathmm_MKL_non_coding_pred, funseq2_noncoding_score = va.wgsa.funseq2_noncoding_score, funseq_noncoding_score = va.wgsa.funseq_noncoding_score, integrated_fitCons_score = va.wgsa.integrated_fitCons_score')
)



In [None]:
vds.query_variants('variants.take(5)')

In [None]:
# %%capture
# ! pip install firecloud
# from firecloud import fiss
# import pandas as pd
# import io


In [None]:
# vds_subset = vds_subset.variant_qc()

In [None]:
# data_model = fiss.fapi.get_entities_tsv("topmed-shared","topmed-shared", "sample")
# data_model_text = pd.read_csv(io.StringIO(data_model.text), sep='\t')[['entity:sample_id','participant','CENTER','study','topmed_project']]
# data_model_text.rename(columns = {'entity:sample_id':'ent_sample_id', 'participant':'sample_id'}, inplace = True)
# data_model_text[['study', 'topmed_project']] = data_model_text[['study', 'topmed_project']].astype(str)
# from pyspark.sql import SQLContext
# sqlctx = SQLContext(hc.sc)
# spark_df = sqlctx.createDataFrame(data_model_text)
# kt = KeyTable.from_dataframe(spark_df,key='sample_id')
# vds = vds_subset.annotate_samples_table(kt, root='sa').sample_qc()
# vds = vds.annotate_samples_expr('sa.nDoubles = gs.filter(g => g.isHet() && va.qc.AC == 2).count()')
# vds = vds.annotate_samples_expr('sa.nTri_to_one = gs.filter(g => g.isHet() && va.qc.AC == 3).count()')
# vds = vds.annotate_samples_expr('sa.nOne = gs.filter(g => g.isHet() && va.qc.AF < 0.01 && va.qc.AF > 0.001).count()')
# vds = vds.annotate_samples_expr('sa.nTen = gs.filter(g => g.isHet() && va.qc.AF < 0.1 && va.qc.AF > 0.01).count()')
# vds = vds.annotate_samples_expr('sa.nTen_above = gs.filter(g => g.isHet() && va.qc.AF > 0.1).count()')
# vds.samples_table().aggregate_by_key(key_expr=['Pop = sa.topmed_project'], agg_expr=['Singletons = sa.map(s => sa.qc.nSingleton).stats().sum',
#                                                                                           'Doubletons = sa.map(s => sa.nDoubles).stats().sum',
#                                                                                           'Tripletons_to_01 = sa.map(sa => sa.nTri_to_one).stats().sum',
#                                                                                           'Zero_1_to_1 = sa.map(sa => sa.nOne).stats().sum']).to_pandas()

In [None]:
# vds.samples_table().aggregate_by_key(key_expr=['Pop = sa.topmed_project'], agg_expr=['Single = sa.map(s => sa.qc.nSingleton).stats()']).to_pandas()