In [1]:
import tensorflow as tf
from tensorflow import keras
import numpy as np

In [3]:
# A tensor
t = tf.constant([[1, 2, 3], [4, 5, 6]])
t

<tf.Tensor: id=1, shape=(2, 3), dtype=int32, numpy=
array([[1, 2, 3],
       [4, 5, 6]])>

In [4]:
t.shape, t.dtype   # shape and data type

(TensorShape([2, 3]), tf.int32)

In [5]:
t[:, 1:]   # indexing

<tf.Tensor: id=5, shape=(2, 2), dtype=int32, numpy=
array([[2, 3],
       [5, 6]])>

In [6]:
t+10, tf.square(t), t@tf.transpose(t)   #operations

(<tf.Tensor: id=7, shape=(2, 3), dtype=int32, numpy=
 array([[11, 12, 13],
        [14, 15, 16]])>, <tf.Tensor: id=8, shape=(2, 3), dtype=int32, numpy=
 array([[ 1,  4,  9],
        [16, 25, 36]])>, <tf.Tensor: id=11, shape=(2, 2), dtype=int32, numpy=
 array([[14, 32],
        [32, 77]])>)

In [8]:
tf.cast(t, tf.float32)   # type conversion

<tf.Tensor: id=12, shape=(2, 3), dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [9]:
v = tf.Variable([[1, 2, 3], [4, 5, 6]])   # variable
v

<tf.Variable 'Variable:0' shape=(2, 3) dtype=int32, numpy=
array([[1, 2, 3],
       [4, 5, 6]])>

In [10]:
# Ways to modify variable
v.assign(2*v)
v[0, 1].assign(42)
v[:, 2].assign([0, 1])
v.scatter_nd_update(indices=[[0,0], [1, 2]], updates=[100, 200])

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=int32, numpy=
array([[100,  42,   0],
       [  8,  10, 200]])>

In [11]:
# Custom Algorithms

In [14]:
# Custom Loss Functions
#(Huber loss with threshold param)
def create_huber(threshold=1):
    def huber_fn(y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold
        squared_loss = tf.square(error) / 2
        linear_loss = threshold*tf.abs(error) - threshold**2 / 2

        return tf.where(is_small_error, squared_loss, linear_loss)
    return huber_fn

#Compile in a model
#model.compile(loss=create_huber(2), optimizer='nadam')

In [None]:
# Loading a model with custom loss
model = keras.models.load_model('model.h5', 
                               custom_objects={'huber_fn': create_huber(2)})   # provides dictionary of custom objects

In [None]:
#(using function means having to specify threshold when loading model,
#alternatively to save the threshold to the model too, subclass keras.losses.Loss)

In [15]:
class HuberLoss(keras.losses.Loss):
    def __init__(self, threshold=1, **kwargs):
        self.threshold = threshold
        super().__init__(**kwargs)
    
    def call(self, y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold
        squared_loss = tf.square(error) / 2
        linear_loss = threshold*tf.abs(error) - threshold**2 / 2
        
        return tf.where(is_small_error, squared_loss, linear_loss)
    
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, 'threshold': self.threshold}

In [16]:
#model.compile(loss=HuberLoss(2), optimizer='nadam')   # compile with new class
#model = keras.models.load_model('model.h5',
#                               custom_objects={'HuberLoss': HuberLoss})   # load model

In [17]:
# Custom Activation Function
#(equivalent of keras softplus)
def my_softplus(z):
    return tf.math.log(tf.exp(z) + 1)

In [18]:
# Custom Initializer
#(equivalent of keras glorot_normal())
def my_glorot_initializer(shape, dtype=tf.float32):
    stddev = tf.sqrt(2/(shape[0], shape[1]))
    return tf.random.normal(shape, stddev=stddev, dtype=dtype)

In [19]:
# Custom Regularizer
#(equivalent to keras l1)
def my_l1_regularizer(weights):
    return tf.reduce_sum(tf.abs(0.01*weights))

In [20]:
# Custom Constraints
#(equivalent to keras nonneg())
def my_pos_weights(weights):
    return tf.where(weights<0, tf.zeros_like(weights), weights)

In [None]:
layer = Dense(30, activation=my_softplus,
             kernel_initializer=my_glorot_initializer,
             kernel_regularizer=my_l1_regularizer,
             kernel_constraint=my_pos_weights)

In [21]:
#(for subclassing losses, layers, activations, the method is cal(),
#for regularizers, initializers, and contraints its __call__())

In [23]:
# Custom Metrics
#(huber loss, calculated as a streaming metric throught the batches)
class HuberMetric(keras.metrics.Metric):
    def __init__(self, threshold=1, **kwargs):
        super().__init__(**kwargs)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        self.total = self.add_weight('total', initializer='zeros')
        self.count = self.add_weight('count', initializer='zeros')
    
    def update_state(self, y_true, y_pred, sample_weight=None):
        metric = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(metric))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))
    
    def result(self):
        return self.total / self.count
    
    def get_config(self):
        base_config = super().get_config()
        return{**base_config, 'threshold': self.threshold}

In [24]:
# Custom Layers

In [25]:
# Weightless Layers
#(transformation layers like Flatten or ReLU)
#(exponential activation)
exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x))

In [26]:
# Layers with Weights
#(basic dense layer)
class MyDense(keras.layers.Layer):
    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.activation = keras.activations.get(activation)
        
    def build(self, batch_input_shape):
        self.kernel = self.add_weight(name='kernel',
                                      shape=[batch_input_shape[-1], self.units],
                                      initializer='glorot_normal')
        self.bias = self.add_weight(name='bias',
                                   shape=[self.units],
                                   initializer='zeros')
        super().build(batch_input_shape)
    
    def call(self, X):
        return self.activation(X @ self.kernel + self.bias)
    
    def compute_output_shape(self, bath_input_shape):
        return tf.TensorShape(batch_input_shape.as_list()[:-1] + [self.units])
    
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, 'units': self.units,
               'activation': keras.activations.serialize(self.activation)}

In [27]:
# Layers with multiple inputs and outputs
#(for non sequential neural net)
class MyMultiLayer(keras.layers.Layer):
    def call(self, X):   # pass a tuple of inputs
        X1, X2 = X
        return [X1 + X2, X1 * X2, X1 / X2]
    
    def compute_output_shape(self, batch_input_shape):   # pass a tuple of input batch shape
        b1, b2 = batch_input_shape
        return [b1, b1, b1]

In [28]:
# Layers with different behaviours in training and testing
#(for batchnorm and dropout)
#(adds Gaussian noise during training)
class MyGaussianNoise(keras.layers.Layer):
    def __init__(self, stddev, **kwargs):
        super().__init__(**kwargs)
        self.stddev = stddev
    
    def call(self, X, training=None):
        if training:   # specify behaviour during training
            noise = tf.random.normal(tf.shape(X), stddev=self.stddev)
        else:
            return X
    
    def ompute_output_shape(self, batch_input_shape):
        return batch_input_shape

In [4]:
# Residual block layer
class ResidualBlock(keras.layers.Layer):
    def __init__(self, n_layers, n_neurons, **kwargs):
        super().__init__(**kwargs)
        self.n_layers = n_layers
        self.n_neurons = n_neurons
        # store n_layers dense layers in hidden attribute
        self.hidden = [keras.layers.Dense(n_neurons, activation='relu',
                                         kernel_initializer='he_normal')
                      for _ in range(n_layers)]
    
    def call(self, inputs):
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        return inputs + Z
    
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, 'n_layers': self.n_layers,
                               'n_neurons': slef.n_neurons}

In [13]:
# Custom Models
from tensorflow.keras.utils import plot_model

In [11]:
#(example custom model)
class ResidualRegressor(keras.Model):
    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.hidden1 = keras.layers.Dense(30, activation='elu',
                                         kernel_initializer='he_normal')
        self.block1 = ResidualBlock(2, 30)
        self.block2 = ResidualBlock(2, 30)
        self.out = keras.layers.Dense(output_dim)
        
    def call(self, inputs):
        Z = self.hidden1(inputs)
        for _ in range(1+3):
            Z = self.block1(Z)
        Z = self.block2(Z)
        return self.out(Z)
    
    def get_config(self):
        base_config = super().get_config()
        return {**base_config, 'output_dim': self.output_dim}

In [18]:
model = ResidualRegressor(1)
model.compile(loss='mse', optimizer='nadam')

In [19]:
# Losses and metrics of model internals
#(weights and activations as opposed to predictions)

In [21]:
# Model with reconstruction loss as an auxiliary output
class ReconstructingRegressor(keras.Model):
    def __init__(self, output_sim, **kwargs):
        super().__init__(**kwargs)
        self.hidden = [keras.layers.Dense(30, activation='selu',
                                         kernel_initializer='lecun_normal')
                      for _ in range(5)]
        self.out = keras.layers.Dense(output_dim)
        self.reconstruction_mean = keras.metrics.Mean(name='reconstruction_error')   # error object to show during training
        
    def build(self, batch_input_shape):
        n_inputs = batch_input_shape[-1]
        self.reconstruct = keras.layers.Dense(n_inputs)   # creates an extra dense layer to reconstruct the inputs
        super().build(batch_input_shape)
    
    def call(self, inputs):
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        reconstruction = self.reconstruction(Z)
        recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs))
        self.add_loss(0.05*recon_loss)
        if training:   # shows custom loss
            result = self.reconstruction_mean(recon_loss)
            self.add_metrics(result)
        return self.out(Z)

In [22]:
# Autodif

In [26]:
# autodiff in tensorflow
#(f is example function)
def f(w1, w2):
    return 3*w1**2 + 2*w1*w2

w1, w2 = tf.Variable(5.), tf.Variable(3.)
with tf.GradientTape() as tape:
    z = f(w1, w2)
    
gradients = tape.gradient(z, [w1, w2])   #calling gradient will erase the tape
gradients

[<tf.Tensor: id=77, shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: id=69, shape=(), dtype=float32, numpy=10.0>]

In [27]:
# Persistant taping to call gradient many times
with tf.GradientTape(persistent=True) as tape:
    z = f(w1, w2)

dz_dw1 = tape.gradient(z, w1)
dz_dw2 = tape.gradient(z, w2)
del tape   #delete after use to save resource
dz_dw1, dz_dw2

(<tf.Tensor: id=101, shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: id=106, shape=(), dtype=float32, numpy=10.0>)

In [30]:
# Forces the tape to compute non-variable
#(useful for computing model internals)
c1, c2 = tf.constant(5.), tf.constant(3.)   # not variables
with tf.GradientTape() as tape:
    tape.watch(c1)   # force to watch the c1 tensor
    tape.watch(c2)
    z = f(c1, c2)

gradients = tape.gradient(z, [c1, c2])
gradients

[<tf.Tensor: id=161, shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: id=153, shape=(), dtype=float32, numpy=10.0>]

In [31]:
# Stop backprop for some parts of a model
def f(w1, w2):
    return 3*w1**2 + tf.stop_gradient(2*w1*w2)   #only allows forward pass

with tf.GradientTape() as tape:
    z = f(w1, w2)
gradients = tape.gradient(z, [w1, w2])
gradients

[<tf.Tensor: id=181, shape=(), dtype=float32, numpy=30.0>, None]

In [32]:
# Using specified derivative function
#(for functions that can return NaN like softplus)
@tf.custom_gradient   #tells tf to use custom function
def my_softplus(z):
    exp = tf.exp(z)
    def my_softplus_gradients(grad):
        return grad / (1 + 1/exp)
    return tf.math.log(exp + 1), my_softplus_gradients

In [33]:
# Custom Training Loop

In [38]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)

In [36]:
#(example model set up)
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.regularizers import l2

# model architecture
l2_reg = l2(0.05)
model = Sequential([
    Dense(30, activation='elu', kernel_initializer='he_normal',
         kernel_regularizer=l2_reg),
    Dense(1, kernel_regularizer=l2_reg)
])

# function to randomly sample training set
def random_batch(X, y, batch_size=32):
    idx = np.random.randint(len(X), size=batch_size)
    return X[idx], y[idx]

# Training status bar
def print_status_bar(iter, total, loss, metrics=None):
    metrics = ' - '.join(['{}: {:.4f}'.format(m.name, m.result())
                    for m in [loss] + (metrics or [])])
    end = '' if iter < total else '\n'
    print('\r{}/{} - '.format(iter, total) + metrics, end=end)

In [39]:
# hyperparam set up
n_epochs = 5
batch_size = 32
n_steps = len(X_train) // batch_size
optimizer = keras.optimizers.Nadam(lr=0.01)
loss_fn = keras.losses.mean_squared_error
mean_loss = keras.metrics.Mean()
metrics = [keras.metrics.MeanAbsoluteError()]

In [42]:
# Custom loop
for epoch in range(1, n_epochs+1):  #epoch loop
    print('Epoch {}/{}'.format(epoch, n_epochs))
    for step in range(1, n_steps+1):   #batch loop
        X_batch, y_batch = random_batch(X_train, y_train)
        with tf.GradientTape() as tape:
            y_pred = model(X_batch, training=True)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        # can perform gradients transformation at this step
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        for variable in model.variables:   # perform weight constraint
            if variable.constraint is not None:
                variable.assign(variable.constriant(variable))
        mean_loss(loss)
        for metric in metrics:
            metric(y_batch, y_pred)
        print_status_bar(step * batch_size, len(y_train), mean_loss, metrics)
    print_status_bar(len(y_train), len(y_train), mean_loss, metrics)
    for metric in [mean_loss] + metrics:
        metric.reset_states()

Epoch 1/5


To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

11610/11610 - mean: 4966.7681 - mean_absolute_error: 12.43142
Epoch 2/5
11610/11610 - mean: 75.5766 - mean_absolute_error: 2.8063
Epoch 3/5
11610/11610 - mean: 1528.1996 - mean_absolute_error: 7.9507
Epoch 4/5
11610/11610 - mean: 4.4117 - mean_absolute_error: 1.1636
Epoch 5/5
11610/11610 - mean: 426.5146 - mean_absolute_error: 6.2395


In [43]:
# TensorFlow Functions and Graphs

In [2]:
# Convert a function into tensorflow(tm) function
#(tensorflow will optimizes the computational stuff)
def cube(x):
    return x**3

tf_cube = tf.function(cube)

In [3]:
#alternatively and more common way
@tf.function
def tf_cube(x):
    return x**3

In [4]:
#(computation graphs are reused for tensors with same shape)
#(new graph is generated for each Python value passed, only use Python value for arguements with few unique values)

In [5]:
# Autograph and Tracing

In [7]:
tf.autograph.to_code(tf_cube.python_function)   #the source code read by tensorflow

"def tf__tf_cube(x):\n  do_return = False\n  retval_ = ag__.UndefinedReturnValue()\n  with ag__.FunctionScope('tf_cube', 'tf_cube_scope', ag__.ConversionOptions(recursive=True, user_requested=True, optional_features=(), internal_convert_user_code=True)) as tf_cube_scope:\n    do_return = True\n    retval_ = tf_cube_scope.mark_return_value(x ** 3)\n  do_return,\n  return ag__.retval(retval_)\n"

In [None]:
# Side notes
#(graphs will only have TensorFlow constructs/does not include calling library)
#(it is preferable to create variables with something like build() method and update with assign instead of using '=')
#(only for loop over a tensor/dataset will be graphed)
#(use vectorized calculations over loops)