In [None]:
import itertools
import json
import pprint

import numpy as np
import torch
from datasets import Dataset
from peft import AutoPeftModelForCausalLM
from peft import LoraConfig,TaskType,get_peft_model
from transformers import AutoTokenizer,AutoModelForCausalLM,Trainer,DataCollatorForSeq2Seq,TrainingArguments

In [None]:
# Load the base model and its tokenizer.
# Raw string: the Windows path contains backslash sequences (\P, \D, \m, \Q) that a
# normal string literal would treat as (invalid) escape sequences.
# NOTE(review): machine-specific absolute path — point it at your own checkpoint directory.
model_path=r"D:\Program Projects\Python Projects\DB-GPT\models\Qwen2-0.5B"

tokenizer = AutoTokenizer.from_pretrained(model_path, use_fast=False, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(model_path, trust_remote_code=True, torch_dtype=torch.half, device_map="auto")

# Later cells pad with tokenizer.pad_token_id; fall back to EOS if the tokenizer
# ships without a pad token so that id is never None.
if tokenizer.pad_token_id is None:
    tokenizer.pad_token = tokenizer.eos_token
model.config.pad_token_id=tokenizer.pad_token_id

# Show the end-of-sequence and padding tokens (and their ids) defined by the tokenizer.
print(f"eos_token: {tokenizer.eos_token}, id: {tokenizer.eos_token_id}") 
print(f"pad_token: {tokenizer.pad_token}, id: {tokenizer.pad_token_id}") 


In [None]:
# Load the training data.
# Read as UTF-8 explicitly: JSON is UTF-8 by specification, while open() without
# an encoding falls back to the platform locale (not UTF-8 on many Windows setups).
data_path = "../huanhuan.json"
with open(data_path, encoding="utf-8") as f:
    data = json.load(f)
# Preview a few records instead of dumping the whole file into the cell output.
data[:3]

In [None]:
# Wrap the loaded records in a Dataset and display its summary.
train_dataset = Dataset.from_list(mapping=data)
train_dataset

In [None]:
# Peek at the first five records.
for record in itertools.islice(train_dataset, 5):
    print(record)

In [None]:
# Inference helper
def inference(model,tokenizer,text,max_new_tokens=512,temperature=0.5,do_sample=True):
    """Generate the assistant's reply to ``text`` using the role-play prompt.

    Args:
        model: causal LM exposing ``eval()``, ``device`` and ``generate()``.
        tokenizer: tokenizer matching ``model``; called to encode the prompt and
            used to decode the generated ids.
        text: the user's message.
        max_new_tokens: upper bound on the number of *generated* tokens (the
            prompt length does not count against it).
        temperature: sampling temperature; only forwarded when ``do_sample`` is true.
        do_sample: sample from the distribution instead of decoding greedily.

    Returns:
        The decoded reply, without the prompt.
    """
    model.eval() # inference mode
    prompt = "\n".join(["<|system|>", "现在你要扮演皇帝身边的女人--甄嬛", "<|user|>", text + "<|assistant|>"]).strip()+ "\n"

    # Calling the tokenizer (rather than .encode) also yields the attention mask.
    encoded = tokenizer(prompt,return_tensors='pt',truncation=True,max_length=1024)
    device = model.device
    input_ids = encoded["input_ids"].to(device)
    attention_mask = encoded["attention_mask"].to(device)

    generate_kwargs = dict(
        attention_mask=attention_mask,
        # Budget for new tokens only, so a long prompt cannot exhaust the limit.
        max_new_tokens=max_new_tokens,
        pad_token_id=tokenizer.pad_token_id,
        do_sample=do_sample,
        num_return_sequences=1,
    )
    if do_sample:
        # temperature has no effect under greedy decoding, so only pass it when sampling.
        generate_kwargs["temperature"] = temperature

    generated_tokens = model.generate(input_ids=input_ids, **generate_kwargs)

    # Drop the prompt by token count instead of by character count of the prompt
    # string: the decoded prompt is not guaranteed to match the original text
    # (truncation, tokenizer normalisation).
    new_tokens = generated_tokens[0][input_ids.shape[1]:]
    return tokenizer.decode(new_tokens, skip_special_tokens=True)

In [None]:
# Ask the model a question at this point in the notebook (the LoRA cells come later).
text = "你会干什么"
pprint.pprint(inference(model=model, tokenizer=tokenizer, text=text))

In [None]:
# Tokenization and pre-processing
def process_func(example):
    """Tokenize a batch of records into fixed-length training features.

    Meant for ``Dataset.map(..., batched=True)``: ``example`` maps each column
    name ("instruction", "input", "output") to a list of strings. Every record
    in the batch is processed (any batch size, including 1).

    For each record the prompt and the response are encoded separately, then
    ``prompt + response + EOS`` is cut to ``MAX_LENGTH`` tokens and right-padded
    to exactly ``MAX_LENGTH``.

    Adds to ``example`` and returns it:
        input_ids:      int array, shape (batch, MAX_LENGTH)
        attention_mask: 1 for real tokens, 0 for padding, same shape
        labels:         response tokens and EOS; prompt and padding positions are -100

    Note: when a record is cut to ``MAX_LENGTH`` its trailing EOS (and possibly
    part of the response) is lost.
    """
    # Maximum (and fixed) encoded length of one example.
    MAX_LENGTH = 512
    # Positions carrying this label are ignored by the loss.
    IGNORE_INDEX = -100

    batch_input_ids, batch_attention_mask, batch_labels = [], [], []

    for instruction_text, input_text, output_text in zip(
        example["instruction"], example["input"], example["output"]
    ):
        # Prompt text and its encoding.
        text_instruction = "\n".join(["<|system|>", "现在你要扮演皇帝身边的女人--甄嬛", "<|user|>", instruction_text + input_text + "<|assistant|>"]).strip()+ "\n"
        instruction = tokenizer.encode(text=text_instruction,add_special_tokens=True, truncation=True, max_length=MAX_LENGTH)

        # Response text and its encoding.
        response = tokenizer.encode(text=output_text, add_special_tokens=False, truncation=True,max_length=MAX_LENGTH)

        # input_ids = prompt + response + EOS
        input_ids = instruction + response + [tokenizer.eos_token_id]

        # labels = ignore the prompt, supervise response + EOS.
        # The ignore index is written directly rather than going through
        # pad_token_id: when pad and EOS share an id, a "replace every pad id
        # with -100" pass would also erase the EOS target.
        labels = [IGNORE_INDEX] * len(instruction) + response + [tokenizer.eos_token_id]

        # Prompt and response are each capped at MAX_LENGTH, so their sum can
        # still overflow; cap the combined sequence as well.
        input_ids = input_ids[:MAX_LENGTH]
        labels = labels[:MAX_LENGTH]
        attention_mask = [1] * len(input_ids)

        # Right-pad everything to the fixed length.
        pad_len = MAX_LENGTH - len(input_ids)
        input_ids = input_ids + [tokenizer.pad_token_id] * pad_len
        attention_mask = attention_mask + [0] * pad_len
        labels = labels + [IGNORE_INDEX] * pad_len

        batch_input_ids.append(input_ids)
        batch_attention_mask.append(attention_mask)
        batch_labels.append(labels)

    # Write the encoded columns back into the batch.
    example["input_ids"]=np.array(batch_input_ids)
    example["attention_mask"]=np.array(batch_attention_mask)
    example["labels"]=np.array(batch_labels)

    return example


In [None]:
# Tokenize the dataset with map, then hold out 10% of it for evaluation.
# The mapped result gets its own name instead of rebinding `data` (the raw list
# of records), so the earlier Dataset.from_list(data) cell stays re-runnable.
tokenized_dataset=train_dataset.map(process_func,batched=True,batch_size=1,drop_last_batch=True)
split_dataset = tokenized_dataset.train_test_split(test_size=0.1,shuffle=True,seed=123)
print(split_dataset)

In [None]:
# PEFT (LoRA) fine-tuning setup


config = LoraConfig(
    task_type=TaskType.CAUSAL_LM, 
    # Attach adapters to the attention and MLP projection layers.
    target_modules=["q_proj","up_proj","o_proj","k_proj","down_proj","gate_proj","v_proj"],
    inference_mode=False, # training mode
    r=128, # LoRA rank
    lora_alpha=32, # LoRA alpha (scaling numerator; see the LoRA paper)
    # Dropout applied on the adapter path. 0.9 would zero out 90% of the adapter
    # inputs on every step, which all but prevents the adapter from learning;
    # 0.1 is a conventional value.
    lora_dropout=0.1,
    bias='none'
)

model = get_peft_model(model,config) # wraps the base model with LoRA adapters
model.print_trainable_parameters()



In [None]:
# Training hyper-parameters.
training_args = TrainingArguments(
    output_dir="output_dir/bigscience/mt0-large-lora",
    learning_rate=1e-3,
    per_device_train_batch_size=1,
    per_device_eval_batch_size=1,
    # max_steps takes precedence over num_train_epochs, so only the step budget
    # is specified (the previous num_train_epochs=100 was never in effect).
    max_steps=100,
    weight_decay=0.01,
    # The step-based intervals default to 500, i.e. beyond max_steps: without
    # explicit values nothing would be logged or evaluated during the run and
    # load_best_model_at_end would have no evaluated checkpoint to choose from.
    logging_steps=10,
    # NOTE(review): newer transformers releases rename this argument to
    # `eval_strategy` — confirm against the installed version.
    evaluation_strategy="steps",
    eval_steps=20,
    save_strategy="steps",
    save_steps=20,  # must stay a multiple of eval_steps for load_best_model_at_end
    load_best_model_at_end=True,
)
data_collator = DataCollatorForSeq2Seq(
    tokenizer,
    model=model,
    label_pad_token_id=-100,
    pad_to_multiple_of=None,
    # Pad to the longest sequence in the batch. A no-op when all features
    # already share one length, but keeps batching valid when they do not.
    padding=True
)



In [None]:
# Assemble the Trainer from the model, arguments, data splits and collator.
trainer = Trainer(
    args=training_args,
    model=model,
    tokenizer=tokenizer,
    data_collator=data_collator,
    eval_dataset=split_dataset["test"],
    train_dataset=split_dataset["train"],
)

# Run training, then write the model's weights/config to disk.
trainer.train()
model.save_pretrained("output_dir")

In [None]:
# Run inference again after training, with the same question as before.
model.eval()
text = "你会干什么"
pprint.pprint(inference(model=model, tokenizer=tokenizer, text=text))

In [None]:
# Reload the saved adapter from disk and run one generation with it.
model = AutoPeftModelForCausalLM.from_pretrained("output_dir")
tokenizer = AutoTokenizer.from_pretrained(model_path)
model.eval()
# Use the GPU when one is available instead of hard-coding CUDA, so the cell
# also runs on CPU-only machines.
device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)

# Build the prompt with the same layout as the training examples
# (in particular, no stray space between "<|user|>\n" and the question).
question = "你是谁？"
prompt = "\n".join(["<|system|>", "现在你要扮演皇帝身边的女人--甄嬛", "<|user|>", question + "<|assistant|>"]).strip() + "\n"
inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

# Generation call in the style of the PEFT examples; **inputs forwards the
# attention mask together with the input ids.
outputs = model.generate(**inputs, max_new_tokens=128)
print(tokenizer.batch_decode(outputs.detach().cpu().numpy(), skip_special_tokens=True)[0])
