In [4]:
from typing import Optional, Tuple

import numpy as np
from obp.dataset import SyntheticBanditDataset
from obp.utils import sigmoid
from obp.types import BanditFeedback
import pandas as pd

In [2]:
def _get_pokemon_id(x: float) -> int:
    return (x * 10 ** 5).astype(int) % 151 + 1

In [5]:
def _reward_function(
    context: np.ndarray,
    action_context: np.ndarray,
    random_state: Optional[int] = None,
) -> np.ndarray:

    pokemon_ids = _get_pokemon_id(context.flatten())
    pokemon_zukan = PokemonZukan()

    capture_probabilities = []
    for pokemon_id in pokemon_ids:
        l = []
        for action in ACTIONS:
            ball_performance = action.performance
            pokemon_capture_dificulty = pokemon_zukan.get_capture_dificulty(pokemon_id)
            capture_probability = sigmoid(ball_performance - pokemon_capture_dificulty)
            l.append(capture_probability)
        capture_probabilities.append(l)
    capture_probabilities = np.array(capture_probabilities)
    return capture_probabilities

In [6]:
def _behavior_policy(
    context: np.ndarray,
    action_context: np.ndarray,
    random_state: Optional[int] = None,
) -> np.ndarray:
    pokemon_ids = _get_pokemon_id(context.flatten())
    policy = np.array([rule_based_policy(pokemon_id) for pokemon_id in pokemon_ids])
    return policy

In [7]:
# 現状のSyntheticBanditDatasetは整数値のcontextを生成できないので、暫定的に後処理でcontextを追加する
def _update_context(data: BanditFeedback) -> BanditFeedback:
    pokemon_ids = _get_pokemon_id(data['context'].flatten())
    pokemon_zukan = PokemonZukan()
    rewards = np.array([pokemon_zukan.get_reward(pokemon_id) for pokemon_id in pokemon_ids])
    capture_dificulties = np.array([pokemon_zukan.get_capture_dificulty(pokemon_id) for pokemon_id in pokemon_ids])

    new_context = np.concatenate([pokemon_ids[:, np.newaxis], capture_dificulties[:, np.newaxis], rewards[:, np.newaxis]], axis=1)
    data['context'] = new_context
    return data

In [8]:
# 現状、捕獲したかどうかのrewardになっているので、
# 「捕獲した場合は謝礼金をもらえて、捕獲しなかった場合は何ももらえない」「ボールのコストを差し引く」を考慮したrewardにする
def _update_reward(data: BanditFeedback) -> BanditFeedback:
    rewards = data['context'][:, 2]
    costs = np.array([ACTIONS[action_id].cost for action_id in data['action'].flatten()])
    data['binary_reward'] = data['reward']
    data['reward'] = rewards * data['reward'] - costs
    return data

In [9]:
def _update_expected_reward(data: BanditFeedback) -> BanditFeedback:
    rewards = data['context'][:, 2]
    costs = np.array([action.cost for action in ACTIONS])
    data['expected_reward'] = rewards[:, np.newaxis] * data['expected_reward'] - costs[np.newaxis, :]
    return data

In [10]:
def _post_process(data: BanditFeedback) -> BanditFeedback:
    data = _update_context(data)
    data = _update_reward(data)
    data = _update_expected_reward(data)
    return data

In [11]:
def synthesize_data() -> Tuple[BanditFeedback, BanditFeedback]:
    dataset = SyntheticBanditDataset(
        n_actions=len(ACTIONS),
        dim_context=1,  # pokemon_idの元になるfloat値を生成する
        reward_function=_reward_function,
        behavior_policy_function=_behavior_policy,
        random_state=615,
    )
    training_data = _post_process(dataset.obtain_batch_bandit_feedback(n_rounds=50000))
    validation_data = _post_process(dataset.obtain_batch_bandit_feedback(n_rounds=50000))
    test_data = _post_process(dataset.obtain_batch_bandit_feedback(n_rounds=1000))
    return training_data, validation_data, test_data

In [14]:
from typing import List
from dataclasses import dataclass
import math

In [15]:
@dataclass
class Action:
    label: str
    label_en: str
    performance: int
    cost: int
    color: str
    marker: str

In [16]:
ACTIONS: List[Action] = [
    Action(label='逃げる', label_en='RUN AWAY', performance=-math.inf, cost=0, color='black', marker='o'),
    Action(label='モンスターボールを投げる', label_en='THROW MONSTER-BALL', performance=50, cost=100, color='red', marker='s'),
    Action(label='スーパーボールを投げる', label_en='THROW SUPER-BALL', performance=100, cost=500, color='blue', marker='v'),
    Action(label='ハイパーボールを投げる', label_en='THROW HYPER-BALL', performance=200, cost=2000, color='orange', marker='^'),
    Action(label='マスターボールを投げる', label_en='THROW MASTER-BALL', performance=math.inf, cost=10000, color='violet', marker='x'),
]

In [21]:
# ポケモンの捕獲難度と謝礼金を教えてくれるポケモン図鑑クラス
class PokemonZukan:
    def __init__(self) -> None:
        self._data = pd.read_csv('pokemon.csv', index_col='id')

    def get_capture_dificulty(self, pokemon_id: int) -> int:
        return self._data.loc[pokemon_id]['capture_dificulty']

    def get_reward(self, pokemon_id: int) -> int:
        return self._data.loc[pokemon_id]['reward']

    def get_name(self, pokemon_id: int) -> str:
        return self._data.loc[pokemon_id]['name']

In [19]:
def rule_based_policy(pokemon_id: int) -> Tuple:
    pokemon_zukan = PokemonZukan()

    capture_dificulty = pokemon_zukan.get_capture_dificulty(pokemon_id)
    reward = pokemon_zukan.get_reward(pokemon_id)
    name = pokemon_zukan.get_name(pokemon_id)

    if reward >= 10000:
        return 0.1, 0.1, 0.1, 0.2, 0.5

    if reward >= 2000:
        return 0.05, 0.2, 0.3, 0.4, 0.05

    if reward >= 500:
        return 0.1, 0.3, 0.5, 0.05, 0.05

    if reward >= 100:
        return 0.4, 0.4, 0.1, 0.05, 0.05

    return 0.8, 0.05, 0.05, 0.05, 0.05

In [22]:
training_data, validation_data, test_data = synthesize_data()

In [24]:
training_data

{'n_rounds': 50000,
 'n_actions': 5,
 'context': array([[   46,    65,  1296],
        [   91,   195, 20736],
        [   29,    20,  1191],
        ...,
        [   37,    65,  1502],
        [   68,   210, 12155],
        [   40,   205,  5862]]),
 'action_context': array([[1, 0, 0, 0, 0],
        [0, 1, 0, 0, 0],
        [0, 0, 1, 0, 0],
        [0, 0, 0, 1, 0],
        [0, 0, 0, 0, 1]]),
 'action': array([4, 0, 1, ..., 3, 4, 3]),
 'position': None,
 'reward': array([-8704,     0,  1091, ...,  -498,  2155, -2000]),
 'expected_reward': array([[    0.        ,   -99.99960355,   796.        ,  -704.        ,
         -8704.        ],
        [    0.        ,  -100.        ,  -500.        , 18597.21704323,
         10736.        ],
        [    0.        ,  1091.        ,   691.        ,  -809.        ,
         -8809.        ],
        ...,
        [    0.        ,   -99.99954053,  1002.        ,  -498.        ,
         -8498.        ],
        [    0.        ,  -100.        ,  -500.  

In [25]:
validation_data

{'n_rounds': 50000,
 'n_actions': 5,
 'context': array([[  18,  210, 9900],
        [  83,  210, 2763],
        [  26,  180, 9509],
        ...,
        [  10,    0,  366],
        [  57,  180, 8582],
        [  92,   65, 2234]]),
 'action_context': array([[1, 0, 0, 0, 0],
        [0, 1, 0, 0, 0],
        [0, 0, 1, 0, 0],
        [0, 0, 0, 1, 0],
        [0, 0, 0, 0, 1]]),
 'action': array([4, 0, 1, ..., 3, 3, 3]),
 'position': None,
 'reward': array([ -100,     0,  -100, ..., -1634,  6582,   234]),
 'expected_reward': array([[    0.        ,  -100.        ,  -500.        , -1999.5505611 ,
          -100.        ],
        [    0.        ,  -100.        ,  -500.        , -1999.87456569,
         -7237.        ],
        [    0.        ,  -100.        ,  -500.        ,  7508.9999804 ,
          -491.        ],
        ...,
        [    0.        ,   266.        ,  -134.        , -1634.        ,
         -9634.        ],
        [    0.        ,  -100.        ,  -500.        ,  6581.9999

In [26]:
test_data

{'n_rounds': 1000,
 'n_actions': 5,
 'context': array([[   98,    30,  3164],
        [   82,   195,  9509],
        [   26,   180,  9509],
        ...,
        [   40,   205,  5862],
        [  129,     0,   458],
        [  146,   252, 23452]]),
 'action_context': array([[1, 0, 0, 0, 0],
        [0, 1, 0, 0, 0],
        [0, 0, 1, 0, 0],
        [0, 0, 0, 1, 0],
        [0, 0, 0, 0, 1]]),
 'action': array([4, 0, 1, 4, 3, 0, 3, 3, 2, 4, 0, 2, 2, 4, 2, 0, 0, 3, 0, 1, 4, 2,
        2, 2, 0, 4, 3, 4, 3, 1, 0, 3, 2, 2, 2, 3, 1, 2, 2, 4, 0, 1, 1, 4,
        1, 1, 0, 1, 4, 0, 1, 3, 3, 3, 3, 3, 3, 3, 2, 0, 2, 4, 3, 2, 2, 2,
        2, 2, 0, 2, 2, 4, 4, 3, 0, 1, 0, 1, 3, 1, 2, 2, 2, 2, 4, 1, 3, 2,
        1, 3, 0, 0, 4, 3, 3, 2, 4, 3, 3, 3, 3, 2, 1, 4, 2, 4, 2, 3, 3, 4,
        3, 3, 4, 1, 0, 3, 2, 3, 2, 4, 2, 4, 1, 0, 4, 4, 3, 0, 4, 2, 2, 4,
        1, 2, 3, 2, 3, 0, 2, 3, 4, 0, 2, 2, 3, 1, 4, 1, 1, 2, 1, 4, 0, 3,
        3, 4, 2, 3, 3, 4, 4, 0, 2, 2, 4, 1, 2, 1, 3, 3, 0, 2, 2, 2, 1, 3,
     

In [28]:
from abc import ABCMeta, abstractmethod
from typing import List

from obp.policy import IPWLearner, NNPolicyLearner
from obp.types import BanditFeedback
from obp.ope import DirectMethod, InverseProbabilityWeighting as IPS
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC

In [29]:
def deterministic_rule_based_policy(pokemon_id: int) -> Tuple:
    pokemon_zukan = PokemonZukan()

    capture_dificulty = pokemon_zukan.get_capture_dificulty(pokemon_id)
    reward = pokemon_zukan.get_reward(pokemon_id)
    name = pokemon_zukan.get_name(pokemon_id)

    if reward >= 10000:
        return 0, 0, 0, 0, 1

    if reward >= 2000:
        return 0, 0, 0, 1, 0

    if reward >= 500:
        return 0, 0, 1, 0, 0

    if reward >= 100:
        return 0, 1, 0, 0, 0

    return 1, 0, 0, 0, 0

In [30]:
class BaseModel(metaclass=ABCMeta):
    @abstractmethod
    def fit(self, data: BanditFeedback) -> None:
        raise NotImplementedError

    @abstractmethod
    def predict(self, context: np.ndarray) -> List[int]:
        raise NotImplementedError

In [31]:
class IPWModel(BaseModel):
    def __init__(self) -> None:
        self._model = NNPolicyLearner(
            n_actions=len(ACTIONS),
            dim_context=2,
            off_policy_objective=IPS().estimate_policy_value_tensor,
            random_state=615
        )
        self._scaler = StandardScaler()

    def fit(self, data: BanditFeedback) -> None:
        context = data["context"][:, 1:]
        self._scaler.fit(context)
        scaled_context = self._scaler.transform(context)

        self._model.fit(
            context=scaled_context,
            action=data["action"],
            # reward=data["binary_reward"],
            reward=data["reward"],
            pscore=data["pscore"],
        )

    def predict(self, context: np.ndarray) -> np.ndarray:
        scaled_context = self._scaler.transform(context[:, 1:])
        return self._model.predict(context=scaled_context)

In [32]:
class RuleBasedModel(BaseModel):
    def __init__(self) -> None:
        pass

    def fit(self, data: BanditFeedback) -> None:
        # 学習フェーズはない
        pass

    def predict(self, context: np.ndarray) -> np.ndarray:
        pokemon_ids = context[:, 0]
        predictions = []
        for pokemon_id in pokemon_ids:
            probabilities = rule_based_policy(pokemon_id)
            prediction = np.random.multinomial(n=1, pvals=probabilities, size=1)[0]
            predictions.append(prediction)
        return  np.array(predictions)[:, :, np.newaxis]

In [33]:
class DeterministicRuleBasedModel(BaseModel):
    def __init__(self) -> None:
        pass

    def fit(self, data: BanditFeedback) -> None:
        # 学習フェーズはない
        pass

    def predict(self, context: np.ndarray) -> np.ndarray:
        pokemon_ids = context[:, 0]
        predictions = []
        for pokemon_id in pokemon_ids:
            prediction = deterministic_rule_based_policy(pokemon_id)
            predictions.append(np.array(prediction))
        return  np.array(predictions)[:, :, np.newaxis]

In [34]:
models = dict(IPW=IPWModel(), RULE_BASED=DeterministicRuleBasedModel())

AttributeError: 'InverseProbabilityWeighting' object has no attribute 'estimate_policy_value_tensor'