#### 1. Clean list of words

In [11]:
def clean_word_list(words, min_len=2, max_len=30):
    import re

    cleaned = set()
    for w in words:
        w = w.strip().lower()
        # rule 1: alphabetic only
        if not re.fullmatch(r'[a-z]+', w):
            continue
        # rule 2: length constraints
        if not (min_len <= len(w) <= max_len):
            continue
        # rule 3: skip words of all identical letters
        if len(set(w)) == 1:
            continue
        cleaned.add(w)

    print(f"Number of words removed: {len(words) - len(cleaned):,}")
    print(f"Remaining words: {len(cleaned):,}")
    return sorted(cleaned)



file_path = "words_train.txt"
max_word_len = 30


with open(file_path, 'r') as file:
    words = file.read().split('\n')

clean_words = clean_word_list(words)

Number of words removed: 42
Remaining words: 227,259


#### 2. Prepare training data

In [12]:
import numpy as np
import torch
import string
from torch.utils.data import Dataset


## List of words
words = clean_words


## Alphabets dictionary
all_chars = list(string.ascii_lowercase)
all_chars_len = len(all_chars)
char_to_idx = {c: i for i, c in enumerate(all_chars)}
idx_to_char = {i: c for i, c in enumerate(all_chars)}


## Encoding and decoding functions
def encode_word(word, all_chars_len=26):
    encoded_word = torch.zeros((len(word), all_chars_len))

    for i, char in enumerate(word):
        if char == '_':
            continue  # Skip if masked character
        encoded_word[i, char_to_idx[char]] = 1
    return encoded_word


def decode_actual_word(encoded_word):
    row_sums = encoded_word.sum(dim=1)
    last_nonzero_idx = (row_sums != 0).nonzero(as_tuple=True)[0].max().item() + 1
    # remove padding
    trimmed = encoded_word[:last_nonzero_idx]
    char_indices = trimmed.argmax(dim=1)

    return ''.join(idx_to_char[i.item()] for i in char_indices)


def decode_masked_word(masked_word, word_len):
    trimmed = masked_word[:word_len]
    row_sums = trimmed.sum(dim=1)

    decoded_chars = []
    for i in range(trimmed.size(0)):
        if row_sums[i] == 0:
            decoded_chars.append('_')
        else:
            idx = trimmed[i].argmax().item()
            decoded_chars.append(idx_to_char[idx])
    return ''.join(decoded_chars)


## Generate training data from words
def convert_word_to_training_data(word, all_chars_len=26):
    encoded_word = encode_word(word, all_chars_len=all_chars_len)

    # create random masking, consistent per unique character
    unique_chars = sorted(set(word))
    while True:
        char_mask_map = {c: np.random.randint(0, 2) for c in unique_chars}  # 0=shown, 1=masked
        if len(set(char_mask_map.values())) != 1:
            break
    mask = np.array([char_mask_map[c] for c in word])

    # Apply masking directly to word length only
    x_input = encoded_word.clone()
    mask_tensor = torch.tensor(mask, dtype=torch.float32)
    mask_bool = mask_tensor.bool()
    x_input[mask_bool] = 0.0

    y_target = encoded_word  # same length as x_input

    return x_input, y_target, mask_tensor


## Multithreading to generate training data
def process_all_words(words, all_chars_len=26, cache_file="processed_data.pkl", force_process=False, multiplier=2):
    from multiprocessing.dummy import Pool
    import pickle
    from tqdm import tqdm

    if not force_process:
        try:
            with open(cache_file, "rb") as f:
                print(f"Loading cached processed data from {cache_file}...")
                return pickle.load(f)
        except FileNotFoundError:
            print("No cached data found — preprocessing...")

    # repeat each word `multiplier` times for additional training
    words *= multiplier

    # Worker to apply the conversion
    def worker(w):
        return convert_word_to_training_data(w, all_chars_len)

    # Parallel processing
    with Pool() as pool:
        processed_data = list(
            tqdm(pool.imap(worker, words), total=len(words), desc="Processing words")
        )

    # Remove duplicates (tuples can be used in a set directly)
    processed_data = list(set(processed_data))

    # Cache results
    with open(cache_file, "wb") as f:
        pickle.dump(processed_data, f)
        print(f"Saved preprocessed data to {cache_file}")

    return processed_data


class HangmanDataset(Dataset):
    def __init__(self, processed_data):
        self.data = processed_data
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        return self.data[idx]

In [None]:
processed_data = process_all_words(
    words, 
    multiplier=3, 
    cache_file="processed_data.pkl", 
    force_process=False, 
)

dataset = HangmanDataset(processed_data)

Processing words: 100%|██████████| 681777/681777 [01:04<00:00, 10601.60it/s]


Saved preprocessed data to processed_data.pkl


#### 3. Inspect training data

In [9]:
## Preview
num_previews = 30

for i in range(num_previews):
    actual_word = decode_actual_word(processed_data[i][1])
    masked_word = decode_masked_word(processed_data[i][0], len(actual_word))
    print(f'Actual: {actual_word} \nMasked: {masked_word} \n')

Actual: ricey 
Masked: ri_ey 

Actual: resubmitting 
Masked: _esub__tt_n_ 

Actual: ruel 
Masked: ru_l 

Actual: heterosomati 
Masked: _e_er___m___ 

Actual: orthotomous 
Masked: o_t_oto_ou_ 

Actual: electrobiological 
Masked: e_ect_o__o_og_c__ 

Actual: dispiteous 
Masked: d_sp__eo_s 

Actual: tardity 
Masked: tar__t_ 

Actual: spiritualminded 
Masked: ______u________ 

Actual: barrat 
Masked: _arra_ 

Actual: oleums 
Masked: oleum_ 

Actual: ereuthalion 
Masked: _r__thali__ 

Actual: geobiont 
Masked: _eobiont 

Actual: hirundo 
Masked: _i_u_d_ 

Actual: drawoff 
Masked: _r__o__ 

Actual: tallowberries 
Masked: ta__o_b___i__ 

Actual: soleless 
Masked: __l_l___ 

Actual: isouric 
Masked: i_ou_ic 

Actual: kunmiut 
Masked: kun_iut 

Actual: wincopipe 
Masked: _in___i_e 

Actual: prooestrous 
Masked: pr__es_r__s 

Actual: blazing 
Masked: ___zi__ 

Actual: lunations 
Masked: lu__t____ 

Actual: superappreciation 
Masked: su___a____cia_io_ 

Actual: culicine 
Masked: c_l_c_n_ 

Actual:

In [10]:
## Validate quality of encoded training data

idx = 3

print(decode_actual_word(dataset[idx][1]))

# x_input
print(dataset[idx][0])

# y_input
print(dataset[idx][1])

# mask
print(dataset[idx][2])

heterosomati
tensor([[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1.,
         0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0.],
   

#### 4. Split data into training and validation sets

In [11]:
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
from torch.utils.data import random_split, DataLoader


def collate_fn_dynamic_padding(batch):
    xs, ys, masks = zip(*batch)
    lengths = [x.shape[0] for x in xs]
    lengths = torch.tensor(lengths)

    xs = pad_sequence(xs, batch_first=True) # [seq_len, feature_dim]
    ys = pad_sequence(ys, batch_first=True)
    masks = pad_sequence(masks, batch_first=True)

    lengths, perm_idx = lengths.sort(0, descending=True)    # sort by descending length for pack_padded_sequence
    xs, ys, masks = xs[perm_idx], ys[perm_idx], masks[perm_idx]
    return xs, ys, masks, lengths


dataset_size = len(dataset)
val_size = int(0.1 * dataset_size)
train_size = dataset_size - val_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=256, collate_fn=collate_fn_dynamic_padding, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=256, collate_fn=collate_fn_dynamic_padding, shuffle=False)

#### 5. Train model

In [None]:
import torch
import torch.nn as nn
from tqdm import tqdm

from nnModels import CustomNN


## Define model, optimizer, loss, learning rate
model_rnn = CustomNN(chars_len=26, embed_dim=16, hidden_dim=128, num_layers=2, dropout=0.2)
optimizer = torch.optim.Adam(model_rnn.parameters(), lr=1e-3)
criterion = torch.nn.BCEWithLogitsLoss()
# reduces LR by factor of 0.5 if val loss stagnant for 2 epochs
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2)


## Train
num_epochs = 10
for epoch in range(num_epochs):
    model_rnn.train()
    total_train_loss = 0

    for x, y, mask, lengths in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} [Train]"):
        logits = model_rnn(x, lengths)
        loss = criterion(logits[mask == 1], y[mask == 1])

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_train_loss += loss.item()

    avg_train_loss = total_train_loss / len(train_loader)

    # compute validation loss
    model_rnn.eval()
    total_val_loss = 0
    with torch.no_grad():
        for x, y, mask, lengths in val_loader:
            logits = model_rnn(x, lengths)
            val_loss = criterion(logits[mask == 1], y[mask == 1])
            total_val_loss += val_loss.item()

    avg_val_loss = total_val_loss / len(val_loader)
    scheduler.step(avg_val_loss)

    print(f"Epoch {epoch+1}: Train Loss = {avg_train_loss:.4f}, Val Loss = {avg_val_loss:.4f}")


## save model
if False:
    torch.save(model_rnn.state_dict(), "model_rnn.pth")

Epoch 1/10 [Train]: 100%|██████████| 2397/2397 [04:51<00:00,  8.21it/s]


Epoch 1: Train Loss = 0.1241, Val Loss = 0.1129


Epoch 2/10 [Train]: 100%|██████████| 2397/2397 [05:22<00:00,  7.44it/s]


Epoch 2: Train Loss = 0.1113, Val Loss = 0.1084


Epoch 3/10 [Train]: 100%|██████████| 2397/2397 [04:48<00:00,  8.30it/s]


Epoch 3: Train Loss = 0.1079, Val Loss = 0.1059


Epoch 4/10 [Train]: 100%|██████████| 2397/2397 [04:44<00:00,  8.43it/s]


Epoch 4: Train Loss = 0.1060, Val Loss = 0.1047


Epoch 5/10 [Train]: 100%|██████████| 2397/2397 [04:44<00:00,  8.42it/s]


Epoch 5: Train Loss = 0.1047, Val Loss = 0.1036


Epoch 6/10 [Train]: 100%|██████████| 2397/2397 [04:49<00:00,  8.28it/s]


Epoch 6: Train Loss = 0.1038, Val Loss = 0.1030


Epoch 7/10 [Train]: 100%|██████████| 2397/2397 [04:48<00:00,  8.31it/s]


Epoch 7: Train Loss = 0.1031, Val Loss = 0.1025


Epoch 8/10 [Train]: 100%|██████████| 2397/2397 [04:46<00:00,  8.37it/s]


Epoch 8: Train Loss = 0.1024, Val Loss = 0.1021


Epoch 9/10 [Train]: 100%|██████████| 2397/2397 [04:46<00:00,  8.37it/s]


Epoch 9: Train Loss = 0.1019, Val Loss = 0.1016


Epoch 10/10 [Train]: 100%|██████████| 2397/2397 [04:38<00:00,  8.59it/s]


Epoch 10: Train Loss = 0.1015, Val Loss = 0.1013


In [None]:
## Load model

# import torch
# from nnModels import CustomNN

# model_rnn = CustomNN(chars_len=26, embed_dim=16, hidden_dim=128, num_layers=2, dropout=0.2)
# model_rnn.load_state_dict(torch.load("model_rnn.pth", map_location=torch.device("cpu")))
# model_rnn.eval()

#### 6. Test model

In [None]:
## Choose inference methodology

from Hangman import HangmanRNN


game_test = HangmanRNN(timeout=2000, model=model_rnn)

1.59 MB


In [None]:
## Example 1
masked_word = "a p p _ e"
game_test.guessed_letters = ['a', 'p', 'e', 'b', 'c', 'd', 'h']

char, pos, probs = game_test.guess(masked_word, return_probs=True).values()

if len(set(masked_word)) == 1:
    print(f"Next guess: '{char}'")
else:
    print(f"Next guess: '{char}' at position {pos[0]}")
    print("Probability matrix (masked positions only):")
    masked_positions = [idx for idx, char in enumerate(masked_word.replace(" ", "")) if char == '_']
    for i in masked_positions:
        probs_dict = {idx_to_char[j]: np.round(float(probs[i, j]), 4) for j in range(all_chars_len) if probs[i,j]>0}
        print(f"Position {i}: {probs_dict}")
        print(sorted(probs_dict, key=probs_dict.get, reverse=True))

Next guess: 'l' at position 3
Probability matrix (masked positions only):
Position 3: {'f': 0.0001, 'g': 0.0011, 'i': 0.1913, 'j': 0.0003, 'k': 0.0002, 'l': 0.6618, 'm': 0.0004, 'n': 0.0014, 'o': 0.0239, 'q': 0.0, 'r': 0.0503, 's': 0.0298, 't': 0.0094, 'u': 0.0243, 'v': 0.0004, 'w': 0.0001, 'x': 0.0, 'y': 0.0051, 'z': 0.0001}
['l', 'i', 'r', 's', 'u', 'o', 't', 'y', 'n', 'g', 'm', 'v', 'j', 'k', 'f', 'w', 'z', 'q', 'x']


In [44]:
## Example 2
masked_word = '_ _ _ _'
game_test.guessed_letters = []

char, pos, probs = game_test.guess(masked_word, return_probs=True).values()

if len(set(masked_word.replace(" ", ""))) == 1:
    print(f"Next guess: '{char}'")
else:
    print(f"Next guess: '{char}' at position {pos[0]}")
    print("Probability matrix (masked positions only):")
    masked_positions = [idx for idx, char in enumerate(masked_word.replace(" ", "")) if char == '_']
    for i in masked_positions:
        probs_dict = {idx_to_char[j]: np.round(float(probs[i, j]), 4) for j in range(all_chars_len) if probs[i,j]>0}
        print(f"Position {i}: {probs_dict}")
        print(sorted(probs_dict, key=probs_dict.get, reverse=True))

Next guess: 'e'
