# Get Harmonization Training Data

**Goal:** Get mapping information from mutated synthetic data models to source models to serve as synthetic mapping/harmonization training data for an AI model.

We will use the same source of data as we did for v3 (the latest) of the synthetic data from objective 1. Using only ONE mutated version from here: https://uchicago.app.box.com/folder/318234520321?s=j0oq2pfhe59p3caqq3ss32kxqwciwbsd

Use only one of the sub-folders above. For example, extract and use only `mutated_sdc_v3_nmax4_nmin2_pmax75_pmin25_limit20_dmax1000_20250423.zip` and ignore the others.

Ensure the above is extracted into a `../data/Mutated_SDCs_v3_20250423/mutated_sdc_v3_nmax4_nmin2_pmax75_pmin25_limit20_dmax1000_20250423` folder.

We also need the required synthetic data dictionary information: https://uchicago.app.box.com/folder/318236025225

Extract the above in a `../data/SDMs_nodes6-15_props50-100_20250423` folder.


In [None]:
import os
import csv
import copy
import json
import shutil
import logging
import concurrent.futures


from harmonization.utils import (
    TEMP_DIR,
    get_gen3_json_schemas_and_templates,
)
from harmonization.simple_data_model import (
    get_data_model_as_node_prop_type_descriptions,
)


# Folder the synthetic data models (SDMs) were extracted into.
SDMS_FOLDER_PATH = "../data/SDMs_nodes6-15_props50-100_20250423/"

# Names of the Gen3 data dictionaries that have a "*_modified.json" schema.
_GEN3_MANUAL_DD_NAMES = (
    "aids.diseasedatahub.org",
    "bihstaging.data-commons.org",
    "caninedc.org",
    "chicagoland.pandemicresponsecommons.org",
    "chordshealth.org",
    "data.bloodpac.org",
    "data.midrc.org",
    "diseasedatahub.org",
    "flu.diseasedatahub.org",
    "gen3.biodatacatalyst.nhlbi.nih.gov",
    "gen3.datacommons.io",
    "genomel",
    "healdata.org",
    "hnc",
    "ibdgc",
    "icgc.bionimbus.org",
    "jcoin.datacommons.io",
    "kidsfirst",
    "microbiome",
    "mmrf",
    "nci-crdc.datacommons.io",
    "pdp",
    "portal.occ-data.org",
    "rerf",
    "tb.diseasedatahub.org",
    "toxdatacommons",
    "vpodc.data-commons.org",
)

# Data dictionary name -> path of its dictionary-based ("modified") JSON schema.
GEN3_MANUAL_DD_PATH = {
    dd_name: f"{SDMS_FOLDER_PATH}/input_schemas/{dd_name}__jsonschema_dd_modified.json"
    for dd_name in _GEN3_MANUAL_DD_NAMES
}

# Groups of node names that are treated as interchangeable.
_SYNONYM_GROUPS = (
    ("study", "dataset", "clinical_trial", "collection", "research"),
    ("subject", "patient", "case", "participant"),
    ("biospecimen", "specimen"),
    ("project",),  # this forces a search over other DDs later
    ("program",),  # this forces a search over other DDs later
)

# Node name -> every name in its synonym group (including itself).
SYNONYMOUS_NODES = {
    node_name: list(group) for group in _SYNONYM_GROUPS for node_name in group
}

# Both containers below start empty and are populated programmatically.
SYNONYMOUS_NODES_TO_GEN3_MANUAL_DD = {}
gen3_dd_schemas = {}

In [None]:
# Obtain mutated real data dicts: https://uchicago.app.box.com/folder/318234520321?s=j0oq2pfhe59p3caqq3ss32kxqwciwbsd
# and place as input_dir
input_dir = "../data/Mutated_SDCs_v3_20250423"

# Stats file that lives next to the SDM_<n>.json files; read by get_synth_model_to_original.
sdm_stats_filepath = os.path.join(SDMS_FOLDER_PATH, "_SDM_stats.json")

# delete this whole output directory and rerun to recreate
output_dir = "../datasets/harmonization_training_Mutated_SDCs_v3_20250423_v0.0.2"
# exist_ok avoids the check-then-create race and makes re-running this cell a no-op.
os.makedirs(output_dir, exist_ok=True)

In [None]:
# Load every Gen3 data dictionary exactly once (parsing the files is the expensive part).
for schema_name, schema_path in GEN3_MANUAL_DD_PATH.items():
    with open(schema_path, "r") as schema_file:
        gen3_dd_schemas[schema_name] = json.load(schema_file)

# Map each node name that has synonyms to the set of data dictionaries
# that define a node with that particular name.
for node in SYNONYMOUS_NODES:
    for schema_name in GEN3_MANUAL_DD_PATH:
        if node in gen3_dd_schemas[schema_name]:
            SYNONYMOUS_NODES_TO_GEN3_MANUAL_DD.setdefault(node, set()).add(
                schema_name
            )

### Convert original data model schema format to more efficient dictionary-based format

In [None]:
# NOTE: the second component must be relative. os.path.join() discards everything
# before an absolute component, so "/input_schemas/" would resolve to the filesystem
# root and the walk below would silently find nothing.
sdm_input_schemas_dir = os.path.join(SDMS_FOLDER_PATH, "input_schemas")

for root, _dirs, files in os.walk(sdm_input_schemas_dir):
    for file in files:
        # Only JSON schemas are converted; already-converted outputs are skipped.
        if not file.endswith(".json") or "_modified.json" in file:
            continue

        output_filename = os.path.join(
            root, file[: -len(".json")] + "_modified.json"
        )
        if os.path.exists(output_filename):
            continue

        with open(os.path.join(root, file)) as input_file:
            original_data = json.load(input_file)

        # Target layout:
        # {
        #     "{{node_name}}": {
        #         ...other node fields...,
        #         "properties": {"{{property_name}}": {...}}
        #     }
        # }
        final_output_structure = {}
        for node in original_data["nodes"]:
            new_node = copy.deepcopy(node)
            # "properties" is a list in the input; re-key it by property name.
            new_node["properties"] = {
                node_property["name"]: node_property
                for node_property in node["properties"]
            }
            final_output_structure[node["name"]] = new_node

        with open(output_filename, "w") as output_file:
            json.dump(final_output_structure, output_file)

In [None]:
def get_node_prop_desc_from_real_data_model(
    gen3_dd_schemas, gen3_domain, node_name, property_name
):
    """
    Build the "<node>.<property>: <description>" line for a Gen3 property.

    The description is looked up with get_desc_from_real_data_model. When that
    lookup yields None a warning is logged and the None is still formatted
    into the returned string.
    """
    lookup_args = (gen3_dd_schemas, gen3_domain, node_name, property_name)
    description = get_desc_from_real_data_model(*lookup_args)

    if description is None:
        logging.warning(
            f"could not get desc for property {property_name} in node {node_name} from {gen3_domain}"
        )

    return f"{node_name}.{property_name}: {description}"


def get_desc_from_real_data_model(
    gen3_dd_schemas, gen3_domain, node_name, property_name
):
    """
    Return the single-line description of a property in a Gen3 data dictionary.

    Returns None when the node or the property is missing (or the property
    definition is empty), and "" when the property has no "description".
    Tabs become four spaces and newlines become one space.
    Raises KeyError when gen3_domain is not a key of gen3_dd_schemas.
    """
    domain_schema = gen3_dd_schemas[gen3_domain]
    node_properties = domain_schema.get(node_name, {}).get("properties", {})
    property_definition = node_properties.get(property_name, {})

    if property_definition:
        raw_description = property_definition.get("description", "")
        return raw_description.replace("\t", "    ").replace("\n", " ")

    return None


def _load_synth_model_node_properties(synth_model_path):
    """Return {node_name: [property_dict, ...]} read from one SDM_<n>.json file."""
    with open(synth_model_path) as synth_model_file:
        synth_model = json.load(synth_model_file)

    node_to_properties = {}
    for node_info in synth_model["nodes"]:
        for node_property in node_info["properties"]:
            node_to_properties.setdefault(node_info["name"], []).append(
                node_property
            )
    return node_to_properties


def _iter_original_candidates(node_name, property_name, node_to_original_info):
    """
    Yield (gen3_domain, original_node_name, original_property_name) lookups
    that may be the origin of the synthetic `node_name.property_name`.
    """
    if node_name in SYNONYMOUS_NODES:
        # Synonymous nodes: try every alternate node name in every data
        # dictionary known to define a node with that name.
        for alternate_node_name in SYNONYMOUS_NODES[node_name]:
            # The node name is substituted inside the property name as well.
            alternate_property_name = property_name.replace(
                node_name, alternate_node_name
            )
            # sorted() only makes the order of the results reproducible.
            for gen3_domain in sorted(
                SYNONYMOUS_NODES_TO_GEN3_MANUAL_DD.get(alternate_node_name, [])
            ):
                yield gen3_domain, alternate_node_name, alternate_property_name
    elif node_name in node_to_original_info:
        # The SDM stats name the original data model of this node directly.
        yield node_to_original_info[node_name]["dm_name"], node_name, property_name
    else:
        # The node is missing from the SDM stats: fall back to the data model
        # of any node that lists it under "parent_nodes".
        for other_node_info in node_to_original_info.values():
            if node_name in other_node_info.get("parent_nodes", {}):
                yield other_node_info["dm_name"], node_name, property_name


def get_synth_model_to_original(sdm_stats_filepath):
    """
    Map every property of every synthetic data model back to the original
    Gen3 data dictionary properties it may have come from.

    Reads `sdm_stats_filepath` and the `SDM_<model_num>.json` files stored in
    the same folder, and relies on the module-level `gen3_dd_schemas`,
    `SYNONYMOUS_NODES` and `SYNONYMOUS_NODES_TO_GEN3_MANUAL_DD`.

    Returns a dict keyed by model number; each value has the keys:
      - "node_to_original_info": {node_name: node info from the SDM stats}
      - "synth_model_node_to_properties": {node_name: [property_dict, ...]}
      - "synth_model_prop_to_original_models":
            {"node.property": [{"gen3_domain": ...,
                                "original_node": ...,
                                "original_property": ...}, ...]}
        An entry is only recorded when the property really exists in the
        candidate data dictionary, and at most once per (domain, node).

    Raises KeyError when a referenced data model is not in `gen3_dd_schemas`.
    """
    with open(sdm_stats_filepath) as file:
        sdm_stats = json.load(file)

    sdm_folder = os.path.dirname(sdm_stats_filepath)
    synth_model_to_original = {}

    for model_num, model_info in sdm_stats["models"].items():
        node_to_original_info = dict(model_info["nodes"])

        # the actual nodes and props used in this synth model
        synth_model_node_to_properties = _load_synth_model_node_properties(
            os.path.join(sdm_folder, f"SDM_{model_num}.json")
        )

        synth_model_prop_to_original_models = {}
        for node_name, node_properties in synth_model_node_to_properties.items():
            for node_property_info in node_properties:
                property_name = node_property_info["name"]
                original_sources_already_seen_for_prop = set()

                candidates = _iter_original_candidates(
                    node_name, property_name, node_to_original_info
                )
                for gen3_domain, original_node_name, original_prop_name in candidates:
                    source_key = (gen3_domain, original_node_name)
                    if source_key in original_sources_already_seen_for_prop:
                        continue
                    original_sources_already_seen_for_prop.add(source_key)

                    gen3_node = gen3_dd_schemas[gen3_domain].get(
                        original_node_name, {}
                    )
                    gen3_node_prop = gen3_node.get("properties", {}).get(
                        original_prop_name, {}
                    )
                    if not gen3_node_prop:
                        continue

                    synth_model_prop_to_original_models.setdefault(
                        f"{node_name}.{property_name}", []
                    ).append(
                        {
                            "gen3_domain": gen3_domain,
                            "original_node": gen3_node,
                            "original_property": gen3_node_prop,
                        }
                    )

        synth_model_to_original[model_num] = {
            "node_to_original_info": node_to_original_info,
            "synth_model_node_to_properties": synth_model_node_to_properties,
            "synth_model_prop_to_original_models": synth_model_prop_to_original_models,
        }

    return synth_model_to_original

In [None]:
# This is slow: run it once, then rely on the persisted results
# (i.e. comment this out after running once)
synth_model_to_original = get_synth_model_to_original(sdm_stats_filepath)
print(f"Got {len(synth_model_to_original)} synthetic models...")

## Process all SDCs into Training Data using the Pre-Computed Info Above

This takes a _long_ time. If you leave the default settings, it'll skip recomputing folders that already exist in the output (i.e. you can stop and restart it many times and it'll basically continue where it left off).

You can adjust the number of workers `num_workers` below and you should set that as high as possible.

We're processing 10k synthetic data models, and for each synthetic data model we can have upwards of a dozen SDCs. And each SDC can create mappings for up to 27 of the original data models (e.g. if project contains properties that would map to any model, we generate expected mappings to that model).

In [None]:
def persist_synth_data_info_to_contribution_directory(
    directory_path, synth_model_to_original, force_recreation=False
):
    """
    Write `synth_model_to_original.json` into the parent folder of
    `directory_path`.

    The synthetic model number is parsed from the name of a
    `*__var_map.json` file found directly inside `directory_path`
    (e.g. `SDM_12__var_map.json` -> "12") and used as the key into
    `synth_model_to_original`.

    An existing file is kept unless `force_recreation` is true. Nothing is
    written when no var map is present. The file is written to a temporary
    name and then renamed into place, so tasks running in other threads
    never read a partially written file.

    Raises KeyError when the parsed model number is not in
    `synth_model_to_original`.
    """
    import threading  # local import: only used to build a unique temp-file name

    if not synth_model_to_original:
        logging.warning(
            "Trying to persist synth_model_to_original but nothing was provided"
        )
        return

    model_path = os.path.dirname(directory_path)
    synth_model_to_original_filepath = os.path.join(
        model_path, "synth_model_to_original.json"
    )

    if os.path.exists(synth_model_to_original_filepath) and not force_recreation:
        return

    with os.scandir(directory_path) as entries:
        var_map_names = sorted(
            entry.name for entry in entries if entry.name.endswith("__var_map.json")
        )

    if not var_map_names:
        logging.warning(
            f"No *__var_map.json file in {directory_path}, not writing {synth_model_to_original_filepath}"
        )
        if force_recreation:
            # Forced recreation must not leave a stale file behind.
            try:
                os.remove(synth_model_to_original_filepath)
            except FileNotFoundError:
                pass
        return

    # Parse the file name, not the full path: folder names may contain "__".
    synth_model_name = var_map_names[0].split("__")[0]
    synth_model_num = synth_model_name.split("_")[-1]
    specific_synth_model_to_original = synth_model_to_original[synth_model_num]

    temp_filepath = (
        f"{synth_model_to_original_filepath}"
        f".{os.getpid()}.{threading.get_ident()}.tmp"
    )
    try:
        with open(temp_filepath, "w") as output:
            json.dump(specific_synth_model_to_original, output)
        os.replace(temp_filepath, synth_model_to_original_filepath)
    finally:
        # Only still present when writing or renaming failed.
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)


def process_synth_data_contribution_directory(
    directory_path, output_dir, gen3_dd_schemas, force_recreation=False
):
    """
    Generate the training-data folder for one synthetic data contribution.

    `directory_path` is expected to be laid out as
    `<mutation>/<model>/<contribution>`; the output is written to
    `<output_dir>/<mutation>/<model>/<contribution>` by
    output_mappings_for_directory.

    The pre-computed mapping information is read from
    `synth_model_to_original.json` in the model folder (the parent of
    `directory_path`). When that file is not valid JSON an error is logged
    and the contribution is skipped.

    An existing output folder is left untouched unless `force_recreation` is
    true, in which case it is deleted and generated again.
    """
    model_path = os.path.dirname(directory_path)
    mutation_path = os.path.dirname(model_path)

    os.makedirs(output_dir, exist_ok=True)

    synth_model_to_original_filepath = os.path.join(
        model_path, "synth_model_to_original.json"
    )
    with open(synth_model_to_original_filepath) as input_file:
        try:
            specific_synth_model_to_original = json.load(input_file)
        except ValueError as exc:  # json.JSONDecodeError is a ValueError
            logging.error(
                f"Could not get specific_synth_model_to_original from: {synth_model_to_original_filepath}. Skipping... ({exc})"
            )
            return

    output_dir_updated = os.path.join(
        output_dir,
        os.path.basename(mutation_path),
        os.path.basename(model_path),
        os.path.basename(directory_path),
    )

    if force_recreation and os.path.exists(output_dir_updated):
        # The output is a directory tree, which os.remove() cannot delete.
        shutil.rmtree(output_dir_updated)

    if os.path.exists(output_dir_updated):
        # already generated, nothing to do
        return

    output_mappings_for_directory(
        gen3_dd_schemas=gen3_dd_schemas,
        input_dir=directory_path,
        output_dir=output_dir_updated,
        specific_synth_model_to_original=specific_synth_model_to_original,
    )


def _collect_tsv_lines_per_original_model(
    var_map_data, synth_model_prop_to_original_models, gen3_dd_schemas
):
    """Return {gen3_domain: [tsv_line, ...]} for the properties of one var map."""
    # The var map data comes in 3 forms: node, node.property and
    # node."foreign_key_node.foreign_key_property". Keys without a "." are
    # nodes; they give the renamed (harmonized) node names.
    original_node_to_harmonized_node = {
        original_key: value.get("new_name", "")
        for original_key, value in var_map_data.items()
        if "." not in original_key
    }

    tsv_lines_per_original_model = {}
    for original_key, value in var_map_data.items():
        if "." not in original_key:
            continue

        harmonized_node = original_node_to_harmonized_node[original_key.split(".")[0]]
        new_name = value.get("new_name", "")
        # NOTE(review): unlike the Gen3 description, tabs/newlines are not
        # replaced here; one inside new_description would break the TSV row - confirm.
        new_description = value.get("new_description", "").strip()
        ai_model_node_prop_desc = f"{harmonized_node}.{new_name}: {new_description}"

        # get gen3_domain (e.g. original model) for this particular node prop
        # by using previously constructed mapping information
        original_node_props = synth_model_prop_to_original_models.get(
            original_key, []
        )
        for original_node_prop in original_node_props:
            gen3_domain = original_node_prop["gen3_domain"]
            original_node = original_node_prop["original_node"]["name"]
            original_property = original_node_prop["original_property"].get(
                "name", ""
            )

            # Extract descriptions from Gen3 dictionary domain
            gen3_description = (
                get_desc_from_real_data_model(
                    gen3_dd_schemas, gen3_domain, original_node, original_property
                )
                or ""
            )
            harmonized_model_node_prop_desc = (
                f"{original_node}.{original_property}: {gen3_description}"
            )
            tsv_lines_per_original_model.setdefault(gen3_domain, []).append(
                f"{ai_model_node_prop_desc}\t{harmonized_model_node_prop_desc}"
            )

    return tsv_lines_per_original_model


def _write_expected_mappings(mirrored_path, tsv_lines_per_original_model):
    """Write <mirrored_path>/<gen3_domain>/expected_mappings.tsv for every domain."""
    tsv_lines_header = "ai_model_node_prop_desc\tharmonized_model_node_prop_desc\n"

    if not tsv_lines_per_original_model:
        logging.warning(f"No mappings for {mirrored_path}")
        return

    for gen3_domain, tsv_lines in tsv_lines_per_original_model.items():
        domain_path = os.path.join(mirrored_path, gen3_domain)
        os.makedirs(domain_path, exist_ok=True)
        with open(os.path.join(domain_path, "expected_mappings.tsv"), "w") as tsv_file:
            tsv_file.write(tsv_lines_header)
            # De-duplicate; sorting keeps the output identical between runs.
            tsv_file.write("\n".join(sorted(set(tsv_lines))))


def output_mappings_for_directory(
    gen3_dd_schemas, input_dir, output_dir, specific_synth_model_to_original
):
    """
    Mirror the folder structure of `input_dir` under `output_dir` and write
    the training files derived from it.

    For every `*__var_map.json` file:
      - one `<gen3_domain>/expected_mappings.tsv` per original data model,
        with a header row followed by
        "<ai node>.<ai prop>: <desc>" TAB "<gen3 node>.<gen3 prop>: <desc>" rows.

    For every file whose name contains `__jsonschema_dd__` and ends with
    `__jsonschema_dd.json`:
      - `harmonized_data_model.json`: the Gen3 data dictionary named by the
        part of the file name before the first "__"
      - a copy of the file itself, renamed to `...__ai_model_output.json`

    `specific_synth_model_to_original` must provide the
    "synth_model_prop_to_original_models" mapping.
    """
    for root, _dirs, files in os.walk(input_dir):
        relative_path = os.path.relpath(root, input_dir)
        mirrored_path = os.path.join(output_dir, relative_path)
        os.makedirs(mirrored_path, exist_ok=True)

        for file_name in files:
            source_path = os.path.join(root, file_name)

            if file_name.endswith("__var_map.json"):
                # Transform var_map.json to expected_mappings.tsv
                with open(source_path, "r") as var_map_file:
                    var_map_data = json.load(var_map_file)

                # nodes can come from multiple different original models
                tsv_lines_per_original_model = _collect_tsv_lines_per_original_model(
                    var_map_data,
                    specific_synth_model_to_original[
                        "synth_model_prop_to_original_models"
                    ],
                    gen3_dd_schemas,
                )
                _write_expected_mappings(mirrored_path, tsv_lines_per_original_model)

            elif "__jsonschema_dd__" in file_name and file_name.endswith(
                "__jsonschema_dd.json"
            ):
                gen3_domain = file_name.split("__")[0]

                output_harmonized_model_path = os.path.join(
                    mirrored_path, "harmonized_data_model.json"
                )
                with open(output_harmonized_model_path, "w") as dd_file:
                    json.dump(gen3_dd_schemas[gen3_domain], dd_file)

                # Process files, rename jsonschema to ai_model_output
                new_name = file_name.replace("__jsonschema_dd__", "__").replace(
                    "__jsonschema_dd.json", "__ai_model_output.json"
                )
                shutil.copy(source_path, os.path.join(mirrored_path, new_name))

In [None]:
def output_training_data_for_directory(
    input_dir,
    output_dir,
    synth_model_to_original,
    force_recreation_of_synth_data_info=False,
    force_recreation_of_synth_data_contributions=False,
    num_workers=8,
):
    """
    Process every data contribution folder below `input_dir` in a thread pool.

    Folders are discovered three levels deep:
    `mutated_*` / `*_tsvs` / <data contribution folder>.
    Each contribution folder becomes one _process_single_data_contribution_dir
    task. A failing task is logged and does not stop the remaining ones.
    """

    def contribution_dirs():
        # Walk the three folder levels, yielding only contribution folders.
        for entry in os.scandir(input_dir):
            if not (entry.is_dir() and entry.name.startswith("mutated_")):
                continue
            logging.warning(f"processing: {entry.path}")

            for data_model_dir in os.scandir(entry):
                if not (
                    data_model_dir.is_dir() and data_model_dir.name.endswith("_tsvs")
                ):
                    continue
                logging.debug(
                    f"processing data model directory: {data_model_dir.path}"
                )

                for data_contribution_dir in os.scandir(data_model_dir):
                    if not data_contribution_dir.is_dir():
                        continue
                    logging.debug(
                        f"queueing data contribution directory: {os.path.basename(data_contribution_dir.path)}"
                    )
                    yield data_contribution_dir

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
        # submit each contribution dir to the pool
        pending = [
            executor.submit(
                _process_single_data_contribution_dir,
                contribution.path,
                output_dir,
                synth_model_to_original,
                force_recreation_of_synth_data_info,
                force_recreation_of_synth_data_contributions,
            )
            for contribution in contribution_dirs()
        ]

        # wait for all to finish
        for finished in concurrent.futures.as_completed(pending):
            try:
                finished.result()
            except Exception as exc:
                logging.error(
                    f"[THREAD ERROR] Data contribution dir task failed: {exc}"
                )


def _process_single_data_contribution_dir(
    data_contribution_path,
    output_dir,
    synth_model_to_original,
    force_recreation_of_synth_data_info,
    force_recreation_of_synth_data_contributions,
):
    """
    Thread-pool task for a single data contribution folder.

    First persists the synthetic model info for the folder, then generates
    its training data using the module-level `gen3_dd_schemas`.
    """
    persist_synth_data_info_to_contribution_directory(
        directory_path=data_contribution_path,
        synth_model_to_original=synth_model_to_original,
        force_recreation=force_recreation_of_synth_data_info,
    )
    process_synth_data_contribution_directory(
        directory_path=data_contribution_path,
        output_dir=output_dir,
        gen3_dd_schemas=gen3_dd_schemas,
        force_recreation=force_recreation_of_synth_data_contributions,
    )

# Generate the training data only when nothing has been generated yet.
# An earlier cell creates `output_dir` up front, so testing for existence alone
# would always skip the generation; an empty folder counts as "not generated".
# IMPORTANT: Remove this condition if you want to continue generating from a previously failed or incomplete output
if not os.path.isdir(output_dir) or not os.listdir(output_dir):
    output_training_data_for_directory(input_dir, output_dir, synth_model_to_original)

At this point, the output_dir contains a raw dump of potentially relevant training data organized into folders. We still need to do some cleanup and expanding of info to get a final, single JSONL file for training.

## Use Folder-Based Data Generated from Above to Construct AI-ready Training Data in JSONL Files

In [None]:
# Folder holding the folder-based training data to clean up.
training_files_dir = "../datasets/harmonization_training_Mutated_SDCs_v3_20250423_v0.0.2"

In [None]:
def _iter_subdirectories(parent_path):
    """Yield the full path of every direct sub-directory of `parent_path`."""
    # os.listdir returns a snapshot, so callers may delete entries while iterating
    for name in os.listdir(parent_path):
        child_path = os.path.join(parent_path, name)
        if os.path.isdir(child_path):
            yield child_path


def remove_folders_without_mapping_info(root_folder):
    """
    Delete target-model folders that carry no usable mapping information.

    Walks the four directory levels below `root_folder` (mutated data ->
    synthetic data model -> data contribution -> target model) and removes
    every target-model folder whose `expected_mappings.tsv` is either missing
    or has at most one line (nothing beyond a header).

    Only the offending target-model folder is removed, which is the folder
    named in the printed message. Sibling target-model folders of the same
    data contribution that do have mappings are left untouched.

    Args:
        root_folder: path of the directory tree to clean up in place.
    """
    for mutated_dd_path in _iter_subdirectories(root_folder):
        for original_dd_path in _iter_subdirectories(mutated_dd_path):
            for prop_folder_path in _iter_subdirectories(original_dd_path):
                for gen3_dd_path in _iter_subdirectories(prop_folder_path):
                    mapping_path = os.path.join(gen3_dd_path, "expected_mappings.tsv")

                    removal_reason = None
                    if not os.path.exists(mapping_path):
                        removal_reason = "no expected_mappings.tsv"
                    else:
                        with open(mapping_path) as mapping_file:
                            mapping_file.readline()
                            # no second line means header-only (or empty) file
                            if not mapping_file.readline():
                                removal_reason = "no content in expected_mappings.tsv"

                    # delete only after the file handle is closed
                    if removal_reason:
                        print(f"Removing folder ({removal_reason}): {gen3_dd_path}")
                        shutil.rmtree(gen3_dd_path)

In [None]:
# You only need to run this once - it will take a while when you run it.
# It's synchronously checking validity of everything generated above and removing anything non-conformant
remove_folders_without_mapping_info(training_files_dir)

Format of output:

- harmonization_training_Mutated_SDCs_v3_20250423_v0.0.2
  - mutated_synthetic_data_with_specific_parameters
    - SDM_synthetic_data_model_0
      - synthetic_data_contribution_0
        - original_real_source_data_model_0
          - expected_mappings.tsv
        - original_real_source_data_model_1
          - expected_mappings.tsv
        - ...
      - ...
    - ...

`expected_mappings.tsv` has the following columns: 

- `ai_model_node_prop_desc`
    - The AI Model generated node property (e.g. the property we want to map/harmonize to a target model)
- `harmonized_model_node_prop_desc`
    - The source of truth target model node property (e.g. the property from the target model we know the above should map to)

and the target model itself is identified by the `original_real_source_data_model` folder that the expected mappings are in.

#### How do we know the proper mapping?

We took mutated synthetic data from our first algorithm and traced backwards.

This is how we got the mutated data in the first place:

27 Real Gen3 Data Models -> 10k Synthetic Data Models -> Synthetic Data Contributions -> Mutated Synthetic Data Contributions

We were able to trace from the mutations, back to the synthetic model, and from the synthetic model we are able to determine
the set of possible mappings for that particular property back to the real data models. 

> Key point: A single mutated property could potentially map back to `n` original real data models, so we collect every option

## Construct Single Benchmark Test File Incrementally

Each test should include a source model that we want to harmonize to a target model. The known mapping / harmonization comes from `expected_mappings.tsv`, which gets dumped as a string into the `harmonized_mapping` column.

Now let's create a JSONL file with a test per row.

Each record in the JSONL file has 4 keys: `input_source_model`, `synthetic_data_contribution_dir`, `input_target_model`, `harmonized_mapping`


In [None]:
# default to getting training for all available target models
target_models_to_get_training_data_for = list(GEN3_MANUAL_DD_PATH.keys())
# Note: if you want to override to specific targets, the options below should be a subset of the names from GEN3_MANUAL_DD_PATH
# ex: target_models_to_get_training_data_for = ["gen3.biodatacatalyst.nhlbi.nih.gov", "data.midrc.org"]
# target_models_to_get_training_data_for = [
#     "gen3.biodatacatalyst.nhlbi.nih.gov",
#     # "data.midrc.org",
# ]

# Specify nodes to skip from ALL target models, e.g. no mapping data will be provided from these nodes in the final training data.
# This is intended to allow removing excessive training data for "standard" and required Gen3-specific nodes
# that exist in every model (e.g. make this more applicable beyond Gen3 models, hopefully a better generalized set of training,
# less percentage of training for what is "standard" nodes)
global_target_model_nodes_to_skip = {"program", "project"}

# Specify property endings to skip from ALL target models, e.g. no mapping data will be provided if the
# source or target property name ends with (.endswith) something in this list.
# The default is to skip simple ID properties (since we're more interested in training on properties with more complex data)
property_endings_to_skip = [".id"]

target_model_nodes_to_skip = {
    # GEN3_MANUAL_DD_PATH key : set of nodes to REMOVE from the training PER specific model
    # ex: "gen3.biodatacatalyst.nhlbi.nih.gov": {"foobar", "fizzbuzz"}
}

In [None]:
import concurrent.futures
import os
import logging
import csv


def _process_single_sdm_dir(
    entry,
    synth_model_to_original,
    output_filepath,
    target_models_to_get_training_data_for,
    global_target_model_nodes_to_skip,
    target_model_nodes_to_skip,
    property_endings_to_skip,
):
    for synthetic_data_contribution_dir in os.scandir(entry.path):
        if synthetic_data_contribution_dir.is_dir():
            # print(
            #     f"processing synthetic_data_contribution_dir: {synthetic_data_contribution_dir.path}"
            # )
            sdm_records = []

            for target_model_dir in os.scandir(synthetic_data_contribution_dir):
                target_model = os.path.basename(target_model_dir)
                if (
                    target_model_dir.is_dir()
                    and target_model in target_models_to_get_training_data_for
                ):
                    target_model_name = os.path.basename(target_model_dir.path)
                    # print(f"processing target model mappings: {target_model_name}")

                    with open(os.path.join(target_model_dir.path, "expected_mappings.tsv")) as expected_mappings_file:
                        reader = csv.DictReader(
                            expected_mappings_file,
                            fieldnames=["ai_model_node_prop_desc", "harmonized_model_node_prop_desc"],
                            delimiter="\t",
                        )
                        rows = list(reader)

                        # TODO should we filter out properties too, like ones that are just .id?

                        filtered_rows = []
                        for row in rows:
                            if not row:
                                continue

                            if "harmonized_model_node_prop_desc" not in row:
                                continue

                            if not row["harmonized_model_node_prop_desc"]:
                                continue

                            node_name = (
                                row["harmonized_model_node_prop_desc"]
                                .split(".")[0]
                                .strip()
                            )
                            source_property_name = (
                                row["ai_model_node_prop_desc"].split(":")[0].strip()
                            )
                            target_property_name = (
                                row["harmonized_model_node_prop_desc"]
                                .split(":")[0]
                                .strip()
                            )

                            found_ending_to_skip = False
                            for property_ending in property_endings_to_skip:
                                if target_property_name.endswith(
                                    property_ending.strip()
                                ) or source_property_name.endswith(
                                    property_ending.strip()
                                ):
                                    found_ending_to_skip = True
                                    break

                            if found_ending_to_skip:
                                continue

                            if not source_property_name.split(".")[-1].strip():
                                # print(f"skipping row b/c source property appears to be empty")
                                continue

                            if not target_property_name.split(".")[-1].strip():
                                # print(f"skipping row b/c target property appears to be empty")
                                continue

                            if node_name in global_target_model_nodes_to_skip:
                                # print(f"skipping row b/c node_name {node_name} is in global_target_model_nodes_to_skip")
                                continue

                            if node_name in target_model_nodes_to_skip.get(
                                target_model_dir, {}
                            ):
                                continue

                            filtered_rows.append(row)

                        # create jsonl record
                        sdm_name = os.path.basename(entry.path)

                        if filtered_rows:
                            header_line = "ai_model_node_prop_desc\tharmonized_model_node_prop_desc"
                            tsv_lines = []
                            for row in filtered_rows:
                                line = f"{row['ai_model_node_prop_desc']}\t{row['harmonized_model_node_prop_desc']}"

                                # unsure how this happens, but if we get headers again, skip them
                                if line.strip() == header_line:
                                    continue

                                tsv_lines.append(line)

                            harmonized_mapping = (
                                header_line
                                + "\n"
                                + "\n".join(
                                    [
                                        line
                                        for line in list(set(tsv_lines))
                                        if "harmonized_model_node_prop_desc"
                                        not in line
                                    ]
                                )
                            )

                            # unsure where this duplication of headers is happening, but fix it here
                            harmonized_mapping.replace(
                                "{header_line}\n{header_line}\n",
                                "{header_line}\n",
                            )
                            harmonized_mapping = harmonized_mapping.strip()

                            if len(harmonized_mapping.split("\n")) <= 2:
                                # print(
                                #     f"skipping record b/c harmonized_mapping is empty"
                                # )
                                continue

                            record = {
                                "input_source_model": sdm_name,
                                "synthetic_data_contribution_dir": synthetic_data_contribution_dir.path,
                                "input_target_model": target_model_name,
                                "harmonized_mapping": harmonized_mapping,
                            }
                            sdm_records.append(record)

            # Write to JSONL file
            # print(f"Test count: {len(sdm_records)}")
            with open(output_filepath, "w", newline="") as output_file:
                for record in sdm_records:
                    output_file.write(json.dumps(record) + "\n")


def merge_jsonl_files_into_one(output_dir, all_files):
    """
    Concatenate JSONL files into `<output_dir>/training_data.jsonl` and delete them.

    Files are appended in the order given. Paths listed more than once are
    merged and removed only once, and paths that do not exist (e.g. a worker
    that produced no output file) are skipped instead of aborting the merge.

    Args:
        output_dir: directory in which `training_data.jsonl` is (over)written.
        all_files: iterable of paths of the JSONL files to merge and remove.
    """
    merged_path = os.path.join(output_dir, "training_data.jsonl")

    # de-duplicate while keeping the caller's order
    files_to_merge = [
        filename for filename in dict.fromkeys(all_files) if os.path.exists(filename)
    ]

    with open(merged_path, "w", newline="") as outfile:
        for filename in files_to_merge:
            with open(filename, "r") as infile:
                outfile.writelines(infile)

    # inputs are only removed once the merged file has been fully written
    for filename in files_to_merge:
        os.remove(filename)


def output_training_data_from_dataset_directory(
    input_dir,
    output_dir,
    synth_model_to_original,
    target_models_to_get_training_data_for,
    global_target_model_nodes_to_skip,
    target_model_nodes_to_skip,
    property_endings_to_skip,
    num_workers=32,
):
    """
    Build `training_data.jsonl` in `output_dir` from a folder-based dataset.

    Scans `input_dir` for sub-directories whose name starts with `mutated_`,
    and inside each for sub-directories whose name ends with `_tsvs`. Each of
    those is handed to `_process_single_sdm_dir` on a thread pool, writing to
    its own temporary JSONL file. Once every task has finished, the temporary
    files are merged via `merge_jsonl_files_into_one`.

    Args:
        input_dir: root directory of the folder-based dataset.
        output_dir: directory for the temporary and merged JSONL files;
            created when missing.
        synth_model_to_original: forwarded to `_process_single_sdm_dir`.
        target_models_to_get_training_data_for: forwarded to `_process_single_sdm_dir`.
        global_target_model_nodes_to_skip: forwarded to `_process_single_sdm_dir`.
        target_model_nodes_to_skip: forwarded to `_process_single_sdm_dir`.
        property_endings_to_skip: forwarded to `_process_single_sdm_dir`.
        num_workers: maximum number of worker threads.

    Raises:
        Exception: the first worker failure is printed and re-raised; in that
            case the temporary files are not merged.
    """
    os.makedirs(output_dir, exist_ok=True)

    temp_files = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = []
        print(f"processing input_dir: {input_dir}")
        # sorted so the merged file's record order is reproducible
        for mutated_dir in sorted(os.scandir(input_dir), key=lambda e: e.name):
            if not (mutated_dir.is_dir() and mutated_dir.name.startswith("mutated_")):
                continue

            for sdm_dir in sorted(os.scandir(mutated_dir.path), key=lambda e: e.name):
                if not (sdm_dir.is_dir() and sdm_dir.name.endswith("_tsvs")):
                    continue

                # the running index keeps temp names unique even when two
                # mutated_* folders contain an SDM folder of the same name
                output_filepath = os.path.join(
                    output_dir,
                    f"temp_{len(temp_files):06d}_{sdm_dir.name}.jsonl",
                )
                temp_files.append(output_filepath)
                futures.append(
                    executor.submit(
                        _process_single_sdm_dir,
                        sdm_dir,
                        synth_model_to_original,
                        output_filepath,
                        target_models_to_get_training_data_for,
                        global_target_model_nodes_to_skip,
                        target_model_nodes_to_skip,
                        property_endings_to_skip,
                    )
                )

        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as exc:
                print(f"Error in parallel worker: {exc}")
                raise

    merge_jsonl_files_into_one(output_dir, temp_files)

# Build the intermediate JSONL in a `_training_data` sub-folder of the dataset
# directory, using the target-model and skip settings configured above.
output_training_data_from_dataset_directory(
    input_dir=training_files_dir,
    output_dir=os.path.join(training_files_dir, "_training_data"),
    synth_model_to_original=synth_model_to_original,
    target_models_to_get_training_data_for=target_models_to_get_training_data_for,
    global_target_model_nodes_to_skip=global_target_model_nodes_to_skip,
    target_model_nodes_to_skip=target_model_nodes_to_skip,
    property_endings_to_skip=property_endings_to_skip,
)

### Convert from JSONL to CSV for easier viewing

Since most editors have a hard time easily viewing and filtering on JSONL data, create a denormalized CSV with columns:

```
    "input_source_model",
    "input_target_model",
    "ai_model_node_prop_desc",
    "harmonized_model_node_prop_desc",
```

The last two are extracted from the `harmonized_mapping` from the JSONL and a row is created *per property to property mapping*.

In [None]:
def harmonization_training_data_jsonl_to_csv(jsonl_file, csv_file, input_headers=None):
    """
    Converts a JSONL file to a CSV file.

    Headers must include: `harmonized_mapping`

    This denormalizes the harmonized mapping so each property mapped is its own row.
    The first line of every `harmonized_mapping` is treated as its header and
    skipped. Each following line is split at its first tab into
    `ai_model_node_prop_desc` and `harmonized_model_node_prop_desc`.

    Args:
        jsonl_file: path of the JSONL file to read.
        csv_file: path of the CSV file to (over)write.
        input_headers: keys to read from every JSONL record; defaults to
            `input_source_model`, `input_target_model`, `harmonized_mapping`.
            The list passed in is not modified.

    Raises:
        Exception: `harmonized_mapping` is not part of `input_headers`.
        KeyError: a record with mapping lines lacks one of the requested keys.

    Lines that are not valid JSON are reported and skipped. Mapping lines
    without a tab are skipped (blank ones silently, others with a message).
    """
    headers = list(
        input_headers
        or [
            "input_source_model",
            "input_target_model",
            "harmonized_mapping",
        ]
    )

    if "harmonized_mapping" not in headers:
        raise Exception("Headers must include: `harmonized_mapping`")

    # work on a filtered copy so the caller's list is left untouched
    passthrough_headers = [
        header for header in headers if header != "harmonized_mapping"
    ]
    output_headers = passthrough_headers + [
        "ai_model_node_prop_desc",
        "harmonized_model_node_prop_desc",
    ]

    with open(jsonl_file, "r") as f_in, open(csv_file, "w", newline="") as f_out:
        writer = csv.writer(f_out)
        writer.writerow(output_headers)

        for line in f_in:
            try:
                data = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"Skipping invalid JSON line: {line.strip()} - {e}")
                continue

            for mapping_line in data["harmonized_mapping"].split("\n")[1:]:
                (
                    ai_model_node_prop_desc,
                    separator,
                    harmonized_model_node_prop_desc,
                ) = mapping_line.partition("\t")

                if not separator:
                    if mapping_line.strip():
                        print(f"Skipping malformed mapping line: {mapping_line}")
                    continue

                row = [data[header] for header in passthrough_headers]
                row += [
                    ai_model_node_prop_desc,
                    harmonized_model_node_prop_desc,
                ]
                writer.writerow(row)


In [None]:
# Denormalize the intermediate JSONL (model names only) into a CSV with one
# row per property-to-property mapping.
harmonization_training_data_jsonl_to_csv(
    "../datasets/harmonization_training_Mutated_SDCs_v3_20250423_v0.0.2/_training_data/training_data.jsonl",
    "../datasets/harmonization_training_Mutated_SDCs_v3_20250423_v0.0.2/_training_data/training_data.csv",
)

Now you can open and view `../datasets/harmonization_training_Mutated_SDCs_v3_20250423_v0.0.2/_training_data/training_data.csv`.

### Final Conversion to Full Training Data

The above gets you an intermediate .jsonl (and CSV) without the source or target models expanded to their full JSON data (for size/efficiency and debugging). In other words, we just have names for the target models and source models right now but we want the full JSON of the models in the final training data.

Now we need to expand the source/target model names into their full descriptions. This is a bit more involved, but can be done with a few steps:
1. **Load the JSONL file**: Read the JSONL file from above.
2. **Expand Target Model Names**: Map model names to their full structures for target Gen3 data models
3. **Expand Source Model Names**: Map model names to their full structures for synthetic data models

In [None]:
# Input: intermediate JSONL that refers to source/target models by name only
INTERMEDIATE_JSONL_FILE = "../datasets/harmonization_training_Mutated_SDCs_v3_20250423_v0.0.2/_training_data/training_data.jsonl"
# Output: JSONL with the source/target models expanded to their full JSON
OUTPUT_JSONL_FILE = "../datasets/harmonization_training_Mutated_SDCs_v3_20250423_v0.0.2/_training_data/final_training_data.jsonl"

# Folder searched for each source model's `*__jsonschema_dd.json` file
# note: this only allows 1 source right now, so if you ran the above with multiple folders this may not work
SOURCE_MODELS_ROOT_DIR = "../data/Mutated_SDCs_v3_20250423/mutated_sdc_v3_nmax4_nmin2_pmax75_pmin25_limit20_dmax1000_20250423"


def expand_model_names(input_file, output_file):
    """
    Expand the model names of an intermediate JSONL into full model JSON.

    For every record of `input_file`, a record with these keys is written to
    `output_file`: `input_target_model_name`, `input_target_model`,
    `input_source_model_name`, `input_source_model`, `harmonized_mapping`.

    The target model is loaded from the path registered for its name in
    `GEN3_MANUAL_DD_PATH`, with `_modified` removed from that path. The source
    model is loaded from the `*__jsonschema_dd.json` file found under
    `SOURCE_MODELS_ROOT_DIR/<source model name>/<contribution dir name>`.
    Records whose target or source model cannot be found are reported and
    skipped.

    Each target model is parsed once and reused. The source model is reused
    while consecutive records point at the same directory, which avoids
    re-parsing without keeping every source model in memory.

    Args:
        input_file: path of the intermediate JSONL file to read.
        output_file: path of the expanded JSONL file to (over)write.

    Raises:
        FileNotFoundError: the source model directory of a record does not exist.
    """
    target_models_by_name = {}
    loaded_sdm_dir = None
    loaded_sdm_data = None

    with open(input_file, "r") as records_in, open(output_file, "w") as records_out:
        for line in records_in:
            data = json.loads(line)

            # Expand Target Model Names
            target_model_name = data["input_target_model"]
            if target_model_name not in target_models_by_name:
                target_model_path = GEN3_MANUAL_DD_PATH.get(
                    target_model_name, ""
                ).replace("_modified", "")

                if not os.path.exists(target_model_path):
                    print(f"No JSON file found for target model: {target_model_name}")
                    continue

                with open(target_model_path, "r") as f:
                    target_models_by_name[target_model_name] = json.load(f)

            # Expand Source Model Names
            source_model_name = data["input_source_model"]
            contribution_dir_name = os.path.basename(
                data["synthetic_data_contribution_dir"]
            )
            sdm_file_dir = (
                f"{SOURCE_MODELS_ROOT_DIR}/{source_model_name}/{contribution_dir_name}"
            )

            if sdm_file_dir != loaded_sdm_dir:
                loaded_sdm_data = None
                # sorted so that, should several schema files exist, the one
                # that wins (the last) does not depend on listing order
                for entry in sorted(os.scandir(sdm_file_dir), key=lambda e: e.name):
                    if entry.is_file() and entry.name.endswith("__jsonschema_dd.json"):
                        with open(entry.path, "r") as f:
                            loaded_sdm_data = json.load(f)
                loaded_sdm_dir = sdm_file_dir

            if not loaded_sdm_data:
                print(
                    f"No JSON schema found for source model: {source_model_name} "
                    f"(looked in {sdm_file_dir})"
                )
                continue

            # Replace names with full structures
            data_to_write = {
                "input_target_model_name": target_model_name,
                "input_target_model": target_models_by_name[target_model_name],
                "input_source_model_name": source_model_name,
                "input_source_model": loaded_sdm_data,
                "harmonized_mapping": data["harmonized_mapping"],
            }

            records_out.write(json.dumps(data_to_write) + "\n")

# Write the fully expanded training data to OUTPUT_JSONL_FILE
expand_model_names(INTERMEDIATE_JSONL_FILE, OUTPUT_JSONL_FILE)

We can also denormalize the output so there's a single source property to target property mapping per row with entire source and target models. Note that this file will be very large. Here's how you'd do that:

Now your `OUTPUT_JSONL_FILE` defined above has all the source and target models fully expanded.

In [None]:
# Denormalize the expanded JSONL into a CSV with one row per property mapping.
# With the default headers, the `input_source_model` / `input_target_model`
# columns hold the full expanded models on every row, so this file gets very large.
harmonization_training_data_jsonl_to_csv(
    "../datasets/harmonization_training_Mutated_SDCs_v3_20250423_v0.0.2/_training_data/final_training_data.jsonl",
    "../datasets/harmonization_training_Mutated_SDCs_v3_20250423_v0.0.2/_training_data/final_training_data.csv",
)