Alternative implementation of `generals1`:
* Traitors now return random observations.

In [14]:
import random
import logging
from enum import Enum, auto
from itertools import combinations
from collections import Counter, defaultdict

In [15]:
# Level 100 is above logging.CRITICAL (50), so every logging.info/debug call
# made by the simulation below is silenced. Lower it (e.g. logging.INFO or
# logging.DEBUG) to see the trace. force=True replaces any handlers already
# attached to the root logger, so re-running this cell takes effect.
logging.basicConfig(level=100, force=True)

In [16]:
class Observation(Enum):
    '''
    The two possible verdicts a general can hold (or report) about a city.
    '''
    # The city should be attacked
    ATTACK = auto()
    # The army should withdraw
    RETREAT = auto()

In [17]:
class EnemyCity:
    '''
    The city under observation by the generals.
    '''
    def __init__(self, should_attack):
        # Ground truth of the simulation: the city "objectively" either is
        # attackable or is not, independent of what any general reports.
        self.should_attack = should_attack

In [18]:
class General:
    '''
    Represents a General who is either loyal or a traitor.

    A general first forms an objective observation of the city, then exchanges
    messages with other generals (`speak` / `listen`) and finally settles on
    the observation reported to them most often (`make_decision`).
    '''
    # Class-wide counter handing out a unique id per instance
    # (used for easier representation in log output)
    next_id = 0

    def __init__(self, is_traitor=False):
        '''
        :param is_traitor: When True, `speak` returns random observations
            instead of the general's real one.
        '''
        self._objective_observation = None
        self.last_observations = []
        self.is_traitor = is_traitor

        self._id = General.next_id
        General.next_id += 1

    def make_observation(self, city: 'EnemyCity'):
        '''
        Record what this general objectively sees when looking at `city`.

        Any truthy `city.should_attack` counts as ATTACK, any falsy value as
        RETREAT, so the flag does not have to be a strict `bool`.
        '''
        if city.should_attack:
            self._objective_observation = Observation.ATTACK
        else:
            self._objective_observation = Observation.RETREAT

    def speak(self):
        '''
        :return: The observation this general reports to a peer. Loyal generals
            report their objective observation; traitors report a random one.
        '''
        if self.is_traitor:
            # Traitors return a random observation
            return random.choice([Observation.RETREAT, Observation.ATTACK])

        return self._objective_observation

    def listen(self, observation):
        '''
        Store an observation reported by another general.
        '''
        self.last_observations.append(observation)

    def make_decision(self):
        '''
        :return: The observation heard most often so far. If nothing has been
            heard yet (e.g. there is only one general), the general falls back
            to their own objective observation instead of failing.
        '''
        if not self.last_observations:
            return self._objective_observation

        return Counter(self.last_observations).most_common(1)[0][0]

In [19]:
class Messenger:
    @staticmethod
    def traverse_generals(generals):
        '''
        Let every unordered pair of generals exchange one message each way.

        For each pair, both generals `speak` and each one `listen`s to what the
        other said. Every message sent is recorded, which yields a graph whose
        nodes are generals and whose edges are the messages between them.

        :return: The resulting communication graph represented as a dictionary.
        '''
        # {speaker: {listener: message, ...}, ...}
        graph = defaultdict(dict)

        for first, second in combinations(generals, 2):
            # The second general of the pair speaks before the first one
            exchange = [
                (second, first, second.speak()),
                (first, second, first.speak()),
            ]

            for speaker, listener, message in exchange:
                listener.listen(message)
                graph[speaker][listener] = message

        return graph

In [20]:
def _resolve_traitor_count(total_generals, num_traitors):
    '''
    Turn the flexible `num_traitors` argument of `main` into an absolute count.

    :raises ValueError: If `num_traitors` is not one of the documented forms,
        or if it asks for more traitors than there are generals.
    '''
    if isinstance(num_traitors, int):
        if num_traitors < 0:
            # Negative int: pick a random share of the generals
            return int(total_generals * random.random())
        count = num_traitors
    elif isinstance(num_traitors, float) and 0 <= num_traitors <= 1:
        count = int(total_generals * num_traitors)
    else:
        # Previously such values silently meant "no traitors"
        raise ValueError(
            f'num_traitors must be an int or a float within [0.0, 1.0], got {num_traitors!r}'
        )

    if count > total_generals:
        # Otherwise more generals than `total_generals` would be created
        raise ValueError(
            f'num_traitors ({count}) cannot exceed total_generals ({total_generals})'
        )

    return count


def main(total_generals, num_traitors=-1, should_attack=None):
    '''
    Run the main simulation between a number of generals.

    :param total_generals: The TOTAL number of generals
    :param num_traitors: The number of traitors.
        Can be expressed either as...
            int >= 0                 for an absolute size
            float >= 0.0 and <= 1.0  for a percentage of `total_generals`
            int < 0                  for random distribution
    :param should_attack: Should the city be objectively attackable or not.
        (default: random)
    :return: A tuple `(agreed_plan, total_attack, total_retreat)` where
        `agreed_plan` is the `Observation` all loyal generals decided on
        (None if they disagree or there are no loyal generals), and the two
        totals count every ATTACK / non-ATTACK message that was sent.
    :raises ValueError: If `num_traitors` is invalid (see above) or larger
        than `total_generals`.
    '''
    n_traitors = _resolve_traitor_count(total_generals, num_traitors)

    # Loyal generals are created first, traitors last
    generals = [General(False) for _ in range(total_generals - n_traitors)] + \
               [General(True) for _ in range(n_traitors)]

    if should_attack is None:
        should_attack = random.choice([True, False])
    city = EnemyCity(should_attack)

    logging.info(
        f'Running with {total_generals} total generals of which'
        f' {n_traitors} are traitorous. EnemyCity.should_attack == {city.should_attack}\n'
    )

    # Each general makes an objective observation
    for g in generals:
        g.make_observation(city)

    # Each general communicates his observation (or not)
    communications = Messenger.traverse_generals(generals)

    # Each loyal general comes up with a plan of action; the traitors'
    # decisions are irrelevant for the agreement check
    loyal_poa = {g.make_decision() for g in generals if not g.is_traitor}
    agreed_plan = None

    if len(loyal_poa) != 1:
        logging.info('Loyal Generals cannot agree on a plan of action!!!\n')
    else:
        agreed_plan = loyal_poa.pop()
        logging.info(f'Loyal Generals have agreed on: {agreed_plan}\n')

    # Tally what every general actually reported to the others
    total_a, total_r = 0, 0

    for g, sent in communications.items():
        a = sum(1 for msg in sent.values() if msg is Observation.ATTACK)
        # Everything that is not ATTACK is counted as RETREAT
        r = len(sent) - a

        total_a += a
        total_r += r

        logging.debug(
            f'{"[!] " if g.is_traitor else ""}'
            f'General {g._id} has made the objective observation of {g._objective_observation.name}'
            f' and reported ATTACK {a} times and RETREAT {r} times'
        )

    return agreed_plan, total_a, total_r

In [21]:
def test_runner():
    '''
    Run `main` repeatedly for a fixed set of scenarios and print each outcome.

    Every scenario uses 10 generals and an attackable city; only the number
    of traitors varies (0, 2, 4, 6, 8, 10).
    '''
    # [(num_generals, num_traitors), ...]
    tests = [(10, traitors) for traitors in range(0, 11, 2)]
    repeats = 10  # repeat each test 10 times

    for test in tests:
        print(f'\n\nRunning test {test}\n\n')
        num_generals, num_traitors = test

        for _ in range(repeats):
            plan, a, r = main(num_generals, num_traitors, True)
            print(f'plan={plan} attack={a} retreat={r}')

In [22]:
if __name__ == '__main__':
    # Earlier manual experiments, kept commented out for reference:
    #
    # m = 10  # 10 traitors

    # # The loyal generals should all agree to the same reasonable plan
    # main((3 * m) + 1, m)

    # # Here, the loyal generals *should* fail to
    # # reach agreement or agree to an "unreasonable" plan.
    # # (This doesn't actually happen...)
    # main((3 * m), m)  # Note the missing `+1`

    # main(15, 10)

    # Run the batch of scenarios defined in `test_runner`, which prints the
    # outcome of every run.
    test_runner()



Running test (10, 0)


plan=Observation.ATTACK attack=90 retreat=0
plan=Observation.ATTACK attack=90 retreat=0
plan=Observation.ATTACK attack=90 retreat=0
plan=Observation.ATTACK attack=90 retreat=0
plan=Observation.ATTACK attack=90 retreat=0
plan=Observation.ATTACK attack=90 retreat=0
plan=Observation.ATTACK attack=90 retreat=0
plan=Observation.ATTACK attack=90 retreat=0
plan=Observation.ATTACK attack=90 retreat=0
plan=Observation.ATTACK attack=90 retreat=0


Running test (10, 2)


plan=Observation.ATTACK attack=83 retreat=7
plan=Observation.ATTACK attack=84 retreat=6
plan=Observation.ATTACK attack=78 retreat=12
plan=Observation.ATTACK attack=80 retreat=10
plan=Observation.ATTACK attack=80 retreat=10
plan=Observation.ATTACK attack=81 retreat=9
plan=Observation.ATTACK attack=83 retreat=7
plan=Observation.ATTACK attack=80 retreat=10
plan=Observation.ATTACK attack=79 retreat=11
plan=Observation.ATTACK attack=83 retreat=7


Running test (10, 4)


plan=Observation.ATTACK attack=74 retrea