In [1]:
from pyspark import SparkContext, SparkConf
app_name = "Lab1"
conf = SparkConf().setAppName(app_name).setMaster('local[1]')
sc = SparkContext(conf=conf)
sc



In [2]:
!hadoop fs -put /mnt/data /data

In [3]:
!hadoop fs -ls /data

Found 9 items
-rwxr-xr-x   3 root root        394 2022-11-24 07:50 /data/list_of_countries_sorted_gini.txt
-rwxr-xr-x   3 root root   19459967 2022-11-24 07:50 /data/nycTaxiFares.gz
-rwxr-xr-x   3 root root   84135506 2022-11-24 07:50 /data/nycTaxiRides.csv
-rwxr-xr-x   3 root root   79500408 2022-11-24 07:50 /data/nyctaxi.gz
-rwxr-xr-x   3 root root   74162295 2022-11-24 07:50 /data/posts_sample.xml
-rwxr-xr-x   3 root root      40269 2022-11-24 07:50 /data/programming-languages.csv
-rwxr-xr-x   3 root root       5647 2022-11-24 07:50 /data/stations.csv
-rwxr-xr-x   3 root root   80208831 2022-11-24 07:50 /data/trips.csv
-rwxr-xr-x   3 root root    5315699 2022-11-24 07:50 /data/warandsociety.txt


### Tecты

In [4]:
warandpeace = sc.textFile("/mnt/data/warandsociety.txt")
warandpeace.count()

12851

In [5]:
nilFile = sc.textFile("nil")
#nilFile.count()

In [6]:
warandpeace.take(10)

['Лев Николаевич Толстой',
 'Война и мир. Книга 1',
 '',
 'Война и мир – 1',
 '',
 ' ',
 ' http://www.lib.ru',
 '',
 'Аннотация ',
 '']

In [7]:
linesWithWar = warandpeace.filter(lambda x: "война" in x)
linesWithWar.first()

"– Еh bien, mon prince. Genes et Lucques ne sont plus que des apanages, des поместья, de la famille Buonaparte. Non, je vous previens, que si vous ne me dites pas, que nous avons la guerre, si vous vous permettez encore de pallier toutes les infamies, toutes les atrocites de cet Antichrist (ma parole, j'y crois) – je ne vous connais plus, vous n'etes plus mon ami, vous n'etes plus мой верный раб, comme vous dites. [Ну, что, князь, Генуа и Лукка стали не больше, как поместьями фамилии Бонапарте. Нет, я вас предупреждаю, если вы мне не скажете, что у нас война, если вы еще позволите себе защищать все гадости, все ужасы этого Антихриста (право, я верю, что он Антихрист) – я вас больше не знаю, вы уж не друг мой, вы уж не мой верный раб, как вы говорите.] Ну, здравствуйте, здравствуйте. Je vois que je vous fais peur, [Я вижу, что я вас пугаю,] садитесь и рассказывайте."

In [8]:
def time(f):    
    import time
    t = time.process_time()
    f()
    print(f"Elapsed time: {int((time.process_time() - t)*1e9)} ns")

In [10]:
linesWithWar.cache()
time(lambda: linesWithWar.count() )
time(lambda: linesWithWar.count() )

Elapsed time: 7243900 ns
Elapsed time: 6807699 ns


In [11]:
wordCounts = linesWithWar.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

In [13]:
wordCounts.top(5, lambda x: x[1])

[('и', 250), ('что', 152), ('не', 117), ('в', 108), ('–', 92)]

### Подготовка данных для заданий

In [14]:
from typing import NamedTuple
from datetime import datetime

def InitStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str
    
    for station in stations:
        yield Station(
            station_id = int(station[0]),
            name = station[1],
            lat = float(station[2]),
            long = float(station[3]),
            dockcount = int(station[4]),
            landmark = station[5],
            installation = datetime.strptime(station[6], '%m/%d/%Y')
        )
        
def InitTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str
        
    for trip in trips:
        yield Trip(                             
            trip_id = int(trip[0]),
            duration = int(trip[1]) if trip[1] != '' else 0,
            start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M') if trip[2] != '' else None,
            start_station_name = trip[3],
            start_station_id = int(trip[4]),
            end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M') if trip[5] != '' else None,
            end_station_name = trip[6],
            end_station_id = int(trip[7]),
            bike_id = int(trip[8]),
            subscription_type = trip[9],
            zip_code = trip[10]
        )

In [15]:
def SplitNameColumnsAndData(data):
    columns = data.first()
    table = data.filter(lambda row: row != columns).map(lambda row: row.split(","))
    return columns, table

In [16]:
trip_data = sc.textFile("/mnt/data/trips.csv")
station_data = sc.textFile("/mnt/data/stations.csv")
station_data.take(2)

['id,name,lat,long,dock_count,city,installation_date',
 '2,San Jose Diridon Caltrain Station,37.329732,-121.90178200000001,27,San Jose,8/6/2013']

In [17]:
trip_columns, trips = SplitNameColumnsAndData(trip_data)
station_columns, stations = SplitNameColumnsAndData(station_data)
stations.first()

['2',
 'San Jose Diridon Caltrain Station',
 '37.329732',
 '-121.90178200000001',
 '27',
 'San Jose',
 '8/6/2013']

In [19]:
!head /mnt/data/stations.csv

id,name,lat,long,dock_count,city,installation_date
2,San Jose Diridon Caltrain Station,37.329732,-121.90178200000001,27,San Jose,8/6/2013
3,San Jose Civic Center,37.330698,-121.888979,15,San Jose,8/5/2013
4,Santa Clara at Almaden,37.333988,-121.894902,11,San Jose,8/6/2013
5,Adobe on Almaden,37.331415,-121.8932,19,San Jose,8/5/2013
6,San Pedro Square,37.336721000000004,-121.894074,15,San Jose,8/7/2013
7,Paseo de San Antonio,37.333798,-121.88694299999999,15,San Jose,8/7/2013
8,San Salvador at 1st,37.330165,-121.88583100000001,15,San Jose,8/5/2013
9,Japantown,37.348742,-121.89471499999999,15,San Jose,8/5/2013
10,San Jose City Hall,37.337391,-121.886995,15,San Jose,8/6/2013


In [20]:
trip_columns, trips.take(1)

('id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code',
 [['4576',
   '63',
   '',
   'South Van Ness at Market',
   '66',
   '8/29/2013 14:14',
   'South Van Ness at Market',
   '66',
   '520',
   'Subscriber',
   '94127']])

In [21]:
station_columns, stations.take(1)

('id,name,lat,long,dock_count,city,installation_date',
 [['2',
   'San Jose Diridon Caltrain Station',
   '37.329732',
   '-121.90178200000001',
   '27',
   'San Jose',
   '8/6/2013']])

In [23]:
trips_objects = trips.mapPartitions(InitTrip)
trips_objects.take(1)

[Trip(trip_id=4576, duration=63, start_date=None, start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 14, 14), end_station_name='South Van Ness at Market', end_station_id=66, bike_id=520, subscription_type='Subscriber', zip_code='94127')]

In [25]:
stations_internal = stations.mapPartitions(InitStation)
stations_internal.take(1)

[Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dockcount=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0))]

# 1 Найти велосипед с максимальным временем пробега.

In [26]:
# Выделим как ключ id велосипеда
trips_by_bike = trips_objects.keyBy(lambda trip: trip.bike_id)
trips_by_bike.first()

(520,
 Trip(trip_id=4576, duration=63, start_date=None, start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 14, 14), end_station_name='South Van Ness at Market', end_station_id=66, bike_id=520, subscription_type='Subscriber', zip_code='94127'))

In [27]:
#Оставим только пробег
query = trips_by_bike.mapValues(lambda trip: trip.duration)
query.take(5)

[(520, 63), (661, 0), (48, 71), (26, 77), (319, 83)]

In [29]:
# Получим общий пробег для каждого велосипеда
query = query.reduceByKey(lambda a, b: a + b)

In [30]:
# Выведем велосипед с максимальным временем пробега
id_bike_w_max_dur = query.top(1, key = lambda x: x[1])[0][0]
id_bike_w_max_dur

535

# 2 Найти наибольшее геодезическое расстояние между станциями.

In [31]:
# Сформируем пары ключ значение, где ключ  - пара с номерами начальной и конечной станции, значение - время пути между ними
stations_dur = trips_objects\
                .filter(lambda trip: trip.start_station_id != trip.end_station_id)\
                .keyBy(lambda trip: (trip.start_station_id, trip.end_station_id))\
                .mapValues(lambda trip: trip.duration)
stations_dur.take(5)

[((66, 67), 83),
 ((4, 5), 109),
 ((10, 11), 114),
 ((49, 54), 125),
 ((6, 4), 126)]

In [32]:
# Найдём среднее время пути для каждой станции
query = stations_dur.aggregateByKey(
    (0.0, 0.0),
    lambda acc, value: (acc[0] + value, acc[1] + 1),
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),
).mapValues(lambda values: values[0] / values[1])

In [33]:
# Получим наибольшее время пути
test = query.top(1, key = lambda x: x[1])[0][1]

In [34]:
test

229914.0

# 3 Найти путь велосипеда с максимальным временем пробега через станции.

In [55]:
# Берём все строки с уже найденным велосипедом, сортируем по времени начала, оставляем начальную и конечную станцию
query = trips_objects.filter(lambda trip: trip.bike_id == id_bike_w_max_dur)\
        .sortBy(lambda trip: trip.start_date)\
        .map(lambda trip: (trip.start_station_name, trip.end_station_name)) 

In [56]:
query.take(3)

[('Post at Kearney', 'San Francisco Caltrain (Townsend at 4th)'),
 ('San Francisco Caltrain (Townsend at 4th)',
  'San Francisco Caltrain 2 (330 Townsend)'),
 ('San Francisco Caltrain 2 (330 Townsend)', 'Market at Sansome')]

In [57]:
query.count()

1328

In [38]:
# Выводим всё
#query.collect()

# 4 Найти количество велосипедов в системе.

In [39]:
query = trips_objects.map(lambda trip: trip.bike_id)
query.take(5)

[520, 661, 48, 26, 319]

In [40]:
query.count()

669959

In [41]:
# убирает дубликаты
query.distinct().count()

700

# 5 Найти пользователей потративших на поездки более 3 часов.

In [43]:
three_hours = 3 * 60 * 60
query = trips_objects.filter(lambda trip: trip.duration > three_hours).map(lambda trip: trip.trip_id)

In [44]:
query.take(5)

[4639, 4637, 4528, 4363, 4193]

In [45]:
query.count()

8322

In [46]:
query.distinct().count()

8322