In [2]:
%pip install mesa==2.3.2

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [6]:
%pip install solara

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [None]:
from mesa.space import NetworkGrid
from mesa.datacollection import DataCollector
import networkx as nx
import matplotlib.pyplot as plt
import ipywidgets as widgets
from IPython.display import display, clear_output
from mesa import Agent, Model
from mesa.time import RandomActivation
from mesa.space import MultiGrid
from mesa.datacollection import DataCollector


In [2]:
# --- Voter Agent ---

class VoterAgent(Agent):
    def __init__(self, unique_id, model, initial_state='S', resistance=0.1, decay_prob=0.05, threshold=0.5, group=None, x=0, y=0):
        super().__init__(unique_id, model)
        self.state = initial_state
        self.resistance = resistance
        self.previous_state = initial_state
        self.decay_prob = decay_prob
        self.threshold = threshold
        self.group = group or self.random.choice(['Youth', 'Senior', 'Urban', 'Rural'])
        self.x = x  # Geographic X position
        self.y = y  # Geographic Y position

    def step(self):
        if self.state in ['A', 'B'] and self.random.random() < self.decay_prob:
            self.state = 'S'
            return

        if self.state != 'S':
            return

        neighbors = self.model.grid.get_neighbors(self.pos)
        influence_A = sum(1 for n in neighbors if n.state == 'A')
        influence_B = sum(1 for n in neighbors if n.state == 'B')
        total = len(neighbors)

        if total == 0:
            return

        if influence_A / total >= self.threshold:
            new_state = 'A'
        elif influence_B / total >= self.threshold:
            new_state = 'B'
        else:
            return

        if self.random.random() > self.resistance:
            if self.previous_state != new_state:
                self.resistance = min(1.0, self.resistance + 0.1)
            self.previous_state = self.state
            self.state = new_state

    # Method to calculate geographical influence
    def get_distance(self, other_agent):
        return ((self.x - other_agent.x)**2 + (self.y - other_agent.y)**2)**0.5

# --- Candidate Agent ---
class CandidateAgent(Agent): 
    def __init__(self, unique_id, model, party, reach_prob=0.1):
        super().__init__(unique_id, model)
        self.party = party
        self.reach_prob = reach_prob
        self.x = self.random.random()  # Candidate's position on the x-axis
        self.y = self.random.random()  # Candidate's position on the y-axis

    def get_distance(self, other_agent):
        """
        Calculate the Euclidean distance between this candidate and another agent (voter or media).
        """
        if isinstance(other_agent, VoterAgent) or isinstance(other_agent, MediaAgent):
            # Compute Euclidean distance
            distance = ((self.x - other_agent.x) ** 2 + (self.y - other_agent.y) ** 2) ** 0.5
            return distance
        else:
            return None
        
    def step(self):
        support = {'A': 0, 'B': 0, 'S': 0}
        for a in self.model.schedule.agents:
            if isinstance(a, VoterAgent):
                support[a.state] += 1

        if self.party == 'A' and support['A'] < support['B']:
            self.reach_prob = min(1.0, self.reach_prob + 0.05)
        elif self.party == 'B' and support['B'] < support['A']:
            self.reach_prob = min(1.0, self.reach_prob + 0.05)

        for a in self.model.schedule.agents:
            if isinstance(a, VoterAgent) and a.state == 'S':
                distance = self.get_distance(a)
                influence = self.reach_prob / (1 + distance)  # Influence is inversely related to distance
                if self.random.random() < influence:
                    a.state = self.party

# --- Media Agent ---
class MediaAgent(Agent):
    def __init__(self, unique_id, model, target='A', influence_strength=0.3, trigger_step=10):
        super().__init__(unique_id, model)
        self.target = target  # Targeting 'A' or 'B'
        self.influence_strength = influence_strength  # Influence strength of media
        self.trigger_step = trigger_step  # When to trigger the media influence

    def step(self):
        # Dynamic triggering based on trends
        if self.model.schedule.steps % self.trigger_step == 0:
            for a in self.model.schedule.agents:
                if isinstance(a, VoterAgent) and a.state == 'S':  # Only target undecided voters
                    # Influence probability based on media strength and current media target
                    prob = self.influence_strength
                    if self.random.random() < prob:
                        a.state = self.target  # Change the state to the target party


# --- Election Model ---
class ElectionModel(Model):
    def __init__(self, N=100, decay_prob=0.05, threshold=0.5, media_count=2, communities=3):
        # Create a probability matrix for the stochastic block model
        # p[i][j] is the probability of an edge between nodes in community i and community j
        p = [[0.2 if i != j else 0.4 for i in range(communities)] for j in range(communities)]
        
        # Create a stochastic block model graph
        self.G = nx.stochastic_block_model([N//communities]*communities, p)
        
        self.grid = NetworkGrid(self.G)
        self.schedule = RandomActivation(self)
        self.datacollector = DataCollector(
            model_reporters={
                'A': lambda m: sum(1 for a in m.schedule.agents if isinstance(a, VoterAgent) and a.state == 'A'),
                'B': lambda m: sum(1 for a in m.schedule.agents if isinstance(a, VoterAgent) and a.state == 'B'),
                'S': lambda m: sum(1 for a in m.schedule.agents if isinstance(a, VoterAgent) and a.state == 'S'),
            }
        )

        for node_id in self.G.nodes:
            group = 'Urban' if node_id % 2 == 0 else 'Rural'
            # Geographical position for each agent (randomly placed)
            x, y = self.random.random(), self.random.random()
            a = VoterAgent(node_id, self, decay_prob=decay_prob, threshold=threshold, group=group, x=x, y=y)
            self.grid.place_agent(a, node_id)  # Place agent at node_id in the network
            self.schedule.add(a)

        for i in range(media_count):
            target = 'A' if i % 2 == 0 else 'B'
            media_agent = MediaAgent(f'media_{i}', self, target=target, influence_strength=0.3, trigger_step=10)
            self.schedule.add(media_agent)

        self.candA = CandidateAgent('candA', self, party='A')
        self.candB = CandidateAgent('candB', self, party='B')
        self.schedule.add(self.candA)
        self.schedule.add(self.candB)

        self.final_results = None

    def step(self):
        self.datacollector.collect(self)
        self.schedule.step()

    def hold_election(self):
        vote_counts = {'A': 0, 'B': 0}
        for a in self.schedule.agents:
            if isinstance(a, VoterAgent):
                if a.state in ['A', 'B']:
                    vote_counts[a.state] += 1
        self.final_results = vote_counts
        print("\nElection Results:")
        print(f"Candidate A: {vote_counts['A']} votes")
        print(f"Candidate B: {vote_counts['B']} votes")

In [3]:
def run_model(N=100, steps=30, decay_prob=0.05, threshold=0.5, media_count=2, communities=3):
    model = ElectionModel(N=N, decay_prob=decay_prob, threshold=threshold, media_count=media_count, communities=communities)
    for _ in range(steps):
        model.step()

    results = model.datacollector.get_model_vars_dataframe()
    results.plot(marker='o', figsize=(10, 6), grid=True, title="Election Dynamics")
    plt.xlabel("Step")
    plt.ylabel("Number of Voters")
    plt.show()

    model.hold_election()

# --- Widgets ---
widgets.interact(
    run_model,
    N=widgets.IntSlider(value=100, min=20, max=300, step=10, description='Voters'),
    steps=widgets.IntSlider(value=30, min=10, max=100, step=5, description='Steps'),
    decay_prob=widgets.FloatSlider(value=0.05, min=0.0, max=0.2, step=0.01, description='Decay Prob'),
    threshold=widgets.FloatSlider(value=0.5, min=0.0, max=1.0, step=0.05, description='Threshold'),
    media_count=widgets.IntSlider(value=2, min=1, max=5, step=1, description='Media Agents'),
    communities=widgets.IntSlider(value=3, min=2, max=5, step=1, description='Communities')
)

interactive(children=(IntSlider(value=100, description='Voters', max=300, min=20, step=10), IntSlider(value=30…

<function __main__.run_model(N=100, steps=30, decay_prob=0.05, threshold=0.5, media_count=2, communities=3)>

In [12]:
import random

In [10]:
class OpinionAgent(Agent):
    """ An agent with an opinion, based on DeGroot's model. """
    def __init__(self, unique_id, model, opinion, neighbors):
        super().__init__(unique_id, model)
        self.opinion = opinion  # Opinion of the agent (between 0 and 1, for example)
        self.neighbors = neighbors  # List of neighboring agents

    def step(self):
        # Update the agent's opinion based on the weighted average of neighbors' opinions
        total_opinion = 0
        total_weight = 0
        for neighbor in self.neighbors:
            weight = self.model.get_weight(self, neighbor)
            total_opinion += weight * neighbor.opinion
            total_weight += weight
        # Normalize the opinion to be between 0 and 1 (optional)
        self.opinion = total_opinion / total_weight if total_weight != 0 else self.opinion

class OpinionDynamicsModel(Model):
    """ The model for simulating opinion dynamics using DeGroot's model. """
    def __init__(self, num_agents, width, height):
        self.num_agents = num_agents
        self.grid = MultiGrid(width, height, True)
        self.schedule = RandomActivation(self)

        # Create agents with random initial opinions between 0 and 1
        for i in range(self.num_agents):
            opinion = random.random()  # Random initial opinion
            # Assuming a random connection for simplicity here
            neighbors = []  # Initially, no neighbors
            a = OpinionAgent(i, self, opinion, neighbors)
            self.schedule.add(a)
            x = self.random.randrange(self.grid.width)
            y = self.random.randrange(self.grid.height)
            self.grid.place_agent(a, (x, y))

        # Data collection
        self.datacollector = DataCollector(
            agent_reporters={"Opinion": "opinion"}
        )

    def get_weight(self, agent1, agent2):
        # Simple function to define the weight of one agent's influence on another.
        # This could be based on social distance, network connection, etc.
        # For simplicity, here we use a fixed weight
        return 1 / len(agent1.neighbors) if len(agent1.neighbors) > 0 else 1

    def step(self):
        # Collect data for each step
        self.datacollector.collect(self)

        # Update opinions of all agents
        self.schedule.step()

In [14]:
# Run the model
model = OpinionDynamicsModel(100, 10, 10)

for i in range(100):  # Run for 100 steps
    model.step()

# Extract data for analysis
data = model.datacollector.get_agent_vars_dataframe()
print(data.tail())

               Opinion
Step AgentID          
99   81       0.951815
     12       0.498503
     59       0.486528
     2        0.896221
     89       0.987667


  super().__init__(unique_id, model)
