In [14]:
import numpy as np
import pandas as pd
from ast import literal_eval
from pathlib import Path
from pprint import pprint
from sympy.combinatorics import Permutation
import matplotlib.pyplot as plt

# Directory holding the puzzle CSV files, relative to the notebook's working directory.
data_dir = Path("data")

# Load puzzle info, indexed by puzzle type.
puzzle_info = pd.read_csv(data_dir / 'puzzle_info.csv', index_col='puzzle_type')
# allowed_moves is read as text; parse each cell into a Python object (used below as a dict).
puzzle_info['allowed_moves'] = puzzle_info['allowed_moves'].apply(literal_eval)
# Number of moves defined for each puzzle type.
puzzle_info['number_moves'] = puzzle_info['allowed_moves'].apply(len)

# Load puzzles and split the ';'-separated state strings into lists of labels.
puzzles_all = pd.read_csv(data_dir / 'puzzles.csv', index_col='id').assign(
    initial_state=lambda df: df['initial_state'].str.split(';'),
    solution_state=lambda df: df['solution_state'].str.split(';'),
)
# Keep only the cube_3/3/3 puzzles. The explicit .copy() makes the subset an independent
# frame, so the column assignments below do not raise pandas' SettingWithCopyWarning.
puzzles_all = puzzles_all[puzzles_all['puzzle_type'] == 'cube_3/3/3'].copy()

# Per-puzzle summary of the solution state: its length, its distinct labels, and how many.
puzzles_all['total_components'] = puzzles_all['solution_state'].apply(len)
puzzles_all['all_unique_components'] = puzzles_all['solution_state'].apply(lambda x: np.unique(x))
puzzles_all['unique_components'] = puzzles_all['all_unique_components'].apply(len)



In [6]:
# Forward moves of the 3x3x3 cube, each converted from a list to an np.array of indices.
allowed_moves = {
    name: np.array(perm)
    for name, perm in puzzle_info.loc['cube_3/3/3']['allowed_moves'].items()
}
# Add the inverse of every move under "-<name>"; argsort of a permutation is its inverse.
inverse_moves = {f"-{name}": np.argsort(perm) for name, perm in allowed_moves.items()}
allowed_moves.update(inverse_moves)

# Names of all moves: the forward moves first, followed by their inverses.
move_key = list(allowed_moves)

print(move_key)

['f0', 'f1', 'f2', 'r0', 'r1', 'r2', 'd0', 'd1', 'd2', '-f0', '-f1', '-f2', '-r0', '-r1', '-r2', '-d0', '-d1', '-d2']


In [53]:

from sklearn.preprocessing import OneHotEncoder

def preprocess_cube_state(state, encoder):
    '''
    One-hot encode a flat cube state into the layout fed to the CNN.

    Args:
    state: 1D array holding the 54 sticker labels (6 faces x 3 x 3)
    encoder: previously fit OneHotEncoder; its transform() result must offer .toarray()

    Return: array of shape (6, 3, 3, n_categories), one one-hot vector per sticker
    '''
    # Passing through (6, 3, 3) rejects any state that does not hold exactly 54 labels;
    # the encoder itself is given a single column with one label per row.
    label_column = state.reshape(6, 3, 3).reshape(-1, 1)

    # Densify the encoder output before reshaping it.
    dense_codes = encoder.transform(label_column).toarray()

    # Restore the face/row/column axes; the trailing axis is the one-hot vector.
    return dense_codes.reshape(6, 3, 3, -1)



In [45]:
def apply_move(state, move):
    """
    Permute a cube state with a single move.

    Args:
    state (np.array): The current state of the cube.
    move (np.array): Index array describing the move; element i of the result is
        state[move[i]]. Callers in this notebook pass allowed_moves[name], not the name.

    Returns:
    np.array: The state obtained by indexing `state` with `move`.
    """
    permuted_state = state[move]
    return permuted_state

import random

def generate_random_state(solution_state, allowed_moves, used_states, min_moves=3, max_moves=20, rng=None):
    """
    Generate a random cube state by applying a series of random moves to the solution state.

    A new move sequence is drawn until the resulting state is not in `used_states`; that
    state is then added to `used_states` and returned. The function never returns None.
    It does not terminate if every state it can reach is already in `used_states`.

    Args:
    solution_state (np.array): The solution state of the cube (copied, never modified).
    allowed_moves (dict): Maps move name -> index array accepted by apply_move.
    used_states (set): Tuples of states already used/generated; updated in place.
    min_moves (int): Minimum number of random moves (inclusive).
    max_moves (int): Maximum number of random moves (inclusive).
    rng (random.Random, optional): Source of randomness. Defaults to the module-level
        `random` generator; pass a seeded random.Random for reproducible draws.

    Returns:
    np.array: A randomly manipulated state of the cube that was not in `used_states`.
    list: The list of move names applied to reach this state.
    int: The number of moves applied. This is the scramble length; moves may cancel
        each other, so the state can be closer to the solution than this number.
    """
    if rng is None:
        rng = random  # same global generator the function used before `rng` existed

    # The available move names do not change between attempts, so list them once.
    move_names = list(allowed_moves.keys())

    while True:
        num_moves = rng.randint(min_moves, max_moves)
        move_sequence = rng.choices(move_names, k=num_moves)

        current_state = solution_state.copy()
        for move in move_sequence:
            current_state = apply_move(current_state, allowed_moves[move])

        state_tuple = tuple(current_state.flatten())  # hashable form for the set lookup
        if state_tuple not in used_states:
            used_states.add(state_tuple)
            return current_state, move_sequence, num_moves
        # Duplicate state: loop again with a fresh sequence.



In [43]:
# Solved configuration that every scramble starts from (row 30 of the filtered puzzles).
solution_state = np.array(puzzles_all.iloc[30]['solution_state'])

# Seed the set with the solved state *before* generating, so a scramble whose moves
# cancel out cannot be handed back as a "random" state.
used_states = {tuple(solution_state.flatten())}

# Example of generating a unique state and its associated move count
random_state, move_sequence, num_moves = generate_random_state(
    solution_state, allowed_moves, used_states, min_moves=3, max_moves=20
)

print(f"Number of moves: {num_moves}")
print(f"Move sequence: {move_sequence}")
print(f"Random state:\n{random_state}")
print(type(random_state))

Number of moves: 3
Move sequence: ['r0', '-f1', '-r2']
Random state:
['D' 'A' 'B' 'D' 'C' 'C' 'D' 'A' 'B' 'A' 'B' 'F' 'C' 'B' 'F' 'A' 'B' 'F'
 'C' 'D' 'C' 'C' 'F' 'C' 'C' 'F' 'C' 'A' 'D' 'F' 'A' 'D' 'E' 'A' 'D' 'F'
 'E' 'E' 'E' 'A' 'A' 'B' 'E' 'E' 'E' 'B' 'F' 'D' 'B' 'E' 'E' 'B' 'F' 'D']
<class 'numpy.ndarray'>


In [60]:
import h5py

# Define all possible labels in the cube states (one column, as the encoder expects)
all_possible_labels = np.array(['A', 'B', 'C', 'D', 'E', 'F']).reshape(-1, 1)
# Fit the OneHotEncoder with all possible labels
encoder = OneHotEncoder()
encoder.fit(all_possible_labels)

# Main script to generate the dataset
num_samples = 300000
batch_size = 10000  # Save data every 10000 samples

# Start with the solved state marked as used so it is never emitted as a sample.
# generate_random_state adds every state it returns, so nothing else needs to touch this set.
used_states = {tuple(solution_state.flatten())}

# Buffers for the batch currently being built
states = []
move_counts = []

# The context manager closes the file even if generation fails part-way through.
with h5py.File('cube_data.h5', 'w') as h5_file:
    for i in range(num_samples):
        random_state, move_sequence, num_moves = generate_random_state(solution_state, allowed_moves,
                                                                       used_states, min_moves=3, max_moves=20)

        # One-hot encode the raw state, then store it flattened to 1D
        encoded_state = preprocess_cube_state(random_state, encoder)
        states.append(encoded_state.flatten())
        move_counts.append(num_moves)

        # Save and clear lists in batches to reduce memory usage
        if (i + 1) % batch_size == 0 or (i + 1) == num_samples:
            print(f"writing {i + 1} of {num_samples}")
            batch_index = i // batch_size

            # Convert lists to numpy arrays for saving
            states_array = np.array(states)
            move_counts_array = np.array(move_counts)

            # Save the batch to file
            h5_file.create_dataset(f'states_batch_{batch_index}', data=states_array)
            h5_file.create_dataset(f'move_counts_batch_{batch_index}', data=move_counts_array)

            # Clear lists for the next batch
            states.clear()
            move_counts.clear()

    # Record how many batches were written (ceiling division) so readers need not guess.
    h5_file.attrs['num_batches'] = (num_samples + batch_size - 1) // batch_size



writing 10000 of 300000
writing 20000 of 300000
writing 30000 of 300000
writing 40000 of 300000
writing 50000 of 300000
writing 60000 of 300000
writing 70000 of 300000
writing 80000 of 300000
writing 90000 of 300000
writing 100000 of 300000
writing 110000 of 300000
writing 120000 of 300000
writing 130000 of 300000
writing 140000 of 300000
writing 150000 of 300000
writing 160000 of 300000
writing 170000 of 300000
writing 180000 of 300000
writing 190000 of 300000
writing 200000 of 300000
writing 210000 of 300000
writing 220000 of 300000
writing 230000 of 300000
writing 240000 of 300000
writing 250000 of 300000
writing 260000 of 300000
writing 270000 of 300000
writing 280000 of 300000
writing 290000 of 300000
writing 300000 of 300000


In [63]:
# Peek at the first stored batch to check what was written.
i = 0
# Open read-only inside a context manager so the handle is released afterwards.
with h5py.File('cube_data.h5', 'r') as h5_file:
    # [:] reads the datasets into in-memory arrays, usable after the file is closed.
    states_batch = h5_file[f'states_batch_{i}'][:]
    move_counts_batch = h5_file[f'move_counts_batch_{i}'][:]

# Show only the first encoded state; `state` stays bound to it after the loop.
for state in states_batch:
    print(state.shape)
    print(state)
    break

(324,)
[1. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 1. 0. 0. 0. 0. 0.
 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 1. 0.
 0. 0. 1. 0. 0. 0. 0. 1. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0.
 0. 0. 0. 1. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 1. 0.
 0. 0. 0. 0. 1. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 1. 0. 0. 0. 0.
 0. 1. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 1. 0. 0. 0.
 0. 0. 0. 0. 0. 1. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 1. 0. 0. 0. 0. 0.
 0. 0. 1. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0.
 0. 1. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 1. 0. 0. 0.
 0. 0. 0. 1. 0. 0. 0. 0. 0. 1. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1.
 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 1. 0. 0.
 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 1. 0.
 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 1. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1.
 0. 0. 0. 0. 0. 1. 1. 0. 0. 0. 0. 0.]


In [71]:
def reshape_data(flattened_data, n_categories):
    """
    Reshape one flattened one-hot encoded cube state back to (6, 3, 3, n_categories).

    Args:
    flattened_data (np.array): The flattened one-hot encoded data for a single state;
        it must hold exactly 6 * 3 * 3 * n_categories elements.
    n_categories (int): Number of categories used in one-hot encoding.

    Returns:
    np.array: 4D array of shape (6, 3, 3, n_categories): 6 faces of 3x3 stickers, each
        with its one-hot vector on the last axis.

    Raises:
    ValueError: (from numpy) if the element count does not match the target shape.
    """
    return flattened_data.reshape((6, 3, 3, n_categories))

# Sanity check on `state`, the first encoded sample left bound by the inspection loop above.
reshape_data(state, 6).shape

(6, 3, 3, 6)

In [79]:
def load_and_preprocess_data(h5_file, num_batches=2, n_categories=6):
    """
    Load batches of flattened one-hot states and their move counts from an open HDF5 file.

    Reads the datasets named 'states_batch_<i>' and 'move_counts_batch_<i>' for
    i in range(num_batches) and restores each state to shape (6, 3, 3, n_categories).

    Args:
    h5_file: Open h5py.File (or any mapping from dataset name to an array-like that
        supports [:]).
    num_batches (int or None): How many batches to load, starting at batch 0. Defaults
        to 2, the value previously hard-coded here. Pass None to load every
        'states_batch_*' dataset present in the file.
    n_categories (int): Number of categories used in the one-hot encoding.

    Returns:
    np.array: States with shape (n_samples, 6, 3, 3, n_categories).
    np.array: Move counts with shape (n_samples,).
    """
    if num_batches is None:
        # Batches are numbered consecutively from 0, so counting the names is enough.
        num_batches = sum(1 for key in h5_file.keys() if key.startswith('states_batch_'))

    state_batches = []
    move_count_batches = []
    for i in range(num_batches):
        states_batch = h5_file[f'states_batch_{i}'][:]
        move_counts_batch = h5_file[f'move_counts_batch_{i}'][:]

        # Reshape the whole batch at once instead of one state at a time.
        state_batches.append(states_batch.reshape((len(states_batch), 6, 3, 3, n_categories)))
        move_count_batches.append(move_counts_batch)

    if not state_batches:
        # Nothing requested or nothing stored: return empty arrays of the right rank.
        return np.empty((0, 6, 3, 3, n_categories)), np.empty((0,), dtype=int)

    return np.concatenate(state_batches), np.concatenate(move_count_batches)

# Open the HDF5 file read-only and load the data; the context manager closes the file
# even if loading raises.
with h5py.File('cube_data.h5', 'r') as h5_file:
    states, move_counts = load_and_preprocess_data(h5_file)


In [81]:
# Expect (n_samples, 6, 3, 3, 6): one 6x3x3 grid of 6-way one-hot vectors per loaded state.
states.shape

(20000, 6, 3, 3, 6)

If the goal of your CNN is to predict the number of moves away from the goal state for a given cube state, you should treat this as a regression problem. In a regression setting, the CNN will output a continuous value representing the estimated number of moves, rather than classifying the input into categories.

Here's how you can adapt the CNN architecture, compilation, and training process for a regression task:

## 1. Define the CNN Architecture for Regression
For regression, the output layer will have a single neuron (since you're predicting a single continuous value - the number of moves), and you will typically use a linear activation function (or no activation function) for the output layer.

In [87]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv3D, MaxPooling3D, Flatten, Dense, Dropout

# Layer stack of the regression CNN. The input is one encoded cube state of shape
# (6, 3, 3, 6); the last axis (the one-hot vector) is treated as the channel axis.
cnn_layers = [
    Conv3D(filters=32, kernel_size=(3, 3, 3), padding='same', activation='relu',
           input_shape=(6, 3, 3, 6)),
    MaxPooling3D(pool_size=(2, 2, 2)),
    Conv3D(filters=64, kernel_size=(3, 3, 3), padding='same', activation='relu'),
    # No second pooling layer here (the original had it commented out).
    Flatten(),
    Dense(units=128, activation='relu'),
    Dropout(rate=0.5),
    Dense(units=1),  # Single neuron for regression output
]

model = Sequential(cnn_layers)

## 2. Compile the Model for Regression
When compiling the model for a regression task, use a loss function suitable for regression, like Mean Squared Error (MSE).

## 3. Train the Model
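Train your model on the states and their corresponding move counts. Ensure that move_counts contains numerical values (not categorical labels). Note that each move count here is the length of the random scramble that produced the state, so it is an upper bound on the true distance to the solved state rather than the exact distance (for example, a move followed by its inverse cancels out).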

In [90]:
# Features and targets must agree on the number of samples (first axis) before training.
print(states.shape, move_counts.shape, sep='\n')

(20000, 6, 3, 3, 6)
(20000,)


In [91]:
# Mean squared error suits the single continuous output; Adam is the optimizer.
model.compile(loss='mean_squared_error', optimizer='adam')

# Train for 10 epochs, holding out 20% of the samples for validation.
history = model.fit(x=states, y=move_counts, validation_split=0.2, epochs=10)



Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


## 4. Evaluate the Model
Evaluate the model using a regression metric like Mean Squared Error or Mean Absolute Error on a test set.

In [102]:
# Build a held-out test set of freshly scrambled states.
num_test_samples = 5000

states_test = []
move_counts_test = []
used_states_test = set()  # raw (un-encoded) states that ended up in the test set

for i in range(num_test_samples):
    # Passing the shared `used_states` set (filled while generating the training data)
    # means a test state can never coincide with a previously generated state.
    random_state, move_sequence, num_moves = generate_random_state(solution_state, allowed_moves,
                                                                   used_states, min_moves=3, max_moves=20)

    # Remember the raw state, keyed the same way as `used_states`.
    used_states_test.add(tuple(random_state.flatten()))

    # One-hot encode and keep the (6, 3, 3, 6) layout: the model takes it unflattened.
    states_test.append(preprocess_cube_state(random_state, encoder))
    move_counts_test.append(num_moves)


states_test = np.array(states_test)
move_counts_test = np.array(move_counts_test)

print(f"shape of states_test: {states_test.shape}")
print(f"shape of move_counts_test: {move_counts_test.shape}")

shape of states_test: (5000, 6, 3, 3, 6)
shape of move_counts_test: (5000,)


In [103]:
# The model was compiled with mean squared error as its loss and no extra metrics,
# so the value returned here is the MSE on the test set.
test_loss = model.evaluate(x=states_test, y=move_counts_test)
print(f"Test Loss (Mean Squared Error): {test_loss}")


Test Loss (Mean Squared Error): 13.279394149780273
