## Import requirements

In [None]:
import numpy as np
import matplotlib.pyplot as plt

import tensorflow as tf

%matplotlib inline
plt.rcParams['figure.figsize'] = (10.0, 8.0) # set default size of plots
plt.rcParams['image.interpolation'] = 'nearest'
plt.rcParams['image.cmap'] = 'gray'

import os
import glob
import pickle

import sys
sys.path.insert(0, '..')
from ca_funcs import make_glider, make_game_of_life
from utils import *
from train_ca import *


%load_ext autoreload
%autoreload 2

In [None]:
# Set up some global variables
USE_GPU = True

if USE_GPU:
    # device on dedicated node
    #device = '/device:GPU:0'
    # device assigned via slurm
    device = '/job:localhost/replica:0/task:0/device:XLA_GPU:0'
else:
    device = '/cpu:0'

# Constant to control how often we print when training models
print_every = 100

print('Using device: ', device)

In [None]:
############## MULTI-GPUs Config ##############
config = tf.compat.v1.ConfigProto()
config.gpu_options.allow_growth = True
#config.log_device_placement = True
config.log_device_placement = False
#config.gpu_options.visible_device_list='0,1,2,3'
config.gpu_options.visible_device_list='0'

In [None]:
# Set the random seed in tensorflow at graph level
tf.compat.v1.set_random_seed(111)

# Set the session in tensorflow
sess = tf.compat.v1.Session(config=config)

# Set the session in keras
tf.compat.v1.keras.backend.set_session(sess)

tf.debugging.set_log_device_placement(True)
strategy = tf.distribute.MirroredStrategy()

print('Number of devices: {}\n'.format(strategy.num_replicas_in_sync))

# Build and train a model

In [None]:
#### Make training data
tf.random.set_seed(0)

## Make training data
train_size, wspan, hspan = (100, 10, 10)
X_train = tf.convert_to_tensor(np.random.choice([0,1], (train_size, wspan, hspan), p=[.5,.5]), tf.float32)
gol = make_game_of_life()
Y_train = gol(tf.convert_to_tensor(X_train, tf.float32))

X_train = X_train[..., tf.newaxis]
Y_train = Y_train[..., tf.newaxis]


In [None]:
#### Define and build model
tf.random.set_seed(0)
layer_dims = [10, 10, 10]
num_classes = 2

try:
    del model
except:
    pass

loss = lambda x, y : tf.keras.losses.categorical_crossentropy(tf.reshape(x, shape=(-1, num_classes)), 
                                                              tf.reshape(y, shape=(-1, num_classes)), 
                                                              from_logits=True)
model = initialize_model((wspan, hspan), layer_dims, num_classes=num_classes)
# model = initialize_model((wspan, hspan), [10, 10, 10, 10], num_classes=num_classes, totalistic=True, bc="periodic")
model.compile(optimizer=tf.keras.optimizers.Adam(lr=1e-2), loss=loss)

model.summary()

In [None]:
#### Run training
Y_train_onehot = tf.squeeze(tf.one_hot(tf.cast(Y_train, tf.int32), num_classes))
train_history = model.fit(x=X_train, y=Y_train_onehot, epochs=100, batch_size=10, verbose=0)

plt.plot(train_history.history['loss'], 'k')

In [None]:
### Plot results

## Generate testing data
X_test = tf.convert_to_tensor(np.moveaxis(np.dstack([make_glider(10), make_glider(10)]), 2, 0), tf.float32)
# X_test = tf.convert_to_tensor(make_glider(10), tf.float32)[tf.newaxis, ...]
Y_test = gol(X_test)
X_test = X_test[..., tf.newaxis]
Y_test = Y_test[..., tf.newaxis]

Y_pred = logit_to_pred(model(X_test), shape=(-1, wspan, hspan))

plt.figure(figsize=(12,4))

plt.subplot(1,3,1)
plt.imshow(tf.squeeze(X_test[0]))
plt.axis('off')
plt.title("Input")

plt.subplot(1,3,2)
plt.imshow(tf.squeeze(Y_test[0]))
plt.axis('off')
plt.title("Expected Output")

plt.subplot(1,3,3)
plt.imshow(tf.squeeze(Y_pred[0]))
plt.axis('off')
plt.title("Observed Output")



In [None]:
### Save and load a model
model.save('path_to_my_model.h5')
del model
#model = tf.keras.models.load_model('path_to_my_model.h5', custom_objects={'Wraparound2D': Wraparound2D})

# Show activation patterns of hidden layers

In [None]:
import tensorflow.keras.backend as K

inp = model.input                                           # input placeholder
outputs = [layer.output for layer in model.layers]          # all layer outputs
functor = K.function(inp, outputs)   # evaluation function

layer_outs = functor([X_test, 1.])



# Plot activations of different neurons in different layers 
all_layer_activations = list()

min_max_scaler = lambda x : (x - np.min(x))/(np.max(x) - np.min(x))
# min_max_scaler = lambda x : (x - np.mean(x))
for j in range(1, 5):
    if j==1:
        layer_im = np.hstack([min_max_scaler(layer_outs[1][0][..., i]) for i in range(10)])
    else:
        pattern = np.reshape(layer_outs[j][0], (wspan, hspan, -1))
        layer_im = np.hstack([min_max_scaler(pattern[..., i]) for i in range(10)])
    all_layer_activations.append(layer_im)

        
plt.figure()
plt.imshow(np.vstack(all_layer_activations))
plt.title("Activations of hidden layers given \"Glider\" input")

plt.figure()
plt.imshow(np.squeeze(np.dstack(model.layers[1].weights[0].numpy())))
plt.title("Convolutional filters")