In [51]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.cuda import device
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from xarray import Dataset

In [52]:
# 读取数据 csv->ndarray->tensor
train_df = pd.read_csv('mnist_data/mnist_train.csv')
test_df = pd.read_csv('mnist_data/mnist_test.csv')
x_ndarray_train = train_df.drop('label', axis=1).to_numpy()
x_ndarray_test = test_df.drop('label', axis=1).to_numpy()
y_ndarray_train = train_df['label'].to_numpy()
y_ndarray_test = test_df['label'].to_numpy()

x_tensor_train = torch.tensor(x_ndarray_train, dtype=torch.float)
x_tensor_train = x_tensor_train.view(-1, 1, 28, 28)
x_tensor_test = torch.tensor(x_ndarray_test, dtype=torch.float)
x_tensor_test = x_tensor_test.view(-1, 1, 28, 28)
y_tensor_train = torch.tensor(y_ndarray_train, dtype=torch.long)
y_tensor_test = torch.tensor(y_ndarray_test, dtype=torch.long)

In [53]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Sequential(
            #in_channels:灰度图 out_channels：输出多少特征图 kernel_size:卷积核大小 padding：填充 stride：步长
            nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, padding=1, stride=1), #输出特征(16,28,28)
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2), # 最大池化层操作，输出（16,14,14）
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, padding=1, stride=1),  # 输出（32,14,14）
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2), # 输出（32,7,7）
        )
        self.out = nn.Linear(32 * 7 * 7, 10) # 全连接层得到结果
        
    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        # 卷积层后，获得32*7*7
        x = x.view(x.size(0), -1)   # flatten操作，输出（batch_size,32*7*7)
        output = self.out(x)
        return output

In [54]:
def accuracy(predictions, labels):
    pred = torch.max(predictions, 1)[1]
    # 预测正确的样本数量
    rights = pred.eq(labels.data.view_as(pred)).sum()
    return rights,len(labels)

In [76]:
from torch.utils.data import TensorDataset, DataLoader

tensorDataset_train = TensorDataset(x_tensor_train, y_tensor_train)
tensorDataset_test = TensorDataset(x_tensor_test, y_tensor_test)
print(len(tensorDataset_train),len(tensorDataset_test)) 

batch_size = 64
dataLoader_train = DataLoader(tensorDataset_train, batch_size=batch_size, shuffle=True)
dataLoader_test = DataLoader(tensorDataset_test, batch_size=batch_size)
print(len(dataLoader_train),len(dataLoader_test))
print(len(dataLoader_train.dataset))

60000 10000
938 157
60000


In [78]:

num_classes = 10
num_epochs = 3

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
net = CNN().to(device)
#交叉熵损失函数
criterion = nn.CrossEntropyLoss()
#优化器
optimizer = optim.Adam(net.parameters(), lr=0.001)

for epoch in range(num_epochs):
    train_rights = []
    print(f'第{epoch+1}轮训练开始，请等待...')
    for batch, (data, target) in enumerate(dataLoader_train):
        data, target = data.to(device), target.to(device)
        net.train()
        output = net(data)
        loss = criterion(output, target)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        right = accuracy(output, target)
        right = (right[0].cpu(), right[1])
        train_rights.append(right)
        
        if (batch % 100 == 0 or batch+1 == len(dataLoader_train)):
            test_rights = []
            with torch.no_grad():
                net.eval()  
                for (data,target) in dataLoader_test:
                    data, target = data.to(device), target.to(device)
                    output = net(data)
                    right = accuracy(output, target)
                    right = (right[0].cpu(), right[1])
                    test_rights.append(right)
                #准确率计算
                train_rate = (sum(tup[0] for tup in train_rights), sum(tup[1] for tup in train_rights))
                test_rate = (sum(tup[0] for tup in test_rights), sum(tup[1] for tup in test_rights))
            
            print('--当前epoch:{} [{}/{} ({:.0f}%)]\t损失:{:.6f}\t训练准确率:({:.2f})%\t测试集正确率:{:.2f}%'.format(
                    epoch+1, batch+1, len(dataLoader_train),
                    100. * (batch+1) / len(dataLoader_train),
                    loss.data,
                    100. * train_rate[0].numpy() / train_rate[1],
                    100. * test_rate[0].numpy() / test_rate[1]))

print("训练完成,Done!")
                

第1轮训练开始，请等待...
第2轮训练开始，请等待...
第3轮训练开始，请等待...
训练完成,Done!
