Large CSV ingestion: YAML-driven validation, pipe-delimited gzip export, and read-time comparison

The input file 'customers-11500000.csv' was generated with the code from https://github.com/datablist/sample-csv-files.git

In [None]:
#writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    '''
    Open the YAML file at `filepath` and return its parsed contents.

    The file is parsed with yaml.safe_load. If parsing raises a
    yaml.YAMLError, the error is logged and None is returned.
    Errors from opening the file are not caught here.
    '''
    with open(filepath, 'r') as config_stream:
        try:
            parsed_config = yaml.safe_load(config_stream)
        except yaml.YAMLError as parse_error:
            logging.error(parse_error)
            parsed_config = None
    return parsed_config


def replacer(string, char):
    '''
    Collapse every run of two or more consecutive `char` in `string`
    into a single `char` and return the result.

    `char` is matched literally, so regex metacharacters such as '.',
    '|' or '+' are safe, and a multi-character `char` is treated as one
    repeating unit. An empty `char` returns `string` unchanged.
    '''
    if not char:
        # Nothing to collapse; an empty pattern cannot be repeated.
        return string
    # Escape the text and group it so the {2,} quantifier covers all of it.
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement stops backslashes in `char` from being
    # interpreted as escape sequences by re.sub.
    return re.sub(pattern, lambda _match: char, string)


def col_header_val(df, table_config):
    '''
    Standardize the column names of `df` in place and validate them
    against the column list in `table_config`.

    Each column name is lower-cased, every non-word character is replaced
    by '_', leading/trailing underscores are stripped and repeated
    underscores are collapsed to one. The cleaned names are assigned back
    to `df.columns`, so the caller's DataFrame is modified.

    The cleaned names are then compared, ignoring order, with the
    lower-cased entries of table_config['columns'].

    Returns 1 when both name lists match exactly, otherwise 0. On a
    mismatch the differences are printed and both lists are logged.
    '''
    # Vectorised clean-up of the header; the assignment mutates the caller's frame.
    df.columns = (
        df.columns.str.lower()
        .str.replace(r'[^\w]', '_', regex=True)
        .str.strip('_')
        .str.replace(r'_{2,}', '_', regex=True)
    )
    expected_col = sorted(name.lower() for name in table_config['columns'])
    # Sort the names only: reindexing the frame would copy all of its data
    # and raise when two columns normalize to the same name.
    actual_col = sorted(df.columns)
    if actual_col == expected_col:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    # Sorted so the report is the same on every run.
    mismatched_columns_file = sorted(set(actual_col).difference(expected_col))
    print("Following File columns are not in the YAML file",mismatched_columns_file)
    missing_YAML_file = sorted(set(expected_col).difference(actual_col))
    print("Following YAML columns are not in the file uploaded",missing_YAML_file)
    logging.info(f'df columns: {actual_col}')
    logging.info(f'expected columns: {expected_col}')
    return 0

Writing YAML File

In [None]:
# writefile file.yaml
# Settings for the customers CSV; the notebook loads this file with read_config_file.
# file_name and file_type are joined into the source path "./<file_name>.<file_type>".
file_type: csv
dataset_name: customers-11500000 
file_name: customers-11500000 
table_name: edsurv
# Delimiter handed to pd.read_csv when the source file is loaded.
inbound_delimiter: ","
# NOTE(review): presumably the delimiter intended for the converted output file - confirm.
outbound_delimiter: "|"
skip_leading_rows: 1
# Expected header names; col_header_val lower-cases these before comparing
# them with the cleaned column names of the file.
columns: 
    - Index
    - Customer_Id
    - First_Name
    - Last_Name
    - Company
    - City
    - Country
    - Phone_1
    - Phone_2
    - Email
    - Subscription_Date
    - Website 

In [None]:
# Import the helper module and parse the YAML settings file.
import datetime
import time

import testutility as util

config_data = util.read_config_file('file.yaml')

In [None]:
# Display the value returned by read_config_file to confirm the config loaded.
config_data

In [None]:
#normal processing of the file
import pandas as pd

# Start the clock after the import so that only the read itself is timed.
start=datetime.datetime.now()
df_sample=pd.read_csv("customers-11500000.csv",delimiter=',')
df_sample.head()

end=datetime.datetime.now()
elapsed=end-start
print('run time: ',elapsed)

pandas read_csv run time: 0:00:59.715164

In [None]:
# read the file using config file
start=datetime.datetime.now()

# Build the source path from the config: "./<file_name>.<file_type>".
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
# Pass the delimiter by keyword: pandas 2.0+ accepts only the path
# positionally, so a positional delimiter raises a TypeError there.
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])
df.head()

end=datetime.datetime.now()
elapsed=end-start
print('run time: ',elapsed)

pandas read_csv with YAML config run time: 0:00:48.311845

In [None]:
# Clean the file's column names and check them against the YAML column list.
util.col_header_val(df, config_data)

column name and column length validation passed

In [None]:
# Show the file's column names next to the column list from the config.
print("columns of files are:", df.columns)
print("columns of YAML are:", config_data['columns'])

columns of files are: Index(['index', 'customer_id', 'first_name', 'last_name', 'company', 'city',
       'country', 'phone_1', 'phone_2', 'email', 'subscription_date',
       'website'],
      dtype='object')
columns of YAML are: ['Index', 'Customer_Id', 'First_Name', 'Last_Name', 'Company', 'City', 'Country', 'Phone_1', 'Phone_2', 'Email', 'Subscription_Date', 'Website']

In [None]:
#converting the file to the outbound (pipe) delimiter
start=datetime.datetime.now()
import csv
# newline='' on both files leaves line-ending handling to the csv module,
# so newlines embedded in quoted fields are read and written correctly.
with open('customers-11500000.csv',newline='') as fin:
    with open('OutputFile.txt','w',newline='') as fout:
        # Delimiters come from the YAML config instead of being hard-coded.
        reader=csv.DictReader(fin, delimiter=config_data['inbound_delimiter'])
        writer=csv.DictWriter(fout,reader.fieldnames,delimiter=config_data['outbound_delimiter'])
        writer.writeheader()
        writer.writerows(reader)
end=datetime.datetime.now()
elapsed=end-start
print('run time: ',elapsed)

Run time for converting to '|' delimiter: 0:02:12.546689

In [None]:
#convert output.txt to the gz format type
import gzip
import shutil
start=datetime.datetime.now()
with open('OutputFile.txt','rb') as orig_file:
    with gzip.open('OutputFile.txt.gz','wb') as zipped_file:
        # Stream the file in fixed-size chunks instead of one write per
        # line; the compressed content is the same.
        shutil.copyfileobj(orig_file, zipped_file)
end=datetime.datetime.now()
elapsed=end-start
print('run time: ',elapsed)

Run time to convert the output to gz format: 0:02:02.217650

Comparing read times with other methods as a measure of computational efficiency

In [None]:
#Ray:
import ray

# Start the clock after the import so module load time is not counted
# as part of the read.
start=datetime.datetime.now()
ds=ray.data.read_csv('customers-11500000.csv')
# NOTE(review): confirm that Dataset.head() exists in the installed Ray
# version; take()/show() are the preview calls I know of.
ds.head()

end=datetime.datetime.now()
elapsed=end-start
print('run time: ',elapsed)

2023-02-09 17:14:50,949 INFO worker.py:1538 -- Started a local Ray instance.
2023-02-09 17:14:52,064 WARNING read_api.py:315 -- ⚠️  The blocks of this dataset are estimated to be 3.0x larger than the target block size of 512 MiB. This may lead to out-of-memory errors during processing. Consider reducing the size of input files or using `.repartition(n)` to increase the number of dataset blocks.
run time:  0:07:33.805030


NOTE: The same Ray read on a file of the same size but with more columns and fewer rows ran much faster (run time: 0:00:33.144028).
OTHER NOTE: Using `.repartition(n)`, as suggested by the warning above, did not help.

In [None]:
#Dask:
import dask
import dask.dataframe as dd
start=datetime.datetime.now()
# dd.read_csv builds a lazy dataframe; head() then reads only from the
# first partition, so this does not time a full read of the file.
# NOTE: this rebinds `df`, replacing the pandas DataFrame loaded earlier.
df=dd.read_csv('customers-11500000.csv')
df.head()
end=datetime.datetime.now()
elapsed=end-start
# Print only after elapsed is recomputed; printing first reported the
# value left over from the previously run cell.
print('run time: ',elapsed)

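Dask run time: 0:00:05.342901 (note: `dd.read_csv` is lazy and `head()` reads only the first partition, so this figure is not directly comparable to the full-file reads above)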

Summary:


In [None]:
# Report the dimensions of the frame loaded by the plain pandas read,
# together with the recorded file size.
num_rows, num_cols = df_sample.shape
file_size = '2.02 GB'
print('Number of rows: ',num_rows)
print('Number of columns: ',num_cols)
print('File size: ',file_size)

Number of rows:  11500000
Number of columns:  12
File size:  2.02 GB