# Imports

In [None]:
%matplotlib inline
from matplotlib import colors
import matplotlib.pyplot as plt
import numpy as np
import time
from itertools import product
from multifidelityfunctions import oneDimensional as OD
from multifidelityfunctions import MultiFidelityFunction
from multiLevelCoSurrogates import CandidateArchive, Surrogate, HierarchicalSurrogate, MultiFidelityBO, create_random_sample_set
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.ensemble import RandomForestRegressor

Print settings/helpers

In [None]:
# Output helpers used throughout the notebook: `clear_output` wipes a cell's
# output area (progress counters, EGO animation), `pprint` pretty-prints lists.
# `clear_output` is imported from the public `IPython.display` module; importing
# it from `IPython.core.display` has been deprecated since IPython 7.14.
from IPython.display import clear_output
from pprint import pprint

# Allow up to 200 characters per line before NumPy wraps printed arrays.
np.set_printoptions(linewidth=200)

# Recreating the example plot in [Forrester2007 (Multi-fidelity optimization via surrogate modelling)](https://royalsocietypublishing.org/doi/full/10.1098/rspa.2007.1900)

## Step by step construction

The function in question:

In [None]:
# Dense evaluation grid on [0, 1], shaped as a column vector (one sample per row).
# `plot_x`, `plot_high` and `plot_low` are reused by the plotting cells below.
plot_x = np.linspace(0, 1, 101).reshape(-1, 1)
plot_high, plot_low = OD.high(plot_x), OD.low(plot_x)

# Draw both fidelity levels of the function in a single figure.
for curve, name in ((plot_high, 'high'), (plot_low, 'low')):
    plt.plot(plot_x, curve, label=name)
plt.legend(loc='upper right')
plt.show()

Showing the datapoints selected by the paper.

In [None]:
# Sample locations as column vectors: four high-fidelity points and eleven
# evenly spaced low-fidelity points on [0, 1].
high_x = np.array([0, .4, .6, 1]).reshape(-1, 1)
low_x = np.linspace(0, 1, 11).reshape(-1, 1)
high_y, low_y = OD.high(high_x), OD.low(low_x)

# Plot each true curve and overlay its samples in the colour of that curve.
for curve, name, sample_x, sample_y in (
    (plot_high, 'high', high_x, high_y),
    (plot_low, 'low', low_x, low_y),
):
    line, = plt.plot(plot_x, curve, label=name)
    plt.scatter(sample_x, sample_y, color=line.get_color())
plt.legend(loc='upper right')
plt.show()

Training a separate Gaussian Process model for each fidelity, using only that fidelity's own data. The low-fidelity model is a good fit; the high-fidelity model is not.

In [None]:
# One default-configured GP per fidelity, each trained only on its own samples.
# `fit` returns the estimator itself, so construction and training can be chained.
gp_direct = GaussianProcessRegressor().fit(high_x, high_y)
gp_low = GaussianProcessRegressor().fit(low_x, low_y)

# Echo the fitted low-fidelity model as the cell output.
gp_low

In [None]:
# True curves with their training samples, coloured per fidelity.
for curve, name, sample_x, sample_y in (
    (plot_high, 'high', high_x, high_y),
    (plot_low, 'low', low_x, low_y),
):
    line, = plt.plot(plot_x, curve, label=name)
    plt.scatter(sample_x, sample_y, color=line.get_color())

# Predictions of the two independently trained GPs on the dense grid.
for model, name in ((gp_direct, 'high-fit GP'), (gp_low, 'low-fit GP')):
    plt.plot(plot_x, model.predict(plot_x), label=name)

plt.legend(loc='upper right')
plt.show()

The co-kriging formulation is $\hat{f}_h(x) = \rho \cdot f_l(x) + \delta(x)$, where: <br>
$\hat{f}_h(x)$ is the high-fidelity prediction at $x$<br>
$\rho$ is a scaling factor<br>
$f_l(x)$ is the low-fidelity information (either an actual evaluation or the prediction of another model) at $x$<br>
$\delta(x)$ is a prediction for the difference between $f_h(x)$ and $\rho \cdot f_l(x)$<br>

$\rho$ is calculated as $\rho = 1 \Big/ \left(\frac{1}{n}\sum_{i=1}^{n} \frac{f_h(x_i)}{f_l(x_i)}\right)$, i.e. `1/mean(f_high(x_high) / f_low(x_high))`, with `x_high` being all inputs for which we have high-fidelity outcomes.

Here we start by plotting just the parts of this equation.<br>
In this example, there is an explicit scaling factor of __2__ between high and low fidelity that is seen to be easily captured by the difference model $\delta(x)$/`gp_diff`

In [None]:
# Scaling factor rho = 1 / mean(f_high(x_high) / f_low(x_high)), evaluated at the
# high-fidelity sample locations.
low_at_high = np.array(OD.low(high_x)).reshape(-1, 1)
# Flatten both operands so the ratio is taken element-wise. Dividing a 1-D
# `high_y` by the (n, 1) column would silently broadcast to an (n, n) matrix and
# skew the mean. NOTE(review): the output shape of OD.high/OD.low is not visible
# here; flattening gives the intended result for both (n,) and (n, 1).
scale = 1 / np.mean(np.ravel(high_y) / np.ravel(low_at_high))

# Training data for the difference model delta(x) = f_high(x) - rho * f_low(x),
# evaluated one high-fidelity location at a time.
diff_x = high_x
diff_y = np.array([(OD.high(x) - scale*OD.low(x)) for x in diff_x])

gp_diff = GaussianProcessRegressor()
gp_diff.fit(diff_x, diff_y)

# True curves with their training samples.
line, = plt.plot(plot_x, plot_high, label='high')
plt.scatter(high_x, high_y, color=line.get_color())
line, = plt.plot(plot_x, plot_low, label='low')
plt.scatter(low_x, low_y, color=line.get_color())

# Single-fidelity GP fits from the earlier cell.
plt.plot(plot_x, gp_direct.predict(plot_x), label='high-fit GP')
plt.plot(plot_x, gp_low.predict(plot_x), label='low-fit GP')

# Unscaled difference of the true curves next to the GP fitted on the scaled difference.
plt.plot(plot_x, plot_high - plot_low, label='diff')
plt.plot(plot_x, gp_diff.predict(plot_x), label='scaled diff-fit GP')

plt.legend(loc=1)
plt.show()

And now with the actual co-kriging prediction plotted.

In [None]:
def co_y(x):
    """Co-kriging prediction: scale * low-fidelity GP + difference GP.

    Both model outputs are flattened before they are combined, so the sum stays
    element-wise even if one model returns shape (n,) and the other (n, 1).

    :param x: sample locations, one row per sample (as passed to `predict`)
    :return: 1-D array with one prediction per row of `x`
    """
    return scale * np.ravel(gp_low.predict(x)) + np.ravel(gp_diff.predict(x))


# True curves with their training samples.
line, = plt.plot(plot_x, plot_high, label='high')
plt.scatter(high_x, high_y, color=line.get_color())
line, = plt.plot(plot_x, plot_low, label='low')
plt.scatter(low_x, low_y, color=line.get_color())

# Single-fidelity GP fits for comparison.
plt.plot(plot_x, gp_direct.predict(plot_x), label='high-fit GP')
plt.plot(plot_x, gp_low.predict(plot_x), label='low-fit GP')

# Combined co-kriging prediction.
plt.plot(plot_x, co_y(plot_x), label='co-kriging')

plt.legend(loc=1)
plt.show()

## Direct construction with (Hierarchical)Surrogate

Recreating the same plot as above using our own (Hierarchical)Surrogate interface.

### Without normalization by Surrogate

In [None]:
# Eleven evenly spaced low-fidelity locations; the high-fidelity locations are
# the subset at indices 0, 4, 6 and 10 (i.e. x = 0, 0.4, 0.6, 1).
low_x = np.linspace(0, 1, 11).reshape((-1, 1))
high_x = low_x[[0, 4, 6, 10]].reshape((-1, 1))

# Archive of evaluated candidates; low fidelity is added first, then high.
archive = CandidateArchive(ndim=1, fidelities=['high', 'low', 'high-low'])
for fid_name, fid_x, fid_func in (('low', low_x, OD.low), ('high', high_x, OD.high)):
    archive.addcandidates(fid_x, fid_func(fid_x), fidelity=fid_name)

# Kriging surrogates with the Surrogate-level normalization switched off.
surrogate_options = dict(normalized=False)
surr_high = Surrogate.fromname('Kriging', archive, fidelity='high', **surrogate_options)
surr_low = Surrogate.fromname('Kriging', archive, fidelity='low', **surrogate_options)
surr_hier = HierarchicalSurrogate('Kriging', surr_low, archive, ['high', 'low'], **surrogate_options)

for surrogate in (surr_high, surr_low, surr_hier):
    surrogate.train()

# Plotting: true functions followed by the three surrogate predictions.
x = np.linspace(0, 1, 101).reshape(-1, 1)
for evaluate, name in (
    (OD.high, 'high'),
    (OD.low, 'low'),
    (surr_high.predict, 'high-fit GP'),
    (surr_low.predict, 'low-fit GP'),
    (surr_hier.predict, 'co-kriging'),
):
    plt.plot(x, evaluate(x), label=name)
plt.legend(loc='best')
plt.tight_layout()
plt.show()

### With normalization by Surrogate

Just to show that the normalization is correctly implemented.<br>
Because of the values in this example, it's not really needed, but if the results at least don't get worse in this case, it's probably correct.

In [None]:
# Same sample layout as in the unnormalized case: eleven low-fidelity points,
# of which those at indices 0, 4, 6 and 10 are also evaluated in high fidelity.
low_x = np.linspace(0, 1, 11).reshape((-1, 1))
high_x = low_x[[0, 4, 6, 10]].reshape((-1, 1))

# Fresh archive; low fidelity is added first, then high.
archive = CandidateArchive(ndim=1, fidelities=['high', 'low', 'high-low'])
for fid_name, fid_x, fid_func in (('low', low_x, OD.low), ('high', high_x, OD.high)):
    archive.addcandidates(fid_x, fid_func(fid_x), fidelity=fid_name)

# Kriging surrogates, this time with the Surrogate-level normalization enabled.
surrogate_options = dict(normalized=True)
surr_high = Surrogate.fromname('Kriging', archive, fidelity='high', **surrogate_options)
surr_low = Surrogate.fromname('Kriging', archive, fidelity='low', **surrogate_options)
surr_hier = HierarchicalSurrogate('Kriging', surr_low, archive, ['high', 'low'], **surrogate_options)

for surrogate in (surr_high, surr_low, surr_hier):
    surrogate.train()

# Plotting: true functions followed by the three surrogate predictions.
x = np.linspace(0, 1, 101).reshape(-1, 1)
curves = (
    (OD.high, 'high'),
    (OD.low, 'low'),
    (surr_high.predict, 'high-fit GP'),
    (surr_low.predict, 'low-fit GP'),
    (surr_hier.predict, 'co-kriging'),
)
for evaluate, name in curves:
    plt.plot(x, evaluate(x), label=name)
plt.legend(loc='best')
plt.tight_layout()
plt.show()

## Direct construction with MultiFidelityBO

Recreating the same plot again with the MultiFidelityBO (Bayesian Optimization) interface.<br>
This interface automatically creates a full set of hierarchical models for any number of fidelities.

In [None]:
# Sample layout: eleven low-fidelity points, four of them (indices 0, 4, 6, 10)
# also evaluated in high fidelity.
low_x = np.linspace(0, 1, 11).reshape((-1, 1))
high_x = low_x[[0, 4, 6, 10]].reshape((-1, 1))

# Fresh archive; low fidelity is added first, then high.
archive = CandidateArchive(ndim=1, fidelities=['high', 'low', 'high-low'])
for fid_name, fid_x, fid_func in (('low', low_x, OD.low), ('high', high_x, OD.high)):
    archive.addcandidates(fid_x, fid_func(fid_x), fidelity=fid_name)

# The BO object exposes its models through `direct_models` and `models`.
mfbo = MultiFidelityBO(OD, archive, output_range=(-10, 16))

# Plotting: true functions followed by the predictions of the models held by `mfbo`.
x = np.linspace(0, 1, 101).reshape(-1, 1)
plt.plot(x, OD.high(x), label='high')
plt.plot(x, OD.low(x), label='low')
for model, name in (
    (mfbo.direct_models['high'], 'high-fit GP'),
    (mfbo.models['low'], 'low-fit GP'),
    (mfbo.models['high'], 'co-kriging'),
):
    plt.plot(x, model.predict(x), label=name)
plt.legend(loc='best')
plt.tight_layout()
plt.show()

## Trade-off heatmap: number of high- vs. low-fidelity points

### Random Sample generation

In [None]:
# Experiment size: upper bounds on the sample counts and repetitions per setting.
max_high, max_low, num_reps = 40, 100, 30

# MSE results indexed by (num_high, num_low, repetition, model);
# entries of combinations that are skipped below keep their initial 0.
mse_tracking = np.zeros((max_high + 1, max_low + 1, num_reps, 3))

cases = list(product(range(2, max_high + 1), range(3, max_low + 1), range(num_reps)))

for idx, (num_high, num_low, rep) in enumerate(cases):
    # Only combinations with fewer high- than low-fidelity samples are evaluated.
    if num_low <= num_high:
        continue

    # Refresh the progress counter on every 100th case that is not skipped.
    if not idx % 100:
        clear_output()
        print(f'{idx}/{len(cases)}')

    # Uniformly random low-fidelity locations; the high-fidelity locations are a
    # random subset of them, drawn without replacement.
    low_sample = np.random.rand(num_low)
    high_x = np.random.choice(low_sample, num_high, replace=False).reshape((-1, 1))
    low_x = low_sample.reshape((-1, 1))

    archive = CandidateArchive(ndim=1, fidelities=['high', 'low', 'high-low'])
    for fid_name, fid_x, fid_func in (('low', low_x, OD.low), ('high', high_x, OD.high)):
        archive.addcandidates(fid_x, fid_func(fid_x), fidelity=fid_name)

    mfbo = MultiFidelityBO(OD, archive, output_range=(-10, 16))
    mse_tracking[num_high, num_low, rep] = mfbo.getMSE()

clear_output()
print(f'{len(cases)}/{len(cases)}')

In [None]:
# Mask every entry that is exactly 0 (including the never-filled slots) with NaN,
# then take the median over the repetition axis.
filtered_mse_tracking = np.where(mse_tracking == 0, np.nan, mse_tracking)
plot_data = np.median(filtered_mse_tracking, axis=2)

In [None]:
# Upper-tail percentiles (95th to 100th) of the per-configuration median MSE.
# Combinations that were skipped still hold the initial 0 in `mse_tracking`.
# They are masked to NaN and ignored by `nanpercentile`, so the placeholders
# do not dilute the statistics.
median_mse = np.median(np.where(mse_tracking == 0, np.nan, mse_tracking), axis=2)
percentile_ranks = [95 + i for i in range(6)]

print('median')
pprint([
    (f'{rank}%-ile', value)
    for rank, value in zip(percentile_ranks, np.nanpercentile(median_mse, percentile_ranks))
])

In [None]:
# One heatmap of the median MSE per model, all sharing a logarithmic colour scale.
# The three panels are created up front with `plt.subplots`, so the colorbar is
# attached to the axes that actually hold the images. Mixing `plt.subplots()`
# with `plt.subplot(31x)` leaves a stray empty axes behind the panels.
norm = colors.LogNorm(vmin=.5, vmax=100, clip=True)
panel_titles = ['high (hierarchical)', 'high (direct)', 'low (direct)']

# constrained layout is used instead of tight_layout, which cannot handle a
# colorbar that spans several axes.
fig, axes = plt.subplots(nrows=3, ncols=1, figsize=(7.5, 9), sharey=True,
                         constrained_layout=True)
for model_idx, (ax, panel_title) in enumerate(zip(axes, panel_titles)):
    # Rows are indexed by the number of high-fidelity samples, columns by low.
    img = ax.imshow(plot_data[:, :, model_idx], cmap='viridis_r', norm=norm)
    ax.set_title(panel_title)
    ax.set_ylabel('#High-fid samples')
axes[-1].set_xlabel('#Low-fid samples')

# All images share `norm`, so a single colorbar describes every panel.
fig.colorbar(img, ax=axes, orientation='vertical')
plt.show()

### Linspace, random subsample generation

In [None]:
# Experiment size: upper bounds on the sample counts and repetitions per setting.
max_high, max_low, num_reps = 40, 100, 30

# MSE results indexed by (num_high, num_low, repetition, model);
# entries of combinations that are skipped below keep their initial 0.
lin_mse_tracking = np.zeros((max_high + 1, max_low + 1, num_reps, 3))

cases = list(product(range(2, max_high + 1), range(3, max_low + 1), range(num_reps)))

for idx, (num_high, num_low, rep) in enumerate(cases):
    # Only combinations with fewer high- than low-fidelity samples are evaluated.
    if num_low <= num_high:
        continue

    # Refresh the progress counter on every 100th case that is not skipped.
    if not idx % 100:
        clear_output()
        print(f'{idx}/{len(cases)}')

    # Evenly spaced low-fidelity locations on [0, 1] (both ends included); the
    # high-fidelity locations are a random subset, drawn without replacement.
    low_grid = np.linspace(0, 1, num_low)
    high_x = np.random.choice(low_grid, num_high, replace=False).reshape((-1, 1))
    low_x = low_grid.reshape((-1, 1))

    archive = CandidateArchive(ndim=1, fidelities=['high', 'low', 'high-low'])
    for fid_name, fid_x, fid_func in (('low', low_x, OD.low), ('high', high_x, OD.high)):
        archive.addcandidates(fid_x, fid_func(fid_x), fidelity=fid_name)

    mfbo = MultiFidelityBO(OD, archive, output_range=(-10, 16))
    lin_mse_tracking[num_high, num_low, rep] = mfbo.getMSE()

clear_output()
print(f'{len(cases)}/{len(cases)}')

In [None]:
# Mask every entry that is exactly 0 (including the never-filled slots) with NaN,
# then take the median over the repetition axis.
filtered_lin = np.where(lin_mse_tracking == 0, np.nan, lin_mse_tracking)
lin_plot_data = np.median(filtered_lin, axis=2)

In [None]:
# Upper-tail percentiles (95th to 100th) of the per-configuration median MSE.
# Combinations that were skipped still hold the initial 0 in `lin_mse_tracking`.
# They are masked to NaN and ignored by `nanpercentile`, so the placeholders
# do not dilute the statistics.
lin_median_mse = np.median(np.where(lin_mse_tracking == 0, np.nan, lin_mse_tracking), axis=2)
percentile_ranks = [95 + i for i in range(6)]

print('median')
pprint([
    (f'{rank}%-ile', value)
    for rank, value in zip(percentile_ranks, np.nanpercentile(lin_median_mse, percentile_ranks))
])

In [None]:
# One heatmap of the median MSE per model, all sharing a logarithmic colour scale.
# The three panels are created up front with `plt.subplots`, so the colorbar is
# attached to the axes that actually hold the images. Mixing `plt.subplots()`
# with `plt.subplot(31x)` leaves a stray empty axes behind the panels.
norm = colors.LogNorm(vmin=.5, vmax=100, clip=True)
panel_titles = ['high (hierarchical)', 'high (direct)', 'low (direct)']

# constrained layout is used instead of tight_layout, which cannot handle a
# colorbar that spans several axes.
fig, axes = plt.subplots(nrows=3, ncols=1, figsize=(7.5, 9), sharey=True,
                         constrained_layout=True)
for model_idx, (ax, panel_title) in enumerate(zip(axes, panel_titles)):
    # Rows are indexed by the number of high-fidelity samples, columns by low.
    img = ax.imshow(lin_plot_data[:, :, model_idx], cmap='viridis_r', norm=norm)
    ax.set_title(panel_title)
    ax.set_ylabel('#High-fid samples')
axes[-1].set_xlabel('#Low-fid samples')

# All images share `norm`, so a single colorbar describes every panel.
fig.colorbar(img, ax=axes, orientation='vertical')
plt.show()

# EGO - 1D function

First, we create an inverted (negated) version of the function, as BO is currently hardcoded for maximization problems.

In [None]:
# Sign-flipped copies of both fidelity levels, in the order high, low.
negated_fidelities = [lambda x: -OD.high(x), lambda x: -OD.low(x)]

# Same bounds as the original function, wrapped in fresh arrays.
inv_OD = MultiFidelityFunction(
    l_bound=np.array(OD.l_bound),
    u_bound=np.array(OD.u_bound),
    functions=negated_fidelities,
    fidelity_names=['high', 'low'],
)

In [None]:
# Initial design: six evenly spaced low-fidelity points, of which the two
# middle ones (indices 2 and 3) are also evaluated in high fidelity.
low_x = np.linspace(0,1,6).reshape((-1,1))
high_x = low_x[[2,3]].reshape((-1,1))

archive = CandidateArchive(ndim=1, fidelities=['high', 'low', 'high-low'])
archive.addcandidates(low_x, inv_OD.low(low_x), fidelity='low')
archive.addcandidates(high_x, inv_OD.high(high_x), fidelity='high')

mfbo = MultiFidelityBO(inv_OD, archive, output_range=(-16, 10), schema=[1,1])

# Dense grid on which the true functions and the models are drawn.
x = np.linspace(start=0,stop=1,num=101).reshape(-1,1)


def plot_ego_state(title, high_points, low_points):
    """Plot the true functions, the evaluated samples and the current model fits.

    Reads the notebook-level `x`, `inv_OD` and `mfbo`, so every call shows the
    state of the optimizer at the moment it is made.

    :param title:       Figure title; used here for the iteration counter
    :param high_points: Positional arguments for `plt.scatter` describing the
                        high-fidelity samples
    :param low_points:  Positional arguments for `plt.scatter` describing the
                        low-fidelity samples
    """
    true_high, = plt.plot(x, inv_OD.high(x), label='high')
    plt.scatter(*high_points, color=true_high.get_color())
    true_low, = plt.plot(x, inv_OD.low(x), label='low')
    plt.scatter(*low_points, color=true_low.get_color())

    plt.plot(x, mfbo.direct_models['high'].predict(x), label='high-fit GP')
    plt.plot(x, mfbo.models['low'].predict(x), label='low-fit GP')
    plt.plot(x, mfbo.models['high'].predict(x), label='co-kriging')

    plt.title(title)
    # Fixed y-range (same as `output_range`) keeps consecutive frames comparable.
    plt.ylim([-16, 10])
    plt.legend(loc=0)
    plt.tight_layout()
    plt.show()


# Plotting the state before any optimization step.
plt.ion()
plot_ego_state('0', (high_x, inv_OD.high(high_x)), (low_x, inv_OD.low(low_x)))

for idx in range(15):
    mfbo.iteration(idx)
    # wait=True postpones clearing until the next figure is ready, which avoids
    # a blank, flickering output area between frames.
    clear_output(wait=True)

    plot_ego_state(
        f'{idx+1}',
        archive.getcandidates(fidelity='high'),
        archive.getcandidates(fidelity='low'),
    )
    # Pause so each frame stays visible for a moment.
    time.sleep(2)