In [1]:
import numpy as np
import tensorflow as tf
import random

C:\Users\xnive\anaconda3\envs\tf\lib\site-packages\numpy\.libs\libopenblas.PYQHXLVVQ7VESDPUVUADXEVJOBGHJPAY.gfortran-win_amd64.dll
C:\Users\xnive\anaconda3\envs\tf\lib\site-packages\numpy\.libs\libopenblas.WCDJNK7YVMPZQ2ME2ZZHJJRJ3JIKNDB7.gfortran-win_amd64.dll
  stacklevel=1)


# Getting more than linear estimations by training our weights

The idea of this project is to create an architecture that is small and is able to detect non-linear relationships between data. We will first explain this architecture and then make some experiments.

The idea behind this architecture is to have a Neural Network, named Task, with inputs I, outputs O and weights W, and instead of regular backpropagation to learn the weights W we implement a second Neural network, named Lift, with inputs I and outputs W (approximations of the weights of the first NN). For training we will consider the "right" input to be the weights after a pass of backpropragation, and use MSE to train the network Lift.

For this task we will use a simple MLP network for both of our networks but it should be able to extend to any architecture in a trivial way.

## Task

In [2]:
def task():
    task=tf.keras.Sequential(layers=[tf.keras.layers.Dense(5, activation="relu", input_shape=(3,)), tf.keras.layers.Dense(1)])
    task.compile(loss="mse")
    return task

## Lift

In [3]:
def lift():
    lift=tf.keras.Sequential(layers=[tf.keras.layers.Dense(10), tf.keras.layers.Dense(26)])
    lift.compile(loss="mse")
    return lift

## Model

In [4]:
class lifter(tf.keras.Model):
    def __init__(self):
        super(lifter, self).__init__()
        self.task=tf.keras.Sequential(layers=[tf.keras.layers.Dense(5, activation="relu", input_shape=(3,)), tf.keras.layers.Dense(1)])
        self.lift=tf.keras.Sequential(layers=[tf.keras.layers.Dense(10, activation="relu", input_shape=(3,)), tf.keras.layers.Dense(26)])
        self.task.compile(loss="mse")
        self.lift.compile(loss="mse")
        weights=self.task.get_weights()
        self.weight_shapes=[np.shape(weight) for weight in weights]

    def call(self, X):
        pred=np.array([[]])
        for i in range(len(X)):
            new_weights=self.lift(X[i:i+1]).numpy()
            j=0
            start=0
            end=0
            end2=0
            for layer in self.task.layers:
                end+=get_size(self.weight_shapes[j])
                end2=end+get_size(self.weight_shapes[j+1])
                new_layer_weights=[np.reshape(new_weights[:,start:end], self.weight_shapes[j]), np.reshape(new_weights[:,end:end2], self.weight_shapes[j+1])]
                layer.set_weights(new_layer_weights)
                start=end2
                end=end2
                j+=2
            new_pred=self.task(X[i:i+1]).numpy()
            pred=np.concatenate([pred, new_pred], axis=1)
        return pred

    def fit(self, X,y, epochs=1, print_every=1000):
        for j in range(epochs):
            for i in range(len(X)):
                if (i+1)%print_every==0:
                    print(i+1)
                self.call(X[i:i+1])
                self.task.fit(X[i:i+1],y[i:i+1], verbose=0)
                weights=flatten_weights(self.task.get_weights())
                self.lift.fit(X[i:i+1], weights, verbose=0)
        

## Auxiliary Functions

In [5]:
def flatten_weights(weights):
    flat=np.array([])
    for weight in weights:
        flat=np.concatenate([flat, weight.flatten()])
    return np.array([flat])

In [6]:
def get_size(shape):
    size=1
    for i in shape:
        size*=i
    return size

# Data and Training

We will make a small artificial task where the data has inputs (x,y,z) and outputs x^2+2yz

In [7]:
random.seed(0)
X=np.array([[random.randrange(0,10), random.randrange(0,10), random.randrange(0,10)] for i in range(10000)])
y=np.array([X[i,0]**2+2*X[i,1]*X[i,2] for i in range(10000)])

X_test=np.array([[random.randrange(0,10), random.randrange(0,10), random.randrange(0,10)] for i in range(10000)])
y_test=np.array([X_test[i,0]**2+2*X_test[i,1]*X_test[i,2] for i in range(10000)])

In [8]:
model=lifter()
model.fit(X, y, epochs=1, print_every=1000)

1000
2000
3000
4000
5000
6000
7000
8000
9000
10000


# Comparison with regular model

In [9]:
test1=tf.keras.Sequential(layers=[tf.keras.layers.Dense(5, activation="relu", input_shape=(3,)), tf.keras.layers.Dense(1)])
test2=tf.keras.Sequential(layers=[tf.keras.layers.Dense(10, activation="relu", input_shape=(3,)), tf.keras.layers.Dense(10, activation="relu"), tf.keras.layers.Dense(1)])
test1.compile(loss="mse")
test2.compile(loss="mse")

In [10]:
test1.fit(X,y, batch_size=1)
test2.fit(X,y, batch_size=1)



<tensorflow.python.keras.callbacks.History at 0x1a939b26108>

# Let's compare with the test set.

In [15]:
from sklearn.metrics import mean_squared_error as mse

print(mse(y_test, np.reshape(model.call(X_test), -1)))
print(mse(y_test, test1(X_test)))
print(mse(y_test, test2(X_test)))

11.751248024717423
577.8288923440286
299.96931042965923


## Our model is clearly much better

However, there are a few things to consider, training time is much longer, however this can probably be optimized. We could also implement some ideas for batches that make computations faster by paralelization. 

To be fair, this model was designed for this sort of task in mind, but in general it can be used to look for different interactions between the variables that are non-linear. And although activation functions do this and are able to model many classes of functions it is interesting to see what can be found if we do not know much about our variables and task at hand.

Let us compare what happens if we did some more 

In [16]:
test1.fit(X,y, batch_size=1, epochs=9)
test2.fit(X,y, batch_size=1, epochs=9)

Epoch 1/9
Epoch 2/9
Epoch 3/9
Epoch 4/9
Epoch 5/9
Epoch 6/9
Epoch 7/9
Epoch 8/9
Epoch 9/9
Epoch 1/9
Epoch 2/9
Epoch 3/9
Epoch 4/9
Epoch 5/9
Epoch 6/9
Epoch 7/9
Epoch 8/9
Epoch 9/9


<tensorflow.python.keras.callbacks.History at 0x1a92ddbef48>

In [17]:
print(mse(y_test, test1(X_test)))
print(mse(y_test, test2(X_test)))

125.23955670380508
12.815976485974884


## But what happens with a test set outside of the domain, in this case out of range(0,10)?

In [26]:
X_test2=np.array([[random.randrange(100,200), random.randrange(100,200), random.randrange(100,200)] for i in range(10000)])
y_test2=np.array([X_test2[i,0]**2+2*X_test2[i,1]*X_test2[i,2] for i in range(10000)])

In [27]:
print(mse(y_test2, np.reshape(model.call(X_test2), -1)))
print(mse(y_test2, test1(X_test2)))
print(mse(y_test2, test2(X_test2)))

450406552376.2903
4308421073.077956
4121513846.260282


## After seeing that with bigger numbers the error is bigger than that of the bigger MLP and when it gets even bigger it even gets worse than the smaller one, I expect that the error is growing because we are multiplying two errors, one for the prediction of weights and then for the net. I would then expect that with more training we would see bigger improvements on our model.

In [None]:
model.fit(X,Y, epochs=9)

In [None]:
print(mse(y_test, np.reshape(model.call(X_test), -1)))
print(mse(y_test2, np.reshape(model.call(X_test2), -1)))