# INTRODUCTION

This version of our code trains the agent to solve a 9x9 Minesweeper grid with 10 mines. It has issues when it runs in that the agent starts to pick the same action over and over again

In [None]:
from keras.models import Sequential
from keras.layers import Dense, Dropout, Conv2D, MaxPooling2D, Activation, Flatten
from keras.callbacks import TensorBoard
from tensorflow.keras.optimizers import Adam
from collections import deque
import numpy as np
import tensorflow as tf
import keras
import math

#Minesweeper Environment

In [None]:
#@title Output { vertical-output: true }
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import display

UNKNOWN = 9 # unknown (covered) cell is indentified by value 9

class MinesweeperEnv:

    def __init__(self, rows=9, columns=9, mines=10):

        self.rows = rows
        self.columns = columns
        self.mines = mines
        self.grid = None # grid that contains mine locations
        self.state = np.full((rows, columns), UNKNOWN)  # state that shows cells as covered or uncovered
        self.state_one_hot = (np.arange(UNKNOWN+1) == self.state[...,None]).astype(int) # https://stackoverflow.com/questions/36960320/convert-a-2d-matrix-to-a-3d-one-hot-matrix-numpy
        self.reset_step = True
        self.unchosen_actions = np.ones((rows, columns)) # array that indicates cells that have been chosen

    # creates grid of mines and values indicating neighbouring mines
    def createGrid(self, start_row, start_column):
        grid = np.zeros((self.rows, self.columns), dtype=int)
        mines = self.mines

        while mines > 0:
            # randomly generate a cell location for the mine
            row = np.random.randint(0, self.rows)
            column = np.random.randint(0, self.columns)

            # if the cell location does not already contain a mine and is not the starting cell
            if (grid[row][column] == 0) and (row != start_row and column != start_column) :
                # place mine
                grid[row][column] = -1
                mines -= 1

        # fill remaining cells with values indicating neighbouring mines
        for row in range(self.rows):
            for column in range(self.columns):
                if grid[row][column] != -1: # if it is not a mine
                    grid[row][column] = self.surroundingMines(row, column, grid)

        self.grid = grid.copy()

    # counts mines in 8 cells surrounding cell at (row,column) given
    def surroundingMines(self, row, column, grid):
        mine_count = 0
        for r in [row-1, row, row+1]:
            for c in [column-1, column, column+1]:
                if (0 <= r < self.rows) and (0 <= c < self.columns):
                    if grid[r][c] == -1:
                        mine_count += 1
        return mine_count

    # counts the cells that have been uncovered that surround the cell at (row, column) given
    def anySurroundingRevealed(self, row, column):
        for r in [row-1, row, row+1]:
            for c in [column-1, column, column+1]:
                if (0 <= r < self.rows) and (0 <= c < self.columns) and not(r == row and c == column):
                    if self.state[r][c] != UNKNOWN:
                        return True
        return False

    # calculates one hot encoding for value at (row, column) given
    def updateOneHot(self, row, column):
        one_hot_val = np.zeros(UNKNOWN + 1, dtype=int)
        one_hot_val[self.grid[row, column]] = 1
        self.state_one_hot[row, column] = one_hot_val

    # environment time step
    def step(self, action):

        terminal = False

        # if this is the first move of the game: create board
        if self.reset_step:
            self.createGrid(action[0], action[1])
        
        # if action has already been chosen
        elif self.state[action[0], action[1]] != UNKNOWN:
            reward = -0.3
            return reward, terminal # don't need to update state or unchosen actions

        # update state representation with value in grid
        self.state[action[0], action[1]] = self.grid[action[0], action[1]]

        # if this is the first move of the game: 
        if self.reset_step:
            self.reset_step = False
            self.updateOneHot(action[0], action[1])
            reward = -0.3

        # if the action reveals a mine
        elif self.grid[action[0]][action[1]] == -1:
            self.reset()
            reward = -1
            terminal = True

        # if the game is in a winning state
        elif self.win():
            self.reset()
            reward = 1
            terminal = True

        # if the action taken already has surrounding cells revealed
        elif self.anySurroundingRevealed(action[0], action[1]):
            self.updateOneHot(action[0], action[1])
            reward = 0.3

        # if the action taken has no surrounding cells revealed
        else:
            self.updateOneHot(action[0], action[1])
            reward = -0.3

        # set action as chosen
        self.unchosen_actions[action[0], action[1]] = 0

        return reward, terminal

    # determines if in winning state
    def win(self):
        covered = np.where(self.state == UNKNOWN)
        mines = np.where(self.grid == -1)
        if list(zip(covered[0], covered[1])) == list(zip(mines[0], mines[1])): # if the only covered cells are found at the same cells as mines
            return True
        return False

    # reset environment
    def reset(self):
        self.reset_step = True
        self.grid = None
        self.state = np.full((self.rows, self.columns), UNKNOWN)
        self.state_one_hot = (np.arange(UNKNOWN+1) == self.state[...,None]).astype(int)
        self.unchosen_actions = np.ones((self.rows, self.columns))

    # draw environment
    def drawEnv(self):
        moves = 0
        data = np.full((self.rows, self.columns),2)
        fig, ax = plt.subplots()
        for i in range(self.rows+1):
          ax.axhline(i, lw=2, color='k')
          ax.axvline(i, lw=2, color='k')
        ax.imshow(data, cmap="Greys", vmin=0, vmax=5, extent=[0, 9, 9, 0])
        ax.axis('off')
        display(fig)
        return fig, ax, data, moves
  
    # update drawing of environment
    def updateDrawing(self, fig, ax, data, moves, x, y):
        moves += 1
        if self.grid is None:
          if moves == (self.rows*self.columns)-self.mines:
            print("WON: all cells uncovered")
            ax.text(y+0.2,x+0.5,"WON",c="green")
          else:
            print("LOST: hit a mine at location", (x,y))
            ax.text(y+0.5,x+0.5,"M",c="red")
        else:
          ax.text(y+0.5,x+0.5,self.grid[x,y])
        data[x,y] = 1
        ax.imshow(data, cmap="Greys", vmin=0, vmax=5, extent=[0, 9, 9, 0])
        display(fig)
        return fig, ax, data, moves


# This is to manually test the environment
'''
env = MinesweeperEnv()
fig, ax, data, moves = env.drawEnv()
x = int(input('Enter x value: '))
y = int(input('Enter y value: '))
while x != -1:
    print('Reward: ', env.step((x, y)))
    print('Grid:\n', env.grid)
    print('What agent sees:\n', env.state)
    fig, ax, data, moves = env.updateDrawing(fig, ax, data, moves, x, y)
    x = int(input('Enter x value: '))
    y = int(input('Enter y value: '))
'''


#Deep Q-Learning Agent


In [None]:
from collections import deque
from tensorflow.keras import layers
from tensorflow.keras import Model
from tensorflow.keras import optimizers
import random

GAMMA = 0.99
MINIBATCH_SIZE = 32
TARGET_MODEL_UPDATE_TIME = 10
MIN_REPLAY_MEMORY_SIZE = 1000

class DQAgent:

  def __init__(self, rows=9, columns=9): #initialize replay memory, main nn and target nn
    self.rows = rows
    self.columns = columns
    
    self.replayMem = deque(maxlen=10000)
    self.epsilon = 0.1
    self.learningRate = 0.001

    self.mainModel = self.createModel()
    self.targetModel = self.createModel()
    self.targetModel.set_weights(self.mainModel.get_weights())

    self.targetModelUpdateCounter = 0

  # create neural network for q-values
  def createModel(self):
    model = Sequential([
                Conv2D(128, (3,3), activation='relu', padding='same', input_shape=(9, 9, 10)),
                Conv2D(128, (3,3), activation='relu', padding='same'),
                Conv2D(128, (3,3), activation='relu', padding='same'),
                Conv2D(128, (3,3), activation='relu', padding='same'),
                Flatten(),
                Dense(512, activation='relu'),
                Dense(512, activation='relu'),
                Dense(81, activation='linear')])
    

    # Previously tried neural network setup
    '''
    init = tf.keras.initializers.RandomNormal(mean = 0.5, stddev=0.5)

    model = Sequential()
    
    model.add(keras.layers.Dense(810, input_shape=(9,9,10), activation='relu', kernel_initializer=init)) #self.rows * self.columns * 10
    model.add(keras.layers.Dense(256, activation='relu', kernel_initializer=init)) #round(np.sqrt(self.rows**2 * self.columns**2 * 10))
    model.add(Flatten())
    model.add(keras.layers.Dense(81, activation='linear', kernel_initializer=init)) #self.rows * self.columns
    '''

    # displays model summary
    print(model.summary())

    model.compile(loss="mean_squared_error", optimizer=tf.keras.optimizers.Adam(learning_rate=self.learningRate), metrics=['accuracy'])
    return model

  # adds new transition to replay memory
  def updateReplayMemory(self, currentState, action, reward, nextState, terminalState):
    self.replayMem.append((currentState, action, reward, nextState, terminalState))

  # training step for agent
  def train(self, terminal):

    # replay memory must be at least minimum replay memory size before we start sampling from it
    if len(self.replayMem) < MIN_REPLAY_MEMORY_SIZE:
      return

    # get minibatch of (state, action, reward, next_state) from replay memory
    minibatch = random.sample(self.replayMem, MINIBATCH_SIZE)

    #create list containing each state from each minibatch
    #create list containing each next state from minibatch
    currentStates = []
    nextStates = []
    for sample in minibatch:
      currentStates.append(sample[0])
      nextStates.append(sample[3])

    # query main nn for q values of current states
    currentQVals = self.mainModel.predict(np.array(currentStates))
    # query target nn for q values of next states
    nextQVals = self.targetModel.predict(np.array(nextStates))

    states = []
    updatedQVals = []

    # for each transition in the minibatch
    for index, (currentState, action, reward, nextState, terminalState) in enumerate(minibatch):
      # if next state is a winning game or losing game, update q value with just reward
      if terminalState:
        updatedQVal = reward 
      else:
        # otherwise update q value with q-learning update using q values of next states
        updatedQVal = reward + (GAMMA * np.max(nextQVals[index]))

      currentQsForState = currentQVals[index]
      currentQsForState[(action[0] * self.columns) + action[1]] = updatedQVal #updates the q val for the current state, current action

      states.append(currentState)
      updatedQVals.append(currentQsForState)

    # fit main nn with current states and updated q values
    self.mainModel.fit(np.array(states), np.array(updatedQVals), batch_size=MINIBATCH_SIZE, verbose=0, shuffle=False) 
    
    if terminal:
      self.targetModelUpdateCounter += 1

    # update target nn as main nn if it is time to do so
    if self.targetModelUpdateCounter > TARGET_MODEL_UPDATE_TIME:
      self.targetModel.set_weights(self.mainModel.get_weights())
      self.targetModelUpdateCounter = 0



###Algorithm for deep Q Networks: (From [DQN Nature Paper](https://storage.googleapis.com/deepmind-media/dqn/DQNNaturePaper.pdf))

```
Initialize replay memory D to capacity N
Initialize action-value function Q with random weights θ
Initialize target action-value function Q* with weights θ̄  = θ
For episode = 1,M do
  Initialize sequence s1 = {x1} and preprocessed sequence φ1 = φ(s1)  #preprocess NN inputs
  For t = 1,T do
    With probability 𝜀 select a random action aₜ
    otherwise select aₜ = argmaxₐQ(φ(sₜ), a; θ)
    Execute action aₜ in emulator and observe reward rₜ (and image xₙₑₓₜ ₜ)  #image is vector (pixels)
    Set sₙₑₓₜ ₜ = sₜ,aₜ,xₙₑₓₜ ₜ and preprocess φₙₑₓₜ ₜ = φ(sₙₑₓₜ ₜ)
    Store transition (φₜ, aₜ, rₜ, φₙₑₓₜ ₜ) in D

```
**Agent update step:**

        Sample random minibatch of transitions (φⱼ, aⱼ, rⱼ, φₙₑₓₜ ⱼ) from D
        Set yⱼ = rⱼ if episode terminates at step (next j)  #yⱼ is Q-value
        otherwise set yⱼ = rⱼ + γmaxₐ(φₙₑₓₜ ⱼ, a'; θ̄ )
        Perform a gradient descent step on (yⱼ - Q(φⱼ, aⱼ; θ))^2 with respect to the network parameters θ
        Every C steps reset Q* = Q **

```
  End For
End For
```


#Execution Code

In [None]:
# determines which action the agent chooses by epsilon-greedy
def chooseAction(agent, state, unchosen):
  
  # choose random action if random value is less than epsilon
  if np.random.random() < agent.epsilon:
    actions = np.argwhere(unchosen) # get indices of all unchosen cells
    index = np.random.randint(0,len(actions)) # generate a random index
    action = actions[index] # get unchosen action as that index

  # choose optimal action according to model
  else:
    pred = agent.mainModel.predict(np.array(state).reshape(-1, *state.shape)) #NOTE: we believe this could have caused the repeated action picking issue
    unchosen = unchosen.reshape(1, 81) 
    pred[unchosen == 0] = np.min(pred) # set the recieved q-values for all chosen cells as the minimum q-value so they don't get chosen as the optimal 
    index = np.argmax(pred) # get index of optimal action
    action = convertIndexToAction(index, agent)
  return action


# converts the index from the (1,81) 1D array to an index for a (9,9) 2D array
def convertIndexToAction(index, agent):
  row = math.floor(index / agent.columns)
  column = index - (row * agent.columns)
  return (row, column)


# trains agent for a number of episodes
def train(agent, env, num_episodes=10000):
  rewards = []
  episodes = []
  steps = []

  #loop through episodes
  for episode in range(num_episodes):
    step = 0

    #get initial state
    current_state = env.state_one_hot

    terminal = False
    sumRewards = 0

    #loop through step in episode
    while not terminal:
      step += 1

      #choose next action based on current state
      action = chooseAction(agent, current_state, env.unchosen_actions)

      #execute action with env.step(action) and get reward and next_state
      reward, terminal = env.step(action)
      sumRewards += reward
      next_state = env.state_one_hot

      #update agent's replay memory with (current_state, action, reward, next_state)
      agent.updateReplayMemory(current_state, action, reward, next_state, terminal)

      #train agent
      agent.train(terminal)

      current_state = next_state

    print(episode) # indicates episode number

    rewards.append(sumRewards)
    episodes.append(episode)
    steps.append(step)
  
  # creates plots
  plt.clf()
  plt.plot(episodes,rewards)
  plt.show()
  plt.plot(episodes, steps)
  plt.show()


#evaluates agent for a number of episodes
def evaluate(agent, env, num_episodes=1):
  agent.epsilon = 0 # no random choice in evaluation

  #loop through episodes
  for episode in range(num_episodes):

    #get initial state
    current_state = env.state_one_hot
    fig, ax, data, moves = env.drawEnv()
    terminal = False

    #loop through step in episode
    while not terminal:

      #choose next action based on current state
      action = chooseAction(agent, current_state, env.unchosen_actions)
      print(action)

      #execute action with env.step(action) and get reward and next_state
      reward, terminal = env.step(action)
      fig, ax, data, moves = env.updateDrawing(fig, ax, data, moves, action[0], action[1])
      next_state = env.state_one_hot

      current_state = next_state


In [None]:
dq_agent = DQAgent()
environment = MinesweeperEnv()

train(dq_agent, environment)

In [None]:
evaluate(dq_agent, environment)

https://github.com/jakejhansen/minesweeper_solver 

https://sdlee94.github.io/Minesweeper-AI-Reinforcement-Learning/ 

https://hanialmousli.wordpress.com/2017/10/11/minesweeper-using-reinforcement-learning/

https://www.researchgate.net/profile/Preslav-Nakov/publication/228613592_Minesweeper_Minesweeper/links/00b7d523c1308589ef000000/Minesweeper-Minesweeper.pdf

http://volweb.utk.edu/~mmajumde/research_web/ECE517final.pdf

https://pythonprogramming.net/training-deep-q-learning-dqn-reinforcement-learning-python-tutorial/?completed=/deep-q-learning-dqn-reinforcement-learning-python-tutorial/

https://storage.googleapis.com/deepmind-media/dqn/DQNNaturePaper.pdf