In [1]:
import torch
from torch.utils.cpp_extension import CUDA_HOME

print("PyTorch 버전:", torch.__version__)
print("torch.version.cuda:", torch.version.cuda)
print("torch.cuda.is_available():", torch.cuda.is_available())
print("torch.backends.cudnn.enabled:", torch.backends.cudnn.enabled)
print("CUDA device count:", torch.cuda.device_count())
print("CUDA device name:", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "N/A")
print("CUDA_HOME:", CUDA_HOME)


PyTorch 버전: 2.5.1+cu121
torch.version.cuda: 12.1
torch.cuda.is_available(): True
torch.backends.cudnn.enabled: True
CUDA device count: 1
CUDA device name: NVIDIA GeForce RTX 4050 Laptop GPU
CUDA_HOME: C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v12.9


In [None]:
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import Whitespace
from tokenizers.normalizers import NFKC
from tokenizers.processors import TemplateProcessing
import torch
import torch.nn as nn
import torch.nn.functional as F
import math


# 1. 초기 토크나이저 구성
tokenizer = Tokenizer(BPE(unk_token="[UNK]"))
tokenizer.normalizer = NFKC()
tokenizer.pre_tokenizer = Whitespace()

# 2. 학습자 정의
trainer = BpeTrainer(
    vocab_size=3000,
    special_tokens=["[UNK]", "[PAD]", "[BOS]", "[EOS]"]
)

# 3. 학습 실행
tokenizer.train(["./create-dataset/dataset/train.txt"], trainer)

# 4. 후처리 설정 (BOS, EOS)
tokenizer.post_processor = TemplateProcessing(
    single="[BOS] $A [EOS]",
    pair="[BOS] $A [EOS] $B:1 [EOS]:1",
    special_tokens=[
        ("[BOS]", tokenizer.token_to_id("[BOS]")),
        ("[EOS]", tokenizer.token_to_id("[EOS]")),
    ],
)

# 5. 저장 (Hugging Face 호환 json 파일)
tokenizer.save("tokenizer/kojson-tokenizer.json")


In [55]:
from transformers import PreTrainedTokenizerFast

tokenizer = PreTrainedTokenizerFast(
    tokenizer_file="tokenizer/kojson-tokenizer.json",
    unk_token="[UNK]",
    pad_token="[PAD]",
    bos_token="[BOS]",
    eos_token="[EOS]"
)

print(tokenizer.encode("22,9는 우회로개방 상태야", add_special_tokens=True))


[2, 298, 7, 1229, 836, 330, 3]


In [None]:
class RotaryEmbedding(nn.Module):
    def __init__(self, dim):
        super().__init__()
        inv_freq = 1.0 / (10000 ** (torch.arange(0, dim, 2).float() / dim))
        self.register_buffer("inv_freq", inv_freq)

    def forward(self, seq_len, device):
        t = torch.arange(seq_len, device=device).type_as(self.inv_freq)
        freqs = torch.einsum("i,j->ij", t, self.inv_freq)
        emb = torch.cat([torch.sin(freqs), torch.cos(freqs)], dim=-1)
        return emb[None, :, :]

def apply_rotary(x, rope):
    x1, x2 = x[..., ::2], x[..., 1::2]
    sin, cos = rope[..., ::2], rope[..., 1::2]
    return torch.cat([x1 * cos - x2 * sin, x1 * sin + x2 * cos], dim=-1)

class CausalSelfAttention(nn.Module):
    def __init__(self, d_model, n_heads, dropout=0.1):
        super().__init__()
        self.n_heads = n_heads
        self.head_dim = d_model // n_heads
        self.scale = self.head_dim ** -0.5
        self.qkv = nn.Linear(d_model, d_model * 3)
        self.out_proj = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)
        self.rope = RotaryEmbedding(self.head_dim)

    def forward(self, x):
        B, T, D = x.size()
        qkv = self.qkv(x).reshape(B, T, 3, self.n_heads, self.head_dim).permute(2, 0, 3, 1, 4)
        q, k, v = qkv[0], qkv[1], qkv[2]

        rope = self.rope(T, x.device)
        q = apply_rotary(q, rope)
        k = apply_rotary(k, rope)

        attn_scores = torch.matmul(q, k.transpose(-2, -1)) * self.scale
        mask = torch.triu(torch.ones(T, T, device=x.device), diagonal=1).bool()
        attn_scores = attn_scores.masked_fill(mask, float("-inf"))

        attn_weights = F.softmax(attn_scores, dim=-1)
        attn_output = torch.matmul(self.dropout(attn_weights), v)
        attn_output = attn_output.transpose(1, 2).reshape(B, T, D)

        return self.out_proj(attn_output)

class FeedForward(nn.Module):
    def __init__(self, d_model, hidden_dim, dropout=0.1):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(d_model, hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, d_model),
            nn.Dropout(dropout)
        )

    def forward(self, x):
        return self.net(x)

class TransformerBlock(nn.Module):
    def __init__(self, d_model, n_heads, dropout=0.1):
        super().__init__()
        self.ln1 = nn.LayerNorm(d_model)
        self.attn = CausalSelfAttention(d_model, n_heads, dropout)
        self.ln2 = nn.LayerNorm(d_model)
        self.ffn = FeedForward(d_model, 4 * d_model, dropout)

    def forward(self, x):
        x = x + self.attn(self.ln1(x))
        x = x + self.ffn(self.ln2(x))
        return x

class forbus_slm(nn.Module):
    def __init__(self, vocab_size, d_model=512, n_layers=6, n_heads=8, max_len=512, dropout=0.1):
        super().__init__()
        self.token_emb = nn.Embedding(vocab_size, d_model)
        self.pos_emb = nn.Parameter(torch.zeros(1, max_len, d_model))  # Not used with RoPE
        self.blocks = nn.Sequential(*[
            TransformerBlock(d_model, n_heads, dropout) for _ in range(n_layers)
        ])
        self.ln_f = nn.LayerNorm(d_model)
        self.head = nn.Linear(d_model, vocab_size, bias=False)

        # ✅ Weight Tying
        self.head.weight = self.token_emb.weight

    def forward(self, x):
        B, T = x.shape
        tok = self.token_emb(x)
        x = self.blocks(tok)
        x = self.ln_f(x)
        return self.head(x)


In [66]:
from torch.utils.data import Dataset
class PromptCompletionDataset(Dataset):
    def __init__(self, path, tokenizer, max_len=128):
        import json
        self.samples = []
        with open(path, "r", encoding="utf-8") as f:
            for line in f:
                data = json.loads(line)
                text = f"{data['prompt']} -> {data['completion']}"
                tokenized = tokenizer(text, padding='max_length', truncation=True, max_length=max_len)
                self.samples.append(torch.tensor(tokenized["input_ids"]))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        return self.samples[idx]


In [None]:
from torch.utils.data import DataLoader
import torch.nn.functional as F

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = forbus_slm(vocab_size=tokenizer.vocab_size).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-4)
dataset = PromptCompletionDataset("./create-dataset/dataset/train.jsonl", tokenizer)
loader = DataLoader(dataset, batch_size=32, shuffle=True)
print(device)

for epoch in range(10):
    model.train()
    total_loss = 0
    for batch in loader:
        batch = batch.to(device)
        optimizer.zero_grad()
        output = model(batch[:, :-1])
        loss = F.cross_entropy(output.reshape(-1, tokenizer.vocab_size), batch[:, 1:].reshape(-1), ignore_index=tokenizer.pad_token_id)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1} | Loss: {total_loss / len(loader):.4f}")
    torch.save(model.state_dict(), f"models/mini_gpt_epoch{epoch+1}.pth")

cuda
Epoch 1 | Loss: 5.1332
Epoch 2 | Loss: 0.8563
Epoch 3 | Loss: 0.5850
Epoch 4 | Loss: 0.4957
Epoch 5 | Loss: 0.4741
Epoch 6 | Loss: 0.4717
Epoch 7 | Loss: 0.4625
Epoch 8 | Loss: 0.4635
Epoch 9 | Loss: 0.4601
Epoch 10 | Loss: 0.4614


In [68]:
import torch
import torch.nn.functional as F

def apply_repetition_penalty(logits, input_ids, penalty=1.2):
    # 입력 토큰들에 반복 패널티 적용
    for i in range(logits.size(0)):
        token_ids = input_ids[i].unique()
        logits[i, token_ids] /= penalty
    return logits

def generate(model, input_ids, tokenizer, max_new_tokens=64, top_k=10, penalty=1.2, stop_token="}"):
    model.eval()
    device = input_ids.device
    stop_ids = tokenizer.encode(stop_token, add_special_tokens=False)

    for _ in range(max_new_tokens):
        with torch.no_grad():
            logits = model(input_ids)[:, -1, :]  # (B, vocab)
            logits = apply_repetition_penalty(logits, input_ids, penalty=penalty)

            # Top-k sampling
            top_k = min(top_k, logits.size(-1))
            values, indices = torch.topk(logits, top_k, dim=-1)
            probs = F.softmax(values, dim=-1)
            next_token = indices.gather(-1, torch.multinomial(probs, num_samples=1))

            input_ids = torch.cat([input_ids, next_token], dim=1)

            # stop_token 또는 eos_token 발견 시 종료
            if any(t.item() in stop_ids for t in next_token[0]):
                break

            if input_ids.size(1) >= 128:
                break

    return input_ids


In [None]:

# 모델 및 토크나이저 준비
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = forbus_slm(vocab_size=tokenizer.vocab_size).to(device)
model.load_state_dict(torch.load("models/mini_gpt_epoch9.pth", map_location=device))
model.eval()

# 입력 텍스트
test_text = "(22,9)는 '우회로개방' 상태야"
input_ids = tokenizer(test_text, return_tensors="pt")["input_ids"].to(device)

# 텍스트 생성
output_ids = generate(model, input_ids, tokenizer, max_new_tokens=64, top_k=10, penalty=1.2, stop_token="}")
decoded = tokenizer.decode(output_ids[0], skip_special_tokens=True)

# JSON만 추출
start = decoded.find("{")
end = decoded.find("}") + 1
if start != -1 and end > start:
    json_part = decoded[start:end]
    print("⤷ 모델 출력 (JSON):")
    print(json_part)
else:
    print("⤷ 모델 출력:")
    print(decoded)

  model.load_state_dict(torch.load("models/mini_gpt_epoch9.pth", map_location=device))


⤷ 모델 출력 (JSON):
{" action ": " change_traffic_status ", " target ": [ 22 , 9 ], " status ": " detour_opened "}


In [72]:
import torch
from torch.utils.data import DataLoader
import torch.nn.functional as F
from tqdm import tqdm
import json

# === collate_fn 정의 ===
from torch.nn.utils.rnn import pad_sequence
def collate_fn(batch):
    return pad_sequence(batch, batch_first=True, padding_value=tokenizer.pad_token_id)

# === 평가 함수 ===
def evaluate_on_jsonl(model, tokenizer, jsonl_path, max_new_tokens=64, top_k=10, penalty=1.2):
    model.eval()
    device = next(model.parameters()).device

    total = 0
    valid_json = 0
    exact_match = 0
    failed_cases = []

    with open(jsonl_path, "r", encoding="utf-8") as f:
        for line in tqdm(f, desc="Evaluating"):
            item = json.loads(line)
            prompt = item["prompt"]
            target_json_str = item["completion"]

            input_ids = tokenizer(prompt, return_tensors="pt")["input_ids"].to(device)

            # === generate 호출 ===
            with torch.no_grad():
                output_ids = generate(model, input_ids, tokenizer,
                                      max_new_tokens=max_new_tokens,
                                      top_k=top_k,
                                      penalty=penalty,
                                      stop_token="}")
                output_text = tokenizer.decode(output_ids[0], skip_special_tokens=True)

            # === JSON 추출 ===
            start = output_text.find("{")
            end = output_text.rfind("}") + 1
            pred_json_str = output_text[start:end] if start != -1 and end > start else ""

            total += 1
            try:
                pred_json = json.loads(pred_json_str)
                target_json = json.loads(target_json_str)
                valid_json += 1

                if pred_json == target_json:
                    exact_match += 1
                else:
                    failed_cases.append({
                        "prompt": prompt,
                        "expected": target_json,
                        "predicted": pred_json
                    })
            except Exception:
                failed_cases.append({
                    "prompt": prompt,
                    "expected": target_json_str,
                    "decoded": output_text
                })

    # === 결과 출력 ===
    print("\n📊 평가 결과")
    print(f"총 샘플 수: {total}")
    print(f"✅ JSON 파싱 성공: {valid_json}/{total} ({valid_json / total:.2%})")
    print(f"🎯 정확히 일치한 정답: {exact_match}/{total} ({exact_match / total:.2%})")

    if failed_cases:
        print("\n⚠️ 실패 예시 (최대 3개):")
        for case in failed_cases[:3]:
            print(json.dumps(case, ensure_ascii=False, indent=2))


In [None]:
model = forbus_slm(vocab_size=tokenizer.vocab_size).to(device)
model.load_state_dict(torch.load("models/mini_gpt_epoch9.pth", map_location=device))
model.eval()

evaluate_on_jsonl(
    model=model,
    tokenizer=tokenizer,
    jsonl_path="./create-dataset/dataset/test.jsonl",
    max_new_tokens=64,
    top_k=10,
    penalty=1.2
)


  model.load_state_dict(torch.load("models/mini_gpt_epoch9.pth", map_location=device))
Evaluating: 947it [13:22,  1.24it/s]