<a href="https://colab.research.google.com/github/maskot1977/tmd2022/blob/UU7bJsBfSEyE/tmd2022_6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

「AI創薬・ケモインフォマティクス入門」講義資料　（講師：小寺正明）

3月25日（土）19:40～21:10　第6回  「計算機実験2」

# RDKit インストール

In [None]:
!pip install rdkit-pypi

# 化合物データ取得

In [None]:
import pandas as pd

# csvからのデータ読み込み_
url = "https://raw.githubusercontent.com/maskot1977/toydata/main/data/data_18.csv"
database1 = pd.read_csv(url)
database1

# 回帰問題用目的変数とその分布

In [None]:
import matplotlib.pyplot as plt

plt.title("Melting point as continus value Y1")
Y1 = database1["Melting point"]
Y1.hist(bins=20)
plt.show()

# 分類用目的変数とその分布（練習のため）

In [None]:
import matplotlib.pyplot as plt
import numpy as np

Y2 = pd.DataFrame(
    np.where(
        database1["Melting point"]
        > database1["Melting point"].describe().median() * 0.9,
        1,
        0,
    ),
    columns=["Melting point as discrete value Y2"],
)
Y2.hist(bins=20)
plt.show()

# バイアスのある分類用目的変数とその分布（練習のため）

In [None]:
import numpy as np

Y3 = pd.DataFrame(
    np.where(
        database1["Melting point"]
        > database1["Melting point"].describe().median() * 2.2,
        1,
        0,
    ),
    columns=["Melting point as biased discrete value Y3"],
)
Y3.hist(bins=20)
plt.show()

# RDKit supporter

RDKit supporter は、RDKit や ML 周りで便利な関数やクラスを私が書き溜めたものです。インストールしてみましょう。

In [None]:
!pip install git+https://github.com/maskot1977/rdkit_supporter.git

# RDKit 記述子

In [None]:
%%time
import rdkit
from rdkit_supporter.descriptors import calc_descriptors

rdkit_df = calc_descriptors(database1["Open Babel SMILES"])
display(rdkit_df)

# 説明変数

今回は、説明変数 X として RDKit descriptors を用います。

In [None]:
X = rdkit_df

# 欠損値の補間の例

In [None]:
from sklearn.impute import KNNImputer

imputer = KNNImputer()
X = pd.DataFrame(imputer.fit_transform(X), columns=X.columns)

# データ分割

In [None]:
from sklearn.model_selection import train_test_split

# 訓練データ（広義）とテストデータに分割する
(
    X_train,
    X_test,
    Y1_train,
    Y1_test,
    Y2_train,
    Y2_test,
    Y3_train,
    Y3_test,
) = train_test_split(X, Y1, Y2, Y3, test_size=0.5, random_state=53, stratify=Y3)

# 訓練データ（狭義）と検証データに分割する
(X_tra, X_val, Y1_tra, Y1_val, Y2_tra, Y2_val, Y3_tra, Y3_val) = train_test_split(
    X_train,
    Y1_train,
    Y2_train,
    Y3_train,
    test_size=0.5,
    random_state=53,
    stratify=Y3_train,
)

# 結果表示用関数

In [None]:
import matplotlib.pyplot as plt


# 学習曲線の図示
def learning_curve(
    train_loss_history, valid_loss_history, train_score_history, valid_score_history
):
    fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(12, 4))
    axes[0].plot(train_loss_history, label="Train Loss")
    axes[0].plot(valid_loss_history, label="Valid Loss")
    axes[0].set_yscale("log")
    axes[0].set_xlabel("Epoch")
    axes[0].set_ylabel("Loss")
    axes[0].grid()
    axes[0].legend()
    axes[1].plot(train_score_history, label="Train Score")
    axes[1].plot(valid_score_history, label="Valid Score")
    axes[1].set_xlabel("Epoch")
    axes[1].set_ylabel("Score")
    axes[1].set_ylim([0, 1])
    axes[1].set_yticks(np.linspace(0, 1, 11))
    axes[1].grid()
    axes[1].legend()
    plt.show()

In [None]:
import matplotlib.pyplot as plt


# 回帰問題用性能評価指標の図示
def regression_metrics(Y, Y_pred):
    scores = []
    scores.append(["MedAE", metrics.median_absolute_error(Y, Y_pred)])
    scores.append(["MAE", metrics.mean_absolute_error(Y, Y_pred)])
    scores.append(["RMSE", metrics.mean_squared_error(Y, Y_pred, squared=False)])
    r = np.corrcoef(Y, Y_pred)[0][1]
    r2 = metrics.r2_score(Y, Y_pred)
    y_max = max(Y.max(), Y_pred.max())
    y_min = min(Y.min(), Y_pred.min())
    y_height = abs(y_max - y_min) / 2
    fig, axes = plt.subplots(nrows=1, ncols=3, figsize=(12, 3))
    axes[0].barh([x[0] for x in scores], [x[1] for x in scores])
    axes[0].set_xlabel("Score")
    axes[0].grid()
    axes[1].set_title("R={}".format(r))
    axes[1].scatter(Y, Y_pred, alpha=0.5, c=Y)
    axes[1].plot([y_min, y_max], [y_min, y_max], c="r")
    axes[1].set_xlabel("Y_true")
    axes[1].set_ylabel("Y_pred")
    axes[1].grid()
    axes[1].set_aspect("equal")
    axes[2].set_title("R2={}".format(r2))
    axes[2].scatter(Y, Y_pred - Y, alpha=0.5, c=Y)
    axes[2].plot([y_min, y_max], [0, 0], c="r")
    axes[2].set_xlabel("Y_true")
    axes[2].set_ylabel("Y_err")
    axes[2].set_ylim([-y_height, y_height])
    axes[2].grid()
    plt.show()

In [None]:
import matplotlib.pyplot as plt


# 分類問題用性能評価指標の図示
def classification_metrics(Y, Y_pred):
    scores = []
    precision = metrics.precision_score(Y, Y_pred)
    recall = metrics.recall_score(Y, Y_pred)
    scores.append(["MCC", metrics.matthews_corrcoef(Y, Y_pred)])
    scores.append(["Cohen's Kappa", metrics.cohen_kappa_score(Y, Y_pred)])
    scores.append(["F1", metrics.f1_score(Y, Y_pred)])
    scores.append(["Average Precision", metrics.average_precision_score(Y, Y_pred)])
    scores.append(["Precision", precision])
    scores.append(["Recall", recall])
    scores.append(["AUC", metrics.balanced_accuracy_score(Y, Y_pred)])
    # scores.append(["TopK ACC", metrics.top_k_accuracy_score(Y, Y_pred)])
    scores.append(["Balanced ACC", metrics.balanced_accuracy_score(Y, Y_pred)])
    scores.append(["ACC", metrics.accuracy_score(Y, Y_pred)])

    fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(10, 4))
    axes[0].barh([x[0] for x in scores], [x[1] for x in scores])
    axes[0].set_xlim([0, 1])
    axes[0].set_xlabel("Score")
    axes[0].grid()

    tn, fp, fn, tp = metrics.confusion_matrix(Y, Y_pred).ravel()
    axes[1].set_title("tn, fp, fn, tp = {} {} {} {}".format(tn, fp, fn, tp))
    axes[1].bar(["Positive", "Negative"], [tp, tn])
    axes[1].bar(["Positive", "Negative"], [-fn, -fp])
    axes[1].grid()
    plt.show()

# PyTorchで回帰問題を解く （1層モデル）

In [None]:
import torch

# 　もし GPU が使用可能なら GPU を使う。もしなければ CPU を使う
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

In [None]:
import torch


# 1層モデル（回帰問題なら線形重回帰、分類問題ならロジスティック回帰に相当）
class SLP(torch.nn.Module):
    def __init__(self, n_input, n_output):
        super(SLP, self).__init__()
        self.l1 = torch.nn.Linear(n_input, n_output)  # 全結合層

    def forward(self, x):
        x = self.l1(x)
        return x

In [None]:
# 回帰問題用データセット Y1

# 訓練データ（狭義）を説明変数と目的変数の組にする
training_data1 = torch.utils.data.TensorDataset(
    torch.tensor(X_tra.values).float(),
    torch.tensor(Y1_tra.values.reshape((-1, 1))).float(),
)

# 検証データを説明変数と目的変数の組にする
validation_data1 = torch.utils.data.TensorDataset(
    torch.tensor(X_val.values).float(),
    torch.tensor(Y1_val.values.reshape((-1, 1))).float(),
)

# テストデータを説明変数と目的変数の組にする
test_data1 = torch.utils.data.TensorDataset(
    torch.tensor(X_test.values).float(),
    torch.tensor(Y1_test.values.reshape((-1, 1))).float(),
)

In [None]:
# 各種パラメーター

batch_size = 20
lr = 0.01
n_hidden = 100  # 1層モデルでは使わない
n_epoch = 100

# 学習モデル
model = SLP(len(training_data1[0][0]), 1).to(device)

In [None]:
# データをバッチごとに取り出すデータローダーにセットする
train_loader = torch.utils.data.DataLoader(
    training_data1, batch_size=batch_size, shuffle=True
)
valid_loader = torch.utils.data.DataLoader(
    validation_data1, batch_size=batch_size, shuffle=True
)
test_loader = torch.utils.data.DataLoader(
    test_data1, batch_size=batch_size, shuffle=True
)

In [None]:
from sklearn import metrics

measure = metrics.r2_score  # 回帰問題の評価関数

# 各種履歴
train_loss_history = []
valid_loss_history = []
train_score_history = []
valid_score_history = []

optimizer = torch.optim.Adam(model.parameters(), lr=lr)  # 最適化手法（オプティマイザ）
criterion = torch.nn.MSELoss()  # 誤差関数

# n_epoch回繰り返す
for epoch in range(n_epoch):

    # トレーニング（訓練）
    model.train()
    train_loss = 0
    Y_pred = np.array([])
    Y_true = np.array([])
    for x, y in train_loader:  # バッチを取り出す
        x = torch.autograd.Variable(x).to(device)  # 自動微分可能な変数に変換
        y = torch.autograd.Variable(y).to(device)  # 自動微分可能な変数に変換
        optimizer.zero_grad()  # オプティマイザの勾配を初期化
        y_pred = model(x).to(device)  # 予測
        loss = criterion(y_pred, y)  # 誤差を計算
        loss.backward()  # 誤差逆伝搬
        optimizer.step()  # オプティマイザを更新
        train_loss += loss.item() / x.shape[0]  # そのバッチの誤差を蓄積
        Y_true = np.concatenate([Y_true, y.detach().cpu().numpy().flatten()])  # 実測値を記録
        Y_pred = np.concatenate(
            [Y_pred, y_pred.detach().cpu().numpy().flatten()]
        )  # 予測値を記録

    train_loss_history.append(train_loss)  # トレーニング誤差の履歴を記録

    train_score = measure(Y_pred, Y_true)  # トレーニングデータに対する評価関数のスコア
    train_score_history.append(train_score)  # 履歴を記録

    # バリデーション（検証）
    model.eval()
    valid_loss = 0
    Y_pred = np.array([])
    Y_true = np.array([])
    for x, y in valid_loader:
        x = torch.autograd.Variable(x, requires_grad=False).to(
            device
        )  # 自動微分は必要ないが同じ形式の変数を用意する
        y = torch.autograd.Variable(y, requires_grad=False).to(
            device
        )  # 自動微分は必要ないが同じ形式の変数を用意する
        y_pred = model(x)  # 予測
        loss = criterion(y_pred, y)  # 誤差を計算
        valid_loss += loss.item() / x.shape[0]  # そのバッチの誤差を蓄積
        Y_true = np.concatenate([Y_true, y.detach().cpu().numpy().flatten()])  # 実測値を記録
        Y_pred = np.concatenate(
            [Y_pred, y_pred.detach().cpu().numpy().flatten()]
        )  # 予測値を記録

    valid_loss_history.append(valid_loss)  # バリデーション誤差の履歴を記録

    valid_score = measure(Y_pred, Y_true)  # バリデーションデータに対する評価関数のスコア
    valid_score_history.append(valid_score)  # 履歴を記録

    if (epoch + 1) % (n_epoch / 10) == 0:
        print(  # 途中経過を表示
            "Epoch={}\tLoss(train)={:.4f}\tLoss(valid)={:.4f}\tScore(train)={:.4f}\tScore(valid)={:.4f} ".format(
                epoch + 1, train_loss, valid_loss, train_score, valid_score
            )
        )

    if min(valid_score_history) == valid_score:
        best_model = model  # ベストモデルを保存

In [None]:
# 学習曲線の描画
learning_curve(
    train_loss_history, valid_loss_history, train_score_history, valid_score_history
)

In [None]:
from sklearn import metrics

# テストデータによる性能評価
test_loss = 0
Y_pred = np.array([])
Y_true = np.array([])
for x, y in test_loader:
    x = torch.autograd.Variable(x, requires_grad=False).to(device)
    y = torch.autograd.Variable(y, requires_grad=False).to(device)
    y_pred = best_model(x)
    loss = criterion(y_pred, y)
    test_loss += loss.item() / x.shape[0]

    Y_pred = np.concatenate([Y_pred, y_pred.detach().cpu().numpy().flatten()])
    Y_true = np.concatenate([Y_true, y.detach().cpu().numpy().flatten()])

print(
    "Loss(test)={}\tScore(test)={}".format(test_loss, metrics.r2_score(Y_true, Y_pred))
)

In [None]:
regression_metrics(Y_true, Y_pred)

In [None]:
# 学習済みモデルのパラメーター
best_model.l1.weight, best_model.l1.bias

# PyTorchで回帰問題を解く （多層モデル）

In [None]:
import torch


# シンプルな多層パーセプトロン（3層）
class MLP(torch.nn.Module):
    def __init__(self, n_input, n_hidden, n_output):
        super(MLP, self).__init__()
        self.l1 = torch.nn.Linear(n_input, n_hidden)
        self.l2 = torch.nn.Linear(n_hidden, n_hidden)
        self.l3 = torch.nn.Linear(n_hidden, n_output)

    def forward(self, x):
        x = torch.relu(self.l1(x))
        x = torch.relu(self.l2(x))
        x = self.l3(x)
        return x

In [None]:
# 各種パラメーター

batch_size = 20
lr = 0.01
n_hidden = 100
n_epoch = 100

# 学習モデル
model = MLP(len(training_data1[0][0]), n_hidden, 1).to(device)

In [None]:
from sklearn import metrics

measure = metrics.r2_score  # 回帰問題の評価関数

# 各種履歴
train_loss_history = []
valid_loss_history = []
train_score_history = []
valid_score_history = []

optimizer = torch.optim.Adam(model.parameters(), lr=lr)  # 最適化手法（オプティマイザ）
criterion = torch.nn.MSELoss()  # 誤差関数

# n_epoch回繰り返す
for epoch in range(n_epoch):

    # トレーニング（訓練）
    model.train()
    train_loss = 0
    Y_pred = np.array([])
    Y_true = np.array([])
    for x, y in train_loader:  # バッチを取り出す
        x = torch.autograd.Variable(x).to(device)  # 自動微分可能な変数に変換
        y = torch.autograd.Variable(y).to(device)  # 自動微分可能な変数に変換
        optimizer.zero_grad()  # オプティマイザの勾配を初期化
        y_pred = model(x).to(device)  # 予測
        loss = criterion(y_pred, y)  # 誤差を計算
        loss.backward()  # 誤差逆伝搬
        optimizer.step()  # オプティマイザを更新
        train_loss += loss.item() / x.shape[0]  # そのバッチの誤差を蓄積
        Y_true = np.concatenate([Y_true, y.detach().cpu().numpy().flatten()])  # 実測値を記録
        Y_pred = np.concatenate(
            [Y_pred, y_pred.detach().cpu().numpy().flatten()]
        )  # 予測値を記録

    train_loss_history.append(train_loss)  # トレーニング誤差の履歴を記録

    train_score = measure(Y_pred, Y_true)  # トレーニングデータに対する評価関数のスコア
    train_score_history.append(train_score)  # 履歴を記録

    # バリデーション（検証）
    model.eval()
    valid_loss = 0
    Y_pred = np.array([])
    Y_true = np.array([])
    for x, y in valid_loader:
        x = torch.autograd.Variable(x, requires_grad=False).to(
            device
        )  # 自動微分は必要ないが同じ形式の変数を用意する
        y = torch.autograd.Variable(y, requires_grad=False).to(
            device
        )  # 自動微分は必要ないが同じ形式の変数を用意する
        y_pred = model(x)  # 予測
        loss = criterion(y_pred, y)  # 誤差を計算
        valid_loss += loss.item() / x.shape[0]  # そのバッチの誤差を蓄積
        Y_true = np.concatenate([Y_true, y.detach().cpu().numpy().flatten()])  # 実測値を記録
        Y_pred = np.concatenate(
            [Y_pred, y_pred.detach().cpu().numpy().flatten()]
        )  # 予測値を記録

    valid_loss_history.append(valid_loss)  # バリデーション誤差の履歴を記録

    valid_score = measure(Y_pred, Y_true)  # バリデーションデータに対する評価関数のスコア
    valid_score_history.append(valid_score)  # 履歴を記録

    if (epoch + 1) % (n_epoch / 10) == 0:
        print(  # 途中経過を表示
            "Epoch={}\tLoss(train)={:.4f}\tLoss(valid)={:.4f}\tScore(train)={:.4f}\tScore(valid)={:.4f} ".format(
                epoch + 1, train_loss, valid_loss, train_score, valid_score
            )
        )

    if min(valid_score_history) == valid_score:
        best_model = model  # ベストモデルを保存

In [None]:
# 学習曲線の描画
learning_curve(
    train_loss_history, valid_loss_history, train_score_history, valid_score_history
)

In [None]:
from sklearn import metrics

# テストデータによる性能評価
test_loss = 0
Y_pred = np.array([])
Y_true = np.array([])
for x, y in test_loader:
    x = torch.autograd.Variable(x, requires_grad=False).to(device)
    y = torch.autograd.Variable(y, requires_grad=False).to(device)
    y_pred = best_model(x)
    loss = criterion(y_pred, y)
    test_loss += loss.item() / x.shape[0]

    Y_pred = np.concatenate([Y_pred, y_pred.detach().cpu().numpy().flatten()])
    Y_true = np.concatenate([Y_true, y.detach().cpu().numpy().flatten()])

print(
    "Loss(test)={}\tScore(test)={}".format(test_loss, metrics.r2_score(Y_true, Y_pred))
)

In [None]:
regression_metrics(Y_true, Y_pred)

# PyTorchで分類問題を解く （1層モデル）

In [None]:
import torch

training_data2 = torch.utils.data.TensorDataset(
    torch.tensor(X_tra.values).float(),
    torch.tensor(Y2_tra.values.reshape((-1, 1))).float(),
)

validation_data2 = torch.utils.data.TensorDataset(
    torch.tensor(X_val.values).float(),
    torch.tensor(Y2_val.values.reshape((-1, 1))).float(),
)

test_data2 = torch.utils.data.TensorDataset(
    torch.tensor(X_test.values).float(),
    torch.tensor(Y2_test.values.reshape((-1, 1))).float(),
)

In [None]:
batch_size = 20
lr = 0.01
n_hidden = 100  # 1層モデルでは使わない
n_epoch = 100

# 学習モデル
model = SLP(len(training_data2[0][0]), 1).to(device)

In [None]:
train_loader = torch.utils.data.DataLoader(
    training_data2, batch_size=batch_size, shuffle=True
)
valid_loader = torch.utils.data.DataLoader(
    validation_data2, batch_size=batch_size, shuffle=True
)
test_loader = torch.utils.data.DataLoader(
    test_data2, batch_size=batch_size, shuffle=True
)

In [None]:
from sklearn import metrics

measure = metrics.f1_score  # 分類問題の評価関数

# 各種履歴
train_loss_history = []
valid_loss_history = []
train_score_history = []
valid_score_history = []

optimizer = torch.optim.Adam(model.parameters(), lr=lr)  # 最適化手法（オプティマイザ）
criterion = torch.nn.MSELoss()  # 誤差関数

# n_epoch回繰り返す
for epoch in range(n_epoch):

    # トレーニング（訓練）
    model.train()
    train_loss = 0
    Y_pred = np.array([])
    Y_true = np.array([])
    for x, y in train_loader:  # バッチを取り出す
        x = torch.autograd.Variable(x).to(device)  # 自動微分可能な変数に変換
        y = torch.autograd.Variable(y).to(device)  # 自動微分可能な変数に変換
        optimizer.zero_grad()  # オプティマイザの勾配を初期化
        y_pred = model(x).to(device)  # 予測
        loss = criterion(y_pred, y)  # 誤差を計算
        loss.backward()  # 誤差逆伝搬
        optimizer.step()  # オプティマイザを更新
        train_loss += loss.item() / x.shape[0]  # そのバッチの誤差を蓄積
        Y_true = np.concatenate([Y_true, y.detach().cpu().numpy().flatten()])  # 実測値を記録
        Y_pred = np.concatenate(
            [Y_pred, y_pred.detach().cpu().numpy().flatten()]
        )  # 予測値を記録

    train_loss_history.append(train_loss)  # トレーニング誤差の履歴を記録

    Y_pred = np.where(Y_pred >= 0.5, 1, 0)  # 分類モデル用に 0 or 1 に変換
    train_score = measure(Y_pred, Y_true)  # トレーニングデータに対する評価関数のスコア
    train_score_history.append(train_score)  # 履歴を記録

    # バリデーション（検証）
    model.eval()
    valid_loss = 0
    Y_pred = np.array([])
    Y_true = np.array([])
    for x, y in valid_loader:
        x = torch.autograd.Variable(x, requires_grad=False).to(
            device
        )  # 自動微分は必要ないが同じ形式の変数を用意する
        y = torch.autograd.Variable(y, requires_grad=False).to(
            device
        )  # 自動微分は必要ないが同じ形式の変数を用意する
        y_pred = model(x)  # 予測
        loss = criterion(y_pred, y)  # 誤差を計算
        valid_loss += loss.item() / x.shape[0]  # そのバッチの誤差を蓄積
        Y_true = np.concatenate([Y_true, y.detach().cpu().numpy().flatten()])  # 実測値を記録
        Y_pred = np.concatenate(
            [Y_pred, y_pred.detach().cpu().numpy().flatten()]
        )  # 予測値を記録

    valid_loss_history.append(valid_loss)  # バリデーション誤差の履歴を記録

    Y_pred = np.where(Y_pred >= 0.5, 1, 0)  # 分類モデル用に 0 or 1 に変換
    valid_score = measure(Y_pred, Y_true)  # バリデーションデータに対する評価関数のスコア
    valid_score_history.append(valid_score)  # 履歴を記録

    if (epoch + 1) % (n_epoch / 10) == 0:
        print(  # 途中経過を表示
            "Epoch={}\tLoss(train)={:.4f}\tLoss(valid)={:.4f}\tScore(train)={:.4f}\tScore(valid)={:.4f} ".format(
                epoch + 1, train_loss, valid_loss, train_score, valid_score
            )
        )

    if min(valid_score_history) == valid_score:
        best_model = model  # ベストモデルを保存

In [None]:
# 学習曲線の描画
learning_curve(
    train_loss_history, valid_loss_history, train_score_history, valid_score_history
)

In [None]:
from sklearn import metrics

test_loss = 0
n_data = 0
Y_pred = np.array([])
Y_true = np.array([])
for x, y in test_loader:
    x = torch.autograd.Variable(x, requires_grad=False).to(device)
    y = torch.autograd.Variable(y, requires_grad=False).to(device)
    y_pred = best_model(x)
    loss = criterion(y_pred, y)
    test_loss += loss.item()
    n_data += x.shape[0]

    Y_pred = np.concatenate([Y_pred, y_pred.detach().cpu().numpy().flatten()])
    Y_true = np.concatenate([Y_true, y.detach().cpu().numpy().flatten()])

Y_pred = np.where(Y_pred >= 0.5, 1, 0)
test_loss /= n_data
print(test_loss, metrics.f1_score(Y_true, Y_pred))

In [None]:
classification_metrics(Y_true, Y_pred)

# PyTorchで分類問題を解く （多層モデル）

In [None]:
batch_size = 20
lr = 0.01
n_hidden = 100
n_epoch = 100

model = MLP(len(training_data2[0][0]), n_hidden, 1).to(device)

In [None]:
from sklearn import metrics

measure = metrics.f1_score  # 分類問題の評価関数

# 各種履歴
train_loss_history = []
valid_loss_history = []
train_score_history = []
valid_score_history = []

optimizer = torch.optim.Adam(model.parameters(), lr=lr)  # 最適化手法（オプティマイザ）
criterion = torch.nn.MSELoss()  # 誤差関数

# n_epoch回繰り返す
for epoch in range(n_epoch):

    # トレーニング（訓練）
    model.train()
    train_loss = 0
    Y_pred = np.array([])
    Y_true = np.array([])
    for x, y in train_loader:  # バッチを取り出す
        x = torch.autograd.Variable(x).to(device)  # 自動微分可能な変数に変換
        y = torch.autograd.Variable(y).to(device)  # 自動微分可能な変数に変換
        optimizer.zero_grad()  # オプティマイザの勾配を初期化
        y_pred = model(x).to(device)  # 予測
        loss = criterion(y_pred, y)  # 誤差を計算
        loss.backward()  # 誤差逆伝搬
        optimizer.step()  # オプティマイザを更新
        train_loss += loss.item() / x.shape[0]  # そのバッチの誤差を蓄積
        Y_true = np.concatenate([Y_true, y.detach().cpu().numpy().flatten()])  # 実測値を記録
        Y_pred = np.concatenate(
            [Y_pred, y_pred.detach().cpu().numpy().flatten()]
        )  # 予測値を記録

    train_loss_history.append(train_loss)  # トレーニング誤差の履歴を記録

    Y_pred = np.where(Y_pred >= 0.5, 1, 0)  # 分類モデル用に 0 or 1 に変換
    train_score = measure(Y_pred, Y_true)  # トレーニングデータに対する評価関数のスコア
    train_score_history.append(train_score)  # 履歴を記録

    # バリデーション（検証）
    model.eval()
    valid_loss = 0
    Y_pred = np.array([])
    Y_true = np.array([])
    for x, y in valid_loader:
        x = torch.autograd.Variable(x, requires_grad=False).to(
            device
        )  # 自動微分は必要ないが同じ形式の変数を用意する
        y = torch.autograd.Variable(y, requires_grad=False).to(
            device
        )  # 自動微分は必要ないが同じ形式の変数を用意する
        y_pred = model(x)  # 予測
        loss = criterion(y_pred, y)  # 誤差を計算
        valid_loss += loss.item() / x.shape[0]  # そのバッチの誤差を蓄積
        Y_true = np.concatenate([Y_true, y.detach().cpu().numpy().flatten()])  # 実測値を記録
        Y_pred = np.concatenate(
            [Y_pred, y_pred.detach().cpu().numpy().flatten()]
        )  # 予測値を記録

    valid_loss_history.append(valid_loss)  # バリデーション誤差の履歴を記録

    Y_pred = np.where(Y_pred >= 0.5, 1, 0)  # 分類モデル用に 0 or 1 に変換
    valid_score = measure(Y_pred, Y_true)  # バリデーションデータに対する評価関数のスコア
    valid_score_history.append(valid_score)  # 履歴を記録

    if (epoch + 1) % (n_epoch / 10) == 0:
        print(  # 途中経過を表示
            "Epoch={}\tLoss(train)={:.4f}\tLoss(valid)={:.4f}\tScore(train)={:.4f}\tScore(valid)={:.4f} ".format(
                epoch + 1, train_loss, valid_loss, train_score, valid_score
            )
        )

    if min(valid_score_history) == valid_score:
        best_model = model  # ベストモデルを保存

In [None]:
# 学習曲線の描画
learning_curve(
    train_loss_history, valid_loss_history, train_score_history, valid_score_history
)

In [None]:
from sklearn import metrics

test_loss = 0
n_data = 0
Y_pred = np.array([])
Y_true = np.array([])
for x, y in test_loader:
    x = torch.autograd.Variable(x, requires_grad=False).to(device)
    y = torch.autograd.Variable(y, requires_grad=False).to(device)
    y_pred = best_model(x)
    loss = criterion(y_pred, y)
    test_loss += loss.item()
    n_data += x.shape[0]

    Y_pred = np.concatenate([Y_pred, y_pred.detach().cpu().numpy().flatten()])
    Y_true = np.concatenate([Y_true, y.detach().cpu().numpy().flatten()])

Y_pred = np.where(Y_pred >= 0.5, 1, 0)
test_loss /= n_data
print(test_loss, metrics.f1_score(Y_true, Y_pred))

In [None]:
classification_metrics(Y_true, Y_pred)

# PyTorch 回帰問題（バイアスあり）

In [None]:
import torch

training_data3 = torch.utils.data.TensorDataset(
    torch.tensor(X_tra.values).float(),
    torch.tensor(Y3_tra.values.reshape((-1, 1))).float(),
)

validation_data3 = torch.utils.data.TensorDataset(
    torch.tensor(X_val.values).float(),
    torch.tensor(Y3_val.values.reshape((-1, 1))).float(),
)

test_data3 = torch.utils.data.TensorDataset(
    torch.tensor(X_test.values).float(),
    torch.tensor(Y3_test.values.reshape((-1, 1))).float(),
)

In [None]:
batch_size = 20
lr = 0.01
n_hidden = 100
n_epoch = 100

model = MLP(len(training_data3[0][0]), n_hidden, 1).to(device)

In [None]:
train_loader = torch.utils.data.DataLoader(
    training_data3, batch_size=batch_size, shuffle=True
)
valid_loader = torch.utils.data.DataLoader(
    validation_data3, batch_size=batch_size, shuffle=True
)
test_loader = torch.utils.data.DataLoader(
    test_data3, batch_size=batch_size, shuffle=True
)

In [None]:
from sklearn import metrics

measure = metrics.f1_score  # 分類問題の評価関数

# 各種履歴
train_loss_history = []
valid_loss_history = []
train_score_history = []
valid_score_history = []

optimizer = torch.optim.Adam(model.parameters(), lr=lr)  # 最適化手法（オプティマイザ）
criterion = torch.nn.MSELoss()  # 誤差関数

# n_epoch回繰り返す
for epoch in range(n_epoch):

    # トレーニング（訓練）
    model.train()
    train_loss = 0
    Y_pred = np.array([])
    Y_true = np.array([])
    for x, y in train_loader:  # バッチを取り出す
        x = torch.autograd.Variable(x).to(device)  # 自動微分可能な変数に変換
        y = torch.autograd.Variable(y).to(device)  # 自動微分可能な変数に変換
        optimizer.zero_grad()  # オプティマイザの勾配を初期化
        y_pred = model(x).to(device)  # 予測
        loss = criterion(y_pred, y)  # 誤差を計算
        loss.backward()  # 誤差逆伝搬
        optimizer.step()  # オプティマイザを更新
        train_loss += loss.item() / x.shape[0]  # そのバッチの誤差を蓄積
        Y_true = np.concatenate([Y_true, y.detach().cpu().numpy().flatten()])  # 実測値を記録
        Y_pred = np.concatenate(
            [Y_pred, y_pred.detach().cpu().numpy().flatten()]
        )  # 予測値を記録

    train_loss_history.append(train_loss)  # トレーニング誤差の履歴を記録

    Y_pred = np.where(Y_pred >= 0.5, 1, 0)  # 分類モデル用に 0 or 1 に変換
    train_score = measure(Y_pred, Y_true)  # トレーニングデータに対する評価関数のスコア
    train_score_history.append(train_score)  # 履歴を記録

    # バリデーション（検証）
    model.eval()
    valid_loss = 0
    Y_pred = np.array([])
    Y_true = np.array([])
    for x, y in valid_loader:
        x = torch.autograd.Variable(x, requires_grad=False).to(
            device
        )  # 自動微分は必要ないが同じ形式の変数を用意する
        y = torch.autograd.Variable(y, requires_grad=False).to(
            device
        )  # 自動微分は必要ないが同じ形式の変数を用意する
        y_pred = model(x)  # 予測
        loss = criterion(y_pred, y)  # 誤差を計算
        valid_loss += loss.item() / x.shape[0]  # そのバッチの誤差を蓄積
        Y_true = np.concatenate([Y_true, y.detach().cpu().numpy().flatten()])  # 実測値を記録
        Y_pred = np.concatenate(
            [Y_pred, y_pred.detach().cpu().numpy().flatten()]
        )  # 予測値を記録

    valid_loss_history.append(valid_loss)  # バリデーション誤差の履歴を記録

    Y_pred = np.where(Y_pred >= 0.5, 1, 0)  # 分類モデル用に 0 or 1 に変換
    valid_score = measure(Y_pred, Y_true)  # バリデーションデータに対する評価関数のスコア
    valid_score_history.append(valid_score)  # 履歴を記録

    if (epoch + 1) % (n_epoch / 10) == 0:
        print(  # 途中経過を表示
            "Epoch={}\tLoss(train)={:.4f}\tLoss(valid)={:.4f}\tScore(train)={:.4f}\tScore(valid)={:.4f} ".format(
                epoch + 1, train_loss, valid_loss, train_score, valid_score
            )
        )

    if min(valid_score_history) == valid_score:
        best_model = model  # ベストモデルを保存

In [None]:
# 学習曲線の描画
learning_curve(
    train_loss_history, valid_loss_history, train_score_history, valid_score_history
)

In [None]:
from sklearn import metrics

test_loss = 0
n_data = 0
Y_pred = np.array([])
Y_true = np.array([])
for x, y in test_loader:
    x = torch.autograd.Variable(x, requires_grad=False).to(device)
    y = torch.autograd.Variable(y, requires_grad=False).to(device)
    y_pred = best_model(x)
    loss = criterion(y_pred, y)
    test_loss += loss.item()
    n_data += x.shape[0]

    Y_pred = np.concatenate([Y_pred, y_pred.detach().cpu().numpy().flatten()])
    Y_true = np.concatenate([Y_true, y.detach().cpu().numpy().flatten()])

Y_pred = np.where(Y_pred >= 0.5, 1, 0)
test_loss /= n_data
print(test_loss, metrics.f1_score(Y_true, Y_pred))

In [None]:
classification_metrics(Y_true, Y_pred)

# Optutnaによるチューニング

In [None]:
# 以下の２つの設定は必要に応じて適宜調整してください。
dateflag = "0316b"  # 解析日を記録するための変数
MODEL_PATH = "./drive/MyDrive/tmd2022-6/"  # データの保存場所を指定するための変数
learning_time_limit = 600  # １つの学習器あたりに許す最大の学習時間（秒）
timeout_optuna = 600  # Optuna による反復計算に許す最大の学習時間（秒）
n_trials_optuna = 2  # Optuna による反復計算の最大回数（普通は100や1000などの数字を入れる）

In [None]:
# Google Colaboratory から Google Drive にマウント

from google.colab import drive

drive.mount("/content/drive")

In [None]:
# もしデータ保存場所がなければ作る

import os

if not os.path.exists(MODEL_PATH):
    os.makedirs(MODEL_PATH)

In [None]:
!pip install optuna

In [None]:
import torch


# 層の深さまで調節できるモデル
class MLP(torch.nn.Module):
    def __init__(self, n_input, n_hidden, n_output, n_layer=3, p_dropout=0.5):
        super(MLP, self).__init__()
        self.l = torch.nn.ModuleList([])
        self.l.append(torch.nn.Linear(n_input, n_hidden))  # 全結合層（第1層）
        for _ in range(int(n_layer) - 2):
            self.l.append(torch.nn.Linear(n_hidden, n_hidden))  # 全結合層（中間層）
        self.l.append(torch.nn.Linear(n_hidden, n_output))  # 全結合層（最終層）
        self.dropout = torch.nn.Dropout(p_dropout)  # ドロップアウト層

    def forward(self, x):
        for i in range(len(self.l) - 1):
            x = torch.relu(self.l[i](x))
            x = self.dropout(x)
        x = self.l[-1](x)
        return x

In [None]:
from sklearn import metrics


# 層の深さまで調節できるモデルをハイパラチューニングする
class BestMLP:
    def __init__(self, training_data, validation_data, measure=metrics.f1_score):
        self.best_score = None
        self.best_model = None
        self.best_train_loss = None
        self.best_valid_loss = None
        self.training_data = training_data
        self.validation_data = validation_data
        self.measure = measure

    def __call__(self, trial):
        n_layer = trial.suggest_int("n_layer", 3, 10)
        n_hidden = trial.suggest_int("n_hidden", 10, 100)
        batch_size = trial.suggest_int("batch_size", 10, 100)
        lr = trial.suggest_float("lr", 1e-4, 1e-1)
        p_dropout = trial.suggest_float("p_dropout", 0.0, 1.0)
        n_epoch = 100

        valid_score_history = []

        model = MLP(
            len(training_data[0][0]), n_hidden, 1, n_layer=n_layer, p_dropout=p_dropout
        ).to(device)
        optimizer = torch.optim.Adam(model.parameters(), lr=lr)
        criterion = torch.nn.MSELoss()
        train_loader = torch.utils.data.DataLoader(
            self.training_data, batch_size=batch_size, shuffle=True
        )
        valid_loader = torch.utils.data.DataLoader(
            self.validation_data, batch_size=batch_size, shuffle=True
        )

        for epoch in range(n_epoch):

            for x, y in train_loader:
                x = torch.autograd.Variable(x).to(device)
                y = torch.autograd.Variable(y).to(device)
                optimizer.zero_grad()
                y_pred = model(x)
                loss = criterion(y_pred, y)
                loss.backward()
                optimizer.step()

            Y_pred = np.array([])
            Y_true = np.array([])
            for x, y in valid_loader:
                x = torch.autograd.Variable(x, requires_grad=False).to(device)
                y = torch.autograd.Variable(y, requires_grad=False).to(device)
                y_pred = model(x)
                loss = criterion(y_pred, y)

                Y_pred = np.concatenate(
                    [Y_pred, y_pred.detach().cpu().numpy().flatten()]
                )
                Y_true = np.concatenate([Y_true, y.detach().cpu().numpy().flatten()])

            Y_pred = np.where(Y_pred >= 0.5, 1, 0)
            score = self.measure(Y_pred, Y_true)
            valid_score_history.append(score)

        return max(valid_score_history)

In [None]:
import optuna

study_name = MODEL_PATH + dateflag
training_data = training_data3
objective = BestMLP(training_data3, validation_data3)

# 学習環境を立ち上げる
study = optuna.create_study(
    study_name=dateflag,
    storage="sqlite:///" + study_name + ".sql",
    load_if_exists=True,
    directions=["maximize"],
)

In [None]:
study.optimize(
    objective,
    n_trials=2,
    show_progress_bar=True,
)

In [None]:
# ベスト結果を出したパラメーターを確認する

best_trials_df = []
for trial in study.best_trials:
    params = trial.params
    params["score"] = trial.values[0]
    best_trials_df.append(params)

best_trials_df = pd.DataFrame(best_trials_df)
best_trials_df

In [None]:
# すべてのトライアルを確認する

trials_df = []
for trial in study.trials:
    if trial.values is not None:
        params = trial.params
        params["score"] = trial.values[0]
        trials_df.append(params)

trials_df = pd.DataFrame(trials_df)
trials_df

In [None]:
# スコアとパラメータの履歴を確認する

for x in trials_df.columns:
    for y in ["score"]:
        if x == y:
            continue
        plt.scatter(trials_df[x], trials_df[y], c=trials_df.index, cmap="Blues")
        plt.colorbar()
        plt.xlabel(x)
        plt.ylabel(y)
        plt.grid()
        plt.show()

# ベストパラメータを用いて再学習

In [None]:
study.best_params

In [None]:
n_hidden = study.best_params["n_hidden"]
lr = study.best_params["lr"]
batch_size = study.best_params["batch_size"]
n_layer = study.best_params["n_layer"]
p_dropout = study.best_params["p_dropout"]
n_epoch = 100

model = MLP(
    len(training_data3[0][0]), n_hidden, 1, n_layer=n_layer, p_dropout=p_dropout
).to(device)

In [None]:
from sklearn import metrics

measure = metrics.f1_score  # 分類問題の評価関数

# 各種履歴
train_loss_history = []
valid_loss_history = []
train_score_history = []
valid_score_history = []

optimizer = torch.optim.Adam(model.parameters(), lr=lr)  # 最適化手法（オプティマイザ）
criterion = torch.nn.MSELoss()  # 誤差関数

# n_epoch回繰り返す
for epoch in range(n_epoch):

    # トレーニング（訓練）
    model.train()
    train_loss = 0
    Y_pred = np.array([])
    Y_true = np.array([])
    for x, y in train_loader:  # バッチを取り出す
        x = torch.autograd.Variable(x).to(device)  # 自動微分可能な変数に変換
        y = torch.autograd.Variable(y).to(device)  # 自動微分可能な変数に変換
        optimizer.zero_grad()  # オプティマイザの勾配を初期化
        y_pred = model(x).to(device)  # 予測
        loss = criterion(y_pred, y)  # 誤差を計算
        loss.backward()  # 誤差逆伝搬
        optimizer.step()  # オプティマイザを更新
        train_loss += loss.item() / x.shape[0]  # そのバッチの誤差を蓄積
        Y_true = np.concatenate([Y_true, y.detach().cpu().numpy().flatten()])  # 実測値を記録
        Y_pred = np.concatenate(
            [Y_pred, y_pred.detach().cpu().numpy().flatten()]
        )  # 予測値を記録

    train_loss_history.append(train_loss)  # トレーニング誤差の履歴を記録

    Y_pred = np.where(Y_pred >= 0.5, 1, 0)  # 分類モデル用に 0 or 1 に変換
    train_score = measure(Y_pred, Y_true)  # トレーニングデータに対する評価関数のスコア
    train_score_history.append(train_score)  # 履歴を記録

    # バリデーション（検証）
    model.eval()
    valid_loss = 0
    Y_pred = np.array([])
    Y_true = np.array([])
    for x, y in valid_loader:
        x = torch.autograd.Variable(x, requires_grad=False).to(
            device
        )  # 自動微分は必要ないが同じ形式の変数を用意する
        y = torch.autograd.Variable(y, requires_grad=False).to(
            device
        )  # 自動微分は必要ないが同じ形式の変数を用意する
        y_pred = model(x)  # 予測
        loss = criterion(y_pred, y)  # 誤差を計算
        valid_loss += loss.item() / x.shape[0]  # そのバッチの誤差を蓄積
        Y_true = np.concatenate([Y_true, y.detach().cpu().numpy().flatten()])  # 実測値を記録
        Y_pred = np.concatenate(
            [Y_pred, y_pred.detach().cpu().numpy().flatten()]
        )  # 予測値を記録

    valid_loss_history.append(valid_loss)  # バリデーション誤差の履歴を記録

    Y_pred = np.where(Y_pred >= 0.5, 1, 0)  # 分類モデル用に 0 or 1 に変換
    valid_score = measure(Y_pred, Y_true)  # バリデーションデータに対する評価関数のスコア
    valid_score_history.append(valid_score)  # 履歴を記録

    if (epoch + 1) % (n_epoch / 10) == 0:
        print(  # 途中経過を表示
            "Epoch={}\tLoss(train)={:.4f}\tLoss(valid)={:.4f}\tScore(train)={:.4f}\tScore(valid)={:.4f} ".format(
                epoch + 1, train_loss, valid_loss, train_score, valid_score
            )
        )

    if min(valid_score_history) == valid_score:
        best_model = model  # ベストモデルを保存

In [None]:
# 学習曲線の描画
learning_curve(
    train_loss_history, valid_loss_history, train_score_history, valid_score_history
)

In [None]:
from sklearn import metrics

test_loss = 0
n_data = 0
Y_pred = np.array([])
Y_true = np.array([])
for x, y in test_loader:
    x = torch.autograd.Variable(x, requires_grad=False).to(device)
    y = torch.autograd.Variable(y, requires_grad=False).to(device)
    y_pred = best_model(x)
    loss = criterion(y_pred, y)
    test_loss += loss.item()
    n_data += x.shape[0]

    Y_pred = np.concatenate([Y_pred, y_pred.detach().cpu().numpy().flatten()])
    Y_true = np.concatenate([Y_true, y.detach().cpu().numpy().flatten()])

Y_pred = np.where(Y_pred >= 0.5, 1, 0)
test_loss /= n_data
print(test_loss, metrics.f1_score(Y_true, Y_pred))

In [None]:
classification_metrics(Y_true, Y_pred)