## Processing Chicago Divvy Data using Spark on RCC

In [1]:
from pyspark.sql import functions as F
import matplotlib.pyplot as plt

#from pyspark import SizeEstimator

In [2]:
from pyspark.sql import SparkSession

# Build (or reuse) the session for this analysis. Hive support is needed because
# the notebook reads `atal.*` Hive tables and writes results with saveAsTable.
# When the kernel was started through the pyspark shell, getOrCreate() simply
# returns the session that already exists.
spark = (
    SparkSession.builder
    .appName('ChicagoTransportEDA')
    .enableHiveSupport()
    .getOrCreate()
)
# Later cells use `sc` (addPyFile, HiveContext); define it explicitly so the
# notebook does not rely on the pyspark shell having injected it.
sc = spark.sparkContext

In [None]:
import pandas as pd

**Read data from the Hive table**

In [2]:
# Trip-level Divvy data as stored in Hive.
trips_query = "select * from atal.chicago_divvy_bike"
divy_bike = spark.sql(trips_query)

### Convert latitude/longitude to community names

In [7]:
import gps_to_neighborhood as gps

NEAR SOUTH SIDE


In [8]:
# Neighborhood polygons are loaded once on the driver; find_nbr closes over them,
# so they are shipped to executors together with the UDF that wraps find_nbr.
all_neighborhoods = gps.get_all_neighborhoods()


def find_nbr(lon_lat):
    """Return the community name for a ``[longitude, latitude]`` point.

    Parameters
    ----------
    lon_lat : sequence
        ``[longitude, latitude]``. When called through the Spark UDF this is
        the array built from the start coordinates cast to double, so either
        element may be ``None`` if the cast failed.

    Returns
    -------
    The value of ``gps.find_neighborhood`` for the point (e.g. ``'LOOP'``), or
    ``None`` when the input or either coordinate is missing.
    """
    if lon_lat is None:
        return None
    lon, lat = lon_lat[0], lon_lat[1]
    # A null coordinate (failed string->double cast) cannot be located; skip the
    # polygon lookup instead of passing None into gps.find_neighborhood, whose
    # handling of None is not visible here.
    if lon is None or lat is None:
        return None
    return gps.find_neighborhood(lon, lat, all_neighborhoods)

In [9]:
# Sanity check on the driver: this point in the Loop should resolve to 'LOOP'.
loop_lon_lat = [-87.616743, 41.880958]
find_nbr(loop_lon_lat)

'LOOP'

In [10]:
# Ship the lookup module to the executors so the UDF can import it there.
sc.addPyFile("gps_to_neighborhood.py")
# The polygon file is data, not Python code: distribute it with addFile
# (addPyFile is intended for .py/.zip/.egg dependencies).
sc.addFile("Neighborhoods_2012_polygons.json")

**Create a UDF that converts longitude and latitude to community names**

In [11]:
from pyspark.sql.functions import array, udf
from pyspark.sql.types import DateType, DoubleType, StringType

# Spark UDF: array(longitude, latitude) -> community name (string column).
# The lambda looks up find_nbr when it runs, so it always uses the current definition.
udf_community = udf(lambda lon_lat: find_nbr(lon_lat), returnType=StringType())

In [8]:
# Drop the header row that ended up in the table as data: its longitude_start
# cell holds the literal column name instead of a coordinate.
is_data_row = divy_bike['longitude_start'] != 'longitude_start'
divy_bike = divy_bike.filter(is_data_row)

In [9]:
# Peek at the raw start coordinates (first 20 rows).
divy_bike.select(['longitude_start', 'latitude_start']).show(n=20)

+------------------+------------------+
|   longitude_start|    latitude_start|
+------------------+------------------+
|        -87.668385|         41.939365|
|         -87.64693|          41.86458|
|-87.65371400000001|         41.921687|
|        -87.649633|         41.877749|
|        -87.661501|         41.872187|
|        -87.648747|         41.933341|
|        -87.639833|41.882090999999996|
|        -87.626937|41.891738000000004|
|-87.67410100000001|         41.961626|
|        -87.639566|         41.874337|
|         -87.63639|         41.888243|
|        -87.591875|         41.809443|
|-87.63058453549999|     41.8759326655|
|        -87.669313|         41.903448|
|        -87.648747|         41.933341|
|        -87.654752|         41.881487|
|      -87.63563839|       41.89680204|
|        -87.626937|41.891738000000004|
|         -87.63206|         41.890749|
|        -87.634664|         41.912202|
+------------------+------------------+
only showing top 20 rows



In [10]:
# Tag each trip with the community containing its start station.
start_lon_lat = array(
    divy_bike['longitude_start'].cast(DoubleType()),
    divy_bike['latitude_start'].cast(DoubleType()),
)
divy_bike = divy_bike.withColumn('CommunityName', udf_community(start_lon_lat))

In [22]:
import pandas as pd

# Show all columns of the wide trip frame. The bare key 'max_columns' is ambiguous
# in pandas >= 1.4 (it also matches 'styler.render.max_columns') and raises
# OptionError, so use the fully qualified option name.
pd.set_option('display.max_columns', 500)

In [12]:
# Pull a handful of tagged trips to the driver for inspection.
first_trips = divy_bike.limit(5)
div1 = first_trips.toPandas()

In [13]:
# div1 holds at most 5 rows (limit(5)), so head() shows all of them.
div1.head()

Unnamed: 0,trip_id,year,month,week,day,hour,usertype,gender,starttime,stoptime,...,from_station_name,latitude_start,longitude_start,dpcapacity_start,to_station_id,to_station_name,latitude_end,longitude_end,dpcapacity_end,CommunityName
0,2355134,2014,6,27,0,23,Subscriber,Male,2014-06-30 23:57:00,2014-07-01 00:07:00,...,Lincoln Ave & Belmont Ave,41.939365,-87.668385,15.0,303,Broadway & Cornelia Ave,41.945512,-87.64598000000001,15.0,LAKE VIEW
1,2355133,2014,6,27,0,23,Subscriber,Male,2014-06-30 23:56:00,2014-07-01 00:00:00,...,Halsted St & Maxwell St,41.86458,-87.64693,15.0,22,May St & Taylor St,41.8694821,-87.6554864,15.0,"LITTLE ITALY, UIC"
2,2355130,2014,6,27,0,23,Subscriber,Male,2014-06-30 23:33:00,2014-06-30 23:35:00,...,Sheffield Ave & Webster Ave,41.921687,-87.65371400000001,19.0,225,Halsted St & Dickens Ave,41.919936,-87.64883,15.0,LINCOLN PARK
3,2355129,2014,6,27,0,23,Subscriber,Female,2014-06-30 23:26:00,2014-07-01 00:24:00,...,Peoria St & Jackson Blvd,41.877749,-87.649633,19.0,194,State St & Wacker Dr,41.887155,-87.62775,11.0,WEST LOOP
4,2355128,2014,6,27,0,23,Subscriber,Female,2014-06-30 23:16:00,2014-06-30 23:26:00,...,Loomis St & Lexington St,41.872187,-87.661501,15.0,134,Peoria St & Jackson Blvd,41.877749,-87.649633,19.0,"LITTLE ITALY, UIC"


In [14]:
# Expose the tagged trips to Spark SQL under the name `divvy`.
divy_bike.createOrReplaceTempView(name='divvy')

**Group the data by date and community name**

In [15]:
# One row per (calendar day, start community) with the trip count and mean temperature.
# substr(starttime, 1, 10) keeps exactly 'YYYY-MM-DD'. A length of 11 also captured
# the space before the time and left a trailing blank in every date key.
# avgtemp matches the column name used by atal.divvy_aggregate_3.
div_aggregates = spark.sql('''
    select substr(starttime, 1, 10) as date,
           communityname,
           count(*) as tripCount,
           avg(temperature) as avgtemp
    from divvy
    group by date, communityname
    order by date
''')

#### Divvy bike: convert latitude/longitude to community name (second attempt, using the pre-aggregated table)

In [3]:
# NOTE(review): HiveContext has been deprecated since Spark 2.0. The `spark`
# session already runs the Hive queries in this notebook (spark.sql on atal.*
# tables), and `hive_context` is not read anywhere below; consider removing it.
from pyspark.sql import HiveContext
hive_context = HiveContext(sc)
# Earlier (disabled) attempt to save the first-attempt result to Hive.
#divy_bike.write.mode('overwrite').saveAsTable("cta_divvy_bike_community")

In [4]:
# Second attempt: start from the pre-aggregated table instead of individual trips.
# Judging by its columns (dt, start coordinates, avgtemp, totaltrips), it appears
# to hold one row per day and start location.
# NOTE: this rebinds `divy_bike` to a different dataset than the trip-level frame above.
aggregate_query = "select * from atal.divvy_aggregate_3"
divy_bike = spark.sql(aggregate_query)

In [5]:
# Preview the first 20 aggregate rows.
divy_bike.show(20)

+-----------+------------------+------------------+------------------+----------+
|         dt|    latitude_start|   longitude_start|           avgtemp|totaltrips|
+-----------+------------------+------------------+------------------+----------+
|2014-01-01 |         41.849527|        -87.640591|              21.0|         1|
|2014-01-01 |     41.8858327415|    -87.6413823149|              21.0|         1|
|2014-01-01 |41.889179999999996|          -87.6277|              21.0|         1|
|2014-01-01 |         41.895769|-87.67721999999999|              21.0|         2|
|2014-01-01 |41.895965999999994|-87.66774699999999|              17.1|         1|
|2014-01-01 |       41.89680204|      -87.63563839|              21.0|         1|
|2014-01-01 |         41.902893|        -87.687275|              17.1|         1|
|2014-01-01 |         41.913688|        -87.652855|              14.0|         1|
|2014-01-01 |         41.916027|-87.67741099999999|21.899999999999995|         3|
|2014-01-01 |   

In [12]:
# Tag each aggregate row with the community containing its start coordinate.
agg_lon_lat = array(
    divy_bike['longitude_start'].cast(DoubleType()),
    divy_bike['latitude_start'].cast(DoubleType()),
)
divy_bike_comm = divy_bike.withColumn('CommunityName', udf_community(agg_lon_lat))

In [13]:
# Preview the tagged aggregates (long community names are truncated).
divy_bike_comm.show(n=20, truncate=True)

+-----------+------------------+------------------+------------------+----------+--------------------+
|         dt|    latitude_start|   longitude_start|           avgtemp|totaltrips|       CommunityName|
+-----------+------------------+------------------+------------------+----------+--------------------+
|2014-01-01 |         41.849527|        -87.640591|              21.0|         1|ARMOUR SQUARE,CHI...|
|2014-01-01 |     41.8858327415|    -87.6413823149|              21.0|         1|           WEST LOOP|
|2014-01-01 |41.889179999999996|          -87.6277|              21.0|         1|         RIVER NORTH|
|2014-01-01 |         41.895769|-87.67721999999999|              21.0|         2|WICKER PARK,WEST ...|
|2014-01-01 |41.895965999999994|-87.66774699999999|              17.1|         1|WICKER PARK,WEST ...|
|2014-01-01 |       41.89680204|      -87.63563839|              21.0|         1|         RIVER NORTH|
|2014-01-01 |         41.902893|        -87.687275|              17.1|   

In [15]:
# Make the tagged aggregates queryable from Spark SQL.
divy_bike_comm.createOrReplaceTempView(name="divvy_agg_community")

In [18]:
# Number of tagged aggregate rows (this action evaluates the UDF).
n_community_rows = divy_bike_comm.count()
n_community_rows

554436

**Write the converted data to a Hive table and to CSV**

In [None]:
# TODO: persist/cache (or checkpoint) divy_bike_comm before the writes below.
# Nothing caches it now, so every action (show, count, saveAsTable, the CSV write)
# re-evaluates the plan, including the community UDF.

In [None]:
# Persist the community-tagged aggregates as a table through the SparkSession.
# No HiveContext is needed: DataFrameWriter.saveAsTable uses the session catalog.
# The HiveContext this cell used to create (deprecated since Spark 2.0) was never used.
divy_bike_comm.write.mode('overwrite').saveAsTable("divy_bike_comm_hive")

In [None]:
# Also export as CSV (Spark writes a directory of part files at this path).
# mode('overwrite') matches the saveAsTable call above and keeps the cell
# re-runnable. The default mode ('errorifexists') fails once the path exists.
divy_bike_comm.write.mode('overwrite').format("csv").save("/user/atal/data/divvy_community_name.csv")