In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from collections import deque
from matplotlib.patches import Polygon
from sklearn.mixture import GaussianMixture
from scipy.interpolate import make_interp_spline
from statsmodels.nonparametric.smoothers_lowess import lowess
from sklearn.cluster import MiniBatchKMeans # For robust GMM initialization

In [4]:
# =============================================================================
# --- Simulation Definitions (Bird, Model, Utility & Thermodynamic Functions) ---
# =============================================================================

class Bird:
    """
    Represents a single bird agent in the flocking simulation.
    Each bird has a position, velocity, speed, and parameters governing its
    interaction with neighbors (cohesion, separation, alignment).
    It also maintains a history of its local ensemble size for thermodynamic calculations.
    """
    def __init__(self, pos, vel, speed, cohere_factor, separation,
                 separate_factor, match_factor, visual_distance, ensemble, history_length):
        self.pos = np.array(pos, dtype=np.float64)
        self.vel = np.array(vel, dtype=np.float64)
        # Ensure the initial velocity is a unit vector
        norm_vel = np.linalg.norm(self.vel)
        self.vel = self.vel / norm_vel if norm_vel > 0 else np.array([1.0, 0.0]) # Default if vel is zero
        self.speed = speed
        self.cohere_factor = cohere_factor
        self.separation = separation
        self.separate_factor = separate_factor
        self.match_factor = match_factor
        self.visual_distance = visual_distance
        self.ensemble = ensemble # Represents a local 'energy' or 'state' of the bird
        self.ensemble_history = deque(maxlen=history_length) # History for ensemble calculation

class Model:
    """
    Represents the overall flocking simulation environment.
    Manages all bird agents, their interactions, and the simulation's state over time.
    """
    def __init__(self, n_birds=100, speed=2.0, cohere_factor=0.4, separation=4.0,
                 separate_factor=0.25, match_factor=0.02, visual_distance=5.0,
                 ensemble=1, history_length=8, extent=(50, 50), seed=1234):
        self.iteration = 0 # Current simulation step
        self.extent = extent  # (width, height) of the simulation area
        self.agents = []      # List of all bird agents
        self.rng = np.random.default_rng(seed) # Random number generator for reproducibility
        
        # Initialize each bird agent with random positions and velocities
        for _ in range(n_birds):
            vel = self.rng.uniform(-1, 1, size=2)
            if np.linalg.norm(vel) == 0:
                vel = np.array([1.0, 0.0]) # Default velocity if randomly generated is zero
            else:
                vel = vel / np.linalg.norm(vel) # Normalize to unit vector
            pos = self.rng.uniform(low=[0, 0], high=extent, size=2) # Random position within extent
            
            bird = Bird(pos, vel, speed, cohere_factor, separation,
                        separate_factor, match_factor, visual_distance,
                        ensemble, history_length)
            self.agents.append(bird)

def move_agent(agent, model):
    """Moves the agent based on its velocity and speed, applying periodic boundary conditions."""
    agent.pos += agent.vel * agent.speed
    # Apply periodic boundary conditions (toroidal world)
    agent.pos[0] %= model.extent[0]
    agent.pos[1] %= model.extent[1]

def get_neighbors(agent, model, radius):
    """
    Returns a list of agents within the specified radius of the given agent,
    excluding the agent itself.
    """
    neighbors = []
    for other in model.agents:
        if other is agent:
            continue # Skip self
        if np.linalg.norm(agent.pos - other.pos) < radius:
            neighbors.append(other)
    return neighbors

def agent_step(agent, model):
    """
    Updates an individual agent's velocity based on Boids rules (cohesion, separation, alignment)
    and then moves the agent.
    """
    neighbors = get_neighbors(agent, model, agent.visual_distance)
    
    # Initialize rule vectors
    match = np.array([0.0, 0.0])    # Alignment
    separate = np.array([0.0, 0.0]) # Separation
    cohere = np.array([0.0, 0.0])   # Cohesion
    
    bird_speed = np.linalg.norm(agent.vel)
    
    # Apply Boids rules based on neighbors
    for neighbor in neighbors:
        heading = neighbor.pos - agent.pos
        cohere += heading # Sum of vectors towards neighbors for cohesion
        
        if np.linalg.norm(heading) < agent.separation:
            separate -= heading # Move away from close neighbors for separation
        
        match += neighbor.vel # Sum of neighbor velocities for alignment
        
        # The 'angle_between' calculation is present but 'ensemble_count' is unused.
        # Keeping the angle calculation for potential future use or if it was part of a discarded rule.
        neighbor_speed = np.linalg.norm(neighbor.vel)
        if bird_speed * neighbor_speed > 0:
            cos_angle = np.dot(agent.vel, neighbor.vel) / (bird_speed * neighbor_speed + 1e-9)
            cos_angle = np.clip(cos_angle, -1, 1)
            _ = np.arccos(cos_angle) # Calculate angle, but result is not used further in this function
    
    N = max(len(neighbors), 1) # Number of neighbors (at least 1 to avoid division by zero)
    
    # Update ensemble history and calculate current ensemble value
    agent.ensemble_history.append(len(neighbors))
    median_history = np.median(list(agent.ensemble_history)) if len(agent.ensemble_history) > 0 else 0
    agent.ensemble = 1 / (1 + median_history) # Ensemble value (e.g., inverse of local density)
    
    # Weight and normalize rule contributions
    cohere = (cohere / N) * agent.cohere_factor
    separate = (separate / N) * agent.separate_factor
    match = (match / N) * agent.match_factor
    
    # Combine current velocity with rule contributions
    new_vel = (agent.vel + cohere + separate + match) / 2.0 # Simple averaging
    norm_new_vel = np.linalg.norm(new_vel)
    agent.vel = new_vel / norm_new_vel if norm_new_vel > 0 else agent.vel # Normalize new velocity
    
    move_agent(agent, model) # Move the agent

def model_step(model):
    """Advances the simulation model by one iteration."""
    model.iteration += 1

def Step_Num(model):
    """Returns the current iteration number of the model."""
    return model.iteration

# -------------------------------
# Thermodynamic Quantities
# -------------------------------

def Model_Ensemble_Energy(model):
    """Calculates the total ensemble energy of the model (sum of individual bird ensembles)."""
    return sum(bird.ensemble for bird in model.agents)

def Model_Energy_DistributionFN(model, bins=10):
    """
    Calculates the probability distribution function (PDF) of ensemble energies
    across all birds in the model.
    """
    energies = np.array([bird.ensemble for bird in model.agents], dtype=float)
    counts, bin_edges = np.histogram(energies, bins=bins, density=False) # Get raw counts
    total_count = np.sum(counts)
    pdf = counts / total_count if total_count > 0 else np.zeros_like(counts, dtype=float) # Normalize to get PDF
    bin_midpoints = (bin_edges[:-1] + bin_edges[1:]) / 2
    energy_distribution = list(zip(bin_midpoints, pdf))
    return energy_distribution

def Model_Entropy(e_p_array):
    """
    Calculates the Shannon entropy of the given energy probability distribution.
    S = -sum(p * log(p))
    """
    S = 0.0
    for _, p in e_p_array:
        if p > 0: # Avoid log(0)
            S -= p * np.log(p)
    return S

def Model_Temperature(e_p_array):
    """
    Estimates the 'temperature' of the system based on the energy distribution.
    This is a simplified approximation, often related to the inverse of the slope
    of log(p) vs E.
    """
    sum_result = 0.0
    for i in range(1, len(e_p_array)):
        E_prev, p_prev = e_p_array[i - 1]
        E, p = e_p_array[i]
        delta_E = E - E_prev
        
        # Avoid division by zero or very small delta_E
        if abs(delta_E) < 1e-9: # If energy difference is negligible
            continue
        
        dp_dE = (p - p_prev) / delta_E
        
        # Avoid issues with extreme or zero slopes in dp_dE
        if abs(dp_dE) < 1e-9 or abs(dp_dE) > 1e8:
            continue
        
        if p > 0:
            sum_result += dp_dE * np.log(p)
            
    if sum_result == 0:
        return 0.0 # Return 0 if no meaningful sum can be computed
    else:
        T = -1 / sum_result
        return abs(T) # Temperature should be positive

def Model_Heat(S_current, S_prev, T_current):
    """Calculates the 'heat' transfer (dQ) based on change in entropy and temperature."""
    dS = S_current - S_prev
    dQ = abs(T_current) * dS
    return dQ

# Global variable to store the previous entropy for dQ calculation across steps
S_prev_global = None 
def Model_Thermo_State(model):
    """
    Calculates and returns a dictionary of key thermodynamic state variables:
    Entropy (S), Temperature (T), Heat (dQ), and the energy probability array.
    Manages the global S_prev_global for dQ calculation.
    """
    global S_prev_global
    e_p_array = Model_Energy_DistributionFN(model)
    S = Model_Entropy(e_p_array)
    T = Model_Temperature(e_p_array)
    dQ = 0.0
    if S_prev_global is not None:
        dQ = Model_Heat(S, S_prev_global, T)
    S_prev_global = S # Update global for next iteration
    return {"S": S, "T": T, "dQ": dQ, "e_p_array": e_p_array}

def Model_Internal_Energy(model):
    """Calculates the internal energy of the model based on the energy distribution."""
    e_p_array = Model_Energy_DistributionFN(model)
    tmp_U = sum(E * p for E, p in e_p_array)
    return tmp_U

def Model_Volume(model):
    """
    Estimates the 'volume' of the flock based on the average pairwise distance
    between birds. A larger average distance implies a larger volume.
    """
    birds = model.agents
    total_distance = 0.0
    count = 0
    n = len(birds)
    # Calculate sum of distances for all unique pairs
    for i in range(n - 1):
        for j in range(i + 1, n):
            total_distance += np.linalg.norm(birds[i].pos - birds[j].pos)
            count += 1
    
    average_distance = total_distance / count if count > 0 else 0.0
    # Volume is inversely proportional to average distance (a simple proxy)
    return 1 / average_distance if average_distance > 0 else 0.0

def Flock_Model_State(model):
    """Returns a simple state metric for the flock, currently the ensemble energy."""
    return Model_Ensemble_Energy(model)

In [5]:
def run_simulation(model, step_num, agent_data_funcs, model_data_funcs):
    """
    Runs the flocking simulation for a given number of steps.
    Records specified agent-level attributes and model-level function outputs at each step.
    
    Args:
        model: The Model instance to simulate.
        step_num: The number of simulation steps to run.
        agent_data_funcs: List of attribute names (strings) to record for each agent.
        model_data_funcs: List of functions (callable) to record model-level data.
    
    Returns:
        tuple: (agent_records, model_records)
            agent_records: List of lists, where each inner list contains dictionaries
                           of agent attributes for a given step.
            model_records: List of dictionaries, where each dictionary contains
                           model-level function results for a given step.
    """
    agent_records = []
    model_records = []
    for _ in range(step_num):
        # Update each agent's state
        for agent in model.agents:
            agent_step(agent, model)
        model_step(model) # Advance model iteration
        
        # Record agent data for the current step
        agents_snapshot = []
        for agent in model.agents:
            record = {attr: getattr(agent, attr) for attr in agent_data_funcs}
            agents_snapshot.append(record)
        agent_records.append(agents_snapshot)
        
        # Record model-level data for the current step
        model_record = {}
        for func in model_data_funcs:
            model_record[func.__name__] = func(model)
        model_records.append(model_record)
    return agent_records, model_records

In [6]:
# =============================================================================
# --- GMM Training and Analysis Functions ---
# =============================================================================

def extract_features_from_model_df(model_df):
    """
    Extracts key thermodynamic features (Energy, Entropy, Temperature, dQ, Volume)
    from a DataFrame of model-level simulation data.
    Adds a small amount of Gaussian noise to stabilize data for GMM training.
    """
    S_vals = model_df["Model_Thermo_State"].apply(lambda x: x.get("S", 0.0)).to_numpy()
    T_vals = model_df["Model_Thermo_State"].apply(lambda x: x.get("T", 0.0)).to_numpy()
    dQ_vals = model_df["Model_Thermo_State"].apply(lambda x: x.get("dQ", 0.0)).to_numpy()
    E_vals = model_df["Model_Ensemble_Energy"].to_numpy()
    V_vals = model_df["Model_Volume"].to_numpy()
    
    data = np.column_stack((E_vals, S_vals, T_vals, dQ_vals, V_vals))
    # Add small noise to prevent singular covariance matrices if data points are identical
    data += 1e-6 * np.random.randn(*data.shape)
    return data

def train_gmm(data, num_clusters):
    """Trains a Gaussian Mixture Model on the given feature data."""
    print(f"Training Gaussian Mixture Model with {num_clusters} clusters...")
    gmm = GaussianMixture(n_components=num_clusters, covariance_type='full', random_state=42)
    gmm.fit(data)
    return gmm

def enforce_thermodynamic_constraints(gmm, data):
    """
    Applies a simplified 'thermodynamic constraint' example.
    In a real physical system, certain thermodynamic laws (e.g., entropy increase
    in isolated systems) might be used to guide or refine clustering.
    This function demonstrates a conceptual check and a simple adjustment.
    """
    entropy_values = data[:, 1]  # Entropy is the second column (index 1)
    
    # Example constraint check: if entropy is not generally increasing (simplified)
    # A more rigorous check would involve statistical tests or physical models.
    if np.any(np.diff(entropy_values) < 0): 
        print("Warning: Entropy is not strictly non-decreasing in some segments. This might indicate non-equilibrium or open system behavior.")
    
    # Example 'adjustment': if average energy is high, suggest a 'cooler' state
    if np.mean(data[:, 0]) > 0.5: 
        print("Adjusting GMM: High average energy observed, potentially indicating a less cohesive state.")
        # Re-train GMM with slightly scaled data to conceptually 'cool down' the system
        num_components = gmm.n_components
        data_adjusted = data * 0.95 # Artificially scale down features
        gmm_adjusted = GaussianMixture(n_components=num_components, covariance_type='full', random_state=42)
        gmm_adjusted.fit(data_adjusted)
        return gmm_adjusted
    return gmm

# --- Plotting Functions for Thermodynamic Evolution ---

def plot_internal_energy(energy_array, title_prefix=""):
    """Plots the evolution of internal energy over simulation steps."""
    plt.figure(figsize=(10, 6))
    plt.plot(energy_array, label="Internal Energy")
    plt.title(f"{title_prefix}Internal Energy Evolution")
    plt.xlabel("Simulation Step")
    plt.ylabel("Internal Energy")
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.legend()
    plt.show()

def plot_cumsum_dQ(dQ_array, title_prefix=""):
    """Plots the cumulative sum of heat transfer (dQ) over simulation steps."""
    plt.figure(figsize=(10, 6))
    plt.plot(np.cumsum(dQ_array), label="Cumulative dQ")
    plt.title(f"{title_prefix}Cumulative Sum of Heat (dQ)")
    plt.xlabel("Simulation Step")
    plt.ylabel("Cumulative dQ")
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.legend()
    plt.show()

def plot_volume(volume_array, title_prefix=""):
    """Plots the evolution of the flock's 'volume' over simulation steps."""
    plt.figure(figsize=(10, 6))
    plt.plot(volume_array, label="Volume")
    plt.title(f"{title_prefix}Volume Evolution")
    plt.xlabel("Simulation Step")
    plt.ylabel("Volume")
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.legend()
    plt.show()

# --- Functions to Merge and Normalize Energy Distribution Arrays ---

def merge_e_p_array(ep_tmp, e_p_sums):
    """
    Merges energy probability distributions from a new simulation run
    into an accumulated sum. This is useful for averaging distributions.
    
    Args:
        ep_tmp (list): List of (E, p) tuples for each step of the current run.
        e_p_sums (list): Accumulated list of (E, p) tuples from previous runs.
    
    Returns:
        list: The updated accumulated list of (E, p) tuples.
    """
    new_e_p_array = []
    for i in range(len(ep_tmp)):
        # Convert to dictionary for efficient merging by energy value
        merged_dict = dict(e_p_sums[i]) 
        for E, p in ep_tmp[i]:
            merged_dict[E] = merged_dict.get(E, 0.0) + p
        
        # Convert back to list of tuples and sort by energy for consistent order
        new_e_p_array.append(sorted(list(merged_dict.items())))
    return new_e_p_array

def normalize_e_p_array(e_p_array):
    """
    Normalizes a list of energy probability arrays so that the probabilities
    for each step sum to 1.
    """
    normalized = []
    for arr in e_p_array:
        total_probability = sum(p for (_, p) in arr)
        if total_probability != 0:
            normalized.append([(E, p / total_probability) for (E, p) in arr])
        else:
            normalized.append(arr) # If total_probability is 0, append as is
    return normalized

# --- Bird Marker and Plot Function for Enhanced Visualization ---

# Define a base polygon shape for a bird marker
bird_polygon = np.array([[-1, -1], [2, 0], [-1, 1]]) # Simple triangle

def bird_marker(bird):
    """
    Rotates the base bird polygon based on the bird's velocity direction
    and translates it to the bird's current position.
    """
    phi = np.arctan2(bird.vel[1], bird.vel[0]) # Angle of velocity vector
    R = np.array([[np.cos(phi), -np.sin(phi)],
                  [np.sin(phi),  np.cos(phi)]]) # Rotation matrix
    rotated = (R @ bird_polygon.T).T # Apply rotation
    rotated += bird.pos  # Apply translation to bird's position
    return rotated

def plot_birds_iter(model, heat_tmp, iteration_tmp):
    """
    Plots all bird agents as rotated polygons, indicating their position and direction.
    Includes current heat and iteration in the title.
    """
    title_str = f"Avg. Heat={heat_tmp:.2f} at Iteration={iteration_tmp}"
    fig, ax = plt.subplots(figsize=(8, 8))
    for bird in model.agents:
        poly_points = bird_marker(bird)
        polygon = Polygon(poly_points, closed=True, edgecolor='k', facecolor='gray', alpha=0.8)
        ax.add_patch(polygon)
    ax.set_xlim(0, model.extent[0])
    ax.set_ylim(0, model.extent[1])
    ax.set_title(title_str, fontsize=16)
    ax.set_aspect('equal') # Ensure aspect ratio is equal for correct visualization
    plt.grid(True, linestyle=':', alpha=0.5)
    plt.show()
    return fig