## Task: File Ingestion and Schema Validation

* Take any CSV/text file of 2+ GB of your choice. (You can do this assignment on Google Colab.)

* Read the file and present the approach used to read it.

* Try different methods of reading the file (e.g. Dask, Modin, Ray, pandas) and present your findings in terms of computational efficiency.

* Perform basic validation on the data columns, e.g. remove special characters and white space from the column names.

* Since the schema is already known, create a YAML file that lists the column names and defines the separators used to read and write the file.

* Validate the number of columns and the column names of the ingested file against the YAML file.

* Write the file as a pipe-separated (|) text file in gz format.

* Create a summary of the file:

    * total number of rows
    * total number of columns
    * file size

In [1]:
import os
import time

In [2]:
# Size of the file in bytes
os.path.getsize('/content/Rate.csv')

16777216

### Read in the data with Dask

In [3]:
from dask import dataframe as dd
start = time.time()
dask_df = dd.read_csv('/content/Rate.csv')
end = time.time()
print("Read csv with dask: ",(end-start),"sec")

Read csv with dask:  0.02402663230895996 sec


### Read in the data with Pandas

In [4]:
import pandas as pd
start = time.time()
df = pd.read_csv('/content/Rate.csv')
end = time.time()
print("Read csv with pandas: ",(end-start),"sec")

Read csv with pandas:  0.3311746120452881 sec


In [6]:
%pip install dask
%pip install ray
%pip install "modin[all]"


### Read in the data with Modin and Ray

In [7]:
import modin.pandas as mpd  # separate alias so plain pandas' `pd` is not shadowed
import ray
ray.shutdown()
ray.init()
start = time.time()
modin_df = mpd.read_csv('/content/Rate.csv')
end = time.time()
print("Read csv with modin and ray: ",(end-start),"sec")

2023-05-03 12:04:18,128 INFO worker.py:1616 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265


Read csv with modin and ray:  10.286516666412354 sec


Data types of partitions are different! Please refer to the troubleshooting section of the Modin documentation to fix this issue.


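### Here Dask shows the least reading time (0.024 sec, against 0.331 sec for pandas and 10.287 sec for Modin on Ray)

Note that `dd.read_csv` is lazy: it returns before the whole file has been parsed, so the Dask figure is not directly comparable with the other two.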
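To compare like with like, the timed region has to include the actual parsing. For a Dask frame that means forcing the work inside the timer, for example with `.compute()` or `len(df)`. The sketch below illustrates the effect using only the standard library: a generator stands in for a lazy reader, and the helper name `timed` and the sample rows are hypothetical.

```python
import time


def timed(fn, *args, **kwargs):
    """Run fn and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start


rows_parsed = []  # records every line the stand-in reader actually parses


def lazy_reader(lines):
    """Stand-in for a lazy reader: nothing is parsed until iteration."""
    for line in lines:
        rows_parsed.append(line)
        yield line.split(",")


lines = ["2014,AK,21989", "2014,AK,21990", "2014,AK,21991"]

# Timing only the call measures setup: no row has been parsed yet.
reader, setup_time = timed(lazy_reader, lines)
parsed_after_setup = len(rows_parsed)

# Forcing the work inside the timed region measures the real read.
table, read_time = timed(list, reader)
parsed_after_read = len(rows_parsed)

print(parsed_after_setup, parsed_after_read)
```

The same `timed` helper could wrap each library's read call, provided the lazy ones are forced to materialise their result inside it.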

In [8]:
from dask import dataframe as dd
df = dd.read_csv('/content/Rate.csv',delimiter=',')

In [9]:
df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 24 entries, BusinessYear to RowNumber
dtypes: object(10), float64(9), int64(5)

In [11]:
#No. of Rows
len(df.index)

225110

In [12]:
#No. of Columns
len(df.columns)

24

In [13]:
# Remove special characters (#, @, &) from the column names
df.columns = df.columns.str.replace(r'[#@&]', '', regex=True)



In [14]:
#To remove white space from columns
df.columns = df.columns.str.replace(' ', '')

In [15]:
data=df.columns
data

Index(['BusinessYear', 'StateCode', 'IssuerId', 'SourceName', 'VersionNum',
       'ImportDate', 'IssuerId2', 'FederalTIN', 'RateEffectiveDate',
       'RateExpirationDate', 'PlanId', 'RatingAreaId', 'Tobacco', 'Age',
       'IndividualRate', 'IndividualTobaccoRate', 'Couple',
       'PrimarySubscriberAndOneDependent', 'PrimarySubscriberAndTwoDependents',
       'PrimarySubscriberAndThreeOrMoreDependents', 'CoupleAndOneDependent',
       'CoupleAndTwoDependents', 'CoupleAndThreeOrMoreDependents',
       'RowNumber'],
      dtype='object')
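The two cleaning cells above remove a fixed set of characters. A more general variant, sketched here with the standard library only, drops every character that is not a letter, digit or underscore and fails loudly if two names become identical after cleaning. The function names and the sample headers are hypothetical.

```python
import re


def clean_column_name(name):
    """Drop every character that is not a letter, digit or underscore."""
    return re.sub(r"[^\w]", "", name)


def clean_columns(names):
    """Clean all names and raise if two names become identical."""
    cleaned = [clean_column_name(n) for n in names]
    duplicates = {n for n in cleaned if cleaned.count(n) > 1}
    if duplicates:
        raise ValueError(f"cleaning produced duplicate names: {sorted(duplicates)}")
    return cleaned


raw_columns = [" Business Year ", "Plan#Id", "Issuer@Id&2", "RowNumber"]
print(clean_columns(raw_columns))
```

The result could be assigned back with `df.columns = clean_columns(df.columns)`; the duplicate check guards against silently merging two distinct headers.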

### Validation

In [16]:
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

In [17]:
%%writefile utility.py
import logging
import re

import yaml


def read_config_file(filepath):
    """Load the YAML config; log the error and return None if parsing fails."""
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    # Assumed helper (called below but not defined in the original cell):
    # collapse runs of `char` into a single occurrence.
    return re.sub(re.escape(char) + '{2,}', char, string)


def col_header_val(df, table_config):
    # Normalise the header: lower-case, non-word characters to '_',
    # no leading/trailing '_', no repeated '_'.
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace(r'[^\w]', '_', regex=True)
    df.columns = [replacer(col.strip('_'), '_') for col in df.columns]
    expected_col = sorted(col.lower() for col in table_config['columns'])
    # Compare on sorted names so that column order does not matter.
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and expected_col == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Writing utility.py
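`col_header_val` prints its findings and returns 1 or 0. For unit tests it can be convenient to have the comparison itself as a pure function that returns the differences. The following is a minimal sketch of that idea, not part of `utility.py`; the name `compare_columns` is hypothetical.

```python
def compare_columns(actual, expected):
    """Return (ok, extra_in_file, missing_from_file), ignoring case and order."""
    actual_norm = sorted(col.lower() for col in actual)
    expected_norm = sorted(col.lower() for col in expected)
    extra = sorted(set(actual_norm) - set(expected_norm))
    missing = sorted(set(expected_norm) - set(actual_norm))
    return actual_norm == expected_norm, extra, missing


ok, extra, missing = compare_columns(
    ["BusinessYear", "StateCode", "Age"],
    ["businessyear", "statecode", "planid"],
)
print(ok, extra, missing)
```

Comparing the sorted lists rather than the sets means a duplicated column name also counts as a mismatch.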


In [None]:
%%writefile store.yaml
file_type: csv
dataset_name: file
file_name: Rate
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - BusinessYear
      - StateCode
      - IssuerId
      - SourceName
      - VersionNum
      - ImportDate
      - IssuerId2
      - FederalTIN
      - RateEffectiveDate
      - RateExpirationDate
      - PlanId
      - RatingAreaId
      - Tobacco
      - Age
      - IndividualRate
      - IndividualTobaccoRate
      - Couple
      - PrimarySubscriberAndOneDependent
      - PrimarySubscriberAndTwoDependents
      - PrimarySubscriberAndThreeOrMoreDependents
      - CoupleAndOneDependent
      - CoupleAndTwoDependents
      - CoupleAndThreeOrMoreDependents
      - RowNumber

Overwriting store.yaml


In [None]:
# Reading config file
import utility as util
config_data = util.read_config_file("store.yaml")

In [None]:
#data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'file',
 'file_name': 'Rate',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['BusinessYear - StateCode - IssuerId - SourceName - VersionNum - ImportDate - IssuerId2 - FederalTIN - RateEffectiveDate - RateExpirationDate - PlanId - RatingAreaId - Tobacco - Age - IndividualRate - IndividualTobaccoRate - Couple - PrimarySubscriberAndOneDependent - PrimarySubscriberAndTwoDependents - PrimarySubscriberAndThreeOrMoreDependents - CoupleAndOneDependent - CoupleAndTwoDependents - CoupleAndThreeOrMoreDependents - RowNumber']}

In [None]:
# Reading process of the file using Dask
from dask import dataframe as dd
df_sample = dd.read_csv('/content/Rate.csv',delimiter=',')
df_sample.head()

Unnamed: 0,BusinessYear,StateCode,IssuerId,SourceName,VersionNum,ImportDate,IssuerId2,FederalTIN,RateEffectiveDate,RateExpirationDate,...,IndividualRate,IndividualTobaccoRate,Couple,PrimarySubscriberAndOneDependent,PrimarySubscriberAndTwoDependents,PrimarySubscriberAndThreeOrMoreDependents,CoupleAndOneDependent,CoupleAndTwoDependents,CoupleAndThreeOrMoreDependents,RowNumber
0,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,29.0,,,,,,,,,14
1,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,36.95,,73.9,107.61,107.61,107.61,144.56,144.56,144.56,14
2,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,36.95,,73.9,107.61,107.61,107.61,144.56,144.56,144.56,15
3,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,32.0,,,,,,,,,15
4,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,32.0,,,,,,,,,16


In [None]:
#Reading the file using config file
file_type = config_data['file_type']
# Same location as the earlier cells ('/content/Rate.csv')
source_file = "/content/" + config_data['file_name'] + f'.{file_type}'

In [None]:
import pandas as pd
# Pass the delimiter by keyword; recent pandas versions no longer accept it positionally
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])
df.head()

Unnamed: 0,BusinessYear,StateCode,IssuerId,SourceName,VersionNum,ImportDate,IssuerId2,FederalTIN,RateEffectiveDate,RateExpirationDate,...,IndividualRate,IndividualTobaccoRate,Couple,PrimarySubscriberAndOneDependent,PrimarySubscriberAndTwoDependents,PrimarySubscriberAndThreeOrMoreDependents,CoupleAndOneDependent,CoupleAndTwoDependents,CoupleAndThreeOrMoreDependents,RowNumber
0,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,29.0,,,,,,,,,14
1,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,36.95,,73.9,107.61,107.61,107.61,144.56,144.56,144.56,14
2,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,36.95,,73.9,107.61,107.61,107.61,144.56,144.56,144.56,15
3,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,32.0,,,,,,,,,15
4,2014,AK,21989,HIOS,6,2014-03-19 07:06:49,21989,93-0438772,2014-01-01,2014-12-31,...,32.0,,,,,,,,,16


In [None]:
#validating the header of the file
util.col_header_val(df,config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['planid', 'rateexpirationdate', 'rateeffectivedate', 'importdate', 'primarysubscriberandthreeormoredependents', 'rownumber', 'federaltin', 'issuerid', 'individualrate', 'issuerid2', 'age', 'ratingareaid', 'couple', 'individualtobaccorate', 'statecode', 'coupleandtwodependents', 'tobacco', 'sourcename', 'primarysubscriberandonedependent', 'coupleandthreeormoredependents', 'businessyear', 'coupleandonedependent', 'primarysubscriberandtwodependents', 'versionnum']
Following YAML columns are not in the file uploaded ['businessyear - statecode - issuerid - sourcename - versionnum - importdate - issuerid2 - federaltin - rateeffectivedate - rateexpirationdate - planid - ratingareaid - tobacco - age - individualrate - individualtobaccorate - couple - primarysubscriberandonedependent - primarysubscriberandtwodependents - primarysubscriberandthreeormoredependents - coupleandonedependent - coupleandtw

0
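The "YAML columns" message above shows all the column names joined into one string, and `config_data['columns']` holds a single element. That is consistent with the indentation in `store.yaml`: the first item is indented four spaces and the remaining items six, so YAML reads the deeper lines as a continuation of the first value rather than as new list items. Aligning every `- ` at the same indentation should give one entry per column. A small pre-check could flag such lines before the file is loaded; the sketch below uses only the standard library, handles flat lists only, and the name `misaligned_list_items` is hypothetical.

```python
def misaligned_list_items(yaml_text):
    """Return 1-based line numbers of '- ' items whose indentation differs
    from the first item in the same run of consecutive list lines.

    This is a simple lint for flat lists, not a YAML parser.
    """
    flagged = []
    expected_indent = None
    for number, line in enumerate(yaml_text.splitlines(), start=1):
        stripped = line.lstrip(" ")
        if stripped.startswith("- "):
            indent = len(line) - len(stripped)
            if expected_indent is None:
                expected_indent = indent
            elif indent != expected_indent:
                flagged.append(number)
        elif stripped:
            expected_indent = None  # a non-list line ends the current run
    return flagged


broken = "columns:\n    - BusinessYear\n      - StateCode\n      - IssuerId\n"
fixed = "columns:\n    - BusinessYear\n    - StateCode\n    - IssuerId\n"
print(misaligned_list_items(broken))
print(misaligned_list_items(fixed))
```

Running the check on the text of `store.yaml` before calling `read_config_file` would point at the offending lines directly.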

In [None]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['businessyear', 'statecode', 'issuerid', 'sourcename', 'versionnum',
       'importdate', 'issuerid2', 'federaltin', 'rateeffectivedate',
       'rateexpirationdate', 'planid', 'ratingareaid', 'tobacco', 'age',
       'individualrate', 'individualtobaccorate', 'couple',
       'primarysubscriberandonedependent', 'primarysubscriberandtwodependents',
       'primarysubscriberandthreeormoredependents', 'coupleandonedependent',
       'coupleandtwodependents', 'coupleandthreeormoredependents',
       'rownumber'],
      dtype='object')
columns of YAML are: ['BusinessYear - StateCode - IssuerId - SourceName - VersionNum - ImportDate - IssuerId2 - FederalTIN - RateEffectiveDate - RateExpirationDate - PlanId - RatingAreaId - Tobacco - Age - IndividualRate - IndividualTobaccoRate - Couple - PrimarySubscriberAndOneDependent - PrimarySubscriberAndTwoDependents - PrimarySubscriberAndThreeOrMoreDependents - CoupleAndOneDependent - CoupleAndTwoDependents - CoupleAndThre

In [None]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
else:
    print("col validation passed")

column name and column length validation failed
Following File columns are not in the YAML file ['planid', 'rateexpirationdate', 'rateeffectivedate', 'importdate', 'primarysubscriberandthreeormoredependents', 'rownumber', 'federaltin', 'issuerid', 'individualrate', 'issuerid2', 'age', 'ratingareaid', 'couple', 'individualtobaccorate', 'statecode', 'coupleandtwodependents', 'tobacco', 'sourcename', 'primarysubscriberandonedependent', 'coupleandthreeormoredependents', 'businessyear', 'coupleandonedependent', 'primarysubscriberandtwodependents', 'versionnum']
Following YAML columns are not in the file uploaded ['businessyear - statecode - issuerid - sourcename - versionnum - importdate - issuerid2 - federaltin - rateeffectivedate - rateexpirationdate - planid - ratingareaid - tobacco - age - individualrate - individualtobaccorate - couple - primarysubscriberandonedependent - primarysubscriberandtwodependents - primarysubscriberandthreeormoredependents - coupleandonedependent - coupleandtw

In [None]:
import csv

from dask import dataframe as dd
df = dd.read_csv('/content/Rate.csv',delimiter=',')

# Write a pipe-separated (|), gzip-compressed text output.
# Dask writes one part file per partition into a directory named 'Rate.csv.gz'.
df.to_csv('Rate.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          lineterminator='\n')  # named `line_terminator` before pandas 1.5

In [None]:
# List the part files in the gz output folder
import os
entries = os.listdir('/content/Rate.csv.gz/')
for entry in entries:
    print(entry)

00.part
01.part
02.part
03.part
04.part
05.part
06.part
07.part
08.part
09.part
10.part
11.part
12.part
13.part
14.part
15.part
16.part
17.part
18.part
19.part
20.part
21.part
22.part
23.part
24.part
25.part
26.part
27.part
28.part
29.part
30.part


In [None]:
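# Size reported for the gz output folder itself.
# For a directory, getsize returns the size of the directory entry, not the total of the part files.
os.path.getsize('/content/Rate.csv.gz')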

8192
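The task's summary (rows, columns, file size) can be collected in one place. Since the Dask output is a folder of part files, its size is better obtained by summing the parts than by calling `getsize` on the folder. The sketch below uses only the standard library and assumes a UTF-8 file with a header row; both function names are hypothetical.

```python
import csv
import os


def summarize_delimited_file(path, sep=","):
    """Count data rows and columns of a delimited text file and report its size."""
    with open(path, newline="", encoding="utf-8") as handle:
        reader = csv.reader(handle, delimiter=sep)
        header = next(reader, [])
        n_rows = sum(1 for _ in reader)  # data rows, header excluded
    return {"rows": n_rows,
            "columns": len(header),
            "size_bytes": os.path.getsize(path)}


def directory_size(folder):
    """Total size in bytes of the regular files directly inside folder."""
    return sum(entry.stat().st_size
               for entry in os.scandir(folder) if entry.is_file())
```

For this notebook the calls would be along the lines of `summarize_delimited_file('/content/Rate.csv')` and `directory_size('/content/Rate.csv.gz')`.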