# Reinforcement Learning Agent for 2048 Game

This notebook implements a Deep Q-Learning (DQN) agent to play the 2048 game. The agent learns a policy that maps the current board to one of four moves (left, right, up, down) so as to maximize its score.

## 1. Import Required Libraries

In [1]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
import random
import matplotlib.pyplot as plt
import pandas as pd
from collections import deque
import os
import copy
import time

# Set random seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)
random.seed(42)

## 2. Implement 2048 Game Environment

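First, we need to create a game environment that the agent can interact with. The class below adapts the standard 2048 rules to an RL-style interface: `reset()` returns the initial board, and `step(action)` returns the next state, a reward, a done flag, and an info dictionary.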
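
The core rule the environment has to implement is the row move: slide the non-zero tiles to one side, merge equal neighbours once, then close the gaps. Here is a minimal, dependency-free sketch of that rule. `slide_and_merge_left` is a hypothetical helper for illustration only; the class itself performs the same steps with NumPy in `_compress_row` and `_merge_row`.

```python
def slide_and_merge_left(row):
    """Return (new_row, points) after moving one row to the left."""
    tiles = [x for x in row if x != 0]        # slide: drop the gaps
    merged, points, i = [], 0, 0
    while i < len(tiles):
        if i + 1 < len(tiles) and tiles[i] == tiles[i + 1]:
            merged.append(tiles[i] * 2)       # equal neighbours merge once
            points += tiles[i] * 2
            i += 2                            # skip the tile that was absorbed
        else:
            merged.append(tiles[i])
            i += 1
    merged += [0] * (len(row) - len(merged))  # pad back to full width
    return merged, points


print(slide_and_merge_left([2, 2, 4, 0]))   # ([4, 4, 0, 0], 4)
print(slide_and_merge_left([2, 2, 2, 2]))   # ([4, 4, 0, 0], 8)
print(slide_and_merge_left([4, 0, 4, 8]))   # ([8, 8, 0, 0], 8)
```

A move to the right is the same operation on the reversed row, and up/down apply it to columns, which is why the class reverses rows and transposes the grid.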

In [None]:
class Game2048Env:
    """2048 game environment adapted for reinforcement learning"""
    
    # Action mappings
    ACTIONS = {
        0: 'left',
        1: 'right',
        2: 'up',
        3: 'down'
    }
    
    def __init__(self):
        """Initialize the game environment"""
        self.size = 4
        self.grid = None
        self.score = 0
        self.done = False
        self.reset()
    
    def reset(self):
        """Reset the game to initial state"""
        self.grid = np.zeros((self.size, self.size), dtype=int)
        self.score = 0
        self.done = False
        
        # Add two initial tiles
        self._add_random_tile()
        self._add_random_tile()
        
        return self._get_state()
    
    def _add_random_tile(self):
        """Add a random tile (2 or 4) to an empty cell"""
        if not self._has_empty_cells():
            return False
        
        # Find empty cells
        empty_cells = [(i, j) for i in range(self.size) for j in range(self.size) if self.grid[i, j] == 0]
        
        # Choose a random empty cell
        i, j = random.choice(empty_cells)
        
        # Add a 2 (90% chance) or 4 (10% chance)
        self.grid[i, j] = 2 if random.random() < 0.9 else 4
        
        return True
    
    def _has_empty_cells(self):
        """Check if there are any empty cells"""
        return 0 in self.grid
    
    def _compress_row(self, row):
        """Move all non-zero elements to the left"""
        # Remove zeros
        new_row = np.array([x for x in row if x != 0], dtype=int)  # keep an integer dtype even for an empty row
        # Add zeros at the end
        new_row = np.append(new_row, np.zeros(self.size - len(new_row), dtype=int))
        return new_row
    
    def _merge_row(self, row):
        """Merge tiles of the same value in a row"""
        score_increase = 0
        
        # Iterate through the list from left to right
        i = 0
        while i < self.size - 1:
            # If current and next element are the same and not zero
            if row[i] == row[i + 1] and row[i] != 0:
                row[i] *= 2
                score_increase += row[i]
                row[i + 1] = 0
                i += 2
            else:
                i += 1
                
        return row, score_increase
    
    def _move_left(self):
        """Move all tiles to the left and merge if possible"""
        score_increase = 0
        changed = False
        
        for i in range(self.size):
            original_row = self.grid[i, :].copy()
            
            # Compress (move non-zero elements to the left)
            row = self._compress_row(original_row)
            
            # Merge
            row, score = self._merge_row(row)
            score_increase += score
            
            # Compress again after merging
            row = self._compress_row(row)
            
            # Update grid
            if not np.array_equal(original_row, row):
                changed = True
                self.grid[i, :] = row
        
        return changed, score_increase
    
    def _move_right(self):
        """Move all tiles to the right and merge if possible"""
        score_increase = 0
        changed = False
        
        for i in range(self.size):
            original_row = self.grid[i, :].copy()
            
            # Reverse the row
            row = original_row[::-1]
            
            # Compress (move non-zero elements to the left)
            row = self._compress_row(row)
            
            # Merge
            row, score = self._merge_row(row)
            score_increase += score
            
            # Compress again after merging
            row = self._compress_row(row)
            
            # Reverse back
            row = row[::-1]
            
            # Update grid
            if not np.array_equal(original_row, row):
                changed = True
                self.grid[i, :] = row
        
        return changed, score_increase
    
    def _move_up(self):
        """Move all tiles up and merge if possible"""
        score_increase = 0
        changed = False
        
        # Transpose the grid
        self.grid = self.grid.T
        
        # Apply left move logic to each row (which are now columns)
        changed, score_increase = self._move_left()
        
        # Transpose back
        self.grid = self.grid.T
        
        return changed, score_increase
    
    def _move_down(self):
        """Move all tiles down and merge if possible"""
        score_increase = 0
        changed = False
        
        # Transpose the grid
        self.grid = self.grid.T
        
        # Apply right move logic to each row (which are now columns)
        changed, score_increase = self._move_right()
        
        # Transpose back
        self.grid = self.grid.T
        
        return changed, score_increase
    
    def _get_max_tile(self):
        """Get the value of the highest tile on the board"""
        return np.max(self.grid)
    
    def _get_state(self):
        """Return the current state of the game"""
        return self.grid.copy()
    
    def _is_game_over(self):
        """Check if the game is over (no empty cells and no possible merges)"""
        # Check for empty cells
        if self._has_empty_cells():
            return False
        
        # Check for possible merges horizontally
        for i in range(self.size):
            for j in range(self.size - 1):
                if self.grid[i, j] == self.grid[i, j + 1]:
                    return False
        
        # Check for possible merges vertically
        for i in range(self.size - 1):
            for j in range(self.size):
                if self.grid[i, j] == self.grid[i + 1, j]:
                    return False
        
        return True
    
    def step(self, action):
        """Take an action (0: left, 1: right, 2: up, 3: down) and return new state, reward, done"""
        if self.done:
            # If game is already over, return current state with zero reward
            return self._get_state(), 0, True, {}
        
        prev_score = self.score
        prev_max_tile = self._get_max_tile()
        moved = False
        
        # Execute the move
        if action == 0:  # left
            moved, score_increase = self._move_left()
        elif action == 1:  # right
            moved, score_increase = self._move_right()
        elif action == 2:  # up
            moved, score_increase = self._move_up()
        elif action == 3:  # down
            moved, score_increase = self._move_down()
        else:
            raise ValueError(f"Invalid action: {action}")
        
        # Update score if the move was valid
        if moved:
            self.score += score_increase
            self._add_random_tile()
        
        # Check if game is over
        self.done = self._is_game_over()
        
        # Calculate reward
        current_max_tile = self._get_max_tile()
        
        # Four components to the reward:
        # 1. Points gained from merging tiles
        # 2. Bonus for creating a new highest tile
        # 3. Penalty for invalid moves
        # 4. Penalty for ending the game
        
        # Base reward is the score increase
        reward = score_increase
        
        # Bonus for new max tile
        if current_max_tile > prev_max_tile:
            reward += current_max_tile  # Bonus equal to the value of the new max tile
        
        # Penalty for invalid moves
        if not moved:
            reward -= 10  # Small penalty for trying an invalid move
        
        # Penalty for losing the game
        if self.done:
            reward -= 50  # Larger penalty for ending the game
        
        # Return state, reward, done, and info dictionary
        info = {
            'score': self.score,
            'max_tile': current_max_tile,
            'moved': moved
        }
        
        return self._get_state(), reward, self.done, info
    
    def render(self):
        """Display the current state of the game"""
        # Print the grid, leaving empty (zero) cells blank
        print(f"Score: {self.score}")
        print("+------+------+------+------+")
        for i in range(self.size):
            row_str = "|"
            for j in range(self.size):
                # Read the integer directly; formatting a string cell with 'd' would raise ValueError
                cell = int(self.grid[i, j])
                if cell == 0:
                    row_str += "      |"
                else:
                    row_str += f" {cell:4d} |"
            print(row_str)
            print("+------+------+------+------+")
        
        if self.done:
            print("Game Over!")

Let's test our game environment to make sure it works correctly.

In [None]:
# Test the game environment with random moves
env = Game2048Env()
env.reset()
env.render()

# Make some random moves
for _ in range(10):
    action = random.randint(0, 3)
    print(f"\nAction: {Game2048Env.ACTIONS[action]}")
    state, reward, done, info = env.step(action)
    env.render()
    print(f"Reward: {reward}, Done: {done}, Score: {info['score']}")
    
    if done:
        break

## 3. Implement Deep Q-Learning Agent

Now we'll implement the Deep Q-Learning agent in four parts: an experience replay buffer, a state preprocessing function, the Q-network, and the agent class that ties them together.

In [None]:
class ReplayBuffer:
    """Experience replay buffer to store and sample agent experiences"""
    
    def __init__(self, max_size=100000):
        self.buffer = deque(maxlen=max_size)
    
    def add(self, state, action, reward, next_state, done):
        """Add a new experience to the buffer"""
        self.buffer.append((state, action, reward, next_state, done))
    
    def sample(self, batch_size):
        """Sample a batch of experiences from the buffer"""
        experiences = random.sample(self.buffer, min(batch_size, len(self.buffer)))
        
        states = np.array([exp[0] for exp in experiences])
        actions = np.array([exp[1] for exp in experiences])
        rewards = np.array([exp[2] for exp in experiences])
        next_states = np.array([exp[3] for exp in experiences])
        dones = np.array([exp[4] for exp in experiences])
        
        return states, actions, rewards, next_states, dones
    
    def size(self):
        """Return the current size of the buffer"""
        return len(self.buffer)
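
Because the buffer is a `deque` with `maxlen`, the oldest experiences are discarded automatically once it is full. The sketch below shows that behaviour with a tiny capacity and placeholder transitions; the tuples are made-up values, not real game states.

```python
import random
from collections import deque

random.seed(0)

buffer = deque(maxlen=3)          # tiny capacity, for illustration only
for step in range(5):
    # (state, action, reward, next_state, done) with placeholder values
    buffer.append((f"s{step}", step % 4, 0.0, f"s{step + 1}", False))

# Only the three most recent transitions remain
remaining_states = [exp[0] for exp in buffer]
print(remaining_states)           # ['s2', 's3', 's4']

# Uniform sampling without replacement, as in ReplayBuffer.sample
batch = random.sample(list(buffer), 2)
print(len(batch))                 # 2
```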

In [None]:
def preprocess_state(state):
    """Preprocess the game state for neural network input"""
    # Normalize the state by taking log2 of non-zero values (since tiles are powers of 2)
    # This helps the neural network learn more efficiently
    processed = np.zeros_like(state, dtype=np.float32)
    
    # For each non-zero tile, take log2 and divide by 16, so tiles up to 2^16 = 65536 map into (0, 1]
    non_zero_mask = state > 0
    processed[non_zero_mask] = np.log2(state[non_zero_mask]) / 16.0
    
    # Flatten to shape (1, 16), matching the Dense input layer
    return processed.reshape(1, -1)
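
To make the scaling concrete, here is the same log2 encoding applied to a few tile values in plain Python. This is a dependency-free illustration: `encode_tile` is a hypothetical name, and `preprocess_state` does the equivalent for the whole grid with NumPy.

```python
import math

def encode_tile(value):
    """Map a tile value to [0, 1]: 0 stays 0, otherwise log2(value) / 16."""
    return 0.0 if value == 0 else math.log2(value) / 16.0

for tile in (0, 2, 4, 2048, 65536):
    print(tile, encode_tile(tile))
# 0 0.0
# 2 0.0625
# 4 0.125
# 2048 0.6875
# 65536 1.0
```

Raw tile values span several orders of magnitude, while the encoded values grow by a constant 1/16 per doubling, which keeps the network inputs on a comparable scale.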

In [None]:
def create_dqn_model(input_shape, num_actions):
    """Create a deep Q-network model"""
    model = models.Sequential([
        layers.Dense(256, activation='relu', input_shape=input_shape),
        layers.Dense(256, activation='relu'),
        layers.Dense(128, activation='relu'),
        layers.Dense(num_actions, activation='linear')
    ])
    
    model.compile(optimizer=optimizers.Adam(learning_rate=0.0001), loss='mse')
    return model

In [None]:
class DQNAgent:
    """Deep Q-Learning agent for playing 2048"""
    
    def __init__(self, state_shape, num_actions, gamma=0.99, epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.9995, batch_size=64):
        # Environment parameters
        self.state_shape = (state_shape,)  # Input shape for the model (flattened grid)
        self.num_actions = num_actions  # Number of possible actions
        
        # Hyperparameters
        self.gamma = gamma  # Discount factor
        self.epsilon = epsilon  # Exploration rate
        self.epsilon_min = epsilon_min  # Minimum exploration rate
        self.epsilon_decay = epsilon_decay  # Decay rate for exploration
        self.batch_size = batch_size  # Size of batches to sample from replay buffer
        
        # Create primary and target networks
        self.primary_network = create_dqn_model(self.state_shape, self.num_actions)
        self.target_network = create_dqn_model(self.state_shape, self.num_actions)
        self.update_target_network()
        
        # Create replay buffer
        self.replay_buffer = ReplayBuffer()
        
        # Metrics for tracking performance
        self.loss_history = []
        self.reward_history = []
        self.max_tile_history = []
        self.epsilon_history = []
    
    def update_target_network(self):
        """Copy weights from primary network to target network"""
        self.target_network.set_weights(self.primary_network.get_weights())
    
    def get_action(self, state):
        """Choose an action using epsilon-greedy policy"""
        if np.random.rand() < self.epsilon:
            # Explore: choose a random action
            return np.random.randint(self.num_actions)
        else:
            # Exploit: choose the best action according to the policy
            q_values = self.primary_network.predict(preprocess_state(state), verbose=0)
            return np.argmax(q_values[0])
    
    def update_replay_buffer(self, state, action, reward, next_state, done):
        """Add a new experience to the replay buffer"""
        self.replay_buffer.add(state, action, reward, next_state, done)
    
    def train(self):
        """Train the agent using experiences from the replay buffer"""
        # Check if we have enough experiences to train
        if self.replay_buffer.size() < self.batch_size:
            return 0
        
        # Sample a batch of experiences
        states, actions, rewards, next_states, dones = self.replay_buffer.sample(self.batch_size)
        
        # Preprocess states and next_states
        processed_states = np.vstack([preprocess_state(state) for state in states])
        processed_next_states = np.vstack([preprocess_state(next_state) for next_state in next_states])
        
        # Get the current Q values from the primary network
        current_q_values = self.primary_network.predict(processed_states, verbose=0)
        
        # Get the next Q values from the target network
        next_q_values = self.target_network.predict(processed_next_states, verbose=0)
        
        # Initialize the target Q values as the current Q values (we'll update only the chosen actions)
        target_q_values = current_q_values.copy()
        
        # Update the Q values for the actions taken
        for i in range(len(actions)):
            if dones[i]:
                # If the episode ended, there is no next Q value
                target_q_values[i, actions[i]] = rewards[i]
            else:
                # Otherwise, use the Bellman equation to compute the target Q value
                target_q_values[i, actions[i]] = rewards[i] + self.gamma * np.max(next_q_values[i])
        
        # Train the primary network
        loss = self.primary_network.train_on_batch(processed_states, target_q_values)
        
        # Decay epsilon
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
        
        return loss
    
    def save_model(self, filepath):
        """Save the model weights"""
        self.primary_network.save_weights(filepath)
    
    def load_model(self, filepath):
        """Load the model weights"""
        self.primary_network.load_weights(filepath)
        self.update_target_network()
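
The update in `train` sets the target for the chosen action to the reward alone on terminal transitions, and to the reward plus the discounted maximum target-network Q-value otherwise. A minimal sketch with made-up numbers follows; the helper name `td_target` and the Q-values are assumptions for illustration.

```python
def td_target(reward, next_q_values, done, gamma=0.99):
    """One-step Q-learning target for a single transition."""
    if done:
        return reward                       # no future value after a terminal state
    return reward + gamma * max(next_q_values)

# Non-terminal: a merge worth 8 points; the target network predicts these Q-values
non_terminal = td_target(8, [1.0, 5.0, 2.0, 0.5], done=False)   # 8 + 0.99 * 5.0, about 12.95
# Terminal: the game-over penalty is the whole target
terminal = td_target(-50, [1.0, 5.0, 2.0, 0.5], done=True)

print(non_terminal, terminal)
```

Only the entry for the action actually taken is overwritten with this target; the other three outputs keep the network's own predictions, so they contribute zero error to the MSE loss.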

## 4. Training the Agent

Now we'll train our DQN agent to play 2048.

In [None]:
def train_agent(agent, env, num_episodes=10000, target_update_freq=10, render_freq=1000, save_freq=100):
    """Train the agent for a specified number of episodes"""
    max_score = 0
    max_tile = 0
    episode_scores = []
    episode_max_tiles = []
    
    # Set up directories for saving models
    os.makedirs('models', exist_ok=True)
    
    for episode in range(num_episodes):
        state = env.reset()
        episode_reward = 0
        done = False
        steps = 0
        
        while not done:
            # Choose an action
            action = agent.get_action(state)
            
            # Take a step in the environment
            next_state, reward, done, info = env.step(action)
            
            # Store the experience in replay buffer
            agent.update_replay_buffer(state, action, reward, next_state, done)
            
            # Train the agent
            loss = agent.train()
            if loss > 0:
                agent.loss_history.append(loss)
            
            # Move to the next state
            state = next_state
            episode_reward += reward
            steps += 1
            
            # Update the target network periodically
            if steps % target_update_freq == 0:
                agent.update_target_network()
        
        # Store episode results
        episode_scores.append(info['score'])
        episode_max_tiles.append(info['max_tile'])
        
        # Update max score and max tile
        if info['score'] > max_score:
            max_score = info['score']
        if info['max_tile'] > max_tile:
            max_tile = info['max_tile']
        
        # Track epsilon
        agent.epsilon_history.append(agent.epsilon)
        
        # Render occasionally to see progress
        if (episode + 1) % render_freq == 0 or (episode + 1) == num_episodes:
            print(f"\nEpisode {episode + 1}/{num_episodes}")
            print(f"Score: {info['score']}, Max Tile: {info['max_tile']}, Epsilon: {agent.epsilon:.4f}")
            print(f"Max Score so far: {max_score}, Max Tile so far: {max_tile}")
            env.render()
        
        # Save the model periodically (Keras 3 requires weight files to end in .weights.h5)
        if (episode + 1) % save_freq == 0:
            agent.save_model(f"models/dqn_2048_episode_{episode + 1}.weights.h5")
    
    # Save final model (Keras 3 requires weight files to end in .weights.h5)
    agent.save_model("models/dqn_2048_final.weights.h5")
    
    return episode_scores, episode_max_tiles

In [None]:
# Initialize environment and agent
env = Game2048Env()
state_shape = 16  # 4x4 grid flattened
num_actions = 4   # left, right, up, down

# Create agent with specified hyperparameters
agent = DQNAgent(
    state_shape=state_shape,
    num_actions=num_actions,
    gamma=0.99,          # Discount factor
    epsilon=1.0,         # Initial exploration rate
    epsilon_min=0.01,    # Minimum exploration rate
    epsilon_decay=0.9995, # Decay rate for exploration
    batch_size=64        # Batch size for training
)

# Train the agent
# Reduce num_episodes for faster execution (e.g., 1000 for testing)
print("Starting training...")
episode_scores, episode_max_tiles = train_agent(
    agent=agent,
    env=env,
    num_episodes=5000,    # Number of episodes to train for
    target_update_freq=10, # Sync the target network every 10 steps (counted within each episode)
    render_freq=500,      # How often to render the game
    save_freq=1000        # How often to save the model
)
print("Training complete!")
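
Because `train` multiplies epsilon by `epsilon_decay` on every training step rather than once per episode, the schedule above reaches `epsilon_min` after a fixed number of steps. The sketch below counts them for the values used here; `steps_until_min` is a hypothetical helper, not part of the agent.

```python
def steps_until_min(epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.9995):
    """Count training steps until epsilon stops decaying, mirroring DQNAgent.train."""
    steps = 0
    while epsilon > epsilon_min:
        epsilon *= epsilon_decay
        steps += 1
    return steps

n = steps_until_min()
print(n)   # roughly 9,200 training steps
```

If a game lasts on the order of a hundred steps, exploration is mostly finished within about the first hundred episodes of a 5000-episode run. Raising `epsilon_decay`, or decaying once per episode instead, would spread exploration over more of the training.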

## 5. Visualizing Agent Performance

Let's visualize how our agent performed during training.

In [None]:
def plot_training_results(agent, episode_scores, episode_max_tiles):
    """Plot the training results"""
    # Create a figure with multiple subplots
    fig, axs = plt.subplots(2, 2, figsize=(15, 10))
    
    # Plot the score over episodes
    axs[0, 0].plot(episode_scores)
    axs[0, 0].set_title('Score per Episode')
    axs[0, 0].set_xlabel('Episode')
    axs[0, 0].set_ylabel('Score')
    
    # Plot the max tile over episodes
    axs[0, 1].plot(episode_max_tiles)
    axs[0, 1].set_title('Max Tile per Episode')
    axs[0, 1].set_xlabel('Episode')
    axs[0, 1].set_ylabel('Max Tile')
    
    # Plot the loss over training steps
    if len(agent.loss_history) > 0:
        axs[1, 0].plot(agent.loss_history)
        axs[1, 0].set_title('Loss over Training Steps')
        axs[1, 0].set_xlabel('Training Step')
        axs[1, 0].set_ylabel('Loss')
    
    # Plot the epsilon over episodes
    axs[1, 1].plot(agent.epsilon_history)
    axs[1, 1].set_title('Epsilon over Episodes')
    axs[1, 1].set_xlabel('Episode')
    axs[1, 1].set_ylabel('Epsilon')
    
    # Calculate moving averages for smoother plots
    window_size = min(100, len(episode_scores))
    if window_size > 0 and len(episode_scores) > window_size:
        scores_avg = np.convolve(episode_scores, np.ones(window_size)/window_size, mode='valid')
        axs[0, 0].plot(range(window_size-1, len(scores_avg) + window_size-1), scores_avg, 'r-', label=f'{window_size}-episode avg')
        axs[0, 0].legend()
    
    plt.tight_layout()
    plt.show()
    
    # Print some statistics
    print(f"Training Statistics:")
    print(f"Number of Episodes: {len(episode_scores)}")
    print(f"Final Epsilon: {agent.epsilon:.4f}")
    print(f"Max Score: {max(episode_scores)}")
    print(f"Max Tile: {max(episode_max_tiles)}")
    print(f"Average Score (last 100 episodes): {np.mean(episode_scores[-100:]):.2f}")

In [None]:
# Plot the training results
plot_training_results(agent, episode_scores, episode_max_tiles)

## 6. Evaluating the Trained Agent

Now let's evaluate our trained agent by having it play a few games without exploration.

In [None]:
def evaluate_agent(agent, env, num_episodes=10, render=True):
    """Evaluate the agent's performance with no exploration"""
    # Temporarily set epsilon to 0 for evaluation
    original_epsilon = agent.epsilon
    agent.epsilon = 0.0
    
    scores = []
    max_tiles = []
    
    for episode in range(num_episodes):
        state = env.reset()
        done = False
        
        if render:
            print(f"\nEvaluation Episode {episode + 1}/{num_episodes}")
            env.render()
        
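        while not done:
            # Choose the best action according to the learned policy
            action = agent.get_action(state)  # With epsilon=0, this will always choose the best action
            
            # Take the action
            state, _, done, info = env.step(action)
            
            # An invalid greedy move leaves the board unchanged, so the same action
            # would be chosen forever; fall back to random moves until one is valid
            while not info['moved'] and not done:
                action = random.randint(0, 3)
                state, _, done, info = env.step(action)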
            
            if render:
                print(f"\nAction: {Game2048Env.ACTIONS[action]}")
                env.render()
        
        scores.append(info['score'])
        max_tiles.append(info['max_tile'])
        
        if render:
            print(f"Final Score: {info['score']}, Max Tile: {info['max_tile']}")
    
    # Restore original epsilon
    agent.epsilon = original_epsilon
    
    # Print evaluation results
    print(f"\nEvaluation Results:")
    print(f"Average Score: {np.mean(scores):.2f}")
    print(f"Max Score: {max(scores)}")
    print(f"Average Max Tile: {np.mean(max_tiles):.2f}")
    print(f"Max Tile Achieved: {max(max_tiles)}")
    
    return scores, max_tiles

In [None]:
# Evaluate the trained agent
print("Evaluating the trained agent...")
eval_scores, eval_max_tiles = evaluate_agent(agent, env, num_episodes=3, render=True)

## 7. Improving the Agent

Here are some ideas for further improving the agent:

1. **Better State Representation**: Use a CNN to capture the spatial structure of the grid.
2. **Improved Reward Function**: Design a more sophisticated reward function that considers board structure.
3. **Additional Training**: Continue training for more episodes to improve performance.
4. **Advanced Techniques**: Implement extensions like Double DQN or Dueling DQN.
5. **Hyperparameter Tuning**: Experiment with different hyperparameters to optimize performance.
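
As one concrete example from the list, Double DQN changes only how the target is computed: the primary network selects the next action and the target network evaluates it, which reduces the overestimation that comes from taking a max over noisy estimates. A minimal sketch with made-up Q-values follows. The function name and numbers are assumptions, and wiring this into `DQNAgent.train` would also require the primary network's predictions for the next states.

```python
def double_dqn_target(reward, online_next_q, target_next_q, done, gamma=0.99):
    """Double DQN target: select with the online network, evaluate with the target network."""
    if done:
        return reward
    best_action = max(range(len(online_next_q)), key=lambda a: online_next_q[a])
    return reward + gamma * target_next_q[best_action]

online_next_q = [0.2, 3.0, 1.0, 0.1]   # primary network prefers action 1
target_next_q = [4.0, 2.0, 1.5, 0.3]   # target network's own max is action 0

standard = 0 + 0.99 * max(target_next_q)                                 # plain DQN bootstraps from 4.0
double = double_dqn_target(0, online_next_q, target_next_q, done=False)  # Double DQN bootstraps from 2.0
print(standard, double)
```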

## 8. Conclusion

In this notebook, we've implemented a Deep Q-Learning agent to play the 2048 game. The agent learns to maximize its score by making strategic moves based on the current game state.

The training process involves:
1. Creating a 2048 game environment suitable for reinforcement learning
2. Implementing a Deep Q-Network with experience replay
3. Training the agent using epsilon-greedy exploration
4. Evaluating the trained agent's performance

This implementation is a baseline: the techniques listed in Section 7 and longer training runs are the most direct ways to improve it.