# Week 6: File ingestion and schema validation

Take any csv/text file of 2+ GB of your choice. --- (You can do this assignment on Google colab)

Read the file ( Present approach of reading the file )

Try different methods of file reading eg: Dask, Modin, Ray, pandas and present your findings in term of computational efficiency

Perform basic validation on data columns : eg: remove special character , white spaces from the col name

As you already know the schema hence create a YAML file and write the column name in YAML file. --define separator of read and write file, column name in YAML

Validate number of columns and column name of ingested file with YAML.

Write the file in pipe separated text file (|) in gz format.

Create a summary of the file:

Total number of rows,

total number of columns

In [1]:
!pip install datatable



In [2]:
import os
import time

In [3]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py


# 1) Trying different methods of file reading

### Time it took with Dask

In [58]:
from dask import dataframe as dd
start = time.time()
dask_df = dd.read_csv('data.csv')
end = time.time()
print("Time it took to read the file using Dask: ",(end-start),"sec")

Time it took to read the file using Dask:  0.023914813995361328 sec


### Time it took with Panads

In [60]:
import pandas as pd
start = time.time()
pd_df = pd.read_csv('data.csv')
end = time.time()
print("Time it took to read the file using panads: ",(end-start),"sec")

Time it took to read the file using panads:  87.40052461624146 sec


### Time it took with Datatable

In [61]:
import datatable as dt

start = time.time()
df_dt = dt.fread('data.csv')
end = time.time()
print("Time it took to read the file using Datatable:", (end-start),"sec")

Time it took to read the file using Datatable: 35.583712577819824 sec


#### Dask was the fastest one by a lot of time. 

# 2) Removeing special character , white spaces etc

In [62]:
df = dd.read_csv('data.csv')
df.head()

Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-11-01 00:00:00 UTC,view,1003461,2053013555631882655,electronics.smartphone,xiaomi,489.07,520088904,4d3b30da-a5e4-49df-b1a8-ba5943f1dd33
1,2019-11-01 00:00:00 UTC,view,5000088,2053013566100866035,appliances.sewing_machine,janome,293.65,530496790,8e5f4f83-366c-4f70-860e-ca7417414283
2,2019-11-01 00:00:01 UTC,view,17302664,2053013553853497655,,creed,28.31,561587266,755422e7-9040-477b-9bd2-6a6e8fd97387
3,2019-11-01 00:00:01 UTC,view,3601530,2053013563810775923,appliances.kitchen.washer,lg,712.87,518085591,3bfb58cd-7892-48cc-8020-2f17e6de6e7f
4,2019-11-01 00:00:01 UTC,view,1004775,2053013555631882655,electronics.smartphone,xiaomi,183.27,558856683,313628f1-68b8-460d-84f6-cec7a8796ef2


In [63]:
df.columns

Index(['event_time', 'event_type', 'product_id', 'category_id',
       'category_code', 'brand', 'price', 'user_id', 'user_session'],
      dtype='object')

In [64]:
df.columns=df.columns.str.replace('[#,@,&,_]','')
df.columns = df.columns.str.replace(' ', '')
df.columns

  df.columns=df.columns.str.replace('[#,@,&,_]','')


Index(['eventtime', 'eventtype', 'productid', 'categoryid', 'categorycode',
       'brand', 'price', 'userid', 'usersession'],
      dtype='object')

# 3) YAML file Creation

In [68]:
%%writefile file.yaml
file_type: csv
file_name: data
inbound_delimiter: ","
columns: 
    - event_time
    - event_type
    - product_id
    - category_id
    - category_code
    - brand
    - price
    - user_id
    - user_session

Overwriting file.yaml


In [69]:
import testutility as util
config_data = util.read_config_file("file.yaml")
config_data

{'file_type': 'csv',
 'file_name': 'data',
 'inbound_delimiter': ',',
 'columns': ['event_time',
  'event_type',
  'product_id',
  'category_id',
  'category_code',
  'brand',
  'price',
  'user_id',
  'user_session']}

In [73]:
# read the file using config file
#Using the dynamic way of reading the file so it would be flexible 
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
dataframe = pd.read_csv(source_file,config_data['inbound_delimiter'])
dataframe.head()

Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-11-01 00:00:00 UTC,view,1003461,2053013555631882655,electronics.smartphone,xiaomi,489.07,520088904,4d3b30da-a5e4-49df-b1a8-ba5943f1dd33
1,2019-11-01 00:00:00 UTC,view,5000088,2053013566100866035,appliances.sewing_machine,janome,293.65,530496790,8e5f4f83-366c-4f70-860e-ca7417414283
2,2019-11-01 00:00:01 UTC,view,17302664,2053013553853497655,,creed,28.31,561587266,755422e7-9040-477b-9bd2-6a6e8fd97387
3,2019-11-01 00:00:01 UTC,view,3601530,2053013563810775923,appliances.kitchen.washer,lg,712.87,518085591,3bfb58cd-7892-48cc-8020-2f17e6de6e7f
4,2019-11-01 00:00:01 UTC,view,1004775,2053013555631882655,electronics.smartphone,xiaomi,183.27,558856683,313628f1-68b8-460d-84f6-cec7a8796ef2


# 4) Validation

In [74]:
util.col_header_val(dataframe ,config_data)

column name and column length validation passed


1

In [75]:
print("columns of files are:" ,dataframe.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['event_time', 'event_type', 'product_id', 'category_id',
       'category_code', 'brand', 'price', 'user_id', 'user_session'],
      dtype='object')
columns of YAML are: ['event_time', 'event_type', 'product_id', 'category_id', 'category_code', 'brand', 'price', 'user_id', 'user_session']


#### Since we have the same columns on both files, the validation passed

# 5) Write the file in pipe separated text file (|) in gz format

In [76]:
import datetime
import csv
import gzip

from dask import dataframe as dd
df = dd.read_csv('data.csv',delimiter=',')

# Write csv in gz format in pipe separated text file (|)
df.to_csv('data.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          line_terminator='\n')

['C:/Users/HP/Data Glacier/week 6/data.csv.gz\\000.part',
 'C:/Users/HP/Data Glacier/week 6/data.csv.gz\\001.part',
 'C:/Users/HP/Data Glacier/week 6/data.csv.gz\\002.part',
 'C:/Users/HP/Data Glacier/week 6/data.csv.gz\\003.part',
 'C:/Users/HP/Data Glacier/week 6/data.csv.gz\\004.part',
 'C:/Users/HP/Data Glacier/week 6/data.csv.gz\\005.part',
 'C:/Users/HP/Data Glacier/week 6/data.csv.gz\\006.part',
 'C:/Users/HP/Data Glacier/week 6/data.csv.gz\\007.part',
 'C:/Users/HP/Data Glacier/week 6/data.csv.gz\\008.part',
 'C:/Users/HP/Data Glacier/week 6/data.csv.gz\\009.part',
 'C:/Users/HP/Data Glacier/week 6/data.csv.gz\\010.part',
 'C:/Users/HP/Data Glacier/week 6/data.csv.gz\\011.part',
 'C:/Users/HP/Data Glacier/week 6/data.csv.gz\\012.part',
 'C:/Users/HP/Data Glacier/week 6/data.csv.gz\\013.part',
 'C:/Users/HP/Data Glacier/week 6/data.csv.gz\\014.part',
 'C:/Users/HP/Data Glacier/week 6/data.csv.gz\\015.part',
 'C:/Users/HP/Data Glacier/week 6/data.csv.gz\\016.part',
 'C:/Users/HP/

# 6) Create a summary of the file

In [77]:
#Total number of rows of the file: 
print("The total number of rows: ", len(dataframe))
#Total number of columns of the file:
print("The number of columns: ", len(dataframe.columns))

The total number of rows:  67501979
The number of columns:  9


In [78]:
#Total number of rows of the YAML: 
print("The total number of rows: ", len(config_data))
#Total number of columns of the YAML:
print("The number of columns: ", len(config_data['columns']))

The total number of rows:  4
The number of columns:  9


In [80]:
# The size of the CSV
file_size = os.path.getsize('data.csv')
print("File Size is :", file_size, "bytes")

File Size is : 9006762395 bytes
