In [None]:
# standard modules
import os
import math
import random
from datetime import datetime
import pdb

# myself
from config import setting
from module import const
from module import machine_learning_model  # AlexNet, get_input_size_into_FLATTEN


import numpy as np
import matplotlib.pyplot as plt
import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import TensorDataset, DataLoader
from torch import nn
import torch.nn.functional as F
from torch import optim


from sklearn.model_selection import train_test_split
from sklearn import preprocessing

from IPython.display import display
from torchinfo import summary
from torchviz import make_dot

import os
import sys
import cv2
from PIL import Image
import pandas as pd

In [None]:
# check GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

const.CLASSES = ("S10C", "S15C", "S25C", "S35C", "S45C", "S55C")
const.NUM_CLASS = 6

const.IMG_HEIGHT = 256
const.IMG_WIDTH = 256

# バッチサイズ(目安は全データの1/10だが正解なし)
# const.BATCH_SIZE = 2
# const.BATCH_SIZE = 6
# const.BATCH_SIZE = 18
const.BATCH_SIZE = 24

# const.NUM_EPOCHS = 60
const.NUM_EPOCHS =5

In [None]:
x10_960_x1_datasets = np.load(const.DATASET_PATH)

# データを3次元のtensor([c h w])に変換(1次元と3次元で比較)
images = x10_960_x1_datasets["x"]
labels = x10_960_x1_datasets["y"]

images_list = []
labels_list = []

# 正規化手法定義 MinMaxScaler(0<=data<=1)
mmscaler = preprocessing.MinMaxScaler()

for i, image in enumerate(images):
    print(f"image: ")
    print(image)

    image = Image.fromarray(image)
    image = image.convert("RGB")

    image = image.resize((const.IMG_HEIGHT, const.IMG_WIDTH))

    # print("image mode: {}".format(image.mode)) # むりやり3チャンネルにしているので表記上はRGBだが実際はL
    # PIL前に正規化をするとバグる
    image = np.asarray(image, np.float32)

    # 正規化
    # 次元が2以上なので使えない
    # image = mmscaler.fit_transform(image)
    image = image.astype(float) / 255.0

    image = np.transpose(image, (2, 0, 1))
    images_list.append(image)
    labels_list.append(labels[i])

    print(i, "/", len(images))


transpose_images = np.array(images_list)
transpose_labels = np.array(labels_list)

print("transpose_images dtype: {}".format(type(transpose_images)))
print("transpose_images shape: {}".format(transpose_images.shape))
print("transpose_images: {}".format(transpose_images))

print("transpose_labels dtype: {}".format(type(transpose_labels)))
print("transpose_labels shape: {}".format(transpose_labels.shape))
print("transpose_labels: {}".format(transpose_labels))
np.savez(
    const.APP_PATH, "data/temp/np_savez", transpose_images, x10_960_x1_datasets["y"]
)

In [None]:
# transformsなしの自作のデータセット作成
data = torch.tensor(transpose_images, dtype=torch.float32)
label = torch.tensor(transpose_labels, dtype=torch.int64)

print("data.shape:", data.shape, "label.shape:", label.shape)

# Datasetを作成
dataset = torch.utils.data.TensorDataset(data, label)

X_sample, y_sample = dataset[0]
print(X_sample.shape, y_sample.shape)
print(X_sample)
print(y_sample)


# NOTE: transformsを使ったrailにのったやり方(自作クラスを後に作成する)
# https://dreamer-uma.com/pytorch-dataset/

In [None]:
# DataLoader ミニバッチ学習
# 入力値と目標値をまとめる


# 学習データ、検証データ、テストデータに分ける
# 各データセットのサンプル数を決定
# train : test = 60% : 20%
num_train = int(len(dataset) * 0.8)
num_test = int(len(dataset) - num_train)

print("num_train:", num_train)
print("num_test:", num_test)

# ランダムにするらめにシードの固定をした
# torch.manual_seed(0)
# データセットの分割
train, test = torch.utils.data.random_split(dataset, [num_train, num_test])
print("train:", train)
print("test:", test)

In [None]:
# ミニバッチ学習

train_loader = torch.utils.data.DataLoader(
    train, const.BATCH_SIZE, shuffle=True, drop_last=True
)
test_loader = torch.utils.data.DataLoader(test, const.BATCH_SIZE)

sample_x, sample_t = next(iter(train_loader))

print("x:", sample_x)
print("t:", sample_t)

In [None]:
# 2.3 ニューラルネットワークの定義

n_output = const.NUM_CLASS
# 中間層のノード数(適切を調査)
n_hidden = 128

# print("x before shape: {}".format(sample_x.shape))

# # flattenに入れる前の入力画像(特徴MAP)のサイズ確認
conv1 = nn.Conv2d(3, 32, 3, padding=(1, 1))
conv2 = nn.Conv2d(32, 32, 3, padding=(1, 1))
conv3 = nn.Conv2d(32, 64, 3, padding=(1, 1))
conv4 = nn.Conv2d(64, 64, 3, padding=(1, 1))
conv5 = nn.Conv2d(64, 128, 3, padding=(1, 1))
conv6 = nn.Conv2d(128, 128, 3, padding=(1, 1))

bn1 = nn.BatchNorm2d(32)
bn2 = nn.BatchNorm2d(32)
bn3 = nn.BatchNorm2d(64)
bn4 = nn.BatchNorm2d(64)
bn5 = nn.BatchNorm2d(128)
bn6 = nn.BatchNorm2d(128)

dropout1 = nn.Dropout(0.2)
dropout2 = nn.Dropout(0.3)
dropout3 = nn.Dropout(0.4)

relu = nn.ReLU(inplace=True)
maxpool = nn.MaxPool2d((2, 2))

x1 = relu(bn1(conv1(sample_x)))
x2 = dropout1(maxpool(relu(bn2(conv2(x1)))))
x3 = relu(bn3(conv3(x2)))
x4 = dropout2(maxpool(relu(bn4(conv4(x3)))))
x5 = relu(bn5(conv5(x4)))
x6 = dropout3(maxpool(relu(bn6(conv6(x5)))))

x7 = torch.flatten(x6)
print("x7 shape: {}".format(x7.shape))

# x2 = relu(x1)
# x3 = conv2(x2)
# x4 = relu(x3)
# x5 = maxpool(x4)

# print("x1 shape: {}".format(x1.shape))
# print("x2 shape: {}".format(x2.shape))
# print("x3 shape: {}".format(x3.shape))
# print("x4 shape: {}".format(x4.shape))
# print("x5 shape: {}".format(x5.shape))

# features = nn.Sequential(conv1, relu, conv2, relu, maxpool)

# outputs = features(sample_x)
# print(outputs)

# flatten = nn.Flatten()
# outputs2 = flatten(outputs)

# print("outputs shape: {}".format(outputs.shape))
# print("outputs2 shape: {}".format(outputs2.shape))


# # flattenに入れる前の入力画像(特徴MAP)のサイズ確認
# conv1 = nn.Conv2d(3, 32, 3)
# conv2 = nn.Conv2d(32, 32, 3)
# maxpool = nn.MaxPool2d((2, 2))
# relu = nn.ReLU(inplace=True)

# x = maxpool(relu(conv1(x)))
# x = maxpool(relu(conv2(x)))
# x = torch.flatten(x)

# print("x shape: {}".format(x.shape))


input_size_into_FLATTEN = machine_learning_model.get_input_size_into_FLATTEN(sample_x)
print("input_size_into_FLATTEN: {}".format(input_size_into_FLATTEN))

# load AlexNet
net = machine_learning_model.AlexNet(n_output, n_hidden, sample_t).to(device)
print(net)

In [None]:
# 2.4 損失関数と最適化関数の定義

const.lr = 0.01

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters())
# optimizer = optim.SGD(net.parameters(), lr=lr)

In [None]:
# 学習用関数
def fit(
    net, optimizer, criterion, num_epochs, train_loader, test_loader, device, history
):
    # tqdmライブラリのインポート
    from tqdm import tqdm

    base_epochs = len(history)

    for epoch in range(base_epochs, num_epochs + base_epochs):
        # 1エポックあたりの正解数(精度計算用)
        n_train_acc, n_val_acc = 0, 0
        # 1エポックあたりの累積損失(平均化前)
        train_loss, val_loss = 0, 0
        # 1エポックあたりのデータ累積件数
        n_train, n_test = 0, 0

        # lossの最小値
        min_val_loss = 1

        # 訓練フェーズ
        net.train()

        # for inputs, labels in tqdm(train_loader):
        for inputs, labels in train_loader:
            # 1バッチあたりのデータ件数
            train_batch_size = len(labels)
            # 1エポックあたりのデータ累積件数
            n_train += train_batch_size

            # GPUヘ転送
            inputs = inputs.to(device)
            labels = labels.to(device)

            # 勾配の初期化
            optimizer.zero_grad()

            # 予測計算
            outputs = net(inputs)

            # 損失計算
            loss = criterion(outputs, labels)

            # 勾配計算
            loss.backward()

            # パラメータ修正
            optimizer.step()

            # 予測ラベル導出
            predicted = torch.max(outputs, 1)[1]

            # 平均前の損失と正解数の計算
            # lossは平均計算が行われているので平均前の損失に戻して加算
            train_loss += loss.item() * train_batch_size
            n_train_acc += (predicted == labels).sum().item()

        # 予測フェーズ
        net.eval()

        for inputs_test, labels_test in test_loader:
            # 1バッチあたりのデータ件数
            test_batch_size = len(labels_test)
            # 1エポックあたりのデータ累積件数
            n_test += test_batch_size

            # GPUヘ転送
            inputs_test = inputs_test.to(device)
            labels_test = labels_test.to(device)

            # 予測計算
            outputs_test = net(inputs_test)

            # 損失計算
            loss_test = criterion(outputs_test, labels_test)

            # 予測ラベル導出
            predicted_test = torch.max(outputs_test, 1)[1]

            #  平均前の損失と正解数の計算
            # lossは平均計算が行われているので平均前の損失に戻して加算
            val_loss += loss_test.item() * test_batch_size
            n_val_acc += (predicted_test == labels_test).sum().item()

        # 精度計算
        train_acc = n_train_acc / n_train
        val_acc = n_val_acc / n_test
        # 損失計算
        avg_train_loss = train_loss / n_train
        avg_val_loss = val_loss / n_test
        # 結果表示
        print(
            f"Epoch [{(epoch+1)}/{num_epochs+base_epochs}], loss: {avg_train_loss:.5f} acc: {train_acc:.5f} val_loss: {avg_val_loss:.5f}, val_acc: {val_acc:.5f}"
        )

        # 記録
        if min_val_loss > avg_val_loss:
            torch.save(net.state_dict(), "AlexNet_model_max_weight.pth")

        item = np.array([epoch + 1, avg_train_loss, train_acc, avg_val_loss, val_acc])
        history = np.vstack((history, item))
    return history


history = np.zeros((0, 5))

history = fit(
    net,
    optimizer,
    criterion,
    const.NUM_EPOCHS,
    train_loader,
    test_loader,
    device,
    history,
)

In [None]:
# 学習ログ解析


def evaluate_history(history):
    # 損失と精度の確認
    print(f"初期状態: 損失: {history[0,3]:.5f} 精度: {history[0,4]:.5f}")
    print(f"最終状態: 損失: {history[-1,3]:.5f} 精度: {history[-1,4]:.5f}")

    num_epochs = len(history)
    unit = num_epochs / 10

    # 学習曲線の表示 (損失)
    plt.figure(figsize=(9, 8))
    plt.plot(history[:, 0], history[:, 1], "b", label="訓練")
    plt.plot(history[:, 0], history[:, 3], "k", label="検証")
    plt.xticks(np.arange(0, num_epochs + 1, unit))
    plt.xlabel("繰り返し回数")
    plt.ylabel("損失")
    plt.title("学習曲線(損失)")
    plt.legend()
    plt.show()

    # 学習曲線の表示 (精度)
    plt.figure(figsize=(9, 8))
    plt.plot(history[:, 0], history[:, 2], "b", label="訓練")
    plt.plot(history[:, 0], history[:, 4], "k", label="検証")
    plt.xticks(np.arange(0, num_epochs + 1, unit))
    plt.xlabel("繰り返し回数")
    plt.ylabel("精度")
    plt.title("学習曲線(精度)")
    plt.legend()
    plt.show()

In [None]:
# イメージとラベル表示
def show_images_labels(loader, classes, net, device):
    # データローダーから最初の1セットを取得する
    for images, labels in loader:
        break
    # 表示数は50個とバッチサイズのうち小さい方
    n_size = min(len(images), 50)

    if net is not None:
        # デバイスの割り当て
        inputs = images.to(device)
        labels = labels.to(device)

        # 予測計算
        outputs = net(inputs)
        predicted = torch.max(outputs, 1)[1]

    # 最初のn_size個の表示
    plt.figure(figsize=(20, 15))
    for i in range(n_size):
        ax = plt.subplot(5, 10, i + 1)
        label_name = classes[labels[i]]
        # netがNoneでない場合は、予測結果もタイトルに表示する
        if net is not None:
            predicted_name = classes[predicted[i]]
            # 正解かどうかで色分けをする
            if label_name == predicted_name:
                c = "k"
            else:
                c = "b"
            ax.set_title(label_name + ":" + predicted_name, c=c, fontsize=20)
        # netがNoneの場合は、正解ラベルのみ表示
        else:
            ax.set_title(label_name, fontsize=20)
        # TensorをNumPyに変換
        image_np = images[i].numpy().copy()
        # 軸の順番変更 (channel, row, column) -> (row, column, channel)
        img = np.transpose(image_np, (1, 2, 0))
        # 値の範囲を[-1, 1] -> [0, 1]に戻す
        img = (img + 1) / 2
        # 結果表示
        plt.imshow(img)
        ax.set_axis_off()
    plt.show()

In [None]:
# 評価

evaluate_history(history)

In [None]:
# 最初の50個の表示

show_images_labels(test_loader, const.CLASSES, None, device)
show_images_labels(test_loader, const.CLASSES, net, device)