# **CNN-KANを用いた画像分類**

## Kolmogorov-Arnold Network (KAN) の実装

### 必要なライブラリのインポート

In [None]:
import torch
import torch.nn.functional as F
import math

### KANLinearクラスの定義
KANLinearは、KANの核となる線形変換層である。この層は、通常の線形変換に加えてB-スプライン基底関数を用いた非線形変換を組み合わせている。
KANLinearクラスは、入力特徴量を非線形変換し、出力特徴量を生成する。主な特徴は以下の通り：
- B-スプライン基底関数を用いた非線形変換
- グリッド更新機能
- 正則化損失の計算

`__init__`メソッドは、KANLinear層の初期化を行う。主な役割は以下の通り：
- グリッドの初期化
- 基本重みとスプライン重みのパラメータ化
- 各種スケーリングパラメータの設定
- 活性化関数の設定

`b_splines`メソッドは、入力テンソルに対してB-スプライン基底を計算する。これにより、非線形変換の基礎となる基底関数が得られる。

`curve2coeff`メソッドは、与えられた点を補間する曲線の係数を計算する。これにより、スプライン重みが更新される。

`forward`メソッドは、入力テンソルに対して順伝播を行う。基本的な線形変換と、B-スプライン基底を用いた非線形変換を組み合わせて出力を生成する。

`update_grid`メソッドは、入力データの分布に基づいてグリッドを更新する。これにより、モデルが入力データの特性に適応できる。

`regularization_lossメ`ソッドは、モデルの過学習を防ぐための正則化損失を計算する。活性化の正則化とエントロピーの正則化を組み合わせている。

これらのメソッドにより、KANLinear層は非線形システムの近似に適した柔軟な変換を行うことができる。

In [None]:
class KANLinear(torch.nn.Module):
    def __init__(
        self,
        in_features,
        out_features,
        grid_size=5,
        spline_order=3,
        scale_noise=0.1,
        scale_base=1.0,
        scale_spline=1.0,
        enable_standalone_scale_spline=True,
        base_activation=torch.nn.SiLU,
        grid_eps=0.02,
        grid_range=[-1, 1],
    ):
        super(KANLinear, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.grid_size = grid_size
        self.spline_order = spline_order

        h = (grid_range[1] - grid_range[0]) / grid_size
        grid = (
            (
                torch.arange(-spline_order, grid_size + spline_order + 1) * h
                + grid_range[0]
            )
            .expand(in_features, -1)
            .contiguous()
        )
        self.register_buffer("grid", grid)

        self.base_weight = torch.nn.Parameter(torch.Tensor(out_features, in_features), requires_grad=False)
        self.spline_weight = torch.nn.Parameter(
            torch.Tensor(out_features, in_features, grid_size + spline_order)
        )
        if enable_standalone_scale_spline:
            self.spline_scaler = torch.nn.Parameter(
                torch.Tensor(out_features, in_features)
            )

        self.scale_noise = scale_noise
        self.scale_base = scale_base
        self.scale_spline = scale_spline
        self.enable_standalone_scale_spline = enable_standalone_scale_spline
        self.base_activation = base_activation()
        self.grid_eps = grid_eps

        self.reset_parameters()

    def reset_parameters(self):
        torch.nn.init.kaiming_uniform_(self.base_weight, a=math.sqrt(5) * self.scale_base)
        with torch.no_grad():
            noise = (
                (
                    torch.rand(self.grid_size + 1, self.in_features, self.out_features)
                    - 1 / 2
                )
                * self.scale_noise
                / self.grid_size
            )
            self.spline_weight.data.copy_(
                (self.scale_spline if not self.enable_standalone_scale_spline else 1.0)
                * self.curve2coeff(
                    self.grid.T[self.spline_order : -self.spline_order],
                    noise,
                )
            )
            if self.enable_standalone_scale_spline:
                # torch.nn.init.constant_(self.spline_scaler, self.scale_spline)
                torch.nn.init.kaiming_uniform_(self.spline_scaler, a=math.sqrt(5) * self.scale_spline)

    def b_splines(self, x: torch.Tensor):
        """
        Compute the B-spline bases for the given input tensor.

        Args:
            x (torch.Tensor): Input tensor of shape (batch_size, in_features).

        Returns:
            torch.Tensor: B-spline bases tensor of shape (batch_size, in_features, grid_size + spline_order).
        """
        assert x.dim() == 2 and x.size(1) == self.in_features

        grid: torch.Tensor = (
            self.grid
        )  # (in_features, grid_size + 2 * spline_order + 1)
        x = x.unsqueeze(-1)
        bases = ((x >= grid[:, :-1]) & (x < grid[:, 1:])).to(x.dtype)
        for k in range(1, self.spline_order + 1):
            bases = (
                (x - grid[:, : -(k + 1)])
                / (grid[:, k:-1] - grid[:, : -(k + 1)])
                * bases[:, :, :-1]
            ) + (
                (grid[:, k + 1 :] - x)
                / (grid[:, k + 1 :] - grid[:, 1:(-k)])
                * bases[:, :, 1:]
            )

        assert bases.size() == (
            x.size(0),
            self.in_features,
            self.grid_size + self.spline_order,
        )
        return bases.contiguous()

    def curve2coeff(self, x: torch.Tensor, y: torch.Tensor):
        """
        与えられた入力テンソルに対してB-スプライン基底を計算する。

        引数:
            x (torch.Tensor): 形状(batch_size, in_features)の入力テンソル。
            y (torch.Tensor): 形状(batch_size, in_features, out_features)の出力テンソル。

        戻り値:
            torch.Tensor: 形状(batch_size, in_features, grid_size + spline_order)のB-スプライン基底テンソル。
        """
        assert x.dim() == 2 and x.size(1) == self.in_features
        assert y.size() == (x.size(0), self.in_features, self.out_features)

        A = self.b_splines(x).transpose(
            0, 1
        )  # (in_features, batch_size, grid_size + spline_order)
        B = y.transpose(0, 1)  # (in_features, batch_size, out_features)
        solution = torch.linalg.lstsq(
            A, B
        ).solution  # (in_features, grid_size + spline_order, out_features)
        result = solution.permute(
            2, 0, 1
        )  # (out_features, in_features, grid_size + spline_order)

        assert result.size() == (
            self.out_features,
            self.in_features,
            self.grid_size + self.spline_order,
        )
        return result.contiguous()

    @property
    def scaled_spline_weight(self):
        return self.spline_weight * (
            self.spline_scaler.unsqueeze(-1)
            if self.enable_standalone_scale_spline
            else 1.0
        )

    def forward(self, x: torch.Tensor):
        assert x.size(-1) == self.in_features
        original_shape = x.shape
        x = x.reshape(-1, self.in_features)

        base_output = F.linear(self.base_activation(x), self.base_weight)
        spline_output = F.linear(
            self.b_splines(x).view(x.size(0), -1),
            self.scaled_spline_weight.view(self.out_features, -1),
        )
        output = base_output + spline_output
        
        output = output.reshape(*original_shape[:-1], self.out_features)
        return output

    @torch.no_grad()
    def update_grid(self, x: torch.Tensor, margin=0.01):
        assert x.dim() == 2 and x.size(1) == self.in_features
        batch = x.size(0)

        splines = self.b_splines(x)  # (batch, in, coeff)
        splines = splines.permute(1, 0, 2)  # (in, batch, coeff)
        orig_coeff = self.scaled_spline_weight  # (out, in, coeff)
        orig_coeff = orig_coeff.permute(1, 2, 0)  # (in, coeff, out)
        unreduced_spline_output = torch.bmm(splines, orig_coeff)  # (in, batch, out)
        unreduced_spline_output = unreduced_spline_output.permute(
            1, 0, 2
        )  # (batch, in, out)

        # sort each channel individually to collect data distribution
        x_sorted = torch.sort(x, dim=0)[0]
        grid_adaptive = x_sorted[
            torch.linspace(
                0, batch - 1, self.grid_size + 1, dtype=torch.int64, device=x.device
            )
        ]

        uniform_step = (x_sorted[-1] - x_sorted[0] + 2 * margin) / self.grid_size
        grid_uniform = (
            torch.arange(
                self.grid_size + 1, dtype=torch.float32, device=x.device
            ).unsqueeze(1)
            * uniform_step
            + x_sorted[0]
            - margin
        )

        grid = self.grid_eps * grid_uniform + (1 - self.grid_eps) * grid_adaptive
        grid = torch.concatenate(
            [
                grid[:1]
                - uniform_step
                * torch.arange(self.spline_order, 0, -1, device=x.device).unsqueeze(1),
                grid,
                grid[-1:]
                + uniform_step
                * torch.arange(1, self.spline_order + 1, device=x.device).unsqueeze(1),
            ],
            dim=0,
        )

        self.grid.copy_(grid.T)
        self.spline_weight.data.copy_(self.curve2coeff(x, unreduced_spline_output))

    def regularization_loss(self, regularize_activation=1.0, regularize_entropy=1.0):
        """
        正則化損失を計算する。

        これは、論文で述べられているオリジナルのL1正則化のシミュレーションである。
        オリジナルの方法では、F.linear関数の背後に隠れている中間テンソル(batch, in_features, out_features)から
        絶対値とエントロピーを計算する必要があるが、メモリ効率の良い実装を目指すため、この方法を採用している。

        L1正則化は現在、スプライン重みの平均絶対値として計算されている。
        著者の実装では、サンプルベースの正則化に加えてこの項も含まれている。
        """
        l1_fake = self.spline_weight.abs().mean(-1)
        regularization_loss_activation = l1_fake.sum()
        p = l1_fake / regularization_loss_activation
        regularization_loss_entropy = -torch.sum(p * p.log())
        return (
            regularize_activation * regularization_loss_activation
            + regularize_entropy * regularization_loss_entropy
        )


### KANクラスの定義
KANクラスでは、複数のKANLinear層を組み合わせて全体のネットワークを構成する。

`__init__`メソッドは、KANネットワーク全体の初期化を行う。主な役割は以下の通り：
- グリッドサイズとスプライン次数の設定
- 隠れ層の構造に基づいてKANLinear層のリストを作成
- 各KANLinear層に共通のハイパーパラメータを設定

`layers_hidden`引数は、ネットワークの各層のユニット数を指定するリストである。例えば、[64, 32, 16]と指定すると、入力層64ユニット、第1隠れ層32ユニット、出力層16ユニットのネットワークが構築される。

`forward`メソッドは、入力テンソルに対してネットワーク全体の順伝播を行う。主な特徴は以下の通り：
- 各KANLinear層を順番に適用
- オプションで各層のグリッドを更新可能（update_grid=Trueの場合）
- 最終的な出力を返す

`update_grid`オプションを使用することで、モデルを入力データの分布に適応させることができる。これは、特に分布が時間とともに変化する動的システムのモデリングに有用である。

`regularization_loss`メソッドは、ネットワーク全体の正則化損失を計算する。主な特徴は以下の通り：
- 各KANLinear層の正則化損失を合計
- 活性化の正則化とエントロピーの正則化を調整可能

In [None]:
class KAN(torch.nn.Module):
    def __init__(
        self,
        layers_hidden,
        grid_size=5,
        spline_order=3,
        scale_noise=0.1,
        scale_base=1.0,
        scale_spline=1.0,
        base_activation=torch.nn.SiLU,
        grid_eps=0.02,
        grid_range=[-1, 1],
    ):
        super(KAN, self).__init__()
        self.grid_size = grid_size
        self.spline_order = spline_order

        self.layers = torch.nn.ModuleList()
        for in_features, out_features in zip(layers_hidden, layers_hidden[1:]):
            self.layers.append(
                KANLinear(
                    in_features,
                    out_features,
                    grid_size=grid_size,
                    spline_order=spline_order,
                    scale_noise=scale_noise,
                    scale_base=scale_base,
                    scale_spline=scale_spline,
                    base_activation=base_activation,
                    grid_eps=grid_eps,
                    grid_range=grid_range,
                )
            )

    def forward(self, x: torch.Tensor, update_grid=False):
        for layer in self.layers:
            if update_grid:
                layer.update_grid(x)
            x = layer(x)
        return x

    def regularization_loss(self, regularize_activation=1.0, regularize_entropy=1.0):
        return sum(
            layer.regularization_loss(regularize_activation, regularize_entropy)
            for layer in self.layers
        )

## CNNの実装
このCNNモデルの構造は以下のようになっている：

1. **畳み込み部分**:
- 3つの畳み込み層 (conv1, conv2, conv3)
- 2つの最大プーリング層
- 特徴マップのサイズを徐々に小さくしながら、特徴を抽出


2. **全結合部分**:
- 3つの全結合層 (fc1, fc2, fc3)
- 畳み込み部分の出力を平坦化し、最終的な分類を行う

3. **正則化**:
- 2つのドロップアウト層 (畳み込み後と全結合層間)

4. **活性化関数**:
- 指定された活性化関数 (デフォルトはSiLU) を各層で使用

In [None]:
class CNN(torch.nn.Module):
    def __init__(self, 
                 input_size,  # (channels, height, width)
                 base_channels=32, 
                 hidden_units=64, 
                 dropout1_rate=0.25,
                 dropout2_rate=0.5,
                 base_activation=torch.nn.SiLU):
        super(CNN, self).__init__()
        input_channels, height, width = input_size
        self.base_activation = base_activation()
        self.conv1 = torch.nn.Conv2d(input_channels, base_channels, 3, 1)
        self.conv2 = torch.nn.Conv2d(base_channels, base_channels * 2, 3, 1)
        self.conv3 = torch.nn.Conv2d(base_channels * 2, base_channels, 1)
        conv_output_size = self._get_conv_output_size(height, width, base_channels)
        self.fc1 = torch.nn.Linear(conv_output_size, hidden_units)
        self.fc2 = torch.nn.Linear(hidden_units, hidden_units // 2)
        self.fc3 = torch.nn.Linear(hidden_units // 2, 10)
        self.dropout1 = torch.nn.Dropout2d(dropout1_rate)
        self.dropout2 = torch.nn.Dropout(dropout2_rate)

    def _get_conv_output_size(self, height, width, base_channels):
        size_h = (height - 2) // 2
        size_h = (size_h - 2) // 2
        size_w = (width - 2) // 2
        size_w = (size_w - 2) // 2
        return size_h * size_w * base_channels

    def forward(self, x):
        x = self.base_activation(self.conv1(x))
        x = F.max_pool2d(x, 2)
        x = self.base_activation(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.base_activation(self.conv3(x))
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.base_activation(self.fc1(x))
        x = self.dropout2(x)
        x = self.base_activation(self.fc2(x))
        x = self.fc3(x)
        return x


## CNN-KANの実装
CNNの2つの全結合層を削除し、KAN層を追加している。
一つ目の全結合層はKAN層の次元削減のための役割を果たしている。これにより、学習時間が削減される。

In [None]:
class CNNKAN(CNN):
    def __init__(self, 
                 input_size,  # (channels, height, width)
                 base_channels=32, 
                 kan_hidden=128,
                 dropout1_rate=0.25,
                 dropout2_rate=0.5,
                 base_activation=torch.nn.SiLU):
        super(CNNKAN, self).__init__(input_size, base_channels, kan_hidden, dropout1_rate, dropout2_rate, base_activation)
        _, height, width = input_size
        conv_output_size = self._get_conv_output_size(height, width, base_channels)
        del self.fc1
        del self.fc2
        del self.fc3

        self.fc1 = torch.nn.Linear(conv_output_size, kan_hidden)
        self.kan = KAN([kan_hidden, kan_hidden // 4, 10])

    def forward(self, x):
        x = self.base_activation(self.conv1(x))
        x = F.max_pool2d(x, 2)
        x = self.base_activation(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.base_activation(self.conv3(x))
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.base_activation(self.fc1(x))
        x = self.dropout2(x)
        x = self.kan(x)
        return x

## トレーニングコードの実装

### 必要なライブラリのインポート

In [None]:
import os
import warnings
import time
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from tqdm import tqdm

### データセットのロード

`load_mnist(batch_size=64)`

MNISTデータセットを読み込み、データローダーを生成する。
- 画像を正規化し、テンソルに変換
- 訓練用、検証用、評価用のデータローダーを作成
- バッチサイズは指定可能（デフォルト64）

`load_cifar10(batch_size=64)`

CIFAR-10データセットを読み込み、データローダーを生成する。
- MNISTと同様の処理を行うが、CIFAR-10用の正規化パラメータを使用


`load_cifar10(batch_size=64)`

CIFAR-10データセットを読み込み、データローダーを生成する。
- MNISTと同様の処理を行うが、CIFAR-10用の正規化パラメータを使用

In [None]:
def load_mnist(batch_size=64):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,))
    ])
    trainset = torchvision.datasets.MNIST(root='./data', train=True, download=True, transform=transform)
    valset = torchvision.datasets.MNIST(root='./data', train=False, download=True, transform=transform)
    trainloader = DataLoader(trainset, batch_size=batch_size, shuffle=True)
    valloader = DataLoader(valset, batch_size=batch_size, shuffle=False)
    eval_loader = DataLoader(valset, batch_size=len(valset), shuffle=False)
    return trainloader, valloader, eval_loader

def load_cifar10(batch_size=64):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.247, 0.243, 0.261))
    ])
    trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
    valset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
    trainloader = DataLoader(trainset, batch_size=batch_size, shuffle=True)
    valloader = DataLoader(valset, batch_size=batch_size, shuffle=False)
    eval_loader = DataLoader(valset, batch_size=len(valset), shuffle=False)
    return trainloader, valloader, eval_loader

### モデルのセットアップ

`setup_model(model)`

モデルのセットアップを行う。
- GPUが利用可能な場合はGPUを使用
- AdamWオプティマイザと学習率スケジューラを設定
- 損失関数としてクロスエントロピーを使用

In [None]:
def setup_model(model):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    optimizer = optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4) # type: ignore
    scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)
    criterion = torch.nn.CrossEntropyLoss()
    return model, optimizer, scheduler, criterion, device

### チェックポイントの保存・ロード

`save_checkpoint(model, optimizer, scheduler, epoch, val_accuracy, filename)`

モデルの状態をチェックポイントとして保存する。

`load_checkpoint(filename, model, optimizer, scheduler)`

保存されたチェックポイントからモデルの状態を読み込む。

In [None]:
def save_checkpoint(model, optimizer, scheduler, epoch, val_accuracy, filename):
    if not os.path.exists('checkpoints'):
        os.makedirs('checkpoints')
    file_path = os.path.join('checkpoints', filename)
    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'scheduler_state_dict': scheduler.state_dict(),
        'val_accuracy': val_accuracy
    }
    torch.save(checkpoint, file_path)

def load_checkpoint(filename, model, optimizer, scheduler):
    file_path = os.path.join('checkpoints', filename)
    checkpoint = torch.load(file_path)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
    return checkpoint['epoch'], checkpoint['val_accuracy']

### 訓練・評価関数

`train_epoch(model, trainloader, optimizer, criterion, device)`

1エポックの訓練を行う。
- tqdmを使用して進捗を表示
- 各バッチごとに損失を計算し、モデルを更新

`validate(model, valloader, criterion, device)`

検証データセットでモデルの性能を評価する。

`evaluate(model, dataloader, device)`

指定されたデータローダーを使用してモデルの精度を評価する。

In [None]:
def train_epoch(model, trainloader, optimizer, criterion, device):
    model.train()
    with tqdm(trainloader, desc="Training") as pbar:
        for images, labels in pbar:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            if isinstance(model, KAN):
                images = images.view(images.size(0), -1)
            output = model(images)
            loss = criterion(output, labels)
            loss.backward()
            optimizer.step()
            accuracy = (output.argmax(dim=1) == labels).float().mean()
            pbar.set_postfix(loss=f"{loss.item():.4f}", accuracy=f"{accuracy.item():.4f}")
    return loss.item(), accuracy.item() # type: ignore

def validate(model, valloader, criterion, device):
    model.eval()
    val_loss = 0
    val_accuracy = 0
    with torch.no_grad():
        for images, labels in valloader:
            images, labels = images.to(device), labels.to(device)
            if isinstance(model, KAN):
                images = images.view(images.size(0), -1)
            output = model(images)
            val_loss += criterion(output, labels).item()
            val_accuracy += (output.argmax(dim=1) == labels).float().mean().item()
    return val_loss / len(valloader), val_accuracy / len(valloader)

def evaluate(model, dataloader, device):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in dataloader:
            images, labels = images.to(device), labels.to(device)
            if isinstance(model, KAN):
                images = images.view(images.size(0), -1)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return correct / total

`plot_comparison(results, dataset, epochs)`

複数のモデルの訓練結果を比較するグラフを作成し保存する。

トレーニング時と検証時に分けてグラフを作成する。

1. 損失比較プロット（左）
- 各モデルの検証損失を表示

2. 精度比較プロット（右）
- 各モデルの検証精度を表示

3. グラフの調整と保存
- 軸ラベル、タイトル、凡例を設定
- 'figures'ディレクトリに保存

In [None]:
def plot_comparison(results, dataset, epochs):
    if not os.path.exists('figures'):
        os.makedirs('figures')

    file_path_train = os.path.join('figures', f'{dataset}_{epochs}_training_comparison.png')
    plt.figure(figsize=(16, 8))

    plt.subplot(1, 2, 1)
    for model, data in results.items():
        plt.plot(np.arange(1, epochs + 1), data['train_losses'], label=f'{model} Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title(f'Training Loss Comparison ({dataset})')
    plt.legend()
    plt.gca().xaxis.set_major_locator(MaxNLocator(integer=True))

    plt.subplot(1, 2, 2)
    for model, data in results.items():
        plt.plot(np.arange(1, epochs + 1), data['train_accuracies'], label=f'{model} Training Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.title(f'Training Accuracy Comparison ({dataset})')
    plt.legend()
    plt.gca().xaxis.set_major_locator(MaxNLocator(integer=True))

    plt.tight_layout()
    plt.savefig(file_path_train)
    plt.close()

    file_path_val = os.path.join('figures', f'{dataset}_{epochs}_validation_comparison.png')
    plt.figure(figsize=(16, 8))

    plt.subplot(1, 2, 1)
    for model, data in results.items():
        plt.plot(np.arange(1, epochs + 1), data['val_losses'], label=f'{model} Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title(f'Validation Loss Comparison ({dataset})')
    plt.legend()
    plt.gca().xaxis.set_major_locator(MaxNLocator(integer=True))

    plt.subplot(1, 2, 2)
    for model, data in results.items():
        plt.plot(np.arange(1, epochs + 1), data['val_accuracies'], label=f'{model} Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.title(f'Validation Accuracy Comparison ({dataset})')
    plt.legend()
    plt.gca().xaxis.set_major_locator(MaxNLocator(integer=True))

    plt.tight_layout()
    plt.savefig(file_path_val)
    plt.close()

### Main関数

モデルの訓練・評価を行う。

1. データセットの選択と読み込み
- 'mnist' または 'cifar10' から選択
- 選択されたデータセットに応じてデータローダーを生成

1. モデルの選択と初期化
- 'cnn'、'kan'、'cnn_kan' から選択
- 選択されたモデルを初期化し、入力チャンネル数を設定

3. モデルのセットアップ
- デバイス（GPU/CPU）の設定
- オプティマイザ、スケジューラ、損失関数の初期化

4. 訓練ループ
- 指定されたエポック数だけ訓練を繰り返す
- 各エポックで訓練と検証を実行
- 学習率のスケジューリングを行う
- 訓練と検証の損失と精度を表示

5. ベストモデルの保存
- 検証精度が最高値を更新するたびにモデルを保存

6. ベストモデルの評価
- 保存された最良のモデルを読み込む
- 訓練データセット全体での精度を評価
- テストデータセット全体での精度を評価

7. 結果の可視化
- 全モデルの訓練・検証結果を比較するグラフを生成
- 損失と精度の推移を別々のサブプロットで表示
- 結果を画像ファイルとして保存

In [None]:
def main(dataset, models, epochs):
    warnings.filterwarnings("ignore")

    if dataset == 'mnist':
        trainloader, valloader, eval_loader = load_mnist()
        input_size = (1, 28, 28)
    elif dataset == 'cifar10':
        trainloader, valloader, eval_loader = load_cifar10()
        input_size = (3, 32, 32)
    else:
        raise ValueError(f"Unknown dataset: {dataset}")

    results = {}

    for modelname in models:
        print(f"\nTraining {modelname} on {dataset}")
        if modelname == 'cnn':
            model = CNN(input_size)
        elif modelname == 'kan':
            input_size_flat = input_size[0] * input_size[1] * input_size[2]
            model = KAN([input_size_flat, 64, 10])
        elif modelname == 'cnn-kan':
            model = CNNKAN(input_size)
        else:
            raise ValueError(f"Unknown model: {modelname}")

        model, optimizer, scheduler, criterion, device = setup_model(model)

        train_losses = []
        train_accuracies = []
        val_losses = []
        val_accuracies = []

        best_val_accuracy = 0

        start_time = time.time()
        filename = f'{modelname}_{dataset}_{epochs}_checkpoint.pth'
        for epoch in range(epochs):
            train_loss, train_acc = train_epoch(model, trainloader, optimizer, criterion, device)
            val_loss, val_accuracy = validate(model, valloader, criterion, device)
            scheduler.step()

            train_losses.append(train_loss)
            train_accuracies.append(train_acc)
            val_losses.append(val_loss)
            val_accuracies.append(val_accuracy)

            print(f"Epoch {epoch + 1}, Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.4f}")
            print(f"Epoch {epoch + 1}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}")

            if val_accuracy > best_val_accuracy:
                best_val_accuracy = val_accuracy
                save_checkpoint(model, optimizer, scheduler, epoch, val_accuracy, filename)

        end_time = time.time()
        total_time = end_time - start_time
        print(f"Total training time for {modelname}: {total_time:.2f} seconds")

        best_epoch, best_accuracy = load_checkpoint(filename, model, optimizer, scheduler)
        print(f"Loaded best model from epoch {best_epoch+1} with validation accuracy {best_accuracy:.4f}")

        test_accuracy = evaluate(model, eval_loader, device)
        print(f"Test Accuracy for {modelname}: {test_accuracy:.4f}")

        results[modelname] = {
            'train_losses': train_losses,
            'train_accuracies': train_accuracies,
            'val_losses': val_losses,
            'val_accuracies': val_accuracies,
            'test_accuracy': test_accuracy
        }

    plot_comparison(results, dataset, epochs)

## トレーニング実行

In [None]:
if __name__ == "__main__":
    main(dataset='mnist', models=['cnn', 'kan', 'cnn-kan'], epochs=10)

In [None]:
if __name__ == "__main__":
    main(dataset='cifar10', models=['cnn', 'kan', 'cnn-kan'], epochs=10)