In [1]:
#make sure spark is setup correct by initializing SparkContext
sc

<pyspark.context.SparkContext at 0x1c445c7ff60>

In [1]:
# Load crime data
path = "C:\data\\NYPD_7_Major_Felony_Incidents.csv"
data = sc.textFile(path)

In [2]:
# execute Action
data.take(10)

['OBJECTID,Identifier,Occurrence Date,Day of Week,Occurrence Month,Occurrence Day,Occurrence Year,Occurrence Hour,CompStat Month,CompStat Day,CompStat Year,Offense,Offense Classification,Sector,Precinct,Borough,Jurisdiction,XCoordinate,YCoordinate,Location 1',
 '1,f070032d,09/06/1940 07:30:00 PM,Friday,Sep,6,1940,19,9,7,2010,BURGLARY,FELONY,D,66,BROOKLYN,N.Y. POLICE DEPT,987478,166141,"(40.6227027620001, -73.9883732929999)"',
 '2,c6245d4d,12/14/1968 12:20:00 AM,Saturday,Dec,14,1968,0,12,14,2008,GRAND LARCENY,FELONY,G,28,MANHATTAN,N.Y. POLICE DEPT,996470,232106,"(40.8037530600001, -73.955861904)"',
 '3,716dbc6f,10/30/1970 03:30:00 PM,Friday,Oct,30,1970,15,10,31,2008,BURGLARY,FELONY,H,84,BROOKLYN,N.Y. POLICE DEPT,986508,190249,"(40.688874254, -73.9918594329999)"',
 '4,638cd7b7,07/18/1972 11:00:00 PM,Tuesday,Jul,18,1972,23,7,19,2012,GRAND LARCENY OF MOTOR VEHICLE,FELONY,F,73,BROOKLYN,N.Y. POLICE DEPT,1005876,182440,"(40.6674141890001, -73.9220463899999)"',
 '5,6e410287,05/21/1987 12:01:00

### We have data in the form of string which is unstructured. Now we would convert the unstructured data in to structured objects so that the data access can be easier for analysis.

In [3]:
header = data.first()
print(header)

OBJECTID,Identifier,Occurrence Date,Day of Week,Occurrence Month,Occurrence Day,Occurrence Year,Occurrence Hour,CompStat Month,CompStat Day,CompStat Year,Offense,Offense Classification,Sector,Precinct,Borough,Jurisdiction,XCoordinate,YCoordinate,Location 1


In [4]:
# separate header and data into 2 RDDs
dataWoHeader = data.filter(lambda x: x != header)

In [5]:
dataWoHeader.first()

'1,f070032d,09/06/1940 07:30:00 PM,Friday,Sep,6,1940,19,9,7,2010,BURGLARY,FELONY,D,66,BROOKLYN,N.Y. POLICE DEPT,987478,166141,"(40.6227027620001, -73.9883732929999)"'

In [6]:
# convert the string (data) into list of data 
dataWoHeader.map(lambda x:x.split(',')).take(10)

[['1',
  'f070032d',
  '09/06/1940 07:30:00 PM',
  'Friday',
  'Sep',
  '6',
  '1940',
  '19',
  '9',
  '7',
  '2010',
  'BURGLARY',
  'FELONY',
  'D',
  '66',
  'BROOKLYN',
  'N.Y. POLICE DEPT',
  '987478',
  '166141',
  '"(40.6227027620001',
  ' -73.9883732929999)"'],
 ['2',
  'c6245d4d',
  '12/14/1968 12:20:00 AM',
  'Saturday',
  'Dec',
  '14',
  '1968',
  '0',
  '12',
  '14',
  '2008',
  'GRAND LARCENY',
  'FELONY',
  'G',
  '28',
  'MANHATTAN',
  'N.Y. POLICE DEPT',
  '996470',
  '232106',
  '"(40.8037530600001',
  ' -73.955861904)"'],
 ['3',
  '716dbc6f',
  '10/30/1970 03:30:00 PM',
  'Friday',
  'Oct',
  '30',
  '1970',
  '15',
  '10',
  '31',
  '2008',
  'BURGLARY',
  'FELONY',
  'H',
  '84',
  'BROOKLYN',
  'N.Y. POLICE DEPT',
  '986508',
  '190249',
  '"(40.688874254',
  ' -73.9918594329999)"'],
 ['4',
  '638cd7b7',
  '07/18/1972 11:00:00 PM',
  'Tuesday',
  'Jul',
  '18',
  '1972',
  '23',
  '7',
  '19',
  '2012',
  'GRAND LARCENY OF MOTOR VEHICLE',
  'FELONY',
  'F',
  '73

In [7]:
import csv
import io
from collections import namedtuple

In [8]:
fields = header.replace(" ","_").split(",")
print(fields)

['OBJECTID', 'Identifier', 'Occurrence_Date', 'Day_of_Week', 'Occurrence_Month', 'Occurrence_Day', 'Occurrence_Year', 'Occurrence_Hour', 'CompStat_Month', 'CompStat_Day', 'CompStat_Year', 'Offense', 'Offense_Classification', 'Sector', 'Precinct', 'Borough', 'Jurisdiction', 'XCoordinate', 'YCoordinate', 'Location_1']


In [9]:
# Create a class "Crime" which will have all the fields that has in header dataset.
Crime = namedtuple("Crime", fields, verbose=True)


from builtins import property as _property, tuple as _tuple
from operator import itemgetter as _itemgetter
from collections import OrderedDict

class Crime(tuple):
    'Crime(OBJECTID, Identifier, Occurrence_Date, Day_of_Week, Occurrence_Month, Occurrence_Day, Occurrence_Year, Occurrence_Hour, CompStat_Month, CompStat_Day, CompStat_Year, Offense, Offense_Classification, Sector, Precinct, Borough, Jurisdiction, XCoordinate, YCoordinate, Location_1)'

    __slots__ = ()

    _fields = ('OBJECTID', 'Identifier', 'Occurrence_Date', 'Day_of_Week', 'Occurrence_Month', 'Occurrence_Day', 'Occurrence_Year', 'Occurrence_Hour', 'CompStat_Month', 'CompStat_Day', 'CompStat_Year', 'Offense', 'Offense_Classification', 'Sector', 'Precinct', 'Borough', 'Jurisdiction', 'XCoordinate', 'YCoordinate', 'Location_1')

    def __new__(_cls, OBJECTID, Identifier, Occurrence_Date, Day_of_Week, Occurrence_Month, Occurrence_Day, Occurrence_Year, Occurrence_Hour, CompStat_Month, CompStat_Day, CompStat_Year, Offens

In [10]:
#Function to take string(rows in data) and return instance of a Crime class
def parse(row):
    reader = csv.reader(io.StringIO(row))
    row = next(reader)
    return Crime(*row)  # *args behaves like a list and used to pass a variable number of arguments 

In [11]:
crimes = dataWoHeader.map(parse) # creates a Crime object/instance for each row values in the data

In [12]:
crimes.first()

Crime(OBJECTID='1', Identifier='f070032d', Occurrence_Date='09/06/1940 07:30:00 PM', Day_of_Week='Friday', Occurrence_Month='Sep', Occurrence_Day='6', Occurrence_Year='1940', Occurrence_Hour='19', CompStat_Month='9', CompStat_Day='7', CompStat_Year='2010', Offense='BURGLARY', Offense_Classification='FELONY', Sector='D', Precinct='66', Borough='BROOKLYN', Jurisdiction='N.Y. POLICE DEPT', XCoordinate='987478', YCoordinate='166141', Location_1='(40.6227027620001, -73.9883732929999)')

In [13]:
#Get the values of fields using the fields name
crimes.first().Offense

'BURGLARY'

### Identifying and filtering missed values

In [14]:
crimes.map(lambda x:x.Offense).countByValue()

defaultdict(int,
            {'BURGLARY': 191369,
             'FELONY ASSAULT': 184042,
             'GRAND LARCENY': 428993,
             'GRAND LARCENY OF MOTOR VEHICLE': 101963,
             'MURDER & NON-NEGL. MANSLAUGHTE': 4574,
             'NA': 1,
             'RAPE': 13779,
             'ROBBERY': 198744})

In [15]:
crimes.map(lambda x:x.Occurrence_Year).countByValue()

defaultdict(int,
            {'': 244,
             '1905': 2,
             '1908': 1,
             '1910': 3,
             '1911': 1,
             '1912': 1,
             '1913': 4,
             '1914': 2,
             '1915': 3,
             '1920': 1,
             '1940': 1,
             '1945': 2,
             '1946': 1,
             '1950': 1,
             '1954': 1,
             '1955': 1,
             '1956': 1,
             '1958': 1,
             '1959': 1,
             '1960': 1,
             '1964': 1,
             '1965': 2,
             '1966': 7,
             '1968': 1,
             '1969': 1,
             '1970': 2,
             '1971': 1,
             '1972': 2,
             '1973': 5,
             '1974': 3,
             '1975': 2,
             '1976': 2,
             '1977': 3,
             '1978': 2,
             '1979': 6,
             '1980': 5,
             '1981': 1,
             '1982': 5,
             '1983': 1,
             '1984': 4,
             '1985': 8,
 

In [16]:
crimesFiltered = crimes.filter(lambda x: not x.Occurrence_Year == '')\
                    .filter(lambda x: int(x.Occurrence_Year) >= 2006)
 


In [17]:
crimesFiltered.map(lambda x:x.Occurrence_Year).countByValue()

defaultdict(int,
            {'2006': 127887,
             '2007': 120554,
             '2008': 117375,
             '2009': 106018,
             '2010': 105643,
             '2011': 107206,
             '2012': 111798,
             '2013': 111286,
             '2014': 106849,
             '2015': 102657})

### Filter an0malies

In [20]:
# Pass the location (in data) as string and extract Longitude and Lattitude
def extractCoords(location):
    loc_lat = float(location[1:location.index(',')])
    loc_long = float(location[location.index(',')+1:-1])
    return (loc_lat, loc_long)

In [21]:
# Get the min longitude and lattitude
crimesFiltered.map(lambda x:extractCoords(x.Location_1))\
                .reduce(lambda x,y:(min(x[0],y[0]),min(x[1],y[1])))

(40.112709974, -77.519206334)

In [22]:
# Get the max longitude and lattitude
crimesFiltered.map(lambda x:extractCoords(x.Location_1))\
                .reduce(lambda x,y:(max(x[0],y[0]),max(x[1],y[1])))

(59.5805088160001, -73.700716685)

In [23]:
#Take NY city boundaries from the web (constants) in terms of Longitude and Lattitude and filter out the records that falls outside these coords.
crimesFinal = crimesFiltered.filter(lambda x: extractCoords(x.Location_1)[0]>=40.477399 and \
                                               extractCoords(x.Location_1)[0]<=40.917577 and \
                                                extractCoords(x.Location_1)[1]>=-74.25909 and \
                                                extractCoords(x.Location_1)[1]<=-73.700009)

In [24]:
#Show/summarize the Trends by year
crimesFinal.map(lambda x:x.Occurrence_Year).countByValue()

defaultdict(int,
            {'2006': 127887,
             '2007': 120491,
             '2008': 117375,
             '2009': 106018,
             '2010': 105639,
             '2011': 107203,
             '2012': 111798,
             '2013': 111286,
             '2014': 106849,
             '2015': 102657})

In [25]:
crimesFinal.filter(lambda x: x.Offense == 'BURGLARY')\
            .map(lambda x: x.Occurrence_Year)\
            .countByValue()

defaultdict(int,
            {'2006': 23069,
             '2007': 21715,
             '2008': 20732,
             '2009': 19441,
             '2010': 18700,
             '2011': 18860,
             '2012': 19309,
             '2013': 17419,
             '2014': 16832,
             '2015': 14967})

### Visualize and Draw InSights - Crime data on Google map

In [26]:
import gmplot
gmap = gmplot.GoogleMapPlotter(37.428, -122.145, 16).from_geocode('New York City')

In [28]:
#burglary_lats = crimesFinal.filter(lambda x: x.Offense == 'BURGLARY' and x.Occurrence_Year == 2015)\
burglary_lats=crimesFinal.filter(lambda x:x.Offense=='BURGLARY' and x.Occurrence_Year=='2015')\
                        .map(lambda x: extractCoords(x.Location_1)[0])\
                        .collect()

In [29]:
burglary_lats

[40.6814381990001,
 40.643985023,
 40.710876791,
 40.81198578,
 40.7665716220001,
 40.7585326380001,
 40.631020431,
 40.6651346690001,
 40.685662779,
 40.864210277,
 40.768784828,
 40.864210277,
 40.6910397940001,
 40.7348527840001,
 40.744935135,
 40.692716492,
 40.615491223,
 40.760321794,
 40.5773796170001,
 40.6306860440001,
 40.622749023,
 40.768784828,
 40.6939528280001,
 40.7430801680001,
 40.6929761100001,
 40.7067341690001,
 40.7725020120001,
 40.8252587470001,
 40.774433463,
 40.828578104,
 40.8293492650001,
 40.6852000860001,
 40.6631257980001,
 40.5989247580001,
 40.8536146860001,
 40.7572421040001,
 40.7670996470001,
 40.764058097,
 40.6737400100001,
 40.7416645330001,
 40.801012471,
 40.5908611020001,
 40.731538464,
 40.8324699500001,
 40.595897343,
 40.6706671250001,
 40.8251892600001,
 40.744793357,
 40.765653688,
 40.5846411890001,
 40.6996504460001,
 40.6801118930001,
 40.577517662,
 40.887853793,
 40.730460432,
 40.73736705,
 40.671723302,
 40.7231264320001,
 40.8938

In [30]:
burglary_longs = crimesFinal.filter(lambda x:x.Offense=='BURGLARY' and x.Occurrence_Year=='2015')\
                            .map(lambda x: extractCoords(x.Location_1)[1])\
                            .collect()

In [31]:
burglary_longs

[-73.9545150989999,
 -73.9010261449999,
 -73.807024586,
 -73.914287154,
 -73.9183419349999,
 -73.9270798599999,
 -73.976608947,
 -73.765448342,
 -73.955471329,
 -73.8548086169999,
 -73.988806289,
 -73.8548086169999,
 -73.742278929,
 -73.958408122,
 -73.883632911,
 -73.8507357829999,
 -74.08165623,
 -73.863272463,
 -73.96829052,
 -73.9673140799999,
 -73.961403025,
 -73.988806289,
 -73.841364287,
 -73.954736461,
 -73.849862531,
 -73.7893667339999,
 -73.957135418,
 -73.893884137,
 -73.9884045759999,
 -73.904437269,
 -73.891886594,
 -73.939934878,
 -73.9986533759999,
 -73.9729687829999,
 -73.897272944,
 -73.968273486,
 -73.834767706,
 -73.9618373609999,
 -73.956863621,
 -73.9992833159999,
 -73.946216267,
 -73.9754888369999,
 -73.719747994,
 -73.9042437699999,
 -73.973391305,
 -73.875644318,
 -73.8764720409999,
 -73.993292546,
 -73.964645032,
 -73.9563343309999,
 -73.938316997,
 -73.904339953,
 -73.9619043389999,
 -73.85537181,
 -74.0020797129999,
 -73.8801783819999,
 -73.951720698,
 -74.00

In [32]:
gmap.scatter(burglary_lats, burglary_longs, '#4440C8', size=40, marker=False)


In [33]:
gmap.draw('crime_map.html')