In [1]:
## Spark session libraries
import findspark
findspark.init()
import pyspark
findspark.find()

from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import monotonically_increasing_id

## Required for parsing the file as csv
import csv
from io import StringIO
from itertools import islice, repeat

## For preprocessing
from re import search, split, sub, compile as comp
import numpy as np
from statistics import median

## For Plots
import matplotlib.pyplot as plt
from collections import defaultdict
from matplotlib import cm, colors


## for RF model
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import RandomForestRegressionModel
from pyspark.ml.feature import StringIndexer
from pyspark.sql import types
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.types import Row
from pyspark.ml.evaluation import RegressionEvaluator

from numpy import allclose

import warnings

In [2]:
sc = SparkContext(appName="SDDM", master='local[*,4]')
sc.setLocalProperty("spark.scheduler.pool", "pool1")
#ss = SparkSession.builder.appName('SDDM_2').getOrCreate()
#print(sc.pythonVer)
#print (sc.master)
#sc

Internally threads on PVM and JVM are not synced, and JVM thread can be reused for multiple threads on PVM, which fails to isolate local properties for each thread on PVM. 
To work around this, you can set PYSPARK_PIN_THREAD to true (see SPARK-22340). However, note that it cannot inherit the local properties from the parent thread although it isolates each thread on PVM and JVM with its own local properties. 
To work around this, you should manually copy and set the local properties from the parent thread to the child thread when you create another thread.


In [3]:
ss = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "64g") \
    .appName('my-cool-app') \
    .config("spark.driver.maxResultSize", "32g")\
    .config("spark.executor.memory", "64g")\
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")\
    .getOrCreate()

In [4]:
def parseCSV(csvRow) :
    '''Parses a row into a list of elements '''
    data = StringIO(csvRow)
    dataReader = csv.reader(data, lineterminator = '')
    return(next(dataReader))

def readFileAsCSV(session, filepath):
    '''Reads a files as text file and then parses each row and returns a list of list: 
        [[Row]
         [Row]
         [Row]]
     '''
    try:
        data = session.textFile(name = str(filepath))
        data = data.map(parseCSV)
    except:
        print('Failed to read the file!')
        data = []
    return data

### Ticket data pre processing

In [5]:
def tickTime(x, ind):
    '''Extracts the month and year from the issue date'''
    try:
        m = int(x[ind][0:2])
        y = int(x[ind][6:10])
    except:
        m = '0'
        y = '0'    
    x.append(str(m))
    x.append(str(y))
    return x

def street_preprocess(x, ind):
    '''Preprocess the street names'''
    s = x[ind]
    s = s.replace('AVENUE','AVE').replace('STREET','ST').replace('BLVD','BL')
    s = s.replace('\sEAST\s',' E ').replace('\sWEST\s',' W ').replace('\sNORTH\s',' N ').replace('\sSOUTH\s',' S ')
    s = s.replace('\sROAD\s',' RD ').replace('\sEXPY\s','EXWY').replace('\sPARKWY\s','PKWY').replace('\sISLAND\s','ISL')
    s = s.replace('\sFIRST\s','1').replace('\sSECOND\s','2').replace('\sTHRID\s','3')
    s = s.replace('\sFOURTH\s','4').replace('\sFIVETH\s','5').replace('\sSIXTH\s','6')
    s = s.replace('\sSEVENTH\s','7').replace('\sEIGHTH\s','8').replace('\sNINETH\s','9').replace('\sTENTH\s','10')
    s = s.split()
    result = [x if not search(r'\d', x) else sub('[^0-9]','', x) for x in s]
    result = ' '.join(result)
    x[ind] = result.lower()
    return x

def rState(x, ind):
    if x[ind] == 'NY':
        x.append('0')
    else:
        x.append('1')
    return x

def violationType(x, ind):
    mydict = {"Misc":[35,41,90,91,94],
                        "No Parking":[20,21,23,24,27],
                        "No Standing":[3,4,5,6,8,10,11,12,13,14,15,16,17,18,19,22,25,26,30,31,40,44,54,57,58,63,64,77,78,81,89,92],
                        "Permit/Doc Issue":[1,2,29,70,71,72,73,76,80,83,87,88,93,97],
                        "Plate Issues":[74,75,82],
                        "Obstructing Path":[7,9,36,45,46,47,48,49,50,51,52,53,55,56,59,60,61,62,66,67,68,79,84,96,98],
                        "Overtime":[28,32,33,34,37,38,39,42,43,65,69,85,86]
                        }
    label = ''
    try:
        for key, value in mydict.items():
             for y in value:
                    if y == int(x[ind]):
                        label = key
    except: 
        label = ''
    newLabs = {0:'',
               1:"Misc",
               2:"No Parking",
               3:"No Standing",
               4:"Permit/Doc Issue",
               5:"Plate Issues",
               6:"Obstructing Path",
               7:"Overtime"}
    x.append(str(list(newLabs.keys())[list(newLabs.values()).index(label)]))
    x[ind] = label
    return x
    
def sH(x, ind):
    ''' Extracts the street number and house number from House number column
        Some house numbers are: 123-34, 45-56 and some are 34, 45 etc
    '''
    house_num = x[ind]
    try:
        if house_num == '':
            s = '0'
            h = '0'
        else:
            cond = '-' in house_num
            if cond:
                s, h = house_num.split('-')
            else:
                s = int(house_num)
                h = '0'
    except:
        s = '0'
        h = '0'
    x.append(str(s))
    x.append(str(h))
    return x
    
def preprocessedCSV(session, filepath):
    '''
    Reads the csv files, and then converts Issue date to date and month
    '''
    data = readFileAsCSV(session, filepath)
    header = data.take(1)[0]
    data = data.map(lambda x: [x[0], x[2], x[3], x[4], x[5], x[6], x[7], x[19], x[23], x[24]])
    header = data.take(1)[0]
    #print(header)
    ## Extracting the month and year
    data = data.map(lambda x: tickTime(x, header.index('Issue Date')))
    header.append('Issue Month')
    header.append('Issue Year')
    
    ## Removing the header line
    data = data.mapPartitionsWithIndex(lambda idx, it: islice(it, 1, None) if idx == 0 else it)
    ## Preprocessing the street names
    data = data.map(lambda x: street_preprocess(x, header.index('Street Name')))    
    ## Extracts the street and house number
    data = data.map(lambda x: sH(x, header.index('House Number')))
    header.append('Street')
    header.append('House')
    data = data.map(lambda x: rState(x, header.index('Registration State')))
    header.append('RState')
    data = data.map(lambda x: violationType(x, header.index('Violation Code')))
    header.append('VType')
    return data, header

### Coordinate file processing

In [6]:
def streetHouse(x):
    '''
        Extracts the street and house number ranges for a street name and coordinate
    '''
    house_num = x[0]
    try:
        if house_num == '':
            l_s_min = '0'
            l_s_max = '0'
            r_s_min = '0'
            r_s_max = '0'

            h_l_min = '0'
            h_l_max = '0'
            h_r_min = '0'
            h_r_max = '0'
        else:
            cond = '-' in house_num
            if cond:
                l_s_min, h_l_min = x[0].split('-')
                l_s_max, h_l_max = x[1].split('-')
                r_s_min, h_r_min = x[4].split('-')
                r_s_max, h_r_max = x[5].split('-')
            else:
                l_s_min = int(x[0])
                l_s_max = int(x[1])
                r_s_min = int(x[4])
                r_s_max = int(x[5])
                h_l_min = '0'
                h_l_max = '0'
                h_r_min = '0'
                h_r_max = '0'

    except:
        l_s_min = '0'
        l_s_max = '0'
        r_s_min = '0'
        r_s_max = '0'
        h_l_min = '0'
        h_l_max = '0'
        h_r_min = '0'
        h_r_max = '0'
        
    x.extend([l_s_min, l_s_max, r_s_min, r_s_max, h_l_min, h_l_max, h_r_min, h_r_max])
    return x

def geoms(x, ind):
    '''Extracting one single latitute and longitude values from the geometry '''
    coords = x[ind]
    try: 
        coords = coords.replace('MULTILINESTRING ', '').replace('(','').replace(')', '').split(', ')
        coords = [i.split(' ') for i in coords]
        coords = [[float(j), float(k)] for j,k in coords]
        lon = str(median([j for j,k in coords ]))
        lat = str(median([k for j,k in coords ]))
    except:
        lon = 'NA'
        lat = 'NA'
    x.append(lon)
    x.append(lat)
    return x
    

def createCoordsFiles(session, filepath):
    '''Reading the centerline data set and returns a preprocessed RDD'''
    coords = readFileAsCSV(session, filepath) 
    ## Removing the first line
    coords = coords.map(lambda x: [x[0],x[1],x[28],x[3],x[4],x[5]])
    ## Getting the header
    coords_header = coords.take(1)[0]
    coords = coords.mapPartitionsWithIndex(lambda idx, it: islice(it, 1, None) if idx == 0 else it)
    ## Preprocessing the street names
    coords = coords.map(lambda x: street_preprocess(x, coords_header.index('FULL_STREE'))) 
    ## Finding the street numbers and house number limits
    coords = coords.map(streetHouse)
    coords = coords.map(lambda x: geoms(x, coords_header.index('the_geom')))
    ## Extracting the required columns
    #coords = coords.map(lambda x: [x[2],x[6],x[7],x[8],x[9], x[10], x[11]])
    coords_header.extend(['L_S_min', 'L_S_max', 'R_S_min', 'R_S_max', 'L_H_min', 'L_H_max', 'R_H_min', 'R_H_max', 'lon', 'lat'] )
    return coords, coords_header

### Match the ticket to their respectve coordinates

In [7]:
def matchstreet(t, c):
    '''
    Based on the street and house number in tickets dataframe finds the coordintae value in centerline dataframe
    and merges the coordinate value to it.
    '''
    ## Making the columns integers for comparision
    t =  t.withColumn("Issue_Month", t["Issue_Month"].cast('integer'))
    t =  t.withColumn("Issue_Year", t["Issue_Year"].cast('integer'))
    t =  t.withColumn("Street", t["Street"].cast('integer'))
    t =  t.withColumn("House", t["House"].cast('integer'))
    t =  t.withColumn("RState", t["RState"].cast('integer'))
    t =  t.withColumn("VType", t["VType"].cast('integer'))
    t =  t.withColumn("Ids", monotonically_increasing_id())
    t =  t.withColumn("Ids", t["Ids"].cast('string'))
    
    c = c.select('FULL_STREE', c.L_S_min.cast('integer'),c.L_S_max.cast('integer'),\
                 c.R_S_min.cast('integer'),c.R_S_max.cast('integer'),\
                 c.L_H_min.cast('integer'),c.L_H_max.cast('integer'),\
                 c.R_H_min.cast('integer'),c.R_H_max.cast('integer'),\
                 c.lon.cast('float'),c.lat.cast('float'))
    ## performs inner join on the tickets. 
    merged = t.join(c, [t.Street_Name == c.FULL_STREE,\
                        (t.Street>=c.L_S_min)  | (t.Street>=c.R_S_min),\
                        (t.Street<=c.L_S_max)  | (t.Street<=c.R_S_max),\
                        (t.House >=c.L_H_min)  | (t.House >=c.R_H_min),\
                        (t.House <= c.L_H_max) | (t.House <= c.R_H_max)],'inner').select('Ids', 'Summons_Number', 'Registration_State', 'Plate_Type', 'Issue_Date', 'Violation_Code', 'Vehicle_Body_Type', 'Vehicle_Make', 'House_Number', 'Street_Name', 'Issue_Month', 'Issue_Year', 'Violation_Time', 'Street', 'House', 'RState', 'VType', 'lon', 'lat')
    
    return merged.dropDuplicates(subset=['Ids'])

### Data collection and EDA

In [8]:
def getData(sc, filepath, filename, year):
    '''Reads each file in a loop and returns a list of RDDs'''
    tickets = []
    for yr in year:
        filelocation = str(filepath)+str(filename)+str(yr)+".csv"
        print(filelocation)
        parking_data, header = preprocessedCSV(sc, filelocation)
        tickets.append(parking_data)
    return tickets, header

In [9]:
def group_data(data, Val, Key):
    pairs = data.map(lambda x: (x[Key], x[Val]))
    return pairs.groupByKey().collect()

def group_data_toList(y, groupby):
    '''
    Input:
        - y: Grouped pyspark data returned from group_data function
        - groupby: Column name to remove an extra element
    '''
    lab = list(map(lambda x:x[0], y))
    val = list(map(lambda x:len(x[1]), y))
    try:
        kick = lab.index(groupby)
        lab.pop(kick)
        val.pop(kick)
    except:
        0
    if groupby=='Issue Month' or groupby == 'Violation Code':
        lab = list(map(lambda x: int(x), lab))
    return [lab,val]

In [10]:
def plot(ax, header, data, year, groupby, pt, plot_data):
    '''
    Input:
        - data: PySpark parsed CSV
        - groupby: Column name for grouping the data 
        - pt: Plot type
        - plot data: Variables required for bar plot
        
    Output: 
        - Plot
    '''
    if pt=='bar':
        col, val, axis_labels, legend_labels = plot_data
        
        data = data[data.Issue_Year == year]
        
        cat_index = data.columns.get_loc(col)
        cat1 = data[data[cat_index] == val]
        cat2 = data[data[cat_index] != val]
        cat2 = cat2.groupby('Issue_Month').sum() 
        print(cat2)
        ## plotting the graph
        ax.bar(cat1.Issue_Month, cat1[:, 4], width = 0.5, label=legend_labels[0]) 
        ax.bar(cat2.Issue_Month, cat2[:, 4], width = 0.5, label=legend_labels[1])
        
        ax.legend()
        ax.set_xlabel(groupby, fontsize=18)
        ax.set_ylabel('Number of tickets', fontsize=18)
        ax.set_title('Parking tickets for the year '+str(year-1), fontsize=22)
    
    if pt == 'pie':
        #try:
            #groupby_index = header.index(groupby)
            #count_column = header.index('Summons Number')
            
            label, data = group_data_toList(group_data(data , Val=count_column, Key=groupby_index), groupby)
            title = 'Parking ticket '+str(groupby)+' for the year '+str(year-1)
            
            if groupby == 'Violation Code': 
                mydict = { 0:'',
                           1:"Misc",
                           2:"No Parking",
                           3:"No Standing",
                           4:"Permit/Doc Issue",
                           5:"Plate Issues",
                           6:"Obstructing Path",
                           7:"Overtime"}
                ## Defining the violation code merges as a dictionary
                labs = {"Misc":[35,41,90,91,94],
                        "No Parking":[20,21,23,24,27],
                        "No Standing":[3,4,5,6,8,10,11,12,13,14,15,16,17,18,19,22,25,26,30,31,40,44,54,57,58,63,64,77,78,81,89,92],
                        "Permit/Doc Issue":[1,2,29,70,71,72,73,76,80,83,87,88,93,97],
                        "Plate Issues":[74,75,82],
                        "Obstructing Path":[7,9,36,45,46,47,48,49,50,51,52,53,55,56,59,60,61,62,66,67,68,79,84,96,98],
                        "Overtime":[28,32,33,34,37,38,39,42,43,65,69,85,86]
                        }
                ## Count based on the grouping
            
                temp = defaultdict(list)
                for i in range(len(label)):
                    for key, val in labs.items():
                        if label[i] in val:
                            if temp[key] == []:
                                temp[key] = 0
                            else:
                                temp[key] = temp[key]+data[i]

                ## Ordering data based on the dictionary 
                label, data  = list(), list()
                for key in labs.keys():
                    label.append(key)
                    data.append(temp[key])    

            ## Defining color for each category
            temp = defaultdict(list)
            for l,c in zip(labs,cm.tab20(range(len(labs)))):
                temp[l]=c

            centre_circle = plt.Circle((0,0),0.85,fc='white') ## radius to make it like a donut
            explode = np.full(len(label), 0.04) ## Gaps between the categories

            pat = ax.pie(list(map(lambda x: x*100/sum(data), data)), labels=label, textprops={'fontsize': 20}, autopct='%1.1f%%', startangle=90, pctdistance=0.6, explode = explode)
            if groupby == 'Violation Code':
                for pie_wedge in pat[0]:
                    pie_wedge.set_edgecolor('white')
                    pie_wedge.set_facecolor(temp[pie_wedge.get_label()]) # Assigning color code for each catergory

            ax.axis('equal') # Equal aspect ratio ensures that pie is drawn as a circle
            ax.set_title(title, fontsize =22, pad=20)
            plt.gcf().gca().add_artist(centre_circle)
        #except:
        #    print('Failed to plot!')        
    return ax 


def EDA(header, tickets, year, groupby, pt, plotdim, plot_data):
    fig = plt.figure(figsize=(30, 25))
    axs=plt.GridSpec(plotdim[0], plotdim[1], hspace=0.15, wspace=0.1)
    tickets = tickets.select('Issue_Year', 'Issue_Month', str(plot_data[0])).groupBy('Issue_Year', 'Issue_Month', plot_data[0]).count().toPandas()
    for i in range(len(year)):
        tickets.filter(lambda x: x)
        plot(fig.add_subplot(axs[i]), header, tickets, year[i], groupby, pt, plot_data)
    plt.savefig('EDA_'+str(groupby)+'_'+str(pt)+'.png',  bbox_inches='tight')

## The final showdown

### Matching the ticket with their coordinate values
This is done using join method
First we convert the `all_tickets` into a dataframe, then we read the cetnerline data set as a dataframe `coord_df` and then using `matchStreet` function we match the coordinates based on the left and right range of street numbers and house numbers based on the street names. 
Then we select the data which is from the years of interest

In [17]:
def Analyse(year, filepath, filename):
    ## Returns a list of RDDs, these RDD have data for each year
    tickets, header = getData(sc, filepath, filename, year)
    ## Merging the RDDs into a  one single RDD
    ticket_part0 = tickets[0]
    #ticket_part1 = tickets[1]
    #ticket_part2 = tickets[2]
    #ticket_part3 = tickets[3]
    #ticket_part4 = tickets[4]
    #ticket_part5 = tickets[5]
    
    ## Converting the RDD list to a pyspark dataframe
    ticket_part0 = ss.createDataFrame(ticket_part0, schema=[x.replace(' ', '_') for x in header])
    #ticket_part1 = ss.createDataFrame(ticket_part1, schema=[x.replace(' ', '_') for x in header])
    #ticket_part2 = ss.createDataFrame(ticket_part2, schema=[x.replace(' ', '_') for x in header])
    #ticket_part3 = ss.createDataFrame(ticket_part3, schema=[x.replace(' ', '_') for x in header])
    #ticket_part4 = ss.createDataFrame(ticket_part4, schema=[x.replace(' ', '_') for x in header])
    #ticket_part5 = ss.createDataFrame(ticket_part5, schema=[x.replace(' ', '_') for x in header])
    ## Reading the centerline data set
    coords, c_header = createCoordsFiles(sc, 'Centerline.csv')
    coord_df = ss.createDataFrame(coords, schema=c_header)
    
    ## Matching the tickets to their coordinate location
    ticket_part0 = matchstreet(ticket_part0, coord_df)
    #ticket_part1 = matchstreet(ticket_part1, coord_df)
    #ticket_part2 = matchstreet(ticket_part2, coord_df)
    #ticket_part3 = matchstreet(ticket_part3, coord_df)
    #ticket_part4 = matchstreet(ticket_part4, coord_df)
    #ticket_part5 = matchstreet(ticket_part5, coord_df)
    
    ## Keeping the dataset which is from 2015 to 2020
    ticket_part0 = ticket_part0.filter(F.col('Issue_Year').isin(year))
    #ticket_part1 = ticket_part1.filter(F.col('Issue_Year').isin(year))
    #ticket_part2 = ticket_part2.filter(F.col('Issue_Year').isin(year))
    #ticket_part3 = ticket_part3.filter(F.col('Issue_Year').isin(year))
    #ticket_part4 = ticket_part4.filter(F.col('Issue_Year').isin(year))
    #ticket_part5 = ticket_part5.filter(F.col('Issue_Year').isin(year))
    
    #return ticket_part0, ticket_part1,ticket_part2,ticket_part3,ticket_part4,ticket_part5
    return ticket_part0

In [18]:
%%time
filepath = 'nyc-parking-tickets/'
filename = 'Parking_Violations_Issued_-_Fiscal_Year_'
year = list(range(2015, 2021))

# this is set for reading only the first year right now
year=[2015]

#t_part1, t_part2,t_part3,t_part4,t_part5,t_part6 = Analyse(year, filepath, filename)
t_part1= Analyse(year, filepath, filename)

nyc-parking-tickets/Parking_Violations_Issued_-_Fiscal_Year_2015.csv
Wall time: 3.22 s


In [19]:
%%time
from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(inputCols=["lat", "lon"], outputCol="features")
t_part1_with_location = vecAssembler.transform(t_part1)
#t_part2_with_location = vecAssembler.transform(t_part2)
#t_part3_with_location = vecAssembler.transform(t_part3)
#t_part4_with_location = vecAssembler.transform(t_part4)
#t_part5_with_location = vecAssembler.transform(t_part5)
#t_part6_with_location = vecAssembler.transform(t_part6)

Wall time: 153 ms


In [20]:
%%time

from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=796, seed=1)  #  clusters here

model = kmeans.fit(t_part1_with_location.select('features'))
transformed_1 = model.transform(t_part1_with_location)
#transformed_2 = model.transform(t_part2_with_location)
#transformed_3 = model.transform(t_part3_with_location)
#transformed_4 = model.transform(t_part4_with_location)
#transformed_5 = model.transform(t_part5_with_location)
#transformed_6 = model.transform(t_part6_with_location)
'''
from pyspark.ml.clustering import BisectingKMeans
bkm = BisectingKMeans().setK(796).setSeed(1)
model = bkm.fit(t_part1_with_location.select('features'))
transformed_1 = model.transform(t_part1_with_location)
model = bkm.fit(t_part2_with_location.select('features'))
transformed_2 = model.transform(t_part2_with_location)
'''

Wall time: 8min 40s


"\nfrom pyspark.ml.clustering import BisectingKMeans\nbkm = BisectingKMeans().setK(796).setSeed(1)\nmodel = bkm.fit(t_part1_with_location.select('features'))\ntransformed_1 = model.transform(t_part1_with_location)\nmodel = bkm.fit(t_part2_with_location.select('features'))\ntransformed_2 = model.transform(t_part2_with_location)\n"

ctr=[]
centers = model.clusterCenters()
for center in centers:
    ctr.append(center)

ctr[0]

import folium
from folium.plugins import MarkerCluster
mc = MarkerCluster()
m = folium.Map(
    location=[40.767937,-73.982155],
    zoom_start=12,)


for i in range(len(ctr)):
    mc.add_child(folium.Marker(location=[ ctr[i][0],ctr[i][1] ]))


m.add_child(mc)
m.save('F:/marker_cluster_example_file.html')
m

amenity_names = []
amenity_count = []
import requests, json, time, random
import pandas as pd
from collections import Counter

Amenity_per_location =pd.DataFrame()
Amenity_per_location_2=pd.DataFrame()
overpass_url = "http://overpass-api.de/api/interpreter"
for i in range(len(ctr)):
    overpass_query = '[out:json];' +'(' + \
    '// query part for: “aminity=*”' +'\n'+\
    'node["amenity"](around:1000,' + str(ctr[i][0])+','+str(ctr[i][1])+');'+'\n'+\
    'way["amenity"](around:1000,' +  str(ctr[i][0])+','+str(ctr[i][1])+');'+'\n'+\
    'relation["amenity"](around:1000,' +  str(ctr[i][0])+','+str(ctr[i][1])+');'+'\n'+\
    ');' + '\n'+\
    '// print results'+'\n'+\
    'out;'+'\n'+\
    '>;'+'out count;'
     
    response = requests.get(overpass_url, 
                        params={'data': overpass_query})
    try:
        data  = response.json()
    
    except (requests.exceptions.ConnectionError, json.decoder.JSONDecodeError):
        time.sleep(2**1 + random.random()*0.01) #exponential backoff


    typeamenity =[]
    for l in range(len(data['elements'])):
        try:
            ind = list(data['elements'][l].keys()).index('tags')
        
            typeamenity.append(data['elements'][l]['tags']['amenity'])
        except:
            ind = False

    amenity_names.append(list(Counter(typeamenity).keys()))
    amenity_count.append(list(Counter(typeamenity).values()))
    dictionary = dict(zip(amenity_names[i], amenity_count[i]))
    Amenity_per_location =  pd.DataFrame.from_dict(dictionary, orient='index')
    Amenity_per_location_2 = pd.concat([Amenity_per_location, Amenity_per_location_2], axis=1, sort=True)

#### This is how each ticket file look like

Each ticket will have multiple columns below you can see some of the columns along with the coordinate of each ticket

Amenity_per_location_2 = Amenity_per_location_2.T
Amenity_per_location_2['prediction'] =np.arange(0,len(Amenity_per_location_2))
Amenity_per_location_2.to_csv('F:/Amenity_per_location_2_clusters.csv')

In [21]:
%%time
Amenity_per_location_2 = ss.read.csv("Amenity_per_location.csv", header=True, sep=",");
Location_with_features_1 = Amenity_per_location_2.join(transformed_1, on=['prediction'], how='right_outer')
#Location_with_features_2 = Amenity_per_location_2.join(transformed_2, on=['prediction'], how='right_outer')
#Location_with_features_3 = Amenity_per_location_2.join(transformed_2, on=['prediction'], how='right_outer')
#Location_with_features_4 = Amenity_per_location_2.join(transformed_2, on=['prediction'], how='right_outer')
#Location_with_features_5 = Amenity_per_location_2.join(transformed_2, on=['prediction'], how='right_outer')
#Location_with_features_6 = Amenity_per_location_2.join(transformed_2, on=['prediction'], how='right_outer')

Wall time: 169 ms


In [22]:
Location_with_features_1 = Location_with_features_1.withColumn("Violation_Time", F.concat(F.col("Violation_Time"), F.lit("M")))
Location_with_features_1 = Location_with_features_1.withColumn("Violation_Time", F.to_timestamp(Location_with_features_1.Violation_Time, 'KKmmaa'))
Location_with_features_1 = Location_with_features_1.withColumn("Violation_Time", F.date_format('Violation_Time', 'HH'))
Location_with_features_1 = Location_with_features_1.filter(Location_with_features_1.Violation_Time.isNotNull())

In [23]:
filelocation_weather = 'merged_weather_holidays_fixedMissingValues.csv'
weather = ss.read.csv(filelocation_weather, header=True)
weather = weather.withColumn("time", F.to_timestamp(weather.time, 'HH:mm:ss'))
weather = weather.withColumn("date", F.to_timestamp(weather.date, 'yyyy-MM-dd'))
weather = weather.withColumn("time", F.date_format('time', 'HH'))
weather = weather.withColumn("date", F.date_format('date', 'MM/dd/yyyy'))
weather = weather.withColumnRenamed("time", "Violation_Time")
weather = weather.withColumnRenamed("date", "Issue_Date")

In [24]:
#weather_locationFeatures_joined = Location_with_features_1.join(weather, (Location_with_features_1.Issue_Date == weather.date) & (Location_with_features_1.Violation_Time == weather.time))
weather_locationFeatures_joined = Location_with_features_1.join(weather, ['Issue_Date', 'Violation_Time'])

In [25]:
agg = weather_locationFeatures_joined.groupBy('prediction','Issue_Date','Violation_Time').count()

In [27]:
re_merge = agg.join(weather, ["Issue_Date", "Violation_Time"])
re_merge_2 = re_merge.join(Amenity_per_location_2, (re_merge.prediction == Amenity_per_location_2.prediction))

In [30]:
re_merge_2 = re_merge_2.drop("prediction")
re_merge_2 = re_merge_2.drop("datetime")

In [31]:
re_merge_2 = re_merge_2.withColumn("Issue_Date", F.to_timestamp(re_merge_2.Issue_Date, 'MM/dd/yyyy'))
re_merge_2 = re_merge_2.withColumn('Day_of_week',F.dayofweek(re_merge_2.Issue_Date))
re_merge_2 = re_merge_2.withColumn('Day_of_year',F.dayofyear(re_merge_2.Issue_Date))
re_merge_2 = re_merge_2.withColumn('Day_of_month',F.dayofmonth(re_merge_2.Issue_Date))
re_merge_2 = re_merge_2.withColumn("Month", F.date_format('Issue_Date', 'MM'))
re_merge_2 = re_merge_2.withColumn("Year", F.date_format('Issue_Date', 'YYYY'))

In [32]:
re_merge_2 = re_merge_2.drop("Issue_Date")

In [33]:
# these are chosen pretty recklessly. the ones i assumed would have the least correlation to parking tickets

amenities_to_drop = ['amenity|ice_cream', 
'animal_boarding', 
'animal_shelter', 
'art_centre', 
'arts_centre', 
'bicycle_parking',
'bicycle_rental', 
'bicycle_repair_station', 
'biergarten', 
'boat_rental', 
'boat_storage',
'car_rental', 
'car_service', 
'car_sharing',
 'car_wash',
 'charging_station',
 'clock',
 'community_centre',
 'compressed_air',
 'concert_hall',
 'cooking_school',
 'courthouse',
 'coworking_space',
 'dancing_school',
 'dentist',
 'disused',
 'dojo',
 'drinking_water',
 'driving_school',
 'ferry_terminal',
 'fire_station',
 'food_court',
 'fortune_teller',
 'fountain',
 'fuel',
 'graphic_design',
 'grave_yard',
 'ice_cream',
 'internet_cafe',
 'karaoke_box',
 'language_school',
 'library',
 'loading_dock',
 'meditation_centre',
 'monastery',
 'motorcycle_parking',
 'museum',
 'music_school',
 'music_venue',
 'nail salon',
 'nail_salon',
 'nursing_home',
 'outdoor_seating',
 'parking',
 'parking_entrance',
 'parking_space',
 'payment_centre',
 'payment_terminal', 'picnic_table',
 'police',
 'post_box',
 'post_depot',
 'prep_school',
 'prison',
 'public_bath',
 'public_bookcase',
 'public_building',
 'radio station',
 'ranger_station',
 'recycling',
 'rescue_station',
 'research_institute',
 'salon',
 'self_storage',
 'shelter',
 'shoe_repair',
 'smoking_area',
 'social_centre',
 'social_facility',
 'spa',
 'stock_exchange',
 'stripclub',
 'studio', 'swimming_pool',
 'swingerclub',
 'taxi',
 'telephone',
 'theatre',
 'toilets',
 'tourism',
 'townhall',
 'training',
 'university',
 'urgent_care',
 'vehicle_inspection',
 'vending_machine',
 'veterinary',
 'waste_basket',
 'waste_disposal',
 'waste_transfer_station',
 'wifi;telephone;device_charging_station',
'_c0']

In [34]:
for c in amenities_to_drop:
    re_merge_2 = re_merge_2.drop(c)

In [35]:
%%time

# indexing categorical features

categorical_features = ["summary", "icon", "precipType"]

for cat in categorical_features:
    stringIndexer = StringIndexer(inputCol=cat, outputCol=cat+"_cat", stringOrderType="frequencyDesc")
    stringIndexer.setHandleInvalid("keep")
    model = stringIndexer.fit(re_merge_2)
    model.setHandleInvalid("keep")
    re_merge_2 = model.transform(re_merge_2)

Wall time: 13min 1s


In [36]:
# dropping non-indexed categorical feature columns

re_merge_2 = re_merge_2.drop("summary")
re_merge_2 = re_merge_2.drop("icon")
re_merge_2 = re_merge_2.drop("precipType")

['Violation_Time',
 'count',
 'precipIntensity',
 'precipProbability',
 'temperature',
 'apparentTemperature',
 'dewPoint',
 'humidity',
 'pressure',
 'windSpeed',
 'windGust',
 'windBearing',
 'cloudCover',
 'uvIndex',
 'visibility',
 'precipAccumulation',
 'ozone',
 'Holiday',
 'atm',
 'bakery',
 'bank',
 'bar',
 'bbq',
 'bench',
 'bureau_de_change',
 'bus_station',
 'cafe',
 'childcare',
 'cinema',
 'clinic',
 'clothing store',
 'college',
 'doctors',
 'embassy',
 'events_venue',
 'fast_food',
 'gym',
 'hospital',
 'kindergarten',
 'marketplace',
 'money_transfer',
 'nightclub',
 'pharmacy',
 'place_of_worship',
 'post_office',
 'pub',
 'restaurant',
 'school',
 'supermarket',
 'Day_of_week',
 'Day_of_year',
 'Day_of_month',
 'Month',
 'Year',
 'summary_cat',
 'icon_cat',
 'precipType_cat']

In [None]:
re_merge_2.printSchema()

In [37]:
#cat_features = ["summary_cat", "icon_cat", "precipType_cat", "summary", "icon", "precipType"]

# the above line is commented since
# the stringindexed categorical vars can also be float and not double

cat_features = [] 
floats = [x for x in re_merge_2.columns if x not in cat_features]
for feature in floats:
    re_merge_2 = re_merge_2.withColumn(feature, re_merge_2[feature].cast(types.FloatType()))

In [39]:
feature_indices = [i for i, x in enumerate(re_merge_2.columns) if i!=1] # all columns except the label

# transformed_df contains the data as a list of LabeledPoints which contain the label and a vector of features
# that is the format needed for the RF
transformed_df = re_merge_2.rdd.map(lambda row: LabeledPoint(row[1], Vectors.dense(np.array(row)[np.array(feature_indices)])))

In [40]:
%%time
from pyspark.mllib.tree import RandomForest

TRAINING_DATA_RATIO = 0.8
RANDOM_SEED = 3

# in general, these should probably be pushed as far upwards as our machines can handle for better performance
# training for 1 year with current parameters takes <10min

RF_NUM_TREES = 10
RF_MAX_DEPTH = 5
RF_MAX_BINS = 10

splits = [TRAINING_DATA_RATIO, 1.0 - TRAINING_DATA_RATIO]
training_data, test_data = transformed_df.randomSplit(splits, RANDOM_SEED)

# we should look into whether categoricalFeaturesInfo should be set to the categorical variables ?

model = RandomForest.trainRegressor(training_data, categoricalFeaturesInfo={}, \
    numTrees=RF_NUM_TREES, featureSubsetStrategy="auto", \
    maxDepth=RF_MAX_DEPTH, maxBins=RF_MAX_BINS, seed=RANDOM_SEED)

Wall time: 6min 37s


In [42]:
# change the model name if you dont want to overwrite a previous model ! (change date or version)
%%time

model_name = "RF_test_model_26_6_2020"
model.save(sc, model_name)

In [43]:
%%time

# getting the labels takes <10min

predictions = model.predict(test_data.map(lambda x: x.features))
labels_and_predictions = test_data.map(lambda x: x.label).zip(predictions)

Model accuracy: 0.000%
Wall time: 8min 9s


In [50]:
#converting the labels_and_predictions RDD to a DataFrame

def f(x):
    d = {}
    for i in range(len(x)):
        d[str(i)] = x[i]
    return d

predictions_labels_df = labels_and_predictions.map(lambda x: Row(**f(x))).toDF()

In [52]:
# evaluating...
# for 2015 - rmse: 5.0768, mae: 2.8551

evaluator_rmse = RegressionEvaluator(labelCol="0", predictionCol="1", metricName="rmse")
rmse = evaluator_rmse.evaluate(predictions_labels_df)
print('RMSE: '+str(rmse))

In [53]:
rmse

5.076857361810865

In [54]:
evaluator_mae = RegressionEvaluator(labelCol="0", predictionCol="1", metricName="mae")
mae = evaluator_mae.evaluate(predictions_labels_df)
print('MAE: '+str(mae))

In [55]:
mae

2.8551452954946157