In [None]:
from mesa import Agent, Model
from mesa.datacollection import DataCollector
from scipy.stats import poisson, norm

import random
import matplotlib.pyplot as plt
import numpy as np
import math
from abc import ABC, abstractmethod

In [None]:
# Constants
SLOT_DURATION_MS = 12000  # Duration of an Ethereum slot in milliseconds
TIME_GRANULARITY_MS = 100 # Simulation time step in milliseconds
ATTESTATION_TIME_MS = 4000 # Default time for Attesters to attest (can be varied per Attester)

# Network Latency Model Parameters: latency = BASE_NETWORK_LATENCY_MS + (distance_ratio * MAX_ADDITIONAL_NETWORK_LATENCY_MS)
BASE_NETWORK_LATENCY_MS = 50  # Minimum network latency regardless of distance
MAX_ADDITIONAL_NETWORK_LATENCY_MS = 2000 # Max additional latency for max distance on sphere

# MEV yield model (simulating Builder bids increasing over time)
## The number is set randomly now.
BASE_MEV_AMOUNT = 0.2  # Initial MEV in ETH
MEV_INCREASE_PER_SECOND = 0.08 # MEV increase per second (ETH/sec)

# --- Spatial Classes ---

class Space(ABC):
    """
    Abstract base class defining the interface for a 'space'
    where nodes can live. Subclasses must implement:
      - sample_point()
      - distance(p1, p2)
    """

    @abstractmethod
    def sample_point(self):
        """Samples a random point within the space."""
        pass

    @abstractmethod
    def distance(self, p1, p2):
        """Calculates the distance between two points in the space."""
        pass

    @abstractmethod
    def get_area(self):
        """Returns the total 'area' or size of the space."""
        pass

    @abstractmethod
    def get_max_dist(self):
        """Returns the maximum possible distance between any two points in the space."""
        pass

class SphericalSpace(Space):
    """
    Sample points on (or near) the unit sphere.
    distance() returns geodesic distance (great-circle distance).
    """
    def sample_point(self):
        """Samples a random point on the unit sphere (x, y, z)."""
        # Sample (x, y, z) from Normal(0, 1),
        # then normalize to lie on the unit sphere.
        while True:
            x = random.gauss(0, 1)
            y = random.gauss(0, 1)
            z = random.gauss(0, 1)
            r2 = x*x + y*y + z*z
            if r2 > 1e-12: # Avoid division by zero for very small magnitudes
                scale = 1.0 / math.sqrt(r2)
                return (x*scale, y*scale, z*scale)

    def distance(self, p1, p2):
        """
        Calculates the geodesic distance between two points on a unit sphere.
        Distance = arc length = arccos(dot(p1,p2)).
        """
        dotp = p1[0]*p2[0] + p1[1]*p2[1] + p1[2]*p2[2]
        # Numerical safety clamp for dot product to be within [-1, 1] due to floating point inaccuracies
        dotp = max(-1.0, min(1.0, dotp))
        return math.acos(dotp)
    
    def get_area(self):
        """Returns the surface area of a unit sphere."""
        return 4 * np.pi

    def get_max_dist(self):
        """Returns the maximum possible geodesic distance on a unit sphere (half circumference)."""
        return np.pi # Half the circumference of a unit circle (pi * diameter = pi * 2 * radius = 2*pi * 1 / 2 = pi)
    
def init_distance_matrix(positions, space):
    """
    Build the initial distance matrix for all node pairs.
    Returns a 2D list (or NumPy array) of shape (n, n).
    """
    n = len(positions)
    dist_matrix = np.zeros((n, n))
    for i in range(n):
        for j in range(i + 1, n):
            d = space.distance(positions[i], positions[j])
            dist_matrix[i][j] = d
            dist_matrix[j][i] = d # Symmetric matrix
    return dist_matrix

# --- Validator Agent Class Definition ---

class ValidatorAgent:
    def __init__(self, validator_id):
        self.validator_id = validator_id
        self.position = None # (x, y, z) coordinates in the space
        self.role = "none"  # "proposer" or "attester"
        self.network_latency_to_proposer = -1 # Latency for block from Proposer to THIS validator

        self.reset_for_new_slot()

        # Specific attributes for Proposer role
        self.proposer_strategy = None 
        # For random delay proposer strategy
        self.random_propose_time = -1 
        self.attestation_time_ms = ATTESTATION_TIME_MS # Default attestation time for Attesters


    def reset_for_new_slot(self):
        """Resets the validator's ephemeral state for a new slot."""
        self.has_proposed_block = False
        self.proposed_time_ms = -1
        self.mev_captured = 0.0 # MEV captured if supermajority is achieved
        self.block_arrival_at_self_ms = -1 # Time the Proposer's block arrives at this validator
        self.has_attested = False
        self.attested_to_proposer_block = False # True if this attester made a valid attestation for Proposer's block
        self.role = "none"

    def set_proposer_role(self):
        """Sets this validator as the Proposer for the current slot."""
        self.role = "proposer"

    def set_proposer_strategy(self, strategy):
        """Sets the Proposer's strategy for block proposal."""
        self.proposer_strategy = strategy
        # Initialize random propose time if using random strategy
        if strategy["type"] == "random_delay":
            self.random_propose_time = random.randint(strategy["min_delay_ms"], strategy["max_delay_ms"])

    def set_attester_role(self, current_proposer_id, distance_matrix, space_max_dist):
        """Sets this validator as an Attester for the current slot and calculates its specific latency."""
        self.role = "attester"
        
        # Calculate specific network latency based on distance to the current proposer
        distance = distance_matrix[self.validator_id][current_proposer_id]
        self.network_latency_to_proposer = BASE_NETWORK_LATENCY_MS + \
                                           (distance / space_max_dist) * MAX_ADDITIONAL_NETWORK_LATENCY_MS

    def get_current_mev_offer(self, current_slot_time_ms):
        """Simulates the current MEV bid offered by Relays."""
        time_in_seconds = current_slot_time_ms / 1000
        return max(BASE_MEV_AMOUNT, BASE_MEV_AMOUNT + time_in_seconds * MEV_INCREASE_PER_SECOND)

    def decide_and_propose(self, current_slot_time_ms):
        """
        Proposer (this validator) decides whether to propose a block based on its strategy.
        Returns (should_propose, mev_offer_if_proposing)
        """
        if self.role != "proposer" or self.has_proposed_block:
            return False, 0.0

        mev_offer = self.get_current_mev_offer(current_slot_time_ms)

        if self.proposer_strategy["type"] == "fixed_delay":
            if current_slot_time_ms >= self.proposer_strategy["delay_ms"]:
                return True, mev_offer
        elif self.proposer_strategy["type"] == "threshold_and_max_delay":
            if mev_offer >= self.proposer_strategy["mev_threshold"] or \
               current_slot_time_ms >= self.proposer_strategy["max_delay_ms"]:
                return True, mev_offer
        elif self.proposer_strategy["type"] == "random_delay":
            if self.random_propose_time == -1: # Should ideally be set by set_proposer_role, but as a fallback
                self.random_propose_time = random.randint(self.proposer_strategy["min_delay_ms"], self.proposer_strategy["max_delay_ms"])
            if current_slot_time_ms >= self.random_propose_time:
                return True, mev_offer
        
        return False, 0.0

    def propose_block(self, current_slot_time_ms, mev_offer):
        """Executes the block proposal action for the Proposer."""
        self.has_proposed_block = True
        self.proposed_time_ms = current_slot_time_ms
        self.mev_captured_potential = mev_offer # Store potential MEV before supermajority check
        # print(f"  Validator {self.validator_id} (Proposer) proposed block at {current_slot_time_ms}ms, MEV: {mev_offer:.4f} ETH.")

    def receive_block(self, proposed_time_ms):
        """Simulates this validator receiving the block from the Proposer."""
        # Calculate time when block arrives at THIS specific validator
        self.block_arrival_at_self_ms = proposed_time_ms + self.network_latency_to_proposer

    def decide_and_attest(self, current_slot_time_ms, proposer_proposed_time_ms):
        """
        Attester (this validator) decides whether to attest to the Proposer's block.
        proposer_proposed_time_ms: The time the Proposer actually broadcasted the block.
        """
        if self.role != "attester" or self.has_attested:
            return

        # Calculate this attester's specific block arrival time
        block_arrival_at_this_attester_ms = proposer_proposed_time_ms + self.network_latency_to_proposer

        if current_slot_time_ms >= self.attestation_time_ms:
            # Check if Proposer's block arrived before or at this Attester's attestation time
            if proposer_proposed_time_ms != -1 and \
               block_arrival_at_this_attester_ms <= self.attestation_time_ms:
                self.attested_to_proposer_block = True
                # print(f"    Validator {self.validator_id} (Attester) at {current_slot_time_ms}ms made valid attestation.")
            else:
                self.attested_to_proposer_block = False
                # print(f"    Validator {self.validator_id} (Attester) at {current_slot_time_ms}ms attested to blank or missed.")
            self.has_attested = True


In [None]:


# --- Main Simulation Loop ---
def run_multi_validator_simulation(num_validators, proposer_strategies_pool, num_slots=100):
    # Initialize the space and assign positions to all validators
    space = SphericalSpace()
    validators = [ValidatorAgent(i) for i in range(num_validators)]
    for validator in validators:
        # position
        validator.position = space.sample_point()
        # strategy
        proposer_strategy = random.choice(proposer_strategies_pool)
        validator.set_proposer_strategy(proposer_strategy)
    
    # Pre-calculate distance matrix for all validator pairs
    validator_positions = [v.position for v in validators]
    distance_matrix = init_distance_matrix(validator_positions, space)
    space_max_dist = space.get_max_dist() # Maximum possible distance in our spherical space

    total_mev_earned = 0.0
    total_successful_attestations = 0 # Sum of successful attestations across all slots
    total_attesters_count = num_slots * (num_validators - 1) # Total possible attestations across all slots
    proposed_block_times = [] # Records the time of block proposal for each slot
    supermajority_met_slots = 0 # Counter for slots where supermajority was achieved

    print(f"\n--- Simulation Started: {num_validators} Validators ---")
    print(f"Proposer will randomly choose from {len(proposer_strategies_pool)} strategies:")
    for i, strategy in enumerate(proposer_strategies_pool):
        print(f"  Strategy {i+1}: Type={strategy['type']}", end="")
        if "delay_ms" in strategy: print(f", Delay={strategy['delay_ms']}ms", end="")
        if "mev_threshold" in strategy: print(f", Threshold={strategy['mev_threshold']:.4f} ETH, Max Delay={strategy['max_delay_ms']}ms", end="")
        if "min_delay_ms" in strategy: print(f", Random Range={strategy['min_delay_ms']}-{strategy['max_delay_ms']}ms", end="")
        print()
    print(f"Attester Default Attestation Time: {ATTESTATION_TIME_MS}ms.")
    print(f"Network Latency Model: {BASE_NETWORK_LATENCY_MS}ms + (distance_ratio * {MAX_ADDITIONAL_NETWORK_LATENCY_MS}ms)\n")

    for slot_idx in range(num_slots):
        print(f"\n--- Slot {slot_idx + 1} ---")
        
        # Reset all validators for the new slot
        for validator in validators:
            validator.reset_for_new_slot()

        # Randomly select a Proposer for this slot
        proposer_agent = random.choice(validators)
        proposer_agent.set_proposer_role() # Set this validator as the Proposer
        
        # Set remaining validators as Attesters and calculate their specific latencies
        attesters = [v for v in validators if v.validator_id != proposer_agent.validator_id]
        for attester in attesters:
            attester.set_attester_role(proposer_agent.validator_id, distance_matrix, space_max_dist)
        
        current_proposer_proposed_time = -1 # Tracks when the actual Proposer proposes their block
        slot_potential_mev = 0.0 # Potential MEV for this slot, before supermajority check

        for current_slot_time_ms in range(0, SLOT_DURATION_MS, TIME_GRANULARITY_MS):
            # Proposer's decision and proposal
            should_propose, mev_offer = proposer_agent.decide_and_propose(current_slot_time_ms)
            if should_propose:
                proposer_agent.propose_block(current_slot_time_ms, mev_offer)
                current_proposer_proposed_time = proposer_agent.proposed_time_ms
                slot_potential_mev = mev_offer # Store potential MEV for this slot
                # Once proposed, Proposer doesn't propose again in this slot

            # Attesters' decisions and attestations
            # Each attester decides based on its own specific network latency to the proposer
            for attester in attesters:
                attester.decide_and_attest(
                    current_slot_time_ms, 
                    current_proposer_proposed_time
                )

            # Check if all relevant actions for this slot are done (Proposer proposed & all Attesters attested)
            if (proposer_agent.has_proposed_block and all(a.has_attested for a in attesters)) or \
               (all(a.has_attested for a in attesters)):
                break
            
        # End of slot statistics and REWARD CALCULATION based on SUPERMAJORITY
        if proposer_agent.has_proposed_block:
            proposed_block_times.append(proposer_agent.proposed_time_ms)
            
            slot_successful_attestations = sum(1 for a in attesters if a.attested_to_proposer_block)
            
            # --- SUPERMAJORITY REWARD LOGIC ---
            required_attesters_for_supermajority = math.ceil((2/3) * len(attesters))
            
            if slot_successful_attestations >= required_attesters_for_supermajority:
                proposer_agent.mev_captured = slot_potential_mev # Proposer gets full MEV + consensus reward
                total_mev_earned += proposer_agent.mev_captured
                supermajority_met_slots += 1
                print(f"Slot {slot_idx + 1} SUCCESS: Proposer earned {proposer_agent.mev_captured:.4f} ETH. Attestations: {slot_successful_attestations}/{len(attesters)} (Needed {required_attesters_for_supermajority}).")
            else:
                proposer_agent.mev_captured = 0.0 # Proposer gets zero reward if supermajority not met
                print(f"Slot {slot_idx + 1} FAILED (NO SUPERMAJORITY): Proposer earned {proposer_agent.mev_captured:.4f} ETH. Attestations: {slot_successful_attestations}/{len(attesters)} (Needed {required_attesters_for_supermajority}).")
            # --- END SUPERMAJORITY REWARD LOGIC ---

            total_successful_attestations += slot_successful_attestations

    avg_mev_per_slot = total_mev_earned / num_slots
    avg_attestation_rate = (total_successful_attestations / total_attesters_count) * 100 if total_attesters_count > 0 else 0
    supermajority_success_rate = (supermajority_met_slots / num_slots) * 100

    print(f"\n--- Simulation Results ---")
    print(f"Total Slots Simulated: {num_slots}")
    print(f"Average MEV Earned Per Slot (by Proposer): {avg_mev_per_slot:.4f} ETH")
    print(f"Average Effective Attestation Rate (across all Attesters): {avg_attestation_rate:.2f}%")
    print(f"Supermajority Attestation Success Rate: {supermajority_success_rate:.2f}%")

    return avg_mev_per_slot, avg_attestation_rate, supermajority_success_rate, proposed_block_times

In [None]:
# --- Simulation Execution ---
random.seed(0x06511)  # For reproducibility
np.random.seed(0x06511)  # For reproducibility in NumPy operations
# --- Define the Pool of Proposer Strategies ---
NUM_VALIDATORS = 100 # Example: Simulate 100 validators

# This pool contains all the different strategies that the Proposer can randomly choose from
all_proposer_strategies = [
    {"type": "fixed_delay", "delay_ms": 500}, # Strategy 1: Fixed delay at 0.5 seconds
    {"type": "fixed_delay", "delay_ms": 1500}, # Strategy 2: Faster fixed delay at 1.5 second
    {"type": "threshold_and_max_delay", "mev_threshold": 0.3, "max_delay_ms": 2500}, # Strategy 3: Threshold 0.4 ETH or max 2.5s delay
    {"type": "random_delay", "min_delay_ms": 1000, "max_delay_ms": 3000}, # Strategy 4: Random delay between 1s and 3s
    {"type": "threshold_and_max_delay", "mev_threshold": 0.4, "max_delay_ms": 3000}, # Strategy 5: More aggressive threshold 0.5 ETH or max 3s delay
]

# Run a single large simulation where the Proposer randomly picks a strategy each slot
avg_mev_overall, avg_attestation_rate, supermajority_success_rate, times_overall = \
    run_multi_validator_simulation(NUM_VALIDATORS, all_proposer_strategies, num_slots=1000) # Increased slots for better averages


--- Simulation Started: 100 Validators ---
Proposer will randomly choose from 5 strategies:
  Strategy 1: Type=fixed_delay, Delay=500ms
  Strategy 2: Type=fixed_delay, Delay=1500ms
  Strategy 3: Type=threshold_and_max_delay, Threshold=0.3000 ETH, Max Delay=2500ms
  Strategy 4: Type=random_delay, Random Range=1000-3000ms
  Strategy 5: Type=threshold_and_max_delay, Threshold=0.4000 ETH, Max Delay=3000ms
Attester Default Attestation Time: 4000ms.
Network Latency Model: 50ms + (distance_ratio * 2000ms)


--- Slot 1 ---
Slot 1 SUCCESS: Proposer earned 0.4000 ETH. Attestations: 85/99 (Needed 66).

--- Slot 2 ---
Slot 2 SUCCESS: Proposer earned 0.3040 ETH. Attestations: 99/99 (Needed 66).

--- Slot 3 ---
Slot 3 SUCCESS: Proposer earned 0.2400 ETH. Attestations: 99/99 (Needed 66).

--- Slot 4 ---
Slot 4 SUCCESS: Proposer earned 0.3200 ETH. Attestations: 99/99 (Needed 66).

--- Slot 5 ---
Slot 5 SUCCESS: Proposer earned 0.2400 ETH. Attestations: 99/99 (Needed 66).

--- Slot 6 ---
Slot 6 SUCCES