In [None]:
!pip install torch==1.11.0 torchvision==0.12.0 tensorboard==2.11.0 tensorboardX matplotlib torchtext torchdata

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import numpy as np
import matplotlib.pyplot as plt
import torchtext
import torchdata
from torchtext.datasets import IMDB
from torchtext.data.utils import get_tokenizer

# 卷积神经网络

In [None]:
class MyCNN(nn.Module):
    def __init__(self):
        super(MyCNN, self).__init__()
        #TODO

    def forward(self, x):
        #TODO


In [None]:
# 数据预处理
transform = transforms.Compose([
    transforms.Resize((224, 224)), 
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# 加载数据集
train_dataset = torchvision.datasets.OxfordIIITPet(root='./data', split='trainval', target_types='category',  download=True, transform=transform)
test_dataset = torchvision.datasets.OxfordIIITPet(root='./data', split='test', target_types='category', download=True, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [None]:
# 初始化模型、损失函数和优化器
model = MyCNN()
criterion = # TODO
optimizer = optim.Adam(model.parameters(), lr=0.001)


In [None]:
# 训练模型
num_epochs = 5
for epoch in range(num_epochs):
    running_loss = 0.0
    for i, (inputs, labels) in enumerate(train_loader):
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        if (i+1) % 10 == 0:
            print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, i+1, len(train_loader), running_loss / 100))
            running_loss = 0.0

In [None]:
# 在测试集上评估模型
model.eval()

confusion_matrix = np.zeros((len(test_dataset.classes), len(test_dataset.classes)))

correct = 0
total = 0


with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        #TODO: 填写confusion matrix

accuracy = correct / total
print('Accuracy on the test set: {:.2f}%'.format(100 * accuracy))

# 绘制混淆矩阵
plt.figure(figsize=(10, 8))
plt.imshow(confusion_matrix, interpolation='nearest', cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.colorbar()
tick_marks = np.arange(len(test_dataset.classes))
plt.xticks(tick_marks, test_dataset.classes, rotation=45)
plt.yticks(tick_marks, test_dataset.classes)
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()



# 循环神经网络

In [None]:
train_iter = iter(IMDB(split="train"))
next(train_iter)

In [None]:
next(train_iter)

In [None]:
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

tokenizer = get_tokenizer("basic_english")

train_iter = IMDB(split='train')
def yield_tokens(data_iter):
    for _, text in data_iter:
        yield tokenizer(text)


vocab = build_vocab_from_iterator(yield_tokens(train_iter), specials=["<unk>"])
vocab.set_default_index(vocab["<unk>"])

In [None]:
text_pipeline = lambda x: vocab(tokenizer(x))
label_pipeline = lambda x: int(x=='pos')

In [None]:
label_pipeline('neg')

In [None]:
import torchtext.functional as F

def collate_batch(batch):
    
    label_list, text_list = [], []
    for _label, _text in batch:
        label_list.append(label_pipeline(_label))
        processed_text = text_pipeline(_text)
        text_list.append(processed_text)
        
    label_list = torch.tensor(label_list, dtype=torch.int64)
    
    text_list = #TODO: padding，确保同一batch内sequence长度一致
    
    return label_list, text_list


In [None]:
from torchtext.data.functional import to_map_style_dataset

train_iter, test_iter = IMDB()
train_dataset = to_map_style_dataset(train_iter)
test_dataset = to_map_style_dataset(test_iter)

train_dataloader = DataLoader(
    train_dataset, batch_size=64, shuffle=True, collate_fn=collate_batch
)
test_dataloader = DataLoader(
    test_dataset, batch_size=64, shuffle=False, collate_fn=collate_batch
)

In [None]:
class MyRNN(nn.Module):
    def __init__(self, input_dim, embedding_dim, hidden_dim, output_dim):
        super().__init__()
        #TODO 定义神经网络参数
        self.embedding = nn.Embedding(input_dim, embedding_dim)
        self.rnn = ...
        ...
        
    def forward(self, text):
        # TODO
        # 定义神经网络前向计算

In [None]:
#TODO
model = MyRNN(...)
optimizer = optim.SGD(model.parameters(), lr=1e-3)
criterion = #TODO


In [None]:
# 训练函数
def train(model, dataloader):
    model.train()
    total_acc, total_count = 0, 0
    log_interval = 50
    

    for idx, (label, text) in enumerate(dataloader):
        optimizer.zero_grad()
        # print(text.shape)
        predicted_label = model(text)
        # print(predicted_label.shape, label.shape)
        loss = criterion(predicted_label, label.float())
        loss.backward()
        optimizer.step()
        total_acc += (torch.round(predicted_label)== label).sum().item()
        total_count += label.size(0)
        if idx % log_interval == 0 and idx > 0:
            print(
                "| epoch {:3d} | {:5d}/{:5d} batches "
                "| accuracy {:8.3f}".format(
                    epoch, idx, len(dataloader), total_acc / total_count
                )
            )
            total_acc, total_count = 0, 0

# 评估函数
def evaluate(model, dataloader):
    model.eval()
    total_acc, total_count = 0, 0

    with torch.no_grad():
        for idx, (label, text) in enumerate(dataloader):
            predicted_label = model(text)
            loss = criterion(predicted_label, label)
            total_acc += (torch.round(predicted_label) == label).sum().item()
            total_count += label.size(0)
    return total_acc / total_count

In [None]:
num_epochs = 5
for epoch in range(5):
    train(model, train_dataloader)

In [None]:
#TODO
evaluate(...)