"""
Module to generate the mouse allele data as JSON for the KG.
"""
import logging
import textwrap

from airflow.sdk import Variable, asset

from impc_etl.utils.airflow import create_input_asset, create_output_asset
from impc_etl.utils.spark import with_spark_session

task_logger = logging.getLogger("airflow.task")

# Data-release tag (e.g. "dr23.0") used to namespace the DAG id per release.
dr_tag = Variable.get("data_release_tag")

# Upstream MGI allele reference parquet produced by the allele-ref extraction job.
allele_ref_parquet_path_asset = create_input_asset("output/allele_ref_parquet")

# NOTE(review): the input asset path has no leading slash while this one does —
# confirm create_output_asset really expects an absolute-style path here.
mouse_allele_output_asset = create_output_asset("/impc_kg/mouse_allele_json")


@asset.multi(
    schedule=[allele_ref_parquet_path_asset],
    outlets=[mouse_allele_output_asset],
    dag_id=f"{dr_tag}_impc_kg_mouse_allele_mapper",
    description=textwrap.dedent(
        """
    PySpark task to create the mouse allele Knowledge Graph JSON files
    from the MGI reference data.
    """
    ),
    tags=["impc_kg"],
)
@with_spark_session
def impc_kg_mouse_allele_mapper():
    """Map the MGI allele reference parquet to mouse-allele KG JSON.

    Reads the allele reference parquet, attaches unique ids for the gene
    and allele records, renames the output columns to camelCase and writes
    a single gzipped JSON file to the KG output asset.
    """
    # Imported lazily so the DAG file can be parsed by the scheduler without
    # a Spark runtime or the full ETL package environment available.
    from impc_etl.jobs.load.impc_kg.impc_kg_helper import add_unique_id
    from impc_etl.jobs.load.impc_web_api.impc_web_api_helper import to_camel_case
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    allele_ref_df = spark.read.parquet(allele_ref_parquet_path_asset.uri)

    # Surrogate keys: one for the owning gene, one for the allele itself.
    allele_ref_df = add_unique_id(
        allele_ref_df,
        "mouse_gene",
        ["mgi_marker_acc_id"],
    )
    allele_ref_df = add_unique_id(
        allele_ref_df,
        "mouse_allele_id",
        ["mgi_allele_acc_id"],
    )

    # Columns whose output name is not a straight camelCase of the source name.
    mouse_allele_col_map = {
        "allele_name": "name",
        "allele_symbol": "symbol",
        "mgi_allele_acc_id": "mgiAlleleAccessionId",
    }

    output_cols = [
        "mouse_allele_id",
        "allele_attribute",
        "allele_name",
        "allele_symbol",
        "mgi_allele_acc_id",
        "type",
        "synonyms",
        "ensembl_acc_id",
        "mouse_gene",
        "db_name",
    ]
    output_df = allele_ref_df.select(*output_cols).distinct()
    for col_name in output_df.columns:
        # Rename via the override map when present, else camelCase the
        # original snake_case column name.
        output_df = output_df.withColumnRenamed(
            col_name,
            to_camel_case(mouse_allele_col_map.get(col_name, col_name)),
        )
    # The frame was de-duplicated above and renames cannot introduce duplicate
    # rows, so the former second distinct() (a full extra shuffle) is dropped.
    output_df.coalesce(1).write.json(
        mouse_allele_output_asset.uri, mode="overwrite", compression="gzip"
    )