<a href="https://colab.research.google.com/github/naoya1110/jetbot_road_following/blob/main/02_train_model_resnet18.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Road Following by Classification - AIモデルの学習

## はじめに
データ収集が終わったらAIモデルを学習させましょう。

### GPUの確認
モデルの学習に必要なGPUが使える状態か確認しましょう。"cuda"と表示されればGPUが使えます。

In [None]:
import torch

if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

print(device)

### パッケージのインポート
必要なPythonパッケージをインポートしましょう。

In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import cv2
from tqdm.notebook import tqdm
import shutil

## データの準備

### データのアップロード
左のファイルタブを開いてdataset.zipをアップロードしましょう。アップロードには少し時間がかかります。

### ZIPファイルの解凍
次のセルを実行してdataset.zipを解凍しましょう。

In [None]:
!unzip -q dataset.zip

### 壊れたデータの削除
データセットには画像として読み込めない壊れたデータが含まれていることがあります。このようなデータはモデルの学習に使用できないので削除しましょう。

In [None]:
subdirs = sorted(os.listdir("dataset"))

for subdir in subdirs:
    filenames = os.listdir(os.path.join("dataset", subdir))
    for filename in filenames:
        path = os.path.join("dataset", subdir, filename)
        img = cv2.imread(path)
        if img is None:
            try:
                os.remove(path)
                print("Removed", path)
            except IsADirectoryError:
                shutil.rmtree(path)
                print("Removed", path)

### データ量の確認
各クラスのデータがどれくらいあるか確認しましょう。

In [None]:
n_files = {}

for subdir in subdirs:
    n_file =  len(os.listdir(os.path.join("dataset", subdir)))
    n_files[subdir]=n_file

print(n_files)
plt.figure(figsize=(5,3))
plt.rcParams["font.size"]=14
plt.bar(n_files.keys(), n_files.values())

### クラスの重み (Class Weights)
データ量に偏りがあるとAIモデルは偏った推論をするように学習してしまうことがあります。これを補正するために各クラスのデータ量に応じた重みづけを学習時に利用します。

In [None]:
n_total = sum(n_files.values())
class_weights = (1/np.array(list(n_files.values())))*(n_total/3)
class_weights = torch.tensor(class_weights, dtype=torch.float32)
class_weights

### Datasetオブジェクトの作成

In [None]:
import torchvision
import torchvision.datasets as datasets
import torchvision.transforms.v2 as v2


dataset = datasets.ImageFolder(
    'dataset',
    v2.Compose([
        v2.ColorJitter(0.1, 0.1, 0.1, 0.1),
        v2.Resize((224, 224)),
        v2.ToTensor(),
    ])
)

### Class Index
クラスの指標（インデックス）を確認しましょう。

In [None]:
dataset.class_to_idx

クラス名の辞書を作成しましょう。

In [None]:
classnames = {}

for key, value in dataset.class_to_idx.items():
    classnames[value] = key

classnames

### Train Test Split
全データを学習用データとテスト用データに分離しましょう。ここでは全データの20%をテスト用データとして使用します。

In [None]:
test_size = int(0.2*len(dataset))
train_size = len(dataset)-test_size
print("train size:", train_size)
print("test size:", test_size)
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])

### DataLoaderオブジェクトの作成
学習用とテスト用のDataLoaderオブジェクトを作成しましょう。

In [None]:
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=20,
    shuffle=True,
    num_workers=2
)

test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=20,
    shuffle=False,
    num_workers=2
)

## AIモデルの準備

### ResNet18
今回はResNet18というAIモデルを使用しましょう。

In [None]:
import torchvision.models as models
import torch.nn as nn

def create_resnet18():
    model = models.resnet18(pretrained=True)
    model.fc = nn.Sequential(
        nn.Dropout(0.5),
        nn.Linear(512, 3)
    )
    return model

model = create_resnet18().to(device)

## モデルの学習

### 学習ループ

モデルの学習しましょう。ハイパーパラメータは適宜変更してください。

In [None]:
import torch.optim as optim

model = create_resnet18().to(device)
loss_func = nn.CrossEntropyLoss(weight=class_weights.to(device))                      # set loss function
optimizer = optim.Adam(model.parameters(), lr=1E-4)    # set optimizer
epochs = 10

best_model_path = 'best_model_resnet18.pth'
best_accuracy = 0.0

# create empty lists for saving metrics during training
train_loss_list = []
train_accuracy_list = []
test_loss_list = []
test_accuracy_list = []

for epoch in range(epochs):
    print("-----------------------------")
    print(f"Epoch {epoch+1}/{epochs}")

    # initialize metrics
    train_correct_count = 0
    train_accuracy = 0
    train_loss = 0
    test_correct_count = 0
    test_accuracy = 0
    test_loss = 0

    #--- Training Phase ---#
    model.train()    # set model to training mode

    pbar = tqdm(train_loader)
    pbar.set_description("Train")

    for x_batch, y_batch in pbar:      # take mini batch data from train_loader

        x_batch = x_batch.to(device)     # load x_batch data on GPU
        y_batch = y_batch.to(device)     # load y_batch data on GPU

        optimizer.zero_grad()                  # reset gradients to 0
        p_batch = model(x_batch)               # do prediction
        loss = loss_func(p_batch, y_batch)     # measure loss
        loss.backward()                        # calculate gradients
        optimizer.step()                       # update model parameters

        train_loss += loss.item()                                # accumulate loss value
        p_batch_label = torch.argmax(p_batch, dim=1)             # convert p_batch vector to p_batch_label
        train_correct_count += (p_batch_label == y_batch).sum()  # count up number of correct predictions

        pbar.set_postfix({"accuracy":f"{(p_batch_label == y_batch).sum()/len(x_batch):.4f}", "loss": f"{loss.item():.4f}"})
    #----------------------#

    #--- Evaluation Phase ---#
    with torch.no_grad():   # disable autograd for saving memory usage
        model.eval()        # set model to evaluation mode

        pbar = tqdm(test_loader)
        pbar.set_description("Test")

        for x_batch, y_batch in pbar:   # take mini batch data from test_loader

            x_batch = x_batch.to(device)     # load x_batch data on GPU
            y_batch = y_batch.to(device)     # load y_batch data on GPU

            p_batch = model(x_batch)              # do prediction
            loss = loss_func(p_batch, y_batch)    # measure loss

            test_loss += loss.item()                                # accumulate loss value
            p_batch_label = torch.argmax(p_batch, dim=1)            # convert p_batch vector to p_batch_label
            test_correct_count += (p_batch_label == y_batch).sum()  # count up number of correct predictions

            pbar.set_postfix({"accuracy":f"{(p_batch_label == y_batch).sum()/len(x_batch):.4f}", "loss": f"{loss.item():.4f}"})
    #------------------------#

    train_accuracy = train_correct_count.item()/len(train_dataset)   # determine accuracy for training data
    test_accuracy = test_correct_count.item()/len(test_dataset)      # determine accuracy for test data
    train_loss = train_loss/len(train_loader)                        # determine loss for training data
    test_loss = test_loss/len(test_loader)                           # determine loss for test data

    # show and store metrics
    print(f"Train: Accuracy={train_accuracy:.3f} Loss={train_loss:.3f}, Test: Accuracy={test_accuracy:.3f} Loss={test_loss:.3f}")
    train_accuracy_list.append(train_accuracy)
    train_loss_list.append(train_loss)
    test_accuracy_list.append(test_accuracy)
    test_loss_list.append(test_loss)

    # save the model if test accuracy is better than before
    if test_accuracy > best_accuracy:
        torch.save(model.state_dict(), best_model_path)
        print(f"Test accuracy improved from {best_accuracy:.3f} to {test_accuracy:.3f}")
        print(f"Model saved at {best_model_path}")
        best_accuracy = test_accuracy

### 学習曲線
正解率とロスがどのように変化したか確認しましょう。

In [None]:
plt.figure(figsize=(5,3))
plt.rcParams["font.size"]=14
real_epochs = np.arange(len(train_accuracy_list))+1

plt.plot(real_epochs, train_accuracy_list, c="#ff7f0e", label="train acc")
plt.plot(real_epochs, test_accuracy_list, lw=0, marker="o", c="#ff7f0e", label="test acc")
plt.plot(real_epochs, train_loss_list, c="#1f77b4", label="train loss")
plt.plot(real_epochs, test_loss_list, lw=0, marker="o", c="#1f77b4", label="test loss")

plt.xlabel("Epoch")
plt.ylabel("Accuracy & Loss")
plt.grid()
plt.legend()

## 学習済みモデルの評価

### 最良モデルの読み込み

In [None]:
model.load_state_dict(torch.load(best_model_path))   # load model parameters to the initialized model

### テストデータに対する正解率

In [None]:
test_accuracy = 0

y_test_all = np.array([])
p_label_all = np.array([])

with torch.no_grad():    # disable autograd
    model.eval()         # set model to evaluation mode

    for x_batch, y_batch in test_loader:    # take mini batch data from train_loader
        x_batch = x_batch.to(device)        # transfer x_batch to gpu
        y_batch = y_batch.to(device)        # transfer y_batch to gpu
        p_batch = model(x_batch)            # do prediction

        p_batch_label = torch.argmax(p_batch, dim=1)       # convert p_batch vector to p_batch_label
        test_accuracy += (p_batch_label == y_batch).sum()  # count up number of correct predictions

        y_test_all = np.append(y_test_all, y_batch.to("cpu").numpy())          # append y_batch in y_test_all
        p_label_all = np.append(p_label_all, p_batch_label.to("cpu").numpy())  # append p_batch_label in p_label_all

test_accuracy = test_accuracy/len(test_dataset)      # determine accuracy for test data
print(f"Test Accuracy = {test_accuracy:.3f}")

### 混同行列

In [None]:
from sklearn.metrics import confusion_matrix

cmx = confusion_matrix(y_test_all, p_label_all)

cmx_pct = np.zeros(cmx.shape)

for i in range(cmx.shape[0]):
    for j in range(cmx.shape[1]):
        cmx_pct[i, j] = cmx[i, j]/cmx[i, :].sum()

plt.figure(figsize=(6,4))
labels = classnames.values()

sns.heatmap(cmx_pct, annot=True, fmt=".2f", cmap="Blues", vmin=0, vmax=1,
            xticklabels=classnames.values(), yticklabels=classnames.values(), square=True)

plt.ylabel("True")
plt.xlabel("Pred")
plt.title("confusion matrix")

### テストデータに対する推論

In [None]:
plt.figure(figsize=(20, 14))

for i in range(50):
    image, _ = test_dataset[i]
    image = np.transpose(image, (1,2,0))

    plt.subplot(5, 10, i+1)
    plt.imshow(image)

    true_class = classnames[y_test_all[i]]
    pred_class = classnames[p_label_all[i]]
    if true_class == pred_class:
        color = "green"
    else:
        color = "red"
    plt.title(f"T={true_class}\nP={pred_class}", color=color)
    plt.axis("off")

## 学習済みモデルのダウンロード
モデルの学習が終わったら`best_model_resnet18.pth`をダウンロードし，JetBotに入れましょう。