In [1]:
#@title ##### License { display-mode: "form" }
# Copyright 2019 DeepMind Technologies Ltd. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# OpenSpiel: RRPS Example

* This Colab gets you started with installing OpenSpiel and its dependencies.
* OpenSpiel is a framework for reinforcement learning in games.
* For a longer intro to OpenSpiel, see [the tutorial video](https://www.youtube.com/watch?v=8NCPqtPwlFQ), [documentation](https://openspiel.readthedocs.io/en/latest/), or [API reference](https://openspiel.readthedocs.io/en/latest/api_reference.html).
* This colab also includes examples of how to get started with the Roshambo/RRPS environment and bots. It is based on [roshambo_population_example.py](https://github.com/google-deepmind/open_spiel/blob/master/open_spiel/python/examples/roshambo_population_example.py)
* For more info on Roshambo, see the [RRPS benchmark paper](https://openreview.net/pdf?id=gQnJ7ODIAx).

## Install

Install OpenSpiel via pip:

Note that if you are not using a colab, then you would use
python3 -m pip install open_spiel
Additional information about installing OpenSpiel can be found at the links above.


In [2]:
!pip install --upgrade open_spiel



# Simple example of OpenSpiel API: uniform random trajectory on Tic-Tac-Toe

This example is not used for RRPS, but it shows how to load a game in OpenSpiel, how to access a game state, and implements a random strategy.

In [3]:
import numpy as np
import pyspiel

game = pyspiel.load_game("tic_tac_toe")
state = game.new_initial_state()

while not state.is_terminal():
  state.apply_action(np.random.choice(state.legal_actions()))
  print(str(state) + '\n')

...
...
x..

...
o..
x..

...
o..
xx.

.o.
o..
xx.

.o.
o.x
xx.

.o.
oox
xx.

xo.
oox
xx.

xoo
oox
xx.

xoo
oox
xxx



# Getting started with RRPS


In [4]:
# Imports
import numpy as np

from open_spiel.python import rl_agent
from open_spiel.python import rl_environment
import pyspiel


In [5]:
# Some helper classes and functions.
# DO NOT CHANGE.

class BotAgent(rl_agent.AbstractAgent):
  """Agent class that wraps a bot.

  Note, the environment must include the OpenSpiel state in its observations,
  which means it must have been created with use_full_state=True.

  This is a simple wrapper that lets the RPS bots be interpreted as agents under
  the RL API.
  """

  def __init__(self, num_actions, bot, name="bot_agent"):
    assert num_actions > 0
    self._bot = bot
    self._num_actions = num_actions

  def restart(self):
    self._bot.restart()

  def step(self, time_step, is_evaluation=False):
    # If it is the end of the episode, don't select an action.
    if time_step.last():
      return
    _, state = pyspiel.deserialize_game_and_state(
        time_step.observations["serialized_state"])
    action = self._bot.step(state)
    probs = np.zeros(self._num_actions)
    probs[action] = 1.0
    return rl_agent.StepOutput(action=action, probs=probs)


#  We will use this function to evaluate the agents. Do not change.

def eval_agents(env, agents, num_players, num_episodes, verbose=False):
  """Evaluate the agent.

  Runs a number of episodes and returns the average returns for each agent as
  a numpy array.

  Arguments:
    env: the RL environment,
    agents: a list of agents (size 2),
    num_players: number of players in the game (for RRPS, this is 2),
    num_episodes: number of evaluation episodes to run.
    verbose: whether to print updates after each episode.
  """
  sum_episode_rewards = np.zeros(num_players)
  for ep in range(num_episodes):
    for agent in agents:
      # Bots need to be restarted at the start of the episode.
      if hasattr(agent, "restart"):
        agent.restart()
    time_step = env.reset()
    episode_rewards = np.zeros(num_players)
    while not time_step.last():
      agents_output = [
          agent.step(time_step, is_evaluation=True) for agent in agents
      ]
      action_list = [agent_output.action for agent_output in agents_output]
      time_step = env.step(action_list)
      episode_rewards += time_step.rewards
    sum_episode_rewards += episode_rewards
    if verbose:
      print(f"Finished episode {ep}, "
            + f"avg returns: {sum_episode_rewards / (ep+1)}")

  return sum_episode_rewards / num_episodes


def print_roshambo_bot_names_and_ids(roshambo_bot_names):
  print("Roshambo bot population:")
  for i in range(len(roshambo_bot_names)):
    print(f"{i}: {roshambo_bot_names[i]}")

def create_roshambo_bot_agent(player_id, num_actions, bot_names, pop_id):
  name = bot_names[pop_id]
  # Creates an OpenSpiel bot with the default number of throws
  # (pyspiel.ROSHAMBO_NUM_THROWS). To create one for a different number of
  # throws per episode, add the number as the third argument here.
  bot = pyspiel.make_roshambo_bot(player_id, name)
  return BotAgent(num_actions, bot, name=name)


#The following functions are used to load the bots from the original RRPS competition.

In [6]:
# Some basic info and initialize the population

# print(pyspiel.ROSHAMBO_NUM_BOTS)    # 43 bots
# print(pyspiel.ROSHAMBO_NUM_THROWS)  # 1000 steps per episode

# The recall is how many of the most recent actions are presented to the RL
# agents as part of their observations. Note: this is just for the RL agents
# like DQN etc... every bot has access to the full history.
RECALL = 20

# The population of 43 bots. See the RRPS paper for high-level descriptions of
# what each bot does.

print("Loading bot population...")
pop_size = pyspiel.ROSHAMBO_NUM_BOTS
print(f"Population size: {pop_size}")
roshambo_bot_names = pyspiel.roshambo_bot_names()
roshambo_bot_names.sort()
print_roshambo_bot_names_and_ids(roshambo_bot_names)

bot_id = 0
roshambo_bot_ids = {}
for name in roshambo_bot_names:
  roshambo_bot_ids[name] = bot_id
  bot_id += 1


Loading bot population...
Population size: 43
Roshambo bot population:
0: actr_lag2_decay
1: adddriftbot2
2: addshiftbot3
3: antiflatbot
4: antirotnbot
5: biopic
6: boom
7: copybot
8: debruijn81
9: driftbot
10: flatbot3
11: foxtrotbot
12: freqbot2
13: granite
14: greenberg
15: halbot
16: inocencio
17: iocainebot
18: marble
19: markov5
20: markovbails
21: mixed_strategy
22: mod1bot
23: multibot
24: peterbot
25: phasenbott
26: pibot
27: piedra
28: predbot
29: r226bot
30: randbot
31: robertot
32: rockbot
33: rotatebot
34: russrocker4
35: shofar
36: sunCrazybot
37: sunNervebot
38: sweetrock
39: switchalot
40: switchbot
41: textbot
42: zq_move


#Example showing how to load to agents from the RRPS bot population and evalute them against each other.

In [7]:
# Example: create an RL environment, and two agents from the bot population and
# evaluate these two agents head-to-head.

# Note that the include_full_state variable has to be enabled because the
# BotAgent needs access to the full state.
env = rl_environment.Environment(
    "repeated_game(stage_game=matrix_rps(),num_repetitions=" +
    f"{pyspiel.ROSHAMBO_NUM_THROWS}," +
    f"recall={RECALL})",
    include_full_state=True)
num_players = 2
num_actions = env.action_spec()["num_actions"]
# Learning agents might need this:
# info_state_size = env.observation_spec()["info_state"][0]

# Create two bot agents
p0_pop_id = 0   # actr_lag2_decay
p1_pop_id = 1   # adddriftbot2
agents = [
    create_roshambo_bot_agent(0, num_actions, roshambo_bot_names, p0_pop_id),
    create_roshambo_bot_agent(1, num_actions, roshambo_bot_names, p1_pop_id)
]

print("Starting eval run.")
avg_eval_returns = eval_agents(env, agents, num_players, 10, verbose=True)

print("Avg return ", avg_eval_returns)

Starting eval run.
Finished episode 0, avg returns: [ 1. -1.]
Finished episode 1, avg returns: [ 5.5 -5.5]
Finished episode 2, avg returns: [ 4.66666667 -4.66666667]
Finished episode 3, avg returns: [ 13.25 -13.25]
Finished episode 4, avg returns: [ 14. -14.]
Finished episode 5, avg returns: [ 27.16666667 -27.16666667]
Finished episode 6, avg returns: [ 20.71428571 -20.71428571]
Finished episode 7, avg returns: [ 19.625 -19.625]
Finished episode 8, avg returns: [ 18.55555556 -18.55555556]
Finished episode 9, avg returns: [ 19.9 -19.9]
Avg return  [ 19.9 -19.9]


#Basic Template for an RL agent



In [None]:
import torch
import torch.nn.functional as F
import torch.nn as nn

class RPSLSTM(nn.Module):
    def __init__(self, vocab_size=3, emb_dim=8, hidden_size=64):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, emb_dim)
        self.lstm = nn.LSTM(emb_dim, hidden_size, num_layers=1, batch_first=True)
        self.fc = nn.Linear(hidden_size, 3)  # logits for R,P,S

    def forward(self, x, hidden=None):
        # x: (batch, T) of ints 0/1/2
        emb = self.embed(x)
        out, hidden = self.lstm(emb, hidden)  # out: (batch, T, hidden)
        last = out[:, -1, :]
        logits = self.fc(last)                 # (batch, T, 3)
        return logits, hidden
    

class MyAgent(rl_agent.AbstractAgent):
    """
    Greenberg-lite ensemble agent for repeated RPS:

    - Maintains several simple "expert" strategies (predictors + best-response).
    - Tracks a score for each expert based on how well it *would have* done.
    - At each step, picks the action of the best-scoring expert (with some
      epsilon exploration).
    """

    def __init__(self,
                 player_id,
                 num_actions=3,
                 score_alpha=0.5,      
                 epsilon_action=0.1, 
                 epsilon_expert=0.05, 
                 name="ensemble_agent",
                 lstm_model_path="rps_lstm.pt",
                 lstm_seq_len=200, 
                 gamma = 0.95,
                 mdp_interval = 50):
        super().__init__(player_id=player_id, name=name)
        assert num_actions == 3, "RRPS uses 3 actions (R,P,S)."
        self._player_id = player_id
        self._num_actions = num_actions

        self._score_alpha = score_alpha
        self._eps_action = epsilon_action
        self._eps_expert = epsilon_expert

        self._payoff = np.array([
            [ 0, -1,  1],  
            [ 1,  0, -1],  
            [-1,  1,  0],  
        ], dtype=float)

        # Number of experts we'll define
        self._num_experts = 7
        self._lstm_seq_len = lstm_seq_len
        self._device = torch.device("mps")
        self._lstm_model = RPSLSTM(vocab_size=4, emb_dim=16, hidden_size=64).to(self._device)
        state_dict = torch.load(lstm_model_path, map_location=self._device)
        self._lstm_model.load_state_dict(state_dict)
        self._lstm_model.eval()
        self._lstm_hidden = None

        self._mdp_num_states = self._num_actions*self._num_actions + 1
        self._mdp_start_state = self._mdp_num_states-1
        self._mdp_gamma = gamma
        self._mdp_plan_interval = mdp_interval

        self._mdp_N_sa = np.zeros((self._mdp_num_states, self._num_actions), dtype=np.float64)
        self._mdp_R_sa_sum = np.zeros((self._mdp_num_states, self._num_actions), dtype=np.float64)
        self._mdp_N_sas = np.zeros(
            (self._mdp_num_states, self._num_actions, self._mdp_num_states), dtype=np.float64
        )

        # Value function and policy
        self._mdp_V = np.zeros(self._mdp_num_states, dtype=np.float64)
        self._mdp_policy = -np.ones(self._mdp_num_states, dtype=int)

        self._mdp_prev_state = None
        self._mdp_prev_action = None
        self._mdp_steps_since_plan = 0

        self.restart()

    # ------------ Episode reset ------------

    def restart(self):
        # Opponent statistics
        self._opp_counts = np.ones(self._num_actions, dtype=float)  # smoothed counts
        self._trans_counts = np.ones((self._num_actions, self._num_actions),
                                     dtype=float)  # P(opp_t | opp_{t-1})

        self._last_opp_action = None
        self._last_my_action = None

        # Expert scores and last actions they recommended
        self._expert_scores = np.zeros(self._num_experts, dtype=float)
        self._last_expert_actions = [None] * self._num_experts

        # Track how much of history we've processed (if we use it later)
        self._last_history_len = 0

        self._opp_hist = []
        self._lstm_hidden = None

        self._mdp_prev_state = None
        self._mdp_prev_action = None
        self._mdp_steps_since_plan = 0

    # ------------ Utility helpers ------------

    def _beat(self, move):
        #Return the action that beats 'move'
        if move is None:
            return np.random.randint(self._num_actions)
        return (move + 1) % self._num_actions

    def _update_opp_stats(self, state):
        #Update opponent statistics based on game history.
        history = state.history()
        if len(history) == 0:
            return None

        action = history[-1]
        # Update counts
        self._opp_counts[action] += 1
        if self._last_opp_action is not None:
            self._trans_counts[self._last_opp_action, action] += 1
        self._last_opp_action = action
        self._opp_hist.append(action)
        return action
    
    def _lstm_predict_opp(self):
        if len(self._opp_hist) == 0:
            return None

        # take last seq_len moves (pad/truncate)
        seq = self._opp_hist[-self._lstm_seq_len:]
        seq_tensor = torch.tensor(seq, dtype=torch.long, device=self._device)
        seq_tensor = seq_tensor.unsqueeze(0)  # shape (1, T)

        with torch.no_grad():
            # assume model returns (logits, hidden)
            logits, self._lstm_hidden = self._lstm_model(seq_tensor, self._lstm_hidden)
            # use last time step logits
            # last_logits = logits[:, -1, :]      # shape (1, 3)
            last_logits = logits      # shape (1, 3)

            probs = F.softmax(last_logits, dim=-1).cpu().numpy()[0]

        pred = int(np.argmax(probs))
        return pred
    
    def _mdp_current_state(self):
        """Encode current MDP state from last joint action."""
        if self._last_my_action is None or self._last_opp_action is None:
            return self._mdp_start_state
        return self._last_my_action * self._num_actions + self._last_opp_action

    def _mdp_observe_transition(self, reward, curr_state):
        """Update empirical MDP using previous (s,a) and current state."""
        if self._mdp_prev_state is None or self._mdp_prev_action is None:
            return
        s = self._mdp_prev_state
        a = self._mdp_prev_action
        s_next = curr_state

        self._mdp_N_sa[s, a] += 1.0
        self._mdp_R_sa_sum[s, a] += reward
        self._mdp_N_sas[s, a, s_next] += 1.0

        self._mdp_steps_since_plan += 1
        if self._mdp_steps_since_plan >= self._mdp_plan_interval:
            self._mdp_plan_policy()
            self._mdp_steps_since_plan = 0

    def _mdp_plan_policy(self, max_iters=100, tol=1e-4):
        """Value iteration on empirical MDP; update V and π(s)."""
        V = self._mdp_V.copy()
        S = self._mdp_num_states
        A = self._num_actions
        gamma = self._mdp_gamma

        for _ in range(max_iters):
            delta = 0.0
            for s in range(S):
                best_val = None
                for a in range(A):
                    n_sa = self._mdp_N_sa[s, a]
                    if n_sa <= 0:
                        continue
                    r_hat = self._mdp_R_sa_sum[s, a] / n_sa
                    p_hat = self._mdp_N_sas[s, a] / n_sa  # shape (S,)
                    val = r_hat + gamma * np.dot(p_hat, V)
                    if (best_val is None) or (val > best_val):
                        best_val = val
                if best_val is None:
                    best_val = 0.0
                delta = max(delta, abs(best_val - V[s]))
                V[s] = best_val
            if delta < tol:
                break

        # Extract greedy policy
        policy = -np.ones(S, dtype=int)
        for s in range(S):
            best_val = None
            best_a = 0
            for a in range(A):
                n_sa = self._mdp_N_sa[s, a]
                if n_sa <= 0:
                    continue
                r_hat = self._mdp_R_sa_sum[s, a] / n_sa
                p_hat = self._mdp_N_sas[s, a] / n_sa
                val = r_hat + gamma * np.dot(p_hat, V)
                if (best_val is None) or (val > best_val):
                    best_val = val
                    best_a = a
            if best_val is not None:
                policy[s] = best_a

        self._mdp_V = V
        self._mdp_policy = policy

    # ------------ Expert strategies ------------

    def _expert_actions(self):
        #Compute each expert's recommended action for step.
        actions = []

        # Expert 0: Frequency-based best response
        # Predict opp's most frequent move overall.
        opp_probs = self._opp_counts / np.sum(self._opp_counts)
        pred0 = int(np.argmax(opp_probs))
        actions.append(self._beat(pred0))

        # Expert 1: Last-opponent-move best response
        # Predict they repeat their last move.
        actions.append(self._beat(self._last_opp_action))

        # Expert 2: Markov(1) best response
        # Predict based on last opp move -> next opp move.
        if self._last_opp_action is not None:
            row = self._trans_counts[self._last_opp_action]
            pred2 = int(np.argmax(row))
            actions.append(self._beat(pred2))
        else:
            # fallback to frequency BR
            actions.append(self._beat(pred0))

        # Expert 3: Mirror-me assumption
        # Predict opp plays what I played last time.
        actions.append(self._beat(self._last_my_action))

        # Expert 4: ML (LSTM)
        pred_lstm = self._lstm_predict_opp()
        if pred_lstm is not None:
            actions.append(self._beat(pred_lstm))
        else:
            # fallback: behave like frequency BR
            actions.append(self._beat(pred0))

        # Expert 5: random bullshit
        import random
        actions.append(random.randint(0, 2))

        # Expert 6: MDP expert (greedy from π(s))
        curr_state = self._mdp_current_state()
        if 0 <= curr_state < self._mdp_num_states and self._mdp_policy[curr_state] != -1:
            mdp_action = int(self._mdp_policy[curr_state])
        else:
            # fallback: same as freq-BR
            mdp_action = self._beat(pred0)
        actions.append(mdp_action)

        return actions

    # ------------ Expert scoring ------------

    def _update_expert_scores(self, last_opp_action):
        """
        Update each expert's score based on what happened in the *previous* round.

        We use the payoff matrix: if expert i had played its suggested action,
        what reward would it have gotten vs last_opp_action?
        """
        if last_opp_action is None:
            return  # nothing to update on the first move

        for i in range(self._num_experts):
            a_i = self._last_expert_actions[i]
            if a_i is None:
                continue
            payoff_i = self._payoff[a_i, last_opp_action]
            # Exponential moving average of payoff
            self._expert_scores[i] = (
                (1 - self._score_alpha) * self._expert_scores[i]
                + self._score_alpha * payoff_i
            )

    # ------------ Main step ------------

    def step(self, time_step, is_evaluation=False):
        # Terminal: return dummy
        if time_step.last():
            probs = np.ones(self._num_actions) / self._num_actions
            return rl_agent.StepOutput(action=0, probs=probs)

        # Deserialize game state
        game, state = pyspiel.deserialize_game_and_state(
            time_step.observations["serialized_state"])

        # 1) Update opponent stats (includes last_opp_action)
        last_opp_action = self._update_opp_stats(state)

        curr_mdp_state = self._mdp_current_state()
        reward = None
        if time_step.rewards is not None:
            reward = time_step.rewards[self._player_id]
        elif self._last_my_action is not None and last_opp_action is not None:
            reward = self._payoff[self._last_my_action, last_opp_action]
        if reward is not None:
            self._mdp_observe_transition(reward, curr_mdp_state)

        # 2) Update expert scores based on previous round outcome
        # We don't need time_step.rewards here; we recompute payoff from matrix.
        self._update_expert_scores(last_opp_action)

        # 3) Each expert proposes an action for THIS round
        expert_actions = self._expert_actions()
        self._last_expert_actions = list(expert_actions)  # store for next scoring

        # 4) Choose which expert to follow (epsilon-greedy over expert scores)
        if np.random.rand() < self._eps_expert:
            chosen_expert = np.random.randint(self._num_experts)
        else:
            chosen_expert = int(np.argmax(self._expert_scores))

        chosen_action = expert_actions[chosen_expert]

        # 5) Add action-level randomness for robustness
        if np.random.rand() < self._eps_action:
            action = np.random.randint(self._num_actions)
        else:
            action = chosen_action

        # Save my last action for next round's mirror expert
        self._last_my_action = action

        self._mdp_prev_state = curr_mdp_state
        self._mdp_prev_action = action

        # 6) Build probability distribution (mostly on chosen_action)
        probs = np.ones(self._num_actions) * (self._eps_action / self._num_actions)
        probs[chosen_action] += 1.0 - self._eps_action

        return rl_agent.StepOutput(action=action, probs=probs)

In [None]:
# # Just trying an example out.

my_agent = MyAgent(player_id=0, name="kate_agent", epsilon_action=0.1)
print(my_agent._num_actions)


# p1_pop_id = 17  # adddriftbot2
# agents = [
#     my_agent,
#     create_roshambo_bot_agent(1, num_actions, roshambo_bot_names, p1_pop_id)
# ]


# print("Starting eval run.")
# avg_eval_returns = eval_agents(env, agents, num_players, 10, verbose=True)

# print("Avg return ", avg_eval_returns)

In [20]:
import json
import csv

win_rates = {}   # dictionary bot_name -> average return

for bot_id, bot_name in enumerate(roshambo_bot_names):
    print(f"\nEvaluating against bot {bot_id}: {bot_name}")

    my_agent = MyAgent(player_id=0, name="my_agent", epsilon_action=0.8)
    agents = [
        my_agent,
        create_roshambo_bot_agent(
            1, num_actions, roshambo_bot_names, bot_id
        )
    ]

    avg_eval_returns = eval_agents(env, agents, num_players, 10, verbose=False)

    # avg_eval_returns is an array like [my_return, opponent_return]
    my_avg = float(avg_eval_returns[0])

    print(f"→ My agent vs {bot_name}: {my_avg:.4f}")

    win_rates[bot_name] = my_avg

# ------------------------------
# Save win rates to JSON
# ------------------------------
with open("win_rates.json", "w") as f:
    json.dump(win_rates, f, indent=2)
print("Saved win rates to win_rates.json")

# ------------------------------
# Save win rates to CSV
# ------------------------------
with open("win_rates.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["bot_name", "avg_return"])
    for bot_name, score in win_rates.items():
        writer.writerow([bot_name, score])

print("Saved win rates to win_rates.csv")


Evaluating against bot 0: actr_lag2_decay
→ My agent vs actr_lag2_decay: 6.4000

Evaluating against bot 1: adddriftbot2
→ My agent vs adddriftbot2: 8.4000

Evaluating against bot 2: addshiftbot3
→ My agent vs addshiftbot3: 36.2000

Evaluating against bot 3: antiflatbot
→ My agent vs antiflatbot: 190.5000

Evaluating against bot 4: antirotnbot
→ My agent vs antirotnbot: 34.7000

Evaluating against bot 5: biopic
→ My agent vs biopic: 21.5000

Evaluating against bot 6: boom
→ My agent vs boom: 13.1000

Evaluating against bot 7: copybot
→ My agent vs copybot: 182.0000

Evaluating against bot 8: debruijn81
→ My agent vs debruijn81: 29.7000

Evaluating against bot 9: driftbot
→ My agent vs driftbot: 11.3000

Evaluating against bot 10: flatbot3
→ My agent vs flatbot3: 26.4000

Evaluating against bot 11: foxtrotbot
→ My agent vs foxtrotbot: 17.3000

Evaluating against bot 12: freqbot2
→ My agent vs freqbot2: 157.9000

Evaluating against bot 13: granite
→ My agent vs granite: 31.4000

Evaluati

In [21]:
import pandas as pd

something = pd.read_csv("win_rates.csv")
(something['avg_return'] >= 0).sum()

np.int64(40)