In [1]:
import PySimpleGUI as sg
import numpy as np
import os
from time import gmtime, strftime
import imageio
import shutil
import matplotlib.pyplot as plt
import cv2
from keras.utils import to_categorical
from keras.models import Model
from keras.layers import Input, Dense, Conv2D, Flatten, concatenate, Add
from keras.optimizers import Adam

In [2]:
pictures = ['open.png', '1.png', '2.png', '3.png', '4.png', '5.png', '6.png',
            '7.png', '8.png', 'close.png', 'mine.png', 'open_mine.png']

In [3]:
path = f'E://NN/minesweeper/field_images/'
checkpoint_dir = f'E://NN/minesweeper/checkpoint/'

In [4]:
lose_reward = -2.5
win_reward = 2.5
yolo_reward = 0.1
rep_point_reward = -0.5
open_point_reward = 2.2

In [5]:
class Minesweeper(object):

  '''
  Class of minesweeper game
  Input parameters:
    - mines_count - number of mines on playfield
    - playfield_size - length of square playfield side
  '''

  def __init__(self, mines_count, playfield_h, playfield_w):
    self.mines_count = mines_count
    self.playfield_h = playfield_h
    self.playfield_w = playfield_w
    # Actions for locate all points around current point (clockwise)
    self.actions = [(-1,-1), (-1,0), (-1,1), (0,1), (1,1), (1,0), (1,-1), (0,-1)]

    self.lose_reward = lose_reward
    self.win_reward = win_reward
    self.yolo_reward = yolo_reward
    self.rep_point_reward = rep_point_reward
    self.open_point_reward = open_point_reward
    self.free_point = set()
    self.history = []

  # Create fake playfield
  def _create_playground(self):
    self.playground = np.zeros((self.playfield_h+2,self. playfield_w+2))
    return self.playground

  # Create filed for mines placement
  def _create_minesfield(self):
    self.minesfield = np.full((self.playfield_h, self.playfield_w), 9)
    return self.minesfield


  # Create mines in random places exept first_step_coord
  def _place_mines(self, first_step_coord):

    first_point_surround = []
    for act in self.actions:
      x1 = first_step_coord[0] + act[0]
      y1 = first_step_coord[1] + act[1]
      first_point_surround.append((x1, y1))

    self.mines_coord = []
    while len(self.mines_coord) < self.mines_count:
      x = np.random.randint(0, self.playfield_h)
      y = np.random.randint(0, self.playfield_w)

      if ((x, y) != first_step_coord) and ((x, y) not in self.mines_coord) and ((x, y) not in first_point_surround):
        self.mines_coord.append((x, y))

    return first_step_coord

  # Create stealth playfield with mines around count
  def _mines_number(self):
    mines_round =[]
    for mine in self.mines_coord:
      mines_round.append((mine[0]+1, mine[1]+1))

    field = np.zeros_like(self.playground)

    for i in range(1, self.playfield_h+1):
      for j in range(1, self.playfield_w+1):
        if (i, j) in mines_round:
          continue
        
        field_round = [(i-1,j-1), (i-1,j), (i-1,j+1),
                       (i,j-1), (i,j), (i,j+1),
                       (i+1,j-1), (i+1,j), (i+1,j+1)]
        mines_count = 0
        for point in field_round:
          mines_count += self.playground[point]
        field[i,j] = mines_count

    return field[1:-1, 1:-1]

  # Create fake playfield where stored info abount point surrunding
  def _create_playfield(self):
    for mine in self.mines_coord:
      x = mine[0] + 1
      y = mine[1] + 1
      self.playground[(x,y)] = 1

    self.fake_playfield = self._mines_number()

  # Check if step has reason (step in near field of open points)
  def _yolo_move(self, action):
    # action - (x,y) coordinate of current step

    for act in self.actions:
      x1 = action[0] + act[0]
      y1 = action[1] + act[1]
      if (x1, y1) in self.free_point:
        return False
    
    return True

  # Open all connected zeros in playfield (for game acceleration)
  def open_zeros(self):
    
    done = True
    scale_reward = 0
    while done:
      k = 0
      for point in self.free_point.copy():
        if self.minesfield[point] == 0:
          
          for act in self.actions:
            x = point[0] + act[0]
            y = point[1] + act[1]
            if x in range(self.playfield_h) and y in range(self.playfield_w):
              if self.minesfield[(x,y)] == 9:
                k = 1
                scale_reward += 1
                self.free_point.add((x,y))
                self.minesfield[(x,y)] = self.fake_playfield[(x,y)]
      if k == 0:
        return scale_reward


  # Initialize field for game
  def initialize_game(self, first_step_coord):

    self._create_playground()
    self._place_mines(first_step_coord)
    self.free_point.add(first_step_coord)
    
    self._create_playfield()
    self._create_minesfield()
    self.minesfield[first_step_coord] = self.fake_playfield[first_step_coord]
    _ = self.open_zeros()
    self.history.append(self.minesfield.copy())


  # Open minesfield point or die 
  def step(self, action):
    # action - (x,y) coordinate of current step

    x = action[0]
    y = action[1]

    # Check if current step coordinates in mines_coord
    if (x,y) in self.mines_coord:
      self.free_point.add((x,y))
      self.minesfield[(x,y)] = 11
      self.mines_coord = set(self.mines_coord) - set([(x,y)])
      for mine_coord in self.mines_coord:
        self.minesfield[(mine_coord[0], mine_coord[1])] = 10
      reward = self.lose_reward
      done = True

    # Check if current step coordinates is already done 
    elif (x,y) in self.free_point:
      reward = self.rep_point_reward
      done = False
      
    else:
      # Add point in set of free points
      self.free_point.add((x,y))
      # Check if current step coordinates is yolo move
      if self._yolo_move((x,y)):
        self.minesfield[(x,y)] = self.fake_playfield[(x,y)]
        reward = self.yolo_reward
        done = False
      else:
        # Check if current step coordinates is last free point (win)
        if len(self.free_point) == int(self.playfield_h*self.playfield_w) - self.mines_count:
          self.minesfield[(x,y)] = self.fake_playfield[(x,y)]
          reward = self.win_reward
          done = True
        else:
        # Check if current step coordinates is not last free point
          self.minesfield[(x,y)] = self.fake_playfield[(x,y)]
          reward = self.open_point_reward
          done = False
    
    _ = self.open_zeros()
    self.history.append(self.minesfield.copy())

    return self.minesfield, reward, done

In [6]:
class Agent(object):

  '''
  Class of agent for play minesweeper
  Input parameters:
    - dicision_field - Size of suqare side for dicision field
    - epsilon - start value of probability of random step selection
  '''

  def __init__(self, dicision_field):

    self.load_model = True # Always loading model if it is exist

    self.checkpoint_dir = checkpoint_dir # Path where model is saved

    self.dicision_field = dicision_field # Size of suqare side for dicision field
    self.num_actions = int(self.dicision_field * self.dicision_field) # Number of possible actions
    self.state_size = (self.dicision_field, self.dicision_field, 10)

    self.model = self.build_model()

    if self.load_model:
      if os.path.exists(os.path.join(self.checkpoint_dir, 'curr_ckpt.h5')) is True:
        self.model.load_weights(os.path.join(self.checkpoint_dir, 'curr_ckpt.h5'))

  def build_model(self):

    input = Input(shape=(self.state_size))

    x1 = Conv2D(32, (5,5), padding='same', activation='tanh')(input)
    x2 = Conv2D(32, (3,3), padding='same', activation='tanh')(input)

    x3 = Conv2D(32, (5,5), padding='same', activation='relu')(input)
    x4 = Conv2D(32, (3,3), padding='same', activation='relu')(input)

    xx = Add()([x1, x2])
    xy = concatenate([xx, x3, x4])
    x = Conv2D(64, (3,3), padding='same', activation='relu')(xy)
    output = Conv2D(1, (3,3), padding='same', activation='linear')(x)

    model = Model(input, output)
    model.compile(optimizer=Adam(lr=0.5e-4),
                  loss='mse')
    
    return model

  # Select action
  def get_action(self, history):
    q_value = self.model.predict(np.expand_dims(history, axis=0))
    return q_value

In [7]:
def save_result(history, player):

  '''
  Функция отображения игрового поля
  Входные параметры:
   - play - игровое поле
   - free_points - координаты точек, куда сходили
   - mines_coord - координаты мин
  '''

  # Цвета для цифр
  text_color = ['black','blue', 'green','red', 'red', 'red', 'red', 'red', 'red']
  # Цвета для клеток поля
  cell_color = [[100, 100, 100], [150, 150, 150], [0, 0, 0]]
  # Размер клетки в пикселях
  step = 20

  # Поле для отображения резльтатов хода
  numpy_grid = np.zeros((history.shape[0]*step, history.shape[1]*step, 3))

  # Разметра цветов клетов в зависимости от открытости
  for x in range(history.shape[0]):
    for y in range(history.shape[1]):
      if history[x,y] != 9:
        numpy_grid[x*step:x*step+step,y*step:y*step+step] = cell_color[1]
      else:
        numpy_grid[x*step:x*step+step,y*step:y*step+step] = cell_color[0]

  # Рисуем черные линии
  for x in range(0, numpy_grid.shape[0], step):
    for y in range(0, numpy_grid.shape[1], step):
      numpy_grid[x:x+1,y:y+step] = cell_color[2]
      numpy_grid[x+step-1:x+step,y:y+step] = cell_color[2]
      numpy_grid[x:x+step,y:y+1] = cell_color[2]
      numpy_grid[x:x+step,y+step-1:y+step] = cell_color[2]
  
  fig = plt.figure(figsize=(history.shape[1]//2, history.shape[0]//2))
  plt.imshow(numpy_grid / 255)
  plt.axis('off')

  # Расcтавляем цифры
  for x, xn in enumerate(range(step//2,numpy_grid.shape[0], step)):
    for y, yn in enumerate(range(step//2, numpy_grid.shape[1], step)):
      if history[x,y] != 9:
        if history[x,y] in [10, 11]:
          label = '*'
          if history[x,y] == 11:
            plt.text(yn-1, xn+4, label, color='red', ha='center', va='center', fontsize=25)
          else:
            plt.text(yn-1, xn+4, label, color='black', ha='center', va='center', fontsize=25)
        elif history[x,y] < 9 and history[x,y] > 0:
          label = history[x, y]
          plt.text(yn, xn, label, color=text_color[label], ha='center', va='center', fontsize=23)
  
  if player:
    plt.title('Player', fontdict={'fontsize':20})
  else:
    plt.title('Neural network', fontdict={'fontsize':20})

#   data = np.asarray(fig)
#   print(data.shape)
    
  fig.canvas.draw()

  # Now we can save it to a numpy array.
  data = np.fromstring(fig.canvas.tostring_rgb(), dtype=np.uint8, sep='')
  data = data.reshape(fig.canvas.get_width_height()[::-1] + (3,))
    
  plt.close()
    
  return data
#   plt.savefig(os.path.join(result_dir, str(i) + '.jpg'))
#   plt.show()

In [8]:
def create_fields(history, pad_horizontal, pad_vertical):

    field_1 = np.concatenate((history, pad_horizontal), axis=1)
    field_1 = np.concatenate((field_1, pad_vertical), axis=0)

    field_2 = np.concatenate((pad_horizontal, history), axis=1)
    field_2 = np.concatenate((field_2, pad_vertical), axis=0)

    field_3 = np.concatenate((history, pad_horizontal), axis=1)
    field_3 = np.concatenate((pad_vertical, field_3), axis=0)

    field_4 = np.concatenate((pad_horizontal, history), axis=1)
    field_4 = np.concatenate((pad_vertical, field_4), axis=0)

    return [field_1, field_2, field_3, field_4]

In [9]:
def get_predict(agent, history):
    
    playfield_h = history.shape[0]
    playfield_w = history.shape[1]
    dicision_field = agent.dicision_field
    
    pad_horizontal = np.zeros((playfield_h, dicision_field - playfield_w, 10))
    pad_vertical = np.zeros((dicision_field - playfield_h, dicision_field, 10))
    
    pad_horizontal[:,:,9] = 1
    pad_vertical[:,:,9] = 1
    
    fields_for_predict = create_fields(history, pad_horizontal, pad_vertical)
    
    q_value = np.zeros((playfield_h, playfield_w))

    tmp_q_value = agent.get_action(fields_for_predict[0])
    q_value += tmp_q_value.squeeze()[0:playfield_h,0:playfield_w]

    tmp_q_value = agent.get_action(fields_for_predict[1])
    q_value += tmp_q_value.squeeze()[0:playfield_h,-playfield_w:]

    tmp_q_value = agent.get_action(fields_for_predict[2])
    q_value += tmp_q_value.squeeze()[-playfield_h:,0:playfield_w]

    tmp_q_value = agent.get_action(fields_for_predict[3])
    q_value += tmp_q_value.squeeze()[-playfield_h:,-playfield_w:]
    
    action = np.argmax(q_value)
    
    x = int(action // playfield_w)
    y = int(action - x * playfield_w)
    
    return (x, y)

In [10]:
def create_grid(height, width, start):

    grid_layout = []
    for r in range(height):
        row = []
        for c in range(width):
            t = sg.Button(size=(2,1), pad=(0,0), image_filename=os.path.join(path, 'close.png'), key=str(start+width*r+c))
            # add to tile array
            row.append(t)
        grid_layout.append(row)
    grid = sg.Column(grid_layout)

    return grid

In [11]:
def create_layout(mines_count, playfield_h, playfield_w, grid_for_player, grid_for_nn):
    
    layout = [  [sg.Text('Field height'), sg.InputText(f'{playfield_h}', key='playfield_h', size=(5,1)),
                 sg.Text('Field width'), sg.InputText(f'{playfield_w}', key='playfield_w', size=(5,1)),
                 sg.Text('Mines count'), sg.InputText(f'{mines_count}', key='mines_count', size=(5,1))],
                [sg.HorizontalSeparator()],
                [sg.Column([[sg.Text('Player', key='pl_field', size=(25,1))], [grid_for_player]]),
                 sg.HorizontalSeparator(),
                 sg.Column([[sg.Text('Neural network', key='nn_field', size=(25,1))], [grid_for_nn]])],
                [sg.Button(button_text='Generate grid', key='grid'),
                 sg.Button(button_text='Save results', key='save'),
                 sg.Checkbox('Save player results', default=True, key='save_pl'),
                 sg.Checkbox('Save NNet results', default=True, key='save_nn'),
                 sg.Cancel('Exit')],
                [sg.Text('* Results folder - minesweeper_directory/results/')],
                [sg.Text('** If You losed before neural net, just press any buttom on your field until nn win or lose')]
             ]
    
    return layout

In [14]:
sg.theme('DarkAmber')   # Add a little color to your windows

layout = [  [sg.Text('Select minesweeper folder')],
            [sg.InputText('Minesweeper folder', key='module_dir'), sg.FolderBrowse()],
            [sg.Button(button_text='OK', key='OK')]
         ]

window = sg.Window('Minesweeper', layout)

while True:
    event, values = window.read()
    if event == 'OK':
        module_dir = values['module_dir']
        if os.path.exists(os.path.join(module_dir, 'checkpoint')):
            window.close()
            break
        else:
            window['module_dir'].update('Choose correct minesweeper folder')

# All the stuff inside your window. This is the PSG magic code compactor...
layout = [  [sg.Text('Field height'), sg.InputText('8', key='playfield_h', size=(5,1)),
             sg.Text('Field width'), sg.InputText('8', key='playfield_w', size=(5,1)),
             sg.Text('Mines count'), sg.InputText('10', key='mines_count', size=(5,1))],
            [sg.HorizontalSeparator()],
            [sg.Button(button_text='Generate grid', key='grid'), sg.Cancel('Exit')]]

# Create the Window
window = sg.Window('Minesweeper', layout)
# Event Loop to process "events"
path = os.path.join(module_dir, 'field_images')
checkpoint_dir = os.path.join(module_dir, 'checkpoint')
images_dir = os.path.join(module_dir, 'images')
result_dir = os.path.join(module_dir, 'results')

agent = Agent(dicision_field=35)

while True:             
    event, values = window.read()
    if event in (sg.WIN_CLOSED, 'Exit'):
        break
    if event in ('grid'):
        window.close()
        
        nn_lose = False
        nn_win = False
        player_lose = False
        player_win = False
        
        playfield_h = int(values['playfield_h'])
        playfield_w = int(values['playfield_w'])
        mines_count = int(values['mines_count'])
        field_indices = [str(i) for i in range(playfield_h*playfield_w)]
        minesweeper_for_player = Minesweeper(mines_count, playfield_h, playfield_w)
        minesweeper_for_nn = Minesweeper(mines_count, playfield_h, playfield_w)
        grid_for_player = create_grid(playfield_h, playfield_w, 0)
        grid_for_nn = create_grid(playfield_h, playfield_w, playfield_w*playfield_h)
        layout = create_layout(mines_count, playfield_h, playfield_w, grid_for_player, grid_for_nn)
        window = sg.Window('Minesweeper', layout)
    
    if event in field_indices:
        
        action = int(event)
        x = int(action // playfield_w)
        y = int(action - x * playfield_w)

        if len(minesweeper_for_player.free_point) > 0:
            
            if nn_win or nn_lose:
                new_field_nn = minesweeper_for_nn.minesfield.flatten()
            else:
                x_nn, y_nn = get_predict(agent, history)

                _, reward_nn, done_nn = minesweeper_for_nn.step((x_nn,y_nn))
                if reward_nn == win_reward:
                    window['nn_field'].Update('Neural network --- WIN')
                    nn_win = True
                elif reward_nn == lose_reward:
                    window['nn_field'].Update('Neural network --- LOSE')
                    nn_lose = True
                else:
                    history = to_categorical(minesweeper_for_nn.minesfield, 10)
                    
                new_field_nn = minesweeper_for_nn.minesfield.flatten()
            
            if player_win or player_lose:
                new_field_player = minesweeper_for_player.minesfield.flatten()
            else:
                _, reward, done = minesweeper_for_player.step((x,y))
                if reward == win_reward:
                    window['pl_field'].Update('Player --- WIN')
                    player_win = True
                elif reward == lose_reward:
                    window['pl_field'].Update('Player --- LOSE')
                    player_lose = True
                
                new_field_player = minesweeper_for_player.minesfield.flatten()
            
        else:
            minesweeper_for_player.initialize_game((x,y))
            new_field_player = minesweeper_for_player.minesfield.flatten()
            
            minesweeper_for_nn.free_point = minesweeper_for_player.free_point.copy()
            minesweeper_for_nn.minesfield = minesweeper_for_player.minesfield.copy()
            minesweeper_for_nn.mines_coord = minesweeper_for_player.mines_coord.copy()
            minesweeper_for_nn.fake_playfield = minesweeper_for_player.fake_playfield.copy()

            history = to_categorical(minesweeper_for_nn.minesfield, 10)
            new_field_nn = minesweeper_for_nn.minesfield.flatten()
            

        for i, ind in enumerate(field_indices):
            window[ind].update(image_filename=os.path.join(path, pictures[new_field_player[i]]))
            
        for i, ind in enumerate(field_indices, start=playfield_w*playfield_h):
            window[str(i)].update(image_filename=os.path.join(path, pictures[new_field_nn[int(ind)]]))

            
    if event == 'save':
        
        save_player = values['save_pl']
        save_nn = values['save_nn']
        nn_history_len = len(minesweeper_for_nn.history)
        player_history_len = len(minesweeper_for_player.history)
#         print(nn_history_len, player_history_len)

        if os.path.exists(images_dir) is True:
            shutil.rmtree(images_dir)
        os.mkdir(images_dir)
        
        if nn_history_len < player_history_len:
            frame_to_extend = minesweeper_for_nn.history[-1].copy()
            frames_to_add = player_history_len - nn_history_len
            minesweeper_for_nn.history.extend([frame_to_extend]*frames_to_add)
        elif nn_history_len > player_history_len:
#             print(player_history_len)
            frame_to_extend = minesweeper_for_player.history[-1].copy()
            frames_to_add = nn_history_len - player_history_len
            minesweeper_for_player.history.extend([frame_to_extend]*frames_to_add)
#             print(len(minesweeper_for_player.history))
        
        for i in range(len(minesweeper_for_player.history)):
            data_player = save_result(minesweeper_for_player.history[i], True)
            data_nn = save_result(minesweeper_for_nn.history[i], False)
            if save_player and save_nn:
                data = np.concatenate((data_player, data_nn), axis=1)
            elif save_player:
                data = data_player
            elif save_nn:
                data = data_nn
            else:
                continue

            cv2.imwrite(os.path.join(images_dir, str(i) + '.jpg'), data[:,:,::-1])    
        
        if len(os.listdir(images_dir)) > 0:
            str_time = strftime('%d-%m-%Y_%H-%M-%S', gmtime())
            images = [imageio.imread(os.path.join(images_dir, str(id) + '.jpg')) for id in range(len(os.listdir(images_dir)))]
            imageio.mimsave(os.path.join(result_dir, f'mw_{playfield_h}x{playfield_w}x{mines_count}_{str_time}.gif'), images)
        
window.close()

  data = np.fromstring(fig.canvas.tostring_rgb(), dtype=np.uint8, sep='')
