In [188]:
import numpy as np
from typing import List, Tuple

class TransportProblem:
    def __init__(self, main_table: np.ndarray, reserves: np.ndarray, needs: np.ndarray):
        """
        main_table: Таблица стоимостей перевозок, где элемент c_ij - стоимость перевозки единицы груза от поставщика i к потребителю j
        reserves: Запасы поставщиков, где элемент a_i - объем запаса поставщика i
        needs: Потребности потребителей, где элемент b_j - объем потребности потребителя j
        """
        self.main_table = main_table
        self.reserves = reserves
        self.needs = needs

    def add_dummy_supplier(self, volume):
        """
        Добавление фиктивного поставщика
        volume: Объем запаса фиктивного поставщика.

        Фиктивный поставщик добавляется, если сумма запасов меньше суммы потребностей
        Его стоимость перевозок равна нулю, чтобы не влиять на общую стоимость
        """
        # Добавляем строку нулевых стоимостей для фиктивного поставщика
        dummy_row = np.zeros(self.main_table.shape[1])
        self.main_table = np.vstack([self.main_table, dummy_row])
        # Добавляем запас фиктивного поставщика
        self.reserves = np.append(self.reserves, volume)

    def add_dummy_customer(self, volume):
        """
        Добавление фиктивного потребителя
        volume: Объем потребности фиктивного потребителя

        Фиктивный потребитель добавляется, если сумма запасов больше суммы потребностей
        Его стоимость перевозок равна нулю, чтобы не влиять на общую стоимость
        """
        # Добавляем столбец нулевых стоимостей для фиктивного потребителя
        dummy_column = np.zeros((self.main_table.shape[0], 1))
        self.main_table = np.hstack([self.main_table, dummy_column])
        # Добавляем потребность фиктивного потребителя
        self.needs = np.append(self.needs, volume)

    def balance(self):
        """
        Балансировка задачи: добавление фиктивных поставщиков или потребителей
        Необходимо для приведения задачи к закрытому виду, где сумма запасов равна сумме потребностей
        """
        total_reserves = np.sum(self.reserves)
        total_needs = np.sum(self.needs)

        if total_reserves < total_needs:
            # Добавляем фиктивного поставщика
            self.add_dummy_supplier(total_needs - total_reserves)
        elif total_reserves > total_needs:
            # Добавляем фиктивного потребителя
            self.add_dummy_customer(total_reserves - total_needs)

    def recalculate_plan(self, transportation_table, cycle_path):
        """
        Пересчитывание плана перевозок на основе найденного цикла
        x: Таблица перевозок, где элемент x_ij - объем перевозки от поставщика i к потребителю j
        cycle_path: (List[Tuple[int, int]]) Цикл для пересчета

        Возвращает int число, величина пересчета (минимальное значение в клетках цикла)

        Алгоритм:
        1) Находим минимальное значение в клетках цикла
        2) Перераспределяем объемы перевозок по циклу, увеличивая и уменьшая значения в соответствующих клетках
        3) Обнуляем клетку с минимальным значением, чтобы исключить ее из базиса
        """
        # Находим минимальное значение в клетках цикла
        min_value = np.min([transportation_table[i][j] for i, j in cycle_path[1:-1:2]])
        cells_with_min_value = [(i, j) for i, j in cycle_path[1:-1:2] if np.isnan(transportation_table[i][j]) or transportation_table[i][j] == min_value]

        # Если минимальное значение NaN, обрабатываем особый случай
        if np.isnan(min_value):
            i, j = cycle_path[0]
            transportation_table[i][j] = np.nan
            i, j = cells_with_min_value[0]
            transportation_table[i][j] = 0

            return min_value

        # Пересчитываем значения в клетках цикла
        for step, (i, j) in enumerate(cycle_path[:-1]):
            if (i, j) in cells_with_min_value:
                if cells_with_min_value.index((i, j)) == 0:
                    transportation_table[i][j] = 0
                else:
                    transportation_table[i][j] = np.nan
                continue

            if np.isnan(transportation_table[i][j]):
                transportation_table[i][j] = 0

            if step % 2 == 0:
                transportation_table[i][j] += min_value  # Увеличиваем объем в четных клетках цикла
            else:
                transportation_table[i][j] -= min_value  # Уменьшаем объем в нечетных клетках цикла

        return min_value

    def calculate_potentials(self, x, u, v):
        """
        Вычисление потенциалов для таблицы перевозок
        x: Таблица перевозок
        u: Потенциалы строк
        v: Потенциалы столбцов

        Алгоритм:
        1) Потенциалы u_i и v_j вычисляются по формуле u_i + v_j = c_ij для базисных клеток
        2) Начинаем с u[0] = 0 и вычисляем остальные потенциалы, используя базисные клетки
        """
        m, n = x.shape
        u[:] = np.nan
        v[:] = np.nan
        u[0] = 0  # Начальное значение потенциала первой строки

        # Вычисляем потенциалы по формуле u_i + v_j = c_ij, пока все они не будут определены
        while np.any(np.isnan(u)) or np.any(np.isnan(v)):
            for i in range(m):
                for j in range(n):
                    if not np.isnan(u[i]) and np.isnan(v[j]) and x[i][j] != 0:
                        v[j] = self.main_table[i][j] - u[i]
                    elif np.isnan(u[i]) and not np.isnan(v[j]) and x[i][j] != 0:
                        u[i] = self.main_table[i][j] - v[j]

    def is_optimal(self, x, u, v):
        """
        Проверка, является ли текущий план оптимальным
        x: Таблица перевозок
        u: Потенциалы строк
        v: Потенциалы столбцов

        Возвращает True, если план оптимален, иначе False

        Условие оптимальности:
        Для всех свободных клеток (x_ij = 0) должно выполняться: u_i + v_j <= c_ij
        Если это условие нарушается, план не оптимален
        """
        m, n = x.shape
        for i in range(m):
            for j in range(n):
                if x[i][j] == 0 and u[i] + v[j] > self.main_table[i][j]:
                    return False
        return True

    def solve(self):
        """
        Основная функция-решение транспортной задачи методом потенциалов

        Возвращает np.ndarray оптимальный план перевозок

        Алгоритм:
        1) Балансировка задачи
        2) Построение начального базисного решения методом северо-западного угла
        3) Улучшение плана:
           - Вычисление потенциалов
           - Проверка оптимальности
           - Поиск цикла и пересчет плана
        """
        # Балансировка задачи
        self.balance()

        m, n = self.main_table.shape
        x = np.zeros((m, n))  # Таблица перевозок
        u = np.zeros(m)  # Потенциалы строк
        v = np.zeros(n)  # Потенциалы столбцов

        # Начальное базисное решение методом северо-западного угла (минимальное находим)
        i, j = 0, 0
        while i < m and j < n:
            x[i][j] = min(self.reserves[i], self.needs[j])
            self.reserves[i] -= x[i][j]
            self.needs[j] -= x[i][j]
            if self.reserves[i] == 0:
                i += 1
            else:
                j += 1

        # Улучшаем план пока не будет оптимальным
        while True:
            self.calculate_potentials(x, u, v)
            if self.is_optimal(x, u, v):
                break

            # Находим клетку для улучшения, не проходящую условия оптимальности
            for i in range(m):
                for j in range(n):
                    if x[i][j] == 0 and u[i] + v[j] > self.main_table[i][j]:
                        start_pos = (i, j)
                        break

            # Находим цикл и пересчитываем план
            cycle_path = self.find_cycle_path(x, start_pos)
            self.recalculate_plan(x, cycle_path)

        return x

    def find_cycle_path(self, x, start_pos):
        """
        Нахождение цикла в таблице перевозок, начиная с заданной позиции
        x: Таблица перевозок
        start_pos: (Tuple[int, int]) начальная позиция для поиска цикла

        Возвращает List[Tuple[int, int]] список позиций, образующих цикл

        Алгоритм:
        1) Начинаем с начальной позиции и перемещаемся по строкам и столбцам, используя занятые клетки
        2) Возвращаемся в начальную позицию, завершая цикл
        """
        def get_posible_moves(bool_table, path):
            """
            Возвращает возможные ходы для построения цикла
            bool_table: Логическая таблица, где True обозначает доступные клетки
            path: (List[Tuple[int, int]]) Текущий путь

            Возвращает список возможных ходов
            """
            posible_moves = np.full(bool_table.shape, False)

            current_pos = path[-1]
            prev_pos = path[-2] if len(path) > 1 else (np.nan, np.nan)

            # Разрешаем ходы по строке и столбцу
            if current_pos[0] != prev_pos[0]:
                posible_moves[current_pos[0]] = True

            if current_pos[1] != prev_pos[1]:
                posible_moves[:, current_pos[1]] = True

            return list(zip(*np.nonzero(posible_moves * bool_table)))

        res = [start_pos]
        bool_table = x != 0

        while True:
            current_pos = res[-1]

            # Помечаем текущую позицию как посещенную
            bool_table[current_pos[0]][current_pos[1]] = False

            # Если длина пути больше 3, разрешаем вернуться в начальную позицию
            if len(res) > 3:
                bool_table[start_pos[0]][start_pos[1]] = True

            # Получаем возможные ходы
            posible_moves = get_posible_moves(bool_table, res)

            # Если начальная позиция доступна, завершаем цикл
            if start_pos in posible_moves:
                res.append(start_pos)
                return res

            # Если нет возможных ходов, сбрасываем путь
            if not posible_moves:
                for i, j in res[1:-1]:
                    bool_table[i][j] = True

                res = [start_pos]
                continue

            # Добавляем следующий ход в путь
            res.append(posible_moves[0])

Все наборы данных для тестов взяты с различных сайтов-примеров и ютуб гайдов по решению подобных задач, ответы полностью совпали с ними

Пример 1 https://galyautdinov.ru/post/transportnaya-zadacha

In [189]:
# Пример 1 сбалансированная задача
main_table = np.array([[5, 3, 1], [3, 2, 4], [4, 1, 2]])
needs = np.array([15, 20, 25])
reserves = np.array([10, 20, 30])

problem = TransportProblem(main_table, reserves, needs)
solution = problem.solve()

print("Оптимальная транспортировка: \n", solution)
cost = np.sum(problem.main_table * solution)
print("\n Затраты на перевозку: ", cost)

Оптимальная транспортировка: 
 [[ 0.  0. 10.]
 [15.  5.  0.]
 [ 0. 15. 15.]]

 Затраты на перевозку:  110.0


Примеры 2 и 3 https://youtu.be/8SjsPTSx6rw?si=k0VoHYVHyNqloh2h

In [190]:
# Пример 2 Фиктивный поставщик
main_table = np.array([[6, 5, 2], [3, 7, 4], [7, 8, 1], [2, 2, 3]])
needs = np.array([150, 150, 250])
reserves = np.array([250, 100, 80, 80])

problem = TransportProblem(main_table, reserves, needs)
solution = problem.solve()

print("Оптимальная транспортировка: \n", solution)
cost = np.sum(problem.main_table * solution)
print("\n Затраты на перевозку: ", cost)

Оптимальная транспортировка: 
 [[  0.  80. 170.]
 [100.   0.   0.]
 [  0.   0.  80.]
 [ 50.  30.   0.]
 [  0.  40.   0.]]

 Затраты на перевозку:  1280.0


In [191]:
# Пример 3 Фиктивный потребитель
main_table = np.array([[6, 5, 2], [3, 7, 4], [7, 8, 1], [2, 2, 3]])
needs = np.array([150, 150, 200])
reserves = np.array([250, 100, 100, 100])

problem = TransportProblem(main_table, reserves, needs)
solution = problem.solve()

print("Оптимальная транспортировка: \n", solution)
cost = np.sum(problem.main_table * solution)
print("\n Затраты на перевозку: ", cost)

Оптимальная транспортировка: 
 [[  0. 100. 100.  50.]
 [100.   0.   0.   0.]
 [  0.   0. 100.   0.]
 [ 50.  50.   0.   0.]]

 Затраты на перевозку:  1300.0


Пример 4 https://youtu.be/1jBa_2IYDNY?si=RAebsowiSsOvu5NH

In [192]:
# Пример 4 Сбалансированная задача
main_table = np.array([[1, 3, 2, 4], [2, 1, 4, 3], [3, 5, 6, 1]])
needs = np.array([30, 10, 20, 40])
reserves = np.array([35, 50, 15])

problem = TransportProblem(main_table, reserves, needs)
solution = problem.solve()

print("Оптимальная транспортировка: \n", solution)
cost = np.sum(problem.main_table * solution)
print("\n Затраты на перевозку: ", cost)

Оптимальная транспортировка: 
 [[15.  0. 20.  0.]
 [15. 10.  0. 25.]
 [ 0.  0.  0. 15.]]

 Затраты на перевозку:  185.0


Пример 5 https://youtu.be/4AinFDRwOGU?si=y6k5QQofBRse4XCe

In [193]:
# Пример 5 большой размерности
main_table = np.array([[80, 48, 110, 72], [73, 57, 95, 48], [25, 35, 68, 60], [60, 70, 82, 120], [115, 92, 74, 135]])
needs = np.array([80, 150, 220, 160])
reserves = np.array([110, 85, 75, 90, 250])

problem = TransportProblem(main_table, reserves, needs)
solution = problem.solve()

print("Оптимальная транспортировка: \n", solution)
cost = np.sum(problem.main_table * solution)
print("\n Затраты на перевозку: ", cost)

Оптимальная транспортировка: 
 [[  0.  35.   0.  75.]
 [  0.   0.   0.  85.]
 [ 75.   0.   0.   0.]
 [  5.  85.   0.   0.]
 [  0.  30. 220.   0.]]

 Затраты на перевозку:  38325.0
