In [None]:
import pandas as pd, hashlib
import matplotlib.font_manager as fm
import matplotlib.pyplot as plt
import platform
import os

# ===== 한글/한자 폰트 자동 설정 =====
def setup_korean_fonts():
    """Pick an installed Korean/CJK-capable font and make it matplotlib's default.

    The candidate list depends on ``platform.system()``; the first candidate
    found in matplotlib's font manager is written to
    ``plt.rcParams['font.family']``. ``axes.unicode_minus`` is disabled in
    every case so the minus sign does not render as a missing glyph.

    Returns:
        The selected font family name, or ``None`` when no candidate is
        installed (in which case 'DejaVu Sans' is set and a warning printed).
    """
    system = platform.system()

    # Per-OS candidates in priority order
    if system == 'Darwin':  # macOS
        font_candidates = [
            'AppleGothic',           # Hangul (macOS default)
            'Apple SD Gothic Neo',   # Hangul (macOS)
            'Arial Unicode MS',      # Hangul + Hanja
            'Nanum Gothic',          # Nanum Gothic
        ]
    elif system == 'Windows':
        font_candidates = [
            'Malgun Gothic',    # Malgun Gothic (Windows default)
            'Gulim',            # Gulim
            'Batang',           # Batang
            'NanumGothic',      # Nanum Gothic
        ]
    else:  # Linux and anything else
        # 'DejaVu Sans' is deliberately NOT a candidate: it has no Korean
        # glyphs, so matching it would report success while text still breaks.
        font_candidates = [
            'NanumGothic',
            'Noto Sans CJK KR',
            'Noto Sans KR',
        ]

    # Names of the fonts matplotlib knows about (set for O(1) membership tests)
    available_fonts = {f.name for f in fm.fontManager.ttflist}

    for font in font_candidates:
        if font in available_fonts:
            print(f"[FONT] 한글/한자 폰트 설정: {font}")
            plt.rcParams['font.family'] = font
            plt.rcParams['axes.unicode_minus'] = False  # avoid broken minus sign
            return font

    # Nothing suitable installed: warn and fall back to matplotlib's default font
    print("[WARNING] 한글/한자 지원 폰트를 찾지 못했습니다. 텍스트가 깨질 수 있습니다.")
    plt.rcParams['font.family'] = 'DejaVu Sans'
    plt.rcParams['axes.unicode_minus'] = False
    return None

# Apply the font configuration
setup_korean_fonts()

# Input file
PATH = "gwashi.csv"
# File encoding; switch to "cp949" if needed
ENC = "utf-8"
# Hash length (prefix excluded)
N = 12

# === Mapping: adjust the field lists to your own column names ===
MAP = {
    # e.g. author name
    "person": {"prefix": "PE", "fields": ["writer"]},
    # e.g. (author, question, year)
    "answer": {"prefix": "A", "fields": ["writer", "q_id", "year"]},
    "question": {"prefix": "Q", "fields": ["q_name", "category1", "category2"]},
}

def sha(prefix,*fs,n=N):
    """Return ``prefix`` followed by the first ``n`` hex digits of a SHA-1 digest.

    Each value in ``fs`` is normalised to a stripped string (missing values,
    as judged by ``pd.isna``, become the empty string); the normalised values
    are joined with ``"||"`` and hashed.
    """
    normalised = []
    for value in fs:
        if pd.isna(value):
            normalised.append("")
        else:
            normalised.append(str(value).strip())
    digest = hashlib.sha1("||".join(normalised).encode()).hexdigest()
    return f"{prefix}{digest[:n]}"

def build_id_series(df, prefix, fields, n=N):
    """Compute one hashed id per row of ``df`` from the columns named in ``fields``.

    A field that is not a column of ``df`` contributes an empty string for
    every row. Returns a plain list with one id per row, in row order.
    """
    # Resolve each field to its column once; None marks a missing column
    columns = [df[name] if name in df.columns else None for name in fields]

    ids = []
    for row in range(len(df)):
        values = ["" if col is None else col.iloc[row] for col in columns]
        ids.append(sha(prefix, *values, n=n))
    return ids

def stability_and_uniqueness(df, key_name, prefix, fields):
    """Check that generated ids are reproducible and report duplicated ids.

    Args:
        df: Source table.
        key_name: Column name to use for the id in the duplicate table.
        prefix: Id prefix passed through to ``build_id_series``.
        fields: Column names the id is derived from.

    Returns:
        ``(stab, dup_tbl)`` where ``stab`` is the fraction of rows whose id is
        identical across two computations on the same input (1.0 is the
        expected value) and ``dup_tbl`` is a DataFrame with columns
        ``key_name`` and ``"cnt"`` listing ids that occur more than once.
    """
    v1 = pd.Series(build_id_series(df, prefix, fields))
    v2 = pd.Series(build_id_series(df, prefix, fields))  # recompute on identical input
    stab = (v1 == v2).mean()

    counts = v1.value_counts()
    # Name the index and the value column explicitly. Relying on the default
    # names ("index" / 0) breaks on pandas >= 2.0, where value_counts() names
    # its result "count" and the count column would never be renamed to "cnt".
    dup_tbl = counts[counts > 1].rename_axis(key_name).reset_index(name="cnt")
    return stab, dup_tbl

df = pd.read_csv(PATH, encoding=ENC, engine="python")

# Reproducibility ratio and duplicate-key table for every entity in MAP
reports = {}
for ent, cfg in MAP.items():
    stab, dup_tbl = stability_and_uniqueness(
        df, f"{ent}_id", cfg["prefix"], cfg["fields"]
    )
    reports[ent] = dict(
        stability=float(stab),
        dup_cnt=int(len(dup_tbl)),
        dup_table=dup_tbl.head(20),  # keep at most 20 duplicated keys
    )

# Summary output
for ent, r in reports.items():
    print(f"[{ent.upper()}] 재현성={r['stability']:.4f}  중복key개수={r['dup_cnt']}")
    if r["dup_cnt"] > 0:
        print(r["dup_table"])

In [2]:
# -*- coding: utf-8 -*-
"""
Entity graph (Exam–Question–Time) + Temporal flow visualization (Korean font fixed)
"""
import os, json
from collections import defaultdict, Counter
import pandas as pd, numpy as np
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.font_manager as fm
import networkx as nx
import platform

# ===== 한글/한자 폰트 자동 설정 =====
def setup_korean_fonts():
    """Pick an installed Korean/CJK-capable font and make it matplotlib's default.

    The candidate list depends on ``platform.system()``; the first candidate
    found in matplotlib's font manager is written to
    ``plt.rcParams['font.family']``. ``axes.unicode_minus`` is disabled in
    every case so the minus sign does not render as a missing glyph.

    Returns:
        The selected font family name, or ``None`` when no candidate is
        installed (in which case 'DejaVu Sans' is set and a warning printed).
    """
    system = platform.system()

    # Per-OS candidates in priority order
    if system == 'Darwin':  # macOS
        font_candidates = [
            'AppleGothic',           # Hangul (macOS default)
            'Apple SD Gothic Neo',   # Hangul (macOS)
            'Arial Unicode MS',      # Hangul + Hanja
            'Nanum Gothic',          # Nanum Gothic
        ]
    elif system == 'Windows':
        font_candidates = [
            'Malgun Gothic',    # Malgun Gothic (Windows default)
            'Gulim',            # Gulim
            'Batang',           # Batang
            'NanumGothic',      # Nanum Gothic
        ]
    else:  # Linux and anything else
        # 'DejaVu Sans' is deliberately NOT a candidate: it has no Korean
        # glyphs, so matching it would report success while text still breaks.
        font_candidates = [
            'NanumGothic',
            'Noto Sans CJK KR',
            'Noto Sans KR',
        ]

    # Names of the fonts matplotlib knows about (set for O(1) membership tests)
    available_fonts = {f.name for f in fm.fontManager.ttflist}

    for font in font_candidates:
        if font in available_fonts:
            print(f"[FONT] 한글/한자 폰트 설정: {font}")
            plt.rcParams['font.family'] = font
            plt.rcParams['axes.unicode_minus'] = False  # avoid broken minus sign
            return font

    # Nothing suitable installed: warn and fall back to matplotlib's default font
    print("[WARNING] 한글/한자 지원 폰트를 찾지 못했습니다. 텍스트가 깨질 수 있습니다.")
    plt.rcParams['font.family'] = 'DejaVu Sans'
    plt.rcParams['axes.unicode_minus'] = False
    return None

# Apply the font configuration
setup_korean_fonts()

# ===== Path setup =====
# NOTE(review): absolute, machine-specific path; adjust when running elsewhere.
BASE = "/Users/songhune/Library/Mobile Documents/com~apple~CloudDocs/Workspace/korean_eda"
_NOTEBOOK_DIR = os.path.join(BASE, "notebook")
IN_JSONL = os.path.join(_NOTEBOOK_DIR, "eda_outputs", "1번실험", "triples_no_answer.jsonl")
OUT_DIR = os.path.join(_NOTEBOOK_DIR, "experiments", "graphs")
# Create the output directory up front so later savefig/to_csv calls succeed
os.makedirs(OUT_DIR, exist_ok=True)

def infer_type(nid):
    """Classify a node id by its first character (case-insensitive).

    'E' -> "Exam", 'Q' -> "Question", 'T' -> "Time"; anything else, including
    an empty or missing id, yields "Unknown".
    """
    if not nid:
        return "Unknown"
    initial = nid[0].upper()
    if initial == "E":
        return "Exam"
    if initial == "Q":
        return "Question"
    if initial == "T":
        return "Time"
    return "Unknown"

# ===== Load =====
# One JSON document per non-blank line
with open(IN_JSONL, encoding="utf-8") as f:
    rows = [json.loads(l) for l in f if l.strip()]
print(f"[LOAD] {len(rows)} rows")

# ===== Parse nodes/edges =====
G = nx.MultiDiGraph()
# id -> id relations
exam_to_time, q_to_exam, time_year = {}, {}, {}
# literal attributes per entity type
q_cat, q_sub = {}, {}
exam_stage, exam_typeA, exam_kind = {}, {}, {}

for r in rows:
    # Entities declared on the record itself; these carry the display name
    for key in ("exam", "question", "time"):
        entity = r.get(key)
        if not isinstance(entity, dict):
            continue  # missing or null entity: nothing to register
        nid = entity.get("id")
        if nid:
            G.add_node(nid, type=infer_type(nid), name=entity.get("name", ""))

    # A record without a "triples" key contributes nothing instead of raising KeyError
    for t in r.get("triples", []):
        s, p, o, o_type = t["s"], t["p"], t["o"], t.get("o_type", "")
        if o_type == "id":
            # Object is another node: record the edge
            G.add_node(s, type=infer_type(s))
            G.add_node(o, type=infer_type(o))
            G.add_edge(s, o, predicate=p)
            if p == "isHeldOn":
                exam_to_time[s] = o
            if p == "isPartOf":
                q_to_exam[s] = o
        else:
            # Object is a literal: keep it as an attribute of the subject
            s_type = infer_type(s)
            if s_type == "Time":
                if p == "year":
                    try:
                        time_year[s] = int(float(o))
                    except (TypeError, ValueError, OverflowError):
                        # Non-numeric year: leave this Time node without a year
                        pass
            elif s_type == "Question":
                if p == "hasCategory":
                    q_cat[s] = o
                if p == "hasSubcategory":
                    q_sub[s] = o
            elif s_type == "Exam":
                if p == "hasStage":
                    exam_stage[s] = o
                if p == "hasTypeA":
                    exam_typeA[s] = o
                if p == "hasCategory":
                    exam_kind[s] = o

print(f"[GRAPH] nodes={G.number_of_nodes()} edges={G.number_of_edges()}")

# ===== Predicate counts =====
pred_counts = Counter(d.get("predicate", "") for _, _, d in G.edges(data=True))
# most_common() orders by descending count
pred_df = pd.DataFrame(pred_counts.most_common(), columns=["predicate", "count"])
pred_df.to_csv(os.path.join(OUT_DIR, "edge_density.csv"), index=False)

# ===== Temporal base table =====
# One row per question whose exam is linked to a Time node with a known year
records = []
for q, e in q_to_exam.items():
    y = time_year.get(exam_to_time.get(e))
    if y is None:
        continue
    records.append({
        "year": y,
        "exam_id": e,
        "question_id": q,
        "stage": exam_stage.get(e, ""),
        "typeA": exam_typeA.get(e, ""),
        "examKind": exam_kind.get(e, ""),
        "qCategory": q_cat.get(q, ""),
        "qSubcategory": q_sub.get(q, ""),
    })
# Fix the column set explicitly so an empty result still has the expected
# columns (and CSV header) instead of being a column-less frame that makes
# later column lookups such as groupby("year") raise KeyError.
flow_df = pd.DataFrame(
    records,
    columns=["year", "exam_id", "question_id", "stage", "typeA",
             "examKind", "qCategory", "qSubcategory"],
)
flow_df.to_csv(os.path.join(OUT_DIR, "temporal_flow_base.csv"), index=False)

# ===== Figure A: schema subset =====
def draw_schema(year_min=1393,year_max=1410):
    """Draw the Time/Exam/Question sub-graph for a year window and save it as PNG.

    Reads the module-level ``G``, ``time_year``, ``exam_to_time`` and
    ``q_to_exam``. Time nodes whose year lies in ``[year_min, year_max]`` are
    kept, together with the exams held on them and the questions of those
    exams. Nodes are laid out in three rows (Time on top, Question at the
    bottom) and only ``isHeldOn`` / ``isPartOf`` edges are drawn.

    The figure is written to ``OUT_DIR`` as
    ``A_schema_small_<year_min>_<year_max>.png``.
    """
    keep_T = {t for t, y in time_year.items() if year_min <= y <= year_max}
    keep_E = {e for e, t in exam_to_time.items() if t in keep_T}
    keep_Q = {q for q, e in q_to_exam.items() if e in keep_E}

    H = nx.MultiDiGraph()
    for n in (*keep_T, *keep_E, *keep_Q):
        # Carry the display name over from G; without it every label below
        # would be an empty string. Ids seen only in literal triples are not
        # nodes of G, and a name may be null, hence the two fallbacks.
        name = (G.nodes[n].get("name") if n in G else "") or ""
        H.add_node(n, type=infer_type(n), name=name)
    for s, t, d in G.edges(data=True):
        if s in H and t in H and d["predicate"] in ("isHeldOn", "isPartOf"):
            H.add_edge(s, t, predicate=d["predicate"])

    # Layered layout: x = rank within the sorted layer, y = layer height
    pos = {}
    def layer(nodes, y):
        for i, n in enumerate(sorted(nodes)):
            pos[n] = (i, y)
    Tn = [n for n in H if infer_type(n) == "Time"]
    En = [n for n in H if infer_type(n) == "Exam"]
    Qn = [n for n in H if infer_type(n) == "Question"]
    layer(Tn, 2)
    layer(En, 1)
    layer(Qn, 0)

    plt.figure(figsize=(14, 9))
    nx.draw(H, pos, with_labels=False, node_size=400, arrows=True)
    labs = {n: H.nodes[n]["name"][:12] for n in H}  # labels capped at 12 chars
    nx.draw_networkx_labels(H, pos, labs, font_size=8)
    e_labels = {(u, v): d["predicate"] for u, v, d in H.edges(data=True)}
    nx.draw_networkx_edge_labels(H, pos, edge_labels=e_labels, font_size=8)
    plt.title(f"Exam–Question–Time schema (subset {year_min}-{year_max})")
    plt.tight_layout()
    # File name follows the requested window (defaults give the historical name)
    out_name = f"A_schema_small_{year_min}_{year_max}.png"
    plt.savefig(os.path.join(OUT_DIR, out_name), dpi=200, bbox_inches='tight')
    plt.close()

draw_schema()

# ===== Figure B: edge density =====
# Bar chart of edge counts per predicate
plt.figure(figsize=(9, 5))
plt.bar(pred_df["predicate"], pred_df["count"], color="steelblue")
plt.xticks(rotation=45, ha='right')
plt.ylabel("Edge count")
plt.title("Edge density by predicate")
plt.tight_layout()
plt.savefig(
    os.path.join(OUT_DIR, "B_edge_density.png"),
    dpi=200,
    bbox_inches='tight',
)
plt.close()

# ===== Figure C: temporal flow =====
# Distinct questions per year
annual = (
    flow_df.groupby("year")["question_id"]
    .nunique()
    .reset_index(name="n_questions")
)
plt.figure(figsize=(12, 5))
plt.plot(annual["year"], annual["n_questions"], color="darkslateblue", lw=1.8)
plt.xlabel("Year")
plt.ylabel("Number of questions")
plt.title("Number of questions per year")
plt.tight_layout()
plt.savefig(
    os.path.join(OUT_DIR, "C1_questions_per_year.png"),
    dpi=200,
    bbox_inches='tight',
)
plt.close()

# Stacked area by stage: the five most frequent stages, everything else as "Others"
top_stages = flow_df["stage"].value_counts().head(5).index.tolist()
tmp = flow_df.copy()
tmp["stage_top"] = np.where(tmp["stage"].isin(top_stages), tmp["stage"], "Others")
pivot = (
    tmp.groupby(["year", "stage_top"])["question_id"]
    .nunique()
    .reset_index()
    .pivot(index="year", columns="stage_top", values="question_id")
    .fillna(0)
)
plt.figure(figsize=(12, 6))
plt.stackplot(pivot.index, *[pivot[c] for c in pivot.columns], labels=pivot.columns)
plt.legend(loc="upper left", fontsize=9)
plt.xlabel("Year")
plt.ylabel("Number of questions")
plt.title("Questions by stage over time")
plt.tight_layout()
plt.savefig(
    os.path.join(OUT_DIR, "C2_stage_over_time.png"),
    dpi=200,
    bbox_inches='tight',
)
plt.close()

# Stacked area by category: same construction keyed on the question category
top_cats = flow_df["qCategory"].value_counts().head(5).index.tolist()
tmp = flow_df.copy()
tmp["cat_top"] = np.where(tmp["qCategory"].isin(top_cats), tmp["qCategory"], "Others")
pivot = (
    tmp.groupby(["year", "cat_top"])["question_id"]
    .nunique()
    .reset_index()
    .pivot(index="year", columns="cat_top", values="question_id")
    .fillna(0)
)
plt.figure(figsize=(12, 6))
plt.stackplot(pivot.index, *[pivot[c] for c in pivot.columns], labels=pivot.columns)
plt.legend(loc="upper left", fontsize=9)
plt.xlabel("Year")
plt.ylabel("Number of questions")
plt.title("Questions by category over time")
plt.tight_layout()
plt.savefig(
    os.path.join(OUT_DIR, "C3_category_over_time.png"),
    dpi=200,
    bbox_inches='tight',
)
plt.close()

print(f"\n✅ Figures saved in {OUT_DIR}")

[FONT] 한글/한자 폰트 설정: AppleGothic
[LOAD] 3348 rows
[GRAPH] nodes=5469 edges=6696


  plt.tight_layout()



✅ Figures saved in /Users/songhune/Library/Mobile Documents/com~apple~CloudDocs/Workspace/korean_eda/notebook/experiments/graphs


In [4]:
# -*- coding: utf-8 -*-
"""
Useful diagnostics & visualization (Korean font fixed + Improved labels)
- A' Tripartite network (with informative styling)
- B' Link completeness funnel (Q→E, E→T, Q→E→T)
- B'' Attribute coverage heatmap
"""
import os, json, math, re
from collections import defaultdict, Counter
import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.font_manager as fm
import seaborn as sns
import networkx as nx
import platform

# ===== 한글/한자 폰트 자동 설정 (개선 버전) =====
def setup_korean_fonts():
    """Pick an installed Korean/CJK-capable font and make it matplotlib's default.

    The candidate list depends on ``platform.system()``; the first candidate
    found in matplotlib's font manager is applied to ``font.family`` and
    ``font.sans-serif``, and ``axes.unicode_minus`` is disabled.

    Returns:
        The selected font family name, or ``None`` (after printing a warning
        and leaving rcParams untouched) when no candidate is installed.
    """
    system = platform.system()

    # Per-OS candidates in priority order
    if system == 'Darwin':  # macOS
        font_candidates = [
            'AppleGothic',           # Hangul (macOS default)
            'Apple SD Gothic Neo',   # Hangul (macOS)
            'Arial Unicode MS',      # Hangul + Hanja
            'Nanum Gothic',          # Nanum Gothic
        ]
    elif system == 'Windows':
        font_candidates = [
            'Malgun Gothic',    # Malgun Gothic (Windows default)
            'Gulim',            # Gulim
            'Batang',           # Batang
            'NanumGothic',      # Nanum Gothic
        ]
    else:  # Linux and anything else
        # 'DejaVu Sans' is deliberately NOT a candidate: it has no Korean
        # glyphs, so matching it would report success while text still breaks.
        font_candidates = [
            'NanumGothic',
            'Noto Sans CJK KR',
            'Noto Sans KR',
        ]

    # Names of the fonts matplotlib knows about (set for O(1) membership tests)
    available_fonts = {f.name for f in fm.fontManager.ttflist}

    selected_font = next(
        (font for font in font_candidates if font in available_fonts), None
    )
    if not selected_font:
        print("[WARNING] 한글/한자 지원 폰트를 찾지 못했습니다. 텍스트가 깨질 수 있습니다.")
        return None

    # plt.rcParams is the same object as matplotlib.rcParams, so a single
    # round of assignments covers both the library and pyplot.
    matplotlib.rcParams['font.family'] = selected_font
    matplotlib.rcParams['font.sans-serif'] = [selected_font]
    matplotlib.rcParams['axes.unicode_minus'] = False  # avoid broken minus sign

    print(f"[FONT] 한글/한자 폰트 설정: {selected_font}")
    return selected_font

# Apply the font configuration
setup_korean_fonts()

# seaborn styling (this can reset the font configuration!)
sns.set_context("talk")
sns.set_style("whitegrid")

# Re-apply the font after the seaborn calls (important!)
setup_korean_fonts()

# ========= Paths =========
# NOTE(review): absolute, machine-specific path; adjust when running elsewhere.
BASE = "/Users/songhune/Library/Mobile Documents/com~apple~CloudDocs/Workspace/korean_eda"
_NOTEBOOK_DIR = os.path.join(BASE, "notebook")
IN_JSONL = os.path.join(_NOTEBOOK_DIR, "eda_outputs", "1번실험", "triples_no_answer.jsonl")
OUT_DIR = os.path.join(_NOTEBOOK_DIR, "experiments", "graphs")
# Create the output directory up front so later savefig calls succeed
os.makedirs(OUT_DIR, exist_ok=True)

# ========= Helpers =========
def infer_type(nid: str) -> str:
    """Classify a node id by its first character (case-insensitive).

    'E' -> "Exam", 'Q' -> "Question", 'T' -> "Time"; anything else, including
    an empty or missing id, yields "Unknown".
    """
    if not nid:
        return "Unknown"
    initial = nid[0].upper()
    if initial == "E":
        return "Exam"
    if initial == "Q":
        return "Question"
    if initial == "T":
        return "Time"
    return "Unknown"

def safe_int(x):
    """Convert ``x`` to ``int`` via ``float``, returning ``None`` if impossible.

    Going through ``float`` accepts strings such as ``"1393.0"``. Values that
    cannot be parsed (wrong type, non-numeric text, NaN, infinity) give
    ``None``. Only conversion errors are caught, so KeyboardInterrupt and
    SystemExit are no longer swallowed as they were by the bare ``except``.
    """
    try:
        return int(float(x))
    except (TypeError, ValueError, OverflowError):
        return None

def smart_truncate(text: str, max_len: int = 8, node_type: str = "Unknown") -> str:
    """Shorten a node name for display, with rules that depend on the node type.

    - Time: the first 4-digit run if there is one, else the first ``max_len`` chars.
    - Exam: the 4-digit year (if any), a newline, then up to 6 chars of the
      first '_'-separated word left after the year is stripped out.
    - Question: with the year stripped, the first two '_'-separated parts
      (4 chars each) on two lines; with a single part, its first ``max_len``
      chars followed by an ellipsis.
    - Anything else: the first ``max_len`` chars, plus an ellipsis if cut.

    Empty input returns the empty string.
    """
    if not text:
        return ""

    if node_type == "Time":
        found = re.search(r'(\d{4})', text)
        return found.group(1) if found else text[:max_len]

    if node_type == "Exam":
        found = re.search(r'(\d{4})', text)
        remainder = re.sub(r'\d{4}년?_?', '', text)
        # split() returns the whole string as its only element when there is
        # no '_', so no separate check for the separator is needed
        keyword = remainder.split('_')[0][:6]
        if found:
            return f"{found.group(1)}\n{keyword}"
        return keyword

    if node_type == "Question":
        stripped = re.sub(r'\d{4}년?_?', '', text)
        pieces = stripped.split('_')
        if len(pieces) < 2:
            return stripped[:max_len] + "…"
        return f"{pieces[0][:4]}\n{pieces[1][:4]}"

    # Default: plain cut, marked with an ellipsis only when something was dropped
    suffix = "…" if len(text) > max_len else ""
    return text[:max_len] + suffix

# ========= Load =========
# One JSON document per non-blank line
with open(IN_JSONL, encoding="utf-8") as f:
    rows = [json.loads(stripped) for stripped in (line.strip() for line in f) if stripped]

# ========= Parse triples into structures =========
node_name = {}   # node id -> display name (non-empty names only)
node_type = {}   # node id -> "Exam" / "Question" / "Time" / "Unknown"
edges = []       # (s, p, o) for id -> id triples only
# Literal attributes: lit[node][predicate] -> list of values
lit = defaultdict(lambda: defaultdict(list))

for r in rows:
    # Entities declared on the record itself
    for key in ("exam", "question", "time"):
        entity = r.get(key)
        if not isinstance(entity, dict):
            continue
        nid = entity.get("id")
        if not nid:
            continue
        node_type[nid] = infer_type(nid)
        nm = entity.get("name") or ""
        if nm:
            # keep only non-empty names
            node_name[nid] = nm

    for t in r.get("triples", []):
        s = t.get("s")
        p = t.get("p")
        o = t.get("o")
        o_type = t.get("o_type", "")
        if not s or not p or o is None:
            continue
        if o_type != "id":
            lit[s][p].append(o)
            continue
        edges.append((s, p, o))
        # Register endpoint types that were not declared on any record
        for endpoint in (s, o):
            if endpoint not in node_type:
                node_type[endpoint] = infer_type(endpoint)

# Quick dictionaries for key relations
exam_to_time = {}      # E -> T
question_to_exam = {}  # Q -> E
for s, p, o in edges:
    endpoint_kinds = (node_type.get(s), node_type.get(o))
    if p == "isHeldOn" and endpoint_kinds == ("Exam", "Time"):
        exam_to_time[s] = o
    if p == "isPartOf" and endpoint_kinds == ("Question", "Exam"):
        question_to_exam[s] = o

# Useful literal maps, built from the first recorded value of each predicate
time_year = {}    # Time id -> year as int, or None when absent/unparseable
q_cat = {}        # Question id -> category (non-empty values only)
exam_stage = {}   # Exam id -> stage (non-empty values only)
for nid, typ in node_type.items():
    if typ == "Time":
        vals = lit[nid].get("year", [""])
        if vals:
            time_year[nid] = safe_int(vals[0])
    elif typ == "Question":
        vals = lit[nid].get("hasCategory", [""])
        if vals and vals[0]:
            q_cat[nid] = vals[0]
    elif typ == "Exam":
        vals = lit[nid].get("hasStage", [""])
        if vals and vals[0]:
            exam_stage[nid] = vals[0]

# ========= A' Tripartite (informative) =========
def draw_tripartite_subset(year_min=1393, year_max=1410, max_questions_per_exam=12):
    """Draw a styled three-layer Time/Exam/Question network for a year window.

    Reads the module-level ``time_year``, ``exam_to_time``,
    ``question_to_exam``, ``edges``, ``node_type`` and ``node_name``.

    Args:
        year_min: First year (inclusive) of Time nodes to keep.
        year_max: Last year (inclusive) of Time nodes to keep.
        max_questions_per_exam: Cap on questions shown per exam, or ``None``
            for no cap. The questions kept are the first ones in sorted id
            order, so repeated runs draw the same sample.

    Returns:
        Path of the PNG written to ``OUT_DIR``.
    """
    # pick subset by year
    keep_time = {t for t, y in time_year.items() if y is not None and year_min <= y <= year_max}
    keep_exam = {e for e, t in exam_to_time.items() if t in keep_time}
    keep_q = {q for q, e in question_to_exam.items() if e in keep_exam}

    # downsample per exam to avoid overplot
    if max_questions_per_exam is not None:
        by_exam = defaultdict(list)
        for q in keep_q:
            by_exam[question_to_exam[q]].append(q)
        # Sort before slicing: keep_q is a set, so without sorting the sample
        # would follow arbitrary iteration order and differ between runs.
        keep_q = {
            q
            for qs in by_exam.values()
            for q in sorted(qs)[:max_questions_per_exam]
        }

    H = nx.MultiDiGraph()
    for n in (*keep_time, *keep_exam, *keep_q):
        H.add_node(n, t=node_type.get(n, "Unknown"))

    for s, p, o in edges:
        if s in H and o in H and p in ("isHeldOn", "isPartOf"):
            H.add_edge(s, o, predicate=p)

    # positions: one horizontal row per entity type
    pos = {}
    def place(layer_nodes, y, spacing=1.5):
        for i, n in enumerate(sorted(layer_nodes)):
            pos[n] = (i * spacing, y)

    T_nodes = [n for n in H if H.nodes[n]['t'] == "Time"]
    E_nodes = [n for n in H if H.nodes[n]['t'] == "Exam"]
    Q_nodes = [n for n in H if H.nodes[n]['t'] == "Question"]

    place(T_nodes, 2.5, spacing=2.0)   # Time: wide spacing
    place(E_nodes, 1.5, spacing=1.8)   # Exam: medium spacing
    place(Q_nodes, 0.5, spacing=1.2)   # Question: narrow spacing

    # node style: colour by type, size grows with degree
    color_map = {"Time": "#5B8FF9", "Exam": "#61DDAA", "Question": "#F6BD16", "Unknown": "#999999"}
    node_sizes = [150 + 60 * H.degree(n) for n in H.nodes()]
    node_colors = [color_map.get(H.nodes[n]['t'], "#999999") for n in H.nodes()]

    # edge style: predicate -> (line width, alpha)
    edge_style = {"isPartOf": (1.2, 0.25), "isHeldOn": (2.0, 0.4)}
    default_style = (0.8, 0.2)

    plt.figure(figsize=(20, 10))

    # draw edges first so nodes are painted on top of them
    for u, v, p in H.edges(data="predicate", default=""):
        w, a = edge_style.get(p, default_style)
        nx.draw_networkx_edges(H, pos, edgelist=[(u, v)], width=w, alpha=a,
                               arrows=True, arrowstyle='-|>', arrowsize=8)

    # draw nodes
    nx.draw_networkx_nodes(H, pos, node_color=node_colors, node_size=node_sizes,
                           linewidths=0.5, edgecolors="#333333")

    # draw labels with smart truncation
    short_labels = {}
    for n in H.nodes():
        nm = node_name.get(n, "")
        ntype = H.nodes[n]['t']

        if not nm:
            # No recorded name: use the year for Time nodes, else the type initial
            if ntype == "Time":
                y = time_year.get(n, "")
                nm = str(y) if y else "T"
            else:
                nm = ntype[:1]

        short_labels[n] = smart_truncate(nm, max_len=10, node_type=ntype)

    # small font so labels fit
    nx.draw_networkx_labels(H, pos, short_labels, font_size=6)

    # Legend
    from matplotlib.lines import Line2D
    leg_elems = [
        Line2D([0], [0], marker='o', color='w', label='Time',
               markerfacecolor=color_map["Time"], markersize=10),
        Line2D([0], [0], marker='o', color='w', label='Exam',
               markerfacecolor=color_map["Exam"], markersize=10),
        Line2D([0], [0], marker='o', color='w', label='Question',
               markerfacecolor=color_map["Question"], markersize=10),
        Line2D([0], [0], color='#333333', lw=2.0, label='isHeldOn (Exam→Time)'),
        Line2D([0], [0], color='#333333', lw=1.2, label='isPartOf (Question→Exam)'),
    ]
    plt.legend(handles=leg_elems, loc='upper left', frameon=True, fontsize=10)

    # Title
    plt.title(f"Tripartite network (subset {year_min}-{year_max})\n"
              f"Nodes sized by degree; edges styled by predicate\n"
              f"Labels: Time=year, Exam=year+keyword, Question=abbreviated",
              fontsize=14)
    plt.tight_layout()
    outpath = os.path.join(OUT_DIR, f"A_prime_tripartite_{year_min}_{year_max}.png")
    plt.savefig(outpath, dpi=200, bbox_inches='tight')
    plt.close()
    return outpath

# ========= B' Link completeness funnel =========
def draw_link_funnel():
    """Plot how complete the Question→Exam→Time link chain is, as a bar chart.

    Reads the module-level ``node_type``, ``question_to_exam`` and
    ``exam_to_time``. Three percentages are shown: questions linked to an
    exam, exams linked to a time, and questions whose exam is in turn linked
    to a time. Returns the path of the PNG written to ``OUT_DIR``.
    """
    all_Q = {n for n, t in node_type.items() if t == "Question"}
    all_E = {n for n, t in node_type.items() if t == "Exam"}
    q_total, e_total = len(all_Q), len(all_E)

    q_has_e = len(question_to_exam)
    e_has_t = len(exam_to_time)
    # Q→E→T chain completeness
    q_chain = 0
    for q in all_Q:
        if q in question_to_exam and question_to_exam[q] in exam_to_time:
            q_chain += 1

    def as_row(stage, num, den):
        # Percentage with a zero denominator reported as 0
        return {"stage": stage, "value": 100.0 * (num / den if den else 0), "num": num, "den": den}

    metrics = pd.DataFrame([
        as_row("Question→Exam", q_has_e, q_total),
        as_row("Exam→Time", e_has_t, e_total),
        as_row("Q→E→T chain", q_chain, q_total),
    ])

    bar_texts = [
        f"{v:.1f}% ({n}/{d})"
        for v, n, d in zip(metrics['value'], metrics['num'], metrics['den'])
    ]

    plt.figure(figsize=(9, 5))
    ax = sns.barplot(data=metrics, x="stage", y="value")
    ax.bar_label(ax.containers[0], labels=bar_texts, padding=3, fontsize=11)
    plt.ylim(0, 100)
    plt.ylabel("Completeness (%)")
    plt.xlabel("")
    plt.title("Link completeness across the core chain")
    plt.tight_layout()
    outpath = os.path.join(OUT_DIR, "B_prime_link_completeness.png")
    plt.savefig(outpath, dpi=200, bbox_inches='tight')
    plt.close()
    return outpath

# ========= B'' Attribute coverage heatmap =========
# (predicate, display label) pairs whose coverage is reported per entity type
Q_ATTRS = [
    ("hasCategory", "Category"),
    ("hasSubcategory", "Subcategory"),
    ("hasAbstract", "Abstract"),
    ("hasContent", "Content"),
    ("hasSource", "Source"),
    ("hasSourceURL", "SourceURL"),
]
E_ATTRS = [
    ("hasTypeA", "TypeA"),
    ("hasTypeB", "TypeB"),
    ("hasCategory", "ExamKind"),
    ("hasStage", "Stage"),
    ("hasRound", "Round"),
    ("isRecordedIn", "RecordTitle"),
    ("hasRecordURL", "RecordURL"),
]
T_ATTRS = [
    ("year", "Year"),
    ("month", "Month"),
    ("day", "Day"),
    ("sexagenaryKR", "KR Sexagenary"),
    ("sexagenaryCN", "CN Sexagenary"),
]

def coverage_for(entity_type, attrs):
    """Return, per attribute, the percentage of nodes of a type that have a value.

    Reads the module-level ``node_type`` and ``lit``. A node counts as covered
    for a predicate when at least one of its recorded values is non-blank
    after ``str(...).strip()``. Percentages are rounded to one decimal and
    returned in the order of ``attrs``; with no nodes of the type, all are 0.
    """
    nodes = [n for n, t in node_type.items() if t == entity_type]
    total = len(nodes)

    percentages = []
    for p, _pretty in attrs:
        covered = sum(
            1 for n in nodes
            if any(str(v).strip() for v in lit[n].get(p, []))
        )
        share = covered / total if total else 0
        percentages.append(round(100.0 * share, 1))
    return percentages

def draw_attr_heatmap():
    """Draw a heatmap of attribute coverage (%) for Question, Exam and Time nodes.

    Rows are the three entity types; columns are the display labels of
    ``Q_ATTRS``, ``E_ATTRS`` and ``T_ATTRS`` in that order. Each row is filled
    only in the columns of its own attributes; the remaining cells stay NaN.
    Returns the path of the PNG written to ``OUT_DIR``.
    """
    groups = [("Question", Q_ATTRS), ("Exam", E_ATTRS), ("Time", T_ATTRS)]

    cols = [pretty for _, attrs in groups for _, pretty in attrs]
    mat = np.full((len(groups), len(cols)), np.nan)

    start = 0
    for i, (etype, attrs) in enumerate(groups):
        # Compute the coverage once per entity type rather than once per cell
        mat[i, start:start + len(attrs)] = coverage_for(etype, attrs)
        start += len(attrs)

    coverage_df = pd.DataFrame(mat, index=[etype for etype, _ in groups], columns=cols)

    plt.figure(figsize=(max(12, len(cols) * 0.6), 5))
    sns.heatmap(coverage_df, annot=True, fmt=".1f", cmap="YlGnBu", vmin=0, vmax=100,
                cbar_kws={"label": "Coverage (%)"},
                linewidths=0.5, linecolor="#EEEEEE", annot_kws={"size": 10})
    plt.title("Attribute coverage by entity")
    plt.xticks(rotation=45, ha="right")
    plt.tight_layout()
    outpath = os.path.join(OUT_DIR, "B_double_prime_attr_coverage.png")
    plt.savefig(outpath, dpi=200, bbox_inches='tight')
    plt.close()
    return outpath

# ========= Run & save =========
# Each draw_* call writes one PNG and returns its path
a_path = draw_tripartite_subset(1393, 1410, max_questions_per_exam=10)
b1_path = draw_link_funnel()
b2_path = draw_attr_heatmap()

print("✅ Saved:")
for saved in (a_path, b1_path, b2_path):
    print("  ", saved)

[FONT] 한글/한자 폰트 설정: AppleGothic
[FONT] 한글/한자 폰트 설정: AppleGothic
✅ Saved:
   /Users/songhune/Library/Mobile Documents/com~apple~CloudDocs/Workspace/korean_eda/notebook/experiments/graphs/A_prime_tripartite_1393_1410.png
   /Users/songhune/Library/Mobile Documents/com~apple~CloudDocs/Workspace/korean_eda/notebook/experiments/graphs/B_prime_link_completeness.png
   /Users/songhune/Library/Mobile Documents/com~apple~CloudDocs/Workspace/korean_eda/notebook/experiments/graphs/B_double_prime_attr_coverage.png
