In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

In [46]:
import findspark
findspark.init()

In [47]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql as sql
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf, col, max, sum, countDistinct, desc

Инициализация сессии

In [48]:
spark = SparkSession \
    .builder \
    .appName("L1_BD_Spark") \
    .getOrCreate()

In [49]:
spark.version

'3.5.1'

Загрузка данных

In [50]:
tripData = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y')\
.csv("trips.csv")

tripData.printSchema()

root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: string (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: string (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)



In [51]:
tripData.show(n=5)

+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|  id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|4576|      63|           NULL|South Van Ness at...|              66|8/29/2013 14:14|South Van Ness at...|            66|    520|       Subscriber|   94127|
|4607|    NULL|8/29/2013 14:42|  San Jose City Hall|              10|8/29/2013 14:43|  San Jose City Hall|            10|    661|       Subscriber|   95138|
|4130|      71|8/29/2013 10:16|Mountain View Cit...|              27|8/29/2013 10:17|Mountain View Cit...|            27|     48|       Subscriber|   97214|
|4251|      77|8/29/2013 11:29|  San Jose City Hall|      

In [52]:
stationData = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y')\
.csv("stations.csv")

stationData.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: timestamp (nullable = true)



In [53]:
stationData.show(n=5)

+---+--------------------+------------------+-------------------+----------+--------+-------------------+
| id|                name|               lat|               long|dock_count|    city|  installation_date|
+---+--------------------+------------------+-------------------+----------+--------+-------------------+
|  2|San Jose Diridon ...|         37.329732|-121.90178200000001|        27|San Jose|2013-08-06 00:00:00|
|  3|San Jose Civic Ce...|         37.330698|        -121.888979|        15|San Jose|2013-08-05 00:00:00|
|  4|Santa Clara at Al...|         37.333988|        -121.894902|        11|San Jose|2013-08-06 00:00:00|
|  5|    Adobe on Almaden|         37.331415|          -121.8932|        19|San Jose|2013-08-05 00:00:00|
|  6|    San Pedro Square|37.336721000000004|        -121.894074|        15|San Jose|2013-08-07 00:00:00|
+---+--------------------+------------------+-------------------+----------+--------+-------------------+
only showing top 5 rows



# Решите следующие задачи для данных велопарковок Сан-Франциско

## 1. Найти велосипед с максимальным временем пробега.

In [54]:
# сгруппируем по bike_id, суммируем по времени пробега, сортируем по убыванию полученного пробега
# берем 1 значение из полученного списка
bike_max = tripData.groupBy("bike_id").agg({"duration": "sum"}).sort("sum(duration)", ascending=False).first()

print(f"Велосипед { bike_max['bike_id'] } имеет самое большое время пробега = {bike_max['sum(duration)']}")


Велосипед 535 имеет самое большое время пробега = 18611693


## 2. Найти наибольшее геодезическое расстояние между станциями.

In [55]:
!pip install haversine



In [56]:
# для вычисления расстояния воспользуемся формулой гаверсинуса (Википедия)
from haversine import haversine

# используем user-defined function и найдем расстояние между 2 точками
distance_udf = udf(lambda lat1, lon1, lat2, lon2: haversine((lat1, lon1), (lat2, lon2)), returnType=DoubleType())

# объединим самого с собой для получения всех пар точек
station_pairs = stationData.alias("station1").crossJoin(stationData.alias("station2"))

# вычисление расстояния для каждой пары станций с помощью distance_udf (внутри нее ф-ла гаверсинуса)
station_pairs_dist = station_pairs.withColumn(
    "geodesic_distance",
    distance_udf(
        col("station1.lat"),
        col("station1.long"),
        col("station2.lat"),
        col("station2.long")
    )
)

# поиск максимального расстояния среди всех расстояний для каждой пары станций
max_distance = station_pairs_dist.selectExpr("max(geodesic_distance) as max_distance").collect()[0]["max_distance"]

print(f"Максимальное расстояние: {max_distance} км")

Максимальное расстояние: 69.92097253310907 км


## 3. Найти путь велосипеда с максимальным временем пробега через станции.

In [57]:
# сортируем по пробегу и выбираем первый из списка
longest_trip = tripData.sort("duration", ascending=False).first()

print(f"Максимальный путь из {longest_trip['start_station_name']} в {longest_trip['end_station_name']} \
проехал велосипед {longest_trip['bike_id']}. Дистанция = {longest_trip['duration']}")

Максимальный путь из South Van Ness at Market в 2nd at Folsom проехал велосипед 535. Дистанция = 17270400


## 4. Найти количество велосипедов в системе.

In [58]:
# считаем уникальные значения и выводим
number_of_bikes = tripData.agg(countDistinct("bike_id")).first()[0]
print(f"Всего велосипедов в системе - {number_of_bikes}")

Всего велосипедов в системе - 700


## 5. Найти пользователей потративших на поездки более 3 часов.

In [59]:
# группируем по пользователям, вычисляем общее время поездок, применяем фильтр на больше 3х часов (10800 секунд)
users_over_3h = (tripData
    .groupBy("zip_code")
    .agg(sum("duration").alias("total_time"))
    .filter("total_time > 3*60*60")
    )

users_over_3h.show()

+--------+----------+
|zip_code|total_time|
+--------+----------+
|   94102|  19128021|
|   95134|    728023|
|   84606|     95145|
|   80305|    180906|
|   60070|     28919|
|   95519|     30303|
|   43085|     11670|
|   91910|     50488|
|   77339|     13713|
|   48063|     13755|
|   85022|     12682|
|    1090|     20391|
|    2136|     16010|
|   11722|     24331|
|   95138|    155295|
|   94610|   3630628|
|   94404|   3589350|
|   80301|    152189|
|   91326|     65885|
|   90742|     10965|
+--------+----------+
only showing top 20 rows

