In [3]:
import tensorflow as tf

# a 2x3 tensor of floats and a scalar tensor
arr = tf.constant([[1.,2.,3.],[4.,5.,6.]])
num = tf.constant(42)
print(arr.shape)  # (2, 3)
print(num.shape)  # () -- a scalar has an empty shape

(2, 3)
()


In [4]:
# tensors can be indexed and sliced much like numpy arrays
print(arr[:, 1:])  # every row, columns 1 onward -> shape (2, 2)
print(arr[..., 1, tf.newaxis])  # column 1 of every row, plus a new trailing axis -> shape (2, 1)

tf.Tensor(
[[2. 3.]
 [5. 6.]], shape=(2, 2), dtype=float32)
tf.Tensor(
[[2.]
 [5.]], shape=(2, 1), dtype=float32)


In [5]:
print(arr+10)  # the scalar is added to every element

tf.Tensor(
[[11. 12. 13.]
 [14. 15. 16.]], shape=(2, 3), dtype=float32)


In [6]:
print(tf.square(arr))  # element-wise square

tf.Tensor(
[[ 1.  4.  9.]
 [16. 25. 36.]], shape=(2, 3), dtype=float32)


In [7]:
print(tf.transpose(arr))  # rows and columns swapped: shape (2, 3) -> (3, 2)

tf.Tensor(
[[1. 4.]
 [2. 5.]
 [3. 6.]], shape=(3, 2), dtype=float32)


In [8]:
# keras has a low level api too

from tensorflow import keras

k = keras.backend  # short alias for the backend module
print(k.square(k.transpose(arr)) + 10)  # transpose, square each element, then add 10

tf.Tensor(
[[11. 26.]
 [14. 35.]
 [19. 46.]], shape=(3, 2), dtype=float32)


In [10]:
# you can use tensors with numpy
# np uses 64 bit, but tf uses 32 bit, so when converting set the dtype
import numpy as np

arr2 = np.array([2.,4.,5.])
print(tf.constant(arr2, dtype=tf.float32))  # numpy -> tensor, with the dtype set explicitly to float32
print(arr.numpy())  # tensor -> numpy
print(tf.square(arr2))  # tf op applied to a numpy array; the result comes back as float64
print(np.square(arr))  # numpy op applied to a tensor; the result prints as a plain array

tf.Tensor([2. 4. 5.], shape=(3,), dtype=float32)
[[1. 2. 3.]
 [4. 5. 6.]]
tf.Tensor([ 4. 16. 25.], shape=(3,), dtype=float64)
[[ 1.  4.  9.]
 [16. 25. 36.]]


In [None]:
# type conversions are expensive, so they are never done automatically:
# mixing dtypes in one operation throws an exception unless you convert manually

# either give the value the matching dtype when you create it
tf.constant(2.) + tf.constant(40, dtype=tf.float32)

# or cast an existing tensor to the dtype you need
x = tf.constant(40, dtype=tf.float64)
tf.constant(2.0) + tf.cast(x, tf.float32)

In [12]:
# everything so far is a constant tensor, which can't serve as the weights of a neural net; for those we need variables

v = tf.Variable([[1.,2.,3.],[4.,5.,6.]])

In [None]:
# modifying a variable with assign
v.assign(2 * v)  # replace the whole variable with twice its current value
v[0, 1].assign(42)  # overwrite a single cell
v[:, 2].assign([0., 1.])  # overwrite a slice (column 2 of both rows)
# updating multiple values at once with scatter: one update value per index pair
v.scatter_nd_update(indices=[[0,0],[1,2]], updates=[100., 200.])

In [None]:
# custom huber loss fn, quadratic when small, linear when big, so it's good for outliers

def huber_fn(y_true, y_pred):
    """Element-wise Huber loss with a fixed threshold of 1.

    Errors whose magnitude is below 1 are penalised quadratically
    (error**2 / 2); every other error is penalised linearly (|error| - 0.5).
    """
    residual = y_true - y_pred
    magnitude = tf.abs(residual)
    quadratic = tf.square(residual) / 2
    linear = magnitude - 0.5
    # pick the quadratic branch where the error is small, the linear one elsewhere
    return tf.where(magnitude < 1, quadratic, linear)

# now you can pass this loss into your model
# NOTE(review): model, x_train and y_train are not defined in any cell visible here -- confirm they exist before running

model.compile(loss=huber_fn, optimizer="nadam")
model.fit(x_train, y_train)

# when you load this model again, every custom object has to be mapped from its saved name to the python object

model = keras.models.load_model("my_model_with_a_custom_loss.h5", custom_objects={"huber_fn": huber_fn})

In [None]:
# creating a configured loss function

def create_huber(threshold=1.0):
    """Build a Huber loss function bound to the given threshold.

    The returned function is quadratic (error**2 / 2) for errors whose
    magnitude is below `threshold`, and linear
    (threshold * |error| - threshold**2 / 2) for all other errors.
    """
    # the inner function keeps the name huber_fn: that is the name used as the
    # custom_objects key when a model compiled with it is reloaded
    def huber_fn(y_true, y_pred):
        residual = y_true - y_pred
        magnitude = tf.abs(residual)
        quadratic = tf.square(residual) / 2
        linear = threshold * magnitude - threshold**2/2
        return tf.where(magnitude < threshold, quadratic, linear)
    return huber_fn

model.compile(loss=create_huber(2.0), optimizer="nadam")

# the threshold isn't saved with the model, so specify it again when you reload;
# the key is "huber_fn" because that is the name of the inner function create_huber returns
model = keras.models.load_model("my_model_with_a_custom_loss.h5", custom_objects={"huber_fn": create_huber(2.0)})

In [None]:
# can solve this by subclassing keras.losses.Loss and implementing get_config

class HuberLoss(keras.losses.Loss):
    """Huber loss that stores its threshold and exposes it through get_config."""

    def __init__(self, threshold=1.0, **kwargs):
        # remaining keyword arguments are forwarded to the base Loss class
        self.threshold = threshold
        super().__init__(**kwargs)

    def call(self, y_true, y_pred):
        """Return the element-wise Huber loss for the configured threshold."""
        residual = y_true - y_pred
        magnitude = tf.abs(residual)
        quadratic = tf.square(residual) / 2
        linear = self.threshold * magnitude - self.threshold**2/2
        return tf.where(magnitude < self.threshold, quadratic, linear)

    def get_config(self):
        """Return the base config extended with the threshold."""
        config = dict(super().get_config())
        config["threshold"] = self.threshold
        return config

    
# use it the same way

# compile with an instance of the loss class; the threshold is a constructor argument
model.compile(loss=HuberLoss(2.0), optimizer="nadam")

# on reload only the class itself is mapped; the threshold is part of the config HuberLoss.get_config returns
model = keras.models.load_model("my_model_with_a_custom_loss.h5", custom_objects={"HuberLoss": HuberLoss})

In [None]:
# custom activations, initializers, regularizers, and constraints

def my_softplus(z):
    """Softplus activation: log(exp(z) + 1), applied element-wise."""
    exp_z = tf.exp(z)
    return tf.math.log(exp_z + 1.0)
def my_glorot_initializer(shape, dtype=tf.float32):
    """Glorot-style normal initializer.

    Args:
        shape: shape of the tensor to create; its first two entries are
            summed to scale the standard deviation.
        dtype: dtype of the returned tensor.

    Returns:
        A tensor of the given shape drawn from a normal distribution with
        standard deviation sqrt(2 / (shape[0] + shape[1])).
    """
    # the parameter was previously misspelled `shaper`, so the body's `shape` raised NameError
    stddev = tf.sqrt(2. / (shape[0] + shape[1]))
    return tf.random.normal(shape, stddev=stddev, dtype=dtype)
def my_l1_regularizer(weights):
    """L1 penalty: sum of the absolute values of the weights scaled by 0.01."""
    scaled = 0.01 * weights
    return tf.reduce_sum(tf.abs(scaled))
def my_positive_weights(weights):
    """Constraint that replaces negative weights with zero and keeps the rest."""
    is_negative = weights < 0.
    zeros = tf.zeros_like(weights)
    return tf.where(is_negative, zeros, weights)

# arguments depend on the function

# plug each custom function into the matching hook of a 30-unit Dense layer
layer = keras.layers.Dense(
    units=30,
    activation=my_softplus,
    kernel_initializer=my_glorot_initializer,
    kernel_regularizer=my_l1_regularizer,
    kernel_constraint=my_positive_weights,
)

# if you want to save the hyperparameters you need to subclass the class
# call for losses, layers, models
# __call__ for regularizers, initializers, constraints

class MyL1Regularizer(keras.regularizers.Regularizer):
    """L1 regularizer with a configurable factor that is exposed via get_config."""

    def __init__(self, factor):
        self.factor = factor

    def __call__(self, weights):
        """Return the sum of the absolute values of factor * weights."""
        scaled = self.factor * weights
        return tf.reduce_sum(tf.abs(scaled))

    def get_config(self):
        """Return the constructor arguments as a dict."""
        return dict(factor=self.factor)

In [None]:
# a custom function (here the one built by create_huber) can also be passed as a metric

model.compile(loss="mse", optimizer="nadam", metrics=[create_huber(2.0)])

# example precision metric

precision = keras.metrics.Precision()

# can pass it labels and predictions

precision([0,1,1,1,0,1,0,1],[1,1,0,1,0,1,0,1]) # would return .8 (4 of the 5 predicted positives are correct)
precision([0,1,0,0,1,0,1,1],[1,0,1,1,0,0,0,0]) # now it would return .5 (4 of the 8 predicted positives so far)

# this is a streaming metric: it returns the overall precision of everything passed to it so far
# whenever you are ready you can ask for the current result and look at the variables it keeps
precision.result()
precision.variables

In [None]:
# creating a custom metric

class HuberMetric(keras.metrics.Metric):
    """Streaming metric that reports the mean Huber loss over all batches seen.

    Args:
        threshold: Huber threshold handed to create_huber.
        **kwargs: forwarded to the base Metric class.
    """

    def __init__(self, threshold=1.0, **kwargs):
        super().__init__(**kwargs)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        # add_weight creates the variables that keep state across batches.
        # The name is passed by keyword (as MyDense does) because newer Keras
        # versions treat the first positional argument as the shape.
        self.total = self.add_weight(name="total", initializer="zeros")
        self.count = self.add_weight(name="count", initializer="zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate the Huber loss of one batch.

        Called when an instance of the class is used as a function.
        sample_weight is accepted for interface compatibility but not used.
        """
        metric = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(metric))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))

    def result(self):
        """Return the accumulated loss divided by the number of elements seen."""
        return self.total / self.count

    def get_config(self):
        """Return the base config extended with the threshold so the metric is saved correctly."""
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

In [None]:
# simple custom layer that applies the exp function to its inputs
# (a Lambda layer wraps a plain function, so no subclass is needed)

exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x))

In [14]:
# implementation of dense layer

class MyDense(keras.layers.Layer):
    """Minimal re-implementation of a dense (fully connected) layer.

    Args:
        units: number of output units.
        activation: activation function or its name; resolved with
            keras.activations.get.
        **kwargs: forwarded to the base Layer class.
    """

    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.activation = keras.activations.get(activation)

    def build(self, batch_input_shape):
        """Create the layer's variables with add_weight."""
        self.kernel = self.add_weight(
            name="kernel",
            shape=[batch_input_shape[-1], self.units],
            initializer="glorot_normal")
        self.bias = self.add_weight(
            name="bias",
            shape=[self.units],
            initializer="zeros")
        super().build(batch_input_shape)

    def call(self, x):
        """Return activation(x @ kernel + bias); @ is matrix multiplication."""
        return self.activation(x @ self.kernel + self.bias)

    def compute_output_shape(self, batch_input_shape):
        """Return the input shape with its last dimension replaced by units."""
        # wrapping in TensorShape lets plain tuples and lists through as well
        leading_dims = tf.TensorShape(batch_input_shape).as_list()[:-1]
        return tf.TensorShape(leading_dims + [self.units])

    def get_config(self):
        """Return the base config extended with units and the serialized activation."""
        base_config = super().get_config()
        return {**base_config,
                "units": self.units,
                "activation": keras.activations.serialize(self.activation)}

In [None]:
# implementation of layer with multiple inputs

class MyMultiLayer(keras.layers.Layer):
    """Layer taking two inputs and returning their sum, product and quotient."""

    def call(self, x):
        """Return [x1 + x2, x1 * x2, x1 / x2] for the input pair x = (x1, x2)."""
        x1, x2 = x  # since this is multi-input, the param is a tuple
        return [x1+x2, x1*x2, x1/x2]

    def compute_output_shape(self, batch_input_shape):
        """Return one output shape per output, each equal to the first input's shape."""
        b1, b2 = batch_input_shape
        return [b1, b1, b1]

In [None]:
# if you want different behavior during training and testing, add a training parameter
class MyGaussianNoise(keras.layers.Layer):
    """Layer that adds Gaussian noise to its input, but only during training."""

    def __init__(self, stddev, **kwargs):
        super().__init__(**kwargs)
        self.stddev = stddev

    def call(self, x, training=None):
        """Return x plus normal noise when training is truthy, otherwise x unchanged."""
        if not training:
            return x
        noise = tf.random.normal(tf.shape(x), stddev=self.stddev)
        return x + noise

    def compute_output_shape(self, batch_input_shape):
        """The output has the same shape as the input."""
        return batch_input_shape

In [None]:
# can build models with loops and skip connections

class ResidualBlock(keras.layers.Layer):
    """Stack of dense layers with a skip connection around the whole stack.

    Args:
        n_layers: number of hidden Dense layers in the block.
        n_neurons: number of units in each hidden layer.
        **kwargs: forwarded to the base Layer class.
    """

    def __init__(self, n_layers, n_neurons, **kwargs):
        super().__init__(**kwargs)
        self.hidden = [keras.layers.Dense(n_neurons, activation="elu", kernel_initializer="he_normal")
                       for _ in range(n_layers)]

    def call(self, inputs):
        """Run the inputs through every hidden layer and add the original inputs to the result."""
        z = inputs
        for layer in self.hidden:
            z = layer(z)
        # skip connection: inputs and z are added element-wise
        return inputs + z
    
class ResidualRegressor(keras.Model):
    """Model made of a dense layer, two residual blocks and a dense output layer."""

    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden1 = keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal")
        self.block1 = ResidualBlock(2, 30)
        self.block2 = ResidualBlock(2, 30)
        self.out = keras.layers.Dense(output_dim)

    def call(self, inputs):
        """Apply hidden1, then block1 four times in a loop, then block2 and the output layer."""
        z = self.hidden1(inputs)
        n_repeats = 1 + 3
        for _ in range(n_repeats):
            z = self.block1(z)
        return self.out(self.block2(z))
    
    # we would need to implement get_config for saving and loading the weights of a trained version of this model

In [None]:
# custom loss based on model internals

class ReconstructingRegressor(keras.Model):
    """Regressor with an auxiliary loss for reconstructing its own inputs."""

    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        hidden_layers = []
        for _ in range(5):
            hidden_layers.append(
                keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal"))
        self.hidden = hidden_layers
        self.out = keras.layers.Dense(output_dim)

    def build(self, batch_input_shape):
        """Create the reconstruction layer, sized to the last input dimension."""
        self.reconstruct = keras.layers.Dense(batch_input_shape[-1])
        super().build(batch_input_shape)

    def call(self, inputs):
        """Return the prediction and register the scaled reconstruction loss."""
        z = inputs
        for layer in self.hidden:
            z = layer(z)
        squared_diff = tf.square(self.reconstruct(z) - inputs)
        # reduce_mean averages over every element of the tensor
        recon_loss = tf.reduce_mean(squared_diff)
        # a custom loss computed inside the model is registered with add_loss
        # (a custom metric would be registered the same way with add_metric)
        self.add_loss(0.05 * recon_loss)
        return self.out(z)

def f(w1,w2):
    """Evaluate 3*w1**2 + 2*w1*w2."""
    quadratic_term = 3 * w1 ** 2
    cross_term = 2 * w1 * w2
    return quadratic_term + cross_term

In [None]:
def f(w1, w2):
    """Evaluate 3*w1**2 + 2*w1*w2."""
    quadratic_term = 3 * w1 ** 2
    cross_term = 2 * w1 * w2
    return quadratic_term + cross_term

w1, w2 = tf.Variable(5.), tf.Variable(3.) # making our variables
with tf.GradientTape() as tape: # records a tape of every computation
    z = f(w1,w2)

gradients = tape.gradient(z, [w1, w2]) # gets the gradients, can only call once since it deletes the tape

# can make the tape persistent to call gradient() more than once

with tf.GradientTape(persistent=True) as tape:
    z = f(w1,w2)

dz_dw1 = tape.gradient(z, w1)
dz_dw2 = tape.gradient(z, w2)

del tape # delete the persistent tape once you are done with it

c1, c2 = tf.constant(5.), tf.constant(3.) # constants this time, not variables
with tf.GradientTape() as tape:
    z = f(c1,c2)
gradients = tape.gradient(z, [c1, c2]) # [None, None] cannot use the tape with constants

# unless you tell it to watch them

with tf.GradientTape() as tape:
    tape.watch(c1)
    tape.watch(c2)
    z = f(c1,c2)

gradients = tape.gradient(z, [c1, c2]) # this works

# if you want to stop the gradients from backpropagating

def f(w1, w2):
    """Evaluate 3*w1**2 + 2*w1*w2 with the second term excluded from backpropagation.

    tf.stop_gradient passes its input through unchanged in the forward pass
    but blocks gradients from flowing back through it, so only the
    3*w1**2 term contributes to the gradients.
    """
    return 3 * w1 ** 2 + tf.stop_gradient(2*w1*w2)

with tf.GradientTape() as tape:
    z = f(w1,w2) # forward pass recorded on the tape; no gradient is requested in this cell

In [None]:
# getting the gradients of softplus you can run into issues
# you can avoid these by implementing a custom one

@tf.custom_gradient
def my_better_softplus(z):
    """Softplus with a hand-written gradient.

    Returns the pair (log(exp(z) + 1), gradient function), as expected by
    tf.custom_gradient. The gradient function scales the incoming gradient
    by 1 / (1 + 1 / exp(z)).
    """
    exp_z = tf.exp(z)
    def my_softplus_gradients(grad):
        denominator = 1 + 1 / exp_z
        return grad / denominator
    softplus = tf.math.log(exp_z + 1)
    return softplus, my_softplus_gradients

In [None]:
# you can also write a custom training method if fit() doesn't do what you want

# both layers use the same l2 regularizer
l2_reg = keras.regularizers.l2(0.05)
model = keras.models.Sequential()
model.add(keras.layers.Dense(30,
                             activation="elu",
                             kernel_initializer="he_normal",
                             kernel_regularizer=l2_reg))
model.add(keras.layers.Dense(1, kernel_regularizer=l2_reg))

# sampling function

def random_batch(x, y, batch_size=32):
    """Return batch_size randomly chosen rows of x with the matching rows of y.

    Indices are drawn independently, so the same row can appear more than once.
    """
    n_samples = len(x)
    picks = np.random.randint(n_samples, size=batch_size)
    x_batch = x[picks]
    y_batch = y[picks]
    return x_batch, y_batch

def print_status_bar(iteration, total, loss, metrics=None):
    """Print a single-line progress report that is overwritten in place.

    The line reads "<iteration>/<total> - <name>: <value> - ..." for the loss
    followed by each metric. It starts with a carriage return and ends with
    a newline only once iteration has reached total.
    """
    tracked = [loss] + (metrics or [])
    parts = []
    for m in tracked:
        parts.append("{}: {:.4f}".format(m.name, m.result()))
    summary = " - ".join(parts)
    if iteration < total:
        end = ""
    else:
        end = "\n"
    print("\r{}/{} - ".format(iteration, total) + summary, end=end)

# hyperparameters and objects for the custom training loop
# NOTE(review): x_train is not defined in any cell visible here -- confirm it exists before running
n_epochs = 5
batch_size = 32
n_steps = len(x_train) // batch_size # number of full batches per epoch
# `learning_rate` is the supported argument name; `lr` is a deprecated alias
optimizer = keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = keras.losses.mean_squared_error
mean_loss = keras.metrics.Mean() # running mean of the loss over an epoch
metrics = [keras.metrics.MeanAbsoluteError()]

# building our custom training loop

for epoch in range(1, n_epochs + 1):
    print("Epoch {}/{}".format(epoch, n_epochs))
    for step in range(1, n_steps+1):
        x_batch, y_batch = random_batch(x_train_scaled, y_train, batch_size=batch_size)
        with tf.GradientTape() as tape:
            y_pred = model(x_batch, training=True)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            # total loss = main loss plus the losses the model itself registers (e.g. regularizers)
            loss = tf.add_n([main_loss]+model.losses)
        # get the gradients and use them for one descent step
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        for variable in model.variables: # this is if you apply constraints to the model
            if variable.constraint is not None:
                variable.assign(variable.constraint(variable))
        # update the running mean loss and the metrics with this batch
        mean_loss(loss)
        for metric in metrics:
            metric(y_batch, y_pred)
        print_status_bar(step*batch_size, len(y_train), mean_loss, metrics)
    # final status line for the epoch, then reset the streaming state for the next one
    print_status_bar(len(y_train), len(y_train), mean_loss, metrics)
    for metric in [mean_loss] + metrics:
        metric.reset_states()

In [18]:
# all these do the same

def cube(x):
    """Return x raised to the third power."""
    return pow(x, 3)

cube(tf.constant(2.0)) # the plain python function called with a tensor

tf_cube=tf.function(cube) # wrap the python function explicitly with tf.function

tf_cube(2) # call the wrapped function with a python int

@tf.function
def tf_cube(x):
    """Cube x; the decorator wraps this python function with tf.function."""
    cubed = x ** 3
    return cubed

# the original python function is still reachable through the python_function attribute
tf_cube.python_function(2)

8