In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [3]:
!pip install findspark
import findspark
findspark.init()



In [4]:
!pip3 install pyspark==3.0.0



In [5]:
from pyspark import SparkContext, SparkConf
import pyspark.sql as sql
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf, col, max, sum, countDistinct

In [6]:
# Инициализация сессии
spark = SparkSession \
    .builder \
    .appName("L1_interactive_bike_analysis") \
    .getOrCreate()

In [7]:
# Загружаем данные
import os
data_path = os.path.join(os.curdir, "data")
trips_path = os.path.join("trips.csv")
stations_path = os.path.join("stations.csv")

In [8]:
trips = spark.read \
        .option("header", True) \
        .option("inferSchema", True) \
        .option("DateTimeFormat", 'M/d/y H:m') \
        .csv(trips_path)

stat = spark.read \
        .option("header", True) \
        .option("inferSchema", True) \
        .option("DateTimeFormat", 'M/d/y H:m') \
        .csv(stations_path)

#Задачи для данных велопарковок Сан-Франциско (trips.csv, stations.csv):



*   Найти велосипед с максимальным временем пробега.
*   Найти наибольшее геодезическое расстояние между станциями.
*   Найти путь велосипеда с максимальным временем пробега через станции.
*   Найти количество велосипедов в системе.
*   Найти пользователей потративших на поездки более 3 часов.









**Найти велосипед с максимальным временем пробега.**

In [9]:
# Группируем велосипеды по id и считаем время пробега для каждого велосипеда применяя функцию sum
max_trip_duration = trips.groupBy("bike_id").agg(sum(col("duration")).alias("duration"))

# Выбираем велосипед с максимальным пробегом
trips_duration = max_trip_duration.orderBy(col("duration").desc()).first()
bike_id = trips_duration["bike_id"]
duration = trips_duration["duration"]
print(f"Велосипед № {bike_id} с максимальным временем пробега = {duration} километров")

Велосипед № 535 с максимальным временем пробега = 18611693 километров


**Найти наибольшее геодезическое расстояние между станциями.**

In [10]:
from math import sqrt, radians, sin, cos, atan2

In [25]:
def dist(lat1, long1, lat2, long2):
    R = 6373 # радиус Земли
    lat1 = radians(lat1)
    lat2 = radians(lat2)
    long1 = radians(long1)
    long2 = radians(long2)
    # Вычисляем геодезическое расстояние
    dist = R * (2 * atan2(sqrt(sin((lat2 - lat1) / 2)**2 + cos(lat1) * cos(lat2) * sin((long2 - long1) / 2)**2), sqrt(1 - (sin((lat2 - lat1) / 2)**2 + cos(lat1) * cos(lat2) * sin((long2 - long1) / 2)**2))))
    return dist

# в pyspark.sql.functions.udf
geo_dist = udf(dist, DoubleType())
# Объединяем станции с самими собой для получения всевозможных пар
station = stat.alias("station1").crossJoin(stat.alias("station2"))
# Вычисляем расстояние для каждой пары станций
station_dist = station.withColumn("geodesic_distance", geo_dist(col("station1.lat"), col("station1.long"), col("station2.lat"), col("station2.long")))
# Максимальное геодезическое расстояние для каждой пары станций
dist = station_dist.selectExpr("max(geodesic_distance) as max_distance").collect()[0]["max_distance"]
print(f"Наибольшее геодезическое расстояние между станциями = {dist} километров")

Наибольшее геодезическое расстояние между станциями = 69.9428256877473 километров


**Найти путь велосипеда с максимальным временем пробега через станции.**

In [24]:
# Выбираем самую длительную поездку
duration = trips.select("start_station_name", "end_station_name", "duration").orderBy(col("duration").desc()).first()
first_duration = duration["start_station_name"] # начальная станция
second_duration = duration["end_station_name"] # конечная станция
all_time = duration["duration"] # время поездки
print(f"Самый максимальный пробег через станции из пункта \"{first_duration}\" в пункт \"{second_duration}\" занял {all_time} секунд")

Самый максимальный пробег через станции из пункта "South Van Ness at Market" в пункт "2nd at Folsom" занял 17270400 секунд


**Найти количество велосипедов в системе.**

In [22]:
# Группируем по id велосипеда и считаем уникальные значения id
count_bikes = trips.agg(countDistinct("bike_id").alias("bike_count")).collect()[0]["bike_count"]
print(f"Количество велосипедов в системе = {count_bikes} штук")

Количество велосипедов в системе = 700 штук


**Найти пользователей потративших на поездки более 3 часов.**

In [23]:
# Группируем велосипеды по id и считаем общее время поездки
users_total_time = trips.groupBy("bike_id").sum("duration").withColumnRenamed("sum(duration)", "time")
# Устанавливаем фильтр (в секундах) больше 3 часов
users_total_time.filter("time>10800").show()

+-------+-------+
|bike_id|   time|
+-------+-------+
|    471|1718831|
|    496|1679568|
|    148| 332138|
|    463|1722796|
|    540|1752835|
|    392|1789476|
|    623|2037219|
|    243| 307458|
|    516|1896751|
|     31| 407907|
|    580|1034382|
|    137|1529200|
|    251|1282980|
|    451|1695574|
|     85|1214769|
|    458|1647080|
|     65| 216922|
|    588| 266415|
|    255| 396395|
|     53| 226389|
+-------+-------+
only showing top 20 rows

