In [1]:
import os
import warnings
from copy import deepcopy

# Silence warnings before the third-party and project imports below run.
warnings.filterwarnings("ignore")

from sb3_contrib.ppo_mask import MaskablePPO

from envs import TicTacToeTrainingEnv
from utils.terminal_colors import *
from utils.json_utils import save_opponent_stats, load_opponent_stats
from utils.models_utils import should_save_model, get_models, get_last_model_number
from utils.evaluator import evaluate_model_by_opponent
from training.advanced_training.config import *
from sb3_contrib.common.wrappers import ActionMasker
from test.action_mask_ import mask_fn

<h1 style="color:#0b9ed8">TRAINING</h1>

In [2]:
def create_env(opponent_pool):
    """Build the action-masked training environment for one training session.

    Args:
        opponent_pool: Forwarded unchanged as the environment's
            ``opponent_pool`` argument.

    Returns:
        The ``TicTacToeTrainingEnv`` wrapped in ``ActionMasker`` (using
        ``mask_fn``), after one initial ``reset()`` call.
    """
    # Everything except the opponent pool comes from the training config.
    env_settings = dict(
        board_length=TRAINING_DEFAULT_BOARD_LENGTH,
        pattern_victory_length=TRAINING_DEFAULT_PATTERN_VICTORY_LENGTH,
        opponent_pool=opponent_pool,
        first_play_rate=TRAINING_DEFAULT_FIRST_PLAY_RATE,
        lost_games_path=None,
        review_ratio=TRAINING_DEFAULT_REVIEW_RATIO,
        opponent_statistics_file=STATS_PATH,
    )
    masked_env = ActionMasker(TicTacToeTrainingEnv(**env_settings), mask_fn)
    masked_env.reset()
    return masked_env

In [3]:
def initialize_model(env, last_model_num, ent_coef, n_steps, batch_size, learning_rate):
    """Create a fresh MaskablePPO model or resume from the latest saved one.

    Args:
        env: Environment the model is attached to.
        last_model_num: Number of the most recent saved model; ``0`` means no
            model exists yet and a new one is created from scratch.
        ent_coef: Entropy coefficient to train with.
        n_steps: Rollout length to train with.
        batch_size: Minibatch size to train with.
        learning_rate: Learning rate to train with.

    Returns:
        A ``MaskablePPO`` instance configured with the given hyperparameters.
    """
    if last_model_num == 0:
        return MaskablePPO(
            "MultiInputPolicy",
            env=env,
            verbose=1,
            gamma=GAMMA,
            gae_lambda=GAE_LAMBDA,
            ent_coef=ent_coef,
            n_steps=n_steps,
            batch_size=batch_size,
            learning_rate=learning_rate,
            policy_kwargs=policy_kwargs,
        )

    # Resume from the last entry returned by get_models().
    prev_model_path = get_models(MODELS_DIR)[-1]
    # Pass the overrides through load() rather than assigning attributes
    # afterwards: load() applies its extra keyword arguments before it sets the
    # model up, so the rollout buffer is sized for the new n_steps and the
    # learning-rate schedule is rebuilt from the new learning_rate. Assigning
    # after load() leaves both built from the values stored in the checkpoint.
    return MaskablePPO.load(
        prev_model_path,
        env=env,
        ent_coef=ent_coef,
        n_steps=n_steps,
        batch_size=batch_size,
        learning_rate=learning_rate,
    )


In [4]:
def _segment_hyperparams(progress):
    """Return ``(n_steps, batch_size, ent_coef)`` for a progress fraction in [0, 1]."""
    n_steps = int(2048 + (4096 - 2048) * progress**0.8)
    batch_size = int(512 + (2048 - 512) * progress**1.0)
    ent_coef = max(0.001, 0.015 * (1 - progress**0.6))
    return n_steps, batch_size, ent_coef


def train_one_model():
    """Train the next numbered model in segments, saving it whenever it improves.

    Training runs for ``TOTAL_STEPS // CHECKPOINT_INTERVAL`` segments of
    ``CHECKPOINT_INTERVAL`` timesteps. After each segment the model is
    evaluated against every opponent in the pool; if ``should_save_model``
    accepts the new statistics, the model and the statistics are written to
    disk and become the new baseline.

    Returns:
        bool: True if at least one segment produced a saved model, else False.
    """
    last_model_num = get_last_model_number(MODELS_DIR)
    next_model_num = last_model_num + 1
    model_name = f"{BASE_MODELS_NAME}_{next_model_num}.zip"
    model_path = os.path.join(MODELS_DIR, model_name)

    # Previously saved models join the two scripted opponents.
    opponent_models = get_models(MODELS_DIR)
    opponent_pool = ["random", "smart_random"] + opponent_models

    improvement = False
    best_stats = load_opponent_stats(opponent_pool)
    n_checks = TOTAL_STEPS // CHECKPOINT_INTERVAL

    # Create environment once
    env = create_env(opponent_pool)
    model = None

    for check in range(n_checks):
        segment = check + 1  # 1-based, used consistently in all messages
        current_progress = (check * CHECKPOINT_INTERVAL) / TOTAL_STEPS

        # Dynamic training parameters
        n_steps, batch_size, ent_coef = _segment_hyperparams(current_progress)
        learning_rate = LR_SCHEDULE(current_progress)

        # Initialize or load model only on first checkpoint
        # NOTE(review): the model is only built here, with the progress-0
        # values. For later segments the values above are printed below but
        # never pushed to the model. Applying them (n_steps in particular,
        # which likely needs the rollout buffer rebuilt) needs a deliberate
        # decision.
        if model is None:
            model = initialize_model(env, last_model_num, ent_coef, n_steps, batch_size, learning_rate)

        print(f"\n{YELLOW}=== Training segment {segment}/{n_checks} ===")
        print(f"Steps: {check*CHECKPOINT_INTERVAL}-{(check+1)*CHECKPOINT_INTERVAL}")
        print(f"Params: n_steps={n_steps}, batch={batch_size}, ent_coef={ent_coef:.4f}")
        print(f"Opponents: {opponent_pool}{RESET}\n")

        # Train the model
        model.learn(total_timesteps=CHECKPOINT_INTERVAL)

        # Evaluate against opponents, keeping only the two rates that are compared.
        results = evaluate_model_by_opponent(model, opponent_pool, n_episodes=200)
        current_stats = {
            opponent: {"defeat_rate": stats["defeat_rate"], "victory_rate": stats["victory_rate"]}
            for opponent, stats in results.items()
        }

        # Save if improvement
        if should_save_model(current_stats, best_stats, IMPROVEMENT_THRESHOLD):
            previous_best = best_stats
            improvement = True
            best_stats = deepcopy(current_stats)
            model.save(model_path)
            save_opponent_stats(best_stats, STATS_PATH)

            # Report only after both writes have run, so "Saved" is accurate.
            print(f"{GREEN}Saved new best model at checkpoint {segment}{RESET}")
            print(f"{RED}Old best stats -> {previous_best}{RESET}")
            print(f"{YELLOW}New best stats -> {current_stats}{RESET}")

    return improvement


In [5]:
def main_training_loop(nb_models_to_train=2):
    """Run ``train_one_model`` repeatedly and report the outcome of each run.

    Args:
        nb_models_to_train: Number of training runs to perform.

    Every run counts toward the total whether or not it produced an improved
    model; a run without improvement only triggers a warning.
    """
    trained_count = 0
    while trained_count < nb_models_to_train:
        if train_one_model():
            report = f"{GREEN}Training completed for model {trained_count+1}. Best model saved.{RESET}"
        else:
            # Open question from the original author: should a run without
            # improvement stop the loop instead of counting as a run?
            report = f"{RED}Warning: No model met improvement criteria{RESET}"
        print(report)
        trained_count += 1

In [None]:
# Kick off training: two consecutive train_one_model runs.
main_training_loop(nb_models_to_train=2)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.

[33m=== Training segment 1/10 ===
Steps: 0-10000
Params: n_steps=2048, batch=512, ent_coef=0.0150
Opponents: ['random', 'smart_random'][0m

---------------------------------
| rollout/           |          |
|    ep_len_mean     | 3.56     |
|    ep_rew_mean     | -0.384   |
| time/              |          |
|    fps             | 93       |
|    iterations      | 1        |
|    time_elapsed    | 21       |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 3.56        |
|    ep_rew_mean          | -0.35       |
| time/                   |             |
|    fps                  | 91          |
|    iterations           | 2           |
|    time_elapsed         | 44          |
|    total_timesteps      | 4096        |
| train/                  |             |
|  