In [440]:
from linkml_runtime.linkml_model import ClassDefinition, EnumDefinition
from linkml_runtime.utils.schemaview import SchemaView
import pandas as pd
import random
import string
import yaml

In [441]:
# Filler values written into required slots that the source table cannot supply.
placeholder_string = 'XYZ'
placeholder_int = 999

# Input table, schema location, and the schema class the rows are mapped onto.
emp500_tsv_from_rdbms = "../assets/bp_PRJEB42019_biosamples.tsv"
schema_yaml = '../src/schema/nmdc.yaml'
target_class = 'Biosample'

# Prefix prepended to every generated biosample identifier.
biosample_id_prefix = "nmdc:bsm-99-"

# Destination of the generated YAML document.
output_yaml = '../assets/emp500_biosample_set.yaml'

In [442]:
# Load the tab-separated biosample export into a DataFrame.
emp500 = pd.read_csv(emp500_tsv_from_rdbms, sep='\t')

In [443]:
# Display (rows, columns) of the raw table.
emp500.shape

(1024, 547)

In [444]:
# Drop every column whose values are all missing; columns with at least one value are kept.
emp500_populated_cols = emp500.dropna(axis=1, how='all')

In [445]:
# Display (rows, columns) after removing the all-missing columns.
emp500_populated_cols.shape

(1024, 78)

In [446]:
# Names of the columns that survived the all-missing filter, as a plain list.
emp500_cols = list(emp500_populated_cols.columns)

In [447]:
# Sort the column names in place.
emp500_cols.sort()

In [448]:
# Copy the populated-column frame to the system clipboard for manual inspection.
# NOTE(review): this is an interactive side effect that needs a clipboard backend and will
# presumably fail on a headless kernel — consider removing it before sharing the notebook.
emp500_populated_cols.to_clipboard()

In [449]:
# Open the schema YAML with SchemaView.
# NOTE: `v` is read as a global by cast_to_range; several comprehensions in this notebook
# also use `v` as a loop variable, which is harmless only because comprehension variables
# are scoped to the comprehension.
v = SchemaView(schema_yaml)

In [450]:
# Induced definition of the target class; its `attributes` mapping is used below as the
# list of slots available on that class.
bs = v.induced_class(target_class)

In [451]:
# Names of every attribute on the induced class, as plain strings.
bs_slots = [str(slot_name) for slot_name in bs.attributes.keys()]

In [452]:
# Names of the attributes whose definition is flagged as required.
bs_required_slots = [
    str(slot_name)
    for slot_name, slot_definition in bs.attributes.items()
    if slot_definition.required
]

In [453]:
# Sort the slot names in place.
bs_slots.sort()

In [454]:
# Sort the required slot names in place.
bs_required_slots.sort()

In [455]:
def useful_list_operations(emp500_list, bs_all, bs_required):
    """Compare source column names against the slot names of the target class.

    Args:
        emp500_list: column names present in the source table.
        bs_all: all slot names of the target class.
        bs_required: the required slot names of the target class.

    Returns:
        A dict of three sorted, de-duplicated lists:
        ``emp500_biosample_intersection`` (names in both ``emp500_list`` and ``bs_all``),
        ``emp500_minus_biosample`` (names only in ``emp500_list``), and
        ``biosample_required_minus_emp500`` (required names absent from ``emp500_list``).
    """
    source_names = set(emp500_list)
    slot_names = set(bs_all)
    return {
        "emp500_biosample_intersection": sorted(source_names & slot_names),
        "emp500_minus_biosample": sorted(source_names - slot_names),
        "biosample_required_minus_emp500": sorted(set(bs_required) - source_names),
    }

In [456]:
# Compare the populated column names with all slots and the required slots of the class.
emp500_vs_biosample = useful_list_operations(emp500_cols, bs_slots, bs_required_slots)

In [457]:
# Keep only the source columns whose names are also slots of the target class.
emp500_intersection_frame = emp500[emp500_vs_biosample['emp500_biosample_intersection']]

In [458]:
# Append one (initially empty) column for every required slot that the source table lacks.
emp500_intersection_frame_plus = emp500_intersection_frame.reindex(
    columns=[
        *emp500_intersection_frame.columns,
        *emp500_vs_biosample['biosample_required_minus_emp500'],
    ]
)

In [459]:
# Preserve the original `id` values in `source_mat_id`; a later cell replaces `id` with
# generated identifiers.
emp500_intersection_frame_plus['source_mat_id'] = emp500_intersection_frame_plus['id']

In [460]:
def random_suffix(length=9):
    """Return a string of random uppercase ASCII letters.

    Args:
        length: number of letters to generate; defaults to 9.

    Returns:
        A string of ``length`` characters drawn (with replacement) from ``A``-``Z``.

    Note:
        Uses the ``random`` module's global generator, so the result is reproducible only
        if the caller seeds it, and distinct calls are not guaranteed to differ.
    """
    return ''.join(random.choices(string.ascii_uppercase, k=length))

In [461]:
# `source_mat_id` was already filled from `id` in an earlier cell. Repeating the assignment
# here was redundant, and re-running it after `id` has been replaced with generated
# identifiers would overwrite the preserved originals. Verify the invariant instead.
assert emp500_intersection_frame_plus['source_mat_id'].equals(emp500_intersection_frame_plus['id']), (
    "source_mat_id no longer matches id: the id column appears to have been regenerated "
    "already; restart the kernel and run the cells in order"
)

In [462]:
# Replace `id` with a generated identifier per row: the configured prefix followed by a
# random suffix.
emp500_intersection_frame_plus['id'] = [
    f"{biosample_id_prefix}{random_suffix()}"
    for _ in emp500_intersection_frame_plus.index
]

In [463]:

# Fill slots the source data cannot provide with the configured placeholder values.
emp500_intersection_frame_plus['depth'] = placeholder_int

# don't expect to get these from NCBI
for ecosystem_column in (
        'ecosystem',
        'ecosystem_category',
        'ecosystem_subtype',
        'ecosystem_type',
        'specific_ecosystem',
):
    emp500_intersection_frame_plus[ecosystem_column] = placeholder_string

# didn't realize these were required!!!
for required_column, filler_value in (
        ('growth_facil', placeholder_string),
        ('samp_store_temp', placeholder_int),
        ('store_cond', placeholder_string),
):
    emp500_intersection_frame_plus[required_column] = filler_value


In [464]:
# Copy the prepared frame (without the index) to the system clipboard for manual inspection.
# NOTE(review): this is an interactive side effect that needs a clipboard backend and will
# presumably fail on a headless kernel — consider removing it before sharing the notebook.
emp500_intersection_frame_plus.to_clipboard(index=False)

In [465]:
# One dict per row, keyed by column name.
dict_list = emp500_intersection_frame_plus.to_dict(orient='records')

In [466]:
def _is_missing(value):
    """Return True for an empty string or a scalar missing-value marker (NaN, None, NA, NaT).

    Missingness is decided from the value itself rather than from ``str(value)``, so a
    genuine string such as "nan" is treated as data and kept.
    """
    if isinstance(value, str):
        return value == ""
    # pd.isna returns an array for list-likes; only scalars are tested here.
    return pd.api.types.is_scalar(value) and bool(pd.isna(value))


# Drop missing cells from every record so absent values are omitted from the output.
dict_list_without_missings = [
    {field: cell for field, cell in record.items() if not _is_missing(cell)}
    for record in dict_list]


In [467]:
# Best-effort numeric coercion: every value that float() accepts is replaced by that float;
# values for which float() raises ValueError are left unchanged.
# NOTE(review): this runs before the slot ranges are looked up, so numeric-looking values
# destined for non-numeric slots are converted as well — confirm that is intended.
for record in dict_list_without_missings:
    for field in list(record):
        try:
            record[field] = float(record[field])
        except ValueError:
            continue

In [468]:
# Every key that occurs in at least one cleaned record.
keys_used = {
    field
    for record in dict_list_without_missings
    for field in record
}

In [469]:
# Map each key in use to the range declared on the matching class attribute.
ranges_used = {slot_name: bs.attributes[slot_name].range for slot_name in keys_used}

In [470]:
# Invert ranges_used: for every range, the list of keys that declare it.
keys_by_range = {}
for slot_name, slot_range in ranges_used.items():
    keys_by_range.setdefault(slot_range, []).append(slot_name)

In [471]:
def cast_to_range(val_in, key_name, ranges_used_dict):
    """Shape a raw cell value according to the schema range of its slot.

    The range name is looked up in ``ranges_used_dict`` and resolved through the
    module-level SchemaView ``v``.

    Args:
        val_in: the raw value taken from a record.
        key_name: the slot name the value belongs to.
        ranges_used_dict: mapping of slot name to range name.

    Returns:
        * ``{"has_raw_value": val_in}`` when the range is a class whose ``is_a`` is
          ``AttributeValue``, or is named ``ControlledTermValue``;
        * ``{"has_raw_value": val_in, "term": {"id": ...}}`` with a placeholder term id
          for ``ControlledIdentifiedTermValue``;
        * ``str(val_in)`` for ``string``;
        * ``float(val_in)`` for ``float``, or ``None`` when the value cannot be converted;
        * ``val_in`` unchanged for enum ranges, ``date`` and any other range.

    Raises:
        KeyError: if ``key_name`` is not present in ``ranges_used_dict``.
    """
    my_range = ranges_used_dict[key_name]
    range_obj = v.get_element(my_range)
    if isinstance(range_obj, ClassDefinition) and str(range_obj.is_a) == "AttributeValue":
        return {"has_raw_value": val_in}
    if isinstance(range_obj, EnumDefinition):
        # Previously this branch fell through and returned None, which wrote populated
        # enum slots as null; pass the value through like the fallback branch.
        return val_in
    if my_range == "string":
        return str(val_in)
    if my_range == "float":
        try:
            return float(val_in)
        except (ValueError, TypeError):
            return None
    if my_range == "ControlledIdentifiedTermValue":
        return {"has_raw_value": val_in, "term": {"id": f"{placeholder_string}:{placeholder_int}"}}
    if my_range == "ControlledTermValue":
        return {"has_raw_value": val_in}
    if my_range == "date":
        # Previously returned None implicitly; keep the value as-is.
        return val_in
    # Unknown ranges are passed through rather than rejected.
    return val_in


In [472]:
# Apply cast_to_range to every value of every cleaned record.
# The loop variables are deliberately not named `v`, which is the global SchemaView that
# cast_to_range reads.
casted = [
    {
        slot_name: cast_to_range(raw_value, slot_name, ranges_used)
        for slot_name, raw_value in record.items()
    }
    for record in dict_list_without_missings
]

In [473]:
# Wrap the list of records under the top-level `biosample_set` key.
outer_dict = {"biosample_set": casted}

In [474]:
# Serialise the wrapped records as YAML into the configured output file.
with open(output_yaml, "w") as yaml_out:
    yaml.dump(outer_dict, stream=yaml_out)