## Import requirements

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
print(tf.__version__)

2.4.0


In [4]:
import os
import glob
import pickle

import sys
sys.path.insert(0, '..')
from ca_funcs import make_glider, make_game_of_life
from utils import *
from train_ca import *



In [5]:
# Ask TensorFlow once for the default GPU device; an empty name means no GPU is visible.
gpu_name = tf.test.gpu_device_name()
if not gpu_name:
    print("Please install GPU version of TF")
else:
    print('Default GPU Device: {}'.format(gpu_name))

Default GPU Device: /device:GPU:0


In [3]:
# Shared problem dimensions used by the cells below.
num_classes = 14          # depth handed to tf.one_hot when encoding the targets
wspan, hspan = 100, 100   # spatial size of one grid (also the model's input size)
nhood = 1                 # neighbourhood radius; the first conv kernel is 2*nhood+1 wide

# Build and train a model

In [7]:
# Synthetic pairs: 1000 random binary boards (0/1 with equal probability) and the
# boards produced by applying the `gol` callable to them.
random_boards = np.random.choice([0,1], (1000, wspan, hspan), p=[.5,.5])
X_train = tf.convert_to_tensor(random_boards, tf.float32)
gol = make_game_of_life()
Y_train = gol(X_train)   # X_train is already a float32 tensor, no second conversion needed

# Append a trailing channel axis to inputs and targets.
X_train = tf.expand_dims(X_train, axis=-1)
Y_train = tf.expand_dims(Y_train, axis=-1)

In [2]:
### Load Data
# Read the ten shards d0.npy ... d9.npy and join them along the sample axis (axis 0).
# The shards are gathered first and concatenated once; concatenating inside the loop
# re-copied the ever-growing array on every iteration.
data = np.concatenate(
    [np.load("../../data/d"+str(i)+".npy") for i in range(10)],
    axis=0)

In [3]:
# View every sample as a pair of (100, 100, 1) grids; axis 1 separates the two grids
# that the split cell later uses as input (index 0) and target (index 1).
data = np.reshape(data, (len(data), 2, 100, 100, 1))
data.shape

(145000, 2, 100, 100, 1)

In [5]:
### split data into train and test
from sklearn.model_selection import train_test_split

# data[:, 0] holds the inputs and data[:, 1] the matching targets.
# 20% of the samples go to the test set. random_state pins the shuffle so the split
# (and the arrays saved from it) can be regenerated identically after a kernel restart.
X_train, X_test, Y_train, Y_test = train_test_split(
    data[:,0], data[:,1], test_size=0.2, random_state=0)


In [6]:
def _squeezed_one_hot(labels):
    """Cast `labels` to int32, one-hot encode them to depth `num_classes`, then drop every size-1 axis."""
    as_int = tf.cast(labels, tf.int32)
    return tf.squeeze(tf.one_hot(as_int, num_classes))


Y_train_onehot = _squeezed_one_hot(Y_train)
Y_test_onehot = _squeezed_one_hot(Y_test)

In [7]:
### save train test arrays
# Persist every split so a later session can reload it without re-running the split.
# The raw (non one-hot) targets are written too: the loading cell reads
# 'Y_train.npy' / 'Y_test.npy', which were previously never saved by this notebook.
np.save('X_train.npy',X_train)
np.save('X_test.npy',X_test)
np.save('Y_train.npy',Y_train)
np.save('Y_test.npy',Y_test)
np.save('Y_train_onehot.npy',Y_train_onehot)
np.save('Y_test_onehot.npy',Y_test_onehot)

In [4]:
### load train and test arrays
# Hand-toggled switchboard: for each variable, uncomment exactly one np.load line.
# The "#80" / "#20" tags presumably mark which split a file holds (the split cell
# uses test_size=0.2) -- TODO confirm.
# As committed, the files saved under the *test* names are loaded as X_train / Y_train,
# and X_test, Y_test and the one-hot arrays are NOT loaded by this cell.
#80
#X_train = np.load('X_train.npy')
#20
X_train = np.load('X_test.npy')
#20
#X_test = np.load('X_test.npy')
#80
#X_test = np.load('X_train.npy')
#80
#Y_train_onehot = np.load('Y_train_onehot.npy')
#20
#Y_train_onehot = np.load('Y_test_onehot.npy')
#20
#Y_test_onehot = np.load('Y_test_onehot.npy')
#80
#Y_test_onehot = np.load('Y_train_onehot.npy')
#80
#Y_train = np.load('Y_train.npy')
#20
Y_train = np.load('Y_test.npy')
#80
#Y_test = np.load('Y_train.npy')
#20
#Y_test = np.load('Y_test.npy')

In [21]:
#### Define and build model
tf.random.set_seed(0)                           # fixed seed for weight initialisation
layer_dims = [100, 100, 100]                    # only referenced by the disabled Dense variant below
loss = lambda x, y : tf.keras.losses.MSE(x,y)   # mean squared error between targets and outputs
diameter = 2*nhood+1                            # side length of the neighbourhood kernel

model = tf.keras.Sequential()
model.add(tf.keras.layers.InputLayer((wspan, hspan, 1)))
# First layer: one (diameter x diameter) convolution; 'same' padding keeps the grid size.
model.add(tf.keras.layers.Conv2D(100, kernel_size=[diameter, diameter], padding='same',
                                 activation='relu', kernel_initializer=tf.keras.initializers.he_normal(),
                                 bias_initializer=tf.keras.initializers.he_normal()))
# Eleven 1x1 convolutions with 100 filters each; they act on every cell independently.
for i in range(11):
    model.add(tf.keras.layers.Conv2D(100, kernel_size=[1,1],activation='relu', kernel_initializer=tf.keras.initializers.he_normal(),
                                 bias_initializer=tf.keras.initializers.he_normal()))
# Linear 1x1 head: a single output value per cell.
model.add(tf.keras.layers.Conv2D(1, kernel_size=[1,1], kernel_initializer=tf.keras.initializers.he_normal(),
                                 bias_initializer=tf.keras.initializers.he_normal()))
# Disabled Dense variant, kept for reference:
#for i in range(1, len(layer_dims)):
#model.add(tf.keras.layers.Dense(layer_dims[i],  activation='relu',
#                                kernel_initializer=tf.keras.initializers.he_normal(),
#                                bias_initializer=tf.keras.initializers.he_normal()))
#model.add(tf.keras.layers.Dense(num_classes,  activation='relu',
#                                kernel_initializer=tf.keras.initializers.he_normal(),
#                                bias_initializer=tf.keras.initializers.he_normal()))

# `learning_rate` is the documented argument name; `lr` is only a deprecated alias.
# NOTE(review): 'accuracy' is tracked next to an MSE loss on a single-channel output --
# confirm the metric is meaningful for these targets.
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4), loss=loss,metrics=['accuracy'])
#model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=1e-1, nesterov=True), loss=loss,metrics=['accuracy'])

EPOCHS = 150
# NOTE(review): the file name says 'e-3' while the optimizer above uses 1e-4; the name is
# left untouched because it is a path that other runs may rely on.
checkpoint_filepath = 'best_working_ADAM_e-3_keras_model_2.h5'
# Keep only the weights of the epoch with the highest validation accuracy.
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,
    save_weights_only=True,
    monitor='val_accuracy',
    mode='max',
    save_best_only=True)
model.summary()

Model: "sequential_6"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
conv2d_78 (Conv2D)           (None, 100, 100, 100)     1000      
_________________________________________________________________
conv2d_79 (Conv2D)           (None, 100, 100, 100)     10100     
_________________________________________________________________
conv2d_80 (Conv2D)           (None, 100, 100, 100)     10100     
_________________________________________________________________
conv2d_81 (Conv2D)           (None, 100, 100, 100)     10100     
_________________________________________________________________
conv2d_82 (Conv2D)           (None, 100, 100, 100)     10100     
_________________________________________________________________
conv2d_83 (Conv2D)           (None, 100, 100, 100)     10100     
_________________________________________________________________
conv2d_84 (Conv2D)           (None, 100, 100, 100)    

In [None]:
#### Run training
# Train for EPOCHS epochs (defined in the model-definition cell; the literal 150 used here
# before duplicated that constant). validation_split reserves 20% of the arrays for
# validation, and the checkpoint callback stores the best weights seen so far.
train_history1 = model.fit(
    x=X_train, y=Y_train,
    epochs=EPOCHS, batch_size=100,
    shuffle=True, verbose=1,
    validation_split=0.2,
    callbacks=[model_checkpoint_callback])
#train_history = model.fit(x=X_train, y=Y_train, epochs=EPOCHS, batch_size=28,shuffle = True,verbose=1,validation_split=0.2,callbacks=[model_checkpoint_callback])



Epoch 1/150
Epoch 2/150
Epoch 3/150

In [12]:
###  Load model
# Restore previously saved weights into the already-built `model`.
# Alternative weight file: 'best_working_SGD_nesterov_e-1_kerastemp.h5'
file = 'best_working_SGD_nesterov_e-1_keras_model_2.h5'
model.load_weights(filepath=file)

In [9]:
# Bring targets and inputs into (samples, 100, 100, 1) and report the resulting shapes.
_grid_shape = (100, 100, 1)
Y_train = np.reshape(Y_train, (len(Y_train),) + _grid_shape)
print(Y_train.shape)
X_train = np.reshape(X_train, (len(X_train),) + _grid_shape)
print(X_train.shape)

(29000, 100, 100, 1)
(29000, 100, 100, 1)


In [None]:
# Stitch together the loss curves of every training run that exists in this kernel.
# `train_history` is only produced by the alternative (commented-out) fit call, so it may
# be undefined; referencing it unconditionally raised NameError on a fresh run.
_runs = [globals()[name] for name in ('train_history', 'train_history1') if name in globals()]
train_loss = [value for run in _runs for value in run.history['loss']]
val_loss = [value for run in _runs for value in run.history['val_loss']]

# Colour is given only through `color=`; the former 'k' format string also set a colour
# and conflicted with the keyword.
plt.plot(train_loss, label="training loss", color='green')
plt.plot(val_loss, label="validation loss", color='red')
plt.xlabel('epochs', fontsize=18)
plt.ylabel('loss', fontsize=16)
plt.legend()

In [20]:
### Evaluate the model
# NOTE(review): the targets passed here are the one-hot arrays, while the model defined in
# this notebook ends in a single-channel layer -- confirm that the two shapes are meant to match.
results = model.evaluate(
    x=X_test,
    y=Y_test_onehot,
    batch_size=32,
)



In [21]:
# `results` is the value returned by model.evaluate in the previous cell.
print("test loss, test accuracy : ", results)

test loss, test accuracy :  [0.1317313313484192, 0.9501531720161438]


In [None]:
# Append the trailing channel axis only when it is missing, i.e. for (samples, rows, cols)
# arrays. The unconditional version added one more axis on every execution and turned
# arrays that already had a channel axis into 5-D arrays.
if len(X_test.shape) == 3:
    X_test = X_test[..., tf.newaxis]
if len(Y_test.shape) == 3:
    Y_test = Y_test[..., tf.newaxis]

In [None]:
# Run the first 13 test grids through the model, then hand the raw outputs to
# logit_to_pred with a (-1, wspan, hspan) target shape.
sample_outputs = model(X_test[:13])
Y_pred = logit_to_pred(sample_outputs, shape=(-1, wspan, hspan))

In [None]:
### Plot results

## Generate testing data
#X_test = tf.convert_to_tensor(np.moveaxis(np.dstack([make_glider(10), make_glider(10)]), 2, 0), tf.float32)
# X_test = tf.convert_to_tensor(make_glider(10), tf.float32)[tf.newaxis, ...]
#Y_test = gol(X_test)


def _show_panel(slot, grid, caption):
    """Draw `grid` in column `slot` of the 1x3 figure, hide the axes and title it `caption`."""
    plt.subplot(1, 3, slot)
    plt.imshow(tf.squeeze(grid))
    plt.axis('off')
    plt.title(caption)


plt.figure(figsize=(12,4))
temp=1   # index of the sample shown in all three panels

_show_panel(1, X_test[temp], "Input")
print('input',X_test[temp].reshape(100,100))

_show_panel(2, Y_test[temp], "Expected Output")
print('expected',Y_test[temp].reshape(100,100))

_show_panel(3, Y_pred[temp], "Observed Output")
print('obswered',Y_pred[temp])


In [18]:
### Save and load a model
# Persist the current model to 'keras_model.h5'.
model.save(filepath='keras_model.h5')
# Reloading sketch (disabled); the custom_objects entry refers to a Wraparound2D layer:
#del model
#model = tf.keras.models.load_model('path_to_my_model.h5', custom_objects={'Wraparound2D': Wraparound2D})

# Show activation patterns of hidden layers

In [None]:
import tensorflow.keras.backend as K

inp = model.input                                           # input placeholder
outputs = [layer.output for layer in model.layers]          # all layer outputs
functor = K.function(inp, outputs)   # evaluation function

# Only sample 0 of each layer output is visualised below, so feed just that sample.
# Passing all of X_test materialised every layer's activations for the whole set.
# The extra `1.` (a legacy learning-phase flag) is dropped because `functor` was built
# with `inp` as its only input.
layer_outs = functor([X_test[:1]])


def min_max_scaler(x):
    """Rescale `x` linearly to [0, 1]; a constant array maps to zeros instead of NaN."""
    x = np.asarray(x)
    low = np.min(x)
    span = np.max(x) - low
    if span == 0:
        # A constant activation map (e.g. all zeros) would otherwise divide by zero.
        return np.zeros(x.shape)
    return (x - low) / span
# min_max_scaler = lambda x : (x - np.mean(x))


# Plot activations of different neurons in different layers
all_layer_activations = list()
for j in range(1, 5):
    # View the first sample's output as (wspan, hspan, channels); for outputs that already
    # have that shape the reshape is a no-op, so one code path serves every layer.
    pattern = np.reshape(layer_outs[j][0], (wspan, hspan, -1))
    # Tile the first ten channels side by side, each scaled independently.
    layer_im = np.hstack([min_max_scaler(pattern[..., i]) for i in range(10)])
    all_layer_activations.append(layer_im)


plt.figure()
plt.imshow(np.vstack(all_layer_activations))
# NOTE(review): the title mentions a "Glider" input, but the sample fed above is X_test[0] -- confirm.
plt.title("Activations of hidden layers given \"Glider\" input")

plt.figure()
# NOTE(review): this shows the kernel of model.layers[1]; confirm that this is the layer
# whose filters are meant to be displayed.
plt.imshow(np.squeeze(np.dstack(model.layers[1].weights[0].numpy())))
plt.title("Convolutional filters")