# Лекция 4 
# Практика Spark - ETL-пайплайн для отзывов Кинопоиска

---


## Введение в ETL-процессы

### Что такое ETL?
ETL — процесс подготовки данных для анализа и машинного обучения:

```
Raw Data → Extract → Transform → Load → Analytics/ML
(Сырые данные → Извлечение → Трансформация → Загрузка → Аналитика/ML)
```

В нашем случае:
```
131583 текстовых файлов → Spark DataFrame → Очистка → Parquet → Готово для анализа
```

---

## 1. Введение в структурированные данные в Spark

### 1.1. Эволюция Spark API: от RDD к DataFrame
```
Эволюция абстракций данных в Spark:
┌─────────────────────────────────────────────────────────────┐
│                    УРОВЕНЬ АБСТРАКЦИИ                       │
├─────────────────────────────────────────────────────────────┤
│  DataFrame/Dataset (Высокий уровень)                        │
│  • Декларативный SQL-подобный синтаксис                     │
│  • Автоматическая оптимизация через Catalyst                │
│  • Типизированные, структурированные данные                 │
├─────────────────────────────────────────────────────────────┤
│  DataFrame API (Средний уровень)                            │
│  • Табличное представление                                  │
│  • Schema-on-read                                           │
│  • Оптимизации Tungsten                                     │
├─────────────────────────────────────────────────────────────┤
│  RDD API (Низкий уровень)                                   │
│  • Императивный стиль                                       │
│  • Работа с неструктурированными данными                    │
│  • Полный контроль над execution plan                       │
└─────────────────────────────────────────────────────────────┘
```

### 1.2. Архитектурное сравнение RDD vs DataFrame

```mermaid
graph TD
    A[Запрос пользователя] --> B{RDD API}
    A --> C{DataFrame API}
    
    B --> D[Ручная оптимизация]
    B --> E[Functional Programming]
    B --> F[Python/JVM overhead]
    
    C --> G[Catalyst Optimizer]
    C --> H[Tungsten Engine]
    C --> I[Whole-Stage Codegen]
    
    G --> J[Логическая оптимизация]
    G --> K[Физическая оптимизация]
    G --> L[Cost-based оптимизация]
    
    H --> M[Off-heap память]
    H --> N[Векторизованные операции]
    
    I --> O[Генерация Java bytecode]
    
    style C fill:#e1f5e1,stroke:#333,color:#000
    style G fill:#e1f5e1,stroke:#333,color:#000
    style H fill:#e1f5e1,stroke:#333,color:#000
    style I fill:#e1f5e1,stroke:#333,color:#000
```

## 2. Архитектурные различия: RDD vs DataFrame

### 2.1. Структурное сравнение

| Аспект | RDD (Resilient Distributed Dataset) | *DataFrame |
|--------|------------------------------------------|---------------|
| Уровень абстракции | Низкоуровневый | Высокоуровневый |
| Структура данных | Произвольные объекты Python/JVM | Таблица со схемой (строки и колонки) |
| Оптимизация | Нет автоматической оптимизации | Catalyst Optimizer + Tungsten |
| Синтаксис | Функциональный (map, filter, reduce) | Декларативный (SQL-подобный) |
| Типизация | Динамическая (Python) / Статическая (Scala) | Статическая (Schema-on-read) |
| Производительность | Медленнее (Python overhead) | Быстрее (JVM-оптимизации) |


### 2.2. Визуализация архитектурных различий

```
RDD ПРИМЕР:
val rdd = sc.textFile("data.txt")           // RDD[String]
  .flatMap(_.split(" "))                    // RDD[String]  
  .map(word => (word, 1))                   // RDD[(String, Int)]
  .reduceByKey(_ + _)                       // Shuffle
  .collect()

DataFrame ПРИМЕР:
val df = spark.read.text("data.txt")        // DataFrame[value: String]
  .select(explode(split(col("value"), " "))) // DataFrame[word: String]
  .groupBy("word").count()                  // DataFrame[word: String, count: Long]
  .collect()
```


### 2.3. Tungsten Engine: Секретное оружие Spark 

Что такое Tungsten и почему он важен

```mermaid
graph TD
    A[Apache Spark: исполнение запросов] --> B{До Tungsten}
    A --> C{С Tungsten}

    B --> B1[Много Java-объектов в куче JVM]
    B --> B2[Высокий overhead на объект и ссылку]
    B --> B3[Частые GC-паузы]
    B --> B4[Плохая кэш-локальность]

    C --> C1["Бинарный формат данных (как C-структуры)"]
    C --> C2["Off-heap управление памятью"]
    C --> C3["Векторизованные операции (колоночная обработка, SIMD)"]
    C --> C4["Whole-stage codegen (генерация Java bytecode)"]

    C1 --> D1[2-5x меньше памяти]
    C2 --> D2[Меньше нагрузки на GC и пауз]
    C3 --> D3[Рост скорости сканов и агрегаций]
    C4 --> D4[Меньше аллокаций и вызовов, до 10-100x быстрее]

    style C fill:#e1f5e1,stroke:#333,color:#000
    style C1 fill:#e1f5e1,stroke:#333,color:#000
    style C2 fill:#e1f5e1,stroke:#333,color:#000
    style C3 fill:#e1f5e1,stroke:#333,color:#000
    style C4 fill:#e1f5e1,stroke:#333,color:#000
```


Как Tungsten работает:

```python
# ПРИМЕР: Операция length() для 16000 отзывов

# Старый подход (до Tungsten):
for review in reviews:           # Python-итератор
    len(review)                  # Вызов функции Python
    # Накладные расходы: упаковка/распаковка, проверка типов

# Подход с Tungsten:
# 1. Генерация кода для всего этапа выполнения:
# Spark порождает простой байткод Java:
for (int i = 0; i < 16000; i++) {
    int length = utf8Length(data[i]);  # Прямой доступ к памяти
    results[i] = length;               # Запись в двоичный буфер
}

# 2. Векторная обработка (если поддерживается процессором):
# Векторные инструкции процессора (SIMD): обрабатываем 8 строк за одну операцию!
```

### 2.4. Catalyst Optimizer: Как это работает внутри

### 2.1. Фазы оптимизации запроса
```
┌─────────────────────────────────────────────────────────┐
│              ПРОЦЕСС ОПТИМИЗАЦИИ CATALYST               │
├─────────────────────────────────────────────────────────┤
│ ФАЗА 1: Анализ                                          │
│ • Разрешение имен таблиц и колонок                      │
│ • Проверка типов данных                                 │
│ • Валидация SQL-синтаксиса                              │
├─────────────────────────────────────────────────────────┤
│ ФАЗА 2: Логическая оптимизация                          │
│ • Predicate Pushdown (проталкивание предикатов)         │
│ • Projection Pruning (отсечение ненужных колонок)       │
│ • Constant Folding (сворачивание констант)              │
│ • Filter Merging (объединение фильтров)                 │
├─────────────────────────────────────────────────────────┤
│ ФАЗА 3: Физическое планирование                         │
│ • Выбор алгоритмов соединения                           │
│         (широковещательное хеш-соединение               │ 
│             против сортировочного соединения)           │
│ • Оптимизация партиционирования                         │
│ • Локальность данных (data locality)                    │
├─────────────────────────────────────────────────────────┤
│ ФАЗА 4: Генерация кода                                  │
│ • Генерация кода для всего этапа выполнения             │
│ • Векторизованное выполнение                            │
│ • Минимизация виртуальных вызовов                       │
└─────────────────────────────────────────────────────────┘
```

### Сводная таблица отличий

| Признак | Catalyst (Optimizer) | DAG Scheduler |
| :--- | :--- | :--- |
| Уровень | Высокоуровневый (Spark SQL/DataFrames) | Низкоуровневый (Spark Core, RDD) |
| Основная цель | Оптимизация логики запроса | Планирование и выполнение задач на кластере |
| Результат работы | Оптимизированный физический план выполнения (последовательность RDD операций) | DAG этапов (Stages) и управление задачами (Tasks) |
| Ключевая концепция | Правила оптимизации (Predicate Pushdown, Join Selection) | Этапы (Stages), разделённые shuffle-границами |
| Активность | Работает до выполнения, на этапе построения плана | Работает во время выполнения, управляет жизненным циклом задач |
| Пользователь | "Невидим" для программиста, работает под капотом DataFrame API | Его логика отображается в Spark UI на вкладке "Stages" |

## 3. DataFrame API: Основные операции

### 3.1. Иерархия операций DataFrame

Разница: трансформации и действия

- Трансформации (transformations)  
  Описывают, *как* нужно преобразовать данные.  
  Не запускают вычисления сразу (отложенные вычисления, *lazy evaluation*): Spark только строит план, но не читает и не обрабатывает данные до тех пор, пока не вызвано действие.

- Действия (actions)  
  Запускают реальное вычисление.  
  Когда вы вызываете действие, Spark:
  1) строит логический и физический план,  
  2) оптимизирует его,  
  3) выполняет все необходимые трансформации,  
  4) возвращает результат в драйвер или записывает его во внешнее хранилище.

---

### Таблица операций DataFrame

| Тип операции      | Подтип                  | Операция              | Что делает (кратко)                                                                 |
|-------------------|-------------------------|-----------------------|-------------------------------------------------------------------------------------|
| Трансформация | Проекции                | `select()`            | Выбирает подмножество колонок или выражений над колонками                           |
|                   |                         | `selectExpr()`        | То же, что `select`, но выражения задаются строками в виде мини-SQL                 |
|                   |                         | `withColumn()`        | Добавляет новую колонку или переопределяет существующую                             |
| Трансформация | Фильтрация              | `filter()` / `where()`| Оставляет только строки, удовлетворяющие условию                                    |
|                   |                         | `distinct()`          | Удаляет полностью дубликатные строки (по всем колонкам)                             |
|                   |                         | `dropDuplicates()`    | Удаляет дубликаты по указанным колонкам                                             |
| Трансформация | Агрегации (группировка) | `groupBy()`           | Группирует строки по ключу(ам) для последующих агрегатных функций                   |
|                   |                         | `rollup()`            | Иерархическая агрегация (итоги по подуровням и общие итоги)                         |
|                   |                         | `cube()`              | Многомерная агрегация по всем комбинациям измерений                                 |
| Трансформация | Соединения              | `join()`              | Объединяет строки двух DataFrame по условию                                         |
|                   |                         | `union()`             | Складывает строки двух совместимых по схеме DataFrame (как оператор UNION ALL)     |
|                   |                         | `intersect()`         | Оставляет только те строки, которые есть в обоих DataFrame                          |
| Трансформация | Оконные функции         | `window()`            | Определяет «окно» (рамку) по времени/ключам для оконных агрегатных функций          |
|                   |                         | `rank()`              | Присваивает ранг в пределах окна (с возможными «дырами» в нумерации)                |
|                   |                         | `row_number()`        | Нумерует строки подряд в пределах окна                                              |
| Действие      | Сбор данных в драйвер   | `collect()`           | Вычисляет план и возвращает все данные в виде списка объектов в приложении         |
|                   |                         | `show()`              | Вычисляет план и печатает первые N строк в консоль                                  |
|                   |                         | `take()`              | Вычисляет план и возвращает первые N строк в виде списка                            |
| Действие      | Агрегации как результат | `count()`             | Возвращает количество строк (число)                                                 |
|                   |                         | `first()`             | Возвращает первую строку                                                            |
|                   |                         | `reduce()`            | Сводит все элементы с помощью пользовательской функции                              |
| Действие      | Запись                  | `write.csv()`         | Вычисляет план и сохраняет результат в файлы формата CSV                            |
|                   |                         | `write.parquet()`     | Вычисляет план и сохраняет результат в формате Parquet                              |
|                   |                         | `write.json()`        | Вычисляет план и сохраняет результат в формате JSON                                 |

Кратко: трансформации описывают вычисление и возвращают новый DataFrame, а действия запускают вычисление и возвращают «конечный» результат (в память, на экран или в файловую систему).

## 4. Инициализация Spark Session

### 4.1. Архитектура Spark Application

```
Spark Application состоит из:
• Driver Program (ваша программа)
• SparkContext (точка входа для RDD)
• SparkSession (точка входа для DataFrame)
• Cluster Manager (YARN, Mesos, Standalone)
• Executors (рабочие процессы на узлах)

[ВАША ПРОГРАММА] → [DRIVER] → [CLUSTER MANAGER] → [EXECUTORS]
       ↑                              ↓
   Код на Python              Распределяет задачи
```

### Краткий справочник по настройкам (конфигурации) Spark

#### Минимальный набор
```python
spark = SparkSession.builder \
    .appName("MyApp") \
    .master("local[*]") \          # Все ядра
    .config("spark.sql.shuffle.partitions", "4") \  # Для маленьких данных
    .config("spark.driver.memory", "2g") \          # Память
    .config("spark.sql.adaptive.enabled", "true") \ # Автооптимизация
    .getOrCreate()
spark.sparkContext.setLogLevel("WARN")  # Меньше логов
```

#### Память (OOM?)
- `driver/executor.memory` ↑ при OutOfMemory
- `memory.fraction=0.6` (по умолчанию) - память выполнения
- `memory.storageFraction=0.5` - доля для кэша

#### Оптимизации (всегда включать)
- `adaptive.enabled=true` - автооптимизация shuffle
- `autoBroadcastJoinThreshold=10MB` - маленькие таблицы в память
- `files.maxPartitionBytes=128MB` - оптимальный размер партиции

#### Для 16000 мелких файлов
```python
.config("spark.sql.files.openCostInBytes", "4MB")  # Дешевое чтение файлов
.config("spark.default.parallelism", "8")          # Параллелизм по умолчанию
```

#### Правило партиций
- 160MB данных → 4 партиции (shuffle.partitions=4)
- 16GB данных → 200 партиций (по умолчанию)
- >100GB данных → 400+ партиций

#### Частые ошибки
1. Медленный shuffle → уменьшить `shuffle.partitions`
2. OutOfMemory → увеличить `*.memory`
3. Мало задач → увеличить `default.parallelism`
4. Медленные файлы → уменьшить `openCostInBytes`

Итог для ETL: `adaptive=true`, `shuffle.partitions=4`, `memory=2g`

In [2]:
# Установка PySpark (если не установлен)
# !pip install pyspark

import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import time
import builtins
import glob


### 4.2. Конфигурация для локальной разработки

- `appName("Kinopoisk Reviews ETL")`  
  Имя приложения в Spark UI и логах, на работу не влияет.

- `master("local")`  
  Локальный режим, использовать все ядра машины.

- `spark.sql.shuffle.partitions = 4`  
  Кол-во партиций при шафле (join/groupBy); уменьшено до 4 для маленьких локальных задач.

- `spark.sql.adaptive.enabled = true`  
  Включает адаптивный планировщик SQL: во время выполнения оптимизирует план (партиции, тип join).

- `spark.driver.memory = 2g`  
  Память JVM-процесса драйвера — 2 ГБ.

- `spark.executor.memory = 2g`  
  Память на каждый executor — 2 ГБ (в local фактически та же машина).

- `spark.sparkContext.setLogLevel("WARN")`  
  Логировать только предупреждения и ошибки, скрыть INFO/DEBUG.

#### Инициализация SparkSession с оптимизациями 

In [3]:
spark = SparkSession.builder \
    .appName("Kinopoisk_Reviews_ETL") \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/19 15:26:07 WARN Utils: Your hostname, Mordor, resolves to a loopback address: 127.0.1.1; using 192.168.1.179 instead (on interface wlp0s20f3)
25/12/19 15:26:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/19 15:26:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## 4. Источники данных в Spark

### 4.1. Поддерживаемые форматы

ТЕКСТОВЫЕ ФОРМАТЫ:
- CSV, JSON, TXT — человекочитаемые, но относительно медленные и занимают больше места  
- XML — иерархические данные (удобен для сложных вложенных структур)
- Логи (форматы с разделителями, произвольный текст) — можно читать как текст и разбирать вручную

БИНАРНЫЕ ФОРМАТЫ (предпочтительны для аналитики):
- Parquet — колоночный, сжатый, хорошо подходит для аналитических запросов
- ORC — колоночный формат, исторически тесно связан с Hive
- Avro — поддерживает эволюцию схемы (изменение структуры данных без потери совместимости)
- Delta Lake, Apache Iceberg, Apache Hudi — форматы «табличных данных на файловой системе» с поддержкой версионирования данных, транзакций и удобной работы с изменениями

БАЗЫ ДАННЫХ И ХРАНИЛИЩА:
- JDBC (PostgreSQL, MySQL, SQL Server и другие реляционные СУБД)
- Cassandra, MongoDB — нереляционные (NoSQL) базы
- HBase, Hive — распределённые хранилища поверх HDFS
- ElasticSearch / OpenSearch — полнотекстовый поиск и аналитика по документам
- Redis и другие ключ-значение хранилища (обычно как источник/кеш, а не основной дата-лейк)

ПОТОКОВЫЕ ИСТОЧНИКИ:
- Kafka — очереди сообщений и стриминг-системы
- Файлы, которые появляются в каталогах (streaming чтение из папок)


### 4.2. Проблема мелких файлов

Пример структуры данных:

reviews/  
├── neg/ (19 804 файлов × 10 КБ ≈ 198 МБ)  
├── pos/ (87 101 файл × 10 КБ ≈ 871 МБ)  
└── neu/ (24 678 файлов × 10 КБ ≈ 247 МБ)  

Всего: 131 583 файла, ≈1,3 ГБ данных

В чём проблема:
- Каждый файл → отдельная задача (task) в Spark  
- Создание задачи занимает ~100 мс (накладные расходы)  
- 131 583 × 0,1 с ≈ 13 000 секунд тратится только на подготовку задач  
- Реальная обработка данных может занимать, например, всего ~1–2 минуты

Как решать:
- Объединять мелкие файлы (операции `coalesce` / `repartition`)  
- Сохранять данные в колоночном формате (например, Parquet)  
- Настраивать размер разделов (партиций), чтобы файлов было меньше и каждый был достаточного размера


---
## 5. Чтение данных из сложной структуры

### 5.1. Структура наших данных

reviews/  
├── neg/           # отрицательные отзывы  
│   ├── XXX-XX.txt  
│   ├── XXX-XX.txt  
│   └── ...  
├── pos/           # положительные отзывы  
│   └── ...  
└── neu/           # нейтральные отзывы  
    └── ...

Каждый файл:
- Расширение: `*.txt`
- Содержимое: текст отзыва
- Метка класса: в имени папки (`neg` / `pos` / `neu`)

Дополнительно:  
Имя файла, например `306-15.txt`, может обозначать идентификатор фильма на сайте (`306`) и порядковый номер рецензии (`15`).


Практика

#### Исследование структуры данных 

- Посмотреть структуру папки reviews/
- Подсчитать количество файлов в каждой категории
- Посмотреть примеры файлов

In [4]:
BASE_PATH = "reviews/archive/dataset" 

if not os.path.exists(BASE_PATH):
    print(f"ОШИБКА: Папка {BASE_PATH} не найдена!")
else:
    print("Структура папок:")
    for root, dirs, files in os.walk(BASE_PATH):
        level = root.replace(BASE_PATH, '').count(os.sep)
        indent = ' ' * 2 * level
        print(f"{indent}{os.path.basename(root)}/ ({len(files)} файлов)")

Структура папок:
dataset/ (0 файлов)
  pos/ (87138 файлов)
  neu/ (24704 файлов)
  neg/ (19827 файлов)


#### Чтение текстовых файлов в DataFrame

Прочитать все текстовые файлы и сохранить их в DataFrame

In [5]:
# чтение всех текстовых файлов с сохранением пути
raw_df = spark.read.text(f"{BASE_PATH}/*/*.txt", wholetext=True)

# посмотрим, что получилось
print("Схема DataFrame:")
raw_df.printSchema()

print(f"\nКоличество прочитанных отзывов: {raw_df.count()}")
print("\nПримеры данных:")
raw_df.show(5, truncate=50)

25/12/19 15:26:10 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: reviews/archive/dataset/*/*.txt.
java.io.FileNotFoundException: File reviews/archive/dataset/*/*.txt does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:980)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1301)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:970)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.spark.sql.execution.streaming.sinks.FileStreamSink$.hasMetadata(FileStreamSink.scala:58)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:384)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
	at org.apache.spark.sql.c

Схема DataFrame:
root
 |-- value: string (nullable = true)



                                                                                


Количество прочитанных отзывов: 131669

Примеры данных:
+--------------------------------------------------+
|                                             value|
+--------------------------------------------------+
|”Чужого 3” зрители ждали долго. Они ждали его п...|
|Люблю французское кино. Впрочем, я вообще кино ...|
|I. Крылатый Хичкок:\n\nАльфред Хичкок (13.08.18...|
|Помните, в детстве мы все...ну, не все, а те, к...|
|Пересмешник. Североамериканский певчий пересмеш...|
+--------------------------------------------------+
only showing top 5 rows


Проблема: 
мы потеряли информацию о тональности (neg/pos/neu)

### 5.3. Извлечение дополнительных метаданных

Извлекаемые метаданные:
1. Тональность (`neg` / `pos` / `neu`) — из пути
2. Идентификатор фильма — из имени файла (первая часть, до дефиса)
3. Номер рецензии — из имени файла (вторая часть, после дефиса)
4. Дата создания файла — из метаданных файловой системы
5. Размер файла — в байтах

Пример имени файла: `306-15.txt`  
- `306` — идентификатор фильма на сайте  
- `15` — номер рецензии для этого фильма

Регулярные выражения:
- Путь:  
  - `".*/(neg|pos|neu)/.*"`
- Имя файла (выделяем id фильма и номер рецензии):  
  - `".*/(\d+)-(\d+)\.txt"`
- Полный путь с разбором каталога и имени файла:  
  - `"(.*)/(neg|pos|neu)/(\d+-\d+\.txt)"`
---




#### Извлечение метаданных из путей и названия файла

In [6]:
from pyspark.sql.functions import input_file_name, regexp_extract, col, substring

# читаем данные с wholetext=True (каждый файл = одна запись)
raw_df = spark.read.text(f"{BASE_PATH}/*/*.txt", wholetext=True)

# добавляем полный путь к файлу
df_with_path = raw_df.withColumn("file_path", input_file_name())

# извлекаем ВСЕ метаданные из пути
df_with_metadata = df_with_path.withColumn(
    "sentiment",  # тональность: neg/pos/neu
    regexp_extract(col("file_path"), r".*/(neg|pos|neu)/.*", 1)
).withColumn(
    "filename",  # имя файла: 306-15.txt
    regexp_extract(col("file_path"), r".*/([^/]+\.txt)$", 1)
).withColumn(
    "film_id",  # ID фильма: 306 (первая часть до дефиса)
    regexp_extract(col("filename"), r"^(\d+)-\d+\.txt$", 1).cast("int")
).withColumn(
    "review_num",  # номер рецензии: 15 (вторая часть после дефиса) 
    regexp_extract(col("filename"), r"^\d+-(\d+)\.txt$", 1).cast("int")
)

# переименовываем колонку с текстом
df = df_with_metadata.withColumnRenamed("value", "review_text") \
    .drop("filename")  # удаляем промежуточную колонку

# проверяем результат
print("Схема DataFrame с метаданными:")
df.printSchema()

print("\nПример данных (первые 5 записей):")
df.select("film_id", "review_num", "sentiment", 
                substring("review_text", 1, 50).alias("text_preview")) \
        .show(5, truncate=False)

25/12/19 15:26:48 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: reviews/archive/dataset/*/*.txt.
java.io.FileNotFoundException: File reviews/archive/dataset/*/*.txt does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:980)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1301)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:970)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.spark.sql.execution.streaming.sinks.FileStreamSink$.hasMetadata(FileStreamSink.scala:58)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:384)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
	at org.apache.spark.sql.c

Схема DataFrame с метаданными:
root
 |-- review_text: string (nullable = true)
 |-- file_path: string (nullable = false)
 |-- sentiment: string (nullable = false)
 |-- film_id: integer (nullable = true)
 |-- review_num: integer (nullable = true)


Пример данных (первые 5 записей):
+-------+----------+---------+----------------------------------------------------+
|film_id|review_num|sentiment|text_preview                                        |
+-------+----------+---------+----------------------------------------------------+
|2286   |91        |pos      |”Чужого 3” зрители ждали долго. Они ждали его почт  |
|20326  |3         |pos      |Люблю французское кино. Впрочем, я вообще кино люб  |
|9118   |11        |neu      |I. Крылатый Хичкок:\n\nАльфред Хичкок (13.08.1899 – |
|6044   |4         |pos      |Помните, в детстве мы все...ну, не все, а те, кто   |
|357    |65        |pos      |Пересмешник. Североамериканский певчий пересмешник  |
+-------+----------+---------+----------------


## 6. Трансформации и очистка данных

### 6.1. Типичные проблемы в текстовых данных

```
1. Пропуска:
   • Пустые файлы
   • Файлы только с пробелами
   • NULL значения

2. Аномалии:
   • Слишком короткие отзывы (< 10 символов)
   • Слишком длинные отзывы (> 5000 символов)
   • Невалидная кодировка

3. Структурнык проблемы:
   • HTML-теги в тексте
   • Спецсимволы
   • Лишние пробелы, переносы строк
```

### 1. Как Catalyst Optimizer оптимизирует наши проверки

Все наши фильтры объединяются в один оптимизированный запрос.

```python
# Наш код:
df_cleaned = df.filter(
    (col("review_length") >= 10) & 
    (col("review_length") <= 5000) &
    (col("sentiment").isNotNull()) &
    (col("film_id") > 0)
)

# Catalyst делает:
# 1. Filter Merging: объединяет все условия в один WHERE
# 2. Predicate Pushdown: если данные в Parquet, фильтрует при чтении
# 3. Constant Folding: вычисляет 10 <= review_length <= 5000 один раз

# Итоговый план: Scan → Filter(все_условия_сразу) → Project
```

### 2. Векторизованные операции Tungsten для текста

Почему length() и split() работают быстро:
- Tungsten генерирует нативный байт-код
- Обработка строк оптимизирована через sun.misc.Unsafe
- Минимизация аллокаций объектов Python

```python
# Под капотом:
# Наш код: length(col("review_text"))
# Tungsten: генерирует цикл с прямым доступом к памяти UTF-8 строк
```

### 3. Статистика в Spark SQL для определения границ

```python
# Мы используем эмпирические границы (10, 5000)
# На практике можно использовать статистику Spark:

# Spark собирает статистику при записи в Parquet
# Можно использовать для оптимизации:
# - min/max значения колонок
# - количество null значений
# - приблизительные квантили
```

### 4. Spark SQL vs DataFrame API для проверок

```python
# Оба подхода дают одинаковый оптимизированный план:

# DataFrame API
df.filter(col("review_length") > 100)

# Spark SQL  
spark.sql("SELECT * FROM reviews WHERE review_length > 100")

# Catalyst создает одинаковое AST для обоих
```

### 5. Оптимизация группировок для проверки дубликатов

```python
# Наша проверка дубликатов:
df.groupBy("film_id", "review_num").count().filter(col("count") > 1)

# Catalyst оптимизирует:
# 1. Использует hash-based агрегацию
# 2. Применяет partial aggregation до shuffle
# 3. Использует bloom filters для ускорения
```

Все наши проверки качества данных в DataFrame API автоматически оптимизируются Catalyst и выполняются эффективно благодаря Tungsten, даже если мы пишем их как простые цепочки фильтров.

#### Обработка ошибок и очистка

In [7]:
from pyspark.sql.functions import col, when, count, length, split, size, countDistinct, avg, min, max, stddev

# 1. проверяем пропуски по всем колонкам
print("Проверка пропусков:")
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

# 2. добавляем метрики качества
df_enhanced = df.withColumn("review_length", length(col("review_text"))) \
                .withColumn("word_count", size(split(col("review_text"), "\\s+")))

# 3. проверяем корректность film_id и review_num
print("\nПроверка корректности числовых полей:")
df_enhanced.select(
    count(when(col("film_id").isNull() | (col("film_id") <= 0), True)).alias("bad_film_id"),
    count(when(col("review_num").isNull() | (col("review_num") <= 0), True)).alias("bad_review_num")
).show()

# 4. проверяем дубликаты (уникальность film_id + review_num)
print("\nПроверка дубликатов (film_id + review_num):")
duplicate_count = df_enhanced.groupBy("film_id", "review_num") \
    .count() \
    .filter(col("count") > 1) \
    .count()
print(f"Найдено дублирующихся пар film_id+review_num: {duplicate_count}")

# 5. анализ распределения длины
print("\nСтатистика по длине отзывов:")
df_enhanced.select(
    min("review_length").alias("min_len"),
    max("review_length").alias("max_len"),
    avg("review_length").alias("avg_len"),
    stddev("review_length").alias("std_len")
).show()

# 6. расширенная фильтрация аномалий
df_cleaned = df_enhanced.filter(
    (col("review_length") >= 10) & 
    (col("review_length") <= 5000) &
    (col("sentiment").isNotNull()) &
    (col("film_id").isNotNull()) &
    (col("film_id") > 0) &
    (col("review_num").isNotNull()) &
    (col("review_num") > 0)
)

print(f"\nСтатистика очистки:")
print(f"До очистки: {df_enhanced.count()} записей")
print(f"После очистки: {df_cleaned.count()} записей")
print(f"Удалено: {df_enhanced.count() - df_cleaned.count()} записей")

# 7. дополнительная проверка: анализ film_id по тональности
print("\nАнализ film_id по категориям тональности:")
df_cleaned.groupBy("sentiment").agg(
    countDistinct("film_id").alias("unique_films"),
    avg("review_length").alias("avg_review_len")
).show()

Проверка пропусков:


                                                                                

+-----------+---------+---------+-------+----------+
|review_text|file_path|sentiment|film_id|review_num|
+-----------+---------+---------+-------+----------+
|          0|        0|        0|      0|         0|
+-----------+---------+---------+-------+----------+


Проверка корректности числовых полей:


                                                                                

+-----------+--------------+
|bad_film_id|bad_review_num|
+-----------+--------------+
|          0|         15613|
+-----------+--------------+


Проверка дубликатов (film_id + review_num):


                                                                                

Найдено дублирующихся пар film_id+review_num: 86

Статистика по длине отзывов:


                                                                                

+-------+-------+-----------------+------------------+
|min_len|max_len|          avg_len|           std_len|
+-------+-------+-----------------+------------------+
|     58|  13642|2262.584852926657|1367.3904526812264|
+-------+-------+-----------------+------------------+


Статистика очистки:


                                                                                

До очистки: 131669 записей


                                                                                

После очистки: 109998 записей


                                                                                

Удалено: 21671 записей

Анализ film_id по категориям тональности:




+---------+------------+------------------+
|sentiment|unique_films|    avg_review_len|
+---------+------------+------------------+
|      neg|        3641|2092.0192849014793|
|      neu|        4868|2012.4027764103573|
|      pos|        7876|2040.0542425363421|
+---------+------------+------------------+



                                                                                

### 6.3. Анализ качества данных (EDA)

```
Метрика для анализа:
1. Количество отзывов по тональности
2. Распределение длины отзывов
3. Средняя/медианная длина
4. Стандартное отклонение
5. Минимум/максимум

Статистические границы:
• Q1 - 1.5×IQR → нижняя граница выбросов
• Q3 + 1.5×IQR → верхняя граница выбросов
• IQR = Q3 - Q1 (интерквартильный размах)
```

---

### 1. Статистические границы выбросов (IQR метод)

In [8]:
from pyspark.sql.functions import expr

# квантили для каждой категории
print("\nСтатистические границы выбросов (IQR метод):")
iqr_stats = df_cleaned.groupBy("sentiment").agg(
    expr("percentile_approx(review_length, 0.25)").alias("q1"),
    expr("percentile_approx(review_length, 0.75)").alias("q3")
).withColumn("iqr", col("q3") - col("q1")) \
 .withColumn("lower_bound", col("q1") - 1.5 * col("iqr")) \
 .withColumn("upper_bound", col("q3") + 1.5 * col("iqr"))

iqr_stats.show()

# подсчет выбросов по категориям
print("\nКоличество выбросов по IQR методу:")
for row in iqr_stats.collect():
    sentiment = row["sentiment"]
    lower = row["lower_bound"]
    upper = row["upper_bound"]
    
    outliers = df_cleaned.filter(
        (col("sentiment") == sentiment) & 
        ((col("review_length") < lower) | (col("review_length") > upper))
    ).count()
    
    print(f"{sentiment}: {outliers} выбросов (границы: {lower:.0f}-{upper:.0f})")


Статистические границы выбросов (IQR метод):


                                                                                

+---------+----+----+----+-----------+-----------+
|sentiment|  q1|  q3| iqr|lower_bound|upper_bound|
+---------+----+----+----+-----------+-----------+
|      neu|1199|2669|1470|    -1006.0|     4874.0|
|      neg|1296|2748|1452|     -882.0|     4926.0|
|      pos|1236|2677|1441|     -925.5|     4838.5|
+---------+----+----+----+-----------+-----------+


Количество выбросов по IQR методу:


                                                                                

neu: 113 выбросов (границы: -1006-4874)


                                                                                

neg: 65 выбросов (границы: -882-4926)




pos: 523 выбросов (границы: -926-4838)


                                                                                

### 2. Анализ распределения film_id и review_num

In [9]:
print("\nАнализ film_id (сколько отзывов на фильм):")
film_stats = df_cleaned.groupBy("film_id").agg(
    count("*").alias("review_count"),
    avg("review_length").alias("avg_length")
).orderBy(col("review_count").desc())

print("Топ-10 фильмов по количеству отзывов:")
film_stats.show(10)

print("\nСтатистика по review_num (проверка целостности нумерации):")
review_num_stats = df_cleaned.groupBy("film_id").agg(
    min("review_num").alias("min_num"),
    max("review_num").alias("max_num"),
    count("*").alias("actual_count"),
    (max("review_num") - min("review_num") + 1).alias("expected_count")
).withColumn("missing_reviews", col("expected_count") - col("actual_count"))

print("Фильмы с пропущенными номерами рецензий:")
review_num_stats.filter(col("missing_reviews") > 0).show(10)


Анализ film_id (сколько отзывов на фильм):
Топ-10 фильмов по количеству отзывов:


                                                                                

+-------+------------+------------------+
|film_id|review_count|        avg_length|
+-------+------------+------------------+
|1048334|         104| 2482.769230769231|
| 481086|          99| 2388.969696969697|
| 406141|          99|1373.4141414141413|
|  77454|          99|1722.8080808080808|
|   2950|          99| 2112.626262626263|
|   8219|          99|1837.3232323232323|
| 279627|          99|1610.1515151515152|
| 493768|          99|1987.2424242424242|
| 396193|          98| 2064.816326530612|
|  47814|          98| 1875.795918367347|
+-------+------------+------------------+
only showing top 10 rows

Статистика по review_num (проверка целостности нумерации):
Фильмы с пропущенными номерами рецензий:




+-------+-------+-------+------------+--------------+---------------+
|film_id|min_num|max_num|actual_count|expected_count|missing_reviews|
+-------+-------+-------+------------+--------------+---------------+
| 279600|      1|     99|          95|            99|              4|
|1114927|      1|     99|          92|            99|              7|
|   2962|      1|     63|          61|            63|              2|
| 647676|      1|     34|          33|            34|              1|
|  77540|      1|     99|          90|            99|              9|
|   6241|      1|     23|          22|            23|              1|
| 405608|      1|     99|          89|            99|             10|
| 500617|      1|     99|          86|            99|             13|
| 397220|      1|     45|          43|            45|              2|
| 542581|      1|     99|          95|            99|              4|
+-------+-------+-------+------------+--------------+---------------+
only showing top 10 

                                                                                

### 3. Анализ корреляции между длиной и тональностью

In [10]:
from pyspark.sql.functions import corr

print("\nКорреляция длины отзыва и тональности:")
# преобразуем тональность в числовой формат для корреляции
df_numeric = df_cleaned.withColumn(
    "sentiment_num", 
    when(col("sentiment") == "neg", 1)
    .when(col("sentiment") == "neu", 2)
    .when(col("sentiment") == "pos", 3)
    .otherwise(0)
)

correlation = df_numeric.select(
    corr("sentiment_num", "review_length").alias("correlation")
).first()[0]

print(f"Корреляция между тональностью и длиной отзыва: {correlation:.3f}")

# интерпретация:
if correlation > 0.3:
    print("Заметная положительная корреляция: позитивные отзывы длиннее")
elif correlation < -0.3:
    print("Заметная отрицательная корреляция: негативные отзывы длиннее")
else:
    print("Слабая корреляция: длина не зависит от тональности")


Корреляция длины отзыва и тональности:




Корреляция между тональностью и длиной отзыва: -0.012
Слабая корреляция: длина не зависит от тональности


                                                                                

### 4. Анализ частоты слов по тональности

In [11]:
from pyspark.sql.functions import explode, split, lower, length, col

print("\nАнализ частоты слов по тональностям (топ-10):")

# разбиваем текст на слова и анализируем
words_df = df_cleaned.select(
    "sentiment",
    explode(split(lower(col("review_text")), "\\s+")).alias("word")
).filter(
    (length(col("word")) > 3) &  # игнорируем короткие слова
    (~col("word").rlike(r"\d+"))  # игнорируем числа
)

# топ слова для каждой тональности
for sentiment in ["pos", "neg", "neu"]:
    print(f"\nТоп-10 слов для {sentiment}:")
    words_df.filter(col("sentiment") == sentiment) \
        .groupBy("word") \
        .count() \
        .orderBy(col("count").desc()) \
        .limit(10) \
        .show(truncate=False)


Анализ частоты слов по тональностям (топ-10):

Топ-10 слов для pos:


                                                                                

+-------+------+
|word   |count |
+-------+------+
|фильм  |110927|
|очень  |85295 |
|только |55886 |
|просто |51530 |
|даже   |51433 |
|этот   |50823 |
|если   |45758 |
|который|41313 |
|когда  |40761 |
|фильма |38620 |
+-------+------+


Топ-10 слов для neg:


                                                                                

+------+-----+
|word  |count|
+------+-----+
|фильм |26560|
|даже  |15791|
|только|15344|
|если  |15074|
|просто|14430|
|было  |13865|
|очень |13269|
|фильма|11570|
|можно |11197|
|этот  |10972|
+------+-----+


Топ-10 слов для neu:




+------+-----+
|word  |count|
+------+-----+
|фильм |31143|
|очень |19027|
|только|15679|
|если  |15552|
|даже  |15260|
|было  |13448|
|просто|12980|
|фильма|12238|
|можно |12235|
|этот  |11120|
+------+-----+



                                                                                

### 5. Визуализация распределения через квантили




In [12]:
print("\nРаспределение длин через квантили:")
for sentiment in ["pos", "neg", "neu"]:
    subset = df_cleaned.filter(col("sentiment") == sentiment)
    quantiles = subset.approxQuantile(
        "review_length", 
        [0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99], 
        0.01
    )
    
    print(f"\n{sentiment.upper()} - квантили длины:")
    print(f"  10%: {quantiles[0]:.0f}  |  25%: {quantiles[1]:.0f}  |  50%: {quantiles[2]:.0f}")
    print(f"  75%: {quantiles[3]:.0f}  |  90%: {quantiles[4]:.0f}  |  95%: {quantiles[5]:.0f}")
    print(f"  99%: {quantiles[6]:.0f}")

    if quantiles[6] > 3000:
        print(f"  Внимание: 1% отзывов длиннее {quantiles[6]:.0f} символов")


Распределение длин через квантили:


                                                                                


POS - квантили длины:
  10%: 867  |  25%: 1250  |  50%: 1828
  75%: 2692  |  90%: 3571  |  95%: 4119
  99%: 5000
  Внимание: 1% отзывов длиннее 5000 символов


                                                                                


NEG - квантили длины:
  10%: 861  |  25%: 1298  |  50%: 1903
  75%: 2707  |  90%: 3684  |  95%: 4034
  99%: 5000
  Внимание: 1% отзывов длиннее 5000 символов





NEU - квантили длины:
  10%: 725  |  25%: 1176  |  50%: 1799
  75%: 2673  |  90%: 3564  |  95%: 4080
  99%: 5000
  Внимание: 1% отзывов длиннее 5000 символов


                                                                                

#### Разведочный анализ данных (EDA) 

In [13]:
# 1. базовая статистика по категориям
print("Статистика по категориям тональности:")
summary_df = df_cleaned.groupBy("sentiment").agg(
    count("*").alias("count_reviews"),
    avg("review_length").alias("avg_length"),
    stddev("review_length").alias("std_length"),
    min("review_length").alias("min_length"),
    max("review_length").alias("max_length"),
    expr("percentile_approx(review_length, 0.5)").alias("median_length")
).orderBy("count_reviews", ascending=False)

summary_df.show()

# 2. топ-5 самых длинных отзывов по категориям
window_spec = Window.partitionBy("sentiment").orderBy(col("review_length").desc())

df_with_rank = df_cleaned.withColumn("rank", row_number().over(window_spec))

print("\nТоп-3 самых длинных отзывов по категориям:")
top_reviews = df_with_rank.filter(col("rank") <= 3) \
                         .select("sentiment", "rank", "review_length", 
                                 substring("review_text", 1, 100).alias("preview"))
top_reviews.show(truncate=False)

# 3. гистограмма длин (псевдографика)
print("\nГистограмма распределения длин (по категориям):")



# сначала соберем все данные для гистограммы
for sentiment in ["pos", "neg", "neu"]:
    subset = df_cleaned.filter(col("sentiment") == sentiment)
    
    # собираем длины в Python список
    lengths = [row.review_length for row in subset.select("review_length").collect()]
    
    if lengths:  # проверяем, что список не пустой
        # используем builtins.min/max вместо переопределенных PySpark
        min_len = builtins.min(lengths)
        max_len = builtins.max(lengths)
        
        # создаем bins вручную
        num_bins = 10
        bin_width = (max_len - min_len) / num_bins
        
        bins = [min_len + i * bin_width for i in range(num_bins + 1)]
        hist = [0] * num_bins
        
        for length in lengths:
            for i in range(num_bins):
                if bins[i] <= length < bins[i+1]:
                    hist[i] += 1
                    break
            else:
                # для последнего значения (включая границу)
                if length == bins[-1]:
                    hist[-1] += 1
        
        print(f"\n{sentiment.upper()} (всего {len(lengths)} отзывов):")
        
        # находим максимум для нормализации
        max_count = builtins.max(hist) if hist else 1
        
        for i in range(num_bins):
            bar_length = int(hist[i] / max_count * 50) if max_count > 0 else 0
            bar = "#" * bar_length
            print(f"{int(bins[i]):4d}-{int(bins[i+1]):4d} chars: {bar} ({hist[i]} reviews)")
    else:
        print(f"\n{sentiment.upper()}: нет данных")

Статистика по категориям тональности:


                                                                                

+---------+-------------+------------------+------------------+----------+----------+-------------+
|sentiment|count_reviews|        avg_length|        std_length|min_length|max_length|median_length|
+---------+-------------+------------------+------------------+----------+----------+-------------+
|      pos|        72987|2040.0542425363421| 1051.288871516491|        62|      5000|         1837|
|      neu|        20314|2012.4027764103573|1072.9284107579115|        58|      5000|         1824|
|      neg|        16697|2092.0192849014793|1045.0218599134996|        98|      5000|         1902|
+---------+-------------+------------------+------------------+----------+----------+-------------+


Топ-3 самых длинных отзывов по категориям:


                                                                                

+---------+----+-------------+----------------------------------------------------------------------------------------------------+
|sentiment|rank|review_length|preview                                                                                             |
+---------+----+-------------+----------------------------------------------------------------------------------------------------+
|neg      |1   |5000         |'Богопротивная, дрянная вещь тоска...' когда-то жаловался Верлен. Фильм Меркуловой и Чупова про чело|
|neg      |2   |4996         |Шёл год 1997. Почти – конец века, и практически – начало новой эры развлекательного кинематографа. К|
|neg      |3   |4995         |Месть — вредящие действия, произведенные из побуждения покарать за реальную или мнимую несправедливо|
|neu      |1   |5000         |Шедевры о прочности веры появляются на экране крайне редко, в лучшем случае один раз в десятилетие, |
|neu      |2   |4998         |90-ые… сколько же воспоминаний накрывает, когд

                                                                                


POS (всего 72987 отзывов):
  62- 555 chars: ####### (2368 reviews)
 555-1049 chars: ################################# (10239 reviews)
1049-1543 chars: ################################################## (15283 reviews)
1543-2037 chars: ############################################ (13707 reviews)
2037-2531 chars: ################################## (10595 reviews)
2531-3024 chars: ######################## (7590 reviews)
3024-3518 chars: ################ (5129 reviews)
3518-4012 chars: ############ (3675 reviews)
4012-4506 chars: ######## (2539 reviews)
4506-5000 chars: ###### (1862 reviews)


                                                                                


NEG (всего 16697 отзывов):
  98- 588 chars: ####### (537 reviews)
 588-1078 chars: ############################### (2198 reviews)
1078-1568 chars: ################################################## (3438 reviews)
1568-2058 chars: ############################################# (3107 reviews)
2058-2549 chars: ################################### (2428 reviews)
2549-3039 chars: ########################## (1819 reviews)
3039-3529 chars: ################## (1268 reviews)
3529-4019 chars: ############ (893 reviews)
4019-4509 chars: ######## (586 reviews)
4509-5000 chars: ###### (423 reviews)





NEU (всего 20314 отзывов):
  58- 552 chars: ############ (1034 reviews)
 552-1046 chars: ################################## (2816 reviews)
1046-1540 chars: ################################################## (4071 reviews)
1540-2034 chars: ############################################# (3741 reviews)
2034-2529 chars: ################################### (2898 reviews)
2529-3023 chars: ######################### (2088 reviews)
3023-3517 chars: ################# (1427 reviews)
3517-4011 chars: ############ (1028 reviews)
4011-4505 chars: ######## (695 reviews)
4505-5000 chars: ###### (516 reviews)


                                                                                




### 7.1. Как посмотреть план выполнения
```mermaid
graph LR
    A[Наш запрос<br/>filter + groupBy + agg] --> B
    
    subgraph B [Catalyst Optimizer]
        B1[Анализ запроса] --> B2[Логическая оптимизация]
        B2 --> B3[Физическое планирование]
    end
    
    B --> C
    
    subgraph C [Оптимизации для нашего ETL]
        C1[Объединение фильтров<br/>sentiment='pos' & length>100]
        C2[Отсечение колонок<br/>Только нужные поля]
        C3[Проталкивание предикатов<br/>в Parquet reader]
    end
    
    C --> D[Tungsten Engine]
    
    D --> E[Code Generation<br/>Быстрые примитивы]
    D --> F[Векторизация<br/>Обработка батчами]
    
    E --> G[Результат]
    F --> G
    
    style B fill:#e1f5fe,color:#000000
    style C fill:#d4edda,color:#000000
    style D fill:#fff3cd,color:#000000
```


### 7.2. Типы планов выполнения

```
1. Логический план:
   • Описывает что нужно сделать
   • Не учитывает распределенность
   • Пример: Scan → Filter → Project → Aggregate

2. Физический план:
   • Описывает как это сделать
   • Учитывает распределенность, shuffle
   • Пример: Scan → Filter → HashAggregate → Exchange → HashAggregate

3. Оптимизированный логический план:
   • После применения правил оптимизации
   • Predicate Pushdown, Projection Pruning, Constant Folding
```

In [14]:
from pyspark.sql.functions import col, count, avg

# Покажем разницу между неоптимизированным и оптимизированным запросом
print("Сравнение: DataFrame API и Ручная оптимизация")

# неоптимизированный подход (частая ошибка)
print("\nНЕОПТИМИЗИРОВАННЫЙ подход:")
bad_query = df_cleaned \
    .select("*") \
    .filter(col("sentiment") == "pos") \
    .select("sentiment", "review_length") \
    .filter(col("review_length") > 100) \
    .groupBy("sentiment") \
    .agg(count("*").alias("count"), avg("review_length").alias("avg_len"))

print("План выполнения (плохой):")
bad_query.explain("simple")

# оптимизированный подход (наш)
print("\nОПТИМИЗИРОВАННЫЙ подход (наш ETL):")
good_query = df_cleaned \
    .filter((col("sentiment") == "pos") & (col("review_length") > 100)) \
    .groupBy("sentiment") \
    .agg(count("*").alias("count"), avg("review_length").alias("avg_len"))

print("План выполнения (хороший):")
good_query.explain("simple")

Сравнение: DataFrame API и Ручная оптимизация

НЕОПТИМИЗИРОВАННЫЙ подход:
План выполнения (плохой):
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[sentiment#15], functions=[count(1), avg(review_length#74)])
   +- Exchange hashpartitioning(sentiment#15, 4), ENSURE_REQUIREMENTS, [plan_id=1981]
      +- HashAggregate(keys=[sentiment#15], functions=[partial_count(1), partial_avg(review_length#74)])
         +- Project [regexp_extract(file_path#14, .*/(neg|pos|neu)/.*, 1) AS sentiment#15, length(value#12) AS review_length#74]
            +- Filter (isnotnull(value#12) AND (((((((length(value#12) >= 10) AND (length(value#12) <= 5000)) AND isnotnull(cast(regexp_extract(regexp_extract(file_path#14, .*/([^/]+\.txt)$, 1), ^(\d+)-\d+\.txt$, 1) as int))) AND (cast(regexp_extract(regexp_extract(file_path#14, .*/([^/]+\.txt)$, 1), ^(\d+)-\d+\.txt$, 1) as int) > 0)) AND isnotnull(cast(regexp_extract(regexp_extract(file_path#14, .*/([^/]+\.txt)$, 1), ^\d+-(\d+)\.txt$, 1

#### Оптимизация через Catalyst Optimizer  


In [15]:
# cоздаём сложный запрос
complex_query = df_cleaned \
    .filter(col("sentiment") == "pos") \
    .filter(col("review_length") > 100) \
    .groupBy("sentiment") \
    .agg(
        count("*").alias("count"),
        avg("review_length").alias("avg_len"),
        expr("percentile_approx(review_length, 0.9)").alias("p90_len")
    )

print("План выполнения запроса")
print("\nФорматированный план:")
complex_query.explain("formatted")

print("\nСравнение с SQL")
df_cleaned.createOrReplaceTempView("cleaned_reviews")

sql_query = """
SELECT sentiment,
       COUNT(*) as count,
       AVG(review_length) as avg_len,
       PERCENTILE_APPROX(review_length, 0.9) as p90_len
FROM cleaned_reviews
WHERE sentiment = 'pos' AND review_length > 100
GROUP BY sentiment
"""

sql_result = spark.sql(sql_query)
print("SQL план выполнения:")
sql_result.explain()

# cравним производительность

print("\nСравнение производительности")

start = time.time()
df_result = complex_query.collect()
df_time = time.time() - start

start = time.time()
sql_result.collect()
sql_time = time.time() - start

print(f"DataFrame API время: {df_time:.3f} сек")
print(f"Spark SQL время: {sql_time:.3f} сек")

План выполнения запроса

Форматированный план:
== Physical Plan ==
AdaptiveSparkPlan (8)
+- ObjectHashAggregate (7)
   +- Exchange (6)
      +- ObjectHashAggregate (5)
         +- Project (4)
            +- Filter (3)
               +- Project (2)
                  +- Scan text  (1)


(1) Scan text 
Output [1]: [value#12]
Batched: false
Location: InMemoryFileIndex [file:/home/shoose/Документы/reviews/archive/dataset/neg/1000083-0.txt, ... 131668 entries]
ReadSchema: struct<value:string>

(2) Project
Output [2]: [value#12, input_file_name() AS file_path#14]
Input [1]: [value#12]

(3) Filter
Input [2]: [value#12, file_path#14]
Condition : (isnotnull(value#12) AND (((((((length(value#12) >= 10) AND (length(value#12) <= 5000)) AND isnotnull(cast(regexp_extract(regexp_extract(file_path#14, .*/([^/]+\.txt)$, 1), ^(\d+)-\d+\.txt$, 1) as int))) AND (cast(regexp_extract(regexp_extract(file_path#14, .*/([^/]+\.txt)$, 1), ^(\d+)-\d+\.txt$, 1) as int) > 0)) AND isnotnull(cast(regexp_extract(regexp

[Stage 100:>                                                        (0 + 1) / 1]

DataFrame API время: 22.388 сек
Spark SQL время: 22.219 сек


                                                                                


## Как Catalyst оптимизирует НАШ конкретный запрос

### Наш запрос из практики:
```python
complex_query = df_cleaned \
    .filter(col("sentiment") == "pos") \
    .filter(col("review_length") > 100) \
    .groupBy("sentiment") \
    .agg(
        count("*").alias("count"),
        avg("review_length").alias("avg_len"),
        expr("percentile_approx(review_length, 0.9)").alias("p90_len")
    )
```

### Обоснование оптимизаций Catalyst:

1. Filter Merging (объединение фильтров):
```python
# Что мы написали:
.filter(col("sentiment") == "pos") \
.filter(col("review_length") > 100) \

# Что делает Catalyst:
.filter((col("sentiment") == "pos") & (col("review_length") > 100))
```
Почему это важно: Вместо двух проходов по данным - один проход с объединенным условием.

2. Predicate Pushdown (проталкивание предикатов):
```
Если наши данные хранятся в Parquet (а мы их туда запишем),
Catalyst "проталкивает" фильтры в чтение данных:

ЧТЕНИЕ БЕЗ оптимизации:
1. Читаем ВСЕ данные из Parquet
2. Фильтруем в памяти

ЧТЕНИЕ С оптимизацией:
1. Читаем ТОЛЬКО данные где sentiment='pos' и review_length>100
2. Уже отфильтрованные данные попадают в память
```

3. Projection Pruning (отсечение колонок):
```python
# Наш запрос использует только:
# - sentiment (для фильтра и группировки)
# - review_length (для фильтра и агрегаций)

# Catalyst понимает, что НЕ нужны:
# - review_text (большой текст)
# - file_path (путь к файлу)
# - film_id, review_num (метаданные)

# Результат: читаем только 2 колонки вместо 6!
```

### Визуализация оптимизации для нашего запроса:

```mermaid
graph TD
    A[Наш запрос<br/>filter→filter→groupBy→agg] --> B
    
    subgraph B [Logical Plan до оптимизации]
        B1[Project: sentiment, count, avg_len, p90_len]
        B2[HashAggregate: GROUP BY sentiment]
        B3[Filter: review_length > 100]
        B4[Filter: sentiment = 'pos']
        B5[Scan: ВСЕ 6 колонок]
        
        B1 --> B2 --> B3 --> B4 --> B5
    end
    
    A --> C
    
    subgraph C [Optimized Logical Plan после Catalyst]
        C1[Project: sentiment, count, avg_len, p90_len]
        C2[HashAggregate: GROUP BY sentiment]
        C3[Filter: sentiment='pos' AND review_length>100]
        C4[Scan: ТОЛЬКО sentiment, review_length]
        
        C1 --> C2 --> C3 --> C4
    end
    
    style B fill:#ffebee
    style C fill:#e8f5e9
```

### Числовая выгода для нашего ETL:

```
РАСЧЕТ ЭКОНОМИИ:

БЕЗ оптимизации:
• Чтение: 6 колонок × 131000 записей
• Фильтрация: 2 прохода по данным
• Память: все колонки в памяти

С оптимизацией Catalyst:
• Чтение: 2 колонки × ~8000 записей (после фильтрации)
• Фильтрация: 1 проход по данным
• Память: только нужные колонки

ЭКОНОМИЯ:
• Данные для чтения: в 3 раза меньше (6→2 колонок)
• Данные в памяти: в 3 раза меньше
• Операции фильтрации: в 2 раза меньше
```

### Почему SQL дает тот же план выполнения:

```python
# DataFrame API
complex_query = df_cleaned.filter(col("sentiment") == "pos")...

# Spark SQL
sql_query = "SELECT ... WHERE sentiment = 'pos' ..."

# Catalyst создает ОДИН Abstract Syntax Tree (AST) для обоих!
```

Ключевой вывод: Независимо от того, пишем ли мы на DataFrame API или SQL, Catalyst применяет одинаковые оптимизации. Это делает Spark декларативным движком - мы говорим "что" нужно сделать, а Spark решает "как" это сделать оптимально.
Это и есть сила Catalyst Optimizer - он позволяет писать чистый, понятный код, не жертвуя производительностью.

## 8. Оптимизированная запись данных

## 8.1. Детальное сравнение форматов (таблица)

| Параметр | CSV | JSON | Parquet | ORC | Avro |
|-------------|---------|----------|-------------|---------|----------|
| Тип формата | Текстовый | Текстовый | Бинарный, колоночный | Бинарный, колоночный | Бинарный, строчный |
| Сжатие | Обычно нет | Обычно нет | Отличное (3-5x) | Отличное (3-5x) | Хорошее (2-3x) |
| Чтение | Медленное | Медленное | Быстрое (predicate pushdown) | Быстрое | Среднее |
| Запись | Медленная | Медленная | Средняя | Средняя | Быстрая |
| Схема | Нет (все строки) | Полуструктурированный | Строгая (типы данных) | Строгая | Schema evolution |
| Поддержка в Spark | Полная | Полная | Нативная оптимизация | Хорошая | Хорошая |
| Идеальный случай | Маленькие данные, обмен | Веб-API, полуструктурированные | Аналитика, большие данные | Hive, аналитика | Потоковая обработка |

### Для нашего ETL с отзывами:
- CSV: 160MB → ~200MB (без сжатия)
- Parquet: 160MB → ~40MB (сжатие 4x) 
- Чтение только негативных отзывов: 
  - CSV: читаем 200MB, фильтруем в памяти
  - Parquet: читаем ~13MB (только папку sentiment=neg/)

---

## 8.2. Алгоритмы сжатия в Parquet

| Алгоритм | Скорость | Сжатие | CPU Usage | Рекомендация |
|-------------|-------------|------------|---------------|------------------|
| uncompressed | Быстрее всего | Нет сжатия | Низкий | Только для тестирования |
| snappy | Быстрое | Хорошее (2-3x) | Низкий | По умолчанию, баланс скорости/сжатия |
| gzip | Медленное | Отличное (4-5x) | Высокий | Для архивных данных |
| lzo | Быстрое | Среднее | Низкий | Для Hadoop экосистемы |
| zstd | Средняя | Отличное (4-5x) | Средний | Современная альтернатива gzip |

Для нашего ETL: `compression="snappy"` - оптимальный баланс.

---

## 8.3. Стратегии партиционирования

| Стратегия | Преимущества | Недостатки | Когда использовать |
|--------------|-----------------|----------------|------------------------|
| По тональности (sentiment) | Быстрые фильтры по sentiment | 3 папки, макс 3 ускорение | ✅ НАШ СЛУЧАЙ - частые фильтры по sentiment |
| По film_id | Быстрые запросы по фильму | Слишком много папок (1000+) | Если часто ищут по конкретным фильмам |
| По дате (год/месяц) | Временные анализы | Нужна колонка даты | Для временных рядов |
| Bucketing (по film_id) | Оптимизация JOIN | Сложная настройка | Для частых JOIN с таблицей фильмов |
| Комбинированное (sentiment + год) | Максимальная гибкость | Сложное управление | Для production систем |

---

## 8.4. Расчет оптимального количества файлов (таблица)

| Размер данных | Рекомендуемое кол-во файлов | Размер файла | Пример для наших данных |
|------------------|--------------------------------|------------------|----------------------------|
| < 1GB | 2-8 файлов | 64-512MB | 160MB → 4 файла по ~40MB |
| 1-10GB | 10-100 файлов | 100MB-1GB | - |
| 10-100GB | 100-500 файлов | 100MB-200MB | - |
| > 100GB | 500-2000 файлов | 100MB-200MB | - |

Формула расчета:
```
Оптимальное_кол-во_файлов = ceil(Общий_размер / 128MB)
Для наших данных: ceil(160MB / 128MB) = 2 файла
С партиционированием: 3 категории × 2 файла = 6 файлов
```

---

## 8.5. Сравнение методов записи (таблица)

| Метод записи | Код | Преимущества | Недостатки | Производительность |
|-----------------|---------|------------------|----------------|------------------------|
| Базовая | `.write.parquet()` | Простота | Мелкие файлы, нет оптимизаций | Средняя |
| С партиционированием | `.partitionBy().parquet()` | Быстрые фильтры | Overhead папок | Высокая для фильтрации |
| С контролем файлов | `.repartition().write()` | Оптимальный размер | Дополнительный shuffle | Высокая |
| С динамическим | `.writeBucketed()` | Оптимизация JOIN | Сложная настройка | Очень высокая для JOIN |
| С инкрементальной | `.mode("append")` | Добавление данных | Риск дубликатов | Зависит от объема |

---


#### Оптимизированная запись данных

Сохранить данные в оптимальном формате для дальнейшего использования

In [16]:
output_dir = "./data/processed"
os.makedirs(output_dir, exist_ok=True)

# CSV (для совместимости)
csv_path = f"{output_dir}/reviews_csv"
df_cleaned.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(csv_path)

# Parquet (оптимизированный бинарный формат)
parquet_path = f"{output_dir}/reviews.parquet"
df_cleaned.write \
    .mode("overwrite") \
    .parquet(parquet_path)

# Parquet с партиционированием
partitioned_path = f"{output_dir}/reviews_partitioned"
df_cleaned.write \
    .mode("overwrite") \
    .partitionBy("sentiment") \
    .parquet(partitioned_path)

print("Данные сохранены в форматах:")
print(f"1. CSV: {csv_path}")
print(f"2. Parquet: {parquet_path}")
print(f"3. Parquet с партиционированием: {partitioned_path}")

print("\nСравнение размеров")
for format_name, path in [("CSV", csv_path), ("Parquet", parquet_path), ("Parquet partitioned", partitioned_path)]:
    total_size = 0
    for file in glob.glob(f"{path}/", recursive=True):
        if os.path.isfile(file):
            total_size += os.path.getsize(file)
    print(f"{format_name}: {total_size / 1024 / 1024:.2f} MB")

print("\nПреимущества партиционирования")
print("Чтение только негативных отзывов из партиционированных данных:")

start = time.time()
neg_reviews = spark.read.parquet(f"{partitioned_path}/sentiment=neg")
neg_count = neg_reviews.count()
partitioned_time = time.time() - start

start = time.time()
neg_reviews2 = spark.read.parquet(parquet_path).filter(col("sentiment") == "neg")
neg_count2 = neg_reviews2.count()
non_partitioned_time = time.time() - start

print(f"Партиционированные: {partitioned_time:.3f} сек, {neg_count} записей")
print(f"Непартиционированные: {non_partitioned_time:.3f} сек, {neg_count2} записей")
print(f"Выигрыш: {non_partitioned_time/partitioned_time:.1f}x")

                                                                                

Данные сохранены в форматах:
1. CSV: ./data/processed/reviews_csv
2. Parquet: ./data/processed/reviews.parquet
3. Parquet с партиционированием: ./data/processed/reviews_partitioned

Сравнение размеров
CSV: 0.00 MB
Parquet: 0.00 MB
Parquet partitioned: 0.00 MB

Преимущества партиционирования
Чтение только негативных отзывов из партиционированных данных:




Партиционированные: 1.798 сек, 16697 записей
Непартиционированные: 1.812 сек, 16697 записей
Выигрыш: 1.0x


                                                                                

In [17]:
output_dir = "./data/processed"
os.makedirs(output_dir, exist_ok=True)

# запись данных в разные форматы
csv_path = f"{output_dir}/reviews_csv"
parquet_path = f"{output_dir}/reviews.parquet"
partitioned_path = f"{output_dir}/reviews_partitioned"

# записываем все форматы
df_cleaned.write.mode("overwrite").option("header", "true").csv(csv_path)
df_cleaned.write.mode("overwrite").parquet(parquet_path)
df_cleaned.write.mode("overwrite").partitionBy("sentiment").parquet(partitioned_path)

print("✓ Данные сохранены в 3 форматах")

# Spark ленивый - нужно вызвать действие, чтобы запись реально произошла
df_cleaned.count()  # это заставит Spark выполнить все pending операции
time.sleep(2)  # ждем завершения записи

def get_size(path):
    if not os.path.exists(path):
        return 0
    total = 0
    for root, dirs, files in os.walk(path):
        for file in files:
            filepath = os.path.join(root, file)
            if os.path.isfile(filepath):
                total += os.path.getsize(filepath)
    return total / (1024*1024)  # в MB

print("\n Размер файлов:")
sizes = []
for name, path in [("CSV", csv_path), ("Parquet", parquet_path), ("Parquet с партиц.", partitioned_path)]:
    size_mb = get_size(path)
    sizes.append((name, size_mb))
    print(f"  {name}: {size_mb:.2f} MB")

print("\n Тест скорости чтения:")

start = time.time()
neg1 = spark.read.parquet(f"{partitioned_path}/sentiment=neg")
count1 = neg1.count()
time1 = time.time() - start

start = time.time()
neg2 = spark.read.parquet(parquet_path).filter(col("sentiment") == "neg")
count2 = neg2.count()
time2 = time.time() - start

print(f"  Партиционированные: {time1:.2f} сек")
print(f"  Непартиционированные: {time2:.2f} сек")
print(f"  Ускорение: {time2/time1:.1f} раз")


                                                                                

✓ Данные сохранены в 3 форматах


                                                                                


 Размер файлов:
  CSV: 402.10 MB
  Parquet: 221.77 MB
  Parquet с партиц.: 244.22 MB

 Тест скорости чтения:




  Партиционированные: 1.09 сек
  Непартиционированные: 1.38 сек
  Ускорение: 1.3 раз


                                                                                

## 9. Создание итогового отчета

### 9.1. Метрики качества ETL-пайплайна

```
Основные метрики:
1. Input/Output: Сколько данных прочитано/записано
2. Время выполнения: Общее время пайплайна
3. Обработка ошибок: Сколько файлов пропущено
4. Распределение данных: Баланс по категориям
5. Размеры файлов: Оптимальность хранения
```

#### Создание итогового отчёта

Создать сводный отчёт по проделанной работе

In [18]:
from datetime import datetime
from pyspark.sql.functions import countDistinct, stddev, expr, corr, when
import time

start_time = time.time()

# директория для результатов 
output_dir = "./data/processed"
os.makedirs(output_dir, exist_ok=True)

print("Подготовка метрик для отчета...")

# подсчет дубликатов
duplicate_count = df_cleaned.groupBy("film_id", "review_num") \
    .count() \
    .filter(col("count") > 1) \
    .count()

# статистика по фильмам
film_stats = df_cleaned.groupBy("film_id") \
    .count() \
    .agg(
        avg("count").alias("avg_reviews_per_film"),
        max("count").alias("max_reviews_per_film")
    ).first()

# корреляции
df_numeric = df_cleaned.withColumn(
    "sentiment_score",
    when(col("sentiment") == "neg", -1)
    .when(col("sentiment") == "neu", 0)
    .when(col("sentiment") == "pos", 1)
    .otherwise(None)
)

corr_data = df_numeric.select(
    corr("sentiment_score", "review_length").alias("corr_sentiment_length"),
    corr("review_length", "word_count").alias("corr_length_words")
).first()

# формирование данных отчета
report_data = [
    # базовые метрики обработки
    ("Всего обработано файлов", df.count()),
    ("После очистки", df_cleaned.count()),
    ("Удалено записей", df.count() - df_cleaned.count()),
    ("Процент удаленных", ((df.count() - df_cleaned.count()) / df.count() * 100) if df.count() > 0 else 0),
    
    # распределение по тональности
    ("Негативных отзывов", df_cleaned.filter(col("sentiment") == "neg").count()),
    ("Позитивных отзывов", df_cleaned.filter(col("sentiment") == "pos").count()),
    ("Нейтральных отзывов", df_cleaned.filter(col("sentiment") == "neu").count()),
    
    # метаданные фильмов
    ("Уникальных фильмов (film_id)", df_cleaned.select(countDistinct("film_id")).first()[0]),
    ("Среднее отзывов на фильм", film_stats["avg_reviews_per_film"]),
    ("Максимум отзывов на фильм", film_stats["max_reviews_per_film"]),
    
    # статистика review_num
    ("Средний номер рецензии", df_cleaned.select(avg("review_num")).first()[0]),
    ("Минимальный review_num", df_cleaned.select(min("review_num")).first()[0]),
    ("Максимальный review_num", df_cleaned.select(max("review_num")).first()[0]),
    
    # статистика длины текста
    ("Средняя длина отзыва", df_cleaned.select(avg("review_length")).first()[0]),
    ("Медианная длина отзыва", df_cleaned.select(expr("percentile_approx(review_length, 0.5)")).first()[0]),
    ("Стандартное отклонение длины", df_cleaned.select(stddev("review_length")).first()[0]),
    ("Минимальная длина", df_cleaned.select(min("review_length")).first()[0]),
    ("Максимальная длина", df_cleaned.select(max("review_length")).first()[0]),
    
    # статистика по словам
    ("Среднее слов в отзыве", df_cleaned.select(avg("word_count")).first()[0]),
    
    # качество данных
    ("Дубликатов film_id+review_num", duplicate_count),
    ("Некорректных film_id", df.filter((col("film_id").isNull()) | (col("film_id") <= 0)).count()),
    ("Некорректных review_num", df.filter((col("review_num").isNull()) | (col("review_num") <= 0)).count()),
    
    # корреляции
    ("Корреляция тональность-длина", corr_data["corr_sentiment_length"]),
    ("Корреляция длина-слова", corr_data["corr_length_words"]),
]


for metric, value in report_data:
    if isinstance(value, float):
        if metric.startswith("Корреляция"):
            print(f"{metric:40} {value:15.3f}")
        elif metric.startswith("Процент"):
            print(f"{metric:40} {value:15.2f}%")
        else:
            print(f"{metric:40} {value:15.2f}")
    else:
        print(f"{metric:40} {value:15d}")


# распределение по тональности в процентах
total_clean = df_cleaned.count()
if total_clean > 0:
    neg_count = df_cleaned.filter(col("sentiment") == "neg").count()
    pos_count = df_cleaned.filter(col("sentiment") == "pos").count()
    neu_count = df_cleaned.filter(col("sentiment") == "neu").count()
    
    neg_pct = neg_count / total_clean * 100
    pos_pct = pos_count / total_clean * 100
    neu_pct = neu_count / total_clean * 100
    
    print(f"\nРаспределение по тональности:")
    print(f"  Негативные: {neg_pct:6.1f}% ({neg_count} отзывов)")
    print(f"  Позитивные: {pos_pct:6.1f}% ({pos_count} отзывов)")
    print(f"  Нейтральные: {neu_pct:6.1f}% ({neu_count} отзывов)")

# топ-5 фильмов по количеству отзывов
print(f"\nТоп-5 фильмов по количеству отзывов:")
top_films = df_cleaned.groupBy("film_id") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(5) \
    .collect()

for i, row in enumerate(top_films, 1):
    print(f"  {i}. Фильм ID {row['film_id']}: {row['count']} отзывов")

# анализ длины отзывов
print(f"\nАнализ длины отзывов:")
length_stats = df_cleaned.select(
    expr("percentile_approx(review_length, 0.25)").alias("q1"),
    expr("percentile_approx(review_length, 0.75)").alias("q3")
).first()

iqr = length_stats["q3"] - length_stats["q1"]
lower_bound = length_stats["q1"] - 1.5 * iqr
upper_bound = length_stats["q3"] + 1.5 * iqr

print(f"  Q1 (25-й перцентиль): {length_stats['q1']:.0f} символов")
print(f"  Q3 (75-й перцентиль): {length_stats['q3']:.0f} символов")
print(f"  IQR: {iqr:.0f} символов")
print(f"  Границы выбросов: {lower_bound:.0f} - {upper_bound:.0f}")

# сохранение отчета в файл
report_filename = f"{output_dir}/etl_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
print(f"Сохранение отчета в файл: {report_filename}")

with open(report_filename, "w", encoding="utf-8") as f:
    f.write("Основные метрики обработки:\n")
    for metric, value in report_data:
        if isinstance(value, float):
            if metric.startswith("Корреляция"):
                f.write(f"{metric}: {value:.3f}\n")
            elif metric.startswith("Процент"):
                f.write(f"{metric}: {value:.2f}%\n")
            else:
                f.write(f"{metric}: {value:.2f}\n")
        else:
            f.write(f"{metric}: {value}\n")
    
    f.write("\nАналитика данных:\n")
    
    if total_clean > 0:
        f.write(f"Распределение по тональности:\n")
        f.write(f"  • Негативные: {neg_pct:.1f}% ({neg_count} отзывов)\n")
        f.write(f"  • Позитивные: {pos_pct:.1f}% ({pos_count} отзывов)\n")
        f.write(f"  • Нейтральные: {neu_pct:.1f}% ({neu_count} отзывов)\n\n")
    
    f.write(f"Топ-5 фильмов по количеству отзывов:\n")
    for i, row in enumerate(top_films, 1):
        f.write(f"  {i}. Фильм ID {row['film_id']}: {row['count']} отзывов\n")
    
    f.write(f"\nСтатистические границы (IQR метод):\n")
    f.write(f"  • Q1 (25-й перцентиль): {length_stats['q1']:.0f} символов\n")
    f.write(f"  • Q3 (75-й перцентиль): {length_stats['q3']:.0f} символов\n")
    f.write(f"  • IQR: {iqr:.0f} символов\n")
    f.write(f"  • Границы выбросов: {lower_bound:.0f} - {upper_bound:.0f}\n")

    f.write(f"Сводка:\n")
    f.write(f"  • Обработано файлов: {df.count()}\n")
    f.write(f"  • Успешно очищено: {df_cleaned.count()} ({df_cleaned.count()/df.count()*100:.1f}%)\n")
    f.write(f"  • Уникальных фильмов: {df_cleaned.select(countDistinct('film_id')).first()[0]}\n")
    f.write(f"  • Средняя длина отзыва: {df_cleaned.select(avg('review_length')).first()[0]:.0f} символов\n")

    f.write("Метаданные отчета:\n")
    f.write(f"  • Дата генерации: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
    f.write(f"  • Spark Application: {spark.conf.get('spark.app.name')}\n")
    f.write(f"  • Режим выполнения: {spark.conf.get('spark.master')}\n")

print(f"Отчет успешно сохранен в файл: {report_filename}")

# статистика времени выполнения
end_time = time.time()
total_time = end_time - start_time

print(f"Общее время выполнения ETL: {total_time:.2f} секунд")
print(f"Обработано отзывов в секунду: {df_cleaned.count()/total_time:.1f}")

spark.stop()
print("\nETL пайплайн успешно завершен!")

Подготовка метрик для отчета...


                                                                                

Всего обработано файлов                           131669
После очистки                                     109998
Удалено записей                                    21671
Процент удаленных                                  16.46%
Негативных отзывов                                 16697
Позитивных отзывов                                 72987
Нейтральных отзывов                                20314
Уникальных фильмов (film_id)                        9065
Среднее отзывов на фильм                           12.13
Максимум отзывов на фильм                            104
Средний номер рецензии                             27.26
Минимальный review_num                                 1
Максимальный review_num                               99
Средняя длина отзыва                             2042.84
Медианная длина отзыва                              1845
Стандартное отклонение длины                     1054.62
Минимальная длина                                     58
Максимальная длина            

                                                                                


Распределение по тональности:
  Негативные:   15.2% (16697 отзывов)
  Позитивные:   66.4% (72987 отзывов)
  Нейтральные:   18.5% (20314 отзывов)

Топ-5 фильмов по количеству отзывов:


                                                                                

  1. Фильм ID 1048334: 104 отзывов
  2. Фильм ID 493768: 99 отзывов
  3. Фильм ID 2950: 99 отзывов
  4. Фильм ID 481086: 99 отзывов
  5. Фильм ID 8219: 99 отзывов

Анализ длины отзывов:


                                                                                

  Q1 (25-й перцентиль): 1240 символов
  Q3 (75-й перцентиль): 2688 символов
  IQR: 1448 символов
  Границы выбросов: -932 - 4860
Сохранение отчета в файл: ./data/processed/etl_report_20251219_155258.txt


                                                                                

Отчет успешно сохранен в файл: ./data/processed/etl_report_20251219_155258.txt
Общее время выполнения ETL: 815.95 секунд


                                                                                

Обработано отзывов в секунду: 134.8

ETL пайплайн успешно завершен!
