In [519]:
from IPython.utils import io
import os
import subprocess
import tqdm.notebook

TQDM_BAR_FORMAT = '{l_bar}{bar}| {n_fmt}/{total_fmt} [elapsed: {elapsed} remaining: {remaining}]'


In [520]:
import torch
from botorch import fit_gpytorch_model
from botorch.acquisition import UpperConfidenceBound
from botorch.models import SingleTaskGP
from botorch.optim import optimize_acqf
from gpytorch.mlls import ExactMarginalLogLikelihood
from gpytorch.kernels import RBFKernel, ScaleKernel
import numpy as np
import pandas as pd
from scipy.special import softmax
import matplotlib.pyplot as plt
import seaborn as sns
from botorch.models import SaasFullyBayesianSingleTaskGP
from gpytorch.kernels import MaternKernel, ScaleKernel
from botorch.acquisition import ExpectedImprovement
from gpytorch import constraints

# 目的関数の用意 (Rosenbrock関数)

In [521]:
def styblinski_tang(x):
    indices = [0, 1, 2, 3, 4]
    x_selected = x[..., indices]
    return 0.5 * torch.sum(x_selected ** 4 - 16 * x_selected ** 2 + 5 * x_selected, dim=-1)

In [522]:
# styblinski_tang関数の最適解
global_optimum = -39.16599 * 5

# 初期点の生成関数

In [523]:
def generate_initial_points(n_initial, dim, bounds):
    return torch.rand(n_initial, dim) * (bounds[1] - bounds[0]) + bounds[0]

# モデル作成関数


In [524]:
def create_model(train_X, train_Y):
    kernel = ScaleKernel(RBFKernel(ard_num_dims=train_X.shape[-1], noise_constraint=1e-5))
    model = SingleTaskGP(train_X, train_Y, covar_module=kernel)
    return model

# ドロップアウトベイズクラス

In [525]:
class DropoutMixBO:
    def __init__(self, dim, active_dim, bounds, n_initial, obj_function, dropout_prob=0.1):

        self.dim = dim  # 全体の次元数
        self.active_dim = active_dim  # 活性化する次元数
        self.bounds = bounds  # 各次元の探索範囲
        self.obj_function = obj_function  # 最適化したい目的関数
        self.dropout_prob = dropout_prob  # ドロップアウトの確率
        self.X = generate_initial_points(n_initial, dim, bounds)  # 初期点を生成
        self.Y = self.obj_function(self.X)  # 初期点での目的関数の値を計算
        self.best_f = self.Y.min().item()  # 現在の最良の目的関数値
        self.best_x = self.X[self.Y.argmin()]  # 現在の最良の解
        self.eval_history = [self.best_f] * n_initial  # 評価履歴を初期化

    def optimize(self, n_iter):
        for _ in range(n_iter):  # 指定された回数だけ最適化を繰り返す
            # 全次元からランダムにactive_dim個選ぶ
            active_dims = np.random.choice(self.dim, self.active_dim, replace=False)

            train_X = self.X[:, active_dims].float()  # 選んだ次元のデータだけ抽出
            train_Y = self.Y.unsqueeze(-1).float()  # Yの形状を調整
            
            model = create_model(train_X, train_Y)  # GPモデルを作成
            mll = ExactMarginalLogLikelihood(model.likelihood, model)  # 尤度関数を定義
            fit_gpytorch_model(mll)  # モデルを学習

            EI = ExpectedImprovement(model, best_f=self.best_f, maximize=False)  # Expected Improvementを定義
            bounds_active = torch.stack([self.bounds[0][active_dims], self.bounds[1][active_dims]]).float()  # 活性化次元の探索範囲を定義
            
            candidate, _ = optimize_acqf(  # 獲得関数を最適化して次の候補点を見つける
                EI, bounds=bounds_active, q=1, num_restarts=10, raw_samples=100,
            )

            x_new = torch.zeros(self.dim)  # 新しい候補点を初期化
            if np.random.random() < self.dropout_prob:  # ドロップアウトを適用するかどうか決める
                x_new[active_dims] = candidate.squeeze()  # 活性化次元に候補点の値を設定
                inactive_dims = np.setdiff1d(range(self.dim), active_dims)  # 非活性化次元を特定
                x_new[inactive_dims] = (torch.rand(len(inactive_dims))  # 非活性化次元にランダムな値を設定
                                        * (self.bounds[1][inactive_dims] - self.bounds[0][inactive_dims])
                                        + self.bounds[0][inactive_dims])
            else:
                x_new[active_dims] = candidate.squeeze()  # 活性化次元に候補点の値を設定
                x_new[np.setdiff1d(range(self.dim), active_dims)] = self.best_x[  # 非活性化次元に最良解の値を設定
                    np.setdiff1d(range(self.dim), active_dims)]

            y_new = self.obj_function(x_new.unsqueeze(0))  # 新しい候補点での目的関数値を計算

            self.X = torch.cat([self.X, x_new.unsqueeze(0)])  # データセットに新しい点を追加
            self.Y = torch.cat([self.Y, y_new])  # 目的関数値のリストに新しい値を追加

            if y_new < self.best_f:  # もし新しい点が今までの最良値より良ければ
                self.best_f = y_new.item()  # 最良値を更新
                self.best_x = x_new  # 最良解を更新

            self.eval_history.append(self.best_f)  # 評価履歴に現在の最良値を追加

        return self.best_x, self.best_f  # 最適化が終わったら最良解と最良値を返す

# fullバンディットアルゴリズムを取り入れたDropoutMixBOクラス

In [526]:
class DropoutMixEST1BO:
    def __init__(self, dim, active_dim, bounds, n_initial, obj_function, dropout_prob=0.1, delta=0.1):
        self.dim = dim  # 全次元数
        self.active_dim = active_dim  # 活性化する次元数
        self.bounds = bounds  # 各次元の探索範囲
        self.obj_function = obj_function  # 最適化したい目的関数
        self.dropout_prob = dropout_prob  # ドロップアウトの確率
        self.delta = delta  # 信頼度
        self.X = generate_initial_points(n_initial, dim, bounds).float()  # 初期点を生成
        self.Y = self.obj_function(self.X).float()  # 初期点での目的関数の値を計算
        self.best_f = self.Y.min().item()  # 現在の最良の目的関数値
        self.best_x = self.X[self.Y.argmin()]  # 現在の最良の解
        self.eval_history = [self.best_f] * n_initial  # 評価履歴を初期化
        self.improvement_history = [] # 報酬の履歴を初期化
        self.iteration = 0  # 現在のイテレーション

        # CSARアルゴリズムの初期化
        self.N = list(range(self.dim))  # 全次元の集合
        self.accepted_dims = []  # 受理された次元
        self.rejected_dims = []  # 除外された次元
        self.remaining_dims = self.N.copy()  # 残りの次元
        self.theta_hat = np.zeros(self.dim)  # 各次元の推定報酬
        self.phase = 1  # 現在のフェーズ
        self.epsilon_t = 0.5  # 初期の精度レベル
        self.delta_t = (6 / np.pi ** 2) * self.delta  # 初期の信頼度レベル

    def EST1(self, N_t, k, epsilon_t, delta_t):
        n = len(N_t)
        # サンプル数を計算（m は 1 としていますが、必要に応じて調整してください）
        m = 1
        # N_t をサイズ 2k の部分集合に分割
        num_subsets = int(np.ceil(n / (2 * k)))
        subsets = []
        for i in range(num_subsets):
            subset = N_t[i * 2 * k:(i + 1) * 2 * k]
            if len(subset) < 2 * k:
                # 次元が足りない場合は繰り返しで埋める
                subset += subset[:(2 * k - len(subset))]
            subsets.append(subset)

        # 推定報酬と出現回数を初期化
        theta_hat = np.zeros(n)
        counts = np.zeros(n)

        # 各部分集合について推定を行う
        for subset in subsets:
            # サイズ 2k のハダマード行列を作成
            H = self.create_hadamard(2 * k)
            Z_hat = np.zeros(2 * k)

            # subset 内の次元を N_t のインデックスにマッピング
            subset_indices = [N_t.index(dim) for dim in subset]

            # ハダマード行列に従って部分集合をサンプリング
            for i in range(2 * k):
                h_row = H[i]
                if i == 0:
                    pos_dims = subset[:k]
                    neg_dims = subset[k:2 * k]
                else:
                    pos_dims = [subset[j] for j in range(2 * k) if h_row[j] == 1]
                    neg_dims = [subset[j] for j in range(2 * k) if h_row[j] == -1]

                pos_samples = []
                neg_samples = []

                for l in range(m):
                    # 正の次元の報酬をサンプリング
                    if len(pos_dims) > 0:
                        pos_sample = self.predict_without_x(pos_dims)
                        pos_samples.append(pos_sample)
                    else:
                        # pos_dims が空の場合、デフォルト値を使用（例として0を使用）
                        pos_samples.append(0)

                    # 負の次元の報酬をサンプリング
                    if len(neg_dims) > 0:
                        neg_sample = self.predict_without_x(neg_dims)
                        neg_samples.append(neg_sample)
                    else:
                        # neg_dims が空の場合、デフォルト値を使用（例として0を使用）
                        neg_samples.append(0)

                # 正の次元と負の次元の報酬の平均を計算
                mu_pos = np.mean(pos_samples) if len(pos_samples) > 0 else 0
                mu_neg = np.mean(neg_samples) if len(neg_samples) > 0 else 0

                if i == 0:
                    Z_hat[i] = mu_pos + mu_neg
                else:
                    Z_hat[i] = mu_pos - mu_neg

            # ハダマード行列を用いて次元ごとの報酬を推定
            theta_subset = (1 / (2 * k)) * H.T @ Z_hat

            # 推定された報酬を theta_hat に反映し、出現回数を更新
            for idx, dim_idx in enumerate(subset_indices):
                theta_hat[dim_idx] += theta_subset[idx]
                counts[dim_idx] += 1

        # 各次元の推定報酬の平均を計算
        theta_hat = theta_hat / counts

        return theta_hat

    def create_hadamard(self, n):
        # サイズnのハダマード行列を作成（nは2の累乗）
        assert n & (n - 1) == 0, "ハダマード行列のサイズは2の累乗である必要があります"
        H = np.array([[1]])
        while H.shape[0] < n:
            H = np.block([[H, H], [H, -H]])
        return H

    def predict(self, X, active_dims):
        # GPモデルを使用して予測を行う
        train_X = self.X[:, active_dims].float()
        train_Y = self.Y.unsqueeze(-1).float()
        model = create_model(train_X, train_Y)
        mll = ExactMarginalLogLikelihood(model.likelihood, model)
        fit_gpytorch_model(mll)
        model.eval()
        self.iteration += 1
        print(self.iteration)
        with torch.no_grad():
            y_pred = model(X.float()).mean.item()
        return y_pred

    def predict_without_x(self, active_dims):
        # GPモデルを使用して予測を行う
        train_X = self.X[:, active_dims].float()
        train_Y = self.Y.unsqueeze(-1).float()
        model = create_model(train_X, train_Y)
        mll = ExactMarginalLogLikelihood(model.likelihood, model)
        fit_gpytorch_model(mll)
        model.eval()
        self.iteration += 1
        print(self.iteration)

        # Expected Improvement (EI) 獲得関数の定義
        EI = ExpectedImprovement(model, best_f=self.best_f, maximize=False)
        bounds_active = torch.stack([self.bounds[0][active_dims], self.bounds[1][active_dims]]).float()

        # 獲得関数の最適化
        candidate, _ = optimize_acqf(
            EI, bounds=bounds_active, q=1, num_restarts=10, raw_samples=100,
            options={"maxiter": 200, "batch_limit": 5}
        )

        # 新しい候補点を構築
        x_new = torch.zeros(self.dim, dtype=torch.float32)
        x_new[active_dims] = candidate.squeeze()
        x_new[np.setdiff1d(range(self.dim), active_dims)] = self.best_x[
            np.setdiff1d(range(self.dim), active_dims)]

        with torch.no_grad():
            y_pred = model(x_new[active_dims].unsqueeze(0).float()).mean.item()
        return y_pred

        # 目的関数の評価
        y_new = self.obj_function(x_new.unsqueeze(0))
        if isinstance(y_new, torch.Tensor):
            y_new = y_new.item()

        improvement = np.exp(-((y_pred - y_new) ** 2))
        self.improvement_history.append(improvement)

        # 活性化次元の推定報酬を更新
        for dim in active_dims:
            self.theta_hat[dim] = (self.theta_hat[dim] + improvement) / 2  # 平均を取る

        # データセットに新しい点を追加
        self.X = torch.cat([self.X, x_new.unsqueeze(0)])
        self.Y = torch.cat([self.Y, y_new])

        # 最良の解を更新
        if y_new < self.best_f:
            self.best_f = y_new.item()
            self.best_x = x_new

        self.eval_history.append(self.best_f)

        return improvement

    def optimize(self, n_iter):
        while self.iteration < n_iter:
            # CSARアルゴリズムの実行
            while len(self.remaining_dims) + len(self.accepted_dims) > self.active_dim:
                # 推定アルゴリズムEST1を使用して報酬を推定
                theta_hat_t = self.EST1(self.remaining_dims, self.active_dim, self.epsilon_t, self.delta_t)
                # 推定報酬に基づいて次元をソート
                sorted_indices = np.argsort(-theta_hat_t)
                sorted_dims = [self.remaining_dims[i] for i in sorted_indices]

                # 受理および除外する次元を決定
                theta_k = theta_hat_t[sorted_indices[self.active_dim - 1]]
                theta_k_plus_1 = theta_hat_t[sorted_indices[self.active_dim]] if len(sorted_dims) > self.active_dim else -np.inf

                # A = [sorted_dims[i] for i in range(len(sorted_dims)) if theta_hat_t[sorted_indices[i]] - theta_k_plus_1 > 2 * self.epsilon_t]
                # R = [sorted_dims[i] for i in range(len(sorted_dims)) if theta_k - theta_hat_t[sorted_indices[i]] > 2 * self.epsilon_t]

                # m = 1 のときのepsilon_t
                epsilon_m1 = np.sqrt(2 * np.log(len(self.remaining_dims) / self.delta_t) )

                A = []
                R = [sorted_dims[i] for i in range(len(sorted_dims)) if theta_k - theta_hat_t[sorted_indices[i]] > 2 * epsilon_m1]

                #self.accepted_dims.extend(A)
                self.rejected_dims.extend(R)
                self.remaining_dims = [dim for dim in self.remaining_dims if dim not in A and dim not in R]

                # 精度と信頼度を更新
                self.phase += 1
                self.epsilon_t /= 2
                self.delta_t = (6 / (np.pi ** 2)) * (self.delta / (self.phase ** 2))

                # 必要な次元数が揃ったらループを抜ける
                if len(self.accepted_dims) >= self.active_dim:
                    break

            # 活性化次元を決定
            if len(self.accepted_dims) >= self.active_dim:
                active_dims = self.accepted_dims[:self.active_dim]
            else:
                active_dims = self.accepted_dims + self.remaining_dims[:(self.active_dim - len(self.accepted_dims))]

            active_dims = active_dims[:self.active_dim]  # 必要に応じて調整

            # 次のイテレーションのためにリセット
            self.remaining_dims = self.N.copy()
            self.accepted_dims = []
            self.rejected_dims = []
            self.phase = 1
            self.epsilon_t = 0.5
            self.delta_t = (6 / np.pi ** 2) * self.delta

        return self.best_x, self.best_f

# そのままベイズ最適化クラス

In [527]:
class BasicBO:
    def __init__(self, dim, bounds, n_initial, obj_function):
        self.dim = dim  # 次元数
        self.bounds = bounds  # 各次元の探索範囲
        self.obj_function = obj_function  # 最適化したい目的関数
        self.X = generate_initial_points(n_initial, dim, bounds).float()  # 初期点を生成
        self.Y = self.obj_function(self.X).float()  # 初期点での目的関数の値を計算
        self.best_f = self.Y.min().item()  # 現在の最良の目的関数値
        self.best_x = self.X[self.Y.argmin()]  # 現在の最良の解
        self.eval_history = [self.best_f] * n_initial  # 評価履歴を初期化

    def optimize(self, n_iter):
        for _ in range(n_iter):  # 指定された回数だけ最適化を繰り返す
            train_X = self.X  # すべての次元のデータを使用
            train_Y = self.Y.unsqueeze(-1)  # Yの形状を調整
            
            model = create_model(train_X, train_Y)  # GPモデルを作成
            mll = ExactMarginalLogLikelihood(model.likelihood, model)  # 尤度関数を定義
            fit_gpytorch_model(mll)  # モデルを学習

            EI = ExpectedImprovement(model, best_f=self.best_f, maximize=False)  # Expected Improvementを定義
            bounds = torch.stack([self.bounds[0], self.bounds[1]]).float()  # 探索範囲を定義
            
            candidate, _ = optimize_acqf(  # 獲得関数を最適化して次の候補点を見つける
                EI, bounds=bounds, q=1, num_restarts=10, raw_samples=100,
            )

            x_new = candidate.squeeze()  # 新しい候補点

            y_new = self.obj_function(x_new.unsqueeze(0))  # 新しい候補点での目的関数値を計算

            self.X = torch.cat([self.X, x_new.unsqueeze(0)])  # データセットに新しい点を追加
            self.Y = torch.cat([self.Y, y_new])  # 目的関数値のリストに新しい値を追加

            if y_new < self.best_f:  # もし新しい点が今までの最良値より良ければ
                self.best_f = y_new.item()  # 最良値を更新
                self.best_x = x_new  # 最良解を更新

            self.eval_history.append(self.best_f)  # 評価履歴に現在の最良値を追加

        return self.best_x, self.best_f  # 最適化が終わったら最良解と最良値を返す

# REMBO

In [528]:
class REMBO:
    def __init__(self, high_dim, low_dim, bounds, n_initial, obj_function):
        assert high_dim >= low_dim, "high_dim must be greater than or equal to low_dim"

        self.high_dim = high_dim
        self.low_dim = low_dim
        self.bounds = bounds
        self.obj_function = obj_function
        
        # すべてのテンソルをdouble型に変更
        self.A = torch.randn(high_dim, low_dim, dtype=torch.double)
        
        self.X_low = (torch.randn(n_initial, low_dim, dtype=torch.double) * 2 - 1)
        
        self.X_high = torch.clamp(self.X_low @ self.A.t(), bounds[0], bounds[1])
        assert self.X_high.shape == (n_initial, high_dim), f"Expected shape {(n_initial, high_dim)}, but got {self.X_high.shape}"
        
        self.Y = self.obj_function(self.X_high)
        
        self.best_f = self.Y.min().item()
        self.best_x = self.X_high[self.Y.argmin()]
        self.eval_history = [self.best_f] * n_initial

    def optimize(self, n_iter):
        for _ in range(n_iter):
            train_X_low = self.X_low
            train_Y = self.Y.unsqueeze(-1)
            model = SingleTaskGP(train_X_low, train_Y)
            mll = ExactMarginalLogLikelihood(model.likelihood, model)
            fit_gpytorch_model(mll)

            UCB = UpperConfidenceBound(model, beta=0.1)
            
            bounds_low = torch.stack([torch.ones(self.low_dim, dtype=torch.double) * -1, torch.ones(self.low_dim, dtype=torch.double)])
            candidate_low, _ = optimize_acqf(
                UCB, bounds=bounds_low, q=1, num_restarts=5, raw_samples=20,
            )

            x_high = torch.clamp(candidate_low @ self.A.t(), self.bounds[0], self.bounds[1])
            y_new = self.obj_function(x_high)

            self.X_low = torch.cat([self.X_low, candidate_low])
            self.X_high = torch.cat([self.X_high, x_high])
            self.Y = torch.cat([self.Y, y_new])

            if y_new < self.best_f:
                self.best_f = y_new.item()
                self.best_x = x_high.squeeze()

            self.eval_history.append(self.best_f)

        return self.best_x, self.best_f

# バンディットアルゴリズムを取り入れたDropoutMixBOクラス

In [529]:
class DropoutMixBO_BC:
    def __init__(self, dim, active_dim, bounds, n_initial, obj_function, dropout_prob=0.0, epsilon=0.1,
                 temperature=1e-3, reset_interval=1000, learning_rate=0.005, initial_beta=2.0, annealing_rate=1000):
        # クラスの初期化
        self.dim = dim
        self.active_dim = active_dim
        self.bounds = bounds
        self.dropout_prob = dropout_prob
        self.obj_function = obj_function
        self.epsilon = epsilon
        self.temperature = temperature
        self.reset_interval = reset_interval
        self.iteration = 0
        self.learning_rate = learning_rate
        self.initial_beta = initial_beta
        self.annealing_rate = annealing_rate

        # 初期点の生成と評価
        initial_X = generate_initial_points(n_initial, dim, bounds)
        initial_Y = obj_function(initial_X)

        self.X = initial_X.double()
        self.Y = initial_Y.double()

        self.best_f = self.Y.min().item()
        self.best_x = self.X[self.Y.argmin()]
        self.eval_history = [self.best_f] * n_initial
        self.improvement_history = []

        self.arm_rewards = np.zeros(dim)
        self.arm_counts = np.zeros(dim)
        self.total_pulls = 0
        self.dim_sensitivity = np.zeros(dim)

        self.arm_selection_history = []


    def select_active_dims(self):
        # 活性化する次元を選択
        self.iteration += 1

        # UCBスコアに基づいて選択
        ucb_scores = self.calculate_ucb_scores()

        # ソフトマックスの適用
        probabilities = softmax(ucb_scores / self.temperature)
        probabilities = np.nan_to_num(probabilities, nan=1.0 / self.dim, posinf=1.0, neginf=0.0)
        probabilities = np.clip(probabilities, 1e-10, 1)
        probabilities /= probabilities.sum()

        selected_arms = np.random.choice(self.dim, self.active_dim, replace=False, p=probabilities)

        # 選択された次元を記録
        arm_selection = np.zeros(self.dim)
        arm_selection[selected_arms] = 1
        self.arm_selection_history.append(arm_selection)

        return selected_arms

    def calculate_ucb_scores(self):
        # UCBスコアを計算
        exploration_term = np.sqrt(2 * np.log(self.total_pulls + 1) / (self.arm_counts + 1e-5))
        exploitation_term = self.arm_rewards / (self.arm_counts + 1e-5)

        # アニーリングによるβの調整
        beta = self.initial_beta * np.exp(-self.iteration / self.annealing_rate)

        ucb_scores = exploitation_term + beta * exploration_term
        return ucb_scores 

    def calculate_dimension_sensitivity(self, new_x, new_y):
        # new_y を Tensor に変換
        if not isinstance(new_y, torch.Tensor):
            new_y = torch.tensor([new_y], dtype=torch.double)
        # 新しいデータポイントを追加
        X_new = torch.cat([self.X, new_x.unsqueeze(0)], dim=0)
        Y_new = torch.cat([self.Y, new_y])

        self.X = X_new
        self.Y = Y_new

        # NumPy配列に変換
        X_np = self.X.cpu().numpy()
        Y_np = self.Y.cpu().numpy()

        sensitivities = np.zeros(self.dim)
        for i in range(self.dim):
            sorted_indices = np.argsort(X_np[:, i])
            sorted_x = X_np[sorted_indices, i]
            sorted_y = Y_np[sorted_indices]
            dx = np.diff(sorted_x)
            dy = np.diff(sorted_y)
            nonzero_dx = dx != 0
            diffs = np.zeros_like(dx)
            diffs[nonzero_dx] = dy[nonzero_dx] / dx[nonzero_dx]
            sensitivities[i] = np.mean(np.abs(diffs))

        total_sensitivity = np.sum(sensitivities) + 1e-10
        new_sensitivity = sensitivities / total_sensitivity

        # 指数移動平均で感度を更新
        alpha = 0.1
        self.dim_sensitivity = alpha * new_sensitivity + (1 - alpha) * self.dim_sensitivity

    def update_bandit(self, selected_dims, y_new, y_pred):
        # バンディットの更新
        improvement = np.exp(-((y_pred - y_new) ** 2))
        self.improvement_history.append(improvement)

        self.total_pulls += 1
        for dim in selected_dims:
            self.arm_counts[dim] += 1
            arm_contribution = improvement * self.dim_sensitivity[dim] / (
                    sum(self.dim_sensitivity[selected_dims]) + 1e-10)
            self.arm_rewards[dim] += arm_contribution


    def optimize(self, n_iter):
        # 最適化のメインループ
        for _ in range(n_iter):
            # 活性化する次元を選択
            active_dims = self.select_active_dims()
    
            # モデルの学習データを準備
            train_X = self.X[:, active_dims]
            train_Y = self.Y.unsqueeze(-1)
    
            # ガウス過程モデルの作成とフィッティング
            model = create_model(train_X, train_Y)
            mll = ExactMarginalLogLikelihood(model.likelihood, model)
            fit_gpytorch_model(mll)
    
            # Expected Improvement (EI) 獲得関数の定義
            EI = ExpectedImprovement(model, best_f=self.best_f, maximize=False)
            bounds_active = torch.stack([self.bounds[0][active_dims], self.bounds[1][active_dims]]).double()
    
            # 獲得関数の最適化
            candidate, _ = optimize_acqf(
                EI, bounds=bounds_active, q=1, num_restarts=10, raw_samples=100,
                options={"maxiter": 200, "batch_limit": 5}
            )
    
            # 新しい候補点を構築
            x_new = torch.zeros(self.dim, dtype=torch.double)
            x_new[active_dims] = candidate.squeeze()
            x_new[np.setdiff1d(range(self.dim), active_dims)] = self.best_x[
                np.setdiff1d(range(self.dim), active_dims)]
    
            # ガウス過程モデルによる予測値の計算
            with torch.no_grad():
                y_pred = model(x_new[active_dims].unsqueeze(0)).mean.item()
    
            # 目的関数の評価
            y_new = self.obj_function(x_new.unsqueeze(0))
            if isinstance(y_new, torch.Tensor):
                y_new = y_new.item()
    
            # 感度の更新
            self.calculate_dimension_sensitivity(x_new, y_new)
    
            # バンディットの更新
            self.update_bandit(active_dims, y_new, y_pred)
    
            # 最良値の更新
            if y_new < self.best_f:  # item() を取り除く
                self.best_f = y_new
                self.best_x = x_new
    
            # 評価履歴の更新
            self.eval_history.append(self.best_f)
    
        # 次元選択の履歴をDataFrameに変換
        self.arm_selection_df = pd.DataFrame(self.arm_selection_history,
                                             columns=[f'Arm_{i}' for i in range(self.dim)])
        self.arm_selection_df.index.name = 'Iteration'
    
        return self.best_x, self.best_f


    # 結果の保存と可視化
    def save_arm_selection_history(self, filename):
        self.arm_selection_df.to_csv(filename)


    def plot_dim_sensitivity(self):
        plt.figure(figsize=(12, 6))
        plt.bar(range(self.dim), self.dim_sensitivity)
        plt.xlabel('Dimension')
        plt.ylabel('Sensitivity')
        plt.title('Dimension Sensitivity')
        plt.xticks(range(self.dim))
        plt.grid(True)
        plt.show()

# 最適化の実行


In [530]:
dim = 10
active_dim = 2
bounds = torch.tensor([[-5.0] * dim, [5.0] * dim])
n_initial = 200
n_iter = 1000

In [531]:
dropout_bo_mix = DropoutMixBO(dim, active_dim, bounds, n_initial, styblinski_tang, dropout_prob=0.1)
dropout_bo_copy = DropoutMixBO(dim, active_dim, bounds, n_initial, styblinski_tang, dropout_prob=0.0)
dropout_bo_random = DropoutMixBO(dim, active_dim, bounds, n_initial, styblinski_tang, dropout_prob=1.0)
dropout_bandit_bc = DropoutMixBO_BC(dim, active_dim, bounds, n_initial, styblinski_tang, dropout_prob=0.0)
dropout_bandit_est1 = DropoutMixEST1BO(dim, active_dim, bounds, n_initial, styblinski_tang, dropout_prob=0.0)
basic_bo = BasicBO(dim, bounds, n_initial, styblinski_tang)
rembo = REMBO(dim, active_dim, bounds, n_initial, styblinski_tang)

In [532]:
try:
  with tqdm.notebook.tqdm(total=100, bar_format=TQDM_BAR_FORMAT) as pbar:
    with io.capture_output() as captured:
      dropout_mix_best_x , dropout_mix_best_f = dropout_bo_mix.optimize(n_iter)
      pbar.update(20)
      dropout_copy_best_x , dropout_copy_best_f = dropout_bo_copy.optimize(n_iter)
      pbar.update(20)
      dropout_random_best_x , dropout_random_best_f = dropout_bo_random.optimize(n_iter)
      pbar.update(10)
      rembo_best_x, rembo_best_f = rembo.optimize(n_iter)
      pbar.update(10)
      dropout_bandit_est1_best_x, dropout_bandit_est1_best_f = dropout_bandit_est1.optimize(n_iter)
      pbar.update(20) 
      basic_bo_best_x, basic_bo_best_f = basic_bo.optimize(n_iter)
      pbar.update(10)
      dropout_bandit_bc_ucb_best_x, dropout_bandit_bc_ucb_best_f = dropout_bandit_bc.optimize(n_iter)
      dropout_bandit_bc.save_arm_selection_history('dropout_bandit_bc_ucb_arm_selection_binary.csv')
      pbar.update(10)
    
except subprocess.CalledProcessError:
  print(captured)
  raise

  0%|          | 0/100 [elapsed: 00:00 remaining: ?]

KeyboardInterrupt: 

## アームの履歴 BC_UCBのバンディット

In [None]:
# Load the saved CSV file
df = pd.read_csv("dropout_bandit_bc_ucb_arm_selection_binary.csv", index_col="Iteration")

# Display the first few rows of the DataFrame
print(df.head())

# Calculate arm selection frequency
arm_freq = df.sum().sort_values(ascending=False)

# Create a bar plot of arm selection frequency
plt.figure(figsize=(12, 6))
sns.barplot(x=arm_freq.index, y=arm_freq.values)
plt.title("Arm Selection Frequency (BC-UCB)")
plt.xlabel("Arm")
plt.ylabel("Frequency")
plt.xticks(rotation=90)
plt.tight_layout()
plt.show()

# Create a heatmap of arm selection over iterations
plt.figure(figsize=(12, 8))
sns.heatmap(df.T, cmap="YlOrRd", cbar_kws={'label': 'Selected'})
plt.title("Arm Selection Over Iterations (BC-UCB)")
plt.xlabel("Iteration")
plt.ylabel("Arm")
plt.tight_layout()
plt.show()

## Dropout_Banditのimprovementのプロット

In [None]:
plt.figure(figsize=(10, 6))
plt.plot(range(len(dropout_bandit_bc.improvement_history)), dropout_bandit_bc.improvement_history, label='Dropout-Bandit-BC-UCB')
plt.xlabel('Iteration')
plt.ylabel('Improvement')
plt.title('Improvement vs Iteration')
plt.legend()
plt.grid(True)
plt.show()

## Dropout_Banditの次元重要度のプロット

In [None]:
dropout_bandit_bc.plot_dim_sensitivity()

# 結果のプロット

In [None]:
plt.figure(figsize=(12, 8))
plt.plot(range(1, n_initial + n_iter + 1), dropout_bo_mix.eval_history, label='Dropout-Mix BO')
plt.plot(range(1, n_initial + n_iter + 1), dropout_bo_copy.eval_history, label='Dropout-Copy BO')
plt.plot(range(1, n_initial + n_iter + 1), dropout_bo_random.eval_history, label='Dropout-Random BO')
plt.plot(range(1, n_initial + n_iter + 1), rembo.eval_history, label='REMBO')
plt.plot(range(1, n_initial + n_iter + 1), dropout_bandit_bc.eval_history, label='Dropout-Bandit-BC-UCB BO')
plt.plot(range(1, n_initial + n_iter + 1), basic_bo.eval_history, label='Basic BO')
plt.plot(range(1, n_initial + n_iter + 1), dropout_bandit_est1.eval_history, label='Dropout-Bandit-EST1 BO')
plt.axhline(y=global_optimum, color='r', linestyle='--', label='Global Optimum')
plt.xlabel('Iteration')
plt.ylabel('Best Function Value')
plt.title('Comparison of Optimization Algorithms　for Rosenbrock Function')
plt.legend()
plt.yscale('symlog')
plt.grid(True)
plt.show()