In [None]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import time

In [None]:
from operator import ge
import numpy as np
from typing import Callable, List, Tuple
import matplotlib.pyplot as plt
from matplotlib.patches import Ellipse

class CMAES():
    def __init__(self, arg_names: List[str], ave_vec: List[float], sigma=1.0, max_iter=100, population=None, mu=None, fixed_args=None):
        self.arg_names = arg_names
        self.fixed_args = fixed_args or {}
        self.dim = len(ave_vec)
        self.max_iter = max_iter
        # 個体数と選抜数
        self.population = population if population else int(4 + 3 * np.log(self.dim))
        self.mu = mu if mu else int(np.floor(self.population / 2))
        # 平均値ベクトル
        self.m = np.array(ave_vec, dtype=np.float64)
        # 重み行列の計算(muを定義した後)
        self.weights = self.calc_weights()
        self.mu_eff = 1.0 / (self.weights**2).sum()
        self.sigma = float(sigma)
        self.C = np.identity(self.dim)
        self.c_1 = 2.0 / ((self.dim + 1.3) ** 2 + self.mu_eff)
        self.c_mu = min(
        1 - self.c_1,
        2.0 * (self.mu_eff - 2 + 1/self.mu_eff) / ((self.dim + 2) ** 2 + self.mu_eff)
        )
        self.chi = np.sqrt(self.dim) * (1 - 1 / (4 * self.dim) + 1 / (21 * (self.dim ** 2)))
        self.c_c = (4 + self.mu_eff / self.dim) / (self.dim + 4 + 2 * self.mu_eff / self.dim)
        self.c_sigma = (self.mu_eff + 2) / (self.dim + self.mu_eff + 5)
        self.p_c = np.zeros(self.dim)
        self.p_sigma = np.zeros(self.dim)
        self.loss = float('inf')
        self.best_val = None

        self.history = {
            'best_fitness': [],
            'mean_fitness': [],
            'worst_fitness': [],
            'mean_vector': [],
            'sigma': [],
            'eigenvalues': [],
            'populations': []  # 各世代の全個体
        }

    def sample(self) -> List[float]:
        """多次元正規分布からサンプリングをする"""
        arr = np.random.multivariate_normal(mean=self.m, cov=self.C, size=self.dim)
        arr = arr.tolist()[0]
        return arr

    def calc_weights(self):
        """対数重みを計算する"""
        raw_weights = np.log(self.mu + 0.5) - np.log(np.arange(1, self.mu + 1))
        return raw_weights / raw_weights.sum()

    def matrix_inverse_sqrt(self):
        # 固有値分解
        eigvals, eigvecs = np.linalg.eigh(self.C)

        # 数値安定性のために微小値で下限をつける
        eigvals = np.maximum(eigvals, 1e-20)

        # Λ^{-1/2}
        D_inv_sqrt = np.diag(1.0 / np.sqrt(eigvals))

        # C^{-1/2} = Q Λ^{-1/2} Q^T
        C_inv_sqrt = eigvecs @ D_inv_sqrt @ eigvecs.T
        return C_inv_sqrt

    def compute_d_sigma(self):
        return 1 + self.c_sigma + 2 * max(0, np.sqrt((self.mu_eff - 1) / (self.dim + 1)) - 1)

    def debug(self):
        print(f"weights: {self.weights}")
        print(f"")

    def record_history(self, fitness_values, population):
        self.history['best_fitness'].append(np.min(fitness_values))
        self.history['mean_fitness'].append(np.mean(fitness_values))
        self.history['worst_fitness'].append(np.max(fitness_values))
        self.history['mean_vector'].append(self.m.copy())
        self.history['sigma'].append(self.sigma)
        eigenvals, _ = np.linalg.eigh(self.C)
        self.history['eigenvalues'].append(eigenvals.copy())
        self.history['populations'].append(population.copy())

    def opt(self, f: Callable) -> Tuple[float, List[float]]:
        dim = self.dim
        mu_eff = self.mu_eff

        # 選抜を行うループ
        for gen in range(self.max_iter):
            print(f"{'='*5}{gen+1}世代目{'='*5}")
            # 個体集合を生成
            group: List[List[float]] = []
            for _ in range(self.population):
                group.append(self.sample())

            # 関数に入力する
            scores: List[Tuple[float, List[float]]] = []
            for x in group:
                arg_dict = {name: val for name, val in zip(self.arg_names, x)}
                arg_dict.update(self.fixed_args)
                current_loss = f(**arg_dict)
                scores.append((current_loss, x))

            # 損失で昇順に並べ替える
            scores.sort(key=lambda x: x[0])

            # 暫定出力値の更新
            if self.loss > scores[0][0]:
                # print(f"DEBUG loss: {scores[0][0]}")
                self.loss = scores[0][0]
                self.best_val = scores[0][1]
                print(f"最小値の更新: ")
                print(f"値: {self.loss}")
                print(f"ベクトル: {self.best_val}")

            fitness_values = np.array([i[0] for i in scores])
            population = np.array([i[1] for i in scores])
            # print(f"min(fitness_values): {np.min(fitness_values)}")
            self.record_history(fitness_values, population)

            # self.muの個体を取り出す
            elites = scores[:self.mu]
            elites = np.array([i[1] for i in elites])

            # 平均値ベクトルの更新
            m_old = self.m
            self.m = self.weights @ elites
            # print(f"m: {self.m}")

            # 共分散行列のランクmu更新
            C_mu = np.zeros((dim, dim))
            for i in range(self.mu):
                x = np.array(elites[i])
                y_i = x - m_old
                C_mu = C_mu + self.weights[i] * (np.outer(y_i, y_i) / self.mu)

            # print(f"[DEBUG] C_mu: \n{C_mu}")
            C_mu /= self.sigma ** 2

            # ステップサイズσの更新処理
            y = (self.m - m_old) / self.sigma
            p_sigma = (1 - self.c_sigma) * self.p_sigma
            p_sigma += np.sqrt(1 - (1 - self.c_sigma) ** 2) * mu_eff * (self.matrix_inverse_sqrt() @ y)

            p_sigma_norm = np.linalg.norm(p_sigma)
            self.sigma = self.sigma * np.exp(
                (self.c_sigma / self.compute_d_sigma())
                * (p_sigma_norm / self.chi - 1)
            )
            self.p_sigma = p_sigma

            """
            # ステップサイズが多すぎるときにCの更新を止める
            left = np.sqrt((self.p_sigma ** 2).sum()) / np.sqrt(1 - (1 - self.c_sigma) ** (2 * (gen+1)))
            right = (1.4 + 2 / (self.dim + 1)) * self.chi
            hsigma = 1 if left < right else 0
            d_hsigma = (1 - hsigma) * self.c_c * (2 - self.c_c)
            """

            # 共分散行列のランク1更新
            self.p_c = (1 - self.c_c) * self.p_c + np.sqrt(1 - (1 - self.c_c) ** 2) * np.sqrt(mu_eff) * y
            C_1 = np.outer(self.p_c, self.p_c)

            # 共分散行列の更新
            C_new = (1 - self.c_mu - self.c_1) * self.C + self.c_mu * C_mu + self.c_1 * C_1
            self.C = C_new

        # print(f"[DEBUG] m: {m}")
        return (self.loss, self.best_val)

    def plot_convergence(self, figsize=(12, 8), ans=None):
        """収束履歴をプロット"""
        fig, axes = plt.subplots(2, 2, figsize=figsize)

        # 適応度の履歴
        generations = range(len(self.history['best_fitness']))
        axes[0, 0].semilogy(generations, self.history['best_fitness'], 'b-', label='Best')
        axes[0, 0].semilogy(generations, self.history['mean_fitness'], 'g-', label='Mean')
        axes[0, 0].semilogy(generations, self.history['worst_fitness'], 'r-', label='Worst')
        axes[0, 0].set_xlabel('Generation')
        axes[0, 0].set_ylabel('Fitness')
        axes[0, 0].set_title('Fitness Evolution')
        axes[0, 0].legend()
        axes[0, 0].grid(True)

        # ステップサイズの履歴
        axes[0, 1].semilogy(generations, self.history['sigma'], 'purple')
        axes[0, 1].set_xlabel('Generation')
        axes[0, 1].set_ylabel('Step Size (σ)')
        axes[0, 1].set_title('Step Size Evolution')
        axes[0, 1].grid(True)

        if self.dim == 2:
            mean_vectors = np.array(self.history['mean_vector'])
            axes[1, 0].plot(mean_vectors[:, 0], mean_vectors[:, 1], 'o-', markersize=3)
            axes[1, 0].plot(mean_vectors[0, 0], mean_vectors[0, 1], 'go', markersize=8, label='Start')
            axes[1, 0].plot(mean_vectors[-1, 0], mean_vectors[-1, 1], 'ro', markersize=8, label='End')
            axes[1, 0].set_xlabel(self.arg_names[0])
            axes[1, 0].set_ylabel(self.arg_names[1])
            axes[1, 0].set_title('Mean Vector Trajectory')
            axes[1, 0].legend()
            axes[1, 0].grid(True)
            if ans:
                axes[1, 0].plot(ans[0], ans[1], 'r*', markersize=8, label='Answer')

        eigenvalues = np.array(self.history['eigenvalues'])
        for i in range(self.dim):
            axes[1, 1].semilogy(generations, eigenvalues[:, i], label=f'λ{i+1}')
        axes[1, 1].set_xlabel('Generation')
        axes[1, 1].set_ylabel('Eigenvalues')
        axes[1, 1].set_title('Covariance Matrix Eigenvalues')
        axes[1, 1].legend()
        axes[1, 1].grid(True)

        plt.tight_layout()
        plt.show()

    def plot_2d_optimization(self, objective_func, xlim=(-3, 3), ylim=(-3, 3), figsize=(10, 8)):
        """2次元最適化の可視化"""
        if self.dim != 2:
            print("2次元問題のみ対応")
            return

        fig, ax = plt.subplots(figsize=figsize)

        # 等高線プロット
        x = np.linspace(xlim[0], xlim[1], 100)
        y = np.linspace(ylim[0], ylim[1], 100)
        X, Y = np.meshgrid(x, y)
        Z = np.zeros_like(X)

        for i in range(X.shape[0]):
            for j in range(X.shape[1]):
                Z[i, j] = objective_func(X[i, j], Y[i, j])

        contour = ax.contour(X, Y, Z, levels=20, alpha=0.6)
        ax.clabel(contour, inline=True, fontsize=8)

        # 最適化軌跡
        mean_vectors = np.array(self.history['mean_vector'])
        ax.plot(mean_vectors[:, 0], mean_vectors[:, 1], 'r-o', markersize=4, linewidth=2, label='Mean trajectory')

        # 最終世代の個体群と分散楕円
        if self.history['populations']:
            final_pop = self.history['populations'][-1]
            ax.scatter(final_pop[:, 0], final_pop[:, 1], alpha=0.6, s=20, label='Final population')

            # 分散楕円
            mean = mean_vectors[-1]
            cov = self.C * (self.sigma ** 2)
            eigenvals, eigenvecs = np.linalg.eigh(cov)

            # 95%信頼楕円
            angle = np.degrees(np.arctan2(*eigenvecs[:, 0][::-1]))
            width, height = 2 * np.sqrt(eigenvals) * 2.448  # 95%信頼区間

            ellipse = Ellipse(mean, width, height, angle=angle,
                            facecolor='none', edgecolor='red', linewidth=2, alpha=0.8)
            ax.add_patch(ellipse)

        ax.set_xlabel(self.arg_names[0])
        ax.set_ylabel(self.arg_names[1])
        ax.set_title('2D Optimization Visualization')
        ax.legend()
        ax.grid(True)
        ax.set_xlim(xlim)
        ax.set_ylim(ylim)

        plt.show()

In [None]:
def load_mnist_data():
  """
  MNISTをロードして前処理
  """
  mnist = fetch_openml('mnist_784', version=1, as_frame=False)
  X, y = mnist.data, mnist.target.astype(int)
  # (0-255) -> (0, 1)
  X = X / 255.0

  X_train, X_test, y_train, y_test = train_test_split(
      X, y, test_size=0.2, random_state=42, stratify=y
  )

  X_train, X_val, y_train, y_val = train_test_split(
      X_train, y_train, test_size=0.2, random_state=42, stratify=y_train
  )

  print(f"訓練データ: {X_train.shape}")
  print(f"検証データ: {X_val.shape}")
  print(f"テストデータ: {X_test.shape}")

  return X_train, X_val, X_test, y_train, y_val, y_test


In [None]:
def create_data_loaders(X_train, X_val, X_test, y_train, y_val, y_test, batch_size=32):
  """PyTorchのDataLoaderを作成"""
  # numpy array -> Pytorch Tensor
  X_train_tensor = torch.FloatTensor(X_train)
  X_val_tensor = torch.FloatTensor(X_val)
  X_test_tensor = torch.FloatTensor(X_test)
  y_train_tensor = torch.LongTensor(y_train)
  y_val_tensor = torch.LongTensor(y_val)
  y_test_tensor = torch.LongTensor(y_test)

  train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
  val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
  test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

  train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
  val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
  test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

  return train_loader, val_loader, test_loader

In [None]:
class SmallMLP(nn.Module):
  def __init__(self, input_size=784, hidden_size=128, num_classes=10, dropout_rate=0.2):
    super(SmallMLP, self).__init__()
    self.fc1 = nn.Linear(input_size, hidden_size)
    self.fc2 = nn.Linear(hidden_size, num_classes)
    self.dropout = nn.Dropout(dropout_rate)

  def forward(self, x):
    # (batch_size, 28, 28) → (batch_size, 784)
    x = x.view(x.size(0), -1)
    x = F.relu(self.fc1(x))
    x = self.dropout(x)
    x = self.fc2(x)

    return x



In [None]:
class TwoLayerMLP(nn.Module):
    """
    2層の隠れ層を持つMLP
    """
    def __init__(self, input_size=784, hidden1_size=256, hidden2_size=128,
                 num_classes=10, dropout_rate=0.2):
        super(TwoLayerMLP, self).__init__()

        self.fc1 = nn.Linear(input_size, hidden1_size)
        self.fc2 = nn.Linear(hidden1_size, hidden2_size)
        self.fc3 = nn.Linear(hidden2_size, num_classes)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x):
        x = x.view(x.size(0), -1)

        # 第1隠れ層
        x = F.relu(self.fc1(x))
        x = self.dropout(x)

        # 第2隠れ層
        x = F.relu(self.fc2(x))
        x = self.dropout(x)

        # 出力層
        x = self.fc3(x)

        return x

In [None]:
def train_smallmlp_with_params(learning_rate, hidden_size, dropout_rate,
                     train_loader, val_loader, epochs=10):
    """
    指定されたハイパーパラメータでモデルを訓練
    """
    start_time = time.time()
    # デバイスの設定
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # print(f"Using device: {device}")

    # パラメータの制約
    learning_rate = max(0.0001, min(0.1, learning_rate / 100))
    hidden_size = max(32, min(512, int(hidden_size * 100)))
    dropout_rate = max(0.0, min(0.5, dropout_rate / 100))

    model = SmallMLP(hidden_size=hidden_size, dropout_rate=dropout_rate)
    model = model.to(device)  # モデルをGPUに移動

    # 訓練
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    model.train()
    for epoch in range(epochs):
        for data, target in train_loader:
            data = data.to(device)
            target = target.to(device)

            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            train_loss = loss.item()

        training_time = time.time() - start_time
        # print(f'Epoch [{epoch+1}/{epochs}], ' f'Train Loss: {train_loss:.4f}, ' f'Time: {training_time:.2f}')

    # 検証
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data, target in val_loader:
            # データをGPUに移動
            data = data.to(device)
            target = target.to(device)

            output = model(data)
            _, predicted = torch.max(output.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()

    training_time = time.time() - start_time
    accuracy = correct / total
    print(f"training time: {training_time:.2f}s, accuracy: {accuracy:.4f}")
    return -accuracy  # CMA-ESは最小化なので負の値を返す

In [None]:
def train_2lmlp_with_params(learning_rate, hidden1_size, hidden2_size, dropout_rate,
                     train_loader, val_loader, epochs=10):
    """
    指定されたハイパーパラメータでモデルを訓練
    """
    start_time = time.time()
    # デバイスの設定
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # print(f"Using device: {device}")

    # パラメータの制約
    learning_rate = max(0.0001, min(0.1, learning_rate))
    hidden1_size = max(32, min(512, int(hidden1_size)))
    hidden2_size = max(32, min(512, int(hidden2_size)))
    dropout_rate = max(0.0, min(0.5, dropout_rate))

    model = TwoLayerMLP(hidden1_size=hidden1_size, hidden2_size=hidden2_size, dropout_rate=dropout_rate)
    model = model.to(device)  # モデルをGPUに移動

    # 訓練
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    model.train()
    for epoch in range(epochs):
        for data, target in train_loader:
            data = data.to(device)
            target = target.to(device)

            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            train_loss = loss.item()

        training_time = time.time() - start_time
        # print(f'Epoch [{epoch+1}/{epochs}], ' f'Train Loss: {train_loss:.4f}, ' f'Time: {training_time:.2f}')

    # 検証
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data, target in val_loader:
            # データをGPUに移動
            data = data.to(device)
            target = target.to(device)

            output = model(data)
            _, predicted = torch.max(output.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()

    training_time = time.time() - start_time
    accuracy = correct / total
    print(f"訓練時間: {training_time:.2f}秒")
    print(f"正解率: {accuracy}")
    return -accuracy  # CMA-ESは最小化なので負の値を返す

In [None]:
X_train, X_val, X_test, y_train, y_val, y_test = load_mnist_data()
train_loader, val_loader, test_loader = create_data_loaders(X_train, X_val, X_test, y_train, y_val, y_test, batch_size=256)

訓練データ: (44800, 784)
検証データ: (11200, 784)
テストデータ: (14000, 784)


In [None]:
train_smallmlp_with_params(0.001, 256, 0.2, train_loader, val_loader, epochs=15)

training time: 11.24s, accuracy: 0.9528


-0.9527678571428572

In [None]:
train_2lmlp_with_params(0.005, 256, 512, 0.2, train_loader, val_loader, epochs=15)

In [None]:
params = ["learning_rate", "hidden_size", "dropout_rate"]
fixed_args = {"train_loader": train_loader, "val_loader": val_loader, "epochs": 10}
init_point = [1.0, 2.56, 2.0]
cmaes = CMAES(arg_names=params, ave_vec=init_point, max_iter=10, fixed_args=fixed_args)

In [105]:
loss, value = cmaes.opt(train_smallmlp_with_params)

=====1世代目=====
training time: 6.81s, accuracy: 0.9726
training time: 7.07s, accuracy: 0.9739
training time: 7.06s, accuracy: 0.9283
training time: 7.10s, accuracy: 0.9658
training time: 7.30s, accuracy: 0.9650
training time: 6.57s, accuracy: 0.9339
training time: 7.35s, accuracy: 0.9596
最小値の更新: 
値: -0.9739285714285715
ベクトル: [0.7078588571313538, 1.563757489762975, 0.7176062004276067]
=====2世代目=====
training time: 6.85s, accuracy: 0.9622
training time: 7.05s, accuracy: 0.9654
training time: 7.24s, accuracy: 0.9654
training time: 6.82s, accuracy: 0.9264
training time: 7.26s, accuracy: 0.9703
training time: 6.49s, accuracy: 0.9711
training time: 7.24s, accuracy: 0.9675
=====3世代目=====
training time: 6.96s, accuracy: 0.9762
training time: 7.10s, accuracy: 0.9713
training time: 7.00s, accuracy: 0.9631
training time: 6.72s, accuracy: 0.9768
training time: 7.21s, accuracy: 0.9326
training time: 6.27s, accuracy: 0.9630
training time: 6.96s, accuracy: 0.9697
最小値の更新: 
値: -0.9767857142857143
ベクトル: 

In [107]:
value

[0.2844775778363134, 3.4931197031262182, 0.4560618736564812]

In [109]:
train_smallmlp_with_params(value[0], value[1], value[2], train_loader, val_loader, epochs=15)

training time: 11.05s, accuracy: 0.9754


-0.9753571428571428