In [1]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import matplotlib
import pandas as pd

In [2]:
# --- Simulation Parameters ---
# These are now set in the run_... functions
# NUM_MOTILE = 50
# NUM_SUBMOTILE = 50
BOX_SIZE = 500.0
DT = 0.1 # Timestep

# --- Global Interaction Parameters (Same for all) ---
LJ_STRENGTH = 10000.0     # B: Short-range Lennard-Jones repulsion
ALIGNMENT_RADIUS = 5.0   # R_align: Radius for neighbors to cause alignment
ALIGNMENT_STRENGTH = 0.05 # K: How fast they align (0 to 1)
PHASE_LOCK_STRENGTH = 0.01
LJ_RADIUS = ALIGNMENT_RADIUS/10
MOBILITY = 1.0           # mu: How much neighbor forces push them
MAX_V_FORCE = 5.0         # Stability cap for velocity
COS_45 = np.cos(np.pi / 4.0) # 0.707...

In [2]:
df = pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 4]})
df.to_parquet("much_smaller_file.parquet", compression='zstd')

In [3]:
class Particle:
    """Represents a single active particle (a short rod)."""
    def __init__(self, pid, ptype, phase_lock=False):
        self.pid = pid
        # Physical state
        self.pos = np.random.rand(2) * BOX_SIZE
        self.angle = np.random.rand() * 2 * np.pi
        self.dir = np.array([np.cos(self.angle), np.sin(self.angle)])
        
        self.ptype = ptype # 'motile' or 'submotile'

        # --- Set properties based on type ---
        if self.ptype == 'motile':
            self.period = 10.0
            self.pusher_duration = 5.0  # 50% of 10.0
            self.pusher_strength = 250.0
            self.puller_strength = 25.0
            # --- FIX: Reduce speed to allow interactions to "win" ---
            self.propulsion_speed = 1.0 # Was 10.0
            self.color = 'C0' # Matplotlib default blue
        else: # 'submotile'
            self.period = 17.0
            self.pusher_duration = 8.5 # 50% of 17.0
            self.pusher_strength = 25.0 # 10% of motile
            self.puller_strength = 2.5  # 10% of motile
            # --- FIX: Reduce speed 10x to match ---
            self.propulsion_speed = 0.5   # Was 1.0
            self.color = 'C1' # Matplotlib default orange
        
        # Add a random phase offset so they don't all start in sync
        self.phase_offset = np.random.rand()

        # Forces and Torques
        self.force = np.zeros(2)
        self.neighbor_dirs = []
        if phase_lock:
            self.neighbor_phase = []

    def update_dir(self):
        """Recalculate direction vector from angle."""
        self.dir[0] = np.cos(self.angle)
        self.dir[1] = np.sin(self.angle)
        
    def reset_forces(self, phase_lock):
        """Reset forces and alignment data for the new step."""
        self.force.fill(0.0)
        self.neighbor_dirs = []
        if phase_lock:
            self.neighbor_phase = []

class Simulation:
    """Manages the simulation state and physics updates."""
    def __init__(self, particle_list):
        self.particles = particle_list
        self.time = 0.0
        self.data_log = [] # List to store data for DataFrame

    def capture_state(self):
        """Records the current state of all particles to the log."""
        for p in self.particles:
            # Determine current state (Pusher vs Puller) for recording
            curr_time = (self.time + p.phase_offset * p.period) % p.period
            state = "pusher" if curr_time < p.pusher_duration else "puller"

            self.data_log.append({
                "Time": round(self.time, 2),
                "Particle_ID": p.pid,
                "Type": p.ptype,
                "Pos_X": p.pos[0],
                "Pos_Y": p.pos[1],
                "Angle": p.angle,
                "Dir_X": p.dir[0],
                "Dir_Y": p.dir[1],
                "Phase_Offset": p.phase_offset,
                "Current_State": state,
                "Force_X": p.force[0],
                "Force_Y": p.force[1]
            })
            
    def calculate_interactions(self, phase_lock=False):
        """Calculates all pairwise interactions."""
        for p in self.particles:
            p.reset_forces(phase_lock)

        for i in range(len(self.particles)):
            for j in range(i + 1, len(self.particles)):
                p1 = self.particles[i]
                p2 = self.particles[j]

                r_vec = p2.pos - p1.pos
                
                # Apply periodic boundary conditions to find closest vector
                r_vec[0] -= BOX_SIZE * np.round(r_vec[0] / BOX_SIZE)
                r_vec[1] -= BOX_SIZE * np.round(r_vec[1] / BOX_SIZE)
                
                r_mag_sq = r_vec[0]**2 + r_vec[1]**2

                # --- 1. Short-range Lennard-Jones Repulsion ---
                if 1e-4 < r_mag_sq < ALIGNMENT_RADIUS**2: # Only interact if within range
                    r_mag = np.sqrt(r_mag_sq)
                    r_hat = r_vec / r_mag

                    if 1e-4 < r_mag_sq < LJ_RADIUS**2:
                        # (F_lj = B / r^13)
                        lj_mag = LJ_STRENGTH / (r_mag_sq**6.5)
                        force_lj = lj_mag * r_hat
                    else:
                        force_lj = 0

                    # --- 2. Anisotropic "Pusher/Puller" Force ---
                    # Each particle's force is based on its OWN state
                    
                    # Get state for p1
                    p1_time = (self.time + p1.phase_offset*p1.period) % p1.period
                    p1_is_pusher = p1_time < p1.pusher_duration
                    
                    # Get state for p2
                    p2_time = (self.time + p2.phase_offset*p2.period) % p2.period
                    p2_is_pusher = p2_time < p2.pusher_duration

                    # Get dot products
                    dot_1 = np.dot(p1.dir, r_hat)  # p1's orientation relative to r_vec
                    dot_2 = np.dot(p2.dir, -r_hat) # p2's orientation relative to -r_vec

                    # Check if in cone for p1 and p2
                    in_cone_1 = np.abs(dot_1) > COS_45
                    in_cone_2 = np.abs(dot_2) > COS_45

                    # Base pusher logic: Repel in cone (1), Attract at side (-1)
                    base_pusher_force_sign_1 = 1.0 if in_cone_1 else -1.0
                    base_pusher_force_sign_2 = 1.0 if in_cone_2 else -1.0

                    # Calculate final force mag for p1
                    aniso_mag_1 = (base_pusher_force_sign_1 * p1.pusher_strength) if p1_is_pusher else \
                                  (-base_pusher_force_sign_1 * p1.puller_strength)
                    
                    # Calculate final force mag for p2
                    aniso_mag_2 = (base_pusher_force_sign_2 * p2.pusher_strength) if p2_is_pusher else \
                                  (-base_pusher_force_sign_2 * p2.puller_strength)
                                  
                    # Force = (A_aniso / r^2) * r_hat
                    # Force p1 exerts on p2
                    force_1_on_2 = (aniso_mag_1 / r_mag_sq) * r_hat
                    # Force p2 exerts on p1
                    force_2_on_1 = (aniso_mag_2 / r_mag_sq) * (-r_hat)


                    # --- 3. Apply Forces (Active Matter: Non-symmetric!) ---
                    # p1 feels LJ and the force from p2
                    p1.force += -force_lj + force_2_on_1
                    # p2 feels LJ and the force from p1
                    p2.force += force_lj + force_1_on_2

                    # --- 4. Gather Alignment Info ---
                    p1.neighbor_dirs.append(p2.dir)
                    p2.neighbor_dirs.append(p1.dir)

                    if phase_lock:
                        p1.neighbor_phase.append(p2.phase_offset)
                        p2.neighbor_phase.append(p1.phase_offset)

    def update_particles(self, phase_lock=False):
        """Update orientation and position for all particles."""
        
        for p in self.particles:
            
            # --- 1. Update Orientation (Torque) ---
            if p.neighbor_dirs:
                # Calculate average direction of neighbors
                avg_neighbor_dir = np.sum(p.neighbor_dirs, axis=0)
                avg_angle = np.arctan2(avg_neighbor_dir[1], avg_neighbor_dir[0])
                
                # Find the shortest angle to align
                angle_diff = avg_angle - p.angle
                angle_diff = (angle_diff + np.pi) % (2 * np.pi) - np.pi
                
                # Add a small random noise
                noise = np.random.normal(scale=np.pi/3)
                
                p.angle += (angle_diff * ALIGNMENT_STRENGTH + noise) * DT
                p.update_dir()

            # --- 2. Update Position (Translation) ---
            # Overdamped model: dx/dt = v_propulsion + mu * F_interaction
            v_propel = p.dir * p.propulsion_speed # Use particle's own speed
            v_force = p.force * MOBILITY
            
            # STABILITY FIX: Cap the maximum velocity from force
            v_force_mag = np.linalg.norm(v_force)
            if v_force_mag > MAX_V_FORCE:
                v_force = (v_force / v_force_mag) * MAX_V_FORCE

            p.pos += (v_propel + v_force) * DT

            # Apply periodic boundary conditions
            p.pos = p.pos % BOX_SIZE

            # --- 3. Phase locking ---
            if phase_lock:
                avg_phase = np.sum(p.neighbor_phase)
                
                # Find the shortest phase to align
                phase_diff = (avg_phase - p.phase_offset) % 1
                
                # Add a small random noise
                noise_phase = np.random.normal(scale=1/6)
                
                p.phase_offset += (phase_diff * PHASE_LOCK_STRENGTH + noise_phase) * DT
        
        self.time += DT
                
    def step(self, phase_lock=False):
        """Run one full simulation step."""
        # This function is now simpler, just one step
        self.calculate_interactions(phase_lock)
        self.capture_state()
        self.update_particles(phase_lock)

def setup_animation(sim, title):
    """Initialize the plot for animation."""
    fig, ax = plt.subplots(figsize=(8, 8))
    ax.set_xlim(0, BOX_SIZE)
    ax.set_ylim(0, BOX_SIZE)
    ax.set_aspect('equal')
    ax.set_title(title)
    
    # Get initial positions, directions, and colors
    pos = np.array([p.pos for p in sim.particles])
    # --- VISUAL FIX: Manually scale dirs to make arrows bigger ---
    dirs = np.array([p.dir for p in sim.particles]) * 7.0 # Scale for visibility
    colors = [p.color for p in sim.particles]

    # Create the Quiver plot
    # We use color to distinguish the two populations
    quiver = ax.quiver(pos[:, 0], pos[:, 1], dirs[:, 0], dirs[:, 1], 
                     color=colors, 
                     headwidth=3, headlength=4, 
                     scale=80) # Adjust scale to work with new dir scaling
                     
    time_text = ax.text(0.02, 0.95, '', transform=ax.transAxes, fontsize=12)
    
    return fig, ax, quiver, time_text

def update_animation(frame, sim, quiver, time_text, phase_lock=False):
    """Update function for each animation frame."""
    # Run 5 simulation steps per frame to speed it up
    for _ in range(5):
        sim.step(phase_lock)
    
    pos = np.array([p.pos for p in sim.particles])
    # --- VISUAL FIX: Must apply same scaling in update ---
    dirs = np.array([p.dir for p in sim.particles]) * 7.0 # Scale for visibility
    colors = [p.color for p in sim.particles]

    quiver.set_offsets(pos)
    quiver.set_UVC(dirs[:, 0], dirs[:, 1])
    quiver.set_color(colors)
    
    time_text.set_text(f'Time: {sim.time:.1f}')
    return quiver, time_text

# --- Main execution functions ---

def run_homogeneous_simulation(num_par, phase_lock=False):
    """Runs and saves the homogeneous simulation."""
    print(f"Setting up 'Homogeneous Pusher' simulation ({num_par} 'motile').")
    particle_list = [Particle(i, 'motile', phase_lock) for i in range(num_par)]
    sim = Simulation(particle_list)
    fig, ax, quiver, time_text = setup_animation(sim, f"Homogeneous Simulation ({num_par} Motile, phase_lock={phase_lock})")

    anim = FuncAnimation(fig, update_animation, 
                         fargs=(sim, quiver, time_text, phase_lock), 
                         frames=2000, interval=20, blit=True)
    plt.close(fig)

    file_name = f'homogeneous_simulation_{num_par}motile_phaselock={phase_lock}' 
    print(f"Saving animation to {file_name}.mp4.")
    try:
        anim.save(f'{file_name}.mp4', writer='ffmpeg', fps=280, dpi=150)
        print(f"Done saving {file_name}.mp4.")
    except Exception as e:
        print(f"Error saving video (make sure ffmpeg is installed): {e}")

    # --- CSV EXPORT ---
    print("Exporting data to CSV.")
    df = pd.DataFrame(sim.data_log)
    df.to_parquet(f"{file_name}.parquet", compression='zstd')
    print(f"Data saved to {file_name}.parquet.\n")

def run_heterogeneous_simulation(num_par, submotile_fraction, phase_lock=False):
    """Runs and saves the heterogeneous simulation."""
    num_submotile = round(submotile_fraction*num_par)
    num_motile = num_par - num_submotile
    print(f"Setting up 'Heterogeneous Pusher' simulation ({(num_motile)} Motile, {num_submotile} Submotile, phase_lock={phase_lock})")
    particle_list = [Particle(i, 'motile' if i<num_motile else 'submotile', phase_lock) for i in range(num_par)]
        
    sim = Simulation(particle_list)
    fig, ax, quiver, time_text = setup_animation(sim, f"Heterogeneous Simulation ({num_motile} Motile, {num_submotile} Submotile, phase_lock={phase_lock})")

    anim = FuncAnimation(fig, update_animation, 
                         fargs=(sim, quiver, time_text, phase_lock), 
                         frames=2000, interval=20, blit=True)
    plt.close(fig)

    file_name = f'heterogeneous_simulation_{num_motile}motile_{num_submotile}submotile_phaselock={phase_lock}'
    print(f"Saving animation to {file_name}.mp4.")
    try:
        anim.save(f'{file_name}.mp4', writer='ffmpeg', fps=280, dpi=150)
        print(f"Done saving {file_name}.mp4.")
    except Exception as e:
        print(f"Error saving video (make sure ffmpeg is installed): {e}")

    # --- CSV EXPORT ---
    print("Exporting data to CSV...")
    df = pd.DataFrame(sim.data_log)
    df.to_parquet(f"{file_name}.parquet", compression='zstd')
    print(f"Data saved to {file_name}.parquet.\n")

In [4]:
matplotlib.use('Agg') 

run_homogeneous_simulation(1000, phase_lock=True)
run_heterogeneous_simulation(1000, 0.7, phase_lock=True)
print("Both simulations complete.")

Setting up 'Homogeneous Pusher' simulation (200 'motile').
Saving animation to homogeneous_simulation_200motile_phaselock=True.mp4.
Done saving homogeneous_simulation_200motile_phaselock=True.mp4.
Exporting data to CSV.
Data saved to homogeneous_simulation_200motile_phaselock=True.csv.

Setting up 'Heterogeneous Pusher' simulation (60 Motile, 140 Submotile, phase_lock=True)
Saving animation to heterogeneous_simulation_60motile_140submotile_phaselock=True.mp4.


KeyboardInterrupt: 