# Co-enhance session 4, 5: 学習

## 環境設定 (Google drive内のファイルへのアクセス)

In [0]:
from google.colab import drive
drive.mount('/content/gdrive')

In [0]:
import os

# チーム番号 (必ず自分のチームにする！)
os.environ['TEAM'] = ''
print('you are team {}'.format(os.environ['TEAM']))

## 学習データをモデルが読み込める状態にする  (データ拡張，ミニバッチ化)

In [0]:
root_dir = "./gdrive/Team Drives/coenhance_teams/team{}/content".format(os.getenv('TEAM'))

In [0]:
# 深層学習ライブラリpytorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

from torchvision import transforms
from torchvision.datasets import ImageFolder

# 可視化モジュール
import numpy as np
import matplotlib.pyplot as plt
import cv2
from PIL import Image

import time

In [0]:
# ミニバッチ化を行う関数
def collater(sample):
    images = []
    classes = []
    for i in sample:
        images.append(i[0])
        classes.append(i[1])
    im = torch.stack(images)
    cl = torch.tensor(classes)
    return im, cl

# データ拡張方法
preprocess = transforms.Compose([
    # 360度ランダムで画像を回転する
    transforms.RandomRotation(180),
    # ランダムで上下左右反転する
    transforms.RandomHorizontalFlip(0.5),
    transforms.RandomVerticalFlip(0.5),
    # 明るさやコントラストなどのランダムでの調整
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0),
    transforms.ToTensor(),
    #transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.2, 0.2, 0.2])
])

In [0]:
# バッチサイズ
BATCH_SIZE = 64

# 下の可視化で何枚表示するか，バッチサイズより小さくとる
show_size = 10

# クラスの一覧
food = os.listdir(os.path.join(root_dir, "data"))
food.sort()
print("classes: ", food)

dataset = ImageFolder(os.path.join(root_dir, "data"), transform=preprocess)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collater, num_workers=8)

In [0]:
sample = iter(dataloader).next()
# キャンバス(幅，高さ)
fig = plt.figure(figsize=(20, 10))

for i in range(show_size):
    im = sample[0][i]
    ax = fig.add_subplot(1, show_size, i+1)
    plt.imshow(im.numpy().transpose(1, 2, 0))
    plt.grid(False)
    plt.axis('off')
    cl = food[sample[1][i].item()]
    ax.title.set_text(cl)

## CNNのモデルや学習に必要なものの定義

In [0]:
# 畳み込みニューラルネットワーク1
class CNN(nn.Module):
    def __init__(self, image_size, num_classes, C=3):
        super(CNN, self).__init__()
        self.shallow = nn.Sequential(
            nn.Conv2d(C, 2*C, kernel_size=4, padding=1, stride=2),
            nn.BatchNorm2d(2*C),
            nn.ReLU(),
            nn.Conv2d(2*C, 4*C, kernel_size=4, padding=1, stride=2),
            nn.BatchNorm2d(4*C),
            nn.ReLU(),
            nn.ConvTranspose2d(4*C, 8*C, kernel_size=4, padding=0, stride=4),
            nn.Sigmoid()
        )
        self.GAP = nn.AvgPool2d(image_size)
        self.linear = nn.Linear(8*C, num_classes)
        self.softmax = nn.Softmax()
    
    def forward(self, x):
        out = self.shallow(x)
        out = self.GAP(out)
        out = torch.squeeze(out)
        out = self.linear(out)
        out = self.softmax(out)
        return out
    
    def get_cam(self, x, idx):
        self.eval()
        camout = self.shallow(x)
        # act_mapはGAP前の特徴マップ, (C x image_size x image_size)
        act_map = camout[0]
        C, H, W = act_map.size()
        # weightsは全結合層における重みの値, (C x num_classes)
        weights = self.linear.weight.data
        N, _ = weights.size()
        # camの計算 (num_classes x image_size x image_size)
        cam = torch.mm(weights, act_map.view(C, H*W)).view(N, H, W)
        # 特定のクラスの重みのみ取り出す
        cam = cam[idx]
        maxval = cam.max()
        minval = cam.min()
        cam = (cam - minval) / (maxval - minval)
        cam = cam.cpu().detach().numpy()
        return cam

In [0]:
# 残差ブロックの定義
class ResidualBlock(nn.Module):
    def __init__(self, C):
        super(ResidualBlock, self).__init__()
        self.model = nn.Sequential(
            nn.Conv2d(C, C, kernel_size=3, padding=3, dilation=3),
            nn.BatchNorm2d(C),
            nn.LeakyReLU()
        )
        
    def forward(self, x):
        return x + self.model(x)

# 畳み込みニューラルネットワーク2 (残差ブロックあり)  
class CNN_res(nn.Module):
    def __init__(self, image_size, num_classes, residual_num=5, C=3):
        super(CNN_res, self).__init__()
        self.shallow = []
        for i in range(residual_num):
            self.shallow.append(ResidualBlock(C))
        self.shallow = nn.Sequential(*self.shallow)
        self.GAP = nn.AvgPool2d(image_size)
        self.linear = nn.Linear(C, num_classes)
        self.softmax = nn.Softmax()
    
    def forward(self, x):
        out = self.shallow(x)
        out = self.GAP(out)
        out = torch.squeeze(out)
        out = self.linear(out)
        out = self.softmax(out)
        return out
    
    def get_cam(self, x, idx):
        self.eval()
        camout = self.shallow(x)
        # act_mapはGAP前の特徴マップ, (C x image_size x image_size)
        act_map = camout[0]
        C, H, W = act_map.size()
        # weightsは全結合層における重みの値, (C x num_classes)
        weights = self.linear.weight.data
        N, _ = weights.size()
        # camの計算 (num_classes x image_size x image_size)
        cam = torch.mm(weights, act_map.view(C, H*W)).view(N, H, W)
        # 特定のクラスの重みのみ取り出す
        cam = cam[idx]
        maxval = cam.max()
        minval = cam.min()
        cam = (cam - minval) / (maxval - minval)
        cam = cam.cpu().detach().numpy()
        return cam

In [0]:
# 学習率
LEARNING_RATE = 1e-2
# ハイパーパラメータβ
BETAS = (0.5, 0.999)

# GPU使用可能か判定し，deviceに設定
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("GPU visible: {}".format(torch.cuda.is_available()))

# 使用するニューラルネットワークモデル (上は通常，下は残差ブロックあり)
#model = CNN(image_size=75, num_classes=len(food))
model = CNN_res(image_size=75, num_classes=len(food))

# 最適化手法 (勾配の谷をどう降りていくか)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE, betas=BETAS)

# 損失関数 (クロスエントロピー)
criterion = nn.CrossEntropyLoss()

In [0]:
# データ転送 (ある場合GPU)
model = model.to(device)
criterion = criterion.to(device)

In [0]:
# 重み初期化の時に用いる標準偏差
INITSTD=0.1

def init_weights(layer):
    if isinstance(layer, nn.Conv2d) or isinstance(layer, nn.Linear) or isinstance(layer, nn.ConvTranspose2d):
        torch.nn.init.normal_(layer.weight, std=INITSTD)
        if layer.bias is not None:
            torch.nn.init.zeros_(layer.bias)

modeldata = model.apply(init_weights)

## 学習実行！

In [0]:
# 学習を行うエポック数
NUM_EPOCHS = 100

# モデルを保存するパス
save_path = os.path.join(root_dir, "models")

print('begin training')
loss_list = []
for ep in range(NUM_EPOCHS):
    
    begin = time.time()
    correct = 0
    full = 0
    running_loss = 0
    cnt = 0
    
    for it, sample in enumerate(dataloader):
        im_batch = sample[0].to(device)
        cl_batch = sample[1].to(device)
        
        optimizer.zero_grad()
        out = model(im_batch)
        correct += torch.sum(torch.argmax(out, dim=1) == cl_batch).item()
        full += BATCH_SIZE
        cnt += 1
        loss = criterion(out, cl_batch)
        loss.backward()
        optimizer.step()
        loss_list.append(loss.item())
        running_loss += loss.item()
        
        # 10ステップごとに損失関数の値を表示
        if it % 10 == 9:
            print('{}th iter done \t| loss: {}'.format(it+1, loss.item(), flush=True))
        
        
    end = time.time()
    # エポックごとに損失関数・クラス識別の正解率を表示
    print('-' * 100)
    print('epoch {:03d} \t| loss: {:.05f} \t| train_acc: {:04d}/{:04d} \t| {:.05f}s per loop'.format(ep+1, running_loss/cnt, correct, full, (end-begin)/cnt), flush=True)
    print('-' * 100)
    
    # 10エポックごとにモデルを保存
    if ep % 10 == 9:
        path = os.path.join(save_path, 'ep{}.pth'.format(ep+1))
        torch.save(model.state_dict(), path)
        print('saved model for epoch {} at {}'.format(ep+1, path))
        
print('done!')

## 学習結果の可視化 (損失グラフ，テストデータのCAM)

In [0]:
# 学習損失のイテレーションごとのプロット
plt.plot(loss_list)
plt.title('training loss')
plt.ylabel('cross entropy')
plt.xlabel('iterations')

In [0]:
# camを画像に乗せてヒートマップにする関数．画像とcamを入力する
def cam_on_image(img, mask):
	heatmap = cv2.applyColorMap(np.uint8(255*mask), cv2.COLORMAP_JET)
	heatmap = np.float32(heatmap) / 255
	cam = heatmap + np.float32(img)
	cam = cam / np.max(cam)
	return cam

In [0]:
# キャンバスの設定
fig = plt.figure(figsize=(10, 10))
# タイトル
plt.suptitle('visualization of CAM')

# dataloader(学習データのローダー)からサンプルを適当に取ってくる
for i in dataloader:
    im = i[0].to(device)
    out = model(im).detach().cpu()[0]
    # 普通に分類させる
    cl = np.argmax(out)
    # 該当したクラスclに関するCAMを取得する
    cam = model.get_cam(im, cl)

    # 元画像を表示
    ax1 = fig.add_subplot(221)
    original = i[0][0].numpy().transpose(1, 2, 0)
    plt.imshow(original)
    plt.grid(False)
    plt.axis('off')
    gt = food[i[1][0].item()]
    ax1.title.set_text('ground truth: {}'.format(gt))
    
    # CAMの乗った画像を表示
    ax2 = fig.add_subplot(222)
    plt.imshow(cam_on_image(original, cam))
    plt.grid(False)
    plt.axis('off')
    p = out[cl].item() * 100
    classname = food[cl]
    ax2.title.set_text('predicted class: {}, {:.02f}%'.format(classname, p))
    break