# 2GB file is downloaded from
# https://www.kaggle.com/datasets/hhs/health-insurance-marketplace?select=Rate.csv 



Rate.csv renamed as health_insurance.csv 

# Reading data with pandas

In [1]:
import pandas as pd

In [2]:
%%time
df = pd.read_csv(r'C:\Users\hooda\Downloads\archive (5)\Rate.csv')
df.head()


CPU times: total: 45.9 s
Wall time: 1min 12s


Unnamed: 0,BusinessYear,StateCode,IssuerId,SourceName,VersionNum,ImportDate,IssuerId2,FederalTIN,RateEffectiveDate,RateExpirationDate,...,IndividualRate,IndividualTobaccoRate,Couple,PrimarySubscriberAndOneDependent,PrimarySubscriberAndTwoDependents,PrimarySubscriberAndThreeOrMoreDependents,CoupleAndOneDependent,CoupleAndTwoDependents,CoupleAndThreeOrMoreDependents,RowNumber
0,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,29.0,,,,,,,,,14
1,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,36.95,,73.9,107.61,107.61,107.61,144.56,144.56,144.56,14
2,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,36.95,,73.9,107.61,107.61,107.61,144.56,144.56,144.56,15
3,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,32.0,,,,,,,,,15
4,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,32.0,,,,,,,,,16


Reading the 2GB csv file took  59.7 s using pandas

# Reading data with modin and ray

In [3]:
pip install modin

Collecting modin
  Downloading modin-0.24.1-py3-none-any.whl.metadata (17 kB)
Downloading modin-0.24.1-py3-none-any.whl (1.1 MB)
   ---------------------------------------- 0.0/1.1 MB ? eta -:--:--
   -------- ------------------------------- 0.2/1.1 MB 6.7 MB/s eta 0:00:01
   ------------------------------------- -- 1.0/1.1 MB 13.1 MB/s eta 0:00:01
   ---------------------------------------- 1.1/1.1 MB 9.9 MB/s eta 0:00:00
Installing collected packages: modin
Successfully installed modin-0.24.1
Note: you may need to restart the kernel to use updated packages.


In [4]:
import modin.pandas as mpd

In [5]:
#with ray as the computation engine (https://modin.readthedocs.io/en/stable/)

import ray
ray.shutdown()
ray.init(runtime_env={'env_vars': {'__MODIN_AUTOIMPORT_PANDAS__': '1'}})

2023-10-16 07:43:58,740	INFO worker.py:1633 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m


0,1
Python version:,3.11.4
Ray version:,2.7.1
Dashboard:,http://127.0.0.1:8265


In [6]:
%%time
mdf = mpd.read_csv(r'C:\Users\hooda\Downloads\archive (5)\Rate.csv')

CPU times: total: 5.02 s
Wall time: 1min 12s


Reading the 2GB csv file took 16.5 s using modin with ray

# Reading data with dask

In [8]:
# https://docs.dask.org/en/stable/generated/dask.dataframe.read_csv.html
from dask import dataframe as dd

In [9]:
%%time
dask_df = dd.read_csv(r'C:\Users\hooda\Downloads\archive (5)\Rate.csv')

CPU times: total: 15.6 ms
Wall time: 69.8 ms


Dask is better than pandas and modin with computational time of 0.014 s, whereas the other two took 23.5 s and 16.5 s respectively

# Using Dask to read the file and performing ingestion

In [None]:
dask_df = dd.read_csv('health_insurance.csv', delimiter = ',')
dask_df.head()

Unnamed: 0,BusinessYear,StateCode,IssuerId,SourceName,VersionNum,ImportDate,IssuerId2,FederalTIN,RateEffectiveDate,RateExpirationDate,...,IndividualRate,IndividualTobaccoRate,Couple,PrimarySubscriberAndOneDependent,PrimarySubscriberAndTwoDependents,PrimarySubscriberAndThreeOrMoreDependents,CoupleAndOneDependent,CoupleAndTwoDependents,CoupleAndThreeOrMoreDependents,RowNumber
0,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,29.0,,,,,,,,,14
1,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,36.95,,73.9,107.61,107.61,107.61,144.56,144.56,144.56,14
2,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,36.95,,73.9,107.61,107.61,107.61,144.56,144.56,144.56,15
3,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,32.0,,,,,,,,,15
4,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,32.0,,,,,,,,,16


In [10]:
dask_df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 24 entries, BusinessYear to RowNumber
dtypes: float64(9), int64(5), string(10)

In [11]:
print("Number of rows: ", len(dask_df.index))
print("Number of columns : {}".format(len(dask_df.columns)))



Number of rows:  12694445
Number of columns : 24


In [12]:
dask_df.columns

Index(['BusinessYear', 'StateCode', 'IssuerId', 'SourceName', 'VersionNum',
       'ImportDate', 'IssuerId2', 'FederalTIN', 'RateEffectiveDate',
       'RateExpirationDate', 'PlanId', 'RatingAreaId', 'Tobacco', 'Age',
       'IndividualRate', 'IndividualTobaccoRate', 'Couple',
       'PrimarySubscriberAndOneDependent', 'PrimarySubscriberAndTwoDependents',
       'PrimarySubscriberAndThreeOrMoreDependents', 'CoupleAndOneDependent',
       'CoupleAndTwoDependents', 'CoupleAndThreeOrMoreDependents',
       'RowNumber'],
      dtype='object')

There are no special characters or white spaces in the column names, however, they can be removed as follows

In [13]:
# remove special character
dask_df.columns=dask_df.columns.str.replace('\W','',regex=True)

#To remove white space from columns
dask_df.columns = dask_df.columns.str.replace(' ', '')

In [14]:
dask_df.columns

Index(['BusinessYear', 'StateCode', 'IssuerId', 'SourceName', 'VersionNum',
       'ImportDate', 'IssuerId2', 'FederalTIN', 'RateEffectiveDate',
       'RateExpirationDate', 'PlanId', 'RatingAreaId', 'Tobacco', 'Age',
       'IndividualRate', 'IndividualTobaccoRate', 'Couple',
       'PrimarySubscriberAndOneDependent', 'PrimarySubscriberAndTwoDependents',
       'PrimarySubscriberAndThreeOrMoreDependents', 'CoupleAndOneDependent',
       'CoupleAndTwoDependents', 'CoupleAndThreeOrMoreDependents',
       'RowNumber'],
      dtype='object')

# Creating utility file

In [15]:
%%writefile utility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Writing utility.py


In [16]:
%%writefile file.yaml
file_type: csv
dataset_name: testfile
file_name: health_insurance
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - BusinessYear
    - StateCode
    - IssuerId
    - SourceName
    - VersionNum
    - ImportDate
    - IssuerId2
    - FederalTIN
    - RateEffectiveDate
    - RateExpirationDate
    - PlanId
    - RatingAreaId
    - Tobacco
    - Age
    - IndividualRate
    - IndividualTobaccoRate
    - Couple
    - PrimarySubscriberAndOneDependent
    - PrimarySubscriberAndTwoDependents
    - PrimarySubscriberAndThreeOrMoreDependents
    - CoupleAndOneDependent
    - CoupleAndTwoDependents
    - CoupleAndThreeOrMoreDependents
    - RowNumber

Writing file.yaml


In [17]:
# Read config file
import utility as util
config_data = util.read_config_file("file.yaml")
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': 'health_insurance',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['BusinessYear',
  'StateCode',
  'IssuerId',
  'SourceName',
  'VersionNum',
  'ImportDate',
  'IssuerId2',
  'FederalTIN',
  'RateEffectiveDate',
  'RateExpirationDate',
  'PlanId',
  'RatingAreaId',
  'Tobacco',
  'Age',
  'IndividualRate',
  'IndividualTobaccoRate',
  'Couple',
  'PrimarySubscriberAndOneDependent',
  'PrimarySubscriberAndTwoDependents',
  'PrimarySubscriberAndThreeOrMoreDependents',
  'CoupleAndOneDependent',
  'CoupleAndTwoDependents',
  'CoupleAndThreeOrMoreDependents',
  'RowNumber']}

In [21]:
config_data['file_name']

'health_insurance'

In [22]:
# read the csv file using config file with dask 
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
print(source_file)
import dask.dataframe as dd

# Read the CSV file using Dask
df = dd.read_csv(r'C:\Users\hooda\Downloads\archive (5)\Rate.csv', delimiter=config_data['inbound_delimiter'])

# Display the first few rows of the Dask DataFrame
df.head()



./health_insurance.csv


Unnamed: 0,BusinessYear,StateCode,IssuerId,SourceName,VersionNum,ImportDate,IssuerId2,FederalTIN,RateEffectiveDate,RateExpirationDate,...,IndividualRate,IndividualTobaccoRate,Couple,PrimarySubscriberAndOneDependent,PrimarySubscriberAndTwoDependents,PrimarySubscriberAndThreeOrMoreDependents,CoupleAndOneDependent,CoupleAndTwoDependents,CoupleAndThreeOrMoreDependents,RowNumber
0,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,29.0,,,,,,,,,14
1,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,36.95,,73.9,107.61,107.61,107.61,144.56,144.56,144.56,14
2,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,36.95,,73.9,107.61,107.61,107.61,144.56,144.56,144.56,15
3,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,32.0,,,,,,,,,15
4,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,32.0,,,,,,,,,16


In [27]:
print("columns of files are:" ,dask_df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['BusinessYear', 'StateCode', 'IssuerId', 'SourceName', 'VersionNum',
       'ImportDate', 'IssuerId2', 'FederalTIN', 'RateEffectiveDate',
       'RateExpirationDate', 'PlanId', 'RatingAreaId', 'Tobacco', 'Age',
       'IndividualRate', 'IndividualTobaccoRate', 'Couple',
       'PrimarySubscriberAndOneDependent', 'PrimarySubscriberAndTwoDependents',
       'PrimarySubscriberAndThreeOrMoreDependents', 'CoupleAndOneDependent',
       'CoupleAndTwoDependents', 'CoupleAndThreeOrMoreDependents',
       'RowNumber'],
      dtype='object')
columns of YAML are: ['BusinessYear', 'StateCode', 'IssuerId', 'SourceName', 'VersionNum', 'ImportDate', 'IssuerId2', 'FederalTIN', 'RateEffectiveDate', 'RateExpirationDate', 'PlanId', 'RatingAreaId', 'Tobacco', 'Age', 'IndividualRate', 'IndividualTobaccoRate', 'Couple', 'PrimarySubscriberAndOneDependent', 'PrimarySubscriberAndTwoDependents', 'PrimarySubscriberAndThreeOrMoreDependents', 'CoupleAndOneDependent', 'CoupleAndTwoDepen

# Write the file in pipe separated text file (|) in gz format

In [28]:
dask_df.to_csv("health_insurance.gz", sep = '|', index = False)



['c:\\Users\\hooda\\Desktop\\health_insurance.gz\\00.part',
 'c:\\Users\\hooda\\Desktop\\health_insurance.gz\\01.part',
 'c:\\Users\\hooda\\Desktop\\health_insurance.gz\\02.part',
 'c:\\Users\\hooda\\Desktop\\health_insurance.gz\\03.part',
 'c:\\Users\\hooda\\Desktop\\health_insurance.gz\\04.part',
 'c:\\Users\\hooda\\Desktop\\health_insurance.gz\\05.part',
 'c:\\Users\\hooda\\Desktop\\health_insurance.gz\\06.part',
 'c:\\Users\\hooda\\Desktop\\health_insurance.gz\\07.part',
 'c:\\Users\\hooda\\Desktop\\health_insurance.gz\\08.part',
 'c:\\Users\\hooda\\Desktop\\health_insurance.gz\\09.part',
 'c:\\Users\\hooda\\Desktop\\health_insurance.gz\\10.part',
 'c:\\Users\\hooda\\Desktop\\health_insurance.gz\\11.part',
 'c:\\Users\\hooda\\Desktop\\health_insurance.gz\\12.part',
 'c:\\Users\\hooda\\Desktop\\health_insurance.gz\\13.part',
 'c:\\Users\\hooda\\Desktop\\health_insurance.gz\\14.part',
 'c:\\Users\\hooda\\Desktop\\health_insurance.gz\\15.part',
 'c:\\Users\\hooda\\Desktop\\health_insu

In [34]:
#number of files in gz format folder

entries = os.listdir('health_insurance.gz')
len(entries)

30

In [33]:
#size of the gz format folder
import os as os
os.path.getsize('health_insurance.gz')

12288