In [1]:
import sys
from six import StringIO
from random import randint

import numpy as np
import gym
from gym import spaces

# Sentinel cell values. Non-negative cell values are the number of neighbouring
# mines; these two mark a mine and a cell that has not been opened yet.
# Both are single-element lists rather than bare ints: the player-visible board
# is built as `np.ones((n, n, 1)) * CLOSED`, so every cell of it is a length-1
# vector, and the hidden board stores the MINE list itself in an object array.
MINE = [-1]
CLOSED = [9]

def stringify(board):
    """Flatten a square board into one string, one token per cell.

    Cells are visited row by row over ``len(board)`` rows and ``len(board)``
    columns. A cell equal to 0 or to ``CLOSED`` contributes ``str(cell)``;
    every other cell contributes the literal ``'1'``.
    """
    side = len(board)
    tokens = []
    for x in range(side):
        row = board[x]
        for y in range(side):
            cell = row[y]
            if cell == 0 or cell == CLOSED:
                tokens.append(str(cell))
            else:
                tokens.append('1')
    return ''.join(tokens)


def is_new_move(my_board, x, y):
    """Compare the player's cell at (x, y) with ``CLOSED``.

    Returns the raw result of the ``==`` comparison, so for a NumPy board
    whose cells are length-1 vectors this is a one-element boolean array.
    """
    cell = my_board[x][y]
    return cell == CLOSED

def is_valid(size, x, y):
    """Report whether (x, y) lies inside a ``size`` x ``size`` board.

    The individual comparisons are combined with ``&`` (not ``and``), so all
    four are always evaluated.
    """
    row_in_range = (0 <= x) & (x < size)
    col_in_range = (0 <= y) & (y < size)
    return row_in_range & col_in_range


def is_win(my_board, num_mines):
    """Return True when the number of cells equal to ``CLOSED`` is ``num_mines``.

    In other words: the only cells left unopened are as many as there are mines.
    """
    still_closed = np.count_nonzero(my_board == CLOSED)
    return still_closed == num_mines


def is_mine(board, x, y):
    """Compare the hidden board's cell at ``[x, y]`` with ``MINE``.

    ``board`` is indexed with a tuple (``board[x, y]``), so it must support
    NumPy-style 2-D indexing.
    """
    cell = board[x, y]
    return cell == MINE


def place_mines(board_size, num_mines):
    """Build a hidden board with ``num_mines`` mines at random positions.

    The board is a ``(board_size, board_size)`` object array: mined cells hold
    ``MINE``, every other cell holds 0. Cell (0, 0) is never mined
    (presumably so that the first click, which ``randomAction`` always puts on
    cell 0, is safe -- confirm with the author).

    Raises:
        ValueError: if ``num_mines`` exceeds the number of cells that may hold
            a mine (all cells except (0, 0)). The rejection loop could never
            finish in that case.
    """
    # Flat indices 1 .. board_size**2 - 1 are the cells allowed to hold a mine.
    last_index = board_size * board_size - 1
    if num_mines > last_index:
        raise ValueError(
            f'cannot place {num_mines} mines on a {board_size}x{board_size} '
            f'board: only {max(last_index, 0)} cells may hold a mine')

    board = np.zeros((board_size, board_size), dtype=object)
    mines_placed = 0
    while mines_placed < num_mines:
        # random.randint includes BOTH end points, so the upper bound must be
        # the last flat index; starting at 1 skips the reserved cell (0, 0).
        x, y = divmod(randint(1, last_index), board_size)
        if not is_mine(board, x, y):
            board[x][y] = MINE
            mines_placed += 1
    return board


class MinesweeperDiscreetEnv(gym.Env):
    """Minesweeper on a square board behind the (classic) gym ``Env`` API.

    Two boards are kept:

    * ``board``    -- hidden truth from ``place_mines``; mined cells hold ``MINE``.
    * ``my_board`` -- what the player sees, shape ``(board_size, board_size, 1)``.
      Each cell is ``CLOSED``, ``MINE`` (after stepping on one) or the number
      of neighbouring mines.

    An action is a flat cell index, ``action = x * board_size + y``.
    Rewards (see ``next_step``): -0.2 for re-opening an open cell, -1 for a
    mine, +1 for the winning move and +0.2 for any other new cell.
    """
    metadata = {"render.modes": ["ansi", "human"]}

    def __init__(self, board_size, num_mines):
        """Create the action/observation spaces and deal the first board."""
        self.board_size = board_size
        self.num_mines = num_mines
        # Shape of a single observation.
        self.bs = (self.board_size, self.board_size, 1)

        # The Box shape is the shape reset()/step() really return (note the
        # trailing axis). Builtin int replaces the removed np.int alias.
        # NOTE(review): the smallest value ever written to my_board is MINE
        # (-1); low=-2 is kept unchanged from the original.
        self.observation_space = spaces.Box(low=-2, high=9,
                                            shape=self.bs, dtype=int)
        self.action_space = spaces.Discrete(self.board_size * self.board_size)

        # Deals the mines and initialises my_board, num_actions, valid_actions.
        self.reset()

    def count_neighbour_mines(self, x, y):
        """Count mines in the 3x3 window centred on (x, y), clipped to the board.

        The centre cell itself is part of the window.
        """
        neighbour_mines = 0
        for _x in range(x - 1, x + 2):
            for _y in range(y - 1, y + 2):
                if is_valid(self.board_size, _x, _y):
                    if is_mine(self.board, _x, _y):
                        neighbour_mines += 1
        return neighbour_mines

    def open_neighbour_cells(self, my_board, x, y):
        """Open every still-closed cell around (x, y), recursing through zeros.

        ``my_board`` is modified in place and also returned.
        """
        for _x in range(x - 1, x + 2):
            for _y in range(y - 1, y + 2):
                if is_valid(self.board_size, _x, _y):
                    if is_new_move(my_board, _x, _y):
                        my_board[_x][_y] = self.count_neighbour_mines(_x, _y)
                        # A zero has no mined neighbour, so keep expanding.
                        if my_board[_x][_y] == 0:
                            my_board = self.open_neighbour_cells(my_board, _x, _y)
        return my_board

    def get_next_state(self, state, x, y):
        """Open cell (x, y) on ``state`` and report whether a mine was hit.

        ``state`` is modified in place and stored as ``self.my_board``.

        Returns:
            (board, game_over) -- ``game_over`` is True when (x, y) is a mine.
        """
        my_board = state
        game_over = False
        if is_mine(self.board, x, y):
            my_board[x][y] = MINE
            game_over = True
        else:
            my_board[x][y] = self.count_neighbour_mines(x, y)
            if my_board[x][y] == 0:
                my_board = self.open_neighbour_cells(my_board, x, y)
        self.my_board = my_board
        return my_board, game_over

    def randomAction(self, state):
        """Return the flat index of a closed cell of ``state``.

        Cell 0 is tried first; after that, actions are sampled from the action
        space until a closed cell is found.
        """
        action = 0
        while state[int(action / self.board_size)][action % self.board_size] != CLOSED:
            action = self.action_space.sample()
        return action

    def reset(self):
        """Start a new game and return the all-closed observation."""
        self.my_board = np.ones((self.board_size, self.board_size, 1), dtype=int) * CLOSED
        self.board = place_mines(self.board_size, self.num_mines)
        self.bs = (self.board_size, self.board_size, 1)
        self.num_actions = 0
        self.valid_actions = np.ones((self.board_size * self.board_size), dtype=bool)

        return self.my_board

    def step(self, action):
        """Open the cell addressed by the flat index ``action``.

        Returns:
            (observation, reward, done, info) -- ``info`` carries
            ``valid_actions`` (boolean mask of still-closed cells, flat) and
            ``num_actions`` (steps taken since the last reset).
        """
        # Work on a copy: the array returned by the previous reset()/step() is
        # typically still held by the caller (e.g. inside a replay buffer) and
        # must not change when this move is applied.
        state = np.array(self.my_board, copy=True)
        x = int(action / self.board_size)
        y = int(action % self.board_size)

        next_state, reward, done, info = self.next_step(state, x, y)
        self.my_board = next_state
        self.num_actions += 1
        self.valid_actions = (next_state.flatten() == CLOSED)
        info['valid_actions'] = self.valid_actions
        info['num_actions'] = self.num_actions
        return next_state, reward, done, info

    def next_step(self, state, x, y):
        """Apply a move to ``state`` and score it.

        Returns:
            (board, reward, done, info) with an empty ``info`` dict.
        """
        my_board = state
        if not is_new_move(my_board, x, y):
            # Re-clicking an open cell: small penalty, board unchanged.
            return my_board, -0.2, False, {}

        state, game_over = self.get_next_state(my_board, x, y)
        if game_over:
            return state, -1, True, {}
        if is_win(state, self.num_mines):
            return state, 1, True, {}
        return state, 0.2, False, {}

    def render(self, mode='human'):
        """Write ``stringify(self.my_board)`` to stdout or to a ``StringIO``.

        For any mode other than ``'human'`` the stream written to is returned.
        """
        outfile = StringIO() if mode == 'ansi' else sys.stdout
        s = stringify(self.my_board)
        outfile.write(s)
        if mode != 'human':
            return outfile

In [4]:
from keras.models import Sequential
from keras.layers import Conv2D, Dense, Flatten
from keras.optimizers import adam_v2

def create_dqn(learn_rate, input_dims, n_actions, conv_units, dense_units):
    """Build and compile the Q-network.

    Architecture: four 3x3 ``same``-padded ReLU convolutions with
    ``conv_units`` filters each, a flatten, two ReLU dense layers of
    ``dense_units`` units, and a linear output with one value per action.

    Args:
        learn_rate: initial learning rate for Adam.
        input_dims: shape of one input sample, passed as ``input_shape``.
        n_actions: size of the output layer.
        conv_units: number of filters in every convolution.
        dense_units: width of each hidden dense layer.

    Returns:
        The compiled ``Sequential`` model (Adam optimizer, MSE loss).
    """
    model = Sequential([
        Conv2D(conv_units, (3, 3), activation='relu', padding='same', input_shape=input_dims),
        Conv2D(conv_units, (3, 3), activation='relu', padding='same'),
        Conv2D(conv_units, (3, 3), activation='relu', padding='same'),
        Conv2D(conv_units, (3, 3), activation='relu', padding='same'),
        Flatten(),
        Dense(dense_units, activation='relu'),
        Dense(dense_units, activation='relu'),
        Dense(n_actions, activation='linear')])

    # `learning_rate` is the documented argument name; `lr` is only a
    # deprecated alias of it.
    model.compile(optimizer=adam_v2.Adam(learning_rate=learn_rate, epsilon=1e-4), loss='mse')

    return model

In [5]:
import os, sys, random

# Make the parent of the current working directory importable. It is inserted
# at position 1, i.e. right after the first sys.path entry.
ROOT = os.getcwd()
sys.path.insert(1, f'{os.path.dirname(ROOT)}')

# Silences every Python warning raised from here on, including deprecation
# warnings.
import warnings
warnings.filterwarnings('ignore')

from collections import deque

import tensorflow as tf
from keras.callbacks import TensorBoard

# Replay-buffer settings
MEM_SIZE = 50_000 # maximum number of transitions kept in the replay buffer (deque maxlen)
MEM_SIZE_MIN = 1_000 # training is skipped until the buffer holds at least this many transitions

# Learning settings
BATCH_SIZE = 64 # transitions sampled from the buffer per training call
learn_rate = 0.01 # initial learning rate
LEARN_DECAY = 0.99975 # the agent multiplies its learn_rate by this after every training call
LEARN_MIN = 0.001 # lower bound for the decayed learn_rate
DISCOUNT = 0.1 # gamma: weight of the best future Q-value in a training target

# Exploration settings
epsilon = 0.95 # initial probability of taking a random move
EPSILON_DECAY = 0.999975 # the agent multiplies its epsilon by this after every training call
EPSILON_MIN = 0.01 # lower bound for the decayed epsilon

# DQN settings
# NOTE(review): in the code visible here CONV_UNITS and DENSE_UNITS are only
# used to build MODEL_NAME; DQNAgent has its own defaults (conv_units=64,
# dense_units=256), so the name can disagree with the network actually built.
CONV_UNITS = 64 # number of filters in each conv layer
DENSE_UNITS = 512 # number of units in each fully connected dense layer
UPDATE_TARGET_EVERY = 5 # the target network is re-synced once more than this many finished episodes were counted

# Default model name (also used as the TensorBoard log sub-directory)
MODEL_NAME = f'conv{CONV_UNITS}x4_dense{DENSE_UNITS}x2_y{DISCOUNT}_minlr{LEARN_MIN}'

class ModifiedTensorBoard(TensorBoard):
    """TensorBoard callback that logs many ``fit()`` calls into one stream.

    The stock callback starts again at step 0 on every ``fit()``. This
    subclass owns a single file writer and a ``step`` counter that the caller
    advances, and turns the per-fit hooks into no-ops.
    """

    def __init__(self, **kwargs):
        """Set the initial step and open one writer for all ``fit()`` calls."""
        super().__init__(**kwargs)
        self.step = 1
        self.writer = tf.summary.create_file_writer(self.log_dir)
        # os.path.join keeps this correct on every platform (the separator
        # used to be a hard-coded backslash).
        self._train_dir = os.path.join(self.log_dir, 'train')
        self._train_step = 1

    def set_model(self, model):
        """No-op: stops the base class from creating its default log writer."""
        pass

    def on_epoch_end(self, epoch, logs=None):
        """Log the epoch metrics under ``self.step`` instead of the epoch index."""
        # `logs` defaults to None, and ** cannot unpack None.
        self.update_stats(**(logs or {}))

    def on_batch_end(self, batch, logs=None):
        """No-op: each fit is a single batch, nothing to record per batch."""
        pass

    def on_train_end(self, _):
        """No-op: keeps the shared writer open between ``fit()`` calls."""
        pass

    def update_stats(self, **stats):
        """Write each keyword argument as a scalar summary at ``self.step``."""
        with self.writer.as_default():
            for key, value in stats.items():
                tf.summary.scalar(key, value, step=self.step)
            # One flush for the whole group of scalars is enough.
            self.writer.flush()


class DQNAgent(object):
    """Epsilon-greedy DQN agent with a replay buffer and a target network.

    Attributes:
        model: online Q-network, trained on every ``train`` call.
        target_model: periodically synced copy used for bootstrap targets.
        replay_memory: deque of ``(state, action, reward, new_state, done)``.
        epsilon / learn_rate: current exploration rate and learning-rate
            bookkeeping, both decayed in ``train``.
    """

    def __init__(self, env, model_name=MODEL_NAME, conv_units=64, dense_units=256):
        """Build both networks for ``env`` and open the TensorBoard log.

        Args:
            env: environment exposing ``bs`` (input shape), ``board_size`` and
                ``randomAction``.
            model_name: name of the log sub-directory under ``logs``.
            conv_units: filters per convolution layer.
            dense_units: units per hidden dense layer.
        """
        self.env = env
        # Deep Q-learning Parameters
        self.discount = DISCOUNT
        self.learn_rate = learn_rate
        self.epsilon = epsilon
        self.model = create_dqn(
            self.learn_rate, self.env.bs, self.env.board_size ** 2, conv_units, dense_units)

        # target model - this is what we predict against every step
        self.target_model = create_dqn(
            self.learn_rate, self.env.bs, self.env.board_size ** 2, conv_units, dense_units)
        self.target_model.set_weights(self.model.get_weights())

        self.replay_memory = deque(maxlen=MEM_SIZE)
        self.target_update_counter = 0

        # os.path.join instead of a hard-coded backslash, which off Windows
        # would end up inside the directory name.
        self.tensorboard = ModifiedTensorBoard(
            log_dir=os.path.join('logs', model_name), profile_batch=0)

    def get_action(self, state):
        """Pick a flat cell index: random with probability epsilon, else greedy."""
        if np.random.random() < self.epsilon:  # explore
            return self.env.randomAction(state)

        size = self.env.board_size
        board = state.reshape(1, size ** 2)
        moves = self.model.predict(np.reshape(state, (1, size, size, 1)))
        # Opened cells must never win the argmax. Masking them with the
        # minimum Q-value is not enough when closed cells tie with that
        # minimum, so use -inf.
        moves[board != CLOSED] = -np.inf
        return np.argmax(moves)

    def update_replay_memory(self, transition):
        """Append one ``(state, action, reward, new_state, done)`` tuple."""
        self.replay_memory.append(transition)

    def train(self, done):
        """Fit the online network on one random minibatch of stored transitions.

        Does nothing until the buffer holds ``MEM_SIZE_MIN`` transitions.

        Args:
            done: whether the episode currently being played has just ended.
                Controls TensorBoard logging of this fit and the target-network
                sync counter.
        """
        if len(self.replay_memory) < MEM_SIZE_MIN:
            return

        batch = random.sample(self.replay_memory, BATCH_SIZE)

        current_states = np.array([transition[0] for transition in batch])
        current_qs_list = self.model.predict(current_states)

        new_current_states = np.array([transition[3] for transition in batch])
        future_qs_list = self.target_model.predict(new_current_states)

        X, y = [], []

        # The per-transition flag is called `terminal` so that it cannot
        # overwrite the `done` argument, which is still needed below.
        for i, (current_state, action, reward, _new_state, terminal) in enumerate(batch):
            if terminal:
                new_q = reward
            else:
                new_q = reward + self.discount * np.max(future_qs_list[i])

            # Only the Q-value of the action actually taken gets a new target.
            current_qs = current_qs_list[i]
            current_qs[action] = new_q

            X.append(current_state)
            y.append(current_qs)

        self.model.fit(np.array(X), np.array(y), batch_size=BATCH_SIZE,
                       shuffle=False, verbose=0,
                       callbacks=[self.tensorboard] if done else None)

        # count finished episodes to decide when to sync the target network
        if done:
            self.target_update_counter += 1

        if self.target_update_counter > UPDATE_TARGET_EVERY:
            self.target_model.set_weights(self.model.get_weights())
            self.target_update_counter = 0

        # decay learn_rate
        # NOTE(review): nothing in this class writes the decayed value back
        # into the compiled optimizer; it is only tracked on the agent.
        self.learn_rate = max(LEARN_MIN, self.learn_rate * LEARN_DECAY)

        # decay epsilon
        self.epsilon = max(EPSILON_MIN, self.epsilon * EPSILON_DECAY)

# Smoke test: constructing an agent on a small board builds the environment
# and both Q-networks. The instance is not kept.
if __name__ == "__main__":
    DQNAgent(MinesweeperDiscreetEnv(board_size=4, num_mines=3))


In [None]:
import argparse, pickle
from tqdm.notebook import tqdm
from keras.models import load_model

# NOTE(review): tensorflow is already imported by an earlier cell; confirm
# that setting this variable afterwards still reduces its log output.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

AGG_STATS_EVERY = 100 # report aggregated stats to tensorboard every 100 episodes
SAVE_MODEL_EVERY = 10_000 # save model and replay buffer every 10,000 episodes

# Board used for the training run in main().
BOARD_SIZE = 4
NUM_MINES = 3

def main():
    """Train a DQN agent on Minesweeper for 100000 episodes.

    Every ``AGG_STATS_EVERY`` episodes (once the replay buffer has reached
    ``MEM_SIZE_MIN``) running averages are printed and sent to TensorBoard.
    Every ``SAVE_MODEL_EVERY`` episodes the replay buffer is pickled to
    ``replay/model.pkl`` and the model is saved to ``models/model.h5``.
    """
    env = MinesweeperDiscreetEnv(board_size=BOARD_SIZE, num_mines=NUM_MINES)
    agent = DQNAgent(env)

    ep_rewards = []
    n_wins, n_games, score = 0, 0, 0
    safe_cells = BOARD_SIZE * BOARD_SIZE - NUM_MINES

    for episode in tqdm(range(100000)):
        agent.tensorboard.step = episode

        # Observations are snapshotted before they are stored: the environment
        # may keep updating the board it hands out, and a replay transition
        # must keep the board exactly as it was at that moment.
        current_state = np.array(env.reset(), copy=True)
        episode_reward = 0
        done = False

        while not done:
            action = agent.get_action(current_state)
            new_state, reward, done, _ = env.step(action)
            new_state = np.array(new_state, copy=True)
            episode_reward += reward

            agent.update_replay_memory((current_state, action, reward, new_state, done))
            agent.train(done)

            current_state = new_state

            if done:
                # Progress = share of the mine-free cells that ended up open.
                opened = np.count_nonzero((current_state != CLOSED) & (current_state != MINE))
                score += opened / safe_cells
                if reward == 1:
                    n_wins += 1

        n_games += 1
        ep_rewards.append(episode_reward)

        # No stats or checkpoints while the buffer is still warming up.
        if len(agent.replay_memory) < MEM_SIZE_MIN:
            continue

        if not episode % AGG_STATS_EVERY:
            med_progress = round(score / (episode + 1), 4)
            win_rate = round(n_wins / n_games, 4)
            med_reward = round(sum(ep_rewards) / (episode + 1), 4)

            agent.tensorboard.update_stats(
                progress_med=med_progress,
                winrate=win_rate,
                reward_med=med_reward,
                learn_rate=agent.learn_rate,
                epsilon=agent.epsilon)

            print(f'Episode: {episode}, Mean progress: {med_progress}, Mean reward: {med_reward}, Win rate : {win_rate}, Epsilon: {agent.epsilon}')

        if not episode % SAVE_MODEL_EVERY:
            # The first checkpoint is only reached after many episodes; make
            # sure a missing output directory cannot abort the run there.
            os.makedirs('replay', exist_ok=True)
            os.makedirs('models', exist_ok=True)

            with open('replay/model.pkl', 'wb') as output:
                pickle.dump(agent.replay_memory, output)

            agent.model.save('models/model.h5')

# Start the training run when this cell is executed as the main module.
if __name__ == "__main__":
    main()

  0%|          | 0/100000 [00:00<?, ?it/s]

Episode: 300, Mean progress: 0.5832, Mean reward: -0.3362, Win rate : 0.0664, Epsilon: 0.9476042459652978
Episode: 400, Mean progress: 0.5908, Mean reward: -0.3052, Win rate : 0.0748, Epsilon: 0.9383160475147614
Episode: 500, Mean progress: 0.5802, Mean reward: -0.321, Win rate : 0.0699, Epsilon: 0.9299554837141395
Episode: 600, Mean progress: 0.5851, Mean reward: -0.3201, Win rate : 0.0682, Epsilon: 0.9211165713426723
Episode: 700, Mean progress: 0.573, Mean reward: -0.3258, Win rate : 0.0685, Epsilon: 0.9130918603446204
Episode: 800, Mean progress: 0.5682, Mean reward: -0.3323, Win rate : 0.0649, Epsilon: 0.9046619819597959
Episode: 900, Mean progress: 0.5701, Mean reward: -0.333, Win rate : 0.0633, Epsilon: 0.8960858779154546
Episode: 1000, Mean progress: 0.5683, Mean reward: -0.3269, Win rate : 0.0639, Epsilon: 0.8873026506365144
Episode: 1100, Mean progress: 0.57, Mean reward: -0.3224, Win rate : 0.0627, Epsilon: 0.8782321818557914
Episode: 1200, Mean progress: 0.5715, Mean reward