# 第5章: PyTorch を用いたトランスクリプトームデータの分類

- 澤田 高志
- 清水 秀幸

##### 入力5-1

In [None]:
!pip install optuna

##### 入力5-2

In [None]:
# パッケージのインポート
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import optuna
import pandas as pd
import seaborn as sns
import sklearn
import torch

##### 入力5-3

In [None]:
# バージョンの確認
print('numpy: ', np.__version__)
print('pandas: ', pd.__version__)
print('matplotlib: ', matplotlib.__version__)
print('sklearn: ', sklearn.__version__)
print('torch: ', torch.__version__)
print('seaborn: ', sns.__version__)
print('optuna: ', optuna.__version__)

##### 入力5-4

In [None]:
# GPUが使用可能かどうか確認する
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(device)

##### 入力5-5

In [None]:
# Numpyのアレイ
import numpy as np

data = [[1, 2], [3, 4]]
np_data = np.array(data)
print(np_data)

##### 入力5-6

In [None]:
# PyTorchのテンソル
import torch

tensor_data = torch.tensor(data)
print(tensor_data)

##### 入力5-7

In [None]:
tensor_from_np = torch.from_numpy(np_data)
tensor_from_np

##### 入力5-8

In [None]:
# 前のデータ型のように (like) ランダムなテンソルを作る
tensor_random = torch.rand_like(tensor_data, dtype=torch.float)
print(tensor_random)

##### 入力5-9

In [None]:
# 4*4の行列の成分がすべて1であるテンソル
tensor = torch.ones(4, 4)

# GPU上へtensorを動かす
tensor.to(device)
print(tensor)

##### 入力5-10

In [None]:
# Numpyと似たような指定ができる
# 値の代入
tensor[:, 1] = 0
tensor[1, :] = 3
print(tensor)

##### 入力5-11

In [None]:
# 行や列の抽出
print('0行目: ', tensor[0])
print('1列目: ', tensor[:, 1])
print('最終列: ', tensor[:, -1])

##### 入力5-12

In [None]:
# dim=1で横に並べる
t1 = torch.cat([tensor, tensor, tensor], dim=1)
print(t1)

##### 入力5-13

In [None]:
# テンソルの計算: ここでは行列式
tensor @ tensor

##### 入力5-14

In [None]:
# アダマール積
tensor * tensor

##### 入力5-15

In [None]:
sum_item = tensor.sum().item()
print(sum_item, type(sum_item))

##### 入力5-16

In [None]:
print(tensor, '\n')
# 'add'ではなく'add_'
tensor.add_(5)
print(tensor)

##### 入力5-17

In [None]:
t = torch.ones(5)
print('tensor: ', t)
n = t.numpy()
print('numpy: ', n)

##### 入力5-18

In [None]:
t.add_(1)
print('tensor: ', t)
print('numpy: ', n)

##### 入力5-19

In [None]:
n = np.ones(5)
t = torch.from_numpy(n)

##### 入力5-20

In [None]:
np.add(n, 3, out=n)
print('tensor: ', t)
print('numpy: ', n)

##### 入力5-21

In [None]:
predicted_probability_p = [0.8, 0.2, 0.6]
true_label_t = [1.0, 0, 0] # 1.0でなく1にするとテンソルにする際にエラーが起きる

# Numpyで計算をしてみよう
import numpy as np

BCE_0 = -true_label_t[0] * np.log(predicted_probability_p[0])
BCE_1 = -(1 - true_label_t[1]) * np.log(1 - predicted_probability_p[1])
BCE_2 = -(1 - true_label_t[2]) * np.log(1 - predicted_probability_p[2])
print(BCE_0, BCE_1, BCE_2)

##### 入力5-22

In [None]:
np.mean([BCE_0, BCE_1, BCE_2])

##### 入力5-23

In [None]:
# torchでは，BCELoss
import torch
import torch.nn as nn

loss = nn.BCELoss()
# Numpyのアレイをtorch.tensorでPyTorchで扱える形に変換する
p_tensor = torch.tensor(predicted_probability_p)
t_tensor = torch.tensor(true_label_t) # 1.0でなく1にするとエラーが起きる
loss(p_tensor, t_tensor)

##### 入力5-24

In [None]:
# シグモイド関数を作図
# シグモイド関数
def sigmoid(x):
      return 1 / (1 + np.exp(-x))
# シグモイド関数の導関数
def sigmoid_prime(x):
      return (np.exp(x)) / ((1 + np.exp(x)) ** 2)

# tanh関数を作図
# tanh関数
def tanh(x):
      return (np.exp(x) - np.exp(-x)) / (np.exp(x) + np.exp(-x))
# tanh関数の導関数
def tanh_prime(x):
      return 4 / (np.exp(x) + np.exp(-x)) ** 2

# 定義域を決める
x = np.linspace(-6, 6, 256)
# サイズを定める
fig = plt.figure(figsize=(12, 6))

# 1*2に分けて1番目に描画
ax1 = fig.add_subplot(121)
ax1.set_title('Sigmoid and Sigmoid_prime', size=16)
ax1.set_xlabel('x', size=16)
ax1.set_ylabel('y', size=16)
ax1.grid(axis='both')
ax1.set_ylim(-1.1,1.1)

# sigmoidのグラフを描く
ax1.plot(x, sigmoid(x), color='#1f77b4', label='sigmoid')
ax1.plot(x, sigmoid_prime(x), color='#ff7f0e', label='sigmoid_prime')
ax1.legend()

# 1*2に分けて2番目に描画
ax2 = fig.add_subplot(122)
ax2.set_title('tanh and tanh_prime', size=16)
ax2.set_xlabel('x', size=16)
#ax2.set_ylabel('y', size=16)
ax2.grid(axis='both')
ax2.set_ylim(-1.1,1.1)

# tanhのグラフを描く
ax2.plot(x, tanh(x), color='#1f77b4', label='tanh')
ax2.plot(x, tanh_prime(x), color='#ff7f0e', label='tanh_prime')
ax2.legend()

##### 入力5-25

In [None]:
# ReLU関数を作図
# ReLU関数
def ReLU(x):
      return np.maximum(0, x)

# ReLU関数の導関数
def ReLU_prime(x):
      return np.where(x > 0, 1, 0)

# 定義域を決める
x = np.linspace(-6, 6, 256)
# サイズを定める
fig = plt.figure(figsize=(6, 6))

# 1*1に分けて1番目に描画
ax = fig.add_subplot(111)
ax.set_title('ReLU and ReLU_prime', size=16)
ax.set_xlabel('x', size=16)
ax.set_ylabel('y', size=16)
ax.grid(axis='both')

# ReLUのグラフを描く
ax.plot(x, ReLU(x), color='#1f77b4', label='ReLU')
ax.plot(x, ReLU_prime(x), color='#ff7f0e', label='ReLU_prime')
ax.legend()

##### 入力5-26

In [None]:
import matplotlib.pyplot as plt
import numpy as np

from mpl_toolkits.mplot3d import axes3d

x = np.linspace(-6, 6, 256)
y = np.linspace(-6, 6, 256)
# メッシュ作成
X, Y = np.meshgrid(x, y)
# 計算式
Loss = X**2 - Y**2

fig = plt.figure(figsize=(12, 6))

# x軸が見えやすい方向から見た鞍点
# add_subplot(121)とは，1*2に図を並べる際の1番目を指定している
ax1 = fig.add_subplot(121, projection='3d')
ax1.set_title('Saddle_point', size=16)
ax1.set_xlabel('$w_{1}$', size=16)
ax1.set_ylabel('$w_{2}$', size=16)
ax1.set_zlabel('Loss', size=16)
# 鞍点の例: 双曲放物面を描く
ax1.plot_wireframe(X, Y, Loss, color='#1f77b4')
# 角度を指定
ax1.view_init(elev=20, azim=65)
# 鞍点はx=y=Loss=0の点である.サイズを大きく設定したが見えにくい
ax1.scatter(0, 0, 0, s=300, color='#ff7f0e')

# y軸が見えやすい方向から見た鞍点
ax2 = fig.add_subplot(122, projection='3d')
ax2.set_title('Saddle_point', size=16)
ax2.set_xlabel('$w_{1}$', size=16)
ax2.set_ylabel('$w_{2}$', size=16)
ax2.set_zlabel('Loss', size=16)
# 鞍点の例: 双曲放物面を描く
ax2.plot_wireframe(X, Y, Loss, color='#1f77b4')
# 別の角度の指定
ax2.view_init(elev=20, azim=25)
# 鞍点はx=y=Loss=0の点である. サイズを大きく設定したが見えにくい
ax2.scatter(0, 0, 0, s=300, color='#ff7f0e')

plt.show()

##### 入力5-27

In [None]:
# アップロードしたcsvファイルを読み込む
gse_mRNA_exprs_normal_selected = pd.read_csv(
       '/content/GSE36376_normal.csv', index_col=0
)
gse_mRNA_exprs_tumor_selected = pd.read_csv(
       '/content/GSE36376_tumor.csv', index_col=0
)

# class labelを作り，正常を0，がんを1とする
gse_mRNA_exprs_normal_selected.loc['class'] = 0
gse_mRNA_exprs_tumor_selected.loc['class'] = 1

##### 入力5-28

In [None]:
# 分類器を作るために，データを分ける.
from sklearn.model_selection import train_test_split

# 転置したものをtrain_test_splitで分ける
# まずはtrainvalデータセットとtestデータに分ける
# testは全体の25%，trainvalが75%
gse_mRNA_exprs_normal_selected_trainval, gse_mRNA_exprs_normal_selected_test = train_test_split(
    gse_mRNA_exprs_normal_selected.T, train_size=0.75, random_state=0)
gse_mRNA_exprs_tumor_selected_trainval, gse_mRNA_exprs_tumor_selected_test = train_test_split(
    gse_mRNA_exprs_tumor_selected.T, train_size=0.75, random_state=0)

# 次にtrainvalデータセットをtrainとvalに分ける
# trainが全体の50%，valが全体の25%
gse_mRNA_exprs_normal_selected_train, gse_mRNA_exprs_normal_selected_val = train_test_split(
    gse_mRNA_exprs_normal_selected_trainval, train_size=0.667, random_state=0)
gse_mRNA_exprs_tumor_selected_train, gse_mRNA_exprs_tumor_selected_val = train_test_split(
    gse_mRNA_exprs_tumor_selected_trainval, train_size=0.667, random_state=0)

# trainデータ，validationデータ，testデータを作り出す
gse_mRNA_exprs_train = pd.concat(
    [gse_mRNA_exprs_normal_selected_train, gse_mRNA_exprs_tumor_selected_train]
)
X_train = gse_mRNA_exprs_train.iloc[:, 0:-1]
y_train = gse_mRNA_exprs_train.iloc[:, -1]

gse_mRNA_exprs_val = pd.concat(
    [gse_mRNA_exprs_normal_selected_val, gse_mRNA_exprs_tumor_selected_val]
)

X_val = gse_mRNA_exprs_val.iloc[:, 0:-1]
y_val = gse_mRNA_exprs_val.iloc[:, -1]

gse_mRNA_exprs_test = pd.concat(
    [gse_mRNA_exprs_normal_selected_test, gse_mRNA_exprs_tumor_selected_test]
)
X_test = gse_mRNA_exprs_test.iloc[:, 0:-1]
y_test = gse_mRNA_exprs_test.iloc[:, -1]

# 標準化する
from sklearn.preprocessing import StandardScaler

sc = StandardScaler()
# StandardScalerのfitメソッドを取ってくる→トレーニングデータからデータ全体の平均値と標準偏差を推定
sc.fit(X_train)

# トレーニングデータの平均と標準偏差の計算
X_train_std = sc.transform(X_train)
X_val_std = sc.transform(X_val)
X_test_std = sc.transform(X_test)

##### 入力5-29

In [None]:
# コードに再現性を持たせるために乱数を固定
torch.manual_seed(42)
torch.cuda.manual_seed(42)
torch.backends.cudnn.deterministic = True
torch.use_deterministic_algorithms = True

##### 入力5-30

In [None]:
from torch.utils.data import DataLoader, Dataset

# 持っているデータをPyTorchが扱えるデータ型に「翻訳」する
class TrainValData(Dataset):
    # データセットの初期設定 (init)
    def __init__(self, X_data, y_data):
        self.X_data = X_data
        self.y_data = y_data
    # 指定されたindexに対応するサンプルを取ってくる (getitem)
    def __getitem__(self, index):
        return self.X_data[index], self.y_data[index]
    # データセットのサンプル数 (len)
    def __len__(self):
        return len(self.X_data)

# 誤ってテストデータを訓練に使えないように，
# yのラベル情報はないものとしている
class TestData(Dataset):
    def __init__(self, X_data):
        self.X_data = X_data
    def __getitem__(self, index):
        return self.X_data[index]
    def __len__(self):
        return len(self.X_data)

# 訓練データ, 検証データ, テストデータをPyTorchで分割する
train_data = TrainValData(torch.FloatTensor(X_train_std), torch.FloatTensor(y_train))
val_data = TrainValData(torch.FloatTensor(X_val_std), torch.FloatTensor(y_val))
test_data = TestData(torch.FloatTensor(X_test_std))

##### 入力5-31

In [None]:
# データローダーを初期化
BATCH_SIZE = 64
# 訓練データセットについて，訓練データはサンプル数は216なので，そのままだと1エポック4イテレーション
# しかし，最後のイテレーションは24個分のデータしかない. これではノイズの影響が大きいため，
# drop_last=Trueにしておく
train_loader = DataLoader(
      dataset=train_data, batch_size=BATCH_SIZE, shuffle=True, drop_last=True
)

val_loader = DataLoader(dataset=val_data, batch_size=BATCH_SIZE)
test_loader = DataLoader(dataset=test_data, batch_size=BATCH_SIZE)

##### 入力5-32


In [None]:
# ニューラルネットワークアーキテクチャの定義
import torch.nn as nn

num_layers = 3
num_features = [32, 128, 64]

# モデルの定義のためにクラスを作る
class BinaryClassificationNet(nn.Module):
    # __init__()関数で使用するレイヤーを定義
    def __init__(self):

        # 基底クラスとしてnn.moduleを継承する
        # ニューラルネットワークの基本機能が使えるようになるおまじないと考えてもよい
        super(BinaryClassificationNet, self).__init__()

        # 一方通行のモデルを作る際はnn.Sequentialを使うと
        # コードがスッキリまとめられる
        # 第1層: 入力特徴量は13，出力はnum_features[0]=32
        self.layers1 = nn.Sequential(
            # nn.Linearは重みとバイアスだけを持つ結合層
            nn.Linear(in_features=13, out_features=num_features[0]),
            # バッチ正規化
            nn.BatchNorm1d(num_features[0]),
            # 活性化関数はReLU
            nn.ReLU(),
        )
        # 第2層: 入力特徴量はnum_features[0]=32，出力はnum_features[1]=128
        self.layers2 = nn.Sequential(
            nn.Linear(in_features=num_features[0], out_features=num_features[1]),
            nn.BatchNorm1d(num_features[1]),
            nn.ReLU(),
        )
        # 第3層: 入力特徴量はnum_features[1]=128，出力はnum_features[2]=64
        self.layers3 = nn.Sequential(
            nn.Linear(in_features=num_features[1], out_features=num_features[2]),
            nn.BatchNorm1d(num_features[2]),
            nn.ReLU(),
        )

        # 最後に先の出力のnum_features[i]を1にする.これは0である確率1つだけということを言う
        # 後述するが損失関数でnn.BCEWithLogitsLoss()を使う場合，活性化関数のシグモイドは使わない
        self.layers_out = nn.Linear(in_features=num_features[2], out_features=1)
    
    # 関数forward()では定義した層を積み重ねる
    # 図で言えば左から右の，順伝播処理を書いている
    def forward(self, x):
        x = self.layers1(x)
        x = self.layers2(x)
        x = self.layers3(x)
        x = self.layers_out(x)
        return x

##### 入力5-33

In [None]:
import torch.optim as optim

# GPUをモデルで使えるようにする
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
model = BinaryClassificationNet()
model.to(device)

print(model)

# 損失関数にはバイナリー交差エントロピーを示すnn.BCEWithLogitsLoss()を使う
# この損失関数では，出力特徴量を1の全結合層(nn.Linear(out_features=1))を受け取る
loss_fn = nn.BCEWithLogitsLoss()
# 重み更新にはAdamを用いる. 汎用性が高く，最も用いられているオプティマイザの1つである
LEARNING_RATE = 0.001
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# 学習過程でうまく学習できているか監視するために，損失関数だけでなく
# accuracyも見ることにする.そのためにもaccuracyを確認する関数を定義する
def binary_acc(y_true, y_pred):
    # torch.eq()で2つのテンソルで値が一致する数をカウントする
    correct = torch.eq(y_true, y_pred).sum().item()
    acc = (correct / len(y_pred)) * 100
    return acc

##### 入力5-34

In [None]:
# エポック数は400にする
EPOCHS = 400
model.train()
# トレーニングと検証を同時に行うforループを作る
# まだテストデータセットは使わない

# 損失関数の出力とaccuracyはリストにまとめて後で図示する
train_loss_li = []
train_acc_li = []
val_loss_li = []
val_acc_li = []

# エポック: すべての入力データを何回使えたかを示す
for epoch in range(EPOCHS):
    # イテレーション: 重みを何度更新したかを示す.訓練データセットは216種，
    # バッチサイズ64で，'drop_last=True'より，1エポックあたり3イテレーションである

    # 訓練では，まず，ミニバッチごとに分けられたデータをfor文で1つずつ扱う
    for X_batch_train, y_batch_train in train_loader:
        # 5.1.1項で行ったようにミニバッチのデータ(tensor)をGPU上に動かしている
        X_batch_train, y_batch_train = X_batch_train.to(device), y_batch_train.to(
            device
        )

        # 1. Forward関数を使って，図で言えば左から右の順伝播処理を作る
        # squeezeでいらない次元を減らす
        y_logits = model(X_batch_train).squeeze()
        # model()はBinaryClassificationNetのモデルだが，その出力は全結合層のものである
        # ゆえに，y_logitsはそのままでは確率ではない
        # y_logitsをシグモイド関数で予測確率に変換し，その数値を丸めて予測ラベルにする
        y_pred = torch.round(torch.sigmoid(y_logits))

        # 2. 損失関数を使って，どの程度モデルが誤っているかを確認する. ここではaccも使う
        loss = loss_fn(y_logits, y_batch_train)
        acc = binary_acc(y_true=y_batch_train, y_pred=y_pred)

        # 3. オプティマイザの勾配を0に初期化し，過去の勾配の影響を排除して現在の勾配を計算できるようにする
        optimizer.zero_grad()

        # 4. 誤差逆伝播法で損失関数の勾配を計算する
        loss.backward()

        # 5. オプティマイザで重みなどのパラメータ更新を行う
        optimizer.step()
    
    # 訓練データセットでミニバッチ学習が終わったところで，検証データセットの結果も確認する
    model.eval()

    # 検証では勾配が変動してはならないため，torch.no_grad()でテンソルの勾配計算ができないようにする
    with torch.no_grad():
        for X_batch_val, y_batch_val in val_loader:
            X_batch_val, y_batch_val = X_batch_val.to(device), y_batch_val.to(device)
            # 1. Forward関数を使って，図で言えば左から右の順伝播処理を作る
            val_logits = model(X_batch_val).squeeze()
            val_pred = torch.round(torch.sigmoid(val_logits))
            # 2. 損失関数を使って，どの程度モデルが誤っているかを確認する. ここではaccも使う
            val_loss = loss_fn(val_logits, y_batch_val)
            val_acc = binary_acc(y_true=y_batch_val, y_pred=val_pred)
            # 3以降は勾配の計算に関わってくるため，検証やテストでは使わない
        
        # 20エポックごとの結果をまとめる
        if epoch % 20 == 0:
            print(f'Epoch: {epoch} | Train_loss: {loss:.5f}, Train_acc: {acc:.2f}%')
            print(f'Epoch: {epoch} | Val_loss: {val_loss:.5f}, Val_acc: {val_acc:.2f}%')
        # 最後に，train_loss，train_acc，val_loss，val_accをリストにまとめる
        train_loss_li.append(loss.item())
        train_acc_li.append(acc)
        val_loss_li.append(val_loss.item())
        val_acc_li.append(val_acc)

##### 入力5-35

In [None]:
# 全400エポックにおけるtrain_loss，val_loss，train_acc，val_accを確認する

fig = plt.figure(figsize=(10, 6))

# add_subplot(121)とは，1*2に図を並べる際の1番目を指定している
ax1 = fig.add_subplot(121)
# ax1.set_xlabel('epochs', size = 16)
ax1.set_ylabel('loss', size=16)
ax1.plot(train_loss_li, label='train_loss')
ax1.plot(val_loss_li, color='#ff7f0e', label='val_loss')
ax1.axvline(x=100, color='#2ca02c')
ax1.legend()

ax2 = fig.add_subplot(122)
ax2.set_ylabel('acc', size=16)
# ax2.set_xlabel('epochs', size = 16)
ax2.plot(train_acc_li, label='train_acc')
ax2.plot(val_acc_li, color='#ff7f0e', label='val_acc')
ax2.axvline(x=100, color='#2ca02c')
ax2.legend()

plt.show()

##### 入力5-36

In [None]:
# 先人が過去に実装したearly_stoppingのための関数
# 日本語部分は著者による説明である
# 参考: https://github.com/Bjarten/early-stopping-pytorch/blob/master/pytorchtools.py
import numpy as np
import torch


class EarlyStopping:
    '''Early stopはval_lossが規定回数を経ても改善しなかった場合にそこで終了させる関数である'''

    def __init__(
            self, patience=7, verbose=False, delta=0, path='checkpoint.pt', trace_func=print
    ):

        '''
        Args:
            patience (int): val_lossが最後に改善されてからpatienceで指定された回まで改善なければ終了
                            Default: 7
            verbose (bool): Trueなら, val_lossの改善状況を示す
                            Default: False
            delta (float):  改善と認定するための最小変化量. delta以下の改善は改善と認めない
                            Default: 0
            path (str): PyTorchモデルを保存するパス
                            Default: 'checkpoint.pt'
            trace_func (function): val_lossが更新されないとき，それが何回目を示す
                            Default: print
        '''
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.Inf
        self.delta = delta
        self.path = path
        self.trace_func = trace_func

    def __call__(self, val_loss, model):
        
        # -val_lossをscoreにすることで，val_lossを最小化する目標から，scoreを最大化する目標へ
        score = -val_loss

        # best_scoreがなければscoreを当面のbest_scoreにする
        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
        # scoreがbest_score+deltaに満たなければ棄却
        elif score < self.best_score + self.delta:
            # 棄却カウント1追加
            self.counter += 1
            self.trace_func(
                f'EarlyStopping counter: {self.counter} out of {self.patience}'
            )
            # 棄却カウントの合計がpatienceを超えたとき
            if self.counter >= self.patience:
                # Early stopで中止
                self.early_stop = True
        # scoreがそれまでのbest_scoreより大きいとき
        else:
            # scoreを更新
            self.best_score = score
            # modelとval_lossを保存
            self.save_checkpoint(val_loss, model)
            # 棄却カウントを0に戻す
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        '''val_lossが減少した際のmodelを保存する関数'''
        if self.verbose:
            self.trace_func(
                f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model ...'
            )
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss

##### 入力 5-37

In [None]:
# エポック数は400にする
EPOCHS = 400

# early_stoppingを定義
early_stopping = EarlyStopping(patience=10, verbose=True)

# モデルとオプティマイザを最初の条件に戻す
model = BinaryClassificationNet()
model.to(device)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

model.train()
# トレーニングと検証を同時に行うforループを作る
# この節ではまだテストデータセットは使わない

# 損失関数の出力とaccuracyはリストにまとめて後で図示する
train_loss_li = []
train_acc_li = []
val_loss_li = []
val_acc_li = []

for epoch in range(EPOCHS):
    
    for X_batch_train, y_batch_train in train_loader:
        X_batch_train, y_batch_train = X_batch_train.to(device), y_batch_train.to(
            device
        )
        
        y_logits = model(X_batch_train).squeeze()
        y_pred = torch.round(torch.sigmoid(y_logits))

        # 2. 損失関数を使って，どの程度モデルが誤っているかを確認する. ここではaccも使う
        loss = loss_fn(y_logits, y_batch_train)
        acc = binary_acc(y_true=y_batch_train, y_pred=y_pred)

        # 3. オプティマイザの勾配を0に初期化し，過去の勾配の影響を排除して現在の勾配を計算できるようにする
        optimizer.zero_grad()

        # 4. 誤差逆伝播法で損失関数の勾配を計算する
        loss.backward()

        # 5. オプティマイザで重みなどのパラメータ更新を行う
        optimizer.step()

    # 訓練データセットでミニバッチ学習が終わったところで，検証データセットの結果も確認する
    model.eval()

    # 検証では勾配が変動してはならないため，torch.no_grad()でテンソルの勾配計算ができないようにする
    with torch.no_grad():
        for X_batch_val, y_batch_val in val_loader:
            X_batch_val, y_batch_val = X_batch_val.to(device), y_batch_val.to(device)
            
            # 1. Forward関数を使って，図で言えば左から右の順伝播処理を作る
            val_logits = model(X_batch_val).squeeze()
            val_pred = torch.round(torch.sigmoid(val_logits))
            # 2. 損失関数を使って，どの程度モデルが誤っているかを確認する.ここではaccも使う
            val_loss = loss_fn(val_logits, y_batch_val)
            val_acc = binary_acc(y_true=y_batch_val, y_pred=val_pred)
            # 3以降は勾配の計算に関わってくるため，検証やテストでは使わない
    
    # 10エポックごとの結果をまとめる
    if epoch % 10 == 0:
        print(f'Epoch: {epoch} | Train_loss: {loss:.5f}, Train_acc: {acc:.2f}%')
        print(f'Epoch: {epoch} | Val_loss: {val_loss:.5f}, Val_acc: {val_acc:.2f}%')
    # 最後に，train_loss，train_acc，val_loss，val_accをリストにまとめる
    train_loss_li.append(loss.item())
    train_acc_li.append(acc)
    val_loss_li.append(val_loss.item())
    val_acc_li.append(val_acc)

    # early_stoppingを適用
    early_stopping(val_loss, model)
    if early_stopping.early_stop:
        # 一定epochだけval_lossが最低値を更新しなかった場合，ここに入り学習を終了
        break


##### 入力5-38

In [None]:
# 全400エポックで設定したところで途中でearly stopした結果

# 結局何エポックだったかを確認
print(len(val_loss_li))

fig = plt.figure(figsize=(10, 6))

# add_subplot(121)とは，1*2に図を並べる際の1番目を指定している
ax1 = fig.add_subplot(121)
ax1.set_xlabel('epochs', size=16)
ax1.set_ylabel('loss', size=16)
ax1.plot(train_loss_li, label='train_loss')
ax1.plot(val_loss_li, color='#ff7f0e', label='val_loss')
ax1.legend()

ax2 = fig.add_subplot(122)
ax2.set_ylabel('acc', size=16)
ax2.set_xlabel('epochs', size=16)
ax2.plot(train_acc_li, label='train_acc')
ax2.plot(val_acc_li, color='#ff7f0e', label='val_acc')
ax2.legend()

plt.show()

##### 入力5-39

In [None]:
# itertoolsでリストを平坦化する
import itertools

# early_stoppingした時のモデルは保存されている. これをロードする
model_path = 'checkpoint.pt'
model.load_state_dict(torch.load(model_path))

# 予測確率をリストにまとめる
test_prob_list = []
# 予測ラベルをリストにまとめる
test_pred_list = []

# テストデータセットの結果を確認する
model.eval()
# テストでは勾配が変動してはならないため，torch.no_grad()でテンソルの勾配計算ができないようにする.
with torch.no_grad():
    for X_batch_test in test_loader:
        X_batch_test = X_batch_test.to(device)
        test_logits = model(X_batch_test).squeeze()
        # 確率をリストに
        test_prob = torch.sigmoid(test_logits)
        test_prob_list.append(test_prob.cpu().numpy())
        # ラベルをリストに
        test_pred = torch.round(test_prob)
        test_pred_list.append(test_pred.cpu().numpy())

test_prob_list = list(itertools.chain.from_iterable(test_prob_list))
test_pred_list = list(itertools.chain.from_iterable(test_pred_list))

##### 入力5-40

In [None]:
from sklearn.metrics import auc, roc_curve

# ROC-AUC
fpr_test, tpr_test, thresholds_test = roc_curve(y_test, test_prob_list)
auc_test = auc(fpr_test, tpr_test)

# ROC曲線を描く
plt.plot([0, 1], [0, 1], color='black', linestyle='--')

plt.plot(
      fpr_test, tpr_test, label='EarlyStopping+NeuralNetwork (AUC = %.3f)' % auc_test
)
plt.fill_between(fpr_test, tpr_test, 0, alpha=0.1)

plt.legend()
plt.title('EarlyStopping+NeuralNetwork')
plt.xlabel('False_Positive_Rate')
plt.ylabel('True_Positive_Rate')
plt.grid(True)
plt.show()

##### 入力5-41

In [None]:
# classification_report
from sklearn.metrics import classification_report, confusion_matrix

print(confusion_matrix(y_test, test_pred_list))
print(classification_report(y_test, test_pred_list))

##### 入力5-42

In [None]:
import torch.nn as nn

class BinaryClassificationNet(nn.Module):
    # __init__()関数で使用するレイヤーを定義
    def __init__(self, trial, num_layers, num_features):
        super(BinaryClassificationNet, self).__init__()

        # for loopの作りやすさから，nn.Sequentialではなくnn.ModuleListに
        # appendしていく形をとる
        # 第1層
        # 入力特徴量は13，出力はnum_features[0]にしておいた
        self.layers = nn.ModuleList(
            [nn.Linear(in_features=13, out_features=num_features[0])]
        )
        self.layers.append(nn.BatchNorm1d(num_features[0]))
        self.layers.append(nn.ReLU())

        # 第2層以降
        # 先の出力はnum_features[0]だった.for文で層を重ねていく
        for i in range(1, num_layers):
            self.layers.append(
                nn.Linear(in_features=num_features[i - 1], out_features=num_features[i])
            )
            self.layers.append(nn.BatchNorm1d(num_features[i]))
            self.layers.append(nn.ReLU())
        # 最後に先の出力のnum_features[i]を1にする. これは0である確率1つだけということを言う
        self.layers_out = nn.Linear(in_features=num_features[i], out_features=1)

    # 関数forward()では定義済みレイヤーを呼び出す
    def forward(self, x):
        for i, l in enumerate(self.layers):
            # l(x)でモデルを使う
            x = l(x)
        # sigmoidは損失関数に含まれているので設定しなくてok
        x = self.layers_out(x)
        return x

##### 入力5-43

In [None]:
# どのoptimizerを使うかというところからも最適化する
import torch.optim as optim

def get_optimizer(trial, model):
    optimizer_names = ['Adam', 'RMSprop']
    optimizer_name = trial.suggest_categorical('optimizer', optimizer_names)
    weight_decay = trial.suggest_float('weight_decay', 1e-8, 1e-2, log=True)
    if optimizer_name == optimizer_names[0]:
        Adam_lr = trial.suggest_float('Adam_lr', 1e-5, 1e-1, log=True)
        optimizer = optim.Adam(
            model.parameters(), lr=Adam_lr, weight_decay=weight_decay
        )
    else:
        RMSprop_lr = trial.suggest_float('RMSprop_lr', 1e-5, 1e-1, log=True)
        optimizer = optim.RMSprop(
            model.parameters(), lr=RMSprop_lr, weight_decay=weight_decay
        )
    return optimizer

##### 入力5-44

In [None]:
# 損失関数も定義しておく
loss_fn = nn.BCEWithLogitsLoss()

def train(model, device, train_loader, optimizer):
    model.train()
    for epoch in range(EPOCHS):
        for X_batch_train, y_batch_train in train_loader:
            X_batch_train, y_batch_train = X_batch_train.to(device), y_batch_train.to(
                device
            )

            # 1. Forward関数を使って，図で言えば左から右の順伝播処理を作る
            y_logits = model(X_batch_train).squeeze()
            y_pred = torch.round(torch.sigmoid(y_logits))
            
            # 2. 損失関数を使って，どの程度モデルが誤っているかを確認する
            loss = loss_fn(y_logits, y_batch_train)

            # 3. オプティマイザの勾配を0に初期化し，過去の勾配の影響を排除して現在の勾配を計算できるようにする
            optimizer.zero_grad()

            # 4. 誤差逆伝播法で損失関数の勾配を計算する
            loss.backward()

            # 5. オプティマイザで重みなどのパラメータ更新を行う
            optimizer.step()

def val(model, device, val_loader):
    model.eval()
    with torch.no_grad():
        for X_batch_val, y_batch_val in val_loader:
            X_batch_val, y_batch_val = X_batch_val.to(device), y_batch_val.to(device)

            # 1. Forward関数を使って，図で言えば左から右の順伝播処理を作る
            val_logits = model(X_batch_val).squeeze()
            val_pred = torch.round(torch.sigmoid(val_logits))
            # 2. 損失関数を使って，どの程度モデルが誤っているかを確認する
            val_loss = loss_fn(val_logits, y_batch_val)
    return val_loss

##### 入力5-45

In [None]:
# エポック数は増やしすぎると計算量が上がる
# 5.1.3項の例で，30程度あればニューラルネットワークには十分に考えられたため30にしておくが，
# 時間がかかりすぎる場合は20などに減らすこともできる
EPOCHS = 30

def objective_nn(trial):
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

    # 層の数は3から7に
    num_layers = trial.suggest_int('num_layer', 3, 7)
    # 全結合層の特徴量数
    num_features = [
        int(trial.suggest_float(f'num_features_{i}', 16, 128, step=16))
        for i in range(num_layers)
    ]

    model = BinaryClassificationNet(trial, num_layers, num_features).to(device)
    optimizer = get_optimizer(trial, model)

    for step in range(EPOCHS):
        train(model, device, train_loader, optimizer)
        val_loss = val(model, device, val_loader)
    return val_loss

##### 入力5-46

In [None]:
from functools import partial

import optuna

obj_nn = partial(objective_nn)
# セッションの作成
sampler = optuna.samplers.TPESampler(seed=0)

# val_lossを最小化するためにdirection='minimize'にする
study_nn = optuna.create_study(sampler=sampler, direction='minimize')
# 回数を指定する.50回だとおよそ10分程度かかる
study_nn.optimize(obj_nn, n_trials=50)

##### 入力5-47

In [None]:
study_nn.best_value

##### 入力5-48

In [None]:
nn_params = study_nn.best_params
nn_params

##### 入力5-49

In [None]:
class BinaryClassificationNet(nn.Module):
    # __init__()関数で使用するレイヤーを定義
    def __init__(self):
    
        # 基底クラスとしてnn.moduleを継承する
        # ニューラルネットワークの基本機能が使えるようになるおまじないと考えてもよい
        super(BinaryClassificationNet, self).__init__()

        # for loopの作りやすさから，nn.Sequentialではなくnn.ModuleListに
        # appendしていく形をとる
        # 第1層
        # 入力特徴量は13，出力はnum_features[0]にしておいた
        self.layers = nn.ModuleList(
           [nn.Linear(in_features=13, out_features=num_features[0])]
        )
        self.layers.append(nn.BatchNorm1d(num_features[0]))
        self.layers.append(nn.ReLU())
        # 第2層以降
        # 先の出力はnum_features[0]だった.for文で層を重ねていく
        for i in range(1, num_layers):
           self.layers.append(
              nn.Linear(in_features=num_features[i - 1], out_features=num_features[i])
            )
           self.layers.append(nn.BatchNorm1d(num_features[i]))
           self.layers.append(nn.ReLU())

        # 最後に先の出力のnum_features[i]を1にする. これは0である確率1つだけということを言う
        self.layers_out = nn.Linear(in_features=num_features[i], out_features=1)
    
    # 関数forward()では定義済みレイヤーを呼び出す
    def forward(self, x):
        for i, l in enumerate(self.layers):
            # l(x)でモデルを使う
            x = l(x)
        # sigmoidは損失関数に含まれているので設定しなくてok
        x = self.layers_out(x)
        return x


##### 入力5-50

In [None]:
# 最適化したハイパーパラメータ情報を入れる
num_layers = nn_params['num_layer']
# リスト内包表記で各パラメータを用意
# num_featuresはint()で32.0→32としないとエラーを起こす
num_features = [
      int(nn_params[f'num_features_{i}']) for i in range(nn_params['num_layer'])
]

# モデルを構築
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
model = BinaryClassificationNet()
# モデルをGPUへ
model.to(device)
print(model)

loss_fn = nn.BCEWithLogitsLoss()

# 最適化したoptimizerを使用する
optimizer_names = ['Adam', 'RMSprop']
weight_decay = nn_params['weight_decay']
if nn_params['optimizer'] == optimizer_names[0]:
    Adam_lr = nn_params['Adam_lr']
    optimizer = optim.Adam(model.parameters(), lr=Adam_lr, weight_decay=weight_decay)
else:
    RMSprop_lr = nn_params['RMSprop_lr']
    optimizer = optim.RMSprop(model.parameters(), lr=RMSprop_lr, weight_decay=weight_decay)

# 学習過程でうまく学習できているか監視するために，損失関数だけでなく
# accuracyも見ることにする. そのためにもaccuracyを確認する関数を定義する

def binary_acc(y_true, y_pred):
    # torch.eq()で2つのテンソルで値が一致する数をカウントする
    correct = torch.eq(y_true, y_pred).sum().item()
    acc = (correct / len(y_pred)) * 100
    return acc

##### 入力5-51

In [None]:
# エポック数は30にする
EPOCHS = 30
model.train()
# トレーニングと検証を同時に行うforループを作る
# この節ではまだテストデータセットは使わない

# 損失関数の出力とaccuracyはリストにまとめて後で図示する
train_loss_li = []
train_acc_li = []
val_loss_li = []
val_acc_li = []

# エポック: すべての入力データを何回使えたかを示す
for epoch in range(EPOCHS):

    # イテレーション: 重みを何度更新したかを示す. バッチサイズ64で，

    # 訓練では，まず，ミニバッチごとに分けられたデータを抽出する
    for X_batch_train, y_batch_train in train_loader:

        # ミニバッチのデータをGPUに適用する
        X_batch_train, y_batch_train = X_batch_train.to(device), y_batch_train.to(
            device
        )

        # 1. Forward関数を使って，図で言えば左から右の順伝播処理を作る
        y_logits = model(X_batch_train).squeeze()
        y_pred = torch.round(torch.sigmoid(y_logits))

        # 2. 損失関数を使って，どの程度モデルが誤っているかを確認する.ここではaccも使う
        loss = loss_fn(y_logits, y_batch_train)
        acc = binary_acc(y_true=y_batch_train, y_pred=y_pred)

        # 3. オプティマイザの勾配を0に初期化し，過去の勾配の影響を排除して現在の勾配を計算でき るようにする
        optimizer.zero_grad()

        # 4. 誤差逆伝播法で損失関数の勾配を計算する
        loss.backward()

        # 5. オプティマイザで重みなどのパラメータ更新を行う
        optimizer.step()

    # 検証データでの確認
    # 訓練データの後に検証データを確認している
    model.eval()
    # torch.no_grad()でテンソルの勾配計算ができないようにする
    with torch.no_grad():
        for X_batch_val, y_batch_val in val_loader:
            X_batch_val, y_batch_val = X_batch_val.to(device), y_batch_val.to(device)

            # 1. Forward関数を使って，図で言えば左から右の順伝播処理を作る
            val_logits = model(X_batch_val).squeeze()
            val_pred = torch.round(torch.sigmoid(val_logits))
            # 2. 損失関数を使って，どの程度モデルが誤っているかを確認する.ここではaccも使う
            val_loss = loss_fn(val_logits, y_batch_val)
            val_acc = binary_acc(y_true=y_batch_val, y_pred=val_pred)
            # 3以降は勾配の計算に関わってくるため，検証やテストでは書かない
        if epoch % 2 == 0:
            print(
                f'Epoch: {epoch} | Train_loss: {loss:.5f}, Train_acc: {acc:.2f}% | Val_loss: {val_loss:.5f}, Val_acc: {val_acc:.2f}%'
            )
    val_loss_li.append(val_loss.item())
    val_acc_li.append(val_acc)

    train_loss_li.append(loss.item())
    train_acc_li.append(acc)

##### 入力5-52

In [None]:
fig = plt.figure(figsize=(10, 6))

ax1 = fig.add_subplot(121)
ax1.set_ylabel('loss', size=16)
ax1.set_ylabel('loss', size=16)
ax1.plot(train_loss_li, label='train_loss')
ax1.plot(val_loss_li, color='#ff7f0e', label='val_loss')
ax1.legend()

ax2 = fig.add_subplot(122)
ax2.set_ylabel('acc', size=16)
ax2.plot(train_acc_li, label='train_acc')
ax2.plot(val_acc_li, color='#ff7f0e', label='val_acc')
ax2.legend()

plt.show()

##### 入力5-53

In [None]:
# itertoolsでリストを平坦化する
import itertools

# 予測確率をリストにまとめる
test_prob_list = []
# 予測ラベルをリストにまとめる
test_pred_list = []

# テストデータセットの結果を確認する
model.eval()
# テストでは勾配が変動してはならないため，torch.no_grad()でテンソルの勾配計算ができないように する.
with torch.no_grad():
      for X_batch_test in test_loader:
          X_batch_test = X_batch_test.to(device)
          test_logits = model(X_batch_test).squeeze()
          # 確率をリストに
          test_prob = torch.sigmoid(test_logits)
          test_prob_list.append(test_prob.cpu().numpy())
          # ラベルをリストに
          test_pred = torch.round(test_prob)
          test_pred_list.append(test_pred.cpu().numpy())

test_prob_list = list(itertools.chain.from_iterable(test_prob_list))
test_pred_list = list(itertools.chain.from_iterable(test_pred_list))

##### 入力5-54

In [None]:
from sklearn.metrics import auc, roc_curve

# ROC-AUC
fpr_test, tpr_test, thresholds_test = roc_curve(y_test, test_prob_list)
auc_test = auc(fpr_test, tpr_test)

# ROC曲線を描く
plt.plot([0, 1], [0, 1], color='black', linestyle='--')

plt.plot(fpr_test, tpr_test, label='Optuna+NeuralNetwork (AUC = %.3f)' % auc_test)
plt.fill_between(fpr_test, tpr_test, 0, alpha=0.1)

plt.legend()
plt.title('Optuna+NeuralNetwork')
plt.xlabel('False_Positive_Rate')
plt.ylabel('True_Positive_Rate')
plt.grid(True)
plt.show()

##### 入力5-55

In [None]:
# classification_report
from sklearn.metrics import classification_report, confusion_matrix

print(confusion_matrix(y_test, test_pred_list))
print(classification_report(y_test, test_pred_list))