<div style="font-size:18pt; padding-top:20px; text-align:center"><b>Bikeshare and </b> <span style="font-weight:bold; color:green">Spark DataFrames</span></div><hr>
<div style="text-align:right;">Sergei Papulin <span style="font-style: italic;font-weight: bold;">(papulin_bmstu@mail.ru)</span></div>

<a name="0"></a>
<div><span style="font-size:14pt; font-weight:bold">Contents</span>
    <ol>
        <li><a href="#1">Bike Trips</a></li>
        <li><a href="#2">NYC Zones</a></li>
        <li><a href="#3">Number of Stations per Zone</a></li>
        <li><a href="#4">Calculating out-degrees</a></li>
        <li><a href="#5">References</a></li>
    </ol>
</div>

Install the `geopandas` Python library:

`pip install geopandas --user`

In [1]:
import json
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point, Polygon, MultiPolygon
from geopandas.tools import sjoin

Install the `folium` Python library for plotting maps:

`pip install folium --user`

In [2]:
import folium
from folium.plugins import HeatMap, HeatMapWithTime

In [3]:
# https://github.com/python-visualization/folium/issues/812
def embed_map(m):
    from IPython.display import IFrame

    m.save('index.html')
    return IFrame('index.html', width='100%', height='750px')

[OPTIONAL] **Environment Setup**

Create a Spark session:

In [4]:
import pyspark
from pyspark.sql import SparkSession, Row

In [5]:
packages = "graphframes:graphframes:0.6.0-spark2.3-s_2.11"

In [6]:
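# Note: the executor core count property is `spark.executor.cores` (plural)
conf = pyspark.SparkConf() \
        .set("spark.executor.memory", "1g") \
        .set("spark.executor.cores", "2") \
        .set("spark.jars.packages", packages) \
        .setAppName("bikeGraphApp") \
        .setMaster("local[4]")  # local mode with 4 worker threads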

In [7]:
spark = SparkSession \
    .builder \
    .config(conf=conf) \
    .getOrCreate()

In [8]:
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

In [9]:
import graphframes as gf

<a name="1"></a>
<div style="display:table; width:100%; padding-top:10px; padding-bottom:10px; border-bottom:1px solid lightgrey">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-size:14pt; font-weight:bold">1. Bike Trips</div>
    	<div style="display:table-cell; width:20%; text-align:center; background-color:whitesmoke; border:1px solid lightgrey"><a href="#0">To contents</a></div>
    </div>
</div>

Trips (bikeshare): <a href="https://s3.amazonaws.com/tripdata/201902-citibike-tripdata.csv.zip">data</a> | <a href="https://www.citibikenyc.com/system-data">description</a><br>

In [10]:
trips_data_path = "data/201902-citibike-tripdata.csv"

[OPTIONAL] Copy the local dataset to HDFS

In [11]:
df_trips = spark.read.load(trips_data_path, 
                           format="csv", 
                           header="true", 
                           inferSchema="true",
                           sep=",")

print("Total number of trips:", df_trips.count())
df_trips.show(5)

Total number of trips: 943744
+------------+--------------------+--------------------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+------+----------+----------+------+
|tripduration|           starttime|            stoptime|start station id|  start station name|start station latitude|start station longitude|end station id|    end station name|end station latitude|end station longitude|bikeid|  usertype|birth year|gender|
+------------+--------------------+--------------------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+------+----------+----------+------+
|         219|2019-02-01 00:00:...|2019-02-01 00:03:...|            3494|E 115 St & Lexing...|             40.797911|               -73.9423|          3501|E 118 St & Madiso...|          40.8014866| 

In [12]:
df_stations_start = df_trips.select(F.col("start station id").alias("id"), 
                              F.col("start station latitude").alias("lat"), 
                              F.col("start station longitude").alias("lng"))\
                            .distinct()

df_stations_end = df_trips.select(F.col("end station id").alias("id"), 
                              F.col("end station latitude").alias("lat"), 
                              F.col("end station longitude").alias("lng"))\
                            .distinct()

df_stations = df_stations_start.union(df_stations_end).distinct()

print("Number of stations:", df_stations.count())
print("Number of partitions:", df_stations.rdd.getNumPartitions())
print("Mean of partition size:", df_stations.rdd.mapPartitions(lambda x: [sum(1 for _ in x)]).mean())

df_stations.show(5)

Number of stations: 780
Number of partitions: 200
Mean of partition size: 3.900000000000001
+----+-----------+------------+
|  id|        lat|         lng|
+----+-----------+------------+
| 379|  40.749156|    -73.9916|
|3495| 40.7945663| -73.9362541|
|3490|  40.796879|  -73.937261|
| 340|40.71269042|-73.98776323|
|3391| 40.7892529|-73.93956237|
+----+-----------+------------+
only showing top 5 rows
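The station list is built from both the start and the end columns. `DataFrame.union` keeps duplicate rows (it behaves like SQL `UNION ALL`), so a station that appears in both roles would be listed twice without the final `distinct()`. A plain-Python sketch of that reasoning; the ids and coordinates are taken from the output above, but which station is a start or an end here is made up:

```python
# Hypothetical (id, lat, lng) tuples standing in for the two distinct() results
start_stations = [(379, 40.749156, -73.9916), (3495, 40.7945663, -73.9362541)]
end_stations = [(379, 40.749156, -73.9916), (3490, 40.796879, -73.937261)]

# Like DataFrame.union, list concatenation keeps duplicates (UNION ALL semantics)
union_all = start_stations + end_stations

# A second deduplication pass, like .distinct(), collapses stations seen in both roles
stations = sorted(set(union_all))

print(len(union_all), len(stations))
```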



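The `distinct()` calls trigger shuffles, so the result ends up with 200 partitions (the default value of `spark.sql.shuffle.partitions`) holding only about 4 stations each. Reduce the number of partitions with `coalesce`, which merges partitions without a full shuffle, and cache the result because it is reused several times below: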

In [13]:
df_stations = df_stations.coalesce(4).persist()
df_stations.rdd.getNumPartitions()

4

In [14]:
df_stations.show(5)

+----+-----------+------------+
|  id|        lat|         lng|
+----+-----------+------------+
| 379|  40.749156|    -73.9916|
|3495| 40.7945663| -73.9362541|
|3490|  40.796879|  -73.937261|
| 340|40.71269042|-73.98776323|
|3391| 40.7892529|-73.93956237|
+----+-----------+------------+
only showing top 5 rows



Plot the stations on a map:

In [15]:
m = folium.Map()
for index, row in df_stations.toPandas().iterrows():
    folium.CircleMarker(location=(row["lat"], row["lng"]),
                        weight=1,
                        radius= 5,
                        color="seagreen",
                        fill_color="seagreen",
                        fill_opacity=0.5,
                        fill=True).add_to(m)
m.fit_bounds(m.get_bounds())
embed_map(m)

<a name="2"></a>
<div style="display:table; width:100%; padding-top:10px; padding-bottom:10px; border-bottom:1px solid lightgrey">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-size:14pt; font-weight:bold">2. NYC Zones</div>
    	<div style="display:table-cell; width:20%; text-align:center; background-color:whitesmoke; border:1px solid lightgrey"><a href="#0">To contents</a></div>
    </div>
</div>

NYC zones: <a href="https://data.cityofnewyork.us/api/geospatial/d3c5-ddgc?method=export&format=GeoJSON">data</a>

In [16]:
borough_data_path = "data/NYC Taxi Zones.geojson"

Plot the NYC zones on top of the stations (the cell below reuses the map `m` from above):

In [17]:
style_function = lambda x: {
    "color" : "orange",
    "weight": 1
}

folium.GeoJson(borough_data_path, name="geojson", style_function=style_function).add_to(m)
m.fit_bounds(m.get_bounds())
embed_map(m)

Convert the NYC zones GeoJSON to a GeoDataFrame:

In [18]:
with open(borough_data_path) as f:
    zones_geojson = json.load(f)

In [19]:
# Column names: the properties of the first feature plus a geometry column
column_name_list = list(zones_geojson["features"][0]["properties"].keys())
column_name_list += ["geometry"]
column_name_list

['shape_area',
 'objectid',
 'shape_leng',
 'location_id',
 'zone',
 'borough',
 'geometry']

In [20]:
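def get_pandas_rows(features):
    """Yield one row per feature: its property values followed by a MultiPolygon."""
    for item in features:
        # Assumes every feature lists its properties in the same order as column_name_list
        row = list(item["properties"].values())
        # MultiPolygon coordinates: a list of polygons, each a list of rings;
        # polygon[0] is the exterior ring, so any holes (polygon[1:]) are dropped
        polygons = [Polygon(polygon[0]) for polygon in item["geometry"]["coordinates"]]
        row.append(MultiPolygon(polygons=polygons))
        yield row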
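`get_pandas_rows` keeps only `polygon[0]` of each polygon. In GeoJSON (RFC 7946) the first ring of a polygon is its exterior boundary and any further rings are holes, so holes are discarded. The plain-Python sketch below uses a made-up feature with the same nesting as the zones file and counts what would be dropped; if holes matter, `shapely.geometry.shape(item["geometry"])` builds the complete geometry instead.

```python
# A toy GeoJSON feature (hypothetical values) with the same layout as the zones file:
# coordinates -> polygons -> rings -> [lng, lat] positions
feature = {
    "type": "Feature",
    "properties": {"location_id": "1", "zone": "Toy Zone"},
    "geometry": {
        "type": "MultiPolygon",
        "coordinates": [
            [  # polygon 1: exterior ring followed by one hole
                [[0, 0], [4, 0], [4, 4], [0, 4], [0, 0]],
                [[1, 1], [2, 1], [2, 2], [1, 2], [1, 1]],
            ],
            [  # polygon 2: exterior ring only
                [[10, 10], [12, 10], [12, 12], [10, 12], [10, 10]],
            ],
        ],
    },
}


def exterior_rings(geometry):
    """Return the first (exterior) ring of every polygon in a MultiPolygon geometry."""
    return [polygon[0] for polygon in geometry["coordinates"]]


def hole_count(geometry):
    """Count the interior rings (holes) that taking polygon[0] would drop."""
    return sum(len(polygon) - 1 for polygon in geometry["coordinates"])


rings = exterior_rings(feature["geometry"])
print(len(rings), hole_count(feature["geometry"]))
```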

Build a pandas DataFrame from the rows:

In [21]:
df_zones_pn = pd.DataFrame(get_pandas_rows(zones_geojson["features"]), columns=column_name_list)
df_zones_pn.head(5)

|   | shape_area | objectid | shape_leng | location_id | zone | borough | geometry |
|---|---|---|---|---|---|---|---|
| 0 | 0.0007823067885 | 1 | 0.116357453189 | 1 | Newark Airport | EWR | (POLYGON ((-74.18445299999996 40.6949959999999... |
| 1 | 0.00486634037837 | 2 | 0.43346966679 | 2 | Jamaica Bay | Queens | (POLYGON ((-73.82337597260663 40.6389870471767... |
| 2 | 0.000314414156821 | 3 | 0.0843411059012 | 3 | Allerton/Pelham Gardens | Bronx | (POLYGON ((-73.84792614099985 40.8713422339999... |
| 3 | 0.000111871946192 | 4 | 0.0435665270921 | 4 | Alphabet City | Manhattan | (POLYGON ((-73.97177410965318 40.7258212813370... |
| 4 | 0.000497957489363 | 5 | 0.0921464898574 | 5 | Arden Heights | Staten Island | (POLYGON ((-74.17421738099989 40.5625680859999... |


Wrap it in a GeoDataFrame with `geometry` as the active geometry column:

In [22]:
gdf_zones = gpd.GeoDataFrame(df_zones_pn, geometry=df_zones_pn["geometry"])
gdf_zones.head(5)

|   | shape_area | objectid | shape_leng | location_id | zone | borough | geometry |
|---|---|---|---|---|---|---|---|
| 0 | 0.0007823067885 | 1 | 0.116357453189 | 1 | Newark Airport | EWR | MULTIPOLYGON (((-74.18445 40.69500, -74.18449 ... |
| 1 | 0.00486634037837 | 2 | 0.43346966679 | 2 | Jamaica Bay | Queens | MULTIPOLYGON (((-73.82338 40.63899, -73.82277 ... |
| 2 | 0.000314414156821 | 3 | 0.0843411059012 | 3 | Allerton/Pelham Gardens | Bronx | MULTIPOLYGON (((-73.84793 40.87134, -73.84725 ... |
| 3 | 0.000111871946192 | 4 | 0.0435665270921 | 4 | Alphabet City | Manhattan | MULTIPOLYGON (((-73.97177 40.72582, -73.97179 ... |
| 4 | 0.000497957489363 | 5 | 0.0921464898574 | 5 | Arden Heights | Staten Island | MULTIPOLYGON (((-74.17422 40.56257, -74.17349 ... |


<a name="3"></a>
<div style="display:table; width:100%; padding-top:10px; padding-bottom:10px; border-bottom:1px solid lightgrey">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-size:14pt; font-weight:bold">3. Number of Stations per Zone</div>
    	<div style="display:table-cell; width:20%; text-align:center; background-color:whitesmoke; border:1px solid lightgrey"><a href="#0">To contents</a></div>
    </div>
</div>

#### Approach 0 

Just out of curiosity, here is a pandas alternative. It is reasonable when the data is small enough to fit on the driver, as the 780 stations here are:

In [23]:
# %%timeit -n1
df_stations_pn = df_stations.toPandas()
points = gpd.GeoDataFrame(df_stations_pn,
                          geometry=gpd.points_from_xy(df_stations_pn["lng"], 
                                                      df_stations_pn["lat"]))
sjoin(points[["geometry"]], gdf_zones, how="left")\
        .groupby(["location_id", "zone"])["location_id"]\
        .count()\
        .reset_index(name="count")\
        .head(5)

|   | location_id | zone | count |
|---|---|---|---|
| 0 | 100 | Garment District | 6 |
| 1 | 106 | Gowanus | 10 |
| 2 | 107 | Gramercy | 5 |
| 3 | 112 | Greenpoint | 15 |
| 4 | 113 | Greenwich Village North | 9 |
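`sjoin` (and `Polygon.contains` in the next approach) answers a point-in-polygon question for every station. For intuition, here is the classic even-odd ray-casting test in plain Python. It is a simplified stand-in, not what shapely/GEOS does internally: it ignores holes and points lying exactly on an edge, and the square zone and station coordinates are hypothetical.

```python
def point_in_ring(lng, lat, ring):
    """Even-odd ray casting: cast a ray to the right of the point and count edge crossings."""
    inside = False
    n = len(ring)
    for i in range(n):
        x1, y1 = ring[i]
        x2, y2 = ring[(i + 1) % n]  # wrap around to close the ring
        # Does this edge straddle the horizontal line through the point?
        if (y1 > lat) != (y2 > lat):
            # x-coordinate where the edge crosses that horizontal line
            x_cross = x1 + (lat - y1) * (x2 - x1) / (y2 - y1)
            if lng < x_cross:
                inside = not inside
    return inside


# Hypothetical square zone given as (lng, lat) vertices, like shapely's (x, y) order
zone = [(-74.0, 40.7), (-73.9, 40.7), (-73.9, 40.8), (-74.0, 40.8)]
print(point_in_ring(-73.95, 40.75, zone))
print(point_in_ring(-73.80, 40.75, zone))
```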


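#### Approach 1

Broadcast the zones GeoDataFrame to the workers and, for every station, scan the zones until one of them contains the station point: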

In [24]:
bc_zones = spark.sparkContext.broadcast(gdf_zones)
bc_zones

<pyspark.broadcast.Broadcast at 0x118cbef90>

In [25]:
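def zone_contains_v1(row):
    # Shapely points are (x, y), i.e. (longitude, latitude)
    point = Point((row["lng"], row["lat"]))
    # Linear scan over all zones in the broadcast GeoDataFrame
    for index, item in bc_zones.value.iterrows():
        if item["geometry"].contains(point):
            return (item["location_id"], item["zone"])
    # The station lies outside every zone
    return None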

In [26]:
# %%timeit -n1
df_stations.rdd.map(zone_contains_v1).countByValue()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 37.0 failed 1 times, most recent failure: Lost task 0.0 in stage 37.0 (TID 2056, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 364, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 580, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'shapely'



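The job fails on the workers rather than in the notebook: while deserializing the task, which references shapely's `Point` through `zone_contains_v1`, the Python worker process cannot import `shapely`. A plausible cause (an assumption, not something the trace confirms) is that the workers launch a different Python interpreter, or see different site-packages, than the notebook kernel. One possible fix, sketched below, points the `PYSPARK_PYTHON` environment variable at the kernel's own interpreter. It only takes effect if it runs before the `SparkSession` is created, so restart the kernel and run it first; the `spark.pyspark.python` configuration property is an alternative way to set the same thing.

```python
import os
import sys

# Assumption: the workers lacked shapely because they ran another interpreter.
# Make PySpark start its Python workers with the interpreter running this
# notebook, where shapely/geopandas are installed. Run before the SparkSession exists.
os.environ["PYSPARK_PYTHON"] = sys.executable

print(os.environ["PYSPARK_PYTHON"])
```
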
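#### Approach 2

Process each partition as a batch instead: build one GeoDataFrame from all the stations in a partition and run a single spatial join against the broadcast zones: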

In [None]:
def zone_contains_v2(rows):
    # Collect the partition's stations as (lng, lat) pairs
    points = [[row["lng"], row["lat"]] for row in rows]
    if not points:
        # This is a generator, so a bare return ends it: an empty partition yields nothing
        return
    df_points_pn = pd.DataFrame(points, columns=["lng", "lat"])
    gdf_points = gpd.GeoDataFrame(df_points_pn, 
                                  geometry=gpd.points_from_xy(df_points_pn["lng"],
                                                              df_points_pn["lat"]))
    # One spatial join per partition against the broadcast zones
    for index, item in sjoin(gdf_points[["geometry"]], bc_zones.value, how="left").iterrows():
        yield (item["location_id"], item["zone"])

In [None]:
# %%timeit -n1
df_stations.rdd.mapPartitions(zone_contains_v2).countByValue()
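The point of Approach 2 is batching: `mapPartitions` calls the function once per partition, so the GeoDataFrame construction and spatial join run 4 times (once per partition after `coalesce(4)`) instead of once per station, and `countByValue` then tallies the yielded values into a dictionary on the driver. A plain-Python simulation of that flow, with hypothetical partitions and a dummy latitude rule standing in for the real spatial join:

```python
from collections import Counter

# Hypothetical partitions of station points (lng, lat); the last one is empty,
# as a Spark partition can be
partitions = [
    [(-73.99, 40.75), (-73.94, 40.79)],
    [(-73.94, 40.80)],
    [],
]


def lookup_zones(points):
    """Stand-in for zone_contains_v2: processes one whole partition, yields one value per point."""
    points = list(points)
    if not points:
        return  # in a generator, a bare return simply ends iteration
    # Dummy batch lookup: classify by latitude instead of a real spatial join
    for lng, lat in points:
        yield "north" if lat >= 40.78 else "south"


# mapPartitions calls the function once per partition; countByValue tallies the results
counts = Counter(value for part in partitions for value in lookup_zones(iter(part)))
print(counts)
```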

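#### Convert to a DataFrame

Yield `Row` objects instead of tuples so that the result can be converted into a Spark DataFrame and aggregated with `groupBy`: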

In [None]:
def zone_contains_v2_df(rows):
    # Same partition-level spatial join as zone_contains_v2, but wrap each
    # (location_id, zone) pair in a Row so the RDD can be converted with toDF()
    for location_id, zone in zone_contains_v2(rows):
        yield Row(id=location_id, zone=zone)

In [None]:
#%%timeit -n1
df_stations_count = df_stations.rdd.mapPartitions(zone_contains_v2_df).toDF()\
                        .groupBy("id", "zone")\
                        .agg(F.count("id").alias("count"))
df_stations_count.show()

#### Plotting on a Map

In [None]:
df_count_pn = df_stations_count.toPandas()

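Just out of curiosity, here is the pandas alternative. It reuses `points` from Approach 0 and renames `location_id` to `id` so that the result matches the `columns` argument of the choropleth below:

```python
df_count_pn = sjoin(points[["geometry"]], gdf_zones, how="left")\
    .groupby(["location_id", "zone"])["location_id"]\
    .count()\
    .reset_index(name="count")\
    .rename(columns={"location_id": "id"})

df_count_pn.head(5)
```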

In [None]:
m = folium.Map()

folium.Choropleth(
    geo_data=zones_geojson,
    data=df_count_pn,
    columns=["id", "count"],
    name="Number of stations",
    legend_name="Number of stations",
    key_on="feature.properties.location_id",
    highlight=True,
    nan_fill_color="grey",
    nan_fill_opacity=0.1,
    fill_color="YlOrRd",
    fill_opacity=0.7,
    line_opacity=0.2,
).add_to(m)
m.fit_bounds(m.get_bounds())
embed_map(m)
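`folium.Choropleth` colors each feature by looking up the value at the `key_on` path (`feature.properties.location_id`) in the first column of `columns` (`id`); zones without a match, i.e. without stations, are drawn with `nan_fill_color`. A plain-Python sketch of that lookup with hypothetical features and counts. Casting both sides to `str` is a defensive assumption: if the GeoJSON stores `location_id` as text while the join produced numbers, the keys may fail to match.

```python
# Hypothetical stand-ins for zones_geojson["features"] and df_count_pn
features = [
    {"properties": {"location_id": "4", "zone": "Alphabet City"}},
    {"properties": {"location_id": "1", "zone": "Newark Airport"}},
]
count_rows = [{"id": 4, "count": 7}]

# Key the counts by id, cast to str in case the two sides use different types
counts_by_key = {str(row["id"]): row["count"] for row in count_rows}

for feature in features:
    key = str(feature["properties"]["location_id"])
    # A missing key (None) corresponds to a zone painted with nan_fill_color
    print(feature["properties"]["zone"], counts_by_key.get(key))

# Zones without any station
unmatched = [feature["properties"]["zone"] for feature in features
             if str(feature["properties"]["location_id"]) not in counts_by_key]
print(unmatched)
```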

<a name="4"></a>
<div style="display:table; width:100%; padding-top:10px; padding-bottom:10px; border-bottom:1px solid lightgrey">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-size:14pt; font-weight:bold">4. Calculating out-degrees</div>
    	<div style="display:table-cell; width:20%; text-align:center; background-color:whitesmoke; border:1px solid lightgrey"><a href="#0">To contents</a></div>
    </div>
</div>

The total number of trips that start at a given station, i.e. the station's out-degree in the graph whose edges are trips:

In [None]:
df_trips_start_counts = df_trips.select(F.col("start station id").alias("id"),
                                        F.col("start station latitude").alias("lat"), 
                                        F.col("start station longitude").alias("lng"))\
                                .groupBy("id")\
                                .agg(F.first("lat").alias("lat"), F.first("lng").alias("lng"), 
                                     F.count("id").alias("count"))
df_trips_start_counts.show()
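In graph terms, each trip is a directed edge from its start station to its end station, so counting trips per start station gives each station's out-degree (GraphFrames, whose package is loaded at the top of the notebook, exposes the same quantity as the `outDegrees` property of a `GraphFrame` built with `src`/`dst` edge columns). A plain-Python sketch on a hypothetical list of trips:

```python
from collections import Counter

# Hypothetical trips as (start station id, end station id) pairs
trips = [(3494, 3501), (3494, 379), (379, 3494), (3501, 3494), (3494, 3501)]

# Each trip is a directed edge start -> end; a station's out-degree is the
# number of edges leaving it, i.e. the number of trips starting there
out_degrees = Counter(start for start, _ in trips)
# The in-degree counts trips ending at the station
in_degrees = Counter(end for _, end in trips)

print(out_degrees)
print(in_degrees)
```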

Basic stats for the `count` column:

In [None]:
df_trips_start_counts.describe("count").show()

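Median (with a relative error of `0`, `approxQuantile` computes the exact quantile, which can be expensive on large data):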

In [None]:
median = df_trips_start_counts.approxQuantile("count", [0.5], 0)[0]
median

Extract the `max` value:

In [None]:
max_start_count = df_trips_start_counts.select(F.max("count").alias("max")).first()["max"]
max_start_count

Convert the `df_trips_start_counts` DataFrame to a NumPy array with one `[lat, lng, count]` row per station, the format `HeatMap` accepts:

In [None]:
trips_matrix = df_trips_start_counts.toPandas()[["lat", "lng", "count"]].values

Plot a `HeatMap` weighted by the counts:

In [None]:
m = folium.Map()
HeatMap(trips_matrix, radius=15, max_val=max_start_count).add_to(m)
m.fit_bounds(m.get_bounds())
embed_map(m)
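`max_val` sets the intensity at which the heat map saturates. Depending on the folium version, this parameter may be deprecated or ignored (an assumption worth checking against the installed version); a version-independent alternative is to scale the weights yourself. A sketch with hypothetical rows shaped like `trips_matrix`:

```python
# Hypothetical [lat, lng, count] rows, shaped like trips_matrix
rows = [[40.75, -73.99, 120], [40.79, -73.94, 30], [40.71, -73.98, 60]]

max_count = max(count for _, _, count in rows)

# Scale each count into (0, 1] so the intensity no longer depends on max_val
normalized = [[lat, lng, count / max_count] for lat, lng, count in rows]
print(normalized)
```

The normalized list could then be passed to `HeatMap` in place of `trips_matrix`.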

<a name="5"></a>
<div style="display:table; width:100%; padding-top:10px; padding-bottom:10px; border-bottom:1px solid lightgrey">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-size:14pt; font-weight:bold">5. References</div>
    	<div style="display:table-cell; width:20%; text-align:center; background-color:whitesmoke; border:1px solid lightgrey"><a href="#0">To contents</a></div>
    </div>
</div>

[GeoPandas](http://geopandas.org)

[Shapely](https://github.com/Toblerity/Shapely)

[Folium](https://github.com/python-visualization/folium)