In [None]:
# Stable Baselines only supports tensorflow 1.x for now
%tensorflow_version 1.x
# Both packages are pinned so that a fresh runtime reproduces the same environment.
!pip install stable-baselines[mpi]==2.10.0
!pip install pygame==2.0.1

TensorFlow 1.x selected.
Collecting stable-baselines[mpi]==2.10.0
  Downloading stable_baselines-2.10.0-py3-none-any.whl (248 kB)
[K     |████████████████████████████████| 248 kB 6.7 MB/s 
Installing collected packages: stable-baselines
  Attempting uninstall: stable-baselines
    Found existing installation: stable-baselines 2.2.1
    Uninstalling stable-baselines-2.2.1:
      Successfully uninstalled stable-baselines-2.2.1
Successfully installed stable-baselines-2.10.0
Collecting pygame
  Downloading pygame-2.0.1-cp37-cp37m-manylinux1_x86_64.whl (11.8 MB)
[K     |████████████████████████████████| 11.8 MB 5.8 MB/s 
[?25hInstalling collected packages: pygame
Successfully installed pygame-2.0.1


In [None]:
import numpy as np
import sys
import math
import random

import gym
from gym import spaces
# from gym.envs.classic_control import rendering

import pygame
from pygame.locals import *

import matplotlib.pyplot as plt

pygame 2.0.1 (SDL 2.0.14, Python 3.7.11)
Hello from the pygame community. https://www.pygame.org/contribute.html


# OOP Code

## Piece

In [None]:
class Piece:
    """A single board cell; a value of 0 means the cell is empty."""

    def __init__(self, *args):
        """Create a piece.

        :param args: nothing (empty piece, value 0) or exactly one positional
            argument giving the initial value.
        :raises TypeError: if more than one argument is passed. Previously this
            silently produced an object without a ``value`` attribute, which
            only failed later on the first read.
        """
        if len(args) > 1:
            raise TypeError(
                'Piece() takes at most 1 argument ({} given)'.format(len(args)))
        self.value = args[0] if args else 0

    def set_value(self, value):
        """Overwrite the stored value."""
        self.value = value

    def get_value(self):
        """Return the stored value (0 for an empty cell)."""
        return self.value

    def value_upgrade(self):
        """Double the stored value (used when two equal pieces merge)."""
        self.value *= 2

    def value_remove(self):
        """Reset the piece to the empty value 0."""
        self.value = 0


## Board

In [None]:
class Board:
    """Rectangular grid of ``Piece`` objects addressed as ``board[row][col]``."""

    def __init__(self, rows, cols):
        """Build a ``rows`` x ``cols`` grid where every cell is an empty piece."""
        self.cols = cols
        self.rows = rows

        # Each cell gets its own Piece instance so cells can be mutated independently.
        self.board = [[Piece() for _ in range(self.cols)]
                      for _ in range(self.rows)]

    def get_board_values(self):
        """Return the grid as a list of rows of plain values."""
        return [[self.board[r][c].get_value() for c in range(self.cols)]
                for r in range(self.rows)]

    def get_piece(self, row, col):
        """Return the piece object stored at (row, col)."""
        return self.board[row][col]

    def put_piece(self, row, col, piece):
        """Replace the piece object stored at (row, col)."""
        self.board[row][col] = piece

    def put_piece_value(self, row, col, value):
        """Set the value of the piece already stored at (row, col)."""
        self.get_piece(row, col).set_value(value)

    def remove_piece(self, row, col):
        """Empty the cell at (row, col) by resetting its piece to 0."""
        self.get_piece(row, col).value_remove()

    def get_empty_pos(self):
        """Return two parallel lists (rows, cols) of the empty cells, in row-major order."""
        free_cells = [(r, c)
                      for r in range(self.rows)
                      for c in range(self.cols)
                      if self.board[r][c].get_value() == 0]
        return [r for r, _ in free_cells], [c for _, c in free_cells]

    def move_piece(self, old_r, old_c, new_r, new_c):
        """Move the piece at (old_r, old_c) into the empty cell (new_r, new_c).

        :raises ValueError: if the source cell is empty or the target is occupied.
        """
        source = self.get_piece(old_r, old_c)
        target = self.get_piece(new_r, new_c)

        if source.get_value() == 0:
            raise ValueError('Old position ({}, {}) empty!'.format(old_r, old_c))
        if target.get_value() != 0:
            raise ValueError('New position ({}, {}) already ocupied!'.format(new_r, new_c))

        # The piece object itself travels; the vacated cell gets a fresh empty piece.
        self.put_piece(new_r, new_c, source)
        self.put_piece(old_r, old_c, Piece())

    def merge_pieces(self, row1, col1, row2, col2):
        """Double the piece at (row1, col1), empty (row2, col2), return the new value."""
        survivor = self.board[row1][col1]
        survivor.value_upgrade()
        self.remove_piece(row2, col2)
        return self.board[row1][col1].get_value()


## BoardController

In [None]:
class BoardController:
    """Game rules on top of ``Board``: spawning tiles, sliding/merging and end checks.

    Merge rule used by every direction: at most one merge happens per line
    (row for left/right, column for up/down) in a single move.
    """

    def __init__(self, rows, cols):
        """Create a controller owning an empty ``rows`` x ``cols`` board."""
        self.rows = rows
        self.cols = cols
        self.board = Board(self.rows, self.cols)

    def get_rowcols(self):
        """Return ``(rows, cols)``."""
        return self.rows, self.cols

    def set_board(self, new_board):
        """Overwrite the board with the values of a 2-D iterable of rows."""
        for ind_r, row in enumerate(new_board):
            for ind_c, value in enumerate(row):
                self.board.put_piece(ind_r, ind_c, Piece(value))

    def get_board_values(self):
        """Return the board as a list of rows of values."""
        return self.board.get_board_values()

    def get_board(self):
        """Return the underlying ``Board`` object."""
        return self.board

    def get_score(self):
        """Return the sum of every value currently on the board."""
        return sum(value
                   for row in self.board.get_board_values()
                   for value in row)

    def get_max_value(self):
        """Return the largest value on the board (0 for an empty board)."""
        # The leading 0 reproduces the floor of the original running maximum.
        return max([0] + [value
                          for row in self.board.get_board_values()
                          for value in row])

    def appear_piece(self):
        """Spawn a new piece (2 with 90% probability, otherwise 4) on a random empty cell.

        :return: ``(row, col)`` of the new piece.
        :raises ValueError: if the board has no empty cell.
        """
        empty_r, empty_c = self.board.get_empty_pos()
        if not empty_r:
            # random.randint(0, -1) would raise ValueError too, but with an opaque message.
            raise ValueError('No empty position left to place a new piece')
        random_ind = random.randint(0, len(empty_r)-1)

        # 9 out of the 10 equally likely draws give a 2, the remaining one a 4.
        random_value = 2 if random.randint(1, 10) <= 9 else 4

        row, col = empty_r[random_ind], empty_c[random_ind]
        self.board.put_piece(row, col, Piece(random_value))
        return row, col

    def reestart_board(self):
        """Replace every cell with a fresh empty piece."""
        for r in range(self.rows):
            for c in range(self.cols):
                self.board.put_piece(r, c, Piece())

    def check_nomoves(self):
        """Return True when the board is full and no two adjacent cells are equal."""
        for r in range(self.rows):
            for c in range(self.cols):
                value = self.board.get_piece(r, c).get_value()
                if value == 0:
                    return False
                # Comparing with the upper and left neighbours covers every adjacent pair once.
                if r != 0 and self.board.get_piece(r-1, c).get_value() == value:
                    return False
                if c != 0 and self.board.get_piece(r, c-1).get_value() == value:
                    return False
        return True

    def check_winner(self):
        """Return True when some cell holds exactly 2048."""
        return any(value == 2048
                   for row in self.get_board_values()
                   for value in row)

    def _slide_lines(self, lines):
        """Slide and merge the pieces of each line towards the line's first cell.

        :param lines: list of lines; each line is a list of ``(row, col)``
            coordinates ordered from the edge the pieces move towards to the
            opposite edge.
        :return: ``(movements, merged_values)`` - the number of single-cell
            slides plus merges performed, and the sum of the merged values.
            ``(0, 0)`` is returned if the board rejects a move with
            ``ValueError`` (slides done before the error are not undone).
        """
        movements = 0
        merged_values = 0
        try:
            for line in lines:
                merged = False  # only one merge is allowed per line and move
                for start in range(1, len(line)):  # the edge cell itself never moves
                    if self.board.get_piece(*line[start]).get_value() == 0:
                        continue

                    # Slide the piece through the empty cells in front of it.
                    pos = start
                    while pos > 0 and self.board.get_piece(*line[pos-1]).get_value() == 0:
                        self.board.move_piece(*line[pos], *line[pos-1])
                        movements += 1
                        pos -= 1

                    # Merge with the neighbour it bumped into, if allowed.
                    if not merged and pos != 0:
                        if self.can_be_merged(*line[pos-1], *line[pos]):
                            merged_values += self.board.merge_pieces(*line[pos-1], *line[pos])
                            merged = True
                            movements += 1

            return movements, merged_values

        except ValueError as ve:
            print(str(ve))
            # Same shape as the success path so callers can always unpack two values.
            return 0, 0

    def move_left(self):
        """Slide every row to the left; return ``(movements, merged_values)``."""
        return self._slide_lines(
            [[(r, c) for c in range(self.cols)] for r in range(self.rows)])

    def move_right(self):
        """Slide every row to the right; return ``(movements, merged_values)``."""
        return self._slide_lines(
            [[(r, c) for c in range(self.cols-1, -1, -1)] for r in range(self.rows)])

    def move_up(self):
        """Slide every column upwards; return ``(movements, merged_values)``."""
        return self._slide_lines(
            [[(r, c) for r in range(self.rows)] for c in range(self.cols)])

    def move_down(self):
        """Slide every column downwards; return ``(movements, merged_values)``."""
        return self._slide_lines(
            [[(r, c) for r in range(self.rows-1, -1, -1)] for c in range(self.cols)])

    def can_be_merged(self, row1, col1, row2, col2):
        """Return True when the two cells hold the same value."""
        val1 = self.board.get_piece(row1, col1).get_value()
        val2 = self.board.get_piece(row2, col2).get_value()
        return bool(val1 == val2)


## UIBoard

In [None]:
class UIBoard:
    """Pygame renderer for the 2048 board owned by a ``BoardController``.

    The board is drawn in the central 14/16 of the window; cells are separated
    by gaps of 1/32 of the board size, with one more gap than there are cells
    in each direction.
    """

    def __init__(self, bcontrol, area, pygame, screen):
        """
        :param bcontrol: controller providing ``get_rowcols``, ``get_board_values``,
            ``get_score`` and ``get_board``.
        :param area: ``(width, height)`` of the window in pixels.
        :param pygame: object used for every drawing call (stored as ``self.game``);
            callers in this notebook pass the ``pygame`` module itself.
        :param screen: surface everything is drawn on.
        """
        self.b_controller = bcontrol
        self.rows, self.cols = bcontrol.get_rowcols()
        self.area = area
        self.width, self.height = area
        self.small_width = self.width * 14/16
        self.small_height = self.height * 14/16

        self.time = '0:00'
        self.moves = 0

        self.game = pygame
        self.screen = screen
        self.font_numbers = self.game.font.Font(None, 60)
        self.font_text = self.game.font.Font(None, 60)
        self.font_info = self.game.font.Font(None, 24)
        self.font_score = self.game.font.Font(None, 24)

        self.GRAY = (185, 173, 160)
        self.GRAY2 = (238, 228, 218)
        self.GRAY3 = (205, 196, 179)
        self.BLACK = (0, 0, 0)
        self.WHITE = (255, 255, 255)
        self.ORANGE = (242, 177, 121)
        self.ORANGE2 = (245, 149, 99)
        self.RED = (245, 124, 95)
        self.RED2 = (247, 94, 60)
        self.YELLOW = (236, 206, 115)
        self.YELLOW2 = (236, 200, 80)
        self.YELLOW3 = (237, 204, 97)
        self.YELLOW4 = (237, 197, 63)
        self.YELLOW5 = (238, 194, 46)
        self.BLUE = (0, 0, 255)
        self.GREEN = (0, 128, 0)

        # Background colour of a cell, keyed by the value it holds (0 = empty cell).
        self.values2colorsPiece = {
            0: self.GRAY3,      2: self.GRAY2,      4: self.GRAY2,
            8: self.ORANGE,     16: self.ORANGE2,   32: self.RED,
            64: self.RED2,      128: self.YELLOW,   256: self.YELLOW2,
            512: self.YELLOW3,  1024: self.YELLOW4, 2048: self.YELLOW5}
        # Colour of the number printed on a cell, keyed by the value it holds.
        self.values2colorsNumber = {
            0: self.WHITE,      2: self.BLACK,      4: self.BLACK,
            8: self.WHITE,      16: self.WHITE,     32: self.WHITE,
            64: self.WHITE,     128: self.WHITE,    256: self.WHITE,
            512: self.WHITE,    1024: self.WHITE,   2048: self.WHITE}

    def set_time_moves(self, t, m):
        """Store the elapsed-time text and the move counter shown by ``print_base_screen``."""
        self.time = t
        self.moves = m

    def print_board_values(self):
        """Print the board values to stdout as a matrix."""
        b_values = self.b_controller.get_board_values()
        print(np.matrix(b_values))

    def _layout(self):
        """Return ``(margin_x, margin_y, gap_x, gap_y, cell_w, cell_h)`` in pixels.

        There are ``cols + 1`` horizontal and ``rows + 1`` vertical gaps, so the
        cells fill the board symmetrically for any board size (for a 4x4 board
        this is the 5 gaps the layout was originally written for).
        """
        margin_x = self.width*1/16
        margin_y = self.height*1/16
        gap_x = 1/32*self.small_width
        gap_y = 1/32*self.small_height
        cell_w = self.small_width*(32-(self.cols+1))/32 / self.cols
        cell_h = self.small_height*(32-(self.rows+1))/32 / self.rows
        return margin_x, margin_y, gap_x, gap_y, cell_w, cell_h

    def _cell_rect(self, row, col):
        """Return ``(x, y, width, height)`` of the cell at (row, col)."""
        margin_x, margin_y, gap_x, gap_y, cell_w, cell_h = self._layout()
        x_pos = margin_x + (col+1) * gap_x + col * cell_w
        y_pos = margin_y + (row+1) * gap_y + row * cell_h
        return x_pos, y_pos, cell_w, cell_h

    def _piece_color(self, value):
        """Background colour for ``value``; values outside the palette reuse the 2048 colour."""
        return self.values2colorsPiece.get(value, self.YELLOW5)

    def print_base_screen(self):
        """Redraw the whole window: board, score, elapsed time and move counter."""
        board = self.b_controller.get_board_values()
        white_borderx, white_bordery = self._layout()[:2]

        self.screen.fill(self.WHITE)
        self.game.draw.rect(self.screen, self.GRAY, (white_borderx, white_bordery,
                                                    self.small_width, self.small_height))

        for ind_r, row in enumerate(board):
            for ind_c, value in enumerate(row):
                x_pos, y_pos, rectanglex, rectangley = self._cell_rect(ind_r, ind_c)

                self.game.draw.rect(self.screen, self._piece_color(value),
                                    (x_pos, y_pos, rectanglex, rectangley), border_radius=3)

                if value != 0:
                    number_color = self.values2colorsNumber.get(value, self.WHITE)
                    text_o = self.font_numbers.render(str(value), 0, number_color)

                    textRect = text_o.get_rect()
                    textRect.center = (x_pos + rectanglex/2, y_pos + rectangley/2)
                    self.screen.blit(text_o, textRect)

        # score (top-left margin)
        score = self.b_controller.get_score()
        text_score = self.font_score.render('Score: {}'.format(score), 0, self.BLACK)
        textRect = text_score.get_rect()
        textRect.x = white_borderx
        textRect.centery = white_bordery/2
        self.screen.blit(text_score, textRect)

        # time (bottom-right margin)
        text_score = self.font_score.render(self.time, 0, self.GRAY)
        textRect = text_score.get_rect()
        textRect.left = self.width - white_borderx*2
        textRect.centery = self.height - white_bordery/2
        self.screen.blit(text_score, textRect)

        # moves (bottom-left margin)
        text_score = self.font_score.render("{} moves".format(self.moves), 0, self.GRAY)
        textRect = text_score.get_rect()
        textRect.x = white_borderx
        textRect.centery = self.height - white_bordery/2
        self.screen.blit(text_score, textRect)

        self.game.display.update()

    def new_piece(self, row, col):
        """Redraw the screen and animate the piece at (row, col) growing from its centre."""
        self.print_base_screen()

        value_p = self.b_controller.get_board().get_piece(row, col).get_value()
        x_pos, y_pos, rectanglex, rectangley = self._cell_rect(row, col)

        # Start from an empty-looking cell so the growth is visible.
        rect_full = self.game.Rect(x_pos, y_pos, rectanglex, rectangley)
        self.game.draw.rect(self.screen, self.values2colorsPiece[0], rect_full, border_radius=3)

        divisions = 20
        ms2wait = 5
        for i in range(divisions):
            rect_grow = self.game.Rect(x_pos, y_pos, rectanglex*i/divisions, rectangley*i/divisions)
            rect_grow.center = rect_full.center

            self.game.draw.rect(self.screen, self._piece_color(value_p), rect_grow, border_radius=3)
            self.game.display.update()

            self.game.time.wait(ms2wait)

    def print_ending(self, text, color_text):
        """Redraw the screen and overlay a centred black box containing ``text``."""
        self.print_base_screen()

        # The white rectangle is 1 px larger on each side, giving the black box a thin frame.
        self.game.draw.rect(self.screen, self.WHITE,
                            (self.width / 4 - 1, self.height * 3 / 8 - 1, self.width / 2 + 2, self.height / 4 + 2))
        self.game.draw.rect(self.screen, self.BLACK,
                            (self.width / 4, self.height * 3 / 8, self.width / 2, self.height / 4))

        text_o = self.textOutline(self.font_text, text, color_text, self.WHITE)

        textRect = text_o.get_rect()
        textRect.center = (self.width / 2, self.height / 2)
        self.screen.blit(text_o, textRect)
        self.game.display.update()

    def print_win(self):
        """Show the end-of-game overlay for a win."""
        text = 'YOU WIN !!'
        self.print_ending(text, self.GREEN)

    def print_loss(self):
        """Show the end-of-game overlay for a loss."""
        text = 'YOU LOSE ...'
        self.print_ending(text, self.BLUE)

    def textOutline(self, font, message, fontcolor, outlinecolor):
        """Return a surface with ``message`` in ``fontcolor`` and an ``outlinecolor`` outline."""
        base = font.render(message, 0, fontcolor)
        outline = self.textHollow(font, message, outlinecolor)
        img = self.game.Surface(outline.get_size(), 16)
        # The outline surface is 2 px larger, so the text is offset by 1 px to sit inside it.
        img.blit(base, (1, 1))
        img.blit(outline, (0, 0))
        img.set_colorkey(0)
        return img

    def textHollow(self, font, message, fontcolor):
        """Return a surface holding only the outline of ``message`` in ``fontcolor``."""
        # Per-channel inverse of the font colour, used as background / transparent key.
        notcolor = [c ^ 0xFF for c in fontcolor]
        base = font.render(message, 0, fontcolor, notcolor)
        size = base.get_width() + 2, base.get_height() + 2
        img = self.game.Surface(size, 16)
        img.fill(notcolor)
        base.set_colorkey(0)
        # Stamp the text at the four diagonal offsets to build the outline ...
        img.blit(base, (0, 0))
        img.blit(base, (2, 0))
        img.blit(base, (0, 2))
        img.blit(base, (2, 2))
        base.set_colorkey(0)
        # ... then repaint the glyphs in the background colour and stamp the centre,
        # which hollows the text out.
        # NOTE(review): set_palette_at presumably requires the rendered text to be a
        # palettized surface - confirm against the pygame version in use.
        base.set_palette_at(1, notcolor)
        img.blit(base, (1, 1))
        img.set_colorkey(notcolor)
        return img


## GameController

In [None]:


class GameController:
    """Interactive game loop wiring the board rules, the pygame UI and the player."""

    def __init__(self, rows, cols):
        """Initialise pygame, open a 600x600 window and build controller, UI and player."""
        pygame.init()

        self.rows = rows
        self.columns = cols
        self.area = (600, 600)

        self.screen = pygame.display.set_mode(self.area)
        pygame.display.set_caption('2048')
        self.clock = pygame.time.Clock()
        self.clock.tick(30)  # a single tick capped at 30 FPS; the main loop does not tick again
        self.cooldown = 1000  # ms to keep the end-of-game overlay before restarting

        self.b_controller = BoardController(self.rows, self.columns)
        self.ui = UIBoard(self.b_controller, self.area, pygame, self.screen)
        self.player = Player(self.b_controller, pygame, self.ui)

    def play(self):
        """Run the game until the player quits.

        After a win or a loss the overlay stays for ``self.cooldown`` ms and a
        new game starts, with the clock and the move counter reset.
        """
        running = True
        ended = False

        self.reestart_game()

        moves = 0
        start = pygame.time.get_ticks()
        start_wait = start  # overwritten when a game ends, before it is read
        while running:
            if ended:
                # NOTE(review): presumably returns False when the player closes the
                # window - confirm against Player.wait_to_reestart.
                moving = self.player.wait_to_reestart()
                if moving == False:
                    running = False

                now = pygame.time.get_ticks()
                if now - start_wait >= self.cooldown:
                    ended = False
                    self.reestart_game()
                    pygame.event.clear()
                    # Restart the clock and the counter for the new game; resetting
                    # ``start`` to 0 showed the time elapsed since pygame.init().
                    start = pygame.time.get_ticks()
                    moves = 0

            else:
                now = pygame.time.get_ticks()
                time_playing = self.ticks2time(now - start)
                self.ui.set_time_moves(time_playing, moves)
                self.ui.print_base_screen()
                # True: the board changed; False: stop playing; anything else: keep waiting.
                moving = self.player.move()
                if moving == True:
                    print('next move')
                    moves += 1
                    self.ui.print_board_values()
                    r, c = self.b_controller.appear_piece()
                    self.ui.new_piece(r, c)
                    if self.b_controller.check_winner():
                        print('WINNER!')
                        self.ui.print_win()
                        start_wait = pygame.time.get_ticks()
                        ended = True
                    elif self.b_controller.check_nomoves():
                        print('NO MORE MOVES')
                        self.ui.print_loss()
                        start_wait = pygame.time.get_ticks()
                        ended = True

                elif moving == False:
                    running = False

    def ticks2time(self, ticks):
        """Format milliseconds as ``M:SS``, or ``H:MM:SS`` once an hour has passed."""
        seconds = int((ticks/1000) % 60)
        minutes = int((ticks/(1000*60)) % 60)
        hours = int((ticks/(1000*60*60)) % 24)

        if hours > 0:
            # Minutes are zero-padded too, otherwise 1h 5m 3s reads "1:5:03".
            return "{}:{:02d}:{:02d}".format(hours, minutes, seconds)
        else:
            return "{}:{:02d}".format(minutes, seconds)

    def reestart_game(self):
        """Clear the board, spawn two pieces, then load the fixed start layout below."""
        self.b_controller.reestart_board()
        self.ui.print_board_values()

        for _ in range(2):
            r, c = self.b_controller.appear_piece()
            self.ui.new_piece(r, c)

        # NOTE(review): this overwrites the two random pieces spawned above with a
        # fixed, almost-won layout; it looks like a debugging fixture - confirm
        # whether it should stay.
        initial_board = np.array([[1024, 1024, 512, 4],
                                  [4, 0, 2, 4],
                                  [2, 4, 2, 4],
                                  [4, 2, 4, 2]])
        self.b_controller.set_board(initial_board)
        self.ui.print_board_values()


# CustomEnv

In [None]:
class Game2048Env(gym.Env):
    """
    Custom Environment to play 2048.

    Observation: the board flattened row by row into a float32 vector of
    length ``rows * cols``. Actions: ``LEFT``, ``RIGHT``, ``UP``, ``DOWN``.
    An episode ends on a win, when no move is left, after 1000 steps, or as
    soon as the agent picks a move that changes nothing on the board.
    """
    metadata = {'render.modes': ['human', 'terminal']}
    # Define constants for clearer code
    LEFT = 0
    RIGHT = 1
    UP = 2
    DOWN = 3

    def __init__(self, rows, cols, reward, showing=False):
        """
        :param rows: number of board rows.
        :param cols: number of board columns.
        :param reward: reward scheme, 1 to 4 (see ``step``).
        :param showing: when True a pygame window is opened so that
            ``render('human')`` / ``print()`` can draw the board.
        """
        super(Game2048Env, self).__init__()
        pygame.init()

        self.rows = rows
        self.cols = cols
        # Size of the 1D game grid
        self.grid_size = rows*cols
        self.moves = 0
        self.time = '0:00'
        # Defined here as well so that step() before reset() does not fail.
        self.start = pygame.time.get_ticks()

        self.b_controller = BoardController(self.rows, self.cols)

        # PRINTING PARAMETERS ------------------------------------------------------
        self.area = (600, 600)

        self.clock = pygame.time.Clock()
        self.clock.tick(30)  # a single tick capped at 30 FPS

        if showing:
            self.screen = pygame.display.set_mode(self.area)
            pygame.display.set_caption('2048')

            self.ui = UIBoard(self.b_controller, self.area, pygame, self.screen)
            self.ui.set_time_moves(self.time, self.moves)
        else:
            # No window: graphical rendering is rejected explicitly in _require_ui().
            self.ui = None
        # --------------------------------------------------------------------------

        # Define action and observation space
        n_actions = 4
        self.action_space = spaces.Discrete(n_actions)
        # One entry per cell, holding the value of that cell (0 = empty).
        self.observation_space = spaces.Box(low=0, high=2048,
                                            shape=(self.rows*self.cols, ), dtype=np.float32)

        self.viewer = None
        self.list_moves = []
        self.list_states = []
        self.type_reward = reward

    def _observation(self):
        """Return the board flattened row by row, in the dtype declared by the space."""
        obs = np.array(self.b_controller.get_board_values(), dtype=np.float32)
        return obs.flatten(order='C')

    def _require_ui(self):
        """Return the UI, or raise if the environment was created without a window."""
        if self.ui is None:
            raise RuntimeError(
                "Graphical rendering needs Game2048Env(..., showing=True)")
        return self.ui

    def reset(self):
        """
        Start a new episode: empty board with two random pieces.

        Important: the observation must be a numpy array
        :return: (np.array)
        """
        self.b_controller.reestart_board()

        self.list_moves = []
        self.list_states = []

        self.moves = 0
        self.time = '0:00'
        self.start = pygame.time.get_ticks()

        for _ in range(2):
            self.b_controller.appear_piece()

        return self._observation()

    def step(self, action):
        """
        Apply one move and spawn a new piece if the board changed.

        Reward schemes (``reward`` constructor argument):

        * 1 - board sum minus the number of steps; doubled on a win.
        * 2 - 0.1 * board sum + max value; -100 for a move that changes
          nothing; 5000 on a win.
        * 3 - sum of the values merged by this move.
        * 4 - like 3 plus 1 for a valid move; -100 for a move that changes
          nothing.

        :param action: one of ``LEFT``, ``RIGHT``, ``UP``, ``DOWN``.
        :return: ``(observation, reward, done, info)``.
        :raises ValueError: for an action outside the action space or an
            unknown reward scheme.
        """
        MAX_MOVES = 1000
        if action == self.LEFT:
            movements, merged = self.b_controller.move_left()
            self.list_moves.append('left')
        elif action == self.RIGHT:
            movements, merged = self.b_controller.move_right()
            self.list_moves.append('right')
        elif action == self.UP:
            movements, merged = self.b_controller.move_up()
            self.list_moves.append('up')
        elif action == self.DOWN:
            movements, merged = self.b_controller.move_down()
            self.list_moves.append('down')
        else:
            raise ValueError("Received invalid action={} which is not part of the action space".format(action))

        # Board after the move, before the new random piece appears.
        self.list_states.append(self.b_controller.get_board_values())

        now = pygame.time.get_ticks()
        self.time = self.ticks2time(now - self.start)

        if movements != 0:
            self.b_controller.appear_piece()

        self.moves += 1

        info = {'finished': False, 'max_v': 0, 'list_moves': self.list_moves,
                'list_states': self.list_states, 'stopped': 'idk', 'step': self.moves}

        if self.type_reward == 1:
            reward = self.b_controller.get_score() - 1*self.moves
            if self.b_controller.check_winner():
                reward *= 2
        elif self.type_reward == 2:
            reward = 0.1*self.b_controller.get_score() + self.b_controller.get_max_value()
            if movements == 0:
                reward = -100
            if self.b_controller.check_winner():
                reward = 5000
        elif self.type_reward == 3:
            reward = merged
        elif self.type_reward == 4:
            reward = merged
            if movements == 0:
                reward = -100
            else:
                reward += 1
        else:
            # Previously this fell through and failed with UnboundLocalError below.
            raise ValueError("Unknown reward type={}; expected 1, 2, 3 or 4".format(self.type_reward))

        # Several conditions can hold at once; the last one checked names the reason.
        done = False
        if self.b_controller.check_winner():
            info['finished'] = True
            info['stopped'] = 'Winner'
            done = True
        if self.b_controller.check_nomoves():
            info['stopped'] = 'No moves'
            done = True
        if self.moves >= MAX_MOVES:
            info['stopped'] = 'Max moves'
            done = True
        if movements == 0:
            info['stopped'] = 'Ilegal move'
            done = True

        if done:
            info['max_v'] = self.b_controller.get_max_value()

        return self._observation(), reward, done, info

    def get_state(self):
        """Return the current pygame display surface as a uint8 image array."""
        state = np.fliplr(np.flip(np.rot90(pygame.surfarray.array3d(
            pygame.display.get_surface()).astype(np.uint8))))
        return state

    def set_specific_table(self, table):
        """Overwrite the board with ``table`` (2-D iterable of values)."""
        self.b_controller.set_board(table)

    def render(self, mode='None', specific_table=False):
        """
        Show the board.

        :param mode: ``'human'`` draws the pygame window (needs ``showing=True``),
            ``'terminal'`` prints the matrix, ``'None'`` does nothing.
        :param specific_table: unused; kept for interface compatibility.
        :raises NotImplementedError: for any other mode.
        """
        if mode == 'human':
            ui = self._require_ui()
            ui.set_time_moves(self.time, self.moves)
            ui.print_base_screen()
            pygame.time.wait(100)

        elif mode == 'terminal':
            b_values = self.b_controller.get_board_values()
            print(np.matrix(b_values))

        elif mode == 'None':
            pass

        else:
            raise NotImplementedError()

    def close(self):
        """Nothing to release."""
        pass

    def print(self):
        """Draw the board in the pygame window (needs ``showing=True``)."""
        self._require_ui().print_base_screen()
        pygame.time.wait(100)

    def ticks2time(self, ticks):
        """Format milliseconds as ``M:SS``, or ``H:MM:SS`` once an hour has passed."""
        seconds = int((ticks/1000) % 60)
        minutes = int((ticks/(1000*60)) % 60)
        hours = int((ticks/(1000*60*60)) % 24)

        if hours > 0:
            # Minutes are zero-padded too, otherwise 1h 5m 3s reads "1:5:03".
            return "{}:{:02d}:{:02d}".format(hours, minutes, seconds)
        else:
            return "{}:{:02d}".format(minutes, seconds)


# Evaluate

In [None]:
from tqdm import tqdm

In [None]:
def evaluate(model, num_episodes=100):
    """
    Run a RL agent for a number of episodes and report how it did.

    Prints the mean episode reward, the number of episodes and the largest
    ``max_v`` reported by the environment at the end of any episode.

    :param model: (BaseRLModel object) the RL Agent
    :param num_episodes: (int) number of episodes to evaluate it
    :return: (float) Mean episode reward over the ``num_episodes`` episodes
    """
    # Only valid for a single environment: step() returns arrays
    # (vectorized env) and a one-element ``done`` is used as the loop flag.
    vec_env = model.get_env()
    returns_per_episode = []
    max_value = 0
    for _ in tqdm(range(num_episodes)):
        episode_return = 0
        obs = vec_env.reset()
        done = False
        while not done:
            # the second value returned by predict is only useful with LSTM policies
            action, _states = model.predict(obs)
            obs, reward, done, info = vec_env.step(action)
            episode_return = episode_return + reward

        # ``info`` still holds the entry of the step that ended the episode.
        max_value = max(max_value, info[0]['max_v'])

        returns_per_episode.append(episode_return)

    mean_episode_reward = np.mean(returns_per_episode)
    print("\nMean reward:", mean_episode_reward, "Num episodes:", num_episodes, "Max value", max_value)

    return mean_episode_reward

## DQN

In [None]:
from stable_baselines import DQN

# 4x4 board without a window, scored with reward scheme 4.
env = Game2048Env(4, 4, reward=4)

# DQN variant switches, kept separate from the optimisation hyper-parameters below.
kwargs = dict(
    double_q=True,
    prioritized_replay=True,
    policy_kwargs={'dueling': False},
)

dqn_model = DQN(
    'MlpPolicy',
    env,
    verbose=1,
    gamma=0.95,
    learning_rate=0.01,
    batch_size=128,
    seed=42,
    **kwargs
)

In [None]:
# Baseline: score the DQN agent before learn() is called on it.
mean_reward_before_train = evaluate(dqn_model, num_episodes=50)

100%|██████████| 50/50 [00:00<00:00, 250.76it/s]


Mean reward: -80.16 Num episodes: 50





In [None]:
# Train the DQN agent for 20000 environment steps, logging every 50 episodes.
dqn_model.learn(total_timesteps=20000, log_interval=50)

--------------------------------------
| % time spent exploring  | 70       |
| episodes                | 50       |
| mean 100 episode reward | -48.2    |
| steps                   | 606      |
--------------------------------------
--------------------------------------
| % time spent exploring  | 41       |
| episodes                | 100      |
| mean 100 episode reward | -51.6    |
| steps                   | 1193     |
--------------------------------------
--------------------------------------
| % time spent exploring  | 2        |
| episodes                | 150      |
| mean 100 episode reward | -33.2    |
| steps                   | 2142     |
--------------------------------------
--------------------------------------
| % time spent exploring  | 2        |
| episodes                | 200      |
| mean 100 episode reward | -10.8    |
| steps                   | 3145     |
--------------------------------------
--------------------------------------
| % time spent exploring 

<stable_baselines.deepq.dqn.DQN at 0x7f324ad99b10>

In [None]:
# Score the trained DQN agent with the same number of episodes as the baseline.
mean_reward = evaluate(dqn_model, num_episodes=50)

100%|██████████| 50/50 [00:00<00:00, 112.21it/s]


Mean reward: -26.64 Num episodes: 50





In [None]:
# Show 1 example: play a single episode with the trained DQN agent,
# printing the action, the reward and the board after every step.
obs = env.reset()
done = False
step = 0
score = 0
while not done:
    action, _ = dqn_model.predict(obs, deterministic=True)
    obs, reward, done, info = env.step(action)
    step += 1
    print("Step {}: Action ({}) Score ({})".format(step, action, reward))
    env.render('terminal')
    score += reward
print('Final score: {}'.format(score))

Step 1: Action (0) Score (0)
[[0 2 0 0]
 [0 0 0 0]
 [2 0 0 0]
 [2 0 0 0]]
Step 2: Action (0) Score (0)
[[2 0 0 0]
 [0 0 2 0]
 [2 0 0 0]
 [2 0 0 0]]
Step 3: Action (0) Score (0)
[[2 0 2 0]
 [2 0 0 0]
 [2 0 0 0]
 [2 0 0 0]]
Step 4: Action (0) Score (4)
[[4 0 0 0]
 [2 0 0 2]
 [2 0 0 0]
 [2 0 0 0]]
Step 5: Action (2) Score (4)
[[4 0 0 2]
 [4 0 0 0]
 [2 0 0 0]
 [0 0 2 0]]
Step 6: Action (1) Score (0)
[[0 2 4 2]
 [0 0 0 4]
 [0 0 0 2]
 [0 0 0 2]]
Step 7: Action (0) Score (0)
[[2 4 2 0]
 [4 0 0 0]
 [2 0 0 2]
 [2 0 0 0]]
Step 8: Action (0) Score (4)
[[2 4 2 0]
 [4 0 0 0]
 [4 0 0 0]
 [2 0 0 2]]
Step 9: Action (0) Score (4)
[[2 4 2 4]
 [4 0 0 0]
 [4 0 0 0]
 [4 0 0 0]]
Step 10: Action (0) Score (-100)
[[2 4 2 4]
 [4 0 0 0]
 [4 0 0 0]
 [4 0 0 0]]
Final score: -84


## PPO2

In [None]:
from stable_baselines.common.policies import MlpPolicy
from stable_baselines import PPO2

# A single 4x4 environment scored with reward scheme 4.
env = Game2048Env(4, 4, reward=4)

ppo_model = PPO2(
    MlpPolicy,
    env,
    verbose=1,
    seed=42,
)

In [None]:
# Baseline: score the PPO2 agent before learn() is called on it.
mean_reward_before_train = evaluate(ppo_model, num_episodes=300)

100%|██████████| 300/300 [00:07<00:00, 39.20it/s]


Mean reward: -29.7 Num episodes: 300





In [None]:
# Train the PPO2 agent for 1000000 environment steps, logging every 500 updates.
ppo_model.learn(total_timesteps=1000000, log_interval=500)
# ppo_model.save("ppo2")  # uncomment to persist the trained agent

--------------------------------------
| approxkl           | 3.7079775e-05 |
| clipfrac           | 0.0           |
| explained_variance | -0.00176      |
| fps                | 218           |
| n_updates          | 1             |
| policy_entropy     | 1.386264      |
| policy_loss        | -0.002562003  |
| serial_timesteps   | 128           |
| time_elapsed       | 2.41e-05      |
| total_timesteps    | 128           |
| value_loss         | 1547.9459     |
--------------------------------------
--------------------------------------
| approxkl           | 8.8040135e-05 |
| clipfrac           | 0.0           |
| explained_variance | 0.128         |
| fps                | 544           |
| n_updates          | 500           |
| policy_entropy     | 0.58504826    |
| policy_loss        | 0.00037613523 |
| serial_timesteps   | 64000         |
| time_elapsed       | 119           |
| total_timesteps    | 64000         |
| value_loss         | 3359.0767     |
-------------------------

<stable_baselines.ppo2.ppo2.PPO2 at 0x7f634e0f5950>

In [None]:
# Score the trained PPO2 agent with the same number of episodes as the baseline.
mean_reward = evaluate(ppo_model, num_episodes=300)

100%|██████████| 300/300 [00:27<00:00, 11.03it/s]


Mean reward: 393.32666 Num episodes: 300 Max value 128





In [None]:
# Show 1 example: play a single episode with the trained PPO2 agent,
# printing the action, the reward and the board after every step.
obs = env.reset()
done = False
step = 0
score = 0
while not done:
    action, _ = ppo_model.predict(obs, deterministic=True)
    obs, reward, done, info = env.step(action)
    step += 1
    print("Step {}: Action ({}) Score ({})".format(step, action, reward))
    env.render('terminal')
    score += reward
print('Final score: {}'.format(score))

Step 1: Action (1) Score (1)
[[0 0 0 2]
 [0 0 0 4]
 [0 0 0 2]
 [0 0 0 0]]
Step 2: Action (0) Score (1)
[[2 0 0 0]
 [4 0 0 0]
 [2 0 2 0]
 [0 0 0 0]]
Step 3: Action (1) Score (5)
[[0 0 0 2]
 [0 0 0 4]
 [0 0 0 4]
 [0 2 0 0]]
Step 4: Action (0) Score (1)
[[2 0 0 0]
 [4 0 0 0]
 [4 0 2 0]
 [2 0 0 0]]
Step 5: Action (1) Score (1)
[[0 4 0 2]
 [0 0 0 4]
 [0 0 4 2]
 [0 0 0 2]]
Step 6: Action (3) Score (13)
[[0 0 0 0]
 [2 0 0 0]
 [0 0 0 2]
 [0 4 4 8]]
Step 7: Action (0) Score (9)
[[0 0 0 0]
 [2 0 2 0]
 [2 0 0 0]
 [8 8 0 0]]
Step 8: Action (2) Score (5)
[[4 8 2 0]
 [8 0 0 0]
 [0 0 0 0]
 [0 2 0 0]]
Step 9: Action (1) Score (1)
[[0 4 8 2]
 [0 0 0 8]
 [0 2 0 0]
 [0 0 0 2]]
Step 10: Action (3) Score (1)
[[0 0 0 0]
 [0 0 0 2]
 [0 4 2 8]
 [0 2 8 2]]
Step 11: Action (0) Score (1)
[[0 0 0 0]
 [2 0 0 2]
 [4 2 8 0]
 [2 8 2 0]]
Step 12: Action (2) Score (1)
[[2 2 8 2]
 [4 8 2 0]
 [2 0 0 0]
 [0 2 0 0]]
Step 13: Action (1) Score (5)
[[0 4 8 2]
 [0 4 8 2]
 [2 0 0 2]
 [0 0 0 2]]
Step 14: Action (3) Score (33)
[[

# Main

###

In [None]:
from stable_baselines.common.env_checker import check_env
# Instantiate the custom environment so it can be validated
env = Game2048Env(4, 4)
# If the environment doesn't follow the interface, an error will be thrown
check_env(env, warn=True)

In [None]:
import os

import gym
import numpy as np
import matplotlib.pyplot as plt

from stable_baselines import DQN, PPO2, A2C, ACKTR, DDPG
from stable_baselines.ddpg.policies import LnMlpPolicy
from stable_baselines import results_plotter
from stable_baselines.bench import Monitor
from stable_baselines.results_plotter import load_results, ts2xy
from stable_baselines.common.noise import AdaptiveParamNoiseSpec
from stable_baselines.common.callbacks import BaseCallback

from stable_baselines.common.evaluation import evaluate_policy
from stable_baselines.common.cmd_util import make_vec_env

class SaveOnBestTrainingRewardCallback(BaseCallback):
    """
    Callback for saving a model (the check is done every ``check_freq`` steps)
    based on the training reward (in practice, we recommend using ``EvalCallback``).

    :param check_freq: (int) A check is performed whenever the number of
      callback calls is a multiple of this value.
    :param log_dir: (str) Path to the folder where the model will be saved.
      It must contain the file created by the ``Monitor`` wrapper.
    :param verbose: (int) Progress messages are printed when greater than 0.
    :param reward_window: (int) Number of most recent logged episodes whose
      rewards are averaged at each check. Defaults to 100.
    :raises ValueError: if ``reward_window`` is smaller than 1.
    """
    def __init__(self, check_freq: int, log_dir: str, verbose=1, reward_window: int = 100):
        super(SaveOnBestTrainingRewardCallback, self).__init__(verbose)
        if reward_window < 1:
            # A window of 0 would silently turn `y[-0:]` into "all episodes"
            raise ValueError("reward_window must be >= 1, got {}".format(reward_window))
        self.check_freq = check_freq
        self.log_dir = log_dir
        self.save_path = os.path.join(log_dir, 'best_model')
        self.best_mean_reward = -np.inf
        self.reward_window = reward_window

    def _init_callback(self) -> None:
        """Create the ``save_path`` folder if needed."""
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self) -> bool:
        """
        Every ``check_freq`` calls, compare the recent mean training reward with
        the best one seen so far and save the model when it improved.

        :return: (bool) Always ``True``.
        """
        # Only inspect the logs on every ``check_freq``-th call
        if self.n_calls % self.check_freq != 0:
            return True

        # Retrieve training reward from the results logged in ``log_dir``
        x, y = ts2xy(load_results(self.log_dir), 'timesteps')
        if len(x) == 0:
            # Nothing has been logged yet
            return True

        # Mean training reward over the last ``reward_window`` episodes.
        # The window is an episode count and therefore independent of
        # ``check_freq``, which only controls how often the check runs.
        mean_reward = np.mean(y[-self.reward_window:])
        if self.verbose > 0:
            print("Num timesteps: {}".format(self.num_timesteps))
            print("Last mean reward per episode: {:.2f}".format( mean_reward))

        # New best model: remember the reward and save the agent
        if mean_reward > self.best_mean_reward:
            self.best_mean_reward = mean_reward
            if self.verbose > 0:
                print("Saving new best model to {}".format(self.save_path))
            self.model.save(self.save_path)

        return True

# Create log dir
log_dir = "tmp/"
os.makedirs(log_dir, exist_ok=True)

# Create the environment and wrap it in a Monitor that writes into log_dir
env = Game2048Env(4, 4)
env = Monitor(env, log_dir)

# NOTE(review): this noise spec is never passed to the model below, so it has
# no effect on training; it is kept only so the `param_noise` name stays defined.
param_noise = AdaptiveParamNoiseSpec(initial_stddev=0.1, desired_action_stddev=0.1)
# Train a DQN agent with the default MLP policy
model = DQN('MlpPolicy', env, learning_rate=0.001)
# Create the callback: check every 1000 steps
callback = SaveOnBestTrainingRewardCallback(check_freq=1000, log_dir=log_dir, verbose=1)
# Train the agent
time_steps = 1e6
model.learn(total_timesteps=int(time_steps), callback=callback)

# Plot the monitored results; the title names the agent and environment trained above
results_plotter.plot_results([log_dir], time_steps, results_plotter.X_TIMESTEPS, "DQN Game2048")
plt.show()

In [None]:
from stable_baselines import DQN, PPO2, A2C, ACKTR
from stable_baselines.common.evaluation import evaluate_policy
from stable_baselines.common.cmd_util import make_vec_env

# Bookkeeping containers; in this cell they are only initialised.
scores = list()
better_result = dict(
    score=0,
    value=0,
    list_moves=list(),
    list_states=list(),
    finished=False,
    episode=-1,
)

env = Game2048Env(4, 4)

# Optional vectorised wrapper, left disabled:
# env = make_vec_env(lambda: env, n_envs=1)

# Alternative agent, left disabled:
# model = ACKTR('MlpPolicy', env, verbose=1).learn(5000)
model = DQN('MlpPolicy', env, learning_rate=1e-2)
model.learn(total_timesteps=100_000)
print('Trained')

Trained


In [None]:
# Test the trained agent: play one episode and print every move and board.
# NOTE(review): this cell queries `dqn_model`, while the training cell just
# above binds its agent to `model` - confirm which one is meant to be tested.
obs = env.reset()
n_steps = 300  # only relevant to the disabled fixed-length variant: for step in range(n_steps)
done = False
step = 0
# Start from zero here instead of inheriting `score` from a previously run cell
score = 0
while not done:
  action, _ = dqn_model.predict(obs, deterministic=True)
  obs, reward, done, info = env.step(action)
  print("Step {}: Action ({}) Score ({})".format(step + 1, action, reward))
  # print('obs=', obs, 'reward=', reward, 'done=', done)
  env.render('terminal')
  score += reward
  step += 1
  if done:
    # The episode is over; the loop condition ends the loop after this message
    print("Goal reached!", "reward=", reward)
# Report the accumulated reward, as the PPO example cell does
print('Final score: {}'.format(score))



# episodes = 5
# for i_episode in range(episodes):
# 	observation = env.reset()
# 	for t in range(1000):
# 		env.render()
# 		action, _ = model.predict(observation, deterministic=True)
# 		# action = env.action_space.sample()
# 		observation, reward, done, info = env.step(action)
#
# 		if done:
# 			print("Episode {} finished after {} movements with score = {} and maximum value = {}".format(i_episode, t+1, reward, info['max_v']))
# 			scores.append(reward)
#
# 			if reward > better_result['score']:
#
# 				if info['max_v'] == 2048:
# 					better_result['finished'] = True
# 				else:
# 					better_result['finished'] = False
#
# 				better_result['score'] = reward
# 				better_result['value'] = info['max_v']
# 				better_result['list_moves'] = info['list_moves']
# 				better_result['list_states'] = info['list_states']
# 				better_result['episode'] = i_episode
#
# 			break
# env.close()
#
# print()
# if better_result['finished'] == True:
# 	print("Episode {} finished the game!!".format(better_result['episode']))
# else:
# 	print("No episode finished the game...")
# 	print("Max score = {} and Max value = {} on Episode {}".format(better_result['score'], better_result['value'], better_result['episode']))
#
# episodes_ind = [x for x in range(episodes)]
#
# plt.axis((0, episodes, 0, better_result['score']))
# plt.plot(episodes_ind, scores, 'k', label='scores')
# plt.legend(loc="upper right")
# plt.show()
#
#
# # at the end show the same steps the model did on the best try
# print('e')
# env = Game2048Env(4, 4, True)
# for state in better_result['list_states']:
# 	env.set_specific_table(state)
# 	# env.print()
# 	env.render('human')
# env.close()


[1;30;43mSe han truncado las últimas 5000 líneas del flujo de salida.[0m
[[2 0 2 0]
 [0 0 2 0]
 [0 0 0 0]
 [0 0 0 0]]
Step 2: Action (2) Score (4)
[[2 0 4 0]
 [0 0 0 0]
 [0 0 2 0]
 [0 0 0 0]]
Step 3: Action (2) Score (0)
[[2 0 4 0]
 [0 0 2 0]
 [0 0 0 0]
 [0 0 0 2]]
Step 4: Action (2) Score (0)
[[2 0 4 2]
 [0 0 2 0]
 [0 0 0 2]
 [0 0 0 0]]
Step 5: Action (2) Score (4)
[[2 0 4 4]
 [0 0 2 0]
 [0 0 0 0]
 [2 0 0 0]]
Step 6: Action (2) Score (4)
[[4 0 4 4]
 [0 0 2 2]
 [0 0 0 0]
 [0 0 0 0]]
Step 7: Action (0) Score (12)
[[8 4 0 0]
 [4 0 0 0]
 [0 0 0 0]
 [0 0 0 2]]
Step 8: Action (1) Score (0)
[[0 0 8 4]
 [0 0 0 4]
 [0 0 0 0]
 [0 0 2 2]]
Step 9: Action (0) Score (4)
[[8 4 0 0]
 [4 0 0 0]
 [0 2 0 0]
 [4 0 0 0]]
Step 10: Action (1) Score (0)
[[0 0 8 4]
 [0 0 0 4]
 [0 4 0 2]
 [0 0 0 4]]
Step 11: Action (0) Score (0)
[[8 4 0 0]
 [4 0 0 0]
 [4 2 4 0]
 [4 0 0 0]]
Step 12: Action (1) Score (0)
[[0 0 8 4]
 [0 0 0 4]
 [0 4 2 4]
 [0 4 0 4]]
Step 13: Action (0) Score (8)
[[8 4 0 0]
 [4 4 0 0]
 [4 2 4 0]