# Recommendation using RF

## FBI Code for Police records

You are provided with the dataset that contains the crime records from Chicago. The dataset belongs to Chicago Police Department. This dataset reflects reported incidents of crime (with the exception of murders where data exists for each victim) that occurred in the City of Chicago from 2012 to 2017. The data is extracted from the Chicago Police Department's CLEAR (Citizen Law Enforcement Analysis and Reporting) system.

Columns in the Dataset:

**ID** - Unique identifier for the record.

**Case Number** - The Chicago Police Department RD Number (Records Division Number), which is unique to the incident.

**Date** - Date when the incident occurred. this is sometimes a best estimate.

**Block** - The partially redacted address where the incident occurred, placing it on the same block as the actual address.

**IUCR** - The Illinois Unifrom Crime Reporting code. This is directly linked to the Primary Type and Description.

**Primary Type** - The primary description of the IUCR code.

**Description** - The secondary description of the IUCR code, a subcategory of the primary description.

**Location Description** - Description of the location where the incident occurred.

**Arrest** - Indicates whether an arrest was made.

**Domestic** - Indicates whether the incident was domestic-related as defined by the Illinois Domestic Violence Act.

**Beat** - Indicates the beat where the incident occurred. A beat is the smallest police geographic area – each beat has a dedicated police beat car. Three to five beats make up a police sector, and three sectors make up a police district. The Chicago Police Department has 22 police districts.

**District** - Indicates the police district where the incident occurred.

**Ward** - The ward (City Council district) where the incident occurred.

**Community Area** - Indicates the community area where the incident occurred. Chicago has 77 community areas.

**FBI Code** - Indicates the crime classification as outlined in the FBI's National Incident-Based Reporting System (NIBRS).

**X Coordinate** - The x coordinate of the location where the incident occurred in State Plane Illinois East NAD 1983 projection. This location is shifted from the actual location for partial redaction but falls on the same block.

**Y Coordinate** - The y coordinate of the location where the incident occurred in State Plane Illinois East NAD 1983 projection. This location is shifted from the actual location for partial redaction but falls on the same block.

**Year** - Year the incident occurred.

**Updated On** - Date and time the record was last updated.

**Latitude** - The latitude of the location where the incident occurred. This location is shifted from the actual location for partial redaction but falls on the same block.

**Longitude** - The longitude of the location where the incident occurred. This location is shifted from the actual location for partial redaction but falls on the same block.

**Location** - The location where the incident occurred in a format that allows for creation of maps and other geographic operations on this data portal. This location is shifted from the actual location for partial redaction but falls on the same block.

Our objective is to use the information and try to come up with a system that recommends the **FBI Code** for each crime absed on the given information.

### Initialising the Spark session

In [1]:
%%configure -f
{ "conf":{
          "spark.pyspark.python": "python3",
          "spark.pyspark.virtualenv.enabled": "true",
          "spark.pyspark.virtualenv.type":"native",
          "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv",
          "spark.driver.memory": "6000M"}
}

In [2]:
# Initialising the Spark session
spark.sparkContext.getConf().get('spark.driver.memory')

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1638805202650_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'6000M'

### Installing the requried libraries

In [3]:
# check the libraries in already installed on the cluster 
spark.sparkContext.list_packages()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Package                    Version
-------------------------- -------
beautifulsoup4             4.8.1
boto                       2.49.0
jmespath                   0.9.4
lxml                       4.4.2
mysqlclient                1.4.6
nltk                       3.4.5
nose                       1.3.4
numpy                      1.14.5
pip                        21.3.1
py-dateutil                2.2
python36-sagemaker-pyspark 1.2.6
pytz                       2019.3
PyYAML                     3.11
setuptools                 59.5.0
six                        1.13.0
soupsieve                  1.9.5
wheel                      0.37.0
windmill                   1.6

In [4]:
spark.sparkContext.install_pypi_package("pandas==0.25.1") #Install pandas version 0.25.1 
spark.sparkContext.install_pypi_package("matplotlib", "https://pypi.org/simple") #Install matplotlib from given PyPI repository

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting pandas==0.25.1
  Downloading pandas-0.25.1-cp36-cp36m-manylinux1_x86_64.whl (10.5 MB)
Collecting python-dateutil>=2.6.1
  Downloading python_dateutil-2.8.2-py2.py3-none-any.whl (247 kB)
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-0.25.1 python-dateutil-2.8.2

Collecting matplotlib
  Downloading matplotlib-3.3.4-cp36-cp36m-manylinux1_x86_64.whl (11.5 MB)
Collecting pillow>=6.2.0
  Downloading Pillow-8.4.0-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
Collecting numpy>=1.15
  Downloading numpy-1.19.5-cp36-cp36m-manylinux2010_x86_64.whl (14.8 MB)
Collecting kiwisolver>=1.0.1
  Downloading kiwisolver-1.3.1-cp36-cp36m-manylinux1_x86_64.whl (1.1 MB)
Collecting cycler>=0.10
  Downloading cycler-0.11.0-py3-none-any.whl (6.4 kB)
Collecting pyparsing!=2.0.4,!=2.1.2,!=2.1.6,>=2.0.3
  Downloading pyparsing-3.0.6-py3-none-any.whl (97 kB)
Installing collected packages: pyparsing, pillow, numpy, kiwisolver, cycler, matplotlib
  

### Loading the dataset

In [5]:
# Data stored in Public S3 Bucket 'chicago-crime-mlc' as 'Chicago_Crimes_2012_to_2017.csv'
df=spark.read.csv('s3a://chicago-crime-mlc/Chicago_Crimes_2012_to_2017.csv',\
                  header=True,inferSchema=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Printing the first row
df.head(1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(_c0='3', ID='10508693', Case Number='HZ250496', Date='05/03/2016 11:40:00 PM', Block='013XX S SAWYER AVE', IUCR='0486', Primary Type='BATTERY', Description='DOMESTIC BATTERY SIMPLE', Location Description='APARTMENT', Arrest='True', Domestic='True', Beat='1022', District='10.0', Ward='24.0', Community Area='29.0', FBI Code='08B', X Coordinate='1154907.0', Y Coordinate='1893681.0', Year='2016', Updated On='05/10/2016 03:56:50 PM', Latitude='41.864073157', Longitude='-87.706818608', Location='(41.864073157, -87.706818608)')]

In [7]:
# Schema of the dataset
df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- _c0: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: string (nullable = true)
 |-- Beat: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- Community Area: string (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Location: string (nullable = true)

In [8]:
# Number of rows and columns in the dataset
print(df.count(), len(df.columns))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1456714 23

### Data Exploration and Cleaning

#### Dropping columns

In [9]:
# Columns in dataframe
df.columns

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['_c0', 'ID', 'Case Number', 'Date', 'Block', 'IUCR', 'Primary Type', 'Description', 'Location Description', 'Arrest', 'Domestic', 'Beat', 'District', 'Ward', 'Community Area', 'FBI Code', 'X Coordinate', 'Y Coordinate', 'Year', 'Updated On', 'Latitude', 'Longitude', 'Location']

In [10]:
# Dropping the variables that will not contribute to the final model
df = df.drop('_c0', 'Case Number', "Updated On")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# Checking the number distinct values in each attribute
from pyspark.sql.functions import col, countDistinct
df.agg(*(countDistinct(col(c)).alias(c) for c in df.columns)).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+------+-----+----+------------+-----------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------+---------+--------+
|     ID|  Date|Block|IUCR|Primary Type|Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|Latitude|Longitude|Location|
+-------+------+-----+----+------------+-----------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------+---------+--------+
|1456714|582146|32774| 365|          33|        342|                 142|     2|       2| 302|      24|  50|            78|      26|       67714|      111555|   6|  368076|   367942|  368286|
+-------+------+-----+----+------------+-----------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------+---------+--------+

In [13]:
# Variables associated with location:
# "Block", "Beat", "District", "Ward", "Community Area", "X Coordinate", "Y Coordinate", "Latitude", "Longitude", "Location"
# Dropping redundant variables
df = df.drop("Block", "Beat", "Ward", "Community Area", "Location")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
# Variables associated with the crime type:
# "IUCR", "Primary Type", "Description"
df = df.drop("IUCR", "Description")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
# Columns and rows in the update dataFrame
print(df.count(), len(df.columns))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1456714 13

#### Handling null values

In [17]:
# Counting the number of null values in each column
from pyspark.sql.functions import when, count, col, isnull
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+----+------------+--------------------+------+--------+--------+--------+------------+------------+----+--------+---------+
| ID|Date|Primary Type|Location Description|Arrest|Domestic|District|FBI Code|X Coordinate|Y Coordinate|Year|Latitude|Longitude|
+---+----+------------+--------------------+------+--------+--------+--------+------------+------------+----+--------+---------+
|  0|   0|           0|                1658|     0|       0|       1|       0|       37083|       37083|   0|   37083|    37083|
+---+----+------------+--------------------+------+--------+--------+--------+------------+------------+----+--------+---------+

In [18]:
# Dropping the rows with null values
df = df.na.drop()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
# Check if the null values are dropped
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+----+------------+--------------------+------+--------+--------+--------+------------+------------+----+--------+---------+
| ID|Date|Primary Type|Location Description|Arrest|Domestic|District|FBI Code|X Coordinate|Y Coordinate|Year|Latitude|Longitude|
+---+----+------------+--------------------+------+--------+--------+--------+------------+------------+----+--------+---------+
|  0|   0|           0|                   0|     0|       0|       0|       0|           0|           0|   0|       0|        0|
+---+----+------------+--------------------+------+--------+--------+--------+------------+------------+----+--------+---------+

#### Correction in column type

In [20]:
# Column type
df.dtypes

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[('ID', 'string'), ('Date', 'string'), ('Primary Type', 'string'), ('Location Description', 'string'), ('Arrest', 'string'), ('Domestic', 'string'), ('District', 'string'), ('FBI Code', 'string'), ('X Coordinate', 'string'), ('Y Coordinate', 'string'), ('Year', 'string'), ('Latitude', 'string'), ('Longitude', 'string')]

In [21]:
# Changing the type of column Date to timestamp
from pyspark.sql.functions import to_timestamp
df = df.withColumn('Date_Time', to_timestamp('Date', 'MM/dd/yyy HH:mm:ss'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
# Extracting 'hour' from the dataset
from pyspark.sql.functions import hour
df = df.withColumn('hour', hour(df['Date_Time']))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
# Dropping the columns: Date, Date_Time
df = df.drop('Date', 'Date_Time')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
# Column type
df.dtypes

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[('ID', 'string'), ('Primary Type', 'string'), ('Location Description', 'string'), ('Arrest', 'string'), ('Domestic', 'string'), ('District', 'string'), ('FBI Code', 'string'), ('X Coordinate', 'string'), ('Y Coordinate', 'string'), ('Year', 'string'), ('Latitude', 'string'), ('Longitude', 'string'), ('hour', 'int')]

In [30]:
# Changing the required columns from string type to numerical 
from pyspark.sql.types import FloatType, IntegerType

df = df.withColumn('X Coordinate', df['X Coordinate'].cast(FloatType()))
df = df.withColumn('Y Coordinate', df['Y Coordinate'].cast(FloatType()))
df = df.withColumn('Latitude', df['Latitude'].cast(FloatType()))
df = df.withColumn('Longitude', df['Longitude'].cast(FloatType()))
df = df.withColumn('Year', df['Year'].cast(IntegerType()))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
# Checking the data type of each column
df.dtypes

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[('ID', 'string'), ('Primary Type', 'string'), ('Location Description', 'string'), ('Arrest', 'string'), ('Domestic', 'string'), ('District', 'string'), ('FBI Code', 'string'), ('X Coordinate', 'float'), ('Y Coordinate', 'float'), ('Year', 'int'), ('Latitude', 'float'), ('Longitude', 'float'), ('hour', 'int')]

#### Exploring the target variable

In [32]:
# Spread of data in FBI Code
df.groupBy('FBI Code').count().orderBy("count", ascending=False).collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(FBI Code='06', count=321960), Row(FBI Code='08B', count=222988), Row(FBI Code='14', count=152816), Row(FBI Code='26', count=134437), Row(FBI Code='18', count=125762), Row(FBI Code='05', count=81671), Row(FBI Code='08A', count=66801), Row(FBI Code='07', count=59858), Row(FBI Code='11', count=59250), Row(FBI Code='03', count=56093), Row(FBI Code='04B', count=35956), Row(FBI Code='04A', count=23380), Row(FBI Code='24', count=17080), Row(FBI Code='15', count=17070), Row(FBI Code='10', count=7783), Row(FBI Code='16', count=7585), Row(FBI Code='02', count=6756), Row(FBI Code='20', count=6303), Row(FBI Code='17', count=5387), Row(FBI Code='01A', count=2578), Row(FBI Code='19', count=2214), Row(FBI Code='09', count=2158), Row(FBI Code='22', count=1928), Row(FBI Code='13', count=393), Row(FBI Code='12', count=185), Row(FBI Code='01B', count=12)]

In [33]:
# Storing in a pandas dataframe for visualisation
fbi_df = df.groupBy('FBI Code').count().orderBy("count", ascending=False).toPandas()
print(fbi_df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

   FBI Code   count
0        06  321960
1       08B  222988
2        14  152816
3        26  134437
4        18  125762
5        05   81671
6       08A   66801
7        07   59858
8        11   59250
9        03   56093
10      04B   35956
11      04A   23380
12       24   17080
13       15   17070
14       10    7783
15       16    7585
16       02    6756
17       20    6303
18       17    5387
19      01A    2578
20       19    2214
21       09    2158
22       22    1928
23       13     393
24       12     185
25      01B      12

In [38]:
spark.sparkContext.install_pypi_package("numpy==1.15")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Package already installed for current Spark context!
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 1110, in install_pypi_package
    raise ValueError("Package already installed for current Spark context!")
ValueError: Package already installed for current Spark context!



In [39]:
# import matplot lib 
import matplotlib.pyplot as plt

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Matplotlib requires numpy>=1.15; you have 1.14.5
Traceback (most recent call last):
  File "/tmp/1638805827415-0/local/lib64/python3.6/site-packages/matplotlib/__init__.py", line 174, in <module>
    _check_versions()
  File "/tmp/1638805827415-0/local/lib64/python3.6/site-packages/matplotlib/__init__.py", line 171, in _check_versions
    .format(modname, minver, module.__version__))
ImportError: Matplotlib requires numpy>=1.15; you have 1.14.5



In [None]:
# create the plot


# display the plot
%matplot plt

### Model Building

In [40]:
# Storing the cleaned df in a copy variable
df_copy = df

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [41]:
#  Columns in the dataframe
df.columns

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['ID', 'Primary Type', 'Location Description', 'Arrest', 'Domestic', 'District', 'FBI Code', 'X Coordinate', 'Y Coordinate', 'Year', 'Latitude', 'Longitude', 'hour']

In [42]:
# Storing the categorical and continuous columns in different lists
categorical_features = ['Primary Type', 'Location Description', 'Arrest', 'Domestic', 'District', 'hour']
continuous_features = ['X Coordinate', 'Y Coordinate', 'Year', 'Latitude', 'Longitude']

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [44]:
# Importing the libraries for data transormation
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [45]:
# Initialising the variable 'stages' to store every step for building a pipeline
stages = []

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [46]:
# Building a function for encoding all the categorical variables
for categoricalCol in categorical_features:
    stringIndexer= StringIndexer(inputCol = categoricalCol, outputCol=categoricalCol+'Index')
    encoder = OneHotEncoderEstimator(inputCols = [stringIndexer.getOutputCol()], outputCols = [categoricalCol+'classVec'])
    stages += [stringIndexer, encoder]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [47]:
# Encoding the target variable as label
label_stringIdx = StringIndexer(inputCol = 'FBI Code', outputCol = 'label')
stages += [label_stringIdx]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [48]:
# Building a function for generating a vector of all features
assemblerInputs = [c + 'classVec' for c in categorical_features]+ continuous_features
assembler = VectorAssembler(inputCols = assemblerInputs, outputCol= 'features')
stages += [assembler]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [49]:
# Loading all the steps in a pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages = stages)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [50]:
# Fitting the steps on the dataFrame
pipelineModel = pipeline.fit(df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [51]:
# Transforming the dataframe
df = pipelineModel.transform(df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [52]:
# Checking the transformed dataFrame
df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- ID: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: string (nullable = true)
 |-- District: string (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: float (nullable = true)
 |-- Y Coordinate: float (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Latitude: float (nullable = true)
 |-- Longitude: float (nullable = true)
 |-- hour: integer (nullable = true)
 |-- Primary TypeIndex: double (nullable = false)
 |-- Primary TypeclassVec: vector (nullable = true)
 |-- Location DescriptionIndex: double (nullable = false)
 |-- Location DescriptionclassVec: vector (nullable = true)
 |-- ArrestIndex: double (nullable = false)
 |-- ArrestclassVec: vector (nullable = true)
 |-- DomesticIndex: double (nullable = false)
 |-- DomesticclassVec: vector (nullable = true)
 |-- DistrictIndex: double (nullable = false)
 |-- Districtclass

### Model Fitting

In [53]:
# Splitting the dataFrame into training and testing set
train, test = df.randomSplit([0.7, 0.3], seed = 2020)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [54]:
# Number of observations in training set
train.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

992636

In [55]:
# Number of observations in test set
test.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

425768

In [57]:
# Building the RF model
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label', \
                            maxDepth=5, impurity='gini', numTrees=25, seed=2020, subsamplingRate = 0.7)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [58]:
# Fitting the model over the training set
rfModel = rf.fit(train)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [59]:
# Printing the forest obtained from the model
print(rfModel.toDebugString)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

RandomForestClassificationModel (uid=RandomForestClassifier_1cfc4fa8322f) with 25 trees
  Tree 0 (weight 1.0):
    If (feature 171 in {0.0})
     If (feature 0 in {0.0})
      If (feature 4 in {0.0})
       If (feature 38 in {0.0})
        Predict: 4.0
       Else (feature 38 not in {0.0})
        If (feature 21 in {0.0})
         Predict: 4.0
        Else (feature 21 not in {0.0})
         Predict: 22.0
      Else (feature 4 not in {0.0})
       If (feature 187 in {0.0})
        Predict: 6.0
       Else (feature 187 not in {0.0})
        If (feature 38 in {0.0})
         Predict: 6.0
        Else (feature 38 not in {0.0})
         Predict: 11.0
     Else (feature 0 not in {0.0})
      Predict: 0.0
    Else (feature 171 not in {0.0})
     If (feature 2 in {0.0})
      If (feature 11 in {0.0})
       If (feature 4 in {0.0})
        If (feature 1 in {0.0})
         Predict: 0.0
        Else (feature 1 not in {0.0})
         Predict: 1.0
       Else (feature 4 not in {0.0})
        Predic

### Model Prediction and Evaluation

In [60]:
# Applying the model on test set
predictions = rfModel.transform(test)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [61]:
# Printing the required columns
predictions.select('ID', 'label', 'rawPrediction', 'prediction', 'probability').show(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+-----+--------------------+----------+--------------------+
|      ID|label|       rawPrediction|prediction|         probability|
+--------+-----+--------------------+----------+--------------------+
|10000348|  8.0|[6.97922569810444...|       0.0|[0.27916902792417...|
|10000517|  3.0|[2.66048014920260...|       3.0|[0.10641920596810...|
|10001141|  8.0|[5.06829749573779...|       0.0|[0.20273189982951...|
|10001170|  8.0|[5.11395324938361...|       0.0|[0.20455812997534...|
|10001210|  2.0|[1.34168727386628...|       2.0|[0.05366749095465...|
|10001230|  3.0|[3.39524124653722...|       3.0|[0.13580964986148...|
|10002193|  3.0|[3.45221459111338...|       3.0|[0.13808858364453...|
|10002782| 16.0|[4.77911409072262...|       0.0|[0.19116456362890...|
|10002882|  3.0|[2.97855425843603...|       3.0|[0.11914217033744...|
|10003041|  0.0|[12.0847318042244...|       0.0|[0.48338927216897...|
+--------+-----+--------------------+----------+--------------------+
only showing top 10 

In [62]:
# Model evaluation
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [63]:
# Model Accuracy - 25 trees
print(accuracy)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0.770971515003476

In [64]:
# Test Error
print("Test Error = %g" % (1.0 - accuracy))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Test Error = 0.229028

In [65]:
# Confusion Matrix
from pyspark.sql.types import FloatType
from pyspark.mllib.evaluation import MulticlassMetrics
import pyspark.sql.functions as F

#important: need to cast to float type, and order by prediction, else it won't work:
preds_and_labels = predictions.select(['prediction','label']).withColumn('label', F.col('label').cast(FloatType())).orderBy('prediction')

#select only prediction and label columns
preds_and_labels = preds_and_labels.select(['prediction','label'])
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

print(metrics.confusionMatrix().toArray())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[[9.6387e+04 0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00
  0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00
  0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00
  0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00
  0.0000e+00 0.0000e+00]
 [0.0000e+00 6.7122e+04 0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00
  0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00
  0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00
  0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00
  0.0000e+00 0.0000e+00]
 [0.0000e+00 0.0000e+00 4.6009e+04 0.0000e+00 0.0000e+00 0.0000e+00
  0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00
  0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00
  0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00 0.0000e+00
  0.0000e+00 0.0000e+00]
 [1.2594e+04 3.0000e+01 0.0000e+00 2.5799e+04 1.7610e+03 0.0000e+00
  0.0000e+00 0.0000e+00 0.0000e+00 0.0000

In [66]:
#Model building with 35 trees
from pyspark.ml.classification import RandomForestClassifier
rf35 = RandomForestClassifier(featuresCol = 'features', labelCol = 'label', \
                            maxDepth=5, impurity='gini', numTrees=25, seed=2020, subsamplingRate = 0.7)
rfModel35 = rf35.fit(train)
predictions35 = rfModel35.transform(test)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy35 = evaluator.evaluate(predictions35)
# Model Accuracy - 15 trees
print(accuracy35)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0.770971515003476

### Feature Importance

In [67]:
# Feature Importance
rfModel.featureImportances

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparseVector(212, {0: 0.14, 1: 0.1284, 2: 0.2186, 3: 0.0591, 4: 0.0341, 5: 0.1093, 6: 0.0749, 7: 0.0271, 8: 0.022, 9: 0.0646, 10: 0.0038, 11: 0.0126, 12: 0.0028, 13: 0.0056, 14: 0.0059, 15: 0.0026, 16: 0.0002, 17: 0.0, 18: 0.0005, 19: 0.0009, 20: 0.0, 21: 0.0005, 22: 0.0, 24: 0.0001, 32: 0.0037, 33: 0.0018, 34: 0.0044, 35: 0.0042, 36: 0.0, 38: 0.0004, 40: 0.0007, 41: 0.0, 42: 0.0014, 43: 0.0, 44: 0.0, 46: 0.0074, 47: 0.0004, 51: 0.0, 56: 0.0, 60: 0.0001, 66: 0.0, 73: 0.0, 76: 0.0, 87: 0.0, 89: 0.0, 90: 0.0, 96: 0.0, 102: 0.0, 171: 0.049, 172: 0.0106, 173: 0.001, 176: 0.0, 181: 0.0, 183: 0.0, 187: 0.0, 188: 0.0, 189: 0.0, 190: 0.0002, 196: 0.0, 197: 0.0, 199: 0.0, 204: 0.0, 205: 0.0, 207: 0.0001, 208: 0.0003, 209: 0.0001, 210: 0.0003, 211: 0.0})

In [68]:
# Defining a function to extract features along with the feature importance score
import pandas as pd
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [69]:
# Printing the feature importance scores
ExtractFeatureImp(rfModel.featureImportances, predictions, "features").head(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

     idx                                     name     score
7      2     Primary TypeclassVec_CRIMINAL DAMAGE  0.218598
5      0               Primary TypeclassVec_THEFT  0.139971
6      1             Primary TypeclassVec_BATTERY  0.128359
10     5       Primary TypeclassVec_OTHER OFFENSE  0.109329
11     6            Primary TypeclassVec_BURGLARY  0.074927
14     9             Primary TypeclassVec_ROBBERY  0.064582
8      3           Primary TypeclassVec_NARCOTICS  0.059097
176  171                     ArrestclassVec_False  0.049018
9      4             Primary TypeclassVec_ASSAULT  0.034138
12     7  Primary TypeclassVec_DECEPTIVE PRACTICE  0.027123