In [1]:
import json
from tqdm import tqdm
from pathlib import Path
from utils import *
import copy

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split, ConcatDataset

# Imported explicitly so the seeding below does not depend on
# `from utils import *` happening to re-export `np`.
import numpy as np

# Single seed shared by every RNG that is seeded in this cell.
SEED = 42

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Seed the PyTorch and the legacy global NumPy generators for repeatable runs.
torch.manual_seed(SEED)
np.random.seed(SEED)

In [2]:
# Default dataset layout: every file lives under a single root directory.
ROOT = Path("Amazon_products")
TRAIN_PATH = ROOT.joinpath("train")
TEST_PATH = ROOT.joinpath("test")

# One corpus file per split.
TRAIN_CORPUS_PATH = TRAIN_PATH.joinpath("train_corpus.txt")
TEST_CORPUS_PATH = TEST_PATH.joinpath("test_corpus.txt")

# Class-related files stored directly under the root.
CLASSES_PATH = ROOT.joinpath("classes.txt")
HIERARCHY_PATH = ROOT.joinpath("class_hierarchy.txt")
REL_KEYWORDS_PATH = ROOT.joinpath("class_related_keywords.txt")

In [3]:
# ---------- Read-only loads ----------
# Load every input file once into kernel globals used by the cells below.
# NOTE(review): the load_* helpers are not defined in this notebook; presumably
# they come from `from utils import *` — confirm their return types there.

# Train / test corpora; the names suggest a pid -> text mapping (TODO confirm).
train_pid2text    = load_pid2text(TRAIN_CORPUS_PATH)
test_pid2text     = load_pid2text(TEST_CORPUS_PATH)
# Two views of the same classes file (both are read from CLASSES_PATH).
classes_int       = load_classes_int(CLASSES_PATH)
classes_str       = load_classes_str(CLASSES_PATH)
rel_keywords      = load_keywords(REL_KEYWORDS_PATH)
# Consumed later as an iterable of (a, b) class-index pairs.
class_graph_edges = load_class_graph(HIERARCHY_PATH)

# Quick sanity check on corpus sizes.
print(f"#train={len(train_pid2text):,}  #test={len(test_pid2text):,}")

#train=29,487  #test=19,658


In [7]:
from collections import deque

num_classes = len(classes_int)

# 1) Undirected adjacency list: every edge is stored in both directions.
# NOTE(review): if class_graph_edges are directed (parent, child) pairs, treating
# them as undirected can make a non-root node the root of its component and keeps
# at most one parent per node — confirm this is intended.
adj = [[] for _ in range(num_classes)]
for a, b in class_graph_edges:
    adj[a].append(b)
    adj[b].append(a)

# 2) Tree bookkeeping, one slot per class.
parents = [[] for _ in range(num_classes)]   # parent list of each node (holds 0 or 1 entries)
children = [[] for _ in range(num_classes)]  # tree children of each node
visited = [False] * num_classes
root_classes = []

# 3) One BFS per connected component, rooted at its lowest-index node.
for start in range(num_classes):
    if not visited[start]:
        # `start` becomes the root of this component.
        root = start
        root_classes.append(root)
        visited[root] = True

        q = deque([root])
        while len(q) > 0:
            u = q.popleft()
            for v in adj[u]:
                if visited[v]:
                    continue
                # First time v is reached: record the tree edge u -> v.
                visited[v] = True
                parents[v].append(u)
                children[u].append(v)
                q.append(v)

# Result:
# parents[c]   = [parent], or [] when c is a component root
# children[c]  = list of tree children of c
# root_classes = the root chosen for each connected component
print("roots:", root_classes)

roots: [0, 3, 23]


In [None]:
# Directory holding the precomputed embedding files.
EMB_PATH = Path("Embeddings")

# NOTE(review): load_json is not defined in this notebook (presumably from
# `utils`); the code below only requires each result to expose `.values()`.
train_emb_dict = load_json(EMB_PATH / "train_embeddings.json")
test_emb_dict  = load_json(EMB_PATH / "test_embeddings.json")
class_emb_dict = load_json(EMB_PATH / "class_embeddings.json")

# Stack the values into tensors, in the iteration order of each mapping.
# NOTE(review): later cells index rows of class_emb by class id, so this assumes
# the JSON entries are stored in class-index order — confirm.
train_emb = torch.tensor(list(train_emb_dict.values())) # (N, d)
test_emb  = torch.tensor(list(test_emb_dict.values()))  # (N', d)
class_emb = torch.tensor(list(class_emb_dict.values())) # (C, d)

In [None]:
# L2-normalise each row (dim=1) so that a plain dot product between a document
# row and a class row equals their cosine similarity.
doc_embs   = F.normalize(train_emb, p=2, dim=1)   # (N, d)
class_embs = F.normalize(class_emb, p=2, dim=1)   # (C, d)

def compute_doc_class_sim(doc_embs, class_embs, batch_size=512):
    """
    Compute the document-by-class dot-product matrix in row batches.

    The result equals cosine similarity when the rows of both inputs are
    already L2-normalised; this function does not normalise them itself.

    doc_embs:   (N_docs, d) tensor
    class_embs: (N_classes, d) tensor
    batch_size: number of document rows multiplied per step; must be >= 1
    return:     sim_matrix of shape (N_docs, N_classes)
    raises:     ValueError if batch_size < 1
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    n_docs = doc_embs.size(0)
    if n_docs == 0:
        # torch.cat rejects an empty list, so produce the (0, N_classes) result directly.
        return doc_embs @ class_embs.T

    class_embs_t = class_embs.T  # loop-invariant, build the view once
    sims = []
    for start in range(0, n_docs, batch_size):
        batch = doc_embs[start:start + batch_size]   # (B, d); slicing clamps at n_docs
        sims.append(batch @ class_embs_t)            # (B, N_classes)
    return torch.cat(sims, dim=0)                    # (N_docs, N_classes)

In [11]:
from heapq import nlargest

def mine_candidates_for_doc(sim_row,
                            root_classes,
                            children,
                            beam_width=10,
                            min_sim=-0.1,
                            max_depth=10):
    """
    Top-down beam search over the class tree for a single document.

    sim_row:      (N_classes,) similarity of this document to every class;
                  a tensor/array exposing `.tolist()` or a plain sequence
    root_classes: iterable of root class indices
    children:     children[c] is the list of child indices of class c
    beam_width:   number of best-scoring paths kept after each level
    min_sim:      nodes with similarity below this are neither collected nor expanded
    max_depth:    maximum number of expansion levels below the roots
    return:       candidate_classes: set[int]

    Every root is always part of the result, even when its similarity is below
    `min_sim` (such a root is just not expanded). A child reaching `min_sim` is
    collected as soon as it is generated, i.e. before the beam is pruned.
    """
    # Convert once to Python floats: per-element `.item()` calls on a tensor
    # dominate the run time when this is called for every document.
    scores = sim_row.tolist() if hasattr(sim_row, "tolist") else list(sim_row)

    candidate_classes = set(root_classes)

    # Beam entries are (last_node, path_length, mean_path_score); only these
    # three values of a path are ever needed.
    beam = [(r, 1, scores[r]) for r in root_classes if scores[r] >= min_sim]

    depth = 0
    while beam and depth < max_depth:
        new_beam = []
        for last, length, path_score in beam:
            for child in children[last]:
                s = scores[child]
                if s < min_sim:
                    continue
                # Path score is the running mean of the node similarities.
                new_score = (path_score * length + s) / (length + 1)
                new_beam.append((child, length + 1, new_score))
                candidate_classes.add(child)

        if not new_beam:
            break
        # Keep only the `beam_width` best paths for the next level.
        beam = nlargest(beam_width, new_beam, key=lambda entry: entry[2])
        depth += 1

    return candidate_classes

In [12]:
def compute_local_confidence_for_doc(sim_row, c, parents, children):
    """
    Local margin of class `c` for one document.

    The margin is the similarity of `c` minus the highest similarity among its
    parents and its siblings (the other children of those parents).

    sim_row:  (N_classes,) similarities of the document to every class
    c:        class index
    parents:  parents[k] is the list of parent indices of class k
    children: children[k] is the list of child indices of class k
    return:   float margin; 0.0 when `c` has neither parents nor siblings
    """
    parent_ids = parents[c]
    # Siblings: every child of any parent of c, except c itself.
    sibling_ids = {ch for p in parent_ids for ch in children[p] if ch != c}
    reference_ids = [*parent_ids, *sibling_ids]

    # Nothing to compare against (root-level class): the margin is defined as 0.
    if len(reference_ids) == 0:
        return 0.0

    strongest_rival = sim_row[reference_ids].max().item()
    return sim_row[c].item() - strongest_rival


In [13]:
num_docs, num_classes = doc_embs.size(0), class_embs.size(0)
sim_matrix = compute_doc_class_sim(doc_embs, class_embs)  # (N_docs, N_classes)

# all_conf_values[c] collects the local margins of every document that has
# class c among its candidates; candidate_per_doc[i] is the candidate set of doc i.
all_conf_values = [[] for _ in range(num_classes)]
candidate_per_doc = []

# This loop visits every document and is slow; the progress bar shows how far
# it has got instead of leaving the cell silent until it finishes.
for i in tqdm(range(num_docs), desc="mining candidates"):
    sim_row = sim_matrix[i]  # (N_classes,)
    # 1) top-down candidate mining
    cand_classes = mine_candidates_for_doc(
        sim_row,
        root_classes=root_classes,
        children=children,
        beam_width=10,
        min_sim=-0.1,
        max_depth=10
    )
    candidate_per_doc.append(cand_classes)

    # 2) local margin, computed for the candidates only
    for c in cand_classes:
        conf = compute_local_confidence_for_doc(
            sim_row, c, parents=parents, children=children
        )
        # Every margin is kept, including very small or negative ones.
        all_conf_values[c].append(conf)


KeyboardInterrupt: 

In [None]:
import numpy as np

# Per-class threshold: the median of the local margins collected for that class.
# Classes without any collected margin keep +inf so that no document can reach
# the threshold (no core documents for them).
class_median_conf = np.full(num_classes, float('inf'))
for c in range(num_classes):
    vals = all_conf_values[c]
    if len(vals) > 0:
        class_median_conf[c] = np.median(vals)


In [None]:
# For every document, keep the candidate classes whose local margin reaches the
# per-class median threshold.
core_classes_per_doc = []

for i in range(num_docs):
    sim_row = sim_matrix[i]
    cand_classes = candidate_per_doc[i]

    kept = []
    for c in cand_classes:
        conf = compute_local_confidence_for_doc(
            sim_row, c, parents=parents, children=children
        )
        if conf >= class_median_conf[c]:
            kept.append(c)
    core_classes_per_doc.append(kept)
