In [3]:
import copy
import multiprocessing
import os
import random
from multiprocessing.pool import ThreadPool

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from deap import base, creator, tools, algorithms

In [2]:
# Summarize the PyTorch build and the CUDA devices visible to this kernel.
# Lines are collected first and emitted with a single print call.
report_lines = [
    f"PyTorch version: {torch.__version__}",
    f"CUDA available: {torch.cuda.is_available()}",
]
if torch.cuda.is_available():
    report_lines.append(f"CUDA version: {torch.version.cuda}")
    report_lines.append(f"GPU count: {torch.cuda.device_count()}")
    report_lines.extend(
        f"GPU {i}: {torch.cuda.get_device_name(i)}"
        for i in range(torch.cuda.device_count())
    )
print("\n".join(report_lines))

PyTorch version: 2.5.1+cu118
CUDA available: True
CUDA version: 11.8
GPU count: 1
GPU 0: NVIDIA GeForce RTX 3060 Laptop GPU


In [3]:
# Single notebook-wide compute device: the first GPU when CUDA is usable,
# otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

## 1：预训练CNN模型
首先，定义一个CNN模型，并在CIFAR-10数据集上对其进行训练。

In [4]:
# Baseline CNN definition
class SimpleCNN(nn.Module):
    """Three-stage convolutional classifier that emits 10 logits per sample.

    Every stage is ``Conv2d(kernel 3, padding 1) -> ReLU -> MaxPool2d(2, 2)``
    with channel widths 3 -> 64 -> 128 -> 256. The classifier consumes
    ``256 * 4 * 4`` features, which corresponds to 32x32 inputs after the
    three 2x poolings.
    """

    def __init__(self):
        super().__init__()
        # Stage 1: RGB input -> 64 feature maps
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, padding=1)
        self.relu1 = nn.ReLU()
        # Stage 2: 64 -> 128 feature maps
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.relu2 = nn.ReLU()
        # Stage 3: 128 -> 256 feature maps
        self.conv3 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1)
        self.relu3 = nn.ReLU()
        # One pooling module is shared by all stages (it holds no parameters).
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc = nn.Linear(in_features=256 * 4 * 4, out_features=10)

    def forward(self, x):
        """Run the three conv stages, flatten, and return the class logits."""
        stages = (
            (self.conv1, self.relu1),
            (self.conv2, self.relu2),
            (self.conv3, self.relu3),
        )
        for conv, activation in stages:
            x = self.pool(activation(conv(x)))
        flattened = x.view(-1, 256 * 4 * 4)
        return self.fc(flattened)

In [5]:
# Data loading and training of the baseline model
def train_model(epochs=10, batch_size=100, lr=0.001, save_path='pretrained_model.pth'):
    """Train a fresh ``SimpleCNN`` on the CIFAR-10 training split and save its weights.

    Args:
        epochs: number of passes over the training set.
        batch_size: mini-batch size used by the DataLoader.
        lr: learning rate for the Adam optimizer.
        save_path: file the final ``state_dict`` is written to.

    Returns:
        The trained model, left on the notebook-level ``device``.
    """
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True)

    # The model is created on the notebook-level device.
    model = SimpleCNN().to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    for epoch in range(epochs):
        model.train()
        # Accumulate on the device so the loop does not synchronize every batch.
        loss_sum = torch.zeros((), device=device)
        for inputs, labels in trainloader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            loss_sum += loss.detach() * labels.size(0)
        print(f"Epoch {epoch + 1}/{epochs} - mean loss: {loss_sum.item() / len(trainset):.4f}")

    torch.save(model.state_dict(), save_path)
    return model

In [6]:
# Train only when no checkpoint exists, so that Restart & Run All does not
# repeat the full training run. Delete 'pretrained_model.pth' to force retraining.
if not os.path.exists('pretrained_model.pth'):
    train_model()

Files already downloaded and verified


## 2：设置遗传算法框架
使用DEAP库配置遗传算法。

In [8]:
# Fitness and individual types.
# creator.create registers classes on the `creator` module itself, so the
# guards keep this cell re-runnable without DEAP overwriting the classes.
if not hasattr(creator, "FitnessMax"):
    creator.create("FitnessMax", base.Fitness, weights=(1.0,))
if not hasattr(creator, "Individual"):
    creator.create("Individual", list, fitness=creator.FitnessMax)

toolbox = base.Toolbox()
toolbox.register("attr_bool", random.randint, 0, 1)

# Chromosome layout: one bit per conv1 filter (64) followed by one bit per
# conv2 filter (128), 192 genes in total.
CONV1_FILTERS = 64
CONV2_FILTERS = 128
CHROMOSOME_LENGTH = CONV1_FILTERS + CONV2_FILTERS
toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_bool, n=CHROMOSOME_LENGTH)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)



## 3：定义剪枝和评估函数
加载预训练模型并定义剪枝逻辑及适应度评估。

In [11]:
# Structured filter pruning
def prune_model(original_model, conv1_indices, conv2_indices):
    """Return a copy of ``original_model`` that keeps only the selected filters.

    ``original_model`` itself is not modified. The copy keeps every module of
    the original; only ``conv1``, ``conv2`` and the input side of ``conv3`` are
    narrowed.

    Args:
        original_model: module exposing convolution layers ``conv1``,
            ``conv2`` and ``conv3`` (each with a bias), chained in that order.
        conv1_indices: indices of the conv1 output filters to keep.
        conv2_indices: indices of the conv2 output filters to keep.

    Returns:
        The pruned copy, located on the same device as the original weights,
        or ``None`` when either index collection is empty.
    """
    # Nothing can be built when a layer would lose all of its filters.
    if len(conv1_indices) == 0 or len(conv2_indices) == 0:
        return None

    weight_device = original_model.conv1.weight.device
    keep1 = torch.as_tensor(conv1_indices, dtype=torch.long, device=weight_device)
    keep2 = torch.as_tensor(conv2_indices, dtype=torch.long, device=weight_device)

    # Deep copy so conv3.bias, fc and any other state are carried over as-is.
    pruned_model = copy.deepcopy(original_model)
    src1, src2, src3 = original_model.conv1, original_model.conv2, original_model.conv3

    # conv1: keep the selected output filters.
    pruned_model.conv1.weight = nn.Parameter(src1.weight.detach().index_select(0, keep1))
    pruned_model.conv1.bias = nn.Parameter(src1.bias.detach().index_select(0, keep1))
    pruned_model.conv1.out_channels = keep1.numel()

    # conv2: keep the selected output filters, and only the input channels
    # that conv1 still produces.
    pruned_model.conv2.weight = nn.Parameter(
        src2.weight.detach().index_select(0, keep2).index_select(1, keep1)
    )
    pruned_model.conv2.bias = nn.Parameter(src2.bias.detach().index_select(0, keep2))
    pruned_model.conv2.in_channels = keep1.numel()
    pruned_model.conv2.out_channels = keep2.numel()

    # conv3: its input channels are conv2's output filters, so they must be
    # selected by conv2_indices. Taking the first len(conv2_indices) channels
    # would pair conv3 weights with the wrong feature maps.
    pruned_model.conv3.weight = nn.Parameter(src3.weight.detach().index_select(1, keep2))
    pruned_model.conv3.in_channels = keep2.numel()

    return pruned_model

In [None]:
# Fitness function
def evaluate(individual):
    """Score one chromosome by the test accuracy of the network it encodes.

    Genes 0..63 select conv1 filters and genes 64..191 select conv2 filters;
    a gene equal to 1 keeps the filter. The pretrained weights are loaded from
    'pretrained_model.pth', pruned accordingly and evaluated with
    ``test_accuracy``.

    Args:
        individual: sequence of 192 bits.

    Returns:
        A one-element tuple ``(accuracy,)``. ``(0,)`` is returned when either
        layer would lose all filters, when pruning yields no model, or when a
        ``RuntimeError`` is raised while pruning or testing.
    """
    print("Evaluating individual:", individual)

    # Decode the chromosome into the filter indices to keep.
    conv1_indices = [i for i, bit in enumerate(individual[:64]) if bit == 1]
    conv2_indices = [i for i, bit in enumerate(individual[64:192]) if bit == 1]
    if not conv1_indices or not conv2_indices:
        torch.cuda.empty_cache()
        return (0,)

    # Load the pretrained weights onto the notebook-level device.
    # weights_only=True restricts unpickling to tensors and plain containers,
    # which is all a state_dict holds.
    original_model = SimpleCNN().to(device)
    original_model.load_state_dict(
        torch.load('pretrained_model.pth', map_location=device, weights_only=True)
    )

    pruned_model = None
    try:
        pruned_model = prune_model(original_model, conv1_indices, conv2_indices)
        if pruned_model is None:
            return (0,)
        return (test_accuracy(pruned_model),)
    except RuntimeError as e:
        # Any RuntimeError (out-of-memory included) is reported under this label.
        print(f"显存溢出: {str(e)}")
        return (0,)
    finally:
        # Drop the models and release cached GPU memory on every exit path,
        # not only after a successful evaluation.
        del original_model
        del pruned_model
        torch.cuda.empty_cache()

In [13]:
# Measures classification accuracy; called by the fitness function
def test_accuracy(model):
    """Return the top-1 accuracy of ``model`` on the CIFAR-10 test split.

    The model is moved to the notebook-level ``device`` and switched to eval
    mode as a side effect.

    Args:
        model: network mapping image batches to class logits.

    Returns:
        Fraction of correctly classified test images, as a float.
    """
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
    # non_blocking copies only overlap with compute when the source batch sits
    # in pinned host memory, so enable pinning whenever the target is a GPU.
    testloader = torch.utils.data.DataLoader(
        testset, batch_size=100, shuffle=False, pin_memory=(device.type == "cuda")
    )

    model.to(device)
    model.eval()

    # Keep the running count on the device; a per-batch .item() would force a
    # host synchronization on every iteration.
    correct = torch.zeros((), dtype=torch.long, device=device)
    total = 0
    with torch.no_grad():
        for inputs, labels in testloader:
            inputs = inputs.to(device, non_blocking=True)
            labels = labels.to(device, non_blocking=True)

            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum()

    return correct.item() / total

In [14]:
### Parallel fitness evaluation
toolbox.register("evaluate", evaluate)
# A thread pool replaces multiprocessing.Pool here:
#  - with no GPU, device_count() is 0 and Pool(processes=0) raises ValueError;
#  - worker processes cannot reliably run functions defined in notebook cells,
#    and a CUDA context initialized in the parent cannot be reused after fork.
# Threads share the kernel's namespace and CUDA context, and the pool exposes
# the same map/close/join/terminate methods that later cells call.
n_workers = max(1, torch.cuda.device_count())
pool = ThreadPool(processes=n_workers)
toolbox.register("map", pool.map)

In [15]:
# Genetic operators as (toolbox alias, DEAP operator, fixed keyword arguments):
# two-point crossover, bit-flip mutation and tournament selection.
GENETIC_OPERATORS = (
    ("mate", tools.cxTwoPoint, {}),
    ("mutate", tools.mutFlipBit, {"indpb": 0.05}),
    ("select", tools.selTournament, {"tournsize": 3}),
)
for alias, operator, fixed_kwargs in GENETIC_OPERATORS:
    toolbox.register(alias, operator, **fixed_kwargs)

## 4：运行遗传算法
执行遗传算法优化过程。

In [16]:
def main(pop_size=20, ngen=5, cxpb=0.5, mutpb=0.2):
    """Run the genetic search and return the best individual found.

    Each generation applies crossover and mutation (``algorithms.varAnd``),
    evaluates the offspring and selects the next population from them.

    Args:
        pop_size: number of individuals in the population.
        ngen: number of generations.
        cxpb: crossover probability passed to ``varAnd``.
        mutpb: mutation probability passed to ``varAnd``.

    Returns:
        The individual with the highest fitness in the final population.
    """
    def _evaluate_missing(individuals):
        # Only individuals without a valid fitness are evaluated. Offspring
        # that varAnd left unmodified keep the fitness of their parent, and
        # each evaluation is a full pass over the test set.
        pending = [ind for ind in individuals if not ind.fitness.valid]
        for fit, ind in zip(toolbox.map(toolbox.evaluate, pending), pending):
            ind.fitness.values = fit

    population = toolbox.population(n=pop_size)

    for gen in range(ngen):
        offspring = algorithms.varAnd(population, toolbox, cxpb, mutpb)
        _evaluate_missing(offspring)
        population = toolbox.select(offspring, k=len(population))

    # Only does work when ngen == 0 and nothing has been evaluated yet.
    _evaluate_missing(population)

    best_ind = tools.selBest(population, k=1)[0]
    print(f"最佳个体适应度: {best_ind.fitness.values[0]}")
    return best_ind

In [None]:
if __name__ == "__main__":
    # Enable cuDNN autotuning of convolution algorithms.
    torch.backends.cudnn.benchmark = True
    try:
        main()
    finally:
        # Shut the worker pool down even when the search fails or is
        # interrupted, so no workers are left behind in the kernel.
        pool.close()
        pool.join()

In [4]:
# Terminate the worker pool if it still exists
try:
    pool.terminate()
    pool.join()
    print("已成功关闭 multiprocessing 进程池")
except NameError:
    # `pool` was never created in this kernel session.
    print("未找到正在运行的 multiprocessing 进程")
except Exception as exc:
    # Report unexpected shutdown failures instead of hiding them, while still
    # letting KeyboardInterrupt / SystemExit propagate.
    print(f"关闭 multiprocessing 进程池时出错: {exc}")

# Release cached CUDA memory
torch.cuda.empty_cache()
print("CUDA 显存缓存已清理")

未找到正在运行的 multiprocessing 进程
CUDA 显存缓存已清理


## 5：微调剪枝后的模型
对找到的最佳剪枝模型进行微调以恢复性能。

In [21]:
def fine_tune(pruned_model, epochs=5, lr=0.0001):
    """Fine-tune a pruned model on the CIFAR-10 training split.

    Mixed-precision training (autocast + gradient scaling) is used when a CUDA
    device is available; on CPU both are disabled and training runs in full
    precision.

    Args:
        pruned_model: network to fine-tune; it is modified in place.
        epochs: number of passes over the training set.
        lr: learning rate for the Adam optimizer.

    Returns:
        The same ``pruned_model`` instance after training.
    """
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)

    # Named differently from the notebook-level `device` to avoid shadowing it.
    target_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    use_amp = target_device.type == 'cuda'

    # Pinned host memory is what lets the non_blocking copies below overlap
    # with compute.
    trainloader = torch.utils.data.DataLoader(
        trainset, batch_size=100, shuffle=True, pin_memory=use_amp
    )
    criterion = nn.CrossEntropyLoss()

    pruned_model.to(target_device)
    optimizer = optim.Adam(pruned_model.parameters(), lr=lr)

    # torch.amp.* replaces the deprecated torch.cuda.amp.* entry points.
    scaler = torch.amp.GradScaler('cuda', enabled=use_amp)

    for _ in range(epochs):
        pruned_model.train()
        for inputs, labels in trainloader:
            inputs = inputs.to(target_device, non_blocking=True)
            labels = labels.to(target_device, non_blocking=True)

            optimizer.zero_grad()
            with torch.amp.autocast(device_type=target_device.type, enabled=use_amp):
                outputs = pruned_model(inputs)
                loss = criterion(outputs, labels)
            # With scaling disabled these calls reduce to a plain
            # backward() / optimizer.step().
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

    return pruned_model