# Описание задач

Для соревнования SNA Hackathon были собраны логи показов контента из открытых групп в новостных лентах пользователей за февраль-март 2018 года. В тестовое множество спрятаны последние полторы недели марта. Каждая запись в логе содержит информацию о том, что и кому было показано, а также о том, как отреагировал пользователь на этот контент: поставил «Класс», прокомментировал, проигнорировал или скрыл из ленты. 

Суть задачи в том, чтобы для каждого пользователя тестового множества отранжировать кандидатов, как можно выше поднимая тех, которые получат «класс»,.

Обычно мы давали одну задачу, но в этот раз решили дать сразу три. Вам не нужно их решать все, достаточно только одну. Поскольку пользовательская лента совмещает контент разного типа, то при его ранжировании востребованы навыки из разных областей — компьютерное зрение, работа с текстами и рекомендательные системы. 

В рамках онлайн-этапа мы предлагаем три набора данных, в каждом из которых представлен только один из типов информации: изображение, текст или данные о разнообразных коллаборативных признаках. 

Только на втором этапе, когда эксперты в разных областях соберутся вместе, будет раскрыт общий датасет, позволяющий найти точки для синергии разных методов.
После открытия чемпионата на платформе, вы увидите описание задач и получите возможность скачать необходимые для участия данные. 

# Описание данных
 

Информация представлена в формате Apache Parquet, который является основным для фреймворка Spark. Для работы с этим форматом из Python мы рекомендуем воспользоваться библиотекой Apache Arrow. Для простоты понимания в репозитории на GitHub выложены бейзлайны. Пользуйтесь! 

В обучающем множестве данные разложены по дням, а внутри дня разделены на 6 частей по ID пользователя (один и тот же пользователь всегда попадает в ту же самую часть). Такая раскладка позволяет участникам анализировать не все данные сразу, а ограничиться определёнными днями и/или подгруппами пользователей.

Обучающие наборы разбиты на три непересекающиеся группы: с текстами, с картинками и с коллаборативными признаками. В каждой группе данные содержат следующие поля:

* instanceId_userId — идентификатор пользователя (анонимизированный);
* instanceId_objectType — тип объекта;
* instanceId_objectId — идентификатор объекта (анонимизированный);
* feedback — массив с типами реакций пользователя (наличие в массиве токена Liked говорит о том, что объект получил «класс» от пользователя);
* audit_clientType — тип платформы, с которой зашёл пользователь;
* audit_timestamp — время, когда строилась лента;
* metadata_ownerId — автор показанного объекта (анонимизированный);
* metadata_createdAt — дата создания показанного объекта.


Для объектов из обучающего текстового множества дополнительно предоставлены связанные с ними тексты в формате Apache Parquet:

* objectId — идентификатор объекта;
* lang — язык текста (на базе детектора языка Одноклассников);
* text — сырой текст, связанный с объектом;
* preprocessed — массив токенов, полученный после фильтрации пунктуации и стемминга.



В данных для ранжирования по картинкам дополнительно присутствует поле-массив ImageId с MD5-хешами, связанными с объектами картинок. Тела изображений разложены по отдельным tar-файлам, в зависимости от первой буквы хеша.


В блоке с коллаборативными признаками представлена разнообразная дополнительная информация:

* audit_* — расширенная информация о контексте построения ленты;
* metadata_* — расширенная информация о самом объекте;
* userOwnerCounters_* — информация о предыдущих взаимодействиях пользователя и автора контента;
* ownerUserCounters_* — информация о предыдущих взаимодействиях автора контента и пользователя;
* membership_* — информация о членстве пользователя в группе, где опубликован контент;
* user_* — подробная информация о пользователе;
* auditweights_* — большое количество runtime-признаков, извлечённых текущей системой.
Структуры тестовых наборов эквивалентны структуре обучающих множеств, но не разложены по дням и не содержат поля feedback.

Оценка результата

Участники чемпионата должны так отсортировать ленту, чтобы объекты с высокой вероятностью «класса» оказались наверху. Сортировка производится индивидуально для каждого пользователя, после чего формируется текст сабмита следующего вида (формат соответствует экспорту из Pandas-датафрейма с колонками типа int и int[]):

* User_id_1,"[object_id_1_1, object_id_2_2]
* User_id_2,"[object_id_2_1, object_id_2_2, object_id_2_3]


В сабмите должна присутствовать строчка для каждого пользователя тестового набора, а строки отсортированы по возрастанию ID. Объекты для каждого пользователя должны быть отсортированы по убыванию релевантности.  При оценке сабмита для каждого пользователя будет посчитан его личный ROC-AUC, после чего посчитано среднее по всем пользователям и умножено на 100. 

# Some imports

In [1]:
import $ivy.`org.apache.spark::spark-sql:2.4.5`

import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val spark = NotebookSparkSession
    .builder()
    .master("local[*]")
    .getOrCreate()

Loading spark-stubs
Getting spark JARs
Creating SparkSession


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties


[32mimport [39m[36m$ivy.$                                  

[39m
[32mimport [39m[36morg.apache.log4j.{Level, Logger}
[39m
[32mimport [39m[36morg.apache.spark.sql._
[39m
[32mimport [39m[36morg.apache.spark.sql.functions._

[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@2505909c

In [2]:
import $ivy.`org.plotly-scala::plotly-almond:0.7.6`
import plotly._, plotly.element._, plotly.layout._, plotly.Almond._

repl.pprinter() = repl.pprinter().copy(defaultHeight = 3)

[32mimport [39m[36m$ivy.$                                      
[39m
[32mimport [39m[36mplotly._, plotly.element._, plotly.layout._, plotly.Almond._

[39m

In [3]:
def sc = spark.sparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

defined [32mfunction[39m [36msc[39m
[36msqlContext[39m: [32mSQLContext[39m = org.apache.spark.sql.SQLContext@7e99b332

In [4]:
import spark.implicits._
// val df = sqlContext.read.parquet("data/test/part-00000-6d949390-48b0-4104-a477-39e306b726c5-c000.gz.parquet")
// df.count()

[32mimport [39m[36mspark.implicits._
// val df = sqlContext.read.parquet("data/test/part-00000-6d949390-48b0-4104-a477-39e306b726c5-c000.gz.parquet")
// df.count()[39m

In [5]:
import $ivy.`org.apache.spark::spark-mllib:2.4.5`

import org.apache.spark.ml.linalg.{Matrix, Vectors}
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.feature.{MinMaxScaler, VectorAssembler, ChiSqSelector, QuantileDiscretizer}
import org.apache.spark.ml.clustering.{KMeans, BisectingKMeans, GaussianMixture}

[32mimport [39m[36m$ivy.$                                    

[39m
[32mimport [39m[36morg.apache.spark.ml.linalg.{Matrix, Vectors}
[39m
[32mimport [39m[36morg.apache.spark.ml.stat.Correlation
[39m
[32mimport [39m[36morg.apache.spark.ml.classification.LogisticRegression
[39m
[32mimport [39m[36morg.apache.spark.ml.evaluation.BinaryClassificationEvaluator
[39m
[32mimport [39m[36morg.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
[39m
[32mimport [39m[36morg.apache.spark.ml.feature.{MinMaxScaler, VectorAssembler, ChiSqSelector, QuantileDiscretizer}
[39m
[32mimport [39m[36morg.apache.spark.ml.clustering.{KMeans, BisectingKMeans, GaussianMixture}[39m

In [6]:
val df = sqlContext.read.parquet("train/")

[36mdf[39m: [32mDataFrame[39m = [instanceId_userId: int, instanceId_objectType: string ... 167 more fields]

In [7]:
df.printSchema

root
 |-- instanceId_userId: integer (nullable = true)
 |-- instanceId_objectType: string (nullable = true)
 |-- instanceId_objectId: integer (nullable = true)
 |-- audit_pos: long (nullable = true)
 |-- audit_clientType: string (nullable = true)
 |-- audit_timestamp: long (nullable = true)
 |-- audit_timePassed: long (nullable = true)
 |-- audit_experiment: string (nullable = true)
 |-- audit_resourceType: long (nullable = true)
 |-- metadata_ownerId: integer (nullable = true)
 |-- metadata_ownerType: string (nullable = true)
 |-- metadata_createdAt: long (nullable = true)
 |-- metadata_authorId: integer (nullable = true)
 |-- metadata_applicationId: long (nullable = true)
 |-- metadata_numCompanions: integer (nullable = true)
 |-- metadata_numPhotos: integer (nullable = true)
 |-- metadata_numPolls: integer (nullable = true)
 |-- metadata_numSymbols: integer (nullable = true)
 |-- metadata_numTokens: integer (nullable = true)
 |-- metadata_numVideos: integer (nullable = true)
 |-- me

 |-- auditweights_userOwner_USER_INTERNAL_LIKE: double (nullable = true)
 |-- auditweights_userOwner_USER_INTERNAL_UNLIKE: double (nullable = true)
 |-- auditweights_userOwner_USER_PRESENT_SEND: double (nullable = true)
 |-- auditweights_userOwner_USER_PROFILE_VIEW: double (nullable = true)
 |-- auditweights_userOwner_USER_SEND_MESSAGE: double (nullable = true)
 |-- auditweights_userOwner_USER_STATUS_COMMENT_CREATE: double (nullable = true)
 |-- auditweights_userOwner_VIDEO: double (nullable = true)
 |-- auditweights_userOwner_VOTE_POLL: double (nullable = true)
 |-- auditweights_x_ActorsRelations: long (nullable = true)
 |-- auditweights_likersSvd_spark_hyper: double (nullable = true)
 |-- auditweights_source_PROMO: double (nullable = true)
 |-- date: date (nullable = true)



# Get info about dataframe

### Посмотрим на датафрейм.

In [8]:
val n_rows = df.count()

[36mn_rows[39m: [32mLong[39m = [32m18286575L[39m

In [9]:
val n_columns = df.columns.length

[36mn_columns[39m: [32mInt[39m = [32m169[39m

In [10]:
df.select("audit_pos").describe().show()

+-------+-----------------+
|summary|        audit_pos|
+-------+-----------------+
|  count|         18286575|
|   mean|11.39937626373446|
| stddev|16.76051641364607|
|    min|                0|
|    max|              489|
+-------+-----------------+



In [11]:
df.head(5)

[36mres10[39m: [32mArray[39m[[32mRow[39m] = [33mArray[39m(
  [108,Post,18452434,0,MOB,1520194086477,10184811,XPRM-5386_G1,8,13680,GROUP_OPE...

# About target

### Сформулируем критерий для таргета - отток. Посмотрим на статусы feedback.

In [12]:
df.select("feedback").show(10)

+----------------+
|        feedback|
+----------------+
|         [Liked]|
|[Clicked, Liked]|
|         [Liked]|
|       [Ignored]|
|       [Ignored]|
|       [Ignored]|
|[Clicked, Liked]|
|         [Liked]|
|       [Ignored]|
|       [Ignored]|
+----------------+
only showing top 10 rows



In [13]:
df.select("feedback").schema

[36mres12[39m: [32mtypes[39m.[32mStructType[39m = [33mStructType[39m(
  [33mStructField[39m([32m"feedback"[39m, [33mArrayType[39m(StringType, true), true, {})
)

### Для оттока нам будут интересны статусы, характеризующие вовлеченность пользователя в контент ленты. Поэтому, для вовлеченности будем смотреть:
* Просмотр
* Клик
* Лайк

In [14]:
val df_flags_tmp = df
.withColumn("feedback_Liked_flag", array_contains($"feedback", "Liked").cast("Int"))
.withColumn("feedback_Clicked_flag", array_contains($"feedback", "Clicked").cast("Int"))
.withColumn("feedback_Viewed_flag", array_contains($"feedback", "Viewed").cast("Int"))
.withColumn("activity_flag", $"feedback_Liked_flag" + $"feedback_Clicked_flag" + $"feedback_Viewed_flag")
.withColumn("report_date", to_date(from_unixtime($"audit_timestamp" / 1000)))

[36mdf_flags_tmp[39m: [32mDataFrame[39m = [instanceId_userId: int, instanceId_objectType: string ... 172 more fields]

In [15]:
df_flags_tmp.select("instanceId_userId", "report_date", "feedback", "feedback_Liked_flag", "feedback_Clicked_flag", "feedback_Viewed_flag", "activity_flag").show(15)

+-----------------+-----------+----------------+-------------------+---------------------+--------------------+-------------+
|instanceId_userId|report_date|        feedback|feedback_Liked_flag|feedback_Clicked_flag|feedback_Viewed_flag|activity_flag|
+-----------------+-----------+----------------+-------------------+---------------------+--------------------+-------------+
|              108| 2018-03-04|         [Liked]|                  1|                    0|                   0|            1|
|              231| 2018-03-03|[Clicked, Liked]|                  1|                    1|                   0|            2|
|              423| 2018-03-04|         [Liked]|                  1|                    0|                   0|            1|
|              624| 2018-03-04|       [Ignored]|                  0|                    0|                   0|            0|
|              768| 2018-03-04|       [Ignored]|                  0|                    0|                   0|       

### Неотток = Активность. 
### Посчитаем флаги активности по каждому дню для каждого пользователя. Для этого сгруппируем датафрейм и посчитаем сумму по флагам активности.

In [16]:
val df_flags = df_flags_tmp.groupBy("instanceId_userId", "report_date").agg(
    sum("activity_flag").as("activity_flag"),
    max("audit_timestamp").as("audit_timestamp"),
)
df_flags.show()

+-----------------+-----------+-------------+---------------+
|instanceId_userId|report_date|activity_flag|audit_timestamp|
+-----------------+-----------+-------------+---------------+
|            96495| 2018-03-04|            0|  1520183519817|
|           232125| 2018-03-04|            0|  1520168706454|
|           237948| 2018-03-04|            0|  1520139716325|
|           258231| 2018-03-04|            0|  1520149331388|
|           282264| 2018-03-04|            2|  1520182451103|
|           299859| 2018-03-04|            0|  1520170731741|
|           315990| 2018-03-04|            0|  1520192657376|
|           356985| 2018-03-04|            0|  1520175449578|
|           412998| 2018-03-04|            0|  1520179368385|
|           506979| 2018-03-04|            2|  1520165809565|
|           512100| 2018-03-04|            0|  1520142304475|
|           533940| 2018-03-04|            1|  1520188462136|
|           881097| 2018-03-04|            0|  1520150430625|
|       

[36mdf_flags[39m: [32mDataFrame[39m = [instanceId_userId: int, report_date: date ... 2 more fields]

### Посмотрим на даты в датафрейме. Посмотрим на календрь. Это будет полезно для окончательного формирования таргета.

In [17]:
val calendar_df = df_flags.select("report_date").distinct().orderBy(asc("report_date"))

[36mcalendar_df[39m: [32mDataset[39m[[32mRow[39m] = [report_date: date]

In [18]:
calendar_df.show(50)

+-----------+
|report_date|
+-----------+
| 2018-01-31|
| 2018-02-01|
| 2018-02-02|
| 2018-02-03|
| 2018-02-04|
| 2018-02-05|
| 2018-02-06|
| 2018-02-07|
| 2018-02-08|
| 2018-02-09|
| 2018-02-10|
| 2018-02-11|
| 2018-02-12|
| 2018-02-13|
| 2018-02-14|
| 2018-02-15|
| 2018-02-16|
| 2018-02-17|
| 2018-02-18|
| 2018-02-19|
| 2018-02-20|
| 2018-02-21|
| 2018-02-22|
| 2018-02-23|
| 2018-02-24|
| 2018-02-25|
| 2018-02-26|
| 2018-02-27|
| 2018-02-28|
| 2018-03-01|
| 2018-03-02|
| 2018-03-03|
| 2018-03-04|
| 2018-03-05|
| 2018-03-06|
| 2018-03-07|
| 2018-03-08|
| 2018-03-09|
| 2018-03-10|
| 2018-03-11|
| 2018-03-12|
| 2018-03-13|
| 2018-03-14|
| 2018-03-15|
| 2018-03-16|
| 2018-03-17|
| 2018-03-18|
| 2018-03-19|
| 2018-03-20|
| 2018-03-21|
+-----------+



### Посмотрим на интервал дат для трейна. Очень важно для следующего шага.

In [19]:
calendar_df.agg(min("report_date"), max("report_date")).show()

+----------------+----------------+
|min(report_date)|max(report_date)|
+----------------+----------------+
|      2018-01-31|      2018-03-21|
+----------------+----------------+



### Модель оттока должна смотреть на несколько дней вперед, чтобы мы могли делать какое то предложение до наступления события и вовлекать пользователя до того, как уйдет в отток.  Примеры воздействия на могут быть следующие:
* Уведомление PUSH об обновлениях в группах, которые интересны пользователю.
* Уведомление PUSH о действиях друзей пользователя.
* И тд.

### Я выбрал окно для таргета оттока - 3 дня вперед.

In [20]:
val df_flags_dates = df_flags.select("instanceId_userId", "audit_timestamp", "activity_flag", "report_date")
.withColumn("report_date_2", date_sub(to_date(from_unixtime($"audit_timestamp" / 1000)), -1))
.withColumn("report_date_3", date_sub(to_date(from_unixtime($"audit_timestamp" / 1000)), -2))
.withColumn("report_date_4", date_sub(to_date(from_unixtime($"audit_timestamp" / 1000)), -3))

[36mdf_flags_dates[39m: [32mDataFrame[39m = [instanceId_userId: int, audit_timestamp: bigint ... 5 more fields]

### Посмотрим, правильно ли мы собрали окно на 3 дня вперед.

In [21]:
df_flags_dates.createOrReplaceTempView("flags_dates")

In [22]:
spark.sql("select d1.instanceId_userId, d1.report_date, d1.activity_flag, d2.report_date as report_dt_2, coalesce(d2.activity_flag, 0) as activ_fl_2, d3.report_date as report_dt_3, coalesce(d3.activity_flag, 0) as activity_fl_3, d4.report_date as report_dt_4, coalesce(d4.activity_flag, 0) as activity_fl_4 from flags_dates d1 left join flags_dates d2 on d1.report_date_2=d2.report_date and d1.instanceId_userId=d2.instanceId_userId left join flags_dates d3 on d1.report_date_3=d3.report_date and d1.instanceId_userId=d3.instanceId_userId left join flags_dates d4 on d1.report_date_4=d4.report_date and d1.instanceId_userId=d4.instanceId_userId").show

+-----------------+-----------+-------------+-----------+----------+-----------+-------------+-----------+-------------+
|instanceId_userId|report_date|activity_flag|report_dt_2|activ_fl_2|report_dt_3|activity_fl_3|report_dt_4|activity_fl_4|
+-----------------+-----------+-------------+-----------+----------+-----------+-------------+-----------+-------------+
|           280740| 2018-01-31|            0|       null|         0|       null|            0|       null|            0|
|          1046904| 2018-01-31|            0| 2018-02-01|         0|       null|            0|       null|            0|
|          1165407| 2018-01-31|            1|       null|         0|       null|            0|       null|            0|
|          1755825| 2018-01-31|            1|       null|         0|       null|            0|       null|            0|
|          1849224| 2018-01-31|            0|       null|         0|       null|            0|       null|            0|
|          2142267| 2018-01-31| 

### Соберем датафрейм с флагами активности на 3 дня вперед.

In [23]:
val targets_df_tmp = spark.sql("select d1.instanceId_userId, d1.report_date, d1.activity_flag, coalesce(d2.activity_flag, 0) as activity_flag_2, coalesce(d3.activity_flag, 0) as activity_flag_3, coalesce(d4.activity_flag, 0) as activity_flag_4 from flags_dates d1 left join flags_dates d2 on d1.report_date_2=d2.report_date and d1.instanceId_userId=d2.instanceId_userId left join flags_dates d3 on d1.report_date_3=d3.report_date and d1.instanceId_userId=d3.instanceId_userId left join flags_dates d4 on d1.report_date_4=d4.report_date and d1.instanceId_userId=d4.instanceId_userId")
targets_df_tmp.show()

+-----------------+-----------+-------------+---------------+---------------+---------------+
|instanceId_userId|report_date|activity_flag|activity_flag_2|activity_flag_3|activity_flag_4|
+-----------------+-----------+-------------+---------------+---------------+---------------+
|           280740| 2018-01-31|            0|              0|              0|              0|
|          1046904| 2018-01-31|            0|              0|              0|              0|
|          1165407| 2018-01-31|            1|              0|              0|              0|
|          1755825| 2018-01-31|            1|              0|              0|              0|
|          1849224| 2018-01-31|            0|              0|              0|              0|
|          2142267| 2018-01-31|            0|              0|              0|              0|
|          2330496| 2018-01-31|            1|              0|              0|              0|
|          2334525| 2018-01-31|            0|              0

[36mtargets_df_tmp[39m: [32mDataFrame[39m = [instanceId_userId: int, report_date: date ... 4 more fields]

### Сформируем окончательный таргет для модели оттока.
1. Наличие флага активности в текущий и следующий день.
2. Отсутствие флага активности в 3 и 4 день.
3. Сделаем фильтр на дату менее 2018-03-19, так как после таргет еще не вызрел.

In [24]:
val targets_df = targets_df_tmp.filter($"activity_flag" > 0 and $"activity_flag_2" > 0 and $"report_date" < "2018-03-19")
.withColumn("label", when($"activity_flag_3" === 0 && $"activity_flag_4" === 0, 1)
            .otherwise(0)).select("instanceId_userId", "report_date", "label").cache()
targets_df.show()

+-----------------+-----------+-----+
|instanceId_userId|report_date|label|
+-----------------+-----------+-----+
|          6893838| 2018-01-31|    0|
|           306036| 2018-02-01|    0|
|           339429| 2018-02-01|    1|
|           548673| 2018-02-01|    0|
|           822885| 2018-02-01|    0|
|          3691557| 2018-02-01|    1|
|          4096401| 2018-02-01|    1|
|          4198875| 2018-02-01|    0|
|          4666080| 2018-02-01|    0|
|          4999875| 2018-02-01|    1|
|          5099466| 2018-02-01|    1|
|          5303145| 2018-02-01|    1|
|          6049128| 2018-02-01|    1|
|          6071649| 2018-02-01|    1|
|          6345990| 2018-02-01|    1|
|          6581355| 2018-02-01|    1|
|          6619776| 2018-02-01|    1|
|          7125012| 2018-02-01|    0|
|          7546404| 2018-02-01|    1|
|          7715256| 2018-02-01|    1|
+-----------------+-----------+-----+
only showing top 20 rows



[36mtargets_df[39m: [32mDataset[39m[[32mRow[39m] = [instanceId_userId: int, report_date: date ... 1 more field]

### Посмотрим аналитику по таргету оттока. 

### Средняя доля оттока и количество клиентов по всей выборке.

In [25]:
val targets_analytics = targets_df.agg(mean("label"), count("label"))

[36mtargets_analytics[39m: [32mDataFrame[39m = [avg(label): double, count(label): bigint]

In [26]:
targets_analytics.show(50)

+-----------------+------------+
|       avg(label)|count(label)|
+-----------------+------------+
|0.645734278788633|      353551|
+-----------------+------------+



### Посмотрим на стабильность таргета по дням. Средняя доля оттока и количество клиентов в разрезе report_date. Нестабильность обуславливается праздничными днями.

In [27]:
val targets_analytics = targets_df.groupBy("report_date").agg(mean("label"), count("label"))

[36mtargets_analytics[39m: [32mDataFrame[39m = [report_date: date, avg(label): double ... 1 more field]

In [28]:
targets_analytics.sort(col("report_date").asc).show(50)

+-----------+------------------+------------+
|report_date|        avg(label)|count(label)|
+-----------+------------------+------------+
| 2018-01-31|0.5791962174940898|         423|
| 2018-02-01|0.6390174897119342|        7776|
| 2018-02-02|0.6327129015808262|        7844|
| 2018-02-03|0.6513633185737591|        8582|
| 2018-02-04|0.6301940707227051|        8399|
| 2018-02-05|0.6325542965061378|        8472|
| 2018-02-06|0.6378721901847711|        8497|
| 2018-02-07|0.6470308788598574|        8420|
| 2018-02-08|0.7832330919356818|        8147|
| 2018-02-09|0.7677042272665489|        7357|
| 2018-02-10|0.5335276967930029|         343|
| 2018-02-11|0.5441860465116279|         430|
| 2018-02-12|0.6326554881674397|        8409|
| 2018-02-13| 0.630317848410758|        8180|
| 2018-02-14|0.7120772946859903|        8280|
| 2018-02-15| 0.749093107617896|        8270|
| 2018-02-16|0.6910396171231056|        3761|
| 2018-02-17|0.6505494505494506|        3185|
| 2018-02-18| 0.621261378413524|  

### Теперь нужно подготовить фичи.

### Отберем фичи, характеризующие поведение пользователя.

In [29]:
val feature_columns = scala.collection.mutable.ListBuffer[String]()
for (elem <- df.schema) {
    if ( (elem.name.toString.split("_")(0) == "userOwnerCounters") | (elem.name.toString.split("_")(0) == "user")
       | (elem.name.toString.split("_")(0) == "auditweights") | (elem.name.toString.split("_")(0) == "owner")
    )
    feature_columns += elem.name.toString
}

[36mfeature_columns[39m: [32mcollection[39m.[32mmutable[39m.[32mListBuffer[39m[[32mString[39m] = [33mListBuffer[39m(
  [32m"userOwnerCounters_USER_FEED_REMOVE"[39m,
...

### Проблема в том, что в исходном датафрейме имеются состояния клиента несколько раз в день. Чтобы подготовить датафрейм для обучения мы сгруппируем его по пользователю и по дню, а для фичей, характеризующих поведение пользователя посчитаем среднее.

In [30]:
val train_tmp = df
.withColumn("report_date", to_date(from_unixtime($"audit_timestamp" / 1000)))
.groupBy("report_date", "instanceId_userId").mean(feature_columns:_*)

[36mtrain_tmp[39m: [32mDataFrame[39m = [report_date: date, instanceId_userId: int ... 111 more fields]

### Выберем колонки с фичами - средними и таргетом.

In [31]:
val feature_columns_train = feature_columns.map(x => "avg(" + x.toString + ")")
feature_columns_train += "label"

[36mfeature_columns_train[39m: [32mcollection[39m.[32mmutable[39m.[32mListBuffer[39m[[32mString[39m] = [33mListBuffer[39m(
  [32m"avg(userOwnerCounters_USER_FEED_REMOVE)"[39m,
...
[36mres30_1[39m: [32mcollection[39m.[32mmutable[39m.[32mListBuffer[39m[[32mString[39m] = [33mListBuffer[39m(
  [32m"avg(userOwnerCounters_USER_FEED_REMOVE)"[39m,
...

### Сджоиним фичи с таргетами для получения train датасета. Сразу же заполним пропуски 0, так как по бизнес смыслу это самое близкое для средних. Отберем фичи по списку выше, исключив id юзера и report_date.

In [32]:
val train_df = train_tmp.join(targets_df, targets_df("instanceId_userId") === train_tmp("instanceId_userId") &&
    targets_df("report_date") === train_tmp("report_date"),"inner").na.fill(0)
.select(feature_columns_train.head, feature_columns_train.tail: _*)

[36mtrain_df[39m: [32mDataFrame[39m = [avg(userOwnerCounters_USER_FEED_REMOVE): double, avg(userOwnerCounters_USER_PROFILE_VIEW): double ... 110 more fields]

In [33]:
train_df.select("avg(auditweights_source_PROMO)", "label").show(1)

+------------------------------+-----+
|avg(auditweights_source_PROMO)|label|
+------------------------------+-----+
|                           0.0|    1|
+------------------------------+-----+
only showing top 1 row



### Для моделирования необходимо применить VectorAssembler.

In [34]:
val assembler = new VectorAssembler().setInputCols(feature_columns_train.filter(_ != "label").toArray).setOutputCol("features")
val output = assembler.transform(train_df).select("features", "label")

[36massembler[39m: [32mVectorAssembler[39m = vecAssembler_84e8546ba3a5
[36moutput[39m: [32mDataFrame[39m = [features: vector, label: int]

### Сделаем скейлинг для логистической регрессии MinMaxScaler.

In [35]:
val scaler = new MinMaxScaler().setInputCol("features").setOutputCol("Scaled_features")
val output_scaled = scaler.fit(output).transform(output)
output_scaled.select("features","Scaled_features").show(5)

+--------------------+--------------------+
|            features|     Scaled_features|
+--------------------+--------------------+
|(111,[40,41,42,43...|[0.0,0.5,0.5,0.5,...|
|(111,[0,23,40,41,...|[5.77591190586236...|
|(111,[22,23,24,40...|[0.0,0.5,0.5,0.5,...|
|(111,[23,40,41,42...|[0.0,0.5,0.5,0.5,...|
|(111,[24,40,41,42...|[0.0,0.5,0.5,0.5,...|
+--------------------+--------------------+
only showing top 5 rows



[36mscaler[39m: [32mMinMaxScaler[39m = minMaxScal_bb2518dcc0c7
[36moutput_scaled[39m: [32mDataFrame[39m = [features: vector, label: int ... 1 more field]

### Отберем фичи для логистической регрессии по Хи квадрату

In [36]:
// Feature selection using chisquareSelector
val css = new ChiSqSelector()
  .setFeaturesCol("Scaled_features")
  .setLabelCol("label")
  .setOutputCol("Aspect")
  .setFpr(0.1)

val output_selector = css.fit(output_scaled).transform(output_scaled).cache()
//val test = css.transform(test)
output_selector.select("Aspect", "label").show(5)

+--------------------+-----+
|              Aspect|label|
+--------------------+-----+
|[0.0,0.0,0.0,0.0,...|    1|
|[5.77591190586236...|    0|
|[0.0,0.0,0.0,0.0,...|    1|
|[0.0,0.0,0.0,0.0,...|    1|
|[0.0,0.0,0.0,0.0,...|    1|
+--------------------+-----+
only showing top 5 rows



[36mcss[39m: [32mChiSqSelector[39m = chiSqSelector_f4c5812eea88
[36moutput_selector[39m: [32mDataset[39m[[32mRow[39m] = [features: vector, label: int ... 2 more fields]

### Разделим датафрейм на тренировочную и тестовую выборки в соотношении 70% и 30% соответственно

In [37]:
val Array(training, test) = output_selector.randomSplit(Array(0.7, 0.3), seed = 777)

[36mtraining[39m: [32mDataset[39m[[32mRow[39m] = [features: vector, label: int ... 2 more fields]
[36mtest[39m: [32mDataset[39m[[32mRow[39m] = [features: vector, label: int ... 2 more fields]

In [38]:
training.select("Aspect", "label").show(5)

+--------------------+-----+
|              Aspect|label|
+--------------------+-----+
|[0.08371239665130...|    1|
|[5.64625430917878...|    0|
|[1.19504793745868...|    1|
|[3.20642720964502...|    1|
|[9.79458171305819...|    0|
+--------------------+-----+
only showing top 5 rows



In [39]:
test.select("Aspect", "label").show(5)

+--------------------+-----+
|              Aspect|label|
+--------------------+-----+
|[0.09697299577607...|    1|
|[5.39086646634568...|    0|
|[0.06721016968013...|    1|
|[8.87798862225146...|    0|
|[8.64546028713874...|    1|
+--------------------+-----+
only showing top 5 rows



### Обучим модель Логистической регрессии на выбранных фичах для предсказания оттока.

In [None]:
// Extract the summary from the returned LogisticRegressionModel instance trained in the earlier
val lr = new LogisticRegression()
.setMaxIter(45)
.setRegParam(0.02)
.setElasticNetParam(0.08)
.setFeaturesCol("Aspect")
.setLabelCol("label")


val model = lr.fit(training)
val predict_train = model.transform(training)
val predict_test = model.transform(test)
//predict_test.select("Outcome","prediction").show(10)

In [41]:
predict_test.select("label", "Aspect", "rawPrediction", "probability", "prediction").show()

+-----+--------------------+--------------------+--------------------+----------+
|label|              Aspect|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|    1|[0.09697299577607...|[-0.3970528956361...|[0.40202061886501...|       1.0|
|    0|[5.39086646634568...|[0.97057759066969...|[0.72523460889123...|       0.0|
|    1|[0.06721016968013...|[0.29871099965590...|[0.57412737976798...|       0.0|
|    0|[8.87798862225146...|[-0.1395664373965...|[0.46516491774104...|       1.0|
|    1|[8.64546028713874...|[-0.1072482888368...|[0.47321359807265...|       1.0|
|    0|[0.00557536937784...|[0.09663708217832...|[0.52414048668615...|       0.0|
|    1|[0.07366709455025...|[-0.0826898338658...|[0.47933931266664...|       1.0|
|    0|[0.00832188646673...|[5.86196453589847...|[0.99716242692305...|       0.0|
|    1|[2.58026049835855...|[0.07068336575819...|[0.51766348794961...|       0.0|
|    1|[6.789951

### Посчитаем метрики на тесте и трейне. Посмотрим, чтобы не было большой разницы между ними.

In [42]:
val evaluator = new BinaryClassificationEvaluator()
.setRawPredictionCol("rawPrediction")
.setLabelCol("label")

val lr_auc_train = evaluator.evaluate(predict_train)
val lr_auc_test = evaluator.evaluate(predict_test)

[36mevaluator[39m: [32mBinaryClassificationEvaluator[39m = binEval_a99c3cb71bd5
[36mlr_auc_train[39m: [32mDouble[39m = [32m0.6418781527174102[39m
[36mlr_auc_test[39m: [32mDouble[39m = [32m0.6381972197861229[39m

### Метрики получились вполне адекватные для задачи оттока. В таких задачах наоборот, слишком высокий аук свидетельствуют:
1. О проникновении информации о таргете из будущего в фичи
2. Нахождение некорректных причинно-следственных связей

In [43]:
println("LogisticRegression Roc auc train", lr_auc_train)

(LogisticRegression Roc auc train,0.6418781527174102)


In [44]:
println("LogisticRegression Roc auc test ", lr_auc_test)

(LogisticRegression Roc auc test ,0.6381972197861229)


### Попробуем сегментировать юзеров.

### Классический подход - нарезать сегменты в бакеты по вероятности оттока. Попробуем 5 бакетов. 
* Засплитим столбец с выходом вероятности для 2х классов.

In [54]:
val first = udf((v: org.apache.spark.ml.linalg.Vector) => v.toArray(0))
val second = udf((v: org.apache.spark.ml.linalg.Vector) => v.toArray(1))
val predict_train_new = predict_train
      .withColumn("prob1", first($"probability"))
      .withColumn("prob2", second($"probability"))
      .drop("probability")

[36mfirst[39m: [32mexpressions[39m.[32mUserDefinedFunction[39m = [33mUserDefinedFunction[39m(
  ammonite.$sess.cmd53$Helper$$Lambda$7642/1804894790@129e27c8,
...
[36msecond[39m: [32mexpressions[39m.[32mUserDefinedFunction[39m = [33mUserDefinedFunction[39m(
  ammonite.$sess.cmd53$Helper$$Lambda$7643/21624963@66c2f804,
...
[36mpredict_train_new[39m: [32mDataFrame[39m = [features: vector, label: int ... 6 more fields]

In [58]:
predict_train_new.select("label", "prob1", "prob2").show(5)

+-----+-------------------+--------------------+
|label|              prob1|               prob2|
+-----+-------------------+--------------------+
|    1| 0.4290044082984949|   0.570995591701505|
|    0|0.45451305547564835|  0.5454869445243516|
|    1| 0.4770434532270394|  0.5229565467729606|
|    1|  0.556040418212389|   0.443959581787611|
|    0| 0.9726554775374182|0.027344522462581723|
+-----+-------------------+--------------------+
only showing top 5 rows



### Применим QuantileDiscretizer для нарезки по бакетам.

In [72]:
val qds = new QuantileDiscretizer().setNumBuckets(5).setInputCol("prob2").setOutputCol("buckets")
val bucketizer = qds.fit(predict_train_new)
val predict_train_buckets = bucketizer.setHandleInvalid("skip").transform(predict_train_new).select("label", "prob2", "buckets")

[36mqds[39m: [32mQuantileDiscretizer[39m = quantileDiscretizer_5358ac8074e3
[36mbucketizer[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mml[39m.[32mfeature[39m.[32mBucketizer[39m = quantileDiscretizer_5358ac8074e3
[36mpredict_train_buckets[39m: [32mDataFrame[39m = [label: int, prob2: double ... 1 more field]

In [73]:
predict_train_buckets.show(5)

+-----+--------------------+-------+
|label|               prob2|buckets|
+-----+--------------------+-------+
|    1|   0.570995591701505|    0.0|
|    0|  0.5454869445243516|    0.0|
|    1|  0.5229565467729606|    0.0|
|    1|   0.443959581787611|    0.0|
|    0|0.027344522462581723|    0.0|
+-----+--------------------+-------+
only showing top 5 rows



### Посмотрим на границы вероятности (мин, макс), средние вероятности в бакетах и поймем, какой целевой сегмент нам нужен для работы с оттоком:
* Бакет 4 как сегмент с наибольшей вероятностью оттока.
* Бакет 3 тоже можно потестить, т.к. клиенты с наибольшей вероятностью оттока могут быть не склонны к реактивации вообще.

### В принципе, все готово для АВ теста. Будем запускать Бакеты 3 и 4 для пилота и смотреть за долей реактивации в них по отдельности.

In [74]:
predict_train_buckets.groupBy("buckets").agg(count("prob2"), mean("prob2") , min("prob2") , max("prob2")).sort(col("buckets").asc).show()

+-------+------------+------------------+--------------------+------------------+
|buckets|count(prob2)|        avg(prob2)|          min(prob2)|        max(prob2)|
+-------+------------+------------------+--------------------+------------------+
|    0.0|       50043|0.5007798600388275|9.673905894212953...|0.5849134396343241|
|    1.0|       49376|0.6098084165651789|  0.5849142753996271|0.6322062726733407|
|    2.0|       49754|0.6564785265511999|  0.6322074529821985|0.6882237418273143|
|    3.0|       49623|0.7079756025352025|   0.688223898327393|0.7282386525530309|
|    4.0|       48990| 0.746729271308491|  0.7282387822002222|0.9999999995598345|
+-------+------------+------------------+--------------------+------------------+



### Посмотрим на сколько хорошо средняя вероятность в сегменте по модели соответствует фактической вероятности по таргету.

In [75]:
predict_train_buckets.groupBy("buckets").agg(mean("label"), mean("prob2")).sort(col("buckets").asc).show()

+-------+-------------------+------------------+
|buckets|         avg(label)|        avg(prob2)|
+-------+-------------------+------------------+
|    0.0|0.43924225166356934|0.5007798600388275|
|    1.0| 0.6137597213220998|0.6098084165651789|
|    2.0| 0.7039032037625116|0.6564785265511999|
|    3.0| 0.7140237389919997|0.7079756025352025|
|    4.0| 0.7661359461114513| 0.746729271308491|
+-------+-------------------+------------------+



### Попробуем альтернативные подходы. Например, без учителя. Поскольку, мы отобрали фичи по влиянию на таргет, то значит, по ним в евклидовом пространстве можно выделить сегменты, с разной степенью вовлеченности и оттока.

### Сегментировать клиентов будем кластеризацией методом Bisecting k-means. Выберем базово 3 кластера, с целью поиска следующих сегментов юзеров:
1. Невовлеченные юзеры - высокая доля оттока.
2. Неопределеные юзеры.
3. Активные юзеры.

In [None]:
// Trains a Bisecting k-means model.
val mixture = new GaussianMixture().setK(3).setSeed(4).setFeaturesCol("Aspect").setPredictionCol("Clusters")
val model_mixture = mixture.fit(training)

// Evaluate clustering by computing Within Set Sum of Squared Errors.
//val WSSSE = model_km.computeCost(training)

In [42]:
val clusters = model_mixture.transform(training)
clusters.show(5)

+--------------------+-----+--------------------+--------------------+--------+--------------------+
|            features|label|     Scaled_features|              Aspect|Clusters|         probability|
+--------------------+-----+--------------------+--------------------+--------+--------------------+
|(111,[0,18,19,22,...|    1|[0.08371239665130...|[0.08371239665130...|       2|[2.73988975235334...|
|(111,[0,18,22,23,...|    0|[5.64625430917878...|[5.64625430917878...|       0|[1.0,5.1933930615...|
|(111,[0,18,22,23,...|    1|[1.19504793745868...|[1.19504793745868...|       1|[1.99601012388526...|
|(111,[0,18,22,23,...|    1|[3.20642720964502...|[3.20642720964502...|       1|[1.00168818392355...|
|(111,[0,18,22,23,...|    0|[9.79458171305819...|[9.79458171305819...|       2|[6.69361428003523...|
+--------------------+-----+--------------------+--------------------+--------+--------------------+
only showing top 5 rows



[36mclusters[39m: [32mDataFrame[39m = [features: vector, label: int ... 4 more fields]

### Такой подход тоже имеет место быть. Он скорее отделил клиентов с низкой вероятностью оттока.

In [44]:
clusters.groupBy("Clusters").agg(mean("label"), count("label")).show()

+--------+------------------+------------+
|Clusters|        avg(label)|count(label)|
+--------+------------------+------------+
|       1|0.6525714337213128|      221919|
|       2|0.6075754239154372|       18164|
|       0| 0.573672595092821|        7703|
+--------+------------------+------------+



### Тут имеет смысл объединить 2 подхода. Строить модель только на кластере №1. Так мы будем учиться ранжировать в более узком сегменте.

_______________________________________________________________________________________________________________________

## Выводы.

### Что улучшить?
* Группировать датафрейм с фичами не только по средним, но и поминимумам, максимум, отклонениям, чтобы получить больше информации распределения фичей внутри дня для юзера.
* Сделать подбор гиперпараметров (тут у меня все зависало, поэтому у меня нет этого шага).
* Попробовать другие методы отбора фичей.
* Попробовать другие нелинейные методы (случайный лес, бустинг)
* Попробовать другие подходы к опеределению .

### Что дальше делать?
* Большой плюс данной модели, что мы после скоринга имеем целый день для действий, в виду особенности определения таргета.
* Планировать АБ тест. Тестировать не только на верхних бакетах по вероятности оттока.
* Подумать о предложениях и уведомлениях пользователя, которые мы будем отправлять, с целью побуждения не выходить в отток.
* Подумать о дополнительных "плюшках" для отточных пользователей, не в рамках предложений.