In [1]:
import os
import time
import warnings
warnings.filterwarnings("ignore")

In [2]:
# file size
os.path.getsize("Parking_Violations_Issued_-_Fiscal_Year_2016.csv")

2151937808

#### Read with Dask

In [3]:
from dask import dataframe as dd
start = time.time()
dask_df = dd.read_csv("Parking_Violations_Issued_-_Fiscal_Year_2016.csv")
end = time.time()
print("Read time with dask: ", (end-start), "seconds")

Read time with dask:  0.011098861694335938 seconds


#### Read with Pandas

In [4]:
import pandas as pd
start = time.time()
df = pd.read_csv("Parking_Violations_Issued_-_Fiscal_Year_2016.csv")
end = time.time()
print("Read time with pandas: ", (end-start), "seconds")

Read time with pandas:  78.7598328590393 seconds


Dask is better than Pandas with the least reading time of less than a second

In [5]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10626899 entries, 0 to 10626898
Data columns (total 51 columns):
 #   Column                             Dtype  
---  ------                             -----  
 0   Summons Number                     int64  
 1   Plate ID                           object 
 2   Registration State                 object 
 3   Plate Type                         object 
 4   Issue Date                         object 
 5   Violation Code                     int64  
 6   Vehicle Body Type                  object 
 7   Vehicle Make                       object 
 8   Issuing Agency                     object 
 9   Street Code1                       int64  
 10  Street Code2                       int64  
 11  Street Code3                       int64  
 12  Vehicle Expiration Date            float64
 13  Violation Location                 float64
 14  Violation Precinct                 float64
 15  Issuer Precinct                    float64
 16  Issuer Code     

In [6]:
# number of rows
len(df.index)

10626899

In [7]:
# number of columns
len(df.columns)

51

In [8]:
from dask import dataframe as dd
df = dd.read_csv("Parking_Violations_Issued_-_Fiscal_Year_2016.csv")

In [9]:
# remove special characters from column names
df.columns=df.columns.str.replace('[#,@,&,?]','')

# remove white space from column names
df.columns = df.columns.str.replace(' ', '')

In [10]:
# check column names
df.columns

Index(['SummonsNumber', 'PlateID', 'RegistrationState', 'PlateType',
       'IssueDate', 'ViolationCode', 'VehicleBodyType', 'VehicleMake',
       'IssuingAgency', 'StreetCode1', 'StreetCode2', 'StreetCode3',
       'VehicleExpirationDate', 'ViolationLocation', 'ViolationPrecinct',
       'IssuerPrecinct', 'IssuerCode', 'IssuerCommand', 'IssuerSquad',
       'ViolationTime', 'TimeFirstObserved', 'ViolationCounty',
       'ViolationInFrontOfOrOpposite', 'HouseNumber', 'StreetName',
       'IntersectingStreet', 'DateFirstObserved', 'LawSection', 'SubDivision',
       'ViolationLegalCode', 'DaysParkingInEffect', 'FromHoursInEffect',
       'ToHoursInEffect', 'VehicleColor', 'UnregisteredVehicle', 'VehicleYear',
       'MeterNumber', 'FeetFromCurb', 'ViolationPostCode',
       'ViolationDescription', 'NoStandingorStoppingViolation',
       'HydrantViolation', 'DoubleParkingViolation', 'Latitude', 'Longitude',
       'CommunityBoard', 'CommunityCouncil', 'CensusTract', 'BIN', 'BBL',
       

#### Write utility file

In [11]:
%%writefile testutility.py
import logging
import subprocess
import yaml
import datetime 
import gc
import re



def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df, table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py


#### Write YAML file 

In [12]:
%%writefile store.yaml
file_type: csv
dataset_name: file
file_name: Parking_Violations_Issued_-_Fiscal_Year_2016
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - SummonsNumber
      - PlateID
      - RegistrationState
      - PlateType
      - IssueDate
      - ViolationCode
      - VehicleBodyType
      - VehicleMake
      - IssuingAgency
      - StreetCode1
      - StreetCode2
      - StreetCode3
      - VehicleExpirationDate
      - ViolationLocation
      - ViolationPrecinct
      - ViolationInFrontOfOrOpposite
      - HouseNumber
      - StreetName
      - IntersectingStreet
      - DateFirstObserved
      - LawSection
      - SubDivision
      - ViolationLegalCode
      - DaysParkingInEffect
      - FromHoursInEffect
      - ToHoursInEffect
      - VehicleColor
      - UnregisteredVehicle
      - VehicleYear
      - MeterNumber
      - FeetFromCurb
      - ViolationPostCode
      - ViolationDescription
      - NoStandingorStoppingViolation
      - HydrantViolation
      - DoubleParkingViolation
      - Latitude
      - Longitude
      - CommunityBoard
      - CommunityCouncil
      - CensusTract
      - BIN
      - BBL
      - NTA

Overwriting store.yaml


In [13]:
# read config file
import testutility as util
config_data = util.read_config_file("store.yaml")

In [14]:
# data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'file',
 'file_name': 'Parking_Violations_Issued_-_Fiscal_Year_2016',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['SummonsNumber - PlateID - RegistrationState - PlateType - IssueDate - ViolationCode - VehicleBodyType - VehicleMake - IssuingAgency - StreetCode1 - StreetCode2 - StreetCode3 - VehicleExpirationDate - ViolationLocation - ViolationPrecinct - ViolationInFrontOfOrOpposite - HouseNumber - StreetName - IntersectingStreet - DateFirstObserved - LawSection - SubDivision - ViolationLegalCode - DaysParkingInEffect - FromHoursInEffect - ToHoursInEffect - VehicleColor - UnregisteredVehicle - VehicleYear - MeterNumber - FeetFromCurb - ViolationPostCode - ViolationDescription - NoStandingorStoppingViolation - HydrantViolation - DoubleParkingViolation - Latitude - Longitude - CommunityBoard - CommunityCouncil - CensusTract - BIN - BBL - NTA']}

In [15]:
# read file using config file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'

In [16]:
df = pd.read_csv(source_file, config_data['inbound_delimiter'])
df.head()

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Hydrant Violation,Double Parking Violation,Latitude,Longitude,Community Board,Community Council,Census Tract,BIN,BBL,NTA
0,1363745270,GGY6450,99,PAS,07/09/2015,46,SDN,HONDA,P,0,...,,,,,,,,,,
1,1363745293,KXD355,SC,PAS,07/09/2015,21,SUBN,CHEVR,P,55730,...,,,,,,,,,,
2,1363745438,JCK7576,PA,PAS,07/09/2015,21,SDN,ME/BE,P,42730,...,,,,,,,,,,
3,1363745475,GYK7658,NY,OMS,07/09/2015,21,SUBN,NISSA,P,58130,...,,,,,,,,,,
4,1363745487,GMT8141,NY,PAS,07/09/2015,21,P-U,LINCO,P,58130,...,,,,,,,,,,


In [17]:
# validate header of the file
util.col_header_val(df, config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['violation_county', 'days_parking_in_effect', 'intersecting_street', 'violation_post_code', 'nta', 'census_tract', 'date_first_observed', 'feet_from_curb', 'to_hours_in_effect', 'meter_number', 'vehicle_expiration_date', 'bbl', 'violation_code', 'from_hours_in_effect', 'issuer_code', 'vehicle_make', 'time_first_observed', 'latitude', 'street_code2', 'vehicle_body_type', 'violation_time', 'vehicle_color', 'street_name', 'unregistered_vehicle', 'bin', 'plate_id', 'house_number', 'issuing_agency', 'registration_state', 'violation_description', 'violation_location', 'law_section', 'issuer_precinct', 'plate_type', 'sub_division', 'community_council', 'double_parking_violation', 'summons_number', 'issuer_command', 'violation_precinct', 'violation_in_front_of_or_opposite', 'violation_legal_code', 'longitude', 'hydrant_violation', 'issuer_squad', 'street_code1', 'community_board', 'issue_date', 'no

0

In [18]:
print("columns of files are:", df.columns)
print("columns of YAML are:", config_data['columns'])

columns of files are: Index(['summons_number', 'plate_id', 'registration_state', 'plate_type',
       'issue_date', 'violation_code', 'vehicle_body_type', 'vehicle_make',
       'issuing_agency', 'street_code1', 'street_code2', 'street_code3',
       'vehicle_expiration_date', 'violation_location', 'violation_precinct',
       'issuer_precinct', 'issuer_code', 'issuer_command', 'issuer_squad',
       'violation_time', 'time_first_observed', 'violation_county',
       'violation_in_front_of_or_opposite', 'house_number', 'street_name',
       'intersecting_street', 'date_first_observed', 'law_section',
       'sub_division', 'violation_legal_code', 'days_parking_in_effect',
       'from_hours_in_effect', 'to_hours_in_effect', 'vehicle_color',
       'unregistered_vehicle', 'vehicle_year', 'meter_number',
       'feet_from_curb', 'violation_post_code', 'violation_description',
       'no_standing_or_stopping_violation', 'hydrant_violation',
       'double_parking_violation', 'latitude', '

In [19]:
if util.col_header_val(df, config_data)==0:
    print("validation failed")
else:
    print("col validation passed")

column name and column length validation failed
Following File columns are not in the YAML file ['violation_county', 'days_parking_in_effect', 'intersecting_street', 'violation_post_code', 'nta', 'census_tract', 'date_first_observed', 'feet_from_curb', 'to_hours_in_effect', 'meter_number', 'vehicle_expiration_date', 'bbl', 'violation_code', 'from_hours_in_effect', 'issuer_code', 'vehicle_make', 'time_first_observed', 'latitude', 'street_code2', 'vehicle_body_type', 'violation_time', 'vehicle_color', 'street_name', 'unregistered_vehicle', 'bin', 'plate_id', 'house_number', 'issuing_agency', 'registration_state', 'violation_description', 'violation_location', 'law_section', 'issuer_precinct', 'plate_type', 'sub_division', 'community_council', 'double_parking_violation', 'summons_number', 'issuer_command', 'violation_precinct', 'violation_in_front_of_or_opposite', 'violation_legal_code', 'longitude', 'hydrant_violation', 'issuer_squad', 'street_code1', 'community_board', 'issue_date', 'no

In [20]:
import csv
import gzip

df = dd.read_csv(
    "Parking_Violations_Issued_-_Fiscal_Year_2016.csv",
    dtype={'House Number': 'object',
       'Issuer Command': 'object',
       'Issuer Squad': 'object',
       'Time First Observed': 'object',
       'Unregistered Vehicle?': 'object',
       'Violation Description': 'object',
       'Violation Legal Code': 'object',
       'Violation Location': 'object',
       'Violation Post Code': 'object',
       'Intersecting Street': 'object',
       'Date First Observed': 'object',
       'Feet From Curb': 'object',
       'Issuer Code': 'object',
       'Issuer Precinct': 'object',
       'Law Section': 'object',
       'Vehicle Expiration Date': 'object',
       'Vehicle Year': 'object',
       'Violation Precinct': 'object'})

# write csv in gz format in pipe separated text file (|)
df.to_csv("Parking_Violations_Issued_-_Fiscal_Year_2016.csv.gz",
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          line_terminator='\n')

['/Users/untitled/Desktop/Week 6/Parking_Violations_Issued_-_Fiscal_Year_2016.csv.gz/00.part',
 '/Users/untitled/Desktop/Week 6/Parking_Violations_Issued_-_Fiscal_Year_2016.csv.gz/01.part',
 '/Users/untitled/Desktop/Week 6/Parking_Violations_Issued_-_Fiscal_Year_2016.csv.gz/02.part',
 '/Users/untitled/Desktop/Week 6/Parking_Violations_Issued_-_Fiscal_Year_2016.csv.gz/03.part',
 '/Users/untitled/Desktop/Week 6/Parking_Violations_Issued_-_Fiscal_Year_2016.csv.gz/04.part',
 '/Users/untitled/Desktop/Week 6/Parking_Violations_Issued_-_Fiscal_Year_2016.csv.gz/05.part',
 '/Users/untitled/Desktop/Week 6/Parking_Violations_Issued_-_Fiscal_Year_2016.csv.gz/06.part',
 '/Users/untitled/Desktop/Week 6/Parking_Violations_Issued_-_Fiscal_Year_2016.csv.gz/07.part',
 '/Users/untitled/Desktop/Week 6/Parking_Violations_Issued_-_Fiscal_Year_2016.csv.gz/08.part',
 '/Users/untitled/Desktop/Week 6/Parking_Violations_Issued_-_Fiscal_Year_2016.csv.gz/09.part',
 '/Users/untitled/Desktop/Week 6/Parking_Violation

In [21]:
# size of the gz format folder
os.path.getsize("Parking_Violations_Issued_-_Fiscal_Year_2016.csv.gz")

1120