In [33]:
import concurrent
import os
from pathlib import Path
from time import time

import jsonschema
import requests
from mongospawn.schema import dbschema_from_file, collschemas_for
from pymongo import MongoClient, ReplaceOne
from toolz import keyfilter
from tqdm.notebook import tqdm

nmdc_schema_json_path = str(
    Path.cwd().parent.parent.joinpath("schema", "nmdc.schema.json")
)
dbschema = dbschema_from_file(nmdc_schema_json_path)
collschemas = collschemas_for(dbschema)


def reset_database(db):
    for coll_name in collschemas:
        db.drop_collection(coll_name)
        db.create_collection(
            coll_name, validator={"$jsonSchema": collschemas[coll_name]}
        )
        db[coll_name].create_index("id", unique=True)


def jsonschema_for(collection_name=None):
    if collection_name not in set(dbschema["properties"]):
        raise ValueError(
            f'collection_name must be one of {set(dbschema["properties"])}'
        )
    defn = dbschema["properties"][collection_name]["items"]["$ref"].split("/")[-1]
    return dbschema["definitions"][defn]


def validator_for(collection):
    return collection.options()["validator"]["$jsonSchema"]


def pick(whitelist, d):
    return keyfilter(lambda k: k in whitelist, d)


def conform(doc, collection_name=None):
    """Provides limited, conservative conformance on a docments.

    - If additionalProperties is False, omit any supplied.
    - If a field must be a list of strings, and a lone string is supplied, wrap it in a list.

    """
    if collection_name not in set(dbschema["properties"]):
        raise ValueError(
            f'collection_name must be one of {set(dbschema["properties"])}'
        )
    defn = dbschema["properties"][collection_name]["items"]["$ref"].split("/")[-1]
    schema = dbschema["definitions"][defn]
    if schema.get("additionalProperties") is False:
        doc = pick(list(schema["properties"]), doc)
    for k in list(doc.keys()):
        if (
            isinstance(doc[k], str)
            and schema["properties"].get(k, {}).get("type") == "array"
            and schema["properties"][k]["items"]["type"] == "string"
            and not isinstance(doc[k], list)
        ):
            doc[k] = [doc[k]]
    return doc


def validate(doc, collection_name=None, conform_doc=False):
    if collection_name not in set(dbschema["properties"]):
        raise ValueError(
            f'collection_name must be one of {set(dbschema["properties"])}'
        )
    if conform_doc:
        doc = conform(doc, collection_name=collection_name)
    jsonschema.validate(doc, schema=dbschema)
    return doc


def fetch_json(url):
    return requests.get(url).json()


def fetch_and_validate_json(resource, collection_name=None, conform_doc=False):
    """Takes a URL or the pre-fetched resource (list or dict)"""
    payload = fetch_json(resource) if isinstance(resource, str) else resource
    validated = []
    if isinstance(payload, list):
        for doc in tqdm(payload):
            validated.append(
                validate(doc, collection_name=collection_name, conform_doc=conform_doc)
            )
    elif isinstance(payload, dict):
        if set(payload) & set(dbschema["properties"]):
            for collection_name, docs in payload.items():
                for doc in tqdm(docs, desc=collection_name):
                    validated.append(
                        validate(
                            doc,
                            collection_name=collection_name,
                            conform_doc=conform_doc,
                        )
                    )
        else:
            validated.append(
                validate(
                    payload, collection_name=collection_name, conform_doc=conform_doc
                )
            )
    else:
        raise ValueError(f"Fetched JSON must be a JSON array or object")
    return validated


def add_to_db(validated, db, collection_name=None):
    if collection_name not in set(dbschema["properties"]):
        raise ValueError(
            f'collection_name must be one of {set(dbschema["properties"])}'
        )
    if isinstance(validated, list):
        db[collection_name].bulk_write(
            [ReplaceOne({"id": v["id"]}, v, upsert=True) for v in validated]
        )
    elif isinstance(validated, dict):
        if set(validated) & set(dbschema["properties"]):
            for collection_name, docs in validated.items():
                db[collection_name].bulk_write(
                    [ReplaceOne({"id": v["id"]}, v, upsert=True) for v in docs]
                )
        else:
            db[collection_name].bulk_write(
                [ReplaceOne({"id": validated["id"]}, validated, upsert=True)]
            )
    else:
        raise ValueError(f"payload must be a list or dict")


def fetch_conform_and_persist(spec, db):
    url = spec["url"]
    collection_name = spec["type"]
    print(f"fetching {url} ({collection_name})")
    payload = fetch_and_validate_json(url, collection_name, conform_doc=True)
    add_to_db(payload, db, collection_name)


def fetch_conform_and_persist_from_manifest(spec, db):
    error_urls = []
    url_manifest = spec["url_manifest"]
    collection_name = spec["type"]
    urls = fetch_json(url_manifest)

    pbar = tqdm(total=len(urls))

    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_url = {
            executor.submit(
                fetch_and_validate_json, url, collection_name, conform_doc=True
            ): url
            for url in urls
        }
        for future in concurrent.futures.as_completed(future_to_url):
            pbar.update(1)
            url = future_to_url[future]
            try:
                payload = future.result()
            except Exception as e:
                error_urls.append((url, str(e)))
            else:
                add_to_db(payload, db, collection_name)

    pbar.close()
    return error_urls


In [34]:
from collections import defaultdict
import csv
from datetime import datetime
from functools import partial, reduce
import json
import os
import re
from pprint import pprint
from zipfile import ZipFile

from dictdiffer import diff
from pymongo import DeleteMany, DeleteOne, InsertOne, MongoClient, ReplaceOne, UpdateOne
from toolz import assoc_in, compose, concat, dissoc, keyfilter, get_in, merge, merge_with
from tqdm.notebook import tqdm

(Re-)load existing NMDC DB from file.

In [35]:
with ZipFile('../src/data/nmdc_database.json.zip') as myzip:
    with myzip.open('nmdc_database.json') as f:
        nmdc_database = json.load(f)

In [36]:
client = MongoClient(
    host=os.getenv("NMDC_MONGO_HOST"),
    username="dwinston_rw",
    password=os.getenv("NMDC_MONGO_RW_PWD"))
dbname = "dwinston_scratch"
db = client[dbname]

for collection in nmdc_database:
    db.drop_collection(collection)
    db[collection].create_index("id", unique=True)
    db[collection].insert_many(nmdc_database[collection])
print(sorted(db.list_collection_names()))

['activity_set', 'biosample_set', 'data_object_set', 'functional_annotation_set', 'genome_feature_set', 'mags_activity_set', 'omics_processing_set', 'study_set']


Load FICUS Brodie spreadsheet and create gold-id-to-igsn map.

In [37]:
GOLD_ID_IDX = 5
IGSN_IDX = 2

igsn_golds = defaultdict(list)

gold_id_pattern = re.compile(r"Gb\d+")

with open('../src/data/FICUS_Soil_Gs0135149_Brodie-12-23-2020_PS.xlsx - Brodie_Gs0135149_Soil_Metadata.csv') as f:
    reader = csv.reader(f)
    for row in reader:
        gold_id = row[GOLD_ID_IDX]
        igsn = row[IGSN_IDX]
        if gold_id_pattern.fullmatch(gold_id):
            igsn_golds[igsn].append(gold_id)

Prepare helper function to compare timestamps given in e.g. "15-MAY-20 08.30.01.000000000 am" format.

In [38]:
dt_pattern = re.compile(r"\d{2}-(?P<month>\w+)-\d{2} \d{2}\.\d{2}\.\d{2}\.(?P<ns>\d+) [A|P]M")
dt_format = "%d-%b-%y %I.%M.%S.%f %p"

def order_timestamps(timestamps):
    if not all(isinstance(ts, str) for ts in timestamps):
        raise Exception(f"{timestamps} not strings")
    as_datetimes = []
    for ts in timestamps:
        match = dt_pattern.search(ts)
        first, month, rest = ts.partition(match.group("month"))
        ts_new = first + month[0] + month[1:].lower() + rest
        ts_new = ts_new.replace(match.group("ns"), match.group("ns")[:-3]) # truncate to microseconds
        as_datetimes.append(datetime.strptime(ts_new, dt_format))
    sorted_dts = sorted(as_datetimes)
    return [dt.strftime(dt_format) for dt in sorted_dts]

Prepare helper-function pipeline to unify biosample_set documents that should be considered equivalent.

In [39]:
er_xna_pattern = re.compile(r"ER_[D|R]NA_\d+$")

def rstrip_name_ER_ID(d):
    s = get_in(["name"], d)
    s_new = er_xna_pattern.split(s)[0] if er_xna_pattern.search(s) else s
    return assoc_in(d, ["name"], s_new)

def capitalize_location_raw_value(d):
    s = get_in(["location", "has_raw_value"], d)
    s_new = s[0].upper() + s[1:]
    return assoc_in(d, ["location", "has_raw_value"], s_new)

pipeline = compose(
    capitalize_location_raw_value,
    rstrip_name_ER_ID,
    lambda d: dissoc(d, "_id", "id", "add_date", "mod_date", "identifier"),
)

Produce new biosample objects with ISGN ids.

In [40]:
merged_biosample_docs = []

for igsn, golds in igsn_golds.items():
    igsn_curie = "igsn:"+igsn
    to_change = list(db.biosample_set.find({"id": {"$in": [f"gold:{g}" for g in golds]}}))
    
    # No merge needed, just change of id.
    if len(to_change) == 1:
        merged = assoc_in(to_change[0], ["id"], igsn_curie)
        merged = assoc_in(merged, ["identifier", "has_raw_value"], igsn_curie)
        merged_biosample_docs.append(merged)
        continue
    elif len(to_change) == 0:
        continue

    # Ensure that unification pipeline is adequate to resolve differences.
    distilled = list(map(pipeline, to_change))
    result = list(diff(distilled[0], distilled[1]))
    assert result == []
    
    # Produce a merged document
    earlier_ts, _ = order_timestamps([get_in(["add_date", "has_raw_value"], d) for d in to_change])
    merged = assoc_in(distilled[0], ["add_date", "has_raw_value"], earlier_ts)
    _, later_ts = order_timestamps([get_in(["mod_date", "has_raw_value"], d) for d in to_change])
    merged = assoc_in(merged, ["mod_date", "has_raw_value"], later_ts)
    merged = assoc_in(merged, ["id"], igsn_curie)
    merged = assoc_in(merged, ["identifier", "has_raw_value"], igsn_curie)
    
    merged_biosample_docs.append(merged)
    merged = None # defense against accidental reuse during next iteration.

assert len(merged_biosample_docs) == len(igsn_golds)

Delete old biosample objects and insert new ones in one bulk-write operation.

In [41]:
requests = [DeleteMany({"id": {"$in": ["gold:"+g for g in concat(igsn_golds.values())]}})]
requests.extend([InsertOne(d) for d in merged_biosample_docs])
result = db.biosample_set.bulk_write(requests)
result.deleted_count, result.inserted_count

(93, 48)

Update omics_processing_set references to biosample_set ids.

In [42]:
goldid_igsn = {}
for igsn, gids in igsn_golds.items():
    for gid in gids:
        goldid_igsn[gid] = igsn

In [43]:
requests = []
to_replace = {"gold:"+k: "igsn:"+v for k, v in goldid_igsn.items()}

for doc in db.omics_processing_set.find({"$or": [
    {"id": {"$in": list(to_replace)}},
    {"has_input": {"$in": list(to_replace)}},
    {"has_output": {"$in": list(to_replace)}},
    {"part_of": {"$in": list(to_replace)}}
]}):
    operations = {"$set": {
        "id": to_replace.get(doc["id"], doc["id"]),
        "has_input": [to_replace.get(i, i) for i in doc["has_input"]],
        "has_output": [to_replace.get(i, i) for i in doc["has_output"]],
        "part_of": [to_replace.get(i, i) for i in doc["part_of"]],
    }}
    requests.append({"filter": {"_id": doc["_id"]}, "update": operations})

In [44]:
rv = db.omics_processing_set.bulk_write([UpdateOne(**r) for r in requests])

In [45]:
rv.modified_count

93

Update omics_processing_set references from EMSL ids to IGSNs.

In [46]:
EMSL_IDS_IDX = 7
IGSN_IDX = 2

igsn_emsls = {}

emsl_ids_pattern = re.compile(r"\d+")

with open('../src/data/FICUS_Soil_Gs0135149_Brodie-12-23-2020_PS.xlsx - Brodie_Gs0135149_Soil_Metadata.csv') as f:
    reader = csv.reader(f)
    for row in reader:
        emsl_ids = row[EMSL_IDS_IDX]
        igsn = row[IGSN_IDX]
        ids = emsl_ids_pattern.findall(emsl_ids)
        # XXX some rows have emsl ids but no IGSN, so igsn.strip() check here
        if igsn.strip() and ids:
            igsn_emsls[igsn] = ids

In [47]:
emslid_igsn = {}
for igsn, eids in igsn_emsls.items():
    for eid in eids:
        emslid_igsn[eid] = igsn

In [48]:
n_with_emsl_id = db.omics_processing_set.count_documents(
    {"id": {"$in": ["emsl:"+i for i in emslid_igsn]}})

In [49]:
requests = []
to_replace = {"emsl:"+k: "igsn:"+v for k, v in emslid_igsn.items()}
to_replace.update({"emsl:output_"+k: "igsn:"+v for k, v in emslid_igsn.items()})

def omit(blacklist, d):
    return keyfilter(lambda k: k not in blacklist, d)

def sans_mongo_id(d):
    return omit(["_id"], d)

for doc in db.omics_processing_set.find({"$or": [
    {"id": {"$in": list(to_replace)}},
    {"has_input": {"$in": list(to_replace)}},
    {"has_output": {"$in": list(to_replace)}},
    {"part_of": {"$in": list(to_replace)}}
]}):
    new_id = to_replace.get(doc["id"], doc["id"])
    new_doc = sans_mongo_id(doc)
    operations = {}
    for field in ["has_input", "has_output", "part_of"]:
        if field in doc:
            operations = assoc_in(operations, ["$set", field], [to_replace.get(i, i) for i in doc[field]])
    if operations:
        new_doc = merge(new_doc, operations["$set"])
    if new_id != doc["id"]:
        new_doc = assoc_in(new_doc, ["id"], new_id)
        requests.append({"type": "deleteone", "filter": {"id": doc["id"]}})
    requests.append({"type": "replaceone", "filter": {"id": new_id}, "replacement": new_doc, "upsert": True})

In [50]:
pymongo_requests = []
for r in requests:
    t = r["type"]
    rest = omit(["type"], r)
    if t == "deleteone":
        pymongo_requests.append(DeleteOne(**rest))
    elif t == "replaceone":
        pymongo_requests.append(ReplaceOne(**rest))
        
rv = db.omics_processing_set.bulk_write(pymongo_requests, ordered=True)
assert n_with_emsl_id == rv.modified_count + rv.upserted_count

Note that some newly IGSN id'ed docs identify as part_of biosamples without IGSNs

In [51]:
db.omics_processing_set.find_one(
    {"id": {"$regex": "^igsn"}, "part_of": {"$regex": "^gold"}})

{'_id': ObjectId('600f1b0425261d62ada0d593'),
 'id': 'igsn:IEWFS0001',
 'name': 'EMSL_49991_Brodie_115_Lipids_POS_09Aug19_Lola-WCSH417820',
 'description': 'High res MS with high res HCD MSn and low res CID MSn',
 'part_of': ['gold:Gs0135149'],
 'has_output': ['igsn:IEWFS0001'],
 'omics_type': {'has_raw_value': 'Lipidomics'},
 'type': 'nmdc:OmicsProcessing',
 'instrument_name': {'has_raw_value': 'VOrbiETD04'},
 'processing_institution': {'has_raw_value': 'Environmental Molecular Sciences Lab'}}

In [52]:
"gold:Gs0135149" in goldid_igsn

False

In [54]:
db_share = client["dwinston_dev"]

for collection_name in tqdm(db.list_collection_names(), desc="all collections"):
    docs = [sans_mongo_id(doc) for doc in db[collection_name].find()]
    for doc in docs:
        if "lat_lon" in doc and "type" in doc["lat_lon"]:
            del doc["lat_lon"]["type"]
        for k in doc:
            if isinstance(doc[k], dict) and "term" in doc[k] and "id" in doc[k]["term"]:
                doc[k]["term"] = doc[k]["term"]["id"]
    validated = []
    for doc in tqdm(docs, desc="validating "+collection_name):
        validated.append(validate(doc, collection_name=collection_name, conform_doc=True))
    for doc in tqdm(validated, desc="adding "+collection_name):
        add_to_db(doc, db_share, collection_name)

all collections:   0%|          | 0/8 [00:00<?, ?it/s]

validating genome_feature_set: |          | 0/0 [00:00<?, ?it/s]

adding genome_feature_set: |          | 0/0 [00:00<?, ?it/s]

validating omics_processing_set:   0%|          | 0/6764 [00:00<?, ?it/s]

adding omics_processing_set:   0%|          | 0/6764 [00:00<?, ?it/s]

validating mags_activity_set:   0%|          | 0/114 [00:00<?, ?it/s]

adding mags_activity_set:   0%|          | 0/114 [00:00<?, ?it/s]

validating functional_annotation_set: |          | 0/0 [00:00<?, ?it/s]

adding functional_annotation_set: |          | 0/0 [00:00<?, ?it/s]

validating activity_set:   0%|          | 0/1033 [00:00<?, ?it/s]

adding activity_set:   0%|          | 0/1033 [00:00<?, ?it/s]

validating data_object_set:   0%|          | 0/10870 [00:00<?, ?it/s]

adding data_object_set:   0%|          | 0/10870 [00:00<?, ?it/s]

validating study_set:   0%|          | 0/13 [00:00<?, ?it/s]

adding study_set:   0%|          | 0/13 [00:00<?, ?it/s]

validating biosample_set:   0%|          | 0/919 [00:00<?, ?it/s]

adding biosample_set:   0%|          | 0/919 [00:00<?, ?it/s]