# 사운드바 모델 매핑 Agent (Databricks용)

로그 CSV의 관측 기기명(NAME1~4, NAME_BT)을 표준 canonical 사운드바 모델명으로 매핑합니다.
**모든 로직이 이 노트북 안에 포함되어 있어, Databricks에 노트북만 올려서 실행할 수 있습니다.**

1. 경로 설정
2. 공통 모듈 코드 (정규화, DB, 검색, 검증, Agent)
3. 데이터 로드 및 예측
4. 결과 저장


## 1. 경로 설정

Databricks에서는 DBFS 경로(예: `/dbfs/FileStore/...`) 또는 위젯으로 경로를 지정할 수 있습니다.


In [None]:
from pathlib import Path

# 아래 경로를 환경에 맞게 수정하세요. Databricks: /dbfs/FileStore/... 등
INPUT_CSV = Path("/path/to/HDMI_BT_Log.csv")
SOUNDBAR_DB = Path("/path/to/soundbar_list.py")
OUTPUT_PRED = Path("/path/to/output/pred.csv")

OUTPUT_PRED.parent.mkdir(parents=True, exist_ok=True)
print("INPUT_CSV:", INPUT_CSV, "존재:", INPUT_CSV.exists())
print("SOUNDBAR_DB:", SOUNDBAR_DB, "존재:", SOUNDBAR_DB.exists())


## 2. 공통 모듈 코드 (정규화, 사운드바 DB, 로그 질의, 브랜드 추출, 비사운드바 탐지, 임베딩 검색, 검증, Agent)

아래 셀들을 순서대로 실행하면 됩니다.


### 정규화 (normalize)


In [None]:
"""
텍스트 정규화 유틸리티 모듈입니다.

본 프로젝트는 로그 문자열과 DB 모델 문자열 간의 매칭을 수행하므로,
대소문자/특수문자/공백 등 표면적인 차이를 최소화하는 정규화가 중요합니다.
"""

from __future__ import annotations

import re
from dataclasses import dataclass
from typing import Iterable, Optional


_WS_RE = re.compile(r"\s+")
_BRACKET_RE = re.compile(r"\([^)]*\)")
_KEEP_CHARS_RE = re.compile(r"[^A-Z0-9\.\+/\- ]+")


@dataclass(frozen=True)
class NormalizationConfig:
    """정규화 동작을 제어하는 설정값입니다."""

    drop_bracketed: bool = True
    keep_dot_plus_slash_dash: bool = True
    uppercase: bool = True
    collapse_whitespace: bool = True
    remove_hyphen: bool = True  # HT-S40R -> HTS40R 매칭 개선


def normalize_text(text: Optional[str], config: Optional[NormalizationConfig] = None) -> str:
    """
    주어진 문자열을 매칭 친화적으로 정규화합니다.

    - 대문자 변환
    - 괄호(...) 제거(옵션)
    - 영숫자/일부 기호(. + / -) 이외 제거
    - 공백 정리

    Args:
        text: 입력 텍스트 (None 가능).
        config: 정규화 설정. None이면 기본값 사용.

    Returns:
        정규화된 문자열(빈 문자열 가능).
    """
    if not text:
        return ""
    cfg = config or NormalizationConfig()

    s = text.strip()
    if cfg.uppercase:
        s = s.upper()
    if cfg.drop_bracketed:
        # 예: "LG SQC2(CC)" -> "LG SQC2"
        s = _BRACKET_RE.sub(" ", s)

    if cfg.keep_dot_plus_slash_dash:
        s = _KEEP_CHARS_RE.sub(" ", s)
    else:
        s = re.sub(r"[^A-Z0-9 ]+", " ", s)

    if cfg.collapse_whitespace:
        s = _WS_RE.sub(" ", s).strip()
    if cfg.remove_hyphen:
        s = s.replace("-", "").strip()
        if cfg.collapse_whitespace:
            s = _WS_RE.sub(" ", s).strip()
    return s


def normalize_brand(brand: Optional[str]) -> str:
    """
    브랜드 문자열을 정규화합니다.

    Args:
        brand: 브랜드(제조사) 문자열.

    Returns:
        정규화된 브랜드 문자열(대문자, 공백 정리).
    """
    return normalize_text(brand)


def unique_preserve_order(items: Iterable[str]) -> list[str]:
    """
    입력 시퀀스에서 중복을 제거하되, 최초 등장 순서를 유지합니다.

    Args:
        items: 문자열 iterable.

    Returns:
        중복 제거된 리스트.
    """
    seen: set[str] = set()
    out: list[str] = []
    for x in items:
        if x and x not in seen:
            seen.add(x)
            out.append(x)
    return out



### 사운드바 DB (soundbar_db)


In [None]:
"""
사운드바 표준 모델 DB 로드/파싱/정규화 모듈입니다.

현재 워크스페이스의 `soundbar_list.py`는 `data = [...]` 형태로 모델 목록을 보유합니다.
각 항목은 아래 구조(공백 구분 토큰)로 가정합니다.

- 첫 토큰: Brand
- 마지막 3토큰: Grade, Year, supportDolbyAtmos (혹은 'null')
- 중간 토큰: Model (공백이 있을 수 있으므로 join)

파일 입출력은 오류 처리를 포함합니다.
"""

from __future__ import annotations

import ast
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Optional



@dataclass(frozen=True)
class SoundbarRecord:
    """사운드바 DB의 단일 레코드입니다."""

    brand: str
    model: str
    grade: Optional[str]
    year: Optional[int]
    support_dolby_atmos: Optional[bool]
    canonical: str


def _parse_optional_int(token: str) -> Optional[int]:
    """문자열 토큰을 int로 변환합니다. 실패 시 None을 반환합니다."""
    token = token.strip()
    if not token or token.lower() == "null":
        return None
    try:
        return int(token)
    except ValueError:
        return None


def _parse_optional_bool(token: str) -> Optional[bool]:
    """문자열 토큰을 bool로 변환합니다. 실패 시 None을 반환합니다."""
    token = token.strip()
    if not token or token.lower() == "null":
        return None
    if token.upper() in {"O", "Y", "YES", "TRUE", "T"}:
        return True
    if token.upper() in {"X", "N", "NO", "FALSE", "F"}:
        return False
    return None


def parse_soundbar_item(item: str) -> Optional[SoundbarRecord]:
    """
    `soundbar_list.py`의 data 항목(문자열 1개)을 파싱합니다.

    Args:
        item: 예) "LG S90TY High 2024 O"

    Returns:
        SoundbarRecord 또는 파싱 실패 시 None.
    """
    if not item or not item.strip():
        return None
    tokens = item.split()
    if len(tokens) < 2:
        return None

    brand_raw = tokens[0]
    brand = normalize_brand(brand_raw)

    grade: Optional[str] = None
    year: Optional[int] = None
    atmos: Optional[bool] = None
    model_tokens: list[str] = tokens[1:]

    # 최소 5토큰 이상이면 마지막 3토큰을 메타로 간주
    if len(tokens) >= 5:
        grade_token = tokens[-3]
        year_token = tokens[-2]
        atmos_token = tokens[-1]
        grade = None if grade_token.lower() == "null" else normalize_text(grade_token)
        year = _parse_optional_int(year_token)
        atmos = _parse_optional_bool(atmos_token)
        model_tokens = tokens[1:-3]

    model = normalize_text(" ".join(model_tokens))
    if not brand or not model:
        return None
    canonical = f"{brand} {model}".strip()

    return SoundbarRecord(
        brand=brand,
        model=model,
        grade=grade,
        year=year,
        support_dolby_atmos=atmos,
        canonical=canonical,
    )


def load_soundbar_db_from_py(soundbar_list_py: Path) -> list[SoundbarRecord]:
    """
    `soundbar_list.py`에서 soundbar DB를 로드합니다.

    Args:
        soundbar_list_py: `soundbar_list.py` 파일 경로.

    Returns:
        SoundbarRecord 리스트(중복 canonical은 최초 1개 유지).

    Raises:
        FileNotFoundError: 파일이 없을 때.
        OSError: 파일 읽기 실패.
        ValueError: `data = [...]` 파싱 실패.
    """
    try:
        text = soundbar_list_py.read_text(encoding="utf-8")
    except FileNotFoundError:
        raise
    except OSError as e:
        raise OSError(f"사운드바 DB 파일 읽기 실패: {soundbar_list_py}") from e

    # soundbar_list.py에서 `data = [...]`를 안전하게 추출하기 위해 ast 사용
    try:
        module = ast.parse(text)
    except SyntaxError as e:
        raise ValueError(f"soundbar_list.py 파싱 실패: {soundbar_list_py}") from e

    data_value = None
    for node in module.body:
        if isinstance(node, ast.Assign):
            for target in node.targets:
                if isinstance(target, ast.Name) and target.id == "data":
                    data_value = node.value
                    break
        if data_value is not None:
            break

    if data_value is None:
        raise ValueError("soundbar_list.py에서 `data` 변수를 찾지 못했습니다.")

    try:
        data_list = ast.literal_eval(data_value)
    except Exception as e:  # noqa: BLE001 - 안전한 오류 래핑
        raise ValueError("`data = [...]` 값을 literal_eval로 해석하지 못했습니다.") from e

    if not isinstance(data_list, list):
        raise ValueError("`data`는 list 타입이어야 합니다.")

    records: list[SoundbarRecord] = []
    canonicals: list[str] = []
    tmp: dict[str, SoundbarRecord] = {}
    for item in data_list:
        if not isinstance(item, str):
            continue
        rec = parse_soundbar_item(item)
        if rec is None:
            continue
        canonicals.append(rec.canonical)
        # 중복 canonical은 최초 등장 유지
        if rec.canonical not in tmp:
            tmp[rec.canonical] = rec

    ordered = unique_preserve_order(canonicals)
    for c in ordered:
        r = tmp.get(c)
        if r:
            records.append(r)
    return records


def get_brand_set(records: Iterable[SoundbarRecord]) -> set[str]:
    """
    레코드 리스트로부터 브랜드 집합을 구합니다.

    Args:
        records: SoundbarRecord iterable.

    Returns:
        브랜드 집합(정규화된 문자열).
    """
    return {r.brand for r in records if r.brand}



### 로그 질의 (log_features)


In [None]:
"""
로그 행에서 사운드바 후보 문자열(query)을 구성하고, placeholder/노이즈를 필터링합니다.

`raw_data/HDMI_BT_Log.csv`는 여러 입력 소스(NAME1~4)와 브랜드(BRAND1~4),
그리고 BT 장치명(NAME_BT)을 포함합니다. 여기서 실제 사운드바 모델 매칭에
의미있는 후보 문자열을 추출하는 것이 목적입니다.
"""

from __future__ import annotations

import re
from dataclasses import dataclass
from typing import Any, Iterable, Optional

import math


_HDMI_RE = re.compile(r"^HDMI(\s*\d+)?$")
_PLACEHOLDER_EXACT = {
    "AV",
    "AUX",
    "USB",
    "OPTICAL",
    "DIGITAL",
    "DIGITAL IN",
    "TV",
    "TV IN",
    "IN",
    "HDMI",
}

_BT_HEADPHONE_HINT_RE = re.compile(
    r"\b(AIRPODS|EARBUD|EARBUDS|HEADPHONE|HEADPHONES|TUNE|TOUR|LIVE|WH-|WF-)\b"
)


@dataclass(frozen=True)
class LogQuery:
    """
    로그에서 추출한 후보 검색 질의 문자열입니다.

    Attributes:
        query: 정규화된 검색 질의
        raw: 원본 문자열(가능하면 유지)
        source: 어떤 필드에서 왔는지(예: NAME2, NAME_BT 등)
        weight: 이후 검색/검증에서 가중치로 사용할 값(기본 1.0)
    """

    query: str
    raw: str
    source: str
    weight: float = 1.0


def is_placeholder(text: str) -> bool:
    """
    HDMI/AV 같은 placeholder 입력 소스명을 탐지합니다.

    주의: \"APPLE TV\" 같은 정상 장치명은 placeholder로 처리하지 않도록
    짧은 토큰/정규식 기반으로만 필터합니다.

    Args:
        text: 정규화된 문자열(권장: `normalize_text` 결과)

    Returns:
        placeholder로 판단되면 True
    """
    if not text:
        return True
    s = text.strip().upper()
    if _HDMI_RE.match(s):
        return True
    if s in _PLACEHOLDER_EXACT:
        return True
    # 매우 짧고 의미없는 입력은 제외
    if len(s) <= 2:
        return True
    return False


def bt_looks_like_non_soundbar(bt_name: str) -> bool:
    """
    BT 장치명이 헤드폰/이어폰 등 사운드바가 아닐 가능성이 높은지 휴리스틱으로 판단합니다.

    Args:
        bt_name: 정규화된 BT 이름

    Returns:
        사운드바가 아닐 가능성이 높으면 True
    """
    if not bt_name:
        return False
    s = bt_name.upper()
    return _BT_HEADPHONE_HINT_RE.search(s) is not None


def build_log_queries_from_row(
    row: dict[str, Any],
    soundbar_brand_set: Optional[set[str]] = None,
    include_bt: bool = True,
) -> list[LogQuery]:
    """
    단일 로그 row에서 후보 질의 문자열 리스트를 생성합니다.

    생성 규칙(요약):
    - NAME1~4에서 placeholder 제외 후 후보 생성
    - BRANDi가 있으면 \"BRANDi NAMEi\" 형태 후보를 추가(브랜드 강화)
    - NAME_BT는 노이즈(헤드폰/이어폰) 가능성이 높아 기본 weight를 낮춰 추가(옵션)

    Args:
        row: pandas row를 `to_dict()`한 형태를 권장.
        soundbar_brand_set: 사운드바 DB 기반 브랜드 집합(정규화된 값).
            제공되면 브랜드가 DB에 있는 경우 가중치/우선순위를 강화합니다.
        include_bt: NAME_BT를 후보로 포함할지 여부.

    Returns:
        LogQuery 리스트(중복 제거, 순서 유지).
    """
    candidates: list[LogQuery] = []

    def _is_missing(x: Any) -> bool:
        """
        pandas NaN 등 결측값을 탐지합니다.

        주의:
        - CSV/전처리 과정에 따라 결측이 float NaN이 아니라 문자열 "nan"/"NaN"으로 들어올 수 있습니다.
          이 경우 query가 "NAN CHROMECAST"처럼 오염되어 매칭 오탐을 유발할 수 있어,
          문자열 결측도 결측으로 처리합니다.
        """
        if x is None:
            return True
        if isinstance(x, float) and math.isnan(x):
            return True
        if isinstance(x, str):
            s = x.strip().lower()
            if s in {"nan", "none", "null", ""}:
                return True
        return False

    def _add(query_raw: str, source: str, weight: float) -> None:
        q = normalize_text(query_raw)
        if not q or is_placeholder(q):
            return
        candidates.append(LogQuery(query=q, raw=str(query_raw), source=source, weight=weight))

    # NAME1~4 / BRAND1~4
    for i in range(1, 5):
        name_key = f"NAME{i}"
        brand_key = f"BRAND{i}"
        name_raw = row.get(name_key, "")
        brand_raw = row.get(brand_key, "")

        name = "" if _is_missing(name_raw) else normalize_text(str(name_raw))
        brand = "" if _is_missing(brand_raw) else normalize_brand(str(brand_raw))

        if name and not is_placeholder(name):
            _add(str(name_raw), name_key, weight=1.0)

            if brand:
                w = 1.1
                if soundbar_brand_set and brand in soundbar_brand_set:
                    w = 1.25
                # 모델명에 이미 브랜드가 있으면 모델명만, 없으면 브랜드+모델명
                if name.upper().startswith(brand.upper()):
                    brand_name_raw = str(name_raw).strip()
                else:
                    brand_name_raw = f"{str(brand_raw).strip()} {str(name_raw).strip()}".strip()
                _add(brand_name_raw, f"{brand_key}+{name_key}", weight=w)

    # NAME_BT (옵션)
    if include_bt:
        bt_raw = row.get("NAME_BT", "")
        bt = "" if _is_missing(bt_raw) else normalize_text(str(bt_raw))
        if bt and not is_placeholder(bt):
            weight = 0.63  # 0.6 → 0.63 (5% 상향, below_threshold 완화)
            if bt_looks_like_non_soundbar(bt):
                weight = 0.35
            _add(str(bt_raw), "NAME_BT", weight=weight)

    # 중복 제거(정규화 query 기준)
    ordered_queries = unique_preserve_order([c.query for c in candidates])
    first_by_query: dict[str, LogQuery] = {}
    for c in candidates:
        if c.query not in first_by_query:
            first_by_query[c.query] = c

    return [first_by_query[q] for q in ordered_queries if q in first_by_query]


@dataclass(frozen=True)
class TypedQuery:
    """
    소스별(타입별) 검색 질의입니다. BT/HDMI 각각 독립 예측을 위해 사용합니다.

    Attributes:
        type_: "BT" 또는 "HDMI"
        source: 필드명(NAME_BT, NAME1, NAME2, NAME3, NAME4)
        query: 정규화된 검색 질의
        raw: 원본 문자열(primary_query 출력용)
        weight: 가중치
    """

    type_: str
    source: str
    query: str
    raw: str
    weight: float = 1.0


def build_typed_queries_from_row(
    row: dict[str, Any],
    soundbar_brand_set: Optional[set[str]] = None,
    include_bt: bool = True,
) -> list[TypedQuery]:
    """
    단일 로그 row에서 소스별(BT/HDMI) 질의 리스트를 생성합니다.

    - BT: NAME_BT가 유효하면 1개
    - HDMI: (NAME1,BRAND1)~(NAME4,BRAND4) 각 쌍이 유효하면 1개씩, 최대 4개

    각 소스별로 독립 예측을 수행할 때 사용합니다.

    Args:
        row: pandas row를 dict로 변환한 값
        soundbar_brand_set: 사운드바 DB 브랜드 집합
        include_bt: NAME_BT 포함 여부

    Returns:
        TypedQuery 리스트(소스별 1개, 중복 없음)
    """
    result: list[TypedQuery] = []

    def _is_missing(x: Any) -> bool:
        if x is None:
            return True
        if isinstance(x, float) and math.isnan(x):
            return True
        if isinstance(x, str):
            s = x.strip().lower()
            if s in {"nan", "none", "null", ""}:
                return True
        return False

    # BT: NAME_BT
    if include_bt:
        bt_raw = row.get("NAME_BT", "")
        if not _is_missing(bt_raw):
            bt = normalize_text(str(bt_raw))
            if bt and not is_placeholder(bt):
                w = 0.6 if bt_looks_like_non_soundbar(bt) else 0.6
                result.append(
                    TypedQuery(type_="BT", source="NAME_BT", query=bt, raw=str(bt_raw).strip(), weight=w)
                )

    # HDMI: (NAME1,BRAND1) ~ (NAME4,BRAND4)
    for i in range(1, 5):
        name_key = f"NAME{i}"
        brand_key = f"BRAND{i}"
        name_raw = row.get(name_key, "")
        brand_raw = row.get(brand_key, "")

        name = "" if _is_missing(name_raw) else normalize_text(str(name_raw))
        brand = "" if _is_missing(brand_raw) else normalize_brand(str(brand_raw))

        if not name or is_placeholder(name):
            continue

        # primary_query 출력: 모델명에 브랜드가 있으면 모델명만, 없으면 브랜드+모델명
        if brand:
            raw = (
                str(name_raw).strip()
                if name.upper().startswith(brand.upper())
                else f"{str(brand_raw).strip()} {str(name_raw).strip()}".strip()
            )
        else:
            raw = str(name_raw).strip()
        if brand and not (name.upper().startswith(brand.upper())):
            query = normalize_text(f"{str(brand_raw).strip()} {str(name_raw).strip()}")
            w = 1.1
            if soundbar_brand_set and brand in soundbar_brand_set:
                w = 1.25
        else:
            query = name
            w = 1.0

        result.append(TypedQuery(type_="HDMI", source=name_key, query=query, raw=raw, weight=w))

    return result


def choose_primary_query(queries: Iterable[LogQuery]) -> Optional[LogQuery]:
    """
    여러 후보 LogQuery 중 1개를 대표 질의로 선택합니다.

    현재는 가장 weight가 크고, 길이가 적당히 긴 후보를 우선합니다.

    Args:
        queries: LogQuery iterable

    Returns:
        선택된 LogQuery 또는 후보가 없으면 None
    """
    qs = list(queries)
    if not qs:
        return None
    return sorted(qs, key=lambda q: (q.weight, len(q.query)), reverse=True)[0]



### 브랜드 추출 (brand_extractor)


In [None]:
"""
기기명에서 브랜드(Brand)를 추출하는 모듈입니다.

사운드바 DB의 known brand 집합을 사용하여 dictionary 기반 추출을 수행합니다.
(일반 NER 대신 도메인 특화 방식으로 정확도를 높입니다.)
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Optional, Set



@dataclass(frozen=True)
class BrandExtraction:
    """
    브랜드 추출 결과입니다.

    Attributes:
        brand: 추출된 브랜드(정규화된 문자열). 없으면 None.
        model_part: 브랜드 제외 나머지 부분(정규화). 없으면 query 전체.
        original_query: 원본 질의.
    """

    brand: Optional[str]
    model_part: str
    original_query: str


def extract_brand_from_query(
    query: str,
    known_brands: Set[str],
) -> BrandExtraction:
    """
    질의 문자열에서 브랜드를 추출합니다.

    전략:
    1. 정규화된 query의 첫 토큰이 known_brands에 있으면 brand로 사용.
    2. 첫 토큰이 없으면, query 내 임의 위치에서 가장 긴 brand 매칭을 탐색.
    3. brand가 없으면 model_part = query 전체.

    Args:
        query: 기기명 질의(원본 또는 정규화).
        known_brands: 사운드바 DB의 브랜드 집합(정규화된 값).

    Returns:
        BrandExtraction
    """
    if not query or not query.strip():
        return BrandExtraction(brand=None, model_part="", original_query=query or "")

    q = normalize_text(query)
    if not q:
        return BrandExtraction(brand=None, model_part="", original_query=query)

    tokens = q.split()
    if not tokens:
        return BrandExtraction(brand=None, model_part=q, original_query=query)

    # 1) 첫 토큰이 브랜드인 경우
    first = tokens[0]
    if first in known_brands:
        model_part = " ".join(tokens[1:]).strip()
        return BrandExtraction(brand=first, model_part=model_part or q, original_query=query)

    # 2) 쿼리 내에서 가장 긴 브랜드 매칭 탐색
    matched_brand: Optional[str] = None
    matched_len = 0
    for b in known_brands:
        if not b:
            continue
        # "LG" in "LG SPEAKER DS80TR" 또는 "SONOS" in "SONOS ARC"
        if b in q:
            if len(b) > matched_len:
                matched_len = len(b)
                matched_brand = b

    if matched_brand:
        # 브랜드 제거 후 model_part
        rest = q.replace(matched_brand, "", 1).strip()
        rest = " ".join(rest.split())
        return BrandExtraction(brand=matched_brand, model_part=rest or q, original_query=query)

    # 3) 브랜드 없음
    return BrandExtraction(brand=None, model_part=q, original_query=query)


### 비사운드바 기기 탐지 (device_type)


In [None]:
"""
Entity 기반 비사운드바 기기(스트리밍/셋톱박스/디코더 등) 탐지 모듈입니다.

블랙리스트 방식이 아닌, 질의가 "사운드바가 아닌 입력 소스"를 지칭하는지
패턴/엔티티 기반으로 판별합니다. 이를 통해 APPLE TV, SKY Q, ORANGE 디코더 등
입력 소스명이 사운드바로 잘못 매칭되는 것을 방지합니다.
"""

from __future__ import annotations

import re


# 스트리밍/미디어 플레이어 (사운드바 브랜드 아님)
_STREAMING_PATTERNS: tuple[str, ...] = (
    r"\bAPPLE\s+TV\b",
    r"\bFIRE\s+TV\b",
    r"\bAMAZON\s+FIRE\s+T\b",
    r"\bFIRETV\b",
    r"\bCHROMECAST\b",
    r"\bROKU\b",
    r"\bMAGENTATV\b",
    r"\bTELIA\s+TV\b",
    r"\bVODAFONE\s+TV\b",
    r"\bEE\s+TV\b",  # EE TV (BT/영국 통신사 스트리밍)
    r"\bSHIELD\b",  # NVIDIA Shield 셋톱박스
)

# 셋톱박스/위성 수신기
_SETTOP_PATTERNS: tuple[str, ...] = (
    r"\bSKY\b",  # SKY 단독 (SKY Q 외에 SKY, SKY+ 등)
    r"\bSKY\s+Q\b",
    r"\bSETTOP\s+BOX\b",
    r"\bSET\s+TOP\s+BOX\b",
    r"\bSTB\b",
    r"\bSF8008\b",  # 삼성/기타 셋톱박스 모델
    r"\bMEDIABOX\b",  # 미디어박스 (MEDIABOX 1 등)
)

# 게임 콘솔 (단독 질의로 사용될 때)
_CONSOLE_PATTERNS: tuple[str, ...] = (
    r"\bXBOX\b",
    r"\bPLAYSTATION\s+\d+\b",  # PLAYSTATION 4, 5, 6 등 모든 번호
    r"\bPS\d+\b",  # PS4, PS5, PS6 등 모든 번호
    r"\bNINTENDO\b",  # NINTENDO SWITCH, WII, WII U, DS, 3DS 등 모든 콘솔
    r"\bCONSOLA\s+DE\s+JUEGOS\b",
)

# 일반 미디어 소스 (사운드바 아님) - 디코더/인터넷 박스 포함
_GENERIC_SOURCE_PATTERNS: tuple[str, ...] = (
    r"\bBLURAY\s+PLAYER\b",
    r"\bBD\s+PLAYER\b",
    r"\bDVD\b",
    r"\bDVDPLAYER\b",
    r"\bAV\s+RECEIVER\b",
    r"\bTV\s+IN\b",  # TV IN WOONKAME 등: TV 입력 소스 라벨, 사운드바 아님
    # 디코더/인터넷 박스 (다국어) - 통신사(ORANGE 등) 조합은 cosine 유사도로 처리
    r"\bCODEUR\b",
    r"\bDECODER\b",
    r"\bBOX\s+INTERNET\b",
    r"\bBO\s+TIER\s+D\b",
    r"\bFIBRE\s+OPTIQUE\b",
)

# 사운드바 DB 브랜드인 경우 무시 (예: SONY BARRA DE SOM)
_KNOWN_SOUNDBAR_BRAND_PREFIX = (
    "LG ",
    "SAMSUNG ",
    "SONY ",
    "BOSE ",
    "JBL ",
    "PHILIPS ",
    "DENON ",
    "SONOS ",
    "TCL ",
    "HISENSE ",
    "HARMAN ",
    "YAMAHA ",
    "VIZIO ",
    "POLK ",
    "KLIPSCH ",
    "ROKU ",
)

_streaming_re = re.compile("|".join(_STREAMING_PATTERNS), re.IGNORECASE)
_settop_re = re.compile("|".join(_SETTOP_PATTERNS), re.IGNORECASE)
_console_re = re.compile("|".join(_CONSOLE_PATTERNS), re.IGNORECASE)
_generic_re = re.compile("|".join(_GENERIC_SOURCE_PATTERNS), re.IGNORECASE)


def is_non_soundbar_device(query: str) -> bool:
    """
    질의가 사운드바가 아닌 입력 소스(스트리밍/셋톱박스/디코더 등)를 지칭하는지 판별합니다.

    블랙리스트가 아닌 패턴 기반입니다. 예:
    - "APPLE TV" -> True (스트리밍 기기)
    - "SKY Q" -> True (셋톱박스)
    - "ORANGE FIBRE OPTIQUE BO TIER D CODEUR /BOX INTERNET" -> True (디코더)
    - "LG S40T" -> False (사운드바)
    - "SONY SONY BARRA DE SOM" -> False (사운드바)

    Args:
        query: 정규화된 질의 문자열

    Returns:
        비사운드바 기기이면 True, 사운드바일 가능성이 있으면 False
    """
    if not query or not query.strip():
        return False

    q = query.upper().strip()

    # 사운드바 DB 브랜드로 시작하면 비사운드바 아님 (예: LG S40T, SONY BARRA DE SOM)
    for prefix in _KNOWN_SOUNDBAR_BRAND_PREFIX:
        if q.startswith(prefix):
            return False

    # 스트리밍/셋톱/콘솔/일반 미디어(디코더 포함) 패턴 매칭
    if _streaming_re.search(q):
        return True
    if _settop_re.search(q):
        return True
    if _console_re.search(q):
        return True
    if _generic_re.search(q):
        return True

    return False


### 임베딩 검색 (embedding_retriever)


In [None]:
"""
정확도 우선 임베딩 기반 검색 모듈입니다.

- TF-IDF 프리필터 없이 전체 DB에 대해 임베딩 코사인 유사도로 랭킹
- 브랜드 필터(옵션): 추출된 brand로 후보 풀 축소
- 속도보다 정확도에 중점
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Iterable, Optional

import numpy as np



@dataclass(frozen=True)
class EmbeddingCandidate:
    """임베딩 검색 후보 1개를 표현합니다."""

    canonical: str
    score: float
    brand: str
    model_part: str


def _safe_import_sentence_transformers():
    """sentence-transformers를 지연 로드합니다."""
    try:
        from sentence_transformers import SentenceTransformer  # type: ignore
    except Exception:
        return None
    return SentenceTransformer


class EmbeddingRetriever:
    """
    임베딩 + 코사인 유사도 기반 검색기입니다.
    정확도 우선으로 TF-IDF 프리필터 없이 전체 DB에 대해 검색합니다.
    """

    def __init__(
        self,
        *,
        embedding_model_id: str = "BAAI/bge-small-en-v1.5",
        normalize_embeddings: bool = True,
        top_k: int = 20,
    ) -> None:
        """
        Args:
            embedding_model_id: sentence-transformers 모델 ID.
            normalize_embeddings: L2 정규화하여 cosine을 내적으로 계산.
            top_k: 반환 후보 수.
        """
        self.embedding_model_id = embedding_model_id
        self.normalize_embeddings = normalize_embeddings
        self.top_k = top_k

        self._embedder = None
        self._records: list[SoundbarRecord] = []
        self._canonicals: list[str] = []
        self._brands: list[str] = []
        self._model_parts: list[str] = []
        self._embeddings: Optional[np.ndarray] = None

    def fit(self, records: Iterable[SoundbarRecord]) -> None:
        """
        사운드바 DB 레코드로 인덱스를 구축합니다.

        Args:
            records: SoundbarRecord iterable.
        """
        self._records = list(records)
        self._canonicals = [r.canonical for r in self._records if r.canonical]
        self._brands = [r.brand or "" for r in self._records if r.canonical]
        self._model_parts = [
            (r.model or r.canonical or "").strip() for r in self._records if r.canonical
        ]

        if not self._canonicals:
            raise ValueError("인덱싱할 canonical 모델이 없습니다.")

        SentenceTransformer = _safe_import_sentence_transformers()
        if SentenceTransformer is None:
            raise ImportError(
                "sentence-transformers가 필요합니다. `pip install sentence-transformers`로 설치해 주세요."
            )

        self._embedder = SentenceTransformer(self.embedding_model_id)

        # 모델명 검색 정확도 향상: DB의 model_part만 임베딩 (full canonical 대신)
        texts_to_embed = [normalize_text(mp) for mp in self._model_parts]
        self._embeddings = self._embedder.encode(
            texts_to_embed,
            convert_to_numpy=True,
            normalize_embeddings=self.normalize_embeddings,
            show_progress_bar=True,
        ).astype(np.float32)

    def retrieve(
        self,
        query: str,
        *,
        brand_filter: Optional[str] = None,
        top_k: Optional[int] = None,
    ) -> list[EmbeddingCandidate]:
        """
        질의에 대해 임베딩 코사인 유사도로 TopK 후보를 반환합니다.

        Args:
            query: 질의 문자열(기기명 또는 모델 부분).
            brand_filter: 지정 시 해당 브랜드 후보만 검색.
            top_k: 반환 후보 수(override).

        Returns:
            EmbeddingCandidate 리스트(내림차순).
        """
        if self._embedder is None or self._embeddings is None:
            raise RuntimeError("EmbeddingRetriever.fit()을 먼저 호출해야 합니다.")

        q = normalize_text(query)
        if not q:
            return []

        k = int(top_k or self.top_k)
        k = max(1, min(k, len(self._canonicals)))

        # 브랜드 필터 적용
        if brand_filter:
            brand_norm = normalize_text(brand_filter)
            indices = [i for i in range(len(self._canonicals)) if self._brands[i] == brand_norm]
            if not indices:
                indices = list(range(len(self._canonicals)))
        else:
            indices = list(range(len(self._canonicals)))

        # 질의 임베딩
        q_emb = self._embedder.encode(
            [q],
            convert_to_numpy=True,
            normalize_embeddings=self.normalize_embeddings,
            show_progress_bar=False,
        ).astype(np.float32)[0]

        # 코사인 유사도 (정규화된 내적)
        cand_emb = self._embeddings[indices]
        scores = np.dot(cand_emb, q_emb).astype(np.float64)

        # TopK
        top_idx = np.argsort(-scores)[:k]
        out: list[EmbeddingCandidate] = []
        for i in top_idx:
            idx = indices[i]
            out.append(
                EmbeddingCandidate(
                    canonical=self._canonicals[idx],
                    score=float(scores[i]),
                    brand=self._brands[idx],
                    model_part=self._model_parts[idx],
                )
            )
        return out

    def compute_similarity(self, text1: str, text2: str) -> float:
        """
        두 문자열 간 코사인 유사도를 계산합니다.

        primary_query와 top1 candidate 간 직접 유사도 검증에 사용합니다.

        Args:
            text1: 첫 번째 문자열(예: primary_query)
            text2: 두 번째 문자열(예: canonical 모델명)

        Returns:
            코사인 유사도 [0, 1].
        """
        sims = self.compute_similarities_batch(text1, [text2])
        return sims[0] if sims else 0.0

    def compute_similarities_batch(self, query: str, canonicals: list[str]) -> list[float]:
        """
        질의와 여러 canonical 간 코사인 유사도를 1회 인코딩으로 계산합니다.
        (속도 개선: N회 별도 encode 대신 1회 배치 encode)

        Args:
            query: 질의 문자열
            canonicals: canonical 모델명 리스트

        Returns:
            각 canonical에 대한 유사도 리스트
        """
        if self._embedder is None or not canonicals:
            return [1.0] * len(canonicals) if canonicals else []
        q = normalize_text(query) or query
        if not q:
            return [0.0] * len(canonicals)
        texts = [q] + [normalize_text(c) or c for c in canonicals]
        embs = self._embedder.encode(
            texts,
            convert_to_numpy=True,
            normalize_embeddings=self.normalize_embeddings,
            show_progress_bar=False,
        ).astype(np.float32)
        q_emb = embs[0]
        return [float(np.dot(q_emb, embs[i + 1])) for i in range(len(canonicals))]


### 하이브리드 검색 (retrieval)


In [None]:
"""
후보 검색(retrieval) 모듈입니다.

구현 목표:
- 1차: TF-IDF(문자 n-gram)로 빠르게 TopN 후보를 축소(lexical)
- 2차: sentence-transformers 임베딩 cosine 유사도로 TopK 정밀 후보 생성(semantic, 옵션)

sentence-transformers가 설치되지 않은 환경에서는 TF-IDF만으로 동작합니다.
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Iterable, Optional

import numpy as np



@dataclass(frozen=True)
class RetrievedCandidate:
    """검색으로 얻은 후보 1개를 표현합니다."""

    canonical: str
    score: float
    lexical_score: float
    semantic_score: Optional[float]


def _safe_import_sklearn() -> tuple[object, object, object]:
    """scikit-learn을 지연 로드합니다(미설치 환경에서 오류 메시지 개선)."""
    try:
        from sklearn.feature_extraction.text import TfidfVectorizer  # type: ignore
        from sklearn.metrics.pairwise import linear_kernel  # type: ignore
    except Exception as e:  # noqa: BLE001
        raise ImportError(
            "scikit-learn이 필요합니다. `pip install scikit-learn`로 설치해 주세요."
        ) from e
    return TfidfVectorizer, linear_kernel, np


def _safe_import_sentence_transformers():
    """sentence-transformers를 지연 로드합니다(옵션)."""
    try:
        from sentence_transformers import SentenceTransformer  # type: ignore
    except Exception:
        return None
    return SentenceTransformer


class HybridRetriever:
    """
    TF-IDF + (옵션) 임베딩 기반 하이브리드 검색기입니다.

    Public API:
    - `fit(records)`: DB 인덱스 구축
    - `retrieve(query)`: TopK 후보 반환
    """

    def __init__(
        self,
        *,
        char_ngram_range: tuple[int, int] = (3, 5),
        min_df: int = 1,
        top_n_lexical: int = 200,
        top_k: int = 20,
        embedding_model_id: Optional[str] = "BAAI/bge-small-en-v1.5",
        use_embeddings: bool = True,
        normalize_embeddings: bool = True,
        random_state: int = 42,
    ) -> None:
        """
        Args:
            char_ngram_range: TF-IDF char n-gram 범위.
            min_df: TF-IDF 최소 문서 빈도.
            top_n_lexical: 1차 lexical 후보 수(2차 임베딩 계산 대상).
            top_k: 최종 반환 후보 수.
            embedding_model_id: sentence-transformers 모델 ID(옵션).
            use_embeddings: True이면 임베딩 기반 2차 검색 시도(미설치면 자동 비활성).
            normalize_embeddings: 임베딩을 L2 정규화하여 cosine을 내적으로 계산.
            random_state: 재현성(일부 모델/연산에서 사용).
        """
        self.char_ngram_range = char_ngram_range
        self.min_df = min_df
        self.top_n_lexical = top_n_lexical
        self.top_k = top_k
        self.embedding_model_id = embedding_model_id
        self.use_embeddings = use_embeddings
        self.normalize_embeddings = normalize_embeddings
        self.random_state = random_state

        self._canonicals: list[str] = []
        self._tfidf_vectorizer = None
        self._tfidf_matrix = None
        self._embedder = None
        self._embeddings: Optional[np.ndarray] = None

    @property
    def canonicals(self) -> list[str]:
        """인덱싱된 canonical 모델 문자열 리스트를 반환합니다."""
        return list(self._canonicals)

    def fit(self, records: Iterable[SoundbarRecord]) -> None:
        """
        사운드바 DB 레코드로 인덱스를 구축합니다.

        Args:
            records: SoundbarRecord iterable.

        Raises:
            ValueError: 인덱싱 가능한 canonical이 없을 때.
            ImportError: scikit-learn 미설치 시.
        """
        canonicals = [r.canonical for r in records if r.canonical]
        canonicals = [normalize_text(c) for c in canonicals]
        canonicals = [c for c in canonicals if c]
        if not canonicals:
            raise ValueError("인덱싱할 canonical 모델이 없습니다.")

        TfidfVectorizer, _, _ = _safe_import_sklearn()
        self._canonicals = canonicals
        self._tfidf_vectorizer = TfidfVectorizer(
            analyzer="char_wb",
            ngram_range=self.char_ngram_range,
            min_df=self.min_df,
            lowercase=False,
        )
        self._tfidf_matrix = self._tfidf_vectorizer.fit_transform(self._canonicals)

        self._maybe_init_embeddings()
        if self._embedder is not None:
            self._embeddings = self._encode_texts(self._canonicals)
        else:
            self._embeddings = None

    def _maybe_init_embeddings(self) -> None:
        """임베딩 모델을 초기화합니다(옵션)."""
        if not self.use_embeddings or not self.embedding_model_id:
            self._embedder = None
            return
        SentenceTransformer = _safe_import_sentence_transformers()
        if SentenceTransformer is None:
            self._embedder = None
            return

        try:
            self._embedder = SentenceTransformer(self.embedding_model_id)
        except Exception:
            # 다운로드/초기화 실패 시에도 TF-IDF만으로 동작 가능해야 함
            self._embedder = None

    def _encode_texts(self, texts: list[str]) -> np.ndarray:
        """텍스트 리스트를 임베딩합니다."""
        if self._embedder is None:
            raise RuntimeError("임베딩 모델이 초기화되지 않았습니다.")
        emb = self._embedder.encode(
            texts,
            convert_to_numpy=True,
            normalize_embeddings=self.normalize_embeddings,
            show_progress_bar=False,
        )
        return emb.astype(np.float32, copy=False)

    def retrieve(
        self,
        query: str,
        *,
        top_n_lexical: Optional[int] = None,
        top_k: Optional[int] = None,
    ) -> list[RetrievedCandidate]:
        """
        주어진 질의 문자열로 후보 TopK를 검색합니다.

        Args:
            query: 질의 문자열(원본 가능, 내부에서 정규화)
            top_n_lexical: 1차 후보 수(override)
            top_k: 최종 후보 수(override)

        Returns:
            RetrievedCandidate 리스트(내림차순 정렬).

        Raises:
            RuntimeError: `fit()`이 호출되지 않았을 때.
        """
        if self._tfidf_vectorizer is None or self._tfidf_matrix is None:
            raise RuntimeError("HybridRetriever.fit()을 먼저 호출해야 합니다.")

        q = normalize_text(query)
        if not q:
            return []

        n_lex = int(top_n_lexical or self.top_n_lexical)
        k = int(top_k or self.top_k)
        n_lex = max(1, min(n_lex, len(self._canonicals)))
        k = max(1, min(k, len(self._canonicals)))

        _, linear_kernel, _ = _safe_import_sklearn()
        q_vec = self._tfidf_vectorizer.transform([q])
        lex_scores = linear_kernel(q_vec, self._tfidf_matrix).ravel()

        # 1차: lexical TopN
        top_lex_idx = np.argpartition(-lex_scores, kth=min(n_lex - 1, len(lex_scores) - 1))[
            :n_lex
        ]
        top_lex_idx = top_lex_idx[np.argsort(-lex_scores[top_lex_idx])]

        # 2차: semantic cosine (옵션)
        sem_scores: Optional[np.ndarray] = None
        if self._embedder is not None and self._embeddings is not None:
            q_emb = self._encode_texts([q])[0]
            cand_emb = self._embeddings[top_lex_idx]
            # normalize_embeddings=True라면 cosine ~= dot
            sem_scores = np.dot(cand_emb, q_emb).astype(np.float32, copy=False)
            rerank_idx = np.argsort(-sem_scores)
            final_idx = top_lex_idx[rerank_idx][:k]
        else:
            final_idx = top_lex_idx[:k]

        out: list[RetrievedCandidate] = []
        for idx in final_idx:
            lex = float(lex_scores[idx])
            sem = None if sem_scores is None else float(sem_scores[np.where(top_lex_idx == idx)[0][0]])
            score = float(sem) if sem is not None else lex
            out.append(
                RetrievedCandidate(
                    canonical=self._canonicals[int(idx)],
                    score=score,
                    lexical_score=lex,
                    semantic_score=sem,
                )
            )
        return out

    def compute_similarity(self, text1: str, text2: str) -> float:
        """
        두 문자열 간 코사인 유사도를 계산합니다.

        primary_query와 top1 candidate 간 직접 유사도 검증에 사용합니다.

        Args:
            text1: 첫 번째 문자열(예: primary_query)
            text2: 두 번째 문자열(예: canonical 모델명)

        Returns:
            코사인 유사도 [0, 1]. 임베딩 미사용 시 1.0 반환.
        """
        sims = self.compute_similarities_batch(text1, [text2])
        return sims[0] if sims else (1.0 if self._embedder is None else 0.0)

    def compute_similarities_batch(self, query: str, canonicals: list[str]) -> list[float]:
        """
        질의와 여러 canonical 간 코사인 유사도를 1회 인코딩으로 계산합니다.

        Args:
            query: 질의 문자열
            canonicals: canonical 모델명 리스트

        Returns:
            각 canonical에 대한 유사도 리스트
        """
        if self._embedder is None or not canonicals:
            return [1.0] * len(canonicals) if canonicals else []
        q = normalize_text(query) or query
        if not q:
            return [0.0] * len(canonicals)
        texts = [q] + [normalize_text(c) or c for c in canonicals]
        embs = self._encode_texts(texts)
        q_emb = embs[0]
        return [float(np.dot(q_emb, embs[i + 1])) for i in range(len(canonicals))]



### 검증 규칙 (verify)


In [None]:
"""
UNKNOWN 결정 규칙 및 (옵션) 후보 제한 LLM 검증(verifier) 모듈입니다.

목표:
- 검색(retrieval) 결과가 불확실할 때, \"틀릴 바엔 UNKNOWN\"을 선택할 수 있도록 함
- 점수 threshold, 1-2위 margin, 입력 노이즈(BT 헤드폰 등) 신호를 반영
- (옵션) 상위 후보 목록 안에서만 선택하는 closed-book LLM verifier 제공
"""

from __future__ import annotations

import json
import re
from dataclasses import dataclass
from typing import Any, Optional, Set


# 질의 첫 토큰이 아래이면 generic 입력소스(TV IN, AV 등)로 간주. 알려진 브랜드 없을 때 UNKNOWN.
_GENERIC_INPUT_FIRST_TOKENS: frozenset[str] = frozenset(
    {"TV", "IN", "AV", "HDMI", "USB", "OPTICAL", "ARC", "AUX", "DIGITAL"}
)

UNKNOWN_LABEL = "UNKNOWN"


def _get_canonical_model_part(canonical: str) -> str:
    """canonical 'BRAND MODEL'에서 모델 부분만 추출합니다."""
    parts = canonical.upper().split()
    return " ".join(parts[1:]) if len(parts) > 1 else ""


def _compact_no_space(s: str) -> str:
    """공백 제거 후 문자열. ARC ULTRA vs ARCULTRA 매칭용."""
    return "".join(s.upper().split())


def _has_lexical_overlap(query: str, canonical: str, min_token_len: int = 2) -> bool:
    """
    질의와 canonical 간 의미 있는 어휘 중첩이 있는지 검사합니다.

    기기명 블랙리스트 없이, 질의가 사운드바 DB 후보와 무관할 때 UNKNOWN을
    선택하는 데 사용합니다. (예: "SKY Q" vs "SONY HTS40R" -> 중첩 없음 -> UNKNOWN)
    ARC ULTRA vs ARCULTRA: 공백 제거 후 동일하면 중첩으로 인정합니다.

    Args:
        query: 정규화된 질의 문자열
        canonical: "BRAND MODEL" 형식의 canonical 또는 model_part
        min_token_len: 중첩 판별 시 최소 토큰 길이(단일 문자 제외)

    Returns:
        중첩 있으면 True, 없으면 False
    """
    # 공백 제거 정규화: ARC ULTRA == ARCULTRA
    q_compact = _compact_no_space(query)
    c_compact = _compact_no_space(canonical)
    if q_compact and c_compact and (q_compact in c_compact or c_compact in q_compact):
        return True

    q_tokens = [t for t in query.upper().split() if len(t) >= min_token_len]
    c_tokens = [t for t in canonical.upper().split() if len(t) >= min_token_len]
    if not q_tokens or not c_tokens:
        return False
    for q in q_tokens:
        for c in c_tokens:
            if q in c or c in q:
                return True
    return False


def _levenshtein_distance(a: str, b: str) -> int:
    """
    Levenshtein distance(편집 거리)를 계산합니다.
    오타/유사 모델 overlap 판별에 사용합니다.
    """
    if a == b:
        return 0
    if not a:
        return len(b)
    if not b:
        return len(a)
    if len(a) < len(b):
        a, b = b, a
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        cur = [i]
        for j, cb in enumerate(b, start=1):
            cur.append(
                min(
                    cur[j - 1] + 1,
                    prev[j] + 1,
                    prev[j - 1] + (0 if ca == cb else 1),
                )
            )
        prev = cur
    return prev[-1]


def _has_typo_overlap(query_part: str, canon_part: str, max_edit_distance: int = 2) -> bool:
    """
    오타/유사 모델 overlap을 검사합니다.
    LG US20A vs LG S20A, LG NS60TR vs LG S60TR 등 1~2자 차이 허용.

    Args:
        query_part: 질의의 모델 부분
        canon_part: canonical의 모델 부분
        max_edit_distance: 허용 최대 편집 거리

    Returns:
        오타 수준 overlap이면 True
    """
    q = (query_part or "").upper().replace(" ", "")
    c = (canon_part or "").upper().replace(" ", "")
    if not q or not c:
        return False
    # 3자 이상 공통 서브스트링
    for ln in range(3, min(len(q), len(c)) + 1):
        for i in range(len(q) - ln + 1):
            sub = q[i : i + ln]
            if sub in c:
                return True
    # Levenshtein 거리 <= 2 (길이 비슷할 때)
    if abs(len(q) - len(c)) <= 2 and _levenshtein_distance(q, c) <= max_edit_distance:
        return True
    return False


def _extract_model_numbers(text: str) -> list[int]:
    """모델명에서 숫자 시퀀스를 추출합니다. ENCHANT900→[900], S20A→[20]."""
    parts = re.findall(r"\d+", (text or "").upper())
    return [int(p) for p in parts if p]


def _pick_closest_model_candidate(
    query: str, candidates: list[RetrievedCandidate]
) -> Optional[RetrievedCandidate]:
    """
    small_margin 시 쿼리와 모델 번호가 가장 가까운 후보를 선택합니다.
    ENCHANT900 → ENCHANT800(800) 선호, ENCHANT1300(1300) 비선호.
    """
    q_nums = _extract_model_numbers(query)
    if not q_nums:
        return candidates[0] if candidates else None
    q_num = q_nums[-1]  # 주로 마지막 숫자가 모델 번호(900, 800 등)
    best: Optional[tuple[RetrievedCandidate, int]] = None
    for c in candidates:
        canon_part = _get_canonical_model_part(c.canonical) or c.canonical
        c_nums = _extract_model_numbers(canon_part)
        if not c_nums:
            continue
        c_num = c_nums[-1]
        diff = abs(q_num - c_num)
        if best is None or diff < best[1]:
            best = (c, diff)
    return best[0] if best else (candidates[0] if candidates else None)


def _has_model_part_overlap(query: str, canonical: str, min_substring_len: int = 4) -> bool:
    """
    질의와 canonical의 모델 부분 간 거의 일치 수준 중첩이 있는지 검사합니다.

    브랜드를 참조하지 못할 때, 모델명이 거의 일치해야만 예측하도록 합니다.
    (예: "SKY Q" vs "SONOS ARC" → model "ARC"와 중첩 없음 → UNKNOWN)
    (예: "CINEBAR 11" vs "JBL CINEMASB110" → "CINE" 등 4자 이상 중첩 → accept)

    Args:
        query: 정규화된 질의 문자열
        canonical: "BRAND MODEL" 형식의 canonical
        min_substring_len: 실질적 일치로 인정할 최소 중첩 길이

    Returns:
        모델 부분과 실질적 중첩 있으면 True
    """
    parts = canonical.upper().split()
    model_part = " ".join(parts[1:]) if len(parts) > 1 else ""
    if not model_part:
        return False
    q_tokens = [t for t in query.upper().split() if len(t) >= min_substring_len]
    if not q_tokens:
        return False
    for q in q_tokens:
        for i in range(len(q) - min_substring_len + 1):
            sub = q[i : i + min_substring_len]
            if sub in model_part:
                return True
        if q in model_part or model_part in q:
            return True
    return False


@dataclass(frozen=True)
class VerificationConfig:
    """검증/결정 로직 설정입니다."""

    accept_score_threshold: float = 0.52  # 0.55→0.52 (LG NS60TR 등 BT 가중치 완화)
    margin_threshold: float = 0.05
    prefer_unknown_on_uncertain: bool = True
    allow_bt_only_prediction: bool = False
    unknown_label: str = UNKNOWN_LABEL
    known_brands: Optional[Set[str]] = None  # 사운드바 DB 브랜드 집합, NER 판별용


@dataclass(frozen=True)
class Prediction:
    """
    최종 예측 결과입니다.

    Attributes:
        canonical_model: 예측된 canonical 모델명. UNKNOWN이면 None.
        confidence: [0, 1] 범위의 대략적 신뢰도(휴리스틱).
        evidence: 근거(입력 질의, 상위 후보/점수 등).
    """

    canonical_model: Optional[str]
    confidence: float
    evidence: dict[str, Any]


class RuleBasedVerifier:
    """
    retrieval 결과를 기반으로 UNKNOWN/모델을 결정하는 규칙 기반 verifier 입니다.

    Public API:
    - `verify(queries, candidates) -> Prediction`
    """

    def __init__(
        self,
        config: Optional[VerificationConfig] = None,
        llm_verifier: Optional["ClosedBookLLMVerifier"] = None,
    ) -> None:
        """
        Args:
            config: 검증 설정
            llm_verifier: (옵션) closed-book LLM verifier
        """
        self.config = config or VerificationConfig()
        self.llm_verifier = llm_verifier

    def verify(self, queries: list[LogQuery], candidates: list[RetrievedCandidate]) -> Prediction:
        """
        후보 리스트에서 최종 모델 또는 UNKNOWN을 선택합니다.

        Args:
            queries: 로그에서 추출된 질의 리스트
            candidates: retrieval 결과 (score 내림차순 권장)

        Returns:
            Prediction
        """
        primary = choose_primary_query(queries)
        if primary is None:
            return Prediction(
                canonical_model=None,
                confidence=1.0,
                evidence={"reason": "no_valid_query"},
            )

        if not candidates:
            return Prediction(
                canonical_model=None,
                confidence=1.0,
                evidence={"reason": "no_candidates", "primary_query": primary.query},
            )

        top1 = candidates[0]
        top2 = candidates[1] if len(candidates) > 1 else None
        margin = (top1.score - top2.score) if top2 is not None else 1.0

        # 룰: 브랜드 없이 "SOUNDBAR" 또는 "SOUND BAR" 단독 → ETC SOUNDBAR
        q_compact = _compact_no_space(primary.query)
        if q_compact == "SOUNDBAR":
            return Prediction(
                canonical_model="ETC SOUNDBAR",
                confidence=1.0,
                evidence={
                    "reason": "generic_soundbar_rule",
                    "primary_query": primary.query,
                },
            )

        # Entity-based: display primary(queries[0])이 비사운드바 기기면 UNKNOWN
        # APPLE TV, SKY Q, ORANGE 디코더 등 입력 소스는 사운드바가 아님
        if queries and is_non_soundbar_device(queries[0].query):
            return Prediction(
                canonical_model=None,
                confidence=0.85,
                evidence={
                    "reason": "non_soundbar_device",
                    "primary_query": queries[0].query,
                    "top1": top1.__dict__,
                },
            )

        # NER 기반: 질의에서 브랜드 추출 (known_brands = 사운드바 DB 브랜드 집합)
        known_brands = self.config.known_brands or set()
        extraction = (
            extract_brand_from_query(primary.query, known_brands) if known_brands else None
        )
        query_has_known_brand = extraction is not None and extraction.brand is not None

        # 브랜드 없음: no_known_brand로 UNKNOWN 반환하지 않음.
        # 모델명 + 사운드바 DB 간 코사인 유사도로 매칭(agent의 cosine gate에서 처리, 공백 제거 적용)

        soundbar_hint_re = re.compile(r"\b(SOUNDBAR|SOUND\s*BAR)\b")
        # overlap 검사 제거: cosine 유사도(0.70~0.85)로만 accept/reject 결정 (agent cosine gate)

        # "브랜드 + SOUNDBAR"처럼 모델이 부정확한 입력은, 브랜드의 generic soundbar 엔트리를 우대
        # - 예: "LG SOUND BAR" -> "LG LGSOUNDBAR" (DB에 존재하는 경우)
        generic_soundbar_relax_threshold = 0.45
        if soundbar_hint_re.search(primary.query):
            # query의 첫 토큰을 브랜드로 가정 (예: "LG LG SOUND BAR"의 첫 토큰 "LG")
            parts = primary.query.split()
            brand = parts[0] if parts else ""
            if brand and top1.canonical.startswith(f"{brand} "):
                # canonical 안에 "SOUNDBAR"가 포함된 generic 모델이면 낮은 threshold로 accept
                if "SOUNDBAR" in top1.canonical and top1.score >= generic_soundbar_relax_threshold:
                    return Prediction(
                        canonical_model=top1.canonical,
                        confidence=float(min(1.0, max(0.0, top1.score + 0.25))),
                        evidence={
                            "reason": "generic_soundbar_accept",
                            "primary_query": primary.query,
                            "top1": top1.__dict__,
                        },
                    )

        # BT만 있는 경우(헤드폰/이어폰 가능성이 높음) 보수적으로 UNKNOWN
        if (
            primary.source == "NAME_BT"
            and bt_looks_like_non_soundbar(primary.query)
            and not self.config.allow_bt_only_prediction
        ):
            # 매우 높은 점수면 예외 허용
            if top1.score < max(self.config.accept_score_threshold, 0.75):
                return Prediction(
                    canonical_model=None,
                    confidence=0.9,
                    evidence={
                        "reason": "bt_noise",
                        "primary_query": primary.query,
                        "top1": top1.__dict__,
                    },
                )

        # threshold 미만이면 UNKNOWN
        if top1.score < self.config.accept_score_threshold:
            return Prediction(
                canonical_model=None,
                confidence=1.0 - min(1.0, top1.score),
                evidence={
                    "reason": "below_threshold",
                    "primary_query": primary.query,
                    "top1": top1.__dict__,
                    "threshold": self.config.accept_score_threshold,
                },
            )

        # margin이 작으면 불확실 -> (옵션) LLM 검증
        # overlap 기반 margin 완화 제거, cosine gate에서 처리
        margin_thresh = self.config.margin_threshold
        if top2 is not None and margin < margin_thresh:
            if self.llm_verifier is not None:
                selected = self.llm_verifier.select(primary.query, candidates[:5])
                if selected is None:
                    return Prediction(
                        canonical_model=None,
                        confidence=0.6,
                        evidence={
                            "reason": "llm_unknown",
                            "primary_query": primary.query,
                            "top5": [c.__dict__ for c in candidates[:5]],
                        },
                    )
                return Prediction(
                    canonical_model=selected,
                    confidence=0.65,
                    evidence={
                        "reason": "llm_selected",
                        "primary_query": primary.query,
                        "selected": selected,
                        "top5": [c.__dict__ for c in candidates[:5]],
                    },
                )

            if self.config.prefer_unknown_on_uncertain:
                # cosine 기반: 모델 번호가 쿼리와 가장 가까운 후보 선택 (ENCHANT900→ENCHANT800 등)
                closest = _pick_closest_model_candidate(primary.query, candidates[:5])
                if closest is not None:
                    return Prediction(
                        canonical_model=closest.canonical,
                        confidence=0.6,
                        evidence={
                            "reason": "small_margin_closest_model",
                            "primary_query": primary.query,
                            "selected": closest.__dict__,
                            "margin": margin,
                            "margin_threshold": self.config.margin_threshold,
                        },
                    )
                return Prediction(
                    canonical_model=None,
                    confidence=0.55,
                    evidence={
                        "reason": "small_margin",
                        "primary_query": primary.query,
                        "top1": top1.__dict__,
                        "top2": top2.__dict__,
                        "margin": margin,
                        "margin_threshold": self.config.margin_threshold,
                    },
                )

        # 기본: top1 채택
        confidence = float(min(1.0, max(0.0, top1.score)))
        return Prediction(
            canonical_model=top1.canonical,
            confidence=confidence,
            evidence={
                "reason": "accepted",
                "primary_query": primary.query,
                "top1": top1.__dict__,
                "margin": margin,
            },
        )


class ClosedBookLLMVerifier:
    """
    후보 목록(topN) 안에서만 답하도록 강제하는 LLM verifier 입니다.

    이 구현은 transformers 기반 로컬 모델을 가정하며, 미설치/미로드 환경에서는
    ImportError/RuntimeError를 발생시킵니다.

    Public API:
    - `select(observed, candidates) -> canonical|None`
    """

    def __init__(
        self,
        model_id: str,
        *,
        device: str = "auto",
        max_new_tokens: int = 64,
        temperature: float = 0.0,
    ) -> None:
        """
        Args:
            model_id: HuggingFace 모델 ID (예: "Qwen/Qwen2.5-1.5B-Instruct")
            device: "auto" 또는 "cpu"/"cuda" 등
            max_new_tokens: 생성 토큰 수
            temperature: 샘플링 온도(0이면 거의 결정적)
        """
        self.model_id = model_id
        self.device = device
        self.max_new_tokens = max_new_tokens
        self.temperature = temperature
        self._generator = self._init_generator()

    def _init_generator(self):
        """transformers text-generation 파이프라인을 초기화합니다."""
        try:
            from transformers import pipeline  # type: ignore
        except Exception as e:  # noqa: BLE001
            raise ImportError(
                "transformers가 필요합니다. `pip install transformers`로 설치해 주세요."
            ) from e

        # device="auto" 지원 여부는 버전에 따라 다를 수 있어 예외를 허용
        try:
            return pipeline(
                "text-generation",
                model=self.model_id,
                device_map=self.device,
            )
        except Exception:
            # fallback: device_map 없이 시도
            return pipeline("text-generation", model=self.model_id)

    def select(
        self, observed: str, candidates: list[RetrievedCandidate], unknown_label: str = UNKNOWN_LABEL
    ) -> Optional[str]:
        """
        관측 문자열과 후보 목록을 입력으로 받아, 후보 중 하나 또는 UNKNOWN을 선택합니다.

        Args:
            observed: 관측된 기기명/질의 문자열
            candidates: 상위 후보 리스트(보통 Top5)
            unknown_label: UNKNOWN 라벨 문자열

        Returns:
            선택된 canonical 문자열 또는 None(UNKNOWN)
        """
        if not candidates:
            return None

        # closed-book 강제를 위해 후보 목록을 명시하고, JSON으로만 답하게 지시
        options = [c.canonical for c in candidates]
        prompt = self._build_prompt(observed, options, unknown_label=unknown_label)
        out = self._generator(
            prompt,
            max_new_tokens=self.max_new_tokens,
            do_sample=self.temperature > 0.0,
            temperature=self.temperature,
        )
        text = self._extract_generated_text(out)
        selected = self._parse_selection(text, options, unknown_label=unknown_label)
        return selected

    def _build_prompt(self, observed: str, options: list[str], unknown_label: str) -> str:
        """LLM 프롬프트를 생성합니다."""
        payload = {
            "observed": observed,
            "options": options,
            "rules": [
                "정답은 options 중 하나를 그대로 선택하거나 UNKNOWN을 선택한다.",
                "options에 없는 문자열을 생성하지 않는다.",
                "출력은 JSON 한 줄만 반환한다: {\"choice\": \"...\", \"reason\": \"...\"}",
            ],
        }
        return (
            "다음은 사운드바 모델 매핑 문제이다.\n"
            f"{json.dumps(payload, ensure_ascii=False)}\n"
            "JSON으로만 답해라.\n"
        )

    def _extract_generated_text(self, out: Any) -> str:
        """transformers pipeline 출력에서 생성 텍스트를 추출합니다."""
        if isinstance(out, list) and out:
            item = out[0]
            if isinstance(item, dict) and "generated_text" in item:
                return str(item["generated_text"])
        return str(out)

    def _parse_selection(
        self, generated_text: str, options: list[str], unknown_label: str
    ) -> Optional[str]:
        """
        생성 결과에서 choice를 파싱합니다.

        JSON 파싱 실패 시, options에 대한 문자열 포함 여부로 보조 추정합니다.
        """
        # 가장 마지막 JSON 객체를 찾기 위해 간단히 {...} 블록을 스캔
        m = re.findall(r"\{[\s\S]*?\}", generated_text)
        for blob in reversed(m):
            try:
                obj = json.loads(blob)
                choice = str(obj.get("choice", "")).strip()
                if not choice or choice.upper() == unknown_label:
                    return None
                if choice in options:
                    return choice
            except Exception:
                continue

        # fallback: 옵션 문자열이 그대로 포함되어 있으면 그 중 첫 매칭 반환
        for opt in options:
            if opt in generated_text:
                return opt
        return None



### 데이터 로더 (load_hdmi_bt_log_csv)


In [None]:
from dataclasses import dataclass
from pathlib import Path
import pandas as pd

@dataclass(frozen=True)
class CsvLoadResult:
    path: Path
    df: pd.DataFrame

def load_hdmi_bt_log_csv(path: Path, encoding: str = "utf-8") -> CsvLoadResult:
    try:
        df = pd.read_csv(path, encoding=encoding)
    except FileNotFoundError:
        raise
    except UnicodeDecodeError as e:
        raise OSError(f"CSV 인코딩 오류: {path}") from e
    except Exception as e:
        raise OSError(f"CSV 로드 실패: {path}") from e
    required_cols = ["NAME_BT", "NAME1", "BRAND1", "NAME2", "BRAND2", "NAME3", "BRAND3", "NAME4", "BRAND4"]
    missing = [c for c in required_cols if c not in df.columns]
    if missing:
        raise ValueError(f"필수 컬럼이 없습니다: {missing}")
    return CsvLoadResult(path=path, df=df)


### Agent (SoundbarModelAgent)


In [None]:
"""
사운드바 모델 매핑 Agent(오케스트레이션) 모듈입니다.

파이프라인:
1) 사운드바 DB 로드/정규화
2) 로그 행에서 후보 질의 생성
3) 하이브리드 검색(TF-IDF + 임베딩)으로 후보 TopK 생성
4) verifier로 UNKNOWN/모델 결정
"""

from __future__ import annotations

from dataclasses import dataclass
from difflib import SequenceMatcher
from pathlib import Path
from typing import Any, Optional

import pandas as pd

    LogQuery,
    TypedQuery,
    build_log_queries_from_row,
    build_typed_queries_from_row,
    choose_primary_query,
)
    Prediction,
    RuleBasedVerifier,
    VerificationConfig,
    _get_canonical_model_part,
)


@dataclass(frozen=True)
class AgentConfig:
    """Agent 구성/하이퍼파라미터 설정입니다."""

    top_n_lexical: int = 200
    top_k: int = 20
    embedding_model_id: Optional[str] = "BAAI/bge-small-en-v1.5"
    use_embeddings: bool = True
    include_bt: bool = True
    accept_score_threshold: float = 0.52  # 0.55→0.52 (LG NS60TR 등 BT 가중치 완화)
    margin_threshold: float = 0.05
    accuracy_mode: bool = False  # True: NER+임베딩 전용, 정확도 우선
    min_cosine_similarity: Optional[float] = 0.85  # primary_query vs top1 직접 유사도. None이면 비활성
    cosine_relaxed_with_overlap: float = 0.70  # overlap 있을 때 fallback 검사용 완화 임계치 (0.65→0.70, 오탐 감소)
    cosine_gate_max_candidates: int = 5  # cosine gate에서 검사하는 최대 후보 수
    min_lexical_similarity: Optional[float] = 0.3  # 모델 파트 렉시컬 유사도 미만이면 UNKNOWN. None이면 비활성


@dataclass(frozen=True)
class PredictionResult:
    """
    Agent의 단일 row 예측 결과입니다.

    Attributes:
        row_id: 원본 데이터 행 식별자(없으면 None)
        predicted: 예측 canonical 모델명(UNKNOWN이면 None)
        confidence: 신뢰도
        primary_query: 대표 질의(정규화 문자열)
        candidates: 상위 후보 리스트(내림차순)
        evidence: verifier에서 반환한 근거 dict
    """

    row_id: Optional[int]
    predicted: Optional[str]
    confidence: float
    primary_query: str
    candidates: list[RetrievedCandidate]
    evidence: dict[str, Any]


class SoundbarModelAgent:
    """
    사운드바 모델 매핑 Agent 입니다.

    Public API:
    - `predict_row(row_dict, row_id=None) -> PredictionResult`
    - `batch_predict(df) -> pd.DataFrame`
    """

    def __init__(self, soundbar_list_py: Path, config: Optional[AgentConfig] = None) -> None:
        """
        Args:
            soundbar_list_py: `soundbar_list.py` 경로
            config: AgentConfig
        """
        self.soundbar_list_py = soundbar_list_py
        self.config = config or AgentConfig()

        self._records: list[SoundbarRecord] = []
        self._brand_set: set[str] = set()
        if self.config.accuracy_mode:
            self._embedding_retriever = EmbeddingRetriever(
                embedding_model_id=self.config.embedding_model_id or "BAAI/bge-small-en-v1.5",
                top_k=self.config.top_k,
            )
            self._hybrid_retriever: Optional[HybridRetriever] = None
        else:
            self._embedding_retriever = None
            self._hybrid_retriever = HybridRetriever(
                top_n_lexical=self.config.top_n_lexical,
                top_k=self.config.top_k,
                embedding_model_id=self.config.embedding_model_id,
                use_embeddings=self.config.use_embeddings,
            )
        self._verifier: Optional[RuleBasedVerifier] = None

        self._load_and_build()

    def _load_and_build(self) -> None:
        """사운드바 DB 로드 및 인덱스를 구축합니다."""
        self._records = load_soundbar_db_from_py(self.soundbar_list_py)
        self._brand_set = get_brand_set(self._records)
        if self._embedding_retriever is not None:
            self._embedding_retriever.fit(self._records)
        elif self._hybrid_retriever is not None:
            self._hybrid_retriever.fit(self._records)

        self._verifier = RuleBasedVerifier(
            config=VerificationConfig(
                accept_score_threshold=self.config.accept_score_threshold,
                margin_threshold=self.config.margin_threshold,
                known_brands=self._brand_set,
            )
        )

    @property
    def brand_set(self) -> set[str]:
        """DB로부터 추출된 브랜드 집합을 반환합니다."""
        return set(self._brand_set)

    def _embedding_cands_to_retrieved(
        self, cands: list, weights: float = 1.0
    ) -> list[RetrievedCandidate]:
        """EmbeddingCandidate를 RetrievedCandidate로 변환합니다."""
        out: list[RetrievedCandidate] = []
        for c in cands:
            wscore = float(c.score) * float(weights)
            out.append(
                RetrievedCandidate(
                    canonical=c.canonical,
                    score=wscore,
                    lexical_score=0.0,
                    semantic_score=c.score,
                )
            )
        return out

    def _retrieve_accuracy(self, queries: list[LogQuery]) -> list[RetrievedCandidate]:
        """정확도 모드: NER로 브랜드 추출 후 임베딩 검색."""
        best: dict[str, RetrievedCandidate] = {}
        best_weighted: dict[str, float] = {}

        for q in queries:
            extraction = extract_brand_from_query(q.query, self._brand_set)
            search_query = extraction.model_part or extraction.original_query
            if not search_query.strip():
                continue
            cands = self._embedding_retriever.retrieve(
                search_query,
                brand_filter=extraction.brand,
                top_k=self.config.top_k,
            )
            for c in cands:
                wscore = float(c.score) * float(q.weight)
                prev = best_weighted.get(c.canonical)
                if prev is None or wscore > prev:
                    best_weighted[c.canonical] = wscore
                    best[c.canonical] = RetrievedCandidate(
                        canonical=c.canonical,
                        score=wscore,
                        lexical_score=0.0,
                        semantic_score=c.score,
                    )

        merged = list(best.values())
        merged.sort(key=lambda x: x.score, reverse=True)
        return merged

    def _retrieve_for_queries(self, queries: list[LogQuery]) -> list[RetrievedCandidate]:
        """
        여러 query에 대해 retrieval을 수행하고 결과를 통합합니다.

        통합 방식:
        - canonical별로 최고(weighted) 점수를 유지
        - 최종 점수 = candidate.score * query.weight
        """
        if self._embedding_retriever is not None:
            return self._retrieve_accuracy(queries)

        best: dict[str, RetrievedCandidate] = {}
        best_weighted: dict[str, float] = {}

        for q in queries:
            cands = self._hybrid_retriever.retrieve(
                q.query,
                top_n_lexical=self.config.top_n_lexical,
                top_k=self.config.top_k,
            )
            for c in cands:
                wscore = float(c.score) * float(q.weight)
                prev = best_weighted.get(c.canonical)
                if prev is None or wscore > prev:
                    best_weighted[c.canonical] = wscore
                    best[c.canonical] = RetrievedCandidate(
                        canonical=c.canonical,
                        score=wscore,
                        lexical_score=float(c.lexical_score),
                        semantic_score=c.semantic_score,
                    )

        merged = list(best.values())
        merged.sort(key=lambda x: x.score, reverse=True)
        return merged

    def _apply_cosine_gate(
        self,
        pred: Prediction,
        primary_query: str,
        candidates: list[RetrievedCandidate],
    ) -> Prediction:
        """
        UNKNOWN이 아닌 예측에 대해 2단계 검증을 수행합니다:
        1) 어휘/서브스트링 중첩: primary_query와 예측 모델 간 중첩이 없으면 UNKNOWN
        2) 코사인 유사도: primary_query vs canonical 간 유사도가 임계치 미만이면 UNKNOWN
        3) top1이 거절되면 overlap+cosine 통과하는 다른 후보로 fallback (ARC ULTRA → ARCULTRA 등)

        사운드바가 아닌 기기(SKY, EE TV 등) 오탐을 줄이며, 정답이 top2~k에 있는 경우도 반영합니다.
        """
        # below_threshold/small_margin: top1 실패 시 cosine 통과하는 다른 후보 시도
        if pred.canonical_model is None and candidates and pred.evidence.get("reason") in (
            "below_threshold",
            "small_margin",
        ):
            pseudo = Prediction(
                canonical_model=candidates[0].canonical,
                confidence=pred.confidence,
                evidence=pred.evidence,
            )
            pred = self._apply_cosine_gate(pseudo, primary_query, candidates)

        if pred.canonical_model is None:
            return pred

        # generic_soundbar_rule(SOUNDBAR→ETC SOUNDBAR): 규칙 기반이므로 cosine gate로 덮어쓰지 않음
        if pred.evidence.get("reason") == "generic_soundbar_rule":
            return pred

        retriever = self._embedding_retriever or self._hybrid_retriever
        threshold = self.config.min_cosine_similarity
        min_lex = getattr(self.config, "min_lexical_similarity", None)

        # 질의 모델 파트 추출 (SONOS ARC ULTRA → ARCULTRA, canonical 모델 파트와 비교)
        extraction = extract_brand_from_query(primary_query, self._brand_set)
        q_model_part = (extraction.model_part or extraction.original_query or "").strip()
        q_compact = "".join(q_model_part.upper().split()) if q_model_part else ""

        # cosine 통과하는 후보 중 선택 (상위 N개 검사, 통과 시 렉시컬 재순위화)
        max_n = getattr(self.config, "cosine_gate_max_candidates", 5) or 5
        candidates_to_check = candidates[:max_n]

        # 배치 유사도 계산 (1~2회 encode로 N개 후보 처리, 속도 개선)
        passing: list[tuple[RetrievedCandidate, float]] = []
        if threshold is None or retriever is None:
            passing = [(c, 1.0) for c in candidates_to_check]
        else:
            canonicals_to_check = [c.canonical for c in candidates_to_check]
            try:
                batch_sims = (
                    retriever.compute_similarities_batch(primary_query, canonicals_to_check)
                    if hasattr(retriever, "compute_similarities_batch")
                    else [retriever.compute_similarity(primary_query, canon) for canon in canonicals_to_check]
                )
                if q_compact and hasattr(retriever, "compute_similarities_batch"):
                    mp_compacts = [
                        "".join((_get_canonical_model_part(c.canonical) or c.canonical or "").upper().split())
                        for c in candidates_to_check
                    ]
                    compact_sims = retriever.compute_similarities_batch(q_compact, mp_compacts)
                    batch_sims = [max(s, cs) for s, cs in zip(batch_sims, compact_sims)]
                for c, sim in zip(candidates_to_check, batch_sims):
                    mp_compact = "".join((_get_canonical_model_part(c.canonical) or c.canonical or "").upper().split())
                    if q_compact and mp_compact and q_compact == mp_compact:
                        passing.append((c, 1.0))
                        continue
                    if sim >= threshold:
                        passing.append((c, sim))
            except Exception:
                pass

        # 렉시컬 재순위화: cosine 동일 시 모델 파트 문자열 유사도로 우선순위
        def _lex_ratio(cand: RetrievedCandidate) -> float:
            mp = "".join(
                (_get_canonical_model_part(cand.canonical) or cand.canonical or "").upper().split()
            )
            return SequenceMatcher(None, q_compact, mp).ratio() if q_compact and mp else 0.0

        passing_with_lex = [(c, sim, _lex_ratio(c)) for c, sim in passing]
        passing_with_lex.sort(key=lambda x: (x[1], x[2]), reverse=True)  # cosine desc, lex desc
        best = (passing_with_lex[0][0], passing_with_lex[0][1], passing_with_lex[0][2]) if passing_with_lex else None

        # 렉시컬 거부 게이트: 모델 파트 유사도가 매우 낮으면 UNKNOWN (YAMAHA RX-A1040 vs SRB40 등)
        if best is not None and min_lex is not None and best[2] < min_lex:
            top1 = candidates[0] if candidates else None
            return Prediction(
                canonical_model=None,
                confidence=0.85,
                evidence={
                    "reason": "low_lexical_similarity",
                    "primary_query": primary_query,
                    "top1": top1.__dict__ if top1 else None,
                    "lexical_ratio": round(best[2], 4),
                    "threshold": min_lex,
                },
            )

        if best is not None:
            c, sim = best[0], best[1]
            return Prediction(
                canonical_model=c.canonical,
                confidence=float(min(1.0, max(0.0, c.score))),
                evidence={
                    "reason": "cosine_gate_accepted",
                    "primary_query": primary_query,
                    "selected": c.__dict__,
                    "cosine_similarity": round(sim, 4) if sim > 0 else None,
                },
            )

        # 모든 후보가 통과 실패 → UNKNOWN (cosine 기반)
        top1 = candidates[0] if candidates else None
        reason = "low_cosine_similarity"
        first_sim = 0.0
        if top1 and retriever and threshold is not None:
            try:
                first_sim = retriever.compute_similarity(primary_query, top1.canonical)
            except Exception:
                pass
        return Prediction(
            canonical_model=None,
            confidence=0.85,
            evidence={
                "reason": reason,
                "primary_query": primary_query,
                "top1": top1.__dict__ if top1 else None,
                "cosine_similarity": round(first_sim, 4) if first_sim > 0 else None,
                "threshold": threshold,
            },
        )

    def predict_for_query(self, typed: TypedQuery) -> PredictionResult:
        """
        단일 TypedQuery에 대해 사운드바 모델을 예측합니다.

        Args:
            typed: TypedQuery (type_, source, query, raw, weight)

        Returns:
            PredictionResult
        """
        log_q = LogQuery(query=typed.query, raw=typed.raw, source=typed.source, weight=typed.weight)
        if self._embedding_retriever is not None:
            extraction = extract_brand_from_query(typed.query, self._brand_set)
            search_query = extraction.model_part or extraction.original_query
            cands = self._embedding_retriever.retrieve(
                search_query,
                brand_filter=extraction.brand,
                top_k=self.config.top_k,
            )
            weighted = self._embedding_cands_to_retrieved(cands, typed.weight)
        else:
            cands = self._hybrid_retriever.retrieve(
                typed.query,
                top_n_lexical=self.config.top_n_lexical,
                top_k=self.config.top_k,
            )
            weighted = [
                RetrievedCandidate(
                    canonical=c.canonical,
                    score=float(c.score) * float(typed.weight),
                    lexical_score=float(c.lexical_score) if c.lexical_score is not None else 0.0,
                    semantic_score=c.semantic_score,
                )
                for c in cands
            ]
        pred: Prediction = self._verifier.verify([log_q], weighted)
        # small_margin이면 verifier가 불확실한 것 → cosine gate로 후보 재검토
        if (
            pred.canonical_model is None
            and pred.evidence.get("reason") == "small_margin"
            and weighted
        ):
            pseudo = Prediction(
                canonical_model=weighted[0].canonical,
                confidence=pred.confidence,
                evidence=pred.evidence,
            )
            pred = self._apply_cosine_gate(pseudo, typed.query, weighted)
        else:
            pred = self._apply_cosine_gate(pred, typed.query, weighted)
        return PredictionResult(
            row_id=None,
            predicted=pred.canonical_model,
            confidence=float(pred.confidence),
            primary_query=typed.raw,
            candidates=weighted,
            evidence=dict(pred.evidence),
        )

    def predict_row(self, row: dict[str, Any], *, row_id: Optional[int] = None) -> PredictionResult:
        """
        단일 로그 row(dict)에 대해 사운드바 모델을 예측합니다.
        (기존 통합 방식: 모든 소스 후보를 합쳐 1개 예측)

        Args:
            row: CSV row를 dict로 변환한 값
            row_id: 행 식별자(선택)

        Returns:
            PredictionResult
        """
        queries = build_log_queries_from_row(
            row,
            soundbar_brand_set=self._brand_set,
            include_bt=self.config.include_bt,
        )
        primary_log = choose_primary_query(queries)
        primary = primary_log.query if primary_log else (queries[0].query if queries else "")
        candidates = self._retrieve_for_queries(queries)
        pred: Prediction = self._verifier.verify(queries, candidates)
        pred = self._apply_cosine_gate(pred, primary, candidates)
        out_primary = primary_log.raw if primary_log else primary
        return PredictionResult(
            row_id=row_id,
            predicted=pred.canonical_model,
            confidence=float(pred.confidence),
            primary_query=out_primary,
            candidates=candidates,
            evidence=dict(pred.evidence),
        )

    def batch_predict(self, df: pd.DataFrame, *, row_id_col: Optional[str] = None) -> pd.DataFrame:
        """
        여러 로그 row를 일괄 예측합니다 (기존 통합 방식: 행당 1개 예측).

        Args:
            df: 로그 DataFrame
            row_id_col: row id 컬럼명이 있으면 이를 사용(없으면 인덱스 사용)

        Returns:
            예측 결과 DataFrame
        """
        rows: list[dict[str, Any]] = df.to_dict(orient="records")
        out_rows: list[dict[str, Any]] = []
        for i, r in enumerate(rows):
            rid = i
            if row_id_col and row_id_col in df.columns:
                try:
                    rid = int(r.get(row_id_col))
                except Exception:
                    rid = i

            res = self.predict_row(r, row_id=rid)
            top1_score = res.candidates[0].score if res.candidates else None
            top5 = [c.canonical for c in res.candidates[:5]]
            top5_candidates = [c.__dict__ for c in res.candidates[:5]]
            out_rows.append(
                {
                    "row_id": rid,
                    "predicted_model": res.predicted if res.predicted is not None else "UNKNOWN",
                    "true_model": "",
                    "confidence": res.confidence,
                    "primary_query": res.primary_query,
                    "top1_score": top1_score,
                    "top5": json_dumps_safe(top5),
                    "top5_candidates": json_dumps_safe(top5_candidates),
                    "evidence": json_dumps_safe(res.evidence),
                }
            )
        return pd.DataFrame(out_rows)

    def batch_predict_per_source(self, df: pd.DataFrame, *, row_id_col: Optional[str] = None) -> pd.DataFrame:
        """
        여러 로그 row를 소스별(BT/HDMI)로 독립 예측합니다.
        행당 최대 5개(BT 1 + HDMI 4) 예측 결과를 반환합니다.

        Args:
            df: 로그 DataFrame
            row_id_col: row id 컬럼명이 있으면 이를 사용(없으면 인덱스 사용)

        Returns:
            예측 결과 DataFrame (row_id, type, predicted_model, true_model, primary_query 등)
        """
        rows: list[dict[str, Any]] = df.to_dict(orient="records")
        out_rows: list[dict[str, Any]] = []
        out_id = 0
        for i, r in enumerate(rows):
            rid = i
            if row_id_col and row_id_col in df.columns:
                try:
                    rid = int(r.get(row_id_col))
                except Exception:
                    rid = i

            typeds = build_typed_queries_from_row(
                r,
                soundbar_brand_set=self._brand_set,
                include_bt=self.config.include_bt,
            )
            for tq in typeds:
                res = self.predict_for_query(tq)
                top1_score = res.candidates[0].score if res.candidates else None
                top5 = [c.canonical for c in res.candidates[:5]]
                top5_candidates = [c.__dict__ for c in res.candidates[:5]]
                out_rows.append(
                    {
                        "id": out_id,
                        "row_id": rid,
                        "type": tq.type_,
                        "predicted_model": res.predicted if res.predicted is not None else "UNKNOWN",
                        "true_model": "",
                        "confidence": res.confidence,
                        "primary_query": res.primary_query,
                        "top1_score": top1_score,
                        "top5": json_dumps_safe(top5),
                        "top5_candidates": json_dumps_safe(top5_candidates),
                        "evidence": json_dumps_safe(res.evidence),
                    }
                )
                out_id += 1
        return pd.DataFrame(out_rows)


def json_dumps_safe(obj: Any) -> str:
    """
    JSON 직렬화를 안전하게 수행합니다.

    Args:
        obj: 임의 객체

    Returns:
        JSON 문자열(실패 시 repr)
    """
    import json

    try:
        return json.dumps(obj, ensure_ascii=False)
    except Exception:
        return repr(obj)



## 3. 데이터 로드 및 예측


In [None]:
import time
import pandas as pd

# CSV 로드
log_res = load_hdmi_bt_log_csv(INPUT_CSV, encoding="utf-8")
df_log = log_res.df
records = load_soundbar_db_from_py(SOUNDBAR_DB)
brand_set = get_brand_set(records)
print("로그 행 수:", len(df_log))
print("사운드바 DB 레코드 수:", len(records))

# Agent 생성 및 일괄 예측
config = AgentConfig(
    top_n_lexical=200,
    top_k=20,
    embedding_model_id="BAAI/bge-small-en-v1.5",
    use_embeddings=True,
    include_bt=True,
    accept_score_threshold=0.52,
    margin_threshold=0.05,
    accuracy_mode=True,
    min_cosine_similarity=0.85,
)
agent = SoundbarModelAgent(SOUNDBAR_DB, config=config)

t0 = time.perf_counter()
pred_df = agent.batch_predict(df_log)
elapsed = time.perf_counter() - t0
print(f"예측 완료: {len(pred_df)}행, 소요 시간: {elapsed:.1f}초")
display(pred_df.head(10))


## 4. 결과 저장


In [None]:
try:
    pred_df.to_csv(OUTPUT_PRED, index=False, encoding="utf-8")
    print("저장 완료:", OUTPUT_PRED)
except OSError as e:
    print("저장 실패:", e)
