In [22]:
import torch
torch.__version__

'2.0.1+cu118'

In [76]:
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# 定义数据预处理和增强
def get_transform():
    return transforms.Compose([
        transforms.Resize((256, 256)),  # 保持MNIST原始尺寸
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.5,), std=(0.5,))  # MNIST是灰度图像，所以只有一个通道
    ])

# 定义数据加载函数
def load_data(batch_size):
    # 获取数据预处理和增强
    transform = get_transform()

    # 加载MNIST数据集的训练集和测试集
    train_dataset = datasets.MNIST(
        root='./data',
        train=True,
        transform=transform,
        download=True  # 直接下载数据集
    )
    test_dataset = datasets.MNIST(
        root='./data',
        train=False,
        transform=transform,
        download=True  # 直接下载数据集
    )

    # 创建训练和测试数据加载器
    train_data_loader = DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True, num_workers=0
    )
    test_data_loader = DataLoader(
        test_dataset, batch_size=batch_size, shuffle=False, num_workers=0
    )

    return train_data_loader, test_data_loader

# 设置批量大小
batch_size = 50

# 加载数据
train_data_loader, test_data_loader = load_data(batch_size)  # 传递batch_size参数

In [None]:
import copy
import os
import random

import numpy as np
import torch
import torch.nn as nn
from torch import optim
from tqdm import tqdm

import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def setup_seed(seed):
    os.environ['PYTHONHASHSEED'] = str(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True

setup_seed(20)

class cnn(nn.Module):
    def __init__(self):
        super(cnn, self).__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
        )
        self.conv3 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
        )
        # 计算全连接层的输入尺寸
        # 输入尺寸: 28x28
        # 第一次卷积和池化后: 14x14
        # 第二次卷积和池化后: 7x7
        # 第三次卷积和池化后: 4x4
        # 全连接层输入尺寸: 4x4x64 = 1024
        self.fc1 = nn.Linear(1024, 64)
        self.fc2 = nn.Linear(64, 10)
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=1)
        self.out = nn.Linear(10, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = x.view(x.shape[0], -1)
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.softmax(self.out(x))
        x = F.log_softmax(x, dim=1)
        return x

def get_val_loss(model, Val):
    model.eval()
    criterion = nn.CrossEntropyLoss().to(device)
    val_loss = []
    for (data, target) in Val:
        data, target = data.to(device), target.long().to(device)
        output = model(data)
        loss = criterion(output, target)
        val_loss.append(loss.cpu().item())
    return np.mean(val_loss)

def load_data(batch_size):
    transform = transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.5,), std=(0.5,))
    ])

    train_dataset = datasets.MNIST(
        root='./data',
        train=True,
        transform=transform,
        download=True
    )
    test_dataset = datasets.MNIST(
        root='./data',
        train=False,
        transform=transform,
        download=True
    )

    train_data_loader = DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True, num_workers=0
    )
    test_data_loader = DataLoader(
        test_dataset, batch_size=batch_size, shuffle=False, num_workers=0
    )

    return train_data_loader, test_data_loader

def train():
    writer = SummaryWriter("log/")
    batch_size = 50
    train_data_loader, val_data_loader = load_data(batch_size)
    print('train...')
    epoch_num = 30
    best_model = None
    min_epochs = 5
    min_val_loss = float('inf')
    model = cnn().to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.0008)
    criterion = nn.CrossEntropyLoss().to(device)
    for epoch in tqdm(range(epoch_num), ascii=True):
        train_loss = []
        for batch_idx, (data, target) in enumerate(train_data_loader):
            data, target = data.to(device), target.long().to(device)
            model.train()
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            train_loss.append(loss.cpu().item())
        # validation
        val_loss = get_val_loss(model, val_data_loader)
        writer.add_scalar("val_loss", val_loss, epoch)
        if epoch + 1 > min_epochs and val_loss < min_val_loss:
            min_val_loss = val_loss
            best_model = copy.deepcopy(model)
        tqdm.write('Epoch {:03d} train_loss {:.5f} val_loss {:.5f}'.format(epoch, np.mean(train_loss), val_loss))
    torch.save(best_model.state_dict(), "model/cnn.pkl")

def test():
    batch_size = 50
    _, test_data_loader = load_data(batch_size)
    model = cnn().to(device)
    model.load_state_dict(torch.load("model/cnn.pkl"), False)
    total = 0
    correct = 0
    model.eval()
    for (data, target) in test_data_loader:
        data, target = data.to(device), target.to(device)
        outputs = model(data)
        _, predicted = torch.max(outputs.data, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()
    print('Accuracy:%d%%' % (100 * correct / total))

if __name__ == "__main__":
    train()
    test()

train...


  3%|3         | 1/30 [00:41<19:53, 41.16s/it]

Epoch 000 train_loss 1.66440 val_loss 1.58910


  7%|6         | 2/30 [01:24<19:52, 42.61s/it]

Epoch 001 train_loss 1.57834 val_loss 1.59800


In [82]:
train()
test()

ValueError: not enough values to unpack (expected 3, got 2)

In [93]:
def test_model(model_path, batch_size=50):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = cnn(in_channels=1).to(device)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()

    predictions = []
    labels = []
    with torch.no_grad():
        for data, target in load_data(batch_size):
            data, target = data.to(device), target.to(device)
            outputs = model(data)
            _, predicted = torch.max(outputs, 1)
            predictions.extend(predicted.cpu().numpy())
            labels.extend(target.cpu().numpy())

    # 计算准确率和F1分数
    accuracy = accuracy_score(labels, predictions)
    f1 = f1_score(labels, predictions, average='macro')
    print(f'Accuracy: {accuracy:.2f}%')
    print(f'F1 Score: {f1:.2f}')

    # 可视化
    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    plt.bar(range(10), np.bincount(labels, minlength=10))
    plt.title('True Distribution')
    plt.subplot(1, 2, 2)
    plt.bar(range(10), np.bincount(predictions, minlength=10))
    plt.title('Predicted Distribution')
    plt.show()

if __name__ == "__main__":
    test_model("model/cnn.pkl")

TypeError: __init__() got an unexpected keyword argument 'in_channels'

In [102]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
import matplotlib.pyplot as plt
import numpy as np

# 定义CNN模型结构
class cnn(nn.Module):
    def __init__(self):
        super(cnn, self).__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
        )
        self.conv3 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
        )
        # 计算全连接层的输入尺寸
        # 输入尺寸: 28x28
        # 第一次卷积和池化后: 14x14
        # 第二次卷积和池化后: 7x7
        # 第三次卷积和池化后: 4x4
        # 全连接层输入尺寸: 4x4x64 = 1024
        self.fc1 = nn.Linear(1024, 64)
        self.fc2 = nn.Linear(64, 10)
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=1)
        self.out = nn.Linear(10, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = x.view(x.shape[0], -1)
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.softmax(self.out(x))
        x = F.log_softmax(x, dim=1)
        return x

# 加载数据
def load_data(batch_size):
    transform = transforms.Compose([
        transforms.Resize((28, 28)),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))
    ])
    test_dataset = datasets.MNIST(root='./data', train=False, transform=transform, download=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0)
    return test_loader

# 测试模型性能
def test_model(model_path, batch_size=50):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = cnn().to(device)  # 使用您的模型类名
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()

    predictions = []
    labels = []
    with torch.no_grad():
        for data, target in load_data(batch_size):
            data, target = data.to(device), target.to(device)
            outputs = model(data)
            _, predicted = torch.max(outputs, 1)
            predictions.extend(predicted.cpu().numpy())
            labels.extend(target.cpu().numpy())

    # 计算性能指标
    accuracy = accuracy_score(labels, predictions)
    precision = precision_score(labels, predictions, average='macro')
    recall = recall_score(labels, predictions, average='macro')
    f1 = f1_score(labels, predictions, average='macro')

    print(f'Accuracy: {accuracy:.2f}')
    print(f'Precision: {precision:.2f}')
    print(f'Recall: {recall:.2f}')
    print(f'F1 Score: {f1:.2f}')

    # 可视化结果
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 3, 1)
    plt.bar(range(10), np.bincount(labels, minlength=10))
    plt.title('True Distribution')
    plt.subplot(1, 3, 2)
    plt.bar(range(10), np.bincount(predictions, minlength=10))
    plt.title('Predicted Distribution')
    plt.subplot(1, 3, 3)
    cm = confusion_matrix(labels, predictions)
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title('Confusion Matrix')
    plt.colorbar()
    plt.xticks(range(10))
    plt.yticks(range(10))
    plt.show()

# 调用测试函数
if __name__ == "__main__":
    test_model("model/cnn.pkl")

FileNotFoundError: [Errno 2] No such file or directory: 'model/cnn.pkl'

In [94]:
    accuracy = accuracy_score(labels, predictions)
    f1 = f1_score(labels, predictions, average='macro')
    print(f'Accuracy: {accuracy:.2f}%')
    print(f'F1 Score: {f1:.2f}')

NameError: name 'accuracy_score' is not defined

NameError: name 'labels' is not defined

In [None]:
import torch
import torch.nn as nn
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
import matplotlib.pyplot as plt
import numpy as np

# 确保CNN类定义与训练时相同
class CNN(nn.Module):
    # ...（省略模型定义，使用您之前定义的cnn类）

# 加载数据
def load_data(batch_size):
    transform = transforms.Compose([
        transforms.Resize((28, 28)),
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))
    ])
    test_dataset = datasets.MNIST(root='./data', train=False, transform=transform, download=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0)
    return test_loader

# 测试模型性能
def test_model(model_path, batch_size=50):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = CNN().to(device)  # 实例化模型
    model.load_state_dict(torch.load(model_path, map_location=device))  # 加载模型权重
    model.eval()  # 设置为评估模式

    predictions = []
    labels = []
    with torch.no_grad():  # 在测试阶段不计算梯度
        for data, target in load_data(batch_size):
            data, target = data.to(device), target.to(device)
            outputs = model(data)
            _, predicted = torch.max(outputs, 1)
            predictions.extend(predicted.cpu().numpy())
            labels.extend(target.cpu().numpy())

    # 计算性能指标
    accuracy = accuracy_score(labels, predictions)
    precision = precision_score(labels, predictions, average='macro')
    recall = recall_score(labels, predictions, average='macro')
    f1 = f1_score(labels, predictions, average='macro')

    print(f'Accuracy: {accuracy:.2f}')
    print(f'Precision: {precision:.2f}')
    print(f'Recall: {recall:.2f}')
    print(f'F1 Score: {f1:.2f}')

    # 可视化结果
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 3, 1)
    plt.bar(range(10), np.bincount(labels, minlength=10))
    plt.title('True Distribution')
    plt.subplot(1, 3, 2)
    plt.bar(range(10), np.bincount(predictions, minlength=10))
    plt.title('Predicted Distribution')
    plt.subplot(1, 3, 3)
    plt.bar(range(10), np.bincount(np.array(predictions) - np.array(labels), minlength=10))
    plt.title('Confusion Matrix')
    plt.show()

# 调用测试函数
if __name__ == "__main__":
    test_model("model/cnn.pkl")