In [1]:
import numpy as np

from numpy.random import default_rng
from numba import njit
from typing import Set, Tuple

In [2]:
def generate_problem(num_groups: int,
                     num_classes: int,
                     min_group_size: int,
                     max_group_size: int,
                     class_percent: np.array,
                     seed=None) -> np.ndarray:
    """Generate a random grouped, multi-class counting problem.

    Each row of the result is one group and each column one class; the entry
    is the (integer) number of samples of that class in that group.

    Parameters
    ----------
    num_groups : number of groups (rows) to generate.
    num_classes : number of classes (columns); must be compatible with
        ``class_percent`` for broadcasting.
    min_group_size, max_group_size : inclusive bounds for the nominal size
        drawn for each group.
    class_percent : expected fraction of each class within a group.
    seed : optional seed (or anything accepted by ``default_rng``) so that a
        problem can be reproduced. ``None`` keeps the previous behaviour of
        using fresh OS entropy.

    Returns
    -------
    Integer array of shape ``(num_groups, num_classes)``.
    """
    # A single Generator drives *all* randomness so that `seed` fully
    # determines the output (previously the proportions came from the global
    # legacy `np.random` state and could not be seeded consistently).
    rng = default_rng(seed)

    # endpoint=True makes `max_group_size` attainable and lets callers pass
    # min_group_size == max_group_size for fixed-size groups.
    group_sizes = rng.integers(low=min_group_size,
                               high=max_group_size,
                               size=num_groups,
                               endpoint=True)

    # Perturb the expected class mix of every group with Gaussian noise whose
    # standard deviation is 10% of the corresponding class fraction.
    class_percent = np.asarray(class_percent, dtype=float)
    proportions = rng.normal(class_percent,
                             class_percent / 10,
                             size=(num_groups, num_classes))

    # Scale the noisy proportions by the group size; the cast truncates
    # toward zero, matching assignment into an integer array.
    problem = (proportions * group_sizes[:, np.newaxis]).astype(int)
    return problem

In [3]:
@njit
def calculate_cost(problem: np.ndarray,
                   solution: np.ndarray,
                   k: int) -> float:
    """Score a fold assignment; lower is better.

    For every fold ``0..k-1`` the score adds the squared deviation of the
    fold's share of all samples from ``1/k``, plus, per class, the squared
    deviation of the class share inside the fold from the class share in the
    whole problem. A fold with no samples (or an empty problem) adds a flat
    penalty of 1.0 instead.
    """
    grand_total = np.sum(problem)
    per_class_totals = np.sum(problem, axis=0)
    n_classes = problem.shape[1]

    penalty = 0.0
    for fold in range(k):
        members = solution == fold
        fold_total = np.sum(problem[members, :])

        # Empty fold / empty problem: fixed penalty, nothing else to measure.
        if not (grand_total > 0.0 and fold_total > 0.0):
            penalty += 1.0
            continue

        # Fold-size imbalance term.
        penalty += (fold_total / grand_total - 1.0 / k) ** 2

        # Class-mix imbalance terms, one per class.
        for cls in range(n_classes):
            penalty += (np.sum(problem[members, cls]) / fold_total
                        - per_class_totals[cls] / grand_total) ** 2
    return penalty

In [4]:
@njit
def generate_search_space(problem: np.ndarray,
                          solution: np.ndarray,
                          k: int) -> np.ndarray:
    """Evaluate every single-group reassignment of ``solution``.

    Returns a float array of shape ``(num_groups, k)`` where entry ``[i, j]``
    is the cost of the solution obtained by moving group ``i`` to fold ``j``.
    The entry for a group's current fold is ``inf`` so that it sorts last and
    is never preferred as a move.
    """
    num_groups = problem.shape[0]

    space = np.zeros((num_groups, k))
    trial = solution.copy()  # scratch copy; `solution` itself is not modified

    for i in range(num_groups):
        current = solution[i]
        for j in range(k):
            if j == current:
                # `np.inf` instead of the `np.infty` alias, which was removed
                # in NumPy 2.0.
                space[i, j] = np.inf
            else:
                trial[i] = j
                space[i, j] = calculate_cost(problem, trial, k)
        # Restore group i before evaluating moves for the next group.
        trial[i] = current
    return space

In [5]:
@njit
def solution_to_str(solution: np.ndarray) -> str:
    """Serialise a fold assignment into a string usable as a history key.

    Labels are comma-separated so the encoding stays unambiguous when fold
    labels have more than one digit (without a separator ``[1, 10]`` and
    ``[11, 0]`` would both encode as ``"110"``).
    """
    return ",".join([str(n) for n in solution])

In [6]:
def select_move(decision: np.ndarray,
                solution: np.ndarray,
                history: Set) -> Tuple:
    """Pick the cheapest move whose resulting solution is not in ``history``.

    Parameters
    ----------
    decision : cost matrix of shape ``(num_groups, k)``; entry ``[g, f]`` is
        the cost after moving group ``g`` to fold ``f``.
    solution : current fold assignment (not modified).
    history : set of ``solution_to_str`` keys of already-visited solutions.

    Returns
    -------
    ``(group, fold)`` of the chosen move, or ``(-1, -1)`` if every finite-cost
    move leads to a solution already in ``history``.
    """
    candidates = np.argsort(decision, axis=None)
    flat_costs = decision.ravel()  # same C-order flattening argsort used

    # One scratch copy is reused for all candidates instead of copying the
    # whole solution on every iteration.
    trial = solution.copy()

    for c in candidates:
        # Non-finite entries mark "stay in the current fold" (or an invalid
        # cost); they are not real moves and must never be returned.
        if not np.isfinite(flat_costs[c]):
            continue

        grp, cls = np.unravel_index(c, decision.shape)
        previous = trial[grp]
        trial[grp] = cls
        sol_str = solution_to_str(trial)
        trial[grp] = previous  # undo so the next candidate starts clean

        if sol_str not in history:
            return grp, cls
    return -1, -1 # No move found!

In [7]:
# Build the demo problem used by the rest of the notebook: 500 groups over
# 4 classes, group sizes bounded by 400 and 2000, with an expected class mix
# of 40% / 30% / 20% / 10%.
prb = generate_problem(num_groups=500,
                       num_classes=4,
                       min_group_size=400,
                       max_group_size=2000,
                       class_percent=np.array([0.4, 0.3, 0.2, 0.1]))

In [8]:
def generate_initial_solution(problem: np.ndarray,
                              k: int,
                              algo: str="k-bound") -> np.ndarray:
    """Create a starting fold assignment for the search.

    Parameters
    ----------
    problem : array of shape ``(num_groups, num_classes)`` with sample counts.
    k : number of folds.
    algo : construction strategy:
        ``"k-bound"`` - visit groups in random order and keep filling the
        current fold until adding the next group would reach ``total / k``,
        then advance to the next fold (wrapping around after fold ``k - 1``);
        ``"random"`` - independent uniform fold label per group;
        ``"zeros"`` - every group in fold 0.

    Returns
    -------
    Integer array of length ``num_groups`` with labels in ``[0, k)``.

    Raises
    ------
    ValueError
        If ``algo`` is not one of the supported names.
    """
    # Reject bad input up front with a specific exception type (ValueError is
    # still an Exception, so existing broad handlers keep working).
    if algo not in ("k-bound", "random", "zeros"):
        raise ValueError("Invalid algorithm name")

    num_groups = problem.shape[0]

    if algo == "zeros":
        return np.zeros(num_groups, dtype=int)

    rng = default_rng()
    if algo == "random":
        return rng.integers(low=0, high=k, size=num_groups)

    # algo == "k-bound"
    total = np.sum(problem)
    group_totals = np.sum(problem, axis=1)  # size of every group, computed once
    solution = np.zeros(num_groups, dtype=int)

    fold = 0
    fold_total = 0
    for i in rng.permutation(num_groups):
        group = group_totals[i]
        if fold_total + group < total / k:
            fold_total += group
        else:
            # Current fold is full: start the next one with this group.
            fold = (fold + 1) % k
            fold_total = group
        solution[i] = fold
    return solution

In [9]:
def solve(problem: np.ndarray,
          k=5,
          min_cost=1e-5,
          max_retry=100,
          verbose=False) -> np.ndarray:
    """Search for a balanced assignment of groups to ``k`` folds.

    Starting from ``generate_initial_solution``, the search repeatedly applies
    the cheapest single-group move that does not lead to an already visited
    solution, and remembers the best solution seen.

    Parameters
    ----------
    problem : array of shape ``(num_groups, num_classes)`` with sample counts.
    k : number of folds.
    min_cost : stop as soon as the current solution's cost is at or below this.
    max_retry : stop after this many consecutive non-improving moves.
    verbose : print the cost every time a new best solution is found.

    Returns
    -------
    The best (lowest-cost) fold assignment encountered.
    """
    solution = generate_initial_solution(problem, k)
    incumbent = solution.copy()
    low_cost = calculate_cost(problem, solution, k)
    # Start from the real cost so an already-good start skips the loop.
    cost = low_cost

    # The starting point counts as visited so the search cannot cycle back.
    hist = {solution_to_str(solution)}
    retry = 0

    while retry < max_retry and cost > min_cost:
        # Use the caller's k throughout (it used to be hard-coded to 5 here).
        decision = generate_search_space(problem, solution, k)
        grp, cls = select_move(decision, solution, hist)

        if grp == -1:
            # No admissible move is left; looping again could never make
            # progress, so stop instead of spinning forever.
            break

        solution[grp] = cls
        cost = calculate_cost(problem, solution, k)
        if cost < low_cost:
            low_cost = cost
            incumbent = solution.copy()
            retry = 0
            if verbose:
                print(cost)
        else:
            retry += 1
        hist.add(solution_to_str(solution))
    return incumbent

In [10]:
# Run the search on the demo problem with 5 folds; verbose=True prints the
# cost each time a new best solution is found. The bare `solution` on the
# last line displays the returned fold label of every group.
solution = solve(prb, min_cost=1e-5, k=5, verbose=True)
solution

8.028158540955434e-05
6.502376139707922e-05
5.464347782944837e-05
4.301962138717457e-05
3.6589581706198305e-05
2.7849518341838307e-05
2.610685042347762e-05
2.3610338623673816e-05
2.008385337007756e-05
1.6837909344540007e-05
1.498558025272575e-05
1.3517929416647237e-05
1.2074721967291692e-05
1.1421630300944058e-05
1.0721054483775257e-05
9.524765715259523e-06


array([4, 0, 4, 2, 2, 1, 0, 4, 2, 1, 1, 3, 3, 0, 0, 0, 3, 1, 3, 2, 4, 2,
       4, 0, 3, 1, 4, 2, 0, 3, 4, 1, 2, 2, 3, 4, 0, 3, 0, 0, 4, 3, 4, 4,
       0, 4, 3, 0, 3, 3, 3, 2, 2, 2, 3, 1, 3, 3, 0, 0, 4, 4, 2, 1, 2, 2,
       4, 2, 1, 4, 3, 2, 4, 3, 3, 1, 3, 0, 1, 1, 4, 2, 3, 1, 0, 0, 1, 3,
       2, 4, 0, 1, 2, 4, 4, 2, 4, 1, 4, 3, 3, 1, 1, 2, 3, 1, 2, 2, 2, 3,
       1, 0, 2, 3, 2, 3, 4, 0, 2, 1, 4, 1, 0, 2, 0, 2, 1, 3, 4, 3, 4, 1,
       3, 3, 1, 4, 2, 0, 2, 2, 0, 1, 3, 4, 2, 3, 0, 1, 2, 0, 2, 1, 4, 0,
       4, 3, 2, 3, 4, 4, 0, 4, 0, 3, 3, 2, 1, 4, 3, 1, 1, 0, 2, 1, 2, 3,
       2, 0, 1, 2, 0, 0, 0, 0, 0, 1, 3, 2, 3, 1, 1, 0, 3, 1, 4, 2, 0, 3,
       1, 2, 0, 4, 4, 2, 0, 1, 2, 4, 1, 1, 1, 2, 1, 0, 4, 0, 4, 1, 3, 4,
       3, 4, 2, 4, 3, 4, 0, 0, 0, 0, 3, 0, 4, 0, 2, 1, 0, 3, 1, 0, 1, 4,
       3, 3, 0, 2, 3, 3, 4, 1, 1, 3, 0, 0, 1, 0, 0, 4, 4, 4, 2, 1, 2, 1,
       2, 0, 1, 2, 1, 2, 0, 3, 3, 0, 1, 3, 0, 1, 3, 2, 2, 3, 1, 0, 4, 2,
       2, 1, 2, 1, 4, 1, 1, 4, 2, 3, 0, 0, 1, 1, 2,

In [11]:
# Overall share of each class across the whole problem (reference values for
# the per-fold class mix).
prb.sum(axis=0) / prb.sum()

array([0.40122698, 0.29987707, 0.19984937, 0.09904658])

In [12]:
# Split the problem rows by assigned fold, then compute the class mix inside
# each of the 5 folds (one row per fold, one column per class).
folds = [prb[solution == fold_id] for fold_id in range(5)]
fold_percents = np.array([fold.sum(axis=0) / fold.sum() for fold in folds])
fold_percents

array([[0.40213076, 0.29938555, 0.19929537, 0.09918832],
       [0.40074922, 0.30057671, 0.19912732, 0.09954675],
       [0.4010002 , 0.29856801, 0.20065612, 0.09977567],
       [0.40156828, 0.30083997, 0.19946032, 0.09813143],
       [0.40068528, 0.30002025, 0.20070553, 0.09858893]])

In [13]:
# Fraction of all samples that landed in each of the 5 folds.
[folds[i].sum() / prb.sum() for i in range(5)]

[0.20034752594623986,
 0.19969401505608037,
 0.20023438581796935,
 0.19963153468673694,
 0.20009253849297348]

In [14]:
# import pandas as pd

In [15]:
# df = pd.DataFrame(data=prb, columns=['Class 0', 'Class 1', 'Class 2', 'Class 3'])
# df['Solution'] = solution

In [16]:
# df

In [17]:
# df.to_clipboard(excel=True)

In [18]:
# decision_df = pd.DataFrame(data=generate_search_space(prb, solution, k=5), columns=['Fold 0', 'Fold 1', 'Fold 2', 'Fold 3', 'Fold 4'])
# decision_df

In [19]:
# decision_df.to_clipboard(excel=True)