In [None]:
import json, pathlib, random, time
from collections import defaultdict
import numpy as np
import pandas as pd
import multiprocessing as mp


In [None]:
with open('lists.json') as f:
    j = json.load(f)

target_list = j['target']
guess_list = j["guess"]


In [None]:
def char_freq(lst):
    hist = defaultdict(int)
    for word in lst:
        for char in word:
            hist[char] += 1
    mx = max(hist.values())
    for char in hist:
        hist[char] /= mx
    return hist

def print_char_freq(cf):
    for char in sorted(list(cf.keys())):
        print(f'{char}: {cf[char]}')
        
def freq_score(word, cf):
    return sum(cf[x] for x in word) / len(word) 

def uniq_score(word):
    return (len(word) - len(set(word))) / (len(word) - 2)

In [None]:

cf = char_freq(target_list)
#print_char_freq(cf)

In [None]:
print(random.choice(guess_list))
print(random.choice(target_list))

In [None]:
dfg = pd.DataFrame([[w, freq_score(w, cf), uniq_score(w), 1] for w in guess_list], columns=['word', 'freq_score', 'uniq_score', 'is_guess_word'])
dft = pd.DataFrame([[w, freq_score(w, cf), uniq_score(w), 0] for w in target_list], columns=['word', 'freq_score', 'uniq_score', 'is_guess_word'])
df = dfg.append(dft)
df.set_index('word', inplace=True)

In [None]:
df.iloc[2801]

In [None]:


def word_to_action(word, guesses, history):
    return dfword_to_action((word, df.loc[word]), guesses, history)
    
def dfword_to_action(dfword, guesses, history):
    #the action is going to be a word that we will submit next
    #for the purposes of feeding into the model, we will represent the action word as:
    #  how many of the entries in the hint history this word conforms to
    #  how many untried letters it gives us
    #  the number of uniq letters in the word
    #  the frequency of the letters in the word
    #  whether or not the word is in the guess list (as opposed to the target list)
    word = dfword[0]
    dfword = dfword[1]
    if guesses:
        conforms_to_history = sum([int(validate_against_hint(word,g,history[i])) for i,g in enumerate(guesses)]) / len(guesses)
    else: # we haven't made any guess yet, so this must conform
        conforms_to_history = 1.0
    num_untried_letters = len(set(word) - set(''.join(guesses))) / 5 #normalise to 1
    return np.array([conforms_to_history, num_untried_letters, dfword['freq_score'], dfword['uniq_score'], dfword['is_guess_word']])
    

In [None]:
df_global = None

def construct_actions_global(arg): #guesses, history, start_idx, end_idx):
    #global df_global
    guesses, history, start_idx, end_idx = arg
    #print(guesses, history, start_idx, end_idx)
    return np.array([dfword_to_action(dfword, guesses, history) for dfword in df.iloc[start_idx:end_idx].iterrows()])
    
class ActionSpace:
    def __init__(self, n):
        self.n = n
    
    
class Env:
    def __init__(self, df, target_word=None):
        self.df = df
        if target_word:
            self.target = target_word
        else:
            self.target = df[df['is_guess_word'] == 0.0].sample().iloc[0].name
            
        self.num_letters = len(self.target)
        self.num_guesses = 6
        
        self.num_processes = mp.cpu_count() - 1
        self.action_space = ActionSpace(len(self.df))
        self.reset()
        
    def submit_guess(self, guess):
        wrongplace = [0] * len(self.target)
        hints = np.zeros(len(self.target))
        rightplace = [guess[n] == chrt for n,chrt in enumerate(self.target)]
        
        for n,chrt in enumerate(self.target):
            if rightplace[n] == 1: continue #this character has already been scored, skip it
            for m,chrg in enumerate(guess):
                if n == m: continue # we've already checked rightplace matches above
                if chrt != chrg: continue
                if wrongplace[m] == 1: continue
                if rightplace[m] == 1: continue
                
                wrongplace[m] = 1
                break

        for i in range(len(self.target)):
            hints[i] = 2 if rightplace[i] == 1 else wrongplace[i]
        
        return hints
    
    def reset(self):
        self.history = np.array([[]])
        self.guesses = []
        
    def construct_actions(self):
        return np.array([dfword_to_action(dfword, self.guesses, self.history) for dfword in self.df.iterrows()])
    
    def construct_actions_mp(self):
        #global df_global
        
        grp_lst_args = []
        grp_guesses = [self.guesses] * self.num_processes
        grp_history = [self.history] * self.num_processes
        
        #df_global = self.df
        chunk_size = int(len(self.df) / self.num_processes) + 1
        start_offsets = list(range(0, len(self.df), chunk_size))
        end_offsets = start_offsets[1:] + [len(self.df)]
        grp_lst_args = list(zip(grp_guesses, grp_history, start_offsets, end_offsets))
        
        #print(grp_lst_args)
        self.pool = mp.Pool(processes=self.num_processes)
        results = self.pool.map(construct_actions_global, grp_lst_args)
        self.pool.close()
        self.pool.join()
        return np.concatenate(results)
    
    def construct_state(self):
        #print(history)
        #so the state is going to be:
            #  The number of green locations we know
            #  The number of other letters we know to be in the word
            #  The sequence number of the guess (1st guess, 2nd guess etc.)

        #the number of locations which were green at some point in the history
        num_green_locs = np.count_nonzero(self.history.max(axis=0) == 2)

        green_chars = [self.guesses[x][y] for x,y in np.argwhere(self.history == 2) ]
        orange_chars = [self.guesses[x][y] for x,y in np.argwhere(self.history == 1) ]
        num_other_letters = len(set(orange_chars) - set(green_chars))

        sequence_number = self.history.shape[0]

        return np.array([num_green_locs, num_other_letters, sequence_number]) / 5

    def step_by_index(self, guess_idx):
        return self.step(self.df.iloc[guess_idx].name)
    
    
    def step(self, guess): #returns state, reward, done, actions
        #print(actions)
        hints = e.submit_guess(guess)

        print(f'======={guess} => {hints}========')
        print(list(zip(self.guesses,self.history)))
        if self.history.size == 0:
            self.history = np.expand_dims(hints,0)
        else:
            self.history = np.row_stack([self.history, hints])
        self.guesses.append(guess)
        if hints.sum() == self.num_letters * 2 or len(self.guesses) == self.num_guesses:
            reward = hints.sum()
            done = True
        else:
            reward = -1
            done = False
        state = self.construct_state() 
        actions = self.construct_actions_mp()
        return state, reward, done, actions

    
def hint_to_hinty(hint):
    #hint takes form [0,1,2,1,0]
    #hinty takes form {2:[2], 1:[1,3], 0:[0,4]}
    hinty = {}
    for n in [0,1,2]:
        hinty[n] = [i for i, x in enumerate(hint) if x == n]
    #print(f'hint_to_hinty() {hint}, {hinty}')
    return hinty
    
def validate_against_hint(word, guess, hint):
    return validate_against_hinty(word, guess, hint_to_hinty(hint))

def validate_against_hinty(word, guess, hinty):
    #hinty takes form {2:[idx,..], 1:[idx,..], 0:[idx,..]}
    for idx in hinty[2]: # check the fixed letters first
        if word[idx] != guess[idx]:
            return False
    for idx in hinty[0]:
        #get the number of times char appears in target word (minus the times it appears in the correct location)
        indices = [i for i,x in enumerate(word) if x == guess[idx] and i not in hinty[2]]
        #get number of times char appears in guess word in the wrong location
        indices_g = [n for n,x in enumerate(guess) if x == guess[idx] and n in hinty[1]]
        #we already know that there is one not-exist hint for this char, so
        #if there are more fewer wrong location hints for this letter than there are actual occurrences of the letter
        #then the hint does not validate against this word
        if len(indices) > len(indices_g):
            return False
    for idx in hinty[1]:
        if word[idx] == guess[idx]:
            return False
        #get all the indices of the character in the target word
        indices = [i for i,x in enumerate(word) if x == guess[idx] and i not in hinty[2]]
        #remove all the indices where there is already a fixed position hint
        
        #now count all the occurences of the char in guess where the location is wrong
        indices_g = [i for i,x in enumerate(guess) if x == guess[idx] and i in hinty[1]]
        #if there are more wrong loc hints for this char than there are actual occurrences, then it must be bogus
        if len(indices) < len(indices_g):
            return False
    return True            
    

In [None]:
e = Env(df)
e.reset()
st = time.time()
rmp = e.construct_actions_mp()
print(time.time() - st)
e.reset()
st = time.time()
r = e.construct_actions()
print(time.time() - st)

print(r.__class__)
print(r.shape)

print(rmp.__class__)
print(rmp.shape)

print((r == rmp).all())


In [None]:
e_simple = Env(target_list, target_word='abcde')
tests_simple = {'abcde': [2,2,2,2,2],
         'acbde': [2,1,1,2,2],
         'azcde': [2,0,2,2,2],
         'aacde': [2,0,2,2,2],
         'zacde': [0,1,2,2,2],
         'zzdzz': [0,0,1,0,0],
         'zzddz': [0,0,0,2,0],
         'zdddz': [0,0,0,2,0],
         'ddddd': [0,0,0,2,0],
         'zzzdd': [0,0,0,2,0],
         'zzdez': [0,0,1,1,0]}

e_repeat = Env(target_list, target_word='abcae')
tests_repeat = {'abcde': [2,2,2,0,2],
         'acbde': [2,1,1,0,2],
         'azcde': [2,0,2,0,2],
         'aacde': [2,1,2,0,2],
         'zacde': [0,1,2,0,2],
         'zzdzz': [0,0,0,0,0],
         'zzddz': [0,0,0,0,0],
         'zdddz': [0,0,0,0,0],
         'ddddd': [0,0,0,0,0],
         'zzzdd': [0,0,0,0,0],
         'zzdez': [0,0,0,1,0],
         'aaaaa': [2,0,0,2,0],
         'aaaza': [2,1,0,0,0],
         'zaazz': [0,1,1,0,0],
         'zaaza': [0,1,1,0,0]}

for e,tests in [(e_simple, tests_simple),(e_repeat, tests_repeat)]:
    for guess,expected in tests.items():
        #guess = random.choice(guess_list + target_list)
        actual = e.submit_guess(guess)
        hinty = hint_to_hinty(expected)
        hinty_valid = validate_against_hinty(e.target, guess, hinty)
        print(e.target, guess, actual, expected, expected == actual, hinty_valid)

In [None]:
def random_guess(guess_list, target_list):
    guess_idx = random.randint(0, len(guess_list) + len(target_list))
    is_guess = guess_idx < len(guess_list)
    if is_guess:
        word = guess_list[guess_idx]
    else:
        word = target_list[guess_idx - len(guess_list)]
    return word, is_guess

In [None]:
#'beast'
e = Env(df, target_word='beast')
e.step('treat')
#e.guesses = ['treat']
#e.history = np.array([[0.0, 0.0, 1.0, 1.0, 2.0]])
#Env(target_list, target_word='beast').submit_guess('treat')
print(e.guesses, e.history)
actual = e.construct_state()
expected = [0.2, 0.4, 0.2]
print(expected, actual, expected == actual)

actual = word_to_action('feast', ['treat'], np.array([[0.0, 0.0, 1.0, 1.0, 2.0]]))
expected = [1.0, 0.4, 0.62287105, 0.0, 0.0]
print(expected, actual, expected == actual)


In [None]:
num_guesses = 6
e = Env(df)

print(e.target)
num_letters = len(e.target)
history = np.array([[]])
guesses = []
rewards = []
for i in range(num_guesses):
    #guess, is_guess_list = random_guess(guess_list, target_list)
    actions = e.construct_actions_mp()
    state = e.construct_state()
    #here feed it into a model to choose the word
    #guess, value = np.argmax(model(state)) # but do this epsilon greedy
    
    #print(actions)
    hints = e.submit_guess(guess)
    
    print(f'======={guess}========')
    print(list(zip(guesses,history)))
    if history.size == 0:
        history = np.expand_dims(hints,0)
    else:
        history = np.row_stack([history, hints])
    guesses.append(guess)
    if hints.sum() == num_letters * 2 or i == num_guesses - 1:
        reward = hints.sum()
        done = True
    else:
        reward = -1
        done = False
    
    

    
#so the state is going to be:
#  The number of green locations we know
#  The number of other letters we know to be in the word
#  The sequence number of the guess (1st guess, 2nd guess etc.)

#the action is going to be a word that we will submit next
#for the purposes of feeding into the model, we will represent the action word as:
#  whether or not it conforms to the hint history
#  how many new letters it gives us
#  the number of uniq letters in the word
#  the frequency of the letters in the word

#the reward is going to be:
#  -1 on all states except the last one
#  on the last state (which can either be after guess 6 or on guessing the correct word):
#    the sum of the last hint (ie. 2 for a correct letter/position combo, 1 for a letter in the wrong place)

In [None]:
#https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html

import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T


# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

plt.ion()

# if gpu is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
Transition = namedtuple('Transition',
                        ('state', 'action', 'reward'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([],maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

In [None]:
class DQN(nn.Module):

    def __init__(self, inputs):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(inputs, 20)
        self.fc2 = nn.Linear(20, 16)
        self.fc3 = nn.Linear(16, 20)
        self.head = nn.Linear(20, 1)

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        x = x.to(device)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        return self.head(x)

In [None]:
BATCH_SIZE = 128
GAMMA = 0.999
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
TARGET_UPDATE = 10


# Get number of actions from gym action space
#n_actions = env.action_space.n
n_action_features = 5
n_state_features = 3
n_input_features = n_action_features + n_state_features

policy_net = DQN(n_input_features).to(device)
target_net = DQN(n_input_features).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.RMSprop(policy_net.parameters())
memory = ReplayMemory(10000)


steps_done = 0


def select_action(state, actions):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if True or sample > eps_threshold:
        with torch.no_grad():
            #now combine the state (shape 3,) and action (shape 5, n) into one input array (shape 8,n)
            #first expand the state so that it is shape 3,1
            #then repeat it to 3,n
            states = np.repeat(np.expand_dims(state, 0), actions.shape[0], axis=0)
            print(f'states shape {states.shape} actions shape {actions.shape}')
            #then concatenate to 8,n
            state_actions = np.concatenate((states, actions), axis=1)
            # policy_net(state_action) will return a single value estimate for each state/action row
            # so, probably shape (1,n)
            # Then return the index which has the max value
            
            estimate = policy_net(torch.tensor(state_actions, device=device, dtype=torch.float))
            #print(f'ESTIMATE>>>{estimate.__class__} {estimate.shape} {estimate} {estimate.max(0).indices.item()}<<<')
            return estimate.max(0).indices.item()
    else:
        randindex = random.randrange(len(actions))
        print(f'returning random index {randindex}')
        return randindex #torch.tensor([[randindex]], device=device, dtype=torch.long)


episode_durations = []


def plot_durations():
    plt.figure(2)
    plt.clf()
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # Take 100 episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated
    if is_ipython:
        display.clear_output(wait=True)
        display.display(plt.gcf())

In [None]:
def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). This converts batch-array of Transitions
    # to Transition of batch-arrays.
    batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state would've been the one after which simulation ended)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. These are the actions which would've been taken
    # for each batch state according to policy_net
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1)[0].
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    for param in policy_net.parameters():
        param.grad.data.clamp_(-1, 1)
    optimizer.step()

In [None]:
#https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html
env = Env(df)
num_episodes = 50
print(env.target)
starting_actions = env.construct_actions()
starting_state = env.construct_state()
for i_episode in range(num_episodes):
    # Initialize the environment and state

    state = starting_state
    actions = starting_actions
    for t in count():
        # Select and perform an action
        #print(state, actions)
        action_idx = select_action(state, actions)
        print(f'action {action_idx}')
        next_state, reward, done, next_actions = env.step_by_index(action_idx)
        print(f'reward {reward}')
        reward = torch.tensor([reward], device=device)

        # Store the transition in memory
        memory.push(state, actions[action_idx], reward)

        # Move to the next state
        state = next_state
        actions = next_actions

        # Perform one step of the optimization (on the policy network)
        optimize_model()
        if done:
            episode_durations.append(t + 1)
            plot_durations()
            break
    # Update the target network, copying all weights and biases in DQN
    if i_episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())

print('Complete')
env.render()
env.close()
plt.ioff()
plt.show()