In [7]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, ArrayType
from pyspark.sql.functions import col, explode, expr, collect_list, when, concat_ws, first, sum, lit, collect_set, split, udf, countDistinct


In [8]:
# Attach to the Spark session that is already active, or start a new one
# if none exists (no builder options are set here).
spark = (
    SparkSession.builder
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/21 15:51:35 INFO SparkEnv: Registering MapOutputTracker
24/03/21 15:51:35 INFO SparkEnv: Registering BlockManagerMaster
24/03/21 15:51:35 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/03/21 15:51:35 INFO SparkEnv: Registering OutputCommitCoordinator


## Target prioritisation validation based on the Open Targets genetic score for inflammatory bowel disease (IBD)

### Export target-IBD associations from the Open Targets Platform
##### Sources of evidence: genetic evidence (excluding PanelApp, Gene2Phenotype, Orphanet and ClinGen), animal models, Expression Atlas

In [10]:
# Open Targets Platform release 23.12: target-disease association scores per
# datasource, from the "indirect" (propagated) dataset.
evidence_path = "gs://open-targets-data-releases/23.12/output/etl/parquet/associationByDatasourceIndirect"
evidence = spark.read.format("parquet").load(evidence_path)

# Preview the first rows to sanity-check the schema
# (datatypeId, datasourceId, diseaseId, targetId, score, evidenceCount).
evidence.show()

+----------------+------------+-----------+---------------+-------------------+-------------+
|      datatypeId|datasourceId|  diseaseId|       targetId|              score|evidenceCount|
+----------------+------------+-----------+---------------+-------------------+-------------+
|somatic_mutation|     intogen|EFO_0000403|ENSG00000005339| 0.8500001162170453|            4|
|somatic_mutation|     intogen|EFO_0000403|ENSG00000007312| 0.8274613634158176|            3|
|somatic_mutation|     intogen|EFO_0000403|ENSG00000026103|0.30495492429621884|            1|
|somatic_mutation|     intogen|EFO_0000403|ENSG00000028277| 0.7425494385684972|            2|
|somatic_mutation|     intogen|EFO_0000403|ENSG00000046889|0.22381956059563865|            1|
|somatic_mutation|     intogen|EFO_0000403|ENSG00000049618| 0.2423224679487867|            1|
|somatic_mutation|     intogen|EFO_0000403|ENSG00000057657|0.39907695545334443|            2|
|somatic_mutation|     intogen|EFO_0000403|ENSG00000065526| 

In [15]:
# Keep only the IBD associations (disease ID EFO_0003767). Persisted because
# the result feeds both the per-target aggregation and later summary cells.
evidence_ibd = evidence.where(col("diseaseId") == "EFO_0003767").persist()

# Collapse to one row per target: the datasources and datatypes of its IBD
# association rows are joined into ";"-separated strings. collect_list keeps
# duplicates, so a datatype can appear more than once for a target.
evidence_ibd_by_target = (
    evidence_ibd
    .groupBy("targetId")
    .agg(
        concat_ws(";", collect_list("datasourceId")).alias("datasourceId"),
        concat_ws(";", collect_list("datatypeId")).alias("datatypeId"),
    )
    .persist()
)

24/03/21 15:56:23 WARN CacheManager: Asked to cache already cached data.


In [16]:
def show_unique_values_and_counts(
    df: DataFrame,
    column_name: str,
    n: int = 20,
    truncate: bool = True,
) -> None:
    """
    Shows unique values and their counts for a specified column in a Spark DataFrame.

    The result is printed with `DataFrame.show()`; nothing is returned.

    Parameters:
    df (DataFrame): The Spark DataFrame to analyze.
    column_name (str): The name of the column for which to count unique values.
    n (int): Maximum number of distinct values to print. Defaults to 20, which is
        also the default of `DataFrame.show()`; pass a larger number when the
        column has more distinct values than that.
    truncate (bool): Forwarded to `DataFrame.show()`. With the default True, long
        cell values are cut off; pass False to print them in full.

    Raises:
    ValueError: If `column_name` is not one of the columns of `df`.
    """
    # Fail early with a readable message instead of a Spark analysis error.
    if column_name not in df.columns:
        raise ValueError(f"Column {column_name} not found in DataFrame")

    unique_values_counts = df.groupBy(column_name).count()
    # With the default arguments this is equivalent to a bare `.show()`.
    unique_values_counts.show(n=n, truncate=truncate)

In [18]:
# Count how many IBD association rows each datasource contributes.
show_unique_values_and_counts(df=evidence_ibd, column_name="datasourceId")

+------------------+-----+
|      datasourceId|count|
+------------------+-----+
|         europepmc| 5714|
|  uniprot_variants|    5|
|  genomics_england|   15|
|       gene_burden|    3|
|            chembl|  275|
|  expression_atlas|  546|
|          orphanet|    7|
|            sysbio|  205|
|cancer_gene_census|   79|
|               eva|   26|
|uniprot_literature|    9|
|       eva_somatic|    1|
|ot_genetics_portal| 1275|
|              impc|  589|
+------------------+-----+



### Export dataset with known IBD drug targets

### Calculate the percentage of known drug targets among the top *x* targets of the ranked list