# Cluster_model (Blinded)

이 노트북은 내부 고유명사/회사명 제거 및 중립화 처리된 연구용 버전입니다.
실제 데이터/브랜드/서비스명은 포함하지 않습니다.

코드 구현 상세는 `modules/clustering_core.py` 를 참고

In [None]:
import pandas as pd
import numpy as np
import re
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import AgglomerativeClustering

pd.options.mode.chained_assignment = None


1. 데이터 로드

In [None]:
# -----------------------------
# 1. Load Data 
# -----------------------------
# 실제 운영 환경에서는 웹 업로드 또는 DB 연동으로 대체될 수 있음
data = pd.read_excel("sample_product_data.xlsx")  

# (Context) 이 단계는 상품명 분류 파이프라인 중,
# '시리즈(Series) 단위로 묶어야 하는 대상만 선별'하는 1차 필터입니다.
# 예: 특정 L_category에 상품이 충분히 많이 모여 있는 경우만 Series 후보로 간주

# -----------------------------
# 2. Identify categories that need grouping
# -----------------------------
# 1) L_category별 데이터 수 집계
count_df = data.groupby('L_category')['L_category'].count().reset_index(name='count')

# 2) 시리즈로 묶을지 여부 결정 기준 (count > 3 이면 series 대상)
#    - 기준값은 데이터 특성과 비즈니스 판단에 따라 유동적 (예: 3개 이상이면 패턴이 존재할 가능성 판단)
count_df['series_yn'] = np.where(count_df['count'] > 3, 'y', 'n')

# 3) 원본 데이터에 series 여부 병합
use_df = data.merge(count_df[['L_category', 'series_yn']], on='L_category', how='left')

# -----------------------------
# 3. 시리즈 대상만 필터링
# -----------------------------
df = use_df[use_df['series_yn'] == 'y']
# df.to_excel("depth_test.xlsx", index=False)  # (로컬 export 시 사용)


2. 데이터 전처리를 통한 분류 패턴 유도

In [None]:
# -----------------------------
# 2. 데이터 전처리 (Text Normalization & Noise Trimming)
# -----------------------------
# 이 단계는 "좁혀 들어가는 방식"의 전처리 전략을 사용합니다.
# 특히 내부 콘텐츠 담당자들이 상품명을 작성할 때,
# - 사이트명 / 대분류 / 세부 태그를 앞단 혹은 끝단에 붙이는 특성이 존재한다고 가정합니다.
# 따라서 1차로 일반적인 특수문자/공백 정리 → 2차로 L_category / 사이트명 등을 제거해
# "실제 핵심 상품명만 남도록" 트리밍합니다.
#
# ex) [브랜드패스] JLPT N5 끝장패키지 → 노이즈 제거 후 "n5"
#     → 이후 클러스터링에 직접적으로 유용한 핵심 키워드만 남기는 전략

def clean_text(text):
    """기본적인 특수문자/괄호 제거 전처리 + lower 정규화"""
    text = str(text)
    text = re.sub(r"[\[\]\(\)]", " ", text)  # 괄호 제거 (내부 한글은 유지)
    text = re.sub(r"[^가-힣A-Za-z0-9\s]", " ", text)
    text = re.sub(r"\s+", " ", text)
    return text.strip().lower()

# 특수 공백(Zero-width, NBSP 등) 정규화
_WS_HARD = r"[\u00A0\u202F\u2007\u2000-\u2006\u2008-\u200A\u200B-\u200D\u2060]"
def _normalize_spaces(s: str) -> str:
    s = re.sub(_WS_HARD, " ", s)
    s = re.sub(r"\s+", " ", s)
    return s.strip()

# 문자열을 퍼지 매칭용 정규식으로 변환 (노이즈 제거용)
def _build_fuzzy_pattern(cat: str) -> str:
    base = re.sub(_WS_HARD, "", cat)
    base = re.sub(r"[\s\W_]+", "", base)
    parts = [re.escape(ch) for ch in base]
    if not parts:
        return r"$^"  # (빈 패턴 방지)
    return r"(?:[\s\W_]*)".join(parts)

def clean_and_trim_text(row):
    """
    상품명(pname)에서 L_category / 사이트명 등을 fuzzy하게 제거하여
    핵심 상품명만 남기는 'trim' 처리
    """
    text = str(row["pname"])
    category = str(row["L_category"])
    site_name = str(row.get("site_name", ""))  # 내부 필드명 노출 회피용 (브랜드나 채널명 등)

    text = re.sub(r"[\[\]\(\)]", " ", text)  # 괄호 제거
    text_norm = _normalize_spaces(text)

    # L_category 제거
    if category and category.lower() != "nan":
        cat_pat = _build_fuzzy_pattern(_normalize_spaces(category))
        text_norm = re.sub(cat_pat, " ", text_norm, flags=re.IGNORECASE)

    # site_name 제거
    if site_name and site_name.lower() != "nan":
        site_pat = _build_fuzzy_pattern(_normalize_spaces(site_name))
        text_norm = re.sub(site_pat, " ", text_norm, flags=re.IGNORECASE)

    # 다시 불필요한 특수문자 제거 + 공백/소문자 정규화
    text_norm = re.sub(r"[^가-힣A-Za-z0-9\s]", " ", text_norm)
    text_norm = _normalize_spaces(text_norm).lower()
    return text_norm

# 1차 기본 전처리
df["product_name_clean"] = df["pname"].astype(str).apply(clean_text)
# 2차 전처리 (카테고리 / 사이트명 제거 후 핵심만 남기기)
df["product_name_trimmed"] = df.apply(clean_and_trim_text, axis=1)

# -----------------------------
# 3. 테스트용 필터 (프론트엔드에서 실제로 선택되는 조건 역할)
# -----------------------------
df_test = df[
    (df["site_code"] == "sample_channel") &
    (df["L_category"].isin(["sample_L1", "sample_L2", "sample_L3", "sample_series_A"]))
].copy()
df_test.reset_index(drop=True, inplace=True)

print("전처리 결과 샘플:")
df_test[["site_name", "L_category", "pname", "product_name_trimmed"]]


3. Bert 임베딩

In [None]:
# -----------------------------
# 3. Sentence-BERT 모델 로드
# -----------------------------
# 한국어 기반 상품명을 '의미 유사도' 기준으로 군집화하기 위해
# HuggingFace 공개 모델을 사용합니다. (언어 의존적 유사도 학습에 최적화)
# → 단순 문자열 매칭이 아닌 의미 embedding 기반 Clustering이 가능해짐

model = SentenceTransformer("jhgan/ko-sroberta-multitask")  
# 위 모델은 한국어 문장 임베딩 성능 대비 속도가 빠른 편이라 실무형 분류에 적합하다고 판단

4-1. 앞단 문맥 중심 임베딩에 집중한 1차 클러스터링 구조

In [None]:
# -----------------------------
# 4. "앞단 중심" 임베딩 전략
# -----------------------------
# 교육/커머스 도메인에서는 상품명 앞단에 브랜드명 / 핵심 컨셉이 배치되는 경우가 매우 많음
# 예)
#   [브랜드명] JLPT N5 끝장패키지
#   [플랫폼명] 토익 700+ 실전반
# → 따라서 '문장 전체'가 아닌 '앞단 토큰'에 가중치를 더 주는 커스텀 Embedding 전략 사용

def front_only_embedding(text, front_ratio=0.8, min_tokens=2, decay_rate=0.25):
    tokens = re.split(r"\s+", str(text).strip())
    n = len(tokens)
    if n == 0:
        return np.zeros(model.get_sentence_embedding_dimension())

    # 앞단 토큰 비중 설정
    split_idx = max(min_tokens, int(n * front_ratio))
    front_tokens = tokens[:split_idx]
    back_tokens = tokens[split_idx:]

    # 앞단 토큰 고가중치 임베딩
    front_emb = model.encode(front_tokens, show_progress_bar=False)
    front_weights = np.array([np.exp(-decay_rate * i) for i in range(len(front_tokens))])
    front_weights /= front_weights.sum()
    front_vec = np.average(front_emb, axis=0, weights=front_weights)

    # 뒷단도 너무 생략되지 않도록 0.01 비중으로 최소 반영
    if back_tokens:
        back_emb = model.encode(back_tokens, show_progress_bar=False)
        back_vec = np.mean(back_emb, axis=0) * 0.01
    else:
        back_vec = np.zeros(model.get_sentence_embedding_dimension())

    return front_vec + back_vec

# -----------------------------
# 5. L_category별 1차 군집화 (semantic grouping)
# -----------------------------
# 동일 L_category 내에서 의미 기반으로 상품명 세분화
# Agglomerative Clustering 활용 (사전 k 지정 없이 distance threshold 기반 자동 분기)
# → 실제 실무에서는 사람이 수동으로 분류하던 패턴을 자동화하기 위한 시도

cluster_results = []

for cat in sorted(df_test["L_category"].unique()):
    subset = df_test[df_test["L_category"] == cat].copy()
    if len(subset) < 2:
        print(f"{cat} 카테고리는 상품이 {len(subset)}개라 스킵됨")
        continue

    subset["embedding_front"] = subset["product_name_trimmed"].apply(
        lambda x: front_only_embedding(x, front_ratio=0.7, min_tokens=2, decay_rate=0.25)
    )
    X_front = np.vstack(subset["embedding_front"].values)

    cluster_front = AgglomerativeClustering(
        n_clusters=None,
        distance_threshold=9,
        metric='euclidean',
        linkage='ward'
    )
    subset["cluster_lv1"] = cluster_front.fit_predict(X_front)
    cluster_results.append(subset)

if not cluster_results:
    raise ValueError("클러스터 결과가 비어 있습니다. df_test 조건 또는 데이터 확인 필요")

df_lv1 = pd.concat(cluster_results, ignore_index=True)
print(f"✅ 1차 클러스터링 완료: 총 {len(df_lv1)}개 상품 처리됨\n")

# -----------------------------
# 6. 대표명(대표_lv1명) 생성
# -----------------------------
# 자동 분류된 클러스터에 사람이 '대표명'을 빠르게 부여할 수 있도록
# 머신이 추천하는 초간단 Core Phrase 기반의 대표명 생성

def extract_core_phrase(text):
    text = str(text)
    text = re.sub(r"\bv\d+|\bver\d+|\b\d{2,4}(\.\d+)?\b", "", text)    # 버전 관련 제거
    text = re.sub(r"(교재|포함|미포함|only|ver|버전)", "", text)      # 의미 분류에 불필요한 단어 제거
    text = re.sub(r"\s+", " ", text).strip()
    tokens = text.split()
    core = " ".join(tokens[:3]) if len(tokens) > 3 else text
    return core.strip()

df_lv1["대표핵심어"] = df_lv1["product_name_trimmed"].apply(extract_core_phrase)

rep_lv1 = (
    df_lv1.groupby(["site_name", "L_category", "cluster_lv1"])
    .agg(대표핵심어=("대표핵심어", lambda x: x.mode()[0] if not x.mode().empty else x.iloc[0]))
    .reset_index()
)

rep_lv1["대표_lv1명"] = rep_lv1.apply(
    lambda row: f"{row['site_name']}_{row['L_category']}_{row['대표핵심어']}", axis=1
)

df_lv1 = df_lv1.merge(
    rep_lv1[["site_name", "L_category", "cluster_lv1", "대표_lv1명"]],
    on=["site_name", "L_category", "cluster_lv1"],
    how="left"
)

# -----------------------------
# 7. 결과 인사이트 프린트 (사람 검증용)
# -----------------------------
print("=== 1차 (L_category별 앞단 중심) Clustering 결과 ===\n")
for cat, group in df_lv1.groupby("L_category"):
    print(f"[{cat}] 카테고리 내 클러스터")
    for rep_name, subset in group.groupby("대표_lv1명"):
        print(f" ▶ {rep_name} ({len(subset)}개)")
        for s in subset["product_name_trimmed"].tolist():
            print(f"   - {s}")
        print()
    print("-" * 70)

# 후속 단계에서 사용될 핵심 결과셋
front_df_lv1 = (
    df_lv1[['site_name', 'L_category', '대표_lv1명', 'pname']]
    .sort_values(by=['L_category','대표_lv1명'])
    .reset_index(drop=True)
)


4-2. 뒷단 문맥 중심 임베딩에 집중한 2차 클러스터링 구조

In [None]:
# ==========================================================
# 8. 1단계 이후 선택적으로 진행: "뒷단 중심" 세분화
# ==========================================================
# 목적:
# - 1단계(앞단 중심)로 큰 묶음 생성
# - 2단계는 버전/옵션/프로모션/기간 등 "뒷단 신호"로 세분화
# - 계층: L_category → 대표_lv1명 → 대표_lv2명
#
# 결과물:
# - 엑셀/사람 검증 친화형 컬럼 유지(site_name, L_category, 대표_lv1명, 대표_lv2명, pname)

# -----------------------------
# 8-1) "뒷단 중심" 임베딩 함수
# -----------------------------
def back_only_embedding(text, back_ratio=0.5, min_tokens=2, decay_rate=0.25):
    """
    문장 뒤쪽 토큰에 가중치를 더 주는 임베딩.
    - back_ratio: 뒤쪽으로 볼 비중(0~1)
    - min_tokens: 최소 뒤쪽 토큰 수
    - decay_rate: 뒤쪽 내부 가중치 감쇠율(뒤에서 앞으로 갈수록 감소)
    """
    tokens = re.split(r"\s+", str(text).strip())
    n = len(tokens)
    if n == 0:
        return np.zeros(model.get_sentence_embedding_dimension())

    split_idx = max(n - max(min_tokens, int(n * back_ratio)), 0)
    front_tokens = tokens[:split_idx]
    back_tokens = tokens[split_idx:]

    # 뒤쪽: 역순 가중 평균
    back_emb = model.encode(back_tokens, show_progress_bar=False)
    back_weights = np.array([np.exp(-decay_rate * i) for i in range(len(back_tokens))])[::-1]
    back_weights /= back_weights.sum()
    back_vec = np.average(back_emb, axis=0, weights=back_weights)

    # 앞쪽: 거의 무시(0.01배)
    if front_tokens:
        front_emb = model.encode(front_tokens, show_progress_bar=False)
        front_vec = np.mean(front_emb, axis=0) * 0.01
    else:
        front_vec = np.zeros(model.get_sentence_embedding_dimension())

    return back_vec + front_vec


# -----------------------------
# 8-2) 뒷단 중심 클러스터링
# -----------------------------
depth_threshold_lv2 = 5  # 낮추면 더 잘게, 올리면 덜 잘게
cluster_results = []

for lv1_name in sorted(df_lv1["대표_lv1명"].unique()):
    subset = df_lv1[df_lv1["대표_lv1명"] == lv1_name].copy()

    # 소규모 그룹은 세분화 생략
    if len(subset) < 3:
        subset["cluster_lv2"] = 0
        subset["대표_lv2명"] = subset["대표_lv1명"] + "_세분화0"
        cluster_results.append(subset)
        continue

    # 임베딩(뒷단 중심)
    subset["embedding_back"] = subset["product_name_trimmed"].apply(
        lambda x: back_only_embedding(x, back_ratio=0.5, min_tokens=2, decay_rate=0.25)
    )
    X_back = np.vstack(subset["embedding_back"].values)

    # 거리 임계값 기반 계층 군집
    cluster_back = AgglomerativeClustering(
        n_clusters=None,
        distance_threshold=depth_threshold_lv2,
        metric="euclidean",
        linkage="ward"
    )
    subset["cluster_lv2"] = cluster_back.fit_predict(X_back)

    # 대표_lv2명 생성(간단 규칙: first 사용; 필요시 mode 기반으로 변경 가능)
    rep_lv2 = (
        subset.groupby(["대표_lv1명", "cluster_lv2"])
        .agg(대표_trim=("product_name_trimmed", "first"))
        .reset_index()
    )
    rep_lv2["대표_lv2명"] = rep_lv2.apply(
        lambda r: f"{r['대표_lv1명']}_{r['대표_trim']}_세분화{r['cluster_lv2']}",
        axis=1
    )

    subset = subset.merge(
        rep_lv2[["대표_lv1명", "cluster_lv2", "대표_lv2명"]],
        on=["대표_lv1명", "cluster_lv2"],
        how="left"
    )

    cluster_results.append(subset)

# 통합
if not cluster_results:
    raise ValueError("2단계 클러스터 결과가 비어 있음. 상위 단계/필터 확인 필요.")
df_final = pd.concat(cluster_results, ignore_index=True)


# -----------------------------
# 8-3) 결과 확인(사람 검증용 콘솔 출력)
# -----------------------------
print("\n=== 2단계 (뒷단 중심 세분화) 클러스터 결과 ===\n")
for lv1_name, g1 in df_final.groupby("대표_lv1명"):
    print(f"[1차 대표] {lv1_name}")
    for lv2_name, g2 in g1.groupby("대표_lv2명"):
        print(f"  ▶ {lv2_name} ({len(g2)}개)")
        for s in g2["product_name_trimmed"].tolist():
            print("     -", s)
    print("-" * 70)


# -----------------------------
# 8-4) 엑셀/대시보드용 최종 DF
# -----------------------------
front_df_lv2 = (
    df_final[["site_name", "L_category", "대표_lv1명", "대표_lv2명", "pname"]]
    .sort_values(by=["L_category", "대표_lv1명", "대표_lv2명"])
    .reset_index(drop=True)
)

# 저장(선택)
# front_df_lv2.to_excel("cluster_lv2_result.xlsx", index=False)
# df_final.to_excel("cluster_lv2_full_result.xlsx", index=False)
