# Crime in New York City

In this notebook I prepare the NYPD crime dataset (cleaning, transforming, and summarizing it) with Apache Spark. I use the PySpark interface together with Spark Core features such as the RDD (Resilient Distributed Dataset).

## Overview

### 1) PySpark Initialization

### 2) Prepare the data

### 3) Identifying and Filtering Missing values

### 4) Understand the data

#### What is the trend in crime over the past few years?

#### Which categories of crimes are the most common?

#### In which jurisdiction are crimes most commonly committed?

#### What is the average number of crimes committed per weekday?

#### At which hours are crimes most commonly committed?

## PySpark Initialization

In [1]:
# SparkContext import and initialization (local mode, 4 worker threads)
import pyspark
sc = pyspark.SparkContext(master="local[4]",appName="NYPD_RDD")

In [2]:
# import data
rowdata = sc.textFile('NYPD_7_Major_Felony_Incidents.csv')

In [3]:
# count the lines in the file (header line included)
rowdata.count()

1123466

## Prepare the data

In [4]:
# Get the header and display it
header = rowdata.first()
print(header)

OBJECTID,Identifier,Occurrence Date,Day of Week,Occurrence Month,Occurrence Day,Occurrence Year,Occurrence Hour,CompStat Month,CompStat Day,CompStat Year,Offense,Offense Classification,Sector,Precinct,Borough,Jurisdiction,XCoordinate,YCoordinate,Location 1


In [5]:
# Drop the header line, then split each record on commas.
# Note: the quoted "Location 1" value contains a comma, so this split yields 21 fields per full record.
dataWoHeader = rowdata.filter(lambda x: x != header)\
                        .map(lambda row: row.split(","))

In [6]:
# Count the records that contain at least one empty field
dataWoHeader.filter(lambda x: "" in x).count()

25645

In [7]:
# Find empty fields column index
dataWoHeader.flatMap(lambda row: [i for i,x in enumerate(row) if x==""]).distinct().sortBy(lambda x:x).collect()

[0, 2, 3, 4, 5, 6, 9, 10, 13, 15, 16, 17, 18, 19]
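
The indices above are easier to read next to their column names. The sketch below is plain Python and needs no Spark: it re-uses the header string printed earlier and the index list from the previous cell. The variable names `columns`, `empty_idx`, and `empty_columns` are introduced here for illustration only.

```python
# Header line as printed earlier in the notebook.
header = ("OBJECTID,Identifier,Occurrence Date,Day of Week,Occurrence Month,"
          "Occurrence Day,Occurrence Year,Occurrence Hour,CompStat Month,"
          "CompStat Day,CompStat Year,Offense,Offense Classification,Sector,"
          "Precinct,Borough,Jurisdiction,XCoordinate,YCoordinate,Location 1")
columns = header.split(",")

# Column indices reported by the previous cell.
empty_idx = [0, 2, 3, 4, 5, 6, 9, 10, 13, 15, 16, 17, 18, 19]

# Pair each index with its column name.
empty_columns = [(i, columns[i]) for i in empty_idx]
for i, name in empty_columns:
    print(i, name)
```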

In [8]:
# Clean the field names and add Location_2, because the comma split
# cuts the quoted "Location 1" pair into two separate fields
fields = header.replace(" ","_").replace("/","_").split(",")
fields.append('Location_2')
fields

['OBJECTID',
 'Identifier',
 'Occurrence_Date',
 'Day_of_Week',
 'Occurrence_Month',
 'Occurrence_Day',
 'Occurrence_Year',
 'Occurrence_Hour',
 'CompStat_Month',
 'CompStat_Day',
 'CompStat_Year',
 'Offense',
 'Offense_Classification',
 'Sector',
 'Precinct',
 'Borough',
 'Jurisdiction',
 'XCoordinate',
 'YCoordinate',
 'Location_1',
 'Location_2']
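
The extra `Location_2` field exists because `row.split(",")` ignores CSV quoting, so the quoted coordinate pair is cut at its inner comma. As a minimal sketch, the snippet below compares the naive split with Python's standard `csv` module on the tail of the first record. It is only an illustration of the difference, not the approach used in this notebook.

```python
import csv

# Last three fields of the first record, as they appear in the file.
line = '987478,166141,"(40.6227027620001, -73.9883732929999)"'

naive = line.split(",")            # ignores quoting
parsed = next(csv.reader([line]))  # honours quoting

print(len(naive))   # 4: the quoted pair is cut in two
print(len(parsed))  # 3: the quoted pair stays together
```

With the naive split the two halves keep the stray `"(` and `)"` characters, which is why `transform_element` strips them before converting to `float`.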

In [9]:
# prepare the datatype list for specific fields
structure = ["int",
             "string",
             "datetime",
             "string",
             "string",
             "int",
             "int",
             "int",
             "int",
             "int",
             "int",
             "string",
             "string",
             "string",
             "int",
             "string",
             "string",
             "int",
             "int",
             "float",
             "float"]

In [10]:
# display the fields with their data types
example_dict = dict(zip(fields,structure))
example_dict

{'OBJECTID': 'int',
 'Identifier': 'string',
 'Occurrence_Date': 'datetime',
 'Day_of_Week': 'string',
 'Occurrence_Month': 'string',
 'Occurrence_Day': 'int',
 'Occurrence_Year': 'int',
 'Occurrence_Hour': 'int',
 'CompStat_Month': 'int',
 'CompStat_Day': 'int',
 'CompStat_Year': 'int',
 'Offense': 'string',
 'Offense_Classification': 'string',
 'Sector': 'string',
 'Precinct': 'int',
 'Borough': 'string',
 'Jurisdiction': 'string',
 'XCoordinate': 'int',
 'YCoordinate': 'int',
 'Location_1': 'float',
 'Location_2': 'float'}

In [11]:

import datetime
# Occurrence_Date =>'09/06/1940 07:30:00 PM'
dt_fmt = "%m/%d/%Y %I:%M:%S %p"

def transform_element(RDD_elem, elem_struct, names, dt_fmt):
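    """ Convert one RDD row (a list of strings) into a dictionary of typed values.
    
        Args:
        RDD_elem : The RDD row, as a list of string fields.
        elem_struct : The list of field data types ("int", "string", "datetime", "float").
        names : The list of field names.
        dt_fmt : The format codes for the strptime method.
        
        Return:
        The dictionary mapping each field name to its converted value.
        Missing ints become 9999; missing datetimes and floats stay ''.
    """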
    out = dict()
    for elem, e_type, name in zip(RDD_elem, elem_struct, names):
        if e_type == "int":
            if elem == "" or elem =='NA':
                elem = 9999
            else:
                elem = int(elem)
        elif e_type == "datetime":
            if elem == "":
                elem = ''
            else:
                elem = datetime.datetime.strptime(elem, dt_fmt)
        elif e_type == "float":
            if elem == "":
                elem = ''
            elif name == "Location_1":
                elem = float(elem[2:])   # drop the leading '"('
            else:
                elem = float(elem[:-2])  # Location_2: drop the trailing ')"'
        out[name] = elem
    return out
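
The two least obvious conversions in `transform_element` are the date parsing and the coordinate slicing. The sketch below runs both on the values of the first record, using only the standard library. The variable names `parsed`, `lat`, and `lon` are introduced here for illustration.

```python
import datetime

dt_fmt = "%m/%d/%Y %I:%M:%S %p"  # 12-hour clock followed by an AM/PM marker

parsed = datetime.datetime.strptime("09/06/1940 07:30:00 PM", dt_fmt)
print(parsed)  # 1940-09-06 19:30:00

# After the naive comma split, the latitude starts with '"(' and
# the longitude ends with ')"'; slicing removes those two characters.
lat = float('"(40.6227027620001'[2:])
lon = float(' -73.9883732929999)"'[:-2])
print(lat, lon)
```

`float` tolerates the leading space left in front of the longitude, so no extra `strip()` is needed there.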

In [12]:
# display the first row of the dataset
x = dataWoHeader.first()
x

['1',
 'f070032d',
 '09/06/1940 07:30:00 PM',
 'Friday',
 'Sep',
 '6',
 '1940',
 '19',
 '9',
 '7',
 '2010',
 'BURGLARY',
 'FELONY',
 'D',
 '66',
 'BROOKLYN',
 'N.Y. POLICE DEPT',
 '987478',
 '166141',
 '"(40.6227027620001',
 ' -73.9883732929999)"']

In [13]:
# display the first row after transformation
x = dataWoHeader.first()

transform_element(x, structure, fields, dt_fmt)

{'OBJECTID': 1,
 'Identifier': 'f070032d',
 'Occurrence_Date': datetime.datetime(1940, 9, 6, 19, 30),
 'Day_of_Week': 'Friday',
 'Occurrence_Month': 'Sep',
 'Occurrence_Day': 6,
 'Occurrence_Year': 1940,
 'Occurrence_Hour': 19,
 'CompStat_Month': 9,
 'CompStat_Day': 7,
 'CompStat_Year': 2010,
 'Offense': 'BURGLARY',
 'Offense_Classification': 'FELONY',
 'Sector': 'D',
 'Precinct': 66,
 'Borough': 'BROOKLYN',
 'Jurisdiction': 'N.Y. POLICE DEPT',
 'XCoordinate': 987478,
 'YCoordinate': 166141,
 'Location_1': 40.6227027620001,
 'Location_2': -73.9883732929999}

In [14]:
# use the transform_element function on the dataWoHeader RDD
data = dataWoHeader.map(lambda row:transform_element(row,structure,fields,dt_fmt))

In [15]:
# Persist the data RDD with the default storage level (MEMORY_ONLY)
data.cache()

PythonRDD[18] at RDD at PythonRDD.scala:53

## Identifying and Filtering Missing values

In [16]:
# Count records per occurrence year, skipping rows without a date
data.filter(lambda x: x["Occurrence_Date"] != "")\
    .map(lambda x: (x["Occurrence_Date"].year, 1))\
    .reduceByKey(lambda x, y: x + y)\
    .sortBy(lambda x: x[0]).collect()

[(1900, 3),
 (1905, 2),
 (1908, 1),
 (1910, 3),
 (1911, 1),
 (1912, 1),
 (1913, 4),
 (1914, 2),
 (1915, 3),
 (1920, 1),
 (1940, 1),
 (1945, 2),
 (1946, 1),
 (1950, 1),
 (1954, 1),
 (1955, 1),
 (1956, 1),
 (1958, 1),
 (1959, 1),
 (1960, 1),
 (1964, 1),
 (1965, 2),
 (1966, 7),
 (1968, 1),
 (1969, 1),
 (1970, 2),
 (1971, 1),
 (1972, 2),
 (1973, 5),
 (1974, 3),
 (1975, 2),
 (1976, 2),
 (1977, 3),
 (1978, 2),
 (1979, 6),
 (1980, 5),
 (1981, 1),
 (1982, 5),
 (1983, 1),
 (1984, 4),
 (1985, 8),
 (1986, 2),
 (1987, 6),
 (1988, 6),
 (1989, 12),
 (1990, 17),
 (1991, 12),
 (1992, 12),
 (1993, 23),
 (1994, 19),
 (1995, 27),
 (1996, 34),
 (1997, 40),
 (1998, 74),
 (1999, 124),
 (2000, 282),
 (2001, 343),
 (2002, 368),
 (2003, 490),
 (2004, 692),
 (2005, 3272),
 (2006, 127885),
 (2007, 120554),
 (2008, 117375),
 (2009, 106018),
 (2010, 105643),
 (2011, 107206),
 (2012, 111798),
 (2013, 111286),
 (2014, 106849),
 (2015, 102657)]

The number of cases per year jumps from 3,272 in 2005 to 127,885 in 2006, and no year before 2005 exceeds 700 records. This gap indicates that most records before 2006 are missing, so the rest of the analysis keeps only incidents from 2006 onward.
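
The break in the series can also be located numerically. The sketch below takes a few of the yearly counts from the output above and looks for the first year whose count is more than ten times the previous year's. The factor of 10 is an arbitrary threshold chosen for this illustration, and the names `counts`, `ratios`, and `jump_year` are hypothetical.

```python
# Yearly counts copied from the output above (2003 to 2007 only).
counts = {2003: 490, 2004: 692, 2005: 3272, 2006: 127885, 2007: 120554}

years = sorted(counts)

# Ratio of each year's count to the previous year's count.
ratios = {y: counts[y] / counts[prev] for prev, y in zip(years, years[1:])}

# First year whose count exceeds ten times the previous one (arbitrary threshold).
jump_year = next(y for y in years[1:] if ratios[y] > 10)
print(jump_year)  # 2006
```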

In [17]:
# Keep records with a known offense and an occurrence date from 2006 onward
dataFiltered = data.filter(lambda x: not (x["Offense"] == "NA" or x["Occurrence_Date"] == ""))\
                    .filter(lambda x: x["Occurrence_Date"].year >= 2006)

In [18]:
dataFiltered.count()

1117271

## Understand the data

### What is the trend in crime over the past few years?

In [19]:
dataFiltered.filter(lambda x: x["Occurrence_Date"] != "")\
                .map(lambda x:(x["Occurrence_Date"].year,1))\
                .reduceByKey(lambda x,y: x+y).sortBy(lambda x: x[0]).collect()

[(2006, 127885),
 (2007, 120554),
 (2008, 117375),
 (2009, 106018),
 (2010, 105643),
 (2011, 107206),
 (2012, 111798),
 (2013, 111286),
 (2014, 106849),
 (2015, 102657)]
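
To put a number on the trend, the sketch below computes the relative change between the first and last year of the result above, and picks out the year with the fewest cases. It is plain Python on the collected list; the names `counts`, `pct_change`, and `lowest` are introduced here for illustration.

```python
# Yearly counts copied from the output above.
counts = [(2006, 127885), (2007, 120554), (2008, 117375), (2009, 106018),
          (2010, 105643), (2011, 107206), (2012, 111798), (2013, 111286),
          (2014, 106849), (2015, 102657)]

first, last = counts[0][1], counts[-1][1]

# Relative change from 2006 to 2015, in percent.
pct_change = (last - first) / first * 100
print(round(pct_change, 1))  # -19.7

# Year with the fewest recorded cases.
lowest = min(counts, key=lambda kv: kv[1])
print(lowest)  # (2015, 102657)
```

The decline is not monotonic: the counts rise again between 2010 and 2012 before falling to their lowest value in 2015.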

### Which categories of crimes are the most common?

In [20]:
dataFiltered.map(lambda x : (x["Offense"],1)).reduceByKey(lambda x,y: x + y).takeOrdered(8, (lambda x: -x[1]))

[('GRAND LARCENY', 424635),
 ('ROBBERY', 198569),
 ('BURGLARY', 191043),
 ('FELONY ASSAULT', 183879),
 ('GRAND LARCENY OF MOTOR VEHICLE', 101728),
 ('RAPE', 12974),
 ('MURDER & NON-NEGL. MANSLAUGHTE', 4443)]
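
The raw counts are easier to compare as shares of the total. The sketch below converts the result above into percentages and checks that the seven categories add up to the filtered record count. The names `offenses`, `total`, and `shares` are hypothetical.

```python
# Offense counts copied from the output above.
offenses = [('GRAND LARCENY', 424635),
            ('ROBBERY', 198569),
            ('BURGLARY', 191043),
            ('FELONY ASSAULT', 183879),
            ('GRAND LARCENY OF MOTOR VEHICLE', 101728),
            ('RAPE', 12974),
            ('MURDER & NON-NEGL. MANSLAUGHTE', 4443)]

total = sum(n for _, n in offenses)
print(total)  # 1117271, the same as dataFiltered.count()

# Share of each category in percent, rounded to one decimal place.
shares = {name: round(100 * n / total, 1) for name, n in offenses}
print(shares['GRAND LARCENY'])  # 38.0
```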

### In which jurisdiction are crimes most commonly committed?

In [21]:
dataFiltered.map(lambda x : (x["Jurisdiction"],1)).reduceByKey(lambda x,y: x + y).takeOrdered(5, (lambda x: -x[1]))

[('N.Y. POLICE DEPT', 1034796),
 ('N.Y. HOUSING POLICE', 47327),
 ('N.Y. TRANSIT POLICE', 24362),
 ('PORT AUTHORITY', 4594),
 ('POLICE DEPT NYC', 1941)]

### What is the average number of crimes committed per weekday?

In [22]:
# Daily counts keyed by ISO weekday (1 = Monday), then (number of days, average daily count) per weekday
dataFiltered.filter(lambda x: x["Occurrence_Date"] != "")\
    .map(lambda x: (x["Occurrence_Date"].date(), 1)).reduceByKey(lambda x, y: x + y)\
    .map(lambda x: (x[0].isoweekday(), x[1]))\
    .aggregateByKey((0., 0.), (lambda acc, x: (acc[0] + x, acc[1] + 1)),
                    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))\
    .mapValues(lambda x: (x[1], x[0] / x[1])).takeOrdered(7)

[(1, (522.0, 307.1800766283525)),
 (2, (522.0, 301.0249042145594)),
 (3, (522.0, 305.09578544061304)),
 (4, (522.0, 303.45785440613025)),
 (5, (521.0, 340.33013435700576)),
 (6, (521.0, 305.89059500959695)),
 (7, (522.0, 278.62452107279694))]
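
Since `isoweekday()` returns 1 for Monday, key 5 (Friday) has the highest daily average, about 340 crimes, and key 7 (Sunday) the lowest, about 279. The `aggregateByKey` call works with a `(sum, count)` accumulator: one function folds a value into an accumulator within a partition, and another merges accumulators across partitions. The sketch below imitates that pattern in plain Python on made-up toy data. The partitions, values, and helper names are all hypothetical, and this is not how Spark implements the operation internally.

```python
from functools import reduce

# Toy (weekday, daily_count) pairs, split into two made-up partitions.
partitions = [
    [(1, 300), (2, 310), (1, 320)],
    [(2, 290), (1, 310)],
]

zero = (0.0, 0)  # (running sum of daily counts, number of days)

def seq_op(acc, x):
    """Fold one daily count into an accumulator."""
    return (acc[0] + x, acc[1] + 1)

def comb_op(a, b):
    """Merge two accumulators that belong to the same key."""
    return (a[0] + b[0], a[1] + b[1])

def aggregate_partition(pairs):
    """Apply seq_op per key inside a single partition."""
    acc = {}
    for key, value in pairs:
        acc[key] = seq_op(acc.get(key, zero), value)
    return acc

def merge(left, right):
    """Apply comb_op per key across two partition results."""
    out = dict(left)
    for key, acc in right.items():
        out[key] = comb_op(out[key], acc) if key in out else acc
    return out

totals = reduce(merge, map(aggregate_partition, partitions))

# Same shape as the notebook result: (number of days, average daily count).
averages = {k: (n, s / n) for k, (s, n) in totals.items()}
print(sorted(averages.items()))  # [(1, (3, 310.0)), (2, (2, 300.0))]
```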

### At which hours are crimes most commonly committed?

In [23]:
dataFiltered.filter(lambda x: x["Occurrence_Date"] != "")\
            .map(lambda x : (x["Occurrence_Date"].hour,1))\
            .reduceByKey(lambda x,y: x + y).takeOrdered(5, (lambda x: -x[1]))

[(15, 65346), (12, 63565), (18, 62357), (17, 61076), (16, 60217)]