In [1]:
from enum import Enum
from queue import Queue
from dataclasses import dataclass

class Pulse(str, Enum):
    """Signal level carried by a message.

    Members are also plain strings, so ``Pulse.LOW == "low"`` holds.
    """

    LOW = "low"
    HIGH = "high"

    def flip(self):
        """Return the opposite level: LOW becomes HIGH, anything else becomes LOW."""
        return Pulse.HIGH if self == Pulse.LOW else Pulse.LOW

@dataclass()
class Message:
    """A single pulse sent from one module to another.

    Attributes:
        from_name: Name of the sending module.
        to_name: Name of the receiving module.
        pulse: The level being sent.
    """

    from_name: str
    to_name: str
    pulse: Pulse

    def __repr__(self) -> str:
        """Render as ``<from> -<level>-> <to>``, e.g. ``button -low-> broadcaster``."""
        # Format the member's value explicitly. Formatting a (str, Enum) member
        # directly in an f-string yields "low" on Python <= 3.10 but "Pulse.LOW"
        # on 3.11+, so the output would otherwise depend on the interpreter.
        return f"{self.from_name} -{self.pulse.value}-> {self.to_name}"


class Module:
    """Base module: a name plus the names of the modules it sends to.

    The base implementation ignores every message; subclasses change the
    behaviour by overriding ``_process_message``.
    """

    def __init__(self, name: str, outputs: list[str]) -> None:
        self.outputs = outputs
        self.name = name

    def prepare_messages(self, pulse: Pulse) -> list[Message]:
        """Build one message carrying ``pulse`` per output, in output order."""
        messages = []
        for target in self.outputs:
            messages.append(Message(self.name, target, pulse))
        return messages

    def process_message(self, message: Message) -> list[Message]:
        """Handle ``message`` and return the messages to send in response."""
        return self._process_message(message)

    def _process_message(self, message: Message) -> list[Message]:
        """Subclass hook; the base module sends nothing."""
        return []

    def __repr__(self):
        kind = type(self).__name__
        targets = ", ".join(self.outputs)
        return f"{self.name}: {kind} -> {targets}"


class Broadcaster(Module):
    """Module that forwards the pulse of every message it receives to all of its outputs."""

    def _process_message(self, message: Message) -> list[Message]:
        incoming_pulse = message.pulse
        return self.prepare_messages(incoming_pulse)


class FlipFlop(Module):
    """Module holding a two-level state that toggles whenever a low pulse arrives."""

    def __init__(self, name: str, outputs: list[str]) -> None:
        super().__init__(name, outputs)
        # The state starts at LOW.
        self.state = Pulse.LOW

    def _process_message(self, message: Message) -> list[Message]:
        """Toggle on a low pulse and send the new state; send nothing otherwise."""
        is_low_pulse = isinstance(message, Message) and message.pulse == Pulse.LOW
        if not is_low_pulse:
            return []
        self.state = self.state.flip()
        return self.prepare_messages(self.state)

class Conjunction(Module):
    """Module that remembers the most recent pulse received from each input.

    After recording an incoming pulse it sends LOW when every remembered
    pulse is HIGH, and HIGH otherwise.
    """

    def __init__(self, name: str, outputs: list[str], inputs: list[str]) -> None:
        super().__init__(name, outputs)
        self.set_inputs(inputs)

    def set_inputs(self, inputs: list[str]):
        """Replace the input memory, marking every given input as last seen LOW."""
        self._inputs = dict.fromkeys(inputs, Pulse.LOW)

    def _get_output_pulse(self) -> Pulse:
        """Return LOW if all remembered pulses are HIGH, else HIGH."""
        for remembered in self._inputs.values():
            if remembered == Pulse.HIGH:
                continue
            return Pulse.HIGH
        return Pulse.LOW

    def _update_last_pulse(self, message: Message) -> None:
        """Record the pulse of ``message`` under its sender's name."""
        sender = message.from_name
        self._inputs[sender] = message.pulse

    def _process_message(self, message: Message) -> list[Message]:
        # Remember first, then decide: the output depends on the updated memory.
        self._update_last_pulse(message)
        return self.prepare_messages(self._get_output_pulse())

    def __repr__(self):
        kind = type(self).__name__
        sources = ", ".join(self._inputs)
        targets = ", ".join(self.outputs)
        return f"{self.name}: {kind} {sources} -> {targets}"

In [2]:
def parse_input(input: str) -> dict[str, Module]:
    """Parse a module configuration into a mapping of module name to module.

    Every non-blank line has the form ``<name> -> <out1>, <out2>, ...`` where
    ``<name>`` is ``broadcaster``, ``%<name>`` (flip-flop) or ``&<name>``
    (conjunction). Names that only appear as outputs get a plain ``Module``
    with no outputs, so every message target can be looked up. Each
    conjunction is told which modules list it as an output.

    Args:
        input: The full configuration text. (The parameter name shadows the
            built-in inside this function; it is kept for compatibility.)

    Returns:
        Modules keyed by name (without the type prefix), declared modules
        first in file order, then output-only modules in sorted order.

    Raises:
        ValueError: If a non-blank line does not contain exactly one
            `` -> `` separator, or its name has an unknown type prefix.
    """
    modules: list[Module] = []
    referenced_names: set[str] = set()
    for raw_line in input.splitlines():
        line = raw_line.strip()
        if not line:
            # Tolerate blank lines instead of failing on the unpack below.
            continue
        name, outputs_part = line.split(" -> ")
        outputs = outputs_part.split(", ")
        referenced_names.update(outputs)

        if name == "broadcaster":
            modules.append(Broadcaster(name, outputs))
            continue

        symbol, name = name[0], name[1:]
        if symbol == "%":
            modules.append(FlipFlop(name, outputs))
        elif symbol == "&":
            # Inputs are filled in below, once every module is known.
            modules.append(Conjunction(name, outputs, []))
        else:
            # Previously any other prefix silently became a conjunction with
            # its first character cut off.
            raise ValueError(f"unknown module type {symbol!r} in line: {line!r}")

    # Output-only names become inert modules; sorting keeps the result
    # independent of set iteration order.
    declared_names = {m.name for m in modules}
    for name in sorted(referenced_names - declared_names):
        modules.append(Module(name, []))

    # Build target -> sources once instead of rescanning for each conjunction.
    inputs_by_target: dict[str, list[str]] = {}
    for m in modules:
        for output in m.outputs:
            sources = inputs_by_target.setdefault(output, [])
            if m.name not in sources:
                sources.append(m.name)

    for m in modules:
        if isinstance(m, Conjunction):
            m.set_inputs(inputs_by_target.get(m.name, []))

    return {m.name: m for m in modules}


In [3]:
def cycle(modules: dict[str, Module], message_queue: Queue[Message]) -> tuple[int, int]:
    """Press the button once and process messages until the queue is empty.

    Args:
        modules: Modules keyed by name; every message target must be a key.
        message_queue: Queue the button message is put on and that is drained.

    Returns:
        ``(low, high)``: how many LOW and HIGH messages were processed.
    """
    counts = dict.fromkeys((Pulse.LOW, Pulse.HIGH), 0)
    message_queue.put(Message("button", "broadcaster", Pulse.LOW))
    while not message_queue.empty():
        current = message_queue.get()
        counts[current.pulse] += 1
        receiver = modules[current.to_name]
        for reply in receiver.process_message(current):
            message_queue.put(reply)
    return (counts[Pulse.LOW], counts[Pulse.HIGH])

In [4]:
# Read the puzzle text. A dedicated name keeps the built-in input() usable in
# the kernel, and the explicit encoding avoids depending on the locale default.
with open("20/input.txt", encoding="utf-8") as f:
    puzzle_input = f.read()

# Empty queue plus freshly parsed modules for the simulation below.
message_queue: Queue[Message] = Queue()
modules = parse_input(puzzle_input)
modules

{'pr': pr: Conjunction bf, ns, pd, jp, tk, qb, pn, cb, mx -> pd, vx, vn, cl, hm,
 'hm': hm: FlipFlop -> qb,
 'nm': nm: FlipFlop -> dh, jv,
 'lv': lv: FlipFlop -> jv, tg,
 'dg': dg: FlipFlop -> tm, jm,
 'mt': mt: FlipFlop -> jv, zp,
 'ln': ln: Conjunction jv -> kj,
 'kj': kj: Conjunction ln, dr, zx, vn -> rx,
 'dr': dr: Conjunction qs -> kj,
 'dx': dx: FlipFlop -> ts,
 'qs': qs: Conjunction rg, lh, mv, ts, dl, vq, bs, ng -> kf, dr, sc, rg, gl, dx,
 'dh': dh: FlipFlop -> jv, mc,
 'rg': rg: FlipFlop -> qs, vq,
 'kt': kt: FlipFlop -> jv, mt,
 'lh': lh: FlipFlop -> qs, dl,
 'tp': tp: FlipFlop -> pf, jm,
 'bf': bf: FlipFlop -> vx, pr,
 'mv': mv: FlipFlop -> qs, gl,
 'ts': ts: FlipFlop -> ng, qs,
 'kf': kf: FlipFlop -> dx,
 'gv': gv: FlipFlop -> jm, km,
 'dl': dl: FlipFlop -> qs,
 'nd': nd: FlipFlop -> dg,
 'km': km: FlipFlop -> jm,
 'ns': ns: FlipFlop -> pr, pn,
 'gl': gl: FlipFlop -> kf,
 'pd': pd: FlipFlop -> pr, jp,
 'xv': xv: FlipFlop -> nd, jm,
 'hf': hf: FlipFlop -> nm,
 'vx': vx: Flip

In [5]:
# Press the button 1000 times, summing the (low, high) counts of every press.
pulses = (0, 0)
for _ in range(1000):
    new_pulses = cycle(modules, message_queue)
    pulses = tuple(total + added for total, added in zip(pulses, new_pulses))

# The result is the product of the two totals.
low_total, high_total = pulses
low_total * high_total

949764474

## Part 2

In [81]:
# Re-read and re-parse so this part starts from freshly initialised modules
# rather than the state left behind by the earlier button presses.
# A dedicated name keeps the built-in input() usable in the kernel, and the
# explicit encoding avoids depending on the locale default.
with open("20/input.txt", encoding="utf-8") as f:
    puzzle_input = f.read()

message_queue: Queue[Message] = Queue()
modules = parse_input(puzzle_input)
modules

{'pr': pr: Conjunction bf, ns, pd, jp, tk, qb, pn, cb, mx -> pd, vx, vn, cl, hm,
 'hm': hm: FlipFlop -> qb,
 'nm': nm: FlipFlop -> dh, jv,
 'lv': lv: FlipFlop -> jv, tg,
 'dg': dg: FlipFlop -> tm, jm,
 'mt': mt: FlipFlop -> jv, zp,
 'ln': ln: Conjunction jv -> kj,
 'kj': kj: Conjunction ln, dr, zx, vn -> rx,
 'dr': dr: Conjunction qs -> kj,
 'dx': dx: FlipFlop -> ts,
 'qs': qs: Conjunction rg, lh, mv, ts, dl, vq, bs, ng -> kf, dr, sc, rg, gl, dx,
 'dh': dh: FlipFlop -> jv, mc,
 'rg': rg: FlipFlop -> qs, vq,
 'kt': kt: FlipFlop -> jv, mt,
 'lh': lh: FlipFlop -> qs, dl,
 'tp': tp: FlipFlop -> pf, jm,
 'bf': bf: FlipFlop -> vx, pr,
 'mv': mv: FlipFlop -> qs, gl,
 'ts': ts: FlipFlop -> ng, qs,
 'kf': kf: FlipFlop -> dx,
 'gv': gv: FlipFlop -> jm, km,
 'dl': dl: FlipFlop -> qs,
 'nd': nd: FlipFlop -> dg,
 'km': km: FlipFlop -> jm,
 'ns': ns: FlipFlop -> pr, pn,
 'gl': gl: FlipFlop -> kf,
 'pd': pd: FlipFlop -> pr, jp,
 'xv': xv: FlipFlop -> nd, jm,
 'hf': hf: FlipFlop -> nm,
 'vx': vx: Flip

In [82]:
# Which modules list "rx" among their outputs?
[module for module in modules.values() if "rx" in module.outputs]

[kj: Conjunction ln, dr, zx, vn -> rx]

In [83]:
# Same lookup as the previous cell: modules that list "rx" among their outputs.
list(filter(lambda module: "rx" in module.outputs, modules.values()))

[kj: Conjunction ln, dr, zx, vn -> rx]

In [7]:
# Modules that list "jv" among their outputs.
[source for source in modules.values() if "jv" in source.outputs]

[nm: FlipFlop -> dh, jv,
 lv: FlipFlop -> jv, tg,
 mt: FlipFlop -> jv, zp,
 dh: FlipFlop -> jv, mc,
 kt: FlipFlop -> jv, mt,
 tg: FlipFlop -> jv,
 vc: FlipFlop -> jv, hf,
 mc: FlipFlop -> jv, lv]

In [8]:
# Names of the modules that send to "jv"; their pulses are tracked below.
monitor = []
for module in modules.values():
    if "jv" in module.outputs:
        monitor.append(module.name)

In [9]:
# Press the button up to 100 times. After each completed press, record which
# inputs of "jv" were last seen HIGH. Stop early if "rx" receives a LOW pulse.
finished = False
num_presses = 0
res = {name: [] for name in monitor}
for _ in range(100):
    message_queue.put(Message("button", "broadcaster", Pulse.LOW))
    num_presses += 1

    while not message_queue.empty():
        message = message_queue.get()
        if message.to_name == "rx" and message.pulse == Pulse.LOW:
            # A bare break would only leave this inner loop; the flag lets the
            # outer loop stop too, so num_presses is the press that reached rx.
            finished = True
            break
        for new_message in modules[message.to_name].process_message(message):
            message_queue.put(new_message)

    if finished:
        break

    # Snapshot the conjunction's memory once the press has fully settled.
    for name, state in modules["jv"]._inputs.items():
        if state == Pulse.HIGH:
            res[name].append(num_presses)

num_presses

100

In [10]:
# For each monitored input of "jv": the press numbers after which its last recorded pulse was HIGH.
res

{'nm': [1,
  2,
  3,
  4,
  5,
  6,
  7,
  8,
  9,
  10,
  11,
  12,
  13,
  14,
  15,
  16,
  17,
  18,
  19,
  20,
  21,
  22,
  23],
 'lv': [24,
  25,
  26,
  27,
  28,
  29,
  30,
  31,
  32,
  33,
  34,
  35,
  36,
  37,
  38,
  39,
  40,
  41,
  42,
  43,
  44,
  45,
  46,
  47,
  48,
  49,
  50,
  51,
  52,
  53,
  54,
  55,
  56,
  57,
  58,
  59,
  60,
  61,
  62,
  63,
  64,
  65,
  66,
  67,
  68,
  69,
  70,
  71,
  72,
  73,
  74,
  75,
  76,
  77,
  78,
  79,
  80,
  81,
  82,
  83,
  84,
  85,
  86,
  87,
  88,
  89,
  90,
  91,
  92,
  93,
  94,
  95,
  96,
  97,
  98,
  99,
  100],
 'mt': [2,
  3,
  6,
  7,
  10,
  11,
  14,
  15,
  18,
  19,
  22,
  23,
  26,
  27,
  30,
  31,
  34,
  35,
  38,
  39,
  42,
  43,
  46,
  47,
  50,
  51,
  54,
  55,
  58,
  59,
  62,
  63,
  66,
  67,
  70,
  71,
  74,
  75,
  78,
  79,
  82,
  83,
  86,
  87,
  90,
  91,
  94,
  95,
  98,
  99],
 'dh': [1,
  2,
  3,
  4,
  5,
  6,
  7,
  8,
  9,
  10,
  11,
  12,
  13,
  14,
  15,
  16

In [99]:
from pyvis.network import Network

# Draw the module graph: diamonds are conjunctions, triangles are flip-flops,
# and every other module is a dot. Edges point from a module to its outputs.
net = Network(notebook=True, directed=True)

for module in modules.values():
    shape = "dot"
    if isinstance(module, FlipFlop):
        shape = "triangle"
    if isinstance(module, Conjunction):
        shape = "diamond"
    net.add_node(module.name, shape=shape)

edges = []
for module in modules.values():
    for target in module.outputs:
        edges.append((module.name, target))
net.add_edges(edges)

net.show("ble.html")

ble.html
