### Installation

In [41]:
!pip install blobfile>=3.0.0 huggingface_hub>=0.24.7 ipywidgets>=8.1.2 safetensors>=0.4.4 sentencepiece>=0.1.99

### Check versions of important libraries

In [42]:
from importlib.metadata import version

# check versions of needed libs
libs = [
    "huggingface_hub",  # downloads pretrained weights from hf
    "tokenizers",       # handles text-to-tokens conversion  
    "torch",           # main ml framework
]
for lib in libs:
    print(f"{lib} version: {version(lib)}")

huggingface_hub version: 0.33.1
tokenizers version: 0.21.2
torch version: 2.6.0+cu124


### Choose between reasoning model and base model

In [43]:
USE_REASONING = True  # whether to use reasoning model or base

### Define a feedforward network with gating and SwiGLU activation

In [44]:
import torch
import torch.nn as nn

class FeedForward(nn.Module):
    # simple feedforward network with gating mechanisim
    def __init__(self, cfg):
        super().__init__()
        # 3 linear layers: 2 for gating, 1 for output
        self.gate = nn.Linear(cfg["emb_dim"], cfg["hidden_dim"], dtype=cfg["dtype"], bias=False)  
        self.up = nn.Linear(cfg["emb_dim"], cfg["hidden_dim"], dtype=cfg["dtype"], bias=False)
        self.down = nn.Linear(cfg["hidden_dim"], cfg["emb_dim"], dtype=cfg["dtype"], bias=False)

    def forward(self, x):
        # swiglu activation: silu(gate) * up_proj
        gate_out = self.gate(x)
        up_out = self.up(x) 
        activated = nn.functional.silu(gate_out) * up_out  # element-wise multiply
        return self.down(activated)

### RMSNorm: an alternative to LayerNorm with optional Qwen3 compatibility

In [45]:
class RMSNorm(nn.Module):
    # root mean square normaliztion - alternative to layernorm
    def __init__(self, dim, eps=1e-6, bias=False, qwen3_compat=True):
        super().__init__()
        self.eps = eps  # small value to avoid division by zero
        self.qwen3_compat = qwen3_compat  # compatibility flag
        self.weight = nn.Parameter(torch.ones(dim))  # learnable scale
        self.bias = nn.Parameter(torch.zeros(dim)) if bias else None  # optional bias

    def forward(self, x):
        orig_dtype = x.dtype
        
        # convert to float32 for numerical stability
        if self.qwen3_compat:
            x = x.to(torch.float32)

        # compute rms and normalize
        var = x.pow(2).mean(dim=-1, keepdim=True)  # mean of squares
        normed = x * torch.rsqrt(var + self.eps)   # x / sqrt(variance)
        normed = normed * self.weight              # scale

        if self.bias is not None:
            normed = normed + self.bias  # shift if bias exists

        return normed.to(orig_dtype)  # back to original dtype

### Compute rotary position encodings (RoPE) for attention mechanism

In [46]:
def compute_rope_freqs(head_dim, base=10_000, max_len=4096, dtype=torch.float32):
    # rotary position encoding - helps model understand position
    assert head_dim % 2 == 0, "head dim must be even for rope"

    # compute inverse frequencies for each dimension pair
    inv_freqs = 1.0 / (base ** (torch.arange(0, head_dim, 2, dtype=dtype)[:(head_dim // 2)].float() / head_dim))
    
    # position indices from 0 to max_len-1
    pos = torch.arange(max_len, dtype=dtype)
    
    # compute angles: pos * inv_freq for each position-frequency pair
    angles = pos[:, None] * inv_freqs[None, :]  # shape: (max_len, head_dim//2)
    
    # duplicate angles to match full head dimension
    angles = torch.cat([angles, angles], dim=1)  # shape: (max_len, head_dim)
    
    # precompute cos and sin for efficiency
    cos_vals = torch.cos(angles)
    sin_vals = torch.sin(angles)
    
    return cos_vals, sin_vals

### Rotary position encodings (RoPE) to attention input tensor

In [47]:
def apply_rope(x, cos_vals, sin_vals):
    # applies rotary encoding to input tensor
    # x shape: (batch, heads, seq_len, head_dim)
    b, h, seq_len, d = x.shape
    assert d % 2 == 0, "head dim must be even"
    
    # split into two halves for rotation
    x1 = x[..., :d//2]   # first half
    x2 = x[..., d//2:]   # second half
    
    # adjust cos/sin shapes to match input
    cos_vals = cos_vals[:seq_len, :].unsqueeze(0).unsqueeze(0)  # (1,1,seq_len,head_dim)
    sin_vals = sin_vals[:seq_len, :].unsqueeze(0).unsqueeze(0)
    
    # rotation: combine original and rotated components
    rotated = torch.cat((-x2, x1), dim=-1)  # rotate by 90 degrees
    result = (x * cos_vals) + (rotated * sin_vals)
    
    return result.to(dtype=x.dtype)

### Grouped query attention layer that shares key/value projections across head groups to save memory

In [48]:
class GroupedQueryAttention(nn.Module):
    # grouped query attention - saves memory by sharing k,v across heads
    def __init__(self, d_in, n_heads, n_kv_groups, head_dim=None, qk_norm=False, dtype=None):
        super().__init__()
        assert n_heads % n_kv_groups == 0, "heads must be divisible by kv groups"
        
        self.n_heads = n_heads
        self.n_kv_groups = n_kv_groups  
        self.group_size = n_heads // n_kv_groups  # how many q heads per kv head
        
        # calculate head dimension if not provided
        if head_dim is None:
            assert d_in % n_heads == 0, "d_in must divide evenly by n_heads"
            head_dim = d_in // n_heads
            
        self.head_dim = head_dim
        self.d_out = n_heads * head_dim
        
        # projection layers
        self.q_proj = nn.Linear(d_in, self.d_out, bias=False, dtype=dtype)
        self.k_proj = nn.Linear(d_in, n_kv_groups * head_dim, bias=False, dtype=dtype) 
        self.v_proj = nn.Linear(d_in, n_kv_groups * head_dim, bias=False, dtype=dtype)
        self.out_proj = nn.Linear(self.d_out, d_in, bias=False, dtype=dtype)
        
        # optional query/key normalization
        if qk_norm:
            self.q_norm = RMSNorm(head_dim, eps=1e-6)
            self.k_norm = RMSNorm(head_dim, eps=1e-6)
        else:
            self.q_norm = self.k_norm = None

    def forward(self, x, mask, cos_vals, sin_vals):
        b, seq_len, _ = x.shape
        
        # project to q, k, v
        q = self.q_proj(x)  # (b, seq_len, n_heads * head_dim)
        k = self.k_proj(x)  # (b, seq_len, n_kv_groups * head_dim)  
        v = self.v_proj(x)  # (b, seq_len, n_kv_groups * head_dim)
        
        # reshape to separate heads
        q = q.view(b, seq_len, self.n_heads, self.head_dim).transpose(1, 2)
        k = k.view(b, seq_len, self.n_kv_groups, self.head_dim).transpose(1, 2)
        v = v.view(b, seq_len, self.n_kv_groups, self.head_dim).transpose(1, 2)
        
        # apply normalization if enabled
        if self.q_norm:
            q = self.q_norm(q)
        if self.k_norm:
            k = self.k_norm(k)
            
        # apply rotary position encoding
        q = apply_rope(q, cos_vals, sin_vals)
        k = apply_rope(k, cos_vals, sin_vals)
        
        # expand k,v to match number of query heads
        k = k.repeat_interleave(self.group_size, dim=1)
        v = v.repeat_interleave(self.group_size, dim=1)
        
        # compute attention scores and apply mask
        scores = q @ k.transpose(2, 3)  # (b, heads, seq_len, seq_len)
        scores = scores.masked_fill(mask, -torch.inf)  # mask future tokens
        weights = torch.softmax(scores / self.head_dim**0.5, dim=-1)  # scale and softmax
        
        # apply attention weights to values
        out = (weights @ v).transpose(1, 2).reshape(b, seq_len, self.d_out)
        return self.out_proj(out)


### Single transformer block combining grouped attention and feedforward with RMS normalization and residual connections

In [49]:
class TransformerBlock(nn.Module):
    # single transformer layer with attention + feedforward
    def __init__(self, cfg):
        super().__init__()
        self.attn = GroupedQueryAttention(
            d_in=cfg["emb_dim"],
            n_heads=cfg["n_heads"], 
            head_dim=cfg["head_dim"],
            n_kv_groups=cfg["n_kv_groups"],
            qk_norm=cfg["qk_norm"],
            dtype=cfg["dtype"]
        )
        self.ff = FeedForward(cfg)
        self.norm1 = RMSNorm(cfg["emb_dim"], eps=1e-6)  # pre-attention norm
        self.norm2 = RMSNorm(cfg["emb_dim"], eps=1e-6)  # pre-ff norm

    def forward(self, x, mask, cos_vals, sin_vals):
        # attention block with residual connection
        residual = x
        x = self.norm1(x)  # pre-norm
        x = self.attn(x, mask, cos_vals, sin_vals) 
        x = x + residual   # residual connection
        
        # feedforward block with residual connection  
        residual = x
        x = self.norm2(x)  # pre-norm
        x = self.ff(x)
        x = x + residual   # residual connection
        
        return x

### Full transformer model with token embedding, stacked transformer blocks, rotary embeddings, and output projection

In [50]:
class Qwen3Model(nn.Module):
    # main transformer model
    def __init__(self, cfg):
        super().__init__()
        
        # embedding layer converts tokens to vectors
        self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"], dtype=cfg["dtype"])
        
        # stack of transformer blocks
        self.layers = nn.ModuleList([
            TransformerBlock(cfg) for _ in range(cfg["n_layers"])
        ])
        
        # final norm and output projection
        self.final_norm = RMSNorm(cfg["emb_dim"]) 
        self.lm_head = nn.Linear(cfg["emb_dim"], cfg["vocab_size"], bias=False, dtype=cfg["dtype"])
        
        # precompute rope frequencies
        if cfg["head_dim"] is None:
            hd = cfg["emb_dim"] // cfg["n_heads"]
        else:
            hd = cfg["head_dim"]
            
        cos_vals, sin_vals = compute_rope_freqs(
            head_dim=hd,
            base=cfg["rope_base"], 
            max_len=cfg["context_length"]
        )
        # register as buffers so they move with model to gpu/cpu
        self.register_buffer("cos_vals", cos_vals, persistent=False)
        self.register_buffer("sin_vals", sin_vals, persistent=False)
        self.cfg = cfg

    def forward(self, token_ids):
        # convert tokens to embeddings
        x = self.tok_emb(token_ids)
        
        # create causal mask to prevent looking at future tokens
        seq_len = x.shape[1]
        mask = torch.triu(torch.ones(seq_len, seq_len, device=x.device, dtype=torch.bool), diagonal=1)
        
        # pass through all transformer layers
        for layer in self.layers:
            x = layer(x, mask, self.cos_vals, self.sin_vals)
            
        # final normalization and projection to vocab
        x = self.final_norm(x) 
        logits = self.lm_head(x.to(self.cfg["dtype"]))
        return logits


### Model size configurations with hyperparameters for different parameter scales

In [51]:
# model size configuration
MODEL_SIZE = "0.6B"  # can be 0.6B, 1.7B, 4B, 8B, 14B, 32B

# config dicts for different model sizes
if MODEL_SIZE == "0.6B":
    CONFIG = {
        "vocab_size": 151_936,      # number of tokens in vocabulary
        "context_length": 40_960,   # max sequence length during training
        "emb_dim": 1024,           # embedding/hidden dimension
        "n_heads": 16,             # attention heads
        "n_layers": 28,            # transformer layers
        "hidden_dim": 3072,        # feedforward hidden size
        "head_dim": 128,           # dimension per attention head
        "qk_norm": True,           # normalize queries and keys
        "n_kv_groups": 8,          # kv groups for grouped attention
        "rope_base": 1_000_000.0,  # rope frequency base
        "dtype": torch.bfloat16,   # reduced precision for memory
    }
elif MODEL_SIZE == "1.7B":
    CONFIG = {
        "vocab_size": 151_936,
        "context_length": 40_960,
        "emb_dim": 2048,           # 2x bigger than 0.6B
        "n_heads": 16,
        "n_layers": 28, 
        "hidden_dim": 6144,        # 2x bigger
        "head_dim": 128,
        "qk_norm": True,
        "n_kv_groups": 8,
        "rope_base": 1_000_000.0,
        "dtype": torch.bfloat16,
    }
elif MODEL_SIZE == "4B":
    CONFIG = {
        "vocab_size": 151_936,
        "context_length": 40_960,
        "emb_dim": 2560,           # 25% bigger than 1.7B
        "n_heads": 32,             # 2x more heads
        "n_layers": 36,            # 29% more layers  
        "hidden_dim": 9728,        # ~3x bigger ff
        "head_dim": 128,
        "qk_norm": True,
        "n_kv_groups": 8,
        "rope_base": 1_000_000.0,
        "dtype": torch.bfloat16,
    }
elif MODEL_SIZE == "8B":
    CONFIG = {
        "vocab_size": 151_936,
        "context_length": 40_960,
        "emb_dim": 4096,           # 60% bigger than 4B
        "n_heads": 32,
        "n_layers": 36,            # 26% more layers
        "hidden_dim": 12288,
        "head_dim": 128,
        "qk_norm": True,
        "n_kv_groups": 8,
        "rope_base": 1_000_000.0,
        "dtype": torch.bfloat16,
    }
elif MODEL_SIZE == "14B":
    CONFIG = {
        "vocab_size": 151_936,
        "context_length": 40_960,
        "emb_dim": 5120,           # 25% bigger than 8B
        "n_heads": 40,             # 25% more heads
        "n_layers": 40,            # 11% more layers
        "hidden_dim": 17408,       # 42% bigger ff
        "head_dim": 128,
        "qk_norm": True,
        "n_kv_groups": 8,
        "rope_base": 1_000_000.0,
        "dtype": torch.bfloat16,
    }
elif MODEL_SIZE == "32B":
    CONFIG = {
        "vocab_size": 151_936,
        "context_length": 40_960,
        "emb_dim": 5120,
        "n_heads": 64,             # 60% more heads than 14B
        "n_layers": 64,            # 60% more layers
        "hidden_dim": 25600,       # 47% bigger ff
        "head_dim": 128,
        "qk_norm": True,
        "n_kv_groups": 8,
        "rope_base": 1_000_000.0,
        "dtype": torch.bfloat16,
    }
else:
    raise ValueError(f"Model size {MODEL_SIZE} not supported")

### create and test the model with dummy input and print parameter counts

In [52]:
# create and test model
torch.manual_seed(123)  # for reproducible results
model = Qwen3Model(CONFIG)
print(f"Model created: {model}")

Model created: Qwen3Model(
  (tok_emb): Embedding(151936, 1024)
  (layers): ModuleList(
    (0-27): 28 x TransformerBlock(
      (attn): GroupedQueryAttention(
        (q_proj): Linear(in_features=1024, out_features=2048, bias=False)
        (k_proj): Linear(in_features=1024, out_features=1024, bias=False)
        (v_proj): Linear(in_features=1024, out_features=1024, bias=False)
        (out_proj): Linear(in_features=2048, out_features=1024, bias=False)
        (q_norm): RMSNorm()
        (k_norm): RMSNorm()
      )
      (ff): FeedForward(
        (gate): Linear(in_features=1024, out_features=3072, bias=False)
        (up): Linear(in_features=1024, out_features=3072, bias=False)
        (down): Linear(in_features=3072, out_features=1024, bias=False)
      )
      (norm1): RMSNorm()
      (norm2): RMSNorm()
    )
  )
  (final_norm): RMSNorm()
  (lm_head): Linear(in_features=1024, out_features=151936, bias=False)
)


In [53]:
# test forward pass with dummy input
test_output = model(torch.tensor([1, 2, 3]).unsqueeze(0))
print(f"Test output shape: {test_output.shape}")

Test output shape: torch.Size([1, 3, 151936])


In [54]:
# calculate parameter counts
total_params = sum(p.numel() for p in model.parameters())
print(f"Total parameters: {total_params:,}")

Total parameters: 751,632,384


In [55]:
# account for weight tying between embedding and lm_head
unique_params = total_params - model.tok_emb.weight.numel()  
print(f"Unique parameters: {unique_params:,}")

Unique parameters: 596,049,920


### Estimate total GPU memory needed for model parameters, gradients, and buffers

In [56]:
def calc_memory_usage(model, dtype=torch.float32):
    # calculates model memory requirements
    total_params = 0
    total_grads = 0
    
    for param in model.parameters():
        param_count = param.numel()
        total_params += param_count
        
        # add gradient memory if param requires grad
        if param.requires_grad:
            total_grads += param_count
    
    # add buffer memory (non-trainable tensors)
    total_buffers = sum(buf.numel() for buf in model.buffers())
    
    # bytes per element for given dtype
    bytes_per_elem = torch.tensor(0, dtype=dtype).element_size()
    total_bytes = (total_params + total_grads + total_buffers) * bytes_per_elem
    
    # convert to GB
    total_gb = total_bytes / (1024**3)
    return total_gb

### Display memory usage for different precisions and move model to available device (GPU, MPS, or CPU)

In [57]:
# memory usage for different precisions
print(f"\nMemory usage:")
print(f"float32: {calc_memory_usage(model, torch.float32):.2f} GB")
print(f"bfloat16: {calc_memory_usage(model, torch.bfloat16):.2f} GB")
print(f"float16: {calc_memory_usage(model, torch.float16):.2f} GB")

# device selection and model loading
if torch.cuda.is_available():
    device = torch.device("cuda")
    print("Using CUDA GPU")
elif torch.backends.mps.is_available():
    device = torch.device("mps") 
    print("Using Apple MPS")
else:
    device = torch.device("cpu")
    print("Using CPU")

model.to(device)


Memory usage:
float32: 5.64 GB
bfloat16: 2.82 GB
float16: 2.82 GB
Using CUDA GPU


Qwen3Model(
  (tok_emb): Embedding(151936, 1024)
  (layers): ModuleList(
    (0-27): 28 x TransformerBlock(
      (attn): GroupedQueryAttention(
        (q_proj): Linear(in_features=1024, out_features=2048, bias=False)
        (k_proj): Linear(in_features=1024, out_features=1024, bias=False)
        (v_proj): Linear(in_features=1024, out_features=1024, bias=False)
        (out_proj): Linear(in_features=2048, out_features=1024, bias=False)
        (q_norm): RMSNorm()
        (k_norm): RMSNorm()
      )
      (ff): FeedForward(
        (gate): Linear(in_features=1024, out_features=3072, bias=False)
        (up): Linear(in_features=1024, out_features=3072, bias=False)
        (down): Linear(in_features=3072, out_features=1024, bias=False)
      )
      (norm1): RMSNorm()
      (norm2): RMSNorm()
    )
  )
  (final_norm): RMSNorm()
  (lm_head): Linear(in_features=1024, out_features=151936, bias=False)
)

### Load pretrained weights from checkpoint dict into the model with shape checks and support for weight tying

In [58]:
def load_pretrained_weights(model, config, weights_dict):
    # loads weights from huggingface checkpoint into our model
    def assign_weight(left, right, name="unknown"):
        if left.shape != right.shape:
            raise ValueError(f"Shape mismatch for {name}: {left.shape} vs {right.shape}")
        return torch.nn.Parameter(right.clone().detach() if isinstance(right, torch.Tensor) else torch.tensor(right))

    # load embedding weights
    model.tok_emb.weight = assign_weight(
        model.tok_emb.weight, 
        weights_dict["model.embed_tokens.weight"], 
        "embedding"
    )

    # load transformer layer weights
    for layer_idx in range(config["n_layers"]):
        block = model.layers[layer_idx] 
        attn = block.attn
        
        # attention projection weights
        attn.q_proj.weight = assign_weight(
            attn.q_proj.weight,
            weights_dict[f"model.layers.{layer_idx}.self_attn.q_proj.weight"],
            f"layer_{layer_idx}_q_proj"
        )
        attn.k_proj.weight = assign_weight(
            attn.k_proj.weight,
            weights_dict[f"model.layers.{layer_idx}.self_attn.k_proj.weight"],
            f"layer_{layer_idx}_k_proj"
        )
        attn.v_proj.weight = assign_weight(
            attn.v_proj.weight,
            weights_dict[f"model.layers.{layer_idx}.self_attn.v_proj.weight"],
            f"layer_{layer_idx}_v_proj"
        )
        attn.out_proj.weight = assign_weight(
            attn.out_proj.weight,
            weights_dict[f"model.layers.{layer_idx}.self_attn.o_proj.weight"],
            f"layer_{layer_idx}_out_proj"
        )
        
        # qk normalization weights if they exist
        if hasattr(attn, "q_norm") and attn.q_norm is not None:
            attn.q_norm.weight = assign_weight(
                attn.q_norm.weight,
                weights_dict[f"model.layers.{layer_idx}.self_attn.q_norm.weight"],
                f"layer_{layer_idx}_q_norm"
            )
        if hasattr(attn, "k_norm") and attn.k_norm is not None:
            attn.k_norm.weight = assign_weight(
                attn.k_norm.weight,
                weights_dict[f"model.layers.{layer_idx}.self_attn.k_norm.weight"],
                f"layer_{layer_idx}_k_norm"
            )
        
        # layer normalization weights
        block.norm1.weight = assign_weight(
            block.norm1.weight,
            weights_dict[f"model.layers.{layer_idx}.input_layernorm.weight"],
            f"layer_{layer_idx}_norm1"
        )
        block.norm2.weight = assign_weight(
            block.norm2.weight,
            weights_dict[f"model.layers.{layer_idx}.post_attention_layernorm.weight"],
            f"layer_{layer_idx}_norm2"
        )
        
        # feedforward weights
        block.ff.gate.weight = assign_weight(
            block.ff.gate.weight,
            weights_dict[f"model.layers.{layer_idx}.mlp.gate_proj.weight"],
            f"layer_{layer_idx}_ff_gate"
        )
        block.ff.up.weight = assign_weight(
            block.ff.up.weight,
            weights_dict[f"model.layers.{layer_idx}.mlp.up_proj.weight"],
            f"layer_{layer_idx}_ff_up"
        )
        block.ff.down.weight = assign_weight(
            block.ff.down.weight,
            weights_dict[f"model.layers.{layer_idx}.mlp.down_proj.weight"],
            f"layer_{layer_idx}_ff_down"
        )
    
    # final layer norm and output head
    model.final_norm.weight = assign_weight(
        model.final_norm.weight, 
        weights_dict["model.norm.weight"], 
        "final_norm"
    )
    
    if "lm_head.weight" in weights_dict:
        model.lm_head.weight = assign_weight(
            model.lm_head.weight, 
            weights_dict["lm_head.weight"], 
            "lm_head"
        )
    else:
        # weight tying: reuse embedding weights for output
        print("Using weight tying for output head")
        model.lm_head.weight = assign_weight(
            model.lm_head.weight, 
            weights_dict["model.embed_tokens.weight"], 
            "lm_head_tied"
        )

### Download pretrained model weights from Hugging Face hub based on selected model size and mode (reasoning/base)

In [59]:
# download and load model weights
import json
import os
from pathlib import Path
from safetensors.torch import load_file
from huggingface_hub import hf_hub_download, snapshot_download

if USE_REASONING:
    repo_id = f"Qwen/Qwen3-{MODEL_SIZE}"
else:
    repo_id = f"Qwen/Qwen3-{MODEL_SIZE}-Base"

local_dir = Path(repo_id).parts[-1]  # extract folder name

print(f"Downloading weights from {repo_id}...")

Downloading weights from Qwen/Qwen3-0.6B...


# Download and Load Model Weights (Handling Sharded or Single File)

In [60]:
if MODEL_SIZE == "0.6B":
    # small model has single safetensors file
    weights_file = hf_hub_download(
        repo_id=repo_id,
        filename="model.safetensors",
        local_dir=local_dir,
    )
    weights = load_file(weights_file)
else:
    # larger models are sharded across multiple files
    repo_dir = snapshot_download(repo_id=repo_id, local_dir=local_dir)
    index_file = os.path.join(repo_dir, "model.safetensors.index.json")
    
    with open(index_file, "r") as f:
        index = json.load(f)
    
    weights = {}
    # load all shard files and combine
    for filename in set(index["weight_map"].values()):
        shard_path = os.path.join(repo_dir, filename)
        shard = load_file(shard_path)
        weights.update(shard)

load_pretrained_weights(model, CONFIG, weights)
model.to(device)
del weights  # free memory

print("Model weights loaded successfully!")

Model weights loaded successfully!


# Tokenizer Class for Text-Token Conversion with Chat Formatting

In [61]:
# tokenizer implementation
from tokenizers import Tokenizer

class Qwen3Tokenizer():
    # handles text <-> token conversion for qwen models
    def __init__(self, tokenizer_path="tokenizer.json", repo_id=None, add_gen_prompt=False, add_thinking=False):
        self.tokenizer_path = tokenizer_path
        self.add_gen_prompt = add_gen_prompt
        self.add_thinking = add_thinking
        
        # download tokenizer if not found locally
        tokenizer_file = Path(tokenizer_path)
        if not tokenizer_file.is_file() and repo_id is not None:
            _ = hf_hub_download(
                repo_id=repo_id,
                filename=str(tokenizer_file.name),
                local_dir=str(tokenizer_file.parent.name)
            )
        
        self.tokenizer = Tokenizer.from_file(tokenizer_path)

    def encode(self, text):
        # convert text to tokens using chat format
        messages = [{"role": "user", "content": text}]
        formatted = self.format_chat(
            messages,
            add_gen_prompt=self.add_gen_prompt,
            add_thinking=self.add_thinking
        )
        return self.tokenizer.encode(formatted).ids

    def decode(self, token_ids):
        # convert tokens back to text
        return self.tokenizer.decode(token_ids, skip_special_tokens=False)

    @staticmethod
    def format_chat(messages, add_gen_prompt=False, add_thinking=False):
        # formats messages into qwen chat template
        prompt = ""
        for msg in messages:
            prompt += f"<|im_start|>{msg['role']}\n{msg['content']}<|im_end|>\n"
        
        if add_gen_prompt:
            prompt += "<|im_start|>assistant"
            if not add_thinking:
                prompt += "<|think>\n\n<|/think>\n\n"  # reasoning markers
            else:
                prompt += "\n"
        return prompt

# Initialize Tokenizer with Optional Reasoning Prompts

In [62]:
# setup tokenizer
if USE_REASONING:
    tokenizer_path = f"Qwen3-{MODEL_SIZE}/tokenizer.json"
else:
    tokenizer_path = f"Qwen3-{MODEL_SIZE}-Base/tokenizer.json"

tokenizer = Qwen3Tokenizer(
    tokenizer_path=tokenizer_path,
    repo_id=repo_id,
    add_gen_prompt=USE_REASONING,
    add_thinking=USE_REASONING
)

# Test Tokenization and Decoding with Sample Prompt

In [63]:
# test tokenization
test_prompt = "Give me a short introduction to large language models."
token_ids = tokenizer.encode(test_prompt)
decoded_text = tokenizer.decode(token_ids)
print(f"Test tokenization:")
print(f"Original: {test_prompt}")
print(f"Tokens: {token_ids[:10]}... (showing first 10)")
print(f"Decoded: {decoded_text[:100]}...")

Test tokenization:
Original: Give me a short introduction to large language models.
Tokens: [151644, 872, 198, 35127, 752, 264, 2805, 16800, 311, 3460]... (showing first 10)
Decoded: <|im_start|>user
Give me a short introduction to large language models.<|im_end|>
<|im_start|>assist...


# Text Generation Function with Optional Sampling and Stopping Criteria

In [64]:
def generate_text(model, token_ids, max_tokens=150, context_size=None, temp=0.0, top_k=None, eos_id=None):
    # generates text by predicting next tokens one by one
    if context_size is None:
        context_size = model.cfg["context_length"]
    
    for _ in range(max_tokens):
        # only use last context_size tokens to avoid memory issues
        context_ids = token_ids[:, -context_size:]
        
        with torch.no_grad():
            logits = model(context_ids)
            next_logits = logits[:, -1, :]  # only care about last position
        
        # apply top-k filtering if specified
        if top_k is not None:
            top_vals, _ = torch.topk(next_logits, top_k)
            min_val = top_vals[:, -1]
            next_logits = torch.where(
                next_logits < min_val, 
                torch.tensor(-torch.inf).to(next_logits.device), 
                next_logits
            )
        
        # apply temperature and sample
        if temp > 0.0:
            next_logits = next_logits / temp
            probs = torch.softmax(next_logits, dim=-1)
            next_id = torch.multinomial(probs, num_samples=1)
        else:
            # greedy decoding
            next_id = torch.argmax(next_logits, dim=-1, keepdim=True)
        
        # stop if eos token encountered
        if eos_id is not None and next_id.item() == eos_id:
            break
            
        # append new token
        token_ids = torch.cat((token_ids, next_id), dim=1)
    
    return token_ids


# Generate Text with Timing and Performance Metrics

In [65]:
# generate text and measure performance
import time

print("\nGenerating text...")
torch.manual_seed(123)  # reproducible generation

start_time = time.time()

output_ids = generate_text(
    model=model,
    token_ids=torch.tensor(token_ids, device=device).unsqueeze(0),
    max_tokens=150,
    context_size=CONFIG["context_length"],
    top_k=1,  # greedy
    temp=0.0
)

gen_time = time.time() - start_time
output_text = tokenizer.decode(output_ids.squeeze(0).tolist())

print(f"Generation time: {gen_time:.2f} seconds")
print(f"Tokens generated: {output_ids.shape[1] - len(token_ids)}")
print(f"Tokens per second: {(output_ids.shape[1] - len(token_ids)) / gen_time:.1f}")

if torch.cuda.is_available():
    max_memory = torch.cuda.max_memory_allocated() / (1024**3)
    print(f"Peak GPU memory: {max_memory:.2f} GB")

print(f"\n\nGenerated text:\n{output_text}")


Generating text...
Generation time: 10.83 seconds
Tokens generated: 150
Tokens per second: 13.8
Peak GPU memory: 9.83 GB


Generated text:
<|im_start|>user
Give me a short introduction to large language models.<|im_end|>
<|im_start|>assistant
<think>
Okay, the user wants a short introduction to large language models. Let me start by recalling what I know. Large language models are AI systems that can understand and generate human language. They're trained on massive datasets, so they can learn complex patterns and nuances.

I should mention their ability to understand and generate text, not just specific tasks. Maybe include examples like chatbots or content generation. Also, emphasize their adaptability and efficiency. Oh, and maybe touch on their applications in various fields. Let me check if I'm covering all key points without being too technical. Keep it concise, around 3-4 sentences. Make sure it's clear and easy to understand.
</think>

Large language models (LLMs) are AI syste

# 1. parameter breakdown by component

In [66]:
def analyze_model_components(model):
    component_params = {}
    
    # embedding layer
    emb_params = model.tok_emb.weight.numel()
    component_params['Token Embedding'] = emb_params
    
    # transformer layers
    layer_params = 0
    for layer in model.layers:
        layer_params += sum(p.numel() for p in layer.parameters())
    component_params['Transformer Layers'] = layer_params
    
    # final components
    final_norm_params = sum(p.numel() for p in model.final_norm.parameters())
    lm_head_params = model.lm_head.weight.numel()
    
    component_params['Final Norm'] = final_norm_params
    component_params['LM Head'] = lm_head_params
    
    return component_params

components = analyze_model_components(model)
total = sum(components.values())

print("\nParameter Breakdown:")
for component, count in components.items():
    percentage = (count / total) * 100
    print(f"  {component}: {count:,} ({percentage:.1f}%)")


Parameter Breakdown:
  Token Embedding: 155,582,464 (20.7%)
  Transformer Layers: 440,466,432 (58.6%)
  Final Norm: 1,024 (0.0%)
  LM Head: 155,582,464 (20.7%)


# 2. memory analysis for different batch sizes

In [67]:
def memory_analysis(model, seq_len=1024):
    print(f"\nMemory Analysis (sequence length: {seq_len}):")
    
    batch_sizes = [1, 2, 4, 8, 16, 32]
    for batch_size in batch_sizes:
        if torch.cuda.is_available():
            torch.cuda.reset_peak_memory_stats()
            
        dummy_input = torch.randint(0, 1000, (batch_size, seq_len), device=device)
        
        with torch.no_grad():
            _ = model(dummy_input)
            
        if torch.cuda.is_available():
            memory_gb = torch.cuda.max_memory_allocated() / (1024**3)
            print(f"  Batch size {batch_size:2d}: {memory_gb:.2f} GB")

memory_analysis(model, seq_len=512)


Memory Analysis (sequence length: 512):
  Batch size  1: 4.43 GB
  Batch size  2: 4.72 GB
  Batch size  4: 5.16 GB
  Batch size  8: 6.03 GB
  Batch size 16: 7.78 GB
  Batch size 32: 11.27 GB


# 3. inference speed benchmarks

In [68]:
def speed_benchmark(model, tokenizer):
    print(f"\nSpeed Benchmarks:")
    
    test_prompts = [
        "What is machine learning?",
        "Explain quantum computing in simple terms.",
        "Write a short story about a robot.",
        "List the benefits of renewable energy."
    ]
    
    total_time = 0
    total_tokens = 0
    
    for i, prompt in enumerate(test_prompts):
        input_ids = tokenizer.encode(prompt)
        start = time.time()
        
        output = generate_text(
            model=model,
            token_ids=torch.tensor(input_ids, device=device).unsqueeze(0),
            max_tokens=50,
            temp=0.0
        )
        
        elapsed = time.time() - start
        tokens_generated = output.shape[1] - len(input_ids)
        
        total_time += elapsed
        total_tokens += tokens_generated
        
        print(f"  Prompt {i+1}: {tokens_generated} tokens in {elapsed:.2f}s ({tokens_generated/elapsed:.1f} tok/s)")
    
    print(f"  Average: {total_tokens/total_time:.1f} tokens/second")

speed_benchmark(model, tokenizer)


Speed Benchmarks:
  Prompt 1: 50 tokens in 4.06s (12.3 tok/s)
  Prompt 2: 50 tokens in 3.28s (15.3 tok/s)
  Prompt 3: 50 tokens in 3.28s (15.2 tok/s)
  Prompt 4: 50 tokens in 3.28s (15.3 tok/s)
  Average: 14.4 tokens/second


# 4. model configuration comparison

In [69]:
def compare_model_configs():
    print(f"\nModel Size Comparison:")
    
    configs = {
        "0.6B": {"emb_dim": 1024, "n_heads": 16, "n_layers": 28, "hidden_dim": 3072},
        "1.7B": {"emb_dim": 2048, "n_heads": 16, "n_layers": 28, "hidden_dim": 6144},
        "4B": {"emb_dim": 2560, "n_heads": 32, "n_layers": 36, "hidden_dim": 9728},
        "8B": {"emb_dim": 4096, "n_heads": 32, "n_layers": 36, "hidden_dim": 12288},
        "14B": {"emb_dim": 5120, "n_heads": 40, "n_layers": 40, "hidden_dim": 17408},
        "32B": {"emb_dim": 5120, "n_heads": 64, "n_layers": 64, "hidden_dim": 25600},
    }
    
    print(f"{'Size':<6} {'Emb Dim':<8} {'Heads':<6} {'Layers':<7} {'FF Dim':<8} {'Est Params':<12}")
    print("-" * 55)
    
    for size, cfg in configs.items():
        # rough parameter estimate
        vocab_size = 151_936
        emb_params = vocab_size * cfg["emb_dim"]
        layer_params = cfg["n_layers"] * (
            4 * cfg["emb_dim"]**2 +  # attention projections (rough)
            3 * cfg["emb_dim"] * cfg["hidden_dim"]  # feedforward
        )
        est_params = (emb_params + layer_params) / 1e9  # in billions
        
        print(f"{size:<6} {cfg['emb_dim']:<8} {cfg['n_heads']:<6} {cfg['n_layers']:<7} {cfg['hidden_dim']:<8} {est_params:.1f}B")

compare_model_configs()


Model Size Comparison:
Size   Emb Dim  Heads  Layers  FF Dim   Est Params  
-------------------------------------------------------
0.6B   1024     16     28      3072     0.5B
1.7B   2048     16     28      6144     1.8B
4B     2560     32     36      9728     4.0B
8B     4096     32     36      12288    8.5B
14B    5120     40     40      17408    15.7B
32B    5120     64     64      25600    32.7B


# 5. attention pattern analysis (simplified)

In [70]:
def analyze_attention_heads():
    print(f"\nAttention Configuration:")
    print(f"  Total attention heads: {CONFIG['n_heads']}")
    print(f"  Key-Value groups: {CONFIG['n_kv_groups']}")
    print(f"  Heads per KV group: {CONFIG['n_heads'] // CONFIG['n_kv_groups']}")
    print(f"  Head dimension: {CONFIG['head_dim']}")
    print(f"  Memory savings from GQA: {CONFIG['n_heads'] / CONFIG['n_kv_groups']:.1f}x")

analyze_attention_heads()


Attention Configuration:
  Total attention heads: 16
  Key-Value groups: 8
  Heads per KV group: 2
  Head dimension: 128
  Memory savings from GQA: 2.0x
