# ExcitedStateDiffusion 臆測測試 Notebook

這個notebook用於測試 `ExcitedStateDiffusion` 類別的功能。

In [1]:
import torch
from polydiff.diffusion.schedules import LinearSchedule, CosineSchedule,QuadraticSchedule,ExponentialSchedule
from polydiff.diffusion.forward import ExcitedStateDiffusion

print("Libraries imported successfully!")

torch.manual_seed(42)

  from .autonotebook import tqdm as notebook_tqdm


Libraries imported successfully!


<torch._C.Generator at 0x11cb72fd0>

## 初始化 DiffusionSchedule 和 ExcitedStateDiffusion

我們將初始化 `DiffusionSchedule` 和 `ExcitedStateDiffusion` 類別的實例。

In [2]:
# 創建一個簡單的DiffusionSchedule實例
# 這裡我們使用線性beta排程作為範例

schedule = ExponentialSchedule(num_timesteps=30)

# 初始化ExcitedStateDiffusion
vocab_size = 100  # 假設詞彙表大小為100
mask_token_id = 99 # 假設mask token ID為99
pad_token_id = 0   # 假設pad token ID為0

excited_state_diffusion = ExcitedStateDiffusion(
    schedule=schedule,
    vocab_size=vocab_size,
    mask_token_id=mask_token_id,
    pad_token_id=pad_token_id
)

print("ExcitedStateDiffusion 實例已成功初始化！")

ExcitedStateDiffusion 實例已成功初始化！


## 測試 `forward_mask_process` 方法

我們將提供一個範例token序列和時間步，並測試 `forward_mask_process` 方法，以驗證被mask的token和mask位置是否正確。

In [3]:
# 範例token序列 (batch_size=2, seq_len=5)
x_start_sample = torch.randint(1, vocab_size-1, (1, 512), dtype=torch.long)  # 生成長度為512的隨機token序列，避免使用pad_token_id(0)和mask_token_id(99)

# 測試時間步
t = [0,5,10,15,20,25,29]  # 假設這些是我們感興趣的時間步
print("=== Forward Mask Process 測試結果 ===\n")
print("原始序列:\n", x_start_sample.tolist())
print()

# ✅ 預先固定 mask_noise
mask_noise = torch.rand_like(x_start_sample, dtype=torch.float32) * 0.5

for t_val in t:
    t_tensor = torch.tensor([t_val], dtype=torch.long)  # Changed to single element tensor
    
    # ✅ 傳入相同的 mask_noise
    masked_tokens, mask_positions = excited_state_diffusion.forward_mask_process(
        x_start_sample, t_tensor, mask_noise=mask_noise
    )
    
    print(f"時間步 t={t_val}:")
    print(f"  Masked 序列: {masked_tokens.tolist()}")
    print(f"  Mask 位置:   {mask_positions.tolist()}")
    print(f"  被mask數量:  {mask_positions.sum(dim=1).tolist()}")
    print()

# ✅ 最後再做 pad/mask_token 驗證（同樣使用同一個 mask_noise）
t_final = torch.tensor([t[-1], t[-1]], dtype=torch.long)
masked_tokens_final, mask_positions_final = excited_state_diffusion.forward_mask_process(
    x_start_sample, t_final, mask_noise=mask_noise
)

pad_mask = (x_start_sample == excited_state_diffusion.pad_token_id)
assert torch.all(mask_positions_final[pad_mask] == 0), "Pad token 不應該被mask！"
print("✓ Pad token 未被mask的驗證成功！")

masked_indices = (mask_positions_final == 1)
assert torch.all(masked_tokens_final[masked_indices] == excited_state_diffusion.mask_token_id), "被mask的位置沒有變成mask_token_id！"
print("✓ 被mask位置的token ID驗證成功！")


=== Forward Mask Process 測試結果 ===

原始序列:
 [[79, 66, 61, 13, 47, 18, 69, 69, 57, 22, 71, 55, 77, 19, 44, 31, 84, 14, 24, 39, 30, 85, 92, 32, 88, 6, 72, 30, 80, 38, 17, 13, 36, 92, 90, 43, 54, 53, 63, 65, 57, 93, 48, 76, 84, 26, 36, 59, 20, 48, 23, 24, 55, 48, 65, 40, 70, 69, 67, 43, 27, 15, 87, 97, 97, 66, 12, 59, 70, 22, 40, 68, 11, 34, 43, 6, 46, 66, 77, 74, 57, 58, 77, 32, 19, 17, 98, 27, 40, 62, 90, 82, 86, 20, 48, 30, 26, 32, 96, 49, 83, 30, 84, 2, 48, 84, 64, 74, 10, 69, 78, 15, 70, 2, 57, 10, 1, 58, 92, 81, 34, 33, 10, 53, 27, 24, 48, 31, 22, 56, 58, 57, 81, 3, 91, 51, 19, 17, 13, 49, 42, 11, 60, 60, 62, 13, 17, 31, 7, 32, 45, 59, 87, 41, 23, 66, 45, 14, 1, 88, 47, 95, 36, 74, 96, 53, 80, 85, 47, 62, 32, 13, 85, 18, 26, 32, 43, 75, 17, 59, 69, 53, 34, 19, 11, 95, 16, 53, 65, 70, 9, 55, 75, 73, 95, 20, 60, 28, 97, 65, 69, 82, 89, 7, 14, 25, 34, 68, 24, 71, 35, 1, 73, 85, 1, 43, 18, 33, 80, 81, 27, 41, 80, 84, 18, 28, 89, 22, 95, 13, 32, 98, 88, 66, 12, 88, 42, 66, 29, 80, 44, 67, 

RuntimeError: The expanded size of the tensor (1) must match the existing size (2) at non-singleton dimension 0.  Target sizes: [1, 512].  Tensor sizes: [2, 1]

## 測試 `get_mask_probabilities` 方法

我們將使用一系列時間步來測試 `get_mask_probabilities` 方法，以確保返回正確的mask概率。

In [None]:
test_timesteps = torch.tensor([0, 1, 5, 10, 15, 20, 25], dtype=torch.long)
mask_probs = excited_state_diffusion.get_mask_probabilities(test_timesteps)

print("測試時間步:\n", test_timesteps)
print("對應的Mask概率:\n", mask_probs)

# 驗證：mask概率應該在0到1之間，並且隨時間步增加
assert torch.all(mask_probs >= 0) and torch.all(mask_probs <= 1), "Mask概率應該在0到1之間！"
assert torch.all(mask_probs[1:] >= mask_probs[:-1]), "Mask概率應該隨時間步增加！"
print("Mask概率的範圍和趨勢驗證成功！")

## 測試 `progressive_masking` 方法

我們將使用一個範例token序列和時間步列表來測試 `progressive_masking` 方法，並可視化結果。

In [None]:
# x_start_progressive = torch.tensor([
#     [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
# ], dtype=torch.long)

# timesteps_to_visualize = [0, 5, 10, 15, 20, 25]

# # 使用固定種子以獲得可重複的結果
# results = excited_state_diffusion.progressive_masking(
#     x_start_progressive,
#     timesteps_to_visualize,
#     fixed_seed=42
# )

# print("原始序列:", x_start_progressive.tolist()[0])
# print("\n--- 漸進式 Masking 結果 ---")
# for res in results:
#     print(f"\n時間步: {res['timestep']}")
#     print(f"  Mask 概率: {res['mask_probability']:.4f}")
#     print(f"  Masked 序列: {res['masked_tokens'].tolist()[0]}")
#     print(f"  Mask 位置: {res['mask_positions'].tolist()[0]}")
#     print(f"  被mask的token數量: {int(res['num_masked'])}")

# # 簡單驗證：確保mask數量隨時間步增加
# num_masked_counts = [int(res['num_masked']) for res in results]
# assert all(num_masked_counts[i] <= num_masked_counts[i+1] for i in range(len(num_masked_counts)-1)), "被mask的token數量應該隨時間步增加！"
# print("\n漸進式masking的token數量趨勢驗證成功！")

In [None]:
import torch
import torch.nn.functional as F

from typing import Tuple, Optional
torch.manual_seed(42)   
def sample_q_xt_given_x0_markov(
    x_0: torch.Tensor,              # (B, L) token ids
    t: torch.Tensor,                # (B,) timestep
    mask_noise: torch.Tensor,      # (B, L) 固定隨機數
    schedule,
    mask_token_id: int,
    pad_token_id: Optional[int] = None
) -> Tuple[torch.Tensor, torch.Tensor]:
    """
    True Markov-style q(x_t | x_0) with global noise mask.

    Returns:
        x_t, mask_pos (float)
    """
    B, L = x_0.shape
    alpha_t, _, _ = schedule.get_parameters(t)  # (B,)
    gamma_t = (1.0 - alpha_t).unsqueeze(1).expand(B, L)

    # Global mask condition
    mask = mask_noise < gamma_t

    # 不 mask pad
    if pad_token_id is not None:
        mask = mask & (x_0 != pad_token_id)

    x_t = x_0.clone()
    x_t[mask] = mask_token_id

    return x_t, mask.float()


In [None]:
class LinearSchedule:
    def __init__(self, num_timesteps, beta_start=0.0001, beta_end=0.05, device="cpu"):
        self.device = device
        self.num_timesteps = num_timesteps
        self.betas = torch.linspace(beta_start, beta_end, num_timesteps).to(device)
        self.alphas = 1.0 - self.betas
        self.alphas_cumprod = torch.cumprod(self.alphas, dim=0)

    def get_parameters(self, t: torch.Tensor):
        alpha_t = self.alphas_cumprod[t]  # (B,)
        sqrt_alpha_t = torch.sqrt(alpha_t)
        sqrt_one_minus_alpha_t = torch.sqrt(1.0 - alpha_t)
        return alpha_t, sqrt_alpha_t, sqrt_one_minus_alpha_t


In [None]:
schedule = LinearSchedule(num_timesteps=30, beta_start=0.01, beta_end=0.2)

x_0 = torch.tensor([
    [1, 2, 3, 4, 5],
    [6, 7, 8, 9, 0]
], dtype=torch.long)

# 預先固定的 mask noise
mask_noise = torch.rand(x_0.shape)

# 對多個 timestep 進行 sampling
for t_val in [0, 5, 10, 15, 20, 25]:
    t_batch = torch.tensor([t_val] * x_0.shape[0])
    x_t, mask_pos = sample_q_xt_given_x0_markov(
        x_0, t_batch, mask_noise, schedule,
        mask_token_id=99, pad_token_id=0
    )
    print(f"時間步 t={t_val}:\n x_t: {x_t.tolist()}")
