In [19]:
# !python3.12 -m pip install osqp==1.0.0b1

In [20]:
pip freeze

aiofiles==23.2.1
annotated-types==0.6.0
anyio==4.3.0
argon2-cffi==23.1.0
argon2-cffi-bindings==21.2.0
arrow==1.3.0
asttokens==2.4.1
async-lru==2.0.4
attrs==23.2.0
Babel==2.15.0
bash_kernel==0.9.3
beautifulsoup4==4.12.3
bleach==6.1.0
certifi==2024.2.2
cffi==1.16.0
charset-normalizer==3.3.2
clarabel==0.7.1
click==8.1.7
comm==0.2.2
cvxpy==1.5.1
debugpy==1.8.1
decorator==5.1.1
defusedxml==0.7.1
ecos==2.0.13
executing==2.0.1
fastapi==0.110.0
fastjsonschema==2.19.1
fqdn==1.5.1
h11==0.14.0
httpcore==1.0.5
httptools==0.6.1
httpx==0.27.0
idna==3.6
ipykernel==6.29.4
ipython==8.24.0
ipywidgets==8.1.2
isoduration==20.11.0
jedi==0.19.1
Jinja2==3.1.4
json5==0.9.25
jsonpointer==2.4
jsonschema==4.22.0
jsonschema-specifications==2023.12.1
jupyter==1.0.0
jupyter-console==6.6.3
jupyter-events==0.10.0
jupyter-lsp==2.2.5
jupyter_client==8.6.1
jupyter_core==5.7.2
jupyter_server==2.14.0
jupyter_server_terminals==0.5.3
jupyterlab==4.1.8
jupyterlab_pygments==0.3.0
jupyterlab_server==2.27.1
jupyterlab_widgets==

In [22]:
from scipy.special import stirling2
import bf
import scipy
import numpy as np
from scipy.sparse import csr_matrix
from scipy.sparse import coo_matrix
import sys
import os
from copy import deepcopy
import matplotlib.pyplot as plt

np.set_printoptions(threshold=sys.maxsize)

ImportError: cannot import name 'stirling2' from 'scipy.special' (/home/wladek/.local/lib/python3.12/site-packages/scipy/special/__init__.py)

# W zmiennych positive i negative będziemy przechowywać rozwiązania spełniające i niespełniające warunków problemu.

In [None]:
positive, negative = [], []

# Funkcja gen_solution generuje rozwiązanie spełniające 2 warunki:
# $\forall i = 1, .., N_{M} : \sum \limits _{j=1} ^{N_{T}} x_{ij} \le 1$
# $\forall i = 1, .., N_{T} : \sum \limits _{i=1} ^{N_{M}} x_{ij} \ge 1$

In [None]:
def gen_solution(T: int, M: int, TEST: int):
    mach = np.random.choice(list(range(M)), size=TEST, replace=False)
    tasks = np.hstack(
        (
            np.random.RandomState().permutation(T),
            np.random.choice(list(range(T)), size=TEST - T, replace=True),
        )
    )
    return mach, tasks

In [None]:
NT = 10
NM = 50

E = np.random.random(NT)
D = np.random.random(NT)
eta = np.random.random((NT, NM))
rows, cols = gen_solution(NT, NM, NM)
X = coo_matrix((np.ones(NM), (cols, rows)), shape=(NT, NM)).todense()
P = np.random.random(NM)
print(X.shape)
print(E.shape)
print(D.shape)
print(eta.shape)
print(P.shape)

# Funkcja check_X sprawdza rozwiązanie pod kątem dwóch warunków rozwiązania generowanego przez gen_solution oraz dodatkowo sprawdza trzeci warunek:
# $\forall j = 1, ..., N_{T} : t_{j} = \frac{e_{j}}{\sum \limits _{i=1} ^{N_{M}} n_{ij}x_{ij}} \leq d_{ij}$

In [None]:
def check_X(X, E, D, eta, P):
    assert np.all(np.sum(X, axis=0) <= 1)
    assert np.all(np.sum(X, axis=1) >= 1)
    if np.sum((E / np.sum(np.multiply(eta, X), axis=1) - D) > 0) == 0:
        print("test positive")
        positive.append([X, E, D, eta, P])
    else:
        negative.append([X, E, D, eta, P])

In [None]:
def test_X(X, E, D, eta, P):
    if np.all(np.sum(X, axis=0) <= 1) and np.all(np.sum(X, axis=1) >= 1):
        return np.sum((E / np.sum(np.multiply(eta, X), axis=1) - D) > 0) == 0
    return False

# Możemy zauważyć, że macierz rozwiązań $X$ jest macierzą rzadką, dlatego potencjalnie może opłacać się przechowywanie jako macierzy (listy) koordynatów

In [None]:
def get_data(NT: int, NM: int, verbose: bool = False):
    E = np.random.random(NT)
    D = np.random.random(NT)
    eta = np.random.random((NT, NM))
    P = np.random.random(NM)
    rows, cols = gen_solution(NT, NM, NM)
    X = coo_matrix((np.ones(NM), (cols, rows)), shape=(NT, NM)).todense()
    if verbose:
        print(X.shape)
        print(E.shape)
        print(D.shape)
        print(eta.shape)
        print(P.shape)
    return X, E, D, eta, P

In [None]:
check_X(*get_data(10, 50, verbose=True))

In [None]:
while len(positive) < 10:
    check_X(*get_data(10, 50))

In [None]:
def save_list(listname):
    list = eval(listname)
    for i, example in enumerate(list[:10]):
        object_list = np.empty(5, object)
        object_list[:] = list[i]
        np.save(os.path.join(listname, f"test{i}.npy"), object_list)

In [None]:
save_list("positive")
save_list("negative")

# Wizualizacja rozwiązań

In [None]:
def show_example(example):
    plt.title("machines chosen for tasks")
    plt.xlabel("machine")
    plt.ylabel("tasks")
    plt.imshow(example[0])
    plt.colorbar()
    plt.show()
    plt.title("tasks effort")
    plt.xlabel("task")
    plt.ylabel("effort")
    plt.bar(range(len(example[1])), example[1])
    plt.show()
    plt.title("tasks deadline length")
    plt.xlabel("task")
    plt.ylabel("deadline")
    plt.bar(range(len(example[2])), example[2])
    plt.show()
    plt.title("effectivity of machines doing given task")
    plt.xlabel("machine")
    plt.ylabel("tasks")
    plt.imshow(example[3])
    plt.colorbar()
    plt.show()
    plt.title("cost of work of given machine")
    plt.xlabel("machine")
    plt.ylabel("cost of work")
    plt.bar(range(len(example[4])), example[4])
    plt.show()

In [None]:
b = np.load("positive//test0.npy", allow_pickle=True)

In [None]:
show_example(b)

In [None]:
def alter(A, v_swaps: int, h_swaps: int, turn_off: int, turn_on: int):
    """
    h_swaps: number of horizontal swaps
    v_swaps: number of vertical swaps
    turn_off: number of machines to turn off for a random task
    turn on: number of machines to turn on for a random task
    """
    X = deepcopy(A)
    columns_to_swap = zip(
        np.random.randint(0, X.shape[1], v_swaps),
        np.random.randint(0, X.shape[1], v_swaps),
    )
    rows_to_swap = zip(
        np.random.randint(0, X.shape[0], h_swaps),
        np.random.randint(0, X.shape[0], h_swaps),
    )
    for c1, c2 in columns_to_swap:
        tmp = deepcopy(X[:, c1])
        X[:, c1] = X[:, c2]
        X[:, c2] = tmp
    for r1, r2 in rows_to_swap:
        tmp = deepcopy(X[r1, :])
        X[r1, :] = X[r2, :]
        X[r2, :] = tmp
    pairs = list(zip(*np.where(X)))
    chosen = np.random.choice(len(pairs), size=turn_off)
    for ind in chosen:
        X[pairs[ind]] = 0
    return X
    pairs = list(zip(*np.where(X == 0)))
    chosen = np.random.choice(len(pairs), size=turn_on)
    for ind in chosen:
        X[pairs[ind]] = 0
    return X

In [None]:
rows, cols = gen_solution(NT, NM, NM)
d = coo_matrix((np.ones(NM), (cols, rows)), shape=(NT, NM)).todense()
plt.imshow(d, vmin=0, vmax=1)
plt.show()
new_d = alter(d, 5, 3, 2, 2)
plt.imshow(new_d, vmin=0, vmax=1)
plt.show()
plt.imshow(new_d - d)
plt.show()

In [None]:
def cost(X, E, D, eta, P, base_only=False):
    cond_1 = np.sum(np.sum(X, axis=0) > 1) ** 2
    cond_2 = np.sum(np.sum(X, axis=1) < 1) ** 2
    cond_3 = E / (np.sum(np.multiply(eta, X), axis=1) + 1e-8) - D  # deadlines not met
    cond_3_sum = np.sum(np.where(cond_3 > 0, 100 * cond_3, 0.1 * cond_3))
    base_cost = np.sum(
        np.multiply(E, np.sum(np.multiply(P, X), axis=1))
        / (np.sum(np.multiply(eta, X), axis=1) + 1e-8)
    )
    if base_only:
        return base_cost
    proper = 4 if (cond_1 or cond_2 or np.any(cond_3 > 0)) else 1
    return (base_cost + cond_3_sum + cond_2 + cond_1) * proper

In [None]:
cost(X, E, D, eta, P, True)

In [None]:
distribution = [cost(*get_data(5, 10), base_only=True) for i in range(10000)]
plt.hist(distribution, range=(0, 100), density=True, bins=60)
plt.title("Base cost distribution")
plt.show()

In [None]:
distribution = [cost(*get_data(10, 50)) for i in range(10000)]
plt.hist(distribution, range=(0, 1e4), density=True, bins=60)
plt.title("Distribution of cost with rules violation penalty")
plt.show()

In [None]:
_, E, D, eta, P = get_data(NT, NM)

In [None]:
def get_cheapest_problem(NT: int, NM: int):
    """
    NT - liczba zadań
    NM - liczba maszyn
    """
    problems = []
    for i in range(10000):
        problems.append(get_data(NT, NM))
    return min(problems, key=lambda v: cost(*v))

In [None]:
def get_random_solutions(
    NT: int,
    NM: int,
    n_solutions: int,
    proper: bool,
    E: np.ndarray,
    D: np.ndarray,
    eta: np.ndarray,
    P: np.ndarray,
):
    """
    NT - liczba zadań
    NM - liczba maszyn
    n_solutions - liczba żądanych rozwiązań
    proper - czy wszystkie zwracane rozwiązania powinny być poprawne
    E, D, eta, P - warunki zadania
    """
    solutions = []
    while len(solutions) < n_solutions:
        rows, cols = gen_solution(NT, NM, NM)
        X = coo_matrix((np.ones(NM), (cols, rows)), shape=(NT, NM)).todense()
        if (not proper) or test_X(X, E, D, eta, P):
            solutions.append(X)
    return solutions

In [None]:
def bee_algorithm_iteration(
    solutions: list,
    top_n: int,
    top_attempts: int,
    rest_attempts: int,
    max_solutions: int,
    params: dict,
):
    """
    solutions - rozwiązania wejściowe
    top_n - liczba rozwiązań najlepszych - traktowanych priorytetowo
    top_attempts - liczba pszczół dla najlepszych rozwiązań
    rest_attempts - liczba pszczół dla pozostałych rozwiązań
    max_solutions - maksymalna liczba rozwiązań
    params - słownik parametrów określających sposób generowania permutacji rozwiązań
    """
    new_solutions = []
    for i, solution in enumerate(solutions):
        n_tries = top_attempts if i < top_n else rest_attempts
        for j in range(n_tries):
            new_solutions.append(alter(solution, **params))
    return sorted(new_solutions + solutions, key=lambda x: cost(x, E, D, eta, P))[
        :max_solutions
    ]

In [None]:
params = {"v_swaps": 5, "h_swaps": 5, "turn_off": 2, "turn_on": 2}

In [None]:
start_solutions = get_random_solutions(NT, NM, 160, False, E, D, eta, P)

In [None]:
len(start_solutions)

In [None]:
solutions_iter_1 = bee_algorithm_iteration(start_solutions, 3, 3, 1, 120, params)

In [None]:
print("example costs before 1 iteration of bee algorithm:")
print([cost(i, E, D, eta, P) for i in start_solutions][:10])
print("example costs after 1 iteration of bee algorithm:")
print([cost(i, E, D, eta, P) for i in solutions_iter_1][:10])

In [None]:
print(
    f"average cost before 1 iteration of bee algorithm:\t{(sum(cost(i, E, D, eta, P) for i in start_solutions) / len(start_solutions)):.2f}"
)
print(
    f"average cost after 1 iteration of bee algorithm:\t{(sum(cost(i, E, D, eta, P) for i in solutions_iter_1) / len(solutions_iter_1)):.2f}"
)
print(
    f"best cost before 1 iteration of bee algorithm:\t{min(cost(i, E, D, eta, P) for i in start_solutions):.2f}"
)
print(
    f"best cost after 1 iteration of bee algorithm:\t{min(cost(i, E, D, eta, P) for i in solutions_iter_1):.2f}"
)

In [None]:
def bee_algorithm(starting_solutions: list, iterations: int, params: dict):
    solutions = [i for i in starting_solutions]
    params_c = {key: val for key, val in params.items()}
    top_n = params_c.pop("top_n")
    top_attempts = params_c.pop("top_attempts")
    rest_attempts = params_c.pop("rest_attempts")
    max_solutions = params_c.pop("max_solutions")
    for i in range(iterations):
        solutions = bee_algorithm_iteration(
            solutions, top_n, top_attempts, rest_attempts, max_solutions, params_c
        )
    return solutions[0]

In [None]:
params = {
    "v_swaps": 1,
    "h_swaps": 1,
    "turn_off": 3,
    "turn_on": 3,
    "top_n": 5,
    "top_attempts": 11,
    "rest_attempts": 6,
    "max_solutions": 200,
}

In [None]:
sol = bee_algorithm(start_solutions, 40, params)

In [None]:
cost(sol, E, D, eta, P)

In [None]:
X, E, D, eta, P = get_data(10, 100)

In [None]:
test_X(X, E, D, eta, P)

In [None]:
base_cost = np.multiply(E, np.sum(np.multiply(P, X), axis=1)) / np.sum(
    np.multiply(eta, X), axis=1
)

In [None]:
problem = get_cheapest_problem(10, 50)
cost(*problem)

In [None]:
test_X(*problem)

In [None]:
X, E, D, eta, P = problem

In [None]:
sol = bee_algorithm([X], 40, params)

In [None]:
cost(sol, E, D, eta, P)

In [None]:
test_X(sol, E, D, eta, P)

In [None]:
def plot_bee_algorithm(starting_solutions: list, iterations: int, params: dict):
    solutions = [i for i in starting_solutions]
    params_c = {key: val for key, val in params.items()}
    top_n = params_c.pop("top_n")
    top_attempts = params_c.pop("top_attempts")
    rest_attempts = params_c.pop("rest_attempts")
    max_solutions = params_c.pop("max_solutions")
    costs = []
    for i in range(iterations):
        solutions = bee_algorithm_iteration(
            solutions, top_n, top_attempts, rest_attempts, max_solutions, params_c
        )
        costs.append(cost(solutions[0], E, D, eta, P))
    plt.plot([i for i in range(iterations)], costs)
    plt.show()

In [None]:
plot_bee_algorithm(start_solutions, 40, params)

In [None]:
work = np.array(
    [
        0.05649429,
        0.53109867,
        0.86764481,
    ]
)
time = np.array(
    [
        0.53677527,
        0.43160818,
        0.9047183,
    ]
)
P = np.array([0.42415955, 0.73918663, 0.3058235, 0.6060437, 0.64862661])
eta = np.array(
    [[0.55492947, 0.35726281, 0.75614558, 0.17878197, 0.95030017] for _ in range(3)]
)

In [None]:
start_solutions = get_random_solutions(3, 5, 160, False, work, time, eta, P)

In [None]:
sol = bee_algorithm(start_solutions, 40, params)

In [None]:
cost(sol, work, time, eta, P)

In [None]:
test_X(sol, work, time, eta, P)

In [None]:
R = np.array([[0, 0, 0, 1, 0], [0, 1, 0, 0, 1], [1, 0, 1, 0, 0]])
test_X(R, work, time, eta, P)

In [None]:
cost(R, work, time, eta, P, base_only=True)