In [1]:
import json
import requests
import random
import string
import secrets
import time
import re
import collections
import numpy as np
from charbert import *
from collections import defaultdict
from joblib import Parallel, delayed
from tqdm import tqdm

try:
    from urllib.parse import parse_qs, urlencode, urlparse
except ImportError:
    from urlparse import parse_qs, urlparse
    from urllib import urlencode

from requests.packages.urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# Model loading

In [None]:
# `torch` is not imported explicitly anywhere else in this notebook.
# NOTE(review): it presumably arrives through `from charbert import *`; importing it here
# means this cell no longer depends on that.
import torch

# Create the tokenizer
tokenizer = CharTokenizer()

# Create the model
vocab_size = len(tokenizer.vocab)
model = CNNBERT(vocab_size=vocab_size)

# Pick the device: keep using the second GPU when the machine has one (the
# original hard-coded "cuda:1"), otherwise fall back to the first GPU, then the CPU.
# A bare "cuda:1" fails on single-GPU hosts even though CUDA is available.
if torch.cuda.is_available():
    gpu_index = 1 if torch.cuda.device_count() > 1 else 0
    device = torch.device(f"cuda:{gpu_index}")
else:
    device = torch.device("cpu")
model = model.to(device)

# map_location remaps the checkpoint tensors onto the chosen device, so a
# checkpoint saved on a GPU can still be loaded on a CPU-only / different-GPU host.
state_dict = torch.load("model.pth", map_location=device)
model.load_state_dict(state_dict, strict=True)

# The model is only used for inference in this notebook: switch off
# training-time behaviour such as dropout.
model.eval()

In [8]:
class HangmanAPI(object):
    def __init__(self, access_token=None, session=None, timeout=None):
        self.hangman_url = self.determine_hangman_url()
        self.access_token = access_token
        self.session = session or requests.Session()
        self.timeout = timeout
        self.guessed_letters = []
        
        full_dictionary_location = "words_250000_train.txt"
        self.full_dictionary = self.build_dictionary(full_dictionary_location) 
        self.full_dictionary = list( set(self.full_dictionary) )
        
        # updated the prob with length adjusted. 
        self.ngrams = 3
        self.Ngramdict = self.ngram_prob(self.full_dictionary, self.ngrams)
        self.unigram = sorted( [(k,v) for k,v in self.Ngramdict['gram_1'].items()], key = lambda x: x[1], reverse = True)
        
    @staticmethod
    def determine_hangman_url():
        links = ['https://trexsim.com', 'https://sg.trexsim.com']

        data = {link: 0 for link in links}

        for link in links:

            requests.get(link)

            for i in range(10):
                s = time.time()
                requests.get(link)
                data[link] = time.time() - s

        link = sorted(data.items(), key=lambda x: x[1])[0][0]
        link += '/trexsim/hangman'
        return link

    def ngram_prob(self,full_dictionary, Ngrams = 6):
        """
        Generate the Ngram probabilities for the dictionary

        Args:
            full_dictionary (_type_): _description_
            Ngrams (int, optional): _description_. Defaults to 6.
        """
        def generate_N_grams(text,ngram=1):
            char = [i for i in text]

            temp=zip(*[char[i:] for i in range(0,ngram)])
            ans=[' '.join(ngram) for ngram in temp]
            return ans
        
        Ngramdict = {f"gram_{n}": defaultdict(int) for n in range(1,Ngrams+1)}
        FNgramdict = {}

        for gram,dict in Ngramdict.items():
            for word in full_dictionary:
                for ngram in generate_N_grams(word,int(gram[-1])):
                    dict[ngram]+=1
            # normalize the ngram counts
            sum_ = sum(dict.values())
            FNgramdict[gram] = {k: np.log(v/sum_) for k,v in dict.items()}
        
        return FNgramdict

    def calculate_probability_chain_rule(self, word):
        """
        Calculate the probability of a word using the chain rule

        Args:
            word: word to calculate the probability of

        Returns:
            log_probability: the log probability of the word
        """
        Ngramdict = self.Ngramdict
        # Convert word to a list of characters and initialize the chain
        chain = ["".join(word[:i])[-self.ngrams:] for i in range(1, len(word) + 1)]
        
        # calculate the log probability of the word using the chain rule
        log_probability = 0
        for c in chain[::-1]:
            use_n = min(self.ngrams, len(c))  # Determine the appropriate N-gram length
            c = " ".join(c)  # Format the chain segment as a space-separated string
            
            gram_key = f"gram_{use_n}"
            if c in Ngramdict[gram_key]:
                log_probability += Ngramdict[gram_key][c]
            else: 
                log_probability += 2*min( Ngramdict[gram_key].values())

        return log_probability
    
    def scaledcharprob(self, given_dictionary):
        """
        Get the character probabilities scaled using the scores

        Args:
            given_dictionary: dictionary of words and their scores

        Returns:
            char_prob: list of characters and their probabilities
        """
        listofchar = ('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z')
        
        
        scores = [i[1] for i in given_dictionary]
        scores = np.array(scores)[:,None] # beam, 1
        assert scores.shape == (len(given_dictionary),1)

        words = [i[0] for i in given_dictionary]
        if len(words) == 0:
            return [(i, 1/len(listofchar)) for i in listofchar]
        
        
        char_prob = []
        for word in words: 
            dummy = [0]*len(listofchar)
            for char in word:
                dummy[listofchar.index(char)] += 1
         
            dummy = [x / len(word) for x in dummy]
            # smoothing 
            dummy = [i + max(dummy)*0.01 for i in dummy]
            dummy = [i/sum(dummy) for i in dummy]
            char_prob.append(dummy)
        
        char_prob = np.array(char_prob)
        char_prob = np.log(char_prob) # beam, 26
        
        char_prob += scores
        char_prob /= char_prob.shape[0]
        
        char_prob = char_prob.sum(axis=0)

        length = len(char_prob)
        char_prob = [(listofchar[i], char_prob[i]) for i in range(length)]
        char_prob.sort(key = lambda x: x[1], reverse = True)        

        return char_prob
    
    def guess(self, word): # word input example: "_ p p _ e "[::2].replace("_",".")
                  
        def likelihhod_charbert(word,topk=4): # using character bert model 
            """
            Get the likelihood of the possible words using the character bert model

            Args:
                word: word to get the likelihood of
                topk: number of top k possible words to return

            Returns:
                possible_words: dictionary of possible words and their likelihood scores
            """

            ratio = 1 - len(word.replace("_",""))/len(word)
            topk = max(1, int(topk*ratio))
            word = word[::2]
            
            possible_words = Test(model, tokenizer, device, topk, word, self.guessed_letters)
            possible_words = {i[0]:i[1] for i in possible_words} # log probability of the words
       
            return possible_words

        def posterior(word):
            """
            Calculate the posterior of the possible words using the likelihood and prior

            Args:
                word: mask word to calculate the posterior of

            Returns:
                scaledcharprob: list of characters and their probabilities
            """
            
            # word statistics
            num_masked = word.count("_")
            len_word = len(word[::2])
            
            # step 1: if all the characters are masked, return the unigram
            if num_masked == len_word : 
                return self.unigram 
            
            # step 2: if some characters are masked, return the posterior using the likelihood and prior
            
            # charbert_likelihood
            self.likelihood = likelihhod_charbert(word)
            
            # Use the ngrams to get the Prior over the possible words generated by the charbert model
            dummy = {}
            for k in self.likelihood.keys():
                dummy[k] = self.calculate_probability_chain_rule(k)
            self.Prior = dummy
            
            # rescale the likelihood and prior to the same scale
            max_likelihood = max(self.likelihood.values())
            max_prior = max(self.Prior.values())
            rescale = max_likelihood / max_prior
            self.Prior = {k: v*rescale for k,v in self.Prior.items()}
        
            # posterior
            self.Posterior = {}
            for k in self.likelihood.keys():
                self.Posterior[k] = self.likelihood[k] + self.Prior[k]
                
            self.Posterior = [(k,v) for k,v in self.Posterior.items()]
            return self.scaledcharprob(self.Posterior)
        
        def Guess(dist):
            """
            Guess the letter to fill in the word

            Args:
                dist: distribution of the possible characters

            Returns:
                guess_letter: the most probable character to fill in the masked word
            """
            dist.sort(key = lambda x: x[1], reverse = True) # sort the distribution in descending order
                 
            for letter,_ in dist:
                if letter not in self.guessed_letters:
                    guess_letter = letter
                    break            
                    
            return guess_letter
        
        guess_letter = Guess(posterior(word))    
        
        return guess_letter

    
    def build_dictionary(self, dictionary_file_location):
        text_file = open(dictionary_file_location,"r")
        full_dictionary = text_file.read().splitlines()
        text_file.close()
        return full_dictionary
                
    def start_game(self, practice=True, verbose=True):
        # reset guessed letters to empty set and current plausible dictionary to the full dictionary
        self.guessed_letters = []
        self.current_dictionary = self.full_dictionary
                         
        response = self.request("/new_game", {"practice":practice})
        if response.get('status')=="approved":
            game_id = response.get('game_id')
            word = response.get('word')
            tries_remains = response.get('tries_remains')
            if verbose:
                print("Successfully start a new game! Game ID: {0}. # of tries remaining: {1}. Word: {2}.".format(game_id, tries_remains, word))
            while tries_remains>0:
                # get guessed letter from user code
                guess_letter = self.guess(word)
                    
                # append guessed letter to guessed letters field in hangman object
                self.guessed_letters.append(guess_letter)
                if verbose:
                    print("Guessing letter: {0}".format(guess_letter))
                    
                try:    
                    res = self.request("/guess_letter", {"request":"guess_letter", "game_id":game_id, "letter":guess_letter})
                except HangmanAPIError:
                    print('HangmanAPIError exception caught on request.')
                    continue
                except Exception as e:
                    print('Other exception caught on request.')
                    raise e
               
                if verbose:
                    print("Sever response: {0}".format(res))
                status = res.get('status')
                tries_remains = res.get('tries_remains')
                if status=="success":
                    if verbose:
                        print("Successfully finished game: {0}".format(game_id))
                    return True
                elif status=="failed":
                    reason = res.get('reason', '# of tries exceeded!')
                    if verbose:
                        print("Failed game: {0}. Because of: {1}".format(game_id, reason))
                    return False
                elif status=="ongoing":
                    word = res.get('word')
        else:
            if verbose:
                print("Failed to start a new game")
        return status=="success"
        
    def my_status(self):
        return self.request("/my_status", {})
    
    def request(
            self, path, args=None, post_args=None, method=None):
        if args is None:
            args = dict()
        if post_args is not None:
            method = "POST"

        # Add `access_token` to post_args or args if it has not already been
        # included.
        if self.access_token:
            # If post_args exists, we assume that args either does not exists
            # or it does not need `access_token`.
            if post_args and "access_token" not in post_args:
                post_args["access_token"] = self.access_token
            elif "access_token" not in args:
                args["access_token"] = self.access_token

        time.sleep(0.2)

        num_retry, time_sleep = 50, 2
        for it in range(num_retry):
            try:
                response = self.session.request(
                    method or "GET",
                    self.hangman_url + path,
                    timeout=self.timeout,
                    params=args,
                    data=post_args,
                    verify=False
                )
                break
            except requests.HTTPError as e:
                response = json.loads(e.read())
                raise HangmanAPIError(response)
            except requests.exceptions.SSLError as e:
                if it + 1 == num_retry:
                    raise
                time.sleep(time_sleep)

        headers = response.headers
        if 'json' in headers['content-type']:
            result = response.json()
        elif "access_token" in parse_qs(response.text):
            query_str = parse_qs(response.text)
            if "access_token" in query_str:
                result = {"access_token": query_str["access_token"][0]}
                if "expires" in query_str:
                    result["expires"] = query_str["expires"][0]
            else:
                raise HangmanAPIError(response.json())
        else:
            raise HangmanAPIError('Maintype was not text, or querystring')

        if result and isinstance(result, dict) and result.get("error"):
            raise HangmanAPIError(result)
        return result
    
class HangmanAPIError(Exception):
    """
    Error raised for failed Hangman API calls.

    Attributes (all set in ``__init__``):
        result:  the raw payload the error was built from.
        type:    ``result["error_code"]``, else ``result["error"]["type"]``
                 when the message came from there, else ``""``.
        code:    ``result["error"]["code"]`` when the message came from
                 ``result["error"]``, else ``None``.
        message: first available of ``result["error_description"]``,
                 ``result["error"]["message"]``, ``result["error_msg"]``;
                 the payload itself when none of them can be read.
    """

    def __init__(self, result):
        self.result = result
        self.code = None
        try:
            self.type = result["error_code"]
        except (KeyError, TypeError):
            self.type = ""

        self.message = self._extract_message(result)
        super().__init__(self.message)

    def _extract_message(self, result):
        """Try each known payload layout in turn and return the message found."""
        # Layout 1: flat "error_description".
        try:
            return result["error_description"]
        except (KeyError, TypeError):
            pass

        # Layout 2: nested "error" mapping, which may also carry code and type.
        try:
            message = result["error"]["message"]
            self.code = result["error"].get("code")
            if not self.type:
                self.type = result["error"].get("type", "")
            return message
        except (KeyError, TypeError):
            pass

        # Layout 3: flat "error_msg"; otherwise use the payload itself.
        try:
            return result["error_msg"]
        except (KeyError, TypeError):
            return result
    

In [5]:
import os

# The access token is a credential and must not be committed with the notebook:
# read it from the environment instead. The token that used to be hard-coded
# here should be treated as leaked and rotated.
HANGMAN_ACCESS_TOKEN = os.environ.get("HANGMAN_ACCESS_TOKEN")
if not HANGMAN_ACCESS_TOKEN:
    raise RuntimeError("Set the HANGMAN_ACCESS_TOKEN environment variable before creating the API client.")

# NOTE(review): `timeout` is forwarded unchanged to the HTTP session, where it
# is interpreted in seconds — 2000 may have been meant as milliseconds; confirm.
api = HangmanAPI(access_token=HANGMAN_ACCESS_TOKEN, timeout=2000)

In [None]:
# Play a single practice game with full logging.
api.start_game(practice=1, verbose=True)

# my_status() is unpacked into four counters, in this order:
# practice runs, recorded runs, recorded successes, practice successes.
(
    total_practice_runs,
    total_recorded_runs,
    total_recorded_successes,
    total_practice_successes,
) = api.my_status()

practice_success_rate = total_practice_successes / total_practice_runs
print('run %d practice games out of an allotted 100,000. practice success rate so far = %.3f' % (total_practice_runs, practice_success_rate))

# Last expression of the cell: shown as the cell output.
total_practice_successes

## Final submission

In [None]:
# Play the recorded (practice=0) games back to back, without per-guess logging.
total_games = 1000
for i in range(total_games):
    print('Playing ', i, ' th game')
    api.start_game(practice=0, verbose=False)

    # short pause between consecutive games
    time.sleep(0.5)

In [29]:
# my_status() is unpacked into four counters, in this order:
# practice runs, recorded runs, recorded successes, practice successes.
(
    total_practice_runs,
    total_recorded_runs,
    total_recorded_successes,
    total_practice_successes,
) = api.my_status()

# Share of recorded games that were won.
success_rate = total_recorded_successes / total_recorded_runs
print('overall success rate = %.3f' % success_rate)

overall success rate = 0.530


# The END