In [1]:
!pip install tensorflow
!pip install gpflow==2.9.1

Collecting gpflow==2.9.1
  Downloading gpflow-2.9.1-py3-none-any.whl.metadata (13 kB)
Collecting check-shapes>=1.0.0 (from gpflow==2.9.1)
  Downloading check_shapes-1.1.1-py3-none-any.whl.metadata (2.4 kB)
Collecting deprecated (from gpflow==2.9.1)
  Downloading Deprecated-1.2.14-py2.py3-none-any.whl.metadata (5.4 kB)
Collecting dropstackframe>=0.1.0 (from check-shapes>=1.0.0->gpflow==2.9.1)
  Downloading dropstackframe-0.1.1-py3-none-any.whl.metadata (4.3 kB)
Collecting lark<2.0.0,>=1.1.0 (from check-shapes>=1.0.0->gpflow==2.9.1)
  Downloading lark-1.2.1-py3-none-any.whl.metadata (1.8 kB)
Downloading gpflow-2.9.1-py3-none-any.whl (380 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m380.6/380.6 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading check_shapes-1.1.1-py3-none-any.whl (45 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.8/45.8 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Deprecated-1.2.14-py2.py3-none-any

In [2]:
import numpy as np
import pandas as pd
import gpflow
from scipy.stats import norm
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
import os

In [3]:
import tensorflow as tf

In [4]:
from google.colab import drive
import os

# Mount Google Drive
drive.mount('/content/drive')

# Define the directory in Google Drive where you want to save the results
save_dir = '/content/drive/MyDrive/HGP_1D_Results'
os.makedirs(save_dir, exist_ok=True)

Mounted at /content/drive


In [27]:
# Function to generate noisy data
def generate_noisy_data(x, n_points=5, noise_std_range=(0.0, 3.0)):
    X_noisy = []
    y_noisy = []
    noise_levels = []

    for point in x:
        noise_level = np.random.uniform(noise_std_range[0], noise_std_range[1])
        for _ in range(n_points):
            noisy_y = calculate_y(np.array([[point]])) + np.random.normal(0, noise_level)
            X_noisy.append(point)
            y_noisy.append(noisy_y)
            noise_levels.append(noise_level)

    X_noisy = np.array(X_noisy).reshape(-1, 1)
    y_noisy = np.array(y_noisy).reshape(-1, 1)
    noise_levels = np.array(noise_levels).reshape(-1, 1)

    return X_noisy, y_noisy, noise_levels

In [26]:
# assign GP var
gp_var = 1.5

# Function to generate a grid of points for prediction
def generate_grid(x_limits, num_points=100):
    grid = np.linspace(x_limits[0], x_limits[1], num_points)
    return grid.reshape(-1, 1)

# Function to calculate y based on the Forrester function
def calculate_y(x):
    term1 = (6 * x - 2) ** 2 * np.sin(12 * x - 4) + np.random.normal(0, np.sqrt(gp_var), x.shape)
    return term1

def calculate_y_true(x):
    term2 = (6 * x - 2) ** 2 * np.sin(12 * x - 4)
    return term2

In [25]:
# DEFINE HETEROSKEDASTIC GP MODEL + TRAINING
def train_noise_model(X, y):
    kernel = gpflow.kernels.Matern32(lengthscales=0.1)
    noise_model = gpflow.models.GPR(data=(X, y), kernel=kernel, mean_function=None)
    opt = gpflow.optimizers.Scipy()
    opt.minimize(noise_model.training_loss, noise_model.trainable_variables)
    return noise_model

def train_heteroskedastic_gp_model(X, y, noise_variance):
    kernel = gpflow.kernels.SquaredExponential(lengthscales=0.1)
    heteroskedastic_gp_model = gpflow.models.GPR(data=(X, y), kernel=kernel, mean_function=None)
    # Use the mean noise variance instead of per-data-point variance
    heteroskedastic_gp_model.likelihood.variance.assign(1e-5)
    opt = gpflow.optimizers.Scipy()
    opt.minimize(heteroskedastic_gp_model.training_loss, heteroskedastic_gp_model.trainable_variables)
    return heteroskedastic_gp_model

# Expected Improvement function for minimization
def expected_improvement(y_best, f_mean, f_var):
    variance = np.maximum(f_var, 1e-9)  # Ensure variance is non-negative
    std_dev = np.sqrt(variance)
    delta = y_best - f_mean
    with np.errstate(divide='ignore'):
        Z = delta / std_dev
        ei = delta * norm.cdf(Z) + std_dev * norm.pdf(Z)  # Correctly adjusted for minimization
        ei[std_dev == 0.0] = 0.0
    return ei

In [29]:
# MAIN SCRIPT
x_limits = [0., 1.]

# Define the levels for x
x_manual = np.array([0.0, 0.5, 1.0])

# Initialize an empty DataFrame to store results
results_df = pd.DataFrame(columns=['iteration', 'x1', 'y', 'ei', 'LCB', 'y_best', 'true_var', 'pred_var'])

num_iterations = 3  # Define the number of iterations

# Initialize with initial noisy data
X, y, noise_levels = generate_noisy_data(x_manual, n_points=5)

for iteration in range(num_iterations):
    # Train noise model to estimate the noise variance
    noise_model = train_noise_model(X, y)
    pred_noise_var, _ = noise_model.predict_f(X)
    pred_noise_var = pred_noise_var.numpy()

    # Train heteroskedastic GP model
    heteroskedastic_gp_model = train_heteroskedastic_gp_model(X, y, pred_noise_var)

    # Generate grid for prediction
    X_plot = generate_grid(x_limits, 500)

    # Predict f_mean and f_var for the model
    f_mean, f_var = heteroskedastic_gp_model.predict_f(X_plot)
    f_mean = f_mean.numpy()
    f_var = f_var.numpy()

    # Calculate true y values for comparison
    y_true = calculate_y_true(X_plot)

    # Calculate LCB and UCB for plotting
    LCB = f_mean - 1.96 * np.sqrt(f_var)
    UCB = f_mean + 1.96 * np.sqrt(f_var)

    # y_best = minimum value of y observed so far
    y_best = np.min(y)

    # Calculate EI
    ei = expected_improvement(y_best, f_mean, f_var)

    # Find the point that maximizes EI
    max_ei_index = np.argmax(ei)
    x_max_ei = X_plot[max_ei_index]
    y_max_ei_predicted = f_mean[max_ei_index]

    # Find the point with the lowest LCB
    min_lcb_index = np.argmin(LCB)
    x_min_lcb = X_plot[min_lcb_index]
    y_min_lcb_predicted = f_mean[min_lcb_index]

    # Generate new noisy data points for the new X values (EI and LCB points)
    X_new = np.array([[x_max_ei[0]], [x_min_lcb[0]]])
    X_new_noisy, y_new_noisy, _ = generate_noisy_data(X_new.flatten(), n_points=1)

    # Update the training data with the new noisy points
    X = np.vstack((X, X_new_noisy))
    y = np.vstack((y, y_new_noisy))

    # Store the results in the DataFrame
    new_rows = pd.DataFrame([
        {
            'iteration': iteration,
            'x1': x_max_ei[0],
            'y': y_new_noisy[0, 0],
            'ei': ei[max_ei_index],
            'LCB': None,
            'y_best': y_best,
            'true_var': np.var(y_true - f_mean),
            'pred_var': f_var[max_ei_index]
        },
        {
            'iteration': iteration,
            'x1': x_min_lcb[0],
            'y': y_new_noisy[1, 0],
            'ei': ei[min_lcb_index],
            'LCB': LCB[min_lcb_index],
            'y_best': y_best,
            'true_var': np.var(y_true - f_mean),
            'pred_var': f_var[min_lcb_index]
        }
    ])
    results_df = pd.concat([results_df, new_rows], ignore_index=True)

    # Plotting
    fontsize = 10
    linewidth = 1.5

    # Plot preparation
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(9, 3), dpi=300)
    fig.tight_layout(pad=3.0)

    # Plot f_mean, LCB, UCB, and scatter observed points
    ax1.fill_between(X_plot.flatten(), LCB.flatten(), UCB.flatten(), color='silver', alpha=0.3)
    ax1.plot(X_plot, f_mean, color="green", linestyle='-', linewidth=linewidth, label='Mean')
    ax1.plot(X_plot, LCB, color="red", linestyle='-', linewidth=linewidth, label='LCB')
    ax1.plot(X_plot, UCB, color="blue", linestyle='-', linewidth=linewidth, label='UCB')
    ax1.plot(X_plot, y_true, color="black", linestyle='-', linewidth=linewidth, label='y_true')
    ax1.set_xticks(X.flatten())
    ax1.tick_params(axis='x', labelsize=fontsize)
    ax1.tick_params(axis='y', labelsize=fontsize)
    ax1.set_xlabel("x", fontsize=fontsize)
    ax1.set_ylabel("f", fontsize=fontsize)
    ax1.grid(True, which='major', color='gray', linestyle='--')
    ax1.scatter(X.flatten(), y.flatten(), color="black", s=10, label='Observed Points')
    ax1.xaxis.set_major_formatter(FuncFormatter(lambda x, _: f"{x:.2f}"))

    # Plot EI
    ax2.plot(X_plot, ei, color="red", linewidth=linewidth, label='EI')
    ax2.set_xlabel("x", fontsize=fontsize)
    ax2.set_ylabel("EI", fontsize=fontsize)
    ax2.set_xticks(X.flatten())
    ax2.tick_params(axis='x', labelsize=fontsize)
    ax2.tick_params(axis='y', labelsize=fontsize)
    ax2.set_xlabel("x", fontsize=fontsize)
    ax2.set_ylabel("EI", fontsize=fontsize)
    ax2.grid(True, which='major', color='gray', linestyle='--')
    ax2.xaxis.set_major_formatter(FuncFormatter(lambda x, _: f"{x:.2f}"))

    # Save the plot to a file in Google Drive
    plt.savefig(os.path.join(save_dir, f'HeteroNoise_HGP_Forrester_iteration_{iteration + 1}_1Dplots.png'))
    plt.close()

    print(f"Figure saved for iteration {iteration + 1}")

# After all iterations have completed, save the final results to a CSV file in Google Drive
results_df.to_csv(os.path.join(save_dir, 'HeteroNoise_Final_Forrester_1D_HGP_Results.csv'), index=False)
print("Final results saved to Final_HGP_Results.csv")


  results_df = pd.concat([results_df, new_rows], ignore_index=True)


Figure saved for iteration 1
Figure saved for iteration 2
Figure saved for iteration 3
Final results saved to Final_HGP_Results.csv
