In [34]:
import torch.nn as nn
import torch

class TinyllmRotaryEmbedding(nn.Module):
    """Rotary position embedding (RoPE) lookup tables.

    Precomputes cosine and sine tables for positions ``0 .. max_position_embeddings - 1``
    and rebuilds them on demand when a longer sequence is requested.
    """

    def __init__(self, dim, max_position_embeddings=2048, base=10000, device=None):
        """Create the inverse-frequency buffer and the initial cos/sin cache.

        Args:
            dim (int): Size of the rotary embedding dimension.
            max_position_embeddings (int): Number of positions to precompute. Defaults to 2048.
            base (int): Base used to compute the inverse frequencies. Defaults to 10000.
            device: Device on which the inverse frequencies are created (``None`` keeps the default).
        """
        super().__init__()

        self.dim = dim
        self.max_position_embeddings = max_position_embeddings
        self.base = base
        # Inverse frequencies 1 / base^(i / dim) for i = 0, 2, 4, ...; registered as a
        # non-persistent buffer so it follows the module across devices but is not saved.
        inv_freq = 1.0 / (self.base ** (torch.arange(0, self.dim, 2, dtype=torch.int64).float().to(device) / self.dim))
        self.register_buffer("inv_freq", inv_freq, persistent=False)

        # Build the cos/sin cache eagerly so that `torch.jit.trace` works.
        self._set_cos_sin_cache(
            seq_len=max_position_embeddings, device=self.inv_freq.device, dtype=torch.get_default_dtype()
        )

    def _set_cos_sin_cache(self, seq_len, device, dtype):
        """(Re)build the cached cosine and sine tables for ``seq_len`` positions.

        Args:
            seq_len (int): Number of positions to cache.
            device: Device on which the position indices are created.
            dtype: Dtype the cached tables are stored in.
        """
        self.max_seq_len_cached = seq_len
        # Integer positions 0 .. seq_len - 1, cast to the type of `inv_freq`.
        t = torch.arange(self.max_seq_len_cached, device=device, dtype=torch.int64).type_as(self.inv_freq)

        # Outer product: one row per position, one column per inverse frequency.
        freqs = torch.outer(t, self.inv_freq)

        # Different from the paper, but a different permutation is used here
        # in order to obtain the same calculation.
        emb = torch.cat((freqs, freqs), dim=-1)
        self.register_buffer("cos_cached", emb.cos().to(dtype), persistent=False)
        self.register_buffer("sin_cached", emb.sin().to(dtype), persistent=False)

    def forward(self, x, seq_len=None):
        """Return the cos/sin tables for the first ``seq_len`` positions.

        Args:
            x (torch.Tensor): Tensor laid out as [bs, num_attention_heads, seq_len, head_size];
                only its dtype, device and (when ``seq_len`` is omitted) ``shape[-2]`` are used.
            seq_len (int, optional): Number of positions to return. When ``None`` it is
                taken from ``x.shape[-2]``.

        Returns:
            tuple[torch.Tensor, torch.Tensor]: ``(cos, sin)``, each with ``seq_len`` rows,
            cast to ``x.dtype``.
        """
        # x: [bs, num_attention_heads, seq_len, head_size]
        if seq_len is None:
            # The declared default used to crash on the `>` comparison below;
            # fall back to the sequence axis of `x` instead.
            seq_len = x.shape[-2]

        # Grow the cache when a longer sequence than anything seen so far is requested.
        if seq_len > self.max_seq_len_cached:
            self._set_cos_sin_cache(seq_len=seq_len, device=x.device, dtype=x.dtype)

        return (
            self.cos_cached[:seq_len].to(dtype=x.dtype),
            self.sin_cached[:seq_len].to(dtype=x.dtype),
        )

def rotate_half(x):
    """Rotate half of the hidden dimension of the input.

    The last dimension is cut at ``x.shape[-1] // 2``; the result is the negated
    second part followed by the first part, concatenated along the last dimension.
    """
    midpoint = x.shape[-1] // 2
    front = x[..., :midpoint]
    back = x[..., midpoint:]
    return torch.cat((-back, front), dim=-1)


# Copied from transformers.models.mistral.modeling_mistral.apply_rotary_pos_emb
def apply_rotary_pos_emb(q, k, cos, sin, position_ids=None, unsqueeze_dim=1):
    """Apply rotary position embeddings to the query and key tensors.

    Args:
        q (`torch.Tensor`): The query tensor.
        k (`torch.Tensor`): The key tensor.
        cos (`torch.Tensor`): The cosine part of the rotary embedding.
        sin (`torch.Tensor`): The sine part of the rotary embedding.
        position_ids (`torch.Tensor`, *optional*): Position indices of the tokens in q and k,
            used to index `cos` and `sin`. For example, offset position ids can be passed when
            working with a KV cache. When left as `None`, indexing with `None` simply prepends
            a length-1 axis to `cos` and `sin`.
        unsqueeze_dim (`int`, *optional*, defaults to 1): The dimension along which
            `cos[position_ids]` and `sin[position_ids]` are unsqueezed so that they broadcast
            against q and k. For example, if `cos[position_ids]` and `sin[position_ids]` have
            shape [batch_size, seq_len, head_dim] and q and k have shape
            [batch_size, heads, seq_len, head_dim], then `unsqueeze_dim=1` makes them
            broadcastable. If q and k have shape [batch_size, seq_len, heads, head_dim],
            use `unsqueeze_dim=2`.

    Returns:
        `tuple(torch.Tensor)`: q and k after the rotary position embedding has been applied.
    """
    # Pick the table rows for the requested positions and add the broadcast axis.
    cos_b = cos[position_ids].unsqueeze(unsqueeze_dim)
    sin_b = sin[position_ids].unsqueeze(unsqueeze_dim)

    def _rotate(t):
        # Same formula for q and k: t * cos + rotate_half(t) * sin.
        return (t * cos_b) + (rotate_half(t) * sin_b)

    q_embed = _rotate(q)
    k_embed = _rotate(k)
    return q_embed, k_embed

In [None]:
# Demo: apply rotary position embeddings to random q/k tensors laid out as
# (bs, n_head, seq_len, n_dim).
# q and k share one cos/sin table inside apply_rotary_pos_emb, so they are
# given the same sequence length here.
q, k = torch.randn(4, 4, 12, 16), torch.randn(4, 4, 12, 16)  # (bs, n_head, seq_len, n_dim)
rotary_emb = TinyllmRotaryEmbedding(dim=16)

# Request one cos/sin row per position of q (axis -2 is seq_len in this layout).
cos, sin = rotary_emb(q, seq_len=q.shape[-2])

# With heads on axis 1, the broadcast axis must be inserted at dim 1.
# The previous unsqueeze_dim=2 / seq_len=4 combination only ran because
# n_head happened to equal 4, and it rotated per head instead of per position.
q, k = apply_rotary_pos_emb(q, k, cos, sin, unsqueeze_dim=1)
q.shape, k.shape

In [None]:
# Shape check: indexing with None prepends a length-1 axis and unsqueeze(1)
# inserts another at position 1, so the (12, 16) tensor displays as (1, 1, 12, 16).
torch.randn(12, 16)[None].unsqueeze(1).shape

In [1]:
import torch
import torch.nn.functional as F
import torch.nn as nn

# Pick the compute device: CUDA when it is available, otherwise the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

In [None]:
from pretrain import load_model_tokenizer


# Build the tokenizer and model through the project's `pretrain.load_model_tokenizer`
# helper, using the `device` string selected in the setup cell.
# NOTE(review): the exact meaning of `seq_len=1024` is defined inside the helper — confirm there.
tokenizer, model = load_model_tokenizer(tokenizer_name='Qwen/Qwen3-0.6B', seq_len=1024, device=device)

In [None]:
from pretrain import sample
# Call the project's `sample` helper with a Chinese prompt
# ("Which city is the capital of China?").
# NOTE(review): presumably this generates a completion from `model` — confirm in `pretrain.sample`.
sample(tokenizer, model, '中国首都是哪?')

In [None]:
# NOTE(review): neither `load_dataset` nor `args` is imported or defined in any cell
# visible in this notebook, so on a fresh kernel this line raises NameError unless
# they are provided elsewhere — add the missing import/definition before relying on it.
ds = load_dataset(tokenizer, num_proc=args.ds_num_proc, seq_len=args.block_size)

In [None]:
import torch

# Element-wise "not equal to 2" comparison; the result is a boolean tensor.
torch.tensor([2, 4]).ne(2)

In [None]:
# The separator does not occur in the string, so split() returns the whole
# string as a single-element list.
"12121333".split('<|start|>assistant', maxsplit=1)

In [None]:
from transformers import AutoTokenizer
from trl import DataCollatorForCompletionOnlyLM

# Load the Qwen3 tokenizer and use its EOS token as the padding token.
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen3-0.6B", trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token

# Example multi-turn conversation that ends with an assistant reply.
messages = [
    {"role": "system", "content": "You are a helpful assistant?"},
    {"role": "user", "content": "Hi, who are you?"},
    {"role": "assistant", "content": "I'm an AI assistant."},
    {"role": "user", "content": "What's your job?"},
    {"role": "assistant", "content": "Helping humans solve problems."}
]

# Render the conversation to text with the chat template.
# A generation prompt is meant for inference; this text feeds a loss-masking
# collator and already ends with an assistant turn, so none is appended
# (otherwise an extra assistant header would follow the final reply).
prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=False, enable_thinking=False)

print(prompt)

# Tokenize the rendered text.
tokenized = tokenizer(prompt, return_tensors="pt", padding=True)

# Create the batch format expected by the collator: a list of per-example dicts.
batch = [{
    "input_ids": tokenized["input_ids"][0],
    "attention_mask": tokenized["attention_mask"][0]
}]

# Use the collator to mask non-assistant tokens.
# NOTE(review): newer TRL releases may deprecate or remove this collator — confirm
# against the installed version.
collator = DataCollatorForCompletionOnlyLM(
    tokenizer=tokenizer,
    instruction_template="<|im_start|>user",  # marks the start of a user turn
    response_template="<|im_start|>assistant",  # marks the start of an assistant turn
    mlm=False,
)

collated = collator(batch)

# Show every token of the collated example.
print("\n🔹 Tokens:")
print([tokenizer.decode([tok_id.item()]) for tok_id in collated['input_ids'][0]])

# Show each token next to its label; a label of -100 is printed as "MASKED".
print("\n🔹 Labels (for loss):")
for token_id, label_id in zip(collated["input_ids"][0], collated["labels"][0]):
    token = tokenizer.decode([token_id.item()])
    label = tokenizer.decode([label_id.item()]) if label_id != -100 else "MASKED"
    print(f"{token!r:20} -> {label!r}")
    

In [None]:
from transformers import AutoTokenizer

# Load the Qwen1.5 tokenizer and use its EOS token as the padding token.
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen1.5-0.5B", trust_remote_code=True)
tokenizer.pad_token = tokenizer.eos_token

# Example chat
messages = [
    {"role": "user", "content": "What is AI?"},
    {"role": "assistant", "content": "AI stands for Artificial Intelligence."}
]

# Apply the chat template and tokenize in one step.
# return_dict=True is passed explicitly because the code below uses dict-style
# access (out["input_ids"]); in transformers releases where it defaults to False
# the call returns only the token ids and string indexing fails.
out = tokenizer.apply_chat_template(
    messages,
    tokenize=True,            # return token ids instead of a string
    return_dict=True,         # return a BatchEncoding (input_ids, attention_mask)
    return_tensors="pt"       # return PyTorch tensors
)

print(out)                         # BatchEncoding with 'input_ids' and 'attention_mask'
print(out["input_ids"][0].shape)   # 1-D shape of the first sequence: torch.Size([num_tokens])
print(tokenizer.decode(out["input_ids"][0]))  # Decode to see the rendered chat