# File ingestion and schema validation

## Dask data frame

> [dask]('https://www.dask.org/')


Perform basic validation on data columns : eg: remove special character , white spaces from the col name

Create Schema in YAML

Validate the file with YAML



In [7]:
#from dask.distributed import Client
#client = Client()
#client

In [8]:
#client.shutdown()

In [9]:
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re
import csv

> YAML Configuration file write and read

In [10]:
%%writefile file.yaml
file_type: csv
dataset_name: Parking_Violations
file_name: Parking_Violations_2016
inbound_deliminater: ','
outbound_deliminater: '|'
skip_leading_rows: 1
columns:
   - summons_number 
   - plate_id 
   - registration_state
   - plate_type      
   - issue_date
   - violation_code 
   - vehicle_body_type
   - vehicle_make      
   - issuing_agency
   - street_code1
   - street_code2
   - street_code3       
   - vehicle_expiration_date 
   - violation_location
   - violation_precinct
   - issuer_precinct
   - issuer_code 
   - issuer_command 
   - issuer_squad
   - violation_time 
   - time_first_observed 
   - violation_county
   - violation_in_front_of_or_opposite
   - house_number
   - street_name 
   - intersecting_street 
   - date_first_observed 
   - law_section
   - sub_division 
   - violation_legal_code
   - days_parking_in_effect
   - from_hours_in_effect 
   - to_hours_in_effect
   - vehicle_color
   - unregistered_vehicle
   - vehicle_year
   - meter_number
   - feet_from_curb 
   - violation_post_code 
   - violation_description
   - no_standing_or_stopping_violation 
   - hydrant_violation
   - double_parking_violation

Overwriting file.yaml


In [11]:
import yaml
with open('file.yaml', 'r') as f:
    file = yaml.safe_load(f)
    file

Check yaml file

In [12]:
file['inbound_deliminater']

','

In [13]:
file['columns']

['summons_number',
 'plate_id',
 'registration_state',
 'plate_type',
 'issue_date',
 'violation_code',
 'vehicle_body_type',
 'vehicle_make',
 'issuing_agency',
 'street_code1',
 'street_code2',
 'street_code3',
 'vehicle_expiration_date',
 'violation_location',
 'violation_precinct',
 'issuer_precinct',
 'issuer_code',
 'issuer_command',
 'issuer_squad',
 'violation_time',
 'time_first_observed',
 'violation_county',
 'violation_in_front_of_or_opposite',
 'house_number',
 'street_name',
 'intersecting_street',
 'date_first_observed',
 'law_section',
 'sub_division',
 'violation_legal_code',
 'days_parking_in_effect',
 'from_hours_in_effect',
 'to_hours_in_effect',
 'vehicle_color',
 'unregistered_vehicle',
 'vehicle_year',
 'meter_number',
 'feet_from_curb',
 'violation_post_code',
 'violation_description',
 'no_standing_or_stopping_violation',
 'hydrant_violation',
 'double_parking_violation']

> Read and inspect CSV file

Track load times

In [1]:
import dask.dataframe as dd
import tracemalloc
tracemalloc.start()
dfd = dd.read_csv('Parking_Violations_2015.csv', delimiter=',')
dfd
snapshot=tracemalloc.take_snapshot()
tracemalloc.stop()


In [9]:
top_stats=snapshot.statistics('traceback')
# pick the biggest memory block
stat = top_stats[0]
print("%s memory blocks: %.1f KiB" % (stat.count, stat.size / 1024))
for line in stat.traceback.format():
    print(line)

300 memory blocks: 18.2 KiB
  File "C:\Users\nunto\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.9_qbz5n2kfra8p0\LocalCache\local-packages\Python39\site-packages\pandas\core\internals\managers.py", line 1024
    bp = BlockPlacement(slice(0, len(values)))


Explore Data

In [15]:
dfd.columns

Index(['Summons Number', 'Plate ID', 'Registration State', 'Plate Type',
       'Issue Date', 'Violation Code', 'Vehicle Body Type', 'Vehicle Make',
       'Issuing Agency', 'Street Code1', 'Street Code2', 'Street Code3',
       'Vehicle Expiration Date', 'Violation Location', 'Violation Precinct',
       'Issuer Precinct', 'Issuer Code', 'Issuer Command', 'Issuer Squad',
       'Violation Time', 'Time First Observed', 'Violation County',
       'Violation In Front Of Or Opposite', 'House Number', 'Street Name',
       'Intersecting Street', 'Date First Observed', 'Law Section',
       'Sub Division', 'Violation Legal Code', 'Days Parking In Effect    ',
       'From Hours In Effect', 'To Hours In Effect', 'Vehicle Color',
       'Unregistered Vehicle?', 'Vehicle Year', 'Meter Number',
       'Feet From Curb', 'Violation Post Code', 'Violation Description',
       'No Standing or Stopping Violation', 'Hydrant Violation',
       'Double Parking Violation', 'Latitude', 'Longitude', 'Comm

In [16]:
dfd.dtypes

Summons Number                         int64
Plate ID                              object
Registration State                    object
Plate Type                            object
Issue Date                            object
Violation Code                         int64
Vehicle Body Type                     object
Vehicle Make                          object
Issuing Agency                        object
Street Code1                           int64
Street Code2                           int64
Street Code3                           int64
Vehicle Expiration Date               object
Violation Location                     int64
Violation Precinct                     int64
Issuer Precinct                        int64
Issuer Code                            int64
Issuer Command                        object
Issuer Squad                          object
Violation Time                        object
Time First Observed                  float64
Violation County                      object
Violation 

### Perform Basic Validation on Data Columns

- Validate csv file with yaml

Drop unwanted columns

In [17]:
dfd=dfd.drop(["BIN", "BBL", "NTA"], axis=1)
dfd=dfd.drop(["Latitude", "Longitude", "Community Board", "Community Council ", "Census Tract"], axis=1)

> Remove special characters and white space from data columns

In [18]:
def val_data_col():
    # clean up df columns #
    dfd.columns=dfd.columns.str.replace('[?]', '')
    dfd.columns=dfd.columns.str.strip()
    dfd.columns=dfd.columns.str.replace('[ ]', '_')
    dfd.columns=dfd.columns.str.lower()
    # compare yaml columns with df columns #
    expected_columns = list(file['columns'])
    if len(dfd.columns) == len(expected_columns) and list(dfd.columns) == expected_columns:
        print('column name and column length validation passed')
        mismatched_columns_file = list(set(dfd.columns).difference(expected_columns))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_columns).difference(dfd.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {dfd.columns}')
        logging.info(f'expected columns: {expected_columns}')
        return 1
    else:
        print('column name and column length validation failed')

    return 0

In [19]:
val_data_col()

column name and column length validation passed
Following File columns are not in the YAML file []
Following YAML columns are not in the file uploaded []


  dfd.columns=dfd.columns.str.replace('[?]', '')
  dfd.columns=dfd.columns.str.replace('[ ]', '_')


1

Check dataframe

In [20]:
dfd.dtypes

summons_number                         int64
plate_id                              object
registration_state                    object
plate_type                            object
issue_date                            object
violation_code                         int64
vehicle_body_type                     object
vehicle_make                          object
issuing_agency                        object
street_code1                           int64
street_code2                           int64
street_code3                           int64
vehicle_expiration_date               object
violation_location                     int64
violation_precinct                     int64
issuer_precinct                        int64
issuer_code                            int64
issuer_command                        object
issuer_squad                          object
violation_time                        object
time_first_observed                  float64
violation_county                      object
violation_