# Preparation

### Load the MINIST dataset

In [109]:
import cupy as np
import numpy as np2
import gc
from struct import unpack
import gzip
import matplotlib.pyplot as plt
import random
from PIL import Image, ImageEnhance
from typing import Tuple

def read(filepath, show=False):
    with gzip.open(filepath, 'rb') as f:
        if show:
            magic, num, rows, cols = unpack('>4I', f.read(16))
            print('magic\t\t', magic)
            print('num\t\t', num)
            print('rows\t\t', rows)
            print('cols\t\t', cols)
        content=np.frombuffer(f.read(), dtype=np.uint8)
    return content

print('Train*****************************')
train_imgs = read(r'dataset\MNIST\train-images-idx3-ubyte.gz', show=True).reshape(-1, 28, 28)
train_labels = read(r'dataset\MNIST\train-labels-idx1-ubyte.gz')

print('Test******************************')
test_imgs = read(r'dataset\MNIST\t10k-images-idx3-ubyte.gz', show=True).reshape(-1, 28, 28)
test_labels = read(r'dataset/MNIST/t10k-labels-idx1-ubyte.gz')

# X_train, y_train = mnist_reader.load_mnist('fashion_mnist/data/fashion', kind='train')
# X_test, y_test = mnist_reader.load_mnist('fashion_mnist/data/fashion', kind='t10k')


Train*****************************
magic		 2051
num		 60000
rows		 28
cols		 28
Test******************************
magic		 2051
num		 10000
rows		 28
cols		 28


In [110]:
train_labels = train_labels[-60000:]
test_labels = test_labels[-10000:]
print('train images shape\t', train_imgs.shape)
print('train labels shape\t', train_labels.shape)
print('test images shape\t', test_imgs.shape)
print('test labels shape\t', test_labels.shape)
print(train_labels[0:10])

train images shape	 (60000, 28, 28)
train labels shape	 (60000,)
test images shape	 (10000, 28, 28)
test labels shape	 (10000,)
[5 0 4 1 9 2 1 3 1 4]


### Import package MyDL which is written based on Numpy

In [111]:
import MyDL
import MyDL.data
import MyDL.optimizer as optim
import MyDL.nn as nn

# MLP model

In [112]:
class mnist_dataset(MyDL.data.Dataset):
    def __init__(self, images:MyDL.MyTensor, labels, augment=False, augment_prob=0.5, unfold=False):
        self.images = images
        self.labels = labels
        self.augment = augment
        self.augment_prob = augment_prob
        self.unfold = unfold

    def __len__(self):
        return len(self.images)

    def __getitem__(self, index):
        images = self.images[index]
        labels = self.labels[index]

        # Data Augmentation
        if self.augment:
            images = np.asnumpy(images.data)
            assert images.dtype == np.uint8, "Image dtype must be uint8 for augmentation."
            hflip_prob = 0.5
            rotate_degrees = 15
            brightness_range = (0.8, 1.2)

            n, c, h, w = images.shape
            augmented_images_list = []
            original_dtype = images.dtype

            for i in range(n):
                do_augentation = random.random() < self.augment_prob
                if not do_augentation:
                    augmented_images_list.append(images[i])
                    continue

                single_image_chw = images[i]  #(c, h, w)

                img_np_hw = single_image_chw[0]  # (h, w)

                pil_mode = 'L'

                img_pil = Image.fromarray(img_np_hw, mode=pil_mode)

                # --- Augmentation ---
                # Flipping
                if random.random() < hflip_prob:
                    img_pil = img_pil.transpose(Image.FLIP_LEFT_RIGHT)
                # Rotating
                if rotate_degrees > 0:
                    angle = random.uniform(-rotate_degrees, rotate_degrees)
                    # 使用 BICUBIC 插值获得更好的旋转质量
                    fill_value = (0,) * c
                    img_pil = img_pil.rotate(angle, resample=Image.BICUBIC, expand=False, fillcolor=fill_value)
                # Brightness Adjustment
                if brightness_range != (1.0, 1.0):
                    factor = random.uniform(brightness_range[0], brightness_range[1])
                    enhancer = ImageEnhance.Brightness(img_pil)
                    img_pil = enhancer.enhance(factor)
                # Contrast Adjustment
                if random.random() < 0.3:
                    contrast_factor = random.uniform(0.8, 1.2)
                    enhancer = ImageEnhance.Contrast(img_pil)
                    img_pil = enhancer.enhance(contrast_factor)
                # Color Adjustment
                if random.random() < 0.3:
                    color_factor = random.uniform(0.8, 1.2)
                    enhancer = ImageEnhance.Color(img_pil)
                    img_pil = enhancer.enhance(color_factor)

                augmented_img_np_hwc = np.expand_dims(np.array(img_pil), axis=0)  # (1, h, w)
                if augmented_img_np_hwc.shape != (h, w, c):  # shape should not change if everything is correct
                    augmented_images_list.append(single_image_chw)
                    continue
                augmented_img_np_chw = np.transpose(augmented_img_np_hwc, (2, 0, 1))  # (c, h, w)
                augmented_images_list.append(augmented_img_np_chw)

            # images = np.stack(augmented_images_list, axis=0)
            if self.unfold:
                images = images.reshape(-1, 28 * 28)
            images = MyDL.MyTensor(np.array(images), requires_grad=False)
        
        else:
            images = images.data
            if self.unfold:
                images = images.reshape(-1, 28 * 28)
            images = MyDL.MyTensor(np.array(images), requires_grad=False)

        return images, labels

### Data preparation

In [113]:
train_imgs = train_imgs.reshape(-1, 1, 28, 28)
test_imgs = test_imgs.reshape(-1, 1, 28, 28)

X_train_mytensor = MyDL.MyTensor(train_imgs[:50000], requires_grad=False)
y_train_mytensor = MyDL.MyTensor(train_labels[:50000], requires_grad=False)
X_val_mytensor = MyDL.MyTensor(train_imgs[50000:], requires_grad=False)
y_val_mytensor = MyDL.MyTensor(train_labels[50000:], requires_grad=False)
X_test_mytensor = MyDL.MyTensor(test_imgs, requires_grad=False)
y_test_mytensor = MyDL.MyTensor(test_labels, requires_grad=False)

train_data = mnist_dataset(X_train_mytensor, y_train_mytensor, augment=True, augment_prob=0.5, unfold=True)
val_data = mnist_dataset(X_val_mytensor, y_val_mytensor)
test_data = mnist_dataset(X_test_mytensor, y_test_mytensor)

### Network Structure

In [114]:
class MLP3(nn.NeuralNetwork):
    def __init__(self, hidden_size1=100, hidden_size2=10, activation='relu'):
        super().__init__()
        self.hidden_size1 = hidden_size1
        self.hidden_size2 = hidden_size2
        self.activ_func = activation
        self.fc1 = nn.Linear(784, hidden_size1, initialize='random')
        self.dropout1 = nn.Dropout(0.5)
        self.fc2 = nn.Linear(hidden_size1, hidden_size2, initialize='random')
        self.dropout2 = nn.Dropout(0.5)
        self.fc3 = nn.Linear(hidden_size2, 10, initialize='random')
        if activation == 'relu':
            self.activation = nn.ReLU()
        elif activation == 'tanh':
            self.activation = nn.Tanh()
        else:
            raise ValueError('Unknown activation function')
        self.softmax = nn.Softmax()
        self.BN1 = nn.BatchNorm1d(784)
        self.BN2 = nn.BatchNorm1d(hidden_size1)
        self.BN3 = nn.BatchNorm1d(hidden_size2)
    def forward(self, x):
        x = self.BN1(x)
        x = self.fc1(x)
        x = self.BN2(x)
        x = self.activation(x)
        x = self.dropout1(x)
        x = self.fc2(x)
        x = self.BN3(x)
        x = self.activation(x)
        x = self.dropout2(x)
        x = self.fc3(x)
        x = self.softmax(x)
        return x

### Train model and Search the best hyperparameters

In [None]:
continue_if_exists = False
highest_val_acc = 0
num_epochs = 30
for activ_func in ['relu']:#, 'tanh']:
    for hidden_size1, hidden_size2 in [(600, 100)]:#, (10, 10)]:
        for lambda_L2 in [0.0]:#, 0.0001, 0.001]:
            for lr in [0.05]:#, 0.01, 0.001]:
                model = None
                gc.collect()

                model_name = 'MLP3_({},{})_{}_L2-{}_lr-{}_augment={}'.format(hidden_size1, hidden_size2, activ_func, lambda_L2, lr, train_data.augment)
                print(f'model: {model_name}')

                model = MLP3(hidden_size1=hidden_size1, hidden_size2=hidden_size2, activation=activ_func)
                criterion = nn.CrossEntropyLoss()
                optimizer = optim.Adam(model.params, lr=lr, decay_rate=0.2)

                result = MyDL.train(model, criterion, optimizer, train_data, val_data, num_epochs=num_epochs, batch_size=256, lambda_L2=lambda_L2, result_path='figure/results', model_path='figure/model_params', model_name=model_name, continue_if_exists=continue_if_exists, calc_val_loss_every_iteration=False)

                MyDL.save_result(model_name=model_name, result_dict=result, path='figure/results')
                if result['val_acc_epoch'][-1] > highest_val_acc:
                    highest_val_acc = result['val_acc_epoch'][-1]
                    best_model_name = model_name
                    best_hyperparams = (hidden_size1, hidden_size2, activ_func, lambda_L2, lr)

model: MLP3_(600,100)_relu_L2-0.0_lr-0.05_augment=True
Model already exists. Loading model from figure/model_params/MLP3_(600,100)_relu_L2-0.0_lr-0.05.npz...
Model loaded successfully.
Model is not going to be trained further as continue_if_exists is set to False.

Results saved to figure/results\MLP3_(600,100)_relu_L2-0.0_lr-0.05_augment=True.npz.


### Display the best model

In [116]:
import os
print(f'Best model: {best_model_name}')
with np.load(os.path.join('figure/results', f'{best_model_name}.npz')) as result:
    train_loss = result['train_loss_epoch']
    val_loss = result['val_loss_epoch']
    train_acc = result['train_acc_epoch']
    val_acc = result['val_acc_epoch']
print(f'Train loss: {train_loss[-1]:.3}  Val loss: {val_loss[-1]:.3}  Train acc: {train_acc[-1]:.3}  Val acc: {val_acc[-1]:.3}')
print(f'Hyperparameters: {best_hyperparams}')

Best model: MLP3_(600,100)_relu_L2-0.0_lr-0.05_augment=True
Train loss: 0.105  Val loss: 0.108  Train acc: 0.966  Val acc: 0.975
Hyperparameters: (600, 100, 'relu', 0.0, 0.05)


# ResNet

### Data Preparation

In [117]:
X_train_mytensor = MyDL.MyTensor(train_imgs[:50000], requires_grad=False)
y_train_mytensor = MyDL.MyTensor(train_labels[:50000], requires_grad=False)
X_val_mytensor = MyDL.MyTensor(train_imgs[50000:], requires_grad=False)
y_val_mytensor = MyDL.MyTensor(train_labels[50000:], requires_grad=False)
X_test_mytensor = MyDL.MyTensor(test_imgs, requires_grad=False)
y_test_mytensor = MyDL.MyTensor(test_labels, requires_grad=False)

train_data = mnist_dataset(X_train_mytensor, y_train_mytensor, augment=True, augment_prob=0.5, unfold=False)
val_data = mnist_dataset(X_val_mytensor, y_val_mytensor)
test_data = mnist_dataset(X_test_mytensor, y_test_mytensor)

### Network Structure

In [118]:
# Define the residual block
class ResiduleBlock(nn.NeuralNetwork):
    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2D(in_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2D(out_channels, out_channels, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # When channel number not match or stride != 1, use 1x1 conv before jump connect
        self.cross_block = False
        if stride != 1 or in_channels != out_channels:
            self.cross_block = True
            self.conv_shortcut = nn.Conv2D(in_channels, out_channels, kernel_size=1,
                                           padding=0, stride=stride, bias=False)
            self.bn_shortcut = nn.BatchNorm2d(out_channels)
    
    def forward(self, x):
        out = nn.ReLU.forward(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        if self.cross_block:
            x = self.conv_shortcut(x)
            x = self.bn_shortcut(x)
        out = out + x  # 残差连接
        out = nn.ReLU.forward(out)
        return out
    
class ResNetMNIST(nn.NeuralNetwork):
    """
    input: (batch, 1, 28, 28)
    ↓
    Conv 3x3, 16 channels + BN + ReLU
    ↓
    ResiduleBlockx2 (channel = 16, stride=1)
    ↓
    ResiduleBlockx2 (channel = 32, stride=2)
    ↓
    ResiduleBlockx2 (channel = 64, stride=2)
    ↓
    Average Polling (len 64 vector)
    ↓
    FC layer (64 -> 10)
    ↓
    out: (10)
    """
    def __init__(self, block, num_classes=10):
        super(ResNetMNIST, self).__init__()
        self.in_channels = 16

        self.conv = nn.Conv2D(1, 16, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn = nn.BatchNorm2d(16)
        self.layer1 = self._make_layer(block, 16, 2, stride=1)
        self.layer2 = self._make_layer(block, 32, 2, stride=2)
        self.layer3 = self._make_layer(block, 64, 2, stride=2)
        self.avg_pool = nn.FullAveragePool2d()
        # self.dropout = nn.Dropout(0.5)
        self.fc = nn.Linear(64, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride):
        strides = [stride] + [1] * (blocks - 1)
        layers = []
        for s in strides:
            layers.append(block(self.in_channels, out_channels, s))
            self.in_channels = out_channels
        return nn.Sequential(*layers)

    def forward(self, x):
        out = nn.ReLU.forward(self.bn(self.conv(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.avg_pool(out)  # (batch, c)
        # out = self.dropout(out)  # (batch, c)
        out = self.fc(out)
        out = nn.Softmax.forward(out)
        return out

### Train model and Search the best hyperparameters

In [119]:
continue_if_exists = False
highest_val_acc = 0
num_epochs = 20
for lambda_L2 in [0.0]:#, 0.0001, 0.001]:
    for lr in [0.05]:#, 0.01, 0.001]:
        model = None
        gc.collect()
        
        model_name = 'Resnet_L2-{}_lr-{}'.format(lambda_L2, lr)
        print(f'model: {model_name}')
        model = ResNetMNIST(ResiduleBlock)
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.params, lr=lr, decay_rate=0.2)
        result = MyDL.train(model, criterion, optimizer, 
                            train_data, val_data, num_epochs=num_epochs, 
                            batch_size=128, lambda_L2=lambda_L2, 
                            result_path='figure/results', 
                            model_path='figure/model_params', 
                            model_name=model_name, 
                            continue_if_exists=continue_if_exists, calc_val_loss_every_iteration=False)
        if not (not continue_if_exists and result['continued_train']):
            MyDL.save_result(model_name=model_name, result_dict=result, path='figure/results')
        if result['val_acc_epoch'][-1] > highest_val_acc:
            highest_val_acc = result['val_acc_epoch'][-1]
            best_model_name = model_name
            best_hyperparams = (lambda_L2, lr)

model: Resnet_L2-0.0_lr-0.05
iter 50	 loss 2.426
iter 100	 loss 2.275
iter 150	 loss 2.746
iter 200	 loss 2.119
iter 250	 loss 1.208
iter 300	 loss 0.964
iter 350	 loss 0.757
Epoch 1/20. Training Loss:   2.043 	 Accuracy: 0.447
            Validation Loss: 0.712 	 Accuracy: 0.788


KeyboardInterrupt: 