## 🦚 Imports

In [1]:
!source ../venv/bin/activate

In [2]:
import gymnasium
import os
import numpy as np
from gymnasium.spaces import Discrete, MultiDiscrete
from gymnasium import spaces

from IPython.display import clear_output
import time
from pettingzoo import AECEnv
from pettingzoo.utils import agent_selector, wrappers
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
import csv
import pandas as pd
#from scipy.interpolate import interp1d

import os
from typing import Optional, Tuple

from torch.utils.tensorboard import SummaryWriter
from tianshou.utils import TensorboardLogger

import gymnasium
import numpy as np
import torch
from copy import deepcopy
from tianshou.data import Collector, VectorReplayBuffer
from tianshou.env import DummyVectorEnv
from tianshou.env.pettingzoo_env import PettingZooEnv
from tianshou.policy import BasePolicy, DQNPolicy, RainbowPolicy, MultiAgentPolicyManager, RandomPolicy
from tianshou.trainer import OffpolicyTrainer
from tianshou.utils.net.common import Net

import random
import copy

NUM_ITERS = 1000
PLAYS = {"bastaushy": 0, "qostaushy": 0}
REWARDS = {"bastaushy": [], "qostaushy": []}

## 🐘 AEC environment

In [3]:
class TogyzQumalaqEnv(AECEnv):
    """
    The "name" metadata allows the environment to be pretty printed.
    """

    metadata = {
        "render_modes": ["ansi", "human"],
        "name": "togyzqumalaq_v0"
        }

    def __init__(self, render_mode=None):
        """
        The init method takes in environment arguments and
         should define the following attributes:
        - otaular
        - tuzdyq
        - qazandar
        - possible_agents
        - render_mode
        """
        self.otaular = []
        self.tuzdyq = []
        self.qazandar = []
        self.direction = []
        self.agents = ["bastaushy", "qostaushy"]
        self.possible_agents = self.agents[:]
        self.action_spaces = {i: spaces.Discrete(9) for i in self.agents}
        self.observation_spaces = {
            i: spaces.Dict(
                {
                    "observation":
                        MultiDiscrete([100] * 18 + [10] * 2 + [162] * 2 + [2]),
                    "action_mask":
                        Discrete(9),
                }
            )
            for i in self.agents
        }
        self.render_mode = render_mode

    # Action space should be defined here.
    def action_space(self, agent):
        return self.action_spaces[agent]

    # Observation space should be defined here.
    def observation_space(self, agent):
        return self.observation_spaces[agent]

    def render(self):
        """
        Renders the environment. In human mode,
        it can print to terminal, open
        up a graphical window, or open up some
        other display that
        a human can see and understand.
        """
        """Renders the environment."""
        if self.render_mode is None:
            gymnasium.logger.warn(
                "You are calling render method without "
                "specifying any render mode."
            )
            return

        if len(self.agents) == 2:
            points_bastaushy_x = np.array([i * 2 for i in range(10)])
            points_bastaushy_y = np.array([i % 5 for i in range(50)])

            qazandar = self.qazandar
            otaular = self.otaular
            tuzdyq = self.tuzdyq
            x = np.arange(-3, 225, 1)
            y = -1

            text_kwargs = dict(ha='center', va='center', fontsize=12)
            plt.figure(figsize=(15, 4))

            for i in range(9):
                # qostaushy's part
                plt.scatter(np.repeat(
                    points_bastaushy_x + 25 * i, 5)[:otaular[17 - i]],
                            points_bastaushy_y[:otaular[17 - i]], marker='o')
                # vertical lines
                plt.plot(np.repeat(25 * i - 2, len(x)),
                         np.arange(-7, 5, 12 / len(x)))
                # bastaushy's part
                plt.scatter(np.repeat(points_bastaushy_x + 25 * i, 5)[:otaular[i]],
                                points_bastaushy_y[:otaular[i]] - 6, marker='o')
            
            # horizontal line
            x_lims = np.arange(-3, 245, 1)
            plt.plot(x_lims, np.repeat(y, len(x_lims)))
            # last vertical line
            plt.plot(np.repeat(25 * 9 - 2, 13),
                     np.arange(-7, 6, 1))
        
            for i in range(9):
                # bastaushy's qumalaqtar
                plt.text(25 * i + 10, -7,
                         f'{i} ({otaular[i]})', **text_kwargs)
                # qostaushy's qumalaqtar
                plt.text(25 * i + 10, 5,
                         f'{17 - i} ({otaular[17 - i]})', **text_kwargs)
            # bastaushy qazan's qumalaqtar
            plt.text(235, -4,
                     f'qazan: {qazandar[0]}', **text_kwargs)
            # qostaushy qazan's qumalaqtar
            plt.text(235, 2,
                     f'qazan: {qazandar[1]}', **text_kwargs)
            # bastaushy tuzdyq's qumalaqtar
            plt.text(235, -6,
                     f'tuzdyq: {tuzdyq[0]}', **text_kwargs)
            # qostaushy tuzdyq's qumalaqtar
            plt.text(235, 0,
                     f'tuzdyq: {tuzdyq[1]}', **text_kwargs)
            plt.xticks([])
            plt.yticks([])
            plt.show()
        else:
            if self.render_mode == "human":
                print("Game over")
        time.sleep(1)
        #clear_output()

    def _legal_moves(self, agent):
        cur_player = self.possible_agents.index(agent)
        opp_player = (cur_player + 1) % 2
        return [item for item in range(9) if self.tuzdyq[opp_player] != item  + cur_player * 9 and self.otaular[item + cur_player * 9] > 0]

    def observe(self, agent):
        """
        Observe should return the observation of the specified agent. This function
        should return a sane observation (though not necessarily the most up to date possible)
        at any time after reset() is called.
        """
        # observation of one agent is the previous state of the other
        legal_moves = self._legal_moves(agent) if agent == self.agent_selection else []
        action_mask = np.zeros(9, "int8")
        
        for i in legal_moves:
            action_mask[i] = 1
        observation = tuple(
            self.otaular + self.tuzdyq + self.qazandar + [self.possible_agents.index(self.agent_selection)]
        )
        return {"observation": observation, "action_mask": action_mask}

    def close(self):
        """
        Close should release any graphical displays, subprocesses, network connections
        or any other environment data which should not be kept around after the
        user is no longer using the environment.
        """
        pass

    def reset(self, seed=None, options=None):
        """
        Reset needs to initialize the following attributes
        - agents
        - rewards
        - _cumulative_rewards
        - terminations
        - truncations
        - infos
        - agent_selection
        """
        self.agents = self.possible_agents[:]
        self.rewards = {agent: 0 for agent in self.agents}
        self._cumulative_rewards = {agent: 0 for agent in self.agents}
        self.otaular = [9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9]
        self.direction = [list(range(18)), [9, 10, 11, 12, 13, 14, 15, 16, 17, 0, 1, 2, 3, 4, 5, 6, 7, 8]]
        self.tuzdyq = [-1, -1]
        self.qazandar = [0, 0]
        self.terminations = {agent: False for agent in self.agents}
        self.truncations = {agent: False for agent in self.agents}
        self.infos = {agent: {} for agent in self.agents}
        self.num_moves = 0
        observation = tuple(
            self.otaular + self.tuzdyq + self.qazandar + [0]
        )
        self.observations = {agent: observation for agent in self.agents}
        """
        Our agent_selector utility allows easy cyclic stepping through the agents list.
        """
        self._agent_selector = agent_selector(self.agents)
        self.agent_selection = self._agent_selector.next()

    def step(self, action):
        """
        step(action) takes in an action for the current agent (specified by
        agent_selection) and needs to update
        - rewards
        - _cumulative_rewards (accumulating the rewards)
        - terminations
        - truncations
        - infos
        - agent_selection (to the next agent)
        And any internal state used by observe() or render()
        """
        if (
            self.terminations[self.agent_selection]
            or self.truncations[self.agent_selection]
        ):
            # handles stepping an agent which is already dead
            # accepts a None action for the one agent, and moves the agent_selection to
            # the next dead agent,  or if there are no more dead agents, to the next live agent
            self._was_dead_step(action)
            return
        self.rewards = {agent: 0 for agent in self.agents}
        cur_player = self.possible_agents.index(self.agent_selection)
        opp_player = (cur_player + 1) % 2
        self.num_moves += 1
        if self.render_mode == "human":
            print(f'MOVE #{self.num_moves}')
        # The truncations dictionary must be updated for all players.
        self.truncations = {
            agent: self.num_moves >= NUM_ITERS for agent in self.agents
        }
        # distribute qumalaqs
        if cur_player == 1:
            action += 9
        if self.render_mode == "human":
            print(f'{self.agent_selection} made action {action}')
        num_qumalaq = self.otaular[action]
        idx_action = self.direction[cur_player].index(action)
        if self.otaular[action] == 1:
            self.otaular[self.direction[cur_player][idx_action + 1]] += 1
            self.otaular[action] -= 1
        else:
            i = 1
            coef = 1
            if self.otaular[action] / 18 > 1:
                coef = int(self.otaular[action] / 18) + 1
            while self.otaular[action] > coef:
                self.otaular[self.direction[cur_player][(idx_action + i) % 18]] += 1
                self.otaular[action] -= 1
                i += 1
        # check tuzdyq & add rewards to qazandar
        reward = 0
        if self.tuzdyq[cur_player] < 0 and self.check_tuzdyq(self.agent_selection, action, num_qumalaq):
            reward += 3
            if self.render_mode == "human":
                print(f'{self.agent_selection} won tuzdyq {reward}')
                
            #******* awarding a rewards for receiving tuzdyq **********
            # self.rewards[self.agent_selection] += 50
            # self.rewards[self.possible_agents[opp_player]] -= 50
        else:

            if num_qumalaq > 1:
                last_otau = self.direction[cur_player][(idx_action + num_qumalaq - 1) % 18]
            else:
                last_otau = self.direction[cur_player][(idx_action + num_qumalaq) % 18]

            if (last_otau in range(opp_player * 9, (opp_player + 1) * 9) and
                    self.otaular[last_otau] % 2 == 0):
                reward += self.otaular[last_otau]
                if self.render_mode == "human":
                    print(f'{self.agent_selection} won {reward}')
                self.otaular[last_otau] = 0
            if (self.tuzdyq[cur_player] >= 0 and
                    self.otaular[self.tuzdyq[cur_player]] > 0):
                reward += self.otaular[self.tuzdyq[cur_player]]
                if self.render_mode == "human":
                    print(f'{self.agent_selection} won tuzdyq {self.otaular[self.tuzdyq[cur_player]]}')
                self.otaular[self.tuzdyq[cur_player]] = 0
        if self.render_mode == "human":
            print(f'{self.agent_selection} won total {reward}')
        self.qazandar[cur_player] += reward

        #******* awarding a rewards from otaular **********
        self.rewards[self.agent_selection] += reward
        self.rewards[self.possible_agents[opp_player]] -= reward
        
        # check if there is a winner
        winner = self.check_for_winner()
        if winner:
            self.terminations = {i: True for i in self.agents}
            if self.render_mode == "human":
                print(f'{self.agent_selection} won the game!!!')
                
            #******* awarding a reward for winning a game **********
            self.rewards[self.agent_selection] += self.qazandar[opp_player]
            self.rewards[self.possible_agents[opp_player]] -= self.qazandar[opp_player]
            for i in range(9):
                self.rewards[self.agent_selection] += self.otaular[i + 9 * opp_player]
                self.rewards[self.possible_agents[opp_player]] -= self.otaular[i + 9 * opp_player]
            
        # selects the next agent.
        self.agent_selection = self._agent_selector.next()
        # Adds .rewards to ._cumulative_rewards
        self._accumulate_rewards()

        total_rewards = sum(self.rewards.values())
        assert total_rewards == 0, f"Error: Total reward is not zero: {total_rewards}"
        total_qumalaqs = 0
        for i in self.otaular:
            total_qumalaqs += i
        for i in self.qazandar:
            total_qumalaqs += i
        assert total_qumalaqs == 162, f"Error: Total qumalaqs is not equal to 162: {total_qumalaqs}"
        if self.render_mode == "human":
            self.render()

    def check_tuzdyq(self, agent, action, num_qumalaq):
        cur_player = self.possible_agents.index(agent)
        opp_player = (cur_player + 1) % 2
        idx = self.direction[cur_player].index(action)

        if num_qumalaq > 1:
            last_otau = self.direction[cur_player][(idx + num_qumalaq - 1) % 18]
        else:
            last_otau = self.direction[cur_player][(idx + num_qumalaq) % 18]

        if (last_otau in range(opp_player * 9, (opp_player + 1) * 9) and
                self.otaular[last_otau] == 3 and last_otau != 17 - cur_player * 9 and
                abs(last_otau - self.tuzdyq[opp_player]) != 9):
            self.tuzdyq[cur_player] = last_otau
            self.otaular[last_otau] = 0
            if self.render_mode == "human":
                print(f'{agent} got tuzdyq {last_otau}!')
            return True

        return False

    def check_atsyrau(self, agent):
        cur_player = self.possible_agents.index(agent)
        opp_player = (cur_player + 1) % 2

        for idx, i in enumerate(
                self.otaular[cur_player * 9: (cur_player + 1) * 9]):
            if i > 0 and idx + cur_player * 9 != self.tuzdyq[opp_player]:
                return False
        if self.render_mode == "human":
            print(f'{agent} reached atsyrau')
        return True

    def check_for_winner(self):
        cur_player = self.possible_agents.index(self.agent_selection)
        opp_player = (cur_player + 1) % 2
        if self.qazandar[cur_player] > 81:
            PLAYS[self.agent_selection] += 1
            return True
        if (self.check_atsyrau(self.possible_agents[opp_player])
                and self.qazandar[opp_player] <= 81):
            PLAYS[self.agent_selection] += 1
            return True
        return False


def _get_env(render_mode=None):
    """This function is needed to provide callables for DummyVectorEnv."""
    def env(render_mode=None):
        """
        The env function often wraps the environment in wrappers by default.
        You can find full documentation for these methods
        elsewhere in the developer documentation.
        """
        internal_render_mode = render_mode \
            if render_mode != "ansi" else "human"
        env = TogyzQumalaqEnv(render_mode=internal_render_mode)
        # This wrapper is only for environments
        # which print results to the terminal
        if render_mode == "ansi":
            env = wrappers.CaptureStdoutWrapper(env)
        # this wrapper helps error handling for discrete action spaces
        env = wrappers.AssertOutOfBoundsWrapper(env)
        # Provides a wide vareity of helpful user errors
        # Strongly recommended
        env = wrappers.OrderEnforcingWrapper(env)
        return env
    return PettingZooEnv(env(render_mode=render_mode))

## 🦏 Testing convert functions

In [4]:
sys.path.append('../model_based_approach')
from utils import *
sys.path.append('../model_based_approach/TogyzQumalaq')
from TogyzQumalaqLogic import Board
from TogyzQumalaqGame import TogyzQumalaqGame

## 🐠 Testing game functions

In [5]:
for _ in range(50):
    game = TogyzQumalaqGame(10, 162)
    board = Board()
    
    player = 1
    moves = {1: [], -1: []}
    print(f'MOVE #0')
    game.display_board(board.pieces)
    for i in range(350):
        val_moves = game.getValidMoves(board.pieces, player)
        
        move = np.random.choice(np.nonzero(val_moves)[0], 1)[0]
        moves[player].append(move)
        print(f'MOVE #{i+1}')
        print(f'{player} makes action #{move + 1}')
        pieces, player = game.getNextState(board.pieces, player, move)
        board.pieces = pieces
        env_loc = convertBoard2TogyzQumalaq(pieces, player)
        # print('env_loc.tuzdyq[0]:', env_loc.tuzdyq[0])
        # print('env_loc.tuzdyq[1]:', env_loc.tuzdyq[1])
        game.display_board(board.pieces)
        if game.getGameEnded(board.pieces, player) != 0:
            print(f'{game.getGameEnded(board.pieces, player)} won game')
            break

MOVE #0
000|0|------------------------------------------------------------------------------------------------------------------------------------------------------------------|0|000
009|1|XXXXXXXXX------------------------------------------------------------------------------------------------------------------------------------------------OOOOOOOOO|1|009
009|2|XXXXXXXXX------------------------------------------------------------------------------------------------------------------------------------------------OOOOOOOOO|2|009
009|3|XXXXXXXXX------------------------------------------------------------------------------------------------------------------------------------------------OOOOOOOOO|3|009
009|4|XXXXXXXXX------------------------------------------------------------------------------------------------------------------------------------------------OOOOOOOOO|4|009
009|5|XXXXXXXXX------------------------------------------------------------------------------------------------------

AssertionError: Error (from convertBoard2TogyzQumalaq): Total qumalaqs is not equal to 162 : 161 player: -1 qazandar (40, 16) [1, 14, 14, 2, 2, 1, 0, 3, 4, 16, 2, 2, 4, 3, 3, 3, 16, 15] board_pieces: [[ 1.  1.  1. ... -1. -1. -1.]
 [ 1.  0.  0. ... -1. -1. -1.]
 [ 1.  1.  1. ...  0. -1. -1.]
 ...
 [-1. -1.  0. ... -1. -1. -1.]
 [ 1.  1.  1. ... -1. -1. -1.]
 [ 1.  1.  1. ... -1. -1. -1.]]

In [None]:
sum([13, 2, 13, 5, 1, 2, 12, 12, 2, 1, 14, 0, 12, 3, 2, 14, 13, 13]) + 15 + 12