In [30]:
import os
import glob
import ipdb
import random
import tensorflow as tf
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from sudoku import start as ORIGINAL_START, reward_goal, is_complete, is_valid, get_possible_actions, get_sudoku_grid
from tensorflow.keras.callbacks import ModelCheckpoint

In [31]:
def _grid_to_onehot(grid):
    """Return (9,9,10) one-hot with channel 0 = empty, 1..9 values."""
    oh = np.zeros((9, 9, 10), dtype=np.float32)
    for r in range(9):
        for c in range(9):
            v = int(grid[r, c])
            oh[r, c, v] = 1.0
    return oh

def _q_snapshot_to_target(Qsnap):
    """
    Qsnap expected shape >= (9,9,10). Return (9,9,9) with channels for values 1..9.
    """
    return Qsnap[:9, :9, 1:10].astype(np.float32)

def load_snapshots_to_dataset(snapshots_dir='snapshots',
                              max_files=None,
                              max_examples=None,
                              flatten_input=True,
                              normalize_targets=True):
    """
    Read .npz snapshot files saved by the trainer and build MLP-ready X, Y arrays.
    - X: inputs shape (N, 810) if flatten_input else (N,9,9,10)
    - Y: targets shape (N, 729) flattened (9*9*9)
    Looks for files like snapshots/*.npz and also dataset_*.npz created earlier.
    """
    files = sorted(glob.glob(os.path.join(snapshots_dir, '*.npz')))
    if max_files:
        files = files[:max_files]

    X_list = []
    Y_list = []
    total = 0

    for f in files:
        try:
            data = np.load(f, allow_pickle=True)
        except Exception:
            continue

        # prefer pre-built dataset files
        if 'X' in data and 'Y' in data:
            X_block = data['X']  # expected (N,9,9) int grids
            Y_block = data['Y']  # expected (N,9,9,9) Q targets
            for i in range(len(X_block)):
                grid = X_block[i]
                qtarget = Y_block[i]
                oh = _grid_to_onehot(grid)
                if flatten_input:
                    X_list.append(oh.reshape(-1))
                else:
                    X_list.append(oh)
                Y_list.append(qtarget.reshape(-1))
                total += 1
                if max_examples and total >= max_examples:
                    break
            if max_examples and total >= max_examples:
                break
            continue

        # fallback: per-episode snapshot with 'states' and 'Q'
        if 'states' in data and 'Q' in data:
            states = data['states']   # (T,9,9)
            Qsnap = data['Q']         # full Q snapshot
            for i in range(len(states)):
                grid = states[i]
                qtarget = _q_snapshot_to_target(Qsnap)
                oh = _grid_to_onehot(grid)
                if flatten_input:
                    X_list.append(oh.reshape(-1))
                else:
                    X_list.append(oh)
                Y_list.append(qtarget.reshape(-1))
                total += 1
                if max_examples and total >= max_examples:
                    break
            if max_examples and total >= max_examples:
                break

    if len(X_list) == 0:
        return None, None

    X = np.stack(X_list).astype(np.float32)
    Y = np.stack(Y_list).astype(np.float32)

    if normalize_targets:
        # scale targets to roughly [-1,1] by dividing by reward_goal (non-zero)
        scale = float(max(1.0, abs(reward_goal)))
        Y = Y / scale

    return X, Y

def build_mlp(input_dim=810, output_dim=729, hidden_sizes=(1024, 512), lr=1e-3, dropout=0.2):
    """
    Build a simple feed-forward network using TensorFlow Keras.
    Returns compiled model.
    """
    inputs = tf.keras.Input(shape=(input_dim,), name='sudoku_input')
    x = inputs
    for i, h in enumerate(hidden_sizes):
        x = tf.keras.layers.Dense(h, activation='swish', name=f'dense_{i}')(x)
        if dropout and dropout > 0.0:
            x = tf.keras.layers.Dropout(dropout, name=f'dropout_{i}')(x)
    outputs = tf.keras.layers.Dense(output_dim, activation='linear', name='q_values')(x)
    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    model.compile(optimizer=tf.keras.optimizers.Adam(lr),
                  loss='mse',
                  metrics=['mae'])
    return model

def train_mlp_from_snapshots(model_save='sudoku_mlp.keras',
                             snapshots_dir="snapshots/strategy2",
                             batch_size=128,
                             epochs=20,
                             val_split=0.1,
                             max_files=None,
                             max_examples=None):
    """
    Load dataset from snapshots, train MLP, save model to file.
    """
    model = build_mlp()
    cb = [
        tf.keras.callbacks.ModelCheckpoint(model_save, save_best_only=True, monitor='val_loss', verbose=1),
        tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=1e-6, cooldown=2, verbose=1),
        tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=6, restore_best_weights=True)
    ]
    for epoch in range(epochs):
        print("-----------------------------------")
        print(f"Epoch: {epoch}")
        print("-----------------------------------")
        for entry in os.listdir(snapshots_dir):
            snapshot = os.path.join(snapshots_dir, entry)
            print(f"Processing: {snapshot}")
            X, Y = load_snapshots_to_dataset(snapshot, max_files=max_files, max_examples=max_examples)
            if X is None:
                raise RuntimeError("No training data found in snapshots directory")
            
            history = model.fit(X, Y,
                                batch_size=batch_size,
                                epochs=1,
                                validation_split=val_split,
                                callbacks=cb,
                                verbose=2)
    model.save(model_save)
    return model, history








In [32]:
def predict_best_action_from_model(model, grid, legal_only=True, return_q=False):
    """
    Given a trained model and a grid (9x9), return best action (r,c,v).
    If legal_only=True restricts choices to get_possible_actions(grid).
    """
    oh = _grid_to_onehot(grid).reshape(1, -1).astype(np.float32)
    preds = model.predict(oh, verbose=0)[0]  # flattened 729
    # unnormalize if training used normalization
    preds = preds * float(max(1.0, abs(reward_goal)))

    # reshape to (9,9,9)
    qvals = preds.reshape(9, 9, 9)  # channels value=1..9 -> index 0..8

    best = None
    best_val = -float('inf')
    possible = get_possible_actions(grid) if legal_only else [(r, c, v) for r in range(9) for c in range(9) for v in range(1,10)]

    for (r, c, v) in possible:
        val = qvals[r, c, v-1]
        if val > best_val:
            best_val = val
            best = (r, c, v)

    if return_q:
        return best, best_val
    return best

def plot_sudoku_grid(grid, givens=None, filename=None, figsize=6):
    """
    Render a single 9x9 Sudoku grid using matplotlib.
    - grid: 9x9 ndarray of ints (0 = empty)
    - givens: optional 9x9 ndarray marking original clues (non-zero) to render in bold
              if None and ORIGINAL_START is available, that will be used.
    - filename: if provided the image is saved to this path, otherwise returns the Figure.
    Returns: matplotlib.Figure if filename is None, else the filename string.
    """
    grid = np.asarray(grid, dtype=int)
    assert grid.shape == (9, 9), "grid must be shape (9,9)"

    if givens is not None:
        givens = np.asarray(givens) != 0
    else:
        givens = np.zeros((9, 9), dtype=bool)

    fig, ax = plt.subplots(figsize=(figsize, figsize))
    ax.set_xlim(0, 9)
    ax.set_ylim(0, 9)
    ax.set_aspect('equal')

    # draw cell backgrounds and numbers
    count = 0
    for r in range(9):
        for c in range(9):
            val = int(grid[r, c])
            face = '#f2f2f2' if givens[r, c] else 'white'
            ax.add_patch(plt.Rectangle((c, r), 1, 1, facecolor=face, edgecolor='none'))
            if val != 0:
                color = 'black' if givens[r, c] else '#1f77b4'
                weight = 'bold' if givens[r, c] else 'normal'
                ax.text(c + 0.5, r + 0.5, str(val), ha='center', va='center',
                        fontsize=18, color=color, fontweight=weight)
            else:
                count += 1
    print(f"Accuracy: {((81-count)/81) * 100}")

    # grid lines: thick every 3 cells
    for i in range(10):
        lw = 2.5 if i % 3 == 0 else 0.8
        ax.plot([0, 9], [i, i], color='k', linewidth=lw)
        ax.plot([i, i], [0, 9], color='k', linewidth=lw)

    ax.invert_yaxis()
    ax.set_xticks([])
    ax.set_yticks([])
    plt.tight_layout()

    if filename:
        plt.savefig(filename, dpi=200)
        plt.close(fig)
        return filename
    return fig

    
def fill_grid_greedy_with_model(model, initial_grid, max_steps=200):
    """
    Greedily apply model's best legal action until complete or stuck.
    Returns list of grids (states).
    """
    grid = initial_grid.copy().astype(int)
    path = [grid.copy()]
    for _ in range(max_steps):
        if is_complete(grid):
            break
        action = predict_best_action_from_model(model, grid, legal_only=True)
        if action is None:
            break
        r, c, v = action
        grid[r, c] = v
        # if invalid (shouldn't happen if we used legal_only) break
        if not is_valid(grid):
            grid[r, c] = 0
            break
        path.append(grid.copy())
    return path
    
def load_model_and_solve(model_path,
                         grid,
                         save_image='sudoku_solution_nn.png',
                         return_path=False):
    """
    Load a saved keras model, run greedy inference to fill the sudoku,
    save the final grid image (if save_image provided) and return the path list.
    - initial_grid: 9x9 ndarray. If None and ORIGINAL_START is available it will be used.
    - return_path: if True return the sequence of grids, otherwise return None.
    """
    model = tf.keras.models.load_model(model_path)
    path = fill_grid_greedy_with_model(model, grid, max_steps=200)

    final_grid = path[-1]
    plot_sudoku_grid(final_grid, givens=grid, filename=save_image)

    if return_path:
        return path
    return None

In [33]:
model, history = train_mlp_from_snapshots(model_save='sudoku_mlp_2_layers_swish.keras', snapshots_dir="snapshots/strategy2")

-----------------------------------
Epoch: 0
-----------------------------------
Processing: snapshots/strategy2/5

Epoch 1: val_loss improved from None to 0.00000, saving model to sudoku_mlp_2_layers_swish.keras
3578/3578 - 17s - 5ms/step - loss: 4.6278e-05 - mae: 0.0033 - val_loss: 4.3442e-06 - val_mae: 0.0010 - learning_rate: 1.0000e-03
Processing: snapshots/strategy2/3

Epoch 1: val_loss improved from 0.00000 to 0.00000, saving model to sudoku_mlp_2_layers_swish.keras
3539/3539 - 16s - 5ms/step - loss: 3.2401e-04 - mae: 0.0064 - val_loss: 4.1126e-06 - val_mae: 7.5274e-04 - learning_rate: 1.0000e-03
Processing: snapshots/strategy2/2

Epoch 1: val_loss did not improve from 0.00000
3325/3325 - 15s - 5ms/step - loss: 1.1259e-05 - mae: 0.0014 - val_loss: 7.1795e-06 - val_mae: 9.3280e-04 - learning_rate: 1.0000e-03
Processing: snapshots/strategy2/4

Epoch 1: val_loss did not improve from 0.00000
3456/3456 - 15s - 4ms/step - loss: 1.3238e-05 - mae: 0.0013 - val_loss: 1.4123e-05 - val_mae:

In [37]:
import random
sample = random.randint(1000, 3000000)
#sample = 94
print(f"Chosen sample: {sample}")
counter = -1
with open("sudoku_extreme_dataset/train.csv") as fh:
    for line in fh:
        if counter == sample:
            question = line.split(",")[1]
            grid = np.array(get_sudoku_grid(question))
            snapshots_dir = f"snapshots/{sample}"
            break
        counter += 1
    else:
        raise Exception("Invalid sample number")
load_model_and_solve('sudoku_mlp_2_layers_swish.keras', grid, save_image='sudoku_solution_nn.png')


Chosen sample: 1153086
Accuracy: 81.48148148148148
