In [1]:
import torch

In [3]:
from importlib.metadata import version

# Third-party packages this notebook depends on, and what each is used for.
pkgs = [
    "huggingface_hub",  # downloads the pretrained weights
    "sentencepiece",    # backs the tokenizer
    "torch",            # backs the model
]
# Report the installed version of every dependency, one per line.
for p in pkgs:
    print(p, "version:", version(p))

huggingface_hub version: 0.32.3
sentencepiece version: 0.2.0
torch version: 2.7.0


In [4]:
from dataclasses import dataclass

import torch
import torch.nn as nn

In [16]:
class RMSNorm(nn.Module):
    def __init__(self, dim, eps=1e-5):
        super().__init__()
        self.eps = eps
        self.dim = dim
        self.weight = nn.Parameter(torch.ones(dim))

    def forward(self, x):
        irms = x.pow(2).mean(-1, keepdim=True)
        return self.weight * x * torch.rsqrt(irms + self.eps)

In [17]:
# Sanity check: the hand-written RMSNorm should agree with PyTorch's built-in one.
test_tensor = torch.randn(2, 3, 4)

rms_norm = RMSNorm(dim=test_tensor.shape[-1])
rms_norm_torch = nn.RMSNorm(test_tensor.shape[-1], eps=1e-5)

assert torch.allclose(
    rms_norm(test_tensor),
    rms_norm_torch(test_tensor),
)

In [18]:
class SiLU(nn.Module):
    """Sigmoid linear unit activation: ``x * sigmoid(x)``, applied element-wise."""

    def forward(self, x):
        """Return the SiLU activation of ``x`` (same shape as the input)."""
        gate = torch.sigmoid(x)
        return x * gate

In [19]:
# Sanity check: the hand-written SiLU should agree with PyTorch's implementation.
silu = SiLU()

assert torch.allclose(
    silu(test_tensor),
    nn.functional.silu(test_tensor),
)

In [21]:
class LlamaFeedForward(nn.Module):
    """Gated (SwiGLU-style) feed-forward block: ``fc3(silu(fc1(x)) * fc2(x))``.

    Reads ``emb_dim``, ``hidden_dim`` and ``dtype`` from ``cfg``.
    """

    def __init__(self, cfg):
        super().__init__()
        # Creating the layers directly in the target dtype uses less memory than
        # allocating float32 weights first and converting them afterwards.
        linear_kwargs = dict(dtype=cfg.dtype, bias=False)
        self.fc1 = nn.Linear(cfg.emb_dim, cfg.hidden_dim, **linear_kwargs)  # gate projection
        self.fc2 = nn.Linear(cfg.emb_dim, cfg.hidden_dim, **linear_kwargs)  # up projection
        self.fc3 = nn.Linear(cfg.hidden_dim, cfg.emb_dim, **linear_kwargs)  # down projection
        self.silu = SiLU()

    def forward(self, x):
        """Apply the gated feed-forward transformation to ``x``."""
        gate = self.silu(self.fc1(x))
        return self.fc3(self.fc2(x) * gate)

In [27]:
# Scratch check: the first 50 even integers (0, 2, ..., 98) divided by 100,
# i.e. 50 evenly spaced values from 0.00 to 0.98.
torch.arange(0, 101, 2)[:50] / 100.0

tensor([0.0000, 0.0200, 0.0400, 0.0600, 0.0800, 0.1000, 0.1200, 0.1400, 0.1600,
        0.1800, 0.2000, 0.2200, 0.2400, 0.2600, 0.2800, 0.3000, 0.3200, 0.3400,
        0.3600, 0.3800, 0.4000, 0.4200, 0.4400, 0.4600, 0.4800, 0.5000, 0.5200,
        0.5400, 0.5600, 0.5800, 0.6000, 0.6200, 0.6400, 0.6600, 0.6800, 0.7000,
        0.7200, 0.7400, 0.7600, 0.7800, 0.8000, 0.8200, 0.8400, 0.8600, 0.8800,
        0.9000, 0.9200, 0.9400, 0.9600, 0.9800])

In [None]:
def precompute_rope_params(dim, theta_base=10_000, context_length=4096):
    """Precompute the cosine and sine tables used by rotary position embeddings.

    Args:
        dim: Size of the dimension being rotated; must be even.
        theta_base: Base of the geometric frequency progression.
        context_length: Number of positions to precompute.

    Returns:
        A ``(cos, sin)`` tuple of tensors, each of shape ``(context_length, dim)``.
    """
    assert dim % 2 == 0, "embedding dim must be even"
    half = dim // 2

    # Inverse frequencies: 1 / theta_base ** (2i / dim) for i in [0, dim // 2).
    # The scalar theta_base broadcasts against the exponent tensor, shape (dim // 2,).
    exponent = torch.arange(0, dim, 2)[:half].float() / dim
    inv_freqs = 1.0 / (theta_base ** exponent)

    # Outer product of position ids and inverse frequencies: (context_length, dim // 2).
    position = torch.arange(context_length)
    angles = position.unsqueeze(1) * inv_freqs.unsqueeze(0)

    # Duplicate the angles so both halves of the dimension share them: (context_length, dim).
    angles = angles.repeat(1, 2)

    return torch.cos(angles), torch.sin(angles)

In [None]:
def compute_rope(x, cos, sin):
    """Apply rotary position embeddings to a 4-D tensor.

    Args:
        x: Tensor of shape ``(batch, num_heads, seq_len, head_dim)``; ``head_dim`` must be even.
        cos: Cosine table; its first ``seq_len`` rows are used.
        sin: Sine table; its first ``seq_len`` rows are used.

    Returns:
        The rotated tensor, cast back to ``x``'s dtype.
    """
    _, _, seq_len, head_dim = x.shape
    assert head_dim % 2 == 0, "Head dimension must be even"

    half = head_dim // 2
    first_half, second_half = x[..., :half], x[..., half:]

    # Add leading singleton axes so the tables broadcast over batch and heads:
    # (seq_len, head_dim) -> (1, 1, seq_len, head_dim).
    cos_b = cos[None, None, :seq_len, :]
    sin_b = sin[None, None, :seq_len, :]

    # Rotation: x * cos + rotate_half(x) * sin, with rotate_half(x) = [-x2, x1].
    rotated = torch.cat((-second_half, first_half), dim=-1)
    return (x * cos_b + rotated * sin_b).to(x.dtype)

In [None]:
# Smoke test: build RoPE tables for a tiny configuration and rotate random queries/keys.
batch_size, num_heads = 2, 4
context_len, head_dim = 5, 16

cos, sin = precompute_rope_params(dim=head_dim, context_length=context_len)

qk_shape = (batch_size, num_heads, context_len, head_dim)
queries = torch.randn(qk_shape)
keys = torch.randn(qk_shape)

queries_rot, keys_rot = (compute_rope(t, cos, sin) for t in (queries, keys))

In [None]:
class MultiHeadAttention(nn.Module):
    """Causal multi-head self-attention with rotary position embeddings.

    Queries, keys and values come from one fused linear layer (``to_qkv``) whose
    output is split into three ``d_out``-wide chunks, in that order.

    Args:
        d_in: Input feature size.
        d_out: Total output feature size across all heads; must be divisible by ``num_heads``.
        context_length: Maximum sequence length (sizes the causal mask and RoPE tables).
        num_heads: Number of attention heads.
        dtype: Optional dtype for the linear layers.
    """

    def __init__(self, d_in, d_out, context_length, num_heads, dtype=None):
        super().__init__()
        assert d_out % num_heads == 0, "d_out must be divisible by num_heads"

        self.d_out = d_out
        self.num_heads = num_heads
        self.head_dim = d_out // num_heads

        self.to_qkv = nn.Linear(d_in, 3 * d_out, bias=False, dtype=dtype)
        self.out_proj = nn.Linear(d_out, d_out, bias=False, dtype=dtype)

        # True above the diagonal marks future positions that must not be attended to.
        self.register_buffer("mask", torch.triu(torch.ones(context_length, context_length, dtype=torch.bool), diagonal=1))

        # Use this module's own head size. A bare `head_dim` would silently pick up
        # whatever global of that name happens to exist in the notebook.
        cos, sin = precompute_rope_params(dim=self.head_dim, context_length=context_length)
        self.register_buffer("cos", cos)
        self.register_buffer("sin", sin)

    def forward(self, x):
        """Return attention output of shape ``(batch, num_tokens, d_out)`` for input ``x``."""
        b, num_tokens, d_in = x.shape

        # (b, num_tokens, 3 * d_out) -> three tensors of (b, num_heads, num_tokens, head_dim)
        q, k, v = self.to_qkv(x).split(self.d_out, dim=-1)
        q = q.view(b, num_tokens, self.num_heads, self.head_dim).transpose(1, 2)
        k = k.view(b, num_tokens, self.num_heads, self.head_dim).transpose(1, 2)
        v = v.view(b, num_tokens, self.num_heads, self.head_dim).transpose(1, 2)

        # Rotary embeddings are applied to keys and queries only.
        k = compute_rope(k, self.cos, self.sin)
        q = compute_rope(q, self.cos, self.sin)

        attn_scores = q @ k.transpose(2, 3)

        # Mask out future positions before the softmax.
        mask_bool = self.mask[:num_tokens, :num_tokens]
        attn_scores.masked_fill_(mask_bool, -torch.inf)

        attn_weights = torch.softmax(attn_scores / k.shape[-1]**0.5, dim=-1)

        # (b, num_heads, num_tokens, head_dim) -> (b, num_tokens, d_out)
        context_vec = (attn_weights @ v).transpose(1, 2)
        context_vec = context_vec.reshape(b, num_tokens, self.d_out)

        context_vec = self.out_proj(context_vec)

        return context_vec




In [None]:
class TransformerBlock(nn.Module):
    """Pre-norm transformer block: attention and feed-forward, each with a residual connection.

    Reads ``emb_dim``, ``context_length``, ``n_heads`` and ``dtype`` from ``cfg``.
    """

    def __init__(self, cfg):
        super().__init__()
        self.att = MultiHeadAttention(
            d_in=cfg.emb_dim,
            d_out=cfg.emb_dim,
            context_length=cfg.context_length,
            num_heads=cfg.n_heads,
            dtype=cfg.dtype
        )

        self.ff = LlamaFeedForward(cfg)
        self.norm1 = RMSNorm(cfg.emb_dim)
        self.norm2 = RMSNorm(cfg.emb_dim)

    def forward(self, x):
        """Apply the attention and feed-forward sub-layers to ``x`` (shape is preserved)."""
        # Attention sub-layer with residual connection.
        residue = x
        x = self.norm1(x)
        x = self.att(x)
        x = x + residue

        # Feed-forward sub-layer with residual connection.
        residue = x
        x = self.norm2(x)
        x = self.ff(x)
        x = x + residue

        return x

In [None]:
class Llama2Model(nn.Module):
    """Decoder-only model: token embedding, stacked transformer blocks, final norm, output head.

    Reads ``vocab_size``, ``emb_dim``, ``n_layers`` and ``dtype`` from ``cfg``
    (the transformer blocks read further fields).
    """

    def __init__(self, cfg):
        super().__init__()

        # dtype must be passed by keyword: the third positional argument of
        # nn.Embedding is padding_idx, not dtype.
        self.tok_emb = nn.Embedding(cfg.vocab_size, cfg.emb_dim, dtype=cfg.dtype)

        self.blocks = nn.Sequential(
            *[TransformerBlock(cfg) for _ in range(cfg.n_layers)]
        )

        self.final_norm = RMSNorm(cfg.emb_dim)
        self.out_head = nn.Linear(cfg.emb_dim, cfg.vocab_size, bias=False, dtype=cfg.dtype)

    def forward(self, in_idx):
        """Map token ids ``(batch, seq_len)`` to logits ``(batch, seq_len, vocab_size)``."""
        x = self.tok_emb(in_idx)
        x = self.blocks(x)
        x = self.final_norm(x)
        # Make sure the activations match the head's dtype (a no-op when they already do).
        logits = self.out_head(x.to(self.out_head.weight.dtype))
        return logits


In [None]:
@dataclass
class Llama2Config:
    """Hyperparameters for the Llama 2 model; the defaults are the 7B values."""

    vocab_size: int = 32000       # number of token ids
    context_length: int = 4096    # maximum sequence length
    emb_dim: int = 4096           # embedding / residual stream width
    n_heads: int = 32             # attention heads per block
    n_layers: int = 32            # number of transformer blocks
    # Width of the feed-forward hidden layer. The name must be `hidden_dim`
    # because LlamaFeedForward reads `cfg.hidden_dim`.
    hidden_dim: int = 11008
    # Annotated so it is a real dataclass field and can be overridden per instance.
    dtype: torch.dtype = torch.bfloat16

In [None]:
cfg = Llama2Config()

# Later cells move inputs with `.to(device)`, so define the device here and
# keep the model on the same one.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Llama2Model(cfg).to(device)

In [None]:
# Count every element of every parameter tensor in the model.
total_params = sum(map(torch.numel, model.parameters()))
print(f"Total number of parameters: {total_params:,}")

In [None]:
from huggingface_hub import hf_hub_download

# Download the tokenizer model file from the Hugging Face Hub into ./Llama-2-7b
# and keep the path returned by hf_hub_download.
tokenizer_file = hf_hub_download(repo_id="meta-llama/Llama-2-7b", filename="tokenizer.model", local_dir="Llama-2-7b")

In [None]:
import sentencepiece as spm

class LlamaTokenizer:
    """Thin wrapper around a SentencePiece model exposing ``encode`` / ``decode``."""

    def __init__(self, tokenizer_file):
        """Load the SentencePiece model stored at ``tokenizer_file``."""
        sp = spm.SentencePieceProcessor()
        sp.load(tokenizer_file)
        self.tokenizer = sp

    def encode(self, text):
        """Return the list of token ids for ``text``."""
        return self.tokenizer.encode_as_ids(text)

    def decode(self, ids):
        """Return the text for a list of token ids."""
        # The argument holds ids, not piece strings, so use the id-based decoder.
        return self.tokenizer.decode_ids(ids)


# Later cells reference a module-level `tokenizer`; create it here.
tokenizer = LlamaTokenizer(tokenizer_file)

In [None]:
def text_to_token_ids(text, tokenizer):
    """Encode ``text`` with ``tokenizer`` and return the ids as a tensor with a leading batch axis."""
    ids = torch.tensor(tokenizer.encode(text))
    # Indexing with None prepends the batch dimension.
    return ids[None]


def token_ids_to_text(token_ids, tokenizer):
    """Drop the leading batch axis of ``token_ids`` (if it has size 1) and decode the ids to text."""
    id_list = token_ids.squeeze(0).tolist()
    return tokenizer.decode(id_list)

def generate(model, idx, max_new_tokens, context_size, temperature=1.0, top_k=None, eos_id=None):
    """Autoregressively extend ``idx`` by up to ``max_new_tokens`` tokens.

    Args:
        model: Callable mapping token ids ``(batch, seq)`` to logits ``(batch, seq, vocab)``.
        idx: Tensor of prompt token ids, shape ``(batch, seq)``.
        max_new_tokens: Maximum number of tokens to append.
        context_size: Only the last ``context_size`` tokens are fed to the model.
        temperature: If > 0, sample from the temperature-scaled softmax; otherwise take the argmax.
        top_k: If given, only the ``top_k`` highest logits per row stay eligible.
        eos_id: If given, stop as soon as every sequence in the batch produces this id
            (the end-of-sequence token itself is not appended).

    Returns:
        Tensor of shape ``(batch, seq + n_generated)`` with the prompt and generated ids.
    """
    for _ in range(max_new_tokens):
        idx_cond = idx[:, -context_size:]
        with torch.no_grad():
            logits = model(idx_cond)
        # Only the prediction for the last position is needed.
        logits = logits[:, -1, :]

        if top_k is not None:
            top_logits, _ = torch.topk(logits, top_k)
            # Keep the trailing dimension so the per-row threshold broadcasts against
            # (batch, vocab); a (batch,) threshold would mis-broadcast for batch > 1.
            min_val = top_logits[:, -1:]
            logits = logits.masked_fill(logits < min_val, float("-inf"))

        if temperature > 0.0:
            probs = torch.softmax(logits / temperature, dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
        else:
            idx_next = torch.argmax(logits, dim=-1, keepdim=True)

        # Compare only when an EOS id was supplied, and reduce over the batch so the
        # condition is a single boolean for any batch size.
        if eos_id is not None and bool((idx_next == eos_id).all()):
            break

        idx = torch.cat((idx, idx_next), dim=1)

    return idx

            



In [None]:
# Greedy decoding (temperature 0, top_k 1) from a short prompt.
prompt_ids = text_to_token_ids("Every effort moves", tokenizer).to(device)
token_ids = generate(
    model=model,
    idx=prompt_ids,
    max_new_tokens=30,
    context_size=cfg.context_length,
    temperature=0,
    top_k=1,
)

print("Output text:\n", token_ids_to_text(token_ids, tokenizer))

In [None]:
# Download the checkpoint file from the Hugging Face Hub into ./Llama-2-7b
# and keep the path returned by hf_hub_download.
weights_file = hf_hub_download(repo_id="meta-llama/Llama-2-7b", filename="consolidated.00.pth", local_dir="Llama-2-7b")

In [None]:
# Load the downloaded checkpoint; weights_only=True asks torch.load to use its
# restricted unpickler instead of arbitrary pickle loading.
weights = torch.load(weights_file, weights_only=True)

In [None]:
# Peek at the first 15 parameter names stored in the checkpoint.
list(weights)[:15]

In [None]:
def assign(left, right):
    """Return a new ``nn.Parameter`` holding a copy of ``right``, after checking shapes.

    Args:
        left: Existing parameter; only its shape is used, as the expected shape.
        right: Replacement values (a tensor, or data accepted by ``torch.tensor``).

    Raises:
        ValueError: If ``left`` and ``right`` have different shapes.
    """
    if left.shape != right.shape:
        raise ValueError(f"Shape mismatch: {left.shape} vs {right.shape}")

    # Tensors are copied and detached from any graph; anything else is converted.
    values = right.clone().detach() if isinstance(right, torch.Tensor) else torch.tensor(right)
    return nn.Parameter(values)
    

def load_weights_into_llama(model, param_config, params):
    """Copy pretrained tensors from the checkpoint dict ``params`` into ``model``.

    Args:
        model: Model whose parameters are replaced in place.
        param_config: Configuration object; only ``n_layers`` is read.
        params: Mapping from checkpoint parameter names to tensors.

    Raises:
        ValueError: If a checkpoint tensor's shape differs from the model parameter's.
    """
    model.tok_emb.weight = assign(model.tok_emb.weight, params["tok_embeddings.weight"])

    for l in range(param_config.n_layers):
        block = model.blocks[l]
        prefix = f"layers.{l}"

        # Attention weights. The model uses one fused projection whose output is
        # split into (q, k, v) chunks in that order, so stack the three checkpoint
        # matrices along the output dimension in the same order.
        fused_qkv = torch.cat(
            [
                params[f"{prefix}.attention.wq.weight"],
                params[f"{prefix}.attention.wk.weight"],
                params[f"{prefix}.attention.wv.weight"],
            ],
            dim=0,
        )
        block.att.to_qkv.weight = assign(block.att.to_qkv.weight, fused_qkv)

        block.att.out_proj.weight = assign(
            block.att.out_proj.weight, params[f"{prefix}.attention.wo.weight"]
        )

        block.norm1.weight = assign(
            block.norm1.weight, params[f"{prefix}.attention_norm.weight"]
        )

        # Feed-forward weights. The checkpoint's w3 maps to fc2 and its w2 to fc3,
        # i.e. the numbering is swapped relative to this model's layer names.
        block.ff.fc1.weight = assign(
            block.ff.fc1.weight, params[f"{prefix}.feed_forward.w1.weight"]
        )
        block.ff.fc2.weight = assign(
            block.ff.fc2.weight, params[f"{prefix}.feed_forward.w3.weight"]
        )
        block.ff.fc3.weight = assign(
            block.ff.fc3.weight, params[f"{prefix}.feed_forward.w2.weight"]
        )

        block.norm2.weight = assign(
            block.norm2.weight, params[f"{prefix}.ffn_norm.weight"]
        )

    # Output layer weights.
    model.final_norm.weight = assign(model.final_norm.weight, params["norm.weight"])
    model.out_head.weight = assign(model.out_head.weight, params["output.weight"])


# Copy the checkpoint tensors into the model, then move the model to the target device
# (the freshly assigned parameters live wherever the checkpoint tensors were loaded).
load_weights_into_llama(model, cfg, weights)
model.to(device)

In [None]:
# Fix the RNG seed, then decode greedily (temperature 0, top_k 1) from a short prompt.
torch.manual_seed(123)

prompt_ids = text_to_token_ids("Every effort", tokenizer).to(device)
token_ids = generate(
    model=model,
    idx=prompt_ids,
    max_new_tokens=25,
    context_size=cfg.context_length,
    temperature=0.,
    top_k=1,
)

In [None]:
# Decode the generated token ids back to text and print the result.
print("Output text:\n", token_ids_to_text(token_ids, tokenizer))

In [None]:
# Instruction-finetuned (chat) model: download its checkpoint and load it into a
# fresh model built from the same configuration.

weights_file = hf_hub_download(
    repo_id="meta-llama/Llama-2-7b-chat",
    filename="consolidated.00.pth",
    local_dir="Llama-2-7b-chat"
)

model = Llama2Model(cfg)
weights = torch.load(weights_file, weights_only=True)
# Use `cfg`, the configuration object defined earlier; no `config` name exists.
load_weights_into_llama(model, cfg, weights)
model.to(device)

In [None]:
# Greedy decoding (temperature 0, top_k 1) from a question prompt.
token_ids = generate(
    model=model,
    idx=text_to_token_ids("What do llamas eat?", tokenizer).to(device),
    max_new_tokens=25,
    # Read the context length from `cfg`; no LLAMA2_CONFIG_7B dict exists in this notebook.
    context_size=cfg.context_length,
    top_k=1,
    temperature=0.
)

print("Output text:\n", token_ids_to_text(token_ids, tokenizer))