In [1]:
import numpy as np

In [2]:
# error display
def error_display(linucb_set, true_set):
    return "different. linucb setting : {0}, true settings : {1}".format(linucb_set, true_set)

In [3]:
# 既存のクラスにインスタンスメソッドを追加する関数
def add_instance_method(Class, method):
    aetattr(Class, method.__name__, method)

In [123]:
# LogisticTSアルゴリズムのクラス
class LogisticTS:
    # コンストラクタ
    # ハイパーパラメータなどを設定する
    # num_theta_max_iter theta の探索を行う反復回数の上限
    # epsilon theta の探索を行う際の収束許容誤差
    # interval_update_theta thetaの更新を行う間隔. この回数だけ反復したら theta, hessian を更新する
    def __init__(self, sigma_0=0.1, num_theta_max_iter=1000, epsilon=0.01, interval_update_theta=20):
        self.sigma_0 = sigma_0
        self.num_theta_max_iter = num_theta_max_iter
        self.epsilon = epsilon
        self.interval_update_theta = interval_update_theta
        
    # 初期化関数
    # 入力：
    # n_arms     引ける腕の数
    # n_features 文脈の次元数
    # sigma      誤差項の分散
    def initialize(self, n_arms, n_features, sigma):
        self.n_arms = n_arms
        self.n_features = n_features
        self.sigma  = sigma
        self.theta  = np.zeros(n_features)
        
        # 現在の反復回数
        self.iter_num = 0
        
        # 引いた腕のcontextリスト
        self.lst_context_history = []
        # 引いた腕による報酬のリスト
        self.lst_reward_history = []
        
        # 乱数生成に必要なヘシアンの逆行列
        self.H_inv = self.calc_hessian_inv(self.theta)
        
        return
        
    # 負の対数事後確率の勾配 G の計算
    # 計算式は『バンディット問題の理論とアルゴリズム』参照
    def calc_gradient(self, theta):
        # 初期値
        ret = []
        ret.append(theta / self.sigma_0)
        
        # 以下、引いた腕によって変わる
        for context in self.lst_context_history:
            exp_theta_T_context = np.exp(theta.dot(context))
            ret.append((exp_theta_T_context*context)/(1 + exp_theta_T_context))
            
        for idx, reward in enumerate(self.lst_reward_history):
            if reward == 1:
                ret.append(self.lst_context_history[idx] * -1)
                
        return np.array(ret).sum(axis=0)
    
    # 負の対数事後確率のヘシアン H の逆行列の計算
    # 計算式は『バンディット問題の理論とアルゴリズム』参照
    def calc_hessian_inv(self, theta):
        ret = []
        ret.append(np.identity(self.n_features)/self.sigma_0)

        for context in self.lst_context_history:
            exp_theta_T_context = np.exp(theta.dot(context))
            ret.append((exp_theta_T_context*(np.matrix(context).T)*context)/(1+exp_theta_T_context)**2)

        return np.linalg.inv(np.array(ret).sum(axis=0))
        
    # theta の係数格納
    def set_theta(self, theta):
        self.theta = theta
        return
        
    # 多変量正規分布からパラメータを作成
    def get_theta_by_normalDistribution(self):
        theta_tild = np.random.multivariate_normal(
            self.theta,
            self.H_inv
        )
        
        return theta_tild
    
    # 腕を選択
    def select_arm(self, context):
        np_context = np.array(context)
        theta_tild = self.get_theta_by_normalDistribution()
        
        score = context.dot(theta_tild)
        return np.argmax(score)
        
    # 得られた報酬のリストに追加
    def add_reward(self, reward):
        self.lst_reward_history.append(reward)
        return
        
    # 選択した腕のcontextをリストに追加
    def add_context(self, context):
        self.lst_context_history.append(context)
        return
    
    # 観測した報酬からtheta更新
    def update_theta(self):
        # 設定した更新間隔に達していなければ、更新しない
        if self.iter_num % self.interval_update_theta != 0:
            return
        
        # thetaが収束するか規定回数を反復するまで計算する
        theta_hat = self.theta
        for i in range(self.num_theta_max_iter):
            theta_before = theta_hat
            G = self.calc_gradient(theta_hat)
            H_inv = self.calc_hessian_inv(theta_hat)
            
            theta_hat = theta_hat - H_inv.dot(G)
            
            # 収束したら終了
            ## 収束しなかったらthetaを更新して再計算
            if np.linalg.norm(theta_hat - theta_before) < self.epsilon:
                break
        # 収束したら更新
        self.set_theta(theta_hat)
        
        return 

In [124]:
# コンストラクタのテスト
def test_LogisticTS_init():
    sigma_0 = 0.2
    num_theta_max_iter=100
    epsilon=0.1
    interval_update_theta=10
    
    model = LogisticTS(sigma_0, num_theta_max_iter, epsilon, interval_update_theta)
    
    # your settings
    assert model.sigma_0 == sigma_0, "sigma_0 " + error_display(model.sigma_0, sigma_0)
    assert model.num_theta_max_iter == num_theta_max_iter, "num_theta_max_iter " + error_display(model.num_theta_max_iter, num_theta_max_iter)
    assert model.epsilon == epsilon, "epsilon " + error_display(model.epsilon, epsilon)
    assert model.interval_update_theta == interval_update_theta, "interval_update_theta " + error_display(model.interval_update_theta, interval_update_theta)
    
    return

In [125]:
test_LogisticTS_init()

In [58]:
# 初期化のテスト
def test_LogisticTS_initialize():
    lu = LogisticTS()
    n_arms = 3
    n_features = 4
    sigma = 0.1
    
    lu.initialize(n_arms, n_features, sigma)
    
    # your settings
    assert lu.n_arms == n_arms, "n_arms " + error_display(lu.n_arms, n_arms)
    assert lu.n_features == n_features, "n_features " + error_display(lu.n_features, n_features)
    assert lu.sigma == sigma, "sigma " + error_display(lu.sigma, sigma)
    
    # initialize
    assert (lu.theta == np.zeros(n_features)).all(), "theta different"
    assert (lu.H_inv == np.linalg.inv(np.identity(lu.n_features)/lu.sigma_0)).all(), "H different"
    
    # check size
    assert lu.theta.shape[0] == n_features, "theta size " + error_display(lu.theta.shape[0], n_features)
    
    # theta はベクトルのはずなので2次元目が存在したらエラー
    try:
        lu.theta.shape[1]
    except:
        print("OK")
    else:
        assert False, "LinUCB.theta has column!"
        
    
    # すべてのテスト完了
    print("Conglatulations!")
    return

In [59]:
test_LogisticTS_initialize()

OK
Conglatulations!


In [60]:
model = LogisticTS()
n_arms = 3
n_features = 2
sigma = 0.1
model.initialize(n_arms, n_features, sigma)
model.H_inv

array([[0.1, 0. ],
       [0. , 0.1]])

In [61]:
# get_theta_by_normalDistribution のテスト
def test_LogisticTS_get_theta_by_normalDistribution():
    # parameter settings
    lu = LogisticTS()
    n_arms = 3
    n_features = 2
    sigma = 0.1
    lu.initialize(n_arms, n_features, sigma)
    
    theta_tild = lu.get_theta_by_normalDistribution()
    
    assert theta_tild.shape[0] == n_features, "n_features " + error_display(len(ucb_score), n_arms)
    
    print(theta_tild)
    
    # すべてのテスト完了
    print("Conglatulations!")
    return

In [62]:
test_LogisticTS_get_theta_by_normalDistribution()

[-0.03843923  0.41232332]
Conglatulations!


In [84]:
# 腕の選択に関するテスト
def test_LogisticTS_select_arm():
    # parameter settings
    lu = LogisticTS()
    n_arms = 3
    n_features = 2
    sigma = 0.1
    lu.initialize(n_arms, n_features, sigma)
    
    theta_tild = lu.get_theta_by_normalDistribution()
    
    context = np.array([[1,2], [4,5], [7,8]])
    selected_arm = lu.select_arm(context)
    
    print(selected_arm)
    
#     assert selected_arm == np.argmax(ucb_score), "selected arm " + error_display(selected_arm, np.argmax(ucb_score))

In [85]:
test_LogisticTS_select_arm()

2


In [104]:
# add_reward に関するテスト
def test_LogisticTS_add_reward():
    # parameter settings
    lu = LogisticTS()
    n_arms = 3
    n_features = 2
    sigma = 0.1
    lu.initialize(n_arms, n_features, sigma)
    
    theta_tild = lu.get_theta_by_normalDistribution()
    
    context = np.array([[1,2], [4,5], [7,8]])
    selected_arm = lu.select_arm(context)
    
    reward_list = [1,2,3]
    lu.add_reward(reward_list[selected_arm])
    
    print(selected_arm)
    assert lu.lst_reward_history[0] == reward_list[selected_arm], "reward_list " + error_display(lu.lst_reward_history[0], reward_list[selected_arm])
    assert len(lu.lst_reward_history) == 1, "reward_list is too long. The length is {0}".format(len(lu.lst_reward_history))

In [105]:
test_LogisticTS_add_reward()

0


In [112]:
# add_context に関するテスト
def test_LogisticTS_add_context():
    # parameter settings
    lu = LogisticTS()
    n_arms = 3
    n_features = 2
    sigma = 0.1
    lu.initialize(n_arms, n_features, sigma)
    
    theta_tild = lu.get_theta_by_normalDistribution()
    
    context = np.array([[1,2], [4,5], [7,8]])
    selected_arm = lu.select_arm(context)
    
    lu.add_context(context[selected_arm])
    context_history = lu.lst_context_history
    
    print(selected_arm)
    assert (context_history[0] == context[selected_arm]).all(), "context_history " + error_display(lu.lst_reward_history[0], reward_list[selected_arm])
    assert len(context_history) == 1, "context_history is too long. The length is {0}".format(len(context_history))

In [113]:
test_LogisticTS_add_context()

2


In [100]:
# update_theta に関するテスト
def test_LogisticTS_update_theta():
    # parameter settings
    lu = LogisticTS()
    n_arms = 3
    n_features = 2
    sigma = 0.1
    lu.initialize(n_arms, n_features, sigma)
    
    theta_tild = lu.get_theta_by_normalDistribution()
    
    context = np.array([[1,2], [4,5], [7,8]])
    selected_arm = lu.select_arm(context)
    
    reward_list = [1,2,3]
    lu.add_reward(reward_list[selected_arm])
    lu.add_context(context[selected_arm])
    
    lu.update_theta()
    
    assert (lu.theta != np.zeros(n_features)).all(), "theta is not changed."
    print(lu.theta)

In [101]:
test_LogisticTS_update_theta()

[-0.10644978 -0.12165689]
