# Тестовое задание
## Входные данные
```
+-------+----------+----------+---------+
|partner|  rep_date|     value|  article|
+-------+----------+----------+---------+
|  00001|2021-05-17|   8951.36|    begin|
|  00001|2021-05-17|-117015.26|comission|
|  00001|2021-05-17|     -0.15|      end|
|  00002|2021-03-05|  -8517.04|    begin|
|  00002|2021-03-05|     -0.92|comission|
|  00002|2021-03-05|  -6706.29|      end|
|  00002|2021-05-09|   -245.28|    begin|
|  00002|2021-05-09|  -1053.11|comission|
|  00002|2021-05-09|      -4.3|      end|
|  00004|2021-11-04|    -81.37|    begin|
|  00004|2021-11-04|    245.12|comission|
|  00004|2021-11-04|   -326.49|      end|
|  00006|2021-12-05|   -4847.4|    begin|
|  00006|2021-12-05|  30825.59|comission|
|  00006|2021-12-05|     -6.49|      end|
|  00006|2021-12-28|      7.98|    begin|
|  00006|2021-12-28|    238.45|comission|
|  00006|2021-12-28|   6619.69|      end|
|  00008|2021-04-19| -56554.39|    begin|
|  00008|2021-04-19|      5.97|comission|
+-------+----------+----------+---------+
```
Датасет описывает активность клиента и содержит колонки:
- `partner` - номер клиента
- `rep_date` - дата
- `article` - название статьи (`begin` - остатки по счетам клиента в начале дня, `end` - остатки по счетам клиента в конце дня, `comission` - комиссия)
- `value` - значение

## Теория
Клиент `partner` является **активным** на определенный день `rep_date`, если у него есть ненулевое изменение остатков по счетам в течение дня без учета списания комиссий `comission` банком (пассивное действие, не требующее участия клиента). В остальных случаях он считается **неактивным**.

_Например, если изменение остатков по счетам у клиента за день составило 100 рублей, и комиссия составила 100 рублей, то он тратил деньги только на комисию, поэтому он неактивен. Если клиент тратил деньги не только на комиссию, то он - активен._

Если клиент был неактивным `N=30` или более дней, то он переходит в **отток**. Если клиент вернулся в банк после периода неактивности в `N=30` или более дней, то он становится **новым** клиентом.

Задача банка - предотвращать отток, так как удерживать старых клиентов - намного дешевле, чем привлекать новых. Банк использует модели для поиска клиентов с высоким риском оттока и коммуницирует с ними с целью их удержания в банке. Для этого клиенту, например, могут быть предложены более выгодные условия по какому-либо продукту или более высокий cashback.  

## Задача
Собрать датасет для аналитиков и Data Scientist-ов с разметкой периодов активности клиента.

Выходные данные должны иметь структуру:
- `partner` - номер клиента
- `rep_date` - дата
- `life` - номер жизни клиента

_Например, клиент Х активен 10 дней: 2022-01-07, 2022-01-09, 2022-01-13, 2022-02-21, 2022-03-24, 2022-04-02, 2022-05-13, 2022-05-23, 2022-05-31. Первые три даты относятся к первой жизни клиента (между ними менее 30 дней). Далее клиент оттекает и возвращается только 2022-02-21 (это вторая жизнь клиента). Третья жизнь клиента начинается с 2022-05-13._

```
+-------+----------+----+
|partner|  rep_date|life|
+-------+----------+----+
|  03255|2022-01-07|   0|
|  03255|2022-01-09|   0|
|  03255|2022-01-13|   0|
|  03255|2022-02-21|   1|
|  03255|2022-03-24|   2|
|  03255|2022-04-02|   2|
|  03255|2022-05-13|   3|
|  03255|2022-05-23|   3|
|  03255|2022-05-31|   3|
+-------+----------+----+

```

## Требования
* Задание необходимо выполнить на Spark (PySpark, Spark SQL, Scala)
* В качестве результата предоставить Jupyter Notebook содержащий полный ход решения с комментариями

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as spark_sum, expr, lag, when
from pyspark.sql.window import Window

# Инициализация Spark-сессии
spark = SparkSession.builder.appName("CustomerActivityAnalysis").getOrCreate()

# Читаем данные
df = spark.read.parquet("dataset/")

# Возьмем отдельно записи begin comission end
df_begin = df.filter(df['article'] == 'begin').withColumnRenamed("value", "begin_value")
df_end = df.filter(df['article'] == 'end').withColumnRenamed("value", "end_value")
df_comission = df.filter(df['article'] == 'comission').withColumnRenamed("value", "comission_value")

# Джойним всё вместе
df_joined = df_begin.join(df_end, on=['partner', 'rep_date'], how='inner') \
                      .join(df_comission, on=['partner', 'rep_date'], how='left')

# Помечаем активный ли клиент если если остаток на конец дня не равен остаток в начале минус комиссия:
# begin - comission != end
df_with_activity = df_joined.withColumn(
    "is_active", 
    when((df_joined['begin_value'] - df_joined['comission_value'] != df_joined['end_value']), 1).otherwise(0)
)

# Добавляем периоды активности исключая записи неактивных дней
window_spec = Window.partitionBy("partner").orderBy("rep_date")
tmp_df = df_with_activity.filter(df_with_activity['is_active'] == 1).withColumn(
    "prev_date", lag("rep_date").over(window_spec)
)

# Добавляем периоды жизни
tmp_df = tmp_df.withColumn(
    "days_since_last", expr("datediff(rep_date, prev_date)")
)
tmp_df = tmp_df.withColumn(
    "life_group", expr("CASE WHEN days_since_last >= 30 OR prev_date IS NULL THEN 1 ELSE 0 END")
)
tmp_df = tmp_df.withColumn(
    "life", spark_sum("life_group").over(window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow))
)

# датасет с результатами
final_df = tmp_df.select("partner", "rep_date", "life")

final_df.show()

+-------+----------+----+
|partner|  rep_date|life|
+-------+----------+----+
|  00000|2021-06-30|   1|
|  00001|2021-05-17|   1|
|  00001|2021-07-31|   2|
|  00002|2021-03-05|   1|
|  00002|2021-03-23|   1|
|  00002|2021-05-09|   2|
|  00004|2021-09-27|   1|
|  00005|2021-05-05|   1|
|  00006|2021-11-01|   1|
|  00006|2021-12-05|   2|
|  00006|2021-12-28|   2|
|  00008|2021-01-28|   1|
|  00008|2021-04-19|   2|
|  00008|2021-11-01|   3|
|  00008|2021-11-28|   3|
|  00009|2021-01-02|   1|
|  00009|2021-05-25|   2|
|  00010|2021-04-26|   1|
|  00011|2021-09-07|   1|
|  00012|2021-01-27|   1|
+-------+----------+----+
only showing top 20 rows

