# **File ingestion and schema validation**

Take any CSV/text file of 2+ GB of your choice. (You can do this assignment on Google Colab.)

Read the file and present your approach to reading it.

Try different file-reading methods (e.g. Dask, Modin, Ray, pandas) and present your findings in terms of computational efficiency.

Perform basic validation on the data columns, e.g. remove special characters and white space from the column names.

As you already know the schema, create a YAML file and write the column names in it. Also define the separators for the read and write files in the YAML file.

Validate the number of columns and the column names of the ingested file against the YAML file.

Write the file as a pipe-separated (|) text file in gz format.

Create a summary of the file:

1. Total number of rows,

2. total number of columns

3. file size

dataset [link](https://www.kaggle.com/mkechinov/ecommerce-behavior-data-from-multi-category-store?fbclid=IwAR3Rji50q1WtaHD8By7X-_E6WO52Ksj7SQmulpV7-flbmhQKerdWR1M2cdc&select=2019-Oct.csv)

### **Dataset Glossary**

About this file

This file contains behavior data for one month (October 2019) from a large multi-category online store.

Each row in the file represents an event. All events are related to products and users. There are different types of events.

**Dataset description**

1. event_time: When the event happened (UTC)
2. event_type: Event type: one of [view, cart, remove_from_cart, purchase]
3. product_id: Product ID
4. category_id: Product category ID
5. category_code: Category meaningful name (if present)
6. brand: Brand name in lower case (if present)
7. price: Product price
8. user_id: Permanent user ID
9. user_session: User session ID

In [16]:
%%writefile testutility.py
import logging
import re

import yaml

# Load the YAML config file and return its contents as a dict.
def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    Standardize the column names (lower case, non-word characters
    replaced by single underscores) and validate them against the
    columns listed in the YAML config.
    Returns 1 if validation passes, 0 otherwise.
    '''
    df.columns = df.columns.str.lower()  # lower-case all column names
    df.columns = df.columns.str.replace(r'[^\w]', '_', regex=True)  # non-word chars -> '_'
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))  # trim edge underscores
    df.columns = list(map(lambda x: replacer(x, '_'), list(df.columns)))  # collapse repeated '_'
    expected_col = list(map(lambda x: x.lower(), table_config['columns']))
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py
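
The column clean-up inside col_header_val can be checked on a few names without loading any data. The following is a minimal sketch that applies the same four steps (lower-case, replace non-word characters, trim underscores, collapse repeated underscores) with plain `re`; the helper name `normalize_column` and the sample headers are assumptions made for illustration and are not part of testutility.py.

```python
import re


def normalize_column(name: str) -> str:
    """Normalize one column name using the same steps as col_header_val."""
    name = name.lower()                 # lower-case
    name = re.sub(r'[^\w]', '_', name)  # non-word characters -> underscore
    name = name.strip('_')              # drop leading/trailing underscores
    name = re.sub(r'_{2,}', '_', name)  # collapse runs of underscores
    return name


# Hypothetical raw headers, not taken from the dataset.
raw_headers = [" Event Time (UTC) ", "Product-ID", "user  session"]
clean_headers = [normalize_column(h) for h in raw_headers]
print(clean_headers)
```

Names that are already clean, such as the nine columns of this dataset, pass through unchanged.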


In [17]:
#mounting the drive
from google.colab import drive
drive.mount('/content/drive/')

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


In [3]:
%%writefile file.yaml
# YAML config: file metadata, delimiters and expected columns
file_type: csv
dataset_name: testfile
file_name: online_trans
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - event_time
    - event_type
    - product_id
    - category_id
    - category_code
    - brand
    - price
    - user_id
    - user_session

Writing file.yaml


In [23]:
# Reading the  config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [24]:
# confirming the delimiter
config_data['inbound_delimiter']

','

In [25]:
# confirming the columns in the config
config_data['columns']

['event_time',
 'event_type',
 'product_id',
 'category_id',
 'category_code',
 'brand',
 'price',
 'user_id',
 'user_session']

In [7]:
#inspecting data of config file
config_data

{'columns': ['event_time',
  'event_type',
  'product_id',
  'category_id',
  'category_code',
  'brand',
  'price',
  'user_id',
  'user_session'],
 'dataset_name': 'testfile',
 'file_name': 'online_trans',
 'file_type': 'csv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'table_name': 'edsurv'}
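
Before the config drives any reading or writing, it can help to confirm that the keys used later are actually present. This is a minimal sketch under assumptions: the helper `check_config`, the `REQUIRED_KEYS` tuple and both example dicts are hypothetical and only mirror the structure of the dict printed above.

```python
REQUIRED_KEYS = ("file_type", "file_name", "inbound_delimiter",
                 "outbound_delimiter", "columns")


def check_config(config: dict) -> list:
    """Return a list of problems; an empty list means the config looks usable."""
    problems = []
    for key in REQUIRED_KEYS:
        if key not in config:
            problems.append(f"missing key: {key}")
    columns = config.get("columns") or []
    if not columns:
        problems.append("no columns listed")
    # A column listed twice would make the schema ambiguous.
    duplicates = sorted({c for c in columns if columns.count(c) > 1})
    if duplicates:
        problems.append(f"duplicate columns: {duplicates}")
    return problems


# Same shape as the config above, shortened to three columns.
example_config = {
    "file_type": "csv",
    "file_name": "online_trans",
    "inbound_delimiter": ",",
    "outbound_delimiter": "|",
    "columns": ["event_time", "event_type", "product_id"],
}
print(check_config(example_config))

# Hypothetical broken config: three keys missing and a repeated column.
bad_config = {"file_type": "csv", "columns": ["price", "price"]}
print(check_config(bad_config))
```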

## **Reading the File Using Different Methods**

We will load the file with several libraries and record the time each one takes.




#### **1. Reading using config file**

In [19]:
# importing os to inspect the working directory
import os
print("Confirming my file(online_trans.csv) exists: ", os.listdir())
print("\n")
print("The file directory path is:",os.getcwd()) # getting the current working directory

Confirming my file(online_trans.csv) exists:  ['online_trans.csv', '__pycache__', 'dask-worker-space', 'ecommerce.gz', 'testutility.py', 'file.yaml', 'online_trans.gz', 'trans_online.gz', 'trans_dask.gz']


The file directory path is: /content/drive/My Drive/data_glacier_intern


In [20]:
#using an absolute path to open the file:
os.chdir(r'/content/drive/MyDrive/data_glacier_intern/')

In [21]:
# read the file using config file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
print("",source_file)

 ./online_trans.csv


In [9]:
#import needed libraries
import time
import pandas as pd

#load and read the dataset
start_time = time.time()
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])  # sep must be passed by keyword in pandas 2.0+
end_time = time.time()
print("config estimated loading time: = {}".format(end_time-start_time))
print("\n")
df.head()

config estimated loading time: = 139.54040122032166




Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-10-01 00:00:00 UTC,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c
1,2019-10-01 00:00:00 UTC,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc
2,2019-10-01 00:00:01 UTC,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8
3,2019-10-01 00:00:01 UTC,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713
4,2019-10-01 00:00:04 UTC,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d


### **Validate number of columns and column name of ingested file with YAML**

In [10]:
#validating the header(columns) of the file
util.col_header_val(df,config_data)

column name and column length validation passed


1

In [13]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['event_time', 'event_type', 'product_id', 'category_id',
       'category_code', 'brand', 'price', 'user_id', 'user_session'],
      dtype='object')
columns of YAML are: ['event_time', 'event_type', 'product_id', 'category_id', 'category_code', 'brand', 'price', 'user_id', 'user_session']


In [11]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further actions
    # in the pipeline

column name and column length validation passed
col validation passed
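
col_header_val reports mismatches as set differences, so when the file has the right names in the wrong order it fails while printing two empty lists. The sketch below separates that case from missing or unexpected columns. The helper `compare_columns` is a hypothetical addition, not part of testutility.py.

```python
def compare_columns(actual, expected):
    """Describe how the file's columns differ from the expected schema."""
    actual, expected = list(actual), list(expected)
    return {
        "match": actual == expected,
        "unexpected": [c for c in actual if c not in expected],
        "missing": [c for c in expected if c not in actual],
        # True when both sides hold the same names in a different order.
        "order_only": actual != expected and sorted(actual) == sorted(expected),
    }


# Hypothetical headers: same names as the schema, different order.
report = compare_columns(["event_type", "event_time"],
                         ["event_time", "event_type"])
print(report)
```

With the objects from this notebook it could be called as compare_columns(df.columns, config_data['columns']).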


In [15]:
#checking the dataset features
pd.read_csv("./online_trans.csv")



Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-10-01 00:00:00 UTC,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c
1,2019-10-01 00:00:00 UTC,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.20,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc
2,2019-10-01 00:00:01 UTC,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.10,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8
3,2019-10-01 00:00:01 UTC,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713
4,2019-10-01 00:00:04 UTC,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d
...,...,...,...,...,...,...,...,...,...
42448759,2019-10-31 23:59:58 UTC,view,2300275,2053013560530830019,electronics.camera.video,gopro,527.40,537931532,22c57267-da98-4f28-9a9c-18bb5b385193
42448760,2019-10-31 23:59:58 UTC,view,10800172,2053013554994348409,,redmond,61.75,527322328,5054190a-46cb-4211-a8f1-16fc1a060ed8
42448761,2019-10-31 23:59:58 UTC,view,5701038,2053013553970938175,auto.accessories.player,kenwood,128.70,566280422,05b6c62b-992f-4e8e-91f7-961bcb4719cd
42448762,2019-10-31 23:59:59 UTC,view,21407424,2053013561579406073,electronics.clocks,tissot,689.85,513118352,4c14bf2a-2820-4504-929d-046356a5a204


In [16]:
#checking the created Yaml dataframe features
df

Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-10-01 00:00:00 UTC,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c
1,2019-10-01 00:00:00 UTC,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.20,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc
2,2019-10-01 00:00:01 UTC,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.10,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8
3,2019-10-01 00:00:01 UTC,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713
4,2019-10-01 00:00:04 UTC,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d
...,...,...,...,...,...,...,...,...,...
42448759,2019-10-31 23:59:58 UTC,view,2300275,2053013560530830019,electronics.camera.video,gopro,527.40,537931532,22c57267-da98-4f28-9a9c-18bb5b385193
42448760,2019-10-31 23:59:58 UTC,view,10800172,2053013554994348409,,redmond,61.75,527322328,5054190a-46cb-4211-a8f1-16fc1a060ed8
42448761,2019-10-31 23:59:58 UTC,view,5701038,2053013553970938175,auto.accessories.player,kenwood,128.70,566280422,05b6c62b-992f-4e8e-91f7-961bcb4719cd
42448762,2019-10-31 23:59:59 UTC,view,21407424,2053013561579406073,electronics.clocks,tissot,689.85,513118352,4c14bf2a-2820-4504-929d-046356a5a204


## 2. Loading and reading the dataset using pandas

In [1]:
#importing libraries
import pandas as pd
import numpy as np
import time # used to estimate time it takes to load

In [2]:
#1.loading the dataset
start_time = time.time()
trans_pandas = pd.read_csv("/content/drive/MyDrive/data_glacier_intern/online_trans.csv", delimiter=',')
end_time = time.time()
print("estimated pandas loading time: = {}".format(end_time-start_time)) #use format method for computing time difference
print("\n")

#checking the first 5 rows
trans_pandas.head()


estimated pandas loading time: = 172.76540398597717




Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-10-01 00:00:00 UTC,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c
1,2019-10-01 00:00:00 UTC,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc
2,2019-10-01 00:00:01 UTC,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8
3,2019-10-01 00:00:01 UTC,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713
4,2019-10-01 00:00:04 UTC,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d


## 3. Loading and reading the dataset using Dask

In [28]:
#install dask dataframe first and then import library
!pip install "dask[dataframe]"
import dask.dataframe as dd
import time

#loading the dataset (dd.read_csv is lazy: it only samples the file and builds a task graph here)
start_time = time.time()
trans_dask = dd.read_csv("/content/drive/MyDrive/data_glacier_intern/online_trans.csv", delimiter=',')
end_time = time.time()
print("dask estimated loading time: = {}".format(end_time-start_time))
print("\n") #break

#getting the first 5 rows
trans_dask.head()


dask estimated loading time: = 0.24152302742004395




Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-10-01 00:00:00 UTC,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c
1,2019-10-01 00:00:00 UTC,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc
2,2019-10-01 00:00:01 UTC,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8
3,2019-10-01 00:00:01 UTC,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713
4,2019-10-01 00:00:04 UTC,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d


## 4. Loading and reading the dataset using Datatable


In [None]:
#installing the required library
#!pip install pip --upgrade
#!pip install datatable

In [7]:
#importing the library 
import datatable as dt
import time

#loading using datatable
start_time = time.time()
trans_datatable = dt.fread("/content/drive/MyDrive/data_glacier_intern/online_trans.csv")
end_time = time.time()
print("datatable estimated loading time: = {}".format(end_time-start_time))
print("\n") #break

#getting the first rows (datatable's head() shows 10 by default)
trans_datatable.head()

datatable estimated loading time: = 158.11506295204163




Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-10-01 00:00:00 UTC,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c
1,2019-10-01 00:00:00 UTC,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc
2,2019-10-01 00:00:01 UTC,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8
3,2019-10-01 00:00:01 UTC,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713
4,2019-10-01 00:00:04 UTC,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d
5,2019-10-01 00:00:05 UTC,view,1480613,2053013561092866779,computers.desktop,pulser,908.62,512742880,0d0d91c2-c9c2-4e81-90a5-86594dec0db9
6,2019-10-01 00:00:08 UTC,view,17300353,2053013553853497655,,creed,380.96,555447699,4fe811e9-91de-46da-90c3-bbd87ed3a65d
7,2019-10-01 00:00:08 UTC,view,31500053,2053013558031024687,,luminarc,41.16,550978835,6280d577-25c8-4147-99a7-abc6048498d6
8,2019-10-01 00:00:10 UTC,view,28719074,2053013565480109009,apparel.shoes.keds,baden,102.71,520571932,ac1cd4e5-a3ce-4224-a2d7-ff660a105880
9,2019-10-01 00:00:11 UTC,view,1004545,2053013555631882655,electronics.smartphone,huawei,566.01,537918940,406c46ed-90a4-4787-a43b-59a410c1a5fb


## 5. Loading and reading the file using Modin and Ray


In [None]:
'''
#installing the required libraries
#!pip install "modin[all]"  # installs both engines; use "modin[ray]" or "modin[dask]" for just one
#
#importing the libraries
import modin.pandas as pd  # drop-in replacement for pandas
import numpy as np
import ray
ray.shutdown()
ray.init()
'''

In [None]:
'''
#choose one Modin engine; set it before importing modin.pandas
import os
os.environ["MODIN_ENGINE"] = "ray"  # Modin will use Ray
#os.environ["MODIN_ENGINE"] = "dask"  # alternative: Modin will use Dask
'''

In [None]:
'''
# loading and reading the dataset with modin
import modin.pandas as pd
from distributed import Client
client = Client()
import time
'''



In [None]:
'''
start_time = time.time()
df1 = pd.read_csv("/content/drive/MyDrive/data_glacier_intern/online_trans.csv")
end_time = time.time()
print("modin estimated loading time: = {}".format(end_time-start_time))
print("\n") #break

#getting the first 5 rows
df1.head()
'''

NOTE: These cells kept crashing, so I was unable to run the code above.

## File loading conclusion

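While loading this big file, I came to the following conclusions (the timings are the ones recorded in the cell outputs above):
1. Dask was by far the fastest: dd.read_csv returned in about 0.24 seconds. Dask reads lazily, so this measures setting up the read rather than loading all 42 million rows into memory.
2. The config-driven pandas read came second, taking about 2.3 minutes (about 140 seconds).
3. Datatable came third, taking about 2.6 minutes (about 158 seconds).
4. Plain pandas came last, taking about 2.9 minutes (about 173 seconds).

The config-driven read and the plain pandas read both call pd.read_csv, so the gap between them most likely reflects run-to-run variation rather than a different method.

Therefore, Dask is the best option when loading big files.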
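
Each timing in this section comes from a single run, which makes the figures sensitive to caching and to the state of the Colab session. The following is a minimal sketch of a helper that repeats a load and keeps the fastest run; `best_time` is a hypothetical name, and the stand-in workload only exists so the block runs without the dataset.

```python
import time


def best_time(load, repeats=3):
    """Call load() several times and return the fastest wall-clock time in seconds."""
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        load()
        timings.append(time.perf_counter() - start)
    return min(timings)


# Stand-in workload so the sketch runs anywhere; swap in a real reader.
elapsed = best_time(lambda: sum(range(100_000)))
print(f"best of 3: {elapsed:.6f} s")
```

For the readers in this notebook, the callable could be something like `lambda: pd.read_csv(source_file)`. For Dask, a callable that forces the read, for example `lambda: len(dd.read_csv(source_file))`, should give a number that is comparable with the eager libraries. On a file of this size, `repeats=1` may be the only practical setting.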

## **Writing the file as a pipe-separated (|) text file in gz format**









In [29]:
# Validate the header, then write the validated frame as a
# pipe-separated, gzip-compressed text file.
if util.col_header_val(trans_dask,config_data)==0:
    print("---------------------------------------------")
    print("Validation Failed! Please, check file columns!")
else:
    print("Column Validation Passed")
    # sep comes from the YAML config ("|"); single_file=True writes
    # all Dask partitions into one gzip file instead of one per partition.
    trans_dask.to_csv('trans_dask.gz',
                      sep=config_data['outbound_delimiter'],
                      compression='gzip',
                      single_file=True,
                      index=False)
    print("Your Dataframe has been compressed to a .gzip file in the same folder.")


column name and column length validation passed
Column Validation Passed
Your Dataframe has been compressed to a .gzip file in the same folder.
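
If no dataframe library is available, the same conversion can be streamed with the standard library only. This is a sketch under assumptions: `csv_to_pipe_gz` is a hypothetical helper, the input is assumed to be UTF-8, and the output file name in the usage comment is made up for illustration.

```python
import csv
import gzip


def csv_to_pipe_gz(src_path, dst_path, in_sep=",", out_sep="|"):
    """Stream src_path row by row into a gzip text file delimited by out_sep."""
    rows = 0
    with open(src_path, newline="", encoding="utf-8") as src, \
         gzip.open(dst_path, "wt", newline="", encoding="utf-8") as dst:
        reader = csv.reader(src, delimiter=in_sep)
        writer = csv.writer(dst, delimiter=out_sep, lineterminator="\n")
        for row in reader:
            writer.writerow(row)  # fields containing out_sep are quoted by csv
            rows += 1
    return rows  # number of lines written, header included


# Possible usage with this notebook's config (output name is hypothetical):
# csv_to_pipe_gz("online_trans.csv", "online_trans_pipe.gz",
#                config_data["inbound_delimiter"],
#                config_data["outbound_delimiter"])
```

Because rows are processed one at a time, memory use stays flat regardless of file size, at the cost of being slower than a vectorized writer.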


## **Create a summary of the file:**

1. Total number of rows,

2. total number of columns

3. file size

In [29]:
#total number of the rows
print("The dataset has  {} rows".format(len(trans_dask)))

The dataset has  42448764 rows


In [25]:
#checking number of columns
len(trans_dask.columns)
print("The dataset has {} columns" .format(len(trans_dask.columns)))

The dataset has 9 columns


In [30]:
import os
#file size
file_size = os.path.getsize("/content/drive/MyDrive/data_glacier_intern/online_trans.csv")

print("The file size is: {}" .format(file_size))

The file size is: 5668612855
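
The three summary values can also be gathered in one pass without any dataframe library, and the raw byte count is easier to read in binary units. This is a minimal sketch, assuming a UTF-8 file with a single header line and no line breaks inside quoted fields; `human_size` and `summarize_file` are hypothetical helpers.

```python
import os


def human_size(num_bytes):
    """Format a byte count using binary units (KiB, MiB, GiB, TiB)."""
    size = float(num_bytes)
    for unit in ("B", "KiB", "MiB", "GiB"):
        if size < 1024:
            return f"{size:.2f} {unit}"
        size /= 1024
    return f"{size:.2f} TiB"


def summarize_file(path, sep=","):
    """Count data rows and columns of a delimited text file line by line."""
    with open(path, encoding="utf-8") as handle:
        header = handle.readline().rstrip("\n").split(sep)
        rows = sum(1 for _ in handle)  # every line after the header
    return {
        "rows": rows,
        "columns": len(header),
        "size": human_size(os.path.getsize(path)),
    }


print(human_size(5668612855))  # the byte count reported above -> 5.28 GiB
```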
