# Generate Data 

To trial the pipeline running we will generate a parquet file for one of the main variant ids. We will chose 1 variant from the credible set and studies that appears in two studies; one from Africa and Europe. 

*Set Up*
- insuide venv with uv install `gentropy`, and install JAVA using `curl -s "https://get.sdkman.io" | bash`

- download open targets [credible set](https://platform-docs.opentargets.org/credible-set) using `gcloud storage cp -r gs://open-targets-data-releases/25.09/output/credible_set . --billing-project=opentargets-hack25cam-306`
    this contains all of the credible sets

- download open targets [study](https://platform-docs.opentargets.org/study) using `gcloud storage cp -r gs://open-targets-data-releases/25.09/output/study/ . --billing-project=opentargets-hack25cam-306`
    this contains information per study, including the ancestory 

### Set Up Spark Session

In [16]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("Exploring OT Datasets")
    .config("spark.driver.memory", "10g")
    .getOrCreate()
)
spark


25/10/22 08:14:41 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [17]:
cs = spark.read.parquet("../data/credible_set/")  # downloaded from google

import pyspark.sql.functions as f

cs_explode_ldset = (
    cs.filter(f.col("studyType") == "gwas")
    .select("studyLocusID", f.explode("ldSet").alias("ldSet"))
    .select("studyLocusID", "ldSet.tagVariantId")
)

cs_explode = (
    cs.filter(cs.studyType == "gwas")
    .filter(cs.finemappingMethod == "SuSiE-inf")
    .select("studyID", "studyLocusID", "region", f.explode("locus").alias("locus"))
    .select(
        "studyID",
        "studylocusID",
        "region",
        "locus.variantID",
        "locus.pValueMantissa",
        "locus.pValueExponent",
        "locus.beta",
        "locus.standardError",
    )
)


                                                                                

### Read in the Study object

In [18]:
# read study dataset
st = spark.read.parquet("../data/study/")
st.show(1, vertical=True)


-RECORD 0--------------------------------------------------
 studyId                            | FINNGEN_R12_AUTOI... 
 geneId                             | NULL                 
 projectId                          | FINNGEN_R12          
 studyType                          | gwas                 
 traitFromSource                    | Autoimmune hypert... 
 traitFromSourceMappedIds           | [EFO_0004237]        
 biosampleFromSourceId              | NULL                 
 pubmedId                           | 36653562             
 publicationTitle                   | NULL                 
 publicationFirstAuthor             | NULL                 
 publicationDate                    | NULL                 
 publicationJournal                 | NULL                 
 backgroundTraitFromSourceMappedIds | NULL                 
 initialSampleSize                  | 500,348 (282,064 ... 
 nCases                             | 2469                 
 nControls                          | 37

In [19]:
# explode studies where populatation structure is 1 (1 ancestery per study), and map those to our credible set studies
st_exploded = (
    st.filter(f.size("ldPopulationStructure") == 1)
    .select(
        "studyID",
        "nSamples",
        f.col("ldPopulationStructure").getItem(0).alias("pop_struct"),
    )
    .select("studyId", "nSamples", "pop_struct.ldPopulation")
)

# now each credible set variant is mapped to the population it came from
df_joined = cs_explode.join(st_exploded, on="studyID", how="inner")

from pyspark.sql.functions import countDistinct, col

pop_counts = (
    df_joined.groupBy("variantID")
    .agg(f.countDistinct("ldPopulation").alias("pop_unique"))
    .filter(col("pop_unique") >= 2)
)

# all variants we can use.... with at least two studies 100% from different ancesteries
pop_counts.show()




+----------------+----------+
|       variantID|pop_unique|
+----------------+----------+
| 17_31129861_T_G|         2|
| 17_44491402_G_T|         2|
| 17_60829296_C_A|         2|
| 3_124620680_G_A|         3|
| 2_219041843_C_T|         2|
| 2_218240267_T_C|         2|
| 2_164222629_T_G|         2|
| 2_180674747_A_G|         2|
| 1_205136853_A_G|         2|
| 10_80193510_C_T|         2|
|  5_40673074_G_T|         2|
|  5_75688444_C_T|         2|
|  12_6181927_G_A|         2|
|  4_17943217_G_A|         2|
|  4_54238287_A_T|         2|
|  4_69107326_A_T|         2|
|  1_87240189_G_A|         2|
|12_111856192_C_G|         2|
| 1_158617176_A_T|         2|
| 1_231395730_T_G|         2|
+----------------+----------+
only showing top 20 rows



                                                                                

## Choose a Variant 

Here we manually pick a variant that has 2 studies, one from Africna deceent and one from European decsent. 

In [None]:
lead_variant = "1_205136853_A_G"
df_joined.filter(col("variantID").isin([lead_variant])).show()




+------------+--------------------+--------------------+---------------+--------------+--------------+--------------------+-------------+--------+------------+
|     studyID|        studylocusID|              region|      variantID|pValueMantissa|pValueExponent|                beta|standardError|nSamples|ldPopulation|
+------------+--------------------+--------------------+---------------+--------------+--------------+--------------------+-------------+--------+------------+
|  GCST004601|5c32208a7bac00b3e...|1:204946589-20539...|1_205136853_A_G|          NULL|          NULL|-0.01586299851098...|         NULL|  172952|         nfe|
|GCST90002363|7ff6961e35a506197...|1:204893222-20540...|1_205136853_A_G|          NULL|          NULL|-0.01613941590756707|         NULL|  545203|         nfe|
|GCST90475508|c1d709c81649b00c0...|1:204969603-20539...|1_205136853_A_G|          NULL|          NULL|-0.01173232428566...|         NULL|  296975|         nfe|
|  GCST004634|190e5f0f742e44550...|1:204

                                                                                

In [None]:
# two of the studies the lead variant is found in
studyids = ["GCST90475449", "GCST90002403"]  # AFR, NFE

df_joined.filter(col("variantID").isin([lead_variant])).filter(
    col("studyID").isin(studyids)
).show(truncate=False)




+------------+--------------------------------+---------------------+---------------+--------------+--------------+---------------------+-------------+--------+------------+
|studyID     |studylocusID                    |region               |variantID      |pValueMantissa|pValueExponent|beta                 |standardError|nSamples|ldPopulation|
+------------+--------------------------------+---------------------+---------------+--------------+--------------+---------------------+-------------+--------+------------+
|GCST90002403|ac816501c9068ee8ff755ad9443a8199|1:204910910-205902271|1_205136853_A_G|NULL          |NULL          |-0.018219761232528487|NULL         |408112  |nfe         |
|GCST90475449|881929267f9bbac5a26d92c7077ce775|1:204946589-205386310|1_205136853_A_G|NULL          |NULL          |0.014180421840136782 |NULL         |114836  |afr         |
+------------+--------------------------------+---------------------+---------------+--------------+--------------+---------------

                                                                                

## Download from EBI

We searched nt he ebi [GWAS website](https://www.ebi.ac.uk/gwas/) and input the study IDs. We clicked the FTP and downloaded the harmonised tsv files into the following directory

`located in data_generation/data/1_205136853_A_G`


## Open and harmonise 

In [None]:
# start spark session via gentropy package
import gentropy as gt

session = gt.Session()
spark = session.spark


25/10/22 08:14:48 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


### Generate SumSats

Here we use the gentropy function `GWASCatalogSumstatsPreprocessStep()` to convert the EBI downloaded statistics from the `[study].h.tsv.gz` to the `.snappy.parquet` - the file format needed by gentropy.

In [None]:
# summary statistics from a single study as example
# this converts the TSV into the gentropy format

from gentropy.gwas_catalog_sumstat_preprocess import GWASCatalogSumstatsPreprocessStep
import os

if os.path.exists("../outputs/1_205136853_A_G/AFR"):
    pass
else:
    GWASCatalogSumstatsPreprocessStep(
        session=session,
        raw_sumstats_path=f"../data/{lead_variant}/AFR/GCST90475449.h.tsv.gz",
        out_sumstats_path=f"../outputs/{lead_variant}/AFR",
    )
    GWASCatalogSumstatsPreprocessStep(
        session=session,
        raw_sumstats_path=f"../data/{lead_variant}/NFE/32888494-GCST90002403-EFO_0004305.h.tsv.gz",
        out_sumstats_path=f"../outputs/{lead_variant}/NFE",
    )


In [24]:
# use spark to read the generated parquet files

harmonised_gwas_ss_afr = spark.read.parquet(f"../outputs/{lead_variant}/AFR")
harmonised_gwas_ss_nfe = spark.read.parquet(f"../outputs/{lead_variant}/NFE")


In [25]:
harmonised_gwas_ss_afr.show()


+------------+---------------+----------+---------+--------------+--------------+---------+-------------+-------------------------------+----------+
|     studyId|      variantId|chromosome| position|pValueMantissa|pValueExponent|     beta|standardError|effectAlleleFrequencyFromSource|sampleSize|
+------------+---------------+----------+---------+--------------+--------------+---------+-------------+-------------------------------+----------+
|GCST90475449|  1_6751163_T_A|         1|  6751163|         8.597|            -1| 8.153E-4|     0.004613|                         0.4478|    114836|
|GCST90475449| 1_59180012_A_C|         1| 59180012|         9.078|            -1|-0.001845|      0.01594|                         0.0256|    114836|
|GCST90475449| 1_20804008_G_T|         1| 20804008|          6.04|            -1|  0.09261|       0.1786|                         3.0E-4|    114836|
|GCST90475449|1_105213462_T_C|         1|105213462|         3.741|            -1|  0.03963|      0.04459| 

In [26]:
from pyspark.sql.functions import col

# sanity check lead variant is in AFR callset
harmonised_gwas_ss_afr.filter(col("variantId") == lead_variant).show(truncate=False)


+------------+---------------+----------+---------+--------------+--------------+-------+-------------+-------------------------------+----------+
|studyId     |variantId      |chromosome|position |pValueMantissa|pValueExponent|beta   |standardError|effectAlleleFrequencyFromSource|sampleSize|
+------------+---------------+----------+---------+--------------+--------------+-------+-------------+-------------------------------+----------+
|GCST90475449|1_205136853_A_G|1         |205136853|5.847         |-8            |0.03909|0.007207     |0.1177                         |114836    |
+------------+---------------+----------+---------+--------------+--------------+-------+-------------+-------------------------------+----------+



In [27]:
# sanity check lead variant is in NFE callset
harmonised_gwas_ss_nfe.filter(col("variantId") == lead_variant).show(truncate=False)


+------------+---------------+----------+---------+--------------+--------------+----------+-------------+-------------------------------+----------+
|studyId     |variantId      |chromosome|position |pValueMantissa|pValueExponent|beta      |standardError|effectAlleleFrequencyFromSource|sampleSize|
+------------+---------------+----------+---------+--------------+--------------+----------+-------------+-------------------------------+----------+
|GCST90002403|1_205136853_A_G|1         |205136853|4.6           |-33           |-0.0251221|0.00209728   |0.447654                       |NULL      |
+------------+---------------+----------+---------+--------------+--------------+----------+-------------+-------------------------------+----------+



## Subset the New SumStats files using the Genomic Regions from the Studies in Credible Sets

Filter the 

In [28]:
# filter the original df to get the variants from the chosen studies
lv_df = df_joined.filter(col("variantID").isin([lead_variant])).filter(
    col("studyID").isin(studyids)
)
lv_df.show()
lv_df.select(col("region"), col("studyID")).distinct().show()


                                                                                

+------------+--------------------+--------------------+---------------+--------------+--------------+--------------------+-------------+--------+------------+
|     studyID|        studylocusID|              region|      variantID|pValueMantissa|pValueExponent|                beta|standardError|nSamples|ldPopulation|
+------------+--------------------+--------------------+---------------+--------------+--------------+--------------------+-------------+--------+------------+
|GCST90002403|ac816501c9068ee8f...|1:204910910-20590...|1_205136853_A_G|          NULL|          NULL|-0.01821976123252...|         NULL|  408112|         nfe|
|GCST90475449|881929267f9bbac5a...|1:204946589-20538...|1_205136853_A_G|          NULL|          NULL|0.014180421840136782|         NULL|  114836|         afr|
+------------+--------------------+--------------------+---------------+--------------+--------------+--------------------+-------------+--------+------------+





+--------------------+------------+
|              region|     studyID|
+--------------------+------------+
|1:204946589-20538...|GCST90475449|
|1:204910910-20590...|GCST90002403|
+--------------------+------------+



                                                                                

### Filter by Genomic Region

In [29]:
# region = 1:204910910-205902271
harmonised_gwas_ss_nfe.show()


+------------+--------------------+----------+--------+--------------+--------------+-----------+-------------+-------------------------------+----------+
|     studyId|           variantId|chromosome|position|pValueMantissa|pValueExponent|       beta|standardError|effectAlleleFrequencyFromSource|sampleSize|
+------------+--------------------+----------+--------+--------------+--------------+-----------+-------------+-------------------------------+----------+
|GCST90002403|      1_55409076_T_A|         1|55409076|           2.7|            -1| 0.00709597|   0.00641397|                       0.028805|      NULL|
|GCST90002403|      1_77398636_A_G|         1|77398636|           4.6|            -4|  0.0142114|   0.00405495|                       0.071674|      NULL|
|GCST90002403|      1_75181590_C_T|         1|75181590|           9.0|            -1|-0.00470341|     0.037002|                        8.14E-4|      NULL|
|GCST90002403|      1_15217870_A_C|         1|15217870|           3.4|

In [None]:
region_chr = 1
region_start = 204_910_910
region_end = 205_902_271


In [None]:
subset_region_afr = harmonised_gwas_ss_afr.filter(
    (f.col("chromosome") == region_chr)
    & (f.col("position") >= region_start)
    & (f.col("position") <= region_end)
)

subset_region_nfe = harmonised_gwas_ss_nfe.filter(
    (f.col("chromosome") == region_chr)
    & (f.col("position") >= region_start)
    & (f.col("position") <= region_end)
)


In [None]:
# concat
subset_region_combined = subset_region_nfe.union(subset_region_afr)
subset_region_with_pop = subset_region_combined.join(
    lv_df.select("studyID", "ldPopulation"), on="studyID", how="left"
)
subset_region_with_pop.show()




+------------+---------------+----------+---------+--------------+--------------+-----------+-------------+-------------------------------+----------+------------+
|     studyId|      variantId|chromosome| position|pValueMantissa|pValueExponent|       beta|standardError|effectAlleleFrequencyFromSource|sampleSize|ldPopulation|
+------------+---------------+----------+---------+--------------+--------------+-----------+-------------+-------------------------------+----------+------------+
|GCST90002403|1_205532052_C_A|         1|205532052|           5.0|            -1|  -0.101327|     0.148539|                        1.21E-4|      NULL|         nfe|
|GCST90002403|1_205675412_G_C|         1|205675412|           9.1|            -1|-0.00916497|    0.0770102|                        3.26E-4|      NULL|         nfe|
|GCST90002403|1_205313820_C_T|         1|205313820|           6.0|            -2|  0.0170392|   0.00905335|                        0.01398|      NULL|         nfe|
|GCST90002403|1_

                                                                                

## output data 

In [None]:
# check there are signfiicant values, we need some pValueExponent < -3
subset_region_with_pop.filter(subset_region_with_pop.pValueExponent < -3).count()


                                                                                

950

In [None]:
subset_region_with_pop.write.parquet(
    f"../outputs/{lead_variant}_set.parquet", mode="overwrite"
)


                                                                                