In [1]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import re
import nltk
from sklearn.model_selection import train_test_split
from collections import Counter
from torch.nn.utils.rnn import pad_sequence
import time

In [2]:
df = pd.read_csv("IMDB Dataset.csv")

In [3]:
# 2. 分词
def tokenize(text):
    text = re.sub(r"<.*?>", "", text)
    return text.lower().split()

df['tokens'] = df['review'].apply(tokenize)

In [4]:
# 3. 构建词表
all_tokens = [word for tokens in df['tokens'] for word in tokens]
vocab = ['<PAD>', '<UNK>'] + [word for word, _ in Counter(all_tokens).most_common(10000)]
word2idx = {word: idx for idx, word in enumerate(vocab)}


In [5]:
# 4. 编码文本和标签
def encode(tokens):
    return [word2idx.get(word, word2idx['<UNK>']) for word in tokens]

df['encoded'] = df['tokens'].apply(encode)
df['label'] = df['sentiment'].map({'positive': 1, 'negative': 0})


In [6]:
# 5. 划分训练/测试集
X_train, X_test, y_train, y_test = train_test_split(
    df['encoded'], df['label'], test_size=0.2, random_state=42)

In [7]:
# 6. 自定义Dataset
class IMDBDataset(Dataset):
    def __init__(self, texts, labels):
        self.texts = [torch.tensor(x, dtype=torch.long) for x in texts]
        self.labels = torch.tensor(labels.values, dtype=torch.float)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.texts[idx], self.labels[idx]

def collate_fn(batch):
    texts, labels = zip(*batch)
    texts_padded = pad_sequence(texts, batch_first=True, padding_value=0)
    return texts_padded, torch.tensor(labels)

train_ds = IMDBDataset(X_train, y_train)
test_ds = IMDBDataset(X_test, y_test)
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_ds, batch_size=32, collate_fn=collate_fn)

In [None]:

class BiRNN(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size):
        super(BiRNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size, padding_idx=0)
        self.rnn = nn.RNN(embed_size, hidden_size, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden_size * 2, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        embedded = self.embedding(x)
        output, _ = self.rnn(embedded)
        out = self.fc(output[:, -1, :])
        return self.sigmoid(out).squeeze()

In [9]:
class BiLSTM(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size):
        super(BiLSTM, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size, padding_idx=0)
        self.lstm = nn.LSTM(embed_size, hidden_size, batch_first=True, bidirectional=True)
        #单项
        #self.fc = nn.Linear(hidden_size, 1)  # 不乘2了
        #双向
        self.fc = nn.Linear(hidden_size * 2, 1)  # 双向所以乘以2
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        embedded = self.embedding(x)                        # [batch_size, seq_len, embed_dim]
        output, (hn, cn) = self.lstm(embedded)              # output: [batch_size, seq_len, hidden_dim*2]
        #双向
        final_feature = torch.cat((hn[-2,:,:], hn[-1,:,:]), dim=1)  # [batch_size, hidden_dim*2]
        out = self.fc(final_feature)                        # [batch_size, 1]
        return self.sigmoid(out).squeeze()
        #单向
        #final_feature = hn[-1, :, :]  # shape: [batch_size, hidden_dim]
        #out = self.fc(final_feature)
        #return self.sigmoid(out).squeeze()

In [10]:
# 8. 训练模型
start_time = time.time()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = BiLSTM(len(word2idx), 64, 64).to(device)
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

for epoch in range(10):
    model.train()
    total_loss = 0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}, Loss: {total_loss/len(train_loader):.4f}")
end_time = time.time()

Epoch 1, Loss: 0.6517
Epoch 2, Loss: 0.5656
Epoch 3, Loss: 0.4601
Epoch 4, Loss: 0.3604
Epoch 5, Loss: 0.3222
Epoch 6, Loss: 0.2955
Epoch 7, Loss: 0.2336
Epoch 8, Loss: 0.1975
Epoch 9, Loss: 0.1663
Epoch 10, Loss: 0.1397


In [11]:
print(f"训练总耗时: {end_time - start_time:.2f} 秒")

训练总耗时: 79.45 秒


In [12]:
torch.save(model.state_dict(), 'bidirection_lstm_model.pth')

In [13]:
# 9. 评估模型
model.eval()
correct = total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        preds = (outputs > 0.5).float()
        correct += (preds == labels).sum().item()
        total += labels.size(0)
print(f"Test Accuracy: {correct / total * 100:.2f}%")

Test Accuracy: 88.34%
