In [1]:
import pandas as pd
import sqlite3 as sql
import os
import glob
import numpy as np
import shutil
import os

# Importing Claims JSON Files

In [2]:
#file directory for JSON files (Claims Extracts)
os.chdir(r"C:\Users\PRM\Documents\Projects\Python\DataPipeline\Data\Claims_Extracts")
#extracting file names for each json file through a loop
filenames = [i for i in glob.glob("*.json")]
#reading files into a dataframe as a list for each file
claims_df = [pd.read_json(file, ) 
      for file in filenames]
#extracting the date from the file name and setting it as a column called file_dt
for i in range(0,365):
    claims_df[i]['file_dt'] = filenames[i]
    
#note: it looks like the claim_dt is truncated and set to numerical. I was not sure if this date reflected the file date
#so I added the file_dt column for tracking purposes

In [3]:
#combining all claims files into one dataframe
claims_full_df = pd.concat(claims_df)
#resetting/removing the index
claims_full_df = claims_full_df.reset_index(drop=True) 
#removing the .json from the file date
claims_full_df['file_dt']= claims_full_df['file_dt'].str.replace('.json','',regex=False)
#formatting data type
claims_full_df['file_dt'] = pd.to_datetime(claims_full_df['file_dt'])

# CSV Import

In [4]:
#file directory for CSV
os.chdir(r"C:\Users\PRM\Documents\Projects\Python\DataPipeline\Data")
#extracting file names for each CSV file through a loop
csv_files = [i for i in glob.glob("*.csv")]
#reading files into a dataframe as a list for each file
csv_df = [pd.read_csv(file, ) 
      for file in csv_files]

#dropping the indexed column for both CSV's and distinguishing the dataframes into ccs_dim_df & dx_dim_df
ccs_dim_df = csv_df[0].drop(columns='Unnamed: 0')
dx_dim_df = csv_df[1].drop(columns='Unnamed: 0')

#note about the ccs_dim template: one code could have multiple descriptions

# Text Import

In [5]:
#file directory for text files
os.chdir(r"C:\Users\PRM\Documents\Projects\Python\DataPipeline\Data")
#extracting file names for each text file through a loop
txt_files = [i for i in glob.glob("*.txt")]
#reading files into a dataframe as a list for each file
txt_df = [pd.read_csv(file,sep='|' ) 
      for file in txt_files]

#distinguishing the dataframes into patient_dim_df & product_dim_df
patient_dim_df = txt_df[0]
product_dim_df = txt_df[1]

# Cleaning

In [6]:
#left joining the claims data with the patient data into a master dataframe
df = claims_full_df.merge(patient_dim_df, left_on='patient_id',right_on='pid', how='left')
#left joining the master dataframe and CCS data
df = df.merge(product_dim_df, left_on='member_id',right_on='MBR_IDN', how='left')
#left joining the master dataframe and product data
df = df.merge(ccs_dim_df, left_on='primary_icd10',right_on='ICD10_CD', how='left')
#left joining the master dataframe and cx data
df = df.merge(dx_dim_df, left_on='primary_icd10',right_on='ICD10_DIAG_CD', how='left')

#dropping duplicates from the master dataframe and dropping columns
m_df = df.drop_duplicates()
m_df = df.drop(columns=['ICD10_CD','ICD10_DIAG_CD','primary_icd10','pid','member_id'])

In [7]:
#creating a row number column to identify the smallest MBR_CTC_PDT_BAN_CD when there are 
#2 claim records (part of requirement)

m_df['RN'] = m_df.sort_values(['claim_id','MBR_CTC_PDT_BAN_CD'], ascending=[True,True]) \
             .groupby(['claim_id']) \
             .cumcount() + 1
             
#creating a column that identifies rows with more than 1 claim         
m_df['contract_count'] = m_df.groupby('claim_id')['claim_id'].transform('count')

#this is an example - please see the last 2 columns
m_df[m_df['claim_id']==407734]



#excluding If a patient has more than 1 contract then the smallest value for 
# MBR_CTC_PDT_BAN_CD should be used and this should be indicated through a newly created 
# column as a binary flag (1 = more than one contract; 0 = one contract)
#this excludes patients with more than 2 contracts prioritizing the smallest MBR_CTC_PDT_BAN_CD	

Unnamed: 0,claim_id,claim_dt,patient_id,file_dt,gender,patient_dob,MBR_IDN,MBR_CTC_PDT_BAN_CD,EXCHANGE_IND,MBR_SEG_CD,CCS_DESCR,ICD10_DIAG_DE,RN,contract_count
458469,407734,18871,A13273794,2021-09-01,f,1981-05-20,49737231,1,0,5512005,Diseases of the digestive system,"Gastrointestinal hemorrhage, unspecified",1,2
458470,407734,18871,A13273794,2021-09-01,f,1981-05-20,49737231,3,1,8843001,Diseases of the digestive system,"Gastrointestinal hemorrhage, unspecified",2,2


In [8]:
#m_df[m_df['contract_count']==2].groupby(['claim_id','CCS_DESCR']).size().reset_index().sort_values(by=0)

In [9]:
#filtering for RN = 1 for us to have a claim per row. in other words, claim_id is the granularity
m_df = m_df[m_df['RN']==1]

In [10]:
#this is a test to show the claim_id is the granularity
t = m_df.groupby(['claim_id']).size().reset_index().sort_values(by=0)
t[t[0]>1]

Unnamed: 0,claim_id,0


In [11]:
#updating the contract_count to show 1  for claims with more than 1 contract (this is part of the requirement)
m_df['contract_count'] = np.where(m_df['contract_count'] == 2, 1, 0)
#dropping the row number column as it is no longer needed
m_df = m_df.drop(columns=['RN'])
#datatype formatting
m_df['patient_dob'] = pd.to_datetime(m_df['patient_dob'])

In [12]:
#dataframe sample
m_df.head()

Unnamed: 0,claim_id,claim_dt,patient_id,file_dt,gender,patient_dob,MBR_IDN,MBR_CTC_PDT_BAN_CD,EXCHANGE_IND,MBR_SEG_CD,CCS_DESCR,ICD10_DIAG_DE,contract_count
0,5652977,18628,B49130628,2021-01-01,m,1988-05-10,826031940,2,0,5512009,Symptoms; signs; and ill-defined conditions an...,Flexural eczema,0
1,1336619,18628,A43805793,2021-01-01,f,1976-05-03,39750834,1,0,5512004,Diseases of the skin and subcutaneous tissue,Other specified follicular disorders,0
2,8144284,18628,A55835152,2021-01-01,m,1975-04-29,25153855,1,0,5512003,Endocrine; nutritional; and metabolic diseases...,Oth diabetes mellitus with diabetic chronic ki...,0
3,1798579,18628,B48711425,2021-01-01,m,1980-06-22,524117840,2,0,5512006,Diseases of the musculoskeletal system and con...,Other specified soft tissue disorders,0
4,1862995,18628,A25577626,2021-01-01,f,1991-06-22,62677552,1,0,5512007,Infectious and parasitic diseases,Encntr screen for infections w sexl mode of tr...,0


# Transfer Claims Files from Inbound (Source) to Archived (Target) folder

In [13]:
#since this is designed for reproducability, this is to move the claims data into an archived folder

#source and destination folder
source_dir = r'C:\Users\PRM\Documents\Projects\Python\DataPipeline\Data\Claims_Extracts'
target_dir = r'C:\Users\PRM\Documents\Projects\Python\DataPipeline\Archive'
    
file_names = os.listdir(source_dir)
#transfers the files over to the archived folder
for file_name in file_names:
    shutil.move(os.path.join(source_dir, file_name), target_dir)

# Data Analysis

In [14]:
#this is to identify the column lengths for our SQL import. Last 2 fields will be a varchar(100)
print(m_df['patient_id'].str.len().max())
print(m_df['gender'].str.len().max())
print(m_df['ICD10_DIAG_DE'].str.len().max())
print(m_df['CCS_DESCR'].str.len().max())

9
1
60
81.0


In [15]:
#identifying nulls
m_df.isnull().sum(axis = 0)

claim_id                  0
claim_dt                  0
patient_id                0
file_dt                   0
gender                    0
patient_dob               0
MBR_IDN                   0
MBR_CTC_PDT_BAN_CD        0
EXCHANGE_IND              0
MBR_SEG_CD                0
CCS_DESCR             12214
ICD10_DIAG_DE             0
contract_count            0
dtype: int64

In [16]:
#identifying data types
m_df.dtypes

claim_id                       int64
claim_dt                       int64
patient_id                    object
file_dt               datetime64[ns]
gender                        object
patient_dob           datetime64[ns]
MBR_IDN                        int64
MBR_CTC_PDT_BAN_CD             int64
EXCHANGE_IND                   int64
MBR_SEG_CD                     int64
CCS_DESCR                     object
ICD10_DIAG_DE                 object
contract_count                 int32
dtype: object

In [17]:
#row and column count
m_df.shape

(688115, 13)

# Data Load

In [39]:
#connecting to the claims database
conn = sql.connect(r'C:\Users\PRM\Documents\Projects\Python\DataPipeline\CLAIMS.db', uri = True)
cur = conn.cursor()

In [40]:
#cur.execute('drop table if exists CLAIMS')

In [41]:
#creating the table 'CLAIMS' and labeling fields and datatypes

cur.execute(
  '''
 CREATE TABLE if not exists CLAIMS
  (
    claim_id bigint primary key,
    claim_dt VARCHAR(50),
    patient_id VARCHAR(100),
    file_dt date,
    gender VARCHAR(10),
    patient_dob date,
    MBR_IDN int,
    MBR_CTC_PDT_BAN_CD int,
    EXCHANGE_IND int,
    MBR_SEG_CD int,
    CCS_DESCR VARCHAR(100),
    ICD10_DIAG_DE VARCHAR(100),
    contract_count int
    )
  '''
  )

<sqlite3.Cursor at 0x33e47020>

In [42]:
#dataframe m_df will feed the table
m_df.to_sql('CLAIMS', conn, if_exists = 'append', index = False)

In [43]:
#executes a simple top 5 query
cur.execute("SELECT * FROM CLAIMS limit 5")
print(cur. fetchall())

[(5652977, '18628', 'B49130628', '2021-01-01 00:00:00', 'm', '1988-05-10 00:00:00', 826031940, 2, 0, 5512009, 'Symptoms; signs; and ill-defined conditions and factors influencing health status', 'Flexural eczema', 0), (1336619, '18628', 'A43805793', '2021-01-01 00:00:00', 'f', '1976-05-03 00:00:00', 39750834, 1, 0, 5512004, 'Diseases of the skin and subcutaneous tissue', 'Other specified follicular disorders', 0), (8144284, '18628', 'A55835152', '2021-01-01 00:00:00', 'm', '1975-04-29 00:00:00', 25153855, 1, 0, 5512003, 'Endocrine; nutritional; and metabolic diseases and immunity disorders', 'Oth diabetes mellitus with diabetic chronic kidney disease', 0), (1798579, '18628', 'B48711425', '2021-01-01 00:00:00', 'm', '1980-06-22 00:00:00', 524117840, 2, 0, 5512006, 'Diseases of the musculoskeletal system and connective tissue', 'Other specified soft tissue disorders', 0), (1862995, '18628', 'A25577626', '2021-01-01 00:00:00', 'f', '1991-06-22 00:00:00', 62677552, 1, 0, 5512007, 'Infectio

In [44]:
#row count for testing
cur.execute("SELECT count(*) FROM CLAIMS")
print(cur. fetchall())

[(688115,)]


In [45]:
conn.commit()
conn.close()