In [None]:
import numpy as np
from collections import deque

# Hungarian Algorithm

In [None]:
class HungarianAlgorithmGraph:
    def __init__(self, cost_matrix):
        """
        Initialize the HungarianAlgorithmGraph with a cost matrix.
        param cost_matrix: A square matrix where cost_matrix[u][v] represents the cost of assigning u to v.
        """
        self.cost_matrix = cost_matrix  # The cost matrix for the assignment problem
        self.n = len(cost_matrix)  # Number of nodes in each set (U and V)
        self.U = set(range(self.n))  # Set of nodes in U
        self.V = set(range(self.n))  # Set of nodes in V
        self.matching = {}  # Dictionary to store the current matching
        self.labels_u = [0] * self.n  # Labels for nodes in U
        self.labels_v = [0] * self.n  # Labels for nodes in V
        self.slack = [float('inf')] * self.n  # Slack values for nodes in V

    def initialize_labels(self):
        """
        Initialize labels for U and V.
        Labels for U are set to the maximum cost in each row of the cost matrix,
        and labels for V are initialized to 0.
        """
        for u in range(self.n):
            self.labels_u[u] = max(self.cost_matrix[u])  # Max weight in row u for label_u[u]

    def find_augmenting_path(self, u, visited_u, visited_v, parent):
        """
        Attempt to find an augmenting path starting from node u in U.
        -param u: Current node in U
        -param visited_u: List tracking visited nodes in U
        -param visited_v: List tracking visited nodes in V
        -param parent: Parent array to reconstruct paths        """
        visited_u[u] = True  # Mark the current node in U as visited
        for v in range(self.n):  # Iterate over nodes in V
            if visited_v[v]:  # Skip if node v is already visited
                continue
            # Compute the reduced cost for edge (u, v)
            delta = self.labels_u[u] + self.labels_v[v] - self.cost_matrix[u][v]
            if delta == 0:  # If edge (u, v) is in the equality graph
                visited_v[v] = True  # Mark v as visited
                # If v is unmatched or we can find an augmenting path from its match
                if v not in self.matching or self.find_augmenting_path(self.matching[v], visited_u, visited_v, parent):
                    self.matching[v] = u  # Match u with v
                    return True
            else:
                # Update the slack for node v
                self.slack[v] = min(self.slack[v], delta)
        return False  # No augmenting path found

    def update_labels(self, visited_u, visited_v):
        """
        Update labels for U and V to reduce slack and maintain feasibility.
        -param visited_u: List tracking visited nodes in U
        -param visited_v: List tracking visited nodes in V
        """
        # Compute the smallest slack value
        delta = min(self.slack[v] for v in range(self.n) if not visited_v[v])
        # Update labels for visited nodes in U and V
        for u in range(self.n):
            if visited_u[u]:
                self.labels_u[u] -= delta
        for v in range(self.n):
            if visited_v[v]:
                self.labels_v[v] += delta
            else:
                self.slack[v] -= delta

    def augment(self):
        """
        Perform augmentation for each node in U to construct the optimal matching.
        """
        for u in range(self.n):  # Iterate over all nodes in U
            visited_u = [False] * self.n  # Reset visited for U
            visited_v = [False] * self.n  # Reset visited for V
            self.slack = [float('inf')] * self.n  # Reset slack values
            # Repeat until an augmenting path is found for u
            while not self.find_augmenting_path(u, visited_u, visited_v, {}):
                self.update_labels(visited_u, visited_v)  # Update labels if no path is found
                visited_u = [False] * self.n  # Reset visited for U
                visited_v = [False] * self.n  # Reset visited for V

    def solve(self):
        """
        Solve the assignment problem using the Hungarian algorithm.
        return: A list of optimal assignments and the total cost.
        """
        self.initialize_labels()  # Step 1: Initialize labels
        self.augment()  # Step 2: Find optimal matching through augmentation
        # Extract the resulting matching
        result = [(self.matching[v], v) for v in self.matching]
        # Compute the total cost of the matching
        optimal_cost = sum(self.cost_matrix[u][v] for u, v in result)
        return result, optimal_cost

In [None]:
cost_matrix = np.array([
    [4, 2, 8],  # Costs for assignments of u_0
    [2, 3, 7],  # Costs for assignments of u_1
    [3, 1, 6]   # Costs for assignments of u_2
])

# Instantiate the HungarianAlgorithmGraph class
hungarian = HungarianAlgorithmGraph(cost_matrix)
assignment, cost = hungarian.solve()

# Output the results
print("Optimal Assignment:", assignment)  # Matching pairs (u, v)
print("Optimal Cost:", cost)  # Total cost of the optimal matching

Optimal Assignment: [(0, 2), (1, 1), (2, 0)]
Optimal Cost: 14


# Blossom Algorithm

In [None]:
class BlossomAlgorithmWithHungarian:
    def __init__(self, cost_matrix):
        self.cost_matrix = np.array(cost_matrix)
        self.n = len(cost_matrix)
        self.matching = [-1] * self.n
        self.labels = [-1] * self.n
        self.parent = [-1] * self.n
        self.base = list(range(self.n))

    def find_lowest_common_ancestor(self, u, v):
        visited = [False] * self.n
        while True:
            u = self.base[u]
            visited[u] = True
            if self.matching[u] == -1:
                break
            u = self.parent[self.matching[u]]
        while True:
            v = self.base[v]
            if visited[v]:
                return v
            v = self.parent[self.matching[v]]

    def mark_blossom(self, u, v, lca):
        while self.base[u] != lca:
            self.parent[u] = v
            v = self.matching[u]
            if self.labels[v] == 1:
                self.labels[v] = 0
                self.parent[v] = u
            self.base[u] = self.base[v] = lca
            u = self.parent[self.matching[v]]

    def bfs(self, start):
        """
        Perform BFS to find an augmenting path starting from the node `start`.
        """
        queue = deque([start])
        self.labels = [-1] * self.n  # Reset labels: -1 means unvisited
        self.parent = [-1] * self.n  # Reset parent: -1 means no parent
        self.base = list(range(self.n))  # Each node is its own base initially
        self.labels[start] = 0  # Mark start node with label 0 (even)

        while queue:
            u = queue.popleft()

            # Explore all neighbors of u
            for v in range(self.n):
                if self.cost_matrix[u][v] > 0 and self.labels[v] == -1:
                    # If v is unmatched and we find an augmenting path
                    if self.matching[v] == -1:
                        self.parent[v] = u
                        self.augment_path(v)
                        return True
                    # If v is matched and the match is unvisited, continue exploring
                    elif self.labels[self.matching[v]] == -1:
                        self.labels[v] = 1  # Mark v with label 1 (odd)
                        self.parent[v] = u
                        queue.append(self.matching[v])
                        self.labels[self.matching[v]] = 0  # Mark the matched node with label 0 (even)
                    # If u and v are in different alternating trees, check for a blossom
                    elif self.base[u] != self.base[v]:
                        # Find the least common ancestor of u and v
                        lca = self.find_lowest_common_ancestor(u, v)
                        # Mark both u and v as part of the blossom
                        self.mark_blossom(u, v, lca)
                        self.mark_blossom(v, u, lca)

        return False  # No augmenting path found

    def augment_path(self, u):
        """
        Augment the matching along the alternating path ending at u.
        This is the part where the matching is flipped (augmented).
        """
        while u != -1:
            v = self.parent[u]
            w = self.matching[v]
            self.matching[u] = v
            self.matching[v] = u
            u = w

    def solve_bipartite(self):
        hungarian_solver = HungarianAlgorithmGraph(self.cost_matrix)
        matching, _ = hungarian_solver.solve()
        print(f"Hungarian Algorithm Matching: {matching}")
        for u, v in matching:
            self.matching[u] = v
            self.matching[v] = u

    def solve_general(self):
        for u in range(self.n):
            if self.matching[u] == -1:
                if self.bfs(u):
                    print(f"Augmenting path found starting from node {u}")
                else:
                    print(f"No augmenting path found starting from node {u}")

    def solve(self, bipartite=True):
        if bipartite:
            self.solve_bipartite()
        else:
            self.solve_general()
        return [(u, v) for u, v in enumerate(self.matching) if u < v and self.matching[u] != -1]


In [None]:
cost_matrix = np.array([
    [4, 2, 8],
    [2, 3, 7],
    [3, 1, 6]
])

blossom_solver = BlossomAlgorithmWithHungarian(cost_matrix)
max_matching = blossom_solver.solve(bipartite=True)
print("Maximum Matching:", max_matching)

Hungarian Algorithm Matching: [(0, 2), (1, 1), (2, 0)]
Maximum Matching: [(0, 2)]


In [None]:
cost_matrix = np.array([
    [0, 1, 1, 0, 0],  # Node 0 connected to 1 and 2
    [1, 0, 1, 1, 0],  # Node 1 connected to 0, 2, and 3
    [1, 1, 0, 0, 0],  # Node 2 connected to 0, 1
    [0, 1, 0, 0, 1],  # Node 3 connected to 1 and 4
    [0, 0, 0, 1, 0]   # Node 4 connected to 3
])

blossom_solver = BlossomAlgorithmWithHungarian(cost_matrix)
max_matching = blossom_solver.solve(bipartite=False)
print("Maximum Matching:", max_matching)

Augmenting path found starting from node 0
Augmenting path found starting from node 2
No augmenting path found starting from node 4
Maximum Matching: [(0, 2), (1, 3)]


In [None]:
cost_matrix = np.array([
    [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], # 0
    [1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
    [0, 1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0], # 2
    [0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0],
    [0, 0, 1, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0], # 4
    [0, 0, 0, 0, 1, 0, 1, 1, 1, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0], # 6
    [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 1, 1, 0, 0], # 8
    [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0], # 10
    [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 1],
    [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0], # 12
    [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0]])

blossom_solver = BlossomAlgorithmWithHungarian(cost_matrix)
max_matching = blossom_solver.solve(bipartite=False)
print("Maximum Matching:", max_matching)

Augmenting path found starting from node 0
Augmenting path found starting from node 2
Augmenting path found starting from node 4
No augmenting path found starting from node 6
No augmenting path found starting from node 7
Augmenting path found starting from node 8
No augmenting path found starting from node 10
Augmenting path found starting from node 11
No augmenting path found starting from node 13
Maximum Matching: [(0, 1), (2, 3), (4, 5), (8, 9), (11, 12)]
