Build Environment

In [3]:
import numpy as np
from tabulate import tabulate  # for rendering board
from enum import Enum
from random import randint, choice
from copy import copy, deepcopy

In [4]:
# environment possible actions: swipe to left, right, up, down
class Action(Enum):
    def __str__(self):
        return self.name
    Left = 1
    Right = 2
    Up = 3
    Down = 4


class GameEnvironment:
    def __init__(self, board_size=3, target=64, initial_state=None):
        if initial_state == None:
            # start with empty board
            self.__initial_state = np.zeros([board_size, board_size] ,int)
        else:
            # copy to prevent aliassing
            self.__initial_state = copy(initial_state)

        # dynamic board size
        self.board_size = board_size
        self.target = target
        self.__state = self.__initial_state
        self.__possible_states = []
        # maybe to remove
        self.__calculate_possible_states(self.__initial_state, depth=0)

    # maybe to remove - iterate over all possible states
    def __calculate_possible_states(self, state:np.ndarray = None, action=None, depth = 5):
        tile_2_depth = copy(depth)
        tile_4_depth = copy(depth)
        if state is None:
            state = self.__initial_state
        
        if action == None:
            possible_actions = self.get_possible_actions()
        else:
            possible_actions = [action]
            
        # get all possible actions 
        for action in possible_actions:
        # calculate the outcome state
            outcome_state = self.__calculate_transition(state, action, new_tile=False)
        # append to the self.__possible_states 
            empty_tiles = self.get_empty_tiles(outcome_state)
            if len(empty_tiles) > 0:
                # generate new tile at random empty cell
                for tile in empty_tiles:
                    new_state = deepcopy(outcome_state)   
                    #if random generated tile is 2
                    new_state[tile[0]][tile[1]] = 2
                    temp_state = deepcopy(new_state)
                    self.__possible_states.append(temp_state)
                    if not self.is_done(temp_state) and tile_2_depth > 0:
                        self.__calculate_possible_states(deepcopy(temp_state),depth= tile_2_depth - 1)
                    #if random generated tile is 4
                    new_state[tile[0]][tile[1]] = 4
                    temp_state = deepcopy(new_state)
                    self.__possible_states.append(temp_state)
                    if not self.is_done(temp_state) and tile_4_depth > 0:
                        self.__calculate_possible_states(deepcopy(temp_state),depth = tile_4_depth - 1)
                    


    def reset(self):
        self.__state = self.__initial_state
        return self.__state

    # perform action on environment
    def __calculate_transition(self, state:np.ndarray, action:Action, new_tile:bool = True):
        if self.is_done(state):
            return state
        
        new_state = deepcopy(state)

        # 1. change the state to reflect the move by the agent,
        # 2. merge same value tiles

        # swipe to left
        if action == Action.Left:
            new_state = self.swipeToLeft(new_state)
            new_state= self.mergeToLeft(new_state)
        # swipe to right
        elif action == Action.Right:
            new_state = self.swipeToRight(new_state)
            new_state = self.mergeToRight(new_state)
        elif action == Action.Up:
            # take transpose, swipe, then re-take transpose
            temp_state = self.transpose(new_state)
            temp_state = self.swipeToLeft(temp_state)
            temp_state = self.mergeToLeft(temp_state)
            new_state = self.transpose(temp_state)
        elif action == Action.Down:
            # take transpose
            temp_state = self.transpose(new_state)
            temp_state = self.swipeToRight(temp_state)
            temp_state = self.mergeToRight(temp_state)
            new_state = self.transpose(temp_state)

        # 3. generate a new tile on empty cells
        if new_tile:
            empty_state = self.get_empty_tiles(new_state)

            if len(empty_state) > 0:
                # possible generated tile values
                possible_gen_tiles = [2, 4]
                # generate new tile at random empty cell
                row, col = choice(empty_state)
                new_state[row][col] = possible_gen_tiles[randint(0, 1)]


        return new_state

    def swipeToLeft(self, state):
        for i in range(self.board_size):
            for j in range(self.board_size - 1):
                # [0,2,2]: if current cell is empty, swap with the right one
                if state[i][j] == 0:

                    # k is the offset of the first found tile
                    for k in range(1, self.board_size - j):
                        if state[i][j + k] != 0:
                            self.swap(state, i, j, i, j + k)
                            break
        return state

    def mergeToLeft(self, state):
        # merge same tiles together
        for i in range(self.board_size):
            for j in range(self.board_size - 1):
                current_tile = state[i][j]
                if current_tile != 0:
                    right_tile = state[i][j + 1]
                    if right_tile == current_tile:
                        # merge same tiles together
                        state[i][j] = current_tile * 2
                        state[i][j + 1] = 0
                        # shift to the left other tiles
                        for k in range(j + 1, self.board_size - 1):
                            # current tile equal right tile
                            state[i][j + k] = state[i][j + k + 1]

                        # last cell is empty
                        state[i][self.board_size - 1] = 0

        return state

    def swipeToRight(self, state):
        for i in range(self.board_size):
            for j in reversed(range(1, self.board_size)):
                # [2,2,0]: if current cell is empty, swap with the left one
                if state[i][j] == 0:

                    # k is the offset of the first found tile
                    for k in range(1, j + 1):
                        if state[i][j - k] != 0:

                            self.swap(state, i, j, i, j - k)
                            break
        return state

    def mergeToRight(self, state):
        # merge same tiles together
        for i in range(self.board_size):
            for j in reversed(range(1, self.board_size)):
                current_tile = state[i][j]
                if current_tile != 0:
                    left_tile = state[i][j - 1]
                    if left_tile == current_tile:
                        # merge same tiles together
                        state[i][j] = current_tile * 2
                        state[i][j - 1] = 0
                        # shift to the right other tiles
                        for k in reversed(range(1, j - 1)):
                            # current tile equal right tile
                            state[i][j - k] = state[i][j - k - 1]
                        # first cell is empty
                        state[i][0] = 0

        return state

    def transpose(self, array):
        transposed_array = np.transpose(array)
        return transposed_array

    def swap(self, state, x1, y1, x2, y2):
        # x and y are the position of the board matrix
        z = state[x1][y1]
        state[x1][y1] = state[x2][y2]
        state[x2][y2] = z

    # unit step on environment
    def step(self, action):
        old_state = self.__state
        # state after agent action
        self.__state = self.__calculate_transition(action)
        observation = self.__state  # environment is fully observable
        done = self.is_done()
        reward = self.get_reward(self.__state)
        info = {}  # optional    debug info
        return observation, done, reward, info

    # render environment (board) on CLI
    def render(self,state:np.ndarray = None):
        if state is None:
            state = deepcopy(self.__state)
        print_state = []
        for item in state:
            print_state.append(['' if x==0 else x for x in item])
        print(tabulate(print_state, tablefmt="grid"))

    # =========================================================
    # public functions for agent to calculate optimal policy
    # =========================================================

    def get_possible_states(self):
        return self.__possible_states

    # get index of empty cells
    def get_empty_tiles(self, state=None):
        if state is None:
            state = self.__state
        empty_cells = []
        for i in range(self.board_size):
            for j in range(self.board_size):
                if state[i][j] == 0:
                    empty_cells.append([i, j])

        return empty_cells

    def get_possible_actions(self, old_state:np.ndarray = None):
        if old_state is None:
            old_state = copy(self.__initial_state)

        if self.is_done(old_state):
            return []        
        
        return [Action.Left, Action.Right, Action.Up, Action.Down]
        
        possible_actions = []
        
        # Check whether 'swipe left' is possible or not
        state = deepcopy(old_state)
        state = self.swipeToLeft(state)
        break_out_flag = False
        for i in range(self.board_size):
            for j in range(self.board_size - 1):
                current_tile = state[i][j]
                if current_tile != 0:
                    right_tile = state[i][j + 1]
                    if right_tile == current_tile:
                        # left swipe merge is possible
                        possible_actions.append(Action.Left)
                        
                        # exit from nested loop
                        break_out_flag = True
                        break
                        
            if break_out_flag:
                break
                
        # Check whether 'swipe right' is possible or not
        state = deepcopy(old_state)
        state = self.swipeToRight(state)
        break_out_flag = False
        for i in range(self.board_size):
            for j in reversed(range(1, self.board_size)):
                current_tile = state[i][j]
                if current_tile != 0:
                    left_tile = state[i][j - 1]
                    if left_tile == current_tile:
                        # right swipe merge is possible
                        possible_actions.append(Action.Right)
                        
                        # exit from nested loop
                        break_out_flag = True
                        break
                        
            if break_out_flag:
                break
 
        # Check whether 'swipe up' is possible or not
        state = deepcopy(old_state)
        state = self.transpose(state)
        state = self.swipeToLeft(state)
        break_out_flag = False
        for i in range(self.board_size):
            for j in range(self.board_size - 1):
                current_tile = state[i][j]
                if current_tile != 0:
                    right_tile = state[i][j + 1]
                    if right_tile == current_tile:
                        # left swipe merge is possible
                        possible_actions.append(Action.Up)
                        
                        # exit from nested loop
                        break_out_flag = True
                        break
                        
            if break_out_flag:
                break
        state = self.transpose(state)
 
        # Check whether 'swipe down' is possible or not
        state = deepcopy(old_state)
        state = self.transpose(state)
        state = self.swipeToRight(state)
        break_out_flag = False
        for i in range(self.board_size):
            for j in reversed(range(1, self.board_size)):
                current_tile = state[i][j]
                if current_tile != 0:
                    left_tile = state[i][j - 1]
                    if left_tile == current_tile:
                        # right swipe merge is possible
                        possible_actions.append(Action.Down)
                        
                        # exit from nested loop
                        break_out_flag = True
                        break
                        
            if break_out_flag:
                break
        state = self.transpose(state)
        
        return possible_actions        
        
    # determine wheter the game is over
    # either: when all cells are occupied and no more merging is possible,
    # or 2048 tile is generated
    def is_done(self, state:np.ndarray = None):
        if state is None:
            state = self.__state

        # detect if a tile has target value (e.g. 2048)
        for i in range(self.board_size):
            for j in range(self.board_size):
                if self.__state[i][j] == self.target:
                    self.__won = True
                    return True

        # check if all cells are occupied and no more merging is possible
        if 0 not in state:
            # no more merging is possible
            for i in range(self.board_size - 1):
                for j in range(self.board_size - 1):
                    if (state[i][j] == state[i + 1][j]) or (
                        state[i][j] == state[i][j + 1]
                    ):
                        return False
            # check bottom row
            for j in range(self.board_size - 1):
                if state[self.board_size - 1][j] == state[self.board_size - 1][j + 1]:
                    return False

            # check rightmost column
            for i in range(self.board_size - 1):
                if state[i][self.board_size - 1] == state[i + 1][self.board_size - 1]:
                    return False

            return True

        return False


    # Reward R(s) for every possible state
    def get_reward(self, state):
        step_reward = 0.0
        # detect tile with target value (e.g. 2048 tile)
        for i in range(self.board_size):
            for j in range(self.board_size):
                if state[i][j] == self.target:
                    return 1
        # check if all cells are occupied and no more merging is possible
        if 0 not in state:
            # no more merging is possible
            for i in range(self.board_size - 1):
                for j in range(self.board_size - 1):
                    if (state[i][j] == state[i + 1][j]) or (
                        state[i][j] == state[i][j + 1]
                    ):
                        return step_reward
            # check bottom row
            for j in range(self.board_size - 1):
                if state[self.board_size - 1][j] == state[self.board_size - 1][j + 1]:
                    return step_reward
            # check rightmost column
            for i in range(self.board_size - 1):
                if state[i][self.board_size - 1] == state[i + 1][self.board_size - 1]:
                    return step_reward
            return step_reward
        # game is done
        return -1

    def get_transition_prob(self, action, new_state, old_state=None):
        if old_state is None:
            old_state = self.__state

        # if the game is over, no transition can take place
        if self.is_done(old_state):
            return 0.0

        # perform action on old_state
        state_after_action = self.__calculate_transition(deepcopy(old_state), action)
        # calculate possible states
        self.__calculate_possible_states(deepcopy(old_state), action, depth=0)
        possible_states_after_action = self.get_possible_states()
        print(len(possible_states_after_action))
        # check if game is done
        # if self.is_done(state_after_action) and state_after_action == new_state:
        #     # game is done and is won
        #     if self.__won:
        #         return 1.0
        #     else:
        #         return 0.0
        
        # transition probabilities
        prob = 0
        # if possible_states_after_action is not None:
        #     if new_state not in possible_states_after_action:
        #         return 0.0
        #     prob = self.count(possible_states_after_action, new_state) / (len(possible_states_after_action))
        #     # print(f"probability: {prob}")
        # else:
        #     print(f"None list. {action}\n {state_after_action}\n{possible_states_after_action}")
            # self.render(state_after_action)
        
        
        return prob
    def count(self, list, value):
        count = 0
        for x in list:
            if np.array_equal(x, value):
                count += 1
        return count    

In [5]:
mdp = GameEnvironment(3, 64, [[0, 2, 2], [0, 0, 0], [0, 0, 2]])
new_state = [[0, 2, 4], [0, 0, 0], [0, 0, 2]]
# mdp.render()
# for state in mdp.get_possible_states():
#     mdp.render(state)
for action in mdp.get_possible_actions():
    print(mdp.get_transition_prob(action, new_state))


KeyboardInterrupt: 

In [None]:
# example of creation of an environment in the default state
mdp = GameEnvironment(3, 64, [[0, 2, 2], [0, 0, 0], [0, 0, 0]])
mdp.reset()
mdp.render()

i = 1
while not mdp.is_done():
    action = randint(1,4) # random choice
    state, done, reward, info = mdp.step(Action(action))
    print(f"step {i}) Action taken: {Action(action)}, is done: {done}")
    mdp.render()
    i=i+1

print('state =', state, ', reward =', reward, ', done =', done)
mdp.render()
print('possible (internal) game states:')
mdp.get_possible_states()