In [1]:
import os
import json
import numpy as np
import random
from collections import Counter, defaultdict

START = "<S>"
END = "<E>"

In [2]:
# Load training corpus
with open("/content/corpus.txt", "r") as f:
    raw_words = f.read().splitlines()

words = [w.strip().upper() for w in raw_words if w.strip()]
words = [w for w in words if w.replace(" ", "").isalpha()]

print(f"Loaded {len(words)} training words.")

Loaded 50000 training words.


In [3]:
# Laplace normalization helper
def normalize_with_laplace(counter, alpha=1.0, vocab=None):
    if vocab is None:
        vocab = list(counter.keys())
    total = sum(counter.values()) + alpha * len(vocab)
    return {k: (counter.get(k, 0) + alpha) / total for k in vocab}

In [4]:
# Build tokenized words by length
tokenized_by_length = defaultdict(list)
for w in words:
    tok = [START] + list(w) + [END]
    tokenized_by_length[len(w)].append(tok)

letters = [chr(c) for c in range(65, 91)]

length_emission_probs = {}
length_transition_probs = {}

for L, toks in tokenized_by_length.items():
    emiss = defaultdict(Counter)
    trans = defaultdict(Counter)
    for word in toks:
        for i in range(1, len(word) - 1):
            pos = i
            prev = word[i - 1]
            curr = word[i]
            s = (pos, prev)
            ns = (pos + 1, curr)
            emiss[s][curr] += 1
            trans[s][ns] += 1
    length_emission_probs[L] = {s: normalize_with_laplace(c, 1.0, letters) for s, c in emiss.items()}
    length_transition_probs[L] = {s: normalize_with_laplace(c, 1.0, list(c.keys())) for s, c in trans.items()}

print(f"Built HMMs for {len(length_emission_probs)} different word lengths.")


Built HMMs for 24 different word lengths.


In [5]:
def get_letter_probs_combined(masked_word, words_list, length_emission_probs, length_transition_probs, alpha_pattern_weight=2.0):
    masked = masked_word.upper()
    L = len(masked)
    letters = list(masked)
    letter_scores = Counter()

    emission_probs = length_emission_probs.get(L, {})
    transition_probs = length_transition_probs.get(L, {})

    # HMM component
    for i, ch in enumerate(letters):
        if ch == "_":
            prev_letter = letters[i - 1] if i > 0 else START
            state = (i + 1, prev_letter)
            if state in emission_probs:
                for letter, prob in emission_probs[state].items():
                    letter_scores[letter] += prob

    # Pattern matching component
    candidates = []
    for w in words_list:
        if len(w) != L:
            continue
        ok = True
        for i, c in enumerate(masked):
            if c != "_" and w[i] != c:
                ok = False
                break
        if ok:
            candidates.append(w)

    if candidates:
        pattern_counts = Counter()
        for w in candidates:
            for i, c in enumerate(w):
                if masked[i] == "_":
                    pattern_counts[c] += 1
        total = sum(pattern_counts.values())
        for ch, cnt in pattern_counts.items():
            weight = alpha_pattern_weight * (1.0 + (1.0 / max(1, len(candidates))))
            letter_scores[ch] += (cnt / total) * weight

    if not letter_scores:
        letter_scores = {chr(c): 1.0 for c in range(65, 91)}

    total = sum(letter_scores.values())
    return {ch: letter_scores[ch] / total for ch in letter_scores}

In [11]:
def extract_state_features(masked_word, guessed, lives):
    """Extract features from current game state for RL"""
    features = []

    # Basic features
    word_length = len(masked_word)
    num_revealed = sum(1 for c in masked_word if c != "_")
    num_hidden = masked_word.count("_")
    num_guessed = len(guessed)
    lives_remaining = lives

    # Positional features
    revealed_positions = [i for i, c in enumerate(masked_word) if c != "_"]
    hidden_positions = [i for i, c in enumerate(masked_word) if c == "_"]

    # Pattern features
    consecutive_hidden = 0
    max_consecutive_hidden = 0
    for c in masked_word:
        if c == "_":
            consecutive_hidden += 1
            max_consecutive_hidden = max(max_consecutive_hidden, consecutive_hidden)
        else:
            consecutive_hidden = 0
        # Normalized features
    features = [
        word_length / 20.0,  # normalize by max word length
        num_revealed / word_length if word_length > 0 else 0,
        num_hidden / word_length if word_length > 0 else 0,
        num_guessed / 26.0,
        lives_remaining / 6.0,
        max_consecutive_hidden / word_length if word_length > 0 else 0,
    ]

    return np.array(features)


In [6]:
class RLHangmanAgent:
    """Q-Learning agent with function approximation"""

    def __init__(self, learning_rate=0.01, discount=0.95, epsilon=0.1):
        self.lr = learning_rate
        self.gamma = discount
        self.epsilon = epsilon

        # Q-value weights for each letter (linear function approximation)
        self.weights = {chr(c): np.random.randn(6) * 0.01 for c in range(65, 91)}

        # Experience replay buffer
        self.replay_buffer = []
        self.buffer_size = 10000

    def get_q_value(self, state_features, letter):
        """Compute Q(s, a) = w^T * phi(s)"""
        return np.dot(self.weights[letter], state_features)

    def get_action_values(self, masked_word, guessed, lives, base_probs):
        """Get Q-values combined with HMM probabilities"""
        state_features = extract_state_features(masked_word, guessed, lives)

        action_values = {}
        for letter in letters:
            if letter not in guessed:
                # Combine Q-value with base probability
                q_val = self.get_q_value(state_features, letter)
                base_prob = base_probs.get(letter, 0.001)
                # Weighted combination
                action_values[letter] = 0.7 * base_prob + 0.3 * (1.0 / (1.0 + np.exp(-q_val)))  # sigmoid

        return action_values

    def act(self, masked_word, guessed, lives):
        """Epsilon-greedy action selection"""
        base_probs = get_letter_probs_combined(
            masked_word, words, length_emission_probs, length_transition_probs
        )

        # Remove already guessed letters
        for g in guessed:
            base_probs[g] = 0.0

        if sum(base_probs.values()) == 0:
            base_probs = {ch: 1.0 for ch in letters if ch not in guessed}

        # Epsilon-greedy exploration
        if random.random() < self.epsilon:
            available = [ch for ch in letters if ch not in guessed]
            return random.choice(available) if available else 'A'

        # Get action values
        action_values = self.get_action_values(masked_word, guessed, lives, base_probs)

        if not action_values:
            available = [ch for ch in letters if ch not in guessed]
            return random.choice(available) if available else 'A'

        return max(action_values, key=action_values.get)

    def update(self, state, action, reward, next_state, done):
        """Q-learning update with function approximation"""
        state_features = extract_state_features(state['masked'], state['guessed'], state['lives'])

        # Current Q-value
        current_q = self.get_q_value(state_features, action)

        # Target Q-value
        if done:
            target_q = reward
        else:
            # Get best next action value
            base_probs = get_letter_probs_combined(
                next_state['masked'], words, length_emission_probs, length_transition_probs
            )
            next_action_values = self.get_action_values(
                next_state['masked'], next_state['guessed'], next_state['lives'], base_probs
            )

            if next_action_values:
                max_next_q = max(self.get_q_value(
                    extract_state_features(next_state['masked'], next_state['guessed'], next_state['lives']),
                    a
                ) for a in next_action_values.keys())
            else:
                max_next_q = 0

            target_q = reward + self.gamma * max_next_q

        # Gradient update
        td_error = target_q - current_q
        self.weights[action] += self.lr * td_error * state_features

    def store_experience(self, experience):
        """Store experience in replay buffer"""
        self.replay_buffer.append(experience)
        if len(self.replay_buffer) > self.buffer_size:
            self.replay_buffer.pop(0)

    def replay_experiences(self, batch_size=32):
        """Train on random batch from replay buffer"""
        if len(self.replay_buffer) < batch_size:
            return

        batch = random.sample(self.replay_buffer, batch_size)
        for exp in batch:
            self.update(exp['state'], exp['action'], exp['reward'],
                       exp['next_state'], exp['done'])

In [7]:
class HangmanEnv:
    def __init__(self, word_list, emission_probs, transition_probs, max_lives=6):
        self.word_list = word_list
        self.emission_probs = emission_probs
        self.transition_probs = transition_probs
        self.max_lives = max_lives
        self.reset()

    def reset(self):
        self.word = random.choice(self.word_list)
        self.masked = ["_" for _ in self.word]
        self.guessed = set()
        self.lives = self.max_lives
        return self.get_state()

    def get_state(self):
        return {
            'masked': "".join(self.masked),
            'guessed': self.guessed.copy(),
            'lives': self.lives
        }

    def step(self, letter):
        letter = letter.upper()
        self.guessed.add(letter)

        if letter in self.word:
            for i, ch in enumerate(self.word):
                if ch == letter:
                    self.masked[i] = letter
            # Reward proportional to letters revealed
            num_revealed = self.word.count(letter)
            reward = num_revealed * 2  # Positive reward for correct guess
        else:
            self.lives -= 1
            reward = -5  # Penalty for wrong guess

        done = (self.lives == 0) or ("_" not in self.masked)

        # Bonus for winning
        if done and "_" not in self.masked:
            reward += 20
        # Penalty for losing
        elif done and self.lives == 0:
            reward -= 10

        return self.get_state(), reward, done

In [8]:
def train_rl_agent(agent, train_words, num_episodes=5000, batch_size=32):
    """Train the RL agent"""
    env = HangmanEnv(train_words, length_emission_probs, length_transition_probs)

    episode_rewards = []
    win_rates = []

    for episode in range(num_episodes):
        state = env.reset()
        total_reward = 0
        done = False

        # Decay epsilon
        agent.epsilon = max(0.01, 0.1 * (0.995 ** episode))

        while not done:
            action = agent.act(state['masked'], state['guessed'], state['lives'])
            next_state, reward, done = env.step(action)

            # Store experience
            agent.store_experience({
                'state': state,
                'action': action,
                'reward': reward,
                'next_state': next_state,
                'done': done
            })

            # Update Q-values
            agent.update(state, action, reward, next_state, done)

            state = next_state
            total_reward += reward

        # Replay experiences
        if episode % 10 == 0:
            agent.replay_experiences(batch_size)

        episode_rewards.append(total_reward)

        # Track progress
        if (episode + 1) % 500 == 0:
            recent_rewards = np.mean(episode_rewards[-100:])
            print(f"Episode {episode + 1}/{num_episodes} | Avg Reward: {recent_rewards:.2f} | Epsilon: {agent.epsilon:.3f}")

    return agent


In [13]:
def evaluate_on_test_set(agent, test_words, emission_probs, transition_probs, num_games=None, use_rl=True):
    """Evaluate agent on test set"""
    if num_games is None:
        num_games = len(test_words)
    else:
        num_games = min(num_games, len(test_words))

    # Disable exploration during evaluation
    original_epsilon = agent.epsilon if use_rl else 0
    if use_rl:
        agent.epsilon = 0.0

    env = HangmanEnv(test_words[:num_games], emission_probs, transition_probs, max_lives=6)
    total_wins, total_wrong, total_repeat = 0, 0, 0
    failed_words = []

    for i in range(num_games):
        state = env.reset()
        done = False
        wrong_this_game = 0
        repeat_this_game = 0

        while not done:
            action = agent.act(state['masked'], state['guessed'], state['lives'])

            if action in state['guessed']:
                repeat_this_game += 1

            lives_before = state['lives']
            state, _, done = env.step(action)

            if state['lives'] < lives_before:
                wrong_this_game += 1

        won = "_" not in state['masked']
        if won:
            total_wins += 1
        else:
            failed_words.append(env.word)
        total_wrong += wrong_this_game
        total_repeat += repeat_this_game

    # Restore original epsilon
    if use_rl:
        agent.epsilon = original_epsilon

    success_rate = total_wins / num_games
    final_score = (success_rate * num_games) - (total_wrong * 5) - (total_repeat * 2)

    print(f"\n--- TEST SET RESULTS ---")
    print(f"Games: {num_games}")
    print(f"Wins: {total_wins}, Losses: {num_games - total_wins}")
    print(f"Success rate: {success_rate:.2%}")
    print(f"Wrong guesses: {total_wrong}, Repeats: {total_repeat}")
    print(f"Final score: {final_score:.2f}")

    if failed_words:
        print(f"Some failed words: {failed_words[:10]}")

    return success_rate, total_wrong, total_repeat, final_score


# ============ TRAINING AND EVALUATION ============

print("\n" + "="*50)
print("TRAINING RL AGENT")
print("="*50)

# Create and train RL agent
rl_agent = RLHangmanAgent(learning_rate=0.01, discount=0.95, epsilon=0.1)
rl_agent = train_rl_agent(rl_agent, words, num_episodes=5000, batch_size=32)

# Load test words
with open("/content/test.txt", "r") as f:
    test_raw = f.read().splitlines()

test_words = [w.strip().upper() for w in test_raw if w.strip()]
test_words = [w for w in test_words if w.replace(" ", "").isalpha()]
print(f"\nLoaded {len(test_words)} test words.")

# Evaluate
print("\n" + "="*50)
print("EVALUATING ON TEST SET")
print("="*50)

test_result = evaluate_on_test_set(rl_agent, test_words, length_emission_probs,
                                   length_transition_probs, num_games=2000)

print("="*50)
print(f"RL Agent Test: {test_result[0]*100:.2f}% success | Score: {test_result[3]:.2f}")


TRAINING RL AGENT
Episode 500/5000 | Avg Reward: 25.41 | Epsilon: 0.010
Episode 1000/5000 | Avg Reward: 27.66 | Epsilon: 0.010
Episode 1500/5000 | Avg Reward: 28.25 | Epsilon: 0.010
Episode 2000/5000 | Avg Reward: 21.42 | Epsilon: 0.010
Episode 2500/5000 | Avg Reward: 23.70 | Epsilon: 0.010
Episode 3000/5000 | Avg Reward: 24.54 | Epsilon: 0.010
Episode 3500/5000 | Avg Reward: 28.72 | Epsilon: 0.010
Episode 4000/5000 | Avg Reward: 25.11 | Epsilon: 0.010
Episode 4500/5000 | Avg Reward: 23.90 | Epsilon: 0.010
Episode 5000/5000 | Avg Reward: 25.54 | Epsilon: 0.010

Loaded 2000 test words.

EVALUATING ON TEST SET

--- TEST SET RESULTS ---
Games: 2000
Wins: 643, Losses: 1357
Success rate: 32.15%
Wrong guesses: 10390, Repeats: 0
Final score: -51307.00
Some failed words: ['MEJORANA', 'CREEPMOUSY', 'EXAGGERATED', 'SESTI', 'FRECKLISH', 'IMMANACLE', 'UNFORWARDED', 'UNWARPABLE', 'SCHIZOIDISM', 'INCUDAL']
RL Agent Test: 32.15% success | Score: -51307.00
