# Mount

In [None]:
def mount(gpu_info=True, ram_info=True):
    # Googleドライブをマウント
    from google.colab import drive
    drive.mount('/content/drive')

    if gpu_info:
        # GPUの割り当てを確認
        gpu_info = !nvidia-smi
        gpu_info = '\n'.join(gpu_info)
        if gpu_info.find('failed') >= 0:
            print('Not connected to a GPU')
        else:
            print(gpu_info)

    if ram_info:
        # 使用可能なメモリ量を確認
        from psutil import virtual_memory
        ram_gb = virtual_memory().total / 1e9
        print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))

        if ram_gb < 20:
            print('Not using a high-RAM runtime')
        else:
            print('You are using a high-RAM runtime!')

# Installing module

In [None]:
# インストール
!pip install -q pytorch_lightning
!pip install -q torchmetrics
!pip install --upgrade -q wandb

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m776.9/776.9 kB[0m [31m9.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m806.1/806.1 kB[0m [31m53.8 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m22.4 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m190.6/190.6 kB[0m [31m23.6 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m254.1/254.1 kB[0m [31m28.8 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.7/62.7 kB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
import os
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pathlib import Path
from PIL import Image
import copy

# Pytorch
import torch
from torch import nn, optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import torchvision
import torchvision.transforms as T

# PytorchLightning
import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from torchmetrics import Accuracy
from pytorch_lightning.callbacks import Callback
from pytorch_lightning import seed_everything
from pytorch_lightning.loggers import WandbLogger

# ignore warning
import warnings
warnings.filterwarnings("ignore")

# Create Data

* データはWandBのアーティファクトを使用

In [None]:
def download_data():
    # v3は教師データ増の為にdfのカラム増やしており、念のためv2のままとする（元々v2のバージョンでやっていた）
    processed_data_at = wandb.use_artifact(f'{PROCESSED_DATA_AT}:v2', type='split_data')
    processed_dataset_dir = Path(processed_data_at.download())
    # v0
    processed_data_at_v0 = wandb.use_artifact(f'{PROCESSED_DATA_AT}:v0', type='split_data')
    processed_dataset_dir_v0 = Path(processed_data_at_v0.download())

    return processed_dataset_dir, processed_dataset_dir_v0

In [None]:
def get_df(config, processed_dataset_dir, processed_dataset_dir_v0, data_reduction=False):
    df = pd.read_csv(processed_dataset_dir_v0 / 'data_split.csv')

    # 画像パスの割り当て
    df["image_fname"] = [processed_dataset_dir/f'images/{f}' for f in df.File_Name.values]

    # ラベルの割り当て
    def get_label(row):
        for key, value in STL10_CLASSES.items():
            if row[value] == 1:
                return key  # 整数のキーを返す
        return None

    df['label'] = df.apply(get_label, axis=1)


    """
    データを削減する（動作確認の時用）
    """
    if data_reduction:
        # train, valid, testごとにランダムに batch_size*3ずつ選択
        df_train = df[df['Split_add_valid'] == 'train'].sample(n=config["batch_size"]*3, random_state=1)
        df_valid = df[df['Split_add_valid'] == 'valid'].sample(n=config["batch_size"]*3, random_state=1)
        df_test = df[df['Split_add_valid'] == 'test'].sample(n=config["batch_size"]*3, random_state=1)

        # 選択した行を結合
        df = pd.concat([df_train, df_valid, df_test]).reset_index(drop=True)

    return df

In [None]:
class CustomDataset(Dataset):
    def __init__(self, df, transform=None):
        self.df = df
        self.transform = transform
        self.classes = [STL10_CLASSES[i] for i in range(len(STL10_CLASSES))]

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        img_name = self.df.iloc[idx, -2]  # 画像のパス
        image = Image.open(img_name).convert('RGB')

        label = self.df.iloc[idx, -1]  # ラベル

        if self.transform:
            image = self.transform(image)

        return image, label

In [None]:
def create_transforms(img_size, mean, std):
    return {
        "train": T.Compose(
            [
                T.RandomResizedCrop(img_size, scale=(0.8, 1.0)),
                T.RandomHorizontalFlip(),
                T.ToTensor(),
                T.Normalize(mean=mean, std=std),
            ]
        ),
        "valid": T.Compose(
            [
                T.ToTensor(),
                T.Normalize(mean=mean, std=std),
            ]
        ),
        "test": T.Compose(
            [
                T.ToTensor(),
                T.Normalize(mean=mean, std=std),
            ]
        )
    }

In [None]:
def print_dataloader_info(dl, name="DataLoader"):
    print(f"--- {name} ---")
    print(f"Number of batches: {len(dl)}")
    print(f"Total number of items: {len(dl.dataset)}")
    print("---------------")

def show_batch_with_title(dl, title, max_n=16, figsize=(5, 5)):
    print(title)
    images, _ = next(iter(dl))
    grid_img = torchvision.utils.make_grid(images[:max_n], nrow=4, normalize=True)
    plt.figure(figsize=figsize)
    plt.imshow(grid_img.permute(1, 2, 0))
    plt.title(title)
    plt.show()

In [None]:
def create_dataset_and_loader(df, split_type, batch_size, transforms, num_workers=0, show_info=False, show_images=False):
    dataset = CustomDataset(df[df['Split_add_valid'] == split_type], transform=transforms[split_type])
    shuffle = True if split_type == 'train' else False
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers)

    # データローダーの情報表示
    if show_info:
        print_dataloader_info(loader, f"{split_type.capitalize()} Data Loader")

    # バッチ画像の表示
    if show_images:
        show_batch_with_title(loader, f"{split_type.capitalize()} Batch Images")

    return loader

# Model module

## Multi Head Attention

In [None]:
class SelfAttention(nn.Module):
    '''
    自己アテンション
    dim_hidden: 入力特徴量の次元
    num_heads : マルチヘッドアテンションのヘッド数
    qkv_bias  : クエリなどを生成する全結合層のバイアスの有無
    '''
    def __init__(self, dim_hidden: int, num_heads: int,
                 qkv_bias: bool=False):
        super().__init__()

        # 特徴量を各ヘッドのために分割するので、
        # 特徴量次元をヘッド数で割り切れるか検証
        assert dim_hidden % num_heads  == 0

        self.num_heads = num_heads

        # ヘッド毎の特徴量次元
        dim_head = dim_hidden // num_heads

        # ソフトマックスのスケール値
        self.scale = dim_head ** -0.5 # （2乗根の逆数、ViT入門では2乗根）

        # ヘッド毎にクエリ、キーおよびバリューを生成するための全結合層
        self.proj_in = nn.Linear(
            dim_hidden, dim_hidden * 3, bias=qkv_bias) # 3はq, k, vの3つ分か

        # 各ヘッドから得られた特徴量を一つにまとめる全結合層
        self.proj_out = nn.Linear(dim_hidden, dim_hidden)

        # アテンションの勾配とマップ（アテンションの値（類似度））格納用
        self.attn_gradients = None
        self.attention_map = None

    def save_attn_gradients(self, attn_gradients):
        self.attn_gradients = attn_gradients

    def get_attn_gradients(self):
        return self.attn_gradients

    def save_attention_map(self, attention_map):
        self.attention_map = attention_map

    def get_attention_map(self):
        return self.attention_map

    '''
    順伝播関数
    x: 入力特徴量, [バッチサイズ, 特徴量数, 特徴量次元]
    '''
    def forward(self, x: torch.Tensor, register_hook=False):
        # print("x.requires_grad:", x.requires_grad)

        bs, ns = x.shape[:2]

        qkv = self.proj_in(x) # 線形変換 + 特徴量の数を3倍？

        # view関数により
        # [バッチサイズ, 特徴量数, QKV, ヘッド数, ヘッドの特徴量次元]
        # permute関数により
        # [QKV, バッチサイズ, ヘッド数, 特徴量数, ヘッドの特徴量次元]
        qkv = qkv.view(
            bs, ns, 3, self.num_heads, -1).permute(2, 0, 3, 1, 4)

        # クエリ、キーおよびバリューに分解
        q, k, v = qkv.unbind(0)

        # クエリとキーの行列積とアテンションの計算（今回マスクは不使用）
        # attnは[バッチサイズ, ヘッド数, 特徴量数, 特徴量数]
        attn = q.matmul(k.transpose(-2, -1))
        attn = (attn * self.scale).softmax(dim=-1)

        # attnテンソルが勾配を持つかどうか確認
        # print("attn.requires_grad:", attn.requires_grad)

        # アテンションマップを保存
        self.save_attention_map(attn)

        # register_hookがTrueなら、アテンションの勾配を保存するフックを登録。
        # print("register_hook: ", register_hook)
        if register_hook:
            attn.register_hook(self.save_attn_gradients)

        # アテンションとバリューの行列積によりバリューを収集
        # xは[バッチサイズ, ヘッド数, 特徴量数, ヘッドの特徴量次元]
        x = attn.matmul(v)

        # permute関数により
        # [バッチサイズ, 特徴量数, ヘッド数, ヘッドの特徴量次元]
        # flatten関数により全てのヘッドから得られる特徴量を連結して、
        # [バッチサイズ, 特徴量数, ヘッド数 * ヘッドの特徴量次元]
        x = x.permute(0, 2, 1, 3).flatten(2)
        x = self.proj_out(x) # 線形変換

        return x

## FNN

In [None]:
class FNN(nn.Module):
    '''
    Transformerエンコーダ内の順伝播型ニューラルネットワーク
    dim_hidden     : 入力特徴量の次元
    dim_feedforward: 中間特徴量の次元
    '''
    def __init__(self, dim_hidden: int, dim_feedforward: int):
        super().__init__()

        self.linear1 = nn.Linear(dim_hidden, dim_feedforward)
        self.linear2 = nn.Linear(dim_feedforward, dim_hidden)
        self.activation = nn.GELU()

    '''
    順伝播関数
    x: 入力特徴量, [バッチサイズ, 特徴量数, 特徴量次元]
    '''
    def forward(self, x: torch.Tensor):
        x = self.linear1(x)
        x = self.activation(x)
        x = self.linear2(x)

        return x

## TransformerEncoder

In [None]:
class TransformerEncoderLayer(nn.Module):
    '''
    Transformerエンコーダ層
    dim_hidden     : 入力特徴量の次元
    num_heads      : ヘッド数
    dim_feedforward: 中間特徴量の次元
    '''
    def __init__(self, dim_hidden: int,
                 num_heads: int,
                 dim_feedforward: int):
        super().__init__()

        self.attention = SelfAttention(dim_hidden, num_heads)
        self.fnn = FNN(dim_hidden, dim_feedforward)

        self.norm1 = nn.LayerNorm(dim_hidden)
        self.norm2 = nn.LayerNorm(dim_hidden)

    '''
    順伝播関数
    x: 入力特徴量, [バッチサイズ, 特徴量数, 特徴量次元]
    '''
    def forward(self, x: torch.Tensor, register_hook=False):
        x = self.norm1(x)
        x = self.attention(x, register_hook=register_hook) + x
        x = self.norm2(x)
        x = self.fnn(x) + x

        return x

## Vision Transformer

In [None]:
class VisionTransformer(nn.Module):
    '''
    Vision Transformer
    num_classes    : 分類対象の物体クラス数
    img_size       : 入力画像の大きさ(幅と高さ等しいことを想定)
    patch_size     : パッチの大きさ(幅と高さ等しいことを想定)
    dim_hidden     : 入力特徴量の次元
    num_heads      : マルチヘッドアテンションのヘッド数
    dim_feedforward: FNNにおける中間特徴量の次元
    num_layers     : Transformerエンコーダの層数
    '''
    def __init__(self, num_classes: int,
                 img_size: int,
                 patch_size: int,
                 dim_hidden: int,
                 num_heads: int,
                 dim_feedforward: int,
                 num_layers: int):
        super().__init__()

        # 画像をパッチに分解するために、画像の大きさがパッチの大きさで割り切れるか確認
        assert img_size % patch_size == 0

        self.img_size = img_size
        self.patch_size = patch_size

        # パッチの行数と列数はともにimg_size // patch_sizeであり、
        # パッチ数はその2乗になる
        num_patches = (img_size // patch_size) ** 2

        # パッチ特徴量はパッチを平坦化することにより生成されるため
        # その次元はpatch_size * patch_size * 3（RGBチャネル）
        dim_patch = 3 * patch_size ** 2

        # パッチ特徴量をTransformerエンコーダーに入力する前に
        # パッチ特徴量の次元を変換する全結合層
        self.patch_embed = nn.Linear(dim_patch, dim_hidden)

        # 位置埋め込み（パッチ数 + クラス埋め込みの分を用意）
        self.pos_embed = nn.Parameter(
            torch.zeros(1, num_patches + 1, dim_hidden))

        # クラス埋め込み
        self.class_token = nn.Parameter(
            torch.zeros((1, 1, dim_hidden))
        )

        # Transformerエンコーダ層
        self.layers = nn.ModuleList([TransformerEncoderLayer(
            dim_hidden, num_heads, dim_feedforward
        ) for _ in range(num_layers)])

        # ロジット（ニューロンの出力値）を生成する前のレイヤー正規化と全結合
        self.norm = nn.LayerNorm(dim_hidden)
        self.linear = nn.Linear(dim_hidden, num_classes)

    '''
    順伝播関数
    x           ： 入力, [バッチサイズ, 入力チャネル数, 高さ, 幅]
    return_embed： 特徴量を返すかロジットを返すかを選択する真偽値
    '''
    def forward(self, x: torch.Tensor,
                return_embed: bool=False,
                register_hook=False):
        bs, c, h, w = x.shape

        # 入力画像の大きさがクラス生成時に指定したimg_sizeと
        # 合致しているか確認
        assert h == self.img_size and w == self.img_size

        # 高さ軸と幅軸をそれぞれパッチ数 * パッチの大きさに分解し、
        # [バッチサイズ, チャネル数, パッチの行数, パッチの大きさ,
        # パッチの列数, パッチの大きさ] の形にする
        x = x.view(bs, c, h // self.patch_size, self.patch_size,
                w // self.patch_size, self.patch_size)

        # permute関数により、
        # [バッチサイズ, パッチ行数, パッチ列数, チャネル,
        #                   パッチの大きさ, パッチの大きさ] の形にする
        x = x.permute(0, 2, 4, 1, 3, 5)

        # パッチを平坦化
        # permute関数適用後にはメモリ上のデータ配置の整合性の関係で
        # view関数を使えないのでreshape関数を使用
        x = x.reshape(
            bs, (h // self.patch_size) * (w // self.patch_size), -1)

        x = self.patch_embed(x)

        # クラス埋め込みをバッチサイズ分用意
        class_token = self.class_token.expand(bs, -1, -1)

        x = torch.cat((class_token, x), dim=1)

        x += self.pos_embed

        # Transformerエンコーダ層を適用
        # print("register_hook", register_hook)
        for layer in self.layers:
            x = layer(x, register_hook=register_hook)

        # クラス埋め込みをベースとした特徴量を抽出
        x = x[:, 0]

        x = self.norm(x)

        if return_embed: # Trueならロジットを返す
            return x

        x = self.linear(x)

        return x

# LightningModule

## data

In [None]:
class PLDataModule(pl.LightningDataModule):
    def __init__(self, train_loader, val_loader, test_loader):
        super().__init__()
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.test_loader = test_loader

    def train_dataloader(self):
        return self.train_loader

    def val_dataloader(self):
        return self.val_loader

    def test_dataloader(self):
        return self.test_loader

## lightning

In [None]:
class PLViTModule(pl.LightningModule):
    def __init__(self, config):
        super(PLViTModule, self).__init__()

        self.lr = config['lr']
        self.num_classes = config['num_classes']
        self.img_size = config['img_size']
        self.patch_size = config['patch_size']
        self.dim_hidden = config['dim_hidden']
        self.num_heads = config['num_heads']
        self.dim_feedforward = config['dim_feedforward']
        self.num_layers = config['num_layers']
        self.loss_fn = nn.CrossEntropyLoss()
        self.accuracy = Accuracy(num_classes=self.num_classes, task="multiclass")
        self.class_accuracy = torch.nn.ModuleList([Accuracy(num_classes=self.num_classes, task="multiclass") for _ in range(self.num_classes)])
        self.class_names = [STL10_CLASSES[i] for i in range(len(STL10_CLASSES))]
        self.save_test_predictions = config['save_test_predictions']
        self.test_predictions = []

        # モデル定義
        self.models = VisionTransformer(num_classes = self.num_classes,
                                        img_size = self.img_size,
                                        patch_size = self.patch_size,
                                        dim_hidden = self.dim_hidden,
                                        num_heads = self.num_heads,
                                        dim_feedforward = self.dim_feedforward,
                                        num_layers = self.num_layers)

        # ハイパーパラメーターをself.hparamsに保存する (W&Bによって自動ロギングされる)
        self.save_hyperparameters()

    def forward(self, x: torch.Tensor, return_embed: bool = False, register_hook=False):
        output = self.models(x, register_hook=register_hook)
        return output

    def training_step(self, batch, batch_idx):
        images, target = batch
        # print(images.shape, target)
        preds = self.forward(images)
        # クロスエントロピー、これでモデルを更新
        loss = self.loss_fn(preds, target)

        # Accuracyの計算
        self.accuracy(preds, target)
        # validでは不要だが、training_stepではこうしないとwandbにaccuracyが保存されない
        current_accuracy = self.accuracy.compute()

        # ログを記録(self.logはPyTorch Lightninのメソッド)
        self.log("01_loss/train", loss, prog_bar=True, logger=True, on_epoch=True, on_step=False)
        self.log("02_metrics/accuracy_train", current_accuracy, prog_bar=False, logger=True, on_epoch=True, on_step=False)

        return {"loss": loss}  # このlossを基にモデルが更新される

    def validation_step(self, batch, batch_idx):
        images, target = batch
        # print(images.shape, target)
        preds = self.forward(images)
        loss = self.loss_fn(preds, target)
        self.accuracy(preds, target)

        # ログを記録
        self.log("01_loss/valid", loss, prog_bar=True, logger=True, on_epoch=True, on_step=False)
        self.log("02_metrics/accuracy_valid", self.accuracy, prog_bar=False, logger=True, on_epoch=True, on_step=False)

        return {"valid_loss": loss}

    def test_step(self, batch, batch_idx):
        images, targets = batch
        preds = self.forward(images)
        loss = self.loss_fn(preds, targets)
        self.accuracy(preds, targets)

        # ログを記録
        self.log("01_loss/test", loss, prog_bar=True, logger=True, on_epoch=True, on_step=False)
        self.log("02_metrics/accuracy_test", self.accuracy, prog_bar=False, logger=True, on_epoch=True, on_step=False)

        pred_labels = preds.argmax(dim=1)
        class_probabilities = preds.softmax(dim=1)

        # 画像と予測結果を記録
        for i in range(images.size(0)):
            self.test_predictions.append({
                "images": images[i],
                "pred_labels": pred_labels[i],
                "true_labels": targets[i],
                "class_probabilities": class_probabilities[i],
            })

        # ここで全体の精度を計算
        self.accuracy(preds, targets)

        # クラスごとの精度を計算
        for i in range(self.num_classes):
            class_mask = targets == i
            class_preds = preds[class_mask]
            class_targets = targets[class_mask]
            if class_targets.nelement() != 0:  # クラスに対するデータがある場合のみ計算
                self.class_accuracy[i](class_preds, class_targets)

        return {"test_loss": loss}

    def on_test_epoch_end(self):
        if self.save_test_predictions:
          # wandbテーブルにカラムを追加（"image"カラムも含む）
          columns = ["image", "pred", "truth"] + self.class_names
          test_table = wandb.Table(columns=columns)

          # test_predictionsに保存されたデータを使ってwandb.Tableを作成
          for output in self.test_predictions:
              # wandb.Imageを使用して画像データをテーブルに追加
              image_data = wandb.Image(output["images"].cpu())  # GPU上の画像データをCPUに移動
              pred_label_index = output["pred_labels"].item()
              true_label_index = output["true_labels"].item()

              # クラスインデックスをクラス名に変換
              pred_label_name = STL10_CLASSES[pred_label_index]
              true_label_name = STL10_CLASSES[true_label_index]

              class_probabilities = output["class_probabilities"].tolist()

              # wandbテーブルにデータを追加
              row = [image_data, pred_label_name, true_label_name] + class_probabilities
              test_table.add_data(*row)

          # wandbにテーブルをログとして保存（wandb.logは、カスタムデータ（画像、テーブル、グラフなど）をWandBに記録するのに特化）
          wandb.log({"Test Predictions": test_table})

          # テスト予測データをクリア
          self.test_predictions.clear()

        # 全体の精度を計算
        overall_accuracy = self.accuracy.compute()
        # クラスごとの精度をリストに保存
        class_accuracies = [self.class_accuracy[i].compute() for i in range(self.num_classes)]

        # Weights & Biases の棒グラフを作成するためのデータを準備
        data = [["Overall", overall_accuracy]] + [[self.class_names[i], class_accuracies[i]] for i in range(len(self.class_names))]

        # Weights & Biases 用のテーブルオブジェクトを作成
        accuracy_table = wandb.Table(data=data, columns=["Class", "Accuracy"])

        # 棒グラフを作成してログに記録
        wandb.log({"Class-wise Test Accuracies": wandb.plot.bar(accuracy_table, "Class", "Accuracy", title="Class-wise Test Accuracies")})

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.models.parameters(), lr=self.lr)
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1)
        return {"optimizer": optimizer, "lr_scheduler": scheduler}

## callback

In [None]:
class PLCallback(Callback):
    def __init__(self,
                 config,
                 num_samples=32,
                 epoch_interval=10,
                 tsne_img_path=None,
                 data_module=None):

        self.channel_mean = config['channel_mean']
        self.channel_std = config['channel_std']
        self.tsne_samples = config['tsne_samples']

        self.num_samples = num_samples  # ログに記録するサンプル数（バッチサイズ以上にすること）
        self.epoch_interval = epoch_interval  # エポックの間隔を指定
        self.tsne_img_path = tsne_img_path
        self.data_module = data_module

    def on_validation_epoch_end(self, trainer, pl_module):
        # epoch_intervalごとに処理を行う
        if trainer.current_epoch % self.epoch_interval == 0:
            # モデルの取得
            model = pl_module
            # 検証データローダー
            dataloader = trainer.val_dataloaders

            """サンプル画像と正解ラベルを取得"""
            batch = next(iter(dataloader))
            x, y = batch

            device = model.device
            x, y = x.to(device), y.to(device)
            x.requires_grad_()  # 勾配を追跡

            # サンプル数に応じてデータを取得
            images = [img for img in x[:self.num_samples]]

            def denormalize(image, mean, std):
                mean = torch.tensor(mean).view(3, 1, 1).to(image.device)
                std = torch.tensor(std).view(3, 1, 1).to(image.device)
                return image * std + mean

            # 元画像の逆正規化
            images = [denormalize(img, self.channel_mean, self.channel_std) for img in images]

            """t-SNEとTE用にモデルを保存してロード"""
            vee_model = copy.deepcopy(pl_module)

            te_images_with_captions = []

            """Transformer-Explainability"""
            # 予測値の取得
            preds = [vee_model(img.unsqueeze(0).cuda()) for img in images]

            with torch.set_grad_enabled(True):
                # TE実行
                te = Transformer_Explainability.Transformer_Explainability(model=vee_model, cls_to_idx=STL10_CLASSES)
                # サンプル画像の枚数分行う
                for original_img, pred in zip(images, preds):
                    # te_imageは0-255の範囲
                    te_image = te.generate_visualization(original_img)

                    # te_imageをテンソルに変換し、次元の順序を変更し、範囲を0.0〜1.0に変更
                    te_image_tensor = torch.tensor(te_image).to(original_img.device).float() / 255.0
                    te_image_tensor = te_image_tensor.permute(2, 0, 1)

                    # 元の画像とte_imageを横に並べて結合
                    combined_image = torch.cat((original_img, te_image_tensor), dim=2)  # dim=2は横方向に結合するため

                    # wandbはnumpy配列を受け取るので、テンソルをnumpy配列に変換
                    combined_image_np = combined_image.detach().cpu().numpy().transpose(1, 2, 0)  # CHW -> HWC

                    # 予測結果の文字列取得
                    top_str = te.print_top_classes(pred)
                    # wandb保存用のリスト
                    te_images_with_captions.append((combined_image_np, top_str))

            # 画像とそのキャプションを組み合わせてwandbにログ
            wandb.log({"te_images": [wandb.Image(image, caption=caption) for image, caption in te_images_with_captions]})

            """t-SNE"""
            util.plot_t_sne(self.data_module.test_dataloader(),
                            vee_model,
                            self.tsne_samples,
                            vee_model.device,
                            self.tsne_img_path)

            # t-SNEプロットをログ
            wandb.log({"t-SNE Plot (Valid End ※テストデータ使用)": [wandb.Image(plt.imread(self.tsne_img_path))]})

# Train

In [None]:
def train(config):
    seed_everything(config["seed"])

    # cudnnの決定性を保証（精度下がる可能性あり）
    # torch.backends.cudnn.deterministic = True
    # torch.backends.cudnn.benchmark = False

    # wandbの初期化
    run = wandb.init(project=WANDB_PROJECT,
                 entity=None,
                 job_type=WANDB_JOBTYPE,
                 group=WANDB_GROUP,
                 # name=WANDB_NAME,
                 config=config)

    # PyTorchLightningとwandbを統合
    wandb_logger = WandbLogger(project=WANDB_PROJECT,
                               config=config,
                               job_type=WANDB_JOBTYPE,
                               )

    # データのダウンロードとDataFrameの取得
    processed_dataset_dir, processed_dataset_dir_v0 = download_data()
    df = get_df(config, processed_dataset_dir, processed_dataset_dir_v0, data_reduction=config["data_reduction"])

    channel_mean, channel_std = config["channel_mean"], config["channel_std"]
    transforms = create_transforms(config["img_size"], channel_mean, channel_std)

    train_loader = create_dataset_and_loader(df, 'train', config["batch_size"], transforms, num_workers=config['num_workers'], show_info=True, show_images=False)
    val_loader = create_dataset_and_loader(df, 'valid', config["batch_size"], transforms, num_workers=config['num_workers'], show_info=True, show_images=False)
    test_loader = create_dataset_and_loader(df, 'test', config["batch_size"], transforms, num_workers=config['num_workers'], show_info=True, show_images=False)

    data_module = PLDataModule(train_loader, val_loader, test_loader)
    model = PLViTModule(config)

    # wandb.watchの実行有無を設定
    if config.get("use_wandb_watch", False):
        wandb_logger.watch(model, log="all", log_freq=config["log_freq"])

    # callback時のt-SNEプロットの保存先
    tsne_img_path = config["val_epoch_end_dir"] + "vee_tsne.png"

    # val_epoch_endディレクトリの存在確認と作成
    val_epoch_end_dir = config["val_epoch_end_dir"]
    if not os.path.exists(val_epoch_end_dir):
        os.makedirs(val_epoch_end_dir)

    # checkpointディレクトリの存在確認と作成
    checkpoint_dir = config["save_dir"]
    if not os.path.exists(checkpoint_dir):
        os.makedirs(checkpoint_dir)

    model_checkpoint = ModelCheckpoint(
        filename = WANDB_GROUP + "_" + run.name + "_{epoch}", # モデルファイルの名前
        dirpath = config["save_dir"],           # モデルを保存するディレクトリのパス
        monitor = "01_loss/valid",              # 監視する指標
        mode = "min",                           # 最小化を目指す
        save_top_k = 1,                         # 最良のK個のモデルを保存
        save_last = False,                      # 最後のエポックのモデルを保存
    )

    early_stopping = EarlyStopping(
        monitor="01_loss/valid",
        mode="min",
        patience=config["patience"],
    )

    # callbackの初期化
    callbacks = [model_checkpoint, early_stopping]

    # PLCallbackを使用するかどうかの設定
    if config.get("use_pl_callback", False):
        pl_callback = PLCallback(
            config,
            num_samples=32,
            epoch_interval=config["val_epoch_interval"],
            tsne_img_path=tsne_img_path,
            data_module=data_module
        )
        callbacks.append(pl_callback)

    # Trainer
    trainer = pl.Trainer(
        max_epochs=config['num_epochs'],
        accelerator="auto",
        precision=config['precision'],
        callbacks=callbacks,
        logger=[wandb_logger],
        deterministic=True,  # モデルの再現性確保（非決定的な機能を制御）
    )

    # 学習実行
    trainer.fit(
        model,
        datamodule=data_module,
    )

    # テスト実行
    if config.get("run_test", False):
        ## ベストモデルのパラメータ（重み）を自動的に読み込む
        trainer.test(dataloaders=data_module.test_dataloader())

    # Close wandb run
    wandb.finish()

# Config

In [None]:
base_dir = r"drive/MyDrive/wandb_mlops/"

'''
Weights and Biasesの設定
'''
WANDB_PROJECT = "mlops-001"           # プロジェクトの名前
WANDB_GROUP = "ViT"                   # グループの名前
# WANDB_NAME = "Sweeps_best"     # 学習時の名前
WANDB_JOBTYPE = "train"
RAW_DATA_AT = 'STL10'
PROCESSED_DATA_AT = 'STL10_split'
STL10_CLASSES = {i:c for i,c in enumerate(['airplane', 'bird', 'car', 'cat', 'deer', 'dog', 'horse', 'monkey', 'ship', 'truck'])}

In [None]:
# default
train_config = dict(
    save_dir = base_dir + "checkpoints/",               # モデル保存先
    val_epoch_end_dir = base_dir + "val_epoch_end/",    # on_validation_epoch_endで使うモデルの保存先

    data_reduction = False,                 # データをbatch_size*3に削減する
    use_pl_callback = False,                # PLCallbackの使用有無（Transformer-Explainability, t-SNE）
    use_wandb_watch = False,                # wandb.watchの使用有無
    run_test = True,                        # testの実行有無
    save_test_predictions = False,           # テスト予測データの保存有無

    num_epochs = 50,                        # 学習エポック数 30
    val_epoch_interval = 10,                # on_validation_epoch_end（TEとt-SNE）を実行するエポックの間隔
    patience = 20,                          # early_stoppingのpatience
    batch_size = 128,                       # バッチサイズ

    num_classes = 10,                       # データセットのクラス数
    num_workers = 4,                        # データローダに使うCPUプロセスの数
    tsne_samples = 1000,                    # t-SNEでプロットするサンプル数
    precision = 16,                         # 32

    lr = 1e-4,                              # 学習率
    img_size = 96,                          # 入力画像の大きさ
    patch_size = 8,                         # パッチサイズ
    dim_hidden = 512,                       # 隠れ層の次元
    num_heads = 8,                          # マルチヘッドアッテンションのヘッド数
    dim_feedforward = 512,                  # Transformerエンコーダ層内のFNNにおける隠れ層の特徴量次元
    num_layers = 6,                         # Transformerエンコーダの層数
    seed=42,
    log_freq = 100,                          # wandb.watch()で保存するstepの間隔（デフォルト:100）
    channel_mean = [0.4467, 0.4398, 0.4066], # STL10データセットの平均
    channel_std = [0.2603, 0.2566, 0.2713]   # STL10データセットの標準偏差
)

# Run

In [None]:
# Google Driveのマウント
mount(gpu_info=True, ram_info=True)

Mounted at /content/drive
Sat Dec 16 07:26:32 2023       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 535.104.05             Driver Version: 535.104.05   CUDA Version: 12.2     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  NVIDIA A100-SXM4-40GB          Off | 00000000:00:04.0 Off |                    0 |
| N/A   31C    P0              43W / 400W |      2MiB / 40960MiB |      0%      Default |
|                                         |                      |             Disabled |
+-----------------------------------------+----------------------+----------------------+
                                          

In [None]:
import sys
sys.path.append(base_dir)

# t-SNE
import util
# Transformer_Explainability
import Transformer_Explainability

In [None]:
# Weights and Biases
import wandb
!wandb login

[34m[1mwandb[0m: Logging into wandb.ai. (Learn how to deploy a W&B server locally: https://wandb.me/wandb-server)
[34m[1mwandb[0m: You can find your API key in your browser here: https://wandb.ai/authorize
[34m[1mwandb[0m: Paste an API key from your profile and hit enter, or press ctrl+c to quit: 
[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc


In [None]:
# train(train_config)

# Sweep

In [None]:
def train_wrapper():
    with wandb.init() as run:
        wandb_config = run.config

        # 既存のtrain_configのコピーを作成
        train_config_updated = train_config.copy()
        # Sweepで変更されるすべてのパラメータを更新
        train_config_updated['lr'] = wandb_config.lr
        train_config_updated['batch_size'] = wandb_config.batch_size
        train_config_updated['dim_hidden'] = wandb_config.dim_hidden
        train_config_updated['num_layers'] = wandb_config.num_layers
        train_config_updated['patch_size'] = wandb_config.patch_size
        train_config_updated['num_heads'] = wandb_config.num_heads

        # 更新された設定でトレーニング関数を実行
        train(train_config_updated)

In [None]:
sweep_config = {
    'method': 'random',  # サーチ手法（例：'random', 'grid', 'bayes'）
    'metric': {
        'name': '01_loss/valid',  # 最適化したいメトリック
        'goal': 'minimize'   # 目標（'minimize' または 'maximize'）
    },
    'parameters': {
        # 'lr': {
        #     'min': 1e-6,
        #     'max': 5e-4,
        #     'distribution': 'uniform'
        # },
        'lr': {
            'values': [1e-6, 5e-6, 1e-5, 5e-5, 1e-4, 5e-4]
        },
        'batch_size': {
            'values': [32, 64, 128]
        },
        'dim_hidden': {
            'values': [256, 512, 768, 1024]  # 隠れ層の次元の範囲
        },
        'num_layers': {
            'values': [3, 6, 9, 12]  # 層数の範囲
        },
        'patch_size': {
            'values': [4, 8]  # パッチサイズの範囲
        },
        'num_heads': {
            'values': [4, 8, 16]  # ヘッド数の範囲
        }
    },
    'early_terminate': {
        'type': 'hyperband',   # early_terminateのタイプ
        'min_iter': 20,        # 最小イテレーション数
        'eta': 3               # etaパラメータ（通常は3）
    },
    'run_cap': 50  # 最大試行回数
}

In [None]:
# Sweepの作成
sweep_id = wandb.sweep(sweep_config, project=WANDB_PROJECT)
# Sweepエージェントの実行
wandb.agent(sweep_id, train_wrapper)

Create sweep with ID: f693gxzy
Sweep URL: https://wandb.ai/56agumi85ten2sun/mlops-001/sweeps/f693gxzy


[34m[1mwandb[0m: Ctrl + C detected. Stopping sweep.


# End