In [1]:
import pyspiel
import numpy as np
from cheat import *

In [21]:
# Load the Cheat game by its registered name and display a fresh initial state.
# NOTE(review): "python_cheat" is presumably registered with pyspiel as a side
# effect of `from cheat import *` in the first cell -- confirm; this cell fails
# if that import cell has not been run.
game = pyspiel.load_game("python_cheat")
# Bare last expression: the notebook renders the state's repr (one hand per
# player, as the "P0: [...]" / "P1: [...]" lines in the cell output).
game.new_initial_state()

P0: [1,2,2,3,3,3,4,5,5,6,6,6]
P1: [1,1,1,2,2,3,4,4,4,5,5,6]

In [5]:
from open_spiel.python import rl_environment
from open_spiel.python.algorithms import tabular_qlearner

# Wrap the Cheat game in OpenSpiel's RL environment interface.
env = rl_environment.Environment("python_cheat")

# Sizes the learners are built from: one agent per seat, each choosing among
# the environment's full action set.
num_players = env.num_players
num_actions = env.action_spec()["num_actions"]

# One independent tabular Q-learner per player, indexed by player id.
agents = list(
    tabular_qlearner.QLearner(num_actions=num_actions, player_id=seat)
    for seat in range(num_players)
)



In [18]:
# Train every agent in `agents` by self-play for `total_episodes` games.
total_episodes = 500

for cur_episode in range(total_episodes):
    # Lightweight progress indicator, overwritten in place via "\r".
    if cur_episode % 10 == 0:
        print(f"Episodes: {cur_episode}/{total_episodes}", end="\r")

    time_step = env.reset()

    while not env.get_state.is_terminal() and not time_step.last():
        # Always read the state from the environment. The previous code used a
        # bare `state` name that this cell never defines, so it either raised
        # NameError (fresh kernel) or silently used a stale state left behind
        # by another cell.
        cur_state = env.get_state
        if cur_state.is_chance_node():
            # NOTE(review): rl_environment.Environment appears to resolve
            # chance nodes itself inside reset()/step(), so this branch may
            # never run -- confirm before relying on it.
            outcomes_with_probs = cur_state.chance_outcomes()
            action_list, prob_list = zip(*outcomes_with_probs)
            action = np.random.choice(action_list, p=prob_list)
            cur_state.apply_action(action)
        else:
            # Every player submits an action for this step; the agents also
            # learn from `time_step` inside step().
            actions = [
                agents[player].step(time_step).action
                for player in range(num_players)
            ]
            time_step = env.step(actions)

    # Show each agent the final time step of the episode as well.
    for agent in agents:
        agent.step(time_step)

print(f"Episodes: {total_episodes}/{total_episodes}", end="\r")
print("\nCompleted training")

Episodes: 500/500
Completed training


In [19]:
from open_spiel.python.algorithms import random_agent

# Evaluate the trained seat-0 agent against a RandomAgent in seat 1.
# Other pairings can be evaluated by changing this list, e.g. `agents` for
# self-play, or a RandomAgent in seat 0 followed by `agents[1]`.
eval_agents = [
    agents[0],
    random_agent.RandomAgent(1, env.game.num_distinct_actions(), "Random"),
]

n_games = 100
n_wins = 0
is_evaluation = True  # forwarded to every agent.step() call below

for cur_episode in range(n_games):
    time_step = env.reset()

    while not env.get_state.is_terminal() and not time_step.last():
        # Read the state from the environment; the previous code referenced a
        # `state` name that this cell never defines (NameError on a fresh
        # kernel, stale object otherwise).
        cur_state = env.get_state
        if cur_state.is_chance_node():
            # NOTE(review): rl_environment.Environment appears to resolve
            # chance nodes itself, so this branch may never run -- confirm.
            outcomes_with_probs = cur_state.chance_outcomes()
            action_list, prob_list = zip(*outcomes_with_probs)
            action = np.random.choice(action_list, p=prob_list)
            cur_state.apply_action(action)
        else:
            actions = [
                agent.step(time_step, is_evaluation=is_evaluation).action
                for agent in eval_agents
            ]
            time_step = env.step(actions)

    # Let every agent observe the final time step of the game.
    for agent in eval_agents:
        agent.step(time_step, is_evaluation=is_evaluation)

    # A game counts as a win for the evaluated agent when player 0 is the
    # lead player of the final state.
    if env.get_state.lead_player() == 0:
        n_wins += 1

    print(f"Episode {cur_episode + 1}/{n_games}, Won: {n_wins} ({100 * n_wins / (cur_episode + 1):.2f}%)", end="\r")
print("\nGames ended!")


Episode 100/100, Won: 74 (74.00%)
Games ended!


In [32]:
# Interactive: a human plays one game of Cheat against a trained agent.
#
# Commands accepted at the prompt:
#   a                               accuse
#   p                               pass
#   c <amount> <num> <c_1,...,c_n>  claim <amount> cards of number <num> while
#                                   actually playing the listed card numbers
from collections import Counter

first_player = True
player = 0 if first_player else 1
opponent = agents[1 - player]

is_evaluation = True  # forwarded to opponent.step()

# (accuse, pass, claim-prefix) wording for each side of the table.
_OPPONENT_PHRASES = ("Opponent accuses!", "Opponent passes.", "Opponent claims")
_PLAYER_PHRASES = ("You accuse!", "You pass.", "You claim")


def _select_cards(hand, played_counts, num_numbers, num_suites):
    """Pick concrete cards from `hand` for the requested card numbers.

    Args:
        hand: bitmask of the cards held; the card with number `n` in suite `s`
            is bit `num_numbers * s + (n - 1)`.
        played_counts: mapping of card number -> how many of it to play.
        num_numbers: count of distinct card numbers per suite.
        num_suites: count of suites.

    Returns:
        The bitmask of the selected cards, or None when the hand holds fewer
        cards of some number than requested. When the hand holds more than
        requested, the lowest-bit cards of that number are the ones kept back.
    """
    selection = 0
    for card_num, wanted in played_counts.items():
        number_mask = sum(
            1 << (num_numbers * suite + (card_num - 1))
            for suite in range(num_suites)
        )
        held = number_mask & hand
        surplus = int.bit_count(int(held)) - wanted
        if surplus < 0:
            return None

        # Shift out the `surplus` lowest set bits, then shift back so the
        # remaining (higher) bits return to their original positions.
        shifted = 0
        while surplus > 0:
            if held % 2 == 1:
                surplus -= 1
            held >>= 1
            shifted += 1
        selection += held << shifted
    return selection


def _describe_move(state, action, action_accuse, action_pass, phrases):
    """Render `action` as a message using the (accuse, pass, claim) phrases."""
    if action == action_accuse:
        return phrases[0]
    if action == action_pass:
        return phrases[1]
    _, claimed_count, claimed_num = state._decode_play(state._DECODE_ACTION[action])
    return f"{phrases[2]} {claimed_count} x {claimed_num}s."


time_step = env.reset()
transcript = []  # one line per move; printed by the final cell

while not env.get_state.is_terminal() and not time_step.last():
    # Bind the state before any use: the chance-node branch previously used
    # `state` before its first assignment.
    state = env.get_state

    if state.is_chance_node():
        # NOTE(review): rl_environment.Environment appears to resolve chance
        # nodes itself, so this branch may never run -- confirm.
        outcomes_with_probs = state.chance_outcomes()
        action_list, prob_list = zip(*outcomes_with_probs)
        action = np.random.choice(action_list, p=prob_list)
        state.apply_action(action)
        continue

    # actions[0] is the opponent's move, actions[1] the human's; the list is
    # put into seat order just before env.step().
    actions = [opponent.step(time_step, is_evaluation=is_evaluation).action]

    last_claim_action = state.last_claim_action
    if last_claim_action == -1:
        last_put = "n/a"
    else:
        _, last_count, last_num = state._decode_play(last_claim_action)
        last_put = f"{last_count} x {last_num}s"

    player_legal_actions = state.legal_actions(player)
    action_accuse = state._ENCODE_ACTION[Action.ACCUSE]
    action_pass = state._ENCODE_ACTION[Action.PASS]

    possible_actions = []
    if action_accuse in player_legal_actions:
        possible_actions.append("accuse (a)")
    if action_pass in player_legal_actions:
        possible_actions.append("pass (p)")
    # NOTE(review): assumes accuse/pass occupy action ids 0 and 1, so any id
    # above 1 is a claim -- confirm against the game's action encoding.
    if any(i > 1 for i in player_legal_actions):
        possible_actions.append("claim (c [amount] [num] [c_1],...,[c_amount])")

    msg = "\n".join(
        (
            f"Your deck: {state.hand_bits_to_nums(state.hands[player])}.",
            f"Pile has {int.bit_count(int(state.pile))} cards.",
            f"Last put on pile: {last_put}.",
            f"Actions: {', '.join(possible_actions)}.",
            "Next move: ",
        )
    )

    # Prompt until the human enters a well-formed, legal move.
    command = ""
    first_entry = True
    while True:
        if not first_entry:
            print(f"Invalid move: {command}, try again.")
        first_entry = False

        command = input(msg).strip()

        if not command:
            continue
        if command == "a":
            if action_accuse not in player_legal_actions:
                continue
            actions.append(action_accuse)
            break
        if command == "p":
            if action_pass not in player_legal_actions:
                continue
            actions.append(action_pass)
            break

        # Claim: "c <amount> <num> <c_1,...,c_amount>".
        args = command.split()
        # isdecimal (not isnumeric): int() rejects some "numeric" characters
        # such as superscript digits.
        if (
            len(args) != 4
            or args[0] != "c"
            or not args[1].isdecimal()
            or not args[2].isdecimal()
        ):
            continue
        claimed_count, claimed_num = int(args[1]), int(args[2])

        card_tokens = args[3].split(",")
        if (
            len(card_tokens) != claimed_count
            or not all(token.isdecimal() for token in card_tokens)
            or claimed_num not in state._legal_next_number()
            or claimed_count > state._NUM_NUMBERS
        ):
            continue

        cards_played = [int(token) for token in card_tokens]
        # Card numbers outside 1.._NUM_NUMBERS would produce a negative shift
        # (0) or a mask that aliases other cards (too large), so reject them.
        if not all(1 <= card <= state._NUM_NUMBERS for card in cards_played):
            continue

        truth = _select_cards(
            state.hands[player],
            Counter(cards_played),
            state._NUM_NUMBERS,
            state._NUM_SUITES,
        )
        if truth is None:
            # The hand does not hold the cards the human tried to play.
            continue

        # Encode with the *claimed* number. The previous code reused `num` as
        # a loop variable over the played cards, so a bluff was encoded with
        # the last played card number instead of the claim.
        action = state._ENCODE_ACTION[
            state._encode_play(truth, claimed_count, claimed_num)
        ]
        assert action in player_legal_actions, "encoded claim is not a legal action"
        actions.append(action)
        break

    opponent_msg = _describe_move(
        state, actions[0], action_accuse, action_pass, _OPPONENT_PHRASES
    )
    player_msg = _describe_move(
        state, actions[1], action_accuse, action_pass, _PLAYER_PHRASES
    )

    print("----")
    if first_player:
        print(player_msg, flush=True)
        print(opponent_msg, flush=True)
    else:
        print(opponent_msg, flush=True)
        print(player_msg, flush=True)

    # Reorder from [opponent, human] to seat order when the human is seat 0.
    if first_player:
        actions = actions[::-1]
    transcript.extend(state._action_to_string(i, actions[i]) for i in range(2))

    time_step = env.step(actions)

# Let the opponent observe the final time step of the game.
opponent.step(time_step, is_evaluation=is_evaluation)

print("----\nGame ended!")
if env.get_state.lead_player() == player:
    print("You won!")
else:
    print("You lost!")


----
You claim 3 x 1s.
Opponent passes.
----
You pass.
Opponent claims 2 x 2s.
----
You claim 1 x 2s.
Opponent passes.
----
You pass.
Opponent claims 3 x 2s.
----
You accuse!
Opponent passes.
----
You claim 1 x 3s.
Opponent passes.
----
You pass.
Opponent claims 4 x 3s.
----
You accuse!
Opponent passes.
----
You claim 3 x 4s.
Opponent passes.
----
You pass.
Opponent claims 3 x 5s.
----
You accuse!
Opponent passes.
----
You claim 2 x 5s.
Opponent passes.
----
You pass.
Opponent claims 4 x 6s.
----
You accuse!
Opponent passes.
----
You claim 2 x 6s.
Opponent passes.
----
You pass.
Opponent claims 4 x 6s.
----
Game ended!
You won!


In [None]:
# Dump the move-by-move transcript collected by the interactive game cell:
# a header followed by one line per recorded move.
transcript_header = "----\nTranscript:"
transcript_body = "\n".join(transcript)
print(transcript_header + "\n" + transcript_body)