In [26]:
import datetime
import operator
import os
import sys
import time
import pyspark
from operator import add
import numpy as np
import matplotlib.path as mplPath
# Wall-clock timestamp (seconds since the epoch) taken when this cell runs.
start = time.time()

In [27]:
def create_geojson(filename, data, blocks_path='block-groups-polygons.geojson'):
    """Write a GeoJSON FeatureCollection of the block groups listed in `data`.

    Parameters
    ----------
    filename : str
        Output path without extension; ".geojson" is appended.
    data : iterable
        OBJECTID values to export. Features are emitted in the order of
        `data`, one per matching block; repeated ids produce repeated features.
    blocks_path : str, optional
        GeoJSON file holding the block-group polygons. Each feature must have
        an 'OBJECTID' entry in its 'properties'.

    Each emitted feature copies the matching block's 'properties' and
    'geometry' and gets a sequential integer 'id' starting at 0. Ids with no
    matching block are skipped.
    """
    import json

    with open(blocks_path) as blocks_file:
        block_data = json.load(blocks_file)

    # Index the blocks by OBJECTID once instead of rescanning every block for
    # every requested id. A list per id keeps every block sharing an OBJECTID.
    blocks_by_id = {}
    for block in block_data['features']:
        blocks_by_id.setdefault(block['properties']['OBJECTID'], []).append(block)

    features = []
    for object_id in data:
        for block in blocks_by_id.get(object_id, []):
            features.append({
                'type': 'Feature',
                'id': len(features),
                'properties': block['properties'],
                'geometry': block['geometry'],
            })

    collection = {'type': 'FeatureCollection', 'features': features}
    # Serialise with json.dump rather than string templates so the file is
    # always valid JSON (no trailing comma after the last feature).
    with open(filename + '.geojson', 'w') as out_file:
        json.dump(collection, out_file)

In [28]:
def indexZones(shapeFilename):
    """Load a polygon layer and build an R-tree over its bounding boxes.

    The layer read from `shapeFilename` is reprojected to EPSG:2263, the same
    CRS that mapToZone projects trip coordinates into. Returns `(index, zones)`.
    Each R-tree entry id is the enumerate position of the geometry in
    `zones.geometry`.
    """
    import rtree
    import fiona.crs
    import geopandas as gpd
    zones = gpd.read_file(shapeFilename).to_crs(fiona.crs.from_epsg(2263))
    index = rtree.Rtree()
    all_bounds = (shape.bounds for shape in zones.geometry)
    for position, bounds in enumerate(all_bounds):
        index.insert(position, bounds)
    return index, zones

def findBlock(p, index, zones):
    """Return the OBJECTID of the first candidate zone containing point `p`, or -1.

    p     : point with .x / .y in the same CRS as `zones` (a shapely Point
            in mapToZone).
    index : spatial index whose intersection(bbox) yields candidate row ids,
            as built by indexZones.
    zones : table with a `geometry` column and an 'OBJECTID' column.
    """
    candidates = index.intersection((p.x, p.y, p.x, p.y))
    for idx in candidates:
        # Test against the full geometry instead of a matplotlib path built
        # from the exterior ring. This respects holes, works for MultiPolygons
        # (which have no .exterior), and avoids numpy-converting shapely
        # objects, which Shapely 2 no longer supports.
        if zones.geometry[idx].contains(p):
            return zones['OBJECTID'][idx]
    return -1

def findB(p, index, zones):
    """Return the 'boroname' of the first candidate zone containing point `p`, or -1.

    p     : point with .x / .y in the same CRS as `zones` (a shapely Point
            in mapToZone).
    index : spatial index whose intersection(bbox) yields candidate row ids,
            as built by indexZones.
    zones : table with a `geometry` column and a 'boroname' column.
    """
    candidates = index.intersection((p.x, p.y, p.x, p.y))
    for idx in candidates:
        shape = zones.geometry[idx]
        # Multi-part geometries expose their parts via .geoms. Iterating the
        # geometry itself was removed in Shapely 2.0 and never worked for a
        # plain Polygon, which has no .geoms and is treated as a single part.
        parts = getattr(shape, 'geoms', None)
        if parts is None:
            parts = [shape]
        if any(part.contains(p) for part in parts):
            return zones['boroname'][idx]
    return -1

In [29]:
def _parse_short_trip(line, max_distance):
    """Parse one CSV trip line into four coordinate floats, or None to skip it.

    Returns (pickup_lon, pickup_lat, dropoff_lon, dropoff_lat) taken from
    fields 5, 6, 9 and 10 when those four fields are non-empty and
    float(fields[4]) <= max_distance. fields[4] is presumably trip_distance;
    TODO confirm against the CSV header. The header row (starts with
    'vendor_id'), blank or short rows, and rows with non-numeric values yield
    None.
    """
    if line.startswith('vendor_id'):
        return None
    fields = line.strip().split(',')
    if len(fields) < 11:
        return None
    coords = (fields[5], fields[6], fields[9], fields[10])
    if not all(coords):
        return None
    try:
        if float(fields[4]) > max_distance:
            return None
        return tuple(float(value) for value in coords)
    except ValueError:
        return None


def mapToZone(parts, blocks_file='block-groups-polygons-simple.geojson',
              boroughs_file='boroughs.geojson', max_distance=2):
    """Yield (pickup_block, pickup_borough, dropoff_block, dropoff_borough) per trip.

    parts         : iterable of CSV text lines (see _parse_short_trip for the
                    filtering applied to each line).
    blocks_file   : polygon layer whose 'OBJECTID' identifies a block group.
    boroughs_file : polygon layer whose 'boroname' identifies a borough.
    max_distance  : trips with float(fields[4]) above this are skipped.

    Coordinates are projected with pyproj into EPSG:2263 before lookup. Trips
    whose pickup or dropoff falls outside every block or borough are dropped.
    """
    import pyproj
    import shapely.geometry as geom
    proj = pyproj.Proj(init="epsg:2263", preserve_units=True)
    index, zones = indexZones(blocks_file)
    index2, zones2 = indexZones(boroughs_file)
    for line in parts:
        coords = _parse_short_trip(line, max_distance)
        if coords is None:
            continue
        pickup_lon, pickup_lat, dropoff_lon, dropoff_lat = coords
        pickup_location = geom.Point(proj(pickup_lon, pickup_lat))
        dropoff_location = geom.Point(proj(dropoff_lon, dropoff_lat))
        pickup_block = findBlock(pickup_location, index, zones)
        dropoff_block = findBlock(dropoff_location, index, zones)
        pickup_borough = findB(pickup_location, index2, zones2)
        dropoff_borough = findB(dropoff_location, index2, zones2)
        # findBlock / findB return -1 for "no match". Compare against that
        # sentinel explicitly: the old mix of >= 0 / > 0 was inconsistent, and
        # comparing a borough name (str) with 0 raises TypeError on Python 3.
        if -1 not in (pickup_block, pickup_borough, dropoff_block, dropoff_borough):
            yield (pickup_block, pickup_borough, dropoff_block, dropoff_borough)
                
def mapper2(k2v2):
    """Keep the 10 highest-count entries for one grouped key.

    `k2v2` is a (key, values) pair where each value is an (item, count)
    tuple. Returns (key, list of at most 10 values ordered by count,
    descending). Ties keep their input order, as heapq.nlargest does.
    """
    import heapq
    key, pairs = k2v2
    return key, heapq.nlargest(10, pairs, key=lambda pair: pair[1])

In [34]:
# Top-10 pickup and dropoff block groups per borough for the trips that
# mapToZone keeps from the first SAMPLE_SIZE lines of the trip file.

# Reuse an existing SparkContext (e.g. one created by the pyspark launcher);
# otherwise create one so this cell also runs on a fresh kernel.
try:
    sc
except NameError:
    sc = pyspark.SparkContext()

# Set the TRIPS_PATH environment variable to point at a different input file.
TRIPS_PATH = os.environ.get(
    'TRIPS_PATH',
    '/home/dc/Desktop/Big Data/codes/final project/yellow_tripdata_2011-05.csv')
SAMPLE_SIZE = 20000  # lines taken from the start of the file (header included)


def top_blocks_by_borough(trips_rdd, block_pos, borough_pos):
    """Count trips per (block, borough) and keep each borough's 10 busiest blocks.

    `trips_rdd` holds the tuples yielded by mapToZone. Pass (0, 1) for
    pickups or (2, 3) for dropoffs. Returns an RDD of
    (borough, [(block, count), ...]) as produced by mapper2.
    """
    return (trips_rdd
            .map(lambda t: ((t[block_pos], t[borough_pos]), 1))
            .reduceByKey(add)
            .map(lambda kv: (kv[0][1], (kv[0][0], kv[1])))
            .groupByKey()
            .map(mapper2))


# Time from the start of this cell so idle time between cells is not counted.
# The first timing includes the geocoding done in mapToZone.
start = time.time()
trips = sc.textFile(TRIPS_PATH)
output = sc.parallelize(mapToZone(trips.take(SAMPLE_SIZE)))
pickup = top_blocks_by_borough(output, 0, 1)
print(pickup.collect())
print((time.time() - start) / 60.0)

start = time.time()
dropoff = top_blocks_by_borough(output, 2, 3)
print(dropoff.collect())
print((time.time() - start) / 60.0)

# TODO: combine the two results (e.g. pickup.union(dropoff)) and write them
# with saveAsTextFile(sys.argv[-1]) when run as a spark-submit script.

[(u'Bronx', [(5387, 1), (5267, 1), (5888, 1), (5396, 1), (5609, 1), (5357, 1), (5389, 1), (5513, 1)]), (u'Manhattan', [(9144, 190), (9245, 153), (8986, 135), (9052, 132), (9493, 122), (9023, 85), (9088, 84), (9509, 80), (9139, 73), (9594, 67)]), (u'Brooklyn', [(12755, 8), (12164, 7), (12048, 3), (11708, 3), (12756, 3), (11332, 3), (12041, 3), (12049, 3), (11297, 3), (12730, 3)]), (u'Queens', [(2281, 31), (2371, 6), (3722, 5), (2282, 4), (2252, 3), (3027, 2), (3030, 2), (3164, 2), (3711, 1), (2167, 1)])]
2.79014529785
[(u'Bronx', [(5358, 1), (5278, 1), (5491, 1), (5607, 1), (5263, 1), (5280, 1), (6032, 1), (5888, 1)]), (u'Manhattan', [(9144, 243), (9245, 165), (8986, 123), (9493, 111), (9052, 111), (9023, 109), (9509, 107), (9085, 83), (9139, 76), (9549, 72)]), (u'Brooklyn', [(12860, 3), (12876, 3), (12041, 3), (12049, 3), (12061, 2), (12757, 2), (12869, 2), (12218, 2), (11290, 2), (13006, 2)]), (u'Queens', [(2281, 24), (3722, 8), (2371, 5), (2282, 5), (3430, 2), (3711, 1), (3167, 1), (

In [31]:
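# NOTE: `pickup_all` / `dropoff_all` are not defined anywhere in this notebook; update these calls before re-enabling them.
# create_geojson("pickup_map",pickup_all.map(lambda x: x[0][0]).collect())
# create_geojson("dropoff_map",dropoff_all.map(lambda x: x[0][0]).collect())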


In [32]:
# Scratch check of RDD.union: the result contains every element of both
# inputs, duplicates included (see the collect() output further down).
a = sc.parallelize([(1,2),(3,4)])
b = sc.parallelize([(1,2),(3,4)])
c = a.union(b)

In [33]:
# Bring the union back to the driver as a list to inspect it.
c.collect()

[(1, 2), (3, 4), (1, 2), (3, 4)]