In [1]:
from pyspark import SparkContext, SparkConf
import os
import pyspark.sql as sql
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf, col, max, sum, countDistinct

## Инициализация сессии

In [2]:
spark = SparkSession.builder.appName("Lab1_Interactive_bike_analysis").getOrCreate()

PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

In [4]:
spark.version

NameError: name 'spark' is not defined

### Загрузка данных

In [67]:
data_path = os.path.join(os.curdir, "data")
trips_path = os.path.join(data_path, "trips.csv")
stations_path = os.path.join(data_path, "stations.csv")

In [68]:
trip_data = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .option("timestampFormat", "M/d/y H:m")
    .csv(trips_path)
)

stations_data = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .option("timestampFormat", "M/d/y H:m")
    .csv(stations_path)
)

print("Trips")
trip_data.printSchema()
print("Stations")
stations_data.printSchema()

Trips
root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)

Stations
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: string (nullable = true)



### Задание 1. Найти велосипед с максимальным временем пробега

In [69]:
# Группировка по id велосипеда и подсчет вермени пробега каждого велосипеда
max_trip_bike = trip_data.groupBy("bike_id").agg(
    sum(col("duration")).alias("total_trips_duration")
)


# Выбор велосипеда с максимальным пробегом
bike_with_max_duration = max_trip_bike.orderBy(
    col("total_trips_duration").desc()
).first()

# bike_id
bike_id = bike_with_max_duration["bike_id"]

# Значение пробега
duration = bike_with_max_duration["total_trips_duration"]

print(f"Велосипед: {bike_id} с максимальным пробегом: {duration}")

Велосипед: 535 с максимальным пробегом: 18611693


### Задание 2. Найти наибольшее геодезическое расстояние между станциями

In [70]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType


# Функция для вычисления расстояния между станциями
def distance(lat1, long1, lat2, long2):
    return ((lat1 - lat2) ** 2 + (long1 - long2) ** 2) ** 0.5


# Конвертация функции в pyspark.sql.functions.udf
distance_udf = udf(distance, DoubleType())

# Создание различных комбинаций станций
station_combinations = stations_data.alias("station1").crossJoin(
    stations_data.alias("station2")
)

# Вычилсение расстояния для каждой пары станции
joined_stations = station_combinations.withColumn(
    "distance",
    distance_udf(
        col("station1.lat"),
        col("station1.long"),
        col("station2.lat"),
        col("station2.long"),
    ),
)

# Поиск максимального расстояния
max_distance = joined_stations.agg({"distance": "max"}).collect()[0][0]

print(f"Maximum geodesic distance between stations: {max_distance}")

Maximum geodesic distance between stations: 0.7058482821754397


### 3. Найти путь велосипеда с максимальным временем пробега через станции

In [71]:
# Сортировка по столбцу duration и выбор наиболее длительной поездки
trip_max_distance = (
    trip_data.select("start_station_name", "end_station_name", "duration")
    .orderBy(col("duration").desc())
    .first()
)

# Начальная, конечная станция, а также время поездки
start_location = trip_max_distance["start_station_name"]
end_location = trip_max_distance["end_station_name"]
trip_time = trip_max_distance["duration"]

print(
    f"Максимальное время пробега {trip_time} секунд \n Из станции {start_location} на станцию {end_location}"
)

Максимальное время пробега 17270400 секунд 
 Из станции South Van Ness at Market на станцию 2nd at Folsom


### 4. Найти количество велосипедов в системе

In [72]:
bike_counts = trip_data.agg(countDistinct("bike_id").alias("bike_count")).collect()[0]["bike_count"]

print(f"Количество велосипедов в системе: {bike_counts}")

Количество велосипедов в системе: 700


### 5. Найти пользователей потративших на поездки более 3 часов

In [75]:
# Группировка по id велосипеда и подсчет общего времени, проведенного в поездке
users_with_total_trip_time = trip_data.groupBy("bike_id").sum("duration").withColumnRenamed("sum(duration)", "total_time")
users_with_total_trip_time.filter("total_time>10800").show()

+-------+----------+
|bike_id|total_time|
+-------+----------+
|    471|   1718831|
|    496|   1679568|
|    148|    332138|
|    463|   1722796|
|    540|   1752835|
|    392|   1789476|
|    623|   2037219|
|    516|   1896751|
|     31|    407907|
|    580|   1034382|
|    137|   1529200|
|    251|   1282980|
|    451|   1695574|
|    458|   1647080|
|     65|    216922|
|    588|    266415|
|    255|    396395|
|     53|    226389|
|    481|   1925143|
|    472|   1620686|
+-------+----------+
only showing top 20 rows

