In [1]:
!pip install tensorflow hmmlearn numpy
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.optimizers import Adam
from hmmlearn import hmm
import random
import string
import re
from collections import defaultdict, deque
import os
import time

# Suppress TensorFlow warnings for cleaner output
# NOTE(review): this assignment runs after `import tensorflow` at the top of
# the cell; the native log level is presumably read when TensorFlow is first
# loaded, so setting it here may come too late to silence the C++ messages.
# Consider moving it above the import -- TODO confirm.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
# Raise the threshold of TensorFlow's Python logger to ERROR.
tf.get_logger().setLevel('ERROR')

# ---
# 1. Part 0: Corpus Loading and Preprocessing
# ---
def load_corpus(filename="corpus.txt"):
    """
    Load the corpus and return its unique lowercase words in sorted order.

    The file is read as a single text block and split on whitespace. Only
    tokens made up entirely of ASCII letters are kept: the rest of the
    pipeline maps letters to indices 0-25 with ``ord(char) - ord('a')``, so
    accented or other non-ASCII letters (which ``str.isalpha`` alone accepts)
    would index outside the 26-letter alphabet.

    Args:
        filename: Path of the corpus text file.

    Returns:
        A sorted list of unique lowercase words, or an empty list when the
        file does not exist.
    """
    print(f"Loading corpus from {filename}...")
    try:
        # Explicit encoding so the result does not depend on the platform's
        # locale default. Only the read can raise FileNotFoundError, so the
        # try body is kept to just that.
        with open(filename, 'r', encoding='utf-8') as f:
            full_content = f.read()
    except FileNotFoundError:
        print(f"Error: {filename} not found.")
        print("Please ensure 'corpus.txt' is in the same directory.")
        return []

    # str.split() with no argument splits on any run of whitespace and never
    # yields empty tokens; the set comprehension removes duplicates.
    valid_words = sorted({
        token.lower()
        for token in full_content.split()
        if token.isascii() and token.isalpha()
    })

    print(f"Loaded {len(valid_words)} unique, valid words.")
    return valid_words

def group_words_by_length(words):
    """
    Bucket words by character count.

    Returns a ``defaultdict(list)`` mapping each length to the words of that
    length, in the order they were given. One model is later trained per
    bucket.
    """
    buckets = defaultdict(list)
    for entry in words:
        size = len(entry)
        buckets[size].append(entry)
    bucket_count = len(buckets)
    print(f"Grouped words into {bucket_count} length-based buckets.")
    return buckets

def word_to_sequence(word):
    """Encode a word as an (n, 1) numpy column of letter indices (a=0 ... z=25)."""
    base = ord('a')
    codes = [ord(letter) - base for letter in word]
    # Adding a trailing axis turns the flat vector into a single column.
    return np.array(codes)[:, np.newaxis]

def sequence_to_word(seq):
    """Decode an array of letter indices (a=0 ... z=25) back into a string."""
    base = ord('a')
    letters = []
    for code in seq.flatten():
        letters.append(chr(base + code))
    return "".join(letters)

# ---
# 2. Part 1: The Hidden Markov Model (HMM) Oracle
# ---
class HMMOracle:
    """
    Implements the hybrid probabilistic oracle.
    It uses a word-list filter as the primary source and a
    positional HMM as a fallback.
    """
    def __init__(self, words_by_length):
        self.words_by_length = words_by_length
        self.max_len = max(words_by_length.keys())
        self.hmms = {} # Stores HMMs by length
        self.pattern_cache = {}

    def _get_pattern(self, masked_word, guessed_letters):
        """Creates a regex pattern to filter the word list."""
        key = (masked_word, tuple(sorted(guessed_letters)))
        if key in self.pattern_cache:
            return self.pattern_cache[key]

        pattern = list(masked_word)
        wrong_guesses = guessed_letters - set(masked_word)

        regex_parts = []
        for char in pattern:
            if char == '_':
                negation_set = "".join(sorted(wrong_guesses))
                # Fix: Handle empty negation set
                if negation_set:
                    regex_parts.append(f"[^{negation_set}]")
                else:
                    regex_parts.append(".") # Any character if no wrong guesses
            else:
                regex_parts.append(char)

        regex_str = f"^{''.join(regex_parts)}$"
        self.pattern_cache[key] = re.compile(regex_str)
        return self.pattern_cache[key]

    def train(self):
        """
        Trains one HMM for each word length.
        State = Position in word (Hidden State)
        Emission = Letter (Emission)
        """
        print("Training HMMs (positional frequency models)...")
        for length, word_list in self.words_by_length.items():
            if not word_list:
                continue

            model = hmm.CategoricalHMM(
                n_components=length, n_features=26,
                init_params="", params=""
            )
            model.startprob_ = np.array([1.0] + [0.0] * (length - 1))
            transmat = np.zeros((length, length))
            for i in range(length - 1):
                transmat[i, i + 1] = 1.0
            transmat[length - 1, length - 1] = 1.0
            model.transmat_ = transmat

            emission_prob = np.ones((length, 26))
            for word in word_list:
                for i, char in enumerate(word):
                    emission_prob[i, ord(char) - ord('a')] += 1

            emission_prob /= np.sum(emission_prob, axis=1, keepdims=True)
            model.emissionprob_ = emission_prob
            self.hmms[length] = model
        print("HMM training complete.")

    def get_probabilities(self, masked_word, guessed_letters):
        """Calculates the probability distribution over the alphabet."""
        length = len(masked_word)
        if length not in self.words_by_length:
            return np.zeros(26)

        pattern = self._get_pattern(masked_word, guessed_letters)
        relevant_words = [
            word for word in self.words_by_length[length] if pattern.match(word)
        ]

        probs = np.zeros(26)
        available_letters = set(string.ascii_lowercase) - guessed_letters

        if relevant_words:
            for word in relevant_words:
                for i, char in enumerate(word):
                    if masked_word[i] == '_':
                        probs[ord(char) - ord('a')] += 1

            if np.sum(probs) > 0:
                probs /= np.sum(probs)

            for i in range(26):
                if chr(ord('a') + i) not in available_letters:
                    probs[i] = 0
            return probs

        model = self.hmms.get(length)
        if model is None:
            return np.zeros(26)

        probs = np.zeros(26)
        for i, char in enumerate(masked_word):
            if char == '_':
                probs += model.emissionprob_[i, :]

        for i in range(26):
            if chr(ord('a') + i) not in available_letters:
                probs[i] = 0

        total_prob = np.sum(probs)
        if total_prob > 0:
            return probs / total_prob
        else:
            return np.zeros(26)

# ---
# 3. Part 2: The Reinforcement Learning (RL) Environment (STABLE REWARDS)
# ---
class HangmanEnv:
    """
    Hangman game environment for the RL agent.

    Each episode picks a random word from ``all_words``. The observation is
    a 54-float vector: 26 oracle letter probabilities, 26 already-guessed
    flags, the fraction of lives left and the fraction of blanks left.
    """
    def __init__(self, word_list, oracle, max_lives=6):
        """
        Args:
            word_list: Words to sample the hidden word from.
            oracle: Object exposing ``get_probabilities(masked_word,
                guessed_letters)`` returning 26 letter probabilities.
            max_lives: Number of wrong guesses allowed per game.
        """
        self.all_words = word_list
        self.oracle = oracle
        self.max_lives = max_lives
        self._reset_game_stats()

    def _reset_game_stats(self):
        """Pick a new hidden word and reset all per-game counters."""
        self.word = random.choice(self.all_words)
        self.word_len = len(self.word)
        self.masked_word = "_" * self.word_len
        self.lives_left = self.max_lives
        self.guessed_letters = set()
        self.game_wrong_guesses = 0
        self.game_repeated_guesses = 0
        self.game_won = False

    def _get_state(self):
        """Assemble the float32 observation vector for the current game state."""
        hmm_probs = self.oracle.get_probabilities(
            self.masked_word, self.guessed_letters
        )
        guessed_vec = np.array([
            1.0 if c in self.guessed_letters else 0.0
            for c in string.ascii_lowercase
        ])
        lives_norm = self.lives_left / self.max_lives
        blanks_norm = self.masked_word.count('_') / self.word_len
        state = np.concatenate([
            hmm_probs, guessed_vec, [lives_norm], [blanks_norm]
        ])
        return state.astype(np.float32)

    def reset(self):
        """Start a new game and return its initial state."""
        self._reset_game_stats()
        return self._get_state()

    def step(self, action):
        """
        Guess the letter with index ``action`` (0 = 'a' ... 25 = 'z').

        Rewards: -2 for repeating a letter (no life lost), +10 for a correct
        letter, -5 for a wrong one, +100 when the word is completed and -100
        when the last life is lost (the terminal reward replaces the
        per-guess one).

        Returns:
            ``(state, reward, done, info)`` where ``info`` has the keys
            ``repeated``, ``wrong``, ``won``, ``word`` and ``masked``;
            ``masked`` is the mask after this guess has been applied.
        """
        letter = chr(ord('a') + action)
        done = False
        info = {"repeated": 0, "wrong": 0, "won": False, "word": self.word, "masked": self.masked_word}

        if letter in self.guessed_letters:
            # A repeat changes nothing in the game, so the mask already
            # stored in info is current.
            self.game_repeated_guesses += 1
            info["repeated"] = 1
            reward = -2
            return self._get_state(), reward, done, info

        self.guessed_letters.add(letter)

        if letter in self.word:
            # Reveal every occurrence of the guessed letter.
            self.masked_word = "".join(
                letter if hidden == letter else shown
                for hidden, shown in zip(self.word, self.masked_word)
            )
            reward = 10  # More aggressive positive reward
            info["wrong"] = 0
        else:
            self.lives_left -= 1
            info["wrong"] = 1
            self.game_wrong_guesses += 1
            reward = -5  # Penalty from formula

        if "_" not in self.masked_word:
            done = True
            self.game_won = True
            info["won"] = True
            reward = 100  # Large, stable win bonus
        elif self.lives_left <= 0:
            # "<=" rather than "==" so an episode stepped past its end still
            # reports done instead of running with negative lives.
            done = True
            self.game_won = False
            info["won"] = False
            reward = -100  # Large, stable loss penalty

        # Report the mask as it stands after this guess; the value stored
        # when info was created predates the update above.
        info["masked"] = self.masked_word
        return self._get_state(), reward, done, info

# ---
# 4. Part 2: The Reinforcement Learning (RL) Agent (TUNED HYPERPARAMETERS)
# ---
class DQNAgent:
    """
    Deep Q-Network (DQN) agent with a replay buffer and a target network.

    Action masking relies on the state layout this class already assumed in
    ``act``: entries 26-51 of the state vector are the "already guessed"
    flags for letters a-z.
    """
    # Slice of the state vector holding the 26 already-guessed flags.
    _GUESSED_SLICE = slice(26, 52)

    def __init__(self, state_size, action_size, learning_rate=0.0001, gamma=0.99):
        """
        Args:
            state_size: Length of the state vector fed to the network.
            action_size: Number of actions (one per letter).
            learning_rate: Adam learning rate.
            gamma: Discount factor applied to the bootstrapped next-state value.
        """
        self.state_size = state_size
        self.action_size = action_size  # Back to 26
        self.memory = deque(maxlen=100000)
        self.gamma = gamma    # Discount factor (high for long-term planning)
        self.learning_rate = learning_rate  # Lowered for stability
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.9992  # Fast decay
        self.model = self._build_model()
        self.target_model = self._build_model()
        self.update_target_model()

    def _build_model(self):
        """Build the 128-128-64 ReLU network that approximates the Q-function."""
        model = Sequential([
            Input(shape=(self.state_size,)),
            Dense(128, activation='relu'),
            Dense(128, activation='relu'),
            Dense(64, activation='relu'),
            Dense(self.action_size, activation='linear')  # Output 26 actions
        ])
        model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def update_target_model(self):
        """Copy the main network's weights into the target network."""
        self.target_model.set_weights(self.model.get_weights())

    def remember(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` transition to the replay buffer."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """
        Choose a letter index with an epsilon-greedy policy over unguessed letters.

        Args:
            state: Array of shape ``(1, state_size)``.

        Returns:
            A plain ``int`` action. Returns 0 when every letter is already
            guessed.
        """
        guessed_mask = state[0][self._GUESSED_SLICE].astype(bool)
        available = np.flatnonzero(~guessed_mask)
        if available.size == 0:
            return 0

        if np.random.rand() <= self.epsilon:
            # Sample from a list of Python ints so the stdlib RNG stream and
            # the returned type match the exploit branch below.
            return int(random.choice(available.tolist()))

        act_values = np.asarray(self.model.predict(state, verbose=0)[0], dtype=float)
        # Already-guessed letters get -inf so argmax can never select them.
        selectable = np.isin(np.arange(self.action_size), available)
        masked_act_values = np.where(selectable, act_values, -np.inf)
        return int(np.argmax(masked_act_values))

    def replay(self, batch_size):
        """
        Fit the main network on one random minibatch from the replay buffer.

        The target for the action taken is ``reward`` on terminal
        transitions and ``reward + gamma * max_a Q_target(next_state, a)``
        otherwise, where the max only ranges over letters not yet guessed in
        ``next_state`` -- the same restriction ``act`` applies when choosing
        actions, so the bootstrap never credits moves the agent cannot make.
        Does nothing until the buffer holds at least ``batch_size`` items.
        Epsilon is decayed once per call, never below ``epsilon_min``.
        """
        if len(self.memory) < batch_size:
            return
        minibatch = random.sample(self.memory, batch_size)
        states = np.vstack([t[0] for t in minibatch])
        actions = np.array([t[1] for t in minibatch], dtype=np.intp)
        rewards = np.array([t[2] for t in minibatch], dtype=np.float32)
        next_states = np.vstack([t[3] for t in minibatch])
        dones = np.array([t[4] for t in minibatch], dtype=bool)

        # Start from the current predictions so only the taken action's
        # output contributes to the loss.
        targets = np.array(self.model.predict(states, verbose=0), copy=True)
        next_q = np.array(
            self.target_model.predict(next_states, verbose=0),
            dtype=np.float32, copy=True,
        )

        already_guessed = next_states[:, self._GUESSED_SLICE].astype(bool)
        if already_guessed.shape == next_q.shape:
            next_q[already_guessed] = -np.inf
        best_next = next_q.max(axis=1)
        # A row with no legal action left has nothing to bootstrap from.
        best_next[~np.isfinite(best_next)] = 0.0

        bootstrap = np.where(dones, 0.0, self.gamma * best_next)
        targets[np.arange(len(minibatch)), actions] = rewards + bootstrap

        self.model.fit(states, targets, epochs=1, verbose=0)

        if self.epsilon > self.epsilon_min:
            # Clamp so the final decay step cannot undershoot the floor.
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

Collecting hmmlearn
  Downloading hmmlearn-0.3.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.0 kB)
Downloading hmmlearn-0.3.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (165 kB)
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m166.0/166.0 kB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: hmmlearn
Successfully installed hmmlearn-0.3.3


In [None]:
# ---
# 5. Global Setup and Training Loop
# ---

# --- Data: read the corpus and bucket its words by length ---
print("--- Loading and Preparing Data ---")
all_words = load_corpus("corpus.txt")
words_by_length = group_words_by_length(all_words)

# --- Oracle: one positional model per word length ---
print("\n--- Building HMM Oracle ---")
oracle = HMMOracle(words_by_length)
oracle.train()

# --- Environment and agent live in the global scope so that later cells
# --- (evaluation) can reuse them.
print("\n--- Initializing Agent and Environment ---")
env = HangmanEnv(all_words, oracle, max_lives=6)
STATE_SIZE = 54   # 26 oracle probabilities + 26 guessed flags + lives + blanks
ACTION_SIZE = 26  # one action per letter
agent = DQNAgent(STATE_SIZE, ACTION_SIZE)

# --- Training hyperparameters ---
EPISODES = 6500           # just long enough for epsilon to decay
BATCH_SIZE = 64
UPDATE_TARGET_FREQ = 500  # slower target update for stability

print(f"\n--- Starting RL Agent Training ({EPISODES} episodes) ---")
start_time = time.time()
episode_rewards = []
episode_wins = []  # 1 per won episode, 0 per lost one

for e in range(1, EPISODES + 1):
    state = np.reshape(env.reset(), [1, STATE_SIZE])
    done = False
    total_episode_reward = 0

    # Play one full game, storing every transition in the replay buffer.
    while not done:
        action = agent.act(state)
        next_state, reward, done, info = env.step(action)
        next_state = np.reshape(next_state, [1, STATE_SIZE])

        agent.remember(state, action, reward, next_state, done)

        total_episode_reward += reward
        state = next_state

    # The loop only exits on a terminal step, so `info` describes the
    # final move of this episode.
    episode_rewards.append(total_episode_reward)
    episode_wins.append(1 if info["won"] else 0)

    # One minibatch update per episode.
    agent.replay(BATCH_SIZE)

    target_sync_due = (e % UPDATE_TARGET_FREQ == 0)
    if target_sync_due:
        agent.update_target_model()
        print(f"--- Target network updated at episode {e} ---")

    report_due = (e % 100 == 0)
    if report_due:
        avg_reward = np.mean(episode_rewards[-100:])
        win_rate = np.mean(episode_wins[-100:])
        print(f"Episode: {e}/{EPISODES} | "
              f"Avg Reward (last 100): {avg_reward:.2f} | "
              f"Win Rate (last 100): {win_rate*100:.1f}% | "
              f"Epsilon: {agent.epsilon:.3f}")

training_time = time.time() - start_time
print(f"--- Training Finished in {training_time:.2f}s ---")
print("Agent is trained and ready for evaluation in the next cell.")

--- Loading and Preparing Data ---
Loading corpus from corpus.txt...
Loaded 49399 unique, valid words.
Grouped words into 24 length-based buckets.

--- Building HMM Oracle ---
Training HMMs (positional frequency models)...
HMM training complete.

--- Initializing Agent and Environment ---

--- Starting RL Agent Training (6500 episodes) ---
Episode: 100/6500 | Avg Reward (last 100): -99.60 | Win Rate (last 100): 0.0% | Epsilon: 0.928
Episode: 200/6500 | Avg Reward (last 100): -98.10 | Win Rate (last 100): 0.0% | Epsilon: 0.857
Episode: 300/6500 | Avg Reward (last 100): -92.15 | Win Rate (last 100): 2.0% | Epsilon: 0.791
Episode: 400/6500 | Avg Reward (last 100): -88.90 | Win Rate (last 100): 0.0% | Epsilon: 0.730
--- Target network updated at episode 500 ---
Episode: 500/6500 | Avg Reward (last 100): -90.50 | Win Rate (last 100): 1.0% | Epsilon: 0.674
Episode: 600/6500 | Avg Reward (last 100): -76.85 | Win Rate (last 100): 5.0% | Epsilon: 0.622
Episode: 700/6500 | Avg Reward (last 100):

In [None]:
# ---
# 6. Evaluation Loop
# ---

EVAL_GAMES = 2000
print(f"\n--- Starting Evaluation ({EVAL_GAMES} games) ---")

# Exploitation only: with epsilon at 0 the agent always takes its greedy action.
agent.epsilon = 0.0

total_wins = 0
total_wrong_guesses = 0
total_repeated_guesses = 0

# Swap the environment's word pool for the test set when it is available;
# otherwise the games below are drawn from the training corpus.
try:
    with open("test.txt", 'r') as f:
        test_content = f.read()
    test_words = [
        token.lower()
        for token in re.split(r'\s+', test_content)
        if token.isalpha()
    ]
    env.all_words = test_words
    print(f"Loaded {len(test_words)} words from test.txt for evaluation.")
except FileNotFoundError:
    print("Warning: test.txt not found. Evaluating on the training corpus.")


for g in range(1, EVAL_GAMES + 1):
    state = np.reshape(env.reset(), [1, STATE_SIZE])

    done = False
    game_wrong = 0
    game_repeated = 0

    while not done:
        # Action masking happens inside the agent.
        action = agent.act(state)

        next_state, reward, done, info = env.step(action)
        state = np.reshape(next_state, [1, STATE_SIZE])

        game_wrong += info["wrong"]
        game_repeated += info["repeated"]

    # The loop exits only on a terminal step, so `info` is the final move.
    if info["won"]:
        total_wins += 1
    total_wrong_guesses += game_wrong
    total_repeated_guesses += game_repeated

    if g % 200 == 0:
        print(f"Played game {g}/{EVAL_GAMES}...")

# --- 7. Final Results ---
print("\n--- üèÅ Final Evaluation Results ---")

success_rate = total_wins / EVAL_GAMES
avg_wrong = total_wrong_guesses / EVAL_GAMES
avg_repeated = total_repeated_guesses / EVAL_GAMES

# Score = success points minus the wrong-guess and repeated-guess penalties.
final_score = (success_rate * 2000) - (total_wrong_guesses * 5) - (total_repeated_guesses * 2)

print(f"Total Games Played: {EVAL_GAMES}")
print("\n--- Averages ---")
print(f"Success Rate:         {success_rate * 100:.2f}%")
print(f"Avg. Wrong Guesses:   {avg_wrong:.3f}")
print(f"Avg. Repeated Guesses: {avg_repeated:.3f}")

print("\n--- Totals ---")
print(f"Total Wins:             {total_wins}")
print(f"Total Wrong Guesses:    {total_wrong_guesses}")
print(f"Total Repeated Guesses: {total_repeated_guesses}")

print("\n--- SCORE ---")
print(f"Success Points:  ( {success_rate:.3f} * 2000 )   = {success_rate * 2000:,.0f}")
print(f"Wrong Penalty:   ( {total_wrong_guesses} * 5 )      = -{total_wrong_guesses * 5:,.0f}")
print(f"Repeat Penalty:  ( {total_repeated_guesses} * 2 )      = -{total_repeated_guesses * 2:,.0f}")
print(f"**Final Score**:                            = **{final_score:,.0f}**")