## Пример на pyspark

В качестве набора данных для примера будем использовать данные конкурса про ответы студентов на тесты
https://www.kaggle.com/c/riiid-test-answer-prediction

При подключении к spark драйверу установим лимиты по памяти и по числу ядер. Также выберем номер порта для Spark UI

Нужно выбрать уникальное имя приложения и номер порта, чтобы не войти в коллизию с другими пользователями

In [1]:
%pylab inline
import pandas as pd
import numpy as np

Populating the interactive namespace from numpy and matplotlib


In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession

spark = (
    SparkSession
        .builder
        .appName("OTUS-PySpark-Notebook")
        .config("spark.dynamicAllocation.enabled", "true")
        .config("spark.executor.memory", "2g")
        .config("spark.driver.memory", "1g")
        .getOrCreate()
)

Данные будем читать из заранее сконвертированного parquet

In [4]:
df = spark.read.parquet("data/train.parquet")

Схема данных и первые 10 записей

In [5]:
df.printSchema()

root
 |-- row_id: integer (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- content_id: integer (nullable = true)
 |-- content_type_id: integer (nullable = true)
 |-- task_container_id: integer (nullable = true)
 |-- user_answer: integer (nullable = true)
 |-- answered_correctly: integer (nullable = true)
 |-- prior_question_elapsed_time: double (nullable = true)
 |-- prior_question_had_explanation: boolean (nullable = true)



In [6]:
df.show(10)

+-------+-----------+--------+----------+---------------+-----------------+-----------+------------------+---------------------------+------------------------------+
| row_id|  timestamp| user_id|content_id|content_type_id|task_container_id|user_answer|answered_correctly|prior_question_elapsed_time|prior_question_had_explanation|
+-------+-----------+--------+----------+---------------+-----------------+-----------+------------------+---------------------------+------------------------------+
|1590966|30069556424|33808123|     10345|              0|             2253|          0|                 1|                    20750.0|                          true|
|1875768|35651077247|39733270|     10153|              0|             2405|          0|                 1|                    25250.0|                          true|
|1614269| 1034449587|34226097|      3401|              0|              171|          1|                 1|                    31666.0|                          true|
|128

In [7]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)  # to pretty print pyspark.DataFrame in jupyter
df

row_id,timestamp,user_id,content_id,content_type_id,task_container_id,user_answer,answered_correctly,prior_question_elapsed_time,prior_question_had_explanation
1590966,30069556424,33808123,10345,0,2253,0,1,20750.0,True
1875768,35651077247,39733270,10153,0,2405,0,1,25250.0,True
1614269,1034449587,34226097,3401,0,171,1,1,31666.0,True
1282661,4128858947,27082418,6869,0,161,0,0,38000.0,True
1249333,1901589697,25861499,6381,0,315,0,1,26000.0,True
109916,17416341046,2393889,6028,0,359,1,1,64000.0,True
950499,3069106121,19629488,3648,0,1167,3,0,39000.0,True
986519,190732026,20693956,6791,0,127,1,1,26000.0,True
2129328,24775844512,44572629,239,0,744,3,0,14000.0,True
1821608,27474183045,38513219,1246,0,1188,3,1,15000.0,True


Замерим время выполнения простых запросов с группировками

In [8]:
from pyspark.sql import functions as f
from pyspark.sql.functions import col

In [9]:
%%time
(
    df
        .select('content_id', 'answered_correctly')
        .groupBy('content_id')
        .mean('answered_correctly')
        .show()
)

+----------+-----------------------+
|content_id|avg(answered_correctly)|
+----------+-----------------------+
|      1342|     0.6960430741098187|
|      8638|     0.4459175781681582|
|      8389|     0.9217750257997936|
|      1238|      0.597189012655275|
|      4519|     0.4880797853928695|
|      9376|     0.8307933662034962|
|     10623|     0.9437243642329779|
|      3918|     0.5936385255648038|
|       471|     0.6837108953613807|
|     23336|                   -1.0|
|       148|     0.8297672832665587|
|      6658|      0.662733319030251|
|      3749|     0.6274719401389631|
|      6620|     0.7847812401636765|
|      7554|     0.8941355674028941|
|      2122|     0.5887302396736359|
|      4818|     0.4098718947459534|
|       496|     0.7542558870632896|
|      1829|     0.7589958158995815|
|     10817|     0.7436510307738273|
+----------+-----------------------+
only showing top 20 rows

CPU times: user 2.43 ms, sys: 4.31 ms, total: 6.74 ms
Wall time: 5.95 s


In [10]:
%%time
(
    df
        .select('user_id', 'answered_correctly')
        .where(col('answered_correctly') != -1)
        .groupby('user_id')
        .mean('answered_correctly')
        .show()
)

+---------+-----------------------+
|  user_id|avg(answered_correctly)|
+---------+-----------------------+
|238610111|     0.5106382978723404|
|224426519|     0.7173333333333334|
|253500385|     0.5285296981499513|
|242039738|    0.46367041198501874|
|252345392|       0.56480117820324|
|240485154|     0.5521978021978022|
|233395975|     0.5333333333333333|
|254408119|     0.7657534246575343|
|257509511|    0.49019607843137253|
|245302461|     0.6613756613756614|
|259875659|     0.6183574879227053|
|243302977|     0.6267942583732058|
|256909067|    0.29411764705882354|
|220663840|     0.7661870503597122|
|250727264|     0.5666666666666667|
|240750690|     0.6890756302521008|
|219176829|     0.5384615384615384|
|254957588|     0.2702702702702703|
|244175802|     0.7359550561797753|
|252861630|               0.546875|
+---------+-----------------------+
only showing top 20 rows

CPU times: user 4.08 ms, sys: 4.46 ms, total: 8.54 ms
Wall time: 16.9 s


## Упражнение 1
Выведите top 10 студентов с наилучшими результатами. 
Обратите внимание, что поле answered_correctly равно -1, если это была лекция, а не тест. Такие записи нужно исключить.

## Упражнение 2
Выведите top 10 задач с наихудшими результатами

## pyspark user defined functions (UDF)

Как и для других языков, поддерживаемых Spark, для python есть возможность использовать UDF. При этом возникают дополнительные накладные расходы по сравнению с Java и Scala.

In [11]:
from pyspark.sql.types import LongType

def to_months(ms):
    return ms // 31536000000 // 12 #1 year = 31536000000 ms

to_months_udf = f.udf(to_months, LongType())

Замерим время выполнения без UDF

In [12]:
%%time
(
    df
        .select("content_id", "timestamp")
        .groupby("content_id")
        .mean("timestamp")
        .show()
)

+----------+--------------------+
|content_id|      avg(timestamp)|
+----------+--------------------+
|     10362|1.091651063851827...|
|      2366| 9.180255834014534E9|
|      1580| 8.499988982626443E9|
|      1342| 8.307796525376441E9|
|      3997|1.016755047805518...|
|      4818| 6.142197065523717E9|
|      8592| 8.025769332450499E9|
|      1238| 7.261110172361754E9|
|      1645| 9.994734853553385E9|
|      3749| 6.928784635868185E9|
|       833|  8.54668654262351E9|
|     23336| 7.288716371091926E9|
|      9465| 9.754291493026693E9|
|      5156| 6.836066048756999E9|
|      4519| 5.839083879095016E9|
|      7880| 6.134804397191137E9|
|       496| 7.627430069740495E9|
|      1959|1.010759745561863...|
|      1591| 7.811043362183572E9|
|      9900| 8.861819655241117E9|
+----------+--------------------+
only showing top 20 rows

CPU times: user 6.59 ms, sys: 0 ns, total: 6.59 ms
Wall time: 10.9 s


Применим простой UDF к похожему запросу

In [13]:
%%time
(
    df
        .select("content_id", to_months_udf("timestamp").alias("months"))
        .groupBy("content_id")
        .mean("months")
        .show()
)

+----------+-----------+
|content_id|avg(months)|
+----------+-----------+
|      3749|        0.0|
|      9852|        0.0|
|       148|        0.0|
|      7833|        0.0|
|      8389|        0.0|
|      4519|        0.0|
|      7880|        0.0|
|      4818|        0.0|
|     11858|        0.0|
|      6466|        0.0|
|     10623|        0.0|
|      6620|        0.0|
|      5156|        0.0|
|      3794|        0.0|
|     13289|        0.0|
|      1238|        0.0|
|     10206|        0.0|
|      4935|        0.0|
|       496|        0.0|
|      8086|        0.0|
+----------+-----------+
only showing top 20 rows

CPU times: user 21.4 ms, sys: 212 µs, total: 21.6 ms
Wall time: 1min 21s


Перепишем логику, которая была в UDF

In [14]:
%%time
(
    df
        .select("content_id", (col("timestamp") / 31536000000 / 12).alias("months"))
        .groupby("content_id")
        .mean("months")
        .show()
)

+----------+--------------------+
|content_id|         avg(months)|
+----------+--------------------+
|      1342| 0.02195320830526077|
|      8638|0.026568053084666098|
|      8389|0.023038468581201264|
|      1238| 0.01918735776139902|
|      4519|0.015429677931821345|
|      9376|0.023047010695516096|
|     10623| 0.02441428905667462|
|      3918|0.020248593705688527|
|       471|0.026115179467367237|
|     23336|0.019260306662998702|
|       148|0.017910791660268707|
|      6658|0.025049151627509217|
|      3749|0.018309193291973693|
|      6620|0.022097287253877614|
|      7554|0.030580929715970585|
|      2122| 0.02605702050124899|
|      4818|  0.0162306492725872|
|       496|0.020155351740181838|
|      1829|0.024396345388877146|
|     10817|0.035670606454313726|
+----------+--------------------+
only showing top 20 rows

CPU times: user 0 ns, sys: 9.87 ms, total: 9.87 ms
Wall time: 10.5 s


## Упражнение 3
Постройте гистограмму по числу месяцев до первого взаимодействия студента с заданием

In [20]:
%%time
df_pandas = (
    df
        .select("content_id", (col("timestamp") / 31536000000 / 12).alias("months"))
        .groupby("content_id")
        .mean("months")
        .toPandas()
)

CPU times: user 52.3 ms, sys: 405 µs, total: 52.8 ms
Wall time: 7.22 s


In [21]:
df_pandas.head()

Unnamed: 0,content_id,avg(months)
0,10623,0.024414
1,1829,0.024396
2,4101,0.02599
3,4818,0.016231
4,1238,0.019187


## Обогащение данных

Таблица с вопросами лежит в отдельном файле questions.csv. 

In [15]:
questions = spark.read.csv("data/questions.csv", header=True, inferSchema=True)

In [16]:
questions.count()

13523

In [24]:
questions.printSchema()

root
 |-- question_id: integer (nullable = true)
 |-- bundle_id: integer (nullable = true)
 |-- correct_answer: integer (nullable = true)
 |-- part: integer (nullable = true)
 |-- tags: string (nullable = true)



Объединим ее с ответами при условии, что эта запись ссылкается на вопрос если content_type_id и content_id - идентификатор вопроса.

In [17]:
df_join = (
    df
        .where(f.col("content_type_id") == 0)
        .join(questions, df.content_id == questions.question_id, 'left')
)

In [18]:
df_join

row_id,timestamp,user_id,content_id,content_type_id,task_container_id,user_answer,answered_correctly,prior_question_elapsed_time,prior_question_had_explanation,question_id,bundle_id,correct_answer,part,tags
1590966,30069556424,33808123,10345,0,2253,0,1,20750.0,True,10345,10344,0,6,27
1875768,35651077247,39733270,10153,0,2405,0,1,25250.0,True,10153,10152,0,6,14
1614269,1034449587,34226097,3401,0,171,1,1,31666.0,True,3401,3399,1,4,136 144 81
1282661,4128858947,27082418,6869,0,161,0,0,38000.0,True,6869,6869,1,6,8 162
1249333,1901589697,25861499,6381,0,315,0,1,26000.0,True,6381,6381,0,5,4
109916,17416341046,2393889,6028,0,359,1,1,64000.0,True,6028,6028,1,5,72
950499,3069106121,19629488,3648,0,1167,3,0,39000.0,True,3648,3648,1,5,55
986519,190732026,20693956,6791,0,127,1,1,26000.0,True,6791,6789,1,6,27 162
2129328,24775844512,44572629,239,0,744,3,0,14000.0,True,239,239,0,2,155 163 38 102
1821608,27474183045,38513219,1246,0,1188,3,1,15000.0,True,1246,1246,3,2,2 32 92 29


In [19]:
df_join.count()

99271300

Проверим, что вероястность правильного ответа не зависит от его номера.

In [20]:
%%time
(
    df_join
        .select('correct_answer', 'answered_correctly')
        .groupby('correct_answer')
        .mean('answered_correctly')
        .show()
)

+--------------+-----------------------+
|correct_answer|avg(answered_correctly)|
+--------------+-----------------------+
|             1|     0.6728072675886204|
|             3|     0.6592246990371898|
|             2|     0.6192199974012494|
|             0|     0.6674206042887552|
+--------------+-----------------------+

CPU times: user 1.84 ms, sys: 3.98 ms, total: 5.82 ms
Wall time: 9.39 s


In [21]:
df_join.toPandas().head()

Py4JJavaError: An error occurred while calling o185.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 37.0 failed 4 times, most recent failure: Lost task 5.3 in stage 37.0 (TID 361, rc1a-dataproc-d-3rn07j4v9i4tt8sr.mdb.yandexcloud.net, executor 33): ExecutorLostFailure (executor 33 exited caused by one of the running tasks) Reason: Container from a bad node: container_1730310288524_0002_01_000050 on host: rc1a-dataproc-d-3rn07j4v9i4tt8sr.mdb.yandexcloud.net. Exit status: 137. Diagnostics: [2024-10-30 18:41:59.680]Container killed on request. Exit code is 137
[2024-10-30 18:41:59.681]Container exited with a non-zero exit code 137. 
[2024-10-30 18:41:59.681]Killed by external signal
.
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2179)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:390)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3450)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3447)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)


## Упражнение 4

В файле "lectures.csv" хранится информация об лекциях. Объедините эту таблицу с основным набором данных при условии, что content_type_id == 1.