### Name: Manoj Kumar Thangaraj
### Course: LISUM01

### Week 6 Task


## Task:

* Take any csv/text file of 2+ GB of your choice. --- (You can do this assignment on Google colab)

* Read the file ( Present approach of reading the file )

* Try different methods of file reading, e.g. Dask, Modin, Ray, pandas, and present your findings in terms of computational efficiency

* Perform basic validation on data columns, e.g. remove special characters and white spaces from the column names

* As you already know the schema, create a YAML file and write the column names in it. Also define the separator for
  reading and writing the file in the YAML file.

* Validate number of columns and column name of ingested file with YAML.

* Write the file in pipe separated text file (|) in gz format.

* Create a summary of the file:

    Total number of rows,

    total number of columns

    file size
    
# Data Ingestion sample code walkthrough

## 
  > Create a utility file
  
  > Config file creation
  
  > Data ingestion pipeline

**Let's write a utility file and import the required libraries**

Define functions for reading the **YAML** config file and for column validation.

    i) read_config_file for reading the YAML config file,
    ii) replacer for collapsing unnecessary repeated characters,
    iii) col_header_val for column validation

In [1]:
%%writefile testutility.py

#Import the required libraries
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

def read_config_file(filepath):
    '''Takes in the filepath and read the file
    Args: Filepath
    return: Yaml config'''
    with open(filepath, 'r') as stream:   #open the file as read only
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:     #log an error if the YAML cannot be parsed
            logging.error(exc)


def replacer(string, char):
    '''Takes in the string and the character and return after removing unnecessary characters
    Args: string, char
    return: string'''
    pattern = re.escape(char) + '{2,}'       #match runs of two or more of the character
    string = re.sub(pattern, char, string)   #collapse each run into a single character
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    Args: dataframe, table_config from YAML
    returns: validated columns after preprocessing
    '''
    df.columns = df.columns.str.lower()       #lower case
    df.columns = df.columns.str.replace(r'[^\w]','_',regex=True) #replace non-word characters with underscore
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns))) #strip leading and trailing underscores
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns))) #collapse repeated underscores
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py
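
As a quick check of the cleaning steps inside `col_header_val`, the sketch below applies the same lower-casing, replacement, stripping and collapsing to plain strings. The helper name `normalize_column_name` and the sample names are hypothetical and are not part of `testutility.py`.

```python
import re

def normalize_column_name(name):
    """Apply the same cleaning steps as col_header_val to one column name."""
    name = name.lower()                  # lower case
    name = re.sub(r'[^\w]', '_', name)   # non-word characters -> underscore
    name = name.strip('_')               # drop leading/trailing underscores
    name = re.sub(r'_{2,}', '_', name)   # collapse repeated underscores
    return name

# Hypothetical messy headers, used only for illustration
raw_names = [" Event Time ", "Product--ID", "user_session"]
cleaned = [normalize_column_name(n) for n in raw_names]
print(cleaned)
```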


### Write YAML File

The file was taken from Kaggle at https://www.kaggle.com/mkechinov/ecommerce-behavior-data-from-multi-category-store?select=2019-Oct.csv. Two datasets are available there; 2019-Oct.csv is the one downloaded, and it is read below as Oct.csv.

Source: the Kaggle page above and the REES46 Marketing Platform.

In [22]:
%%writefile file.yaml
file_type: csv
dataset_name: Oct
file_name: Oct
table_name: Events
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
total_rows: 42448764
columns: 
    - event_time
    - event_type
    - product_id
    - category_id
    - category_code
    - brand
    - price
    - user_id
    - user_session
   

Overwriting file.yaml


In [23]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [24]:
#inspecting data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'Oct',
 'file_name': 'Oct',
 'table_name': 'Events',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'total_rows': 42448764,
 'columns': ['event_time',
  'event_type',
  'product_id',
  'category_id',
  'category_code',
  'brand',
  'price',
  'user_id',
  'user_session']}

The config file is read into a Python dictionary, so each setting is available as a **key and value** pair.
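
Because the config is an ordinary dictionary, it can be checked for the keys the pipeline relies on before any data is read. The following is a minimal sketch; the function `missing_config_keys` and the list of required keys are assumptions made for illustration, and the sample dictionary mirrors only part of the config shown above.

```python
# Keys this notebook reads from the config (an assumed list, for illustration)
REQUIRED_KEYS = ["file_type", "file_name", "inbound_delimiter",
                 "outbound_delimiter", "columns"]

def missing_config_keys(config, required=REQUIRED_KEYS):
    """Return the required keys that are absent from the config dictionary."""
    return [key for key in required if key not in config]

sample_config = {
    "file_type": "csv",
    "file_name": "Oct",
    "inbound_delimiter": ",",
    "outbound_delimiter": "|",
    "columns": ["event_time", "event_type"],
}

print(missing_config_keys(sample_config))          # complete config
print(missing_config_keys({"file_type": "csv"}))   # incomplete config
```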

### The file is read with pandas, with pandas driven by the YAML config, and with dask; their computational efficiency is compared at the end.

In [5]:
# Normal reading process of the file through pandas
import pandas as pd
import time
start_time = time.time()
df_sample = pd.read_csv("Oct.csv",delimiter=',')
end_time = time.time()
df_sample.head()
Total_time = end_time - start_time
print("The processing time through pandas library is",Total_time,"seconds")

The processing time through pandas library is 508.51016330718994 seconds


In [6]:
#Let's check the dataframe
df_sample.head()

Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-10-01 00:00:00 UTC,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c
1,2019-10-01 00:00:00 UTC,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc
2,2019-10-01 00:00:01 UTC,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8
3,2019-10-01 00:00:01 UTC,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713
4,2019-10-01 00:00:04 UTC,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d


Now we will check for the number of rows in total for this file.

In [13]:
no_of_rows = len(df_sample)

print("The total number of rows in this file is",no_of_rows)

The total number of rows in this file is 42448764
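
Loading the whole file just to count rows is expensive. One alternative, sketched here on a small in-memory CSV rather than on Oct.csv, is to let pandas read the data in chunks with the `chunksize` parameter and add up the chunk lengths. The sample rows and the chunk size of 2 are illustrative assumptions.

```python
import io
import pandas as pd

# Small stand-in for the real file, using a few values from the preview above
sample_csv = io.StringIO(
    "event_time,event_type,price\n"
    "2019-10-01 00:00:00 UTC,view,35.79\n"
    "2019-10-01 00:00:00 UTC,view,33.2\n"
    "2019-10-01 00:00:01 UTC,view,543.1\n"
    "2019-10-01 00:00:01 UTC,view,251.74\n"
    "2019-10-01 00:00:04 UTC,view,1081.98\n"
)

total_rows = 0
# With chunksize set, read_csv yields DataFrames of at most that many rows
for chunk in pd.read_csv(sample_csv, chunksize=2):
    total_rows += len(chunk)

print("The total number of rows is", total_rows)
```

Only one chunk is held in memory at a time, so the same loop can be pointed at a file that does not fit in RAM.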


In [16]:
# read the file using config file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
#print("",source_file)
start_time = time.time()
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])
end_time = time.time()
Total_time = end_time - start_time
df.head()
print("The processing time through YAML library is",Total_time,"seconds")

The processing time through YAML library is 1076.5904235839844 seconds
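
The two pandas reads above make the same `pd.read_csv` call on the same file, so the gap between roughly 508 and 1077 seconds probably reflects run-to-run variation (for example memory pressure, since `df_sample` was still loaded) rather than any cost of the YAML config. That explanation is an assumption, not something measured here. Repeating a measurement makes such comparisons easier to trust; the sketch below uses `time.perf_counter`, and the name `time_call` and the stand-in workload are hypothetical.

```python
import time

def time_call(func, repeats=3):
    """Run func several times and return the list of elapsed seconds."""
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        func()
        timings.append(time.perf_counter() - start)
    return timings

# Stand-in workload; in the notebook this could instead be
# lambda: pd.read_csv("Oct.csv")
timings = time_call(lambda: sum(range(100_000)))
print("fastest run:", min(timings), "seconds")
```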


In [17]:
#checking the number of rows
no_of_rows = len(df)

#print the result
print("The total number of rows in this file is",no_of_rows)

The total number of rows in this file is 42448764


In [18]:
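# read the file using dask
# note: dd.read_csv is lazy; apart from sampling the start of the file to infer
# dtypes it only builds a task graph, and the data is actually read when a
# result is requested (e.g. by len() or head() in the cells below)
import dask.dataframe as dd
start_time = time.time()
df_dask = dd.read_csv('Oct.csv')
end_time = time.time()
Total_time = end_time - start_time
print("The processing time through dask library is",Total_time,"seconds")

The processing time through dask library is 0.5963287353515625 seconds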


In [19]:
#checking the number of rows
no_of_rows = len(df_dask)

#print the result
print("The total number of rows in this file is",no_of_rows)

The total number of rows in this file is 42448764


In [20]:
#check the dataframe
df_dask.head()

Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-10-01 00:00:00 UTC,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c
1,2019-10-01 00:00:00 UTC,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc
2,2019-10-01 00:00:01 UTC,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8
3,2019-10-01 00:00:01 UTC,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713
4,2019-10-01 00:00:04 UTC,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d


### Now we will validate the dataframe against the YAML config file; the validation functions are imported from the testutility.py file

In [25]:
#validate the header of the file
util.col_header_val(df_sample,config_data)

column name and column length validation passed


1

In [27]:
#print the number of columns on both config and pandas file
print("columns of files are:" ,df_sample.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['event_time', 'event_type', 'product_id', 'category_id',
       'category_code', 'brand', 'price', 'user_id', 'user_session'],
      dtype='object')
columns of YAML are: ['event_time', 'event_type', 'product_id', 'category_id', 'category_code', 'brand', 'price', 'user_id', 'user_session']


### In a full pipeline, the file is accepted or rejected based on this validation and further actions follow. Those actions are left as comments in the code below because they are out of scope for this part.

In [28]:
if util.col_header_val(df_sample,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("column validation passed")
    # write the code to perform further action
    # in the pipeline

column name and column length validation passed
column validation passed
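
One way to fill in the reject branch is to stop the pipeline with an exception when validation returns 0. This is only a sketch of that idea: `ensure_valid` is a hypothetical helper, and raising `ValueError` is one possible rejection policy among several.

```python
def ensure_valid(validation_result, file_name):
    """Raise if the 0/1 result from column validation signals a failure."""
    if validation_result == 0:
        raise ValueError(f"{file_name} rejected: column validation failed")
    return True

# A result of 1 lets the pipeline continue
print(ensure_valid(1, "Oct.csv"))

# A result of 0 stops it with an error that names the file
try:
    ensure_valid(0, "Oct.csv")
except ValueError as err:
    print(err)
```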


### Once the file is validated and has passed, we can write it as a pipe-separated text file in the compressed 'gz' format for further use. The code can be found below.

In [29]:
# write a pandas dataframe to a gzipped, pipe-separated text file
df.to_csv("compressed.csv.gz", 
           sep=config_data['outbound_delimiter'],
           index=False, 
           compression="gzip")
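
To confirm that a gzipped file really is pipe-separated, it can be read back and its header inspected. The sketch below uses only the standard library and a small temporary file instead of the real output; the file name `sample.txt.gz` and the two rows are illustrative.

```python
import csv
import gzip
import os
import tempfile

rows = [
    ["event_time", "event_type", "price"],
    ["2019-10-01 00:00:00 UTC", "view", "35.79"],
]

path = os.path.join(tempfile.mkdtemp(), "sample.txt.gz")

# Write the rows as pipe-separated text, compressed with gzip
with gzip.open(path, "wt", newline="", encoding="utf-8") as handle:
    csv.writer(handle, delimiter="|").writerows(rows)

# Read the file back and parse the header line with the same delimiter
with gzip.open(path, "rt", newline="", encoding="utf-8") as handle:
    header = next(csv.reader(handle, delimiter="|"))

print(header)
```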

### Summary

- The total number of rows in the file is 42448764.
- The total number of columns in the file is 9.
- The total size of the compressed file is 1.57 GB.
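
The figures in this summary can also be gathered programmatically. The sketch below assumes a hypothetical helper `summarize_file` that streams a delimited text file with the standard library, and it runs on a small temporary file rather than on the real data.

```python
import csv
import os
import tempfile

def summarize_file(path, delimiter=","):
    """Return row count (excluding the header), column count and size in bytes."""
    with open(path, newline="", encoding="utf-8") as handle:
        reader = csv.reader(handle, delimiter=delimiter)
        header = next(reader)
        row_count = sum(1 for _ in reader)   # streams rows one at a time
    return {
        "rows": row_count,
        "columns": len(header),
        "size_bytes": os.path.getsize(path),
    }

# Tiny stand-in file, used only for illustration
path = os.path.join(tempfile.mkdtemp(), "sample.csv")
with open(path, "w", newline="", encoding="utf-8") as handle:
    handle.write("event_time,event_type,price\n")
    handle.write("2019-10-01 00:00:00 UTC,view,35.79\n")
    handle.write("2019-10-01 00:00:00 UTC,view,33.2\n")

summary = summarize_file(path)
print(summary)
```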