In [0]:
# from google.colab import drive
# drive.mount('/content/drive')
from copy import copy
from six import StringIO
from pprint import pprint

import gym
from gym import spaces, error, utils
from gym.utils import seeding
import numpy as np


# Status codes returned by checkSolution() and compared against by its callers.
# NOTE(review): `error` rebinds the name brought in by `from gym import ... error`
# above, so that import is no longer reachable under this name afterwards.
resolved, unfinished, error = 0, 1, 2


def checkSolution(grid):
    """Classify a square Sudoku grid as resolved, unfinished or in error.

    Cells are visited in row-major order. The scan stops at the first empty
    cell (value 0) and reports ``unfinished``; every cell visited before that
    must be unique in its row, its column and its box, otherwise ``error`` is
    reported. ``resolved`` is returned only when every cell passes.

    Args:
        grid: 2-D NumPy array of shape (N, N); 0 marks an empty cell.

    Returns:
        One of the module-level status codes ``resolved``, ``unfinished`` or
        ``error``.
    """
    N = len(grid)
    # The box side is sqrt(N) (3 for the classic 9x9 board). The former
    # ``N // 3`` only matched that for N == 9, ignored the box rule on 4x4
    # boards and divided by zero for N < 3.
    n = max(1, int(np.sqrt(N)))
    for i in range(N):
        for j in range(N):
            value = grid[i, j]
            if value == 0:
                return unfinished

            # Top-left corner of the box that contains (i, j).
            iOffset = i // n * n
            jOffset = j // n * n
            square = grid[iOffset:iOffset + n, jOffset:jOffset + n]

            uniqueInRow = np.count_nonzero(grid[i] == value) == 1
            uniqueInCol = np.count_nonzero(grid[:, j] == value) == 1
            uniqueInSquare = np.count_nonzero(square == value) == 1

            if not (uniqueInRow and uniqueInCol and uniqueInSquare):
                return error
    return resolved


def countItem(vector, item):
    """Return how many elements of ``vector`` compare equal to ``item``."""
    return sum(1 for candidate in vector if candidate == item)


def getSolutions(grid, stopAt=1, i=-1, j=-1, omit=-1):
    """Collect up to ``stopAt`` completions of ``grid`` by randomised backtracking.

    Args:
        grid: 2-D NumPy array of shape (N, N); 0 marks an empty cell.
        stopAt: the search returns as soon as it holds this many solutions.
        i, j: cell to branch on. When ``i`` is -1 the first empty cell in
            row-major order is used instead.
        omit: a value that is skipped when filling the branching cell.

    Returns:
        Integer array of shape (k, N, N) with the solutions found; k is 0
        when the grid is in error or no candidate value leads to a solution.
    """
    size = len(grid)
    status = checkSolution(grid)

    if status == resolved:
        return np.array([grid], dtype=int)
    if status == error:
        return np.empty(shape=(0, size, size), dtype=int)

    # No cell requested: branch on the first empty one (row-major order).
    if i == -1:
        i, j = np.argwhere(grid == 0)[0]

    # Try the candidate values in random order.
    candidates = np.arange(1, size + 1)
    np.random.shuffle(candidates)

    found = np.empty(shape=(0, size, size), dtype=int)
    for candidate in candidates:
        if candidate == omit:
            continue
        trial = np.copy(grid)
        trial[i, j] = candidate
        # Only ask the recursion for as many solutions as are still missing.
        still_needed = stopAt - len(found)
        found = np.concatenate((found, getSolutions(trial, stopAt=still_needed)))
        if len(found) >= stopAt:
            break
    return found


class SudokuEnv(gym.Env):
    """Gym environment where an agent fills in the blanks of an n x n Sudoku.

    An action is one integer that packs a value and a cell:
    ``action = (value - 1) * n * n + row * n + col``. Observations are copies
    of the current grid, in which 0 marks an empty cell.
    """

    last_action = None

    # ANSI escape sequences used by render() to highlight the last move.
    _GREEN = '\033[92m'
    _RED = '\033[91m'
    _RESET = '\033[0m'

    def __init__(self, n, num_blanks=2):
        """Generate a solved board and blank out cells to form the puzzle.

        Args:
            n: side length of the board (9 for classic Sudoku).
            num_blanks: maximum number of cells to blank out. Defaults to 2,
                the amount that used to be hard-coded.
        """
        self.n = n
        self.val_limit = self.n - 1
        self.nxn = self.n * self.n
        # Empty cells are stored as 0, so the lower bound has to include 0.
        self.observation_space = spaces.Box(0, self.n, shape=(self.n, self.n))
        # One action per (value, row, col) triple. Discrete takes the number
        # of actions, not the largest index; the old size left the last
        # action (value n in the bottom-right cell) unreachable.
        self.action_space = spaces.Discrete(self.nxn * self.n)
        self.grid = []
        self.original_indices_row = []
        self.original_indices_col = []
        self.base = getSolutions(np.zeros(shape=(self.n, self.n)))[0]

        positions = [(i, j) for i in range(self.n) for j in range(self.n)]
        np.random.shuffle(positions)

        removed = 0
        for i, j in positions:
            if removed >= num_blanks:
                break
            oldValue = self.base[i, j]
            self.base[i, j] = 0
            # The cell may stay blank only if no other value in it leads to a
            # solution. One alternative is enough to reject the removal, so
            # the search stops at the first one it finds.
            alternatives = getSolutions(self.base, stopAt=1, i=i, j=j, omit=oldValue)
            if len(alternatives) == 0:
                removed += 1
            else:
                self.base[i, j] = oldValue

    def _decode(self, action):
        """Unpack an action integer into ``(row, col, value)``."""
        cell = action % self.nxn
        row, col = divmod(cell, self.n)
        val = action // self.nxn + 1
        return row, col, val

    def step(self, action):
        """Apply one action and return ``(observation, reward, done, info)``.

        Rewards:
            -0.5  the action repeats the previous one (nothing is changed);
            -1    the action targets a pre-filled cell, re-enters the value
                  already in the cell, or produces a grid that checkSolution
                  reports as an error (the move is reverted);
            -0.1  the move is kept and the grid is still unfinished;
             1    the grid is resolved (``done`` is True).

        ``info`` is always an empty dict.
        """
        if self.last_action is not None and self.last_action == action:
            return np.copy(self.grid), -0.5, False, {}
        self.last_action = action
        oldGrid = np.copy(self.grid)

        row, col, val = self._decode(action)

        # Cells that were filled when the episode started must not be changed.
        if (row, col) in zip(self.original_indices_row, self.original_indices_col):
            return np.copy(self.grid), -1, False, {}

        if self.grid[row, col] == val:
            return np.copy(self.grid), -1, False, {}

        self.grid[row, col] = val

        stats = checkSolution(self.grid)
        if stats == resolved:
            return np.copy(self.grid), 1, True, {}
        if stats == unfinished:
            return np.copy(self.grid), -0.1, False, {}
        # Remaining status is `error`: undo the move.
        self.grid = oldGrid
        return np.copy(self.grid), -1, False, {}

    def reset(self):
        """Restore the puzzle and return a copy of the initial grid."""
        self.last_action = None
        self.grid = np.copy(self.base)
        # Remember which cells are given so step() can protect them.
        self.original_indices_row, self.original_indices_col = np.nonzero(self.grid)
        return np.copy(self.grid)

    def render(self, mode='human', close=False):
        """Print the grid, highlighting the cell targeted by the last action.

        The last action's value is shown in that cell: green when the grid
        now holds that value, red when it does not (the move was rejected).
        ``mode`` and ``close`` are accepted for API compatibility and unused.
        """
        size = len(self.grid)
        box = max(1, int(np.sqrt(size))) if size else 1
        target = None
        if self.last_action is not None:
            # Uses the same decoding as step(), including the +1 on the value.
            target = self._decode(self.last_action)

        rule = '-' * (size + 3 * ((size - 1) // box)) if size else ''
        lines = []
        for i in range(size):
            cells = []
            for j in range(size):
                if target is not None and (i, j) == (target[0], target[1]):
                    val = target[2]
                    colour = self._GREEN if val == self.grid[i, j] else self._RED
                    cells.append(colour + str(val) + self._RESET)
                else:
                    cells.append(str(self.grid[i, j]))
                if j % box == box - 1 and j != size - 1:
                    cells.append(' | ')
            lines.append(''.join(cells))
            if i % box == box - 1 and i != size - 1:
                lines.append(rule)
        print('\n'.join(lines) + '\n\n')

In [0]:
import random
import gym
import numpy as np
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam

# Variables needed for DQN
GAMMA = 0.9               # discount applied to the target network's best next-state value
LEARNING_RATE = 0.1       # handed to the Adam optimizer
MEMORY_SIZE = 10 ** 6     # maximum length of the replay deque
BATCH_SIZE = 20           # transitions sampled per experience_replay() call
EXPLORATION_MAX = 0.95    # initial exploration rate
EXPLORATION_MIN = 0.01    # floor of the exploration rate
EXPLORATION_DECAY = 0.8   # factor applied to the exploration rate after each replay

class DQNSolver:
    """Epsilon-greedy DQN agent with an online network and a target network."""

    def __init__(self, observation_space, action_space):
        """Create the replay memory and both networks.

        Args:
            observation_space: number of input features of the networks.
            action_space: number of discrete actions (network output width).
        """
        self.exploration_rate = EXPLORATION_MAX

        self.action_space = action_space
        self.memory = deque(maxlen=MEMORY_SIZE)

        self.model = self.create_model(observation_space)
        self.target_model = self.create_model(observation_space)
        # Both networks are initialised independently; start them in sync.
        self.target_train()

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Return an action index: random with probability
        ``exploration_rate``, otherwise the argmax of the online network's
        prediction for ``state``."""
        if np.random.rand() < self.exploration_rate:
            return random.randrange(self.action_space)
        q_values = self.model.predict(state)
        return int(np.argmax(q_values[0]))

    def experience_replay(self):
        """Fit the online network on a random batch, then decay exploration.

        Does nothing until the memory holds at least BATCH_SIZE transitions.
        """
        if len(self.memory) < BATCH_SIZE:
            return
        batch = random.sample(self.memory, BATCH_SIZE)
        for state, action, reward, state_next, terminal in batch:
            if terminal:
                # No future return after a terminal transition. The previous
                # code stored a whole prediction array here, which cannot be
                # assigned to the single Q-value slot below.
                q_update = reward
            else:
                next_q = self.target_model.predict(state_next)[0]
                q_update = reward + GAMMA * np.amax(next_q)
            q_values = self.model.predict(state)
            q_values[0][action] = q_update
            self.model.fit(state, q_values, verbose=0)
        self.exploration_rate *= EXPLORATION_DECAY
        self.exploration_rate = max(EXPLORATION_MIN, self.exploration_rate)

    def target_train(self):
        """Copy the online network's weights into the target network."""
        self.target_model.set_weights(self.model.get_weights())

    def create_model(self, observation_space):
        """Build and compile a 24-48-24 ReLU network with a linear output per action."""
        model = Sequential()
        model.add(Dense(24, input_shape=(observation_space,), activation="relu"))
        model.add(Dense(48, activation="relu"))
        model.add(Dense(24, activation="relu"))
        model.add(Dense(self.action_space, activation="linear"))
        # NOTE(review): newer Keras releases spell this argument
        # `learning_rate` -- confirm against the installed version.
        model.compile(loss="mse", optimizer=Adam(lr=LEARNING_RATE))
        return model

    def save_model(self, fn):
        """Save the online network to ``fn``."""
        self.model.save(fn)

In [0]:
import matplotlib.pyplot as plt

%matplotlib inline

# Per-trial history: dqn_sudoku() appends to both lists and the plotting
# cell reads them afterwards.
runs, steps = [], []
def dqn_sudoku():
    """Train a DQNSolver on a 9x9 SudokuEnv.

    Runs up to 100 trials of at most 2000 steps each. After every trial the
    trial index is appended to the module-level ``runs`` list and the index
    of the last executed step to ``steps``. Training stops at the first trial
    that ends before step 1900, after saving the model as "success.model".
    """
    env = SudokuEnv(9)
    # The network takes the whole board as one flat feature vector. Reshaping
    # to (side, side) made Keras treat each row as a separate sample.
    observation_space = int(np.prod(env.observation_space.shape))
    action_space = env.action_space.n
    dqn_solver = DQNSolver(observation_space, action_space)
    trials = 100
    trial_len = 2000
    for trial in range(trials):
        state = np.reshape(env.reset(), [1, observation_space])
        for step in range(trial_len):
            action = dqn_solver.act(state)
            state_next, reward, terminal, info = env.step(action)
            # The reward is used as returned: the environment only reports
            # `terminal` when the puzzle is solved, so negating it there
            # would punish success.
            state_next = np.reshape(state_next, [1, observation_space])
            dqn_solver.remember(state, action, reward, state_next, terminal)
            dqn_solver.experience_replay()
            # The target network is synchronised after every step.
            dqn_solver.target_train()
            state = state_next

            if terminal:
                print("Run: " + str(trial) + ", exploration: " +
                      str(dqn_solver.exploration_rate) + ", score: " + str(step))
                break

        runs.append(trial)
        steps.append(step)
        if step >= 1900:
            print("Failed to complete in trial {}".format(trial))
            # NOTE(review): a trial that runs out of steps ends on
            # step == trial_len - 1, so this rarely fires; `trial % 10` may
            # have been intended -- confirm.
            if step % 10 == 0:
                dqn_solver.save_model("trial-{}.model".format(trial))
        else:
            print("Completed in {} trials".format(trial))
            dqn_solver.save_model("success.model")
            break



In [0]:
# Train the agent, then plot the recorded `steps` against `runs`.
dqn_sudoku()
x, y = runs, steps
plt.plot(x, y, color='blue')
plt.title('DQN Run')
plt.xlabel('trial')
plt.ylabel('steps')
plt.show()
# plt.savefig('DQN_run.png', transparent=True)

