In [1]:
from itertools import islice
from more_itertools import chunked
import os
from pathlib import Path
import subprocess
import time
import yaml

from linkml.validator.loaders import TsvLoader
from linkml_runtime import SchemaView
from linkml_map.transformer.object_transformer import ObjectTransformer

In [2]:
class DataLoader:
    """Dict-like access to per-table TSV files stored in one directory.

    ``loader[pht_id]`` returns ``TsvLoader(<base_path>/<pht_id>.tsv).iter_instances()``;
    ``pht_id in loader`` reports whether that TSV file exists.
    """

    def __init__(self, base_path):
        # Directory that holds one ``<pht_id>.tsv`` file per source table.
        self.base_path = base_path

    def _path(self, pht_id):
        """Return the TSV path for ``pht_id`` (the file may not exist)."""
        return os.path.join(self.base_path, f"{pht_id}.tsv")

    def __getitem__(self, pht_id):
        """Return the rows of ``<pht_id>.tsv`` as yielded by ``TsvLoader.iter_instances()``.

        Raises:
            FileNotFoundError: if no TSV file exists for ``pht_id``.
        """
        file_path = self._path(pht_id)
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"No TSV file found for {pht_id} at {file_path}")
        # Load exactly the path that was just checked.
        return TsvLoader(file_path).iter_instances()

    def __contains__(self, pht_id):
        """Return True if a TSV file exists for ``pht_id``."""
        return os.path.exists(self._path(pht_id))

In [3]:
def get_spec_files(directory, search_string):
    """
    Find YAML files under ``directory`` whose contents match ``search_string``.

    ``search_string`` is handed to ``grep`` as a basic regular expression, so
    anchors work (e.g. ``"^    Condition:"``). Files are searched recursively;
    only paths ending in ``.yaml`` or ``.yml`` are kept.

    Returns:
        A list of matching file paths sorted by file stem. The list is empty
        when nothing matches.

    Raises:
        subprocess.CalledProcessError: if grep reports an error (exit status
            >= 2, e.g. the directory does not exist).
    """
    directory = Path(directory)

    # ``-e`` keeps a pattern that happens to start with "-" from being parsed
    # as a grep option.
    result = subprocess.run(
        ['grep', '-rl', '-e', search_string, str(directory)],
        stdout=subprocess.PIPE,
        text=True,
    )
    # grep exits 0 when something matched, 1 when nothing matched and >= 2 on
    # error. "No matching files" is a normal outcome here, not a failure.
    if result.returncode > 1:
        raise subprocess.CalledProcessError(
            result.returncode, result.args, output=result.stdout
        )

    candidates = (line.strip() for line in result.stdout.splitlines())
    file_paths = [Path(p) for p in candidates if p.endswith(('.yaml', '.yml'))]
    return sorted(file_paths, key=lambda p: p.stem)

In [4]:
def multi_spec_transform(data_loader, spec_files, source_schema, target_schema):
    """
    Lazily apply every transformation spec in ``spec_files`` and yield mapped objects.

    Each spec file is loaded with ``yaml.safe_load`` and iterated as a list of
    blocks. For every entry of a block's ``class_derivations``, the source table
    named by its ``populated_from`` is read through ``data_loader`` and each row
    is mapped by an ``ObjectTransformer`` configured from that block.

    Progress is printed as the file stem followed by one ``.`` per block.

    Args:
        data_loader: mapping-like object; ``data_loader[pht_id]`` returns an
            iterable of source rows (e.g. ``DataLoader``).
        spec_files: iterable of ``Path`` objects pointing at spec YAML files.
        source_schema: source schema, wrapped in a ``SchemaView``.
        target_schema: target schema, wrapped in a ``SchemaView``.

    Yields:
        The result of ``ObjectTransformer.map_object`` for each source row.

    Raises:
        Any exception raised while processing a file is re-raised after the
        offending file (and block, if one was reached) has been printed.
    """
    # Build the schema views once and share them across all transformers
    # instead of constructing two new views for every class derivation.
    source_view = SchemaView(source_schema)
    target_view = SchemaView(target_schema)

    for file in spec_files:
        print(f"{file.stem}", end='', flush=True)
        # Reset per file so the error report never shows a block left over from
        # a previous file, and never hits an unbound name when open/load fails.
        block = None
        try:
            with open(file) as f:
                # An empty YAML file loads as None; treat it as having no blocks.
                specs = yaml.safe_load(f) or []
            for block in specs:
                derivation = block["class_derivations"]
                print(".", end='', flush=True)
                for class_spec in derivation.values():
                    pht_id = class_spec["populated_from"]
                    rows = data_loader[pht_id]

                    # NOTE(review): unrestricted_eval=True presumably allows
                    # arbitrary Python in spec expressions - only run trusted specs.
                    transformer = ObjectTransformer(unrestricted_eval=True)
                    transformer.source_schemaview = source_view
                    transformer.target_schemaview = target_view
                    transformer.create_transformer_specification(block)

                    for row in rows:
                        yield transformer.map_object(row, source_type=pht_id)
            print('')
        except Exception as e:
            print(f"\n⚠️  Error processing {file}: {e.__class__.__name__} - {e}")
            if block is not None:
                print(block)
            # Re-raise unchanged; the caller/notebook shows the full traceback.
            raise

In [5]:
def yaml_wrap(batches, key_name):
    """
    Stream batches of records as fragments of one YAML document ``{key_name: [...]}``.

    Each non-empty batch is dumped as ``{key_name: batch}``. The first fragment
    keeps its ``key_name:`` header line; later fragments drop it, so writing
    all yielded strings in order produces one list under ``key_name``. This
    relies on ``yaml.dump(..., default_flow_style=False)`` putting the key alone
    on the first line, followed by block-style list items.

    Empty batches are skipped, because ``{key_name: []}`` dumps as the single
    line ``key_name: []``, and any list items appended after it would be
    invalid YAML. If no records are seen at all, ``{key_name: []}`` is yielded
    so the output is still a valid mapping rather than an empty file.

    Args:
        batches: iterable of lists of YAML-serialisable records.
        key_name: top-level key the records are listed under.

    Yields:
        YAML text fragments to be written in order.
    """
    header_written = False
    for batch in batches:
        if not batch:
            continue
        yaml_str = yaml.dump({key_name: batch}, default_flow_style=False)
        if header_written:
            # Drop the "key_name:" header line, keep the list items.
            yield yaml_str.split("\n", 1)[1]
        else:
            header_written = True
            yield yaml_str
    if not header_written:
        yield yaml.dump({key_name: []}, default_flow_style=False)

In [9]:
# Per-study inputs: (source schema YAML, directory of priority-variable transform specs).
# Change `study_name` to process a different cohort.
study_inputs = {
    "COPDGene": (
        "/sbgenomics/project-files/COPDGene/COPDGene_HMB_Schema.yaml",
        "/sbgenomics/workspace/NHLBI-BDC-DMC-HV/priority_variables_transform/COPDGene-ingest/",
    ),
    "FHS": (
        "/sbgenomics/workspace/output/Schema_FHS_v31_c1/schema-automator-data/Schema_FHS_v31_c1.yaml",
        "/sbgenomics/workspace/NHLBI-BDC-DMC-HV/priority_variables_transform/FHS-ingest/",
    ),
    "CHS": (
        "/sbgenomics/workspace/output/CHS/Schema_CHS_v7_c1/Schema_CHS_v7_c1.yaml",
        "/sbgenomics/workspace/NHLBI-BDC-DMC-HV/priority_variables_transform/CHS-ingest/",
    ),
    "HCHS": (
        "/sbgenomics/workspace/output/HCHS_SOL_cleaned/Schema_HCHS_SOL_v1_c1.yaml",
        "/sbgenomics/workspace/NHLBI-BDC-DMC-HV/priority_variables_transform/HCHS-ingest/",
    ),
    "MESA": (
        "/sbgenomics/workspace/output/MESA/Schema_MESA_v13_c1/Schema_MESA_v13_c1.yaml",
        "/sbgenomics/workspace/NHLBI-BDC-DMC-HV/priority_variables_transform/MESA-ingest/",
    ),
    "WHI": (
        "/sbgenomics/workspace/output/WHI/Schema_WHI_v12_c1/Schema_WHI_v12_c1.yaml",
        "/sbgenomics/workspace/NHLBI-BDC-DMC-HV/priority_variables_transform/WHI-ingest/",
    ),
}
study_name = "COPDGene"
source_schema_path, var_dir = study_inputs[study_name]

# Source (study) schema.
source_sv = SchemaView(source_schema_path)
source_schema = source_sv.schema

# Target model schema (bdchm).
target_sv = SchemaView("/sbgenomics/workspace/NHLBI-BDC-DMC-HM/src/bdchm/schema/bdchm.yaml")
target_schema = target_sv.schema

In [12]:
# Output location and study-specific settings for this run.
output_base = "/sbgenomics/workspace/output/test/"
study_dir = "COPDGene-ingest"
output_dir = os.path.join(output_base, study_dir)
os.makedirs(output_dir, exist_ok=True)

data_version = "COPDGene-v6-c1"
consent_label = "HMB"

# Source TSVs: one directory per data version, one <pht_id>.tsv per table.
data_loader = DataLoader(os.path.join("/sbgenomics/workspace/output", data_version))
# WHI reads from the cleaned export instead:
# data_loader = DataLoader(os.path.join("/sbgenomics/workspace/output/WHI_cleaned", data_version))

# Target classes to populate. Coverage notes for other studies are inline.
entities = [
    "Condition",
    "Demography",
    "DrugExposure",
    "MeasurementObservation",
    "Observation",
    "Participant",  # Not in FHS?
    "Person",  # Not in HCHS
    "Procedure",  # Not in HCHS, MESA
    "ResearchStudy",
    "SdohObservation",
]

# Number of mapped records serialised per YAML fragment.
CHUNK_SIZE = 100

start = time.perf_counter()
for entity in entities:
    print(f"Starting {entity}")
    # Spec files containing a line that is four spaces followed by "<Entity>:".
    spec_files = get_spec_files(var_dir, f"^    {entity}:")
    # To rerun only part of the specs, filter here, e.g.:
    # spec_files = [p for p in spec_files if p.stem >= "spo2"]
    output_path = os.path.join(output_dir, f"{data_version}-{entity}-{consent_label}-data.yaml")

    records = multi_spec_transform(data_loader, spec_files, source_schema, target_schema)
    key_name = entity.lower() + "s"

    # Write to a side file and rename only once every record has been written,
    # so a failed run never leaves a truncated YAML file under the final name
    # (and any previous complete output stays intact).
    partial_path = output_path + ".partial"
    with open(partial_path, "w") as f:
        for yaml_batch in yaml_wrap(chunked(records, CHUNK_SIZE), key_name):
            f.write(yaml_batch)
    os.replace(partial_path, output_path)

    print(f"{entity} Complete")

end = time.perf_counter()
print(f"Time: {end - start:.2f} seconds")

Starting Condition
angina.
asthma.
copd.
diabetes.
hist_hrt_failure.
hist_my_inf.
hyperten.
pad.
slp_ap.
stroke.
stroke_isch_atk.
Condition Complete
Starting Demography
demography.
Demography Complete
Starting DrugExposure
tak_adrenergics.
tak_anabolic_steroid.
tak_betablk.
tak_betablk_resp.
tak_cort_steroid_oral.
tak_cort_steroid_resp.
DrugExposure Complete
Starting MeasurementObservation
bdy_hgt.
bdy_wgt.
bmi.
bp_diastolic.
bp_systolic.
fev1.
fev1_fvc.
fvc.
hrt_rt.
spo2.
MeasurementObservation Complete
Starting Observation
cig_smok.
Observation Complete
Starting Participant
participant.
Participant Complete
Starting Person
person.
Person Complete
Starting Procedure
hist_cor_angio.
hist_cor_bypg.
Procedure Complete
Starting ResearchStudy
research_study.
ResearchStudy Complete
Starting SdohObservation
edu_lvl.
SdohObservation Complete
Time: 237.21 seconds
