In [38]:
import logging
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as psf

In [39]:
# TODO Create a schema for incoming resources
schema = StructType([
    StructField("crime_id",StringType(),True),
    StructField("original_crime_type_name",StringType(),True),
    StructField("report_date",StringType(),True),
    StructField("call_date",StringType(),True),
    StructField("offense_date",StringType(),True),
    StructField("call_time",StringType(),True),
    StructField("call_date_time",StringType(),True),
    StructField("disposition",StringType(),True),
    StructField("address",StringType(),True),
    StructField("city",StringType(),True),
    StructField("state",StringType(),True),
    StructField("agency_id",StringType(),True),
    StructField("address_type",StringType(),True),
    StructField("common_location",StringType(),True)
])

In [40]:
# TODO Create Spark in Standalone mode
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("KafkaSparkStructuredStreaming") \
    .getOrCreate()

In [41]:
df = spark.read.json("police-department-calls-for-service.json",multiLine=True,schema=schema)

In [42]:
df.printSchema()

root
 |-- crime_id: string (nullable = true)
 |-- original_crime_type_name: string (nullable = true)
 |-- report_date: string (nullable = true)
 |-- call_date: string (nullable = true)
 |-- offense_date: string (nullable = true)
 |-- call_time: string (nullable = true)
 |-- call_date_time: string (nullable = true)
 |-- disposition: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- agency_id: string (nullable = true)
 |-- address_type: string (nullable = true)
 |-- common_location: string (nullable = true)



In [43]:
df.show(5)

+---------+------------------------+--------------------+--------------------+--------------------+---------+--------------------+-----------+--------------------+-------------+-----+---------+---------------+--------------------+
| crime_id|original_crime_type_name|         report_date|           call_date|        offense_date|call_time|      call_date_time|disposition|             address|         city|state|agency_id|   address_type|     common_location|
+---------+------------------------+--------------------+--------------------+--------------------+---------+--------------------+-----------+--------------------+-------------+-----+---------+---------------+--------------------+
|183653763|            Traffic Stop|2018-12-31T00:00:...|2018-12-31T00:00:...|2018-12-31T00:00:...|    23:57|2018-12-31T23:57:...|        ADM|Geary Bl/divisade...|San Francisco|   CA|        1|   Intersection|                    |
|183653756|     Traf Violation Cite|2018-12-31T00:00:...|2018-12-31T00:00:..

In [44]:
    # TODO select original_crime_type_name and disposition
distinct_table = df \
    .select(
    psf.to_timestamp(psf.col("call_date_time")).alias("call_date_time"),
    psf.col("original_crime_type_name"),
    psf.col("disposition")
)

In [45]:
distinct_table.show()

+-------------------+------------------------+------------+
|     call_date_time|original_crime_type_name| disposition|
+-------------------+------------------------+------------+
|2018-12-31 23:57:00|            Traffic Stop|         ADM|
|2018-12-31 23:54:00|     Traf Violation Cite|         CIT|
|2018-12-31 23:49:00|            Passing Call|         HAN|
|2018-12-31 23:47:00|           Audible Alarm|         PAS|
|2018-12-31 23:46:00|            Traffic Stop|         CIT|
|2018-12-31 23:38:00|            Passing Call|          ND|
|2018-12-31 23:38:00|            Traffic Stop|         CIT|
|2018-12-31 23:36:00|            Traffic Stop|         HAN|
|2018-12-31 23:34:00|            Passing Call|Not recorded|
|2018-12-31 23:33:00|         Fight No Weapon|         GOA|
|2018-12-31 23:33:00|            Traffic Stop|         HAN|
|2018-12-31 23:33:00|            Passing Call|         HAN|
|2018-12-31 23:33:00|            Passing Call|         HAN|
|2018-12-31 23:30:00|            Traffic

In [49]:
# count the number of original crime type
agg_df = distinct_table \
    .select(
    distinct_table.call_date_time,
    distinct_table.original_crime_type_name,
    distinct_table.disposition
) \
    .withWatermark("call_date_time", "60 minutes") \
    .groupBy(
    psf.window(distinct_table.call_date_time, "10 minutes","5 minutes"),
    psf.col("original_crime_type_name")
) \
    .count()

In [50]:
agg_df.show(100)

+--------------------+------------------------+-----+
|              window|original_crime_type_name|count|
+--------------------+------------------------+-----+
|[2018-12-31 22:30...|            Passing Call|    2|
|[2018-12-31 18:30...|            Passing Call|    2|
|[2018-12-31 18:20...|                    Poss|    1|
|[2018-12-31 17:50...|                   Fraud|    1|
|[2018-12-31 17:25...|        Well Being Check|    1|
|[2018-12-31 15:40...|                     916|    1|
|[2018-12-31 14:10...|             Petty Theft|    1|
|[2018-12-31 13:15...|            Passing Call|    5|
|[2018-12-31 11:45...|              Trespasser|    1|
|[2018-12-31 10:10...|                      Lp|    1|
|[2018-12-31 03:25...|               Vandalism|    1|
|[2018-12-30 21:35...|                 518 586|    1|
|[2018-12-30 20:05...|                      Jo|    1|
|[2018-12-30 20:00...|                Jo / Bat|    1|
|[2018-12-30 19:25...|    Threats / Harassment|    1|
|[2018-12-30 18:05...|      