# **File Ingestion and Schema Validation Assignment**

## **1. Write a Utility File**

## **2. Write a YAML File**

## **3. File Validation**

## **4. Read File with Different Methods**

### **4a. Via Pandas**
### **4b. Via Modin**
### **4c. Via Vaex**


## **5. Validation and Summary**

## **1. Write a Utility File**

In [198]:
```python
%%writefile testutility0.py
import logging
import re

import yaml


################
# File Reading #
################

def read_config_file(filepath):
    """Load a YAML config file; log the error and return None if it cannot be parsed."""
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)
            return None


def replacer(string, char):
    """Collapse runs of two or more `char` into a single `char`."""
    pattern = re.escape(char) + '{2,}'
    return re.sub(pattern, char, string)


def normalize_col(name):
    """Lower-case a column name, strip edge underscores, collapse repeated underscores."""
    return replacer(name.lower().strip('_'), '_')


def col_header_val(df, table_config):
    """Check that the file's columns match the YAML column list (1 = pass, 0 = fail)."""
    # Vaex exposes get_column_names(); pandas and Modin expose .columns.
    # Only read the names: in Vaex, df.columns holds the column data itself,
    # so assigning a list to it is unsafe.
    if hasattr(df, 'get_column_names'):
        raw_cols = df.get_column_names()
    else:
        raw_cols = list(df.columns)

    # Normalize both sides the same way, then compare as sorted lists
    # (equal sorted lists also implies equal column counts).
    file_cols = sorted(normalize_col(c) for c in raw_cols)
    expected_cols = sorted(normalize_col(c) for c in table_config['columns'])

    if file_cols == expected_cols:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    print("Following file columns are not in the YAML file:", sorted(set(file_cols) - set(expected_cols)))
    print("Following YAML columns are not in the uploaded file:", sorted(set(expected_cols) - set(file_cols)))
    logging.info(f'df columns: {file_cols}')
    logging.info(f'expected columns: {expected_cols}')
    return 0
```

Writing testutility0.py
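The normalize-and-compare logic in `col_header_val` can be tried out without loading any data. The sketch below restates it on plain lists of names using only the standard library; `normalize` and `compare_columns` are hypothetical names, and the sample headers are made up for illustration.

```python
import re


def normalize(name):
    """Lower-case, strip leading/trailing underscores, collapse repeated underscores."""
    name = name.lower().strip('_')
    return re.sub('_{2,}', '_', name)


def compare_columns(file_cols, expected_cols):
    """Return (passed, extra_in_file, missing_from_file) after normalizing both sides."""
    file_norm = sorted(normalize(c) for c in file_cols)
    expected_norm = sorted(normalize(c) for c in expected_cols)
    extra = sorted(set(file_norm) - set(expected_norm))
    missing = sorted(set(expected_norm) - set(file_norm))
    # Comparing sorted lists (not sets) also catches duplicated column names.
    return file_norm == expected_norm, extra, missing


# Hypothetical headers: the first differs from the YAML only in case and stray underscores.
yaml_cols = ['Summons Number', 'Plate ID', 'Registration State']
file_cols = ['summons number', '__Plate ID_', 'REGISTRATION STATE']
print(compare_columns(file_cols, yaml_cols))  # (True, [], [])
print(compare_columns(['Summons Number', 'Registration State', 'Vehicle Color'], yaml_cols))
# (False, ['vehicle color'], ['plate id'])
```

Because the comparison uses sorted lists, a file with a duplicated column fails even though both set differences come out empty.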


## **2. Write a YAML File**

In [199]:
```
%%writefile file.yaml
file_type: csv
dataset_name: ParkingViolations
file_name: Parking_Violations_2015
table_name: tableOne
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns:
    - Summons Number
    - Plate ID
    - Registration State
    - Plate Type
    - Issue Date
    - Violation Code
    - Vehicle Body Type
    - Vehicle Make
    - Issuing Agency
    - Street Code1
    - Street Code2
    - Street Code3
    - Vehicle Expiration Date
    - Violation Location
    - Violation Precinct
    - Issuer Precinct
    - Issuer Code
    - Issuer Command
    - Issuer Squad
    - Violation Time
    - Time First Observed
    - Violation County
    - Violation In Front Of Or Opposite
    - House Number
    - Street Name
    - Intersecting Street
    - Date First Observed
    - Law Section
    - Sub Division
    - Violation Legal Code
    - Days Parking In Effect
    - From Hours In Effect
    - To Hours In Effect
    - Vehicle Color
    - Unregistered Vehicle?
    - Vehicle Year
    - Meter Number
    - Feet From Curb
    - Violation Post Code
    - Violation Description
    - No Standing or Stopping Violation
    - Hydrant Violation
    - Double Parking Violation
    - Latitude
    - Longitude
    - Community Board
    - Community Council
    - Census Tract
    - BIN
    - BBL
    - NTA
```

Overwriting file.yaml
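Before any read is attempted, it can help to check that the parsed config actually contains what the later cells rely on. A minimal sketch, assuming the config has already been loaded into a dict (as `read_config_file` returns); `REQUIRED_KEYS` and `validate_config` are hypothetical names, and the choice of required keys is an assumption based on the cells in this notebook.

```python
# Hypothetical: the keys the read and validation cells in this notebook use.
REQUIRED_KEYS = ('file_type', 'file_name', 'inbound_delimiter', 'columns')


def validate_config(cfg):
    """Return a list of problems found in a parsed YAML config (empty list means OK)."""
    if not isinstance(cfg, dict):
        # yaml.safe_load returns None for an empty file; read_config_file returns None on errors.
        return ['config did not parse to a mapping']
    problems = [f'missing key: {k}' for k in REQUIRED_KEYS if k not in cfg]

    cols = cfg.get('columns')
    if not isinstance(cols, list) or not cols:
        problems.append('columns must be a non-empty list')
    else:
        # Column matching is case-insensitive, so duplicates are checked the same way.
        lowered = [str(c).lower() for c in cols]
        dupes = sorted({c for c in lowered if lowered.count(c) > 1})
        if dupes:
            problems.append(f'duplicate columns: {dupes}')

    if 'inbound_delimiter' in cfg and len(str(cfg['inbound_delimiter'])) != 1:
        problems.append('inbound_delimiter must be a single character')
    return problems
```

Run against the `config_data` loaded from `file.yaml`, this should return an empty list; anything else points at the config rather than the data file.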


## **3. File Validation**

In [200]:
```python
# Read the config file
import testutility0 as util
config_data = util.read_config_file("file.yaml")
```

In [201]:
```python
# Inspect the parsed config
config_data
```

```
OrderedDict([('file_type', 'csv'),
             ('dataset_name', 'ParkingViolations'),
             ('file_name', 'Parking_Violations_2015'),
             ('table_name', 'tableOne'),
             ('inbound_delimiter', ','),
             ('outbound_delimiter', '|'),
             ('skip_leading_rows', 1),
             ('columns',
              ['Summons Number',
               'Plate ID',
               'Registration State',
               'Plate Type',
               'Issue Date',
               'Violation Code',
               'Vehicle Body Type',
               'Vehicle Make',
               'Issuing Agency',
               'Street Code1',
               'Street Code2',
               'Street Code3',
               'Vehicle Expiration Date',
               'Violation Location',
               'Violation Precinct',
               'Issuer Precinct',
               'Issuer Code',
               'Issuer Command',
               'Issuer Squad',
               'Violation Time',
```

## **4. Read File with Different Methods**
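Each read cell below rebuilds `source_file` from `file_name` and `file_type` in the same way. A small helper could keep that logic in one place; `source_path` and its `must_exist` flag are hypothetical, and `base_dir='.'` mirrors the notebook's `"./"` prefix.

```python
from pathlib import Path


def source_path(cfg, base_dir='.', must_exist=False):
    """Build '<base_dir>/<file_name>.<file_type>' from the config, as the read cells do."""
    path = Path(base_dir) / f"{cfg['file_name']}.{cfg['file_type']}"
    # Optionally fail fast, before a long read is started, if the file is not there.
    if must_exist and not path.is_file():
        raise FileNotFoundError(path)
    return path


cfg = {'file_name': 'Parking_Violations_2015', 'file_type': 'csv'}
print(source_path(cfg))  # Parking_Violations_2015.csv
```

Passing `str(source_path(config_data))` to the readers would give the same path string the cells below build by hand, minus the redundant `./`.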


### **4a. Via Pandas**

In [None]:
```python
%%time
# Read the file using the config (via pandas).
# %%time runs the cell once and keeps `df`; %%timeit would repeat the read and discard `df`.
import pandas as pd

file_type = config_data['file_type']
source_file = f"./{config_data['file_name']}.{file_type}"
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])
```

## **Result: The file could not be read via pandas because of memory (scalability) limits: `read_csv` tries to load the entire 2.66 GB CSV into RAM at once. We should try another method.**
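One way to keep pandas in play despite the memory limit is to avoid materializing the whole file: parse only the header for validation, then stream the rows in fixed-size chunks. This is a sketch under assumptions: `header_and_row_count` is a hypothetical helper, and `chunksize=1_000_000` is an arbitrary starting value to tune to the available RAM, not a measured setting.

```python
import pandas as pd


def header_and_row_count(path, sep=',', chunksize=1_000_000):
    """Return (column names, number of data rows) without loading the file at once."""
    # nrows=0 parses only the header line, so this is cheap even for a multi-GB file.
    columns = list(pd.read_csv(path, sep=sep, nrows=0).columns)

    rows = 0
    # With chunksize set, read_csv returns a reader that yields DataFrames;
    # only one chunk is held in memory at a time. dtype=str skips type inference.
    with pd.read_csv(path, sep=sep, chunksize=chunksize, dtype=str) as reader:
        for chunk in reader:
            rows += len(chunk)
    return columns, rows
```

For this dataset it would be called as `header_and_row_count(source_file, sep=config_data['inbound_delimiter'])`; the returned column list can go straight into the same normalize-and-compare check used in `col_header_val`. It trades speed for a flat memory footprint, so expect it to be slow on a 2.66 GB file.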

### **4b. Via Modin**

In [1]:
```python
# Quote the extra so shells such as zsh do not try to expand the brackets.
%pip install "modin[ray]"
```


In [None]:
```python
%%time
# Read the file using the config (via Modin on the Ray engine).
# Initialize Ray before importing modin.pandas, as Modin's warning recommends.
import ray
ray.init(ignore_reinit_error=True)  # safe to re-run the cell

import modin.pandas as pd

file_type = config_data['file_type']
source_file = f"./{config_data['file_name']}.{file_type}"
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])
```



## **Result: The file could not be read via Modin either, again because of memory (scalability) issues. We should try another method.**

### **4c. Via Vaex**

In [5]:
```python
# Quote the specifier: unquoted, the shell treats ">" as output redirection.
%pip install "anyio>=3.0.0"
```

Note: you may need to restart the kernel to use updated packages.


In [None]:
```python
%pip install --upgrade vaex
```

In [7]:
```python
# Read the file using the config (via Vaex)
import vaex as vx

file_type = config_data['file_type']
source_file = f"./{config_data['file_name']}.{file_type}"
df = vx.open(source_file)
```



## **Result: Reading via Vaex takes a long time, but it succeeds.**

## **5. Validation and Summary**

In [215]:
```python
# Validation: compare the file's columns with the YAML column list
util.col_header_val(df, config_data)
```

column name and column length validation passed


1

In [216]:
```python
print(f"Total Number of Rows: {df.shape[0]}")
print(f"Total Number of Columns: {df.shape[1]}")
print("File Size: 2.66 GB")  # hard-coded value, not computed in this cell
```

Total Number of Rows: 11809233
Total Number of Columns: 51
File Size: 2.66 GB
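The file size above is typed in by hand. It can instead be measured, and the row count cross-checked, with the standard library alone, streaming the file so memory stays flat. A sketch under assumptions: `file_summary` is a hypothetical helper; it assumes one header row (as `skip_leading_rows: 1` in the YAML suggests); and it reports both decimal gigabytes (10^9 bytes) and binary gibibytes (2^30 bytes), because the notebook does not say which convention "2.66 GB" uses.

```python
import csv
import os


def file_summary(path, delimiter=',', header_rows=1):
    """Return size in bytes, GB (1e9), GiB (2**30), and the number of data records."""
    size = os.path.getsize(path)
    records = 0
    # newline='' lets the csv module handle quoted fields that contain line breaks,
    # so this counts CSV records rather than physical lines.
    with open(path, newline='', encoding='utf-8', errors='replace') as f:
        for _ in csv.reader(f, delimiter=delimiter):
            records += 1
    return {
        'bytes': size,
        'gb': round(size / 1e9, 2),
        'gib': round(size / 2**30, 2),
        'data_rows': max(records - header_rows, 0),
    }
```

Called as `file_summary(source_file, delimiter=config_data['inbound_delimiter'])`, its `data_rows` value gives an independent check on the 11,809,233 rows Vaex reported. It reads the whole file once, so it is slow, but it never holds more than one record in memory.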
