# **File Ingestion and Schema Validation**

**Importing & Installing Libraires**


In [59]:
import random
import torch
import cv2
import os
import time
import matplotlib.pyplot as plt
import numpy as np
import zipfile
import pandas as pd
import numpy as np
pd.set_option('display.max_columns', None)
import seaborn as sns
import plotly.express as px
import dask

In [2]:
pip install modin[ray]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [58]:
path= '/content/sample_data/california_housing_train.csv'

## **Size**

In [55]:
os.path.getsize('/content/sample_data/california_housing_train.csv')

1706430

In [71]:
df = pd.read_csv(path)
df.head()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value
0,-114.31,34.19,15.0,5612.0,1283.0,1015.0,472.0,1.4936,66900.0
1,-114.47,34.4,19.0,7650.0,1901.0,1129.0,463.0,1.82,80100.0
2,-114.56,33.69,17.0,720.0,174.0,333.0,117.0,1.6509,85700.0
3,-114.57,33.64,14.0,1501.0,337.0,515.0,226.0,3.1917,73400.0
4,-114.57,33.57,20.0,1454.0,326.0,624.0,262.0,1.925,65500.0


## **Read Data With Dask**

In [73]:
from dask import dataframe as dfff
start = time.time()
dask_df = dfff.read_csv(path)
end = time.time()
print("Read csv with dask: ",(end-start),"sec")

Read csv with dask:  0.009879827499389648 sec


## **Read Data With Dask**

In [6]:
from dask import dataframe as dff
start = time.time()
dask_df = dff.read_csv(path)
end = time.time()
print("Read csv with dask: ",(end-start),"sec")

Read csv with dask:  0.0074596405029296875 sec


## **Read Data With Pandas**

In [7]:
import pandas as pd
start = time.time()
df = pd.read_csv(path)
end = time.time()
print("Read csv with pandas: ",(end-start),"sec")

Read csv with pandas:  0.02148914337158203 sec


## **Read Data With Modin(Ray)**

In [34]:
import modin.pandas as pd
print("modin import successful")
import ray
ray.shutdown()
ray.init()
start = time.time()
modin_df = pd.read_csv(path)
end = time.time()
print("Read csv with modin and ray: ",(end-start),"sec")

modin import successful


2022-09-30 12:02:33,766	INFO worker.py:1515 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m


Read csv with modin and ray:  0.9897801876068115 sec


## **Dask Perform better than Pandas & Modin(Ray)**

In [75]:
from dask import dataframe as dff
df_dask = dff.read_csv(path,delimiter=',')

In [76]:
df_dask.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 9 entries, longitude to median_house_value
dtypes: float64(9)

In [24]:
# remove special character
df_dask.columns=df_dask.columns.str.replace('[#,@,&]',',_')



In [25]:
#To remove white space from columns
df_dask.columns = df_dask.columns.str.replace(' ', '')
df_dask.columns = df_dask.columns.str.replace('_', '')

In [26]:
data=df_dask.columns
data

Index(['longitude', 'latitude', 'housingmedianage', 'totalrooms',
       'totalbedrooms', 'population', 'households', 'medianincome',
       'medianhousevalue'],
      dtype='object')

# **Validation**

In [27]:
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

In [35]:
%%writefile utility.py

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.load(stream, Loader=yaml.Loader)
        except yaml.YAMLError as exc:
            logging.error(exc)

def col_header_val(df,table_config):
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting utility.py


In [39]:
%%writefile store.yaml
file_type: csv
dataset_name: file
file_name: california_housing_train
table_name: df
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns:
      - longitude
      - latitude
      - housing_median_age
      - total_rooms
      - VersionNum
      - total_bedrooms
      - population
      - FederalTIN
      - households
      - median_income
      - median_house_value

Overwriting store.yaml


In [47]:
import yaml

# **Gzip Compression**

In [62]:
import datetime
import csv
import gzip

from dask import dataframe as ddf
df1 = ddf.read_csv('/content/sample_data/california_housing_train.csv')

# Write csv in gz format in pipe separated text file (|)
df1.to_csv('california_housing_train.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          line_terminator='\n')

['/content/california_housing_train.csv.gz/0.part']

In [69]:
#number of files in gz format folder
import os
files = os.listdir('/content/california_housing_train.csv.gz/')
for file in files:
    print(file)

0.part


In [70]:
#size of the gz format folder
os.path.getsize('/content/california_housing_train.csv.gz/')

4096

# **END**