# Week 01 Assignment: minimum spanning trees

A minimum spanning tree (MST) of a connected, undirected, weighted graph $G=(V,E)$ is a connected, acyclic subgraph $T=(V',E')$ such that
\begin{align*}
V' &= V\\
E' &\subseteq E
\end{align*}
with $\sum_{e\in E'} w_e$ being the smallest possible among all choices of $E'$. Here $w_e$ is the weight of edge $e$.
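
To make the definition concrete, here is a minimal brute-force sketch that applies it literally: try every set of $|V|-1$ edges, keep those that connect all vertices, and take the smallest total weight. The names `edge_list`, `connects_all`, and `brute_force_mst_weight`, as well as the use of infinity as the no-edge sentinel, are assumptions made for illustration. The search is exponential, so it is only usable on tiny graphs.

```python
from itertools import combinations

INF = float("inf")  # assumed no-edge sentinel for this sketch


def edge_list(G):
    """Return (weight, u, v) for every edge above the main diagonal."""
    n = len(G)
    return [
        (G[u][v], u, v)
        for u in range(n)
        for v in range(u + 1, n)
        if G[u][v] != INF
    ]


def connects_all(n, edges):
    """True if the given edges join all n vertices into one component."""
    reached, frontier = {0}, [0]
    while frontier:
        x = frontier.pop()
        for _, u, v in edges:
            # Treat each undirected edge as usable in both directions
            for a, b in ((u, v), (v, u)):
                if a == x and b not in reached:
                    reached.add(b)
                    frontier.append(b)
    return len(reached) == n


def brute_force_mst_weight(G):
    """Smallest total weight over all spanning trees, by exhaustive search.

    A set of n-1 edges that connects n vertices is necessarily acyclic,
    so every candidate that passes connects_all is a spanning tree.
    """
    n = len(G)
    candidates = combinations(edge_list(G), n - 1)
    return min(sum(w for w, _, _ in E) for E in candidates if connects_all(n, E))


# A triangle with weights 1, 2, 3: the MST drops the heaviest edge.
triangle = [
    [INF, 1, 3],
    [1, INF, 2],
    [3, 2, INF],
]
print(brute_force_mst_weight(triangle))  # prints 3
```

If the graph is not connected, no candidate passes the check and `min` raises a `ValueError`, which matches the fact that a disconnected graph has no spanning tree.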

## Borůvka's algorithm

The basis of all MST algorithms was developed by [Otakar Borůvka](https://en.wikipedia.org/wiki/Otakar_Bor%C5%AFvka) in 1926. Here's the pseudocode.

\begin{align*}
& \textbf{find minimum spanning tree of } G: \\
& \qquad \textbf{initialize}\ T\ \text{as an edgeless copy of}\ G \\
& \qquad \textbf{while}\ T\ \text{has more than one component}: \\
& \qquad\qquad \text{connect two components with a safe edge} \\
& \qquad \textbf{return}\ T
\end{align*}
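
The pseudocode can also be sketched compactly on an edge list. This is a hedged illustration rather than the implementation used in this assignment: it swaps the adjacency matrix for `(weight, u, v)` triples and tracks components with a union-find forest, and the function name `boruvka_weight` is an assumption. In each round it picks the cheapest edge leaving every component and adds all of them at once; comparing whole triples breaks weight ties consistently.

```python
def boruvka_weight(n, edges):
    """Total MST weight of a connected graph given as (weight, u, v) triples."""
    parent = list(range(n))  # union-find forest: one singleton tree per vertex

    def find(x):
        # Follow parent pointers to the representative, halving paths as we go
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    components, total = n, 0
    while components > 1:
        # Cheapest edge leaving each component, keyed by its representative
        cheapest = {}
        for edge in edges:
            _, u, v = edge
            ru, rv = find(u), find(v)
            if ru == rv:
                continue  # both ends are already in the same component
            for r in (ru, rv):
                if r not in cheapest or edge < cheapest[r]:
                    cheapest[r] = edge
        if not cheapest:
            raise ValueError("graph is not connected")
        for w, u, v in cheapest.values():
            ru, rv = find(u), find(v)
            if ru != rv:  # the same edge may be chosen by two components
                parent[ru] = rv
                total += w
                components -= 1
    return total


# Edge triples transcribed from the six-vertex test graph in this notebook
edges = [
    (5, 0, 3), (1, 0, 4), (20, 1, 2), (5, 1, 3),
    (10, 1, 5), (10, 2, 3), (15, 3, 5), (20, 4, 5),
]
print(boruvka_weight(6, edges))  # prints 31
```

Because every component with an outgoing edge merges with at least one other per round, the number of components at least halves each time, so the outer loop runs at most about $\log_2 |V|$ times.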


In [1]:
##12345678901234567890123456789012345678901234567890123456789012345678901234567890


class MST:
    """A class with the methods necessary to produce a MST from the adjacency
    matrix of a weighted undirected graph.
    """

    def __init__(self, G: list[list[int]]):
        """Initializes the MST object with the input graph G."""
        # Validate that the input graph is a non-empty square matrix
        if len(G) == 0 or any(len(row) != len(G) for row in G):
            raise ValueError("Input graph must be a non-empty square matrix")
        # Validate that the input graph is symmetric
        for i in range(len(G)):
            for j in range(len(G)):
                if G[i][j] != G[j][i]:
                    raise ValueError("Input graph must be undirected (symmetric)")
        # Validate that the input graph has no self-loops
        for i in range(len(G)):
            if G[i][i] != G[0][0]:
                raise ValueError("Input graph must have no self-loops")

        # Local copy of the input graph's adjacency matrix, so that we don't
        # accidentally modify the input graph. Also this local copy is
        # accessible to all methods of this class.
        self.G: list[list[int]] = [list(row) for row in G]
        # Shorthand notation for number of vertices, so that we don't have to
        # keep writing len(self.G) all the time.
        self.n: int = len(self.G)
        # Shorthand of no-edge sentinel from input graph. Typically we expect a 0
        # or an infinity as the no-edge sentinel, but we don't assume anything
        # here. We just take whatever is in G[0][0] as the no-edge sentinel.
        self.no_edge = G[0][0]
        # Adjacency matrix for MST. Initially it has no edges. This will be
        # populated as the algorithm progresses and will be the output
        # of the algorithm.
        self.T: list[list[int]] = [
            [self.no_edge for i in range(self.n)] for j in range(self.n)
        ]

    def _reachabilty(self, starting: int, graph: list[list[int]]) -> list[int]:
        """Determines the vertices of the input graph reachable from the
        starting vertex."""
        # List of reachable vertices
        reach: list[int] = []
        # List of vertices to explore next, primed with the starting vertex
        visit_next: list[int] = [starting]
        # Process every vertex in the visit_next list until it's empty.
        while len(visit_next) > 0:
            # Take a vertex out of the visit_next list. This can be the first
            # vertex in the list (operating it as a queue), the last vertex
            # (operating it as a stack), or any vertex we damn well please
            # (operating it as we like). All things being equal, stack is fine.
            vertex: int = visit_next.pop()
            if vertex not in reach:
                # Add this vertex to the reachable list
                reach.append(vertex)
                # Plan to visit this vertex's neighbors next
                for neighbor in range(self.n):
                    if graph[neighbor][vertex] != self.no_edge:
                        visit_next.append(neighbor)
        return reach  # Done!

    def _count_and_label(self) -> tuple[int, list[int]]:
        """Counts the components of the candidate MST (self.T) and labels its
        vertices with the component they belong to. For every vertex in the graph,
        track its component. The component label is the count value for each component
        when it was discovered. For example, if there are 3 components in the graph,
        the vertices in the first component will be labeled 1, those in the second
        component will be labeled 2, and those in the third component will be labeled 3.
        """
        # Count of components in the candidate MST
        count_of_components: int = 0
        # The component label for each vertex. Initialized to -1, meaning
        # that no vertex has been labeled yet.
        component_label: list[int] = [-1] * self.n
        # Remember the vertices we've seen so far. This is to avoid
        # double counting components.
        visited: list[int] = []
        # Process every vertex. If we haven't seen it before, it means
        # we've just found a new component.
        for vertex in range(self.n):
            if vertex not in visited:
                # We just found a new component, update the count. This count value
                # becomes the label for all vertices in this component.
                count_of_components += 1
                # Find everything we can reach from this vertex, because they'll be
                # in the same component
                reachable_from_vertex = self._reachabilty(vertex, self.T)
                # Add these reachable vertices to those visited, because
                # they are all in the same component and we don't want to
                # process them in the future.
                visited.extend(reachable_from_vertex)
                # The current component count becomes the label of all
                # these vertices
                for v in reachable_from_vertex:
                    component_label[v] = count_of_components
        return count_of_components, component_label  # Done

    def boruvka(self):
        """The algorithm itself!"""
        # Initialize count of components in the candidate MST and also get the
        # labels for each vertex in it.
        count_of_components, component_label = self._count_and_label()
        while count_of_components > 1:
            # Initialize a list to hold the safe edges for the various components.
            # Here lies a YUGE! implementation question. How to represent a safe edge?
            safe_edge = [None] * (self.n + 1)
            # Find any two vertices u, v in different components. The loop examines
            # all pairs of vertices above the main diagonal of the adjacency matrix.
            # There is no reason to look below the main diagonal, because the graph is
            # undirected and the adjacency matrix is symmetric. Also there is no reason
            # to look at the main diagonal, because there are no self-loops in the graph.
            # So we look at all pairs (u,v) such that u < v. For every such pair,
            # if u and v are in different components, we consider edge (u,v) as
            # a candidate for the safe edge of the component of u and also for
            # the safe edge of the component of v.
            for u in range(self.n):
                for v in range(u + 1, self.n):
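                    # Only a real edge of G can be a safe edge, so skip pairs
                    # whose matrix entry is the no-edge sentinel.
                    if (
                        self.G[u][v] != self.no_edge
                        and component_label[u] != component_label[v]
                    ):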
                        # Vertices u, v are in different components. Let's figure
                        # out the safe edge for the component of vertex u. We do
                        # things below in a verbose manner for clarity. We'll be
                        # more concise when we do the same for component of vertex v.
                        if safe_edge[component_label[u]] is None:
                            # There is no safe edge for this component yet, so let's
                            # assume that (u,v) is just it. And here is how to represent
                            # the safe edge of a component.
                            safe_edge[component_label[u]] = [u, v]
                        else:
                            # There is currently a safe edge for this component, but
                            # is it truly the safest? Let's find out. Is edge (u,v)
                            # safer than the current safe edge for this component?
                            current_component = component_label[u]
                            current_safe_edge = safe_edge[current_component]
                            a = current_safe_edge[0]  # safe edge vertex a
                            b = current_safe_edge[1]  # safe edge vertex b
                            # Look up input graph for weight of edge (a,b)
                            current_weight = self.G[a][b]
                            # Which of two edges (a,b) and (u,v) is safer, ie,
                            # has the smallest weight?
                            if self.G[u][v] < current_weight:
                                # If (u,v) is smaller than the existing safe
                                # edge (a,b) make (u,v) the safe edge for the
                                # current component.
                                safe_edge[current_component] = [u, v]
                        # It's component v's turn now. We repeat the same
                        # logic as above, but for component of vertex v. The
                        # code is a bit less verbose this time.
                        if safe_edge[component_label[v]] is None:
                            safe_edge[component_label[v]] = [v, u]
                        else:
                            [a, b] = safe_edge[component_label[v]]
                            if self.G[v][u] < self.G[a][b]:
                                safe_edge[component_label[v]] = [v, u]
            # Add all the safe edges to the candidate MST
            for edge in safe_edge:
                if edge is not None:
                    u = edge[0]
                    v = edge[1]
                    self.T[u][v] = self.G[u][v]
                    self.T[v][u] = self.G[v][u]
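            # Recount the components and re-label the vertices. If the count
            # did not drop, no safe edge was added, so G is not connected and
            # the loop would never end.
            previous_count = count_of_components
            count_of_components, component_label = self._count_and_label()
            if count_of_components == previous_count:
                raise ValueError("Input graph must be connected")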
        return self.T  # Done!

In [2]:
# TEST CODE

_ = float("inf")

graph = [
    [_, _, _, 5, 1, _],
    [_, _, 20, 5, _, 10],
    [_, 20, _, 10, _, _],
    [5, 5, 10, _, _, 15],
    [1, _, _, _, _, 20],
    [_, 10, _, 15, 20, _],
]

mst = MST(graph)
mst.boruvka()
T = mst.T
weight = 0
for i in range(len(T)):
    for j in range(i + 1, len(T)):
        if T[i][j] != _:
            weight += T[i][j]
print("Weight of MST is", weight) # expect "Weight of MST is 31"

Weight of MST is 31
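
Comparing the total weight against an expected value is one check. A complementary one is to confirm that the output matrix really is a spanning tree: exactly $n-1$ edges and every vertex reachable from vertex 0. The sketch below is a hedged illustration; the helper name `is_spanning_tree` and the small three-vertex matrices are assumptions, not part of the assignment code.

```python
INF = float("inf")  # assumed no-edge sentinel for this sketch


def is_spanning_tree(T, no_edge):
    """True if adjacency matrix T has n-1 edges and a single component."""
    n = len(T)
    # Count each undirected edge once by looking above the main diagonal
    edge_count = sum(
        1 for i in range(n) for j in range(i + 1, n) if T[i][j] != no_edge
    )
    # Depth-first search from vertex 0 to collect everything reachable
    seen, stack = {0}, [0]
    while stack:
        x = stack.pop()
        for y in range(n):
            if T[x][y] != no_edge and y not in seen:
                seen.add(y)
                stack.append(y)
    return edge_count == n - 1 and len(seen) == n


# A path 0-1-2 is a spanning tree of three vertices
path = [
    [INF, 2, INF],
    [2, INF, 3],
    [INF, 3, INF],
]
print(is_spanning_tree(path, INF))  # prints True
```

The same call applied to `mst.T` with the sentinel `_` from the test cell should also return `True` for a correct result.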


---

# A more compact version

The following class is very similar to the previous one except for its `boruvka` method. In this class, `boruvka` uses several utility methods to complete its task, and all of them except `_reachabilty` update class fields instead of returning values.


In [5]:
##12345678901234567890123456789012345678901234567890123456789012345678901234567890


class MST_OO:
    """A class with the methods necessary to produce a MST from the adjacency
    matrix of a weighted undirected graph.
    """

    def __init__(self, G: list[list[int]]):
        """Initializes the MST object with the input graph G."""
        # Validate that the input graph is a non-empty square matrix
        if len(G) == 0 or any(len(row) != len(G) for row in G):
            raise ValueError("Input graph must be a non-empty square matrix")
        # Validate that the input graph is symmetric
        for i in range(len(G)):
            for j in range(len(G)):
                if G[i][j] != G[j][i]:
                    raise ValueError("Input graph must be undirected (symmetric)")
        # Validate that the input graph has no self-loops
        for i in range(len(G)):
            if G[i][i] != G[0][0]:
                raise ValueError("Input graph must have no self-loops")

        # Local copy of the input graph's adjacency matrix, so that we don't
        # accidentally modify the input graph. Also this local copy is
        # accessible to all methods of this class.
        self.G: list[list[int]] = [list(row) for row in G]
        # Shorthand notation for number of vertices, so that we don't have to
        # keep writing len(self.G) all the time.
        self.n: int = len(self.G)
        # Shorthand of no-edge sentinel from input graph. Typically we expect a 0
        # or an infinity as the no-edge sentinel, but we don't assume anything
        # here. We just take whatever is in G[0][0] as the no-edge sentinel.
        self.no_edge = G[0][0]
        # Adjacency matrix for MST. Initially it has no edges. This will be
        # populated as the algorithm progresses and will be the output
        # of the algorithm.
        self.T: list[list[int]] = [
            [self.no_edge for i in range(self.n)] for j in range(self.n)
        ]
        # Initialize count of components field
        self.count_of_components: int = 0
        # Initialize component labels
        self.component_label: list[int] = [-1] * self.n
        # Initialize safe edges: one slot per possible component label (1..n),
        # each holding either None or a [u, v] pair
        self.safe_edge: list[list[int] | None] = [None] * (self.n + 1)

    def _reachabilty(self, starting: int, graph: list[list[int]]) -> list[int]:
        """Determines the vertices of the input graph reachable from the
        starting vertex."""
        # List of reachable vertices
        reach: list[int] = []
        # List of vertices to explore next, primed with the starting vertex
        visit_next: list[int] = [starting]
        # Process every vertex in the visit_next list until it's empty.
        while len(visit_next) > 0:
            # Take a vertex out of the visit_next list. This can be the first
            # vertex in the list (operating it as a queue), the last vertex
            # (operating it as a stack), or any vertex we damn well please
            # (operating it as we like). All things being equal, stack is fine.
            vertex: int = visit_next.pop()
            if vertex not in reach:
                # Add this vertex to the reachable list
                reach.append(vertex)
                # Plan to visit this vertex's neighbors next
                for neighbor in range(self.n):
                    if graph[neighbor][vertex] != self.no_edge:
                        visit_next.append(neighbor)
        return reach  # Done!

    def _count_and_label(self) -> None:
        """Counts the components of the candidate MST (self.T) and labels its
        vertices with the component they belong to. For every vertex in the graph,
        track its component. The component label is the count value for each component
        when it was discovered. For example, if there are 3 components in the graph,
        the vertices in the first component will be labeled 1, those in the second
        component will be labeled 2, and those in the third component will be labeled 3.
        """
        # Count of components in the candidate MST
        self.count_of_components = 0
        # The component label for each vertex. Initialized to -1, meaning
        # that no vertex has been labeled yet.
        self.component_label = [-1] * self.n
        # Remember the vertices we've seen so far. This is to avoid
        # double counting components.
        visited: list[int] = []
        # Process every vertex. If we haven't seen it before, it means
        # we've just found a new component.
        for vertex in range(self.n):
            if vertex not in visited:
                # We just found a new component, update the count. This count value
                # becomes the label for all vertices in this component.
                self.count_of_components += 1
                # Find everything we can reach from this vertex, because they'll be
                # in the same component
                reachable_from_vertex = self._reachabilty(vertex, self.T)
                # Add these reachable vertices to those visited, because
                # they are all in the same component and we don't want to
                # process them in the future.
                visited.extend(reachable_from_vertex)
                # The current component count becomes the label of all
                # these vertices
                for v in reachable_from_vertex:
                    self.component_label[v] = self.count_of_components
        # Done

    def boruvka(self):
        """The algorithm itself!"""
        self._count_and_label()
        while self.count_of_components > 1:
            self.safe_edge = [None] * (self.n + 1)
            for u in range(self.n):
                for v in range(u + 1, self.n):
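                    # Only a real edge of G can be a safe edge, so skip pairs
                    # whose matrix entry is the no-edge sentinel.
                    if (
                        self.G[u][v] != self.no_edge
                        and self.component_label[u] != self.component_label[v]
                    ):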
                        self._find_safe_edge(u, v)
                        self._find_safe_edge(v, u)
            # Add all the safe edges to the candidate MST
            self._update_safe_edges()
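            # Recount the components and re-label the vertices. If the count
            # did not drop, no safe edge was added, so G is not connected and
            # the loop would never end.
            previous_count = self.count_of_components
            self._count_and_label()
            if self.count_of_components == previous_count:
                raise ValueError("Input graph must be connected")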
        return self.T  # Done!

    def _find_safe_edge(self, s: int, t: int) -> None:
        """Find the safe edge for the component of vertex s, considering edge (s,t)"""
        if self.safe_edge[self.component_label[s]] is None:
            self.safe_edge[self.component_label[s]] = [s, t]
        else:
            [a, b] = self.safe_edge[self.component_label[s]]
            if self.G[s][t] < self.G[a][b]:
                self.safe_edge[self.component_label[s]] = [s, t]

    def _update_safe_edges(self) -> None:
        """Add all the safe edges to the candidate MST"""
        for edge in self.safe_edge:
            if edge is not None:
                [u, v] = edge
                self.T[u][v] = self.G[u][v]
                self.T[v][u] = self.G[v][u]

In [6]:
# TEST CODE

_ = float("inf")

graph = [
    [_, _, _, 5, 1, _],
    [_, _, 20, 5, _, 10],
    [_, 20, _, 10, _, _],
    [5, 5, 10, _, _, 15],
    [1, _, _, _, _, 20],
    [_, 10, _, 15, 20, _],
]

mst = MST_OO(graph)
mst.boruvka()
T = mst.T
weight = 0
for i in range(len(T)):
    for j in range(i + 1, len(T)):
        if T[i][j] != _:
            weight += T[i][j]
print("Weight of MST is", weight) # expect "Weight of MST is 31"

Weight of MST is 31
