<a href="https://colab.research.google.com/github/richy486/MoreChess/blob/main/notesbooks/RL_Chess_Pawn_Game_DQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque

# Check if running in Google Colab
# (True when the `google.colab` module is already loaded in this interpreter.)
import sys
IN_COLAB = 'google.colab' in sys.modules

# Side length of the square board; used as the default for PawnChessEnv and to
# size the network in the export cell.
BOARD_SIZE = 6  # Change this to modify board dimensions (e.g., 6 for 6x6)


# Use GPU if available
# DEVICE is the torch device every model and input tensor below is moved to.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print(f"Using device: {DEVICE}")


Using device: cuda


In [None]:
# Define the Pawn-Only Chess Environment

class PawnChessEnv:
    """Pawn-only chess on a square board.

    Each board cell holds ``1`` (player 1 pawn), ``-1`` (player 2 pawn) or
    ``0`` (empty).  Player 1 starts on row 0 and advances toward higher row
    indices; player 2 starts on the last row and advances toward row 0.
    A game ends as soon as any pawn lands on row 0 or on the last row.

    Attributes:
        board_size: Side length of the board.
        board: ``(board_size, board_size)`` NumPy array of cell values.
        state_size: Length of the flattened board returned by ``get_state``.
        action_size: Number of board squares (one slot per destination).
    """

    def __init__(self, board_size=BOARD_SIZE):
        self.board_size = board_size
        self.state_size = board_size * board_size  # Flattened board length
        self.action_size = board_size * board_size  # One slot per board square
        self._setup_board()

    def _setup_board(self):
        """Place both players' pawns on their starting rows."""
        self.board = np.zeros((self.board_size, self.board_size))
        self.board[0, :] = 1  # Player 1's pawns
        self.board[-1, :] = -1  # Player 2's pawns

    def reset(self):
        """Restore the starting position and return the initial state."""
        self._setup_board()
        return self.get_state()

    def get_state(self):
        """Return the board as a flat ``float32`` vector (row-major)."""
        return self.board.flatten().astype(np.float32)

    def get_valid_moves(self, player):
        """List every legal move for ``player`` (``1`` or ``-1``).

        A pawn may step one row forward onto an empty square, or capture one
        row forward diagonally onto a square held by the opponent.

        Returns:
            A list of ``(old_row, old_col, new_row, new_col)`` tuples, ordered
            by source square (row-major); for each pawn the forward move comes
            first, then the left and right captures.
        """
        direction = 1 if player == 1 else -1  # Player 1 moves down, player 2 up
        valid_moves = []

        for row in range(self.board_size):
            new_row = row + direction
            if not 0 <= new_row < self.board_size:
                continue  # Pawns on this row have nowhere left to advance

            for col in range(self.board_size):
                if self.board[row, col] != player:
                    continue

                # Forward move (only onto an empty square)
                if self.board[new_row, col] == 0:
                    valid_moves.append((row, col, new_row, col))

                # Diagonal captures (only onto an opponent pawn)
                for new_col in (col - 1, col + 1):
                    if (0 <= new_col < self.board_size
                            and self.board[new_row, new_col] == -player):
                        valid_moves.append((row, col, new_row, new_col))
        return valid_moves

    def step(self, move):
        """Apply ``move`` and report the outcome.

        Args:
            move: ``(old_row, old_col, new_row, new_col)``; not validated here.

        Returns:
            ``(state, reward, done)`` where ``reward`` is ``1`` and ``done`` is
            true exactly when the moved pawn lands on row 0 or the last row.
        """
        old_x, old_y, new_x, new_y = move
        self.board[new_x, new_y] = self.board[old_x, old_y]
        self.board[old_x, old_y] = 0  # Empty old position

        # The far ranks depend on the board size; a fixed row index would end
        # games early on boards larger than 4x4.
        done = new_x in (0, self.board_size - 1)
        reward = 1 if done else 0
        return self.get_state(), reward, done


In [None]:
# Define the Deep Q-Network (DQN) Model

class DQN(nn.Module):
    """Two-headed fully connected network over a flattened board.

    A shared trunk of two 64-unit ReLU layers feeds two linear heads: one
    emitting ``action_size`` Q-values and one emitting four numbers that
    callers read as a move ``(fromX, fromY, toX, toY)``.
    """

    def __init__(self, state_size, action_size):
        super().__init__()
        hidden_units = 64

        # Shared trunk
        self.fc1 = nn.Linear(state_size, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)

        # Output heads (attribute names are also the saved state_dict keys)
        self.q_values = nn.Linear(hidden_units, action_size)
        self.move_output = nn.Linear(hidden_units, 4)

    def forward(self, x):
        """Return the pair ``(q_values, move_output)`` computed from ``x``."""
        features = torch.relu(self.fc2(torch.relu(self.fc1(x))))
        return self.q_values(features), self.move_output(features)


In [None]:
# Define the DQN Agent

class DQNAgent:
    """DQN agent that plays as player 1.

    Holds an online network, a target network, an Adam optimizer and a
    bounded replay memory.

    Args:
        state_size: Length of the flattened board fed to the network.
        action_size: Number of Q-value outputs (one per board square).
        board_size: Board width used to flatten a destination square
            ``(row, col)`` into a Q-value index.  Defaults to the rounded
            square root of ``action_size``.

    NOTE(review): ``train`` only back-propagates a loss on the Q-value head,
    so the move head that ``act`` reads gets no direct training signal;
    ``act`` falls back to a random valid move whenever the predicted move is
    not legal.  ``epsilon`` is stored but not used by ``act``.
    """

    def __init__(self, state_size, action_size, board_size=None):
        self.state_size = state_size
        self.action_size = action_size
        self.board_size = (
            int(round(action_size ** 0.5)) if board_size is None else board_size
        )
        self.model = DQN(state_size, action_size).to(DEVICE)
        self.target_model = DQN(state_size, action_size).to(DEVICE)
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        self.loss_fn = nn.MSELoss()
        self.memory = deque(maxlen=1000)
        self.batch_size = 32  # Transitions sampled per train() call
        self.gamma = 0.8  # Discount factor
        self.epsilon = 0.5  # Exploration rate (currently unused by act)

    @staticmethod
    def _to_tensor(state):
        """Turn a flat state vector into a ``(1, state_size)`` float tensor."""
        return torch.tensor(state, dtype=torch.float32, device=DEVICE).unsqueeze(0)

    def _action_index(self, action):
        """Flatten the destination square of ``action`` into a Q-value index."""
        return int(action[2]) * self.board_size + int(action[3])

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state, env):
        """Choose a move for player 1.

        Returns:
            A ``(fromX, fromY, toX, toY)`` tuple, or ``None`` when player 1 has
            no valid move.  The network's move head is truncated to integers;
            if that is not a valid move, a random valid move is returned.
        """
        valid_moves = env.get_valid_moves(1)  # AI is player 1
        if not valid_moves:
            return None  # No valid move

        with torch.no_grad():
            _, move_output = self.model(self._to_tensor(state))

        # Truncate the four regression outputs to plain Python ints.
        best_move = tuple(move_output.cpu().numpy().flatten().astype(int).tolist())
        if best_move not in valid_moves:
            best_move = random.choice(valid_moves)  # Fallback to a valid move

        return best_move

    def train(self):
        """Run one optimizer step per transition in a sampled minibatch.

        Does nothing until the memory holds at least ``batch_size``
        transitions.  For each sampled transition the Q-value of the move's
        destination square is regressed toward
        ``reward + gamma * max(target_model Q(next_state))`` (just ``reward``
        for terminal transitions).
        """
        if len(self.memory) < self.batch_size:
            return

        batch = random.sample(self.memory, self.batch_size)

        for state, action, reward, next_state, done in batch:
            target = reward
            if not done:
                # The bootstrap value is a constant; no graph is needed.
                with torch.no_grad():
                    next_q_values, _ = self.target_model(self._to_tensor(next_state))
                target += self.gamma * torch.max(next_q_values).item()

            q_values, _ = self.model(self._to_tensor(state))

            # Copy the current predictions as constants and overwrite only the
            # taken action, so every other entry contributes zero error.
            target_f = q_values.detach().clone()
            target_f[0][self._action_index(action)] = target

            self.optimizer.zero_grad()
            loss = self.loss_fn(q_values, target_f)
            loss.backward()
            self.optimizer.step()


In [None]:
# Training the DQN Agent
#
# Each episode resets the board and lets the agent (player 1) move repeatedly;
# this loop never makes a move for the other player.  Every transition is
# stored in the agent's replay memory and followed by one train() call.

env = PawnChessEnv(board_size=BOARD_SIZE)
agent = DQNAgent(env.state_size, env.action_size)

# Short run; use range(1000) for a longer training session.
for episode in range(10):
    state = env.reset()
    done = False
    while not done:
        action = agent.act(state, env)
        if action is None:
            break  # No valid moves left
        next_state, reward, done = env.step(action)
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        agent.train()

    # Update target model every 100 episodes
    # (with only 10 episodes this fires once, after episode 0).
    if episode % 100 == 0:
        agent.target_model.load_state_dict(agent.model.state_dict())
        print(f"Episode {episode} complete.")


Episode 0 complete.


In [None]:
# Save & Download the Model

# Persist only the online network's weights (state_dict), not the whole agent.
torch.save(agent.model.state_dict(), "dqn_pawn_chess.pth")

# Optional: trigger a browser download of the weights when running in Colab.
# if IN_COLAB:
#     from google.colab import files
#     files.download("dqn_pawn_chess.pth")


In [None]:
!pip install coremltools torch torchvision

import torch
import coremltools as ct

# Load your trained model
class DQN(nn.Module):
    """Two-headed fully connected network over a flattened board.

    Redefined in this cell so the export can rebuild the architecture before
    loading the saved weights.  A shared trunk of two 64-unit ReLU layers
    feeds a Q-value head (``action_size`` outputs) and a move head (four
    outputs read as ``(fromX, fromY, toX, toY)``).
    """

    def __init__(self, state_size, action_size):
        super().__init__()
        hidden_units = 64

        # Shared trunk
        self.fc1 = nn.Linear(state_size, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)

        # Output heads (attribute names must match the saved state_dict keys)
        self.q_values = nn.Linear(hidden_units, action_size)
        self.move_output = nn.Linear(hidden_units, 4)

    def forward(self, x):
        """Return the pair ``(q_values, move_output)`` computed from ``x``."""
        features = torch.relu(self.fc2(torch.relu(self.fc1(x))))
        return self.q_values(features), self.move_output(features)

# Load the trained weights
state_size = BOARD_SIZE * BOARD_SIZE  # Must match the board size used for training
action_size = BOARD_SIZE * BOARD_SIZE
model = DQN(state_size, action_size)
model.load_state_dict(torch.load("dqn_pawn_chess.pth", map_location=torch.device('cpu')))
model.eval()

# Convert the PyTorch model to Core ML
example_input = torch.rand(1, state_size)  # Example input tensor
traced_model = torch.jit.trace(model, example_input)  # Convert to TorchScript
mlmodel = ct.convert(
    traced_model,
    inputs=[ct.TensorType(name="board", shape=example_input.shape)],
)

# Save the Core ML model
mlmodel.save("DQN_PawnGame.mlpackage")

import shutil
import os

# Create a zip archive of the .mlpackage folder (written as DQN_PawnGame.zip)
shutil.make_archive("DQN_PawnGame", 'zip', "DQN_PawnGame.mlpackage")

# Download for use in Swift. The google.colab helper only exists inside Colab,
# so skip the download elsewhere instead of failing on the import.
if IN_COLAB:
    from google.colab import files
    files.download("DQN_PawnGame.zip")

print("Done")


Collecting coremltools
  Downloading coremltools-8.2-cp311-none-manylinux1_x86_64.whl.metadata (2.5 kB)
Collecting cattrs (from coremltools)
  Downloading cattrs-24.1.3-py3-none-any.whl.metadata (8.4 kB)
Collecting pyaml (from coremltools)
  Downloading pyaml-25.1.0-py3-none-any.whl.metadata (12 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12

Converting PyTorch Frontend ==> MIL Ops:  71%|███████▏  | 5/7 [00:00<00:00, 3269.65 ops/s]
Running MIL frontend_pytorch pipeline: 100%|██████████| 5/5 [00:00<00:00, 2163.13 passes/s]
Running MIL default pipeline: 100%|██████████| 89/89 [00:00<00:00, 1455.88 passes/s]
Running MIL backend_mlprogram pipeline: 100%|██████████| 12/12 [00:00<00:00, 2568.33 passes/s]


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Done


In [None]:
# Load the Trained Model

import torch

# Load trained model
# Restores the weights saved in "dqn_pawn_chess.pth" into the existing agent's
# online network, mapping the stored tensors onto DEVICE.
agent.model.load_state_dict(torch.load("dqn_pawn_chess.pth", map_location=DEVICE))
agent.model.eval()  # Set to evaluation mode
print("Model Loaded!")


Model Loaded!


In [None]:
# Helper Function to Print the Board

def print_board(board):
    """Print ``board`` one row per line, followed by two blank lines.

    Cells equal to ``1`` render as "♙", ``-1`` as "♟" and ``0`` as ".";
    cells within a row are separated by single spaces.
    """
    glyphs = {0: ".", 1: "♙", -1: "♟"}
    for rank in board:
        # print() joins its arguments with a single space by default.
        print(*(glyphs[square] for square in rank))
    print("\n")


In [None]:
# Play a Game with AI Moves
#
# The agent plays as player 1; player -1 picks uniformly among its valid moves.
# Players alternate until step() reports the game is over or the side to move
# has no valid move.

env = PawnChessEnv()
state = env.reset()
print("Initial Board:")
print_board(env.board)

done = False
player_turn = 1  # AI starts first

while not done:
    if player_turn == 1:
        action = agent.act(state, env)
    else:
        valid_moves = env.get_valid_moves(-1)
        action = random.choice(valid_moves) if valid_moves else None

    # Both branches yield None when the side to move has no valid move.
    if action is None:
        print("No valid moves left. Game Over!")
        break

    state, reward, done = env.step(action)
    print(f"Player {player_turn} moves: {action}")
    print_board(env.board)

    # step() sets reward to 1 exactly when it sets done, so the player who
    # just moved is reported as the winner.
    if reward == 1:
        print(f"Player {player_turn} wins!")
        break

    # Switch player
    player_turn *= -1


Initial Board:
♙ ♙ ♙ ♙ ♙ ♙
. . . . . .
. . . . . .
. . . . . .
. . . . . .
♟ ♟ ♟ ♟ ♟ ♟


Player 1 moves: (0, 4, 1, 4)
♙ ♙ ♙ ♙ . ♙
. . . . ♙ .
. . . . . .
. . . . . .
. . . . . .
♟ ♟ ♟ ♟ ♟ ♟


Player -1 moves: (5, 5, 4, 5)
♙ ♙ ♙ ♙ . ♙
. . . . ♙ .
. . . . . .
. . . . . .
. . . . . ♟
♟ ♟ ♟ ♟ ♟ .


Player 1 moves: (0, 1, 1, 1)
♙ . ♙ ♙ . ♙
. ♙ . . ♙ .
. . . . . .
. . . . . .
. . . . . ♟
♟ ♟ ♟ ♟ ♟ .


Player -1 moves: (5, 2, 4, 2)
♙ . ♙ ♙ . ♙
. ♙ . . ♙ .
. . . . . .
. . . . . .
. . ♟ . . ♟
♟ ♟ . ♟ ♟ .


Player 1 moves: (1, 4, 2, 4)
♙ . ♙ ♙ . ♙
. ♙ . . . .
. . . . ♙ .
. . . . . .
. . ♟ . . ♟
♟ ♟ . ♟ ♟ .


Player -1 moves: (4, 5, 3, 5)
♙ . ♙ ♙ . ♙
. ♙ . . . .
. . . . ♙ .
. . . . . ♟
. . ♟ . . .
♟ ♟ . ♟ ♟ .


Player -1 wins!
