In [None]:
import struct

import numpy as np 
import pandas as pd
import torch
#import torchvision 
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F 

from torch.utils.data import Dataset, DataLoader


### 处理mnist数据集

In [None]:
def process_mnist_ubyte(dataset_path):
    content = open(dataset_path, "rb").read()
    dtype = content[2]
    dimensions = content[3]
    shape = []
    for dim_idx in range(dimensions):
        chunk = content[4*(1+dim_idx): 4*(2+dim_idx)]
        shape.append(struct.unpack('>I', chunk)[0])
        
    data = np.array(list(content[(dim_idx+2)*4:]))
    data = data.reshape(*shape)

    return data
        

In [None]:
# 测试读取test数据文件
dataset_path = "../dataset/mnist/t10k-images-idx3-ubyte"
dataset = process_mnist_ubyte(dataset_path)
print("test datasets shape is: {}".format(dataset.shape))

# 测试读取test标签文件
dataset_path = "../dataset/mnist/t10k-labels-idx1-ubyte"
dataset = process_mnist_ubyte(dataset_path)
print("test labels shape is: {}".format(dataset.shape))

### 定义MnistDataSet数据集类

In [None]:
class Mnist(Dataset):
    def __init__(self):
        super(Mnist, self).__init__()
        train_data_path = "../dataset/mnist/train-images-idx3-ubyte"
        train_label_path = "../dataset/mnist/train-labels-idx1-ubyte"
        # 因为minist每一个像素都是无符号8位数据， 所以使用ByteTensor存储
        # / 255.0把数据压缩到 （0-1）
        train_data = process_mnist_ubyte(train_data_path) / 255.0
        train_data = np.expand_dims(train_data, 1)
        self.train_data = torch.FloatTensor(train_data)
        self.train_label = torch.LongTensor(process_mnist_ubyte(train_label_path))
        
    def __len__(self):
        return self.train_data.shape[0]
    
    def __getitem__(self, idx):
        return self.train_data[idx], self.train_label[idx]

### 定义Lenet5网络模型

In [None]:
class Lenet5(nn.Module):
    def __init__(self):
        super(Lenet5, self).__init__()
        self.conv1 = nn.Conv2d(1, 6, kernel_size=3)
        self.pool = nn.AvgPool2d(kernel_size=2)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=3)
        self.fc = nn.Linear(1936, 10)
        
    def forward(self, x):
        # 1x28x28 -> 6x26X26
        x = self.conv1(x)
        # 6x26x26 -> 6*13*13
        x = self.pool(x)
        x = F.relu(x)
        # 6x13x13 -> 16x11x11
        x = self.conv2(x)        
        x = x.view(-1, 1936)
        x = self.fc(x)
        
        return x 
        

In [None]:
net = Lenet5()
dataset = Mnist()
criterion = nn.CrossEntropyLoss()
optim = optim.SGD(params=net.parameters(), lr=0.01)

In [None]:
epochs = 100
data = DataLoader(dataset, batch_size=256, shuffle=True)

for epoch in range(1, epochs+1):
    for batch, mini_data in enumerate(data):
        x, y = mini_data
        output = net(x)
        optim.zero_grad()
        loss = criterion(output, y)
        loss.backward()
        optim.step()
    if epoch % 2 == 0:
        print("epoch {}, loss: {}".format(epoch, loss))

### 测试模型性能

In [None]:
dataset_path = "../dataset/mnist/t10k-images-idx3-ubyte"
test_data = process_mnist_ubyte(dataset_path) / 255.0
test_data = np.expand_dims(test_data, 1)
test_data = torch.FloatTensor(test_data)
print("test datasets shape is: {}".format(test_data.shape))

# 测试读取test标签文件
dataset_path = "../dataset/mnist/t10k-labels-idx1-ubyte"
test_label = process_mnist_ubyte(dataset_path)
#test_label = F.one_hot(test_label, 10)
print("test labels shape is: {}".format(test_label.shape)) 

In [None]:
y_predict = net(test_data)
values, indices = torch.max(y_predict, dim=1)
indices = indices.numpy()

In [None]:
correct_ratio = np.sum(indices == test_label) / float(len(test_label))
correct_ratio