# Trexquant Interview Project (The Hangman Game)

* Copyright Trexquant Investment LP. All Rights Reserved. 
* Redistribution of this question without written consent from Trexquant is prohibited

## Instruction:
For this coding test, your mission is to write an algorithm that plays the game of Hangman through our API server. 

When a user plays Hangman, the server first selects a secret word at random from a list. The server then returns a row of space-separated underscores, one for each letter in the secret word, and asks the user to guess a letter. If the user guesses a letter that is in the word, the word is redisplayed with all instances of that letter shown in the correct positions, along with any letters correctly guessed on previous turns. If the letter does not appear in the word, the user is charged with an incorrect guess. The user keeps guessing letters until either (1) the user has correctly guessed all the letters in the word or (2) the user has made six incorrect guesses.
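
The rules above can be reproduced offline for quick experiments without touching the server. The following is a minimal sketch, not the server's implementation: `play_local_game` and `naive_guess` are hypothetical names, and the choice to count a repeated guess as incorrect is an assumption made here only to guarantee that the loop terminates.

```python
def play_local_game(secret, guess_fn, max_wrong=6):
    """Play one offline game; return True if the whole word gets revealed."""
    guessed = []
    wrong = 0
    while wrong < max_wrong:
        # Mimic the server's display: space-separated, '_' for hidden letters.
        masked = " ".join(ch if ch in guessed else "_" for ch in secret)
        if "_" not in masked:
            return True
        letter = guess_fn(masked, guessed)
        # Assumption: a repeated guess is charged like an incorrect one.
        if letter in guessed or letter not in secret:
            wrong += 1
        if letter not in guessed:
            guessed.append(letter)
    return False


def naive_guess(masked, guessed):
    """Toy strategy: walk a fixed letter order, skipping letters already tried."""
    for letter in "etaoinshrdlucmfwypvbgkjqxz":
        if letter not in guessed:
            return letter
    return "e"  # unreachable in practice: the game ends long before 26 guesses


print(play_local_game("tea", naive_guess))   # three correct guesses in a row
print(play_local_game("jazz", naive_guess))  # six misses before the word is revealed
```

Any function with the same `(masked, guessed)` signature can be dropped in for `naive_guess`, which makes it cheap to compare strategies before spending practice games.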

You are required to write a "guess" function that takes the current word (with underscores) as input and returns a letter to guess. You will use the API code below to play 1,000 Hangman games. You can practice before you start recording your game results.
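
As a small illustration of the input side of that interface, the sketch below parses the server's space-separated display into pieces a guessing strategy typically needs. `parse_masked_word` is a hypothetical helper, not part of the provided API code.

```python
def parse_masked_word(word):
    """Split the server's display string into a compact pattern and basic facts."""
    clean = word.replace(" ", "")          # "_ p p _ e " -> "_pp_e"
    hidden = [i for i, ch in enumerate(clean) if ch == "_"]
    revealed = {ch for ch in clean if ch != "_"}
    return clean, hidden, revealed


clean, hidden, revealed = parse_masked_word("_ p p _ e ")
print(clean, hidden, sorted(revealed))
```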

Your algorithm is permitted to use a training set of approximately 250,000 dictionary words. Your algorithm will be tested on an entirely disjoint set of 250,000 dictionary words. Please note that this means the words that you will ultimately be tested on do NOT appear in the dictionary that you are given. You are not permitted to use any dictionary other than the training dictionary we provided. This requirement will be strictly enforced by code review.
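
Because the test words never appear in the training dictionary, scoring a strategy on words it was built from would likely overstate its success rate. One way to approximate the real setting is to hold out part of the training dictionary and evaluate only on that part. The sketch below assumes a hypothetical helper `split_dictionary`; the 10% fraction and the fixed seed are arbitrary choices, not requirements of the task.

```python
import random


def split_dictionary(words, holdout_fraction=0.1, seed=0):
    """Return (train_words, holdout_words) with no word in both lists."""
    unique_words = sorted(set(words))      # de-duplicate, fix a stable order
    rng = random.Random(seed)              # local RNG keeps the split reproducible
    rng.shuffle(unique_words)
    n_holdout = int(len(unique_words) * holdout_fraction)
    return unique_words[n_holdout:], unique_words[:n_holdout]


sample = ["apple", "angle", "maple", "ample", "eagle",
          "table", "cable", "fable", "gable", "sable"]
train_words, holdout_words = split_dictionary(sample, holdout_fraction=0.2)
print(len(train_words), len(holdout_words))
```

Statistics such as letter frequencies or n-gram counts would then be built from `train_words` only, with games simulated on `holdout_words`.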

You are provided with a basic, working algorithm. This algorithm matches the provided masked string (e.g. a _ _ l e) against all possible words in the dictionary, tabulates the frequency of letters appearing in these possible words, and then guesses the letter with the highest frequency of appearance that has not already been guessed. If no remaining words match, it falls back to the character frequency distribution of the entire dictionary.
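
The benchmark described above can be sketched in a few lines. This is a reconstruction from the description, not the code Trexquant ships: `baseline_guess` is a hypothetical name, and counting every occurrence of a letter (rather than once per word) is an assumption.

```python
import re
from collections import Counter


def baseline_guess(masked_word, guessed_letters, dictionary):
    """Guess the most frequent unguessed letter among words matching the mask."""
    # "a _ _ l e" -> regex "a..le": each hidden position matches any letter.
    pattern = re.compile(masked_word.replace(" ", "").replace("_", "."))
    candidates = [w for w in dictionary if pattern.fullmatch(w)]

    # Tabulate letter frequencies over the matching words.
    for letter, _ in Counter("".join(candidates)).most_common():
        if letter not in guessed_letters:
            return letter

    # Fallback: letter frequencies over the entire dictionary.
    for letter, _ in Counter("".join(dictionary)).most_common():
        if letter not in guessed_letters:
            return letter
    return None  # every letter has been guessed


toy_dictionary = ["apple", "ample", "angle", "maple"]
print(baseline_guess("a _ _ l e", ["a", "l", "e"], toy_dictionary))
```

The sketch makes the benchmark's weakness visible: when the secret word is absent from the dictionary, the candidate list often empties out and the guess degrades to global letter frequency.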

This benchmark strategy is successful approximately 18% of the time. Your task is to design an algorithm that significantly outperforms this benchmark.

In [2]:
import xgboost as xgb
import numpy as np
import pandas as pd
import pickle
import string

class HangmanXGBoostPredictor:
    def __init__(self, model_path="xgboost_hangman_models2.pkl", pos_features=65):
        self.alphabet = list(string.ascii_lowercase)
        self.num_classes = len(self.alphabet)
        self.models = []
        self.pos_features = pos_features
        self._load_models(model_path)

    def _load_models(self, model_path):
        with open(model_path, 'rb') as f:
            self.models = pickle.load(f)

    def _encode_word_state(self, word: str, guessed_letters: set) -> np.ndarray:
        feat = np.full(self.pos_features, -1, dtype=np.int8)
        length = len(word)
        offset = self.pos_features - length

        for i, ch in enumerate(word):
            code = 0 if ch == '*' else ord(ch) - ord('a') + 1
            feat[i] = code
            feat[offset + i] = code

        return feat.reshape(1, -1)

    def predict_letter(self, word: str, guessed_letters: set) -> list:
        """Return one score per letter a-z; already-guessed letters score -inf."""
        features = self._encode_word_state(word, guessed_letters)
        scores = []
        # self.models is a dict with one binary classifier per letter,
        # assumed to be stored in alphabet order.
        for idx, model in enumerate(self.models.values()):
            letter = self.alphabet[idx]
            if letter in guessed_letters:
                scores.append(-np.inf)
                continue
            prob = model.predict_proba(features)[0][1]  # probability of presence
            scores.append(prob)

        return scores

In [3]:
import torch
from transformers import CanineTokenizer, CanineConfig, AutoModelForSequenceClassification
from scipy.special import softmax

# # Constants for separating and masking in the CANINE input sequence
# CANINE_SEP_TOKEN   = " [SEP] "
# CANINE_MASK_TOKEN  = "[MASK]"

class CanineHangmanPlayer:
    def __init__(self, pretrained_model_path: str = "google/canine-s", device: torch.device = None,xgb=None):
        # Load CANINE tokenizer & config
        self.tokenizer = CanineTokenizer.from_pretrained('google/canine-s')
        self.config    = CanineConfig.from_pretrained(pretrained_model_path)
        self.config.num_labels = 26
        self.xgb=xgb
        self.nice=1
        self.trigger=1
        self.all_letters = [chr(i) for i in range(ord('a'), ord('z')+1)]
        self.state=False
        self.guesses={}

        
        # Set up device
        self.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")
        
        # Load a sequence-classification head on top of CANINE
        self.model =AutoModelForSequenceClassification.from_pretrained(
            pretrained_model_path,
            config=self.config).to(self.device)
        
        # Tokens for building the game state string
        self.CANINE_MASK_TOKEN = self.tokenizer.mask_token
        self.CANINE_SEP_TOKEN = self.tokenizer.sep_token
        
        # Toggle for self-play finetuning
        self.training = False
        
    def predict(self, word, guesses):
        """Return (letter, probability) for the model's top-scoring letter."""
        encoded_word = word.replace(" ", "").replace("_", "*")

        # De-duplicate the guessed letters while keeping their order.
        guessed = ''.join(dict.fromkeys(guesses))

        # Model input: guessed letters, a separator, then the word with hidden positions masked.
        state = guessed + self.CANINE_SEP_TOKEN + encoded_word.replace('*', self.CANINE_MASK_TOKEN)
        enc = self.tokenizer(state, padding="max_length", truncation=True, max_length=64, return_tensors="pt").to(self.device)

        with torch.no_grad():
            logits = self.model(**enc).logits

        # Softmax over the 26 letter logits, then pick the most probable letter.
        probs = softmax(logits.cpu().numpy()[0]).tolist()
        guess_idx = int(np.argmax(probs))
        return self.all_letters[guess_idx], probs[guess_idx]


In [4]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pickle
import re
from collections import Counter

def build_dictionary(dictionary_file_location):
    # The context manager closes the file even if reading fails.
    with open(dictionary_file_location, "r") as text_file:
        return text_file.read().splitlines()

words = build_dictionary("words_250000_train.txt")
value = {chr(96 + i): i for i in range(1, 27)}  # 'a' -> 1, ..., 'z' -> 26
value["_"] = 0
value_rev = {i: chr(96 + i) for i in range(1, 27)}
value_rev[0] = "_"
# value_rev[27] = "_"

In [6]:
import re
from collections import Counter
import numpy as np


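def guesss(pattern, tried, verbose=False):
    """
    Predict the next letter.

    The model's top letter is the default. Substring windows of decreasing
    size (8 down to 3) are tried first, and a window's best letter is returned
    if its probability is at least 1.2x the model's probability.
    Returns (letter, used_model_flag).
    """
    clean = pattern.replace(" ", "")
    letter, prob = model.predict(clean, tried)
    prob *= 1.2  # substring candidates must clear this raised threshold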

    for size in range(8, 2, -1):
        allow_multi = size > 3
        candidate, cand_prob = create_substrings(clean, tried_letters=tried,
                                                window=size, min_prob=prob,
                                                allow_multiple=allow_multi)
        if candidate:
            if verbose:
                print(f"used{size}", candidate, cand_prob)
            return candidate, False

    return letter, True


def create_substrings(pattern_str, tried_letters=None, window=6, min_prob=0.1, allow_multiple=False):
    tried_letters = tried_letters or []
    clean = pattern_str.replace(" ", "")

    fragments = []
    for idx, ch in enumerate(clean):
        if ch == '_':
            left = max(idx - (window - 1), 0)
            right = min(idx + window, len(clean))
            placeholder = '.'
            segment = clean[left:idx] + placeholder + clean[idx+1:right]
            if allow_multiple or _is_isolated(clean, idx):
                fragments.append(segment.replace('_', '*') if allow_multiple else segment)

    subs = [seg[i:i+window] for seg in fragments for i in range(len(seg) - window + 1)]
    if allow_multiple:
        subs = [s for s in subs if s.count('*') < 2]
    else:
        subs = [s for s in subs if '_' not in s]
    subs = [s for s in subs if len(s) == window]

    if not subs:
        return None, None

    distributions = []
    for s in subs:
        dot_pos = s.index('.')
        pattern = s.replace('*', '.') if allow_multiple else s
        counts = _count_matching_letters(pattern, dot_pos, tried_letters)
        if counts:
            total = sum(counts.values())
            distributions.append({ltr: cnt/total for ltr, cnt in counts.items()})

    if not distributions:
        return None, None

    agg = _aggregate_probs(distributions)
    best_letter, best_prob = max(agg.items(), key=lambda x: x[1])
    if best_prob < min_prob:
        return None, None
    return best_letter, best_prob


def _is_isolated(s, i):
    return (i == 0 or s[i-1] != '_') and (i == len(s)-1 or s[i+1] != '_')


def _count_matching_letters(regex, dot_index, tried):
    counts = Counter()
    for w in words:
        m = re.search(regex, w)
        if m:
            letter = w[m.start() + dot_index]
            if letter not in tried:
                counts[letter] += 1
    return counts


def _aggregate_probs(dist_list):
    alpha_counts = np.zeros(26)
    for dist in dist_list:
        for ltr, p in dist.items():
            alpha_counts[value[ltr]-1] += p
    alpha_probs = alpha_counts / alpha_counts.sum()
    return {value_rev[i+1]: prob for i, prob in enumerate(alpha_probs)}


In [7]:
xgb = HangmanXGBoostPredictor("xgboost_hangman_models2.pkl")
model=CanineHangmanPlayer("canine-pretrained-hangman-checkpoints/canine-pretrained-hangman-checkpoint-7",xgb=xgb)
model.training=False

In [8]:
import json
import requests
import random
import string
import secrets
import time
import re
import collections

try:
    from urllib.parse import parse_qs, urlencode, urlparse
except ImportError:
    from urlparse import parse_qs, urlparse
    from urllib import urlencode

    
from requests.packages.urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

In [9]:
class HangmanAPI(object):
    def __init__(self, access_token=None, session=None, timeout=None):
        self.hangman_url = self.determine_hangman_url()
        self.access_token = access_token
        self.session = session or requests.Session()
        self.timeout = timeout
        self.guessed_letters = []
        
        full_dictionary_location = "words_250000_train.txt"
        self.full_dictionary = self.build_dictionary(full_dictionary_location)        
        self.full_dictionary_common_letter_sorted = collections.Counter("".join(self.full_dictionary)).most_common()
        
        self.current_dictionary = []
        
    @staticmethod
    def determine_hangman_url():
        links = ['https://trexsim.com']

        data = {link: 0 for link in links}

        for link in links:

            requests.get(link)

            for i in range(10):
                s = time.time()
                requests.get(link)
                data[link] = time.time() - s

        link = sorted(data.items(), key=lambda x: x[1])[0][0]
        link += '/trexsim/hangman'
        return link

    def guess(self, word): # word input example: "_ p p _ e "
        ###############################################
        # Replace with your own "guess" function here #
        ###############################################
        ans,a=guesss(word,self.guessed_letters)
        return ans
 
    def build_dictionary(self, dictionary_file_location):
        text_file = open(dictionary_file_location,"r")
        full_dictionary = text_file.read().splitlines()
        text_file.close()
        return full_dictionary
                
    def start_game(self, practice=True, verbose=True):
        # reset guessed letters to empty set and current plausible dictionary to the full dictionary
        self.guessed_letters = []
        self.current_dictionary = self.full_dictionary
                         
        response = self.request("/new_game", {"practice":practice})
        if response.get('status')=="approved":
            game_id = response.get('game_id')
            word = response.get('word')
            tries_remains = response.get('tries_remains')
            if verbose:
                print("Successfully start a new game! Game ID: {0}. # of tries remaining: {1}. Word: {2}.".format(game_id, tries_remains, word))
            while tries_remains>0:
                # get guessed letter from user code
                guess_letter = self.guess(word)
                    
                # append guessed letter to guessed letters field in hangman object
                self.guessed_letters.append(guess_letter)
                if verbose:
                    print("Guessing letter: {0}".format(guess_letter))
                    
                try:    
                    res = self.request("/guess_letter", {"request":"guess_letter", "game_id":game_id, "letter":guess_letter})
                except HangmanAPIError:
                    print('HangmanAPIError exception caught on request.')
                    continue
                except Exception as e:
                    print('Other exception caught on request.')
                    raise e
               
                if verbose:
                    print("Sever response: {0}".format(res))
                status = res.get('status')
                tries_remains = res.get('tries_remains')
                if status=="success":
                    if verbose:
                        print("Successfully finished game: {0}".format(game_id))
                    return True
                elif status=="failed":
                    reason = res.get('reason', '# of tries exceeded!')
                    if verbose:
                        print("Failed game: {0}. Because of: {1}".format(game_id, reason))
                    return False
                elif status=="ongoing":
                    word = res.get('word')
        else:
            if verbose:
                print("Failed to start a new game")
        return False  # every won game returns True inside the loop; `status` is undefined if the game never started
        
    def my_status(self):
        return self.request("/my_status", {})
    
    def request(
            self, path, args=None, post_args=None, method=None):
        if args is None:
            args = dict()
        if post_args is not None:
            method = "POST"

        # Add `access_token` to post_args or args if it has not already been
        # included.
        if self.access_token:
            # If post_args exists, we assume that args either does not exists
            # or it does not need `access_token`.
            if post_args and "access_token" not in post_args:
                post_args["access_token"] = self.access_token
            elif "access_token" not in args:
                args["access_token"] = self.access_token

        time.sleep(0.2)

        num_retry, time_sleep = 50, 2
        for it in range(num_retry):
            try:
                response = self.session.request(
                    method or "GET",
                    self.hangman_url + path,
                    timeout=self.timeout,
                    params=args,
                    data=post_args,
                    verify=False
                )
                break
            except requests.HTTPError as e:
                # requests exceptions expose the failed response as `e.response`; they have no `read()` method
                raise HangmanAPIError(e.response.json())
            except requests.exceptions.SSLError as e:
                if it + 1 == num_retry:
                    raise
                time.sleep(time_sleep)

        headers = response.headers
        if 'json' in headers['content-type']:
            result = response.json()
        elif "access_token" in parse_qs(response.text):
            query_str = parse_qs(response.text)
            if "access_token" in query_str:
                result = {"access_token": query_str["access_token"][0]}
                if "expires" in query_str:
                    result["expires"] = query_str["expires"][0]
            else:
                raise HangmanAPIError(response.json())
        else:
            raise HangmanAPIError('Maintype was not text, or querystring')

        if result and isinstance(result, dict) and result.get("error"):
            raise HangmanAPIError(result)
        return result
    
class HangmanAPIError(Exception):
    def __init__(self, result):
        self.result = result
        self.code = None
        try:
            self.type = result["error_code"]
        except (KeyError, TypeError):
            self.type = ""

        try:
            self.message = result["error_description"]
        except (KeyError, TypeError):
            try:
                self.message = result["error"]["message"]
                self.code = result["error"].get("code")
                if not self.type:
                    self.type = result["error"].get("type", "")
            except (KeyError, TypeError):
                try:
                    self.message = result["error_msg"]
                except (KeyError, TypeError):
                    self.message = result

        Exception.__init__(self, self.message)

# API Usage Examples

## To start a new game:
1. Make sure you have implemented your own "guess" method.
2. Use the access_token that we sent you to create your HangmanAPI object. 
3. Start a game by calling the "start_game" method.
4. If you wish to test your function without being recorded, set the "practice" parameter to 1.
5. Note: You have a rate limit of 20 new games per minute. DO NOT start more than 20 new games within one minute.
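
One way to stay under the limit in step 5 is a sliding-window limiter that blocks before a new game would exceed 20 starts in 60 seconds. The sketch below is an assumption about how to enforce the limit on the client side; `GameRateLimiter` is a hypothetical class, and the injectable `clock` and `sleep` parameters exist only to make it testable.

```python
import time
from collections import deque


class GameRateLimiter:
    """Allow at most `max_games` starts within any `period`-second window."""

    def __init__(self, max_games=20, period=60.0, clock=time.monotonic, sleep=time.sleep):
        self.max_games = max_games
        self.period = period
        self.clock = clock
        self.sleep = sleep
        self.starts = deque()  # timestamps of recent game starts, oldest first

    def _prune(self, now):
        # Forget starts that have left the sliding window.
        while self.starts and now - self.starts[0] >= self.period:
            self.starts.popleft()

    def wait(self):
        """Block until starting another game keeps us within the limit."""
        now = self.clock()
        self._prune(now)
        if len(self.starts) >= self.max_games:
            # Sleep until the oldest recorded start falls out of the window.
            self.sleep(self.period - (now - self.starts[0]))
            now = self.clock()
            self._prune(now)
        self.starts.append(now)


limiter = GameRateLimiter()
limiter.wait()  # returns immediately: no games have been started yet
```

In a practice loop, `limiter.wait()` would be called right before each `api.start_game(...)`.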

In [10]:
api = HangmanAPI(access_token="e67b107337bbe4cc1c633dd23d2a03", timeout=2000)


## Playing practice games:
You can use the command below to play up to 100,000 practice games.

In [34]:
api.start_game(practice=1,verbose=True)
[total_practice_runs,total_recorded_runs,total_recorded_successes,total_practice_successes] = api.my_status() # Get my game stats: (# of tries, # of wins)
practice_success_rate = total_practice_successes / total_practice_runs

print('run %d practice games out of an allotted 100,000. practice success rate so far = %.3f' % (total_practice_runs, practice_success_rate))


Successfully start a new game! Game ID: 4839cde1c6fb. # of tries remaining: 6. Word: _ _ _ _ _ _ _ _ _ _ _ _ _ .
Guessing letter: e
Sever response: {'game_id': '4839cde1c6fb', 'status': 'ongoing', 'tries_remains': 5, 'word': '_ _ _ _ _ _ _ _ _ _ _ _ _ '}
Guessing letter: i
Sever response: {'game_id': '4839cde1c6fb', 'status': 'ongoing', 'tries_remains': 5, 'word': '_ _ _ _ _ _ _ _ _ i _ i _ '}
Guessing letter: t
Sever response: {'game_id': '4839cde1c6fb', 'status': 'ongoing', 'tries_remains': 5, 'word': '_ _ t _ _ _ _ _ _ i t i _ '}
Guessing letter: a
Sever response: {'game_id': '4839cde1c6fb', 'status': 'ongoing', 'tries_remains': 5, 'word': 'a _ t a _ _ _ _ _ i t i _ '}
Guessing letter: n
Sever response: {'game_id': '4839cde1c6fb', 'status': 'ongoing', 'tries_remains': 5, 'word': 'a n t a _ _ _ _ _ i t i _ '}
Guessing letter: l
Sever response: {'game_id': '4839cde1c6fb', 'status': 'ongoing', 'tries_remains': 4, 'word': 'a n t a _ _ _ _ _ i t i _ '}
Guessing letter: o
Sever response: 

In [None]:
total,s=total_practice_runs,total_practice_successes
for i in range(200):
    api.start_game(practice=1,verbose=True)
    [total_practice_runs,total_recorded_runs,total_recorded_successes,total_practice_successes] = api.my_status() # Get my game stats: (# of tries, # of wins)
    practice_success_rate = (total_practice_successes-s) / (total_practice_runs-total)*100
    time.sleep(0.5)
    print(f"{practice_success_rate:.2f} {total_practice_successes - s} {total_practice_runs - total}", end="\r")

In [12]:
print(total_practice_runs,total_recorded_runs,total_recorded_successes,total_practice_successes)

3711 595 384 2366


In [None]:
m=api.my_status()
total,s=m[0],m[3]
for i in range(400):
    api.start_game(practice=1,verbose=False)
    # api.start_game(practice=1,verbose=True)
    [total_practice_runs,total_recorded_runs,total_recorded_successes,total_practice_successes] = api.my_status() # Get my game stats: (# of tries, # of wins)
    practice_success_rate = (total_practice_successes-s) / (total_practice_runs-total)*100
    # time.sleep(0.5)
    print(f"{practice_success_rate:.2f} {total_practice_runs} {i} ", end="\r")

## Playing recorded games:
Please finalize your code before running the cell below. Once this code has executed successfully, your submission is final, and our system will not allow you to run any additional games.

After you successfully run this block of code, subsequent runs are expected to fail with the error message "Your account has been deactivated".

Once you've run this section of the code your submission is complete. Please send us your source code via email.

In [None]:
for i in range(2,1000):
    api.start_game(practice=0,verbose=False)
    # api.start_game(practice=1,verbose=True)
    [total_practice_runs,total_recorded_runs,total_recorded_successes,total_practice_successes] = api.my_status() # Get my game stats: (# of tries, # of wins)
    rec_success_rate = (total_recorded_successes) / (total_recorded_runs)*100
    # time.sleep(0.5)
    print(f"Playing {i}th game | Success Rate: {rec_success_rate:.2f} ")

Playing 2th game | Success Rate: 33.33 
Playing 3th game | Success Rate: 50.00 
Playing 4th game | Success Rate: 60.00 
Playing 5th game | Success Rate: 66.67 
Playing 6th game | Success Rate: 71.43 
Playing 7th game | Success Rate: 75.00 
Playing 8th game | Success Rate: 77.78 
Playing 9th game | Success Rate: 70.00 
Playing 10th game | Success Rate: 72.73 
Playing 11th game | Success Rate: 75.00 
Playing 12th game | Success Rate: 76.92 
Playing 13th game | Success Rate: 78.57 
Playing 14th game | Success Rate: 73.33 
Playing 15th game | Success Rate: 68.75 
Playing 16th game | Success Rate: 64.71 
Playing 17th game | Success Rate: 66.67 
Playing 18th game | Success Rate: 68.42 
Playing 19th game | Success Rate: 70.00 
Playing 20th game | Success Rate: 66.67 
Playing 21th game | Success Rate: 68.18 
Playing 22th game | Success Rate: 69.57 
Playing 23th game | Success Rate: 70.83 
Playing 24th game | Success Rate: 68.00 
Playing 25th game | Success Rate: 65.38 
Playing 26th game | Succ

In [13]:
for i in range(594,1000):
    api.start_game(practice=0,verbose=False)
    # api.start_game(practice=1,verbose=True)
    [total_practice_runs,total_recorded_runs,total_recorded_successes,total_practice_successes] = api.my_status() # Get my game stats: (# of tries, # of wins)
    rec_success_rate = (total_recorded_successes) / (total_recorded_runs)*100
    # time.sleep(0.5)
    print(f"Playing {i}th game | Success Rate: {rec_success_rate:.2f} ")

Playing 594th game | Success Rate: 64.43 
Playing 595th game | Success Rate: 64.32 
Playing 596th game | Success Rate: 64.38 
Playing 597th game | Success Rate: 64.44 
Playing 598th game | Success Rate: 64.50 
Playing 599th game | Success Rate: 64.39 
Playing 600th game | Success Rate: 64.45 
Playing 601th game | Success Rate: 64.51 
Playing 602th game | Success Rate: 64.57 
Playing 603th game | Success Rate: 64.63 
Playing 604th game | Success Rate: 64.69 
Playing 605th game | Success Rate: 64.74 
Playing 606th game | Success Rate: 64.64 
Playing 607th game | Success Rate: 64.53 
Playing 608th game | Success Rate: 64.59 
Playing 609th game | Success Rate: 64.48 
Playing 610th game | Success Rate: 64.54 
Playing 611th game | Success Rate: 64.44 
Playing 612th game | Success Rate: 64.50 
Playing 613th game | Success Rate: 64.55 
Playing 614th game | Success Rate: 64.45 
Playing 615th game | Success Rate: 64.51 
Playing 616th game | Success Rate: 64.56 
Playing 617th game | Success Rate:

HangmanAPIError: {'error': 'You have reached 1000 of games', 'status': 'denied'}

In [None]:
for i in range(1000):
    print('Playing ', i, ' th game')
    #Uncomment the following line to execute your final runs. Do not do this until you are satisfied with your submission
    api.start_game(practice=0,verbose=False)
    
    #DO NOT REMOVE as otherwise the server may lock you out for too high frequency of requests
    time.sleep(0.5)

## To check your game statistics
1. Simply use "my_status" method.
2. It returns four counts, in this order: total practice runs, total recorded runs, total recorded successes, and total practice successes.
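
Since the four counts come back as a bare list, it is easy to mix up their order or divide by zero before any games have been played. The sketch below wraps them in a named tuple, using the same order in which the cells in this notebook unpack `my_status()`; `GameStats` and `summarize_status` are hypothetical names.

```python
from collections import namedtuple

GameStats = namedtuple(
    "GameStats",
    ["practice_runs", "recorded_runs", "recorded_successes", "practice_successes"],
)


def summarize_status(status):
    """Return (stats, practice_rate, recorded_rate); rates are 0.0 with no games."""
    stats = GameStats(*status)
    practice_rate = stats.practice_successes / stats.practice_runs if stats.practice_runs else 0.0
    recorded_rate = stats.recorded_successes / stats.recorded_runs if stats.recorded_runs else 0.0
    return stats, practice_rate, recorded_rate


# Counts taken from the status printed earlier in this notebook.
stats, practice_rate, recorded_rate = summarize_status([3711, 595, 384, 2366])
print(f"practice = {practice_rate:.4f}, recorded = {recorded_rate:.4f}")
```

With a live session, the same helper would be called as `summarize_status(api.my_status())`.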

In [15]:
[total_practice_runs,total_recorded_runs,total_recorded_successes,total_practice_successes] = api.my_status() # Get my game stats: (# of tries, # of wins)
success_rate = total_recorded_successes/total_recorded_runs
print('overall success rate = %.4f' % success_rate)

overall success rate = 0.6640
