In [2]:
from pyspark import SparkContext
sc = SparkContext()

In [5]:
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline  

import seaborn as sns
sns.set(style="whitegrid")

import fiona
import fiona.crs
import shapely
import rtree
import csv
import pyproj
import shapely.geometry as geom
import geopandas as gpd

In [6]:
proj = pyproj.Proj(init="epsg:2263", preserve_units=True) 
boroughs_geojson = 'boroughs.geojson'
neighborhoods_geojson = 'neighborhoods.geojson'
boroughs = gpd.read_file(boroughs_geojson).to_crs(fiona.crs.from_epsg(2263))
neighborhoods = gpd.read_file(neighborhoods_geojson).to_crs(fiona.crs.from_epsg(2263))
boroughs.head()


Unnamed: 0,boro_code,boro_name,shape_area,shape_leng,geometry
0,2,Bronx,1186612478.27,462958.187578,"MULTIPOLYGON (((1012821.806 229228.265, 101278..."
1,5,Staten Island,1623757282.78,325956.009,"MULTIPOLYGON (((970217.022 145643.332, 970227...."
2,3,Brooklyn,1937593022.64,738745.842115,"MULTIPOLYGON (((1021176.479 151374.797, 102100..."
3,4,Queens,3045878293.21,904188.424488,"MULTIPOLYGON (((1029606.077 156073.814, 102957..."
4,1,Manhattan,636602658.887,361212.476098,"MULTIPOLYGON (((981219.056 188655.316, 980940...."


In [7]:
boroughs.boro_name[0][0]

'B'

In [8]:
neighborhoods.head()

Unnamed: 0,neighborhood,boroughCode,borough,@id,geometry
0,Allerton,2,Bronx,http://nyc.pediacities.com/Resource/Neighborho...,"POLYGON ((1026123.025 256886.794, 1026891.266 ..."
1,Alley Pond Park,4,Queens,http://nyc.pediacities.com/Resource/Neighborho...,"POLYGON ((1055377.316 208575.849, 1055271.095 ..."
2,Arden Heights,5,Staten Island,http://nyc.pediacities.com/Resource/Neighborho...,"POLYGON ((937062.134 143738.067, 937063.349 14..."
3,Arlington,5,Staten Island,http://nyc.pediacities.com/Resource/Neighborho...,"POLYGON ((939915.999 173002.139, 939852.023 17..."
4,Arrochar,5,Staten Island,http://nyc.pediacities.com/Resource/Neighborho...,"POLYGON ((967369.901 155396.653, 967366.602 15..."


In [11]:
# We perform the same task using Spark. Here we run the task in
# parallel on each partition (chunk of data). For each task, we
# have to re-create the R-Tree since the index cannot be shared
# across partitions. Note: we have to import the package inside
# processTrips() to respect the closure property.
#
# We also factor out the code for clarity.

def createIndex(geojson):
    '''
    This function takes in a shapefile path, and return:
    (1) index: an R-Tree based on the geometry data in the file
    (2) zones: the original data of the shapefile
    
    Note that the ID used in the R-tree 'index' is the same as
    the order of the object in zones.
    '''
    import rtree
    import fiona.crs
    import geopandas as gpd
    zones = gpd.read_file(geojson).to_crs(fiona.crs.from_epsg(2263))
    index = rtree.Rtree()
    for idx,geometry in enumerate(zones.geometry):
        index.insert(idx, geometry.bounds)
    return (index, zones)

def findZone(p, index, zones):
    '''
    findZone returned the ID of the shape (stored in 'zones' with
    'index') that contains the given point 'p'. If there's no match,
    None will be returned.
    '''
    match = index.intersection((p.x, p.y, p.x, p.y))
    for idx in match:
        if zones.geometry[idx].contains(p):
            return idx
    return None

def processTrips(pid, records):
    '''
    Our aggregation function that iterates through records in each
    partition, checking whether we could find a zone that contain
    the pickup location.
    '''
    import csv
    import pyproj
    import shapely.geometry as geom
    
    # Create an R-tree index
    proj = pyproj.Proj(init="epsg:2263", preserve_units=True)    
    B_index, B_zones = createIndex('boroughs.geojson')    
    N_index, N_zones = createIndex('neigborhood.geojson')    
    # Skip the header
    if pid==0:
        next(records)
    reader = csv.reader(records)
    counts = {}
    
    for row in reader:
        if 'NULL'  in row[2:6]:
            continue
        try:
            pickup = geom.Point(proj(float(row[3]), float(row[2])))
            dropoff = geom.Point(proj(float(row[5]), float(row[4])))
            borough = findZone(pickup, B_index, B_zones)
            neighborhood = findNeigborhoods(dropoff, N_index, N_zones)
        except:
            continue
        if borough and neighborhood:
            key = (borough, neighborhood)
            counts[key] = counts.get(key, 0) + 1
    return counts.items()

In [18]:
if __name__ == "__main__":
    

    
    neighborhoods_file = 'neighborhoods.geojson'
    boroughs_file = 'boroughs.geojson'
    
    neighborhoods = gpd.read_file(neighborhoods_file)
    boroughs = gpd.read_file(boroughs_file)

    taxi = sc.textFile(sys.argv[1])

    counts = taxi.filter(lambda row:len(row)>4) \
                 .mapPartitionsWithIndex(processTrips) \
                 .reduceByKey(lambda x,y: x+y) \
                 .map(lambda x: (boroughs.boro_name[x[0][0]],(neighborhoods.neighborhood[x[0][1]],x[1]))) \
                 .groupByKey() \
                 .mapValues(list) \
                 .mapValues(lambda x: sorted(x, key=lambda x:x[1], reverse=True)[:3]) \
                 .map(lambda x: (x[0], (x[1][0][0], x[1][0][1], x[1][1][0], x[1][1][1], x[1][2][0], x[1][2][1]))) \
                 .sortByKey() \
                 .saveAsTextFile(sys.argv[2])


NameError: name 'sys' is not defined