In [1]:
# Base path prefix for stored models; save_model/load_model append
# 'models_autosave' to it to build their default directories.
DIRECTORY = 'drive/MyDrive/Informatics/Sphere@mail.ru/NN/Project/'

# Архитектура игры
За основу взят репозиторий: https://github.com/bennuttall/uno.git

## Реализация

In [2]:
from random import shuffle, choice
from itertools import product, repeat, chain
import random


# The four regular card colors.
COLORS = ['red', 'yellow', 'green', 'blue']
# Regular colors plus 'black', the color of wildcard and +4 cards.
ALL_COLORS = COLORS + ['black']
# Number cards of one color: a single 0 and two each of 1..9.
NUMBERS = list(range(10)) + list(range(1, 10))
SPECIAL_CARD_TYPES = ['skip', 'reverse', '+2']
# Every card type of one color, with multiplicity (each special card twice).
COLOR_CARD_TYPES = NUMBERS + SPECIAL_CARD_TYPES * 2
BLACK_CARD_TYPES = ['wildcard', '+4']
# All card types; the position of a type's first occurrence in this list is
# what UnoCard.__lt__ uses to order cards.
CARD_TYPES = NUMBERS + SPECIAL_CARD_TYPES + BLACK_CARD_TYPES


class UnoCard:
    """
    Represents a single Uno Card, given a valid color and card type.

    color: string (one of ALL_COLORS)
    card_type: string/int (a number 0-9, 'skip', 'reverse', '+2' for colored
        cards; 'wildcard' or '+4' for black cards)

    Two cards are equal when color and card type match; the temporary color
    chosen for a played black card does not take part in equality or hashing.
    Ordering (``<``) compares the position of the card type in CARD_TYPES
    only, ignoring color.

    >>> card = UnoCard('red', 5)
    """
    def __init__(self, color, card_type):
        self._validate(color, card_type)
        self.color = color
        self.card_type = card_type
        self.temp_color = None

    def __repr__(self):
        return '<UnoCard object: {} {}>'.format(self.color, self.card_type)

    def __str__(self):
        return '{}{}'.format(self.color_short, self.card_type_short)

    def __eq__(self, other):
        # Returning NotImplemented (instead of touching other.color) lets
        # comparisons with unrelated objects such as None evaluate to False
        # rather than raising AttributeError.
        if not isinstance(other, UnoCard):
            return NotImplemented
        return self.color == other.color and self.card_type == other.card_type

    def __lt__(self, other):
        if not isinstance(other, UnoCard):
            return NotImplemented
        return CARD_TYPES.index(self.card_type) < CARD_TYPES.index(other.card_type)

    def __hash__(self):
        # Equal cards share color and card type, hence the same short string.
        return hash(str(self))

    def _validate(self, color, card_type):
        """
        Check the card is valid, raise ValueError if not.
        """
        if color not in ALL_COLORS:
            raise ValueError('Invalid color')
        if color == 'black' and card_type not in BLACK_CARD_TYPES:
            raise ValueError('Invalid card type')
        if color != 'black' and card_type not in COLOR_CARD_TYPES:
            raise ValueError('Invalid card type')

    @property
    def color_short(self):
        """Upper-case first letter of the color ('B' for both blue and black)."""
        return self.color[0].upper()

    @property
    def card_type_short(self):
        """
        Short form of the card type: the upper-case initial for 'skip',
        'reverse' and 'wildcard'; the card type itself (an int for number
        cards, '+2' / '+4' otherwise) for everything else.
        """
        if self.card_type in ('skip', 'reverse', 'wildcard'):
            return self.card_type[0].upper()
        else:
            return self.card_type

    @property
    def _color(self):
        """Effective color: the temporary color if one is set, else the own color."""
        return self.temp_color if self.temp_color else self.color

    @property
    def temp_color(self):
        return self._temp_color

    @temp_color.setter
    def temp_color(self, color):
        # None clears the temporary color; anything else must be a regular color.
        if color is not None:
            if color not in COLORS:
                raise ValueError('Invalid color')
        self._temp_color = color

    def playable(self, other):
        """
        Return True if the other card is playable on top of this card,
        otherwise return False.

        The other card is playable when its color matches this card's
        effective color, when the card types match, or when it is black.
        """
        return (
            self._color == other.color or
            self.card_type == other.card_type or
            other.color == 'black'
        )


class UnoPlayer:
    """
    A participant of an Uno game holding a hand of cards. The base class
    plays a random playable card; subclasses override ``_strategy``.

    cards: list of exactly 7 UnoCards (kept as the player's hand)
    player_id: int/str (default: None)

    >>> cards = [UnoCard('red', n) for n in range(7)]
    >>> player = UnoPlayer(cards)
    """
    def __init__(self, cards, player_id=None):
        if len(cards) != 7:
            raise ValueError(
                'Invalid player: must be initalised with 7 UnoCards'
            )
        if any(not isinstance(card, UnoCard) for card in cards):
            raise ValueError(
                'Invalid player: cards must all be UnoCard objects'
            )
        self.hand = cards
        self.player_id = player_id

    def __repr__(self):
        if self.player_id is None:
            return '<UnoPlayer object>'
        return '<UnoPlayer object: player {}>'.format(self.player_id)

    def __str__(self):
        # Without an id there is nothing better to show than the repr.
        return repr(self) if self.player_id is None else str(self.player_id)

    def can_play(self, current_card):
        """
        Tell whether at least one card of the hand may be put on top of
        current_card.
        """
        for card in self.hand:
            if current_card.playable(card):
                return True
        return False

    def _strategy(self, current_card, playable_cards):
        """
        Random strategy: pick any playable card.

        Return a pair:
        index (in the hand) of the card to play,
        new color if that card is black, otherwise None
        """
        chosen = random.choice(playable_cards)
        new_color = random.choice(COLORS) if chosen.color == 'black' else None
        return self.hand.index(chosen), new_color

    def play(self, current_card):
        """
        Decide the move on top of current_card.

        Return a pair:
        index of the card to play, or None when the player has to pick up,
        new color if the card is black, otherwise None
        """
        if not self.can_play(current_card):
            return None, None
        # Duplicates are collapsed so every distinct card is offered once.
        distinct_playable = list(
            {card for card in self.hand if current_card.playable(card)}
        )
        return self._strategy(current_card, distinct_playable)


class UnoGame:
    """
    Represents an Uno game.

    The deck doubles as draw pile and discard pile: the card in play is the
    last element of ``deck``, played cards are appended to it and picked-up
    cards are taken from its front.

    players: int (number of players, between 2 and 15)
    is_random: bool (default: True), shuffle the deck
    verbose: bool (default: True), print the winner when the game ends

    >>> game = UnoGame(5)
    """
    def __init__(self, players, is_random=True, verbose=True):
        if not isinstance(players, int):
            raise ValueError('Invalid game: players must be integer')
        if not 2 <= players <= 15:
            raise ValueError('Invalid game: must be between 2 and 15 players')
        self.deck = self._create_deck(is_random)
        self.players = [
            UnoPlayer(self._deal_hand(), n) for n in range(players)
        ]
        self._player_cycle = ReversibleCycle(self.players)
        self._current_player = next(self._player_cycle)
        self._winner = None
        self._steps = 0
        self.verbose = verbose

    def replace_player(self, ClassPlayer, index=0):
        """
        Swap the player at ``index`` for ``ClassPlayer(hand, player_id)``,
        reusing the old player's hand and id, and restart the turn order
        from the first player.

        ClassPlayer: callable returning an UnoPlayer instance
        index: int, position in ``self.players``

        Return the new player. Raise ValueError if the game has already
        started or the created object is not an UnoPlayer; in that case (or
        if the constructor itself raises) the player list is left untouched.
        """
        if self._steps != 0:
            raise ValueError('Game already started')
        old_player = self.players[index]
        # Build and validate first, assign afterwards: the list is only
        # modified once the replacement is known to be good.
        new_player = ClassPlayer(old_player.hand, old_player.player_id)
        if not isinstance(new_player, UnoPlayer):
            raise ValueError('Invalid player: should be the instance of UnoPlayer')

        self.players[index] = new_player
        self._player_cycle = ReversibleCycle(self.players)
        self._current_player = next(self._player_cycle)
        return new_player

    def __next__(self):
        """
        Iteration sets the current player to the next player in the cycle.
        """
        self._current_player = next(self._player_cycle)

    def _create_deck(self, is_random):
        """
        Return a list of the complete set of Uno Cards. If is_random is True, the
        deck will be shuffled, otherwise it is returned in reversed
        construction order.
        """
        color_cards = product(COLORS, COLOR_CARD_TYPES)
        black_cards = product(repeat('black', 4), BLACK_CARD_TYPES)
        all_cards = chain(color_cards, black_cards)
        deck = [UnoCard(color, card_type) for color, card_type in all_cards]
        if is_random:
            shuffle(deck)
            return deck
        else:
            return list(reversed(deck))

    def _deal_hand(self):
        """
        Return a list of 7 cards from the top of the deck, and remove these
        from the deck.
        """
        return [self.deck.pop() for i in range(7)]

    @property
    def current_card(self):
        """The card in play (last element of the deck)."""
        return self.deck[-1]

    @property
    def is_active(self):
        """True while every player still holds at least one card."""
        return all(len(player.hand) > 0 for player in self.players)

    @property
    def current_player(self):
        return self._current_player

    @property
    def winner(self):
        return self._winner

    def play(self, *args, **kwargs):
        """
        Perform one move (see ``_play`` for the arguments) and count it in
        ``steps``. A move that raises is not counted.
        """
        result = self._play(*args, **kwargs)
        self._steps += 1
        return result

    def _play(self, player, card=None, new_color=None):
        """
        Process the player playing a card.

        player: int representing player index number
        card: int representing index number of card in player's hand
        new_color: color to switch to, required when the card is black

        It must be player's turn, and if card is given, it must be playable.
        If card is not given (None), the player picks up a card from the deck
        and immediately plays whatever their ``play`` method selects, if
        anything; otherwise the turn passes on.

        Raise ValueError for an invalid player, card or color, and when the
        game is over.
        """
        if not isinstance(player, int):
            raise ValueError('Invalid player: should be the index number')
        if not 0 <= player < len(self.players):
            raise ValueError('Invalid player: index out of range')
        _player = self.players[player]
        if self.current_player != _player:
            raise ValueError('Invalid player: not their turn')
        # Checked before the pick-up below so that a move on a finished game
        # is rejected without drawing cards or advancing the turn.
        if not self.is_active:
            raise ValueError('Game is over')
        if card is None:
            self._pick_up(_player, 1)
            card, new_color = _player.play(self.current_card)
            if card is None:
                next(self)
                return
        _card = _player.hand[card]
        if not self.current_card.playable(_card):
            raise ValueError(
                'Invalid card: {} not playable on {}'.format(
                    _card, self.current_card
                )
            )
        if _card.color == 'black':
            if new_color not in COLORS:
                raise ValueError(
                    'Invalid new_color: must be red, yellow, green or blue'
                )

        played_card = _player.hand.pop(card)
        self.deck.append(played_card)

        card_color = played_card.color
        card_type = played_card.card_type
        if card_color == 'black':
            self.current_card.temp_color = new_color
            if card_type == '+4':
                # The next player draws four cards and loses the turn.
                next(self)
                self._pick_up(self.current_player, 4)
        elif card_type == 'reverse':
            self._player_cycle.reverse()
        elif card_type == 'skip':
            next(self)
        elif card_type == '+2':
            # The next player draws two cards and loses the turn.
            next(self)
            self._pick_up(self.current_player, 2)

        if self.is_active:
            next(self)
        else:
            self._winner = _player
            if self.verbose:
                self._print_winner()

    def _print_winner(self):
        """
        Print the winner name if available, otherwise look up the index number.
        """
        if self.winner.player_id is not None:
            winner_name = self.winner.player_id
        else:
            winner_name = self.players.index(self.winner)
        print("Player {} wins!".format(winner_name))

    def _pick_up(self, player, n):
        """
        Take n cards from the bottom of the deck and add it to the player's
        hand.

        player: UnoPlayer
        n: int
        """
        # NOTE(review): nothing stops this from draining the deck, including
        # the card in play; an IndexError follows once the deck is empty.
        penalty_cards = [self.deck.pop(0) for i in range(n)]
        player.hand.extend(penalty_cards)

    @property
    def steps(self):
        """Number of moves completed through ``play``."""
        return self._steps


class ReversibleCycle:
    """
    Represents an interface to an iterable which can be infinitely cycled (like
    itertools.cycle), and can be reversed.

    Starts at the first item (index 0), unless reversed before first iteration,
    in which case starts at the last item.

    The object is its own iterator, so it works with ``next()``, ``iter()``
    and ``for`` loops (which never terminate on their own).

    iterable: any finite, non-empty iterable (advancing an empty cycle
        fails with ZeroDivisionError)

    >>> rc = ReversibleCycle(range(3))
    >>> next(rc)
    0
    >>> next(rc)
    1
    >>> rc.reverse()
    >>> next(rc)
    0
    >>> next(rc)
    2
    """
    def __init__(self, iterable):
        self._items = list(iterable)
        self._pos = None
        self._reverse = False

    def __iter__(self):
        # Required by the iterator protocol alongside __next__.
        return self

    def __next__(self):
        if self.pos is None:
            # First step: start from the end if already reversed.
            self.pos = -1 if self._reverse else 0
        else:
            self.pos = self.pos + self._delta
        return self._items[self.pos]

    @property
    def _delta(self):
        """Step applied to the position on each iteration (+1 or -1)."""
        return -1 if self._reverse else 1

    @property
    def pos(self):
        """Index of the item returned last, or None before the first step."""
        return self._pos

    @pos.setter
    def pos(self, value):
        # Wrap around in both directions.
        self._pos = value % len(self._items)

    def reverse(self):
        """
        Reverse the order of the iterable.
        """
        self._reverse = not self._reverse


class AIUnoGame:
    """
    Interactive console game: one randomly chosen seat is controlled by the
    user through ``input``, every other seat plays its first playable card.

    Creating the object runs the whole game until somebody wins.

    players: int, number of players (passed on to UnoGame)
    """
    def __init__(self, players):
        self.game = UnoGame(players)
        self.player = choice(self.game.players)
        self.player_index = self.game.players.index(self.player)
        print('The game begins. You are Player {}.'.format(self.player_index))
        self.print_hand()
        while self.game.is_active:
            print()
            next(self)

    def __next__(self):
        """Play a single turn for whoever is the current player."""
        game = self.game
        player = game.current_player
        player_id = player.player_id
        current_card = game.current_card
        if player == self.player:
            print('Current card: {}, color: {}'.format(
                game.current_card, game.current_card._color
            ))
            self.print_hand()
            if player.can_play(current_card):
                card_index, new_color = self._ask_move(player, current_card)
                game.play(player_id, card_index, new_color)
            else:
                print('You cannot play. You must pick up a card.')
                game.play(player_id, card=None)
                self.print_hand()
        elif player.can_play(game.current_card):
            # Automated opponent: first playable card, random color for black.
            for i, card in enumerate(player.hand):
                if game.current_card.playable(card):
                    if card.color == 'black':
                        new_color = choice(COLORS)
                    else:
                        new_color = None
                    print("Player {} played {}".format(player, card))
                    game.play(player=player_id, card=i, new_color=new_color)
                    break
        else:
            print("Player {} picked up".format(player))
            game.play(player=player_id, card=None)

    def _ask_move(self, player, current_card):
        """
        Prompt the user until they name a playable card (and, for a black
        card, a valid color). Return (card index, new color or None).
        """
        while True:
            answer = input('Which card do you want to play? ')
            try:
                card_index = int(answer)
                card = player.hand[card_index]
            except (ValueError, IndexError):
                # Not a number, or no such position in the hand: ask again
                # instead of aborting the game.
                print('Cannot play that card')
                continue
            if not current_card.playable(card):
                print('Cannot play that card')
                continue
            new_color = None
            if card.color == 'black':
                new_color = input('Which color do you want? ')
                while new_color not in COLORS:
                    print('Color must be one of: {}'.format(', '.join(COLORS)))
                    new_color = input('Which color do you want? ')
            return card_index, new_color

    def print_hand(self):
        """Print the user's cards in their short form, separated by spaces."""
        print('Your hand: {}'.format(
            ' '.join(str(card) for card in self.player.hand)
        ))

In [3]:
def make_game(game, verbose=False):
    """
    Run ``game`` to completion, letting every player choose its own moves
    through ``player.play``.

    game: UnoGame (or an object with the same interface)
    verbose: bool, print every move and a summary

    Return the winning player (``game.winner``).
    """
    # Taken from the game itself; the function must not depend on a global
    # variable that happens to be called ``players``.
    n_players = len(game.players)
    if verbose:
        print("Starting a {} player game".format(n_players))

    while game.is_active:
        player = game.current_player
        player_id = player.player_id
        card_id, new_color = player.play(game.current_card)
        if card_id is not None:
            if verbose:
                print("Player {} played {}".format(player, player.hand[card_id]))
            game.play(player=player_id, card=card_id, new_color=new_color)
        else:
            if verbose:
                print("Player {} picked up".format(player))
            game.play(player=player_id, card=card_id)

    if verbose:
        print("{} player game - {} cards played".format(n_players, game.steps))

    return game.winner

## Test game

In [4]:
import random

# Smoke test: play one verbose game with a random number of default
# (random-strategy) players.
players = random.randint(2, 15)
game = UnoGame(players)

make_game(game, verbose=True);

Starting a 5 player game
Player 0 played Y9
Player 1 played B9
Player 2 played B6
Player 3 played B+4
Player 0 played B2
Player 1 played R2
Player 2 played R8
Player 3 played R0
Player 4 played R3
Player 0 played R6
Player 1 played R1
Player 2 played Y1
Player 3 played Y6
Player 4 played YR
Player 3 played Y2
Player 2 played G2
Player 1 played G4
Player 0 played G5
Player 4 played G9
Player 3 played B9
Player 2 played B6
Player 1 played B3
Player 0 picked up
Player 4 played Y3
Player 3 picked up
Player 2 picked up
Player 1 played Y3
Player 0 played Y7
Player 4 played B+4
Player 2 picked up
Player 1 picked up
Player 0 played Y4
Player 4 picked up
Player 3 played B+4
Player 1 picked up
Player 0 played Y6
Player 4 picked up
Player 2 played G6
Player 1 played R6
Player 0 picked up
Player 4 played RR
Player 0 picked up
Player 1 picked up
Player 2 played RS
Player 4 played R2
Player 0 picked up
Player 1 picked up
Player 2 played G6
Player 3 played G4
Player 4 played G+2
Player 1 played G3
Pl

# Функции

In [5]:
import numpy as np
from tqdm.autonotebook import tqdm
import pickle
import os
import time
import sys

  


In [8]:
from collections import Counter

In [9]:
class AutoPopList(list):
    """
    List that keeps at most ``auto_pop_size`` elements when filled through
    ``append``: once the limit is reached the oldest elements are dropped,
    so the list always holds the most recently appended items.

    auto_pop_size: int or None (default: None, meaning no limit)

    Only ``append`` enforces the limit; initial contents and other mutating
    methods are not trimmed.
    """
    def __init__(self, *args, auto_pop_size=None, **kwargs):
        self.auto_pop_size = auto_pop_size
        super().__init__(*args, **kwargs)

    def append(self, *args, **kwargs):
        if self.auto_pop_size:
            # Evict from the front (oldest first) to make room for the new item.
            while len(self) >= self.auto_pop_size:
                self.pop(0)
        return super().append(*args, **kwargs)

In [52]:
def save_model(model, filename='model', dir_path=DIRECTORY + 'models_autosave/', verbose=True, time_filename=False, testing=False):
    """
    Pickle ``model`` into ``dir_path`` and return the file name used.

    model: any picklable object
    filename: base name; '.pickle' is appended unless already present
    dir_path: target directory, created if it does not exist
    verbose: print the file name and its size in B / KB / MB
    time_filename: append '_' + time.asctime() to the base name
    testing: delete the file again after writing (dry run)
    """
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)

    stem = (filename + '_' + time.asctime()) if time_filename else filename
    filename = stem if stem.endswith('.pickle') else stem + '.pickle'
    target = os.path.join(dir_path, filename)

    with open(target, 'wb') as out:
        pickle.dump(model, out)

    if verbose:
        n_bytes = os.path.getsize(target)
        print(f'FILENAME: {filename}\n{n_bytes} B\n{n_bytes / 1024} KB\n{n_bytes / 1024 / 1024} MB')
    if testing:
        # Dry run: the file was only written to report its size.
        os.remove(target)

    return filename

def load_model(filename, dir_path=DIRECTORY + 'models_autosave'):
    """
    Load and return the object pickled in ``dir_path``/``filename``.

    NOTE: unpickling can execute arbitrary code; only load files you trust.
    """
    source = os.path.join(dir_path, filename)
    with open(source, 'rb') as stream:
        return pickle.load(stream)

In [11]:
def learning(
        epochs=100000, players=2, verbose=20,
        players_to_replace=None,
        autosave=None, autotrain=None):
    """
    Play ``epochs`` games and count how many the player in seat 0 wins.

    epochs: int, number of games
    players: int, number of players per game
    verbose: int, approximate number of progress reports (falsy: silent)
    players_to_replace: list of player classes/factories; entry i replaces
        the default player in seat i (None or empty: keep default players)
    autosave, autotrain: int or None, period (in epochs) of the autosave /
        autotrain hooks

    After each game ``backprop(is_win)`` is called on the seat-0 player if it
    has such a method. A KeyboardInterrupt stops the loop early and the wins
    counted so far are returned.

    Return the number of games won by the seat-0 player.
    """
    replacements = [] if players_to_replace is None else list(players_to_replace)
    if len(replacements) > players:
        raise ValueError('len of players_to_replace is bigger then (players + 1)')
    # At least 1, so that epochs < verbose does not lead to a modulo by zero.
    report_every = max(1, epochs // verbose) if verbose else None
    wins_count = 0
    try:
        for epoch in tqdm(range(epochs)):
            game = UnoGame(players, verbose=None)

            for seat, new_player_class in enumerate(replacements):
                game.replace_player(new_player_class, seat)
            # Wins are counted for seat 0, so feedback must go to that same
            # player rather than to whichever player was replaced last.
            main_player = game.players[0]
            player_id = main_player.player_id

            winner = make_game(game)
            is_win = winner.player_id == player_id
            wins_count += is_win

            if hasattr(main_player, 'backprop'):
                main_player.backprop(is_win)
            if verbose and epoch % report_every == 0:
                print(f'Wins: {wins_count} / {epoch + 1} ({round(100 * wins_count / (epoch + 1))}%)')
            if autosave is not None and epoch % autosave == 0:
                # NOTE(review): relies on a global named ``mcst`` that this
                # function does not define - confirm where it comes from.
                save_model(mcst, time_filename=True)
            if autotrain is not None and epoch % autotrain == 0:
                # NOTE(review): this call does not match the signature above
                # (positional ``mcst``, unknown ``is_train``) and will raise
                # TypeError; left unchanged until the intent is clarified.
                learning(mcst, epochs=1000, players=2, is_train=False)

        if verbose and epochs:
            print(f'Result: {wins_count} / {epochs} ({round(100 * wins_count / (epochs))}%)')
    except KeyboardInterrupt:
        # Deliberate: allow stopping a long run and still get the count.
        pass
    return wins_count

In [12]:
class UnoPlayerWrapper(UnoPlayer):
    """
    Factory that remembers extra constructor arguments for a player class.

    Calling the wrapper with (hand, player_id) creates
    ``UnoPlayerClass(hand, player_id, *saved_args, **saved_kwargs)``; saved
    keyword arguments win over keyword arguments given at call time.

    The wrapper itself never runs UnoPlayer.__init__, so it has no hand or
    player_id of its own.
    """
    def __init__(self, UnoPlayerClass, *args, **kwargs):
        self.UnoPlayerClass, self.saved_args, self.saved_kwargs = (
            UnoPlayerClass, args, kwargs
        )

    def __call__(self, *args, **kwargs):
        all_args = args + self.saved_args
        all_kwargs = {**kwargs, **self.saved_kwargs}
        return self.UnoPlayerClass(*all_args, **all_kwargs)

# Разные агенты

In [13]:
class UnoPlayerStratMin(UnoPlayer):
    """Agent that always plays the lowest-ranked playable card."""
    def _strategy(self, current_card, playable_cards):
        """
        Return:
        index of the playable card that sorts first (cards are ordered by
        the position of their type in CARD_TYPES),
        new color if that card is black (the most frequent non-black color
        in the hand, or a random color if there is none), else None
        """
        card = sorted(playable_cards)[0]
        if card.color != 'black':
            return self.hand.index(card), None

        # Strings are compared by value (!=); identity checks against a
        # literal are unreliable.
        freq = Counter(c.color for c in self.hand if c.color != 'black')
        if freq:
            new_color = freq.most_common(1)[0][0]
        else:
            new_color = random.choice(COLORS)
        return self.hand.index(card), new_color


In [None]:
class UnoPlayerStratMinChColor(UnoPlayer):
    '''
    Min + change color when possible.
    '''
    def _strategy(self, current_card, playable_cards):
        """
        Return:
        index of the lowest-ranked playable card whose color differs from
        the current card's color (the lowest-ranked playable card if there
        is no such card),
        new color if that card is black (the most frequent non-black color
        in the hand, or a random color if there is none), else None
        """
        cards = sorted(playable_cards)  # ordered by type position in CARD_TYPES

        card = cards[0]
        # NOTE(review): this compares against the printed color of the
        # current card, not the color chosen for a black card - confirm
        # that is intended.
        for new_card in cards:
            if new_card.color != current_card.color:
                card = new_card
                break

        if card.color != 'black':
            return self.hand.index(card), None

        # Strings are compared by value (!=); identity checks against a
        # literal are unreliable.
        freq = Counter(c.color for c in self.hand if c.color != 'black')
        if freq:
            new_color = freq.most_common(1)[0][0]
        else:
            new_color = random.choice(COLORS)
        return self.hand.index(card), new_color


In [None]:
class UnoPlayerStratMax(UnoPlayer):
    """Agent that always plays the highest-ranked playable card."""
    def _strategy(self, current_card, playable_cards):
        """
        Return:
        index of the playable card that sorts last (cards are ordered by
        the position of their type in CARD_TYPES),
        new color if that card is black (the most frequent non-black color
        in the hand, or a random color if there is none), else None
        """
        card = sorted(playable_cards)[-1]
        if card.color != 'black':
            return self.hand.index(card), None

        # Strings are compared by value (!=); identity checks against a
        # literal are unreliable.
        freq = Counter(c.color for c in self.hand if c.color != 'black')
        if freq:
            new_color = freq.most_common(1)[0][0]
        else:
            new_color = random.choice(COLORS)
        return self.hand.index(card), new_color


In [None]:
class UnoPlayerStratMinMax(UnoPlayer):
    """
    Agent that alternates between the highest- and the lowest-ranked
    playable card, starting with the highest.
    """
    # False: play the highest card next; True: play the lowest card next.
    state = False

    def _strategy(self, current_card, playable_cards):
        """
        Return:
        index of the playable card that sorts first or last (alternating on
        every call; cards are ordered by the position of their type in
        CARD_TYPES),
        new color if that card is black (the most frequent non-black color
        in the hand, or a random color if there is none), else None
        """
        ordered = sorted(playable_cards)
        card = ordered[0] if self.state else ordered[-1]
        self.state = not self.state

        if card.color != 'black':
            return self.hand.index(card), None

        # Strings are compared by value (!=); identity checks against a
        # literal are unreliable.
        freq = Counter(c.color for c in self.hand if c.color != 'black')
        if freq:
            new_color = freq.most_common(1)[0][0]
        else:
            new_color = random.choice(COLORS)
        return self.hand.index(card), new_color


In [15]:
# Seat 0: lowest-card agent; seat 1: default random agent.
# learning() reports the wins of seat 0.
list_of_players = [
    UnoPlayerStratMin, UnoPlayer
]
learning(epochs=10000, players=2,
         players_to_replace=list_of_players)

HBox(children=(FloatProgress(value=0.0, max=10000.0), HTML(value='')))

Wins: 1 / 1 (100%)
Wins: 298 / 501 (59%)
Wins: 581 / 1001 (58%)
Wins: 874 / 1501 (58%)
Wins: 1180 / 2001 (59%)
Wins: 1477 / 2501 (59%)
Wins: 1770 / 3001 (59%)
Wins: 2091 / 3501 (60%)
Wins: 2376 / 4001 (59%)
Wins: 2681 / 4501 (60%)
Wins: 2978 / 5001 (60%)
Wins: 3274 / 5501 (60%)
Wins: 3575 / 6001 (60%)
Wins: 3872 / 6501 (60%)
Wins: 4170 / 7001 (60%)
Wins: 4465 / 7501 (60%)
Wins: 4774 / 8001 (60%)
Wins: 5067 / 8501 (60%)
Wins: 5358 / 9001 (60%)
Wins: 5666 / 9501 (60%)

Result: 5940 / 10000 (59%)


5940

# Статистики

In [None]:
from tqdm.autonotebook import tqdm
import numpy as np

def test_classes(
        epochs=100000, players=2, verbose=20,
        players_to_replace=None):
    """
    Play ``epochs`` games and collect statistics for the player in seat 0.

    epochs: int, number of games
    players: int, number of players per game
    verbose: int, approximate number of progress reports (falsy: silent)
    players_to_replace: list of player classes/factories; entry i replaces
        the default player in seat i (None or empty: keep default players)

    A KeyboardInterrupt stops the loop early; the statistics then cover the
    games finished so far.

    Return a tuple:
    wins of the seat-0 player,
    number of games played,
    number of games whose last card was black,
    number of those games won by the seat-0 player,
    mean number of moves per played game (nan if no game was played)
    """
    replacements = [] if players_to_replace is None else list(players_to_replace)
    if len(replacements) > players:
        raise ValueError('len of players_to_replace is bigger then (players + 1)')
    # At least 1, so that epochs < verbose does not lead to a modulo by zero.
    report_every = max(1, epochs // verbose) if verbose else None
    wins_count = 0
    all_cnt = 0
    all_black = 0
    win_black = 0
    game_steps = np.zeros(epochs)
    try:
        for epoch in tqdm(range(epochs)):
            game = UnoGame(players, verbose=None)

            for seat, new_player_class in enumerate(replacements):
                game.replace_player(new_player_class, seat)
            player_id = game.players[0].player_id

            winner = make_game(game)
            is_win = winner.player_id == player_id
            game_steps[all_cnt] = game.steps
            all_cnt += 1
            if game.current_card.color == 'black':
                all_black += 1
                win_black += is_win

            wins_count += is_win
            if verbose and epoch % report_every == 0:
                print(f'Wins: {wins_count} / {epoch + 1} ({round(100 * wins_count / (epoch + 1))}%)')

        if verbose and epochs:
            print(f'Test result: {wins_count} / {epochs} ({round(100 * wins_count / (epochs))}%)')
    except KeyboardInterrupt:
        # Deliberate: allow stopping a long run and still get the statistics.
        pass
    # Average only over games actually played; the rest of the buffer still
    # holds its initial zeros after an early stop.
    mean_steps = np.mean(game_steps[:all_cnt]) if all_cnt else np.nan
    return wins_count, all_cnt, all_black, win_black, mean_steps

In [None]:
# Statistics for the alternating min/max agent (seat 0) against the default
# random agent.
_, all_cnt, all_black, win_black, steps = test_classes(epochs=10000, players=2, verbose=20, 
    players_to_replace=[UnoPlayerStratMinMax, UnoPlayer])

print('All games:', all_cnt)
# Share of games whose last card was black.
print('All black:', all_black / all_cnt)
# Share of all games that ended on a black card AND were won by seat 0.
print('Win black:', win_black / all_cnt)
print('Steps:', steps)

# Простой вариант

In [16]:
class UnoPlayer_V1(UnoPlayer):
    """
    Player that delegates the choice of card to a shared search tree.

    need_backprop: bool, put the tree into train mode (True) or eval mode
        (False) before every selection
    mcst: tree object providing train(), eval() and select(state, actions)
    Remaining arguments are passed on to UnoPlayer.
    """
    def __init__(self, *args, need_backprop=False, mcst=None, **kwargs):
        self.need_backprop = need_backprop
        self.mcst = mcst
        super().__init__(*args, **kwargs)

    def _strategy(self, current_card, playable_cards):
        """
        Ask the tree to choose among the playable cards, using the current
        card as the state.

        Return:
        index (in the hand) of the chosen card,
        new color if the card is black, else None
        """
        # Color announced with any black card: the most frequent non-black
        # color in the hand, or a random one when the hand has none.
        top_color = Counter(
            held.color for held in self.hand if held.color != 'black'
        ).most_common(1)
        wild_color = top_color[0][0] if top_color else random.choice(COLORS)

        actions = [
            (candidate, wild_color if candidate.color == 'black' else None)
            for candidate in playable_cards
        ]

        if not self.need_backprop:
            self.mcst.eval()
        else:
            self.mcst.train()

        chosen_card, chosen_color = self.mcst.select(current_card, actions)
        return self.hand.index(chosen_card), chosen_color

In [30]:
class SimpleMCSTNode_V1:
    """Counters for one (state, action) entry: visits, wins and losses."""
    def __init__(self):
        # n: visits, n_win: games won, n_lose: games lost
        self.n = self.n_win = self.n_lose = 0


class SimpleMCST_V1:
    """
    One-level search table: for every state it keeps a SimpleMCSTNode_V1 per
    action and selects actions by a UCB-style score.

    auto_pop_size: int or None, size limit of the list of nodes remembered
        for the next backprop (passed on to AutoPopList)
    c: exploration weight in the score
    """
    def __init__(self, auto_pop_size=None, c=1):
        self.root = {}  # state -> {action -> SimpleMCSTNode_V1}
        self.need_backprop = True
        self.n = 0  # number of backprop calls (finished games)
        self.c = c
        self.backprop_list = AutoPopList(auto_pop_size=auto_pop_size)

    def from_another(self, another):
        """
        Share the statistics of ``another`` tree (same ``root`` object, same
        game count) and switch this tree to eval mode. Return self.
        """
        self.root = another.root
        self.need_backprop = False
        self.n = another.n
        return self

    def select(self, state, actions):
        """
        Choose one of ``actions`` for ``state``.

        The first action never visited before is taken immediately;
        otherwise the action with the highest score
        ``(n_win - n_lose) + c * sqrt(2 * ln(n) / node.n)`` wins.

        In train mode the chosen node is remembered for the next backprop.
        Return the chosen action, or None if ``actions`` is empty.
        """
        best_act, best_score, best_node = None, None, None
        state_nodes = self.root.setdefault(state, {})

        for action in actions:
            node = state_nodes.get(action)
            if node is None:
                node = state_nodes[action] = SimpleMCSTNode_V1()
            if node.n == 0:
                best_act, best_node = action, node
                break
            exploit = node.n_win - node.n_lose
            # max(..., 1) keeps the logarithm finite if the game counter is
            # still zero while a node already has visits.
            explore = self.c * np.sqrt(2 * np.log(max(self.n, 1)) / node.n)
            score = exploit + explore
            if best_act is None or score > best_score:
                best_score = score
                best_act = action
                best_node = node

        # Only train-mode selections are recorded: eval-mode choices must not
        # receive the learner's reward, and backprop (the only thing that
        # clears the list) is not allowed in eval mode.
        if self.need_backprop and best_node is not None:
            self.backprop_list.append(best_node)
        return best_act

    def eval(self):
        """Switch to eval mode (selections are not recorded)."""
        self.need_backprop = False

    def train(self):
        """Switch to train mode (selections are recorded for backprop)."""
        self.need_backprop = True

    def no_backprop(self):
        # NOTE(review): ``cur`` is not read anywhere in this class - confirm
        # whether this method is still needed.
        self.cur = self.root

    def backprop(self, is_win):
        """
        Credit the result of a finished game to every remembered node and
        forget them.

        is_win: truthy if the game was won
        Raise RuntimeError when called in eval mode.
        """
        if not self.need_backprop:
            raise RuntimeError('Backprop in eval mode not allowed')
        win = 1 if is_win else 0
        lose = 1 - win
        self.n += 1
        for node in self.backprop_list:
            node.n += 1
            node.n_win += win
            node.n_lose += lose
        self.backprop_list.clear()

In [None]:
# One tree (auto_pop_size=10) shared by both seats: seat 0 uses it in train
# mode, seat 1 in eval mode.
simple_mcst_v1 = SimpleMCST_V1(10)
list_of_players = [
    UnoPlayerWrapper(UnoPlayer_V1, mcst=simple_mcst_v1, need_backprop=True),
    UnoPlayerWrapper(UnoPlayer_V1, mcst=simple_mcst_v1)
]

epochs = 10000
learning(
    epochs=epochs, players=2,
    players_to_replace=list_of_players,
    autosave=None, autotrain=None)

HBox(children=(FloatProgress(value=0.0, max=10000.0), HTML(value='')))

Wins: 0 / 1 (0%)
Wins: 264 / 501 (53%)
Wins: 518 / 1001 (52%)
Wins: 775 / 1501 (52%)
Wins: 1033 / 2001 (52%)
Wins: 1282 / 2501 (51%)
Wins: 1531 / 3001 (51%)
Wins: 1777 / 3501 (51%)
Wins: 2033 / 4001 (51%)
Wins: 2271 / 4501 (50%)
Wins: 2531 / 5001 (51%)
Wins: 2801 / 5501 (51%)
Wins: 3068 / 6001 (51%)
Wins: 3303 / 6501 (51%)
Wins: 3549 / 7001 (51%)
Wins: 3800 / 7501 (51%)
Wins: 4048 / 8001 (51%)
Wins: 4316 / 8501 (51%)
Wins: 4578 / 9001 (51%)
Wins: 4853 / 9501 (51%)

Result: 5102 / 10000 (51%)


5102

In [None]:
# Tree-driven agent in eval mode (seat 0) against the default random agent.
# NOTE(review): ``mcst`` is not defined in the visible part of this file -
# presumably it comes from an earlier notebook session; confirm.
list_of_players = [
    UnoPlayerWrapper(UnoPlayer_V1, mcst=mcst),
]
learning(epochs=10000, players=2,
         players_to_replace=list_of_players)

HBox(children=(FloatProgress(value=0.0, max=10000.0), HTML(value='')))

Wins: 0 / 1 (0%)
Wins: 284 / 501 (57%)
Wins: 543 / 1001 (54%)
Wins: 810 / 1501 (54%)
Wins: 1086 / 2001 (54%)
Wins: 1358 / 2501 (54%)
Wins: 1626 / 3001 (54%)
Wins: 1931 / 3501 (55%)
Wins: 2222 / 4001 (56%)
Wins: 2482 / 4501 (55%)
Wins: 2771 / 5001 (55%)
Wins: 3061 / 5501 (56%)
Wins: 3332 / 6001 (56%)
Wins: 3616 / 6501 (56%)
Wins: 3899 / 7001 (56%)
Wins: 4173 / 7501 (56%)
Wins: 4454 / 8001 (56%)
Wins: 4709 / 8501 (55%)
Wins: 4994 / 9001 (55%)
Wins: 5266 / 9501 (55%)

Result: 5534 / 10000 (55%)


5534

In [None]:
# Dry run (testing=True): write the pickle to the current directory, print
# its size and delete it again.
save_model(mcst, dir_path='.', time_filename=True, testing=True);

FILENAME: model_Tue May 25 13:19:36 2021.pickle
91334 B
89.193359375 KB
0.08710289001464844 MB


Вывод:

Работает лучше рандомного агента (57-58% побед)

При backprop только на последние n узлов дерева работает еще чуть лучше (59-60% побед)

Плохо работает, так как мало входных параметров

# Чуть сложнее

In [46]:
# Global registries used by UnoPlayer_V2 to map states and actions to
# integer ids: the list gives id -> object, the dict gives object -> id.
states_list_V2 = []
state_to_id_V2 = {}

actions_list_V2 = []
action_to_id_V2 = {}

In [47]:
class UnoPlayer_V2(UnoPlayer):
    """
    Uno player whose move is chosen by a search tree (`mcst`).

    The state handed to the tree is the current card plus the distinct cards
    in the player's hand; an action is a `(card, new_color)` pair.  States and
    actions are mapped to integer ids through the module-level registries
    `states_list_V2` / `state_to_id_V2` and `actions_list_V2` /
    `action_to_id_V2`.

    need_backprop: bool, puts the tree in train mode before each move if True,
                   otherwise in eval mode
    mcst: tree object providing `train()`, `eval()` and
          `select(state_idx, action_idxs)`
    """
    def __init__(self, *args, need_backprop=False, mcst=None, **kwargs):
        self.need_backprop = need_backprop
        self.mcst = mcst
        super().__init__(*args, **kwargs)

    @staticmethod
    def _lookup_or_register_id(key, items, item_to_id):
        """
        Return the integer id of `key`, registering it first if it is new.
        """
        idx = item_to_id.get(key)
        if idx is None:
            idx = len(items)
            items.append(key)
            item_to_id[key] = idx
        return idx

    def _strategy(self, current_card, playable_cards):
        """
        Pick a card to play and, for black cards, the color to switch to.

        Returns a tuple (index of the chosen card in self.hand, new color or None).
        """
        # A set has no canonical iteration order, so the distinct cards are
        # sorted by repr (color + card type) to make equal hands always
        # produce the same state key.
        distinct_hand = sorted(set(self.hand), key=repr)
        state = (current_card,) + tuple(distinct_hand)

        # Every black card expands into one action per color it can switch to.
        actions = []
        for card in playable_cards:
            if card.color == 'black':
                actions.extend((card, new_color) for new_color in COLORS)
            else:
                actions.append((card, None))

        if self.need_backprop:
            self.mcst.train()
        else:
            self.mcst.eval()

        state_idx = self._lookup_or_register_id(
            state, states_list_V2, state_to_id_V2)
        action_idxs = [
            self._lookup_or_register_id(action, actions_list_V2, action_to_id_V2)
            for action in actions
        ]

        action_idx = self.mcst.select(state_idx, action_idxs)
        card, new_color = actions_list_V2[action_idx]
        return self.hand.index(card), new_color

In [48]:
class MCSTNode_V2:
    """
    Node of the V2 search tree.

    state, action: the (state, action) pair that leads from `prev` to this node
    prev: parent node (None for the root)
    c: weight of the exploration term in `choose_action`
    """
    def __init__(self, state=None, action=None, prev=None, c=1):
        self.next = {} # State - Action - MCSTNode_V2
        self.prev = prev
        self.n = 0
        self.n_win = 0
        self.n_lose = 0
        self.state = state
        self.action = action
        self.c = c

    def choose_action(self, state, actions, n_games):
        """
        Return the child node to follow for `state` among `actions`.

        Children are created lazily and inherit this node's `c`.  The first
        child (in `actions` order) that has never been visited is returned
        immediately; otherwise the child with the highest score
        (n_win - n_lose) + c * sqrt(2 * ln(n_games) / n) wins, ties going to
        the earliest action.  Returns None when `actions` is empty.
        """
        children = self.next.setdefault(state, {})
        best_act, best_score = None, None

        for action in actions:
            child = children.get(action)
            if child is None:
                # Propagate the exploration weight instead of silently
                # falling back to the default c=1 below the root.
                child = MCSTNode_V2(state, action, self, c=self.c)
                children[action] = child
            if child.n == 0:
                # Unvisited children are always tried first.
                return child
            # NOTE(review): the exploitation term is the raw win/lose
            # difference, not an average per visit — confirm this is intended.
            exploitation = child.n_win - child.n_lose
            exploration = self.c * np.sqrt(2 * np.log(n_games) / child.n)
            score = exploitation + exploration
            if best_act is None or score > best_score:
                best_score = score
                best_act = child

        return best_act


class MCST_V2:
    """
    Search tree that keeps a cursor (`cur`) on the node reached in the
    current game and updates visit/win/lose counters along the walked path.
    """
    def __init__(self):
        root = MCSTNode_V2()
        self.root = root
        self.need_backprop = True
        self.cur = root

    @property
    def n(self):
        """Visit counter of the root node."""
        return self.root.n

    def from_another(self, another):
        """Share the root of `another`, switch to eval mode and return self."""
        self.root = another.root
        self.need_backprop = False
        self.cur = self.root
        return self

    def select(self, state, actions):
        """Advance the cursor to the child chosen for `state` and return its action."""
        chosen = self.cur.choose_action(state, actions, self.root.n)
        picked_action = chosen.action
        self.cur = chosen
        return picked_action

    def eval(self):
        """Forbid backprop (evaluation mode)."""
        self.need_backprop = False

    def train(self):
        """Allow backprop (training mode)."""
        self.need_backprop = True

    def no_backprop(self):
        """Drop the walked path without updating counters: cursor back to the root."""
        self.cur = self.root

    def backprop(self, is_win):
        """
        Add one visit and one win or lose to every node from the cursor up to
        the root, then move the cursor back to the root.

        Raises RuntimeError when the tree is in eval mode.
        """
        if not self.need_backprop:
            raise RuntimeError('Backprop in eval mode not allowed')
        win, lose = (1, 0) if is_win else (0, 1)
        while self.cur is not None:
            node = self.cur
            node.n += 1
            node.n_win += win
            node.n_lose += lose
            self.cur = node.prev
        self.cur = self.root

In [54]:
# Training run for the V2 player on top of the simple tree from the V1 section.
simple_mcst_v2 = SimpleMCST_V1(20) # Tree from the first part (two layers)
list_of_players = [
    UnoPlayerWrapper(UnoPlayer_V2, mcst=simple_mcst_v2, need_backprop=True),
    #UnoPlayerWrapper(UnoPlayer_V2, mcst=simple_mcst_v2)
]

epochs = 100000
learning(
    epochs=epochs, players=2,
    players_to_replace=list_of_players,
    autosave=None, autotrain=None)

HBox(children=(FloatProgress(value=0.0, max=100000.0), HTML(value='')))

Wins: 0 / 1 (0%)
Wins: 2569 / 5001 (51%)
Wins: 5121 / 10001 (51%)
Wins: 7667 / 15001 (51%)
Wins: 10162 / 20001 (51%)
Wins: 12734 / 25001 (51%)
Wins: 15300 / 30001 (51%)
Wins: 17852 / 35001 (51%)
Wins: 20391 / 40001 (51%)
Wins: 22971 / 45001 (51%)
Wins: 25506 / 50001 (51%)
Wins: 27956 / 55001 (51%)
Wins: 30494 / 60001 (51%)
Wins: 33059 / 65001 (51%)
Wins: 35673 / 70001 (51%)
Wins: 38240 / 75001 (51%)
Wins: 40797 / 80001 (51%)
Wins: 43369 / 85001 (51%)
Wins: 45913 / 90001 (51%)
Wins: 48441 / 95001 (51%)

Result: 51015 / 100000 (51%)


51015

In [55]:
# Evaluation run: one V2 player on `simple_mcst_v2`, created without
# need_backprop, over 10000 two-player games.
# NOTE(review): the opponent seat is chosen inside learning() — confirm.
list_of_players = [
    UnoPlayerWrapper(UnoPlayer_V2, mcst=simple_mcst_v2),
]
learning(epochs=10000, players=2,
         players_to_replace=list_of_players)

HBox(children=(FloatProgress(value=0.0, max=10000.0), HTML(value='')))

Wins: 1 / 1 (100%)
Wins: 271 / 501 (54%)
Wins: 548 / 1001 (55%)
Wins: 799 / 1501 (53%)
Wins: 1051 / 2001 (53%)
Wins: 1308 / 2501 (52%)
Wins: 1572 / 3001 (52%)
Wins: 1824 / 3501 (52%)
Wins: 2078 / 4001 (52%)
Wins: 2326 / 4501 (52%)
Wins: 2591 / 5001 (52%)
Wins: 2825 / 5501 (51%)
Wins: 3085 / 6001 (51%)
Wins: 3342 / 6501 (51%)
Wins: 3608 / 7001 (52%)
Wins: 3866 / 7501 (52%)
Wins: 4111 / 8001 (51%)
Wins: 4376 / 8501 (51%)
Wins: 4625 / 9001 (51%)
Wins: 4869 / 9501 (51%)

Result: 5120 / 10000 (51%)


5120

In [56]:
# Self-play training of a fresh MCST_V2 tree: both seats share `mcst_v2`,
# but only the first player is created with need_backprop=True.
mcst_v2 = MCST_V2()
list_of_players = [
    UnoPlayerWrapper(UnoPlayer_V2, mcst=mcst_v2, need_backprop=True),
    UnoPlayerWrapper(UnoPlayer_V2, mcst=mcst_v2)
]

epochs = 10000
learning(
    epochs=epochs, players=2,
    players_to_replace=list_of_players,
    autosave=None, autotrain=None)

HBox(children=(FloatProgress(value=0.0, max=10000.0), HTML(value='')))

Wins: 1 / 1 (100%)
Wins: 262 / 501 (52%)
Wins: 496 / 1001 (50%)
Wins: 764 / 1501 (51%)
Wins: 1000 / 2001 (50%)
Wins: 1249 / 2501 (50%)
Wins: 1488 / 3001 (50%)
Wins: 1757 / 3501 (50%)
Wins: 1993 / 4001 (50%)
Wins: 2247 / 4501 (50%)
Wins: 2498 / 5001 (50%)
Wins: 2750 / 5501 (50%)
Wins: 2997 / 6001 (50%)
Wins: 3262 / 6501 (50%)
Wins: 3524 / 7001 (50%)
Wins: 3792 / 7501 (51%)
Wins: 4031 / 8001 (50%)
Wins: 4286 / 8501 (50%)
Wins: 4552 / 9001 (51%)
Wins: 4810 / 9501 (51%)

Result: 5083 / 10000 (51%)


5083

In [57]:
# Evaluation run: one V2 player on the trained `mcst_v2`, created without
# need_backprop, over 10000 two-player games.
# NOTE(review): the opponent seat is chosen inside learning() — confirm.
list_of_players = [
    UnoPlayerWrapper(UnoPlayer_V2, mcst=mcst_v2),
]
learning(epochs=10000, players=2,
         players_to_replace=list_of_players)

HBox(children=(FloatProgress(value=0.0, max=10000.0), HTML(value='')))

Wins: 0 / 1 (0%)
Wins: 257 / 501 (51%)
Wins: 506 / 1001 (51%)
Wins: 751 / 1501 (50%)
Wins: 988 / 2001 (49%)
Wins: 1243 / 2501 (50%)
Wins: 1488 / 3001 (50%)
Wins: 1712 / 3501 (49%)
Wins: 1966 / 4001 (49%)
Wins: 2205 / 4501 (49%)
Wins: 2459 / 5001 (49%)
Wins: 2711 / 5501 (49%)
Wins: 2968 / 6001 (49%)
Wins: 3237 / 6501 (50%)
Wins: 3488 / 7001 (50%)
Wins: 3734 / 7501 (50%)
Wins: 3988 / 8001 (50%)
Wins: 4237 / 8501 (50%)
Wins: 4494 / 9001 (50%)
Wins: 4734 / 9501 (50%)

Result: 4975 / 10000 (50%)


4975

In [None]:
# Head-to-head: V1 player on `simple_mcst_v1` versus V2 player on
# `simple_mcst_v2`, neither created with need_backprop, 10000 games.
# NOTE(review): the reported win count presumably belongs to the first
# player in the list — confirm against learning().
list_of_players = [
    UnoPlayerWrapper(UnoPlayer_V1, mcst=simple_mcst_v1),
    UnoPlayerWrapper(UnoPlayer_V2, mcst=simple_mcst_v2),
]
learning(epochs=10000, players=2,
         players_to_replace=list_of_players)

HBox(children=(FloatProgress(value=0.0, max=10000.0), HTML(value='')))

Wins: 1 / 1 (100%)
Wins: 273 / 501 (54%)
Wins: 555 / 1001 (55%)
Wins: 830 / 1501 (55%)
Wins: 1142 / 2001 (57%)
Wins: 1407 / 2501 (56%)
Wins: 1665 / 3001 (55%)
Wins: 1954 / 3501 (56%)
Wins: 2237 / 4001 (56%)
Wins: 2510 / 4501 (56%)
Wins: 2798 / 5001 (56%)
Wins: 3068 / 5501 (56%)
Wins: 3359 / 6001 (56%)
Wins: 3623 / 6501 (56%)
Wins: 3916 / 7001 (56%)
Wins: 4180 / 7501 (56%)
Wins: 4444 / 8001 (56%)
Wins: 4745 / 8501 (56%)
Wins: 5037 / 9001 (56%)
Wins: 5308 / 9501 (56%)

Result: 5583 / 10000 (56%)


5583

In [None]:
# Head-to-head: V1 player on `simple_mcst_v1` versus V2 player on `mcst_v2`,
# neither created with need_backprop, 10000 games.
# NOTE(review): the reported win count presumably belongs to the first
# player in the list — confirm against learning().
list_of_players = [
    UnoPlayerWrapper(UnoPlayer_V1, mcst=simple_mcst_v1),
    UnoPlayerWrapper(UnoPlayer_V2, mcst=mcst_v2),
]
learning(epochs=10000, players=2,
         players_to_replace=list_of_players)

HBox(children=(FloatProgress(value=0.0, max=10000.0), HTML(value='')))

Wins: 1 / 1 (100%)
Wins: 289 / 501 (58%)
Wins: 590 / 1001 (59%)
Wins: 856 / 1501 (57%)
Wins: 1137 / 2001 (57%)
Wins: 1406 / 2501 (56%)
Wins: 1687 / 3001 (56%)
Wins: 1974 / 3501 (56%)
Wins: 2262 / 4001 (57%)
Wins: 2559 / 4501 (57%)
Wins: 2827 / 5001 (57%)
Wins: 3107 / 5501 (56%)
Wins: 3377 / 6001 (56%)
Wins: 3673 / 6501 (56%)
Wins: 3969 / 7001 (57%)
Wins: 4253 / 7501 (57%)
Wins: 4542 / 8001 (57%)
Wins: 4827 / 8501 (57%)
Wins: 5088 / 9001 (57%)
Wins: 5371 / 9501 (57%)

Result: 5666 / 10000 (57%)


5666

In [None]:
# Head-to-head: V2 player on `simple_mcst_v2` versus V2 player on `mcst_v2`,
# neither created with need_backprop, 10000 games.
# NOTE(review): the reported win count presumably belongs to the first
# player in the list — confirm against learning().
list_of_players = [
    UnoPlayerWrapper(UnoPlayer_V2, mcst=simple_mcst_v2),
    UnoPlayerWrapper(UnoPlayer_V2, mcst=mcst_v2),
]
learning(epochs=10000, players=2,
         players_to_replace=list_of_players)

HBox(children=(FloatProgress(value=0.0, max=10000.0), HTML(value='')))

Wins: 0 / 1 (0%)
Wins: 254 / 501 (51%)
Wins: 515 / 1001 (51%)
Wins: 755 / 1501 (50%)
Wins: 1024 / 2001 (51%)
Wins: 1298 / 2501 (52%)
Wins: 1544 / 3001 (51%)
Wins: 1819 / 3501 (52%)
Wins: 2073 / 4001 (52%)
Wins: 2355 / 4501 (52%)
Wins: 2601 / 5001 (52%)
Wins: 2857 / 5501 (52%)
Wins: 3120 / 6001 (52%)
Wins: 3371 / 6501 (52%)
Wins: 3642 / 7001 (52%)
Wins: 3890 / 7501 (52%)
Wins: 4141 / 8001 (52%)
Wins: 4392 / 8501 (52%)
Wins: 4661 / 9001 (52%)
Wins: 4901 / 9501 (52%)

Result: 5175 / 10000 (52%)


5175

In [53]:
# Persist `simple_mcst_v2` into the current directory under a time-stamped
# file name (the trailing `;` suppresses the notebook's echo of the result).
# NOTE(review): `testing=True` presumably enables the size report — confirm.
save_model(simple_mcst_v2, dir_path='.', time_filename=True, testing=True);

FILENAME: model_Wed May 26 14:39:07 2021.pickle
24329622 B
23759.396484375 KB
23.20253562927246 MB


# Rlcard

https://github.com/datamllab/rlcard

In [58]:
!pip3 install rlcard[torch] 
!pip3 install rlcard[tensorflow] 

Collecting tensorflow<2.0,>=1.14; extra == "tensorflow"
[?25l  Downloading https://files.pythonhosted.org/packages/9a/51/99abd43185d94adaaaddf8f44a80c418a91977924a7bc39b8dacd0c495b0/tensorflow-1.15.5-cp37-cp37m-manylinux2010_x86_64.whl (110.5MB)
[K     |████████████████████████████████| 110.5MB 93kB/s 
Collecting tensorboard<1.16.0,>=1.15.0
[?25l  Downloading https://files.pythonhosted.org/packages/1e/e9/d3d747a97f7188f48aa5eda486907f3b345cd409f0a0850468ba867db246/tensorboard-1.15.0-py3-none-any.whl (3.8MB)
[K     |████████████████████████████████| 3.8MB 31.9MB/s 
Collecting keras-applications>=1.0.8
[?25l  Downloading https://files.pythonhosted.org/packages/71/e3/19762fdfc62877ae9102edf6342d71b28fbfd9dea3d2f96a882ce099b03f/Keras_Applications-1.0.8-py3-none-any.whl (50kB)
[K     |████████████████████████████████| 51kB 5.7MB/s 
[?25hCollecting tensorflow-estimator==1.15.1
[?25l  Downloading https://files.pythonhosted.org/packages/de/62/2ee9cd74c9fa2fa450877847ba560b260f5d0fb70ee

In [59]:
def remove_illegal(action_probs, legal_actions):
    ''' Remove illegal actions and normalize the
        probability vector
    Args:
        action_probs (numpy.array): A 1 dimension numpy array.
        legal_actions (list): A list of indices of legal actions.
    Returns:
        probs (numpy.array): A normalized vector in which every illegal
            action has probability 0. If all legal actions have zero
            probability, the mass is spread uniformly over them.
    '''
    probs = np.zeros(action_probs.shape[0])
    probs[legal_actions] = action_probs[legal_actions]
    # Sum once, with NumPy, instead of np.sum for the test and the builtin
    # sum (a Python-level loop over the array) for the division.
    total = np.sum(probs)
    if total == 0:
        probs[legal_actions] = 1 / len(legal_actions)
    else:
        probs /= total
    return probs

In [60]:
def tournament(env, num, type_payoff='rlcard'):
    ''' Evaluate the performance of the agents in the environment
    Args:
        env (Env class): The environment to be evaluated.
        num (int): The number of games to play.
        type_payoff (string): 'rlcard' to average the raw payoffs, or
            'percent' to replace negative payoffs by 0 before averaging.
    Returns:
        A list of average payoffs for each player
    Raises:
        ValueError: If type_payoff is neither 'rlcard' nor 'percent'.
    '''
    # Fail early: previously an unknown mode only surfaced inside the loop
    # as an unbound local variable.
    if type_payoff not in ('rlcard', 'percent'):
        raise ValueError(
            "type_payoff must be 'rlcard' or 'percent', got {!r}".format(type_payoff))
    clip_losses = type_payoff == 'percent'

    payoffs = [0 for _ in range(env.player_num)]
    counter = 0
    while counter < num:
        _, _payoffs = env.run(is_training=False)
        # A list is treated as several games' payoff vectors returned by one
        # run; anything else is the payoff vector of a single game.
        games = _payoffs if isinstance(_payoffs, list) else [_payoffs]
        for _p in games:
            for i in range(len(payoffs)):
                payoffs[i] += max(_p[i], 0) if clip_losses else _p[i]
            counter += 1
    return [total / counter for total in payoffs]

## CFR

In [65]:
import os

def get_model_from_drive():
    ''' Copy a previously saved CFR model into ./cfr_model

    The source is DIRECTORY + 'models_autosave/cfr_model'. Nothing is done
    when that directory does not exist. The copy itself uses an IPython
    shell command, so this only runs inside a notebook.
    '''
    dir_path = DIRECTORY + 'models_autosave/cfr_model'
    if not os.path.exists(dir_path):
        return
    # Make sure the local target directory exists before copying into it
    if not os.path.exists('./cfr_model'):
        os.makedirs('./cfr_model')
    ! cp $dir_path/* ./cfr_model/

get_model_from_drive()

In [66]:
import numpy as np
import collections

import os
import pickle

class CFRAgent():
    ''' Implement CFR (chance sampling) algorithm

    Local variant with an optional depth limit (`level_limit`) for the tree
    traversal and a progress printout controlled by `self.verbose`.
    '''

    def __init__(self, env, model_path='./cfr_model', level_limit=None):
        ''' Initialize Agent
        Args:
            env (Env): Env class
            model_path (str): Directory used by save() and load()
            level_limit (int or None): If not None, nodes deeper than this
                level are not expanded and contribute zero utility
        '''
        self.use_raw = False
        self.env = env
        self.model_path = model_path
        self.level_limit = level_limit

        # A policy is a dict state_str -> action probabilities
        self.policy = collections.defaultdict(list)
        # NOTE(review): np.array cannot be called without arguments, so these
        # default factories must never fire; entries are always created
        # explicitly in traverse_tree. Kept as-is so pickles stay compatible.
        self.average_policy = collections.defaultdict(np.array)

        # Regret is a dict state_str -> action regrets
        self.regrets = collections.defaultdict(np.array)

        self.iteration = 0
        # Number of nodes visited during the current train() call
        self.traverse_iter = 0

        self.verbose = True

    def train(self):
        ''' Do one iteration of CFR
        '''
        self.iteration += 1
        # Firstly, traverse tree to compute counterfactual regret for each player
        # The regrets are recorded in traversal
        self.traverse_iter = 0
        for player_id in range(self.env.player_num):
            self.env.reset()
            probs = np.ones(self.env.player_num)
            self.traverse_tree(probs, player_id)

        # Update policy
        self.update_policy()

    def traverse_tree(self, probs, player_id, level=0):
        ''' Traverse the game tree, update the regrets
        Args:
            probs: The reach probability of the current node
            player_id: The player to update the value
            level (int): Depth of the current node, 0 for the root
        Returns:
            state_utilities (list): The expected utilities for all the players
        '''
        if self.env.is_over():
            return self.env.get_payoffs()
        if self.verbose:
            print(f'\r iter {self.traverse_iter}. level {level}', end='')
            # Start a new line from time to time so that part of the
            # progress history stays visible
            if self.traverse_iter % 10000 == 0:
                print()
        self.traverse_iter += 1

        current_player = self.env.get_player_id()

        action_utilities = {}
        state_utility = np.zeros(self.env.player_num)
        obs, legal_actions = self.get_state(current_player)
        action_probs = self.action_probs(obs, legal_actions, self.policy)

        # Depth cut-off: nodes below the limit count as zero utility
        if self.level_limit is not None and level > self.level_limit:
            return state_utility

        for action in legal_actions:
            action_prob = action_probs[action]
            new_probs = probs.copy()
            new_probs[current_player] *= action_prob

            # Keep traversing the child state
            self.env.step(action)
            utility = self.traverse_tree(new_probs, player_id, level + 1)
            self.env.step_back()

            state_utility += action_prob * utility
            action_utilities[action] = utility

        if not current_player == player_id:
            return state_utility

        # If it is current player, we record the policy and compute regret
        player_prob = probs[current_player]
        counterfactual_prob = (np.prod(probs[:current_player]) *
                                np.prod(probs[current_player + 1:]))
        player_state_utility = state_utility[current_player]

        if obs not in self.regrets:
            self.regrets[obs] = np.zeros(self.env.action_num)
        if obs not in self.average_policy:
            self.average_policy[obs] = np.zeros(self.env.action_num)
        for action in legal_actions:
            action_prob = action_probs[action]
            regret = counterfactual_prob * (action_utilities[action][current_player]
                    - player_state_utility)
            self.regrets[obs][action] += regret
            self.average_policy[obs][action] += self.iteration * player_prob * action_prob
        return state_utility

    def update_policy(self):
        ''' Update policy based on the current regrets
        '''
        for obs in self.regrets:
            self.policy[obs] = self.regret_matching(obs)

    def regret_matching(self, obs):
        ''' Apply regret matching
        Args:
            obs (string): The state_str
        Returns:
            action_probs (numpy.array): Positive regrets normalized to sum
                to 1, or the uniform distribution if no regret is positive
        '''
        regret = self.regrets[obs]
        positive_regret_sum = sum([r for r in regret if r > 0])

        action_probs = np.zeros(self.env.action_num)
        if positive_regret_sum > 0:
            for action in range(self.env.action_num):
                action_probs[action] = max(0.0, regret[action] / positive_regret_sum)
        else:
            for action in range(self.env.action_num):
                action_probs[action] = 1.0 / self.env.action_num
        return action_probs

    def action_probs(self, obs, legal_actions, policy):
        ''' Obtain the action probabilities of the current state
        Args:
            obs (str): state_str
            legal_actions (list): List of legal actions
            policy (dict): The used policy
        Returns:
            action_probs (numpy.array): The action probabilities with
                illegal actions removed
        '''
        if obs not in policy.keys():
            action_probs = np.array([1.0/self.env.action_num for _ in range(self.env.action_num)])
            # The uniform fallback is always stored in self.policy, whichever
            # policy dict was queried
            self.policy[obs] = action_probs
        else:
            action_probs = policy[obs]
        action_probs = remove_illegal(action_probs, legal_actions)
        return action_probs

    def eval_step(self, state):
        ''' Given a state, predict action based on average policy
        Args:
            state (numpy.array): State representation
        Returns:
            action (int): Predicted action
            probs (numpy.array): The probabilities the action was sampled from
        '''
        # tobytes() returns the same bytes as the deprecated tostring()
        probs = self.action_probs(state['obs'].tobytes(), state['legal_actions'], self.average_policy)
        action = np.random.choice(len(probs), p=probs)
        return action, probs

    def get_state(self, player_id):
        ''' Get state_str of the player
        Args:
            player_id (int): The player id
        Returns:
            (tuple) that contains:
                state (str): The state str
                legal_actions (list): Indices of legal actions
        '''
        state = self.env.get_state(player_id)
        # tobytes() returns the same bytes as the deprecated tostring()
        return state['obs'].tobytes(), state['legal_actions']

    def _dump(self, filename, obj):
        ''' Pickle obj into model_path/filename
        '''
        with open(os.path.join(self.model_path, filename), 'wb') as file:
            pickle.dump(obj, file)

    def _restore(self, filename):
        ''' Unpickle and return the content of model_path/filename
        '''
        # pickle can execute arbitrary code while loading: only point
        # model_path at directories you trust
        with open(os.path.join(self.model_path, filename), 'rb') as file:
            return pickle.load(file)

    def save(self):
        ''' Save model
        '''
        if not os.path.exists(self.model_path):
            os.makedirs(self.model_path)

        self._dump('policy.pkl', self.policy)
        self._dump('average_policy.pkl', self.average_policy)
        self._dump('regrets.pkl', self.regrets)
        self._dump('iteration.pkl', self.iteration)

    def load(self):
        ''' Load model
        Returns:
            (bool): False if model_path does not exist, True once the model
                has been loaded
        '''
        if not os.path.exists(self.model_path):
            return False

        self.policy = self._restore('policy.pkl')
        self.average_policy = self._restore('average_policy.pkl')
        self.regrets = self._restore('regrets.pkl')
        self.iteration = self._restore('iteration.pkl')

        return True

In [67]:
# Silence every Python warning from here on
from warnings import filterwarnings
filterwarnings('ignore')

In [71]:
import os

import rlcard
# CFRAgent and tournament are the modified versions defined in this notebook,
# not the rlcard ones, hence no import for them here.
from rlcard.agents import RandomAgent
from rlcard.utils.utils import set_global_seed
from rlcard.utils import Logger

# Make environment
# Training needs step_back to walk the game tree; evaluation does not.
env = rlcard.make('uno', config={'seed': 0, 'allow_step_back':True})
eval_env = rlcard.make('uno', config={'seed': 0})

# The paths for saving the logs and learning curves
log_dir = './experiments/uno_cfr_result/'

# Set a global seed
set_global_seed(0)

# Resume from ./cfr_model when a saved model is present
agent = CFRAgent(env, level_limit=15)
if agent.load():
    print('Agent loaded')

random_agent = RandomAgent(action_num=eval_env.action_num)
env.set_agents([agent, random_agent])
eval_env.set_agents([agent, random_agent])

# Init a Logger to plot the learning curve
logger = Logger(log_dir)

# Set the iterations numbers and how frequently we evaluate the performance
evaluate_every = 10
epochs = 100
evaluate_num = 10000

for epoch in tqdm(range(epochs)):
    try:
        agent.train()
    except KeyboardInterrupt:
        # Allow a manual stop while still closing the logger and plotting
        print(f'KeyboardInterrupt on epoch {epoch}')
        break
    print('\rIteration {}'.format(epoch), end='')
    # Evaluate the performance. Play with the random agent.
    if (epoch + 1) % evaluate_every == 0:
        agent.save() # Save model
        logger.log_performance(env.timestep, tournament(eval_env, evaluate_num, type_payoff='percent')[0])

# Close files in the logger
logger.close_files()

# Plot the learning curve
logger.plot('CFR')

Agent loaded


HBox(children=(FloatProgress(value=0.0), HTML(value='')))

In [72]:
# Average clipped payoff per seat over 10000 evaluation games; seat 0 is the
# CFR agent and seat 1 the random agent (see eval_env.set_agents above).
# NOTE(review): this reads as a win rate only if payoffs are +1/-1 — confirm.
tournament(eval_env, 10000, type_payoff='percent')

[0.4928, 0.5072]

In [74]:
!ls -lh ./cfr_model

total 366M
-rw-r--r-- 1 root root  57M May 26 15:05 average_policy.pkl
-rw-r--r-- 1 root root    5 May 26 15:05 iteration.pkl
-rw-r--r-- 1 root root 253M May 26 15:05 policy.pkl
-rw-r--r-- 1 root root  57M May 26 15:05 regrets.pkl


In [75]:
# Back up the local CFR model files to the autosave folder under DIRECTORY,
# creating that folder first if needed (the copy uses an IPython shell command).
dir_path=DIRECTORY + 'models_autosave/cfr_model/'
if not os.path.exists(dir_path):
    os.makedirs(dir_path)
! cp ./cfr_model/* $dir_path

## DQN

In [None]:
''' An example of learning a Deep-Q Agent on UNO
'''

import tensorflow as tf
import os

import rlcard
from rlcard.agents import DQNAgent
from rlcard.agents import RandomAgent
# NOTE: this import rebinds `tournament` to rlcard's version, replacing the
# one defined earlier in this notebook for every cell run afterwards.
from rlcard.utils.utils import set_global_seed, tournament
from rlcard.utils import Logger
from tqdm.autonotebook import tqdm

# Make environment
env = rlcard.make('uno', config={'seed': 0})
eval_env = rlcard.make('uno', config={'seed': 0})

# Set the iterations numbers and how frequently we evaluate the performance
evaluate_every = 100
evaluate_num = 1000
episode_num = 100000

# The initial memory size
memory_init_size = 1000

# Train the agent every X steps
train_every = 1

# The paths for saving the logs and learning curves
log_dir = './experiments/uno_dqn_result/'

# Set a global seed
set_global_seed(0)

with tf.Session() as sess:

    # Initialize a global step
    # (not referenced again in this cell)
    global_step = tf.Variable(0, name='global_step', trainable=False)

    # Set up the agents
    agent = DQNAgent(sess,
                     scope='dqn',
                     action_num=env.action_num,
                     replay_memory_size=20000,
                     replay_memory_init_size=memory_init_size,
                     train_every=train_every,
                     state_shape=env.state_shape,
                     mlp_layers=[512,512])
    random_agent = RandomAgent(action_num=eval_env.action_num)
    env.set_agents([agent, random_agent])
    eval_env.set_agents([agent, random_agent])

    # Initialize global variables
    sess.run(tf.global_variables_initializer())

    # Init a Logger to plot the learning curve
    logger = Logger(log_dir)

    for episode in tqdm(range(episode_num)):

        # Generate data from the environment
        trajectories, _ = env.run(is_training=True)

        # Feed transitions into agent memory, and train the agent
        # (only seat 0, the DQN agent, is fed)
        for ts in trajectories[0]:
            agent.feed(ts)

        # Evaluate the performance. Play with random agents.
        if episode % evaluate_every == 0:
            logger.log_performance(env.timestep, tournament(eval_env, evaluate_num)[0])

    # Close files in the logger
    logger.close_files()

    # Plot the learning curve
    logger.plot('DQN')
    
    # Save model
    # (must happen while the session is still open)
    save_dir = 'models/uno_dqn'
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    saver = tf.train.Saver()
    saver.save(sess, os.path.join(save_dir, 'model'))

In [None]:
# Последний вывод

# INFO - Agent dqn, step 557000, rl-loss: 0.002407593885436654
# INFO - Copied model parameters to target network.
# INFO - Agent dqn, step 558000, rl-loss: 0.003889711108058691
# INFO - Copied model parameters to target network.
# INFO - Agent dqn, step 559000, rl-loss: 0.003005279926583171
# INFO - Copied model parameters to target network.
# INFO - Agent dqn, step 559009, rl-loss: 0.009682988747954369
# ----------------------------------------
#   timestep     |  1108168
#   reward       |  0.092
# ----------------------------------------
# INFO - Agent dqn, step 560000, rl-loss: 0.003239815589040518
# INFO - Copied model parameters to target network.
# INFO - Agent dqn, step 561000, rl-loss: 0.0022931178100407124
# INFO - Copied model parameters to target network.
# INFO - Agent dqn, step 561420, rl-loss: 0.003184293396770954
# ----------------------------------------
#   timestep     |  1112947
#   reward       |  0.1
# ----------------------------------------
# INFO - Agent dqn, step 562000, rl-loss: 0.019770847633481026
# INFO - Copied model parameters to target network.
# INFO - Agent dqn, step 563000, rl-loss: 0.006966903805732727
# INFO - Copied model parameters to target network.
# INFO - Agent dqn, step 563639, rl-loss: 0.020534304901957512
# ----------------------------------------
#   timestep     |  1117374
#   reward       |  0.062
# ----------------------------------------
# INFO - Agent dqn, step 564000, rl-loss: 0.0037206162232905626
# INFO - Copied model parameters to target network.
# INFO - Agent dqn, step 565000, rl-loss: 0.011016655713319778
# INFO - Copied model parameters to target network.
# INFO - Agent dqn, step 566000, rl-loss: 0.056470587849617004
# INFO - Copied model parameters to target network.
# INFO - Agent dqn, step 566116, rl-loss: 0.00207774480804801
# ----------------------------------------
#   timestep     |  1122293
#   reward       |  0.092
# ----------------------------------------
# INFO - Agent dqn, step 567000, rl-loss: 0.026489363983273506
# INFO - Copied model parameters to target network.
# INFO - Agent dqn, step 568000, rl-loss: 0.0018865950405597687
# INFO - Copied model parameters to target network.
# INFO - Agent dqn, step 568491, rl-loss: 0.0013463989598676562
# ----------------------------------------
#   timestep     |  1126995
#   reward       |  0.088
# ----------------------------------------
# INFO - Agent dqn, step 569000, rl-loss: 0.002077497309073806
# INFO - Copied model parameters to target network.
# INFO - Agent dqn, step 570000, rl-loss: 0.0022331371437758207
# INFO - Copied model parameters to target network.
# INFO - Agent dqn, step 570952, rl-loss: 0.0008809937862679362
# ----------------------------------------
#   timestep     |  1131888
#   reward       |  0.066
# ----------------------------------------
# INFO - Agent dqn, step 571000, rl-loss: 0.004484367091208696
# INFO - Copied model parameters to target network.
# INFO - Agent dqn, step 572000, rl-loss: 0.0006894284160807729
# INFO - Copied model parameters to target network.
# INFO - Agent dqn, step 573000, rl-loss: 0.011121409013867378
# INFO - Copied model parameters to target network.
# INFO - Agent dqn, step 573282, rl-loss: 0.001479726517572999
# ----------------------------------------
#   timestep     |  1136468
#   reward       |  0.096
# ----------------------------------------
# INFO - Agent dqn, step 574000, rl-loss: 0.0050432817079126835
# INFO - Copied model parameters to target network.
# INFO - Agent dqn, step 575000, rl-loss: 0.0030693798325955868
# INFO - Copied model parameters to target network.
# INFO - Agent dqn, step 575711, rl-loss: 0.007900373078882694
# ----------------------------------------
#   timestep     |  1141283
#   reward       |  0.098
# ----------------------------------------
# INFO - Agent dqn, step 576000, rl-loss: 0.01133214682340622
# INFO - Copied model parameters to target network.
# INFO - Agent dqn, step 576040, rl-loss: 0.003942469134926796