## 1. GPT-2 pre-trained weights 파일 로드 경로 셀

In [1]:
!pip -q install regex safetensors

import os, json
import regex as re
import numpy as np
from safetensors.numpy import load_file
from google.colab import drive
from pathlib import Path

# Mount Google Drive at /content/drive so pick_base() can search it for the model files.
drive.mount("/content/drive")

def pick_base(drive_root="/content/drive/MyDrive"):
    """Locate the folder that holds the four GPT-2 artifact files.

    Search order:
      1. The current working directory.
      2. Every folder under ``drive_root`` that contains ``model.safetensors``,
         visited in sorted order; the first folder that holds all four
         required files wins.

    Args:
        drive_root: Directory tree to search when the working directory does
            not contain the files. Defaults to the Colab Drive mount point.

    Returns:
        A ``Path`` to the folder containing all required files.

    Raises:
        FileNotFoundError: If no ``model.safetensors`` exists under
            ``drive_root``, or if none of the folders that contain it also
            contains the other required files.
    """
    required = ["model.safetensors", "config.json", "vocab.json", "merges.txt"]

    def has_all(folder):
        return all((folder / name).exists() for name in required)

    # 1) Prefer the current working directory (e.g. /content).
    here = Path(".").resolve()
    if has_all(here):
        return here

    # 2) Otherwise look for model.safetensors anywhere under the Drive root.
    #    Sorting makes the choice deterministic (rglob order is filesystem-dependent).
    hits = sorted(Path(drive_root).rglob("model.safetensors"))
    if not hits:
        raise FileNotFoundError("Drive(MyDrive)에서 model.safetensors를 찾지 못했습니다. 파일이 Drive에 있는지 확인하세요.")

    # Check every candidate, not just the first one: a stray copy of the
    # weights must not hide a complete folder elsewhere.
    for hit in hits:
        if has_all(hit.parent):
            return hit.parent

    # No candidate is complete: report what the first candidate is missing.
    base = hits[0].parent
    missing = next(name for name in required if not (base / name).exists())
    raise FileNotFoundError(f"'{base}' 폴더에 {missing} 가 없습니다. 4개 파일을 같은 폴더에 두세요.")

# Resolve the artifact folder once, then derive the four file paths from it.
BASE = pick_base()

MODEL_PATH, CONFIG_PATH, VOCAB_PATH, MERGES_PATH = (
    BASE / filename
    for filename in ("model.safetensors", "config.json", "vocab.json", "merges.txt")
)

# Echo the resolved locations so the notebook output shows what will be loaded.
print("BASE:", BASE)
print("MODEL_PATH:", MODEL_PATH)


Mounted at /content/drive
BASE: /content/drive/MyDrive/Colab Notebooks/GPT-2_numpy_Quantized_TF_inference
MODEL_PATH: /content/drive/MyDrive/Colab Notebooks/GPT-2_numpy_Quantized_TF_inference/model.safetensors


## 2. 수식 구현 함수 셀

In [2]:
# =========================================================
# 0) GPT-2 BPE Tokenizer (pure python)
# =========================================================

def bytes_to_unicode():
    """
    Build the GPT-2 byte -> unicode-character table.

    The byte values in the ranges "!".."~", "¡".."¬" and "®".."ÿ" map to the
    character with the same code point. Every other byte value is assigned
    the next unused code point starting at 256, in increasing byte order.
    The result covers all 256 byte values with distinct characters, so the
    mapping can be inverted.
    """
    kept = (
        list(range(ord("!"), ord("~") + 1))
        + list(range(ord("¡"), ord("¬") + 1))
        + list(range(ord("®"), ord("ÿ") + 1))
    )
    kept_lookup = set(kept)

    # Bytes outside the kept ranges are shifted to code points 256, 257, ...
    shifted = [byte for byte in range(256) if byte not in kept_lookup]

    byte_values = kept + shifted
    code_points = kept + [256 + offset for offset in range(len(shifted))]
    return {byte: chr(point) for byte, point in zip(byte_values, code_points)}

# Forward and inverse byte <-> unicode-character tables used by the tokenizer.
BYTE_ENCODER = bytes_to_unicode()
BYTE_DECODER = {v: k for k, v in BYTE_ENCODER.items()}

# Pre-tokenization pattern: contractions, letter runs, digit runs, other
# symbol runs, and whitespace. It is compiled WITHOUT re.IGNORECASE on purpose:
# the reference GPT-2 encoder is case-sensitive here, so a flag would split
# upper-case contractions (e.g. "'S") differently from the tokenization the
# pretrained vocabulary/merges were built with.
PATTERN = re.compile(
    r"""'s|'t|'re|'ve|'m|'ll|'d| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+"""
)

def get_pairs(word):
    """Return the set of adjacent symbol pairs in ``word``.

    Args:
        word: A sliceable sequence of symbols (typically a tuple of strings).

    Returns:
        A set of ``(left, right)`` tuples, one per distinct adjacent pair.
        Empty and single-symbol words have no pairs and yield an empty set
        (previously an empty word raised ``IndexError``).
    """
    return set(zip(word, word[1:]))

class GPT2BPETokenizer:
    """Byte-level BPE tokenizer driven by a ``vocab.json`` / ``merges.txt`` pair.

    Attributes set by ``__init__``:
        encoder: token string -> token id.
        decoder: token id -> token string.
        bpe_ranks: (left, right) merge pair -> priority (lower merges first).
        cache: memo of already BPE-split pieces.
        eos_token_id: id of ``<|endoftext|>`` (falls back to 50256 if the
            vocabulary has no such entry).
    """

    def __init__(self, vocab_json_path: str, merges_txt_path: str):
        """Load the vocabulary and the ordered merge rules from disk."""
        with open(vocab_json_path, "r", encoding="utf-8") as f:
            self.encoder = json.load(f)              # token(str) -> id(int)
        self.decoder = {v: k for k, v in self.encoder.items()}  # id -> token(str)

        with open(merges_txt_path, "r", encoding="utf-8") as f:
            lines = f.read().splitlines()

        # The first line is a header and is skipped. Every later line that
        # holds exactly two symbols is a merge rule. Lines are NOT filtered
        # by a leading "#": rules such as "# #" are legitimate merges and
        # dropping them would change how runs of "#" are tokenized.
        merges = []
        for line in lines[1:]:
            parts = line.split()
            if len(parts) == 2:
                merges.append((parts[0], parts[1]))
        self.bpe_ranks = {pair: i for i, pair in enumerate(merges)}

        self.cache = {}

        # Special token: read the id from the vocabulary when available.
        self.eos_token_id = self.encoder.get("<|endoftext|>", 50256)

    def bpe(self, token: str):
        """Apply the merge rules to one piece; return its symbols joined by spaces."""
        if token in self.cache:
            return self.cache[token]

        word = tuple(token)
        # Fewer than two symbols: nothing can be merged.
        if len(word) < 2:
            self.cache[token] = token
            return token
        pairs = get_pairs(word)

        while True:
            # Pick the adjacent pair with the best (lowest) merge rank.
            bigram = min(pairs, key=lambda p: self.bpe_ranks.get(p, float("inf")))
            if bigram not in self.bpe_ranks:
                break  # no remaining pair is a known merge

            first, second = bigram
            new_word = []
            i = 0
            last = len(word) - 1
            # Left-to-right scan, merging every non-overlapping occurrence.
            while i < len(word):
                if i < last and word[i] == first and word[i + 1] == second:
                    new_word.append(first + second)
                    i += 2
                else:
                    new_word.append(word[i])
                    i += 1

            word = tuple(new_word)
            if len(word) == 1:
                break
            pairs = get_pairs(word)

        out = " ".join(word)
        self.cache[token] = out
        return out

    def encode(self, text: str):
        """Convert ``text`` into a list of token ids."""
        tokens = []
        # 1) split into "words"
        for m in PATTERN.finditer(text):
            piece = m.group(0)
            # 2) byte encode -> unicode string
            piece_trans = "".join(BYTE_ENCODER[b] for b in piece.encode("utf-8"))
            # 3) bpe, 4) map to ids
            tokens.extend(self.encoder[t] for t in self.bpe(piece_trans).split(" "))
        return tokens

    def decode(self, token_ids):
        """Convert token ids back to text; invalid UTF-8 is replaced, not raised."""
        # token_ids -> concatenated token strings -> byte decode
        text = "".join(self.decoder[t] for t in token_ids)
        byte_arr = [BYTE_DECODER[c] for c in text]
        return bytes(byte_arr).decode("utf-8", errors="replace")


# =========================================================
# 1) Config (GPT-2)
# =========================================================
class GPT2Config:
    """Hyper-parameters of a GPT-2 model.

    ``d_head`` is derived as ``d_model // n_heads``; construction asserts that
    ``d_model`` is an exact multiple of ``n_heads``.
    """

    def __init__(self, vocab_size=50257, n_layers=12, d_model=768, n_heads=12, d_ff=3072, max_seq=1024, ln_eps=1e-5):
        per_head, leftover = divmod(d_model, n_heads)
        assert leftover == 0

        # Vocabulary and sequence limits.
        self.vocab_size = vocab_size
        self.max_seq = max_seq

        # Transformer stack dimensions.
        self.n_layers = n_layers
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_head = per_head
        self.d_ff = d_ff

        # Epsilon passed to layer normalization.
        self.ln_eps = ln_eps


# =========================================================
# 2) Core ops
# =========================================================
def layernorm(x, g, b, eps=1e-5):
    """Layer-normalize ``x`` over its last axis, then apply gain and bias.

    Statistics are taken along the last axis only, so a single vector of
    shape (D,) and a batch of shape (..., D) are both normalized per vector.
    (Reducing over all elements would mix statistics across rows for
    anything other than 1-D input.)

    Args:
        x: Array whose last axis is the feature axis.
        g: Gain, broadcastable against the last axis of ``x``.
        b: Bias, broadcastable against the last axis of ``x``.
        eps: Constant added to the variance before the square root.

    Returns:
        Array with the same shape as ``x``.
    """
    mu = x.mean(axis=-1, keepdims=True)
    var = ((x - mu) ** 2).mean(axis=-1, keepdims=True)
    xhat = (x - mu) / np.sqrt(var + eps)
    return xhat * g + b

def gelu_new(x):
    """Tanh-based GELU approximation, applied element-wise to ``x``."""
    cubic_term = x + 0.044715 * (x**3)
    gate = 1.0 + np.tanh(np.sqrt(2.0/np.pi) * cubic_term)
    return 0.5 * x * gate

def softmax_1d(logits):
    """Softmax over all entries of ``logits``.

    The maximum is subtracted before exponentiating so large logits do not
    overflow, and a 1e-12 term is added to the denominator.
    """
    shifted = logits - np.max(logits)
    weights = np.exp(shifted)
    return weights / (np.sum(weights) + 1e-12)

def top_k_filter(logits, k):
    """Keep the ``k`` largest logits and set every other entry to -1e30.

    When ``k`` is not strictly between 0 and the number of logits, the input
    array is returned unchanged (the same object, not a copy).
    """
    count = logits.shape[0]
    if not (0 < k < count):
        return logits

    masked = logits.copy()
    # After partitioning at -k, the first count-k positions hold the indices
    # of the entries that are NOT among the k largest.
    rejected = np.argpartition(masked, -k)[:count - k]
    masked[rejected] = -1e30
    return masked

def sample_from_probs(probs):
    """Draw one index from a discrete distribution by inverse-CDF sampling.

    Args:
        probs: 1-D sequence of non-negative weights. They need not sum to
            exactly 1; the random draw is scaled by the actual total.

    Returns:
        An ``int`` index in ``[0, len(probs) - 1]``.

    Raises:
        ValueError: If ``probs`` is empty.
    """
    # Accumulate in float64 to limit rounding error over large vocabularies.
    c = np.cumsum(probs, dtype=np.float64)
    if c.size == 0:
        raise ValueError("probs must contain at least one entry")

    # Scale the uniform draw by the real total: if the weights sum to slightly
    # less than 1, an unscaled draw can exceed c[-1] and searchsorted would
    # return len(probs), which is not a valid index.
    r = np.random.rand() * c[-1]
    idx = int(np.searchsorted(c, r, side="right"))

    # Final guard for the case where rounding makes r equal to c[-1].
    return min(idx, c.size - 1)

# =========================================================
# 3) Quantization utils (W8A16)
# =========================================================
def quantize_w_int8(W: np.ndarray, axis=None):
    """
    Symmetric INT8 quantization of a weight array.

    axis=None : one scale for the whole tensor (returned as a float32 scalar)
    axis=0/1  : one scale per slice; the maximum is reduced over ``axis`` with
                keepdims, so the scale broadcasts against ``W``
                (returned as a float32 array)

    The scale is ``max(|W|) / 127 + 1e-12``; values are rounded to the
    nearest integer and stored as int8.

    Returns: (W_q:int8, scale)
    """
    W = W.astype(np.float32)
    magnitude = np.abs(W)
    per_tensor = axis is None

    if per_tensor:
        peak = np.max(magnitude)
    else:
        peak = np.max(magnitude, axis=axis, keepdims=True)

    scale = peak / 127.0 + 1e-12
    codes = np.round(W / scale).astype(np.int8)

    if per_tensor:
        return codes, np.float32(scale)
    return codes, scale.astype(np.float32)


def make_int8_weights(W_fp32: dict):
    """
    Build the INT8 (W8A16) weight dict from the loaded weights.

    - Embeddings, LayerNorm parameters and biases are passed through as-is.
    - Each linear weight ``X`` in a block is replaced by ``X_q`` (int8 codes)
      and ``X_s`` (scale), using per-tensor quantization.
    """
    shared_names = ("tok_emb", "pos_emb", "ln_f_g", "ln_f_b")
    passthrough_names = ("ln1_g", "ln1_b", "ln2_g", "ln2_b", "bqkv", "bo", "b1", "b2")
    linear_names = ("Wqkv", "Wo", "W1", "W2")

    W8 = {name: W_fp32[name] for name in shared_names}
    W8["blocks"] = []

    for blk in W_fp32["blocks"]:
        # LayerNorm parameters and biases are not quantized.
        layer = {name: blk[name] for name in passthrough_names}

        # Linear weights: per-tensor scale (axis=None).
        for name in linear_names:
            codes, scale = quantize_w_int8(blk[name], axis=None)
            layer[name + "_q"] = codes
            layer[name + "_s"] = scale

        W8["blocks"].append(layer)

    return W8

def make_fp32_as_int8dict(W_fp32: dict):
    """
    Wrap float weights in the same dict layout the INT8 path uses.

    - Each linear weight ``X`` is stored under ``X_q`` as a float32 array.
    - Each ``X_s`` scale is float32 1.0, so multiplying by it is a no-op and
      the forward pass reproduces the unquantized result.
    - Embeddings, LayerNorm parameters and biases are passed through as-is.
    """
    unit_scale = np.float32(1.0)
    shared_names = ("tok_emb", "pos_emb", "ln_f_g", "ln_f_b")
    passthrough_names = ("ln1_g", "ln1_b", "ln2_g", "ln2_b", "bqkv", "bo", "b1", "b2")
    linear_names = ("Wqkv", "Wo", "W1", "W2")

    wrapped = {name: W_fp32[name] for name in shared_names}
    wrapped["blocks"] = []

    for blk in W_fp32["blocks"]:
        # LayerNorm parameters and biases are reused unchanged.
        layer = {name: blk[name] for name in passthrough_names}

        # The "_q" slot holds the float weight itself, paired with scale 1.0.
        for name in linear_names:
            layer[name + "_q"] = blk[name].astype(np.float32)
            layer[name + "_s"] = unit_scale

        wrapped["blocks"].append(layer)

    return wrapped



# =========================================================
# 4) KV Cache (same idea, works for GPT-2 incremental decoding)
# =========================================================
class KVCache:
    """
    Per-layer key/value history for incremental decoding.

    For each layer (``None`` until the first append):
      K: (T, H, Dh)
      V: (T, H, Dh)

    ``T`` counts completed steps; it only changes in ``step_done``.
    """
    def __init__(self, cfg: GPT2Config):
        self.cfg = cfg
        self.K = [None] * cfg.n_layers
        self.V = [None] * cfg.n_layers
        self.T = 0

    def append(self, layer, k_h, v_h):
        """Add one position's keys/values, each of shape (H, Dh), to ``layer``."""
        per_token_shape = (self.cfg.n_heads, self.cfg.d_head)
        assert 0 <= layer < self.cfg.n_layers
        assert self.T < self.cfg.max_seq
        assert k_h.shape == per_token_shape
        assert v_h.shape == per_token_shape

        # Add a leading time axis so the new entry stacks along axis 0.
        k_row = k_h[None, :, :]
        v_row = v_h[None, :, :]

        if self.K[layer] is None:
            self.K[layer] = k_row
            self.V[layer] = v_row
            return

        self.K[layer] = np.concatenate([self.K[layer], k_row], axis=0)
        self.V[layer] = np.concatenate([self.V[layer], v_row], axis=0)

    def step_done(self):
        """Advance the step counter by one."""
        self.T += 1


# =========================================================
# 5) Load GPT-2 weights from model.safetensors
#    and map into a numpy-friendly dict
# =========================================================
def load_gpt2_weights_from_safetensors(model_safetensors_path: str):
    """
    Load a GPT-2 safetensors file into a plain dict of float32 arrays.

    Returns a dict W with:
      tok_emb: from "wte.weight"
      pos_emb: from "wpe.weight"
      blocks: list of per-layer dict:
        ln1_g, ln1_b
        Wqkv, bqkv   (attn.c_attn)
        Wo, bo       (attn.c_proj)
        ln2_g, ln2_b
        W1, b1       (mlp.c_fc)
        W2, b2       (mlp.c_proj)
      ln_f_g, ln_f_b
    No separate output-head tensor is read; logits_from_hidden() reuses tok_emb.

    Tensor names are accepted either bare ("wte.weight") or with a
    "transformer." prefix ("transformer.wte.weight").
    NOTE(review): the prefixed form is assumed to be what checkpoints saved
    from an LM-head wrapper use; confirm against such a file.

    Raises:
        KeyError: If an expected tensor is missing from the file.
    """
    T = load_file(model_safetensors_path)

    # Detect which naming scheme this checkpoint uses.
    prefix = "transformer." if "transformer.wte.weight" in T else ""

    def tensor(name):
        return T[prefix + name].astype(np.float32)

    # (key in the returned block dict, tensor name suffix after "h.<i>.")
    per_layer = (
        ("ln1_g", "ln_1.weight"),
        ("ln1_b", "ln_1.bias"),
        ("Wqkv", "attn.c_attn.weight"),   # fused qkv
        ("bqkv", "attn.c_attn.bias"),
        ("Wo", "attn.c_proj.weight"),
        ("bo", "attn.c_proj.bias"),
        ("ln2_g", "ln_2.weight"),
        ("ln2_b", "ln_2.bias"),
        ("W1", "mlp.c_fc.weight"),
        ("b1", "mlp.c_fc.bias"),
        ("W2", "mlp.c_proj.weight"),
        ("b2", "mlp.c_proj.bias"),
    )

    W = {}
    W["tok_emb"] = tensor("wte.weight")
    W["pos_emb"] = tensor("wpe.weight")

    # Layers are numbered from 0; stop at the first index that is absent.
    blocks = []
    i = 0
    while f"{prefix}h.{i}.ln_1.weight" in T:
        blocks.append({key: tensor(f"h.{i}.{suffix}") for key, suffix in per_layer})
        i += 1

    W["blocks"] = blocks
    W["ln_f_g"] = tensor("ln_f.weight")
    W["ln_f_b"] = tensor("ln_f.bias")

    return W


# =========================================================
# 6) One-step forward (GPT-2 style: learned pos emb, fused QKV)
# =========================================================
def attention_step(cfg: GPT2Config, blk, x_ln, pos, cache: KVCache, layer: int):
    """
    Self-attention for one new position, attending over the cached history.

    x_ln: (D,) already layernormed
    returns attention output (D,) as float32

    Side effect: appends this position's keys/values to ``cache`` for ``layer``.
    """
    n_heads, head_dim = cfg.n_heads, cfg.d_head

    assert cache.T == pos  # incremental decoding

    # Dequantize the fused projection, then (D,) @ (D, 3D) -> (3D,)
    w_qkv = blk["Wqkv_q"].astype(np.float32) * blk["Wqkv_s"]
    fused = x_ln @ w_qkv + blk["bqkv"]

    # Split into q / k / v and view each as (H, Dh)
    q, k, v = (part.reshape(n_heads, head_dim) for part in np.split(fused, 3))

    # Store the current position first so it can attend to itself.
    cache.append(layer, k.astype(np.float32), v.astype(np.float32))
    keys = cache.K[layer]      # (T+1, H, Dh)
    values = cache.V[layer]    # (T+1, H, Dh)

    inv_sqrt_dh = 1.0 / np.sqrt(head_dim)

    context = np.zeros((n_heads, head_dim), dtype=np.float32)
    for head in range(n_heads):
        weights = softmax_1d((keys[:, head, :] @ q[head]) * inv_sqrt_dh)   # (T+1,)
        context[head] = weights @ values[:, head, :]                      # (Dh,)

    # Merge heads back to (D,) and apply the output projection.
    w_out = blk["Wo_q"].astype(np.float32) * blk["Wo_s"]
    projected = context.reshape(cfg.d_model) @ w_out + blk["bo"]
    return projected.astype(np.float32)

def ffn_step(cfg: GPT2Config, blk, x_ln2):
    """Feed-forward sub-layer for one position: linear -> gelu_new -> linear.

    Both weight matrices are dequantized (``*_q`` times ``*_s``) before use.
    Returns a float32 array.
    """
    w_up = blk["W1_q"].astype(np.float32) * blk["W1_s"]
    w_down = blk["W2_q"].astype(np.float32) * blk["W2_s"]

    activated = gelu_new(x_ln2 @ w_up + blk["b1"])
    projected = activated @ w_down + blk["b2"]       # (D,)
    return projected.astype(np.float32)

def gpt2_step(cfg: GPT2Config, W, token_id: int, pos: int, cache: KVCache):
    """
    Run the full layer stack for a single token at position ``pos``.

    Appends this position to ``cache`` (via attention_step) and advances the
    cache step counter. Returns the final layer-normed hidden state as float32.
    """
    assert 0 <= token_id < cfg.vocab_size
    assert 0 <= pos < cfg.max_seq

    # Input representation: token embedding + learned position embedding.
    hidden = (W["tok_emb"][token_id] + W["pos_emb"][pos]).astype(np.float32)

    for layer in range(cfg.n_layers):
        blk = W["blocks"][layer]

        # Pre-LN attention sub-layer with residual connection.
        normed = layernorm(hidden, blk["ln1_g"], blk["ln1_b"], eps=cfg.ln_eps).astype(np.float32)
        attn_out = attention_step(cfg, blk, normed, pos, cache, layer)
        hidden = (hidden + attn_out).astype(np.float32)

        # Pre-LN feed-forward sub-layer with residual connection.
        normed = layernorm(hidden, blk["ln2_g"], blk["ln2_b"], eps=cfg.ln_eps).astype(np.float32)
        ffn_out = ffn_step(cfg, blk, normed)
        hidden = (hidden + ffn_out).astype(np.float32)

    final = layernorm(hidden, W["ln_f_g"], W["ln_f_b"], eps=cfg.ln_eps).astype(np.float32)

    cache.step_done()
    return final

def logits_from_hidden(W, h):
    """Project hidden state ``h`` onto the vocabulary; returns float32 logits.

    The token embedding matrix is reused, transposed, as the output projection.
    """
    unembedding = W["tok_emb"].T
    return np.matmul(h, unembedding).astype(np.float32)

# =========================================================
# 7) Generation
# =========================================================
def generate(cfg: GPT2Config, W, tokenizer: GPT2BPETokenizer, prompt: str,
             max_new_tokens=64, temperature=1.0, top_k=0, do_sample=False, stop_on_eos=True):
    """Generate a continuation of ``prompt`` one token at a time.

    Args:
        cfg: Model configuration.
        W: Weight dict in the ``*_q`` / ``*_s`` layout used by gpt2_step.
        tokenizer: Tokenizer providing encode / decode / eos_token_id.
        prompt: Text to condition on; must encode to at least one token.
        max_new_tokens: Upper bound on the number of generated tokens.
        temperature: Divides the logits before sampling (floored at 1e-6).
            Ignored for greedy decoding.
        top_k: If in range, restrict sampling to the k most likely tokens.
            Ignored for greedy decoding.
        do_sample: Sample from the distribution when True; otherwise pick
            the highest-scoring token.
        stop_on_eos: Stop as soon as the EOS token is produced (the EOS token
            is kept in the output).

    Returns:
        ``(text, tokens)``: the decoded prompt + continuation, and the full
        list of token ids.

    Raises:
        ValueError: If ``prompt`` encodes to no tokens. Without at least one
            token there is no hidden state to predict from.
    """
    tokens = tokenizer.encode(prompt)
    if not tokens:
        raise ValueError("prompt must encode to at least one token")
    assert len(tokens) <= cfg.max_seq

    cache = KVCache(cfg)

    # Prefill: feed the prompt token by token, keeping the last hidden state.
    h = None
    for pos, tid in enumerate(tokens):
        h = gpt2_step(cfg, W, tid, pos, cache)

    t = max(1e-6, float(temperature))

    for step in range(max_new_tokens):
        logits = logits_from_hidden(W, h)

        if do_sample:
            probs = softmax_1d(top_k_filter(logits / t, top_k))
            next_id = sample_from_probs(probs)
        else:
            # A positive temperature and top-k filtering never change which
            # entry is largest, so take argmax on the raw logits. This also
            # avoids ties created when exp() rounds near-equal logits to the
            # same probability.
            next_id = int(np.argmax(logits))

        tokens.append(next_id)

        if stop_on_eos and next_id == tokenizer.eos_token_id:
            break

        # Token budget used up: the hidden state for this token would never
        # be read, so skip the forward pass.
        if step + 1 == max_new_tokens:
            break

        pos = len(tokens) - 1
        if pos >= cfg.max_seq:
            break
        h = gpt2_step(cfg, W, next_id, pos, cache)

    return tokenizer.decode(tokens), tokens



## 3. 추론 실행 셀

In [3]:
# =========================================================
# 8) Load GPT-2 & Run FP32/INT8 Generation
# =========================================================

# 1) tokenizer
tokenizer = GPT2BPETokenizer(VOCAB_PATH, MERGES_PATH)

# 2) raw weights, exactly as stored in the checkpoint (cast to float32)
W_raw = load_gpt2_weights_from_safetensors(MODEL_PATH)

# 3) config
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
    c = json.load(f)

n_embd = c.get("n_embd", 768)

cfg = GPT2Config(
    vocab_size=c.get("vocab_size", 50257),
    n_layers=c.get("n_layer", 12),
    d_model=n_embd,
    n_heads=c.get("n_head", 12),
    # Use an explicit "n_inner" when the config provides one; a missing or
    # null value falls back to 4 * n_embd.
    d_ff=c.get("n_inner") or 4 * n_embd,
    max_seq=c.get("n_positions", 1024),
    ln_eps=c.get("layer_norm_epsilon", 1e-5),
)

# FP32 baseline, wrapped in the INT8 dict layout so the same code path runs
W_fp32 = make_fp32_as_int8dict(W_raw)

# INT8 weights (must be built from the raw weights, not from the wrapped dict)
W_int8 = make_int8_weights(W_raw)

prompt = "Hello! This is a NumPy GPT-2. "

# FP32 (greedy)
out_fp32, toks_fp32 = generate(cfg, W_fp32, tokenizer, prompt,
                       max_new_tokens=40, do_sample=False)

# INT8 (greedy)
out_int8, toks_int8 = generate(cfg, W_int8, tokenizer, prompt,
                       max_new_tokens=40, do_sample=False)

print("FP32:", out_fp32)
print("FP32 #tokens:", len(toks_fp32))

print("INT8:", out_int8)
print("INT8 #tokens:", len(toks_int8))


FP32: Hello! This is a NumPy GPT-2.  I'm not sure if I'm going to use it, but I'm sure it's a good one.  I'm not sure if I'm going to use it, but I'm
FP32 #tokens: 53
INT8: Hello! This is a NumPy GPT-2.  I'm not sure if I'm going to be able to use it in my Python code, but I'm sure I can use it in my Python code.  I'm sure I can
INT8 #tokens: 53
