##### Imports

In [1]:
import sys
from pathlib import Path
import warnings

import warnings
import pandas as pd
warnings.filterwarnings('ignore')

pd.set_option('display.max_columns', 1000)
pd.set_option('display.max_rows', 1000)

import sys
# Custom library paths
sys.path.extend(['../', './scr'])

from scr.utils import set_seed
from scr.utils import read_words
from pathlib import Path
import random
from collections import Counter, defaultdict
import pickle
from tqdm import tqdm
from torch.utils.data import Dataset

from scr.utils import read_words, save_words_to_file

import pickle
from pathlib import Path
from scr.dataset import *
from scr.utils import *
# # For inference
from scr.feature_engineering import *

import gc

set_seed(42)

import torch
import torch.nn as nn
from pathlib import Path
import random

from scr.utils import print_scenarios
torch.set_float32_matmul_precision('medium')
from pathlib import Path

# Run on the GPU when torch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Load the training vocabulary and shuffle it in place.
# (Alternative source kept for reference: read_words('data/250k.txt', limit=10000))
word_list = read_words('data/words_250000_train.txt')
random.shuffle(word_list)

# Corpus-level statistics reused by the feature builders below.
word_frequencies = calculate_word_frequencies(word_list)
char_frequency = calculate_char_frequencies(word_list)
max_word_length = max(map(len, word_list))

##### Data Dir

In [2]:
# Run configuration.
NUM_STRATIFIED_SAMPLES = 200  # This will be overwritten by Papermill
NUM_WORD_SAMPLE = 1000        # words for testing
FAST_DEV_RUN = False
MAX_EPOCH = 250

In [3]:
from pathlib import Path
from scr.custom_sampler import *

# Base dataset directory; each stratified-sample count gets its own sub-directory
# that holds the parquet files.
# NOTE(review): these are absolute, machine-specific paths.
base_dataset_dir = Path("/media/sayem/510B93E12554BBD1/dataset/")
stratified_samples_dir = base_dataset_dir / str(NUM_STRATIFIED_SAMPLES)
parquet_path = stratified_samples_dir / 'parquets'

# Create the parquet directory if it doesn't exist
parquet_path.mkdir(parents=True, exist_ok=True)
# parquet_valid_path.mkdir(parents=True, exist_ok=True)

# Define and create the directory for models
models_dir = Path("/home/sayem/Desktop/Hangman/models")
models_dir.mkdir(parents=True, exist_ok=True)

# Define the output directory and the logger directory beneath it
output_dir = Path("/home/sayem/Desktop/Hangman/training_outputs")
logger_dir = output_dir / "lightning_logs"

# Create the output and logger directories if they don't exist
output_dir.mkdir(parents=True, exist_ok=True)
logger_dir.mkdir(parents=True, exist_ok=True)

# File that holds the testing words for this stratified-sample count
testing_words_file_path = stratified_samples_dir / "testing_words.txt"

# Start from an empty sample so the name is always bound: previously a missing
# file was reported and then the final print crashed with NameError.
sampled_test_words = []

try:
    testing_word_list = read_words(testing_words_file_path)
    print(f"Length of the testing word list: {len(testing_word_list)}")
    sampled_test_words = stratified_sample_by_length_and_uniqueness(testing_word_list, NUM_WORD_SAMPLE)
    print(f"Sampled {len(sampled_test_words)} unique words for testing.")
except FileNotFoundError:
    print(f"File not found: {testing_words_file_path}")

print(len(sampled_test_words))

Length of the testing word list: 10048
Sampled 1085 unique words for testing.
1085


##### Dataset loading and train/validation split

In [4]:
# Create datasets directly from the saved parquet files
hangman_dataset = HangmanDataset(parquet_path)
# valid_dataset = HangmanDataset(parquet_valid_path)

from scr.utils import *

# print(len(train_dataset))
# print(len(valid_dataset))

# assert len(train_dataset) > len(valid_dataset)

# Assuming `hangman_dataset` is an instance of HangmanDataset
train_dataset, valid_dataset = split_hangman_dataset(hangman_dataset)

In [5]:
hangman_dataset[(10,)]

{'game_id': 1089,
 'word': 'syncarpous',
 'initial_state': ['syncarp_us'],
 'final_state': 'syncarpous',
 'guessed_states': ['syncarp_us'],
 'guessed_letters': ['o'],
 'game_state': 'nearEnd',
 'difficulty': 'medium',
 'outcome': 'lose',
 'word_length': 10,
 'won': False}

In [6]:
# Ordered vocabulary of game-progress labels; a label's position is its one-hot slot.
game_states = [
    'allMasked',
    'early',
    'quarterRevealed',
    'midRevealed',
    'midLateRevealed',
    'lateRevealed',
    'nearEnd',
]
game_state_to_idx = dict(zip(game_states, range(len(game_states))))


def encode_game_state(game_state):
    """One-hot encode a game-state label over ``game_states``.

    Returns a list of ints with a single 1 at the label's index. A label that
    is not in ``game_state_to_idx`` yields an all-zero list.
    """
    hot_slot = game_state_to_idx.get(game_state)
    return [int(slot == hot_slot) for slot in range(len(game_states))]

In [7]:
def build_enhanced_feature_set(game_data, char_frequency, \
    max_word_length, ngram_n=3, normalize=True):
    """Build the stacked feature tensor and the missed-character vector for one game.

    Args:
        game_data: Mapping with the keys 'final_state', 'word_length',
            'game_state', 'initial_state', 'guessed_letters' and 'difficulty'.
            'initial_state' may be the masked word itself (a str) or a
            sequence whose first element is the masked word.
        char_frequency: Mapping from character to frequency; characters that
            are missing from it contribute 0.
        max_word_length: Divisor for the length/position features and the
            target length handed to ``pad_tensor`` for every feature column.
        ngram_n: n-gram size forwarded to ``extract_ngrams``/``encode_ngrams``.
        normalize: Accepted for interface compatibility; not used by this
            function.

    Returns:
        ``(features_stacked, missed_characters_feature)``: the ten padded
        feature columns stacked along dim 1, and the result of
        ``get_missed_characters`` for the complete word.
    """
    complete_word = game_data['final_state']
    word_length = game_data['word_length']
    current_game_state = game_data['game_state']

    # The dataset stores 'initial_state' as a list (e.g. ['syncarp_us']), while
    # process_batch already applies [0] before passing it in. Indexing a plain
    # string with [0] would silently keep only its first character, so accept
    # both forms here.
    initial_state = game_data['initial_state']
    if not isinstance(initial_state, str):
        initial_state = initial_state[0]

    def _pad(values, dtype):
        # Tensor-ise a Python sequence and pad it with the shared helper.
        return pad_tensor(torch.tensor(values, dtype=dtype), max_word_length)

    # Per-character features derived from the complete word
    encoded_complete_word = encode_word(complete_word)
    word_length_feature = [word_length / max_word_length] * word_length
    positional_feature = [pos / max_word_length for pos in range(word_length)]
    frequency_feature = [char_frequency.get(idx_to_char.get(char_idx, '_'), 0)
                         for char_idx in encoded_complete_word]
    ngram_feature = encode_ngrams(extract_ngrams(complete_word, ngram_n), ngram_n)

    # Game-level features
    game_state_encoded = encode_game_state(current_game_state)
    initial_state_encoded = encode_word(initial_state)
    guessed_letters_encoded = [char_to_idx[char] for char in game_data['guessed_letters']]
    difficulty_encoded = [0 if game_data['difficulty'] == 'medium' else 1]

    # Column order is part of the output contract; keep it stable.
    combined_features = [
        _pad(encoded_complete_word, torch.long),
        _pad(word_length_feature, torch.float),
        _pad(positional_feature, torch.float),
        _pad(frequency_feature, torch.float),
        _pad(ngram_feature, torch.float),
        _pad(guessed_letters_encoded, torch.long),
        _pad(initial_state_encoded, torch.long),
        # The final state is the complete word, so its encoding is reused.
        _pad(encoded_complete_word, torch.long),
        _pad(difficulty_encoded, torch.float),
        _pad(game_state_encoded, torch.float),
    ]

    # Stack features
    features_stacked = torch.stack(combined_features, dim=1)

    # Calculate missed characters feature
    missed_characters_feature = get_missed_characters(complete_word)

    return features_stacked, missed_characters_feature


In [8]:
def process_batch(batch, char_frequency, \
                    max_word_length, ngram_n=3, normalize=True):
    """Run ``build_enhanced_feature_set`` on every game in a collated batch.

    ``batch`` is indexed column-wise (``batch[key][i]``); the number of games
    is taken from ``len(batch['word_length'])``. Returns the per-game feature
    tensors and missed-character tensors, each stacked into one batch tensor.
    """
    per_game_results = []

    for game_idx in range(len(batch['word_length'])):
        # Re-assemble one game's fields from the column-wise batch.
        game = dict(
            word=batch['final_state'][game_idx],
            word_length=batch['word_length'][game_idx],
            game_state=batch['game_state'][game_idx],
            initial_state=batch['initial_state'][game_idx][0],  # first item only
            final_state=batch['final_state'][game_idx],
            guessed_letters=batch['guessed_letters'][game_idx],
            difficulty=batch['difficulty'][game_idx],
        )
        per_game_results.append(
            build_enhanced_feature_set(game, char_frequency,
                                       max_word_length, ngram_n, normalize)
        )

    # Split the (features, missed_chars) pairs and stack each side.
    batch_features = torch.stack([features for features, _ in per_game_results])
    batch_missed_chars = torch.stack([missed for _, missed in per_game_results])

    return batch_features, batch_missed_chars

In [9]:
sample = hangman_dataset[(29,)]

In [10]:
from scr.data_module import *

# Initialize Data Module
initial_batch_size = 1  # Set your initial batch size

# Initialize Data Module with the required arguments
data_module = HangmanDataModule(train_dataset, valid_dataset, 
                                initial_batch_size, 
                                new_custom_collate_fn)
                                # performance_metrics=None)

In [11]:
for batch in data_module.train_dataloader():
    # print(batch)

    batch_features, batch_missed_chars = process_batch(batch, \
        char_frequency, max_word_length)

    print(batch_features.shape)
    print(batch_missed_chars.shape)

    break

Normal sampler kicked...


torch.Size([1, 29, 10])
torch.Size([1, 28])


In [12]:
batch_features.shape

torch.Size([1, 29, 10])

In [13]:
batch_missed_chars.shape

torch.Size([1, 28])

In [14]:
features, missed_chars = build_enhanced_feature_set(sample, \
    char_frequency, max_word_length)

In [15]:
features.shape

torch.Size([29, 10])

In [16]:
# `STOP` is not defined anywhere, so running this cell raises NameError and a
# "Run All" halts here. Presumably a deliberate barrier before the scratch cells below.
STOP

NameError: name 'STOP' is not defined

In [None]:
import torch
import torch.nn as nn

# Sample dimensions
missed_char_dim = 28  # Dimension of missed character vector
hidden_dim = 10       # Arbitrary hidden dimension for output

class TestModel(nn.Module):
    def __init__(self):
        super(TestModel, self).__init__()
        self.miss_linear = nn.Linear(missed_char_dim, hidden_dim)

    def forward(self, missed_chars):
        missed_chars_processed = self.miss_linear(missed_chars)
        return missed_chars_processed

# Create model
model = TestModel()

# Sample data: a batch of size 1 with 28 missed character indicators
# Creating a sample input with 28 values, each being 0 or 1
missed_chars = torch.tensor([0, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1, 1, 0, 1, 0, \
    1, 0, 1, 1, 0, 1, 0, 1, 1, 0, 1, 0, 1], dtype=torch.float)

# Forward pass
output = model(missed_chars)
output.shape

In [None]:
def calculate_difficulty_score(metrics):
    """Combine win rate and miss penalty into one difficulty score.

    Args:
        metrics: Mapping that may contain 'performance_wins' (treated as a
            0-100 win rate) and 'miss_penalty_avg' (assumed to already lie in
            0-1). Missing keys default to 0.

    Returns:
        ``1.0 * (100 - win_rate) / 100 + 0.5 * miss_penalty``; higher means
        harder. Attempts used are intentionally not part of the score.
    """
    # Extracting the metrics
    win_rate = metrics.get('performance_wins', 0)
    miss_penalty = metrics.get('miss_penalty_avg', 0)

    # Weights for each metric (these can be adjusted)
    weight_win_rate = 1.0      # win rate is the strongest indicator of difficulty
    weight_miss_penalty = 0.5  # moderate weight

    # Invert the win rate: a lower win rate indicates higher difficulty
    normalized_win_rate = (100 - win_rate) / 100

    return (
        weight_win_rate * normalized_win_rate +
        weight_miss_penalty * miss_penalty
    )

# Example usage
metrics = {
    'performance_wins': 0,  # Example values
    # 'performance_total_attempts_used': 6,
    'miss_penalty_avg': 1
}

score = calculate_difficulty_score(metrics)
print("Difficulty Score:", score)

# # Use the score to determine if the word length should be added to target pairs
# if score >= 0.001:  # Define this threshold based on your game's difficulty scale
#     target_pairs.append((int(word_length),))


In [None]:
def calculate_difficulty_score(metrics):
    """Return a difficulty score from 'performance_wins' and 'miss_penalty_avg'.

    The win rate is inverted on a 0-100 scale (fewer wins -> harder) and
    weighted 1.0; the miss penalty is used as given and weighted 0.5.
    Keys missing from ``metrics`` default to 0.
    """
    win_weight, miss_weight = 1.0, 0.5

    wins = metrics.get('performance_wins', 0)
    misses = metrics.get('miss_penalty_avg', 0)

    loss_share = (100 - wins) / 100
    return win_weight * loss_share + miss_weight * misses


# Best-case scenario metrics
best_case_metrics = {
    'performance_wins': 100,  # Maximum win rate
    'miss_penalty_avg': 0     # Minimum miss penalty
}

best_case_score = calculate_difficulty_score(best_case_metrics)
print("Best-Case Difficulty Score:", best_case_score)

In [None]:
# Worst-case scenario metrics
worst_case_metrics = {
    'performance_wins': 0,  # Minimum win rate
    'miss_penalty_avg': 1   # Maximum miss penalty
}

worst_case_score = calculate_difficulty_score(worst_case_metrics)
print("Worst-Case Difficulty Score:", worst_case_score)

In [None]:
def calculate_difficulty_score(metrics, weight_win_rate=1.0, weight_miss_penalty=0.5):
    """
    Weighted difficulty score built from win rate and miss penalty.

    :param metrics: Dictionary that may hold 'performance_wins' and
        'miss_penalty_avg'; absent keys count as 0.
    :param weight_win_rate: Multiplier for the inverted win rate.
    :param weight_miss_penalty: Multiplier for the miss penalty.
    :return: ``weight_win_rate * (100 - wins) / 100 + weight_miss_penalty * miss``.
    """
    # A lower win rate means a harder word, hence the inversion.
    loss_share = (100 - metrics.get('performance_wins', 0)) / 100
    # The miss penalty is used as-is.
    miss_share = metrics.get('miss_penalty_avg', 0)

    return weight_win_rate * loss_share + weight_miss_penalty * miss_share

# Example usage with custom weights
metrics = {
    'performance_wins': 0,  # Example values
    'miss_penalty_avg': 1
}

# Custom weights
custom_weight_win_rate = 1.0
custom_weight_miss_penalty = 0.5

score = calculate_difficulty_score(metrics, custom_weight_win_rate, custom_weight_miss_penalty)
print("Difficulty Score with Custom Weights:", score)

In [None]:
# Example aggregated metrics, keyed by word length
aggregated_metrics = {
    5: dict(total_games=100, wins=60, total_attempts_used=300, win_rate=0.6,
            average_attempts_used=3.0, miss_penalty=0.02),
    6: dict(total_games=150, wins=90, total_attempts_used=450, win_rate=0.6,
            average_attempts_used=3.0, miss_penalty=0.03),
}

# Print each word length followed by its indented metric lines
for word_len, metrics in aggregated_metrics.items():
    print(f"Word Length: {word_len}")
    for key, value in metrics.items():
        print(f"  {key}: {value}")

In [None]:
def select_target_pairs(performance_metrics, batch_size,
                        max_win_rate=20, min_avg_attempts=4):
    """Pick the word lengths whose metrics mark them as hard.

    Args:
        performance_metrics: Mapping of word length -> metrics dict. Only
            'win_rate' and 'average_attempts_used' are read; missing keys
            count as 0.
        batch_size: Maximum number of word lengths to return.
        max_win_rate: A length qualifies only if its win rate is at most this
            value. The default of 20 suggests a 0-100 scale; confirm against
            the producer of the metrics.
        min_avg_attempts: A length qualifies only if its average attempts used
            is at least this value.

    Returns:
        The qualifying word lengths in the mapping's iteration order,
        truncated to ``batch_size`` entries.
    """
    target_pairs = [
        word_length
        for word_length, metrics in performance_metrics.items()
        if metrics.get('win_rate', 0) <= max_win_rate
        and metrics.get('average_attempts_used', 0) >= min_avg_attempts
    ]

    return target_pairs[:batch_size]

# Test data
# NOTE(review): 'win_rate' is 0.6 here (looks like a fraction), yet
# select_target_pairs compares it against a threshold of 20, so both entries
# qualify (0.6 <= 20 and 4.0 >= 4). Confirm which scale is intended.
performance_metrics = {
    5: {'total_games': 100, 'wins': 60, 'total_attempts_used': 300, 'win_rate': 0.6, 
        'average_attempts_used': 4.0, 'miss_penalty': 0.02},
    6: {'total_games': 150, 'wins': 90, 'total_attempts_used': 450, 'win_rate': 0.6, 
        'average_attempts_used': 4.0, 'miss_penalty': 0.03}
}

# Test the function: at most `batch_size` word lengths are returned
batch_size = 10
target_pairs = select_target_pairs(performance_metrics, batch_size)
print("Target Pairs:", target_pairs)

In [None]:
# def calculate_miss_penalty(outputs, miss_chars):
#     if outputs.numel() == 0:
#         print("Empty outputs tensor")
#         return torch.tensor(0.0, device=outputs.device)

#     miss_penalty = torch.sum(outputs * miss_chars) / outputs.numel()
#     return miss_penalty

In [None]:
from scr.feature_engineering import *

In [None]:
word = 'cat'

encoded = encode_word(word)

encoded

In [None]:
# Get missed characters tensor
miss_chars = get_missed_characters(word)

In [None]:
miss_chars

In [None]:
# Dummy model output for the word 'cat'
word = 'cat'

# Shape of the dummy output:
#   batch_size     - number of words in the batch
#   max_seq_len    - sequence length (here the length of the word)
#   num_characters - size of the character vocabulary
batch_size, max_seq_len, num_characters = 1, len(word), 28

# Uniform random values in [0, 1) stand in for real model scores
outputs = torch.rand(batch_size, max_seq_len, num_characters)

outputs.shape, outputs

In [None]:
# probabilities = F.softmax(outputs, dim=-1)

# probabilities

In [None]:
# Two dummy output tensors for the word 'cat':
#   correct_outputs - all zeros, except confidence 1 on the true letter at each position
#   wrong_outputs   - all ones, except confidence 0 on the true letter at each position
correct_outputs = torch.zeros((batch_size, max_seq_len, num_characters))
wrong_outputs = torch.ones((batch_size, max_seq_len, num_characters))

for position, letter in enumerate('cat'):
    letter_idx = char_to_idx[letter]
    correct_outputs[0, position, letter_idx] = 1  # full confidence in the right letter
    wrong_outputs[0, position, letter_idx] = 0    # zero confidence in the right letter

correct_outputs.shape, wrong_outputs.shape

In [None]:
# Calculate miss penalty
miss_penalty = calculate_miss_penalty(wrong_outputs, miss_chars)
miss_penalty

##### Data Dir

In [None]:
# Run configuration for the cells below.
NUM_STRATIFIED_SAMPLES = 100  # This will be overwritten by Papermill
NUM_WORD_SAMPLE = 1000        # words for testing
FAST_DEV_RUN = False
MAX_EPOCH = 15

In [None]:
from pathlib import Path
from scr.custom_sampler import *

# Base dataset directory; each stratified-sample count gets its own sub-directory
# that holds the parquet files.
# NOTE(review): these are absolute, machine-specific paths.
base_dataset_dir = Path("/media/sayem/510B93E12554BBD1/dataset/")
stratified_samples_dir = base_dataset_dir / str(NUM_STRATIFIED_SAMPLES)
parquet_path = stratified_samples_dir / 'parquets'

# Create the parquet directory if it doesn't exist
parquet_path.mkdir(parents=True, exist_ok=True)
# parquet_valid_path.mkdir(parents=True, exist_ok=True)

# Define and create the directory for models
models_dir = Path("/home/sayem/Desktop/Hangman/models")
models_dir.mkdir(parents=True, exist_ok=True)

# Define the output directory and the logger directory beneath it
output_dir = Path("/home/sayem/Desktop/Hangman/training_outputs")
logger_dir = output_dir / "lightning_logs"

# Create the output and logger directories if they don't exist
output_dir.mkdir(parents=True, exist_ok=True)
logger_dir.mkdir(parents=True, exist_ok=True)

# File that holds the testing words for this stratified-sample count
testing_words_file_path = stratified_samples_dir / "testing_words.txt"

# Start from an empty sample so the name is always bound: previously a missing
# file was reported and then the final print crashed with NameError.
sampled_test_words = []

try:
    testing_word_list = read_words(testing_words_file_path)
    print(f"Length of the testing word list: {len(testing_word_list)}")
    sampled_test_words = stratified_sample_by_length_and_uniqueness(testing_word_list, \
        NUM_WORD_SAMPLE)
    print(f"Sampled {len(sampled_test_words)} unique words for testing.")
except FileNotFoundError:
    print(f"File not found: {testing_words_file_path}")

print(len(sampled_test_words))

##### Dataset Loading

In [None]:
# Create datasets directly from the saved parquet files
hangman_dataset = HangmanDataset(parquet_path)
# valid_dataset = HangmanDataset(parquet_valid_path)

from scr.utils import *

# Assuming `hangman_dataset` is an instance of HangmanDataset
# Usage
train_dataset, valid_dataset \
    = split_hangman_dataset(hangman_dataset, 0.8)

print(len(train_dataset))
print(len(valid_dataset))

assert len(train_dataset) > len(valid_dataset)

In [None]:
hangman_dataset[(29,)]

In [None]:
train_dataset[(29,)]

In [None]:
train_dataset.get_all_group_labels()

In [None]:
len(train_dataset)

In [None]:
# Request a sample with word length 5.
# The dataset is indexed with a 1-tuple: (word_length,).
word_length = 5
sample = hangman_dataset[(word_length,)]
sample

In [None]:
sample

In [None]:
from scr.data_module import *
from scr.dataset import *

# Initialize Data Module
initial_batch_size = 128  # Set your initial batch size

# Initialize Data Module with the required arguments
data_module = HangmanDataModule(train_dataset, valid_dataset, 
                                initial_batch_size, 
                                custom_collate_fn)

In [None]:
train_loader = data_module.train_dataloader()

In [None]:
# Dummy per-word-length metrics for exercising the sampler.
# Rows are (word_length, win_rate, average_attempts_used); add more as needed.
dummy_performance_metrics = {
    length: {'win_rate': win_rate, 'average_attempts_used': attempts}
    for length, win_rate, attempts in (
        (3, 45, 5),
        (4, 60, 3),  # This won't be selected due to high win rate
        (5, 30, 6),
        (6, 48, 4),
    )
}

In [None]:
# Initialize PerformanceBasedSampler
sampler = PerformanceBasedSampler(dataset=hangman_dataset, 
                                  performance_metrics=dummy_performance_metrics, 
                                  batch_size=10)

# Verify target pairs
print("Target pairs:", sampler.target_pairs)

In [None]:
next(iter(sampler))

In [None]:
train_dataset[(1,)]