# New command: vocab --update

The script will perform the following actions:

- Request a new vocabulary version from [Athena](https://athena.ohdsi.org). A list of vocabularies used is in config.yml.
- Download vocabulary zip file.
- Unzip vocabulary file retrieved from Athena.
- Update CPT4 codes using cpt.bat tool provided.

Optional step

- Import vocabulary tables into project database.

Requires an Athena account to download the vocabulary tables and api_key from the [NLM](https://uts.nlm.nih.gov) to update CPT4 codes.

TODO:

- get an account that everyone can access
- implement request credentials
- implement request api_key

# Download vocabulary

In [1]:
import os
import glob
import time
import yaml
from selenium import webdriver
from selenium.webdriver.remote.errorhandler import NoSuchElementException
from datetime import datetime as dt

from omop_etl.utils import timeitc
from omop_etl.load import Loader
from omop_etl.utils import search

In [2]:
config = r'\\share.ahc.ufl.edu\share$\DSS\IDR_Projects\COVID-19\OMOP\new_pipeline\config.yml'
config = r'\\share.ahc.ufl.edu\share$\DSS\IDR_Projects\Cancer_Center\omop_cc_etl\config.yml'
loader = Loader(config)
download_dir = os.path.join(loader.config.project_dir, 'vocabulary')

In [3]:
def athena_driver(username, password, chrome_path, headless=True, download_dir=None):
    options = webdriver.ChromeOptions()
    options.add_argument("--incognito")
    options.add_argument('window-size=1680x900')
    options.headless = headless
    if download_dir:
        options.add_experimental_option('prefs', {'download.default_directory': download_dir})
    driver = webdriver.Chrome(executable_path=chrome_path, options=options)
    driver.set_window_position(0,0)
    driver.set_window_size(1200, 1375)
    driver.set_page_load_timeout(15)
    
    driver.get("https://athena.ohdsi.org/")
    driver.find_element_by_xpath('/html/body/div/div/div[4]/div[2]/div/div[2]/div[2]/button').click()
    driver.find_element_by_xpath('/html/body/div/div/header/nav/div[3]/a').click()
    driver.find_element_by_xpath('/html/body/div/div/div[1]/div/div[2]/div/button').click()

    main_window = driver.current_window_handle
    for handle in driver.window_handles:
        if handle != main_window:
            popup = handle
            driver.switch_to.window(popup)

    driver.find_element_by_xpath('//*[@id="username"]').send_keys(username)
    driver.find_element_by_xpath('//*[@id="password"]').send_keys(password)
    driver.find_element_by_xpath('/html/body/div[1]/div/div/div[2]/div/form/section[3]/input[4]').click()
    driver.switch_to.window(driver.window_handles[0])

    return driver 

In [11]:
def request_new_vocabulary_file(driver, vocabularies):
    driver.get('https://athena.ohdsi.org/search-terms/start')
    driver.find_element_by_xpath('/html/body/div/div/header/nav/div[3]/div/a/div[2]').click()
    driver.find_element_by_xpath('//*[@id="app"]/div/header/nav/div[3]/div/div/div/a[1]').click()
    driver.find_element_by_xpath('//*[@id="app"]/div/header/nav/div[2]/a').click()
    
    # double click on select all to make sure no vocab is selected 
    driver.find_element_by_xpath('//*[@id="app"]/div/div[1]/div[2]/table/thead/tr/th[1]/label').click()
    driver.find_element_by_xpath('//*[@id="app"]/div/div[1]/div[2]/table/thead/tr/th[1]/label').click()

    # Select vocabularies
    checkboxes = driver.find_elements_by_css_selector("#app > div > div.at-vocabs > div.at-vocabularies > table > tbody > tr")
    
    for checkbox in checkboxes:
        cls = checkbox.find_element_by_class_name('at-vocabularies__code-td')
        if cls.text in vocabularies:
            checkbox.click()
            print(f'Vocabulary {cls.text} was selected')
    
    datestamp = dt.today().strftime('%m_%d_%Y')
    driver.find_element_by_xpath('//*[@id="app"]/div/div[1]/div[1]/button').click()
    driver.find_element_by_xpath('//*[@id="app"]/div/div[1]/div[3]/div[2]/div/div[2]/form/div/div[1]/input').send_keys(f'vocabulary_5x_{datestamp}')
    
    # Request vocab file
    driver.find_element_by_xpath('//*[@id="app"]/div/div[1]/div[3]/div[2]/div/div[2]/form/div/div[2]/button[1]').click()

    #return to main page
    driver.get('https://athena.ohdsi.org/search-terms/start')

In [23]:
def download_vocabulary_file(driver, get_last=True, vocabulary_name=None, archive=True):
    """Download most recent vocabulary file."""
    driver.get('https://athena.ohdsi.org/search-terms/start')
    driver.find_element_by_xpath('/html/body/div/div/header/nav/div[3]/div/a/div[2]').click()
    driver.find_element_by_xpath('//*[@id="app"]/div/header/nav/div[3]/div/div/div/a[1]').click()
    driver.find_element_by_xpath('//*[@id="app"]/div/header/nav/div[3]/div').click()
    buttons = driver.find_elements_by_class_name("react-sanfona-item")

    if get_last:
        button = buttons[0]
        tool = button.find_element_by_class_name('ac-toolbar')
        status = tool.text.split()[-1]

        if status == 'DOWNLOADSHAREARCHIVE': 
            driver.find_element_by_xpath('//*[@id="app"]/div/div[1]/div[2]/div[1]/div[1]/ul/li/div/button[1]').click()
            if archive:
                time.sleep(30)
                driver.find_element_by_xpath('//*[@id="app"]/div/div[1]/div[2]/div[1]/div[1]/ul/li/div/button[3]').click()

        elif status == 'ARCHIVEDRESTORE':
            driver.find_element_by_xpath('//*[@id="app"]/div/div[1]/div[2]/div[1]/div[1]/ul/li/div/button').click()
            time.sleep(2)
            tool = button.find_element_by_class_name('ac-toolbar')
            status = tool.text.split()[-1]

        while status == 'PENDING':
            print('Restoring vocabulary, please wait.', end='\r')
            time.sleep(60)
            tool = button.find_element_by_class_name('ac-toolbar')
            status = tool.text.split()[-1]

            if status == 'DOWNLOADSHAREARCHIVE':
                time.sleep(30)
                driver.find_element_by_xpath('//*[@id="app"]/div/div[1]/div[2]/div[1]/div[1]/ul/li/div/button[1]').click()
                print("Download started")
                if archive:
                    time.sleep(30)
                    driver.find_element_by_xpath('//*[@id="app"]/div/div[1]/div[2]/div[1]/div[1]/ul/li/div/button[3]').click() 

In [7]:
# Enter your Athena credentials to login
USERNAME = "yankuic@ufl.edu"
PASSWORD = "7DSrMx7JNTHaXEM"
path = r'Z:\OMOP_CDM'
athena = athena_driver(USERNAME, PASSWORD, loader.chrome_path, headless=True, download_dir=download_dir)

In [12]:
request_new_vocabulary_file(athena, loader.config.vocabularies)

Vocabulary SNOMED was selected
Vocabulary ICD9CM was selected
Vocabulary ICD9Proc was selected
Vocabulary CPT4 was selected
Vocabulary HCPCS was selected
Vocabulary LOINC was selected
Vocabulary NDFRT was selected
Vocabulary RxNorm was selected
Vocabulary NDC was selected
Vocabulary Gender was selected
Vocabulary Race was selected
Vocabulary CMS Place of Service was selected
Vocabulary ATC was selected
Vocabulary VA Product was selected
Vocabulary VA Class was selected
Vocabulary ICD10 was selected
Vocabulary ICD10PCS was selected
Vocabulary DRG was selected
Vocabulary MDC was selected
Vocabulary APC was selected
Vocabulary Revenue Code was selected
Vocabulary Ethnicity was selected
Vocabulary MeSH was selected
Vocabulary NUCC was selected
Vocabulary Medicare Specialty was selected
Vocabulary SPL was selected
Vocabulary GCN_SEQNO was selected
Vocabulary OPCS4 was selected
Vocabulary HES Specialty was selected
Vocabulary PCORNet was selected
Vocabulary Currency was selected
Vocabulary I

In [22]:
# request_new_vocabulary_file(athena)
download_vocabulary_file(athena)
# athena.find_element_by_xpath('//*[@id="app"]/div/div[1]/div[2]/div[2]/div[1]/ul/li/div/button').click()

Download started


In [34]:
# method to get the downloaded file name
def get_downloaded_filename():
     zip_list = glob.glob(download_dir + '/*.zip')
     last_zip = max(zip_list, key=os.path.getctime)
     return last_zip

In [26]:
# athena.execute_script("return document.querySelector('downloads-manager').shadowRoot.querySelector('#downloadsList downloads-item').shadowRoot.querySelector('#progress').value")
# athena.execute_script("return document.querySelector('downloads-manager').shadowRoot.querySelector('#downloadsList downloads-item').shadowRoot.querySelector('div#content  #file-link').text")

In [35]:
filename = get_downloaded_filename()
filename

'\\\\share.ahc.ufl.edu\\share$\\DSS\\IDR_Projects\\COVID-19\\OMOP\\new_pipeline\\vocabulary\\vocabulary_download_v5_{eb084edd-3ac2-4259-9e48-726252e24edd}_1630083285143.zip'

In [36]:
athena.close()

# Unzip vocabulary files

In [37]:
import zipfile

with zipfile.ZipFile(filename) as z:
    csv = [f.filename for f in z.filelist if search('csv', f.filename)]
    not_csv = [f.filename for f in z.filelist if f.filename not in csv]
    z.extractall(download_dir)
    print('Vocabulary files were extracted.')

Vocabulary files were extracted.


# Update CPT4 codes

In [38]:
# requires api_key from https://uts.nlm.nih.gov
api_key = '189a8df1-7922-4d9f-a1c3-80a8db7c5397'
os.system(f'pushd {download_dir} && cpt.bat {api_key} && popd')

0

In [39]:
# Clean up directory
os.remove(os.path.join(download_dir, filename))
[os.remove(os.path.join(download_dir, f)) for f in not_csv]

[None, None, None, None]

# Import vocabulary tables into project database

In [3]:
import os
from omop_etl.io import import_csv

schema = 'xref'
# table = 'concept'
database = 'dws_omop'
server='dwsrsrch01.***REMOVED***.edu'

tables = ['concept', 'concept_ancestor','concept_class', 'concept_relationship', 'concept_synonym',
          'domain', 'drug_strength', 'relationship', 'vocabulary', 'source_to_concept_map']

for table in tables:
    filepath = os.path.join(download_dir, f'{table.upper()}.csv')
    print(f'Processing table {table}, please wait')
    print(import_csv(filepath, table, 1e6, schema, server, database, keep_default_na=False, sep='\t'))

Processing table concept, please wait
Importing data into table concept finished in 00:06:10
6567004 rows affected
Processing table concept_ancestor, please wait
Importing data into table concept_ancestor finished in 00:39:44
73942777 rows affected
Processing table concept_class, please wait
Importing data into table concept_class finished in 00:00:00
415 rows affected
Processing table concept_relationship, please wait
Importing data into table concept_relationship finished in 00:30:07
47754428 rows affected
Processing table concept_synonym, please wait
Importing data into table concept_synonym finished in 00:01:26
2176340 rows affected
Processing table domain, please wait
Importing data into table domain finished in 00:00:00
48 rows affected
Processing table drug_strength, please wait
Importing data into table drug_strength finished in 00:02:06
2840468 rows affected
Processing table relationship, please wait
Importing data into table relationship finished in 00:00:00
622 rows affected

# Legacy code

Code used for experimentation prior implementing into omop_etl app. Keeping for future reference.

In [62]:
# import glob, os
# path = r'Z:\OMOP_CDM'
# csv_list = glob.glob(path + '/*.zip')
# last_zip = max(csv_list, key=os.path.getctime)
# last_zip

In [12]:
## athena.switch_to.window(athena.window_handles[0])
# athena.get('https://athena.ohdsi.org/search-terms/start')
# athena.find_element_by_xpath('/html/body/div/div/header/nav/div[3]/div/a/div[2]').click()
# athena.find_element_by_xpath('//*[@id="app"]/div/header/nav/div[3]/div/div/div/a[1]').click()
# athena.find_element_by_xpath('//*[@id="app"]/div/header/nav/div[3]/div').click()
# buttons = athena.find_elements_by_class_name("react-sanfona-item")
# last = True
# archive = False
# if last:
#     button = buttons[0]
#     tool = button.find_element_by_class_name('ac-toolbar')
#     status = tool.text.split()[-1]
#     if status == 'DOWNLOADSHAREARCHIVE': 
#         athena.find_element_by_xpath('//*[@id="app"]/div/div[1]/div[2]/div[1]/div[1]/ul/li/div/button[1]').click()
#         if archive:
#             time.sleep(30)
#             athena.find_element_by_xpath('//*[@id="app"]/div/div[1]/div[2]/div[1]/div[1]/ul/li/div/button[3]').click()
#     elif status == 'ARCHIVEDRESTORE':
#         athena.find_element_by_xpath('//*[@id="app"]/div/div[1]/div[2]/div[1]/div[1]/ul/li/div/button').click()
#         time.sleep(2)
#         tool = button.find_element_by_class_name('ac-toolbar')
#         status = tool.text.split()[-1]
#         while status == 'PENDING':
#             print('Restoring vocabulary, please wait.', end='\r')
#             time.sleep(60)
#             tool = button.find_element_by_class_name('ac-toolbar')
#             status = tool.text.split()[-1]
#             if status == 'DOWNLOADSHAREARCHIVE':
#                 time.sleep(30)
#                 athena.find_element_by_xpath('//*[@id="app"]/div/div[1]/div[2]/div[1]/div[1]/ul/li/div/button[1]').click()
#                 print("Download started")
#                 if archive:
#                     time.sleep(30)
#                     athena.find_element_by_xpath('//*[@id="app"]/div/div[1]/div[2]/div[1]/div[1]/ul/li/div/button[3]').click()                
#     elif status == 'PENDING':
#         print('Restoring vocabulary, try again later.')

Restoring vocabulary, try again later.


In [7]:
# import datetime
# import time
# import math
# from pydma.utils import timeit_context
# import pandas as pd
# import numpy as np
# import numpy.ma as ma
# from turbodbc import connect
# from turbodbc.exceptions import DatabaseError, InterfaceError
# from pydma.databases import OneFLDb

# database='DWS_CC_OMOP'
# omop = OneFLDb('edw', database=database)
# today = datetime.datetime.today()
# date_flag = today.strftime('%Y%m%d')
# schema = 'xref'

# t_connection = connect(driver='{SQL Server}', server='edw.***REMOVED***.edu', 
#               database=database, trusted_connection='yes')

# tables = ['concept']#, 'concept_ancestor','concept_class', 'concept_relationship', 'concept_synonym',
# #           'domain', 'drug_strength', 'relationship', 'vocabulary']
# df_debug = ''

# for table in tables:
#     # Truncate table
#     cursor = t_connection.cursor()
    
#     sql = """\
#         truncate table {}.{}
#         """.format(schema, table)
    
#     # Use raw_connection to access data types.
#     raw_connection = omop.engine.raw_connection()
#     omop_cursor = raw_connection.cursor()
#     omop_cursor.execute(sql)
#     omop_cursor.commit()
          
#     # Retrieve sql data types 
#     omop_cursor.execute(f'select top 1 * from {schema}.{table}')
#     dtypes = {t[0]:t[1] for t in omop_cursor.description}
# #     nullable = [t[0] for t in omop_cursor.description if t[-1]]
#     omop_cursor.close()
#     raw_connection.close()
    
#     # Use turbodbc supported data types
#     for t in dtypes.keys():
#         if dtypes[t] is int:
#             dtypes[t] = np.int64
    
#     if table == 'drug_strength':
#         chunks = pd.read_csv(r'z:\OMOP_CDM\Vocabulary_v5x\{}.csv'.format(table.upper()), #keep_default_na=False, 
#                              chunksize=1000000, sep='\t', dtype=str, low_memory=False)

#     else:
#         chunks = pd.read_csv(r'z:\OMOP_CDM\Vocabulary_v5x\{}.csv'.format(table.upper()), keep_default_na=False,
#                          chunksize=1000000, dtype=dtypes, sep='\t')
    
#     insert_query = """
#             SET ANSI_WARNINGS OFF
            
#             INSERT INTO {0}.{1} ({2})
#             VALUES ({3})
            
#             SET ANSI_WARNINGS ON
#         """
    
#     remove_999 = """
#             update {0}.{1}
#             set {2} = NULL
#             where {2} = -999
#     """
    
#     count = 0
#     rows_proc = 0
#     with timeit_context(f'Processing table {table}'):
#         print(f'Loading table {table} into schema {schema}. Please wait.')
#         for chunk in chunks:
            
#             if table == 'drug_strength':
                
#                 for col in chunk.columns:
#                     data_type = dtypes[col]
#                     if data_type is np.int64:
#                         chunk[col] = chunk[col].fillna(-999).astype(data_type)
#                     elif data_type is str:
#                         chunk[col] = chunk[col].fillna('').astype(data_type)
                
#             # Replace all NaN for None - turbodbc does not support NaN
#             chunk = chunk.where((pd.notnull(chunk)), None)

#             if count == 0:
#                 columns = ','.join(chunk.columns)
#                 n_cols = len(chunk.columns)
#                 placehl = ','.join(['?']*n_cols)
#                 count=+1

#             insert_query = insert_query.format(schema, table, columns, placehl)

#             try:
#                 cursor.executemanycolumns(insert_query, [np.ascontiguousarray(chunk[col].values) for col in chunk.columns])
#                 rows_proc = rows_proc + chunk.shape[0]
#                 t_connection.commit()
                                
#             except (DatabaseError, InterfaceError, ValueError) as e:
#                 df_debug = chunk
#                 t_connection.rollback()
#                 t_connection.close()
#                 raise e
    
#     if table == 'drug_strength':
        
#         for col in chunk.columns:
#             data_type = dtypes[col]
#             if data_type is np.int64:
#                 print(f'Cleaning up column {col}')
#                 cursor.execute(remove_999.format(schema, table, col))
#                 t_connection.commit()

#     cursor.close()            
#     print(f'Import complete: {rows_proc} rows processed')

# t_connection.close()

In [1]:
# _BOOLEAN_CODE = 0
# _INTEGER_CODE = 10
# _FLOATING_POINT_CODE = 20
# _STRING_CODE = 30
# _UNICODE_CODE = 31
# _TIMESTAMP_CODE = 40
# _DATE_CODE = 41
    
# import datetime
# import time
# import math
# from omop_etl.utils import timeitc
# import pandas as pd
# import numpy as np
# from turbodbc import connect, make_options
# from turbodbc.exceptions import DatabaseError, InterfaceError
# from pydma.databases import OneFLDb

# schema = 'testing'
# table = 'drug_strength'
# database = 'dws_omop'
# omop = OneFLDb('edw', database=database)

# tbdbc_dtypes = {10: int, 20:np.int64, 30:str}

# options = make_options(use_async_io=True,
#                        prefer_unicode=True,
#                        fetch_wchar_as_char=True)

# connection = connect(driver='{SQL Server}', server='edw.***REMOVED***.edu', 
#                      database=database, trusted_connection='yes', turbodbc_options=options)

# #get table data types
# cursor = connection.cursor()
# cursor.execute(f'select top 1 * from {schema}.{table}')
# dtypes = {t[0]:t[1] for t in cursor.description}
# # cursor.close()  

# for t in dtypes.keys():
#     if dtypes[t] == 10:
#         dtypes[t] = np.int64
#     elif dtypes[t] == 20:
#         dtypes[t] = float
#     else:
#         dtypes[t] = str

# try:
#     next(pd.read_csv(f'z:/OMOP_CDM/Vocabulary_v5x/{table.upper()}.csv', chunksize=1000, dtype=dtypes, sep='\t'))
    
#     chunks = pd.read_csv(f'z:/OMOP_CDM/Vocabulary_v5x/{table.upper()}.csv', keep_default_na=False,
#                      chunksize=100000, dtype=dtypes, sep='\t')
# except ValueError as e:
#     chunks = pd.read_csv(f'z:/OMOP_CDM/Vocabulary_v5x/{table.upper()}.csv', chunksize=100000, keep_default_na=False, dtype=str, sep='\t')


# count = 0
# for chunk in chunks:
#     if count == 0:
#         columns = ','.join(chunk.columns)
#         n_cols = len(chunk.columns)
#         placehl = ','.join(['?']*n_cols)
#         count=+1

#     else: break
        
# chunk = chunk.where(pd.notnull(chunk.replace('', np.nan)), None)

# insert_query = f"""
#         set ansi_warnings off; 
#         insert into {schema}.{table} ({columns})
#         values ({placehl})
#         set ansi_warnings on
#     """

# truncate_str = f'truncate table {schema}.{table}'
# cursor.execute(truncate_str)
# connection.commit()

# with timeitc(f'Importing {table}'):
#     cursor.executemanycolumns(insert_query, [np.ascontiguousarray(chunk[col].values) for col in chunk.columns])
#     connection.commit()

# connection.close()

Unable to import optional dependencies:
selenium: No module named 'selenium'
Importing drug_strength finished in 00:00:05


In [41]:
# loinc = pd.read_csv('/OMOP/omop_etl/xref/loinc.csv', sep='\t')
# source_to_concept = loinc = pd.read_csv('/OMOP/omop_etl/xref/source_to_concept_map.csv', sep='\t')

In [42]:
# with omop.engine.connect() as con:
#     loinc.to_sql('loinc', con, schema='xref', index=False, if_exists='append')
#     source_to_concept.to_sql('source_to_concept_map', con, schema='xref', index=False, if_exists='append')

In [4]:
# truncate_str = f'truncate table {schema}.{table}'
# cursor.execute(truncate_str)
# connection.commit()
    
# with timeitc(f'Importing {table}'):
#     with omop.engine.connect() as con:
#         chunk.to_sql(table, con, schema='testing', index=False, if_exists='append')