In [1]:
from pyspark import SparkContext, SparkConf

app_name = "lb1"
conf = SparkConf().setAppName(app_name).setMaster('local[1]')
sc = SparkContext(conf=conf)
sc



In [2]:
!hadoop fs -put /mnt/data /data

In [3]:
!hadoop fs -ls /data

Found 1 items
drwxr-xr-x   - root root          0 2022-12-05 17:02 /data/data


In [33]:
warandpeace = sc.textFile("warandsociety.txt")
print(warandpeace.count())

12851


In [34]:
nilFile = sc.textFile("nil")
warandpeace.take(10)

['Лев Николаевич Толстой',
 'Война и мир. Книга 1',
 '',
 'Война и мир – 1',
 '',
 ' ',
 ' http://www.lib.ru',
 '',
 'Аннотация ',
 '']

In [35]:
linesWithPeace = warandpeace.filter(lambda x: "Мир " in x) 
linesWithPeace.first()

'– Мир заключен… – начал он. Но Наполеон не дал ему говорить. Ему, видно, нужно было говорить самому, одному, и он продолжал говорить с тем красноречием и невоздержанием раздраженности, к которому так склонны балованные люди.'

In [37]:
def time(f):
    import time
    t = time.process_time()
    f()
    print(f"Elapsed time: {int((time.process_time() - t)*1e9)} ns")
    
linesWithPeace.cache()
time(lambda: linesWithPeace.count())
time(lambda: linesWithPeace.count())

Elapsed time: 10771500 ns
Elapsed time: 10525499 ns


In [38]:
wordCounts = linesWithPeace\
    .flatMap(lambda line: line.split(" "))\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda a, b: a + b)

In [39]:
wordCounts.top(5, lambda x: x[1])

[('–', 2), ('говорить', 2), ('и', 2), ('Мир', 1), ('заключен…', 1)]

In [40]:
trips_data = sc.textFile("trip.csv")
stations_data = sc.textFile("station.csv")

In [41]:
from typing import NamedTuple
from datetime import datetime
from functools import reduce

def initStations(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str
    
    for station in stations:
        yield Station(
            station_id = int(station[0]),
            name = station[1],
            lat = float(station[2]),
            long = float(station[3]),
            dockcount = int(station[4]),
            landmark = station[5],
            installation = datetime.strptime(station[6], '%m/%d/%Y')
        )

def initTrips(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str
        
    for trip in trips:
        try:
            yield Trip(                             
             trip_id = int(trip[0]),
             duration = int(trip[1]),
             start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
             start_station_name = trip[3],
             start_station_id = int(trip[4]),
             end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
             end_station_name = trip[6],
             end_station_id = trip[7],
             bike_id = int(trip[8]),
             subscription_type = trip[9],
             zip_code = trip[10]
            ) 
        except:
            pass


In [42]:
# method to fetch data

def GetDataFromTable(data):
    columns = data.first()
    table = data.filter(lambda row: row != columns)\
                .map(lambda row: row.split(","))
    return columns, table

In [43]:
!head trip.csv

id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code
4576,63,8/29/2013 14:13,South Van Ness at Market,66,8/29/2013 14:14,South Van Ness at Market,66,520,Subscriber,94127
4607,70,8/29/2013 14:42,San Jose City Hall,10,8/29/2013 14:43,San Jose City Hall,10,661,Subscriber,95138
4130,71,8/29/2013 10:16,Mountain View City Hall,27,8/29/2013 10:17,Mountain View City Hall,27,48,Subscriber,97214
4251,77,8/29/2013 11:29,San Jose City Hall,10,8/29/2013 11:30,San Jose City Hall,10,26,Subscriber,95060
4299,83,8/29/2013 12:02,South Van Ness at Market,66,8/29/2013 12:04,Market at 10th,67,319,Subscriber,94103
4927,103,8/29/2013 18:54,Golden Gate at Polk,59,8/29/2013 18:56,Golden Gate at Polk,59,527,Subscriber,94109
4500,109,8/29/2013 13:25,Santa Clara at Almaden,4,8/29/2013 13:27,Adobe on Almaden,5,679,Subscriber,95112
4563,111,8/29/2013 14:02,San Salvador at 1st,8,8/29/2013 14:04,San Salvador at 1st,8,687,Subscri

In [44]:
trips_data.first()

'id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code'

In [45]:
trips_headers = trips_data.first()
stations_headers = stations_data.first()

In [46]:
trips = trips_data.filter(lambda x: x != trips_headers).map(lambda x: x.split(","))

stations = stations_data.filter(lambda x: x != stations_headers).map(lambda x: x.split(","))

In [47]:
for data in trips.take(3):
    print(data)

['4576', '63', '8/29/2013 14:13', 'South Van Ness at Market', '66', '8/29/2013 14:14', 'South Van Ness at Market', '66', '520', 'Subscriber', '94127']
['4607', '70', '8/29/2013 14:42', 'San Jose City Hall', '10', '8/29/2013 14:43', 'San Jose City Hall', '10', '661', 'Subscriber', '95138']
['4130', '71', '8/29/2013 10:16', 'Mountain View City Hall', '27', '8/29/2013 10:17', 'Mountain View City Hall', '27', '48', 'Subscriber', '97214']


In [48]:
for data in stations.take(3):
    print(data)

['2', 'San Jose Diridon Caltrain Station', '37.329732', '-121.90178200000001', '27', 'San Jose', '8/6/2013']
['3', 'San Jose Civic Center', '37.330698', '-121.888979', '15', 'San Jose', '8/5/2013']
['4', 'Santa Clara at Almaden', '37.333988', '-121.894902', '11', 'San Jose', '8/6/2013']


In [49]:
stations_MP = stations.mapPartitions(initStations)
stations_MP.first()

Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dockcount=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0))

In [50]:
trips_MP = trips.mapPartitions(initTrips)
trips_MP.first()

Trip(trip_id=4576, duration=63, start_date=datetime.datetime(2013, 8, 29, 14, 13), start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 14, 14), end_station_name='South Van Ness at Market', end_station_id='66', bike_id=520, subscription_type='Subscriber', zip_code='94127')

# 1 Найти велосипед с максимальным временем пробега.

In [51]:
bike_top_duration = trips_MP.map(lambda trip: (trip.bike_id, trip.duration))\
                  .reduceByKey(lambda a, b: a + b)\
                  .top(1, key = lambda x: x[1])[0][0]

print(bike_top_duration)

535


# 2 Найти наибольшее геодезическое расстояние между станциями.

In [52]:
trips_stations = trips_MP \
.filter(lambda trip: trip.start_station_id != trip.end_station_id) \
.keyBy(lambda trip: (trip.start_station_id, trip.end_station_id)) \
.mapValues(lambda trip: trip.duration)


query = trips_stations \
.aggregateByKey((0.0, 0.0), \
                lambda acc, value: (acc[0] + value, acc[1] + 1), \
                lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),) \
.mapValues(lambda values: values[0] / values[1])

query.map(lambda x: x[::-1]).top(1)

[(229914.0, (26, '16'))]

# 3 Найти путь велосипеда с максимальным временем пробега через станции.

In [53]:
bike_path = trips_MP.filter(lambda x: x.bike_id == bike_top_duration)\
.sortBy(lambda x: x.start_date)\
.map(lambda x: (x.start_station_name, x.end_station_name))

print(bike_path.top(1))


[('Yerba Buena Center of the Arts (3rd @ Howard)', 'Townsend at 7th')]


# 4 Найти количество велосипедов в системе.

In [54]:
print(trips_MP.map(lambda trip: trip.bike_id).distinct().count())

700


# 5 Найти пользователей потративших на поездки более 3 часов.

In [55]:
threeHours = 3 * 60 * 60

users = trips_MP.filter(lambda trip: trip.duration > threeHours).map(lambda trip: trip.zip_code).distinct()

print(users.take(10))
print("Count", users.count())

['', '58553', '94301', '94039', '94133', '93726', '94123', '4517', '29200', '45322']
Count 2101
