Prepare gene sets from facets obtained from gs://open-targets-pre-data-releases/25.06/view/search_facet_target/

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import functions as F

In [2]:
# Reuse the already-running Spark session if there is one; otherwise start a new one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/27 10:54:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/27 10:54:22 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Prepare GMT files from facets

In [5]:
# Load the target search-facet table from parquet.
# NOTE(review): this is an absolute, user-specific path; consider a configurable data directory.
facets = spark.read.parquet(
    "/Users/polina/Pathwaganda/data/search_facet_target"
)

In [4]:
# List every distinct facet category, printing the values in full (no truncation).
(
    facets
    .select("category")
    .distinct()
    .show(truncate=False)
)

+-----------------------------+
|category                     |
+-----------------------------+
|Approved Name                |
|Approved Symbol              |
|Target ID                    |
|GO:CC                        |
|GO:MF                        |
|GO:BP                        |
|Subcellular Location         |
|Tractability PROTAC          |
|Tractability Other Modalities|
|Tractability Antibody        |
|Reactome                     |
|Tractability Small Molecule  |
|ChEMBL Target Class          |
+-----------------------------+



                                                                                

In [7]:
# Preview the facet rows that belong to the "Subcellular Location" category.
(
    facets
    .where(col("category") == "Subcellular Location")
    .show()
)

+--------------------+--------------------+--------------------+------------+
|               label|            category|           entityIds|datasourceId|
+--------------------+--------------------+--------------------+------------+
|            Acrosome|Subcellular Location|[ENSG00000176988,...|        NULL|
|     Actin filaments|Subcellular Location|[ENSG00000118407,...|     SL-0090|
|           Aggresome|Subcellular Location|[ENSG00000004975,...|        NULL|
|             Annulus|Subcellular Location|[ENSG00000130363,...|        NULL|
|Apical cell membrane|Subcellular Location|[ENSG00000086548,...|     SL-0015|
|Apicolateral cell...|Subcellular Location|[ENSG00000137860,...|     SL-0017|
|        Autolysosome|Subcellular Location|[ENSG00000087086,...|     SL-0535|
|Autolysosome memb...|Subcellular Location|   [ENSG00000225190]|     SL-0536|
|          Basal body|Subcellular Location|[ENSG00000248144,...|        NULL|
| Basal cell membrane|Subcellular Location|[ENSG00000134538,...|

In [11]:
from pyspark.sql import functions as F

def export_gmt_files(spark_df, categories, output_dir, target_parquet):
    """Write one tab-separated gene-set (.gmt) file per facet category.

    For every requested category the facet rows are exploded to one row per
    gene ID, joined to the target table to obtain the approved symbol, grouped
    back per term, and written to ``{output_dir}/{category}.gmt``.

    Each output line has the form ``Term<TAB>symbol<TAB>symbol...`` where
    ``Term`` is ``label{datasourceId}``. No separate description column is
    written.

    Parameters
    ----------
    spark_df : pyspark.sql.DataFrame
        Facet table providing the columns ``label``, ``category``,
        ``entityIds`` (exploded into individual gene IDs) and ``datasourceId``.
    categories : iterable of str
        Values of the ``category`` column to export; one file per value.
    output_dir : str
        Directory that receives the ``.gmt`` files. Created if missing.
    target_parquet : str
        Path to a parquet dataset with ``id`` and ``approvedSymbol`` columns.

    Returns
    -------
    None
        The function is used for its side effect of writing files.
    """
    import os

    # Use the session that owns the input DataFrame rather than a notebook
    # global, so the function does not depend on hidden kernel state.
    session = spark_df.sparkSession

    # Gene ID -> approved symbol lookup.
    target_df = (
        session.read.parquet(target_parquet)
        .select(F.col("id").alias("geneId"), "approvedSymbol")
    )

    # Build the term name as label{datasourceId}. concat_ws skips nulls, so a
    # row with a null datasourceId yields "label{}".
    df = spark_df.withColumn(
        "Term",
        F.concat_ws("", F.col("label"), F.lit("{"), F.col("datasourceId"), F.lit("}"))
    )

    # The original code assumed the directory already existed.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    for cat in categories:
        df_grouped = (
            df.filter(F.col("category") == cat)
            # One row per (term, gene ID).
            .withColumn("geneId", F.explode("entityIds"))
            # Inner join: gene IDs absent from the target table are dropped.
            .join(target_df, on="geneId", how="inner")
            .groupBy("Term")
            .agg(F.collect_list("approvedSymbol").alias("symbols"))
        )

        rows = df_grouped.collect()

        # Write the lines verbatim. Going through a CSV writer (as before)
        # quotes any line whose term contains a comma or a double quote, which
        # corrupts the tab-separated GMT output.
        output_path = f"{output_dir}/{cat}.gmt"
        with open(output_path, "w", encoding="utf-8") as handle:
            for row in rows:
                handle.write(row["Term"] + "\t" + "\t".join(row["symbols"]) + "\n")


In [12]:
# Facet categories to export, one .gmt file each.
# For a quick test run, restrict this list to a single entry such as "Reactome".
categories = [
    "Reactome",
    "GO:BP",
    "GO:MF",
    "GO:CC",
    "Subcellular Location",
    "ChEMBL Target Class",
]
export_gmt_files(
    facets,
    categories,
    "../data/gmt/from_facets",
    "/Users/polina/genetics_gsea/data/target",
)

# Propagate for descendants

## Reactome

In [18]:
# Load the Reactome table from parquet.
# NOTE(review): this is an absolute, user-specific path; consider a configurable data directory.
reactome = spark.read.parquet(
    "/Users/polina/genetics_gsea/data/reactome"
)

In [None]:
# Peek at the first five Reactome rows to check the available columns.
reactome.show(n=5)

+-------------+--------------------+--------------------+-----------+--------+---------------+--------------------+
|           id|               label|           ancestors|descendants|children|        parents|                path|
+-------------+--------------------+--------------------+-----------+--------+---------------+--------------------+
| R-HSA-164843|2-LTR circle form...|[R-HSA-162594, R-...|         []|      []| [R-HSA-162592]|[[R-HSA-1643685, ...|
|R-HSA-9909438|3-Methylcrotonyl-...|[R-HSA-9865118, R...|         []|      []|[R-HSA-9865118]|[[R-HSA-1643685, ...|
|R-HSA-9916722|3-hydroxyisobutyr...|[R-HSA-9865118, R...|         []|      []|[R-HSA-9865118]|[[R-HSA-1643685, ...|
|R-HSA-9914274|3-methylglutaconi...|[R-HSA-9865118, R...|         []|      []|[R-HSA-9865118]|[[R-HSA-1643685, ...|
|  R-HSA-73843|5-Phosphoribose 1...|[R-HSA-1430728, R...|         []|      []|  [R-HSA-71336]|[[R-HSA-1430728, ...|
+-------------+--------------------+--------------------+-----------+---

25/08/26 20:00:11 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 1045410 ms exceeds timeout 120000 ms
25/08/26 20:00:11 WARN SparkContext: Killing executors is not supported by current scheduler.
25/08/26 20:10:04 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:342)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:132)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$