# Combination of the gradinet boosting and hyperopt notebooks

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
!pip3.5 install xgboost hyperopt



In [3]:
%matplotlib inline
%config InlineBackend.figure_format ='retina'

import os
import sys
import glob
import pickle
import time
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from typing import Dict

import pyspark
import pyspark.sql.functions as F
from pyspark import keyword_only
from pyspark.sql.types import *
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params
from hyperopt import fmin, tpe, hp, Trials, STATUS_OK

os.environ['PYSPARK_SUBMIT_ARGS'] = """
--jars xgboost4j-spark-0.72.jar,xgboost4j-0.72.jar
--py-files sparkxgb.zip pyspark-shell
""".replace('\n', ' ')

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .appName("spark_sql_examples") \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)

sys.path.append('../gradient_boosting/notebooks')

from sparkxgb.xgboost import *

from utils.metrics import rocauc, logloss, ne, calibration, df_with_proba_column
from utils.processing import split_by_col

## Metrics

In [4]:
def get_ate(groups, control_name) -> pd.DataFrame:
    """Get Average Treatment Effect
    groups - dictionary where keys - names of models, values - dicts of pairs <metric_name>, <metric_value>
    control_name - name of baseline model
    
    return pd.DataFrame (rows corresponds to metrics, cols corresponds to models and ATE with respect to control)
    """
    
    metric_names = []
    for metric_name_values in groups.values():
        for metric_name, _ in metric_name_values.items():
            if metric_name not in metric_names:
                metric_names.append(metric_name)
    metric_names = list(sorted(metric_names))
    
    if control_name not in groups:
        raise ValueError("Control experiment is not in the group.")
    control_values = groups[control_name]
    if len(control_values) != len(metric_names):
        raise ValueError("Control experiment does not have all the metrics computed.")

    model_names = list(sorted(groups.keys()))
    metric_model_ates = []
    for metric_name in metric_names:
        control_value = control_values[metric_name]
        model_ates = []
        for model_name in model_names:
            if metric_name in groups[model_name]:
                ate = (groups[model_name][metric_name] - control_value) / control_value * 100
            else:
                ate = None
            model_ates.append(ate)
        metric_model_ates.append(model_ates)

    ates_df = pd.DataFrame(data=metric_model_ates, index=metric_names, columns=model_names)
    return ates_df

In [5]:
ROC_AUC_HANDLE = 'Area Under ROC'
CALIBRATION_HANDLE = 'Calibration Δ'

## Data

In [6]:
DATA_PATH = '/workspace/data/criteo/dac'
TRAIN_PATH = os.path.join(DATA_PATH, 'train.csv')

In [7]:
SAMPLE_RATE = 0.4
SEED = 82

In [8]:
df = sqlContext.read.format("com.databricks.spark.csv") \
    .option("delimiter", ",") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load('file:///' + TRAIN_PATH) \
    .sample(False, SAMPLE_RATE, seed=SEED)

In [9]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: integer (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: integer (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: integer (nullable = true)
 |-- _c8: integer (nullable = true)
 |-- _c9: integer (nullable = true)
 |-- _c10: integer (nullable = true)
 |-- _c11: integer (nullable = true)
 |-- _c12: integer (nullable = true)
 |-- _c13: integer (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: string (nullable = true)
 |-- _c21: string (nullable = true)
 |-- _c22: string (nullable = true)
 |-- _c23: string (nullable = true)
 |-- _c24: string (nullable = true)
 |-- _c25: string (nullable = true)
 |-- _c26: string (nullable = true)
 |-- _c27: string (

In [10]:
NUM_FEATURES_TO_USE = 13
CATEGORICAL_FEATURES_TO_USE = 26
MEAN_TARGET_ENCODER_MIN_EXAMPLES = 50

num_indexes = range(NUM_FEATURES_TO_USE)
num_columns = ['_c{}'.format(i) for i in range(1, 14)]
num_columns = [num_columns[num_index] for num_index in num_indexes]

cat_indexes = range(CATEGORICAL_FEATURES_TO_USE)
cat_columns = ['_c{}'.format(i) for i in range(14, 40)]
cat_columns = [cat_columns[cat_index] for cat_index in cat_indexes]

num_columns, cat_columns

(['_c1',
  '_c2',
  '_c3',
  '_c4',
  '_c5',
  '_c6',
  '_c7',
  '_c8',
  '_c9',
  '_c10',
  '_c11',
  '_c12',
  '_c13'],
 ['_c14',
  '_c15',
  '_c16',
  '_c17',
  '_c18',
  '_c19',
  '_c20',
  '_c21',
  '_c22',
  '_c23',
  '_c24',
  '_c25',
  '_c26',
  '_c27',
  '_c28',
  '_c29',
  '_c30',
  '_c31',
  '_c32',
  '_c33',
  '_c34',
  '_c35',
  '_c36',
  '_c37',
  '_c38',
  '_c39'])

In [11]:
df = df.fillna(0, subset=num_columns)

In [12]:
train_df, val_df, test_df = split_by_col(df, 'id', [0.8, 0.1, 0.1])

In [13]:
class MeanTargetEncoderModel(
    pyspark.ml.Model,
    HasInputCol,
    HasOutputCol,
    pyspark.ml.util.DefaultParamsWritable,
    pyspark.ml.util.DefaultParamsReadable):
    """Fitted Model"""
    
    def __init__(self):        
        super(MeanTargetEncoderModel, self).__init__()
        self.inputOutputMapping = Param(self, "inputOutputMapping", "inputOutputMapping")
        self._setDefault(inputOutputMapping={})

    @keyword_only
    def setParams(self, inputCol, outputCol, inputOutputMapping):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setInputOutputMapping(self, value):
        return self._set(inputOutputMapping=value)

    def getInputOutputMapping(self):
        return self.getOrDefault(self.inputOutputMapping)
        
    def transform(self, dataset: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
        inputOutputMapping = self.getInputOutputMapping()
        total_sum = sum(map(lambda sum_cnt: sum_cnt[0], inputOutputMapping.values()))
        total_cnt = sum(map(lambda sum_cnt: sum_cnt[1], inputOutputMapping.values()))
        mean_default_value = float(total_sum / total_cnt)
        def map_column_using_mapping(input_value):
            sum_cnt = inputOutputMapping.get(input_value, (0, 0))
            if sum_cnt[1] < MEAN_TARGET_ENCODER_MIN_EXAMPLES:
                return mean_default_value
            else:
                return float(sum_cnt[0] / sum_cnt[1])
    
        map_column_using_mapping_udf = F.udf(map_column_using_mapping, FloatType())

        return dataset \
            .withColumn(self.getOutputCol(), map_column_using_mapping_udf(self.getInputCol()))

In [14]:
class MeanTargetEncoder(pyspark.ml.Estimator):
    """Estimator."""

    def __init__(self, inputCol: str, featuresCol: str, outputCol: str):
        self.inputCol = inputCol
        self.featuresCol = featuresCol
        self.outputCol = outputCol
    
    def fit(self, dataset: pyspark.sql.DataFrame) -> MeanTargetEncoderModel:
        inputOutputMapping = dataset \
            .groupby(self.inputCol) \
            .agg(F.sum(self.featuresCol).alias(self.outputCol + "_sum"), 
                 F.count(self.featuresCol).alias(self.outputCol + "_count")) \
            .select(
                F.col(self.inputCol), F.col(self.outputCol + "_sum"), F.col(self.outputCol + "_count")) \
            .rdd \
            .map(lambda row: (row[0], (row[1], row[2]))) \
            .collectAsMap()
        mte_model = MeanTargetEncoderModel()
        mte_model.setParams(
            inputCol=self.inputCol, outputCol=self.outputCol, inputOutputMapping=inputOutputMapping)
        return mte_model

In [15]:
cat_enc_columns = [cat_col + '_enc' for cat_col in cat_columns]

cat_col, cat_enc_col = cat_columns[0], cat_enc_columns[0]
mean_target_encoder = MeanTargetEncoder(cat_col, '_c0', cat_enc_col)
mean_target_encoder_model = mean_target_encoder.fit(train_df)
mean_target_encoder_model.transform(df).take(1)

[Row(_c0=1, _c1=0, _c2=277, _c3=0, _c4=3, _c5=7318, _c6=24, _c7=6, _c8=3, _c9=98, _c10=0, _c11=1, _c12=0, _c13=3, _c14='8cf07265', _c15='9adf4cf9', _c16='2e76fb61', _c17='0b1ad9da', _c18='4cf72387', _c19='fe6b92e5', _c20='75dcaaca', _c21='0b153874', _c22='a73ee510', _c23='3b08e48b', _c24='8aabdae8', _c25='9886a0a7', _c26='edcf17ce', _c27='07d13a8f', _c28='2aaebd23', _c29='338c0d09', _c30='e5ba7672', _c31='c7dbecd5', _c32=None, _c33=None, _c34='60d2d691', _c35=None, _c36='3a171ecb', _c37='90b6276f', _c38=None, _c39=None, id=30, _c14_enc=0.2543465197086334)]

#### Pipeline

In [16]:
# from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler

# cat_index_columns = [col_name + "_index" for col_name in cat_columns]
# cat_vec_columns = [col_name + "_vec" for col_name in cat_columns]

# cat_indexers = [
#     StringIndexer(inputCol=cat_column, outputCol=cat_index_column, handleInvalid='keep')
#     for cat_column, cat_index_column in zip(cat_columns, cat_index_columns)]
# ohe_estimator = OneHotEncoderEstimator(inputCols=cat_index_columns, outputCols=cat_vec_columns)

# assembler = VectorAssembler(inputCols=num_columns + cat_vec_columns, outputCol="features")

# pipeline = Pipeline(stages=cat_indexers + [ohe_estimator, assembler])

mean_target_encoders = [
    MeanTargetEncoder(cat_col, '_c0', cat_enc_col) 
    for cat_col, cat_enc_col in zip(cat_columns, cat_enc_columns)]

assembler = \
    VectorAssembler(inputCols=num_columns + cat_enc_columns, outputCol="features") \
    .setHandleInvalid("keep")

pipeline = Pipeline(stages=mean_target_encoders + [assembler])

In [17]:
MTE_PIPELINE_MODEL_PATH = os.path.join(DATA_PATH, 'ctr_pipeline_model')

pipeline_model = pipeline.fit(train_df)
pipeline_model.write().overwrite().save(MTE_PIPELINE_MODEL_PATH)

In [18]:
pipeline_model = PipelineModel.load(MTE_PIPELINE_MODEL_PATH)
pipeline_model.stages

[MeanTargetEncoderModel_43109fb33246,
 MeanTargetEncoderModel_22d505e4fbaa,
 MeanTargetEncoderModel_5beb25c2f649,
 MeanTargetEncoderModel_66cfb8b2eb53,
 MeanTargetEncoderModel_fe74fac2940a,
 MeanTargetEncoderModel_3f04f4e963f5,
 MeanTargetEncoderModel_5dc3f36923fc,
 MeanTargetEncoderModel_c7f740e2261b,
 MeanTargetEncoderModel_0c8288cf738e,
 MeanTargetEncoderModel_5a55a9bdc966,
 MeanTargetEncoderModel_bc7c443f409f,
 MeanTargetEncoderModel_f32dbc83ead2,
 MeanTargetEncoderModel_14acab1e0d8d,
 MeanTargetEncoderModel_06712ee47896,
 MeanTargetEncoderModel_906e618e0203,
 MeanTargetEncoderModel_ae33dd97b624,
 MeanTargetEncoderModel_6d08991d7a7e,
 MeanTargetEncoderModel_2558b2fac327,
 MeanTargetEncoderModel_6b3f6ad5d1fb,
 MeanTargetEncoderModel_6e2542552212,
 MeanTargetEncoderModel_65422be95150,
 MeanTargetEncoderModel_ca42f23b1b53,
 MeanTargetEncoderModel_92e8361c8722,
 MeanTargetEncoderModel_2c3b89e95933,
 MeanTargetEncoderModel_7ac311e2b809,
 MeanTargetEncoderModel_915fe3374972,
 VectorAssem

In [19]:
train_df = pipeline_model \
    .transform(train_df) \
    .select(F.col('_c0').alias('label'), 'features', 'id') \
    .cache()
val_df = pipeline_model \
    .transform(val_df) \
    .select(F.col('_c0').alias('label'), 'features', 'id') \
    .cache()
test_df = pipeline_model \
    .transform(test_df) \
    .select(F.col('_c0').alias('label'), 'features', 'id') \
    .cache()

# Train and evaluate

### Negative subsampling and recalibration

In [20]:
train_df.count()

1171867

In [21]:
positive_train_df = train_df.filter(F.col('label') == 1.0)
positive_train_df.count()

298142

In [22]:
negative_train_df = train_df.filter(F.col('label') == 0.0)
negative_train_df.count()

873725

In [26]:
static_params = {
    'featuresCol': "features", 
    'labelCol': "label", 
    'predictionCol': "prediction",
    'eval_metric': 'logloss',
    'objective': 'binary:logistic',
    'nthread': 1,
    'silent': 0,
    'nworkers': 1
}

baseline_params = {
    'colsample_bytree': 0.9,
    'eta': 0.15,
    'gamma': 0.9,
    'max_depth': 6,
    'min_child_weight': 50.0,
    'subsample': 0.9,
    'num_round': 20
}

space = {**static_params, **baseline_params}

# Results from several runs of hyperopt. The tuning procedure is the same as in hyperopt.ipynb notebook.

space['num_round'] = 100
space['eta'] = 0.3

space['max_depth'] = 7
space['min_child_weight'] = 50.0

space['gamma'] = 0.9

In [27]:
def find_the_best_log_xgboost_model(train_df, val_df, negative_downsampling_rate):
    time_start = time.time()

    param1 = 'gamma'
#     param2 = 'min_child_weight'
    param1_choice  = [0.5, 0.8, 0.9, 0.95, 0.99]
#     param2_choice = [10.0, 25.0, 50.0, 75.0, 100.0]
    assert space[param1] in param1_choice
#     assert space[param2] in param2_choice

    space[param1] = hp.choice(param1, param1_choice)
#     space[param2] = hp.choice(param2, param2_choice)
    
    def xgboost_objective(space):
        estimator = XGBoostEstimator(**space)
        success = False
        attempts = 0
        model = None
        while not success and attempts < 2:
            try:
                model = estimator.fit(train_df)
                success = True
            except Exception as e:
                attempts += 1
                print(e)
                print('Try again')

        roc_auc = rocauc(
            model, val_df, probabilities_col='probabilities', negative_downsampling_rate=negative_downsampling_rate)
        calibr = calibration(
            model, val_df, probabilities_col='probabilities', negative_downsampling_rate=negative_downsampling_rate)

        print('LOSS: {}, ROC-AUC: {}, CALIBRATION: {}'.format(-roc_auc, roc_auc, calibr))
        return {'loss': -roc_auc, 'rocauc': roc_auc, 'calibration': calibr, 'status': STATUS_OK}

    trials = Trials()
    best_opt = fmin(
        fn=xgboost_objective, space=space, algo=tpe.suggest, 
        max_evals=len(param1_choice), trials=trials)
    space[param1] = param1_choice[best_opt[param1]]
#     space[param2] = param2_choice[best_opt[param2]]
    
    estimator = XGBoostEstimator(**space)
    print('Tuned params are', estimator._input_kwargs_processed())
    model = estimator.fit(train_df)

    time_finish = time.time()
    print('Tuning time is ' + str(time_finish - time_start) + 's')
    return model

In [28]:
best_model, best_rocauc, best_downsampling_rate = None, None, None

all_metrics = {}
search_space = [1.0]
for index, negative_downsampling_rate in enumerate(search_space, 1):
    print('*' * 100)
    print('MODEL\'s NEGATIVE DOWNSAMPLING RATE = ' + str(negative_downsampling_rate))
    rebalanced_train_df = negative_train_df \
        .sample(False, negative_downsampling_rate, seed=SEED) \
        .union(positive_train_df)
    cur_model = find_the_best_log_xgboost_model(
        rebalanced_train_df,
        val_df,
        negative_downsampling_rate)

    metrics = {}
    metrics[ROC_AUC_HANDLE] = rocauc(
        cur_model, test_df, probabilities_col='probabilities', negative_downsampling_rate=negative_downsampling_rate)
    metrics[CALIBRATION_HANDLE] = calibration(
        cur_model, test_df, probabilities_col='probabilities', negative_downsampling_rate=negative_downsampling_rate)    
    model_handle = index = str(index) + '.xgb_mte_' + str(negative_downsampling_rate)
    all_metrics[model_handle] = metrics
    print('Test metrics are', metrics)

    cur_rocauc = metrics[ROC_AUC_HANDLE]
    if best_rocauc is None or best_rocauc < cur_rocauc:
        best_model = cur_model
        best_rocauc = cur_rocauc
        best_downsampling_rate = negative_downsampling_rate

****************************************************************************************************
MODEL's NEGATIVE DOWNSAMPLING RATE = 1.0
LOSS: -0.7833087804512839, ROC-AUC: 0.7833087804512839, CALIBRATION: 0.43642035177855254
LOSS: -0.7831396129778134, ROC-AUC: 0.7831396129778134, CALIBRATION: 0.43665626884059866
LOSS: -0.7834431445233924, ROC-AUC: 0.7834431445233924, CALIBRATION: 0.4367349078612808
LOSS: -0.7835186930781098, ROC-AUC: 0.7835186930781098, CALIBRATION: 0.43856981834386227
LOSS: -0.7834431445233965, ROC-AUC: 0.7834431445233965, CALIBRATION: 0.4367349078612808
100%|██████████| 5/5 [38:50<00:00, 466.16s/trial, best loss: -0.7835186930781098]
Tuned params are {'eta': 0.3, 'subsample': 0.9, 'eval_metric': 'logloss', 'nworkers': 1, 'num_round': 100, 'nthread': 1, 'min_child_weight': 50.0, 'gamma': 0.9, 'colsample_bytree': 0.9, 'silent': 0, 'max_depth': 7, 'objective': 'binary:logistic', 'predictionCol': 'prediction', 'labelCol': 'label', 'featuresCol': 'features'}
Tuning 

In [29]:
print('Search space:', search_space)
best_rocauc, best_downsampling_rate

Search space: [1.0]


(0.7808397201250101, 1.0)

In [30]:
MODEL_DIR = '/workspace/models/criteo/dac'

BEST_CTR_MODEL_NAME = 'ctr_model_' + str(best_downsampling_rate)
BEST_CTR_MODEL_PATH = os.path.join(MODEL_DIR, BEST_CTR_MODEL_NAME)

best_model._call_java("booster").saveModel(BEST_CTR_MODEL_PATH)

In [31]:
BASELINE_HANDLE = '1.xgb_mte_1.0'
get_ate(all_metrics, BASELINE_HANDLE)

Unnamed: 0,1.xgb_mte_1.0
Area Under ROC,0.0
Calibration Δ,0.0


# Inference

In [32]:
TEST_DATA_PATH = '/workspace/data/mlbd-20-ctr-prediction-1'
TEST_TEST_PATH = os.path.join(TEST_DATA_PATH, 'test.csv')

comp_df = sqlContext.read.format("com.databricks.spark.csv") \
    .option("delimiter", ",") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load('file:///' + TEST_TEST_PATH) \
    .fillna(0, subset=num_columns)

comp_df.count()

917961

In [33]:
comp_df_parts = split_by_col(comp_df, 'id', [0.25, 0.25, 0.25, 0.25])
for comp_df_part in comp_df_parts:
    print(comp_df_part.count())

229490
229490
229490
229491


In [34]:
submission_path = os.path.join(TEST_DATA_PATH, 'submission_' + BEST_CTR_MODEL_NAME + '.csv')
print(submission_path)

with open(submission_path, 'w') as writer:
    writer.write("id,proba\n")

/workspace/data/mlbd-20-ctr-prediction-1/submission_ctr_model_1.0.csv


In [35]:
for comp_df_part in comp_df_parts:

    comp_df_part = pipeline_model \
        .transform(comp_df_part) \
        .select('features', 'id') \
        .cache()

    comp_df_part = df_with_proba_column(
        best_model, comp_df_part, probabilities_col='probabilities',
        negative_downsampling_rate=best_downsampling_rate)

    print(comp_df_part.take(1))

    comp_part_predictions = comp_df_part \
        .rdd \
        .map(lambda row: (row.id, row.proba)) \
        .collect()

    with open(submission_path, 'a') as writer:
        for row in comp_part_predictions:
            writer.write(",".join(map(str, row))+"\n")

[Row(features=DenseVector([0.0, 19.0, 2.0, 4.0, 4576.0, 6.0, 6.0, 5.0, 15.0, 0.0, 2.0, 0.0, 4.0, 0.2542, 0.2578, 0.2544, 0.2517, 0.255, 0.33, 0.3165, 0.2544, 0.269, 0.3247, 0.3163, 0.2544, 0.3163, 0.2177, 0.3027, 0.2544, 0.3066, 0.3145, 0.2544, 0.2544, 0.2544, 0.2544, 0.2686, 0.2477, 0.2544, 0.2544]), id=566935904713, probabilities=DenseVector([0.5678, 0.4322]), prediction=0.0, proba=0.4322131872177124)]
[Row(features=DenseVector([0.0, 20.0, 4.0, 6.0, 25653.0, 0.0, 0.0, 32.0, 6.0, 0.0, 0.0, 0.0, 6.0, 0.2534, 0.0815, 0.1303, 0.1582, 0.2474, 0.2469, 0.291, 0.2543, 0.269, 0.2159, 0.2671, 0.1303, 0.2671, 0.2177, 0.0682, 0.1303, 0.3066, 0.1005, 0.2544, 0.2544, 0.1303, 0.2544, 0.2418, 0.2021, 0.2544, 0.2544]), id=601295737752, probabilities=DenseVector([0.9743, 0.0257]), prediction=0.0, proba=0.02572101354598999)]
[Row(features=DenseVector([0.0, 246.0, 2.0, 1.0, 0.0, 0.0, 0.0, 5.0, 28.0, 0.0, 0.0, 0.0, 1.0, 0.2635, 0.3072, 0.1611, 0.1613, 0.2543, 0.2469, 0.1104, 0.2543, 0.1278, 0.2159, 0.110