In [1]:
import logging
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as psf

In [2]:
import os

# Ask pyspark to fetch the Kafka source connector when it launches the JVM.
# The artifact name targets Scala 2.11 / Spark 2.3.4, so it should match the
# installed pyspark version. The variable is only read at JVM start-up, so it
# must be set before the first SparkSession is created in this kernel.
os.environ.update(
    PYSPARK_SUBMIT_ARGS='--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.4 pyspark-shell'
)

In [3]:
# Schema of one JSON police-call message read from the Kafka topic.
# Every field is parsed as a nullable string; the call timestamp is converted
# to a real timestamp later, where windowing needs it.
CALL_FIELD_NAMES = [
    "crime_id",
    "original_crime_type_name",
    "report_date",
    "call_date",
    "offense_date",
    "call_time",
    "call_date_time",
    "disposition",
    "address",
    "city",
    "state",
    "agency_id",
    "address_type",
    "common_location",
]
schema = StructType(
    [StructField(field_name, StringType(), True) for field_name in CALL_FIELD_NAMES]
)

In [4]:
# Logger for this notebook. Python's logging defaults to WARNING with no
# handler, so logger.info(...) calls would be silently dropped. Give this
# logger INFO level and a stderr handler. The handler is added only once, so
# re-running the cell does not duplicate every message.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
if not logger.handlers:
    logger.addHandler(logging.StreamHandler())

In [5]:
# Diagnostic: report whether a `spark` session already exists in the kernel,
# for example one left over from an earlier run. Only a missing name counts as
# "undefined"; any other error propagates instead of being silently swallowed
# by a bare `except:`.
try:
    print(spark)
except NameError:
    print("spark variable is undefined")

spark variable is undefined


In [6]:
# Start a local Spark session that uses all available cores, or reuse the
# existing one (getOrCreate). PYSPARK_SUBMIT_ARGS has to be set before this
# point so the Kafka connector is on the JVM classpath.
session_builder = (
    SparkSession.builder
    .master("local[*]")
    .appName("KafkaSparkStructuredStreaming")
)
spark = session_builder.getOrCreate()

logger.info("Spark started")
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f68a0123c18>


In [7]:
# Subscribe to the police-calls Kafka topic as a streaming source.
# - maxOffsetsPerTrigger caps each micro-batch at 200 offsets in total,
#   split proportionally across the topic's partitions.
# - startingOffsets="earliest" replays the topic from the beginning when a new
#   query starts; a resumed query continues from its checkpointed offsets.
# NOTE: the former .option("stopGracefullyOnShutdown", "true") was removed.
# It is not a Kafka source option (it resembles the DStream-era Spark conf
# `spark.streaming.stopGracefullyOnShutdown`), so the source ignored it.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "com.crime.police.calls")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 200)
    .load()
)
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [8]:
# Kafka delivers the message payload as raw bytes in the `value` column.
# Keep only that column and decode it to a string so it can be parsed as JSON.
kafka_df = df.select(psf.col("value").cast("string").alias("value"))
kafka_df.printSchema()

root
 |-- value: string (nullable = true)



In [9]:
# Parse each JSON string against `schema`, then flatten the parsed struct so
# every call field becomes its own top-level column.
service_table = (
    kafka_df
    .select(psf.from_json(kafka_df["value"], schema).alias("CALLS_TOPICS"))
    .select("CALLS_TOPICS.*")
)
service_table.printSchema()

root
 |-- crime_id: string (nullable = true)
 |-- original_crime_type_name: string (nullable = true)
 |-- report_date: string (nullable = true)
 |-- call_date: string (nullable = true)
 |-- offense_date: string (nullable = true)
 |-- call_time: string (nullable = true)
 |-- call_date_time: string (nullable = true)
 |-- disposition: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- agency_id: string (nullable = true)
 |-- address_type: string (nullable = true)
 |-- common_location: string (nullable = true)



In [10]:
# Keep the crime type and disposition, plus the call time converted from a
# string to a Spark timestamp so it can drive event-time windowing.
# The 60-minute watermark bounds how late an event may arrive, relative to the
# latest event time seen, before its window is considered final.
# NOTE(review): to_timestamp() is called without a format, so call_date_time
# must be in a form Spark's default parser accepts (presumably ISO-8601);
# values it cannot parse become null. Confirm against the producer.
distinct_table = (
    service_table
    .selectExpr(
        "original_crime_type_name",
        "disposition",
        "to_timestamp(call_date_time) as call_date_time",
    )
    .withWatermark("call_date_time", "60 minutes")
)
distinct_table.printSchema()

root
 |-- original_crime_type_name: string (nullable = true)
 |-- disposition: string (nullable = true)
 |-- call_date_time: timestamp (nullable = true)



In [11]:
# Count calls per (crime type, disposition) over sliding event-time windows.
# Each window spans 60 minutes and a new one starts every 10 minutes, so each
# call falls into six overlapping windows.
# distinct_table already carries the 60-minute watermark on call_date_time.
# The previous second .withWatermark() on the same column was redundant and
# has been removed.
agg_df = (
    distinct_table
    .groupBy(
        psf.window(psf.col("call_date_time"), "60 minutes", "10 minutes"),
        psf.col("original_crime_type_name"),
        psf.col("disposition"),
    )
    .count()
)
agg_df.printSchema()

root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- original_crime_type_name: string (nullable = true)
 |-- disposition: string (nullable = true)
 |-- count: long (nullable = false)



In [12]:
# TODO Q1. Submit a screen shot of a batch ingestion of the aggregation
# Print the windowed counts to the console on every trigger.
# In "complete" mode the whole result table is re-emitted each time, so Spark
# keeps all aggregate state. The watermark does not let it drop old windows
# in this mode.
console_writer = agg_df.writeStream.format("console").outputMode("complete")
query = console_writer.start()
print(query)

# TODO attach a ProgressReporter
# query.awaitTermination()

<pyspark.sql.streaming.StreamingQuery object at 0x7f68a0133978>


In [13]:
# Static lookup table mapping disposition codes to descriptions.
# The file is read as a single multi-line JSON document. Its
# `disposition_code` column is renamed to `disposition` so it matches the
# column of the same name in agg_df and can serve as the join key.
# NOTE(review): relative path; presumably resolved against the kernel's
# working directory on the local filesystem. Confirm.
radio_code_json_filepath = "radio_code.json"
radio_code_raw_df = spark.read.json(radio_code_json_filepath, multiLine=True)
radio_code_df = radio_code_raw_df.withColumnRenamed("disposition_code", "disposition")
radio_code_df.printSchema()

root
 |-- description: string (nullable = true)
 |-- disposition: string (nullable = true)



In [21]:
# Enrich each windowed count with its disposition description through a
# stream-static inner join. Counts whose disposition code has no entry in
# radio_code_df are dropped.
join_df = agg_df.join(radio_code_df, on="disposition", how="inner")
join_df.printSchema()

root
 |-- disposition: string (nullable = true)
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- original_crime_type_name: string (nullable = true)
 |-- count: long (nullable = false)
 |-- description: string (nullable = true)



In [19]:
# Stream the enriched counts to the console under the query name "join".
# No outputMode is set, so Spark's default ("append") applies: a window's
# rows are emitted only after the watermark passes the end of that window.
join_query = (
    join_df.writeStream
    .queryName("join")
    .format("console")
    .start()
)
print(join_query)
# join_query.awaitTermination()

<pyspark.sql.streaming.StreamingQuery object at 0x7f68a0123eb8>


In [20]:
# Shut down cleanly. First stop every streaming query still active on this
# session (the console and join queries started earlier), then stop Spark, so
# running micro-batches are not torn down by the SparkContext disappearing.
for active_query in spark.streams.active:
    active_query.stop()
spark.stop()
print("spark stopped")

spark stopped
