In [1]:
from docplex.mp.model import Model
import math

def calculate_C(p):
    """计算生产期限上界C"""
    max_p = max([p[i][j] for i in range(len(p)) for j in range(len(p[0]))])
    sum_max = sum([max([p[i][j] for i in range(len(p))]) for j in range(len(p[0]))])
    return max_p + sum_max

def main_scenario_peak():
    # 初始化模型
    mdl = Model('FlowShop_With_Peak')
    
    # ========== 参数定义 ==========
    n = 5                   # 工件数
    m = 3                   # 机器数
    r = [10, 6, 8]          # 机器能耗(kWh/min)
    p = [                   # 加工时间(分钟)
        [20, 32, 14],
        [33, 34, 20],
        [45, 14, 30],
        [63, 22, 16],
        [38, 15, 35]
    ]
    beta = 1.75             # 生产紧急系数
    C = calculate_C(p)
    T = beta * C            # 生产期限
    print(f"生产期限T={T}分钟 ({T/60:.2f}小时)")

    # ========== 分时电价定义（含尖峰时段） ==========
    time_slots = [
        # 平峰时段 7:00-10:00 (0-180分钟)
        {'start': 0,    'end': 180,  'price': 0.7181, 'type': '平峰'},
        
        # 高峰时段 10:00-11:00 (180-240分钟)
        {'start': 180,  'end': 240,  'price': 1.2238, 'type': '高峰'},
        
        # 尖峰时段 11:00-13:00 (240-360分钟)
        {'start': 240,  'end': 360,  'price': 1.3472, 'type': '尖峰'},
        
        # 高峰时段 13:00-15:00 (360-480分钟)
        {'start': 360,  'end': 480,  'price': 1.2238, 'type': '高峰'},
        
        # 平峰时段 15:00-16:00 (480-540分钟)
        {'start': 480,  'end': 540,  'price': 0.7181, 'type': '平峰'},
        
        # 尖峰时段 16:00-17:00 (540-600分钟)
        {'start': 540,  'end': 600,  'price': 1.3472, 'type': '尖峰'},
        
        # 平峰时段 17:00-18:00 (600-660分钟)
        {'start': 600,  'end': 660,  'price': 0.7181, 'type': '平峰'},
        
        # 高峰时段 18:00-21:00 (660-840分钟)
        {'start': 660,  'end': 840,  'price': 1.2238, 'type': '高峰'},
        
        # 平峰时段 21:00-23:00 (840-960分钟)
        {'start': 840,  'end': 960,  'price': 0.7181, 'type': '平峰'},
        
        # 低谷时段 23:00-7:00 (960-1440分钟)
        {'start': 960,  'end': 1440, 'price': 0.2417, 'type': '低谷'}
    ]
    
    # 筛选有效时段（仅保留在T内的部分）
    valid_slots = []
    for slot in time_slots:
        slot_start = slot['start']
        slot_end = min(slot['end'], T)
        if slot_start < slot_end:
            valid_slots.append({
                'start': slot_start,
                'end': slot_end,
                'price': slot['price'],
                'type': slot['type']
            })
    
    print("\n有效电价时段:")
    for idx, s in enumerate(valid_slots):
        print(f"时段{idx}: {s['start']/60:.1f}h-{s['end']/60:.1f}h {s['type']} 价格={s['price']}元/kWh")

    # ========== 决策变量 ==========
    # 1. 顺序变量（工件i是否在工件j之前）
    y = {(i, j): mdl.binary_var(name=f'y_{i}_{j}') for i in range(n) for j in range(n) if i != j}
    
    # 2. 时间变量（工件i在机器j上的开始和完成时间）
    s = {(i, j): mdl.continuous_var(lb=0, ub=T, name=f's_{i}_{j}') for i in range(n) for j in range(m)}
    c = {(i, j): mdl.continuous_var(lb=0, ub=T, name=f'c_{i}_{j}') for i in range(n) for j in range(m)}
    
    # 3. 时段加工时间（机器j在时段k的加工时长）
    t = {(j, k): mdl.continuous_var(lb=0, name=f't_{j}_{k}') for j in range(m) for k in range(len(valid_slots))}
    
    # 4. 尖峰时段使用标志（可选，用于分析）
    peak_usage = {(j, k): mdl.binary_var(name=f'peak_{j}_{k}') 
                 for j in range(m) for k, slot in enumerate(valid_slots) if slot['type'] == '尖峰'}

    # ========== 约束条件 ==========
    # 1. 唯一顺序约束
    for i in range(n):
        for j in range(n):
            if i < j:
                mdl.add_constraint(y[i, j] + y[j, i] == 1, ctname=f'order_{i}_{j}')
    
    # 2. 同一机器顺序约束
    M = T * 2  # 大M值
    for j in range(m):
        for i in range(n):
            for ip in range(n):
                if i != ip:
                    mdl.add_constraint(
                        s[ip, j] >= c[i, j] - M*(1 - y[i, ip]),
                        ctname=f'machine_{j}_order_{i}_{ip}'
                    )
    
    # 3. 跨机器顺序约束
    for i in range(n):
        for j in range(m-1):
            mdl.add_constraint(s[i, j+1] >= c[i, j], ctname=f'cross_{i}_{j}')
    
    # 4. 加工时间约束
    for i in range(n):
        for j in range(m):
            mdl.add_constraint(c[i, j] == s[i, j] + p[i][j], ctname=f'process_{i}_{j}')
    
    # 5. 生产期限约束
    for i in range(n):
        mdl.add_constraint(c[i, m-1] <= T, ctname=f'deadline_{i}')
    
    # 6. 分时电价时段加工时间计算
    for j in range(m):
        for k, slot in enumerate(valid_slots):
            slot_start = slot['start']
            slot_end = slot['end']
            
            # 计算机器j在时段k的总加工时间
            total_time = 0
            for i in range(n):
                # 计算工件i在机器j的加工时间段与当前时段的交集
                overlap_start = mdl.max(s[i, j], slot_start)
                overlap_end = mdl.min(c[i, j], slot_end)
                duration = mdl.max(0, overlap_end - overlap_start)
                total_time += duration
            
            mdl.add_constraint(t[j, k] == total_time, ctname=f'time_{j}_{k}')
            
            # 尖峰时段使用标志约束
            if slot['type'] == '尖峰':
                mdl.add_constraint(peak_usage[j, k] >= t[j, k]/M)
                mdl.add_constraint(peak_usage[j, k] <= t[j, k])

    # ========== 目标函数 ==========
    # 基础电费成本
    base_cost = sum(r[j] * t[j, k] * slot['price'] 
                   for j in range(m) for k, slot in enumerate(valid_slots))
    
    # 尖峰时段惩罚项（可选）
    peak_penalty = 10 * sum(peak_usage[j, k] 
                          for j in range(m) for k, slot in enumerate(valid_slots) 
                          if slot['type'] == '尖峰')
    
    mdl.minimize(base_cost + peak_penalty)

    # ========== 模型求解 ==========
    solution = mdl.solve(log_output=True)
    
    # ========== 结果输出 ==========
    if solution:
        # 辅助函数：分钟转时间字符串
        def minutes_to_time(mins, start_hour=7):
            total_mins = int(round(mins))
            hours = (start_hour + total_mins // 60) % 24
            minutes = total_mins % 60
            return f"{hours:02d}:{minutes:02d}"
        
        print("\n=== 最优解 ===")
        print(f"总用电成本: {solution.get_objective_value():.2f} 元")
        
        # 统计尖峰时段使用情况
        peak_usage_total = sum(solution.get_value(t[j, k]) 
                             for j in range(m) for k, slot in enumerate(valid_slots) 
                             if slot['type'] == '尖峰')
        print(f"尖峰时段总加工时间: {peak_usage_total:.1f} 分钟")
        
        # 获取工件顺序 - 拓扑排序
        precedence = {i: [] for i in range(n)}
        for i in range(n):
            for j in range(n):
                if i != j and solution.get_value(y[i, j]) > 0.5:
                    precedence[i].append(j)
        
        in_degree = {i: 0 for i in range(n)}
        for i in precedence:
            for j in precedence[i]:
                in_degree[j] += 1
        
        queue = [i for i in range(n) if in_degree[i] == 0]
        seq_order = []
        while queue:
            node = queue.pop(0)
            seq_order.append(node+1)  # 转换为1-based编号
            for neighbor in precedence[node]:
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    queue.append(neighbor)
        
        print("\n最优工件顺序:", " → ".join([f"J{x}" for x in seq_order]))
        
        print("\n=== 详细调度方案 ===")
        printed_jobs = set()
        for job in seq_order:
            i = job - 1
            if i in printed_jobs:
                continue
            printed_jobs.add(i)
            
            print(f"\n工件{job}:")
            for j in range(m):
                start = solution.get_value(s[i, j])
                end = solution.get_value(c[i, j])
                print(f"  机器{j+1}: {minutes_to_time(start)}-{minutes_to_time(end)}")
                
                # 计算各时段加工详情
                print("    时段详情:")
                for k, slot in enumerate(valid_slots):
                    slot_start = slot['start']
                    slot_end = slot['end']
                    overlap_start = max(start, slot_start)
                    overlap_end = min(end, slot_end)
                    duration = max(0, overlap_end - overlap_start)
                    
                    if duration > 0.1:  # 忽略极小值
                        cost = r[j] * duration * slot['price']
                        print(f"    - {slot['type']} {minutes_to_time(overlap_start)}-{minutes_to_time(overlap_end)}: "
                              f"{duration:.1f}分钟, 电价={slot['price']}元/kWh, 电费={cost:.2f}元")
    else:
        print("未找到可行解")

if __name__ == "__main__":
    main_scenario_peak()

生产期限T=341.25分钟 (5.69小时)

有效电价时段:
时段0: 0.0h-3.0h 平峰 价格=0.7181元/kWh
时段1: 3.0h-4.0h 高峰 价格=1.2238元/kWh
时段2: 4.0h-5.7h 尖峰 价格=1.3472元/kWh
Version identifier: 22.1.2.0 | 2024-12-09 | 8bd2200c8
CPXPARAM_Read_DataCheck                          1
Tried aggregator 2 times.
MIP Presolve eliminated 436 rows and 181 columns.
MIP Presolve modified 120 coefficients.
Aggregator did 323 substitutions.
Reduced MIP has 206 rows, 373 columns, and 656 nonzeros.
Reduced MIP has 128 binaries, 0 generals, 0 SOSs, and 230 indicators.
Presolve time = 0.00 sec. (1.73 ticks)
Probing fixed 0 vars, tightened 230 bounds.
Probing time = 0.00 sec. (0.51 ticks)
Tried aggregator 2 times.
Detecting symmetries...
MIP Presolve eliminated 0 rows and 20 columns.
MIP Presolve modified 18 coefficients.
Aggregator did 10 substitutions.
Reduced MIP has 197 rows, 343 columns, and 628 nonzeros.
Reduced MIP has 118 binaries, 0 generals, 0 SOSs, and 209 indicators.
Presolve time = 0.02 sec. (0.69 ticks)
Probing fixed 0 vars, tightene

In [2]:
import numpy as np
import random
from datetime import datetime, timedelta
from collections import defaultdict

# 分时电价时段定义（从7:00开始，单位：分钟）
time_slots = [
    {'start': 0,    'end': 180,  'price': 0.7181, 'type': '平峰'},    # 7:00-10:00
    {'start': 180,  'end': 240,  'price': 1.2238, 'type': '高峰'},    # 10:00-11:00
    {'start': 240,  'end': 360,  'price': 1.3472, 'type': '尖峰'},    # 11:00-13:00
    {'start': 360,  'end': 480,  'price': 1.2238, 'type': '高峰'},    # 13:00-15:00
    {'start': 480,  'end': 540,  'price': 0.7181, 'type': '平峰'},    # 15:00-16:00
    {'start': 540,  'end': 600,  'price': 1.3472, 'type': '尖峰'},    # 16:00-17:00
    {'start': 600,  'end': 660,  'price': 0.7181, 'type': '平峰'},    # 17:00-18:00
    {'start': 660,  'end': 840,  'price': 1.2238, 'type': '高峰'},    # 18:00-21:00
    {'start': 840,  'end': 960,  'price': 0.7181, 'type': '平峰'},    # 21:00-23:00
    {'start': 960,  'end': 1440, 'price': 0.2417, 'type': '低谷'}     # 23:00-7:00
]

# 机器能耗 (kWh/min)
r = [10, 6, 8]

# 加工时间 (分钟)
processing_time = [
    [20, 32, 14],  # 工件1
    [33, 34, 20],  # 工件2
    [45, 14, 30],  # 工件3
    [63, 22, 16],  # 工件4
    [38, 15, 35]   # 工件5
]

n_jobs = len(processing_time)
n_machines = len(processing_time[0])

class GeneticAlgorithm:
    def __init__(self, pop_size=50, max_gen=200):
        self.pop_size = pop_size
        self.max_gen = max_gen
        self.population = []
        
    def initialize_population(self):
        # 生成随机排列作为初始种群
        self.population = [random.sample(range(1, n_jobs+1), n_jobs) for _ in range(self.pop_size)]
    
    def evaluate(self, sequence):
        # 评估单个序列的总成本
        scheduler = Scheduler(sequence)
        scheduler.generate_schedule()
        cost, _ = scheduler.calculate_cost()
        return cost
    
    def selection(self):
        # 轮盘赌选择
        costs = np.array([self.evaluate(ind) for ind in self.population])
        fitness = 1 / (costs + 1e-6)  # 最小化问题取倒数
        selected_idx = np.random.choice(
            len(self.population), 
            size=self.pop_size, 
            p=fitness/fitness.sum()
        )
        return [self.population[i] for i in selected_idx]
    
    def crossover(self, parent1, parent2):
        # 顺序交叉(OX)
        size = len(parent1)
        cxpoint1, cxpoint2 = sorted(random.sample(range(size), 2))
        
        child1 = [None] * size
        child2 = [None] * size
        
        # 复制中间段
        child1[cxpoint1:cxpoint2+1] = parent1[cxpoint1:cxpoint2+1]
        child2[cxpoint1:cxpoint2+1] = parent2[cxpoint1:cxpoint2+1]
        
        # 填充剩余部分
        ptr1 = (cxpoint2 + 1) % size
        ptr2 = (cxpoint2 + 1) % size
        
        for i in range(size):
            current_pos = (cxpoint2 + 1 + i) % size
            if parent2[current_pos] not in child1:
                child1[ptr1] = parent2[current_pos]
                ptr1 = (ptr1 + 1) % size
            
            if parent1[current_pos] not in child2:
                child2[ptr2] = parent1[current_pos]
                ptr2 = (ptr2 + 1) % size
        
        return child1, child2
    
    def mutation(self, individual):
        # 交换变异
        idx1, idx2 = random.sample(range(len(individual)), 2)
        individual[idx1], individual[idx2] = individual[idx2], individual[idx1]
        return individual
    
    def run(self):
        self.initialize_population()
        best_sequence = None
        best_cost = float('inf')
        
        for gen in range(self.max_gen):
            # 评估
            costs = [self.evaluate(ind) for ind in self.population]
            min_cost = min(costs)
            if min_cost < best_cost:
                best_cost = min_cost
                best_sequence = self.population[costs.index(min_cost)]
            
            # 选择
            selected = self.selection()
            
            # 交叉
            offspring = []
            for i in range(0, len(selected), 2):
                if i+1 < len(selected) and random.random() < 0.8:
                    child1, child2 = self.crossover(selected[i], selected[i+1])
                    offspring.extend([child1, child2])
                else:
                    offspring.extend([selected[i], selected[i+1]] if i+1 < len(selected) else [selected[i]])
            
            # 变异
            for i in range(len(offspring)):
                if random.random() < 0.2:
                    offspring[i] = self.mutation(offspring[i])
            
            # 新一代
            self.population = offspring
        
        return best_sequence, best_cost

# 调度器类（与之前相同）
class Scheduler:
    def __init__(self, sequence):
        self.sequence = sequence
        self.machine_timeline = defaultdict(list)
        self.job_completion = {}
        self.schedule = []
    
    def generate_schedule(self):
        for job_idx in self.sequence:
            job = job_idx - 1  # 转换为0-based索引
            prev_end = 0
            
            for machine in range(n_machines):
                duration = processing_time[job][machine]
                
                if machine > 0:
                    prev_end = self.job_completion.get((job_idx, machine-1), 0)
                
                start_time = self.find_available_slot(machine, prev_end, duration)
                end_time = start_time + duration
                
                self.machine_timeline[machine].append((start_time, end_time, job_idx))
                self.job_completion[(job_idx, machine)] = end_time
                
                self.schedule.append({
                    'job': job_idx,
                    'machine': machine + 1,
                    'start': start_time,
                    'end': end_time,
                    'duration': duration
                })
    
    def find_available_slot(self, machine, earliest_start, duration):
        timeline = sorted(self.machine_timeline[machine], key=lambda x: x[0])
        
        if not timeline:
            return max(earliest_start, 0)
        
        # 检查第一个窗口
        if earliest_start + duration <= timeline[0][0]:
            return max(earliest_start, 0)
        
        # 检查中间窗口
        for i in range(len(timeline)-1):
            current_end = timeline[i][1]
            next_start = timeline[i+1][0]
            available_start = max(current_end, earliest_start)
            if next_start - available_start >= duration:
                return available_start
        
        # 检查最后一个窗口
        last_end = timeline[-1][1]
        return max(last_end, earliest_start)
    
    def calculate_cost(self):
        total_cost = 0
        peak_time = 0
        
        for record in self.schedule:
            start = record['start']
            end = record['end']
            machine = record['machine'] - 1
            remaining = record['duration']
            current_time = start
            
            while remaining > 0:
                slot = self.get_time_slot(current_time)
                available = min(remaining, slot['end'] - current_time)
                
                cost = available * slot['price'] * r[machine]
                total_cost += cost
                
                if slot['type'] == '尖峰':
                    peak_time += available
                
                remaining -= available
                current_time += available
        
        return total_cost, peak_time
    
    def get_time_slot(self, time):
        time = time % 1440
        for slot in time_slots:
            if slot['start'] <= time < slot['end']:
                return slot
        return time_slots[-1]

# 格式化输出（与之前相同）
def format_time(minutes):
    base_time = datetime.strptime("07:00", "%H:%M")
    delta = timedelta(minutes=minutes)
    return (base_time + delta).strftime("%H:%M")

def print_solution(sequence, cost, peak_time, schedule):
    print("=== 最优解 ===")
    print(f"总用电成本: {cost:.2f} 元")
    print(f"尖峰时段总加工时间: {peak_time:.1f} 分钟\n")
    
    job_order = " → ".join([f"J{job}" for job in sequence])
    print(f"最优工件顺序: {job_order}\n")
    
    print("=== 详细调度方案 ===\n")
    
    job_records = defaultdict(dict)
    for record in schedule:
        job = record['job']
        machine = record['machine']
        job_records[job][machine] = record
    
    for job in sequence:
        print(f"工件{job}:")
        for machine in range(1, n_machines+1):
            record = job_records[job][machine]
            start = record['start']
            end = record['end']
            duration = record['duration']
            
            print(f"  机器{machine}: {format_time(start)}-{format_time(end)}")
            print("    时段详情:")
            
            current_time = start
            remaining = duration
            
            while remaining > 0:
                slot = Scheduler([]).get_time_slot(current_time)
                available = min(remaining, slot['end'] - current_time)
                
                start_str = format_time(current_time)
                end_str = format_time(current_time + available)
                
                cost = available * slot['price'] * r[machine-1]
                print(f"    - {slot['type']} {start_str}-{end_str}: {available:.1f}分钟, "
                      f"电价={slot['price']:.4f}元/kWh, 电费={cost:.2f}元")
                
                remaining -= available
                current_time += available
            
            print()
        print()

# 主程序
if __name__ == "__main__":
    # 运行遗传算法求解最优序列
    ga = GeneticAlgorithm(pop_size=50, max_gen=200)
    best_sequence, best_cost = ga.run()
    
    # 生成详细调度方案
    scheduler = Scheduler(best_sequence)
    scheduler.generate_schedule()
    total_cost, peak_time = scheduler.calculate_cost()
    
    # 输出结果
    print_solution(best_sequence, total_cost, peak_time, scheduler.schedule)

=== 最优解 ===
总用电成本: 2821.34 元
尖峰时段总加工时间: 0.0 分钟

最优工件顺序: J5 → J1 → J2 → J3 → J4

=== 详细调度方案 ===

工件5:
  机器1: 07:00-07:38
    时段详情:
    - 平峰 07:00-07:38: 38.0分钟, 电价=0.7181元/kWh, 电费=272.88元

  机器2: 07:38-07:53
    时段详情:
    - 平峰 07:38-07:53: 15.0分钟, 电价=0.7181元/kWh, 电费=64.63元

  机器3: 07:53-08:28
    时段详情:
    - 平峰 07:53-08:28: 35.0分钟, 电价=0.7181元/kWh, 电费=201.07元


工件1:
  机器1: 07:38-07:58
    时段详情:
    - 平峰 07:38-07:58: 20.0分钟, 电价=0.7181元/kWh, 电费=143.62元

  机器2: 07:58-08:30
    时段详情:
    - 平峰 07:58-08:30: 32.0分钟, 电价=0.7181元/kWh, 电费=137.88元

  机器3: 08:30-08:44
    时段详情:
    - 平峰 08:30-08:44: 14.0分钟, 电价=0.7181元/kWh, 电费=80.43元


工件2:
  机器1: 07:58-08:31
    时段详情:
    - 平峰 07:58-08:31: 33.0分钟, 电价=0.7181元/kWh, 电费=236.97元

  机器2: 08:31-09:05
    时段详情:
    - 平峰 08:31-09:05: 34.0分钟, 电价=0.7181元/kWh, 电费=146.49元

  机器3: 09:05-09:25
    时段详情:
    - 平峰 09:05-09:25: 20.0分钟, 电价=0.7181元/kWh, 电费=114.90元


工件3:
  机器1: 08:31-09:16
    时段详情:
    - 平峰 08:31-09:16: 45.0分钟, 电价=0.7181元/kWh, 电费=323.14元

  机器2: 09:16-