In [1]:
# Used to read the Parquet data
import pyarrow.parquet as parquet
# Used to train the baseline model
from sklearn.linear_model import LogisticRegression

# Used to calculate metrics
from sklearn.metrics import roc_auc_score

# Used to perform aggregations
import pandas

# Where the downloaded data are
input_path = './'
# Where to store results
output_path = './'

# Чтение данных

In [2]:
%%time
# Read a single day to train model on as Pandas dataframe
data = parquet.ParquetDataset(
    # Path to the dataset
    input_path + '/collabTrain/', 
    # Dates to read
    filters = [('date','=','2018-02-07')])\
    .read(
    # Columns to read
    columns = [
        'instanceId_userId',
        'feedback',
        'auditweights_svd_prelaunch', 
        'auditweights_ctr_high', 
        'auditweights_ctr_gender', 
        'auditweights_friendLikes'  
    ]).to_pandas()

CPU times: user 776 ms, sys: 124 ms, total: 900 ms
Wall time: 783 ms


  labels, = index.labels


Здесь мы используем немного другой АПИ, использующий дополнительные фишки паркета:
* Читаем партиционированный датасет и фильтруем по указанной партиции.
* Вычитываем только те колонки, которые планируем использовать - хранение в колоночном формате при этом существенно уменьшает объем, поднимаемый с диска (тогда как в рядных форматах, например, CSV, объем чтения с диска сократить не получилось бы).

In [3]:
data.head(10)

Unnamed: 0,instanceId_userId,feedback,auditweights_svd_prelaunch,auditweights_ctr_high,auditweights_ctr_gender,auditweights_friendLikes,date
0,189,[Ignored],0.626212,0.018481,0.000168,,2018-02-07
1,384,[Ignored],0.670187,0.026324,0.00211,,2018-02-07
2,384,[Ignored],0.706036,0.190227,0.037928,,2018-02-07
3,384,[Ignored],0.828584,0.092678,0.012441,,2018-02-07
4,384,[Ignored],0.869253,0.07803,0.014418,,2018-02-07
5,1017,[Liked],0.657293,0.078131,0.011765,,2018-02-07
6,1371,"[Clicked, Liked]",0.60073,0.10302,0.017116,,2018-02-07
7,1464,[Ignored],0.830225,0.176874,0.006687,1.0,2018-02-07
8,1917,[Ignored],0.677664,0.044811,0.002307,,2018-02-07
9,2076,[Disliked],0.35157,0.091754,0.015341,1.0,2018-02-07


In [4]:
%%time
# Construct the label (liked objects)
y = data['feedback'].apply(lambda x: 1.0 if("Liked" in x) else 0.0).values

# Extract the most interesting features
X = data[[
        'auditweights_svd_prelaunch', 
        'auditweights_ctr_high', 
        'auditweights_ctr_gender', 
        'auditweights_friendLikes']].fillna(0.0).values

CPU times: user 2.13 s, sys: 22.4 ms, total: 2.15 s
Wall time: 2.16 s


In [5]:
%%time
# Fit the model and check the weights
model = LogisticRegression(random_state=0, solver='lbfgs').fit(X, y)
model.coef_

CPU times: user 2.38 s, sys: 623 ms, total: 3 s
Wall time: 868 ms


Глядя на то, что обучение модели заняло в 3-4 раза меньше времени, чем вычисление меток начинаем что-то подозревать.

# Валидация

In [6]:
%%time
# Pick one week for the test period
test_dates = [[('date', '=', '2018-02-' + x)] for x in ['08','09','10','11','12','13','14']]

# Read the test data for those days, only required columns
test = parquet.ParquetDataset(input_path + '/collabTrain/', filters = test_dates)\
    .read(columns = [
    'instanceId_userId',
    'feedback',
    'auditweights_svd_prelaunch', 
    'auditweights_ctr_high', 
    'auditweights_ctr_gender', 
    'auditweights_friendLikes'  
    ]).to_pandas()

CPU times: user 2.42 s, sys: 468 ms, total: 2.89 s
Wall time: 1.91 s


  labels, = index.labels


Для теста мы возьмем неделю, непосредственно следующую за днем, на котором мы учились.

In [7]:
%%time
# Compute inverted predictions (to sort by later)
test["score"] = model.predict_proba(test[[
        'auditweights_svd_prelaunch', 
        'auditweights_ctr_high', 
        'auditweights_ctr_gender', 
        'auditweights_friendLikes']].fillna(0.0).values)[:, 1]

CPU times: user 330 ms, sys: 161 ms, total: 490 ms
Wall time: 235 ms


In [8]:
%%time
# Extract labels and project
test["label"] = test['feedback'].apply(lambda x: 1.0 if("Liked" in x) else 0.0)
test = test[["instanceId_userId", "score", "label"]]

CPU times: user 12.8 s, sys: 281 ms, total: 13.1 s
Wall time: 13.1 s


Обращает на себя внимание очень длинное время работы параграфа, вызыванное в первую очередь большим количеством записей и выросшими в связи с этим накладными расходами интепретатора.

In [9]:
test.head(10)

Unnamed: 0,instanceId_userId,score,label
0,252,0.128678,0.0
1,384,0.181584,0.0
2,384,0.232244,0.0
3,384,0.101478,0.0
4,384,0.358482,1.0
5,855,0.141115,0.0
6,1044,0.088706,0.0
7,1371,0.138298,0.0
8,1584,0.152127,0.0
9,2319,0.127875,0.0


# Неправильный вариант валидации

In [10]:
%%time
roc_auc_score(test.label, test.score)

CPU times: user 916 ms, sys: 179 ms, total: 1.1 s
Wall time: 1.1 s


0.7345059766307932

Сразу видно радикальное несоответствие результатам с сайта и это не оверфитинг. В данном варианте валидации есть существенная проблема - все объекты ранжируются единым списком, тогда как на практике (и в условиях конкурса) ранжирование делается индивидуально для пользователя. При смешении пользователь необоснованное преимущество получает та модель, которая тянеть наверх активнях юзеров, оставлющих много классов.

# Правильный вариант валидации

In [11]:
def auc(labels, scores):
    # This is important! AUC can be computed only when both positive and negative examples are
    # available
    if len(labels) > sum(labels) > 0:
        return roc_auc_score(labels, scores)

    return float('NaN')

In [12]:
%%time
test.groupby("instanceId_userId")\
    .apply(lambda y: auc(y.label.values, y.score.values))\
    .dropna().mean()

CPU times: user 3min 21s, sys: 920 ms, total: 3min 22s
Wall time: 3min 22s


0.6663475856675231

Ну а здесь прямо удручающее время работы, и опять из-за большого оверхеда на интерпретатор.

# Ускоренная грубой силой валидация

In [13]:
%%time
# Divide and conquer - split dataset into chunks by user ID
max_user = max(test.instanceId_userId)
batch_size = 1000000
batches = [test[test.instanceId_userId.between(x, x + batch_size)] for x in range(0,max_user,batch_size)]

CPU times: user 366 ms, sys: 44.5 ms, total: 411 ms
Wall time: 409 ms


In [14]:
# Define a routine for AUC calculation
def partitioned_auc(data):
    return data.groupby("instanceId_userId")\
        .apply(lambda y: auc(y.label.values, y.score.values))\
        .dropna()

In [15]:
from multiprocessing import Pool, cpu_count

In [16]:
%%time
# Start processing of the batches in several threads
with Pool(int(cpu_count() / 2)) as p:
    ret_list = p.map(
        partitioned_auc, 
        batches)

pandas.concat(ret_list).mean()

CPU times: user 94 ms, sys: 123 ms, total: 217 ms
Wall time: 57.3 s


Важный момент - этот подход не делает код более эффективным, он просто сжигает больше электричества активнее используя процессор. Кроме того не стоит жадничать и выделять все имеющиеся ядра - как правила в современных системах программы видят больше логических ядер чем реально есть в системе, при этом загрузка их всех единообразным процессом создаст неэффективную давку.

# Заключительная ремарка

Приведенный код является упрощенной версией валидатора, запущенного на сайте и не содержит дополнительных защит от попыток манипуляций (например, отправки дубликатов или сокрытия части объектов). Если вы обнаружите дополнительные способы "хакерской" накрутки метрики, пожалуйста, сообщите организаторам.