In [None]:
# set up notebook for cloud

# on colab, you'll probably need to restart runtime after running the commands below

#! mkdir nsrts/weights
#! gdown 1np2iNoc7WVL0p7XxXFmCslrkGda3UIdr -O nsrts/weights/

#! git clone https://github.com/riveSunder/NeuralSymbolicRegressionThatScales.git nsrts
#! pip install nsrts/src/
#! pip install pytorch-lightning
#! pip install torch==1.11
#! pip install libtorch

#! git clone https://github.com/riveSunder/yuca.git
#! pip install -e ./yuca/


In [None]:
# change the directory fro NeuralSymbolicRegressionThatScales
# e.g. on colab `nsrts_dir = "nsrts"
nsrts_dir = "../nsrts"

In [None]:
import os

import numpy as np
import torch

import matplotlib
import matplotlib.pyplot as plt
matplotlib.rcParams["animation.embed_limit"] = 256

import matplotlib.animation
import IPython

from nesymres.architectures.model import Model
from nesymres.utils import load_metadata_hdf5
from nesymres.dclasses import FitParams, NNEquation, BFGSParams
from pathlib import Path
from functools import partial
import torch
from sympy import lambdify

import sympy as sp
import json

from yuca.lenia import Lenia
from yuca.zoo.librarian import Librarian
from yuca.utils import seed_all


In [None]:
def get_lenia_fn(neighborhood_conv, growth_fn, dt):
    """
    given the convolution and growth functions, and a step size dt
    return a function that implements a continuous CA under Lenia framework
    """
    
    def update_ca_fn(grid):
        
        nbhd = neighborhood_conv(grid)
        growth = growth_fn(nbhd)
        
        #grid = torch.tensor(np.clip(grid.clone().detach() + dt * growth, 0.0, 1.0)).clone().detach().to(grid.dtype).clone().detach()
        grid = torch.clamp(grid + dt*growth, 0, 1.0).double().to(grid.dtype)
        
        return grid
    
    return update_ca_fn

# functions facilitating IPython animation
# initializing the plot
def plot_grid(grid):
    
    global subplot_0 
    
    fig, ax = plt.subplots(1,1, figsize=(6,6), facecolor="white")
    
    subplot_0 = ax.imshow(grid.numpy().squeeze(), cmap="magma", interpolation="nearest")
    
    ax.set_xticklabels("")
    ax.set_yticklabels("")
    
    return fig, ax

# updating the figure during animation
def update_fig(ii):
    
    global subplot_0
    global grid
    global update_ca_fn
    
    grid = update_ca_fn(grid)
    
    subplot_0.set_array(grid.numpy().squeeze())
    



In [None]:
# user configurable options

use_toy_data = False

# whether to get input x/y dataset from CA simulation
use_ca_dynamics = True

# Gaussian and the Lenia Orbium params
mu = 0.15
sigma = 0.015
gaussian = lambda x: np.exp(-((x -mu) / sigma)**2)


# seed pseudorandom number generators
seed_all(1337)

In [None]:
if use_toy_data:
    pass
else:
    
    ca = Lenia()
    ca.restore_config("orbium.npy")
    ca.no_grad()

    x = np.arange(-0.1,1.5, 0.001)
    fig, ax = plt.subplots(1,2, gridspec_kw={"width_ratios": [0.2, 0.8]}, figsize=(12,3))
    ax[0].imshow(ca.neighborhood_kernels.squeeze(), cmap="magma")
    ax[0].set_title("neighborhood kernel")
    ax[1].plot(x, ca.genesis_fns[0](x))
    ax[1].set_title("growth function")
    plt.show()


    #grid[:,:, -64:, :64] = torch.rand(64,64)
    if use_ca_dynamics:
        
        grid = torch.zeros(1,1,128,128)
        my_steps = 32
        num_patterns = 5
        dim_rand = 54
        lib = Librarian()

        grid[:,:, :dim_rand, :] = torch.rand(dim_rand, grid.shape[-1])

        pattern, _ = lib.load("orbium_orbium000")

        for ii in range(num_patterns):
            grid[:,:,25*ii:25*ii+pattern.shape[-2], 25*ii:25*ii + pattern.shape[-1]] = torch.tensor(pattern)

        plt.figure(figsize=(6,6))
        plt.imshow(grid.squeeze(), cmap="magma")

        for step in range(my_steps):
            #neighborhood = ca.neighborhood_conv(grid)
            #old_grid = 1.0 * grid
            grid = ca(grid)

        #grid[:,:, :dim_rand, -dim_rand:] = torch.rand(dim_rand,dim_rand)

        neighborhood = ca.neighborhood_conv(grid)
        old_grid = 1.0 * grid
        grid = ca(grid)

        plt.figure(figsize=(6,6))
        plt.imshow(neighborhood.squeeze(), cmap="magma")

        plt.figure(figsize=(6,6))
        plt.imshow(old_grid.squeeze(), cmap="magma")
        plt.show()

       
    else:
        # use random uniform inputs to ca
        grid = torch.rand(1, 1, 128, 128)
        grid_ramp = torch.tensor(np.arange(0,1,1/grid.shape[-1])).reshape(1,1,1,grid.shape[-1])
        
        grid = (grid * grid_ramp).to(grid.dtype)

        neighborhood = ca.neighborhood_conv(grid)
        old_grid = 1.0 * grid
        grid = ca(grid)

        plt.figure(figsize=(6,6))
        plt.imshow(neighborhood.squeeze(), cmap="magma")

        plt.figure(figsize=(6,6))
        plt.imshow(old_grid.squeeze(), cmap="magma")
        plt.show()
        

    x0 = old_grid.numpy().ravel().reshape(-1,1)
    x1 = neighborhood.numpy().ravel().reshape(-1,1)

    #data_x = np.append(x0, x1, axis=1)
    data_x = x1 
    data_y = ca.genesis_fns[0](data_x) #grid.ravel().reshape(-1,1)


    print(x0.shape, x1.shape, data_x.shape, data_y.shape)


In [None]:
## Load equation configuration and architecture configuration
import omegaconf

json_filepath = os.path.join(nsrts_dir, "jupyter", "100M", "eq_setting.json")
with open(json_filepath, 'r') as json_file:
    eq_setting = json.load(json_file)
     
cfg_filepath = os.path.join(nsrts_dir, "jupyter", "100M", "config.yaml")
cfg = omegaconf.OmegaConf.load(cfg_filepath)
    
## Set up BFGS load rom the hydra config yaml
bfgs = BFGSParams(
        activated= cfg.inference.bfgs.activated,
        n_restarts=cfg.inference.bfgs.n_restarts,
        add_coefficients_if_not_existing=cfg.inference.bfgs.add_coefficients_if_not_existing,
        normalization_o=cfg.inference.bfgs.normalization_o,
        idx_remove=cfg.inference.bfgs.idx_remove,
        normalization_type=cfg.inference.bfgs.normalization_type,
        stop_time=cfg.inference.bfgs.stop_time,
    )

In [None]:
# adjust this parameter up for greater accuracy and longer runtime
cfg.inference.beam_size = 2

In [None]:
params_fit = FitParams(word2id=eq_setting["word2id"], 
                            id2word={int(k): v for k,v in eq_setting["id2word"].items()}, 
                            una_ops=eq_setting["una_ops"], 
                            bin_ops=eq_setting["bin_ops"], 
                            total_variables=list(eq_setting["total_variables"]),  
                            total_coefficients=list(eq_setting["total_coefficients"]),
                            rewrite_functions=list(eq_setting["rewrite_functions"]),
                            bfgs=bfgs,
                            beam_size=cfg.inference.beam_size #This parameter is a tradeoff between accuracy and fitting time
                            )

weights_path = os.path.join(nsrts_dir, "weights", "100M.ckpt")

In [None]:
## Load architecture, set into eval mode, and pass the config parameters
model = Model.load_from_checkpoint(weights_path, cfg=cfg.architecture)
model.eval()
if torch.cuda.is_available(): 
    model.cuda()

In [None]:
fitfunc = partial(model.fitfunc, cfg_params=params_fit)

In [None]:
print("X shape: ", data_x.shape)
print("y shape: ", data_y.shape)

In [None]:
idx = np.random.choice(16384, 8000)

output = fitfunc(data_x[idx], data_y[idx].squeeze()) 
output

In [None]:
print(output["best_bfgs_preds"])

sr_fn = lambda x: sp.lambdify("x_1", output["best_bfgs_preds"])(x)[0]

In [None]:
print(output["best_bfgs_preds"])

sr_fn = lambda x: sp.lambdify("x_1", output["best_bfgs_preds"])(x)[0]

In [None]:
data_x.mean(), data_x.std(), data_x.min(), data_x.max()

In [None]:
# visualize the SR growth function
my_x = data_x[:1000]

plt.figure(figsize=(12,8))

plt.plot(my_x, gaussian(my_x), "o", color="r")
plt.plot(my_x, sr_fn(my_x), "x", color="b", alpha=0.1)

plt.plot(my_x, np.clip(sr_fn(my_x),0,1),".",  color="b", alpha=0.4)
plt.show()


In [None]:
# validate the dynamics. Do patterns behave as expected? 
num_frames = 100
grid = torch.zeros(1,1,96,96)

lib =  Librarian()
ca = Lenia()
ca.restore_config("orbium.npy")
ca.no_grad()

pattern, _ = lib.load("orbium_orbium000")
grid[:,:,50:72,50:72] = torch.tensor(pattern)

print(ca.neighborhood_conv(grid).max())
update_ca_fn  = get_lenia_fn(ca.neighborhood_conv, sr_fn, ca.dt)

fig, ax = plot_grid(grid)

plt.close("all")

IPython.display.HTML(matplotlib.animation.FuncAnimation(fig, update_fig, frames=num_frames, interval=100).to_jshtml())