<a href="https://colab.research.google.com/github/yunju-1118/EWHA/blob/main/LeNet_mnist_practice.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import torchvision.transforms as transforms
from torchvision.datasets import MNIST, CIFAR10, CIFAR100
from torch.utils.data import DataLoader

import numpy as np

## **MNIST**

In [2]:
## data loader

path = './datasets/'

transform = transforms.Compose([transforms.ToTensor()])

train_data = MNIST(root=path,train=True,transform=transform,download=True)
test_data = MNIST(root=path,train=False,transform=transform,download=True)

batch_size = 100

train_loader = DataLoader(dataset=train_data,batch_size=batch_size,shuffle=True,num_workers=4)
test_loader = DataLoader(dataset=test_data,batch_size=batch_size,shuffle=False,num_workers=4)

input_shape = train_data[0][0].shape
output_shape = len(train_data.classes)

100%|██████████| 9.91M/9.91M [00:00<00:00, 18.6MB/s]
100%|██████████| 28.9k/28.9k [00:00<00:00, 497kB/s]
100%|██████████| 1.65M/1.65M [00:00<00:00, 4.59MB/s]
100%|██████████| 4.54k/4.54k [00:00<00:00, 5.35MB/s]


In [5]:
## model definition

class LeNet(nn.Module):
    def __init__(self):
        super().__init__()

        #### HERE ####
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5, padding=2)
        self.pool1 = nn.AvgPool2d(kernel_size=2, stride=2, padding=0)
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5, stride=1, padding=0)
        self.pool2 = nn.AvgPool2d(kernel_size=2, stride=2, padding=0)
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(in_features=16*5*5, out_features=120)
        self.fc2 = nn.Linear(in_features=120, out_features=84)
        self.fc3 = nn.Linear(in_features=84, out_features=output_shape)


    def forward(self,x):
        #### HERE ####
#       print(x.shape)# 100, 1, 28, 28
        hidden = F.leaky_relu(self.conv1(x))
#       print(hidden.shape)# 100, 6, 28, 28
        hidden = self.pool1(hidden)
#       print(hidden.shape)# 100, 6, 14, 14
        hidden = F.leaky_relu(self.conv2(hidden))
#       print(hidden.shape)# 100, 16, 10, 10
        hidden = self.pool2(hidden)
#       print(hidden.shape)# 100, 16, 5, 5
        hidden = self.flatten(hidden)
#       print(hidden.shape)# 100, 400 => clf의 목적에 부합하는 이미지의 vector 표현형
        hidden = F.leaky_relu(self.fc1(hidden))
#       print(hidden.shape)# 100, 120
        hidden = F.leaky_relu(self.fc2(hidden))
#       print(hidden.shape)# 100, 84
        output = self.fc3(hidden)
#       print(output.shape)# 100, 10

        return output

**Conv2d( )**:

in_channels: input, out_channels: output, kernel_size, padding

gray-scale img => input channel => 1

그 외


- pooling layer는 재사용 가능 -> 학습을 하지 않기 때문 같은 이유로 drop out 등도 가능

- convolution layer의 경우, parameter가 모두 같아도 재사용 불가능 -> 학습


In [6]:
if torch.backends.mps.is_available():
    device = torch.device("mps:0")
elif torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

In [None]:
model = LeNet().to(device)
loss = nn.CrossEntropyLoss(reduction="mean")
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-3)

num_epoch = 100
train_loss_list, test_loss_list = list(), list()

for i in range(num_epoch):

    # train
    #### HERE ####
    model.train()

    total_loss = 0
    count = 0

    for batch_idx, (x, y) in enumerate(train_loader):

        #### HERE ####
        x, y = x.to(device), y.to(device)
        y_est = model.forward(x)
        cost = loss(y_est, y)

        # backward computation
        optimizer.zero_grad()
        cost.backward()
        optimizer.step()

        pred = torch.argmax(y_est, dim=-1)
        count += (pred == y).sum().item()

    acc = count/len(train_data)
    ave_loss = total_loss/len(train_data)

    train_loss_list.append(ave_loss)

    if i % 1 == 0:
        print("\nEpoch %d Train: %.3f / %.3f"%(i,ave_loss,acc))

    # eval
    #### HERE ####
    model.eval()
    count = 0

    with torch.no_grad():
        for batch_idx, (x, y) in enumerate(test_loader):

            #### HERE ####
            x,y = x.to(device), y.to(device)
            y_est = model.forward(x)
            cost = loss(y_est, y)

            total_loss += cost.item()*len(x)

            pred = torch.argmax(y_est, dim=-1)
            count += (pred==y).sum().item()

        acc = count/len(test_data)
        ave_loss = total_loss/len(test_data)

        test_loss_list.append(ave_loss)

        if i % 1 == 0:
            print("Epoch %d Test: %.3f / %.3f"%(i,ave_loss,acc))


print()
num_parameter = 0
for parameter in model.parameters():        # 각 layer마다 존재하는 parameter 개수의 합
    print(parameter.shape)
    num_parameter += np.prod(parameter.size())
print(num_parameter)

## **CIFAR100**

### **LeNet**

In [3]:
## data loader

path = './datasets/'

transform = transforms.Compose([transforms.ToTensor(),
                                transforms.Normalize(mean=[0.5071, 0.4867, 0.4408],
                                                     std = [0.2675, 0.2565, 0.2761])]) # 각각의 channel에 대

train_data = CIFAR100(root=path,train=True,transform=transform,download=True)
test_data = CIFAR100(root=path,train=False,transform=transform,download=True)

batch_size = 100

train_loader = DataLoader(dataset=train_data,batch_size=batch_size,shuffle=True,num_workers=4)
test_loader = DataLoader(dataset=test_data,batch_size=batch_size,shuffle=False,num_workers=4)

input_shape = train_data[0][0] #.shape # 3*32*32
output_shape = len(train_data.classes)  # 100

100%|██████████| 169M/169M [00:13<00:00, 12.2MB/s]


In [7]:
## model definition

class LeNet(nn.Module):
    def __init__(self):
        super().__init__()

        # classifier와 feature extractor를 나눠서 구현
        self.fe = nn.Sequential(# 100, 3, 32, 32
            nn.Conv2d(in_channels=3, out_channels=9, kernel_size=3, padding=1), # 100, 9, 32, 32
            nn.LeakyReLU(),
            nn.Conv2d(in_channels=9, out_channels=18, kernel_size=5, padding=2), # 100, 18, 32, 32
            nn.LeakyReLU(),
            nn.AvgPool2d(kernel_size=2, stride=2), # 100, 18, 16, 16
            nn.Conv2d(in_channels=18, out_channels=32, kernel_size=4, stride=2, padding=1), # 100, 32, 8, 8
            nn.LeakyReLU(),
            nn.AvgPool2d(kernel_size=2, stride=2, padding=0)
        ) # 100, 32, 4, 4

        self.flatten = nn.Flatten()

        self.fc = nn.Sequential(
            nn.Linear(512,256),
            nn.LeakyReLU(),
            nn.Dropout(),
            nn.Linear(256, output_shape)
        )

    def forward(self,x):
        hidden = self.fe(x)
        hidden = self.flatten(hidden)
        output = self.fc(hidden)

        return output

In [8]:
if torch.backends.mps.is_available():
    device = torch.device("mps:0")
elif torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

In [9]:
model = LeNet().to(device)
loss = nn.CrossEntropyLoss(reduction="mean")
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-3)

num_epoch = 100
train_loss_list, test_loss_list = list(), list()

for i in range(num_epoch):

    # train
    #### HERE ####
    model.train()

    total_loss = 0
    count = 0

    for batch_idx, (x, y) in enumerate(train_loader):

        #### HERE ####
        x, y = x.to(device), y.to(device)
        y_est = model.forward(x)
        cost = loss(y_est, y)

        # backward computation
        optimizer.zero_grad()
        cost.backward()
        optimizer.step()

        pred = torch.argmax(y_est, dim=-1)
        count += (pred == y).sum().item()

    acc = count/len(train_data)
    ave_loss = total_loss/len(train_data)

    train_loss_list.append(ave_loss)

    if i % 1 == 0:
        print("\nEpoch %d Train: %.3f / %.3f"%(i,ave_loss,acc))

    # eval
    #### HERE ####
    model.eval()
    count = 0

    with torch.no_grad():
        for batch_idx, (x, y) in enumerate(test_loader):

            #### HERE ####
            x,y = x.to(device), y.to(device)
            y_est = model.forward(x)
            cost = loss(y_est, y)

            total_loss += cost.item()*len(x)

            pred = torch.argmax(y_est, dim=-1)
            count += (pred==y).sum().item()

        acc = count/len(test_data)
        ave_loss = total_loss/len(test_data)

        test_loss_list.append(ave_loss)

        if i % 1 == 0:
            print("Epoch %d Test: %.3f / %.3f"%(i,ave_loss,acc))


print()
num_parameter = 0
for parameter in model.parameters():        # 각 layer마다 존재하는 parameter 개수의 합
    print(parameter.shape)
    num_parameter += np.prod(parameter.size())
print(num_parameter)


Epoch 0 Train: 0.000 / 0.083
Epoch 0 Test: 3.638 / 0.147

Epoch 1 Train: 0.000 / 0.145
Epoch 1 Test: 3.380 / 0.191

Epoch 2 Train: 0.000 / 0.182
Epoch 2 Test: 3.149 / 0.238

Epoch 3 Train: 0.000 / 0.212
Epoch 3 Test: 3.051 / 0.256

Epoch 4 Train: 0.000 / 0.231
Epoch 4 Test: 2.955 / 0.282

Epoch 5 Train: 0.000 / 0.247
Epoch 5 Test: 2.911 / 0.296

Epoch 6 Train: 0.000 / 0.259
Epoch 6 Test: 2.833 / 0.303

Epoch 7 Train: 0.000 / 0.273
Epoch 7 Test: 2.778 / 0.309

Epoch 8 Train: 0.000 / 0.281
Epoch 8 Test: 2.754 / 0.317

Epoch 9 Train: 0.000 / 0.291
Epoch 9 Test: 2.693 / 0.329

Epoch 10 Train: 0.000 / 0.298
Epoch 10 Test: 2.676 / 0.336

Epoch 11 Train: 0.000 / 0.310
Epoch 11 Test: 2.651 / 0.335

Epoch 12 Train: 0.000 / 0.317
Epoch 12 Test: 2.622 / 0.347

Epoch 13 Train: 0.000 / 0.322
Epoch 13 Test: 2.599 / 0.351

Epoch 14 Train: 0.000 / 0.332
Epoch 14 Test: 2.570 / 0.356

Epoch 15 Train: 0.000 / 0.337
Epoch 15 Test: 2.519 / 0.362

Epoch 16 Train: 0.000 / 0.340
Epoch 16 Test: 2.524 / 0.367


### **Fully-Connected layer**

fully-connected softmax classifier를 구현을 해보시길 바랄게요...

처음부터 flatten해서 적용해보깅

In [18]:
# import library
import torch
import torch.nn as nn
import torch.nn.functional as F

import torchvision.transforms as transforms
from torchvision.datasets import CIFAR100
from torch.utils.data import DataLoader

import numpy as np

In [19]:
## data loader
path = './datasets/'

transform = transforms.Compose([transforms.ToTensor(),
                              transforms.Normalize(mean=[0.5071, 0.4867, 0.4408],
                                                     std = [0.2675, 0.2565, 0.2761])])

train_data = CIFAR100(root=path,train=True,transform=transform,download=True)
test_data = CIFAR100(root=path,train=False,transform=transform,download=True)

batch_size=100

train_loader = DataLoader(dataset=train_data, batch_size=batch_size, shuffle=True, num_workers=4)
test_loader = DataLoader(dataset=test_data, batch_size=batch_size, shuffle=False, num_workers=4)

print(train_data)
print(test_data)

Dataset CIFAR100
    Number of datapoints: 50000
    Root location: ./datasets/
    Split: Train
    StandardTransform
Transform: Compose(
               ToTensor()
               Normalize(mean=[0.5071, 0.4867, 0.4408], std=[0.2675, 0.2565, 0.2761])
           )
Dataset CIFAR100
    Number of datapoints: 10000
    Root location: ./datasets/
    Split: Test
    StandardTransform
Transform: Compose(
               ToTensor()
               Normalize(mean=[0.5071, 0.4867, 0.4408], std=[0.2675, 0.2565, 0.2761])
           )


In [20]:
input_shape = train_data[0][0].reshape(-1).shape[0]
output_shape = len(train_data.classes)

print(input_shape, output_shape)

3072 100


In [25]:
## Model Definition
class SoftmaxClassifier(nn.Module):
    def __init__(self):
        super().__init__()

        self.layers = nn.Sequential(
            nn.Flatten(), # 3*32*32 = 3072
            nn.Linear(input_shape, 1024), # 100*1024
            nn.LeakyReLU(),
            nn.Linear(1024, 512),
            nn.LeakyReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, 256),
            nn.LeakyReLU(),
            nn.Linear(256,output_shape)
        )

    def forward(self, x):
        return self.layers(x)

In [23]:
if torch.backends.mps.is_available():
    device = torch.device("mps:0")
elif torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

print(device)

cuda:0


In [26]:
model = SoftmaxClassifier().to(device)
loss = nn.CrossEntropyLoss(reduction="mean")

optimizer = torch.optim.SGD(model.parameters(), lr=1e-3, weight_decay=1e-3)

In [28]:
num_epoch = 100
train_loss_list, test_loss_list = list(), list()

for i in range(num_epoch):
    # train
    model.train()

    total_loss = 0
    count = 0

    for batch_idx, (x,y) in enumerate(train_loader):
        x, y = x.to(device), y.to(device)

        pre_y_est = model.forward(x)
        cost = loss(pre_y_est, y)
        y_est = F.softmax(pre_y_est, dim=-1)

        total_loss += cost.item()*len(x)

        optimizer.zero_grad()
        cost.backward()
        optimizer.step()

        pred = torch.argmax(y_est, dim=-1)
        count += (pred==y).sum().item()

    acc = count/len(train_data)
    ave_loss = total_loss/len(train_data)

    train_loss_list.append(ave_loss)

    if i % 1 == 0:
        print("\nEpoch %d Train: %.3f / %.3f"%(i,ave_loss, acc))

    # test
    model.eval()

    total_loss = 0
    count = 0

    with torch.no_grad():
        for batch_idx, (x,y) in enumerate(test_loader):
            x, y = x.to(device), y.to(device)

            pre_y_est = model.forward(x)
            cost = loss(pre_y_est, y)
            y_est = F.softmax(pre_y_est, dim=-1)

            total_loss += cost.item()*len(x)

            pred = torch.argmax(y_est, dim=-1)
            count += (pred==y).sum().item()

        acc = count/len(test_data)
        ave_loss = total_loss/len(test_data)

        test_loss_list.append(ave_loss)

        if i% 1 == 0:
            print("Epoch %d Test: %.3f / %.3f"%(i,ave_loss,acc))


Epoch 0 Train: 4.600 / 0.013
Epoch 0 Test: 4.596 / 0.020

Epoch 1 Train: 4.596 / 0.015
Epoch 1 Test: 4.591 / 0.028

Epoch 2 Train: 4.591 / 0.020
Epoch 2 Test: 4.586 / 0.033

Epoch 3 Train: 4.585 / 0.023
Epoch 3 Test: 4.580 / 0.037

Epoch 4 Train: 4.579 / 0.026
Epoch 4 Test: 4.573 / 0.042

Epoch 5 Train: 4.572 / 0.031
Epoch 5 Test: 4.565 / 0.044

Epoch 6 Train: 4.564 / 0.035
Epoch 6 Test: 4.556 / 0.045

Epoch 7 Train: 4.555 / 0.037
Epoch 7 Test: 4.545 / 0.046

Epoch 8 Train: 4.543 / 0.040
Epoch 8 Test: 4.531 / 0.044

Epoch 9 Train: 4.530 / 0.041
Epoch 9 Test: 4.515 / 0.045

Epoch 10 Train: 4.513 / 0.041
Epoch 10 Test: 4.497 / 0.045

Epoch 11 Train: 4.495 / 0.042
Epoch 11 Test: 4.477 / 0.045

Epoch 12 Train: 4.474 / 0.044
Epoch 12 Test: 4.454 / 0.045

Epoch 13 Train: 4.451 / 0.046
Epoch 13 Test: 4.428 / 0.048

Epoch 14 Train: 4.427 / 0.046
Epoch 14 Test: 4.400 / 0.051

Epoch 15 Train: 4.399 / 0.050
Epoch 15 Test: 4.370 / 0.054

Epoch 16 Train: 4.368 / 0.053
Epoch 16 Test: 4.338 / 0.058


In [29]:
num_parameter = 0
for parameter in model.parameters():
    print(parameter.shape)
    num_parameter += np.prod(parameter.size())
print(num_parameter)

torch.Size([1024, 3072])
torch.Size([1024])
torch.Size([512, 1024])
torch.Size([512])
torch.Size([256, 512])
torch.Size([256])
torch.Size([100, 256])
torch.Size([100])
3828580


=> parameter의 개수는 더럽게 많지만 성능은 LeNet에 비해 아주 안 좋음.