In [2]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
import math
import copy
import itertools
import random
import string
import secrets
import time
import re
import collections


# Runtime switches for this notebook.
# True  -> load the saved weights from "hangman_model_12_09.pt" instead of training.
# False -> run the hyper-parameter search and train a new model from scratch.
model_allready_trained = True
# True -> the notebook runs in Google Colab, so Google Drive is mounted below.
run_in_googel_colab = False

if run_in_googel_colab:
    # Imported lazily: google.colab is only needed when the Colab switch is on.
    from google.colab import drive

    drive.mount('/content/drive')


# I added these two classes: MyBertModelForHangman and ApplyBert
class MyBertModelForHangman(nn.Module):
    """BERT-style transformer encoder that scores letters for every position of a word.

    The vocabulary has 29 tokens: '[PAD]' (0), '_' (1, the hidden-letter mask),
    '[UNK]' (2) and the letters 'a'..'z' (3..28).

    Args:
        model_dim_n: embedding / hidden size of the transformer.
        heads_n: number of attention heads in every layer.
        layers_n: number of transformer blocks.
        max_word_length: length every tokenized word is padded to; also the
            number of positions in the positional-encoding table.
        dropouts_rate: dropout probability used throughout the network.

    Submodule and buffer names are part of the saved ``state_dict`` layout and
    must not be renamed.
    """

    def __init__(self, model_dim_n = None, heads_n = None, layers_n = None, max_word_length = None, dropouts_rate = None):
        super().__init__()
        self.model_dim_n = model_dim_n
        self.heads_n = heads_n
        self.layers_n = layers_n
        self.max_word_length = max_word_length
        self.dropouts_rate = dropouts_rate

        # Dictionary that converts a letter (or special token) to its number.
        alphabet = 'abcdefghijklmnopqrstuvwxyz'
        self.dict_for_letter_to_number = {'[PAD]': 0, '_': 1, '[UNK]': 2}
        self.dict_for_letter_to_number.update({letter: i + 3 for i, letter in enumerate(alphabet)})

        # Layers are created in a fixed order so weight initialisation is reproducible under a seed.
        self.embedding = nn.Embedding(len(self.dict_for_letter_to_number), self.model_dim_n, padding_idx = 0)
        self.create_positional_encoding()
        self.create_transformer_layers()
        self.apply_dropout = nn.Dropout(self.dropouts_rate)
        self.forward_projection = nn.Linear(self.model_dim_n, len(self.dict_for_letter_to_number))


    def _tokenize_and_pad(self, letters):
        """Map every character to its token ('[UNK]' if unknown) and right-pad with '[PAD]' up to max_word_length."""
        unknown_token = self.dict_for_letter_to_number['[UNK]']
        tokens = [self.dict_for_letter_to_number.get(letter, unknown_token) for letter in letters]

        missing_positions = self.max_word_length - len(tokens)
        if missing_positions > 0:
            tokens.extend([self.dict_for_letter_to_number['[PAD]']] * missing_positions)

        return tokens


    def tokenize_word_from_dictionary(self, word):
        """Return the token list of a plain word, padded to max_word_length.

        Words longer than max_word_length are NOT truncated here; the result is
        then longer than max_word_length and the caller has to cut it.
        """
        return self._tokenize_and_pad(word)


    def tokenize_mask_word(self, masked_word):
        '''Return a tuple of 2 lists: the tokenized word and the positions of the masked letters.

        masked_word is the space separated game display (e.g. "a _ b"); every
        second character is a separator and is dropped before tokenizing.
        '''
        masked_word_without_spaces = masked_word[::2]  # Remove spaces

        tokenized_masked_word = self._tokenize_and_pad(masked_word_without_spaces)

        mask_token = self.dict_for_letter_to_number['_']
        ind_of_masked_letter = [index for index, token in enumerate(tokenized_masked_word) if token == mask_token]

        return tokenized_masked_word, ind_of_masked_letter


    def create_positional_encoding(self):
        """Register the fixed sinusoidal table 'positional_encoding' of shape (1, max_word_length, model_dim_n)."""
        positional_encoding_matrix = torch.zeros(self.max_word_length, self.model_dim_n)

        # Column vector of positions, e.g. [[0.], [1.], [2.], ...]
        letter_position = torch.arange(0, self.max_word_length).unsqueeze(1).float()

        # One frequency per sin/cos pair, following "Attention Is All You Need".
        the_divided_term = torch.exp(torch.arange(0, self.model_dim_n, 2).float() * - (math.log(10000) / self.model_dim_n))
        angles = letter_position * the_divided_term            # (max_word_length, ceil(model_dim_n / 2))

        positional_encoding_matrix[:, 0::2] = torch.sin(angles)
        # For an odd model_dim_n there is one odd column fewer than even columns,
        # so the last frequency has no cosine partner and must be dropped.
        positional_encoding_matrix[:, 1::2] = torch.cos(angles[:, : self.model_dim_n // 2])

        # A buffer moves with the module (.to(device), state_dict) but is never trained.
        self.register_buffer('positional_encoding', positional_encoding_matrix.unsqueeze(0))


    def create_transformer_layers(self):
        """Create layers_n transformer blocks: self-attention, two layer norms and a feed-forward network each."""

        # Self-attention: lets every position look at all other positions of the word.
        self.attention_layer = nn.ModuleList([nn.MultiheadAttention(embed_dim = self.model_dim_n, num_heads = self.heads_n, dropout = self.dropouts_rate, batch_first=True) for _ in range(self.layers_n)])

        # Normalisation applied after the attention residual connection.
        self.norm_layer_phase_1 = nn.ModuleList([nn.LayerNorm(self.model_dim_n) for _ in range(self.layers_n)])

        # Normalisation applied after the feed-forward residual connection.
        self.norm_layer_phase_2 = nn.ModuleList([nn.LayerNorm(self.model_dim_n) for _ in range(self.layers_n)])

        # Position-wise feed-forward network: expand x4, GELU, dropout, project back.
        self.feed_forward_layer = nn.ModuleList([
            nn.Sequential(
                nn.Linear(self.model_dim_n, self.model_dim_n * 4),
                nn.GELU(),
                nn.Dropout(self.dropouts_rate),
                nn.Linear(self.model_dim_n * 4, self.model_dim_n)) for _ in range(self.layers_n)])


    def apply_the_network(self, x, padding_mask = None):
        """Run the encoder on a batch of tokenized words.

        Args:
            x: integer tensor of token ids, indexed as (batch, position).
            padding_mask: optional boolean tensor of the same shape, True where
                the token is padding; forwarded to the attention layers as
                key_padding_mask.

        Returns:
            Tensor of shape (batch, position, vocabulary size) with one score
            per token for every position.

        Raises:
            ValueError: if the sequence is longer than max_word_length, because
                no positional encoding exists for the extra positions.
        """
        if x.size(1) > self.positional_encoding.size(1):
            raise ValueError(f'Sequence length {x.size(1)} is longer than max_word_length {self.positional_encoding.size(1)}.')

        x = self.embedding(x) * math.sqrt(self.model_dim_n)   # scale embeddings by sqrt(model dimension)
        x = x + self.positional_encoding[:, :x.size(1), :]
        x = self.apply_dropout(x)

        # Post-norm transformer blocks: residual connection first, then LayerNorm.
        for attention, norm_1, feed_forward, norm_2 in zip(self.attention_layer, self.norm_layer_phase_1,
                                                            self.feed_forward_layer, self.norm_layer_phase_2):
            attention_results, _ = attention(x, x, x, key_padding_mask = padding_mask)
            x = norm_1(x + self.apply_dropout(attention_results))
            feed_forward_results = feed_forward(x)
            x = norm_2(x + self.apply_dropout(feed_forward_results))

        return self.forward_projection(x)


## The model definition is finished; the next class holds the functions that train and tune the model:
class ApplyBert():
    """Training utilities for MyBertModelForHangman: data split, masking, training loop and hyper-parameter search.

    Args:
        train_dataset_path: path of a text file with one word per line.

    Attributes:
        my_bert_model: the model currently being trained (None until the
            hyper-parameter search creates one, or until a caller assigns one).
        train_words / validation_words: 80% / 20% split of the dataset.
        running_device: 'cuda' when available, otherwise 'cpu'.
    """

    def __init__(self, train_dataset_path):
        self.my_bert_model = None    # set by the hyper-parameter search, which also runs the full training
        self.train_dataset_path = train_dataset_path
        self.create_train_validation_words()
        self.running_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    def create_train_validation_words(self):
        """Read the dataset, lower-case it and split it into train (80%) and validation (20%) word lists."""

        with open(self.train_dataset_path, 'r') as file:
            stripped_lines = (line.strip().lower() for line in file)
            # Blank lines would become empty words, which cannot be masked.
            train_words = [word for word in stripped_lines if word]

        self.train_words, self.validation_words = train_test_split(train_words, test_size = 0.2)


    def convert_batch_words_to_masked_words(self, batch_words, max_mask_letter_rate, versions_per_word = 2):
        """Turn words into randomly masked training examples.

        Every word yields versions_per_word rows, each with an independent
        random mask that hides between 20% and max_mask_letter_rate of its
        letters (at least one letter). Empty words yield no rows.

        Returns:
            (batch_mask_inputs, batch_labels): two long tensors on
            running_device with one row per masked version. Labels hold the
            original token at masked positions and -100 everywhere else, so the
            loss ignores the visible letters and the padding.
        """
        model = self.my_bert_model
        max_word_length = model.max_word_length
        mask_token = model.dict_for_letter_to_number['_']

        batch_mask_inputs = []
        batch_labels = []

        for word in batch_words:
            tokenized_word = model.tokenize_word_from_dictionary(word)[: max_word_length]  # include padding

            # Only real letters that survived the truncation may be masked;
            # sampling from len(word) would index past a truncated word.
            maskable_length = min(len(word), max_word_length)
            if maskable_length == 0:
                continue

            for _ in range(versions_per_word):
                # Different random masking each time.
                masked_letters_n = max(1, int(maskable_length * random.uniform(0.2, max_mask_letter_rate)))
                inds_of_masked_positions = random.sample(range(maskable_length), masked_letters_n)

                masked_tokenized_word = tokenized_word.copy()
                tokenized_word_label_letters = [-100] * len(tokenized_word)

                for position in inds_of_masked_positions:
                    masked_tokenized_word[position] = mask_token
                    tokenized_word_label_letters[position] = tokenized_word[position]

                batch_mask_inputs.append(masked_tokenized_word)
                batch_labels.append(tokenized_word_label_letters)

        batch_mask_inputs = torch.tensor(batch_mask_inputs, dtype = torch.long).to(self.running_device)      # e.g. 4,7,1,6,9,0,0,0 (1 is the mask, 0 is padding)
        batch_labels = torch.tensor(batch_labels, dtype = torch.long).to(self.running_device)                # e.g. -100,-100,8,-100,-100,-100,-100,-100

        return batch_mask_inputs, batch_labels


    def _train_one_epoch(self, train_words, batch_size, max_mask_letter_rate, optimizer, loss_measurement):
        """Shuffle train_words in place, run one optimisation pass over it and return the average batch loss."""
        self.my_bert_model.train()
        random.shuffle(train_words)

        train_total_loss = 0
        train_batches_n = 0

        for index in range(0, len(train_words), batch_size):
            batch_words = train_words[index : index + batch_size]
            batch_mask_inputs, batch_labels = self.convert_batch_words_to_masked_words(batch_words, max_mask_letter_rate)

            # Forward pass; padding positions are excluded from the attention.
            logits = self.my_bert_model.apply_the_network(batch_mask_inputs, padding_mask = (batch_mask_inputs == 0))
            batch_loss = loss_measurement(logits.reshape(-1, logits.size(-1)), batch_labels.reshape(-1))

            # Backward pass.
            optimizer.zero_grad()
            batch_loss.backward()
            torch.nn.utils.clip_grad_norm_(self.my_bert_model.parameters(), 1.0)    # clip large gradients for stability
            optimizer.step()

            train_total_loss += batch_loss.item()
            train_batches_n += 1

        return train_total_loss / train_batches_n


    def _validate(self, validation_words, batch_size, max_mask_letter_rate, loss_measurement):
        """Return (average batch loss, accuracy in percent) on freshly masked validation words.

        A masked version counts as correct only when every hidden letter of
        that row is predicted correctly.
        """
        validation_total_loss = 0
        validation_batches_n = 0
        total_words = 0
        correct_words = 0

        self.my_bert_model.eval()
        with torch.no_grad():
            for index in range(0, len(validation_words), batch_size):
                batch_words = validation_words[index : index + batch_size]
                batch_mask_inputs, batch_labels = self.convert_batch_words_to_masked_words(batch_words, max_mask_letter_rate)

                logits = self.my_bert_model.apply_the_network(batch_mask_inputs, padding_mask = (batch_mask_inputs == 0))
                loss_validation = loss_measurement(logits.reshape(-1, logits.size(-1)), batch_labels.reshape(-1))

                validation_total_loss += loss_validation.item()
                validation_batches_n += 1

                # Score every row of the batch: there are versions_per_word rows
                # per word, so counting len(batch_words) rows would skip half of them.
                predictions = logits.argmax(dim = 2)
                masked_positions = (batch_labels != -100)
                rows_fully_correct = ((predictions == batch_labels) | ~masked_positions).all(dim = 1)
                correct_words += int(rows_fully_correct.sum().item())
                total_words += batch_labels.size(0)

        average_validation_loss = validation_total_loss / validation_batches_n
        accuracy = correct_words / total_words * 100

        return average_validation_loss, accuracy


    def bert_training(self, epochs_n, train_words, validation_words, batch_size, learning_rate, after_what_number_of_failed_epoch_to_stop, max_mask_letter_rate):
        """Train self.my_bert_model with early stopping on the validation loss.

        Every time the validation loss improves, the weights are written to
        'hangman_model_12_09.pt' (and to Google Drive when run_in_googel_colab
        is set). Training stops after after_what_number_of_failed_epoch_to_stop
        consecutive epochs without improvement.

        Returns:
            (model, accuracy): the model carrying the weights of the best
            epoch, and the validation accuracy (percent) measured in that epoch.

        Raises:
            ValueError: if train_words or validation_words is empty.
        """
        if len(train_words) == 0 or len(validation_words) == 0:
            raise ValueError('bert_training needs at least one train word and one validation word.')

        print(f'Apply train with {len(train_words)} words and validate on {len(validation_words)} words')

        # Shuffle a private copy so the caller's list keeps its order.
        train_words = list(train_words)

        # Early-stopping state.
        patience_counter = 0
        best_model_so_far = None
        best_validation_loss = float('inf')
        best_accuracy = 0.0
        accuracy = 0.0

        # NOTE: AdamW applies decoupled weight decay with PyTorch's default coefficient,
        # so this is not identical to plain Adam.
        optimizer = torch.optim.AdamW(self.my_bert_model.parameters(), lr = learning_rate)
        # Average of -log(predicted probability of the correct class) over the masked letters only.
        loss_measurement = nn.CrossEntropyLoss(ignore_index = -100)

        for epoch in range(epochs_n):
            epoch_start_time = time.time()

            average_train_loss = self._train_one_epoch(train_words, batch_size, max_mask_letter_rate, optimizer, loss_measurement)
            average_validation_loss, accuracy = self._validate(validation_words, batch_size, max_mask_letter_rate, loss_measurement)

            epoch_duration_time_in_minutes = (time.time() - epoch_start_time) / 60

            print(f'Finish epoch number {epoch + 1} in {epoch_duration_time_in_minutes:.2f} minutes, average_batches_train_loss: {average_train_loss:.2f}, average_batches_validation_loss: {average_validation_loss:.2f}, accuracy: {accuracy:.2f}%')

            if average_validation_loss < best_validation_loss:
                best_validation_loss = average_validation_loss
                best_accuracy = accuracy
                patience_counter = 0
                best_model_so_far = copy.deepcopy(self.my_bert_model.state_dict())
                torch.save(self.my_bert_model.state_dict(), 'hangman_model_12_09.pt')   # save to disk
                print('Best model saved to disk')
                if run_in_googel_colab:
                    torch.save(self.my_bert_model.state_dict(), '/content/drive/MyDrive/hangman_model_12_09.pt')
                    print("Model saved to Google Drive successfully!")
            else:
                patience_counter += 1
                if patience_counter >= after_what_number_of_failed_epoch_to_stop:
                    print(f'Early stopping in epoch {epoch +1}, load best model.')
                    break

        # Always hand back the best weights together with their own accuracy,
        # whether the loop stopped early or simply ran out of epochs.
        if best_model_so_far is not None:
            self.my_bert_model.load_state_dict(best_model_so_far)
            accuracy = best_accuracy

        return self.my_bert_model, accuracy

    def hypert_parameter_search_and_full_training(self, rate_for_hyperparameters_sample_size):
        """Grid-search the hyper-parameters on a sample, then train the best combination on the full dataset.

        Args:
            rate_for_hyperparameters_sample_size: fraction of the train and of
                the validation words used for the search (at least one word of
                each is always used).

        Returns:
            (model, accuracy) of the final full training, as returned by bert_training.
        """
        # Random subset of the dataset for the (short) search runs.
        train_words_size = min(len(self.train_words), max(1, int(len(self.train_words) * rate_for_hyperparameters_sample_size)))
        validation_words_size = min(len(self.validation_words), max(1, int(len(self.validation_words) * rate_for_hyperparameters_sample_size)))

        train_words_sample_for_hyper_parameter_search = random.sample(self.train_words, train_words_size)
        validation_words_sample_for_hyper_parameter_search = random.sample(self.validation_words, validation_words_size)

        # Length of the longest word; every word is padded to it.
        max_word_length = max(len(word) for word in self.train_words + self.validation_words)

        # Full grid that was searched originally:
        #   'model_dim_n': [128, 256], 'heads_n': [4, 8], 'layers_n': [4, 8],
        #   'dropouts_rate': [0.1, 0.3], 'batch_size': [32, 64],
        #   'learning_rate': [0.0001, 0.0002], 'max_mask_letter_rate': [0.5, 0.7]
        # Only the best performing combination is kept, to reproduce the results on a different dataset.
        paramerters_to_search = {'model_dim_n': [256], 'heads_n':  [4], 'layers_n': [4],
                                 'max_word_length': [max_word_length], 'dropouts_rate': [0.1], 'batch_size': [32],
                                 'learning_rate':  [0.0001], 'max_mask_letter_rate': [0.5]}

        parameters_and_accuracy = []

        parameters_groups = list(paramerters_to_search)
        parameters_groups_values = list(paramerters_to_search.values())

        for parameters_group in itertools.product(*parameters_groups_values):
            single_parameters_combination = dict(zip(parameters_groups, parameters_group))

            # Multi-head attention needs the model dimension to be divisible by the number of heads.
            if single_parameters_combination['model_dim_n'] % single_parameters_combination['heads_n'] != 0:
                continue

            # Fresh model for this combination, trained briefly on the sample.
            self.my_bert_model = MyBertModelForHangman(model_dim_n = single_parameters_combination['model_dim_n'], heads_n = single_parameters_combination['heads_n'],
                                                       layers_n = single_parameters_combination['layers_n'], max_word_length = single_parameters_combination['max_word_length'],
                                                       dropouts_rate = single_parameters_combination['dropouts_rate']).to(self.running_device)
            _, combination_accuracy = self.bert_training(epochs_n = 3, train_words = train_words_sample_for_hyper_parameter_search, validation_words = validation_words_sample_for_hyper_parameter_search,
                                                         batch_size = single_parameters_combination['batch_size'], learning_rate = single_parameters_combination['learning_rate'],
                                                         after_what_number_of_failed_epoch_to_stop = 2, max_mask_letter_rate = single_parameters_combination['max_mask_letter_rate'])

            parameters_and_accuracy.append({'parameters_combination': copy.deepcopy(single_parameters_combination), 'accuracy': combination_accuracy})

            print(f' parameters_combination of: {single_parameters_combination} have accuracy of: {combination_accuracy:.2f}')

        combination_with_highest_accuracy = max(parameters_and_accuracy, key = lambda x: x['accuracy'])
        print(f"The best combination parameters is {combination_with_highest_accuracy['parameters_combination']} with accuracy of {combination_with_highest_accuracy['accuracy']}.")

        # Train a fresh model with the best parameters on the whole dataset.
        combination_with_highest_accuracy = combination_with_highest_accuracy['parameters_combination']
        self.my_bert_model = MyBertModelForHangman(model_dim_n = combination_with_highest_accuracy['model_dim_n'], heads_n = combination_with_highest_accuracy['heads_n'],
                                                   layers_n = combination_with_highest_accuracy['layers_n'], max_word_length = combination_with_highest_accuracy['max_word_length'],
                                                   dropouts_rate = combination_with_highest_accuracy['dropouts_rate']).to(self.running_device)
        self.my_bert_model, accuracy = self.bert_training(epochs_n = 1000, train_words = self.train_words, validation_words = self.validation_words,
                                                     batch_size = combination_with_highest_accuracy['batch_size'], learning_rate = combination_with_highest_accuracy['learning_rate'],
                                                     after_what_number_of_failed_epoch_to_stop = 10, max_mask_letter_rate = combination_with_highest_accuracy['max_mask_letter_rate'])

        return self.my_bert_model, accuracy



class Play_Hangman(object):
    """Local hangman simulator that guesses letters with the BERT-style model.

    Creating an instance reads 'words_250000_train.txt' and then either loads
    the saved weights or trains a new model, depending on the module-level
    switch model_allready_trained.
    """

    def __init__(self):
        # Per-game state; reset_game() clears it before every game.
        self.guessed_letters = []
        self.incorrect_guesses = 0
        self.current_dictionary = []

        # The same word list feeds the simulator and the model training.
        self.train_dataset_path = 'words_250000_train.txt'
        self.full_dictionary = self.build_dictionary(self.train_dataset_path)
        self.full_dictionary_common_letter_sorted = collections.Counter("".join(self.full_dictionary)).most_common()

        self.train_and_load_my_bert_model()



    def build_dictionary(self, dictionary_file_location):
        """Return the lines of the given file as a list of words (line endings removed)."""
        # The context manager closes the file even when reading fails.
        with open(dictionary_file_location, "r") as text_file:
            return text_file.read().splitlines()

    def train_and_load_my_bert_model(self):
        """Set self.bert_instance and self.my_bert_model (in eval mode).

        With model_allready_trained set, the weights are read from
        'hangman_model_12_09.pt'; the architecture below has to match the one
        the file was saved with. Otherwise the hyper-parameter search and the
        full training are run.
        """
        # NOTE: constructing ApplyBert also reads and splits the dataset; here it mainly supplies the running device.
        self.bert_instance = ApplyBert(self.train_dataset_path)

        if model_allready_trained:
            running_device = self.bert_instance.running_device
            self.my_bert_model = MyBertModelForHangman(
                model_dim_n=256,
                heads_n=4,
                layers_n=4,
                max_word_length=29,
                dropouts_rate=0.1).to(running_device)

            # weights_only=True restricts unpickling to tensors and plain containers,
            # which is all a state_dict holds.
            saved_weights = torch.load("hangman_model_12_09.pt", map_location=running_device, weights_only=True)
            self.my_bert_model.load_state_dict(saved_weights)
            self.my_bert_model.eval()
            print(f"Model loaded successfully")

        else:
            self.my_bert_model, _ = self.bert_instance.hypert_parameter_search_and_full_training(rate_for_hyperparameters_sample_size = 0.02)
            self.my_bert_model.eval()

    def guess(self, masked_word):
        """Return the next letter to guess for the space separated display, e.g. "_ p p _ e".

        For every hidden position the model's probability of each still
        available letter is computed; the probabilities are summed over the
        hidden positions and the letter with the highest total is returned.

        Raises:
            ValueError: if there is no hidden position or no letter left to guess.
        """
        # Letters already visible in the word can never be the next guess.
        exists_letters = {letter for letter in masked_word[::2] if letter != '_'}
        unavailable_letters = exists_letters.union(self.guessed_letters)
        candidate_letters = [letter for letter in 'abcdefghijklmnopqrstuvwxyz' if letter not in unavailable_letters]

        tokenized_masked_word, ind_of_masked_letter = self.my_bert_model.tokenize_mask_word(masked_word)

        if not candidate_letters or not ind_of_masked_letter:
            raise ValueError(f"No letter can be guessed for '{masked_word}': no hidden position or no unguessed letter left.")

        tensor_of_tokenized_masked_word = torch.tensor([tokenized_masked_word], dtype=torch.long).to(self.bert_instance.running_device)

        letter_to_token = self.my_bert_model.dict_for_letter_to_number
        scores_for_masked_position = {}

        with torch.no_grad():
            padding_mask = (tensor_of_tokenized_masked_word == 0)
            logits = self.my_bert_model.apply_the_network(tensor_of_tokenized_masked_word, padding_mask=padding_mask)  # [1, max_length, 29]

            for position in ind_of_masked_letter:
                # One transfer per position instead of one .item() call per letter.
                probabilities_for_this_position = F.softmax(logits[0, position], dim = 0).tolist()
                for letter in candidate_letters:
                    score = probabilities_for_this_position[letter_to_token[letter]]
                    scores_for_masked_position[letter] = scores_for_masked_position.get(letter, 0) + score

        # Ties resolve to the alphabetically first letter (dict insertion order).
        guess_letter = max(scores_for_masked_position.items(), key = lambda x: x[1])[0]

        return guess_letter

    def reset_game(self, word):
        """Reset the game state for a new word"""
        self.guessed_letters = []
        self.incorrect_guesses = 0
        self.current_dictionary = [w for w in self.full_dictionary if len(w) == len(word)]

    def display_word_state(self, word, guessed_letters):
        """Display the current state of the word with guessed letters revealed"""
        return ' '.join([c if c in guessed_letters else '_' for c in word])


    def play_game(self, word, max_incorrect_guesses=5):
        """Play a complete hangman game with the given word.

        The game is lost once the number of incorrect guesses exceeds
        max_incorrect_guesses.

        Returns:
            True when the whole word was revealed, False when the game was lost.
        """
        self.reset_game(word)
        display_word = self.display_word_state(word, self.guessed_letters)

        print(f"Starting game with word: {display_word}")

        while '_' in display_word:
            # Check if game failed due to too many incorrect guesses
            if self.incorrect_guesses > max_incorrect_guesses:
                print(f"💀 GAME FAILED! More than {max_incorrect_guesses} incorrect guesses.")
                print(f"The word was: {word}")
                return False

            guess = self.guess(display_word)

            if guess in self.guessed_letters:
                print(f"Already guessed letter: {guess}")
                continue

            self.guessed_letters.append(guess)

            # Check if guess is incorrect and increment counter
            if guess not in word:
                self.incorrect_guesses += 1

            display_word = self.display_word_state(word, self.guessed_letters)
            print(f"Guess: {guess} -> {display_word} | Incorrect: {self.incorrect_guesses}/{max_incorrect_guesses}")

        print(f"SUCCESS! Word guessed: {word}")
        return True


    def test_multiple_games(self, num_games=5):
        """Play num_games games on random dictionary words and print a win/loss summary."""
        total_guesses = 0
        successful_games = 0
        failed_games = 0

        for i in range(num_games):
            word = random.choice(self.full_dictionary)
            print(f"\n=== Game {i+1}: Testing word '{word}' ===")

            # A crash inside one game is reported and counted as a failure so the remaining games still run.
            try:
                success = self.play_game(word)
            except Exception as e:
                print(f"Error during game: {e}")
                failed_games += 1
                continue

            if success:
                successful_games += 1
                total_guesses += len(self.guessed_letters)
            else:
                failed_games += 1

        print(f"\n=== FINAL RESULTS ===")
        print(f"Successful games: {successful_games}/{num_games}")
        print(f"Failed games: {failed_games}/{num_games}")
        if successful_games > 0:
            avg_guesses = total_guesses / successful_games
            print(f"Average guesses per successful game: {avg_guesses:.2f}")
            print(f"Accuracy of {successful_games / num_games} in {failed_games + successful_games} games")

# Usage examples:
# Creating the player reads the word list and loads (or trains) the model.
games = Play_Hangman()

# Test single random word
# NOTE: the word comes from games.full_dictionary, the same word list the model
# is trained on, so this is not an evaluation on unseen words.
print("=== Single Random Word Test ===")
random_word = random.choice(games.full_dictionary)
success = games.play_game(random_word)    # True when the word was fully revealed, False when the game was lost

# Test multiple words to see win/loss rate
print("\n=== Multiple Words Test ===")
games.test_multiple_games(5)

  self.my_bert_model.load_state_dict(torch.load("hangman_model_12_09.pt", map_location=torch.device('cpu')))


Model loaded successfully
=== Single Random Word Test ===
Starting game with word: _ _ _ _ _ _ _ _ _ _ _
Guess: e -> _ e _ _ _ _ _ _ _ _ _ | Incorrect: 0/5
Guess: i -> _ e _ _ _ _ _ _ _ _ _ | Incorrect: 1/5
Guess: a -> _ e _ _ _ _ a _ _ _ _ | Incorrect: 1/5
Guess: t -> _ e _ _ _ _ a t _ _ _ | Incorrect: 1/5
Guess: n -> _ e _ _ _ _ a t _ _ _ | Incorrect: 2/5
Guess: r -> r e _ _ r _ a t _ r _ | Incorrect: 2/5
Guess: o -> r e _ o r _ a t o r _ | Incorrect: 2/5
Guess: m -> r e _ o r m a t o r _ | Incorrect: 2/5
Guess: f -> r e f o r m a t o r _ | Incorrect: 2/5
Guess: y -> r e f o r m a t o r y | Incorrect: 2/5
SUCCESS! Word guessed: reformatory

=== Multiple Words Test ===

=== Game 1: Testing word 'deathdeep' ===
Starting game with word: _ _ _ _ _ _ _ _ _
Guess: e -> _ e _ _ _ _ e e _ | Incorrect: 0/5
Guess: r -> _ e _ _ _ _ e e _ | Incorrect: 1/5
Guess: s -> _ e _ _ _ _ e e _ | Incorrect: 2/5
Guess: t -> _ e _ t _ _ e e _ | Incorrect: 2/5
Guess: n -> _ e _ t _ _ e e _ | Incorrect: 3/5
G