As attempted in [solver.ipynb](solver.ipynb), informed search can perform relatively well. I must say, this is quite an achievement after CS2040S and CS2109S. However, some configurations take far too long to solve. Hence, in this notebook, I will attempt a neural network approach to finding the shortest solution to a Rubik's cube.

## Rubik's cube convention

To recap, Rubik's cube has the following index convention:

![Rubik indices](images/face-indices.png)

A state is a numpy array of shape 6 x 3 x 3. The first index selects a face, numbered as in the image above; the central face in the image is the front face.
Each face is then a 3 x 3 array holding its cells.

Each cell colour is an integer from 0 to 5 (inclusive).

The correspondences between indices and faces are:
* 0: face up (U)
* 1: face left (L)
* 2: face front (F)
* 3: face right (R)
* 4: face down (D)
* 5: face back (B)

Each action is one of `"L"`, `"L'"`, `"R"`, `"R'"`, `"F"`, `"F'"`, `"B"`, `"B'"`, `"U"`, `"U'"`, `"D"`, `"D'"`. An action without an apostrophe `'` turns the face clockwise, and an action with an apostrophe turns it anticlockwise. Clockwise is taken from the point of view of the face being rotated.
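To make the convention concrete, here is a minimal sketch in plain Python. It uses nested lists instead of numpy so that it runs without dependencies, and the helper names `solved_state`, `flatten` and `inverse` are illustrative only; they are not part of the notebook's code.

```python
# Face order follows the convention above: U, L, F, R, D, B.
FACES = ["U", "L", "F", "R", "D", "B"]
ACTIONS = ["L", "L'", "R", "R'", "F", "F'", "B", "B'", "U", "U'", "D", "D'"]

def solved_state():
    """Return a solved cube as 6 faces of 3 x 3 cells, coloured 0 to 5."""
    return [[[colour] * 3 for _ in range(3)] for colour in range(6)]

def flatten(state):
    """Flatten a 6 x 3 x 3 state into a tuple of 54 integers."""
    return tuple(cell for face in state for row in face for cell in row)

def inverse(action):
    """Return the action that undoes `action` (clockwise <-> anticlockwise)."""
    return action[0] if action.endswith("'") else action + "'"

state = solved_state()
flat = flatten(state)
print(len(flat))                          # 54 cells in total
print(state[FACES.index("F")][1][1])      # centre cell of the front face: 2
print([inverse(a) for a in ["L", "U'"]])  # ["L'", 'U']
```

The flattened 54-tuple is the same shape of data that the network later consumes as input.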

## Neural network

The neural network is designed as follows: given a Rubik's cube position, return the best move for that position. Hence the network takes 54 inputs and returns 12 outputs. The 54 inputs represent the cube position (one value per cell), while the 12 outputs are scores for the 12 possible moves, which a softmax turns into normalized probabilities.
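As a rough illustration of how 12 raw outputs become a move choice, the sketch below applies a softmax in plain Python and picks the most probable action. The `scores` values are made up for the example and do not come from a trained model.

```python
import math

ACTIONS = ["L", "L'", "R", "R'", "F", "F'", "B", "B'", "U", "U'", "D", "D'"]

def softmax(scores):
    """Convert raw scores into probabilities that sum to 1."""
    shift = max(scores)  # subtract the maximum for numerical stability
    exps = [math.exp(s - shift) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]

# Hypothetical network output for one position: one raw score per action.
scores = [0.1, -0.3, 2.0, 0.0, 0.5, -1.2, 0.3, 0.0, 1.1, -0.7, 0.2, 0.4]
probs = softmax(scores)
best = ACTIONS[probs.index(max(probs))]
print(best)  # the action with the highest score, here "R"
```

Since softmax preserves the ordering of the scores, taking the argmax of the raw outputs selects the same move.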

We will reuse the informed search algorithm for data generation. For each cube position, we run an informed search to find a solution. If no solution is found within the time limit, we take the first step of the model solution (the scramble reversed, with each move inverted). Otherwise, we take the first step of the shorter of the two solutions: the one given by informed search and the model solution.
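The labelling rule can be sketched as a small pure function. The name `pick_label` is hypothetical; the tie-breaking (equal lengths fall back to the model solution) mirrors what `generate_input` does further down.

```python
def pick_label(search_solution, model_solution):
    """Return the first move of the solution used to supervise the network.

    `search_solution` is None when the informed search timed out.
    """
    if search_solution is None:
        return model_solution[0]
    if len(search_solution) < len(model_solution):
        return search_solution[0]
    return model_solution[0]  # ties go to the model solution

print(pick_label(None, ["D", "B'"]))            # search timed out -> "D"
print(pick_label(["R"], ["L", "R", "L'"]))      # search is shorter -> "R"
print(pick_label(["U", "F"], ["F", "U"]))       # tie -> "F"
```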

The informed search used to generate the training inputs is configured as follows:

* Hashmap
* Heuristic: uniformity - coeff * steps_taken
* Coefficient: 3.0
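As a worked example of the heuristic, the sketch below recomputes it on a solved cube using nested lists. The names `uniformity` and `priority` are illustrative; the counting rule is the same as in the `heuristic` function defined later, where the node with the largest value is popped from the frontier first.

```python
def uniformity(state):
    """Count adjacent same-coloured cell pairs (vertical and horizontal) on every face."""
    count = 0
    for face in state:
        for i in range(3):
            for j in range(3):
                if i < 2 and face[i][j] == face[i + 1][j]:
                    count += 1
                if j < 2 and face[i][j] == face[i][j + 1]:
                    count += 1
    return count

def priority(state, steps_taken, coeff=3.0):
    """Heuristic value: uniformity minus a penalty for every step already taken."""
    return uniformity(state) - coeff * steps_taken

solved = [[[colour] * 3 for _ in range(3)] for colour in range(6)]
print(uniformity(solved))   # 12 pairs per face * 6 faces = 72
print(priority(solved, 2))  # 72 - 3.0 * 2 = 66.0
```

With a coefficient of 3.0, each extra step has to gain more than three matching pairs for a deeper node to outrank a shallower one.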

In [1]:
# import necessary libraries for this notebook

from copy import deepcopy
import numpy as np
from sortedcontainers import SortedSet

from time import time
import matplotlib.pyplot as plt
import random

import pickle
import torch
import torch.nn as nn
import os
import math

In [2]:
# define the utils

class Rubik:
    actions = ["L", "L'", "R", "R'", "F", "F'", "B", "B'", "U", "U'", "D", "D'"]
    
    def apply_action(state, action):
        """Applies the action on the state and returns a new state.
        Both `state` and returned state must be numpy array of 6 x 3 x 3 that represents a state of a Rubik's cube.
        Action must be one of "L", "L'", "R", "R'", "F", "F'", "B", "B'", "U", "U'", "D", "D'".

        First, deepcopy the initial state, and then call the respective functions that mutates the copied state.
        Then, return the copied state.
        """

        state = deepcopy(state)
        match action:
            case "L":
                return Rubik.rotate_left_clockwise(state)
            case "L'":
                return Rubik.rotate_left_anticlockwise(state)
            case "R":
                return Rubik.rotate_right_clockwise(state)
            case "R'":
                return Rubik.rotate_right_anticlockwise(state)
            case "F":
                return Rubik.rotate_front_clockwise(state)
            case "F'":
                return Rubik.rotate_front_anticlockwise(state)
            case "B":
                return Rubik.rotate_back_clockwise(state)
            case "B'":
                return Rubik.rotate_back_anticlockwise(state)
            case "U":
                return Rubik.rotate_up_clockwise(state)
            case "U'":
                return Rubik.rotate_up_anticlockwise(state)
            case "D":
                return Rubik.rotate_down_clockwise(state)
            case "D'":
                return Rubik.rotate_down_anticlockwise(state)
            case _:
                raise ValueError(f"Unrecognised action {action}")

    def rotate_face_clockwise(face):
        temp_corner = face[0][0]
        face[0][0] = face[2][0]
        face[2][0] = face[2][2]
        face[2][2] = face[0][2]
        face[0][2] = temp_corner

        temp_side = face[0][1]
        face[0][1] = face[1][0]
        face[1][0] = face[2][1]
        face[2][1] = face[1][2]
        face[1][2] = temp_side

    def rotate_face_anticlockwise(face):
        temp_corner = face[0][0]
        face[0][0] = face[0][2]
        face[0][2] = face[2][2]
        face[2][2] = face[2][0]
        face[2][0] = temp_corner

        temp_side = face[0][1]
        face[0][1] = face[1][2]
        face[1][2] = face[2][1]
        face[2][1] = face[1][0]
        face[1][0] = temp_side

    def rotate_left_clockwise(state):
        temp1, temp2, temp3 = state[0, :, 0]
        state[0, :, 0] = state[5, :, 0]
        state[5, :, 0] = state[4, :, 0]
        state[4, :, 0] = state[2, :, 0]
        state[2, :, 0] = temp1, temp2, temp3
        Rubik.rotate_face_clockwise(state[1])
        return state

    def rotate_left_anticlockwise(state):
        temp1, temp2, temp3 = state[0, :, 0]
        state[0, :, 0] = state[2, :, 0]
        state[2, :, 0] = state[4, :, 0]
        state[4, :, 0] = state[5, :, 0]
        state[5, :, 0] = temp1, temp2, temp3
        Rubik.rotate_face_anticlockwise(state[1])
        return state

    def rotate_right_clockwise(state):
        temp1, temp2, temp3 = state[0, :, 2]
        state[0, :, 2] = state[2, :, 2]
        state[2, :, 2] = state[4, :, 2]
        state[4, :, 2] = state[5, :, 2]
        state[5, :, 2] = temp1, temp2, temp3
        Rubik.rotate_face_clockwise(state[3])
        return state

    def rotate_right_anticlockwise(state):
        temp1, temp2, temp3 = state[0, :, 2]
        state[0, :, 2] = state[5, :, 2]
        state[5, :, 2] = state[4, :, 2]
        state[4, :, 2] = state[2, :, 2]
        state[2, :, 2] = temp1, temp2, temp3
        Rubik.rotate_face_anticlockwise(state[3])
        return state

    def rotate_front_clockwise(state):
        temp1, temp2, temp3 = state[0, 2, :]
        state[0, 2, :] = state[1, :, 2][::-1]
        state[1, :, 2][::-1] = state[4, 0, :][::-1]
        state[4, 0, :][::-1] = state[3, :, 0]
        state[3, :, 0] = temp1, temp2, temp3
        Rubik.rotate_face_clockwise(state[2])
        return state

    def rotate_front_anticlockwise(state):
        temp1, temp2, temp3 = state[0, 2, :]
        state[0, 2, :] = state[3, :, 0]
        state[3, :, 0] = state[4, 0, :][::-1]
        state[4, 0, :][::-1] = state[1, :, 2][::-1]
        state[1, :, 2][::-1] = temp1, temp2, temp3
        Rubik.rotate_face_anticlockwise(state[2])
        return state

    def rotate_back_clockwise(state):
        temp1, temp2, temp3 = state[0, 0, :]
        state[0, 0, :] = state[3, :, 2]
        state[3, :, 2] = state[4, 2, :][::-1]
        state[4, 2, :][::-1] = state[1, :, 0][::-1]
        state[1, :, 0][::-1] = temp1, temp2, temp3
        Rubik.rotate_face_clockwise(state[5])
        return state

    def rotate_back_anticlockwise(state):
        temp1, temp2, temp3 = state[0, 0, :]
        state[0, 0, :] = state[1, :, 0][::-1]
        state[1, :, 0][::-1] = state[4, 2, :][::-1]
        state[4, 2, :][::-1] = state[3, :, 2]
        state[3, :, 2] = temp1, temp2, temp3
        Rubik.rotate_face_anticlockwise(state[5])
        return state

    def rotate_up_clockwise(state):
        temp1, temp2, temp3 = state[1, 0, :]
        state[1, 0, :] = state[2, 0, :]
        state[2, 0, :] = state[3, 0, :]
        state[3, 0, :] = state[5, 2, :][::-1]
        state[5, 2, :][::-1] = temp1, temp2, temp3
        Rubik.rotate_face_clockwise(state[0])
        return state

    def rotate_up_anticlockwise(state):
        temp1, temp2, temp3 = state[1, 0, :]
        state[1, 0, :] = state[5, 2, :][::-1]
        state[5, 2, :][::-1] = state[3, 0, :]
        state[3, 0, :] = state[2, 0, :]
        state[2, 0, :] = temp1, temp2, temp3
        Rubik.rotate_face_anticlockwise(state[0])
        return state

    def rotate_down_clockwise(state):
        temp1, temp2, temp3 = state[1, 2, :]
        state[1, 2, :] = state[5, 0, :][::-1]
        state[5, 0, :][::-1] = state[3, 2, :]
        state[3, 2, :] = state[2, 2, :]
        state[2, 2, :] = temp1, temp2, temp3
        Rubik.rotate_face_clockwise(state[4])
        return state

    def rotate_down_anticlockwise(state):
        temp1, temp2, temp3 = state[1, 2, :]
        state[1, 2, :] = state[2, 2, :]
        state[2, 2, :] = state[3, 2, :]
        state[3, 2, :] = state[5, 0, :][::-1]
        state[5, 0, :][::-1] = temp1, temp2, temp3
        Rubik.rotate_face_anticlockwise(state[4])
        return state

    def turn_tuple(state):
        return tuple(state.reshape(54))

    def is_terminal(state):
        centers = state[:, 1, 1]
        return np.all(state == centers[:, None][:, None])

    def turn_numpy(state):
        return np.array(state).reshape((6, 3, 3))

def heuristic(state, steps_taken):
    def face_heuristic(face):
        count = 0
        for i in range(3):
            for j in range(3):
                if i < 2 and face[i][j] == face[i + 1][j]:
                    count += 1
                if j < 2 and face[i][j] == face[i][j + 1]:
                    count += 1
        return count
    return sum([face_heuristic(face) for face in state]) - 3.0 * steps_taken

class Node:
    def __init__(self, state, steps_taken=0, parent=None, action=None):
        """Initialises a node.
        `state`: A numpy array of dimension 6 x 3 x 3
        `steps_taken`: The number of steps taken from the initial state.
        `parent`: An instance of `Node` that contains the previous state.
        `action`: The action that transitions the previous state to this state.
        """
        self.state = Rubik.turn_tuple(state)
        self.steps_taken = steps_taken
        self.parent = parent
        self.action = action
        self.h_value = heuristic(state, steps_taken)

    def __lt__(self, node):
        """Determines the priority of this node compared to another node.
        This determines the position of this node in the search frontier.
        """
        return self.h_value < node.h_value

    def __hash__(self):
        """Determines the hash value of this node.
        This enables the hashing algorithm of the set of explored states.
        Nodes with the same states must have the same hash values.
        """
        return hash(self.state)

    def __eq__(self, other):
        """Determines the equality of two nodes.
        This enables the hashing algorithm of the set of explored states.
        Since we do not want to re-explore the same states, any two nodes
        with the same state must be equal, regardless of the values of other fields.
        """
        return self.state == other.state
        
    def get_numpy_state(self):
        """Get the numpy array that represents the state contained in this node.
        The numpy array must be of dimension 6 x 3 x 3.
        """
        return Rubik.turn_numpy(self.state)

def reverse_move(move):
    if len(move) == 2:
        return move[0]
    else:
        return move + "'"

def get_terminal_state():
    return np.array([
        [[i for k in range(3)] for j in range(3)] for i in range(6)  # colours 0 to 5, as documented above
    ])

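def generate_puzzle(scramble):
    puzzle = get_terminal_state()
    for action in scramble:
        puzzle = Rubik.apply_action(puzzle, action)
    # The model solution undoes the scramble: reverse the order and invert each move.
    # reversed() is used so that the caller's list is not mutated in place.
    solution = [reverse_move(move) for move in reversed(scramble)]
    return puzzle, solution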

def solve(state, time_limit):
    step_limit = 26
    start_time = time()
    first_node = Node(state)
    frontier = SortedSet([first_node])
    explored = {
        first_node: first_node.steps_taken,
    }

    while len(frontier) != 0:
        curr_node = frontier.pop()
        curr_state = curr_node.get_numpy_state()
        
        for action in Rubik.actions:
            child_state = Rubik.apply_action(curr_state, action)
            if Rubik.is_terminal(child_state):
                actions = [action]
                while curr_node.parent is not None:
                    actions.append(curr_node.action)
                    curr_node = curr_node.parent
                actions.reverse()
                return actions, time() - start_time

            if curr_node.steps_taken == step_limit - 1:
                continue

            child_node = Node(
                child_state,
                steps_taken=curr_node.steps_taken + 1,
                parent=curr_node,
                action=action,
            )
            if child_node in explored:
                if explored[child_node] > child_node.steps_taken:
                    explored[child_node] = child_node.steps_taken
                    frontier.add(child_node)
            else:
                explored[child_node] = child_node.steps_taken
                frontier.add(child_node)

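        if time() - start_time > time_limit:
            return None, time_limit

    # Frontier exhausted without reaching the solved state within the step limit.
    return None, time() - start_time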

## Define the data generator

The data generator randomly generates a scramble, then builds the puzzle and the model solution for it.
The puzzle then goes through the informed search to see if there is a more efficient solution.
A few things to note:
1. The scramble generator must not immediately undo its previous action. This means that, for each move, we prevent the next move from being its inverse. There are many other ways in which later moves can cancel earlier ones, but we only guard against this simplest case.
2. The final input generator compares the solution returned by informed search with the model solution, and keeps the first move of the shorter one.
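To illustrate the first point, here is a small sketch of the check that the scramble generator enforces, together with a scramble that slips through. The helper names `inverse` and `has_immediate_reversal` are hypothetical.

```python
def inverse(move):
    """Return the move that undoes `move`."""
    return move[0] if move.endswith("'") else move + "'"

def has_immediate_reversal(scramble):
    """True if some move is directly followed by its own inverse."""
    return any(b == inverse(a) for a, b in zip(scramble, scramble[1:]))

print(has_immediate_reversal(["L", "L'", "U"]))  # True: L is undone straight away
print(has_immediate_reversal(["L", "R", "L'"]))  # False, yet the scramble is equivalent to just R
```

The second scramble passes the check even though `L` and `L'` still cancel: `L` and `R` turn opposite faces, so they commute. Catching such cases would need a stronger rule than the one used here.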

In [3]:
def generate_scramble(num_of_moves):
    scramble = []
    current_reverse_move = None
    for _ in range(num_of_moves):
        move = random.choice(Rubik.actions)
        while move == current_reverse_move:
            move = random.choice(Rubik.actions)
        current_reverse_move = reverse_move(move)
        scramble.append(move)
    return scramble

In [6]:
test_5_move_scramble = generate_scramble(5)
test_10_move_scramble = generate_scramble(10)
test_15_move_scramble = generate_scramble(15)

print(test_5_move_scramble)
print(test_10_move_scramble)
print(test_15_move_scramble)

assert all([move in Rubik.actions for move in test_5_move_scramble]) and len(test_5_move_scramble) == 5
assert all([move in Rubik.actions for move in test_10_move_scramble]) and len(test_10_move_scramble) == 10
assert all([move in Rubik.actions for move in test_15_move_scramble]) and len(test_15_move_scramble) == 15

['U', "F'", "B'", "U'", 'D']
["U'", "R'", "U'", 'D', "L'", 'F', 'R', "L'", 'D', 'F']
["B'", "R'", 'F', 'R', 'B', 'L', 'R', "B'", 'L', "F'", "D'", 'R', "D'", 'L', 'F']


In [4]:
def generate_input(num_of_moves, time_limit, debug=False):
    scramble = generate_scramble(num_of_moves)
    puzzle, model_solution = generate_puzzle(scramble)
    if debug:
        print(f"Generating solution for {model_solution}")
    puzzle_tuple = Rubik.turn_tuple(puzzle)
    
    solution, time_spent = solve(puzzle, time_limit)
    if debug:
        print(f"Solution found: {solution}, in {time_spent}")
    if solution is None:
        return puzzle_tuple, model_solution[0]
    elif len(solution) < len(model_solution):
        return puzzle_tuple, solution[0]
    else:
        return puzzle_tuple, model_solution[0]

In [8]:
test_time_limit = 5
print(generate_input(5, test_time_limit, debug=True))
print(generate_input(10, test_time_limit, debug=True))
print(generate_input(15, test_time_limit, debug=True))

Generating solution for ['D', "B'", 'R', 'U', 'U']
Solution found: ['D', "B'", 'R', "U'", "U'"], in 0.008527994155883789
((3, 3, 3, 0, 0, 5, 0, 0, 2, 5, 3, 3, 0, 1, 1, 2, 2, 0, 5, 5, 0, 2, 2, 0, 1, 3, 4, 1, 3, 2, 1, 3, 4, 5, 5, 2, 5, 2, 1, 4, 4, 1, 4, 4, 3, 1, 1, 0, 2, 5, 5, 4, 4, 4), 'D')
Generating solution for ['U', 'B', 'B', 'F', "L'", "F'", 'B', "F'", 'R', "L'"]
Solution found: ['U', "B'", 'F', "B'", "L'", 'B', "F'", "F'", 'R', "L'"], in 2.3437442779541016
((0, 5, 1, 3, 0, 1, 3, 5, 1, 5, 4, 4, 0, 1, 4, 5, 3, 4, 5, 4, 2, 2, 2, 2, 1, 5, 2, 0, 0, 0, 0, 3, 1, 0, 3, 2, 2, 1, 3, 0, 4, 2, 1, 1, 4, 4, 4, 3, 5, 5, 2, 3, 3, 5), 'U')
Generating solution for ["F'", 'B', 'L', "R'", 'F', 'L', 'B', "L'", 'R', 'R', "F'", 'B', 'B', 'L', "R'"]
Solution found: None, in 5
((3, 1, 2, 4, 0, 4, 4, 0, 1, 0, 1, 5, 0, 1, 4, 2, 3, 4, 3, 5, 5, 2, 2, 0, 2, 2, 1, 4, 3, 0, 1, 3, 1, 2, 4, 0, 3, 3, 4, 0, 4, 5, 0, 3, 5, 3, 5, 1, 2, 5, 5, 5, 2, 1), "F'")


In [5]:
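def generate_data(initial_data=None, rounds=10, time_limit=5, debug=False):
    action_map = { "L": 0, "L'": 1, "R": 2, "R'": 3, "F": 4, "F'": 5, "B": 6, "B'": 7, "U": 8, "U'": 9, "D": 10, "D'": 11 }
    # Avoid a mutable default argument: a shared `{}` would persist across calls.
    data = {} if initial_data is None else initial_data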
    for _ in range(rounds):
        num_of_moves = random.randint(1, 32)
        puzzle, action = generate_input(num_of_moves, time_limit=time_limit, debug=debug)
        data[puzzle] = action_map[action]
    return data

In [18]:
print(generate_data(debug=True))

Generating solution for ["D'", "D'", 'F', 'R', 'U', 'D', 'R', "L'", "B'", "U'", "B'", "L'", "D'", 'F', "R'", 'B', 'D', 'D', "L'", 'U', 'B', "F'", 'L', "B'", 'F', 'D', 'B', "R'"]
Solution found: None, in 5
Generating solution for ['U', "D'", "U'", 'L', 'D', 'L', "U'", 'B', "F'", 'D', "R'", 'U', 'D', 'R', 'R', 'D', 'R', "U'", "R'", "B'", 'D', 'F', 'D', 'U', 'L', 'D', 'D', "L'", "B'", "L'"]
Solution found: None, in 5
Generating solution for ['L', 'D', 'B']
Solution found: ['L', 'D', 'B'], in 0.0039522647857666016
Generating solution for ['B', 'R', "B'", "U'", 'F', 'R', 'D', 'L', 'L', "U'", 'L', 'U', "B'", 'U', "D'"]
Solution found: None, in 5
Generating solution for ["U'", "D'", "L'", "F'", "D'", "L'", 'U', "L'", "R'", "L'", 'F', "R'", 'L', 'F']
Solution found: None, in 5
Generating solution for ['B', 'F', "U'", "D'", "B'", "L'", 'F', 'R', 'F', "D'", "F'", 'B', "D'", 'L', 'L', "F'", 'R', "U'", "L'", 'R', 'L', 'L', "R'", "R'", 'U', 'D']


KeyboardInterrupt: 

In [6]:
def generate_data_main(filename="data/data.pkl", rounds=10, time_limit=5, debug=True):
    try:
        with open(filename, "rb") as f:
            data = pickle.load(f)
    except FileNotFoundError:
        data = {}

    print(f"Initial data size: {len(data)}")
    if debug:
        print(f"initial data: {data}")

    data = generate_data(initial_data=data, rounds=rounds, time_limit=time_limit, debug=debug)
    print(f"Final data size: {len(data)}")
    with open(filename, "wb") as f:
        pickle.dump(data, f)

In [7]:
#generate_data_main(rounds=5000, debug=False)

In [8]:
def generate_input_no_informed_search(num_of_moves, debug=True):
    scramble = generate_scramble(num_of_moves)
    puzzle, model_solution = generate_puzzle(scramble)
    puzzle_tuple = Rubik.turn_tuple(puzzle)

    return puzzle_tuple, model_solution[0]

In [16]:
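def generate_data_no_informed_search(initial_data=None, rounds=10, debug=True):
    action_map = { "L": 0, "L'": 1, "R": 2, "R'": 3, "F": 4, "F'": 5, "B": 6, "B'": 7, "U": 8, "U'": 9, "D": 10, "D'": 11 }
    # Avoid a mutable default argument: a shared `{}` would persist across calls.
    data = {} if initial_data is None else initial_data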
    for i in range(rounds):
        num_of_moves = random.randint(1, 26) # requires at most 26 moves to solve
        puzzle, action = generate_input_no_informed_search(num_of_moves, debug)
        data[puzzle] = (action_map[action], num_of_moves)
        if debug and i % 10000 == 0:
            print(f'{i} data points generated') 
    return data

In [17]:
def generate_data_no_informed_search_main(filename="data/data_no_informed_search.pkl", rounds=10, debug=True):
    try:
        with open(filename, "rb") as f:
            data = pickle.load(f)
    except FileNotFoundError:
        data = {}

    print(f"Initial data size: {len(data)}")
    if debug:
        print(f"initial data: {data}")

    data = generate_data_no_informed_search(initial_data=data, rounds=rounds, debug=debug)
    print(f"Final data size: {len(data)}")
    with open(filename, "wb") as f:
        pickle.dump(data, f)

In [18]:
generate_data_no_informed_search_main(rounds=300000)

Initial data size: 0
initial data: {}
0 data points generated
10000 data points generated
20000 data points generated
30000 data points generated
40000 data points generated
50000 data points generated
60000 data points generated
70000 data points generated
80000 data points generated
90000 data points generated
100000 data points generated
110000 data points generated
120000 data points generated
130000 data points generated
140000 data points generated
150000 data points generated
160000 data points generated
170000 data points generated
180000 data points generated
190000 data points generated
200000 data points generated
210000 data points generated
220000 data points generated
230000 data points generated
240000 data points generated
250000 data points generated
260000 data points generated
270000 data points generated
280000 data points generated
290000 data points generated
Final data size: 257664


## The neural net

We define the class representing the neural network. This is the model to be trained.

The architecture has 3 fully connected layers, with a LeakyReLU activation (negative slope 0.1) after each of the first two:

* 54 -> 4096
* 4096 -> 1024
* 1024 -> 12

In [11]:
class NeuralNetStep(nn.Module):
    def __init__(self):
        super().__init__()
        self.output = nn.Sequential(
            nn.Linear(54, 4096),
            nn.LeakyReLU(0.1),
            nn.Linear(4096, 1024),
            nn.LeakyReLU(0.1),
            nn.Linear(1024, 12),
        )

    def forward(self, x):
        return self.output(x)
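For a sense of scale, the number of trainable parameters can be worked out by hand from the layer sizes in the `NeuralNetStep` code above. The sketch below does the arithmetic in plain Python; `linear_params` is an illustrative helper that counts the weights and biases of one fully connected layer.

```python
def linear_params(n_in, n_out):
    """Weights (n_in * n_out) plus biases (n_out) of one fully connected layer."""
    return n_in * n_out + n_out

# Layer sizes taken from the nn.Linear calls in NeuralNetStep.
layers = [(54, 4096), (4096, 1024), (1024, 12)]
per_layer = [linear_params(n_in, n_out) for n_in, n_out in layers]
total = sum(per_layer)

print(per_layer)  # [225280, 4195328, 12300]
print(total)      # 4432908
```

Almost all of the roughly 4.4 million parameters sit in the middle layer.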

## Load training data

We load the data from the file generated in the previous step, then split it into training data and test data.

In [12]:
def load_data(filename="data/data.pkl", test=0.2):
    """Loads training data from the indicated file, with a portion dedicated for test data
    The returned value is a tuple of (x_train, y_train, x_test, y_test), each is a torch tensor.
    """
    with open(filename, "rb") as f:
        data = pickle.load(f)
    print(f"Data size: {len(data)}")
    
    test_size = round(test * len(data))
    train_size = len(data) - test_size
    x_train = []
    x_test = []
    y_train = []
    y_test = []
    for puzzle, move in data.items():
        p = random.random()
        if p < test_size / (test_size + train_size):
            x_test.append(puzzle)
            y_test.append(move)
            test_size -= 1
        else:
            x_train.append(puzzle)
            y_train.append(move)
            train_size -= 1

    return (
        torch.tensor(x_train, dtype=torch.float),
        torch.tensor(y_train, dtype=torch.long),
        torch.tensor(x_test, dtype=torch.float),
        torch.tensor(y_test, dtype=torch.long),
    )
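The split in `load_data` picks each item for the test set with probability (test slots left) / (items left), which is a form of selection sampling. That is why the asserts in the next cell can expect exact sizes rather than approximate ones. A minimal standalone sketch of the same idea follows; the function name `split_exact` and its `rng` parameter are illustrative and not part of the notebook.

```python
import random

def split_exact(items, test_fraction, rng=random):
    """Randomly split `items` so that the test set has exactly the requested size."""
    test_left = round(test_fraction * len(items))
    train_left = len(items) - test_left
    train, test = [], []
    for item in items:
        # The probability reaches 1 when only test slots remain, and 0 when none remain.
        if rng.random() < test_left / (test_left + train_left):
            test.append(item)
            test_left -= 1
        else:
            train.append(item)
            train_left -= 1
    return train, test

train, test = split_exact(list(range(10)), 0.2)
print(len(train), len(test))  # always 8 2, whichever items are chosen
```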

In [91]:
test_x_train, test_y_train, test_x_test, test_y_test = load_data(test=0.2)
with open("data/data.pkl", "rb") as test_f:
    test_data = pickle.load(test_f)
expected_test_size = round(len(test_data) * 0.2)
expected_train_size = len(test_data) - expected_test_size
assert test_x_train.size() == torch.Size([expected_train_size, 54])
assert test_y_train.size() == torch.Size([expected_train_size])
assert test_x_test.size() == torch.Size([expected_test_size, 54])
assert test_y_test.size() == torch.Size([expected_test_size])

Data size: 46108


In [20]:
def load_data_with_num_of_moves(filename="data/data_no_informed_search.pkl", test=0.2):
    """Loads training data from the indicated file, with a portion dedicated for test data
    The returned value is a tuple of (x_train, y_train, x_test, y_test, num_of_moves_test), each is a torch tensor.
    """
    with open(filename, "rb") as f:
        data = pickle.load(f)
    print(f"Data size: {len(data)}")
    
    test_size = round(test * len(data))
    train_size = len(data) - test_size
    x_train = []
    x_test = []
    y_train = []
    y_test = []
    num_of_moves_test = []
    for puzzle, solution in data.items():
        # solution is a (next_move, num_of_moves) tuple
        p = random.random()
        if p < test_size / (test_size + train_size):
            x_test.append(puzzle)
            y_test.append(solution[0])
            num_of_moves_test.append(solution[1])
            test_size -= 1
        else:
            x_train.append(puzzle)
            y_train.append(solution[0])
            train_size -= 1

    return (
        torch.tensor(x_train, dtype=torch.float),
        torch.tensor(y_train, dtype=torch.long),
        torch.tensor(x_test, dtype=torch.float),
        torch.tensor(y_test, dtype=torch.long),
        torch.tensor(num_of_moves_test)
    )

In [60]:
test_x_train, test_y_train, test_x_test, test_y_test, test_num_of_moves_test = load_data_with_num_of_moves(test=0.2)
with open("data/data_no_informed_search.pkl", "rb") as test_f:
    test_data = pickle.load(test_f)
expected_test_size = round(len(test_data) * 0.2)
expected_train_size = len(test_data) - expected_test_size
assert test_x_train.size() == torch.Size([expected_train_size, 54])
assert test_y_train.size() == torch.Size([expected_train_size])
assert test_x_test.size() == torch.Size([expected_test_size, 54])
assert test_y_test.size() == torch.Size([expected_test_size])
assert test_num_of_moves_test.size() == torch.Size([expected_test_size])

Data size: 856967


## Training loop

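We define a training loop that takes in the following parameters:
* `x_train`: The input tensor of size `(train_size, 54)`
* `y_train`: The label tensor of size `(train_size,)`
* `initial_model`, `initial_optimiser`: An optional model and optimiser to resume from; fresh ones are created when omitted
* `epochs`: Number of training iterations
* `batch_size`: If not positive, every iteration uses the full training set; otherwise the data is cut into batches and each batch is trained for `epochs` iterations in turn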

The training loop then returns a neural network model that is trained on the given data.

In [13]:
def train_model(x_train, y_train, initial_model=None, initial_optimiser=None, epochs=100, batch_size=-1):
    model = initial_model if initial_model else NeuralNetStep()
    optimiser = initial_optimiser if initial_optimiser else torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-6)
    loss_fn = nn.CrossEntropyLoss()

    if batch_size <= 0:
        for i in range(epochs):
            optimiser.zero_grad()
            pred = model.forward(x_train)
            loss = loss_fn(pred, y_train)
            loss.backward()
            optimiser.step()
            if i % 10 == 0:
                print(f"Completed epoch {i} with previous loss {loss}")
    else:
        batch_n = 0
        total_batch_num = math.ceil(len(x_train) / batch_size)
        while batch_n < total_batch_num:
            features = x_train[batch_n * batch_size : (batch_n + 1) * batch_size]
            targets = y_train[batch_n * batch_size : (batch_n + 1) * batch_size]
            
            for i in range(epochs):        
                optimiser.zero_grad()
                pred = model.forward(features)
                loss = loss_fn(pred, targets)
                loss.backward()
                optimiser.step()
                if i == 0:
                    print(f'Training batch {batch_n+1}/{total_batch_num}, initial loss of this batch is {loss}')
            batch_n += 1

    return model
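In the batched branch of `train_model`, each batch is trained for `epochs` iterations before the next batch is touched. A more conventional schedule visits every batch once per epoch, in a freshly shuffled order. The sketch below only shows that iteration order in plain Python, as a possible alternative and not as the notebook's method; `minibatch_slices` and its `seed` parameter are hypothetical.

```python
import random

def minibatch_slices(n_samples, batch_size, epochs, seed=0):
    """Yield (epoch, indices) pairs, reshuffling the sample order every epoch."""
    rng = random.Random(seed)
    for epoch in range(epochs):
        order = list(range(n_samples))
        rng.shuffle(order)
        for start in range(0, n_samples, batch_size):
            yield epoch, order[start:start + batch_size]

for epoch, batch in minibatch_slices(n_samples=10, batch_size=4, epochs=2):
    print(epoch, batch)  # three batches per epoch, of sizes 4, 4 and 2
```

Inside such a loop, each batch of indices would drive a single optimiser step on `x_train[indices]` and `y_train[indices]`.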

In [120]:
x_train, y_train, x_test, y_test = load_data()
model = train_model(x_train, y_train, epochs=100)

Data size: 46108
Completed epoch 0 with previous loss 2.489006519317627
Completed epoch 10 with previous loss 2.4556515216827393
Completed epoch 20 with previous loss 2.4311633110046387
Completed epoch 30 with previous loss 2.4154109954833984
Completed epoch 40 with previous loss 2.3957443237304688
Completed epoch 50 with previous loss 2.3731448650360107
Completed epoch 60 with previous loss 2.343139410018921
Completed epoch 70 with previous loss 2.3102376461029053
Completed epoch 80 with previous loss 2.2797844409942627
Completed epoch 90 with previous loss 2.249770402908325


In [26]:
x_train_2, y_train_2, x_test_2, y_test_2, num_of_moves_test_2 = load_data_with_num_of_moves()
#model_2 = train_model(x_train_2, y_train_2, epochs=300)
model_2_alt = train_model(x_train_2, y_train_2, epochs=100)


Data size: 257664
Completed epoch 0 with previous loss 2.541945219039917
Completed epoch 10 with previous loss 2.494835615158081
Completed epoch 20 with previous loss 2.4463489055633545
Completed epoch 30 with previous loss 2.4417881965637207
Completed epoch 40 with previous loss 2.432758331298828
Completed epoch 50 with previous loss 2.4268739223480225
Completed epoch 60 with previous loss 2.420896530151367
Completed epoch 70 with previous loss 2.414188861846924
Completed epoch 80 with previous loss 2.4067935943603516
Completed epoch 90 with previous loss 2.39858078956604


## Evaluate the model

We evaluate the model using the test data, to determine if we need further training.

The evaluate function takes in the following parameters:
* `model`: The trained model
* `x_test`: The test data
* `y_test`: The test label

The returned value is a float between `0` and `1`: the fraction of test positions for which the predicted move matches the label.
Value `0` means no prediction matches, and `1` means every prediction matches.
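Two reference points help when reading the numbers in this notebook. With 12 possible moves, a uniformly random guess is right 1 time in 12, and a model that assigns equal probability to every move has a cross-entropy loss of ln(12). The short calculation below is plain arithmetic and does not depend on the notebook's code.

```python
import math

num_moves = 12
chance_accuracy = 1 / num_moves    # accuracy of a uniformly random guess
uniform_loss = math.log(num_moves)  # cross-entropy of a uniform prediction

print(round(chance_accuracy, 4))  # 0.0833
print(round(uniform_loss, 4))     # 2.4849
```

A training loss near 2.48 therefore means the model is still close to guessing uniformly.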

In [14]:
def evaluate(model, x_test, y_test):
    # Inference only: skip gradient tracking to save memory.
    with torch.no_grad():
        output = model(x_test)
    predictions = torch.argmax(output, dim=1)
    corrects = predictions == y_test
    for i in range(12):
        print(f"Accuracy to predict move {i} is {torch.sum(y_test[corrects] == i) / torch.sum(y_test == i)}")
    
    return (torch.sum(corrects) / y_test.shape[0]).item()
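The per-move figure printed by `evaluate` is the share of test positions labelled with move `i` that the model predicts correctly, in other words the per-class recall. The sketch below reproduces both quantities on a tiny made-up example with plain Python lists; `accuracy_and_recall` is an illustrative name and the data is invented.

```python
def accuracy_and_recall(predictions, labels, num_classes=12):
    """Return overall accuracy and a dict of per-class recall."""
    correct = [p == y for p, y in zip(predictions, labels)]
    accuracy = sum(correct) / len(labels)
    recall = {}
    for c in range(num_classes):
        total = sum(1 for y in labels if y == c)
        if total:  # skip classes that never appear in the labels
            hits = sum(1 for ok, y in zip(correct, labels) if ok and y == c)
            recall[c] = hits / total
    return accuracy, recall

# Invented labels and predictions over 3 classes.
labels      = [0, 0, 1, 1, 1, 2]
predictions = [0, 1, 1, 1, 0, 2]
acc, rec = accuracy_and_recall(predictions, labels, num_classes=3)
print(acc)  # 4 correct out of 6
print(rec)  # class 0: 1 of 2, class 1: 2 of 3, class 2: 1 of 1
```

Unlike the tensor version, this sketch skips classes with no test examples instead of dividing by zero.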

In [122]:
evaluate(model, x_test, y_test)

Accuracy to predict move 0 is 0.17965653538703918
Accuracy to predict move 1 is 0.15961800515651703
Accuracy to predict move 2 is 0.1260606050491333
Accuracy to predict move 3 is 0.10443864017724991
Accuracy to predict move 4 is 0.2882273197174072
Accuracy to predict move 5 is 0.09919571131467819
Accuracy to predict move 6 is 0.20420792698860168
Accuracy to predict move 7 is 0.22884881496429443
Accuracy to predict move 8 is 0.1595330685377121
Accuracy to predict move 9 is 0.14606741070747375
Accuracy to predict move 10 is 0.23882503807544708
Accuracy to predict move 11 is 0.18005181849002838


0.17566688358783722

In [24]:
def evaluate_with_num_of_moves(model, x_test, y_test, num_of_moves_test):
    # Inference only: skip gradient tracking to save memory.
    with torch.no_grad():
        output = model(x_test)
    predictions = torch.argmax(output, dim=1)
    corrects = predictions == y_test
    # for i in range(12):
    #     print(f"Accuracy to predict move {i} is {torch.sum(y_test[corrects] == i) / torch.sum(y_test == i)}")
    for num_of_moves in range(1, 27):
        # Use the function argument rather than the notebook-level `num_of_moves_test_2`.
        mask = num_of_moves_test == num_of_moves
        acc = torch.sum(predictions[mask] == y_test[mask]) / y_test[mask].shape[0]
        print(f'Accuracy for puzzle with number of moves {num_of_moves} is {acc}')
    
    return (torch.sum(corrects) / y_test.shape[0]).item()

In [25]:
evaluate_with_num_of_moves(model_2, x_test_2, y_test_2, num_of_moves_test_2)

Accuracy for puzzle with number of moves 1 is 1.0
Accuracy for puzzle with number of moves 2 is 0.8636363744735718
Accuracy for puzzle with number of moves 3 is 0.8756219148635864
Accuracy for puzzle with number of moves 4 is 0.7282700538635254
Accuracy for puzzle with number of moves 5 is 0.6015543937683105
Accuracy for puzzle with number of moves 6 is 0.48658648133277893
Accuracy for puzzle with number of moves 7 is 0.3951111137866974
Accuracy for puzzle with number of moves 8 is 0.32905226945877075
Accuracy for puzzle with number of moves 9 is 0.2857760488986969
Accuracy for puzzle with number of moves 10 is 0.2430117279291153
Accuracy for puzzle with number of moves 11 is 0.21234354376792908
Accuracy for puzzle with number of moves 12 is 0.19413763284683228
Accuracy for puzzle with number of moves 13 is 0.16855822503566742
Accuracy for puzzle with number of moves 14 is 0.15143488347530365
Accuracy for puzzle with number of moves 15 is 0.13884082436561584
Accuracy for puzzle with nu

0.2074010819196701

## Save the model

We now write the logic to save and load the trained model, so that training can be resumed later. This is done with the help of [this StackOverflow post](https://stackoverflow.com/questions/63655048/how-can-i-save-my-training-progress-in-pytorch-for-a-certain-batch-no).

In [31]:
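def save_checkpoint(model, optimiser=None, save_path="data/model"):
    checkpoint = {'model': model.state_dict()}
    # The optimiser state is only saved when an optimiser is given.
    if optimiser is not None:
        checkpoint['optimiser'] = optimiser.state_dict()
    torch.save(checkpoint, save_path)

def load_checkpoint(load_path="data/model"):
    model = NeuralNetStep()
    optimiser = torch.optim.Adam(model.parameters())
    # No checkpoint yet: start from a fresh model and optimiser.
    if not os.path.isfile(load_path):
        return model, optimiser

    checkpoint = torch.load(load_path)
    model.load_state_dict(checkpoint['model'])
    # Older checkpoints may not contain the optimiser state.
    if 'optimiser' in checkpoint:
        optimiser.load_state_dict(checkpoint['optimiser'])
    return model, optimiser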

In [32]:
save_checkpoint(model_2, None, save_path="data/model2")

In [37]:
def train_model_main(epochs=1000):
    """Resume training model from saved data
    The returned value is a tuple of `(model, accuracy)`,
    where `model` is the trained model, and `accuracy` is the accuracy of the current model.
    """
    x_train, y_train, x_test, y_test = load_data()
    model, optimiser = load_checkpoint()
    model = train_model(x_train, y_train, initial_model=model, initial_optimiser=optimiser, epochs=epochs)
    save_checkpoint(model, optimiser)
    accuracy = evaluate(model, x_test, y_test)
    return model, accuracy

In [40]:
model, accuracy = train_model_main(epochs=2000)
print(f"Current accuracy: {accuracy}")

Data size: 17028
Training loop 0
Training loop 100
Training loop 200
Training loop 300
Training loop 400
Training loop 500
Training loop 600
Training loop 700
Training loop 800
Training loop 900
Training loop 1000
Training loop 1100
Training loop 1200
Training loop 1300
Training loop 1400
Training loop 1500
Training loop 1600
Training loop 1700
Training loop 1800
Training loop 1900
Current accuracy: 0.3464474380016327
