# アリとハチの画像を分類するモデルを学習し、テストする




本ファイルは、Google Colabでの実行を前提としています

# 事前準備

セットアップのためのコード。「!」でターミナルコマンドを実行している。

In [None]:
!git clone https://github.com/rin-nishikawa/internship.git
%cd "internship"
%cd "1_image_classification"

In [None]:
!ls

In [None]:
# make_folders_and_data_downloads.ipynbの中身を実行
import os
import urllib.request
import zipfile

以下のコードで、dataフォルダの中にhymenoptera_dataフォルダができる事を確認しよう。

In [None]:
data_dir = "./data/"
if not os.path.exists(data_dir):
    os.mkdir(data_dir)

if not os.path.exists("./weights/"):
    os.mkdir("./weights/")

# url = "https://s3.amazonaws.com/deep-learning-models/image-models/imagenet_class_index.json"
# save_path = os.path.join(data_dir, "imagenet_class_index.json")

# if not os.path.exists(save_path):
#     urllib.request.urlretrieve(url, save_path)


zip_path = os.path.join(data_dir, "catdog.zip")
save_path = os.path.join(data_dir, "catdog/")
# ZIPファイルを読み込み
zip = zipfile.ZipFile(zip_path)
zip.extractall(save_path)  # ZIPを解凍
zip.close()  # ZIPファイルをクローズ
# ZIPファイルを消去
os.remove(zip_path)
os.rename("./data/catdog/test/", "./data/catdog/val/")

# 初期設定

In [None]:
# パッケージのimport
import glob
import os.path as osp
import random
import numpy as np
import json
from PIL import Image
from tqdm import tqdm
import matplotlib.pyplot as plt
%matplotlib inline

import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import torchvision
from torchvision import models, transforms


In [None]:
# 乱数のシードを設定
torch.manual_seed(1234)
np.random.seed(1234)
random.seed(1234)

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("使用デバイス：", device)

# DataSetを作成

## ――――演習１ここから――――

In [None]:
# 入力画像の前処理をするクラス
# 訓練時と推論時で処理が異なる


class ImageTransform():
    """
    画像の前処理クラス。訓練時、検証時で異なる動作をする。
    画像のサイズをリサイズし、色を標準化する。
    訓練時はRandomResizedCropとRandomHorizontalFlipでデータオーギュメンテーションする。


    Attributes
    ----------
    resize : int
        リサイズ先の画像の大きさ。
    mean : (R, G, B)
        各色チャネルの平均値。
    std : (R, G, B)
        各色チャネルの標準偏差。
    """

    def __init__(self, resize, mean, std):
        self.data_transform = {
            'train': transforms.Compose([
                transforms.RandomResizedCrop(resize, scale=(0.5, 1.0)),  # データオーギュメンテーション
                transforms.RandomHorizontalFlip(),  # データオーギュメンテーション
                transforms.ToTensor(),  # テンソルに変換
                transforms.Normalize(mean, std)  # 標準化
            ]),
            'val': transforms.Compose([
                transforms.Resize(resize),  # リサイズ
                transforms.CenterCrop(resize),  # 画像中央をresize×resizeで切り取り
                transforms.ToTensor(),  # テンソルに変換
                transforms.Normalize(mean, std)  # 標準化
            ])
        }

    def __call__(self, img, phase='train'):
        """
        Parameters
        ----------
        phase : 'train' or 'val'
            前処理のモードを指定。
        """
        return self.data_transform[phase](img)


In [None]:
# 訓練時の画像前処理の動作を確認
# 実行するたびに処理結果の画像が変わる

# 1. 画像読み込み
image_file_path = './data/girl3.jpeg'
img = Image.open(image_file_path)   # [高さ][幅][色RGB]

# 2. 元の画像の表示
print("元画像表示")
plt.imshow(img)
plt.show()

# 3. 画像の前処理と処理済み画像の表示
size = 224
mean = (0.485, 0.456, 0.406)
std = (0.229, 0.224, 0.225)

transform = ImageTransform(size, mean, std)
img_transformed = transform(img, phase="train")  # torch.Size([3, 224, 224])

# (色、高さ、幅)を (高さ、幅、色)に変換し、0-1に値を制限して表示
img_transformed = img_transformed.numpy().transpose((1, 2, 0))
img_transformed = np.clip(img_transformed, 0, 1)
print("変換後画像表示")
plt.imshow(img_transformed)
plt.show()

## ――――演習１ここまで――――

In [None]:
# アリとハチの画像へのファイルパスのリストを作成する


def make_datapath_list(phase="train"):
    """
    データのパスを格納したリストを作成する。

    Parameters
    ----------
    phase : 'train' or 'val'
        訓練データか検証データかを指定する

    Returns
    -------
    path_list : list
        データへのパスを格納したリスト
    """

    rootpath = "./data/catdog/"
    target_path = osp.join(rootpath+phase+'/**/*.jpg')

    path_list = []  # ここに格納する

    # globを利用してサブディレクトリまでファイルパスを取得する
    for path in glob.glob(target_path):
        path_list.append(path)

    return path_list


# 実行
train_list = make_datapath_list(phase="train")
val_list = make_datapath_list(phase="val")

print(train_list[1])
print(val_list[1])
print(len(train_list))
print(len(val_list))


In [None]:
# アリとハチの画像のDatasetを作成する


class HymenopteraDataset(data.Dataset):
    """
    アリとハチの画像のDatasetクラス。PyTorchのDatasetクラスを継承。

    Attributes
    ----------
    file_list : リスト
        画像のパスを格納したリスト
    transform : object
        前処理クラスのインスタンス
    phase : 'train' or 'test'
        学習か訓練かを設定する。
    """

    def __init__(self, file_list, transform=None, phase='train'):
        self.file_list = file_list  # ファイルパスのリスト
        self.transform = transform  # 前処理クラスのインスタンス
        self.phase = phase  # train or valの指定

    def __len__(self):
        '''画像の枚数を返す'''
        return len(self.file_list)

    def __getitem__(self, index):
        '''
        前処理をした画像のTensor形式のデータとラベルを取得
        '''

        # index番目の画像をロード
        img_path = self.file_list[index]
        img = Image.open(img_path)  # [高さ][幅][色RGB]

        # 画像の前処理を実施
        img_transformed = self.transform(
            img, self.phase)  # torch.Size([3, 224, 224])

        # 画像のラベルをファイル名から抜き出す
        if self.phase == "train":
            label = img_path[20:24]
        elif self.phase == "val":
            label = img_path[18:22]

        # ラベルを数値に変更する
        if label == "cats":
            label = 0
        elif label == "dogs":
            label = 1

        return img_transformed, label


# 実行
train_dataset = HymenopteraDataset(
    file_list=train_list, transform=ImageTransform(size, mean, std), phase='train')

val_dataset = HymenopteraDataset(
    file_list=val_list, transform=ImageTransform(size, mean, std), phase='val')

# 動作確認
index = 0
print(train_list[index])
print(train_dataset.__getitem__(index)[0].size())
print(train_dataset.__getitem__(index)[1])


# DataLoaderを作成

## ――――演習２ここから――――

In [None]:
# ミニバッチのサイズを指定
batch_size = 32

# DataLoaderを作成
train_dataloader = torch.utils.data.DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True)

val_dataloader = torch.utils.data.DataLoader(
    val_dataset, batch_size=batch_size, shuffle=False)

# 辞書型変数にまとめる
dataloaders_dict = {"train": train_dataloader, "val": val_dataloader}

# 動作確認
batch_iterator = iter(dataloaders_dict["train"])  # イテレータに変換
inputs, labels = next(batch_iterator)  # 1番目の要素を取り出す
print(inputs.size())
print(labels)


## ――――演習２ここまで――――

# ネットワークモデルを作成

## ――――演習３ここから――――

In [None]:
# 学習済みのVGG-16モデルをロード
# VGG-16モデルのインスタンスを生成
use_pretrained = False  # 未学習パラメータを使用
####演習####
net = models.vgg16(pretrained=use_pretrained)

# モデルのネットワーク構成を出力
print(net)

In [None]:
# VGG16の最後の出力層の出力ユニットをアリとハチの2つに付け替える
net.classifier[6] = nn.Linear(in_features=4096, out_features=2)

In [None]:
# 訓練モードに設定
net.train()

## ――――演習３ここまで――――

# 損失関数を定義

In [None]:
# 損失関数の設定
criterion = nn.CrossEntropyLoss()

# 最適化手法を設定

In [None]:
####西川による変更####

# # 転移学習で学習させるパラメータを、変数params_to_updateに格納する
# params_to_update = []

# # 学習させるパラメータ名
# update_param_names = ["classifier.6.weight", "classifier.6.bias"]

# # 学習させるパラメータ以外は勾配計算をなくし、変化しないように設定
# for name, param in net.named_parameters():
#     if name in update_param_names:
#         param.requires_grad = True
#         params_to_update.append(param)
#         print(name)
#     else:
#         param.requires_grad = False

# # params_to_updateの中身を確認
# print("-----------")
# print(params_to_update)

########


In [None]:
# 最適化手法の設定

####西川による修正####
# optimizer = optim.SGD(params=params_to_update, lr=0.001, momentum=0.9)
########

optimizer = optim.SGD(params=net.parameters(), lr=0.001, momentum=0.9)


# 学習・検証を実施

In [None]:
# モデルを学習させる関数を作成


def train_model(net, dataloaders_dict, criterion, optimizer, acc_history, num_epochs):

    # GPUが使えるかを確認
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print("使用デバイス：", device)

    # ネットワークをGPUへ
    net.to(device)

    # epochのループ
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch+1, num_epochs))
        print('-------------')

        # epochごとの学習と検証のループ
        for phase in ['train', 'val']:
            if phase == 'train':
                net.train()  # モデルを訓練モードに
            else:
                net.eval()   # モデルを検証モードに

            epoch_loss = 0.0  # epochの損失和
            epoch_corrects = 0  # epochの正解数

            # 未学習時の検証性能を確かめるため、epoch=0の訓練は省略
            if (epoch == 0) and (phase == 'train'):
                continue

            # データローダーからミニバッチを取り出すループ
            for inputs, labels in tqdm(dataloaders_dict[phase]):

                inputs=inputs.to(device)
                labels=labels.to(device)

                # optimizerを初期化
                optimizer.zero_grad()

                # 順伝搬（forward）計算
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = net(inputs)
                    loss = criterion(outputs, labels)  # 損失を計算
                    _, preds = torch.max(outputs, 1)  # ラベルを予測

                    # 訓練時はバックプロパゲーション
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                    # イタレーション結果の計算
                    # lossの合計を更新
                    epoch_loss += loss.item() * inputs.size(0)  
                    # 正解数の合計を更新
                    epoch_corrects += torch.sum(preds == labels.data)

            # epochごとのlossと正解率を表示
            epoch_loss = epoch_loss / len(dataloaders_dict[phase].dataset)
            epoch_acc = epoch_corrects.double(
            ) / len(dataloaders_dict[phase].dataset)

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                phase, epoch_loss, epoch_acc))
            
            if (epoch == 0) and (phase == 'val'):
              continue
            else:
              acc_history[phase].append(epoch_acc.detach().cpu().numpy())
    return acc_history


In [None]:
# 学習・検証を実行する
num_epochs=1
acc_history = {'train': [], 'val': []}
acc_history = train_model(net, dataloaders_dict, criterion, optimizer, acc_history, num_epochs=num_epochs)

In [None]:
fig = plt.figure()
ax = fig.add_subplot()
ax.plot(acc_history['train'], label='train')
ax.plot(acc_history['val'], label='val')
ax.legend()
ax.set_ylim(0, 1)
fig.show()

# テスト（推論）を実施

In [None]:
net.eval()  # 推論モードに設定
net.to(device)

In [None]:
# ILSVRCのラベル情報をロードし辞書型変数を生成します
ILSVRC_class_index = {'0': ['n02219486', 'cats'],'1': ['n02206856', 'dogs']}

In [None]:
# 出力結果からラベルを予測する後処理クラス
class ILSVRCPredictor():
    """
    ILSVRCデータに対するモデルの出力からラベルを求める。

    Attributes
    ----------
    class_index : dictionary
            クラスindexとラベル名を対応させた辞書型変数。
    """

    def __init__(self, class_index):
        self.class_index = class_index

    def predict_max(self, out):
        """
        確率最大のILSVRCのラベル名を取得する。

        Parameters
        ----------
        out : torch.Size([1, 2])
            Netからの出力。

        Returns
        -------
        predicted_label_name : str
            最も予測確率が高いラベルの名前
        """
        print("アリであると推測："+str(out.detach().cpu().numpy()[0][0]))
        print("ハチであると推測："+str(out.detach().cpu().numpy()[0][1]))
        maxid = np.argmax(out.detach().cpu().numpy())

        predicted_label_name = self.class_index[str(maxid)][1]

        return predicted_label_name


## ――――演習４ここから――――

In [None]:
# ILSVRCPredictorのインスタンスを生成します
predictor = ILSVRCPredictor(ILSVRC_class_index)

# 入力画像を読み込む
####演習####
image_file_path = './data/b.jpeg'
img = Image.open(image_file_path)  # [高さ][幅][色RGB]

# 元の画像の表示
plt.imshow(img)
plt.show()

# 前処理の後、バッチサイズの次元を追加する
resize = 224
transform = ImageTransform(resize, mean, std)  # 前処理クラス作成
img_transformed = transform(img)  # torch.Size([3, 224, 224])
inputs = img_transformed.unsqueeze_(0)  # torch.Size([1, 3, 224, 224])
inputs=inputs.to(device)

# モデルに入力し、モデル出力をラベルに変換する
out = net(inputs)  # torch.Size([1, 1000])
result = predictor.predict_max(out)

# 予測結果を出力する
print("入力画像の予測結果：", result)

## ――――演習４ここまで――――

# 参考
https://github.com/YutaroOgawa/pytorch_advanced , (参照 2023-10-26)  
小川雄太郎. つくりながら学ぶ! PyTorchによる発展ディープラーニング. マイナビ, 2019