<a href="https://colab.research.google.com/github/tsakailab/tutorial/blob/main/pytorch/pytorch_tutorial.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

pytorchで転移学習やらモデルの作成やらをやってみよう！というゼミ資料です．

最初の方は転移学習について話しています．

このコードは前半については[公式のチュートリアル](https://pytorch.org/tutorials/beginner/transfer_learning_tutorial.html)をほぼコピペしています．

説明がかなり雑な部分が多いので，気づいた人は説明の追加をしていただければ非常に助かります．

In [None]:
#@title ## Download and extract data zipfile

# License: BSD
# Author: Sasank Chilamkurthy

from __future__ import print_function, division
import warnings
warnings.simplefilter('ignore')
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
import copy
from tqdm import tqdm
from collections import OrderedDict
import glob
from PIL import Image

plt.ion()   # interactive mode
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
!wget https://download.pytorch.org/tutorial/hymenoptera_data.zip
!unzip hymenoptera_data.zip -d ./

ants = glob.glob('./hymenoptera_data/train/ants/*')
bees = glob.glob('./hymenoptera_data/train/bees/*')
f=lambda x: np.array(Image.open(x))
ants = np.array(list(map(f,ants)))
bees = np.array(list(map(f,bees)))
data = np.concatenate((ants,bees),0)
labels = np.concatenate((np.zeros(len(ants)),np.ones(len(bees))),0).astype('int64')

ants_val = glob.glob('./hymenoptera_data/train/ants/*')
bees_val = glob.glob('./hymenoptera_data/train/bees/*')
f=lambda x: np.array(Image.open(x))
ants_val = np.array(list(map(f,ants_val)))
bees_val = np.array(list(map(f,bees_val)))
data_val = np.concatenate((ants_val,bees_val),0)
labels = np.concatenate((np.zeros(len(ants_val)),np.ones(len(bees_val))),0).astype('int64')

In [None]:
# dataloader作り
# Data augmentation and normalization for training
# Just normalization for validation
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

data_dir = './hymenoptera_data/'
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x),
                                          data_transforms[x])
                  for x in ['train', 'val']}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=4,
                                             shuffle=True, num_workers=4)
              for x in ['train', 'val']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}
class_names = image_datasets['train'].classes


In [None]:
def imshow(inp, title=None):
    """Imshow for Tensor."""
    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)
    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)  # pause a bit so that plots are updated

# Get a batch of training data
inputs, classes = next(iter(dataloaders['train']))

# Make a grid from batch
out = torchvision.utils.make_grid(inputs)
imshow(out, title=[class_names[x] for x in classes])

In [None]:
#@title ## Define some function(training, printing result)
def train_model(model, criterion, optimizer, scheduler, num_epochs=25):
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                phase, epoch_loss, epoch_acc))

            # deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model
def visualize_model(model, num_images=6):
    was_training = model.training
    model.eval()
    images_so_far = 0
    fig = plt.figure()

    with torch.no_grad():
        for i, (inputs, labels) in enumerate(dataloaders['val']):
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)

            for j in range(inputs.size()[0]):
                images_so_far += 1
                ax = plt.subplot(num_images//2, 2, images_so_far)
                ax.axis('off')
                ax.set_title('predicted: {}'.format(class_names[preds[j]]))
                imshow(inputs.cpu().data[j])

                if images_so_far == num_images:
                    model.train(mode=was_training)
                    return
        model.train(mode=was_training)
def train_model_tqdm(model, criterion, optimizer, scheduler, num_epochs=25):
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    train_loss,val_loss,train_acc,val_acc = 0,0,0,0
    with tqdm(range(num_epochs)) as bar:
        for epoch in bar:
            bar.set_description('Epoch {}/{}'.format(epoch, num_epochs - 1))
            
            # Each epoch has a training and validation phase
            for phase in ['train', 'val']:
                if phase == 'train':
                    model.train()  # Set model to training mode
                else:
                    model.eval()   # Set model to evaluate mode

                running_loss = 0.0
                running_corrects = 0

                # Iterate over data.
                for inputs, labels in dataloaders[phase]:
                    inputs = inputs.to(device)
                    labels = labels.to(device)

                    # zero the parameter gradients
                    optimizer.zero_grad()

                    # forward
                    # track history if only in train
                    with torch.set_grad_enabled(phase == 'train'):
                        outputs = model(inputs)
                        _, preds = torch.max(outputs, 1)
                        loss = criterion(outputs, labels)

                        # backward + optimize only if in training phase
                        if phase == 'train':
                            loss.backward()
                            optimizer.step()

                    # statistics
                    running_loss += loss.item() * inputs.size(0)
                    running_corrects += torch.sum(preds == labels.data)
                if phase == 'train':
                    scheduler.step()

                epoch_loss = running_loss / dataset_sizes[phase]
                epoch_acc = running_corrects.double() / dataset_sizes[phase]

                #print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                #    phase, epoch_loss, epoch_acc))

                # deep copy the model
                if phase == 'val' and epoch_acc > best_acc:
                    best_acc = epoch_acc
                    best_model_wts = copy.deepcopy(model.state_dict())
                if phase=='train':
                    train_loss = epoch_loss
                    train_acc = epoch_acc
                else:
                    val_loss = epoch_loss
                    val_acc = epoch_acc
            bar.set_postfix(OrderedDict(train_loss='{:.4f}'.format(train_loss), 
                                        train_accuracy = '{:.4f}'.format(train_acc),
                                        val_loss='{:.4f}'.format(val_loss), 
                                        val_accuracy = '{:.4f}'.format(val_acc)))
            print()
    time_elapsed = time.time() - since
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model


In [None]:
# make model
model = models.resnet18(pretrained=True)
#model = model = models.vgg16(pretrained=True)

num_ftrs = model.fc.in_features
# Here the size of each output sample is set to 2.
# Alternatively, it can be generalized to nn.Linear(num_ftrs, len(class_names)).
model.fc = nn.Linear(num_ftrs, 2)

model = model.to(device)

criterion = nn.CrossEntropyLoss()

# Observe that all parameters are being optimized
optimizer_ft = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Decay LR by a factor of 0.1 every 7 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

In [None]:
# Train model!
model_ft = train_model(model, criterion, optimizer_ft, exp_lr_scheduler,num_epochs=25)

In [None]:
visualize_model(model_conv)

plt.ioff()
plt.show()

以下はtutorialにない内容．

できたら便利だという事を武田の独断と偏見で追加しています．

In [None]:
# tqdmを使ってみる
# train_model_tqdmという関数を作っておきました．引数は同じなので，使ってみて下さい．


In [None]:
# batch_sizeを変えてみる
# 先に宣言したimshowあたりで見てみてください．

#
# batch_sizeを変えたdataloaders作成
#

# Get a batch of training data
inputs, classes = next(iter(dataloaders['train']))

# Make a grid from batch
out = torchvision.utils.make_grid(inputs)
print(classes)
imshow(out, title=[class_names[x] for x in classes])

In [None]:
#@title ##難しいので折り畳み
# MRIグループ向け．
# 既に全データがnumpyで保存されているとき，どのようにdataloadersを作る？
# ヒント：クラスを宣言することが必要です．
# [答えはここに]内に武田が書いたmydatasetsというクラスがあります．
# 殆ど最小限の構造になっているはずなので，見てみてほしい．

# 全部盛りnumpyファイルの作成
data = data # 画像データ
labels = labels # ラベルデータ(アリ:0,ハチ:1)

#
# datasetクラスの宣言
#

data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}
data_train = mydatasets(data, labels, data_transforms['train'])
train_loaders = torch.utils.data.DataLoader(data_train, batch_size=2, shuffle=True)

# Get a batch of training data
inputs, classes = next(iter(train_loaders))

# Make a grid from batch
out = torchvision.utils.make_grid(inputs)
imshow(out, title=[class_names[x] for x in classes])

In [None]:
#@title ##答えはここに
# こんな感じのクラスを作成すれば達成できます．
class mydatasets(torch.utils.data.Dataset):
    def __init__(self, data, labels, transforms=None):
        self.data = data # データはdataという変数に格納されます．
        self.label = labels # ラベルはlabelという変数に．
        self.datanum = len(self.data) # コレを設定すると，総データ数をdataloaderが管理してくれます．
        self.transforms = transforms # data augmentation(データ拡張)の設定を保存．
        
    def __len__(self):
        return self.datanum
    
    def __getitem__(self, idx):
        out_data = Image.fromarray(self.data[idx]) # データ読み取り
        out_label = self.label[idx]
        if self.transforms:
            out_data = self.transforms(out_data)
        return out_data, out_label

In [None]:
# modelをプリントしてみる
# modelはどのような構造となっているのかを確認してみて下さい．


In [None]:
# かなり長い文字列が表示されたと思います．
# では，VGG16のものを表示してみてください．


# 多少は短いものが表示されると思います．

In [None]:
#@title ##答え
model = models.vgg16(pretrained=True)
print(model)

In [None]:
# VGG16の特徴抽出部より特徴抽出部を抜き出してください．
# featuresとある部分がそれに対応します．


In [None]:
#@title ##答え
feature_extraction = model.features
print(feature_extraction)

In [None]:
# 抜きだした特徴抽出部から一部をさらに取り出してください．
# 何処でもいいですが，悩むのであれば最後から2段目のMaxPooling層までを抜き出してみて下さい．


In [None]:
#@title ##答え
feature_extraction = model.features
layers = feature_extraction[:23]
print(layers)

In [None]:
# 抜き出した層の重みを比べてみて下さい．
# 保存されているはずです．

# こうして転移学習する層数を変えることができます．
# 層数が少なければどのような特徴が抽出されるか，逆に層が厚い場合はどうなのか，考えてみて下さい．
# 受容野とかそういう話です．
# 分からなければ松尾とかが詳しいはずです．聞いてみて下さい．

In [None]:
#@title ##答え
feature_extraction = model.features
layers = feature_extraction[:23]

print(feature_extraction[0].weight == layers[0].weight)

In [None]:
# 御託はそろそろとして，モデルを自分で組んでみましょう．
# ひな型を武田が用意しました．
# 1層の畳み込み層のみで構成されるネットワークを作ってみましょう．
# 入力は(224,224,3)の3チャネル画像を想定します．
# つまりinput_channelは3です．公式のAPIを見に行ったりして書いてみて下さい．
# 公式API:https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html

# 記入する際，out_channelを決定する必要があると思います．
class my_model(torch.nn.Module):
    def __init__(self):
        super().__init__()
        out_channel = ___
        self.conv = ___ # ここに記入
        self.fc1 = nn.Linear(out_channel,2)
        
    def forward(self,x):
        x = self.conv(x)
        
        x = x.view(x.shape[0],28*28*64)
        x = self.fc1(x)   #上の部分をしっかり記入できていれば動くようになっています．

        return 

In [None]:
#@title ##答え
class my_model(torch.nn.Module):
    def __init__(self):
        super().__init__()
        out_channel = 64
        self.conv = torch.nn.Conv2d(3,out_channel,3,bias=True,padding = 1)
        self.fc1 = nn.Linear(out_channel*224*224,2)
        
    def forward(self,x):
        x = self.conv(x)
        x = x.view(x.shape[0],224*224*64)
        x = self.fc1(x)

        return x

In [None]:
# 作成したモデルを学習させてみましょう！
# 多分ですが，わけわからん事になります．
# (最適化関数をAdamにすることで少しはマシになります．やってみてもよろし)
model = my_model().to(device)

criterion = nn.CrossEntropyLoss()

# Observe that all parameters are being optimized
optimizer_ft = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Decay LR by a factor of 0.1 every 7 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)
model_ft = train_model(model, criterion, optimizer_ft, exp_lr_scheduler,num_epochs=25)

In [None]:
# 活性化関数を入れてみる
# たまに聞く活性化関数なるものを導入してみましょう．
# 特徴抽出を行うレイヤーの後ろに置きます．
# 今回はReLUを使ってみて下さい．

class my_model(torch.nn.Module):
    def __init__(self):
        super().__init__()
        out_channel = ___
        self.conv = torch.nn.Conv2d(3,out_channel,3,bias=True,padding = 1) # ここに記入
        self.activate = ____ # ReLUを導入してみましょう．
        self.fc1 = nn.Linear(224*224*out_channel,2)
        
    def forward(self,x):
        x = self.conv(x)
        x = self.activate(x)
        
        x = x.view(x.shape[0],224*224*64)
        x = self.fc1(x)   #上の部分をしっかり記入できていれば動くようになっています．

        return x

In [None]:
#@title ##答え
class my_model(torch.nn.Module):
    def __init__(self):
        super().__init__()
        out_channel = 64
        self.conv = torch.nn.Conv2d(3,out_channel,3,bias=True,padding = 1) # ここに記入
        self.activate = torch.nn.ReLU() # ReLUを導入してみましょう．
        self.fc1 = nn.Linear(out_channel*224*224,2)
        
    def forward(self,x):
        x = self.conv(x)
        x = self.activate(x)
        
        x = x.view(x.shape[0],224*224*64)
        x = self.fc1(x)
        return x

In [None]:
#@title ##学習用セル
model = my_model().to(device)

criterion = nn.CrossEntropyLoss()

# Observe that all parameters are being optimized
optimizer_ft = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Decay LR by a factor of 0.1 every 7 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)
model_ft = train_model(model, criterion, optimizer_ft, exp_lr_scheduler,num_epochs=25)

In [None]:
# Poolingもしてみましょう．
# VGGにも含まれるMaxPooling層を導入します．
# 特徴抽出部のすぐ後ろに入れてみましょう．
# 今回はforwardも自力で書き換えてみて下さい．
# チャネル数を変えてみてもいいかもしれません．

class my_model(torch.nn.Module):
    def __init__(self):
        super().__init__()
        out_channel = ___
        self.conv = torch.nn.Conv2d(3,out_channel,3,bias=True,padding = 1) # ここに記入
        self.activate = torch.nn.ReLU()
        self.fc1 = nn.Linear(out_channel*112*112,2)
        self.pooling = ___ #ここに記入してみて下さい．
        
    def forward(self,x):
        x = self.conv(x)
        x = self.activate(x)
        
        x = x.view(x.shape[0],112*112*64)
        x = self.fc1(x)   #上の部分をしっかり記入できていれば動くようになっています．

        return x

In [None]:
#@title ##答え
class my_model(torch.nn.Module):
    def __init__(self):
        super().__init__()
        out_channel = 64
        self.conv = torch.nn.Conv2d(3,out_channel,3,bias=True,padding = 1) # ここに記入
        self.activate = torch.nn.ReLU()
        self.fc1 = nn.Linear(out_channel*112*112,2)
        self.pooling = torch.nn.MaxPool2d(2) #ここに記入してみて下さい．
        
    def forward(self,x):
        x = self.conv(x)
        x = self.activate(x)
        x = self.pooling(x)

        x = x.view(x.shape[0],112*112*64)
        x = self.fc1(x)   #上の部分をしっかり記入できていれば動くようになっています．
        return x

In [None]:
#@title ##学習用セル(学習が進まなかったのでここだけAdam)
model = my_model().to(device)

criterion = nn.CrossEntropyLoss()

# Observe that all parameters are being optimized
optimizer_ft = optim.Adam(model.parameters(), lr=0.001)

# Decay LR by a factor of 0.1 every 7 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)
model_ft = train_model(model, criterion, optimizer_ft, exp_lr_scheduler,num_epochs=25)

In [None]:
# では，特徴抽出部を増やしてみましょう．
# 一気に2層ほど増やしてみることにします．
# 今回はヒント無しです．答えは用意しておきますが，実際に書いてみて下さい．
class my_model(torch.nn.Module):
    def __init__(self):
        super().__init__()
        out_channel = ___
        self.conv = torch.nn.Conv2d(3,out_channel,3,bias=True,padding = 1)
        self.activate = torch.nn.ReLU()
        self.fc1 = nn.Linear(out_channel,2)
        self.pooling = torch.nn.MaxPool2d(2)
        
    def forward(self,x):
        x = self.conv(x)
        x = self.activate(x)
        x = self.pooling(x)
        
        x = x.view(x.shape[0],28*28*64)
        x = self.fc1(x)

        return x

In [None]:
#@title ##答え
class my_model(torch.nn.Module):
    def __init__(self):
        super().__init__()
        out_channel = 64
        self.conv1 = torch.nn.Conv2d(3,out_channel,3,bias=True,padding = 1)
        self.activate = torch.nn.ReLU()
        self.pooling = torch.nn.MaxPool2d(2)

        self.conv2 = torch.nn.Conv2d(out_channel,out_channel*2,3,bias=True,padding = 1)
        self.conv3 = torch.nn.Conv2d(out_channel*2,out_channel,3,bias=True,padding = 1)
        
        self.fc1 = nn.Linear(out_channel*28*28,2)
    def forward(self,x):
        x = self.conv1(x)
        x = self.activate(x)
        x = self.pooling(x)
        x = self.conv2(x)
        x = self.activate(x)
        x = self.pooling(x)
        x = self.conv3(x)
        x = self.activate(x)
        x = self.pooling(x)
        x = x.view(x.shape[0],28*28*64)
        x = self.fc1(x)

        return x

In [None]:
#@title ##学習用セル
model = my_model().to(device)

criterion = nn.CrossEntropyLoss()

# Observe that all parameters are being optimized
optimizer_ft = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Decay LR by a factor of 0.1 every 7 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)
model_ft = train_model(model, criterion, optimizer_ft, exp_lr_scheduler,num_epochs=25)

In [None]:
# 少しはマシになったかもしれません．
# そろそろネットワークを作る事にも慣れてきたことでしょう．
# それでは，特徴抽出部を転移学習してみましょう．
# 特徴抽出部だけを抜き出してきた時を思い出してみて下さい．

class my_model(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.vgg = ___ # ここに記入
        self.activate = torch.nn.ReLU()
        self.avgpool = models.vgg16().avgpool
        self.fc1 = nn.Linear(512*7*7,2)
        
    def forward(self,x):

        x = self.avgpool(x)
        x = x.view(x.shape[0],512*7*7)
        x = self.fc1(x)

        return x

In [None]:
#@title 答え
class my_model(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.vgg = models.vgg16(pretrained=True).features
        self.activate = torch.nn.ReLU()
        self.avgpool = models.vgg16().avgpool
        self.fc1 = nn.Linear(512*7*7,2)
        
    def forward(self,x):
        x = self.vgg(x)
        x = self.avgpool(x)
        x = x.view(x.shape[0],512*7*7)
        x = self.fc1(x)

        return x

In [None]:
#@title ##学習用セル
model = my_model().to(device)

criterion = nn.CrossEntropyLoss()

# Observe that all parameters are being optimized
optimizer_ft = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Decay LR by a factor of 0.1 every 7 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)
model_ft = train_model(model, criterion, optimizer_ft, exp_lr_scheduler,num_epochs=25)

In [None]:
# 一応性能が出たと思います．
# 転移学習の力を思い知って頂ければ幸い．
# 転移学習しましたが，実際にはもうひと手間加えたい事があります．
# それは学習不可にする事です．
# 何故そうするかはパラメータ数が多すぎる事とかいろいろあるわけですが，まあ詳しくは割愛．と言うわけで学習不可，つまりfreezeしましょう．
# どうすればよいかは調べれば出てくるので，実践してみて下さい．
class my_model(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.vgg = models.vgg16(pretrained=True).features
        self.activate = torch.nn.ReLU()
        self.avgpool = models.vgg16().avgpool
        self.fc1 = nn.Linear(512*7*7,2)
        
    def forward(self,x):
        x = self.vgg(x)
        x = self.avgpool(x)
        x = x.view(x.shape[0],512*7*7)
        x = self.fc1(x)

        return x

In [None]:
#@title 答え
class my_model(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.vgg = models.vgg16(pretrained=True).features
        for layer in self.vgg:
            for param in layer.parameters():
                param.requires_grad = False
        self.activate = torch.nn.ReLU()
        self.avgpool = models.vgg16().avgpool
        self.fc1 = nn.Linear(512*7*7,2)
        
    def forward(self,x):
        x = self.vgg(x)
        x = self.avgpool(x)
        x = x.view(x.shape[0],512*7*7)
        x = self.fc1(x)

        return x


In [None]:
#@title ##学習用セル
model = my_model().to(device)

criterion = nn.CrossEntropyLoss()

# Observe that all parameters are being optimized
optimizer_ft = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Decay LR by a factor of 0.1 every 7 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)
model_ft = train_model(model, criterion, optimizer_ft, exp_lr_scheduler,num_epochs=25)

In [None]:
# できましたか？
# 実際freezeできているか確認することはなかなか難しいと思います．
# そんなところで，便利なライブラリを紹介します．
# torchsummary君です．百聞は何とやらと言うわけで使ってみて下さい．便利さが分かります．(pip推奨．condaでインストールしようとすると変な事になります)
!pip install torchsummary
from torchsummary import summary
summary(model_ft,(3,224,224))
# ちなみにこのtorchsummary君ですが，0番目のGPUにモデルが載っている事を前提にできているみたいです．
# 複数GPUや，CPUを使う場合は利用できないのでご注意を．
# ここでtrainable parameterに振られている方はfreezeできていないのでやり直しです．

転移学習を実装する事に関してはマスターしたも同然です．

後は自分で学んでいけるでしょう(ここまでちゃんとやった人はそもそも自力で調べられる人だと思いますが)

ここから先は発展的内容になります．

武田が知っていた方が面白いのではと思った事を書き連ねているだけです．

もういいやという人はブラウザバック推奨．

In [None]:
# さて，散々識別問題を解かせてきました．しかし実際にモデルは何処を見ているのでしょうか．
# その基準を知ろうという試みとして，CAM(class activation map)があります．
# 簡潔に言えば，モデルがどこを見て診断したか可視化したデータです．
# これは酒井研の研究では細胞診，MRIで利用されていますね．
# 詳しくはここでは触れません．調べてみてね．本項ではその実装について触れます．
# 作り方は次セルに任せるとして，このセルではモデルの出力を増やしてみましょう．

# モデルはさっきまで使っていたものとします．
# このモデルのうち，特徴抽出部から出て来たものをついでに出力してみましょう．
# どうすれば出力できるでしょうか？
class my_model(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.vgg = models.vgg16(pretrained=True).features
        self.activate = torch.nn.ReLU()
        self.avgpool = models.vgg16().avgpool
        self.fc1 = nn.Linear(512*7*7,2)
        
    def forward(self,x):
        x = self.vgg(x)
        x = self.avgpool(x)
        x = x.view(x.shape[0],512*7*7)
        x = self.fc1(x)

        return x

In [None]:
#@title 答え
class my_model(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.vgg = models.vgg16(pretrained=True).features
        self.activate = torch.nn.ReLU()
        self.avgpool = models.vgg16().avgpool
        self.fc1 = nn.Linear(512*7*7,2)
        
    def forward(self,x):
        x = self.vgg(x)
        feature_map = x.clone()
        x = self.avgpool(x)
        x = x.view(x.shape[0],512*7*7)
        x = self.fc1(x)

        return x, feature_map

実際に学習してみましょう．

本来はちょいと学習用関数も変更する必要がありますが，今回は学習用セルに含めておきました．
(0番目の出力が推論であることを想定しています．)

In [None]:
#@title 学習用セル
def train_model_tqdm(model, criterion, optimizer, scheduler, num_epochs=25):
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    train_loss,val_loss,train_acc,val_acc = 0,0,0,0
    with tqdm(range(num_epochs)) as bar:
        for epoch in bar:
            bar.set_description('Epoch {}/{}'.format(epoch, num_epochs - 1))
            
            # Each epoch has a training and validation phase
            for phase in ['train', 'val']:
                if phase == 'train':
                    model.train()  # Set model to training mode
                else:
                    model.eval()   # Set model to evaluate mode

                running_loss = 0.0
                running_corrects = 0

                # Iterate over data.
                for inputs, labels in dataloaders[phase]:
                    inputs = inputs.to(device)
                    labels = labels.to(device)

                    # zero the parameter gradients
                    optimizer.zero_grad()

                    # forward
                    # track history if only in train
                    with torch.set_grad_enabled(phase == 'train'):
                        outputs = model(inputs)[0]
                        _, preds = torch.max(outputs, 1)
                        loss = criterion(outputs, labels)

                        # backward + optimize only if in training phase
                        if phase == 'train':
                            loss.backward()
                            optimizer.step()

                    # statistics
                    running_loss += loss.item() * inputs.size(0)
                    running_corrects += torch.sum(preds == labels.data)
                if phase == 'train':
                    scheduler.step()

                epoch_loss = running_loss / dataset_sizes[phase]
                epoch_acc = running_corrects.double() / dataset_sizes[phase]

                #print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                #    phase, epoch_loss, epoch_acc))

                # deep copy the model
                if phase == 'val' and epoch_acc > best_acc:
                    best_acc = epoch_acc
                    best_model_wts = copy.deepcopy(model.state_dict())
                if phase=='train':
                    train_loss = epoch_loss
                    train_acc = epoch_acc
                else:
                    val_loss = epoch_loss
                    val_acc = epoch_acc
            bar.set_postfix(OrderedDict(train_loss='{:.4f}'.format(train_loss), 
                                        train_accuracy = '{:.4f}'.format(train_acc),
                                        val_loss='{:.4f}'.format(val_loss), 
                                        val_accuracy = '{:.4f}'.format(val_acc)))
            print()
    time_elapsed = time.time() - since
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model
#@title ##学習用セル
model = my_model().to(device)

criterion = nn.CrossEntropyLoss()

# Observe that all parameters are being optimized
optimizer_ft = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Decay LR by a factor of 0.1 every 7 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)
model_ft = train_model_tqdm(model, criterion, optimizer_ft, exp_lr_scheduler,num_epochs=25)

In [None]:
# 学習が終わりましたね．
# 実際に算出される特徴マップを見てみましょうか．
img = Image.open('./hymenoptera_data/val/ants/Ant-1818.jpg')
img = data_transforms['val'](img)
imshow(img)
feature_map = model_ft(img.unsqueeze(0).to(device))[1]
fig = plt.figure(figsize=(15,15))
for x in range(1,6):
    plt.subplot(1,5,x)
    plt.imshow(feature_map.cpu().detach()[0][x], cmap='gray')

# まあ，ようわからんわけです．
# これは特徴抽出部の層を厚く取ったため，挟まれるPoolingでfeature mapのサイズが小さくなってしまっている事に起因しています．
# 一応この特徴マップでもどこから特徴を取得しているかわかりますが，それでも分かりづらいわけです．

In [None]:
# ではCAMに移ります．
# CAMはGAP(Global average Pooling)層の入力として現れます．
# これを導入すると殆どFCN(Fully Convolutional Network)となるわけですが，学術的話は一切やりません．調べて．
# GAPはx.mean([2,3])で実装できます．楽ですね．
# CAMは各クラスの数だけである必要があります．
# 従って，512channelまで増えた特徴マップを2チャネルに落とす必要があるわけで．
# これは何層かConv2dを挟んで対応します．
# まあなんやかんや言ってますが，私の説明で分かるとも思えないので，実際にやってみましょう．難しいと思いますので，答えを見たりしてやってみて下さい．


In [None]:
#@title 答え
class my_model(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.vgg = models.vgg16(pretrained=True).features
        for layer in self.vgg:
            for param in layer.parameters():
                param.requires_grad = False
        self.activate = torch.nn.ReLU()
        self.avgpool = models.vgg16().avgpool
        self.conv = torch.nn.Sequential(
            torch.nn.Conv2d(512,128,3,padding=1),
            torch.nn.ReLU(),
            torch.nn.Conv2d(128,32,3,padding=1),
            torch.nn.ReLU(),
            torch.nn.Conv2d(32,2,2,padding=1),
            torch.nn.ReLU(),
        )
        
    def forward(self,x):
        x = self.vgg(x)
        x = self.conv(x)
        cam = x.clone()
        x = x.mean([2,3])
        
        return x, cam

In [None]:
#@title 学習用セル
def train_model_tqdm(model, criterion, optimizer, scheduler, num_epochs=25):
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    train_loss,val_loss,train_acc,val_acc = 0,0,0,0
    with tqdm(range(num_epochs)) as bar:
        for epoch in bar:
            bar.set_description('Epoch {}/{}'.format(epoch, num_epochs - 1))
            
            # Each epoch has a training and validation phase
            for phase in ['train', 'val']:
                if phase == 'train':
                    model.train()  # Set model to training mode
                else:
                    model.eval()   # Set model to evaluate mode

                running_loss = 0.0
                running_corrects = 0

                # Iterate over data.
                for inputs, labels in dataloaders[phase]:
                    inputs = inputs.to(device)
                    labels = labels.to(device)

                    # zero the parameter gradients
                    optimizer.zero_grad()

                    # forward
                    # track history if only in train
                    with torch.set_grad_enabled(phase == 'train'):
                        outputs = model(inputs)[0]
                        _, preds = torch.max(outputs, 1)
                        loss = criterion(outputs, labels)

                        # backward + optimize only if in training phase
                        if phase == 'train':
                            loss.backward()
                            optimizer.step()

                    # statistics
                    running_loss += loss.item() * inputs.size(0)
                    running_corrects += torch.sum(preds == labels.data)
                if phase == 'train':
                    scheduler.step()

                epoch_loss = running_loss / dataset_sizes[phase]
                epoch_acc = running_corrects.double() / dataset_sizes[phase]

                #print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                #    phase, epoch_loss, epoch_acc))

                # deep copy the model
                if phase == 'val' and epoch_acc > best_acc:
                    best_acc = epoch_acc
                    best_model_wts = copy.deepcopy(model.state_dict())
                if phase=='train':
                    train_loss = epoch_loss
                    train_acc = epoch_acc
                else:
                    val_loss = epoch_loss
                    val_acc = epoch_acc
            bar.set_postfix(OrderedDict(train_loss='{:.4f}'.format(train_loss), 
                                        train_accuracy = '{:.4f}'.format(train_acc),
                                        val_loss='{:.4f}'.format(val_loss), 
                                        val_accuracy = '{:.4f}'.format(val_acc)))
            print()
    time_elapsed = time.time() - since
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model
#@title ##学習用セル
model = my_model().to(device)
summary(model,(3,224,224))
criterion = nn.CrossEntropyLoss()

# Observe that all parameters are being optimized
optimizer_ft = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Decay LR by a factor of 0.1 every 7 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)
model_ft = train_model_tqdm(model, criterion, optimizer_ft, exp_lr_scheduler,num_epochs=25)

In [None]:
# CAMを出力してみましょう．
# 実行するたび別のものが表示されるので，いろいろと試してみて下さい．
# ちなみに推論されたクラスはCAMの上にある文字が赤くなります．
import random
target = ['ants','bees'][1]
img = Image.open('./hymenoptera_data/val/{}/'.format(target) + random.choice(os.listdir('./hymenoptera_data/val/{}/'.format(target))))
img = data_transforms['val'](img)
imshow(img)
outputs, cam = model_ft(img.unsqueeze(0).to(device))
_, preds = torch.max(outputs, 1)
fig = plt.figure(figsize=(7,7))
for x in range(2):
    plt.subplot(1,2,x+1)
    plt.imshow(cam.cpu().detach()[0][x], cmap='gray', vmin=cam.min(),vmax=cam.max())
    plt.title(class_names[x],color="red" if preds == x else 'black')
