In [None]:
import torch
from torch.utils.data import DataLoader
from torch.utils.data import Dataset

import pandas as pd
from sklearn.model_selection import train_test_split
import re
import nltk
import string
from collections import Counter
import numpy as np
from argparse import Namespace
import collections
import os

In [None]:
args = Namespace(
    # Raw Yelp CSVs; they are read with header=None and columns (rating, review)
    raw_train_dataset_csv="YOUR_PATH/train.csv",
    raw_test_dataset_csv="YOUR_PATH/test.csv",
    # Fraction of every rating class kept, to make experiments faster
    proportion_subset_of_train=0.1,
    # Per-class split proportions applied to that subset
    train_proportion=0.7,
    val_proportion=0.15,
    test_proportion=0.15,
    # NOTE(review): the to_csv cell below writes to a hardcoded path instead
    # of this name — confirm which output file is intended.
    output_munged_csv="reviews_with_splits_lite.csv",
    seed=1337,
)

In [None]:
# Raw training reviews: the file has no header row, so name the two columns.
train_reviews = pd.read_csv(args.raw_train_dataset_csv, header=None, names=['rating', 'review'])

# Keep the first `proportion_subset_of_train` fraction (10% by default) of
# each rating class so experiments run faster while class sizes stay in
# proportion.
by_rating = collections.defaultdict(list)
for _, row in train_reviews.iterrows():
    by_rating[row.rating].append(row.to_dict())

review_subset = []
for rating in sorted(by_rating):
    rows_for_rating = by_rating[rating]
    keep_count = int(args.proportion_subset_of_train * len(rows_for_rating))
    review_subset += rows_for_rating[:keep_count]

# list of row dicts -> DataFrame
review_subset = pd.DataFrame(review_subset)

In [None]:
# Split the subset into new train/val/test sets, stratified by rating.
by_rating = collections.defaultdict(list)
for _, row in review_subset.iterrows():
    by_rating[row.rating].append(row.to_dict())

final_list = []
np.random.seed(args.seed)  # makes the per-class shuffle below reproducible

for _, item_list in sorted(by_rating.items()):

    np.random.shuffle(item_list)  # shuffle this rating's reviews in place

    n_total = len(item_list)  # number of reviews with this rating
    n_train = int(args.train_proportion * n_total)
    n_val = int(args.val_proportion * n_total)
    # int() truncates, so train + val + int(test_proportion * n) can be less
    # than n_total. The test split therefore takes everything that is left
    # (≈ test_proportion); otherwise leftover rows would get no 'split' key,
    # become NaN in the DataFrame and be silently excluded from every split.

    # Tag every data point with its split
    for item in item_list[:n_train]:
        item['split'] = 'train'

    for item in item_list[n_train:n_train + n_val]:
        item['split'] = 'val'

    for item in item_list[n_train + n_val:]:
        item['split'] = 'test'

    final_list.extend(item_list)

final_reviews = pd.DataFrame(final_list)
assert final_reviews['split'].notna().all(), "every review must be assigned to a split"

In [None]:
def preprocess_text(text):
    """Normalise a review for whitespace tokenisation.

    Lower-cases the text, surrounds each of `. , ! ?` with spaces so they
    become separate tokens, then collapses every run of characters other than
    ASCII letters and those four punctuation marks into a single space.

    Args:
        text (str): raw review text
    Returns:
        str: the cleaned text
    """
    lowered = text.lower()
    punct_spaced = re.sub(r"([.,!?])", r" \1 ", lowered)
    return re.sub(r"[^a-zA-Z.,!?]+", r" ", punct_spaced)

final_reviews['review'] = final_reviews['review'].apply(preprocess_text)  # clean every review

In [None]:
# Map the raw numeric Yelp labels to readable class names.
# Values that are already mapped (e.g. when this cell is re-run) pass through
# unchanged; a plain `.map(mapping_dict)` would turn them all into NaN.
mapping_dict = {1 : 'Negative', 2 : 'Positive'}
final_reviews['rating'] = final_reviews['rating'].map(lambda r: mapping_dict.get(r, r))
assert final_reviews['rating'].isin(list(mapping_dict.values())).all(), "unexpected rating labels"

In [None]:
final_reviews.head(5)  # preview the first rows of the processed data

In [None]:
# Persist the processed reviews (with their split tags) for the training
# section, whose config reads this same path via `review_csv`.
final_reviews.to_csv(path_or_buf="YOUR_PATH/reviews_with_splits.csv", index=False)

In [None]:
class ReviewDataset(Dataset):
    """PyTorch Dataset over a review DataFrame that carries a `split` column.

    The DataFrame is partitioned into 'train', 'val' and 'test' views; the
    view chosen with `set_split` (initially 'train') is what `__len__` and
    `__getitem__` expose.
    """

    def __init__(self, review_df, vectorizer):
        """
        Args:
            review_df (pandas.DataFrame): the dataset; each row's `split`
                column ('train' / 'val' / 'test') decides its split
            vectorizer (ReviewVectorizer): vectorizer instantiated from the dataset
        """
        self.review_df = review_df
        self._vectorizer = vectorizer

        # Training split
        self.train_df = self.review_df[self.review_df.split == 'train']
        self.train_size = len(self.train_df)

        # Validation split
        self.val_df = self.review_df[self.review_df.split == "val"]
        self.validation_size = len(self.val_df)

        # Test split
        self.test_df = self.review_df[self.review_df.split == "test"]
        self.test_size = len(self.test_df)

        # split name -> (DataFrame, size), consulted by set_split
        self._lookup_dict = {'train': (self.train_df, self.train_size),
                             'val': (self.val_df, self.validation_size),
                             'test': (self.test_df, self.test_size)}
        self.set_split('train')

    @classmethod
    def load_dataset_and_make_vectorizer(cls, review_csv, cutoff=25):
        """Load the dataset and build a new vectorizer from scratch.

        The vectorizer (vocabulary and word-frequency cutoff) is fitted on the
        training split only, so words from the validation/test reviews do not
        leak into the features the model is evaluated with.

        Args:
            review_csv (str): location of the dataset
            cutoff (int): word-frequency cutoff passed to
                `ReviewVectorizer.from_dataframe` (25 matches its default)
        Returns:
            an instance of ReviewDataset
        """
        review_df = pd.read_csv(review_csv)
        train_review_df = review_df[review_df.split == 'train']
        return cls(review_df, ReviewVectorizer.from_dataframe(train_review_df, cutoff=cutoff))

    def get_vectorizer(self):
        """Return the vectorizer."""
        return self._vectorizer

    def set_split(self, split="train"):
        """Select which split subsequent `__len__`/`__getitem__` calls use.

        Args:
            split (str): one of "train", "val" or "test"
        Raises:
            KeyError: for any other split name
        """
        self._target_split = split
        self._target_df, self._target_size = self._lookup_dict[split]

    def __len__(self):
        """Return the size of the currently selected split."""
        return self._target_size

    def __getitem__(self, index):
        """Primary entry point for PyTorch datasets.

        Args:
            index (int): position of the data point within the current split
        Returns:
            dict with the vectorized review ('x_data') and the index of the
            row's rating in the rating vocabulary ('y_target')
        """
        row = self._target_df.iloc[index]
        review_vector = self._vectorizer.vectorize(row.review)
        rating_index = self._vectorizer.rating_vocab.lookup_token(row.rating)
        return {'x_data': review_vector,
                'y_target': rating_index}

    def get_num_batches(self, batch_size):
        """Return how many full batches of `batch_size` fit in the current split.

        A trailing partial batch is not counted.

        Args:
            batch_size (int)
        Returns:
            number of full batches in the current split
        """
        return len(self) // batch_size

In [None]:
class Vocabulary(object):
    """Class to process text and extract a token <-> index mapping."""

    def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
        """
        Args:
            token_to_idx (dict): a pre-existing token -> index mapping. It is
                copied, so tokens added later never leak back into the
                caller's dict.
            add_unk (bool): whether to add the UNK (unknown) token
            unk_token (str): the UNK token to add into the vocabulary
        """
        # Copy instead of aliasing: add_token mutates this dict, and the
        # caller's dict may be shared (e.g. the 'token_to_idx' entry of a
        # to_serializable() result handed to from_serializable()).
        self._token_to_idx = {} if token_to_idx is None else dict(token_to_idx)

        # Reverse mapping: index -> token
        self._idx_to_token = {idx: token for token, idx in self._token_to_idx.items()}

        self._add_unk = add_unk
        self._unk_token = unk_token

        # -1 marks "no UNK token"; replaced by the UNK's index when add_unk is True
        self.unk_index = -1
        if add_unk:
            self.unk_index = self.add_token(unk_token)

    def to_serializable(self):
        """Return a dict from which `from_serializable` can rebuild this vocabulary."""
        # Hand out a copy so callers cannot mutate the live mapping.
        return {'token_to_idx': dict(self._token_to_idx),
                'add_unk': self._add_unk,
                'unk_token': self._unk_token}

    @classmethod
    def from_serializable(cls, contents):
        """Instantiate a Vocabulary from a dict produced by `to_serializable`."""
        return cls(**contents)

    def add_token(self, token):
        """Add `token` to the mapping if it is new.

        Args:
            token (str): the item to add into the Vocabulary
        Returns:
            index (int): the integer index corresponding to the token
        """
        if token in self._token_to_idx:
            index = self._token_to_idx[token]
        else:
            index = len(self._token_to_idx)
            self._token_to_idx[token] = index
            self._idx_to_token[index] = token
        return index

    def lookup_token(self, token):
        """Return the index of `token`, or the UNK index if it is unknown.

        Args:
            token (str): the token to look up
        Returns:
            index (int): the index corresponding to the token
        Raises:
            KeyError: if the token is unknown and the vocabulary has no UNK
                token (add_unk=False)
        """
        if self._add_unk:
            return self._token_to_idx.get(token, self.unk_index)
        else:
            return self._token_to_idx[token]

    def lookup_index(self, index):
        """Return the token associated with `index`.

        Args:
            index (int): the index to look up
        Returns:
            token (str): the token corresponding to the index
        Raises:
            KeyError: if the index is not in the Vocabulary
        """
        if index not in self._idx_to_token:
            raise KeyError("索引 (%d) 不在词汇表中" % index)
        return self._idx_to_token[index]

    def __str__(self):
        return "<Vocabulary(size=%d)>" % len(self)

    def __len__(self):
        return len(self._token_to_idx)


In [None]:
class ReviewVectorizer(object):
    """The vectorizer that coordinates the vocabularies and puts them to use."""

    def __init__(self, review_vocab, rating_vocab):
        """
        Args:
            review_vocab (Vocabulary): maps words to integers
            rating_vocab (Vocabulary): maps class labels to integers
        """
        self.review_vocab = review_vocab
        self.rating_vocab = rating_vocab

    @staticmethod
    def _content_tokens(review):
        """Yield the space-separated tokens of `review` that are not punctuation.

        Non-string input (e.g. the NaN pandas produces for an empty CSV cell)
        yields nothing instead of raising AttributeError.
        """
        if not isinstance(review, str):
            return
        for token in review.split(" "):
            # Substring test on purpose: besides single punctuation marks it
            # also drops the "" tokens that leading/repeated spaces produce,
            # because "" is contained in every string.
            if token not in string.punctuation:
                yield token

    def vectorize(self, review):
        """Create a collapsed one-hot vector for a review.

        Args:
            review (str): the review text
        Returns:
            one_hot (np.ndarray): float32 vector of length len(review_vocab);
                1 at the index of every (non-punctuation) token present
        """
        one_hot = np.zeros(len(self.review_vocab), dtype=np.float32)
        for token in self._content_tokens(review):
            one_hot[self.review_vocab.lookup_token(token)] = 1
        return one_hot

    @classmethod
    def from_dataframe(cls, review_df, cutoff=25):
        """Instantiate the vectorizer from a dataset DataFrame.

        Args:
            review_df (pandas.DataFrame): the review dataset
            cutoff (int): a word is kept only if it occurs more than
                `cutoff` times
        Returns:
            an instance of ReviewVectorizer
        """
        review_vocab = Vocabulary(add_unk=True)
        rating_vocab = Vocabulary(add_unk=False)

        # Add ratings in sorted order so label indices are deterministic
        for rating in sorted(set(review_df.rating)):
            rating_vocab.add_token(rating)

        # Count words, then add those seen more than `cutoff` times
        word_counts = Counter()
        for review in review_df.review:
            word_counts.update(cls._content_tokens(review))

        for word, count in word_counts.items():
            if count > cutoff:
                review_vocab.add_token(word)

        return cls(review_vocab, rating_vocab)

    @classmethod
    def from_serializable(cls, contents):
        """Instantiate a ReviewVectorizer from a serializable dict.

        Args:
            contents (dict): the serializable dict
        Returns:
            an instance of the ReviewVectorizer class
        """
        review_vocab = Vocabulary.from_serializable(contents['review_vocab'])
        rating_vocab = Vocabulary.from_serializable(contents['rating_vocab'])
        return cls(review_vocab=review_vocab, rating_vocab=rating_vocab)

    def to_serializable(self):
        """Create the serializable dict for caching.

        Returns:
            contents (dict): the serializable dict
        """
        return {'review_vocab': self.review_vocab.to_serializable(),
                'rating_vocab': self.rating_vocab.to_serializable()}

In [None]:
def generate_batches(dataset, batch_size, shuffle=True,
                     drop_last=True, device="cpu"):
    """Wrap a PyTorch DataLoader and move every batch value onto `device`.

    Args:
        dataset (Dataset): the dataset to iterate over
        batch_size (int): number of items per batch
        shuffle (bool): whether the DataLoader shuffles the data
        drop_last (bool): whether to drop a final, incomplete batch
        device (str): target device, e.g. "cpu" or "cuda"
    Yields:
        dict: the collated batch with the same keys as the dataset items,
            every value moved with `.to(device)` (assumes each collated value
            is a tensor)
    """
    loader = DataLoader(dataset=dataset, batch_size=batch_size,
                        shuffle=shuffle, drop_last=drop_last)
    for batch in loader:
        yield {key: value.to(device) for key, value in batch.items()}

In [None]:
import torch.nn as nn
import torch.nn.functional as F

class ReviewClassifier(nn.Module):
    """A simple perceptron-based classifier: one linear layer producing one logit."""
    def __init__(self, num_features):
        """
        Args:
            num_features (int): the size of the input feature vector
        """
        super(ReviewClassifier, self).__init__()
        self.fc1 = nn.Linear(in_features=num_features,
                             out_features=1)

    def forward(self, x_in, apply_sigmoid=False):
        """The forward pass of the classifier.

        Args:
            x_in (torch.Tensor): an input data tensor of shape
                (batch, num_features)
            apply_sigmoid (bool): whether to apply the sigmoid activation;
                should be False when used with BCEWithLogitsLoss
        Returns:
            the resulting tensor, of shape (batch,)
        """
        # squeeze(-1) removes only the size-1 output dimension. A bare
        # squeeze() would also drop the batch dimension when batch == 1,
        # returning a 0-d tensor that no longer matches a (1,) loss target.
        y_out = self.fc1(x_in).squeeze(-1)
        if apply_sigmoid:
            y_out = torch.sigmoid(y_out)  # F.sigmoid is deprecated
        return y_out

In [None]:
# NOTE: this replaces the preprocessing `args` defined at the top.
# NOTE(review): frequency_cutoff, model_state_file, vectorizer_file and
# early_stopping_criteria are not read by any visible cell — confirm before
# relying on them.
args = Namespace(
    # Data and path information
    frequency_cutoff=25,
    model_state_file='model.pth',
    review_csv='YOUR_PATH/reviews_with_splits.csv',
    save_dir='model_storage/ch3/yelp/',
    vectorizer_file='vectorizer.json',
    # No model hyperparameters: the perceptron's input size comes from the vocabulary
    # Training hyperparameters
    batch_size=128,
    early_stopping_criteria=5,
    learning_rate=0.001,
    num_epochs=10,
    seed=1337,
    # Runtime options (the setup cell turns cuda off when no GPU is available)
    cuda=True,
)

In [None]:
def handle_dirs(dirpath):
    """Create `dirpath` (including missing parent directories) if it does not exist.

    Args:
        dirpath (str): directory to create; an existing directory is left as is
    """
    # exist_ok replaces the check-then-create pair, which can race with
    # another process creating the same directory in between.
    os.makedirs(dirpath, exist_ok=True)
        
def compute_accuracy(y_pred, y_target):
    """Percentage of binary predictions that match the targets.

    Args:
        y_pred (torch.Tensor): raw logits; a logit is predicted as class 1
            when sigmoid(logit) > 0.5
        y_target (torch.Tensor): 0/1 targets with the same number of elements
    Returns:
        float: accuracy in percent (0-100)
    """
    # Flatten both sides so a 0-d prediction (e.g. a squeezed batch of one)
    # is handled; len() raises TypeError on 0-d tensors.
    y_target = y_target.cpu().reshape(-1).long()
    y_pred_labels = (torch.sigmoid(y_pred) > 0.5).cpu().reshape(-1).long()
    n_correct = torch.eq(y_pred_labels, y_target).sum().item()
    return n_correct / y_pred_labels.numel() * 100

In [None]:
import torch.optim as optim

def make_train_state(args):
    """Build the dict that records training progress.

    Args:
        args (Namespace): run configuration (currently not read)
    Returns:
        dict: 'epoch_index' (current epoch), per-epoch histories
            'train_loss', 'train_acc', 'val_loss', 'val_acc' (empty lists),
            and 'test_loss' / 'test_acc' (-1 until the test set is evaluated)
    """
    # Each history gets its own fresh list.
    histories = {name: [] for name in ('train_loss', 'train_acc', 'val_loss', 'val_acc')}
    return {
        'epoch_index': 0,
        **histories,
        'test_loss': -1,
        'test_acc': -1,
    }

train_state = make_train_state(args)

# Use CUDA only if it was requested AND a GPU is actually available
if not torch.cuda.is_available():
    args.cuda = False
args.device = torch.device("cuda" if args.cuda else "cpu")

print("Using CUDA: {}".format(args.cuda))

# Seed torch's RNGs (model weight init, DataLoader shuffling) with the
# configured seed so training runs are reproducible.
torch.manual_seed(args.seed)
if args.cuda:
    torch.cuda.manual_seed_all(args.seed)

# Make sure the output directory exists
handle_dirs(args.save_dir)

# Dataset and vectorizer
dataset = ReviewDataset.load_dataset_and_make_vectorizer(args.review_csv)
vectorizer = dataset.get_vectorizer()

# Model: one input feature per vocabulary word
classifier = ReviewClassifier(num_features = len(vectorizer.review_vocab))
classifier = classifier.to(args.device)

# Loss and optimizer. BCEWithLogitsLoss takes raw logits, which is why the
# training loop calls the classifier without apply_sigmoid.
loss_func = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(classifier.parameters(), lr = args.learning_rate)

In [None]:
for epoch_index in range(args.num_epochs):
    # Same key that make_train_state initialises ('epoch_index')
    train_state["epoch_index"] = epoch_index

    # ---- Iterate over the training split ----
    # Setup: batch generator, zero running loss/accuracy, training mode
    dataset.set_split("train")
    batch_generator = generate_batches(dataset, batch_size=args.batch_size,
                                       device=args.device)
    running_loss = 0.0
    running_acc = 0.0
    classifier.train()

    for batch_index, batch_dict in enumerate(batch_generator):
        # The training routine has 5 steps

        # Step 1: zero the gradients
        optimizer.zero_grad()

        # Step 2: compute the output (raw logits)
        y_pred = classifier(x_in=batch_dict['x_data'].float())

        # Step 3: compute the loss
        loss = loss_func(y_pred, batch_dict["y_target"].float())
        loss_batch = loss.item()
        # incremental mean over the batches seen so far this epoch
        running_loss += (loss_batch - running_loss) / (batch_index + 1)

        # Step 4: use the loss to produce gradients
        loss.backward()

        # Step 5: use the optimizer to take a gradient step
        optimizer.step()

        # compute the accuracy
        acc_batch = compute_accuracy(y_pred, batch_dict["y_target"])
        running_acc += (acc_batch - running_acc) / (batch_index + 1)

    train_state["train_loss"].append(running_loss)
    train_state["train_acc"].append(running_acc)
    print("Epoch # {0} : Train Loss : {1} Train accuracy : {2}".format(epoch_index, running_loss, running_acc))

    # ---- Iterate over the validation split ----
    # Setup: batch generator, zero running loss/accuracy, eval mode
    dataset.set_split('val')
    batch_generator = generate_batches(dataset,
                                       batch_size=args.batch_size,
                                       device=args.device)
    running_loss = 0.0
    running_acc = 0.0
    classifier.eval()

    # Evaluation only: disable autograd so no computation graphs are built
    with torch.no_grad():
        for batch_index, batch_dict in enumerate(batch_generator):
            # Step 1: compute the output
            y_pred = classifier(x_in=batch_dict["x_data"].float())

            # Step 2: compute the loss
            loss = loss_func(y_pred, batch_dict['y_target'].float())
            loss_batch = loss.item()
            running_loss += (loss_batch - running_loss) / (batch_index + 1)

            # Step 3: compute the accuracy
            acc_batch = compute_accuracy(y_pred, batch_dict['y_target'])
            running_acc += (acc_batch - running_acc) / (batch_index + 1)

    train_state['val_loss'].append(running_loss)
    train_state['val_acc'].append(running_acc)
    print("Epoch # {0} : Val Loss : {1} Val accuracy : {2}".format(epoch_index, running_loss, running_acc))

In [None]:
# Evaluate the trained classifier on the held-out test split
dataset.set_split('test')
batch_generator = generate_batches(dataset,
                                   batch_size=args.batch_size,
                                   device=args.device)
running_loss = 0.  # running mean of the batch losses
running_acc = 0.   # running mean of the batch accuracies
classifier.eval()  # evaluation mode
# Inference only: disable autograd so no computation graphs are built
with torch.no_grad():
    for batch_index, batch_dict in enumerate(batch_generator):
        # compute the output (raw logits)
        y_pred = classifier(x_in=batch_dict['x_data'].float())
        # compute the loss
        loss = loss_func(y_pred, batch_dict['y_target'].float())
        loss_batch = loss.item()
        running_loss += (loss_batch - running_loss) / (batch_index + 1)
        # compute the accuracy
        acc_batch = compute_accuracy(y_pred, batch_dict['y_target'])
        running_acc += (acc_batch - running_acc) / (batch_index + 1)
# Record the test-set loss and accuracy
train_state['test_loss'] = running_loss
train_state['test_acc'] = running_acc

In [None]:
# Report the held-out metrics, one per line
print("Test loss: {:.3f}".format(train_state['test_loss']),
      "Test Accuracy: {:.2f}".format(train_state['test_acc']),
      sep="\n")

In [None]:
def predict_rating(review, classifier, vectorizer, decision_threshold=0.5):
    """Predict the rating of a single review.

    Args:
        review (str): the text of the review
        classifier (ReviewClassifier): the trained model
        vectorizer (ReviewVectorizer): the corresponding vectorizer
        decision_threshold (float): the numerical boundary that separates the
            rating classes
    Returns:
        the rating token with index 1 in `vectorizer.rating_vocab` when the
        predicted probability is >= decision_threshold, else the one with index 0
    """
    review = preprocess_text(review)
    # Build the input on the model's device: a CPU tensor fed to a model that
    # was moved to CUDA raises a device-mismatch error.
    device = next(classifier.parameters()).device
    vectorized_review = torch.tensor(vectorizer.vectorize(review), device=device)
    # Inference only, so no autograd graph is needed
    with torch.no_grad():
        result = classifier(vectorized_review.view(1, -1))
        probability_value = torch.sigmoid(result).item()
    index = 0 if probability_value < decision_threshold else 1
    return vectorizer.rating_vocab.lookup_index(index)

# Sanity-check the trained model on a hand-written review
test_review = "this is a pretty awesome book"
prediction = predict_rating(review=test_review, classifier=classifier, vectorizer=vectorizer)
print("{} -> {}".format(test_review, prediction))

In [None]:
# Rank vocabulary words by their weight in the single output unit. Large
# positive weights push the logit towards rating index 1 ('Positive', since
# the rating vocabulary is built from sorted labels).
# .cpu() is required: .numpy()/.tolist() conversions fail on a CUDA model.
fc1_weights = classifier.fc1.weight.detach()[0].cpu()
_, indices = torch.sort(fc1_weights, dim=0, descending=True)
indices = indices.tolist()
# Top 20 words (slicing also copes with a vocabulary smaller than 20)
print("Influential words in Positive Reviews:")
for word_index in indices[:20]:
    print(vectorizer.review_vocab.lookup_index(word_index))