# [Advent of Code 2022 Day 11](https://adventofcode.com/2022/day/11)

I actually did do this before in the AOC 2020 (the bus routes question), so I came pretty prepared.

This is the second time CRT has come up. I'm going to steal it off the Internet and put it in my dirty AOC library.

Also TIL Python `f`-strings are eagerly evaluated. I had to go on StackOverflow to find a dirty `eval` trick to force deferred evaluation...

## Initial setup

In [1]:
import ipytest
import pytest
import sys
sys.path.append("..")
from ansi import *
from comp import *
ipytest.autoconfig()
PART_ONE_SENTINEL = 0x3f3f3f3f + 1
PART_TWO_SENTINEL = 0x3f3f3f3f + 2
run_doctest_for = lambda func: doctest.run_docstring_examples(func, globals())

## Test Cases
The gist of this question is optimization. Part 1 and part 2 are the same, just with a higher efficiency ask.

### Part 1

In [2]:
PART_ONE_CASES: dict[str, dict[str, str | int]] = {
    "example": {
        "example1": 10605,
    },
    "input": {
        "input1": 112221,
    },
}
PART_ONE_INPUTS: dict[str, dict[str, str | int]] = {
    key: {} for key in PART_ONE_CASES.keys()
}
PART_ONE_OUTPUTS: dict[str, dict[str, str | int]] = {
    key: {} for key in PART_ONE_CASES.keys()
}

### Part 2

In [3]:
PART_TWO_CASES: dict[str, dict[str, str | int]] = {
    "example": {
        "example1": 2713310158,
    },
    "input": {
        "input1": 25272176808,
    },
}
PART_TWO_INPUTS: dict[str, dict[str, str | int]] = {
    key: {} for key in PART_TWO_CASES.keys()
}
PART_TWO_OUTPUTS: dict[str, dict[str, str | int]] = {
    key: {} for key in PART_TWO_CASES.keys()
}

## Input Parsing
Using a Pydantic model for Monkey validation helped.

In [4]:
class Monkey(BaseModel):
    name: str
    queue: Deque = Field(default_factory=deque)
    actions: int = 0
    operation: list[str]
    test: int
    true_action: int
    false_action: int

def parse_input_from_filename(filename: str) -> Context:
    lines = list(yield_line(filename))

    ctx = Context()
    ctx.input = []

    input_lines = ctx.input
    monkey = {}

    for idx, line in enumerate(lines):
        line = line.strip()
        if line.startswith("Monkey"):
            monkey["name"] = parse(r"Monkey (\d+):", line)[0]
        elif line.startswith("Starting items"):
            monkey["queue"] = intsep(parse(r"Starting items: (.*)", line)[0], ",")
        elif line.startswith("Operation:"):
            monkey["operation"] = strsep(parse(r"Operation: new = (.*)", line)[0])
        elif line.startswith("Test: divisible by "):
            monkey["test"] = int(parse(r"Test: divisible by (\d+)", line)[0])
        elif line.startswith("If true: throw to monkey"):
            monkey["true_action"] = int(parse(r"If true: throw to monkey (\d+)", line)[0])
        elif line.startswith("If false: throw"):
            monkey["false_action"] = int(parse(r"If false: throw to monkey (\d+)", line)[0])
        elif line == "":  # Trim off
            input_lines.append(Monkey(**monkey))
            monkey = {}
        else:
            raise Exception(f"ayo {line}")

    input_lines.append(Monkey(**monkey))

    return ctx

### Test Parsing Examples

In [5]:
%%ipytest -xrPvvvvv
@pytest.mark.parametrize("test_file_name", PART_ONE_CASES["example"].keys() | PART_TWO_CASES["example"].keys())
def test_parsing_examples(test_file_name):
    for entity in parse_input_from_filename(test_file_name).input:
        log(f"{entity}")

[32m.[0m[32m                                                                                            [100%][0m
[32m[1m_________________________________ test_parsing_examples[example1] _________________________________[0m
-------------------------------------- Captured stderr call ---------------------------------------
name='0' queue=deque([79, 98]) actions=0 operation=['old', '*', '19'] test=23 true_action=2 false_action=3
name='1' queue=deque([54, 65, 75, 74]) actions=0 operation=['old', '+', '6'] test=19 true_action=2 false_action=0
name='2' queue=deque([79, 60, 97]) actions=0 operation=['old', '*', 'old'] test=13 true_action=1 false_action=3
name='3' queue=deque([74]) actions=0 operation=['old', '+', '3'] test=17 true_action=0 false_action=1
[32m[32m[1m1 passed[0m[32m in 0.02s[0m[0m


### Test Parsing Inputs

In [6]:
%%ipytest -xrPvvvvv
@pytest.mark.parametrize("test_file_name", PART_ONE_CASES["input"].keys() | PART_TWO_CASES["input"].keys())
def test_parsing_inputs(test_file_name):
    for entity in parse_input_from_filename(test_file_name).input:
        log(f"{entity}")

[32m.[0m[32m                                                                                            [100%][0m
[32m[1m___________________________________ test_parsing_inputs[input1] ___________________________________[0m
-------------------------------------- Captured stderr call ---------------------------------------
name='0' queue=deque([54, 98, 50, 94, 69, 62, 53, 85]) actions=0 operation=['old', '*', '13'] test=3 true_action=2 false_action=1
name='1' queue=deque([71, 55, 82]) actions=0 operation=['old', '+', '2'] test=13 true_action=7 false_action=2
name='2' queue=deque([77, 73, 86, 72, 87]) actions=0 operation=['old', '+', '8'] test=19 true_action=4 false_action=7
name='3' queue=deque([97, 91]) actions=0 operation=['old', '+', '1'] test=17 true_action=6 false_action=5
name='4' queue=deque([78, 97, 51, 85, 66, 63, 62]) actions=0 operation=['old', '*', '17'] test=5 true_action=6 false_action=3
name='5' queue=deque([88]) actions=0 operation=['old', '+', '3'] test=7 true_ac

## Helper Functions

### Get Math Operation
Should've used `eval`...

In [7]:
%%ipytest -xrPvvvvv
def get_math_op(op: str) -> Callable[[int, int], int]:
    """
    Takes a symbol representing a mathematical operation and returns an apply function implementing it.

    :param op: the operation string from the input
    :return: the apply function that implements this
    """
    if op == "+":
        return lambda x, y: x + y
    if op == "*":
        return lambda x, y: x * y
    if op == "/":
        return lambda x, y: x // y
    if op == "-":
        return lambda x, y: x - y
    raise NotImplementedError(f"Unimplemented operator string: {op}")

@pytest.mark.parametrize("arg, case", [("+", (1, 2, 3)), ("*", (3, 4, 12)), ("/", (12, 3, 4)), ("-", (34, 10, 24))])
def test_get_math_op(arg: str, case: tuple[int, int, int]) -> None:
    operation = get_math_op(arg)
    lvalue, rvalue, expected = case
    assert operation(lvalue, rvalue) == expected

[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m                                                                                         [100%][0m
[32m[32m[1m4 passed[0m[32m in 0.02s[0m[0m


### Get Monkey Operation
Really should've used `eval`...

In [8]:
%%ipytest -xrPvvvvv
def get_monkey_op(args: list[str]):
    """
    Takes the monkey's operation tokens and returns the relevant apply function.

    :param args: LVALUE OP RVALUE
    :return: apply function
    """
    lvalue, op, rvalue = args
    apply = get_math_op(op)
    if lvalue == "old" and rvalue == "old":
        return lambda old: apply(old, old)
    elif lvalue == "old" and rvalue != "old":
        return lambda old: apply(old, int(rvalue))
    elif lvalue != "old" and rvalue == "old":
        return lambda old: apply(int(lvalue), old)
    elif lvalue != "old" and rvalue != "old":
        return lambda old: apply(int(lvalue), int(rvalue))
    raise Exception(f"Bad configuration of {args=}")

def test_get_monkey_op() -> None:
    assert get_monkey_op(["old", "*", "old"])(10) == 100
    assert get_monkey_op(["old", "+", "old"])(10) == 20
    assert get_monkey_op(["old", "+", "20"])(10) == 30
    assert get_monkey_op(["50", "-", "20"])(1234554320) == 30

[32m.[0m[32m                                                                                            [100%][0m
[32m[32m[1m1 passed[0m[32m in 0.01s[0m[0m


### Monkey Processor
This will simulate the monkey's processing. It isn't idempotent as there's a side effect of incrementing the monkey's `action` counter.
In theory I should be able to just take the length of its queue and use that as the incrementer...

In [9]:
def process_monkey(monkey: Monkey, all_monkeys: list[Monkey], do_crt: bool = False) -> list[tuple[int, int]]:
    """
    Processes a monkey's queue, updating its action count and returning a list of its updated worry value requests.

    :param monkey: the monkey whose queue needs to be processed
    :param all_monkeys: list of all monkeys (used to extract pairwise moduli)
    :param do_crt: flag to determine whether to reduce all worry values using Chinese Remainder Theorem
    :return: a list of the monkey's requests after processing the operations
    """
    results = []
    moduli = tuple([monk.test for monk in all_monkeys])
    while monkey.queue:
        monkey.actions += 1
        popped_worry_value = monkey.queue.popleft()
        monkey_operation = get_monkey_op(monkey.operation)
        modified_worry_value = monkey_operation(popped_worry_value)
        final_worry_value = modified_worry_value // (1 if do_crt else 3)
        satisfied = final_worry_value % monkey.test == 0
        next_monkey = monkey.false_action if not satisfied else monkey.true_action
        if do_crt:
            residues = tuple([final_worry_value] * len(all_monkeys))
            final_worry_value = solve_chinese_remainder_theorem(residues, moduli)
        else:
            log(defer(f"Monkey inspects an item with a worry level of {popped_worry_value}"))
            log(defer(f"Worry level goes from {popped_worry_value} to {modified_worry_value}"))
            log(defer(f"Monkey gets bored with item. Worry level is divided by 3 to {final_worry_value}"))
            log(defer(f"Current worry level {'is' if satisfied else 'is not'} divisible by {monkey.test}"))
            log(defer(f"Item with worry level {final_worry_value} is thrown to monkey {next_monkey}"))
            log(defer(""))
        results.append((final_worry_value, next_monkey))
    return results

### Score Getter
Every time a monkey is processed, the function returns a list containing the monkey's newly-declared worry values, along with the next monkey to receive them.

In [10]:
def get_score_after_round(round_number: int, monkeys: list[Monkey], do_crt: bool = False) -> int:
    """
    Conducts `round_number` rounds (where each found, all monkeys have their queues processed).
    Then, returns the product of the two highest monkey action counts.

    :param round_number: how many rounds to conduct
    :param monkeys: the list of monkeys that are being processed
    :param do_crt: whether to perform Chinese Remainder Theorem
    :return: the product of the two most active monkeys' action counts
    """
    for _ in range(round_number):
        for monkey in monkeys:
            for new_worry_value, receiving_monkey_idx in process_monkey(monkey, monkeys, do_crt=do_crt):
                monkeys[receiving_monkey_idx].queue.append(new_worry_value)
    return prod(heapq.nlargest(2, [monkey.actions for monkey in monkeys]))

## Main Function
What's stopping me from just unconditionally doing CRT is that you don't divide by 3 in the unbounded version.

In [11]:
def solve(part: int, filename: str) -> int:
    monkeys = parse_input_from_filename(filename).input
    if part == 1:
        disable_logging()
        return get_score_after_round(20, monkeys)
    if part == 2:
        disable_logging()
        return get_score_after_round(10000, monkeys, do_crt=True)
    else:
        raise Exception(f"Invalid part: {part}")

## Execution

### Part 1

In [12]:
%%ipytest -xrPvvvvv
@pytest.mark.parametrize("test_file_name, test_expected_output", PART_ONE_CASES["example"].items())
def test_part_one_examples(test_file_name, test_expected_output):
    test_actual_output = solve(1, test_file_name)
    PART_ONE_OUTPUTS["example"][test_file_name] = test_actual_output
    failure_message = "Did you forget to calibrate the example test case?" if (
        test_expected_output == PART_ONE_SENTINEL
    ) else f"Failed example test case: expected {test_expected_output} but got {test_actual_output}"
    assert test_actual_output == test_expected_output, failure_message

@pytest.mark.parametrize("test_file_name, test_expected_output", PART_ONE_CASES["input"].items())
def test_part_one_inputs(test_file_name, test_expected_output):
    test_actual_output = solve(1, test_file_name)
    PART_ONE_OUTPUTS["input"][test_file_name] = test_actual_output
    failure_message = f"Candidate answer {test_actual_output} found" if (
        test_expected_output == PART_ONE_SENTINEL
    ) else f"Failed input test case: expected {test_expected_output} but got {test_actual_output}"
    assert test_actual_output == test_expected_output, failure_message

[32m.[0m[32m.[0m[32m                                                                                           [100%][0m
[32m[32m[1m2 passed[0m[32m in 0.15s[0m[0m


### Part 2

In [13]:
%%ipytest -xrPvvvvv
@pytest.mark.parametrize("test_file_name, test_expected_output", PART_TWO_CASES["example"].items())
def test_part_two_examples(test_file_name, test_expected_output):
    test_actual_output = solve(2, test_file_name)
    PART_TWO_OUTPUTS["example"][test_file_name] = test_actual_output
    failure_message = "Did you forget to calibrate the example test case?" if (
            test_expected_output == PART_TWO_SENTINEL
    ) else f"Failed example test case: expected {test_expected_output} but got {test_actual_output}"
    assert test_actual_output == test_expected_output, failure_message

@pytest.mark.parametrize("test_file_name, test_expected_output", PART_TWO_CASES["input"].items())
def test_part_two_inputs(test_file_name, test_expected_output):
    test_actual_output = solve(2, test_file_name)
    PART_TWO_OUTPUTS["input"][test_file_name] = test_actual_output
    failure_message = f"Candidate answer {test_actual_output} found" if (
            test_expected_output == PART_TWO_SENTINEL
    ) else f"Failed input test case: expected {test_expected_output} but got {test_actual_output}"
    assert test_actual_output == test_expected_output, failure_message

[32m.[0m[32m.[0m[32m                                                                                           [100%][0m
[32m[32m[1m2 passed[0m[32m in 4.04s[0m[0m


Not too bad. Definitely room for optimization.