# Creating Annotated Table of Global Distinct Variants

All tables containing variants in Impala will be used to create a global table of distinct variants. These variants will be given the following annotations: 

    - SnpEff coding consequence predictions  
    - DANN scores  
    - Ensembl gene annotations  
    - dbSNP rsid  
    - Clinvar clinical significance rating  
    - Kaviar allele frequency  
    
To be added when available:   
    - PFAM  
    - CADD  

### Script Requirements

- Ibis: install Ibis from git master https://github.com/cloudera/ibis
- Impyla: install impyla from git master https://github.com/cloudera/impyla
- SnpEff: http://snpeff.sourceforge.net/
- Reference (downloaded automatically) GRCh37.74 to match vcf files

### Connect to Impala with Ibis

hdfs: update 'user' argument. 

In [65]:
import ibis
import os

# Open the HDFS and Impala connections used by the rest of the notebook.
# NOTE(review): the environment variable looked up for the port is named
# after the host ('glados20'); presumably a dedicated port variable was
# intended - confirm. When it is unset the default 50070 is used.
hdfs_port = os.environ.get('glados20', 50070)
hdfs = ibis.hdfs_connect(
    host='glados20',
    port=hdfs_port,
    user='selasady',
)
con = ibis.impala.connect(
    host='glados19',
    port=21050,
    timeout=120,
)

# switch ibis into interactive mode
ibis.options.interactive = True

## Add All Variants and Annotations from Reference Tables

All reference sources containing variants were outer joined to compile a list of all variants found on impala. 

TODO: Variants from CGI and Illumina will be added after normalized files are available. 

### Create connections to each needed table

In [28]:
# Reference database that holds the source variant tables.
db = 'p7_ref_grch37'

# Bind one table expression per variant source, in the order
# Kaviar, ClinVar, dbSNP.
kaviar, clinvar, dbsnp = (
    con.table(source_name, database=db)
    for source_name in ('kaviar_isb', 'clinvar', 'dbsnp')
)

### Create Views with Distinct Subsets of each table

In [29]:
# Columns kept from each source table.
kav_cols = ('chrom', 'pos', 'stop', 'ref', 'alt', 'allele_freq', 'sources')
clin_cols = ('chrom', 'pos', 'ref', 'alt', 'clin_sig', 'clin_dbn')
dbsnp_cols = ('chrom', 'pos', 'ref', 'alt', 'rs_id', 'dbsnpbuildid', 'vc')

# Project each table onto its columns and keep only the distinct rows.
kav_sub = kaviar[kav_cols].distinct()
clin_sub = clinvar[clin_cols].distinct()
dbsnp_sub = dbsnp[dbsnp_cols].distinct()

In [30]:
# Publish each distinct subset as a view in the 'training' database.
for view_name, view_expr in (
        ('kav_distinct', kav_sub),
        ('clin_distinct', clin_sub),
        ('dbsnp_distinct', dbsnp_sub)):
    con.create_view(view_name, view_expr, database='training')

In [31]:
# Re-open the three views as table expressions.
# Note that `dbsnp` is rebound here from the raw reference table to the view.
kav, clin, dbsnp = (
    con.table(view_name, database='training')
    for view_name in ('kav_distinct', 'clin_distinct', 'dbsnp_distinct')
)

### Outer Join between Kaviar_ISB and ClinVar

 The Kaviar_ISB file and ClinVar files were joined, keeping all variants found in both tables as well as the Clinvar significance rating and phenotype name. 
 
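For SNVs, Kaviar and ClinVar were matched by chromosome and exact position, as well as reference and alt allele. For indels, the tables were matched where ClinVar variant position was between the Kaviar reported start and stop position. (Note: the indel cell below currently joins on exact chromosome, position, ref and alt, i.e. the same keys as the SNV cell.)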

In [18]:
# Join ClinVar and Kaviar for variants whose alt allele is a single base.
kav_snp = kav[kav.alt.length() == 1]

# A Kaviar row and a ClinVar row match only when chromosome, position,
# ref allele and alt allele all agree.
clin_expr = [
    kav_snp.chrom == clin.chrom,
    kav_snp.pos == clin.pos,
    kav_snp.ref == clin.ref,
    kav_snp.alt == clin.alt,
]

# In an outer join either side of a key can be NULL, so each shared column
# takes the Kaviar value first and falls back to the ClinVar value.
kav_chrom_col = ibis.coalesce(kav_snp.chrom, clin.chrom).name('chrom')
kav_pos_col = ibis.coalesce(kav_snp.pos, clin.pos).cast('int32').name('pos')
kav_ref_col = ibis.coalesce(kav_snp.ref, clin.ref).name('ref')
# The fallback for 'alt' must be ClinVar's alt allele; falling back to
# clin.ref would write the reference base into 'alt' for ClinVar-only rows.
kav_alt_col = ibis.coalesce(kav_snp.alt, clin.alt).name('alt')

# Outer join ClinVar with Kaviar, keeping the ClinVar significance and
# phenotype columns plus the Kaviar frequency and source columns.
clin_kav_snp = kav_snp.outer_join(clin, clin_expr)[
    kav_chrom_col,
    kav_pos_col,
    kav_ref_col,
    kav_alt_col,
    clin.clin_sig,
    clin.clin_dbn,
    kav_snp.allele_freq.name('kav_freq'),
    kav_snp.sources.name('kav_source'),
].distinct()

# Create the table from the SNP rows.
con.create_table('clin_kav_distinct', clin_kav_snp, database='training')

In [21]:
# Join ClinVar and Kaviar for variants whose alt allele is not a single base.
kav_indel = kav[kav.alt.length() != 1]

# A Kaviar row and a ClinVar row match only when chromosome, position,
# ref allele and alt allele all agree.
clin_expr = [
    kav_indel.chrom == clin.chrom,
    kav_indel.pos == clin.pos,
    kav_indel.ref == clin.ref,
    kav_indel.alt == clin.alt,
]

# In an outer join either side of a key can be NULL, so each shared column
# takes the Kaviar value first and falls back to the ClinVar value.
indel_chrom_col = ibis.coalesce(kav_indel.chrom, clin.chrom).name('chrom')
indel_pos_col = ibis.coalesce(kav_indel.pos, clin.pos).cast('int32').name('pos')
indel_pos_ref = ibis.coalesce(kav_indel.ref, clin.ref).name('ref')
# The fallback for 'alt' must be ClinVar's alt allele; falling back to
# clin.ref would write the reference allele into 'alt' for ClinVar-only rows.
indel_pos_alt = ibis.coalesce(kav_indel.alt, clin.alt).name('alt')

# NOTE(review): `clin` is the full ClinVar view, so every ClinVar row with no
# match in kav_indel is emitted by this outer join with NULL Kaviar columns,
# including rows that the SNP cell above also emits - confirm this is intended.
clin_kav_indel = kav_indel.outer_join(clin, clin_expr)[
    indel_chrom_col,
    indel_pos_col,
    indel_pos_ref,
    indel_pos_alt,
    clin.clin_sig,
    clin.clin_dbn,
    kav_indel.allele_freq.name('kav_freq'),
    kav_indel.sources.name('kav_source'),
].distinct()

In [22]:
# Append the indel rows to the existing clin_kav_distinct table.
con.insert(
    'clin_kav_distinct',
    clin_kav_indel,
    database='training',
)

### Join ClinVar and Kaviar with dbSNP to gain rsID

In [34]:
# Open the combined ClinVar/Kaviar table so it can be joined with dbSNP.
clin_kav_distinct = con.table('clin_kav_distinct', database='training')

# Rows match only when chromosome, position, ref allele and alt allele agree.
dbsnp_exprs = [
    clin_kav_distinct.chrom == dbsnp.chrom,
    clin_kav_distinct.pos == dbsnp.pos,
    clin_kav_distinct.ref == dbsnp.ref,
    clin_kav_distinct.alt == dbsnp.alt,
]

# In an outer join either side of a key can be NULL, so each shared column
# takes the ClinVar/Kaviar value first and falls back to the dbSNP value.
clin_kav_chrom = ibis.coalesce(clin_kav_distinct.chrom, dbsnp.chrom).name('chrom')
clin_kav_pos = ibis.coalesce(clin_kav_distinct.pos, dbsnp.pos).cast('int32').name('pos')
clin_kav_ref = ibis.coalesce(clin_kav_distinct.ref, dbsnp.ref).name('ref')
# The fallback for 'alt' must be dbSNP's alt allele; falling back to
# dbsnp.ref would write the reference allele into 'alt' for dbSNP-only rows.
clin_kav_alt = ibis.coalesce(clin_kav_distinct.alt, dbsnp.alt).name('alt')

# Outer join with dbSNP to pick up the rsID, dbSNP build and variant class.
clinkav_dbsnp = clin_kav_distinct.outer_join(dbsnp, dbsnp_exprs)[
    clin_kav_chrom,
    clin_kav_pos,
    clin_kav_ref,
    clin_kav_alt,
    dbsnp.rs_id,
    clin_kav_distinct.clin_sig,
    clin_kav_distinct.clin_dbn,
    clin_kav_distinct.kav_freq,
    clin_kav_distinct.kav_source,
    dbsnp.dbsnpbuildid.name('dbsnp_build'),
    dbsnp.vc.name('var_type'),
]

In [35]:
# De-duplicate the combined variants and store them as the 'all_vars' table
# in the 'training' database.
distinct_vars = clinkav_dbsnp.distinct()
con.create_table('all_vars', distinct_vars, database='training')

## Add Regional Annotations

Region-based annotations will be added to speed up downstream analysis and lookups. 

TODO: Add CADD and PFAM annotations when available. 

In [32]:
# Bind one table expression per regional annotation source, in the order
# DANN, Ensembl genes, dbNSFP.
dann, ensembl, dbnsfp = (
    con.table(source_name, database='p7_ref_grch37')
    for source_name in ('dann', 'ensembl_genes', 'dbnsfp_variant')
)

### Subset tables

In [33]:
# Columns kept from the Ensembl and dbNSFP tables.
ensembl_cols = ('chrom', 'start', 'stop', 'strand', 'gene_name', 'gene_id')
dbnsfp_cols = ('chrom', 'pos', 'ref', 'alt', 'cadd_raw', 'interpro_domain')

# Project each table onto its columns and keep only the distinct rows.
ensembl_sub = ensembl[ensembl_cols].distinct()
dbnsfp_sub = dbnsfp[dbnsfp_cols].distinct()

In [5]:
# Materialise both distinct subsets as tables in the 'training' database.
for table_name, table_expr in (
        ('ens_distinct', ensembl_sub),
        ('dbnsfp_distinct', dbnsfp_sub)):
    con.create_table(table_name, table_expr, database='training')

In [34]:
# Open the two annotation tables and the combined variant table.
ens, dbnsfp_dist, all_vars = (
    con.table(table_name, database='training')
    for table_name in ('ens_distinct', 'dbnsfp_distinct', 'all_vars')
)

### Add Regional Annotations

The following annotations will be broken out by chromosome, as these tables are getting very large. 

In [35]:
# Build a CASE expression that picks the DANN score column matching the
# variant's alt allele (A/T/C/G); any other alt value yields NA.
dann_builder = ibis.case()
for alt_base, score_col in (
        ('A', dann.score_a),
        ('T', dann.score_t),
        ('C', dann.score_c),
        ('G', dann.score_g)):
    dann_builder = dann_builder.when(all_vars.alt == alt_base, score_col)
dann_case = dann_builder.else_(ibis.NA).end()

# Shared key columns: take the all_vars value first, then the DANN value.
dann_chrom = ibis.coalesce(all_vars.chrom, dann.chrom).name('chrom')
dann_pos = ibis.coalesce(all_vars.pos, dann.pos).cast('int32').name('pos')

In [68]:
# Create an empty 'vars_dann' table; the per-chromosome rows of variants with
# DANN score annotations are inserted into it afterwards.
vars_dann_columns = [
    ('chrom', 'string'),
    ('pos', 'int32'),
    ('ref', 'string'),
    ('alt', 'string'),
    ('rs_id', 'string'),
    ('clin_sig', 'string'),
    ('clin_dbn', 'string'),
    ('kav_freq', 'float'),
    ('kav_source', 'string'),
    ('dbsnp_build', 'string'),
    ('var_type', 'string'),
    ('dann_score', 'float'),
]
vars_dann_schema = ibis.schema(vars_dann_columns)

con.create_table('vars_dann', schema=vars_dann_schema, database='training')

In [48]:
# Query the distinct chromosome values present in all_vars.
chrom_expr = all_vars.chrom.distinct()
chroms = chrom_expr.execute()

In [72]:


# Columns written to vars_dann; they do not change between chromosomes.
vars_dann_cols = (
    dann_chrom,
    dann_pos,
    all_vars.ref,
    all_vars.alt,
    all_vars.rs_id,
    all_vars.clin_sig,
    all_vars.clin_dbn,
    all_vars.kav_freq,
    all_vars.kav_source,
    all_vars.dbsnp_build,
    all_vars.var_type,
    dann_case.cast('float').name('dann_score'),
)

# Append the DANN-annotated variants to vars_dann one chromosome at a time.
for chrom_name in chroms:
    # restrict to the current chromosome and match DANN rows on chrom and pos
    dann_exp = [
        all_vars.chrom == chrom_name,
        dann.chrom == all_vars.chrom,
        all_vars.pos == dann.pos,
    ]
    chrom_rows = all_vars.join(dann, dann_exp)[vars_dann_cols].distinct()
    con.insert('vars_dann', chrom_rows, database='training')



In [36]:
# Open the populated vars_dann table and refresh its statistics.
vars_dann = con.table('vars_dann', database='training')
vars_dann.compute_stats()

# A variant is assigned to a gene when it is on the same chromosome and its
# position lies within the gene's [start, stop] interval (bounds included).
ensembl_exp = [
    vars_dann.chrom == ens.chrom,
    vars_dann.pos >= ens.start,
    vars_dann.pos <= ens.stop,
]
# NOTE(review): join() is used without a `how` argument, which presumably
# means an inner join, so variants outside every gene interval would not be
# carried forward - confirm this is intended.
ens_joined = vars_dann.join(ens, ensembl_exp)
vars_dann_ens = ens_joined[vars_dann, ens.gene_name.name('ens_gene')].distinct()

# Store the variants with DANN and Ensembl annotations as a table.
con.create_table('vars_dann_ens', vars_dann_ens, database='training')

In [17]:
# Open the table of variants with DANN and Ensembl annotations and refresh
# its statistics.
add_dbnsfp = con.table('vars_dann_ens', database='training')
add_dbnsfp.compute_stats()

# Add CADD and PFAM (InterPro domain) annotations from the dbNSFP table.
# Rows match only when chromosome, position, ref allele and alt allele agree.
dbnsfp_exp = [
    add_dbnsfp.chrom == dbnsfp_dist.chrom,
    add_dbnsfp.pos == dbnsfp_dist.pos,
    add_dbnsfp.ref == dbnsfp_dist.ref,
    add_dbnsfp.alt == dbnsfp_dist.alt,
]

# Only cadd_raw and interpro_domain are projected: they are the only
# annotation columns kept when the dbnsfp_distinct table is built, so the
# sift/polyphen columns referenced previously do not exist on dbnsfp_dist.
# NOTE(review): join() is used without a `how` argument, which presumably
# means an inner join, so variants without a dbNSFP row would be dropped -
# confirm this is intended.
global_vars = add_dbnsfp.join(dbnsfp_dist, dbnsfp_exp)[
    add_dbnsfp,
    dbnsfp_dist.cadd_raw,
    dbnsfp_dist.interpro_domain,
].distinct()

## Save global variants as impala table

The table is too large to add snpeff annotations without first saving the table to impala. 

In [19]:
# Store the fully annotated variants as the 'global_vars' table.
# global_vars is already de-duplicated where it is built, so a second
# .distinct() here would only add another pass over a very large result.
con.create_table('global_vars', global_vars, database='training')

OperationalError: Operation is in ERROR_STATE

### Clean up temp tables

TODO: 
    - Add normalized variants from illumina and cgi
    - Add PFAM domains
    - Add snpeff annotations