<a target="_blank" href="https://colab.research.google.com/github/sonder-art/bandit_simulator/blob/main/bandit_sim.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from tqdm.notebook import tqdm
import random
from typing import Tuple, List, Dict, Callable, Optional, Union
import time


In [None]:
# Seed both generators from OS entropy: every run gets a fresh random stream,
# so results are NOT reproducible. Pass a fixed integer to both calls instead
# if repeatable runs are needed.
np.random.seed()
random.seed()

In [None]:
# ================ BANDIT ENVIRONMENT CLASSES ================

class BanditEnvironment:
    """Abstract two-armed Bernoulli bandit.

    Subclasses provide ``_initialize_probabilities`` and
    ``_update_probabilities``; this base class owns the turn counter and a
    per-turn log of actions, rewards and both success probabilities.
    """

    def __init__(self, T: int = 100):
        """
        Create the environment in its pre-game state.

        Both probabilities are None until ``reset`` is called.

        Args:
            T (int): Number of turns in the game
        """
        self.T = T
        self.current_turn = 0
        self.p1 = None  # success probability of arm 1 (exposed by get_visible_info)
        self.p2 = None  # success probability of arm 2 (only its logged history is exposed)
        self.history = {name: [] for name in ('actions', 'rewards', 'p1', 'p2')}

    def reset(self, T: Optional[int] = None) -> None:
        """
        Start a new game: clear the log, rewind the turn counter and let the
        subclass draw fresh starting probabilities.

        Args:
            T (int, optional): New game length; the previous length is kept when omitted
        """
        self.T = self.T if T is None else T
        self.current_turn = 0
        self.history = {name: [] for name in ('actions', 'rewards', 'p1', 'p2')}
        self._initialize_probabilities()

    def _initialize_probabilities(self) -> None:
        """Hook: set ``p1`` and ``p2`` at the start of a game."""
        raise NotImplementedError("Subclasses must implement this method")

    def _update_probabilities(self) -> None:
        """Hook: adjust ``p1`` and ``p2`` at the start of every step."""
        raise NotImplementedError("Subclasses must implement this method")

    def step(self, action: int) -> float:
        """
        Play one turn by pulling an arm.

        The probabilities are updated *before* the reward is drawn, so the
        values logged for this turn (and left in ``p1``/``p2`` afterwards)
        are the ones that produced the reward; they may differ from what
        ``get_visible_info`` reported before the call.

        Args:
            action (int): 0 pulls arm 1; any other value pulls arm 2

        Returns:
            float: 1.0 on success, 0.0 otherwise

        Raises:
            ValueError: If all ``T`` turns have already been played
        """
        if self.current_turn >= self.T:
            raise ValueError("Game is over, please reset.")

        self._update_probabilities()

        log = self.history
        log['p1'].append(self.p1)
        log['p2'].append(self.p2)

        # Single Bernoulli draw against the chosen arm's success probability.
        success_probability = self.p1 if action == 0 else self.p2
        reward = 1.0 if np.random.random() < success_probability else 0.0

        log['actions'].append(action)
        log['rewards'].append(reward)
        self.current_turn += 1

        return reward

    def get_visible_info(self) -> Dict:
        """
        Describe the current state as handed to an agent.

        The ``history`` entry is the live log object (not a copy) and
        contains the logged ``p1`` and ``p2`` values of past turns.

        Returns:
            Dict: Current turn, total turns, current ``p1`` and the history log
        """
        return dict(
            current_turn=self.current_turn,
            total_turns=self.T,
            p1=self.p1,
            history=self.history,
        )


class FixedBandit(BanditEnvironment):
    """Bandit whose two success probabilities are drawn once per game and then held constant."""

    def _initialize_probabilities(self) -> None:
        """Draw p1, then p2, independently and uniformly from [0.01, 0.99)."""
        self.p1, self.p2 = (
            np.random.uniform(0.01, 0.99),
            np.random.uniform(0.01, 0.99),
        )

    def _update_probabilities(self) -> None:
        """Deliberate no-op: the probabilities never change during a game."""
        return None


class PeriodicBandit(BanditEnvironment):
    """Bandit whose probabilities are redrawn whenever the turn index is a multiple of k."""

    def __init__(self, T: int = 100, k: int = 10):
        """
        Set up the periodic bandit.

        Args:
            T (int): Number of turns
            k (int): Redraw period in turns (k == 0 makes every step raise
                ZeroDivisionError in the modulo check)
        """
        super().__init__(T)
        self.k = k

    def _initialize_probabilities(self) -> None:
        """Draw the starting p1, then p2, uniformly from [0.01, 0.99)."""
        self.p1, self.p2 = (
            np.random.uniform(0.01, 0.99),
            np.random.uniform(0.01, 0.99),
        )

    def _update_probabilities(self) -> None:
        """
        Redraw both probabilities on turns 0, k, 2k, ...

        Turn 0 qualifies, so the values drawn at reset are replaced before
        the first reward is generated.
        """
        if self.current_turn % self.k != 0:
            return
        self.p1, self.p2 = (
            np.random.uniform(0.01, 0.99),
            np.random.uniform(0.01, 0.99),
        )


class DynamicBandit(BanditEnvironment):
    """Bandit whose probabilities are redrawn from scratch on every single turn."""

    def _initialize_probabilities(self) -> None:
        """Draw the starting p1, then p2, uniformly from [0.01, 0.99)."""
        self.p1, self.p2 = (
            np.random.uniform(0.01, 0.99),
            np.random.uniform(0.01, 0.99),
        )

    def _update_probabilities(self) -> None:
        """Replace both probabilities with fresh independent uniform draws."""
        # No dependence on the previous values: each turn is a brand-new pair.
        self.p1, self.p2 = (
            np.random.uniform(0.01, 0.99),
            np.random.uniform(0.01, 0.99),
        )


class FullyRandomBandit(BanditEnvironment):
    """Bandit whose probabilities are redrawn at random moments, so neither arm keeps a lasting edge."""

    def _initialize_probabilities(self) -> None:
        """Draw the starting p1, then p2, uniformly from [0.01, 0.99)."""
        self.p1, self.p2 = (
            np.random.uniform(0.01, 0.99),
            np.random.uniform(0.01, 0.99),
        )

    def _update_probabilities(self) -> None:
        """
        With probability 0.05 per turn, redraw both probabilities;
        otherwise leave them untouched.
        """
        redraw_now = np.random.random() < 0.05  # 5% chance per turn
        if not redraw_now:
            return
        self.p1, self.p2 = (
            np.random.uniform(0.01, 0.99),
            np.random.uniform(0.01, 0.99),
        )


# ================ EXPERIMENT RUNNER ================

def _visible_info_for(agent_func: Callable, env: "BanditEnvironment") -> Dict:
    """Build the observation dict handed to ``agent_func`` for the upcoming turn."""
    if agent_func == full_information_agent:
        return env.get_visible_info()

    # Both restricted views expose only past actions and rewards (no p1/p2 logs).
    past = {
        'actions': env.history['actions'],
        'rewards': env.history['rewards']
    }
    if agent_func == partial_information_agent:
        return {
            'current_turn': env.current_turn,
            'total_turns': env.T,
            'p1': env.p1,
            'history': past
        }
    # Any other agent is treated as the reward-only agent.
    return {
        'current_turn': env.current_turn,
        'history': past
    }


def _expected_regret(history: Dict) -> float:
    """Return best-arm expected reward minus chosen-arm expected reward, summed over the logged turns."""
    optimal_expected_reward = sum(
        max(p1, p2) for p1, p2 in zip(history['p1'], history['p2'])
    )
    actual_expected_reward = sum(
        p1 if action == 0 else p2
        for action, p1, p2 in zip(history['actions'], history['p1'], history['p2'])
    )
    return optimal_expected_reward - actual_expected_reward


def run_experiment(
    agent_func: Callable,
    n_games: int = 50,
    default_turns: int = 100,
    random_turns: bool = False,
    verbose: bool = True,
    env_type: str = "all"
) -> Dict:
    """
    Run an agent for ``n_games`` games on one or all bandit environments.

    Args:
        agent_func (Callable): The agent function; called once per turn with a
            dict of visible information and expected to return the chosen arm
        n_games (int): Number of games to play per environment
        default_turns (int): Turns per game when random_turns=False (must be >= 1)
        random_turns (bool): Draw each game's length uniformly from 1..300 instead
        verbose (bool): Whether to show a progress bar and periodic per-game output
        env_type (str): "Fixed", "Periodic", "Dynamic" or "FullyRandom" to run a
            single environment; any other value (including "all") runs all four

    Returns:
        Dict: Parallel lists with one entry per game, keyed by "environment",
        "game", "total_reward", "average_reward", "turns", "actions",
        "optimal_actions" (a percentage) and "regret"

    Raises:
        ValueError: If random_turns is False and default_turns is less than 1
    """
    if not random_turns and default_turns < 1:
        # Fail early; otherwise the per-game average divides by zero (or a
        # negative count) after the games have already been "played".
        raise ValueError("default_turns must be at least 1")

    # Force a different random stream on every call. Seeding from OS entropy
    # avoids the previous time-modulo-10000 seed, which allowed only 10,000
    # distinct streams and repeated when two calls fell in the same millisecond.
    np.random.seed(None)
    random.seed(None)

    # Debug output
    if random_turns:
        print("RANDOM TURNS MODE: Will use different turn counts between 1-300 for each game")
        # Show some sample random turn values
        samples = [np.random.randint(1, 301) for _ in range(5)]
        print(f"Sample turn counts: {samples}")
    else:
        print(f"FIXED TURNS MODE: Using T={default_turns} for all games")

    all_environments = {
        "Fixed": FixedBandit(),
        "Periodic": PeriodicBandit(k=10),
        "Dynamic": DynamicBandit(),
        "FullyRandom": FullyRandomBandit()
    }

    # If a specific environment type is requested, use only that one;
    # otherwise use all environments.
    if env_type != "all" and env_type in all_environments:
        environments = [all_environments[env_type]]
        environment_names = [env_type]
    else:
        environments = list(all_environments.values())
        environment_names = list(all_environments.keys())

    results = {
        "environment": [],
        "game": [],
        "total_reward": [],
        "average_reward": [],
        "turns": [],
        "actions": [],
        "optimal_actions": [],
        "regret": []
    }

    total_games = n_games * len(environments)

    # Create turn counts in advance so they can be inspected while debugging
    if random_turns:
        all_turn_counts = [np.random.randint(1, 301) for _ in range(total_games)]
    else:
        all_turn_counts = [default_turns] * total_games

    game_indices = range(total_games)
    game_iterator = tqdm(game_indices) if verbose else game_indices

    for game_idx in game_iterator:
        # Games are laid out environment by environment, n_games each.
        env_idx, game_num = divmod(game_idx, n_games)
        env = environments[env_idx]
        T = all_turn_counts[game_idx]

        # Debug output
        if verbose and game_idx % 20 == 0:
            print(f"Game {game_idx}, Environment: {environment_names[env_idx]}, Turn count: {T}")

        env.reset(T)

        total_reward = 0
        optimal_actions = 0

        for _ in range(T):
            action = agent_func(_visible_info_for(agent_func, env))

            reward = env.step(action)
            total_reward += reward

            # step() updates the probabilities before drawing the reward and not
            # afterwards, so env.p1/env.p2 here are the ones this turn used.
            if (action == 0 and env.p1 >= env.p2) or (action == 1 and env.p2 > env.p1):
                optimal_actions += 1

        results["environment"].append(environment_names[env_idx])
        results["game"].append(game_num)
        results["total_reward"].append(total_reward)
        results["average_reward"].append(total_reward / T)
        results["turns"].append(T)
        results["actions"].append(env.history['actions'])
        results["optimal_actions"].append(optimal_actions / T * 100)  # percentage
        results["regret"].append(_expected_regret(env.history))

    # Verify turn counts are as expected (skipped when no games were played,
    # since min/max of an empty array would raise)
    if random_turns and results["turns"]:
        turn_counts = np.array(results["turns"])
        print(f"\nTurn count summary: Min={turn_counts.min()}, Max={turn_counts.max()}, Mean={turn_counts.mean():.2f}")
        if turn_counts.min() == turn_counts.max():
            print("WARNING: All turn counts are the same! Random turns mode may not be working correctly.")

    return results


# ================ STANDARDIZED EXPERIMENT RUNNER ================

def run_standard_experiment(agent_func: Callable, env_type: str, n_experiments: int = 100, fixed_turns: bool = True) -> Dict:
    """
    Run the standardized experiment for one agent on one environment type.

    Thin wrapper around ``run_experiment`` that fixes the standard settings:
    either T=100 for every game or a random T in 1..300 per game, always verbose.

    Args:
        agent_func (Callable): The agent function to use
        env_type (str): Type of environment to use (passed through to run_experiment)
        n_experiments (int): Number of games to run (default 100)
        fixed_turns (bool): Whether to use fixed T=100 or random T (1-300)

    Returns:
        Dict: Results of the experiment, as returned by run_experiment
    """
    # Force a different random stream for this experiment. OS entropy replaces
    # the previous time-modulo-10000 seed, which allowed only 10,000 distinct
    # streams and repeated when two calls fell in the same millisecond.
    np.random.seed(None)
    random.seed(None)

    if fixed_turns:
        print("\n=== RUNNING WITH FIXED TURNS (T=100) ===")
    else:
        print("\n=== RUNNING WITH RANDOM TURNS (T=1-300) ===")

        # Show some sample turn values to verify randomness
        samples = [np.random.randint(1, 301) for _ in range(5)]
        print(f"Sample random turn values: {samples}")

    return run_experiment(
        agent_func=agent_func,
        n_games=n_experiments,
        default_turns=100,  # only consulted when random_turns is False
        random_turns=not fixed_turns,
        verbose=True,
        env_type=env_type
    )


# ================ VISUALIZATION FUNCTIONS ================

def plot_rewards_by_environment(results: Dict) -> None:
    """
    Show one box per environment summarising the per-game average reward.

    Args:
        results (Dict): Results from run_experiment
    """
    column_sources = {
        'Environment': 'environment',
        'Game': 'game',
        'Average Reward': 'average_reward',
    }
    frame = pd.DataFrame({label: results[key] for label, key in column_sources.items()})

    _, ax = plt.subplots(figsize=(12, 6))
    sns.boxplot(data=frame, x='Environment', y='Average Reward', ax=ax)
    ax.set_title('Distribution of Average Rewards by Environment')
    ax.grid(True, alpha=0.3)
    plt.show()


def plot_optimal_actions(results: Dict) -> None:
    """
    Show one box per environment summarising the per-game share of optimal actions.

    Args:
        results (Dict): Results from run_experiment
    """
    column_sources = {
        'Environment': 'environment',
        'Game': 'game',
        'Optimal Actions (%)': 'optimal_actions',
    }
    frame = pd.DataFrame({label: results[key] for label, key in column_sources.items()})

    _, ax = plt.subplots(figsize=(12, 6))
    sns.boxplot(data=frame, x='Environment', y='Optimal Actions (%)', ax=ax)
    ax.set_title('Percentage of Optimal Actions by Environment')
    ax.grid(True, alpha=0.3)
    plt.show()


def plot_regret(results: Dict) -> None:
    """
    Show one box per environment summarising the per-game regret.

    Args:
        results (Dict): Results from run_experiment
    """
    column_sources = {
        'Environment': 'environment',
        'Game': 'game',
        'Regret': 'regret',
    }
    frame = pd.DataFrame({label: results[key] for label, key in column_sources.items()})

    _, ax = plt.subplots(figsize=(12, 6))
    sns.boxplot(data=frame, x='Environment', y='Regret', ax=ax)
    ax.set_title('Distribution of Regret by Environment')
    ax.grid(True, alpha=0.3)
    plt.show()


def plot_action_history(results: Dict, game_idx: int) -> None:
    """
    Draw the sequence of arms chosen during a single game.

    Args:
        results (Dict): Results from run_experiment
        game_idx (int): Position of the game within the results lists
    """
    env_name = results['environment'][game_idx]
    chosen_arms = results['actions'][game_idx]

    _, ax = plt.subplots(figsize=(12, 4))
    ax.plot(chosen_arms, 'o-', markersize=4)
    # Actions are 0/1; label the two levels with the arm names.
    ax.set_yticks([0, 1])
    ax.set_yticklabels(['Arm 1', 'Arm 2'])
    ax.set_title(f'Action History for {env_name} Environment - Game {results["game"][game_idx]}')
    ax.set_xlabel('Turn')
    ax.set_ylabel('Action')
    ax.grid(True, alpha=0.3)
    plt.show()


def plot_reward_over_time(results: Dict, n_games: int = 5) -> None:
    """
    Plot an approximate cumulative-reward line for the first games of each environment.

    ``results`` holds only each game's total reward, not its per-turn rewards,
    so every curve is a straight line that climbs by the game's average reward
    per turn and ends at its total reward. It is not the true reward trajectory.

    Args:
        results (Dict): Results from run_experiment
        n_games (int): Number of games to plot per environment
    """
    # Group game indices by environment in first-seen order. Iterating a set of
    # names instead would make legend and colour order vary between runs.
    indices_by_env = {}
    for idx, env_name in enumerate(results['environment']):
        indices_by_env.setdefault(env_name, []).append(idx)

    plt.figure(figsize=(15, 8))

    for env_name, env_game_indices in indices_by_env.items():
        for idx in env_game_indices[:n_games]:
            turns = results['turns'][idx]

            # Linear approximation: spread the total reward evenly over the turns.
            reward_per_turn = results['total_reward'][idx] / turns
            cum_rewards = np.cumsum([reward_per_turn] * turns)

            plt.plot(cum_rewards, alpha=0.7, label=f"{env_name} - Game {results['game'][idx]}")

    plt.title('Cumulative Reward Over Time')
    plt.xlabel('Turn')
    plt.ylabel('Cumulative Reward')
    plt.grid(True, alpha=0.3)
    plt.legend(loc='best')
    plt.show()


def generate_statistical_summary(results: Dict) -> pd.DataFrame:
    """
    Aggregate per-game results into one row of statistics per environment.

    Args:
        results (Dict): Results from run_experiment

    Returns:
        pd.DataFrame: Indexed by environment, with flat "<metric>_<stat>" columns
    """
    # One record per game, in the order the games appear in the results lists.
    records = [
        {
            'Environment': env_name,
            'Game': results['game'][row],
            'Total Reward': results['total_reward'][row],
            'Average Reward': results['average_reward'][row],
            'Turns': results['turns'][row],
            'Optimal Actions (%)': results['optimal_actions'][row],
            'Regret': results['regret'][row],
        }
        for row, env_name in enumerate(results['environment'])
    ]
    per_game = pd.DataFrame(records)

    spread_stats = ['mean', 'std', 'min', 'max']
    aggregations = {
        'Total Reward': list(spread_stats),
        'Average Reward': list(spread_stats),
        'Optimal Actions (%)': list(spread_stats),
        'Regret': list(spread_stats),
        'Turns': ['mean', 'count'],
    }
    summary = per_game.groupby('Environment').agg(aggregations)

    # Collapse the (metric, stat) column pairs into single "metric_stat" names.
    summary.columns = [f"{metric}_{stat}".strip() for metric, stat in summary.columns]

    return summary


def plot_performance_metrics(results: Dict, title_prefix: str = "") -> None:
    """
    Plot a 2x2 dashboard of performance metrics for the agent.

    Three panels are per-environment box plots (average reward, optimal-action
    percentage, regret); the fourth overlays the action history of the first
    recorded game of each environment.

    Args:
        results (Dict): Results from run_experiment
        title_prefix (str): Prefix for plot titles
    """
    df = pd.DataFrame({
        'Environment': results['environment'],
        'Game': results['game'],
        'Average Reward': results['average_reward'],
        'Optimal Actions (%)': results['optimal_actions'],
        'Regret': results['regret'],
        'Turns': results['turns']
    })

    fig, axes = plt.subplots(2, 2, figsize=(16, 12))

    # The three distribution panels differ only in metric and position.
    boxplot_panels = [
        ('Average Reward', axes[0, 0]),
        ('Optimal Actions (%)', axes[0, 1]),
        ('Regret', axes[1, 0]),
    ]
    for metric, ax in boxplot_panels:
        sns.boxplot(x='Environment', y=metric, data=df, ax=ax)
        ax.set_title(f'{title_prefix}{metric} by Environment')
        ax.grid(True, alpha=0.3)

    # Index of the first game of each environment, in first-seen order.
    # Iterating a set of names would make line order and colours vary between runs.
    first_game_index = {}
    for idx, env_name in enumerate(results['environment']):
        first_game_index.setdefault(env_name, idx)

    history_ax = axes[1, 1]
    for env_name, game_idx in first_game_index.items():
        actions = results['actions'][game_idx]
        turns = results['turns'][game_idx]
        history_ax.plot(range(turns), actions, 'o-', markersize=3, label=env_name, alpha=0.7)

    history_ax.set_title(f'{title_prefix}Action History by Environment')
    history_ax.set_xlabel('Turn')
    history_ax.set_ylabel('Action')
    history_ax.set_yticks([0, 1])
    history_ax.set_yticklabels(['Arm 1', 'Arm 2'])
    history_ax.grid(True, alpha=0.3)
    history_ax.legend(loc='best')

    plt.tight_layout()
    plt.show()


def visualize_simplified_results(results: Dict, agent_name: str = "Agent") -> None:
    """
    Print and plot aggregate results across all games.

    Prints a per-environment statistics table, then shows a 2x2 figure
    (three bar charts with 95% confidence intervals plus a box plot) and a
    violin plot of average reward. When ``agent_name`` contains the substring
    "Random", a turn-count summary and histogram are shown as well.

    Args:
        results (Dict): Results from run_experiment
        agent_name (str): Name of the agent for titles; also switches on the
            turn-count diagnostics when it contains "Random"
    """
    # One record per game, in the order the games appear in the results lists.
    df = pd.DataFrame([
        {
            'Environment': env_name,
            'Game': results['game'][i],
            'Average Reward': results['average_reward'][i],
            'Optimal Actions (%)': results['optimal_actions'][i],
            'Regret': results['regret'][i],
            'Turns': results['turns'][i]
        }
        for i, env_name in enumerate(results['environment'])
    ])

    summary = df.groupby('Environment').agg({
        'Average Reward': ['mean', 'std', 'min', 'max'],
        'Optimal Actions (%)': ['mean', 'std', 'min', 'max'],
        'Regret': ['mean', 'std'],
        'Turns': ['mean', 'std', 'min', 'max', 'count']
    })

    # Flatten the (metric, stat) column pairs for better readability
    summary.columns = ['_'.join(col).strip() for col in summary.columns.values]

    print(f"\n===== {agent_name} Summary Statistics =====\n")
    print(summary.round(3))  # 3 decimal places for cleaner display

    # Turn-count diagnostics, to verify random generation is working
    if "Random" in agent_name:
        print("\nTurn Count Distribution:")
        turns = df['Turns']
        print(f"Min: {turns.min()}, Max: {turns.max()}, Mean: {turns.mean():.2f}, Std: {turns.std():.2f}")

        plt.figure(figsize=(10, 4))
        plt.hist(turns, bins=30, alpha=0.7)
        plt.title('Turn Count Distribution')
        plt.xlabel('Number of Turns')
        plt.ylabel('Frequency')
        plt.grid(True, alpha=0.3)
        plt.show()

    plt.figure(figsize=(15, 10))

    # Panels 1-3: mean of each metric per environment with a 95% CI.
    bar_metrics = ('Average Reward', 'Optimal Actions (%)', 'Regret')
    for position, metric in enumerate(bar_metrics, start=1):
        plt.subplot(2, 2, position)
        sns.barplot(x='Environment', y=metric, data=df, errorbar=('ci', 95), capsize=0.2)
        plt.title(f'{metric} by Environment')
        plt.grid(True, alpha=0.3)

    # Panel 4: reward distribution
    plt.subplot(2, 2, 4)
    sns.boxplot(x='Environment', y='Average Reward', data=df)
    plt.title('Reward Distribution by Environment')
    plt.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.suptitle(f"{agent_name} Performance Across Environments", fontsize=16, y=1.02)
    plt.show()

    # Reward distributions as violin plots
    plt.figure(figsize=(10, 6))
    sns.violinplot(x='Environment', y='Average Reward', data=df)
    plt.title(f'{agent_name}: Reward Distribution by Environment')
    plt.grid(True, alpha=0.3)
    plt.show()


def visualize_results(results: Dict) -> None:
    """
    Print summary statistics and show the standard set of plots for an experiment.

    The action history is plotted for the first game of every environment
    that actually appears in ``results``, so single-environment runs work too.

    Args:
        results (Dict): Results from run_experiment
    """
    df = pd.DataFrame({
        'Environment': results['environment'],
        'Game': results['game'],
        'Total Reward': results['total_reward'],
        'Average Reward': results['average_reward'],
        'Turns': results['turns'],
        'Optimal Actions (%)': results['optimal_actions'],
        'Regret': results['regret']
    })

    # Summary statistics by environment
    summary = df.groupby('Environment').agg({
        'Total Reward': ['mean', 'std'],
        'Average Reward': ['mean', 'std'],
        'Optimal Actions (%)': ['mean', 'std'],
        'Regret': ['mean', 'std']
    })

    print("===== Summary Statistics =====")
    print(summary)
    print("\n")

    plot_rewards_by_environment(results)
    plot_optimal_actions(results)
    plot_regret(results)

    # First game of each environment present, in first-seen order. A hard-coded
    # name list raised ValueError when results covered only some environments.
    first_game_index = {}
    for idx, env_name in enumerate(results['environment']):
        first_game_index.setdefault(env_name, idx)
    for game_idx in first_game_index.values():
        plot_action_history(results, game_idx)

    # Plot reward over time for a subset of games
    plot_reward_over_time(results, n_games=2)


# ================ INDIVIDUAL AGENT EVALUATION FUNCTIONS ================

def _evaluate_agent_on_all_environments(agent_func: Callable, agent_label: str, fixed_turns: bool) -> Dict:
    """
    Run the standard experiment for one agent on every environment type,
    then print and plot the combined results.

    Args:
        agent_func (Callable): The agent function to evaluate
        agent_label (str): Display name, e.g. "Full Information Agent"; its
            upper-cased form is used in the printed banners
        fixed_turns (bool): Whether to use fixed T=100 (True) or random T=1-300 (False)

    Returns:
        Dict: Maps each environment type to its run_standard_experiment results
    """
    turns_type = "Fixed (T=100)" if fixed_turns else "Random (T=1-300)"
    banner_label = agent_label.upper()
    print(f"===== EVALUATING {banner_label} WITH {turns_type} TURNS =====")

    results = {}
    for env_type in ["Fixed", "Periodic", "Dynamic", "FullyRandom"]:
        print(f"\nTesting on {env_type} environment...")
        results[env_type] = run_standard_experiment(
            agent_func=agent_func,
            env_type=env_type,
            fixed_turns=fixed_turns
        )

    print(f"\n===== {banner_label} SUMMARY ({turns_type} TURNS) =====")

    # Concatenate the per-environment result lists into one results dict.
    result_keys = (
        "environment", "game", "total_reward", "average_reward",
        "turns", "actions", "optimal_actions", "regret",
    )
    combined_results = {key: [] for key in result_keys}
    for env_results in results.values():
        for key, merged in combined_results.items():
            merged.extend(env_results[key])

    visualize_simplified_results(combined_results, agent_name=f"{agent_label} ({turns_type})")

    return results


def evaluate_full_information_agent(fixed_turns: bool = True) -> Dict:
    """
    Run standardized evaluation for just the full information agent.

    Args:
        fixed_turns (bool): Whether to use fixed T=100 (True) or random T=1-300 (False)

    Returns:
        Dict: Results for the full information agent, keyed by environment type
    """
    return _evaluate_agent_on_all_environments(
        full_information_agent, "Full Information Agent", fixed_turns
    )


def evaluate_partial_information_agent(fixed_turns: bool = True) -> Dict:
    """
    Run standardized evaluation for just the partial information agent.

    Args:
        fixed_turns (bool): Whether to use fixed T=100 (True) or random T=1-300 (False)

    Returns:
        Dict: Results for the partial information agent, keyed by environment type
    """
    return _evaluate_agent_on_all_environments(
        partial_information_agent, "Partial Information Agent", fixed_turns
    )


def evaluate_reward_only_agent(fixed_turns: bool = True) -> Dict:
    """
    Run standardized evaluation for just the reward-only agent.

    Args:
        fixed_turns (bool): Whether to use fixed T=100 (True) or random T=1-300 (False)

    Returns:
        Dict: Results for the reward-only agent, keyed by environment type
    """
    return _evaluate_agent_on_all_environments(
        reward_only_agent, "Reward-Only Agent", fixed_turns
    )


def evaluate_agent(agent_func: Callable, agent_name: str) -> Dict:
    """
    Evaluate one agent on every environment type.

    Every agent is run with fixed turns (T=100). When the agent is the
    reward-only agent, it is additionally run with a random number of turns
    (T between 1 and 300) and those results are stored under
    "<environment>_random_T" keys.

    Args:
        agent_func (Callable): The agent function to evaluate.
        agent_name (str): Display name used in the progress output.

    Returns:
        Dict: Results keyed by environment name (plus the "_random_T" keys
        for the reward-only agent).
    """
    print(f"===== Evaluating {agent_name} =====")

    env_types = ["Fixed", "Periodic", "Dynamic", "FullyRandom"]
    outcomes = {}

    # Known horizon: every agent plays the fixed T=100 setting.
    for env_type in env_types:
        print(f"\nTesting on {env_type} environment...")
        outcomes[env_type] = run_standard_experiment(
            agent_func=agent_func,
            env_type=env_type,
            fixed_turns=True,
        )

    # Unknown horizon: only the reward-only agent is also run with random T.
    if agent_func == reward_only_agent:
        print("\nTesting reward-only agent with unknown number of turns...")
        for env_type in env_types:
            outcomes[f"{env_type}_random_T"] = run_standard_experiment(
                agent_func=agent_func,
                env_type=env_type,
                fixed_turns=False,
            )

    return outcomes


def run_all_agents(fixed_turns: Optional[bool] = None) -> Dict:
    """
    Run standard evaluations for all agent types and compare them.

    Args:
        fixed_turns (bool, optional): Turn setting applied to every agent.
            - None (default): each agent is evaluated through evaluate_agent,
              exactly as when this function took no arguments.
            - True or False: every agent is run on every environment with this
              value forwarded to run_standard_experiment (True for the fixed
              T=100 setting, False for the random-T setting), matching the
              usage shown in the notes for students.

    Returns:
        Dict: Results keyed by agent ("full_information",
        "partial_information", "reward_only"), each mapping environment name
        to that environment's results.
    """
    print("======= RUNNING COMPREHENSIVE AGENT EVALUATION =======")

    # (result key, agent function, display name) for every agent type.
    agents = (
        ("full_information", full_information_agent, "Full Information Agent"),
        ("partial_information", partial_information_agent, "Partial Information Agent"),
        ("reward_only", reward_only_agent, "Reward-Only Agent"),
    )
    env_types = ["Fixed", "Periodic", "Dynamic", "FullyRandom"]

    results = {}
    for result_key, agent_func, display_name in agents:
        if fixed_turns is None:
            # Original behaviour: delegate to the per-agent evaluation.
            results[result_key] = evaluate_agent(agent_func, display_name)
            continue

        # Explicit turn setting: same setting for every agent and environment,
        # stored under plain environment names so compare_all_agents uses them.
        print(f"===== Evaluating {display_name} =====")
        agent_results = {}
        for env_type in env_types:
            print(f"\nTesting on {env_type} environment...")
            agent_results[env_type] = run_standard_experiment(
                agent_func=agent_func,
                env_type=env_type,
                fixed_turns=fixed_turns,
            )
        results[result_key] = agent_results

    # Compare agents
    print("\n======= AGENT COMPARISON =======")
    compare_all_agents(results)

    return results


def compare_all_agents(all_results: Dict) -> None:
    """
    Compare the performance of all agents across environments.

    For every agent/environment pair the mean average reward, mean percentage
    of optimal actions and mean regret are computed. The summary is printed
    as a table, drawn as three bar plots plus a reward heatmap, and followed
    by best-agent, best-environment and overall-ranking tables.

    Environments whose name contains "_random_T" are left out of the
    comparison. If nothing remains to compare, a short notice is printed and
    the function returns without plotting.

    Args:
        all_results (Dict): Results from run_all_agents, i.e. a mapping
            agent key -> environment name -> results dict with the keys
            'environment', 'average_reward', 'optimal_actions' and 'regret'.
    """
    # One summary row per (agent, environment) pair.
    comparison_data = []

    for agent_name, agent_results in all_results.items():
        for env_name, results in agent_results.items():
            # Skip random T results for cleaner comparison
            if "_random_T" in env_name:
                continue

            # Per-game metrics for this agent in this environment
            df = pd.DataFrame({
                'Environment': results['environment'],
                'Average Reward': results['average_reward'],
                'Optimal Actions (%)': results['optimal_actions'],
                'Regret': results['regret']
            })

            comparison_data.append({
                'Agent': agent_name.replace('_', ' ').title(),
                'Environment': env_name,
                'Average Reward': df['Average Reward'].mean(),
                'Optimal Actions (%)': df['Optimal Actions (%)'].mean(),
                'Regret': df['Regret'].mean()
            })

    # Without rows the DataFrame below would have no columns and the plotting
    # and groupby calls would fail, so stop here with an explicit notice.
    if not comparison_data:
        print("\nNo results available to compare.")
        return

    comparison_df = pd.DataFrame(comparison_data)

    # Display comprehensive summary table (rounded to 3 decimal places for readability)
    print("\n===== AGENT COMPARISON SUMMARY =====")
    print("\nPerformance Metrics by Agent and Environment:")
    display_df = comparison_df.round(3)
    print(display_df)

    # Create comparison plots
    plt.figure(figsize=(15, 10))

    # Panels 1-3: one grouped bar plot per metric.
    bar_metrics = ('Average Reward', 'Optimal Actions (%)', 'Regret')
    for panel, metric in enumerate(bar_metrics, start=1):
        plt.subplot(2, 2, panel)
        sns.barplot(x='Environment', y=metric, hue='Agent', data=comparison_df, errorbar=('ci', 95))
        plt.title(f'{metric} by Agent and Environment')
        plt.xticks(rotation=45)
        plt.grid(True, alpha=0.3)

    # Panel 4: heatmap of average reward (agents x environments).
    plt.subplot(2, 2, 4)
    reward_pivot = comparison_df.pivot_table(
        index='Agent',
        columns='Environment',
        values='Average Reward'
    )
    sns.heatmap(reward_pivot, annot=True, cmap='YlGnBu', fmt='.3f')
    plt.title('Average Reward Heatmap')

    plt.tight_layout()
    plt.suptitle("Agent Performance Comparison", fontsize=16, y=1.02)
    plt.show()

    # Generate summary of best agent for each environment
    print("\nBest Agent for Each Environment (Based on Average Reward):")
    best_agents = comparison_df.loc[comparison_df.groupby('Environment')['Average Reward'].idxmax()]
    print(best_agents[['Environment', 'Agent', 'Average Reward']].round(3))

    # Generate summary of best environment for each agent
    print("\nBest Environment for Each Agent (Based on Average Reward):")
    best_envs = comparison_df.loc[comparison_df.groupby('Agent')['Average Reward'].idxmax()]
    print(best_envs[['Agent', 'Environment', 'Average Reward']].round(3))

    # Create simple ranking table based on average reward across all environments
    print("\nOverall Agent Ranking (Based on Average Reward Across All Environments):")
    agent_ranking = comparison_df.groupby('Agent')['Average Reward'].mean().reset_index()
    agent_ranking = agent_ranking.sort_values('Average Reward', ascending=False)
    print(agent_ranking.round(3))




# Example usage for students
"""
# STUDENTS: Implement your solutions for the agent functions above.
# Then run the evaluations to test your implementations.

# To evaluate just one agent type at a time (for optimization):
# With fixed turns (T=100)
full_info_results = evaluate_full_information_agent(fixed_turns=True)
# OR
partial_info_results = evaluate_partial_information_agent(fixed_turns=True)
# OR
reward_only_results = evaluate_reward_only_agent(fixed_turns=True)

# With random turns (T=1-300)
full_info_random_results = evaluate_full_information_agent(fixed_turns=False)
# OR
partial_info_random_results = evaluate_partial_information_agent(fixed_turns=False)
# OR
reward_only_random_results = evaluate_reward_only_agent(fixed_turns=False)

# To run comprehensive evaluation of all agents:
# With fixed turns (T=100)
all_results = run_all_agents(fixed_turns=True)
# With random turns (T=1-300)
all_random_results = run_all_agents(fixed_turns=False)
"""

# Agentes para Definir

## Información Completa

In [None]:

from typing import Dict
import numpy as np

def full_information_agent(env_info: Dict) -> int:
    """
    Full-information agent.

    The agent reads the current reward probability of arm 1 (visible) and
    uses the history of actions and rewards to estimate the success rate of
    arm 2, whose probability is hidden. It then plays the arm with the higher
    value, preferring arm 1 on ties. While arm 2 has never been played, the
    agent plays it so that an estimate exists.

    Args:
        env_info (Dict): Dictionary containing:
            - current_turn (int): Current turn number (not used here).
            - total_turns (int): Total number of turns (not used here).
            - p1 (float): Reward probability of arm 1 on the current turn;
              0.5 is assumed when the key is missing.
            - history (Dict): Past information, including:
                - 'actions': Past actions (0 for arm 1, 1 for arm 2).
                - 'rewards': Rewards obtained on previous turns.
                - 'p1': Past probabilities of arm 1 (not used here).

    Returns:
        int: The action to take (0 for arm 1, 1 for arm 2).
    """
    visible_p1 = env_info.get('p1', 0.5)
    past = env_info.get('history', {})

    # Rewards observed on the turns where arm 2 (action 1) was played.
    hidden_arm_outcomes = [
        reward
        for chosen, reward in zip(past.get('actions', []), past.get('rewards', []))
        if chosen == 1
    ]

    # No sample of arm 2 yet: play it once to obtain an estimate.
    if not hidden_arm_outcomes:
        return 1

    hidden_estimate = sum(hidden_arm_outcomes) / len(hidden_arm_outcomes)

    # Arm 1 wins ties because its probability is known rather than estimated.
    return 0 if visible_p1 >= hidden_estimate else 1
    
# Example usage: runs only when this code executes as the main module.
if __name__ == "__main__":
    # Sample env_info with some historical data: arm 2 (action 1) was played
    # twice with rewards 0 and 1.
    env_info_example = {
        'current_turn': 10,
        'total_turns': 100,
        'p1': 0.65,
        'history': {
            'actions': [0, 1, 0, 1],
            'rewards': [1, 0, 1, 1],
            'p1': [0.60, 0.62, 0.64, 0.66]
        }
    }
    
    # Ask the agent for its action on this sample and print it.
    action = full_information_agent(env_info_example)
    print("Acción elegida:", action)




In [None]:
# Evaluate the full-information agent with the evaluator's default turn
# setting (fixed_turns is not passed).
full_info_results = evaluate_full_information_agent()


In [None]:
# Evaluate the full-information agent again with fixed_turns=False (the
# random-turns setting); this rebinds full_info_results.
full_info_results = evaluate_full_information_agent(fixed_turns=False)


#

## Información Parcial

In [None]:
def partial_information_agent(env_info: Dict) -> int:
    """
    Partial-information agent: epsilon-greedy with a linearly decaying epsilon.

    The agent only uses:
      - The current reward probability of arm 1 (visible).
      - The current turn number and the total number of turns.
      - The history of actions and rewards (to estimate arm 2).

    Strategy:
      - Estimate arm 2's success rate as the mean reward of the turns on which
        it was played (0.5 while it has never been played).
      - Use arm 1's current probability directly.
      - With probability epsilon pick an arm uniformly at random. Epsilon
        starts at 1.0, decreases linearly with current_turn / total_turns and
        never drops below 0.1.
      - Otherwise pick the arm with the higher value, preferring arm 1 on ties.

    Args:
        env_info (Dict): Dictionary containing:
            - current_turn: Current turn number (default 0).
            - total_turns: Total number of turns (default 100). A value of
              zero or less cannot be used to measure progress, so the minimum
              epsilon is used instead of dividing by it.
            - p1: Current probability of arm 1 (default 0.5).
            - history: Dictionary with:
                - 'actions': Past actions (0 for arm 1, 1 for arm 2).
                - 'rewards': Rewards obtained on previous turns.

    Returns:
        int: The action to take (0 for arm 1, 1 for arm 2).
    """
    current_turn = env_info.get('current_turn', 0)
    total_turns = env_info.get('total_turns', 100)
    p1 = env_info.get('p1', 0.5)
    history = env_info.get('history', {})

    actions = history.get('actions', [])
    rewards = history.get('rewards', [])

    # Empirical success rate of arm 2, using only the turns where it was played.
    arm2_rewards = [r for a, r in zip(actions, rewards) if a == 1]
    # Neutral prior of 0.5 while arm 2 is still unexplored.
    arm2_mean = sum(arm2_rewards) / len(arm2_rewards) if arm2_rewards else 0.5

    # Exploration rate: decays linearly over the game, floored at min_epsilon.
    min_epsilon = 0.1
    if total_turns > 0:
        epsilon = max(min_epsilon, 1.0 - (current_turn / total_turns))
    else:
        # Avoid dividing by a zero (or negative) horizon.
        epsilon = min_epsilon

    # Explore: uniformly random arm.
    if np.random.rand() < epsilon:
        return np.random.randint(0, 2)

    # Exploit: compare arm 1's visible probability with arm 2's estimate.
    return 0 if p1 >= arm2_mean else 1

# Example usage: runs only when this code executes as the main module.
if __name__ == "__main__":
    # Sample env_info: turn 20 of 100, with arm 2 (action 1) played twice
    # (rewards 0 and 1).
    env_info_example = {
        'current_turn': 20,
        'total_turns': 100,
        'p1': 0.55,
        'history': {
            'actions': [0, 1, 0, 0, 1],
            'rewards': [1, 0, 1, 1, 1]
        }
    }
    
    # The agent may explore at random, so the printed action can vary
    # between runs.
    action = partial_information_agent(env_info_example)
    print("Acción elegida:", action)


In [None]:
# Evaluate the partial-information agent with the evaluator's default turn
# setting (fixed_turns is not passed).
partial_info_results = evaluate_partial_information_agent()


In [None]:
# Evaluate the partial-information agent again with fixed_turns=False (the
# random-turns setting); this rebinds partial_info_results.
partial_info_results = evaluate_partial_information_agent(fixed_turns=False)


## Reward Only

In [None]:
from typing import Dict
import numpy as np

def reward_only_agent(env_info: Dict) -> int:
    """
    Greedy agent that decides from observed rewards alone.

    The agent uses neither probability information nor the total number of
    turns; it only looks at the history of its own actions and rewards.

    Strategy:
      - With an empty history, pick an arm uniformly at random.
      - If one arm has never been played, play it to gather information
        (arm 1 is checked first).
      - Otherwise play the arm with the higher empirical mean reward,
        preferring arm 1 (action 0) on ties.

    Args:
        env_info (Dict): Dictionary containing:
            - current_turn: Current turn number (not used here).
            - history: Dictionary with past actions and rewards:
                - 'actions': List[int] of past actions (0 for arm 1, 1 for arm 2).
                - 'rewards': List[float] of rewards received.

    Returns:
        int: The action to take (0 for arm 1, 1 for arm 2).
    """
    past = env_info.get('history', {})
    played = past.get('actions', [])
    earned = past.get('rewards', [])

    # Nothing observed yet: no basis for a choice, so pick at random.
    if not played:
        return np.random.randint(0, 2)

    # Per-arm running [reward total, pull count].
    tallies = {0: [0.0, 0], 1: [0.0, 0]}
    for arm, payoff in zip(played, earned):
        tally = tallies[arm]
        tally[0] += payoff
        tally[1] += 1

    # Make sure each arm has at least one sample before comparing means.
    for arm in (0, 1):
        if tallies[arm][1] == 0:
            return arm

    mean_arm_0 = tallies[0][0] / tallies[0][1]
    mean_arm_1 = tallies[1][0] / tallies[1][1]

    # Greedy choice; ties go to arm 0.
    return 0 if mean_arm_0 >= mean_arm_1 else 1

# Example usage: runs only when this code executes as the main module.
if __name__ == "__main__":
    # Sample env_info: arm 1 (action 0) was played three times with rewards
    # 1, 1, 0 and arm 2 (action 1) twice with rewards 0, 1.
    env_info_example = {
        'current_turn': 15,
        'history': {
            'actions': [0, 1, 1, 0, 0],
            'rewards': [1, 0, 1, 1, 0]
        }
    }
    # Ask the agent for its action on this sample and print it.
    chosen_action = reward_only_agent(env_info_example)
    print("Chosen action:", chosen_action)



In [None]:
# Evaluate the reward-only agent with the default fixed_turns=True
# (fixed T=100).
reward_only_results = evaluate_reward_only_agent()


In [None]:
# Evaluate the reward-only agent again with random turns (T=1-300); this
# rebinds reward_only_results.
reward_only_results = evaluate_reward_only_agent(fixed_turns=False)
