# Data Ingestion Code

In [1]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    '''
    Load and parse a YAML configuration file.

    The file is parsed with ``yaml.safe_load``. If the content is not valid
    YAML, the parser error is reported through ``logging.error`` and ``None``
    is returned instead of raising. Errors from opening the file (e.g. a
    missing path) are not caught and propagate to the caller.
    '''
    with open(filepath, 'r') as config_stream:
        try:
            parsed_config = yaml.safe_load(config_stream)
        except yaml.YAMLError as parse_error:
            logging.error(parse_error)
            parsed_config = None
    return parsed_config


def replacer(string, char):
    '''
    Collapse consecutive repeats of ``char`` in ``string`` into a single ``char``.

    ``char`` is matched literally: it is regex-escaped, so metacharacters such
    as ``.`` or ``*`` work. A multi-character ``char`` is treated as one token.
    An empty ``char`` leaves ``string`` unchanged.

    Example: replacer('a__b___c', '_') -> 'a_b_c'
    '''
    if not char:
        return string
    # (?:...) groups the escaped token so {2,} repeats the whole token,
    # not only its last character.
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement inserts `char` verbatim; a plain string would have
    # its backslashes interpreted as escapes by re.sub.
    return re.sub(pattern, lambda _match: char, string)

def col_header_val(df,table_config):
    '''
    Normalize the DataFrame's column names and validate them against the config.

    Normalization is applied to ``df.columns`` in place, so the caller's frame
    keeps the cleaned names. Each name is lower-cased, every non-word character
    is replaced with ``_``, leading/trailing ``_`` are stripped, and runs of
    ``_`` are collapsed to one.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose header is checked. Its ``columns`` attribute is overwritten.
    table_config : dict
        Parsed config; ``table_config['columns']`` lists the expected names
        (compared lower-cased, order ignored).

    Returns
    -------
    int
        1 if the normalized names and the expected names are identical
        (same names, same count, order ignored); otherwise 0, after printing
        which columns differ.
    '''
    # astype(str) keeps the .str accessor working when headers are not strings
    # (e.g. integer labels from header=None).
    normalized = (
        df.columns.astype(str)
        .str.lower()
        .str.replace(r'[^\w]', '_', regex=True)
    )
    normalized = [replacer(name.strip('_'), '_') for name in normalized]
    # Intentional side effect: later steps use the cleaned names on the caller's frame.
    df.columns = normalized

    expected_col = sorted(name.lower() for name in table_config['columns'])
    # Compare sorted name lists rather than reindexing df: reindexing copies the
    # entire frame and raises when normalization produced duplicate labels.
    actual_col = sorted(normalized)

    if actual_col == expected_col:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    mismatched_columns_file = list(set(actual_col).difference(expected_col))
    print("Following File columns are not in the YAML file",mismatched_columns_file)
    missing_YAML_file = list(set(expected_col).difference(actual_col))
    print("Following YAML columns are not in the file uploaded",missing_YAML_file)
    # Duplicates explain a failure where both set differences above are empty.
    duplicated = sorted({name for name in actual_col if actual_col.count(name) > 1})
    if duplicated:
        print("Following file columns are duplicated after normalization", duplicated)
    logging.info(f'df columns: {actual_col}')
    logging.info(f'expected columns: {expected_col}')
    return 0

Overwriting testutility.py


## Create the YAML config file

In [2]:
%%writefile file.yaml
# Ingestion config, loaded with read_config_file() from testutility.py.
# The notebook builds the source path as ./<file_name>.<file_type>.
file_type: csv
# NOTE(review): dataset_name and table_name are not read by the cells shown here;
# confirm whether a downstream step uses them.
dataset_name: testfile
file_name: complaints
table_name: edsurv
inbound_delimiter: ","
# Separator used when writing the gzip export.
outbound_delimiter: "|"
# NOTE(review): skip_leading_rows is not passed to any reader in the cells shown here.
skip_leading_rows: 1
# Expected header names after col_header_val normalization
# (lower-case, non-word characters -> '_'); order is not significant.
columns: 
    - date_received
    - product
    - sub_product
    - issue
    - sub_issue
    - consumer_complaint_narrative
    - company_public_response
    - company
    - state
    - zip_code
    - tags
    - consumer_consent_provided
    - submitted_via
    - date_sent_to_company
    - company_response_to_consumer
    - timely_response
    - consumer_disputed
    - complaint_id

Overwriting file.yaml


In [3]:
# Read config file
import importlib

import testutility as util

# `%%writefile` regenerates testutility.py, but a repeated `import` returns the
# cached module; reload so edits to the utility cell take effect without a restart.
util = importlib.reload(util)

# Read config file
config_data = util.read_config_file("file.yaml")
# read_config_file logs and returns None on a YAML parse error; fail here
# instead of with a confusing TypeError in a later cell.
assert config_data is not None, "file.yaml could not be parsed; see the logged YAML error"


In [4]:
# Sanity check that the config loaded: show the delimiter expected in the inbound file
config_data['inbound_delimiter']

','

## Pandas

In [5]:
import pandas as pd

In [6]:
%%time
# read the file using config file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
#print("",source_file)
df = pd.read_csv(source_file)




CPU times: total: 24.4 s
Wall time: 25.2 s


In [8]:
# Preview the first rows with the raw CSV headers (before col_header_val normalizes them)
df.head()

Unnamed: 0,Date received,Product,Sub-product,Issue,Sub-issue,Consumer complaint narrative,Company public response,Company,State,ZIP code,Tags,Consumer consent provided?,Submitted via,Date sent to company,Company response to consumer,Timely response?,Consumer disputed?,Complaint ID
0,2022-11-19,"Credit reporting, credit repair services, or o...",Credit reporting,Improper use of your report,Reporting company used your report improperly,,,"EQUIFAX, INC.",TX,78541.0,Servicemember,,Web,2022-11-19,In progress,Yes,,6222374
1,2022-10-13,"Credit reporting, credit repair services, or o...",Credit reporting,Improper use of your report,Reporting company used your report improperly,In accordance with the fair credit reporting a...,Company has responded to the consumer and the ...,"TRANSUNION INTERMEDIATE HOLDINGS, INC.",GA,30043.0,,Consent provided,Web,2022-10-13,Closed with explanation,Yes,,6079679
2,2022-10-13,"Credit reporting, credit repair services, or o...",Credit reporting,Incorrect information on your report,Information belongs to someone else,This is not a duplicate nor is this complaint ...,Company has responded to the consumer and the ...,"TRANSUNION INTERMEDIATE HOLDINGS, INC.",CA,90660.0,,Consent provided,Web,2022-10-13,Closed with non-monetary relief,Yes,,6076990
3,2022-10-12,"Credit reporting, credit repair services, or o...",Credit reporting,Incorrect information on your report,Personal information incorrect,,Company has responded to the consumer and the ...,"TRANSUNION INTERMEDIATE HOLDINGS, INC.",PA,19148.0,,Consent not provided,Web,2022-10-12,Closed with non-monetary relief,Yes,,6080312
4,2022-11-19,"Credit reporting, credit repair services, or o...",Credit reporting,Incorrect information on your report,Information belongs to someone else,,,"TRANSUNION INTERMEDIATE HOLDINGS, INC.",OH,44130.0,,,Web,2022-11-19,In progress,Yes,,6222634


## Dask

In [9]:
import dask.dataframe as dd

In [10]:
%%time

# read the file using config file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
#print("",source_file)
ddf = dd.read_csv(source_file, blocksize=None, low_memory=False)


CPU times: total: 15.6 ms
Wall time: 24.6 ms


## Modin

In [11]:
import modin.pandas as mpd

In [12]:
%%time

# read the file using config file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
#print("",source_file)
mdf = mpd.read_csv(source_file)


    from distributed import Client

    client = Client()



CPU times: total: 11.8 s
Wall time: 1min 26s


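Among the three libraries, **Dask** reported the shortest wall time (24.6 ms, vs. 25.2 s for pandas and 1 min 26 s for Modin). However, `dd.read_csv` is lazy: it only builds a task graph and samples the file to infer dtypes, so that timing does not include parsing the ~2 GB file. A fair comparison must force a full read (e.g. `len(ddf)` or `ddf.compute()`).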

## Validate headers and export to gzip

In [8]:
#validate the header of the file
# Validate the header of the file. col_header_val also normalizes df.columns in
# place, which the gzip export below relies on. Stop on failure so an
# unvalidated file is never written out.
header_ok = util.col_header_val(df, config_data)
assert header_ok == 1, "column validation failed; see the mismatch report above"
header_ok

column name and column length validation passed


1

In [10]:
# make gz file for the dataframe
# Write the validated frame as a gzip-compressed file, separated by the config's outbound delimiter
outbound_sep = config_data['outbound_delimiter']
df.to_csv('data.csv.gz', sep=outbound_sep, compression='gzip', index=False)

## Summary of files

In [9]:
# Compare the DataFrame headers (normalized in place by col_header_val) with the YAML column list
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['date_received', 'product', 'sub_product', 'issue', 'sub_issue',
       'consumer_complaint_narrative', 'company_public_response', 'company',
       'state', 'zip_code', 'tags', 'consumer_consent_provided',
       'submitted_via', 'date_sent_to_company', 'company_response_to_consumer',
       'timely_response', 'consumer_disputed', 'complaint_id'],
      dtype='object')
columns of YAML are: ['date_received', 'product', 'sub_product', 'issue', 'sub_issue', 'consumer_complaint_narrative', 'company_public_response', 'company', 'state', 'zip_code', 'tags', 'consumer_consent_provided', 'submitted_via', 'date_sent_to_company', 'company_response_to_consumer', 'timely_response', 'consumer_disputed', 'complaint_id']


In [19]:
import os

# On-disk sizes of the source CSV and of the gzip output written above.
# The source path is rebuilt from the config rather than hardcoded.
source_path = f"./{config_data['file_name']}.{config_data['file_type']}"
csv_file_size = os.path.getsize(source_path)
gz_file_size = os.path.getsize('data.csv.gz')

# print the summary
# os.path.getsize reports bytes on disk, not in-memory size, so label it accordingly.
print(f"Source file size: {csv_file_size} bytes")
print(f"File size after zipping: {gz_file_size} bytes")
print(f"Compression ratio: {csv_file_size / gz_file_size:.2f}x")
print(f"Total number of rows: {len(df)}")
print(f"Total number of columns: {len(df.columns)}")


Dataset memory size: 2072427322 bytes
File size after zipping: 508824210
Total number of rows: 3093136
Total number of columns: 18
