In [1]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import random
import re
from collections import Counter
import json
import torch
import torch.nn as nn
from sb3_contrib import RecurrentPPO
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.callbacks import BaseCallback, CallbackList
from stable_baselines3.common.vec_env import VecMonitor

# =======================
# Curriculum Management
# =======================

class Curriculum:
    """
    Manages the curriculum phases for training the Hangman agent with state persistence.
    """
    def __init__(self, state_file='curriculum_state.json'):
        self.current_phase = 1
        self.phases = {
            1: {'word_length_range': (3, 6), 'max_attempts': 12},
            2: {'word_length_range': (6, 8), 'max_attempts': 10},
            3: {'word_length_range': (8, 10), 'max_attempts': 8},
            4: {'word_length_range': (10, 12), 'max_attempts': 7},
            5: {'word_length_range': (12, 15), 'max_attempts': 6},
            6: {'word_length_range': (15, 20), 'max_attempts': 6},
        }
        self.state_file = state_file
        self.load_state()

    def get_current_config(self):
        return self.phases[self.current_phase]

    def advance_phase(self):
        if self.current_phase < len(self.phases):
            self.current_phase += 1
            print(f"Advancing to Phase {self.current_phase}")
            self.save_state()
        else:
            if self.current_phase == len(self.phases):
                print("Already in the final curriculum phase.")

    def regress_phase(self):
        if self.current_phase > 1:
            self.current_phase -= 1
            print(f"Regressing to Phase {self.current_phase}")
            self.save_state()
        else:
            print("Already in the first curriculum phase.")

    def save_state(self):
        state = {'current_phase': self.current_phase}
        with open(self.state_file, 'w') as f:
            json.dump(state, f)
        print(f"Curriculum state saved to {self.state_file}")

    def load_state(self):
        try:
            with open(self.state_file, 'r') as f:
                state = json.load(f)
                self.current_phase = state.get('current_phase', 1)
                print(f"Curriculum state loaded: Phase {self.current_phase}")
        except FileNotFoundError:
            print("No existing curriculum state found. Starting from Phase 1.")

# =======================
# Optimized Hangman Env
# =======================

class HangmanEnv(gym.Env):
    """
    Optimized Hangman Environment for Reinforcement Learning with Curriculum Learning.
    """
    metadata = {'render.modes': ['human']}

    def __init__(self, word_list, curriculum, max_word_length=20):
        super(HangmanEnv, self).__init__()

        self.word_list = word_list
        self.max_word_length = max_word_length

        # Action space: 26 letters of the alphabet
        self.action_space = spaces.Discrete(26)

        # Observation space:
        # - Revealed word: max_word_length (integer encoding)
        # - Guessed letters: 26 (binary vector)
        # - Remaining attempts: 1
        # - Word length: 1
        # - Letter frequencies: 26
        # - Last action: 26 (one-hot vector)
        # - Unique letters remaining: 1
        # - Letter probabilities: 26
        obs_size = (
            self.max_word_length +  # Revealed word
            26 +                     # Guessed letters
            1 +                      # Remaining attempts
            1 +                      # Word length
            26 +                     # Letter frequencies
            26 +                     # Last action
            1 +                      # Unique letters remaining
            26                       # Letter probabilities
        )

        self.observation_space = spaces.Box(
            low=-1.0,   # -1 represents unknown letters
            high=25.0,  # 'z' is represented by 25
            shape=(obs_size,),
            dtype=np.float32
        )

        # Initialize state variables
        self.current_word = None
        self.guessed_letters = None
        self.remaining_attempts = None
        self.word_length = None
        self.last_action = None
        self.unique_letters_remaining = None

        # Precompute letter frequencies
        self.letter_frequencies = self._compute_letter_frequencies()

        # Curriculum instance
        self.curriculum = curriculum

        # Initialize max_attempts
        self.max_attempts = None  # Will be set in reset()

        # Initialize incorrect guesses
        self.incorrect_guesses = set()

    def reset(self, word=None, *, seed=None, options=None):
        super().reset(seed=seed)

        # Get current curriculum configuration
        config = self.curriculum.get_current_config()
        min_len, max_len = config['word_length_range']
        current_max_attempts = config['max_attempts']

        # Assign max_attempts based on current phase
        self.max_attempts = current_max_attempts

        # Reset incorrect guesses
        self.incorrect_guesses = set()

        # Select a word
        if word is not None:
            self.current_word = word.lower()
            self.word_length = len(self.current_word)
            if self.word_length > self.max_word_length:
                raise ValueError(f"Word length ({self.word_length}) exceeds max_word_length ({self.max_word_length}).")
            if not re.match('^[a-z]+$', self.current_word):
                raise ValueError("The word must contain only lowercase letters a-z.")
        else:
            # Efficiently select a word within the current word length range
            eligible_words = [w for w in self.word_list if min_len <= len(w) <= max_len]
            if not eligible_words:
                raise ValueError("No words found within the current word length range.")
            self.current_word = random.choice(eligible_words).lower()
            self.word_length = len(self.current_word)

        # Initialize revealed word: -1 for unknown, 0-25 for revealed letters
        self.revealed_word = np.full((self.max_word_length, 27), -1, dtype=np.int32)
        for i in range(self.max_word_length):
            if i < self.word_length:
                self.revealed_word[i, 26] = 1  # Unknown token
            else:
                self.revealed_word[i, :] = -1  # Padding

        # Initialize guessed letters: 0 for not guessed, 1 for guessed
        self.guessed_letters = np.zeros(26, dtype=np.float32)

        # Reset remaining attempts based on curriculum
        self.remaining_attempts = current_max_attempts

        # Reset last action
        self.last_action = -1  # No action taken yet

        # Calculate unique letters remaining
        self.unique_letters_in_word = set(self.current_word)
        self.unique_letters_remaining = len(self.unique_letters_in_word)

        # Initialize letter probabilities
        self.letter_probabilities = self._compute_letter_probabilities()

        # Update state
        self._update_state()

        return self._get_observation(), {}

    def _compute_letter_frequencies(self):
        """
        Compute the frequency of each letter in the training word list.
        """
        all_letters = ''.join(self.word_list)
        letter_counts = Counter(all_letters)
        total_letters = sum(letter_counts.values())
        frequencies = np.zeros(26, dtype=np.float32)
        for i in range(26):
            letter = chr(i + ord('a'))
            frequencies[i] = letter_counts.get(letter, 0) / total_letters if total_letters > 0 else 0.0
        return frequencies

    def _compute_letter_probabilities(self):
        """
        Compute the probability of each letter being in the word given the current state.
        Optimized using precompiled regex and vectorized operations.
        """
        # Get current word length range from curriculum
        config = self.curriculum.get_current_config()
        min_len, max_len = config['word_length_range']

        # Create a regex pattern based on the revealed word
        pattern = ''.join(
            [f"[{chr(int(self.revealed_word[i, :26].argmax() + ord('a')))}]" if self.revealed_word[i, 26] == 0 else '.' 
             for i in range(self.word_length)]
        )
        regex = re.compile(f"^{pattern}$")

        # Filter possible words that match the pattern
        possible_words = [word for word in self.word_list if regex.match(word)]

        # Exclude words containing any incorrect guessed letters
        if self.incorrect_guesses:
            possible_words = [word for word in possible_words if not any(letter in word for letter in self.incorrect_guesses)]

        if not possible_words:
            # If no words match, assign uniform probabilities
            return np.ones(26, dtype=np.float32) / 26

        # Count the frequency of each letter in the possible words
        letter_counts = Counter(''.join(possible_words))
        total_letters = sum(letter_counts.values())
        probabilities = np.zeros(26, dtype=np.float32)
        for i in range(26):
            letter = chr(i + ord('a'))
            probabilities[i] = letter_counts.get(letter, 0) / total_letters if total_letters > 0 else 0.0
        return probabilities

    def _word_matches(self, word):
        """
        Check if the word matches the current revealed word and hasn't been eliminated.
        """
        if len(word) != self.word_length:
            return False
        for idx, char in enumerate(word):
            if self.revealed_word[idx, 26] == 0:
                # Letter is revealed; must match
                revealed_letter_index = np.argmax(self.revealed_word[idx, :26])
                if ord(char) - ord('a') != revealed_letter_index:
                    return False
            else:
                # Letter is hidden; must not have been guessed if it's not in the word
                if self.guessed_letters[ord(char) - ord('a')] == 1.0 and char not in self.current_word:
                    return False
        return True

    def step(self, action):
        done = False
        reward = 0

        # Validate action
        if not self.action_space.contains(action):
            raise ValueError(f"Invalid action: {action}")

        # Map action to corresponding letter
        letter = chr(action + ord('a'))

        # Update last action
        self.last_action = action

        # Compute letter probabilities based on current state
        self.letter_probabilities = self._compute_letter_probabilities()

        # Get the probability of the current action
        letter_prob = self.letter_probabilities[action]

        # Check if the letter has already been guessed
        if self.guessed_letters[action] == 1.0:
            # Invalid action selected
            reward -= 10  # Significant penalty for invalid action
            self.remaining_attempts -= 1  # Decrease remaining attempts

            # Check if the agent has run out of attempts
            if self.remaining_attempts <= 0:
                reward -= 20  # Penalty for losing the game
                done = True
        else:
            # Update guessed letters
            self.guessed_letters[action] = 1.0

            # Check if the guessed letter is in the word
            if letter in self.current_word:
                # Correct guess
                indices = [i for i, l in enumerate(self.current_word) if l == letter]
                new_letters_revealed = 0
                for idx in indices:
                    if self.revealed_word[idx, 26] == 1:
                        new_letters_revealed += 1
                        # Reveal the letter
                        self.revealed_word[idx, :] = -1  # Reset previous state
                        self.revealed_word[idx, action] = 0  # Encode revealed letter
                        self.revealed_word[idx, 26] = 0  # Reset unknown token
                # Reward for each new letter revealed
                reward += 5 * new_letters_revealed

                # Additional reward based on letter probability
                reward += 10 * letter_prob  # Heuristic-based reward

                # Update unique letters remaining
                self.unique_letters_in_word.discard(letter)
                self.unique_letters_remaining = len(self.unique_letters_in_word)

                # Check if the word is completely revealed
                if self.unique_letters_remaining == 0:
                    # Efficiency bonus: reward proportional to remaining attempts
                    efficiency_bonus = 10 * (self.remaining_attempts / self.max_attempts)
                    reward += 50 + efficiency_bonus  # Increased reward for winning
                    done = True
            else:
                # Incorrect guess
                self.remaining_attempts -= 1
                reward -= 2  # Increased penalty

                # Additional penalty based on letter probability
                reward -= 5 * (1 - letter_prob)  # Heuristic-based penalty

                # Track incorrect guess
                self.incorrect_guesses.add(letter)

                # Check if the agent has run out of attempts
                if self.remaining_attempts <= 0:
                    reward -= 20  # Increased penalty for losing
                    done = True

        # Update state
        self._update_state()

        # Prepare observation
        observation = self._get_observation()

        # In gymnasium, return (observation, reward, terminated, truncated, info)
        terminated = done
        truncated = False  # Assuming no truncation
        info = {}

        return observation, reward, terminated, truncated, info

    def _update_state(self):
        """
        Update normalized state variables and last action vector.
        """
        # Normalize remaining attempts
        self.normalized_attempts = self.remaining_attempts / self.max_attempts

        # Normalize word length
        self.normalized_word_length = self.word_length / self.max_word_length

        # Normalize unique letters remaining
        self.normalized_unique_letters_remaining = self.unique_letters_remaining / 26

        # One-hot encode the last action
        self.last_action_vec = np.zeros(26, dtype=np.float32)
        if self.last_action >= 0:
            self.last_action_vec[self.last_action] = 1.0

    def _get_observation(self):
        """
        Construct the observation vector.
        """
        # Revealed word as integer encoding (-1 for unknown, 0-25 for revealed letters)
        revealed_word_int = np.full(self.max_word_length, -1.0, dtype=np.float32)
        for i in range(self.word_length):
            if self.revealed_word[i, 26] == 0:
                # Letter is revealed
                letter_index = np.argmax(self.revealed_word[i, :26])
                revealed_word_int[i] = float(letter_index)
            else:
                # Letter is hidden
                revealed_word_int[i] = -1.0

        # Guessed letters as binary vector (0.0 or 1.0)
        guessed_letters_binary = self.guessed_letters.astype(np.float32)

        # Remaining attempts (normalized between 0 and 1)
        remaining_attempts = np.array([self.normalized_attempts], dtype=np.float32)

        # Word length (normalized between 0 and 1)
        word_length = np.array([self.normalized_word_length], dtype=np.float32)

        # Letter frequencies (float32)
        letter_frequencies = self.letter_frequencies.astype(np.float32)

        # Last action vector (one-hot encoded)
        last_action_vector = self.last_action_vec.astype(np.float32)

        # Unique letters remaining (normalized between 0 and 1)
        unique_letters_remaining = np.array([self.normalized_unique_letters_remaining], dtype=np.float32)

        # Letter probabilities (float32)
        letter_probabilities = self.letter_probabilities.astype(np.float32)

        # Concatenate all components into a single observation vector
        observation = np.concatenate([
            revealed_word_int,             # Revealed word
            guessed_letters_binary,       # Guessed letters
            remaining_attempts,           # Remaining attempts
            word_length,                  # Word length
            letter_frequencies,           # Letter frequencies
            last_action_vector,           # Last action
            unique_letters_remaining,     # Unique letters remaining
            letter_probabilities          # Letter probabilities
        ])

        return observation

    def render(self, mode='human'):
        """
        Render the current state of the game.
        """
        # Displayed word with underscores for unknown letters
        displayed_word = ''
        for i in range(self.word_length):
            if self.revealed_word[i, 26] == 1:
                displayed_word += '_'
            else:
                letter_index = np.argmax(self.revealed_word[i, :26])
                displayed_word += chr(letter_index + ord('a'))
        print(f"Word: {displayed_word}")

        # List of guessed letters
        guessed_letters_list = [chr(i + ord('a')) for i in range(26) if self.guessed_letters[i] == 1.0]
        print(f"Guessed Letters: {guessed_letters_list}")

        # Remaining attempts
        print(f"Remaining Attempts: {self.remaining_attempts}")

    def close(self):
        pass


2024-11-18 15:32:19.035748: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-11-18 15:32:19.057273: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-11-18 15:32:19.057297: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-11-18 15:32:19.058020: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-11-18 15:32:19.061881: I tensorflow/core/platform/cpu_feature_guar

# 1. Read word list and filter

In [2]:
import re
from nltk.corpus import words
import nltk

# Download the English words corpus
nltk.download("words")
english_words = set(words.words())

# Load your word list from the file
def load_word_list(file_path):
    with open(file_path, 'r') as file:
        words = file.read().splitlines()
    return [word.strip().lower() for word in words if word.strip()]  # Remove empty lines and whitespace

# Clean the word list
def clean_word_list(word_list, min_length=3, max_length=20):
    cleaned_words = []
    for word in word_list:
        # Check if the word meets all criteria
        if (
            min_length <= len(word) <= max_length  # Length criteria
            and word.isalpha()                   # Contains only alphabetic characters
            # and word in english_words            # Valid English word
            and len(set(word)) > 1               # Avoid repeated patterns like "aaa"
        ):
            cleaned_words.append(word)
    return cleaned_words

# File path to the word list
file_path = 'words_250000_train.txt'

# Load and clean the word list
raw_word_list = load_word_list(file_path)
cleaned_word_list = clean_word_list(raw_word_list)

# Save the cleaned word list to a new file
output_file_path = 'cleaned_word_list.txt'
with open(output_file_path, 'w') as file:
    file.write('\n'.join(cleaned_word_list))

print(f"Original word count: {len(raw_word_list)}")
print(f"Cleaned word count: {len(cleaned_word_list)}")

Original word count: 227300
Cleaned word count: 226836


[nltk_data] Downloading package words to
[nltk_data]     /home/sagarnildass/nltk_data...
[nltk_data]   Package words is already up-to-date!


In [3]:
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical
import re

# Load the word list from the text file
def load_word_list(file_path):
    with open(file_path, 'r') as file:
        words = file.read().splitlines()
    # Filter out empty lines and strip whitespace
    words = [word.strip() for word in words if word.strip()]
    return words

# Specify the path to your text file
file_path = 'cleaned_word_list.txt'
word_list = load_word_list(file_path)

# Clean the word list to include only lowercase letters
cleaned_word_list = [word.lower() for word in word_list if re.match('^[a-z]+$', word.lower())]

# Use the cleaned word list
word_list = cleaned_word_list

# Set the maximum word length
max_word_length = 20  # Adjust as needed

# Filter out words longer than `max_word_length`
filtered_word_list = [word for word in word_list if len(word) <= max_word_length]

# Check if the filtered list is not empty
if not filtered_word_list:
    raise ValueError("No words found with length less than or equal to max_word_length.")

# Use the filtered word list
word_list = filtered_word_list

In [4]:
# from sklearn.model_selection import train_test_split

# # Split the word list into 80% training and 20% evaluation
# train_words, eval_words = train_test_split(word_list, test_size=0.2, random_state=42)

In [5]:
# !pip install --upgrade stable-baselines3[extra]
# !pip install sb3-contrib

# 2. Initialize the environment and Network Architecture parameters

In [7]:
import torch
import torch.nn as nn
from sb3_contrib import RecurrentPPO
# Removed unnecessary import of RecurrentActorCriticPolicy
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.env_checker import check_env


# Training environment
# Initialize the Curriculum instance
curriculum = Curriculum()

# Initialize the training environment with curriculum
env = HangmanEnv(word_list=word_list, curriculum=curriculum, max_word_length=max_word_length)

# Evaluation environment
# eval_env = HangmanEnv(word_list=eval_words, max_attempts=6, max_word_length=max_word_length)

# Check the environment for compatibility
check_env(env)
# check_env(eval_env)


Curriculum state loaded: Phase 1


In [11]:
# policy_kwargs = dict(
#     activation_fn=nn.ReLU,
#     net_arch=dict(pi=[128], vf=[128]),  # Corrected net_arch
#     lstm_hidden_size=128,
#     n_lstm_layers=1,
#     shared_lstm=True,
#     enable_critic_lstm=False,  # Disable separate LSTM for the critic
# )

policy_kwargs = dict(
    activation_fn=nn.ReLU,
    net_arch=dict(pi=[256, 128], vf=[256, 128]),  # Increased network layers and units
    lstm_hidden_size=128,
    n_lstm_layers=1,
    shared_lstm=True,
    enable_critic_lstm=False,
)



# 3. Define Callbacks

In [4]:
from stable_baselines3.common.callbacks import BaseCallback

class CurriculumCallback(BaseCallback):
    """
    A custom callback to manage curriculum learning by advancing phases based on training steps.
    """
    def __init__(self, curriculum, phase_timesteps, verbose=1):
        """
        :param curriculum: Instance of Curriculum class.
        :param phase_timesteps: List of timesteps at which to advance the curriculum phases.
        :param verbose: Verbosity level.
        """
        super(CurriculumCallback, self).__init__(verbose)
        self.curriculum = curriculum
        self.phase_timesteps = phase_timesteps
        self.current_phase_index = 0

    def _on_step(self) -> bool:
        """
        Called at every step. Checks if it's time to advance the curriculum.
        """
        if self.current_phase_index >= len(self.phase_timesteps):
            return True  # No more phases to advance

        if self.num_timesteps >= self.phase_timesteps[self.current_phase_index]:
            self.curriculum.advance_phase()
            self.current_phase_index += 1

        return True

class AverageRewardCallback(BaseCallback):
    """
    A custom callback that logs average reward over a specified number of steps.
    """
    def __init__(self, check_freq: int, verbose=0):
        super(AverageRewardCallback, self).__init__(verbose)
        self.check_freq = check_freq
        self.rewards = []
        self.total_steps = 0

    def _on_step(self) -> bool:
        # Safely collect rewards
        rewards = self.locals.get('rewards', [])
        if len(rewards) > 0:
            reward = rewards[0]
            self.rewards.append(reward)
            self.total_steps += 1
    
            # Check if it's time to compute the average reward
            if self.total_steps % self.check_freq == 0:
                avg_reward = np.mean(self.rewards[-self.check_freq:])
                print(f"Average reward over last {self.check_freq} steps: {avg_reward:.2f}")
        return True


In [8]:

from stable_baselines3.common.callbacks import CallbackList
from stable_baselines3.common.callbacks import CheckpointCallback

# Define when to advance phases (e.g., after 100k, 300k, and 500k timesteps)
phase_timesteps = [800_000, 1_600_000, 2_400_000, 3_200_000, 4_000_000]

# Initialize the CurriculumCallback
curriculum_callback = CurriculumCallback(
    curriculum=curriculum,
    phase_timesteps=phase_timesteps,
    verbose=1
)


# Initialize the AverageRewardCallback
average_reward_callback = AverageRewardCallback(check_freq=5000, verbose=1)

# Define the CheckpointCallback to save models periodically
checkpoint_callback = CheckpointCallback(
    save_freq=50000,               # Save every 50,000 steps
    save_path='./PPO_LSTM_MORE_ROUNDS/',         # Directory to save models
    name_prefix='hangman_model_PPO_LSTM',   # Prefix for saved model files
    save_replay_buffer=False,      # Not needed for PPO
    save_vecnormalize=False        # Not needed if VecNormalize not used
)

# Combine all callbacks
callback = CallbackList([curriculum_callback, average_reward_callback, checkpoint_callback])


# 4. Wrapper Class

In [9]:
from stable_baselines3.common.vec_env import VecMonitor
from stable_baselines3.common.vec_env import SubprocVecEnv

# num_envs = 6  # Number of parallel environments
# vec_train_env = SubprocVecEnv([lambda: HangmanEnv(word_list, max_attempts=6, max_word_length=15) for _ in range(num_envs)])
# vec_train_env = VecMonitor(vec_train_env)

# Wrap the training environment
vec_train_env = DummyVecEnv([lambda: env])
vec_train_env = VecMonitor(vec_train_env)

# Wrap the evaluation environment
# vec_eval_env = DummyVecEnv([lambda: eval_env])

# 5. Training Hyperparameters

In [12]:
# Check if GPU is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Initialize the RecurrentPPO model
model = RecurrentPPO(
    "MlpLstmPolicy",
    vec_train_env,
    policy_kwargs=policy_kwargs,
    learning_rate=1e-4,
    n_steps=256,
    batch_size=128,
    n_epochs=4,
    gamma=0.99,
    gae_lambda=0.95,
    clip_range=0.2,
    ent_coef=0.01,
    vf_coef=0.5,
    max_grad_norm=0.5,
    verbose=0,
    device=device,
    tensorboard_log="./PPO_Hangman_tensorboard/"
)

Using device: cuda


# 6. Start Training

In [12]:
from stable_baselines3.common.callbacks import ProgressBarCallback
from stable_baselines3.common.callbacks import EvalCallback
from stable_baselines3.common.callbacks import CheckpointCallback, CallbackList


# Define the evaluation callback
# eval_callback = EvalCallback(
#     vec_eval_env,
#     best_model_save_path="./logs/recurrent_ppo_hangman_stable_baselines",
#     log_path="./logs/recurrent_ppo_hangman_stable_baselines_eval_logs",
#     eval_freq=5000,  # Evaluate every 5000 steps
#     n_eval_episodes=10,  # Number of episodes to evaluate
#     deterministic=True,
#     render=False,
# )


# Train the model
total_timesteps = 5_000_000  # Adjust as needed
# total_timesteps = 1_0000  # Adjust as needed

model.learn(total_timesteps=total_timesteps, callback=callback, progress_bar=True)

Output()

<sb3_contrib.ppo_recurrent.ppo_recurrent.RecurrentPPO at 0x7ea536c49eb0>

In [15]:
from stable_baselines3.common.evaluation import evaluate_policy

vec_env = model.get_env()
mean_reward, std_reward = evaluate_policy(model, vec_env, n_eval_episodes=20, warn=False)
print(mean_reward)

46.86981


In [35]:
curriculum.save_state()

# Load saved model and retrain

In [5]:
from sb3_contrib import RecurrentPPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecMonitor
import torch

# Initialize the Curriculum instance (loads the saved state)
curriculum = Curriculum(state_file='curriculum_state.json')

# Initialize the Hangman environment with the loaded curriculum
env = HangmanEnv(word_list=word_list, curriculum=curriculum, max_word_length=max_word_length)

# Wrap the environment in DummyVecEnv and VecMonitor
vec_env = DummyVecEnv([lambda: env])
vec_env = VecMonitor(vec_env)

# Check if GPU is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

policy_kwargs = dict(
    activation_fn=nn.ReLU,
    net_arch=dict(pi=[256, 128], vf=[256, 128]),  # Increased network layers and units
    lstm_hidden_size=128,
    n_lstm_layers=1,
    shared_lstm=True,
    enable_critic_lstm=False,
)



# Load the trained model
trained_model_path = "./PPO_LSTM_MORE_ROUNDS/hangman_model_PPO_LSTM_5000000_steps.zip"  # Update the path accordingly
model = RecurrentPPO.load(trained_model_path, env=vec_env, device=device)

print("Trained model loaded successfully.")

Curriculum state loaded: Phase 6
Using device: cuda
Trained model loaded successfully.


In [6]:
from stable_baselines3.common.callbacks import CallbackList, CheckpointCallback, BaseCallback

# Define new phase timesteps for continued training
# Extend or adjust as needed based on curriculum design
# additional_phase_timesteps = [5_600_000, 6_400_000, 7_200_000, 8_000_000]  # Example extensions

# Combine with existing phase_timesteps if necessary
phase_timesteps = [800_000, 1_600_000, 2_400_000, 3_200_000, 4_000_000]

# Initialize the CurriculumCallback with updated phase_timesteps
# curriculum_callback = CurriculumCallback(
#     curriculum=curriculum,
#     phase_timesteps=phase_timesteps,
#     verbose=1
# )

class AdaptiveCurriculumCallback(BaseCallback):
    """
    A custom callback to adaptively manage curriculum learning based on agent performance.
    """
    def __init__(self, curriculum, phase_timesteps, performance_threshold=0.8, verbose=1):
        """
        :param curriculum: Instance of Curriculum class.
        :param phase_timesteps: List of timesteps at which to evaluate and potentially advance/regress the curriculum phases.
        :param performance_threshold: Threshold to decide whether to advance or regress the curriculum phases.
        :param verbose: Verbosity level.
        """
        super(AdaptiveCurriculumCallback, self).__init__(verbose)
        self.curriculum = curriculum
        self.phase_timesteps = phase_timesteps
        self.current_phase_index = 0
        self.performance_threshold = performance_threshold
        self.recent_rewards = []
    
    def _on_step(self) -> bool:
        """
        Called at every step. Checks if it's time to adjust the curriculum based on performance.
        """
        if self.current_phase_index >= len(self.phase_timesteps):
            return True  # No more phases to adjust
    
        if self.num_timesteps >= self.phase_timesteps[self.current_phase_index]:
            # Calculate average reward over the last check_freq steps
            if len(self.recent_rewards) >= 5000:
                avg_reward = np.mean(self.recent_rewards[-5000:])
                if avg_reward >= self.performance_threshold:
                    self.curriculum.advance_phase()
                elif self.curriculum.current_phase > 1:
                    self.curriculum.regress_phase()
            self.current_phase_index += 1
    
        # Collect rewards
        rewards = self.locals.get('rewards', [])
        if rewards:
            self.recent_rewards.extend(rewards)
    
        return True

adaptive_curriculum_callback = AdaptiveCurriculumCallback(
    curriculum=curriculum,
    phase_timesteps=phase_timesteps,
    performance_threshold=2.6,  # Adjust based on desired performance
    verbose=1
)


# Initialize the AverageRewardCallback
average_reward_callback = AverageRewardCallback(check_freq=5000, verbose=1)

# Define the CheckpointCallback to save models periodically
checkpoint_callback = CheckpointCallback(
    save_freq=50000,               
    save_path='./PPO_LSTM_MORE_ROUNDS_RETRAIN/',         
    name_prefix='hangman_model_PPO_LSTM',   
    save_replay_buffer=False,      
    save_vecnormalize=False        
)

# Combine all callbacks
callback = CallbackList([adaptive_curriculum_callback, average_reward_callback, checkpoint_callback])


In [7]:
# Define additional timesteps for continued training
additional_timesteps = 5_000_000  # Adjust as needed

# Continue training
model.learn(total_timesteps=additional_timesteps, callback=callback, progress_bar=True)

# Save the continued model
#model.save("hangman_model_continued")
#print("Model continued training and saved as hangman_model_continued.zip")

Output()

<sb3_contrib.ppo_recurrent.ppo_recurrent.RecurrentPPO at 0x7e8a254f2d00>

In [8]:
from stable_baselines3.common.evaluation import evaluate_policy

vec_env = model.get_env()
mean_reward, std_reward = evaluate_policy(model, vec_env, n_eval_episodes=20, warn=False)
print(mean_reward)

47.20339


In [27]:
np.save("letter_frequencies.npy", env.letter_frequencies)
print("Letter frequencies saved.")

Letter frequencies saved.


In [None]:
# Save the trained model
# model.save("recurrent_ppo_hangman_stable_baselines")

In [9]:
from tqdm import tqdm
import numpy as np

# Prepare the validation set
validation_words = random.sample(word_list, min(1000, len(word_list)))
#env = HangmanEnv(word_list=word_list, max_attempts=6, max_word_length=max_word_length)

def evaluate_agent(model, env, validation_words):
    wins = 0
    total_games = len(validation_words)

    for idx, word in enumerate(tqdm(validation_words)):
        # Reset the environment with the validation word
        obs, info = env.reset(word=word)
        done = False

        # Initialize the LSTM states
        lstm_states = None  # (h, c)
        episode_starts = True  # True at the beginning of each episode
        step_counter = 0
        max_steps_per_episode = 1000  # Set a reasonable limit

        while not done and step_counter < max_steps_per_episode:
            # Predict the action and update the LSTM states
            action, lstm_states = model.predict(
                obs,
                state=lstm_states,
                episode_start=np.array([episode_starts]),
                deterministic=True
            )
            obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            episode_starts = done  # Reset LSTM states if episode is done
            step_counter += 1

        if step_counter >= max_steps_per_episode:
            print(f"Exceeded maximum steps for word '{word}'")

        # Check if the agent won
        if env.remaining_attempts > 0:
            wins += 1

    win_percentage = (wins / total_games) * 100
    print(f"Agent won {wins} out of {total_games} games.")
    print(f"Win percentage: {win_percentage:.2f}%")

# Evaluate the agent
evaluate_agent(model, env, validation_words)

100%|███████████████████████████████████████| 1000/1000 [02:51<00:00,  5.84it/s]

Agent won 152 out of 1000 games.
Win percentage: 15.20%





In [59]:
import time
from tqdm import tqdm
import numpy as np

def visualize_agent_playing(model, env, word):
    """
    Visualize the Hangman agent playing a single game.
    
    :param model: Trained RecurrentPPO model.
    :param env: Hangman environment.
    :param word: The word for the environment to guess.
    """
    # Reset the environment with the specified word
    obs, info = env.reset(word=word)
    done = False

    # Initialize LSTM states and flags
    lstm_states = None  # (h, c)
    episode_starts = True  # True at the beginning of each episode
    step_counter = 0
    max_steps_per_episode = 1000  # Set a reasonable limit

    print(f"Word to guess: {word}")
    print("=" * 30)

    while not done and step_counter < max_steps_per_episode:
        # Render the current state of the environment
        env.render()

        # Predict the action and update LSTM states
        action, lstm_states = model.predict(
            obs,
            state=lstm_states,
            episode_start=np.array([episode_starts]),
            deterministic=True
        )
        obs, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        episode_starts = done  # Reset LSTM states if the episode is done
        step_counter += 1

        # Sleep to visualize progress
        time.sleep(0.5)

    # Render the final state of the game
    print("\nFinal state:")
    env.render()

    # Display result
    if env.remaining_attempts > 0:
        print("\nThe agent won!")
    else:
        print("\nThe agent lost!")
    print("=" * 30)

# Example usage
# Pick a random word from the word list to visualize
random_word = random.choice(word_list)
print(random_word)

# Visualize the agent playing the game
visualize_agent_playing(model, env, random_word)


nonpasserine
Word to guess: nonpasserine
Word: ____________
Guessed Letters: []
Remaining Attempts: 6
Word: ____a_______
Guessed Letters: ['a']
Remaining Attempts: 6
Word: ____a__e___e
Guessed Letters: ['a', 'e']
Remaining Attempts: 6
Word: ____a__e_i_e
Guessed Letters: ['a', 'e', 'i']
Remaining Attempts: 6
Word: ____a__eri_e
Guessed Letters: ['a', 'e', 'i', 'r']
Remaining Attempts: 6
Word: _o__a__eri_e
Guessed Letters: ['a', 'e', 'i', 'o', 'r']
Remaining Attempts: 6
Word: _o__a__eri_e
Guessed Letters: ['a', 'e', 'i', 'l', 'o', 'r']
Remaining Attempts: 5
Word: non_a__erine
Guessed Letters: ['a', 'e', 'i', 'l', 'n', 'o', 'r']
Remaining Attempts: 5
Word: non_a__erine
Guessed Letters: ['a', 'e', 'i', 'l', 'n', 'o', 'r', 't']
Remaining Attempts: 4
Word: non_asserine
Guessed Letters: ['a', 'e', 'i', 'l', 'n', 'o', 'r', 's', 't']
Remaining Attempts: 4
Word: non_asserine
Guessed Letters: ['a', 'c', 'e', 'i', 'l', 'n', 'o', 'r', 's', 't']
Remaining Attempts: 3
Word: non_asserine
Guessed Letter

In [None]:
# 1. Load the trained model
# Replace the path with the actual path to your trained model
trained_model_path = "./models/hangman_model_PPO_LSTM_1000000_steps.zip"
model = RecurrentPPO.load(trained_model_path, device='cpu')

# 2. Initialize the environment
# Assuming you have the HangmanEnv class defined as per your code
# and that 'word_list' is already loaded and filtered

# Initialize the Curriculum instance
curriculum = Curriculum()

# Initialize the Hangman environment
env = HangmanEnv(word_list=word_list, curriculum=curriculum, max_word_length=max_word_length)

# Wrap the environment in DummyVecEnv
vec_env = DummyVecEnv([lambda: env])
vec_env = VecMonitor(vec_env)

# 3. Play the game with a specific word
word_to_guess = 'apple'  # Example word

# Call the playback function
win, steps = play_with_agent(model, vec_env, word_to_guess, verbose=True)
