## Team members
### 1. Chetan Naveen Vadde
### 2. Vishal Singh
### 3. Reshma Punukollu

# Hackathon 03: Tumbling 8-hour Time Window
## Goal: In which 'Category', and in which 8-hour time window, are SF criminals most likely to strike?
## (Midnight-08AM, 08AM-4:00PM, 4:00PM-Midnight)

In [0]:
# Step 01: Confirm we have the 32 CSV files for the lab
display(dbutils.fs.ls("dbfs:/FileStore/tables/sfpd1/"))

path,name,size,modificationTime
dbfs:/FileStore/tables/sfpd1/sf101,sf101,1048560,1673394799000
dbfs:/FileStore/tables/sfpd1/sf102,sf102,1048542,1673394799000
dbfs:/FileStore/tables/sfpd1/sf103,sf103,1048423,1673394802000
dbfs:/FileStore/tables/sfpd1/sf104,sf104,1048478,1673394802000
dbfs:/FileStore/tables/sfpd1/sf105,sf105,1048480,1673394805000
dbfs:/FileStore/tables/sfpd1/sf106,sf106,1048465,1673394805000
dbfs:/FileStore/tables/sfpd1/sf107,sf107,1048448,1673394808000
dbfs:/FileStore/tables/sfpd1/sf108,sf108,1048512,1673394808000
dbfs:/FileStore/tables/sfpd1/sf109,sf109,1048435,1673394811000
dbfs:/FileStore/tables/sfpd1/sf110,sf110,1048432,1673394811000


In [0]:
# Step 02: Here are the column names. Create a schema to be used in the next cell
# 'IncidentNum', 'Category', 'Description', 'DayOfWeek', 'Date', 'Time', 'PdDistrict', 'Resolution', 'Address', 'X', 'Y', 'Loc', 'PdId'

from pyspark.sql.types import StructType, StructField, StringType

policeSchema =  StructType([ 
    StructField("IncidentNum",StringType(),True), 
    StructField("Category",StringType(),True), 
    StructField("Description",StringType(),True), 
    StructField("DayOfWeek", StringType(), True), 
    StructField("Date", StringType(), True), 
    StructField("Time", StringType(), True),
    StructField("PdDistrict", StringType(), True),
    StructField("Resolution", StringType(), True),
    StructField("Address", StringType(), True),
    StructField("X", StringType(), True),
    StructField("Y", StringType(), True),
    StructField("Loc", StringType(), True),
    StructField("PdId", StringType(), True)
  ])

In [0]:
# Step 03: Read in the CSV data along with the schema above
# Then display it to confirm we have a populated DataFrame

df = spark.read.csv("dbfs:/FileStore/tables/sfpd1/", schema=policeSchema)
display(df)

IncidentNum,Category,Description,DayOfWeek,Date,Time,PdDistrict,Resolution,Address,X,Y,Loc,PdId
150356400,VEHICLE THEFT,STOLEN AUTOMOBILE,Thursday,04/23/2015 12:00:00 AM,16:30,NORTHERN,NONE,CAPRA WY / MALLORCA WY,-122.438621320362,37.8028892454572,"(37.8028892454572, -122.438621320362)",15035640007021
150356416,BURGLARY,"BURGLARY OF STORE, FORCIBLE ENTRY",Thursday,04/23/2015 12:00:00 AM,20:00,TARAVAL,NONE,2500 Block of OCEAN AV,-122.472657880592,37.7314897634742,"(37.7314897634742, -122.472657880592)",15035641605051
150356422,VEHICLE THEFT,STOLEN AUTOMOBILE,Thursday,04/23/2015 12:00:00 AM,18:00,NORTHERN,NONE,3000 Block of LAGUNA ST,-122.430810962722,37.7993113027935,"(37.7993113027935, -122.430810962722)",15035642207021
150356450,OTHER OFFENSES,"DOG, BARKING",Friday,04/24/2015 12:00:00 AM,07:40,RICHMOND,NONE,400 Block of 10TH AV,-122.468562845833,37.7798381239123,"(37.7798381239123, -122.468562845833)",15035645064011
150356466,BURGLARY,"BURGLARY OF STORE, FORCIBLE ENTRY",Friday,04/24/2015 12:00:00 AM,03:45,MISSION,NONE,2000 Block of MISSION ST,-122.419520367886,37.764228935718,"(37.764228935718, -122.419520367886)",15035646605051
150356472,SUSPICIOUS OCC,SUSPICIOUS OCCURRENCE,Friday,04/24/2015 12:00:00 AM,01:15,INGLESIDE,NONE,500 Block of ALEMANY BL,-122.421305348396,37.7321638759455,"(37.7321638759455, -122.421305348396)",15035647264070
150356488,"SEX OFFENSES, FORCIBLE","FORCIBLE RAPE, BODILY FORCE",Thursday,04/23/2015 12:00:00 AM,02:00,TARAVAL,NONE,1300 Block of 9TH AV,-122.466313420543,37.763085116614,"(37.763085116614, -122.466313420543)",15035648802004
150356507,BURGLARY,"BURGLARY, FORCIBLE ENTRY",Thursday,04/23/2015 12:00:00 AM,23:00,TARAVAL,NONE,1900 Block of IRVING ST,-122.478742819999,37.7635454765989,"(37.7635454765989, -122.478742819999)",15035650705071
150356513,VEHICLE THEFT,STOLEN AUTOMOBILE,Thursday,04/23/2015 12:00:00 AM,18:30,RICHMOND,NONE,800 Block of 39TH AV,-122.49918056986,37.7728034161411,"(37.7728034161411, -122.49918056986)",15035651307021
150356529,ASSAULT,"BATTERY, FORMER SPOUSE OR DATING RELATIONSHIP",Friday,04/24/2015 12:00:00 AM,09:30,MISSION,"ARREST, BOOKED",2800 Block of MISSION ST,-122.418434028318,37.7514199300011,"(37.7514199300011, -122.418434028318)",15035652904138


In [0]:
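# Step 04: Create a timestamp in the following format (e.g., 2015-10-11T19:30:00) from the 'Date' and 'Time' columns.
# Name this 'manufactured' column 'tstamp'
# Then display to confirm 'tstamp' has the correct format. Note that it is defined as a 'string' data type

from pyspark.sql.functions import col, concat, from_unixtime, lit, unix_timestamp

# 'Date' holds values such as '04/23/2015 12:00:00 AM'; keep only its yyyy-MM-dd part,
# then append 'T', the 'HH:mm' value from 'Time', and ':00' for the seconds
df1 = df.withColumn(
    "tstamp",
    concat(
        from_unixtime(unix_timestamp("Date", "MM/dd/yyyy hh:mm:ss a"), "yyyy-MM-dd"),
        lit("T"),
        col("Time"),
        lit(":00"),
    ),
)

display(df1)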

IncidentNum,Category,Description,DayOfWeek,Date,Time,PdDistrict,Resolution,Address,X,Y,Loc,PdId,tstamp
150356400,VEHICLE THEFT,STOLEN AUTOMOBILE,Thursday,04/23/2015 12:00:00 AM,16:30,NORTHERN,NONE,CAPRA WY / MALLORCA WY,-122.438621320362,37.8028892454572,"(37.8028892454572, -122.438621320362)",15035640007021,2015-04-23T16:30:00
150356416,BURGLARY,"BURGLARY OF STORE, FORCIBLE ENTRY",Thursday,04/23/2015 12:00:00 AM,20:00,TARAVAL,NONE,2500 Block of OCEAN AV,-122.472657880592,37.7314897634742,"(37.7314897634742, -122.472657880592)",15035641605051,2015-04-23T20:00:00
150356422,VEHICLE THEFT,STOLEN AUTOMOBILE,Thursday,04/23/2015 12:00:00 AM,18:00,NORTHERN,NONE,3000 Block of LAGUNA ST,-122.430810962722,37.7993113027935,"(37.7993113027935, -122.430810962722)",15035642207021,2015-04-23T18:00:00
150356450,OTHER OFFENSES,"DOG, BARKING",Friday,04/24/2015 12:00:00 AM,07:40,RICHMOND,NONE,400 Block of 10TH AV,-122.468562845833,37.7798381239123,"(37.7798381239123, -122.468562845833)",15035645064011,2015-04-24T07:40:00
150356466,BURGLARY,"BURGLARY OF STORE, FORCIBLE ENTRY",Friday,04/24/2015 12:00:00 AM,03:45,MISSION,NONE,2000 Block of MISSION ST,-122.419520367886,37.764228935718,"(37.764228935718, -122.419520367886)",15035646605051,2015-04-24T03:45:00
150356472,SUSPICIOUS OCC,SUSPICIOUS OCCURRENCE,Friday,04/24/2015 12:00:00 AM,01:15,INGLESIDE,NONE,500 Block of ALEMANY BL,-122.421305348396,37.7321638759455,"(37.7321638759455, -122.421305348396)",15035647264070,2015-04-24T01:15:00
150356488,"SEX OFFENSES, FORCIBLE","FORCIBLE RAPE, BODILY FORCE",Thursday,04/23/2015 12:00:00 AM,02:00,TARAVAL,NONE,1300 Block of 9TH AV,-122.466313420543,37.763085116614,"(37.763085116614, -122.466313420543)",15035648802004,2015-04-23T02:00:00
150356507,BURGLARY,"BURGLARY, FORCIBLE ENTRY",Thursday,04/23/2015 12:00:00 AM,23:00,TARAVAL,NONE,1900 Block of IRVING ST,-122.478742819999,37.7635454765989,"(37.7635454765989, -122.478742819999)",15035650705071,2015-04-23T23:00:00
150356513,VEHICLE THEFT,STOLEN AUTOMOBILE,Thursday,04/23/2015 12:00:00 AM,18:30,RICHMOND,NONE,800 Block of 39TH AV,-122.49918056986,37.7728034161411,"(37.7728034161411, -122.49918056986)",15035651307021,2015-04-23T18:30:00
150356529,ASSAULT,"BATTERY, FORMER SPOUSE OR DATING RELATIONSHIP",Friday,04/24/2015 12:00:00 AM,09:30,MISSION,"ARREST, BOOKED",2800 Block of MISSION ST,-122.418434028318,37.7514199300011,"(37.7514199300011, -122.418434028318)",15035652904138,2015-04-24T09:30:00
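As a rough plain-Python analogue of the Step 04 expression (not Spark code, only an illustration), the hypothetical helper `make_tstamp` below builds the same string from one row's 'Date' and 'Time' values. It assumes 'Date' always carries a 12:00:00 AM time part, as in the rows above.

```python
from datetime import datetime

def make_tstamp(date_str: str, time_str: str) -> str:
    """Build an ISO-style string such as '2015-04-23T16:30:00' from one row."""
    # %I and %p play the role of 'hh' and 'a' in the Spark pattern 'MM/dd/yyyy hh:mm:ss a'
    day = datetime.strptime(date_str, "%m/%d/%Y %I:%M:%S %p").strftime("%Y-%m-%d")
    # Append the 'HH:mm' value from the 'Time' column plus ':00' seconds
    return f"{day}T{time_str}:00"

print(make_tstamp("04/23/2015 12:00:00 AM", "16:30"))  # 2015-04-23T16:30:00
```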


In [0]:
# Step 05:
# Create 'df2' with just the following columns: 'IncidentNum', 'Category', 'Description', 'PdDistrict', 'tstamp'
# Then cast the 'tstamp' column to the 'timestamp' data type (in place, so later steps keep referring to 'tstamp')
# Display to confirm we have the following columns: 'IncidentNum', 'Category', 'Description', 'PdDistrict', 'tstamp'
# 'tstamp' should now have the following format (e.g., 2015-10-11T19:30:00.000+0000)
# Finally, write df2 in Parquet (or Delta) format to path = 'dbfs:/FileStore/tables/sfpd2/'

df2 = df1.select('IncidentNum', 'Category', 'Description', 'PdDistrict', 'tstamp')
# Replace the string column with its timestamp cast
df2 = df2.withColumn("tstamp", col("tstamp").cast("timestamp"))

display(df2)

# mode("overwrite") lets this cell be re-run without failing because the path already exists
df2.write.mode("overwrite").parquet("dbfs:/FileStore/tables/sfpd2/hackathon3.parquet")

IncidentNum,Category,Description,PdDistrict,tstamp
150356400,VEHICLE THEFT,STOLEN AUTOMOBILE,NORTHERN,2015-04-23T16:30:00.000+0000
150356416,BURGLARY,"BURGLARY OF STORE, FORCIBLE ENTRY",TARAVAL,2015-04-23T20:00:00.000+0000
150356422,VEHICLE THEFT,STOLEN AUTOMOBILE,NORTHERN,2015-04-23T18:00:00.000+0000
150356450,OTHER OFFENSES,"DOG, BARKING",RICHMOND,2015-04-24T07:40:00.000+0000
150356466,BURGLARY,"BURGLARY OF STORE, FORCIBLE ENTRY",MISSION,2015-04-24T03:45:00.000+0000
150356472,SUSPICIOUS OCC,SUSPICIOUS OCCURRENCE,INGLESIDE,2015-04-24T01:15:00.000+0000
150356488,"SEX OFFENSES, FORCIBLE","FORCIBLE RAPE, BODILY FORCE",TARAVAL,2015-04-23T02:00:00.000+0000
150356507,BURGLARY,"BURGLARY, FORCIBLE ENTRY",TARAVAL,2015-04-23T23:00:00.000+0000
150356513,VEHICLE THEFT,STOLEN AUTOMOBILE,RICHMOND,2015-04-23T18:30:00.000+0000
150356529,ASSAULT,"BATTERY, FORMER SPOUSE OR DATING RELATIONSHIP",MISSION,2015-04-24T09:30:00.000+0000
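The cast in Step 05 turns the string into a true timestamp, and the `+0000` suffix in the display reflects the session time zone, which appears to be UTC here. A minimal plain-Python sketch of that round trip, assuming UTC, with hypothetical helper names `to_timestamp` and `display_like_spark`:

```python
from datetime import datetime, timezone

def to_timestamp(tstamp: str) -> datetime:
    """Parse 'yyyy-MM-ddTHH:mm:ss', assuming the values are in UTC."""
    return datetime.strptime(tstamp, "%Y-%m-%dT%H:%M:%S").replace(tzinfo=timezone.utc)

def display_like_spark(ts: datetime) -> str:
    """Format with milliseconds and a numeric UTC offset, as display() renders it above."""
    millis = ts.microsecond // 1000
    return f"{ts:%Y-%m-%dT%H:%M:%S}.{millis:03d}{ts:%z}"

print(display_like_spark(to_timestamp("2015-04-23T16:30:00")))  # 2015-04-23T16:30:00.000+0000
```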


In [0]:
# Step 06: Create 'df3' from the Parquet path in Step 05
# Run 'printSchema' and confirm the 'tstamp' column has the Timestamp data type
# Then display df3

df3 = spark.read.parquet("dbfs:/FileStore/tables/sfpd2/hackathon3.parquet")

df3.printSchema()
display(df3)

root
 |-- IncidentNum: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- PdDistrict: string (nullable = true)
 |-- tstamp: timestamp (nullable = true)



IncidentNum,Category,Description,PdDistrict,tstamp
150184364,LARCENY/THEFT,PETTY THEFT SHOPLIFTING,NORTHERN,2015-02-28T16:53:00.000+0000
150184370,VEHICLE THEFT,STOLEN AND RECOVERED VEHICLE,SOUTHERN,2015-02-28T16:59:00.000+0000
150184370,STOLEN PROPERTY,"STOLEN PROPERTY, POSSESSION WITH KNOWLEDGE, RECEIVING",SOUTHERN,2015-02-28T16:59:00.000+0000
150184370,OTHER OFFENSES,EVADING A POLICE OFFICER RECKLESSLY,SOUTHERN,2015-02-28T16:59:00.000+0000
150184386,NON-CRIMINAL,FOUND PROPERTY,INGLESIDE,2015-02-27T03:00:00.000+0000
150184392,WARRANTS,ENROUTE TO OUTSIDE JURISDICTION,SOUTHERN,2015-02-28T16:38:00.000+0000
150184405,NON-CRIMINAL,"LICENSE PLATE, FOUND",SOUTHERN,2015-02-28T16:40:00.000+0000
150184411,"SEX OFFENSES, NON FORCIBLE",UNLAWFUL SEXUAL INTERCOURSE,INGLESIDE,2015-02-28T09:00:00.000+0000
150184411,ASSAULT,"BATTERY, FORMER SPOUSE OR DATING RELATIONSHIP",INGLESIDE,2015-02-28T09:00:00.000+0000
150184427,LARCENY/THEFT,GRAND THEFT FROM PERSON,TENDERLOIN,2015-02-28T17:15:00.000+0000


In [0]:
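# Step 07: Hard-code the above schema for the upcoming streaming DataFrame
# (file-based streaming sources need an explicit schema; they do not infer one by default)

from pyspark.sql.types import StructType, StructField, StringType, TimestampType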
DDL_schema = StructType([ 
    StructField("IncidentNum",StringType(),True), 
    StructField("Category",StringType(),True), 
    StructField("Description",StringType(),True), 
    StructField("PdDistrict", StringType(), True),
    StructField("tstamp", TimestampType(), True)])

In [0]:
# Step 08:
# Create a DataFrame using 'readStream', defined as follows:
# use the schema in the above cell, set 'maxFilesPerTrigger' = 1, and point to the Parquet path in Step 05

readStreamDF = (spark.readStream
                .schema(DDL_schema)
                .option("maxFilesPerTrigger", 1)  # read at most one Parquet file per micro-batch
                .parquet("dbfs:/FileStore/tables/sfpd2/hackathon3.parquet"))
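With `maxFilesPerTrigger` = 1, each micro-batch picks up at most one new Parquet file, so the number of committed batches asked about in Step 13 should roughly track the number of part files Spark wrote to the Step 05 path. The simplified model below (hypothetical `micro_batches` helper and file names; Spark's real file source also remembers which files it has already processed) shows how files are chunked into batches:

```python
from typing import Iterator, List

def micro_batches(files: List[str], max_files_per_trigger: int) -> Iterator[List[str]]:
    """Yield the files each micro-batch would read, in order (simplified model)."""
    for start in range(0, len(files), max_files_per_trigger):
        yield files[start:start + max_files_per_trigger]

# Hypothetical part-file names; the real names under the Step 05 path will differ
files = [f"part-{i:05d}.snappy.parquet" for i in range(4)]
batches = list(micro_batches(files, max_files_per_trigger=1))
print(len(batches))  # 4: one batch per file when maxFilesPerTrigger = 1
```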

In [0]:
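# Step 09:
# Create an 8-hour tumbling time window on the 'tstamp' column, then 'groupBy' 'Category' and 'count'
from pyspark.sql.functions import window

# window() with only a duration gives non-overlapping (tumbling) windows aligned to the Unix epoch,
# so in UTC each day splits into 00:00-08:00, 08:00-16:00 and 16:00-24:00
aggrCat = readStreamDF.groupBy(readStreamDF.Category, window(readStreamDF.tstamp, "8 hour")).count()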
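To make the tumbling-window semantics concrete, here is a plain-Python sketch (not Spark's implementation) that assigns each timestamp to the 8-hour window containing it, aligned to the Unix epoch in UTC as Spark's `window()` is by default, and then counts rows per (Category, window) pair. The helper names are hypothetical; the sample rows come from the Step 05 output.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(hours=8)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def window_for(ts: datetime) -> tuple:
    """Return the [start, end) 8-hour tumbling window that contains ts."""
    # Windows are aligned to the epoch, so in UTC they start at 00:00, 08:00 and 16:00
    start = ts - (ts - EPOCH) % WINDOW
    return start, start + WINDOW

def count_by_category_window(rows):
    """Count rows per (category, window), like groupBy(Category, window(...)).count()."""
    return Counter((category, window_for(ts)) for category, ts in rows)

utc = timezone.utc
rows = [
    ("VEHICLE THEFT", datetime(2015, 4, 23, 16, 30, tzinfo=utc)),
    ("BURGLARY", datetime(2015, 4, 23, 20, 0, tzinfo=utc)),
    ("VEHICLE THEFT", datetime(2015, 4, 23, 18, 0, tzinfo=utc)),
]
for (category, (start, end)), n in count_by_category_window(rows).items():
    print(category, start.isoformat(), end.isoformat(), n)
```

Because every row falls into exactly one window, the per-window counts always add up to the number of input rows, which is what distinguishes tumbling windows from sliding ones.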

In [0]:
# Step 10: Display 'aggrCat' to confirm you are reading the stream (wait about 30 seconds for the output to display)
# After viewing the streaming data, make sure to click the 'Cancel' link before running the next cell

display(aggrCat)

Category,window,count
NON-CRIMINAL,"List(2015-11-10T00:00:00.000+0000, 2015-11-10T08:00:00.000+0000)",8
ASSAULT,"List(2015-07-13T08:00:00.000+0000, 2015-07-13T16:00:00.000+0000)",10
VANDALISM,"List(2015-09-14T16:00:00.000+0000, 2015-09-15T00:00:00.000+0000)",4
DRUG/NARCOTIC,"List(2015-10-26T16:00:00.000+0000, 2015-10-27T00:00:00.000+0000)",7
NON-CRIMINAL,"List(2015-04-19T08:00:00.000+0000, 2015-04-19T16:00:00.000+0000)",21
DRUG/NARCOTIC,"List(2015-07-10T08:00:00.000+0000, 2015-07-10T16:00:00.000+0000)",3
VEHICLE THEFT,"List(2015-06-15T00:00:00.000+0000, 2015-06-15T08:00:00.000+0000)",4
DRIVING UNDER THE INFLUENCE,"List(2015-07-07T16:00:00.000+0000, 2015-07-08T00:00:00.000+0000)",1
RECOVERED VEHICLE,"List(2015-12-02T08:00:00.000+0000, 2015-12-02T16:00:00.000+0000)",1
RUNAWAY,"List(2015-07-31T16:00:00.000+0000, 2015-08-01T00:00:00.000+0000)",2


In [0]:
# Step 11: Code the 'writeStream' as follows:
# format = 'memory', queryName = 'WinCts', outputMode = 'complete', trigger = '30 seconds'
# Don't forget to call 'start()'
# While it runs over the next 5 minutes, go to the next cell and execute it

# Use only a few shuffle partitions for this small aggregation (the default is 200)
spark.conf.set("spark.sql.shuffle.partitions", "4")

winQuery = (aggrCat.writeStream
            .format("memory")                  # results go to an in-memory table named by queryName
            .queryName("WinCts")
            .outputMode("complete")            # rewrite the full aggregate table on every trigger
            .trigger(processingTime="30 seconds")
            .option("checkpointLocation", "/tmp/checkpoint/")
            .start())
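In 'complete' output mode the streaming aggregation keeps its running counts across micro-batches and rewrites the whole result table into the memory sink on every trigger. The toy simulation below (hypothetical `run_complete_mode` helper; plain category strings stand in for (Category, window) keys) illustrates why the counts queried in Step 12 stop growing once no new files arrive. The empty final batch stands in for a trigger with no new data; Spark typically skips such triggers, and either way the table no longer changes.

```python
from collections import Counter
from typing import Iterable, List

def run_complete_mode(batches: Iterable[List[str]]) -> List[Counter]:
    """Return the full result table emitted after each micro-batch."""
    state = Counter()                   # running aggregate kept between micro-batches
    emitted = []
    for batch in batches:
        state.update(batch)             # fold this batch's rows into the running counts
        emitted.append(Counter(state))  # 'complete' mode writes the whole table each time
    return emitted

# Two batches with data, then one with no new rows
tables = run_complete_mode([["LARCENY/THEFT", "ASSAULT"], ["LARCENY/THEFT"], []])
for i, table in enumerate(tables):
    print(i, dict(table))
```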

In [0]:
%sql
-- Step 12:
-- Every minute for the next 4 minutes, run this query to see in which time window the most crimes occur.
-- Once all committed batches have run, the counts in the output will no longer increase.
-- Goal: Which police 'Category' in which 8-hour time window are SF criminals most likely to strike
-- (Midnight-08AM, 08AM-4:00PM, 4:00PM-Midnight) in the batches that were committed?
-- The answer set in the window below will determine your grade, along with the answer you type in the next cell.

SELECT * FROM WinCts ORDER BY count DESC, window ASC

Category,window,count
LARCENY/THEFT,"List(2015-08-07T16:00:00.000+0000, 2015-08-08T00:00:00.000+0000)",93
LARCENY/THEFT,"List(2015-08-08T16:00:00.000+0000, 2015-08-09T00:00:00.000+0000)",85
LARCENY/THEFT,"List(2015-09-10T16:00:00.000+0000, 2015-09-11T00:00:00.000+0000)",74
LARCENY/THEFT,"List(2015-09-11T16:00:00.000+0000, 2015-09-12T00:00:00.000+0000)",70
LARCENY/THEFT,"List(2015-08-14T16:00:00.000+0000, 2015-08-15T00:00:00.000+0000)",66
LARCENY/THEFT,"List(2015-08-15T16:00:00.000+0000, 2015-08-16T00:00:00.000+0000)",66
LARCENY/THEFT,"List(2015-09-12T16:00:00.000+0000, 2015-09-13T00:00:00.000+0000)",65
LARCENY/THEFT,"List(2015-08-11T16:00:00.000+0000, 2015-08-12T00:00:00.000+0000)",64
LARCENY/THEFT,"List(2015-09-05T16:00:00.000+0000, 2015-09-06T00:00:00.000+0000)",64
LARCENY/THEFT,"List(2015-08-07T08:00:00.000+0000, 2015-08-07T16:00:00.000+0000)",61
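The Step 12 query ranks individual windows on specific dates. To answer the goal by time of day, one option is to sum counts per (Category, 8-hour slot); in Spark SQL that would mean grouping on something like `hour(window.start)`. Below is a plain-Python sketch of the same roll-up using only three rows copied from the output above, so the totals are illustrative and not the full answer (the `totals_by_slot` helper is hypothetical):

```python
from collections import defaultdict
from datetime import datetime

# Time-of-day slots keyed by the UTC hour at which each 8-hour window starts
SLOTS = {0: "Midnight-08AM", 8: "08AM-4:00PM", 16: "4:00PM-Midnight"}

def totals_by_slot(rows):
    """Sum window counts per (Category, slot) across all dates."""
    totals = defaultdict(int)
    for category, window_start, count in rows:
        hour = datetime.fromisoformat(window_start).hour
        totals[(category, SLOTS[hour])] += count
    return dict(totals)

# Three rows copied from the Step 12 output above: (Category, window start, count)
rows = [
    ("LARCENY/THEFT", "2015-08-07T16:00:00", 93),
    ("LARCENY/THEFT", "2015-08-08T16:00:00", 85),
    ("LARCENY/THEFT", "2015-08-07T08:00:00", 61),
]
totals = totals_by_slot(rows)
print(max(totals, key=totals.get), totals)
```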


In [0]:
# Step 13:
# While you're waiting, go to the 'Spark UI' and click on the 'Structured Streaming' tab
# Based on the 'Operation Batch' caption,
# how many batches were committed before the stream stopped processing new data?

In [0]:
# Step 14: Use the code below to stop the streaming job after 4 minutes

winQuery.stop()

# Deadline for email submission is 3 days
## To save this notebook as a file, do the following:

##### 1) In the gray vertical bar on the left, click on the 'Workspace' icon
##### 2) Find the notebook, and click on the 'down' arrow on its right side
##### 3) Select 'Export' > 'IPython Notebook'
##### 4) Click 'Save'
##### 5) Upload your file to Canvas for grading, along with the names of any team members so I can assign grades
##### 6) You will be graded on your answers in Step 12

# End of Hackathon 03: Tumbling 8-hour Time Window