In [13]:
import numpy as np
import SearchSpace as ss
import ModelBuild as Builder
import TrainModel as Trainer
from google.colab import drive
import pandas as pd
import os
import torch
import random

import torch
from torch.utils.data import DataLoader, Subset
from torchvision import datasets, transforms

# Mount Google Drive at /content/drive (Colab only); the search below saves its results under this mount.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [14]:
# Sanity check: draw parameter combinations with the "genetic" strategy and print them.
# NOTE(review): the variable name and the printed label say "random", but strategy="genetic"
# is what is actually requested here — confirm which one was intended.
random_params = ss.create_param_combinations(strategy="genetic")
print("random Search Combinations:", random_params)

random Search Combinations: [{'num_layers': 4, 'units_per_layer': 34, 'activation': 'sigmoid', 'learning_rate': 0.0012288466899514675, 'batch_size': 32, 'dropout_rate': 0.37415798952077933, 'l2_reg_strength': 3.253866542962493e-05}]


In [15]:
#
# Build the initial population
def initialize_population(size=50):
    """Create the starting population for the genetic search.

    Each individual is the first element of the list returned by
    ``ss.create_param_combinations(strategy="genetic")``; the helper is
    called once per individual.

    Args:
        size: Number of individuals to generate.

    Returns:
        A list with ``size`` individuals (empty when ``size`` is 0).
    """
    return [
        ss.create_param_combinations(strategy="genetic")[0]
        for _ in range(size)
    ]

# Parent selection: elitism + roulette wheel
def selection(population, fitness_scores, num_parents=10):
    """Pick the parents for the next generation.

    Fitness is treated as a cost: LOWER is better (the caller passes the
    final validation loss). The result is the ``num_parents`` individuals
    with the lowest fitness, followed by ``num_parents`` individuals drawn
    with replacement by roulette wheel, so it can contain duplicates.

    Args:
        population: Sequence of individuals.
        fitness_scores: One numeric score per individual, same order as
            ``population``. NaN / infinite scores (e.g. a diverged training
            run) are treated as the worst possible score.
        num_parents: Number of elite parents and also the number of
            roulette-wheel draws.

    Returns:
        A list of selected individuals (elites first), or ``[]`` when the
        population is empty.
    """
    if not population:
        return []

    worst = float("inf")
    # Map NaN/inf to +inf so sorting is well defined and the individual can
    # never be favoured by the roulette wheel.
    scores = [float(f) if np.isfinite(f) else worst for f in fitness_scores]
    usable = [s != worst for s in scores]

    # Elites: the individuals with the lowest score (stable for ties).
    ranked = sorted(range(len(scores)), key=lambda i: scores[i])
    selected_population = [population[i] for i in ranked[:num_parents]]

    # Roulette wheel: weight = 1 - (share of the total score), so a lower
    # loss gives a larger weight. Note that for large populations these
    # weights are all close to 1, i.e. the draw is nearly uniform.
    total_fitness = sum(s for s, ok in zip(scores, usable) if ok)
    if total_fitness > 0:
        selection_probs = [
            max(0.0, 1 - s / total_fitness) if ok else 0.0
            for s, ok in zip(scores, usable)
        ]
    else:
        selection_probs = [0.0] * len(scores)

    # random.choices needs a positive, finite total weight. Fall back to a
    # uniform draw over the usable individuals (or over everyone when no
    # score is usable), e.g. all-zero losses or a single individual.
    if not sum(selection_probs) > 0:
        if any(usable):
            selection_probs = [1.0 if ok else 0.0 for ok in usable]
        else:
            selection_probs = [1.0] * len(scores)

    selected_population += random.choices(population, weights=selection_probs, k=num_parents)

    return selected_population

# Crossover: combine two parents into one child
def crossover(parent1, parent2):
    """Build a child by uniform crossover.

    For every key of ``parent1`` the child receives the value of that key
    from either ``parent1`` or ``parent2``, chosen at random with equal
    probability. Neither parent is modified.

    Args:
        parent1: Mapping of parameter name to value; its keys define the child.
        parent2: Mapping that must contain every key of ``parent1``.

    Returns:
        A new dict with the same keys as ``parent1``.
    """
    return {
        gene: random.choice([parent1[gene], parent2[gene]])
        for gene in parent1.keys()
    }

# Mutation operator
def mutation(individual, search_space, mutation_rate=0.1):
    """Randomly resample parameters of ``individual`` (modified in place).

    Each entry of ``search_space`` is mutated independently with probability
    ``mutation_rate``:

    * an entry with a ``"values"`` key gets one of those values, chosen
      uniformly;
    * an entry with ``"type" == "continuous"`` gets a value drawn between
      ``"min"`` and ``"max"`` — uniformly in log space when
      ``"scale" == "log"``, otherwise uniformly in linear space;
    * any other entry is left untouched.

    Args:
        individual: Dict of parameter name to value; updated in place.
        search_space: Dict of parameter name to its config dict.
        mutation_rate: Per-parameter mutation probability.

    Returns:
        The same ``individual`` object that was passed in.
    """
    for param, config in search_space.items():
        # Mutate each parameter independently with probability mutation_rate.
        if random.random() >= mutation_rate:
            continue

        if "values" in config:  # discrete choice
            choices = config["values"]
            # Index into the sequence instead of np.random.choice(choices):
            # that call converts the list to an array first, so it hands back
            # NumPy scalars and turns mixed-type lists into strings.
            individual[param] = choices[np.random.randint(len(choices))]
        elif config.get("type") == "continuous":  # continuous range
            low, high = config["min"], config["max"]
            if config.get("scale") == "log":
                # Log-scale mutation: sample uniformly between log(min) and log(max).
                value = np.exp(np.random.uniform(np.log(low), np.log(high)))
            else:
                # Linear-scale mutation: sample uniformly between min and max.
                value = np.random.uniform(low, high)

            # Store a plain Python float so the params dict prints and
            # serialises the same way as the initial population.
            value = float(value)

            # units_per_layer must stay an integer (truncates toward zero).
            if param == "units_per_layer":
                value = int(value)

            individual[param] = value

    return individual


In [16]:
# Genetic-algorithm hyperparameter search
def genetic_search(train_set, val_set, epochs=10, save_dir="/content/drive/MyDrive/DL_HPO/GeneticResult",
                   population_size=50, generations=20, mutation_rate=0.1, num_parents=10, seed=None):
    """Run a genetic hyperparameter search and log every evaluation to disk.

    Every individual of every generation is built with
    ``Builder.build_model``, trained with ``Trainer.train_model`` and scored
    by its final validation loss (lower is better). Each evaluation is
    appended to ``<save_dir>/genetic_result.csv``. Whenever the final
    validation accuracy beats the best seen so far in this call, the summary
    row and the model weights are written to
    ``<save_dir>/best_genetic_result.csv`` and
    ``<save_dir>/best_model_weights.pt``.

    If ``genetic_result.csv`` already exists, only the cumulative training
    time is resumed from it; the population, the generation counter and the
    best-so-far accuracy start fresh and new rows are appended.

    Args:
        train_set: Dataset wrapped in a shuffling ``DataLoader`` for training.
        val_set: Dataset wrapped in a non-shuffling ``DataLoader`` for validation.
        epochs: Epochs passed to ``Trainer.train_model`` for each individual.
        save_dir: Directory for the CSV summaries and the best weights
            (created if missing).
        population_size: Individuals per generation.
        generations: Number of generations to evaluate.
        mutation_rate: Per-parameter mutation probability for children.
        num_parents: Passed to ``selection`` as the number of elite parents
            and of roulette-wheel draws.
        seed: Optional integer; when given, seeds ``random``, ``numpy`` and
            ``torch`` before the initial population is built.

    Returns:
        Tuple ``(all_results, best_result, best_model_wts)``: the list of
        per-evaluation summary dicts, the summary dict of the best evaluation
        (``None`` if no evaluation exceeded 0.0 validation accuracy) and the
        ``state_dict`` of that model (or ``None``).
    """
    if seed is not None:
        # Seed every RNG this search touches so a run can be reproduced.
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)

    search_space = ss.get_search_space("genetic")

    all_results = []
    best_result = None
    best_val_acc = 0.0
    best_model_wts = None
    total_training_time = 0  # cumulative training time across evaluations

    # Make sure the output directory exists.
    os.makedirs(save_dir, exist_ok=True)

    # If a summary file already exists, continue its cumulative time counter.
    summary_file_path = f"{save_dir}/genetic_result.csv"
    if os.path.exists(summary_file_path):
        existing_results_df = pd.read_csv(summary_file_path)
        previous_total = existing_results_df["total_training_time"].max()
        # max() is NaN when the file has a header but no rows; keep 0 then.
        if pd.notna(previous_total):
            total_training_time = float(previous_total)

    # Initial population.
    population = initialize_population(population_size)

    for generation in range(generations):
        print(f"Generation {generation + 1}/{generations}")

        # Evaluate the fitness of every individual in the current population.
        # NOTE: a NaN/inf validation loss is passed to selection() unchanged.
        fitness_scores = []
        for param_index, params in enumerate(population):
            print(params)
            # Build the model and the data loaders for this individual.
            model = Builder.build_model(params)
            batch_size = int(params["batch_size"])
            train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
            val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)

            # Train the model.
            result = Trainer.train_model(model, train_loader, val_loader, epochs=epochs)
            final_val_loss = result["val_losses"][-1]
            final_val_acc = result["val_accuracies"][-1]
            fitness_scores.append(final_val_loss)  # validation loss is the fitness

            # Accumulate this model's training time.
            total_training_time += result["training_time"]

            # Record the evaluation.
            result_summary = {
                "generation": generation,
                "param_index": param_index,
                "params": params,
                "strategy": "genetic",
                "train_loss": result["train_losses"][-1],
                "val_loss": final_val_loss,
                "train_accuracy": result["train_accuracies"][-1],
                "val_accuracy": final_val_acc,
                "training_time": result["training_time"],
                "total_training_time": total_training_time
            }
            all_results.append(result_summary)

            # Append the row to the summary file; write the header only when
            # the file is being created.
            write_header = not os.path.exists(summary_file_path)
            pd.DataFrame([result_summary]).to_csv(
                summary_file_path, mode='a', header=write_header, index=False
            )

            # Track and persist the best model (by validation accuracy).
            if final_val_acc > best_val_acc:
                best_val_acc = final_val_acc
                best_result = result_summary
                best_model_wts = model.state_dict()

                # Save the best result and its weights to save_dir.
                best_result_df = pd.DataFrame([best_result])
                best_result_df.to_csv(f"{save_dir}/best_genetic_result.csv", index=False)
                torch.save(best_model_wts, f"{save_dir}/best_model_weights.pt")

        # The last generation has no successor, so skip breeding a
        # population that would never be evaluated.
        if generation == generations - 1:
            break

        # Select parents by fitness and breed the next generation.
        selected_parents = selection(population, fitness_scores, num_parents)
        new_population = []

        while len(new_population) < population_size:
            parent1, parent2 = random.sample(selected_parents, 2)
            child = crossover(parent1, parent2)
            child = mutation(child, search_space, mutation_rate)
            new_population.append(child)

        # Replace the population.
        population = new_population

    return all_results, best_result, best_model_wts

In [17]:
# Preprocessing applied to every MNIST image: convert to a tensor, then
# normalise with mean 0.5 and std 0.5.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

# Download (if needed) and load the MNIST train and test splits.
train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST(root='./data', train=False, download=True, transform=transform)

# Use only the first N samples of each split so that each training run stays short.
# Note: the validation subset is taken from the MNIST *test* split.
N_TRAIN_SAMPLES = 3000
N_VAL_SAMPLES = 500
train_subset = Subset(train_dataset, range(N_TRAIN_SAMPLES))
val_subset = Subset(test_dataset, range(N_VAL_SAMPLES))

In [None]:
# Run the search. With genetic_search's built-in settings (population of 50, 20 generations)
# this trains 50 x 20 = 1000 models for 10 epochs each, so expect a long-running cell.
all_results, best_result, best_model_wts = genetic_search(train_subset, val_subset, epochs=10,save_dir="/content/drive/MyDrive/DL_HPO/GeneticResult")

Generation 1/20
{'num_layers': 2, 'units_per_layer': 70, 'activation': 'sigmoid', 'learning_rate': 0.0006154567295061833, 'batch_size': 64, 'dropout_rate': 0.2456922394788097, 'l2_reg_strength': 4.83732955789123e-07}
{'num_layers': 4, 'units_per_layer': 16, 'activation': 'tanh', 'learning_rate': 0.01984254734044944, 'batch_size': 256, 'dropout_rate': 0.26957003708751615, 'l2_reg_strength': 1.1565571735797797e-06}
{'num_layers': 4, 'units_per_layer': 111, 'activation': 'relu', 'learning_rate': 0.1464281537511819, 'batch_size': 128, 'dropout_rate': 0.551353021391362, 'l2_reg_strength': 4.100657697589167e-07}
{'num_layers': 4, 'units_per_layer': 32, 'activation': 'tanh', 'learning_rate': 0.009360891820260555, 'batch_size': 256, 'dropout_rate': 0.3296643958205882, 'l2_reg_strength': 0.00014526174930766284}
{'num_layers': 3, 'units_per_layer': 51, 'activation': 'tanh', 'learning_rate': 0.22862965523679232, 'batch_size': 64, 'dropout_rate': 0.5435152460381255, 'l2_reg_strength': 2.1089952513