In [None]:
"""
Step 1: Game Environment Implementation
First, we need to create a 2048 game environment that will:

Maintain the game state
Process moves
Calculate rewards
Determine game termination
"""

import numpy as np
import random

class Game2048:
    """A 4x4 2048 game environment.

    The board is a NumPy integer array where 0 marks an empty cell.
    Directions are encoded as 0: left, 1: up, 2: right, 3: down.

    Attributes:
        board: current 4x4 grid of tile values.
        score: sum of all merge values obtained so far.
        game_over: True once no empty cell and no mergeable neighbours remain.
    """

    # Quarter turns (the ``k`` argument of ``np.rot90``) that turn a slide in
    # the given direction into a slide to the left. The inverse rotation is
    # simply ``-k``.
    _ROTATIONS = {0: 0, 1: 1, 2: 2, 3: -1}

    def __init__(self):
        # Initialize 4x4 grid with zeros
        self.board = np.zeros((4, 4), dtype=int)
        self.score = 0
        self.game_over = False
        # Add initial tiles
        self.add_random_tile()
        self.add_random_tile()

    def reset(self):
        """Restart the game and return a copy of the fresh board."""
        self.__init__()
        return self.get_state()

    def add_random_tile(self):
        """Place a 2 (90%) or a 4 (10%) on a random empty cell, if any."""
        rows, cols = np.where(self.board == 0)
        if len(rows) > 0:
            # Choose a random empty cell
            idx = random.randint(0, len(rows) - 1)
            self.board[rows[idx], cols[idx]] = 2 if random.random() < 0.9 else 4

    def _slide(self, board, direction):
        """Return ``(new_board, score_increase)`` for sliding ``board``.

        The input board is not modified and the result is expressed in the
        same orientation as the input, so callers can compare both directly.

        Raises:
            ValueError: if ``direction`` is not one of 0, 1, 2, 3.
        """
        try:
            turns = self._ROTATIONS[direction]
        except (KeyError, TypeError):
            raise ValueError(
                f"direction must be 0 (left), 1 (up), 2 (right) or 3 (down), "
                f"got {direction!r}"
            ) from None

        slid, score_increase = self._move_left(np.rot90(board, k=turns))
        # Rotate back so the result lives in the original frame of reference;
        # make it contiguous so the board is never a strided view.
        restored = np.ascontiguousarray(np.rot90(slid, k=-turns))
        return restored, score_increase

    def move(self, direction):
        """Apply a move and return ``(moved, score_increase, game_over)``.

        A new random tile is added only when the move changed the board.

        Raises:
            ValueError: if ``direction`` is not one of 0, 1, 2, 3.
        """
        new_board, score_increase = self._slide(self.board, direction)

        # Check if board changed
        moved = not np.array_equal(new_board, self.board)

        self.board = new_board
        self.score += score_increase

        # Add new tile if board changed
        if moved:
            self.add_random_tile()

        # Check if game is over
        self.game_over = self._is_game_over()

        return moved, score_increase, self.game_over

    def _move_left(self, board):
        """Slide and merge every row of ``board`` to the left.

        Returns a new board plus the sum of the values created by merges.
        Each tile takes part in at most one merge per move.
        """
        new_board = np.zeros_like(board)
        score_increase = 0

        for row in range(board.shape[0]):
            # Extract non-zero elements
            elements = board[row][board[row] != 0]
            merged = []
            i = 0

            # Merge adjacent identical tiles
            while i < len(elements):
                if i + 1 < len(elements) and elements[i] == elements[i + 1]:
                    merged.append(elements[i] * 2)
                    score_increase += elements[i] * 2
                    i += 2  # skip the partner that was just consumed
                else:
                    merged.append(elements[i])
                    i += 1

            # Place merged elements in new board
            new_board[row, :len(merged)] = merged

        return new_board, score_increase

    def _is_game_over(self):
        """Return True when the board is full and no neighbours can merge."""
        # Check if there are any empty cells
        if np.any(self.board == 0):
            return False

        # Check horizontally adjacent identical tiles
        if np.any(self.board[:, :-1] == self.board[:, 1:]):
            return False

        # Check vertically adjacent identical tiles
        if np.any(self.board[:-1, :] == self.board[1:, :]):
            return False

        return True

    def get_state(self):
        """Return a copy of the current board."""
        return self.board.copy()

    def get_valid_actions(self):
        """Return the directions (ascending) whose move would change the board."""
        return [
            direction
            for direction in self._ROTATIONS
            if not np.array_equal(self._slide(self.board, direction)[0], self.board)
        ]

In [None]:
"""
Step 2: State Representation
"""

import torch
import torch.nn.functional as F
from math import log2
from numpy import array, zeros

def transform_game_state(game_board):
    """Encode a 4x4 board as a one-hot tensor of shape [1, 16, 4, 4].

    Channel ``c`` is 1 at position (row, col) when the tile there equals
    ``2 ** c``; channel 0 marks empty cells. Tiles are expected to be 0 or a
    power of two below ``2 ** 16``.

    Args:
        game_board: 4x4 array-like of tile values.

    Returns:
        A float32 tensor laid out as [batch, channels, height, width].
    """
    # Flatten in row-major order: index = row * 4 + col
    flattened = array(game_board).ravel()

    # Map each tile to its exponent (log base 2); empty cells map to 0
    exponents = [0 if cell == 0 else int(log2(cell)) for cell in flattened]

    # One-hot over exponents: shape [16 cells, 16 classes]
    cell_classes = torch.tensor(exponents, dtype=torch.int64)
    one_hot = F.one_hot(cell_classes, num_classes=16).to(torch.float32)

    # Restore the spatial grid first ([row, col, class]) and only then move the
    # class axis to the channel position. Viewing the [cell, class] matrix
    # directly as (1, 16, 4, 4) would use the cell index as the channel and
    # scatter the class axis over the 4x4 grid.
    # NOTE(review): this changes the input layout, so weights trained with the
    # previous encoding should be retrained.
    grid = one_hot.view(4, 4, 16).permute(2, 0, 1)

    return grid.unsqueeze(0).contiguous()

In [None]:
"""
Step 3: Deep Q-Network Architecture
"""

import torch.nn as nn
import torch

class Block(nn.Module):
    """Four parallel convolutions (kernel sizes 1 to 4) joined along channels.

    Every branch uses 'same' padding and produces ``out_ch // 4`` channels, so
    the concatenated output has ``4 * (out_ch // 4)`` channels and the same
    spatial size as the input.
    """

    def __init__(self, in_ch, out_ch):
        super().__init__()
        # Channel budget shared equally between the four kernel sizes
        branch_ch = out_ch // 4
        self.c1 = nn.Conv2d(in_ch, branch_ch, kernel_size=1, padding='same')
        self.c2 = nn.Conv2d(in_ch, branch_ch, kernel_size=2, padding='same')
        self.c3 = nn.Conv2d(in_ch, branch_ch, kernel_size=3, padding='same')
        self.c4 = nn.Conv2d(in_ch, branch_ch, kernel_size=4, padding='same')

    def forward(self, x):
        # Run every branch on the same input and stack results on dim 1
        branches = (self.c1, self.c2, self.c3, self.c4)
        outputs = [conv(x) for conv in branches]
        return torch.cat(outputs, dim=1)

class Model(nn.Module):
    """Q-network: three multi-kernel conv blocks followed by two linear layers.

    Takes a [batch, 16, 4, 4] tensor and returns one value per action,
    shape [batch, 4].
    """

    def __init__(self):
        super().__init__()
        # Convolutional feature extractor
        self.b1 = Block(16, 128)
        self.b2 = Block(128, 256)
        self.b3 = Block(256, 512)

        # Head: 512 channels over a 4x4 grid -> 256 hidden units -> 4 actions
        self.l1 = nn.Linear(512 * 16, 256)
        self.l2 = nn.Linear(256, 4)

        self.act = nn.ReLU()
        self.drop = nn.Dropout(0.2)

    def forward(self, x):
        # Each block is followed by the shared ReLU
        for block in (self.b1, self.b2, self.b3):
            x = self.act(block(x))

        # Collapse channels and spatial dims into one feature vector per sample
        batch = x.size(0)
        features = x.view(batch, -1)

        # Hidden layer with ReLU and dropout, then the linear output layer
        hidden = self.l1(features)
        hidden = self.act(hidden)
        hidden = self.drop(hidden)
        return self.l2(hidden)

In [None]:
"""
Step 4: Experience Replay Buffer
"""
    
from collections import deque, namedtuple
import random

# Record type holding one transition: state, action, next state, reward, done
Step = namedtuple('Step', ['s', 'a', 'next_s', 'r', 'done'])


class Memory:
    """Fixed-capacity replay buffer; the oldest entries are dropped when full."""

    def __init__(self, size):
        # deque with maxlen evicts from the left once capacity is reached
        self.buffer = deque([], maxlen=size)

    def add(self, s, a, next_s, r, done):
        """Append one transition to the buffer."""
        transition = Step(s=s, a=a, next_s=next_s, r=r, done=done)
        self.buffer.append(transition)

    def get_batch(self, size):
        """Return ``size`` distinct transitions drawn at random."""
        batch = random.sample(self.buffer, size)
        return batch

    def __len__(self):
        return len(self.buffer)

In [None]:
"""
Step 5: Model Agent
"""

import torch.optim as optim

class DQNAgent:
    """DQN agent with a policy network, a target network and a replay buffer.

    States handed to the agent are expected to be tensors with a leading
    batch dimension of 1 (``learn`` concatenates them along dim 0, and the
    networks take 16 input channels).

    Args:
        state_size: stored for reference; not used by the networks.
        action_size: number of discrete actions, used for random exploration
            when no valid-action list is supplied.
        buffer_size: capacity of the replay buffer.
        batch_size: number of transitions sampled per learning update.
        gamma: discount factor.
        learning_rate: Adam learning rate.
        update_every: run a learning update every this many calls to ``step``.
        target_update_every: copy the policy weights into the target network
            every this many learning updates.
    """

    def __init__(self, state_size, action_size,
                 buffer_size=100000, batch_size=64, gamma=0.99,
                 learning_rate=1e-4, update_every=5,
                 target_update_every=200):
        # Initialize parameters
        self.state_size = state_size
        self.action_size = action_size
        self.buffer_size = buffer_size
        self.batch_size = batch_size
        self.gamma = gamma
        self.learning_rate = learning_rate
        self.update_every = update_every
        self.target_update_every = target_update_every

        # Device configuration
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Q-Networks
        self.policy_net = Model().to(self.device)
        self.target_net = Model().to(self.device)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()  # Set to evaluation mode

        # Optimizer
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=learning_rate)

        # Replay buffer
        self.memory = Memory(buffer_size)

        # Initialize exploration parameters
        self.eps_start = 0.9
        self.eps_end = 0.05
        self.eps_decay = 0.9999
        self.eps = self.eps_start

        # Counters: environment steps seen and learning updates performed
        self.t_step = 0
        self.learn_step = 0

    def step(self, state, action, reward, next_state, done):
        """Store a transition and learn every ``update_every`` steps."""
        # Save experience in replay memory
        self.memory.add(state, action, next_state, reward, done)

        # Increment step counter
        self.t_step += 1

        # If enough samples in memory, perform learning
        if len(self.memory) > self.batch_size and self.t_step % self.update_every == 0:
            experiences = self.memory.get_batch(self.batch_size)
            self.learn(experiences)

    def act(self, state, valid_actions=None):
        """Pick an action epsilon-greedily and return it as a Python int.

        Args:
            state: tensor (or NumPy array) accepted by the policy network.
            valid_actions: optional list of allowed action indices; when
                given and non-empty, both the greedy and the random choice
                are restricted to it.
        """
        if random.random() > self.eps:
            # Check if state is already a tensor
            if not isinstance(state, torch.Tensor):
                state = torch.from_numpy(state).float().to(self.device)
            else:
                # Make sure it's on the right device
                state = state.to(self.device)

            # Evaluation mode disables dropout for the greedy estimate
            self.policy_net.eval()
            try:
                with torch.no_grad():
                    action_values = self.policy_net(state)
            finally:
                self.policy_net.train()

            # If valid actions provided, only consider those
            if valid_actions:
                mask = torch.full_like(action_values, float('-inf'))
                mask[0, valid_actions] = 0
                action_values = action_values + mask

            return int(action_values.cpu().numpy().argmax())

        # Choose random action
        if valid_actions:
            return random.choice(valid_actions)
        return random.randint(0, self.action_size - 1)

    def learn(self, experiences):
        """Run one gradient step on a batch of transitions; return the loss."""
        # Unpack experiences with explicit dtypes so integer rewards and
        # NumPy scalars cannot change the tensor types
        states = torch.cat([e.s for e in experiences]).to(self.device)
        next_states = torch.cat([e.next_s for e in experiences]).to(self.device)
        actions = torch.tensor([int(e.a) for e in experiences],
                               dtype=torch.int64, device=self.device).unsqueeze(1)
        rewards = torch.tensor([float(e.r) for e in experiences],
                               dtype=torch.float32, device=self.device).unsqueeze(1)
        dones = torch.tensor([float(e.done) for e in experiences],
                             dtype=torch.float32, device=self.device).unsqueeze(1)

        # Get current Q values
        Q_expected = self.policy_net(states).gather(1, actions)

        # Get next Q values from target network
        with torch.no_grad():
            Q_targets_next = self.target_net(next_states).max(1)[0].unsqueeze(1)

        # Compute target Q values; terminal transitions do not bootstrap
        Q_targets = rewards + (self.gamma * Q_targets_next * (1 - dones))

        # Compute loss
        loss = F.mse_loss(Q_expected, Q_targets)

        # Minimize the loss
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Sync the target network on its own schedule. Re-using the
        # ``update_every`` check here would sync on every update, because
        # ``step`` only calls ``learn`` when that check already holds.
        self.learn_step += 1
        if self.learn_step % self.target_update_every == 0:
            self.target_net.load_state_dict(self.policy_net.state_dict())

        # Update epsilon
        self.eps = max(self.eps_end, self.eps * self.eps_decay)

        return loss.item()

    def save(self, filename):
        """Write network weights, optimizer state and epsilon to ``filename``."""
        torch.save({
            'policy_net_state_dict': self.policy_net.state_dict(),
            'target_net_state_dict': self.target_net.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'epsilon': self.eps,
        }, filename)

    def load(self, filename):
        """Restore a checkpoint written by ``save``."""
        # map_location lets a checkpoint saved on GPU load on a CPU-only host
        checkpoint = torch.load(filename, map_location=self.device)
        self.policy_net.load_state_dict(checkpoint['policy_net_state_dict'])
        self.target_net.load_state_dict(checkpoint['target_net_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        self.eps = checkpoint['epsilon']

In [None]:
"""
Step 6: Training Loop
"""

def train_agent(num_episodes=10000, max_steps=10000):
    """Train a fresh DQN agent on 2048.

    Every 100 episodes a progress line is printed and a checkpoint is saved.

    Args:
        num_episodes: number of games to play.
        max_steps: upper bound on moves per game.

    Returns:
        Tuple ``(agent, scores, max_tiles)`` with one score and one max tile
        per episode.
    """
    env = Game2048()
    agent = DQNAgent(state_size=16, action_size=4)

    # Per-episode statistics
    scores, max_tiles = [], []

    for episode in range(1, num_episodes + 1):
        # Start a new game and encode its board
        state = transform_game_state(env.reset())
        score = 0

        for _ in range(max_steps):
            valid_actions = env.get_valid_actions()

            # No legal move left: the game has ended
            if not valid_actions:
                break

            action = agent.act(state, valid_actions)
            moved, reward, done = env.move(action)

            # A move that left the board unchanged is penalised
            reward = reward if moved else -1

            next_state = transform_game_state(env.get_state())

            # Store the transition and possibly learn from a batch
            agent.step(state, action, reward, next_state, done)

            score += reward
            state = next_state

            if done:
                break

        # Record statistics
        scores.append(score)
        max_tiles.append(env.board.max())

        # Only report and checkpoint on every 100th episode
        if episode % 100 != 0:
            continue

        print(f"Episode {episode}/{num_episodes}, Avg Score: {sum(scores[-100:])/100:.2f}, Max Tile: {max(max_tiles[-100:])}")
        # Save model
        agent.save(f"dqn_2048_episode_{episode}.pth")

    return agent, scores, max_tiles

In [None]:
"""
Step 7: Evaluation Loop
"""
def evaluate_agent(agent, num_games=100, max_steps=1000):
    """Play ``num_games`` greedy games and report the results.

    Exploration is disabled for the duration of the evaluation and the
    agent's previous epsilon is restored afterwards.

    Args:
        agent: object exposing ``eps`` and ``act(state, valid_actions)``.
        num_games: number of games to play.
        max_steps: upper bound on successful moves per game.

    Returns:
        Tuple ``(scores, max_tiles)`` with one entry per game.
    """
    env = Game2048()
    scores = []
    max_tiles = []

    # Select actions without exploration, but do not leave the caller's agent
    # permanently greedy
    saved_eps = agent.eps
    agent.eps = 0
    try:
        for i in range(num_games):
            state = env.reset()
            state = transform_game_state(state)
            done = False
            total_score = 0
            step_count = 0

            while not done and step_count < max_steps:
                # Get valid actions
                valid_actions = env.get_valid_actions()

                # If no valid actions, game is over
                if not valid_actions:
                    break

                # Try actions until one changes the board. A move that does
                # not change the board leaves the environment untouched, so
                # nothing has to be restored before retrying.
                moved = False
                max_attempts = 10  # Limit the number of attempts to find a valid move
                attempts = 0

                while not moved and attempts < max_attempts and valid_actions:
                    action = agent.act(state, valid_actions)

                    # Take action
                    moved, reward, done = env.move(action)

                    if not moved:
                        # Drop the rejected action and try again
                        if action in valid_actions:
                            valid_actions.remove(action)
                        attempts += 1
                    else:
                        # Valid move found, update state and score
                        state = transform_game_state(env.get_state())
                        total_score += reward
                        step_count += 1

                # If we couldn't find a valid move, end the game
                if not moved:
                    break

            scores.append(total_score)
            max_tiles.append(env.board.max())

            if i % 10 == 0:
                print(f"Game {i}/{num_games}, Score: {total_score}, Max Tile: {env.board.max()}, Steps: {step_count}")
    finally:
        agent.eps = saved_eps

    # Averages are undefined when no game was played
    if scores:
        print(f"Average Score: {sum(scores)/len(scores):.2f}")
        print(f"Average Max Tile: {sum(max_tiles)/len(max_tiles):.2f}")

    return scores, max_tiles

In [None]:
"""
Step 8: Train Agent
"""

# Train agent
# NOTE(review): num_episodes=1 plays a single episode. train_agent only prints
# progress and saves a checkpoint on every 100th episode, so this run does
# neither — presumably a smoke test; raise num_episodes for real training.
agent, train_scores, train_max_tiles = train_agent(num_episodes=1)

In [None]:
"""
Step 9: Evaluate Agent
"""
# Run the greedy evaluation on the trained agent
print("Eval")
eval_scores, eval_max_tiles = evaluate_agent(agent, num_games=100)

# Plot results
import matplotlib.pyplot as plt

# Training curves: one line plot per per-episode series
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(train_scores)
ax.set_title('Training Scores')
ax.set_xlabel('Episode')
ax.set_ylabel('Score')
fig.savefig('training_scores.png')

fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(train_max_tiles)
ax.set_title('Max Tiles During Training')
ax.set_xlabel('Episode')
ax.set_ylabel('Max Tile')
fig.savefig('training_max_tiles.png')

# Histogram of the evaluation scores
fig, ax = plt.subplots(figsize=(10, 5))
ax.hist(eval_scores, bins=20)
ax.set_title('Evaluation Score Distribution')
ax.set_xlabel('Score')
ax.set_ylabel('Frequency')
fig.savefig('eval_score_dist.png')

# Bar chart of how often each max tile was reached during evaluation;
# tiles are used as string labels so the bars are evenly spaced
fig, ax = plt.subplots(figsize=(10, 5))
unique, counts = np.unique(eval_max_tiles, return_counts=True)
tile_labels = [str(int(x)) for x in unique]
ax.bar(tile_labels, counts)
ax.set_title('Evaluation Max Tile Distribution')
ax.set_xlabel('Max Tile')
ax.set_ylabel('Frequency')
fig.savefig('eval_max_tile_dist.png')