<a href="https://colab.research.google.com/github/yichen-qi/LLM_Learn/blob/main/model_finetunning_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# LLM模型微调、训练、测试、调用

## 依赖库

In [None]:
!pip install transformers datasets
import torch
from transformers import BertTokenizer, BertModel
from datasets import load_dataset

# Report the installed PyTorch version and whether a CUDA device is usable.
print("Torch version:", torch.__version__)
print("CUDA available:", torch.cuda.is_available())


### 导入所有依赖库

In [None]:
from datasets import load_dataset
from torch.utils.data import Dataset
from transformers import BertModel
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer
from torch.optim import AdamW


## 制作Dataset

In [None]:
class CustomDataset(Dataset):
    """Dataset over one split of the ``lansinuote/ChnSentiCorp`` corpus.

    Each item is a ``(text, label)`` pair taken from the ``'text'`` and
    ``'label'`` fields of the underlying row.
    """

    # Split names accepted by the constructor.
    _SPLITS = ('train', 'validation', 'test')

    def __init__(self, split):
        """Load the corpus and select the requested split.

        Args:
            split: One of ``'train'``, ``'validation'`` or ``'test'``.

        Raises:
            ValueError: If ``split`` is not one of the names above.
        """
        # Validate first so a bad name fails immediately instead of after
        # the whole dataset has been loaded.
        if split not in self._SPLITS:
            raise ValueError('Invalid split')

        self.dataset = load_dataset("lansinuote/ChnSentiCorp")
        self.data = self.dataset[split]

    def __len__(self):
        """Return the number of rows in the selected split."""
        return len(self.data)

    def __getitem__(self, item):
        """Return the ``(text, label)`` pair stored at index ``item``."""
        # Fetch the row once rather than once per field.
        row = self.data[item]
        return row['text'], row['label']

# Smoke test: build the training split, then print its size and first sample.
if __name__ == '__main__':
    train_dataset = CustomDataset('train')
    print(len(train_dataset))
    print(train_dataset[0])

## 下游任务模型设计

### 加载预训练模型

In [None]:
# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Pretrained Chinese BERT, moved onto the selected device.
bert_model = BertModel.from_pretrained("google-bert/bert-base-chinese")
bert_model.to(device)

### 定义下游任务模型(将主干网络提取的特征进行分类)


In [None]:
class Model(torch.nn.Module):
    """Two-class classification head on top of the module-level ``bert_model``.

    Only the linear layer belongs to this module. ``bert_model`` is looked up
    as a global when ``forward`` runs and is called without gradient tracking.
    """

    def __init__(self):
        super().__init__()
        # Maps a 768-wide feature vector to 2 class scores.
        self.fc = torch.nn.Linear(768, 2)

    def forward(self, input_ids, attention_mask, token_type_ids):
        """Return unnormalized class scores (logits), one row per input.

        The three arguments are forwarded by keyword to ``bert_model``.

        Softmax is deliberately not applied here: ``nn.CrossEntropyLoss``
        expects raw logits and normalizes them itself, so a softmax in this
        method would be applied twice during training. ``argmax`` over the
        logits picks the same class as ``argmax`` over the probabilities;
        call ``.softmax(dim=1)`` on the result when probabilities are needed.
        """
        # The upstream backbone does not take part in training.
        with torch.no_grad():
            out = bert_model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                token_type_ids=token_type_ids,
            )

        # The downstream head is trained: classify from position 0 of the
        # backbone's last hidden state.
        return self.fc(out.last_hidden_state[:, 0])

### 自定义模型微调

In [None]:
# Tokenizer loaded from the "google-bert/bert-base-chinese" checkpoint.
token = BertTokenizer.from_pretrained("google-bert/bert-base-chinese")
# Number of passes the training loop makes over the training loader.
epochs = 100

# Encode a batch of raw samples into tensors.
def collate_fn(data):
    """Tokenize a batch of ``(sentence, label)`` samples.

    Args:
        data: Batch of samples; element 0 of each sample is the sentence and
            element 1 is its label.

    Returns:
        Tuple ``(input_ids, attention_mask, token_type_ids, labels)``. The
        first three come from the tokenizer output, which is padded or
        truncated to 350 tokens; ``labels`` is a ``torch.LongTensor``.
    """
    texts = [sample[0] for sample in data]
    targets = [sample[1] for sample in data]

    encoded = token.batch_encode_plus(
        texts,
        padding="max_length",
        max_length=350,
        truncation=True,
        return_tensors="pt",
        return_length=True,
    )

    return (
        encoded["input_ids"],
        encoded["attention_mask"],
        encoded["token_type_ids"],
        torch.LongTensor(targets),
    )

# Training split, served in shuffled batches of 32; a trailing partial batch
# is dropped so every training step sees a full batch.
train_dataset = CustomDataset('train')
train_loader = DataLoader(
    train_dataset,
    batch_size=32,
    shuffle=True,
    drop_last=True,
    collate_fn=collate_fn,
)

print("Using device:", device)

# Training entry point: fit the classification head and save its weights
# after every epoch.
if __name__ == '__main__':
    import os

    print(device)
    my_model = Model().to(device)

    optimizer = AdamW(my_model.parameters(), lr=5e-4)

    loss_func = nn.CrossEntropyLoss()

    # torch.save does not create missing parent directories, so make sure the
    # checkpoint folder exists before the first epoch finishes.
    os.makedirs("params", exist_ok=True)

    my_model.train()

    for epoch in range(epochs):
        for step, (input_ids, attention_mask, token_type_ids, labels) in enumerate(train_loader):
            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)
            token_type_ids = token_type_ids.to(device)
            labels = labels.to(device)

            out = my_model(input_ids, attention_mask, token_type_ids)

            loss = loss_func(out, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Report loss and batch accuracy every 10 steps.
            if step % 10 == 0:
                out = out.argmax(dim=1)
                acc = (out == labels).sum().item() / len(labels)
                print(f"Epoch: {epoch}, Step: {step}, Loss: {loss.item()}, Acc: {acc}")

        # One checkpoint per epoch, named after the epoch index.
        torch.save(my_model.state_dict(), f"params/{epoch}bert.pt")
        print(f"Epoch: {epoch}, Save model params")


## 模型性能测试


In [None]:
test_dataset = CustomDataset('test')

# Evaluation should cover every test sample: drop_last=True would silently
# discard the final partial batch and leave those samples out of the reported
# accuracy. Shuffling has no effect on an accuracy total, so it is disabled
# to keep the batch order fixed from run to run.
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=32,
    shuffle=False,
    drop_last=False,
    collate_fn=collate_fn,
)

# Evaluation entry point: load a saved checkpoint and print accuracy over
# everything the test loader yields.
if __name__ == '__main__':
    acc = 0
    total = 0

    test_model = Model().to(device)
    # map_location lets a checkpoint that was saved on a GPU be loaded on
    # whatever device this session actually has (including CPU-only).
    test_model.load_state_dict(torch.load('params/11bert.pt', map_location=device))
    test_model.eval()

    # No gradients are needed for scoring, so skip building the autograd graph.
    with torch.no_grad():
        for i, (input_ids, attention_mask, token_type_ids, labels) in enumerate(test_loader):

            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)
            token_type_ids = token_type_ids.to(device)
            labels = labels.to(device)

            out = test_model(input_ids, attention_mask, token_type_ids)

            # Accumulate correct predictions and the number of samples seen.
            out = out.argmax(dim=1)
            acc += (out == labels).sum().item()
            total += len(labels)

    print("Test Accuracy: {:.4f}".format(acc/total))

## 模型调用

In [None]:
# Classifier instance used for interactive prediction; its weights are loaded
# from a checkpoint inside prediction().
sentiment_model = Model().to(device)

# Display names indexed by predicted class id
# (index 0: "negative review", index 1: "positive review").
names = ["负向评价", "正向评价"]

def collate_fn_sentiment(data):
    """Tokenize raw sentences (no labels) for inference.

    Args:
        data: The sentences to encode, passed straight to the tokenizer.

    Returns:
        Tuple ``(input_ids, attention_mask, token_type_ids)`` taken from the
        tokenizer output, which is padded or truncated to 350 tokens.
    """
    encoded = token.batch_encode_plus(
        data,
        padding="max_length",
        max_length=350,
        truncation=True,
        return_tensors="pt",
        return_length=True,
    )

    return (
        encoded["input_ids"],
        encoded["attention_mask"],
        encoded["token_type_ids"],
    )


def prediction():
    """Interactively classify sentences read from standard input.

    Loads the checkpoint ``params/11bert.pt`` into ``sentiment_model``, then
    repeatedly prompts for a sentence and prints the predicted class name
    from ``names``. Entering ``q`` ends the loop.
    """
    # map_location lets a checkpoint that was saved on a GPU be loaded on
    # whatever device this session actually has (including CPU-only).
    sentiment_model.load_state_dict(torch.load('params/11bert.pt', map_location=device))
    sentiment_model.eval()

    while True:
        sentence = input("请输入句子(输入q退出)：")
        if sentence == "q":
            print("退出")
            break

        input_ids, attention_mask, token_type_ids = collate_fn_sentiment([sentence])
        input_ids = input_ids.to(device)
        attention_mask = attention_mask.to(device)
        token_type_ids = token_type_ids.to(device)

        with torch.no_grad():
            output = sentiment_model(input_ids, attention_mask, token_type_ids)
            # The batch holds exactly one sentence, so argmax yields a
            # one-element tensor; .item() turns it into a plain int that is
            # safe to use as a list index.
            label_id = output.argmax(dim=1).item()
            print("模型判定", names[label_id], "\n")

# Start the interactive prediction loop.
if __name__ == '__main__':
    prediction()


In [None]:
# Load the validation split and look at the sample at index 1.
val_dataset = CustomDataset('validation')
val_dataset[1]