<a href="https://colab.research.google.com/github/project-ccap/project-ccap.github.io/blob/master/notebooks/2020_0724transfer_learning_tlpa_mnasnet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# TLPA 画像を使ってディープラーニングモデルによる転移学習を行う PyTorch デモ
- author: 浅川伸一
- date: 2020-0726

In [None]:
# 各画像の画面表示時に日本語キャプションを付与する準備
import matplotlib.pyplot as plt
%matplotlib inline
!pip install japanize-matplotlib
import japanize_matplotlib

#  ImageNet の各ラベルの WordNet ID 処理用
import nltk
nltk.download('wordnet')
nltk.download('omw')

# ライブラリのインストール
!git clone https://github.com/project-ccap/ccap.git

In [None]:
# 画像データ，設定データを Google Drive から入手
# このセルを実行するとブラウザの別タブで Google アカウントへの認証が求められる
# Google アカウントを選択するとクリデンシャルキーが表示されるので，そのキーを
# コピーして，このセルの出力欄にある空欄に貼り付けてエンターキー (リターンキー) を押下する

# Import PyDrive and associated libraries.
# This only needs to be done once per notebook.
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client.
# This only needs to be done once per notebook.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

# 以下実際のデータの情報
#https://drive.google.com/file/d/1xKXbovkEQwdJefzCuaS_a351LUIuRz-1/view?usp=sharing 
#for Gdrive cis.twcu.ac.jp/GitHub_shared/ccap_data.tgz
file_id = '1xKXbovkEQwdJefzCuaS_a351LUIuRz-1'
downloaded = drive.CreateFile({'id': file_id})
downloaded.GetContentFile('ccap_data.tgz')

# 入手したデータの解凍
!tar xzf ccap_data.tgz

In [None]:
# 以下は動作確認，ImageNet の利用
# ただし本来 ImageNet の画像利用には登録が必要である
# そのため，利用時には各ユーザの責任において ImageNet への登録申請を行うこと
# 参照 URL: http://image-net.org/download-images
# 文献: J. Deng, W. Dong, R. Socher, L.-J. Li, K. Li and L. Fei-Fei, ImageNet: A Large-Scale Hierarchical Image Database,
#       IEEE Computer Vision and Pattern Recognition (CVPR), 2009.
from ccap import imagenetDataset
imagenet = imagenetDataset()

# 最初のデータの表示
print(imagenet(0))

# 4 番目のデータ 0 から始まるので 3 が 4 番目のデータを表す
print(imagenet.data[3])

# 最後の画像データをランダムサンプリングして一枚だけ表示
# 実行するたび表示されるデータは異なる
imagenet.sample_and_show(999)

In [None]:
# TLPA データの利用
# 文献: 藤田郁代 他, 2000, 「失語症語彙検査」の開発, 音声言語医学 42:179-202
from ccap import tlpaDataset
tlpa = tlpaDataset()

# 最初のデータの表示
print(tlpa(0))

# 図版画像の表示
tlpa.show_an_image('桜')

# 総データ数の表示
print(tlpa.__len__())

# ここから先は PyTorch を使った転移学習の実際

In [None]:
import numpy as np
import PIL.Image as PILImage
from scipy.special import logsumexp, softmax
from termcolor import colored

import torch
import torchvision
#from torchvision import models, transforms
import torch.utils.data as data
import torch.nn as nn
import torch.optim as optim


In [None]:
import torchvision.models as models
#resnet18 = models.resnet18()
#alexnet = models.alexnet()
#vgg16 = models.vgg16()
#squeezenet = models.squeezenet1_0()
#densenet = models.densenet161()
#inception = models.inception_v3()
#googlenet = models.googlenet()
#shufflenet = models.shufflenet_v2_x1_0()
#mobilenet = models.mobilenet_v2()
#resnext50_32x4d = models.resnext50_32x4d()
#wide_resnet50_2 = models.wide_resnet50_2()
#mnasnet = models.mnasnet1_0()

net = models.mnasnet1_0(pretrained=True, progress=True)

In [None]:
from torchvision import transforms

transform = transforms.Compose([transforms.Resize(256), 
                                transforms.CenterCrop(224), 
                                transforms.ToTensor()])


In [None]:
#tlpa = tlpaDataset()
tlpa_img_path = [tlpa.data[k]['img'] for k in tlpa.data.keys()]
#tlpa.data.keys()
tlpa_name_dict = {i:k for i, k in enumerate(tlpa.data.keys())}
print(tlpa_name_dict)

In [None]:
# 入力画像の前処理をするクラス
# 訓練時と推論時で処理が異なる
class ImageTransform():
    """
    画像の前処理クラス。訓練時、検証時で異なる動作をする。
    画像のサイズをリサイズし、色を標準化する。
    訓練時は RandomResizedCrop と RandomHorizontalFlip で データ拡張


    Attributes
    ----------
    resize : int
        リサイズ先の画像の大きさ。
    mean : (R, G, B)
        各色チャネルの平均値。
    std : (R, G, B)
        各色チャネルの標準偏差。
    """

    def __init__(self, resize, mean, std):
        self.data_transform = {
                        'train': transforms.Compose(
                            [transforms.RandomResizedCrop(resize, scale=(0.8, 1.0)),  # データ拡張
                             transforms.RandomHorizontalFlip(),  # データ拡張
                             transforms.RandomAffine(degrees=(-20,20), translate=None, scale=[0.9,1.1]),
                             transforms.ToTensor(),  # テンソルに変換
                             transforms.Normalize(mean, std)  # 標準化
                             ]),
                        'val': transforms.Compose(
                            [transforms.Resize((resize, resize)),  # リサイズ
                             # transforms.CenterCrop(resize),  # 画像中央をresize×resizeで切り取り
                             transforms.ToTensor(),  # テンソルに変換
                             transforms.Normalize(mean, std)  # 標準化
                             ])
                        }

    def __call__(self, img, phase='train'):
        """
        Parameters
        ----------
        phase : 'train' or 'val'
            前処理のモードを指定。
        """
        return self.data_transform[phase](img)

In [None]:
# Dataset の作成
class tlpa_torch_Dataset(data.Dataset):
    """
    TLPA 画像のDatasetクラス。PyTorchのDatasetクラスを継承。

    Attributes
    ----------
    file_list : リスト
        画像のパスを格納したリスト
    transform : object
        前処理クラスのインスタンス
    phase : 'train' or 'test'
        学習か訓練かを設定する。
    """

    def __init__(self, file_list, name_dict, transform=None, phase='train'):
        self.file_list = file_list  # ファイルパスのリスト
        self.transform = transform  # 前処理クラスのインスタンス
        self.phase = phase  # train or valの指定
        self.namedict = name_dict

    def __len__(self):
        '''画像の枚数を返す'''
        return len(self.file_list)

    def __getitem__(self, index):
        '''
        前処理をした画像のTensor形式のデータとラベルを取得
        '''

        # index番目の画像をロード
        img_path = self.file_list[index]
        img = PILImage.open(img_path)  # [高さ][幅][色RGB]

        # 画像の前処理を実施
        img_transformed = self.transform(
            img, self.phase)  # torch.Size([3, 224, 224])

        # 画像のラベルをファイル名から抜き出す
        label = self.namedict[index]
        return img_transformed, label


# 画像の前処理に必要なパラメータの定義
size = 224
mean = (0.485, 0.456, 0.406)
std = (0.229, 0.224, 0.225)

train_dataset = tlpa_torch_Dataset(file_list=tlpa_img_path, 
                                   name_dict=tlpa_name_dict,  
                                   transform=ImageTransform(size, mean, std), 
                                   phase='train')

val_dataset = tlpa_torch_Dataset(file_list=tlpa_img_path, 
                                 name_dict=tlpa_name_dict, 
                                 transform=ImageTransform(size, mean, std), 
                                 phase='val')

# 動作確認
index = 3
print(train_dataset.__getitem__(index)[0].size())
print(train_dataset.__getitem__(index)[1])
print(train_dataset.__len__())

In [None]:
# ミニバッチのサイズの設定
batch_size = 32

# DataLoaderを作成
train_dataloader = torch.utils.data.DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True)

val_dataloader = torch.utils.data.DataLoader(
    val_dataset, batch_size=batch_size, shuffle=False)

# 辞書型変数へまとめる
dataloaders_dict = {"train": train_dataloader, "val": val_dataloader}

# 動作確認
batch_iterator = iter(dataloaders_dict["train"])  # イテレータに変換
inputs, labels = next(batch_iterator)  # 1番目の要素を取り出す
print(inputs.size())
print(labels)

In [None]:
# 事前学習済のモデル構成を表示
net

In [None]:
# 直上出力の最後 `Linear(in_features=1280, out_features=1000, bias=True)` に注目
# モデルの最終直下層の出力ユニット数を TLPA に合わせて 180 にする
net.classifier[1] = nn.Linear(in_features=1280, out_features=180)


In [None]:
# 訓練モードに設定
net.train()

In [None]:
# 損失関数の設定
criterion = nn.CrossEntropyLoss()

In [None]:
# 転移学習で学習させるパラメータを params_to_update に格納
params_to_update = []

# 学習させるパラメータ名
update_param_names = ["classifier.1.weight", "classifier.1.bias"]

# 学習させるパラメータ以外は勾配計算をなくし、変化しないように設定
for name, param in net.named_parameters():
    if name in update_param_names:
        param.requires_grad = True
        params_to_update.append(param)
        print(name)
    else:
        param.requires_grad = False

# params_to_update を表示
print(params_to_update)

In [None]:
update_param_names = ["classifier.1.weight", "classifier.1.bias"]
for name, param in net.named_parameters():
    if name in update_param_names:
#        param.requires_grad = True
#        params_to_update.append(param)
        print(name)
    else:
#        param.requires_grad = False
        continue


In [None]:
# 最適化手法の設定
#optimizer = optim.SGD(params=params_to_update, lr=0.001, momentum=0.9)
#help(optim.Adam)
#optimizer = optim.SGD(params=params_to_update, lr=0.001, momentum=0.9)
optimizer = optim.Adam(params=params_to_update) 

In [None]:
# 学習関数の定義
def train_model(net, dataloaders_dict, criterion, optimizer, num_epochs):

    # epochのループ
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch+1, num_epochs))
        print('-------------')

        # epochごとの学習と検証のループ
        for phase in ['train', 'val']:
            if phase == 'train':
                net.train()  # モデルを訓練モード
            else:
                net.eval()   # モデルを検証モード

            epoch_loss = 0.0  # epochの損失和
            epoch_corrects = 0  # epochの正解数

            # 未学習時の検証性能を確かめるため、epoch=0の訓練は省略
            if (epoch == 0) and (phase == 'train'):
                continue

            # データローダーからミニバッチを取り出す
            #for inputs, labels in tqdm(dataloaders_dict[phase]):
            # tqdm は要らん。冗長な出力になるだけ
            for inputs, labels in dataloaders_dict[phase]:
                # optimizerを初期化
                optimizer.zero_grad()

                # 順伝搬（forward）計算
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = net(inputs)
                    loss = criterion(outputs, labels)  # 損失を計算
                    _, preds = torch.max(outputs, 1)  # ラベルを予測
  
                    # 訓練時はバックプロパゲーション
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                    # イタレーション結果の計算
                    # lossの合計を更新
                    epoch_loss += loss.item() * inputs.size(0)  
                    # 正解数の合計を更新
                    epoch_corrects += torch.sum(preds == labels.data)

            # epochごとのlossと正解率を表示
            epoch_loss = epoch_loss / len(dataloaders_dict[phase].dataset)
            epoch_acc = epoch_corrects.double(
            ) / len(dataloaders_dict[phase].dataset)

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                phase, epoch_loss, epoch_acc))

In [None]:
# 訓練実施前の動作確認として 1 エポックだけ実行
for inputs, labels in dataloaders_dict['train']:
    print(inputs.size(), labels)
    output = net(inputs)
    loss = criterion(output, labels)

In [None]:
%%time
# 学習・検証の実行
num_epochs=50
train_model(net, dataloaders_dict, criterion, optimizer, num_epochs=num_epochs)


In [None]:

saved_weight_file = '2020-0725tlpa_mnasnet_weights.pth'
torch.save(net.state_dict(), saved_weight_file)
load_weights = torch.load(saved_weight_file)
net.load_state_dict(load_weights)

In [None]:
#float_formatter = "{:.3f}".format
#np.set_printoptions(formatter={'float_kind':float_formatter})
# see https://note.nkmk.me/python-numpy-set-printoptions-float-formatter/
np.set_printoptions(formatter={'int': '{:3d}'.format, 'float_kind':'{:.3f}'.format})

def diagnose(no, display=False, n_best=5):
    img, label = tlpa(no)
    img = PILImage.open(img)   # [高さ][幅][色RGB]

    # 元の画像の表示
    #if display:
    #    plt.imshow(img); plt.show()

    # 画像の前処理と処理済み画像の表示
    size = 224
    mean = (0.485, 0.456, 0.406)
    std = (0.229, 0.224, 0.225)

    transform = ImageTransform(size, mean, std)
    img_transformed = transform(img, phase="val")  # torch.Size([3, 224, 224])

    # (色、高さ、幅)を (高さ、幅、色)に変換し、0-1に値を制限して表示
    if display:
        img_transformed_ = img_transformed.numpy().transpose((1, 2, 0))
        img_transformed_ = np.clip(img_transformed_, 0, 1)
        plt.imshow(img_transformed_);plt.show()

    # 認識の実施
    inputs = transform(img, phase='val')
    inputs_ = inputs.unsqueeze_(0)
    out = net(inputs_)
    outnp = out.detach().numpy()
    ids = np.argsort( - outnp[0])
    sftmx = softmax(-outnp[0])
    #print(sftmx[ids[0]], sftmx[ids[1]], sftmx[ids[2]])
    #print(np.sort(sftmx)[:5])

    if no == ids[0]:
        print('Hit ', end="")
    else:
        print(colored('Miss', 'red'), end="")

    print(ids[:n_best], end=" ")
    for no in ids[:n_best]:
        print(tlpa.data[no]['Name'], end=" ")
    print(- np.sort(-sftmx)[:n_best])


In [None]:
for i in range(tlpa.__len__()):
#for i in [1,3, 8, 9, 10, 11, 15 ]:
    diagnose(i, display=False, n_best=5)