In [None]:
!pip3 install pyspark==3.0.0
!pip install geopy

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from geopy.distance import geodesic
from pyspark.sql.functions import countDistinct
import pyspark.sql as sql
from pyspark.sql.functions import col

In [None]:
try:
    sc = SparkContext.getOrCreate()
    sc.setLogLevel("ERROR")
except:
    conf = SparkConf().setAppName("Lab1").setMaster('local[1]')
    sc = SparkContext(conf=conf)
    sc.setLogLevel("ERROR")
spark = SparkSession(sc)

sc

In [None]:
import requests

with open("stations.csv", "wb") as f:
    request = requests.get("https://git.ai.ssau.ru/tk/big_data/raw/branch/bachelor/data/stations.csv")
    f.write(request.content)

with open("trips.csv", "wb") as f:
    request = requests.get("https://git.ai.ssau.ru/tk/big_data/raw/branch/bachelor/data/trips.csv")
    f.write(request.content)

In [None]:
tripData = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("timestampFormat", 'M/d/y H:m')
    .csv("trips.csv")
)

print(tripData)

DataFrame[id: int, duration: int, start_date: timestamp, start_station_name: string, start_station_id: int, end_date: timestamp, end_station_name: string, end_station_id: int, bike_id: int, subscription_type: string, zip_code: string]


In [None]:
stationData = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("timestampFormat", 'M/d/y')
    .csv("stations.csv")
)

print(stationData)

DataFrame[id: int, name: string, lat: double, long: double, dock_count: int, city: string, installation_date: timestamp]


Задание №1: Найти велосипед с максимальным временем пробега

In [None]:
result = (
    tripData
    .groupBy("bike_id")
    .agg(F.sum("duration").alias("total_duration"))
    .orderBy(F.desc("total_duration"))
    .first()
)

print(result)

Row(bike_id=535, total_duration=18611693)


Задание 2: Найти наибольшее геодезическое расстояние между станциями.

In [None]:
stations_coordinates = (
    stationData
    .select("id", "lat", "long")
    .withColumnRenamed("lat", "lat1")
    .withColumnRenamed("long", "long1")
).createOrReplaceTempView("stations_coordinates")


station_combinations = (
    stations_coordinates.alias("a")
    .crossJoin(stations_coordinates.alias("b"))
    .where(col("a.id") < col("b.id"))
    .select(col("a.id").alias("station1"), col("b.id").alias("station2"), "a.lat1", "a.long1", col("b.lat1").alias("lat2"), col("b.long1").alias("long2"))
)

def calculate_distance(lat1, lon1, lat2, lon2):
    return geodesic((lat1, lon1), (lat2, lon2)).kilometers

calculate_distance_udf = spark.udf.register("calculate_distance", calculate_distance)

result = station_combinations.withColumn("distance", calculate_distance_udf("lat1", "long1", "lat2", "long2"))

max_distance = result.select("station1", "station2", "distance").orderBy(col("distance").desc()).first()

max_distance

Row(station1=24, station2=36, distance='9.669526104642657')

In [None]:
max_duration_trip = (
    tripData
    .orderBy(col("duration").desc())
    .limit(1)
    .select("start_station_name", "end_station_name")
    .first()
)

filtered_joined_station = (
    stationData
    .filter((col("name") == max_duration_trip.start_station_name) | (col("name") == max_duration_trip.end_station_name))
)

result = (
    filtered_joined_station
    .crossJoin(filtered_joined_station.select(col("name").alias("end_station_name"), col("lat").alias("end_lat"), col("long").alias("end_long")))
    .withColumn("distance", calculate_distance_udf(col("lat"), col("long"), col("end_lat"), col("end_long")))
    .select("name", "end_station_name", "distance")
    .filter((col("name") != col("end_station_name")) & (col("distance") != 0))
)

result.show()

+--------------------+--------------------+------------------+
|                name|    end_station_name|          distance|
+--------------------+--------------------+------------------+
|       2nd at Folsom|South Van Ness at...|2.3150845505323323|
|South Van Ness at...|       2nd at Folsom|2.3150845505323323|
+--------------------+--------------------+------------------+



Задание 4. Найти количество велосипедов в системе.

In [None]:
bike_count = (
    tripData
    .select(countDistinct("bike_id"))
    .first()
)

bike_count

Row(count(DISTINCT bike_id)=700)

Задание 5. Найти пользователей потративших на поездки более 3 часов.

In [None]:
from pyspark.sql.functions import sum as F_sum

grouped_data = (
    tripData
    .groupBy("id")
    .agg(F_sum("duration").alias("sum_duration"))
    .withColumnRenamed("sum_duration", "sum_duration")
)

filtered_data = grouped_data.filter(col("sum_duration") > 10800)

filtered_data.show()

+------+------------+
|    id|sum_duration|
+------+------------+
|  6654|       17751|
| 22097|       21686|
| 22223|       15619|
| 30654|       13479|
| 34759|       17959|
| 43688|       22504|
| 88666|       21964|
| 88674|       13726|
|105536|       19854|
|143153|       20649|
|146988|       44084|
|189310|       21785|
|431881|       28377|
|431018|       12301|
|427387|       12612|
|418759|       15526|
|418461|       15103|
|410754|       16743|
|386707|       14313|
|305619|       12412|
+------+------------+
only showing top 20 rows



In [None]:
sc.stop()