In [33]:
import os
import time

In [34]:
#Size of the file
os.path.getsize('/Users/yangzongkun/Desktop/DailyDelhiClimateTrain.csv')

78199

In [35]:
#Read data with Dask
from dask import dataframe as dd
start = time.time()
dask_df = dd.read_csv('/Users/yangzongkun/Desktop/DailyDelhiClimateTrain.csv')
end = time.time()
print("Time to read CSV using Dask: ",(end-start),"sec")

Time to read CSV using Dask:  0.010307788848876953 sec


In [36]:
#Normal reading process of the file
import pandas as pd
start = time.time()
df_sample = pd.read_csv("/Users/yangzongkun/Desktop/DailyDelhiClimateTrain.csv",delimiter=',')
end = time.time()
print("Read csv with panda: ",(end-start),"sec")
#df_sample.head(10)

Read csv with panda:  0.0027692317962646484 sec


In [37]:
#Modin and Ray reading process
import os

os.environ["MODIN_ENGINE"] = "ray"  # Modin will use Ray

import modin.pandas as pd
ray.shutdown()
ray.init()
start = time.time()
df = pd.read_csv('/Users/yangzongkun/Desktop/DailyDelhiClimateTrain.csv')
end = time.time()
print("Read csv with modin and ray: ",(end-start),"sec")

2022-09-12 20:20:58,181	INFO worker.py:1518 -- Started a local Ray instance.


Read csv with modin and ray:  0.6052107810974121 sec


I will use dask for the final file since it's speed is much much faster than pandas and Modin and Ray reading process

In [38]:
from dask import dataframe as dd
df = dd.read_csv('/Users/yangzongkun/Desktop/DailyDelhiClimateTrain.csv')

In [39]:
len(df.index)

1462

In [40]:
df.columns

Index(['date', 'meantemp', 'humidity', 'wind_speed', 'meanpressure'], dtype='object')

data Validation

In [41]:
#YAML
%%writefile file.yaml
file_type: csv
dataset_name: testfile
file_name: test
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - date
    - meantemp
    - humidity
    - wind_speed
    - meanpressure

Overwriting file.yaml


In [42]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py


In [43]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [44]:
config_data['inbound_delimiter']

','

In [57]:
#inspecting data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': 'test',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['date', 'meantemp', 'humidity', 'wind_speed', 'meanpressure']}

In [47]:
# Normal reading process of the file
import dask.dataframe as dd
df_sample = dd.read_csv("/Users/yangzongkun/Desktop/DailyDelhiClimateTrain.csv",delimiter=',')
df_sample.head()

Unnamed: 0,date,meantemp,humidity,wind_speed,meanpressure
0,2013-01-01,10.0,84.5,0.0,1015.666667
1,2013-01-02,7.4,92.0,2.98,1017.8
2,2013-01-03,7.166667,87.0,4.633333,1018.666667
3,2013-01-04,8.666667,71.333333,1.233333,1017.166667
4,2013-01-05,6.0,86.833333,3.7,1016.5


In [59]:
# read the file using config file
file_type = config_data['file_type']
source_file = ('/Users/yangzongkun/Desktop/DailyDelhiClimateTrain.csv')
#print("",source_file)
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
df.head()

Unnamed: 0,date,meantemp,humidity,wind_speed,meanpressure
0,2013-01-01,10.0,84.5,0.0,1015.666667
1,2013-01-02,7.4,92.0,2.98,1017.8
2,2013-01-03,7.166667,87.0,4.633333,1018.666667
3,2013-01-04,8.666667,71.333333,1.233333,1017.166667
4,2013-01-05,6.0,86.833333,3.7,1016.5


In [60]:
#validate the header of the file
util.col_header_val(df,config_data)

column name and column length validation passed


1

In [61]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['date', 'meantemp', 'humidity', 'wind_speed', 'meanpressure'], dtype='object')
columns of YAML are: ['date', 'meantemp', 'humidity', 'wind_speed', 'meanpressure']


In [62]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
else:
    print("col validation passed")

column name and column length validation passed
col validation passed


In [64]:
# Write our csv file in gz format (pipe separated "|")
import csv
import datetime
import gzip

from dask import dataframe as dd
df = dd.read_csv('/Users/yangzongkun/Desktop/DailyDelhiClimateTrain.csv',delimiter=',')

df.to_csv('/Users/yangzongkun/Desktop/DailyDelhiClimateTrain.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          line_terminator='\n')

['/Users/yangzongkun/Desktop/DailyDelhiClimateTrain.csv.gz/0.part']

In [65]:
#number of files in gz format folder
import os
entries = os.listdir('/Users/yangzongkun/Desktop/DailyDelhiClimateTrain.csv.gz/')
for entry in entries:
    print(entry)

0.part


In [66]:
#size of the gz format folder
os.path.getsize('/Users/yangzongkun/Desktop/DailyDelhiClimateTrain.csv.gz')

96