In [43]:
# pip install yake rake-nltk keybert sentence-transformers networkx langdetect scikit-learn pandas numpy python-dotenv
from __future__ import annotations
import json, random, re, os, time, math
from pathlib import Path
from dotenv import load_dotenv
from typing import List, Dict, Any
import numpy as np
import pandas as pd

from langdetect import detect
import networkx as nx

import yake
from rake_nltk import Rake
from keybert import KeyBERT
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

def set_seed(seed: int = 42):
    random.seed(seed)
    np.random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
set_seed(42)

load_dotenv()
print(Path.cwd())

/home/user/workspace/redfin/redfin_label_api/notebooks


In [44]:
# == CONFIG ==
SAMPLE_SIZE        = 100              # 샘플 개수
STRATIFY_FIELD     = "source"         # 층화 샘플링 기준(없으면 None)
ID_FIELD           = "_id"            # 문서 고유 ID 필드
TEXT_FIELDS        = ["title", "summary", "body_text"]  # 라벨러에게 보여줄 텍스트
TOP_K_CANDIDATES   = 12               # 제안 키워드 개수
RANDOM_SEED        = 42

# 입출력 경로
PROJECT_ROOT = Path().cwd().parent
DATA_DIR = PROJECT_ROOT / "data" / "keywords"
INPUT_JSONL        = DATA_DIR / "articles.jsonl"  # 원본 수집 데이터(JSONL)
TEMPLATE_CSV       = DATA_DIR / "gold_template.csv"        # 라벨링 템플릿(단일 annotator용)
TEMPLATE_A_CSV     = DATA_DIR / "gold_template_A.csv"      # annotator A 배포용
TEMPLATE_B_CSV     = DATA_DIR / "gold_template_B.csv"      # annotator B 배포용
MERGED_CSV         = DATA_DIR / "gold_merged.csv"          # A/B 라벨 합친 뒤 점검용
FINAL_GOLD_JSON    = DATA_DIR / "gold_keywords.json"       # 최종 골드셋 {id: [kw,...]}

random.seed(RANDOM_SEED)

In [28]:
def read_jsonl(path: str) -> List[Dict[str, Any]]:
    rows = []
    with open(path, "r", encoding="utf-8") as f:
        for ln in f:
            ln = ln.strip()
            if not ln:
                continue
            try:
                rows.append(json.loads(ln))
            except json.JSONDecodeError:
                pass
    return rows

def concat_text(row: pd.Series, fields: list[str]) -> str:
    parts = []
    for k in fields:
        v = row[k] if k in row else ""
        if pd.isna(v):
            continue
        s = v if isinstance(v, str) else str(v)
        s = s.strip()
        if s and s.lower() != "nan":
            parts.append(s)
    return re.sub(r"\s+", " ", " ".join(parts)).strip()

def stratified_sample(df: pd.DataFrame, by: str, n: int) -> pd.DataFrame:
    if not by or by not in df.columns:
        return df.sample(n=min(n, len(df)), random_state=RANDOM_SEED)
    # 층별 균등 비율 샘플 (층 수에 따라 몫/나머지 분배)
    groups = df.groupby(by)
    per = max(1, n // max(1, groups.ngroups))
    out = groups.apply(lambda g: g.sample(n=min(per, len(g)), random_state=RANDOM_SEED)).reset_index(drop=True)
    if len(out) < n:
        remain = n - len(out)
        pool = df.drop(out.index, errors="ignore")
        extra = pool.sample(n=min(remain, len(pool)), random_state=RANDOM_SEED)
        out = pd.concat([out, extra], ignore_index=True)
    return out.head(min(n, len(df)))

In [29]:
def extract_first_n_from_jsonl(path: str, n: int) -> List[Dict[str, Any]]:
    rows = []
    with open(path, "r", encoding="utf-8") as f:
        for line_num, line in enumerate(f, 1):
            if len(rows) >= n:
                break
                
            line = line.strip()
            if not line:
                continue
                
            try:
                data = json.loads(line)
                rows.append(data)
            except json.JSONDecodeError as e:
                print(f"경고: {line_num}번째 줄 JSON 파싱 실패 - {e}")
                continue
    
    print(f"총 {len(rows)}개의 데이터를 추출했습니다.")
    return rows


In [30]:
def interactive_extract_jsonl(path: str = None) -> List[Dict[str, Any]]:
    if path is None:
        path = str(INPUT_JSONL)
    
    # 파일 존재 확인
    if not Path(path).exists():
        print(f"오류: 파일을 찾을 수 없습니다 - {path}")
        return []
    
    # 사용자 입력 받기
    try:
        n = int(input(f"추출할 데이터 개수를 입력하세요 (기본값: 10): ") or "10")
        if n <= 0:
            print("오류: 양수를 입력해주세요.")
            return []
    except ValueError:
        print("오류: 숫자를 입력해주세요.")
        return []
    
    # 데이터 추출
    print(f"\n{path}에서 처음 {n}개 데이터를 추출 중...")
    data = extract_first_n_from_jsonl(path, n)
    
    if data:
        print(f"\n추출 완료! 데이터 미리보기:")
        for i, item in enumerate(data[:3], 1):
            print(f"{i}. ID: {item.get('_id', 'N/A')}")
            print(f"   Title: {item.get('title', 'N/A')[:80]}...")
            print(f"   Source: {item.get('source', 'N/A')}")
            print()
    
    return data

# 간편 사용을 위한 래퍼 함수들
def get_first_10(path: str = None) -> List[Dict[str, Any]]:
    """처음 10개 데이터 추출"""
    return extract_first_n_from_jsonl(path or str(INPUT_JSONL), 10)

def get_first_50(path: str = None) -> List[Dict[str, Any]]:
    """처음 50개 데이터 추출"""
    return extract_first_n_from_jsonl(path or str(INPUT_JSONL), 50)

def get_first_100(path: str = None) -> List[Dict[str, Any]]:
    """처음 100개 데이터 추출"""
    return extract_first_n_from_jsonl(path or str(INPUT_JSONL), 100)


In [39]:
import json

sample_10 = get_first_10()
sample_50 = get_first_50()

def save_jsonl(data: list[dict], path: str) -> None:
    with open(path, "w", encoding="utf-8") as f:
        for item in data:
            f.write(json.dumps(item, ensure_ascii=False) + "\n")

save_jsonl(sample_10, PROJECT_ROOT / "data" / "keywords" / "goldset_10.jsonl")
save_jsonl(sample_50, PROJECT_ROOT / "data" / "keywords" / "goldset_50.jsonl")
print("goldset_10.jsonl, goldset_50.jsonl 파일로 저장 완료")


# 방법 3: 다른 JSONL 파일에서 추출
print("\n=== 방법 3: 다른 파일에서 추출 ===")
# 다른 JSONL 파일이 있다면 경로를 지정할 수 있습니다
# other_data = extract_first_n_from_jsonl("다른파일.jsonl", 30)

# 방법 4: 인터랙티브 모드 (주피터에서는 input()이 제한적일 수 있음)
print("\n=== 방법 4: 인터랙티브 모드 ===")
print("interactive_extract_jsonl() 함수를 호출하면 사용자 입력을 받습니다.")
print("예: data = interactive_extract_jsonl()")

# 추출된 데이터를 DataFrame으로 변환하여 확인
if first_25:
    df_sample = pd.DataFrame(first_25)
    print(f"\nDataFrame 형태로 변환 완료:")
    print(f"컬럼: {list(df_sample.columns)}")
    print(f"행 수: {len(df_sample)}")
    print(f"\n첫 3행 미리보기:")
    print(df_sample[['_id', 'title', 'source', 'body_text', 'body_text_length', 'summary', 'category', 'tags']].head(3))


총 10개의 데이터를 추출했습니다.
총 50개의 데이터를 추출했습니다.
goldset_10.jsonl, goldset_50.jsonl 파일로 저장 완료

=== 방법 3: 다른 파일에서 추출 ===

=== 방법 4: 인터랙티브 모드 ===
interactive_extract_jsonl() 함수를 호출하면 사용자 입력을 받습니다.
예: data = interactive_extract_jsonl()

DataFrame 형태로 변환 완료:
컬럼: ['_id', 'guid', 'source', 'title', 'link', 'pub_date', 'author', 'category', 'tags', 'group', 'scraped_at', 'body_text', 'body_text_length', 'summary']
행 수: 25

첫 3행 미리보기:
                        _id  \
0  68b7e70933df1b522ba3be0a   
1  68b7e70933df1b522ba3be0b   
2  68b7e70933df1b522ba3be0c   

                                               title  \
0                Accelerating life sciences research   
1  Scaling domain expertise in complex, regulated...   
2         Mixi reimagines communication with ChatGPT   

                        source  \
0  OpenAI Blog (공식, 변경 가능성 주의)   
1  OpenAI Blog (공식, 변경 가능성 주의)   
2  OpenAI Blog (공식, 변경 가능성 주의)   

                                           body_text  body_text_length  \
0  August 22, 202

### 알고리즘별 키워드 추출

공통 파라미터:
- top_k = 10
- YAKE: n<=3, dedupLim=0.9
- RAKE: max_length=3
- TextRank: 윈도우=2, 단어그래프
- KeyBERT: all-MiniLM-L6-v2, use_maximum=True, nr_candidates=20, diversity=0.6


In [None]:
import os
import numpy as np

import yake
from rake_nltk import Rake
from keybert import KeyBERT
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

def set_seed(seed: int = 42):
    random.seed(seed)
    np.random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
set_seed(42)

# 경량 임베딩(영문): CPU 가능
_EMBED = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
_KB = KeyBERT(model=_EMBED)

# 1. YAKE
def yake_candidates(text: str, k: int = TOP_K_CANDIDATES) -> List[str]:
    ex = yake.KeywordExtractor(lan="en", n=3, top=k, dedupLim=0.9, windowsSize=2)
    return [kw for kw,_ in ex.extract_keywords(text)]

# 2. RAKE
def rake_candidates(text: str, k: int = TOP_K_CANDIDATES) -> List[str]:
    return [kw for kw,_ in Rake.extract_keywords(text, keyphrase_ngram_range=(1,3),
                                                top_n=k, use_maxsum=True, nr_candidates=max(20, k*2),
                                                diversity=0.6)]


# 3. TextRank
def textrank_candidates(text: str, k: int = TOP_K_CANDIDATES) -> List[str]:
    return [kw for kw,_ in TextRank.extract_keywords(text, keyphrase_ngram_range=(1,3),
                                                    top_n=k, use_maxsum=True, nr_candidates=max(20, k*2),
                                                    diversity=0.6)]

# 4. KeyBERT
def keybert_candidates(text: str, k: int = TOP_K_CANDIDATES) -> List[str]:
    pairs = _KB.extract_keywords(text, keyphrase_ngram_range=(1,3),
                                 top_n=k, use_maxsum=True, nr_candidates=max(20, k*2),
                                 diversity=0.6)
    return [kw for kw,_ in pairs]

def make_candidates(text: str, k: int = TOP_K_CANDIDATES) -> Dict[str, List[str]]:
    if not text:
        return {"yake": [], "keybert": []}
    return {
        "yake": yake_candidates(text, k=k),
        "keybert": keybert_candidates(text, k=k),
    }


In [27]:
# 원본 로드
rows = read_jsonl(INPUT_JSONL)
df = pd.DataFrame(rows)

# 표시용 텍스트 구성
df["display_text"] = df.apply(lambda r: concat_text(r, TEXT_FIELDS), axis=1)

# 샘플링
sample_df = stratified_sample(df, STRATIFY_FIELD, SAMPLE_SIZE).copy()

# 후보 생성(라벨러 보조)
cands = sample_df["display_text"].apply(lambda t: make_candidates(t, k=TOP_K_CANDIDATES))
sample_df["cands_yake"]   = cands.apply(lambda d: ", ".join(d["yake"]))
sample_df["cands_keybert"] = cands.apply(lambda d: ", ".join(d["keybert"]))

# 라벨 입력 컬럼(비워서 제공)
sample_df["gold_keywords"] = ""         # 라벨러가 편집
sample_df["annotator"]     = ""         # 라벨러 이름/이니셜
sample_df["notes"]         = ""         # 주석/의견

# 템플릿에 필요한 핵심 컬럼만
cols = [ID_FIELD, "guid", "source", "title", "summary", "display_text", "cands_yake", "cands_keybert", "gold_keywords", "annotator", "notes"]
template = sample_df.loc[:, [c for c in cols if c in sample_df.columns]]

template.to_csv(TEMPLATE_CSV, index=False)
# 두 명에게 분배할 별도 파일도 생성(동일 내용)
template.to_csv(TEMPLATE_A_CSV, index=False)
template.to_csv(TEMPLATE_B_CSV, index=False)

template.head(3)


KeyboardInterrupt: 

In [None]:
# == 라벨링 위젯 ==
from IPython.display import display, clear_output
import ipywidgets as W
import pandas as pd
import os, json, re

# 진행 저장 경로 (중간 저장)
AUTOSAVE_CSV = "gold_template_working.csv"

def _ensure_cols(df: pd.DataFrame):
    for col in ("gold_keywords","annotator","notes","cands_yake","cands_keybert","display_text"):
        if col not in df.columns:
            df[col] = ""
    return df

def _normalize_kw_list(s: str) -> str:
    """
    쉼표/세미콜론 구분 → 정규화(소문자, 공백정리, 중복제거)
    """
    if not isinstance(s, str):
        return ""
    toks = re.split(r"[;,]", s)
    toks = [re.sub(r"\s+", " ", t).strip().lower() for t in toks if t.strip()]
    out = []
    for t in toks:
        if len(t) < 2: 
            continue
        if re.search(r"https?://", t):
            continue
        if t not in out:
            out.append(t)
    return ", ".join(out)

def launch_labeler(df: pd.DataFrame,
                   id_field: str = "_id",
                   show_fields = ("title","summary"),
                   autosave_csv: str = AUTOSAVE_CSV):
    """
    sample_df를 순차적으로 라벨링하는 인터랙티브 위젯.
    - df: sample_df (cands_yake / cands_keybert / display_text 포함)
    - id_field: 문서 고유키 컬럼
    - show_fields: 상단에 보여줄 필드들
    - autosave_csv: 이동 시 자동 저장되는 경로
    """
    assert id_field in df.columns, f"{id_field} not in dataframe"
    df = df.copy().reset_index(drop=True)
    df = _ensure_cols(df)

    # 이전 작업 이어붙이기
    if os.path.exists(autosave_csv):
        try:
            prev = pd.read_csv(autosave_csv)
            if id_field in prev.columns:
                df = df.drop(columns=[c for c in ("gold_keywords","annotator","notes") if c in df.columns], errors="ignore") \
                       .merge(prev[[id_field,"gold_keywords","annotator","notes"]],
                              on=id_field, how="left")
                for col in ("gold_keywords","annotator","notes"):
                    df[col] = df[col].fillna("")
        except Exception as e:
            print(f"[warn] failed to load previous autosave: {e}")

    N = len(df)
    idx = 0

    # ---- 위젯들 ----
    idx_disp = W.BoundedIntText(value=1, min=1, max=max(1, N), description="Index")
    progress = W.Label()
    id_label = W.HTML()

    # 표시 텍스트(가독)
    show_areas = []
    for f in show_fields:
        area = W.HTML(layout=W.Layout(border="1px solid #ddd", padding="8px", height="auto"))
        show_areas.append((f, area))

    # 후보 선택
    def _split_cands(s: str):
        return [x.strip() for x in s.split(",") if x.strip()]

    cands_yake_ms   = W.SelectMultiple(options=[], description="YAKE", layout=W.Layout(width="48%", height="140px"))
    cands_keybert_ms= W.SelectMultiple(options=[], description="KeyBERT", layout=W.Layout(width="48%", height="140px"))
    cands_box = W.HBox([cands_yake_ms, cands_keybert_ms])

    # 편집 영역
    annotator_tb = W.Text(value="", description="Annotator", layout=W.Layout(width="40%"))
    notes_ta     = W.Textarea(value="", description="Notes", layout=W.Layout(width="100%", height="80px"))
    gold_ta      = W.Textarea(value="", description="Gold Keywords (,로 구분)", layout=W.Layout(width="100%", height="110px"))

    # 버튼들
    btn_prev   = W.Button(description="◀ Prev", button_style="")
    btn_next   = W.Button(description="Next ▶", button_style="primary")
    btn_add_y  = W.Button(description="Add YAKE ▶", button_style="")
    btn_add_k  = W.Button(description="Add KeyBERT ▶", button_style="")
    btn_clear  = W.Button(description="Clear gold", button_style="warning")
    btn_save   = W.Button(description="💾 Save", button_style="success")

    msg = W.HTML()

    def _update_view():
        i = idx_disp.value - 1
        row = df.iloc[i]
        # 헤더/진행률
        id_label.value = f"<b>{id_field}</b>: {row.get(id_field)} | <b>source</b>: {row.get('source','')}"
        progress.value = f"{idx_disp.value}/{N}"

        # 본문 표시
        for f, area in show_areas:
            area.value = f"<b>{f}</b><br>{(row.get(f) or '')}"

        # 후보 목록
        cands_yake_ms.options    = _split_cands(row.get("cands_yake",""))
        cands_keybert_ms.options = _split_cands(row.get("cands_keybert",""))

        # 편집 값
        annotator_tb.value = row.get("annotator","") or ""
        notes_ta.value     = row.get("notes","") or ""
        gold_ta.value      = row.get("gold_keywords","") or ""

        msg.value = ""

    def _write_back(i):
        # normalize하고 df에 되쓰기
        df.at[i, "annotator"]     = annotator_tb.value.strip()
        df.at[i, "notes"]         = notes_ta.value.strip()
        df.at[i, "gold_keywords"] = _normalize_kw_list(gold_ta.value)

    def _autosave():
        try:
            df.to_csv(autosave_csv, index=False)
            return True, f"Saved to {autosave_csv}"
        except Exception as e:
            return False, f"Save failed: {e}"

    def _add_selected(ms_widget):
        cur = _normalize_kw_list(gold_ta.value)
        cur_set = set([s.strip() for s in cur.split(",") if s.strip()]) if cur else set()
        add_set = set([s.lower() for s in ms_widget.value])
        merged = list(dict.fromkeys([*cur_set, *add_set]))
        gold_ta.value = ", ".join(merged)

    # 이벤트 바인딩
    def on_prev(_):
        i = idx_disp.value - 1
        _write_back(i-1 if i>0 else 0)  # 현재 변경사항도 반영되도록
        ok, m = _autosave()
        idx_disp.value = max(1, i)
        _update_view()
        msg.value = f"<span style='color:{'green' if ok else 'red'}'>{m}</span>"

    def on_next(_):
        i = idx_disp.value - 1
        _write_back(i)
        ok, m = _autosave()
        idx_disp.value = min(N, i+2)
        _update_view()
        msg.value = f"<span style='color:{'green' if ok else 'red'}'>{m}</span>"

    def on_jump(change):
        # 인덱스 수동 변경 시 현재 변경사항 저장
        i = max(0, min(N-1, change["old"]-1))
        _write_back(i)
        _autosave()
        _update_view()

    def on_add_y(_): _add_selected(cands_yake_ms)
    def on_add_k(_): _add_selected(cands_keybert_ms)
    def on_clear(_): gold_ta.value = ""
    def on_save(_):
        i = idx_disp.value - 1
        _write_back(i)
        ok, m = _autosave()
        msg.value = f"<span style='color:{'green' if ok else 'red'}'>{m}</span>"

    btn_prev.on_click(on_prev)
    btn_next.on_click(on_next)
    btn_add_y.on_click(on_add_y)
    btn_add_k.on_click(on_add_k)
    btn_clear.on_click(on_clear)
    btn_save.on_click(on_save)
    idx_disp.observe(on_jump, names="value")

    header = W.HBox([idx_disp, progress])
    meta   = W.HTML("<b>Document Meta</b>")
    cand_btns = W.HBox([btn_add_y, btn_add_k, btn_clear, btn_save])
    nav    = W.HBox([btn_prev, btn_next])

    # 레이아웃
    box = W.VBox([
        header, id_label,
        W.HTML("<hr>"),
        meta,
        *(area for _, area in show_areas),
        W.HTML("<hr><b>Candidate Keywords</b>"),
        cands_box, cand_btns,
        W.HTML("<hr><b>Gold Label</b>"),
        annotator_tb, gold_ta, notes_ta,
        nav, msg
    ])

    # 초기화
    _update_view()
    display(box)

    # 반환: 작업본 DataFrame을 호출자도 참조 가능
    return df

# ---- 사용 예시 ----
# 아래 한 줄을 실행하면 위젯이 뜹니다.
# sample_df = stratified_sample(df, STRATIFY_FIELD, SAMPLE_SIZE).copy()
# sample_df = _ensure_cols(sample_df)
# labeled_df = launch_labeler(sample_df, id_field=ID_FIELD, show_fields=("title","summary","display_text"))


In [None]:
sample_df = stratified_sample(df, STRATIFY_FIELD, SAMPLE_SIZE).copy()
sample_df = _ensure_cols(sample_df)  # 안전하게 컬럼 준비
labeled_df = launch_labeler(sample_df, id_field=ID_FIELD, show_fields=("title","summary","display_text"))


NameError: name '_ensure_cols' is not defined

In [None]:
# 두 명의 CSV를 수집 후 합의 컬럼을 만듭니다. 기본은 교집합+부분합의 규칙을 제공하고, 불일치 건을 표시
def parse_kw_list(s: str) -> List[str]:
    if not isinstance(s, str) or not s.strip():
        return []
    # 쉼표/세미콜론 공용 파싱, 소문자/트림
    toks = re.split(r"[;,]", s)
    toks = [re.sub(r"\s+", " ", t).strip().lower() for t in toks if t.strip()]
    # 중복 제거, 길이 2 미만/URL 제거
    out = []
    for t in toks:
        if len(t) < 2: 
            continue
        if re.search(r"https?://", t):
            continue
        if t not in out:
            out.append(t)
    return out

def jaccard(a: List[str], b: List[str]) -> float:
    A, B = set(a), set(b)
    return len(A & B) / max(1, len(A | B))

# A/B 결과 로드
A = pd.read_csv(TEMPLATE_A_CSV)
B = pd.read_csv(TEMPLATE_B_CSV)

# 병합 키: ID_FIELD
merged = A.merge(B[[ID_FIELD, "gold_keywords"]], on=ID_FIELD, suffixes=("_A", "_B"), how="outer")

# 합의 규칙
final_kw, agree, jacc = [], [], []
for _, r in merged.iterrows():
    kwsA = parse_kw_list(r.get("gold_keywords_A",""))
    kwsB = parse_kw_list(r.get("gold_keywords_B",""))
    inter = sorted(set(kwsA) & set(kwsB))
    union = sorted(set(kwsA) | set(kwsB))
    # 기본 합의안: 교집합이 5개 미만이면 중요한 후보(교집합 + A/B 상위 2개씩)로 제안
    if len(inter) >= 5:
        fused = inter
    else:
        fused = sorted(set(inter + kwsA[:2] + kwsB[:2]))
    final_kw.append(", ".join(fused))
    agree.append(1 if len(inter) >= max(3, min(len(union)//2, 5)) else 0)
    jacc.append(jaccard(kwsA, kwsB))

merged["gold_keywords_final"] = final_kw
merged["annotator_agree_flag"] = agree       # 1=대체로 합의, 0=재검토 요망
merged["annotator_jaccard"]    = jacc

merged.to_csv(MERGED_CSV, index=False)
merged.head(3)
