# 1. Adaptive Query Execution (AQE)

В **Spark 2** стратегия выполнения запроса определяется заранее и остается неизменной на протяжении всего процесса выполнения. Это означает, что план выполнения запроса фиксируется до начала обработки данных, и даже если в процессе выполнения становится очевидным, что план не оптимален, изменить его уже нельзя.

В **Spark 3** появилась революционная функция — **Adaptive Query Execution (AQE)**. Она позволяет Spark динамически адаптировать план выполнения запроса на основе фактических данных, которые обрабатываются в реальном времени. Это значительно повышает производительность, особенно в случаях, когда данные распределены неравномерно или когда первоначальные оценки объема данных оказываются неточными.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

# Инициализация Spark с фиксированным количеством shuffle partitions
spark = SparkSession.builder \
    .appName("Spark2 Example") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .master("local[*]") \
    .getOrCreate()

# Генерация большого DataFrame
df = spark.range(1_000_000).withColumn("value", rand())

# Группировка данных (shuffle operation)
df_grouped = df.groupBy("id").sum("value")

df_grouped.show()

In [None]:
# Spark 3 (с AQE, динамический размер shuffle partitions):
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand

# Инициализация Spark 3 с адаптивным выполнением запросов (AQE)
spark = SparkSession.builder \
    .appName("Spark3 Example") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .master("local[*]") \
    .getOrCreate()

# Генерация большого DataFrame
df = spark.range(1_000_000).withColumn("value", rand())

# Группировка данных (shuffle operation)
df_grouped = df.groupBy("id").sum("value")

df_grouped.show()

## 2. Улучшения Broadcast Hash Join в Spark 3  

В **Spark 3** значительно усовершенствованы механизмы *broadcast join*, что позволяет оптимизировать выполнение запросов и повысить производительность.  

### Ключевые улучшения:  
- **Автоматический выбор broadcast join**  
  Теперь Spark сам определяет, когда следует применять *broadcast join*, даже если разработчик явно не использует `broadcast()`. Это сокращает время выполнения запросов, уменьшая объем shuffle-операций.  

- **Более интеллектуальное управление ресурсами**  
  Улучшенная логика выбора метода соединения учитывает размер данных, доступную память и конфигурацию кластера, позволяя избежать проблем с нехваткой ресурсов.  

Благодаря этим изменениям Spark 3 может автоматически повышать эффективность выполнения SQL-запросов, минимизируя нагрузку на кластер и ускоряя обработку больших объемов данных.  


In [None]:
# Spark 2 (без автоматического broadcast join):
from pyspark.sql.functions import broadcast
from pyspark.sql import SparkSession

# Инициализация SparkSession
spark = SparkSession.builder.appName("Spark2_vs_Spark3").getOrCreate()

# Два больших DataFrame
df_large = spark.range(1_000_000).withColumn("value", rand())
df_small = spark.range(1000).withColumn("value", rand())

# Используем broadcast вручную
df_joined = df_large.join(broadcast(df_small), "id", "inner")

# Логируем план выполнения для Spark 2
print("Spark 2 план выполнения:")
df_joined.explain(True)

df_joined.show()


In [None]:
# Spark 3 (автоматическое определение broadcast join):
# Два больших DataFrame
df_large = spark.range(1_000_000).withColumn("value", rand())
df_small = spark.range(1000).withColumn("value", rand())

# Теперь Spark сам решает, когда использовать broadcast join
df_joined = df_large.join(df_small, "id", "inner")

# Логируем план выполнения для Spark 3
print("Spark 3 план выполнения:")
df_joined.explain(True)

df_joined.show()



# 🚀 Разница в производительности: Spark 2 vs Spark 3  

В **Spark 2** приходилось вручную указывать `broadcast()`, что было не только неудобно, но и требовало от разработчика глубокого понимания распределённых вычислений. Ошибки в этом выборе могли приводить к лишнему потреблению памяти и снижению производительности.  

В **Spark 3** этот процесс стал умнее: движок сам анализирует план выполнения и решает, когда использовать **broadcast join**, снижая нагрузку на память и улучшая общую эффективность вычислений. Это делает работу с большими объёмами данных более удобной и предсказуемой.  

🔥 Итог: **меньше ручных настроек, выше производительность, меньше проблем с памятью!**  


### 3. Dynamic Partition Pruning (DPP)

В **Apache Spark 2** при выполнении запросов с фильтрацией по партициям все партиции загружаются заранее, даже если они не используются в вычислениях. Это приводит к лишнему сканированию данных, увеличению времени выполнения запросов и повышенной нагрузке на систему.

В **Apache Spark 3** была внедрена оптимизация **Dynamic Partition Pruning (DPP)**, которая позволяет динамически определять, какие партиции действительно необходимы, и загружать только их. Это особенно полезно при работе с **джойнами** (JOIN) и подзапросами, где фильтры на партиции становятся известны только во время выполнения запроса.

Основные преимущества **DPP**:
- Значительное **сокращение объема данных**, считываемых с диска.
- **Уменьшение времени выполнения** запросов за счет работы только с нужными партициями.
- **Оптимизация использования ресурсов**, что критично для больших кластеров и многопользовательских сред.

Эта оптимизация делает **Spark 3** более эффективным для обработки **больших объемов данных**, особенно в случаях, когда используется **разбиение по партициям (partitioning)** в **Hive**, **Parquet** или других файловых форматах.


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, rand, date_add, explode
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

# Инициализация Spark
spark = SparkSession.builder \
    .appName("Generate Parquet Data") \
    .master("local[*]") \
    .getOrCreate()

# Схема данных
schema = StructType([
    StructField("id", IntegerType(), nullable=False),
    StructField("date", DateType(), nullable=False),
    StructField("value", IntegerType(), nullable=False)
])

# Генерация данных
# Создаем DataFrame с диапазоном дат
dates = spark.range(0, 100).select(
    date_add(lit("2024-01-01").cast("date"), col("id").cast("int")).alias("date")
)

# Добавляем случайные значения
data = dates.withColumn("id", (rand() * 1000).cast("integer")) \
    .withColumn("value", (rand() * 100).cast("integer"))

# Сохраняем данные в формате Parquet
data.write.mode("overwrite").parquet("big_data.parquet")


In [None]:
# Spark 2 (чтение всех партиций):
# Чтение всех партиций без фильтрации на уровне партиций
df_large = spark.read.parquet("big_data.parquet").filter(col("date") == "2024-02-10")
df_large.show()

# Получение информации о числе считанных партиций
print(f"Число считанных партиций (Spark 2): {df_large.rdd.getNumPartitions()}")


In [None]:
# Spark 3 (динамическое определение нужных партиций):
# Включение динамического обрезания партиций
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")

# Чтение данных с динамическим обрезанием партиций
df_large = spark.read.parquet("big_data.parquet").filter(col("date") == "2024-02-10")
df_large.show()

# Получение информации о числе считанных партиций
print(f"Число считанных партиций (Spark 3): {df_large.rdd.getNumPartitions()}")


# 🚀 Разница в производительности Spark 2 и Spark 3

Одним из ключевых улучшений в **Apache Spark 3** является оптимизированная работа с партициями.

📌 **Spark 2**  
При выполнении запросов сканируются **все партиции**, даже если они не содержат нужных данных. Это приводит к избыточным вычислениям и замедляет выполнение.

⚡ **Spark 3**  
Благодаря улучшенной **динамической обработке партиций**, загружаются **только релевантные** партиции. Это существенно сокращает объем обрабатываемых данных и ускоряет выполнение запросов.

Таким образом, переход на **Spark 3** позволяет значительно **повысить производительность** за счет более умного управления ресурсами. 🚀


## 4. Коалесценция партиций при записи (Optimize Write)

### 🔍 Разница между Spark 2 и Spark 3

В **Spark 3** появилась важная оптимизация при записи данных — **коалесценция партиций (Optimize Write)**.  
Теперь при работе с большими объемами данных можно автоматически уменьшать количество **мелких файлов**,  
что значительно снижает нагрузку на файловую систему и ускоряет дальнейшую обработку данных.

### 🚀 Основные преимущества:
- 📉 **Снижение количества мелких файлов**, что упрощает управление данными.
- ⚡ **Оптимизированное распределение данных** внутри партиций.
- 💾 **Экономия ресурсов хранилища** за счет более крупных и последовательных файлов.
- 📊 **Улучшение производительности** при чтении и обработке данных.

Благодаря этим улучшениям Spark 3 обеспечивает более **эффективную и надежную работу с большими данными**.


In [None]:
# Spark 2 (много мелких файлов при записи в Parquet):

df_large.write.mode("overwrite").parquet("output_v2/")

In [None]:
# Spark 3 (объединение файлов перед записью):

spark.conf.set("spark.sql.adaptive.optimizeSkewedJoin", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

df_large.write.mode("overwrite").parquet("output_v3/")

In [None]:
# Для Spark 2 можно проверить количество файлов в output_v2
import os
output_v2_files = len([name for name in os.listdir("output_v2/") if os.path.isfile(os.path.join("output_v2/", name))])
print(f"Количество файлов в output_v2: {output_v2_files}")

# Для Spark 3 можно проверить количество файлов в output_v3
output_v3_files = len([name for name in os.listdir("output_v3/") if os.path.isfile(os.path.join("output_v3/", name))])
print(f"Количество файлов в output_v3: {output_v3_files}")

# Разница в производительности Spark 2 и Spark 3

### 📌 Управление файлами выходных данных

Одной из ключевых проблем в **Spark 2** является создание большого количества мелких файлов при записи данных, особенно в распределённые хранилища, такие как **HDFS** или **S3**. Это приводит к:

- Высокой нагрузке на файловую систему.
- Увеличению количества метаданных, что замедляет операции чтения.
- Повышенному времени выполнения последующих задач.

В **Spark 3** этот недостаток был устранён: движок автоматически **объединяет файлы** на этапе записи, что значительно снижает нагрузку на хранилище и повышает производительность. Теперь задачи на обработку данных выполняются быстрее, а система лучше масштабируется в условиях больших нагрузок.

✨ Улучшенное управление файлами в **Spark 3** делает обработку больших данных более эффективной! 🚀


## 5. Улучшенная обработка ошибок и отладка

Одним из значительных улучшений в **Apache Spark 3** стала расширенная система логирования и инструментов отладки. Теперь разработчикам проще анализировать ошибки, выявлять узкие места в производительности и оптимизировать выполнение задач.

### Ключевые изменения:

- **Более подробные логи** – в Spark 3 добавлены детализированные сообщения об ошибках, включая рекомендации по их устранению.  
- **Улучшенная трассировка стека** – теперь стековые трассы содержат больше информации, помогая быстрее находить причины сбоев.  
- **Новые метрики выполнения** – расширены инструменты мониторинга для детального анализа выполнения Spark-приложений.  
- **Усовершенствованная поддержка дебаггинга** – улучшена совместимость с профилировщиками и инструментами диагностики.  

Эти изменения делают отладку в **Spark 3** значительно удобнее и помогают разработчикам быстрее находить и исправлять ошибки, повышая стабильность и производительность приложений.


In [None]:
# Spark 2 (меньше информации о планах выполнения):
print("Spark 2: Explain (True)")
df.explain(True)

In [None]:
# Spark 3 (более детальный Explain):
print("Spark 3: Explain (mode='extended')")
df.explain(mode="extended")

# Разница между Spark 2 и Spark 3

Одним из важных улучшений в **Apache Spark 3** по сравнению с **Spark 2** стала расширенная диагностика запросов.

- В **Spark 2** команда `EXPLAIN` предоставляла минимальный объем информации, что затрудняло анализ плана выполнения.
- В **Spark 3** вывод `EXPLAIN` стал более детализированным, включая **метрики выполнения, статистику операторов и фазу оптимизации**, что значительно упрощает отладку и поиск узких мест.

Благодаря этим улучшениям разработчики получили больше возможностей для анализа и оптимизации производительности своих приложений.


# 🚀 Оптимизации при переходе с Spark 2 на Spark 3  

Если у вас есть старые **Spark 2** job'ы, переход на **Spark 3** может значительно улучшить их производительность с минимальными изменениями.  

### 🔹 Что улучшилось?  
✅ **Адаптивное выполнение запросов (AQE)** – теперь Spark может динамически оптимизировать планы выполнения на основе данных во время выполнения.  
✅ **Оптимизированный Catalyst Optimizer** – более эффективные перестановки и упрощения логических планов запроса.  
✅ **Улучшенный обмен данными** – новая стратегия шифрования shuffle и оптимизированная работа с памятью.  
✅ **Поддержка ANSI SQL** – улучшена совместимость с SQL-стандартом, что упрощает написание кода.  

### 🔹 Почему стоит перейти?  
🔸 **Более быстрые джобы** – сокращение времени выполнения за счёт новых оптимизаций.  
🔸 **Лучшее управление ресурсами** – эффективное распределение памяти и CPU.  
🔸 **Минимальные изменения кода** – большинство старых job'ов работают без доработок, но с увеличенной производительностью.  

Переход на Spark 3 – это шаг к более **быстрому, умному и эффективному** анализу данных! 🚀  


In [None]:
# 1. Включить AQE:

spark.conf.set("spark.sql.adaptive.enabled", "true")

# 2. Использовать автоматический Broadcast Join:
# Удалить broadcast(), Spark сам выберет нужный план.

# 3. Включить Dynamic Partition Pruning:
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
# 4.Оптимизировать количество партиций перед записью:
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# Заключение

**Spark 3** значительно превосходит **Spark 2** по производительности и интеллектуальным возможностям. Важно отметить ключевые улучшения, которые делают Spark 3 более эффективным инструментом для обработки данных:

- **Adaptive Query Execution (AQE)** – уменьшает количество shuffle partitions, что снижает нагрузку на систему и повышает производительность.
- **Автоматический Broadcast Join** – сокращает потребление памяти, обеспечивая более оптимизированные способы выполнения соединений.
- **Dynamic Partition Pruning (DPP)** – значительно уменьшает ненужное сканирование данных, повышая скорость выполнения запросов.
- **Optimize Write** – уменьшает количество маленьких файлов, что способствует лучшему распределению данных и снижает нагрузку на файловую систему.
- **Улучшенная отладка** – теперь предоставляется более полная информация о производительности запросов, что облегчает диагностику и оптимизацию.

Эти улучшения делают **Spark 3** более подходящим для крупных и сложных данных, улучшая как общую производительность, так и управление ресурсами, что имеет важное значение для современных задач обработки данных.


# Задача анализа данных о транзакциях клиентов банка

Допустим, у нас есть большой объем данных о транзакциях клиентов банка. Основная цель заключается в том, чтобы эффективно обработать эти данные, провести анализ и получить полезную информацию для бизнеса. В процессе работы с данными мы хотим выполнить несколько ключевых шагов:

1. **Загрузить данные из CSV**  
   Мы начнем с загрузки данных из CSV-файлов, содержащих информацию о транзакциях. Это позволит нам работать с данными в привычном формате и подготовить их для дальнейшей обработки.

2. **Очистить данные**  
   На этом этапе мы уделим внимание очистке данных:
   - Удаление дубликатов, чтобы избежать избыточных записей.
   - Обработка пустых значений (NULL) для обеспечения корректности анализа и расчетов.

3. **Анализировать транзакции**  
   Следующий шаг — анализ данных. Мы будем проводить:
   - **Группировку данных по клиентам** для оценки их активности.
   - **Расчет общей суммы транзакций** и **среднего чека** для каждого клиента.

4. **Записать результат в Parquet**  
   Финальный этап — сохранение обработанных данных в формате **Parquet**. Этот формат выбран за счет его высокой производительности при чтении данных и эффективного хранения, что позволит нам в будущем быстро обрабатывать большие объемы данных.

В результате мы получим очищенные и агрегированные данные, готовые для дальнейшего анализа или использования в отчетности. Этот процесс позволяет оптимизировать работу с большими данными и ускорить обработку информации в реальном времени.


# Реализация на Spark 2

Spark 2 представляет собой мощный инструмент для обработки больших данных, но в нем есть несколько ограничений, которые могут повлиять на производительность и гибкость обработки данных. Рассмотрим основные из них:

### 1. Статическое разбиение (shuffle partitions)
Spark 2 использует статическое разбиение данных при перераспределении (shuffle), что может привести к неэффективной работе с большими объемами данных. В случае, когда количество shuffle partitions слишком велико или мало, это может существенно замедлить выполнение задач.

### 2. Отсутствие Adaptive Query Execution (AQE)
Spark 2 не поддерживает Adaptive Query Execution, что означает отсутствие возможности динамически изменять планы выполнения на основе фактических данных во время обработки. Это ограничивает гибкость при выполнении сложных запросов и может привести к неоптимальному выполнению.

### 3. Неэффективная работа с партиционированными таблицами
В Spark 2 партиционированные таблицы считываются целиком, даже если часть данных не требуется для обработки. Это приводит к увеличению времени выполнения задач, так как все партиции загружаются, несмотря на то что можно было бы обрабатывать только необходимые.

### 4. Явный `broadcast()` для маленьких таблиц
В Spark 2 необходимо явно использовать метод `broadcast()` для работы с маленькими таблицами, чтобы избежать ошибок производительности. Это требует дополнительной настройки и может стать проблемой в случае работы с большими данными.

---

Хотя Spark 2 предоставляет множество полезных возможностей для работы с большими данными, эти ограничения могут повлиять на производительность, особенно при работе с большими и сложными наборами данных.


In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, when, broadcast

# Инициализация SparkSession
spark = SparkSession.builder \
    .appName("BankTransactions-Spark2") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Генерация большого DataFrame с транзакциями
data = [(i, f"User_{i % 100}", i * 100.5, "2024-02-14") for i in range(10_000_0)]
columns = ["transaction_id", "customer_id", "amount", "date"]

df_transactions = spark.createDataFrame(data, columns)

# Очистка данных: удаление дубликатов и NULL-значений
df_clean = df_transactions.dropDuplicates().fillna({"amount": 0})

# Читаем данные о клиентах (маленькая таблица)
df_customers = spark.createDataFrame([(f"User_{i}", f"City_{i%10}") for i in range(100)], ["customer_id", "city"])

# Используем ручное broadcast join (обязательно в Spark 2)
df_joined = df_clean.join(broadcast(df_customers), "customer_id", "left")

# Агрегация данных: суммарные и средние транзакции по клиентам
df_agg = df_joined.groupBy("customer_id", "city").agg(
    sum("amount").alias("total_spent"),
    avg("amount").alias("avg_spent"),
    count("transaction_id").alias("num_transactions")
)

# Сохранение в Parquet
df_agg.write.mode("overwrite").parquet("output_spark2/")

spark.stop()


# Реализация на Spark 3: Оптимизации и улучшения производительности

В версии **Spark 3** реализованы множество полезных оптимизаций, которые значительно улучшили производительность и удобство работы с данными. Вот некоторые из них:

## 1. **AQE (Adaptive Query Execution)**

AQE включен по умолчанию, что позволяет автоматически адаптировать план выполнения запросов в зависимости от реальных данных. Включение параметра `spark.sql.adaptive.enabled = true` активирует динамическое перераспределение данных, улучшая работу с запросами, особенно при изменяющихся условиях выполнения.

## 2. **Broadcast Join**

Ранее для использования broadcast join требовалось явное указание операции с помощью функции `broadcast()`. В **Spark 3** это поведение автоматизируется, и **Broadcast Join** применяется по умолчанию при необходимости, что упрощает работу и повышает производительность за счет оптимизации планов выполнения.

## 3. **Dynamic Partition Pruning (DPP)**

С помощью **Dynamic Partition Pruning** можно эффективно работать с большими объемами данных. Этот механизм позволяет загружать только те партиции, которые необходимы для выполнения запроса, что значительно уменьшает объем обрабатываемых данных и ускоряет выполнение операций.

## 4. **Optimize Write & Auto File Coalescing**

Теперь Spark автоматически объединяет мелкие файлы при записи данных в хранилище. Это позволяет избежать проблемы с большим количеством небольших файлов, улучшая производительность чтения и записи, а также снижая нагрузку на хранилище данных.

## Заключение

Эти и другие оптимизации делают **Spark 3** значительно более мощным инструментом для работы с большими данными, упрощая процесс разработки и ускоряя выполнение задач. Включение этих улучшений в процессы обработки данных помогает значительно сократить время выполнения и повысить общую эффективность работы с данными.


In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, when

# Инициализация SparkSession с улучшениями Spark 3
# Включаем AQE
# Объединение мелких файлов
# Динамическая фильтрация партиций
spark = SparkSession.builder \
    .appName("BankTransactions-Spark3") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true") \
    .getOrCreate()

# Генерация большого DataFrame с транзакциями
data = [(i, f"User_{i % 100}", i * 100.5, "2024-02-14") for i in range(10_000_0)]
columns = ["transaction_id", "customer_id", "amount", "date"]

df_transactions = spark.createDataFrame(data, columns)

# Очистка данных
df_clean = df_transactions.dropDuplicates().fillna({"amount": 0})

# Читаем данные о клиентах (маленькая таблица)
df_customers = spark.createDataFrame([(f"User_{i}", f"City_{i%10}") for i in range(100)], ["customer_id", "city"])

# В Spark 3 теперь НЕ НУЖНО вручную указывать broadcast
df_joined = df_clean.join(df_customers, "customer_id", "left")

# Агрегация данных
df_agg = df_joined.groupBy("customer_id", "city").agg(
    sum("amount").alias("total_spent"),
    avg("amount").alias("avg_spent"),
    count("transaction_id").alias("num_transactions")
)

# Запись с оптимизацией
df_agg.write.mode("overwrite").parquet("output_spark3/")

spark.stop()


# Анализ покупательского поведения и прогнозирование оттока клиентов

## Цель:

Цель данного анализа — изучить покупательское поведение и предсказать отток клиентов на основе их активности в магазинах. Для этого необходимо провести несколько ключевых шагов, включающих обработку, очистку и объединение данных.

## Этапы работы:

1. **Обработка массивных данных о покупках клиентов**  
   Необходимо собрать и структурировать данные о покупках клиентов из различных магазинов. Это позволит создать полную картину покупательской активности.

2. **Очистка данных**  
   Важный этап, включающий удаление или замену ненужных значений (NULL), а также выявление и устранение дубликатов, что повысит точность анализа.

3. **Объединение данных с информацией о клиентах и магазинах**  
   Для более глубокой аналитики следует объединить данные о покупках с метаданными клиентов и магазинов, что позволит увидеть полное взаимодействие клиента с брендом.

4. **Вычисление метрик лояльности клиентов**  
   На основе собранных данных нужно вычислить ключевые метрики лояльности: средний чек, количество покупок и частоту повторных покупок. Эти данные помогут оценить, насколько клиенты привязаны к бренду.

5. **Прогнозирование оттока клиентов**  
   Исходя из покупательской активности и поведения, предсказывается вероятность оттока клиентов. Это позволит заранее выявить риски и принять меры для удержания клиентов.

6. **Запись данных в формат Parquet**  
   Для удобства дальнейшего анализа и хранения следует записать обработанные данные в формат Parquet, который является оптимальным для обработки больших объемов информации.

## Ожидаемые результаты:

- Глубокое понимание поведения клиентов и их лояльности.
- Прогнозирование возможного оттока клиентов на основе их активности.
- Эффективное использование данных для стратегического планирования и маркетинговых инициатив.
- Оптимизированный процесс обработки и хранения данных для долгосрочного использования.

Данный анализ позволит не только оптимизировать взаимодействие с клиентами, но и предсказывать тенденции, что является важным шагом для повышения прибыльности бизнеса и удержания клиентов.


# Анализ покупательского поведения и прогнозирование оттока клиентов

## Цель
Цель проекта заключается в реализации анализа покупательского поведения и прогнозирования оттока клиентов с использованием **Apache Spark 2**. Это включает в себя работу с большими объемами данных, а также создание моделей и алгоритмов для предсказания вероятности ухода клиентов и других важных бизнес-метрик.

## Ограничения Spark 2

**Apache Spark 2** имеет несколько ограничений, которые необходимо учитывать при реализации анализа:

1. **Статическая конфигурация партиций**  
   В Spark 2 параметр `spark.sql.shuffle.partitions` имеет фиксированное значение, по умолчанию установленное на **200**. Это означает, что количество партиций, создаваемых во время операций shuffle, нельзя изменять динамически, что может снизить эффективность при работе с большими данными.

2. **Необходимость явного использования `broadcast()` для маленьких таблиц**  
   Для оптимизации работы с небольшими таблицами (например, таблицы магазинов или клиентов) необходимо явно использовать метод `broadcast()`. Это позволяет избежать излишних операций shuffle и ускорить выполнение запросов, однако это требует ручной настройки.

3. **Отсутствие Adaptive Query Execution (AQE)**  
   В Spark 2 отсутствует поддержка **Adaptive Query Execution (AQE)**, что ограничивает возможности автоматической адаптации плана выполнения запроса в зависимости от объема данных. Это может привести к неэффективной обработке больших наборов данных и низкой производительности.

4. **Ручная обработка мелких файлов при записи**  
   Spark 2 не предоставляет встроенных инструментов для автоматического объединения мелких файлов, что требует ручного вмешательства при записи данных, чтобы избежать большого количества маленьких файлов, что может негативно повлиять на производительность.

## Заключение

Несмотря на ограничения Spark 2, его использование для анализа покупательского поведения и прогнозирования оттока клиентов остается возможным. Однако для обеспечения оптимальной производительности и эффективности выполнения запросов необходимо внимательно учитывать описанные ограничения и применять соответствующие оптимизации.


In [23]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, when, lag, dense_rank, broadcast
from pyspark.sql.window import Window

# Инициализация SparkSession
spark = SparkSession.builder \
    .appName("CustomerChurn-Spark2") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Генерация большого набора данных о транзакциях
data = [(i, f"User_{i % 100_000}", f"Store_{i % 1_000}", i * 1.2, "2024-02-14") for i in range(50_000)]
columns = ["transaction_id", "customer_id", "store_id", "amount", "date"]

df_transactions = spark.createDataFrame(data, columns)

# Чтение данных о клиентах
df_customers = spark.createDataFrame(
    [(f"User_{i}", f"Segment_{i % 10}") for i in range(100_000)],
    ["customer_id", "segment"]
)

# Чтение данных о магазинах
df_stores = spark.createDataFrame(
    [(f"Store_{i}", f"City_{i % 50}") for i in range(1_000)],
    ["store_id", "city"]
)

# Очистка данных: удаление дубликатов, NULL
df_clean = df_transactions.dropDuplicates().fillna({"amount": 0})

# Используем broadcast join (обязательно в Spark 2)
df_joined = df_clean \
    .join(broadcast(df_customers), "customer_id", "left") \
    .join(broadcast(df_stores), "store_id", "left")

# Вычисление метрик
df_metrics = df_joined.groupBy("customer_id", "segment").agg(
    sum("amount").alias("total_spent"),
    avg("amount").alias("avg_spent"),
    count("transaction_id").alias("num_transactions")
)

# Предсказание оттока клиентов: смотрим на последнюю покупку каждого клиента
window_spec = Window.partitionBy("customer_id").orderBy(col("date").desc())

df_churn = df_joined.withColumn("last_purchase", lag("date").over(window_spec)) \
    .withColumn("churn_flag", when(col("last_purchase") < "2023-12-01", 1).otherwise(0))

# Сохранение данных
df_churn.write.mode("overwrite").parquet("output_spark2/")

spark.stop()


# Анализ покупательского поведения и прогнозирование оттока клиентов

В современном бизнесе анализ покупательского поведения и прогнозирование оттока клиентов играют важную роль в оптимизации маркетинговых стратегий и улучшении качества обслуживания. Реализация таких задач с использованием современных инструментов обработки больших данных, таких как Apache Spark, позволяет значительно повысить точность прогнозов и улучшить принятие решений.

## Реализация на Spark 3

Apache Spark 3 предоставляет мощные возможности для обработки и анализа данных в распределенных системах. С помощью Spark можно эффективно обрабатывать большие объемы данных, что особенно важно при анализе покупательского поведения и прогнозировании оттока клиентов.

## Оптимизации Spark 3

Spark 3 включает несколько важных оптимизаций, которые делают обработку данных более быстрой и эффективной:

- **AQE (Adaptive Query Execution)**: Spark теперь способен адаптировать план выполнения запроса в зависимости от данных. Он автоматически выбирает оптимальный размер shuffle-партиций, что помогает избежать проблем с производительностью при работе с большими объемами данных.

- **Автоматический Broadcast Join**: Эта оптимизация позволяет Spark автоматически решать, когда использовать `broadcast()` для джоинов, избавляя от необходимости вручную настраивать broadcast-джоины. Это улучшает производительность запросов, особенно при работе с небольшими таблицами.

- **Dynamic Partition Pruning (DPP)**: При обработке данных с партициями, Spark теперь может загружать только те партиции, которые необходимы для выполнения запроса, что значительно сокращает время обработки и нагрузку на систему.

- **Optimize Write & Coalescing**: Spark сам управляет объединением мелких файлов в большие, что улучшает производительность записи данных и уменьшает количество мелких файлов, что в свою очередь снижает нагрузку на файловую систему.

Эти оптимизации позволяют значительно улучшить производительность при анализе больших данных и предоставляют более гибкие и удобные инструменты для обработки и прогнозирования.


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, when, lag
from pyspark.sql.window import Window

# Инициализация SparkSession с улучшениями Spark 3
spark = SparkSession.builder \
    .appName("CustomerChurn-Spark3") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true") \
    .getOrCreate()

# Генерация большого набора данных
data = [(i, f"User_{i % 100_000}", f"Store_{i % 1_000}", i * 1.2, "2024-02-14") for i in range(50_000_0)]
columns = ["transaction_id", "customer_id", "store_id", "amount", "date"]

df_transactions = spark.createDataFrame(data, columns)

# Чтение данных о клиентах
df_customers = spark.createDataFrame(
    [(f"User_{i}", f"Segment_{i % 10}") for i in range(100_000)],
    ["customer_id", "segment"]
)

# Чтение данных о магазинах
df_stores = spark.createDataFrame(
    [(f"Store_{i}", f"City_{i % 50}") for i in range(1_000)],
    ["store_id", "city"]
)

# Очистка данных
df_clean = df_transactions.dropDuplicates().fillna({"amount": 0})

# В Spark 3 теперь НЕ НУЖНО вручную указывать broadcast
df_joined = df_clean.join(df_customers, "customer_id", "left").join(df_stores, "store_id", "left")

# Вычисление метрик
df_metrics = df_joined.groupBy("customer_id", "segment").agg(
    sum("amount").alias("total_spent"),
    avg("amount").alias("avg_spent"),
    count("transaction_id").alias("num_transactions")
)

# Предсказание оттока клиентов
window_spec = Window.partitionBy("customer_id").orderBy(col("date").desc())

df_churn = df_joined.withColumn("last_purchase", lag("date").over(window_spec)) \
    .withColumn("churn_flag", when(col("last_purchase") < "2023-12-01", 1).otherwise(0))

# Запись с оптимизацией
df_churn.write.mode("overwrite").parquet("output_spark3/")

spark.stop()


# Сравнение Spark 2 vs Spark 3

В Spark 3 было внедрено множество улучшений, которые значительно повышают производительность и упрощают код. Рассмотрим ключевые изменения и их выгоды:

| Улучшение                                      | Spark 2                       | Spark 3                       | Выгода                                                      |
|------------------------------------------------|-------------------------------|-------------------------------|------------------------------------------------------------|
| **Adaptive Query Execution (AQE)**             | ❌                            | ✅                            | Автоматически оптимизирует план запроса для повышения производительности. |
| **Broadcast Join**                             | ✅ (ручной `broadcast()`)      | ✅ (автоопределение)           | Упрощает код за счет автоматического выбора метода соединения. |
| **Dynamic Partition Pruning (DPP)**            | ❌                            | ✅                            | Загружаются только нужные партиции, что ускоряет выполнение запросов. |
| **Optimize Write & Coalescing**                 | ❌                            | ✅                            | Автоматически уменьшает количество мелких файлов, улучшая производительность записи. |

### Дополнительные улучшения в Spark 3:
- **Лучшее использование ресурсов**: Spark 3 имеет улучшения в планировании заданий, что позволяет более эффективно распределять ресурсы и уменьшать время выполнения.
- **Улучшенная поддержка Python и Pandas API**: Для пользователей, работающих с Python, добавлены новые возможности и улучшения в API, что делает работу с Spark более удобной.

Эти изменения позволяют разработчикам создавать более эффективные и производительные приложения с меньшими затратами на поддержку и оптимизацию.
