In [1]:
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.init as torch_init
import torch.optim as optim

from Minesweeper import *
from Minesweeper import _MINE_
from Minesweeper import Minesweeper as mGame

In [2]:
# If there are GPUs, choose the first one for computing. Otherwise use CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)  
# If 'cuda:0' is printed, it means GPU is available.

cuda:0


In [3]:
class Net(nn.Module):
    def __init__(self):
        # Initialization.
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(11, 64, 3, stride=1, padding=1)
        #self.conv1_normed = nn.BatchNorm2d(12)
        #torch_init.xavier_normal_(self.conv1.weight)
        self.conv2 = nn.Conv2d(64, 64, 3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(64, 64, 3, stride=1, padding=1)
        self.conv4 = nn.Conv2d(64, 64, 3, stride=1, padding=1)
        self.conv5 = nn.Conv2d(64, 1, 1, stride=1, padding=0)
    
    def forward(self, x, revealed):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = torch.sigmoid(self.conv5(x))
        #print('------forward print()------')
        #print(x.shape)
        #print(revealed.shape)
        x = x * revealed
        return x
    
net = Net()     # Create the network instance.
net.to(device)  # Move the network parameters to the specified device.

Net(
  (conv1): Conv2d(11, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv4): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv5): Conv2d(64, 1, kernel_size=(1, 1), stride=(1, 1))
)

In [4]:
# We use cross-entropy as loss function.
loss_func = nn.BCELoss()  
# We use stochastic gradient descent (SGD) as optimizer.
#opt = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)
opt = optim.Adam(net.parameters())

In [5]:
class MineSweeperDataset(torch.utils.data.Dataset):
    """Minesweeper dataset."""

    def __init__(self, inputs, labels):
        """
        Args:
            inputs (n, 11, dim1, dim2): input minesweeper channels
            masks  (n, 1,  dim1, dim2): binary revealed masks
            labels (n, 1,  dim1, dim2): binary mine map
        """
        self.inputs = inputs
        self.masks = inputs[:, [0], :, :]
        self.masks = torch.where(self.masks == 0, torch.tensor(1), torch.tensor(0))
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return (self.inputs[idx], self.masks[idx], self.labels[idx])

In [6]:
def getInputsFromGame(mGame):
    state = torch.tensor(mGame.state)
    display_board = torch.tensor(mGame.display_board)
    
    inputs = torch.zeros((11, mGame.height, mGame.width))
    
    # channel 0: binary revealed map
    inputs[0] = torch.where(state == HIDDEN, torch.tensor(0), torch.tensor(1))
    
    # channel 1: for zero padding detecting game board edge
    inputs[1] = torch.ones((mGame.height, mGame.width))
    
    # channel 2-10: numeric one-hot encoding
    for i in range(9):
        inputs[i+2] = torch.where(display_board == i, torch.tensor(1), torch.tensor(0))
    
    return inputs

In [7]:
def fit(x, y, batch_size, epochs):
    avg_losses = []   # Avg. losses.
    print_freq = 10  # Print frequency.
    
    trainset = MineSweeperDataset(x, y)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True)

    for epoch in range(epochs):  # Loop over the dataset multiple times.
        running_loss = 0.0       # Initialize running loss.
        
        for i, data in enumerate(trainloader, 0):
            # Move the inputs to the specified device.
            inputs, masks, labels = data
            inputs, masks, labels = inputs.to(device), masks.to(device), labels.to(device)

            # Zero the parameter gradients.
            opt.zero_grad()

            #print('------fit print------')
            #print(inputs.shape)
            #print(masks.shape)
            #print(labels.shape)
            
            # Forward step.
            outputs = net(inputs, masks)
            loss = loss_func(outputs, labels.detach())

            # Backward step.
            loss.backward()

            # Optimization step (update the parameters).
            opt.step()

            # Print statistics.
            avg_losses.append(loss.item())
#             running_loss += loss.item()
#             if i % print_freq == print_freq - 1: # Print every several mini-batches.
#                 avg_loss = running_loss / print_freq
#                 print('[epoch: {}, i: {:5d}] avg mini-batch loss: {:.3f}'.format(
#                     epoch, i, avg_loss))
#                 avg_losses.append(avg_loss)
#                 running_loss = 0.0

#     plt.plot(avg_losses)
#     plt.xlabel('mini-batch index / {}'.format(print_freq))
#     plt.ylabel('avg. mini-batch loss')
#     plt.show()
    print(avg_losses)
    return torch.mean(torch.tensor(avg_losses))          


In [8]:
def trainMineAI(nBatches, nSamples, nEpochsPerBatch, nMiniBatchSize, difficulty):
    """
    Args:
        nBatches: number of batches to train
        nSamples: number of games per batch -> fit(batch_size)
        nEpochsPerBatch: training epochs per batch
        nRows: minesweeper game board length  (# of rows)
        nCols: minesweeper game board width   (# of cols)
    """
    nRows = difficulty['height']
    nCols = difficulty['width']
    
    x = torch.zeros((nSamples, 11, nRows, nCols))  # 11 channels: 1 for if has been revealed, 1 for is-on-board, 1 for each number
    masks = torch.zeros((nSamples, 1, nRows, nCols))
    y = torch.zeros((nSamples, 1, nRows, nCols))
    
    batch_losses = []
    
    print_freq = 100  # Print frequency.
    
    for i in range(nBatches):
        solved_3bv = 0
        gamesPlayed = 0
        gamesWon = 0
        for samplesTaken in range(nSamples): 
            
            # initiate game, first click in center
            game = mGame(difficulty, (int(nRows / 2), int(nCols / 2)))
            
            while not (game.is_finished or samplesTaken == nSamples):
               
                # get data input from current game board
                curr_inputs = getInputsFromGame(game).to(device)
                x[samplesTaken] = curr_inputs
                mask = torch.where(x[samplesTaken][0] == 0, torch.tensor(1), torch.tensor(0))
                         
                # make probability predictions
                #print(curr_inputs.shape)
                #print(mask.shape)
                #print(curr_inputs.unsqueeze(0).shape)
                #print(mask.unsqueeze(0).shape)
                out = net(curr_inputs.unsqueeze(0).to(device), mask.unsqueeze(0).unsqueeze(0).to(device))
                
                # choose best remaining cell
                selected = torch.argmin(out[0][0]+curr_inputs[0]) #add Xnow[0] so that already selected cells aren't chosen
                selected_row = int(selected / nCols)
                selected_col = selected % nCols
                game.click(selected_row, selected_col)
                
                # find truth
                truth = out
                truth[0, 0, selected_row, selected_col] = 1 if game.display_board[selected_row][selected_col] == _MINE_ else 0
                y[samplesTaken] = truth[0]
            
            if game.is_finished:
                gamesPlayed += 1
                solved_3bv += game.get_current_3bv() / game.get_3bv()
                if game.result:
                    gamesWon += 1
                    
            if samplesTaken % print_freq == print_freq - 1: # Print every several mini-batches.
                print('Samples taken: {} / {}'.format(samplesTaken, nSamples))
    
        if gamesPlayed > 0:
            mean3BVSolved = float(solved_3bv) / gamesPlayed
            propGamesWon = float(gamesWon) / gamesPlayed
        print('Games played in batch {}: {} '.format(i, gamesPlayed))
        print('Mean 3BV solved percent in batch {}: {}%'.format(i, mean3BVSolved * 100))
        print('Proportion of games won in batch {}: {}%'.format(i, propGamesWon * 100))
        
        # train
        batch_loss = fit(x, y, nSamples, nEpochsPerBatch)
        batch_losses.append(batch_loss)
        print('Finished batch number {}/{} Training. Batch loss: {}'.format(i, nBatches, batch_loss))

        # save model every 100 batch
        if (i+1) % 100 == 0:
            torch.save({
                'model_state_dict': net.state_dict(),
                'optimizer_state_dict': opt.state_dict(),
                'batch_losses': batch_losses
            }, "trainedModels/testModel.pt")
                         
    plt.plot(batch_losses)
    plt.xlabel('batch index')
    plt.ylabel('batch loss')
    plt.show()

In [None]:
trainMineAI(nBatches=10, nSamples=1000, nEpochsPerBatch=1, nMiniBatchSize=1, difficulty=DIFF_BEGINNER)



Samples taken: 99 / 1000
Samples taken: 199 / 1000
Samples taken: 299 / 1000
Samples taken: 399 / 1000
Samples taken: 499 / 1000
Samples taken: 599 / 1000
Samples taken: 699 / 1000
Samples taken: 799 / 1000
Samples taken: 899 / 1000
Samples taken: 999 / 1000
Games played in batch 0: 1000 
Mean 3BV solved percent in batch 0: 14.268197807827255%
Proportion of games won in batch 0: 0.0%
[0.4993148148059845]
Finished batch number 0/10 Training. Batch loss: 0.4993148148059845
Samples taken: 99 / 1000
Samples taken: 199 / 1000
Samples taken: 299 / 1000
Samples taken: 399 / 1000
Samples taken: 499 / 1000
Samples taken: 599 / 1000
Samples taken: 699 / 1000
Samples taken: 799 / 1000
Samples taken: 899 / 1000
Samples taken: 999 / 1000
Games played in batch 1: 1000 
Mean 3BV solved percent in batch 1: 8.601018610276597%
Proportion of games won in batch 1: 0.0%
[0.5455607175827026]
Finished batch number 1/10 Training. Batch loss: 0.5455607175827026
Samples taken: 99 / 1000
Samples taken: 199 / 100