Merged
21 commits
3d0af68
feat: added impc_web_api utils module
francisco-ebi Sep 18, 2025
e755ed4
WIP: gene disease mapper task migration to Airflow
francisco-ebi Sep 15, 2025
77215fb
feat: remove import from local module
francisco-ebi Sep 15, 2025
ec7b0e4
fix: explicitly cast disease_model_avg_norm and disease_model_max_nor…
francisco-ebi Sep 15, 2025
1804c51
feat: replace repartition with coalesce
francisco-ebi Sep 16, 2025
57095d3
Merge pull request #428 from mpi2/399-migrate-impc_etljobsloadimpc_we…
francisco-ebi Sep 18, 2025
b478984
feat: remove to_camel_case function from impc_gene_diseases_mapper ta…
francisco-ebi Sep 18, 2025
d9797f2
feat: initial impc_batch_query mapper task
francisco-ebi Sep 17, 2025
a4ee451
feat: use phenotype_term_zip_udf function from utils module
francisco-ebi Sep 18, 2025
9d23723
Merge pull request #422 from mpi2/392-migrate-impc_etljobsloadimpc_we…
francisco-ebi Sep 18, 2025
53ffc4d
feat: initial impc_gene_search_mapper airflow task
francisco-ebi Sep 17, 2025
7f8dda2
feat: use GENE_SUMMARY_MAPPINGS from utils module
francisco-ebi Sep 18, 2025
b7f28f4
feat: set overwrite mode when saving results
francisco-ebi Sep 18, 2025
cd7cb82
Merge pull request #423 from mpi2/403-migrate-impc_etljobsloadimpc_we…
francisco-ebi Sep 18, 2025
ae47f38
feat: initial impc_idg_mapper airflow task
francisco-ebi Sep 18, 2025
38664ec
Merge pull request #426 from mpi2/408-migrate-impc_etljobsloadimpc_we…
francisco-ebi Sep 18, 2025
d274972
feat: initial impc_external_links_mapper airflow task
francisco-ebi Sep 18, 2025
21bf0a4
Merge pull request #429 from mpi2/398-migrate-impc_etljobsloadimpc_we…
francisco-ebi Sep 18, 2025
2b84b8e
Migration of the impc datasets metadata mapper to Airflow. The feat/i…
rwilson-ebi Sep 26, 2025
b869dec
Removed some print statements used for testing.
rwilson-ebi Sep 26, 2025
61f1ee8
Modified the output to specify the mode as overwrite.
rwilson-ebi Sep 26, 2025
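
One of the commits above, 1804c51 (feat: replace repartition with coalesce), swaps repartition for coalesce before the final Parquet write. A minimal, illustrative sketch of the difference follows; the paths are placeholders, not files from this repository:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.json("gene_statistical_results_service_json")  # placeholder path

# coalesce(100) only merges existing partitions, avoiding a full shuffle;
# it is the cheaper choice when the goal is simply fewer output files.
df.coalesce(100).write.parquet("out_coalesced", mode="overwrite")

# repartition(100) always performs a full shuffle, but it can increase the
# partition count and rebalance skewed partitions when even sizes matter.
df.repartition(100).write.parquet("out_repartitioned", mode="overwrite")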
impc_etl/jobs/load/impc_web_api/impc_batch_query_mapper.py (210 changes: 93 additions & 117 deletions)
@@ -1,124 +1,100 @@
-from impc_etl.jobs.load.impc_web_api import (
-    ImpcConfig,
-    PySparkTask,
-    SparkContext,
-    SparkSession,
-    col,
-    collect_set,
-    explode_outer,
-    luigi,
-    phenotype_term_zip_udf,
-)
+import logging
+import textwrap
+from airflow.sdk import Variable, asset
+
+from impc_etl.utils.airflow import create_input_asset, create_output_asset
+from impc_etl.utils.spark import with_spark_session
+from impc_etl.utils.impc_web_api import phenotype_term_zip_udf
+
+task_logger = logging.getLogger("airflow.task")
+dr_tag = Variable.get("data_release_tag")
+
+ortholog_mapping_report_tsv_path_asset = create_input_asset("impc_web_api/mouse_human_ortholog_report.tsv")
+mp_hp_matches_csv_path_asset = create_input_asset("impc_web_api/mp_hp_matches.csv")
+gene_stats_results_json_path_asset = create_input_asset("output/impc_web_api/gene_statistical_results_service_json")
+
+batch_query_data_parquet_asset = create_output_asset("impc_web_api/batch_query_data_parquet")
+
+@asset.multi(
+    schedule=[ortholog_mapping_report_tsv_path_asset, mp_hp_matches_csv_path_asset, gene_stats_results_json_path_asset],
+    outlets=[batch_query_data_parquet_asset],
+    dag_id=f"{dr_tag}_impc_batch_query_mapper",
+    description=textwrap.dedent(
+        """IMPC Web API batch query mapper DAG."""
+    ),
+    tags=["impc_web_api", "batch query"],
+)
-
-
-class ImpcBatchQueryMapper(PySparkTask):
-    """
-    PySpark Task class to parse GenTar Product report data.
-    """
-
-    #: Name of the Spark task
-    name: str = "ImpcBatchQueryMapper"
-
-    ortholog_mapping_report_tsv_path = luigi.Parameter()
-    mp_hp_matches_csv_path = luigi.Parameter()
-
-    #: Path of the output directory where the new parquet file will be generated.
-    output_path: luigi.Parameter = luigi.Parameter()
-
-    def requires(self):
-        return [ImpcGeneStatsResultsMapper()]
-
-    def output(self):
-        """
-        Returns the full parquet path as an output for the Luigi Task
-        (e.g. impc/dr15.2/parquet/product_report_parquet)
-        """
-        return ImpcConfig().get_target(
-            f"{self.output_path}/impc_web_api/batch_query_data_parquet"
-        )
-
-    def app_options(self):
-        """
-        Generates the options pass to the PySpark job
-        """
-        return [
-            self.ortholog_mapping_report_tsv_path,
-            self.mp_hp_matches_csv_path,
-            self.input()[0].path,
-            self.output().path,
-        ]
-
-    def main(self, sc: SparkContext, *args):
-        """
-        Takes in a SparkContext and the list of arguments generated by `app_options` and executes the PySpark job.
-        """
-        spark = SparkSession(sc)
-
-        # Parsing app options
-        ortholog_mapping_report_tsv_path = args[0]
-        mp_hp_matches_csv_path = args[1]
-        gene_stats_results_json_path = args[2]
-        output_path = args[3]
-
-        ortholog_mapping_df = spark.read.csv(
-            ortholog_mapping_report_tsv_path, sep="\t", header=True
-        )
-        stats_results = spark.read.json(gene_stats_results_json_path)
-
-        ortholog_mapping_df = ortholog_mapping_df.select(
-            col("Mgi Gene Acc Id").alias("mgiGeneAccessionId"),
-            col("Human Gene Symbol").alias("humanGeneSymbol"),
-            col("Hgnc Acc Id").alias("hgncGeneAccessionId"),
-        ).distinct()
-
-        stats_results = stats_results.join(
-            ortholog_mapping_df, "mgiGeneAccessionId", how="left_outer"
-        )
-
-        mp_matches_df = spark.read.csv(mp_hp_matches_csv_path, header=True)
-        mp_matches_df = mp_matches_df.select(
-            col("curie_x").alias("id"),
-            col("curie_y").alias("hp_term_id"),
-            col("label_y").alias("hp_term_name"),
-        ).distinct()
-
-        stats_mp_hp_df = stats_results.select(
-            "statisticalResultId",
-            "potentialPhenotypes",
-            "intermediatePhenotypes",
-            "topLevelPhenotypes",
-            "significantPhenotype",
-        )
-        for phenotype_list_col in [
-            "potentialPhenotypes",
-            "intermediatePhenotypes",
-            "topLevelPhenotypes",
-        ]:
-            stats_mp_hp_df = stats_mp_hp_df.withColumn(
-                phenotype_list_col[:-1], explode_outer(phenotype_list_col)
-            )
-
-        stats_mp_hp_df = stats_mp_hp_df.join(
-            mp_matches_df,
-            (
-                (col("significantPhenotype.id") == col("id"))
-                | (col("potentialPhenotype.id") == col("id"))
-                | (col("intermediatePhenotype.id") == col("id"))
-                | (col("topLevelPhenotype.id") == col("id"))
-            ),
-            how="left_outer",
-        )
-        stats_mp_hp_df = stats_mp_hp_df.withColumn(
-            "humanPhenotype",
-            phenotype_term_zip_udf(col("hp_term_id"), col("hp_term_name")),
-        )
-        stats_mp_hp_df = (
-            stats_mp_hp_df.groupBy("statisticalResultId")
-            .agg(collect_set("humanPhenotype").alias("humanPhenotypes"))
-            .select("statisticalResultId", "humanPhenotypes")
-            .distinct()
-        )
-
-        stats_results = stats_results.join(stats_mp_hp_df, "statisticalResultId")
-
-        stats_results.write.parquet(output_path)
+@with_spark_session
+def impc_batch_query_mapper():
+    from pyspark.sql import SparkSession
+    from pyspark.sql.functions import col, explode_outer, collect_set, when, struct, lit
+
+    spark = SparkSession.builder.getOrCreate()
+
+    ortholog_mapping_report_tsv_path = ortholog_mapping_report_tsv_path_asset.uri
+    mp_hp_matches_csv_path = mp_hp_matches_csv_path_asset.uri
+    gene_stats_results_json_path = gene_stats_results_json_path_asset.uri
+    output_path = batch_query_data_parquet_asset.uri
+
+    ortholog_mapping_df = spark.read.csv(
+        ortholog_mapping_report_tsv_path, sep="\t", header=True
+    )
+    stats_results = spark.read.json(gene_stats_results_json_path)
+
+    ortholog_mapping_df = ortholog_mapping_df.select(
+        col("Mgi Gene Acc Id").alias("mgiGeneAccessionId"),
+        col("Human Gene Symbol").alias("humanGeneSymbol"),
+        col("Hgnc Acc Id").alias("hgncGeneAccessionId"),
+    ).distinct()
+
+    stats_results = stats_results.join(
+        ortholog_mapping_df, "mgiGeneAccessionId", how="left_outer"
+    )
+
+    mp_matches_df = spark.read.csv(mp_hp_matches_csv_path, header=True)
+    mp_matches_df = mp_matches_df.select(
+        col("curie_x").alias("id"),
+        col("curie_y").alias("hp_term_id"),
+        col("label_y").alias("hp_term_name"),
+    ).distinct()
+
+    stats_mp_hp_df = stats_results.select(
+        "statisticalResultId",
+        "potentialPhenotypes",
+        "intermediatePhenotypes",
+        "topLevelPhenotypes",
+        "significantPhenotype",
+    )
+    for phenotype_list_col in [
+        "potentialPhenotypes",
+        "intermediatePhenotypes",
+        "topLevelPhenotypes",
+    ]:
+        stats_mp_hp_df = stats_mp_hp_df.withColumn(
+            phenotype_list_col[:-1], explode_outer(phenotype_list_col)
+        )
+
+    stats_mp_hp_df = stats_mp_hp_df.join(
+        mp_matches_df,
+        (
+            (col("significantPhenotype.id") == col("id"))
+            | (col("potentialPhenotype.id") == col("id"))
+            | (col("intermediatePhenotype.id") == col("id"))
+            | (col("topLevelPhenotype.id") == col("id"))
+        ),
+        how="left_outer",
+    )
+    stats_mp_hp_df = stats_mp_hp_df.withColumn(
+        "humanPhenotype",
+        phenotype_term_zip_udf(col("hp_term_id"), col("hp_term_name")),
+    )
+    stats_mp_hp_df = (
+        stats_mp_hp_df.groupBy("statisticalResultId")
+        .agg(collect_set("humanPhenotype").alias("humanPhenotypes"))
+        .select("statisticalResultId", "humanPhenotypes")
+        .distinct()
+    )
+
+    stats_results = stats_results.join(stats_mp_hp_df, "statisticalResultId")
+
+    stats_results.coalesce(100).write.parquet(output_path, mode="overwrite")
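
The phenotype_term_zip_udf helper imported from impc_etl.utils.impc_web_api is not shown in this diff. A minimal sketch of what such a UDF could look like, assuming it simply pairs an HP term id with its name into a nullable struct that collect_set can aggregate:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, StructField, StructType

# Hypothetical sketch; the real implementation lives in
# impc_etl.utils.impc_web_api and may differ.
@udf(
    returnType=StructType(
        [
            StructField("id", StringType(), True),
            StructField("name", StringType(), True),
        ]
    )
)
def phenotype_term_zip_udf(term_id, term_name):
    # Pair the HP term id with its name so callers can collect the pair
    # as a single struct; return None when there was no MP-to-HP match.
    return (term_id, term_name) if term_id is not None else None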
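
Likewise, with_spark_session from impc_etl.utils.spark is only imported here. A plausible sketch, assuming the decorator merely guarantees a SparkSession exists around the task body and stops it afterwards:

import functools

from pyspark.sql import SparkSession

# Hypothetical sketch; the real decorator in impc_etl.utils.spark may also
# apply cluster configuration (master URL, memory, extra packages, etc.).
def with_spark_session(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        spark = SparkSession.builder.getOrCreate()
        try:
            return func(*args, **kwargs)
        finally:
            spark.stop()
    return wrapper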