Create Utility File <br>
Create Config File <br>
Data Ingestion Pipeline steps

## Task:

1) Take any csv/text file of 2+ GB of your choice. --- (You can do this assignment on Google colab)

2) Read the file (present the approach used to read the file)

3) Try different methods of file reading, e.g. Dask, Modin, Ray, pandas, and present your findings in terms of computational efficiency

4) Perform basic validation on data columns, e.g. remove special characters and white spaces from the column names

5) As you already know the schema, create a YAML file and write the column names in it. Also define the separator of the read and write files in the YAML file.

6) Validate number of columns and column name of ingested file with YAML.

7) **Write the file in pipe separated text file (|) in gz format.**

8) **Create a summary of the file:**

**Total number of rows,**

**total number of columns**

**file size**

**1) Take any csv/text file of 2+ GB of your choice.** <br>
I took the OpenAddresses dataset from Kaggle (https://www.kaggle.com/openaddresses/openaddresses-europe) and used the open addresses for France. <br>
This file has 11 columns and a total size of 2 GB 202 bytes.

**2) Read the file (present the approach used to read the file)** <br>
To read the file, let's first create a `read_config_file` function that loads the YAML configuration.

In [1]:
%%writefile testutility.py
# Importing necessary libraries
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)

Overwriting testutility.py


In [3]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

**3) Try different methods of file reading, e.g. Dask, Modin, Ray, pandas, and present your findings in terms of computational efficiency**

* Besides reading through the config file, I am going to try plain pandas. If it crashes, I am going to try Dask, as Dask is better suited to larger datasets. <br>
* To measure computational efficiency, I am going to use **computation time** as the metric. <br>
Let's see which approach takes longer.
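
The comparison described above can be sketched with a small timing helper. This is a minimal sketch, not the notebook's own code: `time_reader` and `read_with_dask` are hypothetical names, the path in the usage comments is a placeholder, and the Dask variant assumes the `dask` package is installed. Because Dask reads lazily, the sketch calls `.compute()` so that it times a full load, which makes it comparable to `pd.read_csv`.

```python
import time


def time_reader(label, reader, path, **kwargs):
    """Call reader(path, **kwargs) once and return (result, elapsed_seconds)."""
    start = time.perf_counter()  # monotonic clock, well suited to measuring durations
    result = reader(path, **kwargs)
    elapsed = time.perf_counter() - start
    print(f"{label}: {elapsed:.2f} s")
    return result, elapsed


def read_with_dask(path, **kwargs):
    """Read a CSV with Dask and materialize it as a pandas DataFrame."""
    import dask.dataframe as dd  # imported lazily so time_reader works without Dask

    # dd.read_csv only builds a task graph; compute() performs the actual read
    return dd.read_csv(path, **kwargs).compute()


# Example usage (placeholder path; uncomment to run against a real file):
# import pandas as pd
# df_pandas, t_pandas = time_reader("pandas", pd.read_csv, "france.csv")
# df_dask, t_dask = time_reader("dask", read_with_dask, "france.csv")
```

A single run per reader is only a rough indication; repeating each measurement a few times would give a more reliable comparison.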

In [6]:
# Normal reading process of the file
import time
import pandas as pd
t_start = time.time()
test_data = pd.read_csv("C:\\Users\\talfi\\python\\dataglacier\\w6\\open-adresses-euro\\france.csv")
t_end = time.time()
print('pd.read_csv(): {} s'.format(t_end - t_start))

pd.read_csv(): 55.03337860107422 s


In [9]:
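# read the file using config file
t_start = time.time()
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
#print("",source_file)
t_middle = time.time()
# pass the delimiter by keyword; newer pandas versions accept it only as a keyword argument
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])
t_end = time.time()
# t_middle - t_start covers only the config lookup; the read itself takes t_end - t_middle
print('config file: {} s'.format(t_middle - t_start))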

config file: 0.0 s


**4) Perform basic validation on data columns, e.g. remove special characters and white spaces from the column names**

In [12]:
import logging
import re

def replacer(string, char):
    '''Collapse runs of two or more `char` into a single `char`.'''
    pattern = re.escape(char) + '{2,}'
    string = re.sub(pattern, char, string)
    return string

def col_header_val(df, table_config):
    '''
    Standardize the column names (lowercase, no special characters or
    whitespace) and validate them against the columns listed in the config.
    Returns 1 if validation passes, 0 otherwise.
    '''
    df.columns = df.columns.str.lower()
    # raw string avoids an invalid-escape warning for \w
    df.columns = df.columns.str.replace(r'[^\w]', '_', regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x, '_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(), table_config['columns']))
    expected_col.sort()
    # sort the file's columns as well so the comparison ignores column order
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col) == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file", mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded", missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0
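
To make the effect of the cleaning steps concrete, here is a minimal stand-alone sketch that applies the same four transformations (lowercase, replace non-word characters, strip underscores, collapse repeated underscores) to plain strings. `clean_column_name` is a hypothetical helper, and the sample headers are made up for illustration rather than taken from the dataset.

```python
import re


def clean_column_name(name):
    """Lowercase a header, replace non-word characters, and tidy underscores."""
    name = name.lower()
    name = re.sub(r"[^\w]", "_", name)  # special characters and spaces -> "_"
    name = name.strip("_")              # drop leading/trailing underscores
    name = re.sub(r"_{2,}", "_", name)  # collapse runs of underscores
    return name


# Made-up headers, not taken from the dataset
raw_headers = [" Street  Name!", "POST-CODE", "LON"]
cleaned = [clean_column_name(h) for h in raw_headers]
print(cleaned)  # ['street_name', 'post_code', 'lon']
```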

In [14]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['LON', 'LAT', 'NUMBER', 'STREET', 'UNIT', 'CITY', 'DISTRICT', 'REGION',
       'POSTCODE', 'ID', 'HASH'],
      dtype='object')
columns of YAML are: ['LON', 'LAT', 'NUMBER', 'STREET', 'UNIT', 'CITY', 'DISTRICT', 'REGION', 'POSTCODE', 'ID', 'HASH']


**5) As you already know the schema, create a YAML file and write the column names in it. Also define the separator of the read and write files in the YAML file.**

In [63]:
%%writefile file.yaml
file_type: csv
dataset_name: testfile
file_name: france
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - LON
    - LAT
    - NUMBER
    - STREET
    - UNIT
    - CITY
    - DISTRICT
    - REGION
    - POSTCODE
    - ID
    - HASH

Writing file.yaml


**6) Validate number of columns and column name of ingested file with YAML**

In [4]:
config_data['inbound_delimiter']

','

In [5]:
#inspecting data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': 'france',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['LON',
  'LAT',
  'NUMBER',
  'STREET',
  'UNIT',
  'CITY',
  'DISTRICT',
  'REGION',
  'POSTCODE',
  'ID',
  'HASH']}
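
With the configuration loaded, the check for this step compares the column names found in the file against `config_data['columns']`. A minimal sketch of that comparison on plain lists follows; `compare_columns` is a hypothetical helper, and treating the comparison as case-insensitive and order-insensitive is an assumption of this sketch.

```python
def compare_columns(actual, expected):
    """Return (ok, unexpected, missing) for two lists of column names."""
    actual_set = {c.lower() for c in actual}
    expected_set = {c.lower() for c in expected}
    unexpected = sorted(actual_set - expected_set)  # in the file, not in the YAML
    missing = sorted(expected_set - actual_set)     # in the YAML, not in the file
    ok = len(actual) == len(expected) and not unexpected and not missing
    return ok, unexpected, missing


# Column names as listed in file.yaml
yaml_columns = ["LON", "LAT", "NUMBER", "STREET", "UNIT", "CITY",
                "DISTRICT", "REGION", "POSTCODE", "ID", "HASH"]

print(compare_columns(yaml_columns, yaml_columns))       # (True, [], [])
print(compare_columns(yaml_columns[:-1], yaml_columns))  # (False, [], ['hash'])
```

In the notebook itself, the equivalent call would be `col_header_val(df, config_data)`, which returns 1 when validation passes and 0 otherwise.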

**7) Write the file in pipe separated text file (|) in gz format.**

In [19]:
%%writefile file.gz
file_type: csv
dataset_name: testfile
file_name: france
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - LON
    - LAT
    - NUMBER
    - STREET
    - UNIT
    - CITY
    - DISTRICT
    - REGION
    - POSTCODE
    - ID
    - HASH

Overwriting file.gz
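
Note that `%%writefile` stores the cell text as-is, so the cell above saves the YAML content under the name `file.gz` rather than compressing the ingested data. A minimal sketch of writing a DataFrame as a pipe-separated, gzip-compressed text file with `DataFrame.to_csv` is shown here. The small `df_demo` frame, its values, and the output path are made up for illustration; in the notebook the frame would be `df` and the separator would come from `config_data['outbound_delimiter']`.

```python
import os
import tempfile

import pandas as pd

# Tiny stand-in for the ingested DataFrame (values are made up)
df_demo = pd.DataFrame(
    {"lon": [2.35, 4.83], "lat": [48.85, 45.76], "city": ["Paris", "Lyon"]}
)

# Hypothetical output location inside a temporary directory
out_path = os.path.join(tempfile.mkdtemp(), "france.txt.gz")

# Write pipe-separated text, gzip-compressed, without the index column
df_demo.to_csv(out_path, sep="|", index=False, compression="gzip")

# Read the file back to confirm the delimiter and compression round-trip
df_back = pd.read_csv(out_path, sep="|", compression="gzip")
print(df_back.shape)  # (2, 3)
```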


**8) Create a summary of the file:**

**Total number of rows,**

**total number of columns**

**file size** 

In [20]:
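def sum_file(df):
    shape = df.shape  # (number of rows, number of columns)
    size = df.size    # number of cells (rows x columns), not the file size in bytes
    return shape, size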

In [21]:
sum_file(test_data)

((27381995, 11), 301201945)

In [22]:
sum_file(df)

((27381995, 11), 301201945)
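
The second value returned above is `df.size`, which in pandas is the number of cells (27381995 × 11 = 301201945), not the size of the file on disk. A minimal sketch of a summary that also reports the on-disk size is given here, using only the standard library so the file is streamed rather than loaded into memory. `summarize_delimited_file` is a hypothetical helper, and it assumes a UTF-8 file whose first line is the header.

```python
import csv
import gzip
import os


def summarize_delimited_file(path, sep="|"):
    """Return row count, column count and on-disk size for a delimited text file."""
    opener = gzip.open if path.endswith(".gz") else open
    with opener(path, "rt", newline="", encoding="utf-8") as handle:
        reader = csv.reader(handle, delimiter=sep)
        header = next(reader)            # first line holds the column names
        n_rows = sum(1 for _ in reader)  # stream the remaining rows one by one
    return {
        "rows": n_rows,
        "columns": len(header),
        "size_bytes": os.path.getsize(path),  # compressed size for .gz files
    }


# Example usage (placeholder path; uncomment to run against a real file):
# print(summarize_delimited_file("france.txt.gz", sep="|"))
```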