# Part 4: Data Wrangling - Using Dask to remove repeating values

Currently, each source JSON file has an associated parquet file that matches a discount rate for a procedure (code type + code value) to a CCN (hospital). To make this data more manageable, we need to remove repeated rows and then combine and organize all the files. That is the objective of this notebook.

This notebook uses Dask to delete repeated values across the files, which are currently separated by the company that purchases United Healthcare insurance, producing a more manageable dataset. After this, we can select a procedure and begin exploratory analysis.

In [1]:
import pandas as pd
import numpy as np
import os
from dotenv import load_dotenv
from datetime import date, datetime, timedelta
import sqlalchemy
import pymysql
import openpyxl
import glob
from ast import literal_eval
from collections import Counter
from tqdm.auto import tqdm
import dask.dataframe as dd
import dask.array as da
import dask.bag as db
import pyarrow as pa
from dask.distributed import Client

### Files to obtain

First, we need to obtain the paths of the parquet files of billing information merged with CCN created in the prior notebook. The parent directory is locally stored in a dotenv file. You may replace the location as you see fit.

In [2]:
load_dotenv()

hyperlink_path = 'json_completed_hyperlinks_update.csv'
parent_dir = os.getenv('dir')
data_dir = os.path.join(parent_dir,'data_update')

# The CSV has no header row, so name the columns explicitly.
df = pd.read_csv(hyperlink_path, header=None)
df.columns = ['ParseID','Hyperlink']
hyperlinks = df['Hyperlink'].tolist()

def foldername(hyperlink):
    # Keep only the file name from the URL and drop its last 8 characters.
    hyperlink = hyperlink.split('/')[-1]
    return hyperlink[0:-8]

def providers_path(folder):
    # Path to the providers CSV stored inside each file's own folder.
    return os.path.join(data_dir,folder,folder+'_providers.csv')

folder_names = [foldername(hyperlink) for hyperlink in hyperlinks]
provider_files = [providers_path(folder_name) for folder_name in folder_names]
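
To make the path construction concrete, here is a small sketch that applies the same two helpers to a made-up hyperlink. The URL, the `data_update` root, and the assumption that the 8 stripped characters are a `.json.gz` extension are illustrative only; `posixpath` is used just so the result looks the same on every operating system.

```python
import posixpath

DATA_DIR = "data_update"  # hypothetical root; the notebook reads it from .env


def foldername(hyperlink):
    # Take the file name from the URL and drop its last 8 characters,
    # which is the length of a '.json.gz' extension (an assumption here).
    return hyperlink.split("/")[-1][0:-8]


def providers_path(folder, data_dir=DATA_DIR):
    # Each file gets its own folder holding '<folder>_providers.csv'.
    return posixpath.join(data_dir, folder, folder + "_providers.csv")


# Hypothetical hyperlink, not taken from the real index file.
example_link = (
    "https://example.com/files/2023-01-01_Example-Plan_in-network-rates.json.gz"
)
folder = foldername(example_link)
print(folder)
print(providers_path(folder))
```

If your file names end in a different extension, the slice `[0:-8]` would need to change accordingly.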

Using Dask allows us to work in parallel when completing tasks. We can select the number of workers. I set it to one here because of the CPU I am working with; you can increase the number of workers to suit your CPU.

In [3]:
client = Client(n_workers=1)
client

Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

Workers: 1
Total threads: 4
Total memory: 3.86 GiB
Status: running
Using processes: True

Comm: tcp://127.0.0.1:58748
Started: Just now

Comm: tcp://127.0.0.1:58758
Dashboard: http://127.0.0.1:58759/status
Nanny: tcp://127.0.0.1:58751
Local directory: C:\Users\VIGNES~1\AppData\Local\Temp\dask-worker-space\worker-3leup45v
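
If you would rather derive the worker count from your machine than hard-code it, one possible heuristic is sketched below. The helper name `suggest_n_workers` and the rule of leaving one core free are assumptions of this sketch, not a Dask recommendation. Memory matters too, since the workers share the machine's RAM.

```python
import os


def suggest_n_workers(reserve_cores=1):
    # os.cpu_count() can return None, so fall back to a single core.
    cores = os.cpu_count() or 1
    # Always keep at least one worker, even on a single-core machine.
    return max(1, cores - reserve_cores)


n_workers = suggest_n_workers()
print(n_workers)

# The value could then be passed to the client, for example:
# client = Client(n_workers=n_workers)
```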


### Read our Parquet Files with Dask

Dask offers functions similar to pandas, but it does not perform a task until the result is required. This is a form of lazy computation. When I display the dataframe, it shows basic information about the tables I want to gather (column names, dtypes, and the number of partitions), but it has not actually loaded the files into memory. It only reads enough metadata to get an idea of the data we are working with. Here is an example where we combine all the parquet files created in the prior notebook into one Dask dataframe.

In [3]:
ddf = dd.read_parquet('D://Vignesh/Capstone/data_update/*/*_merge.parquet', 
                     columns=['billing_type','billing_code','negotiated_rates','ccn'], engine='pyarrow')

ddf

| npartitions=586 | billing_type | billing_code | negotiated_rates | ccn |
| --- | --- | --- | --- | --- |
|  | object | object | float64 | object |
|  | ... | ... | ... | ... |
| ... | ... | ... | ... | ... |
|  | ... | ... | ... | ... |
|  | ... | ... | ... | ... |


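We can now drop the duplicate rows across all our files. I will save the result as a CSV for later access; you may save it in a different format. Note that Dask writes `export.csv` as a directory containing part files rather than as a single file. This may take some time to execute.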

In [5]:
%%time

ddf = ddf.drop_duplicates(ignore_index=True)
ddf.to_csv("D://Vignesh/Capstone/combined/export.csv")

CPU times: total: 24 s
Wall time: 7min 27s


['D:\\Vignesh\\Capstone\\combined\\export.csv\\0.part']

Similarly, I will save the result as a parquet file below. This may take some time to execute.

In [7]:
ddf = ddf.drop_duplicates(ignore_index=True)
ddf.to_parquet("D://Vignesh/Capstone/combined/export.parquet", engine='pyarrow', write_index=False)

Instead of creating one file to save all different code types, I believe it might be easier to access our data if the files are divided based on coding type. Here, let us first see how many coding types we have. This may take some time to execute.

In [4]:
n = ddf['billing_type'].unique().compute()
n

0         CPT
1       HCPCS
2    CSTM-ALL
3      MS-DRG
4          RC
5         CDT
Name: billing_type, dtype: object

We have 6 different coding types. The most common are MS-DRG, CPT, and HCPCS, so let us save each of these as a separate file for easy access.

The following is for coding type MS-DRG. The file will be saved as both a CSV and parquet file. This may take several minutes.

In [6]:
ddf = dd.read_parquet('D://Vignesh/Capstone/data_update/*/*_merge.parquet', 
                     columns=['billing_type','billing_code','negotiated_rates','ccn'], engine='pyarrow')
ddf = ddf.drop_duplicates(ignore_index=True)
ddf_drg = ddf[ddf['billing_type']=='MS-DRG']
ddf_drg.to_csv("D://Vignesh/Capstone/combined/drg_csv")
ddf_drg.to_parquet("D://Vignesh/Capstone/combined/drg_parquet", engine='pyarrow', write_index=False)

The following is for coding type CPT. The file will be saved as both a CSV and parquet file. This may take several minutes.

In [4]:
ddf = dd.read_parquet('D://Vignesh/Capstone/data_update/*/*_merge.parquet', 
                     columns=['billing_type','billing_code','negotiated_rates','ccn'], engine='pyarrow')
ddf = ddf.drop_duplicates(ignore_index=True)
ddf_cpt = ddf[ddf['billing_type']=='CPT']
ddf_cpt.to_csv("D://Vignesh/Capstone/combined/cpt_csv")
ddf_cpt.to_parquet("D://Vignesh/Capstone/combined/cpt_parquet", engine='pyarrow', write_index=False)

The following is for coding type HCPCS. The file will be saved as both a CSV and parquet file. This may take several minutes.

In [5]:
ddf = dd.read_parquet('D://Vignesh/Capstone/data_update/*/*_merge.parquet', 
                     columns=['billing_type','billing_code','negotiated_rates','ccn'], engine='pyarrow')
ddf = ddf.drop_duplicates(ignore_index=True)
ddf_hcpcs = ddf[ddf['billing_type']=='HCPCS']
ddf_hcpcs.to_csv("D://Vignesh/Capstone/combined/hcpcs_csv")
ddf_hcpcs.to_parquet("D://Vignesh/Capstone/combined/hcpcs_parquet", engine='pyarrow', write_index=False)

### MySQL database (Optional)
The following code is optional and can be used to set up a database with these values. This can be useful when exploring different procedures.
Let's also determine the VARCHAR lengths for each column we intend to create.

Please feel free to set up a connection to a database below and store the information there.

In [6]:
(host, user, password, port, database) = (os.getenv('host'), os.getenv('user'), os.getenv('passwd'), os.getenv('port'), os.getenv('database'))

def get_connection():
    return sqlalchemy.create_engine(
        url="mysql+pymysql://{0}:{1}@{2}:{3}/{4}".format(
            user, password, host, port, database
        )
    )

engine = get_connection()

The following code can help define the table parameters for your MySQL server.

In [None]:
unique_types = ddf['billing_type'].unique().compute()
bt_max = ddf['billing_type'].str.len().max().compute()
bc_max = ddf['billing_code'].str.len().max().compute()
ccn_max = ddf['ccn'].str.len().max().compute()
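
As a sketch of how the measured lengths could feed the column definitions, the example below derives a `dtype` mapping from the maximum string lengths. A small made-up pandas frame stands in for the Dask dataframe, so no `.compute()` call is needed here, and the sample values are hypothetical.

```python
import pandas as pd
import sqlalchemy

# Made-up sample standing in for the full dataset.
sample = pd.DataFrame(
    {
        "billing_type": ["CPT", "CSTM-ALL", "MS-DRG"],
        "billing_code": ["99213", "470", "J1100"],
        "negotiated_rates": [75.0, 12000.0, 3.5],
        "ccn": ["010001", "450358", "330101"],
    }
)

string_columns = ["billing_type", "billing_code", "ccn"]

# Longest value per string column, converted to a plain Python int.
max_lengths = {col: int(sample[col].str.len().max()) for col in string_columns}

# Size each VARCHAR to the longest observed value; rates stay FLOAT.
sqltypes = {
    col: sqlalchemy.types.VARCHAR(length=n) for col, n in max_lengths.items()
}
sqltypes["negotiated_rates"] = sqlalchemy.types.FLOAT()

print(max_lengths)
```

Sizing columns to the longest value seen so far is tight. If later files may contain longer codes, a small margin on each length would be a safer choice.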

Here is an example definition of a rates table that can be stored.

In [8]:
table= 'rates'
columns = ['billing_type', 'billing_code', 'negotiated_rates', 'ccn']
sqltypes_rates = {'billing_type': sqlalchemy.types.VARCHAR(length=8), 'billing_code': sqlalchemy.types.VARCHAR(length=7), 
                  'negotiated_rates': sqlalchemy.types.FLOAT(), 'ccn': sqlalchemy.types.VARCHAR(length=6)}

In [None]:
ddf = dd.read_parquet('D://Vignesh/Capstone/data_update/*/*_merge.parquet', 
                     columns=['billing_type','billing_code','negotiated_rates','ccn'], engine='pyarrow')
ddf = ddf.drop_duplicates(ignore_index=True)
ddf.head()

In [13]:
ddf.to_sql(name=table,uri="mysql+pymysql://{0}:{1}@{2}:{3}/{4}".format(user, password, host, port, database), if_exists='append', index=False, chunksize=100, dtype=sqltypes_rates)

## Conclusion:

In this notebook, we combined all our billing information linking negotiated rates for procedures to their respective hospital/facility within the CMS dataset. These files were stored in several parquet files based on the coding type used to differentiate procedures. In the next notebook, we will begin EDA.