In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as t
from geopy.distance import geodesic
from math import sqrt

In [2]:
spark = SparkSession.builder.getOrCreate()
spark

# 1. Найти велосипед с максимальным временем пробега.

In [3]:
# Загружаем данные с поездками
trips = spark.read.format("csv").option("header", "true").load("trips.csv")

In [4]:
# Группируем по id велосипеда и агрегируем по продолжительности поездки
max_duration_trip = trips.groupBy(trips.bike_id).agg(
    F.max(trips.duration.cast(t.IntegerType())).alias("duration")
    ).orderBy(F.col("duration").desc())
max_duration_trip.show(1)

+-------+--------+
|bike_id|duration|
+-------+--------+
|    535|17270400|
+-------+--------+
only showing top 1 row



# 2. Найти наибольшее геодезическое расстояние между станциями.

In [5]:
# Загружаем данные со станциями
stations = spark.read.format("csv").option("header", "true").load("stations.csv")

In [6]:
# Выбираем столбцы id, lat, long в проекции
stations_data = stations.select(stations.id, stations.lat, stations.long)

# Соединяем данные о станциям сами с собой по id
stations_comb = stations_data.select(
        stations_data.id.alias("id1"),
        stations_data.lat.alias("lat1"),
        stations_data.long.alias("long1")
    ).join(stations_data.select(
        stations_data.id.alias("id2"),
        stations_data.lat.alias("lat2"),
        stations_data.long.alias("long2")
    )).where(F.col("id1") != F.col("id2"))

def dist(lat1, long1, lat2, long2):
  '''Подсчет расстояния между станциями'''
  return ((lat2-lat1)**2 + (long2-long1)**2)**(1/2)

# Вычисление расстояние между станциями проекцией
max_dist = stations_comb.select(
        stations_comb.id1,
        stations_comb.id2,
        dist(stations_comb.lat1, stations_comb.long1, stations_comb.lat2, stations_comb.long2).alias("dist")
    ).orderBy(F.col("dist").desc())

max_dist.show(1)

+---+---+------------------+
|id1|id2|              dist|
+---+---+------------------+
| 16| 60|0.7058482821754397|
+---+---+------------------+
only showing top 1 row



# 3. Найти путь велосипеда с максимальным временем пробега через станции.

In [7]:
# Поиск велосипеда с максимальной суммой путей и вывод его путей
trip_path = trips.select(
        trips.id,
        trips.bike_id,
        trips.start_station_id,
        trips.end_station_id
    ).where(
        trips.bike_id == max_duration_trip.first().bike_id
    ).orderBy(trips.id.cast(t.IntegerType()).asc())

trip_path.show()

+-----+-------+----------------+--------------+
|   id|bike_id|start_station_id|end_station_id|
+-----+-------+----------------+--------------+
| 4966|    535|              47|            70|
| 5067|    535|              70|            69|
| 5179|    535|              69|            77|
| 5199|    535|              77|            64|
| 7806|    535|              61|            42|
|11422|    535|              58|            72|
|12245|    535|              72|            47|
|12485|    535|              47|            60|
|12558|    535|              60|            46|
|13107|    535|              46|            77|
|13423|    535|              77|            77|
|14380|    535|              77|            62|
|14581|    535|              62|            61|
|15231|    535|              55|            61|
|15242|    535|              61|            60|
|15347|    535|              60|            41|
|15605|    535|              41|            50|
|15611|    535|              50|        

# 4. Найти количество велосипедов в системе.

In [8]:
# Группировка по id велосипеда и агрегация по тому же id
trips.groupBy(trips.bike_id).agg(trips.bike_id).count()

700

# 5. Найти пользователей потративших на поездки более 3 часов.

In [9]:
# Поиск пользователей с минимальным временем поездки 3 часа,
# путем группировки по id пользоватеся и агрегацией по продолжительности поездки
zip_trips = trips.select(
        trips.zip_code,
        trips.duration
    ).groupBy(trips.zip_code).agg(
        F.min(trips.duration.cast(t.IntegerType())).alias("duration")
    ).where(F.col("duration") > 10800)

zip_trips.show()

+--------+--------+
|zip_code|duration|
+--------+--------+
|    2136|   16010|
|   11722|   12158|
|   16303|   13053|
|    4665|   16322|
|   94079|   33057|
|   45219|   16841|
|    5052|   24097|
|   33805|   26903|
|   33706|   55535|
|    7015|   12889|
|   94068|   16250|
|  902104|   23925|
|    5732|   13550|
|    6245|   27242|
|   49168|   14129|
|   41311|   17027|
|   95454|   16359|
|   32003|   14691|
|   89084|   12237|
|   59160|   22505|
+--------+--------+
only showing top 20 rows

