# **Data Engineering Project**
### Created by: Pablo Andres Esteban Vargas

In [1]:
import pandas as pd
import numpy as np
import random as rnd
import time as tm
import datetime as dtm
import sqlalchemy
from sqlalchemy import create_engine
from random import choice
from tabulate import tabulate
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')
import re
import sqlite3
from sqlite3 import Error

## Schema1 functions

In [2]:
def create_initial_schema(claims):
    #Variables Definition
    providerType = ["inpatient", "snf", "home health", "hha", "null", "outpatient"]
    procedureCode = ['CAT THERAPY', 'INSULIN', 'PHYSICAL THERAPY', 'LIGHT THERAPY', 'INSULINE']
    diagnosis = ['DIABETES', 'LONELINESS', 'SADNESS', 'LACK OF EXERCISE', 'DBTS']
    stime = tm.mktime(tm.strptime("2010-1-1", '%Y-%m-%d'))
    etime = tm.mktime(tm.strptime("2020-12-31", '%Y-%m-%d'))
    #Creating providerType and claimId     
    cl_claim = [(rnd.choice(providerType), x) for x in range(0,claims)]
    
    #Creating Schema 1 providerType and claimId 
    cl_claim = [(x, y, np.random.randint(2,11) if (x == "home health" or x == "hha") else 1, \
            tm.strftime('%Y-%m-%d', tm.localtime(stime + rnd.random() * (etime - stime)))) for x, y in cl_claim]
    #Creating lineNumber and fromDate
    cl_claim = [(x, y, z, "4"+str(np.random.randint(10000,99999)) if (x == "outpatient" or x == "null") \
             else str((choice([i for i in range(0,9) if i not in [4]]))) + str(np.random.randint(10000,99999)), a) \
             for x, y, z, a in cl_claim]
    #Creating ccn
    cl_claim = [[(x, y, z, a, dtm.datetime.strptime(b, '%Y-%m-%d') + dtm.timedelta(days=z)) for z in range(1,z+1)] \
                for x, y, z, a, b in cl_claim]
    #Creating rows for lineNumber and fromDate  
    flatten_cl_claim = [val for sublist in cl_claim for val in sublist]
    #Creating list with Schema 1
    claim_generator = ((x, "claim_id_"+str(y), "line_number_"+str(z), a, rnd.choice(procedureCode), \
                        rnd.choice(diagnosis), b) for x,y,z,a,b in flatten_cl_claim)
    df = pd.DataFrame(claim_generator, columns=["providerType", "claimId", "lineNumber", "ccn", "procedureCode", \
                                        "diagnosis","fromDate"])
    df['new_id'] = df.index
    return df

In [3]:
def define_value_per_provider(df):
    conditions_provider_type = [
    (df['providerType'] == "inpatient"),
    (df['providerType'] == "snf"),
    (df['providerType'] == "home health"),
    (df['providerType'] == "hha"),
    (df['providerType'] == "null"),
    (df['providerType'] == "outpatient")
    ]
    values_provider_type = [70, 100, 0.2, 0.2, 40, 40]
    df['providerTypeBase'] = np.select(conditions_provider_type, values_provider_type)
    return df

In [4]:
def define_value_per_procedure(df):
    conditions_procedure_code = [
    (df['procedureCode'] == "CAT THERAPY"),
    (df['procedureCode'] == "INSULIN"),
    (df['procedureCode'] == "PHYSICAL THERAPY"),
    (df['procedureCode'] == "LIGHT THERAPY"),
    (df['procedureCode'] == "INSULINE")
    ]
    values_procedure_code = [40, 0.1, 35, 36, 0.1]
    df['valuesProcedureBase'] = np.select(conditions_procedure_code, values_procedure_code)
    return df

In [5]:
def define_allowed_amount(df, mu, sigma, algorithm):
    df = df.reset_index(drop=True)
    count = df[df.columns[0]].count()
    if algorithm == "lognormal":
        value_adjust = np.random.lognormal(mu, sigma, count).tolist()
        df["income"] = value_adjust
        df["allowedAmount"] = df["providerTypeBase"] + df["valuesProcedureBase"] + df["income"]
    if algorithm == "uniform":
        value_adjust = np.random.uniform(mu, sigma, count).tolist()
        df["income"] = value_adjust
        df["allowedAmount"] = df["providerTypeBase"] + (df["valuesProcedureBase"] * df["income"])
    df = df.drop('providerTypeBase', axis=1)
    df = df.drop('valuesProcedureBase', axis=1)
    df = df.drop('income', axis=1)
    return df

In [6]:
def clean_index(df):
    df = df.sort_values(['new_id'], ignore_index=True)
    df = df.drop('new_id', axis=1)
    df = df.convert_dtypes()
    df['fromDate'] = df['fromDate'].astype("string")
    df['allowedAmount'] = df['allowedAmount'].astype("string")
    return df

In [7]:
def generete_schema1(amount):
    df1 = create_initial_schema(amount)
    df1 = define_value_per_provider(df1)
    df1 = define_value_per_procedure(df1)
    schema1 = pd.concat([define_allowed_amount(df1[df1.providerType == "snf"], 6., 1., "lognormal"), \
                         define_allowed_amount(df1[df1.providerType == "inpatient"], 6., 1., "lognormal"), \
                         define_allowed_amount(df1[(df1.providerType == "outpatient") | (df1.providerType == "null")], 6., 1., "lognormal"), \
                         define_allowed_amount(df1[(df1.providerType == "hha") | (df1.providerType == "home health")], 50, 1., "uniform")])
    schema1 = schema1.reset_index(drop=True)
    schema1 = clean_index(schema1)
    return schema1

## Schema 2 functions

In [8]:
def modify_provider_type(df):
    df.loc[df["providerType"] == "inpatient", "providerType"] = "ip"
    df.loc[df["providerType"] == "home health", "providerType"] = "hha"
    df.loc[df["providerType"] == "null", "providerType"] = "op"
    df.loc[df["providerType"] == "outpatient", "providerType"] = "op"
    return df

In [9]:
def modify_line_number(df):
    df['lineNumber'] = df['lineNumber'].str[12:]
    return df

In [10]:
def clean_procedure_code_diagnosis(df):
    df.loc[df["procedureCode"] == "INSULINE", "procedureCode"] = "INSULIN"
    df.loc[df["diagnosis"] == "DBTS", "diagnosis"] = "DIABETES"
    return df

In [11]:
def edit_data_type(df):
    df['line_number'] = df['line_number'].astype(int)
    df['allowed_amount'] = df['allowed_amount'].astype(float)
    df["from_date"] = pd.to_datetime(df["from_date"])
    df = df.convert_dtypes()
    return df

In [12]:
  
def categorise_outlier_cost_inpatient(row, value):  
    if (row['allowed_amount'] >= value) and (row['provider_type'] == "inpatient"):
        return True
    else:
        return False    

In [13]:
def calculate_high_cost(df, percentil_value, new_column):
    df['new_id'] = df.index
    df1 = pd.DataFrame()
    
    list_providers = df['provider_type'].drop_duplicates().tolist()
    list_procedures = df['procedure_code'].drop_duplicates().tolist()

    for provider in list_providers:
        for procedure in list_procedures:
            df_s = df.loc[(df.provider_type == provider) & (df.procedure_code == procedure)]            
            percentil = df_s.allowed_amount.quantile(percentil_value)
            df_s[new_column] = np.where(df_s['allowed_amount'] >= percentil, True, False)   
            df1 = pd.concat([df1, df_s])
    df1 = df1.sort_values(['new_id'], ignore_index=True)
    df1 = df1.drop('new_id', axis=1)
    return df1

In [14]:
def calculate_outlier_cost_inpatient(df, percentil_value):
    df_slice = df[(df.provider_type == "ip")]
    percentil = df_slice.allowed_amount.quantile(percentil_value)
    df["outlier_cost_inpatient"] = df.apply(lambda row: categorise_outlier_cost_inpatient(row, percentil), axis=1)
    return df

In [15]:
def flags_creation(df):                
    df['is_diabetic'] = np.where(df['diagnosis'] == "DIABETES", True, False)
    df['is_lonely'] = np.where(df['diagnosis'] == "LONELINESS", True, False)
    df['given_insulin'] = np.where(df['procedure_code'] == "INSULIN", True, False)
    df = calculate_high_cost(df, 0.9, "high_cost")
    df = calculate_outlier_cost_inpatient(df, 0.99)
    df['created_at'] = pd.to_datetime("2015-06-15",format='%Y-%m-%d') 
    df['active'] = np.where(df['from_date'] >= df['created_at'], True, False)
    df['valid_from'] = pd.to_datetime("2021-11-24",format='%Y-%m-%d')
    df['valid_thru'] = pd.to_datetime("2021-05-05",format='%Y-%m-%d')        
    return df

In [16]:
def create_schema2(df):
    df = modify_provider_type(df)
    df = modify_line_number(df)
    df = clean_procedure_code_diagnosis(df)
    df.columns = [re.sub(r'(?<!^)(?=[A-Z])', '_', x).lower() for x in df.columns]
    df = edit_data_type(df)
    df = flags_creation(df)
    return df

## Creating schemas

In [17]:
schema1 = generete_schema1(5000)

In [18]:
schema2 = create_schema2(schema1.copy())

## Database Functions

In [19]:
def create_connection(db_file):
    conn = None
    try:
        conn = sqlite3.connect(db_file)
    except Error as e:
        print(e)

    return conn

In [20]:
def create_tables(conn):
    cursor_obj = conn.cursor()    
    raw_claims_history = """ CREATE TABLE raw_claims_history (
                index INT NOT NULL,
                claimId VARCHAR(255) NOT NULL,
                ccn VARCHAR(255) NOT NULL,
                providerType VARCHAR(255),
                lineNumber VARCHAR(255) NOT NULL,
                allowedAmount VARCHAR(255) NOT NULL,
                procedureCode VARCHAR(255) NOT NULL,
                diagnosis VARCHAR(255) NOT NULL,
                fromDate VARCHAR(255) NOT NULL
            ); """    

    claims_history = """ CREATE TABLE claims_history (
            index INT NOT NULL,
            claim_id VARCHAR(255) NOT NULL,
            ccn VARCHAR(255) NOT NULL,
            provider_type VARCHAR(255) NOT NULL,
            line_number INT NOT NULL,
            allowed_amount REAL NOT NULL,
            procedure_code VARCHAR(255) NOT NULL,
            diagnosis VARCHAR(255) NOT NULL,
            from_date VARCHAR(255) NOT NULL,
            is_diabetic INT NOT NULL,
            is_lonely INT NOT NULL,
            given_insulin INT NOT NULL,
            high_cost INT NOT NULL,
            outlier_cost_inpatient INT,
            created_at VARCHAR(255) NOT NULL,
            active INT NOT NULL,
            valid_from VARCHAR(255) NOT NULL,
            valid_thru VARCHAR(255) NOT NULL
        ); """
    try:
        cursor_obj.execute(claims_history)
    except Error as e:
         print(e)

    try:
        cursor_obj.execute(raw_claims_history)
    except Error as e:
         print(e)
    conn.close()

In [21]:
def insert_data(schema1, schema2):
    engine = sqlalchemy.create_engine('sqlite:///signify_data_engineering.db', echo=False)
    schema1.to_sql('raw_claims_history', con=engine, if_exists='append')
    schema2.to_sql('claims_history', con=engine, if_exists='append')

## Create db, tables and inserting data

In [22]:
database = r"signify_data_engineering.db"
connection_obj = create_connection(database)
create_tables(connection_obj)
insert_data(schema1, schema2)

table claims_history already exists
table raw_claims_history already exists
