In [4]:
from kamgon import *
import time

In [5]:
class JobShopSchedulingProblem(CSPProblem):
    """
    ปัญหาการจัดตารางงานในโรงงาน (Job Shop Scheduling)
    ที่ต้องจัดสรรงานไปยังเครื่องจักรโดยคำนึงถึงลำดับการดำเนินงานและเวลาที่ใช้
    """
    
    def __init__(self, jobs: List[str], machines: List[str], 
                 operations: Dict[str, List[tuple]], 
                 processing_times: Dict[tuple, int],
                 machine_availability: Dict[str, List[tuple]]):
        """
        เริ่มต้นปัญหา Job Shop Scheduling
        
        Args:
            jobs: รายการงาน
            machines: รายการเครื่องจักร
            operations: ลำดับการดำเนินงานสำหรับแต่ละงาน [(เครื่องจักร, ลำดับ), ...]
            processing_times: เวลาที่ใช้ในการประมวลผล {(งาน, เครื่องจักร): เวลา}
            machine_availability: ช่วงเวลาที่เครื่องจักรสามารถใช้งานได้
        """
        self.jobs = jobs
        self.machines = machines
        self.operations = operations
        self.processing_times = processing_times
        self.machine_availability = machine_availability
        
        # สร้างตัวแปรสำหรับแต่ละ operation
        variables = []
        domains = {}
        
        for job in jobs:
            job_operations = operations.get(job, [])
            for i, (machine, _) in enumerate(job_operations):
                var_name = f"{job}_op{i}_{machine}"
                variables.append(var_name)
                
                # สร้าง domain ที่เป็นช่วงเวลาที่เป็นไปได้
                processing_time = processing_times.get((job, machine), 1)
                available_slots = machine_availability.get(machine, [(0, 24)])
                
                possible_start_times = []
                for start_time, end_time in available_slots:
                    for t in range(start_time, end_time - processing_time + 1):
                        possible_start_times.append(t)
                
                domains[var_name] = possible_start_times
        
        super().__init__(variables, domains)
    
    def is_consistent(self, variable: str, value: Any, assignment: Dict[str, Any]) -> bool:
        """
        ตรวจสอบข้อจำกัดของ Job Shop Scheduling
        
        Args:
            variable: ชื่อตัวแปร operation
            value: เวลาเริ่มต้นของ operation
            assignment: การจัดตารางที่มีอยู่แล้ว
            
        Returns:
            True ถ้าการจัดตารางนี้ไม่ขัดแย้งกับข้อจำกัด
        """
        job, op_index, machine = self._parse_variable(variable)
        start_time = value
        processing_time = self.processing_times.get((job, machine), 1)
        end_time = start_time + processing_time
        
        for assigned_var, assigned_start in assignment.items():
            if assigned_var == variable:
                continue
                
            assigned_job, assigned_op_index, assigned_machine = self._parse_variable(assigned_var)
            assigned_processing_time = self.processing_times.get((assigned_job, assigned_machine), 1)
            assigned_end = assigned_start + assigned_processing_time
            
            # ข้อจำกัดที่ 1: เครื่องจักรเดียวไม่สามารถทำงานหลายอย่างพร้อมกันได้
            if machine == assigned_machine:
                if not (end_time <= assigned_start or assigned_end <= start_time):
                    return False
            
            # ข้อจำกัดที่ 2: งานเดียวกันต้องทำตามลำดับที่กำหนด
            if job == assigned_job:
                if op_index < assigned_op_index and end_time > assigned_start:
                    return False
                if op_index > assigned_op_index and start_time < assigned_end:
                    return False
        
        return True
    
    def _parse_variable(self, variable: str) -> tuple:
        """แยกชื่อตัวแปรเพื่อหาข้อมูลงาน operation และเครื่องจักร"""
        parts = variable.split('_')
        job = parts[0]
        op_part = parts[1]  # เช่น 'op0', 'op1'
        op_index = int(op_part[2:])
        machine = parts[2]
        return job, op_index, machine
    
    def calculate_makespan(self, assignment: Dict[str, Any]) -> int:
        """
        คำนวณเวลารวมที่ใช้ในการทำงานทั้งหมด (makespan)
        
        Args:
            assignment: การจัดตารางที่สมบูรณ์
            
        Returns:
            เวลาสูงสุดที่ใช้ในการทำงานทั้งหมด
        """
        max_end_time = 0
        
        for variable, start_time in assignment.items():
            job, op_index, machine = self._parse_variable(variable)
            processing_time = self.processing_times.get((job, machine), 1)
            end_time = start_time + processing_time
            max_end_time = max(max_end_time, end_time)
        
        return max_end_time
    
    def get_machine_schedule(self, assignment: Dict[str, Any]) -> Dict[str, List[tuple]]:
        """
        จัดกลุ่มตารางงานตามเครื่องจักร
        
        Args:
            assignment: การจัดตารางที่สมบูรณ์
            
        Returns:
            Dictionary ที่จัดกลุ่มงานตามเครื่องจักร
        """
        machine_schedule = {machine: [] for machine in self.machines}
        
        for variable, start_time in assignment.items():
            job, op_index, machine = self._parse_variable(variable)
            processing_time = self.processing_times.get((job, machine), 1)
            end_time = start_time + processing_time
            
            machine_schedule[machine].append((job, start_time, end_time))
        
        # เรียงลำดับตามเวลาเริ่มต้น
        for machine in machine_schedule:
            machine_schedule[machine].sort(key=lambda x: x[1])
        
        return machine_schedule


In [6]:
class UniversitySchedulingProblem(CSPProblem):
    """
    ระบบจัดตารางเรียนมหาวิทยาลัยที่ครอบคลุมข้อจำกัดที่ซับซ้อน
    รวมถึงการจัดสรรอาจารย์ ห้องเรียน และการหลีกเลี่ยงการทับซ้อนของรายวิชา
    """
    
    def __init__(self, courses: List[str], instructors: Dict[str, List[str]], 
                 rooms: List[str], time_slots: List[str], 
                 room_capacity: Dict[str, int], course_enrollment: Dict[str, int],
                 instructor_availability: Dict[str, List[str]],
                 room_requirements: Dict[str, List[str]]):
        """
        เริ่มต้นระบบจัดตารางเรียนมหาวิทยาลัย
        
        Args:
            courses: รายการรายวิชา
            instructors: รายการอาจารย์ที่สามารถสอนแต่ละวิชาได้
            rooms: รายการห้องเรียน
            time_slots: รายการช่วงเวลา
            room_capacity: ความจุของแต่ละห้อง
            course_enrollment: จำนวนนักเรียนที่ลงทะเบียนแต่ละวิชา
            instructor_availability: ช่วงเวลาที่อาจารย์สามารถสอนได้
            room_requirements: ความต้องการพิเศษของห้องสำหรับแต่ละวิชา
        """
        self.instructors = instructors
        self.rooms = rooms
        self.time_slots = time_slots
        self.room_capacity = room_capacity
        self.course_enrollment = course_enrollment
        self.instructor_availability = instructor_availability
        self.room_requirements = room_requirements
        
        # สร้าง domain ที่เป็น combination ของอาจารย์ เวลา และห้อง
        domains = {}
        for course in courses:
            possible_assignments = []
            available_instructors = instructors.get(course, [])
            required_rooms = room_requirements.get(course, rooms)
            
            for instructor in available_instructors:
                available_times = instructor_availability.get(instructor, time_slots)
                for time in available_times:
                    for room in required_rooms:
                        if (room in self.room_capacity and 
                            self.room_capacity[room] >= course_enrollment.get(course, 0)):
                            possible_assignments.append((instructor, time, room))
            
            domains[course] = possible_assignments
        
        super().__init__(courses, domains)
    
    def is_consistent(self, variable: str, value: Any, assignment: Dict[str, Any]) -> bool:
        """
        ตรวจสอบข้อจำกัดของการจัดตารางเรียน
        
        Args:
            variable: ชื่อรายวิชา
            value: tuple ของ (อาจารย์, เวลา, ห้อง)
            assignment: การจัดตารางที่มีอยู่แล้ว
            
        Returns:
            True ถ้าการจัดตารางนี้ไม่ขัดแย้งกับข้อจำกัด
        """
        instructor, time, room = value
        
        for assigned_course, assigned_value in assignment.items():
            if assigned_course == variable:
                continue
                
            assigned_instructor, assigned_time, assigned_room = assigned_value
            
            # ข้อจำกัดที่ 1: อาจารย์คนเดียวไม่สามารถสอนหลายวิชาในเวลาเดียวกันได้
            if instructor == assigned_instructor and time == assigned_time:
                return False
            
            # ข้อจำกัดที่ 2: ห้องเดียวไม่สามารถใช้หลายวิชาในเวลาเดียวกันได้
            if room == assigned_room and time == assigned_time:
                return False
        
        return True
    
    def get_schedule_summary(self, assignment: Dict[str, Any]) -> Dict[str, List[str]]:
        """
        สร้างสรุปตารางเรียนตามช่วงเวลา
        
        Args:
            assignment: การจัดตารางที่สมบูรณ์
            
        Returns:
            Dictionary ที่จัดกลุ่มตารางตามช่วงเวลา
        """
        schedule = {time: [] for time in self.time_slots}
        
        for course, (instructor, time, room) in assignment.items():
            schedule[time].append(f"{course} ({instructor}) - {room}")
        
        return schedule

In [7]:
class AdvancedCSPSolver:
    """
    ระบบจัดการปัญหา CSP ขั้นสูงที่รองรับการวิเคราะห์และเปรียบเทียบผลลัพธ์
    """
    
    def __init__(self):
        self.problems = {}
        self.solutions = {}
        self.performance_metrics = {}
    
    def register_problem(self, name: str, problem: CSPProblem, description: str = ""):
        """
        ลงทะเบียนปัญหา CSP พร้อมคำอธิบาย
        
        Args:
            name: ชื่อปัญหา
            problem: instance ของปัญหา CSP
            description: คำอธิบายปัญหา
        """
        self.problems[name] = {
            'problem': problem,
            'description': description,
            'variables': len(problem.variables),
            'total_domain_size': sum(len(domain) for domain in problem.domains.values())
        }
    
    def solve_with_comparison(self, name: str, algorithms: List[str] = None, 
                            verbose: bool = False) -> Dict[str, Any]:
        """
        แก้ปัญหาด้วยอัลกอริทึมหลายตัวและเปรียบเทียบผลลัพธ์
        
        Args:
            name: ชื่อปัญหา
            algorithms: รายการอัลกอริทึมที่ต้องการทดสอบ
            verbose: เปิดใช้งานการแสดงผลรายละเอียด
            
        Returns:
            ผลลัพธ์การเปรียบเทียบอัลกอริทึม
        """
        if name not in self.problems:
            return {"error": f"ไม่พบปัญหา {name}"}
        
        if algorithms is None:
            algorithms = ["backtracking", "dfs", "best_first"]
        
        problem = self.problems[name]['problem']
        results = {}
        
        for algorithm in algorithms:
            if verbose:
                print(f"กำลังทดสอบอัลกอริทึม {algorithm} สำหรับ {name}...")
            
            start_time = time.time()
            
            try:
                if algorithm == "backtracking":
                    solution = backtracking_search(problem, verbose=False)
                else:
                    solution = csp_search(problem, algorithm, verbose=False)
                
                solve_time = time.time() - start_time
                
                results[algorithm] = {
                    'success': solution is not None,
                    'solution': solution,
                    'time': solve_time,
                    'variables_assigned': len(solution) if solution else 0
                }
                
            except Exception as e:
                results[algorithm] = {
                    'success': False,
                    'error': str(e),
                    'time': time.time() - start_time
                }
        
        # เก็บผลลัพธ์การเปรียบเทียบ
        self.performance_metrics[name] = results
        
        # เลือกคำตอบที่ดีที่สุด
        best_solution = None
        best_time = float('inf')
        
        for algorithm, result in results.items():
            if result['success'] and result['time'] < best_time:
                best_solution = result['solution']
                best_time = result['time']
        
        if best_solution:
            self.solutions[name] = best_solution
        
        return {
            'problem_info': self.problems[name],
            'algorithm_results': results,
            'best_solution': best_solution,
            'best_time': best_time if best_solution else None
        }
    
    def generate_report(self, name: str) -> str:
        """
        สร้างรายงานสรุปผลลัพธ์การแก้ปัญหา
        
        Args:
            name: ชื่อปัญหา
            
        Returns:
            รายงานในรูปแบบข้อความ
        """
        if name not in self.problems:
            return f"ไม่พบปัญหา {name}"
        
        problem_info = self.problems[name]
        performance = self.performance_metrics.get(name, {})
        
        report = f"""
=== รายงานการแก้ปัญหา {name} ===

ข้อมูลปัญหา:
- คำอธิบาย: {problem_info['description']}
- จำนวนตัวแปร: {problem_info['variables']}
- ขนาด Domain รวม: {problem_info['total_domain_size']}

ผลลัพธ์การเปรียบเทียบอัลกอริทึม:
"""
        
        for algorithm, result in performance.items():
            if result['success']:
                report += f"- {algorithm}: สำเร็จ (เวลา: {result['time']:.4f} วินาที)\n"
            else:
                report += f"- {algorithm}: ล้มเหลว (เวลา: {result['time']:.4f} วินาที)\n"
        
        if name in self.solutions:
            report += f"\nคำตอบที่ได้: {len(self.solutions[name])} ตัวแปรได้รับการกำหนดค่า\n"
        
        return report
    
    def export_solution(self, name: str, format: str = "dict") -> Any:
        """
        ส่งออกคำตอบในรูปแบบที่กำหนด
        
        Args:
            name: ชื่อปัญหา
            format: รูปแบบการส่งออก ("dict", "json", "csv")
            
        Returns:
            คำตอบในรูปแบบที่ต้องการ
        """
        if name not in self.solutions:
            return None
        
        solution = self.solutions[name]
        
        if format == "dict":
            return solution
        elif format == "json":
            import json
            return json.dumps(solution, ensure_ascii=False, indent=2)
        elif format == "csv":
            import csv
            import io
            output = io.StringIO()
            writer = csv.writer(output)
            writer.writerow(['Variable', 'Value'])
            for var, value in solution.items():
                writer.writerow([var, value])
            return output.getvalue()
        else:
            return solution


In [8]:
# การใช้งานระบบการจัดการ CSP ขั้นสูง
if __name__ == "__main__":
    print("=== ระบบจัดการปัญหา CSP ขั้นสูง ===")
    
    solver = AdvancedCSPSolver()
    
    # ลงทะเบียนปัญหาต่างๆ
    solver.register_problem(
        "university_schedule",
        UniversitySchedulingProblem(
            ['คณิตศาสตร์ 1', 'ฟิสิกส์ 1', 'เคมี 1'],
            {
                'คณิตศาสตร์ 1': ['อ.สมชาย'],
                'ฟิสิกส์ 1': ['อ.วิรัช'],
                'เคมี 1': ['อ.ปิยะ']
            },
            ['ห้อง101', 'ห้อง102'],
            ['08:00-10:00', '10:00-12:00'],
            {'ห้อง101': 50, 'ห้อง102': 40},
            {'คณิตศาสตร์ 1': 45, 'ฟิสิกส์ 1': 35, 'เคมี 1': 30},
            {
                'อ.สมชาย': ['08:00-10:00', '10:00-12:00'],
                'อ.วิรัช': ['08:00-10:00', '10:00-12:00'],
                'อ.ปิยะ': ['08:00-10:00', '10:00-12:00']
            },
            {'คณิตศาสตร์ 1': ['ห้อง101', 'ห้อง102'], 'ฟิสิกส์ 1': ['ห้อง101', 'ห้อง102'], 'เคมี 1': ['ห้อง101', 'ห้อง102']}
        ),
        "การจัดตารางเรียนมหาวิทยาลัยแบบง่าย"
    )
    
    solver.register_problem(
        "job_shop",
        JobShopSchedulingProblem(
            ['งาน1', 'งาน2'],
            ['เครื่องA', 'เครื่องB'],
            {
                'งาน1': [('เครื่องA', 0), ('เครื่องB', 1)],
                'งาน2': [('เครื่องB', 0), ('เครื่องA', 1)]
            },
            {
                ('งาน1', 'เครื่องA'): 3,
                ('งาน1', 'เครื่องB'): 2,
                ('งาน2', 'เครื่องA'): 2,
                ('งาน2', 'เครื่องB'): 3
            },
            {'เครื่องA': [(0, 10)], 'เครื่องB': [(0, 10)]}
        ),
        "การจัดตารางงานในโรงงานแบบง่าย"
    )
    
    # ทดสอบและเปรียบเทียบอัลกอริทึม
    for problem_name in ['university_schedule', 'job_shop']:
        print(f"\n--- การทดสอบปัญหา {problem_name} ---")
        
        results = solver.solve_with_comparison(problem_name, verbose=True)
        
        if results.get('best_solution'):
            print(f"พบคำตอบที่ดีที่สุดใน {results['best_time']:.4f} วินาที")
            
            # สร้างและแสดงรายงาน
            report = solver.generate_report(problem_name)
            print(report)
            
            # ส่งออกคำตอบในรูปแบบ JSON
            json_solution = solver.export_solution(problem_name, "json")
            print(f"คำตอบในรูปแบบ JSON:\n{json_solution}")
            
        else:
            print("ไม่พบคำตอบสำหรับปัญหานี้")

=== ระบบจัดการปัญหา CSP ขั้นสูง ===

--- การทดสอบปัญหา university_schedule ---
กำลังทดสอบอัลกอริทึม backtracking สำหรับ university_schedule...
กำลังทดสอบอัลกอริทึม dfs สำหรับ university_schedule...
กำลังทดสอบอัลกอริทึม best_first สำหรับ university_schedule...
พบคำตอบที่ดีที่สุดใน 0.0000 วินาที

=== รายงานการแก้ปัญหา university_schedule ===

ข้อมูลปัญหา:
- คำอธิบาย: การจัดตารางเรียนมหาวิทยาลัยแบบง่าย
- จำนวนตัวแปร: 3
- ขนาด Domain รวม: 10

ผลลัพธ์การเปรียบเทียบอัลกอริทึม:
- backtracking: สำเร็จ (เวลา: 0.0000 วินาที)
- dfs: สำเร็จ (เวลา: 0.0000 วินาที)
- best_first: สำเร็จ (เวลา: 0.0000 วินาที)

คำตอบที่ได้: 3 ตัวแปรได้รับการกำหนดค่า

คำตอบในรูปแบบ JSON:
{
  "คณิตศาสตร์ 1": [
    "อ.สมชาย",
    "08:00-10:00",
    "ห้อง101"
  ],
  "ฟิสิกส์ 1": [
    "อ.วิรัช",
    "08:00-10:00",
    "ห้อง102"
  ],
  "เคมี 1": [
    "อ.ปิยะ",
    "10:00-12:00",
    "ห้อง101"
  ]
}

--- การทดสอบปัญหา job_shop ---
กำลังทดสอบอัลกอริทึม backtracking สำหรับ job_shop...
กำลังทดสอบอัลกอริทึม dfs สำหรับ job_shop..