In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import date, timedelta, datetime
import time
import pyspark.sql.functions as f

In [2]:
# initialize spark session
sc = SparkSession.builder.appName("AustinCrimeReports").config("spark.sql.shuffle.partitions", "50").config("spark.driver.maxResultSize","5g").config("spark.sql.execution.arrow.enabled", "true").getOrCreate()

In [3]:
# read dataset into dataframe
df = sc.read.option("header",True).csv("Crime_Reports.csv")
print('data read into dataframe')

data read into dataframe


In [5]:
# drop duplicate records
df = df.dropDuplicates()

# drop unnecessary columns
df.drop("council_district","apd_sector","apd_district",
               "pra","census_tract","ucr_category","category_description",
               "x_coord","y_coord","highest_offense_code","occurred_date_time",
               "report_date_time","clearance_date","report_time","occurred_time")

# filter out null values from occurred_date field
df = df.filter(df.occurred_date.isNotNull())

## Analysis

In [6]:
# number of various crimes changed over time

df_clean = df.withColumn('year',f.year(f.to_timestamp('occurred_date','MM/dd/yyyy')))

df_clean1 = df_clean.groupBy("highest_offense_description", "year").count().orderBy("year","highest_offense_description",ascending=True)

In [7]:
# number arrests corresponding to the crimes changed over time

print('Number of crimes per year')
df_clean2 = df_clean.groupBy("year").count().orderBy("year")
print()

print('Number of arrests per year')
df_clean3 = df_clean.where("clearance_status == 'C'").groupBy("year").count().orderBy("year")

Number of crimes per year

Number of arrests per year


In [8]:
# top crimes committed
df_clean4 = df_clean.groupBy('highest_offense_description').count().orderBy(desc("count"))

In [9]:
# which locations are these frequent crimes being committed to?
df_clean5 = df_clean.groupBy("location_type").count().orderBy(desc("count"))

In [10]:
# are there certain high crime locations for certain crimes?

df_clean6 = df_clean.groupBy("highest_offense_description", "location_type").count().sort("highest_offense_description", "location_type")

In [11]:
# how has the number of certain crimes changed over the years in Austin?

# car burglary
print('Amount of Car Burglaries per year')
df_clean7 = df_clean.where("highest_offense_description == 'BURGLARY OF VEHICLE'").groupBy("year").count().orderBy("year")
print()

# theft
print('Amount of Theft per year')
df_clean8 = df_clean.where("highest_offense_description == 'THEFT'").groupBy("year").count().orderBy("year")
print()

# possession of marijuana
print('Possession of Marijuana per year')
df_clean9 = df_clean.where("highest_offense_description == 'POSSESSION OF MARIJUANA'").groupBy("year").count().orderBy("year")
print()

# home burglary
print('Amount of Home Burglaries per year')
df_clean10 = df_clean.where("highest_offense_description == 'BURGLARY OF RESIDENCE'").groupBy("year").count().orderBy("year")

Amount of Car Burglaries per year

Amount of Theft per year

Possession of Marijuana per year

Amount of Home Burglaries per year


In [24]:
# save each query as a csv file for visualizations

# df_clean1.toPandas().to_csv("results/1-crime-change-per-year.csv")
# df_clean2.toPandas().to_csv("results/2-crimes-per-year.csv")
# df_clean3.toPandas().to_csv("results/3-arrests-per-year.csv")
# df_clean4.toPandas().to_csv("results/4-top-crimes.csv")
# df_clean5.toPandas().to_csv("results/5-top-locations.csv")
# df_clean6.toPandas().to_csv("results/6-crimes-locations.csv")
# df_clean7.toPandas().to_csv("results/7-car-robberies-per-year.csv")
# df_clean8.toPandas().to_csv("results/8-theft-per-year.csv")
# df_clean9.toPandas().to_csv("results/9-weed-per-year.csv")
# df_clean10.toPandas().to_csv("results/10-home-robberies-per-year.csv")