In [120]:
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
from enum import Enum
import random
import copy
import plotly.graph_objects as go

# 假设你已经定义了 Operation, OrderMultiFeature 和 StaticParameters 类
# 为了代码完整性，我在这里重新定义一下，但请注意，实际使用时你将直接引用你已有的变量。

class Operation(Enum):
    HR = "HR"
    AZ = "AZ"
    CA = "CA"

@dataclass
class OrderMultiFeature:
    """
    订单多特征类
    """
    order_no: str
    delivery_date: datetime # datetime对象,形如20241225
    plan_start_date: datetime # datetime对象, 形如20241101
    order_wt: float
    order_width: float
    order_thick: float

@dataclass
class StaticParameters:
    """
    静态参数类
    """
    operation_sequence: List[Operation] = field(default_factory=lambda: [Operation.HR, Operation.AZ, Operation.CA])
    transmission_time: Dict[Operation, float] = field(default_factory=lambda: {
        Operation.HR: 2.0,
        Operation.AZ: 4.0,
        Operation.CA: 1.0
    })
    speed: Dict[Operation, List[float]] = field(default_factory=lambda: {
        Operation.HR: [20.0, 15.0],
        Operation.AZ: [15.0, 16.0],
        Operation.CA: [10.0, 15.0, 12.0]
    })
    stock_limit: Dict[Operation, List[float]] = field(default_factory=lambda: {
        Operation.HR: [0.0, 1000.0],
        Operation.AZ: [0.0, 1000.0],
        Operation.CA: [0.0, 1000.0]
    })
    # 新增搭接费用参数，简化处理，实际中可能更复杂
    changeover_cost_matrix: Dict[Operation, Dict[Tuple[float, float], float]] = field(default_factory=dict)
    delay_penalty_per_hour: float = 100.0 # 拖期惩罚每小时
    width_change_penalty_factor: float = 1.0 # 宽度变化惩罚因子，从窄到宽惩罚高

    def __post_init__(self):
        # 预计算搭接费用矩阵 (简化版，仅考虑宽度从宽到窄和从窄到宽两种情况)
        # 实际生产中搭接费用会更复杂，这里仅为示例
        widths = sorted(list(set([o.order_width for o in orders_obj_list]))) # 假设orders_obj_list已传入
        for op in self.operation_sequence:
            self.changeover_cost_matrix[op] = {}
            for w1 in widths:
                for w2 in widths:
                    if w1 == w2:
                        self.changeover_cost_matrix[op][(w1, w2)] = 0.0
                    elif w1 > w2: # 从宽到窄
                        self.changeover_cost_matrix[op][(w1, w2)] = (w1 - w2) * 5.0 # 假设一个较小费用
                    else: # 从窄到宽
                        self.changeover_cost_matrix[op][(w1, w2)] = (w2 - w1) * 20.0 * self.width_change_penalty_factor # 假设一个较高费用

@dataclass
class ScheduleEntry:
    """
    单个合同在某个工序某个机组上的生产计划
    """
    order_no: str
    operation: Operation
    machine_id: int # 机组ID，从0开始
    start_time: datetime
    end_time: datetime
    order_wt: float
    order_width: float

@dataclass
class MachineSchedule:
    """
    单个机组的生产时间线
    """
    machine_id: int
    operation: Operation
    schedule: List[ScheduleEntry] = field(default_factory=list)

@dataclass
class ProductionPlan:
    """
    一个完整的生产方案，包含所有工序和机组的调度
    """
    order_priority: List[str] # 对应染色体，订单号的优先级列表
    schedules: Dict[Operation, Dict[int, MachineSchedule]] = field(default_factory=dict)
    total_tardiness: float = 0.0
    total_cost: float = 0.0
    is_feasible: bool = True
    stock_violations: Dict[Operation, List[Tuple[datetime, float, float]]] = field(default_factory=dict) # (时间, 当前库存, 限制)



In [121]:
class Scheduler:
    def __init__(self, orders: List[OrderMultiFeature], params: StaticParameters):
        self.orders = orders
        self.orders_map = {order.order_no: order for order in orders}
        self.params = params

    def get_production_time(self, order_wt: float, machine_speed: float) -> timedelta:
        """计算生产所需时间"""
        if machine_speed <= 0: # 避免除以零或负速度
            return timedelta(days=9999) # 视为无限长时间
        return timedelta(hours=order_wt / machine_speed)

    def find_available_machine_slot(self, operation: Operation, machine_schedules: Dict[int, MachineSchedule],
                                    earliest_start: datetime, latest_end: datetime,
                                    current_order_no: str, current_order_width: float,
                                    consider_stock_at_slot_time: bool = False,
                                    current_plan_for_stock_check: Optional[ProductionPlan] = None) -> Optional[Tuple[int, datetime, datetime, float]]:
        """
        在给定时间范围内，找到最合适的机组和时间段。
        选择空闲机组中处理速率快的，且必须在 latest_end 之前完成。
        """
        order_to_schedule = self.orders_map[current_order_no]
        available_slots = []

        # 遍历所有机组，寻找可能的排程
        for machine_id, speed in enumerate(self.params.speed[operation]):
            production_duration = self.get_production_time(order_to_schedule.order_wt, speed)
            if production_duration > timedelta(days=900): # 避免处理时间过长，视为不可行
                continue

            machine_schedule = machine_schedules[machine_id].schedule

            # 尝试在第一个合同之前插入
            if not machine_schedule:
                # 检查是否能在 latest_end 前完成
                possible_end_time = earliest_start + production_duration
                if possible_end_time <= latest_end:
                    available_slots.append((machine_id, earliest_start, possible_end_time, speed))
            else:
                first_entry = machine_schedule[0]
                if earliest_start + production_duration <= first_entry.start_time:
                    if earliest_start + production_duration <= latest_end:
                        available_slots.append((machine_id, earliest_start, earliest_start + production_duration, speed))

            # 尝试在现有合同之间插入
            for i in range(len(machine_schedule) - 1):
                prev_entry = machine_schedule[i]
                next_entry = machine_schedule[i+1]

                # 可用槽的开始时间是 max(earliest_start, 前一个合同的结束时间)
                slot_start = max(earliest_start, prev_entry.end_time)
                slot_end = slot_start + production_duration

                if slot_end <= next_entry.start_time and slot_end <= latest_end:
                    available_slots.append((machine_id, slot_start, slot_end, speed))

            # 尝试在最后一个合同之后插入
            if machine_schedule: # 如果机组有排程
                last_entry = machine_schedule[-1]
                slot_start = max(earliest_start, last_entry.end_time)
                slot_end = slot_start + production_duration
                if slot_end <= latest_end:
                    available_slots.append((machine_id, slot_start, slot_end, speed))
            # else: # 如果机组为空，这部分逻辑已经在上面第一个 if-else 块处理了

        # 从可用插槽中选择最优的：优先选择处理速率快的，并且使得结束时间尽量早
        if not available_slots:
            return None # 严格返回 None，表示没有找到满足 latest_end 的槽位

        # 排序策略：优先选择结束时间早的，然后在结束时间相同的情况下，选择速度快的（即生产时长短的）
        available_slots.sort(key=lambda x: (x[2], (x[2] - x[1]).total_seconds()))

        best_slot = available_slots[0]

        # 实时库存检查 (如果启用) - 这是一个更复杂的扩展，暂时先保留在最终检查
        # 如果需要实时检查，这里需要一个临时的计划，并计算该调度对库存的影响
        # 这里为了简化，我们先不实时检查库存，保持在 calculate_stock_levels 阶段统一检查。
        # 如果要实时检查，需要在每次 find_available_machine_slot 后，
        # 尝试创建一个临时的 ScheduleEntry，并评估它是否会导致库存违规。

        return best_slot

    def calculate_stock_levels(self, plan: ProductionPlan) -> Dict[Operation, Dict[datetime, float]]:
        """
        计算每个工序的库存水平随时间的变化。
        考虑同一时间点事件的顺序：先出库，后入库。
        """
        stock_changes = {op: [] for op in self.params.operation_sequence} # 存储 (时间, 变化量)

        # 收集所有生产事件的开始和结束时间
        for op_idx, op in enumerate(self.params.operation_sequence):
            for machine_id in plan.schedules.get(op, {}):
                for entry in plan.schedules[op][machine_id].schedule:
                    # 合同生产完成，进入后库 (入库事件)
                    stock_changes[op].append((entry.end_time, entry.order_wt))

                    # 如果不是最后一个工序，考虑下一个工序的消耗
                    if op_idx < len(self.params.operation_sequence) - 1:
                        next_op = self.params.operation_sequence[op_idx + 1]
                        # 查找该合同在下一个工序的开始时间
                        for next_machine_id in plan.schedules.get(next_op, {}):
                            for next_entry in plan.schedules[next_op][next_machine_id].schedule:
                                if next_entry.order_no == entry.order_no:
                                    # 下一个工序开始生产，库存减少 (出库事件)
                                    stock_changes[op].append((next_entry.start_time, -entry.order_wt))
                                    break
                            else:
                                continue
                            break

        # 对于 CA 的成品后库，只有入库，没有出库到下一个工序的事件
        # 我们可以认为，CA 的出库发生在 `CA_finish_time + transmission_time[CA]` 时，此时库存减少
        # 但按照你的描述 "下一个产线还未开始生产，此时库存增加...如果某一时刻这个合同在下一个产线开始生产，则库存减去这个合同的重量。"
        # 对于 CA，它没有下一个产线，所以其库存只增加，不减少。
        # 如果成品出库是按交货期或完成时间出库，需要在此处添加 CA 的出库逻辑。
        # 假设成品库只计算生产完成后的堆积，不考虑客户取走。

        stock_history = {op: {datetime.min: 0.0} for op in self.params.operation_sequence}

        for op in self.params.operation_sequence:
            # 按时间排序事件，相同时间点：先出库 (负变化量)，后入库 (正变化量)
            op_events = sorted(stock_changes[op], key=lambda x: (x[0], x[1])) # x[1] 为变化量，负值排在前面

            current_stock = 0.0
            for t, change in op_events:
                current_stock += change
                stock_history[op][t] = current_stock

        return stock_history

    def check_stock_constraints(self, plan: ProductionPlan) -> bool:
        """
        检查生产计划是否满足库存约束。
        如果违规，记录违规信息。
        """
        is_feasible = True
        stock_history_per_op = self.calculate_stock_levels(plan)
        plan.stock_violations = {op: [] for op in self.params.operation_sequence}

        for op in self.params.operation_sequence:
            min_stock, max_stock = self.params.stock_limit[op]
            sorted_times = sorted(stock_history_per_op[op].keys())

            # 确保在每个时间点都检查库存
            for t in sorted_times:
                current_stock = stock_history_per_op[op][t]
                if not (min_stock <= current_stock <= max_stock):
                    # 记录违规信息，包括违规时间、实际库存和限制
                    plan.stock_violations[op].append((t, current_stock, (min_stock, max_stock)))
                    is_feasible = False
                    # 发现一个违规就可以停止对当前工序的检查，因为已经确定不可行
                    # break # 也可以不break，收集所有违规
        return is_feasible


    def generate_production_plan(self, order_priority: List[str]) -> ProductionPlan:
        """
        根据订单优先级生成一个生产方案。
        按需调度：从最下游产线 (CA) 开始逆向规划。
        """
        plan = ProductionPlan(order_priority=order_priority)
        plan.schedules = {op: {mid: MachineSchedule(mid, op) for mid in range(len(self.params.speed[op]))}
                          for op in self.params.operation_sequence}

        # 逆向规划：从最下游产线 (CA) 开始
        for order_no in order_priority:
            order = self.orders_map[order_no]

            # 存储该订单在所有工序的调度结果，方便后续查找下游工序的开始时间
            order_schedules_in_plan: Dict[Operation, ScheduleEntry] = {}

            # 从最下游工序 (CA) 向上游工序 (AZ, HR) 规划
            for op_idx in range(len(self.params.operation_sequence) - 1, -1, -1): # CA -> AZ -> HR
                current_op = self.params.operation_sequence[op_idx]

                # 计算当前工序的最晚完成时间 (latest_end)
                if current_op == Operation.CA:
                    # CA 的最晚完成时间是订单交期减去其出库转运时间 (如果是成品出库，则需要)
                    # 这里定义为：CA 工序本身的生产必须在 order.delivery_date 之前完成
                    # 如果转运时间是从 CA 完成后开始计算到客户手中的，那么 CA 的生产结束时间就是交货期
                    # 但为了给转运留时间，通常是 delivery_date - transmission_time
                    latest_end_for_current_op = order.delivery_date - timedelta(hours=self.params.transmission_time[current_op])
                    print(f"CA 最晚结束时间：{latest_end_for_current_op}")
                else:
                    # 上游工序的最晚完成时间是下游工序的开始时间减去向下游的转运时间
                    downstream_op = self.params.operation_sequence[op_idx + 1]
                    downstream_entry = order_schedules_in_plan.get(downstream_op)

                    if downstream_entry is None:
                        # 这不应该发生，因为我们是逆向规划，下游工序应该已经调度完成
                        # 如果出现，说明逻辑有误，或者上游无法满足下游需求
                        # 暂时先标记为不可行
                        print(f"Error: Downstream schedule for {order_no} in {downstream_op.value} not found during {current_op.value} planning.")
                        plan.is_feasible = False
                        return plan

                    # 上游工序必须在下游工序开始前，将物料转运到位
                    # 所以上游工序的结束时间 <= 下游工序的开始时间 - (上游到下游的转运时间)
                    latest_end_for_current_op = downstream_entry.start_time - timedelta(hours=self.params.transmission_time[downstream_op])
                    print(f"{current_op.value} 最晚开始时间：{latest_end_for_current_op}")

                # 计算当前工序的最早开始时间 (earliest_start)
                # 理论上可以从订单计划开始日期开始，但也要考虑机组的可用性
                earliest_start_for_current_op = order.plan_start_date # 这是订单最早可以开始生产的时间

                # 确保最早开始时间不晚于最晚结束时间，否则无解
                if earliest_start_for_current_op >= latest_end_for_current_op:
                    plan.is_feasible = False
                    print(f"Order {order_no} at {current_op.value}: earliest_start ({earliest_start_for_current_op}) >= latest_end ({latest_end_for_current_op})")
                    return plan

                # 在机组中找到合适的插槽
                # 传入 `current_plan_for_stock_check` 是为了在 find_available_machine_slot 中可以进行实时库存检查
                machine_slot = self.find_available_machine_slot(
                    current_op,
                    plan.schedules[current_op], # 传入当前工序的机组调度状态
                    earliest_start_for_current_op,
                    latest_end_for_current_op,
                    order_no,
                    order.order_width,
                    # consider_stock_at_slot_time=True, # 暂时不开启实时库存检查，等调度逻辑稳定再考虑
                    # current_plan_for_stock_check=plan
                )

                if machine_slot:
                    machine_id, start_time, end_time, speed = machine_slot
                    entry = ScheduleEntry(
                        order_no=order_no,
                        operation=current_op,
                        machine_id=machine_id,
                        start_time=start_time,
                        end_time=end_time,
                        order_wt=order.order_wt,
                        order_width=order.order_width
                    )
                    plan.schedules[current_op][machine_id].schedule.append(entry)
                    plan.schedules[current_op][machine_id].schedule.sort(key=lambda x: x.start_time) # 保持机组时间线有序
                    order_schedules_in_plan[current_op] = entry # 记录当前订单在当前工序的调度结果

                else:
                    # 无法找到可行的插槽，方案不可行
                    plan.is_feasible = False
                    print(f"Order {order_no} at {current_op.value}: No slot found between {earliest_start_for_current_op} and {latest_end_for_current_op}")
                    return plan # 直接返回，标记为不可行

        # 2. 检查库存约束 (在所有调度完成后统一检查)
        plan.is_feasible = self.check_stock_constraints(plan)
        if not plan.is_feasible:
            print("库存约束违规。")
            return plan

        # 3. 计算拖期和成本
        plan.total_tardiness = self.calculate_tardiness(plan)
        plan.total_cost = self.calculate_production_cost(plan)

        return plan

In [122]:
class GeneticAlgorithm:
    def __init__(self, orders: List[OrderMultiFeature], params: StaticParameters,
                 population_size: int = 50, generations: int = 100,
                 mutation_rate: float = 0.1, crossover_rate: float = 0.8):
        self.orders = orders
        self.params = params
        self.population_size = population_size
        self.generations = generations
        self.mutation_rate = mutation_rate
        self.crossover_rate = crossover_rate
        self.scheduler = Scheduler(orders, params)
        self.order_nos = [order.order_no for order in orders]

    def initialize_population(self) -> List[List[str]]:
        """
        初始化种群，生成多种优先级染色体。
        """
        population = []

        # 1. 随机优先级
        for _ in range(self.population_size // 3):
            random_priority = self.order_nos[:]
            random.shuffle(random_priority)
            population.append(random_priority)

        # 2. 交期早的优先级高 (EDD - Earliest Due Date)
        edd_priority = sorted(self.orders, key=lambda x: x.delivery_date)
        population.append([o.order_no for o in edd_priority])

        # 3. 交期晚的优先级高 (LDD - Latest Due Date)
        ldd_priority = sorted(self.orders, key=lambda x: x.delivery_date, reverse=True)
        population.append([o.order_no for o in ldd_priority])

        # 4. 计划开始日期早的优先级高 (SPT - Shortest Processing Time is also an option, but not in our current data)
        # 这里用 plan_start_date 替代类似概念
        spt_priority = sorted(self.orders, key=lambda x: x.plan_start_date)
        population.append([o.order_no for o in spt_priority])

        # 5. 宽度递增排序 (假设有助于减少搭接成本，宽到窄费用低)
        width_inc_priority = sorted(self.orders, key=lambda x: x.order_width)
        population.append([o.order_no for o in width_inc_priority])

        # 6. 宽度递减排序
        width_dec_priority = sorted(self.orders, key=lambda x: x.order_width, reverse=True)
        population.append([o.order_no for o in width_dec_priority])

        # 填充剩余部分，确保种群大小
        while len(population) < self.population_size:
            random_priority = self.order_nos[:]
            random.shuffle(random_priority)
            population.append(random_priority)

        return population

    def calculate_fitness(self, plan: ProductionPlan) -> float:
        """
        计算适应度。目标是最小化拖期和生产成本。
        适应度函数通常是最大化问题，所以我们将成本和拖期取负数或者倒数。
        对于不可行方案，给予非常低的适应度。
        """
        if not plan.is_feasible:
            # 对于不可行方案，给予一个非常低的惩罚值
            return -1e9

        # 惩罚函数，将最小化问题转化为最大化问题
        # 拖期惩罚系数，成本惩罚系数
        tardiness_penalty = plan.total_tardiness * self.params.delay_penalty_per_hour
        total_penalty = tardiness_penalty + plan.total_cost

        # 适应度 = 1 / (1 + total_penalty) 或者 (max_penalty - total_penalty)
        # 这里我们使用一个简单的负值，目标是让其最大化（即 total_penalty 最小化）
        return -total_penalty

    def select(self, population_plans: List[ProductionPlan]) -> List[List[str]]:
        """
        选择操作：轮盘赌选择 (Roulette Wheel Selection) 或者锦标赛选择 (Tournament Selection)。
        这里使用锦标赛选择，更稳定。
        """
        selected_chromosomes = []
        # 过滤掉不合法的染色体
        feasible_plans = [p for p in population_plans if p.is_feasible]

        if not feasible_plans: # 如果所有方案都不可行，则随机选择
            for _ in range(self.population_size):
                selected_chromosomes.append(random.choice(population_plans).order_priority)
            return selected_chromosomes

        # 进行锦标赛选择
        tournament_size = 5 # 每次锦标赛选5个个体
        for _ in range(self.population_size):
            # 从可行方案中选择
            tournament_candidates = random.sample(feasible_plans, min(tournament_size, len(feasible_plans)))
            winner = max(tournament_candidates, key=lambda p: self.calculate_fitness(p))
            selected_chromosomes.append(winner.order_priority)

        return selected_chromosomes

    def crossover(self, parent1: List[str], parent2: List[str]) -> Tuple[List[str], List[str]]:
        """
        交叉操作：顺序交叉 (Order Crossover - OX) 适用于排列问题。
        """
        if random.random() < self.crossover_rate:
            size = len(parent1)
            p1, p2 = [0] * size, [0] * size

            # 随机选择两个交叉点
            cx_point1, cx_point2 = sorted(random.sample(range(size), 2))

            # 复制中间段
            p1[cx_point1:cx_point2] = parent2[cx_point1:cx_point2]
            p2[cx_point1:cx_point2] = parent1[cx_point1:cx_point2]

            # 填充剩余部分
            # 对于 P1，从 P1 的 cx_point2 开始，按 P2 的顺序填充未被复制的元素
            fill_p1_idx = cx_point2
            for item in parent1:
                if item not in p1[cx_point1:cx_point2]:
                    if fill_p1_idx >= size:
                        fill_p1_idx = (fill_p1_idx % size) # 循环填充
                    p1[fill_p1_idx] = item
                    fill_p1_idx += 1

            # 对于 P2，从 P2 的 cx_point2 开始，按 P1 的顺序填充未被复制的元素
            fill_p2_idx = cx_point2
            for item in parent2:
                if item not in p2[cx_point1:cx_point2]:
                    if fill_p2_idx >= size:
                        fill_p2_idx = (fill_p2_idx % size) # 循环填充
                    p2[fill_p2_idx] = item
                    fill_p2_idx += 1
            return p1, p2
        else:
            return parent1, parent2

    def mutate(self, chromosome: List[str]) -> List[str]:
        """
        变异操作：交换变异 (Swap Mutation)，随机交换两个元素。
        """
        if random.random() < self.mutation_rate:
            idx1, idx2 = random.sample(range(len(chromosome)), 2)
            chromosome[idx1], chromosome[idx2] = chromosome[idx2], chromosome[idx1]
        return chromosome

    def run(self) -> ProductionPlan:
        """
        运行遗传算法。
        """
        population = self.initialize_population()
        best_plan: Optional[ProductionPlan] = None
        best_fitness = -float('inf')

        for generation in range(self.generations):
            if (generation+1) % 10 == 0:
                print(f"Generation {generation+1}/{self.generations}, best_plan {best_fitness}")

            # 评估种群中的每个个体
            population_plans = []

            for chromosome in population:
                plan = self.scheduler.generate_production_plan(chromosome)

                population_plans.append(plan)

                fitness = self.calculate_fitness(plan)
                if fitness > best_fitness:
                    best_fitness = fitness
                    best_plan = copy.deepcopy(plan) # 深拷贝最优方案

            # 选择
            selected_chromosomes = self.select(population_plans)

            # 交叉和变异，生成新的种群
            new_population = []
            while len(new_population) < self.population_size:
                parent1 = random.choice(selected_chromosomes)
                parent2 = random.choice(selected_chromosomes)

                child1, child2 = self.crossover(parent1, parent2)

                new_population.append(self.mutate(child1))
                if len(new_population) < self.population_size:
                    new_population.append(self.mutate(child2))

            population = new_population

        # 确保返回的 best_plan 是一个完整的、经过调度计算的方案
        if best_plan is None and population_plans:
            # 如果在循环中没有更新 best_plan (例如所有方案都不可行)，则取最后一代中最好的一个
            best_plan = max(population_plans, key=self.calculate_fitness)

        return best_plan

In [123]:
def plot_gantt_chart(plan: ProductionPlan, orders_map: Dict[str, OrderMultiFeature]):
    """
    使用 Plotly 绘制生产甘特图。
    """
    if not plan or not plan.is_feasible:
        print("无法绘制甘特图：生产计划不可行或为空。")
        return

    gantt_data = []
    colors = {}
    unique_orders = list(orders_map.keys())
    # 为每个订单生成一个唯一的颜色
    for i, order_no in enumerate(unique_orders):
        r = int((i * 30 % 255 + 50) % 255)
        g = int((i * 50 % 255 + 100) % 255)
        b = int((i * 70 % 255 + 150) % 255)
        colors[order_no] = f'rgb({r},{g},{b})'

    for op_idx, op in enumerate(plan.schedules):
        for machine_id, machine_schedule in plan.schedules[op].items():
            for entry in machine_schedule.schedule:
                gantt_data.append(
                    dict(
                        Task=f"{op.value} - 机组{entry.machine_id+1}",
                        Start=entry.start_time.isoformat(),
                        Finish=entry.end_time.isoformat(),
                        Resource=entry.order_no, # 使用订单号作为资源，方便染色
                        Description=f"订单: {entry.order_no}<br>重量: {entry.order_wt}t<br>宽度: {entry.order_width}mm",
                        Operation=op.value,
                        OrderNo=entry.order_no,
                        Machine=entry.machine_id+1,
                    )
                )

    # 对数据进行排序，以便在甘特图中显示顺序
    # 优先按工序排序，然后按机组，最后按开始时间
    gantt_data.sort(key=lambda x: (
        [op.value for op in static_params.operation_sequence].index(x['Operation']),
        x['Machine'],
        x['Start']
    ))

    # 构建自定义颜色映射
    color_map = {order_no: colors[order_no] for order_no in unique_orders}

    fig = go.Figure(
        gantt=[
            go.Gantt(
                x=[d['Start'] for d in gantt_data],
                y=[d['Task'] for d in gantt_data],
                x_end=[d['Finish'] for d in gantt_data],
                marker_color=[color_map[d['OrderNo']] for d in gantt_data],
                text=[d['Description'] for d in gantt_data],
                hoverinfo='text',
                custom_data=[d['OrderNo'] for d in gantt_data], # 用于在 hover template 中显示订单号
                name = '', # 不显示图例，使用 color_map 统一管理
            )
        ]
    )

    fig.update_layout(
        title="生产计划甘特图",
        hoverlabel=dict(bgcolor="white", font_size=16, font_family="Rockwell"),
        xaxis_type="date",
        xaxis_title="时间",
        yaxis_title="工序 - 机组",
        height=800,
        margin=dict(l=150, r=50, t=80, b=80),
        legend=dict(x=1.02, y=1, xanchor="left", yanchor="top", title="订单号"),
    )

    # 添加订单号作为图例
    for order_no, color in color_map.items():
        fig.add_trace(go.Scatter(
            x=[None], y=[None],
            mode='markers',
            marker=dict(size=10, color=color),
            name=order_no,
            showlegend=True,
            hoverinfo='none',
        ))

    fig.show()

In [124]:
# 假设的 orders_obj_list 和 static_params
# 在实际运行时，你会从外部获取这些数据
orders_obj_list = [
    OrderMultiFeature(order_no="O001", delivery_date=datetime(2024, 11, 25), plan_start_date=datetime(2024, 10, 15), order_wt=50.0, order_width=1200.0, order_thick=10.0),
    OrderMultiFeature(order_no="O002", delivery_date=datetime(2024, 11, 25), plan_start_date=datetime(2024, 10, 15), order_wt=70.0, order_width=1500.0, order_thick=12.0),
    OrderMultiFeature(order_no="O003", delivery_date=datetime(2024, 11, 25), plan_start_date=datetime(2024, 10, 15), order_wt=60.0, order_width=1000.0, order_thick=8.0),
    OrderMultiFeature(order_no="O004", delivery_date=datetime(2024, 11, 25), plan_start_date=datetime(2024, 10, 15), order_wt=80.0, order_width=1300.0, order_thick=11.0),
    OrderMultiFeature(order_no="O005", delivery_date=datetime(2024, 11, 25), plan_start_date=datetime(2024, 10, 15), order_wt=40.0, order_width=1100.0, order_thick=9.0),
]
static_params = StaticParameters() # 初始化时会自动计算搭接费用矩阵

# 将 orders_obj_list 转换为以 order_no 为键的字典，方便查找
orders_map = {order.order_no: order for order in orders_obj_list}
print("开始运行遗传算法...")
ga = GeneticAlgorithm(orders_obj_list, static_params,
                      population_size=1, generations=1, # 调整种群大小和代数以获得更好的结果
                      mutation_rate=0.1, crossover_rate=0.8)

best_overall_plan = ga.run()

if best_overall_plan and best_overall_plan.is_feasible:
    print("\n找到的最优生产计划：")
    print(f"订单优先级: {best_overall_plan.order_priority}")
    print(f"总拖期 (小时): {best_overall_plan.total_tardiness:.2f}")
    print(f"总生产成本: {best_overall_plan.total_cost:.2f}")
    print("详细调度计划：")
    for op in static_params.operation_sequence:
        print(f"--- 工序: {op.value} ---")
        for machine_id in sorted(best_overall_plan.schedules[op].keys()):
            print(f"  机组 {machine_id+1}:")
            for entry in best_overall_plan.schedules[op][machine_id].schedule:
                print(f"    订单 {entry.order_no}: 开始 {entry.start_time.strftime('%Y-%m-%d %H:%M')}, 结束 {entry.end_time.strftime('%Y-%m-%d %H:%M')}")

    # 绘制甘特图
    plot_gantt_chart(best_overall_plan, orders_map)
else:
    print("\n未能找到可行的生产计划。")
    if best_overall_plan:
        print("最近一个不可行方案的库存违规信息:")
        for op, violations in best_overall_plan.stock_violations.items():
            if violations:
                print(f"  工序 {op.value} 违规:")
                for t, current, limits in violations:
                    print(f"    时间: {t.strftime('%Y-%m-%d %H:%M')}, 库存: {current:.2f}, 限制: {limits}")

开始运行遗传算法...
CA 最晚结束时间：2024-11-24 23:00:00
下游工序：Operation.CA，开始时间2024-10-15 00:00:00
AZ 最晚开始时间：2024-10-14 23:00:00
Order O001 at AZ: earliest_start (2024-10-15 00:00:00) >= latest_end (2024-10-14 23:00:00)
CA 最晚结束时间：2024-11-24 23:00:00
下游工序：Operation.CA，开始时间2024-10-15 00:00:00
AZ 最晚开始时间：2024-10-14 23:00:00
Order O001 at AZ: earliest_start (2024-10-15 00:00:00) >= latest_end (2024-10-14 23:00:00)
CA 最晚结束时间：2024-11-24 23:00:00
下游工序：Operation.CA，开始时间2024-10-15 00:00:00
AZ 最晚开始时间：2024-10-14 23:00:00
Order O001 at AZ: earliest_start (2024-10-15 00:00:00) >= latest_end (2024-10-14 23:00:00)
CA 最晚结束时间：2024-11-24 23:00:00
下游工序：Operation.CA，开始时间2024-10-15 00:00:00
AZ 最晚开始时间：2024-10-14 23:00:00
Order O003 at AZ: earliest_start (2024-10-15 00:00:00) >= latest_end (2024-10-14 23:00:00)
CA 最晚结束时间：2024-11-24 23:00:00
下游工序：Operation.CA，开始时间2024-10-15 00:00:00
AZ 最晚开始时间：2024-10-14 23:00:00
Order O002 at AZ: earliest_start (2024-10-15 00:00:00) >= latest_end (2024-10-14 23:00:00)

未能找到可行的生产计划。
最近一个不可行方案