Spark лабораторные могут выполняться в Google Colab, наподобие того, как это сделано здесь https://colab.research.google.com/drive/1G894WS7ltIUTusWWmsCnF_zQhQqZCDOc.

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [3]:
import findspark

findspark.init()

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql as sql
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf, col, max, sum, countDistinct
from math import sin, cos, sqrt, atan2, radians

spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

Загрузка и просмотр данных

In [5]:
trips_df = spark.read.csv('trips.csv', header=True, sep=",")
trips_df.show(5)
trips_df.printSchema()

+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|  id|duration|     start_date|  start_station_name|start_station_id|       end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+----+--------+---------------+--------------------+----------------+---------------+--------------------+--------------+-------+-----------------+--------+
|4576|      63|           null|South Van Ness at...|              66|8/29/2013 14:14|South Van Ness at...|            66|    520|       Subscriber|   94127|
|4607|    null|8/29/2013 14:42|  San Jose City Hall|              10|8/29/2013 14:43|  San Jose City Hall|            10|    661|       Subscriber|   95138|
|4130|      71|8/29/2013 10:16|Mountain View Cit...|              27|8/29/2013 10:17|Mountain View Cit...|            27|     48|       Subscriber|   97214|
|4251|      77|8/29/2013 11:29|  San Jose City Hall|      

In [4]:
stations_df = spark.read.csv('stations.csv', header=True, sep=",")
stations_df.show(5)
stations_df.printSchema()

+---+--------------------+------------------+-------------------+----------+--------+-----------------+
| id|                name|               lat|               long|dock_count|    city|installation_date|
+---+--------------------+------------------+-------------------+----------+--------+-----------------+
|  2|San Jose Diridon ...|         37.329732|-121.90178200000001|        27|San Jose|         8/6/2013|
|  3|San Jose Civic Ce...|         37.330698|        -121.888979|        15|San Jose|         8/5/2013|
|  4|Santa Clara at Al...|         37.333988|        -121.894902|        11|San Jose|         8/6/2013|
|  5|    Adobe on Almaden|         37.331415|          -121.8932|        19|San Jose|         8/5/2013|
|  6|    San Pedro Square|37.336721000000004|        -121.894074|        15|San Jose|         8/7/2013|
+---+--------------------+------------------+-------------------+----------+--------+-----------------+
only showing top 5 rows

root
 |-- id: string (nullable = true)


**Решите следующие задачи для данных велопарковок Сан-Франциско (trips.csv, stations.csv):**

1. Найти велосипед с максимальным временем пробега.
2. Найти наибольшее геодезическое расстояние между станциями.
3. Найти путь велосипеда с максимальным временем пробега через станции.
4. Найти количество велосипедов в системе.
5. Найти пользователей потративших на поездки более 3 часов.



In [6]:
# Найти велосипед с максимальным временем пробега.
from pyspark.sql import functions as F

#для каждого велосипеда считается суммарное время пробега
total_trips_bike = trips_df.groupBy("bike_id").agg(sum(col("duration")).alias("total_trips_duration"))
#велосипед с максимальным пробегом
bike_max_trips = total_trips_bike.orderBy(F.desc("total_trips_duration")).first()

#получение id велосипеда и значение пробега
total_id = bike_max_trips["bike_id"]
total_duration = bike_max_trips["total_trips_duration"]

print("Bike", total_id, "has total duration ", total_duration)

Bike 535 has total duration  18611693.0


In [16]:
#Найти наибольшее геодезическое расстояние между станциями.

#функция вычисления геодезического расстояния
def geodesic_distance(lat1, lon1, lat2, lon2):
    R = 6373.0
    lat1 = radians(float(lat1))
    lat2 = radians(float(lat2))
    lon1 = radians(float(lon1))
    lon2 = radians(float(lon2))
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    distance = R * c
    return distance

#конвертация в pyspark.sql.functions.udf
geodesic_distance_udf = udf(geodesic_distance, DoubleType())

#получение всех возможных пар станций
station_pairs = stations_df.alias("station_1").crossJoin(stations_df.alias("station_2"))

#вычисление расстояния для каждой пары станций
station_pairs_with_distance = station_pairs.withColumn(
    "geodesic_distance",
    geodesic_distance_udf(col("station_1.lat"), col("station_1.long"), col("station_2.lat"), col("station_2.long"))
)

#поиск максимального геодезического расстояния среди всех пар станций
max_distance = station_pairs_with_distance.selectExpr("max(geodesic_distance) as max_distance").collect()[0]["max_distance"]
print("Максимальное геодезическое расстояние:", max_distance)

Максимальное геодезическое расстояние: 69.9428256877473


In [15]:
#Найти путь велосипеда с максимальным временем пробега через станции.

trip_max_duration = trips_df.select("start_station_name", "end_station_name", "duration").orderBy(F.desc("duration")).first()

start = trip_max_duration["start_station_name"]
end = trip_max_duration["end_station_name"]
time = trip_max_duration["duration"]

print("Самая длинная поездка =", time, "от", start,  "до", end)

Самая длинная поездка = 99993 от St James Park до San Pedro Square


In [8]:
#Найти количество велосипедов в системе.

#подсчет уникальных id
unq_bikes = trips_df.select("bike_id").distinct().count()
print("Всего велосипедов:", unq_bikes)

Всего велосипедов: 700


In [11]:
#Найти пользователей потративших на поездки более 3 часов.

users_total_time = trips_df.groupBy("bike_id").agg(sum(col("duration")).alias("total_trips_duration"))
users_total_time.filter("total_trips_duration>10800").show()

+-------+--------------------+
|bike_id|total_trips_duration|
+-------+--------------------+
|    675|            370599.0|
|    467|            912341.0|
|    296|            486909.0|
|    691|            295372.0|
|    125|            157947.0|
|    451|           1695574.0|
|    666|            167414.0|
|    124|            365040.0|
|    447|           1547611.0|
|    591|           2077782.0|
|     51|            388170.0|
|    574|           1944036.0|
|    613|           2409014.0|
|    307|            365382.0|
|    475|           1165854.0|
|    334|           1896004.0|
|    544|           1633451.0|
|    577|           1637572.0|
|    581|           1819832.0|
|    205|            154952.0|
+-------+--------------------+
only showing top 20 rows

