In [10]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz

In [35]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [36]:
!pip install findspark
import findspark
findspark.init()



# Инициализация сессии

In [37]:
from pyspark import SparkContext, SparkConf
import pyspark.sql as sql
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf, col, max, sum, countDistinct

In [38]:
spark = SparkSession \
    .builder \
    .appName("L1_interactive_bike_analysis") \
    .getOrCreate()

In [39]:
spark.version

'3.1.1'

# Загрузка данных

In [47]:
import os
data_path = os.path.join(os.curdir, "data")
trips_path = os.path.join("trips.csv")
stations_path = os.path.join("stations.csv")

In [41]:
trip_data = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv(trips_path)

print("Trips")
trip_data.printSchema()

stations_data = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv(stations_path)

print("Stations")
stations_data.printSchema()

Trips
root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)

Stations
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: string (nullable = true)



# Решите следующие задачи для данных велопарковок Сан-Франциско (trips.csv, stations.csv):
1. Найти велосипед с максимальным временем пробега.
2. Найти наибольшее геодезическое расстояние между станциями.
3. Найти путь велосипеда с максимальным временем пробега через станции.
4. Найти количество велосипедов в системе.
5. Найти пользователей потративших на поездки более 3 часов.

# 1 задание

Найти велосипед с максимальным временем пробега

In [45]:
# Группировка по id велосипеда и применение функции sum для подсчета времени пробега каждого велосипеда
max_trips_duration_per_bike = trip_data.groupBy("bike_id").agg(sum(col("duration")).alias("total_trips_duration"))

# Выбор велосипеда с максимальным пробегом
bike_with_max_trips_duration = max_trips_duration_per_bike.orderBy(col("total_trips_duration").desc()).first()

bike_id_with_max_duration = bike_with_max_trips_duration["bike_id"]
total_duration = bike_with_max_trips_duration["total_trips_duration"]

print(f"Велосипед #{bike_id_with_max_duration} с суммарным временем пробега  = {total_duration}")

Велосипед #535 с суммарным временем пробега  = 18611693


# 2 задание

Найти наибольшее геодезическое расстояние между станциями

In [48]:
from math import sin, cos, sqrt, atan2, radians

def geodesic_distance(lat1, lon1, lat2, lon2):
    # Радиус Земли в километрах
    R = 6373.0
    lat1 = radians(lat1)
    lat2 = radians(lat2)
    lon1 = radians(lon1)
    lon2 = radians(lon2)

    dlon = lon2 - lon1
    dlat = lat2 - lat1

    # Вычисление геодезического расстояния по формуле Хаверсина
    a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    distance = R * c
    return distance

# Конвертация функции в pyspark.sql.functions.udf (user-defined function)
geodesic_distance_udf = udf(geodesic_distance, DoubleType())

# Объединение датасета станций с самим собой для получения всех возможных пар
station_pairs = stations_data.alias("station1").crossJoin(stations_data.alias("station2"))

# Вычисление расстояния для каждой пары станций с помощью объявленной ранее функции
station_pairs_with_distance = station_pairs.withColumn(
    "geodesic_distance",
    geodesic_distance_udf(
        col("station1.lat"),
        col("station1.long"),
        col("station2.lat"),
        col("station2.long")
    )
)

# Поиск максимального геодезического расстояния среди всех расстояний для каждой пары станций
max_distance = station_pairs_with_distance.selectExpr("max(geodesic_distance) as max_distance").collect()[0]["max_distance"]

print(f"Максимальное геодезическое расстояние между станциями равно {max_distance} километрам")

Максимальное геодезическое расстояние между станциями равно 69.9428256877473 километрам


# 3 задание

Найти путь велосипеда с максимальным временем пробега через станции

In [27]:
# Сортировка по столбцу duration и выбор наиболее длительной поездки
trip_with_max_duration = trip_data.select("start_station_name", "end_station_name", "duration").orderBy(col("duration").desc()).first()

# Получение стартовой и конечной станций, а также времени поездки
start_location = trip_with_max_duration["start_station_name"]
end_location = trip_with_max_duration["end_station_name"]
trip_time = trip_with_max_duration["duration"]

print(f"Самая длинная поездка ({trip_time} секунд)  from \"{start_location}\" to \"{end_location}\"")

Самая длинная поездка (17270400 секунд)  from "South Van Ness at Market" to "2nd at Folsom"


# 4 задание

Найти количество велосипедов в системе

In [46]:
# Группировка по id велосипеда и подсчет уникальных значений id
unique_bikes_count = trip_data.agg(countDistinct("bike_id").alias("bike_count")).collect()[0]["bike_count"]

print(f"Суммарное количество велосипедов: {unique_bikes_count}")

Суммарное количество велосипедов: 700


# 5 задание

Найти пользователей потративших на поездки более 3 часов

In [29]:
# Группировка по id велосипеда и подсчет общего времени, проведенного в поездке
users_with_total_trip_time = trip_data.groupBy("bike_id").sum("duration").withColumnRenamed("sum(duration)", "total_time")
users_with_total_trip_time.filter("total_time>10800").show()

+-------+----------+
|bike_id|total_time|
+-------+----------+
|    471|   1718831|
|    496|   1679568|
|    148|    332138|
|    463|   1722796|
|    540|   1752835|
|    392|   1789476|
|    623|   2037219|
|    243|    307458|
|    516|   1896751|
|     31|    407907|
|    580|   1034382|
|    137|   1529200|
|    251|   1282980|
|    451|   1695574|
|     85|   1214769|
|    458|   1647080|
|     65|    216922|
|    588|    266415|
|    255|    396395|
|     53|    226389|
+-------+----------+
only showing top 20 rows

