# 1. File ingestion

## 1.1 Pandas

In [42]:
import pandas as pd
import time

start=time.time()
df=pd.read_csv('demo1.csv')
end=time.time()

print('total_time:%f'%(end-start))

total_time:40.616086


### Bracket removal fuction of column names

In [43]:
def remove_bracket(x):
    start=0
    end=0
    for i in range(len(x)):
        if x[i]=='(':
            start=i
        elif x[i]==')':
            end=i
            
    if start!=end:
        y=x.replace(x[start:end+1],'')
        return y
    
    else:
        return x

In [44]:
df.columns

Index(['Unnamed: 0', 'ym(Year + month)', 'exp_imp(export: 1, import: 2)',
       'hs9(HS code)', 'Customs', 'Country', 'Q1(quantity)', 'Q2(quantity)',
       'Value(in thousands of yen)'],
      dtype='object')

In [45]:
%%time
df=df.drop(columns=['Unnamed: 0'])
df.rename(columns=lambda x : remove_bracket(x),inplace=True)
df.columns

CPU times: total: 1.41 s
Wall time: 1.41 s


Index(['ym', 'exp_imp', 'hs9', 'Customs', 'Country', 'Q1', 'Q2', 'Value'], dtype='object')

## 1.2 Dask

In [62]:
import dask.dataframe as dd
import time

start=time.time()
ddf=dd.read_csv('demo1.csv')
end=time.time()

print('total_time:%f'%(end-start))

total_time:0.010990


In [63]:
ddf.columns

Index(['Unnamed: 0', 'ym(Year + month)', 'exp_imp(export: 1, import: 2)',
       'hs9(HS code)', 'Customs', 'Country', 'Q1(quantity)', 'Q2(quantity)',
       'Value(in thousands of yen)'],
      dtype='object')

In [64]:
%%time
ddf=ddf.drop(columns=['Unnamed: 0'])
ddf=ddf.rename(columns=lambda x : remove_bracket(x))
ddf.columns

CPU times: total: 0 ns
Wall time: 8.99 ms


Index(['ym', 'exp_imp', 'hs9', 'Customs', 'Country', 'Q1', 'Q2', 'Value'], dtype='object')

## 1.3 Modin

In [11]:
import modin.pandas as mpd
import time

start=time.time()
mdf=mpd.read_csv('demo1.csv')
end=time.time()

print('total_time:%f'%(end-start))

total_time:24.728534


In [67]:
%%time
mdf=mdf.drop(columns=['Unnamed: 0'])
mdf=mdf.rename(columns=lambda x : remove_bracket(x))
mdf.columns

CPU times: total: 15.6 ms
Wall time: 13 ms


Index(['ym', 'exp_imp', 'hs9', 'Customs', 'Country', 'Q1', 'Q2', 'Value'], dtype='object')

## 1.4 Ray

In [13]:
import ray
import time

start=time.time()
rdf=ray.data.read_csv('demo1.csv')
end=time.time()

print('total_time:%f'%(end-start))



total_time:36.774528


In [73]:
rdf

Dataset(num_blocks=1, num_rows=50047277, schema={: int64, ym(Year + month): int64, exp_imp(export: 1, import: 2): int64, hs9(HS code): int64, Customs: int64, Country: int64, Q1(quantity): int64, Q2(quantity): int64, Value(in thousands of yen): int64})

# 2. Schema validation

## 2.1 Yaml

In [75]:
%%writefile file.yaml
file_type: csv
file_name: demo1
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - ym(Year + month)
    - Country
    - City

Writing file.yaml


## 2.2 Utility

In [89]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py


In [85]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [79]:
config_data

{'file_type': 'csv',
 'file_name': 'demo1',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['ym(Year + month)', 'Country', 'City']}

## 2.3 Column validation

In [80]:
# read the file using config file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
#print("",source_file)
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
df.head()



Unnamed: 0.1,Unnamed: 0,ym(Year + month),"exp_imp(export: 1, import: 2)",hs9(HS code),Customs,Country,Q1(quantity),Q2(quantity),Value(in thousands of yen)
0,0,198801,1,103,100,120991000,0,1590,4154
1,1,198801,1,103,100,210390900,0,4500,2565
2,2,198801,1,103,100,220890200,0,3000,757
3,3,198801,1,103,100,240220000,0,26000,40668
4,4,198801,1,103,100,250410000,0,5,8070


In [86]:
util.col_header_val(df,config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['customs', 'q1_quantity', 'ym_year_month', 'hs9_hs_code', 'q2_quantity', 'value_in_thousands_of_yen', 'exp_imp_export_1_import_2', 'unnamed_0']
Following YAML columns are not in the file uploaded ['city', 'ym(year + month)']


0

## 2.4 Info summary

In [111]:
%%writefile info.py
import pandas as pd
import io

def read_info(df):
    print("The number of rows is: %d"%df.shape[0])
    print("The number of columns is: %d"%df.shape[1])
    buf = io.StringIO()
    df.info(buf=buf)
    info = buf.getvalue()
    info = buf.getvalue().split('\n')[-2]
    print(info)

Writing info.py


In [112]:
import info

info.read_info(df)

The number of rows is: 50047277
The number of columns is: 9
memory usage: 3.4 GB


## 2.5 csv to txt in gz

In [114]:
df.to_csv('demo1.txt.gz', sep='|', compression='gzip')