In [1]:
import os
import time

In [2]:
#read the size of the file
os.path.getsize('Walmart_Store_sales.csv')

363732

In [3]:
#read the data with pandas
import pandas as pd
start = time.time()
df = pd.read_csv('Walmart_Store_sales.csv')
end = time.time()
print("Read file with pandas: ",(end-start),"sec")

Read file with pandas:  0.05054163932800293 sec


In [4]:
#read the data with Dask
from dask import dataframe as dd
start = time.time()
dask_df = dd.read_csv('Walmart_Store_sales.csv')
end = time.time()
print("Read file with dask: ",(end-start),"sec")

Read file with dask:  0.01564502716064453 sec


In [7]:
#read the data with modin and ray
import modin.pandas as pd
import ray
ray.shutdown()
ray.init()
start = time.time()
df = pd.read_csv('Walmart_Store_sales.csv')
end = time.time()
print("Read file with modin and ray: ",(end-start),"sec")

Read file with modin and ray:  1.09419846534729 sec


In [11]:
from dask import dataframe as dd
df = dd.read_csv('C:/Users/User/Desktop/data ingestion/Walmart_Store_sales.csv',delimiter=',')

In [12]:
df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 8 entries, Store to Unemployment
dtypes: object(1), float64(5), int64(2)

In [14]:
#No of rows and coumns
len(df.index)


6435

In [15]:
len(df.columns)

8

In [16]:
# remove special character
df.columns=df.columns.str.replace('[#,@,&]','')



In [18]:
#To remove white space from columns
df.columns = df.columns.str.replace(' ', '')

In [19]:
data=df.columns
data

Index(['Store', 'Date', 'Weekly_Sales', 'Holiday_Flag', 'Temperature',
       'Fuel_Price', 'CPI', 'Unemployment'],
      dtype='object')

In [20]:
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

In [21]:
def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.load(stream, Loader=yaml.Loader)
        except yaml.YAMLError as exc:
            logging.error(exc)

def col_header_val(df,table_config):
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

In [1]:
%%writefile store.yaml
file_type: csv
dataset_name: file
file_name: Rate
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - Store
      - Date
      - Weekly_Sales
      - Holiday_Flag
      - Temperature
      - Fuel_Price
      - CPI
      - Unemployment
     

Writing store.yaml


In [7]:
from dask import dataframe as dd
df_sample = dd.read_csv('C:/Users/User/Desktop/data ingestion/Walmart_Store_sales.csv',delimiter=',')
df_sample.head()

Unnamed: 0,Store,Date,Weekly_Sales,Holiday_Flag,Temperature,Fuel_Price,CPI,Unemployment
0,1,05-02-2010,1643690.9,0,42.31,2.572,211.096358,8.106
1,1,12-02-2010,1641957.44,1,38.51,2.548,211.24217,8.106
2,1,19-02-2010,1611968.17,0,39.93,2.514,211.289143,8.106
3,1,26-02-2010,1409727.59,0,46.63,2.561,211.319643,8.106
4,1,05-03-2010,1554806.68,0,46.5,2.625,211.350143,8.106


In [11]:
import datetime
import csv
import gzip

from dask import dataframe as dd
df = dd.read_csv('C:/Users/User/Desktop/data ingestion/Walmart_Store_sales.csv',delimiter=',')

# Write csv in gz format in pipe separated text file (|)
df.to_csv('Walmart_Store_sales.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          line_terminator='\n')

['C:/Users/User/Desktop/data ingestion/Walmart_Store_sales.csv.gz\\0.part']

In [12]:
#number of files in gz format folder
import os
entries = os.listdir('C:/Users/User/Desktop/data ingestion/Walmart_Store_sales.csv.gz/')
for entry in entries:
    print(entry)

0.part


In [13]:
os.path.getsize('C:/Users/User/Desktop/data ingestion/Walmart_Store_sales.csv.gz')

0