In [1]:
import os
import time
import gc
import pandas as pd
import dask.dataframe as dd
import csv
import gzip


### Read the file with Pandas

In [2]:
start = time.time()

pandas_df = pd.read_csv('Parking_Violations_Issued_-_Fiscal_Year_2017.csv')

finish = time.time()

diff_min, diff_sec = divmod(finish - start, 60)
diff_sec, diff_msec = divmod(diff_sec, 1)
diff_msec *= 1000

output_str = "The time taken for reading the file with Pandas: {:02d} min {:02d} sec {:.0f} ms".format(int(diff_min), int(diff_sec), diff_msec)

print(output_str)
gc.collect()

The time taken for reading the file with Pandas: 00 min 05 sec 329 ms


20

### Read the file with Dask

In [3]:
start = time.time()

dask_df = dd.read_csv('Parking_Violations_Issued_-_Fiscal_Year_2017.csv')

finish = time.time()

diff_min, diff_sec = divmod(finish - start, 60)
diff_sec, diff_msec = divmod(diff_sec, 1)
diff_msec *= 1000

output_str = "The time taken for reading the file with Dask: {:02d} min {:02d} sec {:.0f} ms".format(int(diff_min), int(diff_sec), diff_msec)

print(output_str)
gc.collect()

The time taken for reading the file with Dask: 00 min 00 sec 18 ms


0

In [4]:
dtype = {
    'House Number': 'object',
    'Time First Observed': 'object',
}

# Read the CSV file with specified data types
dask_df = dd.read_csv('Parking_Violations_Issued_-_Fiscal_Year_2017.csv', dtype=dtype)

# Display the first few rows
dask_df.head()

Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
0,5092469481,GZH7067,NY,PAS,7/10/2016,7,SUBN,TOYOT,V,0,...,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
1,5092451658,GZH7067,NY,PAS,7/8/2016,7,SUBN,TOYOT,V,0,...,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
2,4006265037,FZX9232,NY,PAS,8/23/2016,5,SUBN,FORD,V,0,...,BK,,2004,,0,,BUS LANE VIOLATION,,,
3,8478629828,66623ME,NY,COM,6/14/2017,47,REFG,MITSU,T,10610,...,WH,,2007,,0,4,47-Double PKG-Midtown,,,
4,7868300310,37033JV,NY,COM,11/21/2016,69,DELV,INTER,T,10510,...,WHITE,,2007,,0,31 6,69-Failure to Disp Muni Recpt,,,


In [5]:
dask_df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 43 entries, Summons Number to Double Parking Violation
dtypes: object(25), float64(5), int64(13)

In [6]:
start = time.time()

# remove special character
dask_df.columns = dask_df.columns.str.replace('[#,@,&,.,1]','')

# removing whitespaces
dask_df.columns = dask_df.columns.str.replace(' ', '')

finish = time.time()
diff_min, diff_sec = divmod(finish - start, 60)
diff_sec, diff_msec = divmod(diff_sec, 1)
diff_msec *= 1000

output_str = "Performing simple string removal from the columns: {:02d} min {:02d} sec {:.0f} ms".format(int(diff_min), int(diff_sec), diff_msec)

print(output_str)

Performing simple string removal from the columns: 00 min 00 sec 11 ms


  dask_df.columns = dask_df.columns.str.replace('[#,@,&,.,1]','')


### Validation with YAML

In [7]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py


### Write YAML file

In [8]:
dask_df.head()

Unnamed: 0,SummonsNumber,PlateID,RegistrationState,PlateType,IssueDate,ViolationCode,VehicleBodyType,VehicleMake,IssuingAgency,StreetCode,...,VehicleColor,UnregisteredVehicle?,VehicleYear,MeterNumber,FeetFromCurb,ViolationPostCode,ViolationDescription,NoStandingorStoppingViolation,HydrantViolation,DoubleParkingViolation
0,5092469481,GZH7067,NY,PAS,7/10/2016,7,SUBN,TOYOT,V,0,...,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
1,5092451658,GZH7067,NY,PAS,7/8/2016,7,SUBN,TOYOT,V,0,...,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
2,4006265037,FZX9232,NY,PAS,8/23/2016,5,SUBN,FORD,V,0,...,BK,,2004,,0,,BUS LANE VIOLATION,,,
3,8478629828,66623ME,NY,COM,6/14/2017,47,REFG,MITSU,T,10610,...,WH,,2007,,0,4,47-Double PKG-Midtown,,,
4,7868300310,37033JV,NY,COM,11/21/2016,69,DELV,INTER,T,10510,...,WHITE,,2007,,0,31 6,69-Failure to Disp Muni Recpt,,,


In [9]:
dask_df.columns

Index(['SummonsNumber', 'PlateID', 'RegistrationState', 'PlateType',
       'IssueDate', 'ViolationCode', 'VehicleBodyType', 'VehicleMake',
       'IssuingAgency', 'StreetCode', 'StreetCode2', 'StreetCode3',
       'VehicleExpirationDate', 'ViolationLocation', 'ViolationPrecinct',
       'IssuerPrecinct', 'IssuerCode', 'IssuerCommand', 'IssuerSquad',
       'ViolationTime', 'TimeFirstObserved', 'ViolationCounty',
       'ViolationInFrontOfOrOpposite', 'HouseNumber', 'StreetName',
       'IntersectingStreet', 'DateFirstObserved', 'LawSection', 'SubDivision',
       'ViolationLegalCode', 'DaysParkingInEffect', 'FromHoursInEffect',
       'ToHoursInEffect', 'VehicleColor', 'UnregisteredVehicle?',
       'VehicleYear', 'MeterNumber', 'FeetFromCurb', 'ViolationPostCode',
       'ViolationDescription', 'NoStandingorStoppingViolation',
       'HydrantViolation', 'DoubleParkingViolation'],
      dtype='object')

In [10]:
%%writefile file.yaml
file_type: csv
dataset_name: testfile
file_name: Parking_Violations_Issued_-_Fiscal_Year_2017
table_name: Parking_Violations_Issued
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - SummonsNumber
    - PlateID
    - RegistrationState
    - PlateType
    - IssueDate
    - ViolationCode
    - VehicleBodyType
    - VehicleMake
    - IssuingAgency
    - StreetCode
    - StreetCode2
    - StreetCode3
    - VehicleExpirationDate
    - ViolationLocation
    - ViolationPrecinct
    - IssuerPrecinct
    - IssuerCode
    - IssuerCommand
    - IssuerSquad
    - ViolationTime
    - TimeFirstObserved
    - ViolationCounty
    - ViolationInFrontOfOrOpposite
    - HouseNumber
    - StreetName
    - IntersectingStreet
    - DateFirstObserved
    - LawSection
    - SubDivision
    - ViolationLegalCode
    - DaysParkingInEffect
    - FromHoursInEffect
    - ToHoursInEffect
    - VehicleColor
    - UnregisteredVehicle?
    - VehicleYear
    - MeterNumber
    - FeetFromCurb
    - ViolationPostCode
    - ViolationDescription
    - NoStandingorStoppingViolation
    - HydrantViolation
    - DoubleParkingViolation

Overwriting file.yaml


In [11]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [12]:
config_data['inbound_delimiter']

','

In [13]:
#inspecting data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': 'Parking_Violations_Issued_-_Fiscal_Year_2017',
 'table_name': 'Parking_Violations_Issued',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['SummonsNumber',
  'PlateID',
  'RegistrationState',
  'PlateType',
  'IssueDate',
  'ViolationCode',
  'VehicleBodyType',
  'VehicleMake',
  'IssuingAgency',
  'StreetCode',
  'StreetCode2',
  'StreetCode3',
  'VehicleExpirationDate',
  'ViolationLocation',
  'ViolationPrecinct',
  'IssuerPrecinct',
  'IssuerCode',
  'IssuerCommand',
  'IssuerSquad',
  'ViolationTime',
  'TimeFirstObserved',
  'ViolationCounty',
  'ViolationInFrontOfOrOpposite',
  'HouseNumber',
  'StreetName',
  'IntersectingStreet',
  'DateFirstObserved',
  'LawSection',
  'SubDivision',
  'ViolationLegalCode',
  'DaysParkingInEffect',
  'FromHoursInEffect',
  'ToHoursInEffect',
  'VehicleColor',
  'UnregisteredVehicle?',
  'VehicleYear',
  'MeterNumber',
  'FeetFromCurb'

In [14]:
# read the file using config file
file_type = config_data['file_type']
source_file = config_data['file_name'] + f'.{file_type}'

In [15]:
del pandas_df, dask_df
gc.collect()

19

In [16]:
#print("",source_file)
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
df.head()

  df = pd.read_csv(source_file,config_data['inbound_delimiter'])


Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Vehicle Color,Unregistered Vehicle?,Vehicle Year,Meter Number,Feet From Curb,Violation Post Code,Violation Description,No Standing or Stopping Violation,Hydrant Violation,Double Parking Violation
0,5092469481,GZH7067,NY,PAS,7/10/2016,7,SUBN,TOYOT,V,0,...,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
1,5092451658,GZH7067,NY,PAS,7/8/2016,7,SUBN,TOYOT,V,0,...,GY,,2001,,0,,FAILURE TO STOP AT RED LIGHT,,,
2,4006265037,FZX9232,NY,PAS,8/23/2016,5,SUBN,FORD,V,0,...,BK,,2004,,0,,BUS LANE VIOLATION,,,
3,8478629828,66623ME,NY,COM,6/14/2017,47,REFG,MITSU,T,10610,...,WH,,2007,,0,4,47-Double PKG-Midtown,,,
4,7868300310,37033JV,NY,COM,11/21/2016,69,DELV,INTER,T,10510,...,WHITE,,2007,,0,31 6,69-Failure to Disp Muni Recpt,,,


In [17]:
#validate the header of the file
util.col_header_val(df,config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['from_hours_in_effect', 'issuer_precinct', 'street_code2', 'street_name', 'summons_number', 'plate_id', 'no_standing_or_stopping_violation', 'issue_date', 'violation_precinct', 'days_parking_in_effect', 'law_section', 'vehicle_body_type', 'date_first_observed', 'house_number', 'intersecting_street', 'sub_division', 'vehicle_year', 'time_first_observed', 'unregistered_vehicle', 'violation_legal_code', 'meter_number', 'double_parking_violation', 'violation_county', 'vehicle_color', 'feet_from_curb', 'issuer_squad', 'hydrant_violation', 'violation_in_front_of_or_opposite', 'violation_description', 'violation_code', 'registration_state', 'to_hours_in_effect', 'issuer_command', 'issuer_code', 'vehicle_expiration_date', 'street_code1', 'violation_location', 'vehicle_make', 'violation_time', 'issuing_agency', 'plate_type', 'violation_post_code', 'street_code3']
Following YAML columns are not in th

0

In [18]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['summons_number', 'plate_id', 'registration_state', 'plate_type',
       'issue_date', 'violation_code', 'vehicle_body_type', 'vehicle_make',
       'issuing_agency', 'street_code1', 'street_code2', 'street_code3',
       'vehicle_expiration_date', 'violation_location', 'violation_precinct',
       'issuer_precinct', 'issuer_code', 'issuer_command', 'issuer_squad',
       'violation_time', 'time_first_observed', 'violation_county',
       'violation_in_front_of_or_opposite', 'house_number', 'street_name',
       'intersecting_street', 'date_first_observed', 'law_section',
       'sub_division', 'violation_legal_code', 'days_parking_in_effect',
       'from_hours_in_effect', 'to_hours_in_effect', 'vehicle_color',
       'unregistered_vehicle', 'vehicle_year', 'meter_number',
       'feet_from_curb', 'violation_post_code', 'violation_description',
       'no_standing_or_stopping_violation', 'hydrant_violation',
       'double_parking_violation'],
      dtype=

In [19]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
else:
    print("col validation passed")
    df_clean = df.fillna(0)
    df_transformed = df_clean.apply(lambda x: x**2)
    df_transformed.to_csv('Parking_Violations_Issued_-_Fiscal_Year_2017.csv')

column name and column length validation failed
Following File columns are not in the YAML file ['from_hours_in_effect', 'issuer_precinct', 'street_code2', 'street_name', 'summons_number', 'plate_id', 'no_standing_or_stopping_violation', 'issue_date', 'violation_precinct', 'days_parking_in_effect', 'law_section', 'vehicle_body_type', 'date_first_observed', 'house_number', 'intersecting_street', 'sub_division', 'vehicle_year', 'time_first_observed', 'unregistered_vehicle', 'violation_legal_code', 'meter_number', 'double_parking_violation', 'violation_county', 'vehicle_color', 'feet_from_curb', 'issuer_squad', 'hydrant_violation', 'violation_in_front_of_or_opposite', 'violation_description', 'violation_code', 'registration_state', 'to_hours_in_effect', 'issuer_command', 'issuer_code', 'vehicle_expiration_date', 'street_code1', 'violation_location', 'vehicle_make', 'violation_time', 'issuing_agency', 'plate_type', 'violation_post_code', 'street_code3']
Following YAML columns are not in th

In [20]:
# Define data types for problematic columns
dtype = {
    'House Number': 'object',
    'Time First Observed': 'object',
}

# Read the CSV file with specified data types
df = dd.read_csv('Parking_Violations_Issued_-_Fiscal_Year_2017.csv', dtype=dtype)

# Write csv in gz format in pipe separated text file (|)
df.to_csv('Parking_Violations_Issued_-_Fiscal_Year_2017.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          line_terminator='\n')

  df.to_csv(f, **kwargs)


['d:\\Data Glacier\\Week 6\\Parking_Violations_Issued_-_Fiscal_Year_2017.csv.gz\\0.part',
 'd:\\Data Glacier\\Week 6\\Parking_Violations_Issued_-_Fiscal_Year_2017.csv.gz\\1.part',
 'd:\\Data Glacier\\Week 6\\Parking_Violations_Issued_-_Fiscal_Year_2017.csv.gz\\2.part']

In [21]:
# Get file summary
file_size = os.path.getsize('Parking_Violations_Issued_-_Fiscal_Year_2017.csv.gz')
num_rows = len(df)
num_cols = len(df.columns)

# Print file summary
print("File summary:")
print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_cols}")
print(f"File size: {file_size} bytes")

File summary:
Number of rows: 1048575
Number of columns: 43
File size: 0 bytes
