In [None]:
import os
# Expose only one GPU to this process. The variable is set before the cleanrl
# import below — presumably so that any CUDA initialisation triggered by that
# import already sees the restricted device list. The index "6" is specific to
# the machine this notebook was run on.
os.environ["CUDA_VISIBLE_DEVICES"] = "6"
from cleanrl import dqn

# NOTE(review): `dqn` is invoked with no arguments; what it trains and where it
# writes results depends on the locally installed `cleanrl` package — confirm.
dqn()

In [None]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import random

class TwentyFortyEightEnv(gym.Env):
    """The sliding-tile game 2048 exposed through the Gymnasium ``Env`` API.

    Observation
        The board flattened row by row into an ``int32`` vector holding the raw
        tile values (``0`` marks an empty cell).
    Actions
        ``0`` = up, ``1`` = down, ``2`` = left, ``3`` = right.
    Reward
        For every merge performed by a move: ``log2`` of the newly created tile
        when ``log_reward`` is true, otherwise the tile's value. A move that
        leaves the board unchanged ends the episode with a reward of ``-10``.

    Args:
        render_mode: Stored on the instance; ``render`` always prints to stdout.
        log_reward: Selects the reward scale described above.
    """

    metadata = {"render_modes": ["human"], "render_fps": 4}

    def __init__(self, render_mode=None, log_reward=True):
        super().__init__()
        self.board_size = 4
        self.log_reward = log_reward
        # Actions: 0 = up, 1 = down, 2 = left, 3 = right
        self.action_space = spaces.Discrete(4)

        # Observation is the flattened board (board_size ** 2 integers).
        self.observation_space = spaces.Box(
            low=0,
            high=2**16,
            shape=(self.board_size ** 2,),
            dtype=np.int32
        )

        self.render_mode = render_mode
        # Start with a playable board so render()/step() work right after construction.
        self.reset()

    def reset(self, seed=None, options=None):
        """Clear the board, zero the score and spawn the two starting tiles.

        Args:
            seed: Forwarded to ``gym.Env.reset``, which seeds ``self.np_random``.
                Tile spawning draws from that generator, so a seeded reset
                produces a reproducible episode.
            options: Accepted for API compatibility; unused.

        Returns:
            ``(observation, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)

        self.board = np.zeros((self.board_size, self.board_size), dtype=np.int32)
        self.score = 0

        self._add_tile()
        self._add_tile()

        return self._get_obs(), {}

    def step(self, action):
        """Apply one move.

        Args:
            action: Integer-like value in ``{0, 1, 2, 3}`` (up, down, left, right).

        Returns:
            ``(observation, reward, terminated, truncated, info)``. ``truncated``
            is always ``False``; ``info`` carries the running ``"score"``.

        Raises:
            ValueError: If ``action`` is not one of the four valid moves.
        """
        moves = {
            0: self._move_up,
            1: self._move_down,
            2: self._move_left,
            3: self._move_right,
        }
        move = moves.get(int(action))
        if move is None:
            raise ValueError(f"Invalid action {action!r}; expected 0, 1, 2 or 3")

        # Real snapshot (not a view) so the comparison below cannot be affected
        # by how the move helpers build the new board.
        old_board = self.board.copy()
        reward = move()

        # Invalid move (board unchanged)
        if np.array_equal(self.board, old_board):
            done = True
            reward = -10
        else:
            # Only add a tile after a valid move
            self._add_tile()
            done = not self._moves_available()

        return self._get_obs(), reward, done, False, {"score": self.score}

    # -------- Rendering -------- #

    def render(self):
        """Print the current score and an ASCII drawing of the board."""
        # Each cell is 5 characters wide plus one separator, plus the closing bar.
        rule = "-" * (6 * self.board_size + 1)
        print("\nScore:", self.score)
        print(rule)
        for row in self.board:
            print("|" + "|".join(f"{num:^5}" if num != 0 else "     " for num in row) + "|")
            print(rule)

    # -------- Helper Methods -------- #

    def _get_obs(self):
        """Return a flattened copy of the board."""
        return self.board.flatten()

    def _add_tile(self):
        """Place a 2 (90%) or a 4 (10%) on a uniformly chosen empty cell.

        Does nothing when the board is full. Uses ``self.np_random`` so that
        ``reset(seed=...)`` controls tile spawning.
        """
        empty = np.argwhere(self.board == 0)
        if len(empty) == 0:
            return
        i, j = empty[self.np_random.integers(len(empty))]
        self.board[i, j] = 4 if self.np_random.random() < 0.1 else 2

    def _moves_available(self):
        """Return True if a cell is empty or two equal tiles are adjacent."""
        if np.any(self.board == 0):
            return True
        horizontal = np.any(self.board[:, :-1] == self.board[:, 1:])
        vertical = np.any(self.board[:-1, :] == self.board[1:, :])
        return bool(horizontal or vertical)

    # -------- Movement Logic -------- #

    def _compress(self, row):
        """Return a new row with the non-zero tiles packed to the left."""
        tiles = row[row != 0]
        padding = np.zeros(len(row) - len(tiles), dtype=np.int32)
        return np.concatenate([tiles, padding])

    def _merge(self, row):
        """Merge equal neighbours left to right, in place.

        The right-hand tile of each merged pair is zeroed, so a tile takes part
        in at most one merge per move.

        Returns:
            ``(row, score_gain)`` where ``row`` is the same (mutated) array.
        """
        score_gain = 0
        for i in range(len(row) - 1):
            if row[i] != 0 and row[i] == row[i + 1]:
                row[i] *= 2
                row[i + 1] = 0
                # Cast to plain Python numbers so score/reward are not numpy scalars.
                score_gain += float(np.log2(row[i])) if self.log_reward else int(row[i])
        return row, score_gain

    def _move_left(self):
        """Slide and merge every row to the left; return the reward as a float."""
        total_gain = 0
        new_board = np.zeros(self.board.shape, dtype=np.int32)

        for i, line in enumerate(self.board):
            row, gain = self._merge(self._compress(line))
            # Merging leaves holes behind; pack the row once more.
            new_board[i] = self._compress(row)
            total_gain += gain

        self.board = new_board
        self.score += total_gain
        return float(total_gain)

    def _move_right(self):
        """Move right by mirroring the board, moving left, and mirroring back."""
        self.board = np.fliplr(self.board)
        reward = self._move_left()
        self.board = np.fliplr(self.board)
        return reward

    def _move_up(self):
        """Move up by transposing the board, moving left, and transposing back."""
        self.board = self.board.T
        reward = self._move_left()
        self.board = self.board.T
        return reward

    def _move_down(self):
        """Move down by transposing the board, moving right, and transposing back."""
        self.board = self.board.T
        reward = self._move_right()
        self.board = self.board.T
        return reward


In [6]:
# Smoke test: show the starting board, then the board after one "up" move,
# then again after two further "up" moves.
env = TwentyFortyEightEnv()
env.reset()
env.render()
for moves_before_render in (1, 2):
    for _ in range(moves_before_render):
        env.step(0)
    env.render()


Score: 0
-------------------------
|     |     |     |     |
-------------------------
|     |     |     |     |
-------------------------
|  2  |     |     |     |
-------------------------
|     |     |  2  |     |
-------------------------

Score: 0
-------------------------
|  2  |     |  2  |     |
-------------------------
|     |     |     |     |
-------------------------
|  2  |     |     |     |
-------------------------
|     |     |     |     |
-------------------------

Score: 2.0
-------------------------
|  4  |     |  2  |     |
-------------------------
|     |     |  4  |     |
-------------------------
|     |     |     |     |
-------------------------
|     |     |     |  2  |
-------------------------


In [7]:
import torch
import torch.nn as nn
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor

class MyCNN(BaseFeaturesExtractor):
    """Two-layer convolutional feature extractor for Stable-Baselines3 policies.

    Two 3x3 convolutions (4, then 16 filters, stride 1, padding 1) are followed
    by a flatten and a linear projection to ``features_dim``, each stage with a
    ReLU.

    Args:
        observation_space: Space whose first shape entry is used as the number
            of input channels — assumes a channel-first image-like space;
            TODO confirm against the environment this is paired with.
        features_dim: Width of the feature vector returned by ``forward``.
    """

    def __init__(self, observation_space, features_dim=16):
        super().__init__(observation_space, features_dim)

        in_channels = observation_space.shape[0]

        conv_stack = [
            nn.Conv2d(in_channels, 4, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(4, 16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Flatten(),
        ]
        self.cnn = nn.Sequential(*conv_stack)

        # Push one sampled observation through the conv stack to learn how wide
        # the flattened output is; no gradients are needed for this probe.
        with torch.no_grad():
            probe = torch.as_tensor(observation_space.sample()[None]).float()
            flat_width = self.cnn(probe).shape[1]

        projection = nn.Linear(flat_width, features_dim)
        self.linear = nn.Sequential(projection, nn.ReLU())

    def forward(self, x):
        """Return the ``features_dim``-wide features for a batch of observations."""
        conv_features = self.cnn(x)
        return self.linear(conv_features)


In [None]:
from sb3_contrib import QRDQN

# Settings forwarded to the policy. "MlpPolicy" is used below, so `net_arch`
# configures that MLP; no custom feature extractor is passed here.
policy_kwargs = {
    "net_arch": [16, 16, 16],
    "n_quantiles": 51,
}

# Algorithm hyperparameters, kept in one place so they are easy to tune.
qrdqn_hparams = {
    "verbose": 2,
    "learning_rate": 1e-3,
    "buffer_size": 50_000,
    "batch_size": 128,
    "learning_starts": 10_000,
    "exploration_fraction": 0.01,
    "exploration_final_eps": 0.005,
    "gamma": 0.95,
    "target_update_interval": 250,
    "train_freq": 1,
    "gradient_steps": 1,
    "policy_kwargs": policy_kwargs,
}

# `env` must already exist in the kernel (the demo cell creates one).
model = QRDQN("MlpPolicy", env, **qrdqn_hparams)

# The checkpoint is written only if training runs to completion.
model.learn(total_timesteps=10**7)
model.save("qr_dqn_2048")


Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 9.75     |
|    ep_rew_mean      | 0.75     |
|    exploration_rate | 1        |
| time/               |          |
|    episodes         | 4        |
|    fps              | 6358     |
|    time_elapsed     | 0        |
|    total_timesteps  | 39       |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 14.6     |
|    ep_rew_mean      | 9.88     |
|    exploration_rate | 0.999    |
| time/               |          |
|    episodes         | 8        |
|    fps              | 6509     |
|    time_elapsed     | 0        |
|    total_timesteps  | 117      |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 16.1     |
|    ep_rew_mean      | 14.4  

KeyboardInterrupt: 

In [None]:
import numpy as np
from sb3_contrib import QRDQN





# Load the checkpoint and play one greedy episode, pausing after every move.
model = QRDQN.load("qr_dqn_2048")

# log_reward=False makes each merge reward the merged tile's value.
env = TwentyFortyEightEnv(log_reward=False)
obs, _ = env.reset()

done = False
total_reward = 0

while not done:
    action, _ = model.predict(obs.flatten(), deterministic=True)
    obs, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated
    total_reward += reward

    print(env.board)
    print("Action:", action)
    print("Reward:", total_reward)
    # Inspect the network's value estimates for the new state. Use the device
    # the model was loaded on (not a hard-coded 'cuda') and skip autograd, since
    # this is inspection only.
    with torch.no_grad():
        obs_tensor = torch.as_tensor(obs.flatten(), device=model.device).unsqueeze(0)
        q = model.quantile_net(obs_tensor) # (1,q,a)
    # Averaging over the quantile dimension gives one Q-value per action.
    print(f"Q value is: {q.mean(dim = 1)}")
    x = input("Press Enter for next step...")
    if x == 'X':
        break

print("Final Score:", env.score)


[[2 0 0 0]
 [0 0 0 0]
 [0 4 0 0]
 [0 2 0 0]]
Action: 1
Reward: 0.0
Q value is: tensor([[69.6633, 84.4236, -0.1136, 75.2865]], device='cuda:0',
       grad_fn=<MeanBackward1>)
[[0 0 0 0]
 [0 0 0 0]
 [0 4 0 0]
 [2 2 0 2]]
Action: 1
Reward: 0.0
Q value is: tensor([[80.0549, 63.5140, 11.9493, 81.7994]], device='cuda:0',
       grad_fn=<MeanBackward1>)
[[0 0 0 0]
 [0 0 0 2]
 [0 0 0 4]
 [0 0 2 4]]
Action: 3
Reward: 4.0
Q value is: tensor([[75.1566, 78.2515, 89.9696, 72.7947]], device='cuda:0',
       grad_fn=<MeanBackward1>)
[[0 0 0 0]
 [2 0 0 0]
 [4 0 2 0]
 [2 4 0 0]]
Action: 2
Reward: 4.0
Q value is: tensor([[  74.1352,   35.2699, -140.6964,   92.5827]], device='cuda:0',
       grad_fn=<MeanBackward1>)
[[0 0 0 0]
 [2 0 0 2]
 [0 0 4 2]
 [0 0 2 4]]
Action: 3
Reward: 4.0
Q value is: tensor([[85.7062, 84.8194, 93.6838, 78.5084]], device='cuda:0',
       grad_fn=<MeanBackward1>)
[[0 0 0 0]
 [4 0 0 0]
 [4 2 0 2]
 [2 4 0 0]]
Action: 2
Reward: 8.0
Q value is: tensor([[  80.8732, -143.4408, -108.40