# Update PIONEER custom vocabularies

Follow the steps in this notebook to load or update the PIONEER custom vocabularies.

In [None]:
import getpass
from pathlib import Path
from urllib.parse import quote

import pandas as pd
import sqlalchemy as sa
from sqlalchemy import create_engine, text

In [None]:
# Location of the pioneer_custom_vocabulary directory inside a local clone of
# the ohdsi-omop-pioneer repository; adjust the base path to your checkout.
pio_vocab_dir = Path('/home/jovyan') / 'ohdsi-omop-pioneer' / 'pioneer_custom_vocabulary'

## Schema names

In [None]:
# (target schema holding the OMOP vocabulary tables,
#  scratch schema used to stage the new CSV files -- dropped again at the end)
VOCAB_SCHEMA, TEMP_SCHEMA = 'vocab', 'temp_schema'

## Connect to the db

In [None]:
server = 'localhost'
port = '5432'
db = 'ohdsi'
# Percent-encode the credentials before putting them into the URL: characters
# such as '@', ':' or '/' in a password would otherwise corrupt the URL.
# SQLAlchemy decodes them again when it parses the URL.
_user = quote(input("User:"), safe='')
_password = quote(getpass.getpass("Password:"), safe='')
engine = create_engine(f'postgresql://{_user}:{_password}@{server}:{port}/{db}')
# Don't leave the credentials lying around in the notebook namespace.
del _user, _password

In [None]:
# List the schemas visible through this connection (quick check that the
# login works and that VOCAB_SCHEMA exists).
inspector = sa.inspect(engine)
inspector.get_schema_names()

## Load new custom vocab files into a temporary schema

In [None]:
# engine.begin() commits when the block exits without error; a plain
# connect() does not autocommit DDL on SQLAlchemy 2.0, and raw SQL strings
# must be wrapped in text(). IF NOT EXISTS makes the cell safe to re-run
# after an interrupted session.
with engine.begin() as con:
    con.execute(text(f'CREATE SCHEMA IF NOT EXISTS {TEMP_SCHEMA};'))

In [None]:
# Custom vocab file paths
vocabulary_file = pio_vocab_dir / 'pioneer_vocabulary.csv'
concept_class_file = pio_vocab_dir / 'pioneer_concept_class.csv'
concept_file = pio_vocab_dir / 'pioneer_concepts.csv'
date_cols = ['valid_start_date', 'valid_end_date']


def _nullable_concept_ids(df: pd.DataFrame) -> pd.DataFrame:
    """Return *df* with every '*concept_id' column cast to nullable 'Int64'.

    The signed 'Int64' dtype is used rather than 'UInt64' because recent
    pandas versions raise ValueError ("Unsigned 64 bit integer datatype is
    not supported") when DataFrame.to_sql writes unsigned 64-bit columns.
    """
    return df.astype({c: 'Int64' for c in df.columns if c.endswith('concept_id')})


# Currently loaded custom vocabulary records as dfs
query = f"SELECT * FROM {VOCAB_SCHEMA}.vocabulary WHERE vocabulary_concept_id = 0"
current_vocab_df = _nullable_concept_ids(pd.read_sql(sql=query, con=engine))
query = f"SELECT * FROM {VOCAB_SCHEMA}.concept_class WHERE concept_class_concept_id = 0"
current_concept_class_df = _nullable_concept_ids(pd.read_sql(sql=query, con=engine))
query = f"SELECT * FROM {VOCAB_SCHEMA}.concept WHERE concept_id >= 2000000000"
current_concept_df = _nullable_concept_ids(
    pd.read_sql(sql=query, con=engine, parse_dates=date_cols)
)

# Read new custom vocab files as dfs, reusing the dtypes of the loaded records.
# The columns are then put in the order of the target table: the replacement
# queries copy rows with `INSERT ... SELECT *`, which matches columns by
# position, not by name.
vocab_df = pd.read_csv(vocabulary_file, sep=',', dtype=current_vocab_df.dtypes.to_dict())
vocab_df = vocab_df[current_vocab_df.columns]
concept_class_df = pd.read_csv(
    concept_class_file, sep=',', dtype=current_concept_class_df.dtypes.to_dict()
)
concept_class_df = concept_class_df[current_concept_class_df.columns]
# Date columns are parsed via parse_dates, so leave them out of the dtype map.
dtypes = {k: v for k, v in current_concept_df.dtypes.to_dict().items() if k not in date_cols}
concept_df = pd.read_csv(concept_file, sep=',', dtype=dtypes, parse_dates=date_cols)
concept_df = concept_df[current_concept_df.columns]

# Load the vocab files into the newly created schema
vocab_df.to_sql(name='vocabulary', con=engine, schema=TEMP_SCHEMA, index=False, if_exists='replace')
concept_class_df.to_sql(name='concept_class', con=engine, schema=TEMP_SCHEMA, index=False, if_exists='replace')
concept_df.to_sql(name='concept', con=engine, schema=TEMP_SCHEMA, index=False, if_exists='replace')

## Check for new custom vocabularies and replace the current ones

Each replacement is executed in a single transaction: if any of its statements fails, the transaction is rolled back automatically; otherwise it is committed.

In [None]:
def df_a_is_subset_of_df_b(a: pd.DataFrame, b: pd.DataFrame) -> bool:
    """Return True if every row of *a* also occurs in *b*.

    Rows are compared on the columns the two frames have in common (an inner
    merge on those columns). *b* is first reduced to its distinct rows over
    the shared columns. Otherwise a duplicated row in *b* would match the
    same row of *a* several times, and the inflated merge count could hide
    rows of *a* that have no match at all (e.g. a=[1, 2], b=[1, 1]).
    An empty *a* is trivially a subset.
    """
    shared_cols = [c for c in a.columns if c in b.columns]
    b_rows = b[shared_cols].drop_duplicates()
    # Each row of `a` now matches at most one row of `b_rows`, so the merge
    # keeps all rows of `a` exactly when each of them was found in `b`.
    return len(a.merge(b_rows)) == len(a)

### vocabulary

In [None]:
# The file is "up to date" when all of its rows are already loaded; any row
# missing from the vocab schema means there is something new to load.
vocab_up_to_date = df_a_is_subset_of_df_b(a=vocab_df, b=current_vocab_df)
new_records_available = not vocab_up_to_date
print(f'New vocabulary table records available: {new_records_available}')

In [None]:
# Replace the custom vocabularies present in the new file: delete the
# matching custom rows, then insert the staged ones. Both statements run in
# one transaction (engine.begin() commits on success, rolls back on error).
# They are issued separately instead of as one data-modifying CTE because
# PostgreSQL gives no ordering guarantee between the sub-statements of such a
# CTE, so the INSERT could collide with a primary-key/unique row the DELETE
# has not removed yet (if such a constraint exists on the table).
delete_query = f"""
DELETE FROM {VOCAB_SCHEMA}.vocabulary
WHERE vocabulary_concept_id = 0
AND vocabulary_id IN
    (SELECT vocabulary_id FROM {TEMP_SCHEMA}.vocabulary);
"""
insert_query = f"""
INSERT INTO {VOCAB_SCHEMA}.vocabulary
SELECT * FROM {TEMP_SCHEMA}.vocabulary;
"""
with engine.begin() as con:
    # text() is required for raw SQL strings on SQLAlchemy 2.0.
    con.execute(text(delete_query))
    con.execute(text(insert_query))

### concept_class

In [None]:
# Anything in the CSV that is not yet loaded counts as a new record.
all_loaded = df_a_is_subset_of_df_b(a=concept_class_df, b=current_concept_class_df)
new_records_available = not all_loaded
print(f'New concept_class table records available: {new_records_available}')

In [None]:
# Replace the custom concept classes present in the new file. As for the
# vocabulary and concept tables, only the classes that are being reloaded
# are deleted. The previous query removed EVERY class with
# concept_class_concept_id = 0, which would also wipe custom classes that do
# not come from this file.
# DELETE and INSERT run as separate statements inside one transaction
# (engine.begin() commits on success, rolls back on error). A data-modifying
# CTE gives no ordering guarantee between its sub-statements, so its INSERT
# could collide with a primary-key/unique row not yet deleted (if such a
# constraint exists on the table).
delete_query = f"""
DELETE FROM {VOCAB_SCHEMA}.concept_class
WHERE concept_class_concept_id = 0
AND concept_class_id IN
    (SELECT concept_class_id FROM {TEMP_SCHEMA}.concept_class);
"""
insert_query = f"""
INSERT INTO {VOCAB_SCHEMA}.concept_class
SELECT * FROM {TEMP_SCHEMA}.concept_class;
"""
with engine.begin() as con:
    # text() is required for raw SQL strings on SQLAlchemy 2.0.
    con.execute(text(delete_query))
    con.execute(text(insert_query))

### concept

In [None]:
# Rows of the new concept file that are missing from the loaded custom
# concepts (concept_id >= 2000000000) mean there is something to load.
concepts_already_loaded = df_a_is_subset_of_df_b(a=concept_df, b=current_concept_df)
new_records_available = not concepts_already_loaded
print(f'New concept table records available: {new_records_available}')

In [None]:
# Replace the custom concepts present in the new file: delete the concepts
# whose concept_id is being reloaded, then insert the staged ones. Both
# statements run in one transaction (engine.begin() commits on success,
# rolls back on error). They are issued separately rather than as one
# data-modifying CTE: PostgreSQL gives no ordering guarantee between the
# sub-statements of such a CTE, so the INSERT could collide with a
# primary-key/unique row not yet deleted (if such a constraint exists).
delete_query = f"""
DELETE FROM {VOCAB_SCHEMA}.concept
WHERE concept_id IN
    (SELECT concept_id FROM {TEMP_SCHEMA}.concept);
"""
insert_query = f"""
INSERT INTO {VOCAB_SCHEMA}.concept
SELECT * FROM {TEMP_SCHEMA}.concept;
"""
with engine.begin() as con:
    # text() is required for raw SQL strings on SQLAlchemy 2.0.
    con.execute(text(delete_query))
    con.execute(text(insert_query))

## Drop the temporary schema

In [None]:
# engine.begin() commits the DDL when the block exits without error (plain
# connect() does not autocommit on SQLAlchemy 2.0), and text() is required
# for raw SQL strings there. IF EXISTS keeps the cell safe to re-run.
with engine.begin() as con:
    con.execute(text(f'DROP SCHEMA IF EXISTS {TEMP_SCHEMA} CASCADE;'))