In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql import functions as F

In [3]:
sc = SparkContext('local')
spark = SparkSession(sc)

In [4]:
#df = spark.read.csv("Crime_Data_2010_2017.csv")
df = spark.read.format("csv").option("header", "True").option("delimiter", ",").load("Crime_Data_2010_2017.csv")

In [5]:
df.head()

Row(DR Number='1208575', Date Reported='03/14/2013', Date Occurred='03/11/2013', Time Occurred='1800', Area ID='12', Area Name='77th Street', Reporting District='1241', Crime Code='626', Crime Code Description='INTIMATE PARTNER - SIMPLE ASSAULT', MO Codes='0416 0446 1243 2000', Victim Age='30.0', Victim Sex='F', Victim Descent='W', Premise Code='502.0', Premise Description='MULTI-UNIT DWELLING (APARTMENT, DUPLEX, ETC)', Weapon Used Code='400.0', Weapon Description='STRONG-ARM (HANDS, FIST, FEET OR BODILY FORCE)', Status Code='AO', Status Description='Adult Other', Crime Code 1='626.0', Crime Code 2=None, Crime Code 3=None, Crime Code 4=None, Address='6300    BRYNHURST                    AV', Cross Street=None, Location ='(33.9829, -118.3338)')

In [6]:
df.show(2)

+---------+-------------+-------------+-------------+-------+-----------+------------------+----------+----------------------+-------------------+----------+----------+--------------+------------+--------------------+----------------+--------------------+-----------+------------------+------------+------------+------------+------------+--------------------+------------+--------------------+
|DR Number|Date Reported|Date Occurred|Time Occurred|Area ID|  Area Name|Reporting District|Crime Code|Crime Code Description|           MO Codes|Victim Age|Victim Sex|Victim Descent|Premise Code| Premise Description|Weapon Used Code|  Weapon Description|Status Code|Status Description|Crime Code 1|Crime Code 2|Crime Code 3|Crime Code 4|             Address|Cross Street|           Location |
+---------+-------------+-------------+-------------+-------+-----------+------------------+----------+----------------------+-------------------+----------+----------+--------------+------------+----------------

In [7]:
df.count()

1584316

In [8]:
df.printSchema()

root
 |-- DR Number: string (nullable = true)
 |-- Date Reported: string (nullable = true)
 |-- Date Occurred: string (nullable = true)
 |-- Time Occurred: string (nullable = true)
 |-- Area ID: string (nullable = true)
 |-- Area Name: string (nullable = true)
 |-- Reporting District: string (nullable = true)
 |-- Crime Code: string (nullable = true)
 |-- Crime Code Description: string (nullable = true)
 |-- MO Codes: string (nullable = true)
 |-- Victim Age: string (nullable = true)
 |-- Victim Sex: string (nullable = true)
 |-- Victim Descent: string (nullable = true)
 |-- Premise Code: string (nullable = true)
 |-- Premise Description: string (nullable = true)
 |-- Weapon Used Code: string (nullable = true)
 |-- Weapon Description: string (nullable = true)
 |-- Status Code: string (nullable = true)
 |-- Status Description: string (nullable = true)
 |-- Crime Code 1: string (nullable = true)
 |-- Crime Code 2: string (nullable = true)
 |-- Crime Code 3: string (nullable = true)
 |-- 

In [9]:
df.columns

['DR Number',
 'Date Reported',
 'Date Occurred',
 'Time Occurred',
 'Area ID',
 'Area Name',
 'Reporting District',
 'Crime Code',
 'Crime Code Description',
 'MO Codes',
 'Victim Age',
 'Victim Sex',
 'Victim Descent',
 'Premise Code',
 'Premise Description',
 'Weapon Used Code',
 'Weapon Description',
 'Status Code',
 'Status Description',
 'Crime Code 1',
 'Crime Code 2',
 'Crime Code 3',
 'Crime Code 4',
 'Address',
 'Cross Street',
 'Location ']

In [10]:
from pyspark.sql.functions import isnan, when, count, col

In [11]:
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---------+-------------+-------------+-------------+-------+---------+------------------+----------+----------------------+--------+----------+----------+--------------+------------+-------------------+----------------+------------------+-----------+------------------+------------+------------+------------+------------+-------+------------+---------+
|DR Number|Date Reported|Date Occurred|Time Occurred|Area ID|Area Name|Reporting District|Crime Code|Crime Code Description|MO Codes|Victim Age|Victim Sex|Victim Descent|Premise Code|Premise Description|Weapon Used Code|Weapon Description|Status Code|Status Description|Crime Code 1|Crime Code 2|Crime Code 3|Crime Code 4|Address|Cross Street|Location |
+---------+-------------+-------------+-------------+-------+---------+------------------+----------+----------------------+--------+----------+----------+--------------+------------+-------------------+----------------+------------------+-----------+------------------+------------+---------

In [12]:
df_filtered = df.select("Date Reported", "Date Occurred", "Time Occurred", "Area ID", "Reporting District", 
                       "Crime Code", "Crime Code Description", "MO Codes", "Victim Age", "Victim Sex", "Victim Descent", "Premise Code", 
                       "Weapon Used Code", "Crime Code 1")

In [13]:
#df_filtered.show(10)

In [14]:
from pyspark.sql.types import IntegerType
#df_filtered.groupBy(df_filtered["Crime Code"].cast(IntegerType()).alias("Crime Code")).count().show()

In [15]:
#df.select("Victim Sex").distinct().show()

In [16]:
df_notNull = df_filtered.filter(df_filtered["Crime Code Description"].isNotNull())

In [17]:
df_notNull.count()

1583904

In [18]:
crimeCode_count = df_notNull.groupBy(df_notNull["Crime Code"].cast(IntegerType()).alias("Crime Code")).count()
crimeCode_count.orderBy(crimeCode_count["count"].desc()).show()

+----------+------+
|Crime Code| count|
+----------+------+
|       624|145767|
|       510|121329|
|       330|121318|
|       310|114751|
|       440|113709|
|       354|100653|
|       626| 85908|
|       740| 79433|
|       745| 71523|
|       230| 67631|
|       420| 63995|
|       210| 63408|
|       341| 56377|
|       930| 44560|
|       442| 35032|
|       331| 22588|
|       649| 18856|
|       946| 16449|
|       956| 16371|
|       900| 16053|
+----------+------+
only showing top 20 rows



In [19]:
victimSex_count = df_notNull.groupBy(df_notNull["Victim Sex"].alias("Victim Sex")).count()
victimSex_count.orderBy(victimSex_count["count"].desc()).show()

+----------+------+
|Victim Sex| count|
+----------+------+
|         M|739521|
|         F|675054|
|      null|145198|
|         X| 24077|
|         H|    53|
|         -|     1|
+----------+------+



In [20]:
from itertools import chain
from pyspark.sql.functions import create_map, lit

In [21]:
victimDescent_count = df_notNull.groupBy(df_notNull["Victim Descent"].alias("Victim Descent")).count()
descentDict = {"A":"Other Asian", "H":"Hispanic/Latin/Mexican", "B":"Black", "C":"Chinese", 
             "D":"Cambodian", "F":"Filipino", "G":"Guamanian", "I":"American Indian/Alaskan Native", 
             "J":"Japanese", "K":"Korean", "L":"Laotian", "O":"Other", "P":"Pacific Islander",
             "S":"Samoan", "U":"Hawaiian", "V":"Vietamese", "W":"White", "X":"Unknown", 
             "Z":"Asian Indian"}

mapping_expr = create_map([lit(x) for x in chain(*descentDict.items())])
victimDescent_count = victimDescent_count.withColumn('Victim Descent', mapping_expr[victimDescent_count['Victim Descent']])

#mapping = create_map([lit(x) for x in chain(*descentDict.items())])
#victimDescent_count.select(mapping[victimDescent_count['Victim Descent']].alias('Victim Descent'))
victimDescent_count.orderBy(victimDescent_count["count"].desc()).show(25)

+--------------------+------+
|      Victim Descent| count|
+--------------------+------+
|Hispanic/Latin/Me...|549363|
|               White|391787|
|               Black|254895|
|               Other|152758|
|                null|145231|
|             Unknown| 41531|
|         Other Asian| 37136|
|              Korean|  7136|
|            Filipino|  1885|
|American Indian/A...|   663|
|             Chinese|   618|
|    Pacific Islander|   276|
|            Japanese|   237|
|            Hawaiian|   137|
|           Vietamese|    86|
|           Guamanian|    60|
|        Asian Indian|    55|
|              Samoan|    24|
|           Cambodian|    15|
|             Laotian|    10|
|                null|     1|
+--------------------+------+



In [22]:
victimAge_count = df_notNull.groupBy(df_notNull["Victim Age"].alias("Victim Age")).count()
victimAge_count.orderBy(victimAge_count["count"].desc()).show()

+----------+------+
|Victim Age| count|
+----------+------+
|      null|128601|
|      15.0| 38175|
|      25.0| 36935|
|      24.0| 36093|
|      26.0| 35689|
|      23.0| 35129|
|      27.0| 35093|
|      28.0| 34513|
|      29.0| 34323|
|      30.0| 33681|
|      14.0| 32791|
|      22.0| 32659|
|      16.0| 31866|
|      31.0| 31642|
|      32.0| 31192|
|      35.0| 30366|
|      33.0| 30277|
|      21.0| 30071|
|      34.0| 30021|
|      36.0| 28005|
+----------+------+
only showing top 20 rows



In [23]:
AreaID_count = df_notNull.groupBy(df_notNull["Area ID"].alias("Area ID")).count()
AreaID_count.orderBy(AreaID_count["count"].desc()).show()

+-------+------+
|Area ID| count|
+-------+------+
|     12|110545|
|      3|102233|
|     15| 86370|
|     14| 83756|
|     18| 83475|
|     19| 80230|
|     11| 76616|
|      9| 75412|
|     13| 74373|
|     17| 74007|
|     21| 73280|
|      6| 72206|
|      5| 70942|
|     20| 70108|
|     10| 67792|
|      1| 67081|
|      2| 66999|
|      8| 66380|
|      7| 63595|
|     16| 60927|
+-------+------+
only showing top 20 rows

