In [1]:
%%writefile instruct.yaml 

file_type: csv
dataset_name: custom_1988_2020
file_name: custom_1988_2020
table_name: 103
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns:
    - '198801'
    - '1'
    - '103'
    - '100'
    - 000000190
    - '0'
    - '35843'
    - '34353'

Overwriting instruct.yaml


In [2]:
%%writefile testutility.py 

import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: str(x).lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: str(x).lower(), list(df.columns)))
    df = df.reset_index()
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py


In [4]:
# Read config file
import testutility as util
config_data = util.read_config_file("instruct.yaml")

In [5]:
# dictionary with which you can subset whichever key you want
config_data['inbound_delimiter']

','

In [6]:
#inspecting data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'custom_1988_2020',
 'file_name': 'custom_1988_2020',
 'table_name': 103,
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['198801', '1', '103', '100', '000000190', '0', '35843', '34353']}

In [7]:
import dask.dataframe as dd

In [8]:
# read the file using config file
file_type = config_data['file_type']
source_file = config_data['file_name'] + f'.{file_type}'
df = dd.read_csv(source_file)
df.head()

Unnamed: 0,198801,1,103,100,000000190,0,35843,34353
0,198801,1,103,100,120991000,0,1590,4154
1,198801,1,103,100,210390900,0,4500,2565
2,198801,1,103,100,220890200,0,3000,757
3,198801,1,103,100,240220000,0,26000,40668
4,198801,1,103,100,250410000,0,5,8070


In [14]:
# number of pandas dataframes in a single dask dataframe
df.npartitions

71

In [10]:
#validate the header of the file
util.col_header_val(df,config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['index']
Following YAML columns are not in the file uploaded []


0

In [11]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['198801', '1', '103', '100', '000000190', '0', '35843', '34353'], dtype='object')
columns of YAML are: ['198801', '1', '103', '100', '000000190', '0', '35843', '34353']


In [12]:
config_data['columns']

['198801', '1', '103', '100', '000000190', '0', '35843', '34353']

In [13]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipleine

column name and column length validation failed
Following File columns are not in the YAML file ['index']
Following YAML columns are not in the file uploaded []
validation failed


In [16]:
df.describe().compute()  

Unnamed: 0,198801,1,103,100,000000190,0,35843,34353
count,113607300.0,113607300.0,113607300.0,113607300.0,113607300.0,113607300.0,113607300.0,113607300.0
mean,200512.6,1.391867,193.2035,313.3187,654576300.0,47076.76,127167.0,32408.3
std,928.6563,0.4881672,121.0249,179.774,252176300.0,29145920.0,4571867.0,376908.2
min,198801.0,1.0,103.0,100.0,11.0,0.0,0.0,30.0
25%,199710.0,1.0,111.0,104.0,482010900.0,0.0,116.0,764.0
50%,200510.0,1.0,205.0,305.0,830629000.0,0.0,900.0,2782.0
75%,201308.0,2.0,302.0,500.0,852692000.0,26.0,8975.0,11865.0
max,202012.0,2.0,703.0,908.0,970600000.0,125500000000.0,1885790000.0,183278400.0


Dask dataframe object has no attribute 'reindex'.<br>
Pandas framewwork could not load dataframe as size was too large and could not allocate memory space.<br>
Thus, index was reset and columns do not match even though the files being compared are the same.

File was downloaded from Kaggle and its size is 4.23 GB.