In [1]:
!pip install pyspark==3.0.0
from pyspark import SparkContext, SparkConf
app_name = 'L1 - Introduction to Apache Spark'
conf = SparkConf().setAppName(app_name).setMaster('local[1]')
sc = SparkContext(conf=conf)
sc

Collecting pyspark==3.0.0
  Downloading pyspark-3.0.0.tar.gz (204.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m204.7/204.7 MB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[0mCollecting py4j==0.10.9 (from pyspark==3.0.0)
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m19.0 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.0-py2.py3-none-any.whl size=205044159 sha256=48d6d5d1485dd4ddad05fc2302be73fc8c80710bfc3747042249eb7e448ee364
  Stored in directory: /root/.cache/pip/wheels/b1/bb/8b/ca24d3f756f2ed967225b0871898869db676eb5846df5adc56
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py

In [2]:
from typing import NamedTuple
from datetime import datetime
from functools import reduce

In [3]:
def initStation(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dockcount: int
        landmark: str
        installation: str

    for station in stations:
        yield Station(
            station_id = int(station[0]),
            name = station[1],
            lat = float(station[2]),
            long = float(station[3]),
            dockcount = int(station[4]),
            landmark = station[5],
            installation = datetime.strptime(station[6], '%m/%d/%Y')
        )

def initTrip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str

    for trip in trips:
        try:
            yield Trip(
             trip_id = int(trip[0]),
             duration = int(trip[1]),
             start_date = datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
             start_station_name = trip[3],
             start_station_id = int(trip[4]),
             end_date = datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
             end_station_name = trip[6],
             end_station_id = trip[7],
             bike_id = int(trip[8]),
             subscription_type = trip[9],
             zip_code = trip[10]
            )
        except:
            pass

In [11]:
trip_data = sc.textFile('trips.csv')
station_data = sc.textFile('stations.csv')

In [12]:
def get_data_without_headers(data):
    headers = data.first()
    table_data = data.filter(lambda row: row != headers).map(lambda row: row.split(","))
    return table_data

In [13]:
trips = get_data_without_headers(trip_data)
stations = get_data_without_headers(station_data)

In [14]:
trips.take(3)

[['4576',
  '63',
  '',
  'South Van Ness at Market',
  '66',
  '8/29/2013 14:14',
  'South Van Ness at Market',
  '66',
  '520',
  'Subscriber',
  '94127'],
 ['4607',
  '',
  '8/29/2013 14:42',
  'San Jose City Hall',
  '10',
  '8/29/2013 14:43',
  'San Jose City Hall',
  '10',
  '661',
  'Subscriber',
  '95138'],
 ['4130',
  '71',
  '8/29/2013 10:16',
  'Mountain View City Hall',
  '27',
  '8/29/2013 10:17',
  'Mountain View City Hall',
  '27',
  '48',
  'Subscriber',
  '97214']]

In [15]:
stations_mapped = stations.mapPartitions(initStation)
trips_mapped= trips.mapPartitions(initTrip)

In [16]:
stations_mapped.first()

Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dockcount=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0))

In [17]:
trips_mapped.first()

Trip(trip_id=4130, duration=71, start_date=datetime.datetime(2013, 8, 29, 10, 16), start_station_name='Mountain View City Hall', start_station_id=27, end_date=datetime.datetime(2013, 8, 29, 10, 17), end_station_name='Mountain View City Hall', end_station_id='27', bike_id=48, subscription_type='Subscriber', zip_code='97214')

# 1. Найти велосипед с максимальным временем пробега.

In [18]:
bike_max_mileage = trips_mapped.keyBy(lambda x: x.bike_id)
bike_duration = bike_max_mileage.mapValues(lambda x: x.duration).reduceByKey(lambda x1, x2: x1 + x2)
bike_duration_top = bike_duration.top(1, key=lambda x: x[1])[0][0]
bike_duration_top

535

# 2. Найти наибольшее геодезическое расстояние между станциями.

In [19]:
trips_mapped.first()

Trip(trip_id=4130, duration=71, start_date=datetime.datetime(2013, 8, 29, 10, 16), start_station_name='Mountain View City Hall', start_station_id=27, end_date=datetime.datetime(2013, 8, 29, 10, 17), end_station_name='Mountain View City Hall', end_station_id='27', bike_id=48, subscription_type='Subscriber', zip_code='97214')

In [20]:
trips_stations = trips_mapped.filter(lambda trip: str(trip.start_station_id) != str(trip.end_station_id))\
                             .keyBy(lambda trip: (trip.start_station_id, trip.end_station_id))\
                             .mapValues(lambda trip: trip.duration)
trips_stations.take(5)

[((66, '67'), 83),
 ((4, '5'), 109),
 ((10, '11'), 114),
 ((49, '54'), 125),
 ((6, '4'), 126)]

In [21]:
query = trips_stations\
    .aggregateByKey(
        (0.0, 0.0),
        lambda acc, value: (acc[0] + value, acc[1] + 1),
        lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),)\
    .mapValues(lambda values: values[0] / values[1])

In [22]:
query.map(lambda x: x[::-1]).top(5)

[(229914.0, (26, '16')),
 (179212.5, (32, '63')),
 (169308.0, (80, '36')),
 (152365.28070175438, (66, '62')),
 (84633.0, (5, '33'))]

# 3. Найти путь велосипеда с максимальным временем пробега через станции.

In [23]:
bike_path = trips_mapped.filter(lambda x: x.bike_id == bike_duration_top)\
                        .sortBy(lambda x: x.start_date)\
                        .map(lambda x: (x.start_station_name, x.end_station_name))

bike_path.first()

('Post at Kearney', 'San Francisco Caltrain (Townsend at 4th)')

# 4. Найти количество велосипедов в системе.

In [24]:
count_bikes = trips_mapped.map(lambda x: x.bike_id).distinct().count()
count_bikes

700

# 5. Найти пользователей потративших на поездки более 3 часов.

In [25]:
users = trips_mapped.filter(lambda x: x.duration > (3 * 60 * 60))\
                    .map(lambda x: x.zip_code)\
                    .filter(lambda x: x != "")\
                    .distinct()
users.take(10)

['58553',
 '94301',
 '94039',
 '94133',
 '93726',
 '94123',
 '4517',
 '29200',
 '45322',
 '94080']