In [1]:
import sys
import json
import pickle
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas, pd_writer
import getpass as gt
import pandas as pd
import numpy as np
import warnings

from snowflake_conn import *

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
warnings.simplefilter("ignore")

In [2]:
lds = pd.read_csv("data/simplified_lds.csv")
lds.groupby(['Data','File']).size()

Data         File          
Beneficiary  Demo               35
Carrier      Base Claim         19
             Claim Line         13
DME          Base Claim         18
             Claim Line         13
Home Health  Base Claim         41
             Revenue Center     10
Hospice      Base Claim         40
             Revenue Center     10
Inpatient    Base Claim        118
             Revenue Center      9
Outpatient   Base Claim         93
             Revenue Center     10
SNF          Base Claim         95
             Revenue Center      9
dtype: int64

In [3]:
fd = {
    'carrier_base_claim':['Carrier','Base Claim'],
    'carrier_claim_line':['Carrier','Claim Line'],
    'dme_base_claim':['DME','Base Claim'],
    'dme_claim_line':['DME','Claim Line'],
    'hha_base_claim':['Home Health','Base Claim'],
    'hha_revenue_center':['Home Health','Revenue Center'],
    'hospice_base_claim':['Hospice','Base Claim'],
    'hospice_revenue_center':['Hospice','Revenue Center'],
    'inpatient_base_claim':['Inpatient','Base Claim'],
    'inpatient_revenue_center':['Inpatient','Revenue Center'],
    'master_beneficiary_summary':['Beneficiary','Demo'],
    'outpatient_base_claim':['Outpatient','Base Claim'],
    'outpatient_revenue_center':['Outpatient','Revenue Center'],
    'snf_base_claim':['SNF','Base Claim'],
    'snf_revenue_center':['SNF','Revenue Center'],
}

In [4]:
def change_dtypes(base_name, fd, df, lds):
    data_type, file_type = fd[base_name][0], fd[base_name][1]
    subset = lds.loc[(lds.Data == data_type) & (lds.File == file_type),:]

    d = dict(zip(subset.Name, subset.Type))

    for c in df.columns.tolist():
        orig_type = df[c].dtype
        lds_type = d[c]

        if lds_type == 'CHAR':
            df[c] = df[c].astype(str).replace({'nan':None})
        elif lds_type == 'DATE':
            df[c] = pd.to_datetime(df[c],dayfirst=True,format='mixed').astype(str).str.replace('-','').replace({'NaT':None})
        else:
            df[c] = df[c].astype(float).replace({'nan':None})

        new_type = df[c].dtype
    return df

def check_dfs(sql, base_name):
    df = read_sql(sql)
    tmp = pd.read_csv(f"data/{base_name}.csv",low_memory=False)
    tmp = change_dtypes(base_name, fd, tmp, lds)
    tmp.columns = [c.lower() for c in tmp.columns]
    print(df.shape, tmp.shape)

    # check if the two df are the same
    for c in df.columns:
        if (df[c].values != tmp[c].values).sum() > 0:
            idx1 = df.loc[df[c].notna(),[c]].values
            idx2 = tmp.loc[tmp[c].notna(),[c]].values
            if (idx1 != idx2).sum() > 0:
                print(c, '-- real errors')
            else:
                print(c, "-- nan errors only")

In [5]:
conn = get_connection()

In [None]:
# # medical claim
# sql = '''SELECT COUNT(DISTINCT(patient_id)), COUNT(DISTINCT(claim_id)), COUNT(DISTINCT(claim_start_date)) FROM SANDBOX_FFS.TUVA_CLAIMS.MEDICAL_CLAIM'''
# print(read_sql(sql))

# sql = '''SELECT * FROM SANDBOX_FFS.TUVA_CLAIMS.MEDICAL_CLAIM LIMIT 1000;'''
# tmp = read_sql(sql)
# tmp.head(1)

# # no dates...

### Carrier
https://github.com/manganese-ai/medicare_lds_connector/blob/main/models/intermediate/carrier_claim.sql

Check if dfs are equivalent (see if there's an error reading them in)

In [None]:
# base claim
sql = '''SELECT * FROM SANDBOX_FFS.LDS.CARRIER_BASE_CLAIM;'''
base_name = 'carrier_base_claim'
check_dfs(sql, base_name)

# equivalent

In [None]:
# claim line
sql = '''SELECT * FROM SANDBOX_FFS.LDS.CARRIER_CLAIM_LINE;'''
base_name = 'carrier_claim_line'
check_dfs(sql, base_name)

# equivalent

Tuva

In [None]:
# tuva
sql = '''SELECT COUNT(DISTINCT(patient_id)), COUNT(DISTINCT(claim_id)), COUNT(DISTINCT(claim_start_date)) FROM SANDBOX_FFS.TUVA_CLAIMS.CARRIER_CLAIM'''
print(read_sql(sql))

sql = '''SELECT * FROM SANDBOX_FFS.TUVA_CLAIMS.CARRIER_CLAIM LIMIT 1;'''
carrier = read_sql(sql)
print(carrier.shape)
carrier.head(1)

### DME
https://github.com/manganese-ai/medicare_lds_connector/blob/main/models/intermediate/dme_claim.sql

In [6]:
# base claim
sql = '''SELECT * FROM SANDBOX_FFS.LDS.DME_BASE_CLAIM;'''
base_name = 'dme_base_claim'
b = read_sql(sql)

check_dfs(sql, base_name)

# equivalent

(103828, 18) (103828, 18)


In [7]:
# claim line
sql = '''SELECT * FROM SANDBOX_FFS.LDS.DME_CLAIM_LINE;'''
base_name = 'dme_claim_line'
l = read_sql(sql)

check_dfs(sql, base_name)

# equivalent

(103828, 13) (103828, 13)


In [8]:
# tuva
sql = '''SELECT COUNT(DISTINCT(patient_id)), COUNT(DISTINCT(claim_id)), COUNT(DISTINCT(claim_end_date)) FROM SANDBOX_FFS.TUVA_CLAIMS.DME_CLAIM'''
print(read_sql(sql))

sql = '''SELECT * FROM SANDBOX_FFS.TUVA_CLAIMS.DME_CLAIM;'''
dme_tuva = read_sql(sql)
print(dme_tuva.shape)
dme_tuva.head(1)

   count(distinct(patient_id))  count(distinct(claim_id))  \
0                         5576                      37782   

   count(distinct(claim_end_date))  
0                             2926  
(407576, 137)


Unnamed: 0,claim_id,claim_line_number,claim_type,patient_id,member_id,claim_start_date,claim_end_date,claim_line_start_date,claim_line_end_date,admission_date,discharge_date,admit_source_code,admit_type_code,discharge_disposition_code,place_of_service_code,bill_type_code,ms_drg_code,apr_drg_code,revenue_center_code,service_unit_quantity,hcpcs_code,hcpcs_modifier_1,hcpcs_modifier_2,hcpcs_modifier_3,hcpcs_modifier_4,hcpcs_modifier_5,rendering_npi,billing_npi,facility_npi,paid_date,paid_amount,total_cost_amount,allowed_amount,charge_amount,diagnosis_code_type,diagnosis_code_1,diagnosis_code_2,diagnosis_code_3,diagnosis_code_4,diagnosis_code_5,diagnosis_code_6,diagnosis_code_7,diagnosis_code_8,diagnosis_code_9,diagnosis_code_10,diagnosis_code_11,diagnosis_code_12,diagnosis_code_13,diagnosis_code_14,diagnosis_code_15,diagnosis_code_16,diagnosis_code_17,diagnosis_code_18,diagnosis_code_19,diagnosis_code_20,diagnosis_code_21,diagnosis_code_22,diagnosis_code_23,diagnosis_code_24,diagnosis_code_25,diagnosis_poa_1,diagnosis_poa_2,diagnosis_poa_3,diagnosis_poa_4,diagnosis_poa_5,diagnosis_poa_6,diagnosis_poa_7,diagnosis_poa_8,diagnosis_poa_9,diagnosis_poa_10,diagnosis_poa_11,diagnosis_poa_12,diagnosis_poa_13,diagnosis_poa_14,diagnosis_poa_15,diagnosis_poa_16,diagnosis_poa_17,diagnosis_poa_18,diagnosis_poa_19,diagnosis_poa_20,diagnosis_poa_21,diagnosis_poa_22,diagnosis_poa_23,diagnosis_poa_24,diagnosis_poa_25,procedure_code_type,procedure_code_1,procedure_code_2,procedure_code_3,procedure_code_4,procedure_code_5,procedure_code_6,procedure_code_7,procedure_code_8,procedure_code_9,procedure_code_10,procedure_code_11,procedure_code_12,procedure_code_13,procedure_code_14,procedure_code_15,procedure_code_16,procedure_code_17,procedure_code_18,procedure_code_19,procedure_code_20,procedure_code_21,procedure_code_22,procedure_code_23,procedure_code_24,procedure_code_25,procedure_date_1,procedure_date_2,procedure_date_3,procedure_date_4,procedure_date_5,procedure_date_6,procedure_date_7,procedure_date_8,procedure_date_9,procedure_date_10,procedure_date_11,procedure_date_12,procedure_date_13,procedure_date_14,procedure_date_15,procedure_date_16,procedure_date_17,procedure_date_18,procedure_date_19,procedure_date_20,procedure_date_21,procedure_date_22,procedure_date_23,procedure_date_24,procedure_date_25,data_source
0,-10000930037927201582,1,professional,-10000010254618,-10000010254618,,2015-03-25,,2015-03-25,,,,,,31,,,,,4,K0002,,,,,,,1336780048,,,0.0,1.0,,0.0,icd-10-cm,S134XX,R4689,E781,J329,E119,D649,E849,B965,N469,E849,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,medicare_lds


Process

In [9]:
# first filter
b = b.loc[~(b['carr_clm_pmt_dnl_cd'] == '0'),:]
b.shape

(103828, 18)

In [10]:
### base claim ###
# claim id: tuva has this format -- not sure why: -1000093003792725-M82
b['clm_thru_dt_year'] = b['clm_thru_dt'].apply(lambda x: str(x)[:4])
b['claim_id'] = b['claim_no'].astype(int).astype(str) + b['clm_thru_dt_year'].astype(str) + b['nch_clm_type_cd'].astype(str)

b['patient_id'] = b['desy_sort_key'].astype(str)
b['member_id'] = b['patient_id']
b['claim_end_date'] = pd.to_datetime(b['clm_thru_dt'])

b['diagnosis_code_1'] = b['prncpal_dgns_cd'].astype(str)
b['diagnosis_code_2'] = b['icd_dgns_cd2'].astype(str)
b['diagnosis_code_3'] = b['icd_dgns_cd3'].astype(str)
b['diagnosis_code_4'] = b['icd_dgns_cd4'].astype(str)
b['diagnosis_code_5'] = b['icd_dgns_cd5'].astype(str)
b['diagnosis_code_6'] = b['icd_dgns_cd6'].astype(str)
b['diagnosis_code_7'] = b['icd_dgns_cd7'].astype(str)
b['diagnosis_code_8'] = b['icd_dgns_cd8'].astype(str)
b['diagnosis_code_9'] = b['icd_dgns_cd9'].astype(str)
b['diagnosis_code_10'] = b['icd_dgns_cd10'].astype(str)
b['diagnosis_code_11'] = b['icd_dgns_cd11'].astype(str)
b['diagnosis_code_12'] = b['icd_dgns_cd12'].astype(str)

b.shape

(103828, 35)

In [11]:
### claim line ###
l['claim_line_end_date'] = pd.to_datetime(l['clm_thru_dt'])

l['place_of_service_code'] = l['line_place_of_srvc_cd'].astype(str)
l['hcpcs_code'] = l['hcpcs_cd'].astype(str)
l['hcpcs_modifier_1'] = l['hcpcs_1st_mdfr_cd'].astype(str)
l['hcpcs_modifier_2'] = l['hcpcs_2nd_mdfr_cd'].astype(str)
l['billing_npi'] = l['prvdr_npi'].astype(str)

l['claim_line_number'] = l['clm_line_num'].astype(int)
l['service_unit_quantity'] = l['line_srvc_cnt'].apply(lambda x: int(str(x)[0]))
l['paid_amount'] = l['line_nch_pmt_amt'].astype(float)
l['total_cost_amount'] = l['line_nch_pmt_amt'].astype(float) + l['line_bene_ptb_ddctbl_amt'].astype(float) + l['line_bene_prmry_pyr_pd_amt'].astype(float)
l['charge_amount'] = l['line_alowd_chrg_amt'].astype(float)
l.shape

(103828, 24)

In [12]:
### merge ###
dme = b.merge(l, on = ['claim_no'], how = 'inner')

dme['claim_type'] = 'professional'
dme['diagnosis_code_type'] = 'icd-10-cm'
dme['data_source'] = 'medicare_lds'
dme.shape

(407576, 61)

In [13]:
### add nulls ###
for c in [
    'claim_start_date','admission_date','discharge_date','paid_date','claim_line_start_date',
     'procedure_date_1', 'procedure_date_2', 'procedure_date_3', 'procedure_date_4', 'procedure_date_5', 
     'procedure_date_6', 'procedure_date_7', 'procedure_date_8', 'procedure_date_9', 'procedure_date_10', 
     'procedure_date_11', 'procedure_date_12', 'procedure_date_13', 'procedure_date_14', 'procedure_date_15', 
     'procedure_date_16', 'procedure_date_17', 'procedure_date_18', 'procedure_date_19', 'procedure_date_20', 
     'procedure_date_21', 'procedure_date_22', 'procedure_date_23', 'procedure_date_24', 'procedure_date_25'
]:
    dme[c] = pd.to_datetime(None)

for c in [
    'admit_source_code','admit_type_code','discharge_disposition_code',
    'bill_type_code','ms_drg_code','apr_drg_code','revenue_center_code',
    'hcpcs_modifier_3','hcpcs_modifier_4','hcpcs_modifier_5',
    'rendering_npi','facility_npi',
    'diagnosis_code_13', 'diagnosis_code_14', 'diagnosis_code_15', 'diagnosis_code_16', 
    'diagnosis_code_17', 'diagnosis_code_18', 'diagnosis_code_19', 'diagnosis_code_20', 
    'diagnosis_code_21', 'diagnosis_code_22', 'diagnosis_code_23', 'diagnosis_code_24', 
    'diagnosis_code_25', 'diagnosis_poa_1', 'diagnosis_poa_2', 'diagnosis_poa_3', 
    'diagnosis_poa_4', 'diagnosis_poa_5', 'diagnosis_poa_6', 'diagnosis_poa_7', 
    'diagnosis_poa_8', 'diagnosis_poa_9', 'diagnosis_poa_10', 'diagnosis_poa_11', 
    'diagnosis_poa_12', 'diagnosis_poa_13', 'diagnosis_poa_14', 'diagnosis_poa_15', 
    'diagnosis_poa_16', 'diagnosis_poa_17', 'diagnosis_poa_18', 'diagnosis_poa_19', 
    'diagnosis_poa_20', 'diagnosis_poa_21', 'diagnosis_poa_22', 'diagnosis_poa_23', 
    'diagnosis_poa_24', 'diagnosis_poa_25', 'procedure_code_type', 'procedure_code_1', 
    'procedure_code_2', 'procedure_code_3', 'procedure_code_4', 'procedure_code_5', 
    'procedure_code_6', 'procedure_code_7', 'procedure_code_8', 'procedure_code_9', 
    'procedure_code_10', 'procedure_code_11', 'procedure_code_12', 'procedure_code_13', 
    'procedure_code_14', 'procedure_code_15', 'procedure_code_16', 'procedure_code_17', 
    'procedure_code_18', 'procedure_code_19', 'procedure_code_20', 'procedure_code_21', 
    'procedure_code_22', 'procedure_code_23', 'procedure_code_24', 'procedure_code_25',
]:
    dme[c] = ''


dme['allowed_amount'] = np.nan

In [14]:
### only keep the relevant cols ###
dme = dme.loc[:,[c for c in dme_tuva.columns.tolist()]].reset_index(drop=True)
print(dme.shape, dme_tuva.shape)
dme.head(1)

(407576, 137) (407576, 137)


Unnamed: 0,claim_id,claim_line_number,claim_type,patient_id,member_id,claim_start_date,claim_end_date,claim_line_start_date,claim_line_end_date,admission_date,discharge_date,admit_source_code,admit_type_code,discharge_disposition_code,place_of_service_code,bill_type_code,ms_drg_code,apr_drg_code,revenue_center_code,service_unit_quantity,hcpcs_code,hcpcs_modifier_1,hcpcs_modifier_2,hcpcs_modifier_3,hcpcs_modifier_4,hcpcs_modifier_5,rendering_npi,billing_npi,facility_npi,paid_date,paid_amount,total_cost_amount,allowed_amount,charge_amount,diagnosis_code_type,diagnosis_code_1,diagnosis_code_2,diagnosis_code_3,diagnosis_code_4,diagnosis_code_5,diagnosis_code_6,diagnosis_code_7,diagnosis_code_8,diagnosis_code_9,diagnosis_code_10,diagnosis_code_11,diagnosis_code_12,diagnosis_code_13,diagnosis_code_14,diagnosis_code_15,diagnosis_code_16,diagnosis_code_17,diagnosis_code_18,diagnosis_code_19,diagnosis_code_20,diagnosis_code_21,diagnosis_code_22,diagnosis_code_23,diagnosis_code_24,diagnosis_code_25,diagnosis_poa_1,diagnosis_poa_2,diagnosis_poa_3,diagnosis_poa_4,diagnosis_poa_5,diagnosis_poa_6,diagnosis_poa_7,diagnosis_poa_8,diagnosis_poa_9,diagnosis_poa_10,diagnosis_poa_11,diagnosis_poa_12,diagnosis_poa_13,diagnosis_poa_14,diagnosis_poa_15,diagnosis_poa_16,diagnosis_poa_17,diagnosis_poa_18,diagnosis_poa_19,diagnosis_poa_20,diagnosis_poa_21,diagnosis_poa_22,diagnosis_poa_23,diagnosis_poa_24,diagnosis_poa_25,procedure_code_type,procedure_code_1,procedure_code_2,procedure_code_3,procedure_code_4,procedure_code_5,procedure_code_6,procedure_code_7,procedure_code_8,procedure_code_9,procedure_code_10,procedure_code_11,procedure_code_12,procedure_code_13,procedure_code_14,procedure_code_15,procedure_code_16,procedure_code_17,procedure_code_18,procedure_code_19,procedure_code_20,procedure_code_21,procedure_code_22,procedure_code_23,procedure_code_24,procedure_code_25,procedure_date_1,procedure_date_2,procedure_date_3,procedure_date_4,procedure_date_5,procedure_date_6,procedure_date_7,procedure_date_8,procedure_date_9,procedure_date_10,procedure_date_11,procedure_date_12,procedure_date_13,procedure_date_14,procedure_date_15,procedure_date_16,procedure_date_17,procedure_date_18,procedure_date_19,procedure_date_20,procedure_date_21,procedure_date_22,procedure_date_23,procedure_date_24,procedure_date_25,data_source
0,-10000930037927201582,1,professional,-10000010254618.0,-10000010254618.0,,2015-03-25,,2015-03-25,,,,,,31,,,,,4,K0002,,,,,,,1336780048,,,0.0,1.0,,0.0,icd-10-cm,S134XX,R4689,E781,J329,E119,D649,E849,B965,N469,E849,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,medicare_lds


#### Problems
- Dates: `claim_end_date`, `claim_line_end_date`
    - Python: '2015-03-25T00:00:00.000000000' (I made it a datetime)
    - SQL: None
    - SQL code: `sql code: {{ try_to_cast_date('b.clm_thru_dt', 'YYYYMMDD') }} as claim_end_date`

In [26]:
cols = [c for c in dme.columns if 'diagnosis_code' in c]
m = dme[['claim_id']+cols].merge(dme_tuva[['claim_id']+cols], on='claim_id', how='outer', indicator=True)
m._merge.value_counts()

_merge
both          9954308
left_only           0
right_only          0
Name: count, dtype: int64

In [35]:
for c in cols:
    v = m.loc[(m[f'{c}_x'].notna()) & (m[f'{c}_y'].notna()),:]
    if (v[f'{c}_x'].values != v[f'{c}_y'].values).sum() > 0:
        print(c)

### Eligibility (Intermediate)
https://github.com/manganese-ai/medicare_lds_connector/blob/main/models/intermediate/eligibility_unpivot.sql

In [None]:
sql = '''SELECT * FROM SANDBOX_FFS.LDS.MASTER_BENEFICIARY_SUMMARY;'''
base_name = 'master_beneficiary_summary'
check_dfs(sql, base_name)

# equivalent

### Home Health
https://github.com/manganese-ai/medicare_lds_connector/blob/main/models/intermediate/home_health_claim.sql

In [None]:
# base claim
sql = '''SELECT * FROM SANDBOX_FFS.LDS.HHA_BASE_CLAIM;'''
base_name = 'hha_base_claim'
check_dfs(sql, base_name)

# equivalent

In [None]:
# revenue center
sql = '''SELECT * FROM SANDBOX_FFS.LDS.HHA_REVENUE_CENTER;'''
base_name = 'hha_revenue_center'
check_dfs(sql, base_name)

# equivalent

### Hospice
https://github.com/manganese-ai/medicare_lds_connector/blob/main/models/intermediate/hospice_claim.sql

In [None]:
# base claim
sql = '''SELECT * FROM SANDBOX_FFS.LDS.HOSPICE_BASE_CLAIM;'''
base_name = 'hospice_base_claim'
check_dfs(sql, base_name)

# equivalent

In [None]:
# revenue center
sql = '''SELECT * FROM SANDBOX_FFS.LDS.HOSPICE_REVENUE_CENTER;'''
base_name = 'hospice_revenue_center'
check_dfs(sql, base_name)

### Inpatient
https://github.com/manganese-ai/medicare_lds_connector/blob/main/models/intermediate/inpatient_claim.sql

In [None]:
# base claim
sql = '''SELECT * FROM SANDBOX_FFS.LDS.INPATIENT_BASE_CLAIM;'''
base_name = 'inpatient_base_claim'
check_dfs(sql, base_name)

# equivalent

In [None]:
# revenue center
sql = '''SELECT * FROM SANDBOX_FFS.LDS.INPATIENT_REVENUE_CENTER;'''
base_name = 'inpatient_revenue_center'
check_dfs(sql, base_name)

# equivalent

### Outpatient
https://github.com/manganese-ai/medicare_lds_connector/blob/main/models/intermediate/outpatient_claim.sql

In [None]:
# base claim
sql = '''SELECT * FROM SANDBOX_FFS.LDS.OUTPATIENT_BASE_CLAIM;'''
base_name = 'outpatient_base_claim'
check_dfs(sql, base_name)

# equivalent

In [None]:
# revenue center
sql = '''SELECT * FROM SANDBOX_FFS.LDS.OUTPATIENT_REVENUE_CENTER;'''
base_name = 'outpatient_revenue_center'
check_dfs(sql, base_name)

# equivalent

### SNF
https://github.com/manganese-ai/medicare_lds_connector/blob/main/models/intermediate/snf_claim.sql

In [None]:
# base claim
sql = '''SELECT * FROM SANDBOX_FFS.LDS.SNF_BASE_CLAIM;'''
base_name = 'snf_base_claim'
check_dfs(sql, base_name)

# equivalent

In [None]:
# revenue center
sql = '''SELECT * FROM SANDBOX_FFS.LDS.SNF_REVENUE_CENTER;'''
base_name = 'snf_revenue_center'
check_dfs(sql, base_name)

# equivalent

### Medical Claim
https://github.com/manganese-ai/medicare_lds_connector/blob/main/models/final/medical_claim.sql

### Eligibility (Final)
https://github.com/manganese-ai/medicare_lds_connector/blob/main/models/final/eligibility.sql