In [1]:
import pandas as pd
import re
import getpass
import os
import sys
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql.functions import current_date
from pyspark.sql import Window
os.environ['SPARK_HOME'] = '/opt/conda/lib/python3.7/site-packages/pyspark'
os.environ['PYSPARK_PYTHON'] = "python-3.7-cdt-pyspark-2.3.1.tar.gz/bin/python3"
os.environ['HADOOP_CONF_DIR'] = "/etc/hadoop/conf/"
#os.environ['PATH']="/bin:{os.environ['PATH']}"
def init_spark():
    spark = (
        SparkSession
            .builder
            .appName(getpass.getuser())
            .master('yarn')
            .config('spark.driver.memory', '15G')
            .config('spark.driver.maxResultSize', '15G')
            .config('spark.executor.cores', '1')
            .config('spark.executor.memory', '15G')
            .config('spark.yarn.queue', 'bdpc')
            .config('spark.rpc.message.maxSize', '2000')
            .config('spark.shuffle.service.enabled', 'true')
            .config('spark.dynamicAllocation.enabled', 'true')
            .config('spark.dynamicAllocation.maxExecutors', '400')
            .config('spark.sql.shuffle.partitions', '500')
            .config('spark.dynamicAllocation.cachedExecutorIdleTimeout', '1200s')
            .config('spark.yarn.dist.archives', 'hdfs:///share/lib/python/env/python-3.7-cdt-pyspark-2.3.1.tar.gz')
            .config('spark.sql.execution.pandas.respectSessionTimeZone', 'false')
            .config('spark.sql.broadcastTimeout', '36000')
            .config('spark.task.maxFailures','10')
            .config("spark.port.maxRetries","32")
            .config('spark.sql.execution.pandas.respectSessionTimeZone', 'false')
            .config('spark.hadoop.yarn.timeline-service.enabled', 'false')
            .config('spark.driver.extraJavaOptions', '-Dhdp.version=current')
            .config('spark.hadoop.yarn.client.failover-proxy-provider', 'org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider')
            .config('spark.yarn.am.extraJavaOptions', '-Dhdp.version=current')
            .config("spark.blockManager.port","2849")
            .config("spark.driver.port","2850")
            .config("spark.driver.bindAddress","0.0.0.0")
            .config("spark.driver.host","jupyterhub-tst.do.x5.ru")
            .config("spark.ui.port","2851")
            .config("hive.exec.dynamic.partition.mode", "nonstrict")
            .enableHiveSupport()
            .getOrCreate()
    )
    return spark
 
spark = init_spark()
sc = spark.sparkContext
sql = spark.sql
sqlContext = SQLContext(sc)

In [3]:
schema_data = StructType([
    StructField('row_id', LongType(), False),
    StructField('timestamp', LongType(), False),
    StructField('user_id', IntegerType(), False),
    StructField('content_id', IntegerType(), False),
    StructField('content_type_id', IntegerType(), False),
    StructField('task_container_id', IntegerType(), False),
    StructField('user_answer', IntegerType(), False),
    StructField('answered_correctly', IntegerType(), False),
    StructField('prior_question_elapsed_time', DoubleType(), True),
    StructField('prior_question_had_explanation', BooleanType(), True),
])
schema_questions = StructType([
    StructField('question_id', IntegerType(), False),
    StructField('bundle_id', IntegerType(), False),
    StructField('correct_answer', IntegerType(), False),
    StructField('part', IntegerType(), False),
    StructField('tags', StringType(), False)
])

In [4]:
data = spark.read.csv('train.csv', schema=schema_data, sep=',', header=True)
questions = spark.read.csv('questions.csv', schema=schema_questions, sep=',', header=True)
# lectures = spark.read.csv('lectures.csv', sep=',', header=True)

In [9]:
(
    data
    .filter(F.col('prior_question_elapsed_time').isNull())
    .groupby('answered_correctly')
    .agg(F.count('row_id'))
).show()

+------------------+-------------+
|answered_correctly|count(row_id)|
+------------------+-------------+
|                 1|       266940|
|                -1|      1959032|
|                 0|       125566|
+------------------+-------------+



In [13]:
data.dtypes

[('row_id', 'bigint'),
 ('timestamp', 'bigint'),
 ('user_id', 'int'),
 ('content_id', 'int'),
 ('content_type_id', 'int'),
 ('task_container_id', 'int'),
 ('user_answer', 'int'),
 ('answered_correctly', 'int'),
 ('prior_question_elapsed_time', 'double'),
 ('prior_question_had_explanation', 'boolean')]

In [17]:
questions.dtypes

[('question_id', 'int'),
 ('bundle_id', 'int'),
 ('correct_answer', 'int'),
 ('part', 'int'),
 ('tags', 'string')]

# Делаем полный датафрейм

In [6]:
full_df = (
    # оставляем только вопросы
    data.filter(F.col('content_type_id') == 0)
    # присоединяем вопросы
    .join(questions.withColumnRenamed('question_id', 'content_id'), on='content_id', how='left')
    # Приводим типы
    .fillna(False, subset=['prior_question_had_explanation'])
    .withColumn('prior_question_had_explanation', F.col('prior_question_had_explanation').cast('int'))
    .fillna(0., subset=['prior_question_elapsed_time'])
    # Проверяем правильность ответов
    .withColumn('realy_correct', (F.col('user_answer') == F.col('correct_answer')).cast('int'))
    .withColumn(
        'answered_correctly',
        F.when(F.col('realy_correct').isNotNull() & (F.col('realy_correct') != F.col('answered_correctly')), F.col('realy_correct'))
        .otherwise(F.col('answered_correctly'))
    )
    .drop('realy_correct', 'correct_answer', 'content_type_id')
)
full_df.dtypes

[('content_id', 'int'),
 ('row_id', 'bigint'),
 ('timestamp', 'bigint'),
 ('user_id', 'int'),
 ('task_container_id', 'int'),
 ('user_answer', 'int'),
 ('answered_correctly', 'int'),
 ('prior_question_elapsed_time', 'double'),
 ('prior_question_had_explanation', 'int'),
 ('bundle_id', 'int'),
 ('part', 'int'),
 ('tags', 'string')]

In [7]:
%%time
full_df.repartition(100, 'user_id').write.mode('overwrite').saveAsTable('nsurkov.train_table_new')

CPU times: user 16.9 ms, sys: 10.9 ms, total: 27.8 ms
Wall time: 1min 46s


# Формируем признаки вопросов

In [2]:
full_df = spark.table('nsurkov.train_table_new')

In [13]:
# full_df.count()
# 99271300

99271300

In [11]:
full_df.dtypes

[('content_id', 'int'),
 ('row_id', 'bigint'),
 ('timestamp', 'bigint'),
 ('user_id', 'int'),
 ('task_container_id', 'int'),
 ('user_answer', 'int'),
 ('answered_correctly', 'int'),
 ('prior_question_elapsed_time', 'double'),
 ('prior_question_had_explanation', 'int'),
 ('bundle_id', 'int'),
 ('part', 'int'),
 ('tags', 'string')]

In [14]:
# Характеристики ответов в разрезе пачки
window_user = Window.partitionBy('user_id').orderBy('timestamp')
group_cols = ['user_id', 'bundle_id', 'timestamp']
q_time_chars = (
    full_df
    .drop_duplicates(group_cols)
    .withColumn('question_elapsed_time', F.lead('prior_question_elapsed_time').over(window_user))
    .withColumn('question_had_explanation', F.lead('prior_question_had_explanation').over(window_user))
    .filter(F.col('question_elapsed_time').isNotNull())
    .select(*group_cols, 'question_elapsed_time', 'question_had_explanation')
    .repartition(100, group_cols)
)

In [15]:
# Характеристики ответов в разрезе вопроса
q_chars = (
    full_df
    .select('user_id', 'content_id', 'bundle_id', 'timestamp', 'answered_correctly')
    .repartition(100, group_cols)
    .join(q_time_chars, on=group_cols, how='left')
    .groupby('content_id')
    .agg(
        F.mean('question_elapsed_time').alias('question_elapsed_time_mean'),
        F.mean('question_had_explanation').alias('question_had_explanation_mean'),
        F.count('user_id').alias('question_answers_count'),
        F.countDistinct('user_id').alias('question_unique_users_answers'),
        F.sum('answered_correctly').alias('question_correct_answers_count')
    )
    .withColumn(
        'question_user_var',
        F.col('question_unique_users_answers') / F.col('question_answers_count')
    )
    .drop('question_unique_users_answers')
    .withColumn(
        'question_correct_answers_mean',
        F.col('question_correct_answers_count') / F.col('question_answers_count')
    )
)

In [17]:
# Объединяем характеристики и записываем
questions_characteristics = (
    questions
    .select(
        F.col('question_id').alias('content_id'), 'bundle_id', 'part'
    )
    .join(q_chars, on=['content_id'], how='left')
)

In [18]:
questions_characteristics.dtypes

[('content_id', 'int'),
 ('bundle_id', 'int'),
 ('part', 'int'),
 ('question_elapsed_time_mean', 'double'),
 ('question_had_explanation_mean', 'double'),
 ('question_answers_count', 'bigint'),
 ('question_correct_answers_count', 'bigint'),
 ('question_user_var', 'double'),
 ('question_correct_answers_mean', 'double')]

In [19]:
%%time
questions_characteristics.repartition(1).write.mode('overwrite').saveAsTable('nsurkov.questions_chars')

CPU times: user 37.8 ms, sys: 26.1 ms, total: 63.8 ms
Wall time: 4min 38s


In [20]:
questions_characteristics = spark.table('nsurkov.questions_chars')

In [21]:
questions_characteristics.count()

13523

In [22]:
questions_characteristics.filter(F.col('question_answers_count').isNull()).count()

0

In [23]:
questions_characteristics.filter(F.col('question_answers_count') < 10).count()

11

In [24]:
questions_characteristics.show()

+----------+---------+----+--------------------------+-----------------------------+----------------------+------------------------------+------------------+-----------------------------+
|content_id|bundle_id|part|question_elapsed_time_mean|question_had_explanation_mean|question_answers_count|question_correct_answers_count| question_user_var|question_correct_answers_mean|
+----------+---------+----+--------------------------+-----------------------------+----------------------+------------------------------+------------------+-----------------------------+
|       241|      241|   2|          19889.1523414345|           0.9889349930843707|                 10179|                          7949|0.8867275763827488|           0.7809215050594361|
|       449|      449|   2|         19988.88888888889|           0.9722222222222222|                   180|                           116|0.8944444444444445|           0.6444444444444445|
|      2443|     2443|   3|         21074.27561286666|      

# Формируем признаки пользователей

## Формирование признаков в разрезе user_id, bundle_id, timestamp

In [2]:
full_df = spark.table('nsurkov.train_table_new')
q_chars = spark.table('nsurkov.questions_chars')

In [3]:
q_chars.dtypes

[('content_id', 'int'),
 ('bundle_id', 'int'),
 ('part', 'int'),
 ('question_elapsed_time_mean', 'double'),
 ('question_had_explanation_mean', 'double'),
 ('question_answers_count', 'bigint'),
 ('question_correct_answers_count', 'bigint'),
 ('question_user_var', 'double'),
 ('question_correct_answers_mean', 'double')]

In [4]:
full_df.dtypes

[('content_id', 'int'),
 ('row_id', 'bigint'),
 ('timestamp', 'bigint'),
 ('user_id', 'int'),
 ('task_container_id', 'int'),
 ('user_answer', 'int'),
 ('answered_correctly', 'int'),
 ('prior_question_elapsed_time', 'double'),
 ('prior_question_had_explanation', 'int'),
 ('bundle_id', 'int'),
 ('part', 'int'),
 ('tags', 'string')]

In [12]:
# def form_bundles(full_df, q_chars):
#     # Ищем насколько пользователь медленнее остальных отвечал на вопросы
#     q_time_chars = (
#         q_chars
#         .select(
#             F.col('bundle_id').alias('prior_bundle_id'),
#             F.col('question_elapsed_time_mean').alias('prior_bundle_elapsed_time_mean')
#         )
#         .drop_duplicates(['prior_bundle_id'])
#     )
#     # Переходим к характеристикам пачек, так как временные характеристики для вопросов пачки одинаковые
#     bundles = (
#         full_df
#         .fillna(False, ['prior_question_had_explanation'])
#         .fillna(0., ['prior_question_elapsed_time'])
#         .withColumn('prior_question_had_explanation', F.col('prior_question_had_explanation').cast('int'))
#         .groupby('user_id', 'timestamp', 'bundle_id', 'prior_question_elapsed_time', 'prior_question_had_explanation')
#         .agg(
#             F.count('row_id').alias('answers_count'),
#             F.sum('answered_correctly').alias('correct_answers_count')
#         )
#         .repartition(800, ['user_id', 'bundle_id'])
#     )
#     # Ищем комулятивные характеристики пользователей
#     bundles = (
#         bundles
#         # Характеристики в разрезе контента
#         # Количество ответов пользователя
#         .withColumn('user_answers_count', F.sum('answers_count').over(window_user))
#         .drop('answers_count')
#         # Количество правильных ответов пользователя
#         .withColumn('user_correct_answers_count', F.sum('correct_answers_count').over(window_user))
#         .drop('correct_answers_count')
#         # Характеристики в разрезе пачки
#         # Количество пачек на которые ответил пользоваетль до текущей
#         .withColumn('user_bundle_count', (F.row_number().over(window_user) - 1))
#         # Количество объяснений, которые просмотрел пользователь до текущего ответа
#         .withColumn('user_explanation_count', F.sum('prior_question_had_explanation').over(window_user))
#         .drop('prior_question_had_explanation')
#         # Суммарное время ответов пользователя до текущего ответа
#         .withColumn('user_elapsed_time', F.sum('prior_question_elapsed_time').over(window_user))
#         # Суммарная задержка при ответе на пачки вопросов до текущего ответа
#         .withColumn('prior_bundle_id', F.lag('bundle_id').over(window_user))
#         .join(q_time_chars, on=['prior_bundle_id'], how='left')
#         .withColumn('prior_bundle_elapsed_time_mean', 
#                     F.when(F.col('prior_bundle_elapsed_time_mean').isNull(), F.col('prior_question_elapsed_time')).otherwise(F.col('prior_bundle_elapsed_time_mean')))
#         .withColumn('prior_bundle_delay', (F.col('prior_question_elapsed_time') - F.col('prior_bundle_elapsed_time_mean')) / F.col('prior_bundle_elapsed_time_mean'))
#         .withColumn('user_total_bundle_delay', F.sum('prior_bundle_delay').over(window_user))
#         .drop('prior_bundle_elapsed_time_mean', 'prior_bundle_id', 'prior_bundle_delay', 'prior_question_elapsed_time')
#         # Колонка ответов на которые пользователь уже отвечал
#         .withColumn('bundle_id_str', F.concat(F.lit(' '), F.col('bundle_id').cast('string'), F.lit(',')))
#         .withColumn('user_bundles_list', F.collect_list('bundle_id_str').over(window_user))
#         .withColumn('user_bundles_list', F.array_join('user_bundles_list', ''))
#         .drop('bundle_id_str')
#         # Колонка попыток пользователя ответить на вопрос, включая текущий
#         .withColumn('attempts', attemts_udf(F.struct('bundle_id', 'user_bundles_list')))
#     )
#     return bundles

In [8]:
def form_bundles(full_df, q_chars):
    window_user = Window.partitionBy('user_id').orderBy('timestamp')
    # Переходим к характеристикам пачек, так как временные характеристики для вопросов пачки одинаковые
    bundles = (
        full_df
        .join(q_chars.select('content_id', 'question_elapsed_time_mean'), on='content_id', how='left')
        .groupby(['user_id', 'bundle_id', 'timestamp', 'prior_question_elapsed_time', 'prior_question_had_explanation'])
        .agg(
            F.count('content_id').alias('bundle_content_count'), # количество вопросов в пачке
            F.sum('question_elapsed_time_mean').alias('bundle_elapsed_time_avg') # суммарное среднее время ответов на вопросы из пачки
        )
        # Суммарное время ответа пользователя на пачку
        .withColumn(
            'user_bundle_elapsed_time', 
            F.lead('prior_question_elapsed_time', default=0).over(window_user) * F.col('bundle_content_count')
        )
        # Количество объяснений полученных на пачку
        .withColumn(
            'user_bundle_explanation_count', 
            F.lead('prior_question_had_explanation', default=0).over(window_user) * F.col('bundle_content_count')
        )
        # Общая задержка ответа пользователя на пачку
        .withColumn(
            'user_bundle_delay',
            F.when(F.col('user_bundle_elapsed_time') == 0, 0)
            .otherwise(F.col('user_bundle_elapsed_time') - F.col('bundle_elapsed_time_avg'))
        )
        # Суммарное количество времени, потраченное пользователем на ответы до текущего момента
        .withColumn(
            'user_elapsed_time',
            F.sum('user_bundle_elapsed_time').over(window_user) - F.col('user_bundle_elapsed_time')
        )
        # Суммарное количество объяснений на вопросы, просмотренных пользователем до текущего момента
        .withColumn(
            'user_explanation_count',
            F.sum('user_bundle_explanation_count').over(window_user) - F.col('user_bundle_explanation_count')
        )
        # Суммарная задержка пользователя при ответе на вопросы до текущего момента
        .withColumn(
            'user_delay_total',
            F.sum('user_bundle_delay').over(window_user) - F.col('user_bundle_delay')
        )
        .drop('prior_question_elapsed_time', 'prior_question_had_explanation', 
              'user_bundle_elapsed_time', 'user_bundle_explanation_count', 'user_bundle_delay')
    )
    
    # Также будем считать попытки ответить на пачку вопросов, так как в разрезе контента решение не влезает во время
    attemts_udf = F.udf(lambda x: len(re.findall(x[0], x[1])), IntegerType())
    bundles = (
        bundles
        # Колонка пачек вопросов на которые пользователь уже отвечал
        .withColumn('bundle_id_str', F.concat(F.lit(' '), F.col('bundle_id').cast('string'), F.lit(',')))
        .withColumn('user_bundle_list', F.collect_list('bundle_id_str').over(window_user))
        .withColumn('user_bundle_list', F.array_join('user_bundle_list', ''))
        # Колонка попыток пользователя ответить на пачку вопросов, включая текущий
        .withColumn('attempts', attemts_udf(F.struct('bundle_id_str', 'user_bundle_list')))
        .drop('bundle_id_str')
        # Колонка попыток пользователя (Умноженное на количество вопросов в каждой пачке!)
        .withColumn('attempts_count', F.sum(F.col('attempts') * F.col('bundle_content_count')).over(window_user))
    )
    
    return bundles

In [9]:
def form_user_chars(full_df, q_chars):
    window_user = Window.partitionBy('user_id').orderBy('timestamp')
    attemts_udf = F.udf(lambda x: len(re.findall(' ' + str(x[0]) + ',', x[1])))
    # Формирование признаков в разрезе вопроса, которые можно задать ипользуя только данные от ответов
    user_contetn_chars = (
        full_df
        .withColumn('user_correct_answers_count', F.sum('answered_correctly').over(window_user))
        .withColumn('user_answers_count', F.count('row_id').over(window_user))
    )
    
    # Добавляем характеристики пачек и вопросов
    bundles = form_bundles(full_df, q_chars)
    user_full_chars = (
        user_contetn_chars
        .join(bundles, on=['user_id', 'bundle_id', 'timestamp'], how='left')
        .join(q_chars.drop('bundle_id', 'part'), on='content_id', how='left')
    )
    return user_full_chars

In [10]:
expanded_test_dataframe = form_user_chars(full_df, q_chars)
expanded_test_dataframe.dtypes

[('content_id', 'int'),
 ('user_id', 'int'),
 ('bundle_id', 'int'),
 ('timestamp', 'bigint'),
 ('row_id', 'bigint'),
 ('task_container_id', 'int'),
 ('user_answer', 'int'),
 ('answered_correctly', 'int'),
 ('prior_question_elapsed_time', 'double'),
 ('prior_question_had_explanation', 'int'),
 ('part', 'int'),
 ('tags', 'string'),
 ('user_correct_answers_count', 'bigint'),
 ('user_answers_count', 'bigint'),
 ('bundle_content_count', 'bigint'),
 ('bundle_elapsed_time_avg', 'double'),
 ('user_elapsed_time', 'double'),
 ('user_explanation_count', 'bigint'),
 ('user_delay_total', 'double'),
 ('user_bundle_list', 'string'),
 ('attempts', 'int'),
 ('attempts_count', 'bigint'),
 ('question_elapsed_time_mean', 'double'),
 ('question_had_explanation_mean', 'double'),
 ('question_answers_count', 'bigint'),
 ('question_correct_answers_count', 'bigint'),
 ('question_user_var', 'double'),
 ('question_correct_answers_mean', 'double')]

In [11]:
%%time
expanded_test_dataframe.repartition(1000).write.mode('overwrite').saveAsTable('nsurkov.train_data_with_features')

CPU times: user 359 ms, sys: 223 ms, total: 582 ms
Wall time: 16min 59s


In [12]:
spark.table('nsurkov.train_data_with_features').count()
#99271302

99271302

In [24]:
spark.table('nsurkov.train_data_with_features').drop_duplicates(['user_id', 'timestamp']).count()

76483589

## Формирование признаков для выборок

In [2]:
schema_data = StructType([
    StructField('row_id', LongType(), False),
    StructField('timestamp', LongType(), False),
    StructField('user_id', IntegerType(), False),
    StructField('content_id', IntegerType(), False),
    StructField('content_type_id', IntegerType(), False),
    StructField('task_container_id', IntegerType(), False),
    StructField('user_answer', IntegerType(), False),
    StructField('answered_correctly', IntegerType(), False),
    StructField('prior_question_elapsed_time', DoubleType(), True),
    StructField('prior_question_had_explanation', BooleanType(), True),
])
data = spark.read.csv('train.csv', schema=schema_data, sep=',', header=True).repartition(100, 'user_id')

full_train_df = spark.table('nsurkov.train_data_with_features').repartition('user_id')
train_features = spark.table('nsurkov.train_data_for_model').repartition('user_id')

In [5]:
# Формирование тренировочного датафрейма
window_user = Window.partitionBy('user_id').orderBy('timestamp')
train_features = (
    full_train_df
    .drop('task_container_id', 'user_answer', 'tags', 'user_bundle_list', 'bundle_elapsed_time_avg',
          'question_correct_answers_count')
    .withColumn(
        'user_accuracy', F.col('user_correct_answers_count') / F.col('user_answers_count')
    )
    .drop('user_correct_answers_count')
    .withColumn(
        'user_attempts_rate', F.col('attempts_count') / F.col('user_answers_count')
    )
    .drop('attempts_count')
    .withColumn(
        'user_bundle_content_count_sum',
        F.sum('bundle_content_count').over(window_user) - F.col('bundle_content_count')
    )
    .drop('bundle_content_count')
    .withColumn(
        'user_elapsed_time_mean', F.col('user_elapsed_time') / F.col('user_bundle_content_count_sum')
    )
    .drop('user_elapsed_time')
    .withColumn(
        'user_explanation_rate', F.col('user_explanation_count') / F.col('user_bundle_content_count_sum')
    )
    .drop('user_explanation_count')
    .withColumn(
        'user_delay_mean', F.col('user_delay_total') / F.col('user_bundle_content_count_sum')
    )
    .drop('user_delay_total')
)

In [6]:
train_features.dtypes

[('content_id', 'int'),
 ('user_id', 'int'),
 ('bundle_id', 'int'),
 ('timestamp', 'bigint'),
 ('row_id', 'bigint'),
 ('answered_correctly', 'int'),
 ('prior_question_elapsed_time', 'double'),
 ('prior_question_had_explanation', 'int'),
 ('part', 'int'),
 ('user_answers_count', 'bigint'),
 ('attempts', 'int'),
 ('question_elapsed_time_mean', 'double'),
 ('question_had_explanation_mean', 'double'),
 ('question_answers_count', 'bigint'),
 ('question_user_var', 'double'),
 ('question_correct_answers_mean', 'double'),
 ('user_accuracy', 'double'),
 ('user_attempts_rate', 'double'),
 ('user_bundle_content_count_sum', 'bigint'),
 ('user_elapsed_time_mean', 'double'),
 ('user_explanation_rate', 'double'),
 ('user_delay_mean', 'double')]

In [7]:
%%time
train_features.repartition(100).write.mode('overwrite').saveAsTable('nsurkov.train_data_for_model')

CPU times: user 22.3 ms, sys: 9.3 ms, total: 31.6 ms
Wall time: 1min 53s


In [10]:
%%time
# Формирование валидационных выборок

# Формирование выборки из последних ответов пользователя
user_last_answers_times = data.groupby('user_id').agg(F.max('timestamp').alias('timestamp')).drop_duplicates()
valid_users = user_last_answers_times.limit(100000)
valid_data = data.join(valid_users, on=['user_id', 'timestamp'], how='inner')
valid_data.repartition(1).write.mode('overwrite').saveAsTable('nsurkov.valid_data_last_user_answers')
print('valid_data_saved')

# Формирование выборки из ползователей, которые дали мало ответов
valid_users = (
    train_features.filter(F.col('user_answers_count') < 5)
    .select('user_id', 'timestamp')
    .limit(100000)
    .drop_duplicates()
)
valid_data = data.join(valid_users, on=['user_id', 'timestamp'], how='inner')
valid_data.repartition(1).write.mode('overwrite').saveAsTable('nsurkov.valid_data_first_user_answers')
print('valid_data_saved')

valid_data_saved
valid_data_saved
CPU times: user 47.7 ms, sys: 31.2 ms, total: 78.9 ms
Wall time: 5min 6s


In [8]:
%%time
users_chars_full = (
    full_train_df
    .drop('content_id', 'row_id', 'task_container_id', 'user_answer', 'answered_correctly', 
          'prior_question_elapsed_time', 'prior_question_had_explanation', 'part', 'tags', 
          'question_elapsed_time_mean', 'question_had_explanation_mean', 'question_answers_count', 
          'question_correct_answers_count', 'question_user_var', 'question_correct_answers_mean')
)
# Формирование характеристик пользователя для валидационных выборок
for table in ['valid_data_last_user_answers', 'valid_data_first_user_answers']:
    tag = table[10:]
    valid_users = spark.table('nsurkov.' + table).select('user_id', 'timestamp').drop_duplicates()
    users_chars_full_valid = users_chars_full.join(valid_users, on=['user_id', 'timestamp'], how='left_anti')
    user_last_answers= users_chars_full_valid.groupby('user_id').agg(F.max('user_answers_count').alias('user_answers_count')).drop_duplicates()
    user_characteristics = (
        users_chars_full_valid
        .join(user_last_answers, on=['user_id', 'user_answers_count'], how='inner')
        .drop_duplicates(['user_id', 'user_answers_count'])
        .withColumn('timestamp', F.lit(-1))
    )
    user_characteristics.repartition(2).write.mode('overwrite').saveAsTable('nsurkov.users_characteristics' + tag)

CPU times: user 124 ms, sys: 26 ms, total: 150 ms
Wall time: 7min 2s


In [9]:
%%time
users_chars_full = (
    full_train_df
    .drop('content_id', 'row_id', 'task_container_id', 'user_answer', 'answered_correctly', 
          'prior_question_elapsed_time', 'prior_question_had_explanation', 'part', 'tags', 
          'question_elapsed_time_mean', 'question_had_explanation_mean', 'question_answers_count', 
          'question_correct_answers_count', 'question_user_var', 'question_correct_answers_mean')
)
# Формирование характеристик пользователя полных
user_last_answers= users_chars_full.groupby('user_id').agg(F.max('user_answers_count').alias('user_answers_count')).drop_duplicates()
user_characteristics = (
    users_chars_full
    .join(user_last_answers, on=['user_id', 'user_answers_count'], how='inner')
    .drop_duplicates(['user_id', 'user_answers_count'])
    .withColumn('timestamp', F.lit(-1))
)
user_characteristics.repartition(2).write.mode('overwrite').saveAsTable('nsurkov.users_characteristics')

CPU times: user 42.3 ms, sys: 18.8 ms, total: 61.2 ms
Wall time: 3min


# Собираем пандосовские датафреймы

In [2]:
import pickle
import gc

In [11]:
%%time
# Загрузка характеристик пользователей
tables = ['users_characteristics', 
          'users_characteristics_last_user_answers',
          'users_characteristics_first_user_answers']
for table in tables:
    df = spark.table('nsurkov.' + table).toPandas()
    df['attempts_count'] = df['attempts_count'].astype(int)
    df.drop(columns='attempts', inplace=True)
    with open(table + '.pickle', 'wb') as fo:
        pickle.dump(df, fo)
    del df

CPU times: user 11.9 s, sys: 4.3 s, total: 16.2 s
Wall time: 47.8 s


In [12]:
gc.collect()

38

In [18]:
%%time
# Загрузка валидационных выборок
tables = ['valid_data_last_user_answers', 'valid_data_first_user_answers']
for table in tables:
    df = spark.table('nsurkov.' + table).toPandas()
    with open(table + '.pickle', 'wb') as fo:
        pickle.dump(df, fo)
    del df

CPU times: user 1.78 s, sys: 64.1 ms, total: 1.85 s
Wall time: 4.66 s


In [19]:
gc.collect()

58

In [13]:
%%time
questions_chars = spark.table('nsurkov.questions_chars').toPandas()
with open('questions_chars.pickle', 'wb') as fo:
    pickle.dump(questions_chars, fo)
questions_chars.head()

CPU times: user 66.4 ms, sys: 7.14 ms, total: 73.6 ms
Wall time: 3.5 s


Unnamed: 0,content_id,bundle_id,part,question_elapsed_time_mean,question_had_explanation_mean,question_answers_count,question_correct_answers_count,question_user_var,question_correct_answers_mean
0,241,241,2,19889.152341,0.988935,10179,7949,0.886728,0.780922
1,449,449,2,19988.888889,0.972222,180,116,0.894444,0.644444
2,2443,2443,3,21074.275613,0.969959,7071,6001,0.882619,0.848678
3,2488,2488,3,27337.510215,0.973514,9770,8422,0.857114,0.862027
4,2721,2719,4,22766.415996,0.961126,4485,3102,0.87068,0.691639


In [21]:
# spark.table('nsurkov.train_data_for_model').select(F.max('row_id')).show()
# 101230331

+-----------+
|max(row_id)|
+-----------+
|  101230331|
+-----------+



In [3]:
%%time
train_data = (
    spark.table('nsurkov.train_data_for_model')
    .filter(F.col('row_id') > (101230331 - 10000000))
).toPandas()
with open('train_data_part1.pickle', 'wb') as fo:
    pickle.dump(train_data, fo)
train_data.head()

CPU times: user 2min 11s, sys: 41.5 s, total: 2min 52s
Wall time: 3min 57s


Unnamed: 0,content_id,user_id,bundle_id,timestamp,row_id,answered_correctly,prior_question_elapsed_time,prior_question_had_explanation,part,user_answers_count,...,question_had_explanation_mean,question_answers_count,question_user_var,question_correct_answers_mean,user_accuracy,user_attempts_rate,user_bundle_content_count_sum,user_elapsed_time_mean,user_explanation_rate,user_delay_mean
0,10054,1993563954,10054,159322343,93934511,1,23250.0,1,6,352,...,0.95801,5567,0.887911,0.835279,0.696023,1.028409,1002,13260.699601,0.339321,2238.258118
1,5318,2039378502,5318,203261,96138592,1,57000.0,0,5,7,...,0.289229,16417,0.870256,0.586465,0.285714,1.0,6,33833.333333,0.0,9082.906733
2,6249,1944375310,6249,11557190375,91596890,1,25000.0,1,5,241,...,0.989171,16782,0.851686,0.663747,0.780083,1.008299,324,15694.438272,0.648148,-1755.011544
3,8759,1944375310,8759,1976726592,91596802,1,41000.0,1,5,154,...,0.988738,4809,0.912872,0.583073,0.733766,1.0,201,16597.004975,0.61194,-879.472801
4,3852,2091928018,3852,23426800745,98587928,1,22000.0,1,5,1414,...,0.991797,51908,0.854743,0.654157,0.49505,1.212871,1563,31308.37428,0.882278,11233.050716


In [3]:
%%time
train_data = (
    spark.table('nsurkov.train_data_for_model')
    .filter(
        (F.col('row_id') > (101230331 - 20000000))
        & (F.col('row_id') <= (101230331 - 10000000))
    )
).toPandas()
with open('train_data_part2.pickle', 'wb') as fo:
    pickle.dump(train_data, fo)
train_data.head()

CPU times: user 2min 10s, sys: 42.6 s, total: 2min 52s
Wall time: 4min 24s


Unnamed: 0,content_id,user_id,bundle_id,timestamp,row_id,answered_correctly,prior_question_elapsed_time,prior_question_had_explanation,part,user_answers_count,...,question_had_explanation_mean,question_answers_count,question_user_var,question_correct_answers_mean,user_accuracy,user_attempts_rate,user_bundle_content_count_sum,user_elapsed_time_mean,user_explanation_rate,user_delay_mean
0,3728,1912870039,3728,213893,90053157,0,43000.0,0,5,6,...,0.846117,14525,0.828296,0.479312,0.5,1.0,5,37600.0,0.0,10446.629258
1,6266,1772757000,6266,8033921177,83198063,0,27333.0,1,5,2349,...,0.960705,26727,0.825906,0.514274,0.646658,1.094508,6358,12227.880623,0.358131,2092.111272
2,2791,1738178997,2791,2400806127,81686476,0,12000.0,1,4,989,...,0.96122,4597,0.908636,0.778334,0.535895,1.352882,1860,10831.694086,0.51828,-3116.645613
3,6788,1762007735,6785,173677404,82739142,0,35750.0,1,6,62,...,0.961276,18231,0.851571,0.731227,0.532258,1.0,142,11168.985915,0.197183,374.863934
4,1062,1857437153,1062,1802691269,87302040,1,51400.0,1,2,94,...,0.989659,14002,0.832524,0.748679,0.648936,1.0,161,13484.428571,0.391304,-1303.229357


# Черновик

In [2]:
import pandas as pd
import pickle

In [22]:
with open('valid_data_table.pickle', 'rb') as fo:
    valid_data_table = pickle.load(fo)
with open('user_last_answers_valid.pickle', 'rb') as fo:
    user_last_answers_valid = pickle.load(fo)

In [23]:
valid_data_table[['user_id', 'timestamp']].drop_duplicates().merge(user_last_answers_valid[['user_id', 'timestamp']].drop_duplicates(), on=['user_id', 'timestamp'], how='inner')

Unnamed: 0,user_id,timestamp


In [None]:
last_answers_df.limit(10).show()

In [20]:
user_last_answers_times.dtypes

[('user_id', 'int'), ('bundle_id', 'string'), ('timestamp', 'bigint')]

In [34]:
import numpy as np

In [11]:
table = spark.createDataFrame(pd.DataFrame({
    'user_id': ['1'] * 5,
    'timestamp': [1, 2, 3, 4, 5],
    'content': [' 1,', ' 2,', ' 3,', ' 4,', ' 2,'],
    'content_int': [i for i in range(5)]
}))
table.show()

+-------+---------+-------+-----------+
|user_id|timestamp|content|content_int|
+-------+---------+-------+-----------+
|      1|        1|     1,|          0|
|      1|        2|     2,|          1|
|      1|        3|     3,|          2|
|      1|        4|     4,|          3|
|      1|        5|     2,|          4|
+-------+---------+-------+-----------+



In [16]:
table.withColumn('content_str', F.concat(F.col('content_int').cast('string'), F.lit(','))).show()

+-------+---------+-------+-----------+-----------+
|user_id|timestamp|content|content_int|content_str|
+-------+---------+-------+-----------+-----------+
|      1|        1|     1,|          0|         0,|
|      1|        2|     2,|          1|         1,|
|      1|        3|     3,|          2|         2,|
|      1|        4|     4,|          3|         3,|
|      1|        5|     2,|          4|         4,|
+-------+---------+-------+-----------+-----------+



In [7]:
win_spec = Window.partitionBy('user_id').orderBy('timestamp')
table_cumsum = (
    table.withColumn('cumsum', F.collect_list('content').over(win_spec))
    .withColumn('cumsum', F.array_join("cumsum", ''))
)
table_cumsum.show()

+-------+---------+-------+---------------+
|user_id|timestamp|content|         cumsum|
+-------+---------+-------+---------------+
|      1|        1|     1,|             1,|
|      1|        2|     2,|          1, 2,|
|      1|        3|     3,|       1, 2, 3,|
|      1|        4|     4,|    1, 2, 3, 4,|
|      1|        5|     2,| 1, 2, 3, 4, 2,|
+-------+---------+-------+---------------+



In [10]:
import re
attemts_udf = F.udf(lambda x: len(re.findall(x[0], x[1])))
table_cumsum.withColumn('attempts', attemts_udf(F.struct('content', 'cumsum'))).show()

+-------+---------+-------+---------------+--------+
|user_id|timestamp|content|         cumsum|attempts|
+-------+---------+-------+---------------+--------+
|      1|        1|     1,|             1,|       1|
|      1|        2|     2,|          1, 2,|       1|
|      1|        3|     3,|       1, 2, 3,|       1|
|      1|        4|     4,|    1, 2, 3, 4,|       1|
|      1|        5|     2,| 1, 2, 3, 4, 2,|       2|
+-------+---------+-------+---------------+--------+



In [2]:
df = spark.sql('SHOW TABLES IN nsurkov').toPandas()

In [3]:
for table in df.tableName:
    print(table)

In [13]:
for table in df.tableName:
    sql_q = 'DROP TABLE nsurkov.{}'.format(table)
    spark.sql(sql_q)