In [2]:
import os

# Location of the NYPD CSV. Set the NYPD_CSV_PATH environment variable to run
# the notebook on another machine; the original local path stays as the
# default so existing runs behave exactly as before.
path = os.environ.get("NYPD_CSV_PATH", "file:///home/snath/spark-ws/nypd.csv")

# `sc` is not defined in this notebook -- presumably the SparkContext that the
# PySpark kernel/shell injects (TODO confirm). textFile yields one element per
# line of the file.
data = sc.textFile(path)

In [3]:
# Peek at the first 10 raw lines; the first one is the CSV header.
data.take(10)

['OBJECTID,Identifier,Occurrence Date,Day of Week,Occurrence Month,Occurrence Day,Occurrence Year,Occurrence Hour,CompStat Month,CompStat Day,CompStat Year,Offense,Offense Classification,Sector,Precinct,Borough,Jurisdiction,XCoordinate,YCoordinate,Location 1',
 '1,f070032d,09/06/1940 07:30:00 PM,Friday,Sep,6,1940,19,9,7,2010,BURGLARY,FELONY,D,66,BROOKLYN,N.Y. POLICE DEPT,987478,166141,"(40.6227027620001, -73.9883732929999)"',
 '2,c6245d4d,12/14/1968 12:20:00 AM,Saturday,Dec,14,1968,0,12,14,2008,GRAND LARCENY,FELONY,G,28,MANHATTAN,N.Y. POLICE DEPT,996470,232106,"(40.8037530600001, -73.955861904)"',
 '3,716dbc6f,10/30/1970 03:30:00 PM,Friday,Oct,30,1970,15,10,31,2008,BURGLARY,FELONY,H,84,BROOKLYN,N.Y. POLICE DEPT,986508,190249,"(40.688874254, -73.9918594329999)"',
 '4,638cd7b7,07/18/1972 11:00:00 PM,Tuesday,Jul,18,1972,23,7,19,2012,GRAND LARCENY OF MOTOR VEHICLE,FELONY,F,73,BROOKLYN,N.Y. POLICE DEPT,1005876,182440,"(40.6674141890001, -73.9220463899999)"',
 '5,6e410287,05/21/1987 12:01:00

In [4]:
# Capture the header row (the first line of the file) so it can be excluded
# from the records and reused to derive field names.
header = data.first()
print(header)

OBJECTID,Identifier,Occurrence Date,Day of Week,Occurrence Month,Occurrence Day,Occurrence Year,Occurrence Hour,CompStat Month,CompStat Day,CompStat Year,Offense,Offense Classification,Sector,Precinct,Borough,Jurisdiction,XCoordinate,YCoordinate,Location 1


In [5]:
# Keep every line that differs from the header text. Note this drops any line
# equal to the header, not only the first line of the file.
dataWoHeader = data.filter(lambda line: line != header)

In [6]:
# Sanity check: the first element should now be a data record, not the header.
dataWoHeader.first()

'1,f070032d,09/06/1940 07:30:00 PM,Friday,Sep,6,1940,19,9,7,2010,BURGLARY,FELONY,D,66,BROOKLYN,N.Y. POLICE DEPT,987478,166141,"(40.6227027620001, -73.9883732929999)"'

In [9]:
# NOTE(review): exploratory cell -- the mapped RDD is not assigned and no action
# is run, so nothing is computed. A plain split on "," also cuts the quoted
# "Location 1" value (e.g. "(40.62..., -73.98...)") into two pieces, which is
# why the csv-module parser defined further down is used instead.
dataWoHeader.map(lambda x:x.split(",")) #.take(10)

PythonRDD[7] at RDD at PythonRDD.scala:49

In [12]:
import csv
from io import StringIO
from collections import namedtuple

In [14]:
# Derive one field name per header column, replacing spaces and slashes with
# underscores so the names are usable as namedtuple attributes.
fields = [column.replace(" ", "_").replace("/", "_") for column in header.split(",")]
print(fields)

['OBJECTID', 'Identifier', 'Occurrence_Date', 'Day_of_Week', 'Occurrence_Month', 'Occurrence_Day', 'Occurrence_Year', 'Occurrence_Hour', 'CompStat_Month', 'CompStat_Day', 'CompStat_Year', 'Offense', 'Offense_Classification', 'Sector', 'Precinct', 'Borough', 'Jurisdiction', 'XCoordinate', 'YCoordinate', 'Location_1']


In [20]:
# Record type with one attribute per CSV column, named after the sanitized
# header fields (e.g. `Offense`, `Occurrence_Year`, `Location_1`).
Crime = namedtuple('Crime', fields)

In [21]:
def parse(row):
    """Parse one CSV line into a ``Crime`` record.

    The line is read with ``csv.reader`` rather than a plain ``split(",")`` so
    that quoted values containing commas stay in a single field.

    Args:
        row: A single line of CSV text.

    Returns:
        A ``Crime`` built from the first record the reader yields, or ``None``
        when the reader yields nothing (e.g. for an empty string).

    Raises:
        TypeError: If the number of parsed values does not match the number of
            ``Crime`` fields.
    """
    first_record = next(csv.reader(StringIO(row)), None)
    if first_record is None:
        return None
    return Crime(*first_record)

In [24]:
# Parse every line into a Crime record. The RDD is cached because it feeds
# several independent actions further down (counts, filters, reductions);
# without caching each of those actions would re-read and re-parse the CSV.
crimes = dataWoHeader.map(parse).cache()

# Spot-check that attribute access works on the parsed records.
crimes.first().Offense

'BURGLARY'

In [25]:
# Tally the number of records per offense type across all parsed records.
crimes.map(lambda crime: crime.Offense).countByValue()

defaultdict(int,
            {'BURGLARY': 191369,
             'GRAND LARCENY': 428993,
             'GRAND LARCENY OF MOTOR VEHICLE': 101963,
             'RAPE': 13779,
             'ROBBERY': 198744,
             'FELONY ASSAULT': 184042,
             'MURDER & NON-NEGL. MANSLAUGHTE': 4574,
             'NA': 1})

In [27]:
# Tally the number of records per occurrence year (years are still strings here).
crimes.map(lambda crime: crime.Occurrence_Year).countByValue()

defaultdict(int,
            {'1940': 1,
             '1968': 1,
             '1970': 2,
             '1972': 2,
             '1987': 6,
             '1990': 17,
             '1992': 12,
             '1994': 19,
             '1995': 27,
             '1996': 34,
             '1998': 74,
             '1999': 124,
             '2000': 282,
             '2001': 343,
             '2002': 368,
             '2003': 490,
             '2004': 692,
             '2005': 3272,
             '2006': 127887,
             '1910': 3,
             '1913': 4,
             '1945': 2,
             '1981': 1,
             '1985': 8,
             '1988': 6,
             '1991': 12,
             '1905': 2,
             '1971': 1,
             '1997': 40,
             '1914': 2,
             '1956': 1,
             '1989': 12,
             '1993': 23,
             '2015': 102657,
             '1954': 1,
             '1982': 5,
             '1950': 1,
             '1959': 1,
             '1966': 7,
            

In [59]:
# Keep records that have a real offense, a non-empty year, and a year after
# 2005. The emptiness check comes before int() so blank years are never
# converted.
crimesFiltered = crimes.filter(
    lambda crime: crime.Offense != 'NA'
    and crime.Occurrence_Year != ''
    and int(crime.Occurrence_Year) > 2005
)

In [57]:
# Per-year record counts after filtering; only years after 2005 should remain.
crimesFiltered.map(lambda crime: crime.Occurrence_Year).countByValue()

defaultdict(int,
            {'2006': 127887,
             '2015': 102657,
             '2007': 120554,
             '2008': 117375,
             '2009': 106018,
             '2010': 105643,
             '2011': 107206,
             '2012': 111798,
             '2013': 111286,
             '2014': 106849})

In [61]:
# Per-offense record counts after filtering; the 'NA' offense should be gone.
crimesFiltered.map(lambda crime: crime.Offense).countByValue()

defaultdict(int,
            {'GRAND LARCENY': 424635,
             'BURGLARY': 191045,
             'GRAND LARCENY OF MOTOR VEHICLE': 101728,
             'FELONY ASSAULT': 183879,
             'ROBBERY': 198569,
             'RAPE': 12974,
             'MURDER & NON-NEGL. MANSLAUGHTE': 4443})

In [45]:
def extractCoords(location):
    """Convert a ``"(lat, lon)"`` style string into a tuple of two floats.

    The first character of the string and its last character are discarded
    (the surrounding parentheses in the data), and the remainder is split at
    the first comma.

    Args:
        location: Text such as ``"(40.6227, -73.9883)"``.

    Returns:
        ``(lat, lon)`` as floats.

    Raises:
        ValueError: If the string has no comma or either part is not a number.
    """
    split_at = location.index(",")
    latitude_text = location[1:split_at]
    longitude_text = location[split_at + 1:-1]
    return (float(latitude_text), float(longitude_text))

In [46]:
# Component-wise minimum of (lat, lon) over the filtered records, to see how
# far the coordinates spread.
crimesFiltered.map(lambda crime: extractCoords(crime.Location_1)).reduce(
    lambda lowest, point: (min(lowest[0], point[0]), min(lowest[1], point[1]))
)

(40.112709974, -77.519206334)

In [49]:
# Component-wise maximum of (lat, lon) over the filtered records.
crimesFiltered.map(lambda crime: extractCoords(crime.Location_1)).reduce(
    lambda highest, point: (max(highest[0], point[0]), max(highest[1], point[1]))
)

(59.5805088160001, -73.700716685)

In [53]:
def validateCoord(x):
    """Return True when a record's coordinates fall inside a fixed lat/lon box.

    The record's ``Location_1`` text is parsed with ``extractCoords`` and both
    components are checked against inclusive bounds.

    NOTE(review): the bounds are presumably the bounding box of New York City
    -- confirm the source of these constants.

    Args:
        x: A record exposing a ``Location_1`` attribute in ``"(lat, lon)"`` form.

    Returns:
        bool: True if 40.477399 <= lat <= 40.917577 and
        -74.25909 <= lon <= -73.700009, otherwise False.
    """
    lat, lon = extractCoords(x.Location_1)
    return 40.477399 <= lat <= 40.917577 and -74.25909 <= lon <= -73.700009
# Drop records whose coordinates fall outside the accepted lat/lon box.
crimesFinal = crimesFiltered.filter(validateCoord)

In [54]:
# Component-wise minimum of (lat, lon) after the coordinate filter; both values
# should now sit inside the accepted box.
crimesFinal.map(lambda crime: extractCoords(crime.Location_1)).reduce(
    lambda lowest, point: (min(lowest[0], point[0]), min(lowest[1], point[1]))
)

(40.4987675300001, -74.255075543)

In [55]:
# Component-wise maximum of (lat, lon) after the coordinate filter.
crimesFinal.map(lambda crime: extractCoords(crime.Location_1)).reduce(
    lambda highest, point: (max(highest[0], point[0]), max(highest[1], point[1]))
)

(40.912723396, -73.700716685)

In [66]:
# Per-year record counts for the fully cleaned data, kept for later use.
results = crimesFinal.map(lambda crime: crime.Occurrence_Year).countByValue()

In [67]:
# Display the per-year counts (a dict keyed by year string).
results

defaultdict(int,
            {'2006': 127887,
             '2015': 102657,
             '2007': 120491,
             '2008': 117375,
             '2009': 106018,
             '2010': 105639,
             '2011': 107203,
             '2012': 111798,
             '2013': 111286,
             '2014': 106849})

In [68]:
import gmplot

ModuleNotFoundError: No module named 'gmplot'