In [12]:
from pyspark.sql import SparkSession, functions as F
import os
import shutil
import glob


In [13]:
# Start the Spark session shared by every cell below
# (getOrCreate returns the already-active session if one exists).
spark = (
    SparkSession.builder
    .appName("gsea-prepare")
    .getOrCreate()
)

In [14]:
# Output location for all files produced by this notebook.
output_dir = "/Users/polina/targets-from-pathways/data/input/auto-input"
# NOTE(review): tmp_dir is not used by any visible cell (later cells define their
# own scratch dirs); it presumably belonged to a missing cell that filters
# associations_df and writes filtered_associations.tsv — confirm / restore it.
tmp_dir = os.path.join(output_dir, "_spark_tmp_export")

# Association scores per disease / target / datatype, stored as a parquet directory.
input_path = "/Users/polina/targets-from-pathways/data/association_by_datatype_indirect"
associations_df = spark.read.format("parquet").load(input_path)

In [6]:
# Quick look at the loaded rows to sanity-check columns and score values.
associations_df.show(n=5)

+-----------+---------------+----------+--------------------+-------------+
|  diseaseId|       targetId|datatypeId|               score|evidenceCount|
+-----------+---------------+----------+--------------------+-------------+
|EFO_0008625|ENSG00000112715|literature|0.030396539880581056|            1|
|EFO_0008626|ENSG00000004478|literature|0.030396539880581056|            1|
|EFO_0008626|ENSG00000007171|literature|0.012158615952232422|            1|
|EFO_0008626|ENSG00000010610|literature| 0.03951550184475537|            2|
|EFO_0008626|ENSG00000019582|literature|0.012158615952232422|            1|
+-----------+---------------+----------+--------------------+-------------+
only showing top 5 rows


In [15]:
# Build the gene-level GSEA input from the gene_data.txt mapping:
# map targetId -> gene symbol, keep the max score per symbol, write one TSV.
#
# NOTE(review): filtered_associations.tsv is not written by any visible cell;
# the cell that produces it appears to be missing, so this notebook will not
# survive Restart & Run All until it is restored.
# NOTE(review): the next cell writes the same final_gsea_input.tsv from the
# target parquet and overwrites this cell's output — keep only one of the two.

output_dir = "/Users/polina/targets-from-pathways/data/input/auto-input"
input_associations_path = os.path.join(output_dir, "filtered_associations.tsv")
targets_path = "/Users/polina/targets-from-pathways/data/gene_data.txt"
tmp_dir = os.path.join(output_dir, "_spark_tmp_export_final")
final_tsv_path = os.path.join(output_dir, "final_gsea_input.tsv")

# getOrCreate() hands back the running session when there is one; looking
# `spark` up in globals() could instead return a session that was stopped.
spark = SparkSession.builder.appName("gsea-prepare").getOrCreate()

# Previously filtered associations; this cell uses its `targetId` and `score` columns.
assoc_df = spark.read.csv(input_associations_path, sep="\t", header=True, inferSchema=True)

# gene_id is the Ensembl id; gene_name is the approved gene symbol.
# Rows without a usable symbol are dropped: otherwise they would all collapse
# into a single null-symbol row in the output, which is not a valid GSEA gene.
targets_df = (
    spark.read.csv(targets_path, sep="\t", header=True, inferSchema=True)
    .select(F.col("gene_id").alias("id"), F.col("gene_name").alias("approvedSymbol"))
    .where(F.col("approvedSymbol").isNotNull() & (F.trim(F.col("approvedSymbol")) != ""))
)

# Join associations with targets on targetId == id, then make score numeric
# before aggregation.
joined_df = (
    assoc_df.join(targets_df, assoc_df.targetId == targets_df.id, "inner")
    .withColumn("score", F.col("score").cast("double"))
)

# One row per symbol with its max score. F.max ignores nulls, so globalScore is
# null only when none of the symbol's scores survived the cast; such rows are
# dropped because a rank file needs a numeric score. Ties are broken by symbol
# so the written order is deterministic across runs.
final_df = (
    joined_df.groupBy("approvedSymbol")
    .agg(F.max("score").alias("globalScore"))
    .where(F.col("globalScore").isNotNull())
    .select(F.col("approvedSymbol").alias("symbol"), F.col("globalScore"))
    .orderBy(F.desc("globalScore"), F.asc("symbol"))
)

# Write as a single TSV: Spark writes a directory of part files, so export to a
# scratch dir and move the single part file into place.
os.makedirs(output_dir, exist_ok=True)
shutil.rmtree(tmp_dir, ignore_errors=True)
try:
    # coalesce(1) merges partitions without a shuffle so only one part file is written.
    final_df.coalesce(1).write.mode("overwrite").option("sep", "\t").option("header", True).csv(tmp_dir)

    part_files = glob.glob(os.path.join(tmp_dir, "part-*"))
    if not part_files:
        raise RuntimeError("No part files were written by Spark; export failed.")
    if len(part_files) > 1:
        raise RuntimeError(f"Expected a single part file in {tmp_dir}, found {len(part_files)}.")

    # tmp_dir lives inside output_dir, so os.replace overwrites the target in one step.
    os.replace(part_files[0], final_tsv_path)
finally:
    # Remove the Spark scratch directory even when the export fails.
    shutil.rmtree(tmp_dir, ignore_errors=True)

print(f"Wrote TSV: {final_tsv_path}")


Wrote TSV: /Users/polina/targets-from-pathways/data/input/auto-input/final_gsea_input.tsv


In [16]:
# Build the gene-level GSEA input from the target parquet: map id -> approved
# symbol, keep the max score per symbol, and write a single TSV.
#
# NOTE(review): this cell writes final_gsea_input.tsv and therefore overwrites
# the output of the previous (gene_data.txt-based) cell.
# NOTE(review): filtered_associations.tsv is not written by any visible cell —
# confirm where it comes from so the notebook survives Restart & Run All.

output_dir = "/Users/polina/targets-from-pathways/data/input/auto-input"
input_associations_path = os.path.join(output_dir, "filtered_associations.tsv")
targets_path = "/Users/polina/targets-from-pathways/data/target"
tmp_dir = os.path.join(output_dir, "_spark_tmp_export_final")
final_tsv_path = os.path.join(output_dir, "final_gsea_input.tsv")

# getOrCreate() hands back the running session when there is one; looking
# `spark` up in globals() could instead return a session that was stopped.
spark = SparkSession.builder.appName("gsea-prepare").getOrCreate()

# Previously filtered associations; this cell uses its `targetId` and `score` columns.
assoc_df = spark.read.csv(input_associations_path, sep="\t", header=True, inferSchema=True)

# Auto-detect the id and symbol columns of the target parquet: match names
# case-insensitively and take the first candidate (in list order) that exists.
raw_targets_df = spark.read.parquet(targets_path)
fields_lower_to_actual = {f.name.lower(): f.name for f in raw_targets_df.schema.fields}

id_candidates = ["id", "targetid", "gene_id", "ensembl_id"]
symbol_candidates = ["approvedsymbol", "gene_name", "approved_symbol", "symbol"]

id_col_actual = next((fields_lower_to_actual[c] for c in id_candidates if c in fields_lower_to_actual), None)
symbol_col_actual = next((fields_lower_to_actual[c] for c in symbol_candidates if c in fields_lower_to_actual), None)

if id_col_actual is None or symbol_col_actual is None:
    # List the real schema columns (the lower-cased dict can hide names that differ only by case).
    available = list(raw_targets_df.columns)
    raise RuntimeError(f"Could not find required columns in target parquet. Available: {available}")

# Rows without a usable symbol are dropped: otherwise they would all collapse
# into a single null-symbol row in the output, which is not a valid GSEA gene.
targets_df = (
    raw_targets_df.select(
        F.col(id_col_actual).alias("id"),
        F.col(symbol_col_actual).alias("approvedSymbol"),
    )
    .where(F.col("approvedSymbol").isNotNull() & (F.trim(F.col("approvedSymbol")) != ""))
)

# Join associations with targets on targetId == id, then make score numeric
# before aggregation.
joined_df = (
    assoc_df.join(targets_df, assoc_df.targetId == targets_df.id, "inner")
    .withColumn("score", F.col("score").cast("double"))
)

# One row per symbol with its max score. F.max ignores nulls, so globalScore is
# null only when none of the symbol's scores survived the cast; such rows are
# dropped because a rank file needs a numeric score. Ties are broken by symbol
# so the written order is deterministic across runs.
final_df = (
    joined_df.groupBy("approvedSymbol")
    .agg(F.max("score").alias("globalScore"))
    .where(F.col("globalScore").isNotNull())
    .select(F.col("approvedSymbol").alias("symbol"), F.col("globalScore"))
    .orderBy(F.desc("globalScore"), F.asc("symbol"))
)

# Write as a single TSV: Spark writes a directory of part files, so export to a
# scratch dir and move the single part file into place.
os.makedirs(output_dir, exist_ok=True)
shutil.rmtree(tmp_dir, ignore_errors=True)
try:
    # coalesce(1) merges partitions without a shuffle so only one part file is written.
    final_df.coalesce(1).write.mode("overwrite").option("sep", "\t").option("header", True).csv(tmp_dir)

    part_files = glob.glob(os.path.join(tmp_dir, "part-*"))
    if not part_files:
        raise RuntimeError("No part files were written by Spark; export failed.")
    if len(part_files) > 1:
        raise RuntimeError(f"Expected a single part file in {tmp_dir}, found {len(part_files)}.")

    # tmp_dir lives inside output_dir, so os.replace overwrites the target in one step.
    os.replace(part_files[0], final_tsv_path)
finally:
    # Remove the Spark scratch directory even when the export fails.
    shutil.rmtree(tmp_dir, ignore_errors=True)

print(f"Wrote TSV: {final_tsv_path}")


Wrote TSV: /Users/polina/targets-from-pathways/data/input/auto-input/final_gsea_input.tsv
