In [1]:
#Подключаемся к диску
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!pip install pyspark==3.0.1 py4j==0.10.9

Collecting pyspark==3.0.1
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)
[K     |████████████████████████████████| 204.2 MB 34 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 64.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=33aafb3087bf492a746e33c0ebe4b8e77394e974e2abc03d3bf7df951fb50ae7
  Stored in directory: /root/.cache/pip/wheels/5e/34/fa/b37b5cef503fc5148b478b2495043ba61b079120b7ff379f9b
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [3]:
#Импортируем библиотеки
from math import *
from typing import NamedTuple
from datetime import datetime

import pyspark
from pyspark import SparkContext
from pyspark import SparkConf
from functools import reduce


In [4]:
#Создаём сессию
sconf = SparkConf().setAppName("L1_interactive_bike_analysis").setMaster("local[*]")

In [5]:
sc = SparkContext(conf=sconf)

In [6]:
sc

In [7]:
# Импортируем файлы
trip = sc.textFile("/content/drive/MyDrive/trip.csv")
status = sc.textFile("/content/drive/MyDrive/status.csv")
station = sc.textFile("/content/drive/MyDrive/station.csv")
weather = sc.textFile("/content/drive/MyDrive/weather.csv")

In [8]:
# Выделяем заголовки

header_trip = trip.first()
header_status = status.first()
header_station = station.first()
header_weather = weather.first()

In [9]:
# Убираем заголовки
trip_no_header = trip.filter(lambda row: row != header_trip)
status_no_header = status.filter(lambda row: row != header_status)
station_no_header = station.filter(lambda row: row != header_station)
weather_no_header = weather.filter(lambda row: row != header_weather)

In [10]:
#Сохраняем заголовки в виде ячеек
trip_parameters = list(enumerate(header_trip.split(",")))
status_parameters = list(enumerate(header_status.split(",")))
station_parameters = list(enumerate(header_station.split(",")))
weather_parameters = list(enumerate(header_weather.split(",")))


In [11]:
#Разбиение данных на ячейки
trip_cells = trip_no_header.map(lambda x: x.split(','))
status_cells = status_no_header.map(lambda x: x.split(','))
station_cells = station_no_header.map(lambda x: x.split(','))
weather_cells = weather_no_header.map(lambda x: x.split(','))

In [12]:
# На основании сохранённых параметров, задаём модели для trip и station
print(trip_parameters)

def initModelTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str
        
    for trip in trips:
        try:
            yield Trip(                             
             trip_id = int(trip[0]),
             duration = int(trip[1]),
             start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
             start_station_name = trip[3],
             start_station_id = int(trip[4]),
             end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
             end_station_name = trip[6],
             end_station_id = trip[7],
             bike_id = int(trip[8]),
             subscription_type = trip[9],
             zip_code = trip[10]
            ) 
        except:
            pass

[(0, 'id'), (1, 'duration'), (2, 'start_date'), (3, 'start_station_name'), (4, 'start_station_id'), (5, 'end_date'), (6, 'end_station_name'), (7, 'end_station_id'), (8, 'bike_id'), (9, 'subscription_type'), (10, 'zip_code')]


In [13]:
print(station_parameters)

def initModelStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str
    
    for station in stations:
        yield Station(
            station_id = int(station[0]),
            name = station[1],
            lat = float(station[2]),
            long = float(station[3]),
            dockcount = int(station[4]),
            landmark = station[5],
            installation = datetime.strptime(station[6], '%m/%d/%Y')
        )

[(0, 'id'), (1, 'name'), (2, 'lat'), (3, 'long'), (4, 'dock_count'), (5, 'city'), (6, 'installation_date')]


In [16]:
#Применим получившиеся модели
tripsProcessedByModel = trip_cells.mapPartitions(initModelTrip)
tripsProcessedByModel.take(10)

[Trip(trip_id=4576, duration=63, start_date=datetime.datetime(2013, 8, 29, 14, 13), start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 14, 14), end_station_name='South Van Ness at Market', end_station_id='66', bike_id=520, subscription_type='Subscriber', zip_code='94127'),
 Trip(trip_id=4607, duration=70, start_date=datetime.datetime(2013, 8, 29, 14, 42), start_station_name='San Jose City Hall', start_station_id=10, end_date=datetime.datetime(2013, 8, 29, 14, 43), end_station_name='San Jose City Hall', end_station_id='10', bike_id=661, subscription_type='Subscriber', zip_code='95138'),
 Trip(trip_id=4130, duration=71, start_date=datetime.datetime(2013, 8, 29, 10, 16), start_station_name='Mountain View City Hall', start_station_id=27, end_date=datetime.datetime(2013, 8, 29, 10, 17), end_station_name='Mountain View City Hall', end_station_id='27', bike_id=48, subscription_type='Subscriber', zip_code='97214'),
 Trip(trip_id=4251, du

In [17]:
stationsProcessedByModel = station_cells.mapPartitions(initModelStation)
stationsProcessedByModel.take(25)

[Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dockcount=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0)),
 Station(station_id=3, name='San Jose Civic Center', lat=37.330698, long=-121.888979, dockcount=15, landmark='San Jose', installation=datetime.datetime(2013, 8, 5, 0, 0)),
 Station(station_id=4, name='Santa Clara at Almaden', lat=37.333988, long=-121.894902, dockcount=11, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0)),
 Station(station_id=5, name='Adobe on Almaden', lat=37.331415, long=-121.8932, dockcount=19, landmark='San Jose', installation=datetime.datetime(2013, 8, 5, 0, 0)),
 Station(station_id=6, name='San Pedro Square', lat=37.336721000000004, long=-121.894074, dockcount=15, landmark='San Jose', installation=datetime.datetime(2013, 8, 7, 0, 0)),
 Station(station_id=7, name='Paseo de San Antonio', lat=37.333798, long=-121.88694299999999, dockcount=15, landmark='San J

In [18]:
#Теперь перейдём к выполнению заданий
#Задание №1
#Делим данные на пары ключ-значение
tripMapWithBicycle = tripsProcessedByModel.keyBy(lambda x: x.bike_id)


#Сгруппируем данные по продолжительности
durationBicycleMap = tripMapWithBicycle.mapValues(lambda x: x.duration).reduceByKey(lambda x1, x2: x1 + x2)

In [19]:
#Задание 1: Ответ: Велосипед с максимальным временем робега, это:
highest_mileage = durationBicycleMap.top(1, key=lambda x: x[1])[0][0]

print(highest_mileage)

535


In [20]:
#Задание 2: Найти наибольшее геодезическое расстояние между станциями.
#Задаём метод отсчёта геодезического расстояния
def distance_between_stations(gLatitudeFirstStation, gLongitudeFirstStation, gLatitudeSecondStation, gLongitudeSecondStation):
  earth_radius = 6371.0088
  gLatitudeFirstStation, gLongitudeFirstStation = radians(gLatitudeFirstStation), radians(gLongitudeFirstStation)
  gLatitudeSecondStation, gLongitudeSecondStation = radians(gLatitudeSecondStation), radians(gLongitudeSecondStation)

  differenceLatitude = gLatitudeSecondStation - gLatitudeFirstStation
  differenceLongitude = gLongitudeSecondStation - gLongitudeFirstStation
  
  haversinusLatitutde = (sin(differenceLatitude / 2)) ** 2
  haversinusLongitude = (sin(differenceLongitude / 2)) ** 2
  
  return 2 * earth_radius * sqrt(haversinusLatitutde + cos(gLatitudeFirstStation)\
                                 * cos(gLatitudeSecondStation) * haversinusLongitude)

In [21]:
#Вычислим расстояние между станциями
map_distance = stationsProcessedByModel.cartesian(stationsProcessedByModel)\
                         .filter(lambda x: x[0].station_id != x[1].station_id)\
                         .map(lambda x: [x[0], x[1], distance_between_stations(x[0].lat, x[0].long, x[1].lat, x[1].long)])\
                         .keyBy(lambda x: (x[0].name, x[1].name))\
                         .reduce(lambda x1, x2: x1 if x1[1] > x2[1] else x2)

In [22]:
#Задание 2: Ответ: Максимальное расстояние между станциями 
way_between_stantion = map_distance[0]
longest_distance = map_distance[1][2]

print(f"Расстояние между станциями: {way_between_stantion}, составляет {longest_distance} и является максимальным.")

Расстояние между станциями: ('Ryland Park', 'Mezes Park'), составляет 34.317914350160784 и является максимальным.


In [23]:
#Задание 3: Найти путь велосипеда с максимальным временем пробега через станции
way_of_bicycle = tripsProcessedByModel.filter(lambda x: x.bike_id == highest_mileage)\
.sortBy(lambda x: x.start_date).map(lambda x: (x.start_station_name, x.end_station_name))
# Ответ:
way_of_bicycle.take(10)

[('Post at Kearney', 'San Francisco Caltrain (Townsend at 4th)'),
 ('San Francisco Caltrain (Townsend at 4th)',
  'San Francisco Caltrain 2 (330 Townsend)'),
 ('San Francisco Caltrain 2 (330 Townsend)', 'Market at Sansome'),
 ('Market at Sansome', '2nd at South Park'),
 ('2nd at Townsend', 'Davis at Jackson'),
 ('San Francisco City Hall', 'Civic Center BART (7th at Market)'),
 ('Civic Center BART (7th at Market)', 'Post at Kearney'),
 ('Post at Kearney', 'Embarcadero at Sansome'),
 ('Embarcadero at Sansome', 'Washington at Kearney'),
 ('Washington at Kearney', 'Market at Sansome')]

In [24]:
#Задание 4: Найти количество велосипедов в системе
count_bicycle = tripsProcessedByModel.map(lambda x: x.bike_id).distinct().count()
# Ответ:
print(count_bicycle)

700


In [27]:
#Задание 5. Найти пользователей потративших на поездки более 3 часов
three_hours = 3*60*60
riders = tripsProcessedByModel.filter(lambda x: x.duration > three_hours)\
.map(lambda x: x.zip_code).filter(lambda x: x != "").distinct()
#Ответ(получился слишком большим, поэтому будут продемонстрированы только 25 записей пользователей):
print(riders.take(25))

['58553', '94301', '94039', '94133', '93726', '94123', '4517', '29200', '45322', '94080', '92808', '5024', '89138', '11515', '28277', '34990', '94803', '92663', '94109', '91801', '8545', '95351', '94063', '90049', '60056']
