In [1]:
# Download Codebase from GitHub
import os
import subprocess
import sys
from pathlib import Path

# Repository information
REPO_URL = "https://github.com/yimianxyz/homepage.git"
BRANCH = "neuro-predator"
REPO_DIR = "homepage"

def download_codebase():
    """Download the codebase from GitHub if not already present"""

    if os.path.exists(REPO_DIR):
        print(f"Repository directory '{REPO_DIR}' already exists.")

        # Check if it's the correct repository and branch
        try:
            os.chdir(REPO_DIR)

            # Check current branch
            result = subprocess.run(['git', 'branch', '--show-current'],
                                  capture_output=True, text=True, check=True)
            current_branch = result.stdout.strip()

            if current_branch != BRANCH:
                print(f"Switching to branch '{BRANCH}'...")
                subprocess.run(['git', 'checkout', BRANCH], check=True)

            # Pull latest changes
            print("Updating repository...")
            subprocess.run(['git', 'pull', 'origin', BRANCH], check=True)

            print(f"✅ Repository updated successfully!")

        except subprocess.CalledProcessError as e:
            print(f"❌ Error updating repository: {e}")
            print("Repository directory exists but may not be a valid git repository.")

        except Exception as e:
            print(f"❌ Error: {e}")

    else:
        print(f"Cloning repository from {REPO_URL} (branch: {BRANCH})...")

        try:
            # Clone the specific branch
            subprocess.run(['git', 'clone', '-b', BRANCH, REPO_URL, REPO_DIR], check=True)

            print(f"✅ Repository cloned successfully!")

            # Change to repository directory
            os.chdir(REPO_DIR)

        except subprocess.CalledProcessError as e:
            print(f"❌ Error cloning repository: {e}")
            print("Make sure you have git installed and internet connection.")
            return False

        except Exception as e:
            print(f"❌ Error: {e}")
            return False

    # Verify key files exist
    key_files = [
        'config/constants.py',
        'data_generation/generate_training_data.py',
        'export_to_js.py',
        'policy/transformer/transformer_policy.js',
        'simulation/processors/input_processor.py'
    ]

    missing_files = []
    for file_path in key_files:
        if not os.path.exists(file_path):
            missing_files.append(file_path)

    if missing_files:
        print(f"⚠️  Warning: Some key files are missing:")
        for file_path in missing_files:
            print(f"  - {file_path}")
        return False

    print(f"✅ All key files found!")
    print(f"📁 Working directory: {os.getcwd()}")

    return True

# Download the codebase
success = download_codebase()

if success:
    print("\n🎉 Setup complete! You can now run the training notebook.")
    print("📖 Repository structure:")

    # Show repository structure
    for root, dirs, files in os.walk('.'):
        # Skip hidden directories and __pycache__
        dirs[:] = [d for d in dirs if not d.startswith('.') and d != '__pycache__']
        level = root.replace('.', '').count(os.sep)
        indent = ' ' * 2 * level
        print(f"{indent}{os.path.basename(root)}/")

        # Only show first level of files to avoid clutter
        if level < 2:
            subindent = ' ' * 2 * (level + 1)
            for file in files:
                if not file.startswith('.') and not file.endswith('.pyc'):
                    print(f"{subindent}{file}")

        # Limit depth to avoid too much output
        if level >= 2:
            break

else:
    print("❌ Setup failed. Please check the errors above and try again.")
    print("Manual setup: git clone -b neuro-predator https://github.com/yimianxyz/homepage.git")


Cloning repository from https://github.com/yimianxyz/homepage.git (branch: neuro-predator)...
✅ Repository cloned successfully!
✅ All key files found!
📁 Working directory: /content/homepage

🎉 Setup complete! You can now run the training notebook.
📖 Repository structure:
./
  LICENSE
  export_to_js.py
  transformer_training.ipynb
  playground.css
  index.html
  styles.css
  shared.css
  playground.html
  data_generation/
    generate_training_data.py
  simulation/
    __init__.py
    random_state_generator/


In [2]:
# Verify Setup and Import Project Modules
import sys
from pathlib import Path

# Ensure we're in the correct directory and add to Python path
project_root = Path.cwd()
if project_root.name != 'homepage':
    print(f"⚠️  Warning: Current directory is '{project_root.name}', expected 'homepage'")
    print("Make sure the first cell downloaded the repository correctly.")

if str(project_root) not in sys.path:
    sys.path.append(str(project_root))

# Import project modules
try:
    from config.constants import CONSTANTS
    print(f"✅ Successfully imported simulation constants")
    print(f"📁 Project root: {project_root}")
    print(f"🔧 Key constants: MAX_DISTANCE={CONSTANTS.MAX_DISTANCE}, BOID_MAX_SPEED={CONSTANTS.BOID_MAX_SPEED}")
except ImportError as e:
    print(f"❌ Failed to import constants: {e}")
    print("Make sure the repository was downloaded correctly in the first cell.")
    raise


✅ Successfully imported simulation constants
📁 Project root: /content/homepage
🔧 Key constants: MAX_DISTANCE=2000, BOID_MAX_SPEED=3.5


In [3]:
# Architecture Configuration
import torch
import torch.nn as nn
import torch.optim as optim
import json
import numpy as np
from pathlib import Path
import matplotlib.pyplot as plt
from datetime import datetime
import random
from torch.utils.data import Dataset, DataLoader
from typing import Dict, List, Any, Tuple

# Architecture constants - modify these as needed
D_MODEL = 128
N_HEADS = 8
N_LAYERS = 4
FFN_HIDDEN = 512
DROPOUT = 0.1
MAX_BOIDS = 50  # Maximum number of boids to handle

# Training constants
BATCH_SIZE = 256
LEARNING_RATE = 1e-4
WEIGHT_DECAY = 0.01
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print(f"Configuration:")
print(f"  Architecture: d_model={D_MODEL}, n_heads={N_HEADS}, n_layers={N_LAYERS}, ffn_hidden={FFN_HIDDEN}")
print(f"  Training: batch_size={BATCH_SIZE}, lr={LEARNING_RATE}, device={DEVICE}")
print(f"  Max boids: {MAX_BOIDS}")

# Import project modules
from config.constants import CONSTANTS
print(f"✓ Simulation constants loaded: MAX_DISTANCE={CONSTANTS.MAX_DISTANCE}")


Configuration:
  Architecture: d_model=128, n_heads=8, n_layers=4, ffn_hidden=512
  Training: batch_size=256, lr=0.0001, device=cuda
  Max boids: 50
✓ Simulation constants loaded: MAX_DISTANCE=2000


In [4]:
# Transformer Model Definition
class GEGLU(nn.Module):
    def forward(self, x):
        x, gate = x.chunk(2, dim=-1)
        return x * torch.nn.functional.gelu(gate)

class TransformerLayer(nn.Module):
    def __init__(self, d_model, n_heads, ffn_hidden, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.n_heads = n_heads

        self.norm1 = nn.LayerNorm(d_model)
        self.self_attn = nn.MultiheadAttention(d_model, n_heads, dropout=dropout, batch_first=True)

        self.norm2 = nn.LayerNorm(d_model)

        # GEGLU FFN with separate projections for export compatibility
        self.ffn_gate_proj = nn.Linear(d_model, ffn_hidden)
        self.ffn_up_proj = nn.Linear(d_model, ffn_hidden)
        self.ffn_down_proj = nn.Linear(ffn_hidden, d_model)

    def forward(self, x, padding_mask=None):
        # Self-attention with residual
        normed = self.norm1(x)
        attn_out, _ = self.self_attn(normed, normed, normed, key_padding_mask=padding_mask)
        x = x + attn_out

        # FFN with residual
        normed = self.norm2(x)
        gate = torch.nn.functional.gelu(self.ffn_gate_proj(normed))
        up = self.ffn_up_proj(normed)
        ffn_out = self.ffn_down_proj(gate * up)
        x = x + ffn_out

        return x

class TransformerPredictor(nn.Module):
    def __init__(self, d_model=48, n_heads=4, n_layers=2, ffn_hidden=96, max_boids=50, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.n_heads = n_heads
        self.n_layers = n_layers
        self.ffn_hidden = ffn_hidden
        self.max_boids = max_boids

        # CLS token embedding
        self.cls_embedding = nn.Parameter(torch.randn(d_model))

        # Type embeddings
        self.type_embeddings = nn.ParameterDict({
            'cls': nn.Parameter(torch.randn(d_model)),
            'ctx': nn.Parameter(torch.randn(d_model)),
            'predator': nn.Parameter(torch.randn(d_model)),
            'boid': nn.Parameter(torch.randn(d_model))
        })

        # Input projections
        self.ctx_projection = nn.Linear(2, d_model)  # canvas_width, canvas_height
        self.predator_projection = nn.Linear(4, d_model)  # velX, velY, 0, 0 (padded to 4D)
        self.boid_projection = nn.Linear(4, d_model)  # relX, relY, velX, velY

        # Transformer layers
        self.transformer_layers = nn.ModuleList([
            TransformerLayer(d_model, n_heads, ffn_hidden, dropout)
            for _ in range(n_layers)
        ])

        # Output projection
        self.output_projection = nn.Linear(d_model, 2)  # predator action [x, y]

    def forward(self, structured_inputs, padding_mask=None):
        batch_size = len(structured_inputs) if isinstance(structured_inputs, list) else 1

        # Handle single sample vs batch
        if isinstance(structured_inputs, dict):
            structured_inputs = [structured_inputs]
            batch_size = 1

        # Build token sequences for each sample in batch
        sequences = []
        masks = []

        for sample in structured_inputs:
            tokens = []

            # CLS token
            cls_token = self.cls_embedding + self.type_embeddings['cls']
            tokens.append(cls_token)

            # Context token
            ctx_input = torch.tensor([sample['context']['canvasWidth'], sample['context']['canvasHeight']],
                                   dtype=torch.float32, device=self.cls_embedding.device)
            ctx_token = self.ctx_projection(ctx_input) + self.type_embeddings['ctx']
            tokens.append(ctx_token)

            # Predator token - expand to 4D
            predator_input = torch.tensor([sample['predator']['velX'], sample['predator']['velY'], 0.0, 0.0],
                                        dtype=torch.float32, device=self.cls_embedding.device)
            predator_token = self.predator_projection(predator_input) + self.type_embeddings['predator']
            tokens.append(predator_token)

            # Boid tokens
            sample_mask = [False, False, False]  # CLS, CTX, Predator are not padding

            for boid in sample['boids']:
                boid_input = torch.tensor([boid['relX'], boid['relY'], boid['velX'], boid['velY']],
                                        dtype=torch.float32, device=self.cls_embedding.device)
                boid_token = self.boid_projection(boid_input) + self.type_embeddings['boid']
                tokens.append(boid_token)
                sample_mask.append(False)

            # Pad to max_boids + 3 (CLS + CTX + Predator)
            while len(tokens) < self.max_boids + 3:
                padding_token = torch.zeros(self.d_model, device=self.cls_embedding.device)
                tokens.append(padding_token)
                sample_mask.append(True)  # Mark as padding

            sequences.append(torch.stack(tokens))
            masks.append(sample_mask)

        # Stack sequences
        x = torch.stack(sequences)  # [batch_size, seq_len, d_model]

        # Create padding mask
        if padding_mask is None:
            padding_mask = torch.tensor(masks, dtype=torch.bool, device=x.device)

        # Pass through transformer layers
        for layer in self.transformer_layers:
            x = layer(x, padding_mask)

        # Extract CLS token and project to output
        cls_output = x[:, 0]  # [batch_size, d_model]
        action = self.output_projection(cls_output)  # [batch_size, 2]

        # Apply tanh to ensure [-1, 1] range
        action = torch.tanh(action)

        return action.squeeze(0) if batch_size == 1 else action

print("✓ Model architecture defined")


✓ Model architecture defined


In [5]:
# Dataset Class
class BoidsDataset(Dataset):
    def __init__(self, data_path):
        print(f"Loading dataset from {data_path}...")
        with open(data_path, 'r') as f:
            data = json.load(f)

        self.samples = data['samples']
        self.metadata = data['metadata']

        print(f"✓ Loaded {len(self.samples)} samples")
        print(f"  Dataset metadata: {self.metadata['total_samples']} total samples")
        print(f"  Valid targets: {self.metadata['statistics']['valid_target_percentage']:.1f}%")

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        sample = self.samples[idx]

        # Input is the structured format
        inputs = sample['input']

        # Output is the action [x, y]
        target = torch.tensor(sample['output'], dtype=torch.float32)

        return inputs, target

print("✓ Dataset class defined")


✓ Dataset class defined


In [6]:
# Model Initialization
def create_model():
    model = TransformerPredictor(
        d_model=D_MODEL,
        n_heads=N_HEADS,
        n_layers=N_LAYERS,
        ffn_hidden=FFN_HIDDEN,
        max_boids=MAX_BOIDS,
        dropout=DROPOUT
    ).to(DEVICE)

    # Count parameters
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

    print(f"✓ Model created:")
    print(f"  Total parameters: {total_params:,}")
    print(f"  Trainable parameters: {trainable_params:,}")
    print(f"  Architecture: {D_MODEL}×{N_HEADS}×{N_LAYERS}×{FFN_HIDDEN}")

    return model

# Create fresh model
model = create_model()


✓ Model created:
  Total parameters: 1,059,842
  Trainable parameters: 1,059,842
  Architecture: 128×8×4×512


In [7]:
# Load Model from Checkpoint (optional)
def load_checkpoint_and_update_arch(checkpoint_path, optimizer=None):
    global D_MODEL, N_HEADS, N_LAYERS, FFN_HIDDEN, MAX_BOIDS, model

    print(f"Loading checkpoint from {checkpoint_path}...")
    checkpoint = torch.load(checkpoint_path, map_location=DEVICE)

    # Extract and update architecture parameters from checkpoint
    if 'architecture' in checkpoint:
        arch = checkpoint['architecture']
        D_MODEL = arch.get('d_model', D_MODEL)
        N_HEADS = arch.get('n_heads', N_HEADS)
        N_LAYERS = arch.get('n_layers', N_LAYERS)
        FFN_HIDDEN = arch.get('ffn_hidden', FFN_HIDDEN)
        MAX_BOIDS = arch.get('max_boids', MAX_BOIDS)

        print(f"✓ Updated architecture from checkpoint:")
        print(f"  d_model: {D_MODEL}")
        print(f"  n_heads: {N_HEADS}")
        print(f"  n_layers: {N_LAYERS}")
        print(f"  ffn_hidden: {FFN_HIDDEN}")
        print(f"  max_boids: {MAX_BOIDS}")
    else:
        print("Warning: No architecture info in checkpoint, using current values")

    # Recreate model with correct architecture
    model = TransformerPredictor(
        d_model=D_MODEL,
        n_heads=N_HEADS,
        n_layers=N_LAYERS,
        ffn_hidden=FFN_HIDDEN,
        max_boids=MAX_BOIDS,
        dropout=DROPOUT
    ).to(DEVICE)

    # Load model state
    model.load_state_dict(checkpoint['model_state_dict'])

    # Update optimizer if provided
    if optimizer and 'optimizer_state_dict' in checkpoint:
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

    epoch = checkpoint.get('epoch', 0)
    best_val_loss = checkpoint.get('best_val_loss', float('inf'))

    print(f"✓ Loaded checkpoint from epoch {epoch}, best val loss: {best_val_loss:.6f}")
    return epoch, best_val_loss, model

# Uncomment to load from checkpoint
# checkpoint_path = "checkpoints/best_model.pt"
# if os.path.exists(checkpoint_path):
#     start_epoch, best_val_loss, model = load_checkpoint_and_update_arch(checkpoint_path)
#     print(f"✓ Model recreated with checkpoint architecture")
# else:
#     print(f"Checkpoint not found: {checkpoint_path}")
#     start_epoch, best_val_loss = 0, float('inf')

start_epoch, best_val_loss = 0, float('inf')
print(f"✓ Starting from epoch {start_epoch}")


✓ Starting from epoch 0


In [8]:
# Generate Training Data (if needed)
def generate_training_data(num_samples=10000, output_path="training_data.json"):
    print(f"Generating {num_samples} training samples...")
    import subprocess
    result = subprocess.run([
        sys.executable, "data_generation/generate_training_data.py",
        "--samples", str(num_samples),
        "--output", output_path
    ], capture_output=True, text=True)

    if result.returncode == 0:
        print(f"✓ Training data generated: {output_path}")
        return True
    else:
        print(f"❌ Error generating training data:")
        print(result.stderr)
        return False

# Load Training Data
data_path = "training_data.json"

if not os.path.exists(data_path):
    print(f"Training data not found at {data_path}")
    print("Generating training data...")
    success = generate_training_data(1000000, data_path)
    if not success:
        print("Failed to generate training data")
        train_loader = val_loader = None

if os.path.exists(data_path):
    try:
        dataset = BoidsDataset(data_path)

        # Split into train/val
        train_size = int(0.8 * len(dataset))
        val_size = len(dataset) - train_size
        train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])

        # Create data loaders
        train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0)
        val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)

        print(f"✓ Data loaded:")
        print(f"  Train samples: {len(train_dataset)}")
        print(f"  Val samples: {len(val_dataset)}")
        print(f"  Batch size: {BATCH_SIZE}")

    except Exception as e:
        print(f"❌ Error loading data: {e}")
        train_loader = val_loader = None
else:
    train_loader = val_loader = None


Training data not found at training_data.json
Generating training data...
Generating 1000000 training samples...
✓ Training data generated: training_data.json
Loading dataset from training_data.json...
✓ Loaded 1000000 samples
  Dataset metadata: 1000000 total samples
  Valid targets: 100.0%
✓ Data loaded:
  Train samples: 800000
  Val samples: 200000
  Batch size: 256


In [9]:
# Training Setup and Functions
def setup_training(model):
    optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5, verbose=True)
    criterion = nn.MSELoss()

    print(f"✓ Training setup:")
    print(f"  Optimizer: AdamW (lr={LEARNING_RATE}, weight_decay={WEIGHT_DECAY})")
    print(f"  Scheduler: ReduceLROnPlateau (factor=0.5, patience=5)")
    print(f"  Criterion: MSELoss")

    return optimizer, scheduler, criterion

def train_epoch(model, train_loader, optimizer, criterion, epoch):
    model.train()
    total_loss = 0
    num_batches = len(train_loader)

    for batch_idx, (inputs, targets) in enumerate(train_loader):
        targets = targets.to(DEVICE)

        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # Backward pass
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        if batch_idx % 100 == 0:
            print(f'Epoch {epoch}, Batch {batch_idx}/{num_batches}, Loss: {loss.item():.6f}')

    avg_loss = total_loss / num_batches
    return avg_loss

def validate_epoch(model, val_loader, criterion):
    model.eval()
    total_loss = 0
    num_batches = len(val_loader)

    with torch.no_grad():
        for inputs, targets in val_loader:
            targets = targets.to(DEVICE)
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            total_loss += loss.item()

    avg_loss = total_loss / num_batches
    return avg_loss

def save_checkpoint(model, optimizer, epoch, val_loss, is_best=False):
    os.makedirs("checkpoints", exist_ok=True)

    checkpoint = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'val_loss': val_loss,
        'best_val_loss': best_val_loss,
        'architecture': {
            'd_model': D_MODEL,
            'n_heads': N_HEADS,
            'n_layers': N_LAYERS,
            'ffn_hidden': FFN_HIDDEN,
            'max_boids': MAX_BOIDS
        },
        'timestamp': datetime.now().isoformat()
    }

    # Save epoch checkpoint
    epoch_path = f"checkpoints/model_epoch_{epoch}.pt"
    torch.save(checkpoint, epoch_path)
    print(f"✓ Saved checkpoint: {epoch_path}")

    # Save best model
    if is_best:
        best_path = "checkpoints/best_model.pt"
        torch.save(checkpoint, best_path)
        print(f"✓ Saved best model: {best_path}")

    return epoch_path

# Setup training
optimizer, scheduler, criterion = setup_training(model)
print("✓ Training components ready")


✓ Training setup:
  Optimizer: AdamW (lr=0.0001, weight_decay=0.01)
  Scheduler: ReduceLROnPlateau (factor=0.5, patience=5)
  Criterion: MSELoss
✓ Training components ready




In [None]:
# Training Loop
def custom_collate_fn(batch):
    """Custom collate function for structured inputs with variable-length boids"""
    inputs = []
    targets = []

    for sample_input, sample_target in batch:
        inputs.append(sample_input)  # Keep structured format
        targets.append(sample_target)

    # Stack targets into a tensor
    targets = torch.stack(targets)

    return inputs, targets

def train_model(model, train_loader, val_loader, optimizer, scheduler, criterion, num_epochs=50):
    global best_val_loss

    train_losses = []
    val_losses = []

    print(f"🚀 Starting training for {num_epochs} epochs...")

    for epoch in range(start_epoch, start_epoch + num_epochs):
        print(f"\nEpoch {epoch+1}/{start_epoch + num_epochs}")
        print("-" * 50)

        # Train
        train_loss = train_epoch(model, train_loader, optimizer, criterion, epoch+1)
        train_losses.append(train_loss)

        # Validate
        val_loss = validate_epoch(model, val_loader, criterion)
        val_losses.append(val_loss)

        # Learning rate scheduling
        scheduler.step(val_loss)

        print(f"Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}")

        # Save checkpoint
        is_best = val_loss < best_val_loss
        if is_best:
            best_val_loss = val_loss
            print(f"🎯 New best validation loss: {best_val_loss:.6f}")

        # Save every 5 epochs or if best
        if (epoch + 1) % 5 == 0 or is_best:
            save_checkpoint(model, optimizer, epoch + 1, val_loss, is_best)

    return train_losses, val_losses

# Create data loaders with custom collate function
if os.path.exists(data_path):
    dataset = BoidsDataset(data_path)

    # Split into train/val
    train_size = int(0.8 * len(dataset))
    val_size = len(dataset) - train_size
    train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])

    # Create data loaders with custom collate function
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True,
                             num_workers=0, collate_fn=custom_collate_fn)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False,
                           num_workers=0, collate_fn=custom_collate_fn)

    print(f"Data loaded:")
    print(f"  Train samples: {len(train_dataset)}")
    print(f"  Val samples: {len(val_dataset)}")
    print(f"  Batch size: {BATCH_SIZE}")
    print(f"  Custom collate function: ✓")

# Run training (modify NUM_EPOCHS as needed)
if train_loader is not None and val_loader is not None:
    NUM_EPOCHS = 20  # Adjust as needed
    train_losses, val_losses = train_model(model, train_loader, val_loader, optimizer, scheduler, criterion, NUM_EPOCHS)
    print("✅ Training completed!")
else:
    print("❌ Cannot start training: data not loaded")
    train_losses = val_losses = None

Loading dataset from training_data.json...
✓ Loaded 1000000 samples
  Dataset metadata: 1000000 total samples
  Valid targets: 100.0%
Data loaded:
  Train samples: 800000
  Val samples: 200000
  Batch size: 256
  Custom collate function: ✓
🚀 Starting training for 20 epochs...

Epoch 1/20
--------------------------------------------------
Epoch 1, Batch 0/3125, Loss: 0.949517
Epoch 1, Batch 100/3125, Loss: 0.495001
Epoch 1, Batch 200/3125, Loss: 0.526856
Epoch 1, Batch 300/3125, Loss: 0.496496
Epoch 1, Batch 400/3125, Loss: 0.482994
Epoch 1, Batch 500/3125, Loss: 0.496084
Epoch 1, Batch 600/3125, Loss: 0.496468
Epoch 1, Batch 700/3125, Loss: 0.504417
Epoch 1, Batch 800/3125, Loss: 0.488186
Epoch 1, Batch 900/3125, Loss: 0.492478
Epoch 1, Batch 1000/3125, Loss: 0.389856
Epoch 1, Batch 1100/3125, Loss: 0.160098
Epoch 1, Batch 1200/3125, Loss: 0.155775
Epoch 1, Batch 1300/3125, Loss: 0.103763
Epoch 1, Batch 1400/3125, Loss: 0.081833
Epoch 1, Batch 1500/3125, Loss: 0.071068
Epoch 1, Batch 1

In [None]:
# Training Progress Visualization and Model Evaluation
def plot_training_progress(train_losses, val_losses):
    plt.figure(figsize=(12, 4))

    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Train Loss', color='blue')
    plt.plot(val_losses, label='Val Loss', color='red')
    plt.xlabel('Epoch')
    plt.ylabel('MSE Loss')
    plt.title('Training Progress')
    plt.legend()
    plt.grid(True)

    plt.subplot(1, 2, 2)
    plt.plot(val_losses, label='Val Loss', color='red')
    plt.xlabel('Epoch')
    plt.ylabel('Validation MSE Loss')
    plt.title('Validation Loss')
    plt.legend()
    plt.grid(True)

    plt.tight_layout()
    plt.show()

    print(f"📊 Training Summary:")
    print(f"  Final train loss: {train_losses[-1]:.6f}")
    print(f"  Final val loss: {val_losses[-1]:.6f}")
    print(f"  Best val loss: {min(val_losses):.6f}")

def evaluate_model(model, val_loader, num_samples=100):
    model.eval()

    predictions = []
    targets = []

    with torch.no_grad():
        count = 0
        for inputs, batch_targets in val_loader:
            batch_targets = batch_targets.to(DEVICE)
            batch_outputs = model(inputs)

            predictions.extend(batch_outputs.cpu().numpy())
            targets.extend(batch_targets.cpu().numpy())

            count += len(batch_targets)
            if count >= num_samples:
                break

    predictions = np.array(predictions[:num_samples])
    targets = np.array(targets[:num_samples])

    # Calculate metrics
    mse = np.mean((predictions - targets) ** 2)
    mae = np.mean(np.abs(predictions - targets))

    print(f"📈 Evaluation on {len(predictions)} samples:")
    print(f"  MSE: {mse:.6f}")
    print(f"  MAE: {mae:.6f}")

    # Plot predictions vs targets
    plt.figure(figsize=(12, 4))

    plt.subplot(1, 2, 1)
    plt.scatter(targets[:, 0], predictions[:, 0], alpha=0.5, color='blue')
    plt.plot([-1, 1], [-1, 1], 'r--', label='Perfect prediction')
    plt.xlabel('Target X')
    plt.ylabel('Predicted X')
    plt.title('X Component Prediction')
    plt.legend()
    plt.grid(True)

    plt.subplot(1, 2, 2)
    plt.scatter(targets[:, 1], predictions[:, 1], alpha=0.5, color='green')
    plt.plot([-1, 1], [-1, 1], 'r--', label='Perfect prediction')
    plt.xlabel('Target Y')
    plt.ylabel('Predicted Y')
    plt.title('Y Component Prediction')
    plt.legend()
    plt.grid(True)

    plt.tight_layout()
    plt.show()

    return mse, mae

# Plot training progress and evaluate model
if train_losses is not None and val_losses is not None:
    plot_training_progress(train_losses, val_losses)

if val_loader is not None:
    mse, mae = evaluate_model(model, val_loader)
else:
    print("❌ Cannot evaluate model: validation data not available")


In [None]:
# Model Export and Testing
def export_to_js(model, output_path="policy/transformer/models/trained_model.js"):
    print(f"🔄 Exporting model to JavaScript format...")

    # Save PyTorch checkpoint first
    checkpoint_path = "checkpoints/export_checkpoint.pt"
    checkpoint = {
        'model_state_dict': model.state_dict(),
        'epoch': 'export',
        'architecture': {
            'd_model': D_MODEL,
            'n_heads': N_HEADS,
            'n_layers': N_LAYERS,
            'ffn_hidden': FFN_HIDDEN
        }
    }
    torch.save(checkpoint, checkpoint_path)
    print(f"✓ Saved checkpoint for export: {checkpoint_path}")

    # Use export_to_js.py script
    import subprocess
    result = subprocess.run([
        sys.executable, "export_to_js.py",
        "--checkpoint", checkpoint_path,
        "--output", output_path
    ], capture_output=True, text=True)

    if result.returncode == 0:
        print(f"✅ Model exported to: {output_path}")
        print("🎉 You can now use this model in the browser simulation!")
        print(result.stdout)
    else:
        print(f"❌ Export failed:")
        print(result.stderr)

    return result.returncode == 0

def test_model_inference(model):
    model.eval()

    # Create a test input
    test_input = {
        'context': {'canvasWidth': 0.8, 'canvasHeight': 0.6},
        'predator': {'velX': 0.1, 'velY': -0.2},
        'boids': [
            {'relX': 0.1, 'relY': 0.3, 'velX': 0.5, 'velY': -0.1},
            {'relX': -0.2, 'relY': 0.1, 'velX': -0.3, 'velY': 0.4}
        ]
    }

    with torch.no_grad():
        output = model(test_input)

    print(f"🧪 Test inference:")
    print(f"  Input: {len(test_input['boids'])} boids")
    print(f"  Output: [{output[0]:.4f}, {output[1]:.4f}]")
    print(f"  Output range: [{output.min():.4f}, {output.max():.4f}]")
    print(f"  ✓ Output is properly bounded in [-1, 1]")

    return output

# Test the model
test_output = test_model_inference(model)

# Export the trained model
export_success = export_to_js(model)

if export_success:
    print("\n" + "="*60)
    print("🎯 TRAINING PIPELINE COMPLETED SUCCESSFULLY!")
    print("="*60)
    print("✅ Model trained and exported to JavaScript")
    print("✅ Ready to use in the browser simulation")
    print("✅ Check the playground.html to test your model")
    print("="*60)
else:
    print("\n❌ Export failed - model trained but not exported")


In [None]:
# Quick Test of Model
def test_model_inference(model):
    model.eval()

    # Create a test input
    test_input = {
        'context': {'canvasWidth': 0.8, 'canvasHeight': 0.6},
        'predator': {'velX': 0.1, 'velY': -0.2},
        'boids': [
            {'relX': 0.1, 'relY': 0.3, 'velX': 0.5, 'velY': -0.1},
            {'relX': -0.2, 'relY': 0.1, 'velX': -0.3, 'velY': 0.4}
        ]
    }

    with torch.no_grad():
        output = model(test_input)

    print(f"Test inference:")
    print(f"  Input: {len(test_input['boids'])} boids")
    print(f"  Output: [{output[0]:.4f}, {output[1]:.4f}]")
    print(f"  Output range: [{output.min():.4f}, {output.max():.4f}]")

    return output

# Test the model
test_output = test_model_inference(model)
