In [2]:
import pyspark
# Create (or reuse) the Spark session for this notebook. The MMLSpark package
# is resolved from the extra Maven repository configured below.
spark = (
    pyspark.sql.SparkSession.builder
    .appName("Local demo")
    .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
    .config(
        "spark.jars.packages",
        "com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc2-4-64268600-SNAPSHOT",
    )
    .getOrCreate()
)

In [3]:
import pandas as pd
# Do not truncate column contents when pandas renders a DataFrame, so long
# string cells are shown in full.
pd.set_option('display.max_colwidth', None)

from mmlspark.vw import VowpalWabbitContextualBandit, VowpalWabbitFeaturizer, VectorZipper

from pyspark.ml.tuning import ParamGridBuilder
from pyspark.sql.types import IntegerType
from pyspark.ml import Pipeline

In [25]:
def get_train_data_featurized():
    """Load ``cbdata.csv`` and return it as a featurized Spark DataFrame.

    The ``cost`` column is renamed to ``label`` and ``chosenAction`` is cast
    to an integer column before the frame is passed to ``featurize_data``.
    """
    raw_frame = pd.read_csv('cbdata.csv')
    labelled_frame = raw_frame.rename(columns={"cost": "label"})
    spark_frame = spark.createDataFrame(labelled_frame)
    action_as_int = spark_frame["chosenAction"].cast(IntegerType())
    return featurize_data(spark_frame.withColumn("chosenAction", action_as_int))

def get_test_data_featurized():
    """Load ``cbdata_test.csv`` and return it as a featurized Spark DataFrame.

    Unlike the training loader, no columns are renamed or cast here; the CSV
    is converted to a Spark DataFrame and handed straight to ``featurize_data``.
    """
    return featurize_data(spark.createDataFrame(pd.read_csv('cbdata_test.csv')))

def featurize_data(df, num_actions=5):
    """Add VowpalWabbit feature columns for the shared context and each action.

    The following columns are appended to ``df``:

    * ``shared`` -- built from ``sharedID``, ``sharedMajor``, ``sharedHobby``
      and ``sharedFavCharacter``.
    * ``action<i>Features`` for ``i`` in ``1..num_actions`` -- each built from
      the matching ``action<i>Topic`` column.
    * ``features`` -- the per-action feature columns combined by
      ``VectorZipper``.

    Args:
        df: Spark DataFrame containing the shared and ``action<i>Topic``
            input columns listed above.
        num_actions: Number of ``action<i>Topic`` columns to featurize.
            Defaults to 5, matching the original hard-coded behaviour.

    Returns:
        The transformed Spark DataFrame with the feature columns added.
    """
    shared_featurizer = VowpalWabbitFeaturizer(
        inputCols=['sharedID', "sharedMajor", "sharedHobby", "sharedFavCharacter"],
        outputCol='shared')

    # One featurizer per action slot; column names follow action<i>Topic ->
    # action<i>Features.
    action_indices = range(1, num_actions + 1)
    action_featurizers = [
        VowpalWabbitFeaturizer(inputCols=[f'action{i}Topic'],
                               outputCol=f'action{i}Features')
        for i in action_indices
    ]

    action_merger = VectorZipper(
        inputCols=[f'action{i}Features' for i in action_indices],
        outputCol='features')

    # Every stage must appear exactly once. The first action featurizer used
    # to be listed twice, which is redundant and violates Spark ML's
    # "unique pipeline stages" requirement.
    pipeline = Pipeline(stages=[
        shared_featurizer, *action_featurizers, action_merger
    ])
    transformation_pipeline = pipeline.fit(df)
    return transformation_pipeline.transform(df)


# Dataset

In [39]:
# Preview the raw training CSV exactly as stored on disk, i.e. before the
# "cost" -> "label" rename and the featurization done by the loaders above.
pd.read_csv('cbdata.csv')

Unnamed: 0,chosenAction,cost,probability,sharedID,sharedMajor,sharedHobby,sharedFavCharacter,action1Topic,action2Topic,action3Topic,action4Topic,action5Topic
0,1,0.0,0.20,mk,psychology,kids,7of9,SkiConditions-VT,HerbGarden,BeyBlades,NYCLiving,MachineLearning
1,4,0.0,0.20,mk,psychology,kids,7of9,SkiConditions-VT,HerbGarden,BeyBlades,NYCLiving,MachineLearning
2,3,0.0,0.20,mk,psychology,kids,7of9,SkiConditions-VT,HerbGarden,BeyBlades,NYCLiving,MachineLearning
3,3,-1.0,0.20,rnc,engineering,hiking,spock,SkiConditions-VT,HerbGarden,BeyBlades,NYCLiving,MachineLearning
4,5,0.0,0.20,mk,psychology,kids,7of9,SkiConditions-VT,HerbGarden,BeyBlades,NYCLiving,MachineLearning
...,...,...,...,...,...,...,...,...,...,...,...,...
2471,2,-1.0,0.84,mk,psychology,kids,7of9,SkiConditions-VT,HerbGarden,BeyBlades,NYCLiving,MachineLearning
2472,2,0.0,0.84,mk,psychology,kids,7of9,SkiConditions-VT,HerbGarden,BeyBlades,NYCLiving,MachineLearning
2473,2,-1.0,0.84,mk,psychology,kids,7of9,SkiConditions-VT,HerbGarden,BeyBlades,NYCLiving,MachineLearning
2474,5,0.0,0.84,rnc,engineering,hiking,spock,SkiConditions-VT,HerbGarden,BeyBlades,NYCLiving,MachineLearning


# Train

In [44]:
# Train on the featurized data without coalescing it; the sweep cell further
# down uses a single-partition variant instead.
featurized_data = get_train_data_featurized()
estimator2 = (
    VowpalWabbitContextualBandit()
    .setEpsilon(0.2)
    .setUseBarrierExecutionMode(False)
)
model = estimator2.fit(featurized_data)

# One row of training statistics is reported per partition.
display(model.getPerformanceStatistics().toPandas())

Unnamed: 0,partitionId,arguments,learningRate,powerT,hashSeed,numBits,numberOfExamplesPerPass,weightedExampleSum,weightedLabelSum,averageLoss,...,timeTotalNs,timeNativeIngestNs,timeLearnNs,timeMultipassNs,ipsEstimate,snipsEstimate,timeMarshalPercentage,timeLearnPercentage,timeMultipassPercentage,timeSparkReadPercentage
0,0,--cb_adf --cb_explore_adf --cb_type mtr --csoaa_ldf multiline --csoaa_rank --epsilon 0.200000002980232 --holdout_off --no_stdin --node 0 --span_server 192.168.1.99 --span_server_port 44787 --total 12 --unique_id 319059478,0.5,0.5,0,18,206,206.0,0.0,-0.154785,...,46228104,0,0,34100397,-0.154785,-0.109248,0.0,0.0,0.737655,0.262345
1,1,--cb_adf --cb_explore_adf --cb_type mtr --csoaa_ldf multiline --csoaa_rank --epsilon 0.200000002980232 --holdout_off --no_stdin --node 1 --span_server 192.168.1.99 --span_server_port 44787 --total 12 --unique_id 319059478,0.5,0.5,0,18,206,206.0,0.0,-0.23509,...,52218235,0,0,34978211,-0.23509,-0.187293,0.0,0.0,0.669847,0.330153
2,2,--cb_adf --cb_explore_adf --cb_type mtr --csoaa_ldf multiline --csoaa_rank --epsilon 0.200000002980232 --holdout_off --no_stdin --node 2 --span_server 192.168.1.99 --span_server_port 44787 --total 12 --unique_id 319059478,0.5,0.5,0,18,206,206.0,0.0,-0.074018,...,47768400,0,0,37266728,-0.074018,-0.05829,0.0,0.0,0.780154,0.219846
3,3,--cb_adf --cb_explore_adf --cb_type mtr --csoaa_ldf multiline --csoaa_rank --epsilon 0.200000002980232 --holdout_off --no_stdin --node 3 --span_server 192.168.1.99 --span_server_port 44787 --total 12 --unique_id 319059478,0.5,0.5,0,18,206,206.0,0.0,-0.090846,...,44889416,0,0,32287440,-0.090846,-0.104023,0.0,0.0,0.719266,0.280734
4,4,--cb_adf --cb_explore_adf --cb_type mtr --csoaa_ldf multiline --csoaa_rank --epsilon 0.200000002980232 --holdout_off --no_stdin --node 4 --span_server 192.168.1.99 --span_server_port 44787 --total 12 --unique_id 319059478,0.5,0.5,0,18,206,206.0,0.0,-0.318077,...,40395338,0,0,29864482,-0.318077,-0.302418,0.0,0.0,0.739305,0.260695
5,5,--cb_adf --cb_explore_adf --cb_type mtr --csoaa_ldf multiline --csoaa_rank --epsilon 0.200000002980232 --holdout_off --no_stdin --node 5 --span_server 192.168.1.99 --span_server_port 44787 --total 12 --unique_id 319059478,0.5,0.5,0,18,206,206.0,0.0,-0.034674,...,50779851,0,0,41001062,-0.034674,-0.026776,0.0,0.0,0.807428,0.192572
6,6,--cb_adf --cb_explore_adf --cb_type mtr --csoaa_ldf multiline --csoaa_rank --epsilon 0.200000002980232 --holdout_off --no_stdin --node 6 --span_server 192.168.1.99 --span_server_port 44787 --total 12 --unique_id 319059478,0.5,0.5,0,18,206,206.0,0.0,-0.110264,...,45319031,0,0,32310383,-0.110264,-0.122875,0.0,0.0,0.712954,0.287046
7,7,--cb_adf --cb_explore_adf --cb_type mtr --csoaa_ldf multiline --csoaa_rank --epsilon 0.200000002980232 --holdout_off --no_stdin --node 7 --span_server 192.168.1.99 --span_server_port 44787 --total 12 --unique_id 319059478,0.5,0.5,0,18,206,206.0,0.0,-0.254277,...,43475607,0,0,32713057,-0.254277,-0.166314,0.0,0.0,0.752446,0.247554
8,8,--cb_adf --cb_explore_adf --cb_type mtr --csoaa_ldf multiline --csoaa_rank --epsilon 0.200000002980232 --holdout_off --no_stdin --node 8 --span_server 192.168.1.99 --span_server_port 44787 --total 12 --unique_id 319059478,0.5,0.5,0,18,206,206.0,0.0,-0.089228,...,52315757,0,0,34909242,-0.089228,-0.118478,0.0,0.0,0.66728,0.33272
9,9,--cb_adf --cb_explore_adf --cb_type mtr --csoaa_ldf multiline --csoaa_rank --epsilon 0.200000002980232 --holdout_off --no_stdin --node 9 --span_server 192.168.1.99 --span_server_port 44787 --total 12 --unique_id 319059478,0.5,0.5,0,18,206,206.0,0.0,-0.239482,...,46046493,0,0,34497710,-0.239482,-0.184014,0.0,0.0,0.749193,0.250807


# Predict

In [None]:
featurized_test_data = get_test_data_featurized()

# Score the test set, then drop the intermediate feature columns so only the
# original inputs and the model output are shown.
(
    model.transform(featurized_test_data)
    .drop(
        "features",
        "shared",
        "action1Features",
        "action2Features",
        "action3Features",
        "action4Features",
        "action5Features",
    )
    .toPandas()
)

# Hyperparameter Sweep

In [None]:
# Collapse the training data into a single partition: for CFE we want to
# learn on the whole dataset serially rather than distributed.
featurized_data = get_train_data_featurized().coalesce(1)
estimator1 = (
    VowpalWabbitContextualBandit()
    .setEpsilon(0.2)
    .setParallelism(12)
    .setUseBarrierExecutionMode(False)
)

# Grid over interactions (2) x learning rate (3) x powerT (2).
paramGrid = list(
    ParamGridBuilder()
    .addGrid(estimator1.interactions, [[], ["::"]])
    .addGrid(estimator1.learningRate, [0.1, 0.05, 0.005])
    .addGrid(estimator1.powerT, [0.0, 0.1])
    .build()
)

models = estimator1.parallelFit(featurized_data, paramGrid)

# Stack the statistics of every fitted model and rank them by IPS estimate.
result = pd.concat(
    [fitted.getPerformanceStatistics().toPandas() for fitted in models]
)
(
    result[["arguments", "learningRate", "powerT", "ipsEstimate", "snipsEstimate"]]
    .sort_values(by=['ipsEstimate'])
)