# Pretrain 预训练详解

## 数据预处理

- Step 1.导入库

In [1]:
import itertools
import re
import json
import jsonlines
import psutil
import ujson
import numpy as np
import pandas as pd
from transformers import AutoTokenizer
from datasets import load_dataset
import os
from tqdm import tqdm

- Step 2.定义BOS和EOS标记，并加载分词器

In [2]:
# Special tokens that mark the beginning (BOS) and end (EOS) of a sequence.
bos_token, eos_token = "<s>", "</s>"

In [3]:
# Load the trained tokenizer from its directory (use_fast=False is passed explicitly)
# and report its vocabulary size.
tokenizer = AutoTokenizer.from_pretrained('../model/mateconv_tokenizer', use_fast=False)
print(f'加载的tokenizer词表大小: {len(tokenizer)}')

加载的tokenizer词表大小: 6400


- Step 3.读取部分数据

In [4]:
def preview_dataset(file_path, num_lines=5):
    """
    Print the first ``num_lines`` records of a JSONL file.

    At least one record is always printed when the file is non-empty.

    Raises:
        FileNotFoundError: if ``file_path`` does not exist.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"{file_path} 文件不存在，请检查路径！")

    with jsonlines.open(file_path) as reader:
        for line_no, record in enumerate(reader, start=1):
            print(f"第 {line_no} 行数据: {record}")
            # Stop once the requested number of records has been shown.
            if line_no >= num_lines:
                break

# Path of the raw pretraining corpus (JSONL) and how many records to preview.
file_path = '../dataset/mobvoi_seq_monkey_general_open_corpus/mobvoi_seq_monkey_general_open_corpus.jsonl'
preview_dataset(file_path, num_lines=1)

第 1 行数据: {'text': '在查处虚开增值税专用发票案件中，常常涉及进项留抵税额和税款损失的认定和处理。在计算税款损失时，要不要将进项留抵税额包括在内？\n对此，实务中存在意见分歧。\n有人主张归并，即计算税款损失时包括进项留抵税额；\n有人主张剥离，即计算税款损失时剔除进项留抵税额。分析这个问题，需要确定进项留抵税额与税款损失之间是什么关系。\n理清这二者之间的关系，首先需要了解增值税的概念和其抵扣机制。增值税是以商品（货物、服务等）在流转过程中产生的增值额作为计税依据而征收的一种流转税。为避免重复征税，在增值税中存在抵扣链条机制。\n一般而言，交易上游企业缴纳的税额，交易下游企业可以对相应的税额进行抵扣。\n对增值税一般纳税人来说，其购进货物、服务等取得增值税专用发票，发票上的税额是进项税额。\n其出售货物、服务等，向购买方开具增值税专用发票，发票的税额是销项税额。\n一般情况下，销项税额减去进项税额的金额是应纳税额，企业根据应纳税额按期申报纳税。\n其次需要了解进项留抵税额的概念及产生原因。\n在计算销项税额和进项税额的差额时，有时会出现负数，即当期进项税额大于当期销项税额。这个差额在当期未实现抵扣，为进项留抵税额，在以后纳税人有销项税额时再进行抵扣。\n企业产生进项留抵税额的主要原因是其进项税额和销项税额时间上的不一致。\n例如，企业前期集中采购货物和服务，投资大，销项税率低于进项税率等。\n从税款抵扣的角度看，进项留抵税额只是购进的这部分进项税额参与到增值税应纳税额的计算过程中，但是其对应的进项税额抵扣还未真正实现，一般要等到其未来有相应的销项税额时，才能真正实现进项税额抵扣。\n可见，进项留抵税额处于不确定状态，能否抵扣受到很多因素影响，例如企业经营中断，没有销项税额，这时进项留抵税额就无法实现抵扣。但如果企业按照税收政策规定申请进项留抵退税，进项税额抵扣就随之实现。\n最后需要了解税款损失的概念。\n税款损失，通常是指因虚开增值税专用发票，导致国家税款被骗或者流失的金额。关于税款损失，实务中有多种表述。\n例如，北京大学法学院教授陈兴良曾谈到虚开行为本身不会造成国家税款损失，只有利用发票抵扣时才会造成国家税款损失。刘兵等编著的《虚开增值税专用发票案例司法观点和案例解析》一书中提到：“给国家税款造成损失的数额，实际上就是被骗取的国家税款在侦查终

- Step 4.统计与清理数据

In [5]:
def get_total_lines(file_path):
    """
    Count the lines in ``file_path``.

    The file is read in binary mode, so undecodable bytes cannot abort the
    count; every line, valid or not, is included. A final line without a
    trailing newline still counts.
    """
    line_count = 0
    with open(file_path, 'rb') as handle:
        for _ in handle:
            line_count += 1
    return line_count

In [None]:
file_path = '../dataset/mobvoi_seq_monkey_general_open_corpus/mobvoi_seq_monkey_general_open_corpus.jsonl'
# Do not name this variable `sum`: that would shadow the builtin sum() for
# every later cell in this kernel, and any function that calls sum() would
# then fail with "TypeError: 'int' object is not callable".
total_lines = get_total_lines(file_path)

In [6]:
def check_jsonl_format(file_path):
    """
    Check that every line of a JSONL file is UTF-8 decodable and valid JSON.

    Shows a tqdm progress bar, prints the 1-based number and error of each
    bad line, and finally prints a summary.

    Returns:
        Tuple ``(valid_lines, invalid_lines)``.
    """
    total_lines = get_total_lines(file_path)  # only used for the progress bar
    n_ok = 0
    n_bad = 0

    # Binary mode so a decoding failure is reported per line instead of
    # aborting the iteration.
    with open(file_path, 'rb') as f:
        progress = tqdm(enumerate(f, start=1), total=total_lines, desc="Checking JSONL format")
        for line_no, raw in progress:
            try:
                jsonlines.Reader([raw.decode('utf-8')]).read()
            except UnicodeDecodeError as e:
                print(f"Encoding error at line {line_no}: {e}")
                n_bad += 1
            except jsonlines.InvalidLineError as e:
                print(f"Invalid JSON at line {line_no}: {e}")
                n_bad += 1
            else:
                n_ok += 1

    print(f"检查完成，文件中共有 {n_ok} 行有效的 JSON 数据，{n_bad} 行无效的 JSON 数据。")
    return n_ok, n_bad

In [7]:
# Validate the whole corpus; the function prints each bad line and a summary.
valid_lines, invalid_lines = check_jsonl_format(file_path)

Checking JSONL format: 100%|██████████| 13000000/13000000 [02:56<00:00, 73739.94it/s]

检查完成，文件中共有 13000000 行有效的 JSON 数据，0 行无效的 JSON 数据。





In [14]:
def remove_invalid_line(file_path, output_path, invalid_line_num):
    """
    Copy ``file_path`` to ``output_path`` byte-for-byte, dropping the given line(s).

    Args:
        file_path: source file.
        output_path: destination file (overwritten).
        invalid_line_num: a 1-based line number to drop, or an iterable of
            1-based line numbers. The iterable form lets you drop every line
            reported by ``check_jsonl_format`` in a single pass.
    """
    # Accept a single int (original interface) or any collection of ints.
    if isinstance(invalid_line_num, int):
        skip = {invalid_line_num}
    else:
        skip = set(invalid_line_num)

    # Binary mode: lines are copied verbatim, even if they are not valid UTF-8.
    with open(file_path, 'rb') as infile, open(output_path, 'wb') as outfile:
        for line_no, line in enumerate(infile, start=1):
            if line_no not in skip:
                outfile.write(line)

# Example: drop line 9598787 and save the result as a new file
# remove_invalid_line('./dataset/mobvoi_seq_monkey_general_open_corpus.jsonl',
#                     './dataset/mobvoi_seq_monkey_general_open_corpus_cleaned.jsonl', 
#                     invalid_line_num=9598787)

- Step 5.定义处理函数（逐块处理数据）

In [19]:
def pretrain_process(chunk_size=50000,
                     input_path='./dataset/mobvoi_seq_monkey_general_open_corpus/mobvoi_seq_monkey_general_open_corpus.jsonl',
                     output_path='./dataset/pretrain_data_demo.csv',
                     max_chars=512):
    """
    Convert the pretraining JSONL corpus into a one-column CSV (header ``text``).

    Each input line is parsed as a JSON object and its ``text`` field is
    written as one CSV row. Records whose ``text`` is longer than
    ``max_chars`` characters are dropped. Lines that are not valid UTF-8, not
    valid JSON, not a JSON object, or whose ``text`` is not a string are
    reported and skipped instead of aborting the run.

    Note: the defaults are relative to the project root (as in data_process.py).
    Other cells of this notebook use '../dataset/...' paths, so pass
    ``input_path`` / ``output_path`` accordingly when running from here.

    Args:
        chunk_size: number of input lines processed per progress message.
        input_path: source JSONL file.
        output_path: destination CSV file (overwritten).
        max_chars: maximum ``text`` length, in characters, to keep.
    """
    # csv is not imported at the top of this notebook, so import locally.
    import csv
    import itertools
    import json

    line_no = 0  # 1-based number of the last input line consumed

    # Read raw bytes and decode/parse each line inside the try. With a
    # jsonlines reader, errors surface while iterating the reader (outside
    # any per-record try), so a single bad line aborted the whole run.
    with open(input_path, 'rb') as infile, \
            open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['text'])

        while True:
            chunk = list(itertools.islice(infile, chunk_size))
            if not chunk:
                break

            first_line = line_no
            for raw in chunk:
                line_no += 1
                try:
                    obj = json.loads(raw.decode('utf-8'))
                except (UnicodeDecodeError, ValueError) as e:
                    print(f"Skipping invalid line {line_no}: {e}")
                    continue
                if not isinstance(obj, dict):
                    print(f"Skipping invalid line {line_no}: not a JSON object")
                    continue

                content = obj.get('text', '')
                if not isinstance(content, str):
                    print(f"Skipping invalid line {line_no}: 'text' is not a string")
                    continue
                if len(content) > max_chars:
                    continue
                writer.writerow([content])

            # Report the real line range (the last chunk may be shorter).
            print('chunk:', (first_line, line_no), 'process end')

## Pretrain预训练

- 模型预训练
- 15个epoch预训练过程

In [None]:
# !deepspeed --master_port 29500 --num_gpus=2 pretrain.py --epochs 15

```python
import os
import platform
import argparse
import time
import math
import warnings

import pandas as pd
import torch
import torch.distributed as dist
from torch import optim
from torch.nn.parallel import DistributedDataParallel
from torch.optim.lr_scheduler import CosineAnnealingLR
from torch.utils.data import DataLoader, DistributedSampler
from contextlib import nullcontext

from transformers import AutoTokenizer

from model.model import Transformer
from model.LMConfig import LMConfig
from model.dataset import PretrainDataset

warnings.filterwarnings('ignore')


def Logger(content):
    """Print ``content`` only when not running DDP, or on global rank 0."""
    is_main_process = (not ddp) or dist.get_rank() == 0
    if is_main_process:
        print(content)


def get_lr(it, all):
    """
    Learning rate for global iteration ``it`` out of ``all`` total iterations.

    Schedule: linear warmup from 0 to ``args.learning_rate`` over
    ``args.warmup_iters`` iterations, then cosine decay down to one tenth of
    the base rate at iteration ``all``. Past ``all`` the floor rate is returned.
    """
    peak = args.learning_rate
    floor = peak / 10
    warmup = args.warmup_iters
    total = all

    if it < warmup:
        return peak * it / warmup
    if it > total:
        return floor

    progress = (it - warmup) / (total - warmup)  # 0 -> 1 across the decay phase
    assert 0 <= progress <= 1
    cosine = 0.5 * (1.0 + math.cos(math.pi * progress))
    return floor + cosine * (peak - floor)


def train_epoch(epoch, wandb):
    """
    Run one pass over ``train_loader`` with autocast and gradient accumulation.

    Relies on module-level globals created under ``__main__``: ``args``,
    ``model``, ``optimizer``, ``scaler``, ``ctx``, ``train_loader``,
    ``iter_per_epoch``, ``lm_config`` and ``ddp``.

    Args:
        epoch: zero-based epoch index; combined with the step index to form
            the global iteration used by ``get_lr``.
        wandb: a wandb module/object providing ``log``, or ``None`` to disable
            metric logging.
    """
    start_time = time.time()
    for step, (X, Y, loss_mask) in enumerate(train_loader):
        X = X.to(args.device)
        Y = Y.to(args.device)
        loss_mask = loss_mask.to(args.device)

        # The LR is set manually every step from the warmup + cosine schedule.
        lr = get_lr(epoch * iter_per_epoch + step, args.epochs * iter_per_epoch)
        for param_group in optimizer.param_groups:
            param_group['lr'] = lr

        with ctx:
            out = model(X, Y)
            # Pre-divide so gradients summed over accumulation_steps micro-batches average out.
            loss = out.last_loss / args.accumulation_steps
            # Masked mean over flattened token positions.
            # NOTE(review): assumes out.last_loss holds per-token (unreduced)
            # losses whose flattened size matches loss_mask.view(-1) — confirm in model.model.
            loss_mask = loss_mask.view(-1)
            loss = torch.sum(loss * loss_mask) / loss_mask.sum()

        scaler.scale(loss).backward()

        # One optimizer update every accumulation_steps micro-batches.
        # NOTE(review): if iter_per_epoch is not a multiple of accumulation_steps,
        # the gradients of the trailing micro-batches are neither applied nor
        # zeroed here and carry over into the next epoch's first update.
        if (step + 1) % args.accumulation_steps == 0:
            # Unscale first so clipping sees the true gradient norm.
            scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), args.grad_clip)

            scaler.step(optimizer)
            scaler.update()

            optimizer.zero_grad(set_to_none=True)

        if step % args.log_interval == 0:
            spend_time = time.time() - start_time
            # epoch_Time = projected minutes for the whole epoch minus minutes elapsed so far;
            # the logged loss multiplies the pre-division by accumulation_steps back out.
            Logger(
                'Epoch:[{}/{}]({}/{}) loss:{:.3f} lr:{:.7f} epoch_Time:{}min:'.format(
                    epoch,
                    args.epochs,
                    step,
                    iter_per_epoch,
                    loss.item() * args.accumulation_steps,
                    optimizer.param_groups[-1]['lr'],
                    spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60))

            if (wandb is not None) and (not ddp or dist.get_rank() == 0):
                wandb.log({"loss": loss.item() * args.accumulation_steps,
                           "lr": optimizer.param_groups[-1]['lr'],
                           "epoch_Time": spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60})

        # Periodic checkpoint from the main process only; the same file is overwritten each time.
        if (step + 1) % args.save_interval == 0 and (not ddp or dist.get_rank() == 0):
            model.eval()
            moe_path = '_moe' if lm_config.use_moe else ''
            ckp = f'{args.save_dir}/pretrain_{lm_config.dim}{moe_path}.pth'

            # Save the unwrapped module's weights so keys carry no DDP "module." prefix.
            if isinstance(model, torch.nn.parallel.DistributedDataParallel):
                state_dict = model.module.state_dict()
            else:
                state_dict = model.state_dict()

            torch.save(state_dict, ckp)
            model.train()


def init_model():
    """
    Create the tokenizer and the Transformer (on ``args.device``) and log the
    number of trainable parameters, in millions.

    Returns:
        (model, tokenizer)
    """
    # NOTE(review): data_process.py and the evaluation code load
    # 'mateconv_tokenizer', while this loads 'minimind_tokenizer'. Confirm
    # both directories hold the same vocabulary, or the checkpoint will be
    # evaluated with a different token mapping than it was trained with.
    tokenizer = AutoTokenizer.from_pretrained('./model/minimind_tokenizer')
    model = Transformer(lm_config).to(args.device)

    n_trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
    Logger(f'LLM总参数量：{n_trainable / 1e6:.3f} 百万')
    return model, tokenizer


def init_distributed_mode():
    """
    Initialise the NCCL process group and bind this process to its local GPU.

    No-op unless ``ddp`` is true. Reads RANK, LOCAL_RANK and WORLD_SIZE from
    the environment (a missing variable raises KeyError) and sets the
    module-level ``ddp_local_rank`` and ``DEVICE``.
    """
    global ddp_local_rank, DEVICE
    if not ddp:
        return

    dist.init_process_group(backend="nccl")
    env = os.environ
    ddp_rank = int(env["RANK"])
    ddp_local_rank = int(env["LOCAL_RANK"])
    ddp_world_size = int(env["WORLD_SIZE"])
    DEVICE = f"cuda:{ddp_local_rank}"
    torch.cuda.set_device(DEVICE)


# torchrun --nproc_per_node 2 1-pretrain.py
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="MiniMind Pretraining")
    parser.add_argument("--out_dir", type=str, default="out", help="Output directory")
    parser.add_argument("--epochs", type=int, default=20, help="Number of epochs")
    parser.add_argument("--batch_size", type=int, default=64, help="Batch size")
    parser.add_argument("--learning_rate", type=float, default=2e-4, help="Learning rate")
    parser.add_argument("--device", type=str, default="cuda:0" if torch.cuda.is_available() else "cpu",
                        help="Device to use")
    parser.add_argument("--dtype", type=str, default="bfloat16", choices=["float32", "bfloat16", "float16"],
                        help="Data type")
    parser.add_argument("--use_wandb", action="store_true", help="Use Weights & Biases")
    parser.add_argument("--wandb_project", type=str, default="MiniMind-Pretrain", help="Weights & Biases project name")
    parser.add_argument("--num_workers", type=int, default=1, help="Number of workers for data loading")
    parser.add_argument("--data_path", type=str, default="./dataset/pretrain_data.csv", help="Path to training data")
    parser.add_argument("--ddp", action="store_true", help="Use DistributedDataParallel")
    parser.add_argument("--accumulation_steps", type=int, default=8, help="Gradient accumulation steps")
    parser.add_argument("--grad_clip", type=float, default=1.0, help="Gradient clipping threshold")
    parser.add_argument("--warmup_iters", type=int, default=0, help="Number of warmup iterations")
    parser.add_argument("--log_interval", type=int, default=100, help="Logging interval")
    parser.add_argument("--save_interval", type=int, default=1000, help="Model saving interval")
    parser.add_argument('--local_rank', type=int, default=-1, help='local rank for distributed training')

    args = parser.parse_args()

    lm_config = LMConfig()
    max_seq_len = lm_config.max_seq_len
    args.save_dir = os.path.join(args.out_dir)  # same directory as out_dir
    os.makedirs(args.save_dir, exist_ok=True)
    tokens_per_iter = args.batch_size * max_seq_len
    torch.manual_seed(1337)
    device_type = "cuda" if "cuda" in args.device else "cpu"

    args.wandb_run_name = f"MiniMind-Pretrain-Epoch-{args.epochs}-BatchSize-{args.batch_size}-LearningRate-{args.learning_rate}"

    # Mixed precision follows --dtype. A bare torch.cuda.amp.autocast() always
    # autocasts to float16, so "--dtype bfloat16" (the default) was ignored.
    ptdtype = {"float32": torch.float32, "bfloat16": torch.bfloat16, "float16": torch.float16}[args.dtype]
    use_amp = device_type == "cuda" and args.dtype != "float32"
    ctx = torch.cuda.amp.autocast(dtype=ptdtype) if use_amp else nullcontext()

    ddp = int(os.environ.get("RANK", -1)) != -1  # is this a ddp run?
    ddp_local_rank, DEVICE = 0, "cuda:0"
    if ddp:
        init_distributed_mode()
        args.device = torch.device(DEVICE)

    if args.use_wandb and (not ddp or ddp_local_rank == 0):
        import wandb

        wandb.init(project=args.wandb_project, name=args.wandb_run_name)
    else:
        wandb = None

    model, tokenizer = init_model()
    df = pd.read_csv(args.data_path)
    # Seeded shuffle: under DDP every rank builds its own DataFrame, and
    # DistributedSampler assumes all ranks index the same sample order.
    # An unseeded shuffle gives each rank a different permutation, so the
    # shards overlap and some samples are never trained on.
    df = df.sample(frac=1.0, random_state=1337)
    train_ds = PretrainDataset(df, tokenizer, max_length=max_seq_len)
    train_sampler = DistributedSampler(train_ds) if ddp else None
    train_loader = DataLoader(
        train_ds,
        batch_size=args.batch_size,
        pin_memory=True,
        drop_last=False,
        shuffle=False,
        num_workers=args.num_workers,
        sampler=train_sampler
    )

    # Loss scaling is only needed for float16 autocast; bfloat16 has the same
    # exponent range as float32 and does not need it.
    scaler = torch.cuda.amp.GradScaler(enabled=(use_amp and args.dtype == 'float16'))
    optimizer = optim.Adam(model.parameters(), lr=args.learning_rate)

    # torch.compile is deliberately disabled by the leading `False and`.
    if False and platform.system() != 'Windows' and float(torch.__version__.split('.')[0]) >= 2:
        Logger("compiling the model... (takes a ~minute)")
        unoptimized_model = model
        model = torch.compile(model)

    if ddp:
        model._ddp_params_and_buffers_to_ignore = {"pos_cis"}
        model = DistributedDataParallel(model, device_ids=[ddp_local_rank])

    iter_per_epoch = len(train_loader)
    for epoch in range(args.epochs):
        if train_sampler is not None:
            # Without set_epoch, DistributedSampler yields the same order every epoch.
            train_sampler.set_epoch(epoch)
        train_epoch(epoch, wandb)

```

开始训练

<center><img src="https://ml2022.oss-cn-hangzhou.aliyuncs.com/img/0d46acea891701fd573115782288e05.jpg" alt="0d46acea891701fd573115782288e05" style="zoom:50%;" /></center>

训练结束

<center><img src="https://ml2022.oss-cn-hangzhou.aliyuncs.com/img/9f1b2321dc79550b1e0f194e165ff22.png" alt="9f1b2321dc79550b1e0f194e165ff22" style="zoom:50%;" /></center>

## 测试

测试运行（单字符预测）

In [11]:
# 导入必要的模块
import torch

import sys
sys.path.append('..')

from model.model import Transformer  # 确保路径正确
from model.LMConfig import LMConfig   # 导入 LMConfig

In [12]:
# Create the model configuration object
lm_config = LMConfig(
    dim=512,                # model dimension
    n_layers=8,            # number of layers
    n_heads=16,            # number of attention heads
    vocab_size=6400,       # vocabulary size (the loaded tokenizer reports 6400 entries)
    max_seq_len=512,       # maximum sequence length
    dropout=0.1            # dropout probability
    # further LMConfig fields can be overridden here as needed
)

In [13]:
# Build the Transformer from lm_config (weights are freshly initialised until a checkpoint is loaded)
model = Transformer(lm_config)

In [14]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # prefer GPU when available

In [15]:
device                                # echo the selected device in the notebook

device(type='cuda')

In [16]:
model.to(device)

# Print the module tree to inspect the model structure and layer shapes
print(model)

Transformer(
  (tok_embeddings): Embedding(6400, 512)
  (dropout): Dropout(p=0.1, inplace=False)
  (layers): ModuleList(
    (0-7): 8 x TransformerBlock(
      (attention): Attention(
        (wq): Linear(in_features=512, out_features=512, bias=False)
        (wk): Linear(in_features=512, out_features=256, bias=False)
        (wv): Linear(in_features=512, out_features=256, bias=False)
        (wo): Linear(in_features=512, out_features=512, bias=False)
        (attn_dropout): Dropout(p=0.1, inplace=False)
        (resid_dropout): Dropout(p=0.1, inplace=False)
      )
      (attention_norm): RMSNorm()
      (ffn_norm): RMSNorm()
      (feed_forward): FeedForward(
        (w1): Linear(in_features=512, out_features=1408, bias=False)
        (w2): Linear(in_features=1408, out_features=512, bias=False)
        (w3): Linear(in_features=512, out_features=1408, bias=False)
        (dropout): Dropout(p=0.1, inplace=False)
      )
    )
  )
  (norm): RMSNorm()
  (output): Linear(in_features=512, 

In [18]:
# Load the pretrained weights (default strict loading: key mismatches raise)
model.load_state_dict(torch.load('../out/pretrain_512.pth', map_location=device))
model.eval()  # switch to inference mode (disables dropout)

Transformer(
  (tok_embeddings): Embedding(6400, 512)
  (dropout): Dropout(p=0.1, inplace=False)
  (layers): ModuleList(
    (0-7): 8 x TransformerBlock(
      (attention): Attention(
        (wq): Linear(in_features=512, out_features=512, bias=False)
        (wk): Linear(in_features=512, out_features=256, bias=False)
        (wv): Linear(in_features=512, out_features=256, bias=False)
        (wo): Linear(in_features=512, out_features=512, bias=False)
        (attn_dropout): Dropout(p=0.1, inplace=False)
        (resid_dropout): Dropout(p=0.1, inplace=False)
      )
      (attention_norm): RMSNorm()
      (ffn_norm): RMSNorm()
      (feed_forward): FeedForward(
        (w1): Linear(in_features=512, out_features=1408, bias=False)
        (w2): Linear(in_features=1408, out_features=512, bias=False)
        (w3): Linear(in_features=512, out_features=1408, bias=False)
        (dropout): Dropout(p=0.1, inplace=False)
      )
    )
  )
  (norm): RMSNorm()
  (output): Linear(in_features=512, 

In [19]:
# Prepare the prompt and encode it as a (1, seq_len) tensor of token ids on the target device
input_text = "你好，好久不"
input_ids = tokenizer.encode(input_text, return_tensors='pt').to(device)

In [20]:
# Greedy prediction of the single next token
with torch.no_grad():
    output = model(input_ids)
    # Use the prediction at the last position only, consistent with the
    # multi-token cells below. Taking argmax over every position would decode
    # one "next token" per input position whenever logits cover the full sequence.
    generated_ids = output.logits[:, -1, :].argmax(dim=-1)  # shape (batch,)
    generated_text = tokenizer.decode([generated_ids[0].item()])

print(generated_text)

见


测试运行（多字符预测）

In [21]:
# Prepare the prompt and encode it as a (1, seq_len) tensor of token ids on the target device
input_text = "长江、"
input_ids = tokenizer.encode(input_text, return_tensors='pt').to(device)

In [24]:
# Greedily generate up to num_tokens_to_generate tokens
num_tokens_to_generate = 2  # maximum number of tokens to generate
generated_tokens = []

with torch.no_grad():
    for _ in range(num_tokens_to_generate):
        output = model(input_ids)
        next_token = output.logits.argmax(dim=-1)[:, -1]  # prediction at the last position, shape (batch,)
        # Stop at end-of-sequence. skip_special_tokens=True below hides the EOS
        # token, so without this check the text silently runs on into an
        # unrelated "next document".
        if next_token.item() == tokenizer.eos_token_id:
            break
        generated_tokens.append(next_token.item())  # record the token id
        # unsqueeze(-1): (batch,) -> (batch, 1) so it appends along the sequence axis
        input_ids = torch.cat([input_ids, next_token.unsqueeze(-1)], dim=1)

# Convert the generated token ids back to text
generated_text = tokenizer.decode(generated_tokens, skip_special_tokens=True)

# Print the final reply
print(generated_text)

黄河


In [30]:
# Prepare the prompt
input_text = "中国"
input_ids = tokenizer.encode(input_text, return_tensors='pt').to(device)

# Greedily generate up to num_tokens_to_generate tokens
num_tokens_to_generate = 100  # maximum number of tokens to generate
generated_tokens = []

with torch.no_grad():
    for _ in range(num_tokens_to_generate):
        output = model(input_ids)
        next_token = output.logits.argmax(dim=-1)[:, -1]  # prediction at the last position, shape (batch,)
        # Stop at end-of-sequence. skip_special_tokens=True below hides the EOS
        # token, so without this check the text silently runs on into an
        # unrelated "next document".
        if next_token.item() == tokenizer.eos_token_id:
            break
        generated_tokens.append(next_token.item())  # record the token id
        # unsqueeze(-1): (batch,) -> (batch, 1) so it appends along the sequence axis
        input_ids = torch.cat([input_ids, next_token.unsqueeze(-1)], dim=1)

# Convert the generated token ids back to text
generated_text = tokenizer.decode(generated_tokens, skip_special_tokens=True)

# Print the final reply
print(generated_text)

共产党的领导下，中国共产党人坚定不移走中国特色社会主义道路，为实现中华民族伟大复兴的中国梦而不懈奋斗。2、在使用过程中，应注意以下几点：
1、在使用过程中，应注意避免阳光直射，以免灼伤皮肤。
2、在使用过程中，应注意避免阳光直射，以免�


---

## Eval Pretrain's Model

In [32]:
import random
import time

import numpy as np
import torch
import warnings
from transformers import AutoTokenizer, AutoModelForCausalLM
from model.model import Transformer
from model.LMConfig import LMConfig

warnings.filterwarnings('ignore')


def count_parameters(model):
    """Return the total number of elements in ``model``'s trainable (requires_grad) parameters."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total


def init_model(lm_config):
    """
    Load the tokenizer and the pretrained model, then move the model to the global ``device``.

    With ``model_from == 1`` (the hard-coded choice) a ``Transformer`` is built
    from ``lm_config`` and loaded from ``../out/pretrain_{dim}[_moe].pth``.
    Otherwise the model is loaded with ``AutoModelForCausalLM`` from ``minimind``.

    Returns:
        (model, tokenizer)
    """
    tokenizer = AutoTokenizer.from_pretrained('../model/mateconv_tokenizer')
    model_from = 1  # 1: local checkpoint weights, 2: transformers from_pretrained

    if model_from == 1:
        moe_path = '_moe' if lm_config.use_moe else ''
        ckp = f'../out/pretrain_{lm_config.dim}{moe_path}.pth'

        model = Transformer(lm_config)
        state_dict = torch.load(ckp, map_location=device)

        # Strip the '_orig_mod.' key prefix (typically present when the saved
        # model was wrapped by torch.compile).
        unwanted_prefix = '_orig_mod.'
        for k in list(state_dict):
            if k.startswith(unwanted_prefix):
                state_dict[k[len(unwanted_prefix):]] = state_dict.pop(k)

        # Drop every entry whose name contains 'mask'.
        for k in list(state_dict):
            if 'mask' in k:
                del state_dict[k]

        # strict=False tolerates key mismatches. Report them so a wrong or
        # stale checkpoint does not silently leave layers at random init.
        result = model.load_state_dict(state_dict, strict=False)
        if result.missing_keys:
            print(f'Warning: keys missing from {ckp}: {result.missing_keys}')
        if result.unexpected_keys:
            print(f'Warning: unexpected keys in {ckp}: {result.unexpected_keys}')
    else:
        model = AutoModelForCausalLM.from_pretrained('minimind', trust_remote_code=True)
    model = model.to(device)

    n_params = count_parameters(model)
    print(f'模型参数: {n_params / 1e6} 百万 = {n_params / 1e9} B (Billion)')
    return model, tokenizer


def setup_seed(seed):
    """Seed Python, NumPy and PyTorch (CPU, current and all CUDA devices) and make cuDNN deterministic."""
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for seeder in seeders:
        seeder(seed)
    torch.backends.cudnn.deterministic = True  # pick deterministic convolution algorithms
    torch.backends.cudnn.benchmark = False     # no autotuning, whose choice can vary run-to-run


if __name__ == "__main__":
    # -----------------------------------------------------------------------------
    out_dir = 'out'
    start = ""
    temperature = 0.7
    top_k = 8
    setup_seed(1337)
    # device = 'cpu'
    device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
    dtype = 'bfloat16'
    max_seq_len = 512
    lm_config = LMConfig()
    lm_config.max_seq_len = max_seq_len
    # -----------------------------------------------------------------------------

    model, tokenizer = init_model(lm_config)
    model = model.eval()
    # int(input('输入0自动测试，输入1问题测试：'))
    answer_way = 0  # 0: run the built-in prompt list, 1: read prompts interactively
    stream = True

    prompt_datas = [
        '椭圆和圆的区别',
        '中国关于马克思主义基本原理',
        '人类大脑的主要功能是',
        '万有引力是',
        '世界上人口最多的国家是',
        'DNA的全称是',
        '数学中π的值大约是',
        '世界上最高的山峰是',
        '太阳系中最大的行星是',
        '二氧化碳的化学分子式是',
        '地球上最大的动物是',
        '地球自转一圈大约需要',
        '杭州市的美食有',
        '江苏省的最好的大学',
    ]

    qa_index = 0
    while True:
        start = time.time()
        if answer_way == 1:
            # run generation
            prompt = input('用户：')
        else:
            if qa_index >= len(prompt_datas):
                break
            prompt = prompt_datas[qa_index]
            print('问题：', prompt)
            qa_index += 1

        prompt = tokenizer.bos_token + prompt
        x = tokenizer(prompt).data['input_ids']
        x = (torch.tensor(x, dtype=torch.long, device=device)[None, ...])

        with torch.no_grad():
            res_y = model.generate(x, tokenizer.eos_token_id, max_new_tokens=max_seq_len, temperature=temperature,
                                   top_k=top_k, stream=stream)
            print('回答：', end='')
            try:
                y = next(res_y)
            except StopIteration:
                print("No answer")
                continue

            # Each yielded y is decoded in full; only the part not yet printed
            # (from history_idx on) is written. Decodes that are empty or end
            # in U+FFFD (an incomplete multi-byte character) are held back
            # until more tokens arrive.
            # Only StopIteration ends the stream: a bare `except:` here also
            # swallowed KeyboardInterrupt, so Ctrl-C could not stop the script.
            history_idx = 0
            while y is not None:
                answer = tokenizer.decode(y[0].tolist())
                if answer and answer[-1] != '�':
                    print(answer[history_idx:], end='', flush=True)
                    history_idx = len(answer)
                    if not stream:
                        break
                try:
                    y = next(res_y)
                except StopIteration:
                    break

            print('\n')

        end = time.time()
        print(end - start, 's')


模型参数: 26.878464 百万 = 0.026878464 B (Billion)
问题： 椭圆和圆的区别
回答：是很大的区别。
椭圆的区别
椭圆的区别是椭圆的区别，椭圆的区别在于椭圆的区别。椭圆形状是椭圆形，椭圆形是椭圆形，椭圆形是椭圆形，椭圆形是椭圆形。椭圆形的区别在于椭圆形的区别。椭圆形椭圆形的区别在于椭圆形的区别。椭圆形的区别是椭圆形状的区别。椭圆形的区别是椭圆形的区别。椭圆形的区别是椭圆形的区别。椭圆形的区别是椭圆形的区别。椭圆形的区别就是椭圆形的区别。椭圆形的区别是椭圆形的区别。椭圆形的区别是椭圆形的区别。椭圆形的区别是椭圆形的区别。椭圆形的区别是椭圆形的区别。椭圆形的区别是椭圆形状的区别。椭圆形的区别是椭圆形状的区别。椭圆形的区别是椭圆形的区别。椭圆形的区别是椭圆形的区别。椭圆形的区别是椭圆形的区别。椭圆形的区别是椭圆形的区别。椭圆形的区别是椭圆形的区别。椭圆形的区别是椭圆形的区别。椭圆形的区别是椭圆形的区别。椭圆形的区别是椭圆形的区别。椭圆形的区别是椭圆形的区别。椭圆形的区别主要是椭圆形的区别，椭圆形的区别是椭圆形的区别。椭圆形的区别是椭圆形的区别。椭圆形的区别是椭圆形的区别

8.02924919128418 s
问题： 中国关于马克思主义基本原理
回答：论坛”在北京举行。

0.140608549118042 s
问题： 人类大脑的主要功能是
回答：通过脑细胞和脑神经的功能来实现，而大脑神经是大脑神经的一个系统，而神经是大脑神经的发育和生理过程。
1、神经递质
神经递质是人类大脑神经递质中重要成分，是人脑神经递质的主要成分。当神经递质过多时，会影响到神经递质，从而诱发神经递质的增加，从而导致大脑神经递质的增多，从而加重神经递质的增加。
2、神经递质
神经递质是神经递质，是神经递质中一种，也是神经递质中一部分的一种。神经递质在神经递质中起着重要作用，当神经递质过多时，会影响神经递质的增多，从而影响到神经递质的进展，导致神经递质增多，从而诱发神经递质的增加。
3、神经递质
神经递质是神经递质的一种，它是一种神经递质，是一种多态的物质，在神经递质的作用下，会释放出来一种叫做神经递质的物质。
4、神经递质
神经递质可以促进神经递质的增多，从而诱发神经递质增多。

5.374856948852539 s
问题： 万有引力是
回答：地球上最大的引力，它

- 备用代码

data_process.py

In [None]:
import csv
import itertools
import re
import json
import jsonlines
import psutil
import ujson
import numpy as np
import pandas as pd
from transformers import AutoTokenizer
from datasets import load_dataset

# Special tokens that mark the beginning (BOS) and end (EOS) of a sequence.
bos_token, eos_token = "<s>", "</s>"


def pretrain_process(chunk_size=50000,
                     input_path='./dataset/mobvoi_seq_monkey_general_open_corpus/mobvoi_seq_monkey_general_open_corpus.jsonl',
                     output_path='./dataset/pretrain_data_demo.csv',
                     max_chars=512):
    """
    Convert the pretraining JSONL corpus into a one-column CSV (header ``text``).

    Each input line is parsed as a JSON object and its ``text`` field is
    written as one CSV row. Records whose ``text`` is longer than
    ``max_chars`` characters are dropped. Lines that are not valid UTF-8, not
    valid JSON, not a JSON object, or whose ``text`` is not a string are
    reported and skipped instead of aborting the run.

    Args:
        chunk_size: number of input lines processed per progress message.
        input_path: source JSONL file.
        output_path: destination CSV file (overwritten).
        max_chars: maximum ``text`` length, in characters, to keep.
    """
    line_no = 0  # 1-based number of the last input line consumed

    # Read raw bytes and decode/parse each line inside the try. With a
    # jsonlines reader, errors surface while iterating the reader (outside
    # any per-record try), so a single bad line aborted the whole run.
    with open(input_path, 'rb') as infile, \
            open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['text'])

        while True:
            chunk = list(itertools.islice(infile, chunk_size))
            if not chunk:
                break

            first_line = line_no
            for raw in chunk:
                line_no += 1
                try:
                    obj = json.loads(raw.decode('utf-8'))
                except (UnicodeDecodeError, ValueError) as e:
                    print(f"Skipping invalid line {line_no}: {e}")
                    continue
                if not isinstance(obj, dict):
                    print(f"Skipping invalid line {line_no}: not a JSON object")
                    continue

                content = obj.get('text', '')
                if not isinstance(content, str):
                    print(f"Skipping invalid line {line_no}: 'text' is not a string")
                    continue
                if len(content) > max_chars:
                    continue
                writer.writerow([content])

            # Report the real line range (the last chunk may be shorter).
            print('chunk:', (first_line, line_no), 'process end')


def sft_process(contain_history=False):
    """
    Build the SFT CSV (columns ``history,q,a``) from ``./dataset/sft_data_zh.jsonl``.

    Records are buffered in chunks of 1000, filtered, and appended to
    ``./dataset/sft_data.csv`` (``contain_history=True``) or
    ``./dataset/sft_data_single.csv`` (otherwise).

    Filters:
      * q and a must be non-empty;
      * 10 <= len(q) <= 512 and 5 <= len(a) <= 512;
      * both must be more than 50% Chinese characters.

    Lines that are not valid UTF-8 JSON objects are reported and skipped.

    Args:
        contain_history: keep each record's ``history`` and require it to be
            non-empty; otherwise an empty list is written for every row.
    """
    file_name = 'sft_data.csv' if contain_history else 'sft_data_single.csv'

    def chinese_ratio(text):
        # Share of characters in the CJK Unified Ideographs block (U+4E00..U+9FFF).
        chinese_chars = re.findall(r'[\u4e00-\u9fff]', text)
        return len(chinese_chars) / len(text) if text else 0

    def process_and_write_data(data):
        # Filter one buffered chunk and append the surviving rows to the CSV.
        q_lst, a_lst, history_lst = [], [], []
        for per in data:
            history, q, a = per['history'], per['q'], per['a']

            if (contain_history and not history) or not q or not a:
                continue
            if len(q) < 10 or len(a) < 5:
                continue
            if len(q) > 512 or len(a) > 512:
                continue
            # Require more than 50% Chinese characters in both q and a
            if not (chinese_ratio(q) > 0.5 and chinese_ratio(a) > 0.5):
                continue

            q_lst.append(q)
            a_lst.append(a)
            history_lst.append(history if contain_history else [])

        df = pd.DataFrame({'history': history_lst, 'q': q_lst, 'a': a_lst})
        # escapechar avoids `_csv.Error: need to escape, but no escapechar set`
        df.to_csv(f'./dataset/{file_name}', mode='a', header=False, index=False, lineterminator='\r\n', escapechar='\\',
                  encoding='utf-8')

    chunk_size = 1000  # records buffered per write
    data = []

    # Truncate the output and write the header; chunks are appended below.
    with open(f'./dataset/{file_name}', 'w', encoding='utf-8') as f:
        f.write('history,q,a\n')

    sft_datasets = ['./dataset/sft_data_zh.jsonl']

    chunk_num = 0
    for path in sft_datasets:
        # Decode and parse each raw line inside the try. With jsonlines, the
        # InvalidLineError was raised by the reader iterator outside the try,
        # so one bad line aborted the whole run.
        with open(path, 'rb') as reader:
            for idx, raw in enumerate(reader):
                try:
                    obj = json.loads(raw.decode('utf-8'))
                except (UnicodeDecodeError, ValueError) as e:
                    print(f"Skipping invalid JSON line {idx + 1}: {e}")
                    continue
                if not isinstance(obj, dict):
                    print(f"Skipping invalid JSON line {idx + 1}: not a JSON object")
                    continue

                data.append({
                    'history': obj.get('history', ''),
                    # `or ''` also covers keys that are present but null
                    'q': (obj.get('input') or '') + (obj.get('q') or ''),
                    'a': (obj.get('output') or '') + (obj.get('a') or ''),
                })

                if len(data) >= chunk_size:
                    chunk_num += 1
                    process_and_write_data(data)
                    data = []
                    if chunk_num % 100 == 0:
                        print(f'chunk:{chunk_num} process end')

        # Flush the final partial chunk of this file.
        if data:
            process_and_write_data(data)
            data = []


def rl_process():
    """
    Merge the DPO / RLHF preference datasets into ./dataset/dpo/train_data.json.

    Every split returned by ``load_dataset('json', ...)`` is flattened, in split
    order, into a single JSON list (UTF-8, non-ASCII kept, 4-space indent).
    """
    dataset_paths = [
        './dataset/dpo/dpo_zh_demo.json',
        './dataset/dpo/dpo_train_data.json',
        './dataset/dpo/huozi_rlhf_data.json',
    ]
    train_dataset = load_dataset('json', data_files=dataset_paths)

    merged_data = [
        record
        for split in train_dataset.keys()
        for record in train_dataset[split]
    ]

    with open('./dataset/dpo/train_data.json', 'w', encoding='utf-8') as out_file:
        json.dump(merged_data, out_file, ensure_ascii=False, indent=4)


if __name__ == "__main__":
    tokenizer = AutoTokenizer.from_pretrained('./model/mateconv_tokenizer', use_fast=False)
    print('tokenizer词表大小：', len(tokenizer))

    # Which dataset to build:
    #   1: pretrain corpus -> CSV
    #   2: SFT (single-turn) -> CSV
    #   3: RL / DPO data merge -> JSON
    process_type = 1

    if process_type == 1:
        pretrain_process()
    elif process_type == 2:
        sft_process(contain_history=False)
    elif process_type == 3:
        rl_process()


pretrain.py

In [None]:
import os
import platform
import argparse
import time
import math
import warnings

import pandas as pd
import torch
import torch.distributed as dist
from torch import optim
from torch.nn.parallel import DistributedDataParallel
from torch.optim.lr_scheduler import CosineAnnealingLR
from torch.utils.data import DataLoader, DistributedSampler
from contextlib import nullcontext

from transformers import AutoTokenizer

from model.model import Transformer
from model.LMConfig import LMConfig
from model.dataset import PretrainDataset

warnings.filterwarnings('ignore')


def Logger(content):
    """Print ``content`` only when not running DDP, or on global rank 0."""
    is_main_process = (not ddp) or dist.get_rank() == 0
    if is_main_process:
        print(content)


def get_lr(it, all):
    """
    Learning rate for global iteration ``it`` out of ``all`` total iterations.

    Schedule: linear warmup from 0 to ``args.learning_rate`` over
    ``args.warmup_iters`` iterations, then cosine decay down to one tenth of
    the base rate at iteration ``all``. Past ``all`` the floor rate is returned.
    """
    peak = args.learning_rate
    floor = peak / 10
    warmup = args.warmup_iters
    total = all

    if it < warmup:
        return peak * it / warmup
    if it > total:
        return floor

    progress = (it - warmup) / (total - warmup)  # 0 -> 1 across the decay phase
    assert 0 <= progress <= 1
    cosine = 0.5 * (1.0 + math.cos(math.pi * progress))
    return floor + cosine * (peak - floor)


def train_epoch(epoch, wandb):
    """
    Run one pass over ``train_loader`` with autocast and gradient accumulation.

    Relies on module-level globals created under ``__main__``: ``args``,
    ``model``, ``optimizer``, ``scaler``, ``ctx``, ``train_loader``,
    ``iter_per_epoch``, ``lm_config`` and ``ddp``.

    Args:
        epoch: zero-based epoch index; combined with the step index to form
            the global iteration used by ``get_lr``.
        wandb: a wandb module/object providing ``log``, or ``None`` to disable
            metric logging.
    """
    start_time = time.time()
    for step, (X, Y, loss_mask) in enumerate(train_loader):
        X = X.to(args.device)
        Y = Y.to(args.device)
        loss_mask = loss_mask.to(args.device)

        # The LR is set manually every step from the warmup + cosine schedule.
        lr = get_lr(epoch * iter_per_epoch + step, args.epochs * iter_per_epoch)
        for param_group in optimizer.param_groups:
            param_group['lr'] = lr

        with ctx:
            out = model(X, Y)
            # Pre-divide so gradients summed over accumulation_steps micro-batches average out.
            loss = out.last_loss / args.accumulation_steps
            # Masked mean over flattened token positions.
            # NOTE(review): assumes out.last_loss holds per-token (unreduced)
            # losses whose flattened size matches loss_mask.view(-1) — confirm in model.model.
            loss_mask = loss_mask.view(-1)
            loss = torch.sum(loss * loss_mask) / loss_mask.sum()

        scaler.scale(loss).backward()

        # One optimizer update every accumulation_steps micro-batches.
        # NOTE(review): if iter_per_epoch is not a multiple of accumulation_steps,
        # the gradients of the trailing micro-batches are neither applied nor
        # zeroed here and carry over into the next epoch's first update.
        if (step + 1) % args.accumulation_steps == 0:
            # Unscale first so clipping sees the true gradient norm.
            scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), args.grad_clip)

            scaler.step(optimizer)
            scaler.update()

            optimizer.zero_grad(set_to_none=True)

        if step % args.log_interval == 0:
            spend_time = time.time() - start_time
            # epoch_Time = projected minutes for the whole epoch minus minutes elapsed so far;
            # the logged loss multiplies the pre-division by accumulation_steps back out.
            Logger(
                'Epoch:[{}/{}]({}/{}) loss:{:.3f} lr:{:.7f} epoch_Time:{}min:'.format(
                    epoch,
                    args.epochs,
                    step,
                    iter_per_epoch,
                    loss.item() * args.accumulation_steps,
                    optimizer.param_groups[-1]['lr'],
                    spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60))

            if (wandb is not None) and (not ddp or dist.get_rank() == 0):
                wandb.log({"loss": loss.item() * args.accumulation_steps,
                           "lr": optimizer.param_groups[-1]['lr'],
                           "epoch_Time": spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60})

        # Periodic checkpoint from the main process only; the same file is overwritten each time.
        if (step + 1) % args.save_interval == 0 and (not ddp or dist.get_rank() == 0):
            model.eval()
            moe_path = '_moe' if lm_config.use_moe else ''
            ckp = f'{args.save_dir}/pretrain_{lm_config.dim}{moe_path}.pth'

            # Save the unwrapped module's weights so keys carry no DDP "module." prefix.
            if isinstance(model, torch.nn.parallel.DistributedDataParallel):
                state_dict = model.module.state_dict()
            else:
                state_dict = model.state_dict()

            torch.save(state_dict, ckp)
            model.train()


def init_model(tokenizer_path='./model/minimind_tokenizer'):
    """Build the Transformer on ``args.device`` and load its tokenizer.

    Uses the module-level ``lm_config`` (model hyper-parameters) and
    ``args.device``, both created in the ``__main__`` section of this script.

    Args:
        tokenizer_path: Directory passed to ``AutoTokenizer.from_pretrained``.
            Defaults to the path that used to be hard-coded here.
            NOTE(review): the data-preprocessing cells earlier in this file load
            '../model/mateconv_tokenizer'; confirm both paths refer to the same
            vocabulary, otherwise token ids will not match the prepared data.

    Returns:
        A ``(model, tokenizer)`` tuple.
    """
    def count_parameters(module):
        # Counts trainable (requires_grad) parameters only; the log label below
        # says "total", which holds for a freshly built, unfrozen model.
        return sum(p.numel() for p in module.parameters() if p.requires_grad)

    tokenizer = AutoTokenizer.from_pretrained(tokenizer_path)
    model = Transformer(lm_config).to(args.device)

    Logger(f'LLM总参数量：{count_parameters(model) / 1e6:.3f} 百万')
    return model, tokenizer


def init_distributed_mode():
    """Initialise the NCCL process group and bind this process to its GPU.

    Does nothing unless the module-level ``ddp`` flag is true.

    Reads ``LOCAL_RANK`` from the environment. This script is presumably
    launched via ``torchrun`` (see the usage comment above ``__main__``),
    which exports it.

    Side effects: sets the module-level globals ``ddp_local_rank`` (int) and
    ``DEVICE`` (a string such as ``"cuda:1"``), then makes that device current.
    """
    if not ddp:
        return
    global ddp_local_rank, DEVICE

    dist.init_process_group(backend="nccl")
    # The global rank and world size were previously read into unused locals.
    # When needed they are available from dist.get_rank() and
    # dist.get_world_size(). Only the node-local rank is required to pick a GPU.
    ddp_local_rank = int(os.environ["LOCAL_RANK"])
    DEVICE = f"cuda:{ddp_local_rank}"
    torch.cuda.set_device(DEVICE)


# torchrun --nproc_per_node 2 1-pretrain.py   (multi-GPU, DistributedDataParallel)
# python 1-pretrain.py                        (single device)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="MiniMind Pretraining")
    parser.add_argument("--out_dir", type=str, default="out", help="Output directory")
    parser.add_argument("--epochs", type=int, default=20, help="Number of epochs")
    parser.add_argument("--batch_size", type=int, default=64, help="Batch size")
    parser.add_argument("--learning_rate", type=float, default=2e-4, help="Learning rate")
    parser.add_argument("--device", type=str, default="cuda:0" if torch.cuda.is_available() else "cpu",
                        help="Device to use")
    # The value now actually selects the autocast precision (see `ctx` below),
    # so only values that map to a torch dtype are accepted.
    parser.add_argument("--dtype", type=str, default="bfloat16",
                        choices=["float32", "bfloat16", "float16"], help="Data type")
    parser.add_argument("--use_wandb", action="store_true", help="Use Weights & Biases")
    parser.add_argument("--wandb_project", type=str, default="MiniMind-Pretrain", help="Weights & Biases project name")
    parser.add_argument("--num_workers", type=int, default=1, help="Number of workers for data loading")
    parser.add_argument("--data_path", type=str, default="./dataset/pretrain_data.csv", help="Path to training data")
    parser.add_argument("--ddp", action="store_true", help="Use DistributedDataParallel")
    parser.add_argument("--accumulation_steps", type=int, default=8, help="Gradient accumulation steps")
    parser.add_argument("--grad_clip", type=float, default=1.0, help="Gradient clipping threshold")
    parser.add_argument("--warmup_iters", type=int, default=0, help="Number of warmup iterations")
    parser.add_argument("--log_interval", type=int, default=100, help="Logging interval")
    parser.add_argument("--save_interval", type=int, default=1000, help="Model saving interval")
    # Kept for launchers that still pass --local_rank; the rank is read from the environment.
    parser.add_argument('--local_rank', type=int, default=-1, help='local rank for distributed training')

    args = parser.parse_args()

    lm_config = LMConfig()
    max_seq_len = lm_config.max_seq_len
    # train_epoch() writes checkpoints under args.save_dir.
    args.save_dir = args.out_dir
    os.makedirs(args.save_dir, exist_ok=True)
    tokens_per_iter = args.batch_size * max_seq_len
    base_seed = 1337
    torch.manual_seed(base_seed)
    device_type = "cuda" if "cuda" in args.device else "cpu"

    args.wandb_run_name = f"MiniMind-Pretrain-Epoch-{args.epochs}-BatchSize-{args.batch_size}-LearningRate-{args.learning_rate}"

    # Mixed-precision context driven by --dtype. Previously CUDA always autocast
    # to float16, even for --dtype float32, where the GradScaler below is
    # disabled. float32 now trains without autocast. Use --dtype float16 on
    # GPUs without bfloat16 support.
    if device_type == "cpu" or args.dtype == "float32":
        ctx = nullcontext()
    else:
        autocast_dtype = torch.bfloat16 if args.dtype == "bfloat16" else torch.float16
        ctx = torch.cuda.amp.autocast(dtype=autocast_dtype)

    ddp = int(os.environ.get("RANK", -1)) != -1  # is this a ddp run?
    ddp_local_rank, DEVICE = 0, "cuda:0"
    if ddp:
        init_distributed_mode()
        args.device = torch.device(DEVICE)

    # Use the *global* rank so that exactly one process creates a wandb run,
    # also on multi-node jobs. This matches the logging gate in train_epoch().
    is_master_process = (not ddp) or dist.get_rank() == 0
    if args.use_wandb and is_master_process:
        import wandb

        wandb.init(project=args.wandb_project, name=args.wandb_run_name)
    else:
        wandb = None

    model, tokenizer = init_model()
    df = pd.read_csv(args.data_path)
    # Seeded shuffle: under DDP every rank builds its own copy of this frame,
    # and DistributedSampler assumes the dataset order is identical on all
    # ranks. An unseeded shuffle would give overlapping or missing shards.
    df = df.sample(frac=1.0, random_state=base_seed)
    train_ds = PretrainDataset(df, tokenizer, max_length=max_seq_len)
    train_sampler = DistributedSampler(train_ds) if ddp else None
    train_loader = DataLoader(
        train_ds,
        batch_size=args.batch_size,
        pin_memory=True,
        drop_last=False,
        shuffle=False,
        num_workers=args.num_workers,
        sampler=train_sampler
    )

    scaler = torch.cuda.amp.GradScaler(enabled=(args.dtype in ['float16', 'bfloat16']))
    optimizer = optim.Adam(model.parameters(), lr=args.learning_rate)

    # torch.compile is deliberately switched off by the leading `False`.
    if False and platform.system() != 'Windows' and float(torch.__version__.split('.')[0]) >= 2:
        Logger("compiling the model... (takes a ~minute)")
        unoptimized_model = model
        model = torch.compile(model)

    if ddp:
        model._ddp_params_and_buffers_to_ignore = {"pos_cis"}
        model = DistributedDataParallel(model, device_ids=[ddp_local_rank])

    iter_per_epoch = len(train_loader)
    for epoch in range(args.epochs):
        if train_sampler is not None:
            # Without this, DistributedSampler replays the same permutation every epoch.
            train_sampler.set_epoch(epoch)
        train_epoch(epoch, wandb)
