# MNIST & ResNet18 implementation

Note: the final fully connected (fc) layer of ResNet-18 is replaced with a custom classifier — a stack of fully connected layers with activation functions between them.

<center><img src="https://www.researchgate.net/profile/Sajid-Iqbal-13/publication/336642248/figure/fig1/AS:839151377203201@1577080687133/Original-ResNet-18-Architecture.png" width="75%" height="75%"/></center>

In [1]:
import numpy as np
import torch
from torch import nn
import torchvision.models as models
from torchvision import datasets
from torchvision import transforms
from torch.utils.data import DataLoader
# Raise NumPy's summarisation threshold so large arrays are printed in full,
# and print small floats in fixed-point instead of scientific notation.
np.set_printoptions(suppress=True, threshold=1_000_000)

In [2]:
# Fix the random seeds (CPU generator and every CUDA device) for reproducibility.
SEED = 1
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)

In [3]:
# Data preprocessing: resize every image to 224x224 and convert it to a tensor.
# The augmentation / normalisation steps are kept below, disabled, for reference.
transform = transforms.Compose(
    [
        # transforms.RandomResizedCrop(224),  # random crop
        # transforms.RandomRotation(20),  # random rotation
        # transforms.RandomHorizontalFlip(p=0.5),  # random horizontal flip
        # transforms.Grayscale(num_output_channels=1),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),  # convert to tensor format
        # transforms.Normalize(
        #     mean=[0.485, 0.456, 0.406],
        #     std=[0.229, 0.224, 0.225])
    ]
)

# Load the data: one sub-folder per class under <root>/train and <root>/test.
root = "./data/custom/MNIST"
train_data = datasets.ImageFolder(root=root + "/train", transform=transform)
test_data = datasets.ImageFolder(root=root + "/test", transform=transform)

In [4]:
# Mini-batch loaders. The train and test loaders reshuffle on every pass;
# the feature loader walks the training set in its stored order.
batch_size = 256
train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=batch_size, shuffle=True)
feature_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=False)

In [5]:
# Class names found by ImageFolder and the integer label assigned to each.
classes, classes_index = train_data.classes, train_data.class_to_idx
print(classes)
print(classes_index)

['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
{'0': 0, '1': 1, '2': 2, '3': 3, '4': 4, '5': 5, '6': 6, '7': 7, '8': 8, '9': 9}


In [6]:
# Check whether CUDA is usable; fall back to the CPU otherwise.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using {device} device")

Using cuda device


In [7]:
# Peek at the first test batch to confirm tensor shapes and the label dtype.
first_batch = next(iter(test_dataloader), None)
if first_batch is not None:
    X, y = first_batch
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")

Shape of X [N, C, H, W]: torch.Size([256, 3, 224, 224])
Shape of y: torch.Size([256]) torch.int64


In [8]:
# Define the neural-network model
class RN18_custom_classifier(nn.Module):
    """ResNet-18 backbone with a custom fully connected classifier head.

    The backbone is created without pretrained weights and its final ``fc``
    layer is replaced by ``Linear -> Sigmoid -> Linear -> Sigmoid -> Linear``.

    Args:
        num_classes: Size of the final output layer (default 10).
    """

    def __init__(self, num_classes=10):
        super().__init__()
        # Calling resnet18() without arguments builds the network with random
        # weights and avoids the deprecated `pretrained=` keyword.
        self.model = models.resnet18()
        # Read the head's input width from the backbone (512 for ResNet-18)
        # instead of hard-coding it.
        in_features = self.model.fc.in_features
        # Replace the stock classification layer with the custom classifier.
        self.model.fc = nn.Sequential(
            nn.Linear(in_features, 120),
            nn.Sigmoid(),
            nn.Linear(120, 84),
            nn.Sigmoid(),
            nn.Linear(84, num_classes),
        )

    def forward(self, x):
        """Run ``x`` through the backbone and the custom head; return the raw outputs."""
        return self.model(x)


model = RN18_custom_classifier().to(device)
model

RN18_custom_classifier(
  (model): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=Tr

In [9]:
# Define the feature-extraction model
class FeatureExtractor(nn.Module):
    """ResNet-18 backbone whose ``fc`` layer is replaced by a single linear projection.

    NOTE: this builds its own ResNet-18 without pretrained weights; it does not
    share parameters with any other model instance in the notebook.

    Args:
        out_features: Length of the feature vector produced per image (default 120).
    """

    def __init__(self, out_features=120):
        super().__init__()
        # Calling resnet18() without arguments builds the network with random
        # weights and avoids the deprecated `pretrained=` keyword.
        self.model = models.resnet18()
        # Read the projection's input width from the backbone (512 for ResNet-18)
        # instead of hard-coding it.
        in_features = self.model.fc.in_features
        # Replace the stock classification layer with the feature projection.
        self.model.fc = nn.Sequential(
            nn.Linear(in_features, out_features),
        )

    def forward(self, x):
        """Return the projected feature vectors for the batch ``x``."""
        return self.model(x)


new_model = FeatureExtractor().to(device)
new_model

FeatureExtractor(
  (model): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, tr

In [10]:
# Extract feature vectors for a systematic sample of the training images.
features = []
step = 64  # sampling interval inside each batch
size = len(feature_dataloader.dataset)
# Put the network that actually produces the features into evaluation mode.
# Otherwise BatchNorm normalises with (and updates its running statistics from)
# the handful of sampled images in each batch.
new_model.eval()
with torch.no_grad():  # gradients are not needed for feature extraction
    for batch, (X, y) in enumerate(feature_dataloader):
        # Systematic sampling: take every `step`-th image, excluding the last
        # index of the batch. Only the sampled images are moved to the device.
        sX = X[:len(X)-1:step].to(device)
        feature = new_model(sX)
        if batch % 100 == 0:  # progress report
            current = batch * len(X)
            print(f"Feature Extractor: [{current:>5d}/{size:>5d}]")
        # Convert to a NumPy array and store it in the features list.
        features.append(feature.cpu().numpy())

Feature Extractor: [    0/60000]
Feature Extractor: [25600/60000]
Feature Extractor: [51200/60000]


In [11]:
# Pack the per-batch feature arrays into a 1-D object array, one entry per batch.
# Batches may hold different numbers of rows, so they cannot be stacked into a
# regular array. Building the object array explicitly avoids np.array() on ragged
# input (a ValueError on NumPy >= 1.24) and the removed np.warnings alias that
# was used to silence the earlier deprecation warning.
featuresdata = np.empty(len(features), dtype=object)
for i, batch_features in enumerate(features):
    featuresdata[i] = batch_features

In [12]:
# The per-batch arrays are uneven, so flatten them into one list with one
# entry per sampled image.
import itertools
featureslist = list(itertools.chain.from_iterable(featuresdata))

In [13]:
# Verify that the number of extracted feature vectors is as expected:
# (rows in the first batch) * (number of batches - 1) + (rows in the last batch).
# The last batch is addressed with index -1, so the check no longer depends on
# the dataset producing exactly 235 batches.
# print(f"batch_size/step: {batch_size/step}",f"\n")
# print(f"batch[0]: ",len(featuresdata[0]))
print(f"Theoretical Value: {len(featuresdata[0]) * (len(featuresdata)-1) + len(featuresdata[-1])}" )
print(f"Actual value: {len(featureslist)}")

Theoretical Value: 938
Actual value: 938


In [14]:
print("Theoretical out_features: 120")
print(f"Actual out_features: {len(featureslist[0])}")

Theoretical out_features: 120
Actual out_features: 120


In [15]:
# ... Ranking

In [16]:
# Optimisation setup: cross-entropy loss with the Adam optimiser.
# Open question from the author: lower the learning rate from 0.001 to 0.00001?
lr = 1e-3
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=lr)

def train(dataloader, model, loss_fn, optimizer):
    """Run one training pass over ``dataloader``.

    For every batch: forward pass, loss computation, backpropagation and one
    optimiser step. The current loss is printed every 100 batches.
    """
    size = len(dataloader.dataset)
    model.train()
    for batch_idx, (inputs, targets) in enumerate(dataloader):
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Forward pass and prediction error
        batch_loss = loss_fn(model(inputs), targets)

        # Backpropagation and parameter update
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        if batch_idx % 100 != 0:
            continue
        # Progress report: loss of this batch and number of samples seen so far
        loss, current = batch_loss.item(), batch_idx * len(inputs)
        print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

def test(dataloader, model, loss_fn):
    """Evaluate ``model`` on ``dataloader`` and print accuracy and average loss.

    Accuracy is the fraction of correctly classified samples over the whole
    dataset; the loss is averaged over the number of batches.
    """
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    loss_total, hit_total = 0, 0
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            outputs = model(inputs)
            loss_total += loss_fn(outputs, targets).item()
            # Count predictions whose arg-max class equals the label
            matches = outputs.argmax(1) == targets
            hit_total += matches.type(torch.float).sum().item()
    test_loss = loss_total / num_batches
    correct = hit_total / size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [17]:
# Number of epochs; each epoch is one training pass followed by one evaluation.
epochs = 10
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(dataloader=train_dataloader, model=model, loss_fn=loss_fn, optimizer=optimizer)
    test(dataloader=test_dataloader, model=model, loss_fn=loss_fn)
print("Done!")

Epoch 1
-------------------------------
loss: 2.366558  [    0/60000]
loss: 0.715047  [25600/60000]
loss: 0.189875  [51200/60000]
Test Error: 
 Accuracy: 67.8%, Avg loss: 1.300368 

Epoch 2
-------------------------------
loss: 0.125726  [    0/60000]
loss: 0.144813  [25600/60000]
loss: 0.051598  [51200/60000]
Test Error: 
 Accuracy: 98.7%, Avg loss: 0.063729 

Epoch 3
-------------------------------
loss: 0.038285  [    0/60000]
loss: 0.036592  [25600/60000]
loss: 0.041391  [51200/60000]
Test Error: 
 Accuracy: 98.3%, Avg loss: 0.066752 

Epoch 4
-------------------------------
loss: 0.055919  [    0/60000]
loss: 0.089673  [25600/60000]
loss: 0.042698  [51200/60000]
Test Error: 
 Accuracy: 99.2%, Avg loss: 0.037649 

Epoch 5
-------------------------------
loss: 0.018915  [    0/60000]
loss: 0.011399  [25600/60000]
loss: 0.018717  [51200/60000]
Test Error: 
 Accuracy: 98.4%, Avg loss: 0.051917 

Epoch 6
-------------------------------
loss: 0.007658  [    0/60000]
loss: 0.046763  [256

In [18]:
# with torch.no_grad():
#     # 取得一張測試圖片
#     index = 256 # 圖片索引編號
#     image, true_target = mnist_test[index] # 取得圖片與標註
#     image = torch.unsqueeze(image, 0) # 多增加一個維度，轉為 batch 形式
#     image = image.to(device)
#     prediction = model(image)
#     predicted_class = prediction.argmax()
#     np_image = image[0].cpu().numpy().transpose((1, 2, 0))
#     plt.imshow(np_image, cmap='gray')
#     plt.title(f'Predicted: {predicted_class} / True Target: {true_target}')
#     plt.show()