In [1]:
%matplotlib inline
import numpy as np
import pandas as pd 
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import matplotlib.patches as mpatches
import random
import time
pd.set_option('display.max_columns',None)
pd.set_option('display.max_rows',None)
import itertools
import math
from deap import base, creator, tools, algorithms

In [None]:
class DynamicJSSPSolver:
    def __init__(self, transport_time, agv_count):
        self.transport_time = self._symmetrize_transport(transport_time)#补齐运输时间
        self.agv_count = agv_count
        self.jobs = []
    
    def _symmetrize_transport(self, transport_dict):
        symmetric = transport_dict.copy()
        for (a, b), t in transport_dict.items():
            symmetric[(b, a)] = t
        return symmetric
    
    def update_production_data(self, time_matrix, machine_matrix):
        self.jobs = self._preprocess_data(time_matrix, machine_matrix)
    
    def _preprocess_data(self, time_matrix, machine_matrix):
        jobs = []
        for job_idx in range(len(time_matrix)):
            steps = []
            for step_idx in range(len(time_matrix[job_idx])):
                duration = time_matrix[job_idx][step_idx]
                machine = machine_matrix[job_idx][step_idx]
                
                if duration == -1 or machine == -1:#若没有任务
                    break
                
                steps.append({
                    'machine': machine - 1,  # 转为0-based
                    'duration': duration,
                    'transport': []
                })
            jobs.append(steps)
        
        # 添加运输任务
        for job in jobs:
            prev_loc = 'L/U'
            for i, step in enumerate(job):
                machine = f'M{step["machine"]+1}'
                
                # 前运输
                step['transport'].append({
                    'from': prev_loc,
                    'to': machine,
                    'time': self.transport_time.get((prev_loc, machine), 0)
                })
                
                # 后运输
                if i == len(job)-1:
                    step['transport'].append({
                        'from': machine,
                        'to': 'L/U',
                        'time': self.transport_time.get((machine, 'L/U'), 0)
                    })
                else:
                    next_machine = f'M{job[i+1]["machine"]+1}'
                    step['transport'].append({
                        'from': machine,
                        'to': next_machine,
                        'time': self.transport_time.get((machine, next_machine), 0)
                    })
                prev_loc = machine
        return jobs
    
    class ScheduleGene:
        def __init__(self, job_id, step_id, agv_assign):
            self.job_id = job_id
            self.step_id = step_id
            self.agv_assign = agv_assign  # [前运输AGV, 后运输AGV]
    
    class Chromosome:
        def __init__(self, genes, agv_count, jobs, transport_time):  # 新增transport_time参数
            self.genes = genes
            self.agv_count = agv_count
            self.jobs = jobs
            self.transport_time = transport_time  # 存储运输时间字典
            self._makespan = None
            self._schedule = {'machines': defaultdict(list), 'agvs': defaultdict(list)}
        
        def decode(self):
            self._schedule['machines'].clear()
            self._schedule['agvs'].clear()
            
            machine_times = defaultdict(float)
            agv_times = [0.0] * self.agv_count
            agv_positions = ['L/U'] * self.agv_count
            job_progress = defaultdict(float)
            
            for gene in self.genes:
                job_id = gene.job_id
                step_id = gene.step_id
                step = self.jobs[job_id][step_id]
                
                # ===== 前运输处理 =====
                agv_pre = gene.agv_assign[0]
                pre_trans = step['transport'][0]
                start_pos_pre = pre_trans['from']
                
                # 处理空载移动
                current_pos_pre = agv_positions[agv_pre]
                if current_pos_pre != start_pos_pre:
                    move_time_pre = self.transport_time.get((current_pos_pre, start_pos_pre), 0)
                    move_start_pre = max(agv_times[agv_pre], job_progress.get((job_id, step_id-1), 0))
                    move_end_pre = move_start_pre + move_time_pre
                    
                    # 记录空载移动任务
                    self._schedule['agvs'][agv_pre].append({
                        'start': move_start_pre,
                        'end': move_end_pre,
                        'from': current_pos_pre,
                        'to': start_pos_pre,
                        'job': job_id,
                        'step': step_id,
                        'type': 'empty'
                    })
                    
                    agv_times[agv_pre] = move_end_pre
                    agv_positions[agv_pre] = start_pos_pre
                
                # 处理前运输
                start_pre = max(
                    job_progress.get((job_id, step_id-1), 0.0),
                    agv_times[agv_pre]
                )
                end_pre = start_pre + pre_trans['time']
                
                self._schedule['agvs'][agv_pre].append({
                    'start': start_pre,
                    'end': end_pre,
                    'from': pre_trans['from'],
                    'to': pre_trans['to'],
                    'job': job_id,
                    'step': step_id,
                    'type': 'loaded'
                })
                
                agv_times[agv_pre] = end_pre
                agv_positions[agv_pre] = pre_trans['to']
                
                # ===== 加工处理 =====
                machine = f'M{step["machine"]+1}'
                process_start = max(end_pre, machine_times[machine])
                process_end = process_start + step['duration']
                machine_times[machine] = process_end
                
                self._schedule['machines'][machine].append({
                    'start': process_start,
                    'end': process_end,
                    'job': job_id,
                    'step': step_id
                })
                
                # ===== 后运输处理 =====
                agv_post = gene.agv_assign[1]
                post_trans = step['transport'][1]
                start_pos_post = post_trans['from']
                
                # 处理空载移动
                current_pos_post = agv_positions[agv_post]
                if current_pos_post != start_pos_post:
                    move_time_post = self.transport_time.get((current_pos_post, start_pos_post), 0)
                    move_start_post = max(agv_times[agv_post], process_end)
                    move_end_post = move_start_post + move_time_post
                    
                    self._schedule['agvs'][agv_post].append({
                        'start': move_start_post,
                        'end': move_end_post,
                        'from': current_pos_post,
                        'to': start_pos_post,
                        'job': job_id,
                        'step': step_id,
                        'type': 'empty'
                    })
                    
                    agv_times[agv_post] = move_end_post
                    agv_positions[agv_post] = start_pos_post
                
                # 处理后运输
                start_post = max(process_end, agv_times[agv_post])
                end_post = start_post + post_trans['time']
                
                self._schedule['agvs'][agv_post].append({
                    'start': start_post,
                    'end': end_post,
                    'from': post_trans['from'],
                    'to': post_trans['to'],
                    'job': job_id,
                    'step': step_id,
                    'type': 'loaded'
                })
                
                agv_times[agv_post] = end_post
                agv_positions[agv_post] = post_trans['to']
                
                job_progress[(job_id, step_id)] = end_post
            
            # 计算总时间
            all_ends = chain(
                (t['end'] for m in self._schedule['machines'].values() for t in m),
                (t['end'] for a in self._schedule['agvs'].values() for t in a)
            )
            self._makespan = max(all_ends, default=0)
            return self._makespan
        
        @property
        def makespan(self):
            if self._makespan is None:
                self.decode()
            return self._makespan
    
    def solve(self, pop_size=50, max_gen=100, cx_prob=0.8, mut_prob=0.2):
        self.pop_size = pop_size
        self.max_gen = max_gen
        self.cx_prob = cx_prob
        self.mut_prob = mut_prob
        self.elite_size = 2
        
        if not self.jobs:
            raise ValueError("请先使用update_production_data()加载生产数据")
        
        population = self._initialize_population()
        best = min(population, key=lambda x: x.makespan)
        history = []
        
        for gen in range(self.max_gen):
            population.sort(key=lambda x: x.makespan)
            current_best = population[0].makespan
            history.append(current_best)
            
            if current_best < best.makespan:
                best = deepcopy(population[0])
            
            elites = population[:self.elite_size]
            offspring = self._generate_offspring(population)
            population = elites + offspring[:self.pop_size - self.elite_size]
            
            #print(f"Gen {gen+1}: Best={current_best}")
        
        return best, history
    
    def _initialize_population(self):
        total_steps = sum(len(job) for job in self.jobs)
        population = []
    
        for _ in range(self.pop_size):
            queues = {job_id: deque(range(len(steps))) for job_id, steps in enumerate(self.jobs)}
            genes = []
            while len(genes) < total_steps:
                available_jobs = [j for j, q in queues.items() if q]
                job_id = random.choice(available_jobs)
                step_id = queues[job_id].popleft()
                agv_assign = [
                    random.randint(0, self.agv_count-1),
                    random.randint(0, self.agv_count-1)
                ]
                genes.append(self.ScheduleGene(job_id, step_id, agv_assign))       
            population.append(self.Chromosome(genes, self.agv_count, self.jobs, self.transport_time))  # 传递transport_time
        return population
    
    def _generate_offspring(self, population):
        offspring = []
        while len(offspring) < self.pop_size - self.elite_size:
            p1, p2 = random.sample(population[:self.pop_size//2], 2)
            
            if random.random() < self.cx_prob:
                child = self._dynamic_crossover(p1, p2)
            else:
                child = deepcopy(random.choice([p1, p2]))
            
            child = self._dynamic_mutation(child)
            offspring.append(child)
        return offspring
    
    def _dynamic_crossover(self, parent1, parent2):#交叉
        child_genes = []
        job_progress = defaultdict(int)
        ptr1, ptr2 = 0, 0
        total_steps = sum(len(job) for job in self.jobs)
        max_attempts = total_steps * 5
        
        while len(child_genes) < total_steps and max_attempts > 0:
            max_attempts -= 1
            if random.random() < 0.5:
                while ptr1 < len(parent1.genes):
                    gene = parent1.genes[ptr1]
                    ptr1 += 1
                    if job_progress[gene.job_id] == gene.step_id:
                        child_genes.append(gene)
                        job_progress[gene.job_id] += 1
                        break
            else:
                while ptr2 < len(parent2.genes):
                    gene = parent2.genes[ptr2]
                    ptr2 += 1
                    if job_progress[gene.job_id] == gene.step_id:
                        child_genes.append(gene)
                        job_progress[gene.job_id] += 1
                        break
        
        try:
            if len(child_genes) < total_steps:
                child_genes += self._generate_random_genes(total_steps - len(child_genes))
        except RuntimeError as e:
            print(f"交叉操作失败: {str(e)}")
            return random.choice([parent1, parent2])
    
        return self.Chromosome(child_genes, self.agv_count, self.jobs, self.transport_time)  # 传递transport_time
    
    def _generate_random_genes(self, num):
        genes = []
        job_progress = defaultdict(int)
        while len(genes) < num:
            available = [j for j in range(len(self.jobs)) 
                        if job_progress[j] < len(self.jobs[j])]
            if not available:
                raise RuntimeError("无法生成足够的基因，请检查生产数据是否正确")
            job = random.choice(available)
            step = job_progress[job]
            agv_assign = [
                random.randint(0, self.agv_count-1),
                random.randint(0, self.agv_count-1)
            ]
            genes.append(self.ScheduleGene(job, step, agv_assign))
            job_progress[job] += 1
        return genes
    
    def _dynamic_mutation(self, chrom):
        new_genes = deepcopy(chrom.genes)
    
        if random.random() < self.mut_prob:
            swap_candidates = []
            for i in range(len(new_genes)):
                current_job = new_genes[i].job_id
                if i == 0 or new_genes[i-1].job_id != current_job:
                    swap_candidates.append(i)
        
            if len(swap_candidates) >= 2:
                idx1, idx2 = random.sample(swap_candidates, 2)
                new_genes[idx1], new_genes[idx2] = new_genes[idx2], new_genes[idx1]
    
        for gene in new_genes:
            if random.random() < 0.2:
                if random.random() < 0.5:
                    gene.agv_assign[0] = random.randint(0, self.agv_count-1)
                else:
                    gene.agv_assign[1] = random.randint(0, self.agv_count-1)
    
        return self.Chromosome(new_genes, self.agv_count, self.jobs, self.transport_time)  # 传递transport_time
    
    #可视化
    def visualize(self, best_chrom, history):
        #折线图
        plt.figure(figsize=(10, 6))
        plt.plot(history, 'b-', linewidth=1.5)
        plt.title("Optimization Process", fontsize=14)
        plt.xlabel("Generation", fontsize=12)
        plt.ylabel("Makespan", fontsize=12)
        plt.grid(True, linestyle='--', alpha=0.7)
        plt.tight_layout()
        plt.show()

        #甘特图
        #plt.figure(figsize=(16, 8))
        plt.figure(figsize=(30, 15))
        best_chrom.decode()
        schedule = best_chrom._schedule
        colors = plt.cm.tab20.colors
        
        MACHINE_ROW_HEIGHT = 3
        AGV_ROW_HEIGHT = 3
        FONT_SIZE = 9
        MIN_DURATION_FOR_TEXT = 0

        y_ticks = []
        y_labels = []
        #for idx, (machine, tasks) in enumerate(sorted(schedule['machines'].items())):
        for idx, (machine, tasks) in enumerate(
            sorted(schedule['machines'].items(), key=lambda x: int(''.join(filter(str.isdigit, x[0]))))
        ):
            y = idx * 3
            for task in tasks:
                plt.barh(y=y, 
                         width=task['end'] - task['start'],
                         left=task['start'],
                         height=2,
                         color=colors[task['job'] % 20],
                         edgecolor='black')
                plt.text((task['start'] + task['end']) / 2, y,
                         f"J{task['job']+1}-S{task['step']+1}",
                         ha='center', va='center', fontsize=8)
            y_ticks.append(y)
            y_labels.append(f'Machine {machine}')

        agv_base_y = len(schedule['machines']) *  MACHINE_ROW_HEIGHT + 2
        for idx, (agv, tasks) in enumerate(schedule['agvs'].items()):
            y = agv_base_y + idx * AGV_ROW_HEIGHT
            for task in tasks:
                color = colors[task['job'] % 20] if task['type'] == 'loaded' else 'lightgray'
                alpha = 0.5 if task['type'] == 'loaded' else 0.3
                label = f"{task['from']}→{task['to']}" if task['type'] == 'loaded' else f"E {task['from']}→{task['to']}"
                
                plt.barh(y=y,
                         width=task['end'] - task['start'],
                         left=task['start'],
                         height=2,
                         color=color,
                         alpha=1,
                         edgecolor='black')
                
                if (task['end'] - task['start']) >= MIN_DURATION_FOR_TEXT:
                    plt.text((task['start'] + task['end'])/2, y + AGV_ROW_HEIGHT/2,
                             label,
                             ha='center', 
                             va='center',
                             fontsize=FONT_SIZE-1,
                             rotation=45 ,#if (task['end'] - task['start']) < 5 else 0,
                             color='darkblue')
                    
            y_ticks.append(y + AGV_ROW_HEIGHT/2)
            y_labels.append(f'AGV{agv+1}')

        plt.yticks(y_ticks, y_labels)
        plt.xlabel('Time (minutes)', fontsize=12)
        plt.title('Scheduling Gantt Chart', fontsize=14)
        plt.grid(axis='x', linestyle='--', alpha=0.7)

        patches = [mpatches.Patch(color=colors[i%20], label=f'Job {i+1}') 
                  for i in range(len(self.jobs))]
        plt.legend(handles=patches,
                   bbox_to_anchor=(1.15, 1),
                   loc='upper left',
                   borderaxespad=0.,
                   title="Job Legend")

        plt.subplots_adjust(left=0.25, right=0.8, hspace=0.4)
        plt.show()

# 运行
if __name__ == "__main__":
    solver = DynamicJSSPSolver(
        transport_time=TRANSPORT_TIME,
        agv_count=3
    )
    
    solver.update_production_data(time_matrix, machine_matrix)
    best_solution, history = solver.solve(pop_size=50, max_gen=500)
    print(f"最优完成时间: {best_solution.makespan} 分钟")
    solver.visualize(best_solution, history)

In [None]:
class DynamicJSSPSolver:
    def __init__(self, transport_time, agv_count):
        self.transport_time = self._symmetrize_transport(transport_time)
        self.agv_count = agv_count
        self.jobs = []
    
    def _symmetrize_transport(self, transport_dict):
        symmetric = transport_dict.copy()
        for (a, b), t in transport_dict.items():
            symmetric[(b, a)] = t
        return symmetric
    
    def update_production_data(self, time_matrix, machine_matrix):
        self.jobs = self._preprocess_data(time_matrix, machine_matrix)
    
    def _preprocess_data(self, time_matrix, machine_matrix):
        jobs = []
        for job_idx in range(len(time_matrix)):
            steps = []
            for step_idx in range(len(time_matrix[job_idx])):
                duration = time_matrix[job_idx][step_idx]
                machine = machine_matrix[job_idx][step_idx]
                
                if duration == -1 or machine == -1:
                    break
                
                steps.append({
                    'machine': machine - 1,
                    'duration': duration,
                    'transport': []
                })
            jobs.append(steps)
        
        # 添加运输任务
        for job in jobs:
            prev_loc = 'L/U'
            for i, step in enumerate(job):
                machine = f'M{step["machine"]+1}'
                
                # 前运输
                step['transport'].append({
                    'from': prev_loc,
                    'to': machine,
                    'time': self.transport_time.get((prev_loc, machine), 0)
                })
                
                # 后运输
                if i == len(job)-1:
                    step['transport'].append({
                        'from': machine,
                        'to': 'L/U',
                        'time': self.transport_time.get((machine, 'L/U'), 0)
                    })
                else:
                    next_machine = f'M{job[i+1]["machine"]+1}'
                    step['transport'].append({
                        'from': machine,
                        'to': next_machine,
                        'time': self.transport_time.get((machine, next_machine), 0)
                    })
                prev_loc = machine
        return jobs
    
    class ScheduleGene:
        def __init__(self, job_id, step_id, agv_assign):
            self.job_id = job_id
            self.step_id = step_id
            self.agv_assign = agv_assign  # [前运输AGV, 后运输AGV]
    
    class Chromosome:
        def __init__(self, genes, agv_count, jobs, transport_time):
            self.genes = genes
            self.agv_count = agv_count
            self.jobs = jobs
            self.transport_time = transport_time
            self._makespan = None
            self._schedule = {'machines': defaultdict(list), 'agvs': defaultdict(list)}
        
        def decode(self):
            self._schedule['machines'].clear()
            self._schedule['agvs'].clear()
            
            machine_times = defaultdict(float)
            agv_times = [0.0] * self.agv_count
            agv_positions = ['L/U'] * self.agv_count
            job_progress = defaultdict(float)
            
            for gene in self.genes:
                job_id = gene.job_id
                step_id = gene.step_id
                step = self.jobs[job_id][step_id]
                
                # ===== 前运输处理 =====
                agv_pre = gene.agv_assign[0]
                pre_trans = step['transport'][0]
                start_pos_pre = pre_trans['from']
                
                # 处理空载移动
                current_pos_pre = agv_positions[agv_pre]
                if current_pos_pre != start_pos_pre:
                    move_time_pre = self.transport_time.get((current_pos_pre, start_pos_pre), 0)
                    move_start_pre = max(agv_times[agv_pre], job_progress.get((job_id, step_id-1), 0))
                    move_end_pre = move_start_pre + move_time_pre
                    
                    self._schedule['agvs'][agv_pre].append({
                        'start': move_start_pre,
                        'end': move_end_pre,
                        'from': current_pos_pre,
                        'to': start_pos_pre,
                        'job': job_id,
                        'step': step_id,
                        'type': 'empty'
                    })
                    
                    agv_times[agv_pre] = move_end_pre
                    agv_positions[agv_pre] = start_pos_pre
                
                # 处理前运输
                start_pre = max(
                    job_progress.get((job_id, step_id-1), 0.0),
                    agv_times[agv_pre]
                )
                end_pre = start_pre + pre_trans['time']
                
                self._schedule['agvs'][agv_pre].append({
                    'start': start_pre,
                    'end': end_pre,
                    'from': pre_trans['from'],
                    'to': pre_trans['to'],
                    'job': job_id,
                    'step': step_id,
                    'type': 'loaded'
                })
                
                agv_times[agv_pre] = end_pre
                agv_positions[agv_pre] = pre_trans['to']
                
                # ===== 加工处理 =====
                machine = f'M{step["machine"]+1}'
                process_start = max(end_pre, machine_times[machine])
                process_end = process_start + step['duration']
                machine_times[machine] = process_end
                
                self._schedule['machines'][machine].append({
                    'start': process_start,
                    'end': process_end,
                    'job': job_id,
                    'step': step_id
                })
                
                # ===== 后运输处理 =====
                agv_post = gene.agv_assign[1]
                post_trans = step['transport'][1]
                start_pos_post = post_trans['from']
                
                # 处理空载移动
                current_pos_post = agv_positions[agv_post]
                if current_pos_post != start_pos_post:
                    move_time_post = self.transport_time.get((current_pos_post, start_pos_post), 0)
                    move_start_post = max(agv_times[agv_post], process_end)
                    move_end_post = move_start_post + move_time_post
                    
                    self._schedule['agvs'][agv_post].append({
                        'start': move_start_post,
                        'end': move_end_post,
                        'from': current_pos_post,
                        'to': start_pos_post,
                        'job': job_id,
                        'step': step_id,
                        'type': 'empty'
                    })
                    
                    agv_times[agv_post] = move_end_post
                    agv_positions[agv_post] = start_pos_post
                
                # 处理后运输
                start_post = max(process_end, agv_times[agv_post])
                end_post = start_post + post_trans['time']
                
                self._schedule['agvs'][agv_post].append({
                    'start': start_post,
                    'end': end_post,
                    'from': post_trans['from'],
                    'to': post_trans['to'],
                    'job': job_id,
                    'step': step_id,
                    'type': 'loaded'
                })
                
                agv_times[agv_post] = end_post
                agv_positions[agv_post] = post_trans['to']
                
                job_progress[(job_id, step_id)] = end_post
            
            all_ends = chain(
                (t['end'] for m in self._schedule['machines'].values() for t in m),
                (t['end'] for a in self._schedule['agvs'].values() for t in a)
            )
            self._makespan = max(all_ends, default=0)
            return self._makespan
        
        @property
        def makespan(self):
            if self._makespan is None:
                self.decode()
            return self._makespan
    
    def solve(self, pop_size=50, max_gen=100, cx_prob=0.8, mut_prob=0.2):
        self.pop_size = pop_size
        self.max_gen = max_gen
        self.cx_prob = cx_prob
        self.mut_prob = mut_prob
        self.elite_size = 2
        
        if not self.jobs:
            raise ValueError("请先使用update_production_data()加载生产数据")
        
        population = self._initialize_population()
        best = min(population, key=lambda x: x.makespan)
        history = []
        
        for gen in range(self.max_gen):
            population.sort(key=lambda x: x.makespan)
            current_best = population[0].makespan
            history.append(current_best)
            
            if current_best < best.makespan:
                best = deepcopy(population[0])
            
            elites = population[:self.elite_size]
            offspring = self._generate_offspring(population)
            population = elites + offspring[:self.pop_size - self.elite_size]
        
        return best, history
    
    def _initialize_population(self):#初期解
        total_steps = sum(len(job) for job in self.jobs)
        population = []
    
        for _ in range(self.pop_size):
            queues = {job_id: deque(range(len(steps))) for job_id, steps in enumerate(self.jobs)}
            genes = []
            while len(genes) < total_steps:
                available_jobs = [j for j, q in queues.items() if q]
                job_id = random.choice(available_jobs)
                step_id = queues[job_id].popleft()
                agv_assign = [
                    random.randint(0, self.agv_count-1),
                    random.randint(0, self.agv_count-1)
                ]
                genes.append(self.ScheduleGene(job_id, step_id, agv_assign))       
            population.append(self.Chromosome(genes, self.agv_count, self.jobs, self.transport_time))  # 传递transport_time
        return population
    
    def _generate_offspring(self, population):
        offspring = []
        while len(offspring) < self.pop_size - self.elite_size:
            p1, p2 = random.sample(population[:self.pop_size//2], 2)
            
            if random.random() < self.cx_prob:
                child = self._dynamic_crossover(p1, p2)
            else:
                child = deepcopy(random.choice([p1, p2]))
            
            child = self._dynamic_mutation(child)
            offspring.append(child)
        return offspring
    
    def _dynamic_crossover(self, parent1, parent2):#新交叉
        jobs = list(range(len(self.jobs)))
        random.shuffle(jobs)
        split = random.randint(1, len(jobs)-1)
        J1 = set(jobs[:split])
        J2 = set(jobs[split:])
        
        child_genes = []
        job_steps = defaultdict(list)
        
        # 收集父代1中J1作业的基因
        for gene in parent1.genes:
            if gene.job_id in J1:
                job_steps[gene.job_id].append(gene)
        
        # 收集父代2中J2作业的基因
        for gene in parent2.genes:
            if gene.job_id in J2:
                job_steps[gene.job_id].append(gene)
        
        # 按作业顺序合并基因
        for job_id in range(len(self.jobs)):
            if job_id in J1:
                child_genes.extend([g for g in parent1.genes if g.job_id == job_id])
            else:
                child_genes.extend([g for g in parent2.genes if g.job_id == job_id])
        
        return self.Chromosome(child_genes, self.agv_count, self.jobs, self.transport_time)
    
    def _dynamic_mutation(self, chrom):
        new_genes = deepcopy(chrom.genes)
        agv_times = [0.0] * self.agv_count
        agv_positions = ['L/U'] * self.agv_count
        job_progress = defaultdict(float)
        machine_times = defaultdict(float)
        
        # 预解码以获取AGV状态
        for gene in new_genes:
            job_id = gene.job_id
            step_id = gene.step_id
            step = self.jobs[job_id][step_id]
            pre_agv, post_agv = gene.agv_assign
            
            # 前运输处理
            pre_trans = step['transport'][0]
            if agv_positions[pre_agv] != pre_trans['from']:
                move_time = self.transport_time.get((agv_positions[pre_agv], pre_trans['from']), 0)
                move_start = max(agv_times[pre_agv], job_progress.get((job_id, step_id-1), 0.0))
                agv_times[pre_agv] = move_start + move_time
                agv_positions[pre_agv] = pre_trans['from']
            transport_start = max(agv_times[pre_agv], job_progress.get((job_id, step_id-1), 0.0))
            agv_times[pre_agv] = transport_start + pre_trans['time']
            agv_positions[pre_agv] = pre_trans['to']
            
            # 加工时间
            machine = f'M{step["machine"]+1}'
            process_start = max(agv_times[pre_agv], machine_times[machine])
            process_end = process_start + step['duration']
            machine_times[machine] = process_end
            
            # 后运输处理
            post_trans = step['transport'][1]
            if agv_positions[post_agv] != post_trans['from']:
                move_time = self.transport_time.get((agv_positions[post_agv], post_trans['from']), 0)
                move_start = max(agv_times[post_agv], process_end)
                agv_times[post_agv] = move_start + move_time
                agv_positions[post_agv] = post_trans['from']
            transport_start = max(agv_times[post_agv], process_end)
            agv_times[post_agv] = transport_start + post_trans['time']
            agv_positions[post_agv] = post_trans['to']
            
            job_progress[(job_id, step_id)] = agv_times[post_agv]
        
        # 变异操作
        for gene in new_genes:
            if random.random() < self.mut_prob:
                job_id = gene.job_id
                step_id = gene.step_id
                step = self.jobs[job_id][step_id]
                
                # 随机选择变异前或后运输
                if random.choice([True, False]):
                    trans = step['transport'][0]
                    candidates = []
                    for agv in range(self.agv_count):
                        move_time = self.transport_time.get((agv_positions[agv], trans['from']), 0)
                        prev_end = job_progress.get((job_id, step_id-1), 0.0)
                        arrival = max(agv_times[agv], prev_end) + move_time + trans['time']
                        candidates.append((agv, arrival))
                    best_agv = min(candidates, key=lambda x: x[1])[0]
                    gene.agv_assign[0] = best_agv
                    
                else:
                    trans = step['transport'][1]
                    candidates = []
                    for agv in range(self.agv_count):
                        move_time = self.transport_time.get((agv_positions[agv], trans['from']), 0)
                        process_end = machine_times[f'M{step["machine"]+1}']
                        arrival = max(agv_times[agv], process_end) + move_time + trans['time']
                        candidates.append((agv, arrival))
                    best_agv = min(candidates, key=lambda x: x[1])[0]
                    gene.agv_assign[1] = best_agv
            
        return self.Chromosome(new_genes, self.agv_count, self.jobs, self.transport_time)
            
    
    def visualize(self, best_chrom, history):
        plt.figure(figsize=(10, 6))
        plt.plot(history, 'b-', linewidth=1.5)
        plt.title("Optimization Process", fontsize=14)
        plt.xlabel("Generation", fontsize=12)
        plt.ylabel("Makespan", fontsize=12)
        plt.grid(True, linestyle='--', alpha=0.7)
        plt.tight_layout()
        plt.show()

        plt.figure(figsize=(30, 15))
        best_chrom.decode()
        schedule = best_chrom._schedule
        colors = plt.cm.tab20.colors
        
        MACHINE_ROW_HEIGHT = 3
        AGV_ROW_HEIGHT = 3
        FONT_SIZE = 9

        y_ticks = []
        y_labels = []
        for idx, (machine, tasks) in enumerate(
            sorted(schedule['machines'].items(), key=lambda x: int(''.join(filter(str.isdigit, x[0]))))
        ):
            y = idx * 3
            for task in tasks:
                plt.barh(y=y, 
                         width=task['end'] - task['start'],
                         left=task['start'],
                         height=2,
                         color=colors[task['job'] % 20],
                         edgecolor='black')
                plt.text((task['start'] + task['end']) / 2, y,
                         f"J{task['job']+1}-S{task['step']+1}",
                         ha='center', va='center', fontsize=8)
            y_ticks.append(y)
            y_labels.append(f'Machine {machine}')

        agv_base_y = len(schedule['machines']) *  MACHINE_ROW_HEIGHT + 2
        for idx, (agv, tasks) in enumerate(schedule['agvs'].items()):
            y = agv_base_y + idx * AGV_ROW_HEIGHT
            for task in tasks:
                color = colors[task['job'] % 20] if task['type'] == 'loaded' else 'lightgray'
                label = f"{task['from']}→{task['to']}" if task['type'] == 'loaded' else f"E {task['from']}→{task['to']}"
                
                plt.barh(y=y,
                         width=task['end'] - task['start'],
                         left=task['start'],
                         height=2,
                         color=color,
                         edgecolor='black')
                
                plt.text((task['start'] + task['end'])/2, y + AGV_ROW_HEIGHT/2,
                         label,
                         ha='center', 
                         va='center',
                         fontsize=FONT_SIZE-1,
                         rotation=45,
                         color='darkblue')
                    
            y_ticks.append(y + AGV_ROW_HEIGHT/2)
            y_labels.append(f'AGV{agv+1}')

        plt.yticks(y_ticks, y_labels)
        plt.xlabel('Time (minutes)', fontsize=12)
        plt.title('Scheduling Gantt Chart', fontsize=14)
        plt.grid(axis='x', linestyle='--', alpha=0.7)

        patches = [mpatches.Patch(color=colors[i%20], label=f'Job {i+1}') 
                  for i in range(len(self.jobs))]
        plt.legend(handles=patches,
                   bbox_to_anchor=(1.15, 1),
                   loc='upper left',
                   borderaxespad=0.,
                   title="Job Legend")

        plt.subplots_adjust(left=0.25, right=0.8, hspace=0.4)
        plt.show()
        
# 运行
if __name__ == "__main__":
    solver = DynamicJSSPSolver(
        transport_time=TRANSPORT_TIME,
        agv_count=3
    )
    
    solver.update_production_data(time_matrix, machine_matrix)
    best_solution, history = solver.solve(pop_size=50, max_gen=1000)
    print(f"最优完成时间: {best_solution.makespan} 分钟")
    solver.visualize(best_solution, history)

In [1]:
import numpy as np
import pandas as pd
pd.set_option('display.max_columns',None)
pd.set_option('display.max_rows',None)
import random
import matplotlib.pyplot as plt

seed_value=3
random.seed(seed_value)

distribution="normal Printed Circuit Board" # 正規分布（多層回路基盤を作る）

#要素
#注文
odder_mean = 20  # オーダー数平均値
odder_std_dev = 5  # オーダー数標準偏差
odder_size = 1000  # オーダー数样本サイズ

lot_mean = 7  # 注文された製品のロット数平均値
lot_std_dev = 3  # 注文された製品のロット数標準偏差
lot_size = 1000  # 注文された製品のロット数样本サイズ

#機械加工工程設定
m_low=1 #最小機械番号
m_high=14 #最大機械番号(m_high-1)

M1_t_unit = 0.5  # M1の1単位操作時間（銅箔付きの絶縁板1を取る）
M2_t_unit = 0.5  # M2の1単位操作時間（銅箔付きの絶縁板2を取る）
M3_t_unit = 2  # M3の1単位操作時間（表面の汚れや油分を除去し）
M4_t_unit = 8  # M4の1単位操作時間（パターン転写、フォトリソグラフィ方式）
M5_t_unit = 7  # M5の1単位操作時間（パターン転写、ダイレクト印刷方式）
M6_t_base = 250  # M6のベース操作時間（エッチングとレジスト除去）
M7_t_base = 600  # M7のベース操作時間（積層）
M8_t_unit = 3 # M8の1単位操作時間（ドリル加工）
M9_t_base = 750  # M9のベース操作時間（スルーホールめっき）
M10_t_unit = 5 # M10の1単位操作時間（ソルダーレジスト塗布）
M11_t_unit = 4 # M11の1単位操作時間（表面処理）
M12_t_unit = 2 # M12の1単位操作時間（電気検査）
M13_t_unit = 1 # M13の1単位操作時間（包装）

#加工機械順番
MS_J=np.array([[1,3,4,6,7,8,9,10,11,12,13,0,0,0,0,0,0],#1層
               [2,3,5,6,7,8,9,10,11,12,13,0,0,0,0,0,0],#1層
               [1,3,4,6,5,6,7,8,9,10,11,12,13,0,0,0,0],#2層
               [2,3,5,6,4,6,7,8,9,10,11,12,13,0,0,0,0],#2層
               [1,3,5,6,4,6,7,8,9,4,6,10,11,12,13,0,0],#3層
               [2,3,5,6,4,6,7,8,9,4,6,5,6,10,11,12,13]])#4層

p_number_max=17 #すべての種類の製品に応じて最大プロセス数
count_type=6 #製品の種類数

odder_data = np.round(np.random.normal(odder_mean, odder_std_dev, odder_size))
odder_data = odder_data[odder_data > 0]  # マイナスやゼロを除外

print(odder_data)

[11. 14. 20. 13. 23. 34. 13. 24. 27. 24. 16. 24. 19. 30. 27. 19. 20. 17.
 20. 16. 23. 15. 25. 17. 28. 20. 16. 28. 28. 22. 19. 14. 17. 19. 20. 24.
 14. 13. 15. 20. 22. 20. 23. 19. 16. 23. 21. 15. 18. 20. 15. 15. 19. 23.
 21. 24. 19. 27. 24. 30. 18. 33. 16. 23. 19. 15. 22. 24. 16. 18. 14. 34.
 14. 17. 20. 23. 26. 18. 11. 22. 21. 24. 15. 17. 14. 22. 15. 17. 16. 17.
 22. 20. 21. 31. 18. 27. 26. 23. 21. 21. 14. 26. 21. 16. 26. 16. 15. 28.
 12. 20. 14. 16. 17. 29. 21. 22. 16. 21. 22. 19. 19. 11. 23. 18. 20. 11.
 25. 21. 22. 12. 19. 16. 26. 15. 22. 14. 20. 13. 12. 21. 18. 21. 22. 20.
 16. 20. 27. 18. 13.  9. 17. 21. 20. 19. 33. 19. 19. 21. 27. 12. 20. 13.
 19. 20. 21. 22. 26. 23. 13. 20. 18. 26. 22. 24. 19. 19. 22. 25. 15. 29.
 10. 26. 30. 18. 14. 20. 22. 21. 17. 26. 17. 19. 18. 12. 16. 31. 25. 24.
 15. 25. 19. 25. 12. 18. 17. 23. 19. 13. 21. 21. 18. 16. 20. 19. 20. 14.
 18. 21. 17. 16. 18. 15. 22. 25. 22. 19. 22. 10. 18. 18. 21. 23. 17.  6.
  9. 26. 20. 21. 24. 24. 19. 19. 21. 22. 19. 21. 18