## SF crime data analysis and modeling

### In this notebook, I use Spark SQL for big data analysis of San Francisco (SF) crime data (https://data.sfgov.org/Public-Safety/Police-Department-Incident-Reports-Historical-2003/tmnf-yvry). 
The first part is OLAP-style analysis of the crime data.  
The second part is unsupervised learning for spatial data analysis.

**Note**: We can download a small sample (one month, e.g. 2018-10) for debugging, then download the data from 2013 to 2018 for testing and analysis. 

Possible extensions:

1. Apply the same analysis to a different city.
2. Combine it with other data such as house prices, weather, and news.

In [3]:
from csv import reader
from pyspark.sql import Row 
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandas as pd
import numpy as np
import seaborn as sb
import matplotlib.pyplot as plt
import warnings

import os
os.environ["PYSPARK_PYTHON"] = "python3"


In [4]:
#import urllib.request
# urllib.request.urlretrieve("https://data.sfgov.org/api/views/tmnf-yvry/rows.csv?accessType=DOWNLOAD", "/tmp/myxxxx.csv")
# dbutils.fs.mv("file:/tmp/myxxxx.csv", "dbfs:/laioffer/spark_hw1/data/sf_03_18.csv")
# display(dbutils.fs.ls("dbfs:/laioffer/spark_hw1/data/"))
## data source
# https://data.sfgov.org/api/views/tmnf-yvry/rows.csv?accessType=DOWNLOAD


In [5]:
data_path = "dbfs:/laioffer/spark_hw1/data/sf_03_18.csv"
# use this file name later

In [6]:
# read data from the data storage
crime_data_lines = sc.textFile(data_path)
#prepare data 
df_crimes = crime_data_lines.map(lambda line: [x.strip('"') for x in next(reader([line]))])
#get header
header = df_crimes.first()
print(header)

#remove the first line of data
crimes = df_crimes.filter(lambda x: x != header)

#show the first three rows of data
display(crimes.take(3))
# print(crimes.toDF().toPandas().head(0))

_1,_2,_3,_4,_5,_6,_7,_8,_9,_10,_11,_12,_13,_14,_15,_16,_17,_18,_19,_20,_21,_22,_23,_24,_25,_26,_27,_28,_29,_30,_31,_32,_33
146196161,NON-CRIMINAL,LOST PROPERTY,Tuesday,09/23/2014,01:00,SOUTHERN,NONE,800 Block of BRYANT ST,-122.403404791479,37.775420706711,POINT (-122.403404791479 37.775420706711),14619616171000,32,1,10,34,14,2,9,28853,34,,,,,,,2,,,,
150045675,ASSAULT,BATTERY,Thursday,01/15/2015,17:00,TARAVAL,NONE,1800 Block of VICENTE ST,-122.48560378101,37.7388214326705,POINT (-122.48560378101 37.7388214326705),15004567504134,40,10,7,35,1,8,3,29491,35,,,,,,,1,,,,
140632022,SUSPICIOUS OCC,INVESTIGATIVE DETENTION,Wednesday,07/30/2014,09:32,BAYVIEW,NONE,100 Block of GILLETTE AV,-122.396535107224,37.7106603302503,POINT (-122.396535107224 37.7106603302503),14063202264085,89,2,9,1,10,3,8,309,1,,,,,,,1,,,,
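
The cell above parses each line with `csv.reader` instead of a plain `split(",")`. A minimal sketch of why, in plain Python without Spark: some fields, such as the `Resolution` value `"ARREST, BOOKED"`, contain a comma inside quotes. The helper name `parse_line` and the shortened record are illustrative assumptions, not part of the notebook.

```python
from csv import reader

def parse_line(line):
    """Split one CSV line into fields, keeping quoted commas intact."""
    return [field.strip('"') for field in next(reader([line]))]

# Shortened, illustrative record; the Resolution field contains a comma.
sample = ('150383259,ASSAULT,BATTERY,Saturday,05/02/2015,23:10,BAYVIEW,'
          '"ARREST, BOOKED",2400 Block of PHELPS ST')

fields = parse_line(sample)
naive = sample.split(",")

print(len(fields))  # number of fields found by the CSV parser
print(len(naive))   # one more piece, because the quoted comma was split
print(fields[7])    # the Resolution value, still in one piece
```

The same reasoning is why `spark.read.format("csv")` is usually the safer choice for quoted data.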


In [7]:
#get the total number of data 
print(crimes.count())

### Solve big data issues via Spark
approach 1: use RDDs   
approach 2: use DataFrames (convert the RDD to a DataFrame) 
approach 3: use SQL 

***note***: I tried all of the approaches introduced above

In [9]:

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("crime analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df_opt1 = spark.read.format("csv").option("header", "true").load(data_path)
## helper function to transform the date, choose your way to do it. 
# refer: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-functions-datetime.html
from pyspark.sql.functions import to_date, to_timestamp, hour, concat,lit,col,date_format
# The raw Date column looks like 09/23/2014, so use a four-digit year pattern.
df_opt1 = df_opt1.withColumn('Date', to_date(df_opt1['Date'], "MM/dd/yyyy"))
df_opt1 = df_opt1.withColumn('Timestamp', concat(col('Date'),lit(' '), col('Time')))
df_opt1 = df_opt1.withColumn('Timestamp', to_timestamp(df_opt1['Timestamp'], "yyyy-MM-dd HH:mm"))
df_opt1 = df_opt1.withColumn('Hour', hour(df_opt1['Timestamp']))
df_opt1 = df_opt1.withColumn('DayOfWeek', date_format(df_opt1.Timestamp, "EEEE"))
df_opt1 = df_opt1.withColumn('X', df_opt1['X'].cast('float'))
df_opt1 = df_opt1.withColumn('Y', df_opt1['Y'].cast('float'))

display(df_opt1)
df_opt1.createOrReplaceTempView("sf_crime")
## approach 2 udf 
#from pyspark.sql.functions import col, udf
#from pyspark.sql.functions import expr
#from pyspark.sql.functions import from_unixtime

#date_func =  udf (lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())
#month_func = udf (lambda x: datetime.strptime(x, '%m/%d/%Y').strftime('%Y/%m'), StringType())

#df = df_opt1.withColumn('month_year', month_func(col('Date')))\
#           .withColumn('Date_time', date_func(col('Date')))

## approach 3 sql 
# select Date, substring(Date,7) as Year, substring(Date,1,2) as Month from sf_crime


## approach 4
# from pyspark.sql.functions import *
# df_update = df_opt1.withColumn("Date", to_date(col("Date"), "MM/dd/yyyy")) ##change datetype from string to date
# df_update.createOrReplaceTempView("sf_crime")
# crimeYearMonth = spark.sql("SELECT Year(Date) AS Year, Month(Date) AS Month FROM sf_crime")

IncidntNum,Category,Descript,DayOfWeek,Date,Time,PdDistrict,Resolution,Address,X,Y,Location,PdId,SF Find Neighborhoods,Current Police Districts,Current Supervisor Districts,Analysis Neighborhoods,DELETE - Fire Prevention Districts,DELETE - Police Districts,DELETE - Supervisor Districts,DELETE - Zip Codes,DELETE - Neighborhoods,DELETE - 2017 Fix It Zones,Civic Center Harm Reduction Project Boundary,Fix It Zones as of 2017-11-06,DELETE - HSOC Zones,Fix It Zones as of 2018-02-07,"CBD, BID and GBD Boundaries as of 2017","Areas of Vulnerability, 2016",Central Market/Tenderloin Boundary,Central Market/Tenderloin Boundary Polygon - Updated,HSOC Zones as of 2018-06-05,OWED Public Spaces,Timestamp,Hour
146196161,NON-CRIMINAL,LOST PROPERTY,Tuesday,2014-09-23,01:00,SOUTHERN,NONE,800 Block of BRYANT ST,-122.403404,37.77542,POINT (-122.403404791479 37.775420706711),14619616171000,32.0,1.0,10.0,34.0,14.0,2.0,9.0,28853.0,34.0,,,,,,,2.0,,,,,2014-09-23T01:00:00.000+0000,1
150045675,ASSAULT,BATTERY,Thursday,2015-01-15,17:00,TARAVAL,NONE,1800 Block of VICENTE ST,-122.4856,37.738823,POINT (-122.48560378101 37.7388214326705),15004567504134,40.0,10.0,7.0,35.0,1.0,8.0,3.0,29491.0,35.0,,,,,,,1.0,,,,,2015-01-15T17:00:00.000+0000,17
140632022,SUSPICIOUS OCC,INVESTIGATIVE DETENTION,Wednesday,2014-07-30,09:32,BAYVIEW,NONE,100 Block of GILLETTE AV,-122.39654,37.71066,POINT (-122.396535107224 37.7106603302503),14063202264085,89.0,2.0,9.0,1.0,10.0,3.0,8.0,309.0,1.0,,,,,,,1.0,,,,,2014-07-30T09:32:00.000+0000,9
150383259,ASSAULT,BATTERY,Saturday,2015-05-02,23:10,BAYVIEW,"ARREST, BOOKED",2400 Block of PHELPS ST,-122.40013,37.73009,POINT (-122.400130573297 37.7300925390327),15038325904134,87.0,2.0,9.0,1.0,10.0,3.0,8.0,58.0,1.0,,,,,,,2.0,,,,,2015-05-02T23:10:00.000+0000,23
40753980,OTHER OFFENSES,RECKLESS DRIVING,Friday,2004-07-02,13:43,BAYVIEW,NONE,I-280 / CESAR CHAVEZ ST,-120.5,90.0,POINT (-120.5 90),4075398065020,,,,,,,,,,,,,,,,,,,,,2004-07-02T13:43:00.000+0000,13
40855122,SUICIDE,SUICIDE BY JUMPING,Tuesday,2004-07-27,15:19,SOUTHERN,NONE,500 Block of I-80,-122.386665,37.789883,POINT (-122.386667033903 37.7898821569191),4085512260170,,,,,,,,28856.0,,,,,,,,,,,,,2004-07-27T15:19:00.000+0000,15
66085191,NON-CRIMINAL,LOST PROPERTY,Sunday,2006-11-19,17:45,BAYVIEW,NONE,0 Block of GIANTS DR,-122.387505,37.716877,POINT (-122.38750147945 37.716878646429),6608519171000,88.0,2.0,9.0,1.0,10.0,3.0,8.0,58.0,1.0,,,,,,,2.0,,,,,2006-11-19T17:45:00.000+0000,17
50908404,VEHICLE THEFT,STOLEN AUTOMOBILE,Saturday,2005-08-13,17:00,TENDERLOIN,NONE,JENNINGS CT / INGALLS ST,-120.5,90.0,POINT (-120.5 90),5090840407021,,,,,,,,,,,,,,,,,,,,,2005-08-13T17:00:00.000+0000,17
90768064,ARSON,ARSON OF A VEHICLE,Tuesday,2009-07-28,23:26,BAYVIEW,NONE,SELBY ST / OAKDALE AV,-122.39969,37.739902,POINT (-122.399686082806 37.739901780585),9076806426031,87.0,2.0,9.0,1.0,10.0,3.0,8.0,58.0,1.0,,,,,,,2.0,,,,,2009-07-28T23:26:00.000+0000,23
111027676,ASSAULT,BATTERY,Saturday,2011-12-24,07:00,SOUTHERN,NONE,0 Block of DORE ST,-122.41293,37.773926,POINT (-122.412933062384 37.7739274524819),11102767604134,32.0,1.0,10.0,34.0,8.0,2.0,9.0,28853.0,34.0,,1.0,,1.0,,,2.0,,,1.0,,2011-12-24T07:00:00.000+0000,7
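
As a quick sanity check of the date handling above, the same parsing can be sketched in plain Python. This is only an illustration of the intended result for one record (the first row shown above), not the Spark implementation; the helper name `parse_incident` is an assumption.

```python
from datetime import datetime

def parse_incident(date_str, time_str):
    """Combine the raw Date (MM/dd/yyyy) and Time (HH:mm) strings."""
    return datetime.strptime(f"{date_str} {time_str}", "%m/%d/%Y %H:%M")

# Raw values of the first record in the dataset sample.
ts = parse_incident("09/23/2014", "01:00")

print(ts.date().isoformat())  # date in yyyy-MM-dd form, like the Date column
print(ts.hour)                # value that ends up in the Hour column
print(ts.strftime("%A"))      # day name, like the DayOfWeek column
```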


#### Q1 question (OLAP): 
##### Count the number of crimes in each category.

Below is some example code that demonstrates how to use Spark RDDs, DataFrames, and SQL to work with big data.

In [11]:
q1_result = df_opt1.groupBy('category').count().orderBy('count', ascending=False)
display(q1_result)

category,count
LARCENY/THEFT,480448
OTHER OFFENSES,309358
NON-CRIMINAL,238323
ASSAULT,194694
VEHICLE THEFT,126602
DRUG/NARCOTIC,119628
VANDALISM,116059
WARRANTS,101379
BURGLARY,91543
SUSPICIOUS OCC,80444


In [12]:
#Spark SQL based
crimeCategory = spark.sql("SELECT  category, COUNT(*) AS Count FROM sf_crime GROUP BY category ORDER BY Count DESC")
display(crimeCategory)

category,Count
LARCENY/THEFT,480448
OTHER OFFENSES,309358
NON-CRIMINAL,238323
ASSAULT,194694
VEHICLE THEFT,126602
DRUG/NARCOTIC,119628
VANDALISM,116059
WARRANTS,101379
BURGLARY,91543
SUSPICIOUS OCC,80444


In [13]:
from operator import add
# map each record to (category, 1), then sum the ones per category
category_set_rdd = crimes.map(lambda item: (item[1],1))
result = sorted(category_set_rdd.reduceByKey(add).collect(), key = lambda item: -item[1])
display(result)

_1,_2
LARCENY/THEFT,480448
OTHER OFFENSES,309358
NON-CRIMINAL,238323
ASSAULT,194694
VEHICLE THEFT,126602
DRUG/NARCOTIC,119628
VANDALISM,116059
WARRANTS,101379
BURGLARY,91543
SUSPICIOUS OCC,80444
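
The RDD version follows the map-then-reduce-by-key pattern. A minimal plain-Python sketch of the same idea, using `collections.Counter` in place of `reduceByKey(add)`; the records below are hypothetical toy data, not rows from the dataset.

```python
from collections import Counter

# Hypothetical toy records: (IncidntNum, Category). Not real data.
records = [
    ("1", "ASSAULT"),
    ("2", "LARCENY/THEFT"),
    ("3", "LARCENY/THEFT"),
    ("4", "VANDALISM"),
    ("5", "LARCENY/THEFT"),
    ("6", "ASSAULT"),
]

# "map" step: use the category as the key. Counter then does the per-key
# sum that reduceByKey(add) performs across partitions in Spark.
counts = Counter(category for _, category in records)

# Sort by count, largest first, as in the cells above.
ranked = sorted(counts.items(), key=lambda item: -item[1])
print(ranked)
```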


In [14]:
We can see that the top 3 categories are Larceny/Theft, Other Offenses, and Non-Criminal.

In [15]:
# important hints: 
## first step: use Spark DataFrames or SQL to compute the statistics 
## second step: export the result to a pandas DataFrame 

crimes_pd_df = crimeCategory.toPandas()
display(crimes_pd_df)

category,Count
LARCENY/THEFT,480448
OTHER OFFENSES,309358
NON-CRIMINAL,238323
ASSAULT,194694
VEHICLE THEFT,126602
DRUG/NARCOTIC,119628
VANDALISM,116059
WARRANTS,101379
BURGLARY,91543
SUSPICIOUS OCC,80444


#### Q2 question (OLAP)
Count the number of crimes in each district, and visualize the results.

In [17]:
q2_result = df_opt1.groupBy('PdDistrict').count().orderBy('count', ascending=False)
display(q2_result)

PdDistrict,count
SOUTHERN,399785
MISSION,300076
NORTHERN,272713
CENTRAL,226255
BAYVIEW,221000
INGLESIDE,194180
TENDERLOIN,191746
TARAVAL,166971
PARK,125479
RICHMOND,116818


In [18]:
The districts with the most crimes are Southern, Mission, and Northern.

#### Q3 question (OLAP)
Count the number of crimes each Sunday in SF downtown.   
 1: SF downtown is defined by a range of spatial locations. For example, I use a rectangle to define SF downtown; a circle around a center point would also work. Thus, we need to write our own UDF to keep only the records located inside that spatial range. We can follow the example here: https://changhsinlee.com/pyspark-udf/

 2: The downtown rectangle is a < x < b and c < y < d, where X is the longitude and Y is the latitude. San Francisco's coordinates are latitude 37.773972, longitude -122.431297, so one candidate range is X in (-122.4313, -122.4213), Y in (37.7540, 37.7740). Note that the code cells below use a different rectangle: X in [-122.4087, -122.3933], Y in [37.7936, 37.7966].
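
The two spatial filters mentioned above (rectangle and circle) can be sketched as plain Python predicates. The rectangle bounds are the ones used in the code cell for this question; treat them as an assumption about where downtown is, not an official boundary. The circle test works in raw degrees, which is only a rough approximation of real distance, and its center and radius are hypothetical.

```python
import math

# Bounds from the Q3 code cell (assumed downtown rectangle).
X_MIN, X_MAX = -122.4087, -122.3933   # longitude
Y_MIN, Y_MAX = 37.7936, 37.7966       # latitude

def in_rectangle(x, y):
    """True if (x, y) lies inside the downtown rectangle."""
    return X_MIN <= x <= X_MAX and Y_MIN <= y <= Y_MAX

def in_circle(x, y, cx, cy, radius_deg):
    """True if (x, y) is within radius_deg degrees of the center (cx, cy)."""
    return math.hypot(x - cx, y - cy) <= radius_deg

print(in_rectangle(-122.4000, 37.7950))       # point inside the rectangle
print(in_rectangle(-122.403404, 37.77542))    # 800 Block of BRYANT ST, outside
print(in_circle(-122.4000, 37.7950, -122.4010, 37.7951, 0.01))
```

Either predicate could be wrapped in a Spark UDF, or expressed directly as a `WHERE` clause, which avoids the UDF overhead.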

In [20]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def crime_sunday_downtown(day_of_week, x, y):
  # Return 1 for a Sunday incident inside the downtown rectangle, else 0.
  # Guard against null coordinates before comparing.
  if day_of_week != 'Sunday' or x is None or y is None:
    return 0
  return int(-122.4087 <= x <= -122.3933 and 37.7936 <= y <= 37.7966)

crime_sunday_downtown_udf = udf(crime_sunday_downtown, IntegerType())
df_sunday_downtown_count = df_opt1.select('DayOfWeek', 'Date', 'X', 'Y', crime_sunday_downtown_udf('DayOfWeek', 'X', 'Y').alias('Sunday_Downtown'))
# Keep flagged rows only, then count incidents per Sunday.
q3_result = df_sunday_downtown_count.filter(df_sunday_downtown_count.Sunday_Downtown == 1).groupBy(df_sunday_downtown_count.Date).count().orderBy(df_sunday_downtown_count.Date)
display(q3_result)


Date,count
2003-01-12,3
2003-01-19,2
2003-01-26,2
2003-02-02,2
2003-02-16,4
2003-02-23,6
2003-03-02,1
2003-03-09,4
2003-03-16,3
2003-03-23,1


In [21]:
crimeSundaySF = spark.sql("SELECT  Date, COUNT(*) AS Count FROM sf_crime WHERE DayOfWeek == 'Sunday' AND X>=-122.4087 AND X <= -122.3933 AND Y<=37.7966 AND Y>= 37.7936 GROUP BY Date ORDER BY Date")
display(crimeSundaySF)

Date,Count
2003-01-12,3
2003-01-19,2
2003-01-26,2
2003-02-02,2
2003-02-16,4
2003-02-23,6
2003-03-02,1
2003-03-09,4
2003-03-16,3
2003-03-23,1


#### Q4 question (OLAP)
Analyze the number of crimes in each month of 2015, 2016, 2017, and 2018.

In [23]:
q4_result = spark.sql("SELECT YEAR(Date) AS Year, MONTH(Date) AS Month, COUNT(*) AS Count FROM sf_crime WHERE YEAR(Date) in(2015, 2016, 2017, 2018) GROUP BY 1, 2 ORDER BY 1, 2")
display(q4_result)

Year,Month,Count
2015,1,13606
2015,2,12329
2015,3,13929
2015,4,12959
2015,5,13729
2015,6,13304
2015,7,13365
2015,8,13730
2015,9,12896
2015,10,13147


In [24]:
In the first four months of the year, crime counts in 2018 are comparatively lower than in previous years.

#### Q5 question (OLAP)
Analyze the number of crimes by hour on certain days, such as 2015/12/15, 2016/12/15, and 2017/12/15.

In [26]:
q5_result = spark.sql("SELECT Date,  Hour, COUNT(*) AS Count FROM sf_crime WHERE Date IN ('2015-12-15', '2016-12-15', '2017-12-15') GROUP BY 1, 2 ORDER BY 1, 2")
display(q5_result)

Date,Hour,Count
2015-12-15,0,15
2015-12-15,1,6
2015-12-15,2,5
2015-12-15,3,4
2015-12-15,4,10
2015-12-15,5,3
2015-12-15,6,4
2015-12-15,7,8
2015-12-15,8,12
2015-12-15,9,10


In [27]:
q5_result_1 = spark.sql("SELECT Hour, COUNT(*) AS Count FROM sf_crime WHERE Date ='2015-12-15' GROUP BY 1 ORDER BY 1")
display(q5_result_1)

Hour,Count
0,15
1,6
2,5
3,4
4,10
5,3
6,4
7,8
8,12
9,10


In [28]:
q5_result_2 = spark.sql("SELECT Hour, COUNT(*) AS Count FROM sf_crime WHERE Date ='2016-12-15' GROUP BY 1 ORDER BY 1")
display(q5_result_2)

Hour,Count
0,22
1,10
2,12
3,9
4,1
5,6
6,5
7,3
8,24
9,23


In [29]:
q5_result_3 = spark.sql("SELECT Hour, COUNT(*) AS Count FROM sf_crime WHERE Date ='2017-12-15' GROUP BY 1 ORDER BY 1")
display(q5_result_3)

Hour,Count
0,30
1,12
2,8
3,4
4,9
5,2
6,6
7,11
8,26
9,14


In [30]:
Crime counts are comparatively low in the early morning before 7 am and high after 5 pm. 

#### Q6 question (OLAP)
(1) Find the top 3 most dangerous districts.  
(2) For the districts from step 1, find the crime events by category and time (hour).

In [32]:
top3_danger_drt = spark.sql("SELECT PdDistrict, COUNT(*) AS Count FROM sf_crime GROUP BY PdDistrict ORDER BY Count DESC LIMIT 3")
display(top3_danger_drt)

PdDistrict,Count
SOUTHERN,399785
MISSION,300076
NORTHERN,272713


In [33]:
top3_danger_drt.createOrReplaceTempView("top3_danger_drt")
q6_result = spark.sql("SELECT PdDistrict,Category, Hour, COUNT(*) AS Count FROM sf_crime WHERE PdDistrict IN (SELECT PdDistrict FROM top3_danger_drt) GROUP BY 1,2,3 ORDER BY 4 DESC")
display(q6_result)

PdDistrict,Category,Hour,Count
SOUTHERN,LARCENY/THEFT,18,9633
SOUTHERN,LARCENY/THEFT,19,9198
SOUTHERN,LARCENY/THEFT,17,7927
SOUTHERN,LARCENY/THEFT,20,7414
NORTHERN,LARCENY/THEFT,19,6819
SOUTHERN,LARCENY/THEFT,16,6741
NORTHERN,LARCENY/THEFT,18,6724
SOUTHERN,LARCENY/THEFT,12,6451
SOUTHERN,LARCENY/THEFT,15,6377
SOUTHERN,LARCENY/THEFT,14,6255


In [34]:
q6_hour = spark.sql("SELECT PdDistrict, Hour, COUNT(*) AS Count FROM sf_crime WHERE PdDistrict IN (SELECT PdDistrict FROM top3_danger_drt) GROUP BY 1,2 ORDER BY 1,2")
display(q6_hour)

PdDistrict,Hour,Count
MISSION,0,16797
MISSION,1,11125
MISSION,2,8930
MISSION,3,5599
MISSION,4,3920
MISSION,5,3089
MISSION,6,4791
MISSION,7,7449
MISSION,8,10647
MISSION,9,11453


In [35]:
Crimes decrease before 5 am and increase after 7 am.

#### Q7 question (OLAP)
For each category of crime, find the percentage of incidents that were resolved.

In [37]:
q7_resolution = spark.sql("SELECT Category, Resolution, COUNT(*) AS Count,sum(count(*)) over(partition by Category) AS CateCount, count(*) * 100.0 / sum(count(*)) over(partition by Category) AS Percentage FROM sf_crime GROUP BY 1,2 ORDER BY 1")
display(q7_resolution)

Category,Resolution,Count,CateCount,Percentage
ARSON,PROSECUTED BY OUTSIDE AGENCY,3,3931,0.07631645891631
ARSON,PSYCHOPATHIC CASE,17,3931,0.43245993385907
ARSON,PROSECUTED FOR LESSER OFFENSE,1,3931,0.02543881963877
ARSON,JUVENILE CITED,9,3931,0.22894937674892
ARSON,CLEARED-CONTACT JUVENILE FOR MORE INFO,2,3931,0.05087763927754
ARSON,"ARREST, BOOKED",630,3931,16.02645637242432
ARSON,UNFOUNDED,25,3931,0.63597049096922
ARSON,JUVENILE ADMONISHED,9,3931,0.22894937674892
ARSON,EXCEPTIONAL CLEARANCE,2,3931,0.05087763927754
ARSON,NOT PROSECUTED,3,3931,0.07631645891631


In [38]:
q7_resolution.createOrReplaceTempView("q7_resolution")
q7_1 = spark.sql("SELECT Category, SUM(Count) AS ResolutionCount, CateCount As Total, SUM(Percentage) AS Percentage FROM q7_resolution WHERE Resolution != 'NONE' GROUP BY 1,3 ORDER BY 4 DESC ")
display(q7_1)

Category,ResolutionCount,Total,Percentage
PROSTITUTION,15851,16701,94.91048440213162
WARRANTS,95897,101379,94.5925684806518
DRIVING UNDER THE INFLUENCE,5355,5672,94.4111424541608
DRUG/NARCOTIC,109357,119628,91.41421740729594
LIQUOR LAWS,3632,4083,88.95420034288514
STOLEN PROPERTY,10452,11891,87.89841056261037
LOITERING,2131,2430,87.69547325102882
DRUNKENNESS,8101,9826,82.44453490738859
WEAPON LAWS,16164,22234,72.69946928128093
OTHER OFFENSES,221514,309358,71.60441947517117


In [40]:
Most categories have a comparatively high resolution rate. However, categories such as Larceny/Theft, Vehicle Theft, and Recovered Vehicle have a very low resolution rate.
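
The percentage logic in the two queries above can be sketched in plain Python: first a per-category total (the role of `SUM(COUNT(*)) OVER (PARTITION BY Category)`), then the share of incidents whose resolution is not `'NONE'`. The counts below are hypothetical toy numbers chosen for easy arithmetic, not values from the dataset.

```python
from collections import defaultdict

# Hypothetical toy counts per (Category, Resolution). Not real data.
counts = {
    ("ARSON", "NONE"): 6,
    ("ARSON", "ARREST, BOOKED"): 3,
    ("ARSON", "UNFOUNDED"): 1,
    ("WARRANTS", "NONE"): 1,
    ("WARRANTS", "ARREST, BOOKED"): 4,
}

# Per-category totals, like the window SUM partitioned by Category.
totals = defaultdict(int)
for (category, _), n in counts.items():
    totals[category] += n

# Share of incidents with any resolution other than 'NONE'.
resolved_pct = {}
for category, total in totals.items():
    resolved = sum(n for (c, r), n in counts.items()
                   if c == category and r != "NONE")
    resolved_pct[category] = 100.0 * resolved / total

print(resolved_pct)
```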

### Part 2: Clustering
I applied a Spark ML clustering algorithm to the spatial data, then visualized the clustering results.

In [42]:
q8_df = df_opt1.select(['X', 'Y']).where(df_opt1.Y <90)
display(q8_df)

X,Y
-122.403404,37.77542
-122.4856,37.738823
-122.39654,37.71066
-122.40013,37.73009
-122.386665,37.789883
-122.387505,37.716877
-122.39969,37.739902
-122.41293,37.773926
-122.41984,37.786358
-122.39895,37.73385


In [43]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=['X', 'Y'], outputCol='features')
df_kmeans = vecAssembler.transform(q8_df)

In [44]:
# Optimize choice of k
cost = np.zeros(20)
for k in range(2,20):
  kmeans = KMeans().setK(k).setSeed(1)
  model = kmeans.fit(df_kmeans.sample(False, 0.1, seed=42))
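  # computeCost returns the sum of squared distances to the nearest center;
  # it was deprecated in Spark 2.4 and removed in Spark 3.0.
  cost[k] = model.computeCost(df_kmeans)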
  
fig, ax = plt.subplots(1, 1, figsize=(8, 6))
ax.plot(range(2,20), cost[2:20])
ax.set_xlabel('K')
ax.set_ylabel('cost')
display(fig)
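
The cost plotted in the cell above is the k-means cost: the sum of squared distances from each point to its nearest cluster center. A minimal plain-Python sketch of that quantity, on hypothetical toy points, shows why the curve drops as k grows and why the "elbow" is the point where extra clusters stop helping much. The function name `kmeans_cost` is an assumption for illustration.

```python
def kmeans_cost(points, centers):
    """Sum of squared distances from each point to its nearest center."""
    total = 0.0
    for px, py in points:
        total += min((px - cx) ** 2 + (py - cy) ** 2 for cx, cy in centers)
    return total

# Hypothetical toy data: two tight groups far apart.
points = [(0.0, 0.0), (0.0, 2.0), (10.0, 0.0), (10.0, 2.0)]
one_center = [(5.0, 1.0)]
two_centers = [(0.0, 1.0), (10.0, 1.0)]

print(kmeans_cost(points, one_center))   # large: one center fits neither group
print(kmeans_cost(points, two_centers))  # small: one center per group
```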

In [45]:
# Trains a k-means model
kmeans = KMeans().setK(15).setSeed(1)
model = kmeans.fit(df_kmeans)
centers = model.clusterCenters()

df_pred = model.transform(df_kmeans)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(df_pred)

print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
print("Cluster Centers: ")
for center in centers:
    print(center)

In [46]:
display(df_pred)

X,Y,features,prediction
-122.403404,37.77542,"List(1, 2, List(), List(-122.40340423583984, 37.775421142578125))",0
-122.4856,37.738823,"List(1, 2, List(), List(-122.48560333251953, 37.73882293701172))",11
-122.39654,37.71066,"List(1, 2, List(), List(-122.39653778076172, 37.71065902709961))",12
-122.40013,37.73009,"List(1, 2, List(), List(-122.40013122558594, 37.7300910949707))",10
-122.386665,37.789883,"List(1, 2, List(), List(-122.38666534423828, 37.78988265991211))",5
-122.387505,37.716877,"List(1, 2, List(), List(-122.38750457763672, 37.71687698364258))",10
-122.39969,37.739902,"List(1, 2, List(), List(-122.39968872070312, 37.73990249633789))",10
-122.41293,37.773926,"List(1, 2, List(), List(-122.41293334960938, 37.77392578125))",7
-122.41984,37.786358,"List(1, 2, List(), List(-122.41983795166016, 37.78635787963867))",13
-122.39895,37.73385,"List(1, 2, List(), List(-122.3989486694336, 37.733848571777344))",10


In [47]:
res = df_pred.drop('features').toPandas()
plt.style.use('ggplot')
fig, ax = plt.subplots()
ax.scatter(res['X'], res['Y'], c=(res['prediction']),cmap=plt.cm.jet, alpha=0.9)
ax.set_title("Spatial distribution of crime clusters")
display(fig)

### Conclusion
1. I used Spark to analyze San Francisco crime data at scale, aiming to discover crime patterns and suggest responses. Theft is the most frequent category of crime, and its resolution rate is among the lowest. The SOUTHERN, MISSION, and NORTHERN districts are the most crime-prone. Crime incidents become more frequent from 7 am onward and are comparatively high after 4 pm.

2. The clusters and cluster centers I found can help police decide how to distribute resources.

### Optional part: Time series analysis
This part is not based on Spark; it only uses the pandas time series tools.    
Process:  
1. Visualize the time series  
2. Plot the ACF and find the optimal parameters  
3. Train an ARIMA model  
4. Predict 
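
As a rough illustration of step 2, the sample autocorrelation at a given lag can be computed by hand. This sketch uses the common estimator that divides the lagged covariance by the total variance of the series; the function name and the toy series (a pattern repeating every 4 steps) are assumptions for illustration, not crime counts.

```python
def autocorrelation(series, lag):
    """Sample autocorrelation of a series at a non-negative lag."""
    n = len(series)
    mean = sum(series) / n
    denom = sum((x - mean) ** 2 for x in series)
    numer = sum((series[t] - mean) * (series[t - lag] - mean)
                for t in range(lag, n))
    return numer / denom

# Hypothetical toy series with a period of 4.
toy = [10, 12, 14, 12, 10, 12, 14, 12, 10, 12, 14, 12]

for lag in range(5):
    print(lag, round(autocorrelation(toy, lag), 3))
```

A strong positive value at a lag equal to the period, as at lag 4 here, is the kind of pattern that suggests a seasonal term when choosing ARIMA parameters.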

References:   
https://zhuanlan.zhihu.com/p/35282988  
https://zhuanlan.zhihu.com/p/35128342  
https://www.statsmodels.org/dev/examples/notebooks/generated/tsa_arma_0.html  
https://www.howtoing.com/a-guide-to-time-series-forecasting-with-arima-in-python-3  
https://www.joinquant.com/post/9576?tag=algorithm  
https://blog.csdn.net/u012052268/article/details/79452244