# 画像分類のファインチューニング
## ライブラリのインストール

In [None]:
%pip install pandas plotly torch torchvision kaleido nbformat

In [None]:
import os
from pathlib import Path

import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")


## モデルの作成
Resnet50 のモデルを読み込み、最後の全結合層を取り除いて、新しい全結合層を追加する。


In [None]:
def get_model(pretrained: bool = True, state_dict: dict | None = None):
    """モデルの取得

    Args:
        pretrained (bool, optional): 事前学習済みの重みを読み込むか. Defaults to True.
        state_dict (dict | None, optional): _description_. Defaults to None.

    Returns:
        _type_: _description_
    """
    # 事前学習済みのResNetモデルをロード
    model = models.resnet50(pretrained=pretrained)
    # ResNetの最後の全結合層をクラス数に置き換え
    model.fc = nn.Linear(model.fc.in_features, 10)
    if state_dict is not None:
        model.load_state_dict(state_dict)

    # デバイスの選択、GPUが使用可能なら使う
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.to(device)
    # 損失関数の設定
    criterion = nn.CrossEntropyLoss()
    # オプティマイザの設定
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    return model, criterion, optimizer

## 学習の実行関数を作成

In [None]:

import numpy as np
import torch
from torch.utils.data import DataLoader, SubsetRandomSampler
from torchvision import datasets, transforms
from tqdm import tqdm


def get_random_sampler(dataset, subset_size=1000, random_seed=42):
    """データセットからランダムにデータを取得するためのSubsetRandomSamplerを作成する

    Args:
        dataset (_type_): 対象データセット
        subset_size (int, optional): ランダムに抽出するデータ数. Defaults to 1000.
        random_seed (int, optional): seed値. Defaults to 42.

    Returns:
        _type_: _description_
    """
    # データセットのインデックス配列を作成する
    indices = list(range(len(dataset)))
    # インデックスをシャッフルする
    np.random.seed(random_seed)
    np.random.shuffle(indices)
    # シャッフルしたインデックスからsubset_size分だけ取得する
    subset_indices = indices[:subset_size]
    # SubsetRandomSamplerにインデックスを渡すことで、そのインデックスのデータをサンプリングする
    return SubsetRandomSampler(subset_indices)


def train_epoch(model, train_loader, criterion, optimizer, device):
    """1エポック分の学習を行う

    Args:
        model (_type_):
        train_loader (_type_):
        criterion (_type_):
        optimizer (_type_):
        device (_type_):

    Returns:
        _type_: _description_
    """
    # モデルをtrainモードにする
    model.train()
    # 損失を記録する変数を定義
    running_loss = 0.0

    # ミニバッチごとにループを回す
    for images, labels in tqdm(train_loader, total=len(train_loader)):
        images, labels = images.to(device), labels.to(device)

        # 勾配を初期化する
        optimizer.zero_grad()
        # 準伝搬
        outputs = model(images)
        # 損失関数を計算
        loss = criterion(outputs, labels)
        # 逆伝搬
        loss.backward()
        # パラメータ更新
        optimizer.step()

        # ミニバッチの損失を計算し記録する
        running_loss += loss.item()

    # 1エポックあたりの平均損失を計算する
    avg_loss = running_loss / len(train_loader)
    return avg_loss


def validate_epoch(model, val_loader, criterion, device):
    """1エポック分の検証を行う

    Args:
        model (_type_): _description_
        val_loader (_type_): _description_
        criterion (_type_): _description_
        device (_type_): _description_

    Returns:
        _type_: _description_
    """
    # モデルをevalモードにする
    model.eval()
    # 損失を記録する変数を定義
    running_loss = 0.0

    all_output = []
    all_labels = []
    # 勾配計算をしないようにする(推論なので)
    with torch.no_grad():
        # ミニバッチごとにループを回す
        for images, labels in tqdm(val_loader, total=len(val_loader)):
            # デバイスの指定
            images, labels = images.to(device), labels.to(device)
            # 準伝搬
            outputs = model(images)
            all_output.append(outputs)
            all_labels.append(labels)
            # 損失計算
            loss = criterion(outputs, labels)
            # 損失を記録する
            running_loss += loss.item()

    # 1エポックあたりの平均損失を計算する
    avg_loss = running_loss / len(val_loader)
    # テストデータの予測結果を取得する
    all_output = torch.cat(all_output, dim=0).cpu()
    all_labels = torch.cat(all_labels, dim=0).cpu()
    return avg_loss, all_output, all_labels


def get_cifar10_train_test_loader(
    train_samples: int = 1000,
    test_samples: int = 1000,
    resize: tuple[int, int] = (256, 256),
    batch_size: int = 32,
):
    """CIFAR-10データセットの学習データと検証データのDataLoaderを作成する

    Args:
        train_samples (int, optional): _description_. Defaults to 1000.
        test_samples (int, optional): _description_. Defaults to 1000.
        resize (tuple[int, int], optional): _description_. Defaults to (256, 256).
        batch_size (int, optional): _description_. Defaults to 32.

    Returns:
        _type_: _description_
    """
    # 画像を256x156にリサイズして、テンソルに変換する
    transform = transforms.Compose([transforms.Resize(resize), transforms.ToTensor()])

    # 学習データセットの作成
    train_dataset = datasets.CIFAR10(
        root="./data", train=True, download=True, transform=transform
    )
    # データセットからランダムにデータを取得する
    train_sampler = get_random_sampler(train_dataset, train_samples)
    # 学習DataLoaderの作成
    train_loader = DataLoader(
        train_dataset, batch_size=batch_size, sampler=train_sampler
    )

    # 検証データセットの作成
    test_dataset = datasets.CIFAR10(
        root="./data", train=False, download=True, transform=transform
    )
    # データセットからランダムにデータを取得する
    test_sampler = get_random_sampler(test_dataset, test_samples)
    # 検証DataLoaderの作成
    test_loader = DataLoader(test_dataset, batch_size=batch_size, sampler=test_sampler)
    return train_loader, test_loader


In [None]:
def run(
    train_samples: int = 1000,
    test_samples: int = 1000,
    pretrained: bool = True,
    num_epochs: int = 50,
):
    # 事前学習済みのResNetモデルをロード
    model, criterion, optimizer = get_model(pretrained=pretrained)

    # データをロードする
    train_loader, test_loader = get_cifar10_train_test_loader(
        train_samples=train_samples, test_samples=test_samples
    )

    # 結果を書き出すフォルダを作成
    result = []
    output_dir = Path(
        "output",
        "classification_cifar10",
        "pretrained" if pretrained else "un_pretrained",
        f"train_samples_{train_samples}",
    )
    os.makedirs(output_dir, exist_ok=True)

    # 学習のループ
    for epoch in range(num_epochs):
        # 学習
        train_loss = train_epoch(model, train_loader, criterion, optimizer, device)
        # 検証
        val_loss, predicted_output, true_label = validate_epoch(
            model, test_loader, criterion, device
        )
        # predicted_output と true_label から accuracy を計算する
        _, predicted_class = torch.max(predicted_output, dim=1)
        # predicted_class と true_label は同じサイズのテンソルであることを確認
        assert predicted_class.size() == true_label.size()
        # 予測が正しかった数を計算
        correct = (predicted_class == true_label).sum().item()
        # テストデータの総数
        total = true_label.size(0)

        # 正解率の計算
        accuracy = correct / total
        # 結果の保存
        result.append(
            {"train_loss": train_loss, "val_loss": val_loss, "accuracy": accuracy}
        )

        # 結果の表示
        print(f"Epoch {epoch+1}/{num_epochs}", result[-1])
        # モデルの保存
        # torch.save(model.state_dict(), output_dir / f"check_point_epoch_{epoch}.pt")

    # 結果の保存
    df_result = pd.DataFrame(result)
    df_result.to_csv(output_dir / "training_curve.csv")

## サンプル数を変えて学習を回す
学習コードができたので、教師データのサンプル数を変えて学習を回してみる。

### 実験の設定
実験対象のサンプル数と、学習エポック数を設定する

In [None]:
# 教師データのサンプル数リスト
# train_samples_list = [100, 500, 1000, 2000, 3000, 4000, 5000]
# 学習エポック数
# num_train_epoch = 10

# 動作確認用の設定
# 教師データのサンプル数リスト
train_samples_list = [100, 500]
# 学習エポック数
num_train_epoch = 10


In [None]:

# いろいろなサンプル数で学習を実施する
for num in train_samples_list:
    # ファインチューニング
    run(train_samples=num, test_samples=1000, pretrained=True, num_epochs=num_train_epoch)
    # ファインチューニングなし
    run(train_samples=num, test_samples=1000, pretrained=False, num_epochs=num_train_epoch)

## 学習結果の確認

In [None]:
def load_result(pretrained: bool = True, train_samples: int = 1000) -> pd.DataFrame:
    """学習の結果を読み込む

    Args:
        pretrained (bool, optional): 事前学習の有無. Defaults to True.
        train_samples (int, optional): 学習データの数. Defaults to 1000.

    Returns:
        _type_: _description_
    """
    # 引数から結果のフォルダを特定
    output_dir = Path(
        "output",
        "classification_cifar10",
        "pretrained" if pretrained else "un_pretrained",
        f"train_samples_{train_samples}",
    )
    # 結果を読み込む
    df_result = pd.read_csv(output_dir / "training_curve.csv", index_col=0)
    return df_result


def plot_result(pretrained: bool = True, train_samples: int = 1000):
    """学習結果をプロットする

    Args:
        pretrained (bool, optional): 事前学習の有無. Defaults to True.
        train_samples (int, optional): 学習データの数. Defaults to 1000.

    Returns:
        _type_: _description_
    """
    # 結果の読み込み
    df = load_result(pretrained=pretrained, train_samples=train_samples)
    # プロット
    title = f"{pretrained=} {train_samples=}"
    fig_loss = px.line(df, y=["train_loss", "val_loss"])
    fig_loss.update_layout(
        title=f"Loss {title}",
        xaxis_title="Epoch",
        yaxis_title="Loss",
        yaxis_range=[0, 5],
    )
    os.makedirs("output/classification_cifar10/figs", exist_ok=True)
    fig_loss.write_image(
        f"output/classification_cifar10/figs/loss_{title.replace(' ', '_')}.png"
    )
    fig_acc = px.line(df, y=["accuracy"])
    fig_acc.update_layout(
        title=f"Accuracy {title}",
        xaxis_title="Epoch",
        yaxis_title="Test Accuracy",
        yaxis_range=[0, 1],
    )
    fig_acc.write_image(
        f"output/classification_cifar10/figs/acc_{title.replace(' ', '_')}.png"
    )

    return fig_loss, fig_acc


def plot_mix_result(train_samples: int = 1000):
    """事前学習ありなしを比較する

    Args:
        train_samples (int, optional): 学習データの数. Defaults to 1000.

    Returns:
        _type_: _description_
    """
    # 事前学習あり
    df_fine = load_result(pretrained=True, train_samples=train_samples)
    # 事前学習なし
    df_no_fine = load_result(pretrained=False, train_samples=train_samples)

    # プロット
    fig = go.Figure()
    fig.add_scatter(x=df_fine.index, y=df_fine["accuracy"], name="fine tuning accuracy")
    fig.add_scatter(
        x=df_fine.index, y=df_no_fine["accuracy"], name="no fine tuning accuracy"
    )
    title = f"{train_samples=}"
    fig.update_layout(
        title=f"Accuracy {title}",
        xaxis_title="Epoch",
        yaxis_title="Accuracy",
        yaxis_range=[0, 1],
    )
    fig.write_image(f"output/classification_cifar10/figs/acc_fine_{title}.png")

    return fig

In [None]:
# 各学習結果について、プロットを作成する
for samples in train_samples_list:
    plot_result(pretrained=True, train_samples=samples)
    plot_result(pretrained=False, train_samples=samples)
    plot_mix_result(train_samples=samples)

### 学習結果をノートブックで確認する

In [None]:
plot_result(pretrained=True, train_samples=train_samples_list[0])[0]
plot_result(pretrained=False, train_samples=train_samples_list[0])[0]

## 教師データサンプル数と精度の変化を可視化する


In [None]:
# データ量変更実験の可視化
pretrain_max_list = []
no_pretrain_max_list = []
for samples in train_samples_list:
    pretrain_max_list.append(
        load_result(pretrained=True, train_samples=samples)["accuracy"].max()
    )
    no_pretrain_max_list.append(
        load_result(pretrained=False, train_samples=samples)["accuracy"].max()
    )
df_max = pd.DataFrame(
    {
        "pretrain_max": pretrain_max_list,
        "no_pretrain_max": no_pretrain_max_list,
        "train_samples": train_samples_list,
    },
    index=range(7),
)
fig = px.scatter(df_max, y=["pretrain_max", "no_pretrain_max"], x="train_samples")
fig.update_layout(
    title="学習サンプルサイズと精度",
    xaxis_title="Train Samples",
    yaxis_title="Accuracy",
    yaxis_range=[0, 1],
)
fig.write_image("output/classification_cifar10/figs/acc_max.png")
fig