In [None]:
# need to install numba and numpy

import pickle
import random
import math
import logging
import os
import yaml

import numpy as np
from datetime import datetime
from numba import jit

In [1]:

class Agent:
    def __init__(self, cultural_sway):
        self.strategy = None  # 'T' or 'I'
        self.payoff = 0.0
        self.cultural_sway = cultural_sway
        self.history = []  
        self.payoff_history = []

In [6]:

def payoff_matrix(theta, strat1, strat2):
    if strat1 == 'T' and strat2 == 'T':
        p = 16 - theta
    elif strat1 == 'I' and strat2 == 'I':
        p = theta
    else:
        p = 4
    return p, p  # symmetric

@jit(nopython=True)
def update_strategies_jit(strategies, payoffs, beta):
    new_strategies = strategies.copy()
    n = len(strategies)
    for i in range(n):
        j = np.random.randint(0, n)
        while j == i:
            j = np.random.randint(0, n)
        prob = 1 / (1 + np.exp(-beta * (payoffs[j] - payoffs[i])))
        if np.random.random() < prob:
            new_strategies[i] = strategies[j]
    return new_strategies

def payoff_matrix_jit(theta, strat1, strat2):
    if strat1 == 0 and strat2 == 0:
        p = 16 - theta
    elif strat1 == 1 and strat2 == 1:
        p = theta
    else:
        p = 4
    return p, p

class Game:
    def __init__(self, theta, beta, max_steps, population):
        self.theta = theta
        self.beta = beta
        self.max_steps = max_steps
        self.population = population
        for agent in population:
            agent.payoff = 0.0
            agent.history = []
            agent.payoff_history = []
        self.strategies = np.array([0 if a.strategy == 'T' else 1 for a in population])
        self.payoffs = np.zeros(len(population))
        logging.info(f"Initialized game for theta {theta}, beta {beta}, max_steps {max_steps}, population size {len(population)}")

    def update_payoff(self):
        # Compute population fractions
        frac_t = np.mean(self.strategies == 0)
        frac_i = 1 - frac_t
        
        # Calculate average payoff for each agent
        payoff_tt = payoff_matrix_jit(self.theta, 0, 0)[0]
        payoff_ti = payoff_matrix_jit(self.theta, 0, 1)[0]
        payoff_it = payoff_matrix_jit(self.theta, 1, 0)[0]
        payoff_ii = payoff_matrix_jit(self.theta, 1, 1)[0]
        
        self.payoffs = np.where(self.strategies == 0, 
                                frac_t * payoff_tt + frac_i * payoff_ti,
                                frac_t * payoff_it + frac_i * payoff_ii)
        
        # Update agent objects and record history
        for i, agent in enumerate(self.population):
            agent.payoff = self.payoffs[i]
            agent.payoff_history.append(agent.payoff)

    def update_strategies(self):
        self.strategies = update_strategies_jit(self.strategies, self.payoffs, self.beta)
        
        # Update agent objects
        for i, agent in enumerate(self.population):
            agent.strategy = 'T' if self.strategies[i] == 0 else 'I'

    def run(self):
        logging.info(f"Starting evolution for {self.max_steps} steps")
        for step in range(self.max_steps):
            self.update_payoff()
            self.update_strategies()
            # Record history after each evolution step
            for agent in self.population:
                agent.history.append(agent.strategy)
        logging.info(f"Evolution completed for theta {self.theta}")
        # Return equilibrium: fractions T and I
        t_count = sum(1 for a in self.population if a.strategy == 'T')
        i_count = len(self.population) - t_count
        frac_t = t_count / len(self.population)
        frac_i = i_count / len(self.population)
        fractions = {'T': frac_t, 'I': frac_i}
        logging.info(f"Final fractions: {fractions}")
        return fractions

In [7]:
class Simulation:
    def __init__(self, N, f_cultural, theta_list, beta, max_steps, ensemble_size):
        self.N = N
        self.f_cultural = f_cultural
        self.theta_list = theta_list
        self.beta = beta
        self.max_steps = max_steps
        self.ensemble_size = ensemble_size
        self.population = [Agent(random.random() < f_cultural) for _ in range(N)]
        self.equilibria = {}  # theta: averaged fractions
        logging.info(f"Initialized simulation with N={N}, f_cultural={f_cultural}, theta_list={theta_list}, beta={beta}, max_steps={max_steps}, ensemble_size={ensemble_size}")

    def initialize_for_theta(self, theta):
        if not self.equilibria:
            for agent in self.population:
                agent.strategy = random.choice(['T', 'I'])
            logging.info(f"Initialized strategies randomly for first theta {theta}")
        else:
            prev_thetas = list(self.equilibria.keys())
            closest_theta = min(prev_thetas, key=lambda t: abs(t - theta))
            frac_dict = self.equilibria[closest_theta]
            for agent in self.population:
                if agent.cultural_sway:
                    agent.strategy = 'T' if random.random() < frac_dict['T'] else 'I'
                else:
                    agent.strategy = random.choice(['T', 'I'])
            logging.info(f"Initialized strategies for theta {theta} based on closest previous theta {closest_theta} with T fraction {frac_dict['T']}")

    def average_fractions(self, frac_list):
        if not frac_list:
            return {}
        keys = frac_list[0].keys()
        avg = {}
        for k in keys:
            avg[k] = sum(f[k] for f in frac_list) / len(frac_list)
        return avg

    def run_simulation(self):
        os.makedirs('results', exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        run_dir = f'results/run_{timestamp}'
        os.makedirs(run_dir, exist_ok=True)
        self.data = {}
        logging.info("Starting full simulation run")
        for theta in self.theta_list:
            logging.info(f"Processing theta {theta}")
            self.initialize_for_theta(theta)
            initial_strategies = [a.strategy for a in self.population]
            ensemble_fractions = []
            self.data[theta] = {}
            for i in range(self.ensemble_size):
                # Reset strategies to initial
                for a, s in zip(self.population, initial_strategies):
                    a.strategy = s
                game = Game(theta, self.beta, self.max_steps, self.population)
                game.strategies = np.array([0 if a.strategy == 'T' else 1 for a in self.population])
                fractions = game.run()
                ensemble_fractions.append(fractions)
                # Collect strategies and payoffs
                strategies = [agent.history for agent in self.population]
                payoffs = [agent.payoff_history for agent in self.population]
                self.data[theta][i] = {'strategies': strategies, 'payoffs': payoffs}
            # Average fractions
            averaged_fractions = self.average_fractions(ensemble_fractions)
            self.equilibria[theta] = averaged_fractions
            logging.info(f"Completed theta {theta}, averaged final fractions: {averaged_fractions}")
        logging.info(f"Simulation completed. Equilibria: {self.equilibria}")
        # Save data to pickle
        with open(f'{run_dir}/simulation_data.pkl', 'wb') as f:
            pickle.dump(self.data, f)
        # Save metadata to YAML
        metadata = {
            'N': self.N,
            'f_cultural': self.f_cultural,
            'theta_list': self.theta_list,
            'beta': self.beta,
            'max_steps': self.max_steps,
            'ensemble_size': self.ensemble_size,
            'equilibria': [{'theta': theta, **self.equilibria[theta]} for theta in self.theta_list],
            'timestamp': timestamp,
            'run_dir': run_dir
        }
        with open(f'{run_dir}/simulation_metadata.yaml', 'w') as f:
            yaml.dump(metadata, f)
        return self.equilibria


In [8]:

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

logging.info("Starting main simulation")
# Parameters
N = 1000 # ensure this is even
f_cultural = 3 / 4
theta_list = [15, 1, 13, 1]
beta = 0.01
max_steps = 50
ensemble_size = 100

sim = Simulation(N, f_cultural, theta_list, beta, max_steps, ensemble_size)
equilibria = sim.run_simulation()
logging.info("Main simulation completed")

2025-11-04 21:04:22,280 - INFO - Starting main simulation
2025-11-04 21:04:22,282 - INFO - Initialized simulation with N=1000, f_cultural=0.75, theta_list=[15, 1, 13, 1], beta=0.01, max_steps=50, ensemble_size=100
2025-11-04 21:04:22,282 - INFO - Starting full simulation run
2025-11-04 21:04:22,283 - INFO - Processing theta 15
2025-11-04 21:04:22,283 - INFO - Initialized strategies randomly for first theta 15
2025-11-04 21:04:22,283 - INFO - Initialized game for theta 15, beta 0.01, max_steps 50, population size 1000
2025-11-04 21:04:22,284 - INFO - Starting evolution for 50 steps
2025-11-04 21:04:22,540 - INFO - Evolution completed for theta 15
2025-11-04 21:04:22,540 - INFO - Final fractions: {'T': 0.068, 'I': 0.932}
2025-11-04 21:04:22,541 - INFO - Initialized game for theta 15, beta 0.01, max_steps 50, population size 1000
2025-11-04 21:04:22,541 - INFO - Starting evolution for 50 steps
2025-11-04 21:04:22,549 - INFO - Evolution completed for theta 15
2025-11-04 21:04:22,549 - INFO