# 1. 다중 Aspect 검출 로직

In [2]:
import pandas as pd
from pathlib import Path
import yaml
import re
from typing import Dict, Tuple

import re
import yaml
from pathlib import Path
from typing import Dict, List, Tuple

# ──────────────────────────────────────────────────────────────
# 1. 설정 값
# ──────────────────────────────────────────────────────────────
BRAND_YML = Path(
    "/content/drive/MyDrive/9. Lab/BARQA/Packaging & Download/Data/brands_0519.yaml"
)
ASPT_YML = Path(
    "/content/drive/MyDrive/9. Lab/BARQA/Packaging & Download/Data/aspects_0519.yaml"
)

# 삼성 기기에서만 의미가 있는 전용 어스펙트
SAMSUNG_REQ_ASP = {
    "Camera General", "Design General", "Chipset General",
    "Sustainability General", "Portrait Studio",
    "On-device", "Grip", "Thin", "Galaxy AI General",
}

SAMSUNG_KW_RX = re.compile(r"(?i)\b(?:samsung|samsung's|galaxy|갤럭시|삼성)\b")

# ──────────────────────────────────────────────────────────────
# 2. 보조 함수
# ──────────────────────────────────────────────────────────────
def _load_yaml(path: Path) -> Dict[str, str]:
    """YAML 파일을 dict로 로드"""
    with path.open(encoding="utf-8") as f:
        return yaml.safe_load(f)

def _compile_dict(src: Dict[str, str], flags: int = 0) -> Dict[str, re.Pattern]:
    """정규표현식 문자열 딕셔너리를 컴파일하여 반환"""
    return {k: re.compile(v, flags) for k, v in src.items()}

# ──────────────────────────────────────────────────────────────
# 3. 메인 클래스
# ──────────────────────────────────────────────────────────────
class BrandClassifier:
    def __init__(self) -> None:
        self.brand_rx  = _compile_dict(_load_yaml(BRAND_YML), flags=re.I)
        self.aspect_rx = _compile_dict(_load_yaml(ASPT_YML), flags=re.I)


    def classify(
        self, text: str, used_llm: bool = False
    ) -> Tuple[List[str], List[str], List[str], List[str]]:
        """문장에서 브랜드/어스펙트를 추출해 4-튜플로 반환"""

        brand_hits   = [(br, m.start()) for br, rx in self.brand_rx.items() if (m := rx.search(text))]
        aspect_hits  = [(ap, m.start()) for ap, rx in self.aspect_rx.items()  if (m := rx.search(text))]

        brand_hits.sort(key=lambda x: x[1])
        aspect_hits.sort(key=lambda x: x[1])

        detected_brands   = [b for b, _ in brand_hits]
        detected_aspects  = [a for a, _ in aspect_hits]

        # 2) 삼성 전용 어스펙트 보정
        final_brands, final_aspects = [], []
        for asp in detected_aspects:
            if asp not in SAMSUNG_REQ_ASP:
                # 일반 어스펙트 → 브랜드가 명시 안 돼도 Samsung으로 우선 지정
                final_brands.append("Samsung")
                final_aspects.append(asp)
            else:
                # 삼성 전용 어스펙트는 실제 삼성 언급이 있는지 확인
                if "Samsung" in detected_brands or SAMSUNG_KW_RX.search(text):
                    final_brands.append("Samsung")
                    final_aspects.append(asp)
                else:
                    # 무효 플래그만 남겨 휴먼 후처리 가능하게
                    final_brands.append("Unknown")
                    final_aspects.append("brand_required_aspect_detected")

        # 3) 어스펙트 없음 & 브랜드만 있는 경우 보완
        if not detected_aspects and detected_brands:
            final_brands  = list(set(detected_brands))
            final_aspects = final_brands.copy()

        # 4) 기본값 보정
        if not final_brands:
            final_brands = ["Unknown"]
        if not final_aspects:
            final_aspects = ["general"]

        # 5) 삼성 외 브랜드도 어스펙트로 추가
        others = [b for b in detected_brands if b != "Samsung"]
        final_brands.extend(others)
        final_aspects.extend(others)

        # 6) 정리: 중복·보조 토큰 제거
        final_brands  = [b for b in dict.fromkeys(final_brands)  if b != "Unknown"]
        final_aspects = [
            a for a in dict.fromkeys(final_aspects)
            if a not in {"Unknown", "brand_required_aspect_detected"}
        ]

        return final_brands, final_aspects, detected_brands, detected_aspects


# # YAML 경로
# BRAND_YML = Path("/content/drive/MyDrive/9. Lab/BARQA/Packaging & Download/Data/brands_0519.yaml")
# ASPT_YML  = Path("/content/drive/MyDrive/9. Lab/BARQA/Packaging & Download/Data/aspects_0519.yaml")

# def _load_yaml(path: Path) -> Dict[str, str]:
#     with path.open(encoding="utf-8") as f:
#         return yaml.safe_load(f)

# def _compile_dict(pattern_src: Dict[str, str],
#                   flags: int = 0) -> Dict[str, re.Pattern]:
#     return {k: re.compile(v, flags) for k, v in pattern_src.items()}

# class BrandClassifier:
#     def __init__(self) -> None:
#         brand_src   = _load_yaml(BRAND_YML)
#         aspect_src  = _load_yaml(ASPT_YML)

#         self.brand_rx  = _compile_dict(brand_src)
#         self.aspect_rx = _compile_dict(aspect_src)
#         self._samsung_kw_rx = re.compile(r"(?i)\b(?:samsung|samsung's|galaxy|갤럭시|삼성)\b")

#     def classify(self, text: str, used_llm: bool = False) -> Tuple[list, list]:
#         samsung_required_asp = [
#             'Camera General', 'Design General', 'Chipset General',
#             'Sustainability General', 'Portrait Studio',
#             'On-device', 'Grip', "Thin", "Galaxy AI General"
#         ]

#         brand_hits = []
#         aspect_hits = []

#         # 1. 모든 브랜드 탐색
#         for brand, rx in self.brand_rx.items():
#             match = rx.search(text)
#             if match:
#                 brand_hits.append((brand, match.start()))

#         # 2. 모든 어스펙트 탐색
#         for asp, arx in self.aspect_rx.items():
#             match = arx.search(text)
#             if match:
#                 aspect_hits.append((asp, match.start()))

#         # 3. 정렬 (등장 순서 기준)
#         brand_hits.sort(key=lambda x: x[1])
#         aspect_hits.sort(key=lambda x: x[1])

#         # 4. 리스트 추출
#         detected_brands = [b for b, _ in brand_hits]
#         detected_aspects = [a for a, _ in aspect_hits]

#         # 5. 로직 적용: 삼성 전용 aspect 처리
#         final_brands = []
#         final_aspects = []

#         for asp in detected_aspects:
#             if asp not in samsung_required_asp:
#                 final_aspects.append(asp)
#                 final_brands.append("Samsung")

#                 # 삼성 전용 아님 → 브랜드 없이도 허용 (또는 Samsung으로 가정할 수도 있음)
#             else:
#                 if "Samsung" in detected_brands or self._samsung_kw_rx.search(text):
#                     final_aspects.append(asp)
#                     final_brands.append("Samsung")
#                 else:
#                     # 삼성 키워드 없으면 해당 aspect는 무효 처리 or 특별히 표기
#                     final_aspects.append("brand_required_aspect_detected")
#                     final_brands.append("Unknown")

#         # 6. 후처리

#         if not detected_aspects and detected_brands:
#           final_aspects = list(set(detected_brands))
#           final_brands = list(set(detected_brands))

#         if not final_brands:
#             final_brands = {"Unknown"}

#         if not final_aspects:
#           if final_brands:
#             final_aspects = list(final_brands)
#           else:
#             final_aspects = ["general"]
#         else:
#           remain = [b for b in detected_brands if b != "Samsung"]
#           final_aspects.extend(remain)
#           final_brands.extend(remain)

#         final_brands = [ a for a in final_brands if a not in {"Unknown"}]
#         final_aspects = [ a for a in final_aspects if a not in {"Unknown","brand_required_aspect_detected"}]

#         return list(final_brands), final_aspects, detected_brands, detected_aspects

## Sample Test

In [3]:
classifier = BrandClassifier()
# txt = ("If you want to find out about the camera, processor and – especially – AI improvements in those devices, read our hands-on Samsung Galaxy S25 review, hands-on Samsung Galaxy S25 Plus review and our hands-on Samsung Galaxy S25 Ultra review.")
txt = """From the city streets to the open road, Changan’s electric vehicles have managed to combine sleek design, rugged capability, and sophisticated technology."""
brands, aspects, detected_b, detected_a = classifier.classify(txt)

print("📦 브랜드 목록:", brands)      # ['Samsung', 'Samsung', 'Apple']
print("🧩 어스펙트 목록:", aspects)    # ['Thin', 'Galaxy AI General', 'Apple']
print("- Raw 브랜드 :", detected_b)   # ['Samsung', 'Apple']
print("- Raw 어스펙트 :", detected_a) # ['Thin', 'Galaxy AI General']

📦 브랜드 목록: []
🧩 어스펙트 목록: []
- Raw 브랜드 : []
- Raw 어스펙트 : ['Design General']


# 3. [Package 코드] 다중 aspect에 대한 행 분리

In [4]:
import pandas as pd
import numpy as np
from pathlib import Path
from google.colab import drive

MODE = "prod"
drive.mount("/content/drive")

INPUT_PATH = Path("/content/translated_crawl_results.xlsx")
df = pd.read_excel(INPUT_PATH) #main 미사용시

if MODE == "dev":
    df["True_Aspect"] = df["Aspect"].fillna(df["Keywords(Company)"])
    df.drop_duplicates(['Media Title', 'Conversation Stream', 'True_Aspect'], inplace=True)

clf = BrandClassifier()
rows = []

for conv_id, g in df.groupby("Conversation Stream", sort=False):
    pred_brands, pred_aspects, *_ = clf.classify(str(conv_id))
    url = g["url"].iloc[0] if "url" in g.columns else np.nan
    pred_map = {asp.casefold(): br for br, asp in zip(pred_brands, pred_aspects)}

    if MODE == "dev":
        for true_val, senti in zip(g["True_Aspect"], g["Sentiment"]):
            norm_true = str(true_val).strip().casefold()
            if norm_true in pred_map:
                rows.append((g["Media Title"].iloc[0], conv_id, true_val, norm_true, pred_map[norm_true], senti, url))
                del pred_map[norm_true]
            else:
                rows.append((g["Media Title"].iloc[0], conv_id, true_val, np.nan, np.nan, senti, url))

    if MODE == "prod" and not pred_map:
        rows.append((g["Media Title"].iloc[0], conv_id, np.nan, np.nan, url))

    for asp_cf, br in pred_map.items():
        asp = next(a for a in pred_aspects if a.casefold() == asp_cf)
        if MODE == "dev":
            rows.append((g["Media Title"].iloc[0], conv_id, np.nan, asp, br, "NEED_LABELING", url))
        else:
            rows.append((g["Media Title"].iloc[0], conv_id, asp, br, url))

if MODE == "dev":
    df_expanded = pd.DataFrame(
        rows,
        columns=[
            "Media Title",
            "Conversation Stream",
            "True_Aspect",
            "Predicted_Aspect",
            "Predicted_Brand",
            "True_Sentiment",
            "url"
        ],
    )
else:
    df_expanded = pd.DataFrame(
        rows,
        columns=[
            "Media Title",
            "Conversation Stream",
            "Predicted_Aspect",
            "Predicted_Brand",
            "url"
        ],
    )


df_implicit = df_expanded.copy()

# 명시적 (explicit) 판단 기준: 'Predicted_Aspect'가 'general'이 아님
df_explicit = df_expanded[
    df_expanded["Predicted_Aspect"].notna() &
    (df_expanded["Predicted_Aspect"].str.lower() != "general")
].copy()

# 파일 경로 설정
OUTPUT_PATH_1 = "/content/implicit_case.xlsx"
OUTPUT_PATH_2 = "/content/Aspect_Extraction_explicit.xlsx"

df_implicit.to_excel(OUTPUT_PATH_1, index=False, engine="openpyxl")
df_explicit.to_excel(OUTPUT_PATH_2, index=False, engine="openpyxl")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## dev 모드일 경우, 휴먼에러 전처리

In [None]:
if MODE == "dev":
  df_dev = df_expanded.copy()
else:
  df_prod = df_expanded.copy()

NameError: name 'df_expanded' is not defined

In [None]:
if MODE == "dev":
    df_dev_processed = df_dev.copy()
    df_dev_processed["수기 라벨링"] = False
    df_dev_processed["Implicit"] = False

    drop_list   = []          # 나중에 한꺼번에 삭제
    update_rows = {}          # {대상인덱스들: 새 Predicted 값}

    for cid, grp in df_dev_processed.groupby("Conversation Stream"):

      # 1) 단일 행 & 수기 라벨링
      if len(grp) == 1:
          i = grp.index[0]

          # a) 수기 라벨링
          if pd.isna(grp.at[i, "True_Aspect"]) and pd.notna(grp.at[i, "Predicted_Aspect"]):
              df_dev_processed.at[i, "수기 라벨링"] = True

          # b) Implicit 플래그
          if pd.isna(grp.at[i, "Predicted_Aspect"]):
              df_dev_processed.at[i, "Implicit"] = True
          continue  # 단일 행 그룹은 여기서 끝

      # 2) Samsung 행 여부
      mask_s = grp["True_Aspect"].astype(str).str.strip().str.casefold() == "samsung"
      samsung_idx = grp.index[mask_s]

      if len(samsung_idx):
          # a) 삭제 예약
          drop_list.extend(samsung_idx)

          # b) 'True_Aspect가 NaN 이고 Predicted_Aspect는 notna' 행만 골라 True_Aspect 채우기
          cond_fill = grp.index.difference(samsung_idx)          # (1) 삼성-행 제외
          cond_fill = cond_fill.intersection(                    # (2) 두 조건 모두 만족
              grp.index[ grp["True_Aspect"].isna()               #     · True_Aspect 가 NaN
                        & grp["Predicted_Aspect"].notna() ]     #     · Predicted_Aspect 는 값이 있음
          )

          # (3) 업데이트 예약: {행 index : 새 True_Aspect 값(= Predicted_Aspect)}
          update_true = {i: df_dev_processed.at[i, "Predicted_Aspect"] for i in cond_fill}

          # (4) update_rows 사전에 합쳐 둠
          update_rows.update(update_true)

          # ▶ new: 그룹 전체에 "if" 표시
          df_dev_processed.loc[grp.index, "Samsung_branch"] = "if_has_samsung"


      # 3) 비-Samsung 행이면서 Predicted도 samsung이 아닐 때
      else:
          # ── 1) 이 그룹에 Predicted_Aspect == 'samsung' 값이 있는지 확인
          has_pa_samsung = (
              grp["Predicted_Aspect"]
              .astype(str).str.strip().str.casefold()
              .eq("samsung")
          ).any()

          if not has_pa_samsung:                             # ▸ 한 개도 없을 때만 실행
              # (1) Predicted_Aspect 가 NaN 이 아닌 행만 골라서
              mask_notna = df_dev_processed.loc[grp.index, "Predicted_Aspect"].notna()
              target_idx = grp.index[mask_notna]

              # 그룹 모든 행의 Predicted_Aspect ← 같은 행의 True_Aspect
              df_dev_processed.loc[target_idx, "True_Aspect"] = (
                  df_dev_processed.loc[target_idx, "Predicted_Aspect"]
              )
          # ▶ new: 그룹 전체에 "else_no_samsung" 표시
          df_dev_processed.loc[grp.index, "Samsung_branch"] = "else_no_samsung"

    # ── 일괄 적용 ─────────────────────────────────────────────
    if drop_list:
      df_dev_processed.drop(index=drop_list, inplace=True)

    for j, new_val in update_rows.items():
      if j in df_dev_processed.index:                               # 존재 확인
          df_dev_processed.at[j, "True_Aspect"] = new_val

    # ── (dev) Explicit & Implicit 분리───────────────────────────────────
    df_dev_explicit = df_dev_processed[df_dev_processed['Implicit']==False]
    df_implicit = df_dev_processed[df_dev_processed['Implicit']==True]

In [None]:
import pandas as pd
import numpy as np

def merge_rows(g: pd.DataFrame) -> pd.DataFrame:
    """
    그룹 g(같은 Conversation  Stream)를 받아서
    조건에 맞으면 2행 → 1행으로 축소, 아니면 그대로 반환
    """
    if len(g) == 2:
        # NaN 아닌 값이 정확히 하나씩 있는지 확인
        true_vals = g["True_Aspect"].dropna().unique()
        true_sent = g["True_Sentiment"].dropna().unique()
        pred_vals = g["Predicted_Aspect"].dropna().unique()
        pred_brands = g["Predicted_Brand"].dropna().unique()

        if len(true_vals) == 1 and len(pred_vals) == 1:
            # 새 행 만들기: 원본 첫 행을 복사해 값 채움
            new_row = g.iloc[0].copy()
            new_row["True_Aspect"]      = true_vals[0]
            new_row["Predicted_Aspect"] = pred_vals[0]
            new_row["Predicted_Brand"]   = pred_brands[0]
            new_row["True_Sentiment"]    = true_sent[0]
            return pd.DataFrame([new_row])   # 1행짜리 DF 반환

    # 조건을 만족하지 않으면 원본 그대로
    return g

if MODE == "dev":
  df_explicit = (
      df_dev_explicit
        .groupby("Conversation Stream", group_keys=False)
        .apply(merge_rows)
        .reset_index(drop=True)          # 선택: 인덱스 재정렬
  )

In [None]:
if MODE == "prod":
  df_implicit = df_prod[df_prod['Predicted_Aspect'].isna()]
  df_explicit = df_prod[~(df_prod['Predicted_Aspect'].isna())]

NameError: name 'df_prod' is not defined

In [None]:
df_implicit.to_excel(f"{OUTPUT_PATH_1}", index=False)
df_explicit.to_excel(f"{OUTPUT_PATH_2}", index=False)

NameError: name 'df_implicit' is not defined

# 참고. Sentiment Analysis 모델링을 위한 전처리 (라벨링 데이터에 끼워맞추기)

In [None]:
# # True_Aspect에 값이 있고 Predicted_Aspect와 동일한 행의 개수 계산
# count1 = len(df_expanded[(df_expanded['Predicted_Aspect'].notna()) & (df_expanded['True_Aspect'].apply(lambda x: str(x).strip().casefold()) == df_expanded['Predicted_Aspect'].apply(lambda x: str(x).strip().casefold()))])
# print(f"True_Aspect = Predicted_Aspect 행의 개수 (Package 코드): {count1}")
# count2 = len(df_explicit[(df_explicit['True_Aspect'].apply(lambda x: str(x).strip().casefold()) == df_explicit['Predicted_Aspect'].apply(lambda x: str(x).strip().casefold()))])
# print(f"True_Aspect = Predicted_Aspect 행의 개수 (Package 코드): {count2}")

In [None]:
# # 1. 엑셀 파일 로드
# df = pd.read_excel("(raw-2) 0523_S_core_explicit (수정).xlsx")  # 파일명에 맞게 경로 수정

# # 2. 분류기 인스턴스
# classifier = BrandClassifier()

# # 3. 결과 저장용 리스트
# wrong_predictions = []

# # 4. 각 행마다 분류하고, 오답일 경우만 저장
# for i, row in df.iterrows():
#     text = str(row["Conversation Stream"])
#     true_aspect = str(row["True_Aspect"]).strip().casefold()
#     company = row.get("Keywords(Company)", "")

#     brands, aspects, detected_brands, detected_aspects = classifier.classify(text)

#     true_list = list(map(lambda x : x.strip().casefold(), brands)) + list(map(lambda x: x.strip().casefold(), aspects))

#     # 조건 만족 시 Predicted_Aspect를 True_Aspect로 덮어쓰기
#     if true_aspect in true_list:
#         df.at[i, "Predicted_Aspect"] = df.at[i, "True_Aspect"]
#         df.at[i, "Modified"] = True

# df_a = df.copy()
# len(df[df["Predicted_Aspect"]==df["True_Aspect"]])