In [None]:
import numpy as np
import collections
import os # For checking the file path

# Define the alphabet and a mapping for easy indexing
ALPHABET = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
CHAR_TO_INDEX = {char: i for i, char in enumerate(ALPHABET)}
INDEX_TO_CHAR = {i: char for i, char in enumerate(ALPHABET)}

print("Libraries imported and alphabet defined.")

Libraries imported and alphabet defined.


In [None]:
import os
import collections

# Define the path to your local corpus file
# Assumes 'corpus.txt' is in the same folder as your notebook
LOCAL_CORPUS_PATH = "corpus.txt"

def load_corpus(filename=LOCAL_CORPUS_PATH):
    """
    Reads the corpus file from the specified local path
    and groups words by length.
    """
    words_by_length = collections.defaultdict(list)
    
    # Check if the file exists before trying to open it
    if not os.path.exists(filename):
        print(f"--- ERROR ---")
        print(f"File not found at: {filename}")
        print("Please make sure 'corpus.txt' is in the same folder as your notebook.")
        
        # Helper to debug: list contents of the current directory
        print("\nListing contents of current directory ('.') ...")
        try:
            print(os.listdir('.'))
        except Exception as e:
            print(f"An error occurred while listing the current directory: {e}")
        return None

    # File exists, proceed to load
    try:
        with open(filename, 'r') as f:
            for word in f:
                cleaned_word = word.strip().upper()
                if cleaned_word: # Ensure it's not an empty line
                    words_by_length[len(cleaned_word)].append(cleaned_word)
        
        print(f"Corpus loaded successfully from {filename}.")
        print("\nWord counts by length:")
        for length in sorted(words_by_length.keys()):
            print(f"  Length {length}: {len(words_by_length[length])} words")
        return words_by_length
        
    except Exception as e:
        print(f"An error occurred while reading the file: {e}")
        return None

# Load the corpus
words_by_length = load_corpus()

Corpus loaded successfully from corpus.txt.

Word counts by length:
  Length 1: 46 words
  Length 2: 84 words
  Length 3: 388 words
  Length 4: 1169 words
  Length 5: 2340 words
  Length 6: 3755 words
  Length 7: 5111 words
  Length 8: 6348 words
  Length 9: 6808 words
  Length 10: 6465 words
  Length 11: 5452 words
  Length 12: 4292 words
  Length 13: 3094 words
  Length 14: 2019 words
  Length 15: 1226 words
  Length 16: 698 words
  Length 17: 375 words
  Length 18: 174 words
  Length 19: 88 words
  Length 20: 40 words
  Length 21: 16 words
  Length 22: 8 words
  Length 23: 3 words
  Length 24: 1 words


In [None]:
def train_hmm_models(words_by_length):
    """
    Trains positional Unigram, Bigram, and Trigram models for each word length.
    """
    hmm_models = {}
    
    for length, words in words_by_length.items():
        if length == 0:
            continue # Skip empty strings if any
            
        model = {}
        
        # --- 1. Unigram Model (Always possible for length >= 1) ---
        unigram_counts = np.ones((length, 26))
        for word in words:
            try:
                for pos, char in enumerate(word):
                    unigram_counts[pos, CHAR_TO_INDEX[char]] += 1
            except KeyError:
                continue
        model['unigram'] = unigram_counts / unigram_counts.sum(axis=1, keepdims=True)

        # --- 2. Bigram Model (Only possible for length >= 2) ---
        if length >= 2:
            bigram_counts = np.ones((length - 1, 26, 26))
            for word in words:
                try:
                    for pos in range(length - 1):
                        prev_char_idx = CHAR_TO_INDEX[word[pos]]
                        curr_char_idx = CHAR_TO_INDEX[word[pos + 1]]
                        bigram_counts[pos, prev_char_idx, curr_char_idx] += 1
                except KeyError:
                    continue
            model['bigram'] = bigram_counts / bigram_counts.sum(axis=2, keepdims=True)

        # --- 3. Trigram Model (Only possible for length >= 3) ---
        if length >= 3:
            trigram_counts = np.ones((length - 2, 26, 26, 26))
            for word in words:
                try:
                    for pos in range(length - 2):
                        prev_prev_char_idx = CHAR_TO_INDEX[word[pos]]
                        prev_char_idx = CHAR_TO_INDEX[word[pos + 1]]
                        curr_char_idx = CHAR_TO_INDEX[word[pos + 2]]
                        trigram_counts[pos, prev_prev_char_idx, prev_char_idx, curr_char_idx] += 1
                except KeyError:
                    continue
            model['trigram'] = trigram_counts / trigram_counts.sum(axis=3, keepdims=True)
        
        # Store the trained models
        hmm_models[length] = model
        
    print(f"Training complete. {len(hmm_models)} (U, B, T) models trained.")
    return hmm_models

# --- Re-train the models ---
hmm_models = train_hmm_models(words_by_length)

# --- Example: Check 'TH' -> 'E' transition for 5-letter words ---
if 5 in hmm_models:
    t_idx = CHAR_TO_INDEX['T']
    h_idx = CHAR_TO_INDEX['H']
    e_idx = CHAR_TO_INDEX['E']
    
    # Get P(Letter_2 | Letter_0='T', Letter_1='H')
    if 'trigram' in hmm_models[5]:
        prob_th_e = hmm_models[5]['trigram'][0, t_idx, h_idx, e_idx]
        print("\n--- Trigram Model Sanity Check (5-letter words) ---")
        print(f"  P(Letter at pos 2 = 'E' | ...pos 0 = 'T', pos 1 = 'H') = {prob_th_e:.4f}")
    else:
        print("Trigram model for length 5 not available (this shouldn't happen).")

Training complete. 24 (U, B, T) models trained.

--- Trigram Model Sanity Check (5-letter words) ---
  P(Letter at pos 2 = 'E' | ...pos 0 = 'T', pos 1 = 'H') = 0.0750


In [None]:
class HMMOracle:
    def __init__(self, models):
        self.models = models
        print("HMMOracle (Trigram w/ Backoff) initialized.")

    def _get_prob_with_backoff(self, model, pos, pattern, char_index):
        """
        Gets the probability of a character at a specific position,
        using the best available n-gram model (Trigram -> Bigram -> Unigram).
        """
        
        # --- Try Trigram ---
        # P(char | char_at_pos-2, char_at_pos-1)
        if pos >= 2 and pattern[pos-2] != '_' and pattern[pos-1] != '_':
            prev_prev_idx = CHAR_TO_INDEX[pattern[pos-2]]
            prev_idx = CHAR_TO_INDEX[pattern[pos-1]]
            # trigram_probs[pos-2, char_{i-2}, char_{i-1}, char_i]
            return model['trigram'][pos-2, prev_prev_idx, prev_idx, char_index]
            
        # --- Try Bigram ---
        # P(char | char_at_pos-1)
        if pos >= 1 and pattern[pos-1] != '_':
            prev_idx = CHAR_TO_INDEX[pattern[pos-1]]
            # bigram_probs[pos-1, char_{i-1}, char_i]
            return model['bigram'][pos-1, prev_idx, char_index]
            
        # --- Fallback to Unigram ---
        # P(char_i)
        return model['unigram'][pos, char_index]
        

    def get_letter_probabilities(self, pattern, guessed_letters):
        """
        Estimates the probability of each remaining letter appearing
        in one of the blank spots, using trigram backoff logic.
        """
        word_length = len(pattern)
        
        if word_length not in self.models:
            # Fallback for models we couldn't train (e.g., length 1)
            unguessed = [c for c in ALPHABET if c not in guessed_letters]
            if not unguessed: return {}
            prob = 1.0 / len(unguessed)
            return {char: prob for char in unguessed}
            
        model = self.models[word_length]
        
        blank_indices = [i for i, char in enumerate(pattern) if char == '_']
        unguessed_chars = [c for c in ALPHABET if c not in guessed_letters]
        
        if not blank_indices or not unguessed_chars:
            return {}
            
        final_scores = {char: 0.0 for char in unguessed_chars}
        
        # 3. Iterate over each blank and score each candidate letter
        for blank_pos in blank_indices:
            for char in unguessed_chars:
                char_index = CHAR_TO_INDEX[char]
                
                # --- P(Letter | Left_Neighbors) ---
                # Get the probability of this char given its left context
                left_prob = self._get_prob_with_backoff(model, blank_pos, pattern, char_index)
                
                # --- P(Right_Neighbors | Letter) ---
                # Now, find how well this char "predicts" its right-side context
                right_prob = 1.0
                
                # Check for known letter at pos+1
                if blank_pos + 1 < word_length and pattern[blank_pos+1] != '_':
                    next_idx = CHAR_TO_INDEX[pattern[blank_pos+1]]
                    # P(char_at_pos+1 | char_at_pos)
                    right_prob *= model['bigram'][blank_pos, char_index, next_idx]
                
                # Check for known letter at pos+2
                if blank_pos + 2 < word_length and pattern[blank_pos+1] != '_' and pattern[blank_pos+2] != '_':
                    next_idx = CHAR_TO_INDEX[pattern[blank_pos+1]]
                    next_next_idx = CHAR_TO_INDEX[pattern[blank_pos+2]]
                    # P(char_at_pos+2 | char_at_pos, char_at_pos+1)
                    right_prob *= model['trigram'][blank_pos, char_index, next_idx, next_next_idx]
                
                # This score represents how well this char "fits" in this blank
                prob_at_this_pos = left_prob * right_prob
                final_scores[char] += prob_at_this_pos

        # 4. Normalize scores to create a probability distribution
        total_score = sum(final_scores.values())
        
        if total_score == 0.0:
            # Fallback: No letter fits, return uniform prob
            prob = 1.0 / len(unguessed_chars)
            return {char: prob for char in unguessed_chars}
            
        normalized_probs = {
            char: score / total_score
            for char, score in final_scores.items()
        }
        
        return normalized_probs

# --- Initialize the new oracle ---
if 'hmm_models' in locals():
    oracle = HMMOracle(hmm_models)
else:
    print("Error: 'hmm_models' not initialized. Please re-run Cell 3.")

HMMOracle (Trigram w/ Backoff) initialized.


In [None]:
# --- Example 1: A common 5-letter word start ---
pattern1 = "S____"
guessed1 = {'S'}
probs1 = oracle.get_letter_probabilities(pattern1, guessed1)

print(f"--- Probs for '{pattern1}' (guessed: {guessed1}) ---")
# Sort by probability, descending
for char, prob in sorted(probs1.items(), key=lambda item: item[1], reverse=True)[:10]:
    print(f"  P({char}): {prob:.4f}")


# --- Example 2: The 'APPLE' example from the prompt ---
pattern2 = "_PPLE"
guessed2 = {'P', 'L', 'E'}
probs2 = oracle.get_letter_probabilities(pattern2, guessed2)

print(f"\n--- Probs for '{pattern2}' (guessed: {guessed2}) ---")
for char, prob in sorted(probs2.items(), key=lambda item: item[1], reverse=True)[:5]:
    print(f"  P({char}): {prob:.4f}")

    
# --- Example 3: More complex pattern ---
pattern3 = "__A_I_G"
guessed3 = {'A', 'I', 'G'}
probs3 = oracle.get_letter_probabilities(pattern3, guessed3)

print(f"\n--- Probs for '{pattern3}' (guessed: {guessed3}) ---")
for char, prob in sorted(probs3.items(), key=lambda item: item[1], reverse=True)[:10]:
    print(f"  P({char}): {prob:.4f}")

--- Probs for 'S____' (guessed: {'S'}) ---
  P(E): 0.1110
  P(A): 0.1020
  P(T): 0.0788
  P(I): 0.0689
  P(O): 0.0680
  P(N): 0.0642
  P(L): 0.0616
  P(R): 0.0597
  P(H): 0.0460
  P(C): 0.0452

--- Probs for '_PPLE' (guessed: {'E', 'P', 'L'}) ---
  P(S): 0.3186
  P(U): 0.1231
  P(A): 0.1220
  P(M): 0.0451
  P(H): 0.0417

--- Probs for '__A_I_G' (guessed: {'A', 'G', 'I'}) ---
  P(N): 0.1331
  P(S): 0.0988
  P(C): 0.0816
  P(P): 0.0694
  P(R): 0.0683
  P(T): 0.0681
  P(B): 0.0622
  P(L): 0.0589
  P(M): 0.0515
  P(D): 0.0482


In [None]:
import os
import random
import collections # Added this import, as it was missing from your first snippet

# --- Assume 'oracle' is defined elsewhere ---
# For this code to run, you must have your HMMOracle class defined 
# and an instance created, e.g.:
# oracle = HMMOracle(words_by_length) 
#
# --- Assume 'words_by_length' is loaded from your first script ---
# Example:
# words_by_length = load_corpus() # From your previous code block

# Define the path to your local test set file
# Assumes 'test.txt' is in the same folder as your notebook
LOCAL_TEST_SET_PATH = "test.txt"

def load_test_set(filename=LOCAL_TEST_SET_PATH):
    """
    Loads the test set words from the specified local path.
    """
    # Check if the file exists
    if not os.path.exists(filename):
        print(f"--- ERROR ---")
        print(f"Test set file not found at: {filename}")
        print("Please make sure 'test.txt' is in the same folder as your notebook.")
        
        # Helper to debug: list contents of the current directory
        print("\nListing contents of current directory ('.') ...")
        try:
            print(os.listdir('.'))
        except Exception as e:
            print(f"An error occurred while listing the current directory: {e}")
        return None

    # File exists, proceed to load
    try:
        with open(filename, 'r') as f:
            # Read words, strip whitespace, convert to upper, and ignore empty lines
            test_words = [word.strip().upper() for word in f if word.strip()]
        
        print(f"Test set loaded successfully from {filename}.")
        print(f"Found {len(test_words)} test words.")
        return test_words
        
    except Exception as e:
        print(f"An error occurred while reading the test file: {e}")
        return None

def play_game(secret_word, oracle, max_lives=6):
    """
    Simulates a single game of Hangman using a greedy agent
    that trusts the HMMOracle.
    """
    word_length = len(secret_word)
    pattern = "_" * word_length
    guessed_letters = set()
    
    wrong_guesses = 0
    repeated_guesses = 0 # Will be 0 with our current oracle
    lives = max_lives
    
    while lives > 0 and "_" in pattern:
        # 1. Get probabilities from the HMM oracle
        probs = oracle.get_letter_probabilities(pattern, guessed_letters)
        
        # 2. Check for failure case (e.g., oracle returns nothing)
        if not probs:
            break # Game is lost
            
        # 3. Greedy Agent: Pick the best letter
        guess = max(probs, key=probs.get)
        
        # Note: Our HMMOracle already filters for guessed_letters,
        # so repeated_guesses will be 0. The RL agent, however,
        # might make this mistake, which is why it's in the formula.
        
        # 4. Update game state
        guessed_letters.add(guess)
        
        if guess in secret_word:
            # Correct guess: update pattern
            new_pattern = list(pattern)
            for i in range(word_length):
                if secret_word[i] == guess:
                    new_pattern[i] = guess
            pattern = "".join(new_pattern)
        else:
            # Wrong guess
            wrong_guesses += 1
            lives -= 1
            
    # 5. Return game results
    game_won = "_" not in pattern
    return game_won, wrong_guesses, repeated_guesses

# --- Main Evaluation Loop ---

# Load the test words
test_words = load_test_set()

# Check if both the test set AND the oracle (from your other script) are loaded
if test_words and 'oracle' in locals() and oracle:
    NUM_GAMES = 2000 # As specified in the PDF 
    MAX_LIVES = 6      # As specified in the PDF 
    
    total_games_won = 0
    total_wrong_guesses = 0
    total_repeated_guesses = 0
    
    print(f"\n--- Starting Evaluation: Playing {NUM_GAMES} games ---")
    
    # Ensure we have words to play with
    if not test_words:
        print("Cannot run evaluation: test_words list is empty.")
    else:
        test_set_size = len(test_words)
        
        games_played = 0 # Track actual games played if some are skipped
        
        for i in range(NUM_GAMES):
            # Pick a word.
            # We use random.choice to get a good sample
            secret_word = random.choice(test_words)
            
            # Ensure the oracle has a model for this word length
            if len(secret_word) not in oracle.models:
                # print(f"Skipping word '{secret_word}': No model for length {len(secret_word)}")
                continue # Skip this game

            won, wrongs, repeats = play_game(secret_word, oracle, MAX_LIVES)
            
            games_played += 1 # Only increment if a game was actually played
            
            if won:
                total_games_won += 1
            total_wrong_guesses += wrongs
            total_repeated_guesses += repeats
            
            if (i + 1) % 200 == 0:
                print(f"  ... processed {i + 1} / {NUM_GAMES} iterations")

        print("--- Evaluation Complete ---")

        if games_played == 0:
            print("\n--- ‚ö†Ô∏è No games were played! ---")
            print("This usually means the test set words have lengths")
            print("that were not present in your training corpus (words_by_length).")
            print("Please check your corpus.txt and test.txt files.")
        else:
            # 1. Calculate Metrics
            success_rate = total_games_won / games_played
            avg_wrong_guesses = total_wrong_guesses / games_played
            avg_repeated_guesses = total_repeated_guesses / games_played
            
            # 2. Calculate Final Score using the formula 
            # Final Score = (Success Rate * 2000) - (Total Wrong Guesses * 5) - (Total Repeated Guesses * 2)
            
            # We scale the score to 2000 games, even if some were skipped
            # This assumes skipped games would have performed at the average rate
            
            # Calculate score based on games played
            score_from_wins = (total_games_won / games_played) * 2000 
            total_wrong_for_2000 = (total_wrong_guesses / games_played) * 2000
            total_repeats_for_2000 = (total_repeated_guesses / games_played) * 2000
            
            penalty_from_wrongs = total_wrong_for_2000 * 5
            penalty_from_repeats = total_repeats_for_2000 * 2
            
            final_score = score_from_wins - penalty_from_wrongs - penalty_from_repeats
            
            # 3. Print Results
            print("\n--- üìä Final Results ---")
            print(f"**Final Score (scaled to 2000 games):** {final_score:.2f}")
            print("------------------------------")
            print(f"Total Games Played:     {games_played}")
            print(f"Total Games Won:        {total_games_won} ({success_rate * 100:.2f}%)")
            print("------------------------------")
            print(f"Total Wrong Guesses:    {total_wrong_guesses} (Avg: {avg_wrong_guesses:.2f} per game)")
            print(f"Total Repeated Guesses: {total_repeated_guesses} (Avg: {avg_repeated_guesses:.2f} per game)")
            print("------------------------------")
            print(f"Scaled Score from Wins:   + {score_from_wins:.2f}")
            print(f"Scaled Penalty (Wrongs):  - {penalty_from_wrongs:.2f}")
            print(f"Scaled Penalty (Repeats): - {penalty_from_repeats:.2f}")
            
else:
    print("\nEvaluation not run. Check the following:")
    if not test_words:
        print(" - 'test_words' could not be loaded.")
    if 'oracle' not in locals() or not oracle:
        print(" - 'oracle' is not initialized. Make sure you have run the HMMOracle class definition and created an instance.")

Test set loaded successfully from test.txt.
Found 2000 test words.

--- Starting Evaluation: Playing 2000 games ---
  ... processed 200 / 2000 iterations
  ... processed 400 / 2000 iterations
  ... processed 600 / 2000 iterations
  ... processed 800 / 2000 iterations
  ... processed 1000 / 2000 iterations
  ... processed 1200 / 2000 iterations
  ... processed 1400 / 2000 iterations
  ... processed 1600 / 2000 iterations
  ... processed 1800 / 2000 iterations
  ... processed 2000 / 2000 iterations
--- Evaluation Complete ---

--- üìä Final Results ---
**Final Score (scaled to 2000 games):** -48934.00
------------------------------
Total Games Played:     2000
Total Games Won:        781 (39.05%)
------------------------------
Total Wrong Guesses:    9943 (Avg: 4.97 per game)
Total Repeated Guesses: 0 (Avg: 0.00 per game)
------------------------------
Scaled Score from Wins:   + 781.00
Scaled Penalty (Wrongs):  - 49715.00
Scaled Penalty (Repeats): - 0.00


In [None]:
import os

# --- Load Corpus Words into a Set ---
LOCAL_CORPUS_PATH = "corpus.txt" # Changed path
corpus_words = set()
try:
    with open(LOCAL_CORPUS_PATH, 'r') as f:
        for word in f:
            corpus_words.add(word.strip().upper())
    print(f"Loaded {len(corpus_words)} unique words from {LOCAL_CORPUS_PATH}")
except FileNotFoundError:
    print(f"Error: '{LOCAL_CORPUS_PATH}' not found.")
    print("Please make sure it's in the same folder as your notebook.")
except Exception as e:
    print(f"Error loading corpus: {e}")

# --- Load Test Words into a Set ---
LOCAL_TEST_SET_PATH = "test.txt" # Changed path
test_words = set()
try:
    with open(LOCAL_TEST_SET_PATH, 'r') as f:
        for word in f:
            test_words.add(word.strip().upper())
    print(f"Loaded {len(test_words)} unique words from {LOCAL_TEST_SET_PATH}")
except FileNotFoundError:
    print(f"Error: '{LOCAL_TEST_SET_PATH}' not found.")
    print("Please make sure it's in the same folder as your notebook.")
except Exception as e:
    print(f"Error loading test set: {e}")

# --- Calculate and Print Overlap ---
if test_words and corpus_words:
    overlap = test_words.intersection(corpus_words)
    overlap_percentage = (len(overlap) / len(test_words)) * 100
    
    print("\n--- üìä Diagnostic Results ---")
    print(f"Total Test Words:     {len(test_words)}")
    print(f"Total Corpus Words:   {len(corpus_words)}")
    print(f"Words in BOTH files:  {len(overlap)}")
    print(f"Overlap Percentage:   {overlap_percentage:.2f}%")
    
    if overlap_percentage == 0.0:
        print("\n**CRITICAL FINDING:** Your test set has 0% overlap with your training corpus.")
        print("This means the corpus filtering logic will *never* be used during evaluation.")
        print("The score you are seeing is purely from your bigram fallback model.")
    elif overlap_percentage < 100.0:
        print(f"\n**FINDING:** Only {overlap_percentage:.2f}% of your test words are in the corpus.")
        print("This means your filter is only working for a fraction of the games.")
    else:
        print("\n**FINDING:** 100% of test words are in the corpus.")
        print("This is good! It means the problem is likely a logic bug in your oracle.")
else:
    print("\nCould not calculate overlap. One or both files failed to load.")

Loaded 49398 unique words from corpus.txt
Loaded 2000 unique words from test.txt

--- üìä Diagnostic Results ---
Total Test Words:     2000
Total Corpus Words:   49398
Words in BOTH files:  0
Overlap Percentage:   0.00%

**CRITICAL FINDING:** Your test set has 0% overlap with your training corpus.
This means the corpus filtering logic will *never* be used during evaluation.
The score you are seeing is purely from your bigram fallback model.


# RL DQN Agent Implementation

_____________________________________________________________________________________________________

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
import numpy as np
import collections

# Check if a GPU is available for training
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [None]:
# Make sure 'test_words' is loaded from Cell 6
if 'test_words' not in locals() or test_words is None:
    print("CRITICAL ERROR: 'test_words' is not defined.")
    print("Please re-run Cell 6 to load the test set before this cell.")
else:
    print(f"HangmanEnv will use the {len(test_words)} words from test_set.txt.")

class HangmanEnv:
    def __init__(self, word_list, max_lives=6):
        self.word_list = [word for word in word_list if word] # Ensure no empty strings
        self.max_lives = max_lives
        self.secret_word = ""
        self.pattern = ""
        self.guessed_letters = set()
        self.lives_left = 0
        
    def reset(self):
        """Starts a new game and returns the initial state."""
        self.secret_word = random.choice(self.word_list).upper()
        self.pattern = "_" * len(self.secret_word)
        self.guessed_letters = set()
        self.lives_left = self.max_lives
        
        return (self.pattern, self.guessed_letters, self.lives_left)

    def step(self, action_char):
        """
        Takes an action (a letter) and returns:
        (next_pattern, next_guessed, next_lives), reward, done
        """
        
        # --- 1. Check for invalid game state ---
        if self.lives_left == 0 or "_" not in self.pattern:
            return (self.pattern, self.guessed_letters, self.lives_left), 0, True # Game is already over

        # --- 2. Define Rewards ---
        REWARD_WIN = 200
        REWARD_LOSE = -200
        REWARD_CORRECT = 10
        REWARD_WRONG = -25
        REWARD_REPEAT = -5 ## this was orginally -5, but that makes no sense
        
        # --- 3. Process Action ---
        done = False
        reward = 0
        
        if action_char in self.guessed_letters:
            # --- 3a. Repeated Guess ---
            reward = REWARD_REPEAT
        
        else:
            self.guessed_letters.add(action_char)
            
            if action_char in self.secret_word:
                # --- 3b. Correct Guess ---
                new_pattern = list(self.pattern)
                for i, char in enumerate(self.secret_word):
                    if char == action_char:
                        new_pattern[i] = action_char
                self.pattern = "".join(new_pattern)
                
                if "_" not in self.pattern:
                    # Game Won!
                    reward = REWARD_WIN
                    done = True
                else:
                    # Correct, but not won yet
                    reward = REWARD_CORRECT
            
            else:
                # --- 3c. Wrong Guess ---
                self.lives_left -= 1
                if self.lives_left == 0:
                    # Game Lost!
                    reward = REWARD_LOSE
                    done = True
                else:
                    # Wrong, but not lost yet
                    reward = REWARD_WRONG

        return (self.pattern, self.guessed_letters, self.lives_left), reward, done

HangmanEnv will use the 2000 words from test_set.txt.


In [None]:
# --- Hyperparameters ---
MAX_WORD_LEN = 30 # (Still needed by env, but not by agent)
CHAR_TO_INT = {char: i+1 for i, char in enumerate(ALPHABET)}
CHAR_TO_INT['_'] = 0

# --- Replay Buffer ---
Experience = collections.namedtuple('Experience', 
                                    ('state', 'action', 'reward', 'next_state', 'done', 'valid_actions'))

class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = collections.deque(maxlen=capacity)
    
    def push(self, *args):
        self.buffer.append(Experience(*args))
        
    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)
        
    def __len__(self):
        return len(self.buffer)

# --- State Processor ---
def get_state_tensor(pattern, guessed_letters, lives, oracle):
    """
    Converts the raw game state into a dictionary of tensors for the network.
    This version is SIMPLIFIED and does NOT include the pattern embedding.
    """
    # 1. Guessed Vector
    guessed_vec = [1.0 if char in guessed_letters else 0.0 for char in ALPHABET]
    guessed_tensor = torch.tensor(guessed_vec, dtype=torch.float, device=device).unsqueeze(0) # Shape: [1, 26]
    
    # 2. Lives
    lives_tensor = torch.tensor([[lives / 6.0]], dtype=torch.float, device=device) # Shape: [1, 1]
    
    # 3. HMM Probabilities
    hmm_probs = oracle.get_letter_probabilities(pattern, guessed_letters)
    hmm_vec = [hmm_probs.get(char, 0.0) for char in ALPHABET]
    hmm_tensor = torch.tensor(hmm_vec, dtype=torch.float, device=device).unsqueeze(0) # Shape: [1, 26]

    return {
        "guessed": guessed_tensor,
        "lives": lives_tensor,
        "hmm": hmm_tensor
    }

def get_valid_actions_mask(guessed_letters):
    """
    Returns a 26-element boolean tensor.
    True if the action is VALID (not guessed), False if INVALID (guessed).
    """
    mask = [True if char not in guessed_letters else False for char in ALPHABET]
    return torch.tensor(mask, dtype=torch.bool, device=device)

In [None]:
class DQN(nn.Module):
    def __init__(self, vocab_size=27, embedding_dim=16, max_len=MAX_WORD_LEN):
        super(DQN, self).__init__()
        
        # --- Total feature size ---
        # guessed_vec (26) + lives (1) + hmm_vec (26)
        total_feature_size = 26 + 1 + 26
        
        # --- Fully Connected Layers ---
        self.fc1 = nn.Linear(total_feature_size, 128)
        self.fc2 = nn.Linear(128, 128)
        # Output layer: 26 Q-values, one for each letter
        self.fc3 = nn.Linear(128, 26) 

    def forward(self, state):
        
        # 1. Concatenate all features
        x = torch.cat([
            state["guessed"],
            state["lives"],
            state["hmm"]
        ], dim=1) # Concatenate along the feature dimension
        
        # 2. Pass through MLP
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        # 3. Output Q-values
        return self.fc3(x)

In [None]:
# --- More Hyperparameters ---
BATCH_SIZE = 128
GAMMA = 0.99       # Discount factor
EPS_START = 0.9     # Epsilon-greedy exploration start
EPS_END = 0.05
EPS_DECAY = 2000    # How fast epsilon decays
TARGET_UPDATE = 10  # How often to update the target network (in episodes)
BUFFER_CAPACITY = 10000
LEARNING_RATE = 0.001

class DQNAgent:
    def __init__(self, oracle):
        self.policy_net = DQN().to(device)
        self.target_net = DQN().to(device)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval() # Target net is only for evaluation
        
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=LEARNING_RATE)
        self.buffer = ReplayBuffer(BUFFER_CAPACITY)
        self.oracle = oracle # The HMM oracle from Cell 4
        self.steps_done = 0

    def select_action(self, state_dict, valid_actions_mask):
        """
        Chooses an action (int 0-25) using an epsilon-greedy policy.
        """
        sample = random.random()
        eps_threshold = EPS_END + (EPS_START - EPS_END) * \
                        np.exp(-1. * self.steps_done / EPS_DECAY)
        self.steps_done += 1
        
        if sample > eps_threshold:
            # --- Exploitation ---
            with torch.no_grad():
                q_values = self.policy_net(state_dict) # Shape: [1, 26]
                
                # --- ACTION MASKING ---
                q_values[0, ~valid_actions_mask] = -float('inf')
                
                action_tensor = q_values.max(1)[1] # Get index of max Q-value
                return action_tensor.view(1, 1)
        else:
            # --- Exploration ---
            valid_indices = torch.where(valid_actions_mask)[0].tolist()
            if not valid_indices: 
                return torch.tensor([[0]], device=device, dtype=torch.long)
            
            chosen_action = random.choice(valid_indices)
            return torch.tensor([[chosen_action]], device=device, dtype=torch.long)

    def learn(self):
        if len(self.buffer) < BATCH_SIZE:
            return 
        
        experiences = self.buffer.sample(BATCH_SIZE)
        batch = Experience(*zip(*experiences))

        # --- 1. Prepare Batch Tensors (SIMPLIFIED) ---
        state_batch = {
            "guessed": torch.cat([s["guessed"] for s in batch.state]),
            "lives": torch.cat([s["lives"] for s in batch.state]),
            "hmm": torch.cat([s["hmm"] for s in batch.state])
        }
        next_state_batch = {
            "guessed": torch.cat([s["guessed"] for s in batch.next_state]),
            "lives": torch.cat([s["lives"] for s in batch.next_state]),
            "hmm": torch.cat([s["hmm"] for s in batch.next_state])
        }
        
        action_batch = torch.cat(batch.action)
        reward_batch = torch.tensor(batch.reward, dtype=torch.float, device=device)
        done_batch = torch.tensor(batch.done, dtype=torch.bool, device=device)
        
        # --- 2. Calculate Q(s, a) ---
        state_action_values = self.policy_net(state_batch).gather(1, action_batch).squeeze()

        # --- 3. Calculate Target Q-value ---
        next_q_values = self.target_net(next_state_batch)
        
        next_valid_actions_mask = torch.stack(batch.valid_actions)
        next_q_values[~next_valid_actions_mask] = -float('inf') 
        
        next_state_max_q = next_q_values.max(1)[0]
        next_state_max_q[done_batch] = 0.0 
        expected_state_action_values = (next_state_max_q * GAMMA) + reward_batch

        # --- 4. Calculate Loss ---
        loss = F.smooth_l1_loss(state_action_values, expected_state_action_values)
        
        # --- 5. Optimize ---
        self.optimizer.zero_grad()
        loss.backward()
        for param in self.policy_net.parameters():
            param.grad.data.clamp_(-1, 1) # Gradient clipping
        self.optimizer.step()

In [None]:
# --- Training Setup ---
# Ensure 'oracle' (Trigram HMM) from Cell 4 is initialized
# Ensure 'test_words' from Cell 6/7 is loaded
if 'oracle' not in locals() or 'test_words' not in locals():
    print("ERROR: Run previous cells to initialize 'oracle' and 'test_words'.")
else:
    NUM_EPISODES = 5000 # Start with 5000, may need 20,000+
    
    env = HangmanEnv(test_words)
    agent = DQNAgent(oracle)
    
    episode_rewards = []
    print(f"--- Starting DQN Training for {NUM_EPISODES} Episodes ---")
    
    for i_episode in range(NUM_EPISODES):
        # --- Run one episode ---
        (pattern, guessed, lives) = env.reset()
        state = get_state_tensor(pattern, guessed, lives, agent.oracle)
        total_reward = 0
        
        for t in range(MAX_WORD_LEN + 10): # Max turns
            valid_actions_mask = get_valid_actions_mask(guessed)
            action_tensor = agent.select_action(state, valid_actions_mask)
            action_char = INDEX_TO_CHAR[action_tensor.item()]
            
            (next_pattern, next_guessed, next_lives), reward, done = env.step(action_char)
            
            total_reward += reward
            
            # Get next state and mask
            next_state = get_state_tensor(next_pattern, next_guessed, next_lives, agent.oracle)
            next_valid_actions_mask = get_valid_actions_mask(next_guessed)
            
            # Store in buffer
            agent.buffer.push(state, action_tensor, reward, next_state, done, next_valid_actions_mask)
            
            state = next_state
            guessed = next_guessed
            
            agent.learn()
            
            if done:
                break
        
        episode_rewards.append(total_reward)
        
        # --- Log Progress ---
        if (i_episode + 1) % 100 == 0:
            avg_reward = np.mean(episode_rewards[-100:])
            print(f"Episode {i_episode+1}/{NUM_EPISODES} | Avg Reward (last 100): {avg_reward:.2f}")
            
        # Update the target network
        if i_episode % TARGET_UPDATE == 0:
            agent.target_net.load_state_dict(agent.policy_net.state_dict())

    print("--- üèÅ Training Complete ---")
    
    # Save the trained model
    torch.save(agent.policy_net.state_dict(), "dqn_hangman_model.pth")
    print("Model saved to dqn_hangman_model.pth")

--- Starting DQN Training for 5000 Episodes ---
Episode 100/5000 | Avg Reward (last 100): -289.30
Episode 200/5000 | Avg Reward (last 100): -285.35
Episode 300/5000 | Avg Reward (last 100): -296.90
Episode 400/5000 | Avg Reward (last 100): -294.60
Episode 500/5000 | Avg Reward (last 100): -265.75
Episode 600/5000 | Avg Reward (last 100): -258.35
Episode 700/5000 | Avg Reward (last 100): -277.05
Episode 800/5000 | Avg Reward (last 100): -247.45
Episode 900/5000 | Avg Reward (last 100): -276.60
Episode 1000/5000 | Avg Reward (last 100): -263.75
Episode 1100/5000 | Avg Reward (last 100): -277.60
Episode 1200/5000 | Avg Reward (last 100): -259.55
Episode 1300/5000 | Avg Reward (last 100): -265.70
Episode 1400/5000 | Avg Reward (last 100): -263.60
Episode 1500/5000 | Avg Reward (last 100): -238.35
Episode 1600/5000 | Avg Reward (last 100): -239.10
Episode 1700/5000 | Avg Reward (last 100): -277.40
Episode 1800/5000 | Avg Reward (last 100): -259.50
Episode 1900/5000 | Avg Reward (last 100): 

In [None]:
# --- Final Evaluation ---
# Load the trained model if needed (or just use 'agent' from previous cell)
# policy_net = DQN().to(device)
# policy_net.load_state_dict(torch.load("dqn_hangman_model.pth"))
# policy_net.eval()
# agent.policy_net = policy_net

# Use the 'agent' and 'env' from the training cell
agent.policy_net.eval() # Set to evaluation mode (disables dropout, etc.)

NUM_GAMES = 2000 # As specified in the PDF
MAX_LIVES = 6

total_games_won = 0
total_wrong_guesses = 0
total_repeated_guesses = 0

print(f"\n--- Starting Evaluation with Trained DQN Agent: {NUM_GAMES} games ---")

for i in range(NUM_GAMES):
    (pattern, guessed, lives) = env.reset()
    done = False
    
    game_wrong_guesses = 0
    game_repeated_guesses = 0
    
    while not done:
        state = get_state_tensor(pattern, guessed, lives, agent.oracle)
        valid_actions_mask = get_valid_actions_mask(guessed)
        
        # --- Pure Exploitation (No Epsilon) ---
        with torch.no_grad():
            q_values = agent.policy_net(state) # Shape [1, 26]
            
            # --- THIS LINE IS FIXED ---
            # Apply the 1D mask to the 0-th batch item
            q_values[0, ~valid_actions_mask] = -float('inf') 
            
            action_tensor = q_values.max(1)[1]
            
        action_char = INDEX_TO_CHAR[action_tensor.item()]
        
        # --- Track penalties BEFORE stepping ---
        if action_char in guessed:
            game_repeated_guesses += 1
        elif action_char not in env.secret_word:
            game_wrong_guesses += 1
        
        (next_pattern, next_guessed, next_lives), reward, done = env.step(action_char)
        pattern, guessed, lives = next_pattern, next_guessed, next_lives

    if lives > 0: # Game was won
        total_games_won += 1
        
    total_wrong_guesses += game_wrong_guesses
    total_repeated_guesses += game_repeated_guesses

print("--- Evaluation Complete ---")

# 1. Calculate Metrics
success_rate = total_games_won / NUM_GAMES
avg_wrong_guesses = total_wrong_guesses / NUM_GAMES
avg_repeated_guesses = total_repeated_guesses / NUM_GAMES

# 2. Calculate Final Score using the PDF formula
score_from_wins = success_rate * 2000 
penalty_from_wrongs = total_wrong_guesses * 5
penalty_from_repeats = total_repeated_guesses * 2

final_score = score_from_wins - penalty_from_wrongs - penalty_from_repeats

# 3. Print Results
print("\n--- üìä Final Results (DQN Agent) ---")
print(f"**Final Score:** {final_score:.2f}")
print("------------------------------")
print(f"Total Games Played:     {NUM_GAMES}")
print(f"Total Games Won:        {total_games_won} ({success_rate * 100:.2f}%)")
print("------------------------------")
print(f"Total Wrong Guesses:    {total_wrong_guesses} (Avg: {avg_wrong_guesses:.2f} per game)")
print(f"Total Repeated Guesses: {total_repeated_guesses} (Avg: {avg_repeated_guesses:.2f} per game)")
print("------------------------------")
print(f"Score from Wins:        + {score_from_wins:.2f}")
print(f"Penalty from Wrongs:    - {penalty_from_wrongs:.2f}")
print(f"Penalty from Repeats:   - {penalty_from_repeats:.2f}")


--- Starting Evaluation with Trained DQN Agent: 2000 games ---
--- Evaluation Complete ---

--- üìä Final Results (DQN Agent) ---
**Final Score:** -55763.00
------------------------------
Total Games Played:     2000
Total Games Won:        417 (20.85%)
------------------------------
Total Wrong Guesses:    11236 (Avg: 5.62 per game)
Total Repeated Guesses: 0 (Avg: 0.00 per game)
------------------------------
Score from Wins:        + 417.00
Penalty from Wrongs:    - 56180.00
Penalty from Repeats:   - 0.00
