In [0]:
from pyspark.sql.types import *
from pyspark.sql import *

## Schema for the Seattle Real Time Fire 911 Calls API records.
## Every top-level scalar field is declared as a nullable string; the field
## order below is the order the columns appear in the resulting DataFrame.
_leading_string_fields = (
    ":@computed_region_2days_rhn5",
    ":@computed_region_cyqu_gs94",
    ":@computed_region_kuhn_3gp2",
    ":@computed_region_q256_3sug",
    ":@computed_region_ru88_fbhk",
    "address",
    "datetime",
    "incident_number",
    "latitude",
    "longitude",
)

# Nested struct: an array of doubles under "coordinates" plus a "type" string.
_report_location_type = StructType([
    StructField("coordinates", ArrayType(DoubleType(), True), True),
    StructField("type", StringType(), True),
])

jsonSchema = StructType(
    [StructField(field_name, StringType(), True) for field_name in _leading_string_fields]
    + [
        StructField("report_location", _report_location_type, True),
        StructField("type", StringType(), True),
    ]
)

# Directory the JSON files are read from (both the static and the streaming reads use it)
inputPath = '/tmp/data/sfd'

In [0]:
from pyspark.sql.functions import *

# Batch-read every JSON file under inputPath, applying the explicit schema
# instead of letting Spark infer one.
staticInputDF = spark.read.json(inputPath, schema=jsonSchema)
# staticInputDF is now a regular (non-streaming) DataFrame

In [0]:
# Render the static DataFrame as a table in the cell output.
# NOTE(review): display is not defined in the visible cells; presumably it is
# supplied by the Databricks notebook runtime -- confirm before running elsewhere.
display(staticInputDF)

:@computed_region_2days_rhn5,:@computed_region_cyqu_gs94,:@computed_region_kuhn_3gp2,:@computed_region_q256_3sug,:@computed_region_ru88_fbhk,address,datetime,incident_number,latitude,longitude,report_location,type
,,22.0,18379.0,14.0,5th Ave / Terrace St,2023-12-03T12:39:00.000,F230159229,47.602272,-122.327883,"List(List(-122.327883, 47.602272), Point)",Aid Response
,,49.0,18795.0,2.0,5624 Sw Admiral Way,2023-12-03T12:39:00.000,F230159228,47.577094,-122.404279,"List(List(-122.404279, 47.577094), Point)",Aid Response
,,40.0,18390.0,50.0,805 4th Ave N,2023-12-03T12:38:00.000,F230159227,47.626322,-122.348863,"List(List(-122.348863, 47.626322), Point)",Low Acuity Response
,,4.0,18789.0,8.0,525 14th Ave E,2023-12-03T12:33:00.000,F230159225,47.623391,-122.314649,"List(List(-122.314649, 47.623391), Point)",Alarm Bell
,,24.0,18081.0,14.0,1521 3rd Ave,2023-12-03T12:25:00.000,F230159222,47.609953,-122.337979,"List(List(-122.337979, 47.609953), Point)",Medic Response- Overdose
,,42.0,17919.0,38.0,3720 39th Ave S,2023-12-03T12:25:00.000,F230159224,47.570494,-122.283943,"List(List(-122.283943, 47.570494), Point)",Medic Response
,,47.0,18383.0,60.0,5255 15th Ave Ne,2023-12-03T12:20:00.000,F230159221,47.666813,-122.31191,"List(List(-122.31191, 47.666813), Point)",Auto Fire Alarm
,1.0,33.0,19582.0,46.0,14115 Aurora Ave N,2023-12-03T12:18:00.000,F230159220,47.731129,-122.345064,"List(List(-122.345064, 47.731129), Point)",Aid Response
,,41.0,17919.0,42.0,2213 16th Ave S,2023-12-03T12:10:00.000,F230159219,47.583596,-122.31208,"List(List(-122.31208, 47.583596), Point)",Aid Response
,,12.0,18379.0,19.0,801 9th Ave,2023-12-03T12:08:00.000,F230159218,47.606982,-122.326056,"List(List(-122.326056, 47.606982), Point)",Nurseline/AMR


In [0]:
from pyspark.sql.functions import * 

# Static aggregation: number of incidents per call type per 1-hour window
_static_hour_window = window(staticInputDF["datetime"], "1 hour")

StaticCountsDF = staticInputDF.groupBy(staticInputDF["type"], _static_hour_window).count()
StaticCountsDF.cache()

# Expose the aggregate to SQL cells as the temporary view "static_counts"
StaticCountsDF.createOrReplaceTempView("static_counts")

In [0]:
%sql
-- Aggregation results: total incidents per call type, summed over all hourly windows
SELECT
  type,
  SUM(count) AS total_count
FROM static_counts
GROUP BY type

-- This is a static snapshot; the goal is to have it update in real time


type,total_count
Triaged Incident,100
Aid Response,896
Rubbish Fire,36
Auto Fire Alarm,161
1RED 1 Unit,10
Activated CO Detector,10
Low Acuity Response,70
Medic Response,174
EVENT - Special Event,12
Alarm Bell,30


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import *

# Same source and schema as the static read, but opened with readStream so the
# directory is consumed as a stream. maxFilesPerTrigger=1 picks up one file at
# a time, so each file is treated as a separate micro-batch.
StreamingInputDF = (
    spark.readStream
    .option("maxFilesPerTrigger", 1)
    .schema(jsonSchema)
    .json(inputPath)
)

# Both streaming aggregations bucket rows into the same 1-hour windows on "datetime".
_streaming_hour_window = window(StreamingInputDF["datetime"], "1 hour")

# Incident count per call type per hour
StreamingCountsDF = StreamingInputDF.groupBy(
    StreamingInputDF["type"],
    _streaming_hour_window,
).count()

# Incident count per ":@computed_region_kuhn_3gp2" value per hour
StreamingAreaDF = StreamingInputDF.groupBy(
    StreamingInputDF[":@computed_region_kuhn_3gp2"],
    _streaming_hour_window,
).count()

In [0]:
## Sanity check: isStreaming reports whether StreamingCountsDF is backed by a
## streaming source. It is built from a readStream input, so this should be True.
StreamingCountsDF.isStreaming

True

In [0]:
## Stream both aggregations into in-memory tables so they can be queried with SQL.
def _start_memory_query(streaming_df, query_name):
    """Start `streaming_df` as a complete-mode query writing to the memory sink.

    The result is queryable from SQL as a table named `query_name`.

    Spark rejects `start()` when a query with the same name is already active,
    which makes simply re-running this cell fail. To keep the cell re-runnable,
    any active query registered under `query_name` is stopped first.

    Args:
        streaming_df: streaming DataFrame to write out.
        query_name: name of the streaming query, and of the in-memory table.

    Returns:
        The StreamingQuery handle returned by `start()`.
    """
    for active_query in spark.streams.active:
        if active_query.name == query_name:
            active_query.stop()

    return (
        streaming_df
        .writeStream
        .format("memory")
        .queryName(query_name)
        # "complete" rewrites the whole aggregated result table on every trigger
        .outputMode("complete")
        .start()
    )


# Hourly counts per call type -> table "counts"
sfdquery = _start_memory_query(StreamingCountsDF, "counts")
# Hourly counts per computed region -> table "counts2"
sfdquery2 = _start_memory_query(StreamingAreaDF, "counts2")

In [0]:
%sql
-- Show the parsed, analyzed, optimized and physical plans for the hourly
-- per-type query against the in-memory "counts" table
EXPLAIN EXTENDED
SELECT
  type,
  date_format(window.end, "yyyy-MM-dd HH:mm") AS time,
  count
FROM counts
ORDER BY date_format(window.end, "yyyy-MM-dd HH:mm");

plan
"== Parsed Logical Plan == 'Sort ['date_format('window.end, yyyy-MM-dd HH:mm) ASC NULLS FIRST], true +- 'Project ['type, 'date_format('window.end, yyyy-MM-dd HH:mm) AS time#79799, 'count]  +- 'UnresolvedRelation [counts], [], false == Analyzed Logical Plan == type: string, time: string, count: bigint Project [type#58980, time#79799, count#58982L] +- Sort [date_format(window#58981.end, yyyy-MM-dd HH:mm, Some(Etc/UTC)) ASC NULLS FIRST], true  +- Project [type#58980, date_format(window#58981.end, yyyy-MM-dd HH:mm, Some(Etc/UTC)) AS time#79799, count#58982L, window#58981]  +- SubqueryAlias counts  +- View (`counts`, [type#58980,window#58981,count#58982L])  +- MemoryPlan MemorySink, [type#58980, window#58981, count#58982L] == Optimized Logical Plan == Project [type#58980, time#79799, count#58982L] +- Sort [date_format(_extract_end#79816, yyyy-MM-dd HH:mm, Some(Etc/UTC)) ASC NULLS FIRST], true  +- Project [type#58980, date_format(window#58981.end, yyyy-MM-dd HH:mm, Some(Etc/UTC)) AS time#79799, count#58982L, window#58981.end AS _extract_end#79816]  +- MemoryPlan MemorySink, [type#58980, window#58981, count#58982L] == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- Project [type#58980, time#79799, count#58982L]  +- Sort [date_format(_extract_end#79816, yyyy-MM-dd HH:mm, Some(Etc/UTC)) ASC NULLS FIRST], true, 0  +- Exchange rangepartitioning(date_format(_extract_end#79816, yyyy-MM-dd HH:mm, Some(Etc/UTC)) ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=15729]  +- Project [type#58980, date_format(window#58981.end, yyyy-MM-dd HH:mm, Some(Etc/UTC)) AS time#79799, count#58982L, window#58981.end AS _extract_end#79816]  +- LocalTableScan [type#58980, window#58981, count#58982L]"


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
-- Current contents of the per-region hourly counts table
SELECT * FROM counts2;

-- Hourly counts from the per-type table, ordered by window end time
SELECT
  date_format(window.end, "yyyy-MM-dd HH:mm") AS time,
  count
FROM counts
ORDER BY date_format(window.end, "yyyy-MM-dd HH:mm");

:@computed_region_kuhn_3gp2,window,count
38.0,"List(2023-12-01T00:00:00Z, 2023-12-01T01:00:00Z)",5
46.0,"List(2023-12-02T03:00:00Z, 2023-12-02T04:00:00Z)",5
24.0,"List(2023-12-01T18:00:00Z, 2023-12-01T19:00:00Z)",5
38.0,"List(2023-12-01T18:00:00Z, 2023-12-01T19:00:00Z)",5
41.0,"List(2023-12-03T13:00:00Z, 2023-12-03T14:00:00Z)",4
34.0,"List(2023-12-02T22:00:00Z, 2023-12-02T23:00:00Z)",10
2.0,"List(2023-12-01T18:00:00Z, 2023-12-01T19:00:00Z)",5
35.0,"List(2023-12-02T15:00:00Z, 2023-12-02T16:00:00Z)",5
7.0,"List(2023-12-01T06:00:00Z, 2023-12-01T07:00:00Z)",10
21.0,"List(2023-12-01T17:00:00Z, 2023-12-01T18:00:00Z)",5
