## Пример на pyspark

В качестве набора данных для примера будем использовать данные конкурса про ответы студентов на тесты
https://www.kaggle.com/c/riiid-test-answer-prediction

При подключении к spark драйверу установим лимиты по памяти и по числу ядер. Также выберем номер порта для Spark UI

Нужно выбрать уникальное имя приложения и номер порта, чтобы не войти в коллизию с другими пользователями

In [1]:
%pylab inline
import pandas as pd
import numpy as np

Populating the interactive namespace from numpy and matplotlib


In [None]:
import findspark
findspark.init()

In [5]:
from pyspark.sql import SparkSession

spark = (
    SparkSession
        .builder
        .appName("OTUS")
        .config("spark.dynamicAllocation.enabled", "true")
        .config("spark.executor.memory", "2g")
        .config("spark.driver.memory", "1g")
        .getOrCreate()
)

Данные будем читать из заранее сконвертированного parquet

In [None]:
# Читаем файл с HDFS
# Сначала попробуем прочитать как текст, чтобы увидеть структуру
riiid_df = spark.read.text("data/2022-11-04.txt")

### Исследование структуры файла
Посмотрим первые несколько строк файла, чтобы понять его формат

In [23]:
# Используем системную команду head для просмотра файла на HDFS
!hdfs dfs -cat data/2022-11-04.txt | head -20

# tranaction_id | tx_datetime | customer_id | terminal_id | tx_amount | tx_time_seconds | tx_time_days | tx_fraud | tx_fraud_scenario
1832792610,2022-11-04 14:22:18,0,53,63.58,101139738,1170,0,0
1832792611,2022-11-04 02:12:24,0,53,92.95,101095944,1170,0,0
1832792612,2022-11-04 12:49:35,3,205,48.88,101134175,1170,0,0
1832792613,2022-11-04 02:40:01,5,383,24.69,101097601,1170,0,0
1832792614,2022-11-04 08:02:05,6,858,95.48,101116925,1170,0,0
1832792615,2022-11-04 05:45:04,8,931,60.98,101108704,1170,0,0
1832792616,2022-11-04 20:01:50,8,931,28.48,101160110,1170,0,0
1832792617,2022-11-04 15:11:42,9,450,7.89,101142702,1170,0,0
1832792618,2022-11-04 11:20:49,10,549,63.37,101128849,1170,0,0
1832792619,2022-11-04 23:11:46,10,549,78.02,101171506,1170,0,0
1832792620,2022-11-04 10:26:06,11,337,68.54,101125566,1170,0,0
1832792621,2022-11-04 14:13:55,11,337,12.04,101139235,1170,0,0
1832792622,2022-11-04 04:17:01,11,975,76.00,101103421,1170,0,0
1832792623,2022-11-04 07:39:18,11,975,12.08,101115558,1170

In [12]:
# Посмотрим первые 20 строк файла как текст
riiid_df.show(20, truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------+
|_c0                                                                                                                                  |
+-------------------------------------------------------------------------------------------------------------------------------------+
|# tranaction_id | tx_datetime | customer_id | terminal_id | tx_amount | tx_time_seconds | tx_time_days | tx_fraud | tx_fraud_scenario|
|1832792610                                                                                                                           |
|1832792611                                                                                                                           |
|1832792612                                                                                                                           |
|1832792613                                     

In [13]:
# Подсчитаем общее количество строк
print(f"Общее количество строк: {riiid_df.count()}")

Общее количество строк: 46998984


После просмотра первых строк определим разделитель и прочитаем файл правильно

In [47]:
# Файл имеет заголовок с разделителем '|', но данные с разделителем ','
# Опеределяем схему
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType, IntegerType

# Определяем схему вручную на основе заголовка
schema = StructType([
    StructField("tranaction_id", LongType(), True),
    StructField("tx_datetime", StringType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("terminal_id", IntegerType(), True),
    StructField("tx_amount", DoubleType(), True),
    StructField("tx_time_seconds", LongType(), True),
    StructField("tx_time_days", IntegerType(), True),
    StructField("tx_fraud", IntegerType(), True),
    StructField("tx_fraud_scenario", IntegerType(), True)
])

# Читаем файл
riiid_df = (
    spark.read.csv(
        "data/2022-11-04.txt",
        sep=",",           # разделитель - запятая
        schema=schema      # используем определённую схему
    )
    .filter(col("tranaction_id").isNotNull())  # убираем строку с заголовком (она станет null)
)

Проверим структуру данных после правильного чтения

In [48]:
# Выведем схему данных
riiid_df.printSchema()

root
 |-- tranaction_id: long (nullable = true)
 |-- tx_datetime: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- terminal_id: integer (nullable = true)
 |-- tx_amount: double (nullable = true)
 |-- tx_time_seconds: long (nullable = true)
 |-- tx_time_days: integer (nullable = true)
 |-- tx_fraud: integer (nullable = true)
 |-- tx_fraud_scenario: integer (nullable = true)



In [49]:
# Посмотрим первые записи
riiid_df.show(10, truncate=False)

+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|tranaction_id|tx_datetime        |customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|1832792610   |2022-11-04 14:22:18|0          |53         |63.58    |101139738      |1170        |0       |0                |
|1832792611   |2022-11-04 02:12:24|0          |53         |92.95    |101095944      |1170        |0       |0                |
|1832792612   |2022-11-04 12:49:35|3          |205        |48.88    |101134175      |1170        |0       |0                |
|1832792613   |2022-11-04 02:40:01|5          |383        |24.69    |101097601      |1170        |0       |0                |
|1832792614   |2022-11-04 08:02:05|6          |858        |95.48    |101116925      |1170        |0       |0          

In [37]:
# Проверим количество строк и колонок
print(f"Количество строк: {riiid_df.count()}")
print(f"Количество колонок: {len(riiid_df.columns)}")
print(f"Названия колонок: {riiid_df.columns}")

Количество строк: 46998983
Количество колонок: 9
Названия колонок: ['tranaction_id', 'tx_datetime', 'customer_id', 'terminal_id', 'tx_amount', 'tx_time_seconds', 'tx_time_days', 'tx_fraud', 'tx_fraud_scenario']


Схема данных и первые 10 записей

In [34]:
riiid_df.printSchema()

root
 |-- tranaction_id: long (nullable = true)
 |-- tx_datetime: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- terminal_id: integer (nullable = true)
 |-- tx_amount: double (nullable = true)
 |-- tx_time_seconds: long (nullable = true)
 |-- tx_time_days: integer (nullable = true)
 |-- tx_fraud: integer (nullable = true)
 |-- tx_fraud_scenario: integer (nullable = true)



In [35]:
riiid_df.show(10)

+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|tranaction_id|        tx_datetime|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|   1832792610|2022-11-04 14:22:18|          0|         53|    63.58|      101139738|        1170|       0|                0|
|   1832792611|2022-11-04 02:12:24|          0|         53|    92.95|      101095944|        1170|       0|                0|
|   1832792612|2022-11-04 12:49:35|          3|        205|    48.88|      101134175|        1170|       0|                0|
|   1832792613|2022-11-04 02:40:01|          5|        383|    24.69|      101097601|        1170|       0|                0|
|   1832792614|2022-11-04 08:02:05|          6|        858|    95.48|      101116925|        1170|       0|           

In [50]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)  # to pretty print pyspark.DataFrame in jupyter
riiid_df

tranaction_id,tx_datetime,customer_id,terminal_id,tx_amount,tx_time_seconds,tx_time_days,tx_fraud,tx_fraud_scenario
1832792610,2022-11-04 14:22:18,0,53,63.58,101139738,1170,0,0
1832792611,2022-11-04 02:12:24,0,53,92.95,101095944,1170,0,0
1832792612,2022-11-04 12:49:35,3,205,48.88,101134175,1170,0,0
1832792613,2022-11-04 02:40:01,5,383,24.69,101097601,1170,0,0
1832792614,2022-11-04 08:02:05,6,858,95.48,101116925,1170,0,0
1832792615,2022-11-04 05:45:04,8,931,60.98,101108704,1170,0,0
1832792616,2022-11-04 20:01:50,8,931,28.48,101160110,1170,0,0
1832792617,2022-11-04 15:11:42,9,450,7.89,101142702,1170,0,0
1832792618,2022-11-04 11:20:49,10,549,63.37,101128849,1170,0,0
1832792619,2022-11-04 23:11:46,10,549,78.02,101171506,1170,0,0


### Сохраним в паркет

In [51]:
# Проверим, что null-строки убраны
print(f"Количество записей: {riiid_df.count()}")
riiid_df.show(5)

# Сохраним в parquet
(
    riiid_df
        .write
        .mode("overwrite")
        .parquet("data/2022-11-04.parquet")
)

Количество записей: 46998983
+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|tranaction_id|        tx_datetime|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|   1832792610|2022-11-04 14:22:18|          0|         53|    63.58|      101139738|        1170|       0|                0|
|   1832792611|2022-11-04 02:12:24|          0|         53|    92.95|      101095944|        1170|       0|                0|
|   1832792612|2022-11-04 12:49:35|          3|        205|    48.88|      101134175|        1170|       0|                0|
|   1832792613|2022-11-04 02:40:01|          5|        383|    24.69|      101097601|        1170|       0|                0|
|   1832792614|2022-11-04 08:02:05|          6|        858|    95.48|      101116925|    

In [53]:
# Посмотрим содержимое директории parquet
!hdfs dfs -ls data/2022-11-04.parquet

print("\n" + "="*50)
print("Размер файлов внутри parquet директории:")
print("="*50)
!hdfs dfs -du -h data/2022-11-04.parquet

Found 25 items
-rw-r--r--   1 ubuntu hadoop          0 2025-10-28 14:57 data/2022-11-04.parquet/_SUCCESS
-rw-r--r--   1 ubuntu hadoop   38373646 2025-10-28 14:57 data/2022-11-04.parquet/part-00000-1b3fb4df-c0b6-4aba-a614-6a24de12e602-c000.snappy.parquet
-rw-r--r--   1 ubuntu hadoop   39334709 2025-10-28 14:57 data/2022-11-04.parquet/part-00001-1b3fb4df-c0b6-4aba-a614-6a24de12e602-c000.snappy.parquet
-rw-r--r--   1 ubuntu hadoop   40269792 2025-10-28 14:57 data/2022-11-04.parquet/part-00002-1b3fb4df-c0b6-4aba-a614-6a24de12e602-c000.snappy.parquet
-rw-r--r--   1 ubuntu hadoop   41225825 2025-10-28 14:57 data/2022-11-04.parquet/part-00003-1b3fb4df-c0b6-4aba-a614-6a24de12e602-c000.snappy.parquet
-rw-r--r--   1 ubuntu hadoop   38764936 2025-10-28 14:57 data/2022-11-04.parquet/part-00004-1b3fb4df-c0b6-4aba-a614-6a24de12e602-c000.snappy.parquet
-rw-r--r--   1 ubuntu hadoop   39660129 2025-10-28 14:57 data/2022-11-04.parquet/part-00005-1b3fb4df-c0b6-4aba-a614-6a24de12e602-c000.snappy.parquet
-

Теперь прочитаем parquet и проверим, что данные сохранились

In [54]:
# Читаем parquet обратно
df_from_parquet = spark.read.parquet("data/2022-11-04.parquet")

print(f"Количество записей в parquet: {df_from_parquet.count()}")
print(f"Схема данных:")
df_from_parquet.printSchema()
print("\nПервые 5 записей:")
df_from_parquet.show(5)

Количество записей в parquet: 46998983
Схема данных:
root
 |-- tranaction_id: long (nullable = true)
 |-- tx_datetime: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- terminal_id: integer (nullable = true)
 |-- tx_amount: double (nullable = true)
 |-- tx_time_seconds: long (nullable = true)
 |-- tx_time_days: integer (nullable = true)
 |-- tx_fraud: integer (nullable = true)
 |-- tx_fraud_scenario: integer (nullable = true)


Первые 5 записей:
+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|tranaction_id|        tx_datetime|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+-------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|   1838826044|2022-11-07 15:08:53|     850577|        324|    65.72|      101401733|        1173|       0|                0|
|   1838826045|2022-11

Замерим время выполнения простых запросов с группировками

In [39]:
from pyspark.sql import functions as f
from pyspark.sql.functions import col

In [40]:
%%time
(
riiid_df
    .select('content_id', 'answered_correctly')
    .groupBy('content_id')
    .mean('answered_correctly')
    .show()
)

AnalysisException: cannot resolve '`content_id`' given input columns: [customer_id, terminal_id, tranaction_id, tx_amount, tx_datetime, tx_fraud, tx_fraud_scenario, tx_time_days, tx_time_seconds];;
'Project ['content_id, 'answered_correctly]
+- Relation[tranaction_id#344L,tx_datetime#345,customer_id#346,terminal_id#347,tx_amount#348,tx_time_seconds#349L,tx_time_days#350,tx_fraud#351,tx_fraud_scenario#352] csv


In [41]:
%%time
(
riiid_df
    .select('user_id', 'answered_correctly')
    .where(col('answered_correctly') != -1)
    .groupby('user_id')
    .mean('answered_correctly')
    .show()
)

AnalysisException: cannot resolve '`user_id`' given input columns: [customer_id, terminal_id, tranaction_id, tx_amount, tx_datetime, tx_fraud, tx_fraud_scenario, tx_time_days, tx_time_seconds];;
'Project ['user_id, 'answered_correctly]
+- Relation[tranaction_id#344L,tx_datetime#345,customer_id#346,terminal_id#347,tx_amount#348,tx_time_seconds#349L,tx_time_days#350,tx_fraud#351,tx_fraud_scenario#352] csv


## Упражнение 1
Выведите top 10 студентов с наилучшими результатами. 
Обратите внимание, что поле answered_correctly равно -1, если это была лекция, а не тест. Такие записи нужно исключить.

## Упражнение 2
Выведите top 10 задач с наихудшими результатами

## pyspark user defined functions (UDF)

Как и для других языков, поддерживаемых Spark, для python есть возможность использовать UDF. При этом возникают дополнительные накладные расходы по сравнению с Java и Scala на маршалинг данных.

In [None]:
from pyspark.sql.types import LongType

def to_months(ms):
    return ms // 31536000000 // 12 #1 year = 31536000000 ms

to_months_udf = f.udf(to_months, LongType())

Замерим время выполнения без UDF

In [None]:
%%time
(
    riiid_df
        .select("content_id", "timestamp")
        .groupby("content_id")
        .mean("timestamp")
        .show()
)

Применим простой UDF к похожему запросу

In [None]:
%%time
(
    riiid_df
        .select("content_id", to_days_udf("timestamp").alias("months"))
        .groupBy("content_id")
        .mean("months")
        .show()
)

Перепишем логику, которая была в UDF

In [None]:
%%time
(
 riiid_df
    .select("content_id", (col("timestamp") / 31536000000 / 12).alias("months"))
    .groupby("content_id")
    .mean("months")
    .show()
)

## Упражнение 4
Постройте гистограмму по числу месяцев до первого взаимодействия студента с заданием

## Обогащение данных

Таблица с вопросами лежит в отдельном файле questions.csv. 

In [None]:
questions = spark.read.csv("riiid/questions.csv", header=True, inferSchema=True)

In [None]:
questions.count()

In [None]:
questions.printSchema()

Объединим ее с ответами при условии, что эта запись ссылкается на вопрос если content_type_id и content_id - идентификатор вопроса.

In [None]:
df = (
    riiid_df.
        where(f.col("content_type_id") == 0).
        join(questions, riiid_df.content_id == questions.question_id, 'left')
)

In [None]:
df

Проверим, что вероястность правильного ответа не зависит от его номера.

In [None]:
%%time
(
df
    .select('correct_answer', 'answered_correctly')
    .groupby('correct_answer')
    .mean('answered_correctly')
    .show()
)

## Упражнение 5

В файле "riiid/lectures.csv" хранится информация об лекциях. Объедините эту таблицу с основным набором данных при условии, что content_type_id == 1.