In [None]:
# 🧠 The Autonomous Colony - Advanced RL

## Part 4: Meta-Learning, World Models, Hierarchical RL, and Curiosity

### RL Concepts Covered:
1. **Meta-RL** / Learning to Learn (MAML-style)
2. **World Models** (simplified DreamerV3)
3. **Hierarchical RL** (Options Framework)
4. **Intrinsic Motivation** (Curiosity/ICM)
5. **Offline RL** / Imitation Learning
6. **Curriculum Learning**
7. **Model-based planning**

### Prerequisites:
- Parts 1-3 (environment, agents, multi-agent)

## ✅ Advanced RL Techniques Ready!

**Key Concepts Implemented:**
- ✓ World Models for model-based planning
- ✓ Curiosity-driven exploration (ICM)
- ✓ Hierarchical RL with Options framework
- ✓ Meta-learning for fast adaptation
- ✓ Curriculum learning for efficient training

**These advanced techniques can significantly improve:**
- Sample efficiency
- Exploration in sparse reward environments
- Transfer learning across tasks
- Long-horizon planning and decision making

**Combine these with agents from Parts 2-3 for state-of-the-art performance!**

In [None]:
class CurriculumScheduler:
    """
    Difficulty controller driven by a sliding window of episode outcomes.

    Outcomes are stored as 1.0 (success) / 0.0 (failure) in a bounded deque.
    Once the window is full, the mean success rate is compared against a
    +/-0.1 band around ``success_threshold`` and the difficulty is moved in
    steps of 0.05, staying within [0.1, 1.0].
    """

    def __init__(self, initial_difficulty=0.3, success_threshold=0.7, window_size=20):
        # Sliding window of the most recent outcomes; older entries fall off.
        self.recent_results = deque(maxlen=window_size)
        self.window_size = window_size
        self.success_threshold = success_threshold
        self.difficulty = initial_difficulty  # 0.0 to 1.0

        print(f"‚úì Curriculum Scheduler initialized (difficulty={initial_difficulty})")

    def record_episode(self, success: bool):
        """Append one episode outcome (truthy -> 1.0, falsy -> 0.0) to the window."""
        outcome = 1.0 if success else 0.0
        self.recent_results.append(outcome)

    def update_difficulty(self):
        """
        Re-evaluate the difficulty from the current window and return it.

        Nothing changes until the window holds ``window_size`` outcomes, or
        while the success rate stays inside the +/-0.1 band around the
        threshold.
        """
        window_is_full = len(self.recent_results) >= self.window_size
        if window_is_full:
            success_rate = np.mean(self.recent_results)
            promote_above = self.success_threshold + 0.1
            demote_below = self.success_threshold - 0.1

            if success_rate > promote_above:
                # Agent is succeeding comfortably: make the task harder.
                self.difficulty = min(1.0, self.difficulty + 0.05)
                print(f"üìà Increasing difficulty to {self.difficulty:.2f} (success rate: {success_rate:.2%})")
            elif success_rate < demote_below:
                # Agent is struggling: ease off, but never below 0.1.
                self.difficulty = max(0.1, self.difficulty - 0.05)
                print(f"üìâ Decreasing difficulty to {self.difficulty:.2f} (success rate: {success_rate:.2%})")

        return self.difficulty

    def get_env_config(self, base_config):
        """
        Return a shallow copy of ``base_config`` scaled to the current difficulty.

        The copy gets three attributes overwritten; ``base_config`` itself is
        not assigned to. As difficulty goes 0 -> 1:
        obstacle_density 0.05 -> 0.20, food_spawn_rate 0.03 -> 0.015,
        energy_decay 0.05 -> 0.20.
        """
        level = self.difficulty
        scaled = copy.copy(base_config)

        scaled.obstacle_density = 0.05 + (0.15 * level)
        scaled.food_spawn_rate = 0.03 - (0.015 * level)
        scaled.energy_decay = 0.05 + (0.15 * level)

        return scaled

# Cell-level confirmation message, printed once the class definition above has run.
print("‚úì Curriculum Learning scheduler defined")

## 5️⃣ Curriculum Learning

Progressively increase task difficulty for more efficient learning.

**Key Concepts:**
- Automatic difficulty adjustment
- Success-based progression
- Staged learning

In [None]:
class MetaLearner:
    """
    Simplified meta-learning for fast adaptation.
    Based on MAML principles (first-order approximation).

    The inner loop adapts a deep copy of the meta-model to one task with SGD.
    Because the copy is detached from the meta-model's autograd graph, the
    outer loop cannot backpropagate through the adaptation; instead the
    gradients of the post-adaptation test loss are transferred onto the
    meta-model's parameters (first-order MAML) before the meta-optimizer step.
    """

    def __init__(self, model, meta_lr=1e-3, inner_lr=1e-2):
        """
        Args:
            model: the meta-model (an ``nn.Module``) whose parameters are meta-learned.
            meta_lr: learning rate of the outer-loop Adam optimizer.
            inner_lr: learning rate of the per-task inner-loop SGD optimizer.
        """
        self.model = model
        self.meta_optimizer = optim.Adam(model.parameters(), lr=meta_lr)
        self.inner_lr = inner_lr

        print("‚úì Meta-Learner initialized")

    def inner_loop(self, task_data, n_steps=5):
        """
        Adapt to a specific task (inner loop).

        Args:
            task_data: task training data, passed through to ``compute_task_loss``.
            n_steps: number of SGD steps to take on the copy.

        Returns:
            A deep copy of the meta-model after ``n_steps`` SGD updates; the
            meta-model itself is left untouched.
        """
        # Clone model for task-specific adaptation
        adapted_model = copy.deepcopy(self.model)
        optimizer = optim.SGD(adapted_model.parameters(), lr=self.inner_lr)

        for _ in range(n_steps):
            # Task-specific training step
            loss = self.compute_task_loss(adapted_model, task_data)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        return adapted_model

    def meta_update(self, task_batch):
        """
        Meta-learning outer loop across tasks.

        Args:
            task_batch: iterable of dicts with ``'train'`` and ``'test'`` entries,
                each passed to ``compute_task_loss``.

        Returns:
            The summed post-adaptation test loss over all tasks as a float
            (``0.0`` for an empty batch, in which case no optimizer step is taken).
        """
        self.meta_optimizer.zero_grad()
        total_loss = 0.0
        n_tasks = 0

        for task_data in task_batch:
            # Adapt to task
            adapted_model = self.inner_loop(task_data['train'])

            # The last inner SGD step leaves its gradients on the copy; clear
            # them so only the test-loss gradient is transferred below.
            adapted_model.zero_grad()

            # Evaluate on task test set
            task_loss = self.compute_task_loss(adapted_model, task_data['test'])
            task_loss.backward()

            # First-order MAML: the adapted copy shares no graph with the
            # meta-model, so accumulate its gradients onto the meta-parameters.
            for meta_param, adapted_param in zip(self.model.parameters(),
                                                 adapted_model.parameters()):
                grad = adapted_param.grad
                if grad is None:
                    continue
                if meta_param.grad is None:
                    meta_param.grad = grad.detach().clone()
                else:
                    meta_param.grad.add_(grad.detach())

            total_loss += task_loss.item()
            n_tasks += 1

        if n_tasks == 0:
            # Nothing to learn from; avoid stepping the optimizer on no data.
            return 0.0

        # Update meta-model
        self.meta_optimizer.step()

        return total_loss

    def compute_task_loss(self, model, data):
        """
        Placeholder for task-specific loss.

        Intended to be overridden or replaced; this default ignores both
        arguments and returns a constant zero scalar that supports
        ``backward()`` but yields no parameter gradients.
        """
        # Implementation depends on specific task
        return torch.tensor(0.0, requires_grad=True)

# Cell-level confirmation message, printed once the class definition above has run.
print("‚úì Meta-Learning framework defined")

## 4️⃣ Meta-Learning (Learning to Learn)

Simplified meta-learning approach for fast adaptation to new tasks.

**Key Concepts:**
- MAML (Model-Agnostic Meta-Learning)
- Few-shot adaptation
- Task distribution learning

In [None]:
class Option:
    """
    One option (skill): a named bundle of three callables over a state.

    ``policy_fn`` maps a state to a primitive action, ``termination_fn``
    says whether the option should stop in a state, and ``initiation_fn``
    says whether it may start there. When no (truthy) ``initiation_fn`` is
    given, the option may start in every state.
    """

    def __init__(self, name: str, policy_fn, termination_fn, initiation_fn=None):
        if not initiation_fn:
            # No initiation predicate supplied: allow the option everywhere.
            initiation_fn = lambda s: True

        self.name = name
        self.policy = policy_fn
        self.termination = termination_fn
        self.initiation = initiation_fn

    def can_initiate(self, state):
        """Return the initiation predicate's verdict for ``state``."""
        allowed = self.initiation(state)
        return allowed

    def get_action(self, state):
        """Return the primitive action the option's policy picks for ``state``."""
        action = self.policy(state)
        return action

    def should_terminate(self, state):
        """Return the termination predicate's verdict for ``state``."""
        finished = self.termination(state)
        return finished

class HierarchicalAgent:
    """
    Two-level controller: a meta-policy picks an option, the option picks actions.

    The running option is kept until its termination predicate fires, at
    which point a new one is drawn uniformly at random from the options that
    can initiate in the current state. Names of selected options are logged
    in ``option_history``.
    """

    def __init__(self, options: List[Option]):
        self.options = options
        self.option_history = []
        self.current_option = None

        print(f"‚úì Hierarchical Agent with {len(options)} options")

    def select_option(self, state):
        """Meta-policy: return a random initiable option, or None if there is none."""
        candidates = []
        for option in self.options:
            if option.can_initiate(state):
                candidates.append(option)

        if not candidates:
            return None
        # Simple: random among available options
        return np.random.choice(candidates)

    def step(self, state):
        """
        Return the primitive action for ``state`` under hierarchical control.

        Falls back to action 0 when no option is available.
        """
        active = self.current_option

        # (Re)select when nothing is running or the running option is done.
        if active is None or active.should_terminate(state):
            active = self.select_option(state)
            self.current_option = active
            if active:
                self.option_history.append(active.name)

        if not active:
            return 0  # Default action
        return active.get_action(state)

# Example options
def explore_policy(state):
    """Exploration policy: ignore ``state`` and draw a random action index from 0..7."""
    random_move = np.random.randint(0, 8)  # Random movement
    return random_move

def collect_policy(state):
    """Collection policy: ignore ``state`` and always issue action index 8."""
    collect_action = 8  # Collect action
    return collect_action

def explore_terminate(state):
    """
    Termination test for the explore option: stop once a resource is nearby.

    Looks at the 3x3 window around the centre cell of ``state['grid']`` in
    channels 1 and 2 and reports whether anything there is non-zero in sum.
    The row and column centres are taken from their own axes, so the window
    is centred correctly on non-square grids as well.

    Args:
        state: mapping with a ``'grid'`` array indexed as
            (row, column, channel) with at least 3 channels.
            # NOTE(review): assumes the agent sits at the grid centre — confirm
            # against the environment's observation layout.

    Returns:
        Truthy when the summed food/water channels in the window are positive.
    """
    # Terminate if see resource nearby
    grid = state['grid']
    row_center = grid.shape[0] // 2
    col_center = grid.shape[1] // 2  # independent of the row centre
    local = grid[row_center-1:row_center+2, col_center-1:col_center+2, 1:3]  # Food/water channels
    return local.sum() > 0

def collect_terminate(state):
    """Termination test for the collect option: always true, whatever ``state`` is."""
    # Terminate after collecting
    finished = True
    return finished

# Wire the example policy/termination functions into two ready-made options.
# Neither passes an initiation function, so both use Option's default.
explore_option = Option(
    name="explore",
    policy_fn=explore_policy,
    termination_fn=explore_terminate,
)
collect_option = Option(
    name="collect",
    policy_fn=collect_policy,
    termination_fn=collect_terminate,
)

print("‚úì Options Framework defined with example options")

## 3️⃣ Hierarchical RL (Options Framework)

Options framework for temporal abstraction and hierarchical policies.

**Key Concepts:**
- Temporal abstraction
- Options/Skills
- Hierarchical decision making

In [None]:
class CuriosityModule(nn.Module):
    """
    Intrinsic Curiosity Module (ICM) for exploration.
    Provides intrinsic reward based on prediction error.

    Three sub-networks share one feature encoder:
    - ``feature_net``: state -> feature vector of size ``feature_dim``
    - ``inverse_net``: (features, next features) -> action logits
    - ``forward_net``: (features, one-hot action) -> predicted next features

    Args:
        state_dim: size of the last dimension of the state tensors.
        action_dim: number of discrete actions; also the one-hot width.
        feature_dim: size of the learned feature vector.
    """

    def __init__(self, state_dim: int, action_dim: int, feature_dim: int = 64):
        super().__init__()

        # Kept so forward() one-hot encodes actions at exactly the width
        # forward_net was built for (instead of a hard-coded constant).
        self.action_dim = action_dim

        # Feature encoder
        self.feature_net = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, feature_dim)
        )

        # Inverse model: predict action from state transition
        self.inverse_net = nn.Sequential(
            nn.Linear(feature_dim * 2, 128),
            nn.ReLU(),
            nn.Linear(128, action_dim)
        )

        # Forward model: predict next state features from current state and action
        self.forward_net = nn.Sequential(
            nn.Linear(feature_dim + action_dim, 128),
            nn.ReLU(),
            nn.Linear(128, feature_dim)
        )

    def forward(self, state, next_state, action):
        """
        Compute intrinsic reward (prediction error) and inverse-model logits.

        Args:
            state: tensor whose last dimension is ``state_dim``.
            next_state: tensor shaped like ``state``.
            action: integer (int64) index tensor with values in
                ``[0, action_dim)``, as required by ``F.one_hot``; its shape
                must match the leading dimensions of ``state``.

        Returns:
            Tuple ``(intrinsic_reward, predicted_action_logits)``:
            the mean squared forward-model error over the feature dimension
            (one value per sample, not detached), and the inverse model's
            logits over ``action_dim`` actions.
        """
        # Encode states
        state_feat = self.feature_net(state)
        next_state_feat = self.feature_net(next_state)

        # Forward model prediction
        action_onehot = F.one_hot(action, num_classes=self.action_dim).float()
        predicted_next_feat = self.forward_net(torch.cat([state_feat, action_onehot], dim=-1))

        # Intrinsic reward = prediction error
        intrinsic_reward = F.mse_loss(predicted_next_feat, next_state_feat, reduction='none').mean(dim=-1)

        # Inverse model loss (for learning)
        predicted_action_logits = self.inverse_net(torch.cat([state_feat, next_state_feat], dim=-1))

        return intrinsic_reward, predicted_action_logits

# Cell-level confirmation message, printed once the class definition above has run.
print("‚úì Curiosity Module (ICM) defined")

## 2️⃣ Curiosity-Driven Exploration (ICM)

Intrinsic Curiosity Module for exploration bonus based on prediction error.

**Key Concepts:**
- Intrinsic motivation
- Forward/inverse dynamics models
- Exploration bonuses

In [None]:
class WorldModel(nn.Module):
    """
    Simplified world model for planning.
    Learns to predict next state and reward given current state and action.

    Three independent MLP heads consume the concatenated
    (state, one-hot action) vector: a transition head, a reward head and a
    sigmoid "done" head.

    Args:
        state_dim: size of the last dimension of the state tensors.
        action_dim: number of discrete actions; also the one-hot width.
        hidden_dim: width of the hidden layers.
    """

    def __init__(self, state_dim: int, action_dim: int, hidden_dim: int = 128):
        super().__init__()

        # Kept so imagine_trajectory() one-hot encodes actions at exactly the
        # width the heads were built for (instead of a hard-coded constant).
        self.action_dim = action_dim

        # Transition model: predicts next state
        self.transition_net = nn.Sequential(
            nn.Linear(state_dim + action_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, state_dim)
        )

        # Reward model: predicts immediate reward
        self.reward_net = nn.Sequential(
            nn.Linear(state_dim + action_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )

        # Done prediction
        self.done_net = nn.Sequential(
            nn.Linear(state_dim + action_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),
            nn.Sigmoid()
        )

    def forward(self, state, action_onehot):
        """
        Predict next state, reward, and done flag.

        Args:
            state: tensor whose last dimension is ``state_dim``.
            action_onehot: float tensor whose last dimension is ``action_dim``.

        Returns:
            Tuple ``(next_state, reward, done_prob)`` with last dimensions
            ``state_dim``, 1 and 1; ``done_prob`` is a sigmoid output in (0, 1).
        """
        x = torch.cat([state, action_onehot], dim=-1)

        next_state = self.transition_net(x)
        reward = self.reward_net(x)
        done_prob = self.done_net(x)

        return next_state, reward, done_prob

    def imagine_trajectory(self, initial_state, policy_fn, horizon: int = 10):
        """
        Imagine a trajectory using the world model for planning.

        Args:
            initial_state: starting state tensor (last dimension ``state_dim``).
            policy_fn: callable mapping a state tensor to an integer (int64)
                action-index tensor with values in ``[0, action_dim)``.
            horizon: maximum number of imagined steps.

        Returns:
            Tuple ``(states, actions, rewards)`` of lists. ``states`` holds the
            initial state plus one predicted state per step, so it is one
            longer than ``actions`` and ``rewards``. The rollout stops early
            once every element of the predicted done probability exceeds 0.5.
        """
        states = [initial_state]
        actions = []
        rewards = []

        state = initial_state
        for _ in range(horizon):
            action = policy_fn(state)
            actions.append(action)

            action_onehot = F.one_hot(action, num_classes=self.action_dim).float()
            next_state, reward, done_prob = self.forward(state, action_onehot)

            states.append(next_state)
            rewards.append(reward)

            state = next_state

            # .all() is identical to the plain comparison for a single
            # trajectory and keeps batched rollouts from raising on an
            # ambiguous multi-element truth value.
            if bool((done_prob > 0.5).all()):
                break

        return states, actions, rewards

# Cell-level confirmation message, printed once the class definition above has run.
print("‚úì World Model defined")

## 1️⃣ World Model (Model-Based RL)

Learn a dynamics model to predict next states and rewards, enabling planning.

**Key Concepts:**
- Model-based RL: learn environment dynamics
- Planning: use model to simulate trajectories
- Dyna-Q style: combine model learning with policy learning

In [None]:
# !pip install torch numpy matplotlib -q

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from typing import Dict, List, Tuple, Optional
from collections import deque
import copy

# Cell-level confirmation message, printed once the imports above have succeeded.
print("‚úì Advanced RL modules loaded")

## 📦 Setup - Install Dependencies