In [None]:
#Creating a Dummy set of 3 columns and 2 gb size
import csv
import random
import os

# Define the path and filename for the output CSV file
filename = "dummy_dataset.csv"
file_size = 2 * 1024 * 1024 * 1024 # 2GB in bytes
row_size = 20 # estimated size of each row in bytes

# Define the headers for the CSV file
headers = ["age", "salary", "distance"]

# Define the range of values for each column
age_range = (18, 70)
salary_range = (20000, 100000)
distance_range = (0.1, 100)

# Calculate the approximate number of rows needed to reach the target file size
num_rows = file_size // row_size

# Generate the random data and write it to the CSV file
with open(filename, mode="w", newline="") as csv_file:
    writer = csv.writer(csv_file, delimiter=",")
    
    # Write the headers to the first row of the CSV file
    writer.writerow(headers)
    
    # Generate random data and write to the CSV file
    for i in range(num_rows):
        age = random.randint(age_range[0], age_range[1])
        salary = random.randint(salary_range[0], salary_range[1])
        distance = round(random.uniform(distance_range[0], distance_range[1]), 2)
        
        row = [age, salary, distance]
        writer.writerow(row)
        
        # Check the size of the file after writing each row
        if os.path.getsize(filename) > file_size:
            break


In [3]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py


In [4]:
%%writefile file.yaml
file_type: csv
dataset_name: testfile
file_name: dummy_dataset
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - age
    - salary
    - distance

Overwriting file.yaml


In [5]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [6]:
config_data['inbound_delimiter']

','

In [7]:
#inspecting data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': 'dummy_dataset',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['age', 'salary', 'distance']}

In [13]:
# import pandas as pd
import dask.dataframe as dd
import modin.pandas as mo

# Reading the file using Pandas
%timeit pd.read_csv('dummy_dataset.csv')

# Reading the file using Dask
%timeit dd.read_csv('dummy_dataset.csv')

# Reading the file using Modin
%timeit mo.read_csv('dummy_dataset.csv')


21.8 s ± 1.86 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
6.65 ms ± 593 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
1min 29s ± 12.6 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:
# For pandas, it took 22.2 seconds per loop on average to read the dataset.
# For Dask, it took 6.06 milliseconds per loop on average to read the dataset.
# For Modin, it took 1 minute 28 seconds per loop on average to read the dataset.

In [14]:
#Summary of dataset
import os
import pandas as pd

filename = 'dummy_dataset.csv'

# Get file size in bytes
file_size = os.path.getsize(filename)

# Read file into a pandas dataframe
df = pd.read_csv(filename)

# Get number of rows and columns
num_rows = len(df)
num_cols = len(df.columns)

# Print summary
print("File summary:")
print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_cols}")
print(f"File size: {file_size} bytes")


File summary:
Number of rows: 107374182
Number of columns: 3
File size: 1696618253 bytes


In [8]:
# Normal reading process of the file
import pandas as pd
df_sample = pd.read_csv("dummy_dataset.csv",delimiter=',')
df_sample.head()

Unnamed: 0,age,salary,distance
0,39,52975,72.04
1,20,97345,7.02
2,34,44695,40.5
3,58,71890,0.62
4,64,35986,56.88


In [9]:
# read the file using config file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
#print("",source_file)
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
df.head()



Unnamed: 0,age,salary,distance
0,39,52975,72.04
1,20,97345,7.02
2,34,44695,40.5
3,58,71890,0.62
4,64,35986,56.88


In [10]:
#validate the header of the file
util.col_header_val(df,config_data)

column name and column length validation passed


1

In [11]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['age', 'salary', 'distance'], dtype='object')
columns of YAML are: ['age', 'salary', 'distance']


In [12]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipleine

column name and column length validation passed
col validation passed


In [None]:
df = pd.read_csv("dummy_dataset.csv")

# Write the DataFrame to a pipe-separated text file in gz format
df.to_csv("dataset.psv.gz", sep="|", compression="gzip", index=False)