In [59]:
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import os


class generic_model(nn.Module):
    """
    contains basic functions for storing and loading a model
    """

    def __init__(self, config):

        super(generic_model, self).__init__()

        self.config_file = config

    def loss(self, predicted, truth):

        return self.loss_func(predicted, truth)
    
    def save_model(self, is_best, epoch, train_loss, test_loss, rnn_name, layers, hidden_dim):

        base_path = self.config_file['models']
        if is_best:
            filename = os.path.join(base_path, 'best_' + '_'.join([rnn_name, str(layers), str(hidden_dim)]) + '.pth')
        else:
            filename = os.path.join(base_path, str(epoch) + '_' + '_'.join([rnn_name, str(layers), str(hidden_dim)]) + '.pth')

        torch.save({
            'epoch': epoch,
            'model_state_dict': self.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'train_loss': train_loss,
            'test_loss': test_loss,
        }, filename)

        print("Saved model")

    def load_model(self, mode, rnn_name, layers, hidden_dim, epoch=None):

        if mode == 'test' or mode == 'test_one':

            try:
                if epoch is None:
                    filename = os.path.join(self.config_file['models'], 'best_' + '_'.join(
                        [rnn_name, str(layers), str(hidden_dim)]) + '.pth')
                else:
                    filename = os.path.join(self.config_file['models'], str(epoch) + '_' + '_'.join(
                        [rnn_name, str(layers), str(hidden_dim)]) + '.pth')
                print(filename)

                checkpoint = torch.load(filename, map_location=lambda storage, loc: storage)
                # load model parameters
                self.load_state_dict(checkpoint['model_state_dict'])
                self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
                print("Loaded pretrained model from:", filename)

            except:
                print("Couldn't find model for testing")
                exit(0)

        else:
      
            if epoch is not None:
                filename = self.config_file['models'] + str(epoch) + '_' + '_'.join(
                    [rnn_name, str(layers), str(hidden_dim)]) + '.pth'
            else:
                directory = [x.split('_') for x in os.listdir(self.config_file['models'])]
                to_check = []
                for poss in directory:
                    try:
                        to_check.append(int(poss[0]))
                    except:
                        continue

                if len(to_check) == 0:
                    print("No pretrained model found")
                    return 0, [], []
       
                filename = os.path.join(self.config_file['models'], str(max(to_check)) + '_' + '_'.join(
                    [rnn_name, str(layers), str(hidden_dim)]) + '.pth')


            checkpoint = torch.load(filename, map_location=lambda storage, loc: storage)
            self.load_state_dict(checkpoint['model_state_dict'])
            self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

            print("Loaded pretrained model from:", filename)

            return checkpoint['epoch'], checkpoint['train_loss'], checkpoint['test_loss']


class RNN(generic_model):

    def __init__(self, config):

        super(RNN, self).__init__(config)


        self.rnn_name = config['rnn']
        self.input_dim = config['vocab_size'] + 1
        self.hidden_dim = config['hidden_dim']
        self.num_layers = config['num_layers']
        self.embed_dim = config['embedding_dim']
        self.output_dim = config['vocab_size']

        if config['use_embedding']:
            self.use_embedding = True
            self.embedding = nn.Embedding(self.input_dim, self.embed_dim)
        else:
            self.use_embedding = False

 
        if self.rnn_name == 'Transformer':
            in_features = self.embed_dim +config['miss_linear_dim']
        else:
            in_features = config['miss_linear_dim'] + self.hidden_dim*2
        mid_features = config['output_mid_features']
        self.linear1_out = nn.Linear(in_features, mid_features)
        self.relu = nn.ReLU()
        self.linear2_out = nn.Linear(mid_features, self.output_dim)


        self.miss_linear = nn.Linear(config['vocab_size'], config['miss_linear_dim'])

        if self.rnn_name == 'LSTM':
            self.encoder = nn.LSTM(input_size=self.embed_dim if self.use_embedding else self.input_dim, hidden_size=self.hidden_dim, num_layers=self.num_layers,
                               dropout=config['dropout'],
                               bidirectional=True, batch_first=True)
        elif self.rnn_name == 'GRU':
            self.encoder = nn.GRU(input_size=self.embed_dim if self.use_embedding else self.input_dim, hidden_size=self.hidden_dim, num_layers=self.num_layers,
                              dropout=config['dropout'],
                              bidirectional=True, batch_first=True)
        elif self.rnn_name == 'Transformer':
            self.cls_token = nn.Parameter(torch.randn(1, 1, self.embed_dim), requires_grad=True)
            encoder_layer = nn.TransformerEncoderLayer(d_model=self.embed_dim, nhead=8, dropout=config['dropout'], batch_first=True)
            self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=self.num_layers)

        self.optimizer = optim.Adam(self.parameters(), lr=config['lr'])

    def forward(self, x, x_lens, miss_chars):
        """
        Forward pass through RNN
        :param x: input tensor of shape (batch size, max sequence length, input_dim)
        :param x_lens: actual lengths of each sequence < max sequence length (since padded with zeros)
        :param miss_chars: tensor of length batch_size x vocab size. 1 at index i indicates that ith character is NOT present
        :return: tensor of shape (batch size, max sequence length, output dim)
        """
        if self.use_embedding:
            x = self.embedding(x)

        batch_size, seq_len, _ = x.size()
        if self.rnn_name != 'Transformer':
            x = torch.nn.utils.rnn.pack_padded_sequence(x, x_lens.cpu(), batch_first=True, enforce_sorted=False)

        if self.rnn_name == 'LSTM':
            output, (hidden, _) = self.encoder(x)
        elif self.rnn_name == 'GRU':
            output, hidden = self.encoder(x)
        elif self.rnn_name == 'Transformer':
            x = torch.cat((self.cls_token.repeat(batch_size, 1, 1), x), dim=1)
            hidden = self.encoder(x)
            hidden = hidden[:, 0, :]

        if self.rnn_name != 'Transformer':
            hidden = hidden.view(self.num_layers, 2, -1, self.hidden_dim)
            hidden = hidden[-1]
            hidden = hidden.permute(1, 0, 2)
            hidden = hidden.contiguous().view(hidden.shape[0], -1)


        miss_chars = self.miss_linear(miss_chars)

        concatenated = torch.cat((hidden, miss_chars), dim=1)
    
        return self.linear2_out(self.relu(self.linear1_out(concatenated)))

    def calculate_loss(self, model_out, labels, input_lens, miss_chars, use_cuda):
        """
        :param model_out: tensor of shape (batch size, max sequence length, output dim) from forward pass
        :param labels: tensor of shape (batch size, vocab_size). 1 at index i indicates that ith character should be predicted
        :param: miss_chars: tensor of length batch_size x vocab size. 1 at index i indicates that ith character is NOT present
                            passed here to check if model's output probability of missed_chars is decreasing
        """
        outputs = nn.functional.log_softmax(model_out, dim=1)

        miss_penalty = torch.sum(outputs*miss_chars, dim=(0,1))/outputs.shape[0]

        input_lens = input_lens.float()
  
        weights_orig = (1/input_lens)/torch.sum(1/input_lens).unsqueeze(-1)
        weights = torch.zeros((weights_orig.shape[0], 1))

        weights[:, 0] = weights_orig

        if use_cuda:
            weights = weights.cuda()

        loss_func = nn.BCEWithLogitsLoss(weight=weights, reduction='sum')
        actual_penalty = loss_func(model_out, labels)
        return actual_penalty, miss_penalty


In [60]:
import numpy as np
from torch.utils.data import Dataset, DataLoader
import numpy as np
import os
import pickle
import yaml

np.random.seed(7)

extra_vocab = 1

def filter_and_encode(word, vocab_size, min_len, char_to_id):
    """
    checks if word length is greater than threshold and returns one-hot encoded array along with character sets
    :param word: word string
    :param vocab_size: size of vocabulary (26 in this case)
    :param min_len: word with length less than this is not added to the dataset
    :param char_to_id
    """

    word = word.strip().lower()
    if len(word) < min_len:
        return None, None, None

    encoding = np.zeros((len(word), vocab_size + extra_vocab))

    chars = {k: [] for k in range(vocab_size)}

    for i, c in enumerate(word):
        idx = char_to_id[c]

        chars[idx].append(i)

        encoding[i][idx] = 1

    return encoding, [x for x in chars.values() if len(x)], set(list(word))


def batchify_words(batch, vocab_size, using_embedding):
    """
    converts a list of words into a batch by padding them to a fixed length array
    :param batch: a list of words encoded using filter_and_encode function
    :param: size of vocabulary (26 in our case)
    :param: use_embedding: if True,
    """

    total_seq = len(batch)
    if using_embedding:

        max_len = max([len(x) for x in batch])
        final_batch = []

        for word in batch:
            if max_len != len(word):
   
                zero_vec = vocab_size*np.ones((max_len - word.shape[0]))
                word = np.concatenate((word, zero_vec), axis=0)
            final_batch.append(word)

        return np.array(final_batch)
    else:
        max_len = max([x.shape[0] for x in batch])
        final_batch = []

        for word in batch:

            if max_len != word.shape[0]:
                zero_vec = np.zeros((max_len - word.shape[0], vocab_size + extra_vocab))
                word = np.concatenate((word, zero_vec), axis=0)
            final_batch.append(word)

        return np.array(final_batch)


def encoded_to_string(encoded, target, missed, encoded_len, char_to_id, use_embedding):
    """
    convert an encoded input-output pair back into a string so that we can observe the input into the model
    encoded: array of dimensions padded_word_length x vocab_size
    target: 1 x vocab_size array with 1s at indices wherever character is present
    missed: 1 x vocav_size array with 1s at indices wherever a character which is NOT in the word, is present
    encoded_len: length of word. Needed to retrieve the original word from the padded word
    char_to_id: dict which maps characters to ids
    use_embedding: if character embeddings are used
    """

    id_to_char = {v:k for k, v in char_to_id.items()}

    if use_embedding:
        word = [id_to_char[x] if x < len(char_to_id) - 1 else '*' for x in list(encoded[:encoded_len])]
    else:
        word = [id_to_char[x] if x < len(char_to_id) - 1 else '*' for x in list(np.argmax(encoded[:encoded_len, :], axis=1))]

    word = ''.join(word)
    target = [id_to_char[x] for x in list(np.where(target != 0)[0])]
    missed = [id_to_char[x] for x in list(np.where(missed != 0)[0])]
    print("Word, target and missed characters:", word, target, missed)



class WordDataset(Dataset):
    def __init__(self, mode, config):
        self.mode = mode
        self.vocab_size = config['vocab_size']
        self.blank_vec = np.zeros((1, self.vocab_size + extra_vocab))
        self.blank_vec[0, self.vocab_size] = 1
        self.cur_epoch = 0
        self.total_epochs = config['epochs']

        self.char_to_id = {chr(97+x): x for x in range(self.vocab_size)}
        self.char_to_id['BLANK'] = self.vocab_size
        self.id_to_char = {v:k for k, v in self.char_to_id.items()}

        self.drop_uniform = config['drop_uniform']
        self.use_embedding = config['use_embedding']
        self.min_len = config['min_len']

        if mode == 'train':
            filename = config['dataset'] + "train_set_90.txt"
        else:
            filename = config['dataset'] + "test_set_10.txt"

        pkl_path = config['pickle'] + mode + '_input_dump.pkl'
        if os.path.exists(pkl_path):
            with open(pkl_path, 'rb') as f:
                self.final_encoded = pickle.load(f)
        else:
            corpus = []
            with open(filename, 'r') as f:
                corpus = f.readlines()

            self.final_encoded = []
            for i, word in enumerate(corpus):
                encoding, unique_pos, chars = filter_and_encode(word, self.vocab_size, self.min_len, self.char_to_id)
                if encoding is not None:
                    self.final_encoded.append((encoding, unique_pos, chars))

            with open(pkl_path, 'wb') as f:
                pickle.dump(self.final_encoded, f)

        print("Length of " + mode + " dataset:", len(self.final_encoded))

    def update_epoch(self, epoch):
        self.cur_epoch = epoch

    def __len__(self):
        return len(self.final_encoded)

    def __getitem__(self, idx):
        word, unique_pos, chars = self.final_encoded[idx]

        all_chars = list(self.char_to_id.keys())
        all_chars.remove('BLANK')
        all_chars = set(all_chars)

        drop_prob = 1/(1+np.exp(-self.cur_epoch/self.total_epochs))
        num_to_drop = np.random.binomial(len(unique_pos), drop_prob)
        if num_to_drop == 0:
            num_to_drop = 1

        if self.drop_uniform:
            to_drop = np.random.choice(len(unique_pos), num_to_drop, replace=False)
        else:
            prob = [1/len(x) for x in unique_pos]
            prob_norm = [x/sum(prob) for x in prob]
            to_drop = np.random.choice(len(unique_pos), num_to_drop, p=prob_norm, replace=False)

        drop_idx = []
        for char_group in to_drop:
            drop_idx += unique_pos[char_group]

        target = np.clip(np.sum(word[drop_idx], axis=0), 0, 1)
        assert(target[self.vocab_size] == 0)
        target = target[:-1]

        input_vec = np.copy(word)
        input_vec[drop_idx] = self.blank_vec

        if self.use_embedding:
            input_vec = np.argmax(input_vec, axis=1)

        not_present = np.array(sorted(list(all_chars - chars)))
        num_misses = np.random.randint(0, 10)
        miss_chars = np.random.choice(not_present, num_misses)
        miss_chars = list(set([self.char_to_id[x] for x in miss_chars]))

        miss_vec = np.zeros((self.vocab_size))
        miss_vec[miss_chars] = 1

        return input_vec, target, miss_vec


class WordDataLoader(DataLoader):
    def __init__(self, mode, config):
        self.dataset = WordDataset(mode, config)

        collate_fn = lambda batch: WordDataLoader.collate_fn(batch, config['vocab_size'], config['use_embedding'])
        super(WordDataLoader, self).__init__(self.dataset, batch_size=config['batch_size'], shuffle=True, num_workers=config['num_workers'], collate_fn=collate_fn)

    def update_dataset(self, epoch):
        self.dataset.update_epoch(epoch)

    @staticmethod
    def collate_fn(batch, vocab_size, use_embedding):
        lens = np.array([len(x[0]) for x in batch])
        inputs = batchify_words([x[0] for x in batch], vocab_size, use_embedding)
        labels = np.array([x[1] for x in batch])
        miss_chars = np.array([x[2] for x in batch])
        return inputs, labels, miss_chars, lens

In [61]:
"""
The main driver file responsible for training, testing and predicting
"""

import torch
import yaml
import matplotlib.pyplot as plt
import numpy as np
import os
import torch.nn as nn
from tqdm import tqdm


class dl_model():

	def __init__(self, mode):

		with open("/home/leon/Documents/new_trex/config.yaml", 'r') as stream:
			try:
				self.config = yaml.safe_load(stream)
			except yaml.YAMLError as exc:
				print(exc)
		self.mode = mode

		feature_dim = self.config['vocab_size']
		self.arch_name = '_'.join(
			[self.config['rnn'], str(self.config['num_layers']), str(self.config['hidden_dim']), str(feature_dim)])

		print("Architecture:", self.arch_name)

		self.config['models'] = os.path.join(self.config['models'], self.arch_name)
		self.config['plots'] = os.path.join(self.config['plots'], self.arch_name)


		if not os.path.exists(self.config['models']):
			os.mkdir(self.config['models'])
		if not os.path.exists(self.config['plots']):
			os.mkdir(self.config['plots'])
		if not os.path.exists(self.config['pickle']):
			os.mkdir(self.config['pickle'])

		self.cuda = (self.config['cuda'] and torch.cuda.is_available())

		if mode == 'train' or mode == 'test':

			self.plots_dir = self.config['plots']
	
			self.total_epochs = self.config['epochs']
			self.test_every = self.config['test_every_epoch']
			self.test_per = self.config['test_per_epoch']
			self.print_per = self.config['print_per_epoch']
			self.save_every = self.config['save_every']
			self.plot_every = self.config['plot_every']

			self.train_loader = WordDataLoader('train', self.config)
			self.test_loader = WordDataLoader('test', self.config)
	
			self.model = RNN(self.config)

			self.start_epoch = 1
			self.edit_dist = []
			self.train_losses, self.test_losses = [], []

		else:

			self.model = RNN(self.config)

		if self.cuda:
			self.model.cuda()

		if self.mode == 'train' and self.config['resume']:
			self.start_epoch, self.train_losses, self.test_losses = self.model.load_model(mode, self.model.rnn_name, self.model.num_layers, self.model.hidden_dim)
			self.start_epoch += 1

		elif self.mode == 'test' or mode == 'test_one':
			self.model.load_model(mode, self.config['rnn'], self.model.num_layers, self.model.hidden_dim, epoch=None)

		if self.config['use_embedding']:
			self.use_embedding = True
		else:
			self.use_embedding = False

	def train(self):

		self.model.train()


		print_range = list(np.linspace(0, len(self.train_loader), self.print_per + 2, dtype=np.uint32)[1:-1])
		if self.test_per == 0:
			test_range = []
		else:
			test_range = list(np.linspace(0, len(self.train_loader), self.test_per + 2, dtype=np.uint32)[1:-1])

		for epoch in range(self.start_epoch, self.total_epochs + 1):

			epoch_loss = 0.0

			pbar = tqdm(total=len(self.train_loader), desc='Epoch [%i/%i]' % (epoch, self.total_epochs), ncols=100)

			for i, (inputs, labels, miss_chars, input_lens) in enumerate(self.train_loader):
				if self.use_embedding:
					inputs = torch.from_numpy(inputs).long()
				else:
					inputs = torch.from_numpy(inputs).float()

				labels = torch.from_numpy(labels).float()
				miss_chars = torch.from_numpy(miss_chars).float()
				input_lens = torch.from_numpy(input_lens).long()

				if self.cuda:
					inputs = inputs.cuda()
					labels = labels.cuda()
					miss_chars = miss_chars.cuda()
					input_lens = input_lens.cuda()

				self.model.optimizer.zero_grad()

				outputs = self.model(inputs, input_lens, miss_chars)
				loss, miss_penalty = self.model.calculate_loss(outputs, labels, input_lens, miss_chars, self.cuda)
				loss.backward()

				self.model.optimizer.step()

				epoch_loss += loss.item()

				if i in print_range and epoch == 1:
					pbar.set_description('Epoch [%i/%i], Loss=%.4f' % (epoch, self.total_epochs, epoch_loss / i))
				elif i in print_range and epoch > 1:
					pbar.set_description('Epoch [%i/%i], Loss=%.4f, Avg.L=%.4f, MissL=%.4f' % (
						epoch, self.total_epochs, epoch_loss / i, np.mean(np.array([x[0] for x in self.train_losses])), miss_penalty))
				pbar.update()

				if i in test_range:
					self.test(epoch)
					self.model.train()


			if epoch % self.config['reset_after'] == 0:
				self.train_loader.update_dataset(epoch)

			self.train_losses.append((epoch_loss / len(self.train_loader), epoch))

			if epoch % self.save_every == 0:
				self.model.save_model(False, epoch, self.train_losses, self.test_losses,
										self.model.rnn_name, self.model.num_layers, self.model.hidden_dim)

			if epoch % 5 == 0 and epoch < self.test_every:
				self.test(epoch)
				self.model.train()
			elif epoch % self.test_every == 0:
				self.test(epoch)
				self.model.train()

			if epoch % self.plot_every == 0:
				self.plot_loss_acc(epoch)

	def test(self, epoch=None):

		self.model.eval()

		test_loss = 0


		with torch.no_grad():

			for inputs, labels, miss_chars, input_lens in tqdm(self.test_loader, desc='Testing', ncols=100):

				if self.use_embedding:
					inputs = torch.from_numpy(inputs).long()
				else:
					inputs = torch.from_numpy(inputs).float()

				labels = torch.from_numpy(labels).float()
				miss_chars = torch.from_numpy(miss_chars).float()
				input_lens= torch.from_numpy(input_lens).long()

				if self.cuda:
					inputs = inputs.cuda()
					labels = labels.cuda()
					miss_chars = miss_chars.cuda()
					input_lens = input_lens.cuda()

				self.model.optimizer.zero_grad()
	
				outputs = self.model(inputs, input_lens, miss_chars)
				loss, miss_penalty = self.model.calculate_loss(outputs, labels, input_lens, miss_chars, self.cuda)
				test_loss += loss.item()

	
		test_loss /= len(self.test_loader)

		print("Test Loss: %.7f, Miss Penalty: %.7f" % (test_loss, miss_penalty))


		self.test_losses.append((test_loss, epoch))

		if test_loss == min([x[0] for x in self.test_losses]) and self.mode == 'train':
			print("Best new model found!")
			self.model.save_model(True, epoch, self.train_losses, self.test_losses,
								  self.model.rnn_name, self.model.num_layers, self.model.hidden_dim)

		return test_loss

	def predict(self, string, misses):
		"""
		called during inference
		:param string: word with predicted characters and blanks at remaining places
		:param misses: list of characters which were predicted but game feedback indicated that they are not present
		:param char_to_id: mapping from characters to id
		"""
	
		char_to_id = {chr(97+x): x for x in range(26)}
		
		char_to_id['.'] = len(char_to_id)

		id_to_char = {v:k for k,v in char_to_id.items()}


		if self.use_embedding:
			encoded = np.zeros((len(string)))
			for i, c in enumerate(string):
				if c == '.':
					encoded[i] = len(id_to_char) - 1
				else:
					encoded[i] = char_to_id[c]

			inputs = np.array(encoded)[None, :]
			inputs = torch.from_numpy(inputs).long()

		else:

			encoded = np.zeros((len(string), len(char_to_id)))
			for i, c in enumerate(string):
				if c == '.':
					encoded[i][len(id_to_char) - 1] = 1
				else:
					encoded[i][char_to_id[c]] = 1

			inputs = np.array(encoded)[None, :, :]
			inputs = torch.from_numpy(inputs).float()

		
		miss_encoded = np.zeros((len(char_to_id) - 1))
		for c in misses:
			miss_encoded[char_to_id[c]] = 1
		miss_encoded = np.array(miss_encoded)[None, :]
		miss_encoded = torch.from_numpy(miss_encoded).float()

		input_lens = np.array([len(string)])
		input_lens= torch.from_numpy(input_lens).long()

		if self.cuda:
			inputs = inputs.cuda()
			miss_encoded = miss_encoded.cuda()
			input_lens = input_lens.cuda()

		output = self.model(inputs, input_lens, miss_encoded)
		probs = nn.functional.softmax(output, dim=1)
		output = output.detach().cpu().numpy()[0]
		probs = probs.detach().cpu().numpy()[0]
	
		sorted_predictions = np.argsort(output)[::-1]

		return [id_to_char[x] for x in sorted_predictions], probs

	def plot_loss_acc(self, epoch):
		"""
		take train/test loss and test accuracy input and plot it over time
		:param epoch: to track performance across epochs
		"""

		plt.clf()
		fig, ax1 = plt.subplots()

		ax1.set_xlabel('Epoch')
		ax1.set_ylabel('Loss')
		ax1.plot([x[1] for x in self.train_losses], [x[0] for x in self.train_losses], color='r', label='Train Loss')
		ax1.plot([x[1] for x in self.test_losses], [x[0] for x in self.test_losses], color='b', label='Test Loss')
		ax1.tick_params(axis='y')
		ax1.legend(loc='upper left')

		fig.tight_layout() 
		plt.grid(True)
		plt.legend()
		plt.title(self.arch_name)

		filename = os.path.join(self.plots_dir, 'plot_' + self.arch_name + '_' + str(epoch) + '.png')
		plt.savefig(filename)

In [63]:
import os
import collections
from tqdm import tqdm

os.environ["CUDA_VISIBLE_DEVICES"] = "3"

In [64]:
import collections
import re
import numpy as np

freq_table = {
    1: ['a', 'i'],2: ['a', 'o', 'e', 'i', 'u', 'm', 'b', 'h'],3: ['a', 'e', 'o', 'i', 'u', 'y', 'h', 'b', 'c', 'k'],4: ['a', 'e', 'o', 'i', 'u', 'y', 's', 'b', 'f'],
    5: ['s', 'e', 'a', 'o', 'i', 'u', 'y', 'h'],6: ['e', 'a', 'i', 'o', 'u', 's', 'y'],7: ['e', 'i', 'a', 'o', 'u', 's'],8: ['e', 'i', 'a', 'o', 'u'],
    9: ['e', 'i', 'a', 'o', 'u'],10: ['e', 'i', 'o', 'a', 'u'],11: ['e', 'i', 'o', 'a', 'd'],12: ['e', 'i', 'o', 'a', 'f'],13: ['i', 'e', 'o', 'a'],14: ['i', 'e', 'o'],15: ['i', 'e', 'a'],
    16: ['i', 'e', 'h'],17: ['i', 'e', 'r'],18: ['i', 'e', 'a'],19: ['i', 'e', 'a'],20: ['i', 'e']
}
#selected based on frequency of letters that occur for each word length in the train dataset

def build_n_gram(word_list, max_n):
    # create n-gram from word list
    n_grams = {}
    for n in range(1, max_n + 1):
        n_grams[n] = collections.defaultdict(int)
        for word in word_list:
            for i in range(len(word) - n + 1):
                n_grams[n][word[i:i + n]] += 1
    return n_grams


def build_n_gram_from_file(file_path, max_n=5):

    with open(file_path, 'r') as f:
        word_list = f.read().splitlines()
    return build_n_gram(word_list, max_n)


def get_n_gram_prob(n_grams, word, guessed_letters):

    not_guessed_letters = [i for i in range(26) if chr(97 + i) not in guessed_letters]

    next_letter_count = np.zeros(26, dtype=float)
    alphas = [0.05, 0.1, 0.2, 0.3, 0.5]

    gram_1_count = np.array([n_grams[1][chr(97 + j)] if j in not_guessed_letters else 0 for j in range(26)])
    next_letter_count += alphas[0] * (gram_1_count / sum(gram_1_count))

    for i in range(len(word) - 1):

        gram_2_count = np.zeros(26)
        if word[i] == '.' and word[i+1] != '.':
            gram_2_count = np.array([n_grams[2][chr(97 + j) + word[i+1]] if j in not_guessed_letters else 0 for j in range(26)])
        elif word[i] != '.' and word[i+1] == '.':
            gram_2_count = np.array([n_grams[2][word[i] + chr(97 + j)] if j in not_guessed_letters else 0 for j in range(26)])
        if sum(gram_2_count) != 0:
            next_letter_count += alphas[1] * (gram_2_count / sum(gram_2_count))

    for i in range(len(word) - 2):
        gram_3_count = np.zeros(26)
 
        if word[i] == '.' and word[i+1] != '.' and word[i+2] != '.':
            gram_3_count = np.array([n_grams[3][chr(97 + j) + word[i+1:i+3]] if j in not_guessed_letters else 0 for j in range(26)])
        elif word[i] != '.' and word[i+1] == '.' and word[i+2] != '.':
            gram_3_count = np.array([n_grams[3][word[i] + chr(97 + j) + word[i+2]] if j in not_guessed_letters else 0 for j in range(26)])
        elif word[i] != '.' and word[i+1] != '.' and word[i+2] == '.':
            gram_3_count = np.array([n_grams[3][word[i:i+2] + chr(97 + j)] if j in not_guessed_letters else 0 for j in range(26)])
        if sum(gram_3_count) != 0:
            next_letter_count += alphas[2] * (gram_3_count / sum(gram_3_count))

    for i in range(len(word) - 3):
        gram_4_count = np.zeros(26)

        if word[i] == '.' and word[i+1] != '.' and word[i+2] != '.' and word[i+3] != '.':
            gram_4_count = np.array([n_grams[4][chr(97 + j) + word[i+1:i+4]] if j in not_guessed_letters else 0 for j in range(26)])
        elif word[i] != '.' and word[i+1] == '.' and word[i+2] != '.' and word[i+3] != '.':
            gram_4_count = np.array([n_grams[4][word[i] + chr(97 + j) + word[i+2:i+4]] if j in not_guessed_letters else 0 for j in range(26)])
        elif word[i] != '.' and word[i+1] != '.' and word[i+2] == '.' and word[i+3] != '.':
            gram_4_count = np.array([n_grams[4][word[i:i+2] + chr(97 + j) + word[i+3]] if j in not_guessed_letters else 0 for j in range(26)])
        elif word[i] != '.' and word[i+1] != '.' and word[i+2] != '.' and word[i+3] == '.':
            gram_4_count = np.array([n_grams[4][word[i:i+3] + chr(97 + j)] if j in not_guessed_letters else 0 for j in range(26)])
        
        if sum(gram_4_count) != 0:
            next_letter_count += alphas[3] * (gram_4_count / sum(gram_4_count))

        gram_4_2_count = np.zeros(26)

        if word[i] == '.' and word[i+1] == '.' and word[i+2] != '.' and word[i+3] != '.':
            for j in range(26):
                for k in range(26):
                    if j in not_guessed_letters and k in not_guessed_letters:
                        gram_4_2_count[j] += n_grams[4][chr(97 + j) + chr(97 + k) + word[i+2:i+4]]
                        gram_4_2_count[k] += n_grams[4][chr(97 + j) + chr(97 + k) + word[i+2:i+4]]
        elif word[i] != '.' and word[i+1] == '.' and word[i+2] == '.' and word[i+3] != '.':
            for j in range(26):
                for k in range(26):
                    if j in not_guessed_letters and k in not_guessed_letters:
                        gram_4_2_count[j] += n_grams[4][word[i] + chr(97 + j) + chr(97 + k) + word[i+3]]
                        gram_4_2_count[k] += n_grams[4][word[i] + chr(97 + j) + chr(97 + k) + word[i+3]]
        elif word[i] != '.' and word[i+1] != '.' and word[i+2] == '.' and word[i+3] == '.':
            for j in range(26):
                for k in range(26):
                    if j in not_guessed_letters and k in not_guessed_letters:
                        gram_4_2_count[j] += n_grams[4][word[i:i+2] + chr(97 + j) + chr(97 + k)]
                        gram_4_2_count[k] += n_grams[4][word[i:i+2] + chr(97 + j) + chr(97 + k)]
        elif word[i] == '.' and word[i+1] != '.' and word[i+2] == '.' and word[i+3] != '.':
            for j in range(26):
                for k in range(26):
                    if j in not_guessed_letters and k in not_guessed_letters:
                        gram_4_2_count[j] += n_grams[4][chr(97 + j) + word[i+1] + chr(97 + k) + word[i+3]]
                        gram_4_2_count[k] += n_grams[4][chr(97 + j) + word[i+1] + chr(97 + k) + word[i+3]]
        elif word[i] != '.' and word[i+1] == '.' and word[i+2] != '.' and word[i+3] == '.':
            for j in range(26):
                for k in range(26):
                    if j in not_guessed_letters and k in not_guessed_letters:
                        gram_4_2_count[j] += n_grams[4][word[i] + chr(97 + j) + word[i+2] + chr(97 + k)]
                        gram_4_2_count[k] += n_grams[4][word[i] + chr(97 + j) + word[i+2] + chr(97 + k)]
        elif word[i] == '.' and word[i+1] != '.' and word[i+2] != '.' and word[i+3] == '.':
            for j in range(26):
                for k in range(26):
                    if j in not_guessed_letters and k in not_guessed_letters:
                        gram_4_2_count[j] += n_grams[4][chr(97 + j) + word[i+1:i+3] + chr(97 + k)]
                        gram_4_2_count[k] += n_grams[4][chr(97 + j) + word[i+1:i+3] + chr(97 + k)]
        
        if sum(gram_4_2_count) != 0:
            next_letter_count += (alphas[3] / 2) * (gram_4_2_count / sum(gram_4_2_count))

    for i in range(len(word) - 4):
        dot_count = sum([1 for c in word[i:i+5] if c == '.'])
        
        if dot_count == 1:
            gram_5_count = np.zeros(26)

            if word[i] == '.' and word[i+1] != '.' and word[i+2] != '.' and word[i+3] != '.' and word[i+4] != '.':
                gram_5_count = np.array([n_grams[5][chr(97 + j) + word[i+1:i+5]] if j in not_guessed_letters else 0 for j in range(26)])
            elif word[i] != '.' and word[i+1] == '.' and word[i+2] != '.' and word[i+3] != '.' and word[i+4] != '.':
                gram_5_count = np.array([n_grams[5][word[i] + chr(97 + j) + word[i+2:i+5]] if j in not_guessed_letters else 0 for j in range(26)])
            elif word[i] != '.' and word[i+1] != '.' and word[i+2] == '.' and word[i+3] != '.' and word[i+4] != '.':
                gram_5_count = np.array([n_grams[5][word[i:i+2] + chr(97 + j) + word[i+3:i+5]] if j in not_guessed_letters else 0 for j in range(26)])
            elif word[i] != '.' and word[i+1] != '.' and word[i+2] != '.' and word[i+3] == '.' and word[i+4] != '.':
                gram_5_count = np.array([n_grams[5][word[i:i+3] + chr(97 + j) + word[i+4]] if j in not_guessed_letters else 0 for j in range(26)])
            elif word[i] != '.' and word[i+1] != '.' and word[i+2] != '.' and word[i+3] != '.' and word[i+4] == '.':
                gram_5_count = np.array([n_grams[5][word[i:i+4] + chr(97 + j)] if j in not_guessed_letters else 0 for j in range(26)])
            if sum(gram_5_count) != 0:
                next_letter_count += alphas[4] * (gram_5_count / sum(gram_5_count))

        elif dot_count == 2:
            gram_5_2_count = np.zeros(26)
            if word[i] == '.' and word[i+1] == '.' and word[i+2] != '.' and word[i+3] != '.' and word[i+4] != '.':
                for j in range(26):
                    for k in range(26):
                        if j in not_guessed_letters and k in not_guessed_letters:
                            gram_5_2_count[j] += n_grams[5][chr(97 + j) + chr(97 + k) + word[i+2:i+5]]
                            gram_5_2_count[k] += n_grams[5][chr(97 + j) + chr(97 + k) + word[i+2:i+5]]
            elif word[i] != '.' and word[i+1] == '.' and word[i+2] == '.' and word[i+3] != '.' and word[i+4] != '.':
                for j in range(26):
                    for k in range(26):
                        if j in not_guessed_letters and k in not_guessed_letters:
                            gram_5_2_count[j] += n_grams[5][word[i] + chr(97 + j) + chr(97 + k) + word[i+3:i+5]]
                            gram_5_2_count[k] += n_grams[5][word[i] + chr(97 + j) + chr(97 + k) + word[i+3:i+5]]
            elif word[i] != '.' and word[i+1] != '.' and word[i+2] == '.' and word[i+3] == '.' and word[i+4] != '.':
                for j in range(26):
                    for k in range(26):
                        if j in not_guessed_letters and k in not_guessed_letters:
                            gram_5_2_count[j] += n_grams[5][word[i:i+2] + chr(97 + j) + chr(97 + k) + word[i+4]]
                            gram_5_2_count[k] += n_grams[5][word[i:i+2] + chr(97 + j) + chr(97 + k) + word[i+4]]
            elif word[i] != '.' and word[i+1] != '.' and word[i+2] != '.' and word[i+3] == '.' and word[i+4] == '.':
                for j in range(26):
                    for k in range(26):
                        if j in not_guessed_letters and k in not_guessed_letters:
                            gram_5_2_count[j] += n_grams[5][word[i:i+3] + chr(97 + j) + chr(97 + k)]
                            gram_5_2_count[k] += n_grams[5][word[i:i+3] + chr(97 + j) + chr(97 + k)]
            elif word[i] == '.' and word[i+1] != '.' and word[i+2] == '.' and word[i+3] != '.' and word[i+4] != '.':
                for j in range(26):
                    for k in range(26):
                        if j in not_guessed_letters and k in not_guessed_letters:
                            gram_5_2_count[j] += n_grams[5][chr(97 + j) + word[i+1] + chr(97 + k) + word[i+3:i+5]]
                            gram_5_2_count[k] += n_grams[5][chr(97 + j) + word[i+1] + chr(97 + k) + word[i+3:i+5]]
            elif word[i] == '.' and word[i+1] != '.' and word[i+2] != '.' and word[i+3] == '.' and word[i+4] != '.':
                for j in range(26):
                    for k in range(26):
                        if j in not_guessed_letters and k in not_guessed_letters:
                            gram_5_2_count[j] += n_grams[5][chr(97 + j) + word[i+1:i+3] + chr(97 + k) + word[i+4]]
                            gram_5_2_count[k] += n_grams[5][chr(97 + j) + word[i+1:i+3] + chr(97 + k) + word[i+4]]
            elif word[i] == '.' and word[i+1] != '.' and word[i+2] != '.' and word[i+3] != '.' and word[i+4] == '.':
                for j in range(26):
                    for k in range(26):
                        if j in not_guessed_letters and k in not_guessed_letters:
                            gram_5_2_count[j] += n_grams[5][chr(97 + j) + word[i+1:i+4] + chr(97 + k)]
                            gram_5_2_count[k] += n_grams[5][chr(97 + j) + word[i+1:i+4] + chr(97 + k)]
            elif word[i] != '.' and word[i+1] == '.' and word[i+2] != '.' and word[i+3] == '.' and word[i+4] != '.':
                for j in range(26):
                    for k in range(26):
                        if j in not_guessed_letters and k in not_guessed_letters:
                            gram_5_2_count[j] += n_grams[5][word[i] + chr(97 + j) + word[i+2] + chr(97 + k) + word[i+4]]
                            gram_5_2_count[k] += n_grams[5][word[i] + chr(97 + j) + word[i+2] + chr(97 + k) + word[i+4]]
            elif word[i] != '.' and word[i+1] == '.' and word[i+2] != '.' and word[i+3] != '.' and word[i+4] == '.':
                for j in range(26):
                    for k in range(26):
                        if j in not_guessed_letters and k in not_guessed_letters:
                            gram_5_2_count[j] += n_grams[5][word[i] + chr(97 + j) + word[i+2:i+4] + chr(97 + k)]
                            gram_5_2_count[k] += n_grams[5][word[i] + chr(97 + j) + word[i+2:i+4] + chr(97 + k)]
            elif word[i] != '.' and word[i+1] != '.' and word[i+2] == '.' and word[i+3] != '.' and word[i+4] == '.':
                for j in range(26):
                    for k in range(26):
                        if j in not_guessed_letters and k in not_guessed_letters:
                            gram_5_2_count[j] += n_grams[5][word[i:i+2] + chr(97 + j) + word[i+3] + chr(97 + k)]
                            gram_5_2_count[k] += n_grams[5][word[i:i+2] + chr(97 + j) + word[i+3] + chr(97 + k)]
            
            if sum(gram_5_2_count) != 0:
                next_letter_count += (alphas[4] / 2) * (gram_5_2_count / sum(gram_5_2_count))

        next_letter_count /= sum(next_letter_count)

    return next_letter_count
    


In [65]:
import json
import requests
import random
import string
import secrets
import time
import re
import collections

try:
    from urllib.parse import parse_qs, urlencode, urlparse
except ImportError:
    from urlparse import parse_qs, urlparse
    from urllib import urlencode

from requests.packages.urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

In [66]:
class HangmanAPI(object):
    def __init__(self, access_token=None, session=None, timeout=None):
        self.hangman_url = self.determine_hangman_url()
        self.access_token = access_token
        self.session = session or requests.Session()
        self.timeout = timeout
        self.guessed_letters = []
        self.misses = []
        
        full_dictionary_location = "words_250000_train.txt"
        self.full_dictionary = self.build_dictionary(full_dictionary_location)        
        self.full_dictionary_common_letter_sorted = collections.Counter("".join(self.full_dictionary)).most_common()
        self.model = dl_model('test_one')
        self.current_dictionary = []
        self.n_grams = build_n_gram_from_file(full_dictionary_location)

    @staticmethod
    def determine_hangman_url():
        links = ['https://trexsim.com', 'https://sg.trexsim.com']

        data = {link: 0 for link in links}

        for link in links:

            requests.get(link)

            for i in range(10):
                s = time.time()
                requests.get(link)
                data[link] = time.time() - s

        link = sorted(data.items(), key=lambda x: x[1])[0][0]
        link += '/trexsim/hangman'
        return link

    def guess(self, word): # word input example: "_ p p _ e "
        ###############################################
        # Replace with your own "guess" function here #
        ###############################################

        # clean the word so that we strip away the space characters
        # replace "_" with "." as "." indicates any character in regular expressions
        self.misses = []
        for char in self.guessed_letters:
            # Check if the character is not in the word
            if char not in word and char != '_':
                # Append to missed_letters list
                self.misses.append(char)

        clean_word = word[::2].replace("_",".")
        len_word = len(clean_word)
        len_right_letters = len(clean_word) - clean_word.count('.')
    
        if len_right_letters == 0 and len_word in freq_table:
            order = freq_table[len_word]
            for letter in order:
                if letter not in self.guessed_letters:
                    # print("first guess: ", letter)
                    return letter 
        
        ngram_probab = get_n_gram_prob(self.n_grams, clean_word, self.guessed_letters)

        best_chars, nn_probab = self.model.predict(clean_word, self.misses)

        nn_probab = [p if chr(i+97) not in self.misses and chr(i+97) not in clean_word else 0.0 for i,p in enumerate(nn_probab)]
        nn_probab = [p/sum(nn_probab) for p in nn_probab]

        final_probab = nn_probab + ngram_probab
        guess_letter = chr(final_probab.argmax() + 97)

        return guess_letter

    ##########################################################
    # You'll likely not need to modify any of the code below #
    ##########################################################
    
    def build_dictionary(self, dictionary_file_location):
        text_file = open(dictionary_file_location,"r")
        full_dictionary = text_file.read().splitlines()
        text_file.close()
        return full_dictionary
                
    def start_game(self, practice=True, verbose=True):
        # reset guessed letters to empty set and current plausible dictionary to the full dictionary
        self.guessed_letters = []
        self.misses = []
        self.current_dictionary = self.full_dictionary
                         
        response = self.request("/new_game", {"practice":practice})
        if response.get('status')=="approved":
            game_id = response.get('game_id')
            word = response.get('word')
            tries_remains = response.get('tries_remains')
            if verbose:
                print("Successfully start a new game! Game ID: {0}. # of tries remaining: {1}. Word: {2}.".format(game_id, tries_remains, word))
            while tries_remains>0:
                # get guessed letter from user code
                guess_letter = self.guess(word)
                    
                # append guessed letter to guessed letters field in hangman object
                self.guessed_letters.append(guess_letter)
                if verbose:
                    print("Guessing letter: {0}".format(guess_letter))
                    
                try:    
                    res = self.request("/guess_letter", {"request":"guess_letter", "game_id":game_id, "letter":guess_letter})
                except HangmanAPIError:
                    print('HangmanAPIError exception caught on request.')
                    continue
                except Exception as e:
                    print('Other exception caught on request.')
                    raise e
               
                if verbose:
                    print("Sever response: {0}".format(res))
                status = res.get('status')
                tries_remains = res.get('tries_remains')
                if status=="success":
                    if verbose:
                        print("Successfully finished game: {0}".format(game_id))
                    return True
                elif status=="failed":
                    reason = res.get('reason', '# of tries exceeded!')
                    if verbose:
                        print("Failed game: {0}. Because of: {1}".format(game_id, reason))
                    return False
                elif status=="ongoing":
                    word = res.get('word')
        else:
            if verbose:
                print("Failed to start a new game")
        return status=="success"
        
    def my_status(self):
        return self.request("/my_status", {})
    
    def request(
            self, path, args=None, post_args=None, method=None):
        if args is None:
            args = dict()
        if post_args is not None:
            method = "POST"

        # Add `access_token` to post_args or args if it has not already been
        # included.
        if self.access_token:
            # If post_args exists, we assume that args either does not exists
            # or it does not need `access_token`.
            if post_args and "access_token" not in post_args:
                post_args["access_token"] = self.access_token
            elif "access_token" not in args:
                args["access_token"] = self.access_token

        time.sleep(0.2)

        num_retry, time_sleep = 50, 2
        for it in range(num_retry):
            try:
                response = self.session.request(
                    method or "GET",
                    self.hangman_url + path,
                    timeout=self.timeout,
                    params=args,
                    data=post_args,
                    verify=False
                )
                break
            except requests.HTTPError as e:
                response = json.loads(e.read())
                raise HangmanAPIError(response)
            except requests.exceptions.SSLError as e:
                if it + 1 == num_retry:
                    raise
                time.sleep(time_sleep)

        headers = response.headers
        if 'json' in headers['content-type']:
            result = response.json()
        elif "access_token" in parse_qs(response.text):
            query_str = parse_qs(response.text)
            if "access_token" in query_str:
                result = {"access_token": query_str["access_token"][0]}
                if "expires" in query_str:
                    result["expires"] = query_str["expires"][0]
            else:
                raise HangmanAPIError(response.json())
        else:
            raise HangmanAPIError('Maintype was not text, or querystring')

        if result and isinstance(result, dict) and result.get("error"):
            raise HangmanAPIError(result)
        return result
    
class HangmanAPIError(Exception):
    def __init__(self, result):
        self.result = result
        self.code = None
        try:
            self.type = result["error_code"]
        except (KeyError, TypeError):
            self.type = ""

        try:
            self.message = result["error_description"]
        except (KeyError, TypeError):
            try:
                self.message = result["error"]["message"]
                self.code = result["error"].get("code")
                if not self.type:
                    self.type = result["error"].get("type", "")
            except (KeyError, TypeError):
                try:
                    self.message = result["error_msg"]
                except (KeyError, TypeError):
                    self.message = result

        Exception.__init__(self, self.message)

In [67]:
api = HangmanAPI(access_token="77554fdf3f995010f852fdde021f1d", timeout=2000)

Architecture: GRU_4_512_26
models/GRU_4_512_26/best_GRU_4_512.pth
Loaded pretrained model from: models/GRU_4_512_26/best_GRU_4_512.pth


In [89]:
for i in range(1000):
    print('Playing ', i, ' th game')
    # Uncomment the following line to execute your final runs. Do not do this until you are satisfied with your submission
    api.start_game(practice=0,verbose=False)
    
    # DO NOT REMOVE as otherwise the server may lock you out for too high frequency of requests
    time.sleep(0.5)

Playing  0  th game
Playing  1  th game
Playing  2  th game
Playing  3  th game
Playing  4  th game
Playing  5  th game
Playing  6  th game
Playing  7  th game
Playing  8  th game
Playing  9  th game
Playing  10  th game
Playing  11  th game
Playing  12  th game
Playing  13  th game
Playing  14  th game
Playing  15  th game
Playing  16  th game
Playing  17  th game
Playing  18  th game
Playing  19  th game
Playing  20  th game
Playing  21  th game
Playing  22  th game
Playing  23  th game
Playing  24  th game
Playing  25  th game
Playing  26  th game
Playing  27  th game
Playing  28  th game
Playing  29  th game
Playing  30  th game
Playing  31  th game
Playing  32  th game
Playing  33  th game
Playing  34  th game
Playing  35  th game
Playing  36  th game
Playing  37  th game
Playing  38  th game
Playing  39  th game
Playing  40  th game
Playing  41  th game
Playing  42  th game
Playing  43  th game
Playing  44  th game
Playing  45  th game
Playing  46  th game
Playing  47  th game
Pl

HangmanAPIError: {'error': 'You have reached 1000 of games', 'status': 'denied'}

In [90]:
[total_practice_runs,total_recorded_runs,total_recorded_successes,total_practice_successes] = api.my_status() # Get my game stats: (# of tries, # of wins)
success_rate = total_recorded_successes/total_recorded_runs
print('overall success rate = %.3f' % success_rate)

overall success rate = 0.619
