# Identity Increases Stability of Neural Cellular Automata
### James Stovold
### April 2025


Code to support ALIFE 2025 paper:
 > J. Stovold "Identity Increases Stability of Neural Cellular Automata"

Based on the Colab notebook developed by Mordvintsev et al. for the ["Growing Neural Cellular Automata"](http://distill.pub/2020/growing-ca) article. <!--and J. Stovold's Colab notebook for the ["Neural Cellular Automata can Respond to Signals"](https://github.com/jstovold/ALIFE2023/blob/master/ExternalSignals.ipynb) article at ALIFE 2023.-->

Code and materials are restricted for any and all use prior to publication; post-publication the code and associated materials are available under the Apache 2.0 license


In [None]:
import tensorflow as tf
gpus = tf.config.list_physical_devices('GPU')
if gpus:
  try:
    # Currently, memory growth needs to be the same across GPUs
    for gpu in gpus:
      tf.config.experimental.set_memory_growth(gpu, True)
    logical_gpus = tf.config.list_logical_devices('GPU')
    print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
  except RuntimeError as e:
    # Memory growth must be set before GPUs have been initialized
    print(e)



In [None]:
#@title Imports and Notebook Utilities

import os
import io
import PIL.Image, PIL.ImageDraw
import base64
import zipfile
import json
import requests
import numpy as np
import matplotlib.pylab as pl
import glob
import random
import datetime

from scipy.ndimage.interpolation import rotate as scipy_rotate


import tensorflow as tf

from IPython.display import Image, HTML, clear_output
import tqdm

import os
os.environ['FFMPEG_BINARY'] = 'ffmpeg'
import moviepy.editor as mvp
from moviepy.video.io.ffmpeg_writer import FFMPEG_VideoWriter
clear_output()

def np2pil(a):
  if a.dtype in [np.float32, np.float64]:
    a = np.uint8(np.clip(a, 0, 1)*255)
  return PIL.Image.fromarray(a)

def imwrite(f, a, fmt=None):
  a = np.asarray(a)
  if isinstance(f, str):
    fmt = f.rsplit('.', 1)[-1].lower()
    if fmt == 'jpg':
      fmt = 'jpeg'
    f = open(f, 'wb')
  np2pil(a).save(f, fmt, quality=95)

def imencode(a, fmt='jpeg'):
  a = np.asarray(a)
  if len(a.shape) == 3 and a.shape[-1] == 4:
    fmt = 'png'
  f = io.BytesIO()
  imwrite(f, a, fmt)
  return f.getvalue()

def im2url(a, fmt='jpeg'):
  encoded = imencode(a, fmt)
  base64_byte_string = base64.b64encode(encoded).decode('ascii')
  return 'data:image/' + fmt.upper() + ';base64,' + base64_byte_string

def imshow(a, fmt='jpeg'):
  display(Image(data=imencode(a, fmt)))

def tile2d(a, w=None):
  a = np.asarray(a)
  if w is None:
    w = int(np.ceil(np.sqrt(len(a))))
  th, tw = a.shape[1:3]
  pad = (w-len(a))%w
  a = np.pad(a, [(0, pad)]+[(0, 0)]*(a.ndim-1), 'constant')
  h = len(a)//w
  a = a.reshape([h, w]+list(a.shape[1:]))
  a = np.rollaxis(a, 2, 1).reshape([th*h, tw*w]+list(a.shape[4:]))
  return a

def zoom(img, scale=4):
  img = np.repeat(img, scale, 0)
  img = np.repeat(img, scale, 1)
  return img

def rotate(img, angle=90):
  img = scipy_rotate(img, angle=angle)
  return img

class VideoWriter:
  def __init__(self, filename, fps=30.0, **kw):
    self.writer = None
    self.params = dict(filename=filename, fps=fps, **kw)

  def add(self, img):
    img = np.asarray(img)
    if self.writer is None:
      h, w = img.shape[:2]
      self.writer = FFMPEG_VideoWriter(size=(w, h), **self.params)
    if img.dtype in [np.float32, np.float64]:
      img = np.uint8(img.clip(0, 1)*255)
    if len(img.shape) == 2:
      img = np.repeat(img[..., None], 3, -1)
    self.writer.write_frame(img)

  def close(self):
    if self.writer:
      self.writer.close()

  def __enter__(self):
    return self

  def __exit__(self, *kw):
    self.close()


In [None]:
#@title Cellular Automata Parameters
loading_from_file = True

OTHER_N  = 12                       # Number of "empty" spaces in the genome for the CA to communicate between cells
ID_N     = 1                        # Number of ID channels

# Total number of CA state channels (+ 4 for RGBA):
CHANNEL_N = 4 + OTHER_N + ID_N

TARGET_PADDING = 15                 # Number of pixels used to pad the target image border
TARGET_SIZE = 30
TARGET_PADDED_SIZE = TARGET_SIZE + TARGET_PADDING * 2

BATCH_SIZE = 12
POOL_SIZE = 1024
CELL_FIRE_RATE = 0.5

EXPERIMENT_TYPE = "Regenerating" #param ["Growing", "Persistent", "Regenerating"]
EXPERIMENT_MAP = {"Growing":0, "Persistent":1, "Regenerating":2}
EXPERIMENT_N = EXPERIMENT_MAP[EXPERIMENT_TYPE]

USE_PATTERN_POOL = [0, 1, 1][EXPERIMENT_N]
DAMAGE_N = [0, 0, 3][EXPERIMENT_N]  # Number of patterns to damage in a batch

BEHAVIOUR_SWITCH=2000               # at what point do we change the way we present the target images to the network?
THRESHOLD = 0.01                    # kill off any NCAs that don't produce anything visible
fix_seed  = False                   # for reproducibility

CONTROL = 2     # 0: original NCA model (w/17ch); 1: ID NCA model (w/ one ID); 2: full ID NCA model (i.e. non-control)

In [None]:
#@title CA Model and Utilities

from tensorflow.keras.layers import Conv2D


def load_image(url, max_size=TARGET_SIZE, zoom=1):
  r = requests.get(url)
  img = PIL.Image.open(io.BytesIO(r.content))
  img.thumbnail((max_size*zoom, max_size*zoom), PIL.Image.LANCZOS)
  img = np.float32(img)/255.0
  img[...,:3] *= img[..., 3:]
  return img


def load_gecko(zoom=1):
  url = "https://github.com/googlefonts/noto-emoji/blob/main/png/128/emoji_u1f98e.png?raw=true"
  return load_image(url, zoom=zoom)


def to_rgba(x):
  return x[..., :4]

def to_rgba_id(x, y=None):
  if CONTROL == 0:
    return(x[...,:4])
  else:
    if y is None:
      return tf.concat((x[..., :4], x[..., -ID_N:]), axis=-1)
    else:
      pad = tf.ones(x[...,-ID_N:].shape, tf.float32) * y
      return tf.concat((x[..., :4], pad))

def to_alpha(x):
  return tf.clip_by_value(x[..., 3:4], 0.0, 1.0)

def to_rgb(x):
  # assume rgb premultiplied by alpha
  rgb, a = x[..., :3], to_alpha(x)
  return 1.0-a+rgb

def get_channels_no_alpha(x, ch=[5,6,7]):
  rgb = x[..., ch]
  return 1.0-rgb

def get_channels(x, ch=[5,6,7]):
  rgb, a = x[..., ch], to_alpha(x)
  return 1.0-a+rgb

def get_living_mask_wide(x, threshold=0.1):
  alpha = x[:, :, :, 3:4]
  return tf.nn.max_pool2d(alpha, 5, [1, 1, 1, 1], 'SAME') > threshold

def get_living_mask_ff(x, threshold = 0.1):
  firefly = x[:, :, :, -ENV_N:]
  return tf.nn.max_pool2d(firefly, 3, [1, 1, 1, 1], 'SAME') > threshold


def get_living_mask_rgb(x, threshold = 0.1):
#   alpha = x[:, :, :, 3:4]
  pool = tf.nn.max_pool2d(x[None, ...], 3, [1, 1, 1, 1], 'SAME')
  return tf.reduce_max(pool, axis=-1) > threshold

def get_living_mask(x, threshold = 0.1):
  alpha = x[:, :, :, 3:4]
  return tf.nn.max_pool2d(alpha, 3, [1, 1, 1, 1], 'SAME') > threshold

def get_living(x):
  alpha = x[0, :, :, 3:4]
  return tf.nn.max_pool2d(alpha, 3, [1, 1, 1, 1], 'SAME') > 0.1

def get_living_mask_np(x):
  alpha = x[0, :, :, 3:4]
  return (tf.nn.max_pool2d(alpha, 3, [1, 1, 1, 1], 'SAME') > 0.1).numpy()

def make_seed(size, n=1):
  x = np.zeros([n, size, size, CHANNEL_N], np.float32)
  x[:, size//2, size//2, 3:] = 1.0
  return x



@tf.function
def get_stim(jitter=False):
  size = TARGET_PADDED_SIZE
  stim_x = (size // 2) - (TARGET_SIZE // 10)
  stim_y = (size // 2) - (TARGET_SIZE // 10)

  if jitter:
    rand = tf.random.uniform(shape=(), minval=-1, maxval=1, dtype=tf.int32)
    stim_x = stim_x + rand
    stim_y = stim_y + rand

  return (stim_x, stim_y)

class CAModel(tf.keras.Model):

  def __init__(self, channel_n=CHANNEL_N, fire_rate=CELL_FIRE_RATE):
    super().__init__()
    self.channel_n = channel_n
    self.fire_rate = fire_rate

    self.dmodel = tf.keras.Sequential([
          Conv2D(128, 1, activation=tf.nn.relu, name='layer1'),
          Conv2D(self.channel_n, 1, activation=None,
              kernel_initializer=tf.zeros_initializer, name='layer2'),
    ])

    self(tf.zeros([1, 3, 3, channel_n]))  # dummy call to build the model

  @tf.function
  def perceive(self, x, angle=0.0):
    identify = np.float32([0, 1, 0])
    identify = np.outer(identify, identify)
    dx = np.outer([1, 2, 1], [-1, 0, 1]) / 8.0  # Sobel filter
    dy = dx.T
    c, s = tf.cos(angle), tf.sin(angle)
    kernel = tf.stack([identify, c*dx-s*dy, s*dx+c*dy], -1)[:, :, None, :]
    kernel = tf.repeat(kernel, self.channel_n, 2)
    y = tf.nn.depthwise_conv2d(x, kernel, [1, 1, 1, 1], 'SAME')
    return y

  @tf.function
  def call(self, x, fire_rate=None, angle=0.0, step_size=1.0):
    pre_life_mask = get_living_mask(x)

    y = self.perceive(x, angle)
    dx = self.dmodel(y) * step_size
    if fire_rate is None:
      fire_rate = self.fire_rate
    update_mask = tf.random.uniform(tf.shape(x[:, :, :, :1])) <= fire_rate
    x += dx * tf.cast(update_mask, tf.float32)

    post_life_mask = get_living_mask(x)
    life_mask = pre_life_mask & post_life_mask
    return x * tf.cast(life_mask, tf.float32)


CAModel().dmodel.summary()

# Training

In [None]:
#@title Train Utilities (SamplePool, Model Export, Damage)
from google.protobuf.json_format import MessageToDict
from tensorflow.python.framework import convert_to_constants

class SamplePool:
  def __init__(self, *, _parent=None, _parent_idx=None, **slots):
    self._parent = _parent
    self._parent_idx = _parent_idx
    self._slot_names = slots.keys()
    self._size = None
    for k, v in slots.items():
      if self._size is None:
        self._size = len(v)
      assert self._size == len(v)
      setattr(self, k, np.asarray(v))

  def sample(self, n):
    idx = np.random.choice(self._size, n, False)
    batch = {k: getattr(self, k)[idx] for k in self._slot_names}
    batch = SamplePool(**batch, _parent=self, _parent_idx=idx)
    return batch

  def commit(self):
    for k in self._slot_names:
      getattr(self._parent, k)[self._parent_idx] = getattr(self, k)

@tf.function
def make_circle_masks(n, h, w):
  x = tf.linspace(-1.0, 1.0, w)[None, None, :]
  y = tf.linspace(-1.0, 1.0, h)[None, :, None]
  center = tf.random.uniform([2, n, 1, 1], -0.5, 0.5)
  r = tf.random.uniform([n, 1, 1], 0.1, 0.4)
  x, y = (x-center[0])/r, (y-center[1])/r
  mask = tf.cast(x*x+y*y < 1.0, tf.float32)
  return mask


def export_model(ca, base_fn):
  ca.save_weights(base_fn)

  cf = ca.call.get_concrete_function(
      x=tf.TensorSpec([None, None, None, CHANNEL_N]),
      fire_rate=tf.constant(0.5),
      angle=tf.constant(0.0),
      step_size=tf.constant(1.0))
  cf = convert_to_constants.convert_variables_to_constants_v2(cf)
  graph_def = cf.graph.as_graph_def()
  graph_json = MessageToDict(graph_def)
  graph_json['versions'] = dict(producer='1.14', minConsumer='1.14')
  model_json = {
      'format': 'graph-model',
      'modelTopology': graph_json,
      'weightsManifest': [],
  }
  with open(base_fn+'.json', 'w') as f:
    json.dump(model_json, f)

def generate_pool_figures(pool, step_i):
  tiled_pool = tile2d(to_rgb(pool.x[:49]))
  fade = np.linspace(1.0, 0.0, 72)
  ones = np.ones(72)
  tiled_pool[:, :72]  += (-tiled_pool[:,  :72] + ones[None, :, None]) * fade[None, :,    None]
  tiled_pool[:, -72:] += (-tiled_pool[:, -72:] + ones[None, :, None]) * fade[None, ::-1, None]
  tiled_pool[:72, :]  += (-tiled_pool[:72,  :] + ones[:, None, None]) * fade[:,    None, None]
  tiled_pool[-72:, :] += (-tiled_pool[-72:, :] + ones[:, None, None]) * fade[::-1, None, None]
  imwrite('train_log/%04d_pool.jpg'%step_i, tiled_pool)

def visualize_batch(x0, x, y0, step_i):
  vis0 = np.hstack(to_rgb(x0).numpy())
  vis1 = np.hstack(to_rgb(x).numpy())
  vis2 = np.hstack(to_rgb(y0).numpy())
  vis3 = np.hstack(get_channels(x0, [16,16,16]).numpy())
  vis4 = np.hstack(get_channels(x.numpy(), [16,16,16]).numpy())
  vis = np.vstack([vis0, vis1, vis2, vis3, vis4])
  imwrite('train_log/batches_%04d.jpg'%step_i, vis)
  print('batch (before/after):')
  imshow(zoom(vis,2))

def plot_loss(loss_log):
  pl.figure(figsize=(10, 4))
  pl.title('Loss history (log10)')
  pl.plot(np.log10(loss_log), '.', alpha=0.1)
  pl.show()





In [None]:
#@title Choose Target Image { vertical-output: true}

target_img = load_gecko()
imshow(zoom(to_rgba(target_img), 2), fmt='png')



In [None]:
#@title Pull the images from github according to genome { vertical-output: true }

target_img_1 = load_gecko()
h, w = target_img_1.shape[:2]

p = (TARGET_PADDED_SIZE - h) // 2
padded_img_1 = tf.pad(target_img_1, [[p, p], [p,p], [0,0]])

print(padded_img_1.shape)
imshow(zoom(to_rgba(padded_img_1),2),fmt='png')



In [None]:
#@title Initialize Training { vertical-output: true}

# @tf.function
def make_seed_env(signal=1.0):
  h, w = (TARGET_PADDED_SIZE, TARGET_PADDED_SIZE)
  seed = np.zeros([h, w, CHANNEL_N], np.float32)
  seed[h//2, w//2, 3:] = 1.0
  if CONTROL == 0:
    return seed, 0.0
  elif CONTROL == 1:
    signal = 1.0

  seed[h//2, w//2, -ID_N:] = signal
  return seed, signal

@tf.function
def get_target(target_img, signal):
  if CONTROL == 0:
    return(target_img)
  elif CONTROL == 1:
    signal = 1.0

  pad = tf.ones(target_img[...,0].shape, tf.float32) * tf.cast(signal, tf.float32)

  # restrict signal ID to living cells
  mask = get_living_mask_rgb(target_img)
  pad = pad * tf.cast(mask, tf.float32)

  # stick it on the back of the target image:
  new_target_img = tf.concat((target_img, pad[0,...,None]), -1)
  return(new_target_img)




h, w = padded_img_1.shape[:2]
print(h,w)
seed = np.zeros([h, w, CHANNEL_N], np.float32)
seed[h//2, w//2, 3:]  = 1.0
seed[h//2, w//2, -ID_N:] = 1.0

def loss_f(x, y):
  return tf.reduce_mean(tf.square(to_rgba_id(x)-to_rgba_id(y)), [-2, -3, -1])

ca = CAModel()

loss_log = []
lr = 2e-3

lr_sched = tf.keras.optimizers.schedules.PiecewiseConstantDecay(
    [2000], [lr, lr*0.1])
trainer = tf.keras.optimizers.Adam(lr_sched)


loss0 = loss_f(seed[None,...], get_target(padded_img_1, 0)[None,...]).numpy()

pool = SamplePool(x=np.repeat(seed[None, ...], POOL_SIZE, 0),
                  y=np.repeat(get_target(padded_img_1,0)[None,...], POOL_SIZE, 0),
                  z=np.repeat(1.0, POOL_SIZE,0))

!mkdir -p train_log && rm -f train_log/*

In [None]:
#@title Training Loop {vertical-output: true}

@tf.function
def train_step(x, y):
  iter_n = tf.random.uniform([], 64, 200, tf.int32)
  with tf.GradientTape() as g:
    for i in tf.range(iter_n):
      x = ca(x)
    loss = tf.reduce_mean(loss_f(x, y))
  grads = g.gradient(loss, ca.weights)
  grads = [g/(tf.norm(g)+1e-8) for g in grads]
  trainer.apply_gradients(zip(grads, ca.weights))
  return x, loss

if fix_seed:
  tf.random.set_seed(1)

if not loading_from_file:
  tic = datetime.datetime.now()

  start_i = 6
  end_i = 9
  num_seeded = 3
  add_noise = False

  for i in range(8000+1):

    if i == BEHAVIOUR_SWITCH:
      # move signal to middling outputs rather than best outputs
      # (keep training best outputs to persist)
      start_i    = 6
      end_i      = 12
      num_seeded = 3
      add_noise  = False


    if USE_PATTERN_POOL:
      batch = pool.sample(BATCH_SIZE)
      x0 = batch.x
      y0 = batch.y
      z0 = batch.z

      loss_rank = loss_f(x0, y0).numpy().argsort()[::-1]
      x0 = x0[loss_rank]
      y0 = y0[loss_rank]
      z0 = z0[loss_rank]

      for i in range(num_seeded):
        x0[i], z0[i] = make_seed_env(np.round(np.random.random() * 2.0) / 2.0)
        y0[i] = get_target(padded_img_1, z0[i])

      for y in range(start_i, end_i):
        # add noise:
        if add_noise:
          noise = tf.random.uniform(x0[y, ..., :-ID_N].shape, maxval=0.2)
          noise = tf.concat((noise, tf.zeros(x0[y,...,-ID_N:].shape)), -1)
          x0[y] = x0[y] + noise

      for y in range(BATCH_SIZE-DAMAGE_N, BATCH_SIZE):
          damage = 1.0-make_circle_masks(1, h, w).numpy()[..., None]
          x0[y:] *= damage
          if np.sum(to_rgba(x0[y])) < THRESHOLD:
            x0[y], z0[y] = make_seed_env(np.round(np.random.random() * 2.0) / 2.0)
            y0[y] = get_target(padded_img_1, z0[y])
          else:
            y0[y,...] = get_target(padded_img_1, z0[y])


    else:
      seed = make_seed_env()
      x0 = np.repeat(seed[None, ...], BATCH_SIZE, 0)

    x, loss = train_step(x0, y0)

    if USE_PATTERN_POOL:
      batch.x[:] = x
      batch.y[:] = y0
      batch.z[:] = z0
      batch.commit()

    step_i = len(loss_log)
    loss_log.append(loss.numpy())

    if step_i%10 == 0:
      generate_pool_figures(pool, step_i)
    if step_i%100 == 0:
      clear_output()
      visualize_batch(x0, x, y0, step_i)
      print(z0)
      plot_loss(loss_log)
      export_model(ca, 'train_log/%04d.weights.h5'%step_i)

    print('\r step: %d, log10(loss): %.3f'%(len(loss_log), np.log10(loss)), end='')
  toc = datetime.datetime.now()
  print("\n", toc - tic)
  loading_from_file = False

In [None]:
#@title Save weights to file and download {vertical-output:true}
if not loading_from_file:
  from google.colab import files
  filename = f'model_control_{CONTROL}_rerun.tar.gz'
  !tar -zcvf {filename} train_log/
  files.download(filename)


# Figures

In [None]:
#@title Load model from file {vertical-output:true}
# loading_from_file = True

def load_model_from_file():
  r = requests.get(f'https://github.com/jstovold/ALIFE2025/raw/refs/heads/master/model_control_{CONTROL}.tar.gz?raw=true')
  with open("/content/model.tar.gz", "wb") as f:
    f.write(r.content)

  !rm -rf /content/train_log
  !tar -xvf /content/model.tar.gz
  !rm /content/model.tar.gz

if loading_from_file:
  load_model_from_file()

# Testing multiple organisms in same environment

In [None]:
#@title Generate comparison image {vertical-output:true}

def generate_comparison_offset_aux(x1,y1, x2,y2, offset1, offset2):
  size = TARGET_PADDED_SIZE
  comparison_img = np.zeros([size*2, size*3, 4], np.float32)
  gecko = load_gecko()
  g_size = gecko.shape[0]

  top1  = y1 - (g_size//2) + offset1
  left1 = x1 - (g_size//2)
  top2  = y2 - (g_size//2) + offset2
  left2 = x2 - (g_size//2)

  comparison_img[top1:top1+g_size, left1:left1+g_size, ...] += gecko
  comparison_img[top2:top2+g_size, left2:left2+g_size, ...] += gecko

  return(comparison_img)


def generate_comparison(distance,offset1=0,offset2=0):
  size = TARGET_PADDED_SIZE
  comp_img = generate_comparison_offset_aux(int(size*distance), size, size*2, size, offset1, offset2)
  return(comp_img)

imshow(zoom(to_rgba(generate_comparison(1.9,5,0)), 2), fmt='png')

In [None]:
#@title Collect data on errors and bound boxes {vertical-output:true}
## N.B. this will take a long time to run (min. 24h)


NUM_ORGANISMS       = 1                        # how many to grow
model_nums          = [8000] * NUM_ORGANISMS   # set up which model to use
distances           = [1.7, 1.75, 1.8, 1.85, 1.9]  # (2.0 -) distance between organisms
time_to_seed_second = 10                       # how long to wait between seeds
seed_times          = [0, 10, 50, 100, 150, 200, 250]
length_of_trial     = 1000                     # how long to run trial for
use_drive           = True                     # push output to Google Drive?
save_root           = '/content/Output'
capture_history     = False
offset_vals         = [0,5,10,15]
OLD_CONTROL = CONTROL

seed1_vals = [0.0,0.5,1.0]
seed2_vals = [0.0,0.5,1.0]



def calc_bounding_box(x):
  # function to calculate the bounding box of the living cells within x
  # returns a tuple: (area, width, height, density of fill)
  # assumes rank-4 tensor as input: [model, y, x, c]

  alpha     = x[...,3]

  num_cells = np.count_nonzero(alpha)
  cells     = np.nonzero(alpha)

  # find top-most, left-most, right-most, and bottom-most cells with a value
  top       = np.min(cells[1])
  bottom    = np.max(cells[1])
  right     = np.min(cells[2])
  left      = np.max(cells[2])

  # shouldn't need abs, but...
  height    = abs(bottom - top) + 1
  width     = abs(right - left) + 1

  area      = height * width
  density   = num_cells / area

  return (area, width, height, density, top, left)



def rmse(x, y):
  return tf.reduce_mean(tf.square(to_rgba(x)-to_rgba(y)), [-2, -3, -1])

err = np.zeros((3, len(seed_times), len(distances), len(offset_vals), len(offset_vals), len(seed1_vals), len(seed2_vals), 1000), np.float32)
box = np.zeros((3, len(seed_times), len(distances), len(offset_vals), len(offset_vals), len(seed1_vals), len(seed2_vals), 1000, 6), np.float32)
total_runs = 3 * len(seed_times) * len(distances) * len(offset_vals) * len(offset_vals) * len(seed1_vals) * len(seed2_vals)
run_i = 1


model_nums = [8000]

time_to_seed_second = 10


if use_drive:
  from google.colab import drive
  drive.mount('/content/drive')
  save_root = '/content/drive/MyDrive/Research/NCAs/Identity/Full_Test'

for c in range(1,3,1):
# if True:
  CONTROL = c
  load_model_from_file(CONTROL == 1)
  root_dir = ''
  models = []
  for i in model_nums:
    ca = CAModel()
    ca.load_weights(root_dir + 'train_log/%04d.weights.h5'%i)
    models.append(ca)
  t_i = -1
  for time_to_seed_second in seed_times:
    t_i += 1
    d = -1
    for distance in distances:
      d += 1
      o1 = -1
      for offset in offset_vals:
        o1 += 1
        o2 = -1
        for offset2 in offset_vals:
          o2 += 1
          comp_img = generate_comparison(distance=distance, offset1=offset, offset2=offset2)
          s1 = -1
          for seed1 in seed1_vals:
            s1 += 1
            s2 = -1
            for seed2 in seed2_vals:
              s2 += 1
              x = np.zeros([1, TARGET_PADDED_SIZE*2, TARGET_PADDED_SIZE*3, CHANNEL_N], np.float32)

              # seed two organisms:
              x[..., TARGET_PADDED_SIZE+offset, int(TARGET_PADDED_SIZE*distance), 3:] = 1.0
              x[..., TARGET_PADDED_SIZE+offset, int(TARGET_PADDED_SIZE*distance), -ID_N:] = seed1

              out_fn = f'test_boundingbox_gecko_distance_{distance}_control_{CONTROL}_numorg_{NUM_ORGANISMS}_second_{time_to_seed_second}_length_{length_of_trial}_offset1_{offset}_offset2_{offset2}_seed1_{int(seed1*10)}_seed2_{int(seed2*10)}.mp4'


              with VideoWriter(out_fn) as vid:
                for i in tqdm.trange(1000):
                  if i == time_to_seed_second:
                    x[..., TARGET_PADDED_SIZE+offset2, TARGET_PADDED_SIZE*2, 3:] = 1.0
                    x[..., TARGET_PADDED_SIZE+offset2, TARGET_PADDED_SIZE*2, -ID_N:] = seed2


                  bounds = calc_bounding_box(x) #  (area, width, height, density, top, left)
                  box[c,t_i,d,o1,o2,s1,s2,i,:] = bounds
                  err[c,t_i,d,o1,o2,s1,s2,i]    = rmse(x, comp_img)

                  vis = np.hstack(to_rgb(x))
                  vis2 = np.hstack(get_channels_no_alpha(x, [16,16,16]))   #]
                  vis4 = np.hstack(get_channels(x, [16,16,16]))   #]
                  vid1 = np.vstack((vis, vis2, vis4))

                  vis_reduced = np.hstack(to_rgb(x[:,20:-20,(-TARGET_PADDED_SIZE*2)+20:-20,:]))

                  vid.add(zoom(vid1, 2))
                  for ca, xk in zip(models, x):
                    xk[:] = ca(xk[None,...])[0]

              # save video to {save_root}
              !mkdir -p {save_root}/{c}/videos/
              !cp {out_fn} {save_root}/{c}/videos/
              print(f"{run_i} / {total_runs}")
              run_i += 1

# After everything completes, push bounding box and error values to npy files:
np.save('/content/box.npy', box)
!cp /content/box.npy {save_root}

np.save('/content/err.npy', err)
!cp /content/err.npy {save_root}



In [None]:
print(box.shape)

In [None]:

NUM_ORGANISMS       = 1                        # how many to grow
model_nums          = [8000] * NUM_ORGANISMS   # set up which model to use
# distances           = [1.7, 1.75, 1.8, 1.85, 1.9]                      # (2.0 -) distance between organisms
distances           = [1.85, 1.9]                      # (2.0 -) distance between organisms
time_to_seed_second = 10                       # how long to wait between seeds
seed_times          = [10] #0, 50, 100, 150, 200, 250]
length_of_trial     = 1000                     # how long to run trial for
use_drive           = True
save_root           = ''
capture_history     = False
offset_vals         = [0,5,10,15]
OLD_CONTROL = CONTROL
run_single_test = True

seed1_vals = [0.0,0.5,1.0]
seed2_vals = [0.0,0.5,1.0]

# seed1_vals = [1.0]
# seed2_vals = [1.0]
CONTROL = 1
import numpy as np
save_root = '/content/drive/MyDrive/Research/NCAs/Identity/BoundingBox2_control1'
box_control1 = np.load(f"{save_root}/box_control1.npy")
print(box_control1.shape)

def calc_bounding_box(x):
  # function to calculate the bounding box of the living cells within x
  # returns a tuple: (area, width, height, density of fill)
  # assumes rank-4 tensor as input: [model, y, x, c]

  alpha     = x[...,3]

  num_cells = np.count_nonzero(alpha)
  cells     = np.nonzero(alpha)

  # find top-most, left-most, right-most, and bottom-most cells with a value
  top       = np.min(cells[1])
  bottom    = np.max(cells[1])
  right     = np.min(cells[2])
  left      = np.max(cells[2])

  # shouldn't need abs, but...
  height    = abs(bottom - top) + 1
  width     = abs(right - left) + 1

  area      = height * width
  density   = num_cells / area

  return (area, width, height, density, top, left)


In [None]:
# yes, perfectly aware this is terrible code, but I don't really care about
# speed/memory use, just need to get the data out so I can use R to build the
# graphs

import pandas as pd
box_reduced = box[...,-1,:]    # take only 1000th timestep

df = pd.DataFrame(columns = ['control','seed_times','distances', 'offset1', 'offset2', 'seed1', 'seed2', 'area', 'width','height','density', 'top', 'left' ])

for c in range(3):
  t_i = -1
  for t in seed_times:
    t_i += 1
    d_i = -1
    for d in distances:
      d_i += 1
      o1_i = -1
      for o1 in offset_vals:
        o1_i += 1
        o2_i = -1
        for o2 in offset_vals:
          o2_i += 1
          s1_i = -1
          for s1 in seed1_vals:
            s1_i += 1
            s2_i = -1
            for s2 in seed2_vals:
              s2_i += 1
              this_box = box_reduced[c-1,t_i,d_i,  o1_i, o2_i, s1_i, s2_i,:]
              df = pd.concat([df, pd.DataFrame([{'control':c,'seed_times':t,'distances': d, 'offset1': o1, 'offset2': o2, 'seed1':s1, 'seed2':s2, 'area': this_box[0], 'width':this_box[1],'height':this_box[2],'density':this_box[3], 'top':this_box[4], 'left':this_box[5]}])] , ignore_index=True)


In [None]:
#@title Push dataframe to CSV file
from google.colab import files
df_filename = '/content/box.csv'
df.to_csv(df_filename)
files.download(df_filename)
!cp {df_filename} {save_root}/box_test.csv

In [None]:
#@title Produce bounding boxes for comparison image as a point of reference
import pandas as pd

control_bounds = np.zeros((len(distances), len(offset_vals), len(offset_vals),6), np.float32)
d_i = -1
for distance in distances:
  d_i += 1
  o1_i = -1
  for offset1 in offset_vals:
    o1_i += 1
    o2_i = -1
    for offset2 in offset_vals:
      o2_i += 1
      img = generate_comparison(distance, offset1, offset2)
      temp_bounds = calc_bounding_box(img[None,...])
      control_bounds[d_i,o1_i,o2_i, :] = temp_bounds

df_control = pd.DataFrame(columns = ['distances', 'offset1', 'offset2', 'area', 'width','height','density', 'top', 'left' ])

d_i = -1
for d in distances:
  d_i += 1
  o1_i = -1
  for o1 in offset_vals:
    o1_i += 1
    o2_i = -1
    for o2 in offset_vals:
      o2_i += 1
      this_box = control_bounds[d_i, o1_i, o2_i, :]
      df_control = pd.concat([df_control, pd.DataFrame([{'distances': d, 'offset1': o1, 'offset2': o2, 'area': this_box[0], 'width':this_box[1],'height':this_box[2],'density':this_box[3],'top':this_box[4], 'left':this_box[5]}])] , ignore_index=True)


In [None]:
from google.colab import files
df_filename = '/content/box_control.csv'
df_control.to_csv(df_filename)
files.download(df_filename)