In [None]:
import torch as T
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np

In [None]:
class DeepQNetwork(nn.Module):
  """Fully connected network mapping a state vector to one output per action.

  Two hidden ReLU layers feed a linear output layer. The module also owns
  its Adam optimizer and MSE loss, and moves itself to 'cuda:0' when CUDA
  is available (CPU otherwise).

  Args:
    lr: learning rate passed to Adam.
    input_dims: sequence unpacked into the leading arguments of the first
      nn.Linear (e.g. [4] for a 4-feature state).
    n_actions: width of the output layer.
    fc1_dims: width of the first hidden layer.
    fc2_dims: width of the second hidden layer.
  """
  def __init__(self, lr, input_dims, n_actions, fc1_dims, fc2_dims):
    super().__init__()
    self.input_dims, self.n_actions = input_dims, n_actions
    self.fc1_dims, self.fc2_dims = fc1_dims, fc2_dims

    # Layers are created in input -> output order.
    self.fc1 = nn.Linear(*input_dims, fc1_dims)
    self.fc2 = nn.Linear(fc1_dims, fc2_dims)
    self.fc3 = nn.Linear(fc2_dims, n_actions)

    self.optimizer = optim.Adam(self.parameters(), lr=lr)
    self.loss = nn.MSELoss()

    if T.cuda.is_available():
      self.device = T.device('cuda:0')
    else:
      self.device = T.device('cpu')
    self.to(self.device)

  def forward(self, state):
    """Return the raw (un-activated) output-layer values for ``state``."""
    hidden = F.relu(self.fc1(state))
    return self.fc3(F.relu(self.fc2(hidden)))

In [None]:
class Agent():
  """Epsilon-greedy deep Q-learning agent with a fixed-size replay buffer.

  Actions are handled in two forms:
    * a flat integer index in ``range(n_actions)`` (network output position,
      value stored in the replay buffer), and
    * a tuple of ``N_SLOTS`` digits, each in ``range(N_VALUES)``, which is
      what ``choose_action`` returns and ``store_transition`` receives.
  Both directions use the same base-``N_VALUES`` encoding with the most
  significant digit first, so the index stored for a transition is the index
  whose Q-value produced (or randomly selected) that action.

  Args:
    gamma: discount factor applied to the bootstrapped next-state value.
    epsilon: initial probability of picking a random action.
    lr: learning rate forwarded to the Q-network's optimizer.
    input_dims: shape of a single state, as a sequence (e.g. [4]).
    batch_size: number of transitions sampled per learning step.
    n_actions: number of flat action indices / network outputs.
    max_mem_size: capacity of the replay buffer.
    eps_end: lower bound for epsilon.
    eps_dec: amount subtracted from epsilon after every learning step.
  """
  # Digit alphabet size and code length used by the action <-> index codec.
  # The string-based encoder relies on single-character digits (N_VALUES <= 10).
  N_VALUES = 6
  N_SLOTS = 4

  def __init__(self, gamma, epsilon, lr, input_dims, batch_size, n_actions, max_mem_size=100000, eps_end=0.01, eps_dec=5e-4):
    self.gamma = gamma
    self.epsilon = epsilon
    self.eps_min = eps_end
    self.eps_dec = eps_dec
    self.lr = lr
    self.action_space = list(range(n_actions))
    self.mem_size = max_mem_size
    self.batch_size = batch_size
    self.mem_cntr = 0  # total transitions ever stored (not wrapped)

    self.Q_eval = DeepQNetwork(self.lr, n_actions=n_actions, input_dims=input_dims, fc1_dims=256, fc2_dims=256)
    self.state_memory = np.zeros((self.mem_size, *input_dims), dtype=np.float32)
    self.new_state_memory = np.zeros((self.mem_size, *input_dims), dtype=np.float32)
    self.action_memory = np.zeros(self.mem_size, dtype=np.int32)
    self.reward_memory = np.zeros(self.mem_size, dtype=np.float32)
    # Builtin bool: the np.bool alias was deprecated in NumPy 1.20 and later removed.
    self.terminal_memory = np.zeros(self.mem_size, dtype=bool)

  def convert_to_index(self, action):
    """Encode a digit tuple as a flat index (most significant digit first)."""
    return int("".join(str(digit) for digit in action), base=self.N_VALUES)

  def _index_to_action(self, index):
    """Decode a flat index into its digit tuple; inverse of convert_to_index."""
    remaining = int(index)
    digits = []
    for _ in range(self.N_SLOTS):
      remaining, digit = divmod(remaining, self.N_VALUES)
      digits.append(digit)
    # divmod yields the least significant digit first; the encoder expects
    # the most significant digit first.
    return tuple(reversed(digits))

  def store_transition(self, state, action, reward, state_, done):
    """Write one transition into the replay buffer, overwriting the oldest slot when full."""
    # Modulo (not bitwise AND) makes the write position wrap around the buffer.
    index = self.mem_cntr % self.mem_size
    self.state_memory[index] = state
    self.new_state_memory[index] = state_
    self.reward_memory[index] = reward
    self.action_memory[index] = self.convert_to_index(action)
    self.terminal_memory[index] = done

    self.mem_cntr += 1

  def choose_action(self, observation):
    """Pick an action epsilon-greedily and return it as a digit tuple."""
    if np.random.random() > self.epsilon:
      state = T.tensor([observation], dtype=T.float, device=self.Q_eval.device)
      # Action selection needs no gradients.
      with T.no_grad():
        actions = self.Q_eval.forward(state)
      action = T.argmax(actions).item()
    else:
      action = np.random.choice(self.action_space)

    return self._index_to_action(action)

  def learn(self):
    """Run one gradient step on a random minibatch, then decay epsilon.

    Does nothing until at least ``batch_size`` transitions have been stored.
    """
    if self.mem_cntr < self.batch_size:
      return

    device = self.Q_eval.device
    self.Q_eval.optimizer.zero_grad()
    max_mem = min(self.mem_cntr, self.mem_size)
    batch = np.random.choice(max_mem, self.batch_size, replace=False)

    state_batch = T.tensor(self.state_memory[batch]).to(device)
    new_state_batch = T.tensor(self.new_state_memory[batch]).to(device)
    reward_batch = T.tensor(self.reward_memory[batch]).to(device)
    terminal_batch = T.tensor(self.terminal_memory[batch]).to(device)

    # Long tensors are accepted as indices by every PyTorch version.
    batch_index = T.arange(self.batch_size, device=device)
    action_batch = T.tensor(self.action_memory[batch], dtype=T.long, device=device)

    q_eval = self.Q_eval.forward(state_batch)[batch_index, action_batch]

    # The bootstrap target is treated as a constant: without no_grad the
    # backward pass would also push gradients through the target itself.
    with T.no_grad():
      q_next = self.Q_eval.forward(new_state_batch)
      q_next[terminal_batch] = 0.0  # no future value after a terminal state
      q_target = reward_batch + self.gamma * T.max(q_next, dim=1)[0]

    loss = self.Q_eval.loss(q_eval, q_target)
    loss.backward()
    self.Q_eval.optimizer.step()

    # Clamp so epsilon never drops below its floor.
    self.epsilon = max(self.epsilon - self.eps_dec, self.eps_min)

In [None]:
# The environment below is taken from https://github.com/jefio/gym-mastermind. It exposes Mastermind through the OpenAI Gym interface.

from collections import Counter
import logging

import gym
from gym.utils import seeding
from gym import spaces


# Module-level logger; MastermindEnv.reset() logs the sampled target through it at DEBUG level.
logger = logging.getLogger(__name__)


class MastermindEnv(gym.Env):
    """
    Guess a 4-digit password where each digit is between 0 and 5.
    After each step the agent is provided with a 4-digit tuple of pegs,
    grouped as all 0s, then all 1s, then all 2s (it is not per-position):
    - '2' indicates that a digit has been correctly guessed at the correct position.
    - '1' indicates that a digit has been correctly guessed but the position is wrong.
    - '0' otherwise.
    Every step returns a shaped reward:
    +1 per '2' peg, +0.01 per '1' peg, -0.2 per '0' peg,
    plus a bonus of +10 when the guess equals the target.
    The episode terminates after the agent guesses the target or
    12 steps have been taken.
    """
    values = 6       # number of distinct digit values
    size = 4         # number of digits in the password
    guess_max = 12   # maximum guesses per episode

    def __init__(self):
        self.target = None
        self.guess_count = None
        self.observation = None

        self.observation_space = spaces.Tuple(
            [spaces.Discrete(3) for _ in range(self.size)])
        self.action_space = spaces.Tuple(
            [spaces.Discrete(self.values) for _ in range(self.size)])

        self.seed()
        self.reset()

    def seed(self, seed=None):
        """Create the environment's random generator and return the seed in a list."""
        # NOTE(review): reset() draws the target with action_space.sample();
        # whether that sampler is driven by self.np_random depends on the gym
        # version -- confirm if reproducible targets are required.
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def _count_pegs(self, action):
        """Return (n_correct, n_white) for a guess against the current target.

        n_correct counts positions where guess and target agree; n_white counts
        the remaining guess digits that occur elsewhere among the remaining
        target digits.
        """
        match_idxs = set(idx for idx, ai in enumerate(action) if ai == self.target[idx])
        g_counter = Counter(self.target[idx] for idx in range(self.size) if idx not in match_idxs)
        a_counter = Counter(action[idx] for idx in range(self.size) if idx not in match_idxs)
        # Counter intersection keeps, per digit, the smaller of the two counts.
        n_white = sum((g_counter & a_counter).values())
        return len(match_idxs), n_white

    def _feedback(self, n_correct, n_white):
        """Build the peg tuple: misses (0), then whites (1), then exact matches (2)."""
        n_miss = self.size - n_correct - n_white
        return tuple([0] * n_miss + [1] * n_white + [2] * n_correct)

    def get_observation(self, action):
        """Return the peg tuple the given guess earns against the current target."""
        n_correct, n_white = self._count_pegs(action)
        return self._feedback(n_correct, n_white)

    def step(self, action):
        """Apply one guess and return (observation, reward, done, info)."""
        assert self.action_space.contains(action)
        self.guess_count += 1
        solved = action == self.target
        done = solved or self.guess_count >= self.guess_max

        n_correct, n_white = self._count_pegs(action)
        reward = n_correct + 0.01*n_white - 0.2*(self.size - n_correct - n_white)

        if solved:
            reward += 10
            print("Correct")

        # Keep the stored observation in sync with what is returned.
        self.observation = self._feedback(n_correct, n_white)
        return self.observation, reward, done, {}

    def reset(self):
        """Sample a new target, clear the guess counter and return the all-zero observation."""
        self.target = self.action_space.sample()
        logger.debug("target=%s", self.target)
        self.guess_count = 0
        self.observation = (0,) * self.size
        return self.observation

In [None]:
from gym.envs.registration import register

# Register the environment class under the id 'Mastermind-v0' so gym.make can build it.
register(id='Mastermind-v0', entry_point=MastermindEnv)

  logger.warn(f"Overriding environment {spec.id}")


In [None]:
# Train the DQN agent on Mastermind for a fixed number of episodes.
env = gym.make('Mastermind-v0')
# One network output per possible code: 6 digit values in 4 positions give
# 6**4 codes. With 6**4-1 outputs the last code could never be selected.
agent = Agent(gamma=0.99, epsilon=1.0, batch_size=64, n_actions=6**4, eps_end=0.01, input_dims=[4], lr=0.003)
scores, eps_history = [], []
n_games = 500

for i in range(n_games):
  score = 0
  done = False
  observation = env.reset()
  while not done:
    action = agent.choose_action(observation)
    observation_, reward, done, info = env.step(action)
    score += reward
    agent.store_transition(observation, action, reward, observation_, done)
    agent.learn()
    observation = observation_
  scores.append(score)
  eps_history.append(agent.epsilon)

  # Running mean over (at most) the last 100 episodes.
  avg_score = np.mean(scores[-100:])

  print('Episode', i, 'score %.2f' % score, 'average score %.2f' % avg_score, 'epsilon %.2f' % agent.epsilon)

# 1-based episode numbers; built once after training instead of every episode.
x = list(range(1, n_games + 1))

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations


Episode 0 score 5.19 average score 5.19 epsilon 1.00
Episode 1 score 2.46 average score 3.83 epsilon 1.00
Episode 2 score 2.16 average score 3.27 epsilon 1.00
Episode 3 score 1.74 average score 2.89 epsilon 1.00
Episode 4 score 3.78 average score 3.07 epsilon 1.00
Episode 5 score 3.30 average score 3.10 epsilon 1.00
Episode 6 score 0.33 average score 2.71 epsilon 0.99
Episode 7 score 3.45 average score 2.80 epsilon 0.98
Episode 8 score 2.52 average score 2.77 epsilon 0.98
Episode 9 score 1.53 average score 2.65 epsilon 0.97
Episode 10 score -0.09 average score 2.40 epsilon 0.97
Episode 11 score -2.85 average score 1.96 epsilon 0.96
Episode 12 score 3.93 average score 2.11 epsilon 0.95
Episode 13 score 7.68 average score 2.51 epsilon 0.95
Episode 14 score 7.11 average score 2.82 epsilon 0.94
Episode 15 score 4.41 average score 2.92 epsilon 0.94
Episode 16 score 3.36 average score 2.94 epsilon 0.93
Episode 17 score 0.48 average score 2.80 epsilon 0.92
Episode 18 score -1.29 average score