# Лабораторная 1. Интерактивный анализ данных вело парковок SF Bay Area Bike Share в Apache Spark

## Описание данных

https://www.kaggle.com/benhamner/sf-bay-area-bike-share

Схема файла stations.csv:

```
id: station ID number
name: name of station
lat: latitude
long: longitude
dock_count: number of total docks at station
city: city (San Francisco, Redwood City, Palo Alto, Mountain View, San Jose)
installation_date: original date that station was installed. If station was moved, it is noted below.
```

Схема файла trips.csv:

```
id: numeric ID of bike trip
duration: time of trip in seconds
start_date: start date of trip with date and time, in PST
start_station_name: station name of start station
start_station_id: numeric reference for start station
end_date: end date of trip with date and time, in PST
end_station_name: station name for end station
end_station_id: numeric reference for end station
bike_id: ID of bike used
subscription_type: Subscriber = annual or 30-day member; Customer = 24-hour or 3-day member
zip_code: Home zip code of subscriber (customers can choose to manually enter zip at kiosk however data is unreliable)
```

In [1]:
import os
import sys

from pyspark import SparkConf
from pyspark import SparkContext

In [2]:
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

In [3]:
conf = (SparkConf().setAppName("L1_interactive_bike_analysis")
        .setMaster("local[*]")
        .set("spark.pyspark.python", sys.executable)
        .set("spark.pyspark.driver.python", sys.executable)
        .set("spark.python.profile", "true")
        )

In [4]:
sc = SparkContext.getOrCreate(conf=conf)

In [5]:
from pathlib import Path, PurePosixPath
from typing import Union

from pyspark import SparkContext
from pyspark.rdd import RDD
from py4j.protocol import Py4JJavaError
from socket import error as SocketError

CSVRow = list[str]
CSVHeader = str


def _to_file_uri(p: Union[str, Path]) -> str:
    """Путь → URI вида file:///C:/... (Hadoop‑friendly)."""
    return f"file:///{PurePosixPath(Path(p).resolve()).as_posix()}"


def _is_textfile_runtime_error(exc: Py4JJavaError) -> bool:
    """
    True, если внутри Py4JJavaError прячется та самая
    java.net.SocketException / IOException, с которой нам
    нужно переключиться на безопасное чтение.
    """
    msg = str(exc.java_exception)
    return (
            "Connection reset" in msg  # SocketException
            or "Cannot run program \"python3\"" in msg  # воркер не запустился
            or "Input path does not exist" in msg  # неверный URI
    )


def read_csv_rdd(
        sc: SparkContext,
        path: Union[str, Path],
        *,
        delimiter: str = ",",
        encoding: str = "utf-8",
        errors: str = "replace",
) -> tuple[CSVHeader, RDD[CSVRow]]:
    """
    Надёжно загружает CSV‑файл и возвращает (header, data_rdd).

    • Сначала пробует `textFile()`;
    • При UnicodeDecodeError или Py4J‑ошибке, связанной с сокетом/воркером,
      автоматически переходит на безопасный режим `binaryFiles()`;
    • Любые другие исключения пробрасываются вызывающему коду.
    """
    uri = _to_file_uri(path)

    # --- fallback: binaryFiles() -----------------------------------------
    def _read_binary() -> RDD[str]:
        return (
            sc.binaryFiles(uri)
            .flatMap(lambda kv: kv[1].splitlines())
            .map(lambda b: b.decode(encoding, errors))
        )

    # --- попытка textFile() ----------------------------------------------
    try:
        rdd_raw = sc.textFile(uri)
        _ = rdd_raw.first()  # триггерим action
    except UnicodeDecodeError:
        rdd_raw = _read_binary()
    except Py4JJavaError as e:
        if _is_textfile_runtime_error(e):
            rdd_raw = _read_binary()
        else:
            raise  # не наша ситуация – проброс
    except SocketError:
        rdd_raw = _read_binary()
    # любые иные исключения не перехватываем

    # --- отделяем заголовок и возвращаем результат -----------------------
    header = rdd_raw.first()
    data_rdd: RDD[CSVRow] = (
        rdd_raw
        .filter(lambda row: row != header)
        .map(lambda row: row.split(delimiter, -1))
    )

    return header, data_rdd

In [6]:
DATA_DIR = Path("../data")

trips_header, trip_data = read_csv_rdd(sc, DATA_DIR / "trips.csv")
stations_header, station_data = read_csv_rdd(sc, DATA_DIR / "stations.csv")

print("Header (trips):  ", trips_header)
print("Sample trips row:", trip_data.first())

print("Header (station):  ", trips_header)
print("Sample station row:", trip_data.first())

Header (trips):   id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code
Sample trips row: ['4576', '63', '', 'South Van Ness at Market', '66', '8/29/2013 14:14', 'South Van Ness at Market', '66', '520', 'Subscriber', '94127']
Header (station):   id,duration,start_date,start_station_name,start_station_id,end_date,end_station_name,end_station_id,bike_id,subscription_type,zip_code
Sample station row: ['4576', '63', '', 'South Van Ness at Market', '66', '8/29/2013 14:14', 'South Van Ness at Market', '66', '520', 'Subscriber', '94127']


In [7]:
list(enumerate(trips_header.split(",")))

[(0, 'id'),
 (1, 'duration'),
 (2, 'start_date'),
 (3, 'start_station_name'),
 (4, 'start_station_id'),
 (5, 'end_date'),
 (6, 'end_station_name'),
 (7, 'end_station_id'),
 (8, 'bike_id'),
 (9, 'subscription_type'),
 (10, 'zip_code')]

In [8]:
list(enumerate(stations_header.split(",")))

[(0, 'id'),
 (1, 'name'),
 (2, 'lat'),
 (3, 'long'),
 (4, 'dock_count'),
 (5, 'city'),
 (6, 'installation_date')]

In [9]:
trip_data.take(2)

[['4576',
  '63',
  '',
  'South Van Ness at Market',
  '66',
  '8/29/2013 14:14',
  'South Van Ness at Market',
  '66',
  '520',
  'Subscriber',
  '94127'],
 ['4607',
  '',
  '8/29/2013 14:42',
  'San Jose City Hall',
  '10',
  '8/29/2013 14:43',
  'San Jose City Hall',
  '10',
  '661',
  'Subscriber',
  '95138']]

In [10]:
station_data.take(2)

[['2',
  'San Jose Diridon Caltrain Station',
  '37.329732',
  '-121.90178200000001',
  '27',
  'San Jose',
  '8/6/2013'],
 ['3',
  'San Jose Civic Center',
  '37.330698',
  '-121.888979',
  '15',
  'San Jose',
  '8/5/2013']]

Объявите `stationsIndexed` так, чтобы результатом был список пар ключ-значение с целочисленным ключом из первой колонки.
Таким образом вы создаёте индекс на основе первой колонки - номера велостоянки

In [11]:
stationsIndexed = station_data.keyBy(lambda station: station[0])

In [12]:
stationsIndexed.take(2)

[('2',
  ['2',
   'San Jose Diridon Caltrain Station',
   '37.329732',
   '-121.90178200000001',
   '27',
   'San Jose',
   '8/6/2013']),
 ('3',
  ['3',
   'San Jose Civic Center',
   '37.330698',
   '-121.888979',
   '15',
   'San Jose',
   '8/5/2013'])]

Аналогичное действие проделайте для индексирования коллекции trips по колонкам start_station_id и  end_station_id и сохраните результат в переменные, например tripsByStartTerminals и tripsByEndTerminals.

In [13]:
# ── 1. Найдём нужные индексы в CSV ────────────────────────────────────────
columns = trips_header.split(",")  # ['id', 'duration', 'start_date', ...]
IDX_START = columns.index("start_station_id")  # 4
IDX_END = columns.index("end_station_id")  # 7

# ── 2. Формируем индекс: {station_id → Iterable[Row‑list]} ───────────────
tripsByStartTerminals = (
    trip_data
    .keyBy(lambda row: row[IDX_START])  # ('66',  ['4576', '63', ...])
    .groupByKey()
    .cache()  # кэшируем, если будем часто использовать
)

tripsByEndTerminals = (
    trip_data
    .keyBy(lambda row: row[IDX_END])  # ('66',  [...])
    .groupByKey()
    .cache()
)

# ── 3. Быстрая проверка, что всё построилось корректно ───────────────────
print("Первые 2 ключа (start):")
for k, rows in tripsByStartTerminals.take(2):
    print("  →", k, "| кол-во поездок:", len(list(rows)))

print("\nВсего поездок:", trip_data.count(),
      "| через start‑индекс:", tripsByStartTerminals.mapValues(len).values().sum(),
      "| через end‑индекс:", tripsByEndTerminals.mapValues(len).values().sum())

Первые 2 ключа (start):
  → 56 | кол-во поездок: 15709
  → 26 | кол-во поездок: 311

Всего поездок: 669959 | через start‑индекс: 669959 | через end‑индекс: 669959


Выполните операцию объединения коллекций по ключу с помощью функции join. Объедините stationsIndexed и tripsByStartTerminals, stationsIndexed и tripsByEndTerminals.

In [14]:
startTrips = stationsIndexed.join(tripsByStartTerminals)
endTrips = stationsIndexed.join(tripsByEndTerminals)

Объявление последовательности трансформаций приводит к созданию ацикличного ориентированного графа. Вывести полученный граф можно для любого RDD.

In [15]:
print(startTrips.toDebugString().decode("utf-8"))

(3) PythonRDD[46] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[37] at mapPartitions at PythonRDD.scala:160 []
 |  ShuffledRDD[36] at partitionBy at DirectMethodHandleAccessor.java:103 []
 +-(3) PairwiseRDD[35] at join at C:\Users\nodax\AppData\Local\Temp\ipykernel_31732\2908465305.py:1 []
    |  PythonRDD[34] at join at C:\Users\nodax\AppData\Local\Temp\ipykernel_31732\2908465305.py:1 []
    |  UnionRDD[33] at union at DirectMethodHandleAccessor.java:103 []
    |  PythonRDD[31] at RDD at PythonRDD.scala:53 []
    |  file:///C:\/Users/nodax/PycharmProjects/BigData_Ishchenko_6405/data/stations.csv MapPartitionsRDD[6] at textFile at DirectMethodHandleAccessor.java:103 []
    |  file:///C:\/Users/nodax/PycharmProjects/BigData_Ishchenko_6405/data/stations.csv HadoopRDD[5] at textFile at DirectMethodHandleAccessor.java:103 []
    |  PythonRDD[32] at RDD at PythonRDD.scala:53 []
    |  PythonRDD[21] at RDD at PythonRDD.scala:53 []
    |      CachedPartitions: 1; MemorySize: 24.7 MiB; 

In [16]:
print(endTrips.toDebugString().decode("utf-8"))

(3) PythonRDD[45] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[44] at mapPartitions at PythonRDD.scala:160 []
 |  ShuffledRDD[43] at partitionBy at DirectMethodHandleAccessor.java:103 []
 +-(3) PairwiseRDD[42] at join at C:\Users\nodax\AppData\Local\Temp\ipykernel_31732\2908465305.py:2 []
    |  PythonRDD[41] at join at C:\Users\nodax\AppData\Local\Temp\ipykernel_31732\2908465305.py:2 []
    |  UnionRDD[40] at union at DirectMethodHandleAccessor.java:103 []
    |  PythonRDD[38] at RDD at PythonRDD.scala:53 []
    |  file:///C:\/Users/nodax/PycharmProjects/BigData_Ishchenko_6405/data/stations.csv MapPartitionsRDD[6] at textFile at DirectMethodHandleAccessor.java:103 []
    |  file:///C:\/Users/nodax/PycharmProjects/BigData_Ishchenko_6405/data/stations.csv HadoopRDD[5] at textFile at DirectMethodHandleAccessor.java:103 []
    |  PythonRDD[39] at RDD at PythonRDD.scala:53 []
    |  PythonRDD[26] at RDD at PythonRDD.scala:53 []
    |      CachedPartitions: 1; MemorySize: 25.1 MiB; 

Выполните объявленные графы трансформаций вызовом команды count.

In [17]:
startTrips.count()

70

In [18]:
endTrips.count()

70

Если вы знаете распределение ключей заранее, вы можете выбрать оптимальный способ хеширования ключей по разделам `Partition`. Например, если один ключ встречается на порядки чаще, чем другие ключи, то использование `HashPartitioner` будет не лучшим выбором, так как данные связанные с этим ключом будут собираться в одном разделе. Это приведёт к неравномерной нагрузке на вычислительные ресурсы.

Выбрать определённую реализацию класса распределения по разделам можно с помощью функции RDD `partitionBy`. Например, для RDD `stationsIndexed`  выбирается `portable_hash(idx)` с количеством разделов равным количеству разделов trips RDD.

In [19]:
from pyspark.rdd import portable_hash

stationsIndexed.partitionBy(numPartitions=trip_data.getNumPartitions(), partitionFunc=lambda x: portable_hash(x[0]))

MapPartitionsRDD[52] at mapPartitions at PythonRDD.scala:160

Узнать какой класс назначен для текущего RDD можно обращением к полю partitioner.

In [20]:
stationsIndexed.partitioner

## Создание модели данных

Для более эффективной обработки и получения дополнительных возможностей мы можем объявить классы сущностей предметной области и преобразовать исходные строковые данные в объявленное представление.

In [21]:
from typing import NamedTuple
from datetime import datetime

In [22]:
def init_station(stations):
    class Station(NamedTuple):
        station_id: int
        name: str
        lat: float
        long: float
        dock_count: int
        landmark: str
        installation: datetime
    
    for station in stations:
        yield Station(
            station_id=int(station[0]),
            name=station[1],
            lat=float(station[2]),
            long=float(station[3]),
            dock_count=int(station[4]),
            landmark=station[5],
            installation=datetime.strptime(station[6], '%m/%d/%Y')
        )

In [23]:
stationsInternal = station_data.mapPartitions(init_station)
stationsInternal.first()

Station(station_id=2, name='San Jose Diridon Caltrain Station', lat=37.329732, long=-121.90178200000001, dock_count=27, landmark='San Jose', installation=datetime.datetime(2013, 8, 6, 0, 0))

In [24]:
def init_trip(trips):
    class Trip(NamedTuple):
        trip_id: int
        duration: int
        start_date: datetime
        start_station_name: str
        start_station_id: int
        end_date: datetime
        end_station_name: str
        end_station_id: int
        bike_id: int
        subscription_type: str
        zip_code: str

    for trip in trips:
        try:
            yield Trip(
                trip_id=int(trip[0]),
                duration=int(trip[1]),
                start_date=datetime.strptime(trip[2], '%m/%d/%Y %H:%M'),
                start_station_name=trip[3],
                start_station_id=int(trip[4]),
                end_date=datetime.strptime(trip[5], '%m/%d/%Y %H:%M'),
                end_station_name=trip[6],
                end_station_id=trip[7],
                bike_id=int(trip[8]),
                subscription_type=trip[9],
                zip_code=trip[10]
            )
        except Exception:
            continue

In [25]:
tripsInternal = trip_data.mapPartitions(init_trip)
tripsInternal.take(10)

[Trip(trip_id=4130, duration=71, start_date=datetime.datetime(2013, 8, 29, 10, 16), start_station_name='Mountain View City Hall', start_station_id=27, end_date=datetime.datetime(2013, 8, 29, 10, 17), end_station_name='Mountain View City Hall', end_station_id='27', bike_id=48, subscription_type='Subscriber', zip_code='97214'),
 Trip(trip_id=4251, duration=77, start_date=datetime.datetime(2013, 8, 29, 11, 29), start_station_name='San Jose City Hall', start_station_id=10, end_date=datetime.datetime(2013, 8, 29, 11, 30), end_station_name='San Jose City Hall', end_station_id='10', bike_id=26, subscription_type='Subscriber', zip_code='95060'),
 Trip(trip_id=4299, duration=83, start_date=datetime.datetime(2013, 8, 29, 12, 2), start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 12, 4), end_station_name='Market at 10th', end_station_id='67', bike_id=319, subscription_type='Subscriber', zip_code='94103'),
 Trip(trip_id=4927, duration=103, s

Для каждой стартовой станции найдем среднее время поездки. Будем использовать метод groupByKey.

Для этого потребуется преобразовать trips RDD в RDD коллекцию пар ключ-значение аналогично тому, как мы совершали это ранее методом keyBy.

In [26]:
tripsByStartStation = tripsInternal.keyBy(lambda trip: trip.start_station_name)

Рассчитаем среднее время поездки для каждого стартового парковочного места

In [27]:
import numpy as np

avgDurationByStartStation = tripsByStartStation \
    .mapValues(lambda trip: trip.duration) \
    .groupByKey() \
    .mapValues(lambda trip_durations: np.mean(list(trip_durations)))

Выведем первые 10 результатов

In [28]:
%%time

avgDurationByStartStation.top(10, key=lambda x: x[1])

CPU times: total: 0 ns
Wall time: 43.7 s


[('University and Emerson', np.float64(7090.239417989418)),
 ('California Ave Caltrain Station', np.float64(4628.005847953216)),
 ('Redwood City Public Library', np.float64(4579.234741784037)),
 ('Park at Olive', np.float64(4438.1613333333335)),
 ('San Jose Civic Center', np.float64(4208.016938519448)),
 ('Rengstorff Avenue / California Street', np.float64(4174.082373782108)),
 ('Redwood City Medical Center', np.float64(3959.491961414791)),
 ('Palo Alto Caltrain Station', np.float64(3210.6489815253435)),
 ('San Mateo County Center', np.float64(2716.7700348432054)),
 ('Broadway at Main', np.float64(2481.2537313432836))]

Выполнение операции groupByKey приводит к интенсивным передачам данных. Если группировка делается для последующей редукции элементов лучше использовать трансформацию reduceByKey или aggregateByKey. Их выполнение приведёт сначала к локальной редукции над разделом Partition, а затем будет произведено окончательное суммирование над полученными частичными суммами.

*Примечание.* Выполнение reduceByKey логически сходно с выполнением Combine и Reduce фазы MapReduce  работы.

Функция aggregateByKey является аналогом reduceByKey с возможностью указывать начальный элемент.

Рассчитаем среднее значение с помощью aggregateByKey. Одновременно будут вычисляться два значения для каждого стартового терминала: сумма времён и количество поездок.

In [29]:
? tripsByStartStation.aggregateByKey

In [30]:
def seq_func(acc, duration):
    duration_sum, count = acc
    return duration_sum + duration, count + 1


def comb_func(acc1, acc2):
    duration_sum1, count1 = acc1
    duration_sum2, count2 = acc2
    return duration_sum1 + duration_sum2, count1 + count2


def mean_func(acc):
    duration_sum, count = acc
    return duration_sum / count


avgDurationByStartStation2 = tripsByStartStation \
    .mapValues(lambda trip: trip.duration) \
    .aggregateByKey(
    zeroValue=(0, 0),
    seqFunc=seq_func,
    combFunc=comb_func) \
    .mapValues(mean_func)

В `zeroValue` передаётся начальное значение. В нашем случае это пара нулей. Первая функция `seqFunc` предназначена для прохода по коллекции партиции. На этом проходе значение элементов помещаются средой в переменную duration, а переменная «аккумулятора» acc накапливает значения. Вторая функция `combFunc` предназначена для этапа редукции частично посчитанных локальных результатов.

Сравните результаты `avgDurationByStartStation` и `avgDurationByStartStation2` и их время выполнения.

In [31]:
%%time

avgDurationByStartStation2.top(10, key=lambda x: x[1])

CPU times: total: 0 ns
Wall time: 43.7 s


[('University and Emerson', 7090.239417989418),
 ('California Ave Caltrain Station', 4628.005847953216),
 ('Redwood City Public Library', 4579.234741784037),
 ('Park at Olive', 4438.1613333333335),
 ('San Jose Civic Center', 4208.016938519448),
 ('Rengstorff Avenue / California Street', 4174.082373782108),
 ('Redwood City Medical Center', 3959.491961414791),
 ('Palo Alto Caltrain Station', 3210.6489815253435),
 ('San Mateo County Center', 2716.7700348432054),
 ('Broadway at Main', 2481.2537313432836)]

Теперь найдём первую поездку для каждой велостоянки. Для решения опять потребуется группировка. Ещё одним недостатком `groupByKey` данных является то, что для группировки данные должны поместиться в оперативной памяти. Это может привести к ошибке `OutOfMemoryException` для больших объёмов данных.

Найдем самую раннюю поездку для каждой станции. Сгруппируем поездки по станциям, возьмём первую поездку из отсортированного списка:

In [32]:
def earliest_trip(trips):
    if trips is None:
        return None
    if len(trips) == 0:
        return trips
    trips = list(trips)
    min_date = trips[0].start_date
    min_trip = trips[0]
    for trip in trips[1:]:
        if min_date > trip.start_date:
            min_date = trip.start_date
            min_trip = trip
    return min_trip


firstGrouped = tripsByStartStation \
    .groupByKey() \
    .mapValues(lambda trips: earliest_trip(trips))

In [33]:
%%time

firstGrouped.take(5)

CPU times: total: 0 ns
Wall time: 1min 12s


[('Post at Kearney',
  Trip(trip_id=4279, duration=1105, start_date=datetime.datetime(2013, 8, 29, 11, 53), start_station_name='Post at Kearney', start_station_id=47, end_date=datetime.datetime(2013, 8, 29, 12, 12), end_station_name='Post at Kearney', end_station_id='47', bike_id=602, subscription_type='Subscriber', zip_code='94114')),
 ('Steuart at Market',
  Trip(trip_id=4289, duration=970, start_date=datetime.datetime(2013, 8, 29, 11, 58), start_station_name='Steuart at Market', start_station_id=74, end_date=datetime.datetime(2013, 8, 29, 12, 14), end_station_name='Steuart at Market', end_station_id='74', bike_id=570, subscription_type='Subscriber', zip_code='94109')),
 ('San Francisco Caltrain (Townsend at 4th)',
  Trip(trip_id=4177, duration=278, start_date=datetime.datetime(2013, 8, 29, 11, 3), start_station_name='San Francisco Caltrain (Townsend at 4th)', start_station_id=70, end_date=datetime.datetime(2013, 8, 29, 11, 8), end_station_name='2nd at South Park', end_station_id='64

Лучшим вариантом с точки зрения эффективности будет использование трансформации `reduceByKey`

In [34]:
firstGrouped = tripsByStartStation \
    .reduceByKey(lambda trip_a, trip_b: trip_a if trip_a.start_date < trip_b.start_date else trip_b)

In [35]:
%%time

firstGrouped.take(5)

CPU times: total: 0 ns
Wall time: 41.7 s


[('Mountain View City Hall',
  Trip(trip_id=4081, duration=218, start_date=datetime.datetime(2013, 8, 29, 9, 38), start_station_name='Mountain View City Hall', start_station_id=27, end_date=datetime.datetime(2013, 8, 29, 9, 41), end_station_name='Mountain View City Hall', end_station_id='27', bike_id=150, subscription_type='Subscriber', zip_code='97214')),
 ('San Jose City Hall',
  Trip(trip_id=4160, duration=1271, start_date=datetime.datetime(2013, 8, 29, 10, 42), start_station_name='San Jose City Hall', start_station_id=10, end_date=datetime.datetime(2013, 8, 29, 11, 3), end_station_name='San Jose City Hall', end_station_id='10', bike_id=680, subscription_type='Subscriber', zip_code='95112')),
 ('South Van Ness at Market',
  Trip(trip_id=4074, duration=1131, start_date=datetime.datetime(2013, 8, 29, 9, 24), start_station_name='South Van Ness at Market', start_station_id=66, end_date=datetime.datetime(2013, 8, 29, 9, 43), end_station_name='San Francisco Caltrain 2 (330 Townsend)', end

In [36]:
sc.stop()