In [1]:
import numpy as np
from scipy import stats
from scipy.stats import norm
from pymoo.indicators.hv import HV
from pymoo.util.dominator import Dominator
import sobol_seq
import matplotlib.pyplot as plt

In [2]:
def fast_non_dominated_sort(F, **kwargs):
    
    M = Dominator.calc_domination_matrix(F)

    # calculate the dominance matrix
    n = M.shape[0]

    fronts = []

    if n == 0:
        return fronts

    # final rank that will be returned
    n_ranked = 0
    ranked = np.zeros(n, dtype=int)

    # for each individual a list of all individuals that are dominated by this one
    is_dominating = [[] for _ in range(n)]

    # storage for the number of solutions dominated this one
    n_dominated = np.zeros(n)

    current_front = []

    for i in range(n):

        for j in range(i + 1, n):
            rel = M[i, j]
            if rel == 1:
                is_dominating[i].append(j)
                n_dominated[j] += 1
            elif rel == -1:
                is_dominating[j].append(i)
                n_dominated[i] += 1

        if n_dominated[i] == 0:
            current_front.append(i)
            ranked[i] = 1.0
            n_ranked += 1

    # append the first front to the current front
    fronts.append(current_front)

    # while not all solutions are assigned to a pareto front
    while n_ranked < n:

        next_front = []

        # for each individual in the current front
        for i in current_front:

            # all solutions that are dominated by this individuals
            for j in is_dominating[i]:
                n_dominated[j] -= 1
                if n_dominated[j] == 0:
                    next_front.append(j)
                    ranked[j] = 1.0
                    n_ranked += 1

        fronts.append(next_front)
        current_front = next_front

    return fronts

In [3]:
def EHVI_2D(PF,r,mu,sigma):
    n = PF.shape[0]
    S1 = np.array([r[0],-np.inf])
    S1 = S1.reshape(1,-1)
    Send = np.array([-np.inf,r[1]])
    Send = Send.reshape(1,-1)
    index = np.argsort(PF[:,1])
    
    S = PF[index,:]
    
    S = np.concatenate((S1,S,Send),axis = 0)
    
    y1 = S[:,0] 
    y2 = S[:,1]
    
    y1 = y1.reshape(-1,1)
    y2 = y2.reshape(-1,1)
    
    mu = mu.reshape(1,-1)
    sigma = sigma.reshape(1,-1)
        
    sum_total1 = 0;
    sum_total2 = 0;
    
    for i in range(1,n+2):
        t = (y1[i] - mu[0][0])/sigma[0][0]
        if i==n+1:
            sum_total1 = sum_total1
        else:
            sum_total1 = sum_total1 + (y1[i-1] - y1[i])*stats.norm.cdf(t)*psi_cal(y2[i],y2[i],mu[0][1],sigma[0][1])
        sum_total2 = sum_total2 + (psi_cal(y1[i-1],y1[i-1],mu[0][0],sigma[0][0]) \
                                   - psi_cal(y1[i-1],y1[i],mu[0][0],sigma[0][0]))*psi_cal(y2[i],y2[i],mu[0][1],sigma[0][1])
        
    EHVI = sum_total1 + sum_total2
    return EHVI


def psi_cal(a,b,m,s):
    t = (b - m)/s
    return s*stats.norm.pdf(t) + (a - m)*stats.norm.cdf(t)

In [4]:
def sobol_multivariate_normal(mean, cov, n_samples, sobol_samples=None):
    """
    Draw or update Sobol samples for a multi-variate normal distribution.
    
    Parameters:
    - mean: Mean vector of the multi-variate normal distribution.
    - cov: Covariance matrix of the multi-variate normal distribution.
    - n_samples: Number of Sobol samples to generate.
    - sobol_samples: (Optional) Existing Sobol samples to transform.
    
    Returns:
    - samples: Sobol samples transformed to follow the multi-variate normal distribution.
    """
    dim = len(mean)
    
    if sobol_samples is None:
        # Generate Sobol samples in the unit hypercube
        sobol_samples = sobol_seq.i4_sobol_generate(dim, n_samples)
    
    # Transform Sobol samples to follow the standard normal distribution using the inverse CDF (quantile function)
    norm_samples = norm.ppf(sobol_samples)
    
    # Use the Cholesky decomposition to transform standard normal samples to the target multi-variate normal distribution
    L = np.linalg.cholesky(cov)
    samples = norm_samples @ L.T + mean
    
    return samples, sobol_samples

def EHVI_monte_carlo(PF,r,mu,sigma,HV_PF):
    mu = mu.reshape(-1)
    sigma = sigma.reshape(-1)
    Sigma = np.diag(sigma**2)
#     samples = np.random.multivariate_normal(mean = mu,cov = Sigma,size=50000)
    samples,ss_samples = sobol_multivariate_normal(mu, Sigma, 1000)
    
#     plt.scatter(samples[:,0],samples[:,1])
#     plt.show()
#     print(samples)
    n_samples = samples.shape[0]
    HV_cont = np.zeros(n_samples)
    for i in range(0,n_samples):
        #print(i)
        s = samples[i,:]
        s = s.reshape(1,-1)
        PF_temp = np.concatenate((PF,s),axis=0)
        #print(PF_temp)
        front_index = fast_non_dominated_sort(PF_temp)
        #print(front_index[0])
        front_index = np.asarray(front_index[0])
        #print(front_index)
        PF_i = PF_temp[front_index,:]
        PF_i = PF_i.reshape(-1,PF.shape[1])
        
        ind = HV(ref_point=r)
#         HV_PF = ind(pareto_front)
#         HV_cont[i] = calculate_hv(PF_i,r) - HV_PF
        HV_cont[i] = ind(PF_i) - HV_PF

    return np.mean(HV_cont)




In [5]:
# pareto_front = np.array([[0.2, 0.4], [0.5, 0.3], [0.7, 0.2]])
# ref_point = np.array([2.0, 1.0])

pareto_front = np.array([[2, 3], [4, 1]])
ref_point = np.array([5, 5])
# HV_PF = calculate_hv(pareto_front,ref_point)

ind = HV(ref_point=ref_point)
HV_PF = ind(pareto_front)
# print(HV_PF)

# mu  = np.array([5,0.1])
mu = np.array([3,2])
std_vector = np.array([2,0.01])
EHVI_analytical = EHVI_2D(pareto_front,ref_point,mu,std_vector)
EHVI_approximation = EHVI_monte_carlo(pareto_front,ref_point,mu,std_vector,HV_PF)

# print(EHVI_analytical,EHVI_GPT,EHVI_approximation)
print(EHVI_analytical,EHVI_approximation)

[2.18677934] 2.1725615495592985
