# Loading Medicare and Medicaid Claims data into i2b2

[CMS RIF][] docs

The focus is currently on carrier claims.

(Demographics was done in Oracle PL/SQL.)

So far, we can get data in chunks, map patients and encounters, pivot diagnoses, and insert the result into an `observation_fact` table (which is still missing some constraints).

[CMS RIF]: https://www.resdac.org/cms-data/file-availability#research-identifiable-files

## Python Data Science Tools

especially [pandas](http://pandas.pydata.org/pandas-docs/)

In [None]:
import pandas as pd
import numpy as np
import sqlalchemy as sqla
dict(pandas=pd.__version__, numpy=np.__version__, sqlalchemy=sqla.__version__)

## DB Access: Luigi Config, Logging

[luigi docs](https://luigi.readthedocs.io/en/stable/)

In [None]:
# Passwords are expected to be in the environment.
# Prompt if it's not already there.
    
def _fix_password():
    """Make sure the DB password is present in the environment.

    The variable name is the upper-cased login name followed by
    `_SGROUSE`.  If it is not set yet, prompt for it with `getpass`
    and store the answer in `os.environ`.
    """
    import getpass
    import os

    var_name = '{}_SGROUSE'.format(getpass.getuser().upper())
    if var_name in os.environ:
        return
    os.environ[var_name] = getpass.getpass()
_fix_password()

In [None]:
import luigi


def _reset_config(path):
    '''Make luigi's config parser class read only `path`.

    Clears the cached `_instance`, replaces `_config_paths` with
    `[path]`, and returns whatever `LuigiConfigParser.instance()`
    gives back.  This relies on private attributes of the class.'''
    parser_cls = luigi.configuration.LuigiConfigParser
    parser_cls._config_paths = [path]
    parser_cls._instance = None  # KLUDGE: forget the cached instance
    return parser_cls.instance()

_reset_config('luigi-sgrouse.cfg')
luigi.configuration.LuigiConfigParser.instance()._config_paths

In [None]:
import cx_ora_fix

help(cx_ora_fix)

In [None]:
cx_ora_fix.patch_version()

import cx_Oracle as cx
dict(cx_Oracle=cx.__version__, version_for_sqlalchemy=cx.version)

In [None]:
import logging

# %H, %M and %S are already zero-padded to two digits; width flags
# such as "%02H" are a C-library extension and not portable.
concise = logging.Formatter(fmt='%(asctime)s %(levelname)s %(message)s',
                            datefmt='%H:%M:%S')


def log_to_notebook(log,
                    formatter=concise):
    """Send everything logged on `log` (DEBUG and up) to the notebook.

    A `logging.StreamHandler` using `formatter` is attached to `log`.
    Calling this again on the same logger re-uses the handler attached
    by the earlier call (updating its formatter) rather than adding
    another one, so re-running a cell does not duplicate every message.

    :param log: the `logging.Logger` to configure
    :param formatter: `logging.Formatter` for the notebook handler
    :return: `log`, for convenience
    """
    log.setLevel(logging.DEBUG)
    for handler in log.handlers:
        if getattr(handler, '_to_notebook', False):
            handler.setFormatter(formatter)
            return log
    to_notebook = logging.StreamHandler()
    to_notebook._to_notebook = True  # marker checked by the loop above
    to_notebook.setFormatter(formatter)
    log.addHandler(to_notebook)
    return log

In [None]:
from cms_etl import CMSExtract

# `log` is only bound once this cell has run.  If looking it up raises
# NameError, this is the first run, so set up logging and try the DB.
try:
    log.info('Already logging to notebook.')
except NameError:
    cms_rif_task = CMSExtract()
    log = log_to_notebook(cms_rif_task._log)

    log.info('We try to log non-trivial DB access.')

    # Smallest bene_id in `table_eg` (presumably an example table -- confirm
    # against cms_etl).  Schema and table names are interpolated with `%`.
    with cms_rif_task.connection() as lc:
        lc.log.info('first bene_id')
        first_bene_id = pd.read_sql('select min(bene_id) bene_id_first from %s.%s' % (
            cms_rif_task.cms_rif, cms_rif_task.table_eg), lc._conn)

# NOTE: `first_bene_id` is only assigned in the `except` branch above.
first_bene_id

## Carrier claims data: breaking work into groups by beneficiary

We break down work by ranges of `bene_id`:

In [None]:
from cms_etl import BeneIdSurvey
from cms_pd import CarrierClaimUpload

survey = BeneIdSurvey(source_table=CarrierClaimUpload.table_name)
survey.script.fname

In [None]:
bene_chunks = survey.results()
bene_chunks = pd.DataFrame(bene_chunks, columns=bene_chunks[0].keys()).set_index('chunk_num')
bene_chunks.head()

In [None]:
len(bene_chunks)

Now define a task for the first chunk of beneficiaries:

In [None]:
cc = CarrierClaimUpload(bene_id_first=bene_chunks.iloc[0].bene_id_first,
                        bene_id_last=bene_chunks.iloc[0].bene_id_last)
cc.account, cc.source.cms_rif, cc.project.star_schema

Within each group, we process the claims a few thousand at a time.

_`sqlalchemy` makes a rather verbose query to get the input.
Note that we log the execution plan as well._

In [None]:
with cc.connection() as lc:
    cclaims_in = next(cc.chunks(lc, chunk_size=2000))
cclaims_in.info()

In [None]:
cclaims_in.head()

## Column Info: Value Type, Level of Measurement

In [None]:
with cc.connection() as lc:
    bcarrier_db_cols = cc.column_data(lc)
bcarrier_db_cols.head(3).set_index('column_name')[['data_type']]

Assign i2b2 value types based on column info:

_See also: [levels of measurement][1]._

_Diagnosis columns are discussed below._

[1]: https://en.wikipedia.org/wiki/Level_of_measurement

In [None]:
bcarrier_cols = cc.column_properties(bcarrier_db_cols)
bcarrier_cols[~ bcarrier_cols.is_dx].sort_values('valtype_cd').set_index('column_name')

We did get them all, right?

In [None]:
bcarrier_cols[~ bcarrier_cols.is_dx &
              ~ bcarrier_cols.valtype_cd.isin(['n', 't', '@', 'd']) &
              ~ bcarrier_cols.column_name.isin(cc.ix_cols)]

## Observation Facts by Value Type

In [None]:
def rif_modifier(table_name):
    """Return the modifier code for a RIF table: `CMS_RIF:` + upper-cased name."""
    return 'CMS_RIF:' + table_name.upper()


max_grouped_col_exp = 2  # no more than 100 diagnoses, procedures, etc.

# valtype_cd -> observation column that receives the value.
# None: the value is appended to concept_cd instead.
_VALUE_COLUMN = {'@': None, 'n': 'nval_num', 't': 'tval_char', 'd': 'tval_char'}


def pivot_valtype(data, table_name, col_info, key_cols, valtype,
                  spare_instance_digits=max_grouped_col_exp,
                  update_dt_col='nch_wkly_proc_dt'):
    """Melt the columns of one value type into one row per non-null cell.

    :param data: input frame; its index is reset, and only `key_cols`
                 plus the columns selected from `col_info` are used
    :param table_name: passed to `rif_modifier` to build modifier_cd
    :param col_info: frame with `column_name` and `valtype_cd` columns
    :param key_cols: columns carried along on every output row
                     (any sequence of column names)
    :param valtype: one of '@', 'n', 't', 'd'
    :param spare_instance_digits: instance_num is the input row position
                                  times 10 ** this value
    :param update_dt_col: column renamed to `update_date` in the result
    :raises TypeError: if `valtype` is not one of the four above
    :return: frame with valtype_cd, concept_cd, instance_num, modifier_cd
             and, depending on `valtype`, nval_num or tval_char
    """
    if valtype not in _VALUE_COLUMN:
        # Reject bad input before doing any pivoting work.
        raise TypeError('unsupported valtype: %r' % (valtype,))
    key_cols = list(key_cols)
    ty_cols = col_info[col_info.valtype_cd == valtype]
    ty_data = data.reset_index()[key_cols + list(ty_cols.column_name)].copy()
    ty_data['instance_num'] = ty_data.index * (10 ** spare_instance_digits)
    ty_data['modifier_cd'] = rif_modifier(table_name)
    obs = ty_data.melt(id_vars=key_cols + ['instance_num', 'modifier_cd'],
                       var_name='column').dropna(subset=['value'])
    obs['valtype_cd'] = valtype
    value_col = _VALUE_COLUMN[valtype]
    if value_col is None:
        # Nominal: the code itself is part of the concept.
        obs['concept_cd'] = obs.column.str.upper() + ':' + obs.value
    else:
        obs['concept_cd'] = obs.column.str.upper() + ':'
        # ISSUE: for 'd', the value is not yet formatted yyyy-mm-dd...
        obs[value_col] = obs.value

    if valtype == 'd':
        # A date observation starts and ends on the date it records.
        obs['start_date'] = obs['end_date'] = obs.value
    else:
        obs = obs.rename(
            columns=dict(clm_from_dt='start_date',
                         clm_thru_dt='end_date'))
    return obs.rename(columns={update_dt_col: 'update_date'})

### Nominal data (no value type: @)

In [None]:
obs_cd = pivot_valtype(cclaims_in, cc.table_name, bcarrier_cols[~ bcarrier_cols.is_dx], cc.ix_cols, '@')

(obs_cd.set_index(['bene_id', 'start_date', 'instance_num', 'modifier_cd'])
       .sort_index().head(15)[['valtype_cd', 'concept_cd']])

### Ordinal data (text: t)

In [None]:
obs_txt = pivot_valtype(cclaims_in, cc.table_name, bcarrier_cols[~ bcarrier_cols.is_dx], cc.ix_cols, 't')

obs_txt.set_index(['bene_id', 'start_date', 'concept_cd', 'instance_num', 'modifier_cd']
                  ).sort_index().head(10)[['valtype_cd', 'tval_char']]

### Interval data (date: d)

In [None]:
obs_dt = pivot_valtype(cclaims_in, cc.table_name, bcarrier_cols[~ bcarrier_cols.is_dx], cc.ix_cols, 'd')

obs_dt.set_index(['bene_id', 'concept_cd', 'instance_num', 'modifier_cd']
                  ).sort_index()[::20].head()[['valtype_cd', 'tval_char', 'start_date']]

### Ratio data (numeric: n)

In [None]:
obs_num = pivot_valtype(cclaims_in, cc.table_name, bcarrier_cols[~ bcarrier_cols.is_dx], cc.ix_cols, 'n')
obs_num.set_index(['bene_id', 'start_date', 'concept_cd', 'instance_num', 'modifier_cd']
                  ).sort_index().head(10)[['valtype_cd', 'nval_num']]

In [None]:
(obs_cd.append(obs_num).append(obs_txt).append(obs_dt)
 .set_index(['bene_id', 'instance_num', 'concept_cd'])  # , 'modifier_cd'
 .sort_index()
 .head(30)[
    ['start_date', 'valtype_cd', 'nval_num', 'tval_char', 'end_date', 'update_date']])

### Diagnoses: combining column groups

In [None]:
from typing import List

def col_groups(col_info: pd.DataFrame,
               suffixes: List[str]) -> pd.DataFrame:
    """Arrange interleaved column names into one row per group.

    The `column_name` values of `col_info` are read in order, taking
    `len(suffixes)` consecutive names per group: the first name of each
    group goes to column `column_name` + suffixes[0], the second to
    `column_name` + suffixes[1], and so on.  Trailing names that do not
    fill a whole group are dropped.

    The frame is built directly from slices of the name array, so any
    number of suffixes works (no merge-suffix collisions).

    :param col_info: frame with a `column_name` column
    :param suffixes: one label per member of a group, e.g. ['_cd', '_vrsn']
    :raises ValueError: if `suffixes` is empty
    :return: frame with a 0..n-1 index and one column per suffix
    """
    width = len(suffixes)
    if width == 0:
        raise ValueError('col_groups needs at least one suffix')
    names = col_info['column_name'].values
    usable = (len(names) // width) * width  # ignore an incomplete last group
    labels = ['column_name' + s for s in suffixes]
    return pd.DataFrame(
        {label: names[ix:usable:width] for ix, label in enumerate(labels)},
        columns=labels)

dx_cols = col_groups(bcarrier_cols[bcarrier_cols.is_dx], ['_cd', '_vrsn'])
dx_cols

In [None]:
'PRNCPAL_DGNS_CD'.lower()

In [None]:
from cms_pd import fmt_dx_code

max_grouped_col_exp = 2  # no more than 100 diagnoses, procedures, etc.

# cf. PCORnet CDM
pdx_flags=pd.DataFrame([dict(primary='1', secondary='2')])


def obs_stack(data: pd.DataFrame,
              table_name, col_groups: pd.DataFrame, key_cols: List[str],
              out_cols: List[str],
              pdx='prncpal_dgns_cd') -> pd.DataFrame:
    """Stack several groups of columns into one set of output columns.

    For every row of `col_groups`, the named columns of `data` are
    selected, renamed to `out_cols`, and indexed by `key_cols`; rows
    where the first output column is null are dropped.  The pieces are
    then concatenated with a single `pd.concat`.

    Uses the module-level `max_grouped_col_exp`, `pdx_flags` and
    `rif_modifier`.

    :param data: input frame (its index is reset first)
    :param table_name: passed to `rif_modifier` for modifier_cd
    :param col_groups: one row per group, one cell per column name
    :param key_cols: columns that become the index of the result
    :param out_cols: new names for the columns of each group
    :param pdx: a group whose first column has this name gets
                `pdx_flags.primary[0]` as its modifier_cd
    :return: the stacked frame, or None if `col_groups` has no rows
    """
    data = data.reset_index()
    parts = []
    for ix, group in col_groups.iterrows():
        x_data = data[key_cols + list(group.values)].copy()
        # Row position in the high digits, group number in the low ones.
        instance_num = x_data.index * (10 ** max_grouped_col_exp) + ix
        x_data = x_data.set_index(key_cols)
        # icd_dgns_cd11 -> icd_dgns_cd
        x_data.columns = out_cols
        x_data['instance_num'] = instance_num
        x_data['modifier_cd'] = (pdx_flags.primary[0] if group.values[0] == pdx
                                 else rif_modifier(table_name))
        parts.append(x_data.dropna(subset=[out_cols[0]]))
    if not parts:
        return None
    # One concat at the end instead of the removed DataFrame.append in a loop.
    return pd.concat(parts)

def dx_data(data: pd.DataFrame, table_name, col_info: pd.DataFrame, ix_cols: List[str],
            update_dt_col='nch_wkly_proc_dt') -> pd.DataFrame:
    """Stack the diagnosis columns and give each row an i2b2-style concept.

    Columns flagged `is_dx` in `col_info` are paired up as
    (code, version) by `col_groups`, stacked by `obs_stack`, and
    each pair is turned into a concept_cd by `fmt_dx_code`.
    The claim from/thru dates and `update_dt_col` are renamed to
    start_date, end_date and update_date.
    """
    groups = col_groups(col_info[col_info.is_dx], ['_cd', '_vrsn'])
    stacked = obs_stack(data, table_name, groups, ix_cols, ['dgns_cd', 'dgns_vrsn'])
    stacked['valtype_cd'] = '@'

    concepts = []  # TODO: vectorize?
    for _, row in stacked.iterrows():
        concepts.append(fmt_dx_code(row.dgns_vrsn, row.dgns_cd))
    stacked['concept_cd'] = concepts

    new_names = {'clm_from_dt': 'start_date',
                 'clm_thru_dt': 'end_date',
                 update_dt_col: 'update_date'}
    flat = stacked.reset_index()
    return flat.rename(columns=new_names)

obs_dx = dx_data(cclaims_in, cc.table_name, bcarrier_cols, cc.ix_cols)
obs_dx.set_index(['bene_id', 'start_date', 'instance_num', 'modifier_cd']).sort_index().head(15)

## Patient, Encounter Mapping

In [None]:
from typing import Tuple

from cms_pd import read_sql_step, log_plan, LoggedConnection

def encounter_mapping(lc: LoggedConnection,
                      bene_range: Tuple[int, int],
                      source='ccwdata.org',
                      debug_plan=True,
                      key_cols: str='(MEDPAR_ID)',
                      i2b2_star: str='DCONNOLLY',
                      cms_rif: str='CMS_DEID') -> pd.DataFrame:
    """Look up encounter numbers for the MEDPAR stays of a range of patients.

    Joins `medpar_all` in the `cms_rif` schema to `encounter_mapping`
    in the `i2b2_star` schema on medpar_id = encounter_ide.

    :param lc: connection used by `log_plan` and `read_sql_step`
    :param bene_range: (first, last) bene_id, both inclusive
    :param source: together with `key_cols`, the value matched against
                   encounter_ide_source
    :param debug_plan: if true, pass the query to `log_plan` before running it
    :param key_cols: suffix appended to `source`
    :param i2b2_star: schema holding encounter_mapping
                      (TODO: take from self.project.star_schema)
    :param cms_rif: schema holding medpar_all
                    (TODO: take from self.source.cms_rif)
    :return: whatever `read_sql_step` returns for the query; the query
             selects medpar_id, bene_id, encounter_num, admsn_dt, dschrg_dt

    Schema names cannot be bind parameters, so they are interpolated
    into the SQL text; only pass trusted values for them.
    """
    q = '''select medpar.medpar_id, medpar.bene_id, emap.encounter_num, medpar.admsn_dt, medpar.dschrg_dt
    from %(CMS_RIF)s.medpar_all medpar
    join %(I2B2STAR)s.encounter_mapping emap on emap.encounter_ide = medpar.medpar_id
    where medpar.bene_id between :bene_id_first and :bene_id_last
      and emap.patient_ide between :bene_id_first and :bene_id_last
      and emap.encounter_ide_source = :encounter_ide_source
    ''' % dict(I2B2STAR=i2b2_star,
               CMS_RIF=cms_rif)

    bene_id_first, bene_id_last = bene_range
    params = dict(encounter_ide_source=source + key_cols,
                  bene_id_first=bene_id_first,
                  bene_id_last=bene_id_last)

    if debug_plan:
        # Label the plan with this query's own name.
        log_plan(lc, event='encounter_mapping', sql=q, params=params)

    return read_sql_step(q, lc, params=params)

with cc.connection() as lc:
    emap = encounter_mapping(lc, (obs_dx.bene_id.min(), obs_dx.bene_id.max()))
emap.head()

In [None]:
def fmt_patient_day(df: pd.DataFrame) -> pd.Series:
    """Label each row `yyyy-mm-dd bene_id` from its start_date and bene_id."""
    return df.start_date.dt.strftime('%Y-%m-%d') + ' ' + df.bene_id


def _patient_day_num(patient_day) -> int:
    """Map a patient-day label to an integer in 1 .. 2**52, reproducibly.

    Builtin `hash()` of a string is salted per Python process, so it
    gives different numbers in different runs or workers.  A SHA-256
    digest does not.  Only 13 hex digits (52 bits) are kept so the
    number survives a round trip through a float64 column unchanged.
    """
    import hashlib

    digest = hashlib.sha256(str(patient_day).encode('utf-8')).hexdigest()
    return int(digest[:13], 16) + 1  # + 1: never zero


def pat_day_rollup(
        data: pd.DataFrame,  # with bene_id, start_date, and optionally medpar_id
        medpar_mapping: pd.DataFrame  # with medpar_id, bene_id, encounter_num, admsn_dt, dschrg_dt
        ) -> pd.DataFrame:
    """Assign an encounter_num to every row of `data`.

    Each distinct (bene_id, start_day) is matched against the rows of
    `medpar_mapping` for the same bene_id whose admsn_dt .. dschrg_dt
    interval contains the day; the first match wins.  Rows without a
    match get a negative encounter_num derived from their patient-day
    label (see `fmt_patient_day`), which is the same in every run.

    :return: a copy of `data` (index reset) with start_day and the
             columns of the matching `medpar_mapping` row added
    """
    out = data.reset_index().copy()
    out['start_day'] = pd.to_datetime(out.start_date, unit='D')
    pat_day = out[['bene_id', 'start_day']].drop_duplicates()

    # TODO: assert(medpar_mapping is 1-1 from medpar_id to encounter_num)
    pat_enc = pat_day.merge(medpar_mapping, on='bene_id', how='left')

    pat_enc = pat_enc[(pat_enc.start_day >= pat_enc.admsn_dt) &
                      (pat_enc.start_day <= pat_enc.dschrg_dt)]
    pat_enc = pat_enc.set_index(['bene_id', 'start_day'])  # [['encounter_num', 'medpar_id']]
    # Keep one stay per patient-day so the merge below cannot add rows.
    pat_enc = pat_enc[~pat_enc.index.duplicated(keep='first')]
    out = out.merge(pat_enc, how='left', left_on=['bene_id', 'start_day'], right_index=True)
    assert len(out) == len(data), 'rollup changed the number of rows'

    # Note medpar_mapping.sql ensures encounter_num > 0 when assigned to a medpar_id,
    # so negative numbers cannot clash with mapped encounters.
    fallback = - fmt_patient_day(out).apply(_patient_day_num)
    out['encounter_num'] = out.encounter_num.fillna(fallback)

    return out

x = pat_day_rollup(obs_dx, emap)
(x[(x.encounter_num > 0) | (x.encounter_num % 400 == 0) ][::5]
  .reset_index().set_index(['bene_id', 'start_date', 'encounter_num']).sort_index()
  .head(15)[['medpar_id', 'start_day', 'admsn_dt', 'dschrg_dt', 'concept_cd']])

#x.head(20)
#x[~ x.encounter_num.isnull()].head(20)

In [None]:
'medpar_id' in obs_dx.columns.values

## map, finish, write WIP TODO

In [None]:
if 0:
    with cc.connection() as lc:
        obs_cd_mapped = cc.with_mapping(lc, obs_cd)
    obs_cd_mapped.head()

In [None]:
if 0:
    fact1 = cc.finish_facts(obs_cd_mapped, import_date=clock(), upload_id=100)
    fact1.head()

In [None]:
if 0:
    # Map the diagnosis observations themselves (the `obs_dx` frame),
    # not the `dx_data` function that produced them.
    with cc.connection() as lc:
        mapped = cc.with_mapping(lc, obs_dx)
    mapped.sort_values('start_date').head(15)

In [None]:
if 0:
    len(mapped)

In [None]:
if 0:
    obs_fact = cc.finish_facts(mapped, upload_id=100, import_date=clock())

    obs_fact.head()

In [None]:
if 0:
    with cc.connection() as lc:
        obs_fact.head(100).to_sql(name='observation_fact_100', con=lc._conn,
                       if_exists='append', index=False)

In [None]:
if 0:
    with cc.connection() as lc:
        for x in cc.obs_data(lc, 100, chunk_size=1000):
            break
    x.head()

In [None]:
if 0:
    cc.run()

## Outpatient Claims: Procedures


Here we deal with diagnoses as well as procedures.

In [None]:
from cms_pd import OutpatientClaimUpload

oc = OutpatientClaimUpload(bene_id_first=bene_chunks.iloc[0].bene_id_first,
                        bene_id_last=bene_chunks.iloc[0].bene_id_last)

In [None]:
# Read outpatient chunks until one has more than 5 claims
# with a value in icd_prcdr_cd1.
with oc.connection() as lc:
    chunks = oc.chunks(lc, chunk_size=5000)
    while True:
        oclaims_in = next(chunks)
        has_px = oclaims_in.icd_prcdr_cd1.notnull()
        proc_qty = has_px.sum()
        print("@@found:", proc_qty)
        if proc_qty <= 5:
            continue
        break
    #x = pd.read_sql('select * from cms_deid.OUTPATIENT_BASE_CLAIMS where rownum <= 100', lc._conn)
print(len(oclaims_in))
oclaims_in.head()

In [None]:
with oc.connection() as lc:
    ocol_info = oc.column_properties(oc.column_data(lc))
ocol_info[ocol_info.valtype_cd.isnull()]

In [None]:
'bene_id' in ocol_info.column_name.values

In [None]:
col_groups(ocol_info[ocol_info.is_px], ['_cd', '_vrsn', '_dt'])

In [None]:
oclaims_in[['icd_prcdr_cd1']].drop_duplicates()

In [None]:
# select px_code('9904', '9') from dual; -- ICD9:99.04
# select px_code('064', '9') from dual; -- ICD9:06.4
# select px_code('99321', 'HCPCS') from dual; -- CPT:99321
def fmt_px_code(prcdr_cd: str, prcdr_vrsn: str) -> str:
    """Format a procedure code as a concept code.

    - version 'CPT' or 'HCPCS': `CPT:` + code
    - version '9': `ICD9:` + code with a dot after the second character
    - anything else: `ICD9` + version + `:` + code
      (NOTE(review): the `ICD9` prefix for other versions looks odd -- confirm)
    """
    if prcdr_vrsn in ['CPT', 'HCPCS']:
        return 'CPT:' + prcdr_cd
    if prcdr_vrsn == '9':
        major, minor = prcdr_cd[:2], prcdr_cd[2:]
        return 'ICD9:' + major + '.' + minor
    return 'ICD9' + prcdr_vrsn + ':' + prcdr_cd

fmt_px_code('9904', '9'), fmt_px_code('064', '9'), fmt_px_code('99321', 'HCPCS')

In [None]:
def px_data(data: pd.DataFrame, table_name, col_info: pd.DataFrame, ix_cols: List[str]) -> pd.DataFrame:
    """Stack the procedure columns and give each row an i2b2-style concept.

    Columns flagged `is_px` in `col_info` are grouped as
    (code, version, date) by `col_groups`, stacked by `obs_stack`,
    and each (code, version) is turned into a concept_cd by
    `fmt_px_code`.  The procedure date becomes start_date.
    """
    groups = col_groups(col_info[col_info.is_px], ['_cd', '_vrsn', '_dt'])
    stacked = obs_stack(data, table_name, groups, ix_cols,
                        ['prcdr_cd', 'prcdr_vrsn', 'prcdr_dt'])
    stacked['valtype_cd'] = '@'

    concepts = []
    for _, row in stacked.iterrows():
        concepts.append(fmt_px_code(row.prcdr_cd, row.prcdr_vrsn))
    stacked['concept_cd'] = concepts

    return stacked.rename(columns={'prcdr_dt': 'start_date'})

px_data(oclaims_in[~oclaims_in.icd_prcdr_cd1.isnull()], oc.table_name, ocol_info, oc.ix_cols)

In [None]:
ocol_info[~ ocol_info.is_px  & ~ ocol_info.is_dx].sort_values('valtype_cd')

This one is not a diagnosis code:

In [None]:
oclaims_in[['clm_mdcr_non_pmt_rsn_cd']].drop_duplicates()

In [None]:
oclaims_in[['clm_mdcl_rec']].drop_duplicates()

Clearly `at_physn_npi` is the one to use:

### TODO: Provider

In [None]:
# Index by both columns: the keys must be passed as one list, since
# the second positional argument of set_index is `drop`, not another key.
oclaims_in.set_index(['bene_id', 'clm_from_dt'])[['at_physn_npi', 'op_physn_npi', 'ot_physn_npi']].describe()

### No provider for carrier_claims???

See [missing Carrier Claim Billing NPI Number #8](https://github.com/kumc-bmi/grouse/issues/8):

In [None]:
'carr_clm_blg_npi_num' in bcarrier_cols.columns.values

In [None]:
# bcarrier_claims:
provider = 'PRF_PHYSN_NPI'

## Reimport code into running notebook

In [None]:
#@@@
from etl_tasks import log_plan
from cms_etl import FromCMS, CMSExtract, BeneIdSurvey
from cms_pd import CarrierClaimUpload, OutpatientClaimUpload, dx_stack, fmt_dx_code

In [None]:
import importlib

import cms_pd
import cms_etl
import etl_tasks
# Reload dependencies before the modules that import from them, so the
# later reloads pick up the fresh definitions.  (cms_pd re-exports
# log_plan, which comes from etl_tasks; cms_pd presumably also builds
# on cms_etl -- confirm.)
importlib.reload(etl_tasks);
importlib.reload(cms_etl);
importlib.reload(cms_pd);