Generated by ChatGPT o3-mini-high and modified by kei-mo

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt

# ===========================
# 1. ハイパーパラメータの設定
# ===========================
T = 1.0  # 最大時刻
beta = 10.0  # 定数beta（大きくすることでT=1でほぼ完全なノイズ状態に）
num_epochs = 10000  # 学習エポック数（デモ用なので少なめに設定してもOK）
batch_size = 128
learning_rate = 1e-3


# α(t)=exp(-beta*t)
def alpha_bar(t):
    return torch.exp(-beta * t)


# =========================================
# 2. 混合ガウス分布からサンプリングする関数
# =========================================
def sample_data(batch_size):
    # 2成分の混合ガウス：平均[-2,0] と [2,0]、共分散は単位行列
    means = [torch.tensor([-2.0, 0.0]), torch.tensor([2.0, 0.0])]
    # 各サンプルで成分をランダムに選択
    comp = torch.randint(0, 2, (batch_size,))
    samples = []
    for i in range(batch_size):
        mu = means[comp[i]]
        samples.append(mu + torch.randn(2))
    return torch.stack(samples, dim=0)


# ====================================
# 3. スコア推定のためのネットワークの定義
# ====================================
class ScoreNet(nn.Module):
    def __init__(self):
        super(ScoreNet, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(3, 128),  # 入力：2次元のxとスカラーt
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, 2),  # 出力：2次元のスコア
        )

    def forward(self, x, t):
        # tが1次元の場合、unsqueezeして結合できるようにする
        if t.dim() == 1:
            t = t.unsqueeze(1)
        inp = torch.cat([x, t], dim=1)
        return self.net(inp)


# インスタンス生成
score_model = ScoreNet()
optimizer = optim.Adam(score_model.parameters(), lr=learning_rate)

# ===========================
# 4. スコアマッチングによる学習ループ
# ===========================
for epoch in range(num_epochs):
    optimizer.zero_grad()

    # (1) データ x0 を混合ガウス分布からサンプリング
    x0 = sample_data(batch_size)  # shape: [batch_size, 2]

    # (2) t を一様に [0, T] からサンプル（各サンプル毎に異なるt）
    t = torch.rand(batch_size) * T  # shape: [batch_size]

    # (3) α(t) を計算（shapeを合わせるためunsqueeze）
    a = alpha_bar(t).unsqueeze(1)  # shape: [batch_size, 1]

    # (4) ノイズ ε を標準正規分布からサンプリング
    noise = torch.randn_like(x0)

    # (5) forward process により x_t を生成
    sqrt_a = torch.sqrt(a)
    sqrt_one_minus_a = torch.sqrt(1 - a)
    x_t = sqrt_a * x0 + sqrt_one_minus_a * noise

    # (6) ターゲットスコア： s_target = -ε / sqrt(1 - α(t))
    target_score = -noise / sqrt_one_minus_a

    # (7) ネットワークによるスコア推定
    s_pred = score_model(x_t, t)

    # (8) MSE損失の計算
    loss = ((s_pred - target_score) ** 2).mean()
    loss.backward()
    optimizer.step()

    if epoch % 1000 == 0:
        print(f"Epoch {epoch}: loss = {loss.item():.6f}")


ModuleNotFoundError: No module named 'torch'

In [None]:
# ===========================================
# 5. 学習済みスコアを用いた逆SDEによるサンプリング
# ===========================================
def sample_reverse_sde(x_T, num_steps):
    """
    逆SDEのEuler-Maruyama法によるサンプリング
    逆SDE: dx = [ -0.5*beta*x - beta*s_theta(x,t) ] dt + sqrt(beta) dW̄
    """
    dt = T / num_steps
    x = x_T
    # 初期tはT（各サンプル毎に）
    t = T * torch.ones(x_T.size(0))
    for i in range(num_steps):
        # 逆SDEのドリフト項
        drift = -0.5 * beta * x - beta * score_model(x, t)
        # ノイズ項
        noise = torch.randn_like(x)
        x = x + drift * dt + torch.sqrt(torch.tensor(beta * dt)) * noise
        t = t - dt  # 時刻を逆行
    return x


# サンプル生成（num_samples個）
num_samples = 1000
# 逆SDEの開始点 x_T は、forward process により完全なノイズ状態となるので、標準正規分布からサンプル
x_T = torch.randn(num_samples, 2)
x_gen = sample_reverse_sde(x_T, num_steps=1000)

# ---------------------
# 6. 結果の可視化
# ---------------------
import numpy as np

# 生成されたサンプルのプロット
x_gen_np = x_gen.detach().numpy()
plt.figure(figsize=(6, 6))
plt.scatter(x_gen_np[:, 0], x_gen_np[:, 1], s=10, alpha=0.5)
plt.title("学習済みスコアによる逆SDEサンプリング結果")
plt.xlabel("x")
plt.ylabel("y")
plt.grid(True)
plt.show()

# 元のデータ分布も確認
x_data = sample_data(1000).detach().numpy()
plt.figure(figsize=(6, 6))
plt.scatter(x_data[:, 0], x_data[:, 1], s=10, alpha=0.5, color="red")
plt.title("混合ガウス分布からの元データ")
plt.xlabel("x")
plt.ylabel("y")
plt.grid(True)
plt.show()
