In [2]:
%%writefile utilities.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting utilities.py


### Write YAML file

In [16]:
%%writefile file.yaml
file_type: csv
dataset_name: creditcard_data
file_name: creditcard_data
table_name: creditcard_data
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - LIMIT_BAL
    - SEX
    - EDUCATION
    - MARRIAGE
    - AGE
    - PAY_0
    - PAY_2
    - PAY_3
    - PAY_4
    - PAY_5
    - PAY_6
    - BILL_AMT1
    - BILL_AMT2
    - BILL_AMT3
    - BILL_AMT4
    - BILL_AMT5
    - BILL_AMT6
    - PAY_AMT1
    - PAY_AMT2
    - PAY_AMT3
    - PAY_AMT4
    - PAY_AMT5
    - PAY_AMT6
    - default_payment_next_month

Overwriting file.yaml


In [17]:
# Read config file
import utilities as util
config_data = util.read_config_file("file.yaml")

In [18]:
config_data

{'file_type': 'csv',
 'dataset_name': 'creditcard_data',
 'file_name': 'creditcard_data',
 'table_name': 'creditcard_data',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['LIMIT_BAL',
  'SEX',
  'EDUCATION',
  'MARRIAGE',
  'AGE',
  'PAY_0',
  'PAY_2',
  'PAY_3',
  'PAY_4',
  'PAY_5',
  'PAY_6',
  'BILL_AMT1',
  'BILL_AMT2',
  'BILL_AMT3',
  'BILL_AMT4',
  'BILL_AMT5',
  'BILL_AMT6',
  'PAY_AMT1',
  'PAY_AMT2',
  'PAY_AMT3',
  'PAY_AMT4',
  'PAY_AMT5',
  'PAY_AMT6',
  'default_payment_next_month']}

In [19]:
import pandas as pd

file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
#print("",source_file)
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
df.head()

  df = pd.read_csv(source_file,config_data['inbound_delimiter'])


Unnamed: 0,LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,PAY_0,PAY_2,PAY_3,PAY_4,PAY_5,...,BILL_AMT4,BILL_AMT5,BILL_AMT6,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6,default payment next month
0,170000,2,2,1,52,0,0,0,0,0,...,163652,131396,130402,7620,6200,10000,5100,5500,5000,1
1,150000,2,2,1,39,1,-2,-2,-2,-2,...,0,0,0,0,0,0,0,0,0,1
2,120000,1,2,1,40,0,0,0,0,0,...,39510,40873,42082,2000,2000,2000,2000,2000,3000,0
3,270000,2,1,2,28,-1,-1,-1,-1,0,...,7931,3412,1035,845,9896,7977,17,1040,1848,0
4,50000,2,2,1,31,0,0,0,0,0,...,30929,30078,29170,1974,4406,1061,1073,1046,1047,1


In [20]:
#validate the header of the file
util.col_header_val(df,config_data)

column name and column length validation passed


1

In [21]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['limit_bal', 'sex', 'education', 'marriage', 'age', 'pay_0', 'pay_2',
       'pay_3', 'pay_4', 'pay_5', 'pay_6', 'bill_amt1', 'bill_amt2',
       'bill_amt3', 'bill_amt4', 'bill_amt5', 'bill_amt6', 'pay_amt1',
       'pay_amt2', 'pay_amt3', 'pay_amt4', 'pay_amt5', 'pay_amt6',
       'default_payment_next_month'],
      dtype='object')
columns of YAML are: ['LIMIT_BAL', 'SEX', 'EDUCATION', 'MARRIAGE', 'AGE', 'PAY_0', 'PAY_2', 'PAY_3', 'PAY_4', 'PAY_5', 'PAY_6', 'BILL_AMT1', 'BILL_AMT2', 'BILL_AMT3', 'BILL_AMT4', 'BILL_AMT5', 'BILL_AMT6', 'PAY_AMT1', 'PAY_AMT2', 'PAY_AMT3', 'PAY_AMT4', 'PAY_AMT5', 'PAY_AMT6', 'default_payment_next_month']


In [24]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipleine

column name and column length validation passed
col validation passed


conver to gz format

In [28]:
import gzip

output_file = 'creditcard_data.txt.gz'
with gzip.open(output_file, 'wt') as f:
    for record in df:
        line = config_data['outbound_delimiter'].join(record) + '\n'
        f.write(line)


Summary

In [40]:
import os
num_rows = df.shape[0]
num_columns = df.shape[1]
file_size = os.path.getsize('creditcard_data.csv')
file_size = file_size / (1024 * 1024)
print(f' Number of rows: {num_rows}')
print(f' Number of columns: {num_columns}')
print(f' File size: {file_size} MB')

 Number of rows: 27000
 Number of columns: 24
 File size: 2.3424062728881836 MB
