## write utility file

In [1]:
%%writefile utility.py
import os
import yaml
import pandas as pd
import logging
import datetime 
import gc
import re

# file read
def read_config(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)
            

def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string


# validate the column header
def column_header_val(df, config):
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(), config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    if len(df.columns) >= len(expected_col) and all(element in df.columns for element in expected_col):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0


Overwriting utility.py


## write YAML file

In [2]:
%%writefile config.yaml
file_type : csv
dataset_name : test
file_name : test_set
table_name : test
inbound_delimiter : ","
outbound_delimiter : "|"
skip_leading_rows : 1
gz_file : 'data.csv.gz'
gz_delimiter : '|'
columns : 
    - passband
    - flux
    - flux_err

Overwriting config.yaml


In [3]:
import utility as util
config = util.read_config('config.yaml')
config

{'file_type': 'csv',
 'dataset_name': 'test',
 'file_name': 'test_set',
 'table_name': 'test',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'gz_file': 'data.csv.gz',
 'gz_delimiter': '|',
 'columns': ['passband', 'flux', 'flux_err']}

In [4]:
ft = config['file_type']
source_file = "./" + config['file_name'] + f'.{ft}'

## read file

### pandas

In [7]:
# read file using pandas
import pandas as pd
import time
start = time.time()
df = pd.read_csv(source_file,config['inbound_delimiter'])
print("Time to read with modin: {} seconds".format(round(time.time() - start, 3)))
df.head()

  df = pd.read_csv(source_file,config['inbound_delimiter'])


Time to read with modin: 19.563 seconds


Unnamed: 0,object_id,mjd,passband,flux,flux_err,detected
0,104853940,59788.4007,3,-11.29186,5.06226,0
1,104853940,59788.4358,4,8.834054,9.886057,0
2,104853940,59789.4023,2,-4.987594,2.069673,0
3,104853940,59790.363,1,-2.552541,1.805415,0
4,104853940,59790.4299,4,-17.382816,10.41285,0


### dask

In [8]:
# read file using dask
import dask.dataframe as dd
import time
start = time.time()
df = dd.read_csv(source_file)
print("Time to read with dask: {} seconds".format(round(time.time() - start, 3)))
df.head()

Time to read with dask: 0.022 seconds


Unnamed: 0,object_id,mjd,passband,flux,flux_err,detected
0,104853940,59788.4007,3,-11.29186,5.06226,0
1,104853940,59788.4358,4,8.834054,9.886057,0
2,104853940,59789.4023,2,-4.987594,2.069673,0
3,104853940,59790.363,1,-2.552541,1.805415,0
4,104853940,59790.4299,4,-17.382816,10.41285,0


### modin ray

In [9]:
# # read file using modin ray
import modin.pandas as pd
import time
import ray
start = time.time()
ray.shutdown()
ray.init()
df = pd.read_csv(source_file)
print("Time to read with modin: {} seconds".format(round(time.time() - start, 3)))
df.head()

2023-06-12 16:26:21,187	INFO worker.py:1509 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m


Time to read with modin: 22.675 seconds


Unnamed: 0,object_id,mjd,passband,flux,flux_err,detected
0,104853940,59788.4007,3,-11.29186,5.06226,0
1,104853940,59788.4358,4,8.834054,9.886057,0
2,104853940,59789.4023,2,-4.987594,2.069673,0
3,104853940,59790.363,1,-2.552541,1.805415,0
4,104853940,59790.4299,4,-17.382816,10.41285,0


#### Dask appears to have the fastest file reading performance, taking only 0.022 seconds, whereas Pandas takes 19.5 seconds and Modin takes 22.67 seconds.

In [10]:
util.column_header_val(df, config)

column name and column length validation passed


1

In [11]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config['columns'])

columns of files are: Index(['object_id', 'mjd', 'passband', 'flux', 'flux_err', 'detected'], dtype='object')
columns of YAML are: ['passband', 'flux', 'flux_err']


In [12]:
# validate the columns
if util.column_header_val(df,config)==0:
    print("validation failed")
else:
    print("col validation passed")

column name and column length validation passed
col validation passed


In [16]:
import gzip
import csv
# save the file in gz format pipe seperated
with gzip.open(config['gz_file'], 'wt', encoding='utf-8') as file:
        df.to_csv(file, sep=config['gz_delimiter'], index=False, quoting=csv.QUOTE_NONE)



In [19]:
# open the file in gz format pipe seperated

with gzip.open(config['gz_file'], 'rt', encoding='utf-8') as file:
    df = pd.read_csv(file.name, sep=config['gz_delimiter'])

In [20]:
df.head()

Unnamed: 0,object_id,mjd,passband,flux,flux_err,detected
0,104853940,59788.4007,3,-11.29186,5.06226,0
1,104853940,59788.4358,4,8.834054,9.886057,0
2,104853940,59789.4023,2,-4.987594,2.069673,0
3,104853940,59790.363,1,-2.552541,1.805415,0
4,104853940,59790.4299,4,-17.382816,10.41285,0


In [21]:
import os

#size of the gz format folder
os.path.getsize(config['gz_file'])

703499799