# 第5回講義 宿題

## 課題

今Lessonで学んだことに工夫を加えて，CNNでより高精度なCIFAR10の分類器を実装してみましょう．精度上位者はリーダーボードに載ります．

### 目標値

Accuracy 78%

### ルール

- 訓練データはx_train， t_train，テストデータはx_testで与えられます．
- 予測ラベルは one_hot表現ではなく0~9のクラスラベル で表してください．
- **下のセルで指定されているx_train，t_train以外の学習データは使わないでください．**
- 今回から基本的にAPI制限はありません．
- ただしCNNベースでないモデル（Vision Transformerなど）やtorchvision等の既存モデル，学習済みモデルは用いないでください．

### 提出方法

- 2つのファイルを提出していただきます．
  - テストデータ (x_test) に対する予測ラベルをcsvファイル (ファイル名: submission_pred.csv) で提出してください．
  - それに対応するpythonのコードをsubmission_code.pyとして提出してください (%%writefileコマンドなどを利用してください)．

### 評価方法

- 予測ラベルのt_testに対する精度 (Accuracy) で評価します．
- 提出後即時採点を行い，Leader Boardが更新されます．
- 締切後の点数を最終的な評価とします．

In [None]:
# ドライブのマウント
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### データの読み込み

- この部分は修正しないでください

In [None]:
import random

import numpy as np
import pandas as pd
import torch
from torchvision import transforms
from tqdm import tqdm_notebook as tqdm
from PIL import Image
from sklearn.model_selection import train_test_split
"""
#学習データ
x_train = np.load('drive/MyDrive/Colab Notebooks/DLBasics2023_colab/Lecture05/data/x_train.npy')
t_train = np.load('drive/MyDrive/Colab Notebooks/DLBasics2023_colab/Lecture05/data/t_train.npy')
    
#テストデータ
x_test = np.load('drive/MyDrive/Colab Notebooks/DLBasics2023_colab/Lecture05/data/x_test.npy')
"""

#学習データ
x_train = np.load('drive/MyDrive/Colab Notebooks/DeepLearning/Lecture05/data/x_train.npy')
t_train = np.load('drive/MyDrive/Colab Notebooks/DeepLearning/Lecture05/data/t_train.npy')
    
#テストデータ
x_test = np.load('drive/MyDrive/Colab Notebooks/DeepLearning/Lecture05/data/x_test.npy')
class train_dataset(torch.utils.data.Dataset):
    def __init__(self, x_train, t_train):
        data = x_train.astype('float32')
        self.x_train = []
        for i in range(data.shape[0]):
            self.x_train.append(Image.fromarray(np.uint8(data[i])))
        self.t_train = t_train
        self.transform = transforms.ToTensor()

    def __len__(self):
        return len(self.x_train)

    def __getitem__(self, idx):
        return self.transform(self.x_train[idx]), torch.tensor(t_train[idx], dtype=torch.long)

class test_dataset(torch.utils.data.Dataset):
    def __init__(self, x_test):
        data = x_test.astype('float32')
        self.x_test = []
        for i in range(data.shape[0]):
            self.x_test.append(Image.fromarray(np.uint8(data[i])))
        self.transform = transforms.ToTensor()

    def __len__(self):
        return len(self.x_test)

    def __getitem__(self, idx):
        return self.transform(self.x_test[idx])

trainval_data = train_dataset(x_train, t_train)
test_data = test_dataset(x_test)

### 畳み込みニューラルネットワーク(CNN)の実装

In [None]:
def fix_seed(seed=1234):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)


fix_seed(seed=42)


class gcn():
    def __init__(self):
        pass

    def __call__(self, x):
        mean = torch.mean(x)
        std = torch.std(x)
        return (x - mean)/(std + 10**(-6))  # 0除算を防ぐ


class ZCAWhitening():
    def __init__(self, epsilon=1e-4, device="cuda"):  # 計算が重いのでGPUを用いる
        self.epsilon = epsilon
        self.device = device

    def fit(self, images):  # 変換行列と平均をデータから計算
        x = images[0][0].reshape(1, -1)
        self.mean = torch.zeros([1, x.size()[1]]).to(self.device)
        con_matrix = torch.zeros([x.size()[1], x.size()[1]]).to(self.device)
        for i in range(len(images)):  # 各データについての平均を取る
            x = images[i][0].reshape(1, -1).to(self.device)
            self.mean += x / len(images)
            con_matrix += torch.mm(x.t(), x) / len(images)
            if i % 10000 == 0:
                print("{0}/{1}".format(i, len(images)))
        self.E, self.V = torch.linalg.eigh(con_matrix)  # 固有値分解
        self.E = torch.max(self.E, torch.zeros_like(self.E)) # 誤差の影響で負になるのを防ぐ
        self.ZCA_matrix = torch.mm(torch.mm(self.V, torch.diag((self.E.squeeze()+self.epsilon)**(-0.5))), self.V.t())
        print("completed!")

    def __call__(self, x):
        size = x.size()
        x = x.reshape(1, -1).to(self.device)
        x -= self.mean
        x = torch.mm(x, self.ZCA_matrix.t())
        x = x.reshape(tuple(size))
        x = x.to("cpu")
        return x


# (datasetのクラスを自作したので，このあたりの処理が少し変わっています)

zca = ZCAWhitening()
zca.fit(trainval_data)

val_size = 3000
train_data, val_data = torch.utils.data.random_split(trainval_data, [len(trainval_data)-val_size, val_size])  # 訓練データと検証データに分割


# 前処理を定義
transform_train = transforms.Compose([
    transforms.RandomHorizontalFlip(p=0.3),
    transforms.RandomCrop(32, padding=(4, 4, 4, 4), padding_mode='constant'),
    transforms.ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5),
    transforms.ToTensor(),
    zca])

transform = transforms.Compose([
    transforms.ToTensor(),
    zca])

# データセットに前処理を設定
trainval_data.transform = transform_train
test_data.transform = transform

batch_size = 64

dataloader_train = torch.utils.data.DataLoader(
    train_data,
    batch_size=batch_size,
    shuffle=True
)

dataloader_valid = torch.utils.data.DataLoader(
    val_data,
    batch_size=batch_size,
    shuffle=True
)

dataloader_test = torch.utils.data.DataLoader(
    test_data,
    batch_size=batch_size,
    shuffle=False
)


0/50000
10000/50000
20000/50000
30000/50000
40000/50000
completed!


In [None]:
import torch.nn as nn
import torch.optim as optim
import torch.autograd as autograd
import torch.nn.functional as F

rng = np.random.RandomState(1234)
random_state = 42
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


conv_net =nn.Sequential(
    nn.Conv2d(3, 32, 3),              # 32x32x3 -> 30x30x32
    nn.BatchNorm2d(32),
    nn.ReLU(),
    nn.AvgPool2d(2),                  # 30x30x32 -> 15x15x32
    nn.Conv2d(32, 64, 3),             # 15x15x32 -> 13x13x64
    nn.BatchNorm2d(64),
    nn.ReLU(),
    nn.AvgPool2d(2),                  # 13x13x64 -> 6x6x64
    nn.Conv2d(64, 128, 3),            # 6x6x64 -> 4x4x128
    nn.BatchNorm2d(128),
    nn.ReLU(),
    nn.AvgPool2d(2),                  # 4x4x128 -> 2x2x128
    nn.Flatten(),
    nn.Linear(2*2*128, 256),
    nn.ReLU(),
    nn.Linear(256, 10)
)


def init_weights(m):  # Heの初期化
    if type(m) == nn.Linear or type(m) == nn.Conv2d:
        torch.nn.init.kaiming_normal_(m.weight)
        m.bias.data.fill_(0.0)


conv_net.apply(init_weights)


n_epochs = 25
lr = 0.01
device = 'cuda'

conv_net.to(device)
optimizer = optim.Adam(conv_net.parameters(), lr=lr)
loss_function = nn.CrossEntropyLoss() 

In [None]:
import torch.nn as nn
import torch.optim as optim
import torch.autograd as autograd
import torch.nn.functional as F

rng = np.random.RandomState(1234)
random_state = 42
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class ResBlock(nn.Module):

    def __init__(
        self,
        in_channels,
        channels,
        kernel=3,
        stride=1
    ):
        super().__init__()
        self.conv1 =  nn.Conv2d(in_channels, channels, kernel, padding=1)
        self.bn1 = nn.BatchNorm2d(channels)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(channels, channels, kernel, padding=1)
        self.bn2 = nn.BatchNorm2d(channels)


    def forward(self, x):
        identity= x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        out += identity

        out = self.relu(out)

        return out

conv_net =nn.Sequential(
    nn.Conv2d(3,16,1,padding=1), # 32x32x3 -> 32x32x16

    ResBlock(16,16),
    ResBlock(16,16),
    ResBlock(16,16),
    ResBlock(16,16),
    ResBlock(16,16),
    ResBlock(16,16),
    ResBlock(16,16),
    ResBlock(16,16),
    ResBlock(16,16),
    nn.Conv2d(16, 32,1,stride=2),  # 32x32x16 -> 16x16x32

    ResBlock(32,32),
    ResBlock(32,32),
    ResBlock(32,32),
    ResBlock(32,32),
    ResBlock(32,32),
    ResBlock(32,32),
    ResBlock(32,32),
    ResBlock(32,32),
    ResBlock(32,32),
    nn.Conv2d(32,64,1,stride=2),  # 16x16x32 -> 8x8x64

    ResBlock(64,64),
    ResBlock(64,64),
    ResBlock(64,64),
    ResBlock(64,64),
    ResBlock(64,64),
    ResBlock(64,64),
    ResBlock(64,64),
    ResBlock(64,64),
    ResBlock(64,64),

    nn.AdaptiveAvgPool2d((1,1)),
    nn.Flatten(),
    nn.Linear(64, 10),
    #nn.ReLU(),
    #nn.Linear(32, 10)
)


def init_weights(m):  # Heの初期化
    if type(m) == nn.Linear or type(m) == nn.Conv2d:
        torch.nn.init.kaiming_normal_(m.weight)
        m.bias.data.fill_(0.0)


conv_net.apply(init_weights)


n_epochs = 180
lr = 0.01
device = 'cuda'

conv_net.to(device)
optimizer = optim.Adam(conv_net.parameters(), lr=lr)
loss_function = nn.CrossEntropyLoss() 

In [None]:
for epoch in range(n_epochs):
    losses_train = []
    losses_valid = []

    conv_net.train()
    n_train = 0
    acc_train = 0
    for x, t in dataloader_train:
        n_train += t.size()[0]

        conv_net.zero_grad() 

        x = x.to(device)
        t = t.to(device)

        y = conv_net.forward(x)

        loss = loss_function(y, t)

        loss.backward()

        optimizer.step()

        pred = y.argmax(1)

        acc_train += (pred == t).float().sum().item()
        losses_train.append(loss.tolist())

    conv_net.eval()
    n_val = 0
    acc_val = 0
    for x, t in dataloader_valid:
        n_val += t.size()[0]

        x = x.to(device) 
        t = t.to(device)

        y = conv_net.forward(x)

        loss = loss_function(y, t)

        pred = y.argmax(1)

        acc_val += (pred == t).float().sum().item()
        losses_valid.append(loss.tolist())


    print('EPOCH: {}, Train [Loss: {:.3f}, Accuracy: {:.3f}], Valid [Loss: {:.3f}, Accuracy: {:.3f}]'.format(
        epoch,
        np.mean(losses_train),
        acc_train/n_train,
        np.mean(losses_valid),
        acc_val/n_val
    ))

EPOCH: 0, Train [Loss: 1.893, Accuracy: 0.318], Valid [Loss: 2.023, Accuracy: 0.359]
EPOCH: 1, Train [Loss: 1.300, Accuracy: 0.533], Valid [Loss: 1.249, Accuracy: 0.551]
EPOCH: 2, Train [Loss: 1.105, Accuracy: 0.607], Valid [Loss: 1.033, Accuracy: 0.634]
EPOCH: 3, Train [Loss: 0.991, Accuracy: 0.649], Valid [Loss: 0.934, Accuracy: 0.666]
EPOCH: 4, Train [Loss: 0.894, Accuracy: 0.686], Valid [Loss: 1.013, Accuracy: 0.651]
EPOCH: 5, Train [Loss: 0.815, Accuracy: 0.714], Valid [Loss: 0.922, Accuracy: 0.675]
EPOCH: 6, Train [Loss: 0.740, Accuracy: 0.742], Valid [Loss: 0.794, Accuracy: 0.731]
EPOCH: 7, Train [Loss: 0.685, Accuracy: 0.762], Valid [Loss: 0.695, Accuracy: 0.758]
EPOCH: 8, Train [Loss: 0.638, Accuracy: 0.778], Valid [Loss: 0.663, Accuracy: 0.786]
EPOCH: 9, Train [Loss: 0.602, Accuracy: 0.793], Valid [Loss: 0.691, Accuracy: 0.765]
EPOCH: 10, Train [Loss: 0.571, Accuracy: 0.802], Valid [Loss: 0.631, Accuracy: 0.790]
EPOCH: 11, Train [Loss: 0.538, Accuracy: 0.815], Valid [Loss: 1.

In [None]:
conv_net.eval()

t_pred = []
for x in dataloader_test:

    x = x.to(device)

    # 順伝播
    y = conv_net.forward(x)

    # モデルの出力を予測値のスカラーに変換
    pred = y.argmax(1).tolist()

    t_pred.extend(pred)

submission = pd.Series(t_pred, name='label')
submission.to_csv('drive/MyDrive/Colab Notebooks/DeepLearning/Lecture05/submission_pred.csv', header=True, index_label='id')
#submission.to_csv('drive/MyDrive/Colab Notebooks/DLBasics2023_colab/Lecture05/submission_pred.csv', header=True, index_label='id')