## Data Pre-processing and Cleaning

In [1]:
# import findspark
import findspark
findspark.init()

In [2]:
# import SparkSession
from pyspark.sql import SparkSession

In [3]:
# create session
spark = SparkSession \
    .builder \
    .appName("Python Spark Clustering example") \
    .getOrCreate()

In [4]:
print(spark)

<pyspark.sql.session.SparkSession object at 0x000000EC98870F28>


In [5]:
# read dataset
df = spark.read.csv("crime.csv", header=True, inferSchema=True)

In [6]:
# find out attributes
df.head()

Row(INCIDENT_ID=2016376978, OFFENSE_ID=2016376978521300, OFFENSE_CODE=5213, OFFENSE_CODE_EXTENSION=0, OFFENSE_TYPE_ID='weapon-unlawful-discharge-of', OFFENSE_CATEGORY_ID='all-other-crimes', FIRST_OCCURRENCE_DATE='6/15/2016 11:31:00 PM', LAST_OCCURRENCE_DATE=None, REPORTED_DATE='6/15/2016 11:31:00 PM', INCIDENT_ADDRESS=None, GEO_X=3193983, GEO_Y=1707251, GEO_LON=-104.8098811, GEO_LAT=39.773188, DISTRICT_ID=5, PRECINCT_ID=521, NEIGHBORHOOD_ID='montbello', IS_CRIME=1, IS_TRAFFIC=0)

In [7]:
# find out each schema
df.schema

StructType(List(StructField(INCIDENT_ID,LongType,true),StructField(OFFENSE_ID,LongType,true),StructField(OFFENSE_CODE,IntegerType,true),StructField(OFFENSE_CODE_EXTENSION,IntegerType,true),StructField(OFFENSE_TYPE_ID,StringType,true),StructField(OFFENSE_CATEGORY_ID,StringType,true),StructField(FIRST_OCCURRENCE_DATE,StringType,true),StructField(LAST_OCCURRENCE_DATE,StringType,true),StructField(REPORTED_DATE,StringType,true),StructField(INCIDENT_ADDRESS,StringType,true),StructField(GEO_X,IntegerType,true),StructField(GEO_Y,IntegerType,true),StructField(GEO_LON,DoubleType,true),StructField(GEO_LAT,DoubleType,true),StructField(DISTRICT_ID,IntegerType,true),StructField(PRECINCT_ID,IntegerType,true),StructField(NEIGHBORHOOD_ID,StringType,true),StructField(IS_CRIME,IntegerType,true),StructField(IS_TRAFFIC,IntegerType,true)))

In [8]:
df.count()

483347

In [9]:
df_selected = df.select('INCIDENT_ID', 'GEO_LAT', 'GEO_LON')
df_selected.head()

Row(INCIDENT_ID=2016376978, GEO_LAT=39.773188, GEO_LON=-104.8098811)

In [10]:
df_selected.count()

483347

In [11]:
# create alias table to work on as 'crimes'
df_selected.createOrReplaceTempView("crimes")

In [12]:
query1 = spark.sql("SELECT *\
                    FROM crimes")
query1.count()

483347

In [13]:
query1 = spark.sql("SELECT DISTINCT *\
                    FROM crimes").na.drop()
query1.count()

442461

In [14]:
# import pandas
import pandas as pd

In [15]:
# convert the result into pandas dataframe then save it as single csv file
pd_query1 = query1.toPandas()
pd_query1.to_csv("C:/big-data/final-project/crime_preprocessed.csv", index=False)

## Clustering and Visualization

In [16]:
# read new dataset
df = spark.read.csv("crime_preprocessed.csv", header=True, inferSchema=True)
df.head()

Row(INCIDENT_ID=201872754, GEO_LAT=39.7280757, GEO_LON=-104.9904797)

In [17]:
df.schema

StructType(List(StructField(INCIDENT_ID,LongType,true),StructField(GEO_LAT,DoubleType,true),StructField(GEO_LON,DoubleType,true)))

In [18]:
# create alias table to work on as 'crimes'
df.createOrReplaceTempView("crimes")

In [19]:
query1 = spark.sql("SELECT INCIDENT_ID, GEO_LAT, GEO_LON\
                    FROM crimes")
query1.show()

+-----------+----------+------------+
|INCIDENT_ID|   GEO_LAT|     GEO_LON|
+-----------+----------+------------+
|  201872754|39.7280757|-104.9904797|
| 2018160078|39.7010384|-104.9411546|
|20188000138|39.8063801|-104.7835494|
|  201873363|39.7419576|-104.9998593|
|20186002182|39.6910538|-105.0075138|
| 2018158702|39.7398975| -104.987485|
| 2018135501| 39.660673|-105.0583917|
| 2018135535|39.7394277|-104.8927758|
| 2018135170|39.6276828|-104.8978601|
| 2018121073|39.7400317|-104.9730966|
|  201856594|39.7541424|-104.9893015|
| 2018121832|39.7841834|-104.7951591|
| 2018122630|39.7609489|-105.0043337|
|20186000435|39.7898232|-104.9959236|
|   20186331| 39.800103|-104.7721177|
|20186002300| 39.676926|-105.0002765|
| 2018234862|39.7338946|-105.0552142|
| 2018226102|39.7489126|-104.9873978|
|20186003257|39.6936244|-104.9766113|
|   20182905|39.7733189|-104.8492671|
+-----------+----------+------------+
only showing top 20 rows



In [20]:
query1.count()

442461

In [21]:
# because pyspark.ml only reads input in vector, we're gonna
# need to convert latitude and longitude into vector
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["GEO_LAT", "GEO_LON"],
    outputCol='features')

query1 = assembler.transform(query1)
query1.show()

+-----------+----------+------------+--------------------+
|INCIDENT_ID|   GEO_LAT|     GEO_LON|            features|
+-----------+----------+------------+--------------------+
|  201872754|39.7280757|-104.9904797|[39.7280757,-104....|
| 2018160078|39.7010384|-104.9411546|[39.7010384,-104....|
|20188000138|39.8063801|-104.7835494|[39.8063801,-104....|
|  201873363|39.7419576|-104.9998593|[39.7419576,-104....|
|20186002182|39.6910538|-105.0075138|[39.6910538,-105....|
| 2018158702|39.7398975| -104.987485|[39.7398975,-104....|
| 2018135501| 39.660673|-105.0583917|[39.660673,-105.0...|
| 2018135535|39.7394277|-104.8927758|[39.7394277,-104....|
| 2018135170|39.6276828|-104.8978601|[39.6276828,-104....|
| 2018121073|39.7400317|-104.9730966|[39.7400317,-104....|
|  201856594|39.7541424|-104.9893015|[39.7541424,-104....|
| 2018121832|39.7841834|-104.7951591|[39.7841834,-104....|
| 2018122630|39.7609489|-105.0043337|[39.7609489,-105....|
|20186000435|39.7898232|-104.9959236|[39.7898232,-104...

In [22]:
# before we predict each sample, we need to train model to cluster
# setK indicates the number of clusters we would like to have,
# and setSeed is to set the level of randomness
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

kmeans = KMeans().setK(9).setSeed(1)
model = kmeans.fit(query1)

In [23]:
# predict and show result
predictions = model.transform(query1)
predictions.show()

+-----------+----------+------------+--------------------+----------+
|INCIDENT_ID|   GEO_LAT|     GEO_LON|            features|prediction|
+-----------+----------+------------+--------------------+----------+
|  201872754|39.7280757|-104.9904797|[39.7280757,-104....|         5|
| 2018160078|39.7010384|-104.9411546|[39.7010384,-104....|         8|
|20188000138|39.8063801|-104.7835494|[39.8063801,-104....|         4|
|  201873363|39.7419576|-104.9998593|[39.7419576,-104....|         5|
|20186002182|39.6910538|-105.0075138|[39.6910538,-105....|         0|
| 2018158702|39.7398975| -104.987485|[39.7398975,-104....|         5|
| 2018135501| 39.660673|-105.0583917|[39.660673,-105.0...|         6|
| 2018135535|39.7394277|-104.8927758|[39.7394277,-104....|         2|
| 2018135170|39.6276828|-104.8978601|[39.6276828,-104....|         7|
| 2018121073|39.7400317|-104.9730966|[39.7400317,-104....|         5|
|  201856594|39.7541424|-104.9893015|[39.7541424,-104....|         5|
| 2018121832|39.7841

In [24]:
# show the final centroids of each cluster
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[  39.70225565 -105.01477399]
[ 1.97380388e-02 -2.39276316e-06]
[  39.7623903  -104.88463562]
[  39.75549371 -105.0263742 ]
[  39.80246576 -104.75816426]
[  39.74799273 -104.98031865]
[  39.65784151 -105.05837386]
[  39.66146771 -104.89591159]
[  39.7010466  -104.94111587]
