In [34]:
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession
import pyspark.sql as sql
from datetime import datetime
from typing import NamedTuple
import pyspark.sql.functions as f

In [2]:
conf = SparkConf().setAppName("Lab1_2").setMaster('yarn')
sc = SparkContext(conf=conf)

In [3]:
!hadoop fs -ls # провереряем перенеслись ли файлы

Found 7 items
drwxr-xr-x   - kasparov kasparov          1 2023-12-21 21:42 .sparkStaging
-rwxr-xr-x   3 kasparov kasparov      16216 2023-12-21 01:02 lab1.ipynb
-rwxr-xr-x   3 kasparov kasparov       5647 2023-12-21 01:02 station.csv
-rwxr-xr-x   3 kasparov kasparov   80208848 2023-12-21 01:02 trip.csv
drwxr-xr-x   - kasparov kasparov          3 2023-12-21 21:03 warandpeace_histogram2.txt
drwxr-xr-x   - kasparov kasparov          3 2023-12-21 21:04 warandpeace_histogram_2.txt
-rwxr-xr-x   3 kasparov kasparov    5315699 2023-12-21 01:02 warandsociety.txt


In [4]:
trip_data = sc.textFile("trip.csv")
station_data = sc.textFile("station.csv")
trip_data.first()

'id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code'

In [26]:
def initStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str
    
    for station in stations:
        try:
            yield Station(
                station_id = int(station[0]),
                name = station[1],
                lat = float(station[2]),
                long = float(station[3]),
                dockcount = int(station[4]),
                landmark = station[5],
                installation = datetime.strptime(station[6], '%m/%d/%Y')
            )
        except:
            pass

In [27]:
def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str
        
    for trip in trips:
        try:
            yield Trip(                             
                 trip_id = int(trip[0]),
                 duration = int(trip[1]),
                 start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
                 start_station_name = trip[3],
                 start_station_id = int(trip[4]),
                 end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
                 end_station_name = trip[6],
                 end_station_id = trip[7],
                 bike_id = int(trip[8]),
                 subscription_type = trip[9],
                 zip_code = trip[10]
            )
        except:
            pass

In [19]:
trip_headers = trip_data.first()
station_headers = station_data.first()

In [20]:
trip = trip_data.filter(lambda row: row != trip_headers).map(lambda row: row.split(",", -1))
station = station_data.filter(lambda row: row != station_headers).map(lambda row: row.split(",", -1))
trip.take(2)

[['4576',
  '63',
  '8/29/2013 14:13',
  'South Van Ness at Market',
  '66',
  '8/29/2013 14:14',
  'South Van Ness at Market',
  '66',
  '520',
  'Subscriber',
  '94127'],
 ['4607',
  '70',
  '8/29/2013 14:42',
  'San Jose City Hall',
  '10',
  '8/29/2013 14:43',
  'San Jose City Hall',
  '10',
  '661',
  'Subscriber',
  '95138']]

In [22]:
stations_ = station.mapPartitions(initStation)

In [28]:
stations_.first()

Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dockcount=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0))

In [29]:
trips_ = trip.mapPartitions(initTrip)

In [30]:
trips_.first()

Trip(trip_id=4576, duration=63, start_date=datetime.datetime(2013, 8, 29, 14, 13), start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 14, 14), end_station_name='South Van Ness at Market', end_station_id='66', bike_id=520, subscription_type='Subscriber', zip_code='94127')

In [36]:
# Найти велосипед с максимальным временем пробега.

# складываем все продолжительности одного bike_id и берем bike_id с максимальной суммой
bike_max_duration_id = trips_.map(lambda x: (x.bike_id, x.duration)) \
                                    .reduceByKey(lambda a,b:a+b).reduce(lambda a,b: a if a[1]>b[1] else b)[0]

# с помощью метода map получаем из исходной выборки пары типа (bike_id, duration) - (id, продолжительность), 
# складываем все поездки по bike_id, получая в итоге пары (id, суммарная продолжительность), 
# сравниваем все пары и выбираем ту, у которой суммарная по всем поездкам продолжительность больше, 
# определяя пару из id и максимальной суммарной продолжительности.

bike_max_duration_id

535

In [37]:
# Найти наибольшее геодезическое расстояние между станциями.

def distance(x,y):
    return ((x.lat-y.lat)**2+(x.long-y.long)**2)**0.5

# с помощью метода cartesian() выбираем все возможные комбинации станций и затем с помощь метода map и функции distance, 
# вычисляем расстояние для каждой пары и выбираем максимальное

max_distance = stations_.cartesian(stations_).map(lambda x:  distance(x[0], x[1])).max()
max_distance

0.7058482821754397

In [39]:
# Найти путь велосипеда с максимальным временем пробега через станции.

# из исходной выборки выбираем поездки, в которых id велосипеда совпадает с подсчитанным ранее - id велосипеда с максимальным временем пробега
# сортируем по дате начала поездки и с помощью метода map выбираем название конечной станции.
path_max_duration = trips_.filter(lambda x:x.bike_id==bike_max_duration_id).sortBy(lambda x: x.start_date).map(lambda x: x.end_station_name).distinct().collect()
path_max_duration

['San Francisco Caltrain 2 (330 Townsend)',
 'Market at Sansome',
 '2nd at South Park',
 'Davis at Jackson',
 'Post at Kearney',
 'Embarcadero at Sansome',
 'Clay at Battery',
 'Harry Bridges Plaza (Ferry Building)',
 'Steuart at Market',
 'Townsend at 7th',
 'Powell at Post (Union Square)',
 'Market at 4th',
 'Beale at Market',
 'Powell Street BART',
 'San Francisco City Hall',
 'Embarcadero at Vallejo',
 'Yerba Buena Center of the Arts (3rd @ Howard)',
 'Howard at 2nd',
 'Commercial at Montgomery',
 'Grant Avenue at Columbus Avenue',
 'Broadway St at Battery St',
 'Post at Kearny',
 'San Francisco Caltrain (Townsend at 4th)',
 'Spear at Folsom',
 'Temporary Transbay Terminal (Howard at Beale)',
 '5th at Howard',
 'Civic Center BART (7th at Market)',
 'Market at 10th',
 '2nd at Folsom',
 'South Van Ness at Market',
 'Mechanics Plaza (Market at Battery)',
 'Embarcadero at Folsom',
 '2nd at Townsend',
 'Embarcadero at Bryant',
 'Golden Gate at Polk',
 'Washington at Kearny',
 'Washingto

In [40]:
# Найти количество велосипедов в системе.

# отбираем из исходной выборки все идентификаторы велосипедов
# убираем повторы и с помощью метода count подсчитываем их общее число.
bike_count = trips_.map(lambda x: x.bike_id).distinct().count()
bike_count

700

In [43]:
# Найти пользователей потративших на поездки более 3 часов.

# с помощью метода map из исходной выборки отбираем пары (zip_code, duration) - почтовый индекс пользователя и продолжительность поездки
# складываем все поездки для каждого пользователя и с помощью метода filter находим пары, в которых суммарная продолжительность больше трех часов, 
# из пар отбираем только почтовый индекс пользователя.
clients_count = trips_.map(lambda x: (x.zip_code, x.duration)).reduceByKey(lambda a,b: a+b).filter(lambda x: x[1]>3*60*60).map(lambda x:x[0])
clients_count.count()

3661

In [44]:
sc.stop()