In [2]:
# This constitutes the game logic

# %%
from random import randint, uniform
from numpy import *

# %%
def add_two(mat):
    a = randint(0, len(mat)-1) # Choose any x
    b = randint(0, len(mat)-1) # Choose any y

    while(mat[a][b]!=0):  # If the x,y coord is occupied, try again and again
        a = randint(0,len(mat)-1)
        b = randint(0,len(mat)-1)

    # 10% chance that the number will be 4 instead of 2. 
    mat[a][b] = (uniform(0, 1) < 0.1)*2 + 2  

    return mat

def reverse(mat):
    new=[]
    for i in range(len(mat)):
        new.append([])
        for j in range(len(mat[0])):
            new[i].append(mat[i][len(mat[0])-j-1])
    return new

def transpose(mat):
    new=[]
    for i in range(len(mat[0])):
        new.append([])
        for j in range(len(mat)):
            new[i].append(mat[j][i])
    return new

def cover_up(mat):
    new = [[0,0,0,0],[0,0,0,0],[0,0,0,0],[0,0,0,0]]
    done = False
    for i in range(4):
        count=0
        for j in range(4):
            if mat[i][j]!=0:
                new[i][count]=mat[i][j]
                if j!=count:
                    done=True
                count+=1
    return new, done

def merge(mat):
    done = False
    addPoints = 0
    for i in range(4):
         for j in range(3):
             if mat[i][j]==mat[i][j+1] and mat[i][j]!=0:
                 mat[i][j]*=2
                 addPoints += mat[i][j]
                 mat[i][j+1]=0
                 done=True
    return mat, done, addPoints

In [3]:
# This also constitutes the game logic

class GameState:
    """ Simple version of the 2048 board with moves, processing, and point system available.
        Use this class to pair with MCTS algorithm. """

    def __init__(self, mat):
        self.matrix = mat
        self.point_count = 0

    def __str__(self):
        s = ""
        for i in range(4):
            for j in range(4):
                s += "\t"
                s += str(self.matrix[i][j])
                if j == 3:
                    s += "\n"
        return s

    def game_state(self):
        mat = self.matrix
        for i in range(len(mat)):
            for j in range(len(mat[0])):
                if mat[i][j]==2048:
                    return 'win'
        for i in range(len(mat)-1): # Intentionally reduced to check the row on the right and below
            for j in range(len(mat[0])-1): # More elegant to use exceptions but most likely this will be their solution
                if mat[i][j]==mat[i+1][j] or mat[i][j+1]==mat[i][j]:
                    return 'not over'
        for i in range(len(mat)): # Check for any zero entries
            for j in range(len(mat[0])):
                if mat[i][j]==0:
                    return 'not over'
        for k in range(len(mat)-1): # To check the left/right entries on the last row
            if mat[len(mat)-1][k]==mat[len(mat)-1][k+1]:
                return 'not over'
        for j in range(len(mat)-1): # Check up/down entries on last column
            if mat[j][len(mat)-1]==mat[j+1][len(mat)-1]:
                return 'not over'
        return 'lose'

    # Done indicates whether the move does anything
    def up(self):
        game = self.matrix
        game = transpose(game)
        game, done = cover_up(game)
        temp = merge(game)
        game = temp[0]
        done = done or temp[1]
        game = cover_up(game)[0]
        game = transpose(game)
        add_points = temp[2]
        return game, done, add_points

    def down(self):
        game = self.matrix
        game = reverse(transpose(game))
        game, done = cover_up(game)
        temp = merge(game)
        game = temp[0]
        done = done or temp[1]
        game = cover_up(game)[0]
        game = transpose(reverse(game))
        add_points = temp[2]
        return game, done, add_points

    def left(self):
        game = self.matrix
        game, done = cover_up(game)
        temp = merge(game)
        game = temp[0]
        done = done or temp[1]
        game = cover_up(game)[0]
        add_points = temp[2]
        return game, done, add_points

    def right(self):
        game = self.matrix
        game = reverse(game)
        game, done = cover_up(game)
        temp = merge(game)
        game = temp[0]
        done = done or temp[1]
        game = cover_up(game)[0]
        game = reverse(game)
        add_points = temp[2]
        return game, done, add_points

    def clone(self):
        st = GameState(self.matrix)
        st.point_count = self.point_count
        return st
    
    def do_move(self, move):
        """ Move input should be one of the following: "up", "down", "left", "right"
            Make sure when this function is called, the move is a possible move. """
        move_funcs = {
            'up':       self.up(),
            'down':     self.down(),
            'left':     self.left(),
            'right':    self.right()
        }

        self.matrix, _, add_points = move_funcs[move]
        self.point_count += add_points # Update Points
        
        # Check if there any zeros in the grid.
        zero_exists = False
        for i in range(4):
            for j in range(4):
                if self.matrix[i][j] == 0:
                    zero_exists = True
                    break
        
        if zero_exists:
            self.matrix = add_two(self.matrix) # Add 2 or 4 in the matrix
            game_over = self.game_state() != 'not over'
            return game_over 
        else:
            game_over = True
            return game_over 
        
    def get_moves(self):
        """ Get all possible moves from this state. """
        _, done_up, _     = self.up()
        _, done_down, _   = self.down()
        _, done_left, _   = self.left()
        _, done_right, _  = self.right()

        move_possible = []
        if done_up:
            move_possible.append("up")
        if done_down:
            move_possible.append("down")
        if done_left:
            move_possible.append("left")
        if done_right:
            move_possible.append("right")
        
        return move_possible

    def get_result(self):
        """ Get the score of the given state."""
        return self.point_count

# %%
if __name__ == "__main__":
    initialMat = [[4,2,4,0],[0,0,4,0],[0,2,0,0],[8,0,2,0]]
    deathMatrix = [[8, 4, 2, 8], [2, 16, 8, 4], [256, 32, 4, 2], [4, 2, 4, 2]]

In [26]:
# --------------
# 2048 RL TRAINING FIXED
# --------------

import numpy as np
import random
import pickle

# -----------------
# GameState class
# -----------------

class GameState:
    def __init__(self, mat=None):
        self.matrix = np.array(mat) if mat is not None else np.zeros((4, 4), dtype=int)

    def clone(self):
        return GameState(self.matrix.copy())

    def add_random_tile(self):
        empty = list(zip(*np.where(self.matrix == 0)))
        if empty:
            y, x = random.choice(empty)
            self.matrix[y][x] = 2 if random.random() < 0.9 else 4

    def get_moves(self):
        moves = []
        for move in ['up', 'down', 'left', 'right']:
            clone = self.clone()
            if clone.do_move(move):
                moves.append(move)
        return moves

    def do_move(self, direction):
        moved = False
        for i in range(4):
            if direction in ('left', 'right'):
                line = self.matrix[i, :]
            else:
                line = self.matrix[:, i]

            if direction in ('right', 'down'):
                line = line[::-1]

            merged_line, done = self.merge(line)

            if done:
                moved = True
                if direction in ('left', 'right'):
                    if direction == 'left':
                        self.matrix[i, :] = merged_line
                    else:
                        self.matrix[i, :] = merged_line[::-1]
                else:
                    if direction == 'up':
                        self.matrix[:, i] = merged_line
                    else:
                        self.matrix[:, i] = merged_line[::-1]

        if moved:
            self.add_random_tile()
        return moved

    def merge(self, line):
        non_zero = line[line != 0]
        merged = []
        skip = False
        for j in range(len(non_zero)):
            if skip:
                skip = False
                continue
            if j + 1 < len(non_zero) and non_zero[j] == non_zero[j+1]:
                merged.append(non_zero[j]*2)
                skip = True
            else:
                merged.append(non_zero[j])
        merged = np.array(merged + [0]*(4-len(merged)))
        return merged, not np.array_equal(merged, line)


# -----------------
# N-Tuple TD Agent
# -----------------

import numpy as np
import random
import collections

class N_TupleTDAgent:
    def __init__(self, n_tuples, alpha=0.01, epsilon=0.1, gamma=0.99, lambda_=0.5):
        self.n_tuples = n_tuples
        self.alpha = alpha
        self.epsilon = epsilon
        self.gamma = gamma
        self.lambda_ = lambda_

        self.value_table = collections.defaultdict(float)
        self.eligibility_traces = collections.defaultdict(float)

    def reset_eligibilities(self):
        self.eligibility_traces.clear()

    def get_features(self, state):
        # Flatten the 4x4 state into 1D list of 16 tiles
        flat = [tile for row in state.matrix for tile in row]
        # Discretize with log2 to reduce state space
        log_tiles = [0 if x == 0 else int(np.log2(x)) for x in flat]

        # Extract features based on n-tuples
        features = []
        for tup in self.n_tuples:
            index = tuple(log_tiles[i] for i in tup)
            features.append(index)
        return features

    def value(self, features):
        return sum(self.value_table[f] for f in features)

    def choose_move(self, state):
        if random.random() < self.epsilon:
            possible_moves = state.get_moves()
            return random.choice(possible_moves) if possible_moves else None

        best_value = -float('inf')
        best_move = None

        for move in state.get_moves():
            next_state = state.clone()
            next_state.do_move(move)
            features = self.get_features(next_state)
            value = self.value(features)
            if value > best_value:
                best_value = value
                best_move = move

        return best_move

    def train(self, state, action, reward, next_state, done):
        features = self.get_features(state)
        next_features = self.get_features(next_state) if not done else []

        current_value = self.value(features)
        next_value = self.value(next_features) if not done else 0.0
        delta = reward + self.gamma * next_value - current_value

        # Update eligibility traces
        for f in features:
            self.eligibility_traces[f] += 1

        # Update weights
        for f in self.eligibility_traces:
            self.value_table[f] += self.alpha * delta * self.eligibility_traces[f]
            self.eligibility_traces[f] *= self.gamma * self.lambda_

        if done:
            self.reset_eligibilities()


In [31]:
import os
import pickle
import numpy as np

# --- 1. Try loading previous agent ---
checkpoint_file = "agent_checkpoint.pkl"

if os.path.exists(checkpoint_file):
    with open(checkpoint_file, "rb") as f:
        agent = pickle.load(f)
    print("✅ Loaded agent from checkpoint!")
    
    # Also try loading episode scores if available
    try:
        episode_scores = list(np.load("episode_scores.npy"))
        episode_max_tiles = list(np.load("episode_max_tiles.npy"))
        print("✅ Loaded episode tracking stats.")
    except:
        episode_scores = []
        episode_max_tiles = []
        print("⚠️ Could not load tracking stats, starting fresh.")
else:
    # --- 2. Create a new agent ---
    n_tuples = [
        (0, 1, 2, 3),
        (4, 5, 6, 7),
        (8, 9, 10, 11),
        (12, 13, 14, 15)
        ]
    agent = N_TupleTDAgent(
        n_tuples,
        0.005,   # alpha (learning rate)
        0.01,    # epsilon (exploration rate)
        1.0    # gamma (discount factor)
    )
    episode_scores = []
    episode_max_tiles = []
    print("🚀 No checkpoint found. Starting new agent.")


🚀 No checkpoint found. Starting new agent.


In [32]:
indicator_fresh_start = 1

epsilon = 1.0
epsilon_decay = 0.9999
min_epsilon = 0.01

def new_game():
    state = GameState()
    state.add_random_tile()
    state.add_random_tile()
    return state.matrix

if indicator_fresh_start == 1:
    # -----------------
    # Initialize agent
    # -----------------

    n_tuples = [

        # Rows
        (0, 1, 2, 3),
        (4, 5, 6, 7),
        (8, 9, 10, 11),
        (12, 13, 14, 15),

        # Columns
        (0, 4, 8, 12),
        (1, 5, 9, 13),
        (2, 6, 10, 14),
        (3, 7, 11, 15),

        # Diagonals
        (0, 5, 10, 15),
        (3, 6, 9, 12)
    ]


    # Initialization
    epsilon = 1.0
    epsilon_decay = 0.9996
    min_epsilon = 0.01
    agent = N_TupleTDAgent(
            n_tuples,
            0.002,   # alpha (learning rate)
            epsilon,    # epsilon (exploration rate)
            0.99,    # gamma (discount factor)
            0.5 # lambda
        )

    # -----------------
    # Training Loop
    # -----------------

    episode_scores = []
    episode_max_tiles = []



n_episodes = 300000
import builtins  # Add this at the top of your file
for episode in range(n_episodes):
    epsilon = builtins.max(min_epsilon, epsilon * epsilon_decay)
    agent.epsilon = epsilon
    state = GameState(new_game())
    agent.reset_eligibilities()

    done = False

    while not done:
        possible_moves = state.get_moves()
        if not possible_moves:
            break

        move = agent.choose_move(state)
        if move is None:
            break

        next_state = state.clone()
        next_state.do_move(move)

        reward = np.sum(next_state.matrix) - np.sum(state.matrix)
        done = not next_state.get_moves()

        agent.train(state, move, reward, next_state, done)

        state = next_state

    final_sum = np.sum(state.matrix)
    max_tile = np.max(state.matrix)

    episode_scores.append(final_sum)
    episode_max_tiles.append(max_tile)

    if (episode + 1) % 100 == 0:
        avg_score_100 = np.mean(episode_scores[-100:])
        best_tile_100 = np.max(episode_max_tiles[-100:])

        print(f"\n📢 Episode {episode + 1} summary:")
        print(f"🧮 Average final board sum (last 100): {avg_score_100:.2f}")
        print(f"🏆 Best tile achieved (last 100): {best_tile_100}")

        with open("agent_checkpoint.pkl", "wb") as f:
            pickle.dump(agent, f)
        np.save("episode_scores.npy", np.array(episode_scores))
        np.save("episode_max_tiles.npy", np.array(episode_max_tiles))


  return sum(self.value_table[f] for f in features)



📢 Episode 100 summary:
🧮 Average final board sum (last 100): 274.42
🏆 Best tile achieved (last 100): 256

📢 Episode 200 summary:
🧮 Average final board sum (last 100): 271.54
🏆 Best tile achieved (last 100): 256

📢 Episode 300 summary:
🧮 Average final board sum (last 100): 260.04
🏆 Best tile achieved (last 100): 256

📢 Episode 400 summary:
🧮 Average final board sum (last 100): 274.06
🏆 Best tile achieved (last 100): 256

📢 Episode 500 summary:
🧮 Average final board sum (last 100): 276.84
🏆 Best tile achieved (last 100): 256

📢 Episode 600 summary:
🧮 Average final board sum (last 100): 264.46
🏆 Best tile achieved (last 100): 256

📢 Episode 700 summary:
🧮 Average final board sum (last 100): 269.04
🏆 Best tile achieved (last 100): 256

📢 Episode 800 summary:
🧮 Average final board sum (last 100): 276.44
🏆 Best tile achieved (last 100): 256

📢 Episode 900 summary:
🧮 Average final board sum (last 100): 276.38
🏆 Best tile achieved (last 100): 512

📢 Episode 1000 summary:
🧮 Average final board

KeyboardInterrupt: 

In [33]:
import pandas as pd

# Load arrays if not already in memory
episode_scores = np.load("episode_scores.npy")
episode_max_tiles = np.load("episode_max_tiles.npy")

# Create a DataFrame
df = pd.DataFrame({
    "Final Board Sum": episode_scores,
    "Highest Tile": episode_max_tiles
})

# Save to Excel
df.to_excel("training_results_success.xlsx", index=False)

print("✅ Exported to training_results.xlsx!")

✅ Exported to training_results.xlsx!


In [None]:
# 🧹 RESET EVERYTHING (agent, stats, files) - FINAL WORKING

import os
import numpy as np

indicator = 0
if indicator == 1:
    # 1. Delete old save files if they exist
    files_to_delete = ["agent_checkpoint.pkl", "episode_scores.npy", "episode_max_tiles.npy"]
    for file in files_to_delete:
        if os.path.exists(file):
            os.remove(file)
            print(f"✅ Deleted {file}")
        else:
            print(f"ℹ️ {file} does not exist, skipping.")

    # 2. Re-initialize empty agent (only 4 arguments: n_tuples, alpha, epsilon, gamma)
    example_n_tuples = [
        (0, 1, 2, 3),
        (4, 5, 6, 7),
        (8, 9, 10, 11),
        (12, 13, 14, 15),
        # (0, 4, 8, 12),
        # (1, 5, 9, 13),
        # (2, 6, 10, 14),
        # (3, 7, 11, 15)
    ]

    agent = N_TupleTDAgent(
        example_n_tuples,
        0.005,   # alpha (learning rate)
        0.01,    # epsilon (exploration rate)
        0.99    # gamma (discount factor)
    )

    print("🔄 Agent re-initialized successfully!")

    # 3. Reset episode tracking lists
    episode_scores = []
    episode_max_tiles = []
    print("🗑️ Episode score histories cleared!")


✅ Deleted agent_checkpoint.pkl
✅ Deleted episode_scores.npy
✅ Deleted episode_max_tiles.npy
🔄 Agent re-initialized successfully!
🗑️ Episode score histories cleared!
