In [None]:
# Softmax classifier for guessing minesweeper board position and whether it has a mine or not

In [None]:
# Import libraries for simulation
import tensorflow as tf
import numpy as np
import random as r
import datetime as dt
import multiprocessing as mp

In [None]:
dimensions = (8,8)
mineProbability = 0.16      # Probability that a square contain a mine

In [None]:
# Clears a square on the minesweeper board.
# If it had a mine, return true
# Otherwise if it has no adjacent mines, recursively run on adjacent squares
# Return false
def clearSquare(board,adjacency,row,col):
    rows,cols = dimensions
    if board[row,col] == 1:
        return True
    if adjacency[row,col] >= 0:
        return False
    n = 0
    for r in range(row-1,row+2):
        for c in range(col-1,col+2):
            if 0 <= r and r < rows and 0 <= c and c < cols:
                n += board[r,c]
    adjacency[row,col] = n
    if n == 0:
        for r in range(row-1,row+2):
            for c in range(col-1,col+2):
                if 0 <= r and r < rows and 0 <= c and c < cols:
                    clearSquare(board,adjacency,r,c)
    return False

In [None]:
# This takes a mine board and gives a mine count with mines removed, and other random squares removed
def boardPartialMineCounts(board):
    clearProbability = r.uniform(0.05,0.5)
    result = np.full(dimensions,-1)
    for index, x in np.random.permutation(list(np.ndenumerate(board))):
        row,col = index
        if not(x) and result[row,col] == -1 and r.uniform(0,1) < clearProbability:
            clearSquare(board,result,row,col)
    return result

In [None]:
# Generates a random training batch of size n
def randomBoard(i):
    return(np.random.random(dimensions) < mineProbability)

def encodeCountsOneHot(counts):
    countsOneHot = np.zeros((counts.size,10))
    countsOneHot[np.arange(counts.size), counts.flatten() + 1] = 1
    return(countsOneHot.flatten())

def validGuesses(boardAndCounts):
    board,counts = boardAndCounts
    validGuesses = np.append(((counts == -1).astype(int) - board).flatten().astype(float),
        board.flatten().astype(float))
    validGuessesSum = sum(validGuesses)
    if validGuessesSum > 0:
        return(validGuesses / validGuessesSum)
    else:
        return(np.zeros(board.size*2))

try:
    cpus = mp.cpu_count()
except NotImplementedError:
    cpus = 2   # arbitrary default

pool = mp.Pool(processes=cpus)

def next_training_batch(n):
    boards = pool.map(randomBoard, range(n))
    counts = pool.map(boardPartialMineCounts, boards)
    batch_xs = pool.map(encodeCountsOneHot, counts)
    batch_ys = pool.map(validGuesses, zip(boards,counts))
    return(batch_xs, batch_ys, boards)

In [None]:
# Create the model
rows, cols = dimensions
size = rows*cols
mineCountsOneHot = tf.placeholder(tf.float32, [None, size*10], name="mineCountsOneHot")
W = tf.Variable(tf.truncated_normal([size*10, size], stddev=0.01), name="W")
b = tf.Variable(tf.truncated_normal([size], stddev=0.01), name="b")
y = tf.matmul(mineCountsOneHot, W) + b

In [None]:
def weight_variable(shape):
  initial = tf.truncated_normal(shape, stddev=0.01)
  return tf.Variable(initial)

def bias_variable(shape):
  initial = tf.truncated_normal(shape, stddev=0.01)
  return tf.Variable(initial)

In [None]:
def conv2d(x, W):
  return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')

def max_pool_2x2(x):
  return tf.nn.max_pool(x, ksize=[1, 2, 2, 1],
                        strides=[1, 2, 2, 1], padding='SAME')

In [None]:
y_image = tf.reshape(y, [-1, rows, cols, 1])

# First convolution layer
W_conv1 = weight_variable([5, 5, 1, 32])
b_conv1 = bias_variable([32])

h_conv1 = tf.nn.relu(conv2d(y_image, W_conv1) + b_conv1)
h_pool1 = max_pool_2x2(h_conv1)

# Second convolution layer
W_conv2 = weight_variable([5, 5, 32, 64])
b_conv2 = bias_variable([64])

h_conv2 = tf.nn.relu(conv2d(h_pool1, W_conv2) + b_conv2)
h_pool2 = max_pool_2x2(h_conv2)

# Third convolution layer
W_conv3 = weight_variable([5, 5, 64, 64])
b_conv3 = bias_variable([64])

h_conv3 = tf.nn.relu(conv2d(h_pool2, W_conv3) + b_conv3)
h_pool3 = max_pool_2x2(h_conv3)

# Fully connected layer
W_fc1 = weight_variable([64, 1024])
b_fc1 = bias_variable([1024])

h_pool3_flat = tf.reshape(h_pool3, [-1, 64])
h_fc1 = tf.nn.relu(tf.matmul(h_pool3_flat, W_fc1) + b_fc1)

# Dropout
keep_prob = tf.placeholder(tf.float32)
h_fc1_drop = tf.nn.dropout(h_fc1, keep_prob)

# Output layer
W_fc2 = weight_variable([1024, size*2])
b_fc2 = bias_variable([size*2])

y_conv = tf.matmul(h_fc1_drop, W_fc2) + b_fc2

In [None]:
validGuessAverages = tf.placeholder(tf.float32, [None, size*2], name="validGuessAverages")

In [None]:
# Loss function
cross_entropy = tf.reduce_mean(
    tf.nn.softmax_cross_entropy_with_logits(labels=validGuessAverages, logits=y_conv))

In [None]:
# Summaries for tensorboard
_ = tf.summary.scalar('loss', cross_entropy)

In [None]:
# Optimiser
train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)

In [None]:
savePathPrefix = './tf.Mines7/' + str(dimensions) + '/'

modelDataPath = savePathPrefix

def summaryPath(run):
    return(savePathPrefix + 'runs/' + str(run) + '/')

In [None]:
# Create session and initialise or restore stuff
saver = tf.train.Saver()

sess = tf.InteractiveSession()

merged = tf.summary.merge_all()
writer = tf.summary.FileWriter(summaryPath(1), sess.graph)

In [None]:
tf.global_variables_initializer().run()

In [None]:
# Restore model?
#saver.restore(sess, modelDataPath + "model-500")

In [None]:
# Train
for iteration in range(100001):
    batch_xs, batch_ys, _ = next_training_batch(10000)
    summary, loss, _ = sess.run([merged, cross_entropy, train_step],
                                 feed_dict={mineCountsOneHot: batch_xs, validGuessAverages: batch_ys, keep_prob: 0.5})
    writer.add_summary(summary, iteration)
    print('%s: Loss at step %s: %s' % (dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), iteration, loss))
    if iteration % 10 == 0:
        save_path = saver.save(sess, modelDataPath + 'model', global_step=iteration)
        print("Model saved in file: %s" % save_path)

In [None]:
# Test trained model on larger batch size
batch_xs, batch_ys, _ = next_training_batch(10000)
print(sess.run(cross_entropy, feed_dict={mineCountsOneHot: batch_xs, validGuessAverages: batch_ys}))

In [None]:
# Run a test
batchSize = 10000
batch_xs, batch_ys,_ = next_training_batch(batchSize)

predictions = sess.run(tf.nn.softmax(y), feed_dict={mineCountsOneHot: batch_xs, validGuessAverages: batch_ys})
bestSquares = [pred.argmax() for pred in predictions]
unfrees = (batch_ys == 0).astype(int)
frees = [unfrees[i][bestSquares[i]] for i in range(batchSize)]
print("Number of errors for batch size of ", batchSize)
print(sum(frees))

In [None]:
# Find boards that we failed on
batchSize = 1000
batch_xs, batch_ys, _ = next_training_batch(batchSize)

predictions = sess.run(tf.nn.softmax(y), feed_dict={mineCountsOneHot: batch_xs, validGuessAverages: batch_ys})
bestSquares = [pred.argmax() for pred in predictions]
unfrees = (batch_ys == 0).astype(int)
guesses = [unfrees[i][bestSquares[i]] for i in range(batchSize)]
for i in range(batchSize):
    if guesses[i] == 1:
        print(batch_xs[i].reshape(dimensions))
        summary = sess.run(tf.summary.image('mine_miss', tf.reshape((batch_xs[i]+1).astype(float),[-1,rows,cols,1]), 100))
        writer.add_summary(summary)

In [None]:
#batch_xs = [[-1,1,-1,0,0,0,-1,-1,-1,1,1,1,-1,-1,1,2,2,1,1,-1,2,1,1,-1,-1,2,2,-1,-1,2,-1,1,1,0,-1,-1,2,-1,-1,4,-1,2,-1,1,2,-1,1,0,1,2,-1,3,2,2,1,-1,2,-1,1,0,0,1,1,-1,-1,-1,-1,-1,-1,1,1,-1,-1,0,0,3,-1,4,1,2,-1,1,-1,-1,0,0,0,2,-1,-1,-1,2,-1,1,0,0,0,-1,1,2,-1,2,1,2,2,3,3,2,-1,-1,1,-1,1,-1,0,1,2,-1,-1,-1,1,1,1,-1,1,0,-1,-1,-1,-1,-1,-1,-1,-1,0,-1,-1,-1,-1,-1,1,-1,-1,-1]]
batch_xs0 = [-1] * (size)
batch_xs0[0] = 1
batch_xs0[1] = 1
batch_xs0[cols] = 1

predictions = sess.run(tf.nn.softmax(y), feed_dict={mineCounts: [batch_xs0]})
bestSquares = [pred.argmax() for pred in predictions]

print(bestSquares[0] // cols, bestSquares[0] % cols)

In [None]:
np.save("./W", sess.run(W))

In [None]:
np.save("./b", sess.run(b))

In [None]:
np.savez("./model", sess.run([W,b]))