## Advent of Code - Day 7

In [6]:
import tempfile
from contextlib import contextmanager

In [7]:
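@contextmanager
def test_file(test_input):
    """Yields a named temporary file pre-filled with `test_input`."""
    with tempfile.NamedTemporaryFile('r+') as f:
        f.write(test_input)
        # rewind; seeking also flushes the write buffer, so the
        # contents are visible when the file is re-opened by name
        f.seek(0)
        yield f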

### Part 1

Idea: Build a graph to encode the ordering of steps. Keep a priority queue of all executable steps, sorted alphabetically by step name. Initialise the priority queue with the steps that have no requirements. Whilst the priority queue is not empty, extract the next step, execute it, and reduce the parent count of each of its children by 1. If the parent count of a child reaches 0, add that child to the priority queue.

We'll use an adjacency list representation for the graph under the assumption that it is sparse. For ease of implementation we'll also assume that the instructions are well-formed and do not contain duplicate requirements (edges) or circular dependencies (something that could be checked by doing a DFS first!). Building the graph is $O(m)$, where $m$ is the number of edges (the number of lines in the input file).
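
The DFS check for circular dependencies mentioned above is not part of this notebook. A minimal sketch of what it could look like is given here, assuming the edges are available as `(parent, child)` tuples; the name `has_cycle` and the three-colour bookkeeping are illustrative choices.

```python
def has_cycle(edges):
    """Return True if the directed graph given by (parent, child) edges has a cycle."""
    # hypothetical helper: build a plain dict-of-lists adjacency structure
    children = {}
    for parent, child in edges:
        children.setdefault(parent, []).append(child)
        children.setdefault(child, [])

    WHITE, GREY, BLACK = 0, 1, 2  # unvisited, on the current DFS path, finished
    colour = dict.fromkeys(children, WHITE)

    for root in children:
        if colour[root] != WHITE:
            continue
        colour[root] = GREY
        stack = [(root, iter(children[root]))]
        while stack:
            node, remaining = stack[-1]
            for nxt in remaining:
                if colour[nxt] == GREY:
                    return True  # back edge to a node on the current path
                if colour[nxt] == WHITE:
                    colour[nxt] = GREY
                    stack.append((nxt, iter(children[nxt])))
                    break
            else:
                # all children explored: this node is finished
                colour[node] = BLACK
                stack.pop()
    return False


print(has_cycle([('C', 'A'), ('C', 'F'), ('A', 'B')]))  # False
print(has_cycle([('A', 'B'), ('B', 'C'), ('C', 'A')]))  # True
```

The traversal uses an explicit stack rather than recursion, so deep dependency chains would not hit Python's recursion limit.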

All that remains is to iterate over the priority queue until it is empty. Finding the initial steps is $O(n)$, where $n$ is the number of steps. Each step is inserted into and extracted from the priority queue exactly once, which costs $O(n\log n)$ in total. Along the way each edge is visited once, as the children of each step are looped over; this is $O(m)$. Hence the total complexity is $O(n\log n + m)$.

With a plan in place, let's begin implementing it! We start by reading each edge from the instruction input file:

In [133]:
import heapq
import re
from collections import UserDict

In [134]:
def get_edges(path):
    """Yields each edge (parent, child) requirement for the graph stored at path."""
    # raw string, with the final period escaped so it only matches a literal '.'
    pattern = re.compile(r'^Step ([A-Z]+) must be finished before step ([A-Z]+) can begin\.\n?$')
    with open(path, 'r') as f:
        for requirement in f:
            match = pattern.match(requirement)
            if not match:
                raise ValueError('invalid instruction file: %r' % path)
            yield match.groups()
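
To see what the pattern captures in isolation, here is a small sketch that applies it to single lines. It uses the form with the final period escaped, so a line ending in any other character is rejected; the sample lines are made up for illustration.

```python
import re

pattern = re.compile(r'^Step ([A-Z]+) must be finished before step ([A-Z]+) can begin\.\n?$')

good = 'Step C must be finished before step A can begin.\n'
bad = 'Step C must be finished before step A can begin!\n'

parent_child = pattern.match(good).groups()
print(parent_child)               # ('C', 'A')
print(pattern.match(bad) is None) # True
```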

In [135]:
test_input = """Step C must be finished before step A can begin.
Step C must be finished before step F can begin.
Step A must be finished before step B can begin.
Step A must be finished before step D can begin.
Step B must be finished before step E can begin.
Step D must be finished before step E can begin.
Step F must be finished before step E can begin.
"""

In [136]:
with test_file(test_input) as f:
    for edge in get_edges(f.name): 
        print(edge)

('C', 'A')
('C', 'F')
('A', 'B')
('A', 'D')
('B', 'E')
('D', 'E')
('F', 'E')


Now we can build the graph. Start by defining a node in the graph:

In [142]:
class Step:
    """A single Step in a sequence of instructions.
    
    Args:
        name (str): Name of the Step.
    
    Attributes:
        name (str): See Args.
        children (list of Step): Steps that can only be performed
            once this Step is complete.
        n_parent (int): Number of Steps still to perform before
            this Step can be executed.
    """
    
    def __init__(self, name):
        self.name = name
        self.children = []
        self.n_parent = 0
        
    def add_child(self, child):
        self.children.append(child)
        child.n_parent += 1
        
    def __lt__(self, other):
        return self.name < other.name
    
    def __repr__(self):
        return 'Step(name=%r, children=%r, n_parent=%r)' % (
            self.name, [c.name for c in self.children], self.n_parent
        )
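
The `__lt__` method is there so that steps can be placed directly on a `heapq` heap: in CPython, `heapq` orders elements using `<` comparisons. A minimal sketch with a hypothetical stand-in class `Named` (not the `Step` class itself) shows the effect:

```python
import heapq


class Named:
    """Hypothetical stand-in for a step: only `name` and `__lt__` are defined."""

    def __init__(self, name):
        self.name = name

    def __lt__(self, other):
        return self.name < other.name


heap = []
for name in ['F', 'A', 'D']:
    heapq.heappush(heap, Named(name))

# items come back out in alphabetical order of their names
order = [heapq.heappop(heap).name for _ in range(3)]
print(order)  # ['A', 'D', 'F']
```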

And then build up the adjacency list structure:

In [143]:
class Instructions(UserDict):
    """A set of Steps."""
    
    def get_step(self, step_name):
        """Returns the step at `step_name`. 
        
        The Step will be created if it does not already exist.
        """
        try:
            step = self.data[step_name]
        except KeyError:
            step = Step(step_name)
            self.data[step_name] = step
        return step


def build_instructions(edges):
    """Returns an `Instructions` instance for steps specified by `edges`."""
    instructions = Instructions()
    for parent, child in edges:
        child_step = instructions.get_step(child)
        instructions.get_step(parent).add_child(child_step)
    return instructions

In [144]:
with test_file(test_input) as f:
    instructions = build_instructions(get_edges(f.name))
    for step in instructions.values():
        print(step)

Step(name='A', children=['B', 'D'], n_parent=1)
Step(name='C', children=['A', 'F'], n_parent=0)
Step(name='F', children=['E'], n_parent=1)
Step(name='B', children=['E'], n_parent=1)
Step(name='D', children=['E'], n_parent=1)
Step(name='E', children=[], n_parent=3)


Matches the example graph! Now to execute the instructions:

In [159]:
def execute(instructions):
    """Returns a list of step names in execution order."""
    heap = []
    
    # find steps with no requirements
    for step in instructions.values():
        if step.n_parent == 0:
            heapq.heappush(heap, step)

    step_order = []
    while heap:
        step = heapq.heappop(heap)
        step_order.append(step.name)
        for child in step.children:
            child.n_parent -= 1
            if child.n_parent == 0:
                heapq.heappush(heap, child)
        
    return step_order

In [163]:
with test_file(test_input) as f:
    instructions = build_instructions(get_edges(f.name))
    step_order = ''.join(execute(instructions))
    print(step_order, step_order == "CABDFE")

CABDFE True


Now to run on the real input:

In [164]:
instructions = build_instructions(get_edges('input'))
print(''.join(execute(instructions)))

LFMNJRTQVZCHIABKPXYEUGWDSO


### Part 2

We augment the `execute` function above with an additional priority queue, `exec_heap`, that keeps track of the steps currently in execution (i.e. an elf has been assigned to that step). `exec_heap` stores `[execution time remaining, step]` pairs and is ordered by `execution time remaining`. There can be at most `n_elves` entries in `exec_heap` at any one time. Steps are moved into `exec_heap` from the original `heap`, now renamed `todo_heap`, when an elf is available.

We avoid ticking in one-second increments by fast-forwarding time to the next entry in `exec_heap` to complete.
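
The fast-forwarding idea can be sketched on its own, using made-up remaining times for three busy elves. Popping the smallest entry tells us how far to jump, and subtracting that same amount from every other entry leaves their relative order (and hence the heap property) unchanged.

```python
import heapq

# hypothetical [time remaining, step name] entries
exec_heap = [[5, 'E'], [2, 'B'], [9, 'I']]
heapq.heapify(exec_heap)

# jump ahead by the smallest remaining time
elapsed, done = heapq.heappop(exec_heap)

# the same shift applied to every entry keeps the heap valid
for entry in exec_heap:
    entry[0] -= elapsed

print(elapsed, done)  # 2 B
print(exec_heap[0])   # [3, 'E']
```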

In [222]:
def execute_time(step, base_time):
    """Returns the time required for step to be completed."""
    return ord(step.name) - ord('A') + 1 + base_time
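
As a quick check of the arithmetic, the sketch below applies the same formula to bare step names. `step_duration` is a hypothetical helper that takes a name rather than a `Step`, and it assumes single-letter names, since `ord` only accepts one character.

```python
def step_duration(name, base_time):
    """Same formula as above, applied to a single-letter step name."""
    return ord(name) - ord('A') + 1 + base_time


print(step_duration('A', 60))  # 61
print(step_duration('Z', 60))  # 86
print(step_duration('C', 0))   # 3
```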

In [223]:
def timed_execute(instructions, base_time, n_elves):
    """Returns the total execution time and a list of (completion time, step name)."""
    todo_heap = []  # available to execute
    exec_heap = []  # being executed
    
    # find steps with no requirements
    for step in instructions.values():
        if step.n_parent == 0:
            heapq.heappush(todo_heap, step)

    total_time = 0
    step_order = []
    while todo_heap or exec_heap:
        if exec_heap:
            # fast-forward time to the completion of the next step
            exec_time, step = heapq.heappop(exec_heap)
            total_time += exec_time
            
            step_order.append((total_time, step.name))
            
            for i in range(len(exec_heap)):
                # safe: subtracting the same amount from every entry preserves the heap order
                exec_heap[i][0] -= exec_time
            
            # add any child steps that are now unblocked to `todo_heap`
            for child in step.children:
                child.n_parent -= 1
                if child.n_parent == 0:
                    heapq.heappush(todo_heap, child)
                    
        # load up available elves with steps from `todo_heap`, if any
        for _ in range(n_elves - len(exec_heap)):
            if len(todo_heap) == 0:
                # no steps available!
                break
                
            todo = heapq.heappop(todo_heap)
            heapq.heappush(exec_heap, [execute_time(todo, base_time), todo])
            
    return total_time, step_order
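
One possible variation, sketched here under assumptions rather than taken from the code above: instead of storing remaining times and decrementing every entry after each completion, the in-progress heap could store absolute finish times, so the clock simply jumps to the popped value. The function `timed_order`, the plain-dict graph and the `(parent, child)` tuples are illustrative names, and single-letter step names are assumed.

```python
import heapq


def timed_order(edges, base_time, n_workers):
    """Return (total time, [(finish time, step name), ...]) using absolute finish times."""
    children, n_parent = {}, {}
    for parent, child in edges:
        children.setdefault(parent, []).append(child)
        children.setdefault(child, [])
        n_parent[child] = n_parent.get(child, 0) + 1
        n_parent.setdefault(parent, 0)

    todo = [name for name, n in n_parent.items() if n == 0]
    heapq.heapify(todo)
    running = []  # heap of (absolute finish time, step name)
    now, finished = 0, []

    while todo or running:
        # hand out available steps to idle workers
        while todo and len(running) < n_workers:
            name = heapq.heappop(todo)
            duration = base_time + ord(name) - ord('A') + 1
            heapq.heappush(running, (now + duration, name))
        # jump straight to the next completion
        now, name = heapq.heappop(running)
        finished.append((now, name))
        for child in children[name]:
            n_parent[child] -= 1
            if n_parent[child] == 0:
                heapq.heappush(todo, child)
    return now, finished


example_edges = [('C', 'A'), ('C', 'F'), ('A', 'B'), ('A', 'D'),
                 ('B', 'E'), ('D', 'E'), ('F', 'E')]
print(timed_order(example_edges, base_time=0, n_workers=2))
```

The sketch assumes `n_workers` is at least 1 and that the graph has no cycles.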

Test on example:

In [224]:
with test_file(test_input) as f:
    instructions = build_instructions(get_edges(f.name))
    print(timed_execute(instructions, base_time=0, n_elves=2))

(15, [(3, 'C'), (4, 'A'), (6, 'B'), (9, 'F'), (10, 'D'), (15, 'E')])


Looks good! Run on real input:

In [225]:
instructions = build_instructions(get_edges('input'))
total_time, _ = timed_execute(instructions, base_time=60, n_elves=5)
print(total_time)

1180
