# iNALU

In [None]:
import numpy as np
import tensorflow as tf

## Layers and Models

In [None]:
# original NALU with matrix gates
class NALU_Matrix(tf.keras.layers.Layer):
  """NALU layer whose gate is produced by a full weight matrix.

  The layer blends an additive path (plain matmul) with a multiplicative
  path (matmul in log space followed by ``exp``). The blend factor is
  ``sigmoid(inputs @ G)`` with ``G`` of shape ``[in_features, out_units]``.

  Args:
    in_features: width used for the first axis of every weight.
    out_units: width used for the second axis of every weight.
    epsilon: constant added to ``|inputs|`` before taking the logarithm.
  """

  def __init__(self, in_features=2, out_units=1, epsilon=0.000001):
    super().__init__()
    self.in_features = in_features
    self.out_units = out_units
    self.epsilon = epsilon

  def build(self, input_shape):
    """Create the gate matrix and the four ``*_hat`` matrices.

    ``input_shape`` is not consulted; all shapes come from the constructor.
    """
    self.G = self.add_weight(
        name="Gate_weights",
        shape=[self.in_features, self.out_units],
        initializer=tf.random_normal_initializer(stddev=1.0),
        trainable=True,
    )
    # Creation order matches the tuple below. Every weight receives its own
    # initializer instance.
    for hat_name in ("AW_hat", "AM_hat", "MW_hat", "MM_hat"):
      hat = self.add_weight(
          name=hat_name,
          shape=[self.in_features, self.out_units],
          initializer=tf.initializers.random_uniform(minval=-2, maxval=2),
          trainable=True,
      )
      setattr(self, hat_name, hat)

  def call(self, inputs):
    """Return ``gate * additive + (1 - gate) * multiplicative``."""
    add_kernel = tf.nn.tanh(self.AW_hat) * tf.nn.sigmoid(self.AM_hat)
    mul_kernel = tf.nn.tanh(self.MW_hat) * tf.nn.sigmoid(self.MM_hat)

    additive = tf.matmul(inputs, add_kernel)
    # Multiplication is carried out as an addition of logarithms.
    log_magnitude = tf.math.log(tf.abs(inputs) + self.epsilon)
    multiplicative = tf.exp(tf.matmul(log_magnitude, mul_kernel))

    gate = tf.nn.sigmoid(tf.matmul(inputs, self.G))
    return gate * additive + (1 - gate) * multiplicative

In [None]:
# original NALU with vector gates
class NALU_Vector(tf.keras.layers.Layer):
  """NALU layer whose gate is produced by a single weight vector.

  The ``*_hat`` matrices are stored as ``[out_units, in_features]`` and are
  applied transposed. The gate vector ``G`` has ``in_features`` entries and
  is reshaped to a column, so one gate value per sample is shared by all
  output units through broadcasting.

  Args:
    in_features: length of ``G`` and second axis of the ``*_hat`` matrices.
    out_units: first axis of the ``*_hat`` matrices.
    epsilon: constant added to ``|inputs|`` before taking the logarithm.
  """

  def __init__(self, in_features=2, out_units=1, epsilon=0.000001):
    super().__init__()
    self.in_features = in_features
    self.out_units = out_units
    self.epsilon = epsilon

  def build(self, input_shape):
    """Create the gate vector and the four ``*_hat`` matrices.

    ``input_shape`` is not consulted; all shapes come from the constructor.
    """
    self.G = self.add_weight(
        name="Gate_weights",
        shape=[self.in_features],
        initializer=tf.random_normal_initializer(stddev=1.0),
        trainable=True,
    )
    # Creation order matches the tuple below. Every weight receives its own
    # initializer instance.
    for hat_name in ("AW_hat", "AM_hat", "MW_hat", "MM_hat"):
      hat = self.add_weight(
          name=hat_name,
          shape=[self.out_units, self.in_features],
          initializer=tf.initializers.random_uniform(minval=-2, maxval=2),
          trainable=True,
      )
      setattr(self, hat_name, hat)

  def call(self, inputs):
    """Return ``gate * additive + (1 - gate) * multiplicative``."""
    add_kernel = tf.nn.tanh(self.AW_hat) * tf.nn.sigmoid(self.AM_hat)
    mul_kernel = tf.nn.tanh(self.MW_hat) * tf.nn.sigmoid(self.MM_hat)

    additive = tf.matmul(inputs, add_kernel, transpose_b=True)
    # Multiplication is carried out as an addition of logarithms.
    log_magnitude = tf.math.log(tf.abs(inputs) + self.epsilon)
    multiplicative = tf.exp(
        tf.matmul(log_magnitude, mul_kernel, transpose_b=True))

    gate_column = tf.reshape(self.G, [self.in_features, -1])
    gate = tf.sigmoid(tf.matmul(inputs, gate_column))
    return gate * additive + (1 - gate) * multiplicative

In [None]:
# improved NALU with matrix gates
class iNALU_Matrix(tf.keras.layers.Layer):
  """Improved NALU layer with an input-dependent matrix gate.

  Compared with the plain NALU layer, ``call`` floors ``|inputs|`` at
  ``epsilon`` before the logarithm, caps the exponent at ``clipping``,
  computes the gate from ``|inputs|``, and multiplies the multiplicative
  path by a sign-correction term so that negative inputs can yield negative
  products.

  Args:
    in_features: first axis of every weight matrix.
    out_units: second axis of every weight matrix.
    epsilon: lower bound applied to ``|inputs|`` before the logarithm.
    clipping: upper bound applied to the exponent of the multiplicative path.
  """

  def __init__(self, in_features=2, out_units=1, epsilon=0.000001, clipping=20):
    super().__init__()
    self.in_features = in_features
    self.out_units = out_units
    self.epsilon = epsilon
    self.clipping = clipping

  def build(self, input_shape):
    """Create the gate matrix and the four ``*_hat`` matrices.

    ``input_shape`` is not consulted; all shapes come from the constructor.
    """
    self.G = self.add_weight(name="Gate_weights",
                             shape=[self.in_features, self.out_units],
                             initializer=tf.random_normal_initializer(stddev=1.0),
                             trainable=True)
    self.AW_hat = self.add_weight(name="AW_hat",
                                  shape=[self.in_features, self.out_units],
                                  initializer=tf.initializers.random_uniform(minval=-2, maxval=2),
                                  trainable=True)
    self.AM_hat = self.add_weight(name="AM_hat",
                                  shape=[self.in_features, self.out_units],
                                  initializer=tf.initializers.random_uniform(minval=-2, maxval=2),
                                  trainable=True)
    self.MW_hat = self.add_weight(name="MW_hat",
                                  shape=[self.in_features, self.out_units],
                                  initializer=tf.initializers.random_uniform(minval=-2, maxval=2),
                                  trainable=True)
    self.MM_hat = self.add_weight(name="MM_hat",
                                  shape=[self.in_features, self.out_units],
                                  initializer=tf.initializers.random_uniform(minval=-2, maxval=2),
                                  trainable=True)

  def call(self, inputs):
    """Return ``g * a + (1 - g) * m * msign`` for the given inputs."""
    # Additive path.
    a = tf.matmul(inputs, tf.nn.tanh(self.AW_hat) * tf.nn.sigmoid(self.AM_hat))

    # Multiplicative path in log space; w has shape [in_features, out_units].
    w = tf.nn.tanh(self.MW_hat) * tf.nn.sigmoid(self.MM_hat)
    log_x = tf.math.log(tf.maximum(tf.abs(inputs), self.epsilon))
    m = tf.exp(tf.minimum(tf.matmul(log_x, w), self.clipping))

    g = tf.sigmoid(tf.matmul(tf.abs(inputs), self.G))

    # Sign correction: msign[b, o] = prod_i(sign(x[b, i]) * |w[i, o]| + 1 - |w[i, o]|).
    # Broadcasting [batch, in, 1] against [in, out] pairs input i with its own
    # weight |w[i, o]|. The earlier tile/flatten/reshape formulation read |w|
    # in [in, out] memory order while the tiled inputs were laid out in
    # [out, in] order, which mismatched them unless one of the sizes was 1.
    abs_w = tf.abs(w)
    sign_x = tf.expand_dims(tf.sign(inputs), axis=-1)
    sign_factors = sign_x * abs_w + (1 - abs_w)
    msign = tf.clip_by_value(tf.reduce_prod(sign_factors, axis=-2), -1, 1)

    return g * a + (1 - g) * m * msign


In [None]:
# improved NALU with vector gates
class iNALU_Vector(tf.keras.layers.Layer):
  """Improved NALU layer with an input-dependent vector gate.

  The ``*_hat`` matrices are stored as ``[out_units, in_features]``. The gate
  vector ``G`` has ``in_features`` entries and is reshaped to a column, so a
  single gate value per sample is shared by all output units. ``call`` caps
  the exponent of the multiplicative path at ``clipping``, computes the gate
  from ``|inputs|``, and applies a sign-correction term to the multiplicative
  path.

  Args:
    in_features: length of ``G`` and second axis of the ``*_hat`` matrices.
    out_units: first axis of the ``*_hat`` matrices.
    epsilon: constant added to ``|inputs|`` before the logarithm.
    clipping: upper bound applied to the exponent of the multiplicative path.
  """

  def __init__(self, in_features=2, out_units=1, epsilon=0.000001, clipping=20):
    super().__init__()
    self.in_features = in_features
    self.out_units = out_units
    self.epsilon = epsilon
    self.clipping = clipping

  def build(self, input_shape):
    """Create the gate vector and the four ``*_hat`` matrices.

    ``input_shape`` is not consulted; all shapes come from the constructor.
    """
    self.G = self.add_weight(name="Gate_weights",
                             shape=[self.in_features],
                             initializer=tf.random_normal_initializer(stddev=1.0),
                             trainable=True)
    self.AW_hat = self.add_weight(name="AW_hat",
                                  shape=[self.out_units, self.in_features],
                                  initializer=tf.initializers.random_uniform(minval=-2, maxval=2),
                                  trainable=True)
    self.AM_hat = self.add_weight(name="AM_hat",
                                  shape=[self.out_units, self.in_features],
                                  initializer=tf.initializers.random_uniform(minval=-2, maxval=2),
                                  trainable=True)
    self.MW_hat = self.add_weight(name="MW_hat",
                                  shape=[self.out_units, self.in_features],
                                  initializer=tf.initializers.random_uniform(minval=-2, maxval=2),
                                  trainable=True)
    self.MM_hat = self.add_weight(name="MM_hat",
                                  shape=[self.out_units, self.in_features],
                                  initializer=tf.initializers.random_uniform(minval=-2, maxval=2),
                                  trainable=True)

  def call(self, inputs):
    """Return ``g * a + (1 - g) * m * msign`` for the given inputs."""
    # Additive path.
    a = tf.matmul(inputs, tf.nn.tanh(self.AW_hat) * tf.nn.sigmoid(self.AM_hat), transpose_b=True)

    # Multiplicative path in log space; after the transpose w has shape
    # [in_features, out_units].
    w = tf.transpose(tf.nn.tanh(self.MW_hat) * tf.nn.sigmoid(self.MM_hat))
    # NOTE(review): the sibling iNALU layers floor |inputs| with tf.maximum
    # instead of adding epsilon; confirm whether the difference is intended.
    log_x = tf.math.log(tf.abs(inputs) + self.epsilon)
    m = tf.exp(tf.minimum(tf.matmul(log_x, w), self.clipping))

    g = tf.sigmoid(tf.matmul(tf.abs(inputs), tf.reshape(self.G, [self.in_features, -1])))

    # Sign correction: msign[b, o] = prod_i(sign(x[b, i]) * |w[i, o]| + 1 - |w[i, o]|).
    # Broadcasting [batch, in, 1] against [in, out] pairs input i with its own
    # weight |w[i, o]|. The earlier tile/flatten/reshape formulation read |w|
    # in [in, out] memory order while the tiled inputs were laid out in
    # [out, in] order, which mismatched them unless one of the sizes was 1.
    abs_w = tf.abs(w)
    sign_x = tf.expand_dims(tf.sign(inputs), axis=-1)
    sign_factors = sign_x * abs_w + (1 - abs_w)
    msign = tf.clip_by_value(tf.reduce_prod(sign_factors, axis=-2), -1, 1)

    return g * a + (1 - g) * m * msign


In [None]:
# improved NALU with input-independent gates
class iNALU_Independent(tf.keras.layers.Layer):
  """Improved NALU layer whose gate does not depend on the inputs.

  The gate is ``sigmoid(G)`` with one learnable entry per output unit, so the
  mix between the additive and multiplicative paths is the same for every
  sample. ``call`` floors ``|inputs|`` at ``epsilon`` before the logarithm,
  caps the exponent at ``clipping``, and applies a sign-correction term to
  the multiplicative path.

  Args:
    in_features: first axis of the ``*_hat`` matrices.
    out_units: second axis of the ``*_hat`` matrices and length of ``G``.
    epsilon: lower bound applied to ``|inputs|`` before the logarithm.
    clipping: upper bound applied to the exponent of the multiplicative path.
  """

  def __init__(self, in_features=2, out_units=1, epsilon=0.000001, clipping=20):
    super().__init__()
    self.in_features = in_features
    self.out_units = out_units
    self.epsilon = epsilon
    self.clipping = clipping

  def build(self, input_shape):
    """Create the gate vector and the four ``*_hat`` matrices.

    ``input_shape`` is not consulted; all shapes come from the constructor.
    """
    self.G = self.add_weight(name="Gate_weights",
                             shape=[self.out_units],
                             initializer=tf.random_normal_initializer(stddev=1.0),
                             trainable=True)
    self.AW_hat = self.add_weight(name="AW_hat",
                                  shape=[self.in_features, self.out_units],
                                  initializer=tf.initializers.random_uniform(minval=-2, maxval=2),
                                  trainable=True)
    self.AM_hat = self.add_weight(name="AM_hat",
                                  shape=[self.in_features, self.out_units],
                                  initializer=tf.initializers.random_uniform(minval=-2, maxval=2),
                                  trainable=True)
    self.MW_hat = self.add_weight(name="MW_hat",
                                  shape=[self.in_features, self.out_units],
                                  initializer=tf.initializers.random_uniform(minval=-2, maxval=2),
                                  trainable=True)
    self.MM_hat = self.add_weight(name="MM_hat",
                                  shape=[self.in_features, self.out_units],
                                  initializer=tf.initializers.random_uniform(minval=-2, maxval=2),
                                  trainable=True)

  def call(self, inputs):
    """Return ``g * a + (1 - g) * m * msign`` for the given inputs."""
    # Additive path.
    a = tf.matmul(inputs, tf.nn.tanh(self.AW_hat) * tf.nn.sigmoid(self.AM_hat))

    # Multiplicative path in log space; w has shape [in_features, out_units].
    w = tf.nn.tanh(self.MW_hat) * tf.nn.sigmoid(self.MM_hat)
    log_x = tf.math.log(tf.maximum(tf.abs(inputs), self.epsilon))
    m = tf.exp(tf.minimum(tf.matmul(log_x, w), self.clipping))

    # One gate value per output unit, broadcast over the batch.
    g = tf.sigmoid(self.G)

    # Sign correction: msign[b, o] = prod_i(sign(x[b, i]) * |w[i, o]| + 1 - |w[i, o]|).
    # Broadcasting [batch, in, 1] against [in, out] pairs input i with its own
    # weight |w[i, o]|. The earlier tile/flatten/reshape formulation read |w|
    # in [in, out] memory order while the tiled inputs were laid out in
    # [out, in] order, which mismatched them unless one of the sizes was 1.
    abs_w = tf.abs(w)
    sign_x = tf.expand_dims(tf.sign(inputs), axis=-1)
    sign_factors = sign_x * abs_w + (1 - abs_w)
    msign = tf.clip_by_value(tf.reduce_prod(sign_factors, axis=-2), -1, 1)

    return g * a + (1 - g) * m * msign


In [None]:
# create a sequential NALU model of one of the NALU architectures
# model depth = len(dims) - 1
# dims[0] = input shape
# dims[-1] = output shape
def Sequential_NALU(arch, dims):
  """Stack layers of a single NALU architecture into a sequential model.

  Args:
    arch: layer class (or factory) invoked as ``arch(in_features, out_units)``.
    dims: integer feature widths. ``dims[0]`` is the input width, ``dims[-1]``
      the output width, and each consecutive pair defines one layer, so the
      model depth is ``len(dims) - 1``.

  Returns:
    A ``tf.keras.Sequential`` consisting of an input spec and the layers.
  """
  layers = [arch(n_in, n_out) for n_in, n_out in zip(dims[:-1], dims[1:])]
  # Keras documents ``shape`` as a tuple, so wrap the width explicitly instead
  # of relying on a bare integer being accepted.
  return tf.keras.Sequential([tf.keras.Input(shape=(dims[0],)), *layers])

## Data

In [None]:
def my_function(inputs):
  """Target formula ``x1 * x2 + x3``, applied to a 3-item iterable."""
  def formula(x1, x2, x3):
    return x1 * x2 + x3

  # Unpack the three operands (scalars or equally shaped arrays).
  return formula(*inputs)

In [None]:
def train_gen(formula, starts: list, steps: list, num_values: int, num_samples: int):
  """Generate evenly spaced training inputs and their targets.

  Args:
    formula: callable that receives an array of shape
      ``(num_values, num_samples)`` and returns the targets.
    starts: first value of each input variable.
    steps: increment between consecutive samples of each input variable.
    num_values: number of input variables; the first ``num_values`` entries
      of ``starts`` and ``steps`` are used.
    num_samples: number of samples generated per variable.

  Returns:
    ``(x, y)`` where ``x`` is a float32 array of shape
    ``(num_samples, num_values)`` and ``y`` is whatever ``formula`` returns.
  """
  # Build every series from one index ramp. This always yields exactly
  # num_samples values, whereas np.arange(start, stop, step) can produce one
  # extra element when the step is not an integer.
  offsets = np.arange(num_samples, dtype=np.float32)
  x_t = np.array(
      [starts[i] + steps[i] * offsets for i in range(num_values)],
      dtype=np.float32,
  )
  y_t = formula(x_t)

  return x_t.T, y_t

# Training set: 1000 regularly spaced samples of the three formula inputs.
x_train, y_train = train_gen(
  formula=my_function,
  starts=[0, 5, 1000],
  steps=[1, 1, 1024],
  num_values=3,
  num_samples=1000,
)

# Show the shapes of inputs and targets, one per line.
print(x_train.shape, y_train.shape, sep="\n")

In [None]:
def test_gen(formula, min: int, max: int, num_values: int, num_samples: int, seed=None):
  """Generate random integer-valued test inputs and their targets.

  Args:
    formula: callable that receives an array of shape
      ``(num_values, num_samples)`` and returns the targets.
    min: smallest value that can be drawn (inclusive).
    max: upper bound of the draw (exclusive).
    num_values: number of input variables per sample.
    num_samples: number of samples to draw.
    seed: optional seed for a private random generator, which makes the
      returned set reproducible. When ``None`` (default) the global NumPy
      random state is used, as before.

  Returns:
    ``(x, y)`` where ``x`` is a float32 array of shape
    ``(num_samples, num_values)`` and ``y`` is whatever ``formula`` returns.
  """
  # ``min``/``max`` shadow the builtins inside this function; the names are
  # kept because they are part of the call signature.
  size = (num_samples, num_values)
  if seed is None:
    draws = np.random.randint(min, max, size=size)
  else:
    draws = np.random.default_rng(seed).integers(min, max, size=size)
  x_t = draws.astype(np.float32)
  y_t = formula(x_t.T)

  return x_t, y_t


# Despite its name this is a data helper, not a test case: keep test
# collectors such as pytest from picking it up.
test_gen.__test__ = False

# Main test set: 200 random samples with every input drawn from [0, 10000).
x_test, y_test = test_gen(
  formula=my_function,
  min=0,
  max=10000,
  num_values=3,
  num_samples=200,
)

# Show the shapes of inputs and targets, one per line.
print(x_test.shape, y_test.shape, sep="\n")

(200, 3)
(200,)


## Training

In [None]:
# inputs = tf.keras.Input(shape=(x_train.shape[1],))
# outputs = NALU_Matrix(x_train.shape[1], x_train.shape[1])(inputs)
# model = tf.keras.Model(inputs=inputs, outputs=outputs,)
# Two stacked iNALU_Independent layers: the hidden width equals the number of
# input features and the model ends in a single output unit.
model = Sequential_NALU(
    arch=iNALU_Independent,
    dims=[x_train.shape[1]] * 2 + [1],
)

In [None]:
# Training objective.
loss_fn = tf.keras.losses.MeanSquaredError()

# Run hyperparameters.
starter_learning_rate = 0.01
end_learning_rate = 0.0001
epochs = 50000
epsilon = 1e-06

# Learning rate decays from the starter value to the end value along a
# square-root (power=0.5) polynomial.
# NOTE(review): the schedule counts optimizer steps, not epochs. If an epoch
# contains more than one batch, the decay finishes well before 90% of
# training - confirm this is intended.
decay_steps = 0.9 * epochs
alpha = tf.keras.optimizers.schedules.PolynomialDecay(
    initial_learning_rate=starter_learning_rate,
    decay_steps=decay_steps,
    end_learning_rate=end_learning_rate,
    power=0.5,
)

optimizer = tf.keras.optimizers.experimental.RMSprop(learning_rate=alpha)

In [None]:
# Stop once the training 'mse' metric has not improved for 5000 epochs.
earlystopping = tf.keras.callbacks.EarlyStopping(
    monitor='mse',
    patience=5000,
)  # min_delta=epsilon is left disabled

# Save only the weights of the best model so far, judged by the 'mse' metric.
checkpoint_filepath = '/tmp/checkpoint'
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,
    monitor='mse',
    save_best_only=True,
    save_weights_only=True,
)

In [None]:
# Attach optimizer, loss and the metrics reported during fit/evaluate.
# The metric names are the keys the callbacks monitor.
model.compile(
    optimizer=optimizer,
    loss=loss_fn,
    metrics=[
        tf.keras.metrics.MeanAbsoluteError(name='mae'),
        tf.keras.metrics.MeanSquaredError(name='mse'),
        tf.keras.metrics.MeanAbsolutePercentageError(name='mape'),
        tf.keras.metrics.MeanSquaredLogarithmicError(name='msle'),
    ],
)

In [None]:
# Print the layer-by-layer overview of the compiled model.
model.summary()

In [None]:
# Train on the generated set. Only the best-weights checkpoint callback is
# active; early stopping ([earlystopping]) is left disabled.
history = model.fit(
    x=x_train,
    y=y_train,
    epochs=epochs,
    callbacks=[model_checkpoint_callback],
    verbose=1,
)

## Evaluation and Testing

In [None]:
# Automated evaluation on main test set
# Loss and compiled metrics on the random test set.
model.evaluate(x=x_test, y=y_test, verbose=2)

In [None]:
# Evaluate the model on a test set and pretty print
def pretty_test(model, x_test, y_test, template, final, epsilon=1e-9):
  """Evaluate ``model`` sample by sample and return printable report lines.

  Args:
    model: object exposing ``predict(x, verbose=0)``. It must yield exactly
      one scalar prediction per input row.
    x_test: sequence or array of input rows.
    y_test: sequence or array of scalar targets, same length as ``x_test``.
    template: format string for one sample. It receives, in order, the
      values of the input row, the target, the prediction, the accuracy and
      the absolute error.
    final: format string for the summary line. It receives the mean accuracy
      and the root-mean-square error.
    epsilon: when both ``|prediction|`` and ``|target|`` are at most this
      value, the pair counts as an exact match (accuracy 1) instead of
      evaluating 0/0.

  Returns:
    List of strings: one line per sample followed by the summary line.

  Raises:
    ValueError: if ``x_test`` and ``y_test`` differ in length.
  """
  total = len(y_test)
  if len(x_test) != total:
    raise ValueError(
        f"x_test and y_test differ in length: {len(x_test)} != {total}")
  if total == 0:
    # Nothing to average over; report an undefined summary.
    return [final.format(float("nan"), float("nan"))]

  # A single batched prediction replaces one predict() call per sample.
  preds = np.asarray(model.predict(np.asarray(x_test), verbose=0)).reshape(total)

  lines = []
  accuracies = []
  errors = []
  for x, y, pred in zip(x_test, y_test, preds):
    # Accuracy is the ratio of the smaller magnitude to the larger one.
    smaller, larger = sorted((np.abs(pred), np.abs(y)))
    acc = 1.0 if larger <= epsilon else smaller / larger
    err = np.abs(pred - y)
    accuracies.append(acc)
    errors.append(err)
    lines.append(template.format(*x, y, pred, acc, err))

  mean_acc = sum(accuracies) / total
  rms = np.sqrt(np.mean(np.array(errors)**2))
  lines.append(final.format(mean_acc, rms))
  return lines

In [None]:
# Manual human readable testing
# Small hand-checkable set: 10 samples with inputs drawn from [50, 100).
x_test_2, y_test_2 = test_gen(
  formula=my_function,
  min=50,
  max=100,
  num_values=3,
  num_samples=10,
)

# One report line per sample, then the summary line.
print(*pretty_test(
  model=model,
  x_test=x_test_2,
  y_test=y_test_2,
  template="({} * {}) + {} = {}, prediction: {}, accuracy: {}, error: {}",
  final="accuracy: {}, rms: {}",
  epsilon=epsilon,
), sep="\n")