# Autoimmune credible sets

This notebook extracts GWAS credible set information for a set of autoimmune diseases, along with additional metadata about the studies.

1. **Download datasets from Open Targets Platform**

In [25]:
%%bash
release="25.06"
datasets=("study" "credible_set" "l2g_prediction" "target" "disease")
for dataset in "${datasets[@]}"
do
    # Create the local target directory (same path that rsync writes to)
    mkdir -p ../tmp/"${dataset}"
    # Copy the data from Google Cloud Storage to the local tmp directory
    gsutil -m rsync -r "gs://open-targets-data-releases/${release}/output/${dataset}" ../tmp/"${dataset}"
done



both the source and destination. Your crcmod installation isn't using the
module's C extension, so checksumming will run very slowly. If this is your
first rsync since updating gsutil, this rsync can take significantly longer than
usual. For help installing the extension, please see "gsutil help crcmod".

Building synchronization state...
If you experience problems with multiprocessing on MacOS, they might be related to https://bugs.python.org/issue33725. You can disable multiprocessing by editing your .boto config or by adding the following flag to your command: `-o "GSUtil:parallel_process_count=1"`. Note that multithreading is still available even if you disable multiprocessing.

Starting synchronization...
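
Before moving on, it can be useful to confirm that every dataset directory was actually populated. The sketch below is a plain-Python check that assumes the `../tmp/<dataset>` layout used in this notebook and that each dataset is stored as one or more `.parquet` files (as the Spark reads further down suggest). The helper name `missing_datasets` is hypothetical.

```python
from pathlib import Path

DATASETS = ["study", "credible_set", "l2g_prediction", "target", "disease"]


def missing_datasets(base_dir, datasets=DATASETS):
    """Return the dataset names that have no parquet files under base_dir/<name>."""
    base = Path(base_dir)
    missing = []
    for name in datasets:
        folder = base / name
        # rglob also finds files inside nested partition folders, if any exist
        if not folder.is_dir() or not any(folder.rglob("*.parquet")):
            missing.append(name)
    return missing


# Usage (assumes the notebook runs from a directory that is a sibling of tmp/):
# missing_datasets("../tmp")
```

An empty list means all five directories contain at least one parquet file; any name returned points to a download worth re-running.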

2. **Python environment and Spark session**

In [1]:
from pathlib import Path
import pyspark.sql.functions as f
from pyspark.sql import SparkSession

# Starting a Spark session
spark = SparkSession.builder.config("spark.driver.memory", "8g").getOrCreate()


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/27 20:45:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


3. **Read downloaded datasets**

In [2]:
# Reading datasets
credible_set = spark.read.parquet(
    str(Path.cwd().joinpath("../tmp/credible_set").resolve())
)
l2g_prediction = spark.read.parquet(
    str(Path.cwd().joinpath("../tmp/l2g_prediction").resolve())
)
study = spark.read.parquet(str(Path.cwd().joinpath("../tmp/study").resolve()))
target = spark.read.parquet(str(Path.cwd().joinpath("../tmp/target").resolve()))
disease = spark.read.parquet(str(Path.cwd().joinpath("../tmp/disease").resolve()))


4. **Finding all autoimmune diseases according to EFO**

In [None]:
autoimmune_efo = "EFO_0005140"  # EFO term for "autoimmune disease"
autoimmune_diseases = (
    disease.filter(f.col("id") == autoimmune_efo)
    .select(f.explode("descendants").alias("diseaseId"))
    .join(
        disease.select(f.col("id").alias("diseaseId"), "name"),
        on="diseaseId",
        how="left",
    )
)
autoimmune_diseases.show(truncate=False)


+-------------+--------------------------------------------------------------------------------+
|diseaseId    |name                                                                            |
+-------------+--------------------------------------------------------------------------------+
|MONDO_0012500|chilblain lupus 1                                                               |
|MONDO_0010894|maturity-onset diabetes of the young type 3                                     |
|MONDO_0024278|proctocolitis                                                                   |
|EFO_0005626  |pancolitis                                                                      |
|EFO_0008613  |pemphigus vegetans                                                              |
|EFO_0803379  |anti-GAD65 autoimmune neurological syndromes                                    |
|MONDO_0005301|multiple sclerosis                                                              |
|EFO_0008605  |IgG/IgA pemphig
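
The filter, explode and left join above can be hard to picture, so here is a minimal plain-Python sketch of the same logic on toy records. All IDs and names are hypothetical, and `descendants_with_names` is an illustrative helper, not part of any library. It also makes one detail visible: assuming the `descendants` array does not list the term itself, the parent term is absent from the result unless it is added explicitly.

```python
# Toy ontology records (hypothetical IDs and names, for illustration only)
disease_rows = [
    {"id": "PARENT_1", "name": "parent term", "descendants": ["CHILD_1", "CHILD_2"]},
    {"id": "CHILD_1", "name": "child term one", "descendants": []},
    {"id": "CHILD_2", "name": "child term two", "descendants": []},
]


def descendants_with_names(rows, parent_id, include_parent=False):
    """Mimic filter -> explode(descendants) -> left join on id, in plain Python."""
    names = {row["id"]: row["name"] for row in rows}
    ids = []
    for row in rows:
        if row["id"] == parent_id:
            ids.extend(row["descendants"])
    if include_parent:
        ids.insert(0, parent_id)
    # Left-join semantics: IDs without a matching name are kept, with None
    return [(disease_id, names.get(disease_id)) for disease_id in ids]


print(descendants_with_names(disease_rows, "PARENT_1"))
```

Whether studies annotated directly to the parent term should be included is a design choice; the `include_parent` flag only illustrates the option.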

5. **Finding all GWAS studies for autoimmune diseases**


In [7]:
auto_gwas_studies = study.withColumn("diseaseId", f.explode("diseaseIds")).join(
    autoimmune_diseases, on="diseaseId", how="inner"
)
auto_gwas_studies.show(1, vertical=True, truncate=False)


-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------------
 diseaseId                          | EFO_0002689                                                                                                          
 studyId                            | GCST004227                                                                                                           
 geneId                             | NULL                                                                                                                 
 projectId                          | GCST                                                                                                                 
 studyType                          | gwas                                                                                                                 
 traitFromSource                    | Obstetric antiphospholipid

6. **Extracting all credible sets in autoimmune studies**


In [None]:
auto_cs = auto_gwas_studies.join(credible_set, on="studyId", how="inner")
auto_cs.count()


7621
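
Because the study table is exploded on `diseaseIds`, a study annotated with more than one autoimmune disease contributes one row per disease, so the count above may be larger than the number of unique credible sets. The toy example below (hypothetical IDs) shows the difference in plain Python; in Spark, a comparable check would be `auto_cs.select("studyLocusId").distinct().count()`.

```python
# Toy joined rows (hypothetical IDs): one credible set is linked to two diseases
rows = [
    {"studyLocusId": "locus_a", "diseaseId": "DISEASE_1"},
    {"studyLocusId": "locus_a", "diseaseId": "DISEASE_2"},
    {"studyLocusId": "locus_b", "diseaseId": "DISEASE_1"},
]

n_rows = len(rows)  # what a plain row count reports
n_credible_sets = len({row["studyLocusId"] for row in rows})  # unique credible sets
print(n_rows, n_credible_sets)
```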

7. **Including top L2G gene for each credible set**

In [None]:
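auto_cs_gene = auto_cs.join(
    # Top scoring L2G gene for each studyLocusId
    l2g_prediction
    # Stricter L2G score filters can be applied here, before aggregating
    # .filter(f.col("score") > 0.25)
    .groupBy("studyLocusId")
    # Structs compare field by field, so max() over (score, geneId) keeps
    # the gene with the highest score; f.first() would return an arbitrary row
    .agg(f.max(f.struct("score", "geneId")).alias("top"))
    .select(
        "studyLocusId",
        f.col("top.geneId").alias("topGeneId"),
        f.col("top.score").alias("top_L2G_score"),
    )
    .join(
        target.select(
            f.col("id").alias("topGeneId"),
            f.col("approvedSymbol").alias("topGeneSymbol"),
        ),
        on="topGeneId",
        how="left",
    ),
    on="studyLocusId",
    how="left",
)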
auto_cs_gene.printSchema()


root
 |-- studyLocusId: string (nullable = true)
 |-- studyId: string (nullable = true)
 |-- diseaseId: string (nullable = true)
 |-- geneId: string (nullable = true)
 |-- projectId: string (nullable = true)
 |-- studyType: string (nullable = true)
 |-- traitFromSource: string (nullable = true)
 |-- traitFromSourceMappedIds: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- biosampleFromSourceId: string (nullable = true)
 |-- pubmedId: string (nullable = true)
 |-- publicationTitle: string (nullable = true)
 |-- publicationFirstAuthor: string (nullable = true)
 |-- publicationDate: string (nullable = true)
 |-- publicationJournal: string (nullable = true)
 |-- backgroundTraitFromSourceMappedIds: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- initialSampleSize: string (nullable = true)
 |-- nCases: integer (nullable = true)
 |-- nControls: integer (nullable = true)
 |-- nSamples: integer (nullable = true)
 |-- cohorts: array (null
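
The intent of this step, keeping the highest-scoring L2G gene for each credible set, can be sketched in plain Python on toy data. The IDs, scores and the helper name `top_gene_per_locus` are hypothetical; the optional `min_score` argument plays the role of the stricter score filter mentioned in the code comment.

```python
# Toy L2G predictions (hypothetical IDs and scores)
predictions = [
    {"studyLocusId": "locus_a", "geneId": "GENE_1", "score": 0.12},
    {"studyLocusId": "locus_a", "geneId": "GENE_2", "score": 0.81},
    {"studyLocusId": "locus_b", "geneId": "GENE_3", "score": 0.40},
]


def top_gene_per_locus(rows, min_score=0.0):
    """Return {studyLocusId: (geneId, score)} for the highest-scoring gene."""
    best = {}
    for row in rows:
        # Drop predictions below the threshold before picking a winner
        if row["score"] < min_score:
            continue
        current = best.get(row["studyLocusId"])
        if current is None or row["score"] > current[1]:
            best[row["studyLocusId"]] = (row["geneId"], row["score"])
    return best


print(top_gene_per_locus(predictions))
```

With a threshold, credible sets whose best gene scores below it simply have no entry, which corresponds to null gene columns after a left join.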

8. **Select columns of interest to print/write**

In [46]:
cs_out = auto_cs_gene.select(
    # About the study
    "studyId",
    "publicationJournal",
    "publicationDate",
    "nSamples",
    "ldPopulationStructure",
    "traitFromSource",
    "diseaseId",
    f.col("name").alias("diseaseName"),
    # About the credible set
    "studyLocusId",
    "pValueMantissa",
    "pValueExponent",
    "beta",
    "standardError",
    "fineMappingMethod",
    "hasSumstats",
    f.col("variantId").alias("leadVariant"),
    # About the gene
    "topGeneSymbol",
    "topGeneId",
    "top_L2G_score",
)

cs_out.show(truncate=False)

# This dataframe can be written to several formats, including parquet:
# cs_out.write.parquet(
#     str(Path.cwd().joinpath("../tmp/autoimmune_credible_set_parquet").resolve()),
#     mode="overwrite",
# )
# or CSV (ldPopulationStructure is dropped because CSV cannot store nested columns):
# cs_out.drop("ldPopulationStructure").coalesce(1).write.csv(
#     str(Path.cwd().joinpath("../tmp/autoimmune_credible_set_csv").resolve()),
#     mode="overwrite",
#     header=True,
# )


                                                                                

+-------------------------------+-----------------------------+---------------+--------+-----------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------+-------------+----------------------------------+--------------------------------+--------------+--------------+---------------------+-------------+-----------------+-----------+---------------+-------------+---------------+-------------------+
|studyId                        |publicationJournal           |publicationDate|nSamples|ldPopulationStructure                                                              |traitFromSource                                                                                                                                    |diseaseId    |diseaseName                       |studyLocusId                    |pValueMantissa|pValueExponent|bet
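
The p-value is stored as two columns, `pValueMantissa` and `pValueExponent`. Assuming the usual convention that p = mantissa × 10^exponent, the sketch below computes -log10(p) from the two parts directly, which avoids floating-point underflow for very small p-values. The function name `neg_log10_pvalue` is hypothetical.

```python
import math


def neg_log10_pvalue(mantissa, exponent):
    """Return -log10(p) for p = mantissa * 10**exponent, without forming p."""
    if mantissa <= 0:
        raise ValueError("mantissa must be positive")
    return -(math.log10(mantissa) + exponent)


# Hypothetical example: p = 5e-8, the conventional genome-wide significance threshold
print(round(neg_log10_pvalue(5.0, -8), 3))  # 7.301
```

Working on the log scale also makes it straightforward to sort or threshold credible sets by significance after exporting the table.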
