In [1]:
import tensorflow as tf
import numpy as np

In [2]:
# 2x3 integer matrix reused by the indexing and arithmetic examples below.
array_tensor = tf.constant([[1,2,3], [4,5,6]])

In [3]:
# A plain Python scalar becomes a rank-0 tensor (shape=()).
tf.constant(42)

<tf.Tensor: shape=(), dtype=int32, numpy=42>

In [4]:
# Static shape of the tensor: 2 rows by 3 columns.
array_tensor.shape

TensorShape([2, 3])

In [5]:
# NumPy-style slicing: keep every row, drop the first column.
array_tensor[:, 1:]

<tf.Tensor: shape=(2, 2), dtype=int32, numpy=
array([[2, 3],
       [5, 6]], dtype=int32)>

In [6]:
# `...` keeps all leading axes, `1` selects column 1, and tf.newaxis appends
# a size-1 axis, so the result is a (2, 1) column rather than a flat vector.
array_tensor[..., 1, tf.newaxis]

<tf.Tensor: shape=(2, 1), dtype=int32, numpy=
array([[2],
       [5]], dtype=int32)>

In [7]:
# The scalar 10 is broadcast and added to every element.
array_tensor + 10

<tf.Tensor: shape=(2, 3), dtype=int32, numpy=
array([[11, 12, 13],
       [14, 15, 16]], dtype=int32)>

In [8]:
# Element-wise square.
tf.square(array_tensor)

<tf.Tensor: shape=(2, 3), dtype=int32, numpy=
array([[ 1,  4,  9],
       [16, 25, 36]], dtype=int32)>

In [9]:
# Swap rows and columns: (2, 3) -> (3, 2).
tf.transpose(array_tensor)

<tf.Tensor: shape=(3, 2), dtype=int32, numpy=
array([[1, 4],
       [2, 5],
       [3, 6]], dtype=int32)>

In [10]:
# `@` is matrix multiplication: (2, 3) @ (3, 2) -> (2, 2).
array_tensor @ tf.transpose(array_tensor)

<tf.Tensor: shape=(2, 2), dtype=int32, numpy=
array([[14, 32],
       [32, 77]], dtype=int32)>

In [11]:
# `keras` and the backend alias `K` are used by the rest of the notebook.
from tensorflow import keras
K = keras.backend
# Square the transposed matrix element-wise through the Keras backend, then add 10.
K.square(tf.transpose(array_tensor)) + 10

<tf.Tensor: shape=(3, 2), dtype=int32, numpy=
array([[11, 26],
       [14, 35],
       [19, 46]], dtype=int32)>

In [12]:
# Plain NumPy integer array, converted to a tensor in the next cell.
a = np.array([2,4,5])

In [13]:
# Build a tensor from the NumPy array; the explicit dtype makes it float32.
tf.constant(a, dtype=tf.float32)

<tf.Tensor: shape=(3,), dtype=float32, numpy=array([2., 4., 5.], dtype=float32)>

# TensorFlow never converts dtypes implicitly: a float tensor cannot be added to an integer tensor

In [14]:
# Intentional failure: the operands are float32 and int32, and TensorFlow
# raises InvalidArgumentError instead of casting one of them silently.
tf.constant(2.) + tf.constant(40)

InvalidArgumentError: cannot compute AddV2 as input #1(zero-based) was expected to be a float tensor but is a int32 tensor [Op:AddV2]

In [15]:
# t1 is float64 while tf.constant(2.0) is float32, so t1 must be cast
# explicitly with tf.cast before the two can be added.
t1 = tf.constant(40., dtype=tf.float64)
t2 = tf.constant(2.0) + tf.cast(t1, dtype=tf.float32)
t2

<tf.Tensor: shape=(), dtype=float32, numpy=42.0>

In [16]:
# Variables

In [17]:
# A tf.Variable holds a value that the following cells modify in place.
v = tf.Variable([[1,2,3],[4,5,6]])

In [18]:
# assign() overwrites the variable's value in place; here every element is doubled.
v.assign(v*2)

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=int32, numpy=
array([[ 2,  4,  6],
       [ 8, 10, 12]], dtype=int32)>

In [19]:
# Overwrite individual cells by index: (row 0, col 0) -> 100 and (row 1, col 2) -> 200.
v.scatter_nd_update(indices=[[0,0], [1,2]], updates=[100, 200])

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=int32, numpy=
array([[100,   4,   6],
       [  8,  10, 200]], dtype=int32)>

# Implement huber loss

In [20]:
class HuberLoss(keras.losses.Loss):
    """Huber loss: quadratic for small errors, linear for large ones.

    Args:
        threshold: absolute error below which the loss is quadratic
            (``error**2 / 2``); at or above it the loss is linear
            (``threshold * |error| - threshold**2 / 2``).
        **kwargs: forwarded to ``keras.losses.Loss`` (e.g. ``name``, ``reduction``).
    """

    def __init__(self, threshold=1.0, **kwargs):
        super().__init__(**kwargs)
        self.threshold = threshold

    def call(self, y_true, y_pred):
        """Return the element-wise Huber loss between ``y_true`` and ``y_pred``."""
        error = y_true - y_pred
        is_small_error = tf.abs(error) < self.threshold
        squared_loss = tf.square(error) / 2
        linear_loss = self.threshold * tf.abs(error) - self.threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)

    def get_config(self):
        """Return the base config plus ``threshold`` so the loss can be re-created."""
        base_config = super().get_config()
        # The original misspelled the attribute and never returned the dict,
        # so saving a model compiled with this loss could not work.
        return {**base_config, "threshold": self.threshold}

In [21]:
# Load model to test

In [24]:
# Load a previously saved model from ./elu_adam.h5 (the file is not created in
# this notebook). custom_objects maps the saved name "HuberLoss" to the class above.
elu_adam_model_test = keras.models.load_model("./elu_adam.h5", custom_objects={"HuberLoss": HuberLoss})

# Implement custom activation, initializer, regularizer, weights

In [25]:
def my_softplus(z):
    """Softplus activation, log(exp(z) + 1), computed naively.

    NOTE: tf.exp(z) overflows for large z, so this version is numerically
    unstable; the gradient demo later in the notebook relies on that.
    """
    exponential = tf.exp(z)
    shifted = exponential + 1.0
    return tf.math.log(shifted)


def my_glorot_init(shape, dtype=tf.float32):
    """Glorot-style normal initializer.

    Draws values from a normal distribution whose standard deviation is
    sqrt(2 / (shape[0] + shape[1])), so `shape` needs at least two entries.
    """
    fan_in = shape[0]
    fan_out = shape[1]
    stddev = tf.sqrt(2. / (fan_in + fan_out))
    samples = tf.random.normal(shape, stddev=stddev, dtype=dtype)
    return samples

def my_l1_regularizer(weights):
    """L1 penalty with a fixed factor of 0.01: sum(|0.01 * weights|)."""
    scaled_weights = 0.01 * weights
    magnitudes = tf.abs(scaled_weights)
    return tf.reduce_sum(magnitudes)

def my_positive_weights(weights):
    """Weight constraint that clamps negative weights to zero (ReLU on the weights).

    Keras calls a constraint with the weight tensor and uses the returned
    tensor as the new value, so the function must accept `weights`; the
    original took no argument and referenced an undefined name.

    Args:
        weights: tensor of weights to constrain.

    Returns:
        A tensor of the same shape with every negative entry replaced by 0.
    """
    return tf.where(weights < 0., tf.zeros_like(weights), weights)


In [26]:
# Dense layer wired entirely with the custom functions defined above:
# activation, kernel initializer, kernel regularizer and kernel constraint.
layer = keras.layers.Dense(
    30,
    activation=my_softplus,
    kernel_initializer=my_glorot_init,
    kernel_regularizer=my_l1_regularizer,
    kernel_constraint=my_positive_weights,
)

In [27]:
class MyL1Regularizer(keras.regularizers.Regularizer):
    """L1 regularizer with a configurable factor.

    Args:
        factor: multiplier applied to the weights before taking absolute values.
    """

    def __init__(self, factor):
        # The original wrote super(self, MyL1Regularizer), which has the
        # arguments reversed and raises TypeError.
        super().__init__()
        self.factor = factor

    def __call__(self, weights):
        """Return the scalar penalty sum(|factor * weights|)."""
        # The original computed the penalty but forgot to return it.
        return tf.reduce_sum(tf.abs(self.factor * weights))

    def get_config(self):
        """Return the constructor arguments needed to re-create the regularizer."""
        return {"factor": self.factor}

# Custom metrics

In [28]:
class HuberMetric(keras.metrics.Metric):
    """Streaming mean of the element-wise Huber loss over all batches seen.

    Args:
        threshold: Huber threshold, passed to ``HuberLoss``.
        **kwargs: forwarded to ``keras.metrics.Metric`` (e.g. ``name``, ``dtype``).
    """

    def __init__(self, threshold=1.0, **kwargs):
        # kwargs such as `name` belong to the metric itself, not to the helper loss.
        super().__init__(**kwargs)
        self.threshold = threshold
        self.huber_loss = HuberLoss(threshold)
        # add_weight registers the accumulators as metric state, so Keras can
        # reset them between epochs.
        self.total = self.add_weight(name="total", initializer="zeros")
        self.count = self.add_weight(name="count", initializer="zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate the Huber loss of one batch.

        ``sample_weight`` is accepted for API compatibility but ignored.
        """
        # Use .call() to get the un-reduced, element-wise loss. Calling the
        # loss object itself applies its reduction first, which would not
        # match a count expressed in number of elements.
        per_element = self.huber_loss.call(y_true, y_pred)
        self.total.assign_add(tf.cast(tf.reduce_sum(per_element), self.total.dtype))
        self.count.assign_add(tf.cast(tf.size(y_true), self.count.dtype))

    def result(self):
        """Return the mean Huber loss so far (0 if nothing has been accumulated)."""
        return tf.math.divide_no_nan(self.total, self.count)

    def get_config(self):
        """Return the base config plus ``threshold``."""
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

# Custom Layer

In [29]:
# Layer without weights: a Lambda layer wraps a stateless function, here exp(x).
exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x))

In [30]:
# Layer with weights
class MyDense(keras.layers.Layer):
    """Minimal re-implementation of a fully connected layer.

    Computes ``activation(X @ kernel + bias)``.

    Args:
        units: number of output units.
        activation: activation name or callable, resolved with
            ``keras.activations.get`` (``None`` means linear).
        **kwargs: forwarded to ``keras.layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        # Resolve the activation function from its name.
        self.activation = keras.activations.get(activation)

    def build(self, batch_input_shape):
        """Create the kernel and bias once the input feature size is known."""
        # `weights` is a read-only property of Layer, so the matrix is stored
        # under `kernel` instead.
        self.kernel = self.add_weight(
            name="kernel",
            shape=[batch_input_shape[-1], self.units],
            initializer="glorot_normal",
        )
        # One bias per unit, added to every row of the output.
        self.bias = self.add_weight(name="bias", shape=[self.units], initializer="zeros")
        super().build(batch_input_shape)

    def call(self, X):
        """Apply the affine transform followed by the activation."""
        return self.activation(X @ self.kernel + self.bias)

    def compute_output_shape(self, batch_input_shape):
        """Return the input shape with its last dimension replaced by ``units``."""
        # Wrapping in TensorShape first accepts plain tuples/lists as well.
        leading_dims = tf.TensorShape(batch_input_shape).as_list()[:-1]
        return tf.TensorShape(leading_dims + [self.units])

    def get_config(self):
        """Return the base config plus ``units`` and the serialized activation."""
        base_configs = super().get_config()
        # The activation function itself cannot be stored; save its serialized form.
        return {
            **base_configs,
            "units": self.units,
            "activation": keras.activations.serialize(self.activation),
        }

In [34]:
class MyMultiLayer(keras.layers.Layer):
    """Layer taking a pair of inputs and returning their sum, product and quotient."""

    def call(self, X):
        """Unpack the two inputs and return [sum, product, quotient]."""
        first, second = X
        total = first + second
        product = first * second
        quotient = first / second
        return [total, product, quotient]

    def compute_output_shape(self, batch_input_shape):
        """Report the first input's shape for each of the three outputs."""
        first_shape, _second_shape = batch_input_shape
        return [first_shape] * 3

In [37]:
class MyGaussianLayer(keras.layers.Layer):
    """Adds zero-mean Gaussian noise during training; identity otherwise.

    Args:
        stddev: standard deviation of the noise.
        **kwargs: forwarded to ``keras.layers.Layer``.
    """

    def __init__(self, stddev, **kwargs):
        super().__init__(**kwargs)
        self.stddev = stddev

    def call(self, X, training=None):
        """Return ``X`` plus noise when ``training`` is truthy, else ``X`` unchanged."""
        if training:
            # The original wrote `tf.shapeX`; the noise must have X's dynamic
            # shape and dtype to be added to it.
            noise = tf.random.normal(tf.shape(X), stddev=self.stddev, dtype=X.dtype)
            return X + noise
        return X

    def compute_output_shape(self, batch_input_shape):
        """The output has the same shape as the input."""
        # The original was misspelled `computer_output_shape`, so Keras never called it.
        return batch_input_shape

    def get_config(self):
        """Return the base config plus ``stddev``."""
        base_config = super().get_config()
        return {**base_config, "stddev": self.stddev}

# Custom model with residual block

In [39]:
class ResidualBlock(keras.layers.Layer):
    """Stack of Dense layers whose output is added back to the block's input.

    Because of the skip connection ``X + Z``, the last dimension of the input
    must equal ``n_units``.

    Args:
        n_units: units in every Dense layer of the block.
        n_dense_layers: number of Dense layers in the block.
        activation: activation of the Dense layers (name or callable).
            Defaults to "elu", the value the original hard-coded.
        **kwargs: forwarded to ``keras.layers.Layer``.
    """

    def __init__(self, n_units, n_dense_layers, activation="elu", **kwargs):
        super().__init__(**kwargs)
        # Keep the constructor arguments: get_config needs them.
        self.n_units = n_units
        self.n_dense_layers = n_dense_layers
        self.activation = keras.activations.get(activation)
        # Sub-layers are created here and build themselves on first call.
        self.hidden = [
            keras.layers.Dense(
                n_units,
                activation=self.activation,
                kernel_initializer="he_normal",
            )
            for _ in range(n_dense_layers)
        ]

    def call(self, X):
        """Run the Dense stack and add the skip connection."""
        Z = X
        for dense_layer in self.hidden:
            Z = dense_layer(Z)
        return X + Z

    def get_config(self):
        """Return the base config plus the constructor arguments."""
        base_configs = super().get_config()
        return {
            **base_configs,
            "n_units": self.n_units,
            "n_dense_layers": self.n_dense_layers,
            "activation": keras.activations.serialize(self.activation),
        }

In [43]:
class ResidualRegressor(keras.Model):
    """Dense layer, then residual blocks, then an output Dense layer.

    Args:
        output_dim: number of units of the output layer.
        **kwargs: forwarded to ``keras.Model``.
    """

    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.output_dim = output_dim
        # A subclassed model receives its input tensor directly in call(), so
        # no InputLayer is needed; the sub-layers build themselves on first use.
        self.dense_1 = tf.keras.layers.Dense(100, activation="elu", kernel_initializer="he_normal")
        self.resid_block_1 = ResidualBlock(100, 2, activation="elu")
        self.resid_block_2 = ResidualBlock(100, 2, activation="elu")
        # NOTE(review): sigmoid bounds the predictions to (0, 1) - confirm this
        # is intended for a regressor.
        self.output_layer = tf.keras.layers.Dense(
            output_dim, activation="sigmoid", kernel_initializer="he_normal"
        )

    def call(self, X):
        """Forward pass; the first residual block is applied four times."""
        Z = self.dense_1(X)
        # The same block (shared weights) is reused 1 + 3 times.
        for _ in range(1 + 3):
            Z = self.resid_block_1(Z)
        Z = self.resid_block_2(Z)
        # The original called the undefined `self.dense_2` here.
        return self.output_layer(Z)

    def get_config(self):
        """Return the base config plus ``output_dim``, the only constructor argument."""
        # The original called self.get_config() here, recursing forever.
        base_configs = super().get_config()
        return {**base_configs, "output_dim": self.output_dim}
        

In [44]:
# Custom Reconstruction Loss
class ReconRegressor(keras.Model):
    """Regressor with an auxiliary reconstruction loss.

    The model is a stack of ``num_dense`` Dense layers (the last one has
    ``output_dim`` units). An extra Dense layer tries to reconstruct the
    input from the stack's output; the scaled mean squared reconstruction
    error is added to the model's losses and tracked as a metric.

    Args:
        num_dense: total number of Dense layers in the stack (>= 1).
        output_dim: number of units of the last Dense layer.
        recon_loss_scale: multiplier applied to the reconstruction error.
        **kwargs: forwarded to ``keras.Model``.

    Raises:
        ValueError: if ``num_dense`` is smaller than 1.
    """

    def __init__(self, num_dense, output_dim, recon_loss_scale=0.05, **kwargs):
        super().__init__(**kwargs)
        if num_dense < 1:
            raise ValueError("num_dense must be at least 1")
        self.num_dense = num_dense
        self.recon_loss_scale = recon_loss_scale
        self.output_dim = output_dim
        self.mean_recon_error = keras.metrics.Mean(name="recon_error", dtype=tf.float32)
        # `layers` is a read-only property of keras.Model, so the stack is kept
        # under its own attribute name. Layers are created but not called here.
        self.dense_stack = []
        for i in range(num_dense):
            is_last = i == num_dense - 1
            units = output_dim if is_last else 100
            self.dense_stack.append(
                tf.keras.layers.Dense(units, activation="elu", kernel_initializer="he_normal")
            )

    def build(self, batch_input_shape):
        """Create the reconstruction layer once the input feature size is known."""
        self.recon = tf.keras.layers.Dense(
            batch_input_shape[-1], activation="sigmoid", kernel_initializer="he_normal"
        )
        super().build(batch_input_shape)

    def call(self, X):
        """Return the stack's output and register the reconstruction loss and metric."""
        Z = X
        for layer in self.dense_stack:
            Z = layer(Z)
        recon_val = self.recon(Z)
        # Keep the loss as a plain tensor: wrapping it in tf.Variable, as the
        # original did, would cut it off from the gradient computation.
        recon_loss = self.recon_loss_scale * tf.reduce_mean(tf.square(recon_val - X))
        self.add_loss(recon_loss)
        self.add_metric(self.mean_recon_error(recon_loss))
        return Z

In [45]:
def f(w1, w2):
    """Toy function 3*w1^2 + 2*w1*w2 used to illustrate differentiation."""
    quadratic_term = 3 * w1 ** 2
    cross_term = 2 * w1 * w2
    return quadratic_term + cross_term

In [47]:
# Approximate the partial derivative of f with respect to w1 at (5, 3) with a
# forward finite difference: (f(w1 + eps, w2) - f(w1, w2)) / eps.
w1, w2 = 5, 3
eps = 1e-6
(f(w1 + eps, w2) - f(w1, w2)) / eps

36.000003007075065

In [48]:
# Same finite-difference estimate, this time with respect to w2.
(f(w1, w2 + eps) - f(w1, w2)) / eps

10.000000003174137

In [55]:
# Autodiff version: operations on the variables are recorded inside the tape
# context, then tape.gradient returns dz/dw1 and dz/dw2.
w1, w2 = tf.Variable(5.), tf.Variable(3.)
with tf.GradientTape() as tape:
    z = f(w1, w2)

tz_gradients_1 = tape.gradient(z, [w1, w2])
tz_gradients_1

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

In [54]:
# Intentional failure: a non-persistent tape can only be used for one
# gradient() call, so calling it a second time raises RuntimeError.
tz_gradients_2 = tape.gradient(z, [w1, w2])

RuntimeError: GradientTape.gradient can only be called once on non-persistent tapes.

In [57]:
# To call gradient() several times on the same tape, create it with persistent=True.
with tf.GradientTape(persistent=True) as tape:
    z = f(w1, w2)

dz_gradient_1 = tape.gradient(z, [w1, w2])
dz_gradient_2 = tape.gradient(z, [w1, w2])

# A persistent tape keeps its recorded state until it is garbage-collected,
# so drop the reference explicitly once the gradients have been computed.
del tape

# A persistent tape has to be deleted manually (`del tape`) once you are done with it

In [59]:
# This does not work with constants: the tape in this cell does not watch
# them, so both gradients come back as None.
c1, c2 = tf.constant(5.), tf.constant(3.)
with tf.GradientTape() as tape:
    z = f(c1, c2)

gradients = tape.gradient(z, [c1, c2])
gradients

[None, None]

In [62]:
# Same computation, but tape.watch() tells the tape to track the constant
# tensors, so the gradients are computed as they are for variables.
c1, c2 = tf.constant(5.), tf.constant(3.)
with tf.GradientTape() as tape:
    tape.watch(c1)
    tape.watch(c2)
    z = f(c1, c2)

watched_gradient = tape.gradient(z, [c1, c2])

In [63]:
# Display the gradients obtained from the watched constants.
watched_gradient

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

In [64]:
# TODO: research Jacobians and reverse-mode autodiff

In [66]:
def f_ver_2(w1, w2):
    """Same value as f, but the 2*w1*w2 term is excluded from backpropagation.

    tf.stop_gradient wraps the cross term, so only 3*w1**2 contributes to
    the gradient.
    """
    differentiable_part = 3*w1**2
    frozen_part = tf.stop_gradient(2*w1*w2)
    return differentiable_part + frozen_part

# With the cross term behind stop_gradient, only 3*w1**2 is differentiated:
# dz/dw1 = 6*w1 = 30, and w2 gets no gradient at all (None).
w1, w2 = tf.Variable(5.), tf.Variable(3.)
with tf.GradientTape() as tape:
    z = f_ver_2(w1, w2)

new_gradient = tape.gradient(z, [w1, w2])
new_gradient

[<tf.Tensor: shape=(), dtype=float32, numpy=30.0>, None]

In [68]:
# Numerical-stability demo: exp(100) overflows float32 to inf, so the
# gradient of the naive softplus evaluates to inf / inf = nan.
x = tf.Variable([100.])
with tf.GradientTape() as tape:
    z = my_softplus(x)
softplus_gradient = tape.gradient(z, [x])
softplus_gradient

[<tf.Tensor: shape=(1,), dtype=float32, numpy=array([nan], dtype=float32)>]

In [69]:
@tf.custom_gradient
def my_better_softplus(x):
    """Numerically stable softplus, log(exp(x) + 1), with a hand-written gradient.

    Args:
        x: input tensor (floating point).

    Returns:
        A ``(result, gradient_fn)`` pair, as required by ``tf.custom_gradient``.
        ``gradient_fn`` maps the upstream gradient to the gradient with
        respect to ``x``.
    """
    # softplus(x) = max(x, 0) + log(1 + exp(-|x|)); exp(-|x|) never overflows,
    # unlike exp(x) in the naive formula.
    result = tf.maximum(x, 0.) + tf.math.log1p(tf.exp(-tf.abs(x)))

    def calc_gradient(gradient):
        # d/dx softplus(x) = exp(x) / (1 + exp(x)) = sigmoid(x), which stays
        # finite for large |x|.
        return gradient * tf.sigmoid(x)

    # The original returned only `result`; custom_gradient needs the gradient
    # function as well.
    return result, calc_gradient