In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, DoubleType, IntegerType
from pyspark.sql.functions import col, struct, explode, when, lit



In [3]:
# load and output directory
# vep_srcdir = 's3://dig-analysis-data/out/varianteffect/effects/part-*'
# outdir = 's3://dig-bio-index/burden/vepbinning'

# development localhost directories
vep_srcdir = '/Users/mduby/Data/Broad/Aggregator/BurdenBinning/20200330/test*'
outdir = '/Users/mduby/Data/Broad/Aggregator/BurdenBinning/20200330/Out'

# print
print("the input directory is: {}".format(vep_srcdir))


the input directory is: /Users/mduby/Data/Broad/Aggregator/BurdenBinning/20200330/test*


In [5]:
# open spark session
spark = SparkSession.builder.appName('bioindex').getOrCreate()


In [39]:
# constants for filters
# there are 3 levels of filters (lof, impact + maf, and combined predictions)
# the 7 bins will combine variantions of these three OR conditions

# general filter
filter_pick = "pick"

# level 1 filter
filter_lof = "lof"

# level 2 filters
filter_polyphen2_hdiv_pred = "polyphen2_hdiv_pred"
filter_polyphen2_hvar_pred = "polyphen2_hvar_pred"
filter_sift_red = "sift_pred"
filter_mutationtaster_pred = "mutationtaster_pred"
filter_lrt_pred = "lrt_pred"
filter_metalr_pred = "metalr_pred"
filter_provean_pred = "provean_pred"
filter_fathmm_pred = "fathmm_pred"
filter_fathmm_mkl_coding_pred = "fathmm-mkl_coding_pred"
filter_eigen_pc_raw_rankscore = "eigen-pc-raw_rankscore"
filter_dann_rankscore = "dann_rankscore"
filter_vest3_rankscore = "vest3_rankscore"
filter_cadd_raw_rankscore = "cadd_raw_rankscore"
filter_metasvm_pred = "metasvm_pred"

# aliases w/o -
filter_fathmm_mkl_coding_pred_alias = "fathmm_mkl_coding_pred"
filter_eigen_pc_raw_rankscore_alias = "eigen_pc_raw_rankscore"

# level 3 filter
filter_impact = "impact"

# column constants
var_id = "varId"
gene_ensemble_id = "geneEnsembleId"
burden_bin_id = "burdenBinId"

# column variables for output
var_id_col = col(var_id)
gene_ensemble_id_col = col(gene_ensemble_id)
burden_bin_id_col = col(burden_bin_id)

# column variables for filters
filter_lof_col = col("lof")
filter_polyphen2_hdiv_pred_col = col("polyphen2_hdiv_pred")
filter_polyphen2_hvar_pred_col = col("polyphen2_hvar_pred")
filter_sift_pred_col = col("sift_pred")
filter_lrt_pred_col = col("lrt_pred")
filter_mutationtaster_pred_col = col("mutationtaster_pred")

In [80]:
# variables for filters conditions
condition_lof_hc = col('lof') == 'HC'
condition_impact_moderate = col('impact') == 'MODERATE'
condition_impact_high = col('impact') == 'HIGH'

# level 2 condition for bin 7
condition_level2_bin7 = (filter_polyphen2_hdiv_pred_col != 'D') & \
    (filter_polyphen2_hvar_pred_col != 'D') & \
    (filter_sift_pred_col != 'deleterious') & \
    (filter_lrt_pred_col != 'D') & \
    (~filter_mutationtaster_pred_col.isin(['A', 'D']))

# level 2 exclusion condition for bin 6
condition_level2_inclusion_bin6 = (filter_polyphen2_hdiv_pred_col == 'D') | \
    (filter_polyphen2_hvar_pred_col == 'D') | \
    (filter_sift_pred_col == 'deleterious') | \
    (filter_lrt_pred_col == 'D') | \
    (filter_mutationtaster_pred_col.isin(['A', 'D']))

# level 2 exclusion condition for bin 5
condition_level2_inclusion_bin5 = (filter_polyphen2_hdiv_pred_col == 'D') & \
    (filter_polyphen2_hvar_pred_col == 'D') & \
    (filter_sift_pred_col == 'deleterious') & \
    (filter_lrt_pred_col == 'D') & \
    (filter_mutationtaster_pred_col.isin(['A', 'D']))




In [54]:
# load the json data
vep = spark.read.json(vep_srcdir)

# print
print("the loaded vep data count is: {}".format(vep.count()))
format(vep.show())


the loaded vep data count is: 329016
+---------------+-------------+-------------+--------------------+---------+-----------------+--------------------+-----------------------+-----------------------+-------------------------------+---------------+---------+------+-----------------------+
|_corrupt_record|allele_string|assembly_name|  colocated_variants|      end|               id|               input|intergenic_consequences|most_severe_consequence|regulatory_feature_consequences|seq_region_name|    start|strand|transcript_consequences|
+---------------+-------------+-------------+--------------------+---------+-----------------+--------------------+-----------------------+-----------------------+-------------------------------+---------------+---------+------+-----------------------+
|           null|          A/G|       GRCh37|[[,,,,,,, A/G/T,,...|100011334| 10:100011334:A:G|10	100011334	1000...|                   null|       missense_variant|                           null|         

'None'

In [56]:
# create new data frame with only var id
transcript_consequences = vep.select(vep.id, vep.transcript_consequences) \
    .withColumn('cqs', explode(col('transcript_consequences'))) \
    .select(
        col('id').alias('varId'),
        col('cqs.gene_id').alias('geneEnsembleId'),
        col('cqs.' + filter_lof).alias(filter_lof),
        col('cqs.' + filter_impact).alias(filter_impact),

        col('cqs.' + filter_polyphen2_hdiv_pred).alias(filter_polyphen2_hdiv_pred),
        col('cqs.' + filter_polyphen2_hvar_pred).alias(filter_polyphen2_hvar_pred),
        col('cqs.' + filter_sift_red).alias(filter_sift_red),
        col('cqs.' + filter_mutationtaster_pred).alias(filter_mutationtaster_pred),
        col('cqs.' + filter_lrt_pred).alias(filter_lrt_pred),
        col('cqs.' + filter_metalr_pred).alias(filter_metalr_pred),

        col('cqs.' + filter_provean_pred).alias(filter_provean_pred),
        col('cqs.' + filter_fathmm_pred).alias(filter_fathmm_pred),
        col('cqs.' + filter_fathmm_mkl_coding_pred).alias(filter_fathmm_mkl_coding_pred_alias),
        col('cqs.' + filter_eigen_pc_raw_rankscore).alias(filter_eigen_pc_raw_rankscore_alias),
        col('cqs.' + filter_dann_rankscore).alias(filter_dann_rankscore),
        col('cqs.' + filter_vest3_rankscore).alias(filter_vest3_rankscore),
        col('cqs.' + filter_cadd_raw_rankscore).alias(filter_cadd_raw_rankscore),
        col('cqs.' + filter_metasvm_pred).alias(filter_metasvm_pred)
    )


# print
print("the filtered test data count is: {}".format(transcript_consequences.count()))
# transcript_consequences.show()



the filtered test data count is: 220431


In [66]:
# get the lof level 1 data frame
dataframe_lof = transcript_consequences.filter(condition_lof_hc).select(var_id_col, gene_ensemble_id_col)

# print
print("the lof data frame count is: {}".format(dataframe_lof.count()))
# dataframe_lof.show()

the lof data frame count is: 1201


In [67]:
# get the level 3 dataframe
dataframe_impact_moderate = transcript_consequences.filter(condition_impact_moderate).select(var_id_col, gene_ensemble_id_col)
dataframe_impact_high = transcript_consequences.filter(condition_impact_high).select(var_id_col, gene_ensemble_id_col)

# print
print("the moderate impact dataframe is {}".format(dataframe_impact_moderate.count()))
print("the high impact dataframe is {}".format(dataframe_impact_high.count()))


the moderate impact dataframe is 17404
the high impact dataframe is 1433


In [69]:
# get the initial level 2 dataframe
dataframe_level2 = transcript_consequences.filter(condition_level2).select(var_id_col, gene_ensemble_id_col)

print("level 2 data frame count: {}".format(dataframe_level2.count()))
# dataframe_level2.show()


level 2 data frame count: 5610


In [70]:
# BIN 1 of 7
# create the final_1 df, just lof = HC
final_bin1_data_frame = dataframe_lof.withColumn(burden_bin_id, lit('bin1_7')).select(var_id_col, gene_ensemble_id_col, burden_bin_id_col)

# print
print("the final bin# final_bin1_data_framermat(final_bin1_data_frame.count()))
# final_bin1_data_frame.show()

the final bin dataframe is: 1201


In [73]:
# BIN 7 of 7
# get the initial level 2 dataframe
dataframe_level2 = transcript_consequences.filter(condition_level2_bin7).select(var_id_col, gene_ensemble_id_col)

print("level 2 data frame count: {}".format(dataframe_level2.count()))
print("moderate impact data frame count: {}".format(dataframe_impact_moderate.count()))
print("lof data frame count: {}".format(dataframe_lof.count()))
# dataframe_level2.show()

# create the final_7 df, lof = HC, impact moderate, add in level 2 filters
final_bin7_data_frame = dataframe_lof.union(dataframe_impact_moderate).union(dataframe_level2).distinct()
final_bin7_data_frame = final_bin7_data_frame.withColumn(burden_bin_id, lit('bin7_7'))

# print
print("the final bin 7 dataframe is: {}".format(final_bin7_data_frame.count()))
# final_bin7_data_frame.show()

level 2 data frame count: 5610
moderate impact data frame count: 17404
lof data frame count: 1201
the final bin 7 dataframe is: 18632


In [81]:
# BIN 6 of 7
# get the exclusion level 2 data frame
dataframe_level2_exclusion = transcript_consequences.filter(~condition_level2_inclusion_bin5).select(var_id_col, gene_ensemble_id_col)
dataframe_level2_inclusion = transcript_consequences.filter(condition_level2_inclusion_bin6).select(var_id_col, gene_ensemble_id_col)

print("level 2 exclusion data frame count: {}".format(dataframe_level2_exclusion.count()))
print("level 2 inclusion data frame count: {}".format(dataframe_level2_inclusion.count()))
print("moderate impact data frame count: {}".format(dataframe_impact_moderate.count()))
print("lof data frame count: {}".format(dataframe_lof.count()))
# dataframe_level2.show()

# create the final_6 df, lof = HC, impact moderate, add in level 2 filters
final_bin6_data_frame = dataframe_level2_exclusion.union(dataframe_level2_inclusion) \
    .union(dataframe_lof).union(dataframe_impact_moderate) \
    .union(dataframe_level2_inclusion) \
    .distinct()
final_bin6_data_frame = final_bin6_data_frame.withColumn(burden_bin_id, lit('bin6_7'))

# print
print("the final bin 6 dataframe is: {}".format(final_bin6_data_frame.count()))
# final_bin6_data_frame.show()


level 2 exclusion data frame count: 17105
level 2 inclusion data frame count: 9564
moderate impact data frame count: 17404
lof data frame count: 1201
the final bin 6 dataframe is: 18726


In [83]:
# BIN 5 of 7
# already have the inclusion level 2 data frame 
dataframe_level2_inclusion_bin5 = transcript_consequences.filter(condition_level2_inclusion_bin5).select(var_id_col, gene_ensemble_id_col)

print("level 2 inclusion data frame count: {}".format(dataframe_level2_inclusion_bin5.count()))
print("high impact data frame count: {}".format(dataframe_impact_high.count()))
print("lof data frame count: {}".format(dataframe_lof.count()))
# dataframe_level2.show()

# create the final_5 df, lof = HC, impact moderate, add in level 2 filters
final_bin5_data_frame = dataframe_lof.union(dataframe_level2_inclusion_bin5).union(dataframe_impact_high).distinct()
final_bin5_data_frame = final_bin5_data_frame.withColumn(burden_bin_id, lit('bin5_7'))

# print
print("the final bin 5 dataframe is: {}".format(final_bin5_data_frame.count()))
# final_bin5_data_frame.show()

level 2 inclusion data frame count: 0
high impact data frame count: 1433
lof data frame count: 1201
the final bin 5 dataframe is: 1433


In [163]:
# BIN 4 of 7
# already have the inclusion level 2 data frame (exclusion from the previous bin 6 of 7)

print("level 2 inclusion data frame count: {}".format(dataframe_level2_exclusion.count()))
print("lof data frame count: {}".format(dataframe_lof.count()))
# dataframe_level2.show()

# create the final_4 df, lof = HC, impact moderate, add in level 2 filters
final_bin4_data_frame = dataframe_lof.union(dataframe_level2_exclusion).distinct()
final_bin4_data_frame = final_bin4_data_frame.withColumn(burden_bin_id, lit('bin4_7'))

# print
print("the final bin 4 dataframe is: {}".format(final_bin4_data_frame.count()))
# final_bin7_data_frame.show()

level 2 inclusion data frame count: 0
lof data frame count: 1201
the final bin 4 dataframe is: 1201


In [169]:
# BIN 3 of 7
# bin consists of bin4 level 2 filter with some added on filters
dataframe_bin3_level2_inclusion = dataframe_level2_exclusion \
    .filter(dataframe_level2_exclusion.metalr_pred == 'D') \
    .filter(dataframe_level2_exclusion.metasvm_pred == 'D') \
    .filter(dataframe_level2_exclusion.provean_pred == 'D') \
    .filter(dataframe_level2_exclusion.fathmm_mkl_coding_pred == 'D') \
    .filter(dataframe_level2_exclusion.fathmm_pred == 'D')

print("bin 3 level 2 inclusion data frame count: {}".format(dataframe_bin3_level2_inclusion.count()))
print("lof data frame count: {}".format(dataframe_lof.count()))
# dataframe_level2.show()

# create the final_3 df, lof = HC, add in level 2 filters
final_bin3_data_frame = dataframe_lof.union(dataframe_bin3_level2_inclusion).distinct()
final_bin3_data_frame = final_bin3_data_frame.withColumn(burden_bin_id, lit('bin3_7'))

# print
print("the final bin 3 dataframe is: {}".format(final_bin3_data_frame.count()))
# final_bin7_data_frame.show()

bin 3 level 2 inclusion data frame count: 0
lof data frame count: 1201
the final bin 3 dataframe is: 1201


In [168]:
dataframe_bin3_level2_inclusion.show()

+-----+--------------+---+----+------+-------------------+-------------------+---------+-------------------+--------+-----------+------------+-----------+----------------------+----------------------+--------------+---------------+------------------+------------+
|varId|geneEnsembleId|lof|pick|impact|polyphen2_hdiv_pred|polyphen2_hvar_pred|sift_pred|mutationtaster_pred|lrt_pred|metalr_pred|provean_pred|fathmm_pred|fathmm_mkl_coding_pred|eigen_pc_raw_rankscore|dann_rankscore|vest3_rankscore|cadd_raw_rankscore|metasvm_pred|
+-----+--------------+---+----+------+-------------------+-------------------+---------+-------------------+--------+-----------+------------+-----------+----------------------+----------------------+--------------+---------------+------------------+------------+
+-----+--------------+---+----+------+-------------------+-------------------+---------+-------------------+--------+-----------+------------+-----------+----------------------+----------------------+--------

In [170]:
# BIN 2 of 7
# bin consists of bin3 level 2 filter with some more added on filters
dataframe_bin2_level2_inclusion = dataframe_bin3_level2_inclusion \
    .filter(dataframe_level2_exclusion.eigen_pc_raw_rankscore > 0.9) \
    .filter(dataframe_level2_exclusion.dann_rankscore > 0.9) \
    .filter(dataframe_level2_exclusion.cadd_raw_rankscore > 0.9) \
    .filter(dataframe_level2_exclusion.vest3_rankscore > 0.9)

print("bin 2 level 2 inclusion data frame count: {}".format(dataframe_bin2_level2_inclusion.count()))
print("lof data frame count: {}".format(dataframe_lof.count()))
# dataframe_level2.show()

# create the final_2 df, lof = HC, add in level 2 filters
final_bin2_data_frame = dataframe_lof.union(dataframe_bin2_level2_inclusion).distinct()
final_bin2_data_frame = final_bin2_data_frame.withColumn(burden_bin_id, lit('bin2_7'))

# print
print("the final bin 3 dataframe is: {}".format(final_bin3_data_frame.count()))
# final_bin7_data_frame.show()

bin 2 level 2 inclusion data frame count: 0
lof data frame count: 1201
the final bin 3 dataframe is: 1201


In [175]:
# combine all the bins into one dataframe
output_data_frame = final_bin1_data_frame \
    .union(final_bin2_data_frame) \
    .union(final_bin3_data_frame) \
    .union(final_bin4_data_frame) \
    .union(final_bin5_data_frame) \
    .union(final_bin6_data_frame) \
    .union(final_bin7_data_frame).distinct()
    # .distinct() \
    # .orderBy(var_id, gene_ensemble_id, burden_bin_id)

# print
print("the final agregated bin dataframe is: {}".format(output_data_frame.count()))


the final agregated bin dataframe is: 43523


In [179]:
# only select the relevant columns
output_data_frame = output_data_frame.select(col(var_id), col(gene_ensemble_id), col(burden_bin_id))

# print
print("the final agregated bin with selected columns dataframe is: {}".format(output_data_frame.count()))

the final agregated bin dataframe is: 43523


In [180]:
# save out the output data frame to file
output_data_frame \
    .orderBy(var_id, gene_ensemble_id, burden_bin_id) \
    .write \
    .mode('overwrite') \
    .json('%s' % outdir)

# print
print("Printed out {} records to bioindex".format(output_data_frame.count()))


Printed out 43523 records to bioindex


In [None]:
# done
spark.stop()


In [None]:
# filter_polyphen2_hdiv_pred = "polyphen2_hdiv_pred"
# filter_polyphen2_hvar_pred = "polyphen2_hvar_pred"
# filter_sift_red = "sift_pred"
# filter_mutationtaster_pred = "mutationtaster_pred"
# filter_lrt_pred = "lrt_pred"
# filter_metalr_pred = "metalr_pred"
# filter_provean_pred = "provean_pred"
# filter_fathmm_pred = "fathmm_pred"
# filter_fathmm_mkl_coding_pred = "fathmm_mkl_coding_pred"
# filter_eigen_pc_raw_rankscore = "eigen_pc_raw_rankscore"
# filter_dann_rankscore = "dann_rankscore"
# filter_vest3_rankscore = "vest3_rankscore"
# filter_cadd_raw_rankscore = "cadd_raw_rankscore"
# filter_metasvm_pred = "metasvm_pred"

# aliases w/o -
# filter_fathmm_mkl_coding_pred_alias = "fathmm_mkl_coding_pred"
# filter_eigen_pc_raw_rankscore_alias = "eigen_pc-raw_rankscore"
