In [108]:
import pickle
import time
from rich.progress import track
import numpy as np
import math
from scipy.special import binom, perm
from itertools import combinations, permutations, product
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import copy

import random
from deap import base, creator, tools

from olympus.surfaces import CatCamel, CatMichalewicz, CatSlope, CatDejong # cat
from olympus.surfaces import Branin, Michalewicz, Levy, Dejong # cont

In [73]:
#------------------
# HELPER FUNCTIONS
#------------------
def stirling_sum(Ns):
    """ ...
    """
    stirling = lambda n,k: int(1./math.factorial(k) * np.sum([(-1.)**i * binom(k,i)*(k-i)**n for i in range(k)]))
    return np.sum([stirling(Ns, k) for k in range(Ns+1)])

def partition(S):
    """ ...
    """
    if len(S) == 1:
        yield [S]
        return 

    first = S[0]
    for smaller in partition(S[1:]):
        for n, subset in enumerate(smaller):
            yield smaller[:n]+[[first] + subset]+smaller[n+1:]
        yield [[first]]+smaller 
    
def gen_partitions(S):
    """
    generate all possible partitions of Ns-element set S
    
    Args: 
        S (list): list of non-functional parameters S
    """
    return [p for _, p in enumerate(partition(S),1)]


def gen_permutations(X_funcs, Ng):
    """ generate all possible functional parameter permutations
    given number of non-functional parameter subsets Ng
    
    Args: 
        X_funcs (np.ndarray): numpy array with all functional 
            possile functional parameters
        Ng (int): number of non-functional parameter subsets
        
    Returns
        (np.ndarray): array of parameter permutations of
            shape (# perms, Ng, # params)
    """
    
    return np.array(list(permutations(X_funcs, Ng)))

def measure_objective(xgs, G, surf_map):
    """ ... 
    """
    f_x = 0.
    for g_ix, Sg in enumerate(G):
        f_xg = 0.
        for si in Sg:
            f_xg += measure_single_obj(xgs[g_ix], si, surf_map)
        f_x += f_xg / len(Sg)

    return f_x


def record_merits(S, surf_map, X_func_truncate=20):
    
    # list of dictionaries to store G, X_func, f_x
    f_xs = [] 
    
    start_time = time.time()
    
    # generate all the partitions of non-functional parameters
    Gs = gen_partitions(S)
    print('total non-functional partitions : ', len(Gs))
    
    # generate all the possible values of functional parametres
    # assume all surfs have same # options, params
    param_opts = [f'x{i}' for i in range(surf_map[0].num_opts)] 
    params_opts = [param_opts for _ in range(surf_map[0].param_dim)]
    cart_product = list(product(*params_opts))
    X_funcs = np.array([list(elem) for elem in cart_product])
    
    if isinstance(X_func_truncate,int):
        X_funcs = X_funcs[:X_func_truncate, :]
    print('cardnality of functional params : ', X_funcs.shape[0])
    
    for G_ix, G in enumerate(Gs): 
        Ng = len(G)
        # generate permutations of functional params
        X_func_perms = gen_permutations(X_funcs, Ng)
        if G_ix % 1 == 0:
            print(f'[INFO] Evaluating partition {G_ix+1}/{len(Gs)+1} with len {Ng} ({len(X_func_perms)} perms)')
        for X_func in track(X_func_perms, description='Evaluating permutations...'):
            # measure objective 
            f_x = measure_objective(X_func, G, surf_map)
            # store values
            f_xs.append({'G': G, 'X_func': X_func,'f_x': f_x,})
    total_time = round(time.time()-start_time,2)
    print(f'[INFO] Done in {total_time} s')
    
    return f_xs

def measure_single_obj(X_func, si, surf_map):
    return surf_map[si].run(X_func)[0][0]

In [57]:
#-------------
# TOY PROBLEM
#-------------
#S = [0, 1, 2, 3] # four non-functional parameter options 
S = [0, 1, 2, 3]
surf_map_cat = {
    0: CatCamel(param_dim=2, num_opts=12),
    1: CatDejong(param_dim=2, num_opts=12),
    2: CatMichalewicz(param_dim=2, num_opts=12),
    3: CatSlope(param_dim=2, num_opts=12),
}
surf_map_cont = {
    0: Branin(),
    1: Dejong(),
    2: Michalewicz(),
    3: Levy(),
}

In [None]:
def measure_single_obj(X_func, si, surf_map):
    return surf_map[si].run(X_func)[0][0]

In [None]:
class GeneticGeneralOptimizer():
    def __init__(self, param_space, S, max_Ng):
        self.param_space = param_space
        self.S = S
        self.max_Ng = max_Ng
        
        self.CXPB = 0.5
        self.MUTB = 0.4
    

    def _optimize(self, max_iter=10, show_progress=True):
        
        return None
    
    @staticmethod
    def _evolution(population, toolbox, halloffame, cxpb=0.5, mutpb=0.3):
        
        return None

In [None]:
f_xs = record_merits(S, surf_map, X_func_truncate=20)

In [4]:
gen_partitions(S)

[[[0, 1, 2]], [[0], [1, 2]], [[0, 1], [2]], [[1], [0, 2]], [[0], [1], [2]]]

In [66]:
# minimizing fitness has negative weights, maximizing fitness has positive
creator.create('FitnessMin', base.Fitness, weights=(-1.0,)) 

# create individual which is a simple list of floats
creator.create('Individual', list, fitness=creator.FitnessMin) 

IND_SIZE=10 # size of 
NGEN = 10

toolbox = base.Toolbox()
toolbox.register("attribute", random.random)
toolbox.register("individual", tools.initRepeat, creator.Individual,
                 toolbox.attribute, n=IND_SIZE)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

In [67]:


toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", tools.mutGaussian, mu=0, sigma=1, indpb=0.1)
toolbox.register("select", tools.selTournament, tournsize=3)
toolbox.register("evaluate", evaluate)

In [68]:
def main():
    pop = toolbox.population(n=50)
    CXPB, MUTPB, NGEN = 0.5, 0.2, 10

    # Evaluate the entire population
    fitnesses = map(toolbox.evaluate, pop)
    for ind, fit in zip(pop, fitnesses):
        ind.fitness.values = fit

    for g in range(NGEN):
        print(f'Commencing GEN {g}...')
        # Select the next generation individuals
        offspring = toolbox.select(pop, len(pop))
        # Clone the selected individuals
        offspring = list(map(toolbox.clone, offspring))


        # Apply crossover and mutation on the offspring
        
        for child1, child2 in zip(offspring[::2], offspring[1::2]):
            if random.random() < CXPB:
                toolbox.mate(child1, child2)
                del child1.fitness.values
                del child2.fitness.values

        for mutant in offspring:
            if random.random() < MUTPB:
                toolbox.mutate(mutant)
                del mutant.fitness.values

        # Evaluate the individuals with an invalid fitness
        invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
        fitnesses = map(toolbox.evaluate, invalid_ind)
        for ind, fit in zip(invalid_ind, fitnesses):
            ind.fitness.values = fit

        # The population is entirely replaced by the offspring
        pop[:] = offspring
        
        
    return pop


In [None]:
pop = main()

In [None]:
param_opts = [f'x{i}' for i in range(21)]
params_opts = [param_opts, param_opts]
cart_product = list(product(*params_opts))
cart_product = np.array([list(elem) for elem in cart_product])
cart_product.shape

(441, 2)

['x0',
 'x1',
 'x2',
 'x3',
 'x4',
 'x5',
 'x6',
 'x7',
 'x8',
 'x9',
 'x10',
 'x11',
 'x12',
 'x13',
 'x14',
 'x15',
 'x16',
 'x17',
 'x18',
 'x19',
 'x20']

In [171]:
def evaluate(individual):
    G = individual['G']
    X_func = individual['X_func']
    
    f_x = 0.
    for g_ix, Sg in enumerate(G):
        f_xg = 0.
        for si in Sg:
            f_xg += measure_single_obj(X_func[g_ix], si, surf_map_cont)
        f_x += f_xg / len(Sg)

    return f_x,
    
    
    return sum(individual),

def init_population(num_inds=10):
    population = []
    for ind in range(num_inds):
        # generate G
        S = [0, 1, 2, 3]
        random.shuffle(S)
        max_Ng = 3
        Ng = np.random.randint(1,max_Ng+1)
        G = [sorted(S[i::Ng]) for i in range(Ng)]
        # generate X_func
        # i.e. sample Ng parameters from param_space
        param_dim = 2
        X_func = []
        for _ in range(Ng):
            X_func.append(list(np.random.uniform(0,1,size=(param_dim,))))
        # create individual
        ind = creator.Individual({'G':G, 'X_func':X_func})
        population.append(ind)
    return population

def custom_mutate_G(ind):
    
    print('ind : ', ind)
    
    # determine whether we are making a mutation to G
    mutation_types = ['fusion'] #'fusion', 'split']
    mutated = False
    while not mutated and len(mutation_types)>0:
        mutation_type = np.random.choice(mutation_types)

        if mutation_type == 'swap':
            # swap one general param from one subset to another
            if not len(ind['G'])>1:
                mutation_types.remove(mutation_type)
            else:
                mut_locs = np.random.choice(
                    np.arange(len(ind['G'])), size=(2,), replace=False,
                )
                mut_loc_1, mut_loc_2 = mut_locs[0], mut_locs[1]
                mut_ix_1 = np.random.randint(len(ind['G'][mut_loc_1]))
                mut_ix_2 = np.random.randint(len(ind['G'][mut_loc_2]))
                
                print(mut_loc_1, mut_loc_2)
                print(mut_ix_1, mut_ix_2)
                
                mut_val_1 = copy.deepcopy(ind['G'][mut_loc_1][mut_ix_1])
                mut_val_2 = copy.deepcopy(ind['G'][mut_loc_2][mut_ix_2])
                
                print(mut_val_1, mut_val_2)
                
                # make swap
                ind['G'][mut_loc_1][mut_ix_1] = mut_val_2
                ind['G'][mut_loc_2][mut_ix_2] = mut_val_1
                
                print('new ind : ', ind)

                mutated = True

        elif mutation_type == 'fusion':
            if not len(ind)>
            mutated=True
        elif mutation_type == 'split':
            mutated=True
        else:
            pass

    return ind
        

In [172]:
creator.create('FitnessMin', base.Fitness, weights=(-1.0,)) 
creator.create('Individual', dict, fitness=creator.FitnessMin)

toolbox = base.Toolbox()
toolbox.register('population', init_population)
toolbox.register('evaluate', evaluate)

# mutation/selection opertations operations
toolbox.register("mutate_X_func", tools.mutGaussian, mu=0, sigma=1, indpb=0.1)
toolbox.register("select", tools.selTournament, tournsize=3)

toolbox.register('mutate_G', custom_mutate_G)

In [173]:
pop = toolbox.population(num_inds=5)

In [174]:
fitnesses = list(map(toolbox.evaluate, pop))
for ind, fit in zip(pop, fitnesses):
    ind.fitness.values = fit

In [175]:
def main():
    pop = toolbox.population(num_inds=3)
    NGEN = 10
    mutate_G_pb = 0.7
    
    print('\n')
    for ind in pop:
        print(ind)
    print('\n')

    # Evaluate the entire population
    fitnesses = map(toolbox.evaluate, pop)
    for ind, fit in zip(pop, fitnesses):
        ind.fitness.values = fit

    for g in range(NGEN):
        print(f'Commencing GEN {g}...')
        # Select the next generation individuals
        offspring = toolbox.select(pop, len(pop))
        # Clone the selected individuals
        offspring = list(map(toolbox.clone, offspring))
        
        for mutant in offspring:
            # mutation on G
            if random.random() < mutate_G_pb:
                toolbox.mutate_G(mutant)
                del mutant.fitness.values
        
        print('\n')
        for ind in pop:
            print(ind)
        print('\n')

        # Evaluate the individuals with an invalid fitness
        invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
        fitnesses = map(toolbox.evaluate, invalid_ind)
        for ind, fit in zip(invalid_ind, fitnesses):
            ind.fitness.values = fit

        # The population is entirely replaced by the offspring
        pop[:] = offspring
        
        
    return pop

In [176]:
pop = main()



{'G': [[0, 2], [1], [3]], 'X_func': [[0.9921930802515037, 0.11506324754360275], [0.029861735748283436, 0.5416249553574167], [0.4670130376446544, 0.6593044506858373]]}
{'G': [[0, 1, 2, 3]], 'X_func': [[0.6135719682821698, 0.593069306655373]]}
{'G': [[0, 1, 2, 3]], 'X_func': [[0.48130054179634774, 0.9219373148170217]]}


Commencing GEN 0...
ind :  {'G': [[0, 1, 2, 3]], 'X_func': [[0.6135719682821698, 0.593069306655373]]}
ind :  {'G': [[0, 2], [1], [3]], 'X_func': [[0.9921930802515037, 0.11506324754360275], [0.029861735748283436, 0.5416249553574167], [0.4670130376446544, 0.6593044506858373]]}
1 0
0 1
1 2
new ind :  {'G': [[0, 1], [2], [3]], 'X_func': [[0.9921930802515037, 0.11506324754360275], [0.029861735748283436, 0.5416249553574167], [0.4670130376446544, 0.6593044506858373]]}


{'G': [[0, 2], [1], [3]], 'X_func': [[0.9921930802515037, 0.11506324754360275], [0.029861735748283436, 0.5416249553574167], [0.4670130376446544, 0.6593044506858373]]}
{'G': [[0, 1, 2, 3]], 'X_func': [[0.613571