In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [None]:
spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.sql.adaptive.enabled", False) \
    .config("spark.sql.autoBroadcastJoinThreshold", -1) \
    .config("spark.sql.sources.bucketing.enabled", True) \
    .config("spark.executor.memory", "") \
    .config("spark.driver.memory", "") \
    .appName("session_name") \
    .getOrCreate()

- Параметры по памяти определить самостоятельно
- Включение AQE или Auto Broadcast => не зачет

# Задание 1

В исходном файле представлена информация о сотрудниках, их подразделении и заработной плате.
1. Использовать Spark-сессию, которую мы использовали на семинарах или в ДЗ №1
2. Посчитать среднюю зарплату в каждом подразделении с применением Spark RDD
3. При решении задачи необходимо использовать функцию `aggregateByKey`

### Решение

In [None]:
# Ваше решение в ячейках ниже ...

# Задание 2

## Входные данные

Таблица `transactions` - информация о длительности просмотра контента пользователями:
1. user_uid — уникальный идентификатор пользователя
2. element_uid — уникальный идентификатор контента
3. watched_time — время просмотра в секундах

Справочник `catalogue` - каталог с описанием контента и метаинформации по нему

P.S. Как вы можете заметить при просмотре данных по пользователями, нужный нам ключ для операции будет перекошен (90% строк представлены на фильм, очень популярный среди смотревших) - это нужно доказать, то есть описать проблему в датасетах с точки зрения обработки Spark

### Решение

In [None]:
# Ваше решение в ячейках ниже ...

In [None]:
# Здесь необходимо вывести результат:
# result.explain()
# result.show(truncate=False)

### Решение с оптимизацией

In [None]:
# Ваше решение в ячейках ниже ...

In [None]:
# Здесь необходимо вывести результат:
# result.explain()
# result.show(truncate=False)

# Задание 3

## Входные данные

Таблица `transactions`  — информация о длительности просомтра контента пользователями:

1. user_uid — уникальный идентификатор пользователя
2. element_uid — уникальный идентификатор контента
3. watched_time — время просмотра в секундах

Таблица `ratings`  — информация об оценках, поставленных пользователями:

1. user_uid — уникальный идентификатор пользователя
2. element_uid — уникальный идентификатор контента
3. rating — поставленный пользователем рейтинг

Справочник `user_uids`  — выборка пользователей:
1. user_uid — уникальный идентификатор пользователя


## Что нужно сделать
Для каждого пользователя из выборки посчитать:
1. Максимальное и минимальное время просмотра фильмов с оценками 8, 9 и 10
2. Название фичи должно быть в формате `feat_<агрегирующая_функция>_watched_time_rating_<оценка>`
3. Если у пользователь не ставил оценки 8, 9 и 10 то значение фичей должно быть null
4. Описать принятые при разработки кода решения и возможные оптимизации

Важно: сокращаем затраты на shuflle

### Решение

In [None]:
# Ваше решение в ячейках ниже ...

In [None]:
# Здесь необходимо вывести результат:
# result.explain()
# result.show(truncate=False)

# Задание 4

## Входные данные

1. Потоковые данные о транзакциях из Kafka
2. Ивенты генерятся случайным образом, при помощи скрипта в директории task5/ или task4/ - они дублируются и структура одинаковая

Важно: данные генерируются случайным образом, поэтому результат будет у всех разный, соответственно можно ловить в том числе простые анамалии, например, статистически значимое отклонение значения какого-либо атрибута

## Что нужно сделать

1. Обработать потоковые данные: чтение и обработка данных о транзакциях в реальном времени, преобразовать входящие данные в DataFrame
2. Вычислить скользящее среднее значение суммы транзакций за последние 5 минут
3. Определить аномалии: пометить проблемные транзакции, например, если значение суммы транзакции больше среднего значения суммы транзакций в 2 раза
4. Вывести данные об аномальных транзакциях в консоль в режиме реального времени


In [None]:
spark = (
    SparkSession.builder
    .appName("Spark streaming")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", 4)
    .enableHiveSupport()
    .getOrCreate()
)

In [None]:
spark

In [None]:
# ваш код с рещением здесь ...

# Задание 5

**Входные данные**

1. Вам будет предоставлен docker-compose файл. Отличный от инфраструкты для ДЗ№1 и предыдущих заданий этого ДЗ№2
2. Генератор ивентов для топика кафки. Из этого топика нужно будет считывать данные
3. Структура данных, которые будут поступать в топик
4. Скорипт с генерацие ивентов их записью в Kafka Topic

#### Описание входящих данных

transaction_id — уникальный идентификатор транзакции
user_id — идентификатор пользователя
timestamp — время транзакции
amount — сумма транзакции
currency — валюта транзакции
transaction_type — тип транзакции (покупка, возврат, снятие, пополнение)
status — статус транзакции (успешно, неуспешно, в ожидании)

**Что требуется сделать**

Представьте, что вам поступила задача. Необходимо по рилтайм данным собрать витрины для аналитиков, к которым они могли бы обращаться, поверх которых они могли бы строить дашборды и многое другое.
1. Ваша задача: перенести данные из контура Kafka в контур ClickHouse
2. Построить несколько Kafka Engine таблиц (здесь на ваше усмотрение, можно одну, а можно и несколько)
3. Учесть, что некоторые входящие данные содержать ошибки или могут не парситься и т д. Для решение этой проблемы необходимо реализовать deadletter механизм при помощи MV
4. У вас должно быть минимум две таблицы MergeTree, они должны отличаться и быть логичными, то есть содержать какой-то смысл, иными словами – быть удобными для пользователя
5. Необходимо приложить код кафка энжинов, код MV, код таблиц и скрины запросов и создания таблиц в ClickHouse

In [None]:
# ваш код здесь ...

In [None]:
# ваши скрипты создания Kafka-Engine здесь ...

In [None]:
# ваши скрипты создания MV и MV DLQ здесь...

In [None]:
# ваши скрипты создания MergeTree таблиц здесь ...

In [None]:
# ваш прочий Python-код для решения задачи / аналитики / EDA здесь ...