# Домашняя работа 2. Анализ поездок посредством Spark DataFrame API
Вохрамеев Михаил ИУ6-55Б

### Задание 1
- определите для каждой станции количество начала поездок и количество завершения поездок
- сопоставьте станции с кварталами города (zones) и определите суммы количества начала и завершения для каждого квартала
- выведите по убыванию количества поездок и
- отобразите в виде картограмм (Choropleth).

In [117]:
import pyspark
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as F
import pandas as pd

#### Инициализируем Spark Session:

In [118]:
conf = (
    pyspark.SparkConf()
        .set("spark.executor.memory", "1g")
        .set("spark.executor.core", "2")
        .set("spark.driver.host", "127.0.0.1")
        .setMaster("local[2]")
)

In [119]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()

#### Создадим DataFrame и прочитаем из файла:

In [120]:
FILE_PATH = "./data/201902-citibike-tripdata.csv"

df_trips = spark.read.csv(path=FILE_PATH, header="true", inferSchema="true")
df_trips.printSchema()


[Stage 49:>                                                         (0 + 2) / 2]

root
 |-- tripduration: integer (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- stoptime: timestamp (nullable = true)
 |-- start station id: double (nullable = true)
 |-- start station name: string (nullable = true)
 |-- start station latitude: double (nullable = true)
 |-- start station longitude: double (nullable = true)
 |-- end station id: double (nullable = true)
 |-- end station name: string (nullable = true)
 |-- end station latitude: double (nullable = true)
 |-- end station longitude: double (nullable = true)
 |-- bikeid: integer (nullable = true)
 |-- usertype: string (nullable = true)
 |-- birth year: integer (nullable = true)
 |-- gender: integer (nullable = true)



                                                                                

In [121]:
df_trips.show(5)

+------------+--------------------+--------------------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+------+----------+----------+------+
|tripduration|           starttime|            stoptime|start station id|  start station name|start station latitude|start station longitude|end station id|    end station name|end station latitude|end station longitude|bikeid|  usertype|birth year|gender|
+------------+--------------------+--------------------+----------------+--------------------+----------------------+-----------------------+--------------+--------------------+--------------------+---------------------+------+----------+----------+------+
|         219|2019-02-01 00:00:...|2019-02-01 00:03:...|          3494.0|E 115 St & Lexing...|             40.797911|               -73.9423|        3501.0|E 118 St & Madiso...|          40.8014866|          -73.9442507| 33450|Su

Общее кол-во поездок:

In [122]:
df_trips.count()

943744

Посчитаем кол-во начала, завершения поездок и их сумму:

In [123]:
df_start_counts = df_trips.groupBy(
    "start station id", "start station name", 
    "start station latitude", "start station longitude"
).count().withColumnRenamed("count", "start_count")

df_end_counts = df_trips.groupBy(
    "end station id", "end station name", 
    "end station latitude", "end station longitude"
).count().withColumnRenamed("count", "end_count")

In [124]:
df_start_counts_pn = df_start_counts.toPandas()
df_end_counts_pn = df_end_counts.toPandas()


df_start_counts_pn = df_start_counts_pn.rename(columns={
    "start station id": "id", "start station name": "name",
    "start station latitude": "lat", "start station longitude": "lng"
})
df_end_counts_pn = df_end_counts_pn.rename(columns={
    "end station id": "id", "end station name": "name",
    "end station latitude": "lat", "end station longitude": "lng"
})

                                                                                

In [125]:
df_stations_pn = pd.merge(
    df_start_counts_pn, df_end_counts_pn, 
    on=["id", "name", "lat", "lng"], 
    how="outer"
).fillna(0)

df_stations_pn["total"] = df_stations_pn["start_count"] + df_stations_pn["end_count"]

##### Кол-во начала и завершения поездок для каждой станции:

In [126]:
df_stations_pn

Unnamed: 0,id,name,lat,lng,start_count,end_count,total
0,72.0,W 52 St & 11 Ave,40.767272,-73.993929,1831.0,1867.0,3698.0
1,79.0,Franklin St & W Broadway,40.719116,-74.006667,1248.0,1238.0,2486.0
2,82.0,St James Pl & Pearl St,40.711174,-74.000165,654.0,665.0,1319.0
3,83.0,Atlantic Ave & Fort Greene Pl,40.683826,-73.976323,637.0,939.0,1576.0
4,119.0,Park Ave & St Edwards St,40.696089,-73.978034,171.0,217.0,388.0
...,...,...,...,...,...,...,...
775,0.0,0,40.860000,-73.890000,0.0,1.0,1.0
776,0.0,0,40.860000,-73.884000,2.0,1.0,3.0
777,0.0,0,40.863000,-73.890000,1.0,1.0,2.0
778,0.0,0,40.866000,-73.887000,0.0,1.0,1.0


##### Перейдем к работе с geojson:

In [127]:
import json
import pandas as pd
import geopandas as gpd
from shapely.geometry import Polygon, MultiPolygon
from geopandas.tools import sjoin

import folium

In [128]:
ZONES_PATH = "./data/NYC Taxi Zones.geojson"

In [129]:
m = folium.Map()

Построим зоны Нью-Йорка:

In [130]:
def embed_map(m):
    # from IPython.display import IFrame
    m.save('index.html')
    # return IFrame('index.html', width='100%', height='750px')

Визуализируем контуры:

In [131]:
style_function = lambda x: {
    "color" : "orange",
    "weight": 1
}

folium.GeoJson(ZONES_PATH, name="geojson", style_function=style_function).add_to(m)
m.fit_bounds(m.get_bounds())
# embed_map(m)

In [132]:
with open(ZONES_PATH) as f:
    zones_geojson = json.load(f)

In [133]:
column_name_list = [key for key, _ in zones_geojson["features"][0]["properties"].items()]
column_name_list += ["geometry"]
column_name_list

['shape_area',
 'objectid',
 'shape_leng',
 'location_id',
 'zone',
 'borough',
 'geometry']

Функция-генератор для извлечения свойств и преобразования координат полигонов из GeoJSON в объекты Shapely:

In [134]:
def create_zone_rows(features):
    for item in features:
        row = list()
        for key, value in item["properties"].items():
            row.append(value)        
        polygons = list()
        for polygon in item["geometry"]["coordinates"]:
            polygons.append(Polygon(polygon[0]))
        row.append(MultiPolygon(polygons=polygons))
        yield row

In [135]:
df_zones_pn = pd.DataFrame(data=create_zone_rows(zones_geojson["features"]), 
                           columns=column_name_list)
df_zones_pn.head(5)

Unnamed: 0,shape_area,objectid,shape_leng,location_id,zone,borough,geometry
0,0.0007823067885,1,0.116357453189,1,Newark Airport,EWR,MULTIPOLYGON (((-74.18445299999996 40.69499599...
1,0.00486634037837,2,0.43346966679,2,Jamaica Bay,Queens,MULTIPOLYGON (((-73.82337597260663 40.63898704...
2,0.000314414156821,3,0.0843411059012,3,Allerton/Pelham Gardens,Bronx,MULTIPOLYGON (((-73.84792614099985 40.87134223...
3,0.000111871946192,4,0.0435665270921,4,Alphabet City,Manhattan,MULTIPOLYGON (((-73.97177410965318 40.72582128...
4,0.000497957489363,5,0.0921464898574,5,Arden Heights,Staten Island,MULTIPOLYGON (((-74.17421738099989 40.56256808...


Преобразуем pandas df в GeoDataFrame:

In [136]:
gdf_zones = gpd.GeoDataFrame(df_zones_pn, geometry=df_zones_pn["geometry"])
gdf_zones.set_crs(epsg=4326, inplace=True, allow_override=True)
gdf_zones.head(5)

Unnamed: 0,shape_area,objectid,shape_leng,location_id,zone,borough,geometry
0,0.0007823067885,1,0.116357453189,1,Newark Airport,EWR,"MULTIPOLYGON (((-74.18445 40.695, -74.18449 40..."
1,0.00486634037837,2,0.43346966679,2,Jamaica Bay,Queens,"MULTIPOLYGON (((-73.82338 40.63899, -73.82277 ..."
2,0.000314414156821,3,0.0843411059012,3,Allerton/Pelham Gardens,Bronx,"MULTIPOLYGON (((-73.84793 40.87134, -73.84725 ..."
3,0.000111871946192,4,0.0435665270921,4,Alphabet City,Manhattan,"MULTIPOLYGON (((-73.97177 40.72582, -73.97179 ..."
4,0.000497957489363,5,0.0921464898574,5,Arden Heights,Staten Island,"MULTIPOLYGON (((-74.17422 40.56257, -74.17349 ..."


GeoDataFrame для станций:

In [137]:
gdf_stations = gpd.GeoDataFrame(
    df_stations_pn,
    geometry=gpd.points_from_xy(df_stations_pn.lng, df_stations_pn.lat),
    crs="EPSG:4326"
)

Функция, которая находит в какую зону попадает каждая станция:

In [138]:
def apply_zones_locally(df_pandas, gdf_zones):
    gdf_points = gpd.GeoDataFrame(
        df_pandas,
        geometry=gpd.points_from_xy(df_pandas["lng"], df_pandas["lat"]),
        crs="EPSG:4326"
    )

    joined = sjoin(
        gdf_points,
        gdf_zones[["location_id", "geometry"]],
        how="left",
        predicate="intersects"
    )
    
    joined["zone_id"] = joined["location_id"].fillna(0).astype(int)
    return joined

In [139]:
df_stations_with_zones_pn = apply_zones_locally(df_stations_pn, gdf_zones)
df_stations_with_zones_pn.head(5)

Unnamed: 0,id,name,lat,lng,start_count,end_count,total,geometry,index_right,location_id,zone_id
0,72.0,W 52 St & 11 Ave,40.767272,-73.993929,1831.0,1867.0,3698.0,POINT (-73.99393 40.76727),49.0,50,50
1,79.0,Franklin St & W Broadway,40.719116,-74.006667,1248.0,1238.0,2486.0,POINT (-74.00667 40.71912),230.0,231,231
2,82.0,St James Pl & Pearl St,40.711174,-74.000165,654.0,665.0,1319.0,POINT (-74.00017 40.71117),41.0,45,45
3,83.0,Atlantic Ave & Fort Greene Pl,40.683826,-73.976323,637.0,939.0,1576.0,POINT (-73.97632 40.68383),97.0,97,97
4,119.0,Park Ave & St Edwards St,40.696089,-73.978034,171.0,217.0,388.0,POINT (-73.97803 40.69609),97.0,97,97


Агрегируем статистику поездок по зонам:

In [140]:
df_zone_stats_pn = df_stations_with_zones_pn.groupby("zone_id")[["start_count", "end_count", "total"]].sum().reset_index()
df_zone_stats_pn.head(5)


Unnamed: 0,zone_id,start_count,end_count,total
0,0,0.0,3.0,3.0
1,4,10752.0,10054.0,20806.0
2,7,5529.0,5598.0,11127.0
3,8,400.0,398.0,798.0
4,12,771.0,836.0,1607.0


##### Добавляем картограмму с выделением зон цветами в зависимости от кол-ва поездок:

In [141]:
m = folium.Map()

folium.Choropleth(
    geo_data=zones_geojson,
    data=df_zone_stats_pn,
    columns=["zone_id", "total"],
    name="кол-во поездок",
    legend_name="общее кол-во поездок",
    key_on="feature.properties.location_id",
    highlight=True,
    nan_fill_color="grey",
    nan_fill_opacity=0.1,
    fill_color="YlOrRd",
    fill_opacity=0.7,
    line_opacity=0.2,
).add_to(m)
m.fit_bounds(m.get_bounds())
# embed_map(m)

##### Добавляем вывод информации о зоне при наведении на нее мышкой:

In [142]:
gdf_zones["location_id"] = gdf_zones["location_id"].astype("int64")
gdf_zones_stats = gdf_zones.merge(df_zone_stats_pn, left_on="location_id", right_on="zone_id", how="left")
gdf_zones_stats.head(5)

Unnamed: 0,shape_area,objectid,shape_leng,location_id,zone,borough,geometry,zone_id,start_count,end_count,total
0,0.0007823067885,1,0.116357453189,1,Newark Airport,EWR,"MULTIPOLYGON (((-74.18445 40.695, -74.18449 40...",,,,
1,0.00486634037837,2,0.43346966679,2,Jamaica Bay,Queens,"MULTIPOLYGON (((-73.82338 40.63899, -73.82277 ...",,,,
2,0.000314414156821,3,0.0843411059012,3,Allerton/Pelham Gardens,Bronx,"MULTIPOLYGON (((-73.84793 40.87134, -73.84725 ...",,,,
3,0.000111871946192,4,0.0435665270921,4,Alphabet City,Manhattan,"MULTIPOLYGON (((-73.97177 40.72582, -73.97179 ...",4.0,10752.0,10054.0,20806.0
4,0.000497957489363,5,0.0921464898574,5,Arden Heights,Staten Island,"MULTIPOLYGON (((-74.17422 40.56257, -74.17349 ...",,,,


In [143]:
folium.features.GeoJson(
    gdf_zones_stats,
    name='информация о зонах',
    style_function=lambda x: {
        'fillColor': 'transparent',
        'color': 'black',
        'weight': 1,
        'fillOpacity': 0
    },
    tooltip=folium.features.GeoJsonTooltip(
        fields=['zone_id', 'zone', 'start_count', 'end_count', 'total'],
        aliases=['ID зоны:', 'Название:', 'Начало поездок:', 'Конец поездок:', 'Всего:'],
        localize=True
    )
).add_to(m)

<folium.features.GeoJson at 0x1759c7b20>

##### Добавляем отображение всех станций на карту в виде точек, а также popup с возможностью выбора слоев для отображения:

In [144]:
stations_group = folium.FeatureGroup(name='отображение станций', show=False)

for idx, row in df_stations_with_zones_pn.iterrows():
    folium.CircleMarker(
        location=[row['lat'], row['lng']],
        radius=3,
        color='darkred',
        fill_color='red',
        fill=True,
        fill_opacity=0.4,
        weight=1,
    ).add_to(stations_group)

stations_group.add_to(m)

folium.LayerControl(collapsed=False).add_to(m)
m.fit_bounds(m.get_bounds())
embed_map(m)


##### Выводим зоны по убыванию кол-ва поездок:

In [145]:
gdf_zones_stats_sorted = gdf_zones_stats[gdf_zones_stats['total'] > 0].sort_values('total', ascending=False)[['objectid', 'zone', 'start_count', 'end_count', 'total']]
gdf_zones_stats_sorted

Unnamed: 0,objectid,zone,start_count,end_count,total
73,79,East Village,43333.0,43170.0,86503.0
68,68,East Chelsea,38967.0,39395.0,78362.0
164,170,Murray Hill,32572.0,32578.0,65150.0
234,234,Union Sq,30527.0,31464.0,61991.0
113,113,Greenwich Village North,26485.0,27054.0,53539.0
...,...,...,...,...,...
144,152,Manhattanville,231.0,229.0,460.0
23,20,Belmont,7.0,8.0,15.0
14,18,Bedford Park,0.0,1.0,1.0
170,169,Mount Hope,1.0,0.0,1.0
