In [None]:
import os
import re

from pyspark.sql import SparkSession
from pyspark.sql.functions import hour

In [None]:
# Start (or reuse, if one is already running in this kernel) the Spark session
# that every cell below reads data through.
spark = (
    SparkSession.builder
    .appName("nyc-taxi")
    .getOrCreate()
)

In [None]:
def get_month_chunks(month, year='2015', path='/data'):
    """Return the sorted paths of every gzipped CSV chunk for one month.

    A file directly inside ``path`` is selected when its name contains the
    literal text ``"<year>-<MM>.csv.gz"``, where ``MM`` is ``month`` zero-padded
    to two digits.

    Parameters
    ----------
    month : int or str
        Month number, e.g. ``1`` or ``"12"``.
    year : int or str, optional
        Four-digit year (default ``'2015'``).
    path : str, optional
        Directory to scan (default ``'/data'``).

    Returns
    -------
    list of str
        Matching file paths, sorted; empty when nothing matches.
    """
    # Zero-pad to two digits: 1 -> "01", "12" -> "12".
    digits = ('0' + str(month))[-2:]
    # re.escape makes the dots literal; an unescaped '.' matches any character,
    # so names such as "2015-01xcsvxgz" would also have been selected.
    pattern = re.compile(re.escape('{}-{}.csv.gz'.format(year, digits)))
    return sorted(
        os.path.join(path, name) for name in os.listdir(path) if pattern.search(name)
    )

In [None]:
# Preview which chunk files make up January of the default year.
get_month_chunks(month=1)

In [None]:
def get_chunk_df(f):
    """Read one CSV chunk into a Spark DataFrame.

    The first line of the file is used as the header, and column types are
    inferred (``inferSchema`` makes Spark take an extra pass over the data).
    """
    csv_reader = spark.read.format('csv').options(header='true', inferSchema='true')
    return csv_reader.load(f)

In [None]:
def get_month_df(month, year=2015):
    """Load every CSV chunk for one month and stack them into one DataFrame.

    Parameters
    ----------
    month : int or str
        Month number (1-12).
    year : int or str, optional
        Four-digit year (default ``2015``).

    Returns
    -------
    pyspark.sql.DataFrame
        The chunks unioned in sorted-path order.

    Raises
    ------
    FileNotFoundError
        If no chunk file matches the month, instead of an opaque
        ``IndexError`` from ``chunks[0]``.
    """
    chunks = get_month_chunks(month, year=year)
    if not chunks:
        raise FileNotFoundError(
            'no CSV chunks found for {}-{:0>2}'.format(year, month))
    # union() matches columns by POSITION, not by name, so every chunk is
    # assumed to share the same column order. NOTE(review): confirm for the data.
    df = get_chunk_df(chunks[0])
    for chunk in chunks[1:]:
        df = df.union(get_chunk_df(chunk))
    return df

In [None]:
def get_year_df(year=2015):
    """Stack the twelve monthly DataFrames of ``year`` (January first) via ``union``."""
    year_df = None
    for month_number in range(1, 13):
        month_df = get_month_df(month_number, year=year)
        year_df = month_df if year_df is None else year_df.union(month_df)
    return year_df

In [None]:
# `january` feeds several Spark actions below (count, hourly histogram,
# sampling, KMeans input). Cache it so the gzipped CSVs are parsed once rather
# than recomputed from the files for every action.
january = get_month_df(1).cache()

In [None]:
# Column names, taken from the CSV header row.
january.columns

In [None]:
# Number of trip rows loaded for January (a Spark action: runs a job).
january.count()

In [None]:
# Trips per pickup hour. alias() names the column directly; withColumnRenamed
# on Spark's auto-generated name 'hour(lpep_pickup_datetime)' is a silent no-op
# whenever the generated name differs, and groupby('hr') would then fail.
january_hourly_count = (
    january
    .select(hour('lpep_pickup_datetime').alias('hr'))
    .groupby('hr')
    .count()
)

In [None]:
# Pull the hourly counts to the driver in hour order (sort is an alias of orderBy).
january_hours = january_hourly_count.sort('hr').collect()

In [None]:
# List of Row(hr=..., count=...) objects, one per pickup hour, sorted by hour.
january_hours

In [None]:
# Down-sample to roughly 1 trip in 4000 so the points can be drawn on a folium
# map. Without an explicit seed Spark picks a random one, so each run of this
# cell drew a different sample; a fixed seed makes it reproducible for the
# same input files.
JAN_SAMPLE_SEED = 42
jan_sample = january.select(
    'Pickup_latitude', 'Pickup_longitude',
    'Dropoff_latitude', 'Dropoff_longitude',
).sample(withReplacement=False, fraction=1.0 / 4000, seed=JAN_SAMPLE_SEED)

In [None]:
# Size of the sample drawn above (expected to be about 1/4000 of January's rows).
jan_sample.count()

In [None]:
# Bring the sampled rows to the driver as a list of Row objects for plotting.
sample_points = jan_sample.collect()

In [None]:
# Split each sampled trip into its [lat, lon] pickup and drop-off points.
gpickup, gdropoff = [], []
for trip in sample_points:
    gpickup.append([trip.Pickup_latitude, trip.Pickup_longitude])
    gdropoff.append([trip.Dropoff_latitude, trip.Dropoff_longitude])

In [None]:
import statistics
# Map centre = mean latitude / longitude over all sampled pickup and drop-off
# points. Build lists rather than map() iterators: a map object is single-use,
# so these names would already be empty once statistics.mean consumed them.
# NOTE(review): placeholder (0, 0) coordinates, if present in the data, would
# pull this centre toward the origin.
all_points = gpickup + gdropoff
glattitudes = [p[0] for p in all_points]
glongitudes = [p[1] for p in all_points]
gcentre = [statistics.mean(glattitudes), statistics.mean(glongitudes)]

In [None]:
import folium
green_map = folium.Map(location=gcentre)
# All pickups are drawn in green first, then all drop-offs in blue.
for points, colour in ((gpickup, 'green'), (gdropoff, 'blue')):
    for p in points:
        folium.CircleMarker(p, fill_color=colour, radius=5).add_to(green_map)
green_map

In [None]:
from numpy import array
# Pickup (lat, lon) pairs as numpy vectors, the input format for MLlib KMeans.
# Rows with a missing coordinate are dropped first: a None cannot become a
# meaningful float in the feature vector. The RDD is cached because
# KMeans.train iterates over it repeatedly and is called once per k.
january_rdd = (
    january.select('Pickup_latitude', 'Pickup_longitude')
    .dropna()
    .rdd.map(list)
    .map(array)
    .cache()
)

In [None]:
from pyspark.mllib.clustering import KMeans

# Train one KMeans model per candidate cluster count k = 3..7; position i of
# cluster_sets holds the model trained with i + 3 clusters.
# `sc` is never created in this notebook, so take the SparkContext from the
# session built at the top. A fixed seed makes the "random" initialisation
# reproducible across runs.
KMEANS_SEED = 42
cluster_sets = spark.sparkContext.parallelize([
    KMeans.train(january_rdd, k, maxIterations=10,
                 initializationMode="random", seed=KMEANS_SEED)
    for k in range(3, 8)
])

In [None]:
def cluster_map(k, radius=200, fill_color='green'):
    """Draw the cluster centres of one trained KMeans model on a folium map.

    Parameters
    ----------
    k : int
        Position of the model in ``cluster_sets``. This is an index, not a
        cluster count: with ``cluster_sets`` built from ``range(3, 8)``,
        position ``k`` holds the model trained with ``k + 3`` clusters.
    radius : float, optional
        CircleMarker radius in pixels (default 200).
    fill_color : str, optional
        Fill colour of the centre markers (default ``'green'``).

    Returns
    -------
    folium.Map
        Map centred on ``gcentre`` with one marker per kept centre.

    Raises
    ------
    IndexError
        If ``k`` is out of range for ``cluster_sets``.
    """
    models = cluster_sets.collect()
    # Each centre is a (lat, lon) vector. Skip centres at (0, 0), presumably a
    # cluster of placeholder zero coordinates (NOTE(review): confirm). Testing
    # each coordinate, instead of abs(lat + lon), avoids also discarding real
    # centres whose lat and lon merely cancel out.
    centers = [list(c) for c in models[k].centers if any(abs(x) > 0.1 for x in c)]
    cmap = folium.Map(location=gcentre)
    for p in centers:
        folium.CircleMarker(p, fill_color=fill_color, radius=radius).add_to(cmap)
    return cmap

In [None]:
# k is a position in cluster_sets, not a cluster count: with range(3, 8),
# position 4 is the model trained with 7 clusters.
cluster_map(k=4)