In [85]:
import random
import string
import numpy as np

class WordleEnv:
    def __init__(self, word_length=5, max_attempts=6, subset_size=None):
        #Length of wordle words, 5
        self.word_length = word_length
        # Possible number of attempts in wordle, 6
        self.max_attempts = max_attempts
        # We will store the target word in this
        self.target_word = ''
        # Initializations for the attempts, we will update this when we reset the environment
        self.attempts_left = 0
        self.attempts = 0
        # Current actions will be stored in this variable
        self.current_guess = ''
        # Opening the txt file containing possible words and get a random subset of them if necessary
        with open('previous_wordles.txt', 'r') as f:
        #with open('wordle_words.txt', 'r') as f:
            words = [word.strip().upper() for word in f.readlines() if len(word.strip()) == word_length]
            if subset_size is not None:
                words = self.get_random_subset(words, subset_size)
            self.words = words
        # State space has 78 dimensions (3 for each letter, gray, yellow, and green states)
        self.state_size = 78
        # Possible actions are the number of words in the dataset
        self.action_size = len(self.words)
        # Current state starts as all zeros one hot encoded matrix, then it will be built after each move
        self.current_state = np.zeros(self.state_size, dtype=np.float32)
        
        
    # This function removes incompatible words based on current guesses.
    def remove_incompatible_words(self, current_guess):
        new_available_actions = []
        for i in self.available_actions:
            word = self.words[i]
            compatible = True
            for idx, (guess_char, target_char) in enumerate(zip(current_guess, self.target_word)):
                if guess_char == target_char and word[idx] != guess_char:
                    compatible = False
                    break
                elif guess_char != target_char and word[idx] == guess_char:
                    compatible = False
                    break
            if compatible:
                new_available_actions.append(i)

        # Ensure at least one word is left in the available word list
        if len(new_available_actions) > 0:
            self.available_actions = new_available_actions

    # This function masks action for the incompatible actions.
    def mask_action(self, action):
        self.available_actions.remove(action)
    
    # This function gets a random subset of words
    def get_random_subset(self, words, subset_size):
        return random.sample(words, subset_size)
    
    # This function chooses a random number between 0 and length of dataset, which will be transformed into word based on the index.
    def get_random_action(self):
        return random.randint(0, self.action_size - 1)

    # Before starting each episode, the environment is resetted to give the initial conditions.
    def reset(self):
        self.target_word = random.choice(self.words)
        self.attempts_left = self.max_attempts
        self.attempts = 0
        self.current_guess = '_' * self.word_length
        self.available_actions = list(range(self.action_size))
        #self.available_actions = self.words
        self.current_state = np.zeros(self.state_size, dtype=np.float32)
        
        return self.current_state

    # Each time we make an action (make a guess), we check how many of the letters are correct.
    def step(self, action):
        self.current_guess = self.words[action]
        self.mask_action(action)  # Mask the taken action
        self.attempts += 1
        reward = 0
        done = False
        # If the guess is correct, +10 reward.
        if self.current_guess == self.target_word:
            reward = 10
            done = True
        # If some of the letters are correct, give intermediate reward for the number of correct letters [1,4]
        else:
            correct_letters = sum([1 for guessed_letter, target_letter in zip(self.current_guess, self.target_word) if guessed_letter == target_letter])
            reward = 1 * correct_letters
            
            self.attempts_left -= 1
            # If there is no attempts left, unsuccessful, -10 reward.
            if self.attempts_left <= 0:
                reward = -10
                done = True
                
        self.remove_incompatible_words(self.current_guess)

        return self.get_state(), reward, done, {}
    
    # In each turn, get the new state based on the correctness of the letters
    def get_state(self):
        #state = np.zeros(self.state_size, dtype=np.float32)
        state = self.current_state
        # Check each letter of the guess
        for idx, letter in enumerate(self.current_guess):
            # If correct location and letter (green), that is allocated for 0,25
            if letter == self.target_word[idx]:
                state[(ord(letter) - 65)] = 1
            # If only correct letter (yellow), allocated for second 26 indices.
            elif letter in self.target_word:
                state[(ord(letter) - 65) + 26] = 1
            # If the letter is not in the word, allocated for the last 26 indices.
            else:
                state[(ord(letter) - 65) + 26*2] = 1
        return state

    # Printing output purposes.
    def render(self):
        print(f"Current guess: {self.current_guess}")
        print(f"Target word: {self.target_word}")
        print(f"Attempts left: {self.attempts_left}")





In [86]:
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

# Initialize the environment (input subset size if necessary)
size = None
env = WordleEnv(subset_size=size)  # A random subset of size words will be used

# CUDA purposes for M1 Pro.

# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

plt.ion()

# if GPU is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

if torch.backends.mps.is_available():
    mps_device = torch.device("mps")
    x = torch.ones(1, device=mps_device)
    print(x)
else:
    print ("MPS device not found.")

tensor([1.], device='mps:0')


In [87]:
# Replay memory object for replaying samples from the memory.

Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

In [88]:
# DQN Agent's network
# 3 Layers, starts with the state space shape and ends with action space shape.

class DQN(nn.Module):

    def __init__(self, n_observations, n_actions):
        super(DQN, self).__init__()
        self.layer1 = nn.Linear(n_observations, 64)
        self.layer2 = nn.Linear(64, 64)
        self.layer3 = nn.Linear(64, n_actions)

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        return self.layer3(x)

In [89]:
# BATCH_SIZE is the number of transitions sampled from the replay buffer
# GAMMA is the discount factor as mentioned in the previous section
# EPS_START is the starting value of epsilon
# EPS_END is the final value of epsilon
# EPS_DECAY controls the rate of exponential decay of epsilon, higher means a slower decay
# TAU is the update rate of the target network
# LR is the learning rate of the ``AdamW`` optimizer
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.99
EPS_END = 0.05
EPS_DECAY = 150000
TAU = 0.005
LR = 1e-4

# Get number of actions from gym action space
n_actions = env.action_size
# Get the number of state observations
state = env.reset()
n_observations = len(state)

policy_net = DQN(env.state_size, env.action_size).to(device)
target_net = DQN(env.state_size, env.action_size).to(device)


target_net.load_state_dict(policy_net.state_dict())

# Optimizer initialization
optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)


steps_done = 0
# Epsilon greedy action selection
# Gradually decrease epsilon
# If epsilon is greater than the random sample, take random action
# Otherwise, take the action that gives the most Q value.
def select_action(state, available_actions, action_size):
    global steps_done
    sample = random.random()
    global eps_threshold
    eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            # Create a mask tensor for previously chosen actions
            mask = torch.full((1, action_size), -float('inf'), device=device)
            for idx in available_actions:
                mask[0, idx] = 0

            # Add the mask to the DQN output and select the maximum value
            masked_output = policy_net(state) + mask
            return masked_output.max(1)[1].view(1, 1)
    else:
        return torch.tensor([[random.choice(available_actions)]], device=device, dtype=torch.long)


In [90]:
def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). This converts batch-array of Transitions
    # to Transition of batch-arrays.
    batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state would've been the one after which simulation ended)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. These are the actions which would've been taken
    # for each batch state according to policy_net
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1)[0].
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    with torch.no_grad():
        next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0]
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    # In-place gradient clipping
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()

In [91]:
num_episodes = 300000
average_reward = 0
for episode in range(num_episodes):
    # Initialize the environment and get it's state
    state = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
    for t in count():
        action = select_action(state, env.available_actions, env.action_size)
        observation, reward, done, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)
        #print(observation)
        if done:
            next_state = None
        else:
            next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

        # Store the transition in memory
        memory.push(state, action, next_state, reward)

        # Move to the next state
        state = next_state

        # Perform one step of the optimization (on the policy network)
        optimize_model()

        # Soft update of the target network's weights
        # θ′ ← τ θ + (1 −τ )θ′
        target_net_state_dict = target_net.state_dict()
        policy_net_state_dict = policy_net.state_dict()
        for key in policy_net_state_dict:
            target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
        target_net.load_state_dict(target_net_state_dict)

        if done:
            average_reward += reward/1000
            if(episode % 1000 == 0):
                print(f"Episode: {episode}/{num_episodes}, Attempts: {env.attempts}, Reward: {reward[0]}")
                #print(f"Episode: {episode}/{num_episodes}, Attempts: {env.attempts}, Average Reward: {average_reward[0]}")
                average_reward = 0
            break

print('Complete')



Episode: 0/300000, Attempts: 6, Reward: 10
Episode: 1000/300000, Attempts: 6, Reward: -10
Episode: 2000/300000, Attempts: 6, Reward: 10
Episode: 3000/300000, Attempts: 6, Reward: -10
Episode: 4000/300000, Attempts: 5, Reward: 10
Episode: 5000/300000, Attempts: 2, Reward: 10
Episode: 6000/300000, Attempts: 6, Reward: -10
Episode: 7000/300000, Attempts: 6, Reward: 10
Episode: 8000/300000, Attempts: 5, Reward: 10
Episode: 9000/300000, Attempts: 6, Reward: 10
Episode: 10000/300000, Attempts: 6, Reward: -10
Episode: 11000/300000, Attempts: 6, Reward: -10
Episode: 12000/300000, Attempts: 6, Reward: -10
Episode: 13000/300000, Attempts: 6, Reward: 10
Episode: 14000/300000, Attempts: 6, Reward: 10
Episode: 15000/300000, Attempts: 6, Reward: 10
Episode: 16000/300000, Attempts: 5, Reward: 10
Episode: 17000/300000, Attempts: 6, Reward: -10
Episode: 18000/300000, Attempts: 5, Reward: 10
Episode: 19000/300000, Attempts: 5, Reward: 10
Episode: 20000/300000, Attempts: 6, Reward: -10
Episode: 21000/300

Episode: 173000/300000, Attempts: 4, Reward: 10
Episode: 174000/300000, Attempts: 6, Reward: 10
Episode: 175000/300000, Attempts: 6, Reward: 10
Episode: 176000/300000, Attempts: 6, Reward: -10
Episode: 177000/300000, Attempts: 6, Reward: 10
Episode: 178000/300000, Attempts: 6, Reward: -10
Episode: 179000/300000, Attempts: 3, Reward: 10
Episode: 180000/300000, Attempts: 4, Reward: 10
Episode: 181000/300000, Attempts: 3, Reward: 10
Episode: 182000/300000, Attempts: 4, Reward: 10
Episode: 183000/300000, Attempts: 4, Reward: 10
Episode: 184000/300000, Attempts: 6, Reward: 10
Episode: 185000/300000, Attempts: 5, Reward: 10
Episode: 186000/300000, Attempts: 4, Reward: 10
Episode: 187000/300000, Attempts: 6, Reward: -10
Episode: 188000/300000, Attempts: 4, Reward: 10
Episode: 189000/300000, Attempts: 6, Reward: -10
Episode: 190000/300000, Attempts: 6, Reward: -10
Episode: 191000/300000, Attempts: 6, Reward: 10
Episode: 192000/300000, Attempts: 6, Reward: 10
Episode: 193000/300000, Attempts: 5

In [92]:
total_attempts = 0
correct_guesses = 0

no_test_trials = 1000

eps_threshold = 1e-11 #For only exploration

for episode in range(no_test_trials):
    state = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)

    for t in count():
        action = select_action(state, env.available_actions, env.action_size)
        observation, reward, done, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)
        
        if done:
            next_state = None
        else:
            next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

        state = next_state
        total_attempts += 1

        if done:
            if(reward[0] == 10):
                correct_guesses += 1
            break

success_rate = correct_guesses / (no_test_trials)
average_attempts = total_attempts / (no_test_trials)

print(f"Trials: {no_test_trials}, Success rate: {success_rate:.2f}, Average number of attempts: {average_attempts:.2f}")

Trials: 1000, Success rate: 0.66, Average number of attempts: 5.09


In [94]:
total_attempts = 0
correct_guesses = 0

no_test_trials = 1000

eps_threshold = 1e-11 #For only exploration

for episode in range(no_test_trials):
    state = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
    
    action = torch.tensor([[345]], device=device, dtype=torch.long) #Salet start.
    for t in count():
        observation, reward, done, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)
        
        if done:
            next_state = None
        else:
            next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

        state = next_state
        total_attempts += 1

        if done:
            if(reward[0] == 10):
                correct_guesses += 1
            break
        action = select_action(state, env.available_actions, env.action_size)

success_rate = correct_guesses / (no_test_trials)
average_attempts = total_attempts / (no_test_trials)

print(f"Trials with SALET start: {no_test_trials}, Success rate: {success_rate:.2f}, Average number of attempts: {average_attempts:.2f}")

Trials with SALET start: 1000, Success rate: 0.71, Average number of attempts: 4.95


In [133]:
## AN INSTANT WHERE BOTH AGENT'S FIRST WORD (SLOSH) AND SALET PERFORM THE SAME

In [131]:
state = env.reset()
state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)

for t in count():
    action = select_action(state, env.available_actions, env.action_size)
    observation, reward, done, _ = env.step(action.item())
    reward = torch.tensor([reward], device=device)
    env.render()
    if done:
        next_state = None
    else:
        next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)
        
    if done:
        break

    state = next_state

Current guess: SLOSH
Target word: THORN
Attempts left: 5
Current guess: BOOZY
Target word: THORN
Attempts left: 4
Current guess: ATOLL
Target word: THORN
Attempts left: 3
Current guess: ERODE
Target word: THORN
Attempts left: 2
Current guess: THORN
Target word: THORN
Attempts left: 2


In [132]:
state = env.reset()
env.target_word = 'THORN'
state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
action = torch.tensor([[344]], device=device, dtype=torch.long) #Salet start.

for t in count():
    observation, reward, done, _ = env.step(action.item())
    reward = torch.tensor([reward], device=device)
    env.render()
    if done:
        next_state = None
    else:
        next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)
        
    if done:
        break
    action = select_action(state, env.available_actions, env.action_size)
    state = next_state

Current guess: SALET
Target word: THORN
Attempts left: 5
Current guess: KEBAB
Target word: THORN
Attempts left: 4
Current guess: TRAWL
Target word: THORN
Attempts left: 3
Current guess: TWINE
Target word: THORN
Attempts left: 2
Current guess: THORN
Target word: THORN
Attempts left: 2


In [None]:
## AN INSTANT WHERE SALET FIRST WORD PERFORM BETTER THAN SLOSH

In [141]:
state = env.reset()
state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)

for t in count():
    action = select_action(state, env.available_actions, env.action_size)
    observation, reward, done, _ = env.step(action.item())
    reward = torch.tensor([reward], device=device)
    env.render()
    if done:
        next_state = None
    else:
        next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)
        
    if done:
        break

    state = next_state

Current guess: SLOSH
Target word: FRONT
Attempts left: 5
Current guess: BOOZY
Target word: FRONT
Attempts left: 4
Current guess: ATOLL
Target word: FRONT
Attempts left: 3
Current guess: ERODE
Target word: FRONT
Attempts left: 2
Current guess: GROIN
Target word: FRONT
Attempts left: 1
Current guess: FRONT
Target word: FRONT
Attempts left: 1


In [142]:
state = env.reset()
env.target_word = 'FRONT'
state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
action = torch.tensor([[344]], device=device, dtype=torch.long) #Salet start.

for t in count():
    observation, reward, done, _ = env.step(action.item())
    reward = torch.tensor([reward], device=device)
    env.render()
    if done:
        next_state = None
    else:
        next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)
        
    if done:
        break
    action = select_action(state, env.available_actions, env.action_size)
    state = next_state

Current guess: SALET
Target word: FRONT
Attempts left: 5
Current guess: POINT
Target word: FRONT
Attempts left: 4
Current guess: CHANT
Target word: FRONT
Attempts left: 3
Current guess: FRONT
Target word: FRONT
Attempts left: 3


In [None]:
## AN INSTANT WHERE SLOSH PERFORM BETTER THAN SALET

In [157]:
state = env.reset()
state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)

for t in count():
    action = select_action(state, env.available_actions, env.action_size)
    observation, reward, done, _ = env.step(action.item())
    reward = torch.tensor([reward], device=device)
    env.render()
    if done:
        next_state = None
    else:
        next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)
        
    if done:
        break

    state = next_state

Current guess: SLOSH
Target word: CHOKE
Attempts left: 5
Current guess: BOOZY
Target word: CHOKE
Attempts left: 4
Current guess: WROTE
Target word: CHOKE
Attempts left: 3
Current guess: CHOKE
Target word: CHOKE
Attempts left: 3


In [158]:
state = env.reset()
env.target_word = 'CHOKE'
state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
action = torch.tensor([[344]], device=device, dtype=torch.long) #Salet start.

for t in count():
    observation, reward, done, _ = env.step(action.item())
    reward = torch.tensor([reward], device=device)
    env.render()
    if done:
        next_state = None
    else:
        next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)
        
    if done:
        break
    action = select_action(state, env.available_actions, env.action_size)
    state = next_state

Current guess: SALET
Target word: CHOKE
Attempts left: 5
Current guess: KEBAB
Target word: CHOKE
Attempts left: 4
Current guess: TRAWL
Target word: CHOKE
Attempts left: 3
Current guess: ACUTE
Target word: CHOKE
Attempts left: 2
Current guess: DODGE
Target word: CHOKE
Attempts left: 1
Current guess: ELOPE
Target word: CHOKE
Attempts left: 0
