# Лабораторная 1. Интерактивный анализ данных велопарковок SF Bay Area Bike Share в Apache Spark

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as t
from geopy.distance import geodesic
import os

In [3]:
spark = SparkSession.builder.getOrCreate()
spark

In [4]:
os.chdir('../')
def get_path_to_file(file_name):
    return f'file:///{os.getcwd()}/data/{file_name}'.replace("\\", "/")

In [5]:
def get_data(spark, dataset_name):
    data = spark.read.format('csv').option('header', 'true').option("inferSchema", True).option("timestampFormat", 'M/d/y H:m').load(get_path_to_file(dataset_name))
    data = data.dropna()
    print("Схема:")
    data.printSchema()
    
    print("Первые 5 элементов:")
    data.show(n = 5)
    
    print("Описание датасета:")
    data.describe().show()
    
    print("Количество элементов:")
    print(data.count())
    return data

In [6]:
tripData = get_data(spark, "trips.csv")

Схема:
root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)

Первые 5 элементов:
+----+--------+-------------------+--------------------+----------------+-------------------+--------------------+--------------+-------+-----------------+--------+
|  id|duration|         start_date|  start_station_name|start_station_id|           end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+----+--------+-------------------+--------------------+----------------+-------------------+--------------------+--------------+-------+-------------

In [7]:
stationData = get_data(spark, "stations.csv")

Схема:
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: string (nullable = true)

Первые 5 элементов:
+---+--------------------+------------------+-------------------+----------+--------+-----------------+
| id|                name|               lat|               long|dock_count|    city|installation_date|
+---+--------------------+------------------+-------------------+----------+--------+-----------------+
|  2|San Jose Diridon ...|         37.329732|-121.90178200000001|        27|San Jose|         8/6/2013|
|  3|San Jose Civic Ce...|         37.330698|        -121.888979|        15|San Jose|         8/5/2013|
|  4|Santa Clara at Al...|         37.333988|        -121.894902|        11|San Jose|         8/6/2013|
|  5|    Adobe on Almaden|         37.331415|          -121.8932|        

# 1. Найти велосипед с максимальным временем пробега.

In [8]:
max_time_mileage_bike_data = tripData.groupBy('bike_id').agg(F.max(F.col("duration").cast(t.IntegerType())).alias("duration")).orderBy(F.col('duration').desc())
max_time_mileage_bike_data.show(1)

+-------+--------+
|bike_id|duration|
+-------+--------+
|    535|17270400|
+-------+--------+
only showing top 1 row



In [9]:
max_time_mileage_bike = max_time_mileage_bike_data.collect()[0].bike_id

# 2. Найти наибольшее геодезическое расстояние между станциями.

In [10]:
max_geo_dist_data = stationData.select("id", "lat", "long")

In [11]:
max_geo_dist_dict = {"max_dist": -1, "station1_id": -1, "station2_id": -1}
max_geo_dist_array = max_geo_dist_data.collect()
for idx, row in enumerate(max_geo_dist_array):
    if idx == 0:
        continue
    prev_val = max_geo_dist_array[idx - 1]
    mark1 = (prev_val.lat, prev_val.long)
    mark2 = (row.lat, row.long)
    dist = geodesic(mark1, mark2, ellipsoid='WGS-84').km
    if dist > max_geo_dist_dict["max_dist"]:
        max_geo_dist_dict["max_dist"] = dist
        max_geo_dist_dict["station1_id"] = prev_val.id
        max_geo_dist_dict["station2_id"] = row.id

In [12]:
max_geo_dist_data.where(F.col("id").between(max_geo_dist_dict["station1_id"], max_geo_dist_dict["station2_id"])).show()

+---+---------+-------------------+
| id|      lat|               long|
+---+---------+-------------------+
| 80|37.352601|-121.90573300000001|
| 82|37.798541|-122.40086200000002|
+---+---------+-------------------+



In [13]:
# Максимальное расстояние в километрах
max_geo_dist_dict["max_dist"]

66.05030433475366

# 3. Найти путь велосипеда с максимальным временем пробега через станции.

In [14]:
max_dist_trip_data = tripData.select("id", "bike_id", "start_station_id", "end_station_id").filter(F.col("bike_id") == max_time_mileage_bike).orderBy(F.col("id").cast(t.IntegerType()))

In [15]:
max_dist_trip_data.show(max_dist_trip_data.count())

+------+-------+----------------+--------------+
|    id|bike_id|start_station_id|end_station_id|
+------+-------+----------------+--------------+
|  4966|    535|              47|            70|
|  5067|    535|              70|            69|
|  5179|    535|              69|            77|
|  5199|    535|              77|            64|
|  7806|    535|              61|            42|
| 11422|    535|              58|            72|
| 12245|    535|              72|            47|
| 13107|    535|              46|            77|
| 13423|    535|              77|            77|
| 14380|    535|              77|            62|
| 14581|    535|              62|            61|
| 15231|    535|              55|            61|
| 15242|    535|              61|            60|
| 15605|    535|              41|            50|
| 15611|    535|              50|            41|
| 15770|    535|              41|            70|
| 16294|    535|              70|            74|
| 16409|    535|    

# 4. Найти количество велосипедов в системе.

In [16]:
max_time_mileage_bike_data.count()

700

# 5. Найти пользователей потративших на поездки более 3 часов.

In [17]:
more_than_three_hours_data = tripData.groupBy('zip_code').agg(F.max(F.col("duration").cast(t.IntegerType())).alias("duration")).filter(F.col("duration") >= 10800)

In [18]:
more_than_three_hours_data.show(more_than_three_hours_data.count())

+--------+--------+
|zip_code|duration|
+--------+--------+
|   94102|  464952|
|   95134|   82487|
|   84606|   14575|
|   80305|   74749|
|   60070|   26540|
|   91910|   20243|
|   94610|   76287|
|   94404|   63504|
|   80301|   36931|
|   94309|   18484|
|   97239|  193241|
|   94592|   26999|
|   11106|   13773|
|   93013|   25116|
|   30324|   17117|
|   94568|  295512|
|   94015|  103760|
|   91780|   16184|
|   60661|   24042|
|    4665|   16342|
|   95130|  146178|
|   75219|   11750|
|   46614|   50626|
|   11218|   19232|
|   81611|   34138|
|   53714|   18241|
|   94550|   75233|
|   19333|   14683|
|   33805|   26926|
|   94107|  212191|
|   95020|   30718|
|   76039|   31418|
|   81377|   20201|
|   97206|   18491|
|   33129|   22108|
|    2144|   17023|
|   94621|   34635|
|   90802|   11866|
|   60611|   16186|
|   92116|   77858|
|   70119|  246487|
|   10512|   17799|
|   90042|   15199|
|    1207|   15179|
|   85254|   20678|
|   94070|   87612|
|   80435|   23464|
