In [None]:
"""

Integrates the ensemble model (CatBoost + XGBoost) with heuristic methods
to enhance the Hangman guessing strategy. This script interacts with the
Hangman API to play games using the improved guessing algorithm.
"""

import json
import requests
import random
import string
import time
import re
import collections
import pickle
import numpy as np
import pandas as pd
import os
from sklearn.linear_model import LogisticRegression
from typing import Optional, Dict, List
from sklearn.metrics import balanced_accuracy_score
from tqdm import tqdm

# Disable SSL warnings
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

In [None]:


# Define the Ensemble Model Class
class MultiLabelEnsemble:
    """
    Stacked ensemble over two pickled multi-label base models.

    The CatBoost and XGBoost models each produce a probability matrix; the two
    matrices are placed side by side and passed to one meta-classifier per
    letter (26 in total). The positive-class probability of meta-classifier
    ``i`` becomes the final score for letter ``i``.
    """

    def __init__(self, catboost_model_path, xgboost_model_path, meta_classifier_path=None):
        # Both base models are restored from pickle files.
        # NOTE: unpickling executes code stored in the file, so only load
        # model files that come from a trusted source.
        self.catboost_model = self._load_model(catboost_model_path)
        self.xgboost_model = self._load_model(xgboost_model_path)

        has_saved_meta = bool(meta_classifier_path) and os.path.exists(meta_classifier_path)
        if has_saved_meta:
            self.load_meta_classifiers(meta_classifier_path)
        else:
            # No stored meta-classifiers available: start from 26 fresh
            # LogisticRegression instances.
            self.meta_classifiers = [LogisticRegression() for _ in range(26)]

    def _load_model(self, path):
        """Unpickle and return the object stored at ``path``."""
        with open(path, 'rb') as handle:
            return pickle.load(handle)

    def predict_proba(self, X):
        """
        Score every letter for every sample with the stacked ensemble.

        Parameters:
            X (pd.DataFrame or np.array): Feature matrix of shape (n_samples, 80)

        Returns:
            np.array: Probability matrix of shape (n_samples, 26)
        """
        # CatBoost first, then XGBoost; each is expected to return (n_samples, 26).
        base_outputs = [
            self.catboost_model.predict_proba(X),
            self.xgboost_model.predict_proba(X),
        ]
        # Side-by-side stacking gives the meta-features, expected (n_samples, 52).
        stacked_features = np.concatenate(base_outputs, axis=1)

        # Column 1 of each meta-classifier's output is P(class == 1).
        per_letter = [
            self.meta_classifiers[i].predict_proba(stacked_features)[:, 1]
            for i in range(26)
        ]
        return np.array(per_letter).T

    def save(self, filename):
        """Pickle the list of meta-classifiers (not the base models) to ``filename``."""
        with open(filename, 'wb') as handle:
            pickle.dump(self.meta_classifiers, handle)

    def load_meta_classifiers(self, filename):
        """Replace the current meta-classifiers with the list pickled in ``filename``."""
        with open(filename, 'rb') as handle:
            self.meta_classifiers = pickle.load(handle)

# Helper Functions and Mappings
# Letter <-> integer code tables shared by the helpers below:
# 'a'..'z' map to 1..26 and the blank marker '_' maps to 0.
alphabet = "abcdefghijklmnopqrstuvwxyz"
value_map = dict(zip(alphabet, range(1, len(alphabet) + 1)))
value_map['_'] = 0
# Inverse table: integer code -> character.
value_rev = dict((code, symbol) for symbol, code in value_map.items())

def encode_word(trial_word):
    """
    Turn a masked word into a fixed-width vector of 80 int8 codes.

    The word's codes are written twice: left-aligned starting at index 0 and
    right-aligned so that the last character lands on index 79. Every slot
    that is not written keeps the filler value -1. Blanks ('_') and any
    character missing from ``value_map`` are encoded as 0.

    Parameters:
        trial_word (str): Current word state with underscores (e.g., "_pp_e")

    Returns:
        np.array: Encoded feature vector of length 80
    """
    width = 80
    encoded = np.full(width, -1, dtype=np.int8)
    # Index at which the right-aligned copy of the word begins.
    tail_start = width - len(trial_word)

    for offset, symbol in enumerate(trial_word):
        # value_map['_'] is 0, the same as the fallback for unknown symbols.
        code = value_map.get(symbol, 0)
        encoded[offset] = code
        encoded[tail_start + offset] = code
    return encoded

def create_substrings(trial, guessed=(), n=6, threshold=0.1, multiple=False, words=()):
    """
    Suggest a letter from dictionary statistics on the text around each blank.

    For every blank in ``trial`` a context of up to ``n - 1`` characters on
    each side is taken, with the blank itself written as '.'. Every window of
    exactly ``n`` characters from that context becomes a regular expression
    that is searched in each dictionary word; the letter found at the '.'
    position of the first match in a word is counted, unless it was already
    guessed. Per pattern the counts are turned into relative frequencies, the
    frequencies are summed over all patterns and normalised to sum to 1.

    Parameters:
        trial (str): Current word state with underscores
        guessed (iterable): Letters already guessed; they are never suggested
        n (int): Pattern (window) length; values below 1 yield no suggestion
        threshold (float): Minimum normalised score the best letter must reach
        multiple (bool): If True, every blank is used and a pattern may contain
            one further unknown position; if False, only blanks whose direct
            neighbours are both known are used and patterns with any further
            blank are discarded
        words (iterable): Dictionary words to search

    Returns:
        tuple: (most probable letter, its normalised score), or (None, None)
        when no pattern applies, nothing matches, or the best score is below
        ``threshold``

    Notes:
        Known characters of ``trial`` are inserted into the regular expression
        unescaped, so ``trial`` is expected to contain only letters and '_'.
        Matched characters outside 'a'-'z' are ignored rather than scored.
    """
    if n < 1:
        # A window must at least contain the blank itself.
        return None, None

    last_index = len(trial) - 1
    contexts = []
    for i, ch in enumerate(trial):
        if ch != '_':
            continue
        # Slicing clamps at the end of the string, so no explicit min() is needed.
        context = trial[max(i - n + 1, 0):i] + '.' + trial[i + 1:i + n]
        if multiple:
            # Other blanks become '*' so they can be told apart from the target '.'.
            contexts.append(context.replace('_', '*'))
        elif (i == 0 or trial[i - 1] != '_') and (i == last_index or trial[i + 1] != '_'):
            contexts.append(context)

    # Every length-n window of a context contains its '.', because the '.'
    # sits at most n - 1 characters from either end of the context.
    extra_blank = '*' if multiple else '_'
    allowed_extra_blanks = 1 if multiple else 0
    patterns = []
    for context in contexts:
        for start in range(len(context) - n + 1):
            window = context[start:start + n]
            if window.count(extra_blank) <= allowed_extra_blanks:
                patterns.append(window)
    if not patterns:
        return None, None

    excluded = set(guessed)
    base = ord('a')
    scores = np.zeros(26)
    for pattern in patterns:
        target = pattern.index('.')
        # Compile once per pattern instead of rebuilding it for every word.
        regex = re.compile(pattern.replace('*', '.'))
        counts = collections.Counter()
        for word in words:
            match = regex.search(word)
            if match is None:
                continue
            letter = word[match.start() + target]
            # Skip guessed letters and anything that is not a lowercase a-z
            # letter (such characters have no slot in the 26-entry score table).
            if 'a' <= letter <= 'z' and letter not in excluded:
                counts[letter] += 1
        total = sum(counts.values())
        for letter, count in counts.items():
            scores[ord(letter) - base] += count / total

    grand_total = scores.sum()
    if grand_total <= 0:
        # No pattern produced a usable letter.
        return None, None
    scores /= grand_total

    best = int(scores.argmax())
    if scores[best] < threshold:
        return None, None
    return chr(base + best), scores[best]


def calculate_known_percentage(word_segment):
    """
    Return how much of a word segment is already revealed, as a percentage.

    A character counts as revealed when it is anything other than '_'.

    :param word_segment: string of the word segment to analyze
    :return: float percentage (0-100) of known letters; 0.0 for an empty segment
    """
    if not word_segment:
        return 0.0

    hidden = sum(char == '_' for char in word_segment)
    revealed = len(word_segment) - hidden
    return (revealed / len(word_segment)) * 100

def _first_missing_affix_letter(segment, affix, guessed_letters, from_end=False):
    """
    Compare a word segment with an affix of the same length.

    Returns the affix letter for the first blank encountered (scanning from
    the end of the segment when ``from_end`` is True, from the start
    otherwise), or None when the affix cannot fit or nothing is missing.
    """
    pairs = zip(reversed(segment), reversed(affix)) if from_end else zip(segment, affix)
    first_missing = None
    for word_char, affix_char in pairs:
        if word_char == '_':
            if affix_char in guessed_letters:
                # A guessed letter is revealed everywhere it occurs, so a
                # blank can never hide it: this affix is impossible here.
                return None
            if first_missing is None:
                first_missing = affix_char
        elif word_char != affix_char:
            return None
    return first_missing


def guess_next_letter_suff_pref(trial_word, guessed_letters=None):
    """
    Guess the next letter from a fixed list of common prefixes and suffixes.

    Prefixes are tried first, then suffixes, each in list order. An affix is
    considered only if it is not longer than the word and the part of the word
    it would cover is already revealed to more than 55% (prefixes) or 65%
    (suffixes). The first affix that agrees with every revealed letter and
    still has a blank supplies the guess.

    :param trial_word: string representing the current state of the word,
        e.g., "a _ t _ m a _ i c"; spaces are ignored and case is folded
    :param guessed_letters: collection of letters already guessed (optional)
    :return: the next letter to guess based on heuristics, or None if no guess can be made
    """
    # Minimum share (in percent) of the covered segment that must already be
    # revealed; the segment must exceed this value strictly.
    prefix_min_known = 55
    suffix_min_known = 65

    common_prefixes = (
        "aero", "agri", "allo", "ambi", "amphi",
        "ante", "anti", "arch", "astro", "audio",
        "auto", "bene", "biblio", "cardio", "chrono",
        "circum", "contra", "counter", "crypto", "cryo",
        "cyber", "demi", "demo", "derma", "ecto",
        "electro", "endo", "ethno", "extra", "fore",
        "gastro", "gyro", "helio", "hemi", "hetero",
        "hexo", "homo", "hyper", "hydro", "hypo",
        "ideo", "indo", "infra", "inter", "intra",
        "iso", "litho", "macro", "mammo", "mega",
        "meso", "meta", "micro", "mini", "mono",
        "multi", "necro", "neuro", "omni", "over",
        "para", "peri", "phono", "photo", "poly",
        "post", "proto", "pseudo", "psycho", "pyro",
        "quad", "retro", "rhino", "semi", "socio",
        "stereo", "super", "supra", "techno", "tele",
        "theo", "thermo", "trans", "typo", "ultra",
        "under", "vice", "xeno",
    )

    # "cally" and "ectomy" are separate entries; without the comma between
    # them Python would join the two literals into a single string.
    common_suffixes = (
        "able", "aceous", "algia", "ance", "archy",
        "arian", "arium", "atic", "ation", "ative",
        "cidal", "cide", "ciate", "cracy", "dial", "cally",
        "ectomy", "ence", "eous", "escent", "esque",
        "etic", "fold", "fully", "graph", "hood",
        "ible", "icide", "icity", "iform", "ious",
        "itis", "itude", "ness", "lling", "logy",
        "mania", "ment", "meter", "morph", "most",
        "less", "ology", "osis", "phile", "phobe",
        "scope", "scopy", "ship", "sion", "some",
        "sophy", "ster", "therm", "tious", "tomy",
        "tude", "ulate", "ulent", "ville", "ward",
        "wards", "wise", "worthy",
    )

    # Remove spaces and convert to lowercase for uniformity
    word = trial_word.replace(' ', '').lower()

    if guessed_letters is None:
        guessed_letters = set()

    # Prefix heuristic
    for prefix in common_prefixes:
        if len(prefix) > len(word):
            continue
        segment = word[:len(prefix)]
        if calculate_known_percentage(segment) <= prefix_min_known:
            continue
        letter = _first_missing_affix_letter(segment, prefix, guessed_letters)
        if letter is not None:
            return letter

    # Suffix heuristic (blanks are filled starting from the end of the word)
    for suffix in common_suffixes:
        if len(suffix) > len(word):
            continue
        segment = word[-len(suffix):]
        if calculate_known_percentage(segment) <= suffix_min_known:
            continue
        letter = _first_missing_affix_letter(segment, suffix, guessed_letters, from_end=True)
        if letter is not None:
            return letter

    return None


In [None]:


# Updated HangmanAPI Class with Ensemble Integration
class HangmanAPI(object):
    """
    Client for the Hangman game server combined with the guessing strategy.

    On construction it picks the faster of two server mirrors, loads the
    training dictionary and the ensemble model. ``start_game`` plays one game,
    asking ``guess`` for each letter and sending it with ``request``.
    """

    def __init__(self, access_token=None, session=None, timeout=None,
                 ensemble_model_path='ensemble_model_xgb_catboost.pkl',
                 catboost_model_path='multilabel_catboost_model.pkl',
                 xgboost_model_path='multilabel_xgb_model.pkl',
                 meta_classifier_path='meta_classifiers.pkl'):
        # NOTE: ensemble_model_path is accepted for compatibility but is not
        # used; the ensemble is assembled from the three other paths.
        self.hangman_url = self.determine_hangman_url()
        self.access_token = access_token
        self.session = session or requests.Session()
        self.timeout = timeout
        self.guessed_letters = []
        self.words = self.build_dictionary("words_250000_train.txt")
        # Number of games won since the counter was last reset by the caller.
        self.successforgame = 0
        self.ensemble_model = MultiLabelEnsemble(catboost_model_path, xgboost_model_path, meta_classifier_path)

    @staticmethod
    def determine_hangman_url():
        """Return the Hangman endpoint on whichever mirror responds fastest."""
        links = ['https://trexsim.com', 'https://sg.trexsim.com']
        data = {link: 0 for link in links}
        for link in links:
            for _ in range(10):
                s = time.time()
                try:
                    requests.get(link, timeout=1)
                except requests.RequestException:
                    # One failure already disqualifies the mirror; further
                    # probes could not change an infinite total.
                    data[link] = float('inf')
                    break
                data[link] += time.time() - s
        # Smallest accumulated time wins; ties go to the first listed mirror.
        link = min(links, key=data.get)
        return link + '/trexsim/hangman'

    def guess(self, word):
        """
        Choose the next letter using heuristics first and the ensemble model last.

        Order of strategies: dictionary substring statistics, common
        prefixes/suffixes, an early vowel, then the highest-scoring unguessed
        letter according to the ensemble model.

        Parameters:
            word (str): Current word state with spaces (e.g., "_ p p _ e")

        Returns:
            str: Next letter to guess
        """
        trial_word = word.replace(" ", "")

        # 1) Substring statistics from the dictionary.
        letter_from_substr, substr_prob = create_substrings(
            trial_word,
            guessed=self.guessed_letters,
            n=6,
            threshold=0.1,
            multiple=True,
            words=self.words
        )
        if (letter_from_substr and substr_prob > 0.2
                and letter_from_substr not in self.guessed_letters):
            return letter_from_substr

        # 2) Common prefixes/suffixes (the helper strips the spaces itself).
        letter_from_pref_suffix = guess_next_letter_suff_pref(trial_word=word, guessed_letters=self.guessed_letters)
        if letter_from_pref_suffix is not None:
            print("Letter from prefix and suffix")
            return letter_from_pref_suffix

        # 3) The model is only evaluated once the heuristics above have
        #    failed, because nothing before this point needs its output.
        encoded = encode_word(trial_word).reshape(1, -1)  # Shape: (1, 80)
        X = pd.DataFrame(encoded, columns=[str(i) for i in range(80)])
        probs = self.ensemble_model.predict_proba(X)[0]  # Shape: (26,)
        prob_dict = {ch: probs[i] for i, ch in enumerate(alphabet)}

        # Early in the game prefer the most probable vowel not yet guessed.
        # NOTE(review): len(word) counts the spaces of the server format, so
        # this is not the number of letters - confirm that this is intended.
        unguessed_vowels = [v for v in ['a', 'e', 'i', 'o', 'u'] if v not in self.guessed_letters]
        max_vowels_guess = 3 if len(word) > 5 else 2
        if unguessed_vowels and len(self.guessed_letters) < max_vowels_guess:
            return max(unguessed_vowels, key=lambda v: prob_dict.get(v, 0))

        # 4) Highest-probability letter that has not been guessed yet.
        sorted_probs = sorted(prob_dict.items(), key=lambda x: x[1], reverse=True)
        for ch, _ in sorted_probs:
            if ch not in self.guessed_letters:
                return ch

        # Every letter has been guessed already (shouldn't happen).
        return random.choice(alphabet)

    def build_dictionary(self, dictionary_file_location):
        """Read dictionary words from a text file, one word per line."""
        with open(dictionary_file_location, "r") as f:
            full_dictionary = f.read().splitlines()
        return full_dictionary

    def start_game(self, practice=True, verbose=True):
        """
        Play one game against the server.

        Parameters:
            practice: Forwarded to the server as the ``practice`` flag
            verbose (bool): Print progress messages

        Returns:
            bool: True if the word was guessed; False if the game was lost,
            could not be started, or the tries ran out
        """
        # Reset guessed letters
        self.guessed_letters = []

        response = self.request("/new_game", {"practice": practice})
        if response.get('status') != "approved":
            if verbose:
                print("Failed to start a new game")
            return False

        game_id = response.get('game_id')
        word = response.get('word')  # Initial masked word
        tries_remains = response.get('tries_remains')
        if verbose:
            print(f"New game started! Game ID: {game_id}. Tries remaining: {tries_remains}. Word: {word}")

        while tries_remains > 0:
            guess_letter = self.guess(word)
            self.guessed_letters.append(guess_letter)
            if verbose:
                print(f"Guessing letter: {guess_letter}")

            try:
                res = self.request("/guess_letter", {"request": "guess_letter", "game_id": game_id, "letter": guess_letter})
            except HangmanAPIError:
                # NOTE(review): the loop moves on to another letter and has no
                # retry limit, so a persistent API error never terminates.
                print('HangmanAPIError exception caught on request.')
                continue
            except Exception:
                print('Other exception caught on request.')
                raise

            if verbose:
                print(f"Server response: {res}")

            status = res.get('status')
            tries_remains = res.get('tries_remains')
            if status == "success":
                self.successforgame += 1
                if verbose:
                    print(f"Successfully guessed the word: {res.get('word')}")
                return True
            elif status == "failed":
                reason = res.get('reason', '# of tries exceeded!')
                if verbose:
                    print(f"Failed to guess the word. Reason: {reason}")
                return False
            elif status == "ongoing":
                word = res.get('word')  # Update the masked word

        # Tries ran out without a final "success"/"failed" answer.
        return False

    def my_status(self):
        """Return the server's answer to "/my_status" for this access token."""
        return self.request("/my_status", {})

    def request(self, path, args=None, post_args=None, method=None):
        """
        Send one request to the Hangman server and return the decoded result.

        Parameters:
            path (str): Endpoint path appended to the server URL
            args (dict): Query-string parameters
            post_args (dict): Form data; its presence switches the method to POST
            method (str): HTTP method, "GET" when omitted

        Returns:
            The decoded JSON body, or a dict with "access_token" (and
            "expires") parsed from a query-string style body

        Raises:
            HangmanAPIError: on an HTTP error, an unrecognised response body,
                or a result carrying an "error" entry
        """
        # Imported here so the method does not depend on a file-level import.
        from urllib.parse import parse_qs

        # Work on copies so the caller's dictionaries are never modified.
        args = dict(args) if args is not None else {}
        if post_args is not None:
            post_args = dict(post_args)
            method = "POST"

        # Add `access_token` to post_args or args if it has not already been included.
        if self.access_token:
            if post_args and "access_token" not in post_args:
                post_args["access_token"] = self.access_token
            elif "access_token" not in args:
                args["access_token"] = self.access_token

        time.sleep(0.2)

        num_retry, time_sleep = 50, 2
        for it in range(num_retry):
            try:
                response = self.session.request(
                    method or "GET",
                    self.hangman_url + path,
                    timeout=self.timeout,
                    params=args,
                    data=post_args,
                    verify=False
                )
                break
            except requests.HTTPError as e:
                # requests' HTTPError exposes the server reply as e.response
                # (it has no read() method); fall back to the error text.
                try:
                    payload = e.response.json()
                except (AttributeError, ValueError):
                    payload = str(e)
                raise HangmanAPIError(payload) from e
            except requests.exceptions.SSLError:
                # Only SSL errors are retried; the last attempt re-raises.
                if it + 1 == num_retry:
                    raise
                time.sleep(time_sleep)

        headers = response.headers
        if 'json' in headers.get('content-type', ''):
            result = response.json()
        else:
            query_str = parse_qs(response.text)
            if "access_token" not in query_str:
                raise HangmanAPIError('Maintype was not text, or querystring')
            result = {"access_token": query_str["access_token"][0]}
            if "expires" in query_str:
                result["expires"] = query_str["expires"][0]

        if result and isinstance(result, dict) and result.get("error"):
            raise HangmanAPIError(result)
        return result

class HangmanAPIError(Exception):
    """
    Error reported by (or on behalf of) the Hangman server.

    ``result`` is kept as received. ``type``, ``message`` and ``code`` are
    filled from whichever of the known layouts ``result`` matches:
    ``error_code``/``error_description``, a nested ``error`` mapping with
    ``message``/``code``/``type``, or ``error_msg``. If none matches, the
    message is ``result`` itself.
    """

    def __init__(self, result):
        self.result = result
        self.code = None

        try:
            error_type = result["error_code"]
        except (KeyError, TypeError):
            error_type = ""
        self.type = error_type

        self.message = self._resolve_message(result)
        super().__init__(self.message)

    def _resolve_message(self, result):
        """Return the message for ``result``, updating ``code``/``type`` for the nested layout."""
        # Layout 1: flat "error_description".
        try:
            return result["error_description"]
        except (KeyError, TypeError):
            pass

        # Layout 2: nested {"error": {"message": ..., "code": ..., "type": ...}}.
        try:
            message = result["error"]["message"]
            self.code = result["error"].get("code")
            if not self.type:
                self.type = result["error"].get("type", "")
            return message
        except (KeyError, TypeError):
            pass

        # Layout 3: flat "error_msg"; otherwise fall back to the raw result.
        try:
            return result["error_msg"]
        except (KeyError, TypeError):
            return result



In [None]:
class MultiLabelXGBoostClassifier:
    """
    One XGBoost binary classifier per class (26 by default, one per letter).

    Classifier ``i`` predicts whether letter ``i`` is needed in the guess set.
    Classifiers are trained one at a time with ``train_single_classifier``;
    the indices trained so far are recorded in ``trained_indices``.
    """

    def __init__(self, num_classes: int = 26, xgb_params: Optional[Dict] = None):
        self.num_classes = num_classes
        # Defaults; any key in xgb_params overrides the value given here.
        default_params = {
            "n_estimators": 100,
            "max_depth": 6,
            "learning_rate": 0.1,
            "use_label_encoder": False,
            "eval_metric": "logloss",
            "verbosity": 0  # Suppress XGBoost's own logs
        }
        if xgb_params is not None:
            default_params.update(xgb_params)
        self.xgb_params = default_params

        # NOTE(review): `xgb` is not imported in the visible part of this
        # file; presumably `import xgboost as xgb` is expected to be in scope
        # before an instance is constructed - confirm.
        self.models: List[xgb.XGBClassifier] = [
            xgb.XGBClassifier(**self.xgb_params) for _ in range(num_classes)
        ]

        # Indices of the classifiers that have been fitted so far.
        self.trained_indices = set()

    def train_single_classifier(self, i: int, X: pd.DataFrame, y_i: np.ndarray):
        """
        Fit classifier ``i`` on features ``X`` and binary targets ``y_i``.
        """
        letter = alphabet[i]
        print(f"Training classifier for letter '{letter}'...")
        self.models[i].fit(X, y_i)
        self.trained_indices.add(i)
        print(f"Classifier for letter '{letter}' trained.")

    def predict_proba(self, X: pd.DataFrame) -> np.ndarray:
        """
        Returns an array of shape (n_samples, num_classes) holding, for each
        class, the predicted probability of the positive label '1'
        (i.e., the letter is needed).
        """
        print("Generating probability predictions...")
        # Column 1 of each model's output is the positive-class probability.
        all_preds = [
            self.models[i].predict_proba(X)[:, 1]
            for i in range(self.num_classes)
        ]
        return np.array(all_preds).T  # shape => (n_samples, num_classes)

    def save(self, filename: str):
        """
        Pickles this whole object (all classifiers included) to ``filename``.
        """
        with open(filename, 'wb') as f:
            pickle.dump(self, f)
        print(f"Model saved to {filename}")

    @classmethod
    def load(cls, filename: str):
        """
        Unpickles and returns the object stored in ``filename``.

        NOTE: unpickling executes code stored in the file; only load files
        that come from a trusted source.
        """
        with open(filename, 'rb') as f:
            model = pickle.load(f)
        print(f"Model loaded from {filename}")
        return model



In [None]:
class MultiLabelCatBoostClassifier:
    """
    Trains one CatBoost binary classifier per class (26 by default, one per letter).
    Each classifier predicts if a letter is 'hidden' (i.e., in the word but not revealed in the partial encoding).
    """

    def __init__(self, num_classes=26, catboost_params=None):
        if catboost_params is None:
            catboost_params = {
                "iterations": 1500,
                "task_type": "GPU",
                "verbose": False
            }
        # NOTE(review): `CatBoostClassifier` is not imported in the visible
        # part of this file; presumably `from catboost import
        # CatBoostClassifier` is expected to be in scope - confirm.
        self.classifiers = [
            CatBoostClassifier(**catboost_params) for _ in range(num_classes)
        ]
        self.num_classes = num_classes

    def fit(self, X, y):
        """Fit classifier ``i`` on ``X`` against the target selected by ``y[alpha[i]]``."""
        # NOTE(review): `alpha` is not defined in the visible part of this
        # file; it presumably holds the per-letter target names of `y`
        # (possibly the same as `alphabet`) - confirm before training.
        for i in range(self.num_classes):
            self.classifiers[i].fit(X, y[alpha[i]], verbose=100)

    def predict_proba(self, X):
        """
        Return the predicted positive-class probability of every classifier.
        Shape: (n_samples, number of classifiers), i.e. (n_samples, 26) by default.
        """
        # Size the result from the classifiers actually held, so an instance
        # built with a different num_classes gets a matching number of columns.
        predictions = np.zeros((len(X), len(self.classifiers)))
        for i, clf in enumerate(self.classifiers):
            # Probability that letter i is actually hidden in the word
            predictions[:, i] = clf.predict_proba(X)[:, 1]
        return predictions

    def save(self, filename):
        """Pickle this whole object (all classifiers included) to ``filename``."""
        with open(filename, 'wb') as f:
            pickle.dump(self, f)

    @classmethod
    def load(cls, filename):
        """
        Unpickle and return the object stored in ``filename``.

        NOTE: unpickling executes code stored in the file; only load files
        that come from a trusted source.
        """
        with open(filename, 'rb') as f:
            model = pickle.load(f)
        return model

In [None]:
# The access token is a credential and should not live in source control.
# It is taken from the HANGMAN_ACCESS_TOKEN environment variable when that is
# set; the literal is kept only as a fallback so existing runs keep working.
# TODO(review): remove the literal once the environment variable is in use.
api = HangmanAPI(
        access_token=os.environ.get("HANGMAN_ACCESS_TOKEN", "82ed141920582322b3c4a774b98cbd"),
        timeout=2000,
        ensemble_model_path="ensemble_model_xgb_catboost.pkl",
        catboost_model_path="multilabel_catboost_model.pkl",
        xgboost_model_path="multilabel_xgb_model.pkl",
        meta_classifier_path="meta_classifiers.pkl"
    )

In [None]:
# Print the installed scikit-learn version.
# NOTE(review): presumably checked because the meta-classifiers are loaded
# from a pickle created with a specific scikit-learn version - confirm.
import sklearn
print(sklearn.__version__)

1.6.0


In [None]:
# Play 50 practice games and report the running win rate after each one.
api.successforgame = 0

for runs in range(1, 51):
    api.start_game(practice=1, verbose=True)
    # Server-side totals for this account, refreshed after every game.
    (total_practice_runs,
     total_recorded_runs,
     total_recorded_successes,
     total_practice_successes) = api.my_status()
    # Local win rate: games won in this session over games played so far.
    practice_success_rate = api.successforgame / runs
    print('run %d game. practice success rate so far = %.3f' % (runs, practice_success_rate))
    print(time.localtime())

print("DONE")
practice_success_rate = api.successforgame / runs
print('run %d practice games out of an allotted 100,000. practice success rate so far = %.3f' % (total_practice_runs, practice_success_rate))


HangmanAPIError: {'error': 'Your account has been deactivated!'}

In [None]:
# Final (recorded) runs: 1000 games with practice switched off.
# Do not execute this cell until you are satisfied with your submission.
for i in range(1000):
    print('Playing ', i, ' th game')
    # Only the first ten games print their full progress.
    api.start_game(practice=0, verbose=(i < 10))

    # DO NOT REMOVE as otherwise the server may lock you out for too high frequency of requests
    time.sleep(0.5)

Playing  0  th game


HangmanAPIError: {'error': 'Your account has been deactivated!'}

In [None]:
# Fetch the account statistics and report the win rate over recorded games.
(total_practice_runs,
 total_recorded_runs,
 total_recorded_successes,
 total_practice_successes) = api.my_status()
# With no recorded games yet the rate is reported as 0 instead of dividing by zero.
success_rate = total_recorded_successes / total_recorded_runs if total_recorded_runs else 0.0
print('overall success rate = %.3f' % success_rate)

overall success rate = 0.597
