# Criteo 1 TiB benchmark

In this experiment we will evalutate a number of machine learning tools on a varying size of train data to determine how fast they learn, how much memory they consume etc.

We will assess Vowpal Wabbit and XGBoost in local mode, and Spark.ML models in cluster mode.

We will use terabyte click logs released by Criteo and sample needed amount of data from them.

This instance of experiment notebook focuses on data preparation and training VW & XGBoost locally.

Let's go!

# Table of contents

* [Configuration](#Configuration)
* [Data preparation](#Data-preparation)
  * [Criteo → LibSVM](#Criteo-→-LibSVM)
  * [LibSVM → Train and test (sampling)](#LibSVM-→-Train-and-test-(sampling%29)
  * [LibSVM train and test → VW train and test](#LibSVM-train-and-test-→-VW-train-and-test)
  * [Local data](#Local-data)
* [Local training](#Local-training)

In [None]:
%load_ext autotime
%matplotlib inline

from __future__ import print_function

## Configuration
[_(back to toc)_](#Table-of-contents)

Paths:

In [None]:
criteo_data_remote_path = 'criteo/plain'
libsvm_data_remote_path = 'criteo/libsvm'
vw_data_remote_path = 'criteo/vw'

local_data_path = 'criteo/data'
local_results_path = 'criteo/results'
local_runtime_path = 'criteo/runtime'

In [None]:
import os


criteo_day_template = os.path.join(criteo_data_remote_path, 'day_{}')
libsvm_day_template = os.path.join(libsvm_data_remote_path, 'day_{}')
vw_day_template = os.path.join(vw_data_remote_path, 'day_{}')

libsvm_train_template = os.path.join(libsvm_data_remote_path, 'train', '{}')
libsvm_test_template = os.path.join(libsvm_data_remote_path, 'test', '{}')
vw_train_template = os.path.join(vw_data_remote_path, 'train', '{}')
vw_test_template = os.path.join(vw_data_remote_path, 'test', '{}')

local_libsvm_test_template = os.path.join(local_data_path, 'data.test.{}.libsvm')
local_libsvm_train_template = os.path.join(local_data_path, 'data.train.{}.libsvm')
local_vw_test_template = os.path.join(local_data_path, 'data.test.{}.vw')
local_vw_train_template = os.path.join(local_data_path, 'data.train.{}.vw')

In [None]:
def ensure_directory_exists(path):
    if not os.path.exists(path):
        os.makedirs(path)

Days to work on:

In [None]:
days = list(range(0, 23 + 1))

Samples to take:

In [None]:
train_samples = [
    10000, 30000,  # tens of thousands
    100000, 300000,  # hundreds of thousands
    1000000, 3000000,  # millions
    10000000, 30000000,  # tens of millions
    100000000, 300000000,  # hundreds of millions
    1000000000, 3000000000,  # billions
]
test_samples = [1000000]

Spark configuration and initialization:

In [None]:
total_cores = 256

In [None]:
executor_cores = 4
executor_instances = total_cores / executor_cores
memory_per_core = 4

In [None]:
app_name = 'Criteo experiment'

master = 'yarn'

settings = {
    'spark.network.timeout': '600',
    
    'spark.driver.cores': '16',
    'spark.driver.maxResultSize': '16G',
    'spark.driver.memory': '32G',
    
    'spark.executor.cores': str(executor_cores),
    'spark.executor.instances': str(executor_instances),
    'spark.executor.memory': str(memory_per_core * executor_cores) + 'G',
    
    'spark.speculation': 'true',
    'spark.yarn.queue': 'root.HungerGames',
}

In [None]:
from pyspark.sql import SparkSession


builder = SparkSession.builder

builder.appName(app_name)
builder.master(master)
for k, v in settings.items():
    builder.config(k, v)

spark = builder.getOrCreate()
sc = spark.sparkContext

sc.setLogLevel('ERROR')

Logging:

In [None]:
import sys
import logging
reload(logging)


handler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('[%(asctime)s] %(message)s')
handler.setFormatter(formatter)

ensure_directory_exists(local_runtime_path)
file_handler = logging.FileHandler(filename=os.path.join(local_runtime_path, 'mylog.log'), mode='a')
file_handler.setFormatter(formatter)

logger = logging.getLogger()
logger.addHandler(handler)
logger.addHandler(file_handler)
logger.setLevel(logging.INFO)

In [None]:
logger.info('Spark version: %s.', spark.version)

## Data preparation
[_(back to toc)_](#Table-of-contents)

Poor man's HDFS API:

In [None]:
def hdfs_exists(path):
    l = !hadoop fs -ls $path 2>/dev/null
    return len(l) != 0

def hdfs_success(path):
    return hdfs_exists(os.path.join(path, '_SUCCESS'))

def hdfs_delete(path, recurse=False):
    if recurse:
        _ = !hadoop fs -rm -r $path
    else:
        _ = !hadoop fs -rm $path

def hdfs_get(remote_path, local_path):
    remote_path_glob = os.path.join(remote_path, 'part-*')
    _ = !hadoop fs -cat $remote_path_glob >$local_path

Load RDDs from one place and save them to another converted:

In [None]:
def convert_chunked_data(input_path_template, output_path_template, chunks, load_rdd, convert_row, transform_rdd=None):
    for chunk in chunks:
        input_path = input_path_template.format(chunk)
        output_path = output_path_template.format(chunk)

        if hdfs_success(output_path):
            logger.info('Chunk "%s" is already converted and saved to "%s", skipping.', chunk, output_path)
            continue

        logger.info('Reading chunk "%s" data from "%s".', chunk, input_path)
        rdd = load_rdd(input_path)

        if hdfs_exists(output_path):
            logger.info('Cleaning "%s".', output_path)
            hdfs_delete(output_path, recurse=True)

        logger.info('Processing and saving to "%s".', output_path)
        rdd = rdd.map(convert_row)
        
        if transform_rdd is not None:
            rdd = transform_rdd(rdd)
        
        rdd.saveAsTextFile(output_path)

        logger.info('Done with chunk "%s".', chunk)

### Criteo → LibSVM
[_(back to toc)_](#Table-of-contents)

Criteo RDD is actually a DataFrame:

In [None]:
def load_criteo_rdd(path):
    return (
        spark
        .read
        .option('header', 'false')
        .option('inferSchema', 'true')
        .option('delimiter', '\t')
        .csv(path)
        .rdd
    )

Simply add an index to each existing column except the first one which is a target:

In [None]:
def criteo_to_libsvm(row):
    return (
        str(row[0])
        + ' '
        + ' '.join(
            [
                # integer features
                str(i) + ':' + str(row[i])
                for i in range(1, 13 + 1)
                if row[i] is not None
            ] + [
                # string features converted from hex to int
                str(i) + ':' + str(int(row[i], 16))
                for i in range(14, 39 + 1)
                if row[i] is not None
            ]
        )
    )

Do it for all days:

In [None]:
convert_chunked_data(criteo_day_template, libsvm_day_template, days, load_criteo_rdd, criteo_to_libsvm)

### LibSVM → Train and test (sampling)
[_(back to toc)_](#Table-of-contents)

Let's name samples as their shortened "engineering" notation - e.g. 1e5 is 100k etc.:

In [None]:
def sample_name(sample):
    return str(sample)[::-1].replace('000', 'k')[::-1]

Load data, sample a bit more than needed and cut at exact desired number of lines by zipping with index and filtering upto required index:

In [None]:
oversample = 1.03
sampled_partitions = 256


def sample_and_save(input_path_template, output_path_template, days, samples):
    union = None
    union_count = None
    
    for sample in samples:
        name = sample_name(sample)
        output_path = output_path_template.format(name)
        
        if hdfs_success(output_path):
            logger.info('Sample "%s" is already written to "%s", skipping.', sample, output_path)
            continue
            
        logger.info('Preparing to write sample to "%s".', output_path)
        
        if union is None:
            rdds = map(lambda day: sc.textFile(input_path_template.format(day)), days)
            union = reduce(lambda left, right: left.union(right), rdds)

            union_count = union.count()
            logger.info('Total number of lines for days "%s" is "%s".', days, union_count)
            
        ratio = float(sample) / union_count
        
        sampled_union = (
            union
            .sample(False, min(1.0, oversample * ratio))
            .zipWithIndex()
            .filter(lambda z: z[1] < sample)
            .map(lambda z: z[0])
        )
        
        if hdfs_exists(output_path):
            logger.info('Cleaning "%s".', output_path)
            hdfs_delete(output_path, recurse=True)
            
        logger.info('Writing sample "%s" to "%s".', sample, output_path)
        sampled_union.coalesce(sampled_partitions).saveAsTextFile(output_path)
        
        logger.info('Saved "%s" lines to "%s".', sc.textFile(output_path).count(), output_path)

Sample all LibSVM data:

In [None]:
sample_and_save(libsvm_day_template, libsvm_test_template, days[-1:], test_samples)

In [None]:
sample_and_save(libsvm_day_template, libsvm_train_template, days[:-1], train_samples)

### LibSVM train and test → VW train and test
[_(back to toc)_](#Table-of-contents)

LibSVM RDD is a text file:

In [None]:
def load_libsvm_rdd(path):
    return sc.textFile(path)

Conversion is trivial - we only have to map target to {-1, 1} and convert categorical features to VW feature names as a whole:

In [None]:
def libsvm_to_vw(line):
    parts = line.split(' ')
    parts[0] = '1 |' if parts[0] == '1' else '-1 |'
    for i in range(1, len(parts)):
        index, _, value = parts[i].partition(':')
        if int(index) >= 14:
            parts[i] = index + '_' + value
    return ' '.join(parts)

Also, data for VW should be well shuffled:

In [None]:
import hashlib


def calculate_hash(something):
    m = hashlib.md5()
    m.update(str(something))
    return m.hexdigest()

def random_sort(rdd):
    return (
        rdd
        .zipWithIndex()
        .sortBy(lambda z: calculate_hash(z[1]))
        .map(lambda z: z[0])
    )

Convert all LibSVM samples:

In [None]:
convert_chunked_data(libsvm_test_template, vw_test_template, [sample_name(sample) for sample in test_samples], load_libsvm_rdd, libsvm_to_vw, transform_rdd=random_sort)

In [None]:
convert_chunked_data(libsvm_train_template, vw_train_template, [sample_name(sample) for sample in train_samples], load_libsvm_rdd, libsvm_to_vw, transform_rdd=random_sort)

Spark is no longer needed:

In [None]:
spark.stop()

### Local data
[_(back to toc)_](#Table-of-contents)

Download all sampled data to local directory:

In [None]:
ensure_directory_exists(local_data_path)

In [None]:
def count_lines(path):
    with open(path) as f:
        for i, _ in enumerate(f):
            pass
    return i + 1

def download_data(remote_template, local_template, samples):
    for sample in samples:
        name = sample_name(sample)
        remote_path = remote_template.format(name)
        local_path = local_template.format(name)
        if os.path.exists(local_path):
            count = count_lines(local_path)
            if count == sample:
                logger.info('File "%s" is already loaded, skipping.', local_path)
                continue
            else:
                logger.info('File "%s" already exists but number of lines "%s" is wrong (must be "%s"), reloading.', local_path, count, sample)
        logger.info('Loading file "%s" as local file "%s".', remote_path, local_path)
        hdfs_get(remote_path, local_path)
        count = count_lines(local_path)
        logger.info('File loaded to "%s", number of lines is "%s".', local_path, count)
        assert count == sample, 'File "{}" contains wrong number of lines "{}" (must be "{}").'.format(local_path, count, sample)

In [None]:
download_data(libsvm_test_template, local_libsvm_test_template, test_samples)

In [None]:
download_data(libsvm_train_template, local_libsvm_train_template, train_samples)

In [None]:
download_data(vw_test_template, local_vw_test_template, test_samples)

In [None]:
download_data(vw_train_template, local_vw_train_template, train_samples)

## Local training
[_(back to toc)_](#Table-of-contents)

Measuring model quality and ML engine technical metrics:

In [None]:
import sys 
from matplotlib import pyplot
from sklearn.metrics import (
    auc,
    log_loss,
    roc_curve,
)


def measure(engine, sample, test_file, time_file, predictions_file):
    
    def get_last_in_line(s):
        return s.rstrip().split( )[-1]

    def parse_elapsed_time(s):
        return reduce(lambda a, b: a * 60 + b, map(float, get_last_in_line(s).split(':')))

    def parse_max_memory(s):
        return int(get_last_in_line(s)) * 1024

    def parse_cpu(s):
        return float(get_last_in_line(s).rstrip('%')) / 100 


    elapsed = -1
    memory = -1
    cpu = -1

    with open(time_file, 'rb') as f:
        for line in f:
            if 'Elapsed (wall clock) time' in line:
                elapsed = parse_elapsed_time(line)
            elif 'Maximum resident set size' in line:
                memory = parse_max_memory(line)
            elif 'Percent of CPU' in line:
                cpu = parse_cpu(line)

    with open(test_file, 'rb') as f:
        labels = [line.rstrip().split(' ')[0] == '1' for line in f]

    with open(predictions_file, 'rb') as f:
        scores = [float(line.rstrip().split(' ')[0]) for line in f]

    fpr, tpr, _ = roc_curve(labels, scores)
    roc_auc = auc(fpr, tpr)
    ll = log_loss(labels, scores)
    
    figure = pyplot.figure(figsize=(6, 6))
    pyplot.plot(fpr, tpr, linewidth=2.0)
    pyplot.plot([0, 1], [0, 1], 'k--')
    pyplot.xlabel('FPR')
    pyplot.ylabel('TPR')
    pyplot.title('{} {} - {:.3f} ROC AUC'.format(engine, sample_name(sample), roc_auc))
    pyplot.show()

    return {
        'Engine': engine,
        'Train size': sample,
        'ROC AUC': roc_auc,
        'Log loss': ll,
        'Train time': elapsed,
        'Maximum memory': memory,
        'CPU load': cpu,
    }

Settings for VW & XGBoost and how to run them; I use (a little bit patched for correctness sake) GNU Time to measure running time, CPU load and memory consumption; configurations for VW & XGBoost are obtained via Hyperopt:

In [None]:
def get_time_command_and_file(train_file):
    time_file = train_file + '.time'
    return [
        '/usr/local/bin/time',
        '-v',
        '--output=' + time_file,
    ], time_file

def get_vw_commands_and_predictions_file(train_file, test_file):
    model_file = train_file + '.model'
    predictions_file = test_file + '.predictions'
    return [
        'vw83',
        '--link=logistic',
        '--loss_function=logistic',
        '-b', '29',
        '-l', '0.3',
        '--initial_t', '1',
        '--decay_learning_rate', '0.5',
        '--power_t', '0.5',
        '--l1', '1e-15',
        '--l2', '0',
        '-d', train_file,
        '-f', model_file,
    ], [
        'vw83',
        '--loss_function=logistic',
        '-t',
        '-i', model_file,
        '-d', test_file,
        '-p', predictions_file,
    ], predictions_file


xgboost_conf = [
    'booster = gbtree',
    'objective = binary:logistic',
    'nthread = 24',
    'eval_metric = logloss',
    'max_depth = 7',
    'num_round = 200',
    'eta = 0.2',
    'gamma = 0.4',
    'subsample = 0.8',
    'colsample_bytree = 0.8',
    'min_child_weight = 20',
    'alpha = 3',
    'lambda = 100',
]


def get_xgboost_commands_and_predictions_file(train_file, test_file, cache=False):
    config_file = os.path.join(local_runtime_path, 'xgb.conf')
    ensure_directory_exists(local_runtime_path)
    with open(config_file, 'wb') as f:
        for line in xgboost_conf:
            print(line, file=f)
    model_file = train_file + '.model'
    predictions_file = test_file + '.predictions'
    if cache:
        train_file = train_file + '#' + train_file + '.cache'
    return [
        'xgboost',
        config_file,
        'data=' + train_file,
        'model_out=' + model_file,
    ], [
        'xgboost',
        config_file,
        'task=pred',
        'test:data=' + test_file,
        'model_in=' + model_file,
        'name_pred=' + predictions_file,
    ], predictions_file

def get_xgboost_ooc_commands_and_predictions_file(train_file, test_file):
    return get_xgboost_commands_and_predictions_file(train_file, test_file, cache=True)

In [None]:
engines = {
    'vw': (get_vw_commands_and_predictions_file, local_vw_train_template, local_vw_test_template),
    'xgb': (get_xgboost_commands_and_predictions_file, local_libsvm_train_template, local_libsvm_test_template),
    'xgb.ooc': (get_xgboost_ooc_commands_and_predictions_file, local_libsvm_train_template, local_libsvm_test_template),
}

Train & test everything:

In [None]:
import subprocess


measurements = []

for sample in train_samples:
    for engine in engines:
        logger.info('Training "%s" on "%s" lines of data.', engine, sample)
        
        get_commands_and_predictions_file, train_template, test_template = engines[engine]

        train_file = train_template.format(sample_name(sample))
        test_file = test_template.format(sample_name(test_samples[0]))
        logger.info('Will train on "%s" and test on "%s".', train_file, test_file)

        command_time, time_file = get_time_command_and_file(train_file)
        command_engine_train, command_engine_test, predictions_file = get_commands_and_predictions_file(train_file, test_file)

        logger.info('Performing train.')
        subprocess.call(command_time + command_engine_train)

        logger.info('Performing test.')
        subprocess.call(command_engine_test)

        logger.info('Measuring results.')
        measurement = measure(engine, sample, test_file, time_file, predictions_file)
        logger.info(measurement)
        measurements.append(measurement)

Load measurements:

In [None]:
import pandas


measurements_df = pandas.DataFrame(measurements).sort_values(by=['Engine', 'Train size'])
measurements_df

Plot measurements:

In [None]:
def extract_data_for_plotting(df, what):
    return reduce(
        lambda left, right: pandas.merge(left, right, how='outer', on='Train size'),
        map(
            lambda name: df[df.Engine == name][['Train size', what]].rename(columns={what: name}),
            df.Engine.unique(),
        ),
    )   

def plot_stuff(df, what, ylabel=None, **kwargs):
    data = extract_data_for_plotting(df, what).set_index('Train size')
    ax = data.plot(marker='o', figsize=(6, 6), title=what, grid=True, linewidth=2.0, **kwargs)  # xlim=(1e4, 4e9)
    if ylabel is not None:
        ax.set_ylabel(ylabel)


plot_stuff(measurements_df, 'ROC AUC', logx=True)
plot_stuff(measurements_df, 'Log loss', logx=True)
plot_stuff(measurements_df, 'Train time', loglog=True, ylabel='s')
plot_stuff(measurements_df, 'Maximum memory', loglog=True, ylabel='bytes')
plot_stuff(measurements_df, 'CPU load', logx=True)