In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
import collections
import numpy as np
import nltk

In [2]:
# 统计 train.txt 中最长句子的长度 maxlen; 句子条数 num_recs; 每词的词频 word_freqs[]
maxlen = 0
word_freqs = collections.Counter()
num_recs = 0
with open('./data/train.txt', 'r', encoding='utf-8') as f:
    for line in f:
        label, sentence = line.strip().split("\t")
        words = nltk.word_tokenize(sentence.lower()) #逐句分词
        maxlen = max(maxlen, len(words))    #最长句子
        word_freqs.update(words)
        num_recs += 1

print('max_len ', maxlen)   #最长句子
print('nb_words ', len(word_freqs))#单词数，为了统计词典大小

max_len  42
nb_words  2270


In [3]:
# 生成单词表和数字表
MAX_FEATURES = 2000   #词典大小
MAX_SENTENCE_LENGTH = 40   #句子长度限定为40
vocab_size = min(MAX_FEATURES, len(word_freqs)) + 2  #加入填充词，和未知词

word2index = {x[0]: i + 2 for i, x in enumerate(word_freqs.most_common(MAX_FEATURES))}
word2index["PAD"] = 0
word2index["UNK"] = 1
index2word = {v: k for k, v in word2index.items()}

# 数据准备
class TextDataset(Dataset):
    def __init__(self, file_path):
        self.X = []
        self.y = []
        with open(file_path, 'r', encoding='utf-8') as f:
            for line in f:
                label, sentence = line.strip().split("\t")
                words = nltk.word_tokenize(sentence.lower())
                seqs = [word2index.get(word, word2index["UNK"]) for word in words]
                self.X.append(seqs)
                self.y.append(int(label))
        self.y = torch.tensor(self.y, dtype=torch.float32)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return torch.tensor(self.X[idx]), self.y[idx]

# 实例化数据集
dataset = TextDataset('./data/train.txt')

In [4]:
# 填充序列
def pad_sequences(sequences, maxlen):  #填充句子
    return [torch.tensor(seq + [word2index["PAD"]] * (maxlen - len(seq)) if len(seq) < maxlen else seq[:maxlen]) for seq in sequences]

Xtrain, Xtest, ytrain, ytest = train_test_split(dataset.X, dataset.y.numpy(), test_size=0.2, random_state=42)
Xtrain = pad_sequences(Xtrain, MAX_SENTENCE_LENGTH) #按40最大长度来填充
Xtest = pad_sequences(Xtest, MAX_SENTENCE_LENGTH)

# 创建 DataLoader，按batchsize分批
train_loader = DataLoader(list(zip(Xtrain, ytrain)), batch_size=32, shuffle=True)
test_loader = DataLoader(list(zip(Xtest, ytest)), batch_size=32, shuffle=False)

In [5]:
# 定义 RNN 模型
class RNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super(RNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.RNN(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        embedded = self.embedding(x)
        packed_output, hidden = self.rnn(embedded)#每个时间步的预测向量和隐向量
        output = self.fc(hidden[-1])
        return self.sigmoid(output)

# 初始化模型
EMBEDDING_SIZE = 128
HIDDEN_LAYER_SIZE = 64
model = RNN(vocab_size, EMBEDDING_SIZE, HIDDEN_LAYER_SIZE)

# 定义损失函数和优化器
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters())

# 训练模型
NUM_EPOCHS = 20
for epoch in range(NUM_EPOCHS):
    model.train()
    total_loss = 0
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs.squeeze(), labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f'Epoch {epoch + 1}, Loss: {total_loss / len(train_loader):.4f}')

Epoch 1, Loss: 0.6867
Epoch 2, Loss: 0.5002
Epoch 3, Loss: 0.2772
Epoch 4, Loss: 0.2414
Epoch 5, Loss: 0.2173
Epoch 6, Loss: 0.2006
Epoch 7, Loss: 0.1891
Epoch 8, Loss: 0.1617
Epoch 9, Loss: 0.1544
Epoch 10, Loss: 0.2410
Epoch 11, Loss: 0.1508
Epoch 12, Loss: 0.1552
Epoch 13, Loss: 0.1218
Epoch 14, Loss: 0.1157
Epoch 15, Loss: 0.1078
Epoch 16, Loss: 0.1123
Epoch 17, Loss: 0.1264
Epoch 18, Loss: 0.1034
Epoch 19, Loss: 0.0991
Epoch 20, Loss: 0.0905


In [6]:
# 测试模型
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs)
        predicted = (outputs.squeeze() >= 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
accuracy = correct / total
print(f'Test Accuracy: {accuracy:.4f}')

Test Accuracy: 0.9147


In [7]:
# 预测
INPUT_SENTENCES = ['I love reading.', 'You are so boring.']
XX = []

for sentence in INPUT_SENTENCES:
    words = nltk.word_tokenize(sentence.lower())
    seq = [word2index.get(word, word2index['UNK']) for word in words]
    XX.append(seq)

XX = pad_sequences(XX, MAX_SENTENCE_LENGTH)
labels = model(torch.stack(XX)).detach().numpy()
labels = [int(round(x[0])) for x in labels]

label2word = {1: '积极', 0: '消极'}
for i in range(len(INPUT_SENTENCES)):
    print('{}   {}'.format(label2word[labels[i]], INPUT_SENTENCES[i]))

积极   I love reading.
消极   You are so boring.
