## Домашнее задание № 2 по курсу "MLOps"
#### Работа с большими данными на Apache Spark
##### Автор: Кравченя Павел

##### Цели работы:
Отработка основных этапов работы с данными с помощью Apache Spark на примере датасета показов ленты социальной сети ``ok.ru``.

##### Постановка задачи:

1. Ознакомиться с задачей и описанием датасета [SNA Hackathon 2019](https://www.kaggle.com/sharthz23/sna-hackathon-2019-collaboration).

2. Скачать [датасет](https://www.kaggle.com/sharthz23/sna-hackathon-2019-collaboration/download).

3. Используя Spark:

* построить топ популярных групп для каждого типа обьекта;
* построить гистограммы популярности/активности по времени суток;
* посчитать корреляцию признаков с целевой переменной;
* найти другие интересные инсайты в данных.

Работа выполнялась с использованием Docker-образа системы ``almond.sh`` с версией ``Scala 2.12`` и ``Spark 2.4``.

Установим необходимые библиотеки для выполнения вычислений и визуализации.

In [1]:
import $ivy.`org.apache.spark::spark-core:2.4.0`
import $ivy.`org.apache.spark::spark-sql:2.4.0`
import $ivy.`org.plotly-scala::plotly-almond:0.8.2`
import $ivy.`org.apache.spark::spark-mllib:2.4.0`
import plotly._, plotly.element._, plotly.layout._, plotly.Almond._
import org.apache.spark.sql._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions.array_contains
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{collect_list, collect_set}
import scala.collection.mutable.WrappedArray

[32mimport [39m[36m$ivy.$                                   
[39m
[32mimport [39m[36m$ivy.$                                  
[39m
[32mimport [39m[36m$ivy.$                                      
[39m
[32mimport [39m[36m$ivy.$                                    
[39m
[32mimport [39m[36mplotly._, plotly.element._, plotly.layout._, plotly.Almond._
[39m
[32mimport [39m[36morg.apache.spark.sql._
[39m
[32mimport [39m[36morg.apache.log4j.{Level, Logger}
[39m
[32mimport [39m[36morg.apache.spark.sql.functions.array_contains
[39m
[32mimport [39m[36morg.apache.spark.sql.functions._
[39m
[32mimport [39m[36morg.apache.spark.sql.expressions.Window
[39m
[32mimport [39m[36morg.apache.spark.sql.functions.{collect_list, collect_set}
[39m
[32mimport [39m[36mscala.collection.mutable.WrappedArray[39m

Установим необходимый уровень логирования сообщений и создадим Spark-сессию.

In [2]:
Logger.getLogger("org").setLevel(Level.OFF)

val spark = {
  NotebookSparkSession.builder()
    .master("local[*]")
    .getOrCreate()
}


Loading spark-stubs
Getting spark JARs
Creating SparkSession


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties


[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@2aebe5d6

Загрузим датасет формата ``parquet``.

In [3]:
import spark.implicits._
val data = spark.read.format("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat").
    load("homework_2/train/")

[32mimport [39m[36mspark.implicits._
[39m
[36mdata[39m: [32mDataFrame[39m = [instanceId_userId: int, instanceId_objectType: string ... 167 more fields]

Выведем схему данных в датафрейме.

In [4]:
data.printSchema()

root
 |-- instanceId_userId: integer (nullable = true)
 |-- instanceId_objectType: string (nullable = true)
 |-- instanceId_objectId: integer (nullable = true)
 |-- audit_pos: long (nullable = true)
 |-- audit_clientType: string (nullable = true)
 |-- audit_timestamp: long (nullable = true)
 |-- audit_timePassed: long (nullable = true)
 |-- audit_experiment: string (nullable = true)
 |-- audit_resourceType: long (nullable = true)
 |-- metadata_ownerId: integer (nullable = true)
 |-- metadata_ownerType: string (nullable = true)
 |-- metadata_createdAt: long (nullable = true)
 |-- metadata_authorId: integer (nullable = true)
 |-- metadata_applicationId: long (nullable = true)
 |-- metadata_numCompanions: integer (nullable = true)
 |-- metadata_numPhotos: integer (nullable = true)
 |-- metadata_numPolls: integer (nullable = true)
 |-- metadata_numSymbols: integer (nullable = true)
 |-- metadata_numTokens: integer (nullable = true)
 |-- metadata_numVideos: integer (nullable = true)
 |-- me

 |-- auditweights_userOwner_USER_INTERNAL_LIKE: double (nullable = true)
 |-- auditweights_userOwner_USER_INTERNAL_UNLIKE: double (nullable = true)
 |-- auditweights_userOwner_USER_PRESENT_SEND: double (nullable = true)
 |-- auditweights_userOwner_USER_PROFILE_VIEW: double (nullable = true)
 |-- auditweights_userOwner_USER_SEND_MESSAGE: double (nullable = true)
 |-- auditweights_userOwner_USER_STATUS_COMMENT_CREATE: double (nullable = true)
 |-- auditweights_userOwner_VIDEO: double (nullable = true)
 |-- auditweights_userOwner_VOTE_POLL: double (nullable = true)
 |-- auditweights_x_ActorsRelations: long (nullable = true)
 |-- auditweights_likersSvd_spark_hyper: double (nullable = true)
 |-- auditweights_source_PROMO: double (nullable = true)
 |-- date: date (nullable = true)



Выведем содержание одной строки датафрейма. Программный код заимствован из видеолекции по данной теме.

In [5]:
data.columns.zip(data.take(1).head.toSeq).foreach( z => print(f"${z._1}%-50s\t${z._2}\n") )

instanceId_userId                                 	108
instanceId_objectType                             	Post
instanceId_objectId                               	18452434
audit_pos                                         	0
audit_clientType                                  	MOB
audit_timestamp                                   	1520194086477
audit_timePassed                                  	10184811
audit_experiment                                  	XPRM-5386_G1
audit_resourceType                                	8
metadata_ownerId                                  	13680
metadata_ownerType                                	GROUP_OPEN_OFFICIAL
metadata_createdAt                                	1520147725000
metadata_authorId                                 	73356
metadata_applicationId                            	0
metadata_numCompanions                            	0
metadata_numPhotos                                	1
metadata_numPolls                                 	0
metadata_numSymbo

auditweights_userAge                              	36.0
auditweights_userOwner_CREATE_COMMENT             	0.9963158
auditweights_userOwner_CREATE_IMAGE               	null
auditweights_userOwner_CREATE_LIKE                	1.0
auditweights_userOwner_IMAGE                      	1.0
auditweights_userOwner_MOVIE_COMMENT_CREATE       	null
auditweights_userOwner_PHOTO_COMMENT_CREATE       	null
auditweights_userOwner_PHOTO_MARK_CREATE          	null
auditweights_userOwner_PHOTO_VIEW                 	null
auditweights_userOwner_TEXT                       	1.0
auditweights_userOwner_UNKNOWN                    	null
auditweights_userOwner_USER_DELETE_MESSAGE        	null
auditweights_userOwner_USER_FEED_REMOVE           	null
auditweights_userOwner_USER_FORUM_MESSAGE_CREATE  	null
auditweights_userOwner_USER_INTERNAL_LIKE         	null
auditweights_userOwner_USER_INTERNAL_UNLIKE       	null
auditweights_userOwner_USER_PRESENT_SEND          	null
auditweights_userOwner_USER_PROFILE_VIEW      

В соответствии с постановкой задачи, построим топ популярных групп для каждого типа обьекта. Для этого необходимо отфильтровать строки датафрема по атрибуту feedback, который содержит "Liked", и сделать агрегацию по ``instanceId_objectType`` и ``instanceId_objectId``.

In [6]:
val liked_data = data.select("instanceId_objectType", "instanceId_objectId").
    filter( array_contains(data("feedback"), "Liked") ).
    groupBy("instanceId_objectType", "instanceId_objectId").
    count().
    sort($"instanceId_objectType", $"count".desc)

[36mliked_data[39m: [32mDataset[39m[[32mRow[39m] = [instanceId_objectType: string, instanceId_objectId: int ... 1 more field]

Выведем несколько строк сформированного датафрейма.

In [7]:
liked_data.show()

+---------------------+-------------------+-----+
|instanceId_objectType|instanceId_objectId|count|
+---------------------+-------------------+-----+
|                Photo|            6537034|  215|
|                Photo|            7001118|  171|
|                Photo|            7474842|  152|
|                Photo|            6644424|  145|
|                Photo|            1790708|  134|
|                Photo|            6276929|  124|
|                Photo|            8153427|  100|
|                Photo|            2886014|   99|
|                Photo|            8550904|   82|
|                Photo|            7533932|   79|
|                Photo|            8414106|   75|
|                Photo|            5212228|   74|
|                Photo|            7343426|   73|
|                Photo|            5168713|   73|
|                Photo|            7030205|   70|
|                Photo|            7803023|   69|
|                Photo|            2184938|   69|


Выведем по 5 наиболее популярных объектов (которых "лайкнули" наибольшее число раз) для каждой категории.

In [8]:
val window  = Window.partitionBy("instanceId_objectType").orderBy($"count".desc)
val first_five_liked_data = liked_data.withColumn("row_number", row_number.over(window)).
                                filter($"row_number" <= 5).drop("row_number")

[36mwindow[39m: [32mexpressions[39m.[32mWindowSpec[39m = org.apache.spark.sql.expressions.WindowSpec@7ee5a604
[36mfirst_five_liked_data[39m: [32mDataFrame[39m = [instanceId_objectType: string, instanceId_objectId: int ... 1 more field]

In [9]:
first_five_liked_data.show()

+---------------------+-------------------+-----+
|instanceId_objectType|instanceId_objectId|count|
+---------------------+-------------------+-----+
|                Video|             535842| 1329|
|                Video|            1282812|  868|
|                Video|             603629|  822|
|                Video|            1041333|  633|
|                Video|            9458730|  569|
|                Photo|            6537034|  215|
|                Photo|            7001118|  171|
|                Photo|            7474842|  152|
|                Photo|            6644424|  145|
|                Photo|            1790708|  134|
|                 Post|           19152905|  689|
|                 Post|           11300713|  660|
|                 Post|           38567725|  568|
|                 Post|           31009524|  560|
|                 Post|           35514331|  544|
+---------------------+-------------------+-----+



Получим датафрейм в виде массива типов объектов со списками групп и числа "лайков".

In [10]:
val top_of_groups = first_five_liked_data.groupBy($"instanceId_objectType").
    agg(collect_list($"instanceId_objectId").alias("instanceId_objectId"),
        collect_list($"count").alias("count")).
    collect()

[36mtop_of_groups[39m: [32mArray[39m[[32mRow[39m] = [33mArray[39m(
  [Video,WrappedArray(535842, 1282812, 603629, 1041333, 9458730),WrappedArray(1329, 868, 822, 633, 569)],
  [Photo,WrappedArray(6537034, 7001118, 7474842, 6644424, 1790708),WrappedArray(215, 171, 152, 145, 134)],
  [Post,WrappedArray(19152905, 11300713, 38567725, 31009524, 35514331),WrappedArray(689, 660, 568, 560, 544)]
)

Реализуем функцию визуализации.

In [11]:
def plot_top_objects(object_type: String, 
                    groups: List[String], 
                    counts: List[Long]): Unit = {
    
    val data = Seq(
        Bar(
            groups.toSeq, counts.toSeq
            )
        )
    
    val plot_layout = Layout(
        title = s"${object_type}"
        )
    
    plot(data, plot_layout)
}

defined [32mfunction[39m [36mplot_top_objects[39m

In [12]:
top_of_groups.foreach( x => {
    val groups = x(1).asInstanceOf[WrappedArray[Integer]].map(x => "G-" + x.toString).toList
    val counts = x(2).asInstanceOf[WrappedArray[Long]].toList
    plot_top_objects("Top groups distribution for " + x(0).toString, groups, counts)
})

Теперь построим гистограммы популярности объекта по времени суток. Так же, как и в предыдущем задании, критерием популярности служит количество "лайков". Выберем три наиболее популярных объекта.

In [13]:
val liked_objects = data.select("instanceId_objectId").
    filter( array_contains(data("feedback"), "Liked") ).
    groupBy("instanceId_objectId").
    count().
    sort($"count".desc).take(3)

[36mliked_objects[39m: [32mArray[39m[[32mRow[39m] = [33mArray[39m([535842,1329], [1282812,868], [603629,822])

Выведем эти популярные объекты в виде списка.

In [14]:
val most_liked_groups = liked_objects.map(x => x(0).asInstanceOf[Int]).toList

[36mmost_liked_groups[39m: [32mList[39m[[32mInt[39m] = [33mList[39m([32m535842[39m, [32m1282812[39m, [32m603629[39m)

Теперь отфильтруем все строки изначального датафрейма по объектам, которые были определы ранее, определим час из поля ``audit_timestamp`` и посчитаем количество "лайков", которые были поставлены объекту в пределах конкретного часа.

In [15]:
val popularity_of_objects = data.select("instanceId_objectId", "audit_timestamp").
                                 filter($"instanceId_objectId".isin(most_liked_groups: _*)).
                                 withColumn("hour", hour(to_timestamp(col("audit_timestamp")))).
                                 drop("audit_timestamp").
                                 groupBy("instanceId_objectId", "hour").
                                 count().
                                 sort($"instanceId_objectId".asc, $"hour".asc).
                                 groupBy("instanceId_objectId").
                                 agg(collect_list($"hour").alias("hour"),
                                     collect_list($"count").alias("count")
                                 ).
                                 collect()

[36mpopularity_of_objects[39m: [32mArray[39m[[32mRow[39m] = [33mArray[39m(
  [603629,WrappedArray(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23),WrappedArray(193, 231, 204, 197, 213, 210, 212, 193, 200, 208, 221, 208, 216, 203, 199, 213, 193, 207, 189, 225, 213, 195, 189, 214)],
  [1282812,WrappedArray(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23),WrappedArray(220, 207, 219, 205, 220, 214, 235, 221, 214, 201, 221, 212, 204, 210, 201, 203, 162, 221, 197, 204, 212, 218, 206, 204)],
  [535842,WrappedArray(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23),WrappedArray(285, 342, 307, 286, 348, 294, 281, 306, 304, 303, 312, 311, 327, 319, 290, 324, 328, 343, 329, 302, 309, 316, 330, 305)]
)

Визуализируем полученные результаты.

In [16]:
popularity_of_objects.foreach( x => {
    val groups = x(1).asInstanceOf[WrappedArray[Integer]].map(x => "H" + x.toString).toList
    val counts = x(2).asInstanceOf[WrappedArray[Long]].toList
    plot_top_objects("Popularity time distribution for object " + x(0).toString, groups, counts)
})

Аналогичным образом построим гистограммы активности пользователей по времени суток. Алгоритм решения такой же, как и для предыдущего примера, кроме того факта, что активностью пользователя можно считать не только простановку "лайка", но и любое действие с объектом. Поэтому, фильтрацию по конкретным значениям атрибута ``feedback`` делать не нужно.

In [17]:
val active_users = data.select("instanceId_userId").
    groupBy("instanceId_userId").
    count().
    sort($"count".desc).
    take(3)

[36mactive_users[39m: [32mArray[39m[[32mRow[39m] = [33mArray[39m(
  [6579093,1742],
  [7543269,1607],
  [14777547,1541]
)

In [18]:
val most_active_users = active_users.map(x => x(0).asInstanceOf[Int])

[36mmost_active_users[39m: [32mArray[39m[[32mInt[39m] = [33mArray[39m([32m6579093[39m, [32m7543269[39m, [32m14777547[39m)

In [19]:
val activity_of_users = data.select("instanceId_userId", "audit_timestamp").
                             filter($"instanceId_userId".isin(most_active_users: _*)).
                             withColumn("hour", hour(to_timestamp(col("audit_timestamp")))).
                             drop("audit_timestamp").
                             groupBy("instanceId_userId", "hour").
                             count().
                             sort($"instanceId_userId".asc, $"hour".asc).
                             groupBy("instanceId_userId").
                             agg(collect_list($"hour").alias("hour"),
                                 collect_list($"count").alias("count")
                             ).
                             collect()

[36mactivity_of_users[39m: [32mArray[39m[[32mRow[39m] = [33mArray[39m(
  [14777547,WrappedArray(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23),WrappedArray(53, 59, 73, 40, 57, 79, 63, 73, 75, 43, 47, 77, 61, 66, 73, 71, 68, 97, 53, 55, 69, 70, 56, 63)],
  [6579093,WrappedArray(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23),WrappedArray(51, 78, 73, 76, 39, 88, 76, 73, 77, 65, 66, 63, 114, 76, 75, 72, 62, 81, 60, 84, 64, 73, 63, 93)],
  [7543269,WrappedArray(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23),WrappedArray(72, 69, 66, 72, 69, 59, 65, 70, 60, 75, 84, 69, 38, 69, 78, 78, 67, 55, 81, 53, 68, 81, 53, 56)]
)

In [20]:
activity_of_users.foreach( x => {
    val groups = x(1).asInstanceOf[WrappedArray[Integer]].map(x => "H" + x.toString).toList
    val counts = x(2).asInstanceOf[WrappedArray[Long]].toList
    plot_top_objects("Activities time distribution for user " + x(0).toString, groups, counts)
})

Интересные графики получились. Такое чувство, что наиболее активные пользователи сидят в "Одноклассниках" круглосуточно...

Теперь определим корреляцию признаков с целевой переменной. Как сказано в описании датасета, в соревновании требовалось построить для каждого пользователя список объектов, удовлетворяющий определенным условиям. Поэтому, логично предположить, что целевой переменной в нашем случае является ``instanceId_userId``. Данный признак является категориальным, что ограничивает множество методов, анализирующих корреляции между признаками. В работе принято решение выбрать несколько других категориальных признаков и проанализировать их корреляцию с целевой переменной способом $\chi ^ 2$-test.

Данный метод подразумевает расчет корреляции с использованием сводной таблицы всех значений пары категориальных признаков, участвующих в исследовании. Так как Spark ограничивает размер этой таблицы, а число различных значений целевой переменной довольно большое, то выберем 100 наиболее активных пользователей и построим статистику только для них.

In [21]:
val active_users = data.select("instanceId_userId").
                   groupBy("instanceId_userId").count().sort($"count".desc).take(100).
                   map(x => x(0).asInstanceOf[Int])

val clipped_users = data.select("instanceId_userId", "instanceId_objectType", "user_gender", "metadata_ownerId").
                             filter($"instanceId_userId".isin(active_users: _*))

[36mactive_users[39m: [32mArray[39m[[32mInt[39m] = [33mArray[39m(
  [32m6579093[39m,
  [32m7543269[39m,
  [32m14777547[39m,
  [32m15323820[39m,
  [32m14649588[39m,
  [32m11452815[39m,
  [32m7263396[39m,
  [32m15112584[39m,
  [32m14767260[39m,
  [32m980694[39m,
  [32m1037205[39m,
  [32m10637391[39m,
  [32m9720798[39m,
  [32m12894429[39m,
  [32m13616343[39m,
  [32m15087672[39m,
  [32m15188028[39m,
  [32m9437079[39m,
  [32m12275619[39m,
  [32m10482393[39m,
  [32m15362751[39m,
  [32m10875126[39m,
  [32m226227[39m,
  [32m2129604[39m,
  [32m8760873[39m,
  [32m8604420[39m,
  [32m5469291[39m,
  [32m10077018[39m,
  [32m7831497[39m,
  [32m13091136[39m,
  [32m6966822[39m,
  [32m7374717[39m,
  [32m3219927[39m,
  [32m8880450[39m,
  [32m8618208[39m,
  [32m9768774[39m,
  [32m13496352[39m,
  [32m15347832[39m,
...
[36mclipped_users[39m: [32mDataset[39m[[32mRow[39m] = [instanceId_userId: int, instanceId_objectType:

In [22]:
clipped_users.show()

+-----------------+---------------------+-----------+----------------+
|instanceId_userId|instanceId_objectType|user_gender|metadata_ownerId|
+-----------------+---------------------+-----------+----------------+
|           471426|                 Post|          1|           49570|
|           471426|                 Post|          1|           57851|
|           471426|                 Post|          1|           11193|
|           471426|                 Post|          1|            4658|
|           471426|                 Post|          1|           79089|
|           471426|                 Post|          1|           79338|
|           471426|                 Post|          1|           25197|
|           471426|                Photo|          1|           22411|
|           471426|                 Post|          1|           11193|
|           471426|                 Post|          1|           37463|
|           471426|                 Post|          1|           59867|
|     

Так как метод $\chi ^2$-тест может работать только с числовыми признаками, проиндексируем строковые значения признака ``instanceId_objectType``. В качестве тестовых данных добавим "фиктивный" признак, все значения которого равны 1. Затем, соберем множество исследуемых признаков в MLLib-вектор для последующего анализа методом ``ChiSquareTest``.

In [23]:
import org.apache.spark.ml.stat.ChiSquareTest
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
  .setInputCol("instanceId_objectType")
  .setOutputCol("instanceId_objectType_index")

val indexed = indexer.fit(
    clipped_users.select("instanceId_objectType", "instanceId_userId", "user_gender", "metadata_ownerId")
).transform(
    clipped_users.select("instanceId_objectType", "instanceId_userId", "user_gender", "metadata_ownerId")
).withColumn("newcol",lit(1))

val assembler = new VectorAssembler()
  .setInputCols(Array("instanceId_objectType_index", "user_gender", "instanceId_userId", "newcol", "metadata_ownerId"))
  .setOutputCol("features")

import org.apache.spark.sql.functions.lit
val output = assembler.transform(indexed)
output.show()

+---------------------+-----------------+-----------+----------------+---------------------------+------+--------------------+
|instanceId_objectType|instanceId_userId|user_gender|metadata_ownerId|instanceId_objectType_index|newcol|            features|
+---------------------+-----------------+-----------+----------------+---------------------------+------+--------------------+
|                 Post|           471426|          1|           49570|                        0.0|     1|[0.0,1.0,471426.0...|
|                 Post|           471426|          1|           57851|                        0.0|     1|[0.0,1.0,471426.0...|
|                 Post|           471426|          1|           11193|                        0.0|     1|[0.0,1.0,471426.0...|
|                 Post|           471426|          1|            4658|                        0.0|     1|[0.0,1.0,471426.0...|
|                 Post|           471426|          1|           79089|                        0.0|     1|[0.0,1

[32mimport [39m[36morg.apache.spark.ml.stat.ChiSquareTest
[39m
[32mimport [39m[36morg.apache.spark.ml.feature.VectorAssembler
[39m
[32mimport [39m[36morg.apache.spark.ml.feature.StringIndexer

[39m
[36mindexer[39m: [32mStringIndexer[39m = strIdx_2aa5a62e6207
[36mindexed[39m: [32mDataFrame[39m = [instanceId_objectType: string, instanceId_userId: int ... 4 more fields]
[36massembler[39m: [32mVectorAssembler[39m = vecAssembler_07c9b7af82eb
[32mimport [39m[36morg.apache.spark.sql.functions.lit
[39m
[36moutput[39m: [32mDataFrame[39m = [instanceId_objectType: string, instanceId_userId: int ... 5 more fields]

Выполним анализ корреляции выбранных признаков с целевой переменной, используя $\chi ^2$-test.

In [24]:
ChiSquareTest.test(output, "features", "instanceId_userId").collect()

[36mres23[39m: [32mArray[39m[[32mRow[39m] = [33mArray[39m(
  [[0.0,0.0,0.0,1.0,0.0],WrappedArray(198, 99, 9801, 0, 625383),[9533.252577344,75081.00000000001,7433018.999999998,0.0,4433820.655121116]]
)

Видно, что p-value только для "фиктивного" признака равен единице, что свидетельствует о том, что он не коррелирует с целевой переменной. Для всех остальных признаков ситуация обратная.

Вызывают беспокойство значения p-value, которые получились либо нулевыми, либо единичными. Попробуем оценить скоррелированность признаков ``instanceId_objectType`` и ``instanceId_userId``, реализовав расчет $\chi ^2$-test без использования MLLib и сравнить результаты. Для этого составим сводную таблицу всех значений обоих признаков и по ней рассчитаем статистику.

In [25]:
val df_cross = output.select("instanceId_objectType", "instanceId_userId").
    stat.crosstab("instanceId_userId", "instanceId_objectType")

[36mdf_cross[39m: [32mDataFrame[39m = [instanceId_userId_instanceId_objectType: string, Photo: bigint ... 2 more fields]

In [26]:
df_cross.show()

+---------------------------------------+-----+----+-----+
|instanceId_userId_instanceId_objectType|Photo|Post|Video|
+---------------------------------------+-----+----+-----+
|                                 957861|   50| 616|   34|
|                               14767260|  172| 862|   90|
|                               10875126|  220| 576|   26|
|                               12145905|   28| 585|   86|
|                               13839342|   51| 647|   26|
|                                3219927|   39| 625|  100|
|                                9437079|   34| 787|   26|
|                                4409550|  121| 494|   18|
|                                9482301|   81| 534|   18|
|                                8715546|   45| 604|   21|
|                                9720723|  126| 472|   60|
|                                 226227|   53| 720|   42|
|                               13426485|   23| 554|   24|
|                                2129604|   94| 698|   2

Сформируем несколько новых столбцов датафрейма со значениями сумм по строкам и столбцам для расчета [статистики](https://en.wikipedia.org/wiki/Chi-squared_test).

In [27]:
val df_chi = df_cross.withColumn("total_Photo",sum('Photo).over()).
    withColumn("total_Post",sum('Post).over()).
    withColumn("total_Video",sum('Video).over()).
    withColumn("sum_row", $"Photo" + $"Post" + $"Video").
    drop("instanceId_userId_instanceId_objectType").
    withColumn("total",sum('sum_row).over()).
    withColumn("Photo_un", $"total_Photo" * $"sum_row" / $"total").
    withColumn("Post_un", $"total_Post" * $"sum_row" / $"total").
    withColumn("Video_un", $"total_Video" * $"sum_row" / $"total").
    withColumn("chi_Photo", ($"Photo" - $"Photo_un")*($"Photo" - $"Photo_un") / $"Photo_un").
    withColumn("chi_Post", ($"Post" - $"Post_un")*($"Post" - $"Post_un") / $"Post_un").
    withColumn("chi_Video", ($"Video" - $"Video_un")*($"Video" - $"Video_un") / $"Video_un")

[36mdf_chi[39m: [32mDataFrame[39m = [Photo: bigint, Post: bigint ... 12 more fields]

In [28]:
df_chi.select(sum('chi_Photo) + sum('chi_Post) + sum('chi_Video), count('chi_Photo)).
show()

+---------------------------------------------------+----------------+
|((sum(chi_Photo) + sum(chi_Post)) + sum(chi_Video))|count(chi_Photo)|
+---------------------------------------------------+----------------+
|                                  9533.252577343997|             100|
+---------------------------------------------------+----------------+



Получаем, что для количества степеней свободы (100 - 1) * (3 - 1) = 198 значение статистики $\chi ^2$ равно 9533. С учетом того, что критическое значение статистики при таком количестве степеней свободы для p = 0.05 [равно 1.653](https://www.socscistatistics.com/tests/criticalvalues/default.aspx), получаем, что согласно $\chi ^2$-тесту, рассматриваемый признак и целевая переменная действительно коррелируют. Столь экстремально большое значение статистики объясняет нулевое значение p-value в вышерассмотренном расчете.

Рассмотрим теперь распределение активностей пользователей по возрасту, полу и стране. Определим, какие значения принимает переменная, отвечающая за пол пользователя.

In [29]:
data.select("auditweights_userAge", "user_gender").groupBy("user_gender").count().show()

+-----------+--------+
|user_gender|   count|
+-----------+--------+
|       null|    1376|
|          1| 5033698|
|          2|13251484|
|          0|      17|
+-----------+--------+



Ну, 3 значения пола... Европейские ценности и все такое...

В дальнейшем исследовании будем учитывать только Gender1 и Gender2, как самые многочисленные.

Посмотрим на статистику возраста пользователей обоих полов.

In [30]:
data.select("auditweights_userAge", "user_gender").filter($"user_gender" === 1).
    stat.approxQuantile("auditweights_userAge", Array(0.25, 0.5, 0.75), 0.01)

[36mres29[39m: [32mArray[39m[[32mDouble[39m] = [33mArray[39m([32m35.0[39m, [32m44.0[39m, [32m56.0[39m)

In [31]:
data.select("auditweights_userAge", "user_gender").filter($"user_gender" === 2).
    stat.approxQuantile("auditweights_userAge", Array(0.25, 0.5, 0.75), 0.01)

[36mres30[39m: [32mArray[39m[[32mDouble[39m] = [33mArray[39m([32m37.0[39m, [32m47.0[39m, [32m58.0[39m)

Можно заметить, что Gender2 представлен чуть более возрастными представителями. Построим теперь гистограмму авктивности пользователей обоих полов по странам.

In [32]:
data.select("user_ID_country", "user_gender").filter($"user_gender".isNotNull).
   groupBy("user_ID_country", "user_gender").count().
   sort($"user_ID_country".asc, $"user_gender".asc).show()

+---------------+-----------+-----+
|user_ID_country|user_gender|count|
+---------------+-----------+-----+
|    10392716690|          1|  281|
|    10392716690|          2| 2196|
|    10392808561|          1|   44|
|    10392808561|          2|  129|
|    10392890835|          1|  212|
|    10392890835|          2|  381|
|    10392982119|          1|   61|
|    10392982119|          2|  541|
|    10393232409|          1|  906|
|    10393232409|          2| 1700|
|    10393424473|          1|  214|
|    10393424473|          2|  259|
|    10393450888|          1|   37|
|    10393450888|          2|   46|
|    10393621238|          1| 2151|
|    10393621238|          2| 5627|
|    10393631600|          1| 1062|
|    10393631600|          2| 2779|
|    10394079228|          1|   18|
|    10394079228|          2|  180|
+---------------+-----------+-----+
only showing top 20 rows



In [33]:
val genders_with_countries = data.select("user_ID_country", "user_gender").
    filter($"user_gender".isNotNull).
    groupBy("user_ID_country", "user_gender").count().
    sort($"user_ID_country".asc, $"user_gender".asc).
    groupBy("user_ID_country").
    agg(collect_list($"user_gender").alias("genders"), collect_list($"count").alias("counts")).
    take(40)

[36mgenders_with_countries[39m: [32mArray[39m[[32mRow[39m] = [33mArray[39m(
  [10417127108,WrappedArray(1, 2),WrappedArray(245, 80)],
  [10424065948,WrappedArray(1, 2),WrappedArray(113, 208)],
  [10393621238,WrappedArray(1, 2),WrappedArray(2151, 5627)],
  [10399224101,WrappedArray(1, 2),WrappedArray(11, 19)],
  [10425274320,WrappedArray(1, 2),WrappedArray(78, 233)],
  [10401935333,WrappedArray(1, 2),WrappedArray(726, 697)],
  [10408209383,WrappedArray(1, 2),WrappedArray(142, 104)],
  [10395467357,WrappedArray(1, 2),WrappedArray(40, 146)],
  [26334910464,WrappedArray(1, 2),WrappedArray(2927, 7080)],
  [10418754394,WrappedArray(1, 2),WrappedArray(42, 82)],
  [10405794288,WrappedArray(1, 2),WrappedArray(130, 116)],
  [10394079228,WrappedArray(1, 2),WrappedArray(18, 180)],
  [10394605145,WrappedArray(1, 2),WrappedArray(120, 74)],
  [10423428619,WrappedArray(1, 2),WrappedArray(53, 63)],
  [10425905274,WrappedArray(1, 2),WrappedArray(105, 114)],
  [10410580805,WrappedArray(1, 2),Wrap

Визуализируем полученные данные.

In [34]:
val trace_gender1 = Bar(
    genders_with_countries.map(x => "C-" + x(0).asInstanceOf[Long].toString).toSeq,
    genders_with_countries.map(x => x(2).asInstanceOf[ WrappedArray[Long]](0)).toSeq,
    name = "Gender 1"
)
    
val trace_gender2 = Bar(
    genders_with_countries.map(x => "C-" + x(0).asInstanceOf[Long].toString).toSeq,
    genders_with_countries.map(x => x(2).asInstanceOf[ WrappedArray[Long]](1)).toSeq,
    name = "Gender 2"
)

val plot_data = Seq(trace_gender1, trace_gender2)

val plot_layout = Layout(
    barmode = BarMode.Group
)

    plot(plot_data, plot_layout)

[36mtrace_gender1[39m: [32mBar[39m = [33mBar[39m(
  [33mStrings[39m(
    [33mWrappedArray[39m(
      [32m"C-10417127108"[39m,
      [32m"C-10424065948"[39m,
      [32m"C-10393621238"[39m,
      [32m"C-10399224101"[39m,
      [32m"C-10425274320"[39m,
      [32m"C-10401935333"[39m,
      [32m"C-10408209383"[39m,
      [32m"C-10395467357"[39m,
      [32m"C-26334910464"[39m,
      [32m"C-10418754394"[39m,
      [32m"C-10405794288"[39m,
      [32m"C-10394079228"[39m,
      [32m"C-10394605145"[39m,
      [32m"C-10423428619"[39m,
      [32m"C-10425905274"[39m,
      [32m"C-10410580805"[39m,
      [32m"C-10414930207"[39m,
      [32m"C-10402425385"[39m,
      [32m"C-10405620585"[39m,
      [32m"C-10399865285"[39m,
      [32m"C-10397453891"[39m,
      [32m"C-10404374905"[39m,
      [32m"C-10404540364"[39m,
      [32m"C-10408281200"[39m,
      [32m"C-10418580649"[39m,
      [32m"C-10424568667"[39m,
      [32m"C-10399201022"[39m,
    

Из диаграммы видно, что для стран с наиболее активными пользователями доля Gender2 значительно выше доли Gender1.

#### Выводы:

В процессе выполнения работы был получен датасет, содержащий логи показов контента из открытых групп в новостных лентах социальной сети "Одноклассники" пользователей за февраль-март 2018 года. Данный датасет был загружен в систему Apache Spark и проанализирован с его помощью. Были построены гистограммы самых популярных групп для каждого типа обьекта, популярности объектов и активности пользователей по времени суток. Была рассчитана корреляция между целевой переменной и некоторыми выбранными признаками с помощью $\chi ^2$-теста. Анализ показал сильную корреляцию признаков. Корректность расчета была проверена на одной паре признаков с помощью расчета статистики $\chi ^2$ без использования MLLib. Дополнительно были проведены исследования распределения пользователей по полу, возрасту и стране проживания, отмечены особенности полученных результатов.

В результате работы были получены навыки работы с DataFrame API Apache Spark на языке Scala, изучены основные методы работы с датафреймами.