基于GPT-2预训练模型的prompt learning：通过人工定义prompt template与verbalizer，进行句子情感分类

In [1]:
pip install transformers

Looking in indexes: http://repo.myhuaweicloud.com/repository/pypi/simple
Collecting transformers
  Downloading http://repo.myhuaweicloud.com/repository/pypi/packages/5b/0b/e45d26ccd28568013523e04f325432ea88a442b4e3020b757cf4361f0120/transformers-4.30.2-py3-none-any.whl (7.2 MB)
[K     |████████████████████████████████| 7.2 MB 100.0 MB/s eta 0:00:01
Collecting regex!=2019.12.17
  Downloading http://repo.myhuaweicloud.com/repository/pypi/packages/9a/05/18911646681dfab0ffb76b4b958356c0a3d211bb08e9a2f33f1e9487977d/regex-2024.4.16-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (761 kB)
[K     |████████████████████████████████| 761 kB 19.9 MB/s eta 0:00:01
Collecting tokenizers!=0.11.3,<0.14,>=0.11.1
  Downloading http://repo.myhuaweicloud.com/repository/pypi/packages/4d/40/ab3c3c705e0a8cbbe760c49302b407190201d96fe7dfeea37ccafa004da3/tokenizers-0.13.3-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (7.8 MB)
[K     |████████████████████████████████| 7.8 MB 112.5 MB/s e

In [9]:
import os

import numpy
import torch
from sklearn.model_selection import train_test_split
import torch.nn as nn
from torch.optim import AdamW
from torch.utils.data import DataLoader, Dataset
from transformers import GPT2TokenizerFast, GPT2LMHeadModel

from tqdm import tqdm
from easydict import EasyDict as edict
import codecs
import math
from pathlib import Path
import random

In [10]:

cfg = edict({
    'name': 'movie review',
    'pre_trained': True,
    'num_classes': 2,
    'batch_size': 15,
    'epoch_size': 3,
    'weight_decay': 3e-5,
    'data_path': "./data/prompt tuning/data/",
    'checkpoint_path': 'soft-prompt.pth',
    'device_name':"cuda" if torch.cuda.is_available() else "cpu",
    'gpt2_model':'./gpt2',
    'prompt_len':10,
    'max_len' : 100,
    'classes':[['positive'],['negative']],
    'split': 0.8,
    'device_target': 'Ascend',
    'device_id': 0,
    'keep_checkpoint_max': 1,
    'word_len': 768,
    'vec_length': 40,
})

In [11]:

## load model ##

tokenizer = GPT2TokenizerFast.from_pretrained(
    cfg.gpt2_model, add_prefix_space=True
)
tokenizer.pad_token = tokenizer.eos_token

model = GPT2LMHeadModel.from_pretrained(cfg.gpt2_model)
# 冻结所有参数
for param in model.parameters():
    param.requires_grad = False
model.eval()

GPT2LMHeadModel(
  (transformer): GPT2Model(
    (wte): Embedding(50257, 768)
    (wpe): Embedding(1024, 768)
    (drop): Dropout(p=0.1, inplace=False)
    (h): ModuleList(
      (0): GPT2Block(
        (ln_1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        (attn): GPT2Attention(
          (c_attn): Conv1D()
          (c_proj): Conv1D()
          (attn_dropout): Dropout(p=0.1, inplace=False)
          (resid_dropout): Dropout(p=0.1, inplace=False)
        )
        (ln_2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        (mlp): GPT2MLP(
          (c_fc): Conv1D()
          (c_proj): Conv1D()
          (act): NewGELUActivation()
          (dropout): Dropout(p=0.1, inplace=False)
        )
      )
      (1): GPT2Block(
        (ln_1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        (attn): GPT2Attention(
          (c_attn): Conv1D()
          (c_proj): Conv1D()
          (attn_dropout): Dropout(p=0.1, inplace=False)
          (resid_dropout): Dro

In [12]:
# 数据预览
with open(cfg.data_path + "rt-polarity.neg", 'r', encoding='utf-8') as f:
        print("Negative reivews:")
        for i in range(5):
            print("[{0}]:{1}".format(i,f.readline()))
with open(cfg.data_path + "rt-polarity.pos", 'r', encoding='utf-8') as f:
        print("Positive reivews:")
        for i in range(5):
            print("[{0}]:{1}".format(i,f.readline()))

Negative reivews:
[0]:simplistic , silly and tedious . 

[1]:it's so laddish and juvenile , only teenage boys could possibly find it funny . 

[2]:exploitative and largely devoid of the depth or sophistication that would make watching such a graphic treatment of the crimes bearable . 

[3]:[garbus] discards the potential for pathological study , exhuming instead , the skewed melodrama of the circumstantial situation . 

[4]:a visually flashy but narratively opaque and emotionally vapid exercise in style and mystification . 

Positive reivews:
[0]:the rock is destined to be the 21st century's new " conan " and that he's going to make a splash even greater than arnold schwarzenegger , jean-claud van damme or steven segal . 

[1]:the gorgeously elaborate continuation of " the lord of the rings " trilogy is so huge that a column of words cannot adequately describe co-writer/director peter jackson's expanded vision of j . r . r . tolkien's middle-earth . 

[2]:effective but too-tepid biopic

In [13]:
class CustomDataset(Dataset):
    #data: list[dict[str, torch.Tensor]]

    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

In [None]:
class MovieDataset:
    '''
    影评数据集
    '''
    def __init__(self, root_dir, maxlen, split):
        '''
        input:
            root_dir: 影评数据目录
            maxlen: 设置句子最大长度
            split: 设置数据集中训练/评估的比例
        '''
        self.path = root_dir
        self.files = []

        self.doConvert = False
        
        mypath = Path(self.path)
        if not mypath.exists() or not mypath.is_dir():
            print("please check the root_dir!")
            raise ValueError

        # 在数据目录中找到文件
        for root,_,filename in os.walk(self.path):
            for each in filename:
                self.files.append(os.path.join(root,each))
            break

        # 确认是否为两个文件.neg与.pos
        if len(self.files) != 2:
            print("There are {} files in the root_dir".format(len(self.files)))
            raise ValueError

        # 读取数据
        self.word_num = 0
        self.maxlen = 0
        self.minlen = float("inf")
        self.maxlen = float("-inf")
        self.Pos = []
        self.Neg = []
        self.sentences = []
        self.isShuffle = True

        for filename in self.files:
            f = codecs.open(filename, 'r')
            ff = f.read()
            file_object = codecs.open(filename, 'w', 'utf-8')
            file_object.write(ff)
            self.read_data(filename)

        self.Pos = self.process_data(self.Pos, cfg.classes[0][0])
        self.Neg = self.process_data(self.Neg, cfg.classes[1][0])
        
        #self.text2vec(maxlen=maxlen)
        self.split_dataset(split=split)

    def read_data(self, filePath):
        with open(filePath,'r') as f:
            for sentence in f.readlines():
                sentence = sentence.replace('\n','')\
                    .replace('"','')\
                    .replace('\'','')\
                    .replace('.','')\
                    .replace(',','')\
                    .replace('[','')\
                    .replace(']','')\
                    .replace('(','')\
                    .replace(')','')\
                    .replace(':','')\
                    .replace('--','')\
                    .replace('-',' ')\
                    .replace('\\','')\
                    .replace('0','')\
                    .replace('1','')\
                    .replace('2','')\
                    .replace('3','')\
                    .replace('4','')\
                    .replace('5','')\
                    .replace('6','')\
                    .replace('7','')\
                    .replace('8','')\
                    .replace('9','')\
                    .replace('`','')\
                    .replace('=','')\
                    .replace('$','')\
                    .replace('/','')\
                    .replace('*','')\
                    .replace(';','')\
                    .replace('<b>','')\
                    .replace('%','')
                if sentence:
                    self.word_num += len(sentence.split(' '))
                    self.maxlen = max(self.maxlen, len(sentence.split(' ')))
                    self.minlen = min(self.minlen, len(sentence.split(' ')))
                    if 'pos' in filePath:
                        self.Pos.append([sentence, self.feelMap['pos']])
                    else:
                        self.Neg.append([sentence, self.feelMap['neg']])

    def process_data(self, data_set, tag):
        ret = []
        prompt = f"Is the sentiment positive or negative"  # 硬提示
        for line in data_set:
            res = tokenizer(
                (prompt + line).strip('\n'),
                return_tensors="pt",
                text_target=tag,
                padding='max_length',
                max_length=cfg.max_len + cfg.prompt_len,
                add_special_tokens=True,
            )
            res['text'] = line
            res['input_ids'] = res['input_ids'].squeeze(0)
            res['labels'] = res['labels'].squeeze(0)
            res['attention_mask'] = res['attention_mask'].squeeze(0)
            res['answer'] = tag
            res['len'] = res['attention_mask'].sum()
            res['attention_mask'][res['len']:res['len'] + cfg.prompt_len] = 1
            ret.append(res)
        return ret

    def split_dataset(self, split):
        '''
        分割为训练集与测试集

        '''

        trunk_pos_size = math.ceil((1-split)*len(self.Pos))
        trunk_neg_size = math.ceil((1-split)*len(self.Neg))
        trunk_num = int(1/(1-split))
        pos_temp=list()
        neg_temp=list()
        for index in range(trunk_num):
            pos_temp.append(self.Pos[index*trunk_pos_size:(index+1)*trunk_pos_size])
            neg_temp.append(self.Neg[index*trunk_neg_size:(index+1)*trunk_neg_size])
        self.test = pos_temp.pop(2)+neg_temp.pop(2)
        self.train = [i for item in pos_temp+neg_temp for i in item]

        random.shuffle(self.train)
        # random.shuffle(self.test)

    def get_dict_len(self):
        '''
        获得数据集中文字组成的词典长度
        '''
        if self.doConvert:
            return len(self.Vocab)
        else:
            print("Haven't finished Text2Vec")
            return -1
        
    def train_dataset(self):
        return CustomDataset(self.train)
    
    def test_dataset(self):
        return CustomDataset(self.test) 



In [48]:
instance = MovieDataset(cfg.data_path, maxlen=cfg.max_len, split = cfg.split)
train_dataset = instance.train_dataset()
test_dataset = instance.test_dataset()

In [69]:
# 看下一个词最大概率是什么，效果不好
def test():
    cfg.batch_size = 1
    data_loader = DataLoader(test_dataset, batch_size=cfg.batch_size, )
    total = 0
    correct = 0
    # 预先编码 'positive' 和 'negative' 以减少循环中的计算
    positive_token_id = tokenizer.encode('positive')[0]
    negative_token_id = tokenizer.encode('negative')[0]
    for batch in data_loader:
        inputs, labels = batch['input_ids'].to(cfg.device_name), batch['labels'].to(cfg.device_name)
        
        output = model(inputs, labels=labels)
        logits = output.logits[:, -1, :]  # 取最后一个token的输出
        
        # 选取最后一个提示词对应的生成词
        AnswerPlace = (batch["len"] + cfg.prompt_len - 1).to(cfg.device_name)

        probabilities = torch.nn.functional.softmax(output.logits[:, :, :], dim=-1)
        answer_pb = probabilities[torch.arange(probabilities.shape[0]), AnswerPlace]
        predicted_tokens = [tokenizer.decode(s).strip() for s
                            in torch.argmax(answer_pb, dim=-1)]
        
        batch['result'] = predicted_tokens
        for i in range(cfg.batch_size):
            print(f'text: {batch["text"][i].strip()}')
            print(f'result/answer: {batch["result"][i].strip()}/{batch["answer"][i].strip()}')
            print()
            total += 1
            if batch["result"][i] == batch["answer"][i]:
                correct += 1

        print(f'correct: {correct}/{total} = {correct / total}')


In [72]:
# 比较下一个词是pos还是neg的概率，效果也不好
def test():
    cfg.batch_size = 1
    data_loader = DataLoader(test_dataset, batch_size=cfg.batch_size, )
    total = 0
    correct = 0
    # 预先编码 'positive' 和 'negative' 以减少循环中的计算
    positive_token_id = tokenizer.encode('positive')[0]
    negative_token_id = tokenizer.encode('negative')[0]
    for batch in data_loader:
        inputs, labels = batch['input_ids'].to(cfg.device_name), batch['labels'].to(cfg.device_name)
        
        output = model(inputs, labels=labels)
        logits = output.logits[:, -1, :]  # 取最后一个token的输出
        # 获取 'positive' 和 'negative' token 的概率
        probabilities = torch.nn.functional.softmax(logits, dim=-1)
        positive_probs = probabilities[:, positive_token_id]
        negative_probs = probabilities[:, negative_token_id]
        # 计算每个样本的结果
        results = (positive_probs > negative_probs).long()
        batch['result'] = [cfg.classes[0][0] if result.item() == 1 else cfg.classes[1][0] for result in results]

        # 打印结果
        print(f"text: {batch['text'][0].strip()}")
        print(f"probability of 'positive': {positive_probs[0]}")
        print(f"probability of 'negative': {negative_probs[0]}")
        print(f"correct label: {batch['answer'][0]}")

        #batch['result'] = predicted_tokens
        for i in range(cfg.batch_size):
            print(f'text: {batch["text"][i].strip()}')
            print(f'result/answer: {batch["result"][i].strip()}/{batch["answer"][i].strip()}')
            print()
            total += 1
            if batch["result"][i] == batch["answer"][i]:
                correct += 1

        print(f'correct: {correct}/{total} = {correct / total}')


In [73]:
test()

text: kinnear gives a tremendous performance .
probability of 'positive': 5.478357212318485e-10
probability of 'negative': 2.7351310105672155e-10
correct label: positive
text: kinnear gives a tremendous performance .
result/answer: positive/positive

correct: 1/1 = 1.0
text: director david fincher and writer david koepp can't sustain it .
probability of 'positive': 5.371085243233154e-10
probability of 'negative': 2.6735461067239896e-10
correct label: negative
text: director david fincher and writer david koepp can't sustain it .
result/answer: positive/negative

correct: 1/2 = 0.5
text: after sitting through this sloppy , made-for-movie comedy special , it makes me wonder if lawrence hates criticism so much that he refuses to evaluate his own work .
probability of 'positive': 5.983928907937752e-10
probability of 'negative': 2.8242583272053423e-10
correct label: negative
text: after sitting through this sloppy , made-for-movie comedy special , it makes me wonder if lawrence hates critic

KeyboardInterrupt: 