# Spark

Apache Spark — это мощная и гибкая система для обработки больших объемов данных, предлагающая широкий спектр возможностей для аналитики и машинного обучения. 

In [1]:
import os
import socket
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, col, desc, rank, row_number

In [2]:
print('user:', os.environ['JUPYTERHUB_SERVICE_PREFIX'])

def uiWebUrl(self):
    from urllib.parse import urlparse
    web_url = self._jsc.sc().uiWebUrl().get()
    port = urlparse(web_url).port
    return '{}proxy/{}/jobs/'.format(os.environ['JUPYTERHUB_SERVICE_PREFIX'], port)

SparkContext.uiWebUrl = property(uiWebUrl)

conf = SparkConf().set('spark.master', 'local[*]').set('spark.driver.memory', '4g')
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
spark

user: /user/mishanyacorleone/


In [3]:
sdf = spark.read.csv(
    'datasets/aggrigation_logs_per_week.csv', 
    sep=',', 
    header=True
)
sdf.printSchema()

root
 |-- courseid: string (nullable = true)
 |-- userid: string (nullable = true)
 |-- num_week: string (nullable = true)
 |-- s_all: string (nullable = true)
 |-- s_all_avg: string (nullable = true)
 |-- s_course_viewed: string (nullable = true)
 |-- s_course_viewed_avg: string (nullable = true)
 |-- s_q_attempt_viewed: string (nullable = true)
 |-- s_q_attempt_viewed_avg: string (nullable = true)
 |-- s_a_course_module_viewed: string (nullable = true)
 |-- s_a_course_module_viewed_avg: string (nullable = true)
 |-- s_a_submission_status_viewed: string (nullable = true)
 |-- s_a_submission_status_viewed_avg: string (nullable = true)
 |-- NameR_Level: string (nullable = true)
 |-- Name_vAtt: string (nullable = true)
 |-- Depart: string (nullable = true)
 |-- Name_OsnO: string (nullable = true)
 |-- Name_FormOPril: string (nullable = true)
 |-- LevelEd: string (nullable = true)
 |-- Num_Sem: string (nullable = true)
 |-- Kurs: string (nullable = true)
 |-- Date_vAtt: string (nullable =

Выведем  первые пять записей из Spark DataFrame

In [4]:
sdf.limit(5).toPandas().head()

Unnamed: 0,courseid,userid,num_week,s_all,s_all_avg,s_course_viewed,s_course_viewed_avg,s_q_attempt_viewed,s_q_attempt_viewed_avg,s_a_course_module_viewed,...,s_a_submission_status_viewed_avg,NameR_Level,Name_vAtt,Depart,Name_OsnO,Name_FormOPril,LevelEd,Num_Sem,Kurs,Date_vAtt
0,71262,34527,6,9,9,4,4,0,0,0,...,0,3,Экзамен,22,1,1,1,2,2,18.06.2022
1,71262,34527,7,0,45,0,2,0,0,0,...,0,3,Экзамен,22,1,1,1,2,2,18.06.2022
2,71262,34527,8,0,3,0,13333,0,0,0,...,0,3,Экзамен,22,1,1,1,2,2,18.06.2022
3,71262,34527,9,0,225,0,1,0,0,0,...,0,3,Экзамен,22,1,1,1,2,2,18.06.2022
4,71262,34527,10,0,18,0,8,0,0,0,...,0,3,Экзамен,22,1,1,1,2,2,18.06.2022


In [5]:
print('Количество записей в наборе данных:', sdf.count())

Количество записей в наборе данных: 414528


In [6]:
sdf.orderBy('s_course_viewed').limit(5).toPandas()

Unnamed: 0,courseid,userid,num_week,s_all,s_all_avg,s_course_viewed,s_course_viewed_avg,s_q_attempt_viewed,s_q_attempt_viewed_avg,s_a_course_module_viewed,...,s_a_submission_status_viewed_avg,NameR_Level,Name_vAtt,Depart,Name_OsnO,Name_FormOPril,LevelEd,Num_Sem,Kurs,Date_vAtt
0,84236,33622,25,0,231,0,42,0,305,0,...,375,4,Экзамен,23,1,1,1,2,2,23.06.2022
1,75656,29359,28,0,2174,0,1304,0,0,0,...,435,2,Экзамен,12,1,1,2,4,3,17.06.2022
2,71262,34527,10,0,18,0,8,0,0,0,...,0,3,Экзамен,22,1,1,1,2,2,18.06.2022
3,75656,29359,29,0,2083,0,125,0,0,0,...,417,2,Экзамен,12,1,1,2,4,3,17.06.2022
4,84236,33622,26,0,22,0,4,0,29048,0,...,35714,4,Экзамен,23,1,1,1,2,2,23.06.2022


In [7]:
sdf.select('Name_vAtt').distinct().show()

+---------+
|Name_vAtt|
+---------+
|  Экзамен|
+---------+



Группировка данных, подсчет частоты появления

In [8]:
sdf.groupby('userid').count().show()

+------+-----+
|userid|count|
+------+-----+
| 35444|   72|
| 34657|   96|
| 29089|   48|
| 29573|   48|
| 30966|   24|
| 35350|   96|
| 29539|   72|
| 21783|   72|
| 24269|   72|
| 22121|   48|
|  5613|   48|
| 24528|   48|
| 27492|   48|
| 15539|   24|
| 24078|   24|
| 25350|   48|
| 30054|   24|
| 35438|   72|
| 27753|   72|
| 27563|   96|
+------+-----+
only showing top 20 rows



In [9]:
sdf.groupby('userid').count().sort(col('count').desc()).show()

+------+-----+
|userid|count|
+------+-----+
| 33470|  120|
| 33541|  120|
| 33528|  120|
| 20252|  120|
| 24347|  120|
| 33864|  120|
| 20734|  120|
| 24631|  120|
| 34345|  120|
| 25123|  120|
| 25267|  120|
| 30067|  120|
| 34186|  120|
| 33583|  120|
| 36019|  120|
| 36001|  120|
| 24710|  120|
| 33685|  120|
| 33463|  120|
| 35787|  120|
+------+-----+
only showing top 20 rows



Фильтрация данных

In [10]:
sdf.select(
    'userid',
    'num_week',
    'courseid',
    's_course_viewed'
).filter(
    (col('userid') == '33470') & (col('courseid') == '74257')
).limit(
    30
).toPandas()

Unnamed: 0,userid,num_week,courseid,s_course_viewed
0,33470,6,74257,5
1,33470,7,74257,0
2,33470,8,74257,12
3,33470,9,74257,4
4,33470,10,74257,0
5,33470,11,74257,14
6,33470,12,74257,5
7,33470,13,74257,5
8,33470,14,74257,2
9,33470,15,74257,8


## Задание 1:
### Анализ активности студентов на портале
Рассчитать общее количество событий (s_all) и просмотров курсов (s_course_viewed) по каждой неделе (num_week).

Примечание. Используйте методы groupBy + agg. 

Проанализируйте полученные данные, какие выводы можно сделать?

In [11]:
from pyspark.sql.functions import sum

In [12]:
sdf = sdf.withColumn("num_week", sdf["num_week"].cast("int")) \
         .withColumn("s_all", sdf["s_all"].cast("int")) \
         .withColumn("s_course_viewed", sdf["s_course_viewed"].cast("int"))

In [13]:
sdf.groupBy('num_week').agg(
    sum('s_all'),
    sum('s_course_viewed')
).toPandas()

Unnamed: 0,num_week,sum(s_all),sum(s_course_viewed)
0,28,1554,467
1,26,148590,27531
2,27,21641,4210
3,12,162041,37901
4,22,388401,72299
5,13,172962,38890
6,6,238295,75540
7,16,177650,38385
8,20,249532,52905
9,19,191952,40742


## Задание 2.
### Определение самых популярных курсов

Найти топ-5 курсов (courseid) по среднему количеству просмотров (s_course_viewed_avg).


In [14]:
from pyspark.sql.functions import sum
from pyspark.sql.functions import avg

In [15]:
sdf = sdf.withColumn("s_course_viewed_avg", sdf["s_course_viewed_avg"].cast("double"))

In [16]:
sdf.groupBy('courseid').agg(
    avg('s_course_viewed_avg')
).sort(col('avg(s_course_viewed_avg)').desc()).limit(5).show()

+--------+------------------------+
|courseid|avg(s_course_viewed_avg)|
+--------+------------------------+
|   76419|                    28.1|
|   78733|      27.545454545454547|
|   78705|      22.945054945054945|
|   82552|      20.387755102040817|
|   84165|      18.771084337349397|
+--------+------------------------+



In [17]:
sdf.groupBy('courseid').agg(
    sum('s_course_viewed_avg')
).sort(col('sum(s_course_viewed_avg)').desc()).limit(5).show()

+--------+------------------------+
|courseid|sum(s_course_viewed_avg)|
+--------+------------------------+
|   76419|                  4777.0|
|   71541|                  2674.0|
|   71675|                  2608.0|
|   73025|                  2566.0|
|   83768|                  2502.0|
+--------+------------------------+



## Задание 3: 
### Мини-исследование

Определить: имеется ли связь между количеством просмотров студентов отдельных курсов и количеством студентов, изучающих отдельный курс.



In [18]:
from pyspark.sql.functions import sum, countDistinct, corr

In [19]:
sdf = sdf.withColumn("s_course_viewed", sdf["s_course_viewed"].cast("int"))

In [20]:
# Агрегация: суммарное количество просмотров и уникальные студенты по курсу
course_stats = sdf.groupBy("courseid").agg(
    sum("s_course_viewed").alias("total_views"), 
    countDistinct("userid").alias("unique_students")
)
course_stats.show(10)

+--------+-----------+---------------+
|courseid|total_views|unique_students|
+--------+-----------+---------------+
|   78382|        215|             10|
|   87097|        889|             15|
|   72175|       2798|             45|
|   73414|        270|             11|
|   72314|       6187|             22|
|   73555|        162|              7|
|   75891|        444|              5|
|   88285|        674|             16|
|   73086|       1258|             13|
|   73043|       3418|             12|
+--------+-----------+---------------+
only showing top 10 rows



In [21]:
correlation = course_stats.select(corr("total_views", "unique_students")).collect()[0][0]

In [22]:
print(correlation)

0.4743706447188514


## Задание 4. 
### Сравнение активности студентов на бюджете и контракте

Найти разницу в среднем количестве всех событий (s_all_avg) между студентами на бюджете и контракте (name_osno). 

Какие выводы можно сделать?



In [23]:
from pyspark.sql.functions import col, avg

In [24]:
sdf = sdf.withColumn('s_all_avg', sdf["s_all_avg"].cast("float"))

In [25]:
activity_stats = sdf.groupBy("Name_OsnO").agg(avg("s_all_avg").alias("avg_s_all"))

In [26]:
activity_stats.show()

+---------+------------------+
|Name_OsnO|         avg_s_all|
+---------+------------------+
|        1|  8.41703942142887|
|        2|3.4732525769317584|
+---------+------------------+



In [27]:
activity_pivot = activity_stats.toPandas().set_index("Name_OsnO").T
if "1" in activity_pivot and "2" in activity_pivot:
    diff = activity_pivot["1"].values[0] - activity_pivot["2"].values[0]
    print(f"Разница в среднем количестве событий: {diff:.2f}")
else:
    print("Ошибка: не удалось найти обе категории (Бюджет и Контракт) в данных.")

Разница в среднем количестве событий: 4.94


## Задание 5.
### Исследование зависимости активности студентов от формы обучения

Определить, как форма обучения (name_formopril) влияет на активность студентов (s_all_avg)?



In [28]:
sdf.groupBy("Name_FormOPril").agg(
    avg("s_all_avg").alias("avg_s_all")
).show()

+--------------+------------------+
|Name_FormOPril|         avg_s_all|
+--------------+------------------+
|             3|1.2953216374269005|
|             1|11.699871821520128|
|             2|1.2438109107202868|
+--------------+------------------+



## Задание 6. 
### Выявление активности студентов по семестрам

Определить, в каком семестре (num_sem) студенты проявляют наибольшую активность (по s_all_avg).



In [29]:
sdf.groupBy('num_sem').agg(
    avg('s_all_avg').alias('s_all_avg')
).show()

+-------+-------------------+
|num_sem|          s_all_avg|
+-------+-------------------+
|      8|  3.508361204013378|
|      6|  6.696356437227533|
|     10|  2.418956814357824|
|      4|  6.216743692230827|
|      2|  9.363531867267612|
|     12|0.08901884340480831|
+-------+-------------------+



## Задание 7.
### Определение кафедр с наибольшей активностью студентов

Найти топ-3 кафедры (depart), где студенты наиболее активны (по s_all_avg).



In [30]:
sdf.groupBy('depart').agg(
    avg('s_all_avg').alias('s_all_avg')
).sort(col('s_all_avg').desc()).show(3)

+------+------------------+
|depart|         s_all_avg|
+------+------------------+
|     4|26.657972928359193|
|    12|20.158597662771285|
|    14| 15.56260819388344|
+------+------------------+
only showing top 3 rows



## Задание 8. 
### Анализ успеваемости студентов в зависимости от активности

Найти среднюю оценку (namer_level) для студентов с разной активностью (s_all_avg).



In [31]:
sdf.groupBy("s_all_avg").agg(
    avg("NameR_Level").alias("avg_nameR_level")
).show()

+---------+------------------+
|s_all_avg|   avg_nameR_level|
+---------+------------------+
|     18.0| 4.282534246575342|
|     64.0| 4.362068965517241|
|     82.0| 4.457142857142857|
|    107.0| 4.217391304347826|
|     47.0| 4.325757575757576|
|      9.0| 4.229078613693998|
|    453.0|               5.0|
|     58.0| 4.382022471910112|
|    101.0| 4.315789473684211|
|      5.0| 4.149025803054239|
|     39.0| 4.447368421052632|
|    132.0| 4.444444444444445|
|    144.0| 4.444444444444445|
|     17.0| 4.343465045592705|
|     30.0| 4.307971014492754|
|    105.0|4.7894736842105265|
|    190.0|               4.0|
|    117.0|             4.125|
|    163.0| 4.666666666666667|
|    183.0|               5.0|
+---------+------------------+
only showing top 20 rows



## Задание 9.
### Выявление студентов с аномально низкой активностью
Найти студентов с количеством событий (s_all_avg) ниже среднего по курсу (kurs).

Подсказка: Используйте window functions. 
`````
from pyspark.sql.window import Window
from pyspark.sql.functions import mean
`````

In [32]:
from pyspark.sql.window import Window
from pyspark.sql.functions import mean, col

In [33]:
# Определяем оконную функцию для вычисления среднего значения по каждому курсу (kurs)
window_spec = Window.partitionBy("kurs")

In [34]:
# Вычисляем среднее значение активности по курсу и добавляем его как новый столбец
sdf_with_avg = sdf.withColumn("avg_s_all_avg", mean("s_all_avg").over(window_spec))

In [35]:
# Фильтруем студентов с активностью ниже среднего по курсу
students_below_avg = sdf_with_avg.filter(col("s_all_avg") < col("avg_s_all_avg"))

In [36]:
# Выводим результаты
students_below_avg.select("userid", "kurs", "s_all_avg", "avg_s_all_avg").show(truncate=False)

+------+----+---------+-----------------+
|userid|kurs|s_all_avg|avg_s_all_avg    |
+------+----+---------+-----------------+
|34527 |2   |9.0      |9.363531867267612|
|34527 |2   |3.0      |9.363531867267612|
|34527 |2   |1.0      |9.363531867267612|
|34609 |2   |6.0      |9.363531867267612|
|34609 |2   |1.0      |9.363531867267612|
|34609 |2   |1.0      |9.363531867267612|
|34610 |2   |8.0      |9.363531867267612|
|34610 |2   |4.0      |9.363531867267612|
|34610 |2   |2.0      |9.363531867267612|
|34610 |2   |2.0      |9.363531867267612|
|34611 |2   |6.0      |9.363531867267612|
|34611 |2   |4.0      |9.363531867267612|
|34611 |2   |3.0      |9.363531867267612|
|34611 |2   |2.0      |9.363531867267612|
|34611 |2   |1.0      |9.363531867267612|
|34612 |2   |9.0      |9.363531867267612|
|34612 |2   |6.0      |9.363531867267612|
|34612 |2   |4.0      |9.363531867267612|
|34612 |2   |3.0      |9.363531867267612|
|34613 |2   |8.0      |9.363531867267612|
+------+----+---------+-----------

## Задание 10.
### Кластеризация студентов по активности 

Разделить студентов на 3 группы (низкая, средняя, высокая активность) с помощью K-Means.

Примечание.  Используйте KMeans из pyspark.ml.clustering. 
Обратите внимание, что по каждому студенту в наборе данных представлены логи, поэтому прежде чем проводить кластеризацию нужно сначала  провести агрегацию данных для каждого студента.

In [37]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

In [40]:
# 1. Агрегация данных по каждому студенту
student_activity = sdf.groupBy("userid").agg(
    # Используем среднее значение активности для каждого студента
    mean("s_course_viewed").alias("avg_course_viewed"),
    mean("s_q_attempt_viewed").alias("avg_attempt_viewed"),
    mean("s_a_submission_status_viewed").alias("avg_submission_status_viewed"),
    mean("s_all").alias("avg_s_all")
)

In [41]:
# 2. Используем VectorAssembler для подготовки данных для KMeans
# KMeans требует, чтобы входные данные были в виде вектора признаков.
vector_assembler = VectorAssembler(inputCols=["avg_course_viewed", "avg_attempt_viewed", 
                                              "avg_submission_status_viewed", "avg_s_all"], outputCol="features")
student_activity_vectorized = vector_assembler.transform(student_activity)

In [42]:
# 3. Применяем KMeans для кластеризации студентов на 3 группы
kmeans = KMeans(k=3, seed=1, featuresCol="features", predictionCol="cluster")
model = kmeans.fit(student_activity_vectorized)

In [43]:
# 4. Получаем результаты кластеризации
student_activity_with_cluster = model.transform(student_activity_vectorized)

In [44]:
# 5. Выводим результаты (студент, активность, кластер)
student_activity_with_cluster.select(
    "userid", 
    "avg_course_viewed",
    "avg_attempt_viewed",
    "avg_submission_status_viewed",
    "avg_s_all", 
    "cluster").show(truncate=False)

+------+--------------------+------------------+----------------------------+-------------------+-------+
|userid|avg_course_viewed   |avg_attempt_viewed|avg_submission_status_viewed|avg_s_all          |cluster|
+------+--------------------+------------------+----------------------------+-------------------+-------+
|35444 |0.4444444444444444  |0.0               |0.06944444444444445         |0.8333333333333334 |0      |
|34657 |3.2708333333333335  |0.2604166666666667|1.1979166666666667          |10.791666666666666 |0      |
|29089 |0.8333333333333334  |1.7708333333333333|0.20833333333333334         |4.375              |0      |
|29573 |0.7708333333333334  |0.4583333333333333|1.2916666666666667          |7.541666666666667  |0      |
|30966 |0.08333333333333333 |0.0               |0.041666666666666664        |0.16666666666666666|0      |
|35350 |1.1041666666666667  |1.3125            |0.19791666666666666         |4.125              |0      |
|29539 |1.4027777777777777  |0.305555555555555

In [46]:
student_activity_with_cluster.filter(col("cluster") == 0).show(truncate=False)  # Кластер 0

+------+--------------------+------------------+----------------------------+-------------------+-----------------------------------------------------------------------------+-------+
|userid|avg_course_viewed   |avg_attempt_viewed|avg_submission_status_viewed|avg_s_all          |features                                                                     |cluster|
+------+--------------------+------------------+----------------------------+-------------------+-----------------------------------------------------------------------------+-------+
|35444 |0.4444444444444444  |0.0               |0.06944444444444445         |0.8333333333333334 |[0.4444444444444444,0.0,0.06944444444444445,0.8333333333333334]              |0      |
|34657 |3.2708333333333335  |0.2604166666666667|1.1979166666666667          |10.791666666666666 |[3.2708333333333335,0.2604166666666667,1.1979166666666667,10.791666666666666]|0      |
|29089 |0.8333333333333334  |1.7708333333333333|0.20833333333333334         |4.3

In [47]:
student_activity_with_cluster.filter(col("cluster") == 1).show(truncate=False)  # Кластер 1

+------+------------------+------------------+----------------------------+------------------+----------------------------------------------------------------------------+-------+
|userid|avg_course_viewed |avg_attempt_viewed|avg_submission_status_viewed|avg_s_all         |features                                                                    |cluster|
+------+------------------+------------------+----------------------------+------------------+----------------------------------------------------------------------------+-------+
|20252 |10.333333333333334|7.791666666666667 |7.308333333333334           |38.36666666666667 |[10.333333333333334,7.791666666666667,7.308333333333334,38.36666666666667]  |1      |
|34452 |5.208333333333333 |11.770833333333334|4.604166666666667           |40.229166666666664|[5.208333333333333,11.770833333333334,4.604166666666667,40.229166666666664] |1      |
|33853 |6.513888888888889 |10.708333333333334|7.597222222222222           |51.236111111111114|[6.513

In [48]:
student_activity_with_cluster.filter(col("cluster") == 2).show(truncate=False)  # Кластер 2

+------+------------------+-------------------+----------------------------+------------------+----------------------------------------------------------------------------+-------+
|userid|avg_course_viewed |avg_attempt_viewed |avg_submission_status_viewed|avg_s_all         |features                                                                    |cluster|
+------+------------------+-------------------+----------------------------+------------------+----------------------------------------------------------------------------+-------+
|34155 |6.722222222222222 |0.0                |6.819444444444445           |34.875            |[6.722222222222222,0.0,6.819444444444445,34.875]                            |2      |
|34602 |3.875             |10.875             |0.625                       |29.166666666666668|[3.875,10.875,0.625,29.166666666666668]                                     |2      |
|33776 |3.9444444444444446|2.5                |3.888888888888889           |21.76388888888889 |

In [50]:
student_activity_with_cluster.groupBy("cluster").count().show()

+-------+-----+
|cluster|count|
+-------+-----+
|      1|  318|
|      2| 1181|
|      0| 4817|
+-------+-----+

