In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz

In [2]:
import os
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [3]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [4]:
import findspark
findspark.init()

In [5]:
!pip3 install pyspark==3.0.0

Collecting pyspark==3.0.0
  Downloading pyspark-3.0.0.tar.gz (204.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m204.7/204.7 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9 (from pyspark==3.0.0)
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m15.4 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.0-py2.py3-none-any.whl size=205044159 sha256=9df1a1828b3f51b4a4230edc122d117554a7bb1f68a5c4815e7c6416ef8838a7
  Stored in directory: /root/.cache/pip/wheels/b1/bb/8b/ca24d3f756f2ed967225b0871898869db676eb5846df5adc56
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0

<h1><center>Инициализация</center></h1>

In [6]:
from pyspark import SparkContext, SparkConf
import pyspark.sql as sql
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf, col, max, sum, countDistinct

In [7]:
spark = SparkSession \
    .builder \
    .appName("L1_interactive_bike_analysis") \
    .getOrCreate()

<h1><center>Загрузка данных</center></h1>

In [8]:
import os
data_path = os.path.join(os.curdir, "data")
trips_path = os.path.join("trips.csv")
stations_path = os.path.join("stations.csv")

In [9]:
trips = spark.read \
        .option("header", True) \
        .option("inferSchema", True) \
        .option("DateTimeFormat", 'M/d/y H:m') \
        .csv(trips_path)

stat = spark.read \
        .option("header", True) \
        .option("inferSchema", True) \
        .option("DateTimeFormat", 'M/d/y H:m') \
        .csv(stations_path)

<a id='Задачи для данных велопарковок Сан-Франциско (trips.csv, stations.csv):'></a>
## Задачи для данных велопарковок Сан-Франциско (trips.csv, stations.csv):
><li>Найти велосипед с максимальным временем пробега.
><li>Найти наибольшее геодезическое расстояние между станциями.
><li>Найти путь велосипеда с максимальным временем пробега через станции.
><li>Найти количество велосипедов в системе.
><li>Найти пользователей потративших на поездки более 3 часов.

<h1><center>Найти велосипед с максимальным временем пробега</center></h1>

In [20]:
# Группируем данные по bike_id и считаем общую длительность поездок
trip_duration = trips.groupBy("bike_id").agg(sum(col("duration")).alias("duration"))
# Проверяем, что trip_duration не пустой
if trip_duration.count() > 0:
    # Получаем запись с максимальной длительностью поездки
    trips_duration = trip_duration.orderBy(col("duration").desc()).first()
    bike_id = trips_duration["bike_id"]
    duration = trips_duration["duration"]
    print(f"Велосипед {bike_id} с максимальным временем пробега {duration}")
else:
    print("Нет данных о поездках.")

Велосипед 593 с максимальным временем пробега 1074282


<h1><center>Найти наибольшее геодезическое расстояние между станциями</center></h1>

In [28]:
from math import sqrt, radians, sin, cos, atan2

# Определение функции для расчета геодезического расстояния между двумя точками
def distance(lat1, lon1, lat2, lon2):
    # Радиус Земли в км
    R = 6373

    # Преобразование градусов в радианы
    lat1_rad = radians(lat1)
    lon1_rad = radians(lon1)
    lat2_rad = radians(lat2)
    lon2_rad = radians(lon2)

    # Разница координат
    dlon = lon2_rad - lon1_rad
    dlat = lat2_rad - lat1_rad

    # Формула Гаверсинуса для расчета расстояния
    a = sin(dlat / 2)**2 + cos(lat1_rad) * cos(lat2_rad) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    distance = R * c

    return distance

g_dist = udf(distance, DoubleType())

station = stat.alias("station1").crossJoin(stat.alias("station2"))

# Вычисление геодезических расстояний между всеми парами станций
station_distance = station.withColumn("geodesic_distance", g_dist(col("station1.lat"), col("station1.long"), col("station2.lat"), col("station2.long")))

# Нахождение максимального геодезического расстояния
dist = station_distance.selectExpr("max(geodesic_distance) as max_distance").collect()[0]["max_distance"]
print(f"Наибольшее геодезическое расстояние между станциями {dist}")


Самая длинная поездка из "University and Emerson" в "University and Emerson" заняла 722236 секунд


<h1>Найти путь велосипеда с максимальным временем пробега через станции</h1>

In [25]:
# Сортируем поездки по длительности и выбираем наиболее длительную
longest_trip = trips.select("start_station_name", "end_station_name", "duration").orderBy(col("duration").desc()).first()

start_station = longest_trip["start_station_name"]
end_station = longest_trip["end_station_name"]
trip_duration = longest_trip["duration"]

print(f"Путь из \"{start_station}\" в \"{end_station}\" занял максимальное время пробега {trip_duration} секунд")


Путь из "University and Emerson" в "University and Emerson" занял максимальное время пробега 722236 секунд


<h1><center>Найти количество велосипедов в системе</center></h1>

In [14]:
# Группировка по идентификатору велосипеда и подсчет уникальных значений идентификатора
count = trips.agg(countDistinct("bike_id").alias("bike_count")).collect()[0]["bike_count"]
print(f"Количество велосипедов в системе: {count}")

Количество велосипедов в системе: 698


<h1><center>Найти пользователей потративших на поездки более 3 часов</center></h1>

In [27]:
# Группируем поездки по идентификатору велосипеда и считаем общее время поездок
bike_trips = trips.groupBy("bike_id").sum("duration").withColumnRenamed("sum(duration)", "total_duration")

# Фильтруем результаты, чтобы найти велосипеды с общим временем поездок более 10800 секунд (3 часа)
long_trips = bike_trips.filter("total_duration > 10800")

# Выводим информацию о велосипедах с длительными поездками
long_trips.show()

+-------+--------------+
|bike_id|total_duration|
+-------+--------------+
|    471|        413707|
|    496|        509079|
|    148|        196805|
|    463|        453005|
|    540|        631175|
|    392|        532104|
|    623|        706208|
|    243|        131043|
|    516|        532087|
|     31|         73874|
|    580|        627818|
|    137|        260436|
|    251|        127062|
|    451|        528162|
|     85|        109338|
|    458|        577151|
|     65|         57529|
|    588|        266415|
|    255|        107686|
|     53|         58229|
+-------+--------------+
only showing top 20 rows

