## Data Wrangling using Spark and Data from San Francisco Police Department

The data we will be working with was made publicly available as part of San Francisco's open data initiative.
It covers police incident reports from 2018 to the present. We picked this dataset because it also contains
location information, which is one of the aspects we would like to explore using the visualisation techniques
available in Python, heatmaps in particular.

[San Francisco Police Reports](https://data.sfgov.org/Public-Safety/Police-Department-Incident-Reports-2018-to-Present/wg3w-h783)

For convenience the data has been downloaded locally and stored in CSV format.

In [1]:
# Police_Department_Incident_Reports_2018_to_Present.csv
# D:\datascience\solve_nleq\data
from os import path

# Raw string so the backslashes in the Windows path are not treated as escape sequences
csv_path = path.normpath(r'D:\datascience\solve_nleq\data\Police_Department_Incident_Reports_2018_to_Present.csv')
print(csv_path)

D:\datascience\solve_nleq\data\Police_Department_Incident_Reports_2018_to_Present.csv


In the end, as I will show below, reading the data as CSV turned out not to be a very good idea,
and I ended up using JSON, which carries some type information of its own instead of leaving
type inference to the CSV reader.
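
The difference is easy to see with the standard library alone. In the sketch below the same record is parsed once as CSV and once as JSON: the CSV parser returns every value as a string, while JSON keeps booleans and numbers distinct from strings. The record is invented for illustration, and `sample_count` is a made-up field that does not exist in the dataset.

```python
import csv
import io
import json

# The same invented record in both formats (sample_count is a made-up field)
csv_text = "incident_category,filed_online,sample_count\nRobbery,true,3\n"
json_text = '{"incident_category": "Robbery", "filed_online": true, "sample_count": 3}'

csv_row = next(csv.DictReader(io.StringIO(csv_text)))
json_row = json.loads(json_text)

# Map each field to the name of the Python type the parser produced
csv_types = {key: type(value).__name__ for key, value in csv_row.items()}
json_types = {key: type(value).__name__ for key, value in json_row.items()}

print(csv_types)   # every value is 'str'
print(json_types)  # 'str', 'bool' and 'int'
```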

The JSON API for the dataset is available at:
https://data.sfgov.org/resource/wg3w-h783.json

The API Documentation is available at the following location:
https://dev.socrata.com/foundry/data.sfgov.org/wg3w-h783
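
A plain request to this endpoint returns only one page of rows (1000 by default, as far as I know), so fetching more of the dataset means paging through it. The sketch below only builds the page URLs, using the `$limit` and `$offset` query parameters described in the SODA documentation. The helper name `page_urls` and the page size are assumptions for illustration, and no request is actually sent.

```python
from urllib.parse import urlencode

BASE_URL = "https://data.sfgov.org/resource/wg3w-h783.json"

def page_urls(total_rows, page_size=1000, base_url=BASE_URL):
    """Yield one URL per page of `page_size` rows, covering `total_rows` rows."""
    for offset in range(0, total_rows, page_size):
        # safe="$" keeps the leading dollar sign of the SODA parameters unescaped
        query = urlencode({"$limit": page_size, "$offset": offset}, safe="$")
        yield f"{base_url}?{query}"

for url in page_urls(total_rows=2500):
    print(url)
```

When paging for real it is probably wise to also pass an `$order` parameter so that rows come back in a stable order between requests.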

## Creating a Spark Session

First we import Spark and other necessary modules, create the Spark Session and proceed to load the data.

In [6]:
# To initialize Spark locally we make use of findspark
import findspark
# If the SPARK_HOME environment variable is not defined, pass the Spark install path explicitly
#findspark.init(r'D:\spark\spark-2.4.4-bin-hadoop2.7')
findspark.init()

Next we create the session and name it, or reuse an existing session if one is already available.

In [2]:
from pyspark.sql import SparkSession

number_cores = 4
memory_gb = 4

# Run Spark locally on `number_cores` cores with `memory_gb` GB of driver memory.
# getOrCreate() returns the already running session if there is one.
session = (
    SparkSession.builder
        .master('local[{}]'.format(number_cores))
        .appName('mapping-crime-in-san-francisco')
        .config('spark.driver.memory', '{}g'.format(memory_gb))
        .getOrCreate()
)

Next we display the session parameters. If the session creation step above did not succeed, an error is raised at this point.

In [3]:
#sc.getConf().getAll()
session.sparkContext.getConf().getAll()

[('spark.master', 'local[4]'),
 ('spark.app.id', 'local-1573134285418'),
 ('spark.driver.memory', '4g'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'mapping-crime-in-san-francisco'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.host', '172.16.180.180'),
 ('spark.driver.port', '56288')]

Now we are ready to read the data. At first we read the CSV file, but JSON proved to be more convenient. Note that the Open Data website also offers the data in a
format called GeoJSON, which is built on top of JSON and adds its own structure for geographic features. It is important not to confuse the two.

I thought they were interchangeable and ended up stuck with a format that, although the JSON reader can parse it, is largely incompatible with the rest of Spark's data manipulation model.

If you are curious about the differences between JSON and GeoJSON, I recommend the following resource, which is where I learned how the two formats relate.
[JSON vs GeoJSON](https://github.com/earthcubeprojects-chords/chords/wiki/JSON-vs-GeoJSON)
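
To make the difference concrete, the sketch below takes one incident wrapped as a GeoJSON `Feature` and flattens it into the kind of flat record that maps directly onto DataFrame columns. The sample values are invented, and `flatten_feature` is a hypothetical helper, not something the dataset or Spark provides.

```python
import json

# Hypothetical sample: one incident as a GeoJSON FeatureCollection (values invented)
geojson_text = """
{
  "type": "FeatureCollection",
  "features": [
    {
      "type": "Feature",
      "geometry": {"type": "Point", "coordinates": [-122.41, 37.77]},
      "properties": {"incident_category": "Robbery", "incident_time": "14:25"}
    }
  ]
}
"""

def flatten_feature(feature):
    """Turn one GeoJSON Feature into a flat dict, one key per column."""
    record = dict(feature["properties"])
    geometry = feature.get("geometry")
    if geometry and geometry.get("type") == "Point":
        # GeoJSON stores coordinates as [longitude, latitude]
        record["longitude"], record["latitude"] = geometry["coordinates"][:2]
    return record

collection = json.loads(geojson_text)
flat_records = [flatten_feature(feature) for feature in collection["features"]]
print(flat_records)
```

In GeoJSON the actual records sit two levels down, inside `features` and then `properties`, which is why reading such a file directly gives a nested structure rather than one row per incident.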

In [8]:
data_path = "data/Police_Department_Incident_Reports_2018_to_Present.csv"
crime_reports = session.read.csv(data_path)

data_path_json = "data/wg3w-h783.json"
crime_reports_json = session.read.json(data_path_json)

## Familiarizing with the Data

As a first step to get familiar with the data we print the schema to see which fields are there.

In [66]:
crime_reports.printSchema()
crime_reports.count()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: string (nullable = true)
 |-- _c21: string (nullable = true)
 |-- _c22: string (nullable = true)
 |-- _c23: string (nullable = true)
 |-- _c24: string (nullable = true)
 |-- _c25: string (nullable = true)
 |-- _c26: string (nullable = true)
 |-- _c27: string (nullable = true)

278807
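
The generic `_c0`, `_c1` and so on column names, all typed as strings, appear because the CSV reader was called with its defaults, which treat the first line as data and skip type inference. A minimal sketch of a reader call that addresses both, assuming the first line of the file is a header row. `read_csv_with_header` is a hypothetical helper, and `session` is expected to be the Spark session created earlier.

```python
def read_csv_with_header(session, csv_path):
    """Read a CSV whose first line holds the column names, letting Spark guess types.

    `session` is an existing SparkSession. inferSchema=True makes Spark take an
    extra pass over the data to work out a type for each column.
    """
    return session.read.csv(csv_path, header=True, inferSchema=True)

# Usage, with the variables defined above:
# crime_reports = read_csv_with_header(session, data_path)
# crime_reports.printSchema()
```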

In [10]:
crime_reports_json.printSchema()

root
 |-- _corrupt_record: string (nullable = true)



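Hmm... So what happened? Well, I learned once again that not everything called JSON is read the same way. By default, Spark's JSON reader expects JSON Lines (JSONL), a format often used as an alternative to CSV in which every line of the file is a complete JSON object on its own. The file downloaded from the API is instead a regular JSON document, a single array holding all the records, so the reader cannot parse it line by line and reports a corrupt record. If you are interested in the difference, I included a reference below.

https://stackoverflow.com/questions/38895057/reading-json-with-apache-spark-corrupt-record

So we either need to provide the JSON reader with a file where every single line is a proper JSON object, or tell it that the file is one multi-line JSON document.

Let's take the second route and load the file with the `multiline` option enabled: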

In [11]:
crime_reports_jsonl = session.read.option("multiline", "true").json(data_path_json)
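
A file where every line is a JSON object can also be produced from a regular JSON array with a few lines of standard-library Python, after which the default reader works without extra options. The following is a sketch that assumes the whole file fits in memory. `json_array_to_jsonl` and the `.jsonl` file name in the usage comment are hypothetical.

```python
import json

def json_array_to_jsonl(array_path, jsonl_path):
    """Rewrite a file holding one JSON array as JSON Lines; return the record count."""
    with open(array_path, encoding="utf-8") as source:
        records = json.load(source)  # loads the whole array into memory
    with open(jsonl_path, "w", encoding="utf-8") as target:
        for record in records:
            # json.dumps emits no raw newlines by default, so each record stays on one line
            target.write(json.dumps(record) + "\n")
    return len(records)

# Usage (the output file name is an assumption):
# json_array_to_jsonl("data/wg3w-h783.json", "data/wg3w-h783.jsonl")
# crime_reports_from_lines = session.read.json("data/wg3w-h783.jsonl")
```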

In [12]:
crime_reports_jsonl.printSchema()

root
 |-- :@computed_region_26cr_cadq: string (nullable = true)
 |-- :@computed_region_2dwj_jsy4: string (nullable = true)
 |-- :@computed_region_6qbp_sg9q: string (nullable = true)
 |-- :@computed_region_ajp5_b2md: string (nullable = true)
 |-- :@computed_region_h4ep_8xdi: string (nullable = true)
 |-- :@computed_region_jg9y_a9du: string (nullable = true)
 |-- :@computed_region_nqbw_i6c3: string (nullable = true)
 |-- :@computed_region_qgnn_b9vv: string (nullable = true)
 |-- :@computed_region_y6ts_4iup: string (nullable = true)
 |-- analysis_neighborhood: string (nullable = true)
 |-- cad_number: string (nullable = true)
 |-- cnn: string (nullable = true)
 |-- filed_online: boolean (nullable = true)
 |-- incident_category: string (nullable = true)
 |-- incident_code: string (nullable = true)
 |-- incident_date: string (nullable = true)
 |-- incident_datetime: string (nullable = true)
 |-- incident_day_of_week: string (nullable = true)
 |-- incident_description: string (nullable = true)

In [13]:
crime_reports_jsonl.describe()

DataFrame[summary: string, :@computed_region_26cr_cadq: string, :@computed_region_2dwj_jsy4: string, :@computed_region_6qbp_sg9q: string, :@computed_region_ajp5_b2md: string, :@computed_region_h4ep_8xdi: string, :@computed_region_jg9y_a9du: string, :@computed_region_nqbw_i6c3: string, :@computed_region_qgnn_b9vv: string, :@computed_region_y6ts_4iup: string, analysis_neighborhood: string, cad_number: string, cnn: string, incident_category: string, incident_code: string, incident_date: string, incident_datetime: string, incident_day_of_week: string, incident_description: string, incident_id: string, incident_number: string, incident_subcategory: string, incident_time: string, incident_year: string, intersection: string, latitude: string, longitude: string, police_district: string, report_datetime: string, report_type_code: string, report_type_description: string, resolution: string, row_id: string, supervisor_district: string]

In [14]:
crime_reports_jsonl.show(n=1)

+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------+----------+----+------------+-----------------+-------------+--------------------+--------------------+--------------------+--------------------+-----------+---------------+--------------------+-------------+-------------+------------+--------+---------+-----+---------------+--------------------+----------------+-----------------------+--------------+-----------+-------------------+
|:@computed_region_26cr_cadq|:@computed_region_2dwj_jsy4|:@computed_region_6qbp_sg9q|:@computed_region_ajp5_b2md|:@computed_region_h4ep_8xdi|:@computed_region_jg9y_a9du|:@computed_region_nqbw_i6c3|:@computed_region_qgnn_b9vv|:@computed_region_y6ts_4iup|analysis_neighborhood|cad_number| cnn|filed_online|incident_category|incident_code|   

In [15]:
crime_reports_jsonl.take(3) 

[Row(:@computed_region_26cr_cadq=None, :@computed_region_2dwj_jsy4=None, :@computed_region_6qbp_sg9q=None, :@computed_region_ajp5_b2md=None, :@computed_region_h4ep_8xdi=None, :@computed_region_jg9y_a9du=None, :@computed_region_nqbw_i6c3=None, :@computed_region_qgnn_b9vv=None, :@computed_region_y6ts_4iup=None, analysis_neighborhood=None, cad_number=None, cnn=None, filed_online=True, incident_category='Larceny Theft', incident_code='06374', incident_date='2019-08-15T00:00:00.000', incident_datetime='2019-08-15T11:41:00.000', incident_day_of_week='Thursday', incident_description='Theft, Other Property, >$950', incident_id='854240', incident_number='196208089', incident_subcategory='Larceny Theft - Other', incident_time='11:41', incident_year='2019', intersection=None, latitude=None, longitude=None, point=None, police_district='Central', report_datetime='2019-10-01T14:06:00.000', report_type_code='II', report_type_description='Coplogic Initial', resolution='Open or Active', row_id='8542400

In [17]:
crime_reports_jsonl.count()

1000

In [19]:
crime_reports_jsonl.describe().show()

+-------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------+-------------------+--------------------+-----------------+-----------------+--------------------+--------------------+--------------------+--------------------+------------------+----------------+--------------------+-------------+------------------+--------------------+-------------------+-------------------+---------------+--------------------+----------------+-----------------------+--------------+-------------------+-------------------+
|summary|:@computed_region_26cr_cadq|:@computed_region_2dwj_jsy4|:@computed_region_6qbp_sg9q|:@computed_region_ajp5_b2md|:@computed_region_h4ep_8xdi|:@computed_region_jg9y_a9du|:@computed_region_nqbw_i6c3|:@computed_region_qgnn_b9vv|:@computed_region_y6ts_4iup|analysis_n

In [23]:
crime_reports_jsonl.describe("resolution", "incident_category").show()

+-------+--------------+-----------------+
|summary|    resolution|incident_category|
+-------+--------------+-----------------+
|  count|          1000|             1000|
|   mean|          null|             null|
| stddev|          null|             null|
|    min|Open or Active|            Arson|
|    max|     Unfounded|  Weapons Offense|
+-------+--------------+-----------------+



In [24]:
crime_reports_jsonl.describe("incident_year", "incident_time").show()

+-------+------------------+-------------+
|summary|     incident_year|incident_time|
+-------+------------------+-------------+
|  count|              1000|         1000|
|   mean|          2018.985|         null|
| stddev|0.1216132796605237|         null|
|    min|              2018|        00:00|
|    max|              2019|        23:45|
+-------+------------------+-------------+



## Selecting a Subset of the Data for Processing

Some fields in this dataset are more interesting for me to analyze than others. To improve performance and reduce the amount of data being processed, it is also advisable to create a DataFrame holding only the columns we would like to process.

In [67]:
incidents = crime_reports_jsonl.select(["resolution","incident_category", "incident_time"])

In [42]:
# Displaying rows as a list of Row objects
incidents.take(5)

[Row(resolution='Open or Active', incident_category='Larceny Theft', incident_time='11:41'),
 Row(resolution='Open or Active', incident_category='Larceny Theft', incident_time='22:00'),
 Row(resolution='Open or Active', incident_category='Robbery', incident_time='14:25'),
 Row(resolution='Open or Active', incident_category='Larceny Theft', incident_time='19:30'),
 Row(resolution='Open or Active', incident_category='Non-Criminal', incident_time='16:53')]

In [43]:
# Displaying incidents in tabular format
incidents.show(n=5)

+--------------+-----------------+-------------+
|    resolution|incident_category|incident_time|
+--------------+-----------------+-------------+
|Open or Active|    Larceny Theft|        11:41|
|Open or Active|    Larceny Theft|        22:00|
|Open or Active|          Robbery|        14:25|
|Open or Active|    Larceny Theft|        19:30|
|Open or Active|     Non-Criminal|        16:53|
+--------------+-----------------+-------------+
only showing top 5 rows



In [68]:
incidents.head()

Row(resolution='Open or Active', incident_category='Larceny Theft', incident_time='11:41')

## Calculating Crime Statistics by Hour of the Day

A first simple visualization task is to generate crime statistics by time of day, that is, to find out which hours are associated with more criminal activity. I chose this task because I thought it would be useful to know, and because it gives me more opportunity to get familiar with data processing in Spark, in particular the use of lambda functions for mapping and filtering data.

First we define a function to extract the hour of the day from the incident_time field:

In [69]:
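import datetime
from pyspark.sql.functions import udf

# Hour from an epoch timestamp in milliseconds (not used below)
get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).hour)
# Hour from an 'HH:MM' string such as incident_time.
# udf() defaults to a string return type, so the new column holds strings like '11'.
conv_time = udf(lambda x: datetime.datetime.strptime(x, '%H:%M').hour)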

In [64]:
incidents.head()

Row(resolution='Open or Active', incident_category='Larceny Theft', incident_time='11:41')

In [70]:
incidents = incidents.withColumn("hour", conv_time(incidents.incident_time))
incidents.head()

Row(resolution='Open or Active', incident_category='Larceny Theft', incident_time='11:41', hour='11')
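
The parsing that `conv_time` performs can be tried outside Spark as a plain function. The variant below is a sketch: the name `hour_of_day` and the choice to return `None` for missing or malformed values are assumptions made for illustration, not part of the cells above.

```python
import datetime

def hour_of_day(time_text):
    """Return the hour (0-23) of an 'HH:MM' string, or None if it cannot be parsed."""
    if time_text is None:
        return None
    try:
        return datetime.datetime.strptime(time_text, "%H:%M").hour
    except ValueError:
        # Malformed input such as '25:00' or an empty string
        return None

print(hour_of_day("11:41"))  # 11
print(hour_of_day("23:45"))  # 23
print(hour_of_day(None))     # None
```

Wrapped as `udf(hour_of_day, IntegerType())`, with `IntegerType` imported from `pyspark.sql.types`, a function like this should produce an integer column instead of a string one.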

In [71]:
incidents_in_hour = incidents.groupby(incidents.hour).count().orderBy(incidents.hour.cast("float"))
incidents_in_hour.show()

+----+-----+
|hour|count|
+----+-----+
|   0|   54|
|   1|   38|
|   2|   23|
|   3|    7|
|   4|   10|
|   5|   20|
|   6|   15|
|   7|   17|
|   8|   29|
|   9|   38|
|  10|   41|
|  11|   53|
|  12|   56|
|  13|   46|
|  14|   60|
|  15|   61|
|  16|   42|
|  17|   58|
|  18|   90|
|  19|   66|
+----+-----+
only showing top 20 rows
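
The cast inside `orderBy` matters because `hour` is a string column: sorted as text, '10' comes before '2'. A small plain-Python illustration of the difference, using made-up values:

```python
hours = ["0", "1", "2", "10", "11", "19", "3"]

as_text = sorted(hours)              # lexicographic, character by character
as_numbers = sorted(hours, key=int)  # numeric, like casting before ordering

print(as_text)     # ['0', '1', '10', '11', '19', '2', '3']
print(as_numbers)  # ['0', '1', '2', '3', '10', '11', '19']
```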

