In [1]:
from pyspark import SparkContext, SparkConf

import pyspark.sql as sql
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf, col, max, sum, countDistinct

**Инициализация сесии**

In [2]:
spark = SparkSession \
    .builder \
    .appName("L1_Vakhlaeva_6131") \
    .getOrCreate()

In [3]:
spark.version

'3.1.2.0-eep-800'

**Просмотрим данные велопарковок Сан-Франциско**

In [5]:
trip = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv("trip.csv")

In [6]:
trip.printSchema()

root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)



In [7]:
station = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv("station.csv")

In [8]:
station.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: string (nullable = true)



**1. Найти велосипед с максимальным временем пробега**

In [75]:
max_duration_bike = trip.groupBy("bike_id").agg(max("duration").alias("max_duration")).orderBy(col("max_duration").desc()).first()

print("ID велосипеда с максимальным временем пробега:", max_duration_bike["bike_id"])
print("Максимальное время пробега:", max_duration_bike["max_duration"])

ID велосипеда с максимальным временем пробега: 535
Максимальное время пробега: 17270400


**2. Найти наибольшее геодезическое расстояние между станциями.**

Геодезическое расстояние - это расстояние, измеренное вдоль поверхности Земли, или кратчайшая длина дуги.

Чтобы рассчитать геодезическое расстоние создадим таблицу, в которой можно будет рассматривать все возможные комбинации 

In [31]:
station_joined = station.alias("st1").crossJoin(station.alias("st2"))

In [19]:
R_of_earth = 6371.0 

Расчет расстояния был выполнен, используя следующий источник: https://en.wikipedia.org/wiki/Haversine_formula 

In [44]:
from math import sin, cos, radians, asin 

def geodesic_distance(lat1, lon1, lat2, lon2):
    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return None
    rlat1 = radians(lat1)
    rlat2 = radians(lat2)
    rlon1 = radians(lon1)
    rlon2 = radians(lon2)
    
    a = sin((rlat2 - rlat1) / 2)**2 + cos(rlat1) * cos(rlat2) * sin((rlon2 - rlon1) / 2)**2
    S = 2 * asin(min(1, (a ** 0.5)))

    return R_of_earth * S

geodesic_distance_udf = udf(geodesic_distance, DoubleType())

In [46]:
# Вычисление расстояния для каждой пары станций с помощью объявленной ранее функции
station_joined_distance = station_joined.withColumn(
    "geodesic_distance",
    geodesic_distance_udf(
        col("st1.lat"),
        col("st1.long"),
        col("st2.lat"),
        col("st2.long")
    )
)

#Нахождение максимального геодезического расстояния
max_distance = station_joined_distance.orderBy(col("geodesic_distance").desc()).first()
print("Максимальное геодезическое расстояние:", max_distance["geodesic_distance"], 'км. ')

Максимальное геодезическое расстояние: 69.92087595428183 км. 


**3. Найти путь велосипеда с максимальным временем пробега через станции**

In [49]:
#Сначала определим максимальное время пробега и bike_id, который его достиг
max_duration_info = trip.agg({"duration": "max"}).collect()[0]
max_duration = max_duration_info["max(duration)"]

bike_id_ = (
    trip.filter(col("duration") == max_duration)
    .select("bike_id")
    .distinct()
    .collect()[0]["bike_id"]
)

In [51]:
#Поездки для bike_id, у которого максимальное время пробега
max_duration_trips = trip.filter(col("bike_id") == bike_id_)

#Склеивание таблиц по начальным и конечным станциям
joined_max_duration = max_duration_trips.join(station, max_duration_trips["start_station_id"] == station["id"], "inner")

In [57]:
#Теперь считаем путь для bike_id. 
max_duration_path = (
    trip.filter(col("bike_id") == bike_id_)
    .join(station, trip["start_station_id"] == station["id"], "inner")
    .select(
        "start_station_name",
        "start_station_id",
        "end_station_name",
        "end_station_id",
        "duration"
    )
    .orderBy(col("duration").desc())
)

max_duration_path.show(truncate=False)

+----------------------------------------+----------------+----------------------------------------+--------------+--------+
|start_station_name                      |start_station_id|end_station_name                        |end_station_id|duration|
+----------------------------------------+----------------+----------------------------------------+--------------+--------+
|South Van Ness at Market                |66              |2nd at Folsom                           |62            |17270400|
|South Van Ness at Market                |66              |2nd at Folsom                           |62            |17270400|
|Powell Street BART                      |39              |Civic Center BART (7th at Market)       |72            |87638   |
|San Francisco Caltrain (Townsend at 4th)|70              |Steuart at Market                       |74            |33659   |
|San Francisco Caltrain (Townsend at 4th)|70              |Steuart at Market                       |74            |33659   |


Путь велосипеда с максимальным временем пробега через станции начался со станции "South Van Ness at Market" и закончился - "2nd at Folsom"

**4. Найти количество велосипедов в системе**

In [62]:
trip.select("bike_id").distinct().count()

700

**5. Найти пользователей потративших на поездки более 3 часов**

In [73]:
trips_more_3_hours = trip.filter(col("duration") > 180*60)
users_ = trips_more_3_hours.select("subscription_type", "zip_code").distinct()

users_.show(truncate=False)
users_.count()

+-----------------+--------+
|subscription_type|zip_code|
+-----------------+--------+
|Customer         |92629   |
|Customer         |95112   |
|Customer         |32      |
|Customer         |91766   |
|Customer         |94002   |
|Subscriber       |95111   |
|Customer         |94304   |
|Customer         |78050   |
|Customer         |1325    |
|Customer         |78666   |
|Customer         |123543  |
|Customer         |60656   |
|Customer         |5160011 |
|Customer         |8203    |
|Customer         |30542   |
|Subscriber       |94544   |
|Customer         |95032   |
|Customer         |302     |
|Customer         |2145    |
|Customer         |80111   |
+-----------------+--------+
only showing top 20 rows



2215