# 第5回講義 宿題

## 課題

今Lessonで学んだことに工夫を加えて，CNNでより高精度なCIFAR10の分類器を実装してみましょう．

### 目標値

Accuracy 76%

### ルール


- 訓練データは`x_train`， `t_train`，テストデータは`x_test`で与えられます．
- 予測ラベルは one_hot表現ではなく0~9のクラスラベル で表してください．
- **下のセルで指定されている`x_train`，`t_train`以外の学習データは使わないでください．**
- Pytorchを利用して構いません．
- 今回から基本的にAPI制限はありません．
- ただしCNNベースでないモデル（Vision Transformerなど）やtorchvision等の既存モデル，学習済みモデルは用いないでください．

### 提出方法
- 2つのファイルを提出していただきます．
    1. テストデータ (`x_test`) に対する予測ラベルをcsv形式で保存し，**Omnicampusの宿題タブから「第5回 畳み込みニューラルネットワーク」を選択して**提出してください．
    2. それに対応するpythonのコードを　ファイル＞ダウンロード＞.pyをダウンロード　から保存し，**Omnicampusの宿題タブから「第5回 畳み込みニューラルネットワーク (code)」を選択して**提出してください．pythonファイル自体の提出ではなく，「提出内容」の部分にコード全体をコピー&ペーストしてください．
      
- なお，採点は1で行い，2はコードの確認用として利用します（成績優秀者はコード内容を公開させていただくかもしれません）．コードの内容を変更した場合は，**1と2の両方を提出し直してください**．

### 評価方法

- 予測ラベルの`t_test`に対する精度 (Accuracy) で評価します．
- 即時採点しLeader Boardを更新します（採点スケジュールは別アナウンス）．
- 締切時の点数を最終的な評価とします．

In [None]:
# ドライブのマウント
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# 作業ディレクトリを指定
work_dir = 'drive/MyDrive/Colab Notebooks' # ご自身の環境に合わせて修正してください

### データの読み込み（この部分は修正しないでください）

In [None]:
import random

import numpy as np
import pandas as pd
import torch
from torchvision import transforms
from tqdm import tqdm_notebook as tqdm
from PIL import Image
from sklearn.model_selection import train_test_split

#学習データ
x_train = np.load(work_dir +  '/第5回/data/x_train.npy')
t_train = np.load(work_dir +  '/第5回/data/_train.npy')

#テストデータ
x_test = np.load(work_dir +  '/第5回/data/x_test.npy')

class train_dataset(torch.utils.data.Dataset):
    def __init__(self, x_train, t_train):
        data = x_train.astype('float32')
        self.x_train = []
        for i in range(data.shape[0]):
            self.x_train.append(Image.fromarray(np.uint8(data[i])))
        self.t_train = t_train
        self.transform = transforms.ToTensor()

    def __len__(self):
        return len(self.x_train)

    def __getitem__(self, idx):
        return self.transform(self.x_train[idx]), torch.tensor(t_train[idx], dtype=torch.long)

class test_dataset(torch.utils.data.Dataset):
    def __init__(self, x_test):
        data = x_test.astype('float32')
        self.x_test = []
        for i in range(data.shape[0]):
            self.x_test.append(Image.fromarray(np.uint8(data[i])))
        self.transform = transforms.ToTensor()

    def __len__(self):
        return len(self.x_test)

    def __getitem__(self, idx):
        return self.transform(self.x_test[idx])

trainval_data = train_dataset(x_train, t_train)
test_data = test_dataset(x_test)

### 畳み込みニューラルネットワーク(CNN)の実装

In [None]:
def fix_seed(seed=1234):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)


fix_seed(seed=42)


class gcn():
    def __init__(self):
        pass

    def __call__(self, x):
        mean = torch.mean(x)
        std = torch.std(x)
        return (x - mean)/(std + 10**(-6))  # 0除算を防ぐ


class ZCAWhitening():
    def __init__(self, epsilon=1e-4, device="cuda"):  # 計算が重いのでGPUを用いる
        self.epsilon = epsilon
        self.device = device

    def fit(self, images):  # 変換行列と平均をデータから計算
        x = images[0][0].reshape(1, -1)
        self.mean = torch.zeros([1, x.size()[1]]).to(self.device)
        con_matrix = torch.zeros([x.size()[1], x.size()[1]]).to(self.device)
        for i in range(len(images)):  # 各データについての平均を取る
            x = images[i][0].reshape(1, -1).to(self.device)
            self.mean += x / len(images)
            con_matrix += torch.mm(x.t(), x) / len(images)
            if i % 10000 == 0:
                print("{0}/{1}".format(i, len(images)))
        con_matrix -= torch.mm(self.mean.t(), self.mean)
        self.E, self.V = torch.linalg.eigh(con_matrix)  # 固有値分解
        self.E = torch.max(self.E, torch.zeros_like(self.E)) # 誤差の影響で負になるのを防ぐ
        self.ZCA_matrix = torch.mm(torch.mm(self.V, torch.diag((self.E.squeeze()+self.epsilon)**(-0.5))), self.V.t())
        print("completed!")

    def __call__(self, x):
        size = x.size()
        x = x.reshape(1, -1).to(self.device)
        x -= self.mean
        x = torch.mm(x, self.ZCA_matrix.t())
        x = x.reshape(tuple(size))
        x = x.to("cpu")
        return x


# (datasetのクラスを自作したので，このあたりの処理が少し変わっています)

zca = ZCAWhitening()
zca.fit(trainval_data)

val_size = 3000
train_data, val_data = torch.utils.data.random_split(trainval_data, [len(trainval_data)-val_size, val_size])  # 訓練データと検証データに分割


# 前処理を定義
# CNN: データ拡張 (Data Augmentation) と正規化 (Normalization) の定義
# 訓練データには、過学習を防ぐためにランダムな変換（水平反転、クロップ）を適用
transform_train = transforms.Compose([
    transforms.RandomHorizontalFlip(),  # データ拡張: ランダムに水平反転
    transforms.RandomCrop(32, padding=4, padding_mode='reflect'),  # データ拡張: 4ピクセルパディング後、ランダムに32x32をクロップ
    transforms.ToTensor(),  # PIL Image を Tensor (C, H, W) に変換 [0, 255] -> [0.0, 1.0]
    zca  # ZCA Whitening (前処理) を適用
])

# 検証・テストデータにはデータ拡張を行わず、ToTensorとZCAのみ適用
transform = transforms.Compose([
    transforms.ToTensor(),  # PIL Image を Tensor (C, H, W) に変換
    zca  # ZCA Whitening (前処理) を適用
])

# データセットに前処理を設定
trainval_data.transform = transform_train
test_data.transform = transform

batch_size = 64

# CNN: データローダーの定義 (バッチ処理、シャッフル)
dataloader_train = torch.utils.data.DataLoader(
    train_data,
    batch_size=batch_size,
    shuffle=True
)

dataloader_valid = torch.utils.data.DataLoader(
    val_data,
    batch_size=batch_size,
    shuffle=False
)

dataloader_test = torch.utils.data.DataLoader(
    test_data,
    batch_size=batch_size,
    shuffle=False
)

In [None]:
import torch.nn as nn
import torch.optim as optim
import torch.autograd as autograd
import torch.nn.functional as F

rng = np.random.RandomState(1234)
random_state = 42
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# CNN: モデルアーキテクチャの定義
class Cifar10CNN(nn.Module):
    def __init__(self, num_classes=10):
        super(Cifar10CNN, self).__init__()
        
        # --- CNN: 特徴抽出ブロック 1 ---
        # (Input: 3x32x32, Output: 64x32x32)
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(64) # バッチ正規化
        # (Input: 64x32x32, Output: 64x32x32)
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        # (Input: 64x32x32, Output: 64x16x16)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2) # プーリング層
        self.dropout1 = nn.Dropout2d(p=0.25) # Dropout (過学習防止)

        # --- CNN: 特徴抽出ブロック 2 ---
        # (Input: 64x16x16, Output: 128x16x16)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        # (Input: 128x16x16, Output: 128x16x16)
        self.conv4 = nn.Conv2d(in_channels=128, out_channels=128, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(128)
        # (Input: 128x16x16, Output: 128x8x8)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.dropout2 = nn.Dropout2d(p=0.25)

        # --- CNN: 分類器 (全結合層) ---
        # 特徴抽出ブロックの出力サイズ: 128チャンネル * 8ピクセル * 8ピクセル
        self.fc_input_features = 128 * 8 * 8
        
        # (Input: 128*8*8, Output: 512)
        self.fc1 = nn.Linear(self.fc_input_features, 512)
        self.bn_fc1 = nn.BatchNorm1d(512) # 1Dのバッチ正規化
        self.dropout_fc = nn.Dropout(p=0.5) # Dropout
        
        # (Input: 512, Output: 10)
        self.fc2 = nn.Linear(512, num_classes) # 出力層 (10クラス)

    def forward(self, x):
        # CNN: 順伝播 (Forward) の定義
        
        # --- ブロック 1 --- (畳み込み -> 正規化 -> 活性化) x 2 -> プーリング -> Dropout
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.pool1(x)
        x = self.dropout1(x)

        # --- ブロック 2 --- (畳み込み -> 正規化 -> 活性化) x 2 -> プーリング -> Dropout
        x = F.relu(self.bn3(self.conv3(x)))
        x = F.relu(self.bn4(self.conv4(x)))
        x = self.pool2(x)
        x = self.dropout2(x)

        # --- 分類器 ---
        # CNN: 平坦化 (Flatten) - (N, C, H, W) を (N, C*H*W) に変形
        x = x.view(-1, self.fc_input_features) 
        
        # (全結合 -> 正規化 -> 活性化 -> Dropout)
        x = F.relu(self.bn_fc1(self.fc1(x)))
        x = self.dropout_fc(x)
        
        # (出力層)
        x = self.fc2(x)
        
        return x

conv_net = Cifar10CNN().to(device)


def init_weights(m):  # Heの初期化
    if type(m) == nn.Linear or type(m) == nn.Conv2d:
        torch.nn.init.kaiming_normal_(m.weight)
        m.bias.data.fill_(0.0)


conv_net.apply(init_weights)


n_epochs = 5
lr = 0.01
device = 'cuda'

conv_net.to(device)
# CNN: 最適化手法の定義 (Adam: 学習率 lr)
optimizer = optim.Adam(conv_net.parameters(), lr=lr)

# CNN: 損失関数の定義 (多クラス分類のためのクロスエントロピー)
loss_function = nn.CrossEntropyLoss()


In [None]:
for epoch in range(n_epochs):
    losses_train = []
    losses_valid = []

    # CNN: モデルを訓練 (Train) モードに設定 (Dropout, BatchNormを有効化)
    conv_net.train()
    n_train = 0
    acc_train = 0
    for x, t in dataloader_train:
        # --- 訓練フェーズ --- 
        n_train += len(t) # 処理したデータ数を加算

        # データをGPU/CPUに転送
        x = x.to(device)
        t = t.to(device)

        # CNN: 勾配の初期化 (前回のバッチの勾配をリセット)
        optimizer.zero_grad()

        # CNN: 順伝播 (Forward) - モデルで予測
        y = conv_net.forward(x)

        # CNN: 損失 (Loss) の計算 (予測 y と正解 t の誤差)
        loss = loss_function(y, t)

        # CNN: 逆伝播 (Backward) - 損失に基づいて勾配を計算
        loss.backward()

        # CNN: パラメータの更新 - 勾配に基づいて重みを更新
        optimizer.step()

        # 予測結果の取得 (最も確率の高いクラス)
        pred = y.argmax(1)
        # --- 訓練フェーズ終了 --- 

        acc_train += (pred == t).float().sum().item()
        losses_train.append(loss.tolist())

    # CNN: モデルを評価 (Evaluation) モードに設定 (Dropout, BatchNormを無効化)
    conv_net.eval()
    n_val = 0
    acc_val = 0
    for x, t in dataloader_valid:
        # --- 検証フェーズ --- 
        n_val += len(t) # 処理したデータ数を加算

        # データをGPU/CPUに転送
        x = x.to(device)
        t = t.to(device)

        # CNN: 順伝播 (Forward)
        # torch.no_grad() ブロックで勾配計算を無効にし、メモリ効率化・高速化
        with torch.no_grad():
            y = conv_net.forward(x)

        # CNN: 損失 (Loss) の計算
        loss = loss_function(y, t)

        # 予測結果の取得
        pred = y.argmax(1)
        # --- 検証フェーズ終了 --- 

        acc_val += (pred == t).float().sum().item()
        losses_valid.append(loss.tolist())

    print('EPOCH: {}, Train [Loss: {:.3f}, Accuracy: {:.3f}], Valid [Loss: {:.3f}, Accuracy: {:.3f}]'.format(
        epoch,
        np.mean(losses_train),
        acc_train/n_train,
        np.mean(losses_valid),
        acc_val/n_val
    ))


In [None]:
conv_net.eval()

t_pred = []
for x in dataloader_test:

    x = x.to(device)

    # CNN: 順伝播 (Forward) - テストデータで予測
    # 勾配計算は不要
    with torch.no_grad():
        y = conv_net.forward(x)

    # モデルの出力を予測値のスカラーに変換
    pred = y.argmax(1).tolist()

    t_pred.extend(pred)

submission = pd.Series(t_pred, name='label')
submission.to_csv(work_dir + '/第5回/submission_pred_05.csv', header=True, index_label='id')