<a href="https://colab.research.google.com/github/tingyiwu714/san-diego-crime-analysis/blob/master/Spark_SD_Crime.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# San Diego Crime Analysis

## 0: Setup and Load Data

Set up the Spark environment on Google Colab

In [131]:
# install Spark, Java and findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar -xvf spark-3.0.0-bin-hadoop3.2.tgz
!pip install -q findspark

# set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

# initilize pyspark
import findspark
findspark.init()

# start spark session
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

from google.colab import output
output.clear()

In [132]:
# Import every package used by the rest of the notebook.
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
# NOTE(review): the two wildcard imports below pull every public name of
# pyspark.sql.types / pyspark.sql.functions into the notebook namespace;
# pyspark.sql.functions exports names such as sum, min, max and round that
# replace the Python builtins of the same name for all later cells.
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Imported after the wildcards so these names win over anything they exported.
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql import Row

# geopandas is installed at run time and provides the geocode() helper.
!pip install geopandas
import geopandas as gpd
from geopandas.tools import geocode

# Explicit re-imports of names already covered by the wildcard imports above.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType

# Mapping stack used for the heat-map visualisation.
import folium
from folium import Choropleth, Circle, Marker
from folium.plugins import HeatMap, MarkerCluster

# Silence all Python warnings for the remainder of the session.
import warnings
warnings.filterwarnings('ignore')

# `output` is google.colab.output, imported by the setup cell; this clears
# the pip installation log from the cell output.
output.clear()

Load datasets into Spark DataFrames

The datasets were adapted from the San Diego Regional Data Library

In [133]:
# Load the 2007-2011 crime data: each Google Drive file is read as CSV with
# pandas (every column as str), converted to a Spark DataFrame with the schema
# below, and all per-file frames are stacked into `df_07to11`.
# file_id = ['1GynC_phtJr_ycck_FwP6U_1G4-wGjjrb']
file_id = ['16GBta4t4zAWO0yr7w7dLDyo7Ji34zzWf',
           '1EblPEoGj4x8hvxLzjsDZh-IKeYhELE18',
           '1tbURDD5QDeAgaHgGvZxIBIrkmQIxFPeh',
           '1WuFB52qSr-dnB5--u5jYwog5Uno12tEK',
           '1QdadT4p1O-FJdjfFlSkSI2Y9dFN-cKdq']
mySchema = StructType([StructField("activityType", StringType(), True),
                       StructField("AGENCY", StringType(), True),
                       StructField("activityDate", StringType(), True),
                       StructField("LEGEND", StringType(), True),
                       StructField("Charge_Description", StringType(), True),
                       StructField("BLOCK_ADDRESS", StringType(), True),
                       StructField("City_Name", StringType(), True),
                       StructField("ZipCode", StringType(), True)])
# Direct-download URL template; built once because it does not change per file.
link = 'https://drive.google.com/uc?export=download&id={FILE_ID}'
sdfs = []
# `drive_id` instead of `id`, so the builtin id() is not shadowed and left
# rebound in the notebook namespace after the loop.
for drive_id in file_id:
  url = link.format(FILE_ID=drive_id)
  pdf = pd.read_csv(url, dtype=str)
  spd = spark.createDataFrame(pdf, schema=mySchema)
  sdfs.append(spd)
# Positional union of all per-file frames (union() is the same operation as
# unionAll() and matches the call used later when the two periods are merged).
df_07to11 = reduce(DataFrame.union, sdfs)

In [134]:
# Load the 2012-2017 crime data: each Google Drive file is read as an Excel
# workbook with pandas (every column as str), converted to a Spark DataFrame
# with the schema below, and all per-file frames are stacked into `df_12to17`.
# file_id = ['12VYPF8HeJH1CpDfsM3fbn2S77KBhD6pv']
file_id = ['1TpeuhgB7IHa7IDupjkSoY_CdLHQCdIhd',
           '1WgMUozlgojzrc3RPREimLrcToY1yoRaA',
           '1d05-6gtJYEanI6W-9NhteVEzL6mdpXe_',
           '1VdAikAd1LRkh7Z81bVv61R_A4RUespTf',
           '159AY4OxMvX-XF00FqFQW4KPy1aJOWjMJ',
           '18A3yryRU2q873W149H653jl_UgxWsM_-']
mySchema = StructType([StructField("reportingYear", StringType(), True),
                       StructField("reportingMonth", StringType(), True),
                       StructField("agency", StringType(), True),
                       StructField("activityStatus", StringType(), True),
                       StructField("activitydate", StringType(), True),
                       StructField("numberActualReported", StringType(), True),
                       StructField("BLOCK_ADDRESS", StringType(), True),
                       StructField("city", StringType(), True),
                       StructField("zipCode", StringType(), True),
                       StructField("censusTract", StringType(), True),
                       StructField("censusBlock", StringType(), True),
                       StructField("CrimeCategory", StringType(), True),
                       StructField("CrimeDescription", StringType(), True)])
# Direct-download URL template; built once because it does not change per file.
link = 'https://drive.google.com/uc?export=download&id={FILE_ID}'
sdfs = []
# `drive_id` instead of `id`, so the builtin id() is not shadowed and left
# rebound in the notebook namespace after the loop.
for drive_id in file_id:
  url = link.format(FILE_ID=drive_id)
  pdf = pd.read_excel(url, dtype=str)
  spd = spark.createDataFrame(pdf, schema=mySchema)
  sdfs.append(spd)
# Positional union of all per-file frames (union() is the same operation as
# unionAll() and matches the call used later when the two periods are merged).
df_12to17 = reduce(DataFrame.union, sdfs)

## 1: Data Exploration

### 1.1 Understand Raw Dataset

In [136]:
# Size of the 2007-2011 frame, followed by a five-row preview.
n_rows_07to11 = df_07to11.count()
n_cols_07to11 = len(df_07to11.columns)
print("Number of rows: ", n_rows_07to11)
print("Number of cols: ", n_cols_07to11)
df_07to11.show(5)

Number of rows:  843468
Number of cols:  8
+------------+--------------------+--------------------+-------------+--------------------+--------------------+-----------+-------+
|activityType|              AGENCY|        activityDate|       LEGEND|  Charge_Description|       BLOCK_ADDRESS|  City_Name|ZipCode|
+------------+--------------------+--------------------+-------------+--------------------+--------------------+-----------+-------+
|  CRIME CASE| Carlsbad Police, CA|Jan 1, 2007 12:00...|THEFT/LARCENY|GRAND THEFT:MONEY...|7100  BLOCK AVIAR...|   CARLSBAD|  92009|
|  CRIME CASE|Chula Vista Polic...|Jan 1, 2007 12:00...|        FRAUD|               FRAUD|300  BLOCK SANDST...|CHULA VISTA|  91911|
|  CRIME CASE|Chula Vista Polic...|Jan 1, 2007 12:00...|        FRAUD|               FRAUD|900  BLOCK PAPPAS...|CHULA VISTA|  91911|
|  CRIME CASE|Chula Vista Polic...|Jan 1, 2007 12:00...|THEFT/LARCENY|GRAND THEFT:MONEY...|1300  BLOCK MESA ...|CHULA VISTA|  91910|
|  CRIME CASE|Escondido Po

In [137]:
# Size of the 2012-2017 frame, followed by a five-row preview.
n_rows_12to17 = df_12to17.count()
n_cols_12to17 = len(df_12to17.columns)
print("Number of rows: ", n_rows_12to17)
print("Number of cols: ", n_cols_12to17)
df_12to17.show(5)

Number of rows:  1216628
Number of cols:  13
+-------------+--------------+--------+---------------+-------------------+--------------------+--------------------+----------+-------+-----------+-----------+---------------+--------------------+
|reportingYear|reportingMonth|  agency| activityStatus|       activitydate|numberActualReported|       BLOCK_ADDRESS|      city|zipCode|censusTract|censusBlock|  CrimeCategory|    CrimeDescription|
+-------------+--------------+--------+---------------+-------------------+--------------------+--------------------+----------+-------+-----------+-----------+---------------+--------------------+
|         2012|             1|CARLSBAD|OPEN - WORKABLE|Aug 26 2011 11:00AM|                   1|0  BLOCK UNKNOWN ...|  CARLSBAD|    NaN|          0|          0|  Part II Crime|               FRAUD|
|         2012|             1|CARLSBAD|OPEN - WORKABLE|Dec  1 2011  8:00AM|                   1|3100  BLOCK EL CA...|  CARLSBAD|  92010|      19803|       1024|Lar

### 1.2 Data Cleaning and Processing

Merge two DataFrames

In [138]:
# Give both frames a shared column vocabulary (date / category / description /
# city), then lower-case every column name.
#
# The lower-casing uses toDF() with a comprehension rather than a
# `for col in ...` loop: a loop variable named `col` would stay bound to the
# last column-name string and clobber pyspark.sql.functions.col for every
# later cell.
df_07to11 = df_07to11.withColumnRenamed('activityDate', 'date')\
                     .withColumnRenamed('LEGEND', 'category')\
                     .withColumnRenamed('Charge_Description', 'description')\
                     .withColumnRenamed('City_Name', 'city')
df_07to11 = df_07to11.toDF(*[name.lower() for name in df_07to11.columns])

# The 2012-2017 schema spells the date column `activitydate` (lower-case d);
# use the exact name instead of relying on case-insensitive column resolution.
df_12to17 = df_12to17.withColumnRenamed('activitydate', 'date')\
                     .withColumnRenamed('CrimeCategory', 'category')\
                     .withColumnRenamed('CrimeDescription', 'description')
df_12to17 = df_12to17.toDF(*[name.lower() for name in df_12to17.columns])

In [139]:
# Project both periods onto the same six columns, in the same order, and stack
# them (union() matches columns by position, so the shared ordering matters).
cols = ["date", "category", "description", "block_address", "city", "zipcode"]
df1, df2 = df_07to11.select(*cols), df_12to17.select(*cols)
df = df1.union(df2)

Missing values

In [140]:
# Count missing values per column. Missing cells arrive as the literal string
# "NaN" (which isnan() recognises after Spark's implicit cast), but a true null
# would slip past isnan(), so both are counted.
missing = df.select([
    count(when(isnan(c) | df[c].isNull(), lit(1))).alias(c)
    for c in df.columns
])
print("Number of missing data per column:")
missing.show()

Number of missing data per column:
+----+--------+-----------+-------------+-----+-------+
|date|category|description|block_address| city|zipcode|
+----+--------+-----------+-------------+-----+-------+
|   3|       3|         12|         9475|16058| 110796|
+----+--------+-----------+-------------+-----+-------+



In [141]:
# Keep only the rows that carry a date; rows whose date is the 'NaN'
# placeholder are discarded.
has_date = df["date"] != 'NaN'
df = df.where(has_date)

In [142]:
# Keep only rows with a complete location: a row is dropped as soon as ANY of
# block_address, city or zipcode is the 'NaN' placeholder.
has_address = df["block_address"] != 'NaN'
has_city = df["city"] != 'NaN'
has_zipcode = df["zipcode"] != 'NaN'
df = df.filter(has_address & has_city & has_zipcode)

In [143]:
# Re-check missing values after filtering; count true nulls as well as the
# "NaN" placeholders, since isnan() alone does not flag nulls.
df.select([
    count(when(isnan(c) | df[c].isNull(), lit(1))).alias(c)
    for c in df.columns
]).show()

+----+--------+-----------+-------------+----+-------+
|date|category|description|block_address|city|zipcode|
+----+--------+-----------+-------------+----+-------+
|   0|       0|          6|            0|   0|      0|
+----+--------+-----------+-------------+----+-------+



Geocode addresses into geographic coordinates

In [144]:
# Build one geocodable string per row,
#   "<block_address>, <city>, CA <zipcode>",
# then drop the " BLOCK " marker and tidy the whitespace.
# ' {2,}' collapses a run of ANY length to one space; replacing the fixed pair
# '  ' only halves each run, so three or more spaces would leave a double space.
df = df.withColumn('full_address', concat(df.block_address, lit(", "), df.city, lit(", CA "), df.zipcode))\
       .withColumn('full_address', regexp_replace('full_address', ' BLOCK ', ' '))\
       .withColumn('full_address', regexp_replace('full_address', ' {2,}', ' '))\
       .withColumn('full_address', trim('full_address'))

In [145]:
# Preview the cleaned frame, including the new full_address column
# (20 rows with long cells truncated, i.e. show()'s defaults spelled out).
df.show(n=20, truncate=True)

+--------------------+-------------+--------------------+--------------------+-------------+-------+--------------------+
|                date|     category|         description|       block_address|         city|zipcode|        full_address|
+--------------------+-------------+--------------------+--------------------+-------------+-------+--------------------+
|Jan 1, 2007 12:00...|THEFT/LARCENY|GRAND THEFT:MONEY...|7100  BLOCK AVIAR...|     CARLSBAD|  92009|7100 AVIARA DRIVE...|
|Jan 1, 2007 12:00...|        FRAUD|               FRAUD|300  BLOCK SANDST...|  CHULA VISTA|  91911|300 SANDSTONE COU...|
|Jan 1, 2007 12:00...|        FRAUD|               FRAUD|900  BLOCK PAPPAS...|  CHULA VISTA|  91911|900 PAPPAS COURT,...|
|Jan 1, 2007 12:00...|THEFT/LARCENY|GRAND THEFT:MONEY...|1300  BLOCK MESA ...|  CHULA VISTA|  91910|1300 MESA GRANDE ...|
|Jan 1, 2007 12:00...|        FRAUD|               FRAUD|1500  BLOCK RIMRO...|    ESCONDIDO|  92027|1500 RIMROCK DRIV...|
|Jan 1, 2007 12:00...|  

In [146]:
def my_geo(address):
  """Geocode one address string with the Nominatim provider.

  Args:
    address: free-form address string, or None.

  Returns:
    Row(latitude, longitude) on success; None when the address is missing,
    no location is found, or the lookup fails for any reason.
  """
  if address is None:
    return None
  try:
    # Extra keyword arguments are forwarded to the underlying geopy geocoder.
    # Nominatim needs an identifying user_agent; without one the call can fail,
    # and the old bare `except` silently turned that into "no coordinates".
    result = geocode(address, provider="nominatim", user_agent="san-diego-crime-analysis")
    location = result.geometry.iloc[0]
    # An unresolved address comes back without a usable point.
    if location is None or location.is_empty:
      return None
    return Row(latitude = location.y, longitude = location.x)
  except Exception:
    # Best effort: one bad address must not abort the whole Spark job.
    # Catching Exception (not a bare except) lets KeyboardInterrupt and
    # SystemExit propagate.
    return None

# Struct returned by the geocoding UDF. The fields are nullable because
# my_geo yields no coordinates when a lookup fails.
my_schema = StructType([StructField("latitude", FloatType(), True),
                        StructField("longitude", FloatType(), True)])

# Wrap the Python geocoder as a Spark UDF returning the struct above.
address_func = udf(my_geo, my_schema)

# NOTE(review): array() + explode() around the UDF call is kept from the
# original; presumably it stops Spark from re-evaluating the (slow) UDF once
# per field when "Output.*" is expanded below - confirm before simplifying.
geocoded = df.withColumn("Output", explode(array(address_func(df.full_address))))
# Flatten the struct into top-level latitude / longitude columns.
df2 = geocoded.select("date", "category", "description", "block_address", "city", "zipcode","full_address","Output.*")

In [147]:
# df2.show()

In [148]:
# df2.take(10)

Convert date column to datetime

In [149]:
# convert "date" column to datetime

def dynamic_date(col, frmts=("MMM d, yyyy h:mm:ss a", "MM/dd/yyyy HH:mm", "MMM d yyyy h:mma")):
    """Parse a string column that mixes several date layouts into a timestamp.

    Each pattern in `frmts` is tried in order and the first one that parses
    wins; rows matching none of them become null.

    The default patterns were inferred from the sample rows printed earlier in
    the notebook ("Jan 1, 2007 12:00..." and "Aug 26 2011 11:00AM") - verify
    against the full data and extend `frmts` if other layouts exist.

    Args:
        col: Column (or column name) holding the date strings.
        frmts: iterable of Spark datetime patterns, tried in order.

    Returns:
        A timestamp Column expression.

    Raises:
        ValueError: if `frmts` is empty.
    """
    frmts = tuple(frmts)
    if not frmts:
        raise ValueError("dynamic_date needs at least one datetime pattern")
    # Some values are space-padded ("Dec  1 2011  8:00AM"); collapse whitespace
    # so a single-space pattern can match them.
    normalized = trim(regexp_replace(col, r"\s+", " "))
    # No try/except here: this only builds a lazy expression, so data problems
    # cannot surface at this point and there is nothing meaningful to catch.
    return coalesce(*[to_timestamp(normalized, fmt) for fmt in frmts])

# withColumn returns a NEW DataFrame, so the result must be assigned back or
# the conversion is lost. The dtype guard keeps the cell safe to re-run:
# re-parsing an already converted timestamp column would turn it into nulls.
if dict(df2.dtypes)["date"] == "string":
    df2 = df2.withColumn("date", dynamic_date(df2["date"]))
df2

DataFrame[date: timestamp, category: string, description: string, block_address: string, city: string, zipcode: string, full_address: string, latitude: float, longitude: float]

Inconsistent data

### 1.3 Data Visualization

Geospatial Visualization

In [None]:
# Collect the geocoded Spark DataFrame into a pandas DataFrame on the driver.
# NOTE(review): toPandas() is an action, so this is presumably where the
# geocoding UDF actually runs for every row - confirm that the runtime and
# driver memory are acceptable for the full dataset.
df_pd = df2.toPandas()

In [None]:
# Create the base map at the notebook's fixed centre point and zoom level.
m = folium.Map(location=[32.817316, -117.043098], tiles='cartodbpositron', zoom_start=10)

# Make sure the coordinates are plain floats.
df_pd['latitude'] = df_pd['latitude'].astype(float)
df_pd['longitude'] = df_pd['longitude'].astype(float)

# Keep only the coordinate columns and drop rows that could not be geocoded.
heat_df = df_pd[['latitude', 'longitude']].dropna(axis=0, subset=['latitude','longitude'])

# Vectorised conversion to the [[lat, lon], ...] list HeatMap expects; this
# replaces a row-by-row iterrows() loop, which is very slow on large frames.
heat_data = heat_df.to_numpy().tolist()

# Plot it on the map
HeatMap(heat_data).add_to(m)

# Display the map
m
