# 必要なモジュールのインポート

In [None]:
import torch
import datetime
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader
from torchvision import transforms, datasets, models
from pathlib import Path
from sklearn.metrics import ConfusionMatrixDisplay

# パラメータの設定
エポック数を変えたい時は`MAX_EPOCH`に代入する値を変更する。

In [None]:
# エポック数
MAX_EPOCH = 50

# 画像の正規化用
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

if torch.backends.mps.is_available():
    device = torch.device('mps')
elif torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# 訓練用データと評価用データを読み込む`prepare_loader()`関数を定義
画像を短い辺が64ピクセルになるように全体を縮小し、その後中央の64×64ピクセルのみを切り取る。その後、一般的な画像データにおけるR, G, Bの値の平均と標準偏差を使って値の正規化を行う。これにより、学習が進みやすくなる。

データセットから画像を5つずつ（バッチの大きさを5として）ランダムに取り出し、特徴量と教師データを学習モデルに渡す**データローダー**を設定する。

In [None]:
def prepare_loader(data_path):
    # transform定義
    data_transforms = {
        'train': transforms.Compose([
            transforms.Resize(64),
            transforms.CenterCrop(64),
            transforms.ToTensor(),
            transforms.Normalize(mean=mean, std=std),
        ]),
        'val': transforms.Compose([
            transforms.Resize(64),
            transforms.CenterCrop(64),
            transforms.ToTensor(),
            transforms.Normalize(mean=mean, std=std),
        ]),
    }
    image_datasets = {
        x: datasets.ImageFolder(root=data_path/x, transform=data_transforms[x])
        for x in ['train', 'val']}

    torch.manual_seed(42)
    dataloaders = {
        x: DataLoader(image_datasets[x], batch_size=5, shuffle=True)
        for x in ['train', 'val']}

    dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}
    class_names = image_datasets['train'].classes

    return dataloaders, dataset_sizes, class_names

# 学習を行う`train_model()`関数を定義

In [None]:
def train_model(model, loss_fn, optimizer, dataloaders, dataset_sizes):
    loss_list = {'train': [], 'val': []}
    acc_list = {'train': [], 'val': []}
    for epoch in range(1, MAX_EPOCH+1):
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0
            for images, labels in dataloaders[phase]:
                images = images.to(device=device)
                labels = labels.to(device=device)
                with torch.set_grad_enabled(phase == 'train'):
                    # モデルで予測を計算
                    outputs = model(images)
                    # 出力で最大値を持つインデックスを取得
                    _, preds = torch.max(outputs, dim=1)
                    # 損失の計算
                    loss = loss_fn(outputs, labels)
                    if phase == 'train':
                        # 勾配情報を削除
                        optimizer.zero_grad()
                        # 微分計算
                        loss.backward()
                        # 勾配を更新
                        optimizer.step()
                # 損失の加算
                running_loss += loss.item() * images.size(0)
                # 正解の加算
                running_corrects += torch.sum(preds == labels.data).item()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]
            loss_list[phase].append(epoch_loss)
            acc_list[phase].append(epoch_acc)
            if epoch == 1 or epoch % 10 == 0:
                if phase == 'train':
                    dt_now = datetime.datetime.now()
                    print(f'{dt_now.strftime("%H:%M:%S")} epoch: {epoch:3d} ', end=' ')
                print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}',
                      end=' ' if phase == 'train' else '\n')

    _, ax = plt.subplots(1, 2, figsize=(10, 5))
    ax[0].plot(range(1, MAX_EPOCH+1), loss_list['train'], color='blue',
               linestyle='-', label='train')
    ax[0].plot(range(1, MAX_EPOCH+1), loss_list['val'], color='orange',
               linestyle='--', label='val')
    ax[0].set_xlabel('epoch')
    ax[0].set_ylabel('loss')
    ax[0].set_title('Loss')
    ax[0].legend()
    
    ax[1].plot(range(1, MAX_EPOCH+1), acc_list['train'], color='blue',
               linestyle='-', label='train')
    ax[1].plot(range(1, MAX_EPOCH+1), acc_list['val'], color='orange',
               linestyle='--', label='val')
    ax[1].set_xlabel('epoch')
    ax[1].set_ylabel('accuracy')
    ax[1].set_title('Accuracy')
    ax[1].legend()
    plt.tight_layout()
    plt.show()

    return model

# 学習モデルの定義と学習方法を設定し、`train_model()`関数で学習を実行させる`training()`関数を定義
画像認識を行うために訓練されたディープニューラルネットワークによる学習モデルである**AlexNet**を準備し、学習済みのパラメータを読み込む。

AlexNetは1,000種類の画像を識別できるように訓練されているので、ネットワークの最後を変更し、3種類の画像の識別ができるように変更する。

**学習率**を0.001と設定して、**最適化関数**にSGD（確率的勾配降下法）を選択する。また、**損失関数**としてクロスエントロピー誤差を設定する。

In [None]:
def training(dataloaders, dataset_sizes, class_names):
    print(f'Training on device {device}')
    
    model = models.alexnet(weights='DEFAULT')
    for param in model.parameters():
        param.requires_grad = False
    num_features = model.classifier[-1].in_features
    model.classifier[-1] = nn.Linear(num_features, len(class_names))
    model = model.to(device)

    # 最適化関数と損失関数の設定
    learning_rate = 0.001
    optimizer = optim.SGD(params=model.parameters(), lr=learning_rate)
    loss_fn = nn.CrossEntropyLoss()

    # 学習
    model = train_model(model, loss_fn, optimizer, dataloaders, dataset_sizes)

    return model

# 学習済みのモデルを受け取って、画像の判別を行う`predict()`関数を定義

In [None]:
def predict(model, dataloader):
    preds = []
    images = []
    labels = []
    model.eval()
    with torch.no_grad():
        for image, label in dataloader:
            image = image.to(device=device)
            label = label.to(device=device)
            outputs = model(image)
            _, pred = torch.max(outputs, dim=1)
            preds.append(pred)
            images.append(image)
            labels.append(label)
    return torch.cat(preds).tolist(), torch.cat(images).to('cpu'), torch.cat(labels).tolist()

# データローダーで渡される画像データを表示して確認できる`check_data()`関数を定義
データローダーから提供される画像データは正規化されているため、元の値に戻す。

2行10列で20個の画像を表示する。

In [None]:
def check_data(dataloader):
    inv_normalize = transforms.Normalize(
        mean=[-m/s for m, s in zip(mean, std)],
        std=[1/s for s in std]
    )
    
    H = 2
    W = 10
    fig = plt.figure(figsize=(W, H))
    fig.subplots_adjust(left=0, right=1, bottom=0, top=1.0, hspace=0.4, wspace=0.4)

    i = 0
    flag = False
    for imgs, labels in dataloader:
        for k in range(imgs.shape[0]):
            img = imgs[k]
            label = labels[k]
            plt.subplot(H, W, i+1)
            img = torch.clamp(inv_normalize(img), 0, 1)
            plt.imshow(img.permute(1, 2, 0))
            plt.title(label.item())
            plt.axis('off')
            i += 1
            if i >= H * W:
                flag = True
                break
        if flag:
            break
    plt.show()

# データの準備
入力データのある場所を`data_dir`で指定し、`prepare_loader()`関数によりデータローダーを作成する。

作成されたデータローダーにある訓練用データと評価用データの画像を表示する。

In [None]:
###
data_dir = 'training'
###

root_dir = Path('./')
data_path = root_dir/data_dir
dataloaders, dataset_sizes, class_names = prepare_loader(data_path)
print(dataset_sizes)
print('訓練用データの例')
check_data(dataloaders['train'])
print('評価用データの例')
check_data(dataloaders['val'])

# 学習の実行
`training()`関数により学習を行い、学習後のモデルを`model`に格納する。

In [None]:
model = training(dataloaders, dataset_sizes, class_names)

# 正確度による学習モデルの精度評価
学習後のモデルと評価用データを`predict()`に渡し、判別結果を受け取る。

正しい判別ができた画像の数を数えて、正解数と正確度を表示する。

In [None]:
preds, images, labels = predict(model, dataloaders['val'])
correct = 0
for pred, y in zip(preds, labels):
    if pred == y:
        correct += 1
print('正解数:', correct, '正確度:', f'{correct/len(labels):.3f}')

# 混同行列による精度評価

In [None]:
ConfusionMatrixDisplay.from_predictions(labels, preds)
plt.show()

# 判別結果の可視化を行う`check_results()`関数を定義

In [None]:
def check_results(Xs, ys, preds, flag):
    inv_normalize = transforms.Normalize(
        mean=[-m/s for m, s in zip(mean, std)],
        std=[1/s for s in std]
    )
    
    H = 4
    W = 10
    fig = plt.figure(figsize=(W, H))
    fig.subplots_adjust(left=0, right=1, bottom=0, top=1.0, hspace=0.4, wspace=0.4)

    i = 0
    for img, label, pred in zip(Xs, ys, preds):
        if (flag == 'success' and label == pred) or (flag == 'failed' and label != pred):
            plt.subplot(H, W, i+1)
            img = torch.clamp(inv_normalize(img), 0, 1)
            plt.imshow(img.permute(1, 2, 0))
            plt.title(f'{label}->{pred}', fontsize=8)
            plt.axis('off')
            i += 1
            if i >= H * W:
                break
    plt.show()

# 判別に成功した画像を表示する

In [None]:
check_results(images, labels, preds, 'success')

# 判別に失敗した画像を表示する

In [None]:
check_results(images, labels, preds, 'failed')