In [2]:
import pandas as pd
import dask.dataframe as dd
import modin.pandas as md
import time
import ray

## Trying out different types of read method:  

#### Pandas

In [22]:
start_timep = time.time()
%time df_panda =pd.read_csv("Documents/dataset/custom_1988_2020.csv", delimiter =',')
end_timep = time.time()
print("Time required for pandas to read : ", (end_timep - start_timep), "seconds")

Wall time: 2min 26s
Time required for pandas to read :  146.33109760284424 seconds


#### Dask

In [21]:
start_timed = time.time()
%time df_dask =dd.read_csv("Documents/dataset/custom_1988_2020.csv", delimiter =',')
end_timed = time.time()
print("Time required for dask to read : ", (end_timed - start_timed), "seconds")

Wall time: 33.3 ms
Time required for dask to read :  0.035315513610839844 seconds


#### Modin and Ray

In [5]:
ray.shutdown()
ray.init()
start_timem = time.time()
%time df_modin =md.read_csv("Documents/dataset/custom_1988_2020.csv", delimiter =',')
end_timem = time.time()
print("Time required for modin to read : ", (end_timem - start_timem), "seconds")

Wall time: 1min 26s
Time required for modin to read :  86.07971572875977 seconds


### Dask took the least amount of time to read the file with 33.3 ms, Pandas took the most time at 2 min 26 s follwed by Modin which took 1 min  26s

In [27]:
df_dask.tail()

Unnamed: 0,198801,1,103,100,000000190,0,35843,34353
1601230,202012,2,627,200,843149020,0,228,602
1601231,202012,2,628,100,852580000,1,20,260
1601232,202012,2,628,104,847180000,0,9,1775
1601233,202012,2,702,104,30119000,179,16,255
1601234,202012,2,702,404,271019143,10,8114,1315


In [28]:
# assigning header names,  as dataset doesnt have column names
df_dask.columns

Index(['198801', '1', '103', '100', '000000190', '0', '35843', '34353'], dtype='object')

In [31]:
df_dask.columns =['Year_Month','Export_Import','HS_Code', 'Customs','Country','Quantity_1','Quantity_2','Value']

In [None]:
df_modin =df_modin.set_axis(['Year_Month','Export_Import','HS_Code', 'Customs','Country','Quantity_1','Quantity_2','Value'], axis =1)

In [37]:
df_modin.head()

Unnamed: 0,Year_Month,Export_Import,HS_Code,Customs,Country,Quantity_1,Quantity_2,Value
0,198801,1,103,100,120991000,0,1590,4154
1,198801,1,103,100,210390900,0,4500,2565
2,198801,1,103,100,220890200,0,3000,757
3,198801,1,103,100,240220000,0,26000,40668
4,198801,1,103,100,250410000,0,5,8070


In [39]:
df_modin.to_csv("test_data.csv",index = False)

## YAML file creation

In [16]:
%%writefile file.yaml
file_type: csv
dataset_name: testfile
file_name: test_data
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - Year_Month
    - Export_Import
    - HS_Code
    - Customs
    - Country
    - Quantity_1
    - Quantity_2
    - Value

Writing file.yaml


In [17]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Writing testutility.py


In [18]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [20]:
#inspecting data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': 'test_data',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['Year_Month',
  'Export_Import',
  'HS_Code',
  'Customs',
  'Country',
  'Quantity_1',
  'Quantity_2',
  'Value']}

In [40]:
#Reading the file using config file
file_type = config_data['file_type']
source_file = "C:/Users/Kumar/" + config_data['file_name'] + f'.{file_type}'

In [42]:
df = pd.read_csv(source_file)
df.head()

Unnamed: 0,Year_Month,Export_Import,HS_Code,Customs,Country,Quantity_1,Quantity_2,Value
0,198801,1,103,100,120991000,0,1590,4154
1,198801,1,103,100,210390900,0,4500,2565
2,198801,1,103,100,220890200,0,3000,757
3,198801,1,103,100,240220000,0,26000,40668
4,198801,1,103,100,250410000,0,5,8070


### Validation of number of columns and column name of ingested file with YAML.

In [43]:
#validate the header of the file
util.col_header_val(df,config_data)

column name and column length validation passed


1

In [44]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['year_month', 'export_import', 'hs_code', 'customs', 'country',
       'quantity_1', 'quantity_2', 'value'],
      dtype='object')
columns of YAML are: ['Year_Month', 'Export_Import', 'HS_Code', 'Customs', 'Country', 'Quantity_1', 'Quantity_2', 'Value']


In [46]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
else:
    print("col validation passed")

column name and column length validation passed
col validation passed


### Saving as pipe seperated text file in gz format

In [59]:
df.to_csv("japan_customs_trade_stats.txt.gz", sep ='|', compression = 'gzip', header =True)

### Summary

In [61]:
# tried with smaller size as saving to txt.gz with pipe delimiter was taking time
print("Total rows :", (len(df)))
print("Total columns: ", (len(df.columns)))

Total rows : 113607321
Total columns:  8


In [63]:
# tried with smaller size as saving to txt.gz with pipe delimiter was taking time
import os
print("File size: ", os.path.getsize('C:/Users/Kumar/japan_customs_trade_stats.txt.gz')/(1000000000), "gb")

File size:  1.380260028 gb
