# 기본환경 설정

In [None]:
import os, re, unicodedata, pathlib, random
import requests
from bs4 import BeautifulSoup
from tqdm import tqdm
import sentencepiece as spm
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# 경로/디바이스
DATA_DIR = pathlib.Path("data/ko_wikisource_clean"); DATA_DIR.mkdir(exist_ok=True)
RAW_TXT  = DATA_DIR/"raw.txt"
NORM_TXT = DATA_DIR/"norm_nfc.txt"
SPM_PREFIX = str(DATA_DIR/"spm_ko")

device = "cuda" if torch.cuda.is_available() else "cpu"
print("device:", device)


In [None]:
# 필요에 따라 더 추가하세요
URLS = [
    "https://ko.wikisource.org/wiki/B%EB%85%80%EC%9D%98_%EC%86%8C%EB%AC%98",  # B녀의 소묘
    "https://ko.wikisource.org/wiki/%EB%8C%80%EB%8F%99%EA%B0%95%EC%9D%80_%EC%86%8D%EC%82%AD%EC%9D%B8%EB%8B%A4",  # 대동강은 속삭인다
    "https://ko.wikisource.org/wiki/%EB%8C%80%ED%83%95%EC%A7%80_%EC%95%84%EC%A3%BC%EB%A8%B8%EB%8B%88",  # 대탕지 아주머니
    "https://ko.wikisource.org/wiki/%EB%8F%84%EC%8B%9C%EC%99%80_%EC%9C%A0%EB%A0%B9",  # 도시와 유령
    "https://ko.wikisource.org/wiki/%EB%8F%84%EC%A0%95", # 도정
    "https://ko.wikisource.org/wiki/%EB%A7%8C%EB%AC%B4%EB%B0%A9", # 만무방
    "https://ko.wikisource.org/wiki/%EB%AC%B4%EB%AA%85%EC%B4%88", # 무명초
    "https://ko.wikisource.org/wiki/%EB%AC%BC", # 물
    "https://ko.wikisource.org/wiki/%EB%AC%BC%EB%A0%88%EB%B0%A9%EC%95%84", # 물레방아
    "https://ko.wikisource.org/wiki/%EB%B0%98%EC%97%AD%EC%9E%90", # 반역자
    "https://ko.wikisource.org/wiki/%EC%9A%A9%EA%B3%BC_%EC%9A%A9%EC%9D%98_%EB%8C%80%EA%B2%A9%EC%A0%84", # 용과 용의 대격전
    "https://ko.wikisource.org/wiki/%EC%9A%B0%EC%97%B0%EC%9D%98_%EA%B8%B0%EC%A0%81", # 우연의 기적
    "https://ko.wikisource.org/wiki/%EC%9A%B4%EC%88%98_%EC%A2%8B%EC%9D%80_%EB%82%A0", # 운수 좋은 날
    "https://ko.wikisource.org/wiki/%EC%9B%90%EC%88%98%EB%A1%9C_%EC%9D%80%EC%9D%B8", # 원수로 은인
    "https://ko.wikisource.org/wiki/%EC%9C%A0%EB%AC%B4", # 유무
    "https://ko.wikisource.org/wiki/%EC%9C%A4%EA%B4%91%ED%98%B8", # 윤광호
    "https://ko.wikisource.org/wiki/%EC%9D%B4_%EC%9E%94%EC%9D%84", # 이 잔을
    "https://ko.wikisource.org/wiki/%EC%9D%B4%EC%8B%9D%EA%B3%BC_%EB%8F%84%EC%8A%B9", # 이식과 도승
    "https://ko.wikisource.org/wiki/%EC%9D%BC%ED%91%9C%EC%9D%98_%EA%B3%B5%EB%8A%A5", # 일표의 공능
    "https://ko.wikisource.org/wiki/%EC%9E%A1%EC%B4%88", # 잡초
    "https://ko.wikisource.org/wiki/%EC%9E%A5%EB%AF%B8_%EB%B3%91%EB%93%A4%EB%8B%A4", # 장미 병들다
    "https://ko.wikisource.org/wiki/%EC%A0%81%EA%B4%B4%EC%9C%A0%EC%9D%98", # 적괴유의
    "https://ko.wikisource.org/wiki/%EC%A0%81%EB%A7%89%ED%95%9C_%EC%A0%80%EB%85%81", # 적막한 저녁
    "https://ko.wikisource.org/wiki/%EC%A0%81%EB%B9%88", # 적빈
    "https://ko.wikisource.org/wiki/%EC%A0%84%EC%A0%9C%EC%9E%90", # 전제자
    "https://ko.wikisource.org/wiki/%EC%A0%95%EC%97%B4%EC%9D%98_%EB%82%99%EB%9E%91%EA%B3%B5%EC%A3%BC", # 정열의 낙랑공주
    "https://ko.wikisource.org/wiki/%EC%A0%95%EC%A1%B0_(%EA%B9%80%EC%9C%A0%EC%A0%95)", # 정조 (김유정)
    "https://ko.wikisource.org/wiki/%EC%A0%95%ED%9D%AC", # 정희
    "https://ko.wikisource.org/wiki/%EC%A2%85%EC%83%9D%EA%B8%B0", # 종생기
    "https://ko.wikisource.org/wiki/%EC%A3%84%EC%99%80_%EB%B2%8C_(%EA%B9%80%EB%8F%99%EC%9D%B8)", # 죄와 벌 (김동인)
    "https://ko.wikisource.org/wiki/%EC%A7%80%EB%8F%84%EC%9D%98_%EC%95%94%EC%8B%A4", # 지도의 암실
    "https://ko.wikisource.org/wiki/%EC%A7%80%ED%95%98%EC%B4%8C", # 지하촌
    "https://ko.wikisource.org/wiki/%EC%AB%93%EA%B8%B0%EC%96%B4_%EA%B0%80%EB%8A%94_%EC%9D%B4%EB%93%A4", # 쫓기어 가는 이들
    "https://ko.wikisource.org/wiki/%EC%B2%AD%EC%B6%98", # 청춘
    "https://ko.wikisource.org/wiki/%EC%B4%88%EC%B7%8C%EC%97%B0%ED%99%94%ED%8E%B8", # 초췌연화편
    "https://ko.wikisource.org/wiki/%EC%B4%9D%EA%B0%81%EA%B3%BC_%EB%A7%B9%EA%BD%81%EC%9D%B4", # 총각과 맹꽁이
    "https://ko.wikisource.org/wiki/%EC%B9%98%EC%88%99", # 치숙
    "https://ko.wikisource.org/wiki/%ED%83%9C%ED%98%95", # 태형
    "https://ko.wikisource.org/wiki/%ED%88%AC%ED%99%98%EA%B8%88%EC%9D%80", # 투환금은
    "https://ko.wikisource.org/wiki/%ED%95%B4%EB%8F%8B%EC%9D%B4", # 해돋이
    "https://ko.wikisource.org/wiki/%ED%99%8D%EC%9C%A4%EC%84%B1%EA%B3%BC_%EC%A0%88%EB%B6%80", # 홍윤성과 절부
    "https://ko.wikisource.org/wiki/%EC%82%AC%EA%B0%81%EC%A0%84%EA%B8%B0", # 사각전기
    "https://ko.wikisource.org/wiki/%EC%82%AC%EC%83%9D%EC%95%84", # 사생아
    "https://ko.wikisource.org/wiki/%EC%82%AC%EC%9C%84", # 사위
    "https://ko.wikisource.org/wiki/%EC%82%B0%EA%B3%A8", # 산골
    "https://ko.wikisource.org/wiki/%EC%82%B0%EA%B3%A8_%EB%82%98%EA%B7%B8%EB%84%A4", # 산골 나그네
    "https://ko.wikisource.org/wiki/%EC%82%B0%EB%82%A8", # 산남
]

# Data Loading

In [None]:
def fetch_wikisource_text(url):
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/120.0.0.0 Safari/537.36"
        )
    }
    r = requests.get(url, headers=headers, stream=True, timeout=30)
    r.raise_for_status()
    soup = BeautifulSoup(r.text, "lxml")

    # 본문 컨테이너
    content = soup.select_one("div.mw-parser-output")
    if content is None:
        return ""

    # 표/주석/각주 등 제거
    for selector in ["table", "div.reflist", "ol.references", "sup.reference"]:
        for tag in content.select(selector):
            tag.decompose()

    # 문단과 리스트 텍스트만 추출
    texts = []
    for tag in content.find_all(["p","li","dd","dt","blockquote"]):
        txt = tag.get_text(" ", strip=True)
        if txt:
            texts.append(txt)

    text = "\n".join(texts)

    # 위키마크업 잔여 정리
    text = re.sub(r"\[편집\]|\[.*?편집\]", "", text)
    text = re.sub(r"\s+", " ", text)
    return text.strip()


# 수집 → NFC 정규화 → 저장

In [None]:
all_texts = []
for u in tqdm(URLS):
    try:
        t = fetch_wikisource_text(u)
        print(f"Fetched {len(t):,} chars from {u}")
        if len(t) > 1000:
            all_texts.append(f"\n\n### SOURCE: {u}\n{t}")
    except Exception as e:
        print("Failed:", u, e)

raw_text = "\n".join(all_texts)
RAW_TXT.write_text(raw_text, encoding="utf-8")

# 한글 자모 분리 방지: NFC 정규화. ᄀ(U+1100)+ᅡ(U+1161)(분리 자모) ↔ 가(U+AC00)(완성형)
norm_text = unicodedata.normalize("NFC", raw_text) # Normalization Form Composition, Normalization Form Decomposition

# 공백 정리
norm_text = re.sub(r"\r\n?", "\n", norm_text)
norm_text = re.sub(r"[ \t]+", " ", norm_text)
norm_text = re.sub(r"\n{3,}", "\n\n", norm_text)

NORM_TXT.write_text(norm_text, encoding="utf-8")
(len(raw_text), len(norm_text), str(NORM_TXT))


# 토크나이저 생성

In [None]:
import sentencepiece as spm

In [None]:
# 1) 한글 음절만 required에 유지
ko_chars = "".join(chr(c) for c in range(0xAC00, 0xD7A4))  # 가–힣

In [None]:
# 2) 문장부호는 user_defined_symbols로만 (원래 리스트 그대로 사용 가능)
user_syms = ["《","》","〈","〉","—","…","“","”","‘","’","·","『","』","「","」","‧"]

In [None]:
# 3) 혹시라도 겹치는 게 있으면 required에서 제거 (방어적)
required = "".join(ch for ch in ko_chars if ch not in set(user_syms))

In [None]:
kwargs = dict(
    input=str(NORM_TXT),                 # 학습에 사용할 입력 코퍼스 경로(또는 쉼표로 여러 파일). str(...)로 문자열 보장
    model_prefix=SPM_PREFIX,             # 출력 모델/사전 파일 접두사(sp.model / sp.vocab 파일명 앞부분)
    model_type="unigram",                # 토크나이저 모델 유형: unigram(기본), bpe, char, word 등 중 선택

    vocab_size=16000,                    # 최종 어휘 수(특수토큰 포함). 너무 작으면 OOV↑, 너무 크면 메모리/속도↓
    character_coverage=0.9999,           # 문자 커버리지 목표. 저빈도 문자를 잘라낼 기준(라틴 기반 언어는 1.0 근처 추천)

    # ★ 라운드트립(원문 복원) 보장용 설정
    normalization_rule_name="identity",  # 정규화 규칙 비활성화(원문을 그대로 유지). NFKC 등 사용 안 함
    byte_fallback=True,                  # 어휘에 없는 임의의 문자도 바이트 시퀀스로 분해해 인코딩/디코딩 가능하게 함
    hard_vocab_limit=False,              # True면 vocab_size를 절대 넘지 않음. False면 필수 심볼 때문에 소폭 초과 허용

    add_dummy_prefix=True,               # 문장 앞에 공백 토큰(▁)을 강제로 추가해 단어 경계 학습을 안정화
                                         # 디코딩 시에도 공백 복원에 유리(문두 토큰화 일관성↑)

    unk_id=0,                            # <unk> 토큰의 ID(미등록 토큰 대체). -1이면 비활성화
    unk_surface="<unk>",                 # <unk>가 출력될 때 보이는 표면 문자열. unknown (토크나이저가 어휘(vocab)로 표현할 수 없는 문자)
    bos_id=1, bos_piece="<s>",           # 문장 시작 토큰(BOS) 비활성화(-1). 사용하려면 0 이상의 ID로 지정
    eos_id=2, eos_piece="</s>",          # 문장 종료 토큰(EOS) 비활성화(-1). 사용하려면 0 이상의 ID로 지정
    pad_id=-3, pad_piece="<pad>",         # PAD 토큰 비활성화(-1). 필요 시 ID 지정하여 패딩 토큰 사용

    user_defined_symbols=user_syms,      # 그대로 하나의 토큰으로 취급할 사용자 정의 “문자열” 목록(예: <url>, <email>, 특수기호 등)

    max_sentence_length=100000,          # 단일 문장(줄)에서 학습에 사용할 최대 길이(문자 수). 너무 길면 잘라서 사용
    input_sentence_size=4_000_000,       # 학습에 샘플링해 사용할 문장 개수(코퍼스가 매우 클 때 속도/메모리 절충)
    shuffle_input_sentence=True,         # 샘플링 전 문장을 섞어 편향 감소(코퍼스 순서성 영향 완화)
)


try:
    spm.SentencePieceTrainer.train(required_chars=required, **kwargs)
except TypeError:
    # 구버전 sentencepiece면 required_chars 미지원 → 그냥 진행
    print("[WARN] required_chars 미지원 → 커버리지/어휘 설정으로 진행합니다.")
    spm.SentencePieceTrainer.train(**kwargs)

In [None]:
sp = spm.SentencePieceProcessor()
sp.load(SPM_PREFIX + ".model")
print("vocab_size:", sp.get_piece_size(), "unk_id:", sp.unk_id(), "unk_piece:", sp.id_to_piece(sp.unk_id()))

In [None]:
# 라운드트립 체크
def roundtrip_ok(txt: str) -> bool:
    pieces = sp.encode(txt, out_type=str)
    back = sp.decode(pieces)
    return txt == back

test_str = "《테스트》 “따옴표”… — 『괄호』 「문장」 ‧ 끝!"
print("roundtrip:", roundtrip_ok(test_str))

# 토큰화 무손실 확인 (decode == 원문)

In [None]:
import unicodedata

def debug_roundtrip(s):
    s_nfc = unicodedata.normalize("NFC", s)
    pieces = sp.encode(s_nfc, out_type=str)
    ids    = sp.encode(s_nfc, out_type=int)
    back_p = sp.decode(pieces)
    back_i = sp.decode(ids)
    print("pieces:", pieces)
    print("ids   :", ids)
    print("back_p:", back_p)
    print("back_i:", back_i)
    # UNK 존재 여부
    unk_positions = [i for i,t in enumerate(ids) if t == sp.unk_id()]
    if unk_positions:
        print("⚠️  UNK at positions:", unk_positions)
    assert back_i == s_nfc, "decode(encode(x))가 원문과 다릅니다!"

test_line = "성춘향은 이 도령을 깊이 사모하였더라."
debug_roundtrip(test_line)


# 언어모델용 Dataset/DataLoader

In [None]:
# 데이터
ids_full = sp.encode(NORM_TXT.read_text(encoding="utf-8"), out_type=int)

In [None]:
# 데이터 셋
class LMDataset(Dataset):
    def __init__(self, ids, block=256, indices=None):
        self.ids = torch.tensor(ids, dtype=torch.long)   # 전체 코퍼스 토큰 ID 시퀀스를 int64 텐서(1D)로 고정
        self.block = block                               # 입력 시퀀스 길이(컨텍스트; 모델의 토큰 창 크기)
        N = len(self.ids) - block - 1                    # 가능한 샘플 시작 위치 수: i∈[0, N-1]
        if indices is None:
            self.indices = torch.arange(N)               # 시작 인덱스 전체(0..N-1)를 사용
        else:
            self.indices = indices                       # 미리 샘플링/분할된 시작 인덱스 집합 사용(예: train/val 스플릿)

    def __len__(self):
        return len(self.indices)                         # 데이터셋 길이 = 시작 인덱스 개수

    def __getitem__(self, idx):
        i = int(self.indices[idx])                       # idx번째 샘플의 원본 시작 위치 i
        x = self.ids[i:i+self.block]                     # 입력 시퀀스 x: 길이 block
        y = self.ids[i+1:i+self.block+1]                 # 타깃 시퀀스 y: x의 한 칸 오른쪽 시프트(다음 토큰 예측)
        return x, y

In [None]:
# 학습/검증 분할 및 DataLoader 구성
block_size, batch_size = 256, 64                         # 컨텍스트 길이와 배치 크기 설정
N = len(ids_full) - block_size - 1                       # ids_full로부터 만들 수 있는 총 샘플 수
all_idx = torch.randperm(N)                              # 0..N-1 무작위 순열(셔플된 시작 인덱스)
n_tr = int(N*0.9)                                        # 90%를 학습 세트로, 10%를 검증 세트로 분할

train_idx, val_idx = all_idx[:n_tr], all_idx[n_tr:]      # 학습/검증 인덱스 분리

train_ds = LMDataset(ids_full, block_size, train_idx)    # 학습용 데이터셋: 무작위 시작 위치들만 사용
val_ds   = LMDataset(ids_full, block_size, val_idx)      # 검증용 데이터셋: 나머지 시작 위치들 사용

train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True,  drop_last=True)
# - shuffle=True : 각 에폭마다 샘플 순서를 섞어 학습 안정화
# - drop_last=True: 마지막 미완성 배치를 버려 고정 배치 크기 유지(배치정규화 등 안정성↑)

val_loader   = DataLoader(val_ds,   batch_size=batch_size, shuffle=False, drop_last=False)
# - shuffle=False: 검증 시 순서 유지(재현성)
# - drop_last=False: 검증에서는 남는 샘플까지 포함해 모두 평가

# 모델
- LSTM 은 순차 문맥을 인코딩
- nn.MultiheadAttention(batch_first=True) 로 자기어텐션을 적용
- Causal mask 로 미래 토큰 차단 (언어모델 특성 유지)
- 잔차(residual) + LayerNorm 으로 안정화

In [None]:
# 모델
class LSTMAttnLM(nn.Module):
    def __init__(self, vocab_size, emb=384, hid=384, layers=2, heads=6, drop=0.3):
        super().__init__()
        self.emb = nn.Embedding(vocab_size, emb)                  # 토큰 ID -> 임베딩 벡터 (B,T,emb)
        self.lstm = nn.LSTM(emb, hid, num_layers=layers,          # LSTM 스택: 입력 emb 차원, 은닉 hid 차원
                            batch_first=True, dropout=drop)       # (B,T,H) 출력, 층간 드롭아웃 적용
        self.attn = nn.MultiheadAttention(hid, heads,             # 은닉 차원 hid에서 멀티헤드 어텐션
                                          dropout=drop, batch_first=True)
        self.ln = nn.LayerNorm(hid)                               # 잔차 이후 LayerNorm
        self.fc = nn.Linear(hid, vocab_size, bias=False)          # LM 헤드: hid -> vocab 로짓
        self.drop = nn.Dropout(drop)

        # weight tying을 위한 투영층:
        # - 임베딩 차원(emb)과 LSTM/어텐션 은닉 차원(hid)이 다르면 hid -> emb로 선형 투영
        # - 같다면 항등연산
        if emb != hid:
            self.proj = nn.Linear(hid, emb, bias=False)
        else:
            self.proj = nn.Identity()

        # 출력층 가중치를 임베딩 가중치와 공유(weight tying)
        # (fc.weight의 shape이 임베딩과 호환되도록 proj로 emb 차원으로 맞춘 다음 사용)
        self.fc.weight = self.emb.weight

    def forward(self, x):
        h,_ = self.lstm(self.emb(x))                               # 임베딩 -> LSTM 통과: h:(B,T,H)
        T = h.size(1)                                              # 시퀀스 길이 T
        mask = torch.full((T,T), float("-inf"), device=h.device).triu(1)
        # causal mask(상삼각; 미래 토큰 보지 않기): i가 j보다 작은 위치는 -inf로 마스킹
        # [[  0, -inf, -inf, -inf],
        #  [  0,    0, -inf, -inf],
        #  [  0,    0,    0, -inf],
        #  [  0,    0,    0,    0]]

        a,_ = self.attn(h, h, h, attn_mask=mask, need_weights=False)
        # 자기어텐션: query/key/value 모두 h. 마스크로 미래 토큰 차단. (B,T,H)

        y = self.ln(h + self.drop(a))                              # 잔차 연결 후 정규화
        y = self.proj(y)                                           # (필요 시) hid -> emb 차원 투영
        return self.fc(y)                                          # 어휘 분포 로짓 반환: (B,T,vocab)


In [None]:
vocab_size = sp.get_piece_size()
model = LSTMAttnLM(vocab_size).to(device)
model

# 학습

In [None]:
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4, weight_decay=0.1)

In [None]:
def run_epoch(loader, train=True):
    model.train(train)
    total_loss, total_tokens = 0.0, 0
    for x,y in loader:
        x,y = x.to(device), y.to(device)
        if train: optimizer.zero_grad()
        logits = model(x)
        loss = criterion(logits.reshape(-1, vocab_size), y.reshape(-1))
        if train:
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
        total_loss += loss.item() * y.numel()         # 토큰 수로 집계
        total_tokens += y.numel()
    if train: scheduler.step()
    return total_loss / max(1, total_tokens)

In [None]:
# 학습
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

def run_epoch(loader, train=True):
    model.train(train)
    total, n = 0.0, 0
    for x,y in loader:
        x,y = x.to(device), y.to(device)
        if train: optimizer.zero_grad()
        logits = model(x)
        loss = criterion(logits.reshape(-1, vocab_size), y.reshape(-1))
        if train:
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
        total += loss.item()*x.size(0); n += x.size(0)
    return total/max(1,n)

best = 1e9
for ep in range(1, 5):  # 데모: 5 epoch
    tr = run_epoch(train_loader, True)
    va = run_epoch(val_loader, False)
    if va < best:
        best = va
        torch.save(model.state_dict(), DATA_DIR/"best_lstm_attn.pt")
    print(f"[{ep:02d}] train_ce={tr:.4f}  val_ce={va:.4f}")
torch.save(model.state_dict(), DATA_DIR/"best_lstm_attn.pt")

# 생성

In [None]:
def sample(model, sp, prompt, max_new_tokens=300, temperature=0.9, top_k=50):
    model.eval()
    x = torch.tensor(sp.encode(prompt, out_type=int), dtype=torch.long, device=device).unsqueeze(0)
    with torch.no_grad():
        for _ in range(max_new_tokens):
            x_cond = x[:, -block_size:]               # 길이 제한
            logits = model(x_cond)                    # (1, T, V)
            last = logits[:, -1, :] / max(1e-6, temperature)
            if top_k:
                v, _ = torch.topk(last, k=min(top_k, last.size(-1)))
                last = torch.where(last < v[:, -1].unsqueeze(-1), torch.full_like(last, -1e10), last)
            probs = torch.softmax(last, dim=-1)
            next_id = torch.multinomial(probs, 1)     # (1,1)
            x = torch.cat([x, next_id], dim=1)
    return sp.decode(x[0].tolist())

model.load_state_dict(torch.load(DATA_DIR/"best_lstm_attn.pt", map_location=device))
print(sample(model, sp, "성춘향은", max_new_tokens=300, temperature=0.9, top_k=50))
