In [None]:
%load_ext autoreload
%autoreload 2

## TDAP on OMOP prototyping

This will just be an example of how to get started

In [None]:
# imports and config
# standard library
import json
import logging
import logging.config
from datetime import datetime
from getpass import getpass
from multiprocessing import Pool, cpu_count
from pathlib import Path
from pprint import pprint
from random import random, randint
from random import sample
from time import sleep

# third-party
import funcy
import numpy as np
import pandas as pd
from dateutil.relativedelta import relativedelta
from sqlalchemy import types
from tqdm import tqdm


# ripy
from piesafe import piesafe
from ucdripydbutils import ucdripydbutils

# siblings
from tdap.tdap import (TDAPDataManagerOMOP,
                       inject_secrets_into_config,
                       get_concept_data,
                       get_concept_metadata,
                       key_time_to_period,
                       run_omop,
                       print_concepts, print_cleanups, print_derivations, print_extensions)

import example_tdap_config as my_tdap_config

# logic sibs
from example_omop_executor import (
    get_pop,
    create_process_df
)

# notebook specific config: never truncate columns or rows when rendering frames
for display_option in ('display.max_columns', 'display.max_rows'):
    pd.set_option(display_option, None)

# global vars
## Variable definitions
# whether to prompt for named AD credentials when connecting to MSSQL
use_named_ad_auth_to_mssql = False

In [None]:
# set up logging
#
# TDAP 2.0 is designed to run in parallel, therefore logging is A LOT more
# complicated; the dict-style configuration comes from example_tdap_config.

# clear out previous log files; create the directory first so the file handler
# configured below has somewhere to write on a fresh checkout
data_path = Path.cwd() / Path("log")
data_path.mkdir(parents=True, exist_ok=True)
for old_log_file in data_path.glob("*.log"):
    if old_log_file.is_file():
        old_log_file.unlink()

logging_config = my_tdap_config.logging_config
logging_config['loggers']['']['level'] = 'DEBUG'
logger_name = 'tdap_omop_logger'
log_file_name = 'log/itdap_omop_'+datetime.now().strftime('%Y-%m-%dT%H:%M')+'.log'
logging_config['handlers']['file']['filename'] = log_file_name

# Apply the configuration BEFORE fetching the logger: dictConfig disables
# loggers that already exist unless the config sets
# disable_existing_loggers=False, which cannot be verified from this notebook.
logging.config.dictConfig(logging_config)
logger = logging.getLogger(logger_name)
logger.info("Logger is configured!!!")


In [None]:
# Start with new unified config file
tdap_config_key = 'dev_omop'
dotenv_file_path = '.env'
dev_email = 'awriedl@ucdavis.edu'
tdap_config = my_tdap_config.tdap_configs.get(tdap_config_key)
if tdap_config is None:
    # fail here with the offending key instead of a "NoneType is not
    # subscriptable" error on the assignment below
    raise KeyError(f"No TDAP config named {tdap_config_key!r} in my_tdap_config.tdap_configs")
tdap_config['logging_config'] = logging_config

# use_named_ad_auth_to_mssql is set in the config cell at the top of the
# notebook; it is deliberately not reassigned here so that setting takes effect
if use_named_ad_auth_to_mssql:
    ad_user = input("Enter AD username: ")
    # getpass does not echo, so the password never lands in the cell output
    ad_pass = getpass("Enter AD password: ")
else:
    ad_user = None
    ad_pass = None

# update tdap config with secrets
tdap_config = inject_secrets_into_config(dotenv_file_path, tdap_config, dev_email, logger_name, mssql_ad_user=ad_user, mssql_ad_pass=ad_pass)
# DONT print to screen again - secrets exist now

## Initializing an instance of TDAP on OMOP

In [None]:
# what concepts, etc are configured
# Same four reports in the same order: concepts, extensions, cleanups, derivations.
for print_section in (print_concepts, print_extensions, print_cleanups, print_derivations):
    print_section(tdap_config, print_or_log='print')

### Obtain a population to work with

In [None]:
# Fetch the working population through the project helper, then report its
# dimensions and preview the first rows.
pop_df = get_pop(tdap_config, logger_name)
print(pop_df.shape)
display(pop_df.head(n=5))

In [None]:
# create process df
# Derived from the population frame by the project helper; shape and a
# five-row preview are shown for a quick sanity check.
process_df = create_process_df(pop_df, logger_name)
print(process_df.shape)
display(process_df.head(n=5))

In [None]:
# One record per row of process_df, restricted to the per-patient run arguments.
process_tuples = list(process_df.filter(['person_id','start_time_str','end_time_str','visit_occurrence_id_list','id_type']).to_records(index=False))
print(len(process_tuples))
print(process_tuples[0])

# Every run also receives the config, the logger name and the source type.
run_args_tuple = (tdap_config,logger_name,'omop')
process_tuples_final = [tuple(x) + run_args_tuple for x in process_tuples]

print(len(process_tuples_final))
# Preview only the per-patient fields: the trailing run args include
# tdap_config, which holds secrets at this point and must not be printed.
print([one_tuple[:-len(run_args_tuple)] for one_tuple in process_tuples_final[:1]])

---

### TDAP Initialization

When TDAP is initialized, it creates a context for a unique patient and sets the start time, stop time, and sampling frequency between these two times.  It also:

- captures patient demographic data
- creates a template matrix to which data is attached as the concepts defined in the configuration file are iterated through and added to the matrix

In [None]:
# one_process_tuple = sample(process_tuples_final,1)[0]
# data_manager = TDAPDataManagerOMOP(*one_process_tuple)
# print(data_manager.pat_profile_dict.keys())
# print(data_manager.pat_profile_dict['birth_datetime'])
# print(len(data_manager.pat_profile_dict['visit_occurrence_id_list_chunked']))
# print(data_manager.pat_profile_dict['visit_df'].shape)

--- 

## Running TDAP

We know initialization works; now, will a full run complete and decorate everything with the concepts, etc.?

In [None]:
"""
    Functions for serial and parallel processing
    Side by side to demonstrate how easy it is to use either
"""
def get_matrix_serial(data_tuple_list):
    """Run `run_omop` once per argument tuple, one after another.

    Args:
        data_tuple_list: iterable of argument tuples; each tuple is unpacked
            into the positional arguments of `run_omop`.

    Returns:
        list with the `run_omop` return values, in input order. Progress is
        reported through a tqdm bar.
    """
    return [run_omop(*run_args) for run_args in tqdm(data_tuple_list)]

def get_matrix_parallel(data_tuple_list, processes=4):
    """Run `run_omop` over the argument tuples using a multiprocessing pool.

    Args:
        data_tuple_list: iterable of argument tuples; each tuple is unpacked
            into the positional arguments of `run_omop` by `Pool.starmap`.
        processes: number of worker processes. Defaults to 4, the value that
            used to be hard-coded; pass None to let multiprocessing pick a
            default based on the machine.

    Returns:
        list with the `run_omop` return values, in input order.
    """
    # starmap blocks until every task has finished, so returning from inside
    # the `with` block is safe; the pool is torn down on exit.
    with Pool(processes=processes) as worker_pool:
        return worker_pool.starmap(run_omop, data_tuple_list)

In [None]:
sample_size = 30
# random.sample raises ValueError when asked for more items than exist, so cap
# the request at the number of available tuples
serial_inputs = sample(process_tuples_final, min(sample_size, len(process_tuples_final)))
results_serial = get_matrix_serial(serial_inputs)
print(len(results_serial))
print(type(results_serial))
# only peek at the first element when there is one
if results_serial:
    print(type(results_serial[0]))

In [None]:
# one_result = results_serial[0]
# print(one_result.keys())
# print(one_result['pat_profile_dict'].keys())
# print(one_result['matrix_df'].shape)
# display(one_result['matrix_df'].head())

In [None]:
# Collect the matrix of every serial run and stack them into one frame.
all_matrix_list = [run_result['matrix_df'] for run_result in results_serial]
all_matrix_df = pd.concat(all_matrix_list)
print(all_matrix_df.shape)
display(all_matrix_df.head(n=5))

In [None]:
# first rows where the testdx_native column equals 1
all_matrix_df.loc[all_matrix_df['testdx_native'].eq(1)].head(10)

In [None]:
# first rows where the testrx_native column equals 1
all_matrix_df.loc[all_matrix_df['testrx_native'].eq(1)].head(10)

In [None]:
# first rows where the testproc_native column equals 1
all_matrix_df.loc[all_matrix_df['testproc_native'].eq(1)].head(10)

In [None]:
# first rows where the testobs_native column equals 1
all_matrix_df.loc[all_matrix_df['testobs_native'].eq(1)].head(10)

In [None]:
## NOTE: using process tuples final from above!
## create a process df, then make the call to one of the get_matrix functions
sample_size = 10
# random.sample raises ValueError when asked for more items than exist, so cap
# the request at the number of available tuples
parallel_inputs = sample(process_tuples_final, min(sample_size, len(process_tuples_final)))
results_parallel = get_matrix_parallel(parallel_inputs)

print(len(results_parallel))
print(type(results_parallel))
# only peek at the first element when there is one
if results_parallel:
    print(type(results_parallel[0]))

In [None]:
# Inspect the first parallel result: its top-level keys, the keys of the
# patient profile, and the size and first rows of the matrix.
one_result = results_parallel[0]
print(one_result.keys())
print(one_result['pat_profile_dict'].keys())
one_matrix_df = one_result['matrix_df']
print(one_matrix_df.shape)
display(one_matrix_df.head())

In [None]:
# column dtypes, non-null counts and memory use of the matrix from `one_result`
one_result['matrix_df'].info()

---
## Deep dives below this point

### OMOP tables

In [None]:
# List the columns of one OMOP table from the SQL Server catalog views.
omop_engine = ucdripydbutils.get_engine_from_connect_dict(tdap_config['databases']['omop']['secret'])
tab_name_wildcard = 'procedure_occurrence'
table_data_sql = f"""select
  schema_name(tab.schema_id) as schema_name,
  tab.name as table_name,
  col.column_id,
  col.name as column_name,
  col.max_length,
  col.precision
from
  sys.tables as tab
  inner join sys.columns as col on tab.object_id = col.object_id
  left join sys.types as t on col.user_type_id = t.user_type_id
where
  schema_name(tab.schema_id) NOT IN ('sys')
  and tab.name = '{tab_name_wildcard}'"""


# dispose in a finally block so a failed query does not leave the engine's
# pooled connections open
try:
    table_data = pd.read_sql(table_data_sql, omop_engine)
finally:
    omop_engine.dispose()
table_cols = table_data['column_name'].tolist()
display(table_data)

# print the column names one per line with a trailing comma, ready to paste
# into a select list
print()
for col in table_cols:
    print(col+',')


## OMOP Concepts

In [None]:
# Look up the concept rows for a hand-picked list of concept ids.
omop_engine = ucdripydbutils.get_engine_from_connect_dict(tdap_config['databases']['omop']['secret'])
concept_id_list = [4353936]
concept_sql = f"""
select
    *
from
    concept
where
    concept_id in ({",".join([str(x) for x in concept_id_list])})
"""
print(concept_sql)
# dispose in a finally block so a failed query does not leave the engine's
# pooled connections open
try:
    concept_df = pd.read_sql(concept_sql, omop_engine)
finally:
    omop_engine.dispose()
display(concept_df)

## OMOP Observation

In [None]:
# Pull a 10,000-row slice of the observation table for exploration.
omop_engine = ucdripydbutils.get_engine_from_connect_dict(tdap_config['databases']['omop']['secret'])
observation_sql = f"""
select top 10000
    *
from
    observation
"""
print(observation_sql)
# dispose in a finally block so a failed query does not leave the engine's
# pooled connections open
try:
    observation_df = pd.read_sql(observation_sql, omop_engine)
finally:
    omop_engine.dispose()
display(observation_df.head())

In [None]:
# frequency of each observation concept id in the sampled rows
observation_df.loc[:, 'observation_concept_id'].value_counts()

## OMOP Procedure Occurrence

In [None]:
# Pull a 10,000-row slice of the procedure_occurrence table for exploration.
omop_engine = ucdripydbutils.get_engine_from_connect_dict(tdap_config['databases']['omop']['secret'])
procedure_occurrence_sql = f"""
select top 10000
    *
from
    procedure_occurrence
"""
print(procedure_occurrence_sql)
# dispose in a finally block so a failed query does not leave the engine's
# pooled connections open
try:
    procedure_occurrence_df = pd.read_sql(procedure_occurrence_sql, omop_engine)
finally:
    omop_engine.dispose()
display(procedure_occurrence_df.head())

In [None]:
# frequency of each procedure concept id in the sampled rows
procedure_occurrence_df.loc[:, 'procedure_concept_id'].value_counts()

## OMOP drug exposure

In [None]:
# Pull a 1,000-row slice of the drug_exposure table for exploration.
omop_engine = ucdripydbutils.get_engine_from_connect_dict(tdap_config['databases']['omop']['secret'])
drug_exposure_sql = f"""
select top 1000
    *
from
    drug_exposure
"""
print(drug_exposure_sql)
# dispose in a finally block so a failed query does not leave the engine's
# pooled connections open
try:
    drug_exposure_df = pd.read_sql(drug_exposure_sql, omop_engine)
finally:
    omop_engine.dispose()
display(drug_exposure_df.head())

In [None]:
# frequency of each drug concept id in the sampled rows
drug_exposure_df.loc[:, 'drug_concept_id'].value_counts()

## OMOP condition Occurrence

In [None]:
# Pull a 10,000-row slice of the condition_occurrence table for exploration.
condition_occurence_sql = f"""
select top 10000
    *
from
    condition_occurrence"""

# Create a dedicated engine for this cell (like the sibling cells do) instead
# of reusing one that an earlier cell already disposed, and always dispose it.
omop_engine = ucdripydbutils.get_engine_from_connect_dict(tdap_config['databases']['omop']['secret'])
try:
    condition_occurence_df = pd.read_sql(condition_occurence_sql, omop_engine)
finally:
    omop_engine.dispose()
print(condition_occurence_df.shape)
display(condition_occurence_df.head())

In [None]:
# frequency of each condition concept id in the sampled rows
condition_occurence_df.loc[:, 'condition_concept_id'].value_counts()

## OMOP Measurements

In [None]:
## Measurement sql
# Measurements for the concept ids of interest, with the concept names of the
# related concept columns joined in.


# @TODO - inject concept ids of interest here
omop_concept_ids = []


measurement_sql = f"""
select
    measurement_id,
    person_id,
    measurement_concept_id,
    measurement_concept.concept_name as measurement_concept_name,
    measurement_date,
    measurement_datetime,
    measurement_time,
    measurement_type_concept_id,
    measurement_type_concept.concept_name as measurement_type_concept_name,
    operator_concept_id,
    operator_concept.concept_name as operator_concept_name,
    value_as_number,
    value_as_concept_id,
    value_as_concept.concept_name as value_as_concept_name,
    unit_concept_id,
    range_low,
    range_high,
    provider_id,
    visit_occurrence_id,
    visit_detail_id,
    measurement_source_value,
    measurement_source_concept_id,
    measurement_source_concept.concept_name as measurement_source_concept_name,
    unit_source_value,
    unit_source_concept_id,
    unit_source_concept.concept_name as unit_source_concept_name,
    value_source_value,
    measurement_event_id,
    meas_event_field_concept_id,
    meas_event_field_concept.concept_name as meas_event_field_concept_name
from
    measurement
    left join concept as measurement_concept on measurement.measurement_concept_id = measurement_concept.concept_id
    left join concept as unit_concept on measurement.unit_concept_id = unit_concept.concept_id
    left join concept as value_as_concept on measurement.value_as_concept_id = value_as_concept.concept_id
    left join concept as measurement_source_concept on measurement.measurement_source_concept_id = measurement_source_concept.concept_id
    left join concept as unit_source_concept on measurement.unit_source_concept_id = unit_source_concept.concept_id
    left join concept as meas_event_field_concept on measurement.meas_event_field_concept_id = meas_event_field_concept.concept_id
    left join concept as measurement_type_concept on measurement.measurement_type_concept_id = measurement_type_concept.concept_id
    left join concept as operator_concept on measurement.operator_concept_id = operator_concept.concept_id
where
    measurement_concept_id in ({",".join([str(x) for x in omop_concept_ids])})
"""
if omop_concept_ids:
    omop_engine = ucdripydbutils.get_engine_from_connect_dict(tdap_config['databases']['omop']['secret'])
    # dispose in a finally block so a failed query does not leave the engine's
    # pooled connections open
    try:
        measurement_df = pd.read_sql(measurement_sql, omop_engine)
    finally:
        omop_engine.dispose()
else:
    # With no ids the filter renders as "in ()", which is a SQL syntax error;
    # skip the query and keep the notebook runnable until the TODO is filled in.
    print("omop_concept_ids is empty - add the concept ids of interest before running this query")
    measurement_df = pd.DataFrame()
print(measurement_df.shape)
display(measurement_df.head())

In [None]:
## Measurement sql
# Same measurement lookup, narrowed to a fixed list of visit occurrences.
# Reads omop_concept_ids defined in the previous measurement cell.
vo_id_list = [71869663,70750183,71833551]
measurement_sql = f"""
select
    measurement_id,   
    measurement_datetime,
    operator_concept_id,
    operator_concept.concept_name as operator_concept_name,
    value_as_number,
    value_as_concept_id,
    value_as_concept.concept_name as value_as_concept_name,
    unit_concept_id,
    unit_concept.concept_name as unit_concept_name,
    range_low,
    range_high,    
    unit_source_value,
    value_source_value
from
    measurement
    left join concept as measurement_concept on measurement.measurement_concept_id = measurement_concept.concept_id
    left join concept as unit_concept on measurement.unit_concept_id = unit_concept.concept_id
    left join concept as value_as_concept on measurement.value_as_concept_id = value_as_concept.concept_id
    left join concept as measurement_source_concept on measurement.measurement_source_concept_id = measurement_source_concept.concept_id
    left join concept as unit_source_concept on measurement.unit_source_concept_id = unit_source_concept.concept_id
    left join concept as meas_event_field_concept on measurement.meas_event_field_concept_id = meas_event_field_concept.concept_id
    left join concept as measurement_type_concept on measurement.measurement_type_concept_id = measurement_type_concept.concept_id
    left join concept as operator_concept on measurement.operator_concept_id = operator_concept.concept_id
where
    measurement_concept_id in ({",".join([str(x) for x in omop_concept_ids])})
    and visit_occurrence_id in ({",".join([str(x) for x in vo_id_list])})
"""
if omop_concept_ids and vo_id_list:
    omop_engine = ucdripydbutils.get_engine_from_connect_dict(tdap_config['databases']['omop']['secret'])
    # dispose in a finally block so a failed query does not leave the engine's
    # pooled connections open
    try:
        measurement_df = pd.read_sql(measurement_sql, omop_engine)
    finally:
        omop_engine.dispose()
else:
    # An empty id list renders as "in ()", which is a SQL syntax error; skip
    # the query instead of sending it.
    print("omop_concept_ids or vo_id_list is empty - fill both before running this query")
    measurement_df = pd.DataFrame()
print(measurement_df.shape)
display(measurement_df.head())

## Table Data

## Person Data

In [None]:
# Build a demographic profile dict for one person, including the age at the
# start and end of the study window.
one_person_id = int(input("Enter a person_id: "))
start_time = datetime(2023, 1, 1)
end_time = datetime(2024, 1, 1)
omop_engine = ucdripydbutils.get_engine_from_connect_dict(tdap_config['databases']['omop']['secret'])
# one_person_id went through int() above, so interpolating it is safe
person_sql=f"""
        select
            person_id,
            birth_datetime,
            person_source_value as pat_mrn_id,
            gender_source_value as gender,
            race_source_value as race,
            ethnicity_source_value as ethnicity
        from
            person
        where
            person.person_id = {one_person_id}
    """
# dispose in a finally block so a failed query does not leave the engine's
# pooled connections open
try:
    person_df = pd.read_sql(person_sql, omop_engine)
finally:
    omop_engine.dispose()

if not person_df.empty:
    pat_dict = person_df.to_dict(orient='records')[0]
    pat_dict["person_id"] = one_person_id
    # derive some age columns
    age_at_start = relativedelta(start_time, pat_dict.get('birth_datetime'))
    age_at_end = relativedelta(end_time, pat_dict.get('birth_datetime'))
    pat_dict["age_at_start_delta"] = age_at_start
    pat_dict["age_at_start_years"] = age_at_start.years
    pat_dict["age_at_end_delta"] = age_at_end
    pat_dict["age_at_end_years"] = age_at_end.years

else:
    # There is no `self` in a notebook cell; report the id that was looked up.
    logger.warning(f"Patient data dict for this record was empty for pat_id = {one_person_id}")
    # create empty dict with the same keys the populated branch produces
    pat_dict = dict.fromkeys([
        "person_id",
        "birth_datetime",
        "pat_mrn_id",
        "gender",
        "race",
        "ethnicity",
        "age_at_start_delta",
        "age_at_start_years",
        "age_at_end_delta",
        "age_at_end_years",
    ])

# establish pat info metadata
pat_profile_metadata = {
    "person_id":types.NVARCHAR(20),
    "pat_mrn_id":types.NVARCHAR(20),
    "gender":types.NVARCHAR(5),
    "birth_datetime": types.DateTime(),
    "age_at_start_years": types.NUMERIC(precision=5, scale=2),
    "age_at_end_years": types.NUMERIC(precision=5, scale=2)
}

pat_dict

## Visit Data

Two tables:

1. Visit Occurrence - This table contains Events where Persons engage with the healthcare system for a duration of time. They are often also called “Encounters”. Visits are defined by a configuration of circumstances under which they occur, such as (i) whether the patient comes to a healthcare institution, the other way around, or the interaction is remote, (ii) whether and what kind of trained medical staff is delivering the service during the Visit, and (iii) whether the Visit is transient or for a longer period involving a stay in bed.
1. Visit Detail - This table is technically optional and is a child table of VO.  For example, a VO may indicate a hospital stay, while visit detail may reflect the durations in each department or maybe each duration of Attending coverage.

> Conclusion - Consider making the visit detail data an 'extension', but it will not be used for the primary purpose of bounding when a patient is interacting with a health system.  That data is contained within VO, which we will use as the primary source of encounter information extracted from OMOP.

In [None]:
# All visit occurrences inside the [start_time, end_time] window, with concept,
# provider and care-site names joined in. start_time, end_time and
# one_person_id come from the Person Data cell above; the person filter is
# commented out inside the SQL.
omop_engine = ucdripydbutils.get_engine_from_connect_dict(tdap_config['databases']['omop']['secret'])
omop_visit_occurrence_sql = f"""
select
    visit_occurrence_id,
    person_id,
    visit_concept_id,
    visit_concept.concept_name as visit_concept_name,
    visit_start_date,
    visit_start_datetime,
    visit_end_date,
    visit_end_datetime,
    visit_type_concept_id,
    visit_type_concept.concept_name as visit_type_concept_name,
    vo.provider_id,
    provider.provider_name,
    vo.care_site_id,
    care_site.care_site_name,
    visit_source_value,
    visit_source_concept_id,
    visit_source_concept.concept_name as visit_source_concept_name,
    admitting_source_concept_id,
    admitting_source_concept.concept_name as admitting_source_concept_name,
    admitting_source_value,
    discharge_to_concept_id,
    discharge_to_concept.concept_name as discharge_to_concept_name,
    discharge_to_source_value,
    preceding_visit_occurrence_id
from
    visit_occurrence vo
    left join concept visit_concept on vo.visit_concept_id = visit_concept.concept_id
    left join concept visit_type_concept on vo.visit_type_concept_id = visit_type_concept.concept_id
    left join concept discharge_to_concept on vo.discharge_to_concept_id = discharge_to_concept.concept_id
    left join concept admitting_source_concept on vo.admitting_source_concept_id = admitting_source_concept.concept_id
    left join concept visit_source_concept on vo.visit_source_concept_id = visit_source_concept.concept_id
    left join provider on vo.provider_id = provider.provider_id
    left join care_site on vo.care_site_id = care_site.care_site_id
where
    vo.visit_start_datetime >= '{start_time}'
    and vo.visit_end_datetime <= '{end_time}'
    -- and vo.person_id = {one_person_id}
"""
# dispose in a finally block so a failed query does not leave the engine's
# pooled connections open
try:
    omop_visit_occurrence_df = pd.read_sql(omop_visit_occurrence_sql, omop_engine)
finally:
    omop_engine.dispose()
print(omop_visit_occurrence_df.shape)
display(omop_visit_occurrence_df.head())

#### Visit Detail Deep Dive

In [None]:
# step 1, find a visit occurence(s) that have many detail rows and inspect different axes of breakout
# start_time and end_time come from the Person Data cell above.
omop_engine = ucdripydbutils.get_engine_from_connect_dict(tdap_config['databases']['omop']['secret'])
visit_detail_sql = f"""
select
    visit_detail_id,
    person_id,
    visit_detail_concept_id,
    visit_detail_concept.concept_name as visit_detail_concept_name,
    visit_detail_start_date,
    visit_detail_start_datetime,
    visit_detail_end_date,
    visit_detail_end_datetime,
    visit_detail_type_concept_id,
    visit_detail_type_concept.concept_name as visit_detail_type_concept_name,
    provider_id,
    care_site_id,
    admitting_source_concept_id,
    admitting_source_concept.concept_name as admitting_source_concept_name,
    discharge_to_concept_id,
    discharge_to_concept.concept_name as discharge_to_concept_name,
    preceding_visit_detail_id,
    visit_detail_source_value,
    visit_detail_source_concept_id,
    visit_detail_source_concept.concept_name as visit_detail_source_concept_name,
    admitting_source_value,
    discharge_to_source_value,
    visit_detail_parent_id,
    visit_occurrence_id
from
    visit_detail
    left join concept visit_detail_concept on visit_detail.visit_detail_concept_id = visit_detail_concept.concept_id
    left join concept visit_detail_type_concept on visit_detail.visit_detail_type_concept_id = visit_detail_type_concept.concept_id
    left join concept admitting_source_concept on visit_detail.admitting_source_concept_id = admitting_source_concept.concept_id
    left join concept discharge_to_concept on visit_detail.discharge_to_concept_id = discharge_to_concept.concept_id
    left join concept visit_detail_source_concept on visit_detail.visit_detail_source_concept_id = visit_detail_source_concept.concept_id
where
    visit_detail_start_datetime >= '{start_time}'
    and visit_detail_end_datetime <= '{end_time}'
"""
# dispose in a finally block so a failed query does not leave the engine's
# pooled connections open
try:
    omop_visit_detail_df = pd.read_sql(visit_detail_sql, omop_engine)
finally:
    omop_engine.dispose()
print(omop_visit_detail_df.shape)
display(omop_visit_detail_df.head())

# grab the top N visit_occurrence_ids
# Name the index and the counts column explicitly: the column names that
# value_counts() + reset_index() produce differ between pandas 1.x and 2.x,
# and the sort below needs a column called 'count'.
omop_visits_with_detail_df = (
    omop_visit_detail_df['visit_occurrence_id']
    .value_counts()
    .rename_axis('visit_occurrence_id')
    .reset_index(name='count')
    .sort_values('count', ascending=False)
    .head(20)
    .copy()
)
display(omop_visits_with_detail_df)
vo_ids_w_detail = omop_visits_with_detail_df['visit_occurrence_id'].tolist()
print(len(vo_ids_w_detail))

In [None]:
# Pick one of the detail-heavy visits at random and show the visit occurrence
# row next to its visit detail rows, ordered by detail start time.
one_random_vo_id = sample(vo_ids_w_detail, 1)[0]
print(one_random_vo_id)

print("The Visit Occurrence: ")
is_chosen_visit = omop_visit_occurrence_df['visit_occurrence_id'].eq(one_random_vo_id)
display(omop_visit_occurrence_df.loc[is_chosen_visit])
print()
print("The Visit Details: ")
is_chosen_detail = omop_visit_detail_df['visit_occurrence_id'].eq(one_random_vo_id)
omop_visit_detail_df.loc[is_chosen_detail].sort_values(by=['visit_detail_start_datetime'])


In [None]:
## what does visit detail source value point to back in Clarity?  Its either a CSN or an ADT event...
## Its neither.  Its OR events - confirmed by looking at OMOP SQL
## These return data but, its purely accidental
one_visit_detail_source_value = omop_visit_detail_df['visit_detail_source_value'].sample(1).values[0]
print(one_visit_detail_source_value)
clarity_engine = ucdripydbutils.get_engine_from_connect_dict(tdap_config['databases']['clarity']['secret'])
# NOTE(review): the source value is interpolated unquoted, so both queries
# assume it is numeric - confirm against the visit_detail data.
# dispose in a finally block so the Clarity engine is released even when one
# of the lookups fails
try:
    clarity_enc_sql = f"""select * from pat_enc where pat_enc_csn_id = {one_visit_detail_source_value}"""
    enc_df = pd.read_sql(clarity_enc_sql, clarity_engine)
    display(enc_df)
    adt_sql = f"""select * from clarity_adt where event_id = {one_visit_detail_source_value}"""
    adt_df = pd.read_sql(adt_sql, clarity_engine)
    display(adt_df)
finally:
    clarity_engine.dispose()
