In [28]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.functions import col, explode, split, collect_set, concat_ws, lit, filter, when, concat_ws
from pyspark.sql import DataFrame
import pandas as pd

In [2]:
spark = SparkSession.builder.getOrCreate()

In [11]:
assos_path = "gs://open-targets-data-releases/25.03/output/association_by_datasource_direct/"
assos = spark.read.parquet(assos_path)

In [12]:
assos.printSchema()

root
 |-- datatypeId: string (nullable = true)
 |-- datasourceId: string (nullable = true)
 |-- diseaseId: string (nullable = true)
 |-- targetId: string (nullable = true)
 |-- score: double (nullable = true)
 |-- evidenceCount: long (nullable = true)



In [13]:
assos.show(5)

+----------+------------+-----------+---------------+--------------------+-------------+
|datatypeId|datasourceId|  diseaseId|       targetId|               score|evidenceCount|
+----------+------------+-----------+---------------+--------------------+-------------+
|literature|   europepmc|EFO_0000589|ENSG00000000938|0.012158615952232422|            1|
|literature|   europepmc|EFO_0000589|ENSG00000000971|0.030227670214577826|            3|
|literature|   europepmc|EFO_0000589|ENSG00000001084|0.012158615952232422|            1|
|literature|   europepmc|EFO_0000589|ENSG00000001626|0.037553236325797865|            5|
|literature|   europepmc|EFO_0000589|ENSG00000001630| 0.01823792392834863|            1|
+----------+------------+-----------+---------------+--------------------+-------------+
only showing top 5 rows



In [14]:
reactome_ass = assos.filter(col("datasourceId") == "reactome")

In [16]:
reactome_ass.show(5)

+----------------+------------+-------------+---------------+------------------+-------------+
|      datatypeId|datasourceId|    diseaseId|       targetId|             score|evidenceCount|
+----------------+------------+-------------+---------------+------------------+-------------+
|affected_pathway|    reactome|MONDO_0004992|ENSG00000005339|0.9421515329208274|           10|
|affected_pathway|    reactome|MONDO_0004992|ENSG00000005961|0.9671529481664186|           18|
|affected_pathway|    reactome|MONDO_0004992|ENSG00000006459|0.8897742701710087|            5|
|affected_pathway|    reactome|MONDO_0004992|ENSG00000006530|0.8897742701710087|            5|
|affected_pathway|    reactome|MONDO_0004992|ENSG00000010810|0.9190679877429972|            7|
+----------------+------------+-------------+---------------+------------------+-------------+
only showing top 5 rows



                                                                                

In [17]:
reactome_ass.count()

                                                                                

2478

In [19]:
reactome_dis = reactome_ass.select("diseaseId").dropDuplicates()

In [20]:
reactome_dis.show()



+-------------+
|    diseaseId|
+-------------+
|MONDO_0005090|
|MONDO_0100096|
|MONDO_0009061|
|MONDO_0005129|
|  EFO_0004283|
|MONDO_0009020|
|  EFO_0000182|
|  EFO_0000181|
|MONDO_0009044|
|MONDO_0009026|
|  EFO_0005772|
|MONDO_0008948|
|MONDO_0004992|
|  EFO_0005046|
|MONDO_0004737|
|  EFO_0004799|
|MONDO_0004975|
|  EFO_0005741|
|  EFO_0003060|
|  EFO_0000519|
+-------------+
only showing top 20 rows



                                                                                

In [21]:
reactome_dis.count()



215

                                                                                

In [22]:
disease_path = "gs://open-targets-data-releases/24.09/output/etl/parquet/diseases"
disease = spark.read.parquet(disease_path)

In [25]:
def join_dataframes(initial_df: DataFrame, 
                    second_df: DataFrame, 
                    initial_key_column: str, 
                    second_key_column: str,
                    columns_to_join: list) -> DataFrame:
    """
    Joins two PySpark DataFrames on specified key columns.

    Args:
    initial_df (DataFrame): The initial PySpark DataFrame.
    second_df (DataFrame): The second PySpark DataFrame to join with.
    initial_key_column (str): The key column name in the initial DataFrame.
    second_key_column (str): The key column name in the second DataFrame.
    columns_to_join (list): List of column names from the second DataFrame to include in the join.

    Returns:
    DataFrame: The resulting DataFrame after the join.
    """

    # Selecting specified columns from the second DataFrame, including its key column
    second_df_selected = second_df.select([second_key_column] + columns_to_join)

    second_columns_to_join_with_alias = ["b." + col for col in columns_to_join]

    return initial_df.alias("a")\
        .join(second_df_selected.alias("b"), 
            on = initial_df[initial_key_column] == second_df_selected[second_key_column], 
            how='left')\
        .select("a.*", *second_columns_to_join_with_alias)\
        .persist()

In [26]:
disease_list = ["name", "therapeuticAreas"]

reactome_dis_names = join_dataframes(reactome_dis, disease, "diseaseId", "id", disease_list).persist()

25/06/11 14:33:50 WARN CacheManager: Asked to cache already cached data.


In [27]:
reactome_dis_names.show(5)

                                                                                

+-------------+--------------------+--------------------+
|    diseaseId|                name|    therapeuticAreas|
+-------------+--------------------+--------------------+
|MONDO_0009979|reticular dystrop...|[MONDO_0002025, O...|
|MONDO_0018998|Leber congenital ...|[MONDO_0002025, O...|
|MONDO_0005090|       schizophrenia|[MONDO_0002025, E...|
|  EFO_1000017|autosomal recessi...|      [OTAR_0000018]|
|   HP_0012115|           Hepatitis|       [EFO_0000651]|
+-------------+--------------------+--------------------+
only showing top 5 rows



In [29]:
reactome_dis_names.toPandas().to_csv("disease_list/reactome_dis_names.csv", index=False)

                                                                                