In [5]:
import os
import sqlalchemy
import pandas as pd
import numpy as np
import argparse
from warnings import warn

In [15]:
# ADD LOCAL PYTHON FILES
import pydecovid
from pydecovid.dashutil.tableutil import *
from pydecovid.queries import qry_table1
from pydecovid.db import tabledefs

In [10]:
from sqlalchemy import select, cast, Table

### Parse user / command line options

In [11]:
def parse_args(use_args=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("-u", "--user", help="Database username", default="alexbird")
    parser.add_argument("-p", "--password", help="Database password", default="")
    parser.add_argument("-d", "--dialect", help="Database dialect", default="postgresql")
    parser.add_argument("--url", help="Database URL", default="localhost")
    parser.add_argument("--driver", help="Database driver (e.g. pyodbc)", default="")
    parser.add_argument("--db", help="Database name", default="synpuf1k")
    parser.add_argument("--dsn", help="Data source name", default="")
    parser.add_argument("--trusted", help="Trusted connection", action='store_true')
    
    if use_args:
        args = parser.parse_args(use_args.split(" ")).__dict__
    else:
        args = parser.parse_args().__dict__
    
    assert not args['dialect'].lower() == "sqlite", "SQLite not supported"
    uidpwd = "{user:s}:{password}".format(**args) if not args['trusted'] else ""
    args['driver'] = "+" + args['driver'] if len(args['driver']) > 0 else ""
    rdbms = "{dialect:s}{driver:s}".format(**args)
    url = "@" + args['url'] if len(args['dsn']) == 0 else args['dsn']
    db = "/" + args['db'] if len(args['db']) > 0 else args['db']
    
    # extract connection string
    connstr = f"{rdbms:s}://{uidpwd:s}{url:s}{db:s}"
    args = {k:tmp[k] for k in args.keys() - ["dialect", "user", "password", "db", "driver", 
                                             "db", "dsn", "trusted", "url"]}
    args["connstr"] = connstr
    return args

In [12]:
user_args = parse_args("-d postgresql")

### Connect to DB and load from tables

`Pandas` has a direct `read_sql` function, which would be absolutely fine if we were not database agnostic.

1. Achilles is RDBMS agnostic, and it is reasonable to keep our work RDBMS agnostic if possible.
2. `sqlalchemy` makes it easy to remain database agnostic.
3. The codebase was initially developed locally on Postgres rather than SQL Server.

In [13]:
engine = sqlalchemy.create_engine(user_args['connstr'])
metadata = sqlalchemy.MetaData()

In [16]:
# SAFETY: keep results tables schemas in `tablerefs` file and load explicitly
# rather than using schema from existing connection
#
# ==> The OMOP schema is not likely to change, but Achilles might, or some
# mistake might be made during the creation / running of the tool. We prefer
# the code not to fail silently
tbl_results, tbl_results_dist = tabledefs.results_tables(metadata);
assert tbl_results.exists(engine), "(Standard) results table not found in the database."
assert tbl_results_dist.exists(engine), "(Distribution) results table not found in the database."

In [17]:
# Load CONCEPT metadata from database (see also metadata.reflect(engine))
tbl_concept = Table('concept', metadata, autoload=True, autoload_with=engine)

### Joining analysis results to concepts

Now, this is much more ugly than I had anticipated. The possible values in the OLAP-like columns (`stratum_` 1 through 5) can include `concept_id`s, `NULL`s and strings. They are currently stored in `VARCHAR` format, and filtering for numeric values is technically possible in SQL server, but not in all dialects.

Since we need to do 5 joins to the same table (one for each stratum), and only a limited subset of the large `Concept` table will be used, I propose that this is done by Python rather than by the RDBMS. I do not believe that we will be losing much in the way of performance in doing this.

### Annotate non-concept rows in results table

In [18]:
achilles_schema = pd.read_csv("../../Achilles/inst/csv/achilles/achilles_analysis_details.csv")
achilles_schema = achilles_schema[["ANALYSIS_ID", "STRATUM_1_NAME", "STRATUM_2_NAME",
                                  "STRATUM_3_NAME", "STRATUM_4_NAME", "STRATUM_5_NAME", "ANALYSIS_NAME"]]
achilles_schema['STRATUM_5_NAME'] = achilles_schema['STRATUM_5_NAME'].astype(object)

In [19]:
def ref_age(x):
    return (x == "year_of_birth") | (x == "age") | (x == "age_decile") | (x == "age decile")
def ref_datetime(x):
    return (x == 'calendar_month') | (x == 'calendar month') | (x == 'calendar year')
def ref_periods(x):
    return (x == 'payer plan period length 30d increments') | \
           (x == 'Observation period length 30d increments') | \
           (x == 'number of observation periods') | \
           (x == 'number of payer plan periods')
def ref_location(x):
    return (x == '3-digit zip') | (x == 'state')
def ref_table(x):
    return (x == 'table name') | (x == 'table_name')
def ref_other(x):
    return (x == 'source_value')
def bad_concept(x):
    x = x.str.strip()
    return ref_age(x) | ref_datetime(x) | ref_location(x) | ref_periods(x) | ref_table(x) | \
           ref_other(x) | x.isna()

In [20]:
not_concepts = [bad_concept(achilles_schema["STRATUM_{:d}_NAME".format(i+1)]) for i in range(5)]
not_concepts = [pd.DataFrame({'analysis_id': achilles_schema.ANALYSIS_ID, 
                              'bad_{:d}'.format(i+1): x}) for i,x in enumerate(not_concepts)]


In [21]:
# analysis_id > 2000000 are Achilles statistics (irrelevant)
q = tbl_results.select().where(tbl_results.c.analysis_id < 2000000)
df_result = pd.read_sql(q, engine)

In [22]:
# determine stratum_{:d} values which are not concept_ids
for i in range(5):
    # determine bad concept based on definition in CSV
    df_result = pd.merge(df_result, not_concepts[i], how="left", on="analysis_id")
    
    # determine bad_concept based on value in results stratum
    s, b = "stratum_{:d}".format(i+1), "bad_{:d}".format(i+1)
    df_result[b] = df_result[b] | df_result[s].isna() | (df_result[s].str.len() == 0)

In [23]:
# Check that all non-numeric strings have been labelled as "non-concept_id".
def isdigit_or_none(x):
    return x.str.isdigit() | x.isna()

for i in range(1,6):
    assert (~isdigit_or_none(df_result["stratum_{:d}".format(i)]) & \
            ~df_result["bad_{:d}".format(i)]).sum() == 0, "Achilles results table has non-numeric " + \
           "codes that have *NOT* been removed by pre-processing (stratum_{:d}).".format(i)

### Merge to concept table where relevant

In [24]:
# Get a list of all numeric concept_ids from *ALL* strata
def is_concept_id(x): 
    return x is not None and x.isdigit()

all_concepts = np.hstack([df_result['stratum_{:d}'.format(i)][
    ~df0['bad_{:d}'.format(i)]].unique() for i in range(1,6)])
all_concepts = list(filter(is_concept_id, all_concepts))
all_concepts = [int(x) for x in all_concepts]
all_concepts = np.unique(np.sort(all_concepts))

NameError: name 'df0' is not defined

In [309]:
# Make sure not too large
len(all_concepts) >= 20000 and warn("all_concepts list is large: may run very slowly.");
assert len(all_concepts) <= 100000, "all_concepts list is over 100,000 concept IDs. The query is too large."

In [310]:
# Retrieve all the relevant concepts
val_concepts = valuesstmt.values(
    [
        sqlalchemy.column("concept_id", sqlalchemy.INTEGER),
    ],
    *[(x,) for x in all_concepts],
    alias_name="myconcepts",
)

tbl_conceptjoin = tbl_concept.join(val_concepts, 
                                   tbl_concept.c.concept_id == val_concepts.c.concept_id)

tc = tbl_concept.c  # shorthand
df_concept = pd.read_sql(select([tc.concept_id,
                                 tc.concept_name,
                                 tc.domain_id])\
                         .select_from(tbl_conceptjoin).\
                         where(
                                (tc.standard_concept=='S') and
                                (tc.invalid_reason == None)
                              ), 
                         engine)
del tc              # delete shorthand

In [311]:
assert df_concept.concept_id.nunique() == df_concept.shape[0], \
    "Retrieved concept table does not have a unique primary key."

df_concept.index = df_concept.concept_id
df_concept = df_concept.drop(columns="concept_id")

In [312]:
for i in range(1,6):
    c = "stratum_{:d}".format(i)
    is_ok = ~(df_result["bad_{:d}".format(i)])
    s = df_result[c][is_ok].astype(int)
    df = pd.DataFrame(s)
    decoded = pd.merge(df, df_concept, how="left", left_on=c, right_index=True)
    decoded.loc[decoded["concept_name"].isna(), "concept_name"] = "No matching concept"
    df_result.loc[is_ok, c] = decoded["concept_name"]
    
# drop non-concept flags
df_result = df_result.drop(columns=list(filter(lambda x: x[:3] == 'bad', df_result.columns)))

In [329]:
df_result.to_feather("./data/achilles_results.feather")

In [25]:
import os
import pandas as pd

In [26]:
# --------- DATA ------------------------------------------------------
dir_data = './data'
file_achilles_res = os.path.join(dir_data, 'achilles_results.feather')
assert os.path.isfile(file_achilles_res), f"File {file_achilles_res:s} not found."

df = pd.read_feather(file_achilles_res)

In [28]:
# __________________QUERY (TABLE 1)_____________________________________
# --------- ACHILLES GENERIC -------------------------------------------
def _extract_achilles(df, analysis_id, num_strata):
    strata = ['stratum_{:d}'.format(i+1) for i in range(num_strata)]
    return df.loc[df.analysis_id == analysis_id, [*strata, 'count_value']]

def _generate_achilles_simple(analysis_id, num_strata):
    def achilles_simple(df):
        qry = _extract_achilles(df, analysis_id, num_strata)
        for i in range(num_strata):
            qry[f'stratum_{i+1}'] = qry[f'stratum_{i+1}'].str.capitalize()
        return qry.reset_index(drop=True)
    return achilles_simple


# --------- ACHILLES 'QUERIES' -----------------------------------------
achilles_gender = _generate_achilles_simple(2, 1)
achilles_race = _generate_achilles_simple(4, 1)
achilles_ethnicity = _generate_achilles_simple(5, 1)
achilles_race_ethnicity = _generate_achilles_simple(12, 2)

# Persons by Age
# ==> Note will sometimes need to fill in zeros (o.w. missing) on-the-fly in Dash.
def achilles_age(df, aggregate_to=1):
    c_year = 2020 # Date at time of study, not date.today().strftime("%Y")
    qry = qry = _extract_achilles(df, 3, 1)
    qry['stratum_1'] = qry['stratum_1'].astype(int) 
    qry['stratum_1'] = 2020 - qry['stratum_1']
    return qry.reset_index(drop=True)


# --------- UTILS ------------------------------------------------------

def aggregate_age(df, age_column, step, label=False, agg_below=None, sort="asc"):
    df[age_column] = (df[age_column] // step) * step
    if agg_below is not None:
        lt_cutoff = df[age_column] < agg_below
        df.loc[lt_cutoff, age_column] = -1  # to allow sorting
    df = df.groupby(age_column, as_index=False).sum()
    df[age_column] = df[age_column].sort_values(ascending = sort[:3]=='asc')\
        .reset_index(drop=True)
    if label:
        if agg_below is not None:
            gt_cutoff = df[age_column] != -1
            df.loc[gt_cutoff, age_column] = df.loc[gt_cutoff, age_column].map(
                lambda x: "{:d}-{:d}".format(x, x+step-1))
            df[age_column] = df[age_column].astype(str)
            df.loc[~gt_cutoff, age_column] = f"<{agg_below}"
        else:
            df[age_column] = df[age_column].map(lambda x: '{:d}-{:d}'.format(x, x+step-1))
    return df

def concat_pct_col(df, val_column='count_value'):
    pct_col = df[val_column] / sum(df[val_column])
    df['%'] = pct_col.map(lambda x: '{:.1f}'.format(100*x))
    return df

def title_row(df, title):
    df = df.astype(str)
    trow = pd.DataFrame([[title, *['' for i in range(df.shape[1]-1)]]], columns=df.columns)
    return pd.concat((trow, df), axis=0, ignore_index=True)
    
# ______________________________________________________________________






In [7]:
import table1

In [8]:
table1.query(df)

Unnamed: 0,stratum_1,count_value,%
0,Age,,
1,50-59,16.0,1.7
2,60-69,42.0,4.6
3,70-79,162.0,17.7
4,80-89,413.0,45.1
5,90-99,247.0,27.0
6,100-109,36.0,3.9
7,Gender,,
8,Male,461.0,46.1
9,Female,539.0,53.9


In [30]:
age

Unnamed: 0,stratum_1,count_value,%
0,50-59,16,1.7
1,60-69,42,4.6
2,70-79,162,17.7
3,80-89,413,45.1
4,90-99,247,27.0
5,100-109,36,3.9


In [29]:
age = aggregate_age(achilles_age(df), 'stratum_1', 10, label=True, agg_below=40, sort="asc")
gender = achilles_gender(df)
race = achilles_race(df)
ethnicity = achilles_ethnicity(df)


pd.concat((
    title_row(concat_pct_col(age), 'Age'),
    title_row(concat_pct_col(gender), 'Gender'),
    title_row(concat_pct_col(race), 'Race'),
    title_row(concat_pct_col(ethnicity), 'Ethnicity'),
    ), axis=0
).reset_index(drop=True)

Unnamed: 0,stratum_1,count_value,%
0,Age,,
1,50-59,16.0,1.7
2,60-69,42.0,4.6
3,70-79,162.0,17.7
4,80-89,413.0,45.1
5,90-99,247.0,27.0
6,100-109,36.0,3.9
7,Gender,,
8,Male,461.0,46.1
9,Female,539.0,53.9


In [4]:
age

NameError: name 'age' is not defined

In [13]:
__name__

'__main__'

In [3]:
dash.Dash(__name__, external_stylesheets=['https://codepen.io/chriddyp/pen/bWLwgP.css'])

<dash.dash.Dash at 0x10d5e8550>

In [14]:
app = dash.Dash("__name__")

In [19]:
app.css

<dash.resources.Css at 0x1124052710>