In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import random
import scipy

%matplotlib widget

# Dominance based simulated anealling (DBMOSA): an metaheauristic algorithm for solving multi-objective optimisation problems

DBMOSA is version of the single-objective optimization algorithm; simulated anealling. The algorithm iterates from an initial condition to converge to the set of Pareto Optimal solutions and the Pareto Optimal front in the objective space.

## Optimization functions and algorithm helper functions

functions below and their use:
- objective_1 & objective_2:     objective functions
- constraint:                   decision variable bound constraint
- dominance: check how many variables in an archive dominate a new solution
- dominace_removal: loop through archive and remove any dominated solutions
- sharing_function: helper function to compute m(x) for Kernel diversification
- kernel_diversification: performs a kernel diversification approach in the decision space
- histogram_diversification: performs histogram diversification in the objective space 

In [2]:
# objective functions, constraints and other help functions
def objective_1(x):
    return x**2

def objective_2(x):
    return (x-2)**2

def constraint(x):
    # is x within bounds or not
    if x >= -1e5 and x <= 1e5:
        return True
    else: 
        return False

# this function determines how much candidate solutions in a list dominate a specifc solution
def dominance(A,x,plot=True, compute_fitness=True,f_1_x_new=None,f_2_x_new=None):

    # objective function values for query solution
    if compute_fitness == True: # compute fitness
        f_1_x = objective_1(x)
        f_2_x = objective_2(x)
    else: # use fitness values passed to this function, this is used for diversification strategies
        f_1_x = f_1_x_new
        f_2_x = f_2_x_new

    if plot == True:
        plt.figure(figsize=(5,5))
        plt.plot(f_1_x,f_2_x,'bo',zorder=10)
        plt.xlabel('function 1')
        plt.ylabel('function 2')

    # for each candidate solution in an archive / list, check if they dominate the queried solution
    dominate_count = 0
    for i in A:

        # objective function values for an existing archive solution
        f_1_i = objective_1(i)
        f_2_i = objective_2(i)

        # plot to visualise dominance
        if plot == True:
            plt.plot(f_1_i,f_2_i,'rs',zorder=1)

        if f_1_i < f_1_x and f_2_i < f_2_x:
            dominate_count += 1
            # print(i,f_1_i,f_2_i)
            # print('domintes: ')
            # print(x,f_1_x,f_2_x)

    return dominate_count

# when a new solution is added to the archive we must remove any dominated by this new solution, this function does this.
def dominance_removal(archive,x,plot=True):

    A = archive.copy()

    # objective function values for new solution
    f_1_x = objective_1(x)
    f_2_x = objective_2(x)


    if plot == True:
        plt.figure(figsize=(5,5))
        plt.plot(f_1_x,f_2_x,'bo',label='New solution',zorder=10)
        plt.xlabel('function 1')
        plt.ylabel('function 2')
        plt.legend()
        
    # loop through each archive value
    for i in A:

        # objective function values for an existing archive solution
        f_1_i = objective_1(i)
        f_2_i = objective_2(i)

        # plot to visualise dominance
        if plot == True:
            plt.plot(f_1_i,f_2_i,'rs',zorder=1)

        if f_1_i > f_1_x and f_2_i > f_2_x:
            print(x,f_1_x,f_2_x)
            print('domintes: ')
            print(i,f_1_i,f_2_i)
            print('dropping this candidate solution from archive')

            A.remove(i)

    return A

# functions for kernel diversification

# sharing function
def sharing_function(archive,x,sigma):

    # calculate distance between each point in the archive and current solution x
    distances = []
    for i in archive:
        dist = abs(x - i)
        distances.append(dist)

    # sharing function
    sharing_vector = []
    for dist in distances:
        if dist < sigma:
            sh_i = 1 - dist/sigma
        else:
            sh_i = 0
        
        sharing_vector.append(sh_i)

    return sum(sharing_vector)
    

# this function performs kernel diversification
def kernel_diversification(archive,x,sigma): # sigma is the niche size

    # compute sharing counter
    m_x = sharing_function(archive,x,sigma)

    # compute fitness for solution
    f_x_1 = objective_1(x)
    f_x_2 = objective_2(x)

    # check that m_x is not zero, otherwise we cannot take log
    if m_x > 1:
        # compute new fitness for solution
        f_x_1_new = m_x*f_x_1
        f_x_2_new = m_x*f_x_2

    elif m_x > 0:
        # compute new fitness for solution
        f_x_1_new = f_x_1/m_x
        f_x_2_new = f_x_2/m_x

    else:
        f_x_1_new = f_x_1
        f_x_2_new = f_x_2

    return f_x_1_new,f_x_2_new

def histogram_diversification(x,ax,f_1_quads_safe,f_2_quads_safe,locations_dict):
    # define quadrants
    f_1_quads = f_1_quads_safe.copy()
    f_2_quads = f_2_quads_safe.copy()

    # current solution objective values, append to grid lists
    f_x_1 = objective_1(x)
    f_x_2 = objective_2(x)

    # append f_1_x and f_2_x to list of grid lines, 
    f_1_quads.append(f_x_1)
    f_2_quads.append(f_x_2)
    
    # afterwards sort list
    f_1_quads = sorted(f_1_quads)
    f_2_quads = sorted(f_2_quads)

    # find index location in sorted list of grid lines, this index is the x or y axis grid position
    pos_f_1 = f_1_quads.index(f_x_1)
    pos_f_2 = f_2_quads.index(f_x_2)

    # check that grid spot is not taken, check against record of populated grid spots
    if locations_dict[f'{pos_f_1},{pos_f_2}'] == 0: # spot free and append value to archive
        locations_dict[f'{pos_f_1},{pos_f_2}'] = 1
        add_x = True
    else:
        add_x = False

    return locations_dict,add_x

# function to determine which grid locations are populated by current archive members
def archive_grid_locations(archive,f_1_quads_safe,f_2_quads_safe):

    # define dict to store populated grid spots
    keys = []
    for i in range(0,len(f_1_quads_safe)+1):
        for j in range(0,len(f_1_quads_safe)+1):
            keys.append(f'{i},{j}')

    locations_dict = dict.fromkeys(keys,0)

    # add each archive member to a grid spot
    for x in archive:
        # determine x objective function values
        locations_dict,add_x = histogram_diversification(x,ax,f_1_quads_safe,f_2_quads_safe,locations_dict)

    return locations_dict


# DBMOSA algo          
def DBMOSA(c_max, d_max, Tmin, Tmax,diversification=False,sigma=None,grid_step_size=None):

    # initialise
    iteration = 0
    epoch = 0
    c = 0 # counter for number of neighbours accepted
    d = 0 # counter for number of neighbours rejected
    T = Tmax
    x = 0
    archive = [x]
    stopping_criteria = False
    archive_size = []
    temperature = []

    if diversification == 'histogram':
        # define static histogram grid
        f_1_quads_safe = list(np.arange(0,10,grid_step_size))
        f_2_quads_safe = list(np.arange(0,10,grid_step_size))

        # setup dictonary to keep track of populated grid spots
        keys = []
        for i in range(0,len(f_1_quads_safe)+1):
            for j in range(0,len(f_1_quads_safe)+1):
                keys.append(f'{i},{j}')

        locations_dict = dict.fromkeys(keys,0)

    # loop until max number of epoch reached
    while stopping_criteria == False:
        print('\nIteration: ',iteration,' Epoch: ',epoch ,'Current archive', archive)
        print('Current candidate solution, x:', x, objective_1(x), objective_2(x))

        # step 1: check termination condition
        if  T < Tmin: 
            stopping_criteria = True
            print('\nOptimization complete, here is the archive of Pareto Optimal solutions: ',archive)
            break


        # step 2: check if number of rejections has hit limit: increase temp
        if d == d_max:
            print('Temperature increased!')
            # increase temperature 

            # geometric
            # T = T*1.1

            # linear
            T=T+0.01*Tmax


            # increment epoch counter and single epoch relevant counts
            epoch += 1
            c = 0
            d = 0
            archive_size.append(len(archive))
            temperature.append(T)
            continue # skip rest of loop and restart loop

        # step 3: check if number of acceptances has hit limit: decrease temp
        if c == c_max:
            print('Temperature decreased!')
            # decrease temperature 
            
            # geometric
            # T=T*0.9

            # linear
            T=T-0.01*Tmax

            # increment epoch counter and single epoch relevant counts
            epoch += 1
            c = 0
            d = 0
            archive_size.append(len(archive))
            temperature.append(T)
            continue # skip rest of loop and restart loop

        # step 4: generate stochasic neighbouring solution

        x_prime = round(x*(random.uniform(-1.5,1.5)) + 1,5)

        # check new x is valid
        constraint_passed = constraint(x_prime) # function checks if x_prime is within allowed bounds or not
        while constraint_passed != True:
            x_prime = round(x*(random.uniform(-1.5,1.5)) +1,5)
            constrain_passed = constraint(x_prime)
        
        print('New possible solution, x_prime: ',x_prime, objective_1(x_prime), objective_2(x_prime))

        # step 5: accept or reject neighbour based on metropolis acceptance rule

        # temporay new archive with current and neighbouring solution added
        A_tilda = archive.copy()
        A_tilda.append(x_prime)
        A_tilda.append(x)
        A_tilda = list(set(A_tilda))

        # determine how many canidate solutions dominate x and x_prime
        A_tilda_x = dominance(A_tilda,x,False)
        A_tilda_x_prime = dominance(A_tilda,x_prime,False)

        print('A_tilda: ',A_tilda)
        print('A_tilda_x dominance: ',A_tilda_x)
        print('A_tilda_x_prime dominance: ',A_tilda_x_prime)
        
        # calculate energy
        delta_E = (A_tilda_x_prime - A_tilda_x) / len(A_tilda)

        # calculate acceptance probability
        P_x = min([np.exp(-delta_E/T),1])

        # metropolis acceptance rule , if true we reject this new candidate solution x_prime and jump back to step 2
        if random.uniform(0,1) > P_x:
            iteration += 1
            d += 1
            print('Solution rejected!')
            continue

        # step 6 and 7: set new x and increment acceptance
        x = x_prime
        c += 1

        # step 8: diversification or not 
        if diversification == False: # add x to archive and check if its dominated
            # check for dominance of new solution in archive
            A_tilda_x = dominance(archive,x,False)

        elif diversification == 'kernel': # implement kernel diversification
            # kernel diversification
            f_1_x_new,f_2_x_new = kernel_diversification(archive,x,sigma)

            # check for dominance of new solution in archive
            A_tilda_x = dominance(archive,x,plot=False, compute_fitness=False, f_1_x_new=f_1_x_new,f_2_x_new=f_2_x_new)

        elif diversification == 'histogram': # implement histogram diversification

            # check with histogram grid x falls in
            locations_dict_temp, add_x = histogram_diversification(x,ax,f_1_quads_safe,f_2_quads_safe,locations_dict)

            if add_x == True: # x may be added as the grid is open for it
                # check for dominance of new solution in archive
                A_tilda_x = dominance(archive,x,plot=False)
            else:
                iteration += 1
                continue

        # if nothing dominate x then add x to archive and remove anything dominated by x 
        if A_tilda_x == 0:
            # append new solution to archive
            archive.append(x)

            # drop canidate solutions which are dominated by new solution
            archive = dominance_removal(archive,x,plot=False)

            print('Solution ',x, ' accepted and archived!')

            archive = list(set(archive))

            # if using histogram diversification, after removing dominated solutions recalculate which grids are populated
            if diversification == 'histogram':
                locations_dict = archive_grid_locations(archive,f_1_quads_safe,f_2_quads_safe)

        # step 9: increment iteration counter
        iteration += 1
    return archive,archive_size,temperature,epoch


## Run DMBOSA

The above functions define a test multi-objective optimization problem and an implementation of DBMOSA. Below DMBOSA is called to solve the multi-optimization problem.

In [3]:
# run algo
archive,archive_size,temperature,epoch = DBMOSA(c_max=10, d_max=10, Tmin=1, Tmax=100, diversification=False,sigma=None)


Iteration:  0  Epoch:  0 Current archive [0]
Current candidate solution, x: 0 0 4
New possible solution, x_prime:  1.0 1.0 1.0
A_tilda:  [0, 1.0]
A_tilda_x dominance:  0
A_tilda_x_prime dominance:  0
Solution  1.0  accepted and archived!

Iteration:  1  Epoch:  0 Current archive [0, 1.0]
Current candidate solution, x: 1.0 1.0 1.0
New possible solution, x_prime:  0.15123 0.0228705129 3.4179505129
A_tilda:  [0, 1.0, 0.15123]
A_tilda_x dominance:  0
A_tilda_x_prime dominance:  0
Solution  0.15123  accepted and archived!

Iteration:  2  Epoch:  0 Current archive [0, 1.0, 0.15123]
Current candidate solution, x: 0.15123 0.0228705129 3.4179505129
New possible solution, x_prime:  1.12781 1.2719553960999999 0.7607153961
A_tilda:  [0, 1.0, 0.15123, 1.12781]
A_tilda_x dominance:  0
A_tilda_x_prime dominance:  0
Solution  1.12781  accepted and archived!

Iteration:  3  Epoch:  0 Current archive [0, 1.0, 0.15123, 1.12781]
Current candidate solution, x: 1.12781 1.2719553960999999 0.7607153961
New p

## Visualization of results objective space

In [4]:
fig,ax = plt.subplots(1,3,figsize=(16,5))


# plot true Pareto Front

# linear spread of x values between upper and lower limits, inclusive
x_vals = np.linspace(-1e5,1e5,num=1000000) 

# for each x_val compute objective function values and store results into lists
f_1 = []
f_2 = []
for x in x_vals:
    f_1.append(objective_1(x))
    f_2.append(objective_2(x))

# plot approximate Pareto Front
for i in archive:
        f_1_i = objective_1(i)
        f_2_i = objective_2(i)

        # plot to visualise dominance
        ax[0].plot(f_1_i,f_2_i,'rs')
ax[0].plot(f_1_i,f_2_i,'rs',label='Approximate Pareto Optimal Front')
ax[0].plot(f_1,f_2,'-',label='True Pareto Optimal Front')       
ax[0].set_title(f'Initial starting temperature {100}') #\nproprotional_to_epoch_inverse
ax[0].set_xlabel('function 1')
ax[0].set_ylabel('function 2')
ax[0].set_xlim([0,4])
ax[0].set_ylim([0,4])
ax[0].legend()

ax[1].plot(range(0,epoch),archive_size,'bo-')
ax[1].set_xlabel('Number of epochs')
ax[1].set_ylabel('Archive size')

ax[2].plot(range(0,epoch),temperature,'bs-')
ax[2].set_xlabel('Number of epochs')
ax[2].set_ylabel('Temperature')

plt.tight_layout()


Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

## Visualization in decision space

In [5]:
# plot decision space
from mpl_toolkits import mplot3d

# find objective 1 and 2 values for each x value in the archive
df = pd.DataFrame(columns=['x','f_1_x','f_2_x'])
for x in archive:
    f_1_x = objective_1(x)
    f_2_x = objective_2(x)

    df = df.append({'x':x,'f_1_x':f_1_x,'f_2_x':f_2_x},ignore_index=True)

# plots results using 3d plot
fig = plt.figure()
ax = plt.axes(projection='3d')
scat_plot = ax.scatter(df['f_1_x'], df['f_2_x'],df['x'],  c=df['x'], cmap='viridis', linewidth=0.5)
ax.set_xlabel('f_1')
ax.set_ylabel('f_2')
ax.set_zlabel('x')
cb = plt.colorbar(scat_plot, pad=0.2)
cb.set_label('x values')
plt.tight_layout()
# ax.view_init(0, 0)


Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

# Implementing varitions of DBMOSA to force convergence to fewer solutions, but solutions must be spread evenly across the true Pareto Optimal front.

These variation focus on more exclusive updates to the archive. Pareto Optimal solutions are only added to the archive if they meet the requiremnts set by the new methods;
- kernel method: if solution is too near neighbouring solutions do not include
- histogram method: if solution falls in grid block which is already populated, do not include new solution. Solutions are only included if the fall in an empty grid block.

## Kernel method

In [6]:
# run algo
archive,archive_size,temperature,epoch = DBMOSA(c_max=10, d_max=10, Tmin=1, Tmax=100, diversification='kernel',sigma=0.5)


Iteration:  0  Epoch:  0 Current archive [0]
Current candidate solution, x: 0 0 4
New possible solution, x_prime:  1.0 1.0 1.0
A_tilda:  [0, 1.0]
A_tilda_x dominance:  0
A_tilda_x_prime dominance:  0
Solution  1.0  accepted and archived!

Iteration:  1  Epoch:  0 Current archive [0, 1.0]
Current candidate solution, x: 1.0 1.0 1.0
New possible solution, x_prime:  2.21295 4.897147702500001 0.04534770250000008
A_tilda:  [0, 1.0, 2.21295]
A_tilda_x dominance:  0
A_tilda_x_prime dominance:  0
Solution  2.21295  accepted and archived!

Iteration:  2  Epoch:  0 Current archive [0, 1.0, 2.21295]
Current candidate solution, x: 2.21295 4.897147702500001 0.04534770250000008
New possible solution, x_prime:  0.11258 0.0126742564 3.5623542564000004
A_tilda:  [0, 1.0, 2.21295, 0.11258]
A_tilda_x dominance:  0
A_tilda_x_prime dominance:  0

Iteration:  3  Epoch:  0 Current archive [0, 1.0, 2.21295]
Current candidate solution, x: 0.11258 0.0126742564 3.5623542564000004
New possible solution, x_prime: 

In [7]:
fig,ax = plt.subplots(1,3,figsize=(16,5))


# plot true Pareto Front

# linear spread of x values between upper and lower limits, inclusive
x_vals = np.linspace(-1e5,1e5,num=1000000) 

# for each x_val compute objective function values and store results into lists
f_1 = []
f_2 = []
for x in x_vals:
    f_1.append(objective_1(x))
    f_2.append(objective_2(x))

# plot approximate Pareto Front
for i in archive:
        f_1_i = objective_1(i)
        f_2_i = objective_2(i)

        # plot to visualise dominance
        ax[0].plot(f_1_i,f_2_i,'rs')
ax[0].plot(f_1_i,f_2_i,'rs',label='Approximate Pareto Optimal Front')
ax[0].plot(f_1,f_2,'-',label='True Pareto Optimal Front')       
ax[0].set_title(f'Initial starting temperature {100}') #\nproprotional_to_epoch_inverse
ax[0].set_xlabel('function 1')
ax[0].set_ylabel('function 2')
ax[0].set_xlim([0,4])
ax[0].set_ylim([0,4])
ax[0].legend()

ax[1].plot(range(0,epoch),archive_size,'bo-')
ax[1].set_xlabel('Number of epochs')
ax[1].set_ylabel('Archive size')

ax[2].plot(range(0,epoch),temperature,'bs-')
ax[2].set_xlabel('Number of epochs')
ax[2].set_ylabel('Temperature')

plt.tight_layout()


Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …

# Histogram methods

In [8]:
archive,archive_size,temperature,epoch = DBMOSA(c_max=10, d_max=10, Tmin=1, Tmax=100, diversification='histogram',sigma=0.25,grid_step_size=0.5)


Iteration:  0  Epoch:  0 Current archive [0]
Current candidate solution, x: 0 0 4
New possible solution, x_prime:  1.0 1.0 1.0
A_tilda:  [0, 1.0]
A_tilda_x dominance:  0
A_tilda_x_prime dominance:  0
Solution  1.0  accepted and archived!

Iteration:  1  Epoch:  0 Current archive [0, 1.0]
Current candidate solution, x: 1.0 1.0 1.0
New possible solution, x_prime:  1.34462 1.8080029443999999 0.4295229444000001
A_tilda:  [0, 1.0, 1.34462]
A_tilda_x dominance:  0
A_tilda_x_prime dominance:  0
Solution  1.34462  accepted and archived!

Iteration:  2  Epoch:  0 Current archive [0, 1.0, 1.34462]
Current candidate solution, x: 1.34462 1.8080029443999999 0.4295229444000001
New possible solution, x_prime:  -0.13924 0.0193877776 4.5763477776000006
A_tilda:  [0, 1.0, -0.13924, 1.34462]
A_tilda_x dominance:  0
A_tilda_x_prime dominance:  1

Iteration:  3  Epoch:  0 Current archive [0, 1.0, 1.34462]
Current candidate solution, x: -0.13924 0.0193877776 4.5763477776000006
New possible solution, x_prim

In [9]:
fig,ax = plt.subplots(1,3,figsize=(16,5))


# plot true Pareto Front

# linear spread of x values between upper and lower limits, inclusive
x_vals = np.linspace(-1e5,1e5,num=1000000) 

# for each x_val compute objective function values and store results into lists
f_1 = []
f_2 = []
for x in x_vals:
    f_1.append(objective_1(x))
    f_2.append(objective_2(x))

# plot approximate Pareto Front
for i in archive:
        f_1_i = objective_1(i)
        f_2_i = objective_2(i)

        # plot to visualise dominance
        ax[0].plot(f_1_i,f_2_i,'rs')
ax[0].plot(f_1_i,f_2_i,'rs',label='Approximate Pareto Optimal Front')
ax[0].plot(f_1,f_2,'-',label='True Pareto Optimal Front')       
ax[0].set_title(f'Initial starting temperature {100}') #\nproprotional_to_epoch_inverse
ax[0].set_xlabel('function 1')
ax[0].set_ylabel('function 2')
ax[0].set_xlim([0,4])
ax[0].set_ylim([0,4])
ax[0].legend()

f_1_quads_safe = list(np.arange(0,10,0.5))
f_2_quads_safe = list(np.arange(0,10,0.5))

for i in range(len(f_1_quads_safe)):
    ax[0].plot([f_1_quads_safe[i],f_1_quads_safe[i]],[0,10],'b-')
    ax[0].plot([0,10],[f_2_quads_safe[i],f_2_quads_safe[i]],'b-') 


ax[1].plot(range(0,epoch),archive_size,'bo-')
ax[1].set_xlabel('Number of epochs')
ax[1].set_ylabel('Archive size')

ax[2].plot(range(0,epoch),temperature,'bs-')
ax[2].set_xlabel('Number of epochs')
ax[2].set_ylabel('Temperature')


plt.tight_layout()


Canvas(toolbar=Toolbar(toolitems=[('Home', 'Reset original view', 'home', 'home'), ('Back', 'Back to previous …