Retrocausality project

In [None]:
#%pip install mesa
#pip install ipykernel
#pip install ipywidgets --upgrade

In [2]:
from mesa import Agent, Model
from mesa.space import MultiGrid
import random

### Initial setup example:

In [2]:
class TimeAgent(Agent):
    def __init__(self, model):
        super().__init__(model) # Mesa 3: only model is passed
        self.model = model  # Explicitly store model reference if needed
        self.positions = [] # Will be populated with initial position in TimeModel

    def step(self):
        attempts = 0
        max_attempts = 10  # Prevent infinite loops
        while attempts < max_attempts:
            x, y = self.pos
            move = random.choice([(0, 1), (0, -1), (1, 0), (-1, 0)]) # Move randomly: up, down, left, right
            new_pos = (x + move[0], y + move[1])
            if (0 <= new_pos[0] < self.model.grid.width) and (0 <= new_pos[1] < self.model.grid.height):
                if new_pos not in self.model.occupied_positions or new_pos == self.pos:
                    self.model.grid.move_agent(self, new_pos)
                    self.model.occupied_positions.discard(self.pos) # Remove old position
                    self.model.occupied_positions.add(new_pos) # Add new position
                    self.positions.append(new_pos) # Record new position
                    break
            attempts += 1
        # If no valid move is found after max_attempts, agent stays put (position unchanged)
        else:
            self.positions.append(self.pos)  # Explicitly stay put

          
class TimeModel(Model):
    def __init__(self):
        super().__init__()
        self.grid = MultiGrid(10, 10, False)  # 10x10 grid, torus disabled
        self.schedule = []  # Manual agent list
        self.random = random.Random()
        self.step_count = 0  # Track step number
        self.occupied_positions = set()  # Track occupied positions
        
        # Create 5 agents with unique random starting positions
        available_positions = [(x, y) for x in range(10) for y in range(10)]  # All 10x10 positions
        self.random.shuffle(available_positions)  # Randomize order
        for i in range(5):
            agent = TimeAgent(self)
            start_pos = available_positions[i]  # Take a unique position
            self.grid.place_agent(agent, start_pos)
            agent.pos = start_pos  # Explicitly set pos (Mesa 3 compatibility)
            
            agent.positions.append(start_pos)  # Record initial position
            self.occupied_positions.add(start_pos)
            self.schedule.append(agent)    

    def step(self):
        # Reset occupied positions for this step (will be rebuilt)
        self.occupied_positions.clear()
        for agent in self.schedule:
            self.occupied_positions.add(agent.pos)
        
        random.shuffle(self.schedule)  # Random activation
        for agent in self.schedule:
            agent.step()
        self.step_count += 1  # Increment before printing    
        # self.print_positions()
        

    # Return the list of position histories for all agents
    def get_positions(self):
        sorted_agents = sorted(self.schedule, key=lambda a: a.unique_id)
        return [agent.positions for agent in sorted_agents]
    
    # Print history positions of all agents
    def print_positions(self):
        sorted_agents = sorted(self.schedule, key=lambda a: a.unique_id)
        for agent in sorted_agents:
            print(f"Agent {agent.unique_id}: {agent.positions}")


model = TimeModel()

for _ in range(5):
    model.step()
model.print_positions()

Agent 1: [(6, 0), (5, 0), (5, 1), (5, 2), (5, 3), (4, 3)]
Agent 2: [(7, 4), (7, 5), (6, 5), (7, 5), (7, 4), (6, 4)]
Agent 3: [(6, 8), (6, 9), (7, 9), (6, 9), (6, 8), (6, 7)]
Agent 4: [(7, 6), (8, 6), (7, 6), (6, 6), (7, 6), (7, 7)]
Agent 5: [(5, 1), (4, 1), (4, 0), (5, 0), (5, 1), (6, 1)]


In [2]:
import torch
print(torch.__version__)

2.6.0+cpu


### 30 agents in 10x10 grid

### Situation 1: Turn Left if Occupied

In [9]:
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
import pickle
import random
from mesa import Agent, Model
from mesa.space import MultiGrid

# TimeAgent with New Rule
class TimeAgent(Agent):
    def __init__(self, model, use_rule=True):
        super().__init__(model)
        self.model = model
        self.positions = []
        self.use_rule = use_rule

    def step(self):
        global trained_model
        x, y = self.pos
        moves = [(0, 1), (-1, 0), (0, -1), (1, 0)]  # Up, Left, Down, Right
        
        avoid_move = None
        if len(self.positions) >= 5 and trained_model is not None and self.use_rule:
            seq = self.positions[-5:]
            others = [a.pos for a in self.model.schedule if a != self]
            # Prepare input: 5 steps, each with (x, y) + 10 features
            input_data = []
            for pos in seq:
                step_features = list(pos)  # [x, y]
                # Add 5 other agents' relative positions (10 values)
                for i in range(min(5, len(others))):
                    ox, oy = others[i]
                    step_features.extend([ox - pos[0], oy - pos[1]])
                while len(step_features) < 12:  # Pad to 12
                    step_features.extend([0, 0])
                input_data.append(step_features[:12])
            input_data = torch.tensor([input_data], dtype=torch.float32) / 9.0
            with torch.no_grad():
                pred_dir = trained_model(input_data).argmax().item()
            avoid_move = list(reverse_map.keys())[pred_dir]

        attempts = 0
        max_attempts = 10
        move_idx = random.randint(0, 3)

        if self.use_rule:
            while attempts < max_attempts:
                move = moves[move_idx]
                if move == avoid_move and attempts < max_attempts - 1:
                    move_idx = (move_idx + 1) % 4
                    attempts += 1
                    continue
                new_pos = (x + move[0], y + move[1])
                if (0 <= new_pos[0] < self.model.grid.width) and (0 <= new_pos[1] < self.model.grid.height):
                    if new_pos not in self.model.occupied_positions or new_pos == self.pos:
                        self.model.grid.move_agent(self, new_pos)
                        self.model.occupied_positions.discard(self.pos)
                        self.model.occupied_positions.add(new_pos)
                        self.positions.append(new_pos)
                        break
                left_idx = (move_idx + 1) % 4
                left_pos = (x + moves[left_idx][0], y + moves[left_idx][1])
                if (0 <= left_pos[0] < self.model.grid.width) and (0 <= left_pos[1] < self.model.grid.height) and left_pos not in self.model.occupied_positions:
                    self.model.grid.move_agent(self, left_pos)
                    self.model.occupied_positions.discard(self.pos)
                    self.model.occupied_positions.add(left_pos)
                    self.positions.append(left_pos)
                    break
                opp_idx = (move_idx + 2) % 4
                opp_pos = (x + moves[opp_idx][0], y + moves[opp_idx][1])
                if (0 <= opp_pos[0] < self.model.grid.width) and (0 <= opp_pos[1] < self.model.grid.height) and opp_pos not in self.model.occupied_positions:
                    self.model.grid.move_agent(self, opp_pos)
                    self.model.occupied_positions.discard(self.pos)
                    self.model.occupied_positions.add(opp_pos)
                    self.positions.append(opp_pos)
                    break
                self.positions.append(self.pos)
                break
        else:
            while attempts < max_attempts:
                move = random.choice(moves)
                new_pos = (x + move[0], y + move[1])
                if (0 <= new_pos[0] < self.model.grid.width) and (0 <= new_pos[1] < self.model.grid.height):
                    if new_pos not in self.model.occupied_positions or new_pos == self.pos:
                        self.model.grid.move_agent(self, new_pos)
                        self.model.occupied_positions.discard(self.pos)
                        self.model.occupied_positions.add(new_pos)
                        self.positions.append(new_pos)
                        break
                attempts += 1
            else:
                self.positions.append(self.pos)

# TimeModel
class TimeModel(Model):
    def __init__(self, use_rule=True):
        super().__init__()
        self.grid = MultiGrid(10, 10, False)
        self.schedule = []
        self.random = random.Random()
        self.step_count = 0
        self.occupied_positions = set()
        self.use_rule = use_rule
        
        available_positions = [(x, y) for x in range(10) for y in range(10)]
        self.random.shuffle(available_positions)
        for i in range(30):
            agent = TimeAgent(self, use_rule=self.use_rule)
            start_pos = available_positions[i]
            self.grid.place_agent(agent, start_pos)
            agent.pos = start_pos
            agent.positions.append(start_pos)
            self.occupied_positions.add(start_pos)
            self.schedule.append(agent)

    def step(self):
        self.occupied_positions.clear()
        for agent in self.schedule:
            self.occupied_positions.add(agent.pos)
        random.shuffle(self.schedule)
        for agent in self.schedule:
            agent.step()
        self.step_count += 1

    def get_positions(self):
        sorted_agents = sorted(self.schedule, key=lambda a: a.unique_id)
        return [(agent.positions, [a.pos for a in sorted_agents if a != agent]) for agent in sorted_agents]

# TCN
class TCN(nn.Module):
    def __init__(self, input_size=12, output_size=5, num_channels=[64, 64, 64], kernel_size=5, dropout=0.2):
        super(TCN, self).__init__()
        layers = []
        for i in range(len(num_channels)):
            dilation = 2 ** i
            in_channels = input_size if i == 0 else num_channels[i-1]
            out_channels = num_channels[i]
            padding = (kernel_size - 1) * dilation
            layers.append(nn.Conv1d(in_channels, out_channels, kernel_size, padding=padding, dilation=dilation))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout))
            if padding > 0:
                layers.append(nn.ConstantPad1d((-padding, 0), 0))
        self.tcn = nn.Sequential(*layers)
        self.fc = nn.Linear(num_channels[-1], output_size)

    def forward(self, x):
        x = x.transpose(1, 2)  # [batch_size, input_size, seq_len]
        out = self.tcn(x)
        out = out[:, :, -1]  # Last time step
        return out

# Data collection
def collect_and_save_data(num_runs=2000, filename="abm_data_with_rule.pkl", use_rule=True):
    all_data = []
    for run in range(num_runs):
        model = TimeModel(use_rule=use_rule)
        positions_history = []
        for _ in range(10):
            model.step()
            positions_history.append(model.get_positions())
        all_data.append(positions_history)
        if (run + 1) % 100 == 0:
            print(f"Completed {run + 1}/{num_runs} runs")
    with open(filename, 'wb') as f:
        pickle.dump(all_data, f)
    print(f"Saved {len(all_data)} runs to {filename}")
    return all_data

# Prepare data (Fixed)
def prepare_training_data(data, seq_len=5):
    X, y = [], []
    direction_map = {(0, 1): 0, (0, -1): 1, (1, 0): 2, (-1, 0): 3, (0, 0): 4}
    for run_data in data:
        for step_idx in range(len(run_data) - seq_len):
            step_data = run_data[step_idx:step_idx + seq_len]
            for positions, others in step_data[-1]:
                if len(positions) < seq_len + 1:
                    continue
                seq = positions[-seq_len - 1:-1]  # 5 steps
                seq_data = []
                for pos in seq:
                    step_features = list(pos)  # [x, y]
                    # Add 5 other agents' relative positions (10 values)
                    for i in range(min(5, len(others))):
                        ox, oy = others[i]
                        step_features.extend([ox - pos[0], oy - pos[1]])
                    while len(step_features) < 12:  # Pad to 12
                        step_features.extend([0, 0])
                    seq_data.append(step_features[:12])
                X.append(seq_data)
                # Target
                x1, y1 = positions[-2]
                x2, y2 = positions[-1]
                direction = (x2 - x1, y2 - y1)
                y.append(direction_map[direction])
    X = torch.tensor(X, dtype=torch.float32) / 9.0
    y = torch.tensor(y, dtype=torch.long)
    return X, y

# Training
def train_tcn(X, y, epochs=300, batch_size=32, learning_rate=0.001, patience=20):
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
    train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
    val_dataset = torch.utils.data.TensorDataset(X_val, y_val)
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size)

    model = TCN(input_size=12, output_size=5, num_channels=[64, 64, 64], kernel_size=5, dropout=0.2)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.5)

    best_val_loss = float('inf')
    epochs_no_improve = 0
    best_model_state = None

    for epoch in range(epochs):
        model.train()
        train_loss = 0
        train_correct = 0
        for batch_X, batch_y in train_loader:
            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            train_correct += (outputs.argmax(dim=1) == batch_y).sum().item()

        model.eval()
        val_loss = 0
        val_correct = 0
        with torch.no_grad():
            for batch_X, batch_y in val_loader:
                outputs = model(batch_X)
                val_loss += criterion(outputs, batch_y).item()
                val_correct += (outputs.argmax(dim=1) == batch_y).sum().item()

        scheduler.step()
        
        train_loss_avg = train_loss / len(train_loader)
        val_loss_avg = val_loss / len(val_loader)
        train_acc = train_correct / len(X_train)
        val_acc = val_correct / len(X_val)
        
        if (epoch + 1) % 10 == 0:
            print(f"Epoch [{epoch+1}/{epochs}], Train Loss: {train_loss_avg:.4f}, Val Loss: {val_loss_avg:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}, LR: {scheduler.get_last_lr()[0]:.6f}")
        
        if val_loss_avg < best_val_loss:
            best_val_loss = val_loss_avg
            epochs_no_improve = 0
            best_model_state = model.state_dict()
        else:
            epochs_no_improve += 1
            if epochs_no_improve >= patience:
                print(f"Early stopping triggered after {epoch+1} epochs")
                break
    
    if best_model_state is not None:
        model.load_state_dict(best_model_state)
    torch.save(model.state_dict(), "tcn_model.pth")
    print("Model saved to tcn_model.pth")
    return model

# Global variables
trained_model = None
direction_map = {0: "Up (0, 1)", 1: "Down (0, -1)", 2: "Right (1, 0)", 3: "Left (-1, 0)", 4: "No move (0, 0)"}
reverse_map = {(0, 1): 0, (0, -1): 1, (1, 0): 2, (-1, 0): 3, (0, 0): 4}

# Train with rule
sequences_with_rule = collect_and_save_data(num_runs=2000, filename="abm_data_with_rule.pkl", use_rule=True)
X, y = prepare_training_data(sequences_with_rule)
print(f"Loaded {len(sequences_with_rule)} runs with rule")
print(f"Training data shape: X={X.shape}, y={y.shape}")

print("\nTraining TCN (Direction Prediction with Rule)...")
trained_model = train_tcn(X, y, epochs=300, batch_size=32, learning_rate=0.001, patience=20)

# Compare with vs. without rule
def compare_rule_effects(runs=100, steps=10):
    no_rule_model = TimeModel(use_rule=False)
    no_rule_moves = {0: 0, 1: 0, 2: 0, 3: 0, 4: 0}
    for _ in range(runs):
        for _ in range(steps):
            no_rule_model.step()
        for agent in no_rule_model.schedule:
            for i in range(len(agent.positions) - 1):
                dx, dy = agent.positions[i + 1][0] - agent.positions[i][0], agent.positions[i + 1][1] - agent.positions[i][1]
                move_idx = reverse_map.get((dx, dy), 4)
                no_rule_moves[move_idx] += 1

    with_rule_model = TimeModel(use_rule=True)
    with_rule_moves = {0: 0, 1: 0, 2: 0, 3: 0, 4: 0}
    for _ in range(runs):
        for _ in range(steps):
            with_rule_model.step()
        for agent in with_rule_model.schedule:
            for i in range(len(agent.positions) - 1):
                dx, dy = agent.positions[i + 1][0] - agent.positions[i][0], agent.positions[i + 1][1] - agent.positions[i][1]
                move_idx = reverse_map.get((dx, dy), 4)
                with_rule_moves[move_idx] += 1

    total_no_rule = sum(no_rule_moves.values())
    total_with_rule = sum(with_rule_moves.values())
    print("\nNo Rule Move Frequencies:", {direction_map[k]: v/total_no_rule for k, v in no_rule_moves.items()})
    print("With Rule Move Frequencies:", {direction_map[k]: v/total_with_rule for k, v in with_rule_moves.items()})

# Run comparison
compare_rule_effects(runs=100, steps=10)

Completed 100/2000 runs
Completed 200/2000 runs
Completed 300/2000 runs
Completed 400/2000 runs
Completed 500/2000 runs
Completed 600/2000 runs
Completed 700/2000 runs
Completed 800/2000 runs
Completed 900/2000 runs
Completed 1000/2000 runs
Completed 1100/2000 runs
Completed 1200/2000 runs
Completed 1300/2000 runs
Completed 1400/2000 runs
Completed 1500/2000 runs
Completed 1600/2000 runs
Completed 1700/2000 runs
Completed 1800/2000 runs
Completed 1900/2000 runs
Completed 2000/2000 runs
Saved 2000 runs to abm_data_with_rule.pkl
Loaded 2000 runs with rule
Training data shape: X=torch.Size([300000, 5, 12]), y=torch.Size([300000])

Training TCN (Direction Prediction with Rule)...
Epoch [10/300], Train Loss: 3.7390, Val Loss: 3.6636, Train Acc: 0.2126, Val Acc: 0.2430, LR: 0.001000
Epoch [20/300], Train Loss: 3.7366, Val Loss: 3.6650, Train Acc: 0.2127, Val Acc: 0.2430, LR: 0.001000
Epoch [30/300], Train Loss: 3.7356, Val Loss: 3.6635, Train Acc: 0.2131, Val Acc: 0.2432, LR: 0.001000
Epoch 

### Situation 2: Cycling through directions (next index modulo 4)

In [None]:
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
import pickle
import random
from mesa import Agent, Model
from mesa.space import MultiGrid

# TimeAgent with Simpler Rule
class TimeAgent(Agent):
    def __init__(self, model, use_rule=True):
        super().__init__(model)
        self.model = model
        self.positions = []
        self.use_rule = use_rule

    def step(self):
        global trained_model
        x, y = self.pos
        moves = [(0, 1), (-1, 0), (0, -1), (1, 0)]
        
        avoid_move = None
        if len(self.positions) >= 5 and trained_model is not None and self.use_rule:
            seq = self.positions[-5:]
            others = [a.pos for a in self.model.schedule if a != self]
            input_data = []
            for pos in seq:
                step_features = list(pos)
                for ox, oy in others:  # All 29
                    step_features.extend([ox - pos[0], oy - pos[1]])
                while len(step_features) < 60:
                    step_features.extend([0, 0])
                input_data.append(step_features[:60])
            input_data = torch.tensor([input_data], dtype=torch.float32) / 9.0
            with torch.no_grad():
                pred_dir = trained_model(input_data).argmax().item()
            avoid_move = list(reverse_map.keys())[pred_dir]

        attempts = 0
        max_attempts = 10
        move_idx = random.randint(0, 3)

        if self.use_rule:
            while attempts < max_attempts:
                move = moves[move_idx]
                if move == avoid_move and attempts < max_attempts - 1:
                    move_idx = (move_idx + 1) % 4
                    attempts += 1
                    continue
                new_pos = (x + move[0], y + move[1])
                if (0 <= new_pos[0] < self.model.grid.width) and (0 <= new_pos[1] < self.model.grid.height):
                    if new_pos not in self.model.occupied_positions or new_pos == self.pos:
                        self.model.grid.move_agent(self, new_pos)
                        self.model.occupied_positions.discard(self.pos)
                        self.model.occupied_positions.add(new_pos)
                        self.positions.append(new_pos)
                        break
                move_idx = (move_idx + 1) % 4
                attempts += 1
            else:
                self.positions.append(self.pos)
        else:
            while attempts < max_attempts:
                move = random.choice(moves)
                new_pos = (x + move[0], y + move[1])
                if (0 <= new_pos[0] < self.model.grid.width) and (0 <= new_pos[1] < self.model.grid.height):
                    if new_pos not in self.model.occupied_positions or new_pos == self.pos:
                        self.model.grid.move_agent(self, new_pos)
                        self.model.occupied_positions.discard(self.pos)
                        self.model.occupied_positions.add(new_pos)
                        self.positions.append(new_pos)
                        break
                attempts += 1
            else:
                self.positions.append(self.pos)

# TimeModel
class TimeModel(Model):
    def __init__(self, use_rule=True):
        super().__init__()
        self.grid = MultiGrid(10, 10, False)
        self.schedule = []
        self.random = random.Random()
        self.step_count = 0
        self.occupied_positions = set()
        self.use_rule = use_rule
        
        available_positions = [(x, y) for x in range(10) for y in range(10)]
        self.random.shuffle(available_positions)
        for i in range(30):
            agent = TimeAgent(self, use_rule=self.use_rule)
            start_pos = available_positions[i]
            self.grid.place_agent(agent, start_pos)
            agent.pos = start_pos
            agent.positions.append(start_pos)
            self.occupied_positions.add(start_pos)
            self.schedule.append(agent)

    def step(self):
        self.occupied_positions.clear()
        for agent in self.schedule:
            self.occupied_positions.add(agent.pos)
        random.shuffle(self.schedule)
        for agent in self.schedule:
            agent.step()
        self.step_count += 1

    def get_positions(self):
        sorted_agents = sorted(self.schedule, key=lambda a: a.unique_id)
        return [(agent.positions, [a.pos for a in sorted_agents if a != agent]) for agent in sorted_agents]

# TCN
class TCN(nn.Module):
    def __init__(self, input_size=60, output_size=5, num_channels=[128, 128, 128], kernel_size=7, dropout=0.2):
        super(TCN, self).__init__()
        layers = []
        for i in range(len(num_channels)):
            dilation = 2 ** i
            in_channels = input_size if i == 0 else num_channels[i-1]
            out_channels = num_channels[i]
            padding = (kernel_size - 1) * dilation
            layers.append(nn.Conv1d(in_channels, out_channels, kernel_size, padding=padding, dilation=dilation))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout))
            if padding > 0:
                layers.append(nn.ConstantPad1d((-padding, 0), 0))
        self.tcn = nn.Sequential(*layers)
        self.fc = nn.Linear(num_channels[-1], output_size)

    def forward(self, x):
        x = x.transpose(1, 2)
        out = self.tcn(x)
        out = out[:, :, -1]
        return out

# Data collection
def collect_and_save_data(num_runs=2000, filename="abm_data_with_rule_2.pkl", use_rule=True):
    all_data = []
    for run in range(num_runs):
        model = TimeModel(use_rule=use_rule)
        positions_history = []
        for _ in range(10):
            model.step()
            positions_history.append(model.get_positions())
        all_data.append(positions_history)
        if (run + 1) % 100 == 0:
            print(f"Completed {run + 1}/{num_runs} runs")
    with open(filename, 'wb') as f:
        pickle.dump(all_data, f)
    print(f"Saved {len(all_data)} runs to {filename}")
    return all_data

# Prepare data
def prepare_training_data(data, seq_len=5):
    X, y = [], []
    direction_map = {(0, 1): 0, (0, -1): 1, (1, 0): 2, (-1, 0): 3, (0, 0): 4}
    for run_data in data:
        for step_idx in range(len(run_data) - seq_len):
            step_data = run_data[step_idx:step_idx + seq_len]
            for positions, others in step_data[-1]:
                if len(positions) < seq_len + 1:
                    continue
                seq = positions[-seq_len - 1:-1]
                seq_data = []
                for pos in seq:
                    step_features = list(pos)
                    for ox, oy in others:
                        step_features.extend([ox - pos[0], oy - pos[1]])
                    while len(step_features) < 60:
                        step_features.extend([0, 0])
                    seq_data.append(step_features[:60])
                X.append(seq_data)
                x1, y1 = positions[-2]
                x2, y2 = positions[-1]
                direction = (x2 - x1, y2 - y1)
                y.append(direction_map[direction])
    X = torch.tensor(X, dtype=torch.float32) / 9.0
    y = torch.tensor(y, dtype=torch.long)
    return X, y

# Training
def train_tcn(X, y, epochs=300, batch_size=32, learning_rate=0.001, patience=20):
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
    train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
    val_dataset = torch.utils.data.TensorDataset(X_val, y_val)
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size)

    model = TCN(input_size=60, output_size=5, num_channels=[128, 128, 128], kernel_size=7, dropout=0.2)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.5)

    best_val_loss = float('inf')
    epochs_no_improve = 0
    best_model_state = None

    for epoch in range(epochs):
        model.train()
        train_loss = 0
        train_correct = 0
        for batch_X, batch_y in train_loader:
            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            train_correct += (outputs.argmax(dim=1) == batch_y).sum().item()

        model.eval()
        val_loss = 0
        val_correct = 0
        with torch.no_grad():
            for batch_X, batch_y in val_loader:
                outputs = model(batch_X)
                val_loss += criterion(outputs, batch_y).item()
                val_correct += (outputs.argmax(dim=1) == batch_y).sum().item()

        scheduler.step()
        
        train_loss_avg = train_loss / len(train_loader)
        val_loss_avg = val_loss / len(val_loader)
        train_acc = train_correct / len(X_train)
        val_acc = val_correct / len(X_val)
        
        if (epoch + 1) % 10 == 0:
            print(f"Epoch [{epoch+1}/{epochs}], Train Loss: {train_loss_avg:.4f}, Val Loss: {val_loss_avg:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}, LR: {scheduler.get_last_lr()[0]:.6f}")
        
        if val_loss_avg < best_val_loss:
            best_val_loss = val_loss_avg
            epochs_no_improve = 0
            best_model_state = model.state_dict()
        else:
            epochs_no_improve += 1
            if epochs_no_improve >= patience:
                print(f"Early stopping triggered after {epoch+1} epochs")
                break
    
    if best_model_state is not None:
        model.load_state_dict(best_model_state)
    torch.save(model.state_dict(), "tcn_model_2.pth")
    print("Model saved to tcn_model_2.pth")
    return model

# Global variables
trained_model = None
direction_map = {0: "Up (0, 1)", 1: "Down (0, -1)", 2: "Right (1, 0)", 3: "Left (-1, 0)", 4: "No move (0, 0)"}
reverse_map = {(0, 1): 0, (0, -1): 1, (1, 0): 2, (-1, 0): 3, (0, 0): 4}

# Train with rule
sequences_with_rule = collect_and_save_data(num_runs=2000, filename="abm_data_with_rule_2.pkl", use_rule=True)
X, y = prepare_training_data(sequences_with_rule)
print(f"Loaded {len(sequences_with_rule)} runs with rule")
print(f"Training data shape: X={X.shape}, y={y.shape}")

print("\nTraining TCN (Direction Prediction with Rule)...")
trained_model = train_tcn(X, y, epochs=300, batch_size=32, learning_rate=0.001, patience=20)

# Compare with vs. without rule
def compare_rule_effects(runs=100, steps=10):
    no_rule_model = TimeModel(use_rule=False)
    no_rule_moves = {0: 0, 1: 0, 2: 0, 3: 0, 4: 0}
    for _ in range(runs):
        for _ in range(steps):
            no_rule_model.step()
        for agent in no_rule_model.schedule:
            for i in range(len(agent.positions) - 1):
                dx, dy = agent.positions[i + 1][0] - agent.positions[i][0], agent.positions[i + 1][1] - agent.positions[i][1]
                move_idx = reverse_map.get((dx, dy), 4)
                no_rule_moves[move_idx] += 1

    with_rule_model = TimeModel(use_rule=True)
    with_rule_moves = {0: 0, 1: 0, 2: 0, 3: 0, 4: 0}
    for _ in range(runs):
        for _ in range(steps):
            with_rule_model.step()
        for agent in with_rule_model.schedule:
            for i in range(len(agent.positions) - 1):
                dx, dy = agent.positions[i + 1][0] - agent.positions[i][0], agent.positions[i + 1][1] - agent.positions[i][1]
                move_idx = reverse_map.get((dx, dy), 4)
                with_rule_moves[move_idx] += 1

    total_no_rule = sum(no_rule_moves.values())
    total_with_rule = sum(with_rule_moves.values())
    print("\nNo Rule Move Frequencies:", {direction_map[k]: v/total_no_rule for k, v in no_rule_moves.items()})
    print("With Rule Move Frequencies:", {direction_map[k]: v/total_with_rule for k, v in with_rule_moves.items()})

# Run comparison
compare_rule_effects(runs=100, steps=10)

Completed 100/2000 runs
Completed 200/2000 runs
Completed 300/2000 runs
Completed 400/2000 runs
Completed 500/2000 runs
Completed 600/2000 runs
Completed 700/2000 runs
Completed 800/2000 runs
Completed 900/2000 runs
Completed 1000/2000 runs
Completed 1100/2000 runs
Completed 1200/2000 runs
Completed 1300/2000 runs
Completed 1400/2000 runs
Completed 1500/2000 runs
Completed 1600/2000 runs
Completed 1700/2000 runs
Completed 1800/2000 runs
Completed 1900/2000 runs
Completed 2000/2000 runs
Saved 2000 runs to abm_data_with_rule_2.pkl
Loaded 2000 runs with rule
Training data shape: X=torch.Size([300000, 5, 60]), y=torch.Size([300000])

Training TCN (Direction Prediction with Rule)...
Epoch [10/300], Train Loss: 2.3149, Val Loss: 1.7476, Train Acc: 0.3431, Val Acc: 0.3700, LR: 0.001000
Epoch [20/300], Train Loss: 2.2849, Val Loss: 1.7241, Train Acc: 0.3629, Val Acc: 0.3876, LR: 0.001000
Epoch [30/300], Train Loss: 2.2738, Val Loss: 1.7110, Train Acc: 0.3703, Val Acc: 0.3972, LR: 0.001000
Epoc

### Situation 3: Add rule -- Adds center bias to prioritize moves toward (5, 5)

In [2]:
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
import pickle
import random
from mesa import Agent, Model
from mesa.space import MultiGrid

# TimeAgent with Center Bias
class TimeAgent(Agent):
    def __init__(self, model, use_rule=True):
        super().__init__(model)
        self.model = model
        self.positions = []
        self.use_rule = use_rule

    def step(self):
        global trained_model
        x, y = self.pos
        moves = [(0, 1), (-1, 0), (0, -1), (1, 0)]
        
        avoid_move = None
        if len(self.positions) >= 5 and trained_model is not None and self.use_rule:
            seq = self.positions[-5:]
            others = [a.pos for a in self.model.schedule if a != self]
            input_data = []
            for pos in seq:
                step_features = list(pos)
                for ox, oy in others:
                    step_features.extend([ox - pos[0], oy - pos[1]])
                while len(step_features) < 60:
                    step_features.extend([0, 0])
                input_data.append(step_features[:60])
            input_data = torch.tensor([input_data], dtype=torch.float32) / 9.0
            with torch.no_grad():
                pred_dir = trained_model(input_data).argmax().item()
            avoid_move = list(reverse_map.keys())[pred_dir]

        attempts = 0
        max_attempts = 10
        move_idx = random.randint(0, 3)

        if self.use_rule:
            while attempts < max_attempts:
                move = moves[move_idx]
                if move == avoid_move and attempts < max_attempts - 1:
                    move_idx = (move_idx + 1) % 4
                    attempts += 1
                    continue
                new_pos = (x + move[0], y + move[1])
                if (0 <= new_pos[0] < self.model.grid.width) and (0 <= new_pos[1] < self.model.grid.height):
                    if new_pos not in self.model.occupied_positions or new_pos == self.pos:
                        self.model.grid.move_agent(self, new_pos)
                        self.model.occupied_positions.discard(self.pos)
                        self.model.occupied_positions.add(new_pos)
                        self.positions.append(new_pos)
                        break
                center_x, center_y = 5, 5
                if x < center_x and (1, 0) not in self.model.occupied_positions:
                    move_idx = 2
                elif x > center_x and (-1, 0) not in self.model.occupied_positions:
                    move_idx = 1
                elif y < center_y and (0, 1) not in self.model.occupied_positions:
                    move_idx = 0
                elif y > center_y and (0, -1) not in self.model.occupied_positions:
                    move_idx = 3
                else:
                    move_idx = (move_idx + 1) % 4
                attempts += 1
            else:
                self.positions.append(self.pos)
        else:
            while attempts < max_attempts:
                move = random.choice(moves)
                new_pos = (x + move[0], y + move[1])
                if (0 <= new_pos[0] < self.model.grid.width) and (0 <= new_pos[1] < self.model.grid.height):
                    if new_pos not in self.model.occupied_positions or new_pos == self.pos:
                        self.model.grid.move_agent(self, new_pos)
                        self.model.occupied_positions.discard(self.pos)
                        self.model.occupied_positions.add(new_pos)
                        self.positions.append(new_pos)
                        break
                attempts += 1
            else:
                self.positions.append(self.pos)

# TimeModel
class TimeModel(Model):
    def __init__(self, use_rule=True):
        super().__init__()
        self.grid = MultiGrid(10, 10, False)
        self.schedule = []
        self.random = random.Random()
        self.step_count = 0
        self.occupied_positions = set()
        self.use_rule = use_rule
        
        available_positions = [(x, y) for x in range(10) for y in range(10)]
        self.random.shuffle(available_positions)
        for i in range(30):
            agent = TimeAgent(self, use_rule=self.use_rule)
            start_pos = available_positions[i]
            self.grid.place_agent(agent, start_pos)
            agent.pos = start_pos
            agent.positions.append(start_pos)
            self.occupied_positions.add(start_pos)
            self.schedule.append(agent)

    def step(self):
        self.occupied_positions.clear()
        for agent in self.schedule:
            self.occupied_positions.add(agent.pos)
        random.shuffle(self.schedule)
        for agent in self.schedule:
            agent.step()
        self.step_count += 1

    def get_positions(self):
        sorted_agents = sorted(self.schedule, key=lambda a: a.unique_id)
        return [(agent.positions, [a.pos for a in sorted_agents if a != agent]) for agent in sorted_agents]

# TCN
class TCN(nn.Module):
    def __init__(self, input_size=60, output_size=5, num_channels=[128, 128, 128], kernel_size=7, dropout=0.2):
        super(TCN, self).__init__()
        layers = []
        for i in range(len(num_channels)):
            dilation = 2 ** i
            in_channels = input_size if i == 0 else num_channels[i-1]
            out_channels = num_channels[i]
            padding = (kernel_size - 1) * dilation
            layers.append(nn.Conv1d(in_channels, out_channels, kernel_size, padding=padding, dilation=dilation))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout))
            if padding > 0:
                layers.append(nn.ConstantPad1d((-padding, 0), 0))
        self.tcn = nn.Sequential(*layers)
        self.fc = nn.Linear(num_channels[-1], output_size)

    def forward(self, x):
        x = x.transpose(1, 2)
        out = self.tcn(x)
        out = out[:, :, -1]
        return out

# Data collection (10 steps)
def collect_and_save_data(num_runs=2000, filename="abm_data_with_rule_3.pkl", use_rule=True):
    all_data = []
    for run in range(num_runs):
        model = TimeModel(use_rule=use_rule)
        positions_history = []
        for _ in range(10):
            model.step()
            positions_history.append(model.get_positions())
        all_data.append(positions_history)
        if (run + 1) % 100 == 0:
            print(f"Completed {run + 1}/{num_runs} runs")
    with open(filename, 'wb') as f:
        pickle.dump(all_data, f)
    print(f"Saved {len(all_data)} runs to {filename}")
    return all_data

# Prepare data
def prepare_training_data(data, seq_len=5):
    X, y = [], []
    direction_map = {(0, 1): 0, (0, -1): 1, (1, 0): 2, (-1, 0): 3, (0, 0): 4}
    for run_data in data:
        for step_idx in range(len(run_data) - seq_len + 1):
            step_data = run_data[step_idx:step_idx + seq_len]
            for positions, others in step_data[-1]:
                if len(positions) < step_idx + seq_len + 1:
                    continue
                seq = positions[step_idx:step_idx + seq_len]
                seq_data = []
                for pos in seq:
                    step_features = list(pos)
                    for ox, oy in others:
                        step_features.extend([ox - pos[0], oy - pos[1]])
                    while len(step_features) < 60:
                        step_features.extend([0, 0])
                    seq_data.append(step_features[:60])
                X.append(seq_data)
                x1, y1 = positions[step_idx + seq_len - 1]
                x2, y2 = positions[step_idx + seq_len]
                direction = (x2 - x1, y2 - y1)
                y.append(direction_map[direction])
    X = torch.tensor(X, dtype=torch.float32) / 9.0
    y = torch.tensor(y, dtype=torch.long)
    return X, y

# Training
def train_tcn(X, y, epochs=300, batch_size=64, learning_rate=0.001, patience=20):
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
    train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
    val_dataset = torch.utils.data.TensorDataset(X_val, y_val)
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size)

    model = TCN(input_size=60, output_size=5, num_channels=[128, 128, 128], kernel_size=7, dropout=0.2)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.5)

    best_val_loss = float('inf')
    epochs_no_improve = 0
    best_model_state = None

    for epoch in range(epochs):
        model.train()
        train_loss = 0
        train_correct = 0
        for batch_X, batch_y in train_loader:
            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            train_correct += (outputs.argmax(dim=1) == batch_y).sum().item()

        model.eval()
        val_loss = 0
        val_correct = 0
        with torch.no_grad():
            for batch_X, batch_y in val_loader:
                outputs = model(batch_X)
                val_loss += criterion(outputs, batch_y).item()
                val_correct += (outputs.argmax(dim=1) == batch_y).sum().item()

        scheduler.step()
        
        train_loss_avg = train_loss / len(train_loader)
        val_loss_avg = val_loss / len(val_loader)
        train_acc = train_correct / len(X_train)
        val_acc = val_correct / len(X_val)
        
        if (epoch + 1) % 10 == 0:
            print(f"Epoch [{epoch+1}/{epochs}], Train Loss: {train_loss_avg:.4f}, Val Loss: {val_loss_avg:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}, LR: {scheduler.get_last_lr()[0]:.6f}")
        
        if val_loss_avg < best_val_loss:
            best_val_loss = val_loss_avg
            epochs_no_improve = 0
            best_model_state = model.state_dict()
        else:
            epochs_no_improve += 1
            if epochs_no_improve >= patience:
                print(f"Early stopping triggered after {epoch+1} epochs")
                break
    
    if best_model_state is not None:
        model.load_state_dict(best_model_state)
    torch.save(model.state_dict(), "tcn_model_3.pth")
    print("Model saved to tcn_model_3.pth")
    return model


In [3]:
# Global variables
trained_model = None
direction_map = {0: "Up (0, 1)", 1: "Down (0, -1)", 2: "Right (1, 0)", 3: "Left (-1, 0)", 4: "No move (0, 0)"}
reverse_map = {(0, 1): 0, (0, -1): 1, (1, 0): 2, (-1, 0): 3, (0, 0): 4}

# Train with rule
sequences_with_rule = collect_and_save_data(num_runs=2000, filename="abm_data_with_rule_3.pkl", use_rule=True)
X, y = prepare_training_data(sequences_with_rule)
print(f"Loaded {len(sequences_with_rule)} runs with rule")
print(f"Training data shape: X={X.shape}, y={y.shape}")

print("\nTraining TCN (Direction Prediction with Rule)...")
trained_model = train_tcn(X, y, epochs=300, batch_size=64, learning_rate=0.001, patience=20)

# Compare with vs. without rule
def compare_rule_effects(runs=100, steps=10):
    no_rule_model = TimeModel(use_rule=False)
    no_rule_moves = {0: 0, 1: 0, 2: 0, 3: 0, 4: 0}
    for _ in range(runs):
        for _ in range(steps):
            no_rule_model.step()
        for agent in no_rule_model.schedule:
            for i in range(len(agent.positions) - 1):
                dx, dy = agent.positions[i + 1][0] - agent.positions[i][0], agent.positions[i + 1][1] - agent.positions[i][1]
                move_idx = reverse_map.get((dx, dy), 4)
                no_rule_moves[move_idx] += 1

    with_rule_model = TimeModel(use_rule=True)
    with_rule_moves = {0: 0, 1: 0, 2: 0, 3: 0, 4: 0}
    for _ in range(runs):
        for _ in range(steps):
            with_rule_model.step()
        for agent in with_rule_model.schedule:
            for i in range(len(agent.positions) - 1):
                dx, dy = agent.positions[i + 1][0] - agent.positions[i][0], agent.positions[i + 1][1] - agent.positions[i][1]
                move_idx = reverse_map.get((dx, dy), 4)
                with_rule_moves[move_idx] += 1

    total_no_rule = sum(no_rule_moves.values())
    total_with_rule = sum(with_rule_moves.values())
    print("\nNo Rule Move Frequencies:", {direction_map[k]: v/total_no_rule for k, v in no_rule_moves.items()})
    print("With Rule Move Frequencies:", {direction_map[k]: v/total_with_rule for k, v in with_rule_moves.items()})

# Run comparison
compare_rule_effects(runs=100, steps=10)

Completed 100/2000 runs
Completed 200/2000 runs
Completed 300/2000 runs
Completed 400/2000 runs
Completed 500/2000 runs
Completed 600/2000 runs
Completed 700/2000 runs
Completed 800/2000 runs
Completed 900/2000 runs
Completed 1000/2000 runs
Completed 1100/2000 runs
Completed 1200/2000 runs
Completed 1300/2000 runs
Completed 1400/2000 runs
Completed 1500/2000 runs
Completed 1600/2000 runs
Completed 1700/2000 runs
Completed 1800/2000 runs
Completed 1900/2000 runs
Completed 2000/2000 runs
Saved 2000 runs to abm_data_with_rule_3.pkl
Loaded 2000 runs with rule
Training data shape: X=torch.Size([360000, 5, 60]), y=torch.Size([360000])

Training TCN (Direction Prediction with Rule)...
Epoch [10/300], Train Loss: 3.0290, Val Loss: 2.6938, Train Acc: 0.2859, Val Acc: 0.2952, LR: 0.001000
Epoch [20/300], Train Loss: 3.0124, Val Loss: 2.6229, Train Acc: 0.2934, Val Acc: 0.3073, LR: 0.001000
Epoch [30/300], Train Loss: 3.0106, Val Loss: 2.6400, Train Acc: 0.2961, Val Acc: 0.3086, LR: 0.001000
Epoc

## 3 agents in 5x5 grid 7 steps 
3 agents, 5x5 grid, 7 steps, Mesa 2.3.0, TCN-based

In [4]:
import mesa
print(mesa.__version__)

2.3.0


### Generate data

In [1]:
from mesa import Agent, Model
from mesa.space import MultiGrid
from mesa.time import RandomActivation
import random

class RetroAgent(Agent):
    def __init__(self, unique_id, model):
        super().__init__(unique_id, model)
        # Find a random empty position
        while True:
            pos = (random.randint(0, 4), random.randint(0, 4))
            if model.grid.is_cell_empty(pos):
                break
        
        # Don’t set self.pos here; let place_agent handle it
        model.grid.place_agent(self, pos)  # Place agent directly

    def step(self):
        possible_moves = ['up', 'down', 'left', 'right', 'stay']
        move = random.choice(possible_moves)
        new_pos = self.get_new_position(move)
        self.model.grid.move_agent(self, new_pos)

        # Each agent makes one move (or stays) per step
        '''
        # Allow move to occupied cell with 20% probability
        if random.random() < 0.2 or self.model.grid.is_cell_empty(new_pos):
            self.model.grid.move_agent(self, new_pos)
            break
        '''
    def get_new_position(self, move):
        x, y = self.pos
        if move == 'up': return (x, (y + 1) % 5)
        if move == 'down': return (x, (y - 1) % 5)
        if move == 'left': return ((x - 1) % 5, y)
        if move == 'right': return ((x + 1) % 5, y)
        return (x, y)

class RetroModel(Model):
    def __init__(self):
        super().__init__()  # Initialize Model to fix FutureWarning
        self.grid = MultiGrid(5, 5, torus=True)
        self.schedule = RandomActivation(self)
        self.positions = []
        for i in range(3):
            agent = RetroAgent(i, self)
            self.schedule.add(agent)
            # No need for self.grid.place_agent here; already done in RetroAgent

    def step(self):
        step_data = {
            'positions': {agent.unique_id: agent.pos for agent in self.schedule.agents},
            'collision': len(set(agent.pos for agent in self.schedule.agents)) < 3
        }
        self.positions.append(step_data)
        # step_positions = {agent.unique_id: agent.pos for agent in self.schedule.agents}
        # self.positions.append(step_positions)
        self.schedule.step()

    def run_simulation(self, steps=7):
        collisions = 0
        print("Step 0 (Initial):")
        for agent in self.schedule.agents:
            print(f"Agent {agent.unique_id}: {agent.pos}")
        print()
        
        for step in range(1, steps + 1):
            self.step()
            print(f"Step {step}:")
            positions = [agent.pos for agent in self.schedule.agents]
            if len(positions) != len(set(positions)):
                collisions += 1
                print("(Collision detected)")
            for agent in self.schedule.agents:
                print(f"Agent {agent.unique_id}: {agent.pos}")
            print()
        print(f"Collisions: {collisions}/{steps} steps")
        return self.positions  # Ensure this returns the list of position dictionaries

# Run the simulation
# model = RetroModel()
# model.run_simulation(steps=7)

In [2]:
import pickle
import pathlib  # Added for path handling

# Run multiple simulations and save data
all_positions = []
num_runs = 1000
total_collisions = 0

for run in range(num_runs):
    model = RetroModel()
    positions = model.run_simulation(steps=7)
    all_positions.append(positions)
    collisions = sum(1 for step in positions if step['collision'])
    # collisions = sum(1 for step in positions if len(set(step.values())) < 3)
    total_collisions += collisions

# Save pickle file in the same directory as RetroMind.ipynb
notebook_dir = pathlib.Path.cwd()  # Get current working directory (notebook's dir)
pickle_path = notebook_dir / 'abm_3agents_5x5.pkl'  # Construct path
pickle.dump(all_positions, open(pickle_path, 'wb'))

print(f"Total runs: {num_runs}")
print(f"Total collisions: {total_collisions} in {num_runs * 7} steps (~{total_collisions / num_runs:.2f}/run)")
print(f"Saved pickle file!")

Step 0 (Initial):
Agent 0: (4, 1)
Agent 1: (1, 0)
Agent 2: (0, 3)

Step 1:
Agent 0: (0, 1)
Agent 1: (2, 0)
Agent 2: (0, 4)

Step 2:
Agent 2: (0, 4)
Agent 0: (0, 2)
Agent 1: (2, 0)

Step 3:
Agent 0: (0, 1)
Agent 1: (2, 0)
Agent 2: (4, 4)

Step 4:
Agent 0: (4, 1)
Agent 1: (2, 1)
Agent 2: (4, 3)

Step 5:
(Collision detected)
Agent 2: (4, 2)
Agent 0: (3, 1)
Agent 1: (3, 1)

Step 6:
(Collision detected)
Agent 2: (4, 3)
Agent 1: (2, 1)
Agent 0: (2, 1)

Step 7:
Agent 0: (2, 2)
Agent 1: (1, 1)
Agent 2: (4, 3)

Collisions: 2/7 steps
Step 0 (Initial):
Agent 0: (2, 0)
Agent 1: (3, 3)
Agent 2: (2, 4)

Step 1:
(Collision detected)
Agent 0: (2, 4)
Agent 1: (2, 3)
Agent 2: (2, 3)

Step 2:
Agent 2: (2, 2)
Agent 1: (2, 4)
Agent 0: (2, 3)

Step 3:
Agent 0: (3, 3)
Agent 1: (2, 0)
Agent 2: (2, 2)

Step 4:
Agent 0: (3, 4)
Agent 2: (3, 2)
Agent 1: (1, 0)

Step 5:
Agent 1: (2, 0)
Agent 2: (3, 3)
Agent 0: (3, 0)

Step 6:
Agent 1: (1, 0)
Agent 2: (3, 3)
Agent 0: (4, 0)

Step 7:
Agent 0: (4, 1)
Agent 1: (1, 1)


Checks for Collision: len(set(step.values())) < 3 is true if there are fewer than 3 unique positions, 

meaning at least two agents share a position (collision).

positions = [
    
    {0: (2, 3), 1: (4, 1), 2: (1, 4)},  # len(set([(2, 3), (4, 1), (1, 4)])) = 3, no collision

    {0: (0, 1), 1: (2, 4), 2: (0, 1)},  # len(set([(0, 1), (2, 4), (0, 1)])) = 2, collision

    {0: (0, 2), 1: (2, 3), 2: (0, 2)}   # len(set([(0, 2), (2, 3), (0, 2)])) = 2, collision
]

collisions = sum(1 for step in positions if len(set(step.values())) < 3)

Result: collisions = 2 (steps 1 and 2 have collisions)

**Data description:** 

Baseline data comes from a simulation where all agents (0, 1, 2) move randomly (up, down, left, right, 

stay) on a 5x5 grid with torus=True, resulting in 705 collisions over 7000 steps

### Build TCN

In [4]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
import sys
print(sys.executable)
print(tf.__version__)
print("Imports successful!")

d:\anaconda3\envs\retro_env\python.exe
2.17.0
Imports successful!


In [2]:
import pickle
import numpy as np
from tcn import TCN
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input
from sklearn.model_selection import train_test_split

# Load pickle file
with open('abm_3agents_5x5.pkl', 'rb') as f:
    all_positions = pickle.load(f)

# Preprocess data
def prepare_data(runs, timesteps=3):
    X, y = [], []
    for run in runs:
        for step in range(timesteps, len(run)):  # Start from step 3 to have 3 prior steps
            # Input: last 3 steps' positions and collisions
            input_seq = []
            for t in range(step - timesteps, step):
                pos = run[t]['positions']
                collision = 1 if run[t]['collision'] else 0
                # Flatten positions: [x0,y0,x1,y1,x2,y2,collision]
                input_seq.append([pos[0][0], pos[0][1], pos[1][0], pos[1][1], pos[2][0], pos[2][1], collision])
            X.append(input_seq)
            # Output: move of agent 0 at step (simplified; repeat for each agent)
            # Infer move by comparing pos at step-1 and step
            prev_pos = run[step-1]['positions'][0]
            curr_pos = run[step]['positions'][0]
            move = infer_move(prev_pos, curr_pos)
            y.append(move)
    return np.array(X), np.array(y)

def infer_move(prev_pos, curr_pos):
    px, py = prev_pos
    cx, cy = curr_pos
    if cx == px and cy == (py + 1) % 5: return 0  # up
    if cx == px and cy == (py - 1) % 5: return 1  # down
    if cx == (px - 1) % 5 and cy == py: return 2  # left
    if cx == (px + 1) % 5 and cy == py: return 3  # right
    return 4  # stay

X, y = prepare_data(all_positions)
y = np.eye(5)[y]  # One-hot encode moves

# Split data
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

# Build TCN model
model = Sequential([
    Input(shape=(3, 7)),  # Explicit input layer
    TCN(input_shape=(3, 7), nb_filters=32, kernel_size=2, dilations=[1, 2, 4], return_sequences=False),
    Dense(16, activation='relu'),
    Dense(5, activation='softmax')
])

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Train model
model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=10, batch_size=32)

# Save model
model.save('tcn_model.keras')
print("TCN model trained and saved!")

Epoch 1/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 11ms/step - accuracy: 0.2071 - loss: 2.1597 - val_accuracy: 0.1863 - val_loss: 1.6652
Epoch 2/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 5ms/step - accuracy: 0.2080 - loss: 1.6229 - val_accuracy: 0.2087 - val_loss: 1.6180
Epoch 3/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - accuracy: 0.1952 - loss: 1.6096 - val_accuracy: 0.2087 - val_loss: 1.6129
Epoch 4/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - accuracy: 0.1885 - loss: 1.6067 - val_accuracy: 0.2125 - val_loss: 1.6258
Epoch 5/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - accuracy: 0.2245 - loss: 1.5996 - val_accuracy: 0.2025 - val_loss: 1.6260
Epoch 6/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - accuracy: 0.2278 - loss: 1.5955 - val_accuracy: 0.1875 - val_loss: 1.6227
Epoch 7/10
[1m100/100[0m 

- The current TCN is trained to predict random moves (supervised learning on infer_move), 

  not to minimize collisions. Random moves don’t inherently avoid collisions, 

  so the model learns a near-random policy (~20% accuracy expected).

- TCN should predict moves that reduce future collisions, requiring a reward-based approach rather than 

  mimicking random moves.

- **We need to add rule**: Implement the collision-avoidance and center-bias rule for Agent 0, 

  making agent behavior more predictable

### Rule: Collision Prediction with Safe Move

- Train TCN to predict collisions (binary: 0=no collision, 1=collision) for Agent 0’s next move. 

  If collision predicted, choose a random move that avoids others’ current positions; 

  else, bias to center (30%) or move randomly.

- **Retrocausal Connection:** The TCN directly predicts a future collision event, and Agent 0 acts to prevent it，

  simulating a future-to-present causal link. This rule focuses on the critical outcome (collision) rather than 

  specific moves, making the retrocausal effect explicit—Agent 0 “knows” a future crash and alters the present to avoid it.

In [3]:
import pickle
import pathlib
import numpy as np
from tcn import TCN
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input
from sklearn.model_selection import train_test_split
import tensorflow as tf
from mesa import Agent, Model
from mesa.space import MultiGrid
from mesa.time import RandomActivation
import random

# Step 1: Retrain TCN for collision prediction
with open('abm_3agents_5x5.pkl', 'rb') as f:
    all_positions = pickle.load(f)

def prepare_data(runs, timesteps=3):
    X, y = [], []
    for run in runs:
        for step in range(timesteps, len(run)):
            input_seq = []
            for t in range(step - timesteps, step):
                pos = run[t]['positions']
                collision = 1 if run[t]['collision'] else 0
                input_seq.append([pos[0][0], pos[0][1], pos[1][0], pos[1][1], pos[2][0], pos[2][1], collision])
            X.append(input_seq)
            y.append(1 if run[step]['collision'] else 0)  # Predict collision
    return np.array(X), np.array(y)

X, y = prepare_data(all_positions)
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

# Build TCN for binary classification
collision_model = Sequential([
    Input(shape=(3, 7)),
    TCN(input_shape=(3, 7), nb_filters=32, kernel_size=2, dilations=[1, 2, 4], return_sequences=False),
    Dense(16, activation='relu'),
    Dense(1, activation='sigmoid')
])

collision_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
collision_model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=10, batch_size=32)

# Add recall and F1-score calculation
from sklearn.metrics import classification_report
y_pred = (collision_model.predict(X_val) > 0.5).astype(int)
print("Classification Report for TCN Collision Prediction:")
print(classification_report(y_val, y_pred, target_names=['No Collision', 'Collision']))

collision_model.save('tcn_collision_model.keras')
print("TCN collision model trained and saved!")

Epoch 1/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 7ms/step - accuracy: 0.5806 - loss: 2.3198 - val_accuracy: 0.8825 - val_loss: 0.3748
Epoch 2/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - accuracy: 0.8707 - loss: 0.3846 - val_accuracy: 0.8813 - val_loss: 0.3734
Epoch 3/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - accuracy: 0.8767 - loss: 0.3647 - val_accuracy: 0.8825 - val_loss: 0.3673
Epoch 4/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 5ms/step - accuracy: 0.8727 - loss: 0.3706 - val_accuracy: 0.8813 - val_loss: 0.3727
Epoch 5/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 5ms/step - accuracy: 0.8693 - loss: 0.3621 - val_accuracy: 0.8800 - val_loss: 0.3736
Epoch 6/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 5ms/step - accuracy: 0.8768 - loss: 0.3483 - val_accuracy: 0.8763 - val_loss: 0.3828
Epoch 7/10
[1m100/100[0m 

High accuracy reflects the model correctly predicting "No Collision" for most of the 706 non-collision instances (88% of the 800 samples). 

However, it completely misses the 94 collision instances (0% recall for "Collision").

- No Collision: High precision (0.88), recall (1.00), and F1-score (0.94) show the TCN excels at identifying non-collision states.

- Collision: Zero precision, recall, and F1-score indicate the TCN never predicts collisions, rendering it ineffective for retrocausal collision avoidance.

- Imbalance: The dataset is heavily skewed (706:94, ~88% No Collision vs. ~12% Collision), causing the TCN to overfit to the majority class.

Reasonable loss values suggest the model is learning, but the imbalance skews optimization toward non-collision predictions.

In [None]:
# Step 2: Run simulations with collision prediction rule (17min)
# Applied to only Agent 0.
# TCN Role: Predicted collision probability (pred > 0.5) for Agent 0 using tcn_collision_model.keras.
# Other Agents (1, 2): Moved randomly

class RetroAgent(Agent):
    def __init__(self, unique_id, model):
        super().__init__(unique_id, model)
        self.tcn_model = tf.keras.models.load_model('tcn_collision_model.keras') if unique_id == 0 else None
        while True:
            pos = (random.randint(0, 4), random.randint(0, 4))
            if model.grid.is_cell_empty(pos):
                break
        model.grid.place_agent(self, pos)

    def step(self):
        if self.unique_id == 0 and len(self.model.positions) >= 3:  # Use TCN after 3 steps
            # Prepare input: last 3 steps' positions and collisions
            input_seq = []
            for t in range(-3, 0):
                pos = self.model.positions[t]['positions']
                collision = 1 if self.model.positions[t]['collision'] else 0
                input_seq.append([pos[0][0], pos[0][1], pos[1][0], pos[1][1], pos[2][0], pos[2][1], collision])
            input_seq = np.array([input_seq])  # Shape: (1, 3, 7)
            pred = self.tcn_model.predict(input_seq, verbose=0)[0][0]
            
            # Rule: If collision predicted (>0.5), choose safe move; else bias to center (30%) or random
            others_pos = [self.model.positions[-1]['positions'][i] for i in [1, 2]]
            moves = ['up', 'down', 'left', 'right', 'stay']
            if pred > 0.5:
                safe_moves = [m for m in moves if self.get_new_position(m) not in others_pos]
                move = random.choice(safe_moves) if safe_moves else random.choice(moves)
            elif random.random() < 0.3:
                move = self.get_center_bias_move()
            else:
                move = random.choice(moves)
        else:
            move = random.choice(['up', 'down', 'left', 'right', 'stay'])
        
        new_pos = self.get_new_position(move)
        self.model.grid.move_agent(self, new_pos)

    def get_center_bias_move(self):
        x, y = self.pos
        center = (2, 2)
        if x < center[0]: return 'right'
        if x > center[0]: return 'left'
        if y < center[1]: return 'up'
        if y > center[1]: return 'down'
        return 'stay'

    def get_new_position(self, move):
        x, y = self.pos
        if move == 'up': return (x, (y + 1) % 5)
        if move == 'down': return (x, (y - 1) % 5)
        if move == 'left': return ((x - 1) % 5, y)
        if move == 'right': return ((x + 1) % 5, y)
        return (x, y)

class RetroModel(Model):
    def __init__(self):
        super().__init__()
        self.grid = MultiGrid(5, 5, torus=True)
        self.schedule = RandomActivation(self)
        self.positions = []
        for i in range(3):
            agent = RetroAgent(i, self)
            self.schedule.add(agent)

    def step(self):
        step_data = {
            'positions': {agent.unique_id: agent.pos for agent in self.schedule.agents},
            'collision': len(set(agent.pos for agent in self.schedule.agents)) < 3
        }
        self.positions.append(step_data)
        self.schedule.step()

    def run_simulation(self, steps=7):
        collisions = 0
        for step in range(steps):
            self.step()
            if self.positions[-1]['collision']:
                collisions += 1
        return self.positions, collisions

# Run 1000 simulations
all_positions_tcn = []
total_collisions_tcn = 0
num_runs = 1000

for run in range(num_runs):
    model = RetroModel()
    positions, collisions = model.run_simulation(steps=7)
    all_positions_tcn.append(positions)
    total_collisions_tcn += collisions

# Save post-TCN data
notebook_dir = pathlib.Path.cwd()
pickle_path = notebook_dir / 'abm_3agents_5x5_tcn.pkl'
pickle.dump(all_positions_tcn, open(pickle_path, 'wb'))

# Compare collisions
baseline_collisions = 705
print(f"Baseline collisions (no TCN): {baseline_collisions} in {num_runs * 7} steps (~{baseline_collisions / num_runs:.3f}/run)")
print(f"Post-TCN collisions: {total_collisions_tcn} in {num_runs * 7} steps (~{total_collisions_tcn / num_runs:.3f}/run)")
if total_collisions_tcn < baseline_collisions:
    print(f"Success: Reduced collisions by {baseline_collisions - total_collisions_tcn}!")
else:
    print(f"Collisions not reduced. Consider refining rule or improving TCN accuracy.")
# print(f"Saved post-TCN data to {pickle_path}")

Baseline collisions (no TCN): 705 in 7000 steps (~0.705/run)
Post-TCN collisions: 666 in 7000 steps (~0.666/run)
Success: Reduced collisions by 39!


### Generate better training data with Some Collisions

In [8]:
from mesa import Agent, Model
from mesa.space import MultiGrid
from mesa.time import RandomActivation
import random
import pickle
import pathlib

class RetroAgent(Agent):
    def __init__(self, unique_id, model):
        super().__init__(unique_id, model)
        while True:
            pos = (random.randint(0, 4), random.randint(0, 4))
            if model.grid.is_cell_empty(pos):
                break
        model.grid.place_agent(self, pos)

    def step(self):
        others_pos = [agent.pos for agent in self.model.schedule.agents if agent.unique_id != self.unique_id]
        moves = ['up', 'down', 'left', 'right', 'stay']
        safe_moves = [m for m in moves if self.get_new_position(m) not in others_pos]
        # 70% chance to choose safe move, 30% chance for random move to allow some collisions
        if safe_moves and random.random() < 0.7:
            move = random.choice(safe_moves)
        else:
            move = random.choice(moves)
        self.model.grid.move_agent(self, self.get_new_position(move))

    def get_new_position(self, move):
        x, y = self.pos
        if move == 'up': return (x, (y + 1) % 5)
        if move == 'down': return (x, (y - 1) % 5)
        if move == 'left': return ((x - 1) % 5, y)
        if move == 'right': return ((x + 1) % 5, y)
        return (x, y)

class RetroModel(Model):
    def __init__(self):
        super().__init__()
        self.grid = MultiGrid(5, 5, torus=True)
        self.schedule = RandomActivation(self)
        self.positions = []
        for i in range(3):
            agent = RetroAgent(i, self)
            self.schedule.add(agent)

    def step(self):
        step_data = {
            'positions': {agent.unique_id: agent.pos for agent in self.schedule.agents},
            'collision': len(set(agent.pos for agent in self.schedule.agents)) < 3
        }
        self.positions.append(step_data)
        self.schedule.step()

    def run_simulation(self, steps=7):
        collisions = 0
        self.positions = []
        for step in range(steps):
            self.step()
            if len(set(agent.pos for agent in self.schedule.agents)) < 3:
                collisions += 1
        return self.positions, collisions

all_positions = []
num_runs = 1000
total_collisions = 0

for run in range(num_runs):
    model = RetroModel()
    positions, collisions = model.run_simulation(steps=7)
    all_positions.append(positions)
    total_collisions += collisions

notebook_dir = pathlib.Path.cwd()
pickle_path = notebook_dir / 'abm_3agents_5x5_better.pkl'
pickle.dump(all_positions, open(pickle_path, 'wb'))

print(f"Total runs: {num_runs}")
print(f"Total collisions: {total_collisions} in {num_runs * 7} steps (~{total_collisions / num_runs:.3f}/run)")
print(f"Saved pickle file!")

Total runs: 1000
Total collisions: 196 in 7000 steps (~0.196/run)
Saved pickle file!


### Extra experience -- If Apply TCN to all agents ......x

Applying the TCN to all three agents means all are now predicting and adjusting their moves, 

potentially leading to a feedback loop where agents’ avoidance actions interfere with each other.

**Example:** 

If all agents predict a collision at (3,3) and try to avoid it by turning left, 

they might all move to (2,3), causing a new collision. 

This synchronized adjustment creates a "herding effect," where agents cluster and collide more often.

In [20]:
import pickle
import numpy as np
from tcn import TCN
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input, Dropout
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight

# Load pickle file
# EDIT HERE: Change filename if using a different pickle file
with open('abm_3agents_5x5_better.pkl', 'rb') as f:
    all_positions = pickle.load(f)

# Preprocess data
def prepare_data(runs, timesteps=3):
    X, y = [], []
    for run in runs:
        for step in range(timesteps, len(run)):
            for agent_id in range(3):
                input_seq = []
                for t in range(step - timesteps, step):
                    pos = run[t]['positions']
                    collision = 1 if run[t]['collision'] else 0
                    self_pos = [p / 4 for p in pos[agent_id]]  # Normalize to [0, 1]
                    other1_pos = pos[(agent_id + 1) % 3]
                    other2_pos = pos[(agent_id + 2) % 3]
                    rel_dist1 = [(self_pos[0] - other1_pos[0]) / 4, (self_pos[1] - other1_pos[1]) / 4]  # Normalize to [-1, 1]
                    rel_dist2 = [(self_pos[0] - other2_pos[0]) / 4, (self_pos[1] - other2_pos[1]) / 4]
                    # Add collision history for context
                    collision_history = 1 if any(run[tt]['collision'] for tt in range(max(0, t - 2), t + 1)) else 0
                    input_seq.append([self_pos[0], self_pos[1], rel_dist1[0], rel_dist1[1], rel_dist2[0], rel_dist2[1], collision_history])
                X.append(input_seq)
                prev_pos = run[step - 1]['positions'][agent_id]
                other_pos = [run[step - 1]['positions'][i] for i in range(3) if i != agent_id]
                # Relabel to prioritize safe moves and maximize Manhattan distance
                safe_moves = [m for m in range(5) if compute_new_pos(prev_pos, m) not in other_pos]
                if safe_moves:
                    # EDIT HERE: Comment out to disable distance maximization; use random.choice(safe_moves) instead
                    move = max(safe_moves, key=lambda m: sum(abs(compute_new_pos(prev_pos, m)[0] - p[0]) + abs(compute_new_pos(prev_pos, m)[1] - p[1]) for p in other_pos))
                else:
                    move = infer_move(prev_pos, run[step]['positions'][agent_id])
                y.append(move)
    return np.array(X), np.array(y)

def compute_new_pos(pos, move):
    x, y = pos
    if move == 0: return (x, (y + 1) % 5)  # up
    if move == 1: return (x, (y - 1) % 5)  # down
    if move == 2: return ((x - 1) % 5, y)  # left
    if move == 3: return ((x + 1) % 5, y)  # right
    return (x, y)  # stay

def infer_move(prev_pos, curr_pos):
    px, py = prev_pos
    cx, cy = curr_pos
    if cx == px and cy == (py + 1) % 5: return 0  # up
    if cx == px and cy == (py - 1) % 5: return 1  # down
    if cx == (px - 1) % 5 and cy == py: return 2  # left
    if cx == (px + 1) % 5 and cy == py: return 3  # right
    return 4  # stay

X, y = prepare_data(all_positions)
y_one_hot = np.eye(5)[y]

# Check move distribution
# EDIT HERE: Comment out after checking to reduce output
y_labels = np.argmax(y_one_hot, axis=1)
unique, counts = np.unique(y_labels, return_counts=True)
print("Move distribution:", dict(zip(['up', 'down', 'left', 'right', 'stay'], counts / len(y_labels))))

X_train, X_val, y_train, y_val = train_test_split(X, y_one_hot, test_size=0.2, random_state=42)

# Class weights
# EDIT HERE: Comment out to test without class weights
class_weights = compute_class_weight('balanced', classes=np.arange(5), y=y_labels)
class_weights = dict(enumerate(class_weights))
print("Class weights:", class_weights)

# TCN model
# EDIT HERE: Adjust nb_filters (e.g., 32), dilations (e.g., [1]), or dropout (e.g., 0.3) if needed
model = Sequential([
    Input(shape=(3, 7)),
    TCN(nb_filters=32, kernel_size=2, dilations=[1, 2], return_sequences=False),
    Dropout(0.2),
    Dense(16, activation='relu', kernel_regularizer=l2(0.01)),
    Dropout(0.2),
    Dense(5, activation='softmax')
])

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Train
# EDIT HERE: Adjust epochs (e.g., 50), batch_size (e.g., 32), or remove class_weights
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=30, batch_size=64, class_weight=class_weights, callbacks=[early_stopping])

# Save
# EDIT HERE: Change filename if needed
model.save('tcn_model_all_agents_better.keras')
print("TCN model retrained and saved!")

Move distribution: {'up': 0.3680833333333333, 'down': 0.2668333333333333, 'left': 0.18933333333333333, 'right': 0.14725, 'stay': 0.0285}
Class weights: {0: 0.5433552184740774, 1: 0.749531542785759, 2: 1.056338028169014, 3: 1.3582342954159592, 4: 7.017543859649122}
Epoch 1/30
[1m150/150[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 6ms/step - accuracy: 0.2742 - loss: 1.8288 - val_accuracy: 0.3758 - val_loss: 1.6543
Epoch 2/30
[1m150/150[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 4ms/step - accuracy: 0.3301 - loss: 1.6304 - val_accuracy: 0.4321 - val_loss: 1.4804
Epoch 3/30
[1m150/150[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 4ms/step - accuracy: 0.3620 - loss: 1.5562 - val_accuracy: 0.4929 - val_loss: 1.3381
Epoch 4/30
[1m150/150[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 3ms/step - accuracy: 0.4196 - loss: 1.4113 - val_accuracy: 0.5996 - val_loss: 1.1777
Epoch 5/30
[1m150/150[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 3ms/step -

In [None]:
# Compare collisions
# 40 min
import pickle
import numpy as np
import tensorflow as tf
from mesa import Agent, Model
from mesa.space import MultiGrid
from mesa.time import RandomActivation
import random

class RetroAgent(Agent):
    def __init__(self, unique_id, model):
        super().__init__(unique_id, model)
        # Load TCN model
        # EDIT HERE: Change filename if using a different model
        self.tcn_model = tf.keras.models.load_model('tcn_model_all_agents_better.keras')
        while True:
            pos = (random.randint(0, 4), random.randint(0, 4))
            if model.grid.is_cell_empty(pos):
                break
        model.grid.place_agent(self, pos)

    def step(self):
        if len(self.model.positions) >= 3:
            input_seq = []
            for t in range(-3, 0):
                pos = self.model.positions[t]['positions']
                collision = 1 if self.model.positions[t]['collision'] else 0
                agent_id = self.unique_id
                self_pos = [p / 4 for p in pos[agent_id]]  # Normalize to [0, 1]
                other1_pos = pos[(agent_id + 1) % 3]
                other2_pos = pos[(agent_id + 2) % 3]
                rel_dist1 = [(self_pos[0] - other1_pos[0]) / 4, (self_pos[1] - other1_pos[1]) / 4]  # Normalize to [-1, 1]
                rel_dist2 = [(self_pos[0] - other2_pos[0]) / 4, (self_pos[1] - other2_pos[1]) / 4]
                collision_history = 1 if any(self.model.positions[tt]['collision'] for tt in range(max(0, t - 2), t + 1)) else 0
                input_seq.append([self_pos[0], self_pos[1], rel_dist1[0], rel_dist1[1], rel_dist2[0], rel_dist2[1], collision_history])
            input_seq = np.array([input_seq])
            move_probs = self.tcn_model.predict(input_seq, verbose=0)[0]
            moves = ['up', 'down', 'left', 'right', 'stay']
            # Hybrid approach: Check if TCN move is safe
            others_pos = [self.model.positions[-1]['positions'][i] for i in range(3) if i != self.unique_id]
            safe_moves = [m for m in moves if self.get_new_position(m) not in others_pos]
            move_idx = np.argmax(move_probs)
            move = moves[move_idx]
            # EDIT HERE: Adjust fallback strategy (e.g., random.choice(moves) for no safety check)
            if safe_moves and move not in safe_moves:
                move = random.choice(safe_moves)
        else:
            move = random.choice(['up', 'down', 'left', 'right', 'stay'])
        self.model.grid.move_agent(self, self.get_new_position(move))

    def get_new_position(self, move):
        x, y = self.pos
        if move == 'up': return (x, (y + 1) % 5)
        if move == 'down': return (x, (y - 1) % 5)
        if move == 'left': return ((x - 1) % 5, y)
        if move == 'right': return ((x + 1) % 5, y)
        return (x, y)

class RetroModel(Model):
    def __init__(self):
        super().__init__()
        self.grid = MultiGrid(5, 5, torus=True)
        self.schedule = RandomActivation(self)
        self.positions = []
        for i in range(3):
            agent = RetroAgent(i, self)
            self.schedule.add(agent)

    def step(self):
        step_data = {
            'positions': {agent.unique_id: agent.pos for agent in self.schedule.agents},
            'collision': len(set(agent.pos for agent in self.schedule.agents)) < 3
        }
        self.positions.append(step_data)
        self.schedule.step()

    def run_simulation(self, steps=7):
        collisions = 0
        self.positions = []
        manhattan_sums = []
        for step in range(steps):
            self.step()
            if len(set(agent.pos for agent in self.schedule.agents)) < 3:
                collisions += 1
            # Calculate Manhattan distance sum for this step
            positions = [agent.pos for agent in self.schedule.agents]
            manhattan_sum = (
                abs(positions[0][0] - positions[1][0]) + abs(positions[0][1] - positions[1][1]) +
                abs(positions[1][0] - positions[2][0]) + abs(positions[1][1] - positions[2][1]) +
                abs(positions[2][0] - positions[0][0]) + abs(positions[2][1] - positions[0][1])
            )
            manhattan_sums.append(manhattan_sum)
        return self.positions, collisions, np.mean(manhattan_sums)

# Run simulations
# EDIT HERE: Adjust number of runs (e.g., 100 for testing)
all_positions_tcn = []
total_collisions_tcn = 0
total_manhattan_sums = []
for run in range(1000):
    model = RetroModel()
    positions, collisions, avg_manhattan_sum = model.run_simulation(steps=7)
    all_positions_tcn.append(positions)
    total_collisions_tcn += collisions
    total_manhattan_sums.append(avg_manhattan_sum)

# Save results
# EDIT HERE: Change filename if needed
with open('abm_3agents_5x5_tcn_all_agents_better.pkl', 'wb') as f:
    pickle.dump(all_positions_tcn, f)

print(f"Baseline collisions: 196 in 7000 steps (~0.196/run)")
print(f"Post-TCN collisions: {total_collisions_tcn} in 7000 steps (~{total_collisions_tcn / 1000:.3f}/run)")
print(f"Average Manhattan distance sum per run: {np.mean(total_manhattan_sums):.2f}")

Baseline collisions: 196 in 7000 steps (~0.196/run)
Post-TCN collisions: 1322 in 7000 steps (~1.322/run)
Average Manhattan distance sum per run: 11.16


### Step 1: Generate a Mixed Dataset
Rule: Collision Prediction with Safe Move

In [11]:
import mesa
import random
import pickle
import pathlib
import numpy as np

class RetroAgent(mesa.Agent):
    def __init__(self, unique_id, model, allow_collisions=False):
        super().__init__(unique_id, model)
        self.allow_collisions = allow_collisions
        while True:
            pos = (random.randint(0, model.grid.width - 1), random.randint(0, model.grid.height - 1))
            if model.grid.is_cell_empty(pos):
                break
        model.grid.place_agent(self, pos)

    def get_relative_positions(self):
        others = [agent for agent in self.model.schedule.agents if agent.unique_id != self.unique_id]
        rel_pos = []
        for other in others:
            dx = other.pos[0] - self.pos[0]
            dy = other.pos[1] - self.pos[1]
            rel_pos.extend([dx, dy])
        return rel_pos

    def get_new_position(self, move):
        x, y = self.pos
        max_x, max_y = self.model.grid.width - 1, self.model.grid.height - 1
        if move == 'up' and y < max_y: return (x, y + 1)
        if move == 'down' and y > 0: return (x, y - 1)
        if move == 'left' and x > 0: return (x - 1, y)
        if move == 'right' and x < max_x: return (x + 1, y)
        if move == 'stay': return (x, y)
        return (x, y)

    def step(self):
        others = [agent for agent in self.model.schedule.agents if agent.unique_id != self.unique_id]
        others_pos = [agent.pos for agent in others]
        moves = ['up', 'down', 'left', 'right', 'stay']
        if not self.allow_collisions:
            safe_moves = [m for m in moves if self.get_new_position(m) not in others_pos]
            move = random.choice(safe_moves) if safe_moves else 'stay'
        else:
            if random.random() < 0.9:  # EDIT: Increased from 0.6 to 0.9 for more collisions
                target_pos = random.choice(others_pos) if others_pos else self.pos
                # EDIT: Prioritize moving toward Agent 0 if available
                agent0_pos = [agent.pos for agent in others if agent.unique_id == 0]
                target_pos = agent0_pos[0] if agent0_pos and self.unique_id != 0 else target_pos
                dx, dy = target_pos[0] - self.pos[0], target_pos[1] - self.pos[1]
                possible_moves = []
                if dx > 0: possible_moves.append('right')
                elif dx < 0: possible_moves.append('left')
                if dy > 0: possible_moves.append('up')
                elif dy < 0: possible_moves.append('down')  # FIXED: Changed from 'dy <-stats' to 'dy < 0'
                move = random.choice(possible_moves) if possible_moves else 'stay'
            else:
                move = random.choice(moves)
        new_pos = self.get_new_position(move)
        self.model.grid.move_agent(self, new_pos)

class RetroModel(mesa.Model):
    def __init__(self, allow_collisions=False, width=5, height=5):
        super().__init__()
        self.grid = mesa.space.MultiGrid(width, height, torus=False)
        self.schedule = mesa.time.RandomActivation(self)
        self.allow_collisions = allow_collisions
        self.data = []
        self.agent0_collisions = 0
        for i in range(3):
            agent = RetroAgent(i, self, allow_collisions)
            self.schedule.add(agent)

    def step(self):
        self.schedule.step()
        agent0 = next(agent for agent in self.schedule.agents if agent.unique_id == 0)
        others_pos = [agent.pos for agent in self.schedule.agents if agent.unique_id != 0]
        # EDIT: Compute collision based on Agent 0's next position
        moves = ['up', 'down', 'left', 'right', 'stay']
        next_move = random.choice(moves)  # Simulate a potential move for t+1
        next_pos = agent0.get_new_position(next_move)
        collision = next_pos in others_pos
        if collision:
            self.agent0_collisions += 1
        # EDIT: Store Agent 0's data only with t+1 collision label
        if agent0.unique_id == 0:
            self.data.append({
                'agent_id': agent0.unique_id,
                'step': self.schedule.steps,
                'pos': agent0.pos,
                'rel_pos': agent0.get_relative_positions(),
                'next_move': next_move,
                'collision': collision
            })

    def run_simulation(self, steps=7):
        self.data = []
        self.agent0_collisions = 0
        for _ in range(steps):
            self.step()
        return self.data, self.agent0_collisions

def collect_mixed_data(num_sims_collision_prone=800, num_sims_collision_free=200, steps=7):
    all_data = []
    total_collisions = 0
    notebook_dir = pathlib.Path.cwd()
    mixed_data_dir = notebook_dir / 'MixedData'
    mixed_data_dir.mkdir(exist_ok=True)
    pickle_path = mixed_data_dir / 'abm_mixed_3agents_5x5_collisions.pkl'
    # EDIT: Adjusted ratio to 700:300 for more collisions
    for _ in range(num_sims_collision_free):
        model = RetroModel(allow_collisions=False, width=5, height=5)
        data, collisions = model.run_simulation(steps)
        all_data.extend(data)
        total_collisions += collisions
    for _ in range(num_sims_collision_prone):
        model = RetroModel(allow_collisions=True, width=4, height=4) # 4x4 for collision-prone
        data, collisions = model.run_simulation(steps)
        all_data.extend(data)
        total_collisions += collisions
    with open(pickle_path, 'wb') as f:
        pickle.dump(all_data, f)
    return all_data, total_collisions, pickle_path

# EDIT: Add data preparation function for TCN
def prepare_training_data(data, seq_len=5):
    agent0_data = [d for d in data if d['agent_id'] == 0]
    X, y = [], []
    for i in range(len(agent0_data) - seq_len):
        sequence = []
        for j in range(i, i + seq_len):
            # Normalize positions to [0, 1] based on grid size
            max_x = 4 if any(d['step'] == agent0_data[j]['step'] and 'width' in d and d['width'] == 4 for d in data) else 5
            max_y = max_x
            pos_x = agent0_data[j]['pos'][0] / max_x
            pos_y = agent0_data[j]['pos'][1] / max_y
            rel_pos = [x / max_x if i % 2 == 0 else x / max_y for i, x in enumerate(agent0_data[j]['rel_pos'])]
            features = [pos_x, pos_y] + rel_pos
            sequence.append(features)

            #features = [agent0_data[j]['pos'][0], agent0_data[j]['pos'][1]] + agent0_data[j]['rel_pos']
            #sequence.append(features)
        X.append(sequence)
        y.append(agent0_data[i + seq_len]['collision'])
    return np.array(X), np.array(y)

# Run and analyze
data, total_collisions, pickle_path = collect_mixed_data()
agent0_data = [d for d in data if d['agent_id'] == 0]
agent0_collisions = sum(d['collision'] for d in agent0_data)
print(f"Total runs: {1000}")
print(f"Total data points: {len(data)}")
print(f"Agent 0 collisions: {agent0_collisions} in {len(agent0_data)} steps (~{agent0_collisions/len(agent0_data):.3f}/step)")
print(f"Total collisions (all runs): {total_collisions}")
print(f"Saved pickle file!")
X, y = prepare_training_data(data)
print(f"Prepared training data: X shape {X.shape}, y shape {y.shape}")

Total runs: 1000
Total data points: 7000
Agent 0 collisions: 1979 in 7000 steps (~0.283/step)
Total collisions (all runs): 1979
Saved pickle file!
Prepared training data: X shape (6995, 5, 6), y shape (6995,)


### Step 2: Train TCN

Predict Agent 0 only t+1 collision

In [15]:
import pickle
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score
import pathlib

# Define TCN model
class TCN(nn.Module):
    def __init__(self, input_size=6, num_channels=[64, 64, 32], kernel_size=5, output_size=1):
        super(TCN, self).__init__()
        layers = []
        for i in range(len(num_channels)):
            dilation = 2 ** i
            layers += [
                nn.Conv1d(input_size if i == 0 else num_channels[i-1], num_channels[i],
                          kernel_size, padding=(kernel_size-1)*dilation, dilation=dilation),
                nn.ReLU(),
                nn.BatchNorm1d(num_channels[i]),
                nn.Dropout(0.2)
            ]
        self.tcn = nn.Sequential(*layers)
        self.fc = nn.Linear(num_channels[-1], output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.tcn(x)  # [batch_size, num_channels[-1], seq_len]
        x = x[:, :, -1]  # [batch_size, num_channels[-1]]
        x = self.fc(x)  # [batch_size, output_size=1]
        x = self.sigmoid(x).squeeze(-1)  # [batch_size]
        return x

# Load dataset
notebook_dir = pathlib.Path.cwd()
mixed_data_dir = notebook_dir / 'MixedData'
pickle_path = mixed_data_dir / 'abm_mixed_3agents_5x5_collisions.pkl'
with open(pickle_path, 'rb') as f:
    data = pickle.load(f)

# Prepare data
def prepare_training_data(data, seq_len=5):
    agent0_data = [d for d in data if d['agent_id'] == 0]
    X, y = [], []
    for i in range(len(agent0_data) - seq_len):
        sequence = []
        for j in range(i, i + seq_len):
            max_x = 4 if any(d['step'] == agent0_data[j]['step'] and 'width' in d and d['width'] == 4 for d in data) else 5
            max_y = max_x
            pos_x = agent0_data[j]['pos'][0] / max_x
            pos_y = agent0_data[j]['pos'][1] / max_y
            rel_pos = [x / max_x if i % 2 == 0 else x / max_y for i, x in enumerate(agent0_data[j]['rel_pos'])]
            features = [pos_x, pos_y] + rel_pos
            sequence.append(features)
        X.append(sequence)
        y.append(agent0_data[i + seq_len]['collision'])
    return np.array(X), np.array(y)

X, y = prepare_training_data(data)
print(f"Loaded data: X shape {X.shape}, y shape {y.shape}, collision rate {y.mean():.3f}")

# Split data
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
train_dataset = TensorDataset(torch.FloatTensor(X_train).permute(0, 2, 1), torch.FloatTensor(y_train))
val_dataset = TensorDataset(torch.FloatTensor(X_val).permute(0, 2, 1), torch.FloatTensor(y_val))
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32)

# Initialize model, loss, and optimizer
model = TCN(input_size=6, num_channels=[64, 64, 32], kernel_size=5, output_size=1)
pos_weight = torch.tensor([(1 - y.mean()) / y.mean()])  # Weight for positive class (~2.53 for 28.3% collisions)
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)  # Use BCEWithLogitsLoss for stability
optimizer = torch.optim.Adam(model.parameters(), lr=0.0005)

# Training loop
best_val_acc = 0
patience, counter = 15, 0
for epoch in range(100):
    model.train()
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        outputs = model(X_batch)  # [batch_size]
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
    
    # Validation
    model.eval()
    val_correct = 0
    total = 0
    val_preds = []
    val_true = []
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            outputs = model(X_batch)  # [batch_size]
            predicted = (outputs > 0.5).float()
            total += y_batch.size(0)
            val_correct += (predicted == y_batch).sum().item()
            val_preds.extend(predicted.cpu().numpy())
            val_true.extend(y_batch.cpu().numpy())
    val_acc = val_correct / total
    val_precision = precision_score(val_true, val_preds, zero_division=0)
    val_recall = recall_score(val_true, val_preds, zero_division=0)
    print(f"Epoch {epoch+1}/100 - Val Accuracy: {val_acc:.4f}, Precision: {val_precision:.4f}, Recall: {val_recall:.4f}")
    
    # Save best model
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model.state_dict(), "tcn_model.pt")
        counter = 0
    else:
        counter += 1
    
    # Early stopping
    if counter >= patience:
        print("Early stopping")
        break

print(f"Best validation accuracy: {best_val_acc:.4f}")

# Final evaluation
model.load_state_dict(torch.load("tcn_model.pt"))
model.eval()
val_preds = []
val_true = []
with torch.no_grad():
    for X_batch, y_batch in val_loader:
        outputs = model(X_batch)
        predicted = (outputs > 0.5).float()
        val_preds.extend(predicted.cpu().numpy())
        val_true.extend(y_batch.cpu().numpy())
final_acc = (np.array(val_preds) == np.array(val_true)).mean()
final_precision = precision_score(val_true, val_preds, zero_division=0)
final_recall = recall_score(val_true, val_preds, zero_division=0)
print(f"Final metrics - Accuracy: {final_acc:.4f}, Precision: {final_precision:.4f}, Recall: {final_recall:.4f}")

Loaded data: X shape (6995, 5, 6), y shape (6995,), collision rate 0.283
Epoch 1/100 - Val Accuracy: 0.7212, Precision: 0.4912, Recall: 0.1443
Epoch 2/100 - Val Accuracy: 0.7269, Precision: 0.5455, Recall: 0.0928
Epoch 3/100 - Val Accuracy: 0.6955, Precision: 0.4195, Recall: 0.2552
Epoch 4/100 - Val Accuracy: 0.7105, Precision: 0.4525, Recall: 0.2088
Epoch 5/100 - Val Accuracy: 0.6962, Precision: 0.4280, Recall: 0.2835
Epoch 6/100 - Val Accuracy: 0.6969, Precision: 0.4371, Recall: 0.3222
Epoch 7/100 - Val Accuracy: 0.6955, Precision: 0.4354, Recall: 0.3299
Epoch 8/100 - Val Accuracy: 0.6969, Precision: 0.4313, Recall: 0.2912
Epoch 9/100 - Val Accuracy: 0.6955, Precision: 0.4367, Recall: 0.3376
Epoch 10/100 - Val Accuracy: 0.6976, Precision: 0.4415, Recall: 0.3402
Epoch 11/100 - Val Accuracy: 0.6998, Precision: 0.4433, Recall: 0.3222
Epoch 12/100 - Val Accuracy: 0.6941, Precision: 0.4367, Recall: 0.3557
Epoch 13/100 - Val Accuracy: 0.6969, Precision: 0.4434, Recall: 0.3634
Epoch 14/100 

### Step 3: Compare Retrocausal Rule

In [5]:
import mesa
import random
import pickle
import pathlib
import numpy as np
import torch
import torch.nn as nn

# TCN Model (same as training code for consistency)
class TCN(nn.Module):
    def __init__(self, input_size, num_channels, kernel_size, dropout=0.2):
        super(TCN, self).__init__()
        self.layers = nn.ModuleList()
        self.residuals = nn.ModuleList()
        for i in range(len(num_channels)):
            dilation = 2 ** i
            in_channels = input_size if i == 0 else num_channels[i-1]
            out_channels = num_channels[i]
            self.layers.append(nn.Sequential(
                nn.Conv1d(in_channels, out_channels, kernel_size, padding=(kernel_size-1) * dilation, dilation=dilation),
                nn.BatchNorm1d(out_channels),
                nn.ReLU(),
                nn.Dropout(dropout)
            ))
            self.residuals.append(
                nn.Conv1d(in_channels, out_channels, 1) if in_channels != out_channels else nn.Identity()
            )
        self.fc = nn.Linear(num_channels[-1], 1)  # Single output for t+1 collision

    def forward(self, x):
        for i, (layer, residual) in enumerate(zip(self.layers, self.residuals)):
            residual_x = residual(x)
            x = layer(x)
            x = x + residual_x if x.size() == residual_x.size() else x
        x = x[:, :, -1]
        return self.fc(x)

class RetroAgent(mesa.Agent):
    def __init__(self, unique_id, model, allow_collisions=False):
        super().__init__(unique_id, model)
        self.allow_collisions = allow_collisions
        while True:
            pos = (random.randint(0, 4), random.randint(0, 4))
            if model.grid.is_cell_empty(pos):
                break
        model.grid.place_agent(self, pos)

    def get_relative_positions(self):
        others = [agent for agent in self.model.schedule.agents if agent.unique_id != self.unique_id]
        rel_pos = []
        for other in others:
            dx = other.pos[0] - self.pos[0]
            dy = other.pos[1] - self.pos[1]
            rel_pos.extend([dx, dy])
        # Ensure exactly 4 values (for 2 other agents)
        while len(rel_pos) < 4:
            rel_pos.extend([0.0, 0.0])
        return rel_pos

    def get_new_position(self, move):
        x, y = self.pos
        if move == 'up' and y < 4: return (x, y + 1)
        if move == 'down' and y > 0: return (x, y - 1)
        if move == 'left' and x > 0: return (x - 1, y)
        if move == 'right' and x < 4: return (x + 1, y)
        if move == 'stay': return (x, y)
        return (x, y)

    def step(self):
        if self.unique_id == 0 and self.model.tcn is not None:
            # Prepare TCN input (mimic training preprocessing)
            pos_list = list(self.pos)  # Convert tuple to list
            rel_pos = self.get_relative_positions()  # List of [dx1, dy1, dx2, dy2]
            input_seq = [pos_list + rel_pos] * 5  # Replicate current step
            input_seq = [[x / 4.0 for x in input_seq[0]]] * 5  # Normalize
            input_tensor = torch.tensor([input_seq], dtype=torch.float32).transpose(1, 2).to(self.model.device)
            with torch.no_grad():
                collision_prob = torch.sigmoid(self.model.tcn(input_tensor)).item()
            others_pos = [agent.pos for agent in self.model.schedule.agents if agent.unique_id != 0]
            moves = ['up', 'down', 'left', 'right', 'stay']
            if collision_prob > 0.5:
                safe_moves = [m for m in moves if self.get_new_position(m) not in others_pos]
                move = random.choice(safe_moves) if safe_moves else 'stay'
            else:
                # 30% chance to move toward center (2,2)
                if random.random() < 0.3 and self.pos != (2, 2):
                    dx, dy = 2 - self.pos[0], 2 - self.pos[1]
                    possible_moves = []
                    if dx > 0: possible_moves.append('right')
                    elif dx < 0: possible_moves.append('left')
                    if dy > 0: possible_moves.append('up')
                    elif dy < 0: possible_moves.append('down')
                    move = random.choice(possible_moves) if possible_moves else 'stay'
                else:
                    move = random.choice(moves)
        else:
            # Other agents or no-TCN case: collision-prone or collision-free logic
            others = [agent for agent in self.model.schedule.agents if agent.unique_id != self.unique_id]
            others_pos = [agent.pos for agent in others]
            moves = ['up', 'down', 'left', 'right', 'stay']
            if not self.allow_collisions:
                safe_moves = [m for m in moves if self.get_new_position(m) not in others_pos]
                move = random.choice(safe_moves) if safe_moves else 'stay'
            else:
                if random.random() < 0.6:  # 60% chance to move toward others
                    target_pos = random.choice(others_pos) if others_pos else self.pos
                    dx, dy = target_pos[0] - self.pos[0], target_pos[1] - self.pos[1]
                    possible_moves = []
                    if dx > 0: possible_moves.append('right')
                    elif dx < 0: possible_moves.append('left')
                    if dy > 0: possible_moves.append('up')
                    elif dy < 0: possible_moves.append('down')
                    move = random.choice(possible_moves) if possible_moves else 'stay'
                else:
                    move = random.choice(moves)
        new_pos = self.get_new_position(move)
        self.model.grid.move_agent(self, new_pos)

class RetroModel(mesa.Model):
    def __init__(self, allow_collisions=False, tcn_path=None):
        super().__init__()
        self.grid = mesa.space.MultiGrid(5, 5, torus=False)
        self.schedule = mesa.time.RandomActivation(self)
        self.allow_collisions = allow_collisions
        self.data = []
        self.agent0_collisions = 0
        self.device = torch.device("cpu")  # Match training device
        self.tcn = None
        if tcn_path:
            self.tcn = TCN(input_size=6, num_channels=[128, 128, 128], kernel_size=3, dropout=0.2)
            self.tcn.load_state_dict(torch.load(tcn_path, map_location=self.device))
            self.tcn.eval()
            self.tcn.to(self.device)
        for i in range(3):
            agent = RetroAgent(i, self, allow_collisions)
            self.schedule.add(agent)

    def step(self):
        self.schedule.step()
        agent0 = next(agent for agent in self.schedule.agents if agent.unique_id == 0)
        others_pos = [agent.pos for agent in self.schedule.agents if agent.unique_id != 0]
        next_pos = agent0.get_new_position(random.choice(['up', 'down', 'left', 'right', 'stay']))
        collision = next_pos in others_pos
        if collision:
            self.agent0_collisions += 1
        for agent in self.schedule.agents:
            self.data.append({
                'agent_id': agent.unique_id,
                'step': self.schedule.steps,
                'pos': agent.pos,
                'rel_pos': agent.get_relative_positions(),
                'collision': collision and agent.unique_id == 0
            })

    def run_simulation(self, steps=7):
        self.data = []
        self.agent0_collisions = 0
        for _ in range(steps):
            self.step()
        return self.data, self.agent0_collisions

def collect_mixed_data(num_sims_collision_prone=600, num_sims_collision_free=400, steps=7):
    all_data = []
    total_collisions = 0
    notebook_dir = pathlib.Path.cwd()
    mixed_data_dir = notebook_dir / 'MixedData'
    mixed_data_dir.mkdir(exist_ok=True)
    pickle_path = mixed_data_dir / 'abm_mixed_3agents_5x5_collisions.pkl'
    for _ in range(num_sims_collision_free):
        model = RetroModel(allow_collisions=False)
        data, collisions = model.run_simulation(steps)
        all_data.extend(data)
        total_collisions += collisions
    for _ in range(num_sims_collision_prone):
        model = RetroModel(allow_collisions=True)
        data, collisions = model.run_simulation(steps)
        all_data.extend(data)
        total_collisions += collisions
    with open(pickle_path, 'wb') as f:
        pickle.dump(all_data, f)
    return all_data, total_collisions, pickle_path

def compare_rule_effects(num_sims=100, steps=7, tcn_path='MixedData/tcn_model_best.pth'):
    collisions_no_rule, collisions_with_rule = 0, 0
    for _ in range(num_sims):
        model_no_rule = RetroModel(allow_collisions=True)
        _, c1 = model_no_rule.run_simulation(steps)
        collisions_no_rule += c1
        model_with_rule = RetroModel(allow_collisions=True, tcn_path=tcn_path)
        _, c2 = model_with_rule.run_simulation(steps)
        collisions_with_rule += c2
    avg_collisions_no_rule = collisions_no_rule / num_sims
    avg_collisions_with_rule = collisions_with_rule / num_sims
    print(f"No-rule average collisions: {avg_collisions_no_rule:.2f} per simulation")
    print(f"With-rule average collisions: {avg_collisions_with_rule:.2f} per simulation")
    print(f"Total collisions (no-rule): {collisions_no_rule} in {num_sims * steps} steps")
    print(f"Total collisions (with-rule): {collisions_with_rule} in {num_sims * steps} steps")
    return collisions_no_rule, collisions_with_rule

if __name__ == "__main__":
    random.seed(42)
    torch.manual_seed(42)
    # Optionally regenerate data to ensure consistency
    data, total_collisions, pickle_path = collect_mixed_data()
    print(f"Generated dataset: {total_collisions} collisions, saved!")
    # Run comparison
    collisions_no_rule, collisions_with_rule = compare_rule_effects()

Generated dataset: 1067 collisions, saved!
No-rule average collisions: 1.39 per simulation
With-rule average collisions: 0.90 per simulation
Total collisions (no-rule): 139 in 700 steps
Total collisions (with-rule): 90 in 700 steps


In [6]:
from sklearn.metrics import precision_score, recall_score

def evaluate_tcn_predictions(model, steps=7):
    true_labels, pred_labels = [], []
    for _ in range(steps):
        model.step()
        if model.tcn:
            agent0 = next(agent for agent in model.schedule.agents if agent.unique_id == 0)
            pos_list = list(agent0.pos)
            rel_pos = agent0.get_relative_positions()
            input_seq = [pos_list + rel_pos] * 5
            input_seq = [[x / 4.0 for x in input_seq[0]]] * 5
            input_tensor = torch.tensor([input_seq], dtype=torch.float32).transpose(1, 2).to(model.device)
            with torch.no_grad():
                collision_prob = torch.sigmoid(model.tcn(input_tensor)).item()
            pred_labels.append(1 if collision_prob > 0.5 else 0)
            true_labels.append(1 if model.data[-1]['collision'] else 0)
    precision = precision_score(true_labels, pred_labels, zero_division=0)
    recall = recall_score(true_labels, pred_labels, zero_division=0)
    print(f"TCN Precision: {precision:.4f}, Recall: {recall:.4f}")
    return precision, recall

# Run evaluation
model = RetroModel(allow_collisions=True, tcn_path='MixedData/tcn_model_best.pth')
precision, recall = evaluate_tcn_predictions(model)

TCN Precision: 0.0000, Recall: 0.0000
