In [None]:
from collections import deque
import copy
from tqdm import tqdm
from typing import Callable
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE

import torch
from torch import nn, optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
import torchvision
import random
from PIL import Image
import numpy as np

from utils.generate_subset
from utils.transform
from utils.get_dataset_statistics
from utils.target_transform

In [None]:
def evaluate(data_loader:Dataset ,model:nn.Module,loss_func:Callable):
  model.eval()
  losses = []
  preds = []
  for x,y in data_loader:
    with torch.no_grad():
      x = x.to(model.get_device())
      y = y.to(model.get_device())

      y_pred = model(x)

      losses.append(loss_func(y_pred , y , reduction='none'))
      preds.append(y_pred.argmax(dim=1) == y)

  loss = torch.cat(losses).mean()
  accuracy = torch.cat(preds).float().mean()

  return loss , accuracy

In [None]:
# 検証セットの結果による最良モデルの保存用変数
val_loss_best = float('inf')
model_best = None

In [1]:
class Config:
    '''
    ハイパーパラメータとオプションの設定
    '''
    def __init__(self):
        self.val_ratio = 0.2       # 検証に使う学習セット内のデータの割合
        self.dim_hidden = 512      # 隠れ層の特徴量次元
        self.num_hidden_layers = 2 # 隠れ層の数
        self.num_epochs = 30       # 学習エポック数
        self.lr = 1e-2             # 学習率
        self.moving_avg = 20       # 移動平均で計算する損失と正確度の値の数
        self.batch_size = 32       # バッチサイズ
        self.num_workers = 2       # データローダに使うCPUプロセスの数
        self.device = 'cpu'        # 学習に使うデバイス
        self.num_samples = 200     # t-SNEでプロットするサンプル数
config = Config()

In [None]:
def train_eval():
    config = Config()

    # 入力データ正規化のために学習セットのデータを使って
    # 各次元の平均と標準偏差を計算
    dataset = torchvision.datasets.CIFAR10(
        root='data', train=True, download=True,
        transform=transform)

    channel_mean, channel_std = get_dataset_statistics(dataset)

    # 正規化を含めた画像整形関数の用意（この時点で平坦化されている。）
    img_transform = lambda x: transform(
        x, channel_mean, channel_std)

        # 学習、評価セットの用意
    train_dataset = torchvision.datasets.CIFAR10(
        root='data', train=True, download=True,
        transform=img_transform)
    test_dataset = torchvision.datasets.CIFAR10(
        root='data', train=False, download=True,
        transform=img_transform)
    
    # データの数を分けている。全てリストに8:2で分けられている
    val_set,train_set = generate_subset(
     train_dataset,config.val_ratio)
    
    train_sampler = SubsetRandomSampler(train_set)

        # DataLoaderを生成
    train_loader = DataLoader(
        train_dataset, batch_size=config.batch_size,
        num_workers=config.num_workers, sampler=train_sampler)
    val_loader = DataLoader(
        train_dataset, batch_size=config.batch_size,
        num_workers=config.num_workers, sampler=val_set)
    test_loader = DataLoader(
        test_dataset, batch_size=config.batch_size,
        num_workers=config.num_workers)
    
    # 目的関数の生成
    loss_func = F.cross_entropy

    # 検証セットの結果による最良モデルの保存用変数
    val_loss_best = float('inf')
    model_best = None

        # FNNモデルの生成
    model = FNN(32 * 32 * 3, config.dim_hidden,
                config.num_hidden_layers,
                len(train_dataset.classes))

    # モデルを指定デバイスに転送(デフォルトはCPU)
    model.to(config.device)

    # 最適化器の生成
    optimizer = optim.SGD(model.parameters(), lr=config.lr)

    for epoch in range(config.num_epochs):
      model.train()

      with tqdm(train_loader) as pbar:
        pbar.set_description(f'[エポック {epoch + 1}]')

        # 移動平均計算用
        losses = deque()
        accs = deque()
        for x,y in pbar:
          #データをモデルと同じデバイスに転送
          x = x.to(model.get_device())
          y = y.to(model.get_device())

          # すでに計算された勾配をリセット
          optimizer.zero_grad()

          # 順伝番
          y_pred = model(x)

          #学習データに対する損失と正確度を計算
          loss = loss_func(y_pred,y)
          accuracy = (y_pred.argmax(dim=1) == y).float().mean()

          #誤差逆伝番
          loss.backward()

          #パラメーター更新
          optimizer.step()

          #移動平均を計算して表示
          losses.append(loss.item())
          accs.append(accuracy.item())
          if len(losses) > config.moving_avg: #直近のmoving_avgの個数の移動平均しか見ない
            losses.popleft()
            accs.popleft()
          pbar.set_postfix({
              'loss':torch.Tensor(losses).mean().item(),
              'accuracy':torch.Tensor(accs).mean().item()
          })

      #検証データを使って精度評価
      val_loss,val_accuracy = evaluate(
          val_loader,model,loss_func)
      
      print(f'検証：loss = {val_loss:.3f},' f'検証:accuracy = {val_accuracy:.3f}')

      # より良い検証結果が得られた場合、モデルを記録
      if val_loss < val_loss_best:
        val_loss_best = val_loss
        model_best = model.copy()

    #テスト
    test_loss , test_accuracy = evaluate(
        test_loader,model_best,loss_func)
    print(f'テスト: loss = {test_loss:.3f}, '
          f'accuracy = {test_accuracy:.3f}')

In [None]:
train_eval()