---
# --- Day 20: Pulse Propagation ---
---

In [323]:
from typing import List, Dict
import numpy as np

## Load data

In [333]:
class CommunicationModule:
    
    def __init__(self, module_name: str, destinations: List[str], verbose: bool):
        self.module_name = module_name
        self.destination_modules = destinations
        self.pulses_sent = [0, 0]
        self.received_pulses = []
        self.pulse_to_transmit = None
        self.verbose = verbose
        
    def receive_pulse(self, pulse: int, sender: str) -> None:
        self.received_pulses.append((pulse, sender))
    
    """Interface for communications modules""" 
    def _update_status(self, pulse: int, sender: str) -> None:
        raise NotImplementedError
        
    def transmit(self, modules: Dict) -> None:
        while len(self.received_pulses) > 0:
            p, s = self.received_pulses.pop(0)
            self._update_status(p, s)
            if self.pulse_to_transmit is not None:
                self.pulses_sent[self.pulse_to_transmit] += len(self.destination_modules)
                for m in self.destination_modules:
                    if self.verbose:
                        print(f"{self.module_name} {self.pulse_to_transmit} -> {m}")
                    modules[m].receive_pulse(self.pulse_to_transmit, self.module_name)
                for m in self.destination_modules:
                    modules[m].transmit(modules)

In [334]:
class BroadcasterModule(CommunicationModule):
    module_type = "b"
    
    def _update_status(self, pulse: int, sender: str) -> None:
        self.pulse_to_transmit = pulse

In [335]:
class FlipFlopModule(CommunicationModule):    
    module_type = "%"

    def __init__(self, module_name: str, destinations: List[str], verbose: bool):
        super().__init__(module_name, destinations, verbose)
        self.active = False        
    
    def _update_status(self, pulse: int, sender: str) -> None:
        if pulse == 0:
            self.active = not self.active
            self.pulse_to_transmit = int(self.active)
        else:
            self.pulse_to_transmit = None

In [336]:
class ConjunctionModule(CommunicationModule):
    module_type = "&"

    def __init__(self, module_name: str, destinations: List[str], verbose: bool):
        super().__init__(module_name, destinations, verbose)
        self.memory = dict()
    
    def _update_status(self, pulse: int, sender: str) -> None:
        self.memory[sender] = pulse
        self.pulse_to_transmit = 0 if all([v==1 for v in self.memory.values()]) else 1 

In [337]:
class OutputModule(CommunicationModule):
    module_type = "o"

    def _update_status(self, pulse: int, sender: str) -> None:
        pass

In [338]:
def instantiate_modules(file_tag: str, verbose: bool=False) -> Dict[str, CommunicationModule]:
    with open(f"data/day20_{file_tag}.txt", "r") as f:
        data = f.read().splitlines()
    modules = [OutputModule("output", [], verbose)]
    for row in data:
        module_string, dests = row.split(" -> ")
        destinations = dests.split(", ")
        module_type = module_string[0]
        if module_type == "b":
            modules.append(BroadcasterModule("broadcaster", destinations, verbose))
        elif module_type == "%":
            modules.append(FlipFlopModule(module_string[1:], destinations, verbose))
        elif module_type == "&":
            modules.append(ConjunctionModule(module_string[1:], destinations, verbose))
        else:
            raise ValueError("Module type unknown.")
    modules_dict = {m.module_name: m for m in modules}
    # create input connections for conjunction modules
    extra_modules = {}
    for k, m in modules_dict.items():
        for d in m.destination_modules:
            if not d in modules_dict:
                extra_modules[d] = OutputModule("output", [], verbose)
            elif modules_dict[d].module_type == "&":
                modules_dict[d].memory.update({k: 0})    
    return {**modules_dict, **extra_modules}

## --- Part One ---

In [339]:
modules = instantiate_modules("input", verbose=False)

n_button_presses = 1000
cnt = 0
back_to_initial_status = False
while not back_to_initial_status and cnt < n_button_presses:
    modules["broadcaster"].receive_pulse(0, "button")
    modules["broadcaster"].transmit(modules)    
    sum_fp = sum([m.active for m in modules.values() if m.module_type == "%"])
    sum_im = sum([sum(m.memory.values()) for m in modules.values() if m.module_type == "&"])
    back_to_initial_status = (sum_fp + sum_im) == 0
    cnt += 1
pulse_counts = np.sum([m.pulses_sent for m in modules.values()], axis=0) + np.array([1, 0])*cnt
print("Pulse counts:")
print(f"After {cnt} button presses: {pulse_counts}")
final_pulse_counts = pulse_counts * int(n_button_presses / cnt)
print(f"After {n_button_presses} button presses: {final_pulse_counts}")
print(np.prod(final_pulse_counts))

Pulse counts:
After 1000 button presses: [16669 42193]
After 1000 button presses: [16669 42193]
703315117


## --- Part Two ---

In [340]:
[m.module_name for _,m in modules.items() if "rx" in m.destination_modules]

['cs']

In [342]:
modules["cs"].memory

{'kh': 0, 'lz': 0, 'tg': 0, 'hn': 0}

In [330]:
modules = instantiate_modules("input", verbose=False)
cnt = 0
while modules["rx"].low_pulses_received == 0:
    modules["broadcaster"].receive_pulse(0, "button")
    modules["broadcaster"].transmit(modules)
    cnt += 1

KeyboardInterrupt: 

In [331]:
cnt

2116885

In [332]:
modules["rx"].low_pulses_received

0