In [1]:
import tensorflow as tf

In [2]:
# Generating data of 3 dim input and 2 dim output
n, d = 500, 3
x = tf.random.uniform(minval=-1, maxval=1, shape = (n, d))
x2 = tf.cast(x**2, tf.float32)
weights_true = tf.cast(tf.constant([[5,1,5],[1,2,1]]), tf.float32)
bias_true = tf.cast(tf.constant([1,2]), tf.float32)
y_true = tf.matmul(x2, weights_true, transpose_b=True) + tf.matmul(x,weights_true,transpose_b=True) + bias_true
print(f'x: {x.shape}, weights: {weights_true.shape}, bias: {bias_true.shape}, y: {y_true.shape}')

x: (500, 3), weights: (2, 3), bias: (2,), y: (500, 2)


In [3]:
class MSE:
    def __call__(self, y_pred, y_true):
        self.y_pred = y_pred
        self.y_true = y_true
        return tf.reduce_mean((y_pred - y_true) ** 2)

    def backward(self):
        n = self.y_true.shape[0]
        self.gradient = 2. * (self.y_pred - self.y_true) / n
        return self.gradient

# From Scratch

In [4]:
class Linear:
    def __init__(self, input_dim: int, num_hidden: int = 1):
        self.weights = tf.Variable(tf.random.uniform(shape=(input_dim, num_hidden)))
        self.bias = tf.Variable(tf.zeros(shape=(num_hidden,)))
    
    def __call__(self, x):
        self.x = x
        return tf.matmul(x, self.weights) + self.bias

    def backward(self, gradient):
        self.weights_gradient = tf.matmul(self.x,gradient,transpose_a=True)
        self.bias_gradient = tf.reduce_sum(gradient, 0)
        self.x_gradient = tf.matmul(gradient, self.weights, transpose_b=True)
        return self.x_gradient

    def update(self, lr):
        self.weights = self.weights - lr * self.weights_gradient
        self.bias = self.bias - lr * self.bias_gradient

In [5]:
import math
class Relu:
    def __call__(self, input_):
        self.input_ = input_
        self.output = tf.clip_by_value(self.input_, 0, math.inf)
        return self.output
    
    def backward(self, output_gradient):
        self.input_gradient = tf.cast((self.input_ > 0), tf.float32) * output_gradient
        return self.input_gradient

In [6]:
from typing import Callable
class Model:
    def __init__(self, input_dim, num_hidden):
        self.linear1 = Linear(input_dim, num_hidden)
        self.relu1 = Relu()
        self.linear2 = Linear(num_hidden,12) # Pick 12 as number of neurons in hidden layer
        self.relu2 = Relu()
        self.linear3 = Linear(12,2)
    
    # Forward pass
    def __call__(self, x):
        l1 = self.linear1(x)
        r1 = self.relu1(l1)
        l2 = self.linear2(r1)
        r2 = self.relu2(l2)
        l3 = self.linear3(r2)
        return l3
    
    def backward(self, output_gradient):
        linear3_gradient = self.linear3.backward(output_gradient)
        relu2_gradient = self.relu2.backward(linear3_gradient)
        linear2_gradient = self.linear2.backward(relu2_gradient)
        relu1_gradient = self.relu1.backward(linear2_gradient)
        linear1_gradient = self.linear1.backward(relu1_gradient)
        return linear1_gradient

    def update(self, lr):
        self.linear3.update(lr)
        self.linear2.update(lr)
        self.linear1.update(lr)

In [7]:
# Training
def fit(x, y, model: Callable, loss: Callable, lr: float, num_epochs: int):
    for epoch in range(num_epochs):
        y_pred = model(x)
        loss_value = loss(y_pred, y)
        if epoch % 50 == 0:
            print(f'Epoch {epoch}, loss {loss_value}')
        gradient_from_loss = loss.backward()
        model.backward(gradient_from_loss)
        model.update(lr)

loss = MSE()
model = Model(d, 20)
fit(x, y_true, model=model, loss=loss, lr=0.0025, num_epochs=1000)

Epoch 0, loss 390.6031188964844
Epoch 50, loss 2.557358503341675
Epoch 100, loss 1.5464422702789307
Epoch 150, loss 1.208223581314087
Epoch 200, loss 0.9708928465843201
Epoch 250, loss 0.7791450619697571
Epoch 300, loss 0.6188719272613525
Epoch 350, loss 0.4915865361690521
Epoch 400, loss 0.39818868041038513
Epoch 450, loss 0.33309367299079895
Epoch 500, loss 0.28764432668685913
Epoch 550, loss 0.25477850437164307
Epoch 600, loss 0.2293122559785843
Epoch 650, loss 0.2089524120092392
Epoch 700, loss 0.19240371882915497
Epoch 750, loss 0.17844606935977936
Epoch 800, loss 0.16652798652648926
Epoch 850, loss 0.1562509834766388
Epoch 900, loss 0.14738903939723969
Epoch 950, loss 0.13960964977741241


In [8]:
import plotly.graph_objects as go
def plot_intereactive_3d(x, y, y_pred=None):

    fig = go.Figure()
    fig.add_trace(go.Scatter3d(x = x[:,0],
                        y = x[:,1],
                        z = y.reshape([-1]),
                        opacity=0.5, mode='markers', name='Underlying Function'
                        ))
    
    if y_pred is not None:
        fig.add_trace(go.Scatter3d(x = x[:,0],
                    y = x[:,1],
                    z = y_pred.reshape([-1]),
                    opacity=0.5, mode='markers', name='Predicted Function'
                    ))
        
    fig.update_layout(scene = dict(
                        xaxis_title='X1',
                        yaxis_title='X2',
                        zaxis_title='Y'),
                        width=700,
                        margin=dict(r=20, b=10, l=10, t=10))
    fig.show()

In [9]:
from sklearn.manifold import TSNE
X_reduced = TSNE(n_components=2).fit_transform(x)
y_true_reduced = TSNE(n_components=1).fit_transform(y_true)
y_pred_reduced = TSNE(n_components=1).fit_transform(model(x))
print(f'X_reduced: {X_reduced.shape}, y_true_reduced: {y_true_reduced.shape}, y_pred_reduced: {y_pred_reduced.shape}')
plot_intereactive_3d(X_reduced,y_true_reduced,y_pred_reduced)

X_reduced: (500, 2), y_true_reduced: (500, 1), y_pred_reduced: (500, 1)


# Built in functionality

In [10]:
from keras.layers import Layer
class Linear(Layer):
    """y = w.x + b"""
    def __init__(self, units=32):
        super(Linear, self).__init__()
        self.units = units
    def build(self, input_shape):
        self.w = self.add_weight(shape=(input_shape[-1], self.units), initializer='random_normal', trainable=True)
        self.b = self.add_weight(shape=(self.units,), initializer='random_normal', trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

In [11]:
class Dropout(Layer):
    def __init__(self, rate):
        super(Dropout, self).__init__()
        self.rate = rate

    def call(self, inputs, training=None):
        return inputs if training else tf.nn.dropout(inputs, rate=self.rate)

In [12]:
class MLP(Layer):
    """Simple stack of Linear layers."""

    def __init__(self):
        super(MLP, self).__init__()
        self.linear_1 = Linear(22)
        self.linear_2 = Linear(12)
        self.linear_3 = Linear(2)

    def call(self, inputs):
        x = self.linear_1(inputs)
        x = tf.nn.relu(x)
        x = self.linear_2(x)
        x = tf.nn.relu(x)
        return self.linear_3(x)

In [13]:
optimizer = tf.keras.optimizers.SGD(learning_rate=0.0001)
mse_loss_fn = tf.keras.losses.MeanSquaredError()
loss_metric = tf.keras.metrics.Mean()

dataset = tf.data.Dataset.from_tensor_slices((x, y_true))
dataset = dataset.shuffle(buffer_size=1).batch(30)

epochs = 500
mlp = MLP()
# Iterate over epochs.
for epoch in range(epochs):

    # Iterate over the batches of the dataset.
    for step, (x_batch, y_batch) in enumerate(dataset):
        with tf.GradientTape() as tape:
            # Forward Pass
            y_pred_batch = mlp(x_batch)
            # Compute loss
            loss = mse_loss_fn(y_batch, y_pred_batch)
            loss += sum(mlp.losses)  # Add regularization loss

        grads = tape.gradient(loss, mlp.trainable_weights) # Use autograd
        optimizer.apply_gradients(zip(grads, mlp.trainable_weights)) # Update learnable parameter

        loss_metric(loss)
    if epoch % 100 == 0:
        print(f'Epoch {epoch}, loss {loss_metric.result()}')

Epoch 0, loss 27.990819931030273
Epoch 100, loss 25.487775802612305
Epoch 200, loss 23.37083625793457
Epoch 300, loss 21.453664779663086
Epoch 400, loss 19.67547035217285


In [14]:
def plot_intereactive_3d(x, y, y_pred=None):
    import plotly.graph_objects as go

    fig = go.Figure()
    fig.add_trace(go.Scatter3d(x = x[:,0],
                        y = x[:,1],
                        z = y.reshape([-1]),
                        opacity=0.5, mode='markers', name='Underlying Function'
                        ))
    
    if y_pred is not None:
        fig.add_trace(go.Scatter3d(x = x[:,0],
                    y = x[:,1],
                    z = y_pred.reshape([-1]),
                    opacity=0.5, mode='markers', name='Predicted Function'
                    ))
        
    fig.update_layout(scene = dict(
                        xaxis_title='X1',
                        yaxis_title='X2',
                        zaxis_title='Y'),
                        width=700,
                        margin=dict(r=20, b=10, l=10, t=10))
    fig.show()

In [15]:
from sklearn.manifold import TSNE
X_reduced = TSNE(n_components=2).fit_transform(x)
y_true_reduced = TSNE(n_components=1).fit_transform(y_true)
y_pred_reduced = TSNE(n_components=1).fit_transform(mlp(x))
print(f'X_reduced: {X_reduced.shape}, y_true_reduced: {y_true_reduced.shape}, y_pred_reduced: {y_pred_reduced.shape}')
plot_intereactive_3d(X_reduced,y_true_reduced,y_pred_reduced)

X_reduced: (500, 2), y_true_reduced: (500, 1), y_pred_reduced: (500, 1)
