In [1]:
from typing import List, Dict, Any
import re

In [2]:
flex = 3 

In [3]:
class OperationMode:
    """
    代表一個操作在單一機器上的執行方式 (一個模式)。
    它是處理時間和機器ID的來源，支持機器的不同處理時間。
    """

    def __init__(self, machine_id: int, processing_time: int):
        self.machine_id: int = machine_id  # 機器 ID
        self.processing_time: int = processing_time  # 該模式下的處理時間

    def __str__(self):
        return f"Mode(m={self.machine_id}, t={self.processing_time})"


class Operation:
    """代表一個工作中的單一加工步驟。
    包含該操作的所有可行模式清單，這是彈性的核心。"""

    def __init__(self, op_id: int, job_id: int, pos_in_job: int, modes: list[OperationMode]):
        self.op_id: int = op_id  # 全局唯一 ID
        self.job_id: int = job_id  # 所屬工作 ID
        self.pos_in_job: int = pos_in_job  # 在工作中的順序位置 (1-based)
        self.modes: list[OperationMode] = modes  # 可行模式清單 (List[OperationMode])

    def __str__(self):
        modes_str = ", ".join(str(m) for m in self.modes)
        return (
            f"Operation(op_id={self.op_id}, job_id={self.job_id}, pos_in_job={self.pos_in_job}, "
            f"modes=[{modes_str}])"
        )


class Job:
    """代表一個工作，包含其所有操作的有序序列。"""

    def __init__(self, job_id: int, operations: list[Operation]):
        self.job_id: int = job_id  # 工作 ID
        self.operations: list[Operation] = operations  # 有序的操作清單

    def __str__(self):
        # Only print crucial information: op_id, pos_in_job, and modes count
        ops_str = ", ".join("(" + ", ".join(str(mode) for mode in op.modes) + ")" for op in self.operations)
        return f"Job(j_id={self.job_id}, op_cnt == {len(self.operations)}, ops == [{ops_str}])"

In [4]:
def parse_tai_to_abstract_fjs(file_content: str) -> Dict[str, Any]:
    """
    解析 Tai's JSSP (或 FJSSP) 格式內容，轉換為抽象的 FJSSP Job 結構。

    每一行代表一個工作：
    - 第一行：job_cnt 與 machine_cnt
    - 每個 job（其餘每一行）:
        數字為 pair: (machine_id, processing_time) 依序排列，代表每個 operation 可行的 (machine, time)。

    Args:
        file_content: FJSSP 檔案的完整文字內容。

    Returns:
        一個字典，包含 'jobs' (List[Job]) 和原始參數 machine_cnt, job_cnt。
    """

    # 分行
    lines = [line.strip() for line in file_content.strip().split("\n") if line.strip()]
    if len(lines) < 1:
        raise ValueError("內容缺少參數行")

    # 第一行：取出 job_cnt 與 machine_cnt
    top_tokens = lines[0].split()
    if len(top_tokens) < 2:
        raise ValueError("首行缺少 job_cnt 與 machine_cnt 參數")
    job_cnt = int(top_tokens[0])
    machine_cnt = int(top_tokens[1])

    # 從第二行開始，逐行處理每個工作
    all_jobs_data = []
    current_op_id = 0

    for job_idx, line in enumerate(lines[1:]):
        tokens = line.strip().split()
        if len(tokens) < 1:
            continue
        job_operations = []

        for i in range(0, len(tokens), 2):
            pos = i // 2 + 1
            machine_id = int(tokens[i])
            processing_time = int(tokens[i + 1])

            mode = OperationMode(machine_id=machine_id, processing_time=processing_time)

            operation = Operation(op_id=current_op_id, job_id=job_idx, pos_in_job=pos, modes=[mode])
            job_operations.append(operation)
            current_op_id += 1
        
        all_jobs_data.append(Job(job_id=job_idx, operations=job_operations))

    # Compute the union of all jobs' operations
    all_operations = []
    for job in all_jobs_data:
        all_operations.extend(job.operations)

    return {
        "jobs": all_jobs_data,
        "job_cnt": job_cnt,
        "machine_cnt": machine_cnt,
        "operations": all_operations,  # union of all job's operations
    }

In [None]:
def add_parallel_machines_to_ops(data: Dict[str, Any], flex: int) -> List[Job]:
    """
    For each machine_id, collect its operations, create (flex-1) new machines,
    and for each operation, add the new machines (with the same processing time) to its modes list.

    Args:
        data: Dictionary containing 'jobs', 'machine_cnt', etc.
        flex: The degree of parallelism (number of parallel machines per original machine).
    Returns:
        List of jobs with modified operations
    """
    
    jobs = data["jobs"]
    M_orig = data["machine_cnt"]
    next_machine_id = M_orig  # Start assigning new IDs after the originals

    for machine_id in range(M_orig):
        new_machine_ids = [next_machine_id + i for i in range(flex - 1)]
        next_machine_id += flex - 1
        # For each op whose first mode's machine_id equals machine_id, append new modes for the new machines
        for op in [op for op in data["operations"] if op.modes[0].machine_id == machine_id]:
            orig_time = op.modes[0].processing_time
            # Append new modes for each new machine (flex-1 replicas)
            for new_machine_id in new_machine_ids:
                new_mode = OperationMode(machine_id=new_machine_id, processing_time=orig_time)
                op.modes.append(new_mode)

    return jobs

In [6]:
with open("tai_j10_m10_1.data", "r") as f:
    content = f.read()
    data = parse_tai_to_abstract_fjs(content)

In [7]:
# 執行彈性模式添加
fjs_data_parallel = add_parallel_machines_to_ops(data, flex)