In [1]:
# Show the Spark environment. `sc` is the Spark context object used below to load
# the data; it is not created in the visible cells, so it must be supplied by the kernel.
sc

# Initial loading of the file into an RDD

In [91]:
# Load the open data into an RDD of text lines and count the lines.
# The data is fetched from New York City's Socrata data portal: https://data.cityofnewyork.us/Public-Safety/NYPD-7-Major-Felony-Incidents/hyij-8hr7/data
# The CSV location can be overridden with the NYPD_FELONY_CSV environment variable so
# the notebook is not tied to one machine; the original local path stays the default.
import os

filePath = os.environ.get(
    "NYPD_FELONY_CSV",
    "/Users/chenwisely/notebook/NYPD_7_Major_Felony_Incidents.csv",
)
data = sc.textFile(filePath)
data.count()

1123466

In [7]:
# Show the header: the first line of the file, which holds the comma-separated column names.
# `header` is reused below to drop that line from the data and to derive the field names.
header=data.first()
print(header)

OBJECTID,Identifier,Occurrence Date,Day of Week,Occurrence Month,Occurrence Day,Occurrence Year,Occurrence Hour,CompStat Month,CompStat Day,CompStat Year,Offense,Offense Classification,Sector,Precinct,Borough,Jurisdiction,XCoordinate,YCoordinate,Location 1


In [10]:
# Drop the header line, then preview the first ten raw CSV lines.
dataWoHeader = data.filter(lambda line: line != header)
dataWoHeader.take(10)

['1,f070032d,09/06/1940 07:30:00 PM,Friday,Sep,6,1940,19,9,7,2010,BURGLARY,FELONY,D,66,BROOKLYN,N.Y. POLICE DEPT,987478,166141,"(40.6227027620001, -73.9883732929999)"',
 '2,c6245d4d,12/14/1968 12:20:00 AM,Saturday,Dec,14,1968,0,12,14,2008,GRAND LARCENY,FELONY,G,28,MANHATTAN,N.Y. POLICE DEPT,996470,232106,"(40.8037530600001, -73.955861904)"',
 '3,716dbc6f,10/30/1970 03:30:00 PM,Friday,Oct,30,1970,15,10,31,2008,BURGLARY,FELONY,H,84,BROOKLYN,N.Y. POLICE DEPT,986508,190249,"(40.688874254, -73.9918594329999)"',
 '4,638cd7b7,07/18/1972 11:00:00 PM,Tuesday,Jul,18,1972,23,7,19,2012,GRAND LARCENY OF MOTOR VEHICLE,FELONY,F,73,BROOKLYN,N.Y. POLICE DEPT,1005876,182440,"(40.6674141890001, -73.9220463899999)"',
 '5,6e410287,05/21/1987 12:01:00 AM,Thursday,May,21,1987,0,5,28,2009,GRAND LARCENY,FELONY,K,75,BROOKLYN,N.Y. POLICE DEPT,1017958,182266,"(40.6668988440001, -73.878495425)"',
 '6,7eebfe3c,02/01/1990 09:00:00 AM,Thursday,Feb,1,1990,9,9,17,2014,GRAND LARCENY,FELONY,K,105,QUEENS,N.Y. POLICE DEPT,

In [13]:
# deal with CSV file 
import csv
from io import StringIO
from collections import namedtuple

In [14]:
# Derive field names from the header: spaces and slashes both become underscores,
# then the line is split on commas.
fields = header.translate(str.maketrans(" /", "__")).split(",")
fields

['OBJECTID',
 'Identifier',
 'Occurrence_Date',
 'Day_of_Week',
 'Occurrence_Month',
 'Occurrence_Day',
 'Occurrence_Year',
 'Occurrence_Hour',
 'CompStat_Month',
 'CompStat_Day',
 'CompStat_Year',
 'Offense',
 'Offense_Classification',
 'Sector',
 'Precinct',
 'Borough',
 'Jurisdiction',
 'XCoordinate',
 'YCoordinate',
 'Location_1']

In [87]:
# Create the namedtuple class "Crime" with one attribute per entry of `fields`,
# so each CSV column can be read by name (e.g. record.Offense).
Crime=namedtuple("Crime",fields )

In [88]:
# Parsing rule for a single CSV line
def parse(row):
    """Parse one raw CSV line into a ``Crime`` record.

    The line is read through ``csv.reader`` so that a quoted value containing
    a comma (such as the "Location 1" column) stays in a single field.
    All fields are kept as strings.
    """
    values = next(csv.reader(StringIO(row)))
    return Crime(*values)

In [89]:
# Convert the header-less CSV lines into an RDD of Crime records by mapping `parse`
# over every line.
crime=dataWoHeader.map(parse)

# Data quality discovery, phase 1: filter out the low-quality data

In [90]:
# Show the first parsed record to check that the columns line up with the field names.
crime.first()

Crime(OBJECTID='1', Identifier='f070032d', Occurrence_Date='09/06/1940 07:30:00 PM', Day_of_Week='Friday', Occurrence_Month='Sep', Occurrence_Day='6', Occurrence_Year='1940', Occurrence_Hour='19', CompStat_Month='9', CompStat_Day='7', CompStat_Year='2010', Offense='BURGLARY', Offense_Classification='FELONY', Sector='D', Precinct='66', Borough='BROOKLYN', Jurisdiction='N.Y. POLICE DEPT', XCoordinate='987478', YCoordinate='166141', Location_1='(40.6227027620001, -73.9883732929999)')

In [33]:
# Read a single column of the first record by attribute name.
crime.first().Offense

'BURGLARY'

In [35]:
# Number of records per offense type
crime.map(lambda rec: rec.Offense).countByValue()

defaultdict(int,
            {'BURGLARY': 191369,
             'FELONY ASSAULT': 184042,
             'GRAND LARCENY': 428993,
             'GRAND LARCENY OF MOTOR VEHICLE': 101963,
             'MURDER & NON-NEGL. MANSLAUGHTE': 4574,
             'NA': 1,
             'RAPE': 13779,
             'ROBBERY': 198744})

In [36]:
# Number of records per occurrence year.
# The counts before 2006 are on a very different scale from those of 2006 onwards,
# so the pre-2006 data is most likely incomplete.
crime.map(lambda rec: rec.Occurrence_Year).countByValue()

defaultdict(int,
            {'': 244,
             '1905': 2,
             '1908': 1,
             '1910': 3,
             '1911': 1,
             '1912': 1,
             '1913': 4,
             '1914': 2,
             '1915': 3,
             '1920': 1,
             '1940': 1,
             '1945': 2,
             '1946': 1,
             '1950': 1,
             '1954': 1,
             '1955': 1,
             '1956': 1,
             '1958': 1,
             '1959': 1,
             '1960': 1,
             '1964': 1,
             '1965': 2,
             '1966': 7,
             '1968': 1,
             '1969': 1,
             '1970': 2,
             '1971': 1,
             '1972': 2,
             '1973': 5,
             '1974': 3,
             '1975': 2,
             '1976': 2,
             '1977': 3,
             '1978': 2,
             '1979': 6,
             '1980': 5,
             '1981': 1,
             '1982': 5,
             '1983': 1,
             '1984': 4,
             '1985': 8,
 

In [39]:
# The data before 2006 is incomplete, so keep only records that have a real
# offense type, a non-empty occurrence year, and a year of 2006 or later.
# The year is converted to int only after the emptiness check has passed.
crimeFiltered = crime.filter(
    lambda rec: rec.Offense != "NA"
    and rec.Occurrence_Year != ''
    and int(rec.Occurrence_Year) >= 2006
)

In [41]:
# Re-check the distribution per year after filtering to see whether any problem remains.
crimeFiltered.map(lambda rec: rec.Occurrence_Year).countByValue()

defaultdict(int,
            {'2006': 127887,
             '2007': 120554,
             '2008': 117375,
             '2009': 106018,
             '2010': 105643,
             '2011': 107206,
             '2012': 111798,
             '2013': 111286,
             '2014': 106849,
             '2015': 102657})

# Data quality discovery, phase 2: filter out the low-quality data

In [60]:
# Extract latitude/longitude from a location string such as "(40.7522284, -73.971924858)"
def extractCoords(location):
    """Return ``(latitude, longitude)`` as floats parsed from ``location``.

    ``location`` is expected to look like "(lat, lon)": the first and last
    characters are dropped and the rest is split at the first comma.
    Raises ValueError when there is no comma or a part is not a number.
    """
    comma = location.index(",")
    latitude = float(location[1:comma])
    longitude = float(location[comma + 1:-1])
    return (latitude, longitude)

In [61]:
# Smallest latitude and smallest longitude in the dataset (each taken independently)
crimeFiltered.map(lambda rec: extractCoords(rec.Location_1))\
             .reduce(lambda a, b: (min(a[0], b[0]), min(a[1], b[1])))

(40.112709974, -77.519206334)

In [62]:
# Largest latitude and largest longitude in the dataset (each taken independently)
crimeFiltered.map(lambda rec: extractCoords(rec.Location_1))\
             .reduce(lambda a, b: (max(a[0], b[0]), max(a[1], b[1])))

(59.5805088160001, -73.700716685)

In [68]:
# New York's lat/lon min is 40.477399/-74.25909 , lat/lon max is 40.917577/-73.700009
NYC_LAT_RANGE = (40.477399, 40.917577)
NYC_LON_RANGE = (-74.25909, -73.700009)


def _inNewYork(record):
    """Return True when the record's Location_1 lies inside the New York bounding box.

    Both bounds of each range are inclusive.
    """
    # Parse the location string once per record instead of once per comparison.
    lat, lon = extractCoords(record.Location_1)
    return (NYC_LAT_RANGE[0] <= lat <= NYC_LAT_RANGE[1] and
            NYC_LON_RANGE[0] <= lon <= NYC_LON_RANGE[1])


# Filter out the records located outside the New York range.
crimeFinal = crimeFiltered.filter(_inNewYork)

# crimeFinal is the clean dataset

In [92]:
# Show the first record of the cleaned dataset.
crimeFinal.first()

Crime(OBJECTID='258', Identifier='13b6949b', Occurrence_Date='01/09/2006 12:00:00 AM', Day_of_Week='Monday', Occurrence_Month='Jan', Occurrence_Day='9', Occurrence_Year='2006', Occurrence_Hour='0', CompStat_Month='8', CompStat_Day='16', CompStat_Year='2006', Offense='GRAND LARCENY', Offense_Classification='FELONY', Sector='H', Precinct='102', Borough='QUEENS', Jurisdiction='N.Y. POLICE DEPT', XCoordinate='1029007', YCoordinate='194256', Location_1='(40.6997596520001, -73.8385879319999)')

# Analyze the data we need and visualize it

In [93]:
# Number of BURGLARY records per occurrence year in the cleaned dataset
crimeFinal.filter(lambda rec: rec.Offense == "BURGLARY")\
          .map(lambda rec: rec.Occurrence_Year)\
          .countByValue()

defaultdict(int,
            {'2006': 23069,
             '2007': 21715,
             '2008': 20732,
             '2009': 19441,
             '2010': 18700,
             '2011': 18860,
             '2012': 19309,
             '2013': 17419,
             '2014': 16832,
             '2015': 14967})

In [94]:
# Latitude/longitude of every BURGLARY that occurred in 2015.
# The coordinates are parsed and collected in a single Spark job, so each record is
# filtered and parsed only once, and b_lats[i] / b_lons[i] always come from the same record.
b_coords = crimeFinal.filter(lambda rec: rec.Offense == "BURGLARY" and rec.Occurrence_Year == "2015")\
                     .map(lambda rec: extractCoords(rec.Location_1))\
                     .collect()

# Split the (lat, lon) pairs into the two parallel lists used for plotting.
b_lats = [coord[0] for coord in b_coords]
b_lons = [coord[1] for coord in b_coords]

In [None]:
# Visualize the 2015 burglaries as points on a Google map written to mymap.html.
import gmplot

# from_geocode is a factory that builds a plotter centred on the geocoded place, so it
# is called on the class directly; no throwaway plotter with unrelated coordinates is
# needed. The place name is spelled correctly so the lookup targets New York.
gmap = gmplot.GoogleMapPlotter.from_geocode("New York")
gmap.scatter(b_lats, b_lons, "#DE1515", size=40, marker=False)
gmap.draw("mymap.html")