In [1]:
import csv
import gc
import gzip
import importlib
import logging
import os
import re
import subprocess
from datetime import datetime

import dask.dataframe as dd
import modin.pandas as mp
import pandas as pd
import ray
import yaml

import testutility as util

  _numeric_index_types = (pd.Int64Index, pd.Float64Index, pd.UInt64Index)
  _numeric_index_types = (pd.Int64Index, pd.Float64Index, pd.UInt64Index)
  _numeric_index_types = (pd.Int64Index, pd.Float64Index, pd.UInt64Index)


***Different approaches to reading the file***

In [2]:
# Time a load of the full CSV with pandas.
# The resulting frame `df_pd` is reused by later cells (column listing and
# the row/column summary), so this cell must run before them.
start_time = datetime.now()
df_pd=pd.read_csv('credit_card_transactions-ibm_v2.csv')
end_time = datetime.now()
# `result` is a timedelta; printing it shows H:MM:SS.ffffff wall-clock time.
result = end_time - start_time
print(result)

0:00:33.137170


In [3]:
# Time the same read through dask.dataframe (imported as `dd`).
# NOTE(review): the elapsed time printed for this cell is orders of magnitude
# smaller than the pandas cell above. Presumably dd.read_csv only sets up a
# lazy dataframe and defers the actual parsing -- confirm (e.g. by timing a
# `.compute()` or `len(df_dd)`) before comparing this number with the others.
start_time = datetime.now()
df_dd=dd.read_csv('credit_card_transactions-ibm_v2.csv')
end_time = datetime.now()
# `result` is a timedelta; printing it shows H:MM:SS.ffffff wall-clock time.
result = end_time - start_time
print(result)

0:00:00.030113


In [4]:
# Time the same read through modin's pandas-compatible API (imported as `mp`).
# The measured interval covers everything that happens inside mp.read_csv,
# including whatever engine start-up modin performs on first use.
start_time = datetime.now()
df_md=mp.read_csv('credit_card_transactions-ibm_v2.csv')
end_time = datetime.now()
# `result` is a timedelta; printing it shows H:MM:SS.ffffff wall-clock time.
result = end_time - start_time
print(result)


    import ray
    ray.init()



0:00:37.121915


In [5]:
# Time the same read through Ray's dataset reader (ray.data.read_csv).
# NOTE(review): the result is a Ray dataset object, not a pandas DataFrame;
# it is not used by any later cell in this notebook.
start_time = datetime.now()
df_ray=ray.data.read_csv('credit_card_transactions-ibm_v2.csv')
end_time = datetime.now()
# `result` is a timedelta; printing it shows H:MM:SS.ffffff wall-clock time.
result = end_time - start_time
print(result)

[2m[36m(raylet)[0m Spilled 2429 MiB, 8 objects, write throughput 219 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.


0:00:42.642238


***Basic validation and writing the YAML config file***

In [6]:
# Inspect the raw column headers of the pandas frame; the expected names
# written to file.yaml below are the normalised form of these headers.
df_pd.columns

Index(['User', 'Card', 'Year', 'Month', 'Day', 'Time', 'Amount', 'Use Chip',
       'Merchant Name', 'Merchant City', 'Merchant State', 'Zip', 'MCC',
       'Errors?', 'Is Fraud?'],
      dtype='object')

In [7]:
%%writefile file.yaml
# Ingestion config loaded by util.read_config_file.
# The notebook builds the source path as "./<file_name>.<file_type>".
file_type: csv
dataset_name: testfile
file_name: credit_card_transactions-ibm_v2
# Field separator handed to pd.read_csv when the source file is loaded.
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
# Expected header names after normalisation (lower-cased, non-word characters
# replaced by "_", repeated/edge underscores removed). col_header_val compares
# them against the file's headers irrespective of order.
columns: 
    - user
    - card
    - year
    - month
    - day
    - time
    - amount
    - use_chip
    - merchant_name
    - merchant_city
    - merchant_state
    - zip
    - mcc
    - errors
    - is_fraud

Overwriting file.yaml


In [8]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    """Load a YAML configuration file and return its parsed content.

    Parameters
    ----------
    filepath : str
        Path of the YAML file to read.

    Returns
    -------
    object or None
        Whatever ``yaml.safe_load`` produces for the document, or ``None``
        when the document is not valid YAML (the parser error is logged via
        ``logging.error`` instead of being raised).

    Errors raised while opening the file (e.g. a missing path) are not caught
    and propagate to the caller.
    """
    with open(filepath, 'r') as stream:
        try:
            parsed = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            # Best-effort: report the parse problem and fall through with None.
            logging.error(exc)
            parsed = None
    return parsed


def replacer(string, char):
    """Collapse every run of two or more consecutive ``char`` into one ``char``.

    Parameters
    ----------
    string : str
        Text to clean up.
    char : str
        The character (or substring) whose consecutive repetitions should be
        collapsed. It is always treated literally, so regex metacharacters
        such as ``.``, ``|`` or ``\\`` are safe to pass.

    Returns
    -------
    str
        ``string`` with each repeated run of ``char`` replaced by a single
        occurrence. An empty ``char`` leaves ``string`` unchanged.
    """
    if not char:
        # Nothing to collapse; avoids building a pattern around an empty token.
        return string
    # Escape `char` so it is matched literally, and group it so the {2,}
    # quantifier applies to the whole token rather than to its last character.
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement keeps `char` literal; a plain replacement string
    # would have backslashes interpreted as escape sequences.
    return re.sub(pattern, lambda _match: char, string)

def col_header_val(df,table_config):
    '''
    Normalise the column headers of ``df`` and validate them against the
    expected column list of the YAML config.

    Normalisation (applied IN PLACE to ``df.columns``, so the caller's frame
    keeps the cleaned names afterwards):
      1. lower-case every header,
      2. replace every non-word character with ``_``,
      3. strip leading/trailing ``_``,
      4. collapse runs of ``_`` into a single ``_``.

    The comparison ignores column order: both name lists are sorted before
    being compared. Only the names are sorted; the data itself is never
    reordered or copied.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose headers are normalised and checked.
    table_config : dict
        Parsed config; ``table_config['columns']`` is the list of expected
        column names (compared case-insensitively).

    Returns
    -------
    int
        1 when the normalised headers match the expected names exactly
        (same names, same count), otherwise 0. On failure the differences
        are printed and both name lists are logged at INFO level.
    '''
    # Raw strings keep `\w` a regex class instead of an invalid str escape.
    df.columns = (
        df.columns.str.lower()
        .str.replace(r'[^\w]', '_', regex=True)
        .str.strip('_')
        .str.replace(r'_{2,}', '_', regex=True)
    )

    # Compare sorted name lists rather than reindexing the frame: reindexing
    # copies all the data and raises when normalisation yields duplicate names.
    actual_col = sorted(df.columns)
    expected_col = sorted(x.lower() for x in table_config['columns'])

    if actual_col == expected_col:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    mismatched_columns_file = list(set(actual_col).difference(expected_col))
    print("Following File columns are not in the YAML file",mismatched_columns_file)
    missing_YAML_file = list(set(expected_col).difference(actual_col))
    print("Following YAML columns are not in the file uploaded",missing_YAML_file)
    logging.info(f'df columns: {actual_col}')
    logging.info(f'expected columns: {expected_col}')
    return 0

Overwriting testutility.py


In [9]:
# testutility.py is (re)written by a %%writefile cell after the module was
# first imported at the top of the notebook, so reload it here to make sure the
# helpers used from this point on are the ones currently on disk.
importlib.reload(util)
# Parse the YAML config; the bare name below displays the resulting dict.
config_data = util.read_config_file("file.yaml")
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': 'credit_card_transactions-ibm_v2',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['user',
  'card',
  'year',
  'month',
  'day',
  'time',
  'amount',
  'use_chip',
  'merchant_name',
  'merchant_city',
  'merchant_state',
  'zip',
  'mcc',
  'errors',
  'is_fraud']}

In [10]:
# Build the source path from the YAML config ("./<file_name>.<file_type>")
# and load it with the configured inbound delimiter.
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
# Pass the delimiter by keyword: positional use of read_csv arguments other
# than the path is deprecated and rejected by newer pandas releases.
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])
df.head()



Unnamed: 0,User,Card,Year,Month,Day,Time,Amount,Use Chip,Merchant Name,Merchant City,Merchant State,Zip,MCC,Errors?,Is Fraud?
0,0,0,2002,9,1,06:21,$134.09,Swipe Transaction,3527213246127876953,La Verne,CA,91750.0,5300,,No
1,0,0,2002,9,1,06:42,$38.48,Swipe Transaction,-727612092139916043,Monterey Park,CA,91754.0,5411,,No
2,0,0,2002,9,2,06:22,$120.34,Swipe Transaction,-727612092139916043,Monterey Park,CA,91754.0,5411,,No
3,0,0,2002,9,2,17:45,$128.95,Swipe Transaction,3414527459579106770,Monterey Park,CA,91754.0,5651,,No
4,0,0,2002,9,3,06:23,$104.71,Swipe Transaction,5817218446178736267,La Verne,CA,91750.0,5912,,No


In [11]:
# Validate the file's headers against the YAML `columns` list (returns 1 on
# success, 0 on failure). Side effect: col_header_val assigns normalised names
# to df.columns, so `df` carries the cleaned headers after this cell runs.
util.col_header_val(df,config_data)

column name and column length validation passed


1

In [12]:
# Show the (already normalised) file headers next to the expected YAML names.
print("columns of files are:", df.columns)
print("columns of YAML are:", config_data['columns'])

columns of files are: Index(['user', 'card', 'year', 'month', 'day', 'time', 'amount', 'use_chip',
       'merchant_name', 'merchant_city', 'merchant_state', 'zip', 'mcc',
       'errors', 'is_fraud'],
      dtype='object')
columns of YAML are: ['user', 'card', 'year', 'month', 'day', 'time', 'amount', 'use_chip', 'merchant_name', 'merchant_city', 'merchant_state', 'zip', 'mcc', 'errors', 'is_fraud']


***Write the file in gz format***

In [13]:
# Re-export the source file with the outbound delimiter declared in file.yaml.
# Path and delimiters come from the config cells above instead of being
# repeated as literals, so changing the YAML is enough to change the export.
# Both files are opened with newline='' as the csv module requires, so that
# quoted fields containing line breaks are read and written intact.
with open(source_file, newline='') as fin, \
        open('OutputFile.txt', 'w', newline='') as fout:
    reader = csv.DictReader(fin, delimiter=config_data['inbound_delimiter'])
    writer = csv.DictWriter(fout, reader.fieldnames,
                            delimiter=config_data['outbound_delimiter'])
    writer.writeheader()
    writer.writerows(reader)

In [14]:
# write the file in gz format
f_in = open('OutputFile.txt','rb')
f_out = gzip.open('OutputFile.txt.gz', 'wb')
f_out.writelines(f_in)
f_out.close()
f_in.close()

***Summary of the file***

In [15]:
# Re-run the header validation and print the overall verdict
# (col_header_val returns 0 on failure and 1 on success).
if util.col_header_val(df, config_data) != 0:
    print("col validation passed")
else:
    print("validation failed")

column name and column length validation passed
col validation passed


In [16]:
# Summary figures for the raw pandas frame and the source file on disk.
length_of_col = len(df_pd.columns)
# len() counts rows directly. The previous count()[0] returned the number of
# non-null values in the first column (which undercounts if that column has
# missing data) and relied on positional fallback indexing of a label-indexed
# Series.
length_of_row = len(df_pd)
# Size on disk converted from bytes to GiB.
file_size = os.path.getsize('credit_card_transactions-ibm_v2.csv') / 1024**3

In [17]:
# summarize
print("Total number of rows :", length_of_row)
print("Total number of columns :", length_of_col)
print("File Size :", round(file_size,2), "GB")

Total number of rows : 24386900
Total number of columns : 15
File Size : 2.19 GB
