In [None]:
import numpy as np
import torch
import matplotlib.pyplot as plt
from tqdm import tqdm
import gymnasium as gym
import gymnasium_2048
from rdqn_r2d2 import RDQNR2D2Agent

# pin every random number generator to the same seed so reruns match
torch.manual_seed(42)
np.random.seed(42)
if torch.cuda.is_available():
    # a gpu is present: seed its generator as well
    torch.cuda.manual_seed(42)


In [None]:
# let's get this party started: build the 2048 environment by its registered id
# (presumably registered as a side effect of `import gymnasium_2048` -- confirm)
env = gym.make("gymnasium_2048/TwentyFortyEight-v0")

def decode_board(obs):
    """Turn a one-hot board observation into a grid of tile values.

    A 3-D array is read as one-hot along its last axis: a cell whose
    channels sum to exactly 1 becomes ``2 ** (index of the hot channel)``,
    any other cell becomes 0. Input of any other rank is returned as-is
    (the same object, not a copy).

    NOTE(review): if channel 0 marks *empty* cells, empties decode to 1
    rather than 0 -- confirm against the environment's observation spec.
    """
    if obs.ndim != 3:
        return obs
    exponents = np.argmax(obs, axis=-1)
    is_one_hot = obs.sum(axis=-1) == 1
    return is_one_hot * (2 ** exponents)

def preprocess(obs):
    """Convert an observation into a flat vector of tile exponents.

    The observation is first decoded with ``decode_board``; every positive
    tile value ``v`` is then replaced by ``int(log2(v))`` and every other
    cell by 0. The result is flattened to one dimension.
    """
    tiles = decode_board(obs).astype(int)
    exponents = np.zeros(tiles.shape, dtype=int)
    occupied = tiles > 0
    # only take the log where a tile exists, so log2(0) is never evaluated
    exponents[occupied] = np.log2(tiles[occupied]).astype(int)
    return exponents.flatten()

# build the agent -- every hyperparameter is handed to RDQNR2D2Agent as-is
agent = RDQNR2D2Agent(
    state_dim=16,             # flattened 4x4 board
    action_dim=4,             # up, down, left, right
    gamma=0.99,               # weight given to future reward
    n_step=5,                 # look ahead a bit
    sequence_length=20,       # transitions stored per replay sequence
    burn_in_length=5,         # lstm warm-up steps
    learning_rate=1e-4,       # slow and steady
    target_update_freq=1000,  # how often the target net is refreshed
    device="cpu" if not torch.cuda.is_available() else "cuda",  # gpu when available
)

# training settings - tweak away
num_episodes = 10_000
max_steps_per_episode = 2000  # hard cap so no episode runs forever
batch_size = 32

# exploration schedule: multiplied by the decay after every episode, floored at the end value
epsilon_start = 1.0
epsilon_end = 0.01
epsilon_decay = 0.995

# per-episode history, filled in by the training loop
episode_rewards = []
max_tiles = []
episode_lengths = []


In [None]:
# alright, let's train this bad boy
#
# each episode: act with the current epsilon, collect transitions into
# sequences for the replay buffer, and run one agent update per env step
# once the buffer holds more than `batch_size` entries.
epsilon = epsilon_start

for episode in tqdm(range(num_episodes)):
    # the numpy/torch seeds presumably do not reach the environment's own
    # generator, so seed it explicitly -- once, on the very first reset,
    # with the same seed used for the other RNGs
    state, _ = env.reset(seed=42 if episode == 0 else None)  # gymnasium style
    state = preprocess(state)  # convert to our format
    episode_reward = 0
    agent.current_sequence = []  # fresh sequence for each episode
    hidden = None  # fresh lstm state for each game, however the last one ended

    for step in range(max_steps_per_episode):
        # pick an action and keep the updated lstm state
        action, hidden = agent.select_action(state, hidden, epsilon)

        # take that action and see what happens
        next_state, reward, done, truncated, info = env.step(action)  # gymnasium style
        next_state = preprocess(next_state)
        done = done or truncated  # combine terminal conditions

        # add to our sequence memory
        agent.current_sequence.append((state, action, reward, next_state, done))

        # if sequence is full or episode ends, save it
        if len(agent.current_sequence) == agent.sequence_length or done:
            agent.replay_buffer.push(agent.current_sequence)
            agent.current_sequence = []
            hidden = None  # reset lstm state for new sequence

        # keep track of rewards
        episode_reward += reward
        state = next_state

        # learn from experience if we have enough
        if len(agent.replay_buffer.buffer) > batch_size:
            loss = agent.update(batch_size)

        if done:
            break

    # track our progress
    episode_rewards.append(episode_reward)
    # gym.make wraps the env and Gymnasium 1.0 wrappers no longer forward
    # unknown attributes, so ask the base environment for the board
    max_tiles.append(np.max(decode_board(env.unwrapped.get_board())))
    episode_lengths.append(step + 1)

    # reduce exploration over time
    epsilon = max(epsilon_end, epsilon * epsilon_decay)

    # brag about our progress every 100 episodes
    if (episode + 1) % 100 == 0:
        avg_reward = np.mean(episode_rewards[-100:])
        avg_max_tile = np.mean(max_tiles[-100:])
        avg_length = np.mean(episode_lengths[-100:])
        print(f"\nEpisode {episode + 1} - how're we doing?")
        print(f"Average Reward (last 100): {avg_reward:.2f}")
        print(f"Average Max Tile (last 100): {avg_max_tile:.2f}")
        print(f"Average Episode Length (last 100): {avg_length:.2f}")
        print(f"Current Epsilon: {epsilon:.3f} (still exploring!)")


In [None]:
# let's see how we did with some pretty plots
def _plot_metric(ax, values, title, ylabel, window=100):
    """Draw one per-episode metric and its trailing moving average on `ax`.

    Args:
        ax: matplotlib axes to draw on.
        values: sequence with one number per episode.
        title: panel title.
        ylabel: label for the y axis.
        window: number of episodes averaged per point of the red curve.

    The average of episodes ``i - window + 1 .. i`` is drawn at ``x = i`` so
    it lines up with the raw curve. With fewer than `window` episodes the
    average (and the legend) is skipped: in 'valid' mode np.convolve would
    otherwise swap its operands and return a meaningless curve.
    """
    values = np.asarray(values, dtype=float)
    ax.plot(values, alpha=0.6)
    if len(values) >= window:
        trailing_avg = np.convolve(values, np.ones(window) / window, mode='valid')
        ax.plot(np.arange(window - 1, len(values)), trailing_avg,
                label=f'{window}-ep avg', color='red')
        ax.legend()
    ax.set_title(title)
    ax.set_xlabel('episode')
    ax.set_ylabel(ylabel)


fig, axes = plt.subplots(1, 3, figsize=(15, 5))

# show those sweet rewards
_plot_metric(axes[0], episode_rewards, 'rewards over time', 'total reward')

# check out our max tiles
_plot_metric(axes[1], max_tiles, 'highest tile reached', 'max tile value')

# how long did episodes last?
_plot_metric(axes[2], episode_lengths, 'episode length', 'steps')

fig.tight_layout()
plt.show()

# save our hard work for later: network weights, optimizer state and the training curves
print("\nsaving the model... ", end="")
checkpoint = {
    'online_net_state_dict': agent.online_net.state_dict(),
    'target_net_state_dict': agent.target_net.state_dict(),
    'optimizer_state_dict': agent.optimizer.state_dict(),
    'training_history': {
        'rewards': episode_rewards,
        'max_tiles': max_tiles,
        'lengths': episode_lengths,
    },
}
torch.save(checkpoint, 'rdqn_r2d2_model.pth')
print("done! 🎉")


In [None]:
# let's see how well our agent plays
def evaluate_agent(agent, env, num_episodes=10, render=False):
    """Play `num_episodes` games with epsilon=0 and summarise the results.

    Args:
        agent: object exposing ``select_action(state, hidden, epsilon=...)``
            that returns ``(action, new_hidden)``.
        env: Gymnasium-style environment; ``reset()`` returns ``(obs, info)``
            and ``step()`` returns a 5-tuple. Its base environment
            (``env.unwrapped``) must provide ``get_board()``.
        num_episodes: number of games to play; must be at least 1.
        render: when True, ``env.render()`` is called before every action.

    Returns:
        dict with keys 'avg_reward', 'avg_max_tile', 'avg_length' (means
        over the games) and 'max_tile_achieved' (best tile over all games).

    Raises:
        ValueError: if `num_episodes` is smaller than 1.
    """
    if num_episodes < 1:
        raise ValueError("num_episodes must be at least 1")

    total_rewards = []
    max_tiles = []
    episode_lengths = []

    for _ in range(num_episodes):
        state, _info = env.reset()  # gymnasium style
        state = preprocess(state)  # convert to our format
        # every game starts from a fresh recurrent state; carrying it over
        # would let the end of one game influence the opening of the next
        hidden = None
        episode_reward = 0
        steps = 0

        while True:
            if render:
                env.render()

            # let the agent do its thing (no random actions)
            action, hidden = agent.select_action(state, hidden, epsilon=0.0)

            # take the action
            next_state, reward, done, truncated, _info = env.step(action)  # gymnasium style
            episode_reward += reward
            steps += 1

            # termination and truncation both end the game
            if done or truncated:
                break

            state = preprocess(next_state)

        total_rewards.append(episode_reward)
        # wrappers returned by gym.make no longer forward unknown attributes
        # in Gymnasium 1.0, so read the board from the base environment
        max_tiles.append(np.max(decode_board(env.unwrapped.get_board())))
        episode_lengths.append(steps)

    return {
        'avg_reward': np.mean(total_rewards),
        'avg_max_tile': np.mean(max_tiles),
        'avg_length': np.mean(episode_lengths),
        'max_tile_achieved': max(max_tiles)
    }

# moment of truth: play ten evaluation games and report the summary
print("\nrunning some test games...")
eval_results = evaluate_agent(agent, env, num_episodes=10, render=False)

print("\nhow'd we do? 🤔")
print(
    f"Average Reward: {eval_results['avg_reward']:.2f}",
    f"Average Max Tile: {eval_results['avg_max_tile']:.2f}",
    f"Average Episode Length: {eval_results['avg_length']:.2f}",
    f"Highest Tile Reached: {eval_results['max_tile_achieved']} 🎮",
    sep="\n",
)
