In [1]:
import random
import math
import copy

# 数据结构定义
class Employee:
    def __init__(self, name, position, phone, email, store, 
                 workday_pref=(0,6),  # 周一(0)到周日(6)
                 time_pref=('00:00', '23:59'),
                 max_daily_hours=24,
                 max_weekly_hours=168):
        self.name = name
        self.position = position
        self.phone = phone
        self.email = email
        self.store = store
        self.workday_pref = workday_pref
        self.time_pref = time_pref
        self.max_daily_hours = max_daily_hours
        self.max_weekly_hours = max_weekly_hours

class Shift:
    def __init__(self, day, start_time, end_time, required_positions):
        self.day = day             # 0-6 对应周一到周日
        self.start_time = start_time  # 格式 'HH:MM'
        self.end_time = end_time      # 格式 'HH:MM'
        self.required_positions = required_positions  # {职位: 人数}

# 模拟退火参数
INITIAL_TEMP = 1000
COOLING_RATE = 0.995
MIN_TEMP = 1e-3
NUM_ITERATIONS = 1000

def time_to_minutes(t):
    hours, mins = map(int, t.split(':'))
    return hours * 60 + mins

def calculate_shift_duration(shift):
    start = time_to_minutes(shift.start_time)
    end = time_to_minutes(shift.end_time)
    return (end - start) / 60

def generate_initial_solution(shifts, employees):
    schedule = []
    for shift in shifts:
        assignment = {}
        for position, count in shift.required_positions.items():
            candidates = [e for e in employees if e.position == position]
            selected = random.sample(candidates, min(count, len(candidates)))
            assignment[position] = selected
        schedule.append((shift, assignment))
    return schedule

def calculate_cost(schedule):
    cost = 0
    employee_hours = {e.name: 0 for e in employees}
    
    for shift, assignment in schedule:
        # 检查班次需求是否满足
        for position, count in shift.required_positions.items():
            assigned_count = len(assignment.get(position, []))
            if assigned_count < count:
                cost += 100 * (count - assigned_count)  # 严重惩罚
        
        # 检查员工约束
        for position, workers in assignment.items():
            for employee in workers:
                # 工作日偏好
                if not (employee.workday_pref[0] <= shift.day <= employee.workday_pref[1]):
                    cost += 10
                
                # 工作时间偏好
                shift_start = time_to_minutes(shift.start_time)
                shift_end = time_to_minutes(shift.end_time)
                pref_start = time_to_minutes(employee.time_pref[0])
                pref_end = time_to_minutes(employee.time_pref[1])
                if shift_start < pref_start or shift_end > pref_end:
                    cost += 5
                
                # 更新工作时长
                duration = calculate_shift_duration(shift)
                employee_hours[employee.name] += duration
                
                # 每日时长限制
                if duration > employee.max_daily_hours:
                    cost += 20
    
    # 检查每周时长限制
    for name, hours in employee_hours.items():
        if hours > next(e.max_weekly_hours for e in employees if e.name == name):
            cost += 50
    
    return cost

def generate_neighbor(current_schedule):
    new_schedule = copy.deepcopy(current_schedule)
    
    # 随机选择一个班次进行修改
    idx = random.randint(0, len(new_schedule)-1)
    shift, assignment = new_schedule[idx]
    
    # 随机选择一个职位进行修改
    positions = list(shift.required_positions.keys())
    if not positions:
        return new_schedule
    selected_pos = random.choice(positions)
    
    # 随机替换一个员工
    current_workers = assignment.get(selected_pos, [])
    if current_workers:
        remove_idx = random.randint(0, len(current_workers)-1)
        del current_workers[remove_idx]
    
    candidates = [e for e in employees if e.position == selected_pos]
    if candidates:
        new_worker = random.choice(candidates)
        current_workers.append(new_worker)
    
    return new_schedule

def simulated_annealing(employees, shifts):
    current_sol = generate_initial_solution(shifts, employees)
    current_cost = calculate_cost(current_sol)
    best_sol = copy.deepcopy(current_sol)
    best_cost = current_cost
    
    temp = INITIAL_TEMP
    
    while temp > MIN_TEMP:
        for _ in range(NUM_ITERATIONS):
            neighbor = generate_neighbor(current_sol)
            neighbor_cost = calculate_cost(neighbor)
            
            if neighbor_cost < current_cost or \
               random.random() < math.exp(-(neighbor_cost - current_cost)/temp):
                current_sol = neighbor
                current_cost = neighbor_cost
                
                if neighbor_cost < best_cost:
                    best_sol = copy.deepcopy(neighbor)
                    best_cost = neighbor_cost
        
        temp *= COOLING_RATE
    
    return best_sol, best_cost

# 示例使用
if __name__ == "__main__":
    # 创建测试数据
    employees = [
        Employee("Alice", "店员", "123-4567", "alice@store.com", "Store A",
                 workday_pref=(0, 4), max_daily_hours=8, max_weekly_hours=40),
        Employee("Bob", "门店经理", "234-5678", "bob@store.com", "Store A")
    ]
    
    shifts = [
        Shift(0, "09:00", "17:00", {"门店经理": 1, "店员": 2}),
        Shift(1, "10:00", "14:00", {"店员": 3})
    ]
    
    # 运行算法
    best_schedule, cost = simulated_annealing(employees, shifts)
    
    # 输出结果
    print(f"Final Cost: {cost}")
    for shift, assignment in best_schedule:
        print(f"\nShift on day {shift.day} ({shift.start_time}-{shift.end_time}):")
        for position, workers in assignment.items():
            print(f"  {position}: {[w.name for w in workers]}")

Final Cost: 300

Shift on day 0 (09:00-17:00):
  门店经理: ['Bob']
  店员: ['Alice']

Shift on day 1 (10:00-14:00):
  店员: ['Alice']


In [None]:
import logging
import random
import math
import copy
from datetime import datetime

# 配置日志格式
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

# 数据结构定义（保持不变）
class Employee:
    def __init__(self, name, position, phone, email, store, 
                 workday_pref=(0,6),  # 周一(0)到周日(6)
                 time_pref=('00:00', '23:59'),
                 max_daily_hours=24,
                 max_weekly_hours=168):
        self.name = name
        self.position = position
        self.phone = phone
        self.email = email
        self.store = store
        self.workday_pref = workday_pref
        self.time_pref = time_pref
        self.max_daily_hours = max_daily_hours
        self.max_weekly_hours = max_weekly_hours

class Shift:
    def __init__(self, day, start_time, end_time, required_positions):
        self.day = day
        self.start_time = start_time
        self.end_time = end_time
        self.required_positions = required_positions

# 辅助函数（保持不变）
def time_to_minutes(t):
    hours, mins = map(int, t.split(':'))
    return hours * 60 + mins

def calculate_shift_duration(shift):
    start = time_to_minutes(shift.start_time)
    end = time_to_minutes(shift.end_time)
    return (end - start) / 60

# 初始解生成（添加日志）
def generate_initial_solution(shifts, employees):
    logger.info("开始生成初始解...")
    schedule = []
    for shift in shifts:
        assignment = {}
        for position, count in shift.required_positions.items():
            candidates = [e for e in employees if e.position == position]
            selected = random.sample(candidates, min(count, len(candidates)))
            assignment[position] = selected
            logger.debug(f"班次{shift.day} {shift.start_time}-{shift.end_time} - 分配{position} {len(selected)}人")
        schedule.append((shift, assignment))
    logger.info(f"初始解生成完成，共安排{len(shifts)}个班次")
    return schedule

# 成本计算（添加详细日志）
def calculate_cost(schedule):
    logger.debug("开始计算方案成本...")
    cost = 0
    employee_hours = {e.name: 0 for e in employees}
    violation_details = []

    for shift, assignment in schedule:
        # 检查班次需求是否满足
        for position, count in shift.required_positions.items():
            assigned_count = len(assignment.get(position, []))
            if assigned_count < count:
                penalty = 100 * (count - assigned_count)
                violation_details.append(
                    f"班次{shift.day} {position} 缺少{count - assigned_count}人（惩罚+{penalty}）"
                )
                cost += penalty

        # 检查员工约束
        for position, workers in assignment.items():
            for employee in workers:
                # 工作日偏好
                if not (employee.workday_pref[0] <= shift.day <= employee.workday_pref[1]):
                    violation_details.append(
                        f"{employee.name} 工作日偏好冲突（周{shift.day+1}，偏好周{employee.workday_pref[0]+1}-周{employee.workday_pref[1]+1}）"
                    )
                    cost += 10
                
                # 工作时间偏好
                shift_start = time_to_minutes(shift.start_time)
                shift_end = time_to_minutes(shift.end_time)
                pref_start = time_to_minutes(employee.time_pref[0])
                pref_end = time_to_minutes(employee.time_pref[1])
                if shift_start < pref_start or shift_end > pref_end:
                    violation_details.append(
                        f"{employee.name} 时间偏好冲突（班次{shift.start_time}-{shift.end_time} vs 偏好{employee.time_pref[0]}-{employee.time_pref[1]}）"
                    )
                    cost += 5
                
                # 更新工作时长
                duration = calculate_shift_duration(shift)
                employee_hours[employee.name] += duration
                
                # 每日时长限制
                if duration > employee.max_daily_hours:
                    violation_details.append(
                        f"{employee.name} 单日超时（{duration}h > 限制{employee.max_daily_hours}h）"
                    )
                    cost += 20
    
    # 检查每周时长限制
    for name, hours in employee_hours.items():
        max_hours = next(e.max_weekly_hours for e in employees if e.name == name)
        if hours > max_hours:
            violation_details.append(
                f"{name} 周超时（{hours}h > 限制{max_hours}h）"
            )
            cost += 50

    # 记录详细违规信息
    if violation_details:
        logger.debug(f"发现{len(violation_details)}条违规：")
        for detail in violation_details[:3]:  # 只显示前3条避免日志过多
            logger.debug(f"  * {detail}")
        if len(violation_details) > 3:
            logger.debug(f"  还有{len(violation_details)-3}条未显示...")
    
    logger.debug(f"总成本计算完成：{cost}")
    return cost

# 邻居生成（添加调试日志）
def generate_neighbor(current_schedule):
    logger.debug("生成相邻解...")
    new_schedule = copy.deepcopy(current_schedule)
    
    idx = random.randint(0, len(new_schedule)-1)
    shift, assignment = new_schedule[idx]
    
    positions = list(shift.required_positions.keys())
    if not positions:
        return new_schedule
    selected_pos = random.choice(positions)
    
    current_workers = assignment.get(selected_pos, [])
    if current_workers:
        remove_idx = random.randint(0, len(current_workers)-1)
        removed = current_workers.pop(remove_idx)
        logger.debug(f"移除员工：{removed.name}（{selected_pos}）")
    
    candidates = [e for e in employees if e.position == selected_pos]
    if candidates:
        new_worker = random.choice(candidates)
        current_workers.append(new_worker)
        logger.debug(f"新增员工：{new_worker.name}（{selected_pos}）")
    
    return new_schedule

# 模拟退火主算法（添加详细日志）
def simulated_annealing(employees, shifts):
    logger.info("==== 开始模拟退火算法 ====")
    current_sol = generate_initial_solution(shifts, employees)
    current_cost = calculate_cost(current_sol)
    best_sol = copy.deepcopy(current_sol)
    best_cost = current_cost
    
    temp = INITIAL_TEMP
    iteration = 0
    
    logger.info(f"初始温度：{INITIAL_TEMP} 初始成本：{current_cost}")

    while temp > MIN_TEMP:
        iteration += 1
        logger.debug(f"当前温度：{temp:.2f} 当前成本：{current_cost} 最佳成本：{best_cost}")
        
        for _ in range(NUM_ITERATIONS):
            neighbor = generate_neighbor(current_sol)
            neighbor_cost = calculate_cost(neighbor)
            
            cost_diff = neighbor_cost - current_cost
            accept_prob = math.exp(-cost_diff/temp) if cost_diff > 0 else 1
            
            if neighbor_cost < current_cost or random.random() < accept_prob:
                current_sol = neighbor
                current_cost = neighbor_cost
                logger.debug(f"接受新解（成本变化：{cost_diff}）")
                
                if neighbor_cost < best_cost:
                    best_sol = copy.deepcopy(neighbor)
                    best_cost = neighbor_cost
                    logger.info(f"发现新最佳解！温度：{temp:.2f} 成本：{best_cost}")
        
        # 温度更新日志
        if iteration % 10 == 0:
            logger.info(f"迭代 [{iteration}] 温度：{temp:.2f} 当前成本：{current_cost} 最佳成本：{best_cost}")
        
        temp *= COOLING_RATE
    
    logger.info("==== 算法结束 ====")
    logger.info(f"最终最佳成本：{best_cost}")
    return best_sol, best_cost

# 结果输出函数（新增）
def print_schedule(schedule):
    logger.info("\n最终排班方案：")
    total_violations = 0
    for shift, assignment in schedule:
        logger.info(f"\n班次 {shift.day+1}（周{shift.day+1}）{shift.start_time}-{shift.end_time}:")
        for position, workers in assignment.items():
            logger.info(f"  {position}: {', '.join([w.name for w in workers])}")
            required = shift.required_positions.get(position, 0)
            if len(workers) < required:
                logger.warning(f"   ! 人手不足：需要{required}人，实际{len(workers)}人")
                total_violations += 1
    logger.info(f"\n总违规数：{total_violations}")

# 示例使用（添加更多测试数据）
if __name__ == "__main__":
    # 创建测试数据
    employees = [
        Employee("Alice", "店员", "123-4567", "alice@store.com", "Store A",
                 workday_pref=(0, 4),  # 周一到周五
                 time_pref=('08:00', '18:00'),
                 max_daily_hours=8, 
                 max_weekly_hours=40),
        Employee("Bob", "门店经理", "234-5678", "bob@store.com", "Store A"),
        Employee("Charlie", "店员", "345-6789", "charlie@store.com", "Store A",
                 max_daily_hours=6,
                 max_weekly_hours=30),
        Employee("David", "副经理", "456-7890", "david@store.com", "Store A")
    ]
    
    shifts = [
        Shift(0, "09:00", "17:00", {"门店经理": 1, "店员": 2}),
        Shift(1, "10:00", "14:00", {"店员": 3}),
        Shift(4, "08:00", "20:00", {"副经理": 1, "店员": 2}),
        Shift(5, "12:00", "21:00", {"店员": 4})
    ]
    
    # 运行算法
    best_schedule, cost = simulated_annealing(employees, shifts)
    
    # 输出结果
    print_schedule(best_schedule)