## Task: File Ingestion and Schema validation

* Take any csv/text file of 2+ GB of your choice. --- (You can do this assignment on Google colab)

* Read the file ( Present approach of reading the file )

* Try different methods of file reading eg: Dask, Modin, Ray, pandas and present your findings in term of computational     efficiency

* Perform basic validation on data columns : eg: remove special character , white spaces from the col name

* As you already know the schema hence create a YAML file and write the column name in YAML file. --define separator of   
  read and write file, column name in YAML

* Validate number of columns and column name of ingested file with YAML.

* Write the file in pipe separated text file (|) in gz format.

* Create a summary of the file:

    Total number of rows,

    total number of columns

    file size

In [1]:
import os
import time

In [2]:
#Size of the file
os.path.getsize('/Users/pranav13b/Documents/Data Glacier Internship/Week 6/data.csv')

4544707885

### Read in the data with Dask

In [3]:
import dask.dataframe as dd
start = time.time()
dask_df = dd.read_csv('/Users/pranav13b/Documents/Data Glacier Internship/Week 6/data.csv')
end = time.time()
print("Read csv with dask: ",(end-start),"sec")

Read csv with dask:  0.0038919448852539062 sec


### Read in the data with Pandas

In [4]:
import pandas as pd
start = time.time()
df = pd.read_csv('/Users/pranav13b/Documents/Data Glacier Internship/Week 6/data.csv')
end = time.time()
print("Read csv with pandas: ",(end-start),"sec")

Read csv with pandas:  27.523354053497314 sec


### Here Dask is better than Pandas, with the least reading time of 0.004 sec

In [5]:
from dask import dataframe as dd
df = dd.read_csv('/Users/pranav13b/Documents/Data Glacier Internship/Week 6/data.csv',delimiter=',')

In [6]:
df.info()

<class 'dask_expr.DataFrame'>
Columns: 8 entries, 198801 to 34353
dtypes: int64(8)

In [7]:
#No. of Rows
len(df.index)

113607321

In [8]:
#No, of Columns
len(df.columns)

8

In [9]:
# remove special character
df.columns=df.columns.str.replace('[#,@,&]','')

In [10]:
#To remove white space from columns
df.columns = df.columns.str.replace(' ', '')

In [11]:
data=df.columns
data

Index(['198801', '1', '103', '100', '000000190', '0', '35843', '34353'], dtype='object')

### Validation

In [12]:
%%writefile utility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)

def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string)
    return string

def col_header_val(df,table_config):
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting utility.py


In [13]:
%%writefile store.yaml
file_type: csv
dataset_name: file
file_name: data
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - JapanTradeData
      - YearAndMonth
      - ExportOrImport
      - HSCode
      - Customs
      - Country 
      - Q1
      - Q2
      - Value

Overwriting store.yaml


In [14]:
# Reading config file
import utility as util
config_data = util.read_config_file("store.yaml")

  df.columns = df.columns.str.replace('[^\w]','_',regex=True)


In [15]:
#data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'file',
 'file_name': 'data',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['JapanTradeData - YearAndMonth - ExportOrImport - HSCode - Customs - Country - Q1 - Q2 - Value']}

In [16]:
# Reading process of the file using Dask
from dask import dataframe as dd
df_sample = dd.read_csv('/Users/pranav13b/Documents/Data Glacier Internship/Week 6/data.csv',delimiter=',')
df_sample.head()

Unnamed: 0,198801,1,103,100,000000190,0,35843,34353
0,198801,1,103,100,120991000,0,1590,4154
1,198801,1,103,100,210390900,0,4500,2565
2,198801,1,103,100,220890200,0,3000,757
3,198801,1,103,100,240220000,0,26000,40668
4,198801,1,103,100,250410000,0,5,8070


In [17]:
#Reading the file using config file
file_type = config_data['file_type']
source_file = "/Users/pranav13b/Documents/Data Glacier Internship/Week 6/" + config_data['file_name'] + f'.{file_type}'

In [18]:
import pandas as pd
df = pd.read_csv(source_file)
df.head()

Unnamed: 0,198801,1,103,100,000000190,0,35843,34353
0,198801,1,103,100,120991000,0,1590,4154
1,198801,1,103,100,210390900,0,4500,2565
2,198801,1,103,100,220890200,0,3000,757
3,198801,1,103,100,240220000,0,26000,40668
4,198801,1,103,100,250410000,0,5,8070


In [19]:
#validating the header of the file
util.col_header_val(df,config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['100', '103', '1', '0', '35843', '34353', '198801', '000000190']
Following YAML columns are not in the file uploaded ['japantradedata - yearandmonth - exportorimport - hscode - customs - country - q1 - q2 - value']


0

In [20]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['198801', '1', '103', '100', '000000190', '0', '35843', '34353'], dtype='object')
columns of YAML are: ['JapanTradeData - YearAndMonth - ExportOrImport - HSCode - Customs - Country - Q1 - Q2 - Value']


In [21]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
else:
    print("col validation passed")

column name and column length validation failed
Following File columns are not in the YAML file ['100', '103', '1', '0', '35843', '34353', '198801', '000000190']
Following YAML columns are not in the file uploaded ['japantradedata - yearandmonth - exportorimport - hscode - customs - country - q1 - q2 - value']
validation failed


In [22]:
import datetime
import csv
import gzip

from dask import dataframe as dd
df = dd.read_csv('/Users/pranav13b/Documents/Data Glacier Internship/Week 6/data.csv',delimiter=',')

# Write csv in gz format in pipe separated text file (|)
df.to_csv('data.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True)

['/Users/pranav13b/Documents/Data Glacier Internship/Week 6/data.csv.gz/00.part',
 '/Users/pranav13b/Documents/Data Glacier Internship/Week 6/data.csv.gz/01.part',
 '/Users/pranav13b/Documents/Data Glacier Internship/Week 6/data.csv.gz/02.part',
 '/Users/pranav13b/Documents/Data Glacier Internship/Week 6/data.csv.gz/03.part',
 '/Users/pranav13b/Documents/Data Glacier Internship/Week 6/data.csv.gz/04.part',
 '/Users/pranav13b/Documents/Data Glacier Internship/Week 6/data.csv.gz/05.part',
 '/Users/pranav13b/Documents/Data Glacier Internship/Week 6/data.csv.gz/06.part',
 '/Users/pranav13b/Documents/Data Glacier Internship/Week 6/data.csv.gz/07.part',
 '/Users/pranav13b/Documents/Data Glacier Internship/Week 6/data.csv.gz/08.part',
 '/Users/pranav13b/Documents/Data Glacier Internship/Week 6/data.csv.gz/09.part',
 '/Users/pranav13b/Documents/Data Glacier Internship/Week 6/data.csv.gz/10.part',
 '/Users/pranav13b/Documents/Data Glacier Internship/Week 6/data.csv.gz/11.part',
 '/Users/pranav1

In [23]:
#number of files in gz format folder
import os
entries = os.listdir('/Users/pranav13b/Documents/Data Glacier Internship/Week 6/data.csv.gz/')
for entry in entries:
    print(entry)

26.part
67.part
30.part
47.part
10.part
06.part
51.part
50.part
07.part
11.part
46.part
31.part
66.part
70.part
27.part
41.part
16.part
00.part
57.part
20.part
61.part
36.part
37.part
60.part
21.part
56.part
01.part
17.part
40.part
55.part
02.part
14.part
43.part
38.part
59.part
18.part
34.part
63.part
22.part
23.part
62.part
35.part
19.part
58.part
39.part
42.part
15.part
03.part
54.part
08.part
49.part
32.part
65.part
24.part
53.part
04.part
12.part
45.part
69.part
28.part
29.part
68.part
44.part
13.part
05.part
52.part
25.part
64.part
33.part
48.part
09.part


In [24]:
#size of the gz format folder
os.path.getsize('/Users/pranav13b/Documents/Data Glacier Internship/Week 6/data.csv.gz')

2336