<a href="https://colab.research.google.com/github/yuprotsyk/bigdata-course/blob/main/notebooks/topic02_rdd.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Аналіз та обробка великих даних

Ю.С. Процик. Курс лекцій

# Тема 2. Основи RDD: Низькорівневе керування даними та обчисленнями в Apache Spark

### План

1. [Налаштування `SparkContext`](#1.-Налаштування-`SparkContext`)
2. [Створення RDD](#2.-Створення-RDD)
3. [Операції з RDD: Трансформації та Дії](#3.-Операції-з-RDD:-Трансформації-та-Дії)
4. [Pair RDD та Shuffle](#4.-Pair-RDD-та-Shuffle)
5. [Спільні змінні (Shared Variables)](#5.-Спільні-змінні-\(Shared-Variables\))
6. [Кешування (Persistence)](#6.-Кешування-\(Persistence\))
7. [Аналіз плану виконання через `toDebugString()`](#7.-Аналіз-плану-виконання-через-`toDebugString\(\)`)
8. [Комплексний приклад та ключові концепції](#8.-Комплексний-приклад-та-ключові-концепції)
9. [Корисні ресурси](#9.Корисні-ресурси)

**RDD** – це основна абстракція Spark – відмовостійка (resilient), розподілена (distributed) колекція елементів, які можна обробляти паралельно.

**Ключові властивості:**

- **Розподіленість:** Дані розділені на частини між вузлами кластера.

- **Відмовостійкість:** RDD автоматично відновлюються у разі збоїв вузлів.

- **Незмінність (Immutable):** Після створення RDD не можна змінити, лише трансформувати у новий RDD.

## 1. Налаштування `SparkContext`

Перш ніж працювати з RDD, необхідно ініціалізувати `SparkSession`, яка автоматично створює `SparkContext`. В середовищі Google Colab ми використовуємо локальний режим виконання (`local[*]`), у якому всі ядра процесора імітують роботу кластера.

In [None]:
# 1. Встановлення PySpark
!pip install pyspark -q

# 2. Імпорт SparkSession – основної точки входу в Apache Spark
from pyspark.sql import SparkSession

# Створення сесії та отримання SparkContext
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("RDD_Lecture_Notes") \
    .getOrCreate()

sc = spark.sparkContext
print(f"SparkContext ініціалізовано. Версія: {sc.version}")

SparkContext ініціалізовано. Версія: 4.0.2


##2. Створення RDD

Згідно з документацією, є два способи створення RDD:

1. **Паралелізація існуючих колекцій (parallelizing):** Використовується метод `parallelize()` для перетворення існуючої колекції в програмі-драйвері.

2. **Зовнішні джерела даних:** Spark може створювати RDD з будь-якого джерела, що підтримується Hadoop (локальна файлова система, HDFS, S3 тощо).

In [None]:
# Спосіб 1: Parallelize (використовуємо 4 партиції)
data = [("Laptop", 1200), ("Mouse", 25), ("Keyboard", 45), ("Monitor", 300)]
rdd_from_collection = sc.parallelize(data, 4)

# Спосіб 2: Text File
# Створимо тимчасовий файл для демонстрації
with open("lecture_data.txt", "w") as f:
    f.write("Apache Spark\nRDD API\nDistributed Computing\nOptimization")

rdd_from_file = sc.textFile("lecture_data.txt")

print(f"Партицій у колекції: {rdd_from_collection.getNumPartitions()}")
print(f"Перший рядок файлу: {rdd_from_file.first()}")

Партицій у колекції: 4
Перший рядок файлу: Apache Spark


#### **Parallelize – паралелізація колекцій**

Метод `parallelize()` створює RDD з колекції (list, tuple тощо).

**Синтаксис:**
```python
sc.parallelize(data, numSlices=None)
```

**Параметри:**
- `data` – колекція даних (list, range, tuple)
- `numSlices` – кількість партицій (за замовчуванням = `spark.default.parallelism`)

#### Що відбувається всередині?

1. Driver отримує колекцію
2. Розділяє на N партицій (slices)
3. Розподіляє партиції по executors
4. Кожен executor обробляє свої партиції паралельно

#### Вибір кількості партицій

**Емпіричне правило:** `numSlices = 2-4 × кількість CPU cores`

#### **External Datasets – читання зовнішніх даних**

Spark підтримує читання даних з різних джерел:

#### Текстові файли
```python
sc.textFile(path, minPartitions=None)
```

**Особливості:**
- Кожен рядок файлу → один елемент RDD
- Підтримує wildcards: `data/*.txt`
- Підтримує стиснуті файли: `.gz`, `.bz2`
- Автоматично визначає кількість партицій на основі HDFS блоків

**Приклади:**
```python
sc.textFile("file.txt")              # Локальний файл
sc.textFile("hdfs://path/data.txt")  # HDFS
sc.textFile("s3://bucket/data.txt")  # S3
sc.textFile("logs/*.log")            # Wildcards
sc.textFile("data.gz")               # Стиснутий файл
```

#### Цілі файли (wholeTextFiles)
```python
sc.wholeTextFiles(path)
```

Повертає `RDD[(filename, content)]` – весь файл як один запис.

**Використання:** коли потрібно обробляти файл цілком (наприклад, XML, малі JSON файли).

#### Інші формати

**Hadoop формати:**
```python
sc.sequenceFile(path)           # Hadoop SequenceFile
sc.newAPIHadoopFile(path)       # Користувацький InputFormat
```

**Сучасні структуровані формати:**
- **JSON, CSV, Parquet** → рекомендується DataFrame API
```python
  df = spark.read.json("data.json")
  rdd = df.rdd  # Конвертація у разі потреби
```

**Для структурованих даних краще використовувати DataFrame API з подальшою конвертацією в RDD за потреби.**

##3. Операції з RDD: Трансформації та Дії

RDD підтримують два типи операцій: **Трансформації (Transformations)** та **Дії (Actions)**.


#### Transformations (Трансформації)

**Lazy operations** – створюють новий RDD з існуючого, але **не виконують обчислення одразу**.

- Spark запам'ятовує послідовність трансформацій у вигляді **DAG (Directed Acyclic Graph)** – орієнтованого ациклічного графа.

- Одночасно будується **граф походження (lineage)** – інформація про походження кожного RDD (які RDD та через які трансформації його створили), що дозволяє відновлювати втрачені партиції у разі збоїв.

- При трансформаціях дані не обчислюються, зберігається лише план виконання та метадані про залежності.

![image](https://raw.githubusercontent.com/yuprotsyk/bigdata-course/refs/heads/main/img/img0201.png)

#### Основні трансформації:

| Трансформація | Опис | Приклад |
|---------------|------|---------|
| `map(f)` | Застосовує функцію до кожного елемента | `rdd.map(lambda x: x * 2)` |
| `filter(f)` | Фільтрує елементи за умовою | `rdd.filter(lambda x: x > 10)` |
| `flatMap(f)` | Map + flatten (один → багато) | `rdd.flatMap(lambda x: x.split())` |
| `distinct()` | Видаляє дублікати | `rdd.distinct()` |
| `union(other)` | Об'єднання двох RDD | `rdd1.union(rdd2)` |
| `intersection(other)` | Перетин множин | `rdd1.intersection(rdd2)` |
| `subtract(other)` | Різниця множин | `rdd1.subtract(rdd2)` |

#### Типи залежностей:

**Narrow (Вузькі)** – кожна партиція дочірнього RDD залежить щонайбільше від однієї партиції батьківського RDD.

- Операції: `map`, `filter`, `union`, `sample`
- Без shuffle (швидко)
- Pipeline-обробка

**Wide (Широкі)** – кожна партиція дочірнього RDD може залежати від кількох партицій батьківського RDD.

- Операції: `groupByKey`, `reduceByKey`, `join`, `distinct`
- Потребують shuffle (повільно, дорого)
- Створюють межі stages


![image](https://raw.githubusercontent.com/yuprotsyk/bigdata-course/refs/heads/main/img/img0202.png)

#### Actions (Дії)

**Eager operations** – запускають **виконання всього DAG** і повертають результат у driver або зберігають у файлову систему.

- Spark формує **фізичний план**: розбиває DAG на stages та tasks для паралельного виконання на кластері.

| Action | Опис | Повертає |
|--------|------|----------|
| `collect()` | Збирає всі елементи на driver | `List[T]` |
| `count()` | Підраховує кількість елементів | `int` |
| `first()` | Повертає перший елемент | `T` |
| `take(n)` | Повертає перші n елементів | `List[T]` |
| `reduce(f)` | Агрегує всі елементи | `T` |
| `foreach(f)` | Виконує функцію для кожного | `None` |
| `saveAsTextFile(path)` | Зберігає у файл | `None` |
| `countByValue()` | Підраховує частоту значень | `dict` |

**Увага:** `collect()` завантажує **всі дані** на driver → небезпечно для великих RDD!


![image](https://raw.githubusercontent.com/yuprotsyk/bigdata-course/refs/heads/main/img/img0203.png)

In [None]:
# Трансформації — лише будують план
prices_rdd = rdd_from_collection.map(lambda x: (x[0], x[1] * 1.1))
expensive_items = prices_rdd.filter(lambda x: x[1] > 100)

# Action: collect (ось тут Spark запускає Job)
print("Результат після Actions:", expensive_items.collect())

Результат після Actions: [('Laptop', 1320.0), ('Monitor', 330.0)]


##4. Pair RDD та Shuffle

Деякі операції доступні лише для RDD, що складаються з кортежів `(key, value)`. Такі RDD називаються **Pair RDD**.

Класичний приклад використання Pair RDD – підрахунок кількості слів у тексті.

Спеціальні операції для Pair RDD:
- **Агрегація**: `reduceByKey`, `groupByKey`, `aggregateByKey`
- **Об'єднання**: `join`, `leftOuterJoin`, `rightOuterJoin`
- **Сортування**: `sortByKey`
- **Партиціонування**: `partitionBy`

**Shuffle** – це процес **перерозподілу даних між вузлами кластера**  
для групування або агрегації за ключем.

Це **дорога операція**, оскільки вона включає:
  - дисковий ввід/вивід (disk I/O);
  - мережеве копіювання даних між вузлами.


#### Коли відбувається shuffle?

- `groupByKey`, `reduceByKey`, `aggregateByKey`
- `join`, `cogroup`
- `distinct`, `intersection`
- `repartition`

**Важливо:** Офіційна документація рекомендує використовувати `reduceByKey` замість `groupByKey` для економії трафіку.

`reduceByKey` – оптимальний вибір (комбінує локально перед shuffle):
```python
# ДОБРЕ
word_counts = words_rdd.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
```

`groupByKey` – передає всі дані через shuffle:
```python
# ПОГАНО
word_counts = words_rdd.map(lambda w: (w, 1)).groupByKey().mapValues(sum)
```

In [None]:
# Сценарій: Word Count (класика)
text = "spark is fast spark is cool spark is distributed"
words_rdd = sc.parallelize(text.split(" "))

# Створюємо пари (слово, 1) та агрегуємо
word_counts = words_rdd.map(lambda x: (x, 1)) \
                       .reduceByKey(lambda a, b: a + b)

print("Підрахунок слів:", word_counts.collect())

# Подивимось на "родовід" (Lineage)
print("\nГраф походження (Lineage):")
print(word_counts.toDebugString().decode())

Підрахунок слів: [('fast', 1), ('distributed', 1), ('spark', 3), ('is', 3), ('cool', 1)]

Граф походження (Lineage):
(2) PythonRDD[14] at collect at /tmp/ipython-input-1295917330.py:9 []
 |  MapPartitionsRDD[13] at mapPartitions at PythonRDD.scala:168 []
 |  ShuffledRDD[12] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(2) PairwiseRDD[11] at reduceByKey at /tmp/ipython-input-1295917330.py:7 []
    |  PythonRDD[10] at reduceByKey at /tmp/ipython-input-1295917330.py:7 []
    |  ParallelCollectionRDD[9] at readRDDFromFile at PythonRDD.scala:297 []


Коли ви бачите в Spark **Lineage (граф походження)** для операції `reduceByKey`, ключовими є три етапи:

1. **Local Combine (**`PythonRDD [ ]`**):** Spark не відправляє всі дані відразу. Він спочатку підсумовує значення для кожного слова локально на кожному вузлі. Це суттєво економить мережевий трафік.

2. **Shuffle (**`ShuffledRDD [ ]`**):** **Це найдорожча операція**. Spark перерозподіляє дані між вузлами кластера так, щоб усі однакові ключі (наприклад, усі слова "spark") опинилися на одному фізичному сервері для фінального підрахунку.

3. **Final Aggregation (**`MapPartitionsRDD [ ]`**):** На кожному вузлі проводиться остаточне додавання отриманих результатів. Після цього дані готові до видачі через `.collect()`.

##5. Спільні змінні (Shared Variables)

За замовчуванням, коли Spark передає функцію на executors, він серіалізує та надсилає копії всіх використаних змінних для **кожного task окремо**.

**Проблема:** якщо є 100 tasks і змінна розміром 1 GB → 100 GB мережевого трафіку!

Для оптимізації Spark пропонує два типи спільних змінних:

- **Broadcast Variables (Широкомовні змінні)**

- **Accumulators (Акумулятори)**

#### Broadcast Variables (Широкомовні змінні)

**Призначення:** ефективна передача read-only даних.

**Як працює:**
- Змінна надсилається **один раз на executor** (не на кожен task)
- Всі tasks на executor використовують одну копію
- Економія: 100 tasks на 10 executors → 10 копій замість 100

**Типові випадки:**
- Lookup таблиці (словники для перекладу кодів)
- ML моделі для prediction
- Конфігураційні дані

In [None]:
# Демонстрація Broadcast Variables

# Без broadcast — неефективно
print("=== БЕЗ BROADCAST ===\n")

# Словник з країнами
country_codes = {
    "US": "United States",
    "UK": "United Kingdom",
    "DE": "Germany",
    "FR": "France",
    "JP": "Japan"
}

# RDD з кодами країн
codes = sc.parallelize(["US", "UK", "DE", "US", "FR", "JP", "UK"], 3)

# Без broadcast - словник передається в кожен task
# (не рекомендовано для великих даних)
result_no_broadcast = codes.map(lambda code: (code, country_codes.get(code, "Unknown")))
print("Результат без broadcast:")
print(result_no_broadcast.collect())
print()

# З broadcast - ефективно
print("=== З BROADCAST ===\n")

# Створення broadcast змінної
broadcast_countries = sc.broadcast(country_codes)

# Використання broadcast
result_with_broadcast = codes.map(
    lambda code: (code, broadcast_countries.value.get(code, "Unknown"))
)

print("Результат з broadcast:")
print(result_with_broadcast.collect())
print()

# Статистика
print(f"Розмір broadcast: {len(broadcast_countries.value)} записів")
print(f"Кількість партицій RDD: {codes.getNumPartitions()}")
print(f"Переваги: словник передано 1 раз на executor замість N разів на task")
print()

# Видалення broadcast змінної з пам'яті
broadcast_countries.unpersist()
print("Broadcast variable unpersisted")

=== БЕЗ BROADCAST ===

Результат без broadcast:
[('US', 'United States'), ('UK', 'United Kingdom'), ('DE', 'Germany'), ('US', 'United States'), ('FR', 'France'), ('JP', 'Japan'), ('UK', 'United Kingdom')]

=== З BROADCAST ===

Результат з broadcast:
[('US', 'United States'), ('UK', 'United Kingdom'), ('DE', 'Germany'), ('US', 'United States'), ('FR', 'France'), ('JP', 'Japan'), ('UK', 'United Kingdom')]

Розмір broadcast: 5 записів
Кількість партицій RDD: 3
Переваги: словник передано 1 раз на executor замість N разів на task

Broadcast variable unpersisted


#### Accumulators (Акумулятори)

**Призначення:** збір метрик з розподілених обчислень.

**Як працює:**
- Executors можуть лише **додавати** значення (`add()`)
- Driver може **читати** фінальний результат
- Гарантується коректність лише в actions (не в transformations)

**Типові випадки:**
- Підрахунок помилок під час обробки
- Збір статистики (кількість пропущених записів)
- Debugging та моніторинг

In [None]:
# Демонстрація Accumulators

print("=== ACCUMULATORS DEMO ===\n")

# Створення акумуляторів
error_counter = sc.accumulator(0)
valid_counter = sc.accumulator(0)
sum_accumulator = sc.accumulator(0)

# RDD з даними (деякі некоректні)
data = sc.parallelize([
    "10", "20", "invalid", "30", "40", "error", "50", "60", "bad", "70"
])

# Функція обробки з використанням акумуляторів
def process_record(value):
    global error_counter, valid_counter, sum_accumulator
    try:
        num = int(value)
        valid_counter.add(1)
        sum_accumulator.add(num)
        return num
    except ValueError:
        error_counter.add(1)
        return None

# Обробка даних через map
# УВАГА: Використання акумуляторів у map не гарантує точності при збоях!
# Якщо Task буде перезапущено через Lineage, значення можуть дублюватися.
# Для 100% точності лічильників використовуйте foreach() або агрегацію.
result = data.map(process_record).filter(lambda x: x is not None)

# Запускаємо action для trigger обчислень
result.collect()

# Читання значень акумуляторів (тільки на driver!)
print(f"Валідних записів: {valid_counter.value}")
print(f"Помилок: {error_counter.value}")
print(f"Сума валідних: {sum_accumulator.value}")
print(f"Середнє: {sum_accumulator.value / valid_counter.value:.2f}")
print()

# Приклад 2: Підрахунок слів різної довжини
short_words = sc.accumulator(0)
medium_words = sc.accumulator(0)
long_words = sc.accumulator(0)

text = sc.parallelize([
    "Apache Spark is a powerful distributed computing framework",
    "It provides high-level APIs for data processing"
])

def categorize_word(word):
    length = len(word)
    if length < 5:
        short_words.add(1)
    elif length < 8:
        medium_words.add(1)
    else:
        long_words.add(1)
    return word

words_categorized = text.flatMap(lambda line: line.split()).map(categorize_word)
words_categorized.count()  # Action для trigger

print("Категорії слів за довжиною:")
print(f"   Короткі (<5): {short_words.value}")
print(f"   Середні (5-7): {medium_words.value}")
print(f"   Довгі (8+): {long_words.value}")

=== ACCUMULATORS DEMO ===

Валідних записів: 7
Помилок: 3
Сума валідних: 280
Середнє: 40.00

Категорії слів за довжиною:
   Короткі (<5): 6
   Середні (5-7): 2
   Довгі (8+): 7


##6. Кешування (Persistence)

#### Навіщо потрібне кешування?

За замовчуванням RDD **recompute** (перераховується) кожного разу при action.

**Проблема без cache:**
```python
rdd = sc.textFile("file.txt").filter(...)

rdd.count()      # Обчислення 1: читає файл + filter
rdd.collect()    # Обчислення 2: знову читає файл + filter (!)
```

**Рішення:**
```python
rdd = sc.textFile("file.txt").filter(...).cache()

rdd.count()      # Обчислення 1: читає + filter + CACHE
rdd.collect()    # Використовує CACHE (швидко!)
```


#### Методи кешування

#### `cache()`

```python
rdd.cache()  # Зберігає в пам'яті
```

Еквівалентно `persist(StorageLevel.MEMORY_ONLY)`

#### `persist(storageLevel)`
```python
from pyspark import StorageLevel

rdd.persist(StorageLevel.MEMORY_ONLY)
rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.persist(StorageLevel.DISK_ONLY)
```

#### Storage Levels

| Level | Опис | Використання |
| :--- | :--- | :--- |
| **`MEMORY_ONLY`** | Тільки RAM | Найшвидший варіант, але дані можуть не поміститись |
| **`MEMORY_AND_DISK`** | RAM + Диск | Оптимальний баланс швидкості та надійності |
| **`MEMORY_ONLY_SER`** | RAM (серіалізовано) | Економить місце в пам'яті, але навантажує CPU |
| **`DISK_ONLY`** | Тільки диск | Повільно, але гарантує результат при нестачі RAM|
| **`OFF_HEAP`** | Поза межами JVM | Зменшує тиск на Garbage Collector (GC pressure) |

**Детальний опис кожного рівня та параметрів:** [Spark RDD Programming Guide - Persistence](https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence)

#### Коли кешувати?

**Так:**
- RDD використовується багато разів
- Ітеративні алгоритми (ML)
- Інтерактивна аналітика

**Ні:**
- RDD використовується лише раз
- Дуже великі дані, що не поміщаються в пам'ять

#### Політика витіснення (Eviction Policy)

Spark використовує **LRU** (Least Recently Used):
- Коли пам’ять заповнена, видаляються дані, які використовувалися найдавніше

#### Очищення кешу
```python
rdd.unpersist()  # Видаляє з пам'яті
```

##7. Аналіз плану виконання через `toDebugString()`

Оскільки Spark працює за принципом **Lazy Evaluation**, нам потрібен спосіб "зазирнути під капот" ще до того, як запустяться обчислення. Метод `toDebugString()` – це головний інструмент для візуалізації **lineage** у текстовому форматі.

#### Навіщо аналізувати план

- **Виявлення Shuffle**  
  Ідентифікація «широких» залежностей, що сповільнюють роботу.

- **Перевірка Stages**  
  Розуміння того, як логічний DAG розбивається на фізичні етапи.

- **Контроль паралелізму**  
  Моніторинг кількості партицій на кожному кроці.

- **Відстеження кешування**  
  Перевірка наявності мітки `[Memory...]`.


```python
print(rdd.toDebugString().decode())
```

#### Інтерпретація виводу

**Приклад виводу:**
```
(4) PythonRDD[10] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[9] at mapPartitions at PythonRDD.scala:133 []
 |  ShuffledRDD[8] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(4) PairwiseRDD[7] at reduceByKey at <stdin>:1 []
    |  PythonRDD[6] at reduceByKey at <stdin>:1 []
    |  ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262 []
```

**Ключові символи та позначення**

| Символ | Значення            | Пояснення                                                                 |
|:-------|:---------------------|:---------------------------------------------------------------------------|
| `\|`    | Narrow dependency   | "Вузька" залежність. Дані обробляються конвеєром (pipelining) в межах одного Stage |
| `+–`   | Wide dependency     | "Широка" залежність (Shuffle). Межа між Stages; верхній RDD очікує дані від нижнього |
| `(4)`  | Partitions          | Кількість паралельних завдань (tasks) на даному етапі                     |
| `[]`   | Storage Info        | Статус кешування. Порожні дужки – не закешовано, `[Memory...]` – дані в RAM |

**Логіка виконання в PySpark**  
*(читати знизу вгору)*

1. **ParallelCollectionRDD**  
  Початок: створення RDD

2. **PythonRDD**  
  Виконання вашої логіки в Python-процесах  
  (це можуть бути `map` / `filter` або локальна агрегація перед Shuffle)

3. **PairwiseRDD**  
  Створення пар "ключ-значення" для подальшого групування

4. **--- МЕЖА STAGE (SHUFFLE) ---** (символ `+–`)

5. **ShuffledRDD**  
  Результат фізичного перемішування даних між вузлами кластера

6. **MapPartitionsRDD / PythonRDD**  
  Фінальна обробка отриманих даних та видача результату

**Важливо:** Поява символу `+–` свідчить про те, що Spark змушений розірвати конвеєр обробки та виконати **Shuffle** – найдорожчу операцію.  Оптимізація Spark-задач зазвичай зводиться до **зменшення кількості таких розривів**.

##8. Комплексний приклад та ключові концепції

#### Практичне завдання: аналіз логів веб-сервера

In [None]:
print("=== КОМПЛЕКСНИЙ ПРИКЛАД: АНАЛІЗ ЛОГІВ ===\n")

# Симуляція логів веб-сервера
logs = sc.parallelize([
    "192.168.1.1 - - [01/Jan/2024:12:00:00] GET /home 200 1234",
    "192.168.1.2 - - [01/Jan/2024:12:01:00] GET /about 200 2345",
    "192.168.1.1 - - [01/Jan/2024:12:02:00] GET /home 200 1456",
    "192.168.1.3 - - [01/Jan/2024:12:03:00] POST /contact 500 3456",
    "192.168.1.2 - - [01/Jan/2024:12:04:00] GET /products 404 4567",
    "192.168.1.1 - - [01/Jan/2024:12:05:00] GET /api/data 200 5678",
    "192.168.1.4 - - [01/Jan/2024:12:06:00] GET /home 200 6789",
    "192.168.1.3 - - [01/Jan/2024:12:07:00] GET /about 500 7890",
], 2)

# Кешування для багаторазового використання
logs.cache()

import re

# 1. Запити від кожної IP
def extract_ip(log):
    return log.split()[0]

ip_counts = (logs
    .map(lambda log: (extract_ip(log), 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda kv: kv[1], ascending=False))

print("1. Запити від кожної IP:")
for ip, count in ip_counts.collect():
    print(f"   {ip:15} : {count} запитів")
print()

# 2. Популярні URL
def extract_url(log):
    match = re.search(r'(GET|POST)\s+(\S+)', log)
    return match.group(2) if match else None

url_counts = (logs
    .map(lambda log: (extract_url(log), 1))
    .filter(lambda kv: kv[0] is not None)
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda kv: kv[1], ascending=False))

print("2. Популярні URL:")
for url, count in url_counts.collect():
    print(f"   {url:20} : {count} запитів")
print()

# 3. Розподіл статус-кодів
def extract_status(log):
    parts = log.split()
    return parts[-2] if len(parts) >= 2 else None

status_distribution = (logs
    .map(lambda log: (extract_status(log), 1))
    .filter(lambda kv: kv[0] is not None)
    .reduceByKey(lambda a, b: a + b))

print("3. Розподіл статус-кодів:")
for status, count in status_distribution.collect():
    print(f"   HTTP {status} : {count} запитів")
print()

# 4. Помилки (статус >= 400)
errors = logs.filter(lambda log: int(extract_status(log)) >= 400 if extract_status(log) else False)
print("4. Запити з помилками:")
for error in errors.collect():
    print(f"   {error}")
print()

# 5. Середній розмір відповіді
def extract_size(log):
    parts = log.split()
    return int(parts[-1]) if len(parts) >= 1 else 0

total_size = logs.map(extract_size).reduce(lambda a, b: a + b)
total_requests = logs.count()
avg_size = total_size / total_requests

print("5. Статистика розміру відповідей:")
print(f"   Загальний розмір: {total_size} bytes")
print(f"   Середній розмір: {avg_size:.2f} bytes")
print(f"   Загальних запитів: {total_requests}")
print()

# 6. Lineage graph
print("6. Lineage Graph (приклад для url_counts):")
print(url_counts.toDebugString().decode('utf-8'))

# Очищення
logs.unpersist()

=== КОМПЛЕКСНИЙ ПРИКЛАД: АНАЛІЗ ЛОГІВ ===

1. Запити від кожної IP:
   192.168.1.1     : 3 запитів
   192.168.1.2     : 2 запитів
   192.168.1.3     : 2 запитів
   192.168.1.4     : 1 запитів

2. Популярні URL:
   /home                : 3 запитів
   /about               : 2 запитів
   /contact             : 1 запитів
   /products            : 1 запитів
   /api/data            : 1 запитів

3. Розподіл статус-кодів:
   HTTP 200 : 5 запитів
   HTTP 500 : 2 запитів
   HTTP 404 : 1 запитів

4. Запити з помилками:
   192.168.1.3 - - [01/Jan/2024:12:03:00] POST /contact 500 3456
   192.168.1.2 - - [01/Jan/2024:12:04:00] GET /products 404 4567
   192.168.1.3 - - [01/Jan/2024:12:07:00] GET /about 500 7890

5. Статистика розміру відповідей:
   Загальний розмір: 33415 bytes
   Середній розмір: 4176.88 bytes
   Загальних запитів: 8

6. Lineage Graph (приклад для url_counts):
(2) PythonRDD[44] at collect at /tmp/ipython-input-3698269704.py:46 []
 |  MapPartitionsRDD[43] at mapPartitions at PythonRD

ParallelCollectionRDD[22] at readRDDFromFile at PythonRDD.scala:297

#### Ключові концепції

1. **RDD – фундамент Spark**
   - Розуміння RDD критично для глибокого знання Spark
   - DataFrame/Dataset API будуються поверх RDD

2. **Lazy Evaluation**
   - Трансформації – lazy: не виконують код миттєво, а будують логічний DAG; для кожного RDD зберігається lineage – шлях трансформацій, що його створили.
   - Actions – eager: запускають обчислення, перетворюючи логічний план у фізичний DAG.
   - Оптимізація: Spark аналізує весь ланцюжок перед запуском, щоб уникнути зайвих дій.

3. **Immutability**
   - RDD незмінні
   - Спрощує паралелізм та fault tolerance

4. **Партиціонування важливе**
   - Впливає на паралелізм та локальність даних
   - Правило: 2-4 × кількість cores

5. **Shuffle – дорога операція**
   - Уникайте зайвого shuffle
   - Використовуйте `reduceByKey` замість `groupByKey`

RDD – потужний інструмент, але для **structured data** (таблиці, JSON, Parquet) Spark пропонує **DataFrame API**:

**DataFrame** = RDD + схема + автоматичні оптимізації

```
# RDD: ручна робота
rdd.map(...).filter(...).reduceByKey(...)

# DataFrame: декларативний стиль + оптимізації
df.select("name", "age").filter(df.age > 18).groupBy("country").count()
```


Переваги **DataFrame**:

- Декларативний API (що робити, не як)
- Catalyst optimizer – автоматична оптимізація
- Tungsten – ефективне виконання
- SQL підтримка

У **наступній лекції** детально розглянемо DataFrame API та його можливості.

### 9. Корисні ресурси

1. [Офіційна документація по RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html)

2. [PySpark API](https://spark.apache.org/docs/latest/api/python/)
