# The Hangman Game
## Sayash Raaj
### IIT Madras

## Instruction:
For this project, your mission is to write an algorithm that plays the game of Hangman through our API server.

When a user plays Hangman, the server first selects a secret word at random from a list. The server then returns a row of space-separated underscores, one for each letter in the secret word, and asks the user to guess a letter. If the user guesses a letter that is in the word, the word is redisplayed with all instances of that letter shown in the correct positions, along with any letters correctly guessed on previous turns. If the letter does not appear in the word, the user is charged with an incorrect guess. The user keeps guessing letters until either (1) the user has correctly guessed all the letters in the word or (2) the user has made six incorrect guesses.
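
The rules above can be reproduced offline for quick experiments. The following is a minimal sketch, not the server's implementation: `play_hangman` and `alphabetical_guesser` are hypothetical names, and counting a repeated guess as incorrect is an assumption that the rules do not spell out.

```python
def play_hangman(secret, guess_fn, max_wrong=6):
    """Play one local game; return True if every letter is revealed in time."""
    guessed = []
    wrong = 0
    while wrong < max_wrong:
        # Server-style display: space-separated, '_' for letters not yet found.
        masked = " ".join(ch if ch in guessed else "_" for ch in secret)
        if "_" not in masked:
            return True
        letter = guess_fn(masked, guessed)
        # Assumption: a repeated guess costs a try, just like a wrong letter.
        if letter in guessed or letter not in secret:
            wrong += 1
        if letter not in guessed:
            guessed.append(letter)
    return False


def alphabetical_guesser(masked, guessed):
    """Toy strategy used only for the demo: try letters in a..z order."""
    for letter in "abcdefghijklmnopqrstuvwxyz":
        if letter not in guessed:
            return letter


print(play_hangman("face", alphabetical_guesser))
print(play_hangman("zzz", alphabetical_guesser))
```

Any function with the same `(masked, guessed)` signature can be dropped in for `alphabetical_guesser` to compare strategies without spending practice games.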

You are required to write a "guess" function that takes the current word (with underscores) as input and returns a guess letter. You will use the API code below to play 1,000 Hangman games. You may practice before you start recording your game results.

Your algorithm is permitted to use a training set of approximately 250,000 dictionary words. Your algorithm will be tested on an entirely disjoint set of 250,000 dictionary words. Please note that this means the words that you will ultimately be tested on do NOT appear in the dictionary that you are given. You are not permitted to use any dictionary other than the training dictionary we provided. This requirement will be strictly enforced by code review.

You are provided with a basic, working algorithm. This algorithm matches the provided masked string (e.g. a _ _ l e) against all possible words in the dictionary, tabulates the frequency of the letters appearing in these possible words, and then guesses the letter with the highest frequency of appearance that has not already been guessed. If no remaining words match, it falls back to the character frequency distribution of the entire dictionary.
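
The benchmark described above might be sketched as follows. This is an illustration written from the description, not the provided code: `baseline_guess` is a hypothetical name, and counting every occurrence of a letter (rather than one per word) is an assumption.

```python
import collections
import re


def baseline_guess(masked_word, dictionary, guessed_letters):
    """Return the most frequent unguessed letter among matching words."""
    # "a _ _ l e" -> regex "a..le": each underscore matches any one character.
    pattern = masked_word.replace(" ", "").replace("_", ".")
    candidates = [w for w in dictionary if re.fullmatch(pattern, w)]

    # Try the matching words first, then fall back to the whole dictionary.
    for pool in (candidates, dictionary):
        counts = collections.Counter("".join(pool))
        for letter, _ in counts.most_common():
            if letter not in guessed_letters:
                return letter
    return None


words = ["apple", "ample", "angle", "maple"]
print(baseline_guess("a _ _ l e", words, ["a", "l", "e"]))
```

Because the test words are disjoint from the training dictionary, the candidate list is often empty late in a game, which is one plausible reason the benchmark's success rate stays low.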

This benchmark strategy is successful approximately 18% of the time. Your task is to design an algorithm that significantly outperforms this benchmark.

In [3]:
import collections
import random
import string
import time
import secrets
import re
import json
import requests
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
import keras  # the training cell calls keras.optimizers.Adam and keras.models.load_model
from keras.models import Sequential
from keras.layers import Dense, Dropout, LSTM, Bidirectional, Embedding
from keras.utils import to_categorical
from keras.preprocessing.sequence import pad_sequences
from keras import callbacks, saving
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
np.set_printoptions(threshold=np.inf)
try:
    from urllib.parse import urlparse, parse_qs, urlencode
except ImportError:
    from urlparse import urlparse, parse_qs
    from urllib import urlencode

### Hyperparameter Tuning: RandomSearch

In [6]:
import keras_tuner as kt
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, Bidirectional, LSTM, Dropout, Dense
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split

class HangmanHyperparameterTuning:
    
    def __init__(self, full_dictionary, test_size=0.10):
        self.full_dictionary = full_dictionary
        self.max_length = 0
        self.test_size=test_size
        self.train_x, self.test_x, self.train_y, self.test_y = self.prepare_data()
    
    def prepare_data(self):
        encoded_dict = [[27 if char == '_' else ord(char) - ord('a') + 1 for char in word] for word in self.full_dictionary]
        X = []
        y = []

        # Process each encoded word
        for word_encoding in encoded_dict:
            char_positions = {}
            unique_chars = set(word_encoding)

            # Group character positions
            for char in unique_chars:
                char_positions[char] = [i for i, c in enumerate(word_encoding) if c == char]

            # Create masked words and append to X and y
            for char, positions in char_positions.items():
                masked_word = word_encoding[:]
                for pos in positions:
                    masked_word[pos] = 27
                X.append(masked_word)
                y.append(char - 1)  # labels are 0..25 for 'a'..'z'

        # The longest word sets the padded sequence length
        self.max_length = max(map(len, X), default=0)

        # Pad sequences for consistent length
        X = pad_sequences(X, maxlen=self.max_length, padding='post')

        # Convert y to categorical labels
        y = to_categorical(y, num_classes=26)

        # Split the data into training and testing sets
        return train_test_split(X, y, test_size=self.test_size, random_state=42)
    
    def build_model(self, hp):
        model=Sequential()
        
        embedding_dim = hp.Int('embedding_dim', min_value=64, max_value=256, step=64)
        model.add(Embedding(28,embedding_dim, input_length=self.max_length))
        
        num_layers = hp.Int('lstm_layers', 2, 5)
        for i in range(num_layers):
            units = hp.Int(f'lstm_units_{i}', min_value=64, max_value=256, step=64)
            # Every layer except the last must return the full sequence for the next LSTM
            return_sequences = i < num_layers - 1
            model.add(Bidirectional(LSTM(units, return_sequences=return_sequences)))
            
        model.add(Dropout(hp.Float('dropout_rate', 0.2, 0.5, step=0.1)))
        
        model.add(Dense(26, activation='softmax'))
        
        learning_rate = hp.Float('learning_rate', min_value=1e-4, max_value=1e-2, sampling='log')
        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate), loss='categorical_crossentropy', metrics=['accuracy', 'top_k_categorical_accuracy'])
        return model
    
    def tune_model(self):
        tuner = kt.RandomSearch(
            self.build_model,
            objective='val_accuracy',
            max_trials=5,
            executions_per_trial=1,
            directory='hyperparam_tuning_random',
            project_name='hangman_random_search'
        )
        
        stop_early = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3)
        tuner.search(self.train_x,self.train_y,epochs=2,validation_data=(self.test_x,self.test_y), callbacks=[stop_early])
        
        best_hps = tuner.get_best_hyperparameters()[0]
        return best_hps
    
with open("words.txt") as text_file:
    full_dictionary = text_file.read().splitlines()
hangman_tuner = HangmanHyperparameterTuning(full_dictionary)
best_hps = hangman_tuner.tune_model()

print(best_hps.values)




Search: Running Trial #1

Value             |Best Value So Far |Hyperparameter
256               |256               |embedding_dim
2                 |2                 |lstm_layers
64                |64                |lstm_units_0
256               |256               |lstm_units_1
0.3               |0.3               |dropout_rate
0.000388          |0.000388          |learning_rate


Epoch 1/2



KeyboardInterrupt



#### Hyperparameters found:
embedding_dim: 128,<br>
lstm_layers: 2,<br>
lstm_units_0: 256,<br>
lstm_units_1: 256,<br>
dropout_rate: 0.2,<br>
learning_rate: 0.0006029308510873298,<br>
lstm_units_2: 256
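
The tuner is trained on masked-word samples built by `prepare_data`. A dependency-free sketch of that construction is shown below; `encode` and `masked_samples` are hypothetical names introduced for illustration. Each sample hides every occurrence of exactly one letter (code 27) and uses that letter's index 0..25 as the label.

```python
def encode(word):
    """Map 'a'..'z' to 1..26 and '_' to 27, as the notebook's encoder does."""
    return [27 if ch == "_" else ord(ch) - ord("a") + 1 for ch in word]


def masked_samples(word):
    """Yield one (masked_codes, label) pair per distinct letter of the word."""
    codes = encode(word)
    samples = []
    for code in sorted(set(codes)):
        # Hide all positions of this letter, leave the others visible.
        masked = [27 if c == code else c for c in codes]
        samples.append((masked, code - 1))
    return samples


for masked, label in masked_samples("aba"):
    print(masked, label)
```

Note that every training sample has only one letter hidden, whereas an early game state has most letters hidden. Generating samples with several letters masked at once may be worth trying, although this notebook does not do so.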

### Training the model, Guessing Strategy
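
The guessing step used in this section can be followed without TensorFlow. The sketch below mirrors the three stages of `guess`: stripping the spaces from the server string, zero-padding on the right, and choosing the most probable letter that has not been tried yet. The helper names (`encode_server_word`, `pad_post`, `pick_letter`) are hypothetical, and the probability vector is hand-made rather than a model output.

```python
def encode_server_word(word):
    """'a _ _ l e' -> [1, 27, 27, 12, 5]; every second character is a space."""
    letters = word[::2]
    return [27 if ch == "_" else ord(ch) - ord("a") + 1 for ch in letters]


def pad_post(codes, max_length):
    """Right-pad with zeros; assumes len(codes) <= max_length."""
    return codes + [0] * (max_length - len(codes))


def pick_letter(probabilities, guessed_letters):
    """Return the highest-probability letter not guessed yet, or None."""
    order = sorted(range(len(probabilities)),
                   key=lambda i: probabilities[i], reverse=True)
    for i in order:
        letter = chr(i + ord("a"))
        if letter not in guessed_letters:
            return letter
    return None


probs = [0.0] * 26
probs[4], probs[0], probs[19] = 0.5, 0.3, 0.2  # toy scores for e, a, t
print(pad_post(encode_server_word("a _ _ l e"), 8))
print(pick_letter(probs, ["e"]))
```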

In [7]:
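hangman_url = "https://trexsim.com"

class HangmanAPIError(Exception):
    """Raised by HangmanAPI.request when the server answers with a 4xx status."""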

class HangmanAPI(object):
    def __init__(self, access_token=None, session=None, timeout=None, batch_size=1000, epochs=25, test_size=0.10, already_trained=True):
        self.access_token = access_token
        self.session = session or requests.Session()
        self.timeout = timeout
        self.hangman_url = self.determine_hangman_url()
        self.test_size = test_size
        self.batch_size = batch_size
        self.epochs = epochs
        self.guessed_letters = []
        self.full_dictionary = self.build_dictionary("words.txt")
        self.max_length = 0
        self.already_trained = already_trained
        self.build_and_train_model()  # three-layer bidirectional LSTM

    @staticmethod
    def determine_hangman_url():
        links = ['https://trexsim.com', 'https://sg.trexsim.com']
        data = {}

        for link in links:
            for _ in range(10):
                s = time.time()
                requests.get(link)
                data[link] = time.time() - s

        link = min(data, key=data.get)
        return link + '/trexsim/hangman'

    def guess(self, word):
        filtered_word = word[::2]
        encoded_word = [27 if char == '_' else ord(char) - ord('a') + 1 for char in filtered_word]
        padded_sequence = pad_sequences([encoded_word], maxlen=self.max_length, padding='post')
        prediction = self.model.predict(padded_sequence)
        letter_order = [chr(i + ord('a')) for i in sorted(range(len(prediction[0])), key=lambda i: prediction[0][i], reverse=True)]

        for letter in letter_order:
            if letter not in self.guessed_letters:
                return letter

    def process_dictionary(self):
        word_data = [[27 if char == '_' else ord(char) - ord('a') + 1 for char in word] for word in self.full_dictionary]

        X, y = [], []

        for word_code in word_data:
            letter_positions = {char: [] for char in set(word_code)}

            for idx, char in enumerate(word_code):
                letter_positions[char].append(idx)

            for char, positions in letter_positions.items():
                masked_word = word_code[:]

                for pos in positions:
                    masked_word[pos] = 27

                target = char - 1
                X.append(masked_word + [0, 0])
                y.append(target)

        self.max_length = max(map(len, X))
        X = pad_sequences(X, maxlen=self.max_length, padding='post')
        y = to_categorical(y, num_classes=26)

        return train_test_split(X, y, test_size=self.test_size, random_state=42)

    def build_and_train_model(self):
        train_x, test_x, train_y, test_y = self.process_dictionary()

        self.model = Sequential([
            Embedding(input_dim=28, output_dim=128, input_length=self.max_length),
            Bidirectional(LSTM(256, return_sequences=True)),
            Bidirectional(LSTM(256, return_sequences=True)),
            Bidirectional(LSTM(256)),
            Dropout(0.2),
            Dense(26, activation='softmax')
        ])

        custom_optimizer = keras.optimizers.Adam(learning_rate=0.0006029308510873298)
        self.model.compile(optimizer=custom_optimizer, loss="categorical_crossentropy", metrics=['top_k_categorical_accuracy', 'accuracy'])

        checkpoint_filepath = 'checkpoint.model.keras'
        model_checkpoint = callbacks.ModelCheckpoint(filepath=checkpoint_filepath, monitor='val_accuracy', mode='max', save_best_only=True)

        if not self.already_trained:
            self.model.fit(train_x, train_y, batch_size=self.batch_size, epochs=self.epochs, validation_data=(test_x, test_y), callbacks=[model_checkpoint])
            self.model.save('model.keras')
        else:
            self.model = keras.models.load_model('model.keras')

    def build_dictionary(self, dictionary_file_location):
        with open(dictionary_file_location, "r") as text_file:
            return text_file.read().splitlines()

    def start_game(self, practice=True, verbose=True):
        self.guessed_letters = []

        response = self.request("/new_game", {"practice": practice})
        if response.get('status') == "approved":
            game_id = response.get('game_id')
            word = response.get('word')
            tries_remains = response.get('tries_remains')
            if verbose:
                print(f"Successfully start a new game! Game ID: {game_id}. # of tries remaining: {tries_remains}. Word: {word}.")
            while tries_remains > 0:
                guess_letter = self.guess(word)
                self.guessed_letters.append(guess_letter)
                if verbose:
                    print(f"Guessing letter: {guess_letter}")

                try:
                    res = self.request("/guess_letter", {"request": "guess_letter", "game_id": game_id, "letter": guess_letter})
                except HangmanAPIError:
                    print('HangmanAPIError exception caught on request.')
                    continue
                except Exception as e:
                    print('Other exception caught on request.')
                    raise e

                if verbose:
                    print(f"Server response: {res}")
                status = res.get('status')
                tries_remains = res.get('tries_remains')
                if status == "success":
                    if verbose:
                        print(f"Successfully finished game: {game_id}")
                    return True
                elif status == "failed":
                    reason = res.get('reason', '# of tries exceeded!')
                    if verbose:
                        print(f"Failed game: {game_id}. Because of: {reason}")
                    return False
                elif status == "ongoing":
                    word = res.get('word')
        else:
            if verbose:
                print("Failed to start a new game")
        # Reached only when the game never started or the tries ran out
        return False

    def my_status(self):
        return self.request("/my_status", {})

    def request(self, path, args=None, post_args=None, method=None):
        if args is None:
            args = {}
        if post_args is not None:
            method = "POST"

        if self.access_token:
            if post_args and "access_token" not in post_args:
                post_args["access_token"] = self.access_token
            elif "access_token" not in args:
                args["access_token"] = self.access_token

        time.sleep(0.2)

        num_retry, time_sleep = 50, 2
        response = None
        for _ in range(num_retry):
            try:
                response = self.session.request(
                    method or "GET",
                    self.hangman_url + path,
                    timeout=self.timeout,
                    params=args,
                    data=post_args)
                break
            except Exception as e:
                print(f"Exception during request: {str(e)}")
                time.sleep(time_sleep)

        # Every attempt raised, so there is nothing to inspect
        if response is None:
            raise ConnectionError(f"No response from {self.hangman_url + path} after {num_retry} attempts")

        if response.status_code == 200:
            content_type = response.headers.get("content-type", "")
            if content_type.startswith("text"):
                return response.text
            if content_type.startswith("application/json"):
                return json.loads(response.content.decode("utf-8"))
            return response.content
        elif 400 <= response.status_code < 500:
            raise HangmanAPIError(response)
        return None

# API Usage Examples

## To start a new game:
1. Make sure you have implemented your own "guess" method.
2. Use the access_token that we sent you to create your HangmanAPI object.
3. Start a game by calling the "start_game" method.
4. If you wish to test your function without being recorded, set the "practice" parameter to 1.
5. Note: You have a rate limit of 20 new games per minute. DO NOT start more than 20 new games within one minute.
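
One way to stay under the limit of 20 new games per minute is a small sliding-window throttle called before each `start_game`. This is a sketch under stated assumptions: `GameRateLimiter` is a hypothetical helper, not part of the provided API, and the server's exact windowing rule is not documented here, so a sliding 60-second window is assumed.

```python
import time
from collections import deque


class GameRateLimiter:
    """Block until starting another game keeps us within the allowed rate."""

    def __init__(self, max_games=20, per_seconds=60.0,
                 clock=time.monotonic, sleep=time.sleep):
        self.max_games = max_games
        self.per_seconds = per_seconds
        self.clock = clock      # injectable so the logic can be tested
        self.sleep = sleep
        self.starts = deque()   # start times inside the current window

    def wait(self):
        now = self.clock()
        # Forget starts that have left the window.
        while self.starts and now - self.starts[0] >= self.per_seconds:
            self.starts.popleft()
        if len(self.starts) >= self.max_games:
            # Sleep until the oldest start falls out of the window.
            self.sleep(self.per_seconds - (now - self.starts[0]))
            now = self.clock()
            self.starts.popleft()
        self.starts.append(now)


limiter = GameRateLimiter()
limiter.wait()  # returns immediately for the first 20 calls in a minute
```

In a game loop, `limiter.wait()` would be called once before each new game.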

In [None]:
api = HangmanAPI(access_token="1db5838f288d84cd4dec2db9be95ab", timeout = 2000, batch_size = 3000, epochs = 5, test_size = 0.1, already_trained=False)

Epoch 1/5
  2/505 ━━━━━━━━━━━━━━━━━━━━ 4:29:39 32s/step - accuracy: 0.0402 - loss: 3.2532 - top_k_categorical_accuracy: 0.1777

## Playing practice games:
You can use the command below to play up to 100,000 practice games.

In [None]:
api.start_game(practice=1,verbose=True)
[total_practice_runs,total_recorded_runs,total_recorded_successes,total_practice_successes] = api.my_status() # Get my game stats: (# of tries, # of wins)
practice_success_rate = total_practice_successes / total_practice_runs
print('run %d practice games out of an allotted 100,000. practice success rate so far = %.3f' % (total_practice_runs, practice_success_rate))

## Playing recorded games:
Please finalize your code before running the cell below. Once this code has executed successfully, your submission is finalized and our system will not allow you to run any additional games.

After you successfully run this block of code, subsequent runs are expected to fail with the error message "Your account has been deactivated".

Once you've run this section of the code your submission is complete. Please send us your source code via email.

In [None]:
for i in range(1000):
    print('Playing ', i, ' th game')
    # Uncomment the following line to execute your final runs. Do not do this until you are satisfied with your submission
    #api.start_game(practice=0,verbose=False)
    
    # DO NOT REMOVE as otherwise the server may lock you out for too high frequency of requests
    time.sleep(0.5)

## To check your game statistics
1. Simply use the "my_status" method.
2. It returns four counts: practice runs, recorded runs, recorded successes, and practice successes.
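
Dividing successes by runs fails with `ZeroDivisionError` before any game of that kind has been played. A guarded version might look like the sketch below, where `success_rates` is a hypothetical helper and the field order is taken from the unpacking used in this notebook. The sample list contains made-up numbers, not real results.

```python
def success_rates(status):
    """Turn the four my_status counts into rates; None when no games exist."""
    practice_runs, recorded_runs, recorded_wins, practice_wins = status

    def rate(wins, runs):
        return wins / runs if runs else None

    return {
        "practice": rate(practice_wins, practice_runs),
        "recorded": rate(recorded_wins, recorded_runs),
    }


# Illustrative counts only: 200 practice games, no recorded games yet.
print(success_rates([200, 0, 0, 90]))
```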

In [2]:
api.my_status()

In [17]:
[total_practice_runs,total_recorded_runs,total_recorded_successes,total_practice_successes] = api.my_status() # Get my game stats: (# of tries, # of wins)
success_rate = total_recorded_successes/total_recorded_runs
print('overall success rate = %.3f' % success_rate)

overall success rate = 0.607


### Overall Success Rate: 60.7%