Импорт библиотек, подготовка к работе  данными

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp, sum as spark_sum, countDistinct, max as spark_max, udf
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import radians, cos, sin, sqrt, atan2
from geopy.distance import geodesic


Найти велосипед с максимальным временем пробега

In [None]:
# функция для запуска сессии Spark
spark = SparkSession.builder \
    .appName("lab1_bd") \
    .getOrCreate()

# чтение из файла trip.csv и создание DataFrame 
trip_df = spark.read.option("header", "true").csv("trip.csv")

# добавление в DataFrame столбцов из trip.csv
trips_with_time = trip_df.withColumn(
    "start_timestamp", unix_timestamp(col("start_date"), "M/d/yyyy H:mm")
).withColumn(
    "end_timestamp", unix_timestamp(col("end_date"), "M/d/yyyy H:mm")
).withColumn(
    "duration_minutes", (col("end_timestamp") - col("start_timestamp")) / 60
)

# группировка данных по bike_id и вычисление общей длительности поездок
bike_with_max_time = trips_with_time.groupBy("bike_id") \
    .agg(
        # суммируем в total_minutes duration_minutes каждого велосипеда
        spark_sum("duration_minutes").alias("total_minutes")
    ) \
    .orderBy(
        # сортировка по total_minutes в порядке убывания
        col("total_minutes").desc()
    # ограничение выборки одним элементом (с максимальным значением total_minutes)
    ) \
    .limit(1)  

bike_with_max_time.show()



+-------+-------------+
|bike_id|total_minutes|
+-------+-------------+
|    378|     631684.0|
+-------+-------------+



Найти наибольшее геодезическое расстояние между станциями

In [None]:
# чтение из файла station.csv и создание DataFrame 
station_df = spark.read.option("header", "true").csv("station.csv")

# приведение столбцов lat и long к double
stations_df = station_df.withColumn("lat", col("lat").cast("double")) \
                         .withColumn("long", col("long").cast("double"))

# извлечение столбцов id, lat, long
stations_list = stations_df.select("id", "lat", "long").collect()

max_distance = 0

for i in range(len(stations_list)):
    for j in range(i + 1, len(stations_list)):
        # вычисление расстояния между парами станций
        station1, station2 = stations_list[i], stations_list[j]
        distance = geodesic((station1["lat"], station1["long"]), (station2["lat"], station2["long"])).km
        max_distance = max(max_distance, distance)

max_distance


69.92096757764355

Найти путь велосипеда с максимальным временем пробега через станции

In [None]:
# получение id велосипеда с наибольшим временем
bike_with_max_time_id = bike_with_max_time.collect()[0]["bike_id"]

# сортировка поездок велосипеда с наибольшим временем по start_date
bike_trips = trips_with_time.filter(col("bike_id") == bike_with_max_time_id) \
    .select("start_date", "start_station_name", "end_station_name") \
    .orderBy("start_date")

bike_trips.show()

+---------------+--------------------+--------------------+
|     start_date|  start_station_name|    end_station_name|
+---------------+--------------------+--------------------+
|1/10/2014 13:15|Commercial at Mon...|       Market at 4th|
|1/10/2014 18:40|       Market at 4th|      Market at 10th|
|1/10/2014 22:24|       Market at 4th|Harry Bridges Pla...|
|1/10/2014 22:39|Harry Bridges Pla...|Embarcadero at Sa...|
|1/10/2014 23:13|Embarcadero at Sa...|       Market at 4th|
| 1/10/2014 7:38|San Francisco Cal...|South Van Ness at...|
| 1/10/2014 9:44|South Van Ness at...|Mechanics Plaza (...|
|1/11/2014 13:17|       Market at 4th|      Market at 10th|
| 1/11/2015 9:47|Temporary Transba...|       Market at 4th|
| 1/11/2015 9:47|Temporary Transba...|       Market at 4th|
|1/12/2014 17:04|      Market at 10th|       Market at 4th|
|1/12/2015 13:37|       2nd at Folsom|   2nd at South Park|
|1/12/2015 13:37|       2nd at Folsom|   2nd at South Park|
|1/12/2015 13:59|   2nd at South Park|  

Найти количество велосипедов в системе

In [None]:
# подсчет уникальных id
distinct_bike_count = trip_df.select("bike_id").distinct().count()

distinct_bike_count

730

Найти пользователей потративших на поездки более 3 часов

In [None]:
# группировка пользователей по атрибуту zip_code
users = trips_with_time.groupBy("zip_code")

# суммарные траты на поездки пользователей
user_time = users.agg(spark_sum("duration_minutes").alias("total_minutes"))

# выборка пользователей, у которых набралось больше 180 минут
users_over_3h = user_time.filter(col("total_minutes") > 180).show()

+--------+-------------+
|zip_code|total_minutes|
+--------+-------------+
|   94102|     549054.0|
|   95134|      22851.0|
|   84606|       2946.0|
|   80305|       5612.0|
|   28117|        236.0|
|   95519|        855.0|
|   91910|       1343.0|
|   60070|        483.0|
|   43085|        324.0|
|   77339|        349.0|
|   48063|        249.0|
|   85022|        418.0|
|   90022|        218.0|
|    1090|        680.0|
|   75602|        202.0|
|    2136|        532.0|
|   11722|        810.0|
|   11563|        676.0|
|   95138|       3891.0|
|   94610|     107376.0|
+--------+-------------+
only showing top 20 rows

