# Iterated Prisoner's Dilemma On A Network

## Imports and Config

In [None]:
import math
import random
import logging
import numpy as np
import pandas as pd
import networkx as nx
from functools import partial
from dataclasses import dataclass
from IPython.display import display
import matplotlib.pyplot as plt
import matplotlib.patheffects as pe
import matplotlib.patches as mpatches
from matplotlib.colors import ListedColormap
from matplotlib.ticker import PercentFormatter
from matplotlib.animation import FuncAnimation

In [None]:
# Select the interactive notebook backend for all figures in this notebook.
%matplotlib notebook
# Render animations as self-contained JavaScript/HTML players.
plt.rcParams["animation.html"] = "jshtml"
# Raise the cap on embedded animation size (matplotlib documents this value
# as megabytes) so longer animations are not cut off.
plt.rcParams["animation.embed_limit"] = 500

In [None]:
# Configure root logging first (timestamp, level, logger name, message),
# then grab the logger that the rest of this notebook shares.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(name)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

## Matrices

In [None]:
def _symmetric_payoffs(reward, sucker, temptation, punishment):
    """Build a symmetric 2x2 payoff table.

    Keys are ``(own_action, other_action)`` and values are
    ``(own_payoff, other_payoff)``.

    Args:
        reward: payoff each player gets for mutual cooperation.
        sucker: payoff of a cooperator facing a defector.
        temptation: payoff of a defector facing a cooperator.
        punishment: payoff each player gets for mutual defection.
    """
    return {
        ("C", "C"): (reward, reward),
        ("C", "D"): (sucker, temptation),
        ("D", "C"): (temptation, sucker),
        ("D", "D"): (punishment, punishment),
    }


# Named payoff tables selectable by the experiments below.
# NOTE(review): the original author flagged that some of these values may not
# be right. For example "Default" gives mutual defection the same payoff (0)
# as being exploited - confirm the intended values before relying on them.
payoff_matrices = {
    "Default": _symmetric_payoffs(
        reward=3, sucker=0, temptation=5, punishment=0
    ),
    "Canonical": _symmetric_payoffs(
        reward=-1, sucker=-3, temptation=0, punishment=-2
    ),
    "Friend or Foe": _symmetric_payoffs(
        reward=1, sucker=0, temptation=2, punishment=0
    ),
    "Snowdrift": _symmetric_payoffs(
        reward=500, sucker=200, temptation=800, punishment=0
    ),
    "Prisoners": _symmetric_payoffs(
        reward=500, sucker=-200, temptation=1200, punishment=0
    ),
}

## Classes

### Strategy

In [None]:
class ActionStrategy:
    """Baseline strategy that holds one action ("C" or "D") and replays it.

    The starting action comes from a single ``rng.random()`` draw: a value
    below 0.5 yields "C", anything else yields "D".
    """

    def __init__(self, rng):
        self.rng = rng
        draw = self.rng.random()
        if draw < 0.5:
            self.action = "C"
        else:
            self.action = "D"

    def decide(self, agent_history):
        """Return the stored action; ``agent_history`` is not consulted."""
        return self.action

    def set_action(self, action):
        """Replace the stored action with ``action``."""
        self.action = action

In [None]:
class ImitationStrategy(ActionStrategy):
    """Imitate the action with the highest mean payoff in interactions.

    Both sides of every remembered interaction count as evidence: the
    agent's own (action, reward) pair and the neighbour's.
    """

    def decide(self, agent_history):
        """Pick the best-paying observed action and return it.

        Args:
            agent_history: mapping of neighbour id to a list of interaction
                records exposing ``own_action``, ``own_reward``,
                ``neighbor_action`` and ``neighbor_reward``.

        Returns:
            The (possibly updated) current action, "C" or "D".
        """
        totals = {"C": 0.0, "D": 0.0}
        counts = {"C": 0, "D": 0}
        for interactions in agent_history.values():
            for inter in interactions:
                observed = (
                    (inter.own_action, inter.own_reward),
                    (inter.neighbor_action, inter.neighbor_reward),
                )
                for action, reward in observed:
                    counts[action] += 1
                    totals[action] += reward

        # Only actions that were actually observed take part in the
        # comparison. Scoring an unseen action as 0 would let it "win"
        # whenever the observed payoffs are negative, i.e. the agent would
        # imitate something nobody played.
        means = {
            action: totals[action] / counts[action]
            for action in ("C", "D")
            if counts[action]
        }
        if not means:
            return self.action

        best = max(means.values())
        winners = [action for action, mean in means.items() if mean == best]
        # In case of a tie, continue with the existing action.
        if len(winners) == 1:
            self.action = winners[0]

        return self.action

In [None]:
class FermiStrategy(ActionStrategy):
    """
    Pairwise Fermi update rule driven by the agent's own interaction history.

    One previously met neighbour is drawn at random; the mean rewards both
    sides earned in the remembered interactions are compared, and the agent
    copies that neighbour's latest action with a logistic probability
    controlled by ``temperature``.
    """

    def __init__(self, rng, temperature=0.1):
        super().__init__(rng)
        self.K = temperature

    def decide(self, agent_history):
        """Possibly adopt a random neighbour's last action; return the action."""
        # Nothing recorded yet: stay with the current action.
        if not agent_history:
            return self.action

        # Draw one neighbour among those present in the history.
        partner = self.rng.choice(list(agent_history.keys()))
        record = agent_history.get(partner, [])
        if not record:
            return self.action

        # Mean payoff of each side over the remembered interactions.
        payoff_self = float(np.mean([entry.own_reward for entry in record]))
        payoff_neigh = float(
            np.mean([entry.neighbor_reward for entry in record])
        )
        gap = payoff_neigh - payoff_self

        if self.K == 0:
            # Zero temperature: switch only on a strictly better neighbour.
            p_switch = 1.0 if gap > 0 else 0.0
        else:
            # Clamp the exponent so math.exp cannot overflow.
            exponent = max(min(-gap / self.K, 700), -700)
            p_switch = 1.0 / (1.0 + math.exp(exponent))

        # On success, copy the neighbour's most recently observed action.
        if self.rng.random() < p_switch:
            self.action = record[-1].neighbor_action

        return self.action

In [None]:
class ReinforcementLearningStrategy(ActionStrategy):
    """
    Stateless Q-learning over the two actions with epsilon-greedy selection.

    A single Q-value is kept per action and moved towards the last observed
    reward at rate ``learning_rate``.
    """

    def __init__(self, rng, learning_rate=0.1, epsilon=0.1, initial_q=0.0):
        super().__init__(rng)
        self.alpha = learning_rate
        self.epsilon = epsilon
        start = float(initial_q)
        self.q = {"C": start, "D": start}
        self._last_action = None
        self._last_reward = 0.0

    def decide(self, agent_history):
        """Learn from the last reward, then choose and return the next action."""
        # Take the newest record of the first neighbour that has any history;
        # its own_reward serves as the reward signal.
        latest = next(
            (log[-1] for log in agent_history.values() if log),
            None,
        )
        if latest is not None:
            self._last_reward = latest.own_reward

        # Move the Q-value of the previously played action towards the reward.
        previous = self._last_action
        if previous is not None:
            self.q[previous] = self.q[previous] + self.alpha * (
                self._last_reward - self.q[previous]
            )

        # Epsilon-greedy: explore with probability epsilon, otherwise play the
        # higher-valued action; equal Q-values are settled by a coin flip.
        explore = self.rng.random() < self.epsilon
        q_c = self.q["C"]
        q_d = self.q["D"]
        tied = not (q_c > q_d or q_d > q_c)
        if explore or tied:
            choice = "C" if self.rng.random() < 0.5 else "D"
        else:
            choice = "C" if q_c > q_d else "D"

        self.action = choice
        self._last_action = choice
        return self.action

### Agent

In [None]:
class Agent:
    """Minimal agent holding a strategy, payoff, and history.

    ``history`` maps a neighbour id to the list of the most recent
    interactions with that neighbour, oldest first, capped at
    ``history_window`` entries.
    """

    @dataclass
    class Interaction:
        # One played interaction, seen from this agent's side.
        own_action: str
        own_reward: float
        neighbor_action: str
        neighbor_reward: float

    def __init__(self, agent_id, strategy, history_window=5, store_history=True):
        """
        Args:
            agent_id: identifier of the agent.
            strategy: object exposing ``decide(history)``.
            history_window: maximum number of interactions remembered per
                neighbour; 0 keeps none.
            store_history: when False, interactions only affect ``payoff``.
        """
        self.id = agent_id
        self.strategy = strategy
        self.history = {}
        self.payoff = 0.0
        self.history_window = history_window
        self.store_history = store_history

    def choose_action(self):
        """Ask the strategy for an action given the current history."""
        return self.strategy.decide(self.history)

    def record_interaction(
        self, neighbor_id, own_action, neighbor_action, own_reward, neighbor_reward
    ):
        """Add ``own_reward`` to the payoff and remember the interaction."""
        self.payoff += own_reward
        if not self.store_history:
            return
        lst = self.history.setdefault(neighbor_id, [])
        lst.append(
            self.Interaction(own_action, own_reward, neighbor_action, neighbor_reward)
        )
        # Trim by an explicit count: a negative-index slice such as
        # ``lst[:-window]`` becomes ``lst[:0]`` for a window of 0 and would
        # silently keep everything.
        overflow = len(lst) - self.history_window
        if overflow > 0:
            del lst[:overflow]

### Network

In [None]:
class Network:
    """NetworkX graph wrapper.

    ``generate_graph`` stores the result on ``self.graph`` together with
    ``self.kind`` and ``self.seed``.
    """

    def generate_graph(self, kind, n, seed=None, **kwargs):
        """Generate a networkx graph by name.

        Args:
            kind: "grid", "stochastic_block", "erdos_renyi",
                "watts_strogatz" or "barabasi_albert".
            n: requested node count. For "grid" the side is the integer
                square root of ``n``, so a non-square ``n`` yields fewer
                nodes. ``n`` is not used for "stochastic_block", which is
                sized by ``sizes``.
            seed: forwarded to the random generators; not used for "grid".
            **kwargs: generator-specific parameters; "stochastic_block"
                requires ``sizes`` and ``p``.

        Raises:
            KeyError: if ``kind`` is not one of the supported names, or if
                ``sizes``/``p`` are missing for "stochastic_block".
        """
        if kind == "grid":
            # Integer square root avoids float rounding for large n.
            side_length = math.isqrt(int(n))
            graph = nx.convert_node_labels_to_integers(
                nx.grid_2d_graph(side_length, side_length)
            )

        elif kind == "stochastic_block":
            sizes = kwargs.pop("sizes")
            p = kwargs.pop("p")
            graph = nx.stochastic_block_model(sizes, p, seed=seed, **kwargs)

        else:
            generators = {
                "erdos_renyi": nx.erdos_renyi_graph,
                "watts_strogatz": nx.watts_strogatz_graph,
                "barabasi_albert": nx.barabasi_albert_graph,
            }
            graph = generators[kind](n, seed=seed, **kwargs)

        self.kind = kind
        self.graph = graph
        self.seed = seed

    def neighbour(self, agent_id):
        """Return neighbour agent IDs.

        Raises:
            NotImplementedError: if no graph has been generated yet.
        """
        # Test for presence explicitly: a graph's truth value reflects its
        # node count, and the attribute does not exist before
        # generate_graph() has run.
        graph = getattr(self, "graph", None)
        if graph is None:
            raise NotImplementedError
        return list(graph.neighbors(agent_id))

In [None]:
class NetworkSimulation(Network):
    """
    Base class for running evolutionary games on any NetworkX graph.

    One agent is created per node. In each round every agent picks an
    action, every edge is played once against ``payoff_matrix``, and both
    endpoints record the outcome.
    """

    def __init__(
        self,
        kind="grid",
        n=400,
        seed=42,
        rounds=100,
        strategy=ActionStrategy,
        strategy_kwargs=None,
        payoff_matrix=payoff_matrices["Default"],
        rng=None,
        history_window=5,
        store_history=True,
        store_snapshots=True,
        **graph_kwargs,
    ):
        """
        Build the graph and its agents.

        Args:
            kind: graph generator name passed to ``generate_graph``.
            n: requested number of nodes.
            seed: seed for the graph generator and, unless ``rng`` is given,
                for the simulation's random generator.
            rounds: number of rounds executed by ``run``.
            strategy: strategy class, instantiated once per agent as
                ``strategy(rng, **strategy_kwargs)``.
            strategy_kwargs: extra keyword arguments for ``strategy``.
            payoff_matrix: mapping ``(action_a, action_b)`` to
                ``(payoff_a, payoff_b)``.
            rng: optional random generator shared by all strategies.
            history_window: per-neighbour memory length of each agent.
            store_history: whether agents keep interaction records.
            store_snapshots: whether ``step`` appends the state to
                ``snapshots``.
            **graph_kwargs: forwarded to the graph generator.
        """
        self.strategy = strategy
        self.strategy_kwargs = strategy_kwargs or {}
        self.rounds = rounds
        self.payoff_matrix = payoff_matrix
        self.rng = rng if rng is not None else np.random.default_rng(seed)
        self.history_window = history_window
        self.store_history = store_history
        self.store_snapshots = store_snapshots
        self.generate_graph(kind=kind, n=n, seed=seed, **graph_kwargs)
        # Graph diagnostics go through the notebook's logger instead of bare
        # print() calls, so batch runs can silence them via the log level.
        logger.info("degree histogram: %s", nx.degree_histogram(self.graph))
        logger.info("average clustering: %s", nx.average_clustering(self.graph))
        self.edge_list = list(self.graph.edges())
        self.agents = {}
        self.snapshots = []
        self._initialize_agents()

    def _initialize_agents(self):
        """Create one agent, with its own strategy instance, per node."""
        for agent_id in self.graph.nodes:
            strat = self.strategy(self.rng, **self.strategy_kwargs)
            self.agents[agent_id] = Agent(
                agent_id,
                strat,
                history_window=self.history_window,
                store_history=self.store_history,
            )

    def _reset_payoffs(self):
        """Zero every agent's accumulated payoff."""
        for agent in self.agents.values():
            agent.payoff = 0.0

    def _state_array(self):
        """Return current actions as a uint8 vector in graph-node order.

        Entries are 1 where the agent's action is "D" and 0 otherwise.
        """
        return np.fromiter(
            (
                1 if self.agents[i].strategy.action == "D" else 0
                for i in self.graph.nodes()
            ),
            dtype=np.uint8,
            count=self.graph.number_of_nodes(),
        )

    # fast inner loop: precompute lookups and actions once
    def _play_round(self):
        """Play one synchronous round over every edge."""
        agents = self.agents
        payoff_matrix = self.payoff_matrix

        # Payoffs are per round, so start from zero.
        self._reset_payoffs()

        # All agents decide before any edge is played.
        actions = {node: agents[node].choose_action() for node in agents}
        for node_a, node_b in self.edge_list:
            action_a = actions[node_a]
            action_b = actions[node_b]
            # Action pairs missing from the matrix pay (0, 0).
            payoff_a, payoff_b = payoff_matrix.get((action_a, action_b), (0, 0))
            agents[node_a].record_interaction(
                node_b, action_a, action_b, payoff_a, payoff_b
            )
            agents[node_b].record_interaction(
                node_a, action_b, action_a, payoff_b, payoff_a
            )

    def _get_state(self):
        """Return ``{node_id: 1 if defecting else 0}`` for every agent."""
        return {
            node_id: (1 if agent.strategy.action == "D" else 0)
            for node_id, agent in self.agents.items()
        }

    def encode_state(self):  # for speed
        """Pack the current action vector into bytes (one bit per node)."""
        return np.packbits(self._state_array(), bitorder="little").tobytes()

    def decode_state(self, packed):
        """Inverse of ``encode_state``: return the uint8 0/1 action vector."""
        n = self.graph.number_of_nodes()
        bits = np.unpackbits(np.frombuffer(packed, dtype=np.uint8), bitorder="little")
        # Drop the padding bits packbits added to fill the last byte.
        return bits[:n].astype(np.uint8)

    def run_until_attractor(
        self,
        max_steps=2000,
        check_every=1,
        store_cycle_states=True,
    ):
        """
        Detect fixed points or cycles by hashing compact states

        The model is stepped until an action configuration repeats or
        ``max_steps`` steps have been taken. Only the agents' current
        actions form the compared key (see ``encode_state``).

        Args:
            max_steps: maximum number of steps to take.
            check_every: look for a repeat only every this many steps; the
                reported period is then a multiple of this value.
            store_cycle_states: keep the packed state of every step so the
                states between the two matching times can be returned.

        Returns:
            dict with ``t_end``, ``t_cycle_start``, ``period``, ``attractor``
            ("fixed", "cycle" or "unknown") and ``cycle_states`` (list of
            packed states, or None).
        """
        seen = {}
        cache = []
        for t in range(max_steps + 1):
            key = None
            if t % check_every == 0:
                key = self.encode_state()
                if key in seen:
                    t0 = seen[key]
                    period = t - t0
                    return {
                        "t_end": t,
                        "t_cycle_start": t0,
                        "period": period,
                        "attractor": "fixed" if period == 1 else "cycle",
                        "cycle_states": cache[t0:t] if store_cycle_states else None,
                    }
                seen[key] = t

            if store_cycle_states:
                cache.append(key if key is not None else self.encode_state())

            # The state after max_steps steps has just been examined; taking
            # another step would leave the model one step past the reported
            # t_end.
            if t < max_steps:
                self.step()

        return {
            "t_end": max_steps,
            "t_cycle_start": None,
            "period": None,
            "attractor": "unknown",
            "cycle_states": None,
        }

    def cooperation_metrics(self, state01=None):
        """
        Cluster metrics for cooperators using NetworkX components

        Args:
            state01: optional 0/1 sequence in graph-node order (0 means
                cooperator); defaults to the current state.

        Returns:
            dict with ``coop_frac``, ``n_coop_clusters``,
            ``largest_coop_cluster`` and ``mean_coop_cluster_size``.
        """
        if state01 is None:
            state01 = self._state_array()
        else:
            # Accept plain sequences as well as arrays.
            state01 = np.asarray(state01)
        coop_nodes = [
            node for node, val in zip(self.graph.nodes(), state01) if val == 0
        ]
        if not coop_nodes:
            return {
                "coop_frac": 0.0,
                "n_coop_clusters": 0,
                "largest_coop_cluster": 0,
                "mean_coop_cluster_size": 0.0,
            }
        # Clusters are the connected components of the cooperator subgraph.
        H = self.graph.subgraph(coop_nodes)
        sizes = sorted((len(c) for c in nx.connected_components(H)), reverse=True)
        return {
            "coop_frac": float((state01 == 0).mean()),
            "n_coop_clusters": int(len(sizes)),
            "largest_coop_cluster": int(sizes[0]) if sizes else 0,
            "mean_coop_cluster_size": float(np.mean(sizes)) if sizes else 0.0,
        }

    def step(self):
        """Play one round and, if enabled, store a snapshot of the state."""
        self._play_round()
        if self.store_snapshots:
            self.snapshots.append(self._get_state())

    def run(self):
        """Advance the simulation by ``self.rounds`` steps."""
        for _ in range(self.rounds):
            self.step()

## Helpers

In [None]:
def oscillation_summary(decoded_cycle_states):
    """
    Summarise which nodes change over a sequence of 0/1 state vectors.

    Returns the fraction of nodes whose value varies across the sequence,
    and the fractions that stay at 0 ("frozen_C") or at 1 ("frozen_D")
    throughout.
    """
    stacked = np.stack(decoded_cycle_states, axis=0)
    first = stacked[0]
    # A node oscillates when any later state differs from the first one.
    changing = (stacked != first).any(axis=0)
    steady = ~changing
    summary = {}
    summary["frac_oscillating"] = float(np.mean(changing))
    summary["frac_frozen_C"] = float(np.mean(steady & (first == 0)))
    summary["frac_frozen_D"] = float(np.mean(steady & (first == 1)))
    return summary


def run_many(
    kind,
    n,
    payoff_matrix,
    strategy_class,
    strategy_kwargs,
    seeds,
    max_steps=1500,
    graph_kwargs=None,
    history_window=5,
    store_history=True,
):
    """
    Run one simulation per seed and tabulate attractor and cooperation metrics.

    Each simulation runs until a state repeats or ``max_steps`` is reached.
    When cycle states are available the cooperation metrics are averaged
    over the cycle; otherwise they describe the final state and the
    oscillation columns are NaN.

    Returns:
        pandas.DataFrame with one row per seed.
    """
    graph_kwargs = graph_kwargs or {}
    strategy_kwargs = strategy_kwargs or {}

    records = []
    for seed in seeds:
        sim = NetworkSimulation(
            kind=kind,
            n=n,
            seed=seed,
            rounds=max_steps,
            payoff_matrix=payoff_matrix,
            strategy=strategy_class,
            strategy_kwargs=strategy_kwargs,
            store_snapshots=False,
            history_window=history_window,
            store_history=store_history,
            **graph_kwargs,
        )
        outcome = sim.run_until_attractor(
            max_steps=max_steps, store_cycle_states=True
        )
        record = {
            "seed": seed,
            "attractor": outcome["attractor"],
            "period": outcome["period"],
            "t_end": outcome["t_end"],
        }

        packed_cycle = outcome["cycle_states"]
        if packed_cycle:
            # Average the metrics over every state in the detected cycle.
            states = [sim.decode_state(blob) for blob in packed_cycle]
            record.update(oscillation_summary(states))

            per_state = [sim.cooperation_metrics(state01=s) for s in states]
            coop_fracs = [m["coop_frac"] for m in per_state]
            largest = [m["largest_coop_cluster"] for m in per_state]
            record["cycle_mean_coop_frac"] = float(np.mean(coop_fracs))
            record["cycle_mean_largest_cluster"] = float(np.mean(largest))
        else:
            # No cycle available: describe the current state instead.
            final = sim.cooperation_metrics()
            record["cycle_mean_coop_frac"] = final["coop_frac"]
            record["cycle_mean_largest_cluster"] = final["largest_coop_cluster"]
            for column in ("frac_oscillating", "frac_frozen_C", "frac_frozen_D"):
                record[column] = np.nan

        records.append(record)

    return pd.DataFrame(records)


def cluster_size_distribution(graph, state01):
    """
    Calculate the cooperator cluster size distribution.

    Nodes whose entry in ``state01`` equals 0 are cooperators; clusters are
    the connected components of the subgraph they induce.

    Returns:
        dict mapping cluster size to the number of clusters of that size
        (empty when there are no cooperators).
    """
    cooperators = []
    for node, flag in zip(graph.nodes(), state01):
        if flag == 0:
            cooperators.append(node)
    if not cooperators:
        return {}

    histogram = {}
    for component in nx.connected_components(graph.subgraph(cooperators)):
        size = len(component)
        histogram.setdefault(size, 0)
        histogram[size] += 1
    return histogram


def track_cooperation_over_time(
    model,
    steps=200,
    sample_every=1,
    max_cluster_size=50,
):
    """
    Sample cooperation and cluster sizes over time without storing full snapshots

    The state is sampled at t = 0, sample_every, 2 * sample_every, ... up to
    ``steps``, and the model is advanced exactly ``steps`` times.

    Args:
        model: simulation exposing ``agents``, ``graph``,
            ``cooperation_metrics(state01=...)`` and ``step()``.
        steps: number of steps to advance the model.
        sample_every: sampling interval in steps.
        max_cluster_size: cluster sizes above this value are pooled into the
            single bucket ``max_cluster_size + 1``; None disables pooling.

    Returns:
        ``(metrics_df, dist_df)``: one metrics row per sample, and one row
        per (t, size) with the number of clusters of that size.
    """
    metric_rows = []
    dist_rows = []

    for t in range(steps + 1):
        if t % sample_every == 0:
            state01 = np.fromiter(
                (
                    1 if model.agents[i].strategy.action == "D" else 0
                    for i in model.graph.nodes()
                ),
                dtype=np.uint8,
                count=model.graph.number_of_nodes(),
            )
            mets = model.cooperation_metrics(state01=state01)
            mets["t"] = t
            metric_rows.append(mets)

            dist = cluster_size_distribution(model.graph, state01)
            for size, count in dist.items():
                size_key = size
                if max_cluster_size is not None and size > max_cluster_size:
                    size_key = max_cluster_size + 1
                dist_rows.append({"t": t, "size": size_key, "count": count})

        # Do not step after the last sample: that would advance the model
        # steps + 1 times and leave it past the final recorded t.
        if t < steps:
            model.step()

    metrics_df = pd.DataFrame(metric_rows)
    # Name the columns explicitly so the frame has them even when no cluster
    # was ever observed.
    dist_df = pd.DataFrame(dist_rows, columns=["t", "size", "count"])
    return metrics_df, dist_df

## Visualization

### Animation

In [None]:
def experiment(
    model_class,
    strategy_class,
    strategy_kwargs=None,
    steps=100,
    seed=42,
    interval=300,
    payoff_matrix=payoff_matrices["Default"],
    title="",
    kind="grid",
    n=400,
    is_grid=False,
    **graph_kwargs,
):
    """
    Produce animations showing the network state over time.

    The simulation is run for ``steps`` steps up front and every state is
    kept, so drawing a frame never advances the model. The animation
    callback may therefore be invoked any number of times per frame (initial
    draw, re-rendering) and always shows the same picture.

    Args:
        model_class: simulation class, constructed with the keyword
            arguments used below.
        strategy_class: strategy class handed to the model.
        strategy_kwargs: extra keyword arguments for the strategy.
        steps: number of simulation steps; the animation has ``steps + 1``
            frames, frame 0 being the initial state.
        seed: seed for the model and for the spring layout.
        interval: delay between frames in milliseconds.
        payoff_matrix: payoff table handed to the model.
        title: prefix of the per-frame title.
        kind: graph generator name.
        n: requested number of nodes.
        is_grid: draw the state as a square image instead of a node-link
            diagram.
        **graph_kwargs: forwarded to the model's graph generator.

    Returns:
        The ``FuncAnimation`` object.

    Raises:
        ValueError: in grid mode when the node count is not a perfect square.
    """
    strategy_kwargs = strategy_kwargs or {}
    model = model_class(
        kind=kind,
        n=n,
        seed=seed,
        rounds=steps,
        payoff_matrix=payoff_matrix,
        strategy=strategy_class,
        strategy_kwargs=strategy_kwargs,
        **graph_kwargs,
    )

    graph = model.graph
    n_nodes = graph.number_of_nodes()

    # Validate before any figure exists so a failure does not leak one.
    if is_grid:
        dim = int(math.isqrt(n_nodes))
        if dim * dim != n_nodes:
            raise ValueError(f"Grid mode needs square number of nodes, got {n_nodes}.")

    # -------------------------
    # Run the simulation once, keeping every state
    # -------------------------
    states = [model._get_state()]
    for _ in range(steps):
        model.step()
        states.append(model._get_state())

    xs = list(range(steps + 1))
    ys_c, ys_d = [], []
    for state in states:
        d = sum(state.values())
        c = n_nodes - d
        ys_c.append(100 * c / n_nodes)
        ys_d.append(100 * d / n_nodes)

    C_COOP, C_DEFECT = "#40B0A6", "#FFBE6A"
    cmap = ListedColormap([C_COOP, C_DEFECT])
    fig, (ax_sim, ax_stats) = plt.subplots(
        2, 1, figsize=(7, 8), gridspec_kw={"height_ratios": [4, 1]}
    )

    # -------------------------
    # Stats plot (C% and D%)
    # -------------------------
    (line_c,) = ax_stats.plot([], [], lw=2, label="C")
    (line_d,) = ax_stats.plot([], [], lw=2, label="D")

    ax_stats.set_xlim(0, steps)
    ax_stats.set_ylim(0, 100)
    ax_stats.yaxis.set_major_formatter(PercentFormatter(xmax=100))
    ax_stats.set_ylabel("Population")
    ax_stats.grid(True, linestyle=":", alpha=0.4)
    ax_stats.legend(frameon=False, ncol=2, loc="upper right")

    # -------------------------
    # Simulation plot
    # -------------------------
    if is_grid:

        def state_as_grid(state):
            # Node i is drawn at row i // dim, column i % dim.
            grid = [[0] * dim for _ in range(dim)]
            for node, val in state.items():
                grid[node // dim][node % dim] = val
            return grid

        sim_artist = ax_sim.imshow(state_as_grid(states[0]), cmap=cmap, vmin=0, vmax=1)
        ax_sim.set_xticks([])
        ax_sim.set_yticks([])

        def update_sim(state):
            sim_artist.set_data(state_as_grid(state))

    else:
        pos = nx.spring_layout(graph, seed=seed)
        nodelist = list(graph.nodes())
        nx.draw_networkx_edges(graph, pos, ax=ax_sim, alpha=0.3, edge_color="gray")
        sim_artist = nx.draw_networkx_nodes(
            graph,
            pos,
            nodelist=nodelist,
            node_color=[states[0][i] for i in nodelist],
            cmap=cmap,
            vmin=0,
            vmax=1,
            node_size=80,
            edgecolors="gray",
            ax=ax_sim,
        )
        ax_sim.axis("off")

        def update_sim(state):
            sim_artist.set_array(np.asarray([state[i] for i in nodelist]))

    # -------------------------
    # Animation update
    # -------------------------
    def update(frame):
        # Pure function of the frame index: nothing is stepped or appended.
        ax_sim.set_title(f"{title} (Step {frame}/{steps})")

        update_sim(states[frame])

        upto = frame + 1
        line_c.set_data(xs[:upto], ys_c[:upto])
        line_d.set_data(xs[:upto], ys_d[:upto])

        return sim_artist, line_c, line_d

    anim = FuncAnimation(
        fig,
        update,
        frames=steps + 1,
        interval=interval,
        blit=True,
        repeat=False,
    )
    return anim

## Experiments

In [None]:
# Experiment grid: one animation is produced for every combination of payoff
# matrix, graph setup and strategy listed below.
matrix_names = ["Snowdrift"]
size = 10
n = size * size  # requested node count (a perfect square, as grid mode needs)

# Strategies to animate: (label, strategy class, strategy keyword arguments).
runs = [
    # ("Imitation", ImitationStrategy, {}),
    (
        "Reinforcement Learning",
        ReinforcementLearningStrategy,
        {
            "learning_rate": 0.1,
            "epsilon": 0.1,
            "initial_q": 0.0,
        },
    ),
]

# Graphs to animate: (label, generator kind, generator kwargs, is_grid flag).
graph_setups = [
    # ("Grid", "grid", {}, True),
    ("Small-World", "watts_strogatz", {"k": 4, "p": 0.1}, False),
    ("Erdos-Renyi", "erdos_renyi", {"p": 0.1}, False),
]

for matrix_name in matrix_names:
    matrix = payoff_matrices[matrix_name]

    for graph_label, kind, graph_kwargs, is_grid in graph_setups:
        for strat_label, strat_cls, strat_kwargs in runs:
            # Every run uses the same seed, so only the graph kind, strategy
            # and matrix differ between animations.
            ani = experiment(
                model_class=NetworkSimulation,
                strategy_class=strat_cls,
                strategy_kwargs=strat_kwargs,
                steps=60,
                seed=42,
                interval=50,
                payoff_matrix=matrix,
                kind=kind,
                n=n,
                is_grid=is_grid,
                title=f"{strat_label} on {matrix_name} ({graph_label})",
                **graph_kwargs,
            )
            display(ani)

## WIP

In [None]:
# Batch run: imitation dynamics on small-world graphs, one simulation per
# seed (30 seeds), each capped at 1500 steps.
df = run_many(
    kind="watts_strogatz",
    n=400,
    payoff_matrix=payoff_matrices["Snowdrift"],
    strategy_class=ImitationStrategy,
    strategy_kwargs={},
    seeds=range(30),
    max_steps=1500,
    graph_kwargs={"k": 4, "p": 0.1},
)
print(df.head())
# Mean / std / median of the cycle metrics, grouped by attractor type.
print(
    df.groupby("attractor")[
        [
            "period",
            "cycle_mean_coop_frac",
            "cycle_mean_largest_cluster",
            "frac_oscillating",
        ]
    ].agg(["mean", "std", "median"])
)

In [None]:
# Cooperation over time + cluster size distribution
# Single small-world run with imitation dynamics; snapshots are disabled
# because the state is sampled directly by track_cooperation_over_time.
model = NetworkSimulation(
    kind="watts_strogatz",
    n=400,
    seed=1,
    rounds=300,
    payoff_matrix=payoff_matrices["Snowdrift"],
    strategy=ImitationStrategy,
    strategy_kwargs={},
    store_snapshots=False,
    history_window=5,
    store_history=True,
    k=4,
    p=0.1,
)

# Sample every second step; cluster sizes above 40 are pooled into bucket 41.
time_metrics, size_dist = track_cooperation_over_time(
    model, steps=300, sample_every=2, max_cluster_size=40
)

fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(7, 7), sharex=True)
# NOTE(review): coop_frac is a fraction in [0, 1] while largest_coop_cluster
# is a node count; sharing one y-axis flattens the fraction curve. Consider a
# secondary axis or normalising the cluster size.
ax1.plot(time_metrics["t"], time_metrics["coop_frac"], label="Coop fraction")
ax1.plot(
    time_metrics["t"],
    time_metrics["largest_coop_cluster"],
    label="Largest coop cluster",
)
ax1.set_ylabel("Value")
ax1.grid(True, linestyle=":", alpha=0.4)
ax1.legend(frameon=False)

if not size_dist.empty:
    # Rows: sample time; columns: cluster size; values: number of clusters.
    pivot = size_dist.pivot_table(
        index="t", columns="size", values="count", aggfunc="sum", fill_value=0
    )
    # NOTE(review): labels are passed but no legend is drawn on ax2.
    ax2.stackplot(pivot.index, pivot.values.T, labels=pivot.columns)
    ax2.set_ylabel("Cluster count")
    ax2.set_xlabel("t")
    ax2.grid(True, linestyle=":", alpha=0.4)

plt.tight_layout()
plt.show()

# Batch summary plots over seeds
# NOTE(review): the arguments are identical to the run_many call that builds
# `df` in the previous cell; reusing that frame would avoid rerunning 30
# simulations - confirm nothing is meant to differ.
batch_df = run_many(
    kind="watts_strogatz",
    n=400,
    payoff_matrix=payoff_matrices["Snowdrift"],
    strategy_class=ImitationStrategy,
    strategy_kwargs={},
    seeds=range(30),
    max_steps=1500,
    graph_kwargs={"k": 4, "p": 0.1},
)

# Histograms over seeds; dropna() skips runs without the respective value.
fig, axes = plt.subplots(1, 3, figsize=(12, 3))
axes[0].hist(batch_df["period"].dropna(), bins=20)
axes[0].set_title("Period")
axes[1].hist(batch_df["cycle_mean_coop_frac"].dropna(), bins=20)
axes[1].set_title("Cycle mean coop")
axes[2].hist(batch_df["cycle_mean_largest_cluster"].dropna(), bins=20)
axes[2].set_title("Cycle mean largest cluster")
for ax in axes:
    ax.grid(True, linestyle=":", alpha=0.4)
plt.tight_layout()
plt.show()