In [1]:
import numpy as np
import pandas as pd
from scipy import sparse as sparse
from scipy.sparse import linalg as splinalg
from sys import argv
import matplotlib.pyplot as plt
import pickle as pl
import warnings
from scipy.integrate import odeint, solve_ivp
warnings.filterwarnings("ignore")
import collections
import os
from tqdm import tqdm
import torch
import torch.nn as nn
from torch.nn  import functional as F
# Run on Apple's Metal backend (MPS) when PyTorch reports it as available;
# otherwise fall back to the CPU. Every tensor/model below is moved to `device`.
if torch.backends.mps.is_available():
    device = torch.device('mps')
else:
    device = torch.device("cpu")
print(device)

mps


## Encode Decode Functions

In [2]:
# Vocabulary, in index order: padding ' ', mask '_', the letters 'a'..'z', then the ',' separator.
chars = [' ', '_'] + [chr(code) for code in range(ord('a'), ord('z') + 1)] + [',']

# Lookup tables between symbols and their integer ids.
stoi = dict(zip(chars, range(len(chars))))  # string -> integer
itos = dict(enumerate(chars))               # integer -> string


def encode(s):
    """Turn a string into the list of integer ids of its characters."""
    return [stoi[c] for c in s]


def decode(l):
    """Turn an iterable of integer ids back into the string they spell."""
    return ''.join(itos[i] for i in l)


# Quick sanity check of the round trip (printed, not asserted).
print(encode('hi_there,'.ljust(40)))
print(decode(encode('hi_there')))
print(stoi)

[9, 10, 1, 21, 9, 6, 19, 6, 28, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
hi_there
{' ': 0, '_': 1, 'a': 2, 'b': 3, 'c': 4, 'd': 5, 'e': 6, 'f': 7, 'g': 8, 'h': 9, 'i': 10, 'j': 11, 'k': 12, 'l': 13, 'm': 14, 'n': 15, 'o': 16, 'p': 17, 'q': 18, 'r': 19, 's': 20, 't': 21, 'u': 22, 'v': 23, 'w': 24, 'x': 25, 'y': 26, 'z': 27, ',': 28}


## Model Class

In [3]:
class Head(nn.Module):
    """One self-attention head with a padding mask.

    Notes grounded in the code below:
      * the query-key scores are used as-is (there is no division by sqrt(d));
      * the only mask is the padding mask, so every position may attend to every
        non-padded position (no causal/triangular mask is applied).
    """

    def __init__(self, n_embd, n_head):
        """
        Create the bias-free key / query / value projections.

        Input:
            n_embd (C): embedding dimension of the incoming tokens
            n_head (d): output width of this single head, i.e. each projection
                        maps C -> d
        """
        super().__init__()
        # Attention projections conventionally carry no bias term.
        projection_kwargs = dict(bias=False, device=device)
        self.key = nn.Linear(n_embd, n_head, **projection_kwargs)
        self.query = nn.Linear(n_embd, n_head, **projection_kwargs)
        self.value = nn.Linear(n_embd, n_head, **projection_kwargs)

    def forward(self, x):
        """
        Attend over the sequence.

        Input:
            x of shape (B, T, C)
        Output:
            tensor of shape (B, T, d)
        """
        x = x.to(device)
        batch, seq_len, _ = x.shape  # (B, T, C)

        keys = self.key(x)       # (B, T, d)
        queries = self.query(x)  # (B, T, d)

        # Raw dot-product scores between every query position t and key position u.
        scores = torch.einsum('btd,bud->btu', queries, keys)  # (B, T, T)

        # A position counts as padding when its feature vector sums to exactly zero.
        is_padding = torch.sum(x, axis=-1) == 0  # (B, T)
        # Additive bias per key position: -inf for padding, a constant 1 otherwise
        # (a constant shift along the softmax axis does not change the softmax).
        key_bias = torch.ones((batch, seq_len), device=device).masked_fill(is_padding, float('-inf'))  # (B, T)

        # Normalise over key positions; padded keys receive zero weight.
        weights = F.softmax(scores + key_bias[:, None, :], dim=2)  # (B, T, T)

        # Weighted sum of the value vectors.
        values = self.value(x)  # (B, T, d)
        return weights @ values  # (B, T, T) @ (B, T, d) -> (B, T, d)

class MultiHeadAttention(nn.Module):
    """Runs several attention heads on the same input, concatenates and projects their outputs."""

    def __init__(self, num_heads, head_size, embd_size):
        """
        Description: builds the heads and the output projection
        Input:
            num_heads: number of attention heads
            head_size (d): output width of each attention head
            embd_size (C): embedding dimension of the input and of the projected output
        """
        super().__init__()
        # num_heads independent Head modules, each mapping C -> d.
        head_modules = [Head(embd_size, head_size) for _ in range(num_heads)]
        self.heads = nn.ModuleList(head_modules)
        # Bias-free projection from the concatenated heads (num_heads * d) back to C.
        self.proj = nn.Linear(num_heads * head_size, embd_size, bias=False)

    def forward(self, x):
        """
        Description: Forward pass of multi head attention
        Input: x of shape (B, T, C)
        Output: tensor of shape (B, T, C)
        """
        # Each head sees the full input; outputs are joined along the feature axis.
        per_head = [head(x) for head in self.heads]
        concatenated = torch.cat(per_head, dim=2)  # (B, T, num_heads * d)
        return self.proj(concatenated)

class FeedFoward(nn.Module):
    """Position-wise feed-forward network: Linear -> ReLU -> Linear."""

    def __init__(self, n_embd):
        """
        Description: Linear -> ReLU -> Linear
        Input:
            n_embd: width of the input and output; the hidden layer is 4 * n_embd wide
        """
        super().__init__()
        hidden_width = 4 * n_embd
        self.layer1 = nn.Linear(n_embd, hidden_width)
        self.layer2 = nn.Linear(hidden_width, n_embd)

    def forward(self, x):
        "Forward pass of the network"
        hidden = F.relu(self.layer1(x))
        return self.layer2(hidden)

class Block(nn.Module):
    """Transformer block: multi-head attention then feed-forward, each wrapped in a residual connection.

    NOTE: a LayerNorm (`L1`) is constructed but `forward` never calls it. It is
    kept so the module's parameter set stays the same.
    """

    def __init__(self, n_embd, n_head, head_size = 100):
        """
        Description: Transformer block
        Input:
            n_embd: embedding dimension of the input
            n_head: number of attention heads
            head_size: accepted for interface compatibility but ignored; the
                       per-head width is always n_embd // n_head
        """
        super().__init__()
        per_head_width = n_embd // n_head  # overrides whatever head_size was passed
        self.L1 = nn.LayerNorm(n_embd)     # created but not used in forward
        self.L2 = MultiHeadAttention(n_head, per_head_width, n_embd)
        self.L3 = FeedFoward(n_embd)

    def forward(self, x):
        "Forward pass of the attention block"
        # Residual connection around the attention sub-layer.
        attended = self.L2(x) + x
        # Residual connection around the feed-forward sub-layer.
        return self.L3(attended) + attended
def get_rotary_embedding(dim, seq_len, device):
    """Build the table of rotary-position sines and cosines.

    Input:
        dim: embedding dimension; one frequency is produced per even index in [0, dim)
        seq_len: number of positions to tabulate
        device: device on which the table is created
    Output:
        tensor whose rows are positions; the first half of each row holds the
        sines of the position angles and the second half the cosines
        (shape (seq_len, dim) when dim is even)
    """
    even_indices = torch.arange(0, dim, 2, dtype=torch.float32, device=device)
    inv_freq = 10000.0 ** (-even_indices / dim)  # one frequency per feature pair
    positions = torch.arange(seq_len, device=device, dtype=torch.float32).unsqueeze(1)  # (seq_len, 1)
    # Outer product position x frequency gives the rotation angle of every pair.
    angles = torch.matmul(positions, inv_freq.unsqueeze(0))
    return torch.cat([angles.sin(), angles.cos()], dim=-1)

def apply_rotary_embedding(x, rotary_emb):
    """Rotate consecutive feature pairs of x using the two halves of rotary_emb.

    Input:
        x: tensor whose last dimension is even; features (2i, 2i+1) form pair i
        rotary_emb: table whose last dimension splits into two equal halves that
                    broadcast against the pairs of x
    Output:
        tensor with the same shape as x

    NOTE: the first half of rotary_emb is used where a cosine would normally go
    and the second half where a sine would go. get_rotary_embedding in this file
    returns cat([sin, cos]), so with that table the roles are swapped relative
    to the textbook formula. This is left untouched because changing it would
    alter the outputs of already-trained weights.
    """
    pairs = x.view(*x.shape[:-1], x.shape[-1] // 2, 2)
    first_half, second_half = rotary_emb.chunk(2, dim=-1)
    a = pairs[..., 0]
    b = pairs[..., 1]
    rotated_a = a * first_half - b * second_half
    rotated_b = a * second_half + b * first_half
    # Re-interleave the rotated pairs back into the original feature layout.
    return torch.stack((rotated_a, rotated_b), dim=-1).flatten(-2)

class Transformer(nn.Module):
    """
    Description: Transformer model built from an embedding table, a stack of
    `Block` modules, a final LayerNorm and a linear head over the vocabulary.

    Notes:
        * Positions are injected by rotating the token embeddings
          (get_rotary_embedding / apply_rotary_embedding); there is no learned
          positional table.
        * forward() reads the module-level names `block_size` and `device`,
          not the `block_size` constructor argument.
    """
    def __init__(self, vocab_size, block_size, n_embd, n_layers, n_head):
        super().__init__()
        """
        Description: Decoder only transformer model
        Input:
            vocab_size (V) : Vocabulary dimension
            block_size (T): Context length
            n_embd (C): Embedding dimension
            n_layers: number of layers of Transformer blocks
            n_head : number of heads in multi-head attention
        """
        # NOTE: the `block_size` argument is not used anywhere in this constructor.
        # Token id 0 is declared as the padding index of the embedding table.
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd, padding_idx = 0) 
        # n_layers Blocks applied one after another.
        self.blocks = nn.Sequential(*[Block(n_embd, n_head) for itre in range(n_layers)])
        self.layer_norm = nn.LayerNorm(n_embd) # Layer norm in the embedding dimension
        self.linear_head = nn.Linear(n_embd, vocab_size) # (C -> V)

    ### DO NOT MODIFY BEYOND THIS
    def forward(self, idx, targets=None):
        """
        Description: Forward pass of transformer
        Inputs:
            idx: The tokenized input sequence, shape (B, T)
            targets (optional): the tokenized target sequence, reshaped to (B*T) below
        Returns:
            (logits, loss):
                logits has shape (B, T, V) when targets is None and is flattened
                to (B*T, V) when targets is given;
                loss is None when targets is None, otherwise the cross-entropy
                over the positions that are not ignored (see below).
        """
        B, T = idx.shape
        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        _, _, C = tok_emb.shape
        # The rotary table is built for the global `block_size`, not for T, so it only
        # broadcasts against tok_emb when the two are compatible
        # (presumably callers always pad idx to block_size - TODO confirm).
        rotary_emb = get_rotary_embedding(C, block_size, device)
        # After rotation, force to 0 every element whose un-rotated embedding value is exactly 0
        # (presumably meant to keep padding tokens at zero - verify against the checkpoint).
        x = apply_rotary_embedding(tok_emb, rotary_emb).masked_fill(tok_emb==0, 0)
        x = self.blocks(x) # (B,T,C)
        x = self.layer_norm(x) # (B,T,C)
        logits = self.linear_head(x) # (B,T, V)

        # computes loss if targets are provided
        if targets is None:
            loss = None
        else:
            # NOTE: C is rebound here to the vocabulary size (last dim of logits).
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            # Wherever the input token id is > 1 the target is overwritten with 0 ...
            targets = targets.masked_fill(idx > 1 , 0)
            # ... and cast to int64 (via a CPU LongTensor) before moving to `device`.
            targets = targets.view(B*T).type(torch.LongTensor).to(device)
            
            # Targets equal to 0 are skipped, so only positions with input id <= 1
            # and a non-zero target contribute to the loss.
            loss = F.cross_entropy(logits, targets, ignore_index = 0)

        return logits, loss

## Model Parameters

In [4]:
# hyperparameters
vocab_size = 29  # size of the `chars` vocabulary: ' ', '_', 'a'-'z' and ','
block_size = 40 # maximum context length
n_embd = 127  # NOTE: the model is instantiated below with n_embd+1 (= 128) as embedding width
n_head = 16  # attention heads per block; Block uses n_embd // n_head features per head
n_layers = 5  # number of stacked transformer blocks
dropout = 0.0  # not referenced by any model code visible in this notebook

## Load Model 

In [12]:
# SECURITY: torch.load unpickles the file, which can execute arbitrary code.
# Only load checkpoints that come from a trusted source.
# map_location=device remaps the stored tensors onto the device selected above, so a
# checkpoint saved on another accelerator (e.g. MPS) still loads on a CPU-only machine.
state = torch.load('model_127_16_5_epoch_150', map_location=device)
# The network is built with n_embd+1 as its embedding width (128 for n_embd = 127).
m = Transformer(vocab_size, block_size, n_embd+1, n_layers, n_head)
m.load_state_dict(state)
m = m.to(device)

## Guess Function

In [13]:
# Put the loaded model into evaluation mode before it is used for guessing.
m.eval()
def guess(model, word, temperature=2):
    """Rank every vocabulary symbol by its total predicted probability over the masked slots.

    Input:
        model: callable taking a (1, 40) tensor of token ids and returning (logits, loss)
               with logits of shape (1, 40, V)
        word: game state string using the notebook's vocabulary, with '_' marking the
              still-hidden letters; it is right-padded with spaces to 40 characters
        temperature: divisor applied to the logits before the softmax
    Output:
        string containing all V vocabulary symbols, most probable first. The pad,
        mask and separator symbols are part of the ranking; callers must skip them.
    """
    idx = torch.tensor(np.reshape(encode(word.ljust(40)), (1, -1))).to(device)
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        logits, _ = model(idx)
    # Keep probabilities only at positions whose input token is the mask symbol (id 1).
    probs = F.softmax(logits / temperature, dim=-1).masked_fill(idx[:, :, None] != 1, 0)
    # Sum over positions -> one score per vocabulary symbol.
    symbol_scores = np.sum(probs[0].to('cpu').detach().numpy(), axis=0)
    return decode(list(np.argsort(-symbol_scores)))

def guess_topk(model, word, k=3):
    """Rank vocabulary symbols using only the k most probable symbols of each masked slot.

    For every position whose input token is the mask symbol, the k largest
    probabilities are kept, renormalised to sum to 1, and then summed over
    positions to give one score per symbol.

    Input:
        model: callable taking a (1, 40) tensor of token ids and returning (logits, loss)
               with logits of shape (1, 40, V)
        word: game state string using the notebook's vocabulary, with '_' marking the
              still-hidden letters; it is right-padded with spaces to 40 characters
        k: number of candidates kept per position (default 3)
    Output:
        string containing all V vocabulary symbols, best score first. The pad,
        mask and separator symbols are part of the ranking; callers must skip them.
    """
    idx = torch.tensor(np.reshape(encode(word.ljust(40)), (1, -1))).to(device)
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        logits, _ = model(idx)
    # (T, V) probabilities, zeroed at every position that is not a mask token (id 1).
    probs = F.softmax(logits, dim=-1).masked_fill(idx[:, :, None] != 1, 0)[0]

    # Top-k symbols per position (along the vocabulary axis).
    values, indices = torch.topk(probs, k=k, dim=1)
    # Keep only those k entries per row. scatter_ writes along dim 1 (the vocabulary axis);
    # plain `mask[indices] = 1` would index rows instead and select the wrong cells.
    topk_probs = torch.zeros_like(probs).scatter_(1, indices, values)
    # Renormalise each row; rows that are entirely zero give 0/0 = nan, which is mapped to 0.
    topk_probs = torch.nan_to_num(topk_probs / torch.sum(topk_probs, dim=1, keepdim=True), nan=0)

    symbol_scores = np.sum(topk_probs.to('cpu').detach().numpy(), axis=0)
    return decode(list(np.argsort(-symbol_scores)))

## Server and Player

In [14]:
class HangmanPlayer:
    """Hangman player that asks a model for a symbol ranking and never repeats a guess."""

    def __init__(self, model):
        """
        Input:
            model: object forwarded unchanged to the module-level guess() function
        """
        self.guessed_letters = []  # every letter returned so far in the current game
        self.model = model
        self.no_letter = True  # reset by new_game(); not read by this class

    def guess(self, question):
        """Return the next letter to try for the given game state.

        Input:
            question: game state string in which '#' marks hidden letters
                      (converted to '_' before it is handed to the model)
        Output:
            the best-ranked alphabetic symbol that has not been returned yet in
            this game, or 'a' if every letter in the ranking was already used
        """
        question = question.replace('#', '_')
        print(question)
        ranking = guess(self.model, question)
        for symbol in ranking:
            # The ranking covers the whole vocabulary, including the pad ' ',
            # mask '_' and separator ','. Those can never be a correct letter,
            # so returning one would only waste a try.
            if not symbol.isalpha():
                continue
            if symbol not in self.guessed_letters:
                self.guessed_letters.append(symbol)
                return symbol
        return 'a'

    def new_game(self):
        """Forget the guesses of the previous game."""
        self.guessed_letters = []
        self.no_letter = True

In [15]:

class HangmanServer:
    """Plays Hangman games against a player object and reports the win rate.

    The player must provide new_game() and guess(state) -> single character.
    """

    def __init__(self, player):
        self.player = player
        self.test_words = []  # not used by the methods below

    @staticmethod
    def read_test_words(path='../words_alpha_train_unique.txt'):
        """Read the word list, one entry per line.

        Blank lines are dropped, so the result is the same whether or not the
        file ends with a newline.
        """
        with open(path) as f:
            return [line for line in f.read().splitlines() if line]

    @staticmethod
    def data_iter(words):
        """Yield (question, answer) pairs.

        Each entry is split on ',' and the second field is taken as the answer;
        the question is a string of '#' of the same length.
        """
        for word in words:
            answer = word.split(',')[1]
            yield '#' * len(answer), answer

    def run(self, num_games=20):
        """Play up to num_games randomly chosen words and return the fraction won.

        Every game allows 6 wrong guesses. The state handed to the player is
        "<question>,<letters that revealed nothing>". Returns 0.0 when there
        are no words to play.
        """
        test_words = self.read_test_words()
        np.random.shuffle(test_words)
        test_words = test_words[:num_games]
        success = total = 0
        print(f"Total Game Number: {len(test_words)}")
        for question, answer in self.data_iter(test_words):
            self.player.new_game()
            tries = 6
            success_rate = 0 if total == 0 else success / total
            print("=" * 20, "Game %d" % (total + 1), '=' * 20, "Success Rate: %.2f" % success_rate)
            print('provided question: ', " ".join(question))
            guessed_letters = []
            while '#' in question and tries > 0:
                letter = self.player.guess(question + "," + "".join(guessed_letters))
                # Reveal every hidden position whose answer letter matches the guess.
                revealed = "".join(
                    a_l if (q_l == '#' and a_l == letter) else q_l
                    for q_l, a_l in zip(question, answer)
                )
                if revealed == question:
                    # Nothing new was uncovered: remember the letter for the next prompt.
                    guessed_letters.append(letter)
                question = revealed
                if letter not in answer:
                    tries -= 1
                print("provided question: ", " ".join(question), "your guess: %s" % letter, "left tries: %d" % tries, 'answer: %s' % answer)

            if '#' not in question:
                success += 1
            total += 1

        # Guard against an empty word list instead of dividing by zero.
        rate = success / total if total else 0.0
        print(f"{success} success out of {total} tries, rate: {rate:.4f}")
        return rate


# Wire the loaded model `m` into a player, and hand that player to the game server.
player = HangmanPlayer(m)
server = HangmanServer(player)


## Evaluation

In [16]:
# Reuse the `device` chosen at the top of the notebook. Forcing it to 'mps' here
# would undo the CPU fallback and fail on machines without the MPS backend.
win_rate = server.run()
print(win_rate)

Total Game Number: 20
provided question:  # # # # # # # # # # # # # #
______________,
provided question:  # # e # # # # # # # # # # # your guess: e left tries: 6 answer: preinclination
__e___________,
provided question:  # # e i # # # i # # # i # # your guess: i left tries: 6 answer: preinclination
__ei___i___i__,
provided question:  # # e i n # # i n # # i # n your guess: n left tries: 6 answer: preinclination
__ein__in__i_n,
provided question:  # # e i n # # i n a # i # n your guess: a left tries: 6 answer: preinclination
__ein__ina_i_n,
provided question:  # # e i n # # i n a # i o n your guess: o left tries: 6 answer: preinclination
__ein__ina_ion,
provided question:  # # e i n # # i n a t i o n your guess: t left tries: 6 answer: preinclination
__ein__ination,
provided question:  p # e i n # # i n a t i o n your guess: p left tries: 6 answer: preinclination
p_ein__ination,
provided question:  p r e i n # # i n a t i o n your guess: r left tries: 6 answer: preinclination
prein__ina