# Prelude

The notebook contains utility functions from Mordvinstev et al.'s [colab notebook](https://colab.research.google.com/github/google-research/self-organising-systems/blob/master/notebooks/growing_ca.ipynb). 


In [None]:
import os
import io
import PIL.Image, PIL.ImageDraw
import base64
import zipfile
import json
import requests
import numpy as np
import matplotlib.pylab as pl
import glob

import tensorflow as tf
from IPython.display import Image, HTML, clear_output
import tqdm
import os

In [None]:
CHANNEL_N = 16        # Number of CA state channels
TARGET_SIZE = 32
BATCH_SIZE = 2
POOL_SIZE = 512
CELL_FIRE_RATE = 0.75

In [None]:
from tensorflow.keras.layers import Conv3D

def to_rgba(x):
    return x[..., :4]

def to_alpha(x):
    return tf.clip_by_value(x[..., 3:4], 0.0, 1.0)

def to_rgb(x):
    # assume rgb premultiplied by alpha
    rgb, a = x[..., :3], to_alpha(x)
    return 1.0-a+rgb

def get_living_mask(x):
    alpha = x[:, :, :, :, 3:4]
    return tf.nn.max_pool3d(alpha, 3, [1, 1, 1, 1, 1], 'SAME') > 0.1

def make_seed(size, n=1):
    x = np.zeros([n, size, size, CHANNEL_N], np.float32)
    x[:, size//2, size//2, size//2, 3:] = 1.0
    return x

class CAModel3D(tf.keras.Model):
  def __init__(self, channel_n=CHANNEL_N, fire_rate=CELL_FIRE_RATE):
    super().__init__()
    self.channel_n = channel_n
    self.fire_rate = fire_rate

    perc = Conv3D(64, 3, activation=None, groups=self.channel_n, 
                 padding="SAME", use_bias=False, trainable=False)
    
    self.dmodel = tf.keras.Sequential([
          perc,
          Conv3D(160, 1, activation=tf.nn.relu),
          Conv3D(self.channel_n, 1, activation=None,
              kernel_initializer=tf.zeros_initializer),
    ])
    
    self.build((None, 3, 3, 3, channel_n))
        
    identify = np.zeros((3, 3, 3))
    identify[1, 1, 1] = 1
    # 3D Sobel filters
    dx = np.array([1, 2, 1])[None, None, :] * np.outer([1, 2, 1], [-1, 0, 1])[:, :, None] / 32
    dy = np.array([1, 2, 1])[None, None, :] * np.outer([1, 2, 1], [-1, 0, 1]).T[:, :, None] / 32
    dz = (np.array([1, 2, 1])[None, None, :] * np.outer([1, 2, 1], [-1, 0, 1]).T[:, :, None]).T / 32

    kernel = np.stack([identify, dx, dy, dz], -1)[:, :, :, None, :]
    kernel = np.repeat([kernel], repeats=CHANNEL_N, axis=-1)
    perc.set_weights(kernel)
    
    self.dmodel.layers[0].trainable = False

  @tf.function
  def call(self, x, fire_rate=None, angle=0.0, step_size=1.0):
    pre_life_mask = get_living_mask(x)
    dx = self.dmodel(x)*step_size
    if fire_rate is None:
      fire_rate = self.fire_rate
    update_mask = tf.random.uniform(tf.shape(x[:, :, :, :1])) <= fire_rate
    x += dx * tf.cast(update_mask, tf.float32)

    post_life_mask = get_living_mask(x)
    life_mask = pre_life_mask & post_life_mask
    return x * tf.cast(life_mask, tf.float32)


CAModel3D().dmodel.summary()

# Training

In [None]:
from google.protobuf.json_format import MessageToDict
from tensorflow.python.framework import convert_to_constants

class SamplePool:
  def __init__(self, *, _parent=None, _parent_idx=None, **slots):
    self._parent = _parent
    self._parent_idx = _parent_idx
    self._slot_names = slots.keys()
    self._size = None
    for k, v in slots.items():
      if self._size is None:
        self._size = len(v)
      assert self._size == len(v)
      setattr(self, k, np.array(v))

  def sample(self, n):
    idx = np.random.choice(self._size, n, False)
    batch = {k: getattr(self, k)[idx] for k in self._slot_names}
    batch = SamplePool(**batch, _parent=self, _parent_idx=idx)
    return batch

  def commit(self):
    for k in self._slot_names:
      getattr(self._parent, k).setflags(write=1)
      getattr(self._parent, k)[self._parent_idx] = getattr(self, k)

@tf.function
def make_circle_masks(n, h, w, d):
  x = tf.linspace(-1.0, 1.0, w)[None, :, None, None]
  y = tf.linspace(-1.0, 1.0, h)[None, None, :, None]
  z = tf.linspace(-1.0, 1.0, d)[None, None, None, :]
  center = tf.random.uniform([3, n, 1, 1, 1], -0.5, 0.5)
  r = tf.random.uniform([n, 1, 1, 1], 0.1, 0.3)
  x, y, z = (x - center[0])/r, (y - center[1])/r, (z - center[2])/r
  mask = tf.cast(x*x+y*y+z*z < 1.0, tf.float32)
  return mask

def export_model(ca, base_fn):
  ca.save_weights(base_fn)

  cf = ca.call.get_concrete_function(
      x=tf.TensorSpec([None, None, None, None, CHANNEL_N]),
      fire_rate=tf.constant(0.5),
      angle=tf.constant(0.0),
      step_size=tf.constant(1.0))
  cf = convert_to_constants.convert_variables_to_constants_v2(cf)
  graph_def = cf.graph.as_graph_def()
  graph_json = MessageToDict(graph_def)
  graph_json['versions'] = dict(producer='1.14', minConsumer='1.14')
  model_json = {
      'format': 'graph-model',
      'modelTopology': graph_json,
      'weightsManifest': [],
  }
  with open(base_fn+'.json', 'w') as f:
    json.dump(model_json, f)

def plot_loss(loss_log):
  pl.figure(figsize=(10, 4))
  pl.title('Loss history (log10)')
  pl.plot(np.log10(loss_log), '.', alpha=0.1)
  pl.show()


# Data loading 

You will need to load 3D targets shapes to train the model. 

In [None]:
from google.colab import drive
drive.mount('/content/drive')

!mkdir -p models
!cp /content/drive/MyDrive/training_data/models/* models/

In [None]:
import pickle
import json
import matplotlib.pyplot as plt

def plot_3d(arr):
    fig = plt.figure()
    ax = fig.add_subplot(projection='3d')
    u = np.moveaxis(arr, (0, 1), (1, 2))
    m = ax.voxels((u[:, :, :, 3] > 0.1), 
                  facecolors=np.clip(u[:, :, :, :4], 0, 1))
    clear_output()
    plt.show()

p = TARGET_SIZE
# owl = pickle.load(open("models/owl.pkl", "rb"))
# rabbit = pickle.load(open("models/rabbit.pkl", "rb"))
# elephant = pickle.load(open("models/elephant.pkl", "rb"))
# duck = pickle.load(open("models/duck.pkl", "rb"))
# targets = [np.array(x) for x in [owl, rabbit, elephant, duck]]
with open('ditest.json','r') as f:
    cat_limbs = json.loads(f.read())
ditest = np.array(cat_limbs)
targets = [ditest,ditest]
plot_3d(targets[0])

def pad_tgt(tgt):
    px = (p - tgt.shape[0]) // 2
    py = (p - tgt.shape[1]) // 2
    pz = (p - tgt.shape[2]) // 2
    return tf.pad(tgt, [
        (px, px + (p - tgt.shape[0] - 2 * px)), 
        (py, py + (p - tgt.shape[1] - 2 * py)), 
        (pz, pz + (p - tgt.shape[2] - 2 * pz)), (0, 0)])

In [None]:


def batch_3d_viz(x, x0):
    fig = plt.figure(figsize=plt.figaspect(0.35)*1.5)
    ax = [fig.add_subplot(2, BATCH_SIZE, i, projection='3d') 
          for i in range(1, 2 * BATCH_SIZE + 1)]
    for i in range(BATCH_SIZE):
        u = np.moveaxis(x0[i], (0, 1), (1, 2))
        ax[i].set_axis_off()
        ax[i].voxels((u[::-1, :, :, 3] > 0.1), 
                     facecolors=np.clip(u[::-1, :, :, :4], 0, 1))
    for i in range(BATCH_SIZE):
        u = np.moveaxis(x[i].numpy(), (0, 1), (1, 2))
        ax[BATCH_SIZE + i].set_axis_off()
        ax[BATCH_SIZE + i].voxels((u[::-1, :, :, 3] > 0.1), 
                                  facecolors=np.clip(u[::-1, :, :, :4], 0, 1))
    plt.show()
    plot_3d(np.array(x[0,:,:,:,:4]))


In [None]:
pad_targets = [tf.cast(pad_tgt(target_img), tf.float32) 
for target_img in targets]

plot_3d(np.array(pad_targets[0]))
h, w, d = pad_targets[0].shape[:3]
seed = np.zeros([len(targets), h, w, d, CHANNEL_N], np.float32)

seed[0, h//2, w//2 , d//2, 3:] = 1.0
seed[1, h//2+7, w//2+7 , d//2+7, 3:] = 1.0


plot_3d(seed[0,:,:,:,:4])

def loss_f(x, target):
    return tf.reduce_mean(tf.square(to_rgba(x) - target), 
                          [-2, -3, -4, -1])

ca = CAModel3D()
loss_log = []

lr = 1e-3
lr_sched = tf.keras.optimizers.schedules.PiecewiseConstantDecay(
    [2000], [lr, lr*0.1])
trainer = tf.keras.optimizers.Adam(lr_sched)

loss0 = loss_f(seed[0], pad_targets[0]).numpy()

inp = tf.cast(np.repeat(seed, POOL_SIZE//len(targets), 0), tf.float32)
tgt = tf.repeat(pad_targets, POOL_SIZE//len(targets), 0)

pool = SamplePool(x=inp, y=tgt)

!mkdir -p train_log && rm -f train_log/*

# Training

In [None]:
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
@tf.function
def train_step(x, y):
    iter_n = tf.random.uniform([], 64, 96, tf.int32)
    with tf.GradientTape() as g:
        for i in tf.range(iter_n):
            x = ca(x)
        loss = tf.reduce_mean(loss_f(x, y))
    grads = g.gradient(loss, ca.weights)
    grads = [g/(tf.norm(g)+1e-8) for g in grads]
    # Hack to not train the first layer (Sobel filters)
    # because apparently setting .trainable = False isn't enough
    trainer.apply_gradients(zip(grads[1:], ca.weights[1:])) 
    return x, y, loss

for i in range(1500):
    batch = pool.sample(BATCH_SIZE)
    x0 = batch.x
    y0 = batch.y
    # print(x0.shape) == (4, 32, 32, 32, 16)
    # print(y0.shape) == (4, 32, 32, 32, 4)

    loss_rank = loss_f(x0, y0).numpy().argsort()[::-1]
    
    # print(loss_rank.shape) == (4,) might be the loss of 4 targets
    
    
    x0 = x0[loss_rank]
    y0 = y0[loss_rank]
    # print(x0.shape) == (4, 32, 32, 32, 16)
    # print(y0.shape) == (4, 32, 32, 32, 4)

    simple = np.random.choice(range(len(targets)), size=2) # two random number in targets

    # print(x0[:2].shape) == (2, 32, 32, 32, 16)
    # print(seed[simple].shape) == (2, 32, 32, 32, 16)
    x0[:2] = seed[simple]
    
    # remained to be figure out
    for n, u in enumerate(simple):  
        y0[n] = pad_targets[u]
        
        
    # print(x0.shape) == (4, 32, 32, 32, 16)
    # print(y0.shape) == (4, 32, 32, 32, 4)

    
    x, y, loss = train_step(x0, y0)
    
    # print(x.shape) == (4, 32, 32, 32, 16)
    # print(y.shape) == (4, 32, 32, 32, 4)

    step_i = len(loss_log)
    loss_log.append(loss.numpy())

    if step_i%100 == 0:
      clear_output()
      plot_loss(loss_log)
      export_model(ca, 'train_log/%04d'%step_i)

    if step_i%200 == 0:
      clear_output()
      batch_3d_viz(x, x0)
      pass

    print('\r step: %d, log10(loss): %.3f'%(len(loss_log), 
                                            np.log10(loss)), end='')
    
ca.save_weights('./wei/ditest.h5')

### Generation testing
codes below are used to generate 3d-voxel models by using trained models that stored in h5 files.
You can train your model by using the code above and test it by using the code below

In [None]:
new_model = CAModel3D()
new_model.load_weights('./test.h5')
import matplotlib.pyplot as plt

def plot_3d(arr,i):
    fig = plt.figure()
    ax = fig.add_subplot(projection='3d')
    u = np.moveaxis(seed[0], (0, 1), (1, 2))
    m = ax.voxels((u[:, :, :, 3] > 0.1), 
                  facecolors=np.clip(u[:, :, :, :4], 0, 1))
    clear_output()
    plt.show()

s=32
h=s
w=s
d=s
seed = np.zeros([4, s, s, s, 16], dtype=np.float)
seed[0, h//2, w//2 , d//2, 3:] = 1.0

for i in range(200):
  print(i)
  seed = new_model(seed)
  plot_3d(seed[0],i)

with open('output.json','w') as f:
  data = json.dumps(seed[0].numpy().tolist())
  f.write(data)

In [None]:
with open('output.json','r') as f:
    cat_limbs = json.loads(f.read())
ditest = np.array(cat_limbs)
targets = [ditest,ditest]
plot_3d(targets[0],0)

In [None]:
# pytorch version, not used any more
import torch
import torch.nn as nn

class NCAModel3D(nn.Module):
    def __init__(self, channel_n=CHANNEL_N, fire_rate=CELL_FIRE_RATE):
        super().__init__()
        self.channel_n = channel_n
        self.fire_rate = fire_rate

        self.pool3d = nn.MaxPool3d((3, 3, 3), stride=(1, 1, 1),
                                   padding=(1, 1, 1))

        perc = nn.Conv3d(self.channel_n, 64, (3, 3, 3), groups=16, bias=False,
                         padding_mode="circular", padding=(1, 1, 1))
        identify = torch.zeros(3, 3, 3)
        identify[1, 1, 1] = 1

        # 3D Sobel filters
        dx = torch.Tensor(np.array([1, 2, 1])[None, None, :] *
                          np.outer([1, 2, 1], [-1, 0, 1])[:, :, None])
        dy = torch.Tensor(np.array([1, 2, 1])[None, None, :] *
                          np.outer([1, 2, 1], [-1, 0, 1]).T[:, :, None])
        dz = torch.Tensor((np.array([1, 2, 1])[None, None, :] *
                           np.outer([1, 2, 1], [-1, 0, 1]).T[:, :, None]).T)

        kernel = torch.stack([identify, dx, dy, dz], -1)[:, :, :, :, None]
        kernel = kernel.repeat(1, 1, 1, CHANNEL_N, 1)
        kernel = torch.moveaxis(
            kernel.reshape(3, 3, 3, -1), -1, 0).reshape(-1, 1, 3, 3, 3)
        perc.weight = nn.Parameter(kernel)

        self.dmodel = nn.Sequential(
            perc,
            nn.Conv3d(64, 128, (1, 1, 1)),
            nn.ReLU(),
            nn.Conv3d(128, self.channel_n, (1, 1, 1)),
        )

    def forward(self, x, fire_rate=None, step_size=1.0):
        pre_life_mask = self.get_living_mask3d(x)
        dx = self.dmodel(x) * step_size
        if fire_rate is None:
            fire_rate = self.fire_rate
        update_mask = torch.rand(size=x[:, :1, :, :, :].shape) <= fire_rate
        x += dx * update_mask.float()

        post_life_mask = self.get_living_mask3d(x)
        life_mask = pre_life_mask & post_life_mask
        return x * life_mask.float()

    def get_living_mask3d(self, x):
        alpha = x[:, 3:4, :, :, :]
        return self.pool3d(alpha) > 0.1

##### generate gif
Here provide a simple piece of code that can generate the gif of how the model is generated from a singel dot.

In [None]:
import imageio
from pathlib import Path


def imgs2gif(imgPaths, saveName, duration=None, loop=0, fps=None):
    if fps:
        duration = 1 / fps
    images = [imageio.imread(str(img_path)) for img_path in imgPaths]
    imageio.mimsave(saveName, images, "gif", duration=duration, loop=loop)


# pathlist = Path(r"G:\img").glob("*.jpg")

p_lis = []
for i in range(130):
  p_lis.append('fig/'+str(i)+'.png')

# for n, p in enumerate(pathlist):
#     if n % 5 == 0:
#         p_lis.append(p)

imgs2gif(p_lis, "test.gif", 0)