# Variant regulatory consequence analysis

This notebook introduces new dataset `Epiraction`, which contains the regulatory consequences for variants.


## Epiraction dataset ingesion


In [1]:
from __future__ import annotations

import pandas as pd
from gentropy.common.session import Session
from pyspark.sql import Column
from pyspark.sql import functions as f

from manuscript_methods.datasets import LeadVariantEffect


In [2]:
session = Session(extended_spark_conf={"spark.driver.memory": "120G"})
dataset_path = "../../data/epiraction_v1.6"

lead_variant_maf = "../../data/lead-maf-vep"
so_terms_path = "../../data/so_terms.tsv"


25/06/26 08:55:26 WARN Utils: Your hostname, mindos resolves to a loopback address: 127.0.1.1; using 192.168.0.100 instead (on interface eno1)
25/06/26 08:55:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/26 08:55:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
session.spark


In [6]:
epirack = session.spark.read.parquet(dataset_path)
lve = LeadVariantEffect.from_parquet(session=session, path=lead_variant_maf)
so_terms = pd.read_csv(so_terms_path, sep="\t")


In [7]:
# Prepare SO terms (change them to from SO:0000001 to SO_0000001)
so_terms["featureId"] = so_terms["featureId"].str.replace("SO:", "SO_")


In [8]:
def map_so_terms(so_term_col: Column, so_terms: pd.DataFrame) -> Column:
    """Map Sequence Ontology (SO) terms to their ids without joining.

    Note:
    ----
    The scores are ordered by the most severe SO term so, if we have a list of SO_terms, choose the fist one as most severe.

    """
    expr = f.when(f.lit(False), None)

    for v in so_terms.iterrows():
        so_term = v[1][1]
        so_id = v[1][0]
        expr = expr.when(f.lit(so_id) == so_term_col, f.lit(so_term))

    return expr


In [9]:
# Prefilter lead variants
lve_dataset = (
    lve.maf_filter()
    .effect_size_filter()
    .replicated()
    .select(
        f.col("variantId"),
        f.col("variant.chromosome").alias("chromosome"),
        f.col("variant.start").alias("start"),
        f.col("variant.end").alias("end"),
        f.col("leadVariantConsequence").getField("type").alias("variantPositionEffect"),
        f.col("leadVariantConsequence")
        .getField("transcriptConsequence")
        .getField("consequenceScore")
        .alias("consequenceScore"),
        f.col("leadVariantConsequence")
        .getField("transcriptConsequence")
        .getField("variantFunctionalConsequenceIds")
        .alias("consequenceIds"),
        f.col("studyStatistics").getField("studyType").alias("studyType"),
        f.col("majorLdPopulationMaf").getField("value").alias("MAF"),
        f.col("rescaledStatistics").getField("estimatedBeta").alias("estimatedBeta"),
        f.col("rescaledStatistics").getField("estimatedSE").alias("estimatedSE"),
    )
    .withColumn(
        "consequenceOntologyTerms",
        f.transform(f.col("consequenceIds"), lambda x: map_so_terms(x, so_terms)).getItem(0),
    )
)
# Filter based on score distribution - see 05.0 notebook
epirack_dataset = (
    epirack.filter(f.col("score") > 0.05)
    .dropDuplicates(["chromosome", "start", "end", "intervalType"])
    .select(
        f.regexp_replace(f.col("chromosome"), "chr", "").alias("chromosome"),
        (f.col("start") + 1).alias("start"),
        (f.col("end") + 1).alias("end"),
        "intervalType",
    )
)




Initial number of variants: 2720012
Following number of variants: 88109
Number of variants removed: 2631903
Percentage of variants removed: 96.76%


In [10]:
lve_dataset.write.mode("overwrite").bucketBy(50, "chromosome").sortBy("start").saveAsTable("lve_lead_variants")
epirack_dataset.write.mode("overwrite").bucketBy(50, "chromosome").sortBy("start").saveAsTable("epirack_intervals")


                                                                                

In [11]:
lve_dataset_table = session.spark.table("lve_lead_variants")
epirack_dataset_table = session.spark.table("epirack_intervals")

nc_dataset = lve_dataset_table.join(
    epirack_dataset_table,
    on=(
        (lve_dataset_table.chromosome == epirack_dataset_table.chromosome)
        & (lve_dataset_table.start >= epirack_dataset_table.start)
        & (lve_dataset_table.end < epirack_dataset_table.end)
    ),
    how="left",
).select(
    lve_dataset_table.variantId,
    lve_dataset_table.chromosome,
    lve_dataset_table.start,
    lve_dataset_table.end,
    lve_dataset_table.variantPositionEffect,
    lve_dataset_table.consequenceScore,
    lve_dataset_table.consequenceOntologyTerms,
    lve_dataset_table.studyType,
    lve_dataset_table.MAF,
    lve_dataset_table.estimatedBeta,
    lve_dataset_table.estimatedSE,
    epirack_dataset_table.intervalType,
)


In [12]:
nc_dataset.write.mode("overwrite").parquet("../../data/nc_dataset")


                                                                                

In [13]:
from pyspark.sql import Window

w = Window().partitionBy("chromosome", "start", "end").orderBy("start")

nc_dedup = (
    nc_dataset.withColumn("inEnhancerRegion", f.max((f.col("intervalType") == "enhancer").cast("int")).over(w) == 1)
    .withColumn("inPromoterRegion", f.max((f.col("intervalType") == "promoter").cast("int")).over(w) == 1)
    .dropDuplicates(["chromosome", "start", "end"])
    .cache()
)


In [None]:
nc_dataset.filter(f.col("variantPositionEffect") == "out-of-gene-effect").filter(
    f.col("intervalType").isNotNull()
).show()


+----------+---------+---------+---------------------+----------------+------------------------+----------+--------------------+-------------------+-------------------+------------+
|chromosome|    start|      end|variantPositionEffect|consequenceScore|consequenceOntologyTerms| studyType|                 MAF|      estimatedBeta|        estimatedSE|intervalType|
+----------+---------+---------+---------------------+----------------+------------------------+----------+--------------------+-------------------+-------------------+------------+
|         2|218156235|218156235|   out-of-gene-effect|             0.0|    upstream_gene_var...|trans-pqtl|0.011732533521524348| 0.2439420560557209|0.03636328703736627|    enhancer|
|         2| 86930667| 86930667|   out-of-gene-effect|             0.0|    upstream_gene_var...|      eqtl| 0.08980134763485674|-0.9595888210139163|0.11139149209893032|    enhancer|
|         2| 43195651| 43195652|   out-of-gene-effect|             0.1|          intron_va

In [None]:
nc_dedup.filter(f.col("inPromoterRegion")).groupBy("consequenceOntologyTerms").count().orderBy(
    f.col("count").desc()
).show(100, truncate=False)


+-----------------------------------+-----+
|consequenceOntologyTerms           |count|
+-----------------------------------+-----+
|upstream_gene_variant              |1358 |
|intron_variant                     |924  |
|5_prime_UTR_variant                |872  |
|non_coding_transcript_exon_variant |212  |
|downstream_gene_variant            |200  |
|missense_variant                   |148  |
|synonymous_variant                 |105  |
|splice_region_variant              |41   |
|splice_donor_region_variant        |25   |
|splice_donor_variant               |14   |
|splice_donor_5th_base_variant      |8    |
|stop_gained                        |6    |
|splice_polypyrimidine_tract_variant|5    |
|frameshift_variant                 |5    |
|inframe_deletion                   |4    |
|inframe_insertion                  |3    |
|3_prime_UTR_variant                |2    |
|splice_acceptor_variant            |2    |
+-----------------------------------+-----+



In [None]:
nc_dedup.filter(f.col("inEnhancerRegion")).groupBy("consequenceOntologyTerms").count().orderBy(
    f.col("count").desc()
).show(100, truncate=False)


+-----------------------------------+-----+
|consequenceOntologyTerms           |count|
+-----------------------------------+-----+
|intron_variant                     |4502 |
|upstream_gene_variant              |2431 |
|downstream_gene_variant            |1068 |
|3_prime_UTR_variant                |366  |
|missense_variant                   |286  |
|synonymous_variant                 |152  |
|non_coding_transcript_exon_variant |92   |
|splice_region_variant              |49   |
|5_prime_UTR_variant                |36   |
|splice_polypyrimidine_tract_variant|36   |
|splice_donor_region_variant        |16   |
|frameshift_variant                 |11   |
|inframe_deletion                   |6    |
|splice_donor_5th_base_variant      |6    |
|splice_donor_variant               |6    |
|splice_acceptor_variant            |5    |
|stop_gained                        |5    |
|stop_lost                          |1    |
+-----------------------------------+-----+



In [None]:
epirack.groupBy("biofeature").count().show()


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `biofeature` cannot be resolved. Did you mean one of the following? [`end`, `geneId`, `score`, `start`, `intervalType`].;
'Aggregate ['biofeature], ['biofeature, count(1) AS count#4258L]
+- Relation [chromosome#60,start#61,end#62,geneId#63,biosampleName#64,intervalType#65,score#66,resourceScore#67,datasourceId#68,pmid#69] parquet
