From 3d0af689de709b5f3978ee0073d91af8a77b3a70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Pen=CC=83a?= Date: Thu, 18 Sep 2025 13:09:55 +0100 Subject: [PATCH 01/16] feat: added impc_web_api utils module --- impc_etl/utils/impc_web_api.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 impc_etl/utils/impc_web_api.py diff --git a/impc_etl/utils/impc_web_api.py b/impc_etl/utils/impc_web_api.py new file mode 100644 index 00000000..a5522a35 --- /dev/null +++ b/impc_etl/utils/impc_web_api.py @@ -0,0 +1,28 @@ + +GENE_SUMMARY_MAPPINGS = { + "mgi_accession_id": "mgiGeneAccessionId", + "marker_symbol": "geneSymbol", + "marker_name": "geneName", + "marker_synonym": "synonyms", + "significant_top_level_mp_terms": "significantTopLevelPhenotypes", + "not_significant_top_level_mp_terms": "notSignificantTopLevelPhenotypes", + "embryo_data_available": "hasEmbryoImagingData", + "human_gene_symbol": "human_gene_symbols", + "human_symbol_synonym": "human_symbol_synonyms", + "production_centre": "production_centres", + "phenotyping_centre": "phenotyping_centres", + "allele_name": "allele_names", + "ensembl_gene_id": "ensembl_gene_ids", +} + +def to_camel_case(snake_str): + components = snake_str.split("_") + # We capitalize the first letter of each component except the first one + # with the 'title' method and join them together. + return components[0] + "".join(x.title() for x in components[1:]) + +def phenotype_term_zip_udf(x, y): + from pyspark.sql.functions import lit, struct, when + return when(x.isNotNull(), struct(x.alias("id"), y.alias("name"))).otherwise( + lit(None) + ) \ No newline at end of file From e755ed4eedd13d6c2512d8d5cef5f5e5f218601b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Pen=CC=83a?= Date: Mon, 15 Sep 2025 12:01:54 +0100 Subject: [PATCH 02/16] WIP: gene disease mapper task migration to Airflow --- .../impc_web_api/impc_gene_diseases_mapper.py | 217 ++++++++---------- 1 file changed, 98 insertions(+), 119 deletions(-) diff --git a/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py index 888b0e28..a7b0d544 100644 --- a/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py +++ b/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py @@ -1,130 +1,109 @@ from impc_etl.jobs.load.impc_web_api import ( - BooleanType, - DoubleType, - ImpcConfig, - IntegerType, - PySparkTask, - SparkContext, - SparkSession, Window, - col, - desc, - luigi, - row_number, - to_camel_case, ) - -class ImpcGeneDiseasesMapper(PySparkTask): - """ - PySpark Task class to parse GenTar Product report data. - """ - - #: Name of the Spark task - name: str = "ImpcGeneDiseasesMapper" - - #: Path to the CSV gene disease association report - disease_model_summary_csv_path = luigi.Parameter() - - #: Path to the CSV gene disease association report - mouse_model_phenodigm_csv_path = luigi.Parameter() - - #: Path to the CSV gene disease association report - disease_phenodigm_csv_path = luigi.Parameter() - - #: Path of the output directory where the new parquet file will be generated. - output_path: luigi.Parameter = luigi.Parameter() - - def output(self): - """ - Returns the full parquet path as an output for the Luigi Task - (e.g. 
impc/dr15.2/parquet/product_report_parquet) - """ - return ImpcConfig().get_target( - f"{self.output_path}/impc_web_api/gene_diseases_service_json" - ) - - def app_options(self): - """ - Generates the options pass to the PySpark job - """ - return [ - self.disease_model_summary_csv_path, - self.mouse_model_phenodigm_csv_path, - self.disease_phenodigm_csv_path, - self.output().path, - ] - - def main(self, sc: SparkContext, *args): - """ - Takes in a SparkContext and the list of arguments generated by `app_options` and executes the PySpark job. - """ - spark = SparkSession(sc) - - # Parsing app options - disease_model_summary_csv_path = args[0] - mouse_model_phenodigm_csv_path = args[1] # model_id, model_phenotypes - disease_phenodigm_csv_path = args[2] # disease_id, disease_phenotypes - output_path = args[3] - - disease_df = spark.read.csv(disease_model_summary_csv_path, header=True).drop( - "disease_phenotypes", "model_phenotypes" - ) - mouse_model_df = spark.read.csv( - mouse_model_phenodigm_csv_path, header=True - ).select("model_id", "model_phenotypes") - disease_phenodigm_df = spark.read.csv( - disease_phenodigm_csv_path, header=True - ).select("disease_id", "disease_phenotypes") - - disease_df = disease_df.withColumn( - "phenodigm_score", - (col("disease_model_avg_norm") + col("disease_model_max_norm")) / 2, - ) - - disease_df = disease_df.join(disease_phenodigm_df, "disease_id", "left_outer") - disease_df = disease_df.join(mouse_model_df, "model_id", "left_outer") - - window_spec = Window.partitionBy("disease_id", "marker_id").orderBy( - col("phenodigm_score").desc() - ) - - max_disease_df = disease_df.withColumn( - "row_number", row_number().over(window_spec) - ) - - max_disease_df = max_disease_df.withColumn( - "isMaxPhenodigmScore", col("row_number") == 1 - ).drop("row_number") - +import logging +import textwrap +from airflow.sdk import Variable, asset + +from impc_etl.utils.airflow import create_input_asset, create_output_asset +from impc_etl.utils.spark import with_spark_session + +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") + +disease_model_summary_csv_path_asset = create_input_asset("impc_web_api/disease_model_summary.csv") +mouse_model_phenodigm_csv_path_asset = create_input_asset("impc_web_api/mouse_model_phenodigm.csv") +disease_phenodigm_csv_path_asset = create_input_asset("impc_web_api/disease_phenodigm.csv") +gene_diseases_service_json_asset = create_output_asset("impc_web_api/gene_diseases_service_json") + +@asset.multi( + schedule=[disease_model_summary_csv_path_asset, mouse_model_phenodigm_csv_path_asset, disease_phenodigm_csv_path_asset], + outlets=[gene_diseases_service_json_asset], + dag_id=f"{dr_tag}_impc_gene_diseases_mapper", + description=textwrap.dedent( + """IMPC Web API gene diseases mapper DAG.""" + ), + tags=["impc_web_api", "diseases"], +) +@with_spark_session +def impc_gene_diseases_mapper(): + from pyspark.sql import SparkSession, Window + from pyspark.sql.types import BooleanType, DoubleType, IntegerType + from pyspark.sql.functions import col, row_number + + def to_camel_case(snake_str): + components = snake_str.split("_") + # We capitalize the first letter of each component except the first one + # with the 'title' method and join them together. 
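+        # Illustrative examples (editorial, not part of the original commit):
+        #   to_camel_case("marker_symbol")    -> "markerSymbol"
+        #   to_camel_case("mgi_accession_id") -> "mgiAccessionId"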
+ return components[0] + "".join(x.title() for x in components[1:]) + + spark = SparkSession.builder.getOrCreate() + + # Parsing app options + disease_model_summary_csv_path = disease_model_summary_csv_path_asset.uri + mouse_model_phenodigm_csv_path = mouse_model_phenodigm_csv_path_asset.uri # model_id, model_phenotypes + disease_phenodigm_csv_path = disease_phenodigm_csv_path_asset.uri # disease_id, disease_phenotypes + output_path = gene_diseases_service_json_asset.uri + + disease_df = spark.read.csv(disease_model_summary_csv_path, header=True).drop( + "disease_phenotypes", "model_phenotypes" + ) + mouse_model_df = spark.read.csv( + mouse_model_phenodigm_csv_path, header=True + ).select("model_id", "model_phenotypes") + disease_phenodigm_df = spark.read.csv( + disease_phenodigm_csv_path, header=True + ).select("disease_id", "disease_phenotypes") + + disease_df = disease_df.withColumn( + "phenodigm_score", + (col("disease_model_avg_norm") + col("disease_model_max_norm")) / 2, + ) + + disease_df = disease_df.join(disease_phenodigm_df, "disease_id", "left_outer") + disease_df = disease_df.join(mouse_model_df, "model_id", "left_outer") + + window_spec = Window.partitionBy("disease_id", "marker_id").orderBy( + col("phenodigm_score").desc() + ) + + max_disease_df = disease_df.withColumn( + "row_number", row_number().over(window_spec) + ) + + max_disease_df = max_disease_df.withColumn( + "isMaxPhenodigmScore", col("row_number") == 1 + ).drop("row_number") + + max_disease_df = max_disease_df.withColumnRenamed( + "marker_id", "mgiGeneAccessionId" + ) + + for col_name in max_disease_df.columns: max_disease_df = max_disease_df.withColumnRenamed( - "marker_id", "mgiGeneAccessionId" + col_name, to_camel_case(col_name) ) - for col_name in max_disease_df.columns: - max_disease_df = max_disease_df.withColumnRenamed( - col_name, to_camel_case(col_name) - ) - - double_cols = [ - "diseaseModelAvgNorm", - "diseaseModelAvgRaw", - "diseaseModelMaxRaw", - "diseaseModelMaxNorm", - ] - - for col_name in double_cols: - max_disease_df = max_disease_df.withColumn( - col_name, col(col_name).astype(DoubleType()) - ) + double_cols = [ + "diseaseModelAvgNorm", + "diseaseModelAvgRaw", + "diseaseModelMaxRaw", + "diseaseModelMaxNorm", + ] + for col_name in double_cols: max_disease_df = max_disease_df.withColumn( - "markerNumModels", col("markerNumModels").astype(IntegerType()) - ) - max_disease_df = max_disease_df.withColumn( - "associationCurated", col("associationCurated").astype(BooleanType()) + col_name, col(col_name).astype(DoubleType()) ) - max_disease_df.repartition(500).write.option("ignoreNullFields", "false").json( - output_path - ) \ No newline at end of file + max_disease_df = max_disease_df.withColumn( + "markerNumModels", col("markerNumModels").astype(IntegerType()) + ) + max_disease_df = max_disease_df.withColumn( + "associationCurated", col("associationCurated").astype(BooleanType()) + ) + + max_disease_df.repartition(500).write.option("ignoreNullFields", "false").json( + output_path + ) \ No newline at end of file From 77215fb199b7f174a28ba9a972bd16a1668edb6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Pen=CC=83a?= Date: Mon, 15 Sep 2025 13:15:58 +0100 Subject: [PATCH 03/16] feat: remove import from local module --- impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py index a7b0d544..ff1cd3a8 100644 --- 
a/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py +++ b/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py @@ -1,7 +1,3 @@ -from impc_etl.jobs.load.impc_web_api import ( - Window, -) - import logging import textwrap from airflow.sdk import Variable, asset From ec7b0e452b232860bf94f96a5642bd1003b666e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Pen=CC=83a?= Date: Mon, 15 Sep 2025 15:12:13 +0100 Subject: [PATCH 04/16] fix: explicitly cast disease_model_avg_norm and disease_model_max_norm columns as Double --- impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py index ff1cd3a8..94676915 100644 --- a/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py +++ b/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py @@ -54,7 +54,7 @@ def to_camel_case(snake_str): disease_df = disease_df.withColumn( "phenodigm_score", - (col("disease_model_avg_norm") + col("disease_model_max_norm")) / 2, + (col("disease_model_avg_norm").cast(DoubleType()) + col("disease_model_max_norm").cast(DoubleType())) / 2, ) disease_df = disease_df.join(disease_phenodigm_df, "disease_id", "left_outer") From 1804c51ceba5f594a67147d078d09b7ef7540dfb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Pen=CC=83a?= Date: Tue, 16 Sep 2025 12:22:42 +0100 Subject: [PATCH 05/16] feat: replace repartition with coalesce --- impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py index 94676915..c286c61a 100644 --- a/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py +++ b/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py @@ -100,6 +100,6 @@ def to_camel_case(snake_str): "associationCurated", col("associationCurated").astype(BooleanType()) ) - max_disease_df.repartition(500).write.option("ignoreNullFields", "false").json( + max_disease_df.coalesce(100).write.option("ignoreNullFields", "false").json( output_path ) \ No newline at end of file From b47898469da26a178439aecfcceb62f2587cf7d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Pen=CC=83a?= Date: Thu, 18 Sep 2025 14:45:32 +0100 Subject: [PATCH 06/16] feat: remove to_camel_case function from impc_gene_diseases_mapper task, use shared one --- .../jobs/load/impc_web_api/impc_gene_diseases_mapper.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py index c286c61a..87dd658c 100644 --- a/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py +++ b/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py @@ -4,6 +4,7 @@ from impc_etl.utils.airflow import create_input_asset, create_output_asset from impc_etl.utils.spark import with_spark_session +from impc_etl.utils.impc_web_api import to_camel_case task_logger = logging.getLogger("airflow.task") dr_tag = Variable.get("data_release_tag") @@ -28,12 +29,6 @@ def impc_gene_diseases_mapper(): from pyspark.sql.types import BooleanType, DoubleType, IntegerType from pyspark.sql.functions import col, row_number - def to_camel_case(snake_str): - components = snake_str.split("_") - # We capitalize the first letter of each component except 
the first one - # with the 'title' method and join them together. - return components[0] + "".join(x.title() for x in components[1:]) - spark = SparkSession.builder.getOrCreate() # Parsing app options From d9797f23ee34aefd3384212395745f5cb5e26f34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Pen=CC=83a?= Date: Wed, 17 Sep 2025 09:59:30 +0100 Subject: [PATCH 07/16] feat: initial impc_batch_query mapper task --- .../impc_web_api/impc_batch_query_mapper.py | 210 ++++++++---------- 1 file changed, 95 insertions(+), 115 deletions(-) diff --git a/impc_etl/jobs/load/impc_web_api/impc_batch_query_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_batch_query_mapper.py index 409727bb..fa3e68f7 100644 --- a/impc_etl/jobs/load/impc_web_api/impc_batch_query_mapper.py +++ b/impc_etl/jobs/load/impc_web_api/impc_batch_query_mapper.py @@ -1,124 +1,104 @@ -from impc_etl.jobs.load.impc_web_api import ( - ImpcConfig, - PySparkTask, - SparkContext, - SparkSession, - col, - collect_set, - explode_outer, - luigi, - phenotype_term_zip_udf, +import logging +import textwrap +from airflow.sdk import Variable, asset + +from impc_etl.utils.airflow import create_input_asset, create_output_asset +from impc_etl.utils.spark import with_spark_session + +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") + +ortholog_mapping_report_tsv_path_asset = create_input_asset("impc_web_api/mouse_human_ortholog_report.tsv") +mp_hp_matches_csv_path_asset = create_input_asset("impc_web_api/mp_hp_matches.csv") +gene_stats_results_json_path_asset = create_input_asset("output/impc_web_api/gene_statistical_results_service_json") + +batch_query_data_parquet_asset = create_output_asset("impc_web_api/batch_query_data_parquet") + +@asset.multi( + schedule=[ortholog_mapping_report_tsv_path_asset, mp_hp_matches_csv_path_asset, gene_stats_results_json_path_asset], + outlets=[batch_query_data_parquet_asset], + dag_id=f"{dr_tag}_impc_batch_query_mapper", + description=textwrap.dedent( + """IMPC Web API batch query mapper DAG.""" + ), + tags=["impc_web_api", "batch query"], ) - - -class ImpcBatchQueryMapper(PySparkTask): - """ - PySpark Task class to parse GenTar Product report data. - """ - - #: Name of the Spark task - name: str = "ImpcBatchQueryMapper" - - ortholog_mapping_report_tsv_path = luigi.Parameter() - mp_hp_matches_csv_path = luigi.Parameter() - - #: Path of the output directory where the new parquet file will be generated. - output_path: luigi.Parameter = luigi.Parameter() - - def requires(self): - return [ImpcGeneStatsResultsMapper()] - - def output(self): - """ - Returns the full parquet path as an output for the Luigi Task - (e.g. impc/dr15.2/parquet/product_report_parquet) - """ - return ImpcConfig().get_target( - f"{self.output_path}/impc_web_api/batch_query_data_parquet" - ) - - def app_options(self): - """ - Generates the options pass to the PySpark job - """ - return [ - self.ortholog_mapping_report_tsv_path, - self.mp_hp_matches_csv_path, - self.input()[0].path, - self.output().path, - ] - - def main(self, sc: SparkContext, *args): - """ - Takes in a SparkContext and the list of arguments generated by `app_options` and executes the PySpark job. 
- """ - spark = SparkSession(sc) - - # Parsing app options - ortholog_mapping_report_tsv_path = args[0] - mp_hp_matches_csv_path = args[1] - gene_stats_results_json_path = args[2] - output_path = args[3] - - ortholog_mapping_df = spark.read.csv( - ortholog_mapping_report_tsv_path, sep="\t", header=True +@with_spark_session +def impc_batch_query_mapper(): + from pyspark.sql import SparkSession + from pyspark.sql.functions import col, explode_outer, collect_set, when, struct, lit + + def phenotype_term_zip_udf(x, y): + return when(x.isNotNull(), struct(x.alias("id"), y.alias("name"))).otherwise( + lit(None) ) - stats_results = spark.read.json(gene_stats_results_json_path) - ortholog_mapping_df = ortholog_mapping_df.select( - col("Mgi Gene Acc Id").alias("mgiGeneAccessionId"), - col("Human Gene Symbol").alias("humanGeneSymbol"), - col("Hgnc Acc Id").alias("hgncGeneAccessionId"), - ).distinct() - - stats_results = stats_results.join( - ortholog_mapping_df, "mgiGeneAccessionId", how="left_outer" + spark = SparkSession.builder.getOrCreate() + + ortholog_mapping_report_tsv_path = ortholog_mapping_report_tsv_path_asset.uri + mp_hp_matches_csv_path = mp_hp_matches_csv_path_asset.uri + gene_stats_results_json_path = gene_stats_results_json_path_asset.uri + output_path = batch_query_data_parquet_asset.uri + + ortholog_mapping_df = spark.read.csv( + ortholog_mapping_report_tsv_path, sep="\t", header=True + ) + stats_results = spark.read.json(gene_stats_results_json_path) + + ortholog_mapping_df = ortholog_mapping_df.select( + col("Mgi Gene Acc Id").alias("mgiGeneAccessionId"), + col("Human Gene Symbol").alias("humanGeneSymbol"), + col("Hgnc Acc Id").alias("hgncGeneAccessionId"), + ).distinct() + + stats_results = stats_results.join( + ortholog_mapping_df, "mgiGeneAccessionId", how="left_outer" + ) + + mp_matches_df = spark.read.csv(mp_hp_matches_csv_path, header=True) + mp_matches_df = mp_matches_df.select( + col("curie_x").alias("id"), + col("curie_y").alias("hp_term_id"), + col("label_y").alias("hp_term_name"), + ).distinct() + + stats_mp_hp_df = stats_results.select( + "statisticalResultId", + "potentialPhenotypes", + "intermediatePhenotypes", + "topLevelPhenotypes", + "significantPhenotype", + ) + for phenotype_list_col in [ + "potentialPhenotypes", + "intermediatePhenotypes", + "topLevelPhenotypes", + ]: + stats_mp_hp_df = stats_mp_hp_df.withColumn( + phenotype_list_col[:-1], explode_outer(phenotype_list_col) ) - mp_matches_df = spark.read.csv(mp_hp_matches_csv_path, header=True) - mp_matches_df = mp_matches_df.select( - col("curie_x").alias("id"), - col("curie_y").alias("hp_term_id"), - col("label_y").alias("hp_term_name"), - ).distinct() - - stats_mp_hp_df = stats_results.select( - "statisticalResultId", - "potentialPhenotypes", - "intermediatePhenotypes", - "topLevelPhenotypes", - "significantPhenotype", - ) - for phenotype_list_col in [ - "potentialPhenotypes", - "intermediatePhenotypes", - "topLevelPhenotypes", - ]: - stats_mp_hp_df = stats_mp_hp_df.withColumn( - phenotype_list_col[:-1], explode_outer(phenotype_list_col) - ) - - stats_mp_hp_df = stats_mp_hp_df.join( - mp_matches_df, - ( + stats_mp_hp_df = stats_mp_hp_df.join( + mp_matches_df, + ( (col("significantPhenotype.id") == col("id")) | (col("potentialPhenotype.id") == col("id")) | (col("intermediatePhenotype.id") == col("id")) | (col("topLevelPhenotype.id") == col("id")) - ), - how="left_outer", - ) - stats_mp_hp_df = stats_mp_hp_df.withColumn( - "humanPhenotype", - phenotype_term_zip_udf(col("hp_term_id"), col("hp_term_name")), - ) 
- stats_mp_hp_df = ( - stats_mp_hp_df.groupBy("statisticalResultId") - .agg(collect_set("humanPhenotype").alias("humanPhenotypes")) - .select("statisticalResultId", "humanPhenotypes") - .distinct() - ) - - stats_results = stats_results.join(stats_mp_hp_df, "statisticalResultId") - - stats_results.write.parquet(output_path) \ No newline at end of file + ), + how="left_outer", + ) + stats_mp_hp_df = stats_mp_hp_df.withColumn( + "humanPhenotype", + phenotype_term_zip_udf(col("hp_term_id"), col("hp_term_name")), + ) + stats_mp_hp_df = ( + stats_mp_hp_df.groupBy("statisticalResultId") + .agg(collect_set("humanPhenotype").alias("humanPhenotypes")) + .select("statisticalResultId", "humanPhenotypes") + .distinct() + ) + + stats_results = stats_results.join(stats_mp_hp_df, "statisticalResultId") + + stats_results.coalesce(100).write.parquet(output_path, mode="overwrite") From a4ee4519fc2f894b17950c93af50c74ff452d90a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Pen=CC=83a?= Date: Thu, 18 Sep 2025 14:47:32 +0100 Subject: [PATCH 08/16] feat: use phenotype_term_zip_udf function from utils module --- impc_etl/jobs/load/impc_web_api/impc_batch_query_mapper.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/impc_etl/jobs/load/impc_web_api/impc_batch_query_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_batch_query_mapper.py index fa3e68f7..f579a40e 100644 --- a/impc_etl/jobs/load/impc_web_api/impc_batch_query_mapper.py +++ b/impc_etl/jobs/load/impc_web_api/impc_batch_query_mapper.py @@ -4,6 +4,7 @@ from impc_etl.utils.airflow import create_input_asset, create_output_asset from impc_etl.utils.spark import with_spark_session +from impc_etl.utils.impc_web_api import phenotype_term_zip_udf task_logger = logging.getLogger("airflow.task") dr_tag = Variable.get("data_release_tag") @@ -28,11 +29,6 @@ def impc_batch_query_mapper(): from pyspark.sql import SparkSession from pyspark.sql.functions import col, explode_outer, collect_set, when, struct, lit - def phenotype_term_zip_udf(x, y): - return when(x.isNotNull(), struct(x.alias("id"), y.alias("name"))).otherwise( - lit(None) - ) - spark = SparkSession.builder.getOrCreate() ortholog_mapping_report_tsv_path = ortholog_mapping_report_tsv_path_asset.uri From 53ffc4d65f8ec4ae6a068508d5dec837e434b7bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Pen=CC=83a?= Date: Wed, 17 Sep 2025 12:54:55 +0100 Subject: [PATCH 09/16] feat: initial impc_gene_search_mapper airflow task --- .../impc_web_api/impc_gene_search_mapper.py | 134 +++++++++--------- 1 file changed, 67 insertions(+), 67 deletions(-) diff --git a/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py index b6ac0fde..cd162f6d 100644 --- a/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py +++ b/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py @@ -1,78 +1,78 @@ -from impc_etl.jobs.load.impc_web_api import ( - GENE_SUMMARY_MAPPINGS, - GeneLoader, - ImpcConfig, - PySparkTask, - SparkContext, - SparkSession, - luigi, - to_camel_case, -) - +import logging +import textwrap +from airflow.sdk import Variable, asset -class ImpcGeneSearchMapper(PySparkTask): - """ - PySpark Task class to parse GenTar Product report data. 
- """ +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") - #: Name of the Spark task - name: str = "Impc_Gene_Search_Mapper" +from impc_etl.utils.airflow import create_input_asset, create_output_asset +from impc_etl.utils.spark import with_spark_session - #: Path of the output directory where the new parquet file will be generated. - output_path: luigi.Parameter = luigi.Parameter() +gene_parquet_path_asset = create_input_asset("output/gene_data_include_parquet") +gene_search_service_json_asset = create_output_asset("impc_web_api/gene_search_service_json") - def requires(self): - return GeneLoader() +@asset.multi( + schedule=[gene_parquet_path_asset], + outlets=[gene_search_service_json_asset], + dag_id=f"{dr_tag}_impc_gene_search_mapper", + description=textwrap.dedent( + """IMPC Web API gene search mapper DAG.""" + ), + tags=["impc_web_api", "gene search"], +) +@with_spark_session +def impc_gene_search_mapper(): + from pyspark.sql import SparkSession - def output(self): - """ - Returns the full parquet path as an output for the Luigi Task - (e.g. impc/dr15.2/parquet/product_report_parquet) - """ - return ImpcConfig().get_target( - f"{self.output_path}/impc_web_api/gene_search_service_json" - ) + GENE_SUMMARY_MAPPINGS = { + "mgi_accession_id": "mgiGeneAccessionId", + "marker_symbol": "geneSymbol", + "marker_name": "geneName", + "marker_synonym": "synonyms", + "significant_top_level_mp_terms": "significantTopLevelPhenotypes", + "not_significant_top_level_mp_terms": "notSignificantTopLevelPhenotypes", + "embryo_data_available": "hasEmbryoImagingData", + "human_gene_symbol": "human_gene_symbols", + "human_symbol_synonym": "human_symbol_synonyms", + "production_centre": "production_centres", + "phenotyping_centre": "phenotyping_centres", + "allele_name": "allele_names", + "ensembl_gene_id": "ensembl_gene_ids", + } - def app_options(self): - """ - Generates the options pass to the PySpark job - """ - return [ - self.input().path, - self.output().path, - ] + def to_camel_case(snake_str): + components = snake_str.split("_") + # We capitalize the first letter of each component except the first one + # with the 'title' method and join them together. + return components[0] + "".join(x.title() for x in components[1:]) - def main(self, sc: SparkContext, *args): - """ - Takes in a SparkContext and the list of arguments generated by `app_options` and executes the PySpark job. 
- """ - spark = SparkSession(sc) + spark = SparkSession.builder.getOrCreate() - # Parsing app options - gene_parquet_path = args[0] - gene_df = spark.read.parquet(gene_parquet_path) - output_path = args[1] + # Parsing app options + gene_parquet_path = gene_parquet_path_asset.uri + gene_df = spark.read.parquet(gene_parquet_path) + output_path = gene_search_service_json_asset.uri - for col_name in GENE_SUMMARY_MAPPINGS.keys(): - gene_df = gene_df.withColumnRenamed( - col_name, GENE_SUMMARY_MAPPINGS[col_name] - ) + for col_name in GENE_SUMMARY_MAPPINGS.keys(): + gene_df = gene_df.withColumnRenamed( + col_name, GENE_SUMMARY_MAPPINGS[col_name] + ) - for col_name in gene_df.columns: - gene_df = gene_df.withColumnRenamed(col_name, to_camel_case(col_name)) + for col_name in gene_df.columns: + gene_df = gene_df.withColumnRenamed(col_name, to_camel_case(col_name)) - gene_search_df = gene_df.select( - "mgiGeneAccessionId", - "geneName", - "geneSymbol", - "synonyms", - "humanGeneSymbols", - "humanSymbolSynonyms", - "esCellProductionStatus", - "mouseProductionStatus", - "phenotypeStatus", - "phenotypingDataAvailable", - ) - gene_search_df.repartition(1).write.option("ignoreNullFields", "false").json( - output_path - ) \ No newline at end of file + gene_search_df = gene_df.select( + "mgiGeneAccessionId", + "geneName", + "geneSymbol", + "synonyms", + "humanGeneSymbols", + "humanSymbolSynonyms", + "esCellProductionStatus", + "mouseProductionStatus", + "phenotypeStatus", + "phenotypingDataAvailable", + ) + gene_search_df.repartition(1).write.option("ignoreNullFields", "false").json( + output_path + ) From 7f8dda25144c32c7adf89eb641f1dec82cd8e233 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Pen=CC=83a?= Date: Thu, 18 Sep 2025 15:09:49 +0100 Subject: [PATCH 10/16] feat: use GENE_SUMMARY_MAPPINGS from utils module --- .../impc_web_api/impc_gene_search_mapper.py | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py index cd162f6d..a56ea9e6 100644 --- a/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py +++ b/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py @@ -7,6 +7,7 @@ from impc_etl.utils.airflow import create_input_asset, create_output_asset from impc_etl.utils.spark import with_spark_session +from impc_etl.utils.impc_web_api import GENE_SUMMARY_MAPPINGS gene_parquet_path_asset = create_input_asset("output/gene_data_include_parquet") gene_search_service_json_asset = create_output_asset("impc_web_api/gene_search_service_json") @@ -24,22 +25,6 @@ def impc_gene_search_mapper(): from pyspark.sql import SparkSession - GENE_SUMMARY_MAPPINGS = { - "mgi_accession_id": "mgiGeneAccessionId", - "marker_symbol": "geneSymbol", - "marker_name": "geneName", - "marker_synonym": "synonyms", - "significant_top_level_mp_terms": "significantTopLevelPhenotypes", - "not_significant_top_level_mp_terms": "notSignificantTopLevelPhenotypes", - "embryo_data_available": "hasEmbryoImagingData", - "human_gene_symbol": "human_gene_symbols", - "human_symbol_synonym": "human_symbol_synonyms", - "production_centre": "production_centres", - "phenotyping_centre": "phenotyping_centres", - "allele_name": "allele_names", - "ensembl_gene_id": "ensembl_gene_ids", - } - def to_camel_case(snake_str): components = snake_str.split("_") # We capitalize the first letter of each component except the first one From b7f28f4e5c8fb025a462c7cacf1b90c378338091 Mon Sep 17 00:00:00 2001 
From: =?UTF-8?q?Francisco=20Pen=CC=83a?= Date: Thu, 18 Sep 2025 15:16:07 +0100 Subject: [PATCH 11/16] feat: set overwrite mode when saving results --- impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py index a56ea9e6..b12e1dd5 100644 --- a/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py +++ b/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py @@ -58,6 +58,6 @@ def to_camel_case(snake_str): "phenotypeStatus", "phenotypingDataAvailable", ) - gene_search_df.repartition(1).write.option("ignoreNullFields", "false").json( + gene_search_df.repartition(1).write.option("ignoreNullFields", "false").mode("overwrite").json( output_path ) From ae47f383551e3e5092eccf927b9c8a1b8c572044 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Pen=CC=83a?= Date: Thu, 18 Sep 2025 10:32:50 +0100 Subject: [PATCH 12/16] feat: initial impc_idg_mapper airflow task --- .../jobs/load/impc_web_api/impc_idg_mapper.py | 160 ++++++++---------- 1 file changed, 69 insertions(+), 91 deletions(-) diff --git a/impc_etl/jobs/load/impc_web_api/impc_idg_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_idg_mapper.py index fe81d966..4f02eb59 100644 --- a/impc_etl/jobs/load/impc_web_api/impc_idg_mapper.py +++ b/impc_etl/jobs/load/impc_web_api/impc_idg_mapper.py @@ -1,93 +1,71 @@ -from impc_etl.jobs.load.impc_web_api import ( - GeneLoader, - ImpcConfig, - PySparkTask, - SparkContext, - SparkSession, - col, - luigi, +import logging +import textwrap +from airflow.sdk import Variable, asset +from impc_etl.utils.airflow import create_input_asset, create_output_asset +from impc_etl.utils.spark import with_spark_session + +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") + +gene_parquet_path_asset = create_input_asset("output/gene_data_include_parquet") +idg_family_mapping_report_json_path_asset = create_input_asset("impc_web_api/IDG_TargetList_CurrentVersion.json") +ortholog_mapping_report_tsv_path_asset = create_input_asset("impc_web_api/mouse_human_ortholog_report.tsv") +idg_landing_json_asset = create_output_asset("impc_web_api/idg_landing.json") + +@asset.multi( + schedule=[idg_family_mapping_report_json_path_asset], + outlets=[idg_landing_json_asset], + dag_id=f"{dr_tag}_impc_idg_landing_mapper", + description=textwrap.dedent( + """IMPC Web API IDG landing page mapper DAG.""" + ), + tags=["impc_web_api", "idg", "landing-page"], ) +@with_spark_session +def impc_idg_mapper(): + import json + from pyspark.sql import SparkSession + from pyspark.sql.functions import col + from urllib.parse import unquote, urlparse + + spark = SparkSession.builder.getOrCreate() + + # Parsing app options + idg_family_mapping_report_json_path = idg_family_mapping_report_json_path_asset.uri + ortholog_mapping_report_tsv_path = ortholog_mapping_report_tsv_path_asset.uri + gene_parquet_path = gene_parquet_path_asset.uri + + idg_family_df = spark.read.json(idg_family_mapping_report_json_path) + ortholog_mapping_df = spark.read.csv( + ortholog_mapping_report_tsv_path, sep="\t", header=True + ) + gene_df = spark.read.parquet(gene_parquet_path) + + gene_df = gene_df.select( + "mgi_accession_id", + "marker_symbol", + "significant_top_level_mp_terms", + "not_significant_top_level_mp_terms", + "phenotype_status", + "mouse_production_status", + "es_cell_production_status", + ).distinct() + + ortholog_mapping_df = 
ortholog_mapping_df.select( + col("Mgi Gene Acc Id").alias("mgi_accession_id"), + col("Human Gene Symbol").alias("human_gene_symbol"), + ).distinct() + + gene_df = gene_df.join( + ortholog_mapping_df, + "mgi_accession_id", + ) + idg_family_df = idg_family_df.withColumnRenamed("Gene", "human_gene_symbol") + idg_family_df = idg_family_df.withColumnRenamed("IDGFamily", "idg_family") + gene_df = gene_df.join(idg_family_df, "human_gene_symbol") + + idg_landing_json = gene_df.rdd.map(lambda row: row.asDict(True)).collect() + output_path = unquote(urlparse(idg_landing_json_asset.uri).path) + with open(output_path, "w") as output_file: + output_file.write(json.dumps(idg_landing_json)) - -class ImpcIDGMapper(PySparkTask): - """ - PySpark Task class to parse GenTar Product report data. - """ - - #: Name of the Spark task - name: str = "ImpcIDGMapper" - - ortholog_mapping_report_tsv_path = luigi.Parameter() - idg_family_mapping_report_json_path = luigi.Parameter() - - #: Path of the output directory where the new parquet file will be generated. - output_path: luigi.Parameter = luigi.Parameter() - - def requires(self): - return [GeneLoader()] - - def output(self): - """ - Returns the full parquet path as an output for the Luigi Task - (e.g. impc/dr15.2/parquet/product_report_parquet) - """ - return ImpcConfig().get_target( - f"{self.output_path}/impc_web_api/idg_landing.json" - ) - - def app_options(self): - """ - Generates the options pass to the PySpark job - """ - return [ - self.idg_family_mapping_report_json_path, - self.ortholog_mapping_report_tsv_path, - self.input()[0].path, - self.output().path, - ] - - def main(self, sc: SparkContext, *args): - """ - Takes in a SparkContext and the list of arguments generated by `app_options` and executes the PySpark job. 
- """ - spark = SparkSession(sc) - - # Parsing app options - idg_family_mapping_report_json_path = args[0] - ortholog_mapping_report_tsv_path = args[1] - gene_parquet_path = args[2] - output_path = args[3] - - idg_family_df = spark.read.json(idg_family_mapping_report_json_path) - ortholog_mapping_df = spark.read.csv( - ortholog_mapping_report_tsv_path, sep="\t", header=True - ) - gene_df = spark.read.parquet(gene_parquet_path) - - gene_df = gene_df.select( - "mgi_accession_id", - "marker_symbol", - "significant_top_level_mp_terms", - "not_significant_top_level_mp_terms", - "phenotype_status", - "mouse_production_status", - "es_cell_production_status", - ).distinct() - - ortholog_mapping_df = ortholog_mapping_df.select( - col("Mgi Gene Acc Id").alias("mgi_accession_id"), - col("Human Gene Symbol").alias("human_gene_symbol"), - ).distinct() - - gene_df = gene_df.join( - ortholog_mapping_df, - "mgi_accession_id", - ) - idg_family_df = idg_family_df.withColumnRenamed("Gene", "human_gene_symbol") - idg_family_df = idg_family_df.withColumnRenamed("IDGFamily", "idg_family") - gene_df = gene_df.join(idg_family_df, "human_gene_symbol") - - idg_landing_json = gene_df.rdd.map(lambda row: row.asDict(True)).collect() - - with open(output_path, mode="w") as output_file: - output_file.write(json.dumps(idg_landing_json)) \ No newline at end of file From d274972bda51dc2dfb2bfd20c239cf3e4d95f87b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Pen=CC=83a?= Date: Thu, 18 Sep 2025 12:58:33 +0100 Subject: [PATCH 13/16] feat: initial impc_external_links_mapper airflow task --- .../impc_external_links_mapper.py | 473 ++++++++---------- 1 file changed, 220 insertions(+), 253 deletions(-) diff --git a/impc_etl/jobs/load/impc_web_api/impc_external_links_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_external_links_mapper.py index 01f01362..a5d9f0c5 100644 --- a/impc_etl/jobs/load/impc_web_api/impc_external_links_mapper.py +++ b/impc_etl/jobs/load/impc_web_api/impc_external_links_mapper.py @@ -1,259 +1,226 @@ -from impc_etl.jobs.load.impc_web_api import ( - GeneLoader, - ImpcConfig, - PySparkTask, - SparkContext, - SparkSession, - StringType, - StructField, - StructType, - col, - concat, - concat_ws, - lit, - lower, - luigi, - regexp_replace, - trim, +import logging +import textwrap +from airflow.sdk import Variable, asset +from impc_etl.utils.airflow import create_input_asset, create_output_asset +from impc_etl.utils.spark import with_spark_session + +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") + +mouse_human_ortholog_report_tsv_path_asset = create_input_asset("impc_web_api/mouse_human_ortholog_report.tsv") +umass_early_lethal_report_csv_path_asset = create_input_asset("impc_web_api/umass_early_lethal_report.csv") +uniprot_report_csv_path_asset = create_input_asset("impc_web_api/uniprot_report.tsv") +morphic_report_csv_path_asset = create_input_asset("impc_web_api/morphic_report.csv") +gene_parquet_path_asset = create_input_asset("output/gene_data_include_parquet") + +external_links_json_asset = create_output_asset("impc_web_api/external_links_json") + +@asset.multi( + schedule=[mouse_human_ortholog_report_tsv_path_asset, umass_early_lethal_report_csv_path_asset, uniprot_report_csv_path_asset, morphic_report_csv_path_asset, gene_parquet_path_asset], + outlets=[external_links_json_asset], + dag_id=f"{dr_tag}_impc_external_links_mapper", + description=textwrap.dedent( + """IMPC Web API external links mapper DAG.""" + ), + tags=["impc_web_api", "external links"], ) 
- - -class ImpcExternalLinksMapper(PySparkTask): - """ - PySpark Task class to parse GenTar Product report data. - """ - - #: Name of the Spark task - name: str = "ImpcExternalLinksMapper" - - mouse_human_ortholog_report_tsv_path: luigi.Parameter = luigi.Parameter() - umass_early_lethal_report_csv_path: luigi.Parameter = luigi.Parameter() - uniprot_report_csv_path: luigi.Parameter = luigi.Parameter() - morphic_report_csv_path: luigi.Parameter = luigi.Parameter() - #: Path of the output directory where the new parquet file will be generated. - output_path: luigi.Parameter = luigi.Parameter() - - def requires(self): - return [GeneLoader()] - - def output(self): - """ - Returns the full parquet path as an output for the Luigi Task - (e.g. impc/dr15.2/parquet/product_report_parquet) - """ - return ImpcConfig().get_target( - f"{self.output_path}/impc_web_api/external_links_json" - ) - - def app_options(self): - """ - Generates the options pass to the PySpark job - """ - return [ - self.input()[0].path, - self.mouse_human_ortholog_report_tsv_path, - self.umass_early_lethal_report_csv_path, - self.uniprot_report_csv_path, - self.morphic_report_csv_path, - self.output().path, - ] - - def main(self, sc: SparkContext, *args): - """ - Takes in a SparkContext and the list of arguments generated by `app_options` and executes the PySpark job. - """ - spark = SparkSession(sc) - - # Parsing app options - gene_parquet_path = args[0] - mouse_human_ortholog_report_tsv_path = args[1] - umass_early_lethal_report_csv_path = args[2] - uniprot_report_csv_path = args[3] - morphic_report_csv_path = args[4] - output_path = args[5] - - gene_df = spark.read.parquet(gene_parquet_path) - mouse_human_ortholog_report_df = spark.read.csv( - mouse_human_ortholog_report_tsv_path, sep="\t", header=True - ) - umass_early_lethal_report_df = spark.read.csv( - umass_early_lethal_report_csv_path, header=True, multiLine=True - ) - uniprot_report_df = spark.read.csv( - uniprot_report_csv_path, - sep="\t", - header=False, - schema=StructType( - [ - StructField("uniprot_id", StringType(), True), - StructField("uniprot_db_id", StringType(), True), - StructField("uniprot_external_id", StringType(), True), - ] - ), - ) - morphic_report_df = spark.read.csv( - morphic_report_csv_path, - header=False, - schema=StructType( - [ - StructField("morphic_human_gene_symbol", StringType(), True), - ] - ), - ) - - umass_early_lethal_report_df = umass_early_lethal_report_df.withColumnRenamed( - "MGI Number", "mgi_accession_id" - ) - umass_early_lethal_report_df = umass_early_lethal_report_df.withColumnRenamed( - "Description only", "description" - ) - umass_early_lethal_report_df = umass_early_lethal_report_df.withColumn( - "Link", concat(lit("https://"), col("Link")) - ) - umass_early_lethal_report_df = umass_early_lethal_report_df.withColumnRenamed( - "Link", "href" - ) - umass_early_lethal_report_df = umass_early_lethal_report_df.withColumn( - "description", regexp_replace("description", "[\b\n\r]", " ") - ) - - umass_early_lethal_report_df = umass_early_lethal_report_df.withColumn( - "mgi_accession_id", - concat_ws(":", lit("MGI"), trim("mgi_accession_id")), - ).select("mgi_accession_id", "description", "href") - for col_name in mouse_human_ortholog_report_df.columns: - mouse_human_ortholog_report_df = ( - mouse_human_ortholog_report_df.withColumnRenamed( - col_name, col_name.replace(" ", "_").lower() - ) - ) - +@with_spark_session +def impc_external_links_mapper(): + from pyspark.sql import SparkSession, Window + from pyspark.sql.types import 
StructType, StructField, StringType + from pyspark.sql.functions import col, concat, lit, regexp_replace, trim, concat_ws + spark = SparkSession.builder.getOrCreate() + + # Parsing app options + gene_parquet_path = gene_parquet_path_asset.uri + mouse_human_ortholog_report_tsv_path = mouse_human_ortholog_report_tsv_path_asset.uri + umass_early_lethal_report_csv_path = umass_early_lethal_report_csv_path_asset.uri + uniprot_report_csv_path = uniprot_report_csv_path_asset.uri + morphic_report_csv_path = morphic_report_csv_path_asset.uri + output_path = external_links_json_asset.uri + + gene_df = spark.read.parquet(gene_parquet_path) + mouse_human_ortholog_report_df = spark.read.csv( + mouse_human_ortholog_report_tsv_path, sep="\t", header=True + ) + umass_early_lethal_report_df = spark.read.csv( + umass_early_lethal_report_csv_path, header=True, multiLine=True + ) + uniprot_report_df = spark.read.csv( + uniprot_report_csv_path, + sep="\t", + header=False, + schema=StructType( + [ + StructField("uniprot_id", StringType(), True), + StructField("uniprot_db_id", StringType(), True), + StructField("uniprot_external_id", StringType(), True), + ] + ), + ) + morphic_report_df = spark.read.csv( + morphic_report_csv_path, + header=False, + schema=StructType( + [ + StructField("morphic_human_gene_symbol", StringType(), True), + ] + ), + ) + + umass_early_lethal_report_df = umass_early_lethal_report_df.withColumnRenamed( + "MGI Number", "mgi_accession_id" + ) + umass_early_lethal_report_df = umass_early_lethal_report_df.withColumnRenamed( + "Description only", "description" + ) + umass_early_lethal_report_df = umass_early_lethal_report_df.withColumn( + "Link", concat(lit("https://"), col("Link")) + ) + umass_early_lethal_report_df = umass_early_lethal_report_df.withColumnRenamed( + "Link", "href" + ) + umass_early_lethal_report_df = umass_early_lethal_report_df.withColumn( + "description", regexp_replace("description", "[\b\n\r]", " ") + ) + + umass_early_lethal_report_df = umass_early_lethal_report_df.withColumn( + "mgi_accession_id", + concat_ws(":", lit("MGI"), trim("mgi_accession_id")), + ).select("mgi_accession_id", "description", "href") + for col_name in mouse_human_ortholog_report_df.columns: mouse_human_ortholog_report_df = ( mouse_human_ortholog_report_df.withColumnRenamed( - "mgi_gene_acc_id", "mgi_gene_accession_id" + col_name, col_name.replace(" ", "_").lower() ) ) - gwas_mouse_human_ortholog_report_df = mouse_human_ortholog_report_df.select( - "human_gene_symbol", "mgi_gene_accession_id" - ).distinct() - - gene_mgi_accession_df = ( - gene_df.select("mgi_accession_id") - .withColumnRenamed("mgi_accession_id", "mgi_gene_accession_id") - .dropDuplicates() - ) - - gwas_external_links_df = gene_mgi_accession_df.join( - gwas_mouse_human_ortholog_report_df, "mgi_gene_accession_id" - ) - - gwas_external_links_df = gwas_external_links_df.withColumnRenamed( - "mgi_gene_accession_id", "mgiGeneAccessionId" - ) - - gwas_external_links_df = gwas_external_links_df.withColumnRenamed( - "human_gene_symbol", "label" - ) - - gwas_external_links_df = gwas_external_links_df.withColumn( - "href", - concat( - lit("https://www.ebi.ac.uk/gwas/genes/"), gwas_external_links_df.label - ), - ) - gwas_external_links_df = gwas_external_links_df.withColumn( - "providerName", lit("GWAS Catalog") - ) - - gwas_external_links_df = gwas_external_links_df.withColumn( - "description", lit(None) - ) - - embryo_data_df = gene_df.select( - "mgi_accession_id", - "marker_symbol", - ).distinct() - embryo_data_df = embryo_data_df.join( 
- umass_early_lethal_report_df, "mgi_accession_id" - ) - - umass_external_links_df = embryo_data_df.withColumnRenamed( - "mgi_accession_id", "mgiGeneAccessionId" - ) - umass_external_links_df = umass_external_links_df.withColumnRenamed( - "marker_symbol", "label" - ) - umass_external_links_df = umass_external_links_df.withColumn( - "providerName", lit("Mager Lab Early Lethal Phenotypes") - ) - umass_external_links_df = umass_external_links_df.select( - "mgiGeneAccessionId", "label", "href", "providerName", "description" - ) - - uniprot_external_links_df = uniprot_report_df.where( - col("uniprot_db_id") == lit("MGI") - ) - uniprot_external_links_df = uniprot_external_links_df.withColumn( - "mgiGeneAccessionId", col("uniprot_external_id") - ) - uniprot_external_links_df = uniprot_external_links_df.withColumnRenamed( - "uniprot_id", "label" - ) - uniprot_external_links_df = uniprot_external_links_df.withColumn( - "providerName", lit("UniProt") - ) - uniprot_external_links_df = uniprot_external_links_df.withColumn( - "href", - concat(lit("https://www.uniprot.org/uniprotkb/"), col("label")), - ) - uniprot_external_links_df = uniprot_external_links_df.withColumn( - "description", lit(None) - ) - uniprot_external_links_df = uniprot_external_links_df.select( - "mgiGeneAccessionId", "label", "href", "providerName", "description" - ).distinct() - - morphic_mouse_human_ortholog_report_df = mouse_human_ortholog_report_df.select( - "human_gene_symbol", "mgi_gene_accession_id", "hgnc_acc_id" - ).distinct() - - morphic_external_links_df = gene_mgi_accession_df.join( - morphic_mouse_human_ortholog_report_df, "mgi_gene_accession_id" - ) - - morphic_external_links_df = morphic_external_links_df.join( - morphic_report_df, - col("human_gene_symbol") == col("morphic_human_gene_symbol"), - ) - - morphic_external_links_df = morphic_external_links_df.withColumnRenamed( - "mgi_gene_accession_id", "mgiGeneAccessionId" - ) - - morphic_external_links_df = morphic_external_links_df.withColumnRenamed( - "human_gene_symbol", "label" - ) - - morphic_external_links_df = morphic_external_links_df.withColumn( - "href", - concat(lit("https://morphic.bio/genes/"), col("hgnc_acc_id"), lit("/")), - ) - morphic_external_links_df = morphic_external_links_df.withColumn( - "providerName", lit("MorPhiC Program") - ) - - morphic_external_links_df = morphic_external_links_df.withColumn( - "description", lit(None) - ) - - morphic_external_links_df = morphic_external_links_df.select( - "mgiGeneAccessionId", "label", "href", "providerName", "description" - ).distinct() - - external_links_df = ( - gwas_external_links_df.union(umass_external_links_df) - .union(uniprot_external_links_df) - .union(morphic_external_links_df) - ) - external_links_df.write.json(output_path, mode="overwrite") \ No newline at end of file + mouse_human_ortholog_report_df = ( + mouse_human_ortholog_report_df.withColumnRenamed( + "mgi_gene_acc_id", "mgi_gene_accession_id" + ) + ) + + gwas_mouse_human_ortholog_report_df = mouse_human_ortholog_report_df.select( + "human_gene_symbol", "mgi_gene_accession_id" + ).distinct() + + gene_mgi_accession_df = ( + gene_df.select("mgi_accession_id") + .withColumnRenamed("mgi_accession_id", "mgi_gene_accession_id") + .dropDuplicates() + ) + + gwas_external_links_df = gene_mgi_accession_df.join( + gwas_mouse_human_ortholog_report_df, "mgi_gene_accession_id" + ) + + gwas_external_links_df = gwas_external_links_df.withColumnRenamed( + "mgi_gene_accession_id", "mgiGeneAccessionId" + ) + + gwas_external_links_df = 
gwas_external_links_df.withColumnRenamed( + "human_gene_symbol", "label" + ) + + gwas_external_links_df = gwas_external_links_df.withColumn( + "href", + concat( + lit("https://www.ebi.ac.uk/gwas/genes/"), gwas_external_links_df.label + ), + ) + gwas_external_links_df = gwas_external_links_df.withColumn( + "providerName", lit("GWAS Catalog") + ) + + gwas_external_links_df = gwas_external_links_df.withColumn( + "description", lit(None) + ) + + embryo_data_df = gene_df.select( + "mgi_accession_id", + "marker_symbol", + ).distinct() + embryo_data_df = embryo_data_df.join( + umass_early_lethal_report_df, "mgi_accession_id" + ) + + umass_external_links_df = embryo_data_df.withColumnRenamed( + "mgi_accession_id", "mgiGeneAccessionId" + ) + umass_external_links_df = umass_external_links_df.withColumnRenamed( + "marker_symbol", "label" + ) + umass_external_links_df = umass_external_links_df.withColumn( + "providerName", lit("Mager Lab Early Lethal Phenotypes") + ) + umass_external_links_df = umass_external_links_df.select( + "mgiGeneAccessionId", "label", "href", "providerName", "description" + ) + + uniprot_external_links_df = uniprot_report_df.where( + col("uniprot_db_id") == lit("MGI") + ) + uniprot_external_links_df = uniprot_external_links_df.withColumn( + "mgiGeneAccessionId", col("uniprot_external_id") + ) + uniprot_external_links_df = uniprot_external_links_df.withColumnRenamed( + "uniprot_id", "label" + ) + uniprot_external_links_df = uniprot_external_links_df.withColumn( + "providerName", lit("UniProt") + ) + uniprot_external_links_df = uniprot_external_links_df.withColumn( + "href", + concat(lit("https://www.uniprot.org/uniprotkb/"), col("label")), + ) + uniprot_external_links_df = uniprot_external_links_df.withColumn( + "description", lit(None) + ) + uniprot_external_links_df = uniprot_external_links_df.select( + "mgiGeneAccessionId", "label", "href", "providerName", "description" + ).distinct() + + morphic_mouse_human_ortholog_report_df = mouse_human_ortholog_report_df.select( + "human_gene_symbol", "mgi_gene_accession_id", "hgnc_acc_id" + ).distinct() + + morphic_external_links_df = gene_mgi_accession_df.join( + morphic_mouse_human_ortholog_report_df, "mgi_gene_accession_id" + ) + + morphic_external_links_df = morphic_external_links_df.join( + morphic_report_df, + col("human_gene_symbol") == col("morphic_human_gene_symbol"), + ) + + morphic_external_links_df = morphic_external_links_df.withColumnRenamed( + "mgi_gene_accession_id", "mgiGeneAccessionId" + ) + + morphic_external_links_df = morphic_external_links_df.withColumnRenamed( + "human_gene_symbol", "label" + ) + + morphic_external_links_df = morphic_external_links_df.withColumn( + "href", + concat(lit("https://morphic.bio/genes/"), col("hgnc_acc_id"), lit("/")), + ) + morphic_external_links_df = morphic_external_links_df.withColumn( + "providerName", lit("MorPhiC Program") + ) + + morphic_external_links_df = morphic_external_links_df.withColumn( + "description", lit(None) + ) + + morphic_external_links_df = morphic_external_links_df.select( + "mgiGeneAccessionId", "label", "href", "providerName", "description" + ).distinct() + + external_links_df = ( + gwas_external_links_df.union(umass_external_links_df) + .union(uniprot_external_links_df) + .union(morphic_external_links_df) + ) + external_links_df.write.json(output_path, mode="overwrite") From 2b84b8e7d64158888199d121aa7f5092b84556de Mon Sep 17 00:00:00 2001 From: Robert Wilson Date: Fri, 26 Sep 2025 10:09:24 +0100 Subject: [PATCH 14/16] Migration of the impc datasets metadata 
mapper to Airflow. The feat/impc-web-api branch is merged into this branch, but has not been altered. The code was modified to include try_cast operations to handle the differences in the way Spark 4.0 handles null values.
---
 .../impc_datasets_metadata_mapper.py | 816 +++++++++---------
 1 file changed, 405 insertions(+), 411 deletions(-)

diff --git a/impc_etl/jobs/load/impc_web_api/impc_datasets_metadata_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_datasets_metadata_mapper.py
index 2b67818c..3c446e8e 100644
--- a/impc_etl/jobs/load/impc_web_api/impc_datasets_metadata_mapper.py
+++ b/impc_etl/jobs/load/impc_web_api/impc_datasets_metadata_mapper.py
@@ -1,452 +1,446 @@
-from impc_etl.jobs.load.impc_web_api import (
-    DoubleType,
-    ImpcConfig,
-    ImpressToParameterMapper,
-    IntegerType,
-    PySparkTask,
-    SparkContext,
-    SparkSession,
-    StatsResultsMapper,
-    array,
-    col,
-    concat_ws,
-    explode,
-    lit,
-    luigi,
-    phenotype_term_zip_udf,
-    regexp_replace,
-    split,
-    struct,
-    to_camel_case,
-    when,
-    zip_with,
-)
+"""
+    PySpark task to create JSON for the datasets_metadata_service
+    of the website from the impress_parameter_parquet (pipeline parquet)
+    and the stats_results_parquet.
+"""
+import logging
+import textwrap
+from airflow.sdk import Variable, asset

-class ImpcDatasetsMetadataMapper(PySparkTask):
-    """
-    PySpark Task class to parse GenTar Product report data.
-    """
+from impc_etl.utils.airflow import create_input_asset, create_output_asset
+from impc_etl.utils.spark import with_spark_session

-    #: Name of the Spark task
-    name: str = "ImpcDatasetsMetadataMapper"

-    #: Path of the output directory where the new parquet file will be generated.
-    output_path: luigi.Parameter = luigi.Parameter()
+task_logger = logging.getLogger("airflow.task")
+dr_tag = Variable.get("data_release_tag")

-    def requires(self):
-        return [
-            ImpressToParameterMapper(),
-            StatsResultsMapper(),
-        ]
+impress_parameter_parquet_path_asset = create_input_asset("output/impress_parameter_parquet")
+statistical_results_raw_data_include_parquet_path_asset = create_input_asset("output/statistical_results_raw_data_include_parquet")

-    def output(self):
-        """
-        Returns the full parquet path as an output for the Luigi Task
-        (e.g. impc/dr15.2/parquet/product_report_parquet)
-        """
-        return ImpcConfig().get_target(
-            f"{self.output_path}/impc_web_api/datasets_metadata_service_json"
-        )
+datasets_metadata_service_output_asset = create_output_asset("impc_web_api/datasets_metadata_service_json")

-    def app_options(self):
-        """
-        Generates the options pass to the PySpark job
-        """
-        return [
-            self.input()[0].path,
-            self.input()[1].path,
-            self.output().path,
-        ]

-    def main(self, sc: SparkContext, *args):
+@asset.multi(
+    schedule=[impress_parameter_parquet_path_asset, statistical_results_raw_data_include_parquet_path_asset],
+    outlets=[datasets_metadata_service_output_asset],
+    dag_id=f"{dr_tag}_impc_datasets_metadata_mapper",
+    description=textwrap.dedent(
         """
-        Takes in a SparkContext and the list of arguments generated by `app_options` and executes the PySpark job.
+        PySpark task to create JSON for the datasets_metadata_service
+        of the website from the impress_parameter_parquet (pipeline parquet)
+        and the stats_results_parquet.
""" - spark = SparkSession(sc) + ), + tags=["impc_web_api"], +) +@with_spark_session +def impc_datasets_metadata_mapper(): - # Parsing app options - impress_parameter_parquet_path = args[0] - stats_results_parquet_path = args[1] - output_path = args[2] - stats_results_df = spark.read.parquet(stats_results_parquet_path) - explode_cols = [ - "procedure_stable_key", - "procedure_stable_id", - "procedure_name", - "parameter_stable_key", - "project_name", - "life_stage_name", - "life_stage_acc", - ] - impress_parameter_df = spark.read.parquet(impress_parameter_parquet_path) - unit_df = impress_parameter_df.select( - "fully_qualified_name", - col("unit_x").alias("x"), - col("unit_y").alias("y"), - col("categories").alias("parameter_category_list"), - ) - unit_df = unit_df.withColumn("unit", struct("x", "y")) - unit_df = unit_df.drop("x", "y") - stats_results_df = stats_results_df.withColumn( - "fully_qualified_name", - concat_ws( - "_", "pipeline_stable_id", "procedure_stable_id", "parameter_stable_id" - ), - ) - stats_results_df = stats_results_df.join( - unit_df, "fully_qualified_name", "left_outer" - ).drop("fully_qualified_name") - col_name_map = { - "doc_id": "datasetId", - "marker_accession_id": "mgiGeneAccessionId", - "marker_symbol": "geneSymbol", - "metadata": "metadataValues", - "production_center": "productionCentre", - "phenotyping_center": "phenotypingCentre", - "resource_fullname": "resourceFullName", - "statistical_method": "name", - "soft_windowing_bandwidth": "bandwidth", - "soft_windowing_shape": "shape", - "soft_windowing_peaks": "peaks", - "soft_windowing_min_obs_required": "minObsRequired", - "soft_windowing_total_obs_or_weight": "totalObsOrWeight", - "soft_windowing_threshold": "threshold", - "soft_windowing_number_of_doe": "numberOfDoe", - "soft_windowing_doe_note": "doeNote", - "effect_size": "reportedEffectSize", - "p_value": "reportedPValue", - } + from impc_etl.utils.impc_web_api import to_camel_case, phenotype_term_zip_udf + + from pyspark.sql import SparkSession + from pyspark.sql.functions import ( + col, + explode, + zip_with, + struct, + concat_ws, + split, + regexp_replace, + when, + lit, + ) + from pyspark.sql.types import DoubleType, IntegerType, ArrayType + + spark = SparkSession.builder.getOrCreate() - new_structs_dict = { - "summaryStatistics": [ - "female_control_count", - "female_control_mean", - "female_control_sd", - "male_control_count", - "male_control_mean", - "male_control_sd", - "female_mutant_count", - "female_mutant_mean", - "female_mutant_sd", - "male_mutant_count", - "male_mutant_mean", - "male_mutant_sd", - "both_mutant_count", - "both_mutant_mean", - "both_mutant_sd", - ], - "statisticalMethod": [ - "statistical_method", - { - "attributes": [ - "female_ko_effect_p_value", - "female_ko_effect_stderr_estimate", - "female_ko_parameter_estimate", - "female_percentage_change", - "male_ko_effect_p_value", - "male_ko_effect_stderr_estimate", - "male_ko_parameter_estimate", - "male_percentage_change", - "genotype_effect_p_value", - "genotype_effect_stderr_estimate", - "group_1_genotype", - "group_1_residuals_normality_test", - "group_2_genotype", - "group_2_residuals_normality_test", - "female_effect_size_low_normal_vs_high", - "female_effect_size_low_vs_normal_high", - "female_pvalue_low_normal_vs_high", - "female_pvalue_low_vs_normal_high", - "male_effect_size_low_normal_vs_high", - "male_effect_size_low_vs_normal_high", - "male_pvalue_low_normal_vs_high", - "male_pvalue_low_vs_normal_high", - "genotype_effect_size_low_normal_vs_high", - 
"genotype_effect_size_low_vs_normal_high", - "genotype_pvalue_low_normal_vs_high", - "genotype_pvalue_low_vs_normal_high", - "interaction_effect_p_value", - "interaction_significant", - "intercept_estimate", - "intercept_estimate_stderr_estimate", - "sex_effect_p_value", - "sex_effect_parameter_estimate", - "sex_effect_stderr_estimate", - "male_effect_size", - "female_effect_size", - "batch_significant", - "variance_significant", - "weight_effect_p_value", - "weight_effect_parameter_estimate", - "weight_effect_stderr_estimate", - ] - }, - ], - "softWindowing": [ - "soft_windowing_bandwidth", - "soft_windowing_shape", - "soft_windowing_peaks", - "soft_windowing_min_obs_required", - "soft_windowing_total_obs_or_weight", - "soft_windowing_threshold", - "soft_windowing_number_of_doe", - "soft_windowing_doe_note", - ], - } - int_columns = [ + stats_results_df = spark.read.parquet(statistical_results_raw_data_include_parquet_path_asset.uri) + explode_cols = [ + "procedure_stable_key", + "procedure_stable_id", + "procedure_name", + "parameter_stable_key", + "project_name", + "life_stage_name", + "life_stage_acc", + ] + impress_parameter_df = spark.read.parquet(impress_parameter_parquet_path_asset.uri) + unit_df = impress_parameter_df.select( + "fully_qualified_name", + col("unit_x").alias("x"), + col("unit_y").alias("y"), + col("categories").alias("parameter_category_list"), + ) + unit_df = unit_df.withColumn("unit", struct("x", "y")) + unit_df = unit_df.drop("x", "y") + stats_results_df = stats_results_df.withColumn( + "fully_qualified_name", + concat_ws( + "_", "pipeline_stable_id", "procedure_stable_id", "parameter_stable_id" + ), + ) + stats_results_df = stats_results_df.join( + unit_df, "fully_qualified_name", "left_outer" + ).drop("fully_qualified_name") + col_name_map = { + "doc_id": "datasetId", + "marker_accession_id": "mgiGeneAccessionId", + "marker_symbol": "geneSymbol", + "metadata": "metadataValues", + "production_center": "productionCentre", + "phenotyping_center": "phenotypingCentre", + "resource_fullname": "resourceFullName", + "statistical_method": "name", + "soft_windowing_bandwidth": "bandwidth", + "soft_windowing_shape": "shape", + "soft_windowing_peaks": "peaks", + "soft_windowing_min_obs_required": "minObsRequired", + "soft_windowing_total_obs_or_weight": "totalObsOrWeight", + "soft_windowing_threshold": "threshold", + "soft_windowing_number_of_doe": "numberOfDoe", + "soft_windowing_doe_note": "doeNote", + "effect_size": "reportedEffectSize", + "p_value": "reportedPValue", + } + + new_structs_dict = { + "summaryStatistics": [ "female_control_count", + "female_control_mean", + "female_control_sd", "male_control_count", + "male_control_mean", + "male_control_sd", "female_mutant_count", + "female_mutant_mean", + "female_mutant_sd", "male_mutant_count", + "male_mutant_mean", + "male_mutant_sd", "both_mutant_count", - ] - double_columns = [ - "p_value", - "effect_size", - "female_ko_effect_p_value", - "female_ko_parameter_estimate", - "female_percentage_change", - "genotype_effect_p_value", - "male_ko_effect_p_value", - "male_ko_parameter_estimate", - "male_percentage_change", - ] + "both_mutant_mean", + "both_mutant_sd", + ], + "statisticalMethod": [ + "statistical_method", + { + "attributes": [ + "female_ko_effect_p_value", + "female_ko_effect_stderr_estimate", + "female_ko_parameter_estimate", + "female_percentage_change", + "male_ko_effect_p_value", + "male_ko_effect_stderr_estimate", + "male_ko_parameter_estimate", + "male_percentage_change", + "genotype_effect_p_value", + 
"genotype_effect_stderr_estimate", + "group_1_genotype", + "group_1_residuals_normality_test", + "group_2_genotype", + "group_2_residuals_normality_test", + "female_effect_size_low_normal_vs_high", + "female_effect_size_low_vs_normal_high", + "female_pvalue_low_normal_vs_high", + "female_pvalue_low_vs_normal_high", + "male_effect_size_low_normal_vs_high", + "male_effect_size_low_vs_normal_high", + "male_pvalue_low_normal_vs_high", + "male_pvalue_low_vs_normal_high", + "genotype_effect_size_low_normal_vs_high", + "genotype_effect_size_low_vs_normal_high", + "genotype_pvalue_low_normal_vs_high", + "genotype_pvalue_low_vs_normal_high", + "interaction_effect_p_value", + "interaction_significant", + "intercept_estimate", + "intercept_estimate_stderr_estimate", + "sex_effect_p_value", + "sex_effect_parameter_estimate", + "sex_effect_stderr_estimate", + "male_effect_size", + "female_effect_size", + "batch_significant", + "variance_significant", + "weight_effect_p_value", + "weight_effect_parameter_estimate", + "weight_effect_stderr_estimate", + ] + }, + ], + "softWindowing": [ + "soft_windowing_bandwidth", + "soft_windowing_shape", + "soft_windowing_peaks", + "soft_windowing_min_obs_required", + "soft_windowing_total_obs_or_weight", + "soft_windowing_threshold", + "soft_windowing_number_of_doe", + "soft_windowing_doe_note", + ], + } - for col_name in int_columns: - stats_results_df = stats_results_df.withColumn( - col_name, col(col_name).astype(IntegerType()) - ) - for col_name in double_columns: - stats_results_df = stats_results_df.withColumn( - col_name, col(col_name).astype(DoubleType()) - ) - - for col_name in explode_cols: - stats_results_df = stats_results_df.withColumn(col_name, explode(col_name)) - stats_results_df = stats_results_df.select( - "doc_id", # statisticalResultId - "strain_accession_id", - "strain_name", - "genetic_background", - "colony_id", - "marker_accession_id", # mgiGeneAccessionId - "marker_symbol", # geneSymbol - "allele_accession_id", - "allele_name", - "allele_symbol", - "metadata_group", - "metadata", # metadataValues - "life_stage_acc", # explode - "life_stage_name", # explode - "data_type", - "production_center", # productionCentre - "phenotyping_center", # phenotypingCentre - "project_name", - "resource_name", - "resource_fullname", # resourceFullName - "pipeline_stable_key", - "pipeline_stable_id", - "pipeline_name", - "procedure_stable_key", # explode - "procedure_stable_id", # explode - "procedure_name", # explode - "procedure_group", - "parameter_stable_key", # explode - "parameter_stable_id", - "parameter_name", - "female_control_count", # group under summaryStatistics - "female_control_mean", # group under summaryStatistics - "female_control_sd", # group under summaryStatistics - "male_control_count", # group under summaryStatistics - "male_control_mean", # group under summaryStatistics - "male_control_sd", # group under summaryStatistics - "female_mutant_count", # group under summaryStatistics - "female_mutant_mean", # group under summaryStatistics - "female_mutant_sd", # group under summaryStatistics - "male_mutant_count", # group under summaryStatistics - "male_mutant_mean", # group under summaryStatistics - "male_mutant_sd", # group under summaryStatistics - "both_mutant_count", # group under summaryStatistics - "both_mutant_mean", # group under summaryStatistics - "both_mutant_sd", # group under summaryStatistics - "status", - "sex", # phenotypeSex - "zygosity", - "phenotype_sex", # testedSexes - "significant", - "classification_tag", - "p_value", # 
reportedPValue - "effect_size", # reportedEffectSize - "statistical_method", # name, group under statisticalMethod - "female_ko_effect_p_value", # group under statisticalMethod.attributes - "female_ko_effect_stderr_estimate", # group under statisticalMethod.attributes - "female_ko_parameter_estimate", # group under statisticalMethod.attributes - "female_percentage_change", # group under statisticalMethod.attributes - "male_ko_effect_p_value", # group under statisticalMethod.attributes - "male_ko_effect_stderr_estimate", # group under statisticalMethod.attributes - "male_ko_parameter_estimate", # group under statisticalMethod.attributes - "male_percentage_change", # group under statisticalMethod.attributes - "genotype_effect_p_value", # group under statisticalMethod.attributes - "genotype_effect_stderr_estimate", # group under statisticalMethod.attributes - "group_1_genotype", # group under statisticalMethod.attributes - "group_1_residuals_normality_test", # group under statisticalMethod.attributes - "group_2_genotype", # group under statisticalMethod.attributes - "group_2_residuals_normality_test", # group under statisticalMethod.attributes - "female_effect_size_low_normal_vs_high", # group under statisticalMethod.attributes - "female_effect_size_low_vs_normal_high", # group under statisticalMethod.attributes - "female_pvalue_low_normal_vs_high", # group under statisticalMethod.attributes - "female_pvalue_low_vs_normal_high", # group under statisticalMethod.attributes - "male_effect_size_low_normal_vs_high", # group under statisticalMethod.attributes - "male_effect_size_low_vs_normal_high", # group under statisticalMethod.attributes - "male_pvalue_low_normal_vs_high", # group under statisticalMethod.attributes - "male_pvalue_low_vs_normal_high", # group under statisticalMethod.attributes - "genotype_effect_size_low_normal_vs_high", # group under statisticalMethod.attributes - "genotype_effect_size_low_vs_normal_high", # group under statisticalMethod.attributes - "genotype_pvalue_low_normal_vs_high", # group under statisticalMethod.attributes - "genotype_pvalue_low_vs_normal_high", # group under statisticalMethod.attributes - "interaction_effect_p_value", # group under statisticalMethod.attributes - "interaction_significant", # group under statisticalMethod.attributes - "intercept_estimate", # group under statisticalMethod.attributes - "intercept_estimate_stderr_estimate", # group under statisticalMethod.attributes - "sex_effect_p_value", # group under statisticalMethod.attributes - "sex_effect_parameter_estimate", # group under statisticalMethod.attributes - "sex_effect_stderr_estimate", # group under statisticalMethod.attributes - "male_effect_size", # group under statisticalMethod.attributes - "female_effect_size", # group under statisticalMethod.attributes - "batch_significant", # group under statisticalMethod.attributes - "variance_significant", # group under statisticalMethod.attributes - "soft_windowing_bandwidth", # bandwidth, group under softWindowing - "soft_windowing_shape", # shape, group under softWindowing - "soft_windowing_peaks", # peaks, group under softWindowing - "soft_windowing_min_obs_required", # minObsRequired, group under softWindowing - "soft_windowing_total_obs_or_weight", # totalObsOrWeight, group under softWindowing - "soft_windowing_threshold", # threshold, group under softWindowing - "soft_windowing_number_of_doe", # numberOfDoe, group under softWindowing - "soft_windowing_doe_note", # doeNote, group under softWindowing - "mp_term_id", - "mp_term_name", - 
"intermediate_mp_term_id", - "intermediate_mp_term_name", - "top_level_mp_term_id", - "top_level_mp_term_name", - "mp_term_id_options", - "mp_term_name_options", - "unit", - "parameter_category_list", - "weight_effect_p_value", - "weight_effect_parameter_estimate", - "weight_effect_stderr_estimate", - ) + int_columns = [ + "female_control_count", + "male_control_count", + "female_mutant_count", + "male_mutant_count", + "both_mutant_count", + ] + double_columns = [ + "p_value", + "effect_size", + "female_ko_effect_p_value", + "female_ko_parameter_estimate", + "female_percentage_change", + "genotype_effect_p_value", + "male_ko_effect_p_value", + "male_ko_parameter_estimate", + "male_percentage_change", + ] + for col_name in int_columns: stats_results_df = stats_results_df.withColumn( - "soft_windowing_peaks", - split(regexp_replace("soft_windowing_peaks", "\[|\]", ""), ",").cast( - "array" - ), + col_name, col(col_name).astype(IntegerType()) ) - + for col_name in double_columns: stats_results_df = stats_results_df.withColumn( - "significantPhenotype", - when( - col("mp_term_id").isNotNull(), - struct( - col("mp_term_id").alias("id"), col("mp_term_name").alias("name") - ), - ).otherwise(lit(None)), + col_name, col(col_name).try_cast(DoubleType()) ) - stats_results_df = stats_results_df.withColumn( - "intermediatePhenotypes", - zip_with( - "intermediate_mp_term_id", - "intermediate_mp_term_name", - phenotype_term_zip_udf, - ), - ) + for col_name in explode_cols: + stats_results_df = stats_results_df.withColumn(col_name, explode(col_name)) + stats_results_df = stats_results_df.select( + "doc_id", # statisticalResultId + "strain_accession_id", + "strain_name", + "genetic_background", + "colony_id", + "marker_accession_id", # mgiGeneAccessionId + "marker_symbol", # geneSymbol + "allele_accession_id", + "allele_name", + "allele_symbol", + "metadata_group", + "metadata", # metadataValues + "life_stage_acc", # explode + "life_stage_name", # explode + "data_type", + "production_center", # productionCentre + "phenotyping_center", # phenotypingCentre + "project_name", + "resource_name", + "resource_fullname", # resourceFullName + "pipeline_stable_key", + "pipeline_stable_id", + "pipeline_name", + "procedure_stable_key", # explode + "procedure_stable_id", # explode + "procedure_name", # explode + "procedure_group", + "parameter_stable_key", # explode + "parameter_stable_id", + "parameter_name", + "female_control_count", # group under summaryStatistics + "female_control_mean", # group under summaryStatistics + "female_control_sd", # group under summaryStatistics + "male_control_count", # group under summaryStatistics + "male_control_mean", # group under summaryStatistics + "male_control_sd", # group under summaryStatistics + "female_mutant_count", # group under summaryStatistics + "female_mutant_mean", # group under summaryStatistics + "female_mutant_sd", # group under summaryStatistics + "male_mutant_count", # group under summaryStatistics + "male_mutant_mean", # group under summaryStatistics + "male_mutant_sd", # group under summaryStatistics + "both_mutant_count", # group under summaryStatistics + "both_mutant_mean", # group under summaryStatistics + "both_mutant_sd", # group under summaryStatistics + "status", + "sex", # phenotypeSex + "zygosity", + "phenotype_sex", # testedSexes + "significant", + "classification_tag", + "p_value", # reportedPValue + "effect_size", # reportedEffectSize + "statistical_method", # name, group under statisticalMethod + "female_ko_effect_p_value", # group under 
statisticalMethod.attributes + "female_ko_effect_stderr_estimate", # group under statisticalMethod.attributes + "female_ko_parameter_estimate", # group under statisticalMethod.attributes + "female_percentage_change", # group under statisticalMethod.attributes + "male_ko_effect_p_value", # group under statisticalMethod.attributes + "male_ko_effect_stderr_estimate", # group under statisticalMethod.attributes + "male_ko_parameter_estimate", # group under statisticalMethod.attributes + "male_percentage_change", # group under statisticalMethod.attributes + "genotype_effect_p_value", # group under statisticalMethod.attributes + "genotype_effect_stderr_estimate", # group under statisticalMethod.attributes + "group_1_genotype", # group under statisticalMethod.attributes + "group_1_residuals_normality_test", # group under statisticalMethod.attributes + "group_2_genotype", # group under statisticalMethod.attributes + "group_2_residuals_normality_test", # group under statisticalMethod.attributes + "female_effect_size_low_normal_vs_high", # group under statisticalMethod.attributes + "female_effect_size_low_vs_normal_high", # group under statisticalMethod.attributes + "female_pvalue_low_normal_vs_high", # group under statisticalMethod.attributes + "female_pvalue_low_vs_normal_high", # group under statisticalMethod.attributes + "male_effect_size_low_normal_vs_high", # group under statisticalMethod.attributes + "male_effect_size_low_vs_normal_high", # group under statisticalMethod.attributes + "male_pvalue_low_normal_vs_high", # group under statisticalMethod.attributes + "male_pvalue_low_vs_normal_high", # group under statisticalMethod.attributes + "genotype_effect_size_low_normal_vs_high", # group under statisticalMethod.attributes + "genotype_effect_size_low_vs_normal_high", # group under statisticalMethod.attributes + "genotype_pvalue_low_normal_vs_high", # group under statisticalMethod.attributes + "genotype_pvalue_low_vs_normal_high", # group under statisticalMethod.attributes + "interaction_effect_p_value", # group under statisticalMethod.attributes + "interaction_significant", # group under statisticalMethod.attributes + "intercept_estimate", # group under statisticalMethod.attributes + "intercept_estimate_stderr_estimate", # group under statisticalMethod.attributes + "sex_effect_p_value", # group under statisticalMethod.attributes + "sex_effect_parameter_estimate", # group under statisticalMethod.attributes + "sex_effect_stderr_estimate", # group under statisticalMethod.attributes + "male_effect_size", # group under statisticalMethod.attributes + "female_effect_size", # group under statisticalMethod.attributes + "batch_significant", # group under statisticalMethod.attributes + "variance_significant", # group under statisticalMethod.attributes + "soft_windowing_bandwidth", # bandwidth, group under softWindowing + "soft_windowing_shape", # shape, group under softWindowing + "soft_windowing_peaks", # peaks, group under softWindowing + "soft_windowing_min_obs_required", # minObsRequired, group under softWindowing + "soft_windowing_total_obs_or_weight", # totalObsOrWeight, group under softWindowing + "soft_windowing_threshold", # threshold, group under softWindowing + "soft_windowing_number_of_doe", # numberOfDoe, group under softWindowing + "soft_windowing_doe_note", # doeNote, group under softWindowing + "mp_term_id", + "mp_term_name", + "intermediate_mp_term_id", + "intermediate_mp_term_name", + "top_level_mp_term_id", + "top_level_mp_term_name", + "mp_term_id_options", + "mp_term_name_options", + 
"unit", + "parameter_category_list", + "weight_effect_p_value", + "weight_effect_parameter_estimate", + "weight_effect_stderr_estimate", + ) - stats_results_df = stats_results_df.withColumn( - "topLevelPhenotypes", - zip_with( - "top_level_mp_term_id", - "top_level_mp_term_name", - phenotype_term_zip_udf, - ), - ) + stats_results_temp_df = stats_results_df.withColumn( + "soft_windowing_peaks", + split(regexp_replace("soft_windowing_peaks", "\[|\]", ""), ","), + ) + stats_results_temp_df.printSchema() + stats_results_temp_df.select("soft_windowing_peaks").show(500) - stats_results_df = stats_results_df.withColumn( - "potentialPhenotypes", - zip_with( - "mp_term_id_options", - "mp_term_name_options", - phenotype_term_zip_udf, + stats_results_df = stats_results_df.withColumn( + "soft_windowing_peaks", + split(regexp_replace("soft_windowing_peaks", "\[|\]", ""), ",").try_cast( + ArrayType(IntegerType()) + ), + ) + + stats_results_df = stats_results_df.withColumn( + "significantPhenotype", + when( + col("mp_term_id").isNotNull(), + struct( + col("mp_term_id").alias("id"), col("mp_term_name").alias("name") ), - ) + ).otherwise(lit(None)), + ) - stats_results_df = stats_results_df.drop( - "mp_term_id", - "mp_term_name", - "top_level_mp_term_id", - "top_level_mp_term_name", + stats_results_df = stats_results_df.withColumn( + "intermediatePhenotypes", + zip_with( "intermediate_mp_term_id", "intermediate_mp_term_name", + phenotype_term_zip_udf, + ), + ) + + stats_results_df = stats_results_df.withColumn( + "topLevelPhenotypes", + zip_with( + "top_level_mp_term_id", + "top_level_mp_term_name", + phenotype_term_zip_udf, + ), + ) + + stats_results_df = stats_results_df.withColumn( + "potentialPhenotypes", + zip_with( "mp_term_id_options", "mp_term_name_options", - ) + phenotype_term_zip_udf, + ), + ) - stats_results_df = stats_results_df.withColumnRenamed( - "marker_accession_id", "mgiGeneAccessionId" - ) - stats_results_df = stats_results_df.withColumnRenamed("doc_id", "datasetId") - stats_results_df = stats_results_df.withColumn("id", col("datasetId")) + stats_results_df = stats_results_df.drop( + "mp_term_id", + "mp_term_name", + "top_level_mp_term_id", + "top_level_mp_term_name", + "intermediate_mp_term_id", + "intermediate_mp_term_name", + "mp_term_id_options", + "mp_term_name_options", + ) + + stats_results_df = stats_results_df.withColumnRenamed( + "marker_accession_id", "mgiGeneAccessionId" + ) + stats_results_df = stats_results_df.withColumnRenamed("doc_id", "datasetId") + stats_results_df = stats_results_df.withColumn("id", col("datasetId")) - drop_list = [] - for struct_col_name in new_structs_dict.keys(): - columns = new_structs_dict[struct_col_name] - fields = [] - for column in columns: - if type(column) == str: + drop_list = [] + for struct_col_name in new_structs_dict.keys(): + columns = new_structs_dict[struct_col_name] + fields = [] + for column in columns: + if type(column) == str: + fields.append( + col(column).alias(col_name_map[column]) + if column in col_name_map + else col(column).alias(to_camel_case(column)) + ) + drop_list.append(column) + else: + for sub_field_name in column.keys(): fields.append( - col(column).alias(col_name_map[column]) - if column in col_name_map - else col(column).alias(to_camel_case(column)) - ) - drop_list.append(column) - else: - for sub_field_name in column.keys(): - fields.append( - struct( - *[ - col(sub_column).alias(col_name_map[sub_column]) - if sub_column in col_name_map - else col(sub_column).alias( - to_camel_case( - 
sub_column.replace("pvalue", "p_value") - ) + struct( + *[ + col(sub_column).alias(col_name_map[sub_column]) + if sub_column in col_name_map + else col(sub_column).alias( + to_camel_case( + sub_column.replace("pvalue", "p_value") ) - for sub_column in column[sub_field_name] - ] - ).alias(sub_field_name) - ) - for sub_column in column[sub_field_name]: - drop_list.append(sub_column) + ) + for sub_column in column[sub_field_name] + ] + ).alias(sub_field_name) + ) + for sub_column in column[sub_field_name]: + drop_list.append(sub_column) - stats_results_df = stats_results_df.withColumn( - struct_col_name, - struct(*fields), - ) + stats_results_df = stats_results_df.withColumn( + struct_col_name, + struct(*fields), + ) - stats_results_df = stats_results_df.drop(*drop_list) + stats_results_df = stats_results_df.drop(*drop_list) - for col_name in stats_results_df.columns: - stats_results_df = stats_results_df.withColumnRenamed( - col_name, - col_name_map[col_name] - if col_name in col_name_map - else to_camel_case(col_name), - ) - stats_results_df.distinct().repartition(1000).write.option( - "ignoreNullFields", "false" - ).json(output_path) \ No newline at end of file + for col_name in stats_results_df.columns: + stats_results_df = stats_results_df.withColumnRenamed( + col_name, + col_name_map[col_name] + if col_name in col_name_map + else to_camel_case(col_name), + ) + stats_results_df.distinct().repartition(1000).write.option( + "ignoreNullFields", "false" + ).json(datasets_metadata_service_output_asset.uri) \ No newline at end of file From b869decbf79fc4a816bb4747028d74bee731a3f2 Mon Sep 17 00:00:00 2001 From: Robert Wilson Date: Fri, 26 Sep 2025 10:12:04 +0100 Subject: [PATCH 15/16] Removed some print statements used for testing. --- .../load/impc_web_api/impc_datasets_metadata_mapper.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/impc_etl/jobs/load/impc_web_api/impc_datasets_metadata_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_datasets_metadata_mapper.py index 3c446e8e..c774257a 100644 --- a/impc_etl/jobs/load/impc_web_api/impc_datasets_metadata_mapper.py +++ b/impc_etl/jobs/load/impc_web_api/impc_datasets_metadata_mapper.py @@ -328,13 +328,6 @@ def impc_datasets_metadata_mapper(): "weight_effect_stderr_estimate", ) - stats_results_temp_df = stats_results_df.withColumn( - "soft_windowing_peaks", - split(regexp_replace("soft_windowing_peaks", "\[|\]", ""), ","), - ) - stats_results_temp_df.printSchema() - stats_results_temp_df.select("soft_windowing_peaks").show(500) - stats_results_df = stats_results_df.withColumn( "soft_windowing_peaks", split(regexp_replace("soft_windowing_peaks", "\[|\]", ""), ",").try_cast( From 61f1ee8c1184beda940bad72f9bb61c0e5c906cb Mon Sep 17 00:00:00 2001 From: Robert Wilson Date: Fri, 26 Sep 2025 10:23:22 +0100 Subject: [PATCH 16/16] Modified the output to specify the mode as overwrite. 
--- .../jobs/load/impc_web_api/impc_datasets_metadata_mapper.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/impc_etl/jobs/load/impc_web_api/impc_datasets_metadata_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_datasets_metadata_mapper.py index c774257a..1ada49e9 100644 --- a/impc_etl/jobs/load/impc_web_api/impc_datasets_metadata_mapper.py +++ b/impc_etl/jobs/load/impc_web_api/impc_datasets_metadata_mapper.py @@ -436,4 +436,4 @@ def impc_datasets_metadata_mapper(): ) stats_results_df.distinct().repartition(1000).write.option( "ignoreNullFields", "false" - ).json(datasets_metadata_service_output_asset.uri) \ No newline at end of file + ).json(datasets_metadata_service_output_asset.uri, mode="overwrite") \ No newline at end of file
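
A closing note on the try_cast changes in the datasets metadata mapper migration above: Spark 4.0 turns ANSI mode on by default, so a plain cast() now fails the job when it meets a value it cannot convert, where older releases quietly produced NULL. Column.try_cast() restores the lenient behaviour by yielding NULL for malformed input. A minimal, self-contained sketch of the difference, using a throwaway local session and made-up data:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col
    from pyspark.sql.types import DoubleType

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame([("1.5",), ("not_a_number",), (None,)], ["raw"])

    # cast() would raise on the malformed row under ANSI mode;
    # try_cast() returns NULL for it, as it does for the true NULL input.
    df.select(col("raw").try_cast(DoubleType()).alias("value")).show()
    # value: 1.5, NULL, NULL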