# Initialization

In [1]:
import torch
import torch.nn as nn
import torch.distributions as D
import torch.nn.functional as F
import torch.optim as optim
import time
import random
import numpy as np

# seed = 1234
# torch.manual_seed(seed)
# random.seed(seed)
# np.random.seed(seed)

# Mixture Gaussians

In [2]:
def make_correlated_mixtures(
    components_num: int,
    dim_num: int,
    A=None,
    b=None,
    epsilon=1e-3
):
    """
    生成 comp1、comp2 这两组混合高斯分布，使得两者的各分量一一对应 (index 相同)。
    其中 comp2[i] 的均值 = A * comp1[i] + b, 并做一点随机扰动。
    协方差也可加扰动。
    """
    if A is None:
        # 缺省设为单位阵
        A = torch.eye(dim_num)
    if b is None:
        # 缺省设为 0
        b = torch.zeros(dim_num)

    # comp1 的各分量
    means1 = torch.randn(components_num, dim_num)   # 随机均值
    covs1 = []
    for _ in range(components_num):
        # 随机生成正定协方差
        M = torch.randn(dim_num, dim_num)
        cov = M @ M.T + epsilon * torch.eye(dim_num)
        covs1.append(cov)

    # comp2 的各分量: 受 comp1 的参数影响（线性映射 + 随机扰动）
    means2 = []
    covs2 = []
    for i in range(components_num):
        mean2 = means1[i] @ A.T + b  # A是 (dim, dim)，外加一个可选偏置 b
        # 给 means2 加一点随机噪声，以免完全重合
        mean2 = mean2 + 0.02 * torch.randn(dim_num)

        M2 = torch.randn(dim_num, dim_num)
        cov2 = M2 @ M2.T + epsilon * torch.eye(dim_num)
        # 也可以考虑把 cov1[i] 做相似变换： cov2 = A @ covs1[i] @ A.T + ...
        # 这里只做简单的随机生成
        covs2.append(cov2)
        means2.append(mean2)

    # 最终构建 D.MultivariateNormal
    comp1 = []
    comp2 = []
    for i in range(components_num):
        dist1 = D.MultivariateNormal(loc=means1[i], covariance_matrix=covs1[i])
        dist2 = D.MultivariateNormal(loc=means2[i], covariance_matrix=covs2[i])
        comp1.append(dist1)
        comp2.append(dist2)

    return comp1, comp2

# Generate Samples

In [3]:
def sample_pairs_extra_dim(
    comp1: list,
    comp2: list,
    N: int,
    comp_var: int
):
    """
    从 comp1 和 comp2 (均为 List[MultivariateNormal], 长度 L) 中采样:
      - 对每个样本 i:
        1) 从 comp2[ main_idx_i ] 采样 -> sample2[i] (dim维)
        2) 从 comp1[ main_idx_i ] 采样 1 个向量 + 再从 comp1 里随机抽 comp_var-1 个分量采样
           拼接成 (comp_var * dim) 的向量 -> sample1[i]
    返回:
        sample1: shape (N, comp_var * dim)
        sample2: shape (N, dim)
    """
    assert len(comp1) == len(comp2), "comp1 和 comp2 长度必须相同"
    L = len(comp1)
    dim = comp1[0].mean.shape[0]
    
    # 创建结果张量
    sample1 = torch.empty(N, comp_var * dim)
    sample2 = torch.empty(N, dim)
    
    # 1) 随机生成每个样本的主要分量: main_indices
    main_indices = torch.randint(0, L, (N,))
    
    # 2) 为额外 (comp_var - 1) 份采样生成随机分量索引: other_indices
    if comp_var > 1:
        other_indices = torch.randint(0, L, (N, comp_var - 1))
    else:
        # comp_var=1 时没有额外分量
        other_indices = None
    
    # ================== A) 批量处理 “主要”分量 ==================
    #    - 对 comp2[d] 做一次性采样 -> 填入 sample2
    #    - 对 comp1[d] 做一次性采样 -> 填入 sample1[:, 0:dim]
    for d in range(L):
        # 找出 main_indices == d 的所有行
        mask_i = (main_indices == d).nonzero(as_tuple=True)[0]
        count_d = mask_i.shape[0]
        if count_d == 0:
            continue
        
        # 从 comp2[d] 批量采样 -> 放到 sample2[mask_i]
        x2 = comp2[d].sample((count_d,))  # shape (count_d, dim)
        sample2[mask_i] = x2
        
        # 从 comp1[d] 批量采样 -> 放到 sample1[mask_i, 0:dim]
        x1_main = comp1[d].sample((count_d,))  # shape (count_d, dim)
        sample1[mask_i, 0:dim] = x1_main
    
    # ================== B) 批量处理 “额外”分量 ==================
    #   如果 comp_var=1，就不需要额外分量了
    if comp_var > 1:
        # other_indices 的 shape: (N, comp_var-1)
        
        # 遍历每个分量 d，集中处理
        for d in range(L):
            # 找出所有 (i, j) 使得 other_indices[i,j] = d
            i_2d, j_2d = (other_indices == d).nonzero(as_tuple=True)
            # 这样 i_2d 与 j_2d 形状相同, each pair (i_2d[k], j_2d[k]) 是一个位置
            count_2d = i_2d.shape[0]
            if count_2d == 0:
                continue
            
            # 一次性从 comp1[d] 采样 count_2d 个 (dim,) 向量
            x_d = comp1[d].sample((count_2d,))
            
            # 需要把 x_d[k] 填到 sample1[i_2d[k], offset: offset+dim],
            #   其中 offset = (1 + j_2d[k]) * dim
            #   (因为第0块已经是主要分量, 剩余的 j=0,1,...,comp_var-2 分别放第1..comp_var-1 块)
            
            # 为了批量赋值，需要对 j_2d 进行分组
            # 下面示例做一个简单的循环, 以 j_2d 的取值为分组
            unique_j = j_2d.unique()
            
            # 当前在 x_d 中的位置
            cur_idx = 0
            for j_val in unique_j:
                # 本组大小
                mask_this_j = (j_2d == j_val)
                group_count = mask_this_j.sum().item()
                # 取出在 x_d 里的对应部分
                x_sub = x_d[cur_idx : cur_idx + group_count]
                cur_idx += group_count
                
                # 找到对应的 i
                i_sub = i_2d[mask_this_j]
                
                # 计算 sample1 对应的 offset
                offset = (1 + j_val.item()) * dim
                sample1[i_sub, offset : offset+dim] = x_sub
    
    return sample1, sample2


# Encoder

In [8]:
class ResidualBlock(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.fc1 = nn.Linear(dim, dim)
        self.fc2 = nn.Linear(dim, dim)
    
    def forward(self, x):
        residual = x
        out = F.relu(self.fc1(x))
        out = self.fc2(out)
        return F.relu(out + residual)

class ResMLPBackbone(nn.Module):
    def __init__(self, input_dim, embed_dim, num_blocks=3):
        super().__init__()
        self.input_layer = nn.Linear(input_dim, embed_dim)
        self.blocks = nn.Sequential(*[ResidualBlock(embed_dim) for _ in range(num_blocks)])
    
    def forward(self, x):
        x = F.relu(self.input_layer(x))
        x = self.blocks(x)
        return x

# Contrastive Loss (InfoNCE)

In [10]:
class ContrastiveLoss(nn.Module):
    def __init__(self, temperature=0.07):
        super().__init__()
        # We'll train over log_temp to keep it in a stable range
        self.log_temp = nn.Parameter(torch.log(torch.tensor(1/temperature)))

    def forward(self, emb1, emb2):
        """
        emb1: shape (N, EMBED_DIM)
        emb2: shape (N, EMBED_DIM)
        Returns: scalar loss
        """
        # Normalize embeddings
        emb1 = F.normalize(emb1, dim=-1)  # shape (N, EMBED_DIM)
        emb2 = F.normalize(emb2, dim=-1)  # shape (N, EMBED_DIM)

        # Compute cosine similarities: (N, EMBED_DIM) x (N, EMBED_DIM) -> (N, N)
        # Here, (N, E) dot (N, E) => we'll do it by matrix multiply
        sim_matrix = emb1 @ emb2.T  # shape (N, N)

        # Scale by temperature
        temperature = torch.exp(self.log_temp)
        logits = sim_matrix / temperature  # shape (N, N)

        # We want each row i to match column i => label[i] = i
        labels = torch.arange(emb1.size(0), device=emb1.device)

        # Cross-entropy loss for emb1 -> emb2
        loss_i = F.cross_entropy(logits, labels)
        # Cross-entropy loss for emb2 -> emb1 (transpose)
        loss_t = F.cross_entropy(logits.T, labels)

        # Final contrastive loss
        loss = (loss_i + loss_t) / 2.0
        return loss

# Experiment Set UP

In [78]:
num_steps = 50000
batch_size = 256
dim_num = 1
comp_var = 1            # 样本拼接倍数，示例用 1
components_num = 10000     # 混合分布中分量个数
embed_dim = 128          # 编码后输出 embedding 的维度
lr = 1e-4               # 学习率
device = 'cuda' if torch.cuda.is_available() else 'cpu'

encoder1 = ResMLPBackbone(input_dim=comp_var * dim_num, embed_dim=embed_dim, num_blocks=12)
encoder2 = ResMLPBackbone(input_dim=dim_num, embed_dim=embed_dim, num_blocks=12)

# criterion = ContrastiveLoss(temperature=0.07)

params = list(encoder1.parameters()) + list(encoder2.parameters())
optimizer = optim.Adam(params, lr=lr)

comp1, comp2 = make_correlated_mixtures(components_num, dim_num)

sample1, sample2 = sample_pairs_extra_dim(comp1=comp1, comp2=comp2, N=N, comp_var=comp_var)
print(f'sample1: {sample1.shape}, sample2: {sample2.shape}')

sample1: torch.Size([512, 1]), sample2: torch.Size([512, 1])


In [79]:
criterion = ContrastiveLoss(temperature=0.07)

encoder1 = encoder1.to(device)
encoder2 = encoder2.to(device)
encoder1.train()
encoder2.train()

for step in range(num_steps):
    # ======================= 3) 采样一批 (sample1, sample2) =======================
    sample1, sample2 = sample_pairs_extra_dim(
        comp1=comp1, 
        comp2=comp2, 
        N=batch_size, 
        comp_var=comp_var
    )
    
    # 假设在 GPU 上训练，可执行 .to(device)
    sample1 = sample1.to(device)
    sample2 = sample2.to(device)
    sample1 = F.normalize(sample1, dim=-1)
    sample2 = F.normalize(sample2, dim=-1)
    
    # ======================= 4) 前向传播 =======================
    emb1 = encoder1(sample1)
    emb2 = encoder2(sample2)
    
    loss = criterion(emb1, emb2)
    
    # ======================= 5) 反向传播 & 更新参数 =======================
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    if (step+1) % 100 == 0:
        print(f"step [{step+1}/{num_steps}], loss = {loss.item():.4f}")


step [100/50000], loss = 5.5305
step [200/50000], loss = 5.5304
step [300/50000], loss = 5.5303
step [400/50000], loss = 5.5334
step [500/50000], loss = 5.5305
step [600/50000], loss = 5.5310
step [700/50000], loss = 5.5343
step [800/50000], loss = 5.5291
step [900/50000], loss = 5.5307
step [1000/50000], loss = 5.5297
step [1100/50000], loss = 5.5329
step [1200/50000], loss = 5.5297
step [1300/50000], loss = 5.5280
step [1400/50000], loss = 5.5324
step [1500/50000], loss = 5.5313
step [1600/50000], loss = 5.5286
step [1700/50000], loss = 5.5303
step [1800/50000], loss = 5.5327
step [1900/50000], loss = 5.5318
step [2000/50000], loss = 5.5271
step [2100/50000], loss = 5.5289
step [2200/50000], loss = 5.5314
step [2300/50000], loss = 5.5302
step [2400/50000], loss = 5.5300
step [2500/50000], loss = 5.5328
step [2600/50000], loss = 5.5302
step [2700/50000], loss = 5.5328
step [2800/50000], loss = 5.5310
step [2900/50000], loss = 5.5316
step [3000/50000], loss = 5.5294
step [3100/50000], 

KeyboardInterrupt: 

In [80]:
num_analysis_samples = 500
sample1_analysis, sample2_analysis = sample_pairs_extra_dim(
    comp1=comp1,
    comp2=comp2,
    N=num_analysis_samples,
    comp_var=comp_var
)
sample1_analysis = sample1_analysis.to(device)
sample2_analysis = sample2_analysis.to(device)

encoder1.eval()
encoder2.eval()

with torch.no_grad():
    emb1_all = encoder1(sample1_analysis)
    emb2_all = encoder2(sample2_analysis)

# Retrieval

In [81]:
import torch

def evaluate_retrieval(image_features, text_features, num_captions=5):
    """
    评估 image-to-text (I2T) 和 text-to-image (T2I) 的检索性能指标。
    
    Args:
        image_features (torch.Tensor): shape 为 (N, D) 的图片特征张量，其中 N 为图片数量。
        text_features (torch.Tensor): shape 为 (N * num_captions, D) 的文本特征张量，每张图片对应 num_captions 个 caption。
        num_captions (int): 每张图片对应的 caption 数量（默认 5）。
    
    Returns:
        dict: 包含以下键值对的字典：
            {
                'I2T_top1': float,  # 图片检索文本 Top1 准确率
                'I2T_top5': float,  # 图片检索文本 Top5 准确率
                'I2T_top10': float, # 图片检索文本 Top10 准确率
                'T2I_top1': float,  # 文本检索图片 Top1 准确率
                'T2I_top5': float,  # 文本检索图片 Top5 准确率
                'T2I_top10': float, # 文本检索图片 Top10 准确率
            }
    """
    # 确保文本数量与图片数量及每张图片的 caption 数匹配
    num_images = image_features.size(0)
    assert text_features.size(0) == num_images * num_captions, "文本特征数量与图片数量不匹配！"

    # 若未归一化，则先归一化特征（如果已归一化可省略）
    image_features = image_features / image_features.norm(dim=-1, keepdim=True)
    text_features = text_features / text_features.norm(dim=-1, keepdim=True)
    
    # 计算相似度矩阵：shape 为 (N, N * num_captions)
    similarity = image_features @ text_features.t()
    
    # 评估 image-to-text (I2T)
    I2T_top1, I2T_top5, I2T_top10 = 0, 0, 0
    for i in range(num_images):
        sim_i = similarity[i]  # 第 i 张图片与所有文本之间的相似度
        # 获取从大到小排序后的文本索引
        sorted_indices = torch.argsort(sim_i, descending=True)
        # 第 i 张图片的 ground truth 文本索引范围
        gt_indices = list(range(i * num_captions, i * num_captions + num_captions))
        # Top1 检索：如果 ground truth 中任一索引出现在前 1 个，则认为正确
        if any(idx in sorted_indices[:1] for idx in gt_indices):
            I2T_top1 += 1
        # Top5 检索
        if any(idx in sorted_indices[:5] for idx in gt_indices):
            I2T_top5 += 1
        # Top10 检索
        if any(idx in sorted_indices[:10] for idx in gt_indices):
            I2T_top10 += 1

    I2T_top1_score = I2T_top1 / num_images
    I2T_top5_score = I2T_top5 / num_images
    I2T_top10_score = I2T_top10 / num_images

    # 评估 text-to-image (T2I)
    # 这里可以利用相似度矩阵的转置，shape 为 (N * num_captions, N)
    similarity_t = similarity.t()
    T2I_top1, T2I_top5, T2I_top10 = 0, 0, 0
    for j in range(text_features.size(0)):
        sim_j = similarity_t[j]  # 第 j 个文本与所有图片的相似度
        sorted_indices = torch.argsort(sim_j, descending=True)
        # 对于第 j 个文本，其对应图片索引为 j // num_captions
        gt_image = j // num_captions
        if gt_image in sorted_indices[:1]:
            T2I_top1 += 1
        if gt_image in sorted_indices[:5]:
            T2I_top5 += 1
        if gt_image in sorted_indices[:10]:
            T2I_top10 += 1

    total_texts = text_features.size(0)
    T2I_top1_score = T2I_top1 / total_texts
    T2I_top5_score = T2I_top5 / total_texts
    T2I_top10_score = T2I_top10 / total_texts

    results = {
        'I2T_top1': I2T_top1_score,
        'I2T_top5': I2T_top5_score,
        'I2T_top10': I2T_top10_score,
        'T2I_top1': T2I_top1_score,
        'T2I_top5': T2I_top5_score,
        'T2I_top10': T2I_top10_score,
    }

    return results

In [82]:
import pandas as pd

retrieval_results = evaluate_retrieval(emb1_all, emb2_all, num_captions=1)

coco_retrieval_data = {
    "Metric": ["I2T_top1", "I2T_top5", "I2T_top10", "T2I_top1", "T2I_top5", "T2I_top10"],
    "var1-var1": [
        retrieval_results["I2T_top1"],
        retrieval_results["I2T_top5"],
        retrieval_results["I2T_top10"],
        retrieval_results["T2I_top1"],
        retrieval_results["T2I_top5"],
        retrieval_results["T2I_top10"],
    ],
}
df = pd.DataFrame(coco_retrieval_data)
print(df)

      Metric  var1-var1
0   I2T_top1      0.002
1   I2T_top5      0.014
2  I2T_top10      0.038
3   T2I_top1      0.006
4   T2I_top5      0.022
5  T2I_top10      0.044


In [None]:
def rotate_and_calc_similarity(image_features, text_features):

    # ========== 1) 计算全局均值并归一化 ==========
    # image 全局均值: (D,)
    mean_image = image_features.mean(dim=0)
    # text 全局均值:  (D,)
    mean_text  = text_features.mean(dim=0)

    # 归一化 (若想直接点乘当作余弦相似度, image/text_features 本身也需归一化)
    mean_image_norm = mean_image / (mean_image.norm() + 1e-12)
    mean_text_norm  = mean_text  / (mean_text.norm()  + 1e-12)

    # ========== 2) 计算夹角 theta 并构造二维平面内的正交向量 ==========
    # cos_angle = a·b
    cos_angle = torch.dot(mean_image_norm, mean_text_norm)
    # 防止浮点误差导致 acos 输入超出 [-1,1]
    cos_angle = torch.clamp(cos_angle, -1.0, 1.0)
    theta = torch.acos(cos_angle)  # 弧度

    sin_angle = torch.sqrt(1 - cos_angle**2 + 1e-12)
    # 在 (mean_image_norm, mean_text_norm) 所张平面上，构造与 mean_image_norm 正交的单位向量 v
    v = (mean_text_norm - cos_angle * mean_image_norm) / (sin_angle + 1e-12)

    # ========== 3) 在 (mean_image_norm, v) 平面内对 image_features 做旋转 ==========
    # 投影系数
    proj_a = image_features @ mean_image_norm  # (N,)
    proj_v = image_features @ v                # (N,)

    # 2D 旋转
    # new_a = a*cosθ - v*sinθ
    # new_v = a*sinθ + v*cosθ
    rotated_proj_a = proj_a * torch.cos(theta) - proj_v * torch.sin(theta)  # (N,)
    rotated_proj_v = proj_a * torch.sin(theta) + proj_v * torch.cos(theta)  # (N,)

    # 在平面内的分量(旋转后)
    rotated_parallel = (rotated_proj_a.unsqueeze(1) * mean_image_norm.unsqueeze(0)
                      + rotated_proj_v.unsqueeze(1) * v.unsqueeze(0))

    # 原本在平面内的分量
    orig_parallel = (proj_a.unsqueeze(1) * mean_image_norm.unsqueeze(0)
                   + proj_v.unsqueeze(1) * v.unsqueeze(0))

    # 正交分量(不在平面内, 保持不变)
    orthogonal_component = image_features - orig_parallel

    # 旋转后的图像特征 (N, D)
    rotated_image_features = rotated_parallel + orthogonal_component

    # ========== 4) 计算相似度矩阵 (N, 5N) ==========
    # 如果想用余弦相似度, 这里要保证 rotated_image_features 与 text_features 已各自归一化。
    # 否则就是一般点乘。
    similarity_matrix = rotated_image_features @ text_features.T  # (N, 5N)

    # ========== 5) 通过广播为 "正例" 与 "负例" 构造布尔掩码 ==========
    # 对第 i 行(图像 i), 正例对应的列区间是 [5*i, ..., 5*i+4]
    #   => j_idx // 5 == i_idx

    N = image_features.size(0)
    i_idx = torch.arange(N, device=similarity_matrix.device).unsqueeze(1).expand(N, 5*N)
    j_idx = torch.arange(5*N, device=similarity_matrix.device).unsqueeze(0).expand(N, 5*N)

    pos_mask = (j_idx // 5 == i_idx)  # 形状 (N, 5N), True 表示匹配
    # 所有正例相似度 => (N*5,) 向量
    pos_sims = similarity_matrix[pos_mask]
    # 如果你想分别取 "每个图像 vs. 它的 5 条文本" 的平均，可以 reshape => (N,5) 再 mean
    # 这会先对每张图像5个caption做平均，再对 N 张图像平均
    mean_pos = pos_sims.view(N, 5).mean()

    # 负例相似度(不匹配)
    neg_sims = similarity_matrix[~pos_mask]  # (N*(5N-5),)
    mean_neg = neg_sims.mean()

    return mean_pos.item(), mean_neg.item()

In [84]:
pos_sim, neg_sim = rotate_and_calc_similarity(emb1_all, emb2_all)

IndexError: The shape of the mask [500, 2500] at index 1 does not match the shape of the indexed tensor [500, 500] at index 1