## Chicago Crime Prediction Analysis

In [7]:
%%init_spark
launcher.master="yarn"
launcher.num_executors=6
launcher.executor_memory="6000m"
launcher.executor_cores=2

In [8]:
val data_df =spark.read.option("header","true").option("delimiter",",").option("inferschema", "true").option("escape","\"").csv("/Chicago.csv")


data_df: org.apache.spark.sql.DataFrame = [_c0: int, ID: int ... 21 more fields]


Count of Missing or Null Values

In [9]:
import org.apache.spark.sql.functions.{sum, col}



import org.apache.spark.sql.functions.{sum, col}


In [10]:
data_df.select(data_df.columns.map(c => sum(col(c).isNull.cast("int")).alias(c)): _*).show


+---+---+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------+--------+---------+--------+
|_c0| ID|Case Number|Date|Block|IUCR|Primary Type|Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|Updated On|Latitude|Longitude|Location|
+---+---+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------+--------+---------+--------+
|  0|  0|          1|   0|    0|   0|           0|          0|                1658|     0|       0|   0|       1|  14|            40|       0|       37083|       37083|   0|         0|   37083|    37083|   37083|
+---+---+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+----+--------------+--------+------

In [11]:
val data_filtered_null_val_df = data_df.na.drop

data_filtered_null_val_df: org.apache.spark.sql.DataFrame = [_c0: int, ID: int ... 21 more fields]


In [12]:
data_filtered_null_val_df.select(data_filtered_null_val_df.columns.map(c => sum(col(c).isNull.cast("int")).alias(c)): _*).show


+---+---+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------+--------+---------+--------+
|_c0| ID|Case Number|Date|Block|IUCR|Primary Type|Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|Updated On|Latitude|Longitude|Location|
+---+---+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+----------+--------+---------+--------+
|  0|  0|          0|   0|    0|   0|           0|          0|                   0|     0|       0|   0|       0|   0|             0|       0|           0|           0|   0|         0|       0|        0|       0|
+---+---+-----------+----+-----+----+------------+-----------+--------------------+------+--------+----+--------+----+--------------+--------+------

Total Number of Rows with Null values

In [13]:

data_df.count()

res2: Long = 1456714


Total Number of Rows after Filtering Null Values which is 2.5% of the whole dataset

In [14]:
data_filtered_null_val_df.count()

res3: Long = 1418365


Converting all the boolean values to numeric

In [15]:
val data_without_bool_df = data_filtered_null_val_df.withColumn("Arrest",when(col("Arrest").equalTo("True"),1).otherwise(when(col("Arrest").equalTo("False"),0))).withColumn("Domestic",when(col("Domestic").equalTo("True"),1).otherwise(when(col("Domestic").equalTo("False"),0)))

data_without_bool_df: org.apache.spark.sql.DataFrame = [_c0: int, ID: int ... 21 more fields]


# DownSampling the Data.

In [16]:
data_without_bool_df.groupBy("Arrest").count().show(10)
val down_sampled_df = data_without_bool_df.stat.sampleBy("Arrest", Map(0 -> 0.03542959664, 1 -> 0.1),111)
down_sampled_df.groupBy("Arrest").count().show(10)

+------+-------+
|Arrest|  count|
+------+-------+
|     1| 371057|
|     0|1047308|
+------+-------+

+------+-----+
|Arrest|count|
+------+-----+
|     1|36845|
|     0|36848|
+------+-----+



down_sampled_df: org.apache.spark.sql.DataFrame = [_c0: int, ID: int ... 21 more fields]


In [17]:
import org.apache.spark.ml.attribute.Attribute
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}
import org.apache.spark.ml.{Pipeline, PipelineModel}

import org.apache.spark.ml.attribute.Attribute
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}
import org.apache.spark.ml.{Pipeline, PipelineModel}


In [18]:
val featureCol = down_sampled_df.columns
   
var indexers: Array[StringIndexer] = Array()

for (colName <- featureCol)
    {
      val index = new StringIndexer()
        .setInputCol(colName)
        .setOutputCol(colName + "_indexed")
        
        indexers = indexers :+ index
    }

featureCol: Array[String] = Array(_c0, ID, Case Number, Date, Block, IUCR, Primary Type, Description, Location Description, Arrest, Domestic, Beat, District, Ward, Community Area, FBI Code, X Coordinate, Y Coordinate, Year, Updated On, Latitude, Longitude, Location)
indexers: Array[org.apache.spark.ml.feature.StringIndexer] = Array(strIdx_0ea7ad4e019e, strIdx_0ec6653b47e4, strIdx_a41e1ecd250c, strIdx_903a6460838f, strIdx_0ad159eafce7, strIdx_8d15ef15a95f, strIdx_b7d5d1e38b8e, strIdx_89b1da305869, strIdx_9037efdf0867, strIdx_8b755fc4a503, strIdx_a2f29f42f7e9, strIdx_35939b224499, strIdx_bf7213ffc4b6, strIdx_1aa76edbffb2, strIdx_5a90af697f2b, strIdx_b8d2c90c52b1, strIdx_a2fde87ddd11, strIdx_962c1c2f5ff2, strIdx_3f6a9564d98b, strIdx_6a3c83c65155, strIdx_89e85751ff4f, strIdx_7ee570908b6e, s...

In [19]:
    val pipeline = new Pipeline().setStages(indexers)      


pipeline: org.apache.spark.ml.Pipeline = pipeline_269098adcbbf


In [20]:

   
   val pipeline_fitted_downsampled = pipeline.fit(down_sampled_df)
    
  //  indexed_downsampled_DF.show()


pipeline_fitted_downsampled: org.apache.spark.ml.PipelineModel = pipeline_269098adcbbf


<console>: 200: error: value show is not a member of org.apache.spark.ml.PipelineModel

In [21]:
import org.apache.spark.ml.feature.VectorAssembler


import org.apache.spark.ml.feature.VectorAssembler


In [22]:
val assembler = new VectorAssembler() .setInputCols(Array("IUCR_indexed","Primary Type_indexed","Description_indexed","Location Description_indexed","Domestic_indexed","Beat_indexed","District_indexed","Ward_indexed","Community Area_indexed","FBI Code_indexed")) 
                .setOutputCol("features")



assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_44c8349584e9


In [23]:
val pipeline_1 = new Pipeline().setStages(Array(pipeline_fitted_downsampled,assembler))      

val features_DF = pipeline_1.fit(down_sampled_df).transform(down_sampled_df)
   // indexedDF2.select("features").show()
 //new Pipeline().setStages(Array(tokenizer,puncRemover,stopWordRemover, stemmer, vectorizer, tfidf,cv_lr))

pipeline_1: org.apache.spark.ml.Pipeline = pipeline_d426bd64bc50
features_DF: org.apache.spark.sql.DataFrame = [_c0: int, ID: int ... 45 more fields]


# ChiSqSelector to get the top 5 features

In [26]:
features_DF.select("Features","Arrest_indexed").show(false)

+-------------------------------------------------+--------------+
|Features                                         |Arrest_indexed|
+-------------------------------------------------+--------------+
|[0.0,1.0,1.0,2.0,1.0,112.0,8.0,7.0,24.0,2.0]     |0.0           |
|[12.0,10.0,11.0,0.0,0.0,57.0,13.0,4.0,11.0,9.0]  |0.0           |
|[7.0,0.0,7.0,6.0,0.0,45.0,8.0,18.0,4.0,0.0]      |0.0           |
|[12.0,10.0,11.0,0.0,0.0,172.0,19.0,43.0,31.0,9.0]|0.0           |
|[16.0,6.0,15.0,15.0,0.0,211.0,7.0,34.0,20.0,6.0] |0.0           |
|[2.0,0.0,3.0,59.0,0.0,22.0,13.0,2.0,11.0,0.0]    |0.0           |
|[77.0,7.0,70.0,3.0,0.0,128.0,20.0,41.0,68.0,7.0] |0.0           |
|[11.0,8.0,10.0,7.0,0.0,8.0,3.0,9.0,0.0,3.0]      |1.0           |
|[11.0,8.0,10.0,4.0,0.0,78.0,8.0,18.0,4.0,3.0]    |1.0           |
|[19.0,9.0,19.0,5.0,0.0,72.0,0.0,1.0,0.0,8.0]     |0.0           |
|[13.0,0.0,12.0,8.0,0.0,1.0,0.0,3.0,1.0,0.0]      |0.0           |
|[13.0,0.0,12.0,2.0,0.0,48.0,8.0,6.0,12.0,0.0]    |0.0        

In [213]:
features_DF.createOrReplaceTempView("num_table")



In [216]:
val numeric_features= spark.sql("select IUCR_indexed,`Primary Type_indexed`,Description_indexed,Location Description_indexed,Domestic_indexed,Beat_indexed,District_indexed,Ward_indexed,`Community Area_indexed`,`FBI Code_indexed` from num_table")


numeric_features: org.apache.spark.sql.DataFrame = [IUCR_indexed: double, Primary Type_indexed: double ... 8 more fields]


In [143]:
import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors

val selector = new ChiSqSelector()
.setSelectorType("fpr")
.setFpr(0.01)
  .setFeaturesCol("features")
  .setLabelCol("Arrest_indexed")
  .setOutputCol("selectedFeatures")


val features_top_DF = selector.fit(features_DF).transform(features_DF)


val show_feature_name_df = selector.fit(features_DF)











//println(s"ChiSqSelector output with top ${selector.getNumTopFeatures} features selected")
//features_top5_DF.select("selectedFeatures","features").show(false)


import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors
selector: org.apache.spark.ml.feature.ChiSqSelector = chiSqSelector_181b0bd49d74
features_top_DF: org.apache.spark.sql.DataFrame = [_c0: int, ID: int ... 46 more fields]
show_feature_name_df: org.apache.spark.ml.feature.ChiSqSelectorModel = chiSqSelector_181b0bd49d74


In [150]:
val feature_name = show_feature_name_df.selectedFeatures






feature_name: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)


# All Features selected by CHi Square selector

In [151]:
feature_name.foreach(println)

0
1
2
3
4
5
6
7
8
9


In [152]:
features_top_DF.cache()

res46: features_top_DF.type = [_c0: int, ID: int ... 46 more fields]


# Logistic Regression

In [153]:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.tuning._

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.tuning._


In [154]:
val lr = new LogisticRegression().setLabelCol("Arrest_indexed").setFeaturesCol("selectedFeatures")

lr: org.apache.spark.ml.classification.LogisticRegression = logreg_34c53c1821ef


In [155]:
val paramGrid =new ParamGridBuilder()
             .addGrid(lr.regParam, Array(0.01, 0.5, 2.0))
             .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
             .build()

paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	logreg_34c53c1821ef-elasticNetParam: 0.0,
	logreg_34c53c1821ef-regParam: 0.01
}, {
	logreg_34c53c1821ef-elasticNetParam: 0.5,
	logreg_34c53c1821ef-regParam: 0.01
}, {
	logreg_34c53c1821ef-elasticNetParam: 1.0,
	logreg_34c53c1821ef-regParam: 0.01
}, {
	logreg_34c53c1821ef-elasticNetParam: 0.0,
	logreg_34c53c1821ef-regParam: 0.5
}, {
	logreg_34c53c1821ef-elasticNetParam: 0.5,
	logreg_34c53c1821ef-regParam: 0.5
}, {
	logreg_34c53c1821ef-elasticNetParam: 1.0,
	logreg_34c53c1821ef-regParam: 0.5
}, {
	logreg_34c53c1821ef-elasticNetParam: 0.0,
	logreg_34c53c1821ef-regParam: 2.0
}, {
	logreg_34c53c1821ef-elasticNetParam: 0.5,
	logreg_34c53c1821ef-regParam: 2.0
}, {
	logreg_34c53c1821ef-elasticNetParam: 1.0,
	logreg_34c53c1821ef-reg...

In [156]:
val evaluator = new BinaryClassificationEvaluator().setRawPredictionCol("prediction").setLabelCol("Arrest_indexed").setMetricName("areaUnderROC")
val cv_lr = new CrossValidator().setEstimator(lr).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(3)


evaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_91d64ab46afc
cv_lr: org.apache.spark.ml.tuning.CrossValidator = cv_834460ed96c0


In [157]:
val pipeline_lr = new Pipeline().setStages(Array(cv_lr))


pipeline_lr: org.apache.spark.ml.Pipeline = pipeline_1b079443272a


In [158]:
val Array(training,testing)=features_top_DF.randomSplit(Array(0.8,0.2))

training: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_c0: int, ID: int ... 46 more fields]
testing: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_c0: int, ID: int ... 46 more fields]


In [159]:
val model_lr = pipeline_lr.fit(training)


2018-12-06 15:34:55 WARN  BlockManager:66 - Asked to remove block broadcast_5239, which does not exist


model_lr: org.apache.spark.ml.PipelineModel = pipeline_1b079443272a


In [160]:
val predictions_lr = model_lr.transform(testing)

predictions_lr: org.apache.spark.sql.DataFrame = [_c0: int, ID: int ... 49 more fields]


In [161]:
predictions_lr.select("prediction","Arrest_indexed","selectedFeatures").show(5)


+----------+--------------+--------------------+
|prediction|Arrest_indexed|    selectedFeatures|
+----------+--------------+--------------------+
|       0.0|           0.0|[77.0,7.0,70.0,3....|
|       1.0|           0.0|[19.0,9.0,19.0,5....|
|       0.0|           1.0|[1.0,2.0,2.0,0.0,...|
|       0.0|           1.0|[35.0,1.0,25.0,1....|
|       1.0|           0.0|[77.0,7.0,70.0,2....|
+----------+--------------+--------------------+
only showing top 5 rows



In [162]:
val AUC_lr = evaluator.evaluate(predictions_lr)
println(s"Area under ROC curve(AUC) for LR on test data = $AUC_lr")

Area under ROC curve(AUC) for LR on test data = 0.5958459707451914


AUC_lr: Double = 0.5958459707451914


# Random Forest

In [163]:
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._

import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.feature._


In [164]:
val maxBins = 310
val rf = new RandomForestClassifier().setLabelCol("Arrest_indexed").setFeaturesCol("selectedFeatures")

val paramGrid =new ParamGridBuilder()
             .addGrid(rf.maxDepth, Array(2, 5))
             .addGrid(rf.numTrees, Array(5, 20))
             //.addGrid(tfidf.minDocFreq, Array(5,10))
             .addGrid(rf.maxBins, Array(310, 315, 315))
             .build()

//val evaluator = new BinaryClassificationEvaluator().setRawPredictionCol("rawPrediction").setLabelCol("rating").setMetricName("areaUnderROC")


val cv_rf = new CrossValidator().setEstimator(rf).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(3)

val pipeline_rf = new Pipeline().setStages(Array(cv_rf))


maxBins: Int = 310
rf: org.apache.spark.ml.classification.RandomForestClassifier = rfc_fcb7910c201c
paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	rfc_fcb7910c201c-maxBins: 310,
	rfc_fcb7910c201c-maxDepth: 2,
	rfc_fcb7910c201c-numTrees: 5
}, {
	rfc_fcb7910c201c-maxBins: 315,
	rfc_fcb7910c201c-maxDepth: 2,
	rfc_fcb7910c201c-numTrees: 5
}, {
	rfc_fcb7910c201c-maxBins: 315,
	rfc_fcb7910c201c-maxDepth: 2,
	rfc_fcb7910c201c-numTrees: 5
}, {
	rfc_fcb7910c201c-maxBins: 310,
	rfc_fcb7910c201c-maxDepth: 2,
	rfc_fcb7910c201c-numTrees: 20
}, {
	rfc_fcb7910c201c-maxBins: 315,
	rfc_fcb7910c201c-maxDepth: 2,
	rfc_fcb7910c201c-numTrees: 20
}, {
	rfc_fcb7910c201c-maxBins: 315,
	rfc_fcb7910c201c-maxDepth: 2,
	rfc_fcb7910c201c-numTrees: 20
}, {
	rfc_fcb7910c201c-maxBins: 310,
	rfc_fcb791...

In [165]:
val pipelineModel_rf = pipeline_rf.fit(training)

pipelineModel_rf: org.apache.spark.ml.PipelineModel = pipeline_b4b9d84e1462


In [166]:
val predictions_rf = pipelineModel_rf.transform(testing)
val AUC_rf = evaluator.evaluate(predictions_rf)
println(s"Area under ROC curve(AUC) for RF on test data = $AUC_rf")




Area under ROC curve(AUC) for RF on test data = 0.794775709187095


predictions_rf: org.apache.spark.sql.DataFrame = [_c0: int, ID: int ... 49 more fields]
AUC_rf: Double = 0.794775709187095


In [167]:
predictions_rf.select("prediction","Arrest_indexed","selectedFeatures").show(5)


+----------+--------------+--------------------+
|prediction|Arrest_indexed|    selectedFeatures|
+----------+--------------+--------------------+
|       0.0|           0.0|[77.0,7.0,70.0,3....|
|       0.0|           0.0|[19.0,9.0,19.0,5....|
|       1.0|           1.0|[1.0,2.0,2.0,0.0,...|
|       1.0|           1.0|[35.0,1.0,25.0,1....|
|       0.0|           0.0|[77.0,7.0,70.0,2....|
+----------+--------------+--------------------+
only showing top 5 rows



# Gradient Boosted Tree

In [168]:
//GBT
// Create a GBT model.
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val gbt = new GBTClassifier()
  .setLabelCol("Arrest_indexed")
  .setFeaturesCol("selectedFeatures")
  .setMaxIter(10)


import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
gbt: org.apache.spark.ml.classification.GBTClassifier = gbtc_25df9f49293f


In [169]:
val paramGrid = new ParamGridBuilder()
             .addGrid(gbt.maxDepth, Array(2,5))    
            .addGrid(gbt.maxBins, Array(310, 315, 315))
             .build()
//.addGrid(gbt.maxIter, Array(10, 20,100))
//.addGrid(gbt.maxBins, Array(310, 315, 315))
val evaluator = new BinaryClassificationEvaluator().setRawPredictionCol("prediction").setLabelCol("Arrest_indexed").setMetricName("areaUnderROC")

val cv_gbt = new CrossValidator().setEstimator(gbt).setEvaluator(evaluator).setEstimatorParamMaps(paramGrid).setNumFolds(3)


val pipeline_gbt = new Pipeline().setStages(Array(cv_gbt))


//val Array(training,testing)=housing.randomSplit(Array(0.8,0.2),111)



paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	gbtc_25df9f49293f-maxBins: 310,
	gbtc_25df9f49293f-maxDepth: 2
}, {
	gbtc_25df9f49293f-maxBins: 310,
	gbtc_25df9f49293f-maxDepth: 5
}, {
	gbtc_25df9f49293f-maxBins: 315,
	gbtc_25df9f49293f-maxDepth: 2
}, {
	gbtc_25df9f49293f-maxBins: 315,
	gbtc_25df9f49293f-maxDepth: 5
}, {
	gbtc_25df9f49293f-maxBins: 315,
	gbtc_25df9f49293f-maxDepth: 2
}, {
	gbtc_25df9f49293f-maxBins: 315,
	gbtc_25df9f49293f-maxDepth: 5
})
evaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_23463fbbec0a
cv_gbt: org.apache.spark.ml.tuning.CrossValidator = cv_2aa6012ea963
pipeline_gbt: org.apache.spark.ml.Pipeline = pipeline_e9c05415c026


In [170]:
features_top5_DF.cache()

res51: features_top5_DF.type = [_c0: int, ID: int ... 46 more fields]


In [171]:

val pipelineModel_gbt = pipeline_gbt.fit(training)



pipelineModel_gbt: org.apache.spark.ml.PipelineModel = pipeline_e9c05415c026


In [172]:

val predictions_gbt = pipelineModel_gbt.transform(testing)
predictions_gbt.select("prediction","Arrest_indexed","selectedFeatures").show()


val AUC_gbt = evaluator.evaluate(predictions_gbt)
println(s"Area under ROC curve(AUC) for GBT on test data = $AUC_gbt")

+----------+--------------+--------------------+
|prediction|Arrest_indexed|    selectedFeatures|
+----------+--------------+--------------------+
|       0.0|           0.0|[77.0,7.0,70.0,3....|
|       0.0|           0.0|[19.0,9.0,19.0,5....|
|       1.0|           1.0|[1.0,2.0,2.0,0.0,...|
|       1.0|           1.0|[35.0,1.0,25.0,1....|
|       0.0|           0.0|[77.0,7.0,70.0,2....|
|       1.0|           1.0|[4.0,0.0,4.0,7.0,...|
|       0.0|           0.0|[2.0,0.0,3.0,0.0,...|
|       1.0|           1.0|[9.0,2.0,8.0,3.0,...|
|       1.0|           1.0|[163.0,2.0,157.0,...|
|       1.0|           1.0|[21.0,2.0,22.0,1....|
|       1.0|           1.0|[42.0,2.0,41.0,0....|
|       1.0|           1.0|[21.0,2.0,22.0,5....|
|       1.0|           1.0|[11.0,8.0,10.0,16...|
|       0.0|           0.0|[8.0,3.0,5.0,6.0,...|
|       0.0|           0.0|[12.0,10.0,11.0,1...|
|       1.0|           1.0|[4.0,0.0,4.0,14.0...|
|       0.0|           0.0|[41.0,0.0,40.0,40...|
|       1.0|        

predictions_gbt: org.apache.spark.sql.DataFrame = [_c0: int, ID: int ... 49 more fields]
AUC_gbt: Double = 0.821189356120801


# Ensemble Model

In [173]:
predictions_lr.createOrReplaceTempView("table_lr")
predictions_rf.createOrReplaceTempView("table_rf")
predictions_gbt.createOrReplaceTempView("table_gbt")

In [174]:

import spark.implicits._
import org.apache.spark.mllib.evaluation._

import spark.implicits._
import org.apache.spark.mllib.evaluation._


In [175]:
spark.sql("select selectedFeatures from table_lr").show(false)

+------------------------------------------------+
|selectedFeatures                                |
+------------------------------------------------+
|[77.0,7.0,70.0,3.0,0.0,128.0,20.0,41.0,68.0,7.0]|
|[19.0,9.0,19.0,5.0,0.0,72.0,0.0,1.0,0.0,8.0]    |
|[1.0,2.0,2.0,0.0,0.0,193.0,15.0,7.0,34.0,1.0]   |
|[35.0,1.0,25.0,1.0,0.0,100.0,9.0,31.0,61.0,2.0] |
|[77.0,7.0,70.0,2.0,0.0,107.0,7.0,34.0,20.0,7.0] |
|[4.0,0.0,4.0,7.0,0.0,37.0,4.0,5.0,12.0,0.0]     |
|[2.0,0.0,3.0,0.0,0.0,205.0,12.0,37.0,27.0,0.0]  |
|[9.0,2.0,8.0,3.0,0.0,14.0,0.0,1.0,8.0,1.0]      |
|[163.0,2.0,157.0,2.0,0.0,138.0,2.0,11.0,5.0,1.0]|
|[21.0,2.0,22.0,1.0,0.0,150.0,0.0,4.0,16.0,1.0]  |
|[42.0,2.0,41.0,0.0,0.0,138.0,2.0,11.0,5.0,1.0]  |
|[21.0,2.0,22.0,5.0,0.0,168.0,15.0,15.0,28.0,1.0]|
|[11.0,8.0,10.0,16.0,0.0,27.0,2.0,10.0,9.0,3.0]  |
|[8.0,3.0,5.0,6.0,0.0,102.0,16.0,21.0,13.0,4.0]  |
|[12.0,10.0,11.0,15.0,0.0,34.0,4.0,13.0,15.0,9.0]|
|[4.0,0.0,4.0,14.0,0.0,117.0,14.0,37.0,3.0,0.0]  |
|[41.0,0.0,40.0,40.0,0.0,22.0,1

In [176]:
val joins = spark.sql("select l.Arrest_indexed,l.prediction as prediction_lr,r.prediction as prediction_rf,g.prediction as prediction_gbt from table_lr l,table_rf r,table_gbt g where l.selectedFeatures = r.selectedFeatures and l.selectedFeatures = g.selectedFeatures")

joins: org.apache.spark.sql.DataFrame = [Arrest_indexed: double, prediction_lr: double ... 2 more fields]


In [177]:
joins.createOrReplaceTempView("join_temp")
val ensemble = spark.sql("select CASE WHEN (prediction_lr = prediction_rf OR prediction_lr = prediction_gbt) Then prediction_lr else case when prediction_rf=prediction_gbt then prediction_rf else prediction_lr END  END AS prediction_ensemble,Arrest_indexed as Arrest from  join_temp")

ensemble: org.apache.spark.sql.DataFrame = [prediction_ensemble: double, Arrest: double]


In [178]:
val predictionsAndLabels=ensemble.selectExpr("cast(prediction_ensemble as Double) prediction_ensemble", "cast(Arrest as Double) Arrest").rdd.map(row =>(row.getAs[Double]("prediction_ensemble"),row.getAs[Double]("Arrest")))





predictionsAndLabels: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[18155] at map at <console>:178


In [180]:
val metrics= new BinaryClassificationMetrics(predictionsAndLabels)
val AUC_EN = metrics.areaUnderROC

metrics: org.apache.spark.mllib.evaluation.BinaryClassificationMetrics = org.apache.spark.mllib.evaluation.BinaryClassificationMetrics@4ea7bb3e
AUC_EN: Double = 0.8089813196530709


In [181]:
println(s"Area under ROC curve(AUC) for LR on test data = $AUC_lr")
println(s"Area under ROC curve(AUC) for RF on test data = $AUC_rf")
println(s"Area under ROC curve(AUC) for GBT on test data = $AUC_gbt")
println(s"Area under ROC curve(AUC) for Ensemble on test data = $AUC_EN")

Area under ROC curve(AUC) for LR on test data = 0.5958459707451914
Area under ROC curve(AUC) for RF on test data = 0.794775709187095
Area under ROC curve(AUC) for GBT on test data = 0.821189356120801
Area under ROC curve(AUC) for Ensemble on test data = 0.8089813196530709


In [192]:
import org.apache.spark.mllib.linalg._
import org.apache.spark.ml.classification.RandomForestClassificationModel
import org.apache.spark.ml.classification.RandomForestClassifier


import org.apache.spark.mllib.linalg._
import org.apache.spark.ml.classification.RandomForestClassificationModel
import org.apache.spark.ml.classification.RandomForestClassifier


In [196]:
predictions_gbt

res57: org.apache.spark.sql.DataFrame = [_c0: int, ID: int ... 49 more fields]


In [230]:

val featureImportanceGBT = pipelineModel_gbt.stages(0).asInstanceOf[CrossValidatorModel].bestModel.asInstanceOf[GBTClassificationModel].featureImportances


featureImportanceGBT: org.apache.spark.ml.linalg.Vector = (10,[0,3,5],[0.7832404957880407,0.1563977306224484,0.06036177358951093])


In [237]:
val featureImportanceRF = pipelineModel_rf.stages(0).asInstanceOf[CrossValidatorModel].bestModel.asInstanceOf[RandomForestClassificationModel].featureImportances


featureImportanceRF: org.apache.spark.ml.linalg.Vector = (10,[0,1,2,3,4,5,6,7,8,9],[0.201137613975929,0.30893347257301795,0.21078700596956676,0.012787054916409564,0.010964876735941242,0.023033123943009555,0.0032388114583067535,0.001102700050240262,0.0054026122436715805,0.2226127281339072])


In [223]:
val numeric_features_array = numeric_features.collect.map(_.toSeq).flatten

//val features_gbt= numeric_features++Array("selectedFeatures")
//featureImportanceGBT.foreach(println)

numeric_features_array: Array[Any] = Array(0.0, 1.0, 1.0, (41.782921527, -87.60436317), 1.0, 112.0, 8.0, 7.0, 24.0, 2.0, 12.0, 10.0, 11.0, (41.88063228, -87.635935494), 0.0, 57.0, 13.0, 4.0, 11.0, 9.0, 7.0, 0.0, 7.0, (41.771073064, -87.568278663), 0.0, 45.0, 8.0, 18.0, 4.0, 0.0, 12.0, 10.0, 11.0, (41.99973106, -87.705809711), 0.0, 172.0, 19.0, 43.0, 31.0, 9.0, 16.0, 6.0, 15.0, (41.839947022, -87.714672499), 0.0, 211.0, 7.0, 34.0, 20.0, 6.0, 2.0, 0.0, 3.0, (41.885753285, -87.626996239), 0.0, 22.0, 13.0, 2.0, 11.0, 0.0, 77.0, 7.0, 70.0, (41.980259177, -87.710009782), 0.0, 128.0, 20.0, 41.0, 68.0, 7.0, 11.0, 8.0, 10.0, (41.909508082, -87.755180702), 0.0, 8.0, 3.0, 9.0, 0.0, 3.0, 11.0, 8.0, 10.0, (41.76971848, -87.584270226), 0.0, 78.0, 8.0, 18.0, 4.0, 3.0, 19.0, 9.0, 19.0, (41.875039579, -...

In [224]:
val features_gbt= numeric_features_array++Array("selectedFeaturess")

features_gbt: Array[Any] = Array(0.0, 1.0, 1.0, (41.782921527, -87.60436317), 1.0, 112.0, 8.0, 7.0, 24.0, 2.0, 12.0, 10.0, 11.0, (41.88063228, -87.635935494), 0.0, 57.0, 13.0, 4.0, 11.0, 9.0, 7.0, 0.0, 7.0, (41.771073064, -87.568278663), 0.0, 45.0, 8.0, 18.0, 4.0, 0.0, 12.0, 10.0, 11.0, (41.99973106, -87.705809711), 0.0, 172.0, 19.0, 43.0, 31.0, 9.0, 16.0, 6.0, 15.0, (41.839947022, -87.714672499), 0.0, 211.0, 7.0, 34.0, 20.0, 6.0, 2.0, 0.0, 3.0, (41.885753285, -87.626996239), 0.0, 22.0, 13.0, 2.0, 11.0, 0.0, 77.0, 7.0, 70.0, (41.980259177, -87.710009782), 0.0, 128.0, 20.0, 41.0, 68.0, 7.0, 11.0, 8.0, 10.0, (41.909508082, -87.755180702), 0.0, 8.0, 3.0, 9.0, 0.0, 3.0, 11.0, 8.0, 10.0, (41.76971848, -87.584270226), 0.0, 78.0, 8.0, 18.0, 4.0, 3.0, 19.0, 9.0, 19.0, (41.875039579, -87.7436902...

In [238]:
val features_rf= numeric_features_array++Array("selectedFeaturess")

features_rf: Array[Any] = Array(0.0, 1.0, 1.0, (41.782921527, -87.60436317), 1.0, 112.0, 8.0, 7.0, 24.0, 2.0, 12.0, 10.0, 11.0, (41.88063228, -87.635935494), 0.0, 57.0, 13.0, 4.0, 11.0, 9.0, 7.0, 0.0, 7.0, (41.771073064, -87.568278663), 0.0, 45.0, 8.0, 18.0, 4.0, 0.0, 12.0, 10.0, 11.0, (41.99973106, -87.705809711), 0.0, 172.0, 19.0, 43.0, 31.0, 9.0, 16.0, 6.0, 15.0, (41.839947022, -87.714672499), 0.0, 211.0, 7.0, 34.0, 20.0, 6.0, 2.0, 0.0, 3.0, (41.885753285, -87.626996239), 0.0, 22.0, 13.0, 2.0, 11.0, 0.0, 77.0, 7.0, 70.0, (41.980259177, -87.710009782), 0.0, 128.0, 20.0, 41.0, 68.0, 7.0, 11.0, 8.0, 10.0, (41.909508082, -87.755180702), 0.0, 8.0, 3.0, 9.0, 0.0, 3.0, 11.0, 8.0, 10.0, (41.76971848, -87.584270226), 0.0, 78.0, 8.0, 18.0, 4.0, 3.0, 19.0, 9.0, 19.0, (41.875039579, -87.74369026...

In [235]:
val res_gbt = features_gbt.zip(featureImportanceGBT.toArray).foreach(println)

(0.0,0.7832404957880407)
(1.0,0.0)
(1.0,0.0)
((41.782921527, -87.60436317),0.1563977306224484)
(1.0,0.0)
(112.0,0.06036177358951093)
(8.0,0.0)
(7.0,0.0)
(24.0,0.0)
(2.0,0.0)


res_gbt: Unit = ()


In [239]:
val res_rf = features_rf.zip(featureImportanceRF.toArray).foreach(println)



(0.0,0.201137613975929)
(1.0,0.30893347257301795)
(1.0,0.21078700596956676)
((41.782921527, -87.60436317),0.012787054916409564)
(1.0,0.010964876735941242)
(112.0,0.023033123943009555)
(8.0,0.0032388114583067535)
(7.0,0.001102700050240262)
(24.0,0.0054026122436715805)
(2.0,0.2226127281339072)


res_rf: Unit = ()
