In [None]:
import numpy as np
from multitools import gamma_GC
from matplotlib import pyplot as plt
import seaborn as sns
import pandas as pd
from itertools import islice, combinations
from numpy import random
from scipy.spatial.distance import jensenshannon
# Notebook-wide random generator with a fixed seed. samplePopulation and
# binary_tournament_selection both draw from it, so a fresh-kernel
# "Run All" reproduces the same samples.
rng = random.default_rng(seed = 1123)

import itertools as it
import pandas as pd

Generate items:

In [None]:
# Problem dimensions: the first n_obj item columns are objectives, the
# remaining n_con column(s) are constraints.
n_obj = 3
n_con = 1  # a single constraint column; samplePopulation only reads the last column

# 4 x 4 symmetric matrix (n_obj + n_con attributes) passed to gamma_GC.
# NOTE(review): presumably the target correlation between item attributes --
# confirm against multitools.gamma_GC.
r = np.array(
    [
        [1.0, 0.4, -0.5, 0.3],
        [0.4, 1.0, 0.5, 0.4],
        [-0.5, 0.5, 1.0, 0.2],
        [0.3, 0.4, 0.2, 1.0],
    ]
)

# One shape / scale entry per attribute, in the same order as the rows of `r`.
shape = [3.0, 4.0, 2.0, 8.0]
scale = [2.0, 3, 2, 1.0]

In [None]:
# Generate n candidate items from the matrix `r` and the per-attribute
# shape / scale lists defined above.
# NOTE(review): presumably one row per item with gamma-distributed columns --
# confirm against multitools.gamma_GC.
n = 20
items = gamma_GC(r, n, shape, scale)

In [None]:
def cleanupsamples(samples,nobj,precision=1): 
    """Round the samples and drop rows whose objective values are duplicated.

    Parameters
    ----------
    samples : 2-D array, one row per item.
    nobj : int
        Number of leading columns that are treated as objectives; two rows
        count as duplicates when these columns match after rounding.
    precision : int, default 1
        Number of decimals passed to ``np.round``.

    Returns
    -------
    numpy.ndarray
        The first occurrence of every distinct objective vector, ordered the
        way ``np.unique`` sorts them (ascending, lexicographic). With
        ``precision == 0`` the result is converted to a signed integer dtype:
        ``int8`` whenever every value fits, otherwise the smallest wider
        signed dtype, so large values no longer wrap around silently.
    """
    rounded = np.round(samples, precision)
    # return_index gives the row of the first occurrence of each unique
    # objective vector; the unique values themselves are not needed.
    _, first_rows = np.unique(rounded[:, :nobj], axis=0, return_index=True)
    newsamples = rounded[first_rows, :]

    if precision == 0:
        target = np.int8
        if newsamples.size:
            lowest, highest = newsamples.min(), newsamples.max()
            for candidate in (np.int8, np.int16, np.int32, np.int64):
                limits = np.iinfo(candidate)
                if limits.min <= lowest and highest <= limits.max:
                    target = candidate
                    break
            else:
                # Nothing fits (or the data is not finite): use the widest type.
                target = np.int64
        newsamples = newsamples.astype(target)

    return newsamples

# Round the generated items to whole numbers and keep one row per distinct
# objective vector (first n_obj columns).
items = cleanupsamples(items, n_obj, precision=0)

In [None]:
# Inspect the item array produced by the cells above.
items

Test: load a recorded trial from CSV and use it as the item set:

In [None]:
n_items = 20
df = pd.read_csv("data/card_game_test_data/test_data_n20_exp5_clean.csv")

# Reshape the wide CSV (one row per trial, one column per item attribute) into
# a long table with one row per item. Rows are ordered trial by trial, so the
# first n_items rows belong to trial 0.
# All records are collected first and the DataFrame is built once: growing a
# frame with pd.concat inside the loop is quadratic and depends on how pandas
# treats the initial empty frame.
item_records = []
for trial in range(len(df)):
    for item in range(1, n_items + 1):
        item_records.append({
            'Science': df.at[trial, f'value1_item_{item}'],
            'Culture': df.at[trial, f'value2_item_{item}'],
            'Govern': df.at[trial, f'value3_item_{item}'],
            'Money': df.at[trial, f'weight1_item_{item}'],
            # Deliberately left out for now:
            # 'Space': df[f'weight2_item_{item}'][trial],
            # 'capacity1': df['capacity1'][trial],
            # 'capacity2': df['capacity2'][trial],
        })
items_df_visual = pd.DataFrame(item_records)  # sequential 0..N-1 index

# Items of the first trial only.
test_trial = items_df_visual.iloc[:n_items,:]

In [None]:
# Show the items of the first recorded trial.
test_trial

In [None]:
# Replace the generated items with the recorded trial. Column order is
# Science, Culture, Govern (objectives) followed by Money (the weight column).
items = test_trial.values

In [None]:
# Inspect the item array that the optimisation below will use.
items

EDA (estimation-of-distribution algorithm) for the multi-objective knapsack problem:

In [None]:
def get_objectives(samples, indices, nobj):
    """Total objective values of every candidate solution.

    Parameters
    ----------
    samples : 2-D array of items; the first ``nobj`` columns are objectives.
    indices : 2-D integer array; row ``k`` lists the item rows in solution ``k``.
    nobj : int, number of objective columns.

    Returns
    -------
    numpy.ndarray of shape ``(indices.shape[0], nobj)`` and dtype ``int16``
        holding, per solution, the column-wise sum over its items.
    """
    n_solutions = indices.shape[0]
    totals = np.zeros((n_solutions, nobj), dtype='int16')
    objective_columns = samples[:, :nobj]
    for k in range(n_solutions):
        totals[k, :] = objective_columns[indices[k]].sum(axis=0, dtype='int16')
    return totals

def get_constraints(samples, indices, nobj, ncon):
    """Total constraint values of every candidate solution.

    Parameters
    ----------
    samples : 2-D array of items; every column from ``nobj`` onwards is a
        constraint column (there must be ``ncon`` of them).
    indices : 2-D integer array; row ``k`` lists the item rows in solution ``k``.
    nobj : int, number of leading objective columns to skip.
    ncon : int, number of constraint columns.

    Returns
    -------
    numpy.ndarray of dtype ``int16``: the per-solution sums, passed through
        ``np.squeeze`` (so a single constraint yields a 1-D array).
    """
    n_solutions = indices.shape[0]
    totals = np.zeros((n_solutions, ncon), dtype='int16')
    constraint_columns = samples[:, nobj:]
    for k in range(n_solutions):
        totals[k, :] = constraint_columns[indices[k]].sum(axis=0, dtype='int16')
    return np.squeeze(totals)

In [None]:
def _ragged_int_array(groups):
    """Pack a list of index lists into a 1-D object array of integer arrays."""
    # np.array(list_of_arrays, dtype=object) silently builds a 2-D object array
    # when all entries happen to have the same length; filling a pre-allocated
    # 1-D object array keeps every entry a proper integer index array.
    packed = np.empty(len(groups), dtype=object)
    for k, group in enumerate(groups):
        packed[k] = np.asarray(group, dtype=int)
    return packed


def non_dominated_sort(objectives):
    """Sort solutions into Pareto fronts (all objectives are maximised).

    Solution ``p`` dominates ``q`` when ``p`` is at least as good as ``q`` in
    every objective and strictly better in at least one.

    Parameters
    ----------
    objectives : 2-D array, one row per solution and one column per objective.

    Returns
    -------
    dominated_sets : 1-D object array; entry ``p`` is an integer array with
        the solutions that ``p`` dominates (ascending order).
    domination_counts : 1-D int array; entry ``p`` is the number of solutions
        that dominate ``p``.
    ranks : 1-D int array; index of the front each solution belongs to
        (0 is the non-dominated front).
    fronts : 1-D object array; entry ``i`` is an integer array with the
        solutions of front ``i``. Every entry can be used directly as an index
        array, also when all fronts have the same size.
    """
    n_solutions = objectives.shape[0]
    dominated_sets = []
    domination_counts = np.zeros(n_solutions, dtype=int)

    # Compare each solution against all others in one vectorised step. A
    # solution never dominates itself because the strict part is False.
    for p in range(n_solutions):
        current = objectives[p, :]
        p_dominates = np.all(current >= objectives, axis=1) & np.any(current > objectives, axis=1)
        p_is_dominated = np.all(objectives >= current, axis=1) & np.any(objectives > current, axis=1)
        dominated_sets.append(np.flatnonzero(p_dominates))
        domination_counts[p] = np.count_nonzero(p_is_dominated)

    ranks = np.full(n_solutions, -1, dtype=int)
    fronts = []

    # Peel the fronts off one by one. The peeling works on a copy so that the
    # returned domination counts keep their meaning.
    remaining = domination_counts.copy()
    current_front = [p for p in range(n_solutions) if remaining[p] == 0]
    rank = 0
    while current_front:
        ranks[current_front] = rank
        fronts.append(current_front)
        next_front = []
        for p in current_front:
            for q in dominated_sets[p]:
                remaining[q] -= 1
                if remaining[q] == 0:
                    next_front.append(q)
        current_front = next_front
        rank += 1

    return _ragged_int_array(dominated_sets), domination_counts, ranks, _ragged_int_array(fronts)


In [None]:
def assign_crowding_distance(objectives):
    """Crowding distance of every solution within one front.

    For each objective the solutions are sorted; the two extreme solutions get
    an infinite distance and every other solution accumulates the gap between
    its two neighbours, normalised by the range of that objective.

    Parameters
    ----------
    objectives : 2-D array, one row per solution of the front and one column
        per objective.

    Returns
    -------
    numpy.ndarray of floats, one distance per row of ``objectives``. An empty
    front yields an empty array. An objective on which all solutions agree
    has zero range and contributes nothing to the interior solutions (instead
    of the NaN produced by 0/0).
    """
    n_points, n_objectives = np.shape(objectives)
    distances = np.zeros(n_points, dtype=float)
    if n_points == 0:
        return distances

    for m in range(n_objectives):
        column = objectives[:, m]
        order = np.argsort(column)[::-1]                 # best (largest) first
        ranked = np.asarray(column[order], dtype=float)  # float: no integer wrap-around
        distances[order[0]] = np.inf
        distances[order[-1]] = np.inf
        value_range = ranked[0] - ranked[-1]
        if value_range > 0:
            # Interior solution i gets (ranked[i-1] - ranked[i+1]) / range.
            distances[order[1:-1]] += (ranked[:-2] - ranked[2:]) / value_range
    return distances

In [None]:
def binary_tournament_selection(population, ranks, distances):
    """Pick the winner of a tournament between two random, distinct solutions.

    Two different positions of ``population`` are drawn from the notebook-wide
    ``rng``. The solution with the lower rank wins; on equal rank the one with
    the strictly larger crowding distance wins, otherwise the second draw.

    Returns the position (index into ``population``) of the winner.
    """
    first, second = rng.choice(np.arange(len(population)), size=2, replace=False)

    if ranks[first] < ranks[second]:
        return first
    if ranks[second] < ranks[first]:
        return second

    # Same front: prefer the less crowded solution.
    return first if distances[first] > distances[second] else second

In [None]:
def samplePopulation(samples, distribution, pop_size, n_selected, capacity):
    """Draw ``pop_size`` feasible knapsacks by rejection sampling.

    Each candidate consists of ``n_selected`` distinct item indices drawn from
    the notebook-wide ``rng`` with probabilities ``distribution``. A candidate
    is kept when the sum of the last column of ``samples`` over its items does
    not exceed ``capacity``.

    Parameters
    ----------
    samples : 2-D array of items; the last column is the weight.
    distribution : 1-D array of selection probabilities, one per item.
    pop_size : int, number of knapsacks to return.
    n_selected : int, number of items per knapsack.
    capacity : number, maximum total weight of a knapsack.

    Returns
    -------
    numpy.ndarray of shape ``(pop_size, n_selected)`` with item indices. The
    dtype is ``np.intp`` so indices above 127 are stored correctly.

    Raises
    ------
    ValueError
        If even the ``n_selected`` lightest selectable items exceed
        ``capacity``; the sampling loop could never terminate in that case.
    """
    n_items = distribution.size
    population = np.zeros((pop_size, n_selected), dtype=np.intp)

    if pop_size > 0:
        # Feasibility check: the cheapest possible knapsack uses the lightest
        # items that have a non-zero selection probability.
        weights = np.asarray(samples)[:n_items, -1]
        selectable = np.sort(weights[np.asarray(distribution) > 0])
        if selectable.size >= n_selected and np.sum(selectable[:n_selected]) > capacity:
            raise ValueError(
                "no feasible knapsack: the lightest selection of items already exceeds the capacity"
            )

    accepted = 0
    while accepted < pop_size:
        knapsack = rng.choice(n_items, n_selected, p=distribution, replace=False)
        if np.sum(samples[knapsack, -1]) <= capacity:
            population[accepted, :] = knapsack
            accepted += 1

    return population

In [None]:
def generate_initial_population(items, capacity, n_selected, pop_size, n_obj):
    """Build the starting population and the first item distribution.

    Steps:
      1. sample ``pop_size`` feasible knapsacks with uniform item probabilities;
      2. rank them by non-dominated sorting and compute the crowding distance
         inside every front;
      3. run ``pop_size`` binary tournaments to choose the selected population
         (a solution may be chosen more than once);
      4. estimate the item distribution from the selected knapsacks, starting
         every item at a count of one so that no probability is zero.

    Returns
    -------
    distribution : 1-D float array of item probabilities (sums to 1).
    selected_population : item indices of the tournament winners.
    selected_objectives : objective totals of the tournament winners.
    """
    n_items = items.shape[0]

    uniform = np.ones(n_items) / n_items
    population = samplePopulation(items, uniform, pop_size, n_selected, capacity)
    objectives = get_objectives(items, population, n_obj)

    _, _, ranks, fronts = non_dominated_sort(objectives)
    crowding = np.zeros(population.shape[0], dtype=float)
    for front in fronts:
        crowding[front] = assign_crowding_distance(objectives[front, :])

    winners = []
    while len(winners) < pop_size:
        winners.append(binary_tournament_selection(population, ranks, crowding))
    select_indices = np.array(winners, dtype=int)

    selected_population = population[select_indices]
    selected_objectives = objectives[select_indices]

    usage = np.bincount(selected_population.flatten(), minlength=n_items)
    distribution = np.ones(n_items) + usage
    distribution /= np.sum(distribution)

    return distribution, selected_population, selected_objectives

In [None]:
def update_distribution(distribution, items, selected_population, selected_objectives, 
                        capacity, n_selected, pop_size, n_obj):
    """Run one generation: sample, select the best, re-estimate the distribution.

    Steps:
      1. sample ``pop_size`` feasible knapsacks from ``distribution`` and record
         the non-dominated ones among them (``pareto_indices``);
      2. merge the new samples with the previously selected population and keep
         the best ``pop_size`` solutions: whole fronts first, then the least
         crowded solutions of the first front that does not fit completely;
      3. re-estimate the item distribution from the survivors, starting every
         item at a count of one;
      4. measure the change between the old and the new distribution.

    The ``distribution`` argument is not modified.

    Returns
    -------
    updated_distribution : 1-D float array of item probabilities.
    selected_population : item indices of the surviving solutions.
    selected_objectives : objective totals of the surviving solutions.
    pareto_indices : item indices of the non-dominated solutions among the
        knapsacks sampled in this generation.
    js_div : squared Jensen-Shannon distance between ``distribution`` and
        ``updated_distribution``.
    """
    population = samplePopulation(items, distribution, pop_size, n_selected, capacity)
    objectives = get_objectives(items, population, n_obj)

    # Pareto front of the freshly sampled knapsacks only.
    _, _, _, fronts_current = non_dominated_sort(objectives)
    pareto_indices = population[fronts_current[0]]

    # Survivors are chosen from the previous selection plus the new samples.
    objectives = np.vstack((selected_objectives, objectives))
    population = np.vstack((selected_population, population))

    _, _, _, fronts = non_dominated_sort(objectives)
    select_indices = np.array([], dtype=int)
    for f in fronts:
        if len(select_indices) + len(f) <= pop_size:
            select_indices = np.concatenate([select_indices, f])
        else:
            # This front does not fit completely: take its least crowded members.
            remaining_size = pop_size - len(select_indices)
            f_distance = assign_crowding_distance(objectives[f, :])
            sort_indices = np.argsort(f_distance)[::-1]
            remaining = f[sort_indices[:remaining_size]]
            select_indices = np.concatenate([select_indices, remaining])
            break

    selected_population = population[select_indices]
    selected_objectives = objectives[select_indices]

    n_items = items.shape[0]
    updated_distribution = np.ones(n_items)
    updated_distribution += np.bincount(selected_population.flatten(), minlength=n_items)
    updated_distribution /= np.sum(updated_distribution)

    # Floor tiny probabilities before comparing. The caller's array is clipped
    # on a copy; the freshly built array can be clipped in place.
    previous_distribution = np.maximum(distribution, 1E-08)
    updated_distribution[updated_distribution < 1E-08] = 1E-08
    js_div = jensenshannon(previous_distribution, updated_distribution)**2

    return updated_distribution, selected_population, selected_objectives, pareto_indices, js_div

In [None]:
def knapsack_eda(items, capacity, n_selected, n_obj, pop_size=1000, generations=100):
    """Run the estimation-of-distribution loop for the knapsack problem.

    An initial population is generated once; afterwards ``generations``
    rounds of ``update_distribution`` are executed and the state after every
    round is recorded.

    Returns
    -------
    distribution_table : list with the item distribution after each generation.
    pareto_indices_table : list with the item indices of the non-dominated
        knapsacks sampled in each generation.
    pareto_front_table : list with, per generation, the column-wise item sums
        (all columns of ``items``) of those knapsacks.
    js_div_list : list with the distribution change measured in each generation.
    """
    distribution, selected_population, selected_objectives = generate_initial_population(
        items, capacity, n_selected, pop_size, n_obj
    )

    distribution_table, pareto_indices_table = [], []
    pareto_front_table, js_div_list = [], []

    for _ in range(generations):
        (
            distribution,
            selected_population,
            selected_objectives,
            pareto_indices,
            js_div,
        ) = update_distribution(
            distribution, items, selected_population, selected_objectives,
            capacity, n_selected, pop_size, n_obj
        )

        # Sum every item column over the members of each Pareto knapsack.
        pareto_front = np.zeros((pareto_indices.shape[0], items.shape[1]))
        for row in range(pareto_indices.shape[0]):
            members = pareto_indices[row, :]
            pareto_front[row, :] = np.sum(items[members, :], axis=0)

        distribution_table.append(distribution.copy())
        pareto_indices_table.append(pareto_indices.copy())
        pareto_front_table.append(pareto_front.copy())
        js_div_list.append(js_div)

    return distribution_table, pareto_indices_table, pareto_front_table, js_div_list

In [None]:
# Experiment settings.
n_items = items.shape[0]
n_selected = 5   # items per knapsack
# Alternative capacity derived from the last shape/scale entry, kept for reference:
# capacity = int(shape[-1]*scale[-1]*n_selected)
capacity = 24
n_obj = 3
pop_size = 1000   # same value as the knapsack_eda default
generations = 10  # the knapsack_eda default is 100

(
    distribution_table,
    pareto_indices_table,
    pareto_front_table,
    js_div_list,
) = knapsack_eda(
    items,
    capacity,
    n_selected,
    n_obj,
    pop_size=pop_size,
    generations=generations,
)

In [None]:
# Distinct attribute totals (all item columns) of the non-dominated knapsacks
# sampled in the last generation.
np.unique(pareto_front_table[-1], axis=0)

In [None]:
# Convergence plot: change of the item distribution from one generation to the
# next (squared Jensen-Shannon distance), on a log scale.
fig, ax = plt.subplots()
ax.plot(np.arange(1, generations + 1, 1), js_div_list)
ax.set_xlabel('Generations')
ax.set_ylabel('Jensen-Shannon Divergence')
ax.set_yscale('log')
ax.set_title('Jensen-Shannon Divergence between successive generations')
plt.show()