diff --git a/impc_etl/jobs/load/impc_web_api/impc_batch_query_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_batch_query_mapper.py index 409727bb..f579a40e 100644 --- a/impc_etl/jobs/load/impc_web_api/impc_batch_query_mapper.py +++ b/impc_etl/jobs/load/impc_web_api/impc_batch_query_mapper.py @@ -1,124 +1,100 @@ -from impc_etl.jobs.load.impc_web_api import ( - ImpcConfig, - PySparkTask, - SparkContext, - SparkSession, - col, - collect_set, - explode_outer, - luigi, - phenotype_term_zip_udf, +import logging +import textwrap +from airflow.sdk import Variable, asset + +from impc_etl.utils.airflow import create_input_asset, create_output_asset +from impc_etl.utils.spark import with_spark_session +from impc_etl.utils.impc_web_api import phenotype_term_zip_udf + +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") + +ortholog_mapping_report_tsv_path_asset = create_input_asset("impc_web_api/mouse_human_ortholog_report.tsv") +mp_hp_matches_csv_path_asset = create_input_asset("impc_web_api/mp_hp_matches.csv") +gene_stats_results_json_path_asset = create_input_asset("output/impc_web_api/gene_statistical_results_service_json") + +batch_query_data_parquet_asset = create_output_asset("impc_web_api/batch_query_data_parquet") + +@asset.multi( + schedule=[ortholog_mapping_report_tsv_path_asset, mp_hp_matches_csv_path_asset, gene_stats_results_json_path_asset], + outlets=[batch_query_data_parquet_asset], + dag_id=f"{dr_tag}_impc_batch_query_mapper", + description=textwrap.dedent( + """IMPC Web API batch query mapper DAG.""" + ), + tags=["impc_web_api", "batch query"], ) - - -class ImpcBatchQueryMapper(PySparkTask): - """ - PySpark Task class to parse GenTar Product report data. - """ - - #: Name of the Spark task - name: str = "ImpcBatchQueryMapper" - - ortholog_mapping_report_tsv_path = luigi.Parameter() - mp_hp_matches_csv_path = luigi.Parameter() - - #: Path of the output directory where the new parquet file will be generated. - output_path: luigi.Parameter = luigi.Parameter() - - def requires(self): - return [ImpcGeneStatsResultsMapper()] - - def output(self): - """ - Returns the full parquet path as an output for the Luigi Task - (e.g. impc/dr15.2/parquet/product_report_parquet) - """ - return ImpcConfig().get_target( - f"{self.output_path}/impc_web_api/batch_query_data_parquet" - ) - - def app_options(self): - """ - Generates the options pass to the PySpark job - """ - return [ - self.ortholog_mapping_report_tsv_path, - self.mp_hp_matches_csv_path, - self.input()[0].path, - self.output().path, - ] - - def main(self, sc: SparkContext, *args): - """ - Takes in a SparkContext and the list of arguments generated by `app_options` and executes the PySpark job. 
- """ - spark = SparkSession(sc) - - # Parsing app options - ortholog_mapping_report_tsv_path = args[0] - mp_hp_matches_csv_path = args[1] - gene_stats_results_json_path = args[2] - output_path = args[3] - - ortholog_mapping_df = spark.read.csv( - ortholog_mapping_report_tsv_path, sep="\t", header=True - ) - stats_results = spark.read.json(gene_stats_results_json_path) - - ortholog_mapping_df = ortholog_mapping_df.select( - col("Mgi Gene Acc Id").alias("mgiGeneAccessionId"), - col("Human Gene Symbol").alias("humanGeneSymbol"), - col("Hgnc Acc Id").alias("hgncGeneAccessionId"), - ).distinct() - - stats_results = stats_results.join( - ortholog_mapping_df, "mgiGeneAccessionId", how="left_outer" - ) - - mp_matches_df = spark.read.csv(mp_hp_matches_csv_path, header=True) - mp_matches_df = mp_matches_df.select( - col("curie_x").alias("id"), - col("curie_y").alias("hp_term_id"), - col("label_y").alias("hp_term_name"), - ).distinct() - - stats_mp_hp_df = stats_results.select( - "statisticalResultId", - "potentialPhenotypes", - "intermediatePhenotypes", - "topLevelPhenotypes", - "significantPhenotype", +@with_spark_session +def impc_batch_query_mapper(): + from pyspark.sql import SparkSession + from pyspark.sql.functions import col, explode_outer, collect_set, when, struct, lit + + spark = SparkSession.builder.getOrCreate() + + ortholog_mapping_report_tsv_path = ortholog_mapping_report_tsv_path_asset.uri + mp_hp_matches_csv_path = mp_hp_matches_csv_path_asset.uri + gene_stats_results_json_path = gene_stats_results_json_path_asset.uri + output_path = batch_query_data_parquet_asset.uri + + ortholog_mapping_df = spark.read.csv( + ortholog_mapping_report_tsv_path, sep="\t", header=True + ) + stats_results = spark.read.json(gene_stats_results_json_path) + + ortholog_mapping_df = ortholog_mapping_df.select( + col("Mgi Gene Acc Id").alias("mgiGeneAccessionId"), + col("Human Gene Symbol").alias("humanGeneSymbol"), + col("Hgnc Acc Id").alias("hgncGeneAccessionId"), + ).distinct() + + stats_results = stats_results.join( + ortholog_mapping_df, "mgiGeneAccessionId", how="left_outer" + ) + + mp_matches_df = spark.read.csv(mp_hp_matches_csv_path, header=True) + mp_matches_df = mp_matches_df.select( + col("curie_x").alias("id"), + col("curie_y").alias("hp_term_id"), + col("label_y").alias("hp_term_name"), + ).distinct() + + stats_mp_hp_df = stats_results.select( + "statisticalResultId", + "potentialPhenotypes", + "intermediatePhenotypes", + "topLevelPhenotypes", + "significantPhenotype", + ) + for phenotype_list_col in [ + "potentialPhenotypes", + "intermediatePhenotypes", + "topLevelPhenotypes", + ]: + stats_mp_hp_df = stats_mp_hp_df.withColumn( + phenotype_list_col[:-1], explode_outer(phenotype_list_col) ) - for phenotype_list_col in [ - "potentialPhenotypes", - "intermediatePhenotypes", - "topLevelPhenotypes", - ]: - stats_mp_hp_df = stats_mp_hp_df.withColumn( - phenotype_list_col[:-1], explode_outer(phenotype_list_col) - ) - stats_mp_hp_df = stats_mp_hp_df.join( - mp_matches_df, - ( + stats_mp_hp_df = stats_mp_hp_df.join( + mp_matches_df, + ( (col("significantPhenotype.id") == col("id")) | (col("potentialPhenotype.id") == col("id")) | (col("intermediatePhenotype.id") == col("id")) | (col("topLevelPhenotype.id") == col("id")) - ), - how="left_outer", - ) - stats_mp_hp_df = stats_mp_hp_df.withColumn( - "humanPhenotype", - phenotype_term_zip_udf(col("hp_term_id"), col("hp_term_name")), - ) - stats_mp_hp_df = ( - stats_mp_hp_df.groupBy("statisticalResultId") - 
.agg(collect_set("humanPhenotype").alias("humanPhenotypes")) - .select("statisticalResultId", "humanPhenotypes") - .distinct() - ) - - stats_results = stats_results.join(stats_mp_hp_df, "statisticalResultId") - - stats_results.write.parquet(output_path) \ No newline at end of file + ), + how="left_outer", + ) + stats_mp_hp_df = stats_mp_hp_df.withColumn( + "humanPhenotype", + phenotype_term_zip_udf(col("hp_term_id"), col("hp_term_name")), + ) + stats_mp_hp_df = ( + stats_mp_hp_df.groupBy("statisticalResultId") + .agg(collect_set("humanPhenotype").alias("humanPhenotypes")) + .select("statisticalResultId", "humanPhenotypes") + .distinct() + ) + + stats_results = stats_results.join(stats_mp_hp_df, "statisticalResultId") + + stats_results.coalesce(100).write.parquet(output_path, mode="overwrite") diff --git a/impc_etl/jobs/load/impc_web_api/impc_external_links_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_external_links_mapper.py index 01f01362..a5d9f0c5 100644 --- a/impc_etl/jobs/load/impc_web_api/impc_external_links_mapper.py +++ b/impc_etl/jobs/load/impc_web_api/impc_external_links_mapper.py @@ -1,259 +1,226 @@ -from impc_etl.jobs.load.impc_web_api import ( - GeneLoader, - ImpcConfig, - PySparkTask, - SparkContext, - SparkSession, - StringType, - StructField, - StructType, - col, - concat, - concat_ws, - lit, - lower, - luigi, - regexp_replace, - trim, +import logging +import textwrap +from airflow.sdk import Variable, asset +from impc_etl.utils.airflow import create_input_asset, create_output_asset +from impc_etl.utils.spark import with_spark_session + +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") + +mouse_human_ortholog_report_tsv_path_asset = create_input_asset("impc_web_api/mouse_human_ortholog_report.tsv") +umass_early_lethal_report_csv_path_asset = create_input_asset("impc_web_api/umass_early_lethal_report.csv") +uniprot_report_csv_path_asset = create_input_asset("impc_web_api/uniprot_report.tsv") +morphic_report_csv_path_asset = create_input_asset("impc_web_api/morphic_report.csv") +gene_parquet_path_asset = create_input_asset("output/gene_data_include_parquet") + +external_links_json_asset = create_output_asset("impc_web_api/external_links_json") + +@asset.multi( + schedule=[mouse_human_ortholog_report_tsv_path_asset, umass_early_lethal_report_csv_path_asset, uniprot_report_csv_path_asset, morphic_report_csv_path_asset, gene_parquet_path_asset], + outlets=[external_links_json_asset], + dag_id=f"{dr_tag}_impc_external_links_mapper", + description=textwrap.dedent( + """IMPC Web API external links mapper DAG.""" + ), + tags=["impc_web_api", "external links"], ) - - -class ImpcExternalLinksMapper(PySparkTask): - """ - PySpark Task class to parse GenTar Product report data. - """ - - #: Name of the Spark task - name: str = "ImpcExternalLinksMapper" - - mouse_human_ortholog_report_tsv_path: luigi.Parameter = luigi.Parameter() - umass_early_lethal_report_csv_path: luigi.Parameter = luigi.Parameter() - uniprot_report_csv_path: luigi.Parameter = luigi.Parameter() - morphic_report_csv_path: luigi.Parameter = luigi.Parameter() - #: Path of the output directory where the new parquet file will be generated. - output_path: luigi.Parameter = luigi.Parameter() - - def requires(self): - return [GeneLoader()] - - def output(self): - """ - Returns the full parquet path as an output for the Luigi Task - (e.g. 
impc/dr15.2/parquet/product_report_parquet) - """ - return ImpcConfig().get_target( - f"{self.output_path}/impc_web_api/external_links_json" - ) - - def app_options(self): - """ - Generates the options pass to the PySpark job - """ - return [ - self.input()[0].path, - self.mouse_human_ortholog_report_tsv_path, - self.umass_early_lethal_report_csv_path, - self.uniprot_report_csv_path, - self.morphic_report_csv_path, - self.output().path, - ] - - def main(self, sc: SparkContext, *args): - """ - Takes in a SparkContext and the list of arguments generated by `app_options` and executes the PySpark job. - """ - spark = SparkSession(sc) - - # Parsing app options - gene_parquet_path = args[0] - mouse_human_ortholog_report_tsv_path = args[1] - umass_early_lethal_report_csv_path = args[2] - uniprot_report_csv_path = args[3] - morphic_report_csv_path = args[4] - output_path = args[5] - - gene_df = spark.read.parquet(gene_parquet_path) - mouse_human_ortholog_report_df = spark.read.csv( - mouse_human_ortholog_report_tsv_path, sep="\t", header=True - ) - umass_early_lethal_report_df = spark.read.csv( - umass_early_lethal_report_csv_path, header=True, multiLine=True - ) - uniprot_report_df = spark.read.csv( - uniprot_report_csv_path, - sep="\t", - header=False, - schema=StructType( - [ - StructField("uniprot_id", StringType(), True), - StructField("uniprot_db_id", StringType(), True), - StructField("uniprot_external_id", StringType(), True), - ] - ), - ) - morphic_report_df = spark.read.csv( - morphic_report_csv_path, - header=False, - schema=StructType( - [ - StructField("morphic_human_gene_symbol", StringType(), True), - ] - ), - ) - - umass_early_lethal_report_df = umass_early_lethal_report_df.withColumnRenamed( - "MGI Number", "mgi_accession_id" - ) - umass_early_lethal_report_df = umass_early_lethal_report_df.withColumnRenamed( - "Description only", "description" - ) - umass_early_lethal_report_df = umass_early_lethal_report_df.withColumn( - "Link", concat(lit("https://"), col("Link")) - ) - umass_early_lethal_report_df = umass_early_lethal_report_df.withColumnRenamed( - "Link", "href" - ) - umass_early_lethal_report_df = umass_early_lethal_report_df.withColumn( - "description", regexp_replace("description", "[\b\n\r]", " ") - ) - - umass_early_lethal_report_df = umass_early_lethal_report_df.withColumn( - "mgi_accession_id", - concat_ws(":", lit("MGI"), trim("mgi_accession_id")), - ).select("mgi_accession_id", "description", "href") - for col_name in mouse_human_ortholog_report_df.columns: - mouse_human_ortholog_report_df = ( - mouse_human_ortholog_report_df.withColumnRenamed( - col_name, col_name.replace(" ", "_").lower() - ) - ) - +@with_spark_session +def impc_external_links_mapper(): + from pyspark.sql import SparkSession, Window + from pyspark.sql.types import StructType, StructField, StringType + from pyspark.sql.functions import col, concat, lit, regexp_replace, trim, concat_ws + spark = SparkSession.builder.getOrCreate() + + # Parsing app options + gene_parquet_path = gene_parquet_path_asset.uri + mouse_human_ortholog_report_tsv_path = mouse_human_ortholog_report_tsv_path_asset.uri + umass_early_lethal_report_csv_path = umass_early_lethal_report_csv_path_asset.uri + uniprot_report_csv_path = uniprot_report_csv_path_asset.uri + morphic_report_csv_path = morphic_report_csv_path_asset.uri + output_path = external_links_json_asset.uri + + gene_df = spark.read.parquet(gene_parquet_path) + mouse_human_ortholog_report_df = spark.read.csv( + mouse_human_ortholog_report_tsv_path, sep="\t", 
header=True + ) + umass_early_lethal_report_df = spark.read.csv( + umass_early_lethal_report_csv_path, header=True, multiLine=True + ) + uniprot_report_df = spark.read.csv( + uniprot_report_csv_path, + sep="\t", + header=False, + schema=StructType( + [ + StructField("uniprot_id", StringType(), True), + StructField("uniprot_db_id", StringType(), True), + StructField("uniprot_external_id", StringType(), True), + ] + ), + ) + morphic_report_df = spark.read.csv( + morphic_report_csv_path, + header=False, + schema=StructType( + [ + StructField("morphic_human_gene_symbol", StringType(), True), + ] + ), + ) + + umass_early_lethal_report_df = umass_early_lethal_report_df.withColumnRenamed( + "MGI Number", "mgi_accession_id" + ) + umass_early_lethal_report_df = umass_early_lethal_report_df.withColumnRenamed( + "Description only", "description" + ) + umass_early_lethal_report_df = umass_early_lethal_report_df.withColumn( + "Link", concat(lit("https://"), col("Link")) + ) + umass_early_lethal_report_df = umass_early_lethal_report_df.withColumnRenamed( + "Link", "href" + ) + umass_early_lethal_report_df = umass_early_lethal_report_df.withColumn( + "description", regexp_replace("description", "[\b\n\r]", " ") + ) + + umass_early_lethal_report_df = umass_early_lethal_report_df.withColumn( + "mgi_accession_id", + concat_ws(":", lit("MGI"), trim("mgi_accession_id")), + ).select("mgi_accession_id", "description", "href") + for col_name in mouse_human_ortholog_report_df.columns: mouse_human_ortholog_report_df = ( mouse_human_ortholog_report_df.withColumnRenamed( - "mgi_gene_acc_id", "mgi_gene_accession_id" + col_name, col_name.replace(" ", "_").lower() ) ) - gwas_mouse_human_ortholog_report_df = mouse_human_ortholog_report_df.select( - "human_gene_symbol", "mgi_gene_accession_id" - ).distinct() - - gene_mgi_accession_df = ( - gene_df.select("mgi_accession_id") - .withColumnRenamed("mgi_accession_id", "mgi_gene_accession_id") - .dropDuplicates() - ) - - gwas_external_links_df = gene_mgi_accession_df.join( - gwas_mouse_human_ortholog_report_df, "mgi_gene_accession_id" - ) - - gwas_external_links_df = gwas_external_links_df.withColumnRenamed( - "mgi_gene_accession_id", "mgiGeneAccessionId" - ) - - gwas_external_links_df = gwas_external_links_df.withColumnRenamed( - "human_gene_symbol", "label" - ) - - gwas_external_links_df = gwas_external_links_df.withColumn( - "href", - concat( - lit("https://www.ebi.ac.uk/gwas/genes/"), gwas_external_links_df.label - ), - ) - gwas_external_links_df = gwas_external_links_df.withColumn( - "providerName", lit("GWAS Catalog") - ) - - gwas_external_links_df = gwas_external_links_df.withColumn( - "description", lit(None) - ) - - embryo_data_df = gene_df.select( - "mgi_accession_id", - "marker_symbol", - ).distinct() - embryo_data_df = embryo_data_df.join( - umass_early_lethal_report_df, "mgi_accession_id" - ) - - umass_external_links_df = embryo_data_df.withColumnRenamed( - "mgi_accession_id", "mgiGeneAccessionId" - ) - umass_external_links_df = umass_external_links_df.withColumnRenamed( - "marker_symbol", "label" - ) - umass_external_links_df = umass_external_links_df.withColumn( - "providerName", lit("Mager Lab Early Lethal Phenotypes") - ) - umass_external_links_df = umass_external_links_df.select( - "mgiGeneAccessionId", "label", "href", "providerName", "description" - ) - - uniprot_external_links_df = uniprot_report_df.where( - col("uniprot_db_id") == lit("MGI") - ) - uniprot_external_links_df = uniprot_external_links_df.withColumn( - "mgiGeneAccessionId", 
col("uniprot_external_id") - ) - uniprot_external_links_df = uniprot_external_links_df.withColumnRenamed( - "uniprot_id", "label" - ) - uniprot_external_links_df = uniprot_external_links_df.withColumn( - "providerName", lit("UniProt") - ) - uniprot_external_links_df = uniprot_external_links_df.withColumn( - "href", - concat(lit("https://www.uniprot.org/uniprotkb/"), col("label")), - ) - uniprot_external_links_df = uniprot_external_links_df.withColumn( - "description", lit(None) - ) - uniprot_external_links_df = uniprot_external_links_df.select( - "mgiGeneAccessionId", "label", "href", "providerName", "description" - ).distinct() - - morphic_mouse_human_ortholog_report_df = mouse_human_ortholog_report_df.select( - "human_gene_symbol", "mgi_gene_accession_id", "hgnc_acc_id" - ).distinct() - - morphic_external_links_df = gene_mgi_accession_df.join( - morphic_mouse_human_ortholog_report_df, "mgi_gene_accession_id" - ) - - morphic_external_links_df = morphic_external_links_df.join( - morphic_report_df, - col("human_gene_symbol") == col("morphic_human_gene_symbol"), - ) - - morphic_external_links_df = morphic_external_links_df.withColumnRenamed( - "mgi_gene_accession_id", "mgiGeneAccessionId" - ) - - morphic_external_links_df = morphic_external_links_df.withColumnRenamed( - "human_gene_symbol", "label" - ) - - morphic_external_links_df = morphic_external_links_df.withColumn( - "href", - concat(lit("https://morphic.bio/genes/"), col("hgnc_acc_id"), lit("/")), - ) - morphic_external_links_df = morphic_external_links_df.withColumn( - "providerName", lit("MorPhiC Program") - ) - - morphic_external_links_df = morphic_external_links_df.withColumn( - "description", lit(None) - ) - - morphic_external_links_df = morphic_external_links_df.select( - "mgiGeneAccessionId", "label", "href", "providerName", "description" - ).distinct() - - external_links_df = ( - gwas_external_links_df.union(umass_external_links_df) - .union(uniprot_external_links_df) - .union(morphic_external_links_df) - ) - external_links_df.write.json(output_path, mode="overwrite") \ No newline at end of file + mouse_human_ortholog_report_df = ( + mouse_human_ortholog_report_df.withColumnRenamed( + "mgi_gene_acc_id", "mgi_gene_accession_id" + ) + ) + + gwas_mouse_human_ortholog_report_df = mouse_human_ortholog_report_df.select( + "human_gene_symbol", "mgi_gene_accession_id" + ).distinct() + + gene_mgi_accession_df = ( + gene_df.select("mgi_accession_id") + .withColumnRenamed("mgi_accession_id", "mgi_gene_accession_id") + .dropDuplicates() + ) + + gwas_external_links_df = gene_mgi_accession_df.join( + gwas_mouse_human_ortholog_report_df, "mgi_gene_accession_id" + ) + + gwas_external_links_df = gwas_external_links_df.withColumnRenamed( + "mgi_gene_accession_id", "mgiGeneAccessionId" + ) + + gwas_external_links_df = gwas_external_links_df.withColumnRenamed( + "human_gene_symbol", "label" + ) + + gwas_external_links_df = gwas_external_links_df.withColumn( + "href", + concat( + lit("https://www.ebi.ac.uk/gwas/genes/"), gwas_external_links_df.label + ), + ) + gwas_external_links_df = gwas_external_links_df.withColumn( + "providerName", lit("GWAS Catalog") + ) + + gwas_external_links_df = gwas_external_links_df.withColumn( + "description", lit(None) + ) + + embryo_data_df = gene_df.select( + "mgi_accession_id", + "marker_symbol", + ).distinct() + embryo_data_df = embryo_data_df.join( + umass_early_lethal_report_df, "mgi_accession_id" + ) + + umass_external_links_df = embryo_data_df.withColumnRenamed( + "mgi_accession_id", "mgiGeneAccessionId" + ) 
+ umass_external_links_df = umass_external_links_df.withColumnRenamed( + "marker_symbol", "label" + ) + umass_external_links_df = umass_external_links_df.withColumn( + "providerName", lit("Mager Lab Early Lethal Phenotypes") + ) + umass_external_links_df = umass_external_links_df.select( + "mgiGeneAccessionId", "label", "href", "providerName", "description" + ) + + uniprot_external_links_df = uniprot_report_df.where( + col("uniprot_db_id") == lit("MGI") + ) + uniprot_external_links_df = uniprot_external_links_df.withColumn( + "mgiGeneAccessionId", col("uniprot_external_id") + ) + uniprot_external_links_df = uniprot_external_links_df.withColumnRenamed( + "uniprot_id", "label" + ) + uniprot_external_links_df = uniprot_external_links_df.withColumn( + "providerName", lit("UniProt") + ) + uniprot_external_links_df = uniprot_external_links_df.withColumn( + "href", + concat(lit("https://www.uniprot.org/uniprotkb/"), col("label")), + ) + uniprot_external_links_df = uniprot_external_links_df.withColumn( + "description", lit(None) + ) + uniprot_external_links_df = uniprot_external_links_df.select( + "mgiGeneAccessionId", "label", "href", "providerName", "description" + ).distinct() + + morphic_mouse_human_ortholog_report_df = mouse_human_ortholog_report_df.select( + "human_gene_symbol", "mgi_gene_accession_id", "hgnc_acc_id" + ).distinct() + + morphic_external_links_df = gene_mgi_accession_df.join( + morphic_mouse_human_ortholog_report_df, "mgi_gene_accession_id" + ) + + morphic_external_links_df = morphic_external_links_df.join( + morphic_report_df, + col("human_gene_symbol") == col("morphic_human_gene_symbol"), + ) + + morphic_external_links_df = morphic_external_links_df.withColumnRenamed( + "mgi_gene_accession_id", "mgiGeneAccessionId" + ) + + morphic_external_links_df = morphic_external_links_df.withColumnRenamed( + "human_gene_symbol", "label" + ) + + morphic_external_links_df = morphic_external_links_df.withColumn( + "href", + concat(lit("https://morphic.bio/genes/"), col("hgnc_acc_id"), lit("/")), + ) + morphic_external_links_df = morphic_external_links_df.withColumn( + "providerName", lit("MorPhiC Program") + ) + + morphic_external_links_df = morphic_external_links_df.withColumn( + "description", lit(None) + ) + + morphic_external_links_df = morphic_external_links_df.select( + "mgiGeneAccessionId", "label", "href", "providerName", "description" + ).distinct() + + external_links_df = ( + gwas_external_links_df.union(umass_external_links_df) + .union(uniprot_external_links_df) + .union(morphic_external_links_df) + ) + external_links_df.write.json(output_path, mode="overwrite") diff --git a/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py index 888b0e28..87dd658c 100644 --- a/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py +++ b/impc_etl/jobs/load/impc_web_api/impc_gene_diseases_mapper.py @@ -1,130 +1,100 @@ -from impc_etl.jobs.load.impc_web_api import ( - BooleanType, - DoubleType, - ImpcConfig, - IntegerType, - PySparkTask, - SparkContext, - SparkSession, - Window, - col, - desc, - luigi, - row_number, - to_camel_case, +import logging +import textwrap +from airflow.sdk import Variable, asset + +from impc_etl.utils.airflow import create_input_asset, create_output_asset +from impc_etl.utils.spark import with_spark_session +from impc_etl.utils.impc_web_api import to_camel_case + +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") + +disease_model_summary_csv_path_asset 
= create_input_asset("impc_web_api/disease_model_summary.csv") +mouse_model_phenodigm_csv_path_asset = create_input_asset("impc_web_api/mouse_model_phenodigm.csv") +disease_phenodigm_csv_path_asset = create_input_asset("impc_web_api/disease_phenodigm.csv") +gene_diseases_service_json_asset = create_output_asset("impc_web_api/gene_diseases_service_json") + +@asset.multi( + schedule=[disease_model_summary_csv_path_asset, mouse_model_phenodigm_csv_path_asset, disease_phenodigm_csv_path_asset], + outlets=[gene_diseases_service_json_asset], + dag_id=f"{dr_tag}_impc_gene_diseases_mapper", + description=textwrap.dedent( + """IMPC Web API gene diseases mapper DAG.""" + ), + tags=["impc_web_api", "diseases"], ) - - -class ImpcGeneDiseasesMapper(PySparkTask): - """ - PySpark Task class to parse GenTar Product report data. - """ - - #: Name of the Spark task - name: str = "ImpcGeneDiseasesMapper" - - #: Path to the CSV gene disease association report - disease_model_summary_csv_path = luigi.Parameter() - - #: Path to the CSV gene disease association report - mouse_model_phenodigm_csv_path = luigi.Parameter() - - #: Path to the CSV gene disease association report - disease_phenodigm_csv_path = luigi.Parameter() - - #: Path of the output directory where the new parquet file will be generated. - output_path: luigi.Parameter = luigi.Parameter() - - def output(self): - """ - Returns the full parquet path as an output for the Luigi Task - (e.g. impc/dr15.2/parquet/product_report_parquet) - """ - return ImpcConfig().get_target( - f"{self.output_path}/impc_web_api/gene_diseases_service_json" - ) - - def app_options(self): - """ - Generates the options pass to the PySpark job - """ - return [ - self.disease_model_summary_csv_path, - self.mouse_model_phenodigm_csv_path, - self.disease_phenodigm_csv_path, - self.output().path, - ] - - def main(self, sc: SparkContext, *args): - """ - Takes in a SparkContext and the list of arguments generated by `app_options` and executes the PySpark job. 
- """ - spark = SparkSession(sc) - - # Parsing app options - disease_model_summary_csv_path = args[0] - mouse_model_phenodigm_csv_path = args[1] # model_id, model_phenotypes - disease_phenodigm_csv_path = args[2] # disease_id, disease_phenotypes - output_path = args[3] - - disease_df = spark.read.csv(disease_model_summary_csv_path, header=True).drop( - "disease_phenotypes", "model_phenotypes" - ) - mouse_model_df = spark.read.csv( - mouse_model_phenodigm_csv_path, header=True - ).select("model_id", "model_phenotypes") - disease_phenodigm_df = spark.read.csv( - disease_phenodigm_csv_path, header=True - ).select("disease_id", "disease_phenotypes") - - disease_df = disease_df.withColumn( - "phenodigm_score", - (col("disease_model_avg_norm") + col("disease_model_max_norm")) / 2, - ) - - disease_df = disease_df.join(disease_phenodigm_df, "disease_id", "left_outer") - disease_df = disease_df.join(mouse_model_df, "model_id", "left_outer") - - window_spec = Window.partitionBy("disease_id", "marker_id").orderBy( - col("phenodigm_score").desc() - ) - - max_disease_df = disease_df.withColumn( - "row_number", row_number().over(window_spec) - ) - - max_disease_df = max_disease_df.withColumn( - "isMaxPhenodigmScore", col("row_number") == 1 - ).drop("row_number") - +@with_spark_session +def impc_gene_diseases_mapper(): + from pyspark.sql import SparkSession, Window + from pyspark.sql.types import BooleanType, DoubleType, IntegerType + from pyspark.sql.functions import col, row_number + + spark = SparkSession.builder.getOrCreate() + + # Parsing app options + disease_model_summary_csv_path = disease_model_summary_csv_path_asset.uri + mouse_model_phenodigm_csv_path = mouse_model_phenodigm_csv_path_asset.uri # model_id, model_phenotypes + disease_phenodigm_csv_path = disease_phenodigm_csv_path_asset.uri # disease_id, disease_phenotypes + output_path = gene_diseases_service_json_asset.uri + + disease_df = spark.read.csv(disease_model_summary_csv_path, header=True).drop( + "disease_phenotypes", "model_phenotypes" + ) + mouse_model_df = spark.read.csv( + mouse_model_phenodigm_csv_path, header=True + ).select("model_id", "model_phenotypes") + disease_phenodigm_df = spark.read.csv( + disease_phenodigm_csv_path, header=True + ).select("disease_id", "disease_phenotypes") + + disease_df = disease_df.withColumn( + "phenodigm_score", + (col("disease_model_avg_norm").cast(DoubleType()) + col("disease_model_max_norm").cast(DoubleType())) / 2, + ) + + disease_df = disease_df.join(disease_phenodigm_df, "disease_id", "left_outer") + disease_df = disease_df.join(mouse_model_df, "model_id", "left_outer") + + window_spec = Window.partitionBy("disease_id", "marker_id").orderBy( + col("phenodigm_score").desc() + ) + + max_disease_df = disease_df.withColumn( + "row_number", row_number().over(window_spec) + ) + + max_disease_df = max_disease_df.withColumn( + "isMaxPhenodigmScore", col("row_number") == 1 + ).drop("row_number") + + max_disease_df = max_disease_df.withColumnRenamed( + "marker_id", "mgiGeneAccessionId" + ) + + for col_name in max_disease_df.columns: max_disease_df = max_disease_df.withColumnRenamed( - "marker_id", "mgiGeneAccessionId" + col_name, to_camel_case(col_name) ) - for col_name in max_disease_df.columns: - max_disease_df = max_disease_df.withColumnRenamed( - col_name, to_camel_case(col_name) - ) - - double_cols = [ - "diseaseModelAvgNorm", - "diseaseModelAvgRaw", - "diseaseModelMaxRaw", - "diseaseModelMaxNorm", - ] - - for col_name in double_cols: - max_disease_df = max_disease_df.withColumn( - col_name, 
col(col_name).astype(DoubleType()) - ) + double_cols = [ + "diseaseModelAvgNorm", + "diseaseModelAvgRaw", + "diseaseModelMaxRaw", + "diseaseModelMaxNorm", + ] + for col_name in double_cols: max_disease_df = max_disease_df.withColumn( - "markerNumModels", col("markerNumModels").astype(IntegerType()) - ) - max_disease_df = max_disease_df.withColumn( - "associationCurated", col("associationCurated").astype(BooleanType()) + col_name, col(col_name).astype(DoubleType()) ) - max_disease_df.repartition(500).write.option("ignoreNullFields", "false").json( - output_path - ) \ No newline at end of file + max_disease_df = max_disease_df.withColumn( + "markerNumModels", col("markerNumModels").astype(IntegerType()) + ) + max_disease_df = max_disease_df.withColumn( + "associationCurated", col("associationCurated").astype(BooleanType()) + ) + + max_disease_df.coalesce(100).write.option("ignoreNullFields", "false").json( + output_path, mode="overwrite" + ) \ No newline at end of file diff --git a/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py index b6ac0fde..b12e1dd5 100644 --- a/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py +++ b/impc_etl/jobs/load/impc_web_api/impc_gene_search_mapper.py @@ -1,78 +1,63 @@ -from impc_etl.jobs.load.impc_web_api import ( - GENE_SUMMARY_MAPPINGS, - GeneLoader, - ImpcConfig, - PySparkTask, - SparkContext, - SparkSession, - luigi, - to_camel_case, +import logging +import textwrap +from airflow.sdk import Variable, asset + +from impc_etl.utils.airflow import create_input_asset, create_output_asset +from impc_etl.utils.spark import with_spark_session +from impc_etl.utils.impc_web_api import GENE_SUMMARY_MAPPINGS + +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") + +gene_parquet_path_asset = create_input_asset("output/gene_data_include_parquet") +gene_search_service_json_asset = create_output_asset("impc_web_api/gene_search_service_json") + +@asset.multi( + schedule=[gene_parquet_path_asset], + outlets=[gene_search_service_json_asset], + dag_id=f"{dr_tag}_impc_gene_search_mapper", + description=textwrap.dedent( + """IMPC Web API gene search mapper DAG.""" + ), + tags=["impc_web_api", "gene search"], ) - - -class ImpcGeneSearchMapper(PySparkTask): - """ - PySpark Task class to parse GenTar Product report data. - """ - - #: Name of the Spark task - name: str = "Impc_Gene_Search_Mapper" - - #: Path of the output directory where the new parquet file will be generated. - output_path: luigi.Parameter = luigi.Parameter() - - def requires(self): - return GeneLoader() - - def output(self): - """ - Returns the full parquet path as an output for the Luigi Task - (e.g. impc/dr15.2/parquet/product_report_parquet) - """ - return ImpcConfig().get_target( - f"{self.output_path}/impc_web_api/gene_search_service_json" +@with_spark_session +def impc_gene_search_mapper(): + from pyspark.sql import SparkSession + + def to_camel_case(snake_str): + components = snake_str.split("_") + # We capitalize the first letter of each component except the first one + # with the 'title' method and join them together. 
+ return components[0] + "".join(x.title() for x in components[1:]) + + spark = SparkSession.builder.getOrCreate() + + # Parsing app options + gene_parquet_path = gene_parquet_path_asset.uri + gene_df = spark.read.parquet(gene_parquet_path) + output_path = gene_search_service_json_asset.uri + + for col_name in GENE_SUMMARY_MAPPINGS.keys(): + gene_df = gene_df.withColumnRenamed( + col_name, GENE_SUMMARY_MAPPINGS[col_name] ) - def app_options(self): - """ - Generates the options pass to the PySpark job - """ - return [ - self.input().path, - self.output().path, - ] - - def main(self, sc: SparkContext, *args): - """ - Takes in a SparkContext and the list of arguments generated by `app_options` and executes the PySpark job. - """ - spark = SparkSession(sc) - - # Parsing app options - gene_parquet_path = args[0] - gene_df = spark.read.parquet(gene_parquet_path) - output_path = args[1] - - for col_name in GENE_SUMMARY_MAPPINGS.keys(): - gene_df = gene_df.withColumnRenamed( - col_name, GENE_SUMMARY_MAPPINGS[col_name] - ) - - for col_name in gene_df.columns: - gene_df = gene_df.withColumnRenamed(col_name, to_camel_case(col_name)) - - gene_search_df = gene_df.select( - "mgiGeneAccessionId", - "geneName", - "geneSymbol", - "synonyms", - "humanGeneSymbols", - "humanSymbolSynonyms", - "esCellProductionStatus", - "mouseProductionStatus", - "phenotypeStatus", - "phenotypingDataAvailable", - ) - gene_search_df.repartition(1).write.option("ignoreNullFields", "false").json( - output_path - ) \ No newline at end of file + for col_name in gene_df.columns: + gene_df = gene_df.withColumnRenamed(col_name, to_camel_case(col_name)) + + gene_search_df = gene_df.select( + "mgiGeneAccessionId", + "geneName", + "geneSymbol", + "synonyms", + "humanGeneSymbols", + "humanSymbolSynonyms", + "esCellProductionStatus", + "mouseProductionStatus", + "phenotypeStatus", + "phenotypingDataAvailable", + ) + gene_search_df.repartition(1).write.option("ignoreNullFields", "false").mode("overwrite").json( + output_path + ) diff --git a/impc_etl/jobs/load/impc_web_api/impc_idg_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_idg_mapper.py index fe81d966..4f02eb59 100644 --- a/impc_etl/jobs/load/impc_web_api/impc_idg_mapper.py +++ b/impc_etl/jobs/load/impc_web_api/impc_idg_mapper.py @@ -1,93 +1,71 @@ -from impc_etl.jobs.load.impc_web_api import ( - GeneLoader, - ImpcConfig, - PySparkTask, - SparkContext, - SparkSession, - col, - luigi, +import logging +import textwrap +from airflow.sdk import Variable, asset +from impc_etl.utils.airflow import create_input_asset, create_output_asset +from impc_etl.utils.spark import with_spark_session + +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") + +gene_parquet_path_asset = create_input_asset("output/gene_data_include_parquet") +idg_family_mapping_report_json_path_asset = create_input_asset("impc_web_api/IDG_TargetList_CurrentVersion.json") +ortholog_mapping_report_tsv_path_asset = create_input_asset("impc_web_api/mouse_human_ortholog_report.tsv") +idg_landing_json_asset = create_output_asset("impc_web_api/idg_landing.json") + +@asset.multi( + schedule=[gene_parquet_path_asset, idg_family_mapping_report_json_path_asset, ortholog_mapping_report_tsv_path_asset], + outlets=[idg_landing_json_asset], + dag_id=f"{dr_tag}_impc_idg_landing_mapper", + description=textwrap.dedent( """IMPC Web API IDG landing page mapper DAG.""" ), + tags=["impc_web_api", "idg", "landing-page"], ) +@with_spark_session +def impc_idg_mapper(): + import json + from pyspark.sql import SparkSession + from pyspark.sql.functions import col + from 
urllib.parse import unquote, urlparse + + spark = SparkSession.builder.getOrCreate() + + # Parsing app options + idg_family_mapping_report_json_path = idg_family_mapping_report_json_path_asset.uri + ortholog_mapping_report_tsv_path = ortholog_mapping_report_tsv_path_asset.uri + gene_parquet_path = gene_parquet_path_asset.uri + + idg_family_df = spark.read.json(idg_family_mapping_report_json_path) + ortholog_mapping_df = spark.read.csv( + ortholog_mapping_report_tsv_path, sep="\t", header=True + ) + gene_df = spark.read.parquet(gene_parquet_path) + + gene_df = gene_df.select( + "mgi_accession_id", + "marker_symbol", + "significant_top_level_mp_terms", + "not_significant_top_level_mp_terms", + "phenotype_status", + "mouse_production_status", + "es_cell_production_status", + ).distinct() + + ortholog_mapping_df = ortholog_mapping_df.select( + col("Mgi Gene Acc Id").alias("mgi_accession_id"), + col("Human Gene Symbol").alias("human_gene_symbol"), + ).distinct() + + gene_df = gene_df.join( + ortholog_mapping_df, + "mgi_accession_id", + ) + idg_family_df = idg_family_df.withColumnRenamed("Gene", "human_gene_symbol") + idg_family_df = idg_family_df.withColumnRenamed("IDGFamily", "idg_family") + gene_df = gene_df.join(idg_family_df, "human_gene_symbol") + + idg_landing_json = gene_df.rdd.map(lambda row: row.asDict(True)).collect() + output_path = unquote(urlparse(idg_landing_json_asset.uri).path) + with open(output_path, "w") as output_file: + output_file.write(json.dumps(idg_landing_json)) - -class ImpcIDGMapper(PySparkTask): - """ - PySpark Task class to parse GenTar Product report data. - """ - - #: Name of the Spark task - name: str = "ImpcIDGMapper" - - ortholog_mapping_report_tsv_path = luigi.Parameter() - idg_family_mapping_report_json_path = luigi.Parameter() - - #: Path of the output directory where the new parquet file will be generated. - output_path: luigi.Parameter = luigi.Parameter() - - def requires(self): - return [GeneLoader()] - - def output(self): - """ - Returns the full parquet path as an output for the Luigi Task - (e.g. impc/dr15.2/parquet/product_report_parquet) - """ - return ImpcConfig().get_target( - f"{self.output_path}/impc_web_api/idg_landing.json" - ) - - def app_options(self): - """ - Generates the options pass to the PySpark job - """ - return [ - self.idg_family_mapping_report_json_path, - self.ortholog_mapping_report_tsv_path, - self.input()[0].path, - self.output().path, - ] - - def main(self, sc: SparkContext, *args): - """ - Takes in a SparkContext and the list of arguments generated by `app_options` and executes the PySpark job. 
- """ - spark = SparkSession(sc) - - # Parsing app options - idg_family_mapping_report_json_path = args[0] - ortholog_mapping_report_tsv_path = args[1] - gene_parquet_path = args[2] - output_path = args[3] - - idg_family_df = spark.read.json(idg_family_mapping_report_json_path) - ortholog_mapping_df = spark.read.csv( - ortholog_mapping_report_tsv_path, sep="\t", header=True - ) - gene_df = spark.read.parquet(gene_parquet_path) - - gene_df = gene_df.select( - "mgi_accession_id", - "marker_symbol", - "significant_top_level_mp_terms", - "not_significant_top_level_mp_terms", - "phenotype_status", - "mouse_production_status", - "es_cell_production_status", - ).distinct() - - ortholog_mapping_df = ortholog_mapping_df.select( - col("Mgi Gene Acc Id").alias("mgi_accession_id"), - col("Human Gene Symbol").alias("human_gene_symbol"), - ).distinct() - - gene_df = gene_df.join( - ortholog_mapping_df, - "mgi_accession_id", - ) - idg_family_df = idg_family_df.withColumnRenamed("Gene", "human_gene_symbol") - idg_family_df = idg_family_df.withColumnRenamed("IDGFamily", "idg_family") - gene_df = gene_df.join(idg_family_df, "human_gene_symbol") - - idg_landing_json = gene_df.rdd.map(lambda row: row.asDict(True)).collect() - - with open(output_path, mode="w") as output_file: - output_file.write(json.dumps(idg_landing_json)) \ No newline at end of file diff --git a/impc_etl/jobs/load/impc_web_api/impc_pathology_datasets_mapper.py b/impc_etl/jobs/load/impc_web_api/impc_pathology_datasets_mapper.py index 7f31e362..a955393e 100644 --- a/impc_etl/jobs/load/impc_web_api/impc_pathology_datasets_mapper.py +++ b/impc_etl/jobs/load/impc_web_api/impc_pathology_datasets_mapper.py @@ -1,158 +1,148 @@ -from impc_etl.jobs.load.impc_web_api import ( - ExperimentToObservationMapper, - ImpcConfig, - ImpressToParameterMapper, - PySparkTask, - SparkContext, - SparkSession, - arrays_zip, - col, - collect_set, - luigi, - struct, - to_camel_case, -) +""" + PySpark task to create the JSON for the pathology_service + of the website from the observations_parquet and impress_parameter_parquet (pipeline_parquet). +""" +import logging +import textwrap +from airflow.sdk import Variable, asset -class ImpcPathologyDatasetsMapper(PySparkTask): - """ - PySpark Task class to parse GenTar Product report data. - """ +from impc_etl.utils.airflow import create_input_asset, create_output_asset +from impc_etl.utils.spark import with_spark_session - #: Name of the Spark task - name: str = "ImpcPathologyDatasetsMapper" - #: Path of the output directory where the new parquet file will be generated. - output_path: luigi.Parameter = luigi.Parameter() +task_logger = logging.getLogger("airflow.task") +dr_tag = Variable.get("data_release_tag") - def requires(self): - return [ExperimentToObservationMapper(), ImpressToParameterMapper()] +observations_parquet_path_asset = create_input_asset("output/observations_parquet") +impress_parameter_parquet_asset = create_input_asset("output/impress_parameter_parquet") - def output(self): - """ - Returns the full parquet path as an output for the Luigi Task - (e.g. 
impc/dr15.2/parquet/product_report_parquet) - """ - return ImpcConfig().get_target( - f"{self.output_path}/impc_web_api/pathology_service_json" - ) +pathology_service_output_asset = create_output_asset("impc_web_api/pathology_service_json") - def app_options(self): - """ - Generates the options pass to the PySpark job - """ - return [ - self.input()[0].path, - self.input()[1].path, - self.output().path, - ] - def main(self, sc: SparkContext, *args): +@asset.multi( + schedule=[observations_parquet_path_asset, + impress_parameter_parquet_asset], + outlets=[pathology_service_output_asset], + dag_id=f"{dr_tag}_impc_pathology_datasets_mapper", + description=textwrap.dedent( """ - Takes in a SparkContext and the list of arguments generated by `app_options` and executes the PySpark job. + PySpark task to create the JSON for the pathology_service + of the website from the observations_parquet and parameter_parquet (pipeline_parquet). """ - spark = SparkSession(sc) + ), + tags=["impc_web_api"], +) +@with_spark_session +def impc_pathology_datasets_mapper(): - # Parsing app options - observations_parquet_path = args[0] + from impc_etl.utils.impc_web_api import to_camel_case - parameter_parquet_path = args[1] - output_path = args[2] + from pyspark.sql import SparkSession + from pyspark.sql.functions import ( + col, + struct, + collect_set, + arrays_zip, + ) + + spark = SparkSession.builder.getOrCreate() + + observations_df = spark.read.parquet(observations_parquet_path_asset.uri) + parameter_df = ( + spark.read.parquet(impress_parameter_parquet_asset.uri) + .select( - "pipeline_stable_id", - "pipeline_stable_key", - "procedure_stable_id", - "procedure_stable_key", - "parameter_stable_id", - "parameter_stable_key", - ) - .distinct() - ) - observations_df = observations_df.join( - parameter_df, - [ - "pipeline_stable_id", - "procedure_stable_id", - "parameter_stable_id", - ], - ) - pathology_datasets_cols = [ - "gene_accession_id", - "gene_symbol", - "allele_accession_id", - "allele_symbol", - "zygosity", + "pipeline_stable_id", "pipeline_stable_key", "procedure_stable_id", "procedure_stable_key", - "procedure_name", "parameter_stable_id", "parameter_stable_key", - "parameter_name", - "life_stage_name", - "sub_term_id", - "sub_term_name", - "text_value", - "category", - "data_point", - "external_sample_id", - "phenotyping_center", - "metadata", - "strain_name", - "strain_accession_id", - ] - observations_df = observations_df.select(*pathology_datasets_cols) - pathology_datasets_df = observations_df.where( - col("parameter_stable_id").contains("PAT") ) + .distinct() + ) + observations_df = observations_df.join( + parameter_df, + [ + "pipeline_stable_id", + "procedure_stable_id", + "parameter_stable_id", + ], + ) + pathology_datasets_cols = [ + "gene_accession_id", + "gene_symbol", + "allele_accession_id", + "allele_symbol", + "zygosity", + "pipeline_stable_id", + "pipeline_stable_key", + "procedure_stable_id", + "procedure_stable_key", + "procedure_name", + "parameter_stable_id", + "parameter_stable_key", + "parameter_name", + "life_stage_name", + "sub_term_id", + "sub_term_name", + "text_value", + "category", + "data_point", + "external_sample_id", + "phenotyping_center", + "metadata", + "strain_name", + "strain_accession_id", + ] + observations_df = observations_df.select(*pathology_datasets_cols) + pathology_datasets_df = observations_df.where( + col("parameter_stable_id").contains("PAT") + ) + 
pathology_datasets_df = pathology_datasets_df.withColumnRenamed( + "gene_accession_id", "mgi_gene_accession_id" + ) + for col_name in pathology_datasets_df.columns: pathology_datasets_df = pathology_datasets_df.withColumnRenamed( - "gene_accession_id", "mgi_gene_accession_id" + col_name, to_camel_case(col_name) ) - for col_name in pathology_datasets_df.columns: - pathology_datasets_df = pathology_datasets_df.withColumnRenamed( - col_name, to_camel_case(col_name) - ) - pathology_datasets_df = pathology_datasets_df.withColumnRenamed( - "subTermId", "termId" - ) - pathology_datasets_df = pathology_datasets_df.withColumnRenamed( - "subTermName", "termName" - ) + pathology_datasets_df = pathology_datasets_df.withColumnRenamed( + "subTermId", "termId" + ) + pathology_datasets_df = pathology_datasets_df.withColumnRenamed( + "subTermName", "termName" + ) - pathology_datasets_df = pathology_datasets_df.withColumnRenamed( - "external_sample_id", "sampleId" - ) + pathology_datasets_df = pathology_datasets_df.withColumnRenamed( + "externalSampleId", "sampleId" + ) - pathology_datasets_df = pathology_datasets_df.withColumn( - "ontologyTerms", arrays_zip("termId", "termName") - ) - pathology_datasets_df = pathology_datasets_df.drop("termId", "termName") - common_columns = [ - "mgiGeneAccessionId", - "geneSymbol", - "pipelineStableId", - "pipelineStableKey", - "procedureStableId", - "procedureStableKey", - "procedureName", - "parameterStableId", - "parameterStableKey", - "parameterName", - ] - pathology_datasets_df = pathology_datasets_df.groupBy(*common_columns).agg( - collect_set( - struct( - *[ - to_camel_case(col_name) - for col_name in pathology_datasets_df.columns - if col_name not in common_columns - ] - ) - ).alias("datasets") - ) - pathology_datasets_df.write.json(output_path, mode="overwrite") \ No newline at end of file + pathology_datasets_df = pathology_datasets_df.withColumn( + "ontologyTerms", arrays_zip("termId", "termName") + ) + pathology_datasets_df = pathology_datasets_df.drop("termId", "termName") + common_columns = [ + "mgiGeneAccessionId", + "geneSymbol", + "pipelineStableId", + "pipelineStableKey", + "procedureStableId", + "procedureStableKey", + "procedureName", + "parameterStableId", + "parameterStableKey", + "parameterName", + ] + pathology_datasets_df = pathology_datasets_df.groupBy(*common_columns).agg( + collect_set( + struct( + *[ + to_camel_case(col_name) + for col_name in pathology_datasets_df.columns + if col_name not in common_columns + ] + ) + ).alias("datasets") + ) + pathology_datasets_df.write.json(pathology_service_output_asset.uri, mode="overwrite") \ No newline at end of file diff --git a/impc_etl/utils/impc_web_api.py b/impc_etl/utils/impc_web_api.py new file mode 100644 index 00000000..a5522a35 --- /dev/null +++ b/impc_etl/utils/impc_web_api.py @@ -0,0 +1,31 @@ + +GENE_SUMMARY_MAPPINGS = { + "mgi_accession_id": "mgiGeneAccessionId", + "marker_symbol": "geneSymbol", + "marker_name": "geneName", + "marker_synonym": "synonyms", + "significant_top_level_mp_terms": "significantTopLevelPhenotypes", + "not_significant_top_level_mp_terms": "notSignificantTopLevelPhenotypes", + "embryo_data_available": "hasEmbryoImagingData", + "human_gene_symbol": "human_gene_symbols", + "human_symbol_synonym": "human_symbol_synonyms", + "production_centre": "production_centres", + "phenotyping_centre": "phenotyping_centres", + "allele_name": "allele_names", + "ensembl_gene_id": "ensembl_gene_ids", +} + +def to_camel_case(snake_str): + components = snake_str.split("_") + # We 
capitalize the first letter of each component except the first one + # with the 'title' method and join them together. + return components[0] + "".join(x.title() for x in components[1:]) + +# Note: despite the "udf" suffix carried over from the legacy jobs, this +# builds a Column expression rather than a registered UDF: a {id, name} +# struct when the term id column is non-null, otherwise a null literal. +def phenotype_term_zip_udf(x, y): + from pyspark.sql.functions import lit, struct, when + return when(x.isNotNull(), struct(x.alias("id"), y.alias("name"))).otherwise( + lit(None) + ) \ No newline at end of file
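Reviewer note: a minimal usage sketch of the two helpers added in impc_etl/utils/impc_web_api.py, assuming only a local Spark session; the rows and HP term values below are illustrative, not taken from the ETL inputs.

from pyspark.sql import SparkSession

from impc_etl.utils.impc_web_api import phenotype_term_zip_udf, to_camel_case

spark = SparkSession.builder.master("local[1]").getOrCreate()

# to_camel_case maps the snake_case report columns onto the camelCase
# names used in the web API payloads.
assert to_camel_case("marker_num_models") == "markerNumModels"

# phenotype_term_zip_udf builds a Column expression: a {id, name} struct
# when the term id is non-null, otherwise a null literal.
df = spark.createDataFrame(
    [("HP:0001627", "Abnormal heart morphology"), (None, None)],
    ["hp_term_id", "hp_term_name"],
)
df.select(
    phenotype_term_zip_udf(df.hp_term_id, df.hp_term_name).alias("humanPhenotype")
).show(truncate=False)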