In [8]:
pip install --upgrade pandas "dask[complete]"

Collecting dask[complete]
  Downloading dask-2023.12.1-py3-none-any.whl (1.2 MB)
[K     |████████████████████████████████| 1.2 MB 6.9 MB/s eta 0:00:01
Collecting click>=8.1
  Using cached click-8.1.7-py3-none-any.whl (97 kB)
Collecting importlib-metadata>=4.13.0
  Downloading importlib_metadata-7.0.0-py3-none-any.whl (23 kB)
Collecting pyarrow>=7.0
  Downloading pyarrow-14.0.1-cp39-cp39-macosx_10_14_x86_64.whl (26.9 MB)
[K     |████████████████████████████████| 26.9 MB 54.2 MB/s eta 0:00:01
[?25hCollecting lz4>=4.3.2
  Downloading lz4-4.3.2-cp39-cp39-macosx_10_9_x86_64.whl (254 kB)
[K     |████████████████████████████████| 254 kB 50.5 MB/s eta 0:00:01
[?25hCollecting pyarrow-hotfix
  Downloading pyarrow_hotfix-0.6-py3-none-any.whl (7.9 kB)
Collecting distributed==2023.12.1
  Downloading distributed-2023.12.1-py3-none-any.whl (999 kB)
[K     |████████████████████████████████| 999 kB 36.0 MB/s eta 0:00:01
Collecting locket
  Downloading locket-1.0.0-py2.py3-none-any.whl (4.4 kB)
Co

In [1]:
import os
import time

In [2]:
#Size of the file
os.path.getsize('/Users/richa/Downloads/2019-Nov.csv')

9006762395

## Read data with Dask

In [3]:
from dask import dataframe as dd
start = time.time()
dask_df = dd.read_csv('/Users/richa/Downloads/2019-Nov.csv')
end = time.time()
print("Read csv with dask: ",(end-start),"sec")



Read csv with dask:  0.012286901473999023 sec


## Read data with Pandas

In [4]:
import pandas as pd
start = time.time()
df = pd.read_csv('/Users/richa/Downloads/2019-Nov.csv')
end = time.time()
print("Read csv with pandas: ",(end-start),"sec")

Read csv with pandas:  82.80315399169922 sec


### Here Dask is better than Pandas

In [5]:
from dask import dataframe as dd
df = dd.read_csv('/Users/richa/Downloads/2019-Nov.csv',delimiter=',')

In [6]:
df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 9 entries, event_time to user_session
dtypes: float64(1), int64(3), string(5)

In [7]:
#No. of Rows
len(df.index)

67501979

In [8]:
#No, of Columns
len(df.columns)

9

In [9]:
# remove special character
df.columns=df.columns.str.replace('[#,@,&]','')

In [10]:
#To remove white space from columns
df.columns = df.columns.str.replace(' ', '')

In [11]:
data=df.columns
data

Index(['event_time', 'event_type', 'product_id', 'category_id',
       'category_code', 'brand', 'price', 'user_id', 'user_session'],
      dtype='object')

### Validation

In [12]:
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

In [13]:
!pip install ruamel-yaml



In [14]:
import ruamel.yaml as yam
def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.load(stream, Loader=yaml.Loader)
        except yaml.YAMLError as exc:
            logging.error(exc)

def col_header_val(df,table_config):
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

In [15]:
%%writefile store.yaml
file_type: csv
dataset_name: file
file_name: 2019-Nov
table_name: nov
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns:
    - event_time
    - product_id
    - category_id
    - category_code
    - brand
    - price
    - user_id
    - user_session



Overwriting store.yaml


In [16]:
!pip3 install PyYAML



In [17]:
# Reading config file
import yaml
import utility as util
config_data = read_config_file('/Users/richa/Downloads/store.yaml')

In [18]:
#data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'file',
 'file_name': '2019-Nov',
 'table_name': 'nov',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['event_time',
  'product_id',
  'category_id',
  'category_code',
  'brand',
  'price',
  'user_id',
  'user_session']}

In [19]:
# Reading process of the file using Dask
from dask import dataframe as dd
df_sample = dd.read_csv('/Users/richa/Downloads/2019-Nov.csv',delimiter=',')
df_sample.head()

Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-11-01 00:00:00 UTC,view,1003461,2053013555631882655,electronics.smartphone,xiaomi,489.07,520088904,4d3b30da-a5e4-49df-b1a8-ba5943f1dd33
1,2019-11-01 00:00:00 UTC,view,5000088,2053013566100866035,appliances.sewing_machine,janome,293.65,530496790,8e5f4f83-366c-4f70-860e-ca7417414283
2,2019-11-01 00:00:01 UTC,view,17302664,2053013553853497655,,creed,28.31,561587266,755422e7-9040-477b-9bd2-6a6e8fd97387
3,2019-11-01 00:00:01 UTC,view,3601530,2053013563810775923,appliances.kitchen.washer,lg,712.87,518085591,3bfb58cd-7892-48cc-8020-2f17e6de6e7f
4,2019-11-01 00:00:01 UTC,view,1004775,2053013555631882655,electronics.smartphone,xiaomi,183.27,558856683,313628f1-68b8-460d-84f6-cec7a8796ef2


In [20]:
#Reading the file using config file
file_type = config_data['file_type']
source_file = "/Users/richa/Downloads/" + config_data['file_name'] + f'.{file_type}'

In [21]:
import pandas as pd
df = pd.read_csv('/Users/richa/Downloads/2019-Nov.csv',delimiter=config_data['inbound_delimiter'])
df.head()

Unnamed: 0,event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
0,2019-11-01 00:00:00 UTC,view,1003461,2053013555631882655,electronics.smartphone,xiaomi,489.07,520088904,4d3b30da-a5e4-49df-b1a8-ba5943f1dd33
1,2019-11-01 00:00:00 UTC,view,5000088,2053013566100866035,appliances.sewing_machine,janome,293.65,530496790,8e5f4f83-366c-4f70-860e-ca7417414283
2,2019-11-01 00:00:01 UTC,view,17302664,2053013553853497655,,creed,28.31,561587266,755422e7-9040-477b-9bd2-6a6e8fd97387
3,2019-11-01 00:00:01 UTC,view,3601530,2053013563810775923,appliances.kitchen.washer,lg,712.87,518085591,3bfb58cd-7892-48cc-8020-2f17e6de6e7f
4,2019-11-01 00:00:01 UTC,view,1004775,2053013555631882655,electronics.smartphone,xiaomi,183.27,558856683,313628f1-68b8-460d-84f6-cec7a8796ef2


In [22]:
#validating the header of the file
col_header_val(df,config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['event_type']
Following YAML columns are not in the file uploaded []


0

In [23]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['event_time', 'event_type', 'product_id', 'category_id',
       'category_code', 'brand', 'price', 'user_id', 'user_session'],
      dtype='object')
columns of YAML are: ['event_time', 'product_id', 'category_id', 'category_code', 'brand', 'price', 'user_id', 'user_session']


In [24]:
if col_header_val(df,config_data)==0:
    print("validation failed")
else:
    print("col validation passed")

column name and column length validation failed
Following File columns are not in the YAML file ['event_type']
Following YAML columns are not in the file uploaded []
validation failed


In [29]:
import datetime
import csv
import gzip

from dask import dataframe as dd
df = dd.read_csv('/Users/richa/Downloads/2019-Nov.csv',delimiter=',')
df.to_csv('2019-Nov.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True)

['/Users/richa/Downloads/2019-Nov.csv.gz/000.part',
 '/Users/richa/Downloads/2019-Nov.csv.gz/001.part',
 '/Users/richa/Downloads/2019-Nov.csv.gz/002.part',
 '/Users/richa/Downloads/2019-Nov.csv.gz/003.part',
 '/Users/richa/Downloads/2019-Nov.csv.gz/004.part',
 '/Users/richa/Downloads/2019-Nov.csv.gz/005.part',
 '/Users/richa/Downloads/2019-Nov.csv.gz/006.part',
 '/Users/richa/Downloads/2019-Nov.csv.gz/007.part',
 '/Users/richa/Downloads/2019-Nov.csv.gz/008.part',
 '/Users/richa/Downloads/2019-Nov.csv.gz/009.part',
 '/Users/richa/Downloads/2019-Nov.csv.gz/010.part',
 '/Users/richa/Downloads/2019-Nov.csv.gz/011.part',
 '/Users/richa/Downloads/2019-Nov.csv.gz/012.part',
 '/Users/richa/Downloads/2019-Nov.csv.gz/013.part',
 '/Users/richa/Downloads/2019-Nov.csv.gz/014.part',
 '/Users/richa/Downloads/2019-Nov.csv.gz/015.part',
 '/Users/richa/Downloads/2019-Nov.csv.gz/016.part',
 '/Users/richa/Downloads/2019-Nov.csv.gz/017.part',
 '/Users/richa/Downloads/2019-Nov.csv.gz/018.part',
 '/Users/ric

In [30]:
#number of files in gz format folder
import os
entries = os.listdir('/Users/richa/Downloads/2019-Nov.csv.gz')
for entry in entries:
    print(entry)

083.part
129.part
095.part
113.part
056.part
001.part
017.part
105.part
040.part
037.part
060.part
125.part
076.part
099.part
133.part
021.part
109.part
108.part
020.part
077.part
098.part
132.part
061.part
124.part
036.part
104.part
041.part
016.part
000.part
112.part
057.part
094.part
082.part
128.part
031.part
066.part
123.part
089.part
070.part
135.part
027.part
119.part
085.part
139.part
093.part
115.part
050.part
007.part
011.part
103.part
046.part
102.part
047.part
010.part
006.part
114.part
051.part
138.part
092.part
084.part
118.part
026.part
071.part
134.part
067.part
122.part
088.part
030.part
048.part
009.part
025.part
137.part
072.part
121.part
064.part
033.part
044.part
101.part
013.part
005.part
052.part
117.part
029.part
091.part
087.part
068.part
086.part
069.part
090.part
028.part
053.part
116.part
004.part
012.part
045.part
100.part
032.part
120.part
065.part
136.part
073.part
024.part
008.part
049.part
042.part
107.part
015.part
003.part
054.part
111.part
097.part
0

In [31]:
#size of the gz format folder
os.path.getsize('/Users/richa/Downloads/2019-Nov.csv.gz')

4544