In [17]:
%%capture
from pyspark.sql import SparkSession
import glow
spark = SparkSession \
    .builder \
    .appName("PythonPi") \
    .config("spark.jars.packages", "io.projectglow:glow-spark3_2.12:1.1.2") \
    .config("spark.hadoop.io.compression.codecs", "io.projectglow.sql.util.BGZFCodec") \
    .getOrCreate()
spark = glow.register(spark)
import pandas as pd
pd.set_option('max_columns', None)

In [18]:
from pyspark.sql.functions import col, regexp_replace, split, regexp_extract, input_file_name
from pyspark.sql.types import LongType, FloatType, DoubleType, StringType
from pyspark.sql import DataFrame
import numpy as np
import re

In [None]:
chromosomes = np.arange(1, 23, 1).tolist() + ["X" , "Y"]

for CHROM in chromosomes:
    CHROM = str(CHROM)
    spark.read.options(inferSchema=True,sep="\t",header=True,nullValue=".") \
        .csv("/sbgenomics/project-files/hg38_genome_anno_output/hg38_genome_chr" + CHROM + ".hg38_multianno.txt.gz") \
        .withColumn("start",col("Start").cast(LongType())) \
        .withColumn("end",col("End").cast(LongType())) \
        .withColumnRenamed("Chr", "chromosome") \
        .withColumn('chromosome', regexp_replace('chromosome', 'chr', '')) \
        .withColumnRenamed("Ref", "reference") \
        .withColumnRenamed("Alt", "alternate") \
        .coalesce(1) \
        .write.mode("overwrite") \
        .parquet("/sbgenomics/output-files/human_genome_hg38_Annovar_annotation/chromosome=" + CHROM)

[Stage 14:>                                                         (0 + 1) / 1]

In [17]:
for CHROM in chromosomes:
    CHROM = str(CHROM)
    table_temp_count = spark.read.parquet("/sbgenomics/output-files/human_genome_hg38_Annovar_annotation/chromosome=" + CHROM).count()
    print("chr" + CHROM + ": " + str(table_temp_count))



chr1: 691443044


                                                                                

chr2: 721644720


                                                                                

chr3: 594300433


                                                                                

chr4: 569258001


                                                                                

chr5: 536949579




chr6: 510235570


                                                                                

chr7: 476910409


                                                                                

chr8: 434304408


                                                                                

chr9: 365371662


                                                                                

chr10: 399789030


                                                                                

chr11: 403601226


                                                                                

chr12: 399413460


                                                                                

chr13: 293949387


                                                                                

chr14: 265554303


                                                                                

chr15: 253923975


                                                                                

chr16: 245417833


                                                                                

chr17: 248760660


                                                                                

chr18: 240268815


                                                                                

chr19: 167299191


                                                                                

chr20: 191832771


                                                                                

chr21: 114115725


                                                                                

chr22: 111215960


                                                                                

chrX: 464679107




chrY: 70909065


                                                                                

In [19]:
genome_hg38_Annovar = spark.read.parquet("/sbgenomics/output-files/human_genome_hg38_Annovar_annotation/chromosome=*")

In [21]:
genome_hg38_Annovar.limit(5).show()



+----------+-----+-----+---------+---------+------------+------------+-------------------+------------------+----------------+
|chromosome|start|  end|reference|alternate|Func.refGene|Gene.refGene| GeneDetail.refGene|ExonicFunc.refGene|AAChange.refGene|
+----------+-----+-----+---------+---------+------------+------------+-------------------+------------------+----------------+
|         1|10001|10001|        T|        A|  intergenic|NONE;DDX11L1|dist=NONE;dist=1873|              null|            null|
|         1|10001|10001|        T|        C|  intergenic|NONE;DDX11L1|dist=NONE;dist=1873|              null|            null|
|         1|10001|10001|        T|        G|  intergenic|NONE;DDX11L1|dist=NONE;dist=1873|              null|            null|
|         1|10002|10002|        A|        C|  intergenic|NONE;DDX11L1|dist=NONE;dist=1872|              null|            null|
|         1|10002|10002|        A|        G|  intergenic|NONE;DDX11L1|dist=NONE;dist=1872|              null|  

                                                                                

In [None]:
chromosomes = np.arange(20, 23, 1).tolist() + ["X" , "Y"]

for CHROM in chromosomes:
    CHROM = str(CHROM)
    spark.read.options(inferSchema=True,sep="\t",header=False,nullValue=".") \
        .csv("/sbgenomics/project-files/hg38_CADD16.chr" + CHROM + ".txt") \
        .toDF("chromosome", "start", "end", "reference", "alternate", "CADD_splice") \
        .coalesce(1) \
        .write.mode("overwrite") \
        .parquet("/sbgenomics/output-files/hg38_CADD16/chromosome=" + CHROM)

[Stage 11:>                                                         (0 + 1) / 1]

In [None]:
chromosomes = np.arange(1, 23, 1).tolist() + ["X" , "Y"]
for CHROM in chromosomes:
    CHROM = str(CHROM)
    dbNSFP_variant = spark.read.options(inferSchema=True,sep="\t",header=True,nullValue=".") \
            .csv("/sbgenomics/project-files/dbNSFP4.3a_variant.chr" + CHROM + ".gz")

    cols=[re.sub(r'(^_|_$)','',f.replace("(", "[").replace(" ", "_").replace(")", "]")) for f in dbNSFP_variant.columns]
    dbNSFP_variant.toDF(*cols) \
            .withColumnRenamed("#chr", "chromosome") \
            .withColumn("pos[1-based]", col("pos[1-based]").cast(LongType())) \
            .withColumnRenamed("pos[1-based]", "start") \
            .withColumnRenamed("ref", "reference") \
            .withColumnRenamed("alt", "alternate") \
            .coalesce(1) \
            .write.mode("overwrite") \
            .parquet("/sbgenomics/output-files/dbNSFP_variant/chromosome=" + CHROM)

[Stage 8:>                                                          (0 + 1) / 1]

In [7]:
spark.read.options(inferSchema=True,sep=",",header=True,nullValue=".") \
    .csv("/sbgenomics/project-files/phenotype_to_genes.csv.gz") \
    .coalesce(1) \
    .write.mode("overwrite") \
    .parquet("/sbgenomics/output-files/hpo_phenotype_to_genes")

                                                                                

In [8]:
spark.read.options(inferSchema=True,sep=",",header=True,nullValue=".") \
    .csv("/sbgenomics/project-files/genes_to_phenotype.csv.gz") \
    .coalesce(1) \
    .write.mode("overwrite") \
    .parquet("/sbgenomics/output-files/hpo_genes_to_phenotype")

In [29]:
# Define the path to the directory containing the files
dir_path = "PanelApp"

# Read all TSV files in the directory into a DataFrame
Panel_df = spark.read.format("csv").option("header", "true").option("delimiter", "\t") \
                 .option("inferSchema", "true").option("quote", "") \
                 .option("ignoreLeadingWhiteSpace", "true").option("ignoreTrailingWhiteSpace", "true") \
                 .load(dir_path + "/*.tsv") \
                 .withColumn("file_name", input_file_name()) \
                 .withColumnRenamed("Sources(; separated)", "Sources_semicolon_separated")

# Get a list of current column names
current_cols = Panel_df.columns

# Define a function to replace symbols with underscores
def clean_col_name(col_name):
    return re.sub('[ ,;{}()\n\t=]+', '_', col_name)

# Use list comprehension to create a list of new column names with symbols replaced by underscores
new_cols = [clean_col_name(col_name) for col_name in current_cols]

# Rename the columns using withColumnRenamed()
for i in range(len(current_cols)):
    Panel_df = Panel_df.withColumnRenamed(current_cols[i], new_cols[i])

Panel_df.coalesce(1) \
     .write.mode("overwrite") \
     .parquet("/sbgenomics/output-files/PanelApp")

                                                                                

In [3]:
spark.read.options(inferSchema=True,sep="\t",header=True,nullValue=".") \
    .csv("hg38_HGMD2023Q3_gene_lite.VWB.txt") \
    .withColumnRenamed("Chr", "chromosome") \
    .withColumn('chromosome', regexp_replace('chromosome', 'chr', '')) \
    .withColumn("start", col("Start").cast(LongType())) \
    .withColumn("end", col("End").cast(LongType())) \
    .withColumn("split_c0", split(col("#EntrezGeneID_GeneSymbol"), "_")) \
    .withColumn("entrez_gene_id",col("split_c0").getItem(0)) \
    .withColumn("symbol",col("split_c0").getItem(1)) \
    .withColumn("variant_class",split(col("Phenotypes"), ",")) \
    .drop("Chr", "Phenotypes", "split_c0", "#EntrezGeneID_GeneSymbol") \
    .write.mode("overwrite") \
    .parquet("/sbgenomics/output-files/hg38_HGMD2023Q3_gene_lite")

                                                                                

In [4]:
spark.read.options(inferSchema=True,sep="\t",header=True, nullValue=".") \
    .csv("hg38_HGMD2023Q3_gene_sorted.VWB.txt") \
    .withColumnRenamed("Chr", "chromosome") \
    .withColumn("chromosome", regexp_replace("chromosome", "chr", "")) \
    .withColumn("start", col("Start").cast(LongType())) \
    .withColumn("end", col("End").cast(LongType())) \
    .withColumn("split_c0", split(col("#EntrezGeneID_GeneSymbol"), "_")) \
    .withColumn("entrez_gene_id",col("split_c0").getItem(0)) \
    .withColumn("symbol",col("split_c0").getItem(1)) \
    .withColumn("split_c4",split(col("Phenotypes"), "~")) \
    .withColumn("DM", split(regexp_extract(col("split_c4").getItem(0), "^DM\\[([^\\]]*)?\\]?", 1), ",")) \
    .withColumn("DM?", split(regexp_extract(col("split_c4").getItem(1), "^DM\\?\\[([^\\]]*)?\\]?", 1), ",")) \
    .withColumn("DP", split(regexp_extract(col("split_c4").getItem(2), "^DP\\[([^\\]]*)?\\]?", 1), ",")) \
    .withColumn("DFP", split(regexp_extract(col("split_c4").getItem(3), "^DFP\\[([^\\]]*)?\\]?", 1), ",")) \
    .withColumn("FP", split(regexp_extract(col("split_c4").getItem(4), "^FP\\[([^\\]]*)?\\]?", 1), ",")) \
    .withColumn("R", split(regexp_extract(col("split_c4").getItem(5), "^R\\[([^\\]]*)?\\]?", 1), ",")) \
    .drop("Chr", "Phenotypes", "split_c0", "#EntrezGeneID_GeneSymbol", "split_c4") \
    .write.mode("overwrite") \
    .parquet("/sbgenomics/output-files/hg38_HGMD2023Q3_gene_sorted")

                                                                                

In [11]:
df = spark.read.options(inferSchema=True,sep="\t",header=True, nullValue=".") \
        .csv("hg38_HGMD2023Q3_variant.VWB.txt") \
        .withColumnRenamed("#Chr", "chromosome") \
        .withColumn("chromosome", regexp_replace("chromosome", "chr", "")) \
        .withColumn("start", col("Start").cast(LongType())) \
        .withColumn("end", col("End").cast(LongType())) \
        .withColumnRenamed("Ref", "reference") \
        .withColumnRenamed("Alt", "alternate")

for c in df.columns :
    if "HGMD2023Q3_" in c:
        if "END" in c:
            new_c = "variant_end"
        elif "GENE" in c:
            new_c = "symbol"
        elif "CLASS" in c:
            new_c = "variant_class"
        else:
            new_c = c.replace("HGMD2023Q3_", "").lower()
        df = df.withColumnRenamed(c, new_c)

df.withColumn("rankscore", col("rankscore").cast(FloatType())) \
    .withColumn("variant_end", col("variant_end").cast(LongType())) \
    .withColumn("svlen", col("svlen").cast(LongType())) \
    .write.mode("overwrite") \
    .parquet("/sbgenomics/output-files/hg38_HGMD2023Q3_variant")

                                                                                

In [15]:
hgmd=spark.read.parquet("/sbgenomics/output-files/hg38_HGMD2023Q3_variant")
hgmd.show(5)

+----------+--------+--------+---------+---------+---------+-------------+---+------+------+--------------------+--------------------+-----------+--------------------+---------+------+-----------+-----+
|chromosome|   start|     end|reference|alternate|       id|variant_class|mut|symbol|strand|                 dna|                prot|         db|                phen|rankscore|svtype|variant_end|svlen|
+----------+--------+--------+---------+---------+---------+-------------+---+------+------+--------------------+--------------------+-----------+--------------------+---------+------+-----------+-----+
|         5|79126068|79126068|        C|        T|CM2234576|          DM?|ALT|  BHMT|     +|NM_001713.3%3Ac.6...|NP_001704.2%3Ap.N...|       null|Autism_spectrum_dis.|     null|  null|       null| null|
|         5|79126136|79126136|        G|        A| CM031139|           DP|ALT|  BHMT|     +|NM_001713.3%3Ac.7...|NP_001704.2%3Ap.R...|  rs3733890|Reduced_risk_of_c...|      0.1|  null|    

In [20]:
spark.read.options(inferSchema=True,sep="\t",header=True,nullValue="NA") \
    .csv("/sbgenomics/project-files/gencc-submissions_20231003.txt") \
    .coalesce(1) \
    .write.mode("overwrite") \
    .parquet("/sbgenomics/output-files/gencc20231003") 

In [19]:
spark.read.options(inferSchema=True,sep="\t",header=True,nullValue="NA") \
    .csv("GenicIntolerance_v3_12Mar16.txt") \
    .coalesce(1) \
    .write.mode("overwrite") \
    .parquet("/sbgenomics/output-files/RVIS")

In [4]:
# Define the path to the directory containing the files
dir_path = "dbNSFP4.5a/"

# Read all TSV files in the directory into a DataFrame
dbNSFP_gene = spark.read.options(inferSchema=True,sep="\t",header=True, nullValue=".") \
    .csv("dbNSFP4.5a/dbNSFP4.5_gene.gz") \
    .withColumnRenamed("chr", "chromosome")

new_columns = [col.replace('(', '[').replace(')', ']').replace(' ', '_') for col in dbNSFP_gene.columns]
dbNSFP_gene = dbNSFP_gene.toDF(*new_columns)

    
dbNSFP_gene.coalesce(1) \
        .write.mode("overwrite") \
        .parquet("/sbgenomics/output-files/dbNSFP4.5_gene")

23/11/14 20:57:41 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [None]:
chromosomes = np.arange(1, 23, 1).tolist() + ["X" , "Y"]
for CHROM in chromosomes:
    CHROM = str(CHROM)
    dbNSFP_variant = spark.read.options(inferSchema=True,sep="\t",header=True,nullValue=".") \
            .csv("dbNSFP4.5a/dbNSFP4.5a_variant.chr" + CHROM + ".gz")

    cols=[re.sub(r'(^_|_$)','',f.replace("(", "[").replace(" ", "_").replace(")", "]")) for f in dbNSFP_variant.columns]
    dbNSFP_variant.toDF(*cols) \
            .withColumnRenamed("#chr", "chromosome") \
            .withColumn("chromosome", col("chromosome").cast(StringType())) \
            .withColumn("fathmm-XF_coding_score", col("fathmm-XF_coding_score").cast(DoubleType())) \
            .withColumn("pos[1-based]", col("pos[1-based]").cast(LongType())) \
            .withColumnRenamed("pos[1-based]", "start") \
            .withColumnRenamed("ref", "reference") \
            .withColumnRenamed("alt", "alternate") \
            .coalesce(1) \
            .write.mode("overwrite") \
            .parquet("/sbgenomics/output-files/dbNSFP4.5_variant/chromosome=" + CHROM)

[Stage 27:>                                                         (0 + 1) / 1]

In [32]:
%%time
dbnsfp_v4_5 = spark.read.parquet('/sbgenomics/output-files/dbNSFP4.5_variant/')

CPU times: user 2.76 ms, sys: 530 µs, total: 3.29 ms
Wall time: 252 ms


23/11/28 02:48:33 WARN DataSource: Found duplicate column(s) in the data schema and the partition schema: `chromosome`


In [34]:
# Check the distinct values of the 'chromosome' column in the DataFrame
dbnsfp_v4_5.select('chromosome').distinct().show()

                                                                                

+----------+
|chromosome|
+----------+
|         7|
|        15|
|        11|
|         3|
|         8|
|        22|
|        16|
|         5|
|        18|
|         Y|
|        17|
|         6|
|        19|
|         X|
|         9|
|         1|
|        20|
|        10|
|         4|
|        12|
+----------+
only showing top 20 rows



In [36]:
# Check the distinct values of the 'chromosome' column in the partitioning information
distinct_partition_values = spark.sql("SHOW PARTITIONS dbnsfp_v4_5").select("chromosome").distinct()
distinct_partition_values.show()

AnalysisException: Table not found for 'SHOW PARTITIONS': dbnsfp_v4_5; line 1 pos 0;
'ShowPartitions
+- 'UnresolvedTable [dbnsfp_v4_5], SHOW PARTITIONS


In [29]:
spark.read \
    .option("header", False) \
    .option("sep", "\t") \
    .option("comment", "#") \
    .csv("/sbgenomics/workspace/dm_alphamissense/AlphaMissense_hg38.tsv.gz") \
    .toDF("chromosome", "start", "reference", "alternate", "genome", \
          "uniprot_id", "transcript_id", "protein_variant", "am_pathogenicity", \
          "am_class") \
    .withColumn("chromosome", regexp_replace("chromosome", "chr", "")) \
    .withColumn("start", col("Start").cast(LongType())) \
    .withColumn("am_pathogenicity", col("am_pathogenicity").cast(DoubleType())) \
    .coalesce(1) \
    .write.mode("overwrite") \
    .parquet("/sbgenomics/output-files/dm_alphamissense/AlphaMissense_hg38")

                                                                                

In [32]:
spark.read \
    .option("header", True) \
    .option("sep", "\t") \
    .option("comment", "#") \
    .csv("/sbgenomics/workspace/dm_alphamissense/AlphaMissense_gene_hg38.tsv.gz") \
    .withColumn("mean_am_pathogenicity", col("mean_am_pathogenicity").cast(DoubleType())) \
    .coalesce(1) \
    .write.mode("overwrite") \
    .parquet("/sbgenomics/output-files/dm_alphamissense/AlphaMissense_gene_hg38")

In [36]:
spark.read \
    .option("header", True) \
    .option("sep", "\t") \
    .option("comment", "#") \
    .csv("/sbgenomics/workspace/dm_alphamissense/AlphaMissense_aa_substitutions.tsv.gz") \
    .withColumn("am_pathogenicity", col("am_pathogenicity").cast(DoubleType())) \
    .coalesce(1) \
    .write.mode("overwrite") \
    .parquet("/sbgenomics/output-files/dm_alphamissense/AlphaMissense_aa_substitutions")

[Stage 35:>                                                         (0 + 1) / 1]

KeyboardInterrupt: 

In [6]:
spark.read \
    .option("header", True) \
    .option("sep", "\t") \
    .option("comment", "#") \
    .csv("/sbgenomics/workspace/dm_alphamissense/AlphaMissense_isoforms_aa_substitutions.tsv.gz") \
    .withColumn("am_pathogenicity", col("am_pathogenicity").cast(DoubleType())) \
    .coalesce(1) \
    .write.mode("overwrite") \
    .parquet("/sbgenomics/output-files/dm_alphamissense/AlphaMissense_isoforms_aa_substitutions")

                                                                                

In [17]:
spark.read \
    .option("header", False) \
    .option("sep", "\t") \
    .option("comment", "#") \
    .csv("/sbgenomics/workspace/dm_alphamissense/AlphaMissense_isoforms_hg38.tsv.gz") \
    .toDF("chromosome", "start", "reference", "alternate", "genome", \
          "transcript_id", "protein_variant", "am_pathogenicity", "am_class") \
    .withColumn("am_pathogenicity", col("am_pathogenicity").cast(DoubleType())) \
    .coalesce(1) \
    .write.mode("overwrite") \
    .parquet("/sbgenomics/output-files/dm_alphamissense/AlphaMissense_isoforms_hg38")

                                                                                