In [105]:
from collections import deque
from dataclasses import dataclass
import re
from typing import List, Mapping

input_rates = {}
input_neighbours = {}
with open("input") as f:
    for line in f.readlines():
        match = re.match(r"Valve ([A-Z][A-Z]) has flow rate=(\d+); tunnels? leads? to valves? (.*)", line)
        valve = match[1]
        rate = int(match[2])
        tunnels = [t.strip() for t in match[3].split(",")]
        input_rates[valve] = rate
        input_neighbours[valve] = tunnels
        print(f"{valve=}, {rate=}, {tunnels=}")
print()
# Find shortest paths. cost_to_go[x][y] is the cost to open y from x.
def find_costs(src: str, rates: Mapping[str, int], neighbours: Mapping[str, List[str]]):
    """Finds costs to open positive-rate valves from `src`.
    """
    costs = {}
    frontier = deque()
    frontier.append([src])
    visited = set()
    while len(frontier):
        cur_path = frontier.popleft()
        cur = cur_path[-1]
        if cur not in costs and rates[cur] > 0:
            costs[cur] = len(cur_path)
        for n in neighbours[cur]:
            if n not in visited:
                visited.add(n)
                frontier.append(cur_path + [n])
    return costs
def find_all_paths_costs(rates, neighbours):
    cost_to_go = {}
    for valve, rate in rates.items():
        if rate > 0 or valve == "AA":
            cost_to_go[valve] = find_costs(valve, rates, neighbours)
    return cost_to_go
input_cost_to_go = find_all_paths_costs(input_rates, input_neighbours)

rates = input_rates
neighbours = input_neighbours
cost_to_go = input_cost_to_go

valve='AA', rate=0, tunnels=['BB', 'CC']
valve='BB', rate=13, tunnels=['AA']
valve='CC', rate=2, tunnels=['AA', 'DD']
valve='DD', rate=20, tunnels=['CC']



Do a depth-first search for the highest total flow by the time limit of 26 minutes running out.

In [142]:
@dataclass
class Agent:
    dest_valve: str
    time_to_dest: int

@dataclass
class State:
    elf: Agent
    elph: Agent
    opened: List[str]
    rate: int
    total_flow: int
    time_remaining: int
    
    def __eq__(self, other):
        return (
            self.elf == other.elf and
            self.elph == other.elph and
            set(self.opened) == set(other.opened) and
            self.rate == other.rate and
            self.total_flow == other.total_flow and
            self.time_remaining == other.time_remaining
        )

In [170]:
def expand_state(state: State):
    """Finds the next states that are possible from `state`.
    
    Pre-condition: The state has at least one agent that needs a new valve to go to.
    Post-condition: All states returned have time_remaining >= 0.
    """
    states = []


    # Case 1: Both need to go somewhere, selet unique ones.
    if state.elf.time_to_dest == 0 and state.elph.time_to_dest == 0:
        elf_costs = cost_to_go[state.elf.dest_valve]
        elph_costs = cost_to_go[state.elph.dest_valve]
        for elf_neighbour, elf_cost in elf_costs.items():
            if elf_neighbour in state.opened:
                continue
            for elph_neighbour, elph_cost in elph_costs.items():
                if elph_neighbour in state.opened:
                    continue
                if elf_neighbour == elph_neighbour:
                    continue

                # Create a state for the closer one.
                # TODO: Will need to figure out what to do when this one is expanded.
                if elf_cost < elph_cost:
                    new_state = State(elf=Agent(elf_neighbour, 0),
                                      elph=Agent(elph_neighbour, elph_cost - elf_cost),
                                      rate = state.rate + rates[elf_neighbour],
                                      total_flow = state.total_flow + state.rate * elf_cost,
                                      time_remaining = state.time_remaining - elf_cost,
                                      opened = state.opened + [elf_neighbour])
                    states.append(new_state)
                elif elph_cost < elf_cost:
                    new_state = State(elf=Agent(elf_neighbour, elph_cost),
                                      elph=Agent(elph_neighbour, 0),
                                      rate = state.rate + rates[elph_neighbour],
                                      total_flow = state.total_flow + state.rate * elph_cost,
                                      time_remaining = state.time_remaining - elph_cost,
                                      opened = state.opened + [elph_neighbour])
                    states.append(new_state)
                else:
                    time_remaining = state.time_remaining - elf_cost

                    if time_remaining < 0:
                        # TODO: Update this
                        new_state = State(current=neighbour,
                                        opened=state.opened + [neighbour],
                                        rate=state.rate + rates[neighbour],
                                        total_flow=state.total_flow + state.rate * cost,
                                        time_remaining=time_remaining)
                        states.append(new_state)
                    else:
                        states.append(State(elf=Agent(elf_neighbour, 0),
                                            elph=Agent(elph_neighbour, 0),
                                            rate=state.rate + rates[elf_neighbour] + rates[elph_neighbour],
                                            total_flow=state.total_flow + state.rate * elf_cost,
                                            time_remaining=state.time_remaining - elf_cost,
                                            opened=state.opened + [elf_neighbour, elph_neighbour]))
    elif state.elf.time_to_dest:
        # Find a new valve for elph to go to.
        elf_neighbour = state.elf.dest_valve
        for elph_neighbour, elph_cost in cost_to_go[state.elph.dest_valve].items():
            if elph_neighbour in state.opened or elph_neighbour == elf_neighbour:
                continue
            elf_cost = state.elf.time_to_dest
            if elf_cost < elph_cost:
                new_state = State(elf=Agent(elf_neighbour, 0),
                                  elph=Agent(elph_neighbour, elph_cost - elf_cost),
                                  rate = state.rate + rates[elf_neighbour],
                                  total_flow = state.total_flow + state.rate * elf_cost,
                                  time_remaining = state.time_remaining - elf_cost,
                                  opened = state.opened + [elf_neighbour])
                states.append(new_state)
            elif elph_cost < elf_cost:
                new_state = State(elf=Agent(elf_neighbour, elf_cost - elph_cost),
                                  elph=Agent(elph_neighbour, 0),
                                  rate = state.rate + rates[elph_neighbour],
                                  total_flow = state.total_flow + state.rate * elph_cost,
                                  time_remaining = state.time_remaining - elph_cost,
                                  opened = state.opened + [elf_neighbour])
                states.append(new_state)
            else:
                time_remaining = state.time_remaining - elf_cost
                states.append(State(elf=Agent(elf_neighbour, 0),
                                elph=Agent(elph_neighbour, 0),
                                rate=state.rate + rates[elf_neighbour] + rates[elph_neighbour],
                                total_flow=state.total_flow + state.rate * elf_cost,
                                time_remaining=state.time_remaining - elf_cost,
                                opened=state.opened + [elf_neighbour, elph_neighbour]))
    elif state.elph.time_to_dest:
        ...
    return states
expand_state(State(elf=Agent("AA", 0), elph=Agent("AA", 0), opened=[], rate=0, total_flow=0, time_remaining=26))

[State(elf=Agent(dest_valve='BB', time_to_dest=0), elph=Agent(dest_valve='DD', time_to_dest=1), opened=['BB'], rate=13, total_flow=0, time_remaining=24),
 State(elf=Agent(dest_valve='DD', time_to_dest=2), elph=Agent(dest_valve='BB', time_to_dest=0), opened=['BB'], rate=13, total_flow=0, time_remaining=24)]

# Case 2: Only one needs to expand a node

**Input**
```
Valve AA has flow rate=0; tunnels lead to valves BB, CC
Valve BB has flow rate=13; tunnel leads to valve AA
Valve CC has flow rate=0; tunnels lead to valves AA, DD
Valve DD has flow rate=15; tunnel leads to valve CC
```

**State**
```py
 State(elf=Agent(dest_valve='BB', time_to_dest=0),
          elph=Agent(dest_valve='DD', time_to_dest=1),
          opened=['BB'],
          rate=13,
          total_flow=0,
          time_remaining=24)
```

In [171]:
rates = {"AA": 0, "BB": 13, "CC": 0, "DD": 5}
neighbours = {"AA": ["BB", "CC"], "BB": ["AA"], "CC": ["AA", "DD"], "DD": ["CC"]}
cost_to_go = find_all_paths_costs(rates, neighbours)
expand_state(
    State(elf=Agent(dest_valve='DD', time_to_dest=1),
          elph=Agent(dest_valve='BB', time_to_dest=0),
          opened=['BB'],
          rate=13,
          total_flow=0,
          time_remaining=24))

[]

# End-to-End Test

**Input**
```
Valve AA has flow rate=0; tunnels lead to valves BB, CC
Valve BB has flow rate=13; tunnel leads to valve AA
Valve CC has flow rate=15; tunnel leads to valve AA
```
**Expected Output**

672

In [None]:
initial = State(elf="AA", elph="AA", opened=[], rate=0, total_flow=0, time_remaining=30)

for idx, state in expand_state(initial):
    print(f"{idx=}, {state=}")

frontier = deque()
frontier.append(initial)
greatest = 0
while len(frontier):
    state = frontier.pop()
    if state.time_remaining > 0:
        expand_state(state).map(frontier.append)
    else:
        greatest = max(greatest, state.total_flow)
print(f"{greatest=}")


# Run All Tests

In [150]:
# Case 1.1: Both expand to same distance
rates = {"AA": 0, "BB": 13, "CC": 15}
neighbours = {"AA": ["BB", "CC"], "BB": ["AA"], "CC": ["AA"]}
cost_to_go = find_all_paths_costs(rates, neighbours)
states = expand_state(State(elf=Agent("AA", 0), elph=Agent("AA", 0), opened=[], rate=0, total_flow=0, time_remaining=26))
assert len(states) == 2
assert State(elf=Agent('BB', 0), elph=Agent('CC', 0), opened=['BB', 'CC'], rate=28, total_flow=0, time_remaining=24) in states
assert State(elf=Agent('CC', 0), elph=Agent('BB', 0), opened=['BB', 'CC'], rate=28, total_flow=0, time_remaining=24) in states

# Case 1.2: One expands to closer one.
rates = {"AA": 0, "BB": 13, "CC": 0, "DD": 5}
neighbours = {"AA": ["BB", "CC"], "BB": ["AA"], "CC": ["AA", "DD"], "DD": ["CC"]}
cost_to_go = find_all_paths_costs(rates, neighbours)
states = expand_state(State(elf=Agent("AA", 0), elph=Agent("AA", 0), opened=[], rate=0, total_flow=0, time_remaining=26))

assert State(elf=Agent(dest_valve='BB', time_to_dest=0), elph=Agent(dest_valve='DD', time_to_dest=1), opened=['BB'], rate=13, total_flow=0, time_remaining=24) in states
assert State(elf=Agent(dest_valve='DD', time_to_dest=2), elph=Agent(dest_valve='BB', time_to_dest=0), opened=['BB'], rate=13, total_flow=0, time_remaining=24) in states
assert len(states) == 2
print("tests Pass")

[State(elf=Agent(dest_valve='BB', time_to_dest=0), elph=Agent(dest_valve='DD', time_to_dest=1), opened=['BB'], rate=13, total_flow=0, time_remaining=24),
 State(elf=Agent(dest_valve='DD', time_to_dest=2), elph=Agent(dest_valve='BB', time_to_dest=0), opened=['BB'], rate=13, total_flow=0, time_remaining=24)]