In [1]:
from google.colab import drive                #  Attact data from google drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
# Import Data and display the size of the dataset
import os
data_size = os.path.getsize('/content/drive/My Drive/2019-Oct.csv')/(1024**3)
print(f'The size of the data is {round(data_size,4)} GB')

The size of the data is 5.2793 GB


# Read with Pandas

In [8]:
import pandas as pd
import time

In [5]:
# Find elapsed time for read using pandas
start = time.time()
data = pd.read_csv('/content/drive/My Drive/2019-Oct.csv')
end = time.time()

In [7]:
time_elapsed = end-start
print(f'Time taken for read using Pandas: {round(time_elapsed,4)} seconds')

Time taken for read using Pandas: 135.6377 seconds


# Read with Dask

In [4]:
import dask.dataframe as dd

In [8]:
# Find elapsed time for read with Dask
start = time.time()
data = dd.read_csv('/content/drive/My Drive/2019-Oct.csv')
end = time.time()

In [9]:
time_elapsed = end-start
print(f'Time taken for read using Dask: {round(time_elapsed,4)} seconds')

Time taken for read using Dask: 0.3956 seconds


# Read using Modin

In [2]:
import modin.pandas as mpd

In [3]:
# Find elapsed time for read with Modin
start = time.time()
data = mpd.read_csv('/content/drive/My Drive/2019-Oct.csv')
end = time.time()


    import ray
    ray.init()

2023-12-10 02:06:24,091	INFO worker.py:1673 -- Started a local Ray instance.
[33m(raylet)[0m [2023-12-10 02:09:24,007 E 19035 19035] (raylet) node_manager.cc:3035: 1 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: 8fbf3da6b3520c6689ef79f8606405f42187d20e6b08e15b3b0c758e, IP: 172.28.0.12) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 172.28.0.12`
[33m(raylet)[0m 
[33m(raylet)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refr

In [4]:
time_elapsed = end-start
print(f'Time taken for read using modin: {round(time_elapsed,4)} seconds')

Time taken for read using modin: 249.505 seconds


# Read with Ray

In [21]:
import ray

In [22]:
# Find elapsed time for read with Ray
start = time.time()
data = ray.data.read_csv('/content/drive/My Drive/2019-Oct.csv')
end = time.time()

2023-12-10 23:55:50,921	INFO worker.py:1673 -- Started a local Ray instance.


In [23]:
time_elapsed = end-start
print(f'Time taken for read using ray: {round(time_elapsed,4)} seconds')

Time taken for read using ray: 8.2521 seconds


### **Ray read the data as ray dataset object where as Dask, Pandas and Mobin reads it as Dataframe object. From our analysis, dask is performing better with minimal reading time compared to the rest of the methods**

# Basic Validation

In [10]:
data.columns                            # Columns before special characters

Index(['event_time', 'event_type', 'product_id', 'category_id',
       'category_code', 'brand', 'price', 'user_id', 'user_session'],
      dtype='object')

In [24]:
data.columns = data.columns.str.strip().str.replace('[_,@,#,&, ]','')
data.columns                        # Columns after removing spaces and special characters

  data.columns = data.columns.str.strip().str.replace('[_,@,#,&, ]','').str.replace(' ','')


Index(['eventtime', 'eventtype', 'productid', 'categoryid', 'categorycode',
       'brand', 'price', 'userid', 'usersession'],
      dtype='object')

# Create YAML file


In [1]:
# Creating YAML file for validation by providing parameters and columns

%%writefile file.yaml
file_type: csv
dataset_name: testfile
file_name: 2019-Oct
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns:
    - eventtime
    - eventtype
    - productid
    - categoryid
    - categorycode
    - brand
    - price
    - userid
    - usersession

Writing file.yaml


# Create Utility File

In [3]:
# Creating utility file for using utility functions during validation of the dataset
%%writefile utility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime
import gc
import re

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string)
    return string

def col_header_val(df,table_config):

    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Writing utility.py


# Validation of Ingestion File with Yaml

In [4]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [5]:
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': '2019-Oct',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['eventtime',
  'eventtype',
  'productid',
  'categoryid',
  'categorycode',
  'brand',
  'price',
  'userid',
  'usersession']}

In [8]:
# Normal reading process of the file
import dask.dataframe as dd
df_norm = dd.read_csv('/content/drive/My Drive/2019-Oct.csv')
df_norm.head()

Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-10-01 00:00:00 UTC,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c
1,2019-10-01 00:00:00 UTC,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc
2,2019-10-01 00:00:01 UTC,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8
3,2019-10-01 00:00:01 UTC,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713
4,2019-10-01 00:00:04 UTC,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d


In [13]:
# read the file using config file
file_type = config_data['file_type']
source_file = "/content/drive/My Drive/" + config_data['file_name'] + f'.{file_type}'
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
df.head()

  df = pd.read_csv(source_file,config_data['inbound_delimiter'])


Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-10-01 00:00:00 UTC,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c
1,2019-10-01 00:00:00 UTC,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc
2,2019-10-01 00:00:01 UTC,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8
3,2019-10-01 00:00:01 UTC,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713
4,2019-10-01 00:00:04 UTC,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d


In [14]:
#validate the header of the file
util.col_header_val(df,config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['category_id', 'user_id', 'event_type', 'product_id', 'category_code', 'user_session', 'event_time']
Following YAML columns are not in the file uploaded ['userid', 'productid', 'categoryid', 'usersession', 'categorycode', 'eventtype', 'eventtime']


0

In [15]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['event_time', 'event_type', 'product_id', 'category_id',
       'category_code', 'brand', 'price', 'user_id', 'user_session'],
      dtype='object')
columns of YAML are: ['eventtime', 'eventtype', 'productid', 'categoryid', 'categorycode', 'brand', 'price', 'userid', 'usersession']


In [16]:
# Post validation logic
if util.col_header_val(df,config_data)==0:
    print("validation failed")
else:
    print("col validation passed")

column name and column length validation failed
Following File columns are not in the YAML file ['category_id', 'user_id', 'event_type', 'product_id', 'category_code', 'user_session', 'event_time']
Following YAML columns are not in the file uploaded ['userid', 'productid', 'categoryid', 'usersession', 'categorycode', 'eventtype', 'eventtime']
validation failed


# Write File in gz format

In [2]:
import csv
import gzip

In [5]:
df = dd.read_csv('/content/drive/My Drive/2019-Oct.csv')

In [19]:
# Write csv in gz format in pipe separated text file (|)
df.to_csv('2019-Oct.csv.gz', sep='|', header=True, index=False, compression='gzip')


  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)


['/content/2019-Oct.csv.gz/00.part',
 '/content/2019-Oct.csv.gz/01.part',
 '/content/2019-Oct.csv.gz/02.part',
 '/content/2019-Oct.csv.gz/03.part',
 '/content/2019-Oct.csv.gz/04.part',
 '/content/2019-Oct.csv.gz/05.part',
 '/content/2019-Oct.csv.gz/06.part',
 '/content/2019-Oct.csv.gz/07.part',
 '/content/2019-Oct.csv.gz/08.part',
 '/content/2019-Oct.csv.gz/09.part',
 '/content/2019-Oct.csv.gz/10.part',
 '/content/2019-Oct.csv.gz/11.part',
 '/content/2019-Oct.csv.gz/12.part',
 '/content/2019-Oct.csv.gz/13.part',
 '/content/2019-Oct.csv.gz/14.part',
 '/content/2019-Oct.csv.gz/15.part',
 '/content/2019-Oct.csv.gz/16.part',
 '/content/2019-Oct.csv.gz/17.part',
 '/content/2019-Oct.csv.gz/18.part',
 '/content/2019-Oct.csv.gz/19.part',
 '/content/2019-Oct.csv.gz/20.part',
 '/content/2019-Oct.csv.gz/21.part',
 '/content/2019-Oct.csv.gz/22.part',
 '/content/2019-Oct.csv.gz/23.part',
 '/content/2019-Oct.csv.gz/24.part',
 '/content/2019-Oct.csv.gz/25.part',
 '/content/2019-Oct.csv.gz/26.part',
 

In [17]:
# Display the summary of the Pipe seperated gz file
import os
import io
size = os.path.getsize('2019-Oct.csv.gz')
rows = len(df)
columns = df.shape[1]
print(f'The summary of the pipe seperated gz file is as below \
      \n Size: {size} \
      \n Number of Rows: {rows} \
      \n Number of Columns: {columns}')

The summary of the pipe seperated gz file is as below       
 Size: 4096       
 Number of Rows: 42448764       
 Number of Columns: 9
