Skip to content

Commit

Permalink
Merge pull request #329 from microbiomedata/issue-315
Browse files Browse the repository at this point in the history
Updates to submission portal jobs to handle sequencing data
  • Loading branch information
pkalita-lbl committed Oct 19, 2023
2 parents 96befc1 + b88bb77 commit 46d6543
Show file tree
Hide file tree
Showing 10 changed files with 557 additions and 12,502 deletions.
32 changes: 28 additions & 4 deletions nmdc_runtime/site/graphs.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
nmdc_schema_database_export_filename_neon,
get_neon_pipeline_mms_data_product,
get_neon_pipeline_sls_data_product,
get_submission_portal_pipeline_inputs,
get_csv_rows_from_url,
)


Expand Down Expand Up @@ -134,8 +136,19 @@ def gold_study_to_database():

@graph
def translate_metadata_submission_to_nmdc_schema_database():
metadata_submission = fetch_nmdc_portal_submission_by_id()
database = translate_portal_submission_to_nmdc_schema_database(metadata_submission)
(
submission_id,
omics_processing_mapping_file_url,
data_object_mapping_file_url,
) = get_submission_portal_pipeline_inputs()

metadata_submission = fetch_nmdc_portal_submission_by_id(submission_id)
omics_processing_mapping = get_csv_rows_from_url(omics_processing_mapping_file_url)
data_object_mapping = get_csv_rows_from_url(data_object_mapping_file_url)

database = translate_portal_submission_to_nmdc_schema_database(
metadata_submission, omics_processing_mapping, data_object_mapping
)

validate_metadata(database)

Expand All @@ -147,8 +160,19 @@ def translate_metadata_submission_to_nmdc_schema_database():

@graph
def ingest_metadata_submission():
    """Fetch a portal submission, translate it, then submit and await ingest.

    Wires together the submission-portal pipeline: read the three inputs
    from op config, fetch the submission and its two optional CSV mapping
    files, translate everything into an nmdc Database, submit that to the
    runtime, and block until the triggered run finishes.
    """
    (
        submission_id,
        omics_mapping_url,
        data_object_mapping_url,
    ) = get_submission_portal_pipeline_inputs()

    submission = fetch_nmdc_portal_submission_by_id(submission_id)

    database = translate_portal_submission_to_nmdc_schema_database(
        submission,
        get_csv_rows_from_url(omics_mapping_url),
        get_csv_rows_from_url(data_object_mapping_url),
    )

    poll_for_run_completion(submit_metadata_to_db(database))

Expand Down
67 changes: 62 additions & 5 deletions nmdc_runtime/site/ops.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import csv
import json
import mimetypes
import os
Expand All @@ -6,8 +7,10 @@
from collections import defaultdict
from datetime import datetime, timezone
from io import BytesIO
from typing import Tuple
from zipfile import ZipFile
import pandas as pd
import requests

from bson import ObjectId, json_util
from dagster import (
Expand All @@ -25,6 +28,7 @@
RetryRequested,
String,
op,
Optional,
)
from gridfs import GridFS
from linkml_runtime.dumpers import json_dumper
Expand Down Expand Up @@ -77,7 +81,7 @@
from pymongo.database import Database as MongoDatabase
from starlette import status
from terminusdb_client.woqlquery import WOQLQuery as WQ
from toolz import assoc, dissoc, get_in
from toolz import assoc, dissoc, get_in, valfilter, identity


@op
Expand Down Expand Up @@ -638,12 +642,34 @@ def id_minter(*args, **kwargs):
return database


@op(
    config_schema={
        "submission_id": str,
        "omics_processing_mapping_file_url": str,
        "data_object_mapping_file_url": str,
    },
    out={
        "submission_id": Out(),
        "omics_processing_mapping_file_url": Out(),
        "data_object_mapping_file_url": Out(),
    },
)
def get_submission_portal_pipeline_inputs(
    context: OpExecutionContext,
) -> Tuple[str, str, str]:
    """Surface the submission-portal pipeline inputs from op config.

    Re-emits the submission ID and the two mapping-file URLs as three
    separate outputs so that downstream ops can depend on each value
    individually.
    """
    config = context.op_config
    keys = (
        "submission_id",
        "omics_processing_mapping_file_url",
        "data_object_mapping_file_url",
    )
    return tuple(config[key] for key in keys)


@op(
    required_resource_keys={"nmdc_portal_api_client"},
)
def fetch_nmdc_portal_submission_by_id(
    context: OpExecutionContext, submission_id: str
) -> Dict[str, Any]:
    """Fetch one metadata submission record from the NMDC submission portal.

    The submission ID now arrives as an op input (produced by
    ``get_submission_portal_pipeline_inputs``) rather than from this op's own
    config, so the stale ``config_schema={"submission_id": str}`` entry and
    the superseded zero-argument signature have been removed — the diff had
    left both the old and new definitions in place, which is not valid Python.

    :param context: Dagster context providing the ``nmdc_portal_api_client``
        resource.
    :param submission_id: ID of the submission to fetch.
    :return: The submission record as parsed JSON.
    """
    client: NmdcPortalApiClient = context.resources.nmdc_portal_api_client
    return client.fetch_metadata_submission(submission_id)

Expand All @@ -652,14 +678,21 @@ def fetch_nmdc_portal_submission_by_id(context: OpExecutionContext) -> Dict[str,
def translate_portal_submission_to_nmdc_schema_database(
    context: OpExecutionContext,
    metadata_submission: Dict[str, Any],
    omics_processing_mapping: List,
    data_object_mapping: List,
) -> nmdc.Database:
    """Translate a portal submission into an ``nmdc.Database``.

    IDs for newly created entities are minted on demand through the runtime
    API site client resource.

    :param context: Dagster context providing ``runtime_api_site_client``.
    :param metadata_submission: raw submission record from the portal.
    :param omics_processing_mapping: parsed rows of the omics-processing
        mapping CSV (may be empty).
    :param data_object_mapping: parsed rows of the data-object mapping CSV
        (may be empty).
    :return: the translated database object.
    """
    client: RuntimeApiSiteClient = context.resources.runtime_api_site_client

    def _mint(*args, **kwargs):
        # Delegate ID minting to the runtime API and unwrap the JSON payload.
        return client.mint_id(*args, **kwargs).json()

    return SubmissionPortalTranslator(
        metadata_submission,
        omics_processing_mapping,
        data_object_mapping,
        id_minter=_mint,
    ).get_database()

Expand Down Expand Up @@ -773,3 +806,27 @@ def id_minter(*args, **kwargs):
@op
def nmdc_schema_database_export_filename_neon() -> str:
    """Return the fixed filename used when exporting the NEON-derived database."""
    filename = "database_from_neon_metadata.json"
    return filename


@op
def get_csv_rows_from_url(url: str) -> List[Dict]:
    """Download a CSV file from *url* and parse it into row dicts.

    Each element of the returned list is one CSV row, keyed by column
    name. Columns whose cell was empty are omitted from that row's dict.
    An empty or missing URL yields an empty list, since both mapping
    files are optional pipeline inputs.

    :param url: URL to fetch and parse (may be empty/None).
    :return: List[Dict] of parsed rows.
    :raises requests.HTTPError: if the download fails.
    """
    if not url:
        return []

    response = requests.get(url)
    response.raise_for_status()

    # Drop cells holding the empty string so absent values have no key at all.
    return [
        {column: cell for column, cell in row.items() if cell}
        for row in csv.DictReader(response.text.splitlines())
    ]
16 changes: 12 additions & 4 deletions nmdc_runtime/site/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,12 @@ def biosample_submission_ingest():
),
"ops": {
"export_json_to_drs": {"config": {"username": "..."}},
"fetch_nmdc_portal_submission_by_id": {
"config": {"submission_id": "..."}
"get_submission_portal_pipeline_inputs": {
"config": {
"submission_id": "",
"omics_processing_mapping_file_url": "",
"data_object_mapping_file_url": "",
}
},
},
},
Expand All @@ -542,8 +546,12 @@ def biosample_submission_ingest():
},
),
"ops": {
"fetch_nmdc_portal_submission_by_id": {
"config": {"submission_id": "..."}
"get_submission_portal_pipeline_inputs": {
"config": {
"submission_id": "",
"omics_processing_mapping_file_url": "",
"data_object_mapping_file_url": "",
}
},
},
},
Expand Down

0 comments on commit 46d6543

Please sign in to comment.