<a target="_blank" href="https://colab.research.google.com/github/trainocate-japan/Machine-Learning-and-Deep-Learning-Hands-on/blob/main/exercise/6_ディープラーニング/6-4_(演習)PyTorchによるワインの品種分類.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>


# 6-3_PyTorchによるワインの品種分類
このノートブックでは、PyTorchで分類の予測モデルを作成します。<br>
予測を行うテーマはwineの成分からブドウの品種を予測することです。

## ライブラリのインポート

In [None]:
# データを処理するための基本的なライブラリ
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
import seaborn as sns

# scikit-learnから必要なライブラリをインポート
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
from sklearn import datasets # scikit-learnからデータセットをインポートするためのライブラリ

# PyTorchで使用
import torch
# PyTorchのインポート設定は色々なスタイルがありますが、事前にインポートするとKerasに比べるて非常に多くのインポートを書く必要がある為、
# 今回は使用するたびに全て記載するスタイルをとります。

## データの準備

In [None]:
# データの読込
wine_dataset = datasets.load_wine()
# データセットのキー項目を確認
print(wine_dataset.keys())

In [None]:
# データセットの説明はDESCRに格納されています。
print(wine_dataset['DESCR'])

In [None]:
# 目的変数となる変数名と、説明変数となる変数名を確認する
print(wine_dataset['target_names'])
print(wine_dataset['feature_names'])

In [None]:
# 説明変数となるdataと目的変数となるtargetを確認する
print(type(wine_dataset['data']))
print(wine_dataset['data'].shape)
print(type(wine_dataset['target']))
print(wine_dataset['target'].shape)
# 178行のデータがそれぞれ格納されている

In [None]:
# 説明変数と目的変数をpandasのDataFrameに変換して列名をつけて取り出す
wine_dataset_x = pd.DataFrame(wine_dataset['data'], columns=wine_dataset['feature_names'])
wine_dataset_y = pd.DataFrame(wine_dataset['target'], columns=['Class'])
display(wine_dataset_x.head(3))
display(wine_dataset_y.head(3))

In [None]:
# データの概要を把握します。
# 説明変数と目的変数のDataFrameを結合します。
wine = pd.concat([wine_dataset_x, wine_dataset_y], axis=1)

print('\n--要約--\n')
display(wine.info())
print('\n--統計情報--\n')
display(wine.describe())

In [None]:
# 目的変数を確認してみる
print(wine_dataset['target'])
# 0,1,2は先ほど表示したfeature_namesに対応している。(今回は品種名までは書かれていない)
# 0:class_0、1:class_1、2:class_2
# ★PyTorchの多値分類では正解データを0,1,2,・・・というラベルの表現のまま渡すので、one-hot表現は不要

In [None]:
# 訓練データと検証データに分割（80%を訓練用に使用）
train_x, test_x, train_y, test_y = train_test_split(wine_dataset_x, wine_dataset['target'], train_size=0.8, test_size=0.2, random_state=0, stratify=wine_dataset['target']) 
# さらに訓練データを検証データに分割（訓練データの20%を検証用に使用）
train_x, val_x, train_y, val_y = train_test_split(train_x, train_y, train_size=0.8, test_size=0.2, random_state=0) 

In [None]:
# 標準化を行う
# 標準化はscikit-learnに用意されている
scaler_x = StandardScaler()
scaler_x.fit(train_x)
train_x_scaled = scaler_x.transform(train_x)
val_x_scaled = scaler_x.transform(val_x)
test_x_scaled = scaler_x.transform(test_x)

print(type(train_x_scaled))
print(train_x_scaled[:2])
print(type(val_x_scaled))
print(val_x_scaled[:2])
print(type(test_x_scaled))
print(test_x_scaled[:2])
# scikit-learnのStandardScalerを使用するとnumpy配列になるので、説明変数はpandasのデータフレームからnumpyへの変換は不要
# 目的変数もデータセットから直接取得している場合にはnumpy配列なので変換は不要

#### データをtorchtensorに変換する

In [None]:
# numpy配列をテンソルに変換
# ★多値分類では回帰とはデータの準備が異なる部分があります。 
# 目的変数はone-hot表現にはせず、N行1列の形にもせず、1次配列の形で用意します。
train_x_scaled = torch.Tensor(train_x_scaled).float()
train_y = torch.Tensor(train_y).long()
val_x_scaled = torch.Tensor(val_x_scaled).float()
val_y = torch.Tensor(val_y).long()
test_x_scaled = torch.Tensor(test_x_scaled).float()
test_y = torch.Tensor(test_y).long()
## データをテンソルデータセットインスタンスにする
train_dataset = torch.utils.data.TensorDataset(train_x_scaled, train_y)
val_dataset = torch.utils.data.TensorDataset(val_x_scaled, val_y)
test_dataset = torch.utils.data.TensorDataset(test_x_scaled, test_y)

## モデルの定義

In [None]:
# 乱数シードの固定
torch.manual_seed(0)

# ニューラルネットワークを定義
class Net(torch.nn.Module):

  # 必要な層や活性化関数を定義する
  def __init__(self):
    super(Net, self).__init__()
    self.l1 = torch.nn.Linear(train_x.shape[1], 128)     # 中間層1
    self.a1 = torch.nn.ReLU()  # 活性化関数1
    self.l2 = torch.nn.Linear(128, 128)   # 中間層2
    self.a2 = torch.nn.ReLU()  # 活性化関数2
    self.l3 = torch.nn.Linear(128, 3)     # 出力層

  # 順伝搬を定義。引数のxは、説明変数。
  # 順番に関数を実行し、その結果を次の関数に渡していく
  def forward(self, x):
    x = self.l1(x)
    x = self.a1(x)
    x = self.l2(x)
    x = self.a2(x)
    x = self.l3(x)
    return x

## 学習

In [None]:
num_epochs = 700

# データローダーの用意
# PyTorchではデータローダーに格納されたデータセットをバッチサイズで区切って、学習のたびに取り出します。
# shuffleをTrueに設定することで、データをシャッフルして取り出します(★batch_sizeをデータ量に合わせて32に設定)
batch_size = 32
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# モデルをインスタンス化
net = Net()
optimizer = torch.optim.Adam(net.parameters(), lr=0.001) # 最適化手法の用意
criterion = torch.nn.CrossEntropyLoss() # ★誤差関数の用意(多値分類なので、交差エントロピー誤差を使う)

## 学習時に経過情報を保存する空リストを作成
train_loss_list = []      # 学習データの誤差関数用リスト
val_loss_list = []        # 検証データの誤差関数用リスト

# エポック分の繰り返し
for epoch in range(num_epochs):
    
    #学習の進行状況を表示
    print('--------')
    print("Epoch: {}/{}".format(epoch + 1, num_epochs))

    # 損失の初期化
    train_loss = 0        # 学習データの誤差関数
    val_loss = 0          # 検証データの誤差関数
    
    #=====学習パート=======
    # 学習モードに設定
    # PyTorchでは学習時と評価時でモードを切り替える
    net.train()

    #ミニバッチごとにデータをロードして学習
    for x, y in train_dataloader:
        preds = net(x)                            # 順伝搬で予測を実行
        loss = criterion(preds, y)                # 誤差関数を計算
        optimizer.zero_grad()                     # 勾配を初期化
        loss.backward()                           # 勾配を計算
        optimizer.step()                          # パラメータ更新
        train_loss += loss.data.numpy().tolist()  # ミニバッチごとの損失を格納   
    #ミニバッチの平均の損失を計算
    batch_train_loss = train_loss / len(train_dataloader)
    
    #=====評価パート(検証データ)=======
    # 評価モードに設定
    net.eval()
    # 評価時は勾配計算は不要なので、勾配計算を無効にして負荷を下げる
    with torch.no_grad():
        for x, y in val_dataloader:
            preds = net(x)                        # 順伝搬で予測を実行
            loss = criterion(preds, y)            # 誤差関数を計算
            val_loss += loss.item()               # ミニバッチごとの損失を格納    
    #ミニバッチの平均の損失を計算
    batch_val_loss = val_loss / len(val_dataloader)
    
    #エポックごとに損失を表示
    print("Train_Loss: {:.4f}".format(batch_train_loss))
    print("val_loss: {:.4f}".format(batch_val_loss))
    #損失をリスト化して保存
    train_loss_list.append(batch_train_loss)
    val_loss_list.append(batch_val_loss)

## 評価

In [None]:
# この後精度改善のために何度か確認するので、関数化しておきます
def myevaluete():

  # 誤差関数の可視化
  fig = plt.figure() # グラフの描画領域全体のオブジェクトを取得
  fig.set_figheight(8) # 縦の幅を指定
  fig.set_figwidth(12) # 横の幅を指定
  plt.plot(train_loss_list, color='b', label='train_Loss')
  plt.plot( val_loss_list, color='m', label='val_loss')
  plt.xlabel('epoch')
  plt.ylabel('loss')
  plt.legend()
  plt.show()

  net.eval() # モデルを評価モードにする

  # テストデータ用のデータローダを用意
  test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False)

  y_pred = None
  with torch.no_grad():
      # ミニバッチで取り出しながら、最初のバッチはy_predに設定し、
      # 2回目以降はそこへnumpy配列で接続していく
      for inputs, labels in test_dataloader:
          outputs = net(inputs)
          if y_pred is None:
              y_pred = outputs.data.numpy()
          else:
              y_pred = np.concatenate([y_pred, outputs.data.numpy()])

  # np.argmaxで最も確率が高い値を0,1,2に変換する
  accuracy = accuracy_score(test_y, np.argmax(y_pred, axis=1))
  print('テストデータに対する予測精度：{}\n'.format(accuracy))

myevaluete()

# ニューラルネットワークモデルを改良する

In [None]:
# Early-Stopping機能を実装したクラス
class EarlyStopping:

  def __init__(self, patience=0):
    self._step = 0              # lossが改善しなかった連続回数をカウントする。先頭の_は内部的であることを表わしています
    self._loss = float('inf')   # そこまでで最も改善が見られたlossの値を格納する
    self.patience = patience    # 引数で指定する、何回改善されなかったら早期終了するかの回数

  def __call__(self, loss):
    if self._loss < loss:
      self._step += 1   # lossが改善しなければ_stepを1増やす
      if self._step >= self.patience:    # patienceの回数改善しなかったら早期終了する
        print('early stopping')
        return True
    else:               # lossが改善した場合は_stepを0にして_lossを更新する
      self._step = 0
      self._loss = loss
    return False

In [None]:
# 乱数シードの固定
torch.manual_seed(0)

# ニューラルネットワークを定義
class Net(torch.nn.Module):

  # 必要な層や活性化関数を定義する
  def __init__(self):
    super(Net, self).__init__()
    self.l1 = torch.nn.Linear(train_x.shape[1], 128)     # 中間層1
    self.a1 = torch.nn.ReLU()             # 活性化関数1
    self.d1 = torch.nn.Dropout(0.2)       # ★ドロップアウト層1★
    self.l2 = torch.nn.Linear(128, 128)   # 中間層2
    self.a2 = torch.nn.ReLU()             # 活性化関数2
    self.d2 = torch.nn.Dropout(0.2)       # ★ドロップアウト層2★
    self.l3 = torch.nn.Linear(128, 3)     # 出力層

  # 順伝搬を定義。引数のxは、説明変数。
  # 順番に関数を実行し、その結果を次の関数に渡していく
  def forward(self, x):
    x = self.l1(x)
    x = self.a1(x)
    x = self.d1(x) # ★ドロップアウト層1★
    x = self.l2(x)
    x = self.a2(x)
    x = self.d2(x) # ★ドロップアウト層2★
    x = self.l3(x)
    return x

In [None]:
num_epochs = 700

# データローダーの用意
# PyTorchではデータローダーに格納されたデータセットをバッチサイズで区切って、学習のたびに取り出します。
# shuffleをTrueに設定することで、データをシャッフルして取り出します
batch_size = 32
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# ★早期終了のインスタンスを準備★
es = EarlyStopping(patience=10)

# モデルをインスタンス化
net = Net()
optimizer = torch.optim.SGD(net.parameters(), lr=0.01) # 最適化手法の用意
criterion = torch.nn.CrossEntropyLoss() # ★誤差関数の用意(多値分類なので、交差エントロピー誤差を使う)

## 学習時に経過情報を保存する空リストを作成
train_loss_list = []      # 学習データの誤差関数用リスト
val_loss_list = []        # 検証データの誤差関数用リスト

# エポック分の繰り返し
for epoch in range(num_epochs):
    
    #学習の進行状況を表示
    print('--------')
    print("Epoch: {}/{}".format(epoch + 1, num_epochs))

    # 損失の初期化
    train_loss = 0        # 学習データの誤差関数
    val_loss = 0          # 検証データの誤差関数
    
    #=====学習パート=======
    # 学習モードに設定
    # PyTorchでは学習時と評価時でモードを切り替える
    net.train()

    #ミニバッチごとにデータをロードして学習
    for x, y in train_dataloader:
        preds = net(x)                            # 順伝搬で予測を実行
        loss = criterion(preds, y)                # 誤差関数を計算
        optimizer.zero_grad()                     # 勾配を初期化
        loss.backward()                           # 勾配を計算
        optimizer.step()                          # パラメータ更新
        train_loss += loss.data.numpy().tolist()  # ミニバッチごとの損失を格納   
    #ミニバッチの平均の損失を計算
    batch_train_loss = train_loss / len(train_dataloader)
    
    #=====評価パート(検証データ)=======
    # 評価モードに設定
    net.eval()
    # 評価時は勾配計算は不要なので、勾配計算を無効にして負荷を下げる
    with torch.no_grad():
        for x, y in val_dataloader:
            preds = net(x)                        # 順伝搬で予測を実行
            loss = criterion(preds, y)            # 誤差関数を計算
            val_loss += loss.item()               # ミニバッチごとの損失を格納    
    #ミニバッチの平均の損失を計算
    batch_val_loss = val_loss / len(val_dataloader)
    
    #エポックごとに損失を表示
    print("Train_Loss: {:.4f}".format(batch_train_loss))
    print("val_loss: {:.4f}".format(batch_val_loss))
    #損失をリスト化して保存
    train_loss_list.append(batch_train_loss)
    val_loss_list.append(batch_val_loss)

    # ★早期終了判定★
    if es(batch_val_loss):
      break

In [None]:
myevaluete()

※ ニューラルネットワークの構成によっては、結果が97.2%もしくは100%になるかもしれません。これはテストデータを切り出す際に全体の20%で指定をすると36件のデータがテストデータとなり、そのうちの35件を正しく予測できると97.2%になるため、35件あてられたか、36件あてられたかの違いです。<br>
分類問題の精度は正解率なので、0か1のやや極端な値です。ギリギリ正解だったのか、余裕で正解だったのかは考慮されていません。そういった関係から直感的にはわかりにくいものの、誤差関数のほうが評価指標として優先される場合もあります。