# Развертывание Spark

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz
!tar xf spark-3.5.5-bin-hadoop3.tgz

In [3]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.5-bin-hadoop3"

!pip install findspark
import findspark
findspark.init()

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [5]:
import os
from pyspark import SparkContext, SparkConf
import pyspark.sql as sql
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf, col, max, sum, countDistinct

spark = SparkSession.builder.appName('ЛР1_Сидоров_6404').getOrCreate()
trips_path = os.path.join('trips.csv')
stations_path = os.path.join('stations.csv')

trip_data = spark.read.option("header", True).option("inferSchema", True).option("timestampFormat", 'M/d/y H:m').csv(trips_path)
print("Trips")
trip_data.printSchema()

stations_data = spark.read.option("header", True).option("inferSchema", True).option("timestampFormat", 'M/d/y H:m').csv(stations_path)
print("Stations")
stations_data.printSchema()

Trips
root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)

Stations
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: string (nullable = true)



# Решите следующие задачи для данных велопарковок Сан-Франциско (trips.csv, stations.csv):
1. Найти велосипед с максимальным временем пробега.
2. Найти наибольшее геодезическое расстояние между станциями.
3. Найти путь велосипеда с максимальным временем пробега через станции.
4. Найти количество велосипедов в системе.
5. Найти пользователей потративших на поездки более 3 часов.

## Задание 1

In [6]:
# Группируем по ID велосипеда, затем суммируем время пробега по велосипедам
# Сортируем по убыванию суммы и берем первый в этом списке

result = trip_data.groupBy('bike_id').agg(sum(col('duration')).alias('duration_sum')).orderBy(col('duration_sum').desc()).first()
print(f'Велосипед: {result["bike_id"]}. Время пробега: {result["duration_sum"]}')

Велосипед: 535. Время пробега: 18611693


## Задание 2

In [7]:
import math


def geodesic_distance(lat1, lon1, lat2, lon2):
    R = 6371.0 # Радиус Земли (км)
    lat1, lat2 = math.radians(lat1),  math.radians(lat2)

    delta_lon = math.radians(lon2 - lon1)
    delta_lat = lat2 - lat1

    # Вычисление геодезического расстояния по формуле Хаверсина
    a = math.sin(delta_lat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(delta_lon / 2) ** 2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    distance = R * c
    return distance


# Создаем пользовательскую функцию в Spark
geodesic_distance_udf = udf(geodesic_distance, DoubleType())

# Делаем декартово пересечение
station_pairs = stations_data.alias('station1').crossJoin(stations_data.alias('station2'))

# Вычисляем расстояние между каждой парой станций
station_pairs_with_distance = station_pairs.withColumn(
    'geodesic_distance',
    geodesic_distance_udf(
        col('station1.lat'),
        col('station1.long'),
        col('station2.lat'),
        col('station2.long')
    )
)

# Берем пару с наибольшим расстоянием
max_distance_stations = station_pairs_with_distance.sort('geodesic_distance', ascending=False).first()

# Нужно понять где начинается вторая станция в выборке
pair_step = len(stations_data.columns)
first_station = max_distance_stations[max_distance_stations.index(max_distance_stations['name'])]
second_station = max_distance_stations[max_distance_stations.index(max_distance_stations['name']) + pair_step]

print(f'Максимальное геодезическое расстояние между станциями равно {max_distance_stations["geodesic_distance"]} километрам между станциями {first_station} и {second_station}')


Максимальное геодезическое расстояние между станциями равно 69.92087595428244 километрам между станциями SJSU - San Salvador at 9th и Embarcadero at Sansome


## Задание 3

In [8]:
# Поездка с максимальным временем пробега
longest_trip = trip_data.sort('duration', ascending=False).first()
print(f'Максимальный путь: ({longest_trip["duration"]}). Велосипед: {longest_trip["bike_id"]}. {longest_trip["start_station_name"]} - {longest_trip["end_station_name"]}')

Максимальный путь: (17270400). Велосипед: 535. South Van Ness at Market - 2nd at Folsom


## Задание 4

In [9]:
# countDistinct считает уникальные значения
bikes_count = trip_data.select(countDistinct('bike_id')).first()[0]
print(f'Количество велосипедов в системе: {bikes_count}')

Количество велосипедов в системе: 700


## Задание 5

In [10]:
strong_users = trip_data.groupBy('bike_id').agg(sum(col('duration')).alias('duration_sum')).filter(col('duration_sum') > 3 * 60 * 60).orderBy(col('bike_id'))
strong_users.show()

+-------+------------+
|bike_id|duration_sum|
+-------+------------+
|      9|      913730|
|     10|      551314|
|     11|      315011|
|     12|      757912|
|     13|      949523|
|     14|      399114|
|     15|      831149|
|     16|     1334601|
|     17|      509406|
|     18|      500113|
|     19|      543930|
|     20|      263431|
|     21|      282836|
|     22|      936581|
|     23|      420393|
|     24|      293533|
|     25|      453322|
|     26|      236064|
|     27|      404937|
|     28|      342587|
+-------+------------+
only showing top 20 rows

