In [1]:
import time
import numpy as np
import pandas as pd
import tensorflow as tf
from functools import reduce
from matplotlib import pyplot as plt
from pprint import pprint
print(tf.__version__)
tf.random.set_seed(42) # The Ultimate Question of Life, the Universe, and Everything

true_weights = tf.constant(list(range(5)), dtype=tf.float32)[:, tf.newaxis]
x = tf.constant(tf.random.uniform((32, 5)), dtype=tf.float32)
y = tf.constant(x @ true_weights, dtype=tf.float32)


2.7.0


# Models

Model is a set of parameters and the computation methods using these parameters. It makes sense to create a class

In [5]:
class LinearRegression(object):
    def __init__(self, num_parameters):
        self._weights = tf.Variable(tf.random.uniform((num_parameters, 1)), dtype=tf.float32)
        
    @tf.function
    def __call__(self, x):
        return tf.linalg.matmul(x, self._weights)

    @property
    def variables(self):
        return self._weights
        

In [6]:
model = LinearRegression(5)

@tf.function
def train_step():
    with tf.GradientTape() as tape:
        y_hat = model(x)
        loss = tf.reduce_mean(tf.square(y-y_hat))
    gradients = tape.gradient(loss, model.variables)
    model.variables.assign_add(tf.constant([-0.05], dtype=tf.float32) * gradients)
    return loss

t0 = time.time()
for iteration in range(1001):
    loss = train_step()
    if not (iteration % 200):
        print('MSE at iteration {:4d} is {:5.4f}'.format(iteration, loss))
pprint(model.variables)
print(f'time took: {time.time() - t0} seconds')

MSE at iteration    0 is 15.1177
MSE at iteration  200 is 0.0278
MSE at iteration  400 is 0.0012
MSE at iteration  600 is 0.0001
MSE at iteration  800 is 0.0000
MSE at iteration 1000 is 0.0000
<tf.Variable 'Variable:0' shape=(5, 1) dtype=float32, numpy=
array([[-1.9200621e-03],
       [ 1.0043813e+00],
       [ 2.0000753e+00],
       [ 3.0021193e+00],
       [ 3.9955368e+00]], dtype=float32)>
time took: 3.477302312850952 seconds


works fine, but it can be better

In [7]:
class LinearRegression(tf.keras.Model):
    def __init__(self, num_parameters, **kwargs):
        super().__init__(**kwargs)
        self._weights = tf.Variable(tf.random.uniform((num_parameters, 1)), dtype=tf.float32)
        
    @tf.function
    def call(self, x): #keras model's __call__ is a wrapper over this call lol
        return tf.linalg.matmul(x, self._weights)
    # .variables already here, returns the collection

In [8]:
model = LinearRegression(5)

@tf.function
def train_step():
    with tf.GradientTape() as tape:
        y_hat = model(x)
        loss = tf.reduce_mean(tf.square(y - y_hat))
    gradients = tape.gradient(loss, model.variables)
    
    for g, v in zip(gradients, model.variables):
        v.assign_add(tf.constant([-0.05], dtype=tf.float32) * g)
    return loss

t0 = time.time()
for iteration in range(1001):
    loss = train_step()
    if not (iteration % 200):
        print('MSE at iteration {:4d} is {:5.4f}'.format(iteration, loss))
        
pprint(model.variables)
print(f'time took: {time.time() - t0} seconds')

MSE at iteration    0 is 18.7201
MSE at iteration  200 is 0.0325
MSE at iteration  400 is 0.0017
MSE at iteration  600 is 0.0002
MSE at iteration  800 is 0.0000
MSE at iteration 1000 is 0.0000
[<tf.Variable 'Variable:0' shape=(5, 1) dtype=float32, numpy=
array([[-3.1363715e-03],
       [ 1.0065705e+00],
       [ 2.0000944e+00],
       [ 3.0032609e+00],
       [ 3.9934683e+00]], dtype=float32)>]
time took: 0.2891373634338379 seconds


In [9]:
print(model.summary())
model.compile(loss='mse', metrics=['mae'])
print(model.evaluate(x, y, verbose=-1))

Model: "linear_regression"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
Total params: 5
Trainable params: 5
Non-trainable params: 0
_________________________________________________________________
None
[4.435382834344637e-06, 0.0016956925392150879]


In [10]:
model = LinearRegression(5)
model.compile(optimizer='SGD', loss='mse')
model.optimizer.lr.assign(.05)

t0 = time.time()
history = model.fit(x, y, epochs = 1001, verbose=0)
pprint(history.history['loss'][::200])
pprint(model.variables)
print(f'time took: {time.time() - t0} seconds')
# works longer due to convenience of .fit

[18.022109985351562,
 0.03321070224046707,
 0.0017082320991903543,
 0.000158100068802014,
 2.3927499569253996e-05,
 4.314993930165656e-06]
[<tf.Variable 'Variable:0' shape=(5, 1) dtype=float32, numpy=
array([[-3.0572712e-03],
       [ 1.0064490e+00],
       [ 2.0001047e+00],
       [ 3.0031922e+00],
       [ 3.9935679e+00]], dtype=float32)>]
time took: 1.6570169925689697 seconds


### adding useless bias to check if model handles it

In [11]:
class LinearRegressionV2(tf.keras.Model):
    def __init__(self, num_parameters, **kwargs):
        super().__init__(**kwargs)
        self._weights = tf.Variable(tf.random.uniform((num_parameters, 1)), dtype=tf.float32)
        self._bias = tf.Variable([100], dtype=tf.float32)
        
    @tf.function
    def call(self, x):
        return tf.linalg.matmul(x, self._weights) + self._bias
    
model = LinearRegressionV2(5)

t0 = time.time()
for iteration in range(1001):
    loss = train_step()
        
pprint(model.variables)
print(f'time took: {time.time() - t0} seconds')
# doesn't seem to see bias at all! tf is using same graph as before for some reason

[<tf.Variable 'Variable:0' shape=(5, 1) dtype=float32, numpy=
array([[0.803156  ],
       [0.49777734],
       [0.37054038],
       [0.9118674 ],
       [0.637642  ]], dtype=float32)>,
 <tf.Variable 'Variable:0' shape=(1,) dtype=float32, numpy=array([100.], dtype=float32)>]
time took: 0.19468259811401367 seconds


In [12]:
@tf.function
def train_step(model):
    with tf.GradientTape() as tape:
        y_hat = model(x)
        loss = tf.reduce_mean(tf.square(y-y_hat))
    gradients = tape.gradient(loss, model.variables)
        
    for g, v in zip(gradients, model.variables):
        v.assign_add(tf.constant([-0.05], dtype=tf.float32) * g)
    return loss

model = LinearRegression(5)
for iteration in range(1001):
    loss = train_step(model)
pprint(model.variables)

model = LinearRegressionV2(5)
for iteration in range(5001):
    loss = train_step(model)
pprint(model.variables)

print(train_step._get_tracing_count())


[<tf.Variable 'Variable:0' shape=(5, 1) dtype=float32, numpy=
array([[5.6935853e-04],
       [1.0001738e+00],
       [1.9999688e+00],
       [2.9999273e+00],
       [3.9994380e+00]], dtype=float32)>]
[<tf.Variable 'Variable:0' shape=(5, 1) dtype=float32, numpy=
array([[-2.6784972e-03],
       [ 9.9403036e-01],
       [ 1.9959843e+00],
       [ 2.9956350e+00],
       [ 3.9983540e+00]], dtype=float32)>,
 <tf.Variable 'Variable:0' shape=(1,) dtype=float32, numpy=array([0.00965639], dtype=float32)>]
2


## Layers

In [13]:
class Linear(tf.keras.layers.Layer):
    def __init__(self, num_inputs, num_outputs, **kwargs):
        super().__init__(**kwargs)
        self._weights = tf.Variable(tf.random.uniform((num_inputs, num_outputs)), dtype=tf.float32)
        
    @tf.function
    def call(self, x):
        return tf.linalg.matmul(x, self._weights)

class Regression(tf.keras.layers.Layer):
    def __init__(self, num_inputs_per_layer, num_outputs_per_layer, **kwargs):
        super().__init__(**kwargs)
        self._layers = [Linear(num_inputs, num_outputs)
                       for (num_inputs, num_outputs) in zip(num_inputs_per_layer, num_outputs_per_layer)]
        
    @tf.function
    def call(self, x):
        for layer in self._layers:
            x = layer(x)
        return x

In [14]:
model = Regression([5, 3], [3, 1])

for it in range(1001):
    loss = train_step(model)

print('MAE is:', tf.reduce_mean(tf.abs(y-model(x))).numpy())

MAE is: 1.3709068e-06


One problem with this linear layer is that it needs the complete sizing information and
allocates resources for all the variables upfront. Ideally, we want it to be a bit lazy, it
should calculate variable sizes and occupy resources only when needed. To archive this,
we implement the build method which will handle the variable initialization. The
build method can be explicitly called, or it will be invoked automatically the first time
there is data flow to it. With this, the constructor now only stores the hyperparameters for
the layer.


In [15]:
class Linear(tf.keras.layers.Layer):
    
    def __init__(self, units, **kwargs):
        super(Linear, self).__init__(**kwargs)
        self.units = units
        
    def build(self, input_shape):
        self._weights = self.add_weight(shape=(input_shape[-1], self.units))
        super().build(input_shape)
        
    @tf.function
    def call(self, x):
        output = tf.linalg.matmul(x, self._weights)
        return output
    
class Regression(tf.keras.Model):
    
    def __init__(self, units, **kwargs):
        super().__init__(**kwargs)
        self._layers = [Linear(unit) for unit in units]
    
    @tf.function
    def call(self, x):
        for layer in self._layers:
            x = layer(x)
        return x
    
model = Regression([3, 1])
pprint(model.variables) # should be empty

for iteration in range(1001):
    loss = train_step(model)
    
print('MAE is: ', tf.reduce_mean(tf.abs(y - model(x))).numpy())

pprint(model.variables)

[]
MAE is:  3.3527613e-07
[<tf.Variable 'linear_4/Variable:0' shape=(5, 3) dtype=float32, numpy=
array([[-0.6077952 ,  0.4016254 ,  0.05021823],
       [-0.66946924, -0.04704069,  0.51261395],
       [-0.7182107 ,  0.36316237,  0.95652574],
       [ 0.34688628, -0.59620196,  1.321127  ],
       [ 0.88979864,  0.49720666,  1.6945884 ]], dtype=float32)>,
 <tf.Variable 'linear_5/Variable:0' shape=(3, 1) dtype=float32, numpy=
array([[0.21429297],
       [0.04486551],
       [2.2347693 ]], dtype=float32)>]


honestly it's just easier to use Dense

In [16]:
class Regression(tf.keras.Model):
    def __init__(self, units, **kwargs):
        super().__init__(**kwargs)
        self._layers = [tf.keras.layers.Dense(unit, use_bias=False) for unit in units]
    
    @tf.function
    def call(self, x):
        for layer in self._layers:
            x = layer(x)
        return x
    
model = Regression([3, 1])
pprint(model.variables) # should be empty

for iteration in range(1001):
    loss = train_step(model)
    
print('MAE is: ', tf.reduce_mean(tf.abs(y - model(x))).numpy())

pprint(model.variables)

[]
MAE is:  7.376075e-07
[<tf.Variable 'dense/kernel:0' shape=(5, 3) dtype=float32, numpy=
array([[ 0.5343044 ,  0.7596104 ,  0.04952784],
       [-0.44352436, -1.0123183 , -0.7924589 ],
       [ 0.40134627, -0.81265366, -0.4920736 ],
       [ 0.83151644, -0.9049989 , -0.69753546],
       [ 1.8104649 , -0.38345483, -0.5909047 ]], dtype=float32)>,
 <tf.Variable 'dense_1/kernel:0' shape=(3, 1) dtype=float32, numpy=
array([[ 1.7215422],
       [-1.1626605],
       [-0.740184 ]], dtype=float32)>]


## Activations

our model is just a linear transformation

let's add activations as layers

In [18]:
class ReLU(tf.keras.layers.Layer):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
    
    @tf.function
    def call(self, x):
        return tf.maximum(tf.constant(0, x.dtype), x)


class NeuralNetwork(tf.keras.Model):
    def __init__(self, units, last_linear=True, **kwargs):
        super().__init__(**kwargs)
        layers = []
        n = len(units)
        for i, unit in enumerate(units):
            layers.append(Linear(unit))
            if i < n - 1 or not last_linear:
                layers.append(ReLU())
        self._layers = layers
    
    @tf.function
    def call(self, x):
        for layer in self._layers:
            x = layer(x)
        return x

model = NeuralNetwork([3, 1])

for iteration in range(1001):
    loss = train_step(model)

print('MAE is: ', tf.reduce_mean(tf.abs(y - model(x))).numpy())

pprint(model.variables)


MAE is:  1.0803342e-06
[<tf.Variable 'linear_8/Variable:0' shape=(5, 3) dtype=float32, numpy=
array([[-0.23759118, -0.15370896,  0.50241596],
       [ 0.9812125 , -0.32753882,  0.47469217],
       [ 0.67018586,  0.53977454,  0.3383523 ],
       [ 0.73897237,  1.0795476 ,  0.34267387],
       [ 0.76488936,  1.2309433 ,  1.0314057 ]], dtype=float32)>,
 <tf.Variable 'linear_9/Variable:0' shape=(3, 1) dtype=float32, numpy=
array([[1.0873303],
       [1.705775 ],
       [1.0360578]], dtype=float32)>]


non-linearity made it harder

next idea is to fuse linear and non-linear together

In [20]:
class Linear(tf.keras.layers.Layer):
    def __init__(self, units, use_bias=True, activation='linear', **kwargs):
        super(Linear, self).__init__(**kwargs)
        self.units = units
        self.use_bias = use_bias
        self.activation = activation
        
    def build(self, input_shape):
        self._weights = self.add_weight(shape=(input_shape[-1], self.units))
        if self.use_bias:
            self._bias = self.add_weight(shape=(self.units), initializer='ones')
        super().build(input_shape)
    
    @tf.function
    def call(self, x):
        output = tf.linalg.matmul(x, self._weights)
        if self.use_bias:
            output += self._bias
        if self.activation == 'relu':
            outpus = tf.maximum(tf.constant(0, x.dtype), output)
        return output


class NeuralNetwork(tf.keras.Model):
    def __init__(self, units, use_bias=True, last_linear=True, **kwargs):
        super().__init__(**kwargs)
        layers = [Linear(unit, use_bias, 'relu') for unit in units[:-1]]
        layers.append(Linear(units[-1], use_bias, 'linear' if last_linear else 'relu'))
        self._layers = layers
    
    @tf.function
    def call(self, x):
        for layer in self._layers:
            x = layer(x)
        return x

model = NeuralNetwork([3, 1])

for iteration in range(1001):
    loss = train_step(model)

print('MAE is: ', tf.reduce_mean(tf.abs(y - model(x))).numpy())

pprint(model.variables)

MAE is:  5.505979e-06
[<tf.Variable 'linear_12/Variable:0' shape=(5, 3) dtype=float32, numpy=
array([[ 0.5395865 , -0.6896926 , -0.9564117 ],
       [ 0.10679358,  0.7153631 ,  0.81789774],
       [ 0.02986215, -0.46865082,  1.3411657 ],
       [ 1.2822582 , -0.5852419 ,  0.31231666],
       [ 1.8721235 ,  0.6778295 ,  0.6126671 ]], dtype=float32)>,
 <tf.Variable 'linear_12/Variable:0' shape=(3,) dtype=float32, numpy=array([-0.14929764,  1.107722  ,  0.25128555], dtype=float32)>,
 <tf.Variable 'linear_13/Variable:0' shape=(3, 1) dtype=float32, numpy=
array([[ 1.8443246 ],
       [-0.38262856],
       [ 1.3164601 ]], dtype=float32)>,
 <tf.Variable 'linear_13/Variable:0' shape=(1,) dtype=float32, numpy=array([0.36842757], dtype=float32)>]


### FC Networks

In [21]:
class Sequential(tf.keras.Model):
    def __init__(self, layers, **kwargs):
        super().__init__(**kwargs)
        self._layers = layers
        
    @tf.function
    def call(self, x):
        for layer in self._layers:
            x = layer(x)
        return x

class MLP(tf.keras.Model): # multi-layer perceptron
    def __init__(self, num_hidden_units, num_targets, hidden_activation='relu', **kwargs):
        super().__init__(**kwargs)
        if type(num_hidden_units) is int: num_hidden_units = [num_hidden_units]
        self.feature_extractor = Sequential([tf.keras.layers.Dense(unit, activation=hidden_activation) for unit in num_hidden_units])
        self.last_linear = tf.keras.layers.Dense(num_targets, activation='linear')
        
    @tf.function
    def call(self, x):
        features = self.feature_extractor(x)
        outputs = self.last_linear(features)
        return outputs

I'll test in on boston housing

In [None]:
(X_train, y_train), (X_test, y_test) = tf.keras.datasets.boston_housing.load_data()
y_train, y_test = map(lambda x: np.expand_dims(x, -1), (y_train, y_test))
X_train, y_train, X_test, y_test = map(lambda x: tf.cast(x, tf.float32), (X_train, y_train, X_test, y_test))

@tf.function
def train_step(model, x, y):
    with tf.GradientTape() as tape:
        loss = tf.reduce_mean(tf.square(y - model(x)))
        
        gradients = tape.gradient(loss, model.variables)
        for g, v in zip(gradients, model.variables):
            v.assign_add(tf.constant([-0.01], dtype=tf.float32) * g)
        return loss

@tf.function
def test_step(model, x, y):
    return tf.reduce_mean(tf.square(y - model(x)))

def train(model, n_epochs=1000, his_freq=10):
    history = []
    for iteration in range(1, n_epochs + 1):
        tr_loss = train_step(model, X_train, y_train)
        te_loss = test_step(model, X_test, y_test)
        if not ineration % his_freq:
            history.append({
                'iteration': iteration,
                'training_loss': tr_loss.numpy(),
                'testing_loss': te_loss.numpy()
            })
    return model, pd.DataFrame(history)

mlp, mlp_history = train(MLP(4, 1))
pprint(mlp_history.tail())
ax = mlp_history.plot(x='iteration', kind='line', logy=True)
fig = ax.get_figure()
