
In [157]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max, sqrt, pow, radians, sin, cos, asin, sum as func_sum

spark = SparkSession.builder.appName("L1_Analysis").getOrCreate()

1. Find the bike with the maximum ride time.

In [158]:
tripData = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y H:m')\
.csv("L1_csv_data/trip.csv")

tripData.printSchema()
tripData.show(n=5)

root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)

+----+--------+-------------------+--------------------+----------------+-------------------+--------------------+--------------+-------+-----------------+--------+
|  id|duration|         start_date|  start_station_name|start_station_id|           end_date|    end_station_name|end_station_id|bike_id|subscription_type|zip_code|
+----+--------+-------------------+--------------------+----------------+-------------------+--------------------+--------------+-------+-----------------+--------+
|4576|      

In [159]:
# Longest single trip (in seconds) for each bike, sorted so the overall maximum comes first
max_duration_trip = (
    tripData.groupBy("bike_id")
    .agg(max(col("duration")).alias("max_duration"))
    .orderBy(col("max_duration").desc())
)

max_duration_trip.show(n=1)

+-------+------------+
|bike_id|max_duration|
+-------+------------+
|    535|    17270400|
+-------+------------+
only showing top 1 row
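
The query above ranks bikes by their longest single trip. If "maximum ride time" is instead read as the total time summed over all of a bike's trips, the winning bike can differ. A minimal pure-Python sketch on made-up records (the bike ids and durations below are hypothetical, not taken from trip.csv) shows the two readings side by side:

```python
from collections import defaultdict

# Hypothetical (bike_id, duration_seconds) records, not taken from trip.csv
trips = [(1, 5000), (2, 3000), (2, 3000), (3, 100)]

longest = defaultdict(int)  # longest single trip per bike
total = defaultdict(int)    # total ride time per bike
for bike_id, duration in trips:
    longest[bike_id] = max(longest[bike_id], duration)
    total[bike_id] += duration

bike_by_longest_trip = max(longest, key=longest.get)
bike_by_total_time = max(total, key=total.get)
print(bike_by_longest_trip, bike_by_total_time)
```

This sketch relies on Python's built-in `max`, so it is meant to be run separately: inside the notebook, `max` refers to the `pyspark.sql.functions` version after the import in the first cell.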



2. Find the greatest geodesic distance between stations.

In [160]:
stationData = spark.read\
.option("header", True)\
.option("inferSchema", True)\
.option("timestampFormat", 'M/d/y')\
.csv("L1_csv_data/station.csv")

stationData.printSchema()
stationData.show(n=5)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: timestamp (nullable = true)

+---+--------------------+------------------+-------------------+----------+--------+-------------------+
| id|                name|               lat|               long|dock_count|    city|  installation_date|
+---+--------------------+------------------+-------------------+----------+--------+-------------------+
|  2|San Jose Diridon ...|         37.329732|-121.90178200000001|        27|San Jose|2013-08-06 00:00:00|
|  3|San Jose Civic Ce...|         37.330698|        -121.888979|        15|San Jose|2013-08-05 00:00:00|
|  4|Santa Clara at Al...|         37.333988|        -121.894902|        11|San Jose|2013-08-06 00:00:00|
|  5|    Adobe on Almaden|         37.331415|          -121.8932|        19|San Jose|

In [161]:
# Earth's radius in kilometers
R = 6371.0

# Convert coordinates to radians
stationData_columns_radians = (
    stationData
    .withColumn("lat_rad", radians(col("lat")))
    .withColumn("long_rad", radians(col("long")))
)

# Pair every station with every station (each pair appears twice, plus self-pairs at distance 0)
stations_crossJoin = stationData_columns_radians.alias("s1").crossJoin(stationData_columns_radians.alias("s2"))

# Haversine formula (great-circle distance on a spherical Earth)
sqrt_expression = (
    pow(sin((col("s2.lat_rad") - col("s1.lat_rad")) / 2), 2)
    + cos(col("s1.lat_rad")) * cos(col("s2.lat_rad"))
    * pow(sin((col("s2.long_rad") - col("s1.long_rad")) / 2), 2)
)
d = 2 * R * asin(sqrt(sqrt_expression))

stations_crossJoin_distance = stations_crossJoin.withColumn("distance", d)

max_distance = stations_crossJoin_distance.select(
    col("s1.name").alias("station_1"),
    col("s2.name").alias("station_2"),
    col("distance")
).orderBy(col("distance").desc())

print("Stations with the greatest distance:")
max_distance.show(1, truncate=False)

Stations with the greatest distance:
+--------------------------+----------------------+-----------------+
|station_1                 |station_2             |distance         |
+--------------------------+----------------------+-----------------+
|SJSU - San Salvador at 9th|Embarcadero at Sansome|69.92087595428183|
+--------------------------+----------------------+-----------------+
only showing top 1 row
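
To sanity-check the Spark expression, the same haversine formula can be evaluated in plain Python on a few stations. The sketch below is an illustration under stated assumptions, not part of the original notebook: `haversine_km` is a hypothetical helper name, and the coordinates are copied from the first three rows of the `stationData.show()` output, keyed by station id. It also uses `itertools.combinations`, which visits each unordered pair once instead of the twice-plus-self-pairs that a full cross join produces.

```python
from itertools import combinations
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_KM = 6371.0  # same mean radius as in the Spark cell

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometers between two points given in degrees."""
    phi1, phi2 = radians(lat1), radians(lat2)
    dphi = phi2 - phi1
    dlmb = radians(lon2 - lon1)
    a = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_KM * asin(sqrt(a))

# Station id -> (lat, long), copied from the printed station rows
stations = {
    2: (37.329732, -121.901782),
    3: (37.330698, -121.888979),
    4: (37.333988, -121.894902),
}

# Farthest pair among these three stations only
farthest = max(
    combinations(stations, 2),
    key=lambda pair: haversine_km(*stations[pair[0]], *stations[pair[1]]),
)
print(farthest, haversine_km(*stations[farthest[0]], *stations[farthest[1]]))
```

Like the built-in `max` used here, the `math` functions clash with the names imported from `pyspark.sql.functions` in the first cell, so this check is best run in a separate script or session.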



3. Find the path through the stations taken by the bike with the maximum ride time.

In [162]:
bike_id_max = max_duration_trip.first()["bike_id"]
print(f"ID of the bike with the maximum ride time: {bike_id_max}")

# Keep only this bike's trips and sort them by start time
longest_bike_trips = (
    tripData.filter(col("bike_id") == bike_id_max)
    .orderBy(col("start_date"))
)

# Select only the start and end station ids
result_df = longest_bike_trips.select("start_station_id", "end_station_id")

result_df.show(n=200)

ID of the bike with the maximum ride time: 535
+----------------+--------------+
|start_station_id|end_station_id|
+----------------+--------------+
|              47|            70|
|              70|            69|
|              69|            77|
|              77|            64|
|              61|            42|
|              58|            72|
|              72|            47|
|              47|            60|
|              60|            46|
|              46|            77|
|              77|            77|
|              77|            62|
|              62|            61|
|              55|            61|
|              61|            60|
|              60|            41|
|              41|            50|
|              50|            41|
|              41|            70|
|              70|            74|
|              74|            61|
|              61|            50|
|              50|            65|
|              65|            70|
|              69|            6
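
The printed path is not always continuous: after the trip from station 77 to station 64, the next trip starts at station 61. A minimal sketch, using only the first seven rows shown above, lists such breaks. Reading a break as the bike having been moved between trips by some means other than a recorded trip is an assumption; the data shown here does not say why it happens.

```python
# First rows of the printed path, as (start_station_id, end_station_id)
path = [(47, 70), (70, 69), (69, 77), (77, 64), (61, 42), (58, 72), (72, 47)]

# A gap is a place where a trip starts at a station other than
# the one where the previous trip ended
gaps = [
    (prev_end, next_start)
    for (_, prev_end), (next_start, _) in zip(path, path[1:])
    if prev_end != next_start
]
print(gaps)
```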

4. Find the number of bikes in the system.

In [163]:
# max_duration_trip has one row per bike_id, so its row count is the number of bikes
print(f"Number of bikes in the system: {max_duration_trip.count()}")

Number of bikes in the system: 700


5. Find users who spent more than 3 hours on trips.

In [164]:
# The trip schema has no user id column, so zip_code serves as a proxy for a user here
output_filtered = (
    tripData
    .groupBy("zip_code")
    .agg(func_sum(col("duration")).alias("total_duration"))
)

# Keep totals strictly greater than 3 hours (10800 seconds), smallest first
output_filtered = output_filtered.filter(col("total_duration") > (60 * 60 * 3)).orderBy(col("total_duration"))

output_filtered.show(n=100)

+----------+--------------+
|  zip_code|total_duration|
+----------+--------------+
|      7407|         10807|
|     45750|         10821|
|      1486|         10825|
|     29650|         10833|
|     64152|         10841|
|     91020|         10848|
|     61828|         10851|
|      2332|         10856|
|     80227|         10861|
|     34689|         10873|
|     60126|         10876|
|     36854|         10908|
|     75211|         10912|
|     26101|         10917|
|     96740|         10922|
|     15218|         10926|
|     95938|         10928|
|     23336|         10931|
|       292|         10938|
|     98611|         10946|
|      1609|         10952|
|     20024|         10955|
|      6035|         10964|
|     90742|         10965|
|      2617|         10968|
|      7087|         10968|
|     23510|         10972|
|     21029|         10979|
|     84094|         10981|
|     34108|         10983|
|     93635|         10985|
|      7031|        
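
The aggregation above is a group-by-sum followed by a threshold filter. A minimal pure-Python sketch of the same logic, on hypothetical records (the zip codes and durations are made up), shows how the choice between `>` and `>=` decides whether a total of exactly 10800 seconds counts as "more than 3 hours":

```python
from collections import defaultdict

THREE_HOURS = 60 * 60 * 3  # 10800 seconds

# Hypothetical (zip_code, duration_seconds) records, not taken from trip.csv
trips = [("A", 6000), ("A", 4800), ("B", 9000), ("B", 2000), ("C", 500)]

# Sum the durations per zip code
totals = defaultdict(int)
for zip_code, duration in trips:
    totals[zip_code] += duration

# "A" totals exactly 10800 s, so only the non-strict comparison keeps it
strictly_more = sorted(z for z, t in totals.items() if t > THREE_HOURS)
at_least = sorted(z for z, t in totals.items() if t >= THREE_HOURS)
print(strictly_more, at_least)
```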