
必須ではありませんが、アニメーション作成にpillowを用います。
Anacondaではインストールされているはずですが、
condaを用いたインストール方法についてはこちらをご参照ください。
https://anaconda.org/anaconda/pillow


In [None]:
from sklearn.preprocessing import StandardScaler, MinMaxScaler
import warnings
import pandas as pd
import random
from sklearn.gaussian_process.kernels import RBF
from sklearn.gaussian_process import GaussianProcessRegressor, kernels
import numpy as np
import os
import matplotlib.pyplot as plt
from copy import deepcopy
import seaborn as sns
import IPython

# warning表示を消しておく
warnings.filterwarnings('ignore')

In [None]:
DATA_NAME = "Carbon8" # "Carbon"は炭素結晶構造を用いる。下のget_reordered_data()で定義した他のデータも取得できる。
ACQ_FUNC = "UCB" # UCB or TS

OPTIMIZE = True  # RBFカーネルのガウス過程回帰でカーネルのoptmizeを行うか。
LENGTH_SCALE = 2 # RBFカーネルの長さスケール初期値

PLOTIT = True # 結果の可視化を行う。
MAXITERATION = 50 # ベイズ最適化最大繰り返し回数
NSELECT_INITIAL = 8 # 初期訓練データ選択用
RAND_SEED_INITIAL = 0 # 初期訓練データ選択用乱数SEED

# 獲得関数のパラメタの違いを吸収するための変数
# そしてイメージファイル名のパラメタも入れておく。
METADATA = {"outputdir": "image_executed", "prefix": "BayesianOpt_materilals", 
              "dataname": DATA_NAME, "acq": ACQ_FUNC, "xlabel": "index",
              "rand_seed_ts": 0, "v": 0.3, "xi": 0.1}
# 獲得関数のパラメタ
# v = 0.3 # for UCB
# xi = 0.1 # for EI and PI
# rand_seed_ts = 0　 # TSのための乱数SEED

私のPython環境でcarbonに対して予め実行した結果を書いておきます。
ITERはindex=0が見つかるまでの回数を示します。各自試してみてください。

| ACQ | NSELCT_INITIAL |RAND_SEED_INITIAL | ITER |
|---|---|---|---|
| UCB | 5 | 0 | 8 |
| UCB | 5 | 1 | 11 |
| UCB | 5 | 2 | 5 |
| UCB | 8 | 0 | 21 |
| UCB | 8 | 1 | 13 |
| UCB | 8 | 2 | 18 |
| UCB | 10 | 0 | 25 |
| UCB | 10 | 1 | 6 |
| UCB | 10 |2 | 18 |
| TS | 5 | 0 | 5 |
| TS | 5 | 1 | 12 |
| TS | 5 | 1 | 10 |
| TS | 8 | 0 | 20 |
| TS | 8 | 1 |  23 |
| TS | 8 | 2 | 14  |
| TS | 10 | 0 | 24 |
| TS | 10| 1 | 20 |
| TS | 10 | 2 | 29 |

In [None]:
def get_reordered_data(sample):
    """データ取得.

    sample は"Carbon", "ReCo", "ZB_WZ_all", "ZB_WZ_3"を取れる．

    Args:
        sample (str): データ名

    Returns:
        tuples containing

        - pd.DataFrame: データ
        - list[str]: 説明変数名リスト
        - str: 目的変数名
    """
    import get_data
    
    if sample == "Carbon8":
        df, descriptor_names, target_name = get_data.load('Carbon8_minusE')
        df = df.sort_values(
            by=target_name, ascending=False).reset_index(drop=True)
    elif sample == "ReCo":
        df, descriptor_names, target_name = get_data.load('Carbon8_minusE')
        mata_labels = ['name', 'polytyp', 'ref',
                       'author', 'link', 'comment', 'polytyp2']
        df = df.sort_values(
            by=target_name, ascending=False).reset_index(drop=True)
    elif sample == "ZB_WZ_all":
        df, descriptor_names, target_name = get_data.load('ZB_WZ_all')
        df[target_name] = - df[target_name]  # 最大値は離れているので最小値にする．
        df = df.sort_values(
            by=target_name, ascending=False).reset_index(drop=True)
    elif sample == "ZB_WZ_3":
        df, descriptor_names, target_name = get_data.load('ZB_WZ_3')
        df[target_name] = - df[target_name]  # 最大値は離れているので最小値にする．
        df = df.sort_values(
            by=target_name, ascending=False).reset_index(drop=True)
    else:
        raise RuntimeError(f'unknonw data_name={sample}')
    print("df.shape",df.shape)
    return df, descriptor_names, target_name


g_df, g_descriptor_names, g_target_name = get_reordered_data(DATA_NAME)


In [None]:
if DATA_NAME == "Carbon8":
    
    display(g_df[["key", "minus_energy", "polytype"]][:10])
    # 炭素系の場合は一部データにpolytypeが入っている。
    # 論文の値を使えないので計算し直している。
    # 計算精度が悪いので、graphiteが最もエネルギーが低くなっていない。

In [None]:
def evaluate_break_condition(train):
    """evaluate the conditon to break the loop

    Args:
        train (list[int]): training index

    Returns:
        bool: True if train contains 0 or 1
    """
    # io : list of actions
    if 0 in train:
        return True
    return False


In [None]:
if False:
    # pairplotの表示
    g_columns = deepcopy(g_descriptor_names)
    g_columns.append(g_target_name)
    sns.pairplot(g_df[g_columns])


In [None]:
def get_Xy(df, descriptor_names, target_name):
    """
        説明変数、目的変数の取得と規格化。
    
    """
    Xraw = df.loc[:, descriptor_names].values
    yraw = df.loc[:, target_name].values

    # standardize
    scalerX = StandardScaler()
    X = scalerX.fit_transform(Xraw) # 規格化された説明変数
    scalery = MinMaxScaler()
    y = scalery.fit_transform(yraw.reshape(-1, 1)).reshape(-1) # 可視化の都合で目的変数を[0:1]に直す。
    return X, y, scalerX, scalery

g_X, g_y, g_scalerX, g_scalery = get_Xy(g_df, g_descriptor_names, g_target_name)


In [None]:
# 最大値を探す問題設定としている。

# インデックス vs 目的変数の可視化
# 炭素結晶構造の場合は目的変数がminus energyなのでg_y最大がenergy最小の構造を示す。
if PLOTIT:
    plt.plot(g_y)

In [None]:
if PLOTIT:
    plt.plot(g_X) # インデックス vs Xの可視化


![](image_text/150_34.png)

In [None]:
from BO_misc import plot_GPR # 表示用の補助関数
from scipy.stats import norm

def search_candidate_UCB(it, train, X, y, reg, param, plot=True):
    """search next action in the UCB method

    Args:
        it (int): the number of iteration
        train (np.array): the index of training data
        X (np.array): descriptor
        y (np.array): target values
        reg (regressor): regressor
        param (dict): parameter of UCB
        plot (bool): True if show image

    Returns:
        int: next action
    """
    # GPR training data setの作成
    Xtrain = X[train]
    ytrain = y[train]
    reg.fit(Xtrain, ytrain)
    print("kernel=", reg.kernel_)
    yp_mean, yp_std = reg.predict(X, return_std=True)
    v = param["v"]
    acq = yp_mean + yp_std*np.sqrt(v*it)
    io = np.argsort(acq)[::-1]
    if io[0] in train:
        # train listに無い最も小さいindexを探す
        print("search the other best candidate")
        for ia1 in io:
            if ia1 not in train:
                ia = ia1
                break
    else:
        ia = io[0]
    if plot:
        idxall = list(range(y.shape[0]))
        plot_GPR(idxall, y, train, ytrain, yp_mean, yp_std, acq, it, ia=ia, metadata=METADATA)
    return ia


def search_candidate_TS(it, train, X, y, reg, param, plot=True):
    """search next action in the TS method

    Args:
        it (int): the number of iteration
        train (np.array): the index of training data
        X (np.array): descriptor
        y (np.array): target values
        reg (regressor): regressor
        param (dict): TS parameter
        plot (bool): True if showing image

    Returns:
        int: next action
    """
    rand_seed_ts = param["rand_seed_ts"]
    # GPR training data setの作成
    Xtrain = X[train]
    ytrain = y[train]
    reg.fit(Xtrain, ytrain)
    print("kernel=", reg.kernel_)
    y_mean, y_std = reg.predict(X, return_std=True)

    # draw a DATA_NAME from GPR, random_stateを指定しないと毎回random_state＝０になる．
    acq = reg.sample_y(X, random_state=rand_seed_ts+it)
    acq = acq.reshape(-1)
    if len(acq.shape) != 1:
        print("acq.shape", acq.shape)
        raise
    io = np.argsort(acq)[::-1]
    if io[0] in train:
        # train listに無い最も小さいindexを探す
        print(io[0], "in training set, search the other best candidate")
        for ia1 in io:
            if ia1 not in train:
                ia = ia1
                break
    else:
        ia = io[0]
    if plot:
        idxall = list(range(y.shape[0]))
        plot_GPR(idxall, y, train, ytrain, y_mean, y_std, acq, it, ia=ia, metadata=METADATA)
    return ia

def search_candidate_EI(it, train, X, y, reg, param, plot=True):
    """search next action in the EI method

    Args:
        it (int): the number of iteration
        train (np.array): the index of training data
        X (np.array): descriptor
        y (np.array): target values
        reg (regressor): regressor
        param (dict): EI parameter
        plot (bool): True if showing image

    Returns:
        int: next action
    """
    # GPR training data setの作成
    Xtrain = X[train]
    ytrain = y[train]
    reg.fit(Xtrain, ytrain)
    print("kernel=", reg.kernel_)
    y_mean, y_std = reg.predict(X, return_std=True)
    fp = np.max(ytrain)
    #fp = np.max(y_mean)
    xi = param["xi"]
    print("xi", xi)
    z = (y_mean - fp - xi)/y_std
    acq = (y_mean - fp - xi)*norm.cdf(z) + y_std * norm.pdf(z)
    if len(acq.shape) != 1:
        raise
    io = np.argsort(acq)[::-1]
    if io[0] in train:
        # train listに無い最も小さいindexを探す
        print("search the other best candidate")
        for ia1 in io:
            if ia1 not in train:
                ia = ia1
                break
    else:
        ia = io[0]
    if plot:
        idxall = list(range(y.shape[0]))
        plot_GPR(idxall, y, train, ytrain, y_mean, y_std, acq, it, ia=ia, metadata=METADATA)
    return ia


def search_candidate_PI(it, train, X, y, reg, param, plot=True):
    """search next action in the PI method

    Args:
        it (int): the number of iteration
        train (np.array): the index of training data
        X (np.array): descriptor
        y (np.array): target values
        reg (regressor): regressor
        param (dict): EI parameter
        plot (bool): True if showing image

    Returns:
        int: next action
    """
    # GPR training data setの作成
    Xtrain = X[train]
    ytrain = y[train]
    reg.fit(Xtrain, ytrain)
    print("kernel=", reg.kernel_)
    y_mean, y_std = reg.predict(X, return_std=True)
    fp = np.max(ytrain)
    #fp = np.max(y_mean)
    xi = param["xi"]
    z = (y_mean - fp - xi)/y_std

    acq = norm.cdf(z)
    acq = acq.reshape(-1)
    io = np.argsort(acq)[::-1]
    if io[0] in train:
        # train listに無い最も小さいindexを探す
        print("search the other best candidate")
        for ia1 in io:
            if ia1 not in train:
                ia = ia1
                break
    else:
        ia = io[0]
    if plot:
        idxall = list(range(y.shape[0]))
        plot_GPR(idxall, y, train, ytrain, y_mean, y_std, acq, it, ia=ia, metadata=METADATA)
    return ia


In [None]:
def select_initial_traininigset(nall, nselect=10, seed=1):
    """select initial actions

    [0, nselect)のindexは選択しない．

    Args:
        nselect (int, optional): the number of actions to select randomly. Defaults to 10.
        seed_initial_selection (int, optional): random seed. Defaults to 1.

    Returns:
        list: a list of actions
    """
    np.random.seed(seed)
    random.seed(seed)
    train = random.sample(range(nselect, nall), nselect)
    print("initial action", train)
    return train

g_train = select_initial_traininigset(g_X.shape[0], NSELECT_INITIAL, RAND_SEED_INITIAL)
# 初期観測済みデータの選択を行う。

In [None]:
def make_model(optimize, length_scale=0.5):
    """ガウス過程回帰モデルを得る．

    Args:
        optimize (bool): optimizer flag of GaussianProcessRegressor

    Returns:
        GaussianProcessRegressor: ガウス過程回帰モデル
    """
    if optimize:
        kernel = RBF(length_scale=length_scale)
        reg = GaussianProcessRegressor(kernel=kernel)
        # カーネルパラメタの最適化を行う。
    else:
        kernel = RBF(length_scale=length_scale)
        reg = GaussianProcessRegressor(kernel=kernel, optimizer=None)
        # カーネルパラメタの最適化を行わない。
    return reg


g_reg = make_model(optimize=OPTIMIZE, length_scale=LENGTH_SCALE)
# 回帰モデルの定義を行う。

In [None]:
def search_all(X, y, reg, train, acqfunc, aqsparam, maxiteration, plotit):
    """最大maxiteration回ベイズ最適化を行う。

    acqfuncは"UCB", "TS", "PI", "EI"を選択可能．

    Args:
        X (np.ndarray): 全てのX
        y (np.ndarray): 全てのy
        reg (GaussianProcessRegressor): ガウス過程回帰モデル
        train (list[int]): 観測済データ（訓練データ）
        acqfunc (str): 獲得関数名
        aqsparam (dict)): 獲得関数計算時のパラメタ
        maxiteration (int): 最大探索回数
        plotit (bool): 図示するかどうか．

    Raises:
        ValueError: _description_

    Returns:
        np.ndarray: 観測データリスト
    """
    if acqfunc == "UCB":
        search_function = search_candidate_UCB
    elif acqfunc == "TS":
        search_function = search_candidate_TS
    elif acqfunc == "PI":
        search_function = search_candidate_PI
    elif acqfunc == "EI":
        search_function = search_candidate_EI
    else:
        raise ValueError("unknown acqfunc={}".format(acqfunc))
            
    for _it in range(maxiteration):
        print()
        print("iteration=", _it+1)
        print("action=", train)

        _ia = search_function(_it, train, X, y, reg, aqsparam, plot=plotit) 
        #　獲得関数から次のaction=探索点を得る。
        # search_functionでは重複しないように次善点も選択する。
        print("next action=", _ia) # 次のaction=探索点を表示する。

        train = np.hstack([train, _ia]) # 訓練データに追加する。
        if evaluate_break_condition(train):
            print("found both minima, iteration=", _it+1)
            break

    print("\nfinal action", train)
    return train

g_train = search_all(g_X, g_y, g_reg, g_train, ACQ_FUNC,
                     METADATA, MAXITERATION, PLOTIT)
# ベイズ最適化の実行
# カーネルパラメタの表示も行う。

In [None]:
if not PLOTIT:
    # PLOTITでない場合は終了する。
    raise 

In [None]:
# PILがあれば、animation gifを作成、表示する。
try:
    from PIL import Image
    have_PIL = True
except ModuleNotFoundError:
    have_PIL = False
print(have_PIL) # PILライブラリがあるかどうかを表示する。

In [None]:
def show_acq_animation(filename_acqgif, have_PIL):
    if have_PIL:
        return IPython.display.Image(filename_acqgif)
    else:
        return None
    
if have_PIL:
    from BO_misc import make_acq_animation
    filename_acqgif = make_acq_animation(NSELECT_INITIAL, g_train, metadata=METADATA)

show_acq_animation(filename_acqgif, have_PIL)

In [None]:
from BO_misc import show_energysurface
show_energysurface(g_X, g_y, g_df, metadata=METADATA)

In [None]:
from BO_misc import show_2D_actions
show_2D_actions(g_X, g_y, NSELECT_INITIAL, g_train, g_df, DATA_NAME, metadata=METADATA)

In [None]:
def show_gif_pca_gif(train, nselect, metadata, have_PIL):
    if have_PIL:
        from BO_misc import make_pca_gif
        g_filename_pcagif = make_pca_gif(train, nselect, metadata=metadata)
        return IPython.display.Image(g_filename_pcagif)
    else:
        return None
    
show_gif_pca_gif(g_train, NSELECT_INITIAL, METADATA, have_PIL)