In [None]:
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior() 
import numpy as np
import matplotlib.pyplot as plt

# Define a Neural Arithmetic Logic Unit (NALU)
Useful to learn the **multiplication/division** operation.

In [None]:
# The Neural Arithmetic Logic Unit
def NALU(in_dim, out_dim):

    shape = (int(in_dim.shape[-1]), out_dim)
    epsilon = 1e-7 
    
    # NAC
    W_hat = tf.Variable(tf.truncated_normal(shape, stddev=0.02))
    M_hat = tf.Variable(tf.truncated_normal(shape, stddev=0.02))
    G = tf.Variable(tf.truncated_normal(shape, stddev=0.02))
        
    W = tf.tanh(W_hat) * tf.sigmoid(M_hat)
    # Forward propogation
    a = tf.matmul(in_dim, W)
    
    # NALU  
    m = tf.exp(tf.matmul(tf.log(tf.abs(in_dim) + epsilon), W))
    g = tf.sigmoid(tf.matmul(in_dim, G))
    y = g * a + (1 - g) * m
    
    return y


### Create some toy data

In [None]:
# Test the Network by learning the addition

# Generate a series of input number X1 and X2 for training
x1 = np.arange(0,10000,5, dtype=np.float32)
x2 = np.arange(5,10005,5, dtype=np.float32)


y_train = x1 * x2

x_train = np.column_stack((x1,x2))

print(x_train.shape)
print(y_train.shape)

# Generate a series of input number X1 and X2 for testing
x1 = np.arange(1000,2000,8, dtype=np.float32)
x2 = np.arange(1000,1500,4, dtype= np.float32)

x_test = np.column_stack((x1,x2))
y_test = x1 * x2

print()
print(x_test.shape)
print(y_test.shape)


## Build, train, and test NALU network

In [None]:
# Define the placeholder to feed the value at run time
X = tf.placeholder(dtype=tf.float32, shape =[None , 2])    # Number of samples x Number of features (number of inputs to be added)
Y = tf.placeholder(dtype=tf.float32, shape=[None,])

# define the network
# Here the network contains only one NAC cell (for testing)
y_pred = NALU(in_dim=X, out_dim=1)
y_pred = tf.squeeze(y_pred)             # Remove extra dimensions if any

# Mean Square Error (MSE)
loss = tf.reduce_mean( (y_pred - Y) **2)
#loss= tf.losses.mean_squared_error(labels=y_train, predictions=y_pred)



# Hyperparameters
alpha = 0.05    # learning rate
epochs = 2000

optimize = tf.train.AdamOptimizer(learning_rate=alpha).minimize(loss)

with tf.Session() as sess:

    #init = tf.global_variables_initializer()
    cost_history = []

    sess.run(tf.global_variables_initializer())

    # pre training evaluate
    print("Pre training MSE: ", sess.run (loss, feed_dict={X: x_test, Y: y_test}))
    print()
    for i in range(epochs):
        _, cost = sess.run([optimize, loss ], feed_dict={X: x_train, Y: y_train})
        print("epoch: {}, MSE: {}".format( i,cost) )
        cost_history.append(cost)

    # plot the MSE over each iteration
    plt.plot(np.arange(epochs),np.log(cost_history))  # Plot MSE on log scale
    plt.xlabel("Epoch")
    plt.ylabel("MSE")
    # plt.savefig('nalu_DOT.png')
    plt.show()

  
    # post training loss
    print("Post training MSE: ", sess.run(loss, feed_dict={X: x_test, Y: y_test}))

    print("Actual product: ", y_test[0:10])
    print()
    print("Predicted product: ", sess.run(y_pred[0:10], feed_dict={X: x_test, Y: y_test}))
