In [0]:
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.types.FloatType
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.types.BooleanType

def loadCustomTable(tableName:String) = {
    spark.read.parquet(myFile(s"tables/$tableName")).createOrReplaceTempView(tableName)
}
def myFile(f:String) = s"s3a://kf-strides-variant-parquet-prd/notebooks/ad7a1e3b-f732-41c4-be11-f6938f4323e5/$f"

spark.read.option("sep", "\t").option("header", "true")
    .option("nullValue", ".")
    .csv(myFile("hg38_HGMD2022Q3_variant.txt"))
    .withColumnRenamed("#Chr", "chromosome")
    .withColumn("chromosome", regexp_replace($"chromosome", "chr", ""))
    .withColumn("start", $"Start".cast(LongType))
    .withColumn("end", $"End".cast(LongType))
    .withColumnRenamed("Ref", "reference")
    .withColumnRenamed("Alt", "alternate")
    .withColumnRenamed("HGMD2022Q3_ID", "id")
    .withColumnRenamed("HGMD2022Q3_CLASS", "variant_class")
    .withColumnRenamed("HGMD2022Q3_MUT", "mut")
    .withColumnRenamed("HGMD2022Q3_GENE", "symbol")
    .withColumnRenamed("HGMD2022Q3_STRAND", "strand")
    .withColumnRenamed("HGMD2022Q3_DNA", "dna")
    .withColumnRenamed("HGMD2022Q3_PROT", "prot")
    .withColumnRenamed("HGMD2022Q3_DB", "db")
    .withColumnRenamed("HGMD2022Q3_PHEN", "phen")
    .withColumn("rankscore", $"HGMD2022Q3_RANKSCORE".cast(FloatType))
    .withColumn("variant_end", $"HGMD2022Q3_END".cast(LongType))
    .withColumn("svlen", $"HGMD2022Q3_SVLEN".cast(LongType))
    .coalesce(1)
    .write.mode("overwrite")
    .parquet(myFile(s"tables/hg38_HGMD2022Q3_variant"))        

spark.read.options(Map("inferSchema"->"true","sep"->"\t","header"->"true","nullValue"->"."))
    .csv(myFile("gencc-submissions_20220728.txt"))
    .coalesce(1)
    .write.mode("overwrite")
    .parquet(myFile(s"tables/gencc"))   
    
loadCustomTable("hg38_HGMD2022Q3_variant")
loadCustomTable("gencc")

In [1]:
%pyspark
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *
cond = ['chromosome', 'start', 'reference', 'alternate']

# latest hgmd table
table_hgmd_variant = spark.table('hg38_hgmd2022q3_variant').where(F.col('variant_class').contains("DM"))
# Table variants, added a column for max minor allele frequency among gnomAD and TOPMed databases
c_vrt = ['max_gnomad_topmed', 'topmed', 'gnomad_genomes_2_1', 'gnomad_exomes_2_1', 'gnomad_genomes_3_0', 'gnomad_genomes_3_1_1']
table_variant = spark.table('variants') \
                .withColumn('max_gnomad_topmed', F.greatest(F.lit(0), F.col('topmed')['af'], F.col('gnomad_exomes_2_1')['af'], \
                F.col('gnomad_genomes_2_1')['af'], F.col('gnomad_genomes_3_0')['af'], F.col('gnomad_genomes_3_1_1')['af'])) \
                .where(F.col('max_gnomad_topmed') < 0.0001) \
                .select(cond + c_vrt)
table_gencc = spark.table('gencc').select(['disease_original_curie', 'classification_title', 'gene_symbol', 'moi_title']) \
                .withColumnRenamed('gene_symbol', 'symbol') \
                .distinct()

In [2]:
%pyspark
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, collect_list
  
GenCC_rank_data = [('Definitive', 800), ('Strong', 700), ('Moderate', 600), ('Supportive', 500), ('Limited', 400), ('Disputed Evidence', 300), ('Refuted Evidence', 200), ('No Known Disease Relationship', 100)]
GenCC_rank = spark.createDataFrame(GenCC_rank_data, ['classification_title', 'Rank_score'])
table_gencc = table_gencc.join(GenCC_rank, 'classification_title',"left")
windowDept = Window.partitionBy("symbol").orderBy(col('Rank_score').desc())
table_gencc_simple = table_gencc.withColumn("row",row_number().over(windowDept)) \
                                .filter(col("row") == 1).select("classification_title", "symbol")
table_gencc_highest = table_gencc_simple.join(table_gencc, ["classification_title", "symbol"], "left") \
                                .drop('Rank_score').distinct().groupBy(["moi_title", "symbol"]) \
                                .agg(collect_list('disease_original_curie').alias('disease_original_curies'))

In [3]:
%pyspark
t = spark.sql('SELECT study_id, COUNT(*) FROM family_relationships GROUP BY study_id ').withColumnRenamed('count(1)', 'count')
for study_id in t.select('study_id').toPandas()['study_id']:
    table_ocr_pilot = spark.table('occurrences').where((F.col('study_id') == study_id) & (F.col('is_multi_allelic') == 'false')).withColumn('VariantID', F.concat_ws('-', F.col('chromosome'), F.col('start'), F.col('reference'), F.col('alternate'), F.col('family_id')))
    hom_variants_all = table_ocr_pilot.where((F.col('is_proband') == 'true') & (F.col('zygosity') == 'HOM') & (F.col('mother_zygosity') == 'HET') & (F.col('father_zygosity') == 'HET'))
    denovo_variants_all = table_ocr_pilot.where((F.col('is_proband') == 'true') & (F.col('zygosity') != 'WT') & (F.col('mother_zygosity') == 'WT') & (F.col('father_zygosity') == 'WT'))
    hom_variants_hgmd = hom_variants_all.join(table_hgmd_variant, cond).join(table_variant, cond).join(table_gencc_highest, 'symbol',"left")
    denovo_variants_hgmd = denovo_variants_all.join(table_hgmd_variant, cond).join(table_variant, cond).join(table_gencc_highest, 'symbol',"left")
    project_path = "yiran/pnoc008-annovar-annotation/VWB_landscape_paper_output2"
    if hom_variants_hgmd.count() > 0 :
        hom_variants_hgmd.toPandas().to_csv(f'~/cavatica/projects/{project_path}/{study_id}_homo_variants.tsv', sep='\t', index=False, na_rep='-')
    if denovo_variants_hgmd.count() > 0 :
        denovo_variants_hgmd.toPandas().to_csv(f'~/cavatica/projects/{project_path}/{study_id}_denovo_variants.tsv', sep='\t', index=False, na_rep='-')

In [4]:
%sh
ls /home/notebook/cavatica/projects/yiran/pnoc008-annovar-annotation/VWB_landscape_paper_output2/