In [1]:
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession
import pyspark.sql as sql
from pyspark.sql.functions import col

In [2]:
# Spark application settings: application name and the YARN cluster manager.
conf = SparkConf()
conf.setAppName("Bikes_analysis")
conf.setMaster('yarn')

In [3]:
# getOrCreate() reuses an already-running SparkContext instead of raising
# "Cannot run multiple SparkContexts at once" when this cell is re-executed.
# NOTE: if a context already exists, `conf` is not re-applied to it.
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

In [4]:
# Trips table: header row, inferred column types, start/end dates like "8/29/2013 9:08".
trips_reader = spark.read.options(
    header=True,
    inferSchema=True,
    timestampFormat='M/d/y H:m',
)
tripData = trips_reader.csv("trips.csv")
tripData

DataFrame[id: int, duration: int, start_date: timestamp, start_station_name: string, start_station_id: int, end_date: timestamp, end_station_name: string, end_station_id: int, bike_id: int, subscription_type: string, zip_code: string]

In [5]:
# Stations table: header row, inferred column types, date-only installation_date.
stationData = spark.read.csv(
    "stations.csv",
    header=True,
    inferSchema=True,
    timestampFormat='M/d/y',
)

In [6]:
# Inspect the schema inferred for trips.csv (column names and types).
tripData.printSchema()

root
 |-- id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- zip_code: string (nullable = true)



In [7]:
# Inspect the schema inferred for stations.csv; its column order matters because
# later cells index cross-joined station rows by position.
stationData.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: timestamp (nullable = true)



1. Найти велосипед с максимальным суммарным временем пробега (суммируем длительность всех поездок каждого велосипеда и сортируем по убыванию).

In [8]:
# Register the trips DataFrame as the SQL view "trips" so it can be queried
# with spark.sql(...) in the cells below.
tripData.createOrReplaceTempView("trips")

In [9]:
# Total ride time per bike, longest first. Keep the DataFrame in `task1` and call
# show() separately: show() only prints and returns None, so chaining it into the
# assignment left `task1` bound to None.
task1 = (
    tripData
    .groupBy("bike_id")
    .sum("duration")
    .orderBy(col("sum(duration)").desc())
    .limit(1)
)
task1.show()

+-------+-------------+
|bike_id|sum(duration)|
+-------+-------------+
|    535|     36229902|
+-------+-------------+



2. Найти наибольшее геодезическое расстояние между станциями (считаем по формуле гаверсинусов для всех пар станций).

In [10]:
# Every ordered pair of stations, including each station paired with itself.
# Both sides keep the same column names, so downstream cells address the
# combined rows by position (left station = 0..6, right station = 7..13).
left_stations = stationData.alias("src")
right_stations = stationData.alias("dst")
joined_stations = left_stations.crossJoin(right_stations)
joined_stations.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: timestamp (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- installation_date: timestamp (nullable = true)



In [11]:
from math import radians, cos, sin, asin, sqrt

def Haversin_formula(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometres between two points given in degrees.

    Uses the haversine formula on a sphere of radius 6371 km.

    Args:
        lat1, lon1: latitude / longitude of the first point, in degrees.
        lat2, lon2: latitude / longitude of the second point, in degrees.

    Returns:
        Distance along the sphere's surface in km (0.0 for identical points).
    """
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    R_earth = 6371  # sphere radius, km
    h = sin((lat2 - lat1) / 2) ** 2 + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2
    # Float rounding can push sqrt(h) a hair above 1 for (near-)antipodal points,
    # and asin() raises "math domain error" for arguments > 1, so clamp it.
    return 2 * R_earth * asin(min(1.0, sqrt(h)))

In [12]:
# Distance for every station pair; positions 2/3 and 9/10 are lat/long of the
# left and right station. The largest value is the answer.
pair_distances = joined_stations.rdd.map(
    lambda pair: Haversin_formula(pair[2], pair[3], pair[9], pair[10])
)
task2 = pair_distances.max()

In [13]:
# Report the maximum inter-station distance (in km).
print('наибольшее геодезическое расстояние между станциями:{}км'.format(task2))

наибольшее геодезическое расстояние между станциями:69.92087595428183км


3. Найти путь велосипеда с максимальным временем пробега через станции (находим поездку с максимальной длительностью и по формуле гаверсинусов считаем расстояние между её начальной и конечной станциями).

In [14]:
# Longest single trip. Its start/end stations are looked up by station id, the
# integer key present in both trips.csv and stations.csv; station *names* in the
# two files are not guaranteed to be spelled identically.
# NOTE(review): confirm ids are the intended join key for this dataset.
longest_trip = tripData.orderBy(col("duration").desc()).first()
# Pull plain ints out of the Row so the closures below capture only scalars.
start_id = longest_trip.start_station_id
end_id = longest_trip.end_station_id
# Row positions in joined_stations: 0/7 = id, 2/3 and 9/10 = lat/long of the
# left/right station. The result is the great-circle distance between the two
# stations, not the route actually ridden.
task3 = (
    joined_stations.rdd
    .filter(lambda pair: pair[0] == start_id and pair[7] == end_id)
    .map(lambda pair: Haversin_formula(pair[2], pair[3], pair[9], pair[10]))
)
print(f'путь велосипеда с максимальным временем пробега через станции.:{task3.collect()} км')

путь велосипеда с максимальным временем пробега через станции.:[2.312047985749405] км


4. Найти количество велосипедов в системе (считаем количество уникальных id велосипедов).

In [15]:
# Number of distinct bikes that appear in any trip.
distinct_bikes_query = "SELECT count(distinct bike_id) FROM trips"
task4 = spark.sql(distinct_bikes_query)
task4.show()

+-----------------------+
|count(DISTINCT bike_id)|
+-----------------------+
|                    700|
+-----------------------+



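5. Найти пользователей, потративших на поездки более 3 часов. (В данных нет идентификатора пользователя, поэтому вместо него используется id поездки; если id поездок уникальны, фактически находятся отдельные поездки длительностью более 3 часов.)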

In [16]:
# "Users" who rode more than 3 hours in total (10800, presumably seconds).
# NOTE(review): the schema has no user id, so `id` here is the trip id; if trip
# ids are unique, the GROUP BY is per-trip and this simply lists long trips.
long_trips_query = (
    "SELECT id, sum_duration FROM ( SELECT id, sum(duration) as sum_duration FROM trips GROUP BY id) "
    "WHERE sum_duration > 10800"
)
task5 = spark.sql(long_trips_query)
task5.show()
task5.count()

+------+------------+
|    id|sum_duration|
+------+------------+
|843086|       30644|
|831518|       34234|
|797846|       11798|
|796682|       55696|
|781053|       15068|
|755329|       13080|
|744184|       13622|
|730089|       15182|
|721109|       11826|
|712173|       27468|
|702265|       18332|
|701901|       23816|
|697283|       43534|
|692091|       27238|
|681960|      156566|
|645660|       19188|
|645212|       19740|
|627911|       15738|
|597804|       33998|
|589028|       11416|
+------+------------+
only showing top 20 rows



11394

In [17]:
# Shut down the SparkContext created in the setup cell; `spark` was built on it,
# so no further Spark queries can run after this cell.
sc.stop()