In [1]:
from keras.datasets import imdb
import torch.nn as nn
import torch
from torch.nn.utils.rnn import pad_sequence
import warnings
import os
warnings.filterwarnings("ignore")

In [2]:
def load_data(device='cpu'):
    # 加载 IMDB 数据集
    (train_data, train_labels), (test_data, test_labels) = imdb.load_data(num_words=10000)
    
    # 统一长度，不足的填充 0
    max_len = 200
    train_data = [torch.tensor(seq)[:max_len] for seq in train_data]
    test_data = [torch.tensor(seq)[:max_len] for seq in test_data]
    
    # 填充数据
    train_data = pad_sequence(train_data, batch_first=True)
    test_data = pad_sequence(test_data, batch_first=True)
    
    # 转换为 pytorch tensor
    train_labels = torch.tensor(train_labels, dtype=torch.float32, device=device)
    test_labels = torch.tensor(test_labels, dtype=torch.float32, device=device)
    train_data = torch.tensor(train_data, dtype=torch.long, device=device)
    test_data = torch.tensor(test_data, dtype=torch.long, device=device)
    
    # 查看数据集大小
    print(f'训练集大小: {len(train_data)}')
    print(f'测试集大小: {len(test_data)}')
    print("train labels shape: ", train_labels.shape)
    print("test labels shape: ", test_labels.shape)
    print("train data shape: ", train_data.shape)
    print("test data shape: ", test_data.shape)

    return train_data, train_labels, test_data, test_labels

In [3]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
train_data, train_labels, test_data, test_labels = load_data(device=device)
print(train_data[0])

训练集大小: 25000
测试集大小: 25000
train labels shape:  torch.Size([25000])
test labels shape:  torch.Size([25000])
train data shape:  torch.Size([25000, 200])
test data shape:  torch.Size([25000, 200])
tensor([   1,   14,   22,   16,   43,  530,  973, 1622, 1385,   65,  458, 4468,
          66, 3941,    4,  173,   36,  256,    5,   25,  100,   43,  838,  112,
          50,  670,    2,    9,   35,  480,  284,    5,  150,    4,  172,  112,
         167,    2,  336,  385,   39,    4,  172, 4536, 1111,   17,  546,   38,
          13,  447,    4,  192,   50,   16,    6,  147, 2025,   19,   14,   22,
           4, 1920, 4613,  469,    4,   22,   71,   87,   12,   16,   43,  530,
          38,   76,   15,   13, 1247,    4,   22,   17,  515,   17,   12,   16,
         626,   18,    2,    5,   62,  386,   12,    8,  316,    8,  106,    5,
           4, 2223, 5244,   16,  480,   66, 3785,   33,    4,  130,   12,   16,
          38,  619,    5,   25,  124,   51,   36,  135,   48,   25, 1415,   33,
      

In [4]:
import math
import torch
import torch.nn as nn

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=200):
        super(PositionalEncoding, self).__init__()
        self.encoding = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        self.encoding[:, 0::2] = torch.sin(position * div_term)
        self.encoding[:, 1::2] = torch.cos(position * div_term)
        self.encoding = self.encoding.unsqueeze(0)

    def forward(self, x):
        seq_len = x.size(1)
        return x + self.encoding[:, :seq_len, :].to(x.device)



class TransformerModel(nn.Module):
    def __init__(self, vocab_size, embed_size, num_heads, hidden_dim, num_layers, output_dim, dropout=0.1, max_len=200):
        super(TransformerModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.positional_encoding = PositionalEncoding(embed_size, max_len)
        encoder_layer = nn.TransformerEncoderLayer(d_model=embed_size, nhead=num_heads, dim_feedforward=hidden_dim, dropout=dropout)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(embed_size, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        embedded = self.embedding(x)
        encoded = self.positional_encoding(embedded)  # 加入位置编码
        transformer_out = self.transformer(encoded)
        out = self.fc(self.dropout(transformer_out.mean(dim=1)))  # 平均池化取每个序列的特征
        return out


In [5]:
from torch.utils.data import Dataset, DataLoader
from prefetch_generator import BackgroundGenerator

class IMDBDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

class DataLoaderX(DataLoader):
    def __iter__(self):
        return BackgroundGenerator(super().__iter__(), max_prefetch=32)

In [6]:
def train_model(model, train_data, train_labels, criterion, optimizer, num_epochs=5, batch_size=128):
    # 创建数据集和数据加载器
    dataset = IMDBDataset(train_data, train_labels)
    train_loader = DataLoaderX(dataset, batch_size=batch_size, shuffle=True)

    model.train()
    correct = 0
    total = 0
    for epoch in range(num_epochs):
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            correct += (torch.round(torch.sigmoid(outputs)).squeeze() == labels).sum().item()
            total += labels.size(0)
            loss = criterion(outputs, labels.unsqueeze(1))  # 需要调整标签形状
            loss.backward()
            optimizer.step()
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}, Accuracy: {correct/total:.4f}')

def evaluate_model(model, test_data, test_labels, batch_size=32):
    dataset = IMDBDataset(test_data, test_labels)
    test_loader = DataLoaderX(dataset, batch_size=batch_size, shuffle=False)

    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs)
            predictions = torch.round(torch.sigmoid(outputs))
            correct += (predictions.squeeze() == labels).sum().item()
            total += labels.size(0)
    accuracy = correct / total
    print(f'Accuracy: {accuracy:.4f}')

In [7]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using {device} device')

# 设置超参数
vocab_size = 10000
embedding_dim = 128
hidden_dim = 128
output_dim = 1
n_layers = 3
dropout = 0.2
num_epochs = 30
learning_rate = 0.001
batch_size = 256
max_len = 200

Using cuda device


In [21]:
# Transformer 模型训练与评估
transformer_model = TransformerModel(vocab_size, embedding_dim, 8, hidden_dim, n_layers, output_dim, dropout, max_len=200)
transformer_model = transformer_model.to(device)
optimizer = torch.optim.Adam(transformer_model.parameters(), lr=learning_rate)
criterion = nn.BCEWithLogitsLoss()
train_model(transformer_model, train_data, train_labels, criterion, optimizer, num_epochs=100, batch_size=batch_size)
evaluate_model(transformer_model, test_data, test_labels, batch_size)
torch.save(transformer_model.state_dict(), 'data' + os.sep + 'transformer_model.pth')

Epoch [1/100], Loss: 0.6547, Accuracy: 0.5701
Epoch [2/100], Loss: 0.5133, Accuracy: 0.6376
Epoch [3/100], Loss: 0.4719, Accuracy: 0.6775
Epoch [4/100], Loss: 0.5312, Accuracy: 0.7039
Epoch [5/100], Loss: 0.4836, Accuracy: 0.7230
Epoch [6/100], Loss: 0.4194, Accuracy: 0.7378
Epoch [7/100], Loss: 0.4120, Accuracy: 0.7492
Epoch [8/100], Loss: 0.3400, Accuracy: 0.7588
Epoch [9/100], Loss: 0.3721, Accuracy: 0.7671
Epoch [10/100], Loss: 0.3356, Accuracy: 0.7744
Epoch [11/100], Loss: 0.3562, Accuracy: 0.7810
Epoch [12/100], Loss: 0.4042, Accuracy: 0.7870
Epoch [13/100], Loss: 0.3002, Accuracy: 0.7923
Epoch [14/100], Loss: 0.2824, Accuracy: 0.7975
Epoch [15/100], Loss: 0.3203, Accuracy: 0.8024
Epoch [16/100], Loss: 0.2772, Accuracy: 0.8073
Epoch [17/100], Loss: 0.2749, Accuracy: 0.8119
Epoch [18/100], Loss: 0.3094, Accuracy: 0.8163
Epoch [19/100], Loss: 0.2346, Accuracy: 0.8205
Epoch [20/100], Loss: 0.1999, Accuracy: 0.8245
Epoch [21/100], Loss: 0.2210, Accuracy: 0.8284
Epoch [22/100], Loss: 

In [36]:
def index2word(sentence_index):
    start_char = 1
    oov_char = 2
    index_from = 3
    
    word_index = imdb.get_word_index()
    inverted_word_index = dict(
        (i + index_from, word) for (word, i) in word_index.items()
    )
    
    inverted_word_index[start_char] = "[START]"
    inverted_word_index[oov_char] = "[OOV]"
    inverted_word_index[0] = "[PAD]"
    
    decoded_sequence = " ".join(inverted_word_index[i] for i in sentence_index)
    return decoded_sequence

def word2index(sentence_word):
    start_char = 1
    oov_char = 2
    index_from = 3
    
    word_index = imdb.get_word_index()
    inverted_word_index = dict(
        (word, i + index_from) for (word, i) in word_index.items()
    )
    inverted_word_index["[START]"] = start_char
    inverted_word_index["[OOV]"] = oov_char
    inverted_word_index["[PAD]"] = 0
    
    sentence_index = [inverted_word_index.get(word, 0) for word in sentence_word]
    return sentence_index

# 使用句子测试
def predict_sentiment(sentence, model, vocab_size=200, max_len=200, device='cpu', other_data=None):
    # 预处理句子
    sentence = sentence.lower().split(" ")
    sentence = word2index(sentence)
    sentence = torch.tensor(sentence).to(device)
    # 使用"[PAD]"填充
    if len(sentence) < max_len:
        sentence = torch.cat([sentence, torch.zeros(max_len-len(sentence), dtype=torch.long, device=device)])
    else:
        sentence = sentence[:max_len]
    sentence = pad_sequence([sentence], batch_first=True)
    # 拼接train_data
    sentence = torch.cat([other_data, sentence], dim=0)
    
    # 预测
    model.eval()
    with torch.no_grad():
        output = model(sentence)
        prediction = torch.round(torch.sigmoid(output))
    return prediction[-1].item()

print("Sample sentence: ")
print(index2word(test_data[0].cpu().numpy()))

Sample sentence: 
[START] please give this one a miss br br [OOV] [OOV] and the rest of the cast rendered terrible performances the show is flat flat flat br br i don't know how michael madison could have allowed this one on his plate he almost seemed to know this wasn't going to work out and his performance was quite [OOV] so all you madison fans give this a miss [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PAD] [PA

In [43]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
transformer_model = TransformerModel(vocab_size, embedding_dim, 8, hidden_dim, n_layers, output_dim, dropout, max_len=200)
transformer_model = transformer_model.to(device)
transformer_model.load_state_dict(torch.load('data' + os.sep + 'transformer_model.pth', map_location=device))
# 选择一部分数据作为其他数据, 避免单个样本
other_data = train_data[:255]

sentence = '[START] this movie is great I love it It is the best movie ever [OOV]'
prediction = predict_sentiment(sentence, transformer_model, device=device, other_data=other_data)
print(f'Sentence: {sentence}, Prediction: {"Positive" if prediction == 1 else "Negative"}')

sentence = '[START] It is terrible [OOV]'
prediction = predict_sentiment(sentence, transformer_model, device=device, other_data=other_data)
print(f'Sentence: {sentence}, Prediction: {"Positive" if prediction == 1 else "Negative"}')

sentence = '[START] I love this movie [OOV]'
prediction = predict_sentiment(sentence, transformer_model, device=device, other_data=other_data)
print(f'Sentence: {sentence}, Prediction: {"Positive" if prediction == 1 else "Negative"}')

sentence = '[START] It is not good enough [OOV]'
prediction = predict_sentiment(sentence, transformer_model, device=device, other_data=other_data)
print(f'Sentence: {sentence}, Prediction: {"Positive" if prediction == 1 else "Negative"}')

sentence = '[START] the director is a big fool [OOV]'
prediction = predict_sentiment(sentence, transformer_model, device=device, other_data=other_data)
print(f'Sentence: {sentence}, Prediction: {"Positive" if prediction == 1 else "Negative"}')

Sentence: [START] this movie is great I love it It is the best movie ever [OOV], Prediction: Positive
Sentence: [START] It is terrible [OOV], Prediction: Negative
Sentence: [START] I love this movie [OOV], Prediction: Positive
Sentence: [START] It is not good enough [OOV], Prediction: Negative
Sentence: [START] the director is a big fool [OOV], Prediction: Negative
