## Task:

* Take any csv/text file of 2+ GB of your choice. --- (You can do this assignment on Google colab)

* Read the file ( Present approach of reading the file )

* Try different methods of file reading eg: Dask, Modin, Ray, pandas and present your findings in term of computational     efficiency

* Perform basic validation on data columns : eg: remove special character , white spaces from the col name

* As you already know the schema hence create a YAML file and write the column name in YAML file. --define separator of   
  read and write file, column name in YAML

* Validate number of columns and column name of ingested file with YAML.

* Write the file in pipe separated text file (|) in gz format.

* Create a summary of the file:

    Total number of rows,

    total number of columns

    file size
    
# Data Ingestion sample code walkthrough

## 
  > Create a utility file
  
  > Config file creation
  
  > Data ingestion pipeline


In [49]:
from google.colab import drive,files
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [50]:
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re
import pandas as pd

################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

### Write YAML file

In [51]:
%%writefile schema.yaml

file_type: csv
file_name: tidy_campprof_enrollmnent_2013_to_2019
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns:
    - campus_number
    - data_release 
    - data_level 
    - data_category
    - release_year
    - demog
    - count 
    - all_students
    - percent

Overwriting schema.yaml


In [52]:
# Read config file
config_data = read_config_file("schema.yaml")

In [53]:
config_data['inbound_delimiter']

','

In [54]:
#inspecting data of config file
config_data

{'file_type': 'csv',
 'file_name': 'tidy_campprof_enrollmnent_2013_to_2019',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['campus_number',
  'data_release',
  'data_level',
  'data_category',
  'release_year',
  'demog',
  'count',
  'all_students',
  'percent']}

In [55]:
# Normal reading process of the file
import pandas as pd
df_sample = pd.read_csv('/content/drive/MyDrive/Colab_Notebooks/tidy_campprof_enrollmnent_2013_to_2019.csv',delimiter=',')
df_sample.head()

Unnamed: 0,campus_number,data_release,data_level,data_category,release_year,demog,count,all_students,percent
0,1902001,tapr,campus,CAMPPROF.csv,2013,african_american,16,206,7.8
1,1902001,tapr,campus,CAMPPROF.csv,2013,asian,2,206,1.0
2,1902001,tapr,campus,CAMPPROF.csv,2013,at_risk,68,206,33.0
3,1902001,tapr,campus,CAMPPROF.csv,2013,bilingual_esl,3,206,1.5
4,1902001,tapr,campus,CAMPPROF.csv,2013,career_tech,145,206,70.4


In [56]:
# read the file using config file
file_type = config_data['file_type']
source_file = "/content/drive/MyDrive/Colab_Notebooks/" + config_data['file_name'] + f'.{file_type}'
#print("",source_file)
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
df.head()

  exec(code_obj, self.user_global_ns, self.user_ns)


Unnamed: 0,campus_number,data_release,data_level,data_category,release_year,demog,count,all_students,percent
0,1902001,tapr,campus,CAMPPROF.csv,2013,african_american,16,206,7.8
1,1902001,tapr,campus,CAMPPROF.csv,2013,asian,2,206,1.0
2,1902001,tapr,campus,CAMPPROF.csv,2013,at_risk,68,206,33.0
3,1902001,tapr,campus,CAMPPROF.csv,2013,bilingual_esl,3,206,1.5
4,1902001,tapr,campus,CAMPPROF.csv,2013,career_tech,145,206,70.4


In [57]:
#validate the header of the file
col_header_val(df,config_data)

column name and column length validation passed


1

In [58]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['campus_number', 'data_release', 'data_level', 'data_category',
       'release_year', 'demog', 'count', 'all_students', 'percent'],
      dtype='object')
columns of YAML are: ['campus_number', 'data_release', 'data_level', 'data_category', 'release_year', 'demog', 'count', 'all_students', 'percent']


In [59]:
if col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipleine

column name and column length validation passed
col validation passed


In [60]:
pd.read_csv("/content/drive/MyDrive/Colab_Notebooks/tidy_campprof_enrollmnent_2013_to_2019.csv")

Unnamed: 0,campus_number,data_release,data_level,data_category,release_year,demog,count,all_students,percent
0,1902001,tapr,campus,CAMPPROF.csv,2013,african_american,16,206,7.8
1,1902001,tapr,campus,CAMPPROF.csv,2013,asian,2,206,1.0
2,1902001,tapr,campus,CAMPPROF.csv,2013,at_risk,68,206,33.0
3,1902001,tapr,campus,CAMPPROF.csv,2013,bilingual_esl,3,206,1.5
4,1902001,tapr,campus,CAMPPROF.csv,2013,career_tech,145,206,70.4
...,...,...,...,...,...,...,...,...,...
1970711,254902101,tapr,campus,CAMPPROF.csv,2019,phys_disabilities,12,288,4.2
1970712,254902101,tapr,campus,CAMPPROF.csv,2019,section_504,12,288,4.2
1970713,254902101,tapr,campus,CAMPPROF.csv,2019,special_ed,32,288,11.1
1970714,254902101,tapr,campus,CAMPPROF.csv,2019,two_or_more_races,0,288,0.0


In [61]:
import csv
import gzip

from dask import dataframe as dd
df = dd.read_csv("/content/drive/MyDrive/Colab_Notebooks/tidy_campprof_enrollmnent_2013_to_2019.csv")

# Write csv in gz format in pipe separated text file (|)
df.to_csv('/content/idy_campprof_enrollmnent_2013_to_2019.csv.gz',
          sep='|',
          header=True,
          index=False,
        quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          line_terminator='\n')



['/content/idy_campprof_enrollmnent_2013_to_2019.csv.gz/0.part']

In [63]:
#number of files in gz format folder
entries = os.listdir('/content/idy_campprof_enrollmnent_2013_to_2019.csv.gz/')
for entry in entries:
    print(entry)

0.part


In [75]:
#size of the gz format folder
zip_s=os.path.getsize('/content/drive/MyDrive/Colab_Notebooks/tidy_campprof_enrollmnent_2013_to_2019.csv')
gzip_s=os.path.getsize('/content/idy_campprof_enrollmnent_2013_to_2019.csv.gz')
print(f'{zip_s},{gzip_s}')

119260647,4096


In [76]:
%%shell
jupyter nbconvert --to html /content/drive/MyDrive/Colab_Notebooks/week6_dataingestion.ipynb

[NbConvertApp] Converting notebook /content/drive/MyDrive/Colab_Notebooks/week6_dataingestion.ipynb to html
[NbConvertApp] Writing 317788 bytes to /content/drive/MyDrive/Colab_Notebooks/week6_dataingestion.html


