<a href="https://colab.research.google.com/github/lunathanael/Auto-Chess-main/blob/master/policy_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [73]:
import collections
import math
import typing
from typing import Dict, List, Optional
import numpy
import tensorflow as tf
from typing import List
from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, ReLU, Add, Dense
from tensorflow.keras.models import Model

In [74]:
MAXIMUM_FLOAT_VALUE = float('inf')

KnownBounds = collections.namedtuple('KnownBounds', ['min', 'max'])

In [75]:
class MinMaxStats(object):
  """A class that holds the min-max values of the tree."""

  def __init__(self, known_bounds: Optional[KnownBounds]):
    self.maximum = known_bounds.max if known_bounds else -MAXIMUM_FLOAT_VALUE
    self.minimum = known_bounds.min if known_bounds else MAXIMUM_FLOAT_VALUE

  def update(self, value: float):
    self.maximum = max(self.maximum, value)
    self.minimum = min(self.minimum, value)

  def normalize(self, value: float) -> float:
    if self.maximum > self.minimum:
      # We normalize only when we have set the maximum and minimum values.
      return (value - self.minimum) / (self.maximum - self.minimum)
    return value

In [61]:
def residual_block(x, filters):
    """Create a residual block."""
    y = Conv2D(filters, kernel_size=3, padding='same')(x)
    y = BatchNormalization()(y)
    y = ReLU()(y)
    y = Conv2D(filters, kernel_size=3, padding='same')(y)
    y = BatchNormalization()(y)
    y = Add()([y, x])
    y = ReLU()(y)
    return y

def make_network():
  # Input layer
  input_layer = Input(shape=(8, 8, 73))

  # Body
  x = Conv2D(256, kernel_size=3, padding='same')(input_layer)
  x = BatchNormalization()(x)
  x = ReLU()(x)
  for _ in range(19):
      x = residual_block(x, 256)

  # Policy Head
  policy_head = Conv2D(256, kernel_size=3, padding='same')(x)
  policy_head = BatchNormalization()(policy_head)
  policy_head = ReLU()(policy_head)
  policy_head = Conv2D(73, kernel_size=1)(policy_head)

  # Value Head
  value_head = Conv2D(1, kernel_size=1)(x)
  value_head = BatchNormalization()(value_head)
  value_head = ReLU()(value_head)
  value_head = Dense(256, activation='relu')(value_head)
  value_head = Dense(1, activation='tanh')(value_head)

  # Create the model
  model = tf.keras.Model(inputs=input_layer, outputs=[policy_head, value_head])
  return model

def ConstantPolicyHead(shape):
    # Custom layer for policy head
    class CPHead(tf.keras.layers.Layer):
        def call(self, inputs):
            return tf.constant(1/73, shape=shape)

    return CPHead()

def ConstantValueHead():
    # Custom layer for value head
    class CVHead(tf.keras.layers.Layer):
        def call(self, inputs):
            return tf.constant(0.5, shape=(1,))

    return CVHead()

def make_uniform_network():
    # Input layer
    input_layer = Input(shape=(8, 8, 73))

    # Body
    x = Conv2D(256, kernel_size=3, padding='same')(input_layer)
    x = BatchNormalization()(x)
    x = ReLU()(x)
    for _ in range(19):
        x = residual_block(x, 256)

    # Modified Policy Head for uniform output
    policy_head = ConstantPolicyHead((8, 8, 73))(x)

    # Modified Value Head for constant output
    value_head = ConstantValueHead()(x)

    # Create the model with constant outputs
    model = Model(inputs=input_layer, outputs=[policy_head, value_head])
    return model


In [None]:
def make_board_game_config(action_space_size: int, max_moves: int,
                           dirichlet_alpha: float,
                           lr_init: float) -> Config:

  def visit_softmax_temperature(num_moves, training_steps):
    if num_moves < 30:
      return 1.0
    else:
      return 0.0  # Play according to the max.

  return Config(
      action_space_size=action_space_size,
      max_moves=max_moves,
      discount=1.0,
      dirichlet_alpha=dirichlet_alpha,
      num_simulations=800,
      batch_size=2048,
      td_steps=max_moves,  # Always use Monte Carlo return.
      num_actors=3000,
      lr_init=lr_init,
      lr_decay_steps=400e3,
      visit_softmax_temperature_fn=visit_softmax_temperature,
      known_bounds=KnownBounds(-1, 1))

In [None]:
def make_chess_config() -> MuZeroConfig:
  return make_board_game_config(
      action_space_size=4672, max_moves=512, dirichlet_alpha=0.3, lr_init=0.1)

Helpers

In [33]:
class Config(object):


  def __init__(self,
               action_space_size: int,
               max_moves: int,
               discount: float,
               dirichlet_alpha: float,
               num_simulations: int,
               batch_size: int,
               td_steps: int,
               num_actors: int,
               lr_init: float,
               lr_decay_steps: float,
               visit_softmax_temperature_fn,
               known_bounds: Optional[KnownBounds] = None):
    ### Self-Play
    self.action_space_size = action_space_size
    self.num_actors = num_actors

    self.visit_softmax_temperature_fn = visit_softmax_temperature_fn
    self.max_moves = max_moves
    self.num_simulations = num_simulations
    self.discount = discount

    # Root prior exploration noise.
    self.root_dirichlet_alpha = dirichlet_alpha
    self.root_exploration_fraction = 0.25

    # UCB formula
    self.pb_c_base = 19652
    self.pb_c_init = 1.25

    # If we already have some information about which values occur in the
    # environment, we can use them to initialize the rescaling.
    # This is not strictly necessary, but establishes identical behaviour to
    # AlphaZero in board games.
    self.known_bounds = known_bounds

    ### Training
    self.training_steps = int(1000e3)
    self.checkpoint_interval = int(1e3)
    self.window_size = int(1e6)
    self.batch_size = batch_size
    self.num_unroll_steps = 5
    self.td_steps = td_steps

    self.weight_decay = 1e-4
    self.momentum = 0.9

    # Exponential learning rate schedule
    self.lr_init = lr_init
    self.lr_decay_rate = 0.1
    self.lr_decay_steps = lr_decay_steps

  def new_game(self):
    return Game(self.action_space_size, self.discount

In [32]:
class Node(object):

  def __init__(self, prior: float):
    self.visit_count = 0
    self.to_play = -1
    self.prior = prior
    self.value_sum = 0
    self.children = {}

  def expanded(self):
    return len(self.children) > 0

  def value(self):
    if self.visit_count == 0:
      return 0
    return self.value_sum / self.visit_count


In [31]:
class Game(object):

  def __init__(self, history=None):
    self.history = history or []
    self.child_visits = []
    self.num_actions = 4672  # action space size for chess; 11259 for shogi, 362 for Go

  def terminal(self): #TODO
    # Game specific termination rules.
    pass

  def terminal_value(self, to_play): #TODO
    # Game specific value.
    pass

  def legal_actions(self): #TODO
    # Game specific calculation of legal actions.
    return []

  def clone(self):
    return Game(list(self.history))

  def apply(self, action):
    self.history.append(action)

  def store_search_statistics(self, root):
    sum_visits = sum(child.visit_count for child in root.children.itervalues())
    self.child_visits.append([
        root.children[a].visit_count / sum_visits if a in root.children else 0
        for a in range(self.num_actions)
    ])

  def make_image(self, state_index: int): #TODO
    # Game specific feature planes.
    return []

  def make_target(self, state_index: int):
    return (self.terminal_value(state_index % 2),
            self.child_visits[state_index])

  def to_play(self):
    return len(self.history) % 2

In [30]:
class ReplayBuffer(object):

  def __init__(self, config: AlphaZeroConfig):
    self.window_size = config.window_size
    self.batch_size = config.batch_size
    self.buffer = []

  def save_game(self, game):
    if len(self.buffer) > self.window_size:
      self.buffer.pop(0)
    self.buffer.append(game)

  def sample_batch(self):
    # Sample uniformly across positions.
    move_sum = float(sum(len(g.history) for g in self.buffer))
    games = numpy.random.choice(
        self.buffer,
        size=self.batch_size,
        p=[len(g.history) / move_sum for g in self.buffer])
    game_pos = [(g, numpy.random.randint(len(g.history))) for g in games]
    return [(g.make_image(i), g.make_target(i)) for (g, i) in game_pos]

In [29]:
class Network(object):

  def __init__(self, uniform_model: bool=False):
    if (uniform_model):
      self.model = make_uniform_network()
    else:
      self.model= make_network()

  def inference(self, image):
      # Run the neural network model to get predictions
      value, policy_logits = self.model.predict(np.array([image]))

      value = value[0][0] # The value output is a scalar representing the predicted game outcome

      # The policy_logits are the raw outputs of the network representing move probabilities
      policy = tf.nn.softmax(policy_logits).numpy().tolist()[0]

      return value, policy


  def get_weights(self): #TODO
    # Returns the weights of this network.
    return self.model.get_weights()

In [67]:
class SharedStorage(object):

  def __init__(self):
    self._networks = {}

  def latest_network(self) -> Network:
    if self._networks:
      return self._networks[max(self._networks.iterkeys())]
    else:
      return Network(True)  # policy -> uniform, value -> 0.5

  def save_network(self, step: int, network: Network):
    self._networks[step] = network


In [None]:
# AlphaZero training is split into two independent parts: Network training and
# self-play data generation.
# These two parts only communicate by transferring the latest network checkpoint
# from the training to the self-play, and the finished games from the self-play
# to the training.
def alphazero(config: AlphaZeroConfig):
  storage = SharedStorage()
  replay_buffer = ReplayBuffer(config)

  for i in range(config.num_actors):
    launch_job(run_selfplay, config, storage, replay_buffer)

  train_network(config, storage, replay_buffer)

  return storage.latest_network()



In [69]:
##################################
####### Part 1: Self-Play ########


# Each self-play job is independent of all others; it takes the latest network
# snapshot, produces a game and makes it available to the training job by
# writing it to a shared replay buffer.
def run_selfplay(config: AlphaZeroConfig, storage: SharedStorage,
                 replay_buffer: ReplayBuffer):
  while True:
    network = storage.latest_network()
    game = play_game(config, network)
    replay_buffer.save_game(game)


# Each game is produced by starting at the initial board position, then
# repeatedly executing a Monte Carlo Tree Search to generate moves until the end
# of the game is reached.
def play_game(config: AlphaZeroConfig, network: Network):
  game = Game()
  while not game.terminal() and len(game.history) < config.max_moves:
    action, root = run_mcts(config, game, network)
    game.apply(action)
    game.store_search_statistics(root)
  return game


# Core Monte Carlo Tree Search algorithm.
# To decide on an action, we run N simulations, always starting at the root of
# the search tree and traversing the tree according to the UCB formula until we
# reach a leaf node.
def run_mcts(config: AlphaZeroConfig, game: Game, network: Network):
  root = Node(0)
  evaluate(root, game, network)
  add_exploration_noise(config, root)

  for _ in range(config.num_simulations):
    node = root
    scratch_game = game.clone()
    search_path = [node]

    while node.expanded():
      action, node = select_child(config, node)
      scratch_game.apply(action)
      search_path.append(node)

    value = evaluate(node, scratch_game, network)
    backpropagate(search_path, value, scratch_game.to_play())
  return select_action(config, game, root), root



def softmax_sample(visit_counts, temperature=10.0):
    counts, actions = zip(*visit_counts)
    # Apply softmax with temperature
    counts = np.array(counts)
    counts = counts / temperature  # Apply temperature scaling
    softmax_probs = np.exp(counts) / sum(np.exp(counts))
    # Sample an action based on the softmax probabilities
    action = np.random.choice(actions, p=softmax_probs)
    return softmax_probs, action


def select_action(config: AlphaZeroConfig, game: Game, root: Node):
  visit_counts = [(child.visit_count, action)
                  for action, child in root.children.iteritems()]
  if len(game.history) < config.num_sampling_moves:
    _, action = softmax_sample(visit_counts)
  else:
    _, action = max(visit_counts)
  return action


# Select the child with the highest UCB score.
def select_child(config: AlphaZeroConfig, node: Node):
  _, action, child = max((ucb_score(config, node, child), action, child)
                         for action, child in node.children.iteritems())
  return action, child


# The score for a node is based on its value, plus an exploration bonus based on
# the prior.
def ucb_score(config: AlphaZeroConfig, parent: Node, child: Node):
  pb_c = math.log((parent.visit_count + config.pb_c_base + 1) /
                  config.pb_c_base) + config.pb_c_init
  pb_c *= math.sqrt(parent.visit_count) / (child.visit_count + 1)

  prior_score = pb_c * child.prior
  value_score = child.value()
  return prior_score + value_score


# We use the neural network to obtain a value and policy prediction.
def evaluate(node: Node, game: Game, network: Network):
  value, policy_logits = network.inference(game.make_image(-1))

  # Expand the node.
  node.to_play = game.to_play()
  policy = {a: math.exp(policy_logits[a]) for a in game.legal_actions()}
  policy_sum = sum(policy.itervalues())
  for action, p in policy.iteritems():
    node.children[action] = Node(p / policy_sum)
  return value


# At the end of a simulation, we propagate the evaluation all the way up the
# tree to the root.
def backpropagate(search_path: List[Node], value: float, to_play):
  for node in search_path:
    node.value_sum += value if node.to_play == to_play else (1 - value)
    node.visit_count += 1


# At the start of each search, we add dirichlet noise to the prior of the root
# to encourage the search to explore new actions.
def add_exploration_noise(config: AlphaZeroConfig, node: Node):
  actions = node.children.keys()
  noise = numpy.random.gamma(config.root_dirichlet_alpha, 1, len(actions))
  frac = config.root_exploration_fraction
  for a, n in zip(actions, noise):
    node.children[a].prior = node.children[a].prior * (1 - frac) + n * frac


######### End Self-Play ##########
##################################

##################################
####### Part 2: Training #########


def train_network(config: AlphaZeroConfig, storage: SharedStorage,
                  replay_buffer: ReplayBuffer):
  network = Network()
  optimizer = tf.keras.optimizers.SGD(config.learning_rate_schedule,
                                         config.momentum)
  for i in range(config.training_steps):
    if i % config.checkpoint_interval == 0:
      storage.save_network(i, network)
    batch = replay_buffer.sample_batch()
    update_weights(optimizer, network, batch, config.weight_decay)
  storage.save_network(config.training_steps, network)

def update_weights(optimizer: tf.keras.optimizers, network: Network, batch,
                   weight_decay: float):
  loss = 0
  for image, (target_value, target_policy) in batch:
    value, policy_logits = network.inference(image)
    loss += (
        tf.losses.mean_squared_error(value, target_value) +
        tf.nn.softmax_cross_entropy_with_logits(
            logits=policy_logits, labels=target_policy))

  for weights in network.get_weights():
    loss += weight_decay * tf.nn.l2_loss(weights)

  optimizer.minimize(loss)


######### End Training ###########
##################################


def launch_job(f, *args):
  f(*args)




In [None]:
import numpy as np

def fen_to_board_array(fen):
    # Split the FEN string to get the relevant parts
    parts = fen.split(' ')
    board_fen, player, castling, _, halfmove, fullmove = parts[:6]

    # Initialize an empty board with additional planes for L constant-valued inputs
    board = np.zeros((12 + 5, 8, 8), dtype=float)  # 12 for pieces, 5 for additional planes

    # Define piece order and mapping to layers
    piece_map = {'p': 0, 'n': 1, 'b': 2, 'r': 3, 'q': 4, 'k': 5,
                 'P': 6, 'N': 7, 'B': 8, 'R': 9, 'Q': 10, 'K': 11}

    # Fill the board with pieces
    row = 0
    col = 0
    for char in board_fen:
        if char.isdigit():
            col += int(char)
        elif char == '/':
            row += 1
            col = 0
        else:
            board[piece_map[char], row, col] = 1
            col += 1

    # Add extra layers for player color, move number, etc.
    # Player color (1 for black, 0 for white)
    board[12, :, :] = 1 if player == 'b' else 0

    # Castling rights encoded in four binary planes
    board[13, :, :] = 1 if 'K' in castling else 0  # White kingside
    board[14, :, :] = 1 if 'Q' in castling else 0  # White queenside
    board[15, :, :] = 1 if 'k' in castling else 0  # Black kingside
    board[16, :, :] = 1 if 'q' in castling else 0  # Black queenside

    # Move number (as a real value)
    board[17, :, :] = float(fullmove)

    return board

# Example usage
fen = "rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1"
board_array = fen_to_board_array(fen)


In [71]:
config = AlphaZeroConfig()
network_1 = alphazero(config)

0


ValueError: in user code:

    File "/usr/local/lib/python3.10/dist-packages/keras/src/engine/training.py", line 2440, in predict_function  *
        return step_function(self, iterator)
    File "/usr/local/lib/python3.10/dist-packages/keras/src/engine/training.py", line 2425, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "/usr/local/lib/python3.10/dist-packages/keras/src/engine/training.py", line 2413, in run_step  **
        outputs = model.predict_step(data)
    File "/usr/local/lib/python3.10/dist-packages/keras/src/engine/training.py", line 2381, in predict_step
        return self(x, training=False)
    File "/usr/local/lib/python3.10/dist-packages/keras/src/utils/traceback_utils.py", line 70, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "/usr/local/lib/python3.10/dist-packages/keras/src/engine/input_spec.py", line 298, in assert_input_compatibility
        raise ValueError(

    ValueError: Input 0 of layer "model_7" is incompatible with the layer: expected shape=(None, 8, 8, 73), found shape=(None, 0)
