In [1]:
#Task: File Ingestion and Schema validation
#import time and os
import os
import time

In [2]:
#Size of the file
os.path.getsize("Rate.csv")


13899

In [3]:
#Read in the data with Dask
from dask import dataframe as dd
start = time.time()
dask_df = dd.read_csv('Rate.csv')
end = time.time()
print("Read csv with dask: ",(end-start),"sec")

Read csv with dask:  0.001009225845336914 sec


In [4]:
#Read in the data with Pandas
import pandas as pd
start = time.time()
df = pd.read_csv('Rate.csv')
end = time.time()
print("Read csv with pandas: ",(end-start),"sec")

Read csv with pandas:  0.0038077831268310547 sec


In [5]:
!pip install modin






In [6]:
!pip install ray






In [7]:
#Read in the data with Modin and Ray
import modin.pandas as pd
import ray
ray.shutdown()
ray.init()
start = time.time()
df = pd.read_csv('Rate.csv')
end = time.time()
print("Read csv with modin and ray: ",(end-start),"sec")

2022-11-09 14:05:10,274	INFO worker.py:1528 -- Started a local Ray instance.


Read csv with modin and ray:  1.0579862594604492 sec


In [8]:
#Here dask is better than pandas, Modin and Ray, with the least reading time of 0.001009225845336914  sec sec

In [20]:
from dask import dataframe as dd
df = dd.read_csv('Rate.csv',delimiter=',')

In [21]:
df.info()


<class 'dask.dataframe.core.DataFrame'>
Columns: 24 entries, BusinessYear to RowNumber
dtypes: object(9), float64(9), int64(6)

In [22]:
#No. of Rows
len(df.index)

67

In [23]:
#No, of Columns
len(df.columns)

24

In [24]:
# remove special character
df.columns=df.columns.str.replace('[#,@,&]','')



In [25]:
#To remove white space from columns
df.columns = df.columns.str.replace(' ', '')

In [26]:
data=df.columns
data

Index(['BusinessYear', 'StateCode', 'IssuerId', 'SourceName', 'VersionNum',
       'ImportDate', 'IssuerId2', 'FederalTIN', 'RateEffectiveDate',
       'RateExpirationDate', 'PlanId', 'RatingAreaId', 'Tobacco', 'Age',
       'IndividualRate', 'IndividualTobaccoRate', 'Couple',
       'PrimarySubscriberAndOneDependent', 'PrimarySubscriberAndTwoDependents',
       'PrimarySubscriberAndThreeOrMoreDependents', 'CoupleAndOneDependent',
       'CoupleAndTwoDependents', 'CoupleAndThreeOrMoreDependents',
       'RowNumber'],
      dtype='object')

In [27]:
#Validation
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

In [28]:
%%writefile utility.py

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.load(stream, Loader=yaml.Loader)
        except yaml.YAMLError as exc:
            logging.error(exc)

def col_header_val(df,table_config):
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting utility.py


In [29]:
%%writefile store.yaml
file_type: csv
dataset_name: file
file_name: Rate
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - BusinessYear
      - StateCode
      - IssuerId
      - SourceName
      - VersionNum
      - ImportDate
      - IssuerId2
      - FederalTIN
      - RateEffectiveDate
      - RateExpirationDate
      - PlanId
      - RatingAreaId
      - Tobacco
      - Age
      - IndividualRate
      - IndividualTobaccoRate
      - Couple
      - PrimarySubscriberAndOneDependent
      - PrimarySubscriberAndTwoDependents
      - PrimarySubscriberAndThreeOrMoreDependents
      - CoupleAndOneDependent
      - CoupleAndTwoDependents
      - CoupleAndThreeOrMoreDependents
      - RowNumber

Overwriting store.yaml


In [34]:
import datetime
import csv
import gzip

from dask import dataframe as dd
df = dd.read_csv('Rate.csv',delimiter=',')

# Write csv in gz format in pipe separated text file (|)
df.to_csv('Rate.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          lineterminator='\n')

['C:\\Users\\ish35\\Data glacier\\week6\\Rate.csv.gz\\0.part']

In [35]:
#number of files in gz format folder
import os
entries = os.listdir('Rate.csv.gz')
for entry in entries:
    print(entry)

0.part


In [36]:
#size of the gz format folder
os.path.getsize('Rate.csv.gz')

0