# LinBandit-BO リワード比較実験

このノートブックでは、LinBandit-BOアルゴリズムについて、2つの異なるリワード設計を比較します：

1. **元のリワード設計**: 予測誤差に基づくリワード計算
2. **勾配ベースリワード設計**: GPモデルの勾配の絶対値を使ったリワード計算

## 実験設定
- アーム数: 次元数の0.5倍（20次元なら10本）に固定
- coordinate_ratio = 0.8で固定
- テスト関数: Styblinski-Tang, Rastrigin, Ackley (100次元中先頭5次元が有効)
- 20回の独立実行
- 300回の評価
- 収束性能と方向選択の比較

In [8]:
import math
import os
import pickle
import numpy as np
import matplotlib.pyplot as plt
import japanize_matplotlib
import torch

# BoTorch / GPyTorch
from botorch import fit_gpytorch_model
from botorch.models import SingleTaskGP
from gpytorch.mlls import ExactMarginalLogLikelihood
from gpytorch.kernels import RBFKernel, ScaleKernel
from botorch.acquisition import ExpectedImprovement
from botorch.optim import optimize_acqf

# デフォルトのdtypeをfloat32に設定
torch.set_default_dtype(torch.float32)

# プロット設定
plt.rcParams["figure.dpi"] = 100

import warnings
warnings.filterwarnings("ignore")

## テスト関数群（100次元中先頭5次元が有効）

In [9]:
def styblinski_tang_100d(x, noise_std=1e-5):
    if not torch.is_tensor(x):
        x = torch.tensor(x, dtype=torch.float32)
    x5 = x[..., :5]
    res = 0.5 * torch.sum(x5**4 - 16.0*x5**2 + 5.0*x5, dim=-1)
    return res + torch.randn_like(res) * noise_std

def rastrigin_100d(x, noise_std=1e-5):
    if not torch.is_tensor(x):
        x = torch.tensor(x, dtype=torch.float32)
    x5 = x[..., :5]
    s = torch.sum(x5**2 - 10.0*torch.cos(2*math.pi*x5) + 10.0, dim=-1)
    return s + torch.randn_like(s) * noise_std

def ackley_100d(x, noise_std=1e-5):
    if not torch.is_tensor(x):
        x = torch.tensor(x, dtype=torch.float32)
    x5 = x[..., :5]
    d = 5
    sum_sq = torch.sum(x5**2, dim=-1)
    r = torch.sqrt(sum_sq / d)
    part1 = -20.0 * torch.exp(-0.2 * r)
    part2 = -torch.exp(torch.mean(torch.cos(2.0*math.pi*x5), dim=-1))
    res = part1 + part2 + 20.0 + math.e
    return res + torch.randn_like(res) * noise_std

## LinBandit-BO ベースクラス

In [10]:
class LinBanditBO_Base:
    """LinBandit-BO base class for reward comparison"""
    
    def __init__(self, X, objective_function, bounds, n_initial, n_max, dim,
                 algo_base_name="LinBanditBO_Base", coordinate_ratio=0.8, 
                 run_id=1, output_base_dir="output_results", n_arms=None):
        self.X = X.float()
        self.dim = dim
        self.n_arms = n_arms if n_arms is not None else dim
        self.A = torch.eye(dim)
        self.b = torch.zeros(dim)
        self.objective_function = objective_function
        self.bounds = bounds.float()
        self.n_initial = n_initial
        self.n_max = n_max
        self.Y = None
        self.best_value = None
        self.best_point = None
        self.model = None
        self.eval_history = []
        self.selected_direction_history = []
        self.theta_history = []
        self.coordinate_ratio = coordinate_ratio
        self.scale_init = 1.0
        self.run_id = run_id
        self.total_iterations = 0
        
        # ファイル名設定
        arms_ratio = self.n_arms / self.dim
        self.function_name_with_ratio = f"Arms_{arms_ratio:.1f}x_coord_{self.coordinate_ratio:.1f}"
        self.algo_name_for_run = f"{algo_base_name}_{self.function_name_with_ratio}_run{self.run_id}"
        
        self.output_dir = os.path.join(output_base_dir, algo_base_name, self.function_name_with_ratio)
        os.makedirs(self.output_dir, exist_ok=True)
        
    def update_model(self):
        """ガウス過程モデルの更新"""
        kernel = ScaleKernel(
            RBFKernel(ard_num_dims=self.X.shape[-1], dtype=torch.float32),
            dtype=torch.float32, noise_constraint=1e-3
        ).to(self.X)
        self.model = SingleTaskGP(self.X, self.Y, covar_module=kernel)
        mll = ExactMarginalLogLikelihood(self.model.likelihood, self.model)
        fit_gpytorch_model(mll)
        
    def initialize(self):
        """初期化：初期点での評価とモデル構築"""
        y_val = self.objective_function(self.X)
        self.Y = y_val.unsqueeze(-1).float()
        
        # スケーリング係数の計算
        y_max, y_min = self.Y.max().item(), self.Y.min().item()
        self.scale_init = (y_max - y_min) if (y_max - y_min) != 0 else 1.0
        
        # モデルの初期化
        self.update_model()
        
        # 最良点の初期化
        post_mean = self.model.posterior(self.X).mean.squeeze(-1)
        bi = post_mean.argmin()
        self.best_value = post_mean[bi].item()
        self.best_point = self.X[bi]
        self.eval_history = [self.best_value] * self.n_initial
        
    def generate_arms(self):
        """アーム（方向ベクトル）を生成"""
        num_coord = int(self.coordinate_ratio * self.n_arms)
        num_coord = min(num_coord, self.dim)
        
        # 座標方向の生成
        if num_coord > 0:
            idxs = np.random.choice(self.dim, num_coord, replace=False)
            
            coords = []
            for i in idxs:
                e = torch.zeros(self.dim, device=self.X.device)
                e[i] = 1.0
                coords.append(e)
                
            coord_arms = torch.stack(coords, 0)
        else:
            coord_arms = torch.zeros(0, self.dim, device=self.X.device)
        
        # ランダム方向の生成
        num_rand = self.n_arms - num_coord
        if num_rand > 0:
            rand_arms = torch.randn(num_rand, self.dim, device=self.X.device)
            norms = rand_arms.norm(dim=1, keepdim=True)
            rand_arms = torch.where(norms > 1e-9, rand_arms / norms, 
                                   torch.randn_like(rand_arms) / (torch.randn_like(rand_arms).norm(dim=1,keepdim=True)+1e-9))
        else:
            rand_arms = torch.zeros(0, self.dim, device=self.X.device)
            
        return torch.cat([coord_arms, rand_arms], 0)
    
    def select_arm(self, arms_features):
        """Linear UCBによる方向選択"""
        # LinUCBパラメータ
        sigma = 1.0
        L = 1.0
        lambda_reg = 1.0
        delta = 0.1
        S = 1.0
        
        # 現在のパラメータ推定
        A_inv = torch.inverse(self.A)
        theta = A_inv @ self.b
        self.theta_history.append(theta.clone())
        
        # 信頼幅の計算
        current_round_t = max(1, self.total_iterations)
        log_term_numerator = max(1e-9, 1 + (current_round_t - 1) * L**2 / lambda_reg)
        beta_t = (sigma * math.sqrt(self.dim * math.log(log_term_numerator / delta)) + 
                  math.sqrt(lambda_reg) * S)
        
        # UCBスコアの計算
        ucb_scores = []
        for i in range(arms_features.shape[0]):
            x = arms_features[i].view(-1, 1)
            mean = (theta.view(1, -1) @ x).item()
            try:
                var = (x.t() @ A_inv @ x).item()
            except torch.linalg.LinAlgError:
                var = (x.t() @ torch.linalg.pinv(self.A) @ x).item()
                
            ucb_scores.append(mean + beta_t * math.sqrt(max(var, 0)))
            
        return int(np.argmax(ucb_scores))
    
    def propose_new_x(self, direction):
        """選択された方向に沿った最適化"""
        ei = ExpectedImprovement(self.model, best_f=self.best_value, maximize=False)
        
        # 方向に沿った探索範囲の計算
        active_dims_mask = direction.abs() > 1e-9
        if not active_dims_mask.any():
            lb, ub = -1.0, 1.0
        else:
            ratios_lower = (self.bounds[0] - self.best_point) / (direction + 1e-12 * (~active_dims_mask))
            ratios_upper = (self.bounds[1] - self.best_point) / (direction + 1e-12 * (~active_dims_mask))
            
            t_bounds = torch.zeros(self.dim, 2, device=self.X.device)
            t_bounds[:, 0] = torch.minimum(ratios_lower, ratios_upper)
            t_bounds[:, 1] = torch.maximum(ratios_lower, ratios_upper)
            
            lb = -float('inf')
            ub = float('inf')
            for i in range(self.dim):
                if active_dims_mask[i]:
                    lb = max(lb, t_bounds[i, 0].item())
                    ub = min(ub, t_bounds[i, 1].item())
                    
        if lb > ub:
            domain_width = (self.bounds[1, 0] - self.bounds[0, 0]).item()
            lb = -0.1 * domain_width
            ub = 0.1 * domain_width
            
        one_d_bounds = torch.tensor([[lb], [ub]], dtype=torch.float32, device=self.X.device)
        
        def ei_on_line(t_scalar_tensor):
            t_values = t_scalar_tensor.squeeze(-1)
            points_on_line = self.best_point.unsqueeze(0) + t_values.reshape(-1, 1) * direction.unsqueeze(0)
            points_on_line_clamped = torch.clamp(points_on_line, self.bounds[0].unsqueeze(0), self.bounds[1].unsqueeze(0))
            return ei(points_on_line_clamped.unsqueeze(1))
        
        # 獲得関数の最適化
        cand_t, _ = optimize_acqf(
            ei_on_line,
            bounds=one_d_bounds,
            q=1,
            num_restarts=10,
            raw_samples=100
        )
        
        alpha_star = cand_t.item()
        new_x = self.best_point + alpha_star * direction
        new_x_clamped = torch.clamp(new_x, self.bounds[0], self.bounds[1])
        
        return new_x_clamped
    
    def calculate_reward(self, direction, new_x, predicted_mean, actual_y):
        """リワード計算（サブクラスでオーバーライド）"""
        raise NotImplementedError("Subclass must implement calculate_reward method")
    
    def optimize(self):
        """メインの最適化ループ"""
        self.initialize()
        n_iter = self.n_initial
        
        while n_iter < self.n_max:
            self.total_iterations += 1
            
            # 探索方向の候補生成
            arms_features = self.generate_arms()
            
            # Linear UCBによる方向選択
            sel_idx = self.select_arm(arms_features)
            direction = arms_features[sel_idx]
            self.selected_direction_history.append(direction.clone())
            
            # 選択された方向に沿った最適化
            new_x = self.propose_new_x(direction)
            
            # 予測と実際の評価
            with torch.no_grad():
                predicted_mean = self.model.posterior(new_x.unsqueeze(0)).mean.squeeze().item()
            actual_y = self.objective_function(new_x.unsqueeze(0)).squeeze().item()
            
            # リワード計算（サブクラスで実装）
            reward = self.calculate_reward(direction, new_x, predicted_mean, actual_y)
            
            # Linear Banditパラメータの更新
            x_arm = direction.view(-1, 1)
            self.A += x_arm @ x_arm.t()
            self.b += reward
            
            # データとモデルの更新
            self.X = torch.cat([self.X, new_x.unsqueeze(0)], 0)
            self.Y = torch.cat([self.Y, torch.tensor([[actual_y]], dtype=torch.float32, device=self.X.device)], 0)
            self.update_model()
            
            # 最良点の更新
            with torch.no_grad():
                posterior_mean = self.model.posterior(self.X).mean.squeeze(-1)
            current_best_idx = posterior_mean.argmin()
            self.best_value = posterior_mean[current_best_idx].item()
            self.best_point = self.X[current_best_idx]
            
            self.eval_history.append(self.best_value)
            n_iter += 1
            
        return self.best_point, self.best_value

## LinBandit-BO：元のリワード設計（予測誤差ベース）

In [11]:
class LinBanditBO_Original(LinBanditBO_Base):
    """LinBandit-BO with original reward design (prediction error based)"""
    
    def __init__(self, X, objective_function, bounds, n_initial, n_max, dim,
                 algo_base_name="LinBanditBO_Original", coordinate_ratio=0.8, 
                 run_id=1, output_base_dir="output_results", n_arms=None):
        super().__init__(X, objective_function, bounds, n_initial, n_max, dim,
                         algo_base_name, coordinate_ratio, run_id, output_base_dir, n_arms)
    
    def calculate_reward(self, direction, new_x, predicted_mean, actual_y):
        """元のリワード設計：予測誤差ベース"""
        prediction_error = abs(predicted_mean - actual_y)
        reward = 10.0 * (1.0 - math.exp(-prediction_error / self.scale_init))
        return reward * direction

## LinBandit-BO：勾配ベースリワード設計

In [12]:
class LinBanditBO_Gradient(LinBanditBO_Base):
    """LinBandit-BO with gradient-based reward design"""
    
    def __init__(self, X, objective_function, bounds, n_initial, n_max, dim,
                 algo_base_name="LinBanditBO_Gradient", coordinate_ratio=0.8, 
                 run_id=1, output_base_dir="output_results", n_arms=None):
        super().__init__(X, objective_function, bounds, n_initial, n_max, dim,
                         algo_base_name, coordinate_ratio, run_id, output_base_dir, n_arms)
    
    def calculate_reward(self, direction, new_x, predicted_mean, actual_y):
        """勾配ベースリワード設計"""
        # 勾配計算用の点を準備
        new_x_for_grad = new_x.clone().unsqueeze(0)
        new_x_for_grad.requires_grad_(True)
        
        # GPモデルで事後分布を取得
        posterior = self.model.posterior(new_x_for_grad)
        mean_at_new_x = posterior.mean
        
        # 勾配を計算
        mean_at_new_x.sum().backward()
        grad_vector = new_x_for_grad.grad.squeeze(0)
        
        # 報酬ベクトルを定義（絶対値を取ることで影響の大きさを評価）
        reward_vector = grad_vector.abs()
        
        return reward_vector

## 実験実行

In [13]:
def generate_initial_points(n_initial, dim, bounds):
    return torch.rand(n_initial, dim) * (bounds[1] - bounds[0]) + bounds[0]

In [14]:
if __name__ == "__main__":
    test_funcs = [
        ("StyblinskiTang", styblinski_tang_100d, -195.83),
        ("Rastrigin", rastrigin_100d, 0.0),
        ("Ackley", ackley_100d, 0.0),
    ]
    dim = 20
    bounds = torch.tensor([[-5.0]*dim, [5.0]*dim], dtype=torch.float32)
    n_initial = 5
    n_iter = 300  
    n_runs = 20
    
    # アーム数を次元の0.5倍に設定
    n_arms = dim // 2  # 20次元なら10本

    output_base_dir = "output_results_linbandit_reward_comparison"
    os.makedirs(output_base_dir, exist_ok=True)

    coordinate_ratio = 0.8

    # 全実行で共通の初期点
    initial_points_all_runs = [
        generate_initial_points(n_initial, dim, bounds)
        for _ in range(n_runs)
    ]

    algorithms = [
        ("Original_Reward", LinBanditBO_Original),
        ("Gradient_Reward", LinBanditBO_Gradient)
    ]

    for func_name_short, func_eval, global_opt_val in test_funcs:
        print(f"========== テスト関数実行中: {func_name_short} ==========")

        # 全アルゴリズムの結果を保存
        all_algorithm_results = {}

        for algo_name, algo_class in algorithms:
            print(f"--- {algo_name} アルゴリズム実行中 (アーム数: {n_arms}本) ---")
            
            histories_for_this_algo = []
            dim_sums_for_this_algo = []

            # シンプルなプログレス表示
            for run_idx in range(n_runs):
                print(f"\r  実行中: {run_idx + 1}/{n_runs}", end="", flush=True)
                
                initial_X_for_run = initial_points_all_runs[run_idx].clone().to(dtype=torch.float32)

                optimizer = algo_class(
                    X=initial_X_for_run,
                    objective_function=func_eval,
                    bounds=bounds,
                    n_initial=n_initial,
                    n_max=n_iter,
                    dim=dim,
                    algo_base_name=func_name_short,
                    coordinate_ratio=coordinate_ratio,
                    run_id=run_idx + 1,
                    output_base_dir=output_base_dir,
                    n_arms=n_arms  # アーム数を0.5倍に設定
                )

                _, _ = optimizer.optimize()

                histories_for_this_algo.append(optimizer.eval_history)

                if optimizer.selected_direction_history:
                    directions_tensor = torch.stack(optimizer.selected_direction_history, 0)
                    abs_sum_per_dim = directions_tensor.abs().sum(dim=0).cpu().numpy()
                    dim_sums_for_this_algo.append(abs_sum_per_dim)
                else:
                    dim_sums_for_this_algo.append(np.zeros(dim))
            
            print()  # 改行

            # 収束統計の計算
            eval_histories_np_array = np.array(histories_for_this_algo)
            mean_convergence = eval_histories_np_array.mean(axis=0)
            std_convergence = eval_histories_np_array.std(axis=0)

            if dim_sums_for_this_algo:
                avg_dim_abs_sum = np.mean(np.stack(dim_sums_for_this_algo, 0), axis=0)
            else:
                avg_dim_abs_sum = np.zeros(dim)

            all_algorithm_results[algo_name] = {
                'mean_hist': mean_convergence,
                'std_hist': std_convergence,
                'avg_dim_abs_sum': avg_dim_abs_sum
            }

            # 個別アルゴリズムの方向プロットを保存
            plot_save_dir = os.path.join(output_base_dir, func_name_short, f"{algo_name}_arms_{n_arms}_coord_{coordinate_ratio:.1f}")
            os.makedirs(plot_save_dir, exist_ok=True)
            
            plt.figure(figsize=(10, 6))
            plt.bar(np.arange(dim), avg_dim_abs_sum, alpha=0.7)
            plt.xlabel("次元インデックス", fontsize=12)
            plt.ylabel("方向成分絶対値の平均和", fontsize=12)
            title_str = (f"{func_name_short} - {algo_name} (アーム数: {n_arms}本)\n"
                        f"coord_ratio={coordinate_ratio:.1f}, {n_runs}回実行")
            plt.title(title_str, fontsize=14)
            plt.xticks(np.arange(0, dim, step=max(1, dim//10)), fontsize=10)
            plt.yticks(fontsize=10)
            plt.grid(axis='y', linestyle='--', alpha=0.7)
            plt.tight_layout()
            plt.savefig(os.path.join(plot_save_dir, "average_dimension_abs_sum.png"), dpi=150)
            plt.close()

        # 比較収束プロットの作成
        plt.figure(figsize=(12, 8))
        iters_plot = np.arange(1, n_iter + 1)
        
        colors = ['blue', 'red']
        for i, (algo_name, results) in enumerate(all_algorithm_results.items()):
            plt.plot(iters_plot, results['mean_hist'], 
                    label=f"{algo_name}", color=colors[i], linewidth=2)
            plt.fill_between(iters_plot,
                           results['mean_hist'] - results['std_hist'],
                           results['mean_hist'] + results['std_hist'],
                           alpha=0.2, color=colors[i])

        plt.axhline(global_opt_val, color='green', linestyle='--', label='大域最適値', linewidth=2)
        plt.xlabel("評価回数", fontsize=14)
        plt.ylabel("発見された最良目的値 (平均 ± 標準偏差)", fontsize=14)
        plt.title(f"{func_name_short} - LinBandit-BO リワード比較\n(アーム数: {n_arms}本, coordinate_ratio={coordinate_ratio:.1f})", fontsize=16)
        plt.legend(fontsize=12)
        plt.xticks(fontsize=12)
        plt.yticks(fontsize=12)
        plt.grid(True, linestyle='--', alpha=0.7)
        plt.tight_layout()

        # 比較プロットの保存
        comparison_plot_save_dir = os.path.join(output_base_dir, func_name_short)
        os.makedirs(comparison_plot_save_dir, exist_ok=True)
        plt.savefig(os.path.join(comparison_plot_save_dir, f"{func_name_short}_linbandit_reward_comparison.png"), dpi=150)
        plt.close()

        # 方向比較プロットの作成
        plt.figure(figsize=(12, 5))
        for i, (algo_name, results) in enumerate(all_algorithm_results.items()):
            plt.subplot(1, 2, i+1)
            plt.bar(np.arange(dim), results['avg_dim_abs_sum'], alpha=0.7, color=colors[i])
            plt.xlabel("次元インデックス", fontsize=10)
            plt.ylabel("方向成分絶対値の平均和", fontsize=10)
            plt.title(f"{algo_name}\n{func_name_short} (アーム数: {n_arms}本)", fontsize=12)
            plt.xticks(np.arange(0, dim, step=max(1, dim//5)), fontsize=8)
            plt.yticks(fontsize=8)
            plt.grid(axis='y', linestyle='--', alpha=0.5)
            
        plt.suptitle(f"LinBandit-BO リワード比較 - {func_name_short}", fontsize=14)
        plt.tight_layout()
        plt.savefig(os.path.join(comparison_plot_save_dir, f"{func_name_short}_direction_comparison.png"), dpi=150)
        plt.close()

        # 最終性能の棒グラフ
        plt.figure(figsize=(8, 6))
        algo_names_list = list(all_algorithm_results.keys())
        final_means = [all_algorithm_results[name]['mean_hist'][-1] for name in algo_names_list]
        final_stds = [all_algorithm_results[name]['std_hist'][-1] for name in algo_names_list]
        x_pos = np.arange(len(algo_names_list))
        
        bars = plt.bar(x_pos, final_means, yerr=final_stds, capsize=10, 
                       color=colors[:len(algo_names_list)], alpha=0.7)
        
        plt.axhline(global_opt_val, color='black', linestyle='--', label='大域最適値', linewidth=2)
        plt.xlabel("リワード設計", fontsize=14)
        plt.ylabel("最終的な最良値 (平均 ± 標準偏差)", fontsize=14)
        plt.title(f"{func_name_short} - LinBandit-BO リワード比較\n(アーム数: {n_arms}本)", fontsize=16)
        plt.xticks(x_pos, algo_names_list, fontsize=12)
        plt.yticks(fontsize=12)
        plt.legend(fontsize=12)
        plt.grid(axis='y', linestyle='--', alpha=0.7)
        plt.tight_layout()
        plt.savefig(os.path.join(comparison_plot_save_dir, f"{func_name_short}_final_performance.png"), dpi=150)
        plt.close()

        print(f"========== テスト関数完了: {func_name_short} ==========")

    print("全ての実験が完了しました。")
    print(f"アーム数設定: {n_arms}本 (次元数の0.5倍)")
    print(f"coordinate_ratio: {coordinate_ratio}")

--- Original_Reward アルゴリズム実行中 (アーム数: 10本) ---
  実行中: 2/20

KeyboardInterrupt: 

## 結果の分析と考察

実験完了後、以下の観点から分析を行います：

### 1. リワード設計の違いによる影響
- **元のリワード**: 予測誤差に基づく単一のスカラー値でbanditパラメータを更新
- **勾配ベースリワード**: GPモデルの勾配ベクトルの絶対値で各次元を個別に更新

### 2. 収束性能の比較
- どちらのリワード設計が faster convergence を示すか
- 最終的な最適化性能の差

### 3. 方向選択の特性
- 勾配ベースリワードが有効次元（0-4）により効率的に集中するか
- 各次元の使用頻度の違い

### 4. アーム数設定の影響
- 次元数の0.5倍（10本）のアーム数設定での効果
- 計算効率とのトレードオフ

### 5. テスト関数による違い
- 関数の特性（滑らかさ、多峰性）によるリワード設計の効果の違い
- Styblinski-Tang, Rastrigin, Ackley での性能比較

実験を実行して、これらの観点から結果を確認してください。