### 11.7 CNN

### ライブラリ・環境設定

#### ライブラリ導入

In [None]:
# 必要ライブラリ追加導入
!pip install japanize-matplotlib -qq
!pip install torchviz -qq
!pip install torchinfo -qq

#### ライブラリインポート

In [None]:
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import japanize_matplotlib
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchinfo import summary
from tqdm.notebook import tqdm

#### 環境設定

In [None]:
np.set_printoptions(formatter={'float': '{:0.3f}'.format})
pd.options.display.float_format = '{:.3f}'.format
warnings.filterwarnings('ignore')

#### GPU存在チェック

In [None]:
# GPU存在チェック

# GPUが利用可能かどうかのチェック
device = torch.device("cuda:0" \
if torch.cuda.is_available() else "cpu")

# 利用可能な場合は"cuda:0"が出力される
print(device)

### データ準備

#### データローダー構築

In [None]:
# データローダー構築
def get_data_loaders(batch_size=100, data_dir="./data"):
    """MNISTの訓練・テストデータをDataLoaderで返す"""
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])

    train_set = datasets.MNIST(root=data_dir, train=True, download=True, transform=transform)
    test_set  = datasets.MNIST(root=data_dir, train=False, download=True, transform=transform)

    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, pin_memory=True)
    test_loader  = DataLoader(test_set,  batch_size=batch_size, shuffle=False, pin_memory=True)

    return train_loader, test_loader

batch_size = 100
train_loader, test_loader = get_data_loaders(batch_size)

### モデル構築

#### モデル定義

####  全結合型モデル定義

In [None]:
# 全結合型モデル定義

class Net(nn.Module):
    """全結合1層のシンプルなNN（MNIST想定: 28*28 -> 100 -> 10）"""
    def __init__(self, n_input=28*28, n_hidden=100, n_output=10):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_input, n_hidden),
            nn.ReLU(inplace=True),
            nn.Linear(n_hidden, n_output)
        )

    def forward(self, x):
        # (B, 1, 28, 28) -> (B, 784)
        x = torch.flatten(x, 1)
        return self.net(x)

#### CNNモデル定義

In [None]:
# CNNモデル定義
class CNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
        )
        self.classifier = nn.Linear(64 * 7 * 7, 10)

    def forward(self, x):
        # 入力は (B, 1, 28, 28) を想定（ToTensor()でOK）
        x = self.features(x)
        x = x.view(x.size(0), -1)  # Flatten
        x = self.classifier(x)
        return x  # CrossEntropyLoss用にlogitsを返す

#### 訓練用関数

In [None]:
# 学習用関数

def train_one_epoch(model, loader, criterion, optimizer):
    model.train()
    running_loss, correct, total = 0.0, 0, 0

    for inputs, labels in loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # 勾配初期化
        optimizer.zero_grad()
        # 予測計算
        outputs = model(inputs)
        # 損失計算
        loss = criterion(outputs, labels)
        # 勾配(微分)計算
        loss.backward()
        # パラメータ更新
        optimizer.step()

        with torch.no_grad():
            preds = outputs.argmax(1)
            running_loss += loss.item() * labels.size(0)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

    return running_loss / total, correct / total

#### 検証用関数

In [None]:
# 検証用関数

@torch.no_grad()
def validate(model, loader, criterion):
    model.eval()
    running_loss, correct, total = 0.0, 0, 0

    for inputs, labels in loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        preds = outputs.argmax(1)

        running_loss += loss.item() * labels.size(0)
        correct += (preds == labels).sum().item()
        total += labels.size(0)

    return running_loss / total, correct / total

#### 学習関数

In [None]:
def fit(*, net_class=Net, n_hidden=100, num_epochs=20, lr=0.01,
        batch_size=100, optimizer_class=optim.SGD,
        seed=42, data_dir="./data"):
    """
    引数（すべてキーワード専用）:
        net_class       : モデルクラス（例: Net, CustomCNNなど）
        n_hidden        : 隠れ層ノード数（Net使用時のみ）
        num_epochs      : 繰り返し数
        lr              : 学習率
        batch_size      : バッチサイズ
        optimizer_class : 最適化関数クラス (例: optim.SGD, optim.Adam)

    戻り値:
        model, history(np.ndarray: [epoch, train_loss, train_acc, val_loss, val_acc])
    """
    torch.manual_seed(seed)
    np.random.seed(seed)

    # DataLoader作成（バッチサイズ指定）
    train_loader, test_loader = get_data_loaders(batch_size=batch_size,
                                                 data_dir=data_dir)

    # モデル構築（引数対応）
    try:
        model = net_class(n_hidden=n_hidden).to(device)
    except TypeError:
        model = net_class().to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optimizer_class(model.parameters(), lr=lr)

    history = []
    for epoch in tqdm(range(1, num_epochs + 1), desc="Training"):
        tr_loss, tr_acc = train_one_epoch(model, train_loader, criterion,
                                          optimizer)
        va_loss, va_acc = validate(model, test_loader, criterion)

        print(
            f"Epoch [{epoch}/{num_epochs}] "
            f"train_loss: {tr_loss:.5f}, train_acc: {tr_acc:.5f}, "
            f"val_loss: {va_loss:.5f}, val_acc: {va_acc:.5f}"
        )
        history.append([epoch, tr_loss, tr_acc, va_loss, va_acc])

    return model, np.array(history, dtype=float)

#### 学習

In [None]:
model, history = fit()


In [None]:
model_cnn, history_cnn = fit(net_class=CNN)


### 結果確認

#### グラフ描画関数

In [None]:
# グラフ描画関数

import matplotlib.pyplot as plt

def plot_learning_curves_multi(histories, labels=None, title_suffix=""):
    """
    複数のhistoryを黒と青のみで重ねて描画
    実線: 訓練データ, 破線: テストデータ
    """
    plt.figure(figsize=(10, 4), tight_layout=True)

    # --- 線スタイルの組み合わせ（黒と青のみ）---
    colors = ['b', 'k']        # 4本まで対応
    linestyles = ['-', '-']  # パターンで区別

    # --- 損失曲線 ---
    plt.subplot(1, 2, 1)
    for i, history in enumerate(histories):
        epochs = history[:, 0]
        train_loss = history[:, 1]
        val_loss = history[:, 3]
        color = colors[i % len(colors)]
        ls_val = linestyles[(i + 0) % len(linestyles)]  # テストは異なる線種

        label_val   = f"{labels[i]}(テスト)" if labels else f"model{i+1}(テスト)"

        plt.plot(epochs, val_loss,   color=color, linestyle=ls_val,   label=label_val)

    plt.xlabel('繰り返し回数')
    plt.ylabel('損失')
    plt.xticks(np.arange(0,21,2))
    plt.title(f'学習曲線（損失）{title_suffix}')
    plt.legend(fontsize=8)
    plt.grid(True)

    # --- 精度曲線 ---
    plt.subplot(1, 2, 2)
    for i, history in enumerate(histories):
        epochs = history[:, 0]
        train_acc = history[:, 2]
        val_acc   = history[:, 4]
        color = colors[i % len(colors)]
        ls_val = linestyles[(i + 0) % len(linestyles)]

        label_val   = f"{labels[i]}(テスト)" if labels else f"model{i+1}(テスト)"

        plt.plot(epochs, val_acc,   color=color, linestyle=ls_val,   label=label_val)

    plt.xlabel('繰り返し回数')
    plt.ylabel('精度')
    plt.xticks(np.arange(0,21,2))
    plt.title(f'学習曲線（精度）{title_suffix}')
    plt.ylim(0.9,1.0)
    plt.legend(fontsize=8)
    plt.grid(True)
    plt.show()

#### グラフ描画

In [None]:
# グラフ描画
plot_learning_curves_multi(
    [history, history_cnn],
    labels=["Net", "CNN"],
    title_suffix="Net vs CNN"
)

### バージョン確認

In [None]:
!pip install watermark -qq
%load_ext watermark
%watermark --iversions