In [5]:
import numpy as np
import tensorflow as tf
from keras import layers, models
from tqdm import trange
from GymEnv import ChineseCheckersBoard

# === Hyperparameters ===
# Environment
NUM_PLAYERS = 6            # passed to ChineseCheckersBoard when an episode starts
BOARD_SHAPE = (29, 19)     # observation grid size (presumably rows x cols -- confirm)
# Network input: the flattened board followed by the 2 values describing a move.
INPUT_DIM = 2 + BOARD_SHAPE[0] * BOARD_SHAPE[1]

# Optimisation
EPISODES = 1000            # number of games played by the training loop
LEARNING_RATE = 0.0001     # step size handed to the Adam optimizer
GAMMA = 0.99               # discount applied to the bootstrapped next-state value
EPSILON = 0.2              # exploration rate (probability of a random move)
TARGET_UPDATE_FREQ = 10    # episodes between target-network weight syncs

# === Q-Network ===
def create_q_model():
    """Build the feed-forward network that scores one (state, move) input.

    Returns:
        An uncompiled Keras ``Sequential`` model that takes ``INPUT_DIM``
        features, passes them through two ReLU hidden layers (128 and 64
        units) and emits a single linear output used as the Q-value.
    """
    hidden_layers = [layers.Dense(units, activation='relu') for units in (128, 64)]
    stack = [
        layers.Input(shape=(INPUT_DIM,)),
        *hidden_layers,
        layers.Dense(1),  # Q-value
    ]
    return models.Sequential(stack)

# Online network (updated by gradient steps) and a second network with the
# same architecture that supplies bootstrap targets; both start from
# identical weights.
q_model, target_model = create_q_model(), create_q_model()
target_model.set_weights(q_model.get_weights())

# Squared-error loss between TD target and prediction, minimised with Adam.
loss_fn = tf.keras.losses.MeanSquaredError()
optimizer = tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE)

# === Epsilon-greedy move selection ===
def select_move(obs, legal_moves, model, epsilon):
    """Pick a move epsilon-greedily and build the matching network input.

    Args:
        obs: Board observation; it is flattened to a 1-D float32 vector here.
        legal_moves: Non-empty sequence of candidate moves. Each move must be
            a numeric sequence of the same length so it can be appended to
            the flattened observation.
        model: Callable invoked as ``model(batch, training=False)`` whose
            result exposes ``.numpy()`` with one Q-value per input row.
        epsilon: Probability of choosing a uniformly random legal move
            instead of the highest-scoring one.

    Returns:
        A ``(move, input_vector)`` pair: ``move`` is the chosen element of
        ``legal_moves`` and ``input_vector`` is the float32 concatenation of
        the flattened observation and that move.

    Raises:
        ValueError: If ``legal_moves`` is empty.
    """
    # Fail with a clear message instead of an opaque NumPy shape/choice error.
    if len(legal_moves) == 0:
        raise ValueError("select_move requires at least one legal move")

    obs = np.asarray(obs, dtype=np.float32).ravel()

    if np.random.rand() < epsilon:
        move = legal_moves[np.random.choice(len(legal_moves))]
        # Cast the move explicitly: concatenating float32 with an integer
        # sequence would promote the result to float64, unlike the greedy
        # branch below.
        move_vector = np.asarray(move, dtype=np.float32).ravel()
        return move, np.concatenate([obs, move_vector])

    # Greedy branch: score every legal move in one forward pass.
    move_batch = np.asarray(legal_moves, dtype=np.float32)
    obs_batch = np.repeat(obs[None, :], len(move_batch), axis=0)
    input_batch = np.concatenate([obs_batch, move_batch], axis=1)
    # reshape(-1) keeps a 1-D vector even when there is a single legal move.
    q_values = model(input_batch, training=False).numpy().reshape(-1)
    best_index = int(np.argmax(q_values))
    return legal_moves[best_index], input_batch[best_index]

# === Training Loop ===
# Each episode plays one game with the epsilon-greedy policy, records every
# transition, then fits the online network to one-step TD targets computed
# from the target network.
for episode in trange(EPISODES, desc="Q-Learning Training"):
    env = ChineseCheckersBoard(NUM_PLAYERS)
    board, _ = env.reset()

    # One entry per move: (input_vector, reward, next_state, next_legal, terminated)
    episode_memory = []
    done = False

    while not done:
        state = board["obs"].flatten().astype(np.float32)
        legal_moves = board["action_mask"]
        if not legal_moves:
            # No move available for the side to play: stop collecting.
            break

        move, input_vector = select_move(state, legal_moves, q_model, EPSILON)

        # The reward returned by the environment is discarded; the loop uses
        # its own shaping: -1 per move, +10 on the terminating move.
        next_board, _, terminated, truncated, _ = env.step(move)
        reward = -1.0
        next_state = next_board["obs"].flatten().astype(np.float32)
        next_legal = next_board["action_mask"]

        if terminated:
            print("Done???")
            reward = 10.0

        # Only true termination is stored: a truncated game is cut short but
        # its last transition should still bootstrap from the next state.
        episode_memory.append((input_vector, reward, next_state, next_legal, terminated))
        board = next_board
        # Stop on truncation as well as termination; otherwise a truncated
        # episode would keep stepping the environment indefinitely.
        done = terminated or truncated

    # === Q-learning updates ===
    # NOTE(review): next_state is the observation returned right after
    # env.step, which presumably belongs to the next player to move --
    # confirm that bootstrapping from that player's moves is intended.
    loss = None  # reset per episode so the progress print never shows a stale value
    for input_vector, reward, next_state, next_legal, was_terminal in episode_memory:
        input_tensor = tf.convert_to_tensor(input_vector[None, :], dtype=tf.float32)

        if was_terminal or not next_legal:
            # Nothing to bootstrap from: the target is the immediate reward.
            target_q = reward
        else:
            # Score every follow-up move with the target network and take the best.
            next_obs_batch = np.repeat(next_state[None, :], len(next_legal), axis=0)
            next_moves = np.array(next_legal, dtype=np.float32)
            next_input_batch = np.concatenate([next_obs_batch, next_moves], axis=1)
            best_next_q = np.max(target_model(next_input_batch, training=False).numpy())
            target_q = reward + GAMMA * best_next_q

        with tf.GradientTape() as tape:
            current_q = tf.squeeze(q_model(input_tensor, training=True), axis=0)
            loss = loss_fn([target_q], [current_q])

        grads = tape.gradient(loss, q_model.trainable_variables)
        optimizer.apply_gradients(zip(grads, q_model.trainable_variables))

    # Update target model
    if episode % TARGET_UPDATE_FREQ == 0:
        target_model.set_weights(q_model.get_weights())

    if episode % 10 == 0:
        if loss is None:
            # The episode ended before any move was played, so no update ran.
            print(f"Episode {episode} | Loss: n/a (no transitions recorded)")
        else:
            # Loss of the last transition fitted in this episode.
            print(f"Episode {episode} | Loss: {loss.numpy():.4f}")


Q-Learning Training:   0%|          | 0/1000 [00:00<?, ?it/s]

KeyboardInterrupt: 