## Dataset Used 
https://www.kaggle.com/new-york-city/nyc-parking-tickets?select=Parking_Violations_Issued_-_Fiscal_Year_2015.csv

### Write YAML file

In [None]:
%%writefile file.yaml
# Ingestion settings read by the notebook through testutility.config_file().
file_type: csv
dataset_name: Parking_Violations_2015
# Input path is built as <file_name>.<file_type>.
file_name: Parking_Violations_2015
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
# Header names the input file must contain. They are compared after
# normalization (punctuation removed, trimmed, lower-cased), so case and
# surrounding whitespace do not matter.
columns:
    - Summons Number
    - Plate ID
    - Registration State
    - Plate Type
    - Issue Date
    - Violation Code
    - Vehicle Body Type
    - Vehicle Make
    - Issuing Agency
    - Street Code1
    - Street Code2
    - Street Code3
    - Vehicle Expiration Date
    - Violation Location
    - Violation Precinct
    - Issuer Precinct
    - Issuer Code
    - Issuer Command
    - Issuer Squad
    - Violation Time
    - Time First Observed
    - Violation County
    - Violation In Front Of Or Opposite
    - House Number
    - Street Name
    - Intersecting Street
    - Date First Observed
    - Law Section
    - Sub Division
    - Violation Legal Code
    - Days Parking In Effect
    - From Hours In Effect
    - To Hours In Effect
    - Vehicle Color
    - Unregistered Vehicle
    - Vehicle Year
    - Meter Number
    - Feet From Curb
    - Violation Post Code
    - Violation Description
    - No Standing or Stopping Violation
    - Hydrant Violation
    - Double Parking Violation
    - Latitude
    - Longitude
    - Community Board
    - Community Council
    - Census Tract
    - BIN
    - BBL
    - NTA
# Columns removed before the output file is written. Each name appears once
# (the original list repeated "Meter Number").
drop_columns:
    - No Standing or Stopping Violation
    - Hydrant Violation
    - Double Parking Violation
    - Latitude
    - Longitude
    - Community Board
    - Community Council
    - Census Tract
    - BIN
    - BBL
    - NTA
    - Violation Legal Code
    - Time First Observed
    - Unregistered Vehicle
    - Meter Number
    - Violation County
    - Violation In Front Of Or Opposite
    - House Number
    - Intersecting Street
    - Days Parking In Effect
    - From Hours In Effect
    - To Hours In Effect
    - Violation Post Code

# Explicit dtypes for the dask reader. Kept as a list of single-key mappings
# because the notebook flattens it into one dict before calling read_csv.
dtypes:
    - Feet From Curb: float64
    - Law Section: float64
    - Vehicle Year: float64
    - Violation Legal Code: object
    - Double Parking Violation: object
    - Hydrant Violation: object
    - No Standing or Stopping Violation: object
drop_null: True
# Output is written to <output_file_name>.csv.gz.
output_file_name: final

In [None]:
%%writefile testutility.py
import pandas as pd
import yaml
    
def array_clean(array):
    """Return normalized copies of the labels in ``array``.

    For each label, every character that is neither alphanumeric nor a
    space is removed; the result is then stripped of surrounding
    whitespace and lower-cased. The input is not modified and a new list
    is returned in the same order.
    """
    cleaned = []
    for label in array:
        kept = [ch for ch in label if ch == ' ' or ch.isalnum()]
        cleaned.append(''.join(kept).strip().lower())
    return cleaned

def config_file(path):
    """Parse the YAML file at ``path`` and return the loaded object.

    The file is opened in text mode for reading and parsed with
    ``yaml.safe_load``; the handle is closed before returning.
    """
    with open(path,'r') as handle:
        settings = yaml.safe_load(handle)
    return settings

def column_validation(df,expected):
    """Check that the columns of ``df`` match the ``expected`` names.

    Both sides are normalized with ``array_clean`` before comparison, so
    the check ignores case, punctuation and surrounding whitespace.
    Column order is not compared.

    On success the normalized names are assigned to ``df.columns`` (the
    frame is modified in place) and 1 is returned. On failure the
    offending names are printed, ``df`` is left untouched and 0 is
    returned.
    """
    actual = array_clean(df.columns)
    wanted = array_clean(expected)

    # Names found in the data that the schema does not list.
    extras = [name for name in actual if name not in wanted]
    if extras:
        print('Columns Not Present in Schema',extras)

    # Names the schema lists that the data does not have.
    absent = [name for name in wanted if name not in actual]
    if absent:
        print('Missing Columns',absent)

    if extras or absent:
        return 0

    df.columns = actual
    print('Sucessfully validated column Names')
    return 1
    
def drop_columns(df,col_names):
    """Return ``df`` without the columns named in ``col_names``.

    The names are normalized with ``array_clean`` first, so they are
    matched against normalized column labels. The result of
    ``df.drop(..., axis=1)`` is returned.
    """
    targets = array_clean(col_names)
    return df.drop(targets,axis=1)

### Pandas Read

In [None]:
import testutility as utils
import pandas as pd
import os
import time

# Pandas pipeline: load the YAML settings, read the CSV, validate its header,
# drop the configured columns and (optionally) rows with nulls, write a
# gzip-compressed delimited file, then write a small summary text file.
start = time.time()
att = utils.config_file("file.yaml")

file_name = att['file_name']
extension = att['file_type']

print('Reading file .....')
r_start = time.time()
df = pd.read_csv(file_name+'.'+extension, delimiter = att['inbound_delimiter'])
r_end = time.time()
# Elapsed read time; the original referenced the undefined name e_start.
print('File Read sucessfully, time taken:',(r_end-r_start))


print('Validating columns.....')
if utils.column_validation(df,att['columns']):
    print("Validation Sucessfull")
else:
    print('Validation failed , process stopped')
    # Actually stop: on failure the column names are not normalized, so the
    # drop step below would fail on names it cannot find.
    raise ValueError('Column validation failed for ' + file_name+'.'+extension)

# Config key is 'drop_columns'; a missing or empty list means nothing to drop.
if att.get('drop_columns'):
    print('Deleting specified columns....')
    df = utils.drop_columns(df,att['drop_columns'])

if att.get('drop_null'):
    print('Dropping NULL Rows....')
    df = df.dropna()


output_path = att['output_file_name']+'.csv.gz'

c_start = time.time()
print('Compressing the dataset.....')
# Use the configured outbound delimiter, falling back to the previous
# hard-coded '|' when the key is absent.
df.to_csv(output_path, sep=att.get('outbound_delimiter', '|'), compression='gzip')
c_end = time.time()
print('Sucessfully compressed dataset, time taken:',(c_end-c_start))

print('Writing Summary File....')
no_of_rows = len(df)
no_of_cols = len(df.columns)
file_size = os.path.getsize(output_path)

with open('summary.txt', 'w') as f:
    f.write('Number of rows:'+ str(no_of_rows))
    f.write('\n')
    f.write('Number of columns:'+ str(no_of_cols))
    f.write('\n')
    f.write('File Size:'+ str(file_size))
print('Sucessfully written summary file')

# Total wall-clock time for the whole cell, in seconds.
end = time.time()
print(end-start)

## Read Using Dask

In [None]:
import os
import time

import dask.dataframe as dd
import testutility as utils

# The original cell imported the module as `util` but called `utils`; keep the
# old alias bound as well in case other cells rely on it.
util = utils

# Dask pipeline: same steps as the pandas cell, but the CSV is read through
# dask.dataframe and only materialized into pandas by compute().
start = time.time()
att = utils.config_file("file.yaml")

file_name = att['file_name']
extension = att['file_type']
# The config stores dtypes as a list of single-key mappings; flatten them into
# one {column: dtype} dict for read_csv.
dt = {column: dtype for entry in att['dtypes'] for column, dtype in entry.items()}

print('Reading file .....')
r_start = time.time()
df = dd.read_csv(file_name+'.'+ extension, sep=att['inbound_delimiter'], dtype = dt)
r_end = time.time()
# Elapsed time of the read_csv call; the original referenced the undefined
# name e_start. The data itself is loaded later, by compute().
print('File Read sucessfully, time taken:',(r_end-r_start))

print('Validating columns.....')
if utils.column_validation(df,att['columns']):
    print("Validation Sucessfull")
else:
    print('Validation failed , process stopped')
    # Actually stop: on failure the column names are not normalized, so the
    # drop step below would fail on names it cannot find.
    raise ValueError('Column validation failed for ' + file_name+'.'+extension)


# Config key is 'drop_columns'; a missing or empty list means nothing to drop.
if att.get('drop_columns'):
    print('Deleting specified columns....')
    df = utils.drop_columns(df,att['drop_columns'])

# Materialize the dask dataframe; the steps below operate on the result.
df = df.compute()

if att.get('drop_null'):
    print('Dropping NULL Rows....')
    df = df.dropna()


output_path = att['output_file_name']+'.csv.gz'

c_start = time.time()
print('Compressing the dataset.....')
# Use the configured outbound delimiter, falling back to the previous
# hard-coded '|' when the key is absent.
df.to_csv(output_path, sep=att.get('outbound_delimiter', '|'), compression='gzip')
c_end = time.time()
print('Sucessfully compressed dataset, time taken:',(c_end-c_start))

print('Writing Summary File....')
no_of_rows = len(df)
no_of_cols = len(df.columns)
file_size = os.path.getsize(output_path)

with open('summary.txt', 'w') as f:
    f.write('Number of rows:'+ str(no_of_rows))
    f.write('\n')
    f.write('Number of columns:'+ str(no_of_cols))
    f.write('\n')
    f.write('File Size:'+ str(file_size))
print('Sucessfully written summary file')

# Total wall-clock time for the whole cell, in seconds.
end = time.time()
print(end-start)