In [0]:
%run ./util

In [0]:
import re
project_name = "omop-tables"
omop_version = "531"

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("OMOP Tables to Delta") \
        .getOrCreate()

vocab_path = "dbfs:/FileStore/tables/vocabulary/"
output_path = f"/dbfs/{project_name}/{omop_version}/"

def sanitize_column_name(name):
    return re.sub(r'[ ,;{}()\n\t=]', '_', name)

def csv_to_delta(file_info):
    file_path = file_info.path
    file_name = file_info.name.split('.')[0]
    delta_path = f"{output_path}/{file_name}"

    df = spark.read.csv(file_path, header=True, inferSchema=True)

    for col in df.columns:
        df = df.withColumnRenamed(col, sanitize_column_name(col))

    df.write.format("delta").mode("overwrite").save(delta_path)
    print(f"Wrote {file_name} to {delta_path}")
    
files = dbutils.fs.ls(vocab_path)

for file in files:
    if file.name.endswith(".csv"):
        csv_to_delta(file)
    


Wrote CONCEPT to /dbfs/omop-tables/531//CONCEPT
Wrote CONCEPT_ANCESTOR to /dbfs/omop-tables/531//CONCEPT_ANCESTOR
Wrote CONCEPT_CLASS to /dbfs/omop-tables/531//CONCEPT_CLASS
Wrote CONCEPT_CPT4 to /dbfs/omop-tables/531//CONCEPT_CPT4
Wrote CONCEPT_RELATIONSHIP to /dbfs/omop-tables/531//CONCEPT_RELATIONSHIP
Wrote CONCEPT_SYNONYM to /dbfs/omop-tables/531//CONCEPT_SYNONYM
Wrote DOMAIN to /dbfs/omop-tables/531//DOMAIN
Wrote DRUG_STRENGTH to /dbfs/omop-tables/531//DRUG_STRENGTH
Wrote RELATIONSHIP to /dbfs/omop-tables/531//RELATIONSHIP
Wrote VOCABULARY to /dbfs/omop-tables/531//VOCABULARY


/Workspace/Users/s0382292@stanfordhealthcare.org


In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Drop Vocabulary Tables") \
    .getOrCreate()

catalog = "ancillary_projects"
schema = "cdm_synthea10"

vocab_tables = [
    "CONCEPT",
    "CONCEPT_ANCESTOR",
    "CONCEPT_CLASS",
    "CONCEPT_CPT4",
    "CONCEPT_RELATIONSHIP",
    "CONCEPT_SYNONYM",
    "DOMAIN",
    "DRUG_STRENGTH",
    "RELATIONSHIP",
    "VOCABULARY"
]

for table in vocab_tables:
    table_name = f"{catalog}.{schema}.{table}"
    spark.sql(f"DROP TABLE IF EXISTS {table_name}")
    print(f"Dropped table {table_name}")



Dropped table ancillary_projects.cdm_synthea10.CONCEPT
Dropped table ancillary_projects.cdm_synthea10.CONCEPT_ANCESTOR
Dropped table ancillary_projects.cdm_synthea10.CONCEPT_CLASS
Dropped table ancillary_projects.cdm_synthea10.CONCEPT_CPT4
Dropped table ancillary_projects.cdm_synthea10.CONCEPT_RELATIONSHIP
Dropped table ancillary_projects.cdm_synthea10.CONCEPT_SYNONYM
Dropped table ancillary_projects.cdm_synthea10.DOMAIN
Dropped table ancillary_projects.cdm_synthea10.DRUG_STRENGTH
Dropped table ancillary_projects.cdm_synthea10.RELATIONSHIP
Dropped table ancillary_projects.cdm_synthea10.VOCABULARY


In [0]:
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

project_name = "omop-tables"
omop_version = "531"

spark = SparkSession.builder \
    .appName("OMOP Tables to Delta") \
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true") \
    .getOrCreate()

vocab_path = "dbfs:/FileStore/tables/vocabulary/"
catalog = "ancillary_projects"
schema = "cdm_synthea10"

def sanitize_column_name(name):
    return re.sub(r'[ ,;{}()\n\t=]', '_', name)

def csv_to_delta(file_info):
    file_path = file_info.path
    file_name = file_info.name.split('.')[0]
    table_name = f"{catalog}.{schema}.{file_name}"

    df = spark.read.csv(file_path, header=True, inferSchema=True, sep='\t', quote='"', escape='"')

    if df.columns[0] == "concept_id_concept_name_domain_id_vocabulary_id_concept_class_id_standard_concept_concept_code_valid_start_date_valid_end_date_invalid_reason":
        df = df.withColumn("raw_data", col(df.columns[0]))

        expected_columns = [
            "CONCEPT_ID", "CONCEPT_NAME", "DOMAIN_ID", "VOCABULARY_ID", "CONCEPT_CLASS_ID",
            "STANDARD_CONCEPT", "CONCEPT_CODE", "VALID_START_DATE", "VALID_END_DATE", "INVALID_REASON"
        ]

        df = df.withColumn("split_data", split(col("raw_data"), r'\t')).select(
            *[col("split_data")[i].alias(expected_columns[i]) for i in range(len(expected_columns))]
        ).drop("raw_data")

    for col_name in df.columns:
        df = df.withColumnRenamed(col_name, sanitize_column_name(col_name))

    df.write.format("delta") \
        .mode("overwrite") \
        .option("mergeSchema", "true") \
        .saveAsTable(table_name)
    print(f"Wrote {file_name} to {table_name}")

# List all files in the vocabulary path
files = dbutils.fs.ls(vocab_path)

# Process each CSV file
for file in files:
    if file.name.endswith(".csv"):
        csv_to_delta(file)


Wrote CONCEPT to ancillary_projects.cdm_synthea10.CONCEPT
Wrote CONCEPT_ANCESTOR to ancillary_projects.cdm_synthea10.CONCEPT_ANCESTOR
Wrote CONCEPT_CLASS to ancillary_projects.cdm_synthea10.CONCEPT_CLASS
Wrote CONCEPT_CPT4 to ancillary_projects.cdm_synthea10.CONCEPT_CPT4
Wrote CONCEPT_RELATIONSHIP to ancillary_projects.cdm_synthea10.CONCEPT_RELATIONSHIP
Wrote CONCEPT_SYNONYM to ancillary_projects.cdm_synthea10.CONCEPT_SYNONYM
Wrote DOMAIN to ancillary_projects.cdm_synthea10.DOMAIN
Wrote DRUG_STRENGTH to ancillary_projects.cdm_synthea10.DRUG_STRENGTH
Wrote RELATIONSHIP to ancillary_projects.cdm_synthea10.RELATIONSHIP
Wrote VOCABULARY to ancillary_projects.cdm_synthea10.VOCABULARY


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import os
import json
import re
from decimal import Decimal
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta, tzinfo, date
import pytz
import collections
import logging

In [0]:
spark = SparkSession.builder \
    .appName("Extracting Patients Details") \
        .getOrCreate()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [0]:
GENDER_LABEL = {
    "M": "primary",
    "W": "danger",
    "F": "danger"
}
GENDER_MAP = {
    "M": "M",
    "W": "F",
    "F": "F",
    "MALE": "M",
    "FEMALE": "F"
}

COLOR_MAP = {
    "Condition": "#4daf4a",
    "Drug": "#eb9adb",
    "Measurement": "#80b1d3",
    "Observation": "#ccffff",
    "Procedure": "#ff7f00"
}

MEASURE_FLAG_MAP = {
    "L": {
        "color": "#fb8072"
    },
    "H": {
        "color": "#fb8072"
    }
}


INFO:py4j.clientserver:Received command c on object id p0


In [0]:
_tz = pytz.timezone('US/Eastern')
_epoch = datetime(year=1970, month=1, day=1, tzinfo=_tz)
_day_seconds = 24 * 3600
_milli = 10**6

def _mktime(dt):
    td = dt - _epoch
    res = (td.microseconds + (td.seconds + td.days * _day_seconds) * _milli) / _milli
    return int(res - res % _day_seconds)

def to_time(date_str):
    return _mktime(datetime(year=int(date_str[0:4]), month=int(date_str[4:6]), day=int(date_str[6:8]), tzinfo=_tz))

def from_time(stamp):
    return datetime.fromtimestamp(stamp, _tz).strftime("%Y%m%d")

def next_day(stamp):
    return shift_days(stamp, 1)

def shift_days(stamp, days):
    return _mktime(_epoch + timedelta(days=days, seconds=stamp))

Imported Utilities functions from util.py notebook - Included Time Completed

In [0]:
import os
import json
import sys
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from decimal import Decimal
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("OMOPDataProcessing").getOrCreate()

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class OMOP:
    def __init__(self, catalog, schema, debug_output):
        self.catalog = catalog
        self.schema = schema
        self._parents = {}
        self._codes = {}
        self.debug_output = debug_output

    def _exec(self, query):
        df = spark.sql(query)
        return df.collect()

    def _exec_one(self, query):
        result = self._exec(query)
        if not result:
            raise ValueError(f"expected one result row got 0\n{query}\n")
        return result[0]

    def list_patients(self, patients, prefix="", limit=None, show_old_ids=False):
        limit_str = f" LIMIT {limit}" if limit else ""
        query = f"SELECT person_id, person_source_value FROM {self.catalog}.{self.schema}.person{limit_str}"
        results = self._exec(query)
        for r in results:
            patients.add(str(prefix) + (str(r['person_id']) if not show_old_ids else str(r['person_source_value']) + '.json'))

    def get_person_id(self, pid):
        query = f"SELECT person_id FROM {self.catalog}.{self.schema}.person WHERE person_source_value = '{pid}'"
        return str(self._exec_one(query)['person_id'])

    def add_info(self, obj, id, key, value, has_label=False, label=""):
        for info in obj["info"]:
            if info["id"] == id:
                if str(value) != str(info["value"]):
                    print(f'duplicate "{id}" new: {value} old: {info["value"]}', file=sys.stderr)
                return
        node = {"id": id, "name": key, "value": value}
        if has_label:
            node["label"] = label
        obj["info"].append(node)

    def get_info(self, pid, obj):
        query = f"""SELECT
             concept_name as gender_concept_name,
             person_source_value,
             year_of_birth
            FROM
             {self.catalog}.{self.schema}.person
            LEFT JOIN {self.catalog}.{self.schema}.concept ON (
             gender_concept_id = concept_id
            ) WHERE
             person_id = '{pid}'
        """
        result = self._exec_one(query)
        if result['person_source_value']:
            self.add_info(obj, 'id_alt', 'ID', str(result['person_source_value']) + ".json")
        self.add_info(obj, 'born', 'Born', int(result['year_of_birth']))
        gender = str(result['gender_concept_name'])
        self.add_info(obj, 'gender', 'Gender', GENDER_MAP.get(gender.upper(), 'U'), True, GENDER_LABEL.get(gender, 'U'))
        
    def to_time(self, value):
        return toTime(value.strftime("%Y%m%d"))

    def create_event(self, group, id, claim_id, has_result=False, result_flag="", result=""):
        res = {"id": id, "group": group}
        if claim_id is not None:
            res["row_id"] = claim_id
        if has_result:
            res["flag_value"] = result
            res["flag"] = result_flag
        return res

    def add_dict(self, dict, new_dict_entries, group, prefix, id, name, desc, code, unmapped):
        alt_hierarchies = f'{group}_{prefix}'
        if group not in dict:
            dict[group] = {}
            dict[group][""] = {
                "id": "",
                "name": group,
                "desc": group,
                "color": COLOR_MAP.get(group, "lightgray"),
                "parent": ""
            }
            if alt_hierarchies in self._codes:
                ah = {}
                if alt_hierarchies in self._parents:
                    ah = self._parents[alt_hierarchies]
                for key, value in self._codes[alt_hierarchies].items():
                    dict[group][key] = {
                        "id": key,
                        "name": value,
                        "desc": value,
                        "los": 0,
                        "parent": ah.get(key, "")
                    }
            if group == "Measurement":
                dict[group][""]["flags"] = MEASURE_FLAG_MAP
        g = dict[group]
        full_id = f'{prefix}{id}'
        if full_id not in g:
            res = {"id": id, "name": name, "desc": desc, "parent": ""}
            if unmapped:
                res["unmapped"] = True
            g[full_id] = res
            do_add = True
            if alt_hierarchies in self._parents:
                ah = self._parents[alt_hierarchies]
                if code in ah:
                    res["los"] = 0
                    res["parent"] = ah[code]
                    do_add = False
                else:
                    code = code.replace('.', '')
                    if code in ah:
                        res["parent"] = ah[code]
                        do_add = False
            if id != 0 and do_add:
                new_dict_entries.add(str(id))

    def get_dict_entry(self, dict, group, prefix, id):
        if group not in dict:
            return None
        full_id = f'{prefix}{id}'
        return dict[group].get(full_id, None)

    def update_hierarchies(self, dict, new_dict_entries):
        while new_dict_entries:
            query = """SELECT
                 c.concept_id as c_id,
                 c.domain_id as c_domain,
                 c.concept_name as c_name,
                 c.vocabulary_id as c_vocab,
                 c.concept_code as c_num,
                 ca.min_levels_of_separation as c_distance,
                 ca.descendant_concept_id as c_desc_id,
                 cc.domain_id as c_desc_domain,
                 cc.vocabulary_id as c_desc_vocab
                FROM
                 {catalog}.{schema}.concept_ancestor as ca
                LEFT JOIN {catalog}.{schema}.concept as c ON (
                 c.concept_id = ca.ancestor_concept_id
                ) LEFT JOIN {catalog}.{schema}.concept as cc ON (
                 cc.concept_id = ca.descendant_concept_id
                ) WHERE
                 ca.descendant_concept_id != 0
                 AND ca.ancestor_concept_id != 0
                 AND ca.descendant_concept_id IN ({id_list})
            """.format(catalog=self.catalog, schema=self.schema, id_list=','.join(sorted(list(new_dict_entries))))
            result = self._exec(query)
            new_dict_entries.clear()
            for row in result:
                parent_id = str(row['c_id'])
                parent_group = row['c_domain']
                parent_name = row['c_name']
                parent_vocab = row['c_vocab']
                parent_code = row['c_num']
                unmapped = False
                if parent_code == 0:
                    parent_code = row['c_orig']
                    unmapped = True
                parent_desc = f'{parent_name} ({parent_vocab} {parent_code})'
                self.add_dict(dict, new_dict_entries, parent_group, parent_vocab, parent_id, parent_name, parent_desc, parent_code, unmapped)
                dos = int(row['c_distance'])
                desc_id = str(row['c_desc_id'])
                desc_vocab = row['c_desc_vocab']
                desc_group = row['c_desc_domain']
                if desc_group != parent_group:
                    print(f"WARNING: intra group inheritance: {parent_group} << {desc_group}", file=sys.stderr)
                else:
                    desc_entry = self.get_dict_entry(dict, desc_group, desc_vocab, desc_id)
                    if desc_entry is not None and parent_id != desc_id and ('dos' not in desc_entry or desc_entry['dos'] > dos):
                        desc_entry['dos'] = dos
                        desc_entry['parent'] = f'{parent_vocab}{parent_id}'
            new_dict_entries.clear()

    def get_diagnoses(self, pid, obj, dict, new_dict_entries):
        query = f"""SELECT
            o.condition_occurrence_id as id_row,
            o.condition_start_date as date_start,
            o.condition_end_date as date_end,
            o.condition_concept_id as d_id,
            o.condition_source_value as d_orig,
            c.domain_id as d_domain,
            c.concept_name as d_name,
            c.vocabulary_id as d_vocab,
            c.concept_code as d_num
           FROM
            {self.catalog}.{self.schema}.condition_occurrence as o
           LEFT JOIN {self.catalog}.{self.schema}.concept as c ON (
            c.concept_id = o.condition_concept_id
           )
           WHERE
            o.person_id = '{pid}'
        """
        for row in self._exec(query):
            code = row['d_num']
            unmapped = False
            if code == 0:
                code = row['d_orig']
                unmapped = True
            id_row = f'c{row["id_row"]}'
            d_id = row['d_id']
            name = row['d_name']
            vocab = row['d_vocab']
            group = "Condition" if row['d_domain'] is None else row['d_domain']
            desc = f'{name} ({vocab} {code})'
            self.add_dict(dict, new_dict_entries, group, vocab, d_id, name, desc, code, unmapped)
            date_start = self.to_time(row['date_start'])
            date_end = self.to_time(row['date_end']) if row['date_end'] else date_start
            date_cur = date_start
            while date_cur <= date_end:
                event = self.create_event(group, f'{vocab}{d_id}', id_row)
                event['time'] = date_cur
                obj['events'].append(event)
                date_cur = nextDay(date_cur)

    def get_procedures(self, pid, obj, dict, new_dict_entries):
        query = f"""SELECT
            o.procedure_occurrence_id as id_row,
            o.procedure_date as p_date,
            o.procedure_concept_id as p_id,
            o.procedure_source_value as p_orig,
            c.domain_id as p_domain,
            c.concept_name as p_name,
            c.vocabulary_id as p_vocab,
            c.concept_code as p_num
           FROM
            {self.catalog}.{self.schema}.procedure_occurrence as o
           LEFT JOIN {self.catalog}.{self.schema}.concept as c ON (
            c.concept_id = o.procedure_concept_id
           )
           WHERE
            o.person_id = '{pid}'
        """
        for row in self._exec(query):
            code = row['p_num']
            unmapped = False
            if code == 0:
                code = row['p_orig']
                unmapped = True
            id_row = f'p{row["id_row"]}'
            d_id = row['p_id']
            name = row['p_name']
            vocab = row['p_vocab']
            group = "Procedure" if row['p_domain'] is None else row['p_domain']
            desc = f'{name} ({vocab} {code})'
            self.add_dict(dict, new_dict_entries, group, vocab, d_id, name, desc, code, unmapped)
            event = self.create_event(group, f'{vocab}{d_id}', id_row)
            event['time'] = self.to_time(row['p_date'])
            obj['events'].append(event)

    def get_observations_concept_valued(self, pid, obj, dict, new_dict_entries):
        query = f"""SELECT
            o.observation_id as id_row,
            o.observation_date as o_date,
            o.observation_concept_id as o_id,
            o.observation_source_value as o_orig,
            o.value_as_concept_id as o_val_concept,
            c_val.concept_name as o_val_concept_name,
            c.domain_id as o_domain,
            c.concept_name as o_name,
            c.vocabulary_id as o_vocab,
            c.concept_code as o_num
           FROM
            {self.catalog}.{self.schema}.observation as o
           LEFT JOIN {self.catalog}.{self.schema}.concept as c ON (
            c.concept_id = o.observation_concept_id
           )
           INNER JOIN {self.catalog}.{self.schema}.concept as c_val ON (
            c_val.concept_id = o.value_as_concept_id
           )
           WHERE
            o.person_id = '{pid}'
            AND o.value_as_concept_id IS NOT NULL
        """
        for row in self._exec(query):
            code = row['o_num']
            unmapped = False
            if code == 0:
                code = row['o_orig']
                unmapped = True
            id_row = f'p{row["id_row"]}'
            d_id = row['o_id']
            name = "unknown" if row['o_name'] is None else row['o_name']
            vocab = row['o_vocab']
            group = "Observation" if row['o_domain'] is None else row['o_domain']
            desc = f'{name} ({vocab} {code})'
            self.add_dict(dict, new_dict_entries, group, vocab, d_id, name, desc, code, unmapped)
            event = self.create_event(group, f'{vocab}{d_id}', id_row, True, "C", str(row['o_val_concept_name']))
            event['time'] = self.to_time(row['o_date'])
            obj['events'].append(event)

    def get_observations_string_valued(self, pid, obj, dict, new_dict_entries):
        query = f"""SELECT
            o.observation_id as id_row,
            o.observation_date as o_date,
            o.observation_concept_id as o_id,
            o.observation_source_value as o_orig,
            o.value_as_string as o_val_string,
            c.domain_id as o_domain,
            c.concept_name as o_name,
            c.vocabulary_id as o_vocab,
            c.concept_code as o_num
           FROM
            {self.catalog}.{self.schema}.observation as o
           LEFT JOIN {self.catalog}.{self.schema}.concept as c ON (
            c.concept_id = o.observation_concept_id
           )
           WHERE
            o.person_id = '{pid}'
            AND o.value_as_string IS NOT NULL
        """
        for row in self._exec(query):
            code = row['o_num']
            unmapped = False
            if code == 0:
                code = row['o_orig']
                unmapped = True
            id_row = f'p{row["id_row"]}'
            d_id = row['o_id']
            name = "unknown" if row['o_name'] is None else row['o_name']
            vocab = row['o_vocab']
            group = "Observation" if row['o_domain'] is None else row['o_domain']
            desc = f'{name} ({vocab} {code})'
            self.add_dict(dict, new_dict_entries, group, vocab, d_id, name, desc, code, unmapped)
            event = self.create_event(group, f'{vocab}{d_id}', id_row, True, "S", row['o_val_string'])
            event['time'] = self.to_time(row['o_date'])
            obj['events'].append(event)

    def get_observations_number_valued(self, pid, obj, dict, new_dict_entries):
        query = f"""SELECT
            o.observation_id as id_row,
            o.observation_date as o_date,
            o.observation_concept_id as o_id,
            o.observation_source_value as o_orig,
            o.value_as_number as o_val_number,
            c.domain_id as o_domain,
            c.concept_name as o_name,
            c.vocabulary_id as o_vocab,
            c.concept_code as o_num
           FROM
            {self.catalog}.{self.schema}.observation as o
           LEFT JOIN {self.catalog}.{self.schema}.concept as c ON (
            c.concept_id = o.observation_concept_id
           )
           WHERE
            o.person_id = '{pid}'
            AND o.value_as_number IS NOT NULL
        """
        for row in self._exec(query):
            code = row['o_num']
            unmapped = False
            if code == 0:
                code = row['o_orig']
                unmapped = True
            id_row = f'p{row["id_row"]}'
            d_id = row['o_id']
            name = "unknown" if row['o_name'] is None else row['o_name']
            vocab = row['o_vocab']
            group = "Observation" if row['o_domain'] is None else row['o_domain']
            desc = f'{name} ({vocab} {code})'
            self.add_dict(dict, new_dict_entries, group, vocab, d_id, name, desc, code, unmapped)
            event = self.create_event(group, f'{vocab}{d_id}', id_row, True, "N", str(row['o_val_number']))
            event['time'] = self.to_time(row['o_date'])
            obj['events'].append(event)

    def get_drugs(self, pid, obj, dict, new_dict_entries):
        query = f"""SELECT
            o.drug_exposure_id as id_row,
            o.drug_exposure_start_date as date_start,
            o.drug_exposure_end_date as date_end,
            o.drug_concept_id as m_id,
            o.drug_source_value as m_orig,
            c.domain_id as m_domain,
            c.concept_name as m_name,
            c.vocabulary_id as m_vocab,
            c.concept_code as m_num
           FROM
            {self.catalog}.{self.schema}.drug_exposure as o
           LEFT JOIN {self.catalog}.{self.schema}.concept as c ON (
            c.concept_id = o.drug_concept_id
           )
           WHERE
            o.person_id = '{pid}'
        """
        for row in self._exec(query):
            code = row['m_num']
            unmapped = False
            if code == 0:
                code = row['m_orig']
                unmapped = True
            id_row = f'm{row["id_row"]}'
            d_id = row['m_id']
            name = row['m_name']
            vocab = row['m_vocab']
            group = "Drug" if row['m_domain'] is None else row['m_domain']
            desc = f'{name} ({vocab} {code})'
            self.add_dict(dict, new_dict_entries, group, vocab, d_id, name, desc, code, unmapped)
            date_start = self.to_time(row['date_start'])
            date_end = self.to_time(row['date_end']) if row['date_end'] else date_start
            date_cur = date_start
            while date_cur <= date_end:
                event = self.create_event(group, f'{vocab}{d_id}', id_row)
                event['time'] = date_cur
                obj['events'].append(event)
                date_cur = nextDay(date_cur)

    def get_measurements(self, pid, obj, dict, new_dict_entries):
        query = f"""SELECT
            o.measurement_id as id_row,
            o.measurement_date as m_date,
            o.measurement_concept_id as m_id,
            o.measurement_source_value as m_orig,
            o.value_source_value as m_orig_value,
            o.value_as_number as m_value,
            o.range_low as m_low,
            o.range_high as m_high,
            c.domain_id as m_domain,
            c.concept_name as m_name,
            c.vocabulary_id as m_vocab,
            c.concept_code as m_num
           FROM
            {self.catalog}.{self.schema}.measurement as o
           LEFT JOIN {self.catalog}.{self.schema}.concept as c ON (
            c.concept_id = o.measurement_concept_id
           )
           WHERE
            o.person_id = '{pid}'
        """
        for row in self._exec(query):
            code = row['m_num']
            unmapped = False
            if code == 0:
                code = row['m_orig']
                unmapped = True
            id_row = f'l{row["id_row"]}'
            d_id = row['m_id']
            name = row['m_name']
            vocab = row['m_vocab']
            group = "Measurement" if row['m_domain'] is None else row['m_domain']
            lab_value = row['m_value']
            lab_low = row['m_low']
            lab_high = row['m_high']

            lab_value = float(lab_value) if lab_value is not None and isinstance(lab_value, (int, float, Decimal)) else float('-inf')
            lab_low = float(lab_low) if lab_low is not None and isinstance(lab_low, (int, float, Decimal)) else float('-inf')
            lab_high = float(lab_high) if lab_high is not None and isinstance(lab_high, (int, float, Decimal)) else float('inf')

            lab_flag = ""
            if lab_value is not None and lab_value != float('-inf'):
                if lab_value <= lab_low:
                    lab_flag = "L"
                elif lab_value >= lab_high:
                    lab_flag = "H"
            else:
                lab_value = "n/a"

            desc = f'{name} ({vocab} {code})'
            self.add_dict(dict, new_dict_entries, group, vocab, d_id, name, desc, code, unmapped)
            event = self.create_event(group, f'{vocab}{d_id}', id_row, True, lab_flag, str(lab_value))
            event['time'] = self.to_time(row['m_date'])
            obj['events'].append(event)

    def get_visits(self, pid, obj):
        classes = obj["classes"]
        if not classes:
            return
        query = f"""
            SELECT v.visit_start_date as date_start, v.visit_end_date as date_end, c.concept_name as c_name
            FROM {self.catalog}.{self.schema}.visit_occurrence as v
            LEFT JOIN {self.catalog}.{self.schema}.concept as c ON (v.visit_concept_id = c.concept_id)
            WHERE v.person_id = '{pid}' AND c.concept_name IN ({','.join([f"'{k}'" for k in classes.keys()])})
        """
        v_spans = obj["v_spans"]
        for row in self._exec(query):
            visit_name = str(row['c_name'])
            date_start_dt = self.to_time(row['date_start'])
            date_end_dt = self.to_time(row['date_end'])

            if date_start_dt is None:
                continue

            v_spans.append({"class": visit_name, "from": self.format_time(date_start_dt), "to": self.format_time(date_end_dt)})

    def get_patient(self, pid, dictionary, line_file, class_file):
        obj = {
            "info": [],
            "events": [],
            "h_bars": [],
            "v_bars": ["auto"],
            "v_spans": [],
            "classes": {}
        }
        new_dict_entries = set()
        add_files(obj, line_file, class_file)
        self.get_info(pid, obj)
        self.add_info(obj, "pid", "Patient", pid)
        self.get_diagnoses(pid, obj, dictionary, new_dict_entries)
        self.get_observations_concept_valued(pid, obj, dictionary, new_dict_entries)
        self.get_observations_string_valued(pid, obj, dictionary, new_dict_entries)
        self.get_observations_number_valued(pid, obj, dictionary, new_dict_entries)
        self.get_procedures(pid, obj, dictionary, new_dict_entries)
        self.get_drugs(pid, obj, dictionary, new_dict_entries)
        self.get_measurements(pid, obj, dictionary, new_dict_entries)
        self.get_visits(pid, obj)
        min_time = float('inf')
        max_time = float('-inf')
        for e in obj["events"]:
            time = e["time"]
            if time < min_time:
                min_time = time
            if time > max_time:
                max_time = time
        obj["start"] = min_time
        obj["end"] = max_time
        self.add_info(obj, "event_count", "Events", len(obj["events"]))
        self.update_hierarchies(dictionary, new_dict_entries)
        return obj

def generate_patient_files(catalog, schema, batch_size=10):
    settings = {
        'catalog': catalog,
        'schema': schema,
        'omop_use_alt_hierarchies': False,
        'use_cache': True,  # This line sets use_cache to False
        'ccs_diag': '/dbfs/omop-tables/531/ccs_diag.csv',
        'ccs_proc': '/dbfs/omop-tables/531/ccs_proc.csv',
    }

    omop = OMOP(settings['catalog'], settings['schema'], True)

    patients = set()
    patients_path = '/dbfs/omop-tables/531/patients.txt'
    if os.path.isfile(patients_path):
        with open(patients_path, 'r') as pf:
            patients = set(pf.read().splitlines())
    else:
        logger.info(f"File {patients_path} does not exist. Starting with an empty patients set.")

    omop.list_patients(patients, prefix="json/")
    save_patients(patients)
    use_cache = settings.get('use_cache', True)

    dictionary = {}
    dictionary_path = '/dbfs/omop-tables/531/json/dictionary.json'
    if os.path.isfile(dictionary_path):
        with open(dictionary_path, 'r') as df:
            dictionary = json.load(df)

    # Ensure the 'json' directory exists
    json_dir = '/dbfs/omop-tables/531/json'
    if not os.path.exists(json_dir):
        os.makedirs(json_dir)
        logger.info(f"Created directory: {json_dir}")
    else:
        logger.info(f"Directory already exists: {json_dir}")

    patient_list = list(patients)
    total_patients = len(patient_list)
    logger.info(f"Total patients to process: {total_patients}")

    def process_patient(patient):
        pid = patient.split('/')[-1].replace('.json', '')
        cache_file = os.path.join(json_dir, f"{pid}.json")
        
        if not os.path.isfile(cache_file) or not use_cache:
            patient_data = omop.get_patient(pid, dictionary, None, None)
            with open(cache_file, 'w') as cf:
                json.dump(patient_data, cf)
                logger.info(f"Created JSON file for patient {pid}: {cache_file}")
        
        return dictionary

    with ThreadPoolExecutor(max_workers=4) as executor:
        future_to_patient = {executor.submit(process_patient, patient): patient for patient in patient_list}
        for future in as_completed(future_to_patient):
            patient = future_to_patient[future]
            try:
                result = future.result()
                dictionary.update(result)
            except Exception as exc:
                logger.error(f'{patient} generated an exception: {exc}')

    with open(dictionary_path, 'w') as df:
        json.dump(dictionary, df)


    with ThreadPoolExecutor(max_workers=4) as executor:
        future_to_patient = {executor.submit(process_patient, patient): patient for patient in patient_list}
        for future in as_completed(future_to_patient):
            patient = future_to_patient[future]
            try:
                result = future.result()
                dictionary.update(result)
            except Exception as exc:
                logger.error(f'{patient} generated an exception: {exc}')

    with open(dictionary_path, 'w') as df:
        json.dump(dictionary, df)

def save_patients(patients):
    patients_path = '/dbfs/omop-tables/531/patients.txt'
    # Ensure the directory exists
    os.makedirs(os.path.dirname(patients_path), exist_ok=True)
    with open(patients_path, 'w') as pf:
        pf.write('\n'.join(sorted(list(patients))))
        pf.flush()

if __name__ == '__main__':
    catalog = 'ancillary_projects'
    schema = 'cdm_synthea10'
    generate_patient_files(catalog, schema, batch_size=10)


INFO:py4j.clientserver:Received command c on object id p0
INFO:__main__:Directory already exists: /dbfs/omop-tables/531/json
INFO:__main__:Total patients to process: 11535
INFO:__main__:Created JSON file for patient 8257: /dbfs/omop-tables/531/json/8257.json
INFO:__main__:Created JSON file for patient 83: /dbfs/omop-tables/531/json/83.json
INFO:__main__:Created JSON file for patient 8336: /dbfs/omop-tables/531/json/8336.json
INFO:__main__:Created JSON file for patient 4871: /dbfs/omop-tables/531/json/4871.json


No Util's functions - Time Component Removed 

In [0]:
import os
import json
import sys
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from decimal import Decimal
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("OMOPDataProcessing").getOrCreate()

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class OMOP:
    def __init__(self, catalog, schema, debug_output):
        self.catalog = catalog
        self.schema = schema
        self._parents = {}
        self._codes = {}
        self.debug_output = debug_output

    def _exec(self, query):
        df = spark.sql(query)
        return df.collect()

    def _exec_one(self, query):
        result = self._exec(query)
        if not result:
            raise ValueError(f"expected one result row got 0\n{query}\n")
        return result[0]

    def list_patients(self, patients, prefix="", limit=None, show_old_ids=False):
        limit_str = f" LIMIT {limit}" if limit else ""
        query = f"SELECT person_id, person_source_value FROM {self.catalog}.{self.schema}.person{limit_str}"
        results = self._exec(query)
        for r in results:
            patients.add(str(prefix) + (str(r['person_id']) if not show_old_ids else str(r['person_source_value']) + '.json'))

    def get_person_id(self, pid):
        query = f"SELECT person_id FROM {self.catalog}.{self.schema}.person WHERE person_source_value = '{pid}'"
        return str(self._exec_one(query)['person_id'])

    def add_info(self, obj, id, key, value, has_label=False, label=""):
        for info in obj["info"]:
            if info["id"] == id:
                if str(value) != str(info["value"]):
                    print(f'duplicate "{id}" new: {value} old: {info["value"]}', file=sys.stderr)
                return
        node = {"id": id, "name": key, "value": value}
        if has_label:
            node["label"] = label
        obj["info"].append(node)

    def get_info(self, pid, obj):
        query = f"""SELECT
             concept_name as gender_concept_name,
             person_source_value,
             year_of_birth
            FROM
             {self.catalog}.{self.schema}.person
            LEFT JOIN {self.catalog}.{self.schema}.concept ON (
             gender_concept_id = concept_id
            ) WHERE
             person_id = '{pid}'
        """
        result = self._exec_one(query)
        if result['person_source_value']:
            self.add_info(obj, 'id_alt', 'ID', str(result['person_source_value']) + ".json")
        self.add_info(obj, 'born', 'Born', int(result['year_of_birth']))
        gender = str(result['gender_concept_name'])
        self.add_info(obj, 'gender', 'Gender', GENDER_MAP.get(gender.upper(), 'U'), True, GENDER_LABEL.get(gender, 'U'))
        
    # def to_time(self, value):
    #     if isinstance(value, datetime):
    #         return value.replace(tzinfo=None)
    #     elif isinstance(value, date):
    #         return datetime.combine(value, datetime.min.time()).replace(tzinfo=None)
    #     elif isinstance(value, str):
    #         return datetime.strptime(value, "%Y-%m-%d").replace(tzinfo=None)
    #     else:
    #         raise ValueError(f"Unsupported date format: {value}")


    def create_event(self, group, id, claim_id, has_result=False, result_flag="", result=""):
        res = {"id": id, "group": group}
        if claim_id is not None:
            res["row_id"] = claim_id
        if has_result:
            res["flag_value"] = result
            res["flag"] = result_flag
        return res

    def add_dict(self, dict, new_dict_entries, group, prefix, id, name, desc, code, unmapped):
        alt_hierarchies = f'{group}_{prefix}'
        if group not in dict:
            dict[group] = {}
            dict[group][""] = {
                "id": "",
                "name": group,
                "desc": group,
                "color": COLOR_MAP.get(group, "lightgray"),
                "parent": ""
            }
            if alt_hierarchies in self._codes:
                ah = {}
                if alt_hierarchies in self._parents:
                    ah = self._parents[alt_hierarchies]
                for key, value in self._codes[alt_hierarchies].items():
                    dict[group][key] = {
                        "id": key,
                        "name": value,
                        "desc": value,
                        "los": 0,
                        "parent": ah.get(key, "")
                    }
            if group == "Measurement":
                dict[group][""]["flags"] = MEASURE_FLAG_MAP
        g = dict[group]
        full_id = f'{prefix}{id}'
        if full_id not in g:
            res = {"id": id, "name": name, "desc": desc, "parent": ""}
            if unmapped:
                res["unmapped"] = True
            g[full_id] = res
            do_add = True
            if alt_hierarchies in self._parents:
                ah = self._parents[alt_hierarchies]
                if code in ah:
                    res["los"] = 0
                    res["parent"] = ah[code]
                    do_add = False
                else:
                    code = code.replace('.', '')
                    if code in ah:
                        res["parent"] = ah[code]
                        do_add = False
            if id != 0 and do_add:
                new_dict_entries.add(str(id))

    def get_dict_entry(self, dict, group, prefix, id):
        if group not in dict:
            return None
        full_id = f'{prefix}{id}'
        return dict[group].get(full_id, None)

    def update_hierarchies(self, dict, new_dict_entries):
        while new_dict_entries:
            query = """SELECT
                 c.concept_id as c_id,
                 c.domain_id as c_domain,
                 c.concept_name as c_name,
                 c.vocabulary_id as c_vocab,
                 c.concept_code as c_num,
                 ca.min_levels_of_separation as c_distance,
                 ca.descendant_concept_id as c_desc_id,
                 cc.domain_id as c_desc_domain,
                 cc.vocabulary_id as c_desc_vocab
                FROM
                 {catalog}.{schema}.concept_ancestor as ca
                LEFT JOIN {catalog}.{schema}.concept as c ON (
                 c.concept_id = ca.ancestor_concept_id
                ) LEFT JOIN {catalog}.{schema}.concept as cc ON (
                 cc.concept_id = ca.descendant_concept_id
                ) WHERE
                 ca.descendant_concept_id != 0
                 AND ca.ancestor_concept_id != 0
                 AND ca.descendant_concept_id IN ({id_list})
            """.format(catalog=self.catalog, schema=self.schema, id_list=','.join(sorted(list(new_dict_entries))))
            result = self._exec(query)
            new_dict_entries.clear()
            for row in result:
                parent_id = str(row['c_id'])
                parent_group = row['c_domain']
                parent_name = row['c_name']
                parent_vocab = row['c_vocab']
                parent_code = row['c_num']
                unmapped = False
                if parent_code == 0:
                    parent_code = row['c_orig']
                    unmapped = True
                parent_desc = f'{parent_name} ({parent_vocab} {parent_code})'
                self.add_dict(dict, new_dict_entries, parent_group, parent_vocab, parent_id, parent_name, parent_desc, parent_code, unmapped)
                dos = int(row['c_distance'])
                desc_id = str(row['c_desc_id'])
                desc_vocab = row['c_desc_vocab']
                desc_group = row['c_desc_domain']
                if desc_group != parent_group:
                    print(f"WARNING: intra group inheritance: {parent_group} << {desc_group}", file=sys.stderr)
                else:
                    desc_entry = self.get_dict_entry(dict, desc_group, desc_vocab, desc_id)
                    if desc_entry is not None and parent_id != desc_id and ('dos' not in desc_entry or desc_entry['dos'] > dos):
                        desc_entry['dos'] = dos
                        desc_entry['parent'] = f'{parent_vocab}{parent_id}'
            new_dict_entries.clear()

    def get_diagnoses(self, pid, obj, dict, new_dict_entries):
        query = f"""SELECT
            o.condition_occurrence_id as id_row,
            o.condition_start_date as date_start,
            o.condition_end_date as date_end,
            o.condition_concept_id as d_id,
            o.condition_source_value as d_orig,
            c.domain_id as d_domain,
            c.concept_name as d_name,
            c.vocabulary_id as d_vocab,
            c.concept_code as d_num
           FROM
            {self.catalog}.{self.schema}.condition_occurrence as o
           LEFT JOIN {self.catalog}.{self.schema}.concept as c ON (
            c.concept_id = o.condition_concept_id
           )
           WHERE
            o.person_id = '{pid}'
        """
        for row in self._exec(query):
            code = row['d_num']
            unmapped = False
            if code == 0:
                code = row['d_orig']
                unmapped = True
            id_row = f'c{row["id_row"]}'
            d_id = row['d_id']
            name = row['d_name']
            vocab = row['d_vocab']
            group = "Condition" if row['d_domain'] is None else row['d_domain']
            desc = f'{name} ({vocab} {code})'
            self.add_dict(dict, new_dict_entries, group, vocab, d_id, name, desc, code, unmapped)
            #date_start = self.to_time(row['date_start'])
            #date_end = self.to_time(row['date_end']) if row['date_end'] else date_start
            #date_cur = date_start
            #while date_cur <= date_end:
            event = self.create_event(group, f'{vocab}{d_id}', id_row)
                #event['time'] = date_cur
            obj['events'].append(event)
                #date_cur = date_cur + timedelta(days=1)

    def get_procedures(self, pid, obj, dict, new_dict_entries):
        query = f"""SELECT
            o.procedure_occurrence_id as id_row,
            o.procedure_date as p_date,
            o.procedure_concept_id as p_id,
            o.procedure_source_value as p_orig,
            c.domain_id as p_domain,
            c.concept_name as p_name,
            c.vocabulary_id as p_vocab,
            c.concept_code as p_num
           FROM
            {self.catalog}.{self.schema}.procedure_occurrence as o
           LEFT JOIN {self.catalog}.{self.schema}.concept as c ON (
            c.concept_id = o.procedure_concept_id
           )
           WHERE
            o.person_id = '{pid}'
        """
        for row in self._exec(query):
            code = row['p_num']
            unmapped = False
            if code == 0:
                code = row['p_orig']
                unmapped = True
            id_row = f'p{row["id_row"]}'
            d_id = row['p_id']
            name = row['p_name']
            vocab = row['p_vocab']
            group = "Procedure" if row['p_domain'] is None else row['p_domain']
            desc = f'{name} ({vocab} {code})'
            self.add_dict(dict, new_dict_entries, group, vocab, d_id, name, desc, code, unmapped)
            event = self.create_event(group, f'{vocab}{d_id}', id_row)
            #event['time'] = self.to_time(row['p_date'])
            obj['events'].append(event)

    def get_observations_concept_valued(self, pid, obj, dict, new_dict_entries):
        query = f"""SELECT
            o.observation_id as id_row,
            o.observation_date as o_date,
            o.observation_concept_id as o_id,
            o.observation_source_value as o_orig,
            o.value_as_concept_id as o_val_concept,
            c_val.concept_name as o_val_concept_name,
            c.domain_id as o_domain,
            c.concept_name as o_name,
            c.vocabulary_id as o_vocab,
            c.concept_code as o_num
           FROM
            {self.catalog}.{self.schema}.observation as o
           LEFT JOIN {self.catalog}.{self.schema}.concept as c ON (
            c.concept_id = o.observation_concept_id
           )
           INNER JOIN {self.catalog}.{self.schema}.concept as c_val ON (
            c_val.concept_id = o.value_as_concept_id
           )
           WHERE
            o.person_id = '{pid}'
            AND o.value_as_concept_id IS NOT NULL
        """
        for row in self._exec(query):
            code = row['o_num']
            unmapped = False
            if code == 0:
                code = row['o_orig']
                unmapped = True
            id_row = f'p{row["id_row"]}'
            d_id = row['o_id']
            name = "unknown" if row['o_name'] is None else row['o_name']
            vocab = row['o_vocab']
            group = "Observation" if row['o_domain'] is None else row['o_domain']
            desc = f'{name} ({vocab} {code})'
            self.add_dict(dict, new_dict_entries, group, vocab, d_id, name, desc, code, unmapped)
            event = self.create_event(group, f'{vocab}{d_id}', id_row, True, "C", str(row['o_val_concept_name']))
            #event['time'] = self.to_time(row['o_date'])
            obj['events'].append(event)

    def get_observations_string_valued(self, pid, obj, dict, new_dict_entries):
        query = f"""SELECT
            o.observation_id as id_row,
            o.observation_date as o_date,
            o.observation_concept_id as o_id,
            o.observation_source_value as o_orig,
            o.value_as_string as o_val_string,
            c.domain_id as o_domain,
            c.concept_name as o_name,
            c.vocabulary_id as o_vocab,
            c.concept_code as o_num
           FROM
            {self.catalog}.{self.schema}.observation as o
           LEFT JOIN {self.catalog}.{self.schema}.concept as c ON (
            c.concept_id = o.observation_concept_id
           )
           WHERE
            o.person_id = '{pid}'
            AND o.value_as_string IS NOT NULL
        """
        for row in self._exec(query):
            code = row['o_num']
            unmapped = False
            if code == 0:
                code = row['o_orig']
                unmapped = True
            id_row = f'p{row["id_row"]}'
            d_id = row['o_id']
            name = "unknown" if row['o_name'] is None else row['o_name']
            vocab = row['o_vocab']
            group = "Observation" if row['o_domain'] is None else row['o_domain']
            desc = f'{name} ({vocab} {code})'
            self.add_dict(dict, new_dict_entries, group, vocab, d_id, name, desc, code, unmapped)
            event = self.create_event(group, f'{vocab}{d_id}', id_row, True, "S", row['o_val_string'])
            #event['time'] = self.to_time(row['o_date'])
            obj['events'].append(event)

    def get_observations_number_valued(self, pid, obj, dict, new_dict_entries):
        query = f"""SELECT
            o.observation_id as id_row,
            o.observation_date as o_date,
            o.observation_concept_id as o_id,
            o.observation_source_value as o_orig,
            o.value_as_number as o_val_number,
            c.domain_id as o_domain,
            c.concept_name as o_name,
            c.vocabulary_id as o_vocab,
            c.concept_code as o_num
           FROM
            {self.catalog}.{self.schema}.observation as o
           LEFT JOIN {self.catalog}.{self.schema}.concept as c ON (
            c.concept_id = o.observation_concept_id
           )
           WHERE
            o.person_id = '{pid}'
            AND o.value_as_number IS NOT NULL
        """
        for row in self._exec(query):
            code = row['o_num']
            unmapped = False
            if code == 0:
                code = row['o_orig']
                unmapped = True
            id_row = f'p{row["id_row"]}'
            d_id = row['o_id']
            name = "unknown" if row['o_name'] is None else row['o_name']
            vocab = row['o_vocab']
            group = "Observation" if row['o_domain'] is None else row['o_domain']
            desc = f'{name} ({vocab} {code})'
            self.add_dict(dict, new_dict_entries, group, vocab, d_id, name, desc, code, unmapped)
            event = self.create_event(group, f'{vocab}{d_id}', id_row, True, "N", str(row['o_val_number']))
            #event['time'] = self.to_time(row['o_date'])
            obj['events'].append(event)

    def get_drugs(self, pid, obj, dict, new_dict_entries):
        query = f"""SELECT
            o.drug_exposure_id as id_row,
            o.drug_exposure_start_date as date_start,
            o.drug_exposure_end_date as date_end,
            o.drug_concept_id as m_id,
            o.drug_source_value as m_orig,
            c.domain_id as m_domain,
            c.concept_name as m_name,
            c.vocabulary_id as m_vocab,
            c.concept_code as m_num
           FROM
            {self.catalog}.{self.schema}.drug_exposure as o
           LEFT JOIN {self.catalog}.{self.schema}.concept as c ON (
            c.concept_id = o.drug_concept_id
           )
           WHERE
            o.person_id = '{pid}'
        """
        for row in self._exec(query):
            code = row['m_num']
            unmapped = False
            if code == 0:
                code = row['m_orig']
                unmapped = True
            id_row = f'm{row["id_row"]}'
            d_id = row['m_id']
            name = row['m_name']
            vocab = row['m_vocab']
            group = "Drug" if row['m_domain'] is None else row['m_domain']
            desc = f'{name} ({vocab} {code})'
            self.add_dict(dict, new_dict_entries, group, vocab, d_id, name, desc, code, unmapped)
            #date_start = self.to_time(row['date_start'])
            #date_end = self.to_time(row['date_end']) if row['date_end'] else date_start
            #date_cur = date_start
            #while date_cur <= date_end:
            event = self.create_event(group, f'{vocab}{d_id}', id_row)
                #event['time'] = date_cur
            obj['events'].append(event)
                #date_cur = date_cur + timedelta(days=1)

    def get_measurements(self, pid, obj, dict, new_dict_entries):
        query = f"""SELECT
            o.measurement_id as id_row,
            o.measurement_date as m_date,
            o.measurement_concept_id as m_id,
            o.measurement_source_value as m_orig,
            o.value_source_value as m_orig_value,
            o.value_as_number as m_value,
            o.range_low as m_low,
            o.range_high as m_high,
            c.domain_id as m_domain,
            c.concept_name as m_name,
            c.vocabulary_id as m_vocab,
            c.concept_code as m_num
           FROM
            {self.catalog}.{self.schema}.measurement as o
           LEFT JOIN {self.catalog}.{self.schema}.concept as c ON (
            c.concept_id = o.measurement_concept_id
           )
           WHERE
            o.person_id = '{pid}'
        """
        for row in self._exec(query):
            code = row['m_num']
            unmapped = False
            if code == 0:
                code = row['m_orig']
                unmapped = True
            id_row = f'l{row["id_row"]}'
            d_id = row['m_id']
            name = row['m_name']
            vocab = row['m_vocab']
            group = "Measurement" if row['m_domain'] is None else row['m_domain']
            lab_value = row['m_value']
            lab_low = row['m_low']
            lab_high = row['m_high']

            lab_value = float(lab_value) if lab_value is not None and isinstance(lab_value, (int, float, Decimal)) else float('-inf')
            lab_low = float(lab_low) if lab_low is not None and isinstance(lab_low, (int, float, Decimal)) else float('-inf')
            lab_high = float(lab_high) if lab_high is not None and isinstance(lab_high, (int, float, Decimal)) else float('inf')

            lab_flag = ""
            if lab_value is not None and lab_value != float('-inf'):
                if lab_value <= lab_low:
                    lab_flag = "L"
                elif lab_value >= lab_high:
                    lab_flag = "H"
            else:
                lab_value = "n/a"

            desc = f'{name} ({vocab} {code})'
            self.add_dict(dict, new_dict_entries, group, vocab, d_id, name, desc, code, unmapped)
            event = self.create_event(group, f'{vocab}{d_id}', id_row, True, lab_flag, str(lab_value))
            #event['time'] = self.to_time(row['m_date'])
            obj['events'].append(event)

    def get_visits(self, pid, obj):
        classes = obj["classes"]
        if not classes:
            return
        query = f"""
            SELECT v.visit_start_date as date_start, v.visit_end_date as date_end, c.concept_name as c_name
            FROM {self.catalog}.{self.schema}.visit_occurrence as v
            LEFT JOIN {self.catalog}.{self.schema}.concept as c ON (v.visit_concept_id = c.concept_id)
            WHERE v.person_id = '{pid}' AND c.concept_name IN ({','.join([f"'{k}'" for k in classes.keys()])})
        """
        v_spans = obj["v_spans"]
        for row in self._exec(query):
            visit_name = str(row['c_name'])
            #date_start_dt = self.to_time(row['date_start'])
            #date_end_dt = self.to_time(row['date_end'])

            # if date_start_dt is None:
            #     continue

            v_spans.append({"class": visit_name}) #"from": self.format_time(date_start_dt), "to": self.format_time(date_end_dt)})

    def get_patient(self, pid, dictionary, line_file, class_file):
        obj = {
            "info": [],
            "events": [],
            "h_bars": [],
            "v_bars": ["auto"],
            "v_spans": [],
            "classes": {}
        }
        new_dict_entries = set()
        self.get_info(pid, obj)
        self.add_info(obj, "pid", "Patient", pid)
        self.get_diagnoses(pid, obj, dictionary, new_dict_entries)
        self.get_observations_concept_valued(pid, obj, dictionary, new_dict_entries)
        self.get_observations_string_valued(pid, obj, dictionary, new_dict_entries)
        self.get_observations_number_valued(pid, obj, dictionary, new_dict_entries)
        self.get_procedures(pid, obj, dictionary, new_dict_entries)
        self.get_drugs(pid, obj, dictionary, new_dict_entries)
        self.get_measurements(pid, obj, dictionary, new_dict_entries)
        self.get_visits(pid, obj)
        # min_time = float('inf')
        # max_time = float('-inf')
        # for e in obj["events"]:
        #     time = e["time"]
        #     if time < min_time:
        #         min_time = time
        #     if time > max_time:
        #         max_time = time
        # obj["start"] = min_time
        # obj["end"] = max_time
        self.add_info(obj, "event_count", "Events", len(obj["events"]))
        self.update_hierarchies(dictionary, new_dict_entries)
        return obj

def generate_patient_files(catalog, schema, batch_size=10):
    settings = {
        'catalog': catalog,
        'schema': schema,
        'omop_use_alt_hierarchies': False,
        'use_cache': True,  # This line sets use_cache to False
        'ccs_diag': '/dbfs/omop-tables/531/ccs_diag.csv',
        'ccs_proc': '/dbfs/omop-tables/531/ccs_proc.csv',
    }

    omop = OMOP(settings['catalog'], settings['schema'], True)

    patients = set()
    patients_path = '/dbfs/omop-tables/531/patients.txt'
    if os.path.isfile(patients_path):
        with open(patients_path, 'r') as pf:
            patients = set(pf.read().splitlines())
    else:
        logger.info(f"File {patients_path} does not exist. Starting with an empty patients set.")

    omop.list_patients(patients, prefix="json/")
    save_patients(patients)
    use_cache = settings.get('use_cache', True)

    dictionary = {}
    dictionary_path = '/dbfs/omop-tables/531/json/dictionary.json'
    if os.path.isfile(dictionary_path):
        with open(dictionary_path, 'r') as df:
            dictionary = json.load(df)

    # Ensure the 'json' directory exists
    json_dir = '/dbfs/omop-tables/531/json'
    if not os.path.exists(json_dir):
        os.makedirs(json_dir)
        logger.info(f"Created directory: {json_dir}")
    else:
        logger.info(f"Directory already exists: {json_dir}")

    patient_list = list(patients)
    total_patients = len(patient_list)
    logger.info(f"Total patients to process: {total_patients}")

    def process_patient(patient):
        pid = patient.split('/')[-1].replace('.json', '')
        cache_file = os.path.join(json_dir, f"{pid}.json")
        
        if not os.path.isfile(cache_file) or not use_cache:
            patient_data = omop.get_patient(pid, dictionary, None, None)
            with open(cache_file, 'w') as cf:
                json.dump(patient_data, cf)
                logger.info(f"Created JSON file for patient {pid}: {cache_file}")
        
        return dictionary

    with ThreadPoolExecutor(max_workers=4) as executor:
        future_to_patient = {executor.submit(process_patient, patient): patient for patient in patient_list}
        for future in as_completed(future_to_patient):
            patient = future_to_patient[future]
            try:
                result = future.result()
                dictionary.update(result)
            except Exception as exc:
                logger.error(f'{patient} generated an exception: {exc}')

    with open(dictionary_path, 'w') as df:
        json.dump(dictionary, df)


    with ThreadPoolExecutor(max_workers=4) as executor:
        future_to_patient = {executor.submit(process_patient, patient): patient for patient in patient_list}
        for future in as_completed(future_to_patient):
            patient = future_to_patient[future]
            try:
                result = future.result()
                dictionary.update(result)
            except Exception as exc:
                logger.error(f'{patient} generated an exception: {exc}')

    with open(dictionary_path, 'w') as df:
        json.dump(dictionary, df)

def save_patients(patients):
    patients_path = '/dbfs/omop-tables/531/patients.txt'
    # Ensure the directory exists
    os.makedirs(os.path.dirname(patients_path), exist_ok=True)
    with open(patients_path, 'w') as pf:
        pf.write('\n'.join(sorted(list(patients))))
        pf.flush()

if __name__ == '__main__':
    catalog = 'ancillary_projects'
    schema = 'cdm_synthea10'
    generate_patient_files(catalog, schema, batch_size=10)


INFO:__main__:Directory already exists: /dbfs/omop-tables/531/json
INFO:__main__:Total patients to process: 11535
INFO:__main__:Created JSON file for patient 10902: /dbfs/omop-tables/531/json/10902.json


ient 3101: /dbfs/omop-tables/531/json/3101.json
INFO:__main__:Created JSON file for patient 2442: /dbfs/omop-tables/531/json/2442.json
INFO:__main__:Created JSON file for patient 2506: /dbfs/omop-tables/531/json/2506.json
INFO:__main__:Created JSON file for patient 4588: /dbfs/omop-tables/531/json/4588.json
INFO:__main__:Created JSON file for patient 6960: /dbfs/omop-tables/531/json/6960.json
INFO:__main__:Created JSON file for patient 862: /dbfs/omop-tables/531/json/862.json
INFO:__main__:Created JSON file for patient 3407: /dbfs/omop-tables/531/json/3407.json
INFO:__main__:Created JSON file for patient 9691: /dbfs/omop-tables/531/json/9691.json
INFO:__main__:Created JSON file for patient 1643: /dbfs/omop-tables/531/json/1643.json
INFO:__main__:Created JSON file for patient 4877: /db

In [0]:
contents = dbutils.fs.head("dbfs:/omop-tables/531/json/4871.json")

[Truncated to first 65536 bytes]


In [0]:
display(contents)

'{"info": [{"id": "id_alt", "name": "ID", "value": "6b433a25-fd39-83f1-d0c6-f00d7779c915.json"}, {"id": "born", "name": "Born", "value": 1979}, {"id": "gender", "name": "Gender", "value": "F", "label": "U"}, {"id": "pid", "name": "Patient", "value": "4871"}, {"id": "event_count", "name": "Events", "value": 2725}], "events": [{"id": "SNOMED43530622", "group": "Condition", "row_id": "c43156", "time": 1391558400}, {"id": "SNOMED75053", "group": "Condition", "row_id": "c43161", "time": 1414022400}, {"id": "SNOMED75053", "group": "Condition", "row_id": "c43161", "time": 1414108800}, {"id": "SNOMED75053", "group": "Condition", "row_id": "c43161", "time": 1414195200}, {"id": "SNOMED75053", "group": "Condition", "row_id": "c43161", "time": 1414281600}, {"id": "SNOMED75053", "group": "Condition", "row_id": "c43161", "time": 1414368000}, {"id": "SNOMED75053", "group": "Condition", "row_id": "c43161", "time": 1414454400}, {"id": "SNOMED75053", "group": "Condition", "row_id": "c43161", "time": 141