# 1. 环境设置与导入库
首先，我们需要导入所有必要的库，并设置设备（CPU 或 GPU）。

In [1]:
# Cell 1: 环境设置与导入库

import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torchvision
import clip
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from tqdm import tqdm
import random

# Pick the compute device: CUDA when PyTorch reports it as available, CPU otherwise.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"使用设备: {device}")


使用设备: cuda


# 2. 数据集类与预处理
定义 CelebADataset 类，配置超参数与随机种子，并设置图像预处理流程。

In [2]:
# Cell 2: 数据集类与预处理

# CelebA数据集类
class CelebADataset(Dataset):
    """
    CelebA split yielding ``(image, attrs, clip_embedding)`` triples.

    ``attrs`` is a float32 vector built from columns 1..40 of the merged table,
    with -1 mapped to 0 and 1 kept as 1. ``clip_embedding`` is one row picked
    at random from a set of precomputed CLIP text embeddings.
    """

    def __init__(self, img_dir, attr_path, bbox_path, partition_path,
                 transform=None, partition=0, text_embeddings=None):
        """
        Build the sample table for one partition.

        :param img_dir: directory containing the image files
        :param attr_path: comma-separated attribute file with an ``image_id`` column
        :param bbox_path: comma-separated bounding-box file with an ``image_id`` column
        :param partition_path: comma-separated file with ``image_id`` and ``partition`` columns
        :param transform: optional callable applied to the RGB PIL image
        :param partition: partition value to keep (0: train, 1: val, 2: test)
        :param text_embeddings: optional indexable collection of embeddings to sample
            from; when omitted, the module-level ``CLIP_TEXT_EMBEDDINGS`` is looked up
            at access time (the original behaviour)
        """
        self.img_dir = img_dir
        self.transform = transform
        self.text_embeddings = text_embeddings

        # Join attributes with the partition table and keep only the requested split.
        attr_df = pd.read_csv(attr_path, sep=',', header=0)
        partition_df = pd.read_csv(partition_path, sep=',', header=0)
        attr_df = attr_df.merge(partition_df, on='image_id')
        self.attr_df = attr_df[attr_df['partition'] == partition]

        # The bounding-box columns are merged in (inner join, so images without a
        # bbox row are dropped) but are not read anywhere else in this class.
        bbox_df = pd.read_csv(bbox_path, sep=',', header=0)
        self.attr_df = self.attr_df.merge(bbox_df, on='image_id')

        # Precompute file names and labels once. Slicing a single row out of a
        # mixed-dtype DataFrame on every __getitem__ call is slow and goes
        # through an object-dtype array.
        self.img_names = self.attr_df.iloc[:, 0].tolist()
        raw_attrs = self.attr_df.iloc[:, 1:41].to_numpy()
        self.labels = ((raw_attrs + 1) // 2).astype(np.float32)  # -1 -> 0, 1 -> 1

    def __len__(self):
        """Return the number of samples in the selected partition."""
        return len(self.img_names)

    def __getitem__(self, idx):
        """
        Load one sample.

        :param idx: integer position within the selected partition
        :return: ``(image, attrs, clip_embedding)``
        """
        img_path = os.path.join(self.img_dir, self.img_names[idx])

        # Close the file handle deterministically; convert() returns an
        # independent image, so it stays valid after the file is closed.
        with Image.open(img_path) as img:
            image = img.convert('RGB')

        # Copy so callers cannot mutate the cached label table.
        attrs = self.labels[idx].copy()

        if self.transform is not None:
            image = self.transform(image)

        # Pick one text embedding uniformly at random.
        embeddings = self.text_embeddings
        if embeddings is None:
            embeddings = CLIP_TEXT_EMBEDDINGS
        random_idx = random.randint(0, len(embeddings) - 1)
        clip_embedding = embeddings[random_idx]

        return image, attrs, clip_embedding

# ---- Configuration ----

# Input pipeline
IMAGE_SIZE = 224   # side length used by the Resize transform (previously 64)
BATCH_SIZE = 128
NUM_WORKERS = 4

# Model dimensions
LATENT_DIM, COND_DIM, CLIP_DIM = 128, 40, 512

# Optimisation
LEARNING_RATE = 1e-5
NUM_EPOCHS = 20    # can be changed in the experiment cells

# Experiment settings
RANDOM_SEED = 42
NUM_RUNS_FIXED_EPOCHS = 5
EPOCH_OPTIONS = [10, 20, 30, 40, 50]
NUM_RUNS_PER_EPOCH = 3

# Seed every random number generator in use so runs are repeatable.
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(RANDOM_SEED)


# Preprocessing pipeline: resize to IMAGE_SIZE x IMAGE_SIZE, convert to a tensor,
# then normalise every channel as (x - 0.5) / 0.5.
transform = transforms.Compose([
    transforms.Resize(size=(IMAGE_SIZE, IMAGE_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
])


# 3. 加载CLIP模型与预计算文本嵌入
加载 CLIP 模型并预先计算文本嵌入，以提高训练效率。

In [3]:
# Cell 3: 加载CLIP模型与预计算文本嵌入

# Load the CLIP "ViT-B/32" model and its preprocessing callable onto the active device.
clip_model, preprocess = clip.load("ViT-B/32", device=device)

# Load a second CLIP instance on the CPU, intended for use during data loading.
# NOTE(review): clip_model_cpu / preprocess_clip_cpu are not referenced anywhere
# else in the visible code — confirm they are needed before paying the load cost.
clip_model_cpu, preprocess_clip_cpu = clip.load("ViT-B/32", device='cpu')

# Text prompts whose CLIP embeddings serve as conditioning vectors.
TEXT_PROMPTS = [
    "A portrait of a young woman",
    "A realistic face with a smile",
    "A person with distinct facial features"
]

# 生成CLIP文本嵌入（提前计算以避免重复计算）
def generate_text_embeddings(text_prompts):
    """
    Encode text prompts with the module-level CLIP model.

    :param text_prompts: prompt string(s) passed to ``clip.tokenize``
    :return: float32 CPU tensor holding one embedding row per prompt
    """
    text_tokens = clip.tokenize(text_prompts).to(device)
    with torch.no_grad():
        text_embeddings = clip_model.encode_text(text_tokens)
    # Cast to float32 so the result dtype does not depend on which device CLIP
    # runs on (presumably half precision on CUDA — confirm against the CLIP
    # loader), keeping it consistent with the float32 model parameters.
    return text_embeddings.float().cpu()

# Encode the prompts once up front so they are not re-encoded for every sample.
CLIP_TEXT_EMBEDDINGS = generate_text_embeddings(TEXT_PROMPTS)


# 4. 数据加载器设置
设置训练集和验证集的数据加载器。

In [4]:
# Cell 4: 数据加载器设置

# Dataset locations (adjust to the local environment).
img_dir = '/root/autodl-tmp/celeba_datasets/img_align_celeba/img_align_celeba'
attr_path = '/root/autodl-tmp/celeba_datasets/list_attr_celeba.txt'
bbox_path = '/root/autodl-tmp/celeba_datasets/list_bbox_celeba.txt'
partition_path = '/root/autodl-tmp/celeba_datasets/list_eval_partition.txt'

# Partition 0 is the training split, partition 1 the validation split.
train_dataset = CelebADataset(
    img_dir=img_dir,
    attr_path=attr_path,
    bbox_path=bbox_path,
    partition_path=partition_path,
    transform=transform,
    partition=0,
)
val_dataset = CelebADataset(
    img_dir=img_dir,
    attr_path=attr_path,
    bbox_path=bbox_path,
    partition_path=partition_path,
    transform=transform,
    partition=1,
)

# Loaders: only the training split is shuffled.
# NOTE(review): batch_size here is 16 while the BATCH_SIZE constant is 128 —
# confirm which value is intended.
batch_size = 16
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=4,
    pin_memory=True,
)
val_loader = DataLoader(
    dataset=val_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=4,
    pin_memory=True,
)


# 5. 模型定义
定义 ClipCVAE 模型结构。

In [5]:
# Cell 5: 模型定义

class ClipCVAE(nn.Module):
    """
    Convolutional conditional VAE conditioned on an attribute vector and a CLIP embedding.

    The encoder receives the image with the condition vector and the CLIP
    embedding broadcast over every pixel as extra channels. The decoder
    receives the latent sample concatenated with both conditioning vectors.
    Encoder and decoder each contain four stride-2 stages, so the bottleneck
    feature map is ``img_size // 16`` pixels on a side.
    """

    def __init__(self, img_channels=3, img_size=224, latent_dim=128,
                 cond_dim=40, clip_dim=512):
        """
        :param img_channels: number of image channels
        :param img_size: side length of the square input; must be a positive multiple of 16
        :param latent_dim: size of the latent vector
        :param cond_dim: size of the attribute/condition vector
        :param clip_dim: size of the CLIP embedding
        :raises ValueError: if ``img_size`` is not a positive multiple of 16
        """
        super().__init__()
        if img_size <= 0 or img_size % 16 != 0:
            # Four halvings followed by four doublings only reproduce the input
            # size when it is divisible by 2**4.
            raise ValueError(
                f"img_size must be a positive multiple of 16, got {img_size}"
            )
        self.img_size = img_size
        self.latent_dim = latent_dim
        self.cond_dim = cond_dim
        self.clip_dim = clip_dim
        # Side length of the bottleneck feature map (14 for the default 224 input).
        self.feat_size = img_size // 16

        # Encoder: each conv (kernel 4, stride 2, padding 1) halves the spatial size.
        self.encoder = nn.Sequential(
            nn.Conv2d(img_channels + cond_dim + clip_dim, 64, kernel_size=4, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Conv2d(256, 512, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.Flatten()
        )

        # Flattened encoder output: 512 channels over feat_size x feat_size.
        enc_out_dim = 512 * self.feat_size * self.feat_size
        self.fc_mu = nn.Linear(enc_out_dim, latent_dim)
        self.fc_logvar = nn.Linear(enc_out_dim, latent_dim)

        # Decoder: project back to the bottleneck map, then double the spatial
        # size four times with transposed convolutions.
        self.decoder_input = nn.Linear(latent_dim + cond_dim + clip_dim, enc_out_dim)
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(512, 256, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.ConvTranspose2d(64, img_channels, kernel_size=4, stride=2, padding=1),
            nn.Tanh()  # outputs lie in [-1, 1]
        )

    def encode(self, x, c, clip_embedding):
        """
        Map an image and its conditioning vectors to latent Gaussian parameters.

        :param x: image batch of shape (B, img_channels, img_size, img_size)
        :param c: condition batch of shape (B, cond_dim)
        :param clip_embedding: CLIP embedding batch of shape (B, clip_dim)
        :return: ``(mu, logvar)``, each of shape (B, latent_dim)
        """
        # Broadcast both vectors over the spatial grid. expand() creates views,
        # so only the concatenated tensor below is materialised (repeat() would
        # allocate a full copy of each plane first).
        c = c.reshape(c.size(0), self.cond_dim, 1, 1).expand(
            -1, -1, self.img_size, self.img_size)
        clip_embedding = clip_embedding.reshape(
            clip_embedding.size(0), self.clip_dim, 1, 1).expand(
            -1, -1, self.img_size, self.img_size)
        x = torch.cat([x, c, clip_embedding], dim=1)
        x = self.encoder(x)
        mu = self.fc_mu(x)
        logvar = self.fc_logvar(x)
        return mu, logvar

    def reparameterize(self, mu, logvar):
        """Draw ``z = mu + eps * exp(0.5 * logvar)`` with ``eps`` sampled from N(0, I)."""
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z, c, clip_embedding):
        """
        Decode a latent sample and its conditioning vectors into an image.

        :param z: latent batch of shape (B, latent_dim)
        :param c: condition batch of shape (B, cond_dim)
        :param clip_embedding: CLIP embedding batch of shape (B, clip_dim)
        :return: image batch of shape (B, img_channels, img_size, img_size)
        """
        z = torch.cat([z, c, clip_embedding], dim=1)
        x = self.decoder_input(z)
        # Restore the (B, 512, feat_size, feat_size) bottleneck map.
        x = x.view(-1, 512, self.feat_size, self.feat_size)
        x = self.decoder(x)
        return x

    def forward(self, x, c, clip_embedding):
        """Encode, sample a latent vector, decode; return ``(recon_x, mu, logvar)``."""
        mu, logvar = self.encode(x, c, clip_embedding)
        z = self.reparameterize(mu, logvar)
        recon_x = self.decode(z, c, clip_embedding)
        return recon_x, mu, logvar


# 6. 初始化模型、损失函数和优化器

In [6]:
# Cell 6: 初始化模型、损失函数和优化器

# Model dimensions used when building the network.
latent_dim, cond_dim, clip_dim = 128, 40, 512

# Build the CVAE for 224x224 RGB input and move it to the active device.
model = ClipCVAE(
    img_channels=3,
    img_size=224,
    latent_dim=latent_dim,
    cond_dim=cond_dim,
    clip_dim=clip_dim,
)
model = model.to(device)

# Adam optimiser over all model parameters.
optimizer = optim.Adam(params=model.parameters(), lr=1e-5)

# Reconstruction criterion: squared error summed over all elements.
criterion = nn.MSELoss(reduction='sum')

# 定义损失函数
def loss_function(recon_x, x, mu, logvar):
    """
    CVAE objective: summed squared reconstruction error plus KL divergence.

    The reconstruction term is computed here directly rather than through the
    module-level ``criterion``, so the function does not change behaviour if
    that global is rebound or missing.

    :param recon_x: reconstructed batch
    :param x: target batch, same shape as ``recon_x``
    :param mu: latent means
    :param logvar: latent log-variances
    :return: scalar tensor; both terms are summed (not averaged) over the batch
    """
    recon_loss = nn.functional.mse_loss(recon_x, x, reduction='sum')
    # Closed-form KL(N(mu, exp(logvar)) || N(0, I)), summed over batch and latent dims.
    KL = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return recon_loss + KL


# 7. 训练与验证函数
定义训练和验证的函数，并记录每个epoch的损失。



In [7]:
# Cell 7: 训练与验证函数 (加入混合精度训练)

from torch.cuda.amp import autocast, GradScaler

def train_model(model, train_loader, val_loader, optimizer, num_epochs=50):
    """
    Train ``model`` with mixed precision and evaluate it after every epoch.

    :param model: network called as ``model(data, attrs, clip_emb)``
    :param train_loader: loader yielding ``(data, attrs, clip_emb)`` training batches
    :param val_loader: loader yielding ``(data, attrs, clip_emb)`` validation batches
    :param optimizer: optimiser stepped through a ``GradScaler``
    :param num_epochs: number of passes over ``train_loader``
    :return: ``(model, train_losses, val_losses)``; each list holds one
        per-sample average loss per epoch
    """
    def _to_device(*tensors):
        # Move a batch to the module-level device with non-blocking copies.
        return tuple(t.to(device, non_blocking=True) for t in tensors)

    scaler = GradScaler()
    train_losses, val_losses = [], []

    for epoch in range(1, num_epochs + 1):
        # ---- training pass ----
        model.train()
        running_total = 0
        progress = tqdm(train_loader, desc=f"Epoch {epoch}/{num_epochs}")
        for data, attrs, clip_emb in progress:
            data, attrs, clip_emb = _to_device(data, attrs, clip_emb)

            optimizer.zero_grad()
            # Forward pass and loss run under autocast.
            with autocast():
                recon_batch, mu, logvar = model(data, attrs, clip_emb)
                loss = loss_function(recon_batch, data, mu, logvar)

            # Scale the loss before backward, then step and update the scaler.
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            running_total += loss.item()

        # Batch losses are accumulated, so divide by the number of samples.
        avg_train_loss = running_total / len(train_loader.dataset)
        train_losses.append(avg_train_loss)
        print(f"Epoch {epoch}, 平均训练损失: {avg_train_loss:.4f}")

        # ---- validation pass ----
        model.eval()
        val_total = 0
        with torch.no_grad():
            for batch in val_loader:
                data, attrs, clip_emb = _to_device(*batch)
                with autocast():
                    recon_batch, mu, logvar = model(data, attrs, clip_emb)
                    loss = loss_function(recon_batch, data, mu, logvar)
                val_total += loss.item()

        avg_val_loss = val_total / len(val_loader.dataset)
        val_losses.append(avg_val_loss)
        print(f"验证集平均损失: {avg_val_loss:.4f}")

    return model, train_losses, val_losses


# 8. 图像生成与可视化函数
定义生成图像和显示图像的函数。

In [8]:
# Cell 8: 图像生成与可视化函数

# 图像生成函数
def generate_images(model, attrs, text_prompts, device, num_images=16):
    """
    Sample images from the decoder, conditioned on attributes and text prompts.

    :param model: object exposing ``latent_dim``, ``eval()`` and ``decode(z, c, clip_embedding)``
    :param attrs: attribute tensor with at least ``num_images`` rows
    :param text_prompts: a prompt string or sequence of prompt strings (encoded with
        ``generate_text_embeddings``), or a tensor of precomputed embeddings with
        one row per prompt (used as-is)
    :param device: device on which decoding runs
    :param num_images: number of images to generate
    :return: generated image batch as a CPU tensor
    :raises ValueError: if no prompt is given or ``attrs`` has fewer than
        ``num_images`` rows
    """
    model.eval()

    # Honour the text_prompts argument instead of always sampling from the
    # module-level precomputed embeddings.
    if torch.is_tensor(text_prompts):
        prompt_embeddings = text_prompts
    else:
        if isinstance(text_prompts, str):
            text_prompts = [text_prompts]
        prompt_embeddings = generate_text_embeddings(list(text_prompts))

    if len(prompt_embeddings) == 0:
        raise ValueError("text_prompts must contain at least one prompt")
    if attrs.size(0) < num_images:
        # Fail early with a clear message instead of a shape error inside decode().
        raise ValueError(
            f"attrs has {attrs.size(0)} rows but num_images={num_images}"
        )

    with torch.no_grad():
        z = torch.randn(num_images, model.latent_dim).to(device)
        attrs = attrs[:num_images].to(device)

        # Pick one prompt embedding uniformly at random for every image.
        picks = torch.tensor(
            [random.randint(0, len(prompt_embeddings) - 1) for _ in range(num_images)],
            dtype=torch.long,
        )
        # float() keeps the embeddings in the same precision as z.
        text_embeddings = prompt_embeddings[picks].float().to(device)

        generated = model.decode(z, attrs, text_embeddings)

    return generated.cpu()

# 可视化生成的图像
def show_images(images, title="Generated Images"):
    """
    Display a batch of images as a grid with four images per row.

    :param images: image batch normalised with mean 0.5 / std 0.5; it is
        de-normalised via ``x * 0.5 + 0.5`` before plotting
    :param title: figure title
    """
    denormalised = images * 0.5 + 0.5
    grid = torchvision.utils.make_grid(denormalised, nrow=4)
    # Move the channel axis last, the layout imshow is given here.
    channels_last = np.transpose(grid.numpy(), (1, 2, 0))

    plt.figure(figsize=(8, 8))
    plt.imshow(channels_last)
    plt.title(title)
    plt.axis('off')
    plt.show()


# 9. 实验1：固定轮次，多次训练并记录损失
在固定的训练轮次下，多次训练模型，并记录每次训练的训练和验证损失，以计算平均收敛点。

In [9]:
# # Cell 9: Experiment 1 - Fixed Epochs, Multiple Trainings

# import copy

# # Set experiment parameters
# fixed_num_epochs = 20  # Fixed number of training epochs
# num_runs = 5  # Number of training runs

# # Store all run losses
# all_train_losses = []
# all_val_losses = []

# for run in range(1, num_runs + 1):
#     print(f"\n=== Run {run}/{num_runs} ===")
    
#     # Reinitialize the model and optimizer
#     model_run = ClipCVAE(img_channels=3, img_size=224, latent_dim=latent_dim, 
#                         cond_dim=cond_dim, clip_dim=clip_dim).to(device)
#     optimizer_run = optim.Adam(model_run.parameters(), lr=1e-5)
    
#     # Train the model
#     trained_model, train_losses, val_losses = train_model(
#         model_run, train_loader, val_loader, optimizer_run, num_epochs=fixed_num_epochs
#     )
    
#     all_train_losses.append(train_losses)
#     all_val_losses.append(val_losses)
    
#     # Free up memory
#     del trained_model
#     torch.cuda.empty_cache()

# # Calculate average loss
# avg_train_losses = np.mean(all_train_losses, axis=0)
# avg_val_losses = np.mean(all_val_losses, axis=0)

# # Plot average loss curve
# plt.figure(figsize=(10,5))
# plt.plot(range(1, fixed_num_epochs + 1), avg_train_losses, label='Average Training Loss')
# plt.plot(range(1, fixed_num_epochs + 1), avg_val_losses, label='Average Validation Loss')
# plt.xlabel('Epochs')
# plt.ylabel('Loss')
# plt.title(f'Average Loss Curve ({fixed_num_epochs} Epochs) for {num_runs} Training Runs')
# plt.legend()
# plt.show()


# 10. 实验2：改变轮次，多次训练并寻找最佳轮次
在多个不同的训练轮次下进行多次训练，记录每个轮次的平均损失，并分析最佳轮次。

In [10]:
# # Cell 10: 实验2 - 改变轮次，寻找最佳轮次

# # 设置实验参数
# epoch_options = [10, 20, 30, 40, 50]  # 不同的训练轮次选项
# num_runs_per_epoch = 3  # 每个轮次进行的训练次数

# # 存储不同轮次的损失
# epoch_train_losses = {epochs: [] for epochs in epoch_options}
# epoch_val_losses = {epochs: [] for epochs in epoch_options}

# for epochs in epoch_options: 
#     print(f"\n=== 训练轮次: {epochs} ===")
#     for run in range(1, num_runs_per_epoch + 1):
#         print(f"--- 运行 {run}/{num_runs_per_epoch} ---")
        
#         # 重新初始化模型和优化器
#         model_run = ClipCVAE(img_channels=3, img_size=224, latent_dim=latent_dim, 
#                             cond_dim=cond_dim, clip_dim=clip_dim).to(device)
#         optimizer_run = optim.Adam(model_run.parameters(), lr=1e-5)
        
#         # 训练模型
#         trained_model, train_losses, val_losses = train_model(
#             model_run, train_loader, val_loader, optimizer_run, num_epochs=epochs
#         )
        
#         # 记录最后一个epoch的损失
#         epoch_train_losses[epochs].append(train_losses[-1])
#         epoch_val_losses[epochs].append(val_losses[-1])
        
#         # 释放显存
#         del trained_model
#         torch.cuda.empty_cache()

# # 计算每个轮次的平均损失
# avg_epoch_train_losses = {epochs: np.mean(losses) for epochs, losses in epoch_train_losses.items()}
# avg_epoch_val_losses = {epochs: np.mean(losses) for epochs, losses in epoch_val_losses.items()}

# # 绘制不同轮次的平均验证损失
# epochs_sorted = sorted(epoch_options)
# val_loss_means = [avg_epoch_val_losses[epochs] for epochs in epochs_sorted]

# plt.figure(figsize=(10,5))
# plt.plot(epochs_sorted, val_loss_means, marker='o', label='平均验证损失')
# plt.xlabel('训练轮次')
# plt.ylabel('平均验证损失')
# plt.title('不同训练轮次下的平均验证损失')
# plt.xticks(epochs_sorted)
# plt.legend()
# plt.show()

# # 打印最佳轮次
# best_epoch = epochs_sorted[np.argmin(val_loss_means)]
# print(f"最佳训练轮次为: {best_epoch}, 对应的平均验证损失为: {avg_epoch_val_losses[best_epoch]:.4f}")


# 11. 保存模型与生成图像
在确定最佳轮次后，可以保存最终模型并生成一些示例图像。

In [11]:
# # Cell 11: 保存模型与生成图像

# # 假设最佳轮次为 `best_epoch`
# # 这里重新训练模型一次以获得最终模型
# best_num_epochs = 20
# print(f"\n=== 使用最佳轮次 {best_num_epochs} 重新训练模型 ===")

# # 重新初始化模型和优化器
# final_model = ClipCVAE(img_channels=3, img_size=224, latent_dim=latent_dim, 
#                       cond_dim=cond_dim, clip_dim=clip_dim).to(device)
# final_optimizer = optim.Adam(final_model.parameters(), lr=1e-5)

# # 训练模型
# final_trained_model, final_train_losses, final_val_losses = train_model(
#     final_model, train_loader, val_loader, final_optimizer, num_epochs=best_num_epochs
# )

# # 保存模型
# torch.save(final_trained_model.state_dict(), f'clip_cvae_celeba_epochs_{best_num_epochs}.pth')
# print(f"模型已保存为: clip_cvae_celeba_epochs_{best_num_epochs}.pth")

# # 生成图像示例
# data_iter = iter(val_loader)
# images, attrs, _ = next(data_iter)
# sample_attrs = attrs[:16]

# generated_images = generate_images(final_trained_model, sample_attrs, TEXT_PROMPTS, device, num_images=16)

# # 可视化生成的图像
# show_images(generated_images, title="CLIP引导的条件生成人脸图像")


## 利用映射网络将文本嵌入转换为图像嵌入（image embeddings）后生成图像

In [19]:
# # 假设 text_to_image_embedder.pth 是已训练好的映射网络权重文件
# # 首先在和CVAE生成图像同一个脚本中加载映射网络

# # 请确保与CVAE定义在同一代码块或在此处重新定义TextToImageEmbedder类
# class TextToImageEmbedder(nn.Module):
#     def __init__(self, clip_dim=512, embed_dim=512):
#         super(TextToImageEmbedder, self).__init__()
#         self.mapping = nn.Sequential(
#             nn.Linear(clip_dim, 1024),
#             nn.ReLU(),
#             nn.Linear(1024, embed_dim),
#             nn.ReLU(),
#             nn.Linear(embed_dim, embed_dim)
#         )
    
#     def forward(self, text_embeddings):
#         image_embeddings = self.mapping(text_embeddings)
#         return image_embeddings

# # 加载已训练的映射模型
# embedder = TextToImageEmbedder(clip_dim=512, embed_dim=512).to(device)
# embedder.load_state_dict(torch.load('text_to_image_embedder.pth', map_location=device))
# embedder.eval()

# # 修改后的 generate_images 函数
# def generate_images(model, attrs, text_prompts, device, num_images=16):
#     """
#     根据条件标签和文本提示生成图像，这里将文本嵌入映射为图像嵌入后再生成。
#     """
#     model.eval()
    
#     # 使用CLIP模型对传入的 text_prompts 进行编码（如果text_prompts本身就是已处理好的文本嵌入可跳过此步）
#     text_tokens = clip.tokenize(text_prompts).to(device)
#     with torch.no_grad():
#         raw_text_embeddings = clip_model.encode_text(text_tokens)
#     raw_text_embeddings = raw_text_embeddings.float()

#     with torch.no_grad():
#         z = torch.randn(num_images, model.latent_dim).to(device)
#         attrs = attrs[:num_images].to(device)
        
#         # 对每个生成图像随机选择一个文本提示对应的嵌入，然后映射为图像嵌入
#         chosen_img_embs = []
#         for i in range(num_images):
#             # 随机从文本集合中选取一个文本嵌入
#             random_index = random.randint(0, len(text_prompts) - 1)
#             selected_text_emb = raw_text_embeddings[random_index].unsqueeze(0)  # shape: (1, 512)
            
#             # 通过映射网络将文本嵌入转为图像嵌入
#             mapped_img_emb = embedder(selected_text_emb) # shape: (1, 512)
#             chosen_img_embs.append(mapped_img_emb)
        
#         chosen_img_embs = torch.cat(chosen_img_embs, dim=0) # shape: (num_images, 512)
        
#         generated = model.decode(z, attrs, chosen_img_embs)
#         generated = generated.cpu()
#         return generated

# # 测试新的生成过程
# data_iter = iter(val_loader)
# images, attrs, _ = next(data_iter)
# sample_attrs = attrs[:16]

# generated_images = generate_images(final_trained_model, sample_attrs, TEXT_PROMPTS, device, num_images=16)

# # 可视化生成的图像
# show_images(generated_images, title="CLIP映射后的条件生成人脸图像")


  embedder.load_state_dict(torch.load('text_to_image_embedder.pth', map_location=device))


NameError: name 'final_trained_model' is not defined