In [1]:
#!/usr/bin/env python3
"""
改进的Constrained NSGA-II
更好地处理离散约束条件 x1*x4 = 0
"""

import numpy as np
import matplotlib.pyplot as plt
import random
from typing import List, Tuple, Dict
import copy

class EnhancedIndividual:
    """增强的个体类，包含约束情况标识"""
    def __init__(self, variables: np.ndarray, constraint_case: int = None):
        self.variables = variables.copy()
        self.objectives = None
        self.constraints = None
        self.constraint_violation = 0.0
        self.rank = None
        self.crowding_distance = 0.0
        self.feasible = False
        self.constraint_case = constraint_case  # 1: x1=0, 2: x4=0
        
    def copy(self):
        new_ind = EnhancedIndividual(self.variables, self.constraint_case)
        new_ind.objectives = self.objectives.copy() if self.objectives is not None else None
        new_ind.constraints = self.constraints.copy() if self.constraints is not None else None
        new_ind.constraint_violation = self.constraint_violation
        new_ind.rank = self.rank
        new_ind.crowding_distance = self.crowding_distance
        new_ind.feasible = self.feasible
        return new_ind

class EnhancedConstrainedNSGAII:
    """增强的约束NSGA-II算法"""
    
    def __init__(self, population_size=100, max_generations=300, 
                 crossover_prob=0.9, mutation_prob=0.2):
        self.population_size = population_size
        self.max_generations = max_generations
        self.crossover_prob = crossover_prob
        self.mutation_prob = mutation_prob
        
        # 问题定义
        self.n_variables = 4  # x1, x2, x4, x6
        self.n_objectives = 2  # f1, f3
        self.n_constraints = 2  # x1*x4=0, x2*x6=10000
        
        # 变量边界
        self.lower_bounds = np.array([0.0, 0.0, 0.0, 0.0])
        self.upper_bounds = np.array([10.0, 200.0, 200.0, 600.0])
        
        # 历史记录
        self.history = {
            'generation': [],
            'best_f1': [],
            'best_f3': [],
            'feasible_ratio': [],
            'case1_ratio': [],
            'case2_ratio': []
        }
        
    def evaluate_individual(self, individual: EnhancedIndividual):
        """评估个体"""
        x = individual.variables
        x1, x2, x4, x6 = x[0], x[1], x[2], x[3]
        
        # 目标函数
        f1 = 35 * (x1**0.6) + 35 * (x2**0.6)
        f3 = 200 * x1 * x6 - 2000000
        
        individual.objectives = np.array([f1, f3])
        
        # 约束条件
        g1 = abs(x1 * x4)  # x1*x4 = 0
        g2 = abs(x2 * x6 - 10000)  # x2*x6 = 10000
        
        individual.constraints = np.array([g1, g2])
        
        # 约束违反度
        constraint_tolerance = 1e-6
        individual.constraint_violation = (
            max(0, g1 - constraint_tolerance) + 
            max(0, g2 - constraint_tolerance)
        )
        
        individual.feasible = individual.constraint_violation <= constraint_tolerance
        
        # 确定约束情况
        if abs(x1) < 1e-6:
            individual.constraint_case = 1  # x1 = 0
        elif abs(x4) < 1e-6:
            individual.constraint_case = 2  # x4 = 0
        else:
            individual.constraint_case = 0  # 违反约束
    
    def create_case1_individual(self) -> EnhancedIndividual:
        """创建情况1的个体: x1 = 0"""
        x1 = 0.0
        x4 = random.uniform(self.lower_bounds[2], self.upper_bounds[2])
        
        # 确保x2*x6 = 10000
        x2 = random.uniform(16.67, self.upper_bounds[1])  # x2最小值为10000/600
        x6 = 10000 / x2
        
        if x6 > self.upper_bounds[3]:
            x6 = self.upper_bounds[3]
            x2 = 10000 / x6
            
        variables = np.array([x1, x2, x4, x6])
        individual = EnhancedIndividual(variables, constraint_case=1)
        self.evaluate_individual(individual)
        return individual
    
    def create_case2_individual(self) -> EnhancedIndividual:
        """创建情况2的个体: x4 = 0"""
        x4 = 0.0
        x1 = random.uniform(0.1, self.upper_bounds[0])  # 避免x1=0
        
        # 确保x2*x6 = 10000
        x2 = random.uniform(16.67, self.upper_bounds[1])
        x6 = 10000 / x2
        
        if x6 > self.upper_bounds[3]:
            x6 = self.upper_bounds[3]
            x2 = 10000 / x6
            
        variables = np.array([x1, x2, x4, x6])
        individual = EnhancedIndividual(variables, constraint_case=2)
        self.evaluate_individual(individual)
        return individual
    
    def create_initial_population(self) -> List[EnhancedIndividual]:
        """创建平衡的初始种群"""
        population = []
        
        # 50% 情况1 (x1=0), 50% 情况2 (x4=0)
        case1_count = self.population_size // 2
        case2_count = self.population_size - case1_count
        
        # 创建情况1的个体
        for _ in range(case1_count):
            individual = self.create_case1_individual()
            population.append(individual)
        
        # 创建情况2的个体
        for _ in range(case2_count):
            individual = self.create_case2_individual()
            population.append(individual)
            
        return population
    
    def case_aware_crossover(self, parent1: EnhancedIndividual, parent2: EnhancedIndividual) -> Tuple[EnhancedIndividual, EnhancedIndividual]:
        """考虑约束情况的交叉"""
        if random.random() > self.crossover_prob:
            return parent1.copy(), parent2.copy()
        
        # 如果父代来自同一种约束情况，进行常规交叉
        if parent1.constraint_case == parent2.constraint_case:
            return self.within_case_crossover(parent1, parent2)
        else:
            # 不同约束情况，随机选择一种情况创建子代
            if random.random() < 0.5:
                return self.create_offspring_for_case(parent1.constraint_case, parent1, parent2)
            else:
                return self.create_offspring_for_case(parent2.constraint_case, parent1, parent2)
    
    def within_case_crossover(self, parent1: EnhancedIndividual, parent2: EnhancedIndividual) -> Tuple[EnhancedIndividual, EnhancedIndividual]:
        """同一约束情况内的交叉"""
        eta_c = 20
        child1_vars = np.zeros(self.n_variables)
        child2_vars = np.zeros(self.n_variables)
        
        for i in range(self.n_variables):
            if random.random() <= 0.5:
                p1_val = parent1.variables[i]
                p2_val = parent2.variables[i]
                
                if abs(p1_val - p2_val) > 1e-14:
                    u = random.random()
                    if u <= 0.5:
                        beta = (2 * u) ** (1 / (eta_c + 1))
                    else:
                        beta = (1 / (2 * (1 - u))) ** (1 / (eta_c + 1))
                    
                    child1_vars[i] = 0.5 * ((1 + beta) * p1_val + (1 - beta) * p2_val)
                    child2_vars[i] = 0.5 * ((1 - beta) * p1_val + (1 + beta) * p2_val)
                else:
                    child1_vars[i] = p1_val
                    child2_vars[i] = p2_val
            else:
                child1_vars[i] = parent1.variables[i]
                child2_vars[i] = parent2.variables[i]
        
        # 边界检查
        child1_vars = np.clip(child1_vars, self.lower_bounds, self.upper_bounds)
        child2_vars = np.clip(child2_vars, self.lower_bounds, self.upper_bounds)
        
        # 根据约束情况修复
        case = parent1.constraint_case
        child1_vars = self.repair_for_case(child1_vars, case)
        child2_vars = self.repair_for_case(child2_vars, case)
        
        child1 = EnhancedIndividual(child1_vars, case)
        child2 = EnhancedIndividual(child2_vars, case)
        
        return child1, child2
    
    def create_offspring_for_case(self, case: int, parent1: EnhancedIndividual, parent2: EnhancedIndividual) -> Tuple[EnhancedIndividual, EnhancedIndividual]:
        """为特定约束情况创建子代"""
        if case == 1:
            child1 = self.create_case1_individual()
            child2 = self.create_case1_individual()
        else:
            child1 = self.create_case2_individual()
            child2 = self.create_case2_individual()
            
        return child1, child2
    
    def repair_for_case(self, variables: np.ndarray, case: int) -> np.ndarray:
        """根据约束情况修复变量"""
        x = variables.copy()
        
        if case == 1:  # x1 = 0
            x[0] = 0.0
        elif case == 2:  # x4 = 0
            x[2] = 0.0
        
        # 修复 x2*x6 = 10000
        if x[1] > 0:
            target_x6 = 10000 / x[1]
            if target_x6 <= self.upper_bounds[3]:
                x[3] = target_x6
            else:
                x[1] = 10000 / self.upper_bounds[3]
                x[3] = self.upper_bounds[3]
        else:
            x[1] = 50.0
            x[3] = 200.0
            
        return x
    
    def case_aware_mutation(self, individual: EnhancedIndividual) -> EnhancedIndividual:
        """考虑约束情况的变异"""
        if random.random() > self.mutation_prob:
            return individual.copy()
        
        # 有小概率改变约束情况
        if random.random() < 0.1:
            if individual.constraint_case == 1:
                return self.create_case2_individual()
            else:
                return self.create_case1_individual()
        
        # 在同一约束情况内变异
        eta_m = 20
        mutated = individual.copy()
        
        # 根据约束情况决定哪些变量可以变异
        if individual.constraint_case == 1:  # x1 = 0
            mutable_indices = [1, 2, 3]  # x2, x4, x6
        else:  # x4 = 0
            mutable_indices = [0, 1, 3]  # x1, x2, x6
        
        for i in mutable_indices:
            if random.random() <= (1.0 / len(mutable_indices)):
                val = mutated.variables[i]
                delta_l = val - self.lower_bounds[i]
                delta_u = self.upper_bounds[i] - val
                
                rnd = random.random()
                
                if rnd < 0.5:
                    bl = 2 * rnd
                    delta = (bl ** (1 / (eta_m + 1))) - 1
                    mutated.variables[i] = val + delta * delta_l
                else:
                    bu = 2 * (1 - rnd)
                    delta = 1 - (bu ** (1 / (eta_m + 1)))
                    mutated.variables[i] = val + delta * delta_u
        
        # 边界检查和约束修复
        mutated.variables = np.clip(mutated.variables, self.lower_bounds, self.upper_bounds)
        mutated.variables = self.repair_for_case(mutated.variables, individual.constraint_case)
        
        return mutated
    
    def tournament_selection(self, population: List[EnhancedIndividual], tournament_size=3) -> EnhancedIndividual:
        """锦标赛选择"""
        tournament = random.sample(population, tournament_size)
        
        # 首先按可行性排序
        feasible = [ind for ind in tournament if ind.feasible]
        infeasible = [ind for ind in tournament if not ind.feasible]
        
        if feasible:
            tournament = feasible
            tournament.sort(key=lambda x: (x.rank, -x.crowding_distance))
        else:
            tournament.sort(key=lambda x: x.constraint_violation)
            
        return tournament[0]
    
    def fast_non_dominated_sort(self, population: List[EnhancedIndividual]) -> List[List[EnhancedIndividual]]:
        """快速非支配排序"""
        feasible = [ind for ind in population if ind.feasible]
        infeasible = [ind for ind in population if not ind.feasible]
        
        fronts = []
        
        if feasible:
            feasible_fronts = self._non_dominated_sort(feasible)
            fronts.extend(feasible_fronts)
        
        if infeasible:
            infeasible.sort(key=lambda x: x.constraint_violation)
            fronts.append(infeasible)
        
        # 分配rank
        for rank, front in enumerate(fronts):
            for ind in front:
                ind.rank = rank
                
        return fronts
    
    def _non_dominated_sort(self, population: List[EnhancedIndividual]) -> List[List[EnhancedIndividual]]:
        """标准非支配排序"""
        n = len(population)
        domination_counts = [0] * n
        dominated_solutions = [[] for _ in range(n)]
        
        for i in range(n):
            for j in range(i+1, n):
                if self.dominates(population[i], population[j]):
                    dominated_solutions[i].append(j)
                    domination_counts[j] += 1
                elif self.dominates(population[j], population[i]):
                    dominated_solutions[j].append(i)
                    domination_counts[i] += 1
        
        fronts = []
        current_front = [i for i in range(n) if domination_counts[i] == 0]
        
        while current_front:
            fronts.append([population[i] for i in current_front])
            next_front = []
            
            for i in current_front:
                for j in dominated_solutions[i]:
                    domination_counts[j] -= 1
                    if domination_counts[j] == 0:
                        next_front.append(j)
            
            current_front = next_front
            
        return fronts
    
    def dominates(self, ind1: EnhancedIndividual, ind2: EnhancedIndividual) -> bool:
        """判断支配关系"""
        if not ind1.feasible and not ind2.feasible:
            return ind1.constraint_violation < ind2.constraint_violation
        elif ind1.feasible and not ind2.feasible:
            return True
        elif not ind1.feasible and ind2.feasible:
            return False
        else:
            # 两个都可行，比较目标函数
            better_in_all = True
            better_in_one = False
            
            for i in range(len(ind1.objectives)):
                if ind1.objectives[i] > ind2.objectives[i]:  # 最小化
                    better_in_all = False
                elif ind1.objectives[i] < ind2.objectives[i]:
                    better_in_one = True
                    
            return better_in_all and better_in_one
    
    def calculate_crowding_distance(self, front: List[EnhancedIndividual]):
        """计算拥挤距离"""
        if len(front) <= 2:
            for ind in front:
                ind.crowding_distance = float('inf')
            return
            
        for ind in front:
            ind.crowding_distance = 0
            
        for obj_idx in range(self.n_objectives):
            front.sort(key=lambda x: x.objectives[obj_idx])
            
            front[0].crowding_distance = float('inf')
            front[-1].crowding_distance = float('inf')
            
            obj_range = front[-1].objectives[obj_idx] - front[0].objectives[obj_idx]
            if obj_range > 0:
                for i in range(1, len(front)-1):
                    distance = (front[i+1].objectives[obj_idx] - front[i-1].objectives[obj_idx]) / obj_range
                    front[i].crowding_distance += distance
    
    def environmental_selection(self, population: List[EnhancedIndividual]) -> List[EnhancedIndividual]:
        """环境选择"""
        fronts = self.fast_non_dominated_sort(population)
        new_population = []
        
        for front in fronts:
            if len(new_population) + len(front) <= self.population_size:
                new_population.extend(front)
            else:
                self.calculate_crowding_distance(front)
                front.sort(key=lambda x: -x.crowding_distance)
                remaining = self.population_size - len(new_population)
                new_population.extend(front[:remaining])
                break
                
        return new_population
    
    def run(self) -> Dict:
        """运行算法"""
        print("初始化平衡种群...")
        population = self.create_initial_population()
        
        print(f"开始进化 (共{self.max_generations}代)...")
        
        for generation in range(self.max_generations):
            # 创建子代
            offspring = []
            
            while len(offspring) < self.population_size:
                parent1 = self.tournament_selection(population)
                parent2 = self.tournament_selection(population)
                
                child1, child2 = self.case_aware_crossover(parent1, parent2)
                child1 = self.case_aware_mutation(child1)
                child2 = self.case_aware_mutation(child2)
                
                self.evaluate_individual(child1)
                self.evaluate_individual(child2)
                
                offspring.extend([child1, child2])
            
            # 合并父代和子代
            combined_population = population + offspring[:self.population_size]
            
            # 环境选择
            population = self.environmental_selection(combined_population)
            
            # 统计信息
            feasible_pop = [ind for ind in population if ind.feasible]
            case1_pop = [ind for ind in population if ind.constraint_case == 1]
            case2_pop = [ind for ind in population if ind.constraint_case == 2]
            
            if feasible_pop:
                best_f1 = min(ind.objectives[0] for ind in feasible_pop)
                best_f3 = min(ind.objectives[1] for ind in feasible_pop)
            else:
                best_f1 = min(ind.objectives[0] for ind in population)
                best_f3 = min(ind.objectives[1] for ind in population)
            
            feasible_ratio = len(feasible_pop) / len(population)
            case1_ratio = len(case1_pop) / len(population)
            case2_ratio = len(case2_pop) / len(population)
            
            # 记录历史
            self.history['generation'].append(generation)
            self.history['best_f1'].append(best_f1)
            self.history['best_f3'].append(best_f3)
            self.history['feasible_ratio'].append(feasible_ratio)
            self.history['case1_ratio'].append(case1_ratio)
            self.history['case2_ratio'].append(case2_ratio)
            
            if generation % 30 == 0:
                print(f"第{generation}代: 可行解={feasible_ratio:.1%}, "
                      f"情况1={case1_ratio:.1%}, 情况2={case2_ratio:.1%}, "
                      f"最佳f1={best_f1:.1f}, 最佳f3={best_f3:.0f}")
        
        # 提取Pareto前沿
        feasible_pop = [ind for ind in population if ind.feasible]
        if not feasible_pop:
            print("警告: 没有找到可行解!")
            feasible_pop = population
            
        pareto_front = self._extract_pareto_front(feasible_pop)
        
        return {
            'population': population,
            'pareto_front': pareto_front,
            'history': self.history
        }
    
    def _extract_pareto_front(self, population: List[EnhancedIndividual]) -> List[EnhancedIndividual]:
        """提取Pareto前沿"""
        fronts = self._non_dominated_sort(population)
        return fronts[0] if fronts else []

def main():
    """主函数"""
    print("=" * 70)
    print("Enhanced Constrained NSGA-II for Heat Exchanger Network Design")
    print("=" * 70)
    
    # 初始化算法
    nsga2 = EnhancedConstrainedNSGAII(
        population_size=100,
        max_generations=300,
        crossover_prob=0.9,
        mutation_prob=0.2
    )
    
    # 运行算法
    results = nsga2.run()
    
    # 分析结果
    print("\n" + "=" * 60)
    print("结果分析")
    print("=" * 60)
    
    pareto_front = results['pareto_front']
    population = results['population']
    
    feasible_count = len([ind for ind in population if ind.feasible])
    case1_count = len([ind for ind in population if ind.constraint_case == 1])
    case2_count = len([ind for ind in population if ind.constraint_case == 2])
    
    print(f"最终可行解数量: {feasible_count}/{len(population)}")
    print(f"情况1 (x1=0) 解数量: {case1_count}")
    print(f"情况2 (x4=0) 解数量: {case2_count}")
    print(f"Pareto最优解数量: {len(pareto_front)}")
    
    if pareto_front:
        print(f"\nPareto前沿解:")
        print("No.  Case  x1      x2      x4      x6      f1        f3")
        print("-" * 65)
        
        # 按约束情况分组显示
        case1_solutions = [ind for ind in pareto_front if ind.constraint_case == 1]
        case2_solutions = [ind for ind in pareto_front if ind.constraint_case == 2]
        
        print("情况1 (x1=0):")
        for i, ind in enumerate(case1_solutions[:5]):
            x1, x2, x4, x6 = ind.variables
            f1, f3 = ind.objectives
            print(f"{i+1:2d}   1    {x1:6.3f}  {x2:6.1f}  {x4:6.1f}  {x6:6.1f}  {f1:8.1f}  {f3:8.0f}")
        
        print("\n情况2 (x4=0):")
        for i, ind in enumerate(case2_solutions[:5]):
            x1, x2, x4, x6 = ind.variables
            f1, f3 = ind.objectives
            print(f"{i+1:2d}   2    {x1:6.3f}  {x2:6.1f}  {x4:6.1f}  {x6:6.1f}  {f1:8.1f}  {f3:8.0f}")
    
    # 保存结果
    save_enhanced_results(results, nsga2)
    
    return results

def save_enhanced_results(results, nsga2):
    """保存增强版结果"""
    import json
    
    # 准备数据
    pareto_data = []
    for ind in results['pareto_front']:
        pareto_data.append({
            'variables': ind.variables.tolist(),
            'objectives': ind.objectives.tolist(),
            'constraint_case': ind.constraint_case,
            'feasible': ind.feasible
        })
    
    # 保存到JSON
    output_data = {
        'problem_info': {
            'n_variables': nsga2.n_variables,
            'n_objectives': nsga2.n_objectives,
            'n_constraints': nsga2.n_constraints,
            'population_size': nsga2.population_size,
            'max_generations': nsga2.max_generations
        },
        'pareto_front': pareto_data,
        'history': results['history']
    }
    
    with open('enhanced_nsga2_results.json', 'w') as f:
        json.dump(output_data, f, indent=2)
    
    print(f"\n增强版结果已保存到: /enhanced_nsga2_results.json")

if __name__ == "__main__":
    results = main()

Enhanced Constrained NSGA-II for Heat Exchanger Network Design
初始化平衡种群...
开始进化 (共300代)...
第0代: 可行解=100.0%, 情况1=53.0%, 情况2=47.0%, 最佳f1=236.0, 最佳f3=-2000000
第30代: 可行解=100.0%, 情况1=100.0%, 情况2=0.0%, 最佳f1=189.3, 最佳f3=-2000000
第60代: 可行解=100.0%, 情况1=100.0%, 情况2=0.0%, 最佳f1=189.3, 最佳f3=-2000000
第90代: 可行解=100.0%, 情况1=100.0%, 情况2=0.0%, 最佳f1=189.3, 最佳f3=-2000000
第120代: 可行解=100.0%, 情况1=100.0%, 情况2=0.0%, 最佳f1=189.3, 最佳f3=-2000000
第150代: 可行解=100.0%, 情况1=100.0%, 情况2=0.0%, 最佳f1=189.3, 最佳f3=-2000000
第180代: 可行解=100.0%, 情况1=100.0%, 情况2=0.0%, 最佳f1=189.3, 最佳f3=-2000000
第210代: 可行解=100.0%, 情况1=100.0%, 情况2=0.0%, 最佳f1=189.3, 最佳f3=-2000000
第240代: 可行解=100.0%, 情况1=100.0%, 情况2=0.0%, 最佳f1=189.3, 最佳f3=-2000000
第270代: 可行解=100.0%, 情况1=100.0%, 情况2=0.0%, 最佳f1=189.3, 最佳f3=-2000000

结果分析
最终可行解数量: 100/100
情况1 (x1=0) 解数量: 100
情况2 (x4=0) 解数量: 0
Pareto最优解数量: 100

Pareto前沿解:
No.  Case  x1      x2      x4      x6      f1        f3
-----------------------------------------------------------------
情况1 (x1=0):
 1   1     0.000    1