In [4]:
!pip install "dask[complete]"



# Week 6 assigment

Dataset used can be found at https://www.kaggle.com/mkechinov/ecommerce-behavior-data-from-multi-category-store

This data base contains behavior data for 2 months (October 2019 and November 2019) from a large multi-category online store.

Each line in the file represents one event. All events are related to products and users. Each event is like a many-to-many relationship between products and users.

Due to the size of the base, we will analyze in this notebook only the month of October.


Data Description

    event_time: Time when the event happened (in UTC).
    event_type: Represents the type of event.
    product_id: ID of a product
    category_id: Product category ID
    category_code: Taxonomy of the product category (codename), if possible. Usually present for meaningful categories and ignored for different types of accessories.
    brand: String of characters with the brand name.
    price: Floating price of a product. Present.
    user_id: Permanent user ID.
    user_session: Temporary user session ID. The same for each user session. It is changed every time the user returns to the online store after a long break.

Events can be:

    view: a user viewed a product
    cart: a user added a product to the cart
    remove_from_cart: a user removed a product from the shopping cart
    purchase: a user bought a product.



In [5]:
import dask
import dask.dataframe as dd

In [6]:
import os
import gc
import time
import pandas as pd

In [7]:
begin = time.time()
df_pandas = pd.read_csv("/content/drive/MyDrive/Data Glacier/Datasets/2019-Nov.csv")

end = time.time()
pandas_time = end - begin
# total time taken
print(f"Total runtime of the program is {end - begin}")

Total runtime of the program is 227.8663010597229


In [8]:
%%time
df_pandas.head()

CPU times: user 339 µs, sys: 84 µs, total: 423 µs
Wall time: 428 µs


Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-11-01 00:00:00 UTC,view,1003461,2053013555631882655,electronics.smartphone,xiaomi,489.07,520088904,4d3b30da-a5e4-49df-b1a8-ba5943f1dd33
1,2019-11-01 00:00:00 UTC,view,5000088,2053013566100866035,appliances.sewing_machine,janome,293.65,530496790,8e5f4f83-366c-4f70-860e-ca7417414283
2,2019-11-01 00:00:01 UTC,view,17302664,2053013553853497655,,creed,28.31,561587266,755422e7-9040-477b-9bd2-6a6e8fd97387
3,2019-11-01 00:00:01 UTC,view,3601530,2053013563810775923,appliances.kitchen.washer,lg,712.87,518085591,3bfb58cd-7892-48cc-8020-2f17e6de6e7f
4,2019-11-01 00:00:01 UTC,view,1004775,2053013555631882655,electronics.smartphone,xiaomi,183.27,558856683,313628f1-68b8-460d-84f6-cec7a8796ef2


In [9]:
pandas_mem = df_pandas.memory_usage().sum()/1024**2
print(f"Memory usage by pandas: {pandas_mem:.2f} MB")

Memory usage by pandas: 4634.99 MB


In [10]:
%%time
df_pandas.isnull().sum()

CPU times: user 14.2 s, sys: 1.51 s, total: 15.7 s
Wall time: 15.8 s


event_time              0
event_type              0
product_id              0
category_id             0
category_code    21898171
brand             9224078
price                   0
user_id                 0
user_session           10
dtype: int64

In [11]:
del df_pandas
gc.collect()

208

In [12]:
begin = time.time()
% time df_dask = dd.read_csv("/content/drive/MyDrive/Data Glacier/Datasets/2019-Nov.csv")
end = time.time()

dask_time = end - begin
# total time taken
print(f"Total runtime of the program is {end - begin}")

CPU times: user 86.3 ms, sys: 176 ms, total: 263 ms
Wall time: 279 ms
Total runtime of the program is 0.28266167640686035


In [13]:
%time
df_dask.head()

CPU times: user 6 µs, sys: 1e+03 ns, total: 7 µs
Wall time: 11.2 µs


Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-11-01 00:00:00 UTC,view,1003461,2053013555631882655,electronics.smartphone,xiaomi,489.07,520088904,4d3b30da-a5e4-49df-b1a8-ba5943f1dd33
1,2019-11-01 00:00:00 UTC,view,5000088,2053013566100866035,appliances.sewing_machine,janome,293.65,530496790,8e5f4f83-366c-4f70-860e-ca7417414283
2,2019-11-01 00:00:01 UTC,view,17302664,2053013553853497655,,creed,28.31,561587266,755422e7-9040-477b-9bd2-6a6e8fd97387
3,2019-11-01 00:00:01 UTC,view,3601530,2053013563810775923,appliances.kitchen.washer,lg,712.87,518085591,3bfb58cd-7892-48cc-8020-2f17e6de6e7f
4,2019-11-01 00:00:01 UTC,view,1004775,2053013555631882655,electronics.smartphone,xiaomi,183.27,558856683,313628f1-68b8-460d-84f6-cec7a8796ef2


In [14]:
dask_mem = df_dask.memory_usage().sum()/1024**2
print(f"Memory usage by dask: {dask_mem.compute():.2f} MB")

Memory usage by dask: 4635.01 MB


In [15]:
%%time
df_dask.isnull().sum().compute()

CPU times: user 2min 42s, sys: 11.8 s, total: 2min 53s
Wall time: 3min 3s


event_time              0
event_type              0
product_id              0
category_id             0
category_code    21898171
brand             9224078
price                   0
user_id                 0
user_session           10
dtype: int64

In [16]:
del df_dask
gc.collect()

52

## Data ingestion task

In [17]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py


Writting YAML file

event_time: 
event_type: 
product_id: 
category_id:
category_code: 
brand: 
price: 
user_id: 
user_session: 

In [26]:
%%writefile file.yaml
file_type: csv
data_directory: /content/drive/MyDrive/Data Glacier/Datasets/
dataset_name: test
file_name: 2019-Nov
inbound_delimiter: ","
outbound_delimiter: "|"
columns_names: ['event_time','event_type','product_id','category_id','category_code','brand','price','user_id', 'user_session']
drop_columns: ['category_code', 'brand']
data_type:
  event_time: datetime64[ns]
  event_type: object
  product_id: int16
  category_id: int16
  category_code: object
  brand: object
  price: float32
  user_id: int16
  user_session: object
columns: 
    - event_time
    - event_type
    - product_id
    - category_id
    - category_code
    - brand
    - price
    - user_id
    - user_session
clean_data: data_clean.csv

Overwriting file.yaml


In [27]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [31]:
config_data

{'clean_data': 'data_clean.csv',
 'columns': ['event_time',
  'event_type',
  'product_id',
  'category_id',
  'category_code',
  'brand',
  'price',
  'user_id',
  'user_session'],
 'columns_names': ['event_time',
  'event_type',
  'product_id',
  'category_id',
  'category_code',
  'brand',
  'price',
  'user_id',
  'user_session'],
 'data_directory': '/content/drive/MyDrive/Data Glacier/Datasets/',
 'data_type': {'brand': 'object',
  'category_code': 'object',
  'category_id': 'int16',
  'event_time': 'datetime64[ns]',
  'event_type': 'object',
  'price': 'float32',
  'product_id': 'int16',
  'user_id': 'int16',
  'user_session': 'object'},
 'dataset_name': 'test',
 'drop_columns': ['category_code', 'brand'],
 'file_name': '2019-Nov',
 'file_type': 'csv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|'}

In [28]:
# read the file using config file
filepath = "/content/drive/MyDrive/Data Glacier/Datasets/2019-Nov.csv"
file_type = config_data['file_type']
source_file = "/content/drive/MyDrive/Data Glacier/Datasets/" + config_data['file_name'] + f'.{file_type}'


In [29]:
df = dd.read_csv(source_file)
df

Unnamed: 0_level_0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
npartitions=141,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
,object,object,int64,int64,object,object,float64,int64,object
,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...


In [35]:
#Change column names
df.columns= config_data["columns_names"]

In [43]:
print("columns of files are:" ,df.columns.values)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: ['event_time' 'event_type' 'product_id' 'category_id' 'category_code'
 'brand' 'price' 'user_id' 'user_session']
columns of YAML are: ['event_time', 'event_type', 'product_id', 'category_id', 'category_code', 'brand', 'price', 'user_id', 'user_session']


In [47]:
type(df)

dask.dataframe.core.DataFrame

In [None]:
#validate the header of the file
#util.col_header_val(df.compute(),config_data) THIS code was not able to run because ram problems

In [39]:
#Reduce Memory usage by casting cols dtypes
df['event_time'] = df['event_time'].astype(config_data['data_type']['event_time'])
df['event_type'] = df['event_type'].astype(config_data['data_type']['event_type'])
df['product_id'] = df['product_id'].astype(config_data['data_type']['product_id'])
df['category_id'] = df['category_id'].astype(config_data['data_type']['category_id'])
df['category_code'] = df['category_code'].astype(config_data['data_type']['category_code'])
df['brand'] = df['brand'].astype(config_data['data_type']['brand'])
df['price'] = df['price'].astype(config_data['data_type']['price'])
df['user_id'] = df['user_id'].astype(config_data['data_type']['user_id'])
df['user_session'] = df['user_session'].astype(config_data['data_type']['user_session'])


In [40]:
df

Unnamed: 0_level_0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
npartitions=141,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
,datetime64[ns],object,int16,int16,object,object,float32,int16,object
,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...


In [41]:
#Drop missing values
%time df= df.dropna()

#Save clean data in the repository
df.to_parquet('df.parquet.gzip', compression='gzip')

CPU times: user 11.8 ms, sys: 970 µs, total: 12.8 ms
Wall time: 24.6 ms
