In [1]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
from datetime import datetime
import pandas as pd

In [2]:
concept = pd.read_csv('/workspaces/synthea_dw/omop/seeds/CONCEPT.csv', delimiter='\t', low_memory=False)

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
pd.set_option('display.expand_frame_repr', False)

In [3]:
def find_concept_id(
        concept, concept_codes=None, 
        concept_names=None, vocabulary_ids=None, 
        domain_ids=None, concept_class_ids=None, 
        invalid_reason=False, standard_concept=None
    ):
    query_components = []

    if concept_codes:
        query_components.append(f"concept_code in @concept_codes")
    if concept_names:
        query_components.append(f"concept_name in @concept_names")
    if vocabulary_ids:
        query_components.append(f"vocabulary_id in @vocabulary_ids")
    if not invalid_reason:
        query_components.append(f"invalid_reason.isnull()")
    if standard_concept:
        query_components.append(f"standard_concept == @standard_concept")
    if domain_ids:
        query_components.append(f"domain_id in @domain_ids")
    if concept_class_ids:
        query_components.append(f"concept_class_id in @concept_class_ids")

    query = " and ".join(query_components)
    
    concept_rows = concept.query(query)['concept_id'] if query else concept['concept_id']
    
    return int(concept_rows.iloc[0]) if not concept_rows.empty else 0

In [4]:
def person(filepaths, concept, max_workers=10):
    person_rows = []

    def process_line(line):
        data = json.loads(line)

        # Extracting the year, month, and day of birth
        birth_year = int(data['birthDate'].split('-')[0])
        birth_month = int(data['birthDate'].split('-')[1])
        birth_day = int(data['birthDate'].split('-')[2])
        birth_datetime = datetime.fromisoformat(data['birthDate']).strftime('%Y-%m-%d %H:%M:%S')

        race_code, ethnicity_code, gender_source_value = None, None, None
        for ext in data.get('extension', []):
            if ext.get('url') == 'http://hl7.org/fhir/us/core/StructureDefinition/us-core-race':
                race_code = ext['valueCoding']['display'] if 'valueCoding' in ext else None
            elif ext.get('url') == 'http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity':
                ethnicity_code = ext['valueCoding']['display'] if 'valueCoding' in ext else None
            elif ext.get('url') == 'http://hl7.org/fhir/us/core/StructureDefinition/us-core-birthsex':
                gender_source_value = ext.get('valueCode')

        race_concept_id = find_concept_id(concept, concept_names=[race_code], vocabulary_ids=['Race'])
        ethnicity_concept_id = find_concept_id(concept, concept_names=[ethnicity_code], vocabulary_ids=['Ethnicity'])
        gender_concept_id = find_concept_id(concept, concept_codes=[gender_source_value], vocabulary_ids=['Gender'])

        return {
            'person_id': data['id'],
            'gender_concept_id': gender_concept_id,
            'year_of_birth': birth_year,
            'month_of_birth': birth_month,
            'day_of_birth': birth_day,
            'birth_datetime': birth_datetime,
            'race_concept_id': race_concept_id,
            'ethnicity_concept_id': ethnicity_concept_id,
            'location_id': data['id'],
            'provider_id': pd.NA,
            'care_site_id': pd.NA,
            'person_source_value': data['id'],
            'gender_source_value': gender_source_value,
            'gender_source_concept_id': gender_concept_id,
            'race_source_value': race_code,
            'race_source_concept_id': race_concept_id,
            'ethnicity_source_value': ethnicity_code,
            'ethnicity_source_concept_id': ethnicity_concept_id
        }

    for filepath in filepaths:
        with open(filepath, 'r') as file:
            lines = file.readlines()

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_line = {executor.submit(process_line, line): line for line in lines}

            for future in as_completed(future_to_line):
                result = future.result()
                if result:
                    person_rows.append(result)

    person = pd.DataFrame(person_rows).drop_duplicates()

    return person

filepaths = ['/workspaces/synthea_dw/data/fhir/Patient.ndjson']
person_df = person(filepaths, concept)
person_df.sample(5)

Unnamed: 0,person_id,gender_concept_id,year_of_birth,month_of_birth,day_of_birth,birth_datetime,race_concept_id,ethnicity_concept_id,location_id,provider_id,care_site_id,person_source_value,gender_source_value,gender_source_concept_id,race_source_value,race_source_concept_id,ethnicity_source_value,ethnicity_source_concept_id
1,7a82833f-fae1-d69a-2cbf-69279dac746f,8532,1967,5,20,1967-05-20 00:00:00,0,0,7a82833f-fae1-d69a-2cbf-69279dac746f,,,7a82833f-fae1-d69a-2cbf-69279dac746f,F,8532,,0,,0
4,408a95f4-02aa-3003-2f09-0241ac3343fb,8532,1958,1,8,1958-01-08 00:00:00,0,0,408a95f4-02aa-3003-2f09-0241ac3343fb,,,408a95f4-02aa-3003-2f09-0241ac3343fb,F,8532,,0,,0
9,7a7b7fba-a005-3736-91ef-218a0d2824c5,8507,1969,11,16,1969-11-16 00:00:00,0,0,7a7b7fba-a005-3736-91ef-218a0d2824c5,,,7a7b7fba-a005-3736-91ef-218a0d2824c5,M,8507,,0,,0
7,c95d085d-2249-b616-7668-88cc9a0c11bd,8532,1958,8,17,1958-08-17 00:00:00,0,0,c95d085d-2249-b616-7668-88cc9a0c11bd,,,c95d085d-2249-b616-7668-88cc9a0c11bd,F,8532,,0,,0
6,4390395b-5a78-2005-80b7-5ebd62b595c9,8507,1944,8,2,1944-08-02 00:00:00,0,0,4390395b-5a78-2005-80b7-5ebd62b595c9,,,4390395b-5a78-2005-80b7-5ebd62b595c9,M,8507,,0,,0


In [None]:
def visit_occurrence(filepaths, concept, max_workers=10):
    visit_occurrences = []

    def process_line(line):
        data = json.loads(line)
        resource_type = data['resourceType']
        person_id = data['subject']['reference'].split('/')[-1]

        if resource_type == 'CareTeam':
            provider_id, care_site_id = None, None
            for participant in data.get('participant', []):
                for role in participant.get('role', []):
                    for coding in role.get('coding', []):
                        if coding.get('code') == '116154003':
                            person_id = participant['member']['reference'].split('/')[-1]
                        elif coding.get('code') == '223366009': 
                            provider_id = participant['member']['reference'].split('/')[-1]
                        elif coding.get('code') == '224891009': 
                            care_site_id = participant['member']['reference'].split('/')[-1]

            visit_occurrence = {
                'visit_occurrence_id': data['encounter']['reference'].split('/')[-1],
                'person_id': person_id,
                'visit_concept_id': 9201,
                'visit_start_date': datetime.strptime(data['period']['start'].split('T')[0], '%Y-%m-%d').date(),
                'visit_start_datetime': datetime.fromisoformat(data['period']['start']),
                'visit_end_date': datetime.strptime(data['period']['end'].split('T')[0], '%Y-%m-%d').date() if 'end' in data['period'] else None,
                'visit_end_datetime': datetime.fromisoformat(data['period']['end']) if 'end' in data['period'] else None,
                'visit_type_concept_id': 32817,
                'provider_id': provider_id,
                'care_site_id': care_site_id,
                'visit_source_value': 'IP',
                'visit_source_concept_id': 9201,
                'admitted_from_concept_id': pd.NA,
                'admitted_from_source_value': None,
                'discharged_to_concept_id': pd.NA,
                'discharged_to_source_value': None,
                'preceding_visit_occurrence_id': pd.NA
            }

        elif resource_type == 'Encounter':
            visit_class_code = data['class']['code'] if 'class' in data else None
            visit_concept_id_map = {'IMP': 9201, 'EMER': 9203, 'AMB': 9202}
            visit_concept_id = visit_concept_id_map.get(visit_class_code, None)
            provider_id = data['participant'][0]['individual']['reference'].split('/')[-1] if 'participant' in data and 'individual' in data['participant'][0] else None
            care_site_id = data['serviceProvider']['reference'].split('/')[-1] if 'serviceProvider' in data else None

            visit_occurrence = {
                'visit_occurrence_id': data['id'],
                'person_id': person_id,
                'visit_concept_id': visit_concept_id,
                'visit_start_date': datetime.strptime(data['period']['start'].split('T')[0], '%Y-%m-%d').date(),
                'visit_start_datetime': datetime.fromisoformat(data['period']['start']),
                'visit_end_date': datetime.strptime(data['period']['end'].split('T')[0], '%Y-%m-%d').date() if 'end' in data['period'] else None,
                'visit_end_datetime': datetime.fromisoformat(data['period']['end']) if 'end' in data['period'] else None,
                'visit_type_concept_id': 32817,
                'provider_id': provider_id,
                'care_site_id': care_site_id,
                'visit_source_value': visit_class_code,
                'visit_source_concept_id': visit_concept_id,
                'admitted_from_concept_id': pd.NA,
                'admitted_from_source_value': None,
                'discharged_to_concept_id': pd.NA,
                'discharged_to_source_value': None,
                'preceding_visit_occurrence_id': pd.NA
            }

        return visit_occurrence

    for filepath in filepaths:
        with open(filepath, 'r') as file:
            lines = file.readlines()

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_line = {executor.submit(process_line, line): line for line in lines}

            for future in as_completed(future_to_line):
                visit_occurrences.append(future.result())
    
    visit_occurrence = pd.DataFrame(visit_occurrences).drop_duplicates()

    return visit_occurrence

filepaths = ['/workspaces/synthea_dw/data/fhir/CareTeam.ndjson', '/workspaces/synthea_dw/data/fhir/Encounter.ndjson']
visit_occurrence_df = visit_occurrence(filepaths, concept)
visit_occurrence_df.sample(5)

Unnamed: 0,visit_occurrence_id,person_id,visit_concept_id,visit_start_date,visit_start_datetime,visit_end_date,visit_end_datetime,visit_type_concept_id,provider_id,care_site_id,visit_source_value,visit_source_concept_id,admitted_from_concept_id,admitted_from_source_value,discharged_to_concept_id,discharged_to_source_value,preceding_visit_occurrence_id
798,0e87aa43-3404-0c15-d9ab-b5df454e400e,7a7b7fba-a005-3736-91ef-218a0d2824c5,9202.0,2019-04-13,2019-04-13 05:06:03+00:00,2019-04-13,2019-04-13 08:13:03+00:00,32817,299bc447-29c4-3c98-948b-ea0891c97d89,497f39dd-280e-3d58-af5b-c5e3a3a09b10,AMB,9202.0,,,,,
573,5c01c2f4-1cb3-6ff2-961f-6378ab7ac778,7a7b7fba-a005-3736-91ef-218a0d2824c5,9202.0,2016-11-29,2016-11-29 00:29:03+00:00,2016-11-29,2016-11-29 02:54:03+00:00,32817,299bc447-29c4-3c98-948b-ea0891c97d89,497f39dd-280e-3d58-af5b-c5e3a3a09b10,AMB,9202.0,,,,,
921,ec57fabd-e567-c6b8-ab2f-5c2482e206d1,7a7b7fba-a005-3736-91ef-218a0d2824c5,9202.0,2020-02-01,2020-02-01 20:17:03+00:00,2020-02-01,2020-02-01 20:32:03+00:00,32817,299bc447-29c4-3c98-948b-ea0891c97d89,497f39dd-280e-3d58-af5b-c5e3a3a09b10,AMB,9202.0,,,,,
796,9c9b717b-42fc-f67c-a898-9417e2a96c71,7a7b7fba-a005-3736-91ef-218a0d2824c5,9202.0,2018-11-24,2018-11-24 17:40:03+00:00,2018-11-24,2018-11-24 20:47:03+00:00,32817,299bc447-29c4-3c98-948b-ea0891c97d89,497f39dd-280e-3d58-af5b-c5e3a3a09b10,AMB,9202.0,,,,,
762,3bfa4723-7b69-853f-9e1e-18408050bc81,7a7b7fba-a005-3736-91ef-218a0d2824c5,9202.0,2018-11-02,2018-11-02 16:52:03+00:00,2018-11-02,2018-11-02 18:53:03+00:00,32817,299bc447-29c4-3c98-948b-ea0891c97d89,497f39dd-280e-3d58-af5b-c5e3a3a09b10,AMB,9202.0,,,,,


In [None]:
def condition_occurrence(filepaths, concept, max_workers=10):
    condition_occurrences = []

    def process_line(line):
        data = json.loads(line)
        resource_type = data['resourceType']
        conditions = []

        if resource_type == 'AllergyIntolerance':
            if data['code']['coding'][0]['code'] == '419199007':
                return []

            for reaction in data.get('reaction', []):
                for manifestation in reaction.get('manifestation', []):
                    conditions.append({
                        'condition_occurrence_id': data['id'],
                        'person_id': data['patient']['reference'].split('/')[-1],
                        'condition_concept_id': find_concept_id(
                            concept, 
                            concept_codes=[manifestation['coding'][0]['code']], 
                            vocabulary_ids=['SNOMED'], 
                            domain_ids=['Condition'], 
                            invalid_reason=False, 
                            standard_concept='S'
                        ),
                        'condition_start_date': datetime.strptime(data['recordedDate'].split('T')[0], '%Y-%m-%d').date(),
                        'condition_start_datetime': datetime.fromisoformat(data['recordedDate']),
                        'condition_end_date': None,
                        'condition_end_datetime': None,
                        'condition_type_concept_id': 32817,
                        'condition_status_concept_id': pd.NA,
                        'stop_reason': None,
                        'provider_id': pd.NA,
                        'visit_occurrence_id': pd.NA,
                        'visit_detail_id': pd.NA,
                        'condition_source_concept_id': find_concept_id(
                            concept, 
                            concept_codes=[manifestation['coding'][0]['code']], 
                            vocabulary_ids=['SNOMED'], 
                            domain_ids=['Condition'], 
                            invalid_reason=True
                        ),
                        'condition_status_source_value': manifestation['coding'][0]['code']
                    })

        elif resource_type == 'CareTeam':
            if 'reasonCode' in data:
                person_id = None
                provider_id = None
                for participant in data.get('participant', []):
                    for role in participant.get('role', []):
                        for coding in role.get('coding', []):
                            if coding.get('code') == '116154003':
                                person_id = participant['member']['reference'].split('/')[-1]
                            elif coding.get('code') == '223366009':
                                provider_id = participant['member']['reference'].split('/')[-1]

                if person_id:
                    for reasonCode in data['reasonCode']:
                        for coding in reasonCode.get('coding', []):
                            condition_occurrence = {
                                'condition_occurrence_id': data['id'],
                                'person_id': person_id,
                                'condition_concept_id': find_concept_id(
                                    concept, 
                                    concept_codes=[coding['code']], 
                                    vocabulary_ids=['SNOMED'], 
                                    domain_ids=['Condition'],
                                    invalid_reason=False, 
                                    standard_concept='S'
                                ),
                                'condition_start_date': datetime.strptime(data['period']['start'].split('T')[0], '%Y-%m-%d').date(),
                                'condition_start_datetime': datetime.fromisoformat(data['period']['start']),
                                'condition_end_date': None,
                                'condition_end_datetime': None,
                                'condition_type_concept_id': 32817,
                                'condition_status_concept_id': pd.NA,
                                'stop_reason': None,
                                'provider_id': provider_id,
                                'visit_occurrence_id': data['encounter']['reference'].split('/')[-1],
                                'visit_detail_id': pd.NA,
                                'condition_source_value': coding['code'],
                                'condition_source_concept_id': find_concept_id(
                                    concept, 
                                    concept_codes=[coding['code']], 
                                    vocabulary_ids=['SNOMED'], 
                                    domain_ids=['Condition']
                                ),
                                'condition_status_source_value': None
                            }
                            conditions.append(condition_occurrence)

        elif resource_type == 'Claim':
            if any(coding['code'] in ['professional', 'institutional'] for coding in data['type']['coding']):
                for diagnosis in data.get('diagnosis', []):
                    condition_ref = diagnosis['diagnosisReference']['reference']
                    condition_id = condition_ref.split('/')[-1]

                    for item in data.get('item', []):
                        if 'productOrService' in item and 'coding' in item['productOrService']:
                            for coding in item['productOrService']['coding']:
                                condition_occurrence = {
                                    'condition_occurrence_id': condition_id,
                                    'person_id': data['patient']['reference'].split('/')[-1],
                                    'condition_concept_id': find_concept_id(
                                        concept, 
                                        concept_codes=[coding['code']], 
                                        vocabulary_ids=['SNOMED'], 
                                        domain_ids=['Condition'], 
                                        invalid_reason=False, 
                                        standard_concept='S'
                                    ),
                                    'condition_start_date': datetime.strptime(data['billablePeriod']['start'].split('T')[0], '%Y-%m-%d').date(),
                                    'condition_start_datetime': datetime.fromisoformat(data['billablePeriod']['start']),
                                    'condition_end_date': datetime.strptime(data['billablePeriod']['end'].split('T')[0], '%Y-%m-%d').date(),
                                    'condition_end_datetime': datetime.fromisoformat(data['billablePeriod']['end']),
                                    'condition_type_concept_id': 32817,
                                    'condition_status_concept_id': pd.NA,
                                    'stop_reason': None,
                                    'provider_id': pd.NA,
                                    'visit_occurrence_id': item['encounter'][0]['reference'].split('/')[-1] if 'encounter' in item else None,
                                    'visit_detail_id': pd.NA,
                                    'condition_source_concept_id': find_concept_id(
                                        concept, 
                                        concept_codes=[coding['code']], 
                                        vocabulary_ids=['SNOMED'], 
                                        domain_ids=['Condition']
                                    ),
                                    'condition_status_source_value': coding['code']
                                }
                                conditions.append(condition_occurrence)

        elif resource_type == 'Condition':
            person_id = data['subject']['reference'].split('/')[-1]
            condition_code = data['code']['coding'][0]['code']
            clinical_status_code = data['clinicalStatus']['coding'][0]['code']

            condition_status_concept_id = 37109701 if clinical_status_code == 'resolved' else 9181 if clinical_status_code == 'active' else pd.NA
            condition_abatement_datetime = datetime.fromisoformat(data['abatementDateTime']) if 'abatementDateTime' in data else None

            condition_occurrence = {
                'condition_occurrence_id': data['id'],
                'person_id': person_id,
                'condition_concept_id': find_concept_id(
                    concept, 
                    concept_codes=[condition_code], 
                    vocabulary_ids=['SNOMED'], 
                    domain_ids=['Condition'], 
                    invalid_reason=False, 
                    standard_concept='S'
                ),
                'condition_start_date': datetime.strptime(data['onsetDateTime'].split('T')[0], '%Y-%m-%d').date(),
                'condition_start_datetime': datetime.fromisoformat(data['onsetDateTime']),
                'condition_end_date': condition_abatement_datetime.date() if condition_abatement_datetime else None,
                'condition_end_datetime': condition_abatement_datetime,
                'condition_type_concept_id': 32817,
                'condition_status_concept_id': condition_status_concept_id,
                'stop_reason': None,
                'provider_id': pd.NA,
                'visit_occurrence_id': data['encounter']['reference'].split('/')[-1] if 'encounter' in data else None,
                'visit_detail_id': pd.NA,
                'condition_source_concept_id': find_concept_id(
                    concept, 
                    concept_codes=[condition_code], 
                    vocabulary_ids=['SNOMED'], 
                    domain_ids=['Condition']
                ),
                'condition_status_source_value': condition_code
            }
            conditions.append(condition_occurrence)

        return conditions

    for filepath in filepaths:
        with open(filepath, 'r') as file:
            lines = file.readlines()

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_line = {executor.submit(process_line, line): line for line in lines}
            for future in as_completed(future_to_line):
                condition_occurrences.extend(future.result())

    condition_occurrence = pd.DataFrame(condition_occurrences).drop_duplicates()
    condition_occurrence = condition_occurrence[condition_occurrence['condition_source_concept_id'] != 0]

    return condition_occurrence

filepaths = [
    '/workspaces/synthea_dw/data/fhir/AllergyIntolerance.ndjson',
    '/workspaces/synthea_dw/data/fhir/CareTeam.ndjson',
    '/workspaces/synthea_dw/data/fhir/Claim.ndjson',
    '/workspaces/synthea_dw/data/fhir/Condition.ndjson'
]
concept = concept
condition_occurrence_df = condition_occurrence(filepaths, concept)
condition_occurrence_df.sample(5)

Unnamed: 0,condition_occurrence_id,person_id,condition_concept_id,condition_start_date,condition_start_datetime,condition_end_date,condition_end_datetime,condition_type_concept_id,condition_status_concept_id,stop_reason,provider_id,visit_occurrence_id,visit_detail_id,condition_source_concept_id,condition_status_source_value,condition_source_value
7196,32c73b65-827c-fd2c-31d2-d3559df734de,7a82833f-fae1-d69a-2cbf-69279dac746f,4309238,2014-12-13,2014-12-13 02:39:06+00:00,2015-02-21,2015-02-21 02:29:28+00:00,32817,37109701.0,,,528ff2b6-d6ea-2ec2-cde4-b84ac955067e,,4309238,422650009,
2470,490f2ba1-7578-c902-3cb9-7ac8230d6064,7a82833f-fae1-d69a-2cbf-69279dac746f,4172829,2018-05-26,2018-05-26 01:46:00+00:00,2018-05-26,2018-05-26 02:25:44+00:00,32817,,,,,,4172829,423315002,
7373,8d20c824-d2c7-47a4-8d59-aab806ef4fa8,7a82833f-fae1-d69a-2cbf-69279dac746f,4251306,2020-10-10,2020-10-10 02:35:15+00:00,2020-11-21,2020-11-21 02:41:47+00:00,32817,37109701.0,,,82575e97-c41d-fd32-a16a-cefd2ef33041,,4251306,73595000,
7512,76f86b9a-68b4-f6af-b1c8-8258484c6509,7a7b7fba-a005-3736-91ef-218a0d2824c5,4309238,2016-07-24,2016-07-24 12:59:27+00:00,2017-02-26,2017-02-26 12:46:48+00:00,32817,37109701.0,,,cfd8a830-512b-39f3-54f2-c229eac60284,,4309238,422650009,
1230,1b7138a2-6d4b-b769-8f0a-825fabc06ecb,a5a02d31-a93c-7b72-7e1c-a9cbfa64874d,436940,2011-03-02,2011-03-02 08:52:48+00:00,2011-03-02,2011-03-02 09:45:56+00:00,32817,,,,,,436940,237602007,


In [None]:
def drug_exposure(filepaths, concept, max_workers=10):
    drug_exposures = []

    def process_line(line):
        data = json.loads(line)
        resource_type = data['resourceType']
        exposures = []

        if resource_type == 'Claim' and any(coding['code'] == 'pharmacy' for coding in data['type']['coding']):
            for item in data.get('item', []):
                drug_exposure = {
                    'drug_exposure_id': data['prescription']['reference'].split('/')[-1],
                    'person_id': data['patient']['reference'].split('/')[-1],
                    'drug_concept_id': find_concept_id(
                        concept, 
                        concept_codes=[item['productOrService']['coding'][0]['code']], 
                        vocabulary_ids=['RxNorm'], 
                        domain_ids=['Drug'], 
                        invalid_reason=False, 
                        standard_concept='S'
                    ),
                    'drug_exposure_start_date': datetime.strptime(data['billablePeriod']['start'].split('T')[0], '%Y-%m-%d').date(),
                    'drug_exposure_start_datetime': datetime.fromisoformat(data['billablePeriod']['start']),
                    'drug_exposure_end_date': datetime.strptime(data['billablePeriod']['end'].split('T')[0], '%Y-%m-%d').date(),
                    'drug_exposure_end_datetime': datetime.fromisoformat(data['billablePeriod']['end']),
                    'verbatim_end_date': datetime.strptime(data['billablePeriod']['end'].split('T')[0], '%Y-%m-%d').date(),
                    'drug_type_concept_id': 32817,
                    'stop_reason': None,
                    'refills': 0,
                    'quantity': None,
                    'days_supply': ((datetime.strptime(data['billablePeriod']['start'].split('T')[0], '%Y-%m-%d').date()) - (datetime.strptime(data['billablePeriod']['start'].split('T')[0], '%Y-%m-%d').date())).days or 1,
                    'sig': None,
                    'route_concept_id': None,
                    'lot_number': None,
                    'provider_id': None,
                    'visit_occurrence_id': item['encounter'][0]['reference'].split('/')[-1] if 'encounter' in item else None,
                    'visit_detail_id': None,
                    'drug_source_value': item['productOrService']['coding'][0]['code'],
                    'drug_source_concept_id': find_concept_id(
                        concept, 
                        concept_codes=[item['productOrService']['coding'][0]['code']], 
                        vocabulary_ids=['RxNorm'], 
                        domain_ids=['Drug']
                    ),
                    'route_source_value': None,
                    'dose_unit_source_value': None
                }
                exposures.append(drug_exposure)

        elif resource_type == 'Immunization':
            person_id = data['patient']['reference'].split('/')[-1]
            occurrence_date = datetime.strptime(data['occurrenceDateTime'].split('T')[0], '%Y-%m-%d').date()
            occurrence_datetime = datetime.fromisoformat(data['occurrenceDateTime'])
            visit_occurrence_id = data['encounter']['reference'].split('/')[-1] if 'encounter' in data else None

            for vaccineCode in data.get('vaccineCode', {}).get('coding', []):
                drug_exposure = {
                    'drug_exposure_id': data['id'],
                    'person_id': person_id,
                    'drug_concept_id': find_concept_id(
                        concept, 
                        concept_codes=[vaccineCode['code']], 
                        vocabulary_ids=['CVX'], 
                        domain_ids=['Drug'], 
                        invalid_reason=False, 
                        standard_concept='S'
                    ),
                    'drug_exposure_start_date': occurrence_date,
                    'drug_exposure_start_datetime': occurrence_datetime,
                    'drug_exposure_end_date': occurrence_date,
                    'drug_exposure_end_datetime': occurrence_datetime,
                    'verbatim_end_date': occurrence_date,
                    'drug_type_concept_id': 32817,
                    'stop_reason': 'completed' if data['status'] == 'completed' else None,
                    'refills': 0,
                    'quantity': 1,
                    'days_supply': 1,
                    'sig': None,
                    'route_concept_id': None,
                    'lot_number': None,
                    'provider_id': None,
                    'visit_occurrence_id': visit_occurrence_id,
                    'visit_detail_id': None,
                    'drug_source_value': vaccineCode['code'],
                    'drug_source_concept_id': find_concept_id(
                        concept, 
                        concept_codes=[vaccineCode['code']], 
                        vocabulary_ids=['CVX'], 
                        domain_ids=['Drug']
                    ),
                    'route_source_value': None,
                    'dose_unit_source_value': None
                }
                exposures.append(drug_exposure)

        elif resource_type == 'MedicationAdministration':
            person_id = data['subject']['reference'].split('/')[-1]
            visit_occurrence_id = data['context']['reference'].split('/')[-1] if 'context' in data else None
            effective_date = datetime.strptime(data['effectiveDateTime'].split('T')[0], '%Y-%m-%d').date()

            for coding in data['medicationCodeableConcept']['coding']:
                drug_exposure = {
                    'drug_exposure_id': data['id'],
                    'person_id': person_id,
                    'drug_concept_id': find_concept_id(
                        concept, 
                        concept_codes=[coding['code']], 
                        vocabulary_ids=['RxNorm'], 
                        domain_ids=['Drug'], 
                        invalid_reason=False, 
                        standard_concept='S'
                    ),
                    'drug_exposure_start_date': effective_date,
                    'drug_exposure_start_datetime': datetime.fromisoformat(data['effectiveDateTime']),
                    'drug_exposure_end_date': effective_date,
                    'drug_exposure_end_datetime': datetime.fromisoformat(data['effectiveDateTime']),
                    'verbatim_end_date': effective_date,
                    'drug_type_concept_id': 32817,
                    'stop_reason': 'completed' if data['status'] == 'completed' else None,
                    'refills': 0,
                    'quantity': 1,
                    'days_supply': 1,
                    'sig': None,
                    'route_concept_id': None,
                    'lot_number': None,
                    'provider_id': None,
                    'visit_occurrence_id': visit_occurrence_id,
                    'visit_detail_id': None,
                    'drug_source_value': coding['code'],
                    'drug_source_concept_id': find_concept_id(
                        concept, 
                        concept_codes=[coding['code']], 
                        vocabulary_ids=['RxNorm'], 
                        domain_ids=['Drug']
                    ),
                    'route_source_value': None,
                    'dose_unit_source_value': None
                }
                exposures.append(drug_exposure)

        elif resource_type == 'MedicationRequest':
            if 'medicationCodeableConcept' in data and 'coding' in data['medicationCodeableConcept']:
                person_id = data['subject']['reference'].split('/')[-1]
                provider_id = data['requester']['reference'].split('/')[-1] if 'requester' in data else None
                visit_occurrence_id = data['encounter']['reference'].split('/')[-1] if 'encounter' in data else None
                authored_date = datetime.strptime(data['authoredOn'].split('T')[0], '%Y-%m-%d').date()

                for coding in data['medicationCodeableConcept']['coding']:
                    drug_exposure = {
                        'drug_exposure_id': data['id'],
                        'person_id': person_id,
                        'drug_concept_id': find_concept_id(
                            concept, 
                            concept_codes=[coding['code']], 
                            vocabulary_ids=['RxNorm'], 
                            domain_ids=['Drug'], 
                            invalid_reason=False, 
                            standard_concept='S'
                        ),
                        'drug_exposure_start_date': authored_date,
                        'drug_exposure_start_datetime': datetime.fromisoformat(data['authoredOn']),
                        'drug_exposure_end_date': pd.NA,
                        'drug_exposure_end_datetime': pd.NA,
                        'verbatim_end_date': pd.NA,
                        'drug_type_concept_id': 32817,
                        'stop_reason': 'stopped' if data['status'] == 'stopped' else None,
                        'refills': pd.NA,
                        'quantity': pd.NA,
                        'days_supply': pd.NA,
                        'sig': None,
                        'route_concept_id': None,
                        'lot_number': None,
                        'provider_id': provider_id,
                        'visit_occurrence_id': visit_occurrence_id,
                        'visit_detail_id': None,
                        'drug_source_value': coding['code'],
                        'drug_source_concept_id': find_concept_id(
                            concept, 
                            concept_codes=[coding['code']], 
                            vocabulary_ids=['RxNorm'], 
                            domain_ids=['Drug']
                        ),
                        'route_source_value': None,
                        'dose_unit_source_value': None
                    }
                    exposures.append(drug_exposure)

        return exposures

    for filepath in filepaths:
        with open(filepath, 'r') as file:
            lines = file.readlines()

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_line = {executor.submit(process_line, line): line for line in lines}
            for future in as_completed(future_to_line):
                drug_exposures.extend(future.result())

    drug_exposure = pd.DataFrame(drug_exposures).drop_duplicates()
    drug_exposure = drug_exposure[drug_exposure['drug_source_concept_id'] != 0]

    return drug_exposure

filepaths = [
    '/workspaces/synthea_dw/data/fhir/Claim.ndjson',
    '/workspaces/synthea_dw/data/fhir/Immunization.ndjson',
    '/workspaces/synthea_dw/data/fhir/MedicationAdministration.ndjson',
    '/workspaces/synthea_dw/data/fhir/MedicationRequest.ndjson'
]
concept = concept
drug_exposure_df = drug_exposure(filepaths, concept)
drug_exposure_df.sample(5)


Unnamed: 0,drug_exposure_id,person_id,drug_concept_id,drug_exposure_start_date,drug_exposure_start_datetime,drug_exposure_end_date,drug_exposure_end_datetime,verbatim_end_date,drug_type_concept_id,stop_reason,refills,quantity,days_supply,sig,route_concept_id,lot_number,provider_id,visit_occurrence_id,visit_detail_id,drug_source_value,drug_source_concept_id,route_source_value,dose_unit_source_value
1745,adb6f4af-183a-823a-904b-f81d94a612ac,7a7b7fba-a005-3736-91ef-218a0d2824c5,19041324,2018-01-21,2018-01-21 12:15:03+00:00,,,,32817,stopped,,,,,,,d833f3f0-c08f-370e-b14a-1af90bc0adb3,6bfbdab2-d926-d13e-91e1-797ab9a396c0,,209387,19041324,,
634,130573f9-556b-be34-cd81-da3996f35640,7a7b7fba-a005-3736-91ef-218a0d2824c5,19080128,2017-07-23,2017-07-23 12:15:03+00:00,2017-07-23,2017-07-23 13:12:56+00:00,2017-07-23,32817,,0.0,,1.0,,,,,8a0add6a-00fa-b99f-ef92-531be4fe8f2a,,314076,19080128,,
1589,b70bf693-9244-d1cc-10a0-16ee972c6c44,7a7b7fba-a005-3736-91ef-218a0d2824c5,19009384,2014-04-13,2014-04-13 12:15:03+00:00,,,,32817,stopped,,,,,,,d833f3f0-c08f-370e-b14a-1af90bc0adb3,76349a0d-58a7-b8b1-0294-825cac2452ed,,106892,19009384,,
609,84023240-1548-2314-3b0f-ce8967be7fdb,7a7b7fba-a005-3736-91ef-218a0d2824c5,19041324,2016-11-06,2016-11-06 12:15:03+00:00,2016-11-06,2016-11-06 12:57:35+00:00,2016-11-06,32817,,0.0,,1.0,,,,,ae9d4e18-353f-8d6b-a917-765b73c63f88,,209387,19041324,,
352,f71f3770-edab-8e00-f1ba-9116390a58e8,408a95f4-02aa-3003-2f09-0241ac3343fb,40166824,2023-04-05,2023-04-05 08:52:48+00:00,2023-04-05,2023-04-05 09:30:00+00:00,2023-04-05,32817,,0.0,,1.0,,,,,f5d43eb2-aefd-1a7c-1214-c9112cd19473,,866412,40166824,,


In [None]:
def procedure_occurrence(filepaths, concept, max_workers=10):
    procedure_occurrences = []

    def process_line(line):
        data = json.loads(line)
        resource_type = data['resourceType']
        procedures = []

        if resource_type == 'CarePlan':
            for category in data.get('category', []):
                for coding in category.get('coding', []):
                    if 'display' in coding:
                        procedure = {
                            'procedure_occurrence_id': data['id'],
                            'person_id': data['subject']['reference'].split('/')[-1],
                            'procedure_concept_id': find_concept_id(
                                concept, 
                                concept_codes=[coding['code']], 
                                vocabulary_ids=['SNOMED'], 
                                domain_ids=['Procedure'], 
                                invalid_reason=False, 
                                standard_concept='S', 
                                concept_class_ids=['Procedure']
                            ),
                            'procedure_date': datetime.strptime(data['period']['start'].split('T')[0], '%Y-%m-%d').date(),
                            'procedure_datetime': datetime.fromisoformat(data['period']['start']),
                            'procedure_end_date': None,
                            'procedure_end_datetime': None,
                            'procedure_type_concept_id': 32817,
                            'modifier_concept_id': pd.NA,
                            'quantity': 1,
                            'provider_id': pd.NA,
                            'visit_occurrence_id': data['encounter']['reference'].split('/')[-1],
                            'visit_detail_id': pd.NA,
                            'procedure_source_value': coding['code'],
                            'procedure_source_concept_id': find_concept_id(
                                concept, 
                                concept_codes=[coding['code']], 
                                vocabulary_ids=['SNOMED'], 
                                domain_ids=['Procedure'], 
                                concept_class_ids=['Procedure']
                            ),
                            'modifier_source_value': None
                        }
                        procedures.append(procedure)

        elif resource_type == 'ImagingStudy':
            person_id = data['subject']['reference'].split('/')[-1]
            procedure_date = datetime.strptime(data['started'].split('T')[0], '%Y-%m-%d').date()
            procedure_datetime = datetime.fromisoformat(data['started'])
            visit_occurrence_id = data['encounter']['reference'].split('/')[-1] if 'encounter' in data else None
            numberOfInstances = data['numberOfInstances'] if 'numberOfInstances' in data else None

            for procedureCode in data.get('procedureCode', []):
                for coding in procedureCode.get('coding', []):
                    modifier_code = data['series'][0]['bodySite']['code'] if 'series' in data and 'bodySite' in data['series'][0] else None

                    procedure_occurrence = {
                        'procedure_occurrence_id': data['id'],
                        'person_id': person_id,
                        'procedure_concept_id': find_concept_id(
                            concept, 
                            concept_codes=[coding['code']], 
                            vocabulary_ids=['SNOMED'], 
                            domain_ids=['Procedure'], 
                            invalid_reason=False, 
                            standard_concept='S'
                        ),
                        'procedure_date': procedure_date,
                        'procedure_datetime': procedure_datetime,
                        'procedure_end_date': None,
                        'procedure_end_datetime': None,
                        'procedure_type_concept_id': 32817,
                        'modifier_concept_id': find_concept_id(
                            concept, 
                            concept_codes=[modifier_code], 
                            vocabulary_ids=['SNOMED'], 
                            domain_ids=['Spec Anatomic Site'], 
                            invalid_reason=False, 
                            standard_concept='S'
                        ),
                        'quantity': numberOfInstances,
                        'provider_id': pd.NA,
                        'visit_occurrence_id': visit_occurrence_id,
                        'visit_detail_id': pd.NA,
                        'procedure_source_value': coding['display'] if 'display' in coding else None,
                        'procedure_source_concept_id': find_concept_id(
                            concept, 
                            concept_codes=[coding['code']], 
                            vocabulary_ids=['SNOMED'], 
                            domain_ids=['Procedure'], 
                            invalid_reason=False, 
                            standard_concept='S'
                        ),
                        'modifier_source_value': modifier_code
                    }
                    procedures.append(procedure_occurrence)

        elif resource_type == 'Procedure':
            person_id = data['subject']['reference'].split('/')[-1]
            procedure_code = data['code']['coding'][0]['code']
            procedure_date = datetime.strptime(data['performedPeriod']['start'].split('T')[0], '%Y-%m-%d').date()
            procedure_datetime = datetime.fromisoformat(data['performedPeriod']['start'])
            procedure_end_date = datetime.strptime(data['performedPeriod']['end'].split('T')[0], '%Y-%m-%d').date() if 'end' in data['performedPeriod'] else None
            procedure_end_datetime = datetime.fromisoformat(data['performedPeriod']['end']) if 'end' in data['performedPeriod'] else None
            visit_occurrence_id = data['encounter']['reference'].split('/')[-1] if 'encounter' in data else None

            procedure_occurrence = {
                'procedure_occurrence_id': data['id'],
                'person_id': person_id,
                'procedure_concept_id': find_concept_id(
                    concept, 
                    concept_codes=[procedure_code], 
                    vocabulary_ids=['SNOMED'], 
                    domain_ids=['Procedure'], 
                    invalid_reason=False, 
                    standard_concept='S'
                ),
                'procedure_date': procedure_date,
                'procedure_datetime': procedure_datetime,
                'procedure_end_date': procedure_end_date,
                'procedure_end_datetime': procedure_end_datetime,
                'procedure_type_concept_id': 32817,
                'modifier_concept_id': 0,
                'quantity': 1,
                'provider_id': pd.NA,
                'visit_occurrence_id': visit_occurrence_id,
                'visit_detail_id': pd.NA,
                'procedure_source_value': procedure_code,
                'procedure_source_concept_id': find_concept_id(
                    concept, 
                    concept_codes=[procedure_code], 
                    vocabulary_ids=['SNOMED'], 
                    domain_ids=['Procedure']
                ),
                'modifier_source_value': None
            }

            procedures.append(procedure_occurrence)

        return procedures

    for filepath in filepaths:
        with open(filepath, 'r') as file:
            lines = file.readlines()

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_line = {executor.submit(process_line, line): line for line in lines}

            for future in as_completed(future_to_line):
                procedure_occurrences.extend(future.result())

    procedure_occurrence = pd.DataFrame(procedure_occurrences).drop_duplicates()
    procedure_occurrence = procedure_occurrence[procedure_occurrence['procedure_source_concept_id'] != 0]

    return procedure_occurrence

filepaths = ['/workspaces/synthea_dw/data/fhir/CarePlan.ndjson', '/workspaces/synthea_dw/data/fhir/ImagingStudy.ndjson', '/workspaces/synthea_dw/data/fhir/Procedure.ndjson']
concept = concept
procedure_occurrence_df = procedure_occurrence(filepaths, concept)
procedure_occurrence_df.sample(5)

Unnamed: 0,procedure_occurrence_id,person_id,procedure_concept_id,procedure_date,procedure_datetime,procedure_end_date,procedure_end_datetime,procedure_type_concept_id,modifier_concept_id,quantity,provider_id,visit_occurrence_id,visit_detail_id,procedure_source_value,procedure_source_concept_id,modifier_source_value
946,0b2170d1-d7ed-2261-590f-2c8debb3a2dc,408a95f4-02aa-3003-2f09-0241ac3343fb,762506,2020-03-18,2020-03-18 10:43:14+00:00,2020-03-18,2020-03-18 10:53:55+00:00,32817,0,1,,2591259b-95e9-e6cf-bb45-82b3676f9fc2,,428211000124100,762506,
2117,4d4d6eab-5afc-4fc4-568f-0cfc125a415d,7a7b7fba-a005-3736-91ef-218a0d2824c5,4146536,2022-06-25,2022-06-25 18:11:03+00:00,2022-06-25,2022-06-25 21:52:03+00:00,32817,0,1,,790fa7c6-516e-0e70-c8c1-371556580834,,265764009,4146536,
1024,6b19781f-5af2-0c06-e6e4-e3b73c9031b7,7a82833f-fae1-d69a-2cbf-69279dac746f,762506,2021-01-02,2021-01-02 03:17:25+00:00,2021-01-02,2021-01-02 03:27:58+00:00,32817,0,1,,805c3232-6446-6b62-9cfb-48d6bf7133dd,,428211000124100,762506,
1313,efaa3271-f64e-9e27-3c29-026acddd00bf,7a7b7fba-a005-3736-91ef-218a0d2824c5,46272459,2015-11-15,2015-11-15 12:15:03+00:00,2015-11-15,2015-11-15 13:02:45+00:00,32817,0,1,,9081c31a-1aab-35ab-f477-9e4793c9c9af,,710824005,46272459,
451,f7dd8cae-da6d-d9c0-3ceb-0ed8cbd097c3,4390395b-5a78-2005-80b7-5ebd62b595c9,46272459,2020-12-30,2020-12-30 07:11:38+00:00,2020-12-30,2020-12-30 07:57:31+00:00,32817,0,1,,f89dde2a-4483-92ae-b94e-d6d14b6a4454,,710824005,46272459,


In [None]:
def device_exposure(filepaths, concept, max_workers=10):
    device_exposures = []

    def process_line(line):
        data = json.loads(line)
        resource_type = data['resourceType']
        exposures = []

        if resource_type == 'Device':
            person_id = data['patient']['reference'].split('/')[-1]
            device_code = data['type']['coding'][0]['code']
            exposure = {
                'device_exposure_id': data['id'],
                'person_id': person_id,
                'device_concept_id': find_concept_id(
                    concept, 
                    concept_codes=[device_code], 
                    vocabulary_ids=['SNOMED'], 
                    domain_ids=['Device'], 
                    invalid_reason=False, 
                    standard_concept='S'
                ),
                'device_exposure_start_date': datetime.strptime(data['manufactureDate'].split('T')[0], '%Y-%m-%d').date() if 'manufactureDate' in data else None,
                'device_exposure_start_datetime': datetime.fromisoformat(data['manufactureDate']) if 'manufactureDate' in data else None,
                'device_exposure_end_date': None,
                'device_exposure_end_datetime': None,
                'device_type_concept_id': 32817,
                'unique_device_id': data.get('distinctIdentifier', None),
                'production_id': data['udiCarrier'][0]['carrierHRF'] if 'udiCarrier' in data else None,
                'quantity': 1,
                'provider_id': pd.NA,
                'visit_occurrence_id': pd.NA,
                'visit_detail_id': pd.NA,
                'device_source_value': device_code,
                'device_source_concept_id': find_concept_id(
                    concept, 
                    concept_codes=[device_code], 
                    vocabulary_ids=['SNOMED'], 
                    domain_ids=['Device']
                ),
                'unit_concept_id': pd.NA,
                'unit_source_value': None,
                'unit_source_concept_id': pd.NA
            }
            exposures.append(exposure)

        elif resource_type == 'SupplyDelivery':
            person_id = data['patient']['reference'].split('/')[-1]
            device_code = data['suppliedItem']['itemCodeableConcept']['coding'][0]['code']
            quantity = data['suppliedItem']['quantity']['value'] if 'quantity' in data['suppliedItem'] else None
            occurrence_date = datetime.strptime(data['occurrenceDateTime'].split('T')[0], '%Y-%m-%d').date() if 'occurrenceDateTime' in data else None
            occurrence_datetime = datetime.fromisoformat(data['occurrenceDateTime']) if 'occurrenceDateTime' in data else None

            exposure = {
                'device_exposure_id': data['id'],
                'person_id': person_id,
                'device_concept_id': find_concept_id(
                    concept, 
                    concept_codes=[device_code], 
                    vocabulary_ids=['SNOMED'], 
                    domain_ids=['Device'], 
                    invalid_reason=False, 
                    standard_concept='S'
                ),
                'device_exposure_start_date': occurrence_date,
                'device_exposure_start_datetime': occurrence_datetime,
                'device_exposure_end_date': occurrence_date,
                'device_exposure_end_datetime': occurrence_datetime,
                'device_type_concept_id': 32817,
                'unique_device_id': None,
                'production_id': None,
                'quantity': quantity,
                'provider_id': pd.NA,
                'visit_occurrence_id': pd.NA,
                'visit_detail_id': pd.NA,
                'device_source_value': device_code,
                'device_source_concept_id': find_concept_id(
                    concept, 
                    concept_codes=[device_code], 
                    vocabulary_ids=['SNOMED'], 
                    domain_ids=['Device']
                ),
                'unit_concept_id': pd.NA,
                'unit_source_value': None,
                'unit_source_concept_id': pd.NA
            }
            exposures.append(exposure)

        return exposures

    for filepath in filepaths:
        with open(filepath, 'r') as file:
            lines = file.readlines()

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_line = {executor.submit(process_line, line): line for line in lines}

            for future in as_completed(future_to_line):
                device_exposures.extend(future.result())

    device_exposure = pd.DataFrame(device_exposures).drop_duplicates()
    device_exposure = device_exposure[device_exposure['device_source_concept_id'] != 0]

    return device_exposure

filepaths = ['/workspaces/synthea_dw/data/fhir/Device.ndjson', '/workspaces/synthea_dw/data/fhir/SupplyDelivery.ndjson']
device_exposure_df = device_exposure(filepaths, concept)
device_exposure_df.sample(5)

Unnamed: 0,device_exposure_id,person_id,device_concept_id,device_exposure_start_date,device_exposure_start_datetime,device_exposure_end_date,device_exposure_end_datetime,device_type_concept_id,unique_device_id,production_id,quantity,provider_id,visit_occurrence_id,visit_detail_id,device_source_value,device_source_concept_id,unit_concept_id,unit_source_value,unit_source_concept_id
84,c12aab6d-fb7b-673e-834b-73ca80e15f29,4390395b-5a78-2005-80b7-5ebd62b595c9,605721,2014-11-26,2014-11-26 07:11:38+00:00,2014-11-26,2014-11-26 07:11:38+00:00,32817,,,100,,,,1137596000,605721,,,
165,4b51e00e-6308-6f4f-0bc4-cb211c11fb92,c86bea4c-5647-c8c2-35c5-cb08246ded70,45758780,2021-08-27,2021-08-27 16:03:44+00:00,2021-08-27,2021-08-27 16:03:44+00:00,32817,,,1,,,,463659001,45758780,,,
65,6969115a-d68f-ee3c-7d8a-61169637449b,c86bea4c-5647-c8c2-35c5-cb08246ded70,4222987,2011-07-01,2011-07-01 16:03:44+00:00,2011-07-01,2011-07-01 16:03:44+00:00,32817,,,50,,,,337388004,4222987,,,
95,5f44c6e2-c228-1fbc-dd96-36a391349608,32ee64c2-1585-d7ad-c53f-9ad739c676cf,4222987,2011-11-27,2011-11-27 12:15:03+00:00,2011-11-27,2011-11-27 12:15:03+00:00,32817,,,50,,,,337388004,4222987,,,
149,829b1d33-4e46-1e0c-c083-44ecab3eae04,c86bea4c-5647-c8c2-35c5-cb08246ded70,45758780,2020-06-19,2020-06-19 16:03:44+00:00,2020-06-19,2020-06-19 16:03:44+00:00,32817,,,1,,,,463659001,45758780,,,


In [5]:
def measurement(filepaths, concept, max_workers=10):
    measurement_rows = []

    def process_line(line):
        data = json.loads(line)
        resource_type = data['resourceType']
        measurements = []

        person_id = data['subject']['reference'].split('/')[-1]
        visit_occurrence_id = data['encounter']['reference'].split('/')[-1] if 'encounter' in data else None

        if resource_type == 'Observation':
            measurement_date = datetime.strptime(data['effectiveDateTime'].split('T')[0], '%Y-%m-%d').date() if 'effectiveDateTime' in data else None
            measurement_datetime = datetime.fromisoformat(data['effectiveDateTime']) if 'effectiveDateTime' in data else None
            measurement_time = data['effectiveDateTime'].split('T')[1] if 'effectiveDateTime' in data else None
            components = data.get('component', [{'code': data['code'], 'valueQuantity': data.get('valueQuantity')}])

            for comp in components:
                code = comp['code']['coding'][0]['code']
                valueQuantity = comp.get('valueQuantity', None)
                value_as_number = float(valueQuantity['value']) if valueQuantity and 'value' in valueQuantity else pd.NA

                measurement = {
                    'measurement_id': data['id'],
                    'person_id': person_id,
                    'measurement_concept_id': find_concept_id(
                        concept, 
                        concept_codes=[code], 
                        vocabulary_ids=['LOINC'], 
                        domain_ids=['Measurement'], 
                        invalid_reason=False, 
                        standard_concept='S'
                    ),
                    'measurement_date': measurement_date,
                    'measurement_datetime': measurement_datetime,
                    'measurement_time': measurement_time,
                    'measurement_type_concept_id': 32817,
                    'operator_concept_id': 4172703,
                    'value_as_number': value_as_number,
                    'value_as_concept_id': pd.NA,
                    'unit_concept_id': find_concept_id(
                        concept, 
                        concept_codes=[valueQuantity['code']] if valueQuantity and 'code' in valueQuantity else None, 
                        vocabulary_ids=['UCUM'], domain_ids=['Unit'], 
                        invalid_reason=False, 
                        standard_concept='S'
                    ),
                    'range_low': pd.NA,
                    'range_high': pd.NA,
                    'provider_id': pd.NA,
                    'visit_occurrence_id': visit_occurrence_id,
                    'visit_detail_id': pd.NA,
                    'measurement_source_value': code,
                    'measurement_source_concept_id': find_concept_id(
                        concept, 
                        concept_codes=[code], 
                        vocabulary_ids=['LOINC'], 
                        domain_ids=['Measurement']
                    ),
                    'unit_source_value': valueQuantity['code'] if valueQuantity and 'code' in valueQuantity else None,
                    'unit_source_concept_id': pd.NA,
                    'value_source_value': valueQuantity['value'] if valueQuantity and 'value' in valueQuantity else None,
                    'measurement_event_id': pd.NA,
                    'meas_event_field_concept_id': pd.NA
                }
                measurements.append(measurement)

        elif resource_type == 'Procedure':
            procedure_code = data['code']['coding'][0]['code']
            measurement_date = datetime.strptime(data['performedPeriod']['start'].split('T')[0], '%Y-%m-%d').date() if 'performedPeriod' in data and 'start' in data['performedPeriod'] else None
            measurement_datetime = datetime.fromisoformat(data['performedPeriod']['start']) if 'performedPeriod' in data and 'start' in data['performedPeriod'] else None

            measurement = {
                'measurement_id': data['id'],
                'person_id': person_id,
                'measurement_concept_id': find_concept_id(
                    concept, 
                    concept_codes=[procedure_code], 
                    vocabulary_ids=['SNOMED'], 
                    domain_ids=['Measurement'], 
                    invalid_reason=False, 
                    standard_concept='S'
                ),
                'measurement_date': measurement_date,
                'measurement_datetime': measurement_datetime,
                'measurement_time': None,
                'measurement_type_concept_id': 32817,
                'operator_concept_id': 4172703,
                'value_as_number': pd.NA,
                'value_as_concept_id': 0,
                'unit_concept_id': 0,
                'range_low': pd.NA,
                'range_high': pd.NA,
                'provider_id': pd.NA,
                'visit_occurrence_id': visit_occurrence_id,
                'visit_detail_id': pd.NA,
                'measurement_source_value': procedure_code,
                'measurement_source_concept_id': find_concept_id(
                    concept, 
                    concept_codes=[procedure_code], 
                    vocabulary_ids=['SNOMED'], 
                    domain_ids=['Measurement']
                ),
                'unit_source_value': None,
                'unit_source_concept_id': 0,
                'value_source_value': None,
                'measurement_event_id': pd.NA,
                'meas_event_field_concept_id': pd.NA
            }
            measurements.append(measurement)

        return measurements

    for filepath in filepaths:
        with open(filepath, 'r') as file:
            lines = file.readlines()

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_line = {executor.submit(process_line, line): line for line in lines}

            for future in as_completed(future_to_line):
                measurement_rows.extend(future.result())

    measurement = pd.DataFrame(measurement_rows).drop_duplicates()
    measurement = measurement[measurement['measurement_source_concept_id'] != 0]

    return measurement

filepaths = ['/workspaces/synthea_dw/data/fhir/Observation.ndjson', '/workspaces/synthea_dw/data/fhir/Procedure.ndjson']
measurement_df = measurement(filepaths, concept)
measurement_df.sample(5)

In [None]:
def observation(filepaths, concept, max_workers=10):
    observation_rows = []

    def process_line(line):
        data = json.loads(line)
        resource_type = data['resourceType']
        observations = []

        person_id = data['subject']['reference'].split('/')[-1]
        visit_occurrence_id = data['encounter']['reference'].split('/')[-1] if 'encounter' in data else None

        if resource_type == 'AllergyIntolerance' and data['code']['coding'][0]['code'] != '419199007':
            for reaction in data.get('reaction', []):
                observation = {
                    'observation_id': data['id'],
                    'person_id': data['patient']['reference'].split('/')[-1],
                    'observation_concept_id': 4169307,
                    'observation_date': datetime.strptime(data['recordedDate'].split('T')[0], '%Y-%m-%d').date(),
                    'observation_datetime': datetime.fromisoformat(data['recordedDate']),
                    'observation_type_concept_id': 32817,
                    'value_as_number': None,
                    'value_as_string': None,
                    'value_as_concept_id': find_concept_id(
                        concept, 
                        concept_codes=[data['code']['coding'][0]['code']], 
                        vocabulary_ids=['SNOMED'], 
                        domain_ids=['Observation'], 
                        invalid_reason=False, 
                        standard_concept='S', 
                        concept_class_ids=['Substance']
                    ),
                    'qualifier_concept_id': find_concept_id(
                        concept, 
                        concept_names=[data['criticality'].capitalize()], 
                        vocabulary_ids=['SNOMED'], 
                        domain_ids=['Meas Value'], 
                        invalid_reason=False, 
                        standard_concept='S',
                        concept_class_ids=['Qualifier Value']
                    ),
                    'unit_concept_id': pd.NA,
                    'provider_id': pd.NA,
                    'visit_occurrence_id': None,
                    'visit_detail_id': None,
                    'observation_source_value': None,
                    'observation_source_concept_id': 4169307,
                    'unit_source_value': None,
                    'qualifier_source_value': data['criticality'],
                    'value_source_value': data['code']['coding'][0]['code'],
                    'observation_event_id': None,
                    'obs_event_field_concept_id': None
                }
                observations.append(observation)

        elif resource_type == 'CarePlan':
            for category in data.get('category', []):
                for coding in category.get('coding', []):
                    if 'display' in coding:
                        observation = {
                            'observation_id': data['id'],
                            'person_id': data['subject']['reference'].split('/')[-1],
                            'observation_concept_id': find_concept_id(
                                concept, 
                                concept_codes=[coding['code']], 
                                vocabulary_ids=['SNOMED'], 
                                domain_ids=['Observation'], 
                                invalid_reason=False, 
                                standard_concept='S'
                            ),
                            'observation_date': datetime.strptime(data['period']['start'].split('T')[0], '%Y-%m-%d').date(),
                            'observation_datetime': datetime.fromisoformat(data['period']['start']),
                            'observation_type_concept_id': 32817,
                            'value_as_number': None,
                            'value_as_string': None,
                            'value_as_concept_id': pd.NA,
                            'qualifier_concept_id': pd.NA,
                            'unit_concept_id': pd.NA,
                            'provider_id': pd.NA,
                            'visit_occurrence_id': data['encounter']['reference'].split('/')[-1],
                            'visit_detail_id': pd.NA,
                            'observation_source_value': coding['code'],
                            'observation_source_concept_id': find_concept_id(
                                concept, 
                                concept_codes=[coding['code']], 
                                vocabulary_ids=['SNOMED'], 
                                domain_ids=['Observation']
                            ),
                            'qualifier_source_value': None,
                            'value_source_value': None,
                            'observation_event_id': data['id'],
                            'obs_event_field_concept_id': None  
                        }
                        observations.append(observation)

        elif resource_type == 'Claim' and any(coding['code'] in ['professional', 'institutional'] for coding in data['type']['coding']):
            person_id = data['patient']['reference'].split('/')[-1]

            for item in data.get('item', []):
                if 'productOrService' in item and 'coding' in item['productOrService']:
                    for coding in item['productOrService']['coding']:
                        observation = {
                            'observation_id': data['id'],
                            'person_id': person_id,
                            'observation_concept_id': find_concept_id(
                                concept, 
                                concept_codes=[coding['code']], 
                                vocabulary_ids=['SNOMED'], 
                                domain_ids=['Observation'], 
                                invalid_reason=False, 
                                standard_concept='S'
                            ),
                            'observation_date': datetime.strptime(data['created'].split('T')[0], '%Y-%m-%d').date(),
                            'observation_datetime': datetime.fromisoformat(data['created']),
                            'observation_type_concept_id': 32817,
                            'value_as_number': pd.NA,
                            'value_as_string': None,
                            'value_as_concept_id': pd.NA,
                            'qualifier_concept_id': pd.NA,
                            'unit_concept_id': None,
                            'provider_id': pd.NA,
                            'visit_occurrence_id': item['encounter'][0]['reference'].split('/')[-1] if 'encounter' in item else None,
                            'visit_detail_id': pd.NA,
                            'observation_source_value': coding['code'],
                            'observation_source_concept_id': find_concept_id(
                                concept, 
                                concept_codes=[coding['code']], 
                                vocabulary_ids=['SNOMED'], 
                                domain_ids=['Observation']
                            ),
                            'unit_source_value': None,
                            'qualifier_source_value': None,
                            'value_source_value': None,
                            'observation_event_id': data['id'],
                            'obs_event_field_concept_id': None
                        }
                        observations.append(observation)

        elif resource_type == 'Observation':
            person_id = data['subject']['reference'].split('/')[-1]
            observation_date = datetime.strptime(data['effectiveDateTime'].split('T')[0], '%Y-%m-%d').date() if 'effectiveDateTime' in data else None
            observation_datetime = datetime.fromisoformat(data['effectiveDateTime']) if 'effectiveDateTime' in data else None
            visit_occurrence_id = data['encounter']['reference'].split('/')[-1] if 'encounter' in data else None

            if 'component' in data:
                for comp in data['component']:
                    code = comp['code']['coding'][0]['code']
                    valueQuantity = comp.get('valueQuantity', None)
                    valueCodeableConcept = comp.get('valueCodeableConcept', None)

                    value_as_number = float(valueQuantity['value']) if valueQuantity and 'value' in valueQuantity else pd.NA
                    value_as_concept_id = find_concept_id(
                        concept, 
                        concept_codes=[valueCodeableConcept['coding'][0]['code']], 
                        vocabulary_ids=['LOINC'], 
                        domain_ids=['Meas Value']
                    ) if valueCodeableConcept else None

                    observation = {
                        'observation_id': data['id'],
                        'person_id': person_id,
                        'observation_concept_id': find_concept_id(
                            concept, 
                            concept_codes=[code], 
                            vocabulary_ids=['LOINC'], 
                            domain_ids=['Observation'], 
                            invalid_reason=False, 
                            standard_concept='S'
                        ),
                        'observation_date': observation_date,
                        'observation_datetime': observation_datetime,
                        'observation_type_concept_id': 32817,
                        'value_as_number': value_as_number,
                        'value_as_concept_id': value_as_concept_id,
                        'unit_concept_id': find_concept_id(
                            concept, 
                            concept_codes=[valueQuantity['code']] if valueQuantity and 'code' in valueQuantity else None, 
                            vocabulary_ids=['UCUM'], 
                            domain_ids=['Unit'], 
                            invalid_reason=False, 
                            standard_concept='S'
                        ) if valueQuantity and 'code' in valueQuantity else pd.NA,
                        'provider_id': pd.NA,
                        'visit_occurrence_id': visit_occurrence_id,
                        'visit_detail_id': pd.NA,
                        'observation_source_value': code,
                        'observation_source_concept_id': find_concept_id(
                            concept, 
                            concept_codes=[code], 
                            vocabulary_ids=['LOINC'], 
                            domain_ids=['Observation']
                        ),
                        'unit_source_value': valueQuantity['code'] if valueQuantity and 'code' in valueQuantity else None,
                        'qualifier_source_value': None,
                        'value_source_value': valueQuantity['value'] if valueQuantity and 'value' in valueQuantity else None,
                        'observation_event_id': data['id'],
                        'obs_event_field_concept_id': pd.NA
                    }
                    observations.append(observation)
            
            else:
                code = data['code']['coding'][0]['code']
                valueQuantity = data.get('valueQuantity', None)
                valueCodeableConcept = data.get('valueCodeableConcept', None)

                value_as_number = float(valueQuantity['value']) if valueQuantity and 'value' in valueQuantity else pd.NA
                value_as_concept_id = find_concept_id(
                    concept, 
                    concept_codes=[valueCodeableConcept['coding'][0]['code']], 
                    vocabulary_ids=['LOINC'], 
                    domain_ids=['Meas Value']
                ) if valueCodeableConcept else None

                observation = {
                    'observation_id': data['id'],
                    'person_id': person_id,
                    'observation_concept_id': find_concept_id(
                        concept, 
                        concept_codes=[code], 
                        vocabulary_ids=['LOINC'], 
                        domain_ids=['Observation'], 
                        invalid_reason=False, 
                        standard_concept='S'
                    ),
                    'observation_date': observation_date,
                    'observation_datetime': observation_datetime,
                    'observation_type_concept_id': 32817,
                    'value_as_number': value_as_number,
                    'value_as_string': None,
                    'value_as_concept_id': value_as_concept_id,
                    'qualifier_concept_id': pd.NA,
                    'unit_concept_id': find_concept_id(
                        concept, 
                        concept_codes=[valueQuantity['code']] if valueQuantity and 'code' in valueQuantity else None, 
                        vocabulary_ids=['UCUM'], 
                        domain_ids=['Unit'], 
                        invalid_reason=False, 
                        standard_concept='S'
                    ) if valueQuantity and 'code' in valueQuantity else pd.NA,
                    'provider_id': pd.NA,
                    'visit_occurrence_id': visit_occurrence_id,
                    'visit_detail_id': pd.NA,
                    'observation_source_value': code,
                    'observation_source_concept_id': find_concept_id(
                        concept, 
                        concept_codes=[code], 
                        vocabulary_ids=['LOINC'], 
                        domain_ids=['Observation']
                    ),
                    'unit_source_value': valueQuantity['code'] if valueQuantity and 'code' in valueQuantity else None,
                    'qualifier_source_value': None,
                    'value_source_value': valueQuantity['value'] if valueQuantity and 'value' in valueQuantity else None,
                    'observation_event_id': data['id'],
                    'obs_event_field_concept_id': pd.NA
                }
                observations.append(observation)

        elif resource_type == 'Procedure':
            person_id = data['subject']['reference'].split('/')[-1]
            procedure_code = data['code']['coding'][0]['code']
            observation_date = datetime.strptime(data['performedPeriod']['start'].split('T')[0], '%Y-%m-%d').date() if 'performedPeriod' in data and 'start' in data['performedPeriod'] else None
            observation_datetime = datetime.fromisoformat(data['performedPeriod']['start']) if 'performedPeriod' in data and 'start' in data['performedPeriod'] else None
            visit_occurrence_id = data['encounter']['reference'].split('/')[-1] if 'encounter' in data else None

            observation = {
                'observation_id': data['id'],
                'person_id': person_id,
                'observation_concept_id': find_concept_id(
                    concept, 
                    concept_codes=[procedure_code], 
                    vocabulary_ids=['SNOMED'], 
                    domain_ids=['Observation'], 
                    invalid_reason=False, 
                    standard_concept='S'
                ),
                'observation_date': observation_date,
                'observation_datetime': observation_datetime,
                'observation_type_concept_id': 32817,
                'value_as_number': pd.NA,
                'value_as_string': None,
                'value_as_concept_id': 0,
                'qualifier_concept_id': pd.NA,
                'unit_concept_id': 0,
                'provider_id': pd.NA,
                'visit_occurrence_id': visit_occurrence_id,
                'visit_detail_id': pd.NA,
                'observation_source_value': procedure_code,
                'observation_source_concept_id': find_concept_id(
                    concept, 
                    concept_codes=[procedure_code], 
                    vocabulary_ids=['SNOMED'], 
                    domain_ids=['Observation']
                ),
                'unit_source_value': None,
                'qualifier_source_value': None,
                'value_source_value': None,
                'observation_event_id': data['id'],
                'obs_event_field_concept_id': pd.NA
            }
            observations.append(observation)

        return observations

    for filepath in filepaths:
        with open(filepath, 'r') as file:
            lines = file.readlines()

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_line = {executor.submit(process_line, line): line for line in lines}

            for future in as_completed(future_to_line):
                observation_rows.extend(future.result())

    observation = pd.DataFrame(observation_rows).drop_duplicates()
    observation = observation[observation['observation_source_concept_id'] != 0]

    return observation

filepaths = ['/workspaces/synthea_dw/data/fhir/AllergyIntolerance.ndjson', '/workspaces/synthea_dw/data/fhir/CarePlan.ndjson', '/workspaces/synthea_dw/data/fhir/Claim.ndjson', '/workspaces/synthea_dw/data/fhir/Observation.ndjson', '/workspaces/synthea_dw/data/fhir/Procedure.ndjson']
observation_df = observation(filepaths, concept)
observation_df.sample(5)

In [None]:
def death(filepaths, max_workers=10):
    death_rows = []

    def process_line(line):
        data = json.loads(line)

        if 'deceasedDateTime' in data:
            person_id = data['id']
            death_date = datetime.strptime(data['deceasedDateTime'].split('T')[0], '%Y-%m-%d').date()

            return {
                'person_id': person_id,
                'death_date': death_date,
                'death_type_concept_id': 32817,
                'cause_concept_id': 0,
                'cause_source_value': None,
                'cause_source_concept_id': 0
            }
        return None

    for filepath in filepaths:
        with open(filepath, 'r') as file:
            lines = file.readlines()

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_line = {executor.submit(process_line, line): line for line in lines}

            for future in as_completed(future_to_line):
                result = future.result()
                if result:
                    death_rows.append(result)

    death = pd.DataFrame(death_rows).drop_duplicates()

    return death

filepaths = ['/workspaces/synthea_dw/data/fhir/Patient.ndjson']
death_df = death(filepaths)
death_df.sample(3)

In [None]:
def note(filepaths, concept, max_workers=10):
    note_rows = []

    def process_line(line, resource_type):
        data = json.loads(line)
        person_id = data['subject']['reference'].split('/')[-1]
        provider_id = None
        note_text = None
        note_title = None
        note_date = None
        note_datetime = None
        note_type_concept_id = 32817
        note_class_concept_id = None
        note_type_code = None

        if resource_type == 'CarePlan':
            div_text = data['text']['div']
            note_title_end_index = div_text.find('<br/>')
            note_title = div_text[len('<div xmlns="http://www.w3.org/1999/xhtml">'):note_title_end_index]
            note_text = div_text[note_title_end_index + len('<br/>'):]
            note_date = datetime.strptime(data['period']['start'].split('T')[0], '%Y-%m-%d').date()
            note_datetime = datetime.fromisoformat(data['period']['start'])
            note_class_concept_id = 706300

        elif resource_type == 'DiagnosticReport':
            provider_id = data['performer'][0]['reference'].split('/')[-1] if 'performer' in data and data['performer'] else None
            note_text = data['presentedForm'][0]['data'] if 'presentedForm' in data and data['presentedForm'] else None
            note_title = data['code']['coding'][0]['display'] if 'code' in data and 'coding' in data['code'] else None
            note_date = datetime.strptime(data['issued'].split('T')[0], '%Y-%m-%d').date()
            note_datetime = datetime.fromisoformat(data['issued'])
            note_class_concept_id = 42868493

        elif resource_type == 'DocumentReference':
            provider_id = data['author'][0]['reference'].split('/')[-1] if 'author' in data else None
            note_text = data['content'][0]['attachment']['data'] if 'content' in data and 'attachment' in data['content'][0] else None
            note_title = data['category'][0]['coding'][0]['display'] if 'category' in data and 'coding' in data['category'][0] else None
            note_date = datetime.strptime(data['date'].split('T')[0], '%Y-%m-%d').date()
            note_datetime = datetime.fromisoformat(data['date'])
            note_type_code = data['type']['coding'][0]['code'] if 'type' in data and 'coding' in data['type'] else None
            note_class_concept_id = find_concept_id(concept, concept_codes=[note_type_code], vocabulary_ids=['LOINC'], domain_ids=['Note'], invalid_reason=False, standard_concept='S')

        return {
            'note_id': data['id'],
            'person_id': person_id,
            'note_date': note_date,
            'note_datetime': note_datetime,
            'note_type_concept_id': note_type_concept_id,
            'note_class_concept_id': note_class_concept_id,
            'note_title': note_title,
            'note_text': note_text,
            'encoding_concept_id': 32678,
            'language_concept_id': 4175745,
            'provider_id': provider_id,
            'visit_occurrence_id': data['encounter']['reference'].split('/')[-1] if 'encounter' in data else None,
            'visit_detail_id': None,
            'note_source_value': None,
            'note_event_id': None,
            'note_event_field_concept_id': None
        }

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for filepath in filepaths:
            resource_type = filepath.split('/')[-1].split('.')[0]
            with open(filepath, 'r') as file:
                lines = file.readlines()
                for line in lines:
                    futures.append(executor.submit(process_line, line, resource_type))

        for future in as_completed(futures):
            note_rows.append(future.result())

    note = pd.DataFrame(note_rows).drop_duplicates()

    return note

filepaths = [
    '/workspaces/synthea_dw/data/fhir/CarePlan.ndjson',
    '/workspaces/synthea_dw/data/fhir/DiagnosticReport.ndjson',
    '/workspaces/synthea_dw/data/fhir/DocumentReference.ndjson'
]
note_df = note(filepaths, concept)
note_df.sample(5)

In [None]:
def location(filepaths, max_workers=10):
    location_rows = []

    def process_line(line):
        data = json.loads(line)
        resource_type = data['resourceType']

        if resource_type == 'Location':
            return {
                'location_id': data['id'],
                'address_1': data['address']['line'][0] if 'address' in data and 'line' in data['address'] and data['address']['line'] else None,
                'address_2': None,
                'city': data['address']['city'] if 'address' in data and 'city' in data['address'] else None,
                'state': data['address']['state'] if 'address' in data and 'state' in data['address'] else None,
                'zip': data['address']['postalCode'] if 'address' in data and 'postalCode' in data['address'] else None,
                'county': None,
                'location_source_value': data['name'] if 'name' in data else None,
                'country_concept_id': 42046186 if ('address' in data and 'country' in data['address'] and data['address']['country'] == 'US') else 0,
                'country_source_value': data['address']['country'] if 'address' in data and 'country' in data['address'] else None,
                'latitude': data['position']['latitude'] if 'position' in data and 'latitude' in data['position'] else None,
                'longitude': data['position']['longitude'] if 'position' in data and 'longitude' in data['position'] else None
            }

        elif resource_type == 'Patient':
            address = data['address'][0] if 'address' in data and data['address'] else {}
            country_concept_id = 42046186 if address.get('country') == 'US' else 0
            latitude, longitude = None, None
            if 'extension' in address:
                for ext in address['extension']:
                    if ext['url'] == 'latitude':
                        latitude = ext['valueDecimal']
                    elif ext['url'] == 'longitude':
                        longitude = ext['valueDecimal']

            return {
                'location_id': data['id'],
                'address_1': address.get('line', [None])[0],
                'address_2': None,
                'city': address.get('city'),
                'state': address.get('state'),
                'zip': address.get('postalCode'),
                'county': None,
                'location_source_value': data['id'],
                'country_concept_id': country_concept_id,
                'country_source_value': address.get('country'),
                'latitude': latitude,
                'longitude': longitude
            }

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for filepath in filepaths:
            with open(filepath, 'r') as file:
                lines = file.readlines()
                for line in lines:
                    futures.append(executor.submit(process_line, line))

        for future in as_completed(futures):
            location_rows.append(future.result())

    location = pd.DataFrame(location_rows).drop_duplicates()

    return location

filepaths = ['/workspaces/synthea_dw/data/fhir/Location.ndjson', '/workspaces/synthea_dw/data/fhir/Patient.ndjson']
location_df = location(filepaths)
location_df.sample(5)

In [None]:
def specimen(filepaths, concept, max_workers=10):
    specimen_rows = []

    def process_line(line):
        data = json.loads(line)
        resource_type = data['resourceType']

        if resource_type == 'Procedure':
            person_id = data['subject']['reference'].split('/')[-1]
            procedure_code = data['code']['coding'][0]['code']
            specimen_date = datetime.strptime(data['performedPeriod']['start'].split('T')[0], '%Y-%m-%d').date()
            specimen_datetime = datetime.fromisoformat(data['performedPeriod']['start'])

            return {
                'specimen_id': data['id'],
                'person_id': person_id,
                'specimen_concept_id': find_concept_id(
                    concept, 
                    concept_codes=[procedure_code], 
                    vocabulary_ids=['SNOMED'], 
                    domain_ids=['Specimen'], 
                    invalid_reason=False, 
                    standard_concept='S'
                ),
                'specimen_type_concept_id': 32817,
                'specimen_date': specimen_date,
                'specimen_datetime': specimen_datetime,
                'quantity': 1,
                'unit_concept_id': 0,
                'anatomic_site_concept_id': 0,
                'disease_status_concept_id': 0,
                'specimen_source_id': find_concept_id(
                    concept, 
                    concept_codes=[procedure_code], 
                    vocabulary_ids=['SNOMED'], 
                    domain_ids=['Specimen']
                ),
                'specimen_source_value': procedure_code,
                'unit_source_value': None,
                'anatomic_site_source_value': None,
                'disease_status_source_value': None
            }

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for filepath in filepaths:
            with open(filepath, 'r') as file:
                lines = file.readlines()
                for line in lines:
                    futures.append(executor.submit(process_line, line))

        for future in as_completed(futures):
            result = future.result()
            if result:
                specimen_rows.append(result)

    specimen = pd.DataFrame(specimen_rows).drop_duplicates()
    specimen = specimen[specimen['specimen_source_id'] != 0]

    return specimen

filepaths = ['/workspaces/synthea_dw/data/fhir/Procedure.ndjson']
specimen_df = specimen(filepaths, concept)
specimen_df.sample(2)

In [None]:
def care_site(filepaths, max_workers=10):
    care_site_rows = []

    def process_line(line):
        data = json.loads(line)
        resource_type = data['resourceType']

        if resource_type == 'Organization':
            return {
                'care_site_id': data['id'],
                'care_site_name': data['name'],
                'place_of_service_concept_id': 32693,
                'location_id': pd.NA,
                'care_site_source_value': data['id'],
                'place_of_service_source_value': 'Healthcare Provider'
            }

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for filepath in filepaths:
            with open(filepath, 'r') as file:
                lines = file.readlines()
                for line in lines:
                    futures.append(executor.submit(process_line, line))

        for future in as_completed(futures):
            result = future.result()
            if result:
                care_site_rows.append(result)

    care_site = pd.DataFrame(care_site_rows).drop_duplicates()
    
    return care_site

filepaths = ['/workspaces/synthea_dw/data/fhir/Organization.ndjson']
care_site_df = care_site(filepaths)
care_site_df.sample(5)

In [None]:
def provider(filepaths, concept, max_workers=10):
    practitioner_file_path, practitioner_role_file_path = filepaths

    practitioners_dict = {}
    with open(practitioner_file_path, 'r') as file:
        for line in file:
            data = json.loads(line)
            for identifier in data.get('identifier', []):
                if identifier.get('system') == "http://hl7.org/fhir/sid/us-npi":
                    npi = identifier['value']
                    practitioners_dict[npi] = {
                        'provider_name': " ".join(
                            data['name'][0].get('prefix', []) +
                            data['name'][0].get('given', []) +
                            [data['name'][0]['family']]
                        ),
                        'gender': data['gender']
                    }

    provider_rows = []

    def process_line(line):
        data = json.loads(line)
        npi = data['practitioner']['identifier']['value']
        specialty_code = data['specialty'][0]['coding'][0]['code'] if 'specialty' in data else None

        return {
            'provider_id': data['id'],
            'provider_name': practitioners_dict.get(npi, {}).get('provider_name', ''),
            'npi': npi,
            'dea': None,
            'specialty_concept_id': find_concept_id(concept, concept_codes=[specialty_code], vocabulary_ids=['NUCC'], domain_ids=['Provider'], invalid_reason=False, standard_concept='S'),
            'care_site_id': pd.NA,
            'year_of_birth': pd.NA,
            'gender_concept_id': find_concept_id(concept, concept_names=[practitioners_dict.get(npi, {}).get('gender', '').upper()], vocabulary_ids=['Gender'], domain_ids=['Gender'], invalid_reason=False, standard_concept='S'),
            'provider_source_value': data['id'],
            'specialty_source_value': specialty_code,
            'specialty_source_concept_id': find_concept_id(concept, concept_codes=[specialty_code], vocabulary_ids=['NUCC'], domain_ids=['Provider']),
            'gender_source_value': practitioners_dict.get(npi, {}).get('gender', ''),
            'gender_source_concept_id': find_concept_id(concept, concept_names=[practitioners_dict.get(npi, {}).get('gender', '').upper()], vocabulary_ids=['Gender'], domain_ids=['Gender'])
        }

    with open(practitioner_role_file_path, 'r') as file:
        lines = file.readlines()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_line = {executor.submit(process_line, line): line for line in lines}

        for future in as_completed(future_to_line):
            provider_rows.append(future.result())

    provider = pd.DataFrame(provider_rows).drop_duplicates()

    return provider

filepaths = ['/workspaces/synthea_dw/data/fhir/Practitioner.ndjson', '/workspaces/synthea_dw/data/fhir/PractitionerRole.ndjson']
provider_df = provider(filepaths, concept)
provider_df.sample(5)

In [None]:
def episode(filepaths, concept, max_workers=10):
    episode_rows = []

    def process_line(line):
        data = json.loads(line)
        episodes = []

        if 'reasonCode' in data:
            person_id = None
            for participant in data.get('participant', []):
                for role in participant.get('role', []):
                    for coding in role.get('coding', []):
                        if coding.get('code') == '116154003':
                            person_id = participant['member']['reference'].split('/')[-1]
                            break
                    if person_id:
                        break

            if person_id:
                for reasonCode in data['reasonCode']:
                    for coding in reasonCode.get('coding', []):
                        episode = {
                            'episode_id': data['id'],
                            'person_id': person_id,
                            'episode_concept_id': 32533,
                            'episode_start_date': datetime.strptime(data['period']['start'].split('T')[0], '%Y-%m-%d').date(),
                            'episode_start_datetime': datetime.fromisoformat(data['period']['start']),
                            'episode_end_date': None,
                            'episode_end_datetime': None,
                            'episode_parent_id': pd.NA,
                            'episode_number': 1,
                            'episode_object_concept_id': find_concept_id(
                                concept, 
                                concept_codes=[coding['code']], 
                                vocabulary_ids=['SNOMED'], 
                                domain_ids=['Condition'],
                                invalid_reason=False, 
                                standard_concept='S'
                            ),
                            'episode_type_concept_id': 32817,
                            'episode_source_value': coding['code'],
                            'episode_source_concept_id': find_concept_id(
                                concept, 
                                concept_codes=[coding['code']], 
                                vocabulary_ids=['SNOMED'], 
                                domain_ids=['Condition'],
                                invalid_reason=True
                            ),
                        }
                        episodes.append(episode)

        return episodes

    for filepath in filepaths:
        with open(filepath, 'r') as file:
            lines = file.readlines()

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_line = {executor.submit(process_line, line): line for line in lines}

            for future in as_completed(future_to_line):
                episode_rows.extend(future.result())

    episode = pd.DataFrame(episode_rows).drop_duplicates()

    return episode

filepaths = ['/workspaces/synthea_dw/data/fhir/CareTeam.ndjson']
episode_df = episode(filepaths, concept)
episode_df.sample(5)

In [None]:
def cost(filepaths, max_workers=10):
    cost_rows = []

    def process_line(line):
        data = json.loads(line)
        cost = {
            'cost_id': data['id'],
            'cost_event_id': pd.NA,
            'cost_domain_id': 32007,
            'cost_type_concept_id': 5032,
            'currency_concept_id': 44818668,
            'total_charge': pd.NA,
            'total_cost': data['total']['value'] if 'total' in data and 'value' in data['total'] else pd.NA,
            'total_paid': pd.NA,
            'paid_by_payer': pd.NA,
            'paid_by_patient': pd.NA,
            'paid_patient_copay': pd.NA,
            'paid_patient_coinsurance': pd.NA,
            'paid_patient_deductible': pd.NA,
            'paid_by_primary': pd.NA,
            'paid_ingredient_cost': pd.NA,
            'paid_dispensing_fee': pd.NA,
            'payer_plan_period_id': pd.NA,
            'amount_allowed': pd.NA,
            'revenue_code_concept_id': 38003025,
            'revenue_code_source_value': None,
            'drg_concept_id': pd.NA,
            'drg_source_value': None
        }
        return cost

    for filepath in filepaths:
        with open(filepath, 'r') as file:
            lines = file.readlines()

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_line = {executor.submit(process_line, line): line for line in lines}

            for future in as_completed(future_to_line):
                cost_rows.append(future.result())

    cost = pd.DataFrame(cost_rows).drop_duplicates()

    return cost

filepaths = ['/workspaces/synthea_dw/data/fhir/Claim.ndjson']
cost_df = cost(filepaths)
cost_df.sample(5)