In [5]:
!pip install torch numpy

Collecting torch
  Using cached torch-2.5.1-cp311-none-macosx_11_0_arm64.whl.metadata (28 kB)
Collecting filelock (from torch)
  Using cached filelock-3.16.1-py3-none-any.whl.metadata (2.9 kB)
Collecting networkx (from torch)
  Downloading networkx-3.4.2-py3-none-any.whl.metadata (6.3 kB)
Collecting fsspec (from torch)
  Using cached fsspec-2024.12.0-py3-none-any.whl.metadata (11 kB)
Collecting sympy==1.13.1 (from torch)
  Using cached sympy-1.13.1-py3-none-any.whl.metadata (12 kB)
Collecting mpmath<1.4,>=1.1.0 (from sympy==1.13.1->torch)
  Using cached mpmath-1.3.0-py3-none-any.whl.metadata (8.6 kB)
Using cached torch-2.5.1-cp311-none-macosx_11_0_arm64.whl (63.9 MB)
Using cached sympy-1.13.1-py3-none-any.whl (6.2 MB)
Using cached filelock-3.16.1-py3-none-any.whl (16 kB)
Using cached fsspec-2024.12.0-py3-none-any.whl (183 kB)
Downloading networkx-3.4.2-py3-none-any.whl (1.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.7/1.7 MB[0m [31m3.2 MB/s[0m eta [36m0:00:

### In this case, the environment is Tic-Tac-Toe. We will define the game environment in a way that can be interacted with by the DQN agent.

In [1]:
import random
import numpy as np

In [2]:
class TicTacToe:
    def __init__(self):
        self.board = np.zeros((3, 3), dtype=int)  # A 3x3 grid
        self.current_player = 1  # Player 1 starts (X)

    def clone(self):
        """Returns a new instance with the current board state."""
        new_game = TicTacToe()
        new_game.board = np.copy(self.board)
        new_game.current_player = self.current_player
        return new_game

    def reset(self):
        """Reset the game board."""
        self.board = np.zeros((3, 3), dtype=int)
        self.current_player = 1
        return self.board.flatten()  # Return the flattened state (1D array)

    def available_moves(self):
        """Get available moves."""
        return [(i, j) for i in range(3) for j in range(3) if self.board[i, j] == 0]

    def make_move(self, row, col, player):
        """Make a move for the current player."""
        if self.board[row, col] == 0:
            self.board[row, col] = player
            return True
        return False

    def check_winner(self):
        """Check if there's a winner."""
        for i in range(3):
            if abs(sum(self.board[i, :])) == 3: return 1 if self.board[i, 0] == 1 else -1
            if abs(sum(self.board[:, i])) == 3: return 1 if self.board[0, i] == 1 else -1
        if abs(sum(np.diag(self.board))) == 3: return 1 if self.board[0, 0] == 1 else -1
        if abs(sum(np.diag(np.fliplr(self.board)))) == 3: return 1 if self.board[0, 2] == 1 else -1
        if len(self.available_moves()) == 0: return 0  # Draw
        return None  # Game still in progress

    def display_board(self):
        """Display the game board."""
        for row in self.board:
            print(" | ".join(['X' if x == 1 else 'O' if x == -1 else ' ' for x in row]))
            print("-" * 9)


## Define the DQN Model
### The agent will use a deep neural network to approximate the Q-value function. The network will take the current game state as input and output the Q-values for each possible action.
### We’ll define a simple feed-forward neural network using PyTorch.

In [3]:
class Node:
    def __init__(self, state, parent=None, player=1):
        self.state = state  # TicTacToe instance
        self.parent = parent  # Parent node
        self.children = []  # List of child nodes
        self.visits = 0  # Number of times this node has been visited
        self.wins = 0  # Number of wins from this node
        self.player = player  # Player who made the move (-1 or 1)

    def is_fully_expanded(self):
        """Check if all possible moves have been explored."""
        return len(self.children) == len(self.state.available_moves())

    def add_child(self, child_node):
        self.children.append(child_node)

In [4]:
import torch
import torch.nn as nn
import torch.optim as optim

ModuleNotFoundError: No module named 'torch'

In [6]:
class DQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)  # First fully connected layer
        self.fc2 = nn.Linear(128, 128)  # Second fully connected layer
        self.fc3 = nn.Linear(128, output_dim)  # Output layer for Q-values

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)  # Q-values for each action

## Define the Experience Replay Buffer
## The experience replay buffer stores the agent's experiences (state, action, reward, next_state, done). The agent will sample random mini-batches from the buffer to train the DQN. This helps to break the correlation between consecutive experiences and stabilizes training.

In [7]:
class ReplayBuffer:
    def __init__(self, capacity=10000000):
        self.buffer = []
        self.capacity = capacity
        self.size = 0

    def push(self, experience):
        """Store an experience in the buffer."""
        if self.size < self.capacity:
            self.buffer.append(experience)
        else:
            self.buffer[self.size % self.capacity] = experience
        self.size += 1

    def sample(self, batch_size):
        """Sample a random mini-batch from the buffer."""
        return random.sample(self.buffer, batch_size)

    def __len__(self):
        return len(self.buffer)


## Define the Agent
### The DQN agent interacts with the environment, stores experiences in the replay buffer, and trains the model. The agent will choose actions using an epsilon-greedy policy (exploration vs exploitation).

In [8]:
class DQNAgent:
    def __init__(self, input_dim, output_dim, epsilon=0.2, gamma=0.99, batch_size=64, lr=1e-3):
        self.model = DQN(input_dim, output_dim)
        self.target_model = DQN(input_dim, output_dim)  # Target model for stable learning
        self.target_model.load_state_dict(self.model.state_dict())  # Initialize target model
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.epsilon = epsilon
        self.gamma = gamma
        self.batch_size = batch_size
        self.replay_buffer = ReplayBuffer()
        self.loss_fn = nn.MSELoss()
        self.encountered_states = set()  # Set to track seen states-action pairs

    def select_action(self, state):
        """Select an action using epsilon-greedy policy with only valid moves."""
        if random.random() < self.epsilon:
            # Random action, but only from valid moves
            valid_moves = [i for i in range(9) if state[i] == 0]
            return random.choice(valid_moves)
        else:
            # Convert the state to a tensor of shape [1, input_dim] for batch processing
            state_tensor = torch.FloatTensor(state).unsqueeze(0)  # Add batch dimension
            q_values = self.model(state_tensor)  # Get Q-values for all possible moves

            # Find valid moves and filter out invalid ones
            valid_moves = [i for i in range(9) if state[i] == 0]

            # Get Q-values corresponding to valid moves
            valid_q_values = q_values[0, valid_moves]  # Get the Q-values for valid moves (1D tensor)

            # Select the valid move with the highest Q-value
            best_move_idx = torch.argmax(valid_q_values).item()

            # Map the best move back to the original index of the valid move
            return valid_moves[best_move_idx]

    def store_experience(self, state, action, reward, next_state, done):
        """Store the experience if it's not already in the encountered set."""
        state_action_pair = tuple(state.flatten()) + (action,)  # State-action pair
        if state_action_pair not in self.encountered_states:
            self.replay_buffer.push((state, action, reward, next_state, done))
            self.encountered_states.add(state_action_pair)  # Mark this pair as encountered

    def update_target_network(self):
        """Update target network with model's weights."""
        self.target_model.load_state_dict(self.model.state_dict())

    def train(self):
        """Train the DQN model using a batch from the replay buffer."""
        if len(self.replay_buffer) < self.batch_size:
            return  # Not enough data to train

        # Sample a batch of experiences
        batch = self.replay_buffer.sample(self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states_tensor = torch.FloatTensor(states)
        actions_tensor = torch.LongTensor(actions)
        rewards_tensor = torch.FloatTensor(rewards)
        next_states_tensor = torch.FloatTensor(next_states)
        dones_tensor = torch.BoolTensor(dones)

        # Q-values for current states
        q_values = self.model(states_tensor)
        q_values = q_values.gather(1, actions_tensor.unsqueeze(1)).squeeze(1)

        # Q-values for next states
        next_q_values = self.target_model(next_states_tensor)
        next_q_values_max = next_q_values.max(1)[0]  # Max Q-value for each next state

        # Compute the target Q-value
        target_q_values = rewards_tensor + (self.gamma * next_q_values_max * ~dones_tensor)

        # Compute loss
        loss = self.loss_fn(q_values, target_q_values)
        
        # Backpropagate and optimize
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()


## Training the DQN Agent
Now we’ll set up the training loop. In each episode:

The agent interacts with the environment.
The agent selects actions using its policy (epsilon-greedy).
The agent stores experiences in the replay buffer.
The agent trains the DQN model by sampling from the replay buffer.

In [18]:
def train_dqn_agent(episodes=10000):
    env = TicTacToe()
    agent = DQNAgent(input_dim=9, output_dim=9)  # 9 possible moves for a 3x3 Tic-Tac-Toe board

    for episode in range(episodes):
        state = env.reset()
        done = False

        while not done:
            action = agent.select_action(state)
            row, col = divmod(action, 3)
            valid_move = env.make_move(row, col, env.current_player)  # Pass the current player

            if valid_move:
                winner = env.check_winner()
                if winner is not None:
                    reward = 10 if winner == 1 else -10 if winner == -1 else 0
                    done = True
                else:
                    reward = 0
                next_state = env.board.flatten()

                agent.store_experience(state, action, reward, next_state, done)
                state = next_state

                agent.train()  # Train the model

        # Update target network every 10 episodes
        if episode % 10 == 0:
            agent.update_target_network()

        print(f"Episode {episode + 1} completed.")
    # After training in train_dqn_agent
    torch.save(agent.model.state_dict(), 'tictactoe_dqn_model.pth')
train_dqn_agent()

Episode 1 completed.
Episode 2 completed.
Episode 3 completed.
Episode 4 completed.
Episode 5 completed.
Episode 6 completed.
Episode 7 completed.
Episode 8 completed.
Episode 9 completed.
Episode 10 completed.
Episode 11 completed.
Episode 12 completed.
Episode 13 completed.
Episode 14 completed.
Episode 15 completed.
Episode 16 completed.
Episode 17 completed.
Episode 18 completed.
Episode 19 completed.
Episode 20 completed.
Episode 21 completed.
Episode 22 completed.
Episode 23 completed.
Episode 24 completed.
Episode 25 completed.
Episode 26 completed.
Episode 27 completed.
Episode 28 completed.
Episode 29 completed.
Episode 30 completed.
Episode 31 completed.
Episode 32 completed.
Episode 33 completed.
Episode 34 completed.
Episode 35 completed.
Episode 36 completed.
Episode 37 completed.
Episode 38 completed.
Episode 39 completed.
Episode 40 completed.
Episode 41 completed.
Episode 42 completed.
Episode 43 completed.
Episode 44 completed.
Episode 45 completed.
Episode 46 complete

In [10]:
# Before starting the game loop
# Load the trained model (if it exists)
def load_model(agent, filename="tictactoe_dqn_model.pth"):
    try:
        agent.model.load_state_dict(torch.load(filename))
        agent.model.eval()  # Set the model to evaluation mode
        print(f"Model loaded from {filename}")
    except FileNotFoundError:
        print(f"No model found at {filename}. Starting fresh.")


In [16]:
# Main Game Loop
game = TicTacToe()
root = Node(state=game.clone(), player=1)

use_mcts = False  # Set to False to use DQN
agent = DQNAgent(input_dim=9, output_dim=9)  # 9 possible moves for a 3x3 Tic-Tac-Toe board
load_model(agent)  # Load the trained model (if exists)

while True:
    game.display_board()

    # Check if the game is over
    winner = game.check_winner()
    if winner is not None:
        if winner == 0:
            print("It's a draw!")
        else:
            print(f"Player {'X' if winner == 1 else 'O'} wins!")
        # Save the model after the game ends
        torch.save(agent.model.state_dict(), 'tictactoe_dqn_model.pth')  # Save the model here
        break  # Exit the game loop

    if game.current_player == 1:  # Player's turn
        print("Your turn (Player X). Enter row and column (0-2):")
        row, col = map(int, input().split())
        if game.make_move(row, col, 1):
            root = Node(state=game.clone(), player=-1)  # Update root for AI
            game.current_player = -1  # Switch to AI
        else:
            print("Invalid move. Try again.")
    else:  # AI's turn
        print("AI's turn (Player O).")

        # Use DQN to determine the best move
        state = game.board.flatten()  # Flatten the board state to 1D for DQN input
        action = agent.select_action(state)  # Get the best move from the DQN agent

        # Convert the action from the flattened index to row and column
        row, col = divmod(action, 3)

        # Apply the move to the game board
        valid_move = game.make_move(row, col, -1)  # AI is Player -1

        if valid_move:
            # Check if the game is over (win, loss, or draw)
            winner = game.check_winner()
            done = winner is not None  # End the game if there's a winner or draw
            reward = 1 if winner == -1 else -1 if winner == 1 else 0

            # Store the experience and train the DQN model
            next_state = game.board.flatten()
            agent.store_experience(state, action, reward, next_state, done)
            agent.train()  # Train the model with the stored experience

        game.current_player = 1  # Switch back to player


  agent.model.load_state_dict(torch.load(filename))


Model loaded from tictactoe_dqn_model.pth
  |   |  
---------
  |   |  
---------
  |   |  
---------
Your turn (Player X). Enter row and column (0-2):


 1 1


  |   |  
---------
  | X |  
---------
  |   |  
---------
AI's turn (Player O).
  |   |  
---------
O | X |  
---------
  |   |  
---------
Your turn (Player X). Enter row and column (0-2):


 0 0


X |   |  
---------
O | X |  
---------
  |   |  
---------
AI's turn (Player O).
X |   |  
---------
O | X | O
---------
  |   |  
---------
Your turn (Player X). Enter row and column (0-2):


KeyboardInterrupt: Interrupted by user