# Data Glacier Internship Week 6: Data Ingestion

### Writing the Utility File

In [2]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

def read_config_file(filepath):
    '''
    Load a YAML configuration file and return its parsed contents.

    Args:
        filepath: path of the YAML file to read.

    Returns:
        The object produced by yaml.safe_load (a dict for a mapping
        document), or None when the file is not valid YAML. Callers should
        check for None before indexing into the result.

    Raises:
        OSError: if the file cannot be opened (not caught here).
    '''
    # Decode explicitly as UTF-8: without an encoding, open() falls back to
    # the platform locale, which can mis-decode non-ASCII YAML content.
    with open(filepath, 'r', encoding='utf-8') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            # Parse errors are logged rather than raised; the caller gets None.
            logging.error(exc)
            return None


def replacer(string, char):
    '''
    Collapse every run of two or more consecutive `char` into a single `char`.

    `char` is matched as literal text, so regex metacharacters such as
    '.', '|' or a backslash are safe to pass.

    Args:
        string: text to clean.
        char: the character (or literal substring) whose repeats are collapsed.

    Returns:
        The cleaned string; `string` unchanged when `char` is empty.
    '''
    if not char:
        # Nothing to collapse; an empty pattern would not be a valid repeat.
        return string
    # Escape so `char` is never interpreted as regex syntax, and group it so
    # the {2,} quantifier applies to the whole of a multi-character `char`.
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement inserts `char` verbatim (no template escapes).
    return re.sub(pattern, lambda _match: char, string)

def col_header_val(df,table_config):
    '''
    Normalize the DataFrame's column names and validate them against the
    column list of the table configuration.

    Normalization is written back to `df.columns`, so the caller's DataFrame
    keeps the cleaned names: lowercase, every non-word character replaced by
    '_', leading/trailing '_' stripped, and runs of '_' collapsed to one.

    The comparison ignores column order. Expected names taken from
    `table_config['columns']` are lowercased but not otherwise normalized.

    Args:
        df: pandas DataFrame whose header is validated.
        table_config: mapping with a 'columns' list of expected column names.

    Returns:
        1 when the normalized header matches the expected columns, else 0
        (the differences are printed and both lists are logged at INFO level).
    '''
    df.columns = (
        df.columns.str.lower()
        .str.replace(r'[^\w]', '_', regex=True)
        .str.strip('_')
        .str.replace(r'_{2,}', '_', regex=True)
    )
    expected_col = sorted(name.lower() for name in table_config['columns'])
    # Compare sorted name lists only; reordering the frame itself would copy
    # all of its data and would raise on duplicate column names.
    actual_col = sorted(df.columns)
    if actual_col == expected_col:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    # Sorted so the reported differences are printed in a stable order.
    mismatched_columns_file = sorted(set(actual_col).difference(expected_col))
    print("Following File columns are not in the YAML file",mismatched_columns_file)
    missing_YAML_file = sorted(set(expected_col).difference(actual_col))
    print("Following YAML columns are not in the file uploaded",missing_YAML_file)
    logging.info(f'df columns: {actual_col}')
    logging.info(f'expected columns: {expected_col}')
    return 0

Overwriting testutility.py


### Writing the YAML File

In [3]:
%%writefile file.yaml
# Ingestion settings loaded by the notebook through read_config_file().
# file_name and file_type are combined into the source path ("./train.csv").
file_type: csv
# Left empty, so this key loads as None.
dataset_name: 
file_name: train
table_name: train
# Delimiter handed to pd.read_csv when the source file is loaded.
inbound_delimiter: ","
# Delimiter for the exported file (the notebook writes a pipe-separated file).
outbound_delimiter: "|"
# NOTE(review): not read by any code visible in this notebook - confirm whether it is still needed.
skip_leading_rows: 1
# Expected column names; col_header_val() lowercases them and compares them,
# ignoring order, with the normalized header of the ingested file.
columns: 
    - row_id
    - timestamp
    - user_id
    - content_id
    - content_type_id
    - task_container_id
    - user_answer
    - answered_correctly
    - prior_question_elapsed_time
    - prior_question_had_explanation

Writing file.yaml


### Reading and Inspecting Config File

In [4]:
import testutility as util
# Parse file.yaml into a Python object (a dict for this file); the helper
# returns None instead of raising when the YAML is malformed.
config_data = util.read_config_file("file.yaml")

In [5]:
# Spot-check one key: the delimiter later passed to pd.read_csv.
config_data['inbound_delimiter']

','

In [6]:
# Display the full parsed configuration for inspection.
config_data

{'file_type': 'csv',
 'dataset_name': None,
 'file_name': 'train',
 'table_name': 'train',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['row_id',
  'timestamp',
  'user_id',
  'content_id',
  'content_type_id',
  'task_container_id',
  'user_answer',
  'answered_correctly',
  'prior_question_elapsed_time',
  'prior_question_had_explanation']}

### Reading the Datafile

#### Reading the File Using Pandas

In [12]:
import pandas as pd

In [13]:
%%timeit
# Times an eager pandas read of the whole CSV.
# NOTE(review): df_sample is not referenced by any later cell - presumably
# names assigned under %%timeit are not kept in the notebook namespace; confirm.
df_sample = pd.read_csv("train.csv",delimiter=',')

1min 20s ± 5.97 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


#### Reading the File Using Dask

In [2]:
import dask.dataframe as dd

In [14]:
%%timeit
# NOTE(review): dd.read_csv is lazy as far as I know, so this likely times only
# the setup of the read, not a full parse of the file - confirm before
# comparing this figure with the pandas timing.
df_dask = dd.read_csv('train.csv')

10.6 ms ± 95.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


#### Comparing Pandas and Dask

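Dask's `read_csv` call returned far faster than pandas' (about 10.6 ms versus 1 min 20 s). The two timings are not directly comparable, however: `dd.read_csv` is lazy and only sets up the read, deferring the actual parsing until a result is computed, whereas `pd.read_csv` loads the entire file into memory.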

#### Reading the File Using the Config File

In [12]:
# Build the source path from the YAML settings, e.g. "./train.csv".
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'

# Pass the delimiter by keyword: supplying it as the second positional
# argument of read_csv is deprecated and rejected by newer pandas releases.
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])
df.head()

Unnamed: 0,row_id,timestamp,user_id,content_id,content_type_id,task_container_id,user_answer,answered_correctly,prior_question_elapsed_time,prior_question_had_explanation
0,0,0,115,5692,0,1,3,1,,
1,1,56943,115,5716,0,2,2,1,37000.0,False
2,2,118363,115,128,0,0,0,1,55000.0,False
3,3,131167,115,7860,0,3,0,1,19000.0,False
4,4,137965,115,7922,0,4,1,1,11000.0,False


### Validating Number of Columns and Column Names of Ingested File with YAML

In [13]:
# Validate the header: this normalizes df's column names in place and returns
# 1 when they match the YAML column list (order ignored), 0 otherwise.
util.col_header_val(df,config_data)

column name and column length validation passed


1

In [14]:
# Print the file's column names next to the YAML column list for a visual check.
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['row_id', 'timestamp', 'user_id', 'content_id', 'content_type_id',
       'task_container_id', 'user_answer', 'answered_correctly',
       'prior_question_elapsed_time', 'prior_question_had_explanation'],
      dtype='object')
columns of YAML are: ['row_id', 'timestamp', 'user_id', 'content_id', 'content_type_id', 'task_container_id', 'user_answer', 'answered_correctly', 'prior_question_elapsed_time', 'prior_question_had_explanation']


In [15]:
# col_header_val returns 0 on a mismatch; any other result counts as a pass.
header_ok = util.col_header_val(df, config_data) != 0
print("col validation passed" if header_ok else "validation failed")

column name and column length validation passed
col validation passed


### Writing the File as a Pipe Separated Text File in gz Format

In [20]:
# Take the output name and delimiter from the YAML config so the export stays
# in sync with file.yaml instead of repeating hard-coded values
# (with the current config this writes "train.csv.gz", pipe-separated).
output_file = f"{config_data['file_name']}.{config_data['file_type']}.gz"
df.to_csv(
    output_file,
    index=False,
    compression="gzip",
    sep=config_data['outbound_delimiter'],
)

### Summary of Datafile

#### Number of Rows: 101,230,332

#### Number of Columns: 10

#### Output File Size (gzip-compressed): 1.32 GB (Original CSV File Size: 5.7 GB)