# Set up Spark and create Session

In [1]:
import findspark
import os
os.environ['SPARK_HOME'] = '/opt/spark'
os.environ['PATH'] = os.environ['SPARK_HOME'] + ':' + os.environ['PATH']
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [28]:
from pyspark.sql.functions import to_timestamp, col, lit
import pyspark.sql.functions as F

In [103]:
# Read Crime data

In [4]:
df = spark.read.csv('./data.csv', header=True).withColumn('Date',to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a')).filter(col('Date') <= lit('2018-11-11'))

In [5]:
df.count()

6752727

In [8]:
df.show(3)

+--------+-----------+-------------------+------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|      ID|Case Number|               Date|             Block|IUCR|       Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|
+--------+-----------+-------------------+------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|11034701|   JA366925|2001-01-01 11:00:00|   016XX E 86TH PL|1153| DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|           RESIDENCE| false|   false|0412|     004|   8|            45|      11|        n

In [17]:
pdf = df.limit(100).toPandas()

In [18]:
pdf.head()

Unnamed: 0,ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,...,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
0,11034701,JA366925,2001-01-01 11:00:00,016XX E 86TH PL,1153,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT OVER $ 300,RESIDENCE,False,False,...,8,45,11,,,2001,08/05/2017 03:50:08 PM,,,
1,11227287,JB147188,2017-10-08 03:00:00,092XX S RACINE AVE,281,CRIM SEXUAL ASSAULT,NON-AGGRAVATED,RESIDENCE,False,False,...,21,73,2,,,2017,02/11/2018 03:57:41 PM,,,
2,11227583,JB147595,2017-03-28 14:00:00,026XX W 79TH ST,620,BURGLARY,UNLAWFUL ENTRY,OTHER,False,False,...,18,70,5,,,2017,02/11/2018 03:57:41 PM,,,
3,11227293,JB147230,2017-09-09 20:17:00,060XX S EBERHART AVE,810,THEFT,OVER $500,RESIDENCE,False,False,...,20,42,6,,,2017,02/11/2018 03:57:41 PM,,,
4,11227634,JB147599,2017-08-26 10:00:00,001XX W RANDOLPH ST,281,CRIM SEXUAL ASSAULT,NON-AGGRAVATED,HOTEL/MOTEL,False,False,...,42,32,2,,,2017,02/11/2018 03:57:41 PM,,,


# Get columns of Crime, True/False was Arrested and Count with groupby

In [22]:
df2 = df.groupBy('Primary Type', 'Arrest').count()

In [23]:
df2.printSchema()

root
 |-- Primary Type: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- count: long (nullable = false)



In [24]:
df2 = df2.orderBy('Primary Type')

In [25]:
df2.show()

+--------------------+------+------+
|        Primary Type|Arrest| count|
+--------------------+------+------+
|               ARSON|  true|  1463|
|               ARSON| false|  9693|
|             ASSAULT|  true| 96828|
|             ASSAULT| false|321674|
|             BATTERY| false|951209|
|             BATTERY|  true|280938|
|            BURGLARY|  true| 22407|
|            BURGLARY| false|365616|
|CONCEALED CARRY L...|  true|   269|
|CONCEALED CARRY L...| false|    14|
| CRIM SEXUAL ASSAULT| false| 22634|
| CRIM SEXUAL ASSAULT|  true|  4361|
|     CRIMINAL DAMAGE| false|716956|
|     CRIMINAL DAMAGE|  true| 54498|
|CRIMINAL SEXUAL A...| false|   373|
|CRIMINAL SEXUAL A...|  true|    64|
|   CRIMINAL TRESPASS| false| 51736|
|   CRIMINAL TRESPASS|  true|141632|
|  DECEPTIVE PRACTICE| false|221458|
|  DECEPTIVE PRACTICE|  true| 44279|
+--------------------+------+------+
only showing top 20 rows



# Get a df counting total number of crime incidents for each type

In [37]:
#df3 = df2.groupBy('Primary Type').agg({'count': 'min'})
df3 = df2.groupBy('Primary Type').agg(F.sum(col('count')))

# Left join incident count with arrest count

In [44]:
df4 = df3.join(df2.filter(col('Arrest') == 'true'), how='left', on='Primary Type').select('Primary Type', 'sum(count)', 'count')

In [46]:
df4 = df4.orderBy('Primary Type')
df4.show()

+--------------------+----------+------+
|        Primary Type|sum(count)| count|
+--------------------+----------+------+
|               ARSON|     11156|  1463|
|             ASSAULT|    418502| 96828|
|             BATTERY|   1232147|280938|
|            BURGLARY|    388023| 22407|
|CONCEALED CARRY L...|       283|   269|
| CRIM SEXUAL ASSAULT|     26995|  4361|
|     CRIMINAL DAMAGE|    771454| 54498|
|CRIMINAL SEXUAL A...|       437|    64|
|   CRIMINAL TRESPASS|    193368|141632|
|  DECEPTIVE PRACTICE|    265737| 44279|
|   DOMESTIC VIOLENCE|         1|     1|
|            GAMBLING|     14422| 14318|
|            HOMICIDE|      9466|  4639|
|   HUMAN TRAFFICKING|        49|     6|
|INTERFERENCE WITH...|     15140| 13888|
|        INTIMIDATION|      3937|   699|
|          KIDNAPPING|      6658|   744|
|LIQUOR LAW VIOLATION|     14067| 13942|
| MOTOR VEHICLE THEFT|    314116| 28706|
|           NARCOTICS|    711646|707436|
+--------------------+----------+------+
only showing top

# Make Percent Arrest column with resulting joined columns of incidents and arrests (True) counts

In [49]:
df5 = df4.withColumn('Percent Arrest', col('count') / col('sum(count)'))

In [50]:
df5.show()

+--------------------+----------+------+-------------------+
|        Primary Type|sum(count)| count|     Percent Arrest|
+--------------------+----------+------+-------------------+
|               ARSON|     11156|  1463|0.13114019361778415|
|             ASSAULT|    418502| 96828|0.23136806992559175|
|             BATTERY|   1232147|280938|  0.228006885542066|
|            BURGLARY|    388023| 22407|0.05774657687817475|
|CONCEALED CARRY L...|       283|   269|  0.950530035335689|
| CRIM SEXUAL ASSAULT|     26995|  4361|0.16154843489535098|
|     CRIMINAL DAMAGE|    771454| 54498|0.07064322694548217|
|CRIMINAL SEXUAL A...|       437|    64|0.14645308924485126|
|   CRIMINAL TRESPASS|    193368|141632| 0.7324479748458897|
|  DECEPTIVE PRACTICE|    265737| 44279|0.16662715391533733|
|   DOMESTIC VIOLENCE|         1|     1|                1.0|
|            GAMBLING|     14422| 14318| 0.9927887948966856|
|            HOMICIDE|      9466|  4639| 0.4900697232199451|
|   HUMAN TRAFFICKING|  

# Calling groupy() on single column, then agg function (sum, here), followed by collect()[0][0] gets you the raw value

In [60]:
sum_arrest = df5.select('count').groupby().sum()

In [68]:
sum_arrest.collect()[0][0]

1874211

In [69]:
sum_incidents = df5.select('sum(count)').groupby().sum().collect()[0][0]

In [70]:
1874211 / sum_incidents

0.2775487591901761

In [71]:
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: string (nullable = true)
 |-- Beat: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- Community Area: string (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Location: string (nullable = true)



# Use Window and rank() to get the top 3 locations for each crime

## Define the window partioning by crime type and ordering by the count of type/location (To be calculated)

In [104]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank
# Partitioning by Crime type, so we get a ranking for each
windowSpec  = Window.partitionBy("Primary Type").orderBy(col("count").desc())

## GroupBy type/location and count and compute the rank

In [91]:
df_place = df.groupBy('Primary Type', 'Location Description').count()
df_place = df_place.withColumn("location rank", rank().over(windowSpec))

In [90]:
pdf

Unnamed: 0,Primary Type,Location Description,count,location rank
0,OFFENSE INVOLVING CHILDREN,RESIDENCE,24877,1
1,OFFENSE INVOLVING CHILDREN,APARTMENT,10755,2
2,OFFENSE INVOLVING CHILDREN,STREET,2008,3
3,OFFENSE INVOLVING CHILDREN,SIDEWALK,1115,4
4,OFFENSE INVOLVING CHILDREN,OTHER,1088,5
...,...,...,...,...
195,PUBLIC PEACE VIOLATION,"SCHOOL, PUBLIC, BUILDING",4591,4
196,PUBLIC PEACE VIOLATION,APARTMENT,1947,5
197,PUBLIC PEACE VIOLATION,ALLEY,1385,6
198,PUBLIC PEACE VIOLATION,OTHER,1366,7


In [94]:
df_place.filter(col('location rank') <= 3).show()

+--------------------+--------------------+-----+-------------+
|        Primary Type|Location Description|count|location rank|
+--------------------+--------------------+-----+-------------+
|OFFENSE INVOLVING...|           RESIDENCE|24877|            1|
|OFFENSE INVOLVING...|           APARTMENT|10755|            2|
|OFFENSE INVOLVING...|              STREET| 2008|            3|
|CRIMINAL SEXUAL A...|           RESIDENCE|  174|            1|
|CRIMINAL SEXUAL A...|           APARTMENT|   99|            2|
|CRIMINAL SEXUAL A...|              STREET|   20|            3|
|            STALKING|           RESIDENCE|  928|            1|
|            STALKING|              STREET|  889|            2|
|            STALKING|           APARTMENT|  379|            3|
|PUBLIC PEACE VIOL...|              STREET|14651|            1|
|PUBLIC PEACE VIOL...|            SIDEWALK| 9069|            2|
|PUBLIC PEACE VIOL...|           RESIDENCE| 4789|            3|
|NON-CRIMINAL (SUB...|           APARTME

## Filtering on multiple columns

## Show top 3 stalking only

In [102]:
df_place.filter(
    (col('location rank') <= 3) & 
    (col('Primary Type') == 'STALKING')
    ).show()

+------------+--------------------+-----+-------------+
|Primary Type|Location Description|count|location rank|
+------------+--------------------+-----+-------------+
|    STALKING|           RESIDENCE|  928|            1|
|    STALKING|              STREET|  889|            2|
|    STALKING|           APARTMENT|  379|            3|
+------------+--------------------+-----+-------------+



# Simple solutions to overall percent of incidents resulting in arrest and top locations

In [105]:
df.filter(col('Arrest') == 'true').count() / df.select('Arrest').count()

0.2775487591901761

In [107]:
df.groupby('Location Description').count().orderBy(col('count').desc()).show()

+--------------------+-------+
|Location Description|  count|
+--------------------+-------+
|              STREET|1770478|
|           RESIDENCE|1144683|
|           APARTMENT| 698195|
|            SIDEWALK| 665511|
|               OTHER| 256811|
|PARKING LOT/GARAG...| 193778|
|               ALLEY| 150901|
|SCHOOL, PUBLIC, B...| 142320|
|    RESIDENCE-GARAGE| 131616|
|  SMALL RETAIL STORE| 119269|
|RESIDENCE PORCH/H...| 117913|
|VEHICLE NON-COMME...| 108405|
|          RESTAURANT| 105599|
|  GROCERY FOOD STORE|  87300|
|    DEPARTMENT STORE|  83663|
|         GAS STATION|  71984|
|RESIDENTIAL YARD ...|  69471|
|CHA PARKING LOT/G...|  55453|
|       PARK PROPERTY|  52339|
|COMMERCIAL / BUSI...|  49172|
+--------------------+-------+
only showing top 20 rows

