# 結論

データ点数，ユニット数と層数（パラメータ数），データの次元数が増えるほど計算量が多くなり，GPU の優位性が上がっていく

ベイズ最適化は基本的にデータ点数が少ないため，小規模なモデル（ユニット数 64，隠れ層 3 層程度）であれば CPU での計算で十分である場合がある

In [5]:
!pip install -U pip
!pip install botorch

Collecting pip
  Using cached pip-24.2-py3-none-any.whl.metadata (3.6 kB)
Using cached pip-24.2-py3-none-any.whl (1.8 MB)
Installing collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 24.1.2
    Uninstalling pip-24.1.2:
      Successfully uninstalled pip-24.1.2
Successfully installed pip-24.2


In [1]:
import torch
from botorch.optim import optimize_acqf
from botorch.acquisition import UpperConfidenceBound
from scipy.optimize import minimize
import numpy as np

In [2]:
def get_opposite(direction: str) -> str:
    """Return the letter paired with ``direction``.

    The pairs are ``a`` <-> ``c`` and ``b`` <-> ``d``. Any other input
    yields an empty string.

    Examples
    --------
    >>> get_opposite('a')
    'c'
    >>> get_opposite('x')
    ''
    """
    # Shifting "abcd" by two positions lines each letter up with its partner.
    opposites = dict(zip("abcd", "cdab"))
    return opposites.get(direction, "")


def judge_continuity(d_from: str, current_direction: str) -> bool:
    """Tell whether a tile exposes the port opposite to ``d_from``.

    Parameters
    ----------
    d_from : str
        Single direction letter (``a``, ``b``, ``c`` or ``d``).
    current_direction : str
        Direction label of the tile being checked, e.g. ``'ad'``.

    Returns
    -------
    bool
        True when the opposite of ``d_from`` occurs in
        ``current_direction``; False otherwise, including when ``d_from``
        is not a known direction letter.

    Examples
    --------
    >>> judge_continuity('a', 'ad')
    False
    >>> judge_continuity('a', 'bc')
    True
    >>> judge_continuity('x', 'ad')
    False
    """
    d_opposite = get_opposite(d_from)
    # get_opposite returns "" for an unknown letter, and "" is a substring of
    # every string, so the bare membership test would wrongly report True.
    return bool(d_opposite) and d_opposite in current_direction


def get_next_coordinate(
    d_to: str, current_coordinate: tuple[int, int]
) -> tuple[int, int]:
    """Return the coordinate one step away from ``current_coordinate``.

    ``a`` decreases the first index, ``d`` increases it, ``b`` decreases the
    second index and ``c`` increases it. An unrecognised ``d_to`` leaves the
    coordinate unchanged.

    Examples
    --------
    >>> get_next_coordinate('a', (0, 0))
    (-1, 0)
    >>> get_next_coordinate('c', (2, 2))
    (2, 3)
    """
    steps = {"a": (-1, 0), "d": (1, 0), "b": (0, -1), "c": (0, 1)}
    step_first, step_second = steps.get(d_to, (0, 0))
    return (
        current_coordinate[0] + step_first,
        current_coordinate[1] + step_second,
    )


def judge_location_validity(current: tuple[int, int], shape: tuple[int, int]) -> bool:
    """Check that ``current`` is a valid index into a grid of size ``shape``.

    Examples
    --------
    >>> judge_location_validity((-1, 0), (3, 3))
    False
    >>> judge_location_validity((1, 2), (3, 3))
    True
    """
    # Reject on the first axis before looking at the second one.
    if not (0 <= current[0] < shape[0]):
        return False
    return 0 <= current[1] < shape[1]


def get_d_to(d_from: str, current_direction: str) -> str:
    """Return the port of a two-letter tile that is not ``d_from``.

    When the tile's first letter equals ``d_from`` the second letter is
    returned; in every other case the first letter is returned.

    Examples
    --------
    >>> get_d_to('a', 'ad')
    'd'
    >>> get_d_to('d', 'ad')
    'a'
    """
    if current_direction[0] == d_from:
        return current_direction[1]
    return current_direction[0]


def navigate_through_matrix(direction_matrix, start, goal):
    """Follow connected tiles from ``start`` and return the visited cells.

    The walk only begins when the tile at ``start`` is ``"bd"``; it then
    leaves through port ``d``. Each step moves to the neighbouring cell as
    long as that cell lies inside the matrix and exposes the port opposite
    to the current heading. The walk stops at ``goal``, at the matrix edge,
    or at the first tile that does not connect.

    Parameters
    ----------
    direction_matrix
        2-D array-like with a ``shape`` attribute, indexable by a
        ``(row, col)`` tuple, holding two-letter direction labels.
    start, goal
        ``(row, col)`` tuples.

    Returns
    -------
    list
        Visited coordinates in order, starting with ``start``; empty when
        the start tile is not ``"bd"``.
    """
    path = []
    position = start
    grid_shape = direction_matrix.shape

    if direction_matrix[position] != "bd":
        return path

    path.append(position)
    heading = "d"

    while position != goal:
        candidate = get_next_coordinate(heading, position)
        if not judge_location_validity(candidate, grid_shape):
            break
        if not judge_continuity(heading, direction_matrix[candidate]):
            break

        position = candidate
        path.append(position)

        # Only pick a new heading when there is still somewhere to go; at the
        # goal the loop condition ends the walk.
        if position != goal:
            entered_through = get_opposite(heading)
            heading = get_d_to(entered_through, direction_matrix[position])

    return path


def manhattan_distance(coord1: tuple[int, int], coord2: tuple[int, int]) -> int:
    """Return the L1 (taxicab) distance between two 2-D coordinates.

    Examples
    --------
    >>> manhattan_distance((0, 0), (3, 3))
    6
    """
    first_axis_gap = coord1[0] - coord2[0]
    second_axis_gap = coord1[1] - coord2[1]
    return abs(first_axis_gap) + abs(second_axis_gap)


class WarcraftObjectiveBoTorch:
    """Score a grid of two-letter direction tiles against a weight tensor.

    Tiles are encoded as integers -3..3 (see ``search_space_1d_dict``), with
    ``"oo"`` (code -3) being the only label without ``a``-``d`` ports.
    Calling the instance returns the negated loss, so larger is better.

    Parameters
    ----------
    weight_matrix : torch.Tensor
        Per-cell weights; its shape defines the grid shape.
    """

    def __init__(
        self,
        weight_matrix: torch.Tensor,
    ) -> None:
        self.weight_matrix = weight_matrix
        self.shape = weight_matrix.shape
        # Integer code -> tile label.
        self.search_space_1d_dict = {
            -3: "oo", -2: "ab", -1: "ac", 0: "ad", 1: "bc", 2: "bd", 3: "cd"
        }
        # Tile label -> integer code.
        self.reverse_search_space_1d_dict = {v: k for k, v in self.search_space_1d_dict.items()}

    def string_to_tensor(
        self,
        direction_matrix: np.ndarray
    ) -> torch.Tensor:
        """Encode a grid of tile labels as a float32 tensor of codes.

        Labels missing from the mapping are encoded as 0 (the code of
        ``"ad"``).
        """
        # Fill a float32 tensor directly. Wrapping an existing tensor in
        # torch.tensor(...) copy-constructs it and emits a UserWarning.
        tensor_matrix = torch.zeros(self.shape, dtype=torch.float32)
        for i in range(self.shape[0]):
            for j in range(self.shape[1]):
                tensor_matrix[i, j] = self.reverse_search_space_1d_dict.get(direction_matrix[i, j], 0)
        return tensor_matrix

    def tensor_to_string(
        self,
        x: torch.Tensor
    ) -> np.ndarray:
        """Decode a tensor of codes into an object array of tile labels.

        Values that are not one of the seven integer codes decode to
        ``None``.
        """
        direction_matrix = np.zeros(self.shape, dtype=object)
        for i in range(self.shape[0]):
            for j in range(self.shape[1]):
                direction_matrix[i, j] = self.search_space_1d_dict.get(x[i, j].item())
        return direction_matrix

    def __call__(
        self,
        x: "torch.Tensor | np.ndarray"
    ) -> torch.Tensor:
        """Return the score (negated loss) of a tile grid.

        Parameters
        ----------
        x
            Either a tensor of integer codes or an already decoded array of
            tile labels.

        Returns
        -------
        torch.Tensor
            ``-(loss1 + loss2)``. ``loss1`` is the normalised Manhattan
            distance left between the end of the traced path and the goal
            plus the summed weights along the path, or 1 when no path
            starts. ``loss2`` is the summed weight of every cell whose
            label is not ``"oo"``.
        """
        # isinstance also accepts Tensor subclasses (e.g. nn.Parameter),
        # which the exact type comparison sent down the ndarray branch.
        if isinstance(x, torch.Tensor):
            direction_matrix = self.tensor_to_string(x)
        else:
            direction_matrix = x

        start = (0, 0)
        goal = (self.shape[0]-1, self.shape[1]-1)

        history = navigate_through_matrix(direction_matrix, start, goal)

        if history:
            path_weight = sum(self.weight_matrix[point] for point in history)
            norm_const = manhattan_distance(start, goal)
            # On a 1x1 grid start and goal coincide, so nothing is left to
            # travel; avoid dividing by a zero normaliser.
            remaining = (
                manhattan_distance(history[-1], goal) / norm_const
                if norm_const
                else 0.0
            )
            # Algebraically this is remaining + path_weight; the original
            # form is kept so floating-point results stay unchanged.
            loss1 = 1 - (1 - remaining) + path_weight
        else:
            loss1 = 1

        # Every placed tile costs its cell weight, on or off the path.
        mask = direction_matrix != "oo"
        loss2 = self.weight_matrix[mask].sum()

        loss = loss1 + loss2
        score = -loss

        return score

    def visualize(
        self,
        x: torch.Tensor
    ) -> None:
        """Print the decoded tile labels for ``x``."""
        direction_matrix = self.tensor_to_string(x)
        print(direction_matrix)


def generate_initial_data(
    objective_function: callable,
    dataset_size: int,
    shape: tuple[int, int],
) -> tuple[torch.Tensor, torch.Tensor]:
    """Draw random tile-code grids and evaluate the objective on each.

    Parameters
    ----------
    objective_function : callable
        Called once per sampled grid (a ``shape``-sized integer tensor);
        may return a 0-dim tensor or a plain Python number.
    dataset_size : int
        Number of grids to sample.
    shape : tuple[int, int]
        ``(rows, cols)`` of each grid.

    Returns
    -------
    tuple[torch.Tensor, torch.Tensor]
        ``X_train`` of shape ``(dataset_size, rows, cols)`` with entries
        drawn uniformly from -3..3, and ``y_train`` of shape
        ``(dataset_size, 1)`` with the objective values.
    """
    values = torch.tensor([-3, -2, -1, 0, 1, 2, 3])
    n, m = shape
    X_train = values[torch.randint(0, len(values), (dataset_size, n, m))]
    # as_tensor passes tensors through untouched and lifts plain numbers,
    # so torch.stack works for either kind of objective return value.
    y_train = torch.stack(
        [torch.as_tensor(objective_function(x)) for x in X_train]
    ).unsqueeze(-1)
    return X_train, y_train


# Example 3x4 per-cell weight map, built with the torch.tensor factory.
weight_matrix = torch.tensor(
    [
        [0.1, 0.4, 0.8, 0.8],
        [0.2, 0.4, 0.4, 0.8],
        [0.8, 0.1, 0.1, 0.2],
    ]
)

In [20]:
# import torch
# import torch.nn as nn
# from torch.distributions.normal import Normal
# from botorch.models.model import Model
# from gpytorch.likelihoods import GaussianLikelihood
# from gpytorch.distributions import MultivariateNormal

# # Bayesian Linear Regression class
# class BayesianLinearRegression(nn.Module):
#     def __init__(self, input_dim, output_dim):
#         super(BayesianLinearRegression, self).__init__()
#         self.input_dim = input_dim
#         self.output_dim = output_dim

#         # Parameters for prior distributions of weights and biases
#         self.w_mu = nn.Parameter(torch.zeros(input_dim, output_dim))
#         self.w_log_sigma = nn.Parameter(torch.zeros(input_dim, output_dim))
#         self.b_mu = nn.Parameter(torch.zeros(output_dim))
#         self.b_log_sigma = nn.Parameter(torch.zeros(output_dim))

#     def forward(self, x):
#         w_sigma = torch.exp(self.w_log_sigma)
#         b_sigma = torch.exp(self.b_log_sigma)

#         # Sample weights and biases
#         w = self.w_mu + w_sigma * torch.randn_like(self.w_mu)
#         b = self.b_mu + b_sigma * torch.randn_like(self.b_mu)

#         return torch.matmul(x, w) + b

#     def predict_dist(self, x):
#         y = self.forward(x)

#         # Compute uncertainty in the output
#         w_sigma = torch.exp(self.w_log_sigma)
#         b_sigma = torch.exp(self.b_log_sigma)

#         # Calculate the standard deviation considering the uncertainty in weights and biases
#         output_sigma = torch.sqrt(torch.matmul(x**2, w_sigma**2) + b_sigma**2)

#         return Normal(y, output_sigma)

# # Bayesian MLP class
# class BayesianMLP(nn.Module):
#     def __init__(self, input_dim, min_val=None, max_val=None, hidden_unit_size=64, clipping=False):
#         super(BayesianMLP, self).__init__()
#         self.hidden1 = nn.Linear(input_dim, hidden_unit_size)
#         self.hidden2 = nn.Linear(hidden_unit_size, hidden_unit_size)
#         self.hidden3 = nn.Linear(hidden_unit_size, hidden_unit_size)
#         self.hidden4 = nn.Linear(hidden_unit_size, hidden_unit_size)
#         self.hidden5 = nn.Linear(hidden_unit_size, hidden_unit_size)
#         self.relu = nn.ReLU()
#         self.bayesian_output = BayesianLinearRegression(hidden_unit_size, 1)
#         self.min_val = min_val
#         self.max_val = max_val
#         self.clipping = clipping

#     def forward(self, x):
#         x = self.relu(self.hidden1(x))
#         x = self.relu(self.hidden2(x))
#         x = self.relu(self.hidden3(x))
#         x = self.relu(self.hidden4(x))
#         x = self.relu(self.hidden5(x))

#         # Get output from Bayesian linear regression
#         y_dist = self.bayesian_output.predict_dist(x)

#         if self.min_val or self.max_val:
#             y_mean = torch.clamp(y_dist.mean, min=self.min_val, max=self.max_val)
#         else:
#             y_mean = y_dist.mean

#         if self.clipping:
#             y_mean = y_mean.clamp(min=-1e6, max=1e6)
#             y_stddev = y_dist.stddev.clamp(min=1e-6, max=1e1)
#         else:
#             y_stddev = y_dist.stddev

#         return Normal(y_mean, y_stddev)

# # Model class using Bayesian MLP
# class BayesianMLPModel(Model):
#     def __init__(self, train_X, train_Y, min_val=None, max_val=None, clipping=False):
#         super().__init__()
#         self.bayesian_mlp = BayesianMLP(train_X.shape[1], min_val, max_val, clipping=clipping, hidden_unit_size=64 * 20)
#         self.likelihood = GaussianLikelihood()
#         self._num_outputs = 1
#         self._train_inputs = train_X.to(train_X.device)  # Ensure it's on the right device
#         self._train_targets = train_Y.to(train_Y.device)  # Ensure it's on the right device

#     def forward(self, x):
#         return self.bayesian_mlp(x.to(x.device))

#     def posterior(self, X, observation_noise=False, **kwargs):
#         pred_dist = self.bayesian_mlp(X.to(X.device))
#         mean = pred_dist.mean.squeeze(-1)  # Ensure mean is 2D
#         stddev = pred_dist.stddev.squeeze(-1)  # Ensure stddev is 2D
#         covar = torch.diag_embed(stddev**2)
#         return MultivariateNormal(mean, covar)

#     @property
#     def num_outputs(self):
#         return self._num_outputs

#     @property
#     def train_inputs(self):
#         return self._train_inputs

#     @property
#     def train_targets(self):
#         return self._train_targets

# # Function to train the model with GPU support
# def fit_pytorch_model(model, num_epochs=1000, learning_rate=0.01):
#     optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
#     model.train()
#     for epoch in range(num_epochs):
#         # print(f"epoch: {epoch}")
#         optimizer.zero_grad()
#         loss = -model(model.train_inputs).log_prob(model.train_targets).mean()
#         loss.backward()
#         optimizer.step()

In [4]:
import torch
import torch.nn as nn
from torch.distributions.normal import Normal
from botorch.models.model import Model
from gpytorch.likelihoods import GaussianLikelihood
from gpytorch.distributions import MultivariateNormal

# Bayesian Linear Regression class
class BayesianLinearRegression(nn.Module):
    """Linear layer whose weights and biases are independent Gaussians.

    Each weight and bias has a learnable mean and a learnable log standard
    deviation, all initialised to zero (i.e. mean 0, standard deviation 1).
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim

        # Means and log standard deviations of the weight and bias
        # distributions.
        weight_shape = (input_dim, output_dim)
        self.w_mu = nn.Parameter(torch.zeros(weight_shape))
        self.w_log_sigma = nn.Parameter(torch.zeros(weight_shape))
        self.b_mu = nn.Parameter(torch.zeros(output_dim))
        self.b_log_sigma = nn.Parameter(torch.zeros(output_dim))

    def forward(self, x):
        """Apply one freshly sampled set of weights and biases to ``x``."""
        # Draw weight noise first, then bias noise.
        weight_noise = torch.randn_like(self.w_mu)
        bias_noise = torch.randn_like(self.b_mu)

        weight = self.w_mu + self.w_log_sigma.exp() * weight_noise
        bias = self.b_mu + self.b_log_sigma.exp() * bias_noise

        return x @ weight + bias

    def predict_dist(self, x):
        """Return a Normal centred on a sampled output for ``x``.

        The location is a single stochastic forward pass; the scale is the
        closed-form output standard deviation implied by the weight and
        bias standard deviations.
        """
        sampled_output = self.forward(x)

        weight_variance = torch.exp(self.w_log_sigma) ** 2
        bias_variance = torch.exp(self.b_log_sigma) ** 2

        # Var[x @ w + b] for independent Gaussian w and b.
        output_variance = torch.matmul(x ** 2, weight_variance) + bias_variance

        return Normal(sampled_output, torch.sqrt(output_variance))


# Bayesian MLP class with adjustable hidden units and layers
class BayesianMLP(nn.Module):
    """ReLU MLP feature extractor followed by a Bayesian linear output.

    Parameters
    ----------
    input_dim : int
        Size of the last dimension of the input.
    hidden_unit_size : int
        Width of every hidden layer.
    num_hidden_layers : int
        Number of Linear+ReLU pairs; at least one pair is always built.
    min_val, max_val : float or None
        Optional bounds applied to the predictive mean. ``None`` means
        "no bound on this side"; 0 is a valid bound.
    clipping : bool
        When True, additionally clamp the mean to [-1e6, 1e6] and the
        standard deviation to [1e-6, 10].
    """

    def __init__(self, input_dim, hidden_unit_size=64, num_hidden_layers=3, min_val=None, max_val=None, clipping=False):
        super().__init__()
        layers = [nn.Linear(input_dim, hidden_unit_size), nn.ReLU()]

        for _ in range(num_hidden_layers - 1):
            layers.append(nn.Linear(hidden_unit_size, hidden_unit_size))
            layers.append(nn.ReLU())

        self.hidden_layers = nn.Sequential(*layers)
        self.bayesian_output = BayesianLinearRegression(hidden_unit_size, 1)
        self.min_val = min_val
        self.max_val = max_val
        self.clipping = clipping

    def forward(self, x):
        """Return the predictive Normal distribution for ``x``."""
        x = self.hidden_layers(x)

        # Get output from Bayesian linear regression
        y_dist = self.bayesian_output.predict_dist(x)

        # Compare against None explicitly: a truthiness test would treat a
        # bound of 0 as "not set" and skip the clamp.
        if self.min_val is not None or self.max_val is not None:
            y_mean = torch.clamp(y_dist.mean, min=self.min_val, max=self.max_val)
        else:
            y_mean = y_dist.mean

        if self.clipping:
            y_mean = y_mean.clamp(min=-1e6, max=1e6)
            y_stddev = y_dist.stddev.clamp(min=1e-6, max=1e1)
        else:
            y_stddev = y_dist.stddev

        return Normal(y_mean, y_stddev)

# Model class using Bayesian MLP with adjustable hidden units and layers
class BayesianMLPModel(Model):
    """BoTorch ``Model`` wrapper around :class:`BayesianMLP`.

    Parameters
    ----------
    train_X : torch.Tensor
        Training inputs; ``train_X.shape[1]`` is used as the network input
        size.
    train_Y : torch.Tensor
        Training targets.
    hidden_unit_size, num_hidden_layers, min_val, max_val, clipping
        Forwarded unchanged to :class:`BayesianMLP`.

    Notes
    -----
    The training tensors are stored as given. They are plain attributes,
    so callers must place them on the intended device before constructing
    the model.
    """

    def __init__(self, train_X, train_Y, hidden_unit_size=64, num_hidden_layers=3, min_val=None, max_val=None, clipping=False):
        super().__init__()
        self.bayesian_mlp = BayesianMLP(
            input_dim=train_X.shape[1],
            hidden_unit_size=hidden_unit_size,
            num_hidden_layers=num_hidden_layers,
            min_val=min_val,
            max_val=max_val,
            clipping=clipping
        )
        self.likelihood = GaussianLikelihood()
        self._num_outputs = 1
        # Previously `t.to(t.device)`, which returns `t` itself; store the
        # tensors directly instead of implying a device move.
        self._train_inputs = train_X
        self._train_targets = train_Y

    def forward(self, x):
        """Return the network's predictive Normal distribution for ``x``."""
        return self.bayesian_mlp(x)

    def posterior(self, X, observation_noise=False, **kwargs):
        """Return a diagonal-covariance MultivariateNormal for ``X``.

        ``observation_noise`` and any extra keyword arguments are accepted
        but not used.
        """
        pred_dist = self.bayesian_mlp(X)
        # Drop the trailing size-1 output dimension of mean and stddev.
        mean = pred_dist.mean.squeeze(-1)
        stddev = pred_dist.stddev.squeeze(-1)
        # Independent predictions: variances go on the diagonal.
        covar = torch.diag_embed(stddev**2)
        return MultivariateNormal(mean, covar)

    @property
    def num_outputs(self):
        """Number of model outputs (always 1)."""
        return self._num_outputs

    @property
    def train_inputs(self):
        """Training inputs passed at construction."""
        return self._train_inputs

    @property
    def train_targets(self):
        """Training targets passed at construction."""
        return self._train_targets
    

# Function to train the model with GPU support
def fit_pytorch_model(model, num_epochs=1000, learning_rate=0.01):
    """Fit ``model`` in place by minimising its mean negative log-likelihood.

    ``model`` must expose ``train_inputs`` and ``train_targets`` and, when
    called on the inputs, return an object with ``log_prob``. Runs
    ``num_epochs`` full-batch Adam steps and leaves the model in training
    mode. Returns ``None``.
    """
    adam = torch.optim.Adam(model.parameters(), lr=learning_rate)
    model.train()
    for _ in range(num_epochs):
        adam.zero_grad()
        predictive = model(model.train_inputs)
        negative_log_likelihood = -predictive.log_prob(model.train_targets).mean()
        negative_log_likelihood.backward()
        adam.step()

In [7]:
import time
import warnings
import torch
from botorch.optim import optimize_acqf
from botorch.acquisition import UpperConfidenceBound
# from src.bnn import BayesianMLPModel
# from src.bnn import fit_pytorch_model

# NOTE(review): this silences every warning category for the rest of the
# session, so deprecation and numerical warnings raised during the run
# below will not be shown.
warnings.filterwarnings("ignore")

# Function to run the optimization
def run_optimization(device, n_iterations=5, n_initial=10):
    """Run the Bayesian-optimisation benchmark on ``device`` and time it.

    Parameters
    ----------
    device : torch.device
        Device on which tensors and the surrogate model are placed.
    n_iterations : int, optional
        Number of acquire / evaluate / refit rounds (default 5).
    n_initial : int, optional
        Number of random grids evaluated before the first fit (default 10).

    Returns
    -------
    None
        Progress, the best grid found, its score and the total wall-clock
        time are printed.
    """
    print(f"Running on device: {device}")
    start_time = time.time()

    # Step 1: Define the Objective Function
    weight_matrix = torch.tensor([
        [0.1, 0.4, 0.8, 0.8],
        [0.2, 0.4, 0.4, 0.8],
        [0.8, 0.1, 0.1, 0.2],
    ], device=device)

    objective_function = WarcraftObjectiveBoTorch(weight_matrix=weight_matrix)

    # Step 2: Generate Initial Data and move it to the target device once
    X_train, y_train = generate_initial_data(objective_function, n_initial, weight_matrix.shape)
    X_train = X_train.to(device)
    y_train = y_train.to(device)

    def flatten(X):
        # One row per grid, as float features for the network.
        return X.view(X.shape[0], -1).float()

    def fit_surrogate(X_flat, y):
        # A fresh surrogate is trained from scratch on all data so far.
        surrogate = BayesianMLPModel(X_flat, y, hidden_unit_size=128, num_hidden_layers=5).to(device)
        fit_pytorch_model(surrogate, num_epochs=1000, learning_rate=0.01)
        return surrogate

    # Step 3: Train the Bayesian MLP Model
    X_train_flat = flatten(X_train)
    model = fit_surrogate(X_train_flat, y_train)

    # The search box does not change between rounds; build it once.
    min_key, max_key = -3, 3
    n_dims = X_train_flat.shape[1]
    bounds = torch.tensor(
        [[float(min_key)] * n_dims, [float(max_key)] * n_dims],
        device=device,
    )

    for iteration in range(n_iterations):
        print(f"Iteration {iteration + 1}/{n_iterations}")
        # Step 4: Define the Acquisition Function
        ucb = UpperConfidenceBound(model, beta=0.1)

        # Step 5: Optimize the Acquisition Function
        candidate_flat, _ = optimize_acqf(
            acq_function=ucb,
            bounds=bounds,
            q=1,
            num_restarts=5,
            raw_samples=20,
        )

        # Snap the continuous candidate onto the integer tile codes.
        candidate_flat = torch.round(candidate_flat).clamp(min=min_key, max=max_key)
        candidate = candidate_flat.view(weight_matrix.shape)
        y_new = objective_function(candidate).to(device)

        # Update the data set
        X_train = torch.cat([X_train, candidate.unsqueeze(0)])
        y_train = torch.cat([y_train, y_new.unsqueeze(0).unsqueeze(-1)])
        X_train_flat = flatten(X_train)

        # Refit the Bayesian MLP model
        model = fit_surrogate(X_train_flat, y_train)

    print("Optimization completed.")
    optim_idx = y_train.argmax()
    print(f'Optimal solution: \n{X_train[optim_idx]}, \nFunction value: {y_train[optim_idx].item()}')

    end_time = time.time()
    print(f"Total time on {device}: {end_time - start_time} seconds")



# # Run on GPU
# if torch.cuda.is_available():
#     run_optimization(torch.device("cuda"))
# else:
#     print("GPU not available.")

# Run the optimization loop on the CPU; run_optimization prints the total
# elapsed time itself.
run_optimization(torch.device("cpu"))

Running on device: cpu
Iteration 1/5
Iteration 2/5
Iteration 3/5
Iteration 4/5
Iteration 5/5
Optimization completed.
Optimal solution: 
tensor([[ 0., -3., -3., -2.],
        [-1., -1.,  0.,  3.],
        [-3.,  3., -3., -3.]]), 
Function value: -3.8000001907348633
Total time on cpu: 4.562145948410034 seconds


NameError: name 'model' is not defined