In [None]:
import os

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, Trainer, TrainingArguments
# Prefer the GPU when CUDA is available; otherwise fall back to the CPU so that
# moving tensors to `device` does not fail on machines without CUDA.
device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
model_dir = './0.5B-trained'

# Resume from the locally saved checkpoint when it exists; otherwise start from
# the pretrained chat model on the Hugging Face Hub.
pretrained_source = model_dir if os.path.exists(model_dir) else "Qwen/Qwen1.5-0.5B-Chat"

# Load the model and the tokenizer.
# Both sources go through the same call so they get the same dtype and device
# placement; previously the local checkpoint was loaded with library defaults
# while the Hub model used torch_dtype="auto" / device_map="auto".
model = AutoModelForCausalLM.from_pretrained(pretrained_source, torch_dtype="auto", device_map="auto")
# The tokenizer always comes from the base model name, regardless of which
# source the weights were loaded from.
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen1.5-0.5B-Chat")

In [None]:
from typing import Dict
import torch
from torch.utils.data import Dataset
from transformers.trainer_pt_utils import LabelSmoother

IGNORE_TOKEN_ID = LabelSmoother.ignore_index  # Label id used to mark positions that are ignored when computing the loss

def preprocess(messages, tokenizer, max_len, ignore_index=None):
    """Tokenize chat conversations into one padded batch for fine-tuning.

    Each conversation is rendered and tokenized with
    ``tokenizer.apply_chat_template`` (truncated to ``max_len`` tokens), then
    all conversations are right-padded to the length of the longest one so
    they can be stacked into rectangular tensors.

    Args:
        messages: Sequence of conversations; each item is passed unchanged to
            ``tokenizer.apply_chat_template``.
        tokenizer: Object providing ``apply_chat_template`` and ``pad_token_id``.
        max_len: Maximum number of tokens kept per conversation.
        ignore_index: Label value written at padded positions. Defaults to the
            module-level ``IGNORE_TOKEN_ID`` when ``None``.

    Returns:
        dict with three tensors of shape ``(len(messages), longest_length)``:
        ``input_ids`` (long), ``target_ids`` (a copy of ``input_ids`` with
        padded positions replaced by ``ignore_index``) and ``attention_mask``
        (bool, ``True`` on real tokens).

    Raises:
        ValueError: If ``tokenizer.pad_token_id`` is ``None``.
    """
    print("preprocessing")

    if ignore_index is None:
        ignore_index = IGNORE_TOKEN_ID

    pad_id = tokenizer.pad_token_id
    if pad_id is None:
        raise ValueError("tokenizer.pad_token_id must be set to build a padded batch")

    # Apply the chat format to every conversation. Padding is NOT requested
    # here: the template is applied to one conversation at a time, so there is
    # no batch to pad against and the sequences would stay ragged anyway.
    encoded = [
        tokenizer.apply_chat_template(
            message,
            tokenize=True,
            add_generation_prompt=False,
            padding=False,
            max_length=max_len,
            truncation=True,
        )
        for message in messages
    ]

    # Pad explicitly to the longest conversation so torch can hold the batch
    # in a single rectangular tensor.
    width = max((len(ids) for ids in encoded), default=0)
    input_ids = torch.full((len(encoded), width), pad_id, dtype=torch.long)
    attention_mask = torch.zeros((len(encoded), width), dtype=torch.bool)
    for row, ids in enumerate(encoded):
        input_ids[row, :len(ids)] = torch.tensor(ids, dtype=torch.long)
        attention_mask[row, :len(ids)] = True

    # Derive the labels from the true sequence lengths rather than from
    # pad-id equality, so a real token that shares the pad id is still trained on.
    target_ids = input_ids.clone()
    target_ids[~attention_mask] = ignore_index

    return dict(
        input_ids=input_ids, target_ids=target_ids, attention_mask=attention_mask
    )

class SupervisedDataset(Dataset):
    """Dataset of chat conversations, tokenized once at construction time."""

    def __init__(self, raw_data, tokenizer, max_len):
        """Run ``preprocess`` over the ``"messages"`` entry of every example.

        Args:
            raw_data: Iterable of mappings, each with a ``"messages"`` key.
            tokenizer: Tokenizer forwarded to ``preprocess``.
            max_len: Maximum token length forwarded to ``preprocess``.
        """
        conversations = []
        for example in raw_data:
            conversations.append(example["messages"])
        encoded = preprocess(conversations, tokenizer, max_len)

        self.input_ids = encoded["input_ids"]
        self.target_ids = encoded["target_ids"]
        self.attention_mask = encoded["attention_mask"]

    def __len__(self):
        """Return the number of stored examples."""
        return len(self.input_ids)

    def __getitem__(self, i) -> Dict[str, torch.Tensor]:
        """Return example ``i``; the stored targets are exposed as ``labels``."""
        return {
            "input_ids": self.input_ids[i],
            "labels": self.target_ids[i],
            "attention_mask": self.attention_mask[i],
        }

class KnowledgeDataset(Dataset):
    """Dataset of plain text tokenized for causal-LM training.

    The text is tokenized once at construction time; the labels are a copy of
    the input ids with padded positions replaced by the ignore index.
    """

    def __init__(self, raw_data, tokenizer, max_len, ignore_index=None):
        """Tokenize ``raw_data`` and build the label tensor.

        Args:
            raw_data: Text (or batch of texts) passed directly to ``tokenizer``.
            tokenizer: Callable returning an encoding with ``input_ids`` and
                ``attention_mask`` tensors.
            max_len: Maximum token length; longer input is truncated.
            ignore_index: Label value written at padded positions. Defaults to
                the module-level ``IGNORE_TOKEN_ID`` when ``None``.
        """
        if ignore_index is None:
            ignore_index = IGNORE_TOKEN_ID

        texts = tokenizer(raw_data, padding=True, truncation=True, return_tensors="pt", max_length=max_len)
        print('总Token数：', texts.input_ids.numel())
        self.input_ids = texts.input_ids
        self.attention_mask = texts.attention_mask
        self.target_ids = self.input_ids.clone()
        # Mask labels using the tokenizer's own attention mask instead of
        # comparing against pad_token_id: a genuine token that shares the pad
        # id must still contribute to the loss.
        self.target_ids[self.attention_mask == 0] = ignore_index

    def __len__(self):
        """Return the number of tokenized rows."""
        return len(self.input_ids)

    def __getitem__(self, i) -> Dict[str, torch.Tensor]:
        """Return row ``i``; the stored targets are exposed as ``labels``."""
        return dict(
            input_ids=self.input_ids[i],
            labels=self.target_ids[i],
            attention_mask=self.attention_mask[i],
        )


In [None]:
#=======================
# Chat with the model
#=======================
# Conversation content (fill in `prompt` with the user question)
prompt = ""
messages = [
    {"role": "system", "content": "你是一个有用的助手。"},
    {"role": "user", "content": prompt}
]
# Render the conversation as text and append the generation prompt so the
# model continues with the assistant turn.
text = tokenizer.apply_chat_template(
    messages,
    tokenize=False,
    add_generation_prompt=True
)
# Put the inputs on the device the model actually lives on instead of a
# hard-coded one, so inputs and weights can never end up on different devices.
model_inputs = tokenizer([text], return_tensors="pt").to(model.device)
# Unpack the whole encoding so generate() also receives the attention mask.
generated_ids = model.generate(
    **model_inputs,
    max_new_tokens=512
)
# Drop the prompt tokens: generate() returns the prompt followed by the reply.
generated_ids = [
    output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
]
response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
print('====用户输入====')
print(prompt)
print('====模型回复====')
print(response)

In [None]:
#=======================
# Train the model
#=======================
# Training conversations: each example holds the full message list,
# including the assistant answer.
raw_data = [
    dict(messages=[
        dict(role="system", content="你是一个有用的助手。"),
        dict(role="user", content="Maeiee是谁？"),
        dict(role="assistant", content="Maeiee是我的好朋友！"),
    ]),
    # More conversation examples...
]

train_dataset = SupervisedDataset(raw_data, tokenizer, max_len=512)

# Incrementally fine-tune the already-loaded model.
# Note: adjust these settings to your actual training environment.
training_args = TrainingArguments(
    output_dir="./results",           # output directory
    logging_dir="./logs",             # log directory
    num_train_epochs=1,               # total number of training epochs
    per_device_train_batch_size=1,    # batch size per device
    warmup_steps=0,                   # warm-up steps
    weight_decay=0.01,                # weight decay
)
# An evaluation dataset may also be needed here.
trainer = Trainer(model=model, args=training_args, train_dataset=train_dataset)
trainer.train()

In [None]:
#=======================
# Knowledge injection
#=======================
# Raw text to train on (placeholder: put the text between the triple quotes)
raw_data = (
"""
"""
)

train_dataset = KnowledgeDataset(raw_data, tokenizer, max_len=512)

# Incrementally fine-tune the already-loaded model.
# Note: adjust these settings to your actual training environment.
training_args = TrainingArguments(
    output_dir="./results",           # output directory
    logging_dir="./logs",             # log directory
    num_train_epochs=1,               # total number of training epochs
    per_device_train_batch_size=1,    # batch size per device
    warmup_steps=0,                   # warm-up steps
    weight_decay=0.01,                # weight decay
)
# An evaluation dataset may also be needed here.
trainer = Trainer(model=model, args=training_args, train_dataset=train_dataset)
trainer.train()

In [None]:
# Write the current model to model_dir; the loading cell checks this path and
# loads from it when it exists.
model.save_pretrained(model_dir)