In [1]:
!pip3 install pyspark==3.0.0

Collecting pyspark==3.0.0
  Downloading pyspark-3.0.0.tar.gz (204.7 MB)
     ---------------------------------------- 0.0/204.7 MB ? eta -:--:--
     -------------------------------------- 0.0/204.7 MB 660.6 kB/s eta 0:05:10
     -------------------------------------- 0.1/204.7 MB 544.7 kB/s eta 0:06:16
     -------------------------------------- 0.1/204.7 MB 804.6 kB/s eta 0:04:15
     -------------------------------------- 0.2/204.7 MB 876.1 kB/s eta 0:03:54
     -------------------------------------- 0.2/204.7 MB 876.1 kB/s eta 0:03:54
     -------------------------------------- 0.2/204.7 MB 876.1 kB/s eta 0:03:54
     -------------------------------------- 0.2/204.7 MB 876.1 kB/s eta 0:03:54
     -------------------------------------- 0.2/204.7 MB 484.9 kB/s eta 0:07:02
     -------------------------------------- 0.2/204.7 MB 484.9 kB/s eta 0:07:02
     -------------------------------------- 0.2/204.7 MB 484.9 kB/s eta 0:07:02
     -------------------------------------- 0.2/204.7 M

In [3]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as t
from geopy.distance import geodesic
from math import sqrt

In [4]:
spark = SparkSession.builder.getOrCreate()
spark

**1. Найти велосипед с максимальным временем пробега.**

In [7]:
# Чтение данных из файла "trips.csv" в формате CSV с использованием Spark
trips = spark.read.format('csv').option('header', 'true').load("../data/trips.csv")

In [8]:
# Вычисление максимальной продолжительности поездки для каждого велосипеда
bike_max_trip_duration = (
    trips  # Исходные данные о поездках
    .groupBy('bike_id')  # Группировка по идентификатору велосипеда
    .agg(
        F.max(F.col("duration").cast(t.IntegerType())).alias("duration")  # Вычисление максимальной продолжительности поездки
    )
)

# Сортировка результатов по убыванию продолжительности поездки и выбор верхней строки
top_longest_trip = bike_max_trip_duration.orderBy(F.col('duration').desc())
top_longest_trip.show(1)  # Вывод верхней строки с наибольшей продолжительностью поездки

+-------+--------+
|bike_id|duration|
+-------+--------+
|    535|17270400|
+-------+--------+
only showing top 1 row



**2. Найти наибольшее геодезическое расстояние между станциями.**

In [9]:
# Чтение данных из файла "stations.csv" в формате CSV с использованием Spark
stations = spark.read.format('csv').option('header', 'true').load("../data/stations.csv")

In [10]:
# Выбор столбцов 'id', 'lat' и 'long' из исходных данных о станциях
stations_data = stations.select("id", "lat", "long")
# Вывод первых 5 строк для проверки
stations_data.show(5)

# Создание комбинаций станций для вычисления расстояний между ними
combo = stations_data.selectExpr('id as A', 'lat as A_lat', 'long as A_long').join(stations_data.selectExpr('id as B', 'lat as B_lat', 'long as B_long'))
# Выбор только тех комбинаций, где станции не совпадают
dif_combo = combo[combo.A != combo.B]

# Определение функции для вычисления евклидова расстояния между двумя точками на карте
def euclidean_dist(ax, ay, bx, by):
    return sqrt((ax - bx) ** 2 + (ay - by) ** 2)

# Применение функции к каждой комбинации станций для вычисления расстояния между ними
dists = dif_combo.rdd.map(lambda row: (row.A, row.B, euclidean_dist(float(row.A_lat), float(row.A_long), float(row.B_lat), float(row.B_long)) ))

# Выбор станции с максимальным расстоянием между станциями
stations_data = dists.max(lambda row: row[2])

+---+------------------+-------------------+
| id|               lat|               long|
+---+------------------+-------------------+
|  2|         37.329732|-121.90178200000001|
|  3|         37.330698|        -121.888979|
|  4|         37.333988|        -121.894902|
|  5|         37.331415|          -121.8932|
|  6|37.336721000000004|        -121.894074|
+---+------------------+-------------------+
only showing top 5 rows



('16', '60', 0.7058482821754397)

**3. Найти путь велосипеда с максимальным временем пробега через станции.**

In [11]:
# Выбор необходимых столбцов из данных о поездках и фильтрация по идентификатору велосипеда равному 535
filtered_trips = (
    trips.select("id", "bike_id", "start_station_id", "end_station_id")  # Выбор столбцов 'id', 'bike_id', 'start_station_id' и 'end_station_id'
    .filter(F.col("bike_id") == 535)  # Фильтрация по идентификатору велосипеда равному 535
    .orderBy(F.col("id").cast(t.IntegerType()))  # Сортировка по идентификатору поездки в порядке возрастания
)

# Вывод первых filtered_trips.count() строк, чтобы показать результаты
filtered_trips.show(filtered_trips.count())

+------+-------+----------------+--------------+
|    id|bike_id|start_station_id|end_station_id|
+------+-------+----------------+--------------+
|  4966|    535|              47|            70|
|  5067|    535|              70|            69|
|  5179|    535|              69|            77|
|  5199|    535|              77|            64|
|  7806|    535|              61|            42|
| 11422|    535|              58|            72|
| 12245|    535|              72|            47|
| 12485|    535|              47|            60|
| 12558|    535|              60|            46|
| 13107|    535|              46|            77|
| 13423|    535|              77|            77|
| 14380|    535|              77|            62|
| 14581|    535|              62|            61|
| 15231|    535|              55|            61|
| 15242|    535|              61|            60|
| 15347|    535|              60|            41|
| 15605|    535|              41|            50|
| 15611|    535|    

**4. Найти количество велосипедов в системе**

In [12]:
# Вычисление общего количества велосипедов, для которых была найдена максимальная продолжительность поездки
count_bikes = bike_max_trip_duration.count()

count_bikes

700

**5. Найти пользователей потративших на поездки более 3 часов.**

In [13]:
# Группировка данных о поездках по zip_code и вычисление максимальной продолжительности поездки
output_filtered = (
    trips
    .groupBy('zip_code')  # Группировка
    .agg(
        F.max(F.col("duration").cast(t.IntegerType())).alias("duration")  # Вычисление максимальной продолжительности поездки
    )
)

# Фильтрация результата по продолжительности поездки, оставляя только те записи, у которых продолжительность больше или равна 10800 секунд (3 часа)
output_filtered = output_filtered.filter(F.col("duration") >= 10800)

output_filtered.show()

+--------+--------+
|zip_code|duration|
+--------+--------+
|   94102|  464952|
|   95134|   82487|
|   84606|   14575|
|   80305|   74749|
|   60070|   26540|
|   91910|   20243|
|    2136|   16010|
|   11722|   12173|
|   29454|   14911|
|   94610|   76287|
|   94404|   63504|
|   80301|   36931|
|   94309|   18484|
|   97239|  193241|
|   94592|   26999|
|    7650|   20150|
|   92374|   17156|
|    2464|   27997|
|   11106|   13773|
|   93013|   25116|
+--------+--------+
only showing top 20 rows

