In [None]:
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import torch.nn.functional as F

In [None]:
# 1. Dataset preparation and environment setup
import random
from typing import List, Tuple

class HangmanEnvironment:
    """Single-word Hangman game exposing a reset/guess interface.

    The word list is shuffled and split into a training part and a validation
    part. Each game tracks the masked word, the letters guessed so far and the
    number of incorrect guesses.

    Args:
        words: Candidate target words. The list passed in is copied and is
            never modified.
        max_incorrect_guesses: Number of wrong guesses that ends the game.
        validation_split: Fraction of the words placed in the validation set.
    """

    def __init__(self, words: List[str], max_incorrect_guesses: int = 6, validation_split: float = 0.2):
        # Work on a private copy so the shuffle below cannot reorder the caller's list.
        self.all_words = list(words)
        self.max_incorrect_guesses = max_incorrect_guesses
        self.validation_split = validation_split
        self.train_words, self.val_words = self._split_words()
        self.reset()

    def _split_words(self) -> Tuple[List[str], List[str]]:
        """Shuffle the private word list and split it into (train, validation)."""
        random.shuffle(self.all_words)
        split_idx = int(len(self.all_words) * (1 - self.validation_split))
        return self.all_words[:split_idx], self.all_words[split_idx:]

    def reset(self, use_validation=False, new_game=True, target_word=None):
        """Start a game and return its initial state.

        Args:
            use_validation: Draw the word from the validation set instead of
                the training set (only used when ``new_game`` is true).
            new_game: When true, pick a new target word; when false, replay
                the current target word from scratch.
            target_word: Explicit target word; when falsy a random word is
                chosen from the selected set.

        Returns:
            The state dictionary produced by ``get_state``.
        """
        if new_game:
            self.words = self.val_words if use_validation else self.train_words
            self.target_word = target_word if target_word else random.choice(self.words)
        self.masked_word = ['_'] * len(self.target_word)
        self.guessed_letters = set()
        self.incorrect_guesses = 0
        self.done = False
        return self.get_state()

    def guess(self, letter: str):
        """Apply one guess and return ``(state, reward, done)``.

        Rewards: 0 for a repeated guess or a guess after the game ended,
        +100 per newly revealed position, -100 for a wrong guess, 1000 when
        the word is completed and -100 when the game ends without completing
        it.

        Raises:
            ValueError: If the masked word and target word lengths differ.
        """
        if self.done or letter in self.guessed_letters:
            # Repeated or post-game guesses leave the game unchanged.
            return self.get_state(), 0, self.done

        self.guessed_letters.add(letter)

        if len(self.target_word) != len(self.masked_word):
            raise ValueError("Target word and masked word lengths do not match.")

        if letter in self.target_word:
            reward = self.reveal_letters(letter)
        else:
            reward = -100
            self.incorrect_guesses += 1

        word_completed = '_' not in self.masked_word
        if self.incorrect_guesses >= self.max_incorrect_guesses or word_completed:
            self.done = True
            # The terminal reward replaces the per-step reward computed above.
            reward = 1000 if word_completed else -100

        return self.get_state(), reward, self.done

    def reveal_letters(self, letter: str) -> int:
        """Unmask every hidden occurrence of ``letter``; return 100 per position revealed."""
        reward = 0
        for i, l in enumerate(self.target_word):
            if l == letter and self.masked_word[i] == '_':
                self.masked_word[i] = l
                reward += 100
        return reward

    def get_state(self) -> dict:
        """Return a snapshot of the current game.

        The ``guessed_letters`` entry is a copy, so a state returned earlier
        does not change when later guesses are made.
        """
        return {
            'masked_word': ' '.join(self.masked_word),
            'incorrect_guesses': self.incorrect_guesses,
            'guessed_letters': set(self.guessed_letters),
            'done': self.done
        }



In [None]:
import random
from copy import deepcopy

class VectorizedHangmanEnvironment:
    """Runs several independent deep copies of one base environment side by side."""

    def __init__(self, base_env, n_envs):
        """Create ``n_envs`` deep copies of ``base_env``."""
        clones = []
        for _ in range(n_envs):
            clones.append(deepcopy(base_env))
        self.envs = clones
        self.n_envs = n_envs

    def reset(self):
        """Reset every copy and return the list of their initial states."""
        initial_states = []
        for sub_env in self.envs:
            initial_states.append(sub_env.reset())
        return initial_states

    def step(self, actions):
        """Send ``actions[i]`` to copy ``i`` and return (next_states, rewards, dones) as lists."""
        outcomes = []
        for index in range(self.n_envs):
            outcomes.append(self.envs[index].guess(actions[index]))
        next_states, rewards, dones = zip(*outcomes)
        return list(next_states), list(rewards), list(dones)


In [None]:
class HangmanModel(nn.Module):
    """Bidirectional GRU that scores the 26 letters for an encoded Hangman state.

    Args:
        embedding_dim: Size of each character embedding and of the GRU input.
        guessed_size: Width of the guessed-letters feature block.
        hidden_dim: GRU hidden size per direction.
        num_layers: Number of stacked GRU layers.
        output_size: Number of output scores.
    """

    def __init__(self, embedding_dim=10, guessed_size=26, hidden_dim=128, num_layers=4, output_size=26):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # 27 symbols: the 26 letters plus the underscore placeholder.
        # This table is not applied inside forward(); the input to forward()
        # is expected to contain the embedded characters already.
        self.char_embeddings = nn.Embedding(num_embeddings=27, embedding_dim=embedding_dim)

        # Projects each position's [embedding | guessed block | 1 extra scalar] back to embedding_dim.
        per_position_features = embedding_dim + guessed_size + 1
        self.combine_fc = nn.Linear(per_position_features, embedding_dim)

        self.gru = nn.GRU(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
            dropout=0.5,
        )

        # Two directions are concatenated, hence 2 * hidden_dim input features.
        self.fc = nn.Linear(2 * hidden_dim, output_size)

        self.dropout = nn.Dropout(p=0.5)

    def forward(self, x):
        """Map a (batch, seq_len, embedding_dim + guessed_size + 1) tensor to (batch, output_size) scores."""
        projected = F.relu(self.combine_fc(x))
        sequence_features, _final_hidden = self.gru(projected)
        # NOTE(review): only the last time step is used; for the backward
        # direction that output has seen just the final position.
        last_step = sequence_features[:, -1, :]
        return self.fc(self.dropout(last_step))





In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Disabled earlier draft of HangmanModel, kept as a bare string literal so it is never executed.
# Visible differences from the active class: 2 GRU layers and no combine_fc projection.
'''
class HangmanModel(nn.Module):
    def __init__(self, embedding_dim=10):
        super(HangmanModel, self).__init__()
        self.embedding_dim = embedding_dim
        self.hidden_dim = 128
        self.num_layers = 2  # Increased from 1 to 2 for more depth

        # Embedding layer for 26 letters + underscore ('_')
        self.char_embeddings = nn.Embedding(27, embedding_dim)

        # Increase complexity by adding more layers to the GRU
        self.gru = nn.GRU(embedding_dim, self.hidden_dim, self.num_layers,
                          batch_first=True, bidirectional=True, dropout=0.5)  # Increased dropout for regularization

        # Linear layer takes input from both directions of GRU
        self.fc = nn.Linear(self.hidden_dim * 2, 26)  # *2 for bidirectional output

        # Increased dropout for additional regularization
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):

        # Pass embeddings through GRU
        gru_out, _ = self.gru(x)

        # Apply dropout to the output features
        gru_out = self.dropout(gru_out[:, -1, :])

        # Pass the output through the fully connected layer
        out = self.fc(gru_out)
        return out

'''

"\nclass HangmanModel(nn.Module):\n    def __init__(self, embedding_dim=10):\n        super(HangmanModel, self).__init__()\n        self.embedding_dim = embedding_dim\n        self.hidden_dim = 128\n        self.num_layers = 2  # Increased from 1 to 2 for more depth\n\n        # Embedding layer for 26 letters + underscore ('_')\n        self.char_embeddings = nn.Embedding(27, embedding_dim)\n\n        # Increase complexity by adding more layers to the GRU\n        self.gru = nn.GRU(embedding_dim, self.hidden_dim, self.num_layers,\n                          batch_first=True, bidirectional=True, dropout=0.5)  # Increased dropout for regularization\n\n        # Linear layer takes input from both directions of GRU\n        self.fc = nn.Linear(self.hidden_dim * 2, 26)  # *2 for bidirectional output\n\n        # Increased dropout for additional regularization\n        self.dropout = nn.Dropout(0.5)\n\n    def forward(self, x):\n\n        # Pass embeddings through GRU\n        gru_out, _ = se

In [None]:
# Use the GPU when PyTorch reports CUDA as available, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
def encode_state(masked_word, guessed_letters, remaining_incorrect_guesses, embedding,
                 max_incorrect_guesses=6.0):
    """Encode a Hangman state as one feature vector per word position.

    Every position gets the embedding of its character followed by the same
    26-dim guessed-letters indicator and the same normalised
    remaining-guesses scalar.

    Args:
        masked_word: Masked word using ``'_'`` for hidden letters; spaces are
            ignored (e.g. ``"_ a _"``).
        guessed_letters: Container of already guessed lowercase letters.
        remaining_incorrect_guesses: Wrong guesses still allowed.
        embedding: ``nn.Embedding`` with at least 27 rows (a-z plus ``'_'``).
            All tensors are created on the device of its weight.
        max_incorrect_guesses: Normaliser for the remaining-guesses feature;
            defaults to the previously hard-coded 6.

    Returns:
        Tensor of shape ``(1, len(word), embedding_dim + 26 + 1)``.

    Raises:
        ValueError: If the masked word contains a character other than
            ``a``-``z`` or ``'_'``.
    """
    # Follow the embedding's device instead of relying on a notebook-level global.
    target_device = embedding.weight.device

    # Masked word encoding: index 26 is reserved for the underscore.
    char_indices = []
    for symbol in masked_word.replace(" ", ""):
        if symbol == '_':
            char_indices.append(26)
        elif 'a' <= symbol <= 'z':
            char_indices.append(ord(symbol) - ord('a'))
        else:
            # Anything else would index outside the 27-row embedding table.
            raise ValueError(f"Unsupported character {symbol!r} in masked word {masked_word!r}")
    char_indices_tensor = torch.tensor(char_indices, dtype=torch.long, device=target_device)
    masked_word_embedding = embedding(char_indices_tensor).unsqueeze(0)  # (1, seq_len, embedding_dim)
    seq_len = masked_word_embedding.size(1)

    # Guessed letters encoding: one 0/1 flag per letter, shaped (1, 1, 26).
    guessed_vec = [1.0 if chr(i + ord('a')) in guessed_letters else 0.0 for i in range(26)]
    guessed_tensor = torch.tensor(guessed_vec, dtype=torch.float, device=target_device).view(1, 1, -1)

    # Remaining incorrect guesses, normalised, shaped (1, 1, 1).
    remaining_guesses_tensor = torch.tensor(
        [remaining_incorrect_guesses / max_incorrect_guesses],
        dtype=torch.float, device=target_device,
    ).view(1, 1, -1)

    # Broadcast the two global features along the sequence and concatenate per position.
    return torch.cat(
        (
            masked_word_embedding,
            guessed_tensor.expand(-1, seq_len, -1),
            remaining_guesses_tensor.expand(-1, seq_len, -1),
        ),
        2,
    )




In [None]:
# Disabled earlier draft of encode_state, kept as a bare string literal so it is never executed.
# It embeds only the masked word (no guessed-letter or remaining-guess features).
'''
def encode_state(masked_word, embedding):

    char_indices = [26 if c == '_' else ord(c) - ord('a') for c in masked_word.replace(" ", "")]
    char_indices_tensor = torch.tensor(char_indices, dtype=torch.long).to(device)



    # Obtain embeddings for the masked word
    masked_word_embedding = embedding(char_indices_tensor).unsqueeze(0)  # Add batch dimension

    return masked_word_embedding
'''


'\ndef encode_state(masked_word, embedding):\n\n    char_indices = [26 if c == \'_\' else ord(c) - ord(\'a\') for c in masked_word.replace(" ", "")]\n    char_indices_tensor = torch.tensor(char_indices, dtype=torch.long).to(device)\n\n\n\n    # Obtain embeddings for the masked word\n    masked_word_embedding = embedding(char_indices_tensor).unsqueeze(0)  # Add batch dimension\n\n    return masked_word_embedding\n'

In [None]:
def select_action(state, model, epsilon):
    """Pick an action index with an epsilon-greedy rule.

    A uniform random draw decides the branch: when it is not greater than
    ``epsilon`` a random index in ``[0, 26)`` is returned, otherwise the index
    of the largest value in ``model(state)`` along dimension 1.
    """
    explore = not (random.random() > epsilon)
    if explore:
        return random.randrange(26)

    # Greedy branch: no gradients are needed to pick the best-scoring action.
    with torch.no_grad():
        scores = model(state)
        return scores.max(1).indices.item()



In [None]:
from collections import namedtuple, deque

# One replay-buffer record: a single environment step.
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward', 'done'))

class ReplayMemory:
    """Fixed-capacity buffer of Transition records; the oldest entries are dropped first."""

    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def push(self, *fields):
        """Build a Transition from the given fields and store it."""
        record = Transition(*fields)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return ``batch_size`` stored transitions drawn at random without replacement."""
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)


In [None]:
def train_model(policy_net, target_net, memory, optimizer, batch_size, gamma, desired_state_length):
    """Run one Q-learning update on replayed transitions of a single sequence length.

    A batch is sampled from ``memory`` and restricted to transitions whose
    state has ``desired_state_length`` positions (states of different lengths
    cannot be concatenated). The update uses whatever part of the sample
    matches; it is skipped only when nothing matches.

    Args:
        policy_net: Network being trained; its parameters' device is used
            for all batch tensors.
        target_net: Network used to compute bootstrap targets.
        memory: Buffer supporting ``len()`` and ``sample(batch_size)``; items
            expose ``state``, ``action``, ``next_state`` and ``reward``.
        optimizer: Optimizer over ``policy_net`` parameters.
        batch_size: Number of transitions to sample.
        gamma: Discount factor.
        desired_state_length: Sequence length (dimension 1 of ``state``) to keep.

    Returns:
        The detached scalar loss tensor, or ``None`` when no update was made.
    """
    if len(memory) < batch_size:
        return None  # Not enough samples to train

    sampled = memory.sample(batch_size)

    # Previously the update ran only if *every* sampled transition matched the
    # length, which almost never happens once word lengths are mixed.
    usable = [t for t in sampled if t.state.shape[1] == desired_state_length]
    if not usable:
        return None

    net_device = next(policy_net.parameters()).device

    state_batch = torch.cat([t.state for t in usable]).to(net_device)
    action_batch = torch.cat([t.action for t in usable]).to(net_device)
    reward_batch = torch.cat([t.reward for t in usable]).to(net_device)

    # A transition is terminal when its next_state is None; its bootstrap value stays 0.
    non_final_next_states = [t.next_state for t in usable if t.next_state is not None]
    non_final_mask = torch.tensor(
        [t.next_state is not None for t in usable], dtype=torch.bool, device=net_device
    )

    next_state_values = torch.zeros(len(usable), device=net_device)
    if non_final_next_states:
        # torch.cat raises on an empty list, so all-terminal batches skip this branch.
        with torch.no_grad():
            next_q_values = target_net(torch.cat(non_final_next_states).to(net_device))
        next_state_values[non_final_mask] = next_q_values.max(1)[0]

    expected_state_action_values = (next_state_values * gamma) + reward_batch
    state_action_values = policy_net(state_batch).gather(1, action_batch)
    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))

    optimizer.zero_grad()
    # retain_graph=True is kept from the original: replayed state tensors may
    # still carry an autograd graph from when they were encoded, and the same
    # transition can be sampled again in a later update.
    loss.backward(retain_graph=True)
    optimizer.step()

    # Detach so callers that log the loss do not keep the graph alive.
    return loss.detach()



In [None]:
def update_target_network(policy_net, target_net):
    """Overwrite the target network's state dict with the policy network's current one."""
    policy_weights = policy_net.state_dict()
    target_net.load_state_dict(policy_weights)


In [None]:
def create_curriculum_datasets(all_words, short_max_len=2, medium_max_len=6):
    """Split words into three curriculum levels by length.

    Args:
        all_words: Iterable of words; it is traversed once.
        short_max_len: Longest word length assigned to level 1.
        medium_max_len: Longest word length assigned to level 2.

    Returns:
        ``(level_1, level_2, level_3)`` lists, each preserving input order:
        lengths ``1..short_max_len``, ``short_max_len+1..medium_max_len`` and
        longer. Empty strings are dropped because they cannot be played.
    """
    level_1, level_2, level_3 = [], [], []
    for word in all_words:
        length = len(word)
        if length == 0:
            # e.g. produced by stripping a blank line of a word file
            continue
        if length <= short_max_len:
            level_1.append(word)
        elif length <= medium_max_len:
            level_2.append(word)
        else:
            level_3.append(word)
    return level_1, level_2, level_3


In [None]:
import torch
import torch.optim as optim
import numpy as np
import torch.optim.lr_scheduler as lr_scheduler

# Policy network (trained) and target network (a frozen copy kept in eval mode).
policy_net = HangmanModel().to(device)
target_net = HangmanModel().to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()


optimizer = optim.Adam(policy_net.parameters(), lr=0.001)
memory = ReplayMemory(30000)  # Adjust size as needed

# NOTE(review): scheduler.step() is not called in any visible cell, so this
# schedule currently has no effect on the learning rate.
scheduler = lr_scheduler.StepLR(optimizer, step_size=100, gamma=0.1)


# Load the word list, lower-cased, one word per line.
words_file_path = "/content/drive/MyDrive/words_250000_train.txt"
with open(words_file_path, 'r') as file:
    # Skip blank lines: an empty string is not a playable target word.
    words = [word for word in (line.strip().lower() for line in file) if word]

# Curriculum levels based on word length
curriculum_levels = create_curriculum_datasets(words)
# NOTE(review): level_thresholds is not read anywhere in the visible cells.
level_thresholds = [0.6, 0.5, 0.5]

# Initialize the Hangman environment on the first (shortest-word) level only.
env = HangmanEnvironment(curriculum_levels[0])



In [None]:
def evaluate_win_rate(env, model, words, num_episodes=100, use_validation=False):
    """Play ``num_episodes`` games with ``model`` and return the fraction won.

    The model is switched to eval mode (dropout off) and run without autograd
    for the duration of the evaluation; its previous train/eval mode is
    restored afterwards.

    Args:
        env: Hangman environment providing ``reset``, ``guess`` and
            ``max_incorrect_guesses``.
        model: Network being evaluated; its own ``char_embeddings`` table is
            used to encode states.
        words: Candidate target words; one is drawn at random per episode.
        num_episodes: Number of games to play.
        use_validation: Forwarded to ``env.reset``.

    Returns:
        ``wins / num_episodes`` as a float.
    """
    was_training = model.training
    model.eval()
    win_count = 0
    try:
        with torch.no_grad():
            for _ in range(num_episodes):
                word = random.choice(words)
                state_dict = env.reset(use_validation=use_validation, new_game=True, target_word=word)

                while not state_dict['done']:
                    state_encoding = encode_state(
                        state_dict['masked_word'],
                        state_dict['guessed_letters'],
                        env.max_incorrect_guesses - state_dict['incorrect_guesses'],
                        model.char_embeddings  # use the evaluated model, not a global network
                    )
                    state_encoding = state_encoding.to(device)

                    action = select_action(state_encoding, model, epsilon=0.01)  # Use a very low epsilon for evaluation
                    letter = chr(action + ord('a'))

                    guessed = state_dict['guessed_letters']
                    if letter in guessed:
                        # A repeated guess is a no-op in the environment, and with
                        # dropout disabled the model would keep proposing it. Fall
                        # back to the best-scoring letter not tried yet.
                        untried = [i for i in range(26) if chr(i + ord('a')) not in guessed]
                        if not untried:
                            break  # nothing left to try; count the game as not won
                        q_values = model(state_encoding)[0]
                        action = max(untried, key=lambda i: q_values[i].item())
                        letter = chr(action + ord('a'))

                    state_dict, _, _ = env.guess(letter)

                if '_' not in state_dict['masked_word']:  # Check if the word was completely guessed
                    win_count += 1
    finally:
        model.train(was_training)

    return win_count / num_episodes


In [None]:
# Disabled earlier draft of evaluate_win_rate, kept as a bare string literal so it is never executed.
# It calls encode_state with two arguments (masked word and embedding only).
'''
def evaluate_win_rate(env, model, words, num_episodes=100, use_validation=False):
    win_count = 0
    for _ in range(num_episodes):
        word = random.choice(words)
        env.reset(use_validation=use_validation, new_game=True, target_word=word)

        state = env.get_state()
        state_encoding = encode_state(state['masked_word'], policy_net.char_embeddings)
        state_encoding = state_encoding.to(device)

        while not state['done']:
            action = select_action(state_encoding, model, epsilon=0.01)  # Use a very low epsilon for evaluation
            letter = chr(action + ord('a'))
            state, _, done = env.guess(letter)
            state_encoding = encode_state(state['masked_word'], policy_net.char_embeddings)

        if '_' not in state['masked_word']:  # Check if the word was completely guessed
            win_count += 1

    return win_count / num_episodes
'''

"\ndef evaluate_win_rate(env, model, words, num_episodes=100, use_validation=False):\n    win_count = 0\n    for _ in range(num_episodes):\n        word = random.choice(words)\n        env.reset(use_validation=use_validation, new_game=True, target_word=word)\n\n        state = env.get_state()\n        state_encoding = encode_state(state['masked_word'], policy_net.char_embeddings)\n        state_encoding = state_encoding.to(device)\n\n        while not state['done']:\n            action = select_action(state_encoding, model, epsilon=0.01)  # Use a very low epsilon for evaluation\n            letter = chr(action + ord('a'))\n            state, _, done = env.guess(letter)\n            state_encoding = encode_state(state['masked_word'], policy_net.char_embeddings)\n\n        if '_' not in state['masked_word']:  # Check if the word was completely guessed\n            win_count += 1\n\n    return win_count / num_episodes\n"

In [None]:

# Training hyperparameters read by the training loop.
num_episodes = 25_000      # iterations of the outer training loop
epsilon_start = 1.0        # exploration rate at episode 0
epsilon_end = 0.01         # exploration rate the schedule interpolates towards
epsilon_decay = 1_000      # episodes used as the denominator of the epsilon schedule
batch_size = 128           # transitions sampled per training step
gamma = 0.8                # discount factor passed to train_model
TARGET_UPDATE = 10         # episodes between target-network syncs
evaluation_interval = 200  # episodes between validation runs


In [None]:
def save_model(model, filename="/content/drive/MyDrive/agent_hangman_model_test1.pth"):
    """Write the model's state dict to ``filename`` with ``torch.save``."""
    weights = model.state_dict()
    torch.save(weights, filename)

# Histories plotted after training; both start empty whenever this cell runs.
training_losses, win_rates = [], []

In [None]:
import matplotlib.pyplot as plt
from tqdm import tqdm

# Make sure to import tqdm at the beginning of your script

# Main training loop: one episode = one target word, replayed until the agent wins it.
for episode in tqdm(range(num_episodes), desc="Training Progress"):
    # Linear epsilon schedule, clamped so epsilon stops at epsilon_end instead
    # of continuing past it (and below zero) after epsilon_decay episodes.
    decay_progress = min(1.0, episode / epsilon_decay)
    epsilon = epsilon_start + (epsilon_end - epsilon_start) * decay_progress
    state = env.reset(new_game=True)  # Start with a new word
    done = False
    win = False
    total_reward = 0
    train_num = 0

    while not done:
        # encode_state expects the *remaining* wrong guesses, matching evaluate_win_rate.
        state_encoding = encode_state(state['masked_word'], state['guessed_letters'],
                                      env.max_incorrect_guesses - state['incorrect_guesses'],
                                      policy_net.char_embeddings)
        action = select_action(state_encoding, policy_net, epsilon)
        next_state, reward, done = env.guess(chr(action + ord('a')))
        total_reward += reward
        # Terminal transitions store None as their next state.
        next_state_encoding = None if done else encode_state(
            next_state['masked_word'], next_state['guessed_letters'],
            env.max_incorrect_guesses - next_state['incorrect_guesses'],
            policy_net.char_embeddings)

        memory.push(state_encoding, torch.tensor([[action]], dtype=torch.long), next_state_encoding,
                    torch.tensor([reward], dtype=torch.float), done)

        train_loss = train_model(policy_net, target_net, memory, optimizer, batch_size, gamma,state_encoding.shape[1])
        # train_model returns None when it skips the update.
        if train_loss is not None:
            training_losses.append(train_loss.item())
        train_num += 1

        if not done:
            state = next_state
        elif reward > 0:
            win = True  # Agent won the game
        else:
            win = False  # Agent lost the game
            # Replay the same word, continuing from the freshly reset state
            # rather than the stale pre-loss one.
            state = env.reset(new_game=False)
            done = False  # Continue training on the same word

    if episode % TARGET_UPDATE == 0:
        update_target_network(policy_net, target_net)

    if episode % evaluation_interval == 0 and episode > 0:
        val_win_rate = evaluate_win_rate(env, policy_net, env.val_words, num_episodes=200, use_validation=True)
        win_rates.append(val_win_rate)

        print(f"Episode: {episode}, Validation Win Rate: {val_win_rate:.2f}")

        if val_win_rate >= 0.25:
            print("Saving model based on validation performance.")
            save_model(policy_net)



Training Progress:   1%|          | 201/25000 [02:58<196:14:06, 28.49s/it]

Episode: 200, Validation Win Rate: 0.14


Training Progress:   2%|▏         | 401/25000 [07:16<249:25:52, 36.50s/it]

Episode: 400, Validation Win Rate: 0.10


Training Progress:   2%|▏         | 601/25000 [13:58<219:45:27, 32.42s/it]

Episode: 600, Validation Win Rate: 0.06


Training Progress:   3%|▎         | 801/25000 [21:52<269:11:32, 40.05s/it]

Episode: 800, Validation Win Rate: 0.09


Training Progress:   4%|▍         | 966/25000 [43:30<18:02:22,  2.70s/it] 


KeyboardInterrupt: 

In [None]:

# Loss values recorded in training_losses, in the order they were appended.
plt.figure(figsize=(12, 5))
plt.plot(training_losses)
plt.title("Training loss")
plt.xlabel("Recorded training update")
plt.ylabel("Loss")
plt.show()

In [None]:

# win_rates holds a single series (the validation win rate appended at each
# evaluation), so the legend names only that series.
plt.figure(figsize=(12, 5))
plt.plot(win_rates)
plt.title("Validation win rate")
plt.xlabel("Evaluation round")
plt.ylabel("Win rate")
plt.legend(["Validation"])
plt.show()

In [None]:
!nvidia-smi
