
# <p style="text-align: center; margin-bottom: 0cm;">Exploring the Daley-Kendall Model using ABMs</p>

#### <p style="text-align: center; margin-bottom: 0cm;">Lowell Monis</p>


***

This project has been completed in fulfillment of the requirements for an Honors Citation in Computational Mathematics, Sciences, and Engineering 202, Computational Modeling Tools and Techniques, taught by Dr. Nathan Haut during the Fall of 2024.

***

## Preamble

***

This study will make use of the following Python modules, libraries, and packages. It is essential to run the following cell to avoid running into errors while viewing this notebook. The following cell also contains setup commands for styles that may be used in the notebook.

***

In [None]:
import numpy as np
from scipy.integrate import solve_ivp
import matplotlib.pyplot as plt
import random
import time
from IPython.display import display, clear_output
import os
import imageio.v2 as imageio

## Code

### Ordinary Differential Equation Visualization

In [None]:
def daley_kendall(time, y, beta, gamma):
    """
    Sets up the Daley-Kendal ordinary differential equation system to model the spread of rumors in an isolated environment.

    Args:
        time: array-like, time stamps to iterate over.
        y: array-like, initial values for the three variables, namely respective fractions of ignorants (I), spreaders (S), and stiflers (R).
        beta: float, interaction rate as a percentage.
        gamma: float, stifling rate as a percentage.

    returns:
        diffs: array-like, list of results from the application of the differential equations.
    """
    I, S, R = y
    dS_dt = -beta * I * S
    dI_dt = beta * I * S - gamma * I
    dR_dt = gamma * I

    diffs = [dI_dt, dS_dt, dR_dt]
    
    return diffs

In [None]:
# parameters
beta = 0.3
gamma = 0.1

# initial conditions: [I, S, R]
I0 = 0.1
S0 = 0.5
R0 = 0.0
initial_conditions = [I0, S0, R0]

# creating time points
time = np.linspace(0, 100, 1000)

In [None]:
solution = solve_ivp(daley_kendall, (0, 100), initial_conditions, t_eval = time, args=(beta, gamma))
I, S, R = solution.y

In [None]:
# plot
plt.figure(figsize=(10, 6))
plt.plot(time, S, label="Ignorants (S)", color="blue")
plt.plot(time, I, label="Spreaders (I)", color="red")
plt.plot(time, R, label="Stiflers (R)", color="green")
plt.title("Daley-Kendall Rumor Spread Timeline")
plt.xlabel("Time")
plt.ylabel("Fraction of Sample Population")
plt.legend()
plt.show()

### Agent-Based Model

In [None]:
class Agent:
    """
    Initiates an agent in the simulation of the Daley-Kendall ordinary differential equation system in an isolated environment.

    Args:
        grid_size: int, length of the side of a square grid
        tendency: array-like, initial values for the three variables, namely respective fractions of ignorants (I), spreaders (S), and stiflers (R).
        state: str, either ignorant, 
    """
    def __init__(self, grid_size, tendency, state = 'ignorant'):
        self.x = random.uniform(0, grid_size)                  # x and y position of the individual in the environment
        self.y = random.uniform(0, grid_size)
        self.memory = random.randint(10,20)                    # memory capacity of the individual in time steps
        self.remember = 0                                      # time for which the agent has known the rumor
        self.max_time = 0                                      # maximum time the individual can keep conversation
        self.tendency = tendency                               # tendency of the individual to indulge in gossip, either a 1 or 0
        self.interaction_threshold = random.uniform(1.0, 3.0)  # distance threshold for interaction
        self.state = state                                     # initial state
        self.social_factor = random.choice([-1, 0, 1])         # randomly assigns agent as either an introvert (-1), extrovert (1), or ambivert (0)
        self.in_conversation = None                            # tracks current conversation partner
        self.conversation_timer = 0                            # tracks conversation duration

    def move(self, grid_size):
        if self.in_conversation is None:                       # only move if not in a conversation
            directions = ["up", "down", "left", "right"]
            move = random.choice(directions)                   # chooses a random direction to move 0.5 points in
            if move == "up":
                self.y = min(grid_size, self.y + 0.5)
            elif move == "down":
                self.y = max(0, self.y - 0.5)
            elif move == "left":
                self.x = max(0, self.x - 0.5)
            elif move == "right":
                self.x = min(grid_size, self.x + 0.5)

    def crave_interaction(self, others):                                               # seek a partner for conversation
        for other in others:
            if other is not self and other.in_conversation is None:                    # free agents only
                distance = np.sqrt((self.x - other.x)**2 + (self.y - other.y)**2)
                if distance < self.interaction_threshold:                              # checks if partner is close enough to interact
                    interaction_likelihood = self.social_factor * other.social_factor  # verifies if social factors are compatible
                    if interaction_likelihood == 0:
                        self.max_time = 2
                        self.in_conversation = other
                        other.in_conversation = self
                        self.conversation_timer = 0                                    # starts the conversation
                        other.conversation_timer = 0
                    elif interaction_likelihood == 1:
                        self.max_time = 5
                        self.in_conversation = other
                        other.in_conversation = self
                        self.conversation_timer = 0
                        other.conversation_timer = 0

    def interact(self):
        if self.in_conversation:
            partner = self.in_conversation
            self.conversation_timer += 1                                               # increments the conversation timer
            partner.conversation_timer += 1

            # interaction logic based on Daley-Kendall model
            if self.state == "ignorant" and partner.state == "spreader":
                if self.tendency == 1:
                    self.state = "spreader"
                    return
                else:
                    self.state = "stifler"
                    return
            elif self.state == "spreader" and partner.state == "ignorant":
                if partner.tendency == 1:
                    partner.state = "spreader"
                else:
                    partner.state = "stifler"
                    return
            elif self.state == "ignorant" and self.state == "ignorant":
                self.leave_conversation()
                return
            elif self.state == "stifler" and self.state == "stifler":
                self.leave_conversation()
                return
            elif self.state == "spreader" and self.state == "spreader":
                self.leave_conversation()
                return
            elif self.state == "ignorant" and self.state == "stifler":
                self.leave_conversation()
                return
            elif self.state == "stifler" and self.state == "ignorant":
                self.leave_conversation()
                return
            if self.conversation_timer >= self.max_time:                             # ends conversation if maximum time is reached
                self.leave_conversation()
                return

    def leave_conversation(self):
        if self.in_conversation:
            partner = self.in_conversation
            self.in_conversation = None
            partner.in_conversation = None
            self.conversation_timer = 0
            partner.conversation_timer = 0
    
    def memory_update(self):                                                         # updates time for which rumor has been spread
        if self.state == "spreader":
            self.remember += 1
            if self.remember == self.memory:                                         # state changes to ignorant if extroverted, else stifler
                if self.social_factor == 1:
                    self.state = "ignorant"
                else:
                    self.state = "stifler"

In [None]:
def simulate(gamma, num_agents=250, grid_size=50, timesteps=101, gif_name=None):
    agents = [Agent(grid_size, tendency=1, state = 'ignorant') for _ in range(num_agents)]
    stiflers = random.sample([agent for agent in agents], int(gamma*num_agents))  # creates agents vulnerable to stifling depending on stifling rate
    for stifler in stiflers:
        stifler.tendency = 0
    # initialize multiple spreaders if they are extroverts. number of agents who are spreaders is equivalent to percentage created as initial value
    spreaders = random.sample([agent for agent in agents if agent.social_factor == 1 and agent.tendency == 1], int(num_agents*I0))
    for spreader in spreaders:
        spreader.state = "spreader"

    # define state colors for visualization
    state_colors = {"ignorant": "blue", "spreader": "red", "stifler": "green"}

    if gif_name == None:
        # simulation loop
        for t in range(timesteps):
            for agent in agents:
                if agent.in_conversation is None:
                    agent.move(grid_size)
                    agent.crave_interaction(agents)
                    agent.memory_update()
                else:
                    agent.interact()
                    agent.memory_update()
    
            # visualization
            fig = plt.figure(figsize=(6, 6))
            plt.xlim(0, grid_size)
            plt.ylim(0, grid_size)
            for agent in agents:
                plt.scatter(agent.x, agent.y, color=state_colors[agent.state], s=20)
            plt.title(t)
            plt.tick_params(labelbottom = False, left = False, bottom = False, labelleft= False)
            clear_output(wait=True)
            display(plt.gcf())
            plt.close(fig)

    else:

        


        # temporary directory to save frames to create the animation
        temp_dir = "temp"
        os.makedirs(temp_dir, exist_ok=True)
        frame_paths = []
    
        # simulation loop
        for t in range(timesteps):
            for agent in agents:
                if agent.in_conversation is None:
                    agent.move(grid_size)
                    agent.crave_interaction(agents)
                    agent.memory_update()
                else:
                    agent.interact()
                    agent.memory_update()
    
            # visualization
            fig = plt.figure(figsize=(6, 6))
            plt.xlim(0, grid_size)
            plt.ylim(0, grid_size)
            for agent in agents:
                plt.scatter(agent.x, agent.y, color=state_colors[agent.state], s=20)
            plt.title(t)
            plt.tick_params(labelbottom = False, left = False, bottom = False, labelleft= False)
            clear_output(wait=True)
            display(plt.gcf())
            frame_path = os.path.join(temp_dir, f"frame_{t:03d}.png")
            plt.savefig(frame_path)
            frame_paths.append(frame_path)
            plt.close(fig)
    
        # creates GIF
        with imageio.get_writer(gif_name, mode="I", duration=750) as writer:
            for frame_path in frame_paths:
                image = imageio.imread(frame_path)
                writer.append_data(image)
    
        # clean up temporary frames
        for frame_path in frame_paths:
            os.remove(frame_path)
        os.rmdir(temp_dir)

# runs the simulation and creates the animation
simulate(gamma=gamma)