In [1]:
# Mount Google Drive into the Colab filesystem at /content/drive so the
# project files stored there become reachable from this notebook.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# Project directory on the mounted Drive. It is reused further down as the
# root for the cached dataset and is handed to the model/visualisation helpers.
# NOTE(review): this is a relative path, so it presumably relies on the
# working directory being /content (where the Drive is mounted) — confirm.
folder = 'drive/MyDrive/Code/GitHub/CellularAutomata'
import sys
# Put the project directory on the module search path *before* importing the
# modules below (presumably they live in that directory — confirm).
sys.path.insert(1, folder)
import ca_model
import count_pixels_dataset as cpd
import visualisation as vis_ca

In [3]:
# import importlib
# importlib.reload(ca_model)
# importlib.reload(vis_ca)

In [4]:
import numpy as np
# import matplotlib.pyplot as plt
import tensorflow as tf
from IPython.display import clear_output
# import matplotlib.colors as cl
# # from tensorflow.keras.layers import Conv2D
# from moviepy.video.io.ffmpeg_writer import FFMPEG_VideoWriter
# import moviepy.editor as mvp
# import platform, tqdm
# import PIL.Image, PIL.ImageDraw
# from IPython.display import Image, clear_output
# from tensorflow.python.client import device_lib
# print(device_lib.list_local_devices())

In [None]:
# --- Run mode -----------------------------------------------------------
# True -> use the short iteration schedule so everything runs faster.
JustTesting = True

# True -> start from a previously exported model (for further training or
# just for testing); i_step_load is the training step of that export.
LoadPreviousModelQ = False
i_step_load = 360000

# Seeds for NumPy's global RNG: one for training, one for the movies.
seed_training = seed_movies = 1

# --- Pipeline switches --------------------------------------------------
# True -> run the training loop.
RunTrainingQ = True

# True -> rebuild the dataset instead of loading the cached copy.
RebuildDatasetQ = False
# True -> mutate the image during training / during testing.
MutateTrainingQ = True
MutateTestingQ = True

# True -> at the end, test inputs of increasing size.
RunTestQ = True

# --- Model / run identification -----------------------------------------
# How the number of CA channels is derived from the number of classes.
# Accepted values: 'SameClasses', '5PlusClasses', '4TimesClasses'.
no_channels = '4TimesClasses'

dataset = 'count_digits'
# Prefix shared by every file name used to save and load the model.
id_run = 'CA_CD_3Classes_Deeper_M{}_MutTrain{}_MutTest{}'.format(
    no_channels,
    MutateTrainingQ,
    MutateTestingQ,
)

# --- Classes ------------------------------------------------------------
NO_CLASSES = 3
# Must contain NO_CLASSES - 1 entries.
limits_classes = [2, 8]
limits_c_p = [0, 2, 8, 100]
# RGB palette handed to the visualisation helpers, one row per colour.
# NOTE(review): the first three rows presumably correspond to the
# NO_CLASSES classes — confirm against the visualisation module.
color_lookup = tf.constant(
    [
        [255, 0, 0],
        [0, 255, 0],
        [0, 0, 255],
        [0, 0, 0],        # default colour for digits
        [255, 255, 255],  # background colour
    ]
)

# Grid dimensions passed to the dataset builder and to the CA model.
H = 10
W = 10

# Number of time steps the CA evolves for each input.
TR_EVOLVE = 50   # during training
TST_EVOLVE = 50  # during testing

# Number of images per batch.
BATCH_SIZE = 64

# Number of hidden states (channels) of the CA, derived from the
# `no_channels` option. Per the original author's note it must be at least
# NO_CLASSES.
if no_channels == 'SameClasses':
    NO_CHANNELS = NO_CLASSES
elif no_channels == '4TimesClasses':
    NO_CHANNELS = 4 * NO_CLASSES
elif no_channels == '5PlusClasses':
    NO_CHANNELS = 5 + NO_CLASSES
else:
    # Fail here, with a clear message, instead of leaving NO_CHANNELS
    # undefined and hitting a NameError in a later cell.
    raise ValueError(
        "Unknown no_channels option {!r}; expected 'SameClasses', "
        "'4TimesClasses' or '5PlusClasses'".format(no_channels)
    )
    
# Iteration schedule: a short run when JustTesting is set, the full run
# otherwise.
#   TR_NO_ITERATIONS - number of iterations of the training loop
#   export_every     - iterations between two model exports
#   visualise_every  - iterations between two model visualisations
#   i_step_verify    - training steps whose exported model is reloaded later
if JustTesting:
    TR_NO_ITERATIONS, export_every, visualise_every = 500, 250, 50
    i_step_verify = [250, 500]
else:
    TR_NO_ITERATIONS, export_every, visualise_every = 500000, 10000, 2000
    i_step_verify = [240000, 500000]


# True -> the normal update of the CA has noise added.
ADD_NOISE = True

In [None]:
# Build a throwaway model only to print the summary of its `update_state`
# member (presumably a Keras model — confirm in ca_model), then discard it.
summary_model = ca_model.CAModel(NO_CHANNELS, NO_CLASSES, H, W)
summary_model.update_state.summary()
del summary_model

In [None]:
# Prepare the dataset: load the cached .npz when allowed and available,
# otherwise build it from scratch and cache it for the next run.
size_ds = 10000  # the number of images in the dataset
dataset_path = folder + '/dataset/{}_{}.npz'.format(dataset, size_ds)

BuildDS = bool(RebuildDatasetQ)
if not BuildDS:
    try:
        # The context manager closes the underlying archive once the arrays
        # have been read out of it.
        with np.load(dataset_path) as res:
            x_train = res['x_train']
            y_train = res['y_train']
            x_test = res['x_test']
            y_test = res['y_test']
    except Exception as err:
        # Best-effort fallback (missing/corrupt/incomplete cache): rebuild.
        # Catching Exception rather than using a bare `except:` lets
        # KeyboardInterrupt/SystemExit propagate, and the reason is reported.
        print('Could not load {} ({!r}); rebuilding the dataset.'.format(dataset_path, err))
        BuildDS = True
if BuildDS:
    x_train, x_test, y_train, y_test = cpd.build_dataset(size_ds, H, W)
    np.savez(dataset_path, x_train=x_train, x_test=x_test, y_train=y_train, y_test=y_test)

# Per-sample training targets: the picture-shaped labels and the class index
# of each sample, both derived from the limits in `limits_classes`.
y_train_pic = cpd.to_classes_dim_label(x_train, y_train, limits_classes)
y_label_train = np.zeros(y_train.shape[0])
for i in range(y_train.shape[0]):
    y_label_train[i] = cpd.class_indice_f(y_train[i], limits_classes)
# y_test_pic = to_classes_dim_label(x_test, y_test)

In [None]:
# Learning-rate schedule: start at `lr`, then drop to lr/10 after 30% of the
# iterations and to lr/100 after 70% of them.
lr = 1e-3  # initial learning rate
decay_steps = [int(TR_NO_ITERATIONS * frac) for frac in (0.3, 0.7)]
decay_values = [lr, lr * 0.1, lr * 0.01]
lr_sched = tf.keras.optimizers.schedules.PiecewiseConstantDecay(decay_steps, decay_values)

# Adam optimiser driven by the schedule above.
trainer = tf.keras.optimizers.Adam(lr_sched)

# Loss history (total and per class), one row per training iteration, kept
# for plotting the loss over time.
loss_log = np.zeros(TR_NO_ITERATIONS)
loss_log_classes = np.zeros((TR_NO_ITERATIONS, NO_CLASSES))

In [None]:
# Either start from a fresh model at iteration 0, or resume from the export
# written at step `i_step_load` (which also restores the loss history).
# NOTE(review): ADD_NOISE is passed to get_model but not to the CAModel
# constructor — confirm the fresh model gets the intended noise setting.
if not LoadPreviousModelQ:
    ITER = 0
    ca = ca_model.CAModel(NO_CHANNELS, NO_CLASSES, H, W)
else:
    ITER = i_step_load
    ca, loss_log, loss_log_classes = ca_model.get_model(
        folder, id_run, i_step_load, NO_CHANNELS, NO_CLASSES, H, W, ADD_NOISE
    )

In [None]:
# Training happens here.
# Seed NumPy's global RNG so the sequence of sampled batches is reproducible.
np.random.seed(seed_training)
if RunTrainingQ:
    n_train = x_train.shape[0]

    # Training loop: resumes at ITER when a previous model was loaded.
    for i in range(ITER, TR_NO_ITERATIONS):

        # Draw a random batch (with replacement). randint's upper bound is
        # exclusive, so it must be n_train itself; the previous `n_train - 1`
        # meant the last training sample could never be selected.
        b_idx = np.random.randint(0, n_train, size=BATCH_SIZE)
        x0 = ca.initialize(x_train[b_idx])
        y0 = y_train_pic[b_idx]
        y0_label = tf.convert_to_tensor(y_label_train[b_idx])

        # One optimisation step; returns the evolved state, the total loss
        # and the per-class losses.
        x, loss, c_l = ca_model.train_step(trainer, ca, x0, y0, y0_label, TR_EVOLVE, NO_CLASSES, MutateTrainingQ=MutateTrainingQ)

        # Record the losses for plotting.
        loss_log[i] = loss.numpy()
        loss_log_classes[i, :] = [k.numpy() for k in c_l]

        # Periodically refresh the loss plot and export a checkpoint.
        if i % visualise_every == 0:
            clear_output()
            vis_ca.plot_loss(loss_log[:i], loss_log_classes[:i, :], folder, id_run, limits_c_p, color_lookup, True)
        if i % export_every == 0:
            ca_model.export_model(folder, id_run, ca, i, loss_log, loss_log_classes)

        # Single-line progress indicator, overwritten in place via '\r'.
        print('\r step: {}, log10(loss): {}, log10(loss)[classes]: {}'.format(i + 1, np.log10(loss), np.log10(c_l)), end='')

    # Final export, labelled with the total number of iterations.
    ca_model.export_model(folder, id_run, ca, TR_NO_ITERATIONS, loss_log, loss_log_classes)

In [None]:
# Render a movie for each exported checkpoint listed in i_step_verify.
eval_bs = 5 ** 2  # number of samples to show in the movie
for i_step_v in i_step_verify:
    # Reload the model (and its loss history) exported at this step.
    ca, loss_log, loss_log_classes = ca_model.get_model(folder, id_run, i_step_v, NO_CHANNELS, NO_CLASSES, H, W, ADD_NOISE)

    # Re-seed for every checkpoint so each movie uses the same test samples.
    np.random.seed(seed_movies)

    # randint's upper bound is exclusive, so it must be the number of test
    # samples itself; the previous `- 1` excluded the last test sample.
    new_idx = np.random.randint(0, x_test.shape[0], size=eval_bs)
    x = ca.initialize(x_test[new_idx])

    vis_ca.make_run_videos(folder, id_run, i_step_v, TST_EVOLVE, MutateTestingQ, x, ca, color_lookup)

In [None]:
# if RunTestQ:
#     def add_pixel(old_image):
#         new_image = old_image.copy()
#         indices = np.asarray(np.where(old_image == 1)).T
#         success = 0
#         k = 0
#         while (not success) or (k > 100):
#             k += 1
#             idx = np.random.randint(indices.shape[0])
#             x, y = indices[idx, :]
#             x_or_y = np.random.rand() < 0.5
#             pos_or_neg = np.random.rand() < 0.5
#             if x_or_y:
#                 if pos_or_neg:
#                     dx = 1
#                     dy = 0
#                 else:
#                     dx = -1
#                     dy = 0
#             else:
#                 if pos_or_neg:
#                     dx = 0
#                     dy = 1
#                 else:
#                     dx = 0
#                     dy = -1
#             nx = min(max(0, x + dx), 9)
#             ny = min(max(0, y + dy), 9)
#             if old_image[nx, ny] < 0.5:
#                 new_image[nx, ny] = 1
#                 success = 1
#         return new_image

#     images = np.zeros((20, 25, H, W), dtype=np.float32)
#     for i_img in range(25):
#         images[0, i_img, int(H / 2), int(W / 2)] = 1
#     for j in range(1, 20):
#         for i_img in range(25):
#             images[j, i_img, :, :] = add_pixel(images[j - 1, i_img, :, :])
#     images = tf.constant(images)
    
#     for i_step_v in i_step_verify:
#         ca, loss_log, loss_log_classes = get_model(i_step_v)
#         x = ca.initialize(images[0, :, :, :])
#         with VideoWriter(folder + '/CA/Movie_test_increase_{}_{}.mp4'.format(id_run, i_step_v)) as vid:
#             for j in range(20):
#                 x = ca.mutate(x, tf.expand_dims(images[j, :, :, :], -1))
#                 for i in tqdm.trange(TR_EVOLVE):
#                     x = ca(x)
#                     image = zoom(tile2d(classify_and_color(ca, x)), scale=4)
#                     im = np.uint8(image*255)
#                     im = PIL.Image.fromarray(im)
#                     draw = PIL.ImageDraw.Draw(im)
#                     vid.add(np.uint8(im))
#         x = ca.initialize(images[0, :, :, :])
#         with VideoWriter(folder + '/CA/Movie_test_decrease_{}_{}.mp4'.format(id_run, i_step_v)) as vid:
#             for j in range(20):
#                 x = ca.mutate(x, tf.expand_dims(images[19 - j, :, :, :], -1))
#                 for i in tqdm.trange(TR_EVOLVE):
#                     x = ca(x)
#                     image = zoom(tile2d(classify_and_color(ca, x)), scale=4)
#                     im = np.uint8(image*255)
#                     im = PIL.Image.fromarray(im)
#                     draw = PIL.ImageDraw.Draw(im)
#                     vid.add(np.uint8(im))