In [1]:
# @title Branch and Bound


"""A branch and bound solver for coalitional structure generation.
"""

import collections
from collections.abc import Sequence
import itertools
import random
import sys
from typing import Any, List, Optional, Tuple, Union

from absl import logging  # pylint:disable=unused-import
import numpy as np



class BranchAndBound(solver.CoalitionalGameSolver):
  """Branch and bound method for coalition structure optimization.
  """

  def __init__(
      self,
      num_iterations: int = 1,
      seed: int | None = None,
      reporting: Any = None,
      random_branching: bool = True,  # Flag for randomness control
  ):
    super().__init__(num_iterations, seed, reporting)

    self.coalition_counts = collections.defaultdict(int)  # Optimal cs per iter

    self.random_branching = random_branching
    self.max_values_per_iteration = []
    self.max_value_so_far = -np.inf
    self.nodes_explored_per_iteration = []
    self.nodes_pruned_per_iteration = []

  def solve(
      self,
      characteristic_function: Any = None,
      game: Optional[coalitional_game.CoalitionalGame] = None,
      game_id: Optional[int] = None,
      game_name: Optional[str] = None,
      game_class: Optional[solver.GameClass] = None,
      game_representation: coalitional_game.Representation = coalitional_game.Representation.TABULAR_GAME,
      coalition_size_min: int = 0,
      coalition_size_max: int = sys.maxsize,
      task_cost: float = 0.0,
  ) -> Optional[
      tuple[
          Optional[Sequence[Sequence[int]]],
          Optional[Sequence[float] | np.ndarray],
      ]
  ]:
    """Run multiple iterations of coalition structure search.

    Args:
      characteristic_function: looks up or computes the value of any coalition
      game: an instance of a CoalitionalGame to be solved
      game_id: ID for a specific game instance
      game_name: name of the game to be solved, to be instantiated in function
      game_class: the complexity class of the game to be solved (e.g. convex)
      game_representation: the game representation (e.g. tabular, skill game)
      coalition_size_min: the minimum size of a coalition (soft constraint)
      coalition_size_max: the maximum size of a coalition (capacity constraint)
      task_cost: the cost (in terms of agent effort) of doing a task

    Returns:
      coalition_structure: sets of sets of agents (coalitions)
      payoffs: list of individual agent payoffs
    """

    del characteristic_function

    logging.info("Running Branch and Bound.")

    # Create a CoalitionalGame instance -- appropriate for game_representation
    game = gameplay_utils.create_game_instance(
        game_id=game_id,
        game_class=game_class,
        game_representation=game_representation,
        game_name=game_name,
        coalition_size_min=coalition_size_min,
        coalition_size_max=coalition_size_max,
        task_cost=task_cost,
    )

    results = None
    for _ in range(self._num_iterations):
      (
          max_value,
          max_coal_structure,
          nodes_explored,
          pruned_nodes,
      ) = self.run_optimal_cs_solver(game)

      # Store the max-value coalition structure found on each iteration
      normalized_structure = self.normalize_structure(max_coal_structure)
      self.coalition_counts[str(normalized_structure)] += 1

      # Track the maximum cs value found so far
      if max_value > self.max_value_so_far:
        self.max_value_so_far = max_value
      self.max_values_per_iteration.append(self.max_value_so_far)

      # Track the number of nodes explored and pruned nodes on each iteration
      self.nodes_explored_per_iteration.append(nodes_explored)
      self.nodes_pruned_per_iteration.append(pruned_nodes)

      # Send to XManager
      results = {
          solver.EvaluationKeys.ITERATION.value: _,
          solver.EvaluationKeys.SEED.value: self._seed,
          solver.EvaluationKeys.MAX_WELFARE.value: max_value,
          solver.EvaluationKeys.MAX_COALITION_STRUCTURE.value: (
              max_coal_structure
          ),
          solver.EvaluationKeys.NODES_EXPLORED.value: nodes_explored,
          solver.EvaluationKeys.PRUNED_NODES.value: pruned_nodes,
          solver.EvaluationKeys.COALITION_COUNTS_DICT.value: (
              self.coalition_counts
          ),
      }

      self._reporting.report(results)

    # Returns the standard solution components (CS, payoffs) if they exist
    if game and results:
      payoffs = [results["max_value"] for _ in range(game.num_agents)]
      return (results["max_coal_structure"], payoffs)

    print(f"\n\n Max coal structure: {results['max_coal_structure']}\n\n")
    return None

  def normalize_structure(
      self,
      structure: Sequence[Sequence[int]]
  ) -> List[List[int]]:
    """Normalize a coalition structure by sorting elements within each coalition.

    and then sorting the coalitions within the structure.
    Args:
      structure: The coalition structure to be normalized.

    Returns:
      Tuple[Tuple[int]]: The normalized structure.
    """

    return list(sorted(list(sorted(coalition)) for coalition in structure))

  def deterministic_partition_from_structure(
      self, coalition_structure: List[Tuple[int, ...]]
  ) -> List[Tuple[int, ...]]:
    """Deterministically splits the largest coalition in the structure.

    Args:
      coalition_structure: (List[Tuple[int, ...]]), The current coalition
        structure.

    Returns:
      a new coalition_structure: (List[Tuple[int, ...]]), The new cs
        with the largest coalition randomly selected and partitioned into two
        sub-coalitions.
    """

    # Select the largest coalition to split
    splittable_coalitions = [
        coalition for coalition in coalition_structure if len(coalition) > 1
    ]

    # No coalitions to split, return original structure
    if not splittable_coalitions:
      return coalition_structure

    # Select the largest coalition to split deterministically
    selected_coalition = max(splittable_coalitions, key=len)

    # Split the selected coalition into two halves
    split_point = len(selected_coalition) // 2
    sub_coalition = selected_coalition[:split_point]
    remaining_coalition = selected_coalition[split_point:]

    # Form the new structure by replacing the selected
    # coalition with the two sub-coalitions
    new_structure = [
        coalition if coalition != selected_coalition else sub_coalition
        for coalition in coalition_structure
    ]
    new_structure.append(remaining_coalition)

    return new_structure

  def members_to_binary_coal(
      self,
      members: tuple[int, ...],
      game: coalitional_game.CoalitionalGame
  ) -> np.ndarray:
    """Converts a tuple of agent indices into a binary coalition vector.

    Args:
      members: (tuple[int, ...]), Indices of agents in the coalition.
      game: (game.CoalitionalGame), The game instance with total
        agents.
    Returns:
      np.ndarray: Binary array where positions of members are 1, others are 0.
    """
    # Binary array for coalition
    coal = np.zeros(game.num_agents)
    # Adjust for 1-based indexing (agents start at 1)
    zero_based_members = tuple(member - 1 for member in members)
    # Sets a 1 where agents are in the coalition
    np.put(coal, zero_based_members, np.ones(len(members)))
    return coal

  def random_partition_from_structure(
      self, coalition_structure: List[Tuple[int, ...]]
  ) -> List[Tuple[int, ...]]:
    """Randomly selects a coalition and splits it into two distinct sub-coals.

    Args:
      coalition_structure: (List[Tuple[int, ...]]), The current coal struct.

    Returns:
      List[Tuple[int, ...]]: The new coalition structure with one coalition
        randomly selected and partitioned into two sorted sub-coalitions.
    """
    # Select coalitions that can be split
    splittable_coalitions = [
        coalition for coalition in coalition_structure if len(coalition) > 1
    ]

    if not splittable_coalitions:
      return coalition_structure

    # Choose a random coalition to split
    selected_coalition = random.choice(splittable_coalitions)

    # Randomly partition the coalition into two distinct parts
    partition_size = random.randint(1, len(selected_coalition) - 1)
    sub_coalition = tuple(
        sorted(random.sample(selected_coalition, partition_size))
        )
    remaining_coalition = tuple(
        sorted(set(selected_coalition) - set(sub_coalition))
        )

    # Form the new coalition structure
    new_structure = [
        coalition if coalition != selected_coalition else sub_coalition
        for coalition in coalition_structure
    ]
    new_structure.append(remaining_coalition)

    # Sort each coalition and the whole structure for consistency
    return sorted(new_structure, key=lambda x: (len(x), x))

  def calculate_upper_bound(
      self,
      coalition: Union[Tuple[int, ...], List[int]],
      game: coalitional_game.CoalitionalGame
  ) -> float:
    """Calculate the upper bound for the maximum value of a given coalition.

    Args:
      coalition (Tuple[int, ...]): Tuple representing a coal of agents
      game: game.CoalitionalGame instance
    Returns:
      float: The upper bound for the max possible value of the coal structure.
    """
    # Add the value of all possible sub-coalitions to calculate the upper bound
    upper_bound = 0
    for i in range(1, len(coalition) + 1):
      for sub_coalition in itertools.combinations(coalition, i):
        sub_value = game.characteristic_function(
            self.members_to_binary_coal(tuple(sub_coalition), game)
        )
        upper_bound += sub_value
    return upper_bound



  ##############################################################################
  # Optimizer with Grand Coalition Initialization and Optional Rnd Branching
  ##############################################################################

  def run_optimal_cs_solver(
      self, game: coalitional_game.CoalitionalGame
  ) -> Tuple[float, Sequence[Sequence[int]], int, int]:
    """Branch and Bound algorithm for finding the optimal coalition structure.

    with an adjusted pruning strategy.
    Args:
      game: game.CoalitionalGame instance

    Returns:
      Tuple:
          - float: The best value (maximum sum of coalition values)
          - List[Tuple[int, ...]]: The best coalition structure.
          - int: The total number of nodes explored.
          - int: The total number of pruned nodes.
          (sub-optimal structures removed from consideration).
    """

    # Metrics for analysis
    max_value = -np.inf  # Initialize best value
    max_coal_structure = []  # Placeholder for best coalition structure
    pruned_nodes = 0  # Count the number of pruned nodes
    nodes_explored = 0  # Count the total number of explored nodes

    # Always start from the grand coalition
    initial_agents = list(range(1, game.num_agents + 1))
    initial_structure = [tuple(initial_agents)]

    # Parent class expects a binary np.ndarray or a Sequeence of agent IDs
    initial_value = game.characteristic_function(tuple(initial_agents))

    # Stack stores (structure, value) pairs for exploration
    stack = [(initial_structure, initial_value)]

    logging.info(
        "Coalition to the stack: %s, value: %s", initial_structure,
        initial_value
    )

    # Branch: Generate partitions of coalition structures
    while stack:
      current_structure, current_value = stack.pop()
      current_value = float(current_value)
      nodes_explored += 1

      # Check if current value is better than the best value found so far
      if current_value > max_value:
        max_value = current_value  # Update best value
        max_coal_structure = current_structure  # Update best structure

      # Choose partitioning strategy based on random_branching flag
      if self.random_branching:
        new_structure = self.random_partition_from_structure(current_structure)
      else:
        new_structure = self.deterministic_partition_from_structure(
            current_structure
        )

      # Calculate the value of the new coalition structure
      cs_value = sum([
          game.characteristic_function(coalition)
          for coalition in new_structure
      ])
      # Bounding: Upper bound on largest coalition -
      # to explore whether further branching is promising
      calculated_u_bound = self.calculate_upper_bound(
          max(new_structure, key=len), game
      )
      upper_bound = cs_value + calculated_u_bound

      # Pruning: Don't prune too aggressively if coalitions are large
      large_coalition_tolerance = 0.1 * len(max(new_structure, key=len))

      # If the upper bound is promising, add the new structure to the stack
      if upper_bound > max_value - large_coalition_tolerance:
        stack.append((new_structure, cs_value))

        logging.info(
            "Coalition to the stack: %s, value: %s", new_structure, cs_value
        )

      else:
        pruned_nodes += 1

    # Sort the structure by length for readability and convert to list of lists
    sorted_structure = sorted(
        [list(sorted(tup)) for tup in max_coal_structure],  # 1. What to sort
        key=lambda x: (
            len(x), x,
            ),  # 2. How to sort (by length, then alphabetically)
    )

    return (
        max_value,     # Best value found
        sorted_structure,  # Best coalition structure as list of lists
        nodes_explored,  # Return the total number of explored nodes
        pruned_nodes,  # Return the total number of pruned nodes
    )


NameError: name 'solver' is not defined

In [2]:
# @title Characteristic Functions
class CharacteristicFunctions:

  def __init__(self):
    return None


  def convex_game(self):
      agents = [1, 2, 3, 4]
      values = {
          (1,): 0,
          (2,): 0,
          (3,): 0,
          (4,): 0,
          (1, 2): 10,
          (1, 3): 15,
          (1, 4): 0,
          (2, 3): 0,
          (2, 4): 0,
          (3, 4): 2,
          (1, 2, 3): 25,
          (1, 2, 4): 10,
          (1, 3, 4): 17,
          (2, 3, 4): 5,
          (1, 2, 3, 4): 30
      }
      return agents, values

  def superadditive_game(self):
      agents = [1, 2, 3, 4]
      values = {
          (1,): 0,
          (2,): 0,
          (3,): 0,
          (4,): 0,
          (1, 2): 25,
          (1, 3): 15,
          (1, 4): 0,
          (2, 3): 0,
          (2, 4): 0,
          (3, 4): 0,
          (1, 2, 3): 25,
          (1, 2, 4): 30,
          (1, 3, 4): 17,
          (2, 3, 4): 5,
          (1, 2, 3, 4): 30
      }
      return agents, values

  def monotone_game(self):
      agents = [1, 2, 3, 4, 5]
      values = {
          (1,): 2,
          (2,): 2,
          (3,): 0,
          (4,): 0,
          (5,): 0,
          (1, 2): 2,
          (1, 3): 2,
          (1, 4): 2,
          (1, 5): 2,
          (2, 3): 2,
          (2, 4): 2,
          (2, 5): 2,
          (3, 4): 0,
          (3, 5): 0,
          (4, 5): 0,
          (1, 2, 3): 2,
          (1, 2, 4): 2,
          (1, 2, 5): 2,
          (1, 3, 4): 2,
          (1, 3, 5): 2,
          (1, 4, 5): 2,
          (2, 3, 4): 2,
          (2, 3, 5): 2,
          (2, 4, 5): 2,
          (3, 4, 5): 0,
          (1, 2, 3, 4): 2,
          (1, 2, 3, 5): 2,
          (1, 2, 4, 5): 2,
          (1, 3, 4, 5): 2,
          (2, 3, 4, 5): 2,
          (1, 2, 3, 4, 5): 14
      }
      return agents, values

  def monotone_game2(self):
      """Class for a monotone characteristic function where a cs is optimal."""
      agents = [1, 2, 3, 4, 5]
      values = {
        (1,): 0,
        (2,): 5,
        (3,): 5,
        (4,): 0,
        (5,): 5,
        (1, 2): 21,
        (1, 3): 19,
        (1, 4): 1,
        (1, 5): 19,
        (2, 3): 4,
        (2, 4): 14,
        (2, 5): 4,
        (3, 4): 14,
        (3, 5): 4,
        (4, 5): 14,
        (1, 2, 3): 20,
        (1, 2, 4): 32,
        (1, 2, 5): 20,
        (1, 3, 4): 30,
        (1, 3, 5): 18,
        (1, 4, 5): 30,
        (2, 3, 4): 13,
        (2, 3, 5): 3,
        (2, 4, 5): 13,
        (3, 4, 5): 13,
        (1, 2, 3, 4): 31,
        (1, 2, 3, 5): 17,
        (1, 2, 4, 5): 31,
        (1, 3, 4, 5): 29,
        (2, 3, 4, 5): 12,
        (1, 2, 3, 4, 5): 30,
      }
      return agents, values

  def non_monotone_game(self):
      agents = [1, 2, 3, 4, 5]
      values = {
          (1,): 0,
          (2,): 0,
          (3,): 0,
          (4,): 0,
          (5,): 0,
          (1, 2): 2,
          (1, 3): 0,
          (1, 4): 0,
          (1, 5): 0,
          (2, 3): 2,
          (2, 4): 0,
          (2, 5): 0,
          (3, 4): 0,
          (3, 5): 0,
          (4, 5): 2,
          (1, 2, 3): 4,
          (1, 2, 4): 2,
          (1, 2, 5): 2,
          (1, 3, 4): 0,
          (1, 3, 5): 0,
          (1, 4, 5): 2,
          (2, 3, 4): 2,
          (2, 3, 5): 2,
          (2, 4, 5): 2,
          (3, 4, 5): 5,
          (1, 2, 3, 4): 4,
          (1, 2, 3, 5): 4,
          (1, 2, 4, 5): 4,
          (1, 3, 4, 5): 5,
          (2, 3, 4, 5): 7,
          (1, 2, 3, 4, 5): 14
      }
      return agents, values

In [3]:
import collections
import random
import numpy as np
import itertools
from typing import Any, Optional, Sequence, List, Tuple, Union, Dict
import matplotlib.pyplot as plt
from collections import defaultdict

# Base CoalitionalGame class
class CoalitionalGame:
    """Base class for coalitional games."""

    def __init__(self, num_agents: int, agents: Optional[List[int]] = None, values: Optional[Dict[Tuple[int], float]] = None):
        self.num_agents = num_agents
        self.agents = agents if agents is not None else list(range(1, num_agents + 1))
        self.values = values if values is not None else {}

    def characteristic_function(self, coalition: Tuple[int, ...]) -> float:
        sorted_coalition = tuple(sorted(coalition))
        return self.values.get(sorted_coalition, 0.0)

#########################################################
# Specific game classes
#######################################################

# ConvexGame1: Class for a convex characteristic function
class ConvexGame1(CoalitionalGame):
    def __init__(self, **kwargs):
        del kwargs
        values = {
            (1,): 0, (2,): 0, (3,): 0, (4,): 0,
            (1, 2): 10, (1, 3): 15, (1, 4): 0,
            (2, 3): 0, (2, 4): 0, (3, 4): 2,
            (1, 2, 3): 25, (1, 2, 4): 10, (1, 3, 4): 17,
            (2, 3, 4): 5, (1, 2, 3, 4): 30
        }
        super().__init__(num_agents=4, values=values)

# SuperadditiveGame1: Class for a superadditive characteristic function
class SuperadditiveGame1(CoalitionalGame):
    def __init__(self, **kwargs):
        del kwargs
        values = {
            (1,): 0, (2,): 0, (3,): 0, (4,): 0,
            (1, 2): 25, (1, 3): 15, (1, 4): 0,
            (2, 3): 0, (2, 4): 0, (3, 4): 0,
            (1, 2, 3): 25, (1, 2, 4): 30, (1, 3, 4): 17,
            (2, 3, 4): 5, (1, 2, 3, 4): 30
        }
        super().__init__(num_agents=4, values=values)

# MonotoneGame1: Class for a monotone characteristic function
class MonotoneGame1(CoalitionalGame):
    def __init__(self, **kwargs):
        del kwargs
        values = {
            (1,): 2, (2,): 2, (3,): 0, (4,): 0, (5,): 0,
            (1, 2): 2, (1, 3): 2, (1, 4): 2, (1, 5): 2,
            (2, 3): 2, (2, 4): 2, (2, 5): 2,
            (3, 4): 0, (3, 5): 0, (4, 5): 0,
            (1, 2, 3): 2, (1, 2, 4): 2, (1, 2, 5): 2,
            (1, 3, 4): 2, (1, 3, 5): 2, (1, 4, 5): 2,
            (2, 3, 4): 2, (2, 3, 5): 2, (2, 4, 5): 2,
            (3, 4, 5): 0, (1, 2, 3, 4): 2, (1, 2, 3, 5): 2,
            (1, 2, 4, 5): 2, (1, 3, 4, 5): 2, (2, 3, 4, 5): 2,
            (1, 2, 3, 4, 5): 14
        }
        super().__init__(num_agents=5, values=values)

# MontoneGame2: Characteristic function where the optimal is the CS

class MonotoneGame2(CoalitionalGame):
    def __init__(self, **kwargs):
        del kwargs
        values = {
        (1,): 0,
        (2,): 5,
        (3,): 5,
        (4,): 0,
        (5,): 5,
        (1, 2): 21,
        (1, 3): 19,
        (1, 4): 1,
        (1, 5): 19,
        (2, 3): 4,
        (2, 4): 14,
        (2, 5): 4,
        (3, 4): 14,
        (3, 5): 4,
        (4, 5): 14,
        (1, 2, 3): 20,
        (1, 2, 4): 32,
        (1, 2, 5): 20,
        (1, 3, 4): 30,
        (1, 3, 5): 18,
        (1, 4, 5): 30,
        (2, 3, 4): 13,
        (2, 3, 5): 3,
        (2, 4, 5): 13,
        (3, 4, 5): 13,
        (1, 2, 3, 4): 31,
        (1, 2, 3, 5): 17,
        (1, 2, 4, 5): 31,
        (1, 3, 4, 5): 29,
        (2, 3, 4, 5): 12,
        (1, 2, 3, 4, 5): 30,
      }
        super().__init__(num_agents=5, values=values)

# NonMonotoneGame1: Class for a non-monotone characteristic function
class NonMonotoneGame1(CoalitionalGame):
    def __init__(self, **kwargs):
        del kwargs
        values = {
            (1,): 2, (2,): 2, (3,): 0, (4,): 0, (5,): 0,
            (1, 2): 2, (1, 3): 2, (1, 4): 2, (1, 5): 2,
            (2, 3): 2, (2, 4): 2, (2, 5): 2,
            (3, 4): 0, (3, 5): 0, (4, 5): 0,
            (1, 2, 3): 2, (1, 2, 4): 2, (1, 2, 5): 2,
            (1, 3, 4): 2, (1, 3, 5): 2, (1, 4, 5): 2,
            (2, 3, 4): 2, (2, 3, 5): 2, (2, 4, 5): 2,
            (3, 4, 5): 0, (1, 2, 3, 4): 2, (1, 2, 3, 5): 2,
            (1, 2, 4, 5): 2, (1, 3, 4, 5): 2, (2, 3, 4, 5): 2,
            (1, 2, 3, 4, 5): 14
        }
        super().__init__(num_agents=5, values=values)




In [None]:
# @title Merge and Bound

"""A Merge-and-bound solver for coalitional structure generation.

The parameter alpha controls the merge vs split ratio. alpha=1 always merges,
alpha=0 always splits. initial_structure: "singletons" or "random"
"""
# pylint: disable=bad-indentation
import collections
from collections.abc import Sequence
import random
import sys
from typing import Any, List, Optional, Tuple

from absl import logging  # pylint:disable=unused-import
import numpy as np



class MergeAndBound(solver.CoalitionalGameSolver):
  """Branch and bound method for coalition structure optimization.

  alpha=1 always merges, alpha=0 always splits.
  :init_mode: "singletons" or "random".
  """

  def __init__(
      self,
      num_iterations: int = 1,
      seed: int | None = None,
      reporting: Any = None,
      alpha: float = 1.0,  # 1.0 always merges, 0.0 always splits
      init_mode: str = "singletons",
      random_merging: bool = True,  # Flag for randomness control
  ):

    super().__init__(num_iterations, seed, reporting)

    self.alpha = alpha
    self.init_mode = init_mode
    self.random_merging = random_merging
    self.coalition_counts = collections.defaultdict(int)  # Optimal cs per iter
    self.max_values_per_iteration = []  # Track max values for the plot
    self.iteration_history = []
    self.max_value_so_far = -np.inf  # Track of the maximum value so far
    self.nodes_explored_per_iteration = []  # Track the number of nodes explored
    self.nodes_pruned_per_iteration = []  # Track the number of nodes pruned
    self.visited_structures = set()  # Track of visited coalition structures

  def solve(
      self,
      characteristic_function: Any = None,
      game: Optional[coalitional_game.CoalitionalGame] = None,
      game_id: Optional[int] = None,
      game_name: Optional[str] = None,
      game_class: Optional[solver.GameClass] = None,
      game_representation: coalitional_game.Representation = coalitional_game.Representation.TABULAR_GAME,
      coalition_size_min: int = 0,
      coalition_size_max: int = sys.maxsize,
      task_cost: float = 0.0,
  ) -> Optional[
      tuple[
          Optional[Sequence[Sequence[int]]],
          Optional[Sequence[float] | np.ndarray],
      ]
  ]:
    """Run multiple iterations of coalition structure search.

    Args:
      characteristic_function: looks up or computes the value of any coalition
      game: an instance of a CoalitionalGame to be solved
      game_id: ID for a specific game instance
      game_name: name of the game to be solved, to be instantiated in function
      game_class: the complexity class of the game to be solved (e.g. convex)
      game_representation: the game representation (e.g. tabular, skill game)
      coalition_size_min: the minimum size of a coalition (soft constraint)
      coalition_size_max: the maximum size of a coalition (capacity constraint)
      task_cost: the cost (in terms of agent effort) of doing a task

    Returns:
      coalition_structure: sets of sets of agents (coalitions)
      payoffs: list of individual agent payoffs
    """

    del characteristic_function

    logging.info("Running Merge and Bound.")

    # Create a CoalitionalGame instance -- appropriate for game_representation
    game = gameplay_utils.create_game_instance(
        game_id=game_id,
        game_class=game_class,
        game_representation=game_representation,
        game_name=game_name,
        coalition_size_min=coalition_size_min,
        coalition_size_max=coalition_size_max,
        task_cost=task_cost,
    )

    results = None
    for _ in range(self._num_iterations):
      (
          max_value,
          max_coal_structure,
          nodes_explored,
          pruned_nodes,
      ) = self.run_optimal_cs_solver(game)

      # Store the maximum coalition structure found so far
      normalized_structure = self.normalize_structure(
          max_coal_structure
      )
      self.coalition_counts[str(normalized_structure)] += 1

      # Track the maximum value found so far and store it for plotting
      if max_value > self.max_value_so_far:
        self.max_value_so_far = max_value
      self.max_values_per_iteration.append(self.max_value_so_far)

      # Track the number of nodes explored and pruned nodes
      self.nodes_explored_per_iteration.append(nodes_explored)
      self.nodes_pruned_per_iteration.append(pruned_nodes)

      # Send to XManager
      results = {
          solver.EvaluationKeys.ITERATION.value: _,
          solver.EvaluationKeys.SEED.value: self._seed,
          solver.EvaluationKeys.MAX_WELFARE.value: max_value,
          solver.EvaluationKeys.MAX_COALITION_STRUCTURE.value: (
              max_coal_structure
          ),
          solver.EvaluationKeys.NODES_EXPLORED.value: nodes_explored,
          solver.EvaluationKeys.PRUNED_NODES.value: pruned_nodes,
          solver.EvaluationKeys.COALITION_COUNTS_DICT.value: (
              self.coalition_counts
          ),
      }

      self._reporting.report(results)

    # Returns the standard solution components (CS, payoffs) if they exist
    if game and results:
      payoffs = [results["max_value"] for _ in range(game.num_agents)]
      return (results["max_coal_structure"], payoffs)

    print(f"\n\n Max coal structure: {results['max_coal_structure']}\n\n")
    return None

  def normalize_structure_old(
      self, structure: Sequence[Sequence[int]]
  ) -> List[List[int]]:
    """Normalize a coalition structure by sorting elements within each coalition.

    and then sorting the coalitions within the structure.
    Args:
      structure: The coalition structure to be normalized.

    Returns:
      Sequence[Sequence[int]]: The normalized structure.
    """

    return list(sorted(list(sorted(coalition)) for coalition in structure))

  def normalize_structure(
      self, structure: Sequence[Sequence[int]]
  ) -> Tuple[Tuple[int, ...], ...]:
    """Normalize a coalition structure by sorting elements within each coalition.

    and then sorting the coalitions within the structure.
    Args:
      structure: The coalition structure to be normalized.

    Returns:
      Tuple[Tuple[int, ...], ...]: The normalized structure
      as a tuple of tuples.
    """
    return tuple(tuple(sorted(coalition)) for coalition in sorted(structure))

  def members_to_binary_coal(
      self,
      members: tuple[int, ...],
      game: coalitional_game.CoalitionalGame,
  ) -> np.ndarray:
    """Converts a tuple of agent indices into a binary coalition vector.

    Args:
      members: (tuple[int, ...]), Indices of agents in the coalition.
      game: (game.CoalitionalGame), The game instance with total
        agents.
    Returns:
      np.ndarray: Binary array where positions of members are 1, others are 0.
    """
    # Binary array for coalition
    coal = np.zeros(game.num_agents)
    # Adjust for 1-based indexing (agents start at 1)
    zero_based_members = tuple(member - 1 for member in members)
    # Sets a 1 where agents are in the coalition
    np.put(coal, zero_based_members, np.ones(len(members)))
    return coal

  def generate_initial_structure(
      self, num_agents: int
  ) -> Sequence[Sequence[int]]:
    """Generates the initial coalition structure based on the mode."""
    if self.init_mode == "singletons":
      # Start with singleton coalitions (each agent in its own coalition)
      return [(agent,) for agent in range(1, num_agents + 1)]
    else:
      # Start with a randomly generated partition of agents
      return [(agent,) for agent in range(1, num_agents + 1)]   #  TEST
      #  return self.generate_rnd_initial_structure(num_agents)

  def generate_rnd_initial_structure(
      self, num_agents: int
  ) -> Sequence[Sequence[int]]:
    """Generates a random partition of agents into coalitions."""

    if num_agents <= 0:
      # Return empty structure if no agents
      return []

    agents = list(range(1, num_agents + 1))
    random.shuffle(agents)
    partitions = []

    while agents:
      size = random.randint(1, len(agents))
      partitions.append(tuple(agents[:size]))
      agents = agents[size:]

    # Fallback to singletons in case partitions is somehow empty or None
    if not partitions:
      partitions = [(i,) for i in range(1, num_agents + 1)]

    return partitions

  def merge_random_coalitions(
      self, coalition_structure: Sequence[Sequence[int]], num_agents: int
  ) -> List[List[Sequence[int]]]:
    """Randomly selects two coalitions and merges them into one."""
    if len(coalition_structure) < 2:
      # Return wrapped in a list for consistency
      return [list(coalition_structure)]
    selected_coalitions = random.sample(coalition_structure, 2)
    merged_coalition = tuple(
        set(selected_coalitions[0]) | set(selected_coalitions[1])
    )
    new_structure = [
        coalition for coalition in coalition_structure
        if coalition not in selected_coalitions
    ]
    new_structure.append(merged_coalition)

    # Ensure the new structure is valid
    # Wrap the structure in a list for consistency
    if self.is_valid_partition(new_structure, num_agents):
      return [list(new_structure)]
    return [list(coalition_structure)]

  def merge_all_coalitions(
      self, coalition_structure: Sequence[Sequence[int]], num_agents: int
  ) -> List[List[Sequence[int]]]:
    """Systematically merges all pairs of coalitions from the current structure.

    Args:
        coalition_structure: (List[Tuple[int, ...]]) The current coalition
          structure.
        num_agents: Total number of agents.

    Returns:
        List[List[Tuple[int, ...]]]: A list of new coalition structures formed
        by merging all possible pairs.
    """
    merged_structures = []
    for i in range(len(coalition_structure)):
      for j in range(i + 1, len(coalition_structure)):
        # Merge coalition i and coalition j
        merged_coalition = tuple(
            sorted(set(coalition_structure[i]) | set(coalition_structure[j]))
        )

        # Form the new structure by replacing
        # the two selected coalitions with the merged one
        new_structure = [
            coal
            for k, coal in enumerate(coalition_structure)
            if k != i and k != j
        ]
        new_structure.append(merged_coalition)

        # Ensure the new structure is valid (no duplicate agents, etc.)
        if self.is_valid_partition(new_structure, num_agents):
          merged_structures.append(new_structure)

    return merged_structures

  def random_partition_from_structure(
      self, coalition_structure: Sequence[Sequence[int]], num_agents: int
  ) -> Sequence[Sequence[int]]:
    """Randomly selects a coalition and splits it."""
    splittable_coalitions = [
        coalition for coalition in coalition_structure if len(coalition) > 1
    ]
    if not splittable_coalitions:
      return coalition_structure
    selected_coalition = random.choice(splittable_coalitions)
    partition_size = random.randint(1, len(selected_coalition) - 1)
    sub_coalition = tuple(random.sample(selected_coalition, partition_size))
    remaining_coalition = tuple(set(selected_coalition) - set(sub_coalition))
    new_structure = [
        coalition if coalition != selected_coalition else sub_coalition
        for coalition in coalition_structure
    ]
    new_structure.append(remaining_coalition)
    return (
        new_structure
        if self.is_valid_partition(new_structure, num_agents)
        else coalition_structure
    )

  def is_valid_partition(
      self, coalition_structure: List[Tuple[int, ...]], num_agents: int
  ) -> bool:
    """Check if the coalition structure is a valid partition with no repeated or missing agents."""
    all_agents = []
    for coalition in coalition_structure:
      for agent in coalition:
        all_agents.append(agent)
    return sorted(all_agents) == list(range(1, num_agents + 1))

  def calculate_upper_bound(
      self,
      coalition_structure: Sequence[Sequence[int]],
      game: coalitional_game.CoalitionalGame,
  ) -> float:
    """Calculate an upper bound for the maximum value of the current cs.

    by considering immediate pairwise merges and their maximum potential gain.

    Args:
        coalition_structure (Sequence[Sequence[int]]): The current coalition
          structure (list of coalitions).
        game (coalitional_game.CoalitionalGame): An instance of the coalitional
          game.

    Returns:
        float: The upper bound for the maximum value of the coalition structure.
    """
    # Current value of the coalition structure
    cs_value = sum([
        game.characteristic_function(coalition)
        if isinstance(coalition, tuple)
        else game.characteristic_function(tuple(coalition))
        for coalition in coalition_structure
    ])

    # Estimate potential value from pairwise merges
    potential_merge_value = 0
    num_coalitions = len(coalition_structure)

    # Evaluate every pair of coalitions for merging
    for i in range(num_coalitions):
      for j in range(i + 1, num_coalitions):
        # Merge coalition i and coalition j
        merged_coalition = tuple(
            set(coalition_structure[i]) | set(coalition_structure[j])
        )
        merge_value = game.characteristic_function(merged_coalition)

        # Compare the merged value with the sum of individual values
        individual_value = game.characteristic_function(
            tuple(coalition_structure[i])
        ) + game.characteristic_function(tuple(coalition_structure[j]))

        # Add the maximum possible gain from this merge
        potential_merge_value += max(merge_value - individual_value, 0)

    # The upper bound is the current value plus the potential
    # value from pairwise merges
    upper_bound = cs_value + potential_merge_value

    return upper_bound

  def run_optimal_cs_solver(
      self, game: coalitional_game.CoalitionalGame
  ) -> Tuple[float, Sequence[Sequence[int]], int, int]:
    """Branch and Merge algorithm for finding the optimal coalition structure.

    Uses a combination of merging and splitting strategies to explore possible
    coalition structures. The algorithm includes an upper-bound pruning strategy
    to optimize the search.

    Args:
      game: CoalitionalGame instance representing the problem space, containing
        the characteristic function of coalitions and the number of agents.

    Returns:
      Tuple:
          - float: The maximum value achieved by any coalition structure.
          - Sequence[Sequence[int]]: The optimal coalition structure that yields
            the maximum value.
          - int: The total number of nodes (coalition structures) explored
            during the search.
          - int: The total number of nodes pruned due to the upper-bound
    """
    max_value: float = -np.inf
    max_coal_structure: Sequence[Sequence[int]] = []
    pruned_nodes: int = 0
    nodes_explored: int = 0

    # Initial structure based on the mode
    initial_structure: Sequence[Sequence[int]] = (
        self.generate_initial_structure(game.num_agents)
    )
    initial_value: float = sum([
        game.characteristic_function(coalition)
        if isinstance(coalition, tuple)
        else game.characteristic_function(tuple(coalition))
        for coalition in initial_structure
    ])

    # Initialize stack with the initial structure and its value
    stack = [(initial_structure, initial_value)]
    # Track the initial structure to avoid revisiting it
    self.visited_structures.add(self.normalize_structure(initial_structure))

    while stack:
      current_structure, current_value = stack.pop()
      nodes_explored += 1

      # Update the maximum coalition structure found so far
      if current_value > max_value:
        max_value = current_value
        max_coal_structure = current_structure

      # Check if the current structure is already the grand coalition
      if len(current_structure) == 1:
        print(
            f"Reached the grand coalition: {current_structure}, terminating"
            " search."
        )
        break

      # Decide to merge or split based on alpha value
      if random.random() < self.alpha:
        # Merging logic: choose between random merging or systematic merging
        if self.random_merging:
          # Perform random merging
          new_structure = self.merge_random_coalitions(
              current_structure, game.num_agents
          )
          all_merged_structures = [new_structure] if new_structure else []
        else:
          # Perform systematic merging of all pairs
          all_merged_structures = self.merge_all_coalitions(
              current_structure, game.num_agents
          )

        # Evaluate each new merged structure
        for new_structure in all_merged_structures:
          normalized_new_structure = self.normalize_structure(new_structure)

          # Skip structures that have already been visited
          if normalized_new_structure in self.visited_structures:
            continue

          # Add the new structure to the set of visited structures
          self.visited_structures.add(normalized_new_structure)
          # Calculate the value of the new structure
          cs_value = sum([
              game.characteristic_function(coalition)
              if isinstance(coalition, tuple)
              else game.characteristic_function(tuple(coalition))
              for coalition in new_structure
          ])
          # Calculate the upper bound for the new structure
          upper_bound = self.calculate_upper_bound(new_structure, game)

          # Pruning condition: add the structure to the stack
          # only if its upper bound exceeds the current max
          if upper_bound > max_value:
            stack.append((new_structure, cs_value))
          else:
            pruned_nodes += 1
      else:
        # Splitting logic: create a random partition from
        # the current structure
        new_structure = self.random_partition_from_structure(
            current_structure, game.num_agents
        )
        normalized_new_structure = self.normalize_structure(new_structure)

        # Skip structures that have already been visited
        if normalized_new_structure in self.visited_structures:
          continue

        # Add the new structure to the set of visited structures
        self.visited_structures.add(normalized_new_structure)
        # Calculate the value of the new structure
        cs_value = sum([
            game.characteristic_function(coalition)
            if isinstance(coalition, tuple)
            else game.characteristic_function(tuple(coalition))
            for coalition in new_structure
        ])
        # Calculate the upper bound for the new structure
        upper_bound = self.calculate_upper_bound(new_structure, game)

        # Pruning condition: add the structure to the stack
        # only if its upper bound exceeds the current max
        if upper_bound > max_value:
          stack.append((new_structure, cs_value))
        else:
          pruned_nodes += 1

    # Normalize and sort the final best structure found
    sorted_structure: Sequence[Sequence[int]] = sorted(
        [sorted(tup) for tup in max_coal_structure]
    )

    return max_value, sorted_structure, nodes_explored, pruned_nodes


In [None]:
# @title Genetic Algo
"""A Genetic Algorithm solver for coalitional structure generation.
"""

import collections
import itertools
import random
import sys
from typing import Any, Dict, List, Optional, Sequence, Tuple

from absl import logging  # pylint:disable=unused-import
import numpy as np
import sympy



class GeneticAlgorithm(solver.CoalitionalGameSolver):
  """A solver using a genetic algorithm for cs optimization."""

  def __init__(
      self,
      num_iterations: int = 100,
      seed: int | None = None,
      reporting: Any = None,
      population_size: int = 50,
      mutation_rate: float = 0.1,
      crossover_rate: float = 0.8,
  ):
    super().__init__(num_iterations, seed, reporting)

    self.population_size = population_size
    self.mutation_rate = mutation_rate
    self.crossover_rate = crossover_rate
    self.best_structure_counts: Dict[str, int] = collections.defaultdict(int)
    # Track metrics for each generation
    self.max_values_per_generation: List[float] = []
    self.avg_values_per_generation: List[float] = []
    self.population_diversity_per_generation: List[int] = []

  @staticmethod
  def coalition_structure_to_str(
      structure: Tuple[Tuple[int, ...], ...]
  ) -> str:  # **(New helper method)**
    """Convert a coalition structure (tuple of tuples) to a string."""
    return "_".join("-".join(map(str, coalition)) for coalition in structure)

  def evaluate_fitness(
      self, coalition_structure: List[List[int]],
      game: coalitional_game.CoalitionalGame
  ) -> float:
    """Evaluate the fitness (total value) of a coalition structure by summing its coalition values.

    Args:
      coalition_structure: A list of lists representing the coalition structure
        (list of coalitions).
      game: A CoalitionalGame instance used to evaluate the coalition structure.

    Returns:
      float: The total value (fitness) of the coalition structure based on the
      game characteristic function.
    """
    fitness = 0.0
    for coalition in coalition_structure:
      # sort in order to represent (1,2) and (2,1) as the same coalition
      sorted_coalition = tuple(sorted(coalition))
      fitness += game.characteristic_function(tuple(sorted_coalition))
    return fitness  # value of the CS (as the sum of the component coalitions)

  def crossover(
      self, parent1: List[List[int]], parent2: List[List[int]]
  ) -> Tuple[List[List[int]], List[List[int]]]:
    """Perform crossover (mixing) between two parent coalition structures to produce two offspring.

    Args:
      parent1: The first parent coalition structure.
      parent2: The second parent coalition structure.

    Returns:
      Tuple[List[List[int]], List[List[int]]]: Two offspring coalition
      structures.
    """

    # Flatten both parents CS (List of Lists)
    # into single lists of agents for crossover
    flat_parent1 = list(itertools.chain.from_iterable(parent1))
    flat_parent2 = list(itertools.chain.from_iterable(parent2))

    # Ensure all agents are unique
    flat_parent1 = list(set(flat_parent1))
    flat_parent2 = list(set(flat_parent2))

    # Random crossover point
    # (selects a random point in the flattened list
    # where the crossover (mixing) will occur)
    crossover_point = random.randint(1, len(flat_parent1) - 1)

    # Create offspring ensuring no duplicates (what to take from each list)
    offspring1 = flat_parent1[:crossover_point] + [
        agent
        for agent in flat_parent2
        if agent not in flat_parent1[:crossover_point]
    ]
    offspring2 = flat_parent2[:crossover_point] + [
        agent
        for agent in flat_parent1
        if agent not in flat_parent2[:crossover_point]
    ]

    # Recreate coalition structures from offspring single lists of agts
    offspring1 = self.random_split(offspring1)
    offspring2 = self.random_split(offspring2)

    return offspring1, offspring2

  def mutate(self, coalition_structure: List[List[int]]) -> None:
    """Mutate a coalition structure by randomly merging or splitting coalitions.

    Args:
      coalition_structure: The coalition structure to mutate.

    Returns:
      None
    """

    if random.random() < self.mutation_rate:
      if len(coalition_structure) > 1:    # Randomly merge two coalitions
        index1, index2 = random.sample(range(len(coalition_structure)), 2)
        coalition_structure[index1].extend(coalition_structure[index2])
        del coalition_structure[index2]
      else:
        coalition = coalition_structure[0]
        if len(coalition) > 1:
          # Split a single coalition into two
          split_point = random.randint(1, len(coalition) - 1)
          coalition_structure[0] = coalition[:split_point]
          coalition_structure.append(coalition[split_point:])

  def random_split(self, lst: List[int]) -> List[List[int]]:
    """Randomly splits a list of agents into non-empty sublists (coalitions) without duplicates.

    Args:
      lst: A list of agents to be split into coalitions.

    Returns:
      List[List[int]]: A list of sublists (coalitions).
    """
    split_coalitions: List[List[int]] = []
    remaining_agents = list(lst)  # Ensure it's a list for random.sample
    while remaining_agents:
      coalition_size = random.randint(1, len(remaining_agents))
      coalition = random.sample(remaining_agents, coalition_size)
      split_coalitions.append(coalition)
      for agent in coalition:
        remaining_agents.remove(agent)
    return split_coalitions

  def generate_initial_population(
      self, agents: List[int]
  ) -> List[List[List[int]]]:
    """Generate an initial population of coalition structures.

    The population size is the max of 50 and the Bell number B_n for n agents.

    Args:
      agents: A list of agents to be split into coalition structures.

    Returns:
      List[List[List[int]]]: A list of coalition structures
      (each structure is a list of coalitions).
    """

    # Use min of 50 and Bell number
    if self.population_size is None:
      self.population_size = min(50, sympy.bell(len(agents)))

    population: List[List[List[int]]] = []
    for _ in range(self.population_size):
      coalition_structure = self.random_split(agents)
      population.append(coalition_structure)
    return population

  def solve(
      self,
      characteristic_function: Any = None,
      game: Optional[coalitional_game.CoalitionalGame] = None,
      game_id: Optional[int] = None,
      game_name: Optional[str] = None,
      game_class: Optional[solver.GameClass] = None,
      game_representation: coalitional_game.Representation = coalitional_game.Representation.TABULAR_GAME,
      coalition_size_min: int = 0,
      coalition_size_max: int = sys.maxsize,
      task_cost: float = 0.0,
      seed: Optional[int] = None,
  ) -> Optional[
      tuple[
          Optional[Sequence[Sequence[int]]],
          Optional[Sequence[float] | np.ndarray],
      ]
  ]:
    """Run multiple iterations of coalition structure search.

    Args:
      characteristic_function: looks up or computes the value of any coalition
      game: an instance of a CoalitionalGame to be solved
      game_id: ID for a specific game instance
      game_name: name of the game to be solved, to be instantiated in function
      game_class: the complexity class of the game to be solved (e.g. convex)
      game_representation: the game representation (e.g. tabular, skill game)
      coalition_size_min: the minimum size of a coalition (soft constraint)
      coalition_size_max: the maximum size of a coalition (capacity constraint)
      task_cost: the cost (in terms of agent effort) of doing a task
      seed: seed for the genetic algorithm

    Returns:
      Tuple:
        - Tuple[Tuple[int, ...], ...]: The best coalition structure found.
        - float: The best value (fitness) of the best coalition structure.
        - List[float]: The best values found in each generation.
        - List[float]: The average fitness values for each generation.
        - List[int]: The population diversity for each generation.
        - Dict[Tuple[Tuple[int, ...], ...], int]: A dictionary mapping the best
          coalition structure found in each generation to its count.
    """

    del characteristic_function

    logging.info("Running Genetic Algorithm.")

    # Create a CoalitionalGame instance -- appropriate for game_representation
    game = gameplay_utils.create_game_instance(
        game_id=game_id,
        game_class=game_class,
        game_representation=game_representation,
        game_name=game_name,
        coalition_size_min=coalition_size_min,
        coalition_size_max=coalition_size_max,
        task_cost=task_cost,
    )

    if game:
      num_agents = game.num_agents
    else:
      num_agents = 5

    agents = list(range(1, num_agents + 1))
    population = self.generate_initial_population(agents)

    results = None
    for _ in range(self.num_iterations):
      # Evaluate fitness (value) of each coalition structure
      # - this is the value of the CS
      fitness_scores = [
          self.evaluate_fitness(individual, game) for individual in population
      ]
      max_value = max(fitness_scores)
      avg_value = sum(fitness_scores) / len(fitness_scores)
      diversity = len(
          set(
              tuple(sorted(tuple(sorted(c)) for c in individual))
              for individual in population
          )
      )

      # Track metrics for the current generation
      self.max_values_per_generation.append(max_value)
      self.avg_values_per_generation.append(avg_value)
      self.population_diversity_per_generation.append(diversity)

      # Find the best CS
      best_individual = population[fitness_scores.index(max_value)]

      # Track the best structure found in this generation
      normalized_best_structure = tuple(
          sorted(tuple(sorted(c)) for c in best_individual)
      )
      normalized_best_structure_str = self.coalition_structure_to_str(
          normalized_best_structure)
      self.best_structure_counts[normalized_best_structure_str] += 1

      # Select the top performers for breeding using weighted random selection
      selected_individuals = random.choices(
          population, weights=fitness_scores, k=self.population_size
      )

      # Create the next generation through crossover
      next_generation = []

      for i in range(0, len(selected_individuals), 2):
        parent1 = selected_individuals[i]
        parent2 = selected_individuals[(i + 1) % len(selected_individuals)]

        # Apply crossover based on the crossover rate
        if random.random() < self.crossover_rate:
          offspring1, offspring2 = self.crossover(parent1, parent2)
        else:
          # If no crossover, offspring are copies of the parents
          offspring1, offspring2 = parent1, parent2

        next_generation.extend([offspring1, offspring2])

      # Mutate the new generation
      for individual in next_generation:
        self.mutate(individual)

      # Replace the old population with the new generation
      population = next_generation

      # Find the max-value coal structure and update the counts
      max_coal_structure = max(
          self.best_structure_counts, key=self.best_structure_counts.get
      )

      max_value = max(self.max_values_per_generation)
      nodes_explored = sum(self.population_diversity_per_generation)
      pruned_nodes = nodes_explored - len(self.best_structure_counts)
      self.population_sizecoalition_counts = self.best_structure_counts
      self.coalition_counts = self.best_structure_counts

      logging.info(
          "Max coalition structure: %s with value: %s.",
          max_coal_structure,
          max_value,
      )

      # Send to XManager
      results = {
          solver.EvaluationKeys.ITERATION.value: _,
          solver.EvaluationKeys.SEED.value: self._seed,
          solver.EvaluationKeys.MAX_WELFARE.value: max_value,
          solver.EvaluationKeys.MAX_COALITION_STRUCTURE.value: (
              max_coal_structure
          ),
          solver.EvaluationKeys.NODES_EXPLORED.value: nodes_explored,
          solver.EvaluationKeys.PRUNED_NODES.value: pruned_nodes,
          solver.EvaluationKeys.COALITION_COUNTS_DICT.value: (
              self.coalition_counts
          ),
      }

    self._reporting.report(results)

    # Returns the standard solution components (CS, payoffs) if they exist
    if game and results:
      max_coal_structure_str = results["max_coal_structure"]
      max_coal_structure = str_to_coalition_structure(max_coal_structure_str)

      payoffs = [results["max_value"] for _ in range(game.num_agents)]
      return (max_coal_structure, payoffs)

    print(f"\n\n Max coal structure: {results['max_coal_structure']}\n\n")
    return None


def str_to_coalition_structure(
    s: str,
) -> Tuple[Tuple[int, ...], ...]:  # **(New helper function to convert back)**
  """Convert a string back to a coalition structure (tuple of tuples)."""
  return tuple(
      tuple(map(int, coalition.split("-"))) for coalition in s.split("_")
  )


In [None]:
import collections
import random
import numpy as np
import itertools
from typing import Any, Optional, Sequence, List, Tuple, Union, Dict
import matplotlib.pyplot as plt
from collections import defaultdict




# Plotting class to handle all plots
class Plotting:
    """Class to handle plotting different graphs for the Branch and Bound algorithm."""

    @staticmethod
    def plot_coalition_percentages(coalition_percentages: Dict[Tuple[Tuple[int, ...]], float]):
        """
        Plot the percentage of times each coalition structure is selected as a bar chart.

        Args:
            coalition_percentages: Dictionary of coalition structures and their percentages.
        """
        structures = list(coalition_percentages.keys())
        percentages = list(coalition_percentages.values())

        plt.figure(figsize=(10, 6))
        plt.barh(range(len(structures)), percentages, tick_label=[str(s) for s in structures])
        plt.xlabel('Percentage')
        plt.ylabel('Coalition Structures')
        plt.title('Coalition Structure Percentages')
        plt.tight_layout()
        plt.show()

    @staticmethod
    def plot_value_vs_iterations(max_values: List[float], iterations: int):
        """
        Plot the maximum coalition structure value found so far against the iteration number.

        Args:
            max_values: A list of the max values found at each iteration.
            iterations: Total number of iterations.
        """

        plt.figure(figsize=(10, 6))
        plt.plot(range(1, iterations + 1), max_values, marker='o', linestyle='-', color='b')
        plt.xlabel('Iteration')
        plt.ylabel('Max Coalition Value So Far')
        plt.title('Max Coalition Value Over Iterations')
        plt.grid(True)
        plt.tight_layout()
        plt.show()

    @staticmethod
    def plot_nodes_explored_vs_pruned(nodes_explored: List[int], nodes_pruned: List[int]):
        """
        Plot the number of nodes explored and pruned over time during the Branch and Bound algorithm.

        Args:
            nodes_explored: A list of the number of nodes explored at each iteration.
            nodes_pruned: A list of the number of nodes pruned at each iteration.
        """
        plt.figure(figsize=(10, 6))
        plt.scatter(range(len(nodes_explored)), nodes_explored, label="Nodes Explored", color='blue', marker='o')
        plt.scatter(range(len(nodes_pruned)), nodes_pruned, label="Nodes Pruned", color='red', marker='x')
        plt.xlabel("Iterations")
        plt.ylabel("Nodes")
        plt.title("Nodes Explored vs. Nodes Pruned")
        plt.legend(loc="upper left")
        plt.grid(True)
        plt.tight_layout()
        plt.show()



# Base CoalitionalGame class
class CoalitionalGame:
    """A base class for coalitional games, where coalitions are assigned values."""

    def __init__(self, num_agents: int, agents: Optional[List[int]] = None, values: Optional[Dict[Tuple[int], float]] = None):
        self.num_agents = num_agents
        self.agents = agents if agents is not None else list(range(1, num_agents + 1))
        self.values = values if values is not None else {}

    def characteristic_function(self, coalition: Tuple[int, ...]) -> float:
        """Returns the value of a coalition based on the characteristic function."""
        sorted_coalition = tuple(sorted(coalition))
        return self.values.get(sorted_coalition, 0.0)

# BranchAndBound class with tracking of coalition structures and nodes explored
class BranchAndBound:
    def __init__(
        self,
        num_iterations: int = 1,
        seed: Optional[int] = None,
        reporting: Any = None,
        random_branching: bool = True,  # Flag for randomness control
    ):
        self._num_iterations = num_iterations
        self._seed = seed
        self._reporting = None
        self.random_branching = random_branching
        self.coalition_counts = collections.defaultdict(int)
        self.max_values_per_iteration = []
        self.max_value_so_far = -np.inf
        self.nodes_explored_per_iteration = []
        self.nodes_pruned_per_iteration = []

        if seed is not None:
            random.seed(seed)
            np.random.seed(seed)


    def deterministic_partition_from_structure(self, coalition_structure: List[Tuple[int, ...]]) -> List[Tuple[int, ...]]:
        """Deterministically splits the largest coalition in the structure."""
        # Step 1: Select the largest coalition to split
        splittable_coalitions = [coalition for coalition in coalition_structure if len(coalition) > 1]

        # No coalitions to split, return original structure
        if not splittable_coalitions:
            return coalition_structure

        # Select the largest coalition to split deterministically
        selected_coalition = max(splittable_coalitions, key=len)

        # Split the selected coalition into two halves
        split_point = len(selected_coalition) // 2
        sub_coalition = selected_coalition[:split_point]
        remaining_coalition = selected_coalition[split_point:]

        # Form the new structure by replacing the selected coalition with the two sub-coalitions
        new_structure = [coalition if coalition != selected_coalition else sub_coalition for coalition in coalition_structure]
        new_structure.append(remaining_coalition)

        return new_structure


    def solve(
        self,
        game: CoalitionalGame,
    ) -> Optional[Tuple[Optional[Sequence[Sequence[int]]], Optional[Sequence[float] | np.ndarray]]]:
        """Run multiple iterations of coalition structure search.

        Args:
          characteristic_function: looks up or computes the value of any coalition
          game: an instance of a CoalitionalGame to be solved
        Returns:
          coalition_structure: sets of sets of agents (coalitions)
          payoffs: list of individual agent payoffs
        """

        #logging.info("Running Branch and Bound.")

        results = None
        for iteration in range(self._num_iterations):
            max_value, max_coal_structure, nodes_explored, pruned_nodes = self.run_optimal_cs_solver(game)
            normalized_structure = self.normalize_structure(max_coal_structure)
            self.coalition_counts[normalized_structure] += 1

            # Track the maximum value found so far and store it for plotting
            if max_value > self.max_value_so_far:
                self.max_value_so_far = max_value
            self.max_values_per_iteration.append(self.max_value_so_far)

            # Track the number of nodes explored and pruned nodes
            self.nodes_explored_per_iteration.append(nodes_explored)
            self.nodes_pruned_per_iteration.append(pruned_nodes)

            results = {
                "max_value": max_value,
                "max_coal_structure": max_coal_structure,
                "nodes_explored": nodes_explored,
                "pruned_nodes": pruned_nodes,
            }
           # self._reporting(results)

        return self.coalition_counts

    def members_to_binary_coal(self, members: Tuple[int, ...], game: CoalitionalGame) -> np.ndarray:
        coal = np.zeros(game.num_agents)
        indices = [agent - 1 for agent in members]
        coal[indices] = 1
        return coal

    def random_partition_from_structure(self, coalition_structure: List[Tuple[int, ...]]) -> List[Tuple[int, ...]]:

        """Randomly selects a coalition and splits it.

        Args:
          coalition_structure: (List[Tuple[int, ...]]), The current coalition
          structure.
        Returns:
          List[Tuple[int, ...]]: The new coalition structure with one coalition
            randomly selected and partitioned into two sub-coalitions.
        """

        # Step 1: Randomly select a coalition to split
        splittable_coalitions = [coalition for coalition in coalition_structure if len(coalition) > 1]

        # No coalitions that can be split, so return the original structure
        if not splittable_coalitions:
            return coalition_structure

        # Select a random coalition from the splittable ones
        selected_coalition = random.choice(splittable_coalitions)

        # Step 2: Randomly partition the selected coalition into two
        # sub-coalitions
        # Randomly choose a size for the first sub-coalition
        partition_size = random.randint(1, len(selected_coalition) - 1)

        # Get the two sub-coalitions
        sub_coalition = tuple(random.sample(selected_coalition, partition_size))
        remaining_coalition = tuple(sorted(set(selected_coalition) - set(sub_coalition)))


        # Step 3: Form the new CS by replacing the selected coalition
        #         with the two new sub-coalitions
        new_structure = [coalition if coalition != selected_coalition else sub_coalition for coalition in coalition_structure]
        new_structure.append(remaining_coalition)

        return new_structure

    def calculate_upper_bound(self, coalition: Union[Tuple[int, ...], List[int]], game: CoalitionalGame) -> float:
        """Calculate the upper bound for the maximum value of a given coalition.

        Args:
          coalition (Tuple[int, ...]): Tuple representing a coal of agents
          game: game.CoalitionalGame instance
        Returns:
          float: The upper bound for the max possible value of the coal structure.
        """

        # Add the value of all possible sub-coalitions to calculate the upper bound
        upper_bound = 0
        for i in range(1, len(coalition) + 1):
            for sub_coalition in itertools.combinations(coalition, i):
                sub_value = game.characteristic_function(sub_coalition)
                upper_bound += sub_value
        return upper_bound


    ##############################################################################
    # Optimizer with Grand Coalition Initialization and Randomized Branching
    ##############################################################################
    def run_optimal_cs_solver(self, game: CoalitionalGame) -> Tuple[float, Sequence[Sequence[int]], int, int]:

        """Branch and Bound algorithm for finding the optimal coalition structure.

        with an adjusted pruning strategy.
        Args:
          game: game.CoalitionalGame instance

        Returns:
          Tuple:
              - float: The best value (maximum sum of coalition values)
              - List[Tuple[int, ...]]: The best coalition structure.
              - int: The total number of nodes explored.
              - int: The total number of pruned nodes.
              (sub-optimal structures removed from consideration).
        """

        # Metrics for analysis
        max_value = -np.inf  # Initialize best value
        max_coal_structure = []  # Placeholder for best coalition structure
        pruned_nodes = 0  # Count the number of pruned nodes
        nodes_explored = 0  # Count the total number of explored nodes

        # Always start from the grand coalition
        initial_agents = list(range(1, game.num_agents + 1))
        initial_structure = [tuple(initial_agents)]
        initial_value = game.characteristic_function(tuple(initial_agents)) # Grand coalition's value


        # Stack stores (structure, value) pairs for exploration
        stack = [(initial_structure, initial_value)]

        # Branch: Explore random coalition structures
        while stack:
            current_structure, current_value = stack.pop()
            current_value = float(current_value)
            nodes_explored += 1

            # Check if current value is better than the best value found so far
            if current_value > max_value:
                max_value = current_value  # Update best value
                max_coal_structure = current_structure # Update best structure


             # Choose partitioning strategy based on random_branching flag
            if self.random_branching:
                new_structure = self.random_partition_from_structure(current_structure)
            else:
                new_structure = self.deterministic_partition_from_structure(current_structure)


            # Randomly partition the current structure into sub-coalitions
            #new_structure = self.random_partition_from_structure(current_structure)

            # Calculate the value of the new coalition structure
            cs_value = sum([game.characteristic_function(coalition) for coalition in new_structure])

            # Bounding: Upper bound on remaining coalition
            upper_bound = cs_value + self.calculate_upper_bound(new_structure[-1], game)

            # Pruning: Don't prune too aggressively if coalitions are large
            large_coalition_tolerance = 0.1 * len(new_structure[-1])

            # If the upper bound is promising, add the new structure to the stack
            if upper_bound > max_value - large_coalition_tolerance:
                stack.append((new_structure, cs_value))
            else:
                pruned_nodes += 1

        # Sort the structure by length for readability
        sorted_structure = sorted([tuple(sorted(tup)) for tup in max_coal_structure], key=lambda x: (len(x), x))

        return (
        max_value,  # Return the best value found
        sorted_structure,  # Return the best coalition structure
        nodes_explored,  # Return the total number of explored nodes
        pruned_nodes,  # Return the total number of pruned nodes
    )


    @staticmethod
    def normalize_structure(structure: Sequence[Sequence[int]]) -> Tuple[Tuple[int]]:
        """
        Normalize a coalition structure by sorting elements within each coalition
        and then sorting the coalitions within the structure.
        """
        return tuple(sorted(tuple(sorted(coalition)) for coalition in structure))

    def run_multiple_iterations(self, game: CoalitionalGame, iterations=1000):
        """
        Run the Branch and Bound algorithm for multiple iterations and track the
        percentage of times each coalition structure is selected.

        Args:
            game: The CoalitionalGame instance.
            iterations: Number of iterations to run.

        Returns:
            A dictionary with coalition structures as keys and their percentage of selection.
        """
        # Ensure max_values_per_iteration is reset for each game run
        self.max_values_per_iteration = []
        self.nodes_explored_per_iteration = []  # Reset nodes explored tracking
        self.nodes_pruned_per_iteration = []  # Reset nodes pruned tracking
        self.max_value_so_far = -np.inf  # Reset the max value

        for _ in range(iterations):
            self.solve(game)

        # Calculate percentages based on the total occurrences across all iterations
        total_iterations = sum(self.coalition_counts.values())
        coalition_percentages = {structure: (count / total_iterations) * 100 for structure, count in self.coalition_counts.items()}

        return coalition_percentages


# Running the Branch and Bound algorithm for all characteristic functions
if __name__ == "__main__":
    # List of games to run
    games = [
        ("ConvexGame1", ConvexGame1()),
        ("SuperadditiveGame1", SuperadditiveGame1()),
        ("MonotoneGame1", MonotoneGame1()),
         ("MonotoneGame2", MonotoneGame2()),
        ("NonMonotoneGame1", NonMonotoneGame1())
    ]

    # Create an instance of the plotting class
    plotting = Plotting()

    # Run Branch and Bound for each game and plot the results
    for game_name, game_instance in games:
        print(f"\nRunning Branch and Bound for {game_name}...\n")

        # Instantiate the Branch and Bound algorithm
        bb_solver = BranchAndBound(num_iterations=1, seed=42,random_branching=False)

        # Run the solver with 1000 iterations
        iterations = 100
        coalition_percentages = bb_solver.run_multiple_iterations(game=game_instance, iterations=iterations)

        print(f"Coalition Structure Percentages: {coalition_percentages}\n")

        # Plot 1: Coalition Structure Percentages
        #plotting.plot_coalition_percentages(coalition_percentages)

        # Plot 2: Value vs Iterations
        #plotting.plot_value_vs_iterations(bb_solver.max_values_per_iteration, iterations)

        print(f"Max CS Value: {bb_solver.max_values_per_iteration}\n")

        # Plot 3: Nodes Explored vs Nodes Pruned
        #plotting.plot_nodes_explored_vs_pruned(bb_solver.nodes_explored_per_iteration, bb_solver.nodes_pruned_per_iteration)


### **MCTS**

Key Components of MCTS for Coalition Structure Generation:


**1.Tree Structure:**

* **Nodes:** Each node in the tree represents a partial coalition structure (a subset of the full set of agents).
* **Edges:** Each edge represents the addition of a new agent to an existing coalition structure, forming a new coalition or extending an existing one.


**Iterations:**

The MCTS runs for a specified number of iterations, with each iteration involving selection, expansion, simulation, and backpropagation.

**2.Stages of MCTS:**

* **Selection:** Starting from the root node, select child nodes recursively using a selection policy (often UCB1, Upper Confidence Bound) until reaching a leaf node or a node that requires expansion.

* **Expansion:** Expand the selected node by adding one or more new nodes (representing new coalition structures).

* **Simulation (Rollout):** Simulate the outcome of completing the coalition structure (typically by randomly forming the remaining coalitions) and evaluate its value.

* **Backpropagation:** Update the values of nodes along the path from the leaf node back to the root based on the outcome of the simulation.

**3.Termination:**

The MCTS algorithm continues for a specified number of iterations, or until a certain computational budget (time or simulations) is exhausted.

**4. Output:** The node with the highest visit count after all iterations is considered the best node, and its coalition structure is returned as the result.


$\text{}$

**EXPLORATION/EXPLOITATION**
* Adjusting the iterations parameter to control how many simulations MCTS will run, which in turn controls how thoroughly it explores the search space.

* The 'exploration_weight' in the 'best_child' function can be tuned to balance exploration and exploitation.

$\text{}$

In [None]:
import random
import math
import matplotlib.pyplot as plt
from collections import defaultdict
from typing import List, Tuple, Dict, Optional, Any

# Base Coalitional Game class
class CoalitionalGame:
    def __init__(self, num_agents: int, values: Dict[Tuple[int, ...], float]) -> None:
        """
        Initializes a coalitional game.

        Args:
            num_agents: Number of agents involved in the game.
            values: Dictionary mapping coalition tuples to their values.
        """
        self.num_agents = num_agents
        self.values = values

    def characteristic_function(self, coalition: Tuple[int, ...]) -> float:
        """
        Returns the value of a given coalition.

        Args:
            coalition: A tuple representing a coalition of agents.

        Returns:
            float: The value of the coalition according to the game's characteristic function.
        """
        sorted_coalition = tuple(sorted(coalition))
        return self.values.get(sorted_coalition, 0.0)


class MCTSNode:
    def __init__(self, agents: List[int], coalition_structure: List[List[int]], parent: Optional['MCTSNode'] = None) -> None:
        """
        Initializes a node in the MCTS tree.

        Args:
            agents: List of agents involved in the coalitional game.
            coalition_structure: List of coalitions (each coalition is a list of agents).
            parent: The parent node (if any).
        """
        self.agents = agents
        self.coalition_structure = coalition_structure
        self.parent = parent
        self.children: List[MCTSNode] = []
        self.visits = 0
        self.value = 0.0

    def is_fully_expanded(self) -> bool:
        """
        Checks if all agents are already allocated to coalitions.

        Returns:
            bool: True if all agents are allocated, otherwise False.
        """
        return len(self.coalition_structure) == len(self.agents)

    def best_child(self, exploration_weight: float = 1.0) -> 'MCTSNode':
        """
        Returns the best child of the current node, using the UCB1 formula.
        Calculates the UCB score for each child and selects the one with the highest score.

        Args:
            exploration_weight: The weight for exploration in the UCB1 formula.

        Returns:
            MCTSNode: The best child node.
        """
        choices_weights = [
            (child.value / child.visits) + exploration_weight * math.sqrt(2 * math.log(self.visits) / child.visits)
            for child in self.children
        ]
        return self.children[choices_weights.index(max(choices_weights))]

    def expand(self) -> 'MCTSNode':
        """
        Expands the node by randomly adding one of the unallocated agents to a new coalition.

        Returns:
            MCTSNode: The newly expanded child node.
        """
        unallocated_agents = list(set(self.agents) - set(agent for coalition in self.coalition_structure for agent in coalition))
        if unallocated_agents:
            new_agent = random.choice(unallocated_agents)
            new_coalition_structure = self.coalition_structure + [[new_agent]]
            child_node = MCTSNode(self.agents, new_coalition_structure, parent=self)
            self.children.append(child_node)
            return child_node
        return self

    def simulate(self, values: Dict[Tuple[int, ...], float], grand_coalition_prob: float = 0.0) -> Tuple[List[List[int]], float]:
        """
        Simulates a random completion of the coalition structure by
        randomly assigning unallocated agents into new coalitions.

        Args:
            values: The characteristic function mapping coalitions to their values.
            grand_coalition_prob: Probability of enforcing the grand coalition.

        Returns:
            Tuple[List[List[int]], float]: The completed coalition structure and its fitness value.
        """
        unallocated_agents = list(set(self.agents) - set(agent for coalition in self.coalition_structure for agent in coalition))
        simulated_structure = self.coalition_structure[:]

        # Enforce the grand coalition based on the probability
        if random.random() < grand_coalition_prob:
            simulated_structure = [self.agents]
        else:
            while unallocated_agents:
                random_coalition_size = random.randint(1, len(unallocated_agents))
                new_coalition = random.sample(unallocated_agents, random_coalition_size)
                simulated_structure.append(new_coalition)
                unallocated_agents = list(set(unallocated_agents) - set(new_coalition))
        fitness = MCTS.evaluate_fitness(simulated_structure, values)
        return simulated_structure, fitness

    def backpropagate(self, result: float) -> None:
        """
        Updates the node's estimates for value and visit count, and propagates the result back up the tree.

        Args:
            result: The fitness value to propagate up the tree.

        Returns:
            None
        """
        self.visits += 1
        self.value += result
        if self.parent:
            self.parent.backpropagate(result)


class MCTS:
    def __init__(self, agents: List[int], values: Dict[Tuple[int, ...], float], iterations: int = 1000, grand_coalition_prob: float = 0.0) -> None:
        """
        Initializes the MCTS object with the required agents and characteristic function.

        Args:
            agents: List of agents.
            values: Characteristic function mapping coalitions to values.
            iterations: Number of MCTS iterations to run.
            grand_coalition_prob: The probability of enforcing the grand coalition during simulation.
        """
        self.agents = agents
        self.values = values
        self.iterations = iterations
        self.grand_coalition_prob = grand_coalition_prob
        self.best_structure_counts: Dict[Tuple[Tuple[int, ...], ...], int] = defaultdict(int)
        self.max_values_per_iteration: List[float] = []

    @staticmethod
    def evaluate_fitness(coalition_structure: List[List[int]], values: Dict[Tuple[int, ...], float]) -> float:
        """
        Evaluates the fitness (value) of a coalition structure by summing the values of the individual coalitions.

        Args:
            coalition_structure: List of coalitions (each coalition is a list of agents).
            values: Characteristic function mapping coalitions to their values.

        Returns:
            float: Total value of the coalition structure.
        """
        fitness = 0
        for coalition in coalition_structure:
            sorted_coalition = tuple(sorted(coalition))
            fitness += values.get(sorted_coalition, 0.0)
        return fitness

    def solve(self, game: CoalitionalGame) -> Tuple[List[List[int]], float]:
        """
        Solves the coalition structure problem using Monte Carlo Tree Search.

        Args:
            game: The CoalitionalGame instance (not used here, but for consistency).

        Returns:
            Tuple[List[List[int]], float]: The best coalition structure and its corresponding value.
        """
        root = MCTSNode(self.agents, [])
        return self.monte_carlo_tree_search(root)

    def monte_carlo_tree_search(self, root: MCTSNode, plot: bool = False) -> Tuple[List[List[int]], float]:
        """
        Performs the MCTS algorithm and returns the best node found after a number of iterations.

        Args:
            root: The root node of the MCTS tree.
            plot: Whether to plot the results.

        Returns:
            Tuple[List[List[int]], float]: The best coalition structure and its corresponding value.
        """
        max_value = float('-inf')
        max_coal_structure = None
        for iteration in range(self.iterations):
            node = root
            while node.is_fully_expanded() and node.children:
                node = node.best_child()
            if not node.is_fully_expanded():
                node = node.expand()
            simulated_structure, fitness = node.simulate(self.values, grand_coalition_prob=self.grand_coalition_prob)
            node.backpropagate(fitness)
            if fitness > max_value:
                max_value = fitness
                max_coal_structure = simulated_structure
            self.best_structure_counts[tuple(sorted(tuple(sorted(c)) for c in simulated_structure))] += 1
            self.max_values_per_iteration.append(max_value)
        if plot:
            self.plot_metrics()
        return max_coal_structure, max_value

    def plot_metrics(self) -> None:
        """
        Plot the best value found across iterations.

        Returns:
            None
        """
        plt.figure(figsize=(10, 6))
        plt.plot(range(self.iterations), self.max_values_per_iteration)
        plt.xlabel('Iterations')
        plt.ylabel('Best Value Found')
        plt.title('Best Value vs. Iterations')
        plt.grid(True)
        plt.tight_layout()
        plt.show()

    def calculate_coalition_structure_percentages(self) -> Dict[Tuple[Tuple[int, ...], ...], float]:
        """
        Calculate the percentage of times each coalition structure was the best.

        Returns:
            Dict[Tuple[Tuple[int, ...], ...], float]: A dictionary mapping coalition structures to their percentage of selection.
        """
        total_iterations = self.iterations
        coalition_percentages = {structure: count / total_iterations for structure, count in self.best_structure_counts.items()}
        total_percentage = sum(coalition_percentages.values())
        assert abs(total_percentage - 1.0) < 1e-6, f"Total percentages do not sum to 1, got {total_percentage}"
        return coalition_percentages

    def plot_coalition_structure_percentages(self, coalition_percentages: Dict[Tuple[Tuple[int, ...], ...], float]) -> None:
        """
        Plot the coalition structure percentages as a bar chart.

        Args:
            coalition_percentages: A dictionary mapping coalition structures to their percentages.

        Returns:
            None
        """
        structures = list(coalition_percentages.keys())
        percentages = [p * 100 for p in coalition_percentages.values()]
        plt.figure(figsize=(10, 6))
        plt.barh(range(len(structures)), percentages, tick_label=[str(s) for s in structures])
        plt.xlabel('Percentage (%)')
        plt.ylabel('Coalition Structures')
        plt.title('Coalition Structure Percentages')
        plt.tight_layout()
        plt.show()




# Function to run MCTS algorithm for multiple games
def run_many_games_mcts(selected_games: Optional[List[int]] = None, iterations: int = 1000, grand_coalition_prob: float = 0.0) -> None:
    """
    Run the MCTS algorithm for multiple games.

    Args:
        selected_games: List of indices of games to run MCTS on.
        iterations: Number of MCTS iterations to run.
        grand_coalition_prob: Probability of enforcing the grand coalition during simulation.

    Returns:
        None
    """
    obj = {
        0: ("Convex Game", ConvexGame1),
        1: ("Superadditive Game", SuperadditiveGame1),
        2: ("Monotone Game", MonotoneGame1),
        3: ("Monotone Game2", MonotoneGame2),
        4: ("Non-Monotone Game", NonMonotoneGame1)
    }
    if selected_games is None:
        selected_games = list(obj.keys())
    for i in selected_games:
        if i in obj:
            game_name, game_class = obj[i]
            game_instance = game_class()
            agents = list(range(1, game_instance.num_agents + 1))
            run_mcts_algorithm(game_name, agents, game_instance.values, iterations, grand_coalition_prob)
        else:
            print(f"Invalid game index: {i}. Skipping.")


# Function to run the MCTS algorithm for a single game
def run_mcts_algorithm(game_name: str, agents: List[int], values: Dict[Tuple[int, ...], float], iterations: int = 1000, grand_coalition_prob: float = 0.0) -> None:
    """
    Run the MCTS algorithm for a specific game and plot the results.

    Args:
        game_name: The name of the game.
        agents: List of agents involved in the game.
        values: Characteristic function mapping coalitions to their values.
        iterations: Number of MCTS iterations to run.
        grand_coalition_prob: Probability of enforcing the grand coalition during simulation.

    Returns:
        None
    """
    print(f"Running MCTS for {game_name}...\n")

    # Initialize the MCTS instance
    mcts = MCTS(agents, values, iterations, grand_coalition_prob)

    # Run the MCTS algorithm
    max_coal_structure, max_value = mcts.solve(None)

    # Output the results
    print(f"Best Coalition Structure: {max_coal_structure}")
    print(f"Best Value: {max_value}")

    # Plot the best value found over iterations
    #mcts.plot_metrics()

    # Calculate and plot coalition structure percentages
    coalition_percentages = mcts.calculate_coalition_structure_percentages()
    #mcts.plot_coalition_structure_percentages(coalition_percentages)
    print(f"Coalition Structure Percentages: {coalition_percentages}")

    print(f"Max CS Value: {mcts.max_values_per_iteration}\n")


# Example: Call to run MCTS on all games with 500 iterations and 30% chance of enforcing the grand coalition
run_many_games_mcts([0, 1, 2, 3, 4], iterations=500, grand_coalition_prob=0.3)
