In [1]:
import sys

sys.path.append("../../")

from app.common.logger import logger
from app.config.utils import Configuration, init_config
from app.common.llm_clients.openai_client import OpenAILLMClient

from ml.data_processing.prompt import STEP_1_2_SYNTHETIC_TEXTBOOK_PROMPT

In [2]:
config = Configuration()
init_config(config)
config = config()

# Data Load & Grouping

In [3]:
import json
import re

In [None]:
with open("../data/amr_guide_preprocessed.json", "r", encoding="utf-8") as f:
    arm_guide_preprocessed = json.load(f)

In [16]:
def group_by_h1(data_list):
    grouped_data = {}

    for item in data_list:
        original_text = item.get("original", "")

        # 정규표현식으로 첫 줄의 대제목(# ...) 추출
        # 예: "# 1. Daily Operating\n## 1.1 ..." -> "# 1. Daily Operating"
        match = re.search(r'^# .+', original_text)

        if match:
            h1_title = match.group(0).strip()
        else:
            # 대제목이 없는 경우 (예외 처리)
            h1_title = "Others"

        if h1_title not in grouped_data:
            grouped_data[h1_title] = []

        grouped_data[h1_title].append(item)

    return grouped_data

def group_by_h2(data_list):
    grouped_data = {}

    for item in data_list:
        original_text = item.get("original", "")

        # 정규표현식으로 두 번째 줄의 중제목(## ...) 추출
        match = re.search(r'^## .+', original_text)

        if match:
            h2_title = match.group(0).strip()
        else:
            # 두 번째 줄의 중제목이 없는 경우 (예외 처리)
            h2_title = "Others"

        if h2_title not in grouped_data:
            grouped_data[h2_title] = []

        grouped_data[h2_title].append(item)

    return grouped_data

In [6]:
grouped_amr_guide = group_by_h1(arm_guide_preprocessed)

In [7]:
grouped_amr_guide.keys()

dict_keys(['# 1. Daily Operating', '# 2. Trouble Shooting: Call Assignment', '# 3. Trouble Shooting: Navigation & Obstacle', '# 4. Trouble Shooting: Docking & H/W', '# 5. Trouble Shooting: Model Specific', '# 6. Operational Manual: Control Tools (조작 및 유지보수 매뉴얼)', '# 7. Appendix'])

In [4]:
# API 전처리 문서 불러오기
with open("../data/step1/HACS_preprocessed.json", "r", encoding="utf-8") as f:
    hacs_preprocessed = json.load(f)

with open("../data/step1/MCS_preprocessed.json", "r", encoding="utf-8") as f:
    mcs_preprocessed = json.load(f)

# 에러 처리 시나리오 전처리 문서 불러오기
with open("../data/step1/error_scenario_preprocessed.json", "r", encoding="utf-8") as f:
    error_scenario_preprocessed = json.load(f)

# 합성 데이터 생성

In [5]:
model = "google/gemini-3-pro-preview"
provider = "openrouter"
api_key = config["openrouter"]["api_key"]
params = {
    "max_tokens": 32768,
    "temperature": 0.0
}
llm_client = OpenAILLMClient(model=model, provider=provider, api_key=api_key, params=params)

[32m2026-01-12 10:55:01.785[0m | [1mINFO    [0m | [36mapp.common.llm_clients.openai_client[0m:[36m_create_clients[0m:[36m57[0m - [1mCreating OpenRouter client.[0m


## 그룹 별 텍스트북 생성

In [None]:
resuls = []
for k, v in grouped_amr_guide.items():
    context = [x['preprocessed'] for x in v]
    context = f"{k}\n" + "\n".join(context)
    user_prompt = f"""[입력 텍스트]
    {context}"""
    response = llm_client.generate(system_prompt=STEP_1_2_SYNTHETIC_TEXTBOOK_PROMPT, chat_messages=[{"role": "user", "content": user_prompt}])
    result = response.choices[0].message.content
    logger.info(result)

    json_result = {
        'id': k,
        'preprocessed_group': context,
        'textbook': result
    }
    resuls.append(json_result)

[32m2026-01-02 17:42:55.515[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m9[0m - [1m# BMA 자동화 시스템 기술 백서: 운영 로직 및 전력 관리 아키텍처
## (Technical Whitepaper: Operational Logic and Power Management Architecture in BMA Systems)

**작성자:** 수석 아키텍트 (Chief Architect)
**문서 등급:** Confidential (Internal Use Only)
**적용 대상:** 현대자동차그룹 BMA 공정 0.3T/1T AMR 및 WEB ACS 운영 엔지니어

---

### 1. 서론 (Introduction)

본 챕터에서는 BMA(Battery Module Assembly) 공정의 핵심 물류 자원인 AMR(Autonomous Mobile Robot)과 이를 관제하는 상위 시스템 WEB ACS(Activity Control System) 간의 **논리적 상호작용(Logical Interaction)** 및 **전력 관리 메커니즘(Power Management Mechanism)**을 다룬다.

단순한 조작 절차를 넘어, 시스템이 미션(Mission)을 생성하고 할당하는 알고리즘적 배경과, 리튬이온 배터리 기반의 모빌리티 하드웨어가 전기적 안정성을 유지하기 위해 요구되는 물리적 제약 사항을 공학적 관점에서 심층 분석한다.

---

### 2. 미션 디스패칭 아키텍처와 Place Cycle 제어 논리
#### (Mission Dispatching Architecture & Place Cycle Control Logic)

WEB ACS는 공정 내 수백 개의 노드(Node)와 링크(Link)를 관리하며, 각 위치(Place)의 상태에 따라 AMR에게 작업을 할당한다. 이때 **'Place Cycle'** 설정은 시스템이 물리적 공간을 논리적 가용 자원으

In [None]:
import json

with open("amr_guide_textbook.json", "w", encoding="utf-8") as f:
    json.dump(resuls, f, ensure_ascii=False, indent=4)

In [5]:
with open("../data/amr_guide_textbook.json", "r", encoding="utf-8") as f:
    resuls = json.load(f)

## 단락 별 텍스트북 생성

In [None]:
from ml.data_processing.prompt import SYNTHETIC_SPECIFIC_PROMPT

In [9]:
for i, x in enumerate(arm_guide_preprocessed):
    context = x['preprocessed']
    user_prompt = f"""챕터 번호는 없어도 돼.

[입력 텍스트]
{context}"""
    logger.info(user_prompt)
    response = llm_client.generate(system_prompt=STEP_1_2_SYNTHETIC_TEXTBOOK_PROMPT, chat_messages=[{"role": "user", "content": user_prompt}])
    result = response.choices[0].message.content
    logger.info(result)
    json_result = {
        'id': i,
        'preprocessed_group': context,
        'textbook': result
    }
    resuls.append(json_result)
    logger.info(f"{len(resuls)} 텍스트북 생성 완료")

[32m2026-01-05 14:49:46.809[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m7[0m - [1m챕터 번호는 없어도 돼.

[입력 텍스트]
WEB ACS의 Place Cycle 설정 시, AMR이 미션을 수신하고 동작하기 위해서는 Logistics 스위치와 Run 상태가 모두 ON으로 활성화되어야 한다.

WEB ACS의 Place Cycle 설정 절차는 오른쪽 톱니바퀴 아이콘 클릭 후 Place Cycle 메뉴를 선택하고, 검색창에서 제어할 Place 이름을 검색하여 Logistics 스위치와 Run 상태를 각각 ON과 RUNNING으로 전환하는 순서로 진행한다.

Place Cycle의 Logistics 기능은 시스템이 미션을 생성하고 AMR이 이를 수신하여 수행하도록 활성화하는 장치이며, 생산 가동 중에는 반드시 ON, 비가동 또는 종업 시에는 OFF 상태를 유지해야 한다.

Place Cycle의 RUN 상태는 가동 중인 생산 라인에서 필수적으로 활성화해야 하는 항목이며, 대부분의 운영 상황에서 상시 ON 상태를 유지한다.

생산 시작 전에는 Place Cycle의 Logistics와 Run 버튼을 ON으로 활성화하고, 생산 완료 후에는 해당 버튼들을 OFF로 변경하는 것을 운영 규칙으로 한다.[0m
[32m2026-01-05 14:50:21.377[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m10[0m - [1m# WEB ACS 기반의 Place Cycle 제어 논리와 미션 디스패칭(Mission Dispatching) 메커니즘

## 1. 서론: BMA 공정 내 노드(Node) 관리의 중요성

현대자동차그룹 BMA(Battery Module Assembly) 공정의 자동화 시스템에서 **WEB ACS(Automated Control System)**는 수십 대의 AMR(Auto

In [10]:
with open("amr_guide_textbook_merged.json", "w", encoding="utf-8") as f:
    json.dump(resuls, f, ensure_ascii=False, indent=4)

In [14]:
for i, x in enumerate(acs_alarm_preprocessed):
    context = x['preprocessed']
    user_prompt = f"""챕터 번호는 없어도 돼.

[입력 텍스트]
{context}"""
    logger.info(user_prompt)
    response = llm_client.generate(system_prompt=STEP_1_2_SYNTHETIC_TEXTBOOK_PROMPT, chat_messages=[{"role": "user", "content": user_prompt}])
    result = response.choices[0].message.content
    logger.info(result)

    json_result = {
        'id': i,
        'preprocessed_group': context,
        'textbook': result
    }
    result2.append(json_result)
    logger.info(f"{len(result2)} 텍스트북 생성 완료")

[32m2026-01-05 15:26:36.122[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m7[0m - [1m챕터 번호는 없어도 돼.

[입력 텍스트]
BMA 공정 AMR의 [10000] RFID 감지 안됨(RFID not detected) 경고는 주행 경로상의 RFID 태그 인식이 불가능한 상태를 의미한다. **시스템 자동 복구가 실패하여 수동 조작이 필요한 경우,** 운영자는 AMR의 정지 위치를 확인하고 RFID 리더기와 태그의 정렬 상태를 점검해야 한다.

BMA 공정 AMR의 [10001] 리프트 미션 실패(Lift mission fail) 경고는 팔레트 상하강 동작이 지정된 시간 내에 완료되지 않았음을 나타낸다. **시스템 자동 복구가 실패하여 수동 조작이 필요한 경우,** 운영자는 리프트 구동부의 기계적 간섭 여부를 확인한 후 HMI를 통해 리프트 위치를 초기화해야 한다.

BMA 공정 AMR의 [10002] 턴테이블 미션 실패(Turntable mission fail) 경고는 상단 턴테이블의 회전 동작이 정상 범위 내에서 종료되지 않았을 때 발생한다. **시스템 자동 복구가 실패하여 수동 조작이 필요한 경우,** 운영자는 턴테이블의 회전 반경 내 이물질을 제거하고 수동 모드에서 원점 복귀를 수행해야 한다.

BMA 공정 AMR의 [10003] 도킹 중 장애물 감지(Obstacle detected during docking) 경고는 설비 진입 및 도킹 과정에서 안전 센서가 물체를 감지하여 주행이 중단된 상태를 의미한다. **시스템 자동 복구가 실패하여 수동 조작이 필요한 경우,** 운영자는 도킹 경로상의 장애물을 제거한 후 주행 재개 버튼을 조작해야 한다.

BMA 공정 AMR의 [10004] 거치대 도킹 실패(Wing docking fail) 경고는 AMR이 윙(Wing) 타입 거치대와의 물리적 결합 위치 정밀도 확보에 실패했음을 나타낸다. **시스템 자동 복구가 

In [15]:
with open("acs_alarm_textbook_merged.json", "w", encoding="utf-8") as f:
    json.dump(result2, f, ensure_ascii=False, indent=4)

In [7]:
hacs_results = []
for i, x in enumerate(hacs_preprocessed):
    context = x['preprocessed']
    user_prompt = f"""[입력]
{context}"""
    logger.info(user_prompt)
    response = llm_client.generate(system_prompt=SYNTHETIC_SPECIFIC_PROMPT, chat_messages=[{"role": "user", "content": user_prompt}])
    result = response.choices[0].message.content
    logger.info(result)
    json_result = {
        'id': i,
        'preprocessed_group': context,
        'textbook': result
    }
    hacs_results.append(json_result)

with open("hacs_textbook_merged2.json", "w", encoding="utf-8") as f:
    json.dump(hacs_results, f, ensure_ascii=False, indent=4)

[32m2026-01-09 16:52:51.755[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m6[0m - [1m[입력]
# Hyundai Automated Control System WebBackend API 문서
## 1. User Controller. 사용자 인증 및 관리 API
### 1.1 사용자 등록
- **Endpoint**: `PUT /api/user/register`
- **설명**: 새로운 사용자 등록
- **Request Body**:
```json
{
  "userInfo": {
    "UserId": "string",
    "UserName": "string",
    "Password": "string",
    "Email": "string"
  }
}
```
- **Response**:
```json
{
  "Success": true/false
}
```
- **Status Code**: 200 OK, 500 Internal Server Error

[0m
[32m2026-01-09 16:53:13.239[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m9[0m - [1m## [API] 사용자 등록 (User Registration)

**1. 개요 (Overview)**
`PUT /api/user/register` API는 현대 자동화 제어 시스템(HACS) WebBackend에 접근할 수 있는 신규 사용자 계정을 생성한다. 이 API는 시스템 관리자나 운영자가 BMA(Battery Manufacturing Automation) 공정 모니터링 및 제어 권한을 획득하기 위한 최초의 자격 증명(Credentials)을 등록하는 데 사용된다.

**2. 요청 명세 (Request Specification)**
- **Endpoint:** `/api/user/register`
-

In [8]:
mcs_results = []
for i, x in enumerate(mcs_preprocessed):
    context = x['preprocessed']
    user_prompt = f"""[입력]
{context}"""
    logger.info(user_prompt)
    response = llm_client.generate(system_prompt=SYNTHETIC_SPECIFIC_PROMPT, chat_messages=[{"role": "user", "content": user_prompt}])
    result = response.choices[0].message.content
    logger.info(result)
    json_result = {
        'id': i,
        'preprocessed_group': context,
        'textbook': result
    }
    mcs_results.append(json_result)

with open("mcs_textbook_merged2.json", "w", encoding="utf-8") as f:
    json.dump(mcs_results, f, ensure_ascii=False, indent=4)

[32m2026-01-09 17:26:13.188[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m6[0m - [1m[입력]
# MCS.Backend API 문서. MCS(Material Control System) Backend API 문서입니다. 이 API는 AMR(Autonomous Mobile Robot) 시스템을 제어하고 모니터링하기 위한 RESTful API를 제공합니다.
## PLCController. PLC(Programmable Logic Controller) 센서 값을 관리하는 API입니다.
### 1. PUT /api/PLC/sensorvalue

센서 값을 업데이트합니다.

**Endpoint:** `PUT /api/PLC/sensorvalue`

**Request Body:**
```json
{
  "SensorId": 1001,
  "Current": "25.5"
}
```

**Response:**
```json
{
  "Success": true,
  "Error": null
}
```

**인증:** JWT Token 필요

**설명:**
- 사용자 정보는 JWT Token에서 추출됩니다.
- 센서 값을 PLC에 기록하고 결과를 반환합니다.

---

[0m
[32m2026-01-09 17:26:33.689[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m9[0m - [1m## [API] PLC 센서 값 갱신 (Update PLC Sensor Value)

**1. 개요 (Overview)**
`PUT /api/PLC/sensorvalue` API는 MCS(Material Control System)가 관리하는 특정 PLC(Programmable Logic Controller) 센서의 현재 상태 값을 강제로 갱신하거나 보정하는 기능을 수행한다. 이 API는 물리적 센서 데이터의 소

### 에러 시나리오는 바로 conversation 포맷으로 변경

In [5]:
model = "google/gemini-3-pro-preview"
provider = "openrouter"
api_key = config["openrouter"]["api_key"]
params = {
    "max_tokens": 32768,
    "temperature": 0.7
}
llm_client = OpenAILLMClient(model=model, provider=provider, api_key=api_key, params=params)

[32m2026-01-12 11:13:31.627[0m | [1mINFO    [0m | [36mapp.common.llm_clients.openai_client[0m:[36m_create_clients[0m:[36m57[0m - [1mCreating OpenRouter client.[0m


In [6]:
# ==========================================
# [Track 3] 행동 & 절차 데이터 증강용 입력 데이터
# ==========================================

# 1. 공통 로직 (Base Logic) - Scenario 1
base_logics = {
    "Scenario_1": """
    1. 상태 확인: GET /api/object/status (에러 및 미션 확인)
    2. 작업 취소: PUT /api/command/cancel (기존 작업 중단)
    3. 물리적 후진(Undock): PUT /api/command/move (Target: Undock_Point)
    4. 미션 재할당: GET /api/Object/Mission -> PUT /api/Command/retrycommand
    5. 모드 복구: PUT /api/command/mode (Auto)
    """
}

# 2. 갈아 끼울 부품 (Cases) - Scenario 1 그룹
cases = [
    # Case 1: 도킹 마커 미인식
    {
        "code": "80018",
        "name": "Dock Not Find",
        "desc": "로봇이 도킹 스테이션 앞에서 마커를 인식하지 못해 진입 직전 멈춰 서 있는 상태.",
        "reasoning": "현재 위치에서는 센서가 마커를 놓쳤기 때문에 제자리 재시도는 무의미함. 반드시 Undock을 통해 로봇을 뒤로 물려 시야(FOV)를 확보하고 마커를 다시 스캔해야 함.",
        "group": "Scenario_1"
    },

    # Case 2: 진입 각도 초과
    {
        "code": "80015",
        "name": "Heading Exceeded",
        "desc": "도킹 시도 중 AMR의 진입 각도(Heading)가 허용 오차 범위를 초과하여 진입이 차단된 상태.",
        "reasoning": "이미 진입 각도가 틀어진 상태(Skew)이므로 단순 재시도는 실패함. Undock으로 도킹 구역을 완전히 빠져나와 차체 정렬을 초기화한 후 똑바로 재진입해야 함.",
        "group": "Scenario_1"
    },

    # Case 3: IR 센서 감지 실패
    {
        "code": "10015",
        "name": "IR Detection Fail",
        "desc": "AMR 하부의 IR 센서가 설비 스토퍼(Dog Plate)를 감지하지 못해 도킹 위치를 확정하지 못한 상태.",
        "reasoning": "설비 도그플레이트 파손이나 위치 틀어짐이 원인일 수 있음. Undock으로 빠져나와 센서 감지 범위를 리셋하고, 재진입 시 IR 센서가 정상적으로 신호를 잡는지 다시 확인해야 함.",
        "group": "Scenario_1"
    }
]

# ==========================================
# [Scenario 2] 도킹/주행 중 멈춤 (User 개입 필수)
# ==========================================

# 1. 공통 로직 (Base Logic) - Scenario 2
# 특징: 중간에 API 호출이 아닌 '작업자의 물리적 조치'가 반드시 포함됨
base_logics["Scenario_2"] = """
    1. 상태 확인: GET /api/object/status (에러 코드 및 상세 원인 파악)
    2. 작업 취소: PUT /api/command/cancel (ACS Job 강제 종료)
    3. [현장 조치]: 에러 유형에 따라 '수동 이동(Jog)', '장애물 제거', '물리적 리셋', '위치 초기화(Init)' 등 특정 조치 수행 (작업자 개입 필수)
    4. 미션 재할당: GET /api/Object/Mission (기존 미션 조회) -> PUT /api/Command/retrycommand
    5. 모드 복구: PUT /api/command/mode (Auto)
"""

# 2. 갈아 끼울 부품 (Cases) - Scenario 2 그룹
cases_scenario_2 = [
    # Group A: 위치 및 경로 이탈 (수동 이동/보정 필요)
    {
        "code": "80023",
        "name": "Pos Exceed",
        "desc": "도킹 프로세스 중 로봇의 위치가 허용 범위를 벗어나거나 하드웨어 이슈로 멈춘 상태.",
        "reasoning": "로봇이 스스로 경로에 복귀할 수 없음. 작업 취소(Cancel) 후, 충돌 여부를 확인하고 수동 조작(Jog)으로 안전한 위치까지 이동시킨 뒤 재시도해야 함.",
        "group": "Scenario_2"
    },
    {
        "code": "80024",
        "name": "Timeout",
        "desc": "도킹 또는 주행 중 정해진 시간 내에 목표에 도달하지 못해 타임아웃이 발생함.",
        "reasoning": "물리적인 간섭이나 경로상 문제일 가능성이 높음. Cancel 후 수동으로 로봇을 안전 구역으로 빼내고 재시도 명령을 내려야 함.",
        "group": "Scenario_2"
    },
    {
        "code": "11",
        "name": "ACS_ERROR_WHILE_NAVIGATING",
        "desc": "AMR이 지정된 경로(Path)를 벗어나 '주행 불가 영역(흰색 배경)'에 위치함.",
        "reasoning": "ACS 상에서 로봇이 길을 잃은 상태. JOG 모드를 활성화하여 작업자가 직접 로봇을 정상 경로(색상이 있는 라인) 위로 끌어다 놓아야만 다시 명령을 수행할 수 있음.",
        "group": "Scenario_2"
    },
    {
        "code": "80003",
        "name": "ERROR_CONFIDENCE_LOW",
        "desc": "AMR이 현재 자신의 위치를 잃어버림(Delocalization). 맵과 매칭되는 지형지물을 찾지 못함.",
        "reasoning": "단순 이동으로는 해결되지 않음. 주변 장애물을 치워 시야를 확보하거나, ACS 상에서 'Init Position' 기능을 사용해 수동으로 로봇의 현재 좌표와 헤딩을 지정해줘야 함.",
        "group": "Scenario_2"
    },

    # Group B: 하드웨어 센서 및 장애물 이슈 (제거/리셋 필요)
    {
        "code": "80014",
        "name": "Encoder Error",
        "desc": "주행 중 모터 회전수를 감지하는 엔코더 데이터가 업데이트되지 않음.",
        "reasoning": "일시적인 통신 오류일 수 있으나 하드웨어 행(Hang) 상태일 가능성이 큼. Job Cancel 후 기체 측면의 물리적 'RESET 버튼'을 눌러 하드웨어를 재부팅해야 함.",
        "group": "Scenario_2"
    },
    {
        "code": "20014",
        "name": "HOKUYO Lidar Obstacle",
        "desc": "라이다 센서가 반사판(작업조끼, 안전콘 등)을 장애물로 오인하여 정지함.",
        "reasoning": "소프트웨어적인 해제가 불가능함. 반사체를 2m 이상 이격시키고, 렌즈를 닦은 후 물리적 'RESET 버튼'을 눌러 OSSD 락을 풀어야 함.",
        "group": "Scenario_2"
    },
    {
        "code": "30020",
        "name": "OSSD LIDAR ERROR",
        "desc": "안전 비상 정지(OSSD) 신호가 발생하여 이동이 차단됨. 주로 센서 오염이나 근접 장애물 때문임.",
        "reasoning": "센서 표면의 이물질을 제거하거나 장애물을 치워야 함. 물리적 원인이 제거되지 않으면 시스템 리셋을 해도 다시 에러가 발생함.",
        "group": "Scenario_2"
    },
    {
        "code": "80025",
        "name": "PAUSED_BY_LIDAR_OBSTACLE",
        "desc": "라이다가 로봇의 '충돌 감지 영역(Safety Zone)' 내에서 장애물을 감지함.",
        "reasoning": "도킹 구역 내 이물질(실, 조각)이나 빛 반사가 원인임. 로봇을 Undock 하거나 장애물을 물리적으로 제거한 후 재시도(CallRetry) 해야 함.",
        "group": "Scenario_2"
    },
    {
        "code": "80026",
        "name": "PAUSE_BY_CAMERA_OBSTACLE",
        "desc": "3D 카메라가 상단부나 바닥의 장애물(케이블, 튀어나온 패널)을 감지함.",
        "reasoning": "라이다로는 안 보이는 높이의 장애물일 수 있음. 해당 장애물을 제거하거나 설비 위치를 조정해야 하며, 해결 전까지는 로봇이 움직이지 못함.",
        "group": "Scenario_2"
    }
]

# 기존 cases 리스트에 병합
cases.extend(cases_scenario_2)

# ==========================================
# [Scenario 3] 신호 인터페이스 에러 (PLC Reset 필수)
# ==========================================

# 1. 공통 로직 (Base Logic) - Scenario 3
# 특징: 물리적 이동(Undock)이 없고, 대신 'PLC 제어 명령(dock_reset -> dock_ok)'이 포함됨
base_logics["Scenario_3"] = """
    1. 상태 확인: GET /api/object/status (에러 코드 및 CurrentRackObject/DockOutArea 확인)
    2. 작업 취소: PUT /api/command/cancel (ACS Job 중단)
    3. PLC 리셋: PUT /api/plc/placecommand (Command: "dock_reset") -> 설비 신호 초기화
    4. 핸드쉐이크 재개: PUT /api/plc/placecommand (Command: "dock_ok") -> 작업 가능 상태 알림
    5. 미션 재할당: GET /api/Object/Mission (기존 미션 조회) -> PUT /api/Command/retrycommand
    6. 모드 복구: PUT /api/command/mode (Auto)
"""

# 2. 갈아 끼울 부품 (Cases) - Scenario 3 그룹
cases_scenario_3 = [
    # Case 1: 작업 요청 신호 꺼짐 (U/L_REQ OFF)
    {
        "code": "70008",
        "name": "U_REQ/L_REQ OFF ALARM",
        "desc": "로봇이 도착했는데 설비(PLC)가 작업을 요청하는 신호(REQ)를 갑자기 꺼버린 상태.",
        "reasoning": "설비 측의 일시적인 신호 끊김이거나 센서 오인식임. PLC의 상태를 'dock_reset'으로 초기화한 뒤, 다시 'dock_ok'를 보내 로봇이 준비되었음을 강제로 알려줘야 함.",
        "group": "Scenario_3"
    },

    # Case 2: 준비 완료 신호 꺼짐 (READY OFF)
    {
        "code": "70009",
        "name": "READY OFF ALARM",
        "desc": "작업 도중 설비의 준비(READY) 신호가 꺼져서 인터페이스가 중단됨.",
        "reasoning": "설비가 준비 태세를 풀었으므로 로봇도 대기하게 됨. 'dock_reset'으로 설비의 에러를 클리어하고, 'dock_ok' 신호로 재작업 의사를 전달하여 핸드쉐이킹을 복구해야 함.",
        "group": "Scenario_3"
    },

    # Case 3: 작업 시작 지연 (WORK START TIME OVER)
    {
        "code": "70010",
        "name": "WORK START TIME OVER",
        "desc": "도킹 완료 후 일정 시간 내에 실제 작업(이송/적재)이 시작되지 않음.",
        "reasoning": "서로 '네가 먼저 시작해'라며 눈치만 보는 교착 상태(Deadlock). PLC 로직을 'dock_reset'으로 리셋하여 타이머를 초기화하고 프로세스를 처음부터 다시 트리거해야 함.",
        "group": "Scenario_3"
    },

    # Case 4: T2 타임아웃 (내부 동작 불가)
    {
        "code": "70011",
        "name": "T2 TIME OVER",
        "desc": "물건을 받기 전(Loading 전) 설비 내부의 선행 동작이 시간 내에 완료되지 않음.",
        "reasoning": "설비 내부 실린더나 센서가 늦게 반응한 경우임. 'dock_reset'을 통해 설비에게 '다시 준비하라'는 명령을 내리고, 'dock_ok'로 로봇은 대기 상태임을 재확인시켜야 함.",
        "group": "Scenario_3"
    }
]

# 기존 cases 리스트에 병합
cases.extend(cases_scenario_3)

# ==========================================
# [Scenario 4] 적재/이재 작업 중단 (Payload & Data Sync)
# ==========================================

# 1. 공통 로직 (Base Logic) - Scenario 4
# 특징: '적재 상태(LoadedState)' 확인과 '데이터 수정(Payload Update)'이 핵심
base_logics["Scenario_4"] = """
    1. 상태 확인: GET /api/object/status (LoadedState: 0=Empty, 3=Full 확인)
    2. 작업 취소: PUT /api/command/cancel (ACS Job 중단)
    3. 적재물 확인: 실제 로봇 상단에 화물(SKID/Box)이 있는지 육안 확인 (필수)
    4. 설비 리셋: PUT /api/plc/placecommand (Command: "dock_reset" -> "dock_ok")
    5. 데이터 동기화: PUT /api/command/payload (실제 화물 상태에 맞춰 시스템 값 덮어쓰기)
    6. 작업 완료/재개: 상황에 따라 PUT /api/command/jobcomplete (강제 완료) 또는 Auto Mode 복귀
"""

# 2. 갈아 끼울 부품 (Cases) - Scenario 4 그룹
cases_scenario_4 = [
    # Case 1: T4 타임아웃 (이재 중 멈춤)
    {
        "code": "70012",
        "name": "T4 TIME OVER",
        "desc": "물건을 주고받는(이재) 도중 센서 중복 감지나 끼임으로 인해 시간 초과 발생.",
        "reasoning": "물리적 이재는 완료되었으나 신호가 꼬인 경우가 많음. 실제 적재가 완료되었다면 'Payload Save'로 정보를 갱신하고, 설비 신호를 리셋한 뒤 작업을 마쳐야 함.",
        "group": "Scenario_4"
    },

    # Case 2: 단순 끼임 (Resume 가능)
    {
        "code": "Simple Jam",
        "name": "Conveyor Jam / Delay",
        "desc": "컨베이어 이송 중 박스가 살짝 걸리거나 일시적 지연으로 알람 발생.",
        "reasoning": "심각한 데이터 불일치가 아님. 0.3T AMR HMI에서 'RESUME' 버튼을 눌러 작업을 재개(Retry)하는 것만으로 해결 가능함.",
        "group": "Scenario_4"
    },

    # Case 3: Port Full (공급 불가)
    {
        "code": "Port Full",
        "name": "Supply Port Full",
        "desc": "물건을 주러 갔는데(Supply), 설비 포트에 이미 다른 물건이 차 있어서 작업을 못 함.",
        "reasoning": "물리적으로 불가능한 작업임. Payload를 'Full(적재 상태)'로 유지한 채 'Job Complete'로 강제 완료 처리하고, 로봇을 창고로 회차시켜야 함.",
        "group": "Scenario_4"
    },

    # Case 4: 1단만 공급 (부분 완료)
    {
        "code": "Partial Supply",
        "name": "Supply Incomplete (1/2)",
        "desc": "2단 공급 미션이었으나 1단만 주고 멈춤. 남은 1개는 로봇 위에 있음.",
        "reasoning": "잔여 화물 정보를 업데이트해야 함. Payload를 '1EA-Full, 1EA-None'으로 수정하고 'Job Complete' 처리하여 재고 정보를 맞춰야 함.",
        "group": "Scenario_4"
    },

    # Case 5: 빈 박스 없음 (회수 불가)
    {
        "code": "No Empty Box",
        "name": "Return Port Empty",
        "desc": "빈 박스를 가지러 갔는데(Return), 설비 포트가 비어있음.",
        "reasoning": "가져올 물건이 없음. Payload를 'Clear(없음)'로 설정하고 'Job Complete' 처리하여 빈차로 복귀시켜야 함.",
        "group": "Scenario_4"
    },

    # Case 6: Work 없음 (작업물 미준비)
    {
        "code": "No Work",
        "name": "Work Not Ready / Start Delay",
        "desc": "도킹했으나 설비가 물건을 내어주지 않음 (지게차 간섭 등으로 실제 물건 부재).",
        "reasoning": "기다려도 소용없음. 설비 측 'Work Reset' 후 로봇 상태를 'After Docking'으로 변경하여 미션을 취소하거나 재시도 대기 상태로 만들어야 함.",
        "group": "Scenario_4"
    },

    # Case 7: 센서 미감지/잔류 (Phantom Sensor)
    {
        "code": "Sensor Error",
        "name": "On-board Sensor Fail",
        "desc": "물건이 있는데 센서가 안 켜지거나(미감지), 물건이 없는데 센서가 켜짐(잔류).",
        "reasoning": "하드웨어 점검 필요. 'Payload Clear' 후 로봇을 점검 구역으로 빼내야 하며, 절대 그대로 자동 운행을 시키면 안 됨 (유령 재고 발생 위험).",
        "group": "Scenario_4"
    }
]

# 기존 cases 리스트에 병합
cases.extend(cases_scenario_4)

In [None]:
user_question_templates = {
    # Type A: 표준 절차서 (Standard SOP) -> "문서 줘", "절차 알려줘"
    "STANDARD_SOP": [
        "BMA 공정에서 발생하는 [{code}] {name} 에러의 표준 처리 절차(SOP)를 서술하시오.", # 정석
        "[{code}] 에러 발생 시 준수해야 할 공식 복구 매뉴얼을 작성해 주세요.", # 정중
        "[{code}] {name} 에러에 대한 표준 운영 절차서는 무엇입니까?", # 의문형
    ],

    # Type B: 현장 매뉴얼 (Field Manual) -> "급해", "어떻게 해?", "도와줘"
    "FIELD_MANUAL": [
        "지금 현장에 [{code}] 알람이 떴습니다. 즉시 조치할 수 있는 가이드를 주세요.", # 다급함
        "[{code}] {name} 에러 발생. 현장 대응 방법 요약 좀.", # 보고체/구어체
        "[{code}] 트러블슈팅 가이드." # 초단답형 (검색 키워드 식)
    ],

    # Type C: 사고 리포트 (Incident Report) -> "사례 알려줘", "리포트 써줘"
    "INCIDENT_REPORT": [
        "[{code}] 에러와 관련된 과거 해결 사례를 리포트 형식으로 작성하시오.", # 정석
        "[{code}] {name} 발생 시 올바른 조치 과정을 담은 사고 분석 보고서를 써주세요.", # 요청
        "[{code}] 에러 대응 이력 리포트 생성." # 단답형
    ]
}

# SOP 데이터 증강을 위한 Think Template (스타일별 맞춤)
sop_think_templates = {
    # Type A: 표준 절차서 (Standard SOP)
    "STANDARD_SOP": """<think>
사용자가 '{code} {name}' 에러 상황에 대한 공식적인 표준 복구 절차(SOP)를 요청했다.
이 에러는 '{reasoning}'이(가) 주 원인이므로, 단순 재시도가 아닌 '{base_logic_summary}' 로직을 적용해야 한다.
이에 따라 BMA 공정의 안전 규정과 표준 프로토콜을 준수하는 기술 문서를 작성하여 제시한다.
</think>""",

    # Type B: 현장 매뉴얼 (Field Manual)
    "FIELD_MANUAL": """<think>
사용자가 '{code}' 에러 발생 직후의 현장 대응 가이드를 긴급히 요청했다.
현재 상황은 '{desc}' 상태이므로, 신속한 복구를 위해 작업자가 즉시 수행해야 할 행동 지침(Action Item) 위주로 답변을 구성해야 한다.
특히 '{reasoning}' 이슈를 해결하기 위한 구체적인 조작법과 주의사항을 강조하여 설명한다.
</think>""",

    # Type C: 사고 리포트 (Incident Report)
    "INCIDENT_REPORT": """<think>
사용자가 '{code}' 에러의 해결 사례에 대한 분석 리포트를 요청했다.
과거 사례를 재구성하여, 문제 발생 원인('{desc}')과 당시 적용된 해결 로직('{base_logic_summary}')의 인과관계를 명확히 서술해야 한다.
이를 통해 해당 절차를 준수해야 하는 공학적/경험적 근거를 제시한다.
</think>""",

    # Type D: 기술 Q&A (Tech Q&A)
    "TECH_QNA": """<think>
사용자가 '{code}' 에러 처리 과정의 기술적 원리와 당위성에 대해 질문했다.
사용자는 특정 단계(예: Undock, Payload Update 등)가 왜 필요한지 궁금해하고 있다.
따라서 '{reasoning}'이라는 시스템적 배경을 근거로 들어, 해당 로직이 필수적인 이유를 논리적으로 설명한다.
</think>"""
}

In [20]:
from ml.data_processing.prompt import SOP_AUGMENTATION_PROMPTS
from ml.utils.util import parse_json

In [26]:
generated_dataset = []
for case in cases:
    group_name = case['group']
    base_logic_text = base_logics[group_name]
    formatted_input_info = f"""
    [Target Situation]
    - Error Code: {case['code']}
    - Name: {case['name']}
    - Symptom: {case['desc']}
    - Context/Reasoning: {case['reasoning']}

    [Recovery Logic]{base_logic_text}"""

    for style_name, prompt_template in SOP_AUGMENTATION_PROMPTS.items():
        if style_name != "TECH_QNA":
            user_questions = user_question_templates[style_name]
            for selected_template in user_questions:
                # (C) 데이터 파싱 및 조립
                final_user_content = ""
                final_assistant_content = ""

                response = llm_client.generate(system_prompt=prompt_template, chat_messages=[{"role": "user", "content": formatted_input_info}])
                raw_content = response.choices[0].message.content
                final_user_content = selected_template.format(
                    code=case['code'],
                    name=case['name']
                )

                # Think Block + 생성된 본문
                think_block = sop_think_templates[style_name].format(
                    code=case['code'],
                    name=case['name'],
                    desc=case['desc'],
                    reasoning=case['reasoning'],
                    base_logic_summary=base_logic_text.strip()
                )
                final_assistant_content = think_block + "\n\n" + raw_content

                # (D) 최종 저장 (동일)
                data_entry = {
                    "conversation": [
                        { "role": "system", "content": "당신은 BMA 공정 자동화 시스템 전문 엔지니어입니다." },
                        { "role": "user", "content": final_user_content },
                        { "role": "assistant", "content": final_assistant_content }
                    ]
                }
                logger.info(data_entry)
                generated_dataset.append(data_entry)

        else:
            for _ in range(3):
                response = llm_client.generate(system_prompt=prompt_template, chat_messages=[{"role": "user", "content": formatted_input_info}])
                raw_content = response.choices[0].message.content
                qa_data = parse_json(raw_content)
                final_user_content = qa_data["question"]
                think_block = sop_think_templates[style_name].format(
                    code=case['code'],
                    name=case['name'],
                    desc=case['desc'],
                    reasoning=case['reasoning'],
                    base_logic_summary=base_logic_text.strip()
                )
                final_assistant_content = think_block + "\n\n" + qa_data["answer"]
                data_entry = {
                    "conversation": [
                        { "role": "system", "content": "당신은 BMA 공정 자동화 시스템 전문 엔지니어입니다." },
                        { "role": "user", "content": final_user_content },
                        { "role": "assistant", "content": final_assistant_content }
                    ]
                }
                logger.info(data_entry)
                generated_dataset.append(data_entry)

[32m2026-01-12 11:44:13.368[0m | [1mINFO    [0m | [36m__main__[0m:[36m<module>[0m:[36m47[0m - [1m{'conversation': [{'role': 'system', 'content': '당신은 BMA 공정 자동화 시스템 전문 엔지니어입니다.'}, {'role': 'user', 'content': 'BMA 공정에서 발생하는 [80018] Dock Not Find 에러의 표준 처리 절차(SOP)를 서술하시오.'}, {'role': 'assistant', 'content': "<think>\n사용자가 '80018 Dock Not Find' 에러 상황에 대한 공식적인 표준 복구 절차(SOP)를 요청했다.\n이 에러는 '현재 위치에서는 센서가 마커를 놓쳤기 때문에 제자리 재시도는 무의미함. 반드시 Undock을 통해 로봇을 뒤로 물려 시야(FOV)를 확보하고 마커를 다시 스캔해야 함.'이(가) 주 원인이므로, 단순 재시도가 아닌 '1. 상태 확인: GET /api/object/status (에러 및 미션 확인)\n    2. 작업 취소: PUT /api/command/cancel (기존 작업 중단)\n    3. 물리적 후진(Undock): PUT /api/command/move (Target: Undock_Point)\n    4. 미션 재할당: GET /api/Object/Mission -> PUT /api/Command/retrycommand\n    5. 모드 복구: PUT /api/command/mode (Auto)' 로직을 적용해야 한다.\n이에 따라 BMA 공정의 안전 규정과 표준 프로토콜을 준수하는 기술 문서를 작성하여 제시한다.\n</think>\n\n# 표준 절차: [80018] Dock Not Find 복구\n\n## 1. 개요\n본 문서는 BMA 공정 내 로봇이 도킹 스테이션 진입 시도 중 마커를 인식하지 못하여 발생하는 **[80018] Dock Not

In [27]:
len(generated_dataset)

with open("../data/error_scenario_qwen_conversation_format.json", "w", encoding="utf-8") as f:
    json.dump(generated_dataset, f, ensure_ascii=False, indent=4)