In [13]:
import numpy as np
from game2048.board2048 import Board2048
from stable_baselines3 import PPO
import pygame

In [14]:
import pygame
from game2048.board2048 import Board2048


class Board2048Renderer(Board2048):
    """`Board2048` with an optional pygame window that draws the tile grid.

    With ``rendering_mode='human'`` the constructor initialises pygame and
    opens a window; with any other value no pygame state is created and
    `render()` is a no-op, so the object can be used headlessly.
    """

    def __init__(self, cell_size=100, margin=5, font_size=36, rendering_mode=None):
        """Set up layout metrics and, in ``'human'`` mode, the pygame window.

        Args:
            cell_size: Side length of one tile, in pixels.
            margin: Gap between tiles and around the grid, in pixels.
            font_size: Size passed to pygame's default font for tile labels.
            rendering_mode: ``'human'`` to open a window; anything else
                disables drawing.
        """
        super().__init__()
        self.cell_size = cell_size
        self.margin = margin
        # Four cells per side, plus a margin before each cell and one trailing.
        self.width = 4 * (cell_size + margin) + margin
        self.height = self.width
        self.colors = self._generate_colors()
        self.rendering_mode = rendering_mode

        if self.rendering_mode == 'human':
            pygame.init()
            self.screen = pygame.display.set_mode((self.width, self.height))
            pygame.display.set_caption("2048")
            self.font = pygame.font.Font(None, font_size)

    def _generate_colors(self):
        """Generate a dictionary of colors for different tile values.

        Returns:
            dict mapping a tile value (0 for an empty cell, then powers of
            two up to 2048) to an ``(R, G, B)`` tuple.
        """
        colors = {
            0: (205, 193, 180),  # Empty cell
            2: (238, 228, 218),
            4: (237, 224, 200),
            8: (242, 177, 121),
            16: (245, 149, 99),
            32: (246, 124, 95),
            64: (246, 94, 59),
            128: (237, 207, 114),
            256: (237, 204, 97),
            512: (237, 200, 80),
            1024: (237, 197, 63),
            2048: (237, 194, 46),
        }
        return colors

    def render(self):
        """Render the board using pygame.

        Does nothing unless the instance was created with
        ``rendering_mode='human'``.
        """
        if self.rendering_mode != 'human':
            return
        # Service the window's event queue on every frame. Callers of this
        # method only draw and sleep; without this the OS can mark the
        # window as unresponsive.
        pygame.event.pump()
        self.screen.fill((187, 173, 160))  # Background color
        for row in range(4):
            for col in range(4):
                value = self.board[row, col]
                # Default color for large values
                color = self.colors.get(value, (60, 58, 50))
                rect = pygame.Rect(
                    col * (self.cell_size + self.margin) + self.margin,
                    row * (self.cell_size + self.margin) + self.margin,
                    self.cell_size,
                    self.cell_size
                )
                pygame.draw.rect(self.screen, color, rect)
                # Empty cells get a background square but no label.
                if value != 0:
                    text_surface = self.font.render(
                        str(value), True, (119, 110, 101))
                    text_rect = text_surface.get_rect(center=rect.center)
                    self.screen.blit(text_surface, text_rect)
        pygame.display.flip()

    def close(self):
        """Close the pygame window."""
        pygame.quit()

In [15]:
import torch
import torch.nn as nn
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
from gymnasium.spaces import Box


class CnnModel(BaseFeaturesExtractor):
    """Two-layer convolutional feature extractor followed by a linear head.

    The layer sizes are fixed: 16 input channels, and a final linear layer
    sized for a 2x2 spatial map after the second convolution (which is what
    a 4x4 input produces, since only the first convolution is padded).
    """

    def __init__(self, observation_space: Box, features_dim: int = 128):
        """Build the layers.

        Args:
            observation_space: Forwarded to `BaseFeaturesExtractor`.
            features_dim: Width of the feature vector returned by `forward`.
        """
        super().__init__(observation_space, features_dim)

        # Padded 3x3 convolution: spatial size is preserved.
        self.conv1 = nn.Conv2d(16, 16, kernel_size=3, stride=1, padding=1)
        # Unpadded 3x3 convolution: each spatial dimension shrinks by 2.
        self.conv2 = nn.Conv2d(
            self.conv1.out_channels, 16, kernel_size=3, stride=1, padding=0
        )

        # Linear head over the flattened (channels * 2 * 2) activations.
        flat_features = self.conv2.out_channels * 2 * 2
        self.fc1 = nn.Linear(flat_features, features_dim)

    def forward(self, x):
        """Map channels-last observations to ``features_dim`` features."""
        # (batch, H, W, C) -> (batch, C, H, W), the layout Conv2d expects.
        channels_first = x.permute(0, 3, 1, 2)
        hidden = torch.relu(self.conv1(channels_first))
        hidden = torch.relu(self.conv2(hidden))
        flat = torch.flatten(hidden, start_dim=1)
        return torch.relu(self.fc1(flat))
    
from stable_baselines3.common.policies import ActorCriticCnnPolicy

class CustomCnnPolicy(ActorCriticCnnPolicy):
    """Actor-critic policy that uses `CnnModel` as its features extractor."""

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the parent, defaulting the extractor.

        `features_extractor_class` is only filled in when the caller has not
        supplied it. Passing it unconditionally next to ``**kwargs`` raises
        ``TypeError`` (duplicate keyword) whenever the key is already
        present, e.g. when the policy is rebuilt from stored constructor
        parameters.
        """
        kwargs.setdefault("features_extractor_class", CnnModel)
        super().__init__(*args, **kwargs)

In [16]:
from stable_baselines3.common.vec_env import SubprocVecEnv
from stable_baselines3.common.monitor import Monitor



# Headless board (no rendering_mode given) used as the training environment.
game = Board2048Renderer()
# games = SubprocVecEnv([lambda: Monitor(Board2048Renderer()) for _ in range(4)])

# Build the PPO agent with the custom CNN policy and train it in one go;
# `learn` returns the trained model, which is what `model` ends up bound to.
model = PPO(
    policy=CustomCnnPolicy,
    env=game,
    learning_rate=3e-5,
    n_steps=2048,
    batch_size=64,
    n_epochs=10,
    gamma=0.99,
    tensorboard_log="./ppo_2048_tensorboard/",
    device='cuda',
    verbose=1,
).learn(total_timesteps=10_000)

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Logging to ./ppo_2048_tensorboard/PPO_4


---------------------------------
| rollout/           |          |
|    ep_len_mean     | 135      |
|    ep_rew_mean     | 1.01e+03 |
| time/              |          |
|    fps             | 553      |
|    iterations      | 1        |
|    time_elapsed    | 7        |
|    total_timesteps | 4096     |
---------------------------------
------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 146          |
|    ep_rew_mean          | 1.15e+03     |
| time/                   |              |
|    fps                  | 344          |
|    iterations           | 2            |
|    time_elapsed         | 23           |
|    total_timesteps      | 8192         |
| train/                  |              |
|    approx_kl            | 0.0044776476 |
|    clip_fraction        | 0            |
|    clip_range           | 0.2          |
|    entropy_loss         | -1.38        |
|    explained_variance   | 1.7e-05      |
|    learning_r

In [17]:
def main_loop(b: Board2048, direction: int):
    """Try to play `direction` on `b` and report whether the board changed.

    If `b.move(direction)` yields a grid equal to the current one, the move
    is treated as invalid and `b` is left untouched. Otherwise the new grid
    is stored on `b.board` and `b.fill_cell()` is called.

    Returns:
        True when the move was applied, False when it was invalid.
    """
    candidate = b.move(direction)

    # An unchanged grid means the move is invalid: nothing to commit.
    if (candidate == b.board).all():
        return False

    b.board = candidate
    b.fill_cell()
    return True


def play_episode(policy, board, frame_delay_ms=None):
    """Play `board` to game over using `policy` and return the total score.

    On every turn the policy's deterministic prediction is tried first via
    `main_loop`; if that leaves the board unchanged, random directions are
    tried until one is accepted.

    Args:
        policy: Object with a ``predict(obs, deterministic=True)`` method
            returning ``(action, state)``.
        board: Game object providing ``get_obs()``, ``is_game_over()``,
            ``render()`` and ``total_score``, and accepted by `main_loop`.
        frame_delay_ms: When not None, the board is rendered after every
            turn and the loop pauses this many milliseconds between turns.

    Returns:
        ``board.total_score`` once the game is over.
    """
    show = frame_delay_ms is not None
    while True:
        action, _ = policy.predict(board.get_obs(), deterministic=True)
        moved = main_loop(board, action)
        # The predicted move was rejected: fall back to random directions.
        while not moved:
            moved = main_loop(board, np.random.randint(0, 4))
        if show:
            board.render()
        finished = board.is_game_over()
        if show:
            pygame.time.wait(frame_delay_ms)
        if finished:
            return board.total_score


# Headless evaluation: average the final score over several games.
scores = []
repeat = 20
for _ in range(repeat):
    scores.append(play_episode(model, Board2048Renderer(rendering_mode=None)))
mean_score = np.mean(scores)
print(f"Mean Score after {repeat} simulations:", mean_score)


# Watch one game in a pygame window. The window is closed even if the
# episode raises, so a failure does not leave a stale window behind.
game = Board2048Renderer(rendering_mode='human')
try:
    play_episode(model, game, frame_delay_ms=50)
    print("Game Over!, Total Score is {}".format(game.total_score))
finally:
    game.close()

Mean Score after 20 simulations: 1025.8
Game Over!, Total Score is 1624


In [18]:
# from stable_baselines3 import PPO
# import numpy as np

# # Define the parameter grid
# params = {
#     'learning_rate': [3e-3, 3e-4, 3e-5],
#     'n_steps': [1024, 2048, 4096],
#     'batch_size': [32, 64, 128],
#     'n_epochs': [5, 10, 20],
#     'gamma': [0.95, 0.99, 0.999],
# }

# # Function to test a single parameter


# def test_param(param_name, param_values, fixed_params, total_timesteps=50_000):
#     results = {}
#     for value in param_values:
#         print(f"Testing {param_name}={value}")
#         fixed_params[param_name] = value

#         # Create the environment
#         env = Board2048Renderer(rendering_mode=None)

#         # Create the model with the current parameter value
#         model = PPO(
#             policy="MlpPolicy",
#             env=env,
#             verbose=0,
#             device='cpu',
#             tensorboard_log=f"./ppo_2048_tensorboard/{param_name}_{value}/",
#             **fixed_params
#         )

#         # Train the model
#         model.learn(total_timesteps=total_timesteps)


# # Fixed parameters (default values for other parameters)
# fixed_params = {
#     'learning_rate': 3e-4,
#     'n_steps': 2048,
#     'batch_size': 64,
#     'n_epochs': 10,
#     'gamma': 0.99,
# }

# # Test learning rates
# learning_rate_results = test_param(
#     'learning_rate', params['learning_rate'], fixed_params)

# # Test n_steps
# n_steps_results = test_param('n_steps', params['n_steps'], fixed_params)

# # Test batch_size
# batch_size_results = test_param(
#     'batch_size', params['batch_size'], fixed_params)

# # Print results
# print("Learning Rate Results:", learning_rate_results)
# print("n_steps Results:", n_steps_results)
# print("Batch Size Results:", batch_size_results)