# Data Glacier - Nahari Terena - Week6
## Week 6: File ingestion and schema validation

In [1]:
import os
import time

os.path.getsize('C:/Users/Olnalu/Desktop/Nahari/DataGlacier/Week6/dataset_CNPJ.csv')

2059346639

### Reading data with Dask

In [2]:
from dask import dataframe as dd
start = time.time()
dask_df = dd.read_csv('C:/Users/Olnalu/Desktop/Nahari/DataGlacier/dataset_CNPJ.csv', sep = ";")
end = time.time()
print("Read csv with dask: ",(end-start),"sec")

Read csv with dask:  0.03351473808288574 sec


### Read with Pandas

In [4]:
import pandas as pd
start = time.time()
df = pd.read_csv('C:/Users/Olnalu/Desktop/Nahari/DataGlacier/Week6/dataset_CNPJ.csv', sep = ";", encoding = 'latin-1')
end = time.time()
print("Read csv with pandas: ",(end-start),"sec")

Read csv with pandas:  292.1952292919159 sec


### Reading data with DictReader

In [3]:
import csv  

start = time.time()
with open('C:/Users/Olnalu/Desktop/Nahari/DataGlacier/Week6/dataset_CNPJ.csv') as csvfile:  
    data = csv.DictReader(csvfile)

end = time.time()
print("Read csv with DictReader: ",(end-start),"sec")

Read csv with DictReader:  0.0019922256469726562 sec


Although, DictReader is faster than Dask, it imports everything as strings, while the other methods try to guess the data types of each column separately and possibly do multiple other validations upon import.
Therefore, in cases when you know that the columns present in the data are all in string format already, csv.DictReader is the method to go for.
In this specific case, we will use Dask instead of DictReader.

In [38]:
from dask import dataframe as dd
df = dd.read_csv('C:/Users/Olnalu/Desktop/Nahari/DataGlacier/Week6/dataset_CNPJ.csv', sep = ";", dtype={'CODE_SIZE': 'object',
       'INCOME': 'object',
       'STATE': 'object'})

In [39]:
#No. of Rows
df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 7 entries, CNPJ to STATE
dtypes: object(4), int64(3)

In [40]:
len(df.index)

32297432

In [37]:
df.columns

Index(['CNPJ', 'NAME', 'CODE', 'QUALIFICATION', 'INCOME', 'CODE_SIZE',
       'STATE'],
      dtype='object')

In [41]:
#No, of Columns
len(df.columns)

7

In [42]:
# remove special character
df.columns=df.columns.str.replace('[#,@,&]','')

  df.columns=df.columns.str.replace('[#,@,&]','')


In [43]:
#To remove white space from columns
df.columns = df.columns.str.replace(' ', '')

In [52]:
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

In [53]:
%%writefile utility.py

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.load(stream, Loader=yaml.Loader)
        except yaml.YAMLError as exc:
            logging.error(exc)

def col_header_val(df,table_config):
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Writing utility.py


In [55]:
%%writefile store.yaml
file_type: csv
dataset_name: file
file_name: Rate
table_name: cnpj
inbound_delimiter: ";"
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - CNPJ
    - NAME 
    - CODE 
    - QUALIFICATION
    - INCOME
    - CODE_SIZE
    - STATE

Writing store.yaml


In [59]:
import datetime
import csv
import gzip

from dask import dataframe as dd
df = dd.read_csv('C:/Users/Olnalu/Desktop/Nahari/DataGlacier/Week6/dataset_CNPJ.csv', sep = ";", dtype={'CODE_SIZE': 'object',
       'INCOME': 'object',
       'STATE': 'object'})

# Write csv in gz format in pipe separated text file (|)
df.to_csv('Rate.csv.gz',
          sep=';',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          line_terminator='\n')

  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)
  df.to_csv(f, **kwargs)


['C:\\Users\\Olnalu\\Desktop\\Nahari\\DataGlacier\\Week6\\Rate.csv.gz\\00.part',
 'C:\\Users\\Olnalu\\Desktop\\Nahari\\DataGlacier\\Week6\\Rate.csv.gz\\01.part',
 'C:\\Users\\Olnalu\\Desktop\\Nahari\\DataGlacier\\Week6\\Rate.csv.gz\\02.part',
 'C:\\Users\\Olnalu\\Desktop\\Nahari\\DataGlacier\\Week6\\Rate.csv.gz\\03.part',
 'C:\\Users\\Olnalu\\Desktop\\Nahari\\DataGlacier\\Week6\\Rate.csv.gz\\04.part',
 'C:\\Users\\Olnalu\\Desktop\\Nahari\\DataGlacier\\Week6\\Rate.csv.gz\\05.part',
 'C:\\Users\\Olnalu\\Desktop\\Nahari\\DataGlacier\\Week6\\Rate.csv.gz\\06.part',
 'C:\\Users\\Olnalu\\Desktop\\Nahari\\DataGlacier\\Week6\\Rate.csv.gz\\07.part',
 'C:\\Users\\Olnalu\\Desktop\\Nahari\\DataGlacier\\Week6\\Rate.csv.gz\\08.part',
 'C:\\Users\\Olnalu\\Desktop\\Nahari\\DataGlacier\\Week6\\Rate.csv.gz\\09.part',
 'C:\\Users\\Olnalu\\Desktop\\Nahari\\DataGlacier\\Week6\\Rate.csv.gz\\10.part',
 'C:\\Users\\Olnalu\\Desktop\\Nahari\\DataGlacier\\Week6\\Rate.csv.gz\\11.part',
 'C:\\Users\\Olnalu\\Desktop

In [60]:
#number of files in gz format folder
import os
entries = os.listdir('C:/Users/Olnalu/Desktop/Nahari/DataGlacier/Week6/Rate.csv.gz/')
for entry in entries:
    print(entry)

00.part
01.part
02.part
03.part
04.part
05.part
06.part
07.part
08.part
09.part
10.part
11.part
12.part
13.part
14.part
15.part
16.part
17.part
18.part
19.part
20.part
21.part
22.part
23.part
24.part
25.part
26.part
27.part
28.part
29.part
30.part
31.part


In [61]:
#size of the gz format folder
os.path.getsize('C:/Users/Olnalu/Desktop/Nahari/DataGlacier/Week6/Rate.csv.gz')

12288