<a href="https://colab.research.google.com/github/lcipolina/open_spiel__arena/blob/main/LLM_OpenSpiel.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Install system dependencies (compiler toolchain, git and Python headers/pip)
# via apt. This cell uses notebook shell escapes (`!`) and the `%cd` magic, so
# it only runs inside an IPython/Colab kernel.
!apt-get install -y cmake g++ git python3-dev python3-pip
# Clone OpenSpiel repository into ./open_spiel (relative to the current
# working directory). Re-running this fails if that directory already exists.
!git clone https://github.com/deepmind/open_spiel.git
# Navigate to the cloned directory. `%cd` changes the kernel's working
# directory persistently, so later cells also run from inside the checkout.
%cd open_spiel
# Install OpenSpiel from the checkout in the current directory
!pip install .


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
g++ is already the newest version (4:11.2.0-1ubuntu1).
g++ set to manually installed.
cmake is already the newest version (3.22.1-1ubuntu1.22.04.2).
git is already the newest version (1:2.34.1-1ubuntu1.11).
python3-dev is already the newest version (3.10.6-1~22.04.1).
python3-dev set to manually installed.
The following additional packages will be installed:
  python3-setuptools python3-wheel
Suggested packages:
  python-setuptools-doc
The following NEW packages will be installed:
  python3-pip python3-setuptools python3-wheel
0 upgraded, 3 newly installed, 0 to remove and 49 not upgraded.
Need to get 1,677 kB of archives.
After this operation, 8,968 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy-updates/main amd64 python3-setuptools all 59.6.0-1.2ubuntu0.22.04.2 [340 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy-updates/universe amd64 python3-whee

In [None]:
# Install required libraries
# NOTE(review): these four commands repeat the install steps of the first
# cell. In the recorded output of this cell the clone aborts because
# 'open_spiel' already exists, and `%cd open_spiel` then moves the kernel into
# the nested /content/open_spiel/open_spiel directory, so `pip install .` is
# run from there instead of the repository root. Consider dropping these lines
# and keeping only the imports below — confirm against a fresh runtime.
!apt-get install -y cmake g++ git python3-dev python3-pip
!git clone https://github.com/deepmind/open_spiel.git
%cd open_spiel
!pip install .

from transformers import pipeline
from enum import Enum
import open_spiel.python.games.tic_tac_toe as tic_tac_toe
import pyspiel  # OpenSpiel core Python bindings

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
g++ is already the newest version (4:11.2.0-1ubuntu1).
cmake is already the newest version (3.22.1-1ubuntu1.22.04.2).
git is already the newest version (1:2.34.1-1ubuntu1.11).
python3-dev is already the newest version (3.10.6-1~22.04.1).
python3-pip is already the newest version (22.0.2+dfsg-1ubuntu0.5).
0 upgraded, 0 newly installed, 0 to remove and 49 not upgraded.
fatal: destination path 'open_spiel' already exists and is not an empty directory.
/content/open_spiel/open_spiel


In [None]:
from transformers import pipeline
from enum import Enum
import open_spiel.python.games.tic_tac_toe as tic_tac_toe
import pyspiel  # OpenSpiel core Python bindings
import random

class AvailableGames(Enum):
    """
    Enumerator for the games available in this script.

    Each member's value is the human-readable game name: it is printed in the
    selection menu and handed to the simulators as ``game_name``.
    """
    # Declaration order matters: the menu numbers games by enumeration order
    # and the user's numeric choice is mapped back through list(AvailableGames).
    TIC_TAC_TOE = "Tic-Tac-Toe"
    PRISONERS_DILEMMA = "Python Iterated Prisoner's Dilemma"
    ROCK_PAPER_SCISSORS = "Rock-Paper-Scissors"

class GameSimulator:
    """
    Common foundation for the LLM game simulators.

    Holds the simulation configuration (game, display name, models and the
    play-mode flags) and a small helper for advancing a state with its first
    legal action. The game loop itself is supplied by subclasses.
    """
    def __init__(self, game, game_name, llms, random_bot=False, play_against_itself=False):
        """
        Store the simulation configuration on the instance.

        Args:
            game: Game object the simulation is run on.
            game_name: Display name of the game.
            llms: Mapping of model name to the LLM used for that model.
            random_bot: Play-mode flag, stored as-is for subclasses to read.
            play_against_itself: Play-mode flag, stored as-is for subclasses
                to read.
        """
        self.game, self.game_name = game, game_name
        self.llms = llms
        self.random_bot, self.play_against_itself = random_bot, play_against_itself

    def simulate(self):
        """
        Run the game; abstract here.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError("Subclasses must implement the simulate method.")

    def _apply_default_action(self, state):
        """
        Advance ``state`` by applying the first entry of ``state.legal_actions()``.
        """
        first_legal = state.legal_actions()[0]
        state.apply_action(first_legal)

class TicTacToeSimulator(GameSimulator):
    """
    Simulator for Tic-Tac-Toe.

    Plays one turn-based game. Each seat is driven by one of:
      * a random bot (seat 1 only, when ``random_bot`` is set),
      * an LLM from ``self.llms`` (seat ``i`` uses the ``i``-th model; in
        ``play_against_itself`` mode the index wraps around the model list),
      * a fallback bot that always takes the first legal action, when no model
        is available for the seat.
    """
    def simulate(self):
        """
        Play one game until the state is terminal and report the scores.

        Returns:
            dict: Model name -> accumulated return. Seat ``i``'s return is
            credited to the ``i``-th model, except that the seat played by the
            random bot is never credited to a model.
        """
        state = self.game.new_initial_state()
        model_names = list(self.llms.keys())
        scores = {name: 0 for name in model_names}

        while not state.is_terminal():
            print(f"Current state of {self.game_name}:\n{state}")

            current_player = state.current_player()
            if current_player < 0:  # Invalid player state
                print(f"Skipping invalid player index: {current_player}")
                self._apply_default_action(state)
                continue

            legal_actions = state.legal_actions(current_player)

            if self.random_bot and current_player == 1:  # Player 1 is a random bot
                action = random.choice(legal_actions)
            elif self.play_against_itself or current_player < len(model_names):
                # Player is controlled by an LLM. The modulo only has an effect
                # in self-play mode; otherwise the index is already in range.
                model_name = model_names[current_player % len(model_names)]
                prompt = generate_prompt(self.game_name, str(state), legal_actions)
                action = llm_decide_move(self.llms[model_name], prompt, legal_actions)
            else:
                action = legal_actions[0]  # Simplified bot

            state.apply_action(action)

        for player, score in enumerate(state.returns()):
            # The random bot occupies seat 1; its result must not be booked
            # against the model that happens to share that index.
            if self.random_bot and player == 1:
                continue
            if player < len(model_names):
                scores[model_names[player]] += score

        print(f"Final state of {self.game_name}:\n{state}")
        print(f"Scores: {scores}")
        return scores

class PrisonersDilemmaSimulator(GameSimulator):
    """
    Simulator for Iterated Prisoner's Dilemma.

    Both players choose their action for a round before the joint action is
    applied. The number of rounds is capped by ``max_iterations``.
    """
    def __init__(self, game, game_name, llms, random_bot=False, play_against_itself=False, max_iterations=50):
        """
        Args:
            game: Game object the simulation is run on.
            game_name: Display name of the game, also used in prompts.
            llms: Mapping of model name to LLM callable.
            random_bot: If set, seat 1 picks uniformly among its legal actions.
            play_against_itself: If set, the model index for a seat wraps
                around the model list.
            max_iterations: Maximum number of joint moves to play.
        """
        super().__init__(game, game_name, llms, random_bot, play_against_itself)
        self.max_iterations = max_iterations

    def simulate(self):
        """
        Play rounds until the state is terminal or the iteration cap is hit.

        Returns:
            dict: Model name -> accumulated return. Seat ``i``'s return is
            credited to the ``i``-th model when such a model exists; the seat
            played by the random bot is never credited to a model.
        """
        state = self.game.new_initial_state()
        model_names = list(self.llms.keys())
        scores = {name: 0 for name in model_names}
        iteration = 0  # Number of joint moves applied so far

        while not state.is_terminal():
            print(f"Current state of {self.game_name}:\n{state}")

            # Stop if max iterations are reached
            if iteration >= self.max_iterations:
                print(f"Reached maximum iterations: {self.max_iterations}. Ending simulation.")
                break

            # Handle chance nodes
            if state.is_chance_node():
                # NOTE(review): the first legal chance outcome is always taken
                # instead of sampling one — presumably this forces the game to
                # continue so that only max_iterations ends it; confirm.
                print("Chance node encountered. Applying default action.")
                state.apply_action(state.legal_actions()[0])
                continue

            # Collect actions for both players simultaneously
            actions = []
            for player in range(2):  # Assuming a 2-player game
                legal_actions = state.legal_actions(player)
                if self.random_bot and player == 1:
                    action = random.choice(legal_actions)
                else:
                    if self.play_against_itself:
                        model_name = model_names[player % len(model_names)]
                    else:
                        model_name = model_names[player]
                    prompt = generate_prompt(self.game_name, str(state), legal_actions)
                    action = llm_decide_move(self.llms[model_name], prompt, legal_actions)
                actions.append(action)

            # Apply actions simultaneously
            state.apply_actions(actions)
            iteration += 1

        # Gather final scores. This is also reached via the iteration cap, in
        # which case the state is not terminal when returns() is read.
        for player, score in enumerate(state.returns()):
            # The random bot occupies seat 1; do not credit its result to the
            # model that happens to share that index.
            if self.random_bot and player == 1:
                continue
            # Guard the lookup: in self-play with a single model there is no
            # second entry to credit (same rule the Tic-Tac-Toe simulator uses).
            if player < len(model_names):
                scores[model_names[player]] += score

        print(f"Final state of {self.game_name}:\n{state}")
        print(f"Scores: {scores}")
        return scores

class RockPaperScissorsSimulator(GameSimulator):
    """
    Simulator for Rock-Paper-Scissors.

    Both players choose an action before the joint action is applied; this
    repeats until the state reports that it is terminal.
    """
    def simulate(self):
        """
        Play the game to a terminal state and report the scores.

        Returns:
            dict: Model name -> accumulated return. Seat ``i``'s return is
            credited to the ``i``-th model when such a model exists; the seat
            played by the random bot is never credited to a model.
        """
        state = self.game.new_initial_state()
        model_names = list(self.llms.keys())
        scores = {name: 0 for name in model_names}

        while not state.is_terminal():
            print(f"Current state of {self.game_name}:\n{state}")

            # Collect actions for both players simultaneously
            actions = []
            for player in range(2):  # Assuming a 2-player game
                legal_actions = state.legal_actions(player)
                if self.random_bot and player == 1:
                    action = random.choice(legal_actions)
                else:
                    if self.play_against_itself:
                        model_name = model_names[player % len(model_names)]
                    else:
                        model_name = model_names[player]
                    prompt = generate_prompt(self.game_name, str(state), legal_actions)
                    action = llm_decide_move(self.llms[model_name], prompt, legal_actions)
                actions.append(action)

            # Apply actions simultaneously
            state.apply_actions(actions)

        # Gather final scores
        for player, score in enumerate(state.returns()):
            # The random bot occupies seat 1; do not credit its result to the
            # model that happens to share that index.
            if self.random_bot and player == 1:
                continue
            # Guard the lookup: in self-play with a single model there is no
            # second entry to credit (same rule the Tic-Tac-Toe simulator uses).
            if player < len(model_names):
                scores[model_names[player]] += score

        print(f"Final state of {self.game_name}:\n{state}")
        print(f"Scores: {scores}")
        return scores

# Utility Functions
def generate_prompt(game_name, state, legal_actions):
    """
    Build the text prompt shown to an LLM for one decision.

    Args:
        game_name: Name of the game being played.
        state: Textual rendering of the current game state.
        legal_actions: Actions the model may choose from; rendered with its
            default string formatting.

    Returns:
        str: Newline-separated prompt listing the game, the state, the legal
        actions and the instruction to answer with an action number.
    """
    prompt_lines = [
        f"Game: {game_name}",
        "State:",
        f"{state}",
        f"Legal actions: {legal_actions}",
        "Choose the next action (provide the action number).",
    ]
    return "\n".join(prompt_lines)

def llm_decide_move(llm, prompt, legal_actions):
    """
    Ask an LLM for a move and map its reply onto a legal action.

    Args:
        llm: Callable invoked as ``llm(prompt, max_new_tokens=..., pad_token_id=...)``
            that returns a sequence whose first item has a ``"generated_text"``
            entry.
        prompt: Prompt text sent to the model.
        legal_actions: Non-empty sequence of integer actions to choose from.

    Returns:
        int: The first integer in the model's reply that is a legal action, or
        ``legal_actions[0]`` when the reply contains none.
    """
    # NOTE(review): pad_token_id is hard-coded to 50256 — presumably GPT-2's
    # end-of-text id; confirm it is appropriate for every model in use.
    response = llm(prompt, max_new_tokens=30, pad_token_id=50256)[0]["generated_text"]

    # The generated text may begin with an echo of the prompt. The prompt
    # itself lists the legal actions, so it must not be scanned for the answer.
    if response.startswith(prompt):
        response = response[len(prompt):]

    for word in response.split():
        # Tolerate punctuation attached to the number, e.g. "3." or "(4)".
        token = word.strip(".,;:!?()[]{}<>\"'`*")
        try:
            move = int(token)
        except ValueError:
            continue
        if move in legal_actions:
            return move
    return legal_actions[0]  # Fallback to the first action

def evaluate_performance(llms, selected_games):
    """
    Run the selected games and total the scores per model.

    Args:
        llms: Mapping of model name to LLM callable.
        selected_games: Iterable of ``AvailableGames`` members to play, in
            order. A game listed more than once is played more than once.

    Returns:
        dict: Model name -> sum of the scores reported by every simulation.

    Raises:
        KeyError: If ``selected_games`` contains an entry with no configured
            simulator.
    """
    # Per-game setup: (zero-argument game loader, simulator class, simulator
    # keyword arguments). Loaders are callables so that only the games that
    # were actually selected get constructed.
    game_mapping = {
        AvailableGames.TIC_TAC_TOE: (
            lambda: tic_tac_toe.TicTacToeGame(),
            TicTacToeSimulator,
            {"random_bot": True, "play_against_itself": False},
        ),
        AvailableGames.PRISONERS_DILEMMA: (
            lambda: pyspiel.load_game("python_iterated_prisoners_dilemma"),
            PrisonersDilemmaSimulator,
            {"random_bot": False, "play_against_itself": True, "max_iterations": 10},
        ),
        AvailableGames.ROCK_PAPER_SCISSORS: (
            lambda: pyspiel.load_game("matrix_rps"),
            RockPaperScissorsSimulator,
            {"random_bot": False, "play_against_itself": False},
        ),
    }

    overall_results = {name: 0 for name in llms.keys()}

    for selected_game in selected_games:
        game_name = selected_game.value
        load_game, simulator_class, simulator_options = game_mapping[selected_game]
        simulator = simulator_class(load_game(), game_name, llms, **simulator_options)
        print(f"\nStarting game: {game_name}")
        game_results = simulator.simulate()
        for model_name, score in game_results.items():
            overall_results[model_name] += score

    print("\nOverall Performance:")
    for model_name, total_score in overall_results.items():
        print(f"{model_name}: {total_score}")
    return overall_results

def parse_game_selection(raw_selection, games):
    """
    Turn a comma-separated list of 1-based menu numbers into game entries.

    Args:
        raw_selection: Text typed by the user, e.g. ``"1, 3"``. Empty items
            caused by stray commas are ignored.
        games: Sequence of selectable games in menu order.

    Returns:
        list: The entries of ``games`` that were chosen, in the order typed.

    Raises:
        ValueError: If an item is not an integer, lies outside
            ``1..len(games)``, or nothing was selected at all.
    """
    chosen = []
    for token in raw_selection.split(","):
        token = token.strip()
        if not token:
            continue
        try:
            number = int(token)
        except ValueError:
            raise ValueError(f"'{token}' is not a valid game number.") from None
        # Reject 0 and negatives explicitly: with plain indexing they would
        # silently wrap around to the end of the list.
        if not 1 <= number <= len(games):
            raise ValueError(
                f"Game number {number} is out of range; choose between 1 and {len(games)}."
            )
        chosen.append(games[number - 1])
    if not chosen:
        raise ValueError("No games were selected.")
    return chosen


# Main Execution
if __name__ == "__main__":
    # Load LLMs
    # NOTE(review): "google/flan-t5-small" is loaded with the "text-generation"
    # task like "gpt2"; flan-t5 is presumably an encoder-decoder model, which
    # that task may not accept — verify that this pipeline actually loads.
    llm_models = ["google/flan-t5-small", "gpt2"]
    llms = {name: pipeline("text-generation", model=name) for name in llm_models}

    # Display available games
    available_games = list(AvailableGames)
    print("\nAvailable Games:")
    for idx, game in enumerate(available_games, start=1):
        print(f"{idx}. {game.value}")

    # User selects games to play
    selected_indices = input("\nEnter the numbers of the games to play (comma-separated): ")
    selected_games = parse_game_selection(selected_indices, available_games)

    # Evaluate performance
    evaluate_performance(llms, selected_games)
