## Data Set - 311


For this exercise I have chosen to run with the 311 data that we created a while back. This dataset and the ability to reference it made more sense to me and there should be enough data in there to run interesting models on it.

In [1]:
# Import libraries
# I'm starting with a bunch as I'm not sure 

from __future__ import print_function
from pyspark.context import SparkContext
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.session import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer

In [2]:
# For the model features, we may need to revisit once we decide on what features to include and predict on.
# For now, my idea is to predict whether or not a case is closed

# 'statusI', 'categoryI','complaint_typeI','descriptorI','sourceI','police_districtI'

def vector_from_inputs(r):
    return(r["statusI"], Vectors.dense(float(r["categoryI"]),
                                           float(r["complaint_typeI"]),
                                           float(r["descriptorI"]),
                                           float(r["sourceI"]),
                                           #float(r["latitude"]),
                                           #float(r["longitude"],
                                           float(r["neighborhoodI"]),
                                           float(r["police_districtI"])))

spark = SparkSession.builder.config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar').config("spark.sql.autoBroadcastJoinThreshold", "-1").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/08 21:16:50 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/04/08 21:16:50 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/04/08 21:16:50 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/04/08 21:16:50 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


## Get BigQuery Data

For this model, we will use the below features to run a classification on case status. Now, there are likely other features that are more important or with more meaning (such as how long case is open) but for now we will start here to get our feet wet. 

Predicted Variable:

    - status
    
Predictors:
    
    - category
    - complaint_type
    - descriptor
    - source
    - latitude
    - longitude
    - police_district
    
**note:** I have specifically used the above listed columns 'lat'/'long' for location data vs. categorizing the listed neighborhoods. This could be done but we already have so many features we will have to index/encode that I don't want to overwhelm the model

In [3]:
crime = spark.read.format("bigquery").option(
    "table", "lcmhng_biengine_tutorial.311_service_requests").load()
# Create a view so that Spark SQL queries can be run against the data.
crime.createOrReplaceTempView("311_crime")

## -------------------ENSURE CLEAN DATA NO NULLS----------------

# Original SELECT with Lat/Long moved to neighborhood due to data complexity

sql_query = """
SELECT status, category, complaint_type, descriptor, source, neighborhood, police_district
from 311_crime
WHERE status is not null
AND category is not null
AND complaint_type is not null
AND descriptor is not null
AND source is not null
AND neighborhood is not null
AND police_district is not null
"""
crime_clean = spark.sql(sql_query)

In [4]:
crime_clean.show(10)

[Stage 0:>                                                          (0 + 1) / 1]

+------+--------------------+--------------------+--------------------+-----------------+----------------+---------------+
|status|            category|      complaint_type|          descriptor|           source|    neighborhood|police_district|
+------+--------------------+--------------------+--------------------+-----------------+----------------+---------------+
|Closed|Rec and Park Requ...|Park - Neighborho...|          Irrigation|            Phone|Presidio Terrace|       RICHMOND|
|Closed|    Damaged Property|Damaged Transit_S...|Transit_Shelter_P...|            Phone|   Outer Mission|      INGLESIDE|
|Closed|       Child Request|Shared_SpacesObst...|Required_gap_for_...|Integrated Agency|Presidio Heights|       RICHMOND|
|Closed|   Abandoned Vehicle|Abandoned Vehicle...|DPT Abandoned Veh...|            Phone|   Miraloma Park|      INGLESIDE|
|Closed|    Tree Maintenance|Trees - Overgrown...|      Blocking_signs|            Phone|      Parkmerced|        TARAVAL|
|Closed|    Tree

                                                                                

In [5]:
print(crime_clean.count())



4904881


                                                                                

In [6]:
## Seeing as above the number of rows is huge we are just going to take a random sample of 1% of the data as that appears to be
## the rough threshold to where this will run without a reconfigure

crime_clean = crime_clean.sample(fraction=0.01)

## Create indexed columns for model similar to previous exercise

In [7]:
#First index columns
data_index = StringIndexer(inputCols=['status', 'category','complaint_type','descriptor','source','police_district', 'neighborhood'], 
                           outputCols=['statusI', 'categoryI','complaint_typeI','descriptorI','sourceI','police_districtI', 'neighborhoodI'])

data_indexed = data_index.fit(crime_clean).transform(crime_clean)

                                                                                

In [8]:
data_indexed.show(10)

[Stage 7:>                                                          (0 + 1) / 1]

+------+--------------------+--------------------+--------------------+-----------------+--------------------+---------------+----------------+-----------+-------+---------+---------------+-------------+-------+
|status|            category|      complaint_type|          descriptor|           source|        neighborhood|police_district|police_districtI|descriptorI|statusI|categoryI|complaint_typeI|neighborhoodI|sourceI|
+------+--------------------+--------------------+--------------------+-----------------+--------------------+---------------+----------------+-----------+-------+---------+---------------+-------------+-------+
|Closed|       SFHA Requests|          Electrical|Electrical - Routine|            Phone|      Merced Heights|        TARAVAL|             5.0|      137.0|    0.0|     14.0|           60.0|         58.0|    1.0|
|Closed|       SFHA Requests|            Plumbing|  Plumbing - Routine|            Phone|           Sunnydale|      INGLESIDE|             2.0|      103

                                                                                

In [9]:
# We will only need the indexed versions of the columns

model_data = data_indexed.drop('status', 'category','complaint_type','descriptor','source','police_district', 'neighborhood')

In [10]:
model_data.show(1)

+----------------+-----------+-------+---------+---------------+-------------+-------+
|police_districtI|descriptorI|statusI|categoryI|complaint_typeI|neighborhoodI|sourceI|
+----------------+-----------+-------+---------+---------------+-------------+-------+
|             5.0|      137.0|    0.0|     14.0|           60.0|         58.0|    1.0|
+----------------+-----------+-------+---------+---------------+-------------+-------+
only showing top 1 row



In [11]:
# Finally, move to training data

training_data = model_data.rdd.map(vector_from_inputs).toDF(["label",
                                                             "features"])
training_data.cache()

                                                                                

DataFrame[label: double, features: vector]

**checking size of data as I had issues below**

## Modeling

Now, in light of instructions we will choose a different classification model for this analysis for variability. I'll start with a Log and see where that goes.


In [12]:
logr = LogisticRegression(maxIter=5, regParam=0.3, elasticNetParam=0.7)
model = logr.fit(training_data)

22/04/08 21:17:45 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/04/08 21:17:45 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [13]:
model.summary.accuracy

                                                                                

0.9868239272532322

In [14]:
training_data

DataFrame[label: double, features: vector]

In [15]:
# Additional Model Summary Notes
# Took some notes to derive a confusion matrix from the output

from pyspark.sql.types import FloatType
from pyspark.mllib.evaluation import MulticlassMetrics
import pandas as pd


predictionAndLabels = model.transform(training_data).select('label', 'prediction')

#important: need to cast to float type, and order by prediction, else it won't work
#preds_and_labels = predictions.select(['prediction','features']).withColumn('label', F.col('d').cast(FloatType())).orderBy('prediction')

#select only prediction and label columns
#preds_and_labels = preds_and_labels.select(['prediction','label'])

#predictionAndLabels = SparkContext.parallelize(predictions, training_data.label)

metrics = MulticlassMetrics(predictionAndLabels.rdd.map(lambda x: tuple(map(float, x))))

#print(metrics.confusionMatrix().toArray())

confusion_matrix = metrics.confusionMatrix().toArray()
labels = [int(l) for l in metrics.call('labels')]
confusion_matrix = pd.DataFrame(confusion_matrix , index=labels, columns=labels)

print(confusion_matrix)



         0
0  47858.0


                                                                                