In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
from pyspark.sql.functions import when, isnull

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier

In [2]:
#Create SparkSession",
if 'spark' not in globals():
    spark = SparkSession.builder.appName('nycparking').getOrCreate()
else:
    print(f'Spark {spark.version} is already initalized.')

23/02/05 23:36:07 WARN Utils: Your hostname, DESKTOP-K4JEK0C resolves to a loopback address: 127.0.1.1; using 172.29.234.4 instead (on interface eth0)
23/02/05 23:36:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/05 23:36:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/02/05 23:36:10 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/02/05 23:36:10 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/02/05 23:36:10 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
23/02/05 23:36:10 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.


In [3]:
DATA_DIR = "/mnt/c/Users/raveendra sawkar/Documents/IITH Docs/ML EtE Hackathon/NYC Parking Ticket Problem/archive/"
import os

In [4]:
original_df = spark.read.option("header",True).csv(os.path.join(DATA_DIR, 'Parking_Violations_Issued_-_Fiscal_Year_2015.csv'))
original_df.limit(5).toPandas().head()

23/02/05 23:36:25 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Unnamed: 0,Summons Number,Plate ID,Registration State,Plate Type,Issue Date,Violation Code,Vehicle Body Type,Vehicle Make,Issuing Agency,Street Code1,...,Hydrant Violation,Double Parking Violation,Latitude,Longitude,Community Board,Community Council,Census Tract,BIN,BBL,NTA
0,8002531292,EPC5238,NY,PAS,10/01/2014,21,SUBN,CHEVR,T,20390,...,,,,,,,,,,
1,8015318440,5298MD,NY,COM,03/06/2015,14,VAN,FRUEH,T,27790,...,,,,,,,,,,
2,7611181981,FYW2775,NY,PAS,07/28/2014,46,SUBN,SUBAR,T,8130,...,,,,,,,,,,
3,7445908067,GWE1987,NY,PAS,04/13/2015,19,4DSD,LEXUS,T,59990,...,,,,,,,,,,
4,7037692864,T671196C,NY,PAS,05/19/2015,19,4DSD,CHRYS,T,36090,...,,,,,,,,,,


In [5]:
subdf = original_df.limit(10000)

In [6]:
pddf = subdf.toPandas()

In [7]:
pddf.columns = pddf.columns.str.strip().str.replace(' ', '_')

In [8]:
#Replace junk values with NaN (0,-,etc)
pddf = pddf.replace('0', np.nan)
pddf = pddf.replace('-', np.nan)
pddf = pddf.fillna(value=np.nan)
pddf['Registration_State'] = pddf['Registration_State'].replace('99', np.nan)
pddf['Plate_Type'] = pddf['Plate_Type'].replace('99', np.nan)

#type cast date cols to datetime format and replace vals with incorrect date
pddf['Issue_Date'] = pd.to_datetime(pddf['Issue_Date'])
pddf = pddf[ (pddf['Issue_Date'] >= '08/01/2014') & (pddf['Issue_Date'] <= '07/31/2015') ]
pddf['Vehicle_Year'] = pddf['Vehicle_Year'].astype(str).apply(lambda x: x if x<='2015' else np.nan)

In [9]:
pddf.isnull().sum()

Summons_Number                          0
Plate_ID                                0
Registration_State                     22
Plate_Type                              0
Issue_Date                              0
Violation_Code                          0
Vehicle_Body_Type                       9
Vehicle_Make                           29
Issuing_Agency                          0
Street_Code1                          444
Street_Code2                         1404
Street_Code3                         1415
Vehicle_Expiration_Date                 0
Violation_Location                      4
Violation_Precinct                      4
Issuer_Precinct                       196
Issuer_Code                            19
Issuer_Command                          0
Issuer_Squad                            0
Violation_Time                          1
Time_First_Observed                  7823
Violation_County                       56
Violation_In_Front_Of_Or_Opposite      67
House_Number                      

In [10]:
#Learn more about the dataset
pddf.info

<bound method DataFrame.info of      Summons_Number  Plate_ID Registration_State Plate_Type Issue_Date  \
0        8002531292   EPC5238                 NY        PAS 2014-10-01   
1        8015318440    5298MD                 NY        COM 2015-03-06   
3        7445908067   GWE1987                 NY        PAS 2015-04-13   
4        7037692864  T671196C                 NY        PAS 2015-05-19   
5        7704791394   JJF6834                 PA        PAS 2014-11-20   
...             ...       ...                ...        ...        ...   
9994     7969518205    L79DVL                 NJ        PAS 2015-06-09   
9995     7907420775    XBSK23                 NJ        PAS 2015-03-09   
9996     8004422019   24861JY                 NY        COM 2014-10-24   
9998     7059056555   42287MA                 NY        COM 2015-06-08   
9999     8018425530    XBPP36                 NJ        PAS 2015-04-24   

     Violation_Code Vehicle_Body_Type Vehicle_Make Issuing_Agency  \
0         

In [12]:
#Prepare to drop irrelevant/duplicate columns
pddf['Plate_ID'] = np.nan
pddf['Issue_Date'] = np.nan
pddf['Street_Code2'] = np.nan
pddf['Street_Code3'] = np.nan
pddf['Street_Name'] = np.nan
pddf['Violation_Precinct'] = np.nan
pddf['Issuer_Code'] = np.nan
pddf['Issuer_Squad'] = np.nan
pddf['House_Number'] = np.nan
pddf['Vehicle Color'] = np.nan
pddf['Violation_Post_Code'] = np.nan
pddf['Violation_Description'] = np.nan
pddf['Vehicle_Expiration_Date'] = np.nan
pddf['Date_First_Observed'] = np.nan
pddf['Vehicle_Color'] = np.nan

In [13]:
#Drop columns with 75% or more NaN values
perc = 75.0
min_count =  int(((100-perc)/100)*pddf.shape[0] + 1)
pddf = pddf.dropna( axis=1, thresh=min_count)

In [14]:
pddf.drop_duplicates(subset = ['Summons_Number'], inplace = True)

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  pddf.drop_duplicates(subset = ['Summons_Number'], inplace = True)


In [15]:
pddf = pddf[pddf['Violation_Location'].notna()]

In [16]:
maskA_replace = 'A'
maskP_replace = 'P'
maskA = pddf['Violation_Time'].str.contains(maskA_replace)
maskP = pddf['Violation_Time'].str.contains(maskP_replace)
pddf.loc[maskA, 'Violation_Time'] = maskA_replace
pddf.loc[maskP, 'Violation_Time'] = maskP_replace

In [17]:
maskA_replace = 'A'
maskP_replace = 'P'
maskA = pddf['From_Hours_In_Effect'].str[-1]==maskA_replace
maskP = pddf['From_Hours_In_Effect'].str[-1]==maskP_replace
pddf.loc[maskA, 'From_Hours_In_Effect'] = maskA_replace
pddf.loc[maskP, 'From_Hours_In_Effect'] = maskP_replace
maskA = pddf['To_Hours_In_Effect'].str[-1]==maskA_replace
maskP = pddf['To_Hours_In_Effect'].str[-1]==maskP_replace
pddf.loc[maskA, 'To_Hours_In_Effect'] = maskA_replace
pddf.loc[maskP, 'To_Hours_In_Effect'] = maskP_replace

In [18]:
counts = pddf['Days_Parking_In_Effect'].str.count('Y')
np.where(counts==0, 7, counts)
pddf['Days_Parking_In_Effect'] = counts

In [19]:
remap_county_dict = {
    'K' : 'Brooklyn',
    'Q' : 'Queens',
    'NY': 'Manhattan',
    'QN': 'Queens',
    'BK': 'Brooklyn',
    'R' : 'Staten Island',
    'BX': 'Bronx',
    'ST': 'Staten Island',
    'MN': 'Manhattan',
    'KINGS': 'Brooklyn',
    'QNS': 'Queens',
    'BRONX': 'Bronx'
}
pddf['Violation_County'] = pddf['Violation_County'].map(remap_county_dict)

In [20]:
def imputation(col_name):
    pddf[col_name].fillna(pddf[col_name].mode()[0], inplace=True)

In [21]:
#Limit no. of categories for columns and put extra values into 'other' category
def drop_categories(col_name):
    value_counts = pddf[col_name].value_counts()
    threshold = value_counts.sum() * 0.05
    to_merge = value_counts[value_counts <= threshold].index.tolist()
    pddf[col_name] = pddf[col_name].replace(to_merge, 'OTHER')

In [22]:
#Assign values to columns and get dummies
def separate_cols(df_temp, col_name):
    col = df_temp[col_name].unique()
    i=0
    coldict = {}
    for c in col:
        coldict[c] = i
        i=i+1
    df_temp.replace({col_name:coldict},inplace=True)
    df_temp = pd.get_dummies(df_temp, columns=[col_name])
    return df_temp

In [23]:
for col in pddf.columns:
    if pddf[col].isnull().sum() > 0:
        imputation(col)

In [24]:
drop_categories('Plate_Type')
drop_categories('Violation_Code')
drop_categories('Vehicle_Body_Type')
drop_categories('Vehicle_Make')
drop_categories('Sub_Division')
drop_categories('Street_Code1')
drop_categories('Vehicle_Year')
drop_categories('Registration_State')
drop_categories('Issuer_Command')

In [25]:
pddf.isnull().sum()

Summons_Number                       0
Registration_State                   0
Plate_Type                           0
Violation_Code                       0
Vehicle_Body_Type                    0
Vehicle_Make                         0
Issuing_Agency                       0
Street_Code1                         0
Violation_Location                   0
Issuer_Precinct                      0
Issuer_Command                       0
Violation_Time                       0
Violation_County                     0
Violation_In_Front_Of_Or_Opposite    0
Law_Section                          0
Sub_Division                         0
Days_Parking_In_Effect               0
From_Hours_In_Effect                 0
To_Hours_In_Effect                   0
Vehicle_Year                         0
dtype: int64

In [26]:
for col in pddf.columns:
    if col != 'Summons_Number' and col != 'Violation_Location':
        pddf = separate_cols(pddf, col)

In [27]:
pddf.shape

(8334, 204)

In [28]:
pddf['Violation_Location']

0       0007
1       0025
3        102
4       0028
5       0067
        ... 
9994    0047
9995    0018
9996    0019
9998    0014
9999    0009
Name: Violation_Location, Length: 8334, dtype: object

In [29]:
#Split data into train and test sets
train, test = train_test_split(pddf, test_size=0.20, random_state=1)
X_train = train.drop("Violation_Location",axis=1)
X_test = train.drop("Violation_Location",axis=1)
y_train = train["Violation_Location"]
y_test = test["Violation_Location"]
X_train.shape, y_train.shape, X_test.shape

((6667, 203), (6667,), (6667, 203))

In [38]:
#Train with Random Forest
random_forest = RandomForestClassifier(n_estimators=150, criterion='gini', max_depth=12)

random_forest.fit(X_train, y_train)

y_pred = random_forest.predict(X_test)

accuracy = random_forest.score(X_train, y_train)

In [39]:
accuracy

0.9845507724613769