# Assignment 3

## 1. Setup

In [10]:
# Imports 
    # Copied from exmaple notebook 
    
from mesa import Agent, Model
from mesa.time import SimultaneousActivation
from mesa.space import NetworkGrid
from mesa.datacollection import DataCollector
import networkx as nx
import numpy as np
import pandas as pd
from __future__ import annotations
from typing import Tuple, List, Iterable, Dict, Optional
import matplotlib.pyplot as plt
import random
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed 
import math
import os
from dataclasses import dataclass
import matplotlib.pyplot as plt
from collections import defaultdict

# Basic Mesa-like minimal scheduler and network utilities (we'll avoid heavy Mesa imports)
import networkx as nx



# Baseline Analysis
Run a baseline system analysis for the EV Stag Hunt model:
- Heatmap of final adoption X* vs (X0, I0)
- Phase plots (I(t), X(t)) for representative initial conditions
- Sensitivity curves: final adoption vs beta_I for several X0

Produces PNG plots and CSV data in ./output_ev_analysis/

### Model Implementation

In [11]:
class EVAgent:
    def __init__(self, node_id: int, model: "EVStagHuntModel", init_strategy: str = "D"):
        self.node_id = node_id
        self.model = model
        self.strategy = init_strategy  # "C" or "D"
        self.payoff = 0.0
        self.next_strategy = init_strategy

    def step(self) -> None:
        """Compute payoff from interactions with neighbours."""
        I = self.model.infrastructure
        a0 = self.model.a0
        beta_I = self.model.beta_I
        b = self.model.b
        a_I = a0 + beta_I * I

        neighbors = list(self.model.G.neighbors(self.node_id))
        if not neighbors:
            self.payoff = 0.0
            return

        payoff = 0.0
        for nbr in neighbors:
            other = self.model.node_agent_map[nbr]
            s_i = self.strategy
            s_j = other.strategy
            if s_i == "C" and s_j == "C":
                payoff += a_I
            elif s_i == "C" and s_j == "D":
                payoff += 0.0
            elif s_i == "D" and s_j == "C":
                payoff += b
            else:  # D vs D
                payoff += b
        self.payoff = payoff

    def advance(self) -> None:
        """Compute next_strategy according to model's strategy_choice_func but DO NOT commit."""
        func = self.model.strategy_choice_func
        neighbors = [self.model.node_agent_map[n] for n in self.model.G.neighbors(self.node_id)]

        if func == "imitate":
            candidates = neighbors + [self]
            best = max(candidates, key=lambda a: a.payoff)
            self.next_strategy = best.strategy
        elif func == "logit":
            a_I = self.model.a0 + self.model.beta_I * self.model.infrastructure
            pi_C = 0.0
            pi_D = 0.0
            for other in neighbors:
                if other.strategy == "C":
                    pi_C += a_I
                    pi_D += self.model.b
                else:
                    pi_C += 0.0
                    pi_D += self.model.b
            tau = getattr(self.model, "tau", 1.0)
            denom = math.exp(pi_C / tau) + math.exp(pi_D / tau)
            P_C = math.exp(pi_C / tau) / denom if denom > 0 else 0.5
            self.next_strategy = "C" if random.random() < P_C else "D"
        else:
            raise ValueError(f"Unknown strategy choice function: {func}")

    def commit(self) -> None:
        """Apply next_strategy synchronously."""
        self.strategy = self.next_strategy


class EVStagHuntModel:
    def __init__(
        self,
        initial_ev: int,
        a0: float,
        beta_I: float,
        b: float,
        g_I: float,
        I0: float,
        seed: int | None,
        network_type: str,
        n_nodes: int,
        p: float,
        m: int,
        strategy_choice_func: str = "imitate",
        tau: float = 1.0,
    ):
        self.random = random.Random(seed)
        self.seed = seed
        self.a0 = a0
        self.beta_I = beta_I
        self.b = b
        self.g_I = g_I
        self.infrastructure = float(I0)
        self.strategy_choice_func = strategy_choice_func
        self.tau = tau

        # Build network
        if network_type == "ER":
            G = nx.erdos_renyi_graph(n_nodes, p, seed=seed)
        elif network_type == "WS":
            G = nx.watts_strogatz_graph(n_nodes, max(2, m), p, seed=seed)
        elif network_type == "BA":
            G = nx.barabasi_albert_graph(n_nodes, max(1, m), seed=seed)

        # if graph disconnected, keep largest connected component (avoids isolated nodes effects)
        if not nx.is_connected(G) and len(G) > 0:
            comp = max(nx.connected_components(G), key=len)
            G = G.subgraph(comp).copy()
        self.G = G

        # create agents: one per node
        nodes = list(self.G.nodes())
        total_nodes = len(nodes)
        k_ev = max(0, min(initial_ev, total_nodes))
        ev_nodes = set(self.random.sample(nodes, k_ev))
        self.node_agent_map: Dict[int, EVAgent] = {}
        for node in nodes:
            init_strategy = "C" if node in ev_nodes else "D"
            agent = EVAgent(node, self, init_strategy)
            self.node_agent_map[node] = agent

    def get_adoption_fraction(self) -> float:
        agents = list(self.node_agent_map.values())
        if not agents:
            return 0.0
        return sum(1 for a in agents if a.strategy == "C") / len(agents)

    def step(self) -> None:
        # compute payoffs
        for agent in list(self.node_agent_map.values()):
            agent.step()
        # choose next strategies (based on payoffs)
        for agent in list(self.node_agent_map.values()):
            agent.advance()
        # commit synchronously
        for agent in list(self.node_agent_map.values()):
            agent.commit()
        # update infrastructure: I <- clip(I + g_I*(X - I), 0, 1)
        X = self.get_adoption_fraction()
        I = self.infrastructure
        dI = self.g_I * (X - I)
        self.infrastructure = float(min(1.0, max(0.0, I + dI)))

### Runner Utilities

In [12]:
def run_trial(
    X0_frac: float,
    I0: float,
    *,
    a0: float,
    beta_I: float,
    b: float,
    g_I: float,
    T: int,
    network_type: str,
    n_nodes: int,
    p: float,
    m: int,
    seed: Optional[int],
    strategy_choice_func: str,
    tau: float = 1.0,
    record_trajectory: bool = False,
) -> Tuple[float, Tuple[np.ndarray, np.ndarray]]:
    """Run a single trial. Returns final adoption and optional trajectories (X(t), I(t))."""
    initial_ev = int(round(X0_frac * n_nodes))
    model = EVStagHuntModel(
        initial_ev=initial_ev,
        a0=a0,
        beta_I=beta_I,
        b=b,
        g_I=g_I,
        I0=I0,
        seed=seed,
        network_type=network_type,
        n_nodes=n_nodes,
        p=p,
        m=m,
        strategy_choice_func=strategy_choice_func,
        tau=tau,
    )
    X_traj = []
    I_traj = []
    for t in range(T):
        if record_trajectory:
            X_traj.append(model.get_adoption_fraction())
            I_traj.append(model.infrastructure)
        model.step()
    if record_trajectory:
        X_traj.append(model.get_adoption_fraction())
        I_traj.append(model.infrastructure)
    final = model.get_adoption_fraction()
    return final, (np.array(X_traj), np.array(I_traj))

### Analysis Functions

In [13]:
def heatmap_final_adoption(
    X0_grid: Iterable[float],
    I0_grid: Iterable[float],
    *,
    a0: float,
    beta_I: float,
    b: float,
    g_I: float,
    n_nodes: int,
    p: float,
    m: int,
    T: int,
    trials_per_cell: int,
    network_type: str,
    strategy_choice_func: str,
    tau: float,
    seed_base: int,
    out_dir: str,
) -> Tuple[List[float], List[float], np.ndarray]:
    """Compute mean final adoption X* for every (X0, I0) cell and save CSV."""
    X0_vals = list(X0_grid)
    I0_vals = list(I0_grid)
    H = np.zeros((len(I0_vals), len(X0_vals)), dtype=float)  # rows: I0, cols: X0
    meta = {}
    for i, I0 in enumerate(I0_vals):
        for j, X0 in enumerate(X0_vals):
            finals = []
            for k in range(trials_per_cell):
                seed = seed_base + i * 10000 + j * 100 + k
                final, _ = run_trial(
                    X0_frac=X0,
                    I0=I0,
                    a0=a0,
                    beta_I=beta_I,
                    b=b,
                    g_I=g_I,
                    T=T,
                    network_type=network_type,
                    n_nodes=n_nodes,
                    p=p,
                    m=m,
                    seed=seed,
                    strategy_choice_func=strategy_choice_func,
                    tau=tau,
                    record_trajectory=False,
                )
                finals.append(final)
            H[i, j] = float(np.mean(finals))
            meta[(I0, X0)] = {"finals": finals, "mean": float(np.mean(finals)), "std": float(np.std(finals))}
    # Save CSV
    df = pd.DataFrame(H, index=[f"I0={v:.3f}" for v in I0_vals], columns=[f"X0={v:.3f}" for v in X0_vals])
    df.to_csv(os.path.join(out_dir, "heatmap_final_adoption.csv"))
    return X0_vals, I0_vals, H

def sensitivity_betaI(
    beta_vals: Iterable[float],
    X0_values: Iterable[float],
    *,
    I0: float,
    a0: float,
    b: float,
    g_I: float,
    n_nodes: int,
    p: float,
    m: int,
    T: int,
    trials: int,
    network_type: str,
    strategy_choice_func: str,
    tau: float,
    seed_base: int,
    out_dir: str,
) -> Tuple[List[float], List[float], np.ndarray]:
    """Compute mean final adoption vs beta_I for each initial X0 in X0_values."""
    beta_vals = list(beta_vals)
    X0_values = list(X0_values)
    results = np.zeros((len(X0_values), len(beta_vals)), dtype=float)
    for i, X0 in enumerate(X0_values):
        for j, beta in enumerate(beta_vals):
            finals = []
            for k in range(trials):
                seed = seed_base + i * 10000 + j * 100 + k
                final, _ = run_trial(
                    X0_frac=X0,
                    I0=I0,
                    a0=a0,
                    beta_I=beta,
                    b=b,
                    g_I=g_I,
                    T=T,
                    network_type=network_type,
                    n_nodes=n_nodes,
                    p=p,
                    m=m,
                    seed=seed,
                    strategy_choice_func=strategy_choice_func,
                    tau=tau,
                    record_trajectory=False,
                )
                finals.append(final)
            results[i, j] = float(np.mean(finals))
    # Save CSV
    df = pd.DataFrame(results, index=[f"X0={v:.3f}" for v in X0_values], columns=[f"beta_I={v:.3f}" for v in beta_vals])
    df.to_csv(os.path.join(out_dir, "sensitivity_betaI.csv"))
    return X0_values, beta_vals, results


def collect_trajectories(
    cases: List[Tuple[float, float]],
    *,
    a0: float,
    beta_I: float,
    b: float,
    g_I: float,
    n_nodes: int,
    p: float,
    m: int,
    T: int,
    network_type: str,
    strategy_choice_func: str,
    tau: float,
    seed_base: int,
    out_dir: str,
) -> Dict[Tuple[float, float], Dict]:
    """Run representative trajectories and save time-series / phase plots."""
    trajs = {}
    for idx, (X0, I0) in enumerate(cases):
        seed = seed_base + idx * 100
        final, (X_traj, I_traj) = run_trial(
            X0_frac=X0,
            I0=I0,
            a0=a0,
            beta_I=beta_I,
            b=b,
            g_I=g_I,
            T=T,
            network_type=network_type,
            n_nodes=n_nodes,
            p=p,
            m=m,
            seed=seed,
            strategy_choice_func=strategy_choice_func,
            tau=tau,
            record_trajectory=True,
        )
        trajs[(X0, I0)] = {"final": final, "X": X_traj, "I": I_traj}
        # Save plots
        t = np.arange(len(X_traj))
        plt.figure(figsize=(6, 4))
        plt.plot(t, X_traj, label="X(t)")
        plt.plot(t, I_traj, label="I(t)")
        plt.xlabel("time step")
        plt.ylabel("value")
        plt.title(f"Time series X/I  X0={X0:.2f}, I0={I0:.2f}")
        plt.legend()
        plt.tight_layout()
        plt.savefig(os.path.join(out_dir, f"timeseries_X0_{int(X0*100)}_I0_{int(I0*100)}.png"))
        plt.close()

        plt.figure(figsize=(5, 5))
        plt.plot(I_traj, X_traj, marker=".", linewidth=1)
        plt.xlabel("I(t)")
        plt.ylabel("X(t)")
        plt.title(f"Phase plot X vs I  X0={X0:.2f}, I0={I0:.2f}")
        plt.tight_layout()
        plt.savefig(os.path.join(out_dir, f"phase_X0_{int(X0*100)}_I0_{int(I0*100)}.png"))
        plt.close()
    return trajs

### Configure and Run Analyses

In [18]:
def main():
    out_dir = "output_ev_analysis"
    os.makedirs(out_dir, exist_ok=True)

    # ---- Config: adjust these for more/less compute ----
    # Model params (baseline)
    a0 = 2.0
    b = 1.0
    g_I = 0.05

    # Network params
    network_type = "ER"  # "WS" or "BA"
    n_nodes = 200
    p = 0.05
    m = 2

    # Strategy rule: "imitate" or "logit"
    strategy_choice_func = "imitate"
    tau = 1.0

    # Time horizon
    T = 300

    # Heatmap sweep resolution & trials
    X0_grid = np.linspace(0.0, 1.0, 11)  # 11 values (lower resolution)
    I0_grid = np.linspace(0.0, 1.0, 11)
    trials_per_cell = 3  # increase for better averaging; reduce if runtime is high

    # Sensitivity sweep
    beta_vals = np.linspace(0.0, 4.0, 10)
    X0_sens = [0.1, 0.2, 0.3, 0.4, 0.6]  # representative seeds
    sensitivity_trials = 3

    # Representative phase plot cases
    phase_cases = [(0.02, 0.02), (0.05, 0.05), (0.2, 0.05), (0.5, 0.1)]

    # Seeds
    seed_base_heatmap = 42
    seed_base_sensitivity = 42
    seed_base_phase = 42

    # Baseline beta to use for heatmap
    beta_I_baseline = 2.0

    # ---- Run heatmap: final adoption vs (X0, I0) ----
    print("Starting heatmap sweep (this may take some time)...")
    X0_vals, I0_vals, H = heatmap_final_adoption(
        X0_grid=X0_grid,
        I0_grid=I0_grid,
        a0=a0,
        beta_I=beta_I_baseline,
        b=b,
        g_I=g_I,
        n_nodes=n_nodes,
        p=p,
        m=m,
        T=T,
        trials_per_cell=trials_per_cell,
        network_type=network_type,
        strategy_choice_func=strategy_choice_func,
        tau=tau,
        seed_base=seed_base_heatmap,
        out_dir=out_dir,
    )

    # Save heatmap figure
    plt.figure(figsize=(7, 5))
    im = plt.imshow(H, origin="lower", aspect="auto", extent=[X0_vals[0], X0_vals[-1], I0_vals[0], I0_vals[-1]])
    plt.colorbar(im, label="Mean final adoption X*")
    plt.xlabel("Initial adoption X0")
    plt.ylabel("Initial infrastructure I0")
    plt.title(f"Heatmap: Mean final adoption X* vs (X0, I0)\n(beta_I={beta_I_baseline:.2f})")
    plt.tight_layout()
    plt.savefig(os.path.join(out_dir, "heatmap_final_adoption.png"))
    plt.close()
    print("Heatmap saved to:", os.path.join(out_dir, "heatmap_final_adoption.png"))

    # ---- Sensitivity: beta_I ----
    print("Starting sensitivity sweep over beta_I ...")
    X0_list, betas, sens_res = sensitivity_betaI(
        beta_vals=beta_vals,
        X0_values=X0_sens,
        I0=0.05,
        a0=a0,
        b=b,
        g_I=g_I,
        n_nodes=n_nodes,
        p=p,
        m=m,
        T=T,
        trials=sensitivity_trials,
        network_type=network_type,
        strategy_choice_func=strategy_choice_func,
        tau=tau,
        seed_base=seed_base_sensitivity,
        out_dir=out_dir,
    )
    # Plot sensitivity curves 
    plt.figure(figsize=(7, 5))
    for i, x0 in enumerate(X0_list):
        plt.plot(betas, sens_res[i, :], label=f"X0={x0:.2f}", marker="o", linewidth=1)
    plt.xlabel("beta_I")
    plt.ylabel("Mean final adoption X*")
    plt.title("Sensitivity: final adoption vs beta_I")
    plt.legend()
    plt.tight_layout()
    plt.savefig(os.path.join(out_dir, "sensitivity_betaI.png"))
    plt.close()
    print("Sensitivity plot saved to:", os.path.join(out_dir, "sensitivity_betaI.png"))

    # ---- Phase trajectories ----
    print("Collecting representative trajectories...")
    trajs = collect_trajectories(
        phase_cases,
        a0=a0,
        beta_I=beta_I_baseline,
        b=b,
        g_I=g_I,
        n_nodes=n_nodes,
        p=p,
        m=m,
        T=T,
        network_type=network_type,
        strategy_choice_func=strategy_choice_func,
        tau=tau,
        seed_base=seed_base_phase,
        out_dir=out_dir,
    )
    print("Phase plots saved to:", out_dir)

    # ---- Tipping estimate (rough) ----
    # For each I0 row in heatmap, estimate smallest X0 where mean final >= threshold
    threshold = 0.9
    tipping_estimates = []
    for i_idx, I0 in enumerate(I0_vals):
        row = H[i_idx, :]
        idxs = np.where(row >= threshold)[0]
        tipping_estimates.append(X0_vals[int(idxs[0])] if len(idxs) > 0 else np.nan)
    tipping_df = pd.DataFrame({"I0": I0_vals, "min_X0_for_Xstar_ge_0.9": tipping_estimates})
    tipping_df.to_csv(os.path.join(out_dir, "tipping_estimates.csv"))
    print("Tipping estimates saved to:", os.path.join(out_dir, "tipping_estimates.csv"))

    # ---- Summary artifact index ----
    artifact_index = {
        "heatmap_png": os.path.join(out_dir, "heatmap_final_adoption.png"),
        "heatmap_csv": os.path.join(out_dir, "heatmap_final_adoption.csv"),
        "sensitivity_png": os.path.join(out_dir, "sensitivity_betaI.png"),
        "sensitivity_csv": os.path.join(out_dir, "sensitivity_betaI.csv"),
        "phase_plots_dir": out_dir,
        "tipping_estimates_csv": os.path.join(out_dir, "tipping_estimates.csv"),
    }
    pd.DataFrame(list(artifact_index.items()), columns=["artifact", "path"]).to_csv(os.path.join(out_dir, "artifact_index.csv"), index=False)

    print("All done. Artifacts written to:", out_dir)


if __name__ == "__main__":
    main()


Starting heatmap sweep (this may take some time)...
Heatmap saved to: output_ev_analysis/heatmap_final_adoption.png
Starting sensitivity sweep over beta_I ...
Sensitivity plot saved to: output_ev_analysis/sensitivity_betaI.png
Collecting representative trajectories...
Phase plots saved to: output_ev_analysis
Tipping estimates saved to: output_ev_analysis/tipping_estimates.csv
All done. Artifacts written to: output_ev_analysis
