In [None]:

import zipfile
from google.colab import drive

# Mount Google Drive under /content/drive/ (Colab runtime only; may prompt for authorisation).
drive.mount('/content/drive/')


Mounted at /content/drive/


In [None]:
# Unpack the project archive from the mounted Drive into /content/sample_data.
# The context manager closes the archive even if extraction raises part-way.
with zipfile.ZipFile("/content/drive/MyDrive/project/rubiks_cube-master-2.zip", 'r') as zip_ref:
    zip_ref.extractall("/content/sample_data")

In [None]:
!pip3 install pycuber

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pycuber
  Downloading pycuber-0.2.2-py3-none-any.whl (23 kB)
Installing collected packages: pycuber
Successfully installed pycuber-0.2.2


In [None]:
from collections import Counter
from random import choice

import numpy as np
import pycuber as pc
import keras.backend as K
from keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from keras.layers import Dense, Input, LeakyReLU
from keras.models import Model
from keras.optimizers import Adam
from tqdm import tqdm

In [None]:
# Move name -> integer id for all 24 move names.
# Ids are laid out suffix-major: id = suffix_rank * 6 + face_rank, with faces
# ordered F, B, U, D, L, R and suffixes ordered "", "'", "2", "2'".
action_map = {
    face + suffix: suffix_rank * 6 + face_rank
    for suffix_rank, suffix in enumerate(("", "'", "2", "2'"))
    for face_rank, face in enumerate("FBUDLR")
}
# Restricted move set: only the unsuffixed and primed moves (ids 0-11).
action_map_small = {name: idx for name, idx in action_map.items() if idx < 12}
# Reverse lookup: integer id -> move name.
inv_action_map = dict((idx, name) for name, idx in action_map.items())


In [None]:
# Display the integer id -> move name lookup as a sanity check.
inv_action_map

{0: 'F',
 1: 'B',
 2: 'U',
 3: 'D',
 4: 'L',
 5: 'R',
 6: "F'",
 7: "B'",
 8: "U'",
 9: "D'",
 10: "L'",
 11: "R'",
 12: 'F2',
 13: 'B2',
 14: 'U2',
 15: 'D2',
 16: 'L2',
 17: 'R2',
 18: "F2'",
 19: "B2'",
 20: "U2'",
 21: "D2'",
 22: "L2'",
 23: "R2'"}

In [None]:
# Integer id assigned to each sticker colour name.
color_map = dict(green=0, blue=1, yellow=2, red=3, orange=4, white=5)

# One-hot encoding (length 6) of each colour; the hot position is the colour's id in color_map.
color_list_map = {
    name: [1 if slot == idx else 0 for slot in range(len(color_map))]
    for name, idx in color_map.items()
}

In [None]:
def flatten(cube):
    """Return the sticker colours of `cube` as one flat list of 54 entries.

    Faces are read in the order F, B, U, D, L, R. Each face is indexed as
    face[i][j] for i, j in 0..2, with j varying fastest, and the `.colour`
    attribute of each entry is collected.
    """
    faces = (cube.F, cube.B, cube.U, cube.D, cube.L, cube.R)
    return [
        face[i][j].colour
        for face in faces
        for i in range(3)
        for j in range(3)
    ]

In [None]:
def flatten_1d_b(cube):
    """Return `cube` as a flat one-hot vector (54 stickers x 6 colours = 324 values).

    Stickers are visited in the same order as `flatten` (faces F, B, U, D, L, R;
    face[i][j] with j varying fastest) and each colour is expanded through
    `color_list_map`.
    """
    faces = (cube.F, cube.B, cube.U, cube.D, cube.L, cube.R)
    return [
        bit
        for face in faces
        for i in range(3)
        for j in range(3)
        for bit in color_list_map[face[i][j].colour]
    ]

In [None]:
def order(data):
    """Return the fraction of `data` taken up by its most frequent value.

    Returns 0 when `data` has at most one element; otherwise a float in
    (0, 1], where 1.0 means every element is identical.
    """
    total = len(data)
    if total <= 1:
        return 0

    tally = Counter(data)
    return max(float(n) / total for n in tally.values())

In [None]:
def perc_solved_cube(cube):
    """Return the mean, over the six faces, of each face's dominant-colour fraction.

    The flattened colour list is cut into six consecutive runs of nine
    stickers and each run is scored with `order`; the result is 1.0 only
    when every face shows a single colour.
    """
    stickers = flatten(cube)
    face_scores = []
    for start in range(0, 54, 9):
        face_scores.append(order(stickers[start:start + 9]))
    return np.mean(face_scores)

In [None]:
def gen_sample(n_steps=6):
    """Scramble a fresh cube with `n_steps` random moves and record the way back.

    Moves are drawn uniformly from the keys of `action_map`. After the
    scramble is applied, the formula is reversed and replayed one step at a
    time; before each step the current state is recorded.
    NOTE(review): this relies on pycuber's `Formula.reverse()` producing the
    undoing sequence (reversed order, inverted moves) -- confirm against pycuber.

    Returns:
        (sample_X, sample_Y, cubes): one-hot encoded states, the `action_map`
        id of the step applied from each state, and copies of the cube at
        each of those states. All three lists have `n_steps` entries.
    """
    cube = pc.Cube()

    move_names = list(action_map.keys())
    scramble = pc.Formula([choice(move_names) for _ in range(n_steps)])
    cube(scramble)
    scramble.reverse()

    sample_X, sample_Y, cubes = [], [], []
    for step in scramble:
        move = step.name
        sample_X.append(flatten_1d_b(cube))
        sample_Y.append(action_map[move])
        cubes.append(cube.copy())
        # Advance to the next state only after the current one is recorded.
        cube(move)

    return sample_X, sample_Y, cubes

In [None]:
def gen_sample_small(n_steps=6):
    """Scramble a fresh cube using only the `action_map_small` moves and record the way back.

    Same procedure as `gen_sample`, restricted to the 12-move set. Labels are
    looked up in `action_map_small` so the sampling set and the label set
    cannot drift apart (the ids are identical to those in `action_map`).
    NOTE(review): this relies on pycuber's `Formula.reverse()` producing the
    undoing sequence (reversed order, inverted moves) -- confirm against pycuber.

    Returns:
        (sample_X, sample_Y, cubes): one-hot encoded states, the
        `action_map_small` id of the step applied from each state, and copies
        of the cube at each of those states. All three lists have `n_steps`
        entries.
    """
    cube = pc.Cube()

    # Build the candidate list once rather than once per sampled move.
    move_names = list(action_map_small.keys())
    transformation = [choice(move_names) for _ in range(n_steps)]

    my_formula = pc.Formula(transformation)
    cube(my_formula)
    my_formula.reverse()

    sample_X = []
    sample_Y = []
    cubes = []

    for s in my_formula:
        # Record the state before applying the step that leads away from it.
        sample_X.append(flatten_1d_b(cube))
        sample_Y.append(action_map_small[s.name])
        cubes.append(cube.copy())
        cube(s.name)

    return sample_X, sample_Y, cubes

In [None]:
def gen_sequence(n_steps=6):
    """Generate the states visited while undoing a random `n_steps` scramble.

    A fresh cube is scrambled with moves drawn uniformly from
    `action_map_small` (this is the only source of randomness), then the
    reversed formula is replayed step by step.
    NOTE(review): this relies on pycuber's `Formula.reverse()` producing the
    undoing sequence -- confirm against pycuber.

    Returns:
        (cubes, distance_to_solved): copies of the cube before each replayed
        step, and for each one the number of replay steps still to go
        (`n_steps` for the first state down to 1 for the last). The state
        after the final step is not included.
    """
    cube = pc.Cube()

    move_names = list(action_map_small.keys())
    scramble = pc.Formula([choice(move_names) for _ in range(n_steps)])
    cube(scramble)
    scramble.reverse()

    cubes, distance_to_solved = [], []
    for steps_done, step in enumerate(scramble):
        cubes.append(cube.copy())
        distance_to_solved.append(n_steps - steps_done)
        cube(step.name)

    return cubes, distance_to_solved

In [None]:
def get_all_possible_actions_cube_small(cube):
    """Expand `cube` by every move in `action_map_small`.

    `cube` itself is left untouched; each move is applied to a copy.

    Returns:
        (flat_cubes, rewards): for each move, in `action_map_small` key order,
        the one-hot encoding of the resulting state and a reward of +1 when
        `perc_solved_cube` of that state exceeds 0.99, otherwise -1.
    """
    flat_cubes, rewards = [], []

    for move in action_map_small:
        successor = cube.copy()(move)
        flat_cubes.append(flatten_1d_b(successor))
        is_solved = perc_solved_cube(successor) > 0.99
        rewards.append(1 if is_solved else -1)

    return flat_cubes, rewards


In [None]:
def chunker(seq, size):
    """Lazily yield consecutive slices of `seq`, each `size` long (the last may be shorter)."""
    starts = range(0, len(seq), size)
    return (seq[start:start + size] for start in starts)

In [None]:
def acc(y_true, y_pred):
    """Per-sample accuracy metric for the policy head.

    Yields 1.0 where the argmax of `y_pred` along the last axis equals the
    max of `y_true` along the last axis, else 0.0.
    NOTE(review): assumes `y_true` carries the class id with a trailing axis
    (as the training cell supplies via `[..., np.newaxis]`) -- confirm.
    """
    float_type = K.floatx()
    expected_class = K.max(y_true, axis=-1)
    predicted_class = K.cast(K.argmax(y_pred, axis=-1), float_type)
    return K.cast(K.equal(expected_class, predicted_class), float_type)


def get_model(lr=0.0001):
    """Build and compile the two-headed value/policy network.

    Architecture: a 324-wide input, then Dense layers of width 1024, 1024,
    1024 and 50, each followed by LeakyReLU. Two heads share that trunk:
    "value" (1 linear unit, MAE loss) and "policy" (softmax over
    `len(action_map_small)` classes, sparse categorical cross-entropy, with
    `acc` as its metric).

    Args:
        lr: learning rate passed to Adam.

    Returns:
        The compiled Keras model; outputs are ordered [value, policy].
        As a side effect, the model summary is printed.
    """
    board = Input((324,))

    trunk = board
    for width in (1024, 1024, 1024, 50):
        trunk = Dense(width)(trunk)
        trunk = LeakyReLU()(trunk)

    value_head = Dense(1, activation="linear", name="value")(trunk)
    policy_head = Dense(len(action_map_small), activation="softmax", name="policy")(trunk)

    model = Model(board, [value_head, policy_head])

    losses = {"value": "mae", "policy": "sparse_categorical_crossentropy"}
    model.compile(loss=losses, optimizer=Adam(lr), metrics={"policy": acc})
    model.summary()

    return model

In [None]:
if __name__ == "__main__":
    # Training loop: each epoch generates fresh scrambles, then repeatedly
    # (a) scores every successor state with the current network,
    # (b) derives value/policy targets from reward + predicted value, and
    # (c) fits the network on those targets for one Keras epoch.

    N_SAMPLES = 100          # scrambles generated per epoch
    N_EPOCH = 100            # outer epochs
    SCRAMBLE_DEPTH = 25      # moves per scramble passed to gen_sequence
    N_FIT_ROUNDS = 20        # target-refresh + fit rounds per epoch
    REWARD_SCALE = 0.4       # weight of the +/-1 reward relative to the predicted value

    file_path = "auto.h5"

    checkpoint = ModelCheckpoint(file_path, monitor='val_loss', verbose=1, save_best_only=True, mode='min')

    early = EarlyStopping(monitor="val_loss", mode="min", patience=1000)

    reduce_on_plateau = ReduceLROnPlateau(monitor="val_loss", mode="min", factor=0.1, patience=50, min_lr=1e-8)

    # NOTE(review): callbacks_list is never passed to model.fit below, and no
    # validation data is supplied, so none of these callbacks take effect.
    callbacks_list = [checkpoint, early, reduce_on_plateau]

    model = get_model(lr=0.0001)
    # Uncomment to resume from previously saved weights.
    #model.load_weights(file_path)

    for i in range(N_EPOCH):
        print(i)
        cubes = []
        distance_to_solved = []
        for _sample in tqdm(range(N_SAMPLES)):
            _cubes, _distance_to_solved = gen_sequence(SCRAMBLE_DEPTH)
            cubes.extend(_cubes)
            distance_to_solved.extend(_distance_to_solved)

        cube_next_reward = []
        flat_next_states = []
        cube_flat = []

        for c in tqdm(cubes):
            flat_cubes, rewards = get_all_possible_actions_cube_small(c)
            cube_next_reward.append(rewards)
            flat_next_states.extend(flat_cubes)
            cube_flat.append(flatten_1d_b(c))

        # These inputs do not change across the fit rounds, so convert them
        # to arrays once per epoch instead of once per round.
        next_states_arr = np.array(flat_next_states)
        cube_flat_arr = np.array(cube_flat)

        # Weight each state by 1 / distance, rescaled so the weights average to 1.
        sample_weights = 1. / np.array(distance_to_solved)
        sample_weights = sample_weights * sample_weights.size / np.sum(sample_weights)

        for _round in range(N_FIT_ROUNDS):

            cube_target_value = []
            cube_target_policy = []

            next_state_value, _next_state_policy = model.predict(next_states_arr, batch_size=1024)
            next_state_value = next_state_value.ravel().tolist()
            # Regroup the flat predictions: one chunk of successor values per cube.
            next_state_value = list(chunker(next_state_value, size=len(action_map_small)))

            # zip() has no length, so give tqdm the total explicitly.
            for rewards, values in tqdm(zip(cube_next_reward, next_state_value), total=len(cube_next_reward)):
                r_plus_v = REWARD_SCALE*np.array(rewards) + np.array(values)
                target_v = np.max(r_plus_v)
                target_p = np.argmax(r_plus_v)
                cube_target_value.append(target_v)
                cube_target_policy.append(target_p)

            # Standardise the value targets; the 0.01 guards against a zero std.
            cube_target_value = (cube_target_value-np.mean(cube_target_value))/(np.std(cube_target_value)+0.01)

            print(cube_target_policy[-30:])
            print(cube_target_value[-30:])

            model.fit(cube_flat_arr, [np.array(cube_target_value), np.array(cube_target_policy)[..., np.newaxis]],
                      epochs=1, batch_size=128, sample_weight=[sample_weights, sample_weights])

        model.save_weights(file_path)


Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 324)]        0           []                               
                                                                                                  
 dense (Dense)                  (None, 1024)         332800      ['input_1[0][0]']                
                                                                                                  
 leaky_re_lu (LeakyReLU)        (None, 1024)         0           ['dense[0][0]']                  
                                                                                                  
 dense_1 (Dense)                (None, 1024)         1049600     ['leaky_re_lu[0][0]']            
                                                                                              

100%|██████████| 100/100 [00:07<00:00, 14.16it/s]
 37%|███▋      | 923/2500 [00:34<00:59, 26.42it/s]


KeyboardInterrupt: ignored

In [None]:
# Beam search for a solution of a randomly scrambled cube.
# Each step expands every kept sequence with the network's two highest-ranked
# moves, scores the resulting states with the value head, and keeps the 100
# best. A state that already survived in an earlier beam is scored -1.
model = get_model()
model.load_weights('/content/auto.h5')
sample_X, sample_Y, cubes = gen_sample(10)
cube = cubes[0]
cube.score = 0
list_sequences = [[cube]]
existing_cubes = set()
# The policy head has len(action_map_small) outputs, so decode its indices
# through that map's key order rather than the larger action_map.
policy_actions = list(action_map_small)
for j in range(1000):

    X = [flatten_1d_b(x[-1]) for x in list_sequences]

    # Only the policy output is needed to choose which moves to expand.
    _, policies = model.predict(np.array(X), batch_size=1024)

    new_list_sequences = []

    for x, policy in zip(list_sequences, policies):

        # argsort is ascending, so the last two entries are the top-2 moves.
        pred = np.argsort(policy)

        cube_1 = x[-1].copy()(policy_actions[pred[-1]])
        cube_2 = x[-1].copy()(policy_actions[pred[-2]])

        new_list_sequences.append(x + [cube_1])
        new_list_sequences.append(x + [cube_2])

    print("new_list_sequences", len(new_list_sequences))
    last_states_flat = [flatten_1d_b(x[-1]) for x in new_list_sequences]
    value, _ = model.predict(np.array(last_states_flat), batch_size=1024)
    value = value.ravel().tolist()
    for x, v in zip(new_list_sequences, value):
        x[-1].score = v if str(x[-1]) not in existing_cubes else -1

    new_list_sequences.sort(key=lambda x: x[-1].score, reverse=True)

    new_list_sequences = new_list_sequences[:100]
    existing_cubes.update(str(x[-1]) for x in new_list_sequences)

    list_sequences = new_list_sequences

    # Put the most-solved candidate first so it can be checked and reported.
    list_sequences.sort(key=lambda x: perc_solved_cube(x[-1]), reverse=True)

    prec = perc_solved_cube((list_sequences[0][-1]))

    print(prec)

    if prec == 1:
        break

print(perc_solved_cube(list_sequences[0][-1]))
print(list_sequences[0][-1])

   