---
title: "LC: Connected Sinks and Sources"
author: "Vahram Poghosyan"
date: "2022-01-23"
categories: ["Leetcode", "Algorithms", "Data Structures", "DFS"]
format:
  html:
    code-fold: true
jupyter: python3
include-after-body:
  text: |
    <script type="application/javascript" src="../../javascript/light-dark.js"></script>
---

## Problem statement

We are given a pipe system represented by a 2D rectangular grid of cells. There are three different types of items located in the cells within the grid, with each having either no items or 1 item:

* **Source:** There is 1 source in the system. It is represented by the asterisk character `*`.
* **Sinks:** There are an arbitrary number of sinks in the system. They are each represented by a different uppercase letter (`A`, `B`, `C`, etc.).
* **Pipes:** There are 10 different shapes of pipes, represented by the following characters: `═`, `║`, `╔`, `╗`, `╚`, `╝`, `╠`, `╣`, `╦`, `╩`

Note that: 

* Each pipe has openings on 2 or 3 sides of its cell.
* Two adjacent cells are connected if both have a pipe opening at their shared edge.
* We should treat the source and sinks as having pipe openings at all of their edges. For example, the two cells `A╦` are connected through their shared edge, but the two cells `B╔` are not.
* A sink may be connected to the source through another sink. For example, in the simple pipe system `*A═B═C`, all three sinks are connected to the source.

Our objective is to write a function that determines which sinks are connected to the source in a given pipe system.

As an example, consider the following pipe system. 

**Formatted input**

```
*╗ ╦═A
 ╠═╝
 C ╚═B
```
In this pipe system, the source `*` is connected to sinks `A` and `C` but not `B`.

A system is specified by an input text file in the following un-formated way.

**Unformatted input**

```
*02
C10
╠11
╗12
═21
╚30
╝31
╦32
═40
═42
B50
A52
```

Note that each item is followed by its coordinate in the grid (the origin of the coordinate system is taken to be the lower left corner (`0`,`0`) corresponding to an empty space in this system).

## Solution

### Parsing the input

Let's define a function `load_file` that loads the input file and returns the contents as a list of strings.

In [None]:
#| code-fold: false
def load_file(filePath: str) -> list[str]:
    with open(filePath, 'r') as file:
        return file.readlines()

::: {.callout-tip title="Note" appearance="minimal" collapse="false"}
We use the keyword `with` rather than `try`-`finally` to leverage built in Python file IO safety features.
:::

Let's save the output of `load_file` as the `input` we'll provide to `parse_input` (a function we'll define later). Running `load_file` for the example file above obtains:

In [None]:
#| code-fold: false
input = load_file("./problem_assets/connected_sinks_and_sources/example1.txt")
print(input[:5]) # Printing only the five leading entries of the input for brevity

['*02\n', 'C10\n', '╠11\n', '╗12\n', '═21\n']


As we can see the lines include the newline character, We can further clean up `input` by using `map` and [rstrip](https://docs.python.org/3/library/stdtypes.html#str.rstrip) which strips the right-end of each line of any undesired characters. We can modify the definition of `load_file` to return the cleaned up input.

In [None]:
#| code-fold: false
def load_file(filePath: str) -> list[str]:
    with open(filePath, 'r') as file:
        return [line.rstrip("\n") for line in file.readlines()]

Let's run this re-defined `load_file` for the example file again to see the difference.

In [None]:
#| code-fold: false
input_ex1 = load_file("./problem_assets/connected_sinks_and_sources/example1.txt")
print(input_ex1[:5])

['*02', 'C10', '╠11', '╗12', '═21']


### Defining an `Item` object

But we're still dealing with simple string representations... That means it's not immediately clear how to obtain the **open edges** of, say, the `╚` character programmatically. Let's parse this list down into a data structure we will call `Item`.

`Item` will have Boolean instance variables:

* `left`
* `right`
* `top`
* `bottom`

Which will indicate whether the corresponding edge is open or closed (`True` or `False` respectively).

It will also have instance variables `x` and `y` for the item's coordinates. An additional `type` instance variable with possible values in $\{$`Source`,`Pipe`,`Sink`$\}$. This may be helpful for checking edge conditions later on.

Here's the implementation of the `Item` class. Since it's quite long and trivial, I've collapsed the definition. Feel free to expand and examine...


In [None]:
class Item:
    def __init__(
            self, 
            type: str = "Source",
            ascii: str = "*",
            coords: tuple[int, int] = (0,0),
            left: bool = True,
            right: bool = True,
            top: bool = True,
            bottom: bool = True,
        ):
        self.type = type
        self.ascii = ascii
        self.coords = coords
        self.left = left
        self.right = right
        self.top = top
        self.bottom = bottom
    
    def __repr__(self): # This is a custom representation method for the Item class
        # Prepare all fields
        fields = [
            f"type: {self.type}",
            f"ascii: {self.ascii}",
            f"coordinates: {self.coords}",
            "edges:",
            f"  left: {self.left}",
            f"  right: {self.right}",
            f"  top: {self.top}",
            f"  bottom: {self.bottom}"
        ]
        # Find the longest line
        max_length = max(len(line) for line in fields)
        # Add extra space for the class name
        title = f" {type(self).__name__} "
        max_length = max(max_length, len(title))
        # Create the box
        bottom = "+" + "-" * (max_length + 2) + "+"
        title_line = f"+{title:-^{max_length + 2}}+"
        # Build the output
        lines = [title_line]
        # Add each field with padding
        for field in fields:
            padding = " " * (max_length - len(field))
            lines.append(f"| {field}{padding} |")
        lines.append(bottom)
        return "\n".join(lines)

Unfortunately, there's no way to get around the hard-coding of the ASCII characters into `Item`-s. Here's an implementation of `map_to_items`. It's quite long and trivial, but feel free to expand and examine the implementation...

In [None]:
def map_to_items(input: list[str]) -> list[Item]: # Converts the text input to a list of Item objects
    def to_item(line: str) -> Item: # Converts a single line in the input to an Item
        objectToReturn = Item() # Initialize an empty object
        objectToReturn.ascii = line # We will also print the unconverted ASCII representation of the object for debugging purposes
        objectToReturn.coords = tuple([int(coord) for coord in line[1:]]) # Extract the coordinates as a tuple of integers
        objectToReturn.left = True # Initialize all edges to open
        objectToReturn.right = True
        objectToReturn.top = True
        objectToReturn.bottom = True
        # Match the first character of the line to determine the type of object
        if line[0] == "*": # Match the first character of the line to determine the type of object
            objectToReturn.ascii = "*"
            # The default object is a source, '*', at (0,0), so nothing else needs to be changed in this case...
        elif line[0] == "═":
            # Note: edges are ordered [top, right, bottom, left]
            objectToReturn.ascii = "═"
            objectToReturn.type = "Pipe"
            objectToReturn.top = False
            objectToReturn.bottom = False
        elif line[0] == "║":
            objectToReturn.ascii = "║"
            objectToReturn.type = "Pipe"
            objectToReturn.left = False
            objectToReturn.right = False
        elif line[0] ==  "╔":
            objectToReturn.ascii = "╔"
            objectToReturn.type = "Pipe"
            objectToReturn.top = False
            objectToReturn.left = False
        elif line[0] ==  "╗":
            objectToReturn.ascii = "╗"
            objectToReturn.type = "Pipe"
            objectToReturn.top = False
            objectToReturn.right = False
        elif line[0] == "╚":
            objectToReturn.ascii = "╚"
            objectToReturn.type = "Pipe"
            objectToReturn.left = False
            objectToReturn.bottom = False
        elif line[0] == "╝":
            objectToReturn.ascii = "╝"
            objectToReturn.type = "Pipe"
            objectToReturn.right = False
            objectToReturn.bottom = False
        elif line[0] == "╠":
            objectToReturn.ascii = "╠"
            objectToReturn.type = "Pipe"
            objectToReturn.left = False
        elif line[0] == "╣":
            objectToReturn.ascii = "╣"
            objectToReturn.type = "Pipe"
            objectToReturn.right = False
        elif line[0] == "╦":
            objectToReturn.ascii = "╦"
            objectToReturn.type = "Pipe"
            objectToReturn.top = False
        elif line[0] == "╩":
            objectToReturn.ascii = "╩"
            objectToReturn.type = "Pipe"
            objectToReturn.bottom = False
        else: # The case of a sink
            if not line[0].isalpha(): # Check if the first character is a letter at all
                raise ValueError("The first character of a sink must be a letter.")
            objectToReturn.ascii = line[0]
            objectToReturn.type = "Sink"
        return objectToReturn
            
    return list(map(to_item, input))

Let's run `input` through `map_to_items` to obtain the programmatic representation of the items in our pipe system. We show only the first five from the output in the interest of brevity.

In [None]:
#| code-fold: false
parsed_input_ex1 = map_to_items(input_ex1)
for obj in parsed_input_ex1[:3]:
    print(obj)

+------- Item --------+
| type: Source        |
| ascii: *            |
| coordinates: (0, 2) |
| edges:              |
|   left: True        |
|   right: True       |
|   top: True         |
|   bottom: True      |
+---------------------+
+------- Item --------+
| type: Sink          |
| ascii: C            |
| coordinates: (1, 0) |
| edges:              |
|   left: True        |
|   right: True       |
|   top: True         |
|   bottom: True      |
+---------------------+
+------- Item --------+
| type: Pipe          |
| ascii: ╠            |
| coordinates: (1, 1) |
| edges:              |
|   left: False       |
|   right: True       |
|   top: True         |
|   bottom: True      |
+---------------------+


## Path finding (DFS)

This is a path finding problem, so we're likely going to use either BFS or DFS. Since it's easier to represent the given data as a 2D array, instead of converting it into a graph representation, we will implement DFS on the 2D array itself.

Here's the graph representation of the pipe system above, for equivalency's sake (although we won't be using graphs).

```{mermaid}
   graph TD;
      A["* (0,2)"] --> B["╗ (1,2)   "];
      B-->C["╠ (1,1)"];
      C-->D["C (1,0)"];
      C-->E["═ (2,1)"];
      E-->F["╝ (3,1)"];
      F-->G["╦ (3,2)"];
      G-->H["═ (4,2)"];
      H-->K["A (5,0)"];
      L["╚ (3,0)"]-->M["═ (4,0)"];
      M-->N["B (5,0)"];
```

<br></br>

As we can see, we have two disjoint acyclic graphs representing the example pipe system above (ignore the edge directions, [Mermaid](https://mermaid.js.org/intro/) is cool but pretty limited).

In general, because BFS traverses a graph one level at a time, we tend to use BFS when looking for the shortest path between two nodes. The first time BFS lands on the target node constitutes the shortest path to that node (or one of the multiple such paths if there's a tie between several paths). DFS is better suited for finding *any* valid path between the source `*` and a sink, and not necessarily the shortest such path, we can use DFS for this problem.

### Expanding the input into a 2D matrix

BFS can be implemented on a 2D array just as it can on a graph. Graphs are nothing more, really, than [adjacency lists](https://en.wikipedia.org/wiki/Adjacency_list). But first, we need to expand the input into such a 2D matrix of `Items`. To maintain align the directions of the neighbors of a given item to the formatted input, we need to expand the input in a particular way. We want `grid[0][2]` to refer to the source `* (0,2)`.

That means the `grid` should be something like:

```
[[None, None, *],[C,╠,╗], ...]
```
Then the 1st argument to `grid` will point to the 1st subarray, and the 2nd argument will point to the sink itself. In other words, the first argument refers to the columns of the formatted input, and the second to the rows. The empty space above `* (0, 2)`, in the formatted input, will then be referenced by `grid[0][3]`. 

In [None]:
#| code-fold: false
def expand_parsed_input(input: list[Item]) -> list[list[Item]]:
    # Find the maximum x and y coordinates occuring the given input
    max_rows  = 0
    max_cols = 0
    for item in input:
        if item.coords[0] > max_cols:
            max_cols = item.coords[0]
        if item.coords[1] > max_rows:
            max_rows = item.coords[1]
    # Initialize the 2D matrix with 'None' types
    grid = [[None for _ in range(max_rows + 1)] for _ in range(max_cols + 1)]
    # Populate the 2D matrix with the 'Item'-s
    for item in input:
        grid[item.coords[0]][item.coords[1]] = item
    return grid

expanded_parsed_input_ex1 = expand_parsed_input(parsed_input_ex1)

The output of `expand_input` contains `Item` types and `None` types (for empty cells).

Before we implement DFS, let's define a few helper functions that check if two `Item`-s are connected by the relevant edge.

### Helper functions to determine connectedness at an edge

In [None]:
#| code-fold: false
def connected_at_top(item1: Item, item2: Item) -> bool:
    return item1.top and item2.bottom

def connected_at_bottom(item1: Item, item2: Item) -> bool:
    return item1.bottom and item2.top

def connected_at_left(item1: Item, item2: Item) -> bool:
    return item1.left and item2.right

def connected_at_right(item1: Item, item2: Item) -> bool:
    return item1.right and item2.left

We're going to need these helper functions to check if two `Item`-s are connected by an edge before we add them to the DFS stack (to visit).

### DFS Implementation

Finally, let's implement iterative DFS using a stack.

In [None]:
#| code-fold: false
# Iterative version using a stack
def dfs_iterative(grid, start_x, start_y):
    # Obtain the dimensions of the grid
    # Note: grid = [[None, None, *],[C,╠,╗], ...] 
    # 1st index spans the subarrays 
    # 2nd index spans the items themselves 
    dim_cols, dim_rows = len(grid), len(grid[0])

    # Initialize the visited set
    visited = set()
    # Initialize the stack with the starting position
    stack = [(start_x, start_y)]
    
    while stack:
        x, y = stack.pop()
        
        if (x, y) in visited:
            continue
        if grid[x][y].type == "Sink":
            print(f"Found connected sink: {grid[x][y].ascii}")

        visited.add((x, y))
        # print(f"Visiting: {grid[x][y].ascii} ({x}, {y})") <-- Enable for debugging
        
        # Add only the eligible neighbors to the stack 
        # Eligible neighbors are those that:
        # 1. Are not outside the grid 
        # 2. Are not 'None' 
        # 3. Are connected to the current item by a corresponding edge
        # Check the top neighbor
        if y + 1 >= 0 and y + 1 <= dim_rows - 1 and grid[x][y + 1] is not None:
            if connected_at_top(grid[x][y], grid[x][y + 1]):
                stack.append((x, y + 1))
        # Check the bottom neighbor
        if y - 1 >= 0 and y - 1 <= dim_rows - 1 and grid[x][y - 1] is not None: 
            if connected_at_bottom(grid[x][y], grid[x][y - 1]):
                stack.append((x, y - 1))
        # Check the right neighbor
        if x + 1 >= 0 and x + 1 <= dim_cols - 1 and grid[x + 1][y] is not None:
            if connected_at_right(grid[x][y], grid[x + 1][y]):
                stack.append((x + 1, y))
        # Check the left neighbor
        if x - 1 >= 0 and x - 1 <=  dim_cols - 1 and grid[x - 1][y] is not None:
            if connected_at_left(grid[x][y], grid[x - 1][y]):
                stack.append((x - 1, y))

::: {.callout-tip title="Note" appearance="minimal" collapse="false"}
When doing iterative graph traversal with a heap or a stack, it's always a good idea to print the associated data structure (namely the heap or the stack) at the beginning of each iteration as a debugging tactic.
:::

Finally, let's run the DFS on the expanded input, supplying the coordinates of the source `*` as the starting point.

In [None]:
#| code-fold: false
dfs_iterative(expanded_parsed_input_ex1, 0, 2)

Found connected sink: A
Found connected sink: C


There we have it! 

## Verifying solution 

Now let's try it on a slightly altered input. This time, we have redirected the pipes so that only `B` and `C` are connected to the source. 

**Formatted input**
```
*╗ ╦═A
 ╠═╦
 C ╚═B
```

**Unformatted input**

```
*02
C10
╠11
╗12
═21
╚30
╦31
╦32
═40
═42
B50
A52
```

In [None]:
#| code-fold: false
input_ex2 = load_file("./problem_assets/connected_sinks_and_sources/example2.txt")
parsed_input_ex2 = map_to_items(input_ex2)
expanded_parsed_input_ex2 = expand_parsed_input(parsed_input_ex2)
dfs_iterative(expanded_parsed_input_ex2, 0, 2)

Found connected sink: B
Found connected sink: C


## Visualization of the DFS algorithm using Manim

Let's use Manim (a mathematical animation library), to visualize the DFS algorithm on the matrix for fun!

We start by importing the cell magic `%%manim`, then modify and insert our `dfs_iterative` implementation into a Manim scene. The Manim code itself is collapsed, as it's not the focus of this post.


In [16]:
import manim

In [None]:
%%manim -qh ConnectedSinksAndSourcesDemo

from manim import *

class ConnectedSinksAndSourcesDemo(Scene):
    def construct(self):    
        # Create table
        sinks_and_sources=[["*","╗","","╦","═","A"],["","╠","═","╝","",""],["","C","","╚","═","B"]]
        table = Table(sinks_and_sources, include_outer_lines=True)
        self.play(Write(table))
        # The DFS algorithm still requires the original `expanded_parsed_input_ex1` to work
        self.dfs_iterative(expanded_parsed_input_ex1, table, 0, 2)
        self.wait(5)


    # Custom Manim cell highlighter...
    def highlight_cell(self, manim_table, row, col, color=GREEN):
        cell = manim_table.get_cell((row, col))
        background = Rectangle(
            width=cell.width,
            height=cell.height,
            fill_color=color,
            fill_opacity=0.5,
            stroke_width=0
        )
        background.move_to(cell)
        self.play(FadeIn(background))

    # Same DFS algorthm as before, but modified to inject Manim effects...
    def dfs_iterative(self, grid, manim_table, start_x, start_y):
        dim_cols, dim_rows = len(grid), len(grid[0])

        # Initialize the visited set
        visited = set()
        # Initialize the stack with the starting position
        stack = [(start_x, start_y)]
        
        while stack:
            x, y = stack.pop()
            
            if (x, y) in visited:
                continue

            visited.add((x, y))

            # In Manim, tables are indexed from 1. 
            # (x,y) = (0,2) - corresponds to -> maninm_table(1,1)              
            self.highlight_cell(manim_table, 3-y, x+1)
                    
            if y + 1 >= 0 and y + 1 <= dim_rows - 1 and grid[x][y + 1] is not None:
                if connected_at_top(grid[x][y], grid[x][y + 1]):
                    stack.append((x, y + 1))
            # Check the bottom neighbor
            if y - 1 >= 0 and y - 1 <= dim_rows - 1 and grid[x][y - 1] is not None: 
                if connected_at_bottom(grid[x][y], grid[x][y - 1]):
                    stack.append((x, y - 1))
            # Check the right neighbor
            if x + 1 >= 0 and x + 1 <= dim_cols - 1 and grid[x + 1][y] is not None:
                if connected_at_right(grid[x][y], grid[x + 1][y]):
                    stack.append((x + 1, y))
            # Check the left neighbor
            if x - 1 >= 0 and x - 1 <=  dim_cols - 1 and grid[x - 1][y] is not None:
                if connected_at_left(grid[x][y], grid[x - 1][y]):
                    stack.append((x - 1, y))

Here’s DFS on the grid visualized using Manim.

::: {#fig-sinks-and-sources-manim}
![Connected Sinks and Sources Demo](./media/videos/leetcode/1080p60/ConnectedSinksAndSourcesDemo.mp4)
:::

## Conclusion

We explored how to use iterative DFS on a matrix to solve the connected sinks and sources problem. From here, we can explore other graph problems or learn about game development and, perhaps, turn this rudimentary algorithm into a fun game mechanic?