In [161]:
from pyspark.sql import SparkSession
from pyspark.ml import feature
from pyspark.ml import classification
from pyspark.sql import functions as fn
from pyspark.ml import Pipeline, evaluation
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
    MulticlassClassificationEvaluator, \
    RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from pyspark.sql import SparkSession
from pyspark_pipes import pipe
from pyspark.sql.functions import col, substring, when, count, isnan

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [8]:
nypd = spark.read.csv('NYPD.csv', header=True, inferSchema=True)

In [9]:
nypd.limit(5).toPandas()

Unnamed: 0,CMPLNT_NUM,CMPLNT_FR_DT,CMPLNT_FR_TM,CMPLNT_TO_DT,CMPLNT_TO_TM,ADDR_PCT_CD,RPT_DT,KY_CD,OFNS_DESC,PD_CD,...,SUSP_SEX,TRANSIT_DISTRICT,Latitude,Longitude,Lat_Lon,PATROL_BORO,STATION_NAME,VIC_AGE_GROUP,VIC_RACE,VIC_SEX
0,700381962,05/28/2015,15:00:00,,,46,06/01/2015,578,HARRASSMENT 2,638,...,M,,40.84586773,-73.915888033,"(40.84586773, -73.915888033)",PATROL BORO BRONX,,25-44,WHITE HISPANIC,F
1,593660503,02/20/2012,01:30:00,02/20/2012,02:00:00,32,02/20/2012,344,ASSAULT 3 & RELATED OFFENSES,101,...,M,,40.811522012,-73.940601181,"(40.811522012, -73.940601181)",PATROL BORO MAN NORTH,,25-44,BLACK,F
2,889259677,09/28/2012,09:30:00,09/28/2012,18:45:00,47,10/02/2012,578,HARRASSMENT 2,638,...,M,,40.878003133,-73.846891755,"(40.878003133, -73.846891755)",PATROL BORO BRONX,,18-24,BLACK HISPANIC,F
3,977712246,04/27/2015,12:40:00,04/27/2015,12:45:00,73,04/27/2015,578,HARRASSMENT 2,638,...,M,,40.666464662,-73.904103595,"(40.666464662, -73.904103595)",PATROL BORO BKLYN NORTH,,45-64,BLACK,M
4,438966845,12/19/2016,13:06:00,12/19/2016,13:23:00,49,12/19/2016,344,ASSAULT 3 & RELATED OFFENSES,101,...,M,,40.844696738,-73.86493511,"(40.844696738, -73.86493511)",PATROL BORO BRONX,,25-44,BLACK HISPANIC,F


In [10]:
#nypd.printSchema()

In [11]:
#nypd.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in nypd.columns]).show()

+----------+------------+------------+------------+------------+-----------+------+-----+---------+-----+-------+----------------+----------+-------+-----------------+-------------+----------+-----------------+--------+----------+-----------+----------+----------+--------------+---------+--------+----------------+--------+---------+-------+-----------+------------+-------------+--------+-------+
|CMPLNT_NUM|CMPLNT_FR_DT|CMPLNT_FR_TM|CMPLNT_TO_DT|CMPLNT_TO_TM|ADDR_PCT_CD|RPT_DT|KY_CD|OFNS_DESC|PD_CD|PD_DESC|CRM_ATPT_CPTD_CD|LAW_CAT_CD|BORO_NM|LOC_OF_OCCUR_DESC|PREM_TYP_DESC|JURIS_DESC|JURISDICTION_CODE|PARKS_NM|HADEVELOPT|HOUSING_PSA|X_COORD_CD|Y_COORD_CD|SUSP_AGE_GROUP|SUSP_RACE|SUSP_SEX|TRANSIT_DISTRICT|Latitude|Longitude|Lat_Lon|PATROL_BORO|STATION_NAME|VIC_AGE_GROUP|VIC_RACE|VIC_SEX|
+----------+------------+------------+------------+------------+-----------+------+-----+---------+-----+-------+----------------+----------+-------+-----------------+-------------+----------+----------

In [12]:
drop_list = []

#DROP THE FOLLOWING COLUMNS BECAUSE THERE ARE A LOT OF MISSING VALUES OR FEATURES WE DO NOT WANT TO USE FOR ANALYSIS
#nypd.select("CMPLNT_NUM").show()
#nypd.select([count(when((isnan('CMPLNT_NUM')| col('CMPLNT_NUM').isNull()),True))]).show() 
drop_list.append("CMPLNT_NUM")

#nypd.select("CMPLNT_TO_DT").show()
#nypd.select([count(when((isnan('CMPLNT_TO_DT')| col('CMPLNT_TO_DT').isNull()),True))]).show() 
drop_list.append("CMPLNT_TO_DT")

#nypd.select("CMPLNT_TO_TM").show()
#nypd.select([count(when((isnan('CMPLNT_TO_TM')| col('CMPLNT_TO_TM').isNull()),True))]).show()
drop_list.append("CMPLNT_TO_TM")

#nypd.groupBy('ADDR_PCT_CD').count().show()
#nypd.select([count(when((isnan('ADDR_PCT_CD')|col('ADDR_PCT_CD').isNull()),True))]).show()
drop_list.append("ADDR_PCT_CD")

#nypd.groupBy('HADEVELOPT').count().show()
#nypd.select([count(when((isnan('HADEVELOPT')| col('HADEVELOPT').isNull()),True))]).show()
drop_list.append("HADEVELOPT")

#nypd.groupBy('HOUSING_PSA').count().show()
#nypd.select([count(when((isnan('HOUSING_PSA')| col('HOUSING_PSA').isNull()),True))]).show()
drop_list.append("HOUSING_PSA")

#nypd.groupBy('PARKS_NM').count().show()
#nypd.select([count(when((isnan('PARKS_NM')| col('PARKS_NM').isNull()),True))]).show()
drop_list.append("PARKS_NM")

#nypd.groupBy('STATION_NAME').count().show(truncate=False)
#nypd.select([count(when((isnan('STATION_NAME')| col('STATION_NAME').isNull()),True))]).show()
drop_list.append("STATION_NAME")

#nypd.groupBy('TRANSIT_DISTRICT').count().show(truncate=False)
#nypd.select([count(when((isnan('TRANSIT_DISTRICT')| col('TRANSIT_DISTRICT').isNull()),True))]).show()
drop_list.append("TRANSIT_DISTRICT")

#nypd.groupBy('SUSP_AGE_GROUP').count().show(truncate=False)
#nypd.select([count(when((isnan('SUSP_AGE_GROUP')| col('SUSP_AGE_GROUP').isNull()),True))]).show()
drop_list.append("SUSP_AGE_GROUP")

#nypd.groupBy('JURIS_DESC').count().show()
#nypd.select([count(when((isnan('JURIS_DESC')| col('JURIS_DESC').isNull()),True))]).show()
drop_list.append("JURIS_DESC")

#THE SAME INFORMATION IS GIVEN BY BORO_NM
#nypd.groupBy('PATROL_BORO').count().show(truncate=False)
#nypd.select([count(when((isnan('PATROL_BORO')| col('PATROL_BORO').isNull()),True))]).show()
drop_list.append("PATROL_BORO")

#X_COORD_CD and Y_COORD_CD have high correlation with Longitude and Latitude respectively
#nypd.groupBy('X_COORD_CD').count().show(truncate=False)
#nypd.select([count(when((isnan('X_COORD_CD')| col('X_COORD_CD').isNull()),True))]).show()
drop_list.append("X_COORD_CD")

#nypd.groupBy('Y_COORD_CD').count().show(truncate=False)
#nypd.select([count(when((isnan('Y_COORD_CD')| col('Y_COORD_CD').isNull()),True))]).show()
drop_list.append("Y_COORD_CD")

#nypd.groupBy('Lat_lon').count().show(truncate=False)
#nypd.select([count(when((isnan('Lat_lon')| col('Lat_lon').isNull()),True))]).show()
drop_list.append("Lat_lon")

#THE SAME INFORMATION IS GIVEN BY CMPLNT_FR_DT
#nypd.groupBy('RPT_DT').count().show()
#nypd.select([count(when((isnan('RPT_DT')| col('RPT_DT').isNull()),True))]).show()
drop_list.append("RPT_DT")

#GIVES INFO ON THE TARGET VARIABLE DIRECTLY
#nypd.groupBy('KY_CD').count().show()
#nypd.select([count(when((isnan('KY_CD')| col('KY_CD').isNull()),True))]).show()
drop_list.append("KY_CD")

#nypd.groupBy('OFNS_DESC').count().show()
#nypd.select([count(when((isnan('OFNS_DESC')| col('OFNS_DESC').isNull()),True))]).show()
drop_list.append("OFNS_DESC")

#nypd.groupBy('PD_CD').count().show()
#nypd.select([count(when((isnan('PD_CD')| col('PD_CD').isNull()),True))]).show()
drop_list.append("PD_CD")

#nypd.groupBy('PD_DESC').count().show()
#nypd.select([count(when((isnan('PD_DESC')| col('PD_DESC').isNull()),True))]).show()
drop_list.append("PD_DESC")
#DROP CMPLNT_NUM, CMPLNT_TO_DT, CMPLNT_TO_TM, ADDR_PCT_CD, HADEVELOPT, HOUSING_PSA, PARKS_NM, STATION_NAME, TRANSIT_DISTRICT, 
# SUSP_AGE_GROUP, PATROL_BORO, RPT_DT, KY_CD, OFNS_DESC, PD_CD, PD_DESC

In [205]:
drop_list
new_nypd_df = nypd.drop(*drop_list)

In [206]:
#new_nypd_df.select("CMPLNT_FR_TM").show()
#new_nypd_df.select([count(when((isnan('CMPLNT_FR_TM')| col('CMPLNT_FR_TM').isNull()),True))]).show()
new_nypd_df = new_nypd_df.na.drop(subset=["CMPLNT_FR_TM"])

In [207]:
new_nypd_df = new_nypd_df.withColumn('CMPLNT_FR_TM', substring('CMPLNT_FR_TM', 1,2))
new_nypd_df = new_nypd_df.withColumn("Hour", new_nypd_df['CMPLNT_FR_TM'].cast('int')).drop('CMPLNT_FR_TM')
new_nypd_df = new_nypd_df.withColumn("Time",
                       fn.when((col("Hour") >= 6) & (col("Hour") <= 19) , "Day")
                        .otherwise("Night"))
new_nypd_df = new_nypd_df.drop('Hour')

In [208]:
new_nypd_df = new_nypd_df.withColumn('CMPLNT_FR_DT', substring('CMPLNT_FR_DT', 1,2))
new_nypd_df = new_nypd_df.withColumnRenamed("CMPLNT_FR_DT","Month")
#new_nypd_df.select('Month').show()

In [209]:
#df1.show()
#new_nypd_df.select(col('Month'))

new_nypd_df = new_nypd_df.withColumn("Season",
                       fn.when((col("Month") == "03") | (col("Month") == "04") | (col("Month") == "05") , "Spring")
                        .when((col("Month") == "06") | (col("Month") == "07") | (col("Month") == "08") , "Summer")
                        .when((col("Month") == "09") | (col("Month") == "10") | (col("Month") == "11") , "Autumn")
                        .otherwise("Winter"))
new_nypd_df = new_nypd_df.drop('Month')

In [210]:
#new_nypd_df.groupBy('BORO_NM').count().show()
#new_nypd_df.select([count(when((isnan('BORO_NM')| col('BORO_NM').isNull()),True))]).show()
#new_nypd_df.groupBy('CRM_ATPT_CPTD_CD').count().show()
new_nypd_df = new_nypd_df.na.drop(subset=["BORO_NM","CRM_ATPT_CPTD_CD"])

+-------------+------+
|      BORO_NM| count|
+-------------+------+
|         null|  5102|
|       QUEENS|432290|
|     BROOKLYN|663953|
|        BRONX|467132|
|    MANHATTAN|411166|
|STATEN ISLAND|105722|
+-------------+------+



In [211]:
#new_nypd_df.groupBy('JURISDICTION_CODE').count().show()
#new_nypd_df.select([count(when((isnan('JURISDICTION_CODE')| col('JURISDICTION_CODE').isNull()),True))]).show()
new_nypd_df = new_nypd_df.withColumn("JURISDICTION",
                       fn.when((col("JURISDICTION_CODE") == "0") , "Police")
                        .otherwise("Other"))
new_nypd_df = new_nypd_df.drop('JURISDICTION_CODE')

In [113]:
new_nypd_df.groupBy('LAW_CAT_CD').count().show()
new_nypd_df.select([count(when((isnan('LAW_CAT_CD')| col('LAW_CAT_CD').isNull()),True))]).show()



+-----------+------+
| LAW_CAT_CD| count|
+-----------+------+
|     FELONY|512147|
|MISDEMEANOR|920266|
|  VIOLATION|647848|
+-----------+------+

+--------------------------------------------------------------------------+
|count(CASE WHEN (isnan(LAW_CAT_CD) OR (LAW_CAT_CD IS NULL)) THEN true END)|
+--------------------------------------------------------------------------+
|                                                                         0|
+--------------------------------------------------------------------------+



In [212]:
new_nypd_df = new_nypd_df.withColumn("LAW_CAT_CD",
                                    fn.when((col("LAW_CAT_CD") == "FELONY") , 1)
                                    .otherwise(0))

In [213]:
#new_nypd_df.groupBy('LOC_OF_OCCUR_DESC').count().show()
#new_nypd_df.select([count(when((isnan('LOC_OF_OCCUR_DESC')| col('LOC_OF_OCCUR_DESC').isNull()),True))]).show()

In [214]:
new_nypd_df = new_nypd_df.withColumn("LOC_OF_OCCUR_DESC",
                       fn.when((col("LOC_OF_OCCUR_DESC") == "INSIDE") , "Inside")
                        .otherwise("Outside"))

In [215]:
#new_nypd_df.groupBy('PREM_TYP_DESC').count().show(truncate=False)
#new_nypd_df.select([count(when((isnan('PREM_TYP_DESC')| col('PREM_TYP_DESC').isNull()),True))]).show()
#CHANGE TO PUBLIC SETTING AND PRIVATE PREMISES

In [216]:
new_nypd_df = new_nypd_df.withColumn("PREM_TYP_DESC",
                                    fn.when((col("PREM_TYP_DESC") == "RESIDENCE - APT. HOUSE") | (col("PREM_TYP_DESC") == "RESIDENCE - PUBLIC HOUSING")| (col("PREM_TYP_DESC") == "RESIDENCE-HOUSE") , "RESIDENCE")
                                    .otherwise("Public"))

In [217]:
new_nypd_df.groupBy('SUSP_RACE').count().show(truncate=False)
new_nypd_df.select([count(when((isnan('SUSP_RACE')| col('SUSP_RACE').isNull()),True))]).show()

+------------------------------+-------+
|SUSP_RACE                     |count  |
+------------------------------+-------+
|WHITE                         |310887 |
|BLACK                         |1046937|
|AMERICAN INDIAN/ALASKAN NATIVE|9384   |
|BLACK HISPANIC                |142902 |
|WHITE HISPANIC                |477971 |
|OTHER                         |9      |
|ASIAN / PACIFIC ISLANDER      |92171  |
+------------------------------+-------+

+------------------------------------------------------------------------+
|count(CASE WHEN (isnan(SUSP_RACE) OR (SUSP_RACE IS NULL)) THEN true END)|
+------------------------------------------------------------------------+
|                                                                       0|
+------------------------------------------------------------------------+



In [218]:
#new_nypd_df.groupBy('SUSP_SEX').count().show(truncate=False)
#new_nypd_df.select([count(when((isnan('SUSP_SEX')| col('SUSP_SEX').isNull()),True))]).show()
new_nypd_df = new_nypd_df.na.drop(subset=["SUSP_SEX"])

In [219]:
new_nypd_df.groupBy('VIC_AGE_GROUP').count().show(truncate=False)
new_nypd_df.select([count(when((isnan('VIC_AGE_GROUP')| col('VIC_AGE_GROUP').isNull()),True))]).show()
new_nypd_df = new_nypd_df.withColumn("VIC_AGE_GROUP",
                       fn.when((col("VIC_AGE_GROUP") == "<18") , "<18")
                       .when((col("VIC_AGE_GROUP") == "18-24"), "18-24")
                       .when((col("VIC_AGE_GROUP") == "25-44" ), "25-44")
                       .when((col("VIC_AGE_GROUP") == "45-64"), "45-64")
                       .when((col("VIC_AGE_GROUP") == "65+"), "65+")
                        .otherwise(None))
new_nypd_df = new_nypd_df.na.drop(subset=["VIC_AGE_GROUP"])

+-------------+------+
|VIC_AGE_GROUP|count |
+-------------+------+
|944          |1     |
|926          |1     |
|-71          |1     |
|-942         |1     |
|-1           |1     |
|-959         |2     |
|938          |3     |
|-954         |2     |
|-39          |1     |
|-55          |1     |
|-985         |1     |
|-929         |1     |
|970          |1     |
|1012         |1     |
|924          |1     |
|-983         |1     |
|-59          |1     |
|-7           |1     |
|<18          |177962|
|917          |1     |
+-------------+------+
only showing top 20 rows

+--------------------------------------------------------------------------------+
|count(CASE WHEN (isnan(VIC_AGE_GROUP) OR (VIC_AGE_GROUP IS NULL)) THEN true END)|
+--------------------------------------------------------------------------------+
|                                                                           26523|
+--------------------------------------------------------------------------------+



In [220]:
#new_nypd_df.groupBy('VIC_RACE').count().show(truncate=False)
#new_nypd_df.select([count(when((isnan('VIC_RACE')| col('VIC_RACE').isNull()),True))]).show()

+------------------------------+------+
|VIC_RACE                      |count |
+------------------------------+------+
|WHITE                         |404891|
|BLACK                         |819423|
|AMERICAN INDIAN/ALASKAN NATIVE|12761 |
|BLACK HISPANIC                |112129|
|WHITE HISPANIC                |530188|
|OTHER                         |12    |
|ASIAN / PACIFIC ISLANDER      |150726|
+------------------------------+------+

+----------------------------------------------------------------------+
|count(CASE WHEN (isnan(VIC_RACE) OR (VIC_RACE IS NULL)) THEN true END)|
+----------------------------------------------------------------------+
|                                                                     0|
+----------------------------------------------------------------------+



In [221]:
#new_nypd_df.groupBy('VIC_SEX').count().show(truncate=False)
#new_nypd_df.select([count(when((isnan('VIC_SEX')| col('VIC_SEX').isNull()),True))]).show()

+-------+-------+
|VIC_SEX|count  |
+-------+-------+
|F      |1257200|
|E      |34     |
|M      |772751 |
|D      |145    |
+-------+-------+

+--------------------------------------------------------------------+
|count(CASE WHEN (isnan(VIC_SEX) OR (VIC_SEX IS NULL)) THEN true END)|
+--------------------------------------------------------------------+
|                                                                   0|
+--------------------------------------------------------------------+



In [222]:
#Checking correlation between X co-ordinates and Y-co-ordinates and longitude and latitude respectively. 
coord_latlong_corr_df = nypd.select(fn.corr('X_COORD_CD', 'Longitude').alias('corr_X_COORD_CD_Lat'),
                                    fn.corr('Y_COORD_CD', 'Latitude').alias('corr_Y_COORD_CD_Long'))
coord_latlong_corr_df.show()

#Drop X_COORD_CD and Y_COORD_CD because they are highly correlated with Latitude and Longitude



+-------------------+--------------------+
|corr_X_COORD_CD_Lat|corr_Y_COORD_CD_Long|
+-------------------+--------------------+
|  0.999997985303004|  0.9999887030878197|
+-------------------+--------------------+



In [223]:
new_nypd_df.select([count(when((isnan('Longitude')| col('Longitude').isNull()),True))]).show()
new_nypd_df.select([count(when((isnan('Latitude')| col('Latitude').isNull()),True))]).show()

new_nypd_df = new_nypd_df.withColumn("Latitude",new_nypd_df.Latitude.cast('float'))
new_nypd_df = new_nypd_df.withColumn("Longitude",new_nypd_df.Longitude.cast('float'))

+------------------------------------------------------------------------+
|count(CASE WHEN (isnan(Longitude) OR (Longitude IS NULL)) THEN true END)|
+------------------------------------------------------------------------+
|                                                                       0|
+------------------------------------------------------------------------+

+----------------------------------------------------------------------+
|count(CASE WHEN (isnan(Latitude) OR (Latitude IS NULL)) THEN true END)|
+----------------------------------------------------------------------+
|                                                                     0|
+----------------------------------------------------------------------+



In [125]:
new_nypd_df.printSchema()

root
 |-- CRM_ATPT_CPTD_CD: string (nullable = true)
 |-- LAW_CAT_CD: integer (nullable = false)
 |-- BORO_NM: string (nullable = true)
 |-- LOC_OF_OCCUR_DESC: string (nullable = false)
 |-- PREM_TYP_DESC: string (nullable = false)
 |-- SUSP_RACE: string (nullable = true)
 |-- SUSP_SEX: string (nullable = true)
 |-- Latitude: float (nullable = true)
 |-- Longitude: float (nullable = true)
 |-- VIC_AGE_GROUP: string (nullable = true)
 |-- VIC_RACE: string (nullable = true)
 |-- VIC_SEX: string (nullable = true)
 |-- Time: string (nullable = false)
 |-- Season: string (nullable = false)
 |-- JURISDICTION: string (nullable = false)



In [224]:
categorical_columns= ['CRM_ATPT_CPTD_CD', 'BORO_NM', 'LOC_OF_OCCUR_DESC', 'PREM_TYP_DESC','SUSP_RACE', 'VIC_AGE_GROUP',\
                      'VIC_RACE','VIC_SEX', 'Time', 'Season', 'JURISDICTION']

indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
    for c in categorical_columns
]

In [225]:
encoders = [OneHotEncoder(dropLast=False,inputCol=indexer.getOutputCol(),
            outputCol="{0}_encoded".format(indexer.getOutputCol())) 
    for indexer in indexers
]

In [226]:
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders],outputCol="features")

In [129]:
sample_nypd_df = new_nypd_df.sample(withReplacement=False, fraction=0.15, seed=0)

In [227]:
one_hot_encoder =  Pipeline(stages=indexers + encoders + [assembler]).fit(new_nypd_df)

In [228]:
data1 = one_hot_encoder.transform(new_nypd_df)

In [229]:
training_df, validation_df, testing_df = data1.randomSplit([0.6, 0.3, 0.1], seed=1)

In [230]:
from pyspark.ml import regression
pipe2_model = Pipeline(stages=[
  classification.LogisticRegression(featuresCol='features', labelCol='LAW_CAT_CD')  
]).fit(training_df)

In [231]:
pipe2_model.transform(training_df)

DataFrame[CRM_ATPT_CPTD_CD: string, LAW_CAT_CD: int, BORO_NM: string, LOC_OF_OCCUR_DESC: string, PREM_TYP_DESC: string, SUSP_RACE: string, SUSP_SEX: string, Latitude: float, Longitude: float, VIC_AGE_GROUP: string, VIC_RACE: string, VIC_SEX: string, Time: string, Season: string, JURISDICTION: string, CRM_ATPT_CPTD_CD_indexed: double, BORO_NM_indexed: double, LOC_OF_OCCUR_DESC_indexed: double, PREM_TYP_DESC_indexed: double, SUSP_RACE_indexed: double, VIC_AGE_GROUP_indexed: double, VIC_RACE_indexed: double, VIC_SEX_indexed: double, Time_indexed: double, Season_indexed: double, JURISDICTION_indexed: double, CRM_ATPT_CPTD_CD_indexed_encoded: vector, BORO_NM_indexed_encoded: vector, LOC_OF_OCCUR_DESC_indexed_encoded: vector, PREM_TYP_DESC_indexed_encoded: vector, SUSP_RACE_indexed_encoded: vector, VIC_AGE_GROUP_indexed_encoded: vector, VIC_RACE_indexed_encoded: vector, VIC_SEX_indexed_encoded: vector, Time_indexed_encoded: vector, Season_indexed_encoded: vector, JURISDICTION_indexed_encoded

In [232]:
pipe2_model.transform(training_df).show(1)

+----------------+----------+-------+-----------------+-------------+---------+--------+--------+---------+-------------+--------+-------+----+------+------------+------------------------+---------------+-------------------------+---------------------+-----------------+---------------------+----------------+---------------+------------+--------------+--------------------+--------------------------------+-----------------------+---------------------------------+-----------------------------+-------------------------+-----------------------------+------------------------+-----------------------+--------------------+----------------------+----------------------------+--------------------+--------------------+--------------------+----------+
|CRM_ATPT_CPTD_CD|LAW_CAT_CD|BORO_NM|LOC_OF_OCCUR_DESC|PREM_TYP_DESC|SUSP_RACE|SUSP_SEX|Latitude|Longitude|VIC_AGE_GROUP|VIC_RACE|VIC_SEX|Time|Season|JURISDICTION|CRM_ATPT_CPTD_CD_indexed|BORO_NM_indexed|LOC_OF_OCCUR_DESC_indexed|PREM_TYP_DESC_indexed|

In [233]:
pipe2_model.stages[0].coefficients

DenseVector([-0.4808, 1.9965, 0.1688, -0.0481, 0.122, -0.0262, -0.4053, -0.1085, 0.212, -0.0956, 0.1965, 0.4174, 0.0006, -0.5297, 0.1454, -0.4542, -0.5473, 1.1796, -0.0574, -0.1874, 0.1771, 0.3511, 0.514, -0.2523, 0.086, 0.2394, 0.4455, -0.0051, 0.0564, 0.8797, -0.1628, 0.2675, 0.9524, 1.8711, -0.1479, 0.2557, 0.0293, 0.0609, -0.0283, 0.0726, 0.0517, 0.1762])

In [234]:
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='LAW_CAT_CD')
AUC = evaluator.evaluate(pipe2_model.transform(validation_df))

In [235]:
AUC

0.6727436038252652

In [236]:
from pyspark.sql.types import FloatType
from pyspark.mllib.evaluation import MulticlassMetrics
predictions = pipe2_model.transform(testing_df)
preds_and_labels = predictions.select(['prediction','LAW_CAT_CD']).withColumn('LAW_CAT_CD', fn.col('LAW_CAT_CD').cast(FloatType())).orderBy('prediction')

#select only prediction and label columns
preds_and_labels = preds_and_labels.select(['prediction','LAW_CAT_CD'])

confusion_matrix = MulticlassMetrics(preds_and_labels.rdd.map(tuple)).confusionMatrix().toArray()
confusion_matrix

array([[150527.,   2647.],
       [ 44069.,   5675.]])

In [237]:
data1.count()

2030130

In [174]:
categorical_columns= ['CRM_ATPT_CPTD_CD', 'BORO_NM', 'LOC_OF_OCCUR_DESC', 'PREM_TYP_DESC','SUSP_RACE', 'VIC_AGE_GROUP',\
                      'VIC_RACE','VIC_SEX', 'Time', 'Season', 'JURISDICTION']

In [238]:
data1.groupBy('CRM_ATPT_CPTD_CD_indexed').count().show(truncate=False)
data1.groupBy('CRM_ATPT_CPTD_CD').count().show(truncate=False)

+------------------------+-------+
|CRM_ATPT_CPTD_CD_indexed|count  |
+------------------------+-------+
|0.0                     |1996289|
|1.0                     |33841  |
+------------------------+-------+

+----------------+-------+
|CRM_ATPT_CPTD_CD|count  |
+----------------+-------+
|ATTEMPTED       |33841  |
|COMPLETED       |1996289|
+----------------+-------+



In [239]:
pipe2_model.stages[0].coefficients

DenseVector([-0.4808, 1.9965, 0.1688, -0.0481, 0.122, -0.0262, -0.4053, -0.1085, 0.212, -0.0956, 0.1965, 0.4174, 0.0006, -0.5297, 0.1454, -0.4542, -0.5473, 1.1796, -0.0574, -0.1874, 0.1771, 0.3511, 0.514, -0.2523, 0.086, 0.2394, 0.4455, -0.0051, 0.0564, 0.8797, -0.1628, 0.2675, 0.9524, 1.8711, -0.1479, 0.2557, 0.0293, 0.0609, -0.0283, 0.0726, 0.0517, 0.1762])

In [240]:
data1.groupBy('BORO_NM_indexed_encoded').count().show(truncate=False)
data1.groupBy('BORO_NM').count().show(truncate=False)

pipe2_model.stages[0].coefficients[2:]

+-----------------------+------+
|BORO_NM_indexed_encoded|count |
+-----------------------+------+
|(5,[1],[1.0])          |456921|
|(5,[4],[1.0])          |103277|
|(5,[3],[1.0])          |399562|
|(5,[0],[1.0])          |648007|
|(5,[2],[1.0])          |422363|
+-----------------------+------+

+-------------+------+
|BORO_NM      |count |
+-------------+------+
|QUEENS       |422363|
|BROOKLYN     |648007|
|BRONX        |456921|
|MANHATTAN    |399562|
|STATEN ISLAND|103277|
+-------------+------+



array([ 1.68771591e-01, -4.81195302e-02,  1.22045441e-01, -2.62021901e-02,
       -4.05267879e-01, -1.08547843e-01,  2.11967870e-01, -9.55591788e-02,
        1.96493510e-01,  4.17373673e-01,  6.25251718e-04, -5.29652904e-01,
        1.45441369e-01, -4.54191349e-01, -5.47289854e-01,  1.17964978e+00,
       -5.73908300e-02, -1.87397777e-01,  1.77072999e-01,  3.51131298e-01,
        5.13988595e-01, -2.52332898e-01,  8.59580205e-02,  2.39411555e-01,
        4.45502158e-01, -5.13165932e-03,  5.64052423e-02,  8.79670327e-01,
       -1.62810216e-01,  2.67501004e-01,  9.52445198e-01,  1.87107926e+00,
       -1.47949701e-01,  2.55653434e-01,  2.92528091e-02,  6.08905585e-02,
       -2.82654022e-02,  7.26146140e-02,  5.16622967e-02,  1.76245143e-01])

In [241]:
data1.groupBy('LOC_OF_OCCUR_DESC_indexed_encoded').count().show(truncate=False)
data1.groupBy('LOC_OF_OCCUR_DESC').count().show(truncate=False)

pipe2_model.stages[0].coefficients[7:]

+---------------------------------+-------+
|LOC_OF_OCCUR_DESC_indexed_encoded|count  |
+---------------------------------+-------+
|(2,[0],[1.0])                    |1221539|
|(2,[1],[1.0])                    |808591 |
+---------------------------------+-------+

+-----------------+-------+
|LOC_OF_OCCUR_DESC|count  |
+-----------------+-------+
|Inside           |1221539|
|Outside          |808591 |
+-----------------+-------+



array([-1.08547843e-01,  2.11967870e-01, -9.55591788e-02,  1.96493510e-01,
        4.17373673e-01,  6.25251718e-04, -5.29652904e-01,  1.45441369e-01,
       -4.54191349e-01, -5.47289854e-01,  1.17964978e+00, -5.73908300e-02,
       -1.87397777e-01,  1.77072999e-01,  3.51131298e-01,  5.13988595e-01,
       -2.52332898e-01,  8.59580205e-02,  2.39411555e-01,  4.45502158e-01,
       -5.13165932e-03,  5.64052423e-02,  8.79670327e-01, -1.62810216e-01,
        2.67501004e-01,  9.52445198e-01,  1.87107926e+00, -1.47949701e-01,
        2.55653434e-01,  2.92528091e-02,  6.08905585e-02, -2.82654022e-02,
        7.26146140e-02,  5.16622967e-02,  1.76245143e-01])

In [242]:
data1.groupBy('PREM_TYP_DESC_indexed_encoded').count().show(truncate=False)
data1.groupBy('PREM_TYP_DESC').count().show(truncate=False)

pipe2_model.stages[0].coefficients[9:]

+-----------------------------+-------+
|PREM_TYP_DESC_indexed_encoded|count  |
+-----------------------------+-------+
|(2,[0],[1.0])                |1150836|
|(2,[1],[1.0])                |879294 |
+-----------------------------+-------+

+-------------+-------+
|PREM_TYP_DESC|count  |
+-------------+-------+
|Public       |879294 |
|RESIDENCE    |1150836|
+-------------+-------+



array([-9.55591788e-02,  1.96493510e-01,  4.17373673e-01,  6.25251718e-04,
       -5.29652904e-01,  1.45441369e-01, -4.54191349e-01, -5.47289854e-01,
        1.17964978e+00, -5.73908300e-02, -1.87397777e-01,  1.77072999e-01,
        3.51131298e-01,  5.13988595e-01, -2.52332898e-01,  8.59580205e-02,
        2.39411555e-01,  4.45502158e-01, -5.13165932e-03,  5.64052423e-02,
        8.79670327e-01, -1.62810216e-01,  2.67501004e-01,  9.52445198e-01,
        1.87107926e+00, -1.47949701e-01,  2.55653434e-01,  2.92528091e-02,
        6.08905585e-02, -2.82654022e-02,  7.26146140e-02,  5.16622967e-02,
        1.76245143e-01])

In [243]:
data1.groupBy('SUSP_RACE_indexed_encoded').count().show(truncate=False)
data1.groupBy('SUSP_RACE').count().show(truncate=False)

pipe2_model.stages[0].coefficients[11:]

+-------------------------+-------+
|SUSP_RACE_indexed_encoded|count  |
+-------------------------+-------+
|(7,[6],[1.0])            |9      |
|(7,[1],[1.0])            |467670 |
|(7,[4],[1.0])            |90220  |
|(7,[2],[1.0])            |302636 |
|(7,[5],[1.0])            |9159   |
|(7,[0],[1.0])            |1020691|
|(7,[3],[1.0])            |139745 |
+-------------------------+-------+

+------------------------------+-------+
|SUSP_RACE                     |count  |
+------------------------------+-------+
|WHITE                         |302636 |
|BLACK                         |1020691|
|AMERICAN INDIAN/ALASKAN NATIVE|9159   |
|BLACK HISPANIC                |139745 |
|WHITE HISPANIC                |467670 |
|OTHER                         |9      |
|ASIAN / PACIFIC ISLANDER      |90220  |
+------------------------------+-------+



array([ 4.17373673e-01,  6.25251718e-04, -5.29652904e-01,  1.45441369e-01,
       -4.54191349e-01, -5.47289854e-01,  1.17964978e+00, -5.73908300e-02,
       -1.87397777e-01,  1.77072999e-01,  3.51131298e-01,  5.13988595e-01,
       -2.52332898e-01,  8.59580205e-02,  2.39411555e-01,  4.45502158e-01,
       -5.13165932e-03,  5.64052423e-02,  8.79670327e-01, -1.62810216e-01,
        2.67501004e-01,  9.52445198e-01,  1.87107926e+00, -1.47949701e-01,
        2.55653434e-01,  2.92528091e-02,  6.08905585e-02, -2.82654022e-02,
        7.26146140e-02,  5.16622967e-02,  1.76245143e-01])

In [244]:
data1.groupBy('VIC_AGE_GROUP_indexed_encoded').count().show(truncate=False)
data1.groupBy('VIC_AGE_GROUP').count().show(truncate=False)

pipe2_model.stages[0].coefficients[18:]

+-----------------------------+------+
|VIC_AGE_GROUP_indexed_encoded|count |
+-----------------------------+------+
|(5,[1],[1.0])                |455832|
|(5,[4],[1.0])                |75646 |
|(5,[3],[1.0])                |177962|
|(5,[0],[1.0])                |970491|
|(5,[2],[1.0])                |350199|
+-----------------------------+------+

+-------------+------+
|VIC_AGE_GROUP|count |
+-------------+------+
|<18          |177962|
|25-44        |970491|
|65+          |75646 |
|18-24        |350199|
|45-64        |455832|
+-------------+------+



array([-0.05739083, -0.18739778,  0.177073  ,  0.3511313 ,  0.51398859,
       -0.2523329 ,  0.08595802,  0.23941156,  0.44550216, -0.00513166,
        0.05640524,  0.87967033, -0.16281022,  0.267501  ,  0.9524452 ,
        1.87107926, -0.1479497 ,  0.25565343,  0.02925281,  0.06089056,
       -0.0282654 ,  0.07261461,  0.0516623 ,  0.17624514])

In [245]:
data1.groupBy('VIC_RACE_indexed_encoded').count().show(truncate=False)
data1.groupBy('VIC_RACE').count().show(truncate=False)

pipe2_model.stages[0].coefficients[23:]

+------------------------+------+
|VIC_RACE_indexed_encoded|count |
+------------------------+------+
|(7,[6],[1.0])           |12    |
|(7,[1],[1.0])           |530188|
|(7,[4],[1.0])           |112129|
|(7,[2],[1.0])           |404891|
|(7,[5],[1.0])           |12761 |
|(7,[0],[1.0])           |819423|
|(7,[3],[1.0])           |150726|
+------------------------+------+

+------------------------------+------+
|VIC_RACE                      |count |
+------------------------------+------+
|WHITE                         |404891|
|BLACK                         |819423|
|AMERICAN INDIAN/ALASKAN NATIVE|12761 |
|BLACK HISPANIC                |112129|
|WHITE HISPANIC                |530188|
|OTHER                         |12    |
|ASIAN / PACIFIC ISLANDER      |150726|
+------------------------------+------+



array([-0.2523329 ,  0.08595802,  0.23941156,  0.44550216, -0.00513166,
        0.05640524,  0.87967033, -0.16281022,  0.267501  ,  0.9524452 ,
        1.87107926, -0.1479497 ,  0.25565343,  0.02925281,  0.06089056,
       -0.0282654 ,  0.07261461,  0.0516623 ,  0.17624514])

In [246]:
data1.groupBy('VIC_SEX_indexed_encoded').count().show(truncate=False)
data1.groupBy('VIC_SEX').count().show(truncate=False)

pipe2_model.stages[0].coefficients[30:]

+-----------------------+-------+
|VIC_SEX_indexed_encoded|count  |
+-----------------------+-------+
|(4,[2],[1.0])          |145    |
|(4,[0],[1.0])          |1257200|
|(4,[3],[1.0])          |34     |
|(4,[1],[1.0])          |772751 |
+-----------------------+-------+

+-------+-------+
|VIC_SEX|count  |
+-------+-------+
|F      |1257200|
|E      |34     |
|M      |772751 |
|D      |145    |
+-------+-------+



array([-0.16281022,  0.267501  ,  0.9524452 ,  1.87107926, -0.1479497 ,
        0.25565343,  0.02925281,  0.06089056, -0.0282654 ,  0.07261461,
        0.0516623 ,  0.17624514])

In [247]:
data1.groupBy('Time_indexed_encoded').count().show(truncate=False)
data1.groupBy('Time').count().show(truncate=False)

pipe2_model.stages[0].coefficients[34:]

+--------------------+-------+
|Time_indexed_encoded|count  |
+--------------------+-------+
|(2,[0],[1.0])       |1301785|
|(2,[1],[1.0])       |728345 |
+--------------------+-------+

+-----+-------+
|Time |count  |
+-----+-------+
|Night|728345 |
|Day  |1301785|
+-----+-------+



array([-0.1479497 ,  0.25565343,  0.02925281,  0.06089056, -0.0282654 ,
        0.07261461,  0.0516623 ,  0.17624514])

In [248]:
data1.groupBy('Season_indexed_encoded').count().show(truncate=False)
data1.groupBy('Season').count().show(truncate=False)

pipe2_model.stages[0].coefficients[36:]

+----------------------+------+
|Season_indexed_encoded|count |
+----------------------+------+
|(4,[2],[1.0])         |503746|
|(4,[0],[1.0])         |550342|
|(4,[3],[1.0])         |458681|
|(4,[1],[1.0])         |517361|
+----------------------+------+

+------+------+
|Season|count |
+------+------+
|Spring|503746|
|Summer|550342|
|Autumn|517361|
|Winter|458681|
+------+------+



array([ 0.02925281,  0.06089056, -0.0282654 ,  0.07261461,  0.0516623 ,
        0.17624514])

In [249]:
data1.groupBy('JURISDICTION_indexed_encoded').count().show(truncate=False)
data1.groupBy('JURISDICTION').count().show(truncate=False)

pipe2_model.stages[0].coefficients[40:]

+----------------------------+-------+
|JURISDICTION_indexed_encoded|count  |
+----------------------------+-------+
|(2,[0],[1.0])               |1778511|
|(2,[1],[1.0])               |251619 |
+----------------------------+-------+

+------------+-------+
|JURISDICTION|count  |
+------------+-------+
|Other       |251619 |
|Police      |1778511|
+------------+-------+



array([0.0516623 , 0.17624514])

In [250]:
pipe2_model.stages[0].intercept

-1.0900373470589302