# Modelos Personalizados com TensorFlow

In [1]:
## Imports
import tensorflow as tf
import numpy as np
from tensorflow import keras
K = keras.backend

In [1]:
## Dataset
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Load the California housing data and carve out train / validation / test
# splits. Targets are reshaped to a single column (-1, 1).
housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)
# Second split: the "full" training set is divided again into train / validation.
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

# The scaler is fitted on the training split only and then re-applied
# (transform, not fit_transform) to the validation and test splits.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_test_scaled = scaler.transform(X_test)

## Usando o TensorFlow como NumPy

### Tensores e Operações

#### Tensores

In [5]:
tf.constant([[1., 2., 3.,], [4., 5., 6.]]) # Matriz

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)>

In [3]:
tf.constant(42) # Escalar

<tf.Tensor: shape=(), dtype=int32, numpy=42>

#### Indexação

In [6]:
t = tf.constant([[1., 2., 3.,], [4., 5., 6.]])
t[:, 1:]

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[2., 3.],
       [5., 6.]], dtype=float32)>

#### Operações

In [7]:
t + 10

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[11., 12., 13.],
       [14., 15., 16.]], dtype=float32)>

In [8]:
tf.square(t)

<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[ 1.,  4.,  9.],
       [16., 25., 36.]], dtype=float32)>

In [9]:
t @ tf.transpose(t) # multiplicação de matriz

<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[14., 32.],
       [32., 77.]], dtype=float32)>

#### Keras Backend

In [10]:
t = K.constant([[1, 1, 1], [2, 2, 2]])
K.square(K.transpose(t)) + 10

<tf.Tensor: shape=(3, 2), dtype=float32, numpy=
array([[11., 14.],
       [11., 14.],
       [11., 14.]], dtype=float32)>

### Tensores e NumPy

In [12]:
a = np.array([2., 3., 4.])
tf.constant(a)

<tf.Tensor: shape=(3,), dtype=float64, numpy=array([2., 3., 4.])>

In [13]:
t.numpy()

array([[1., 1., 1.],
       [2., 2., 2.]], dtype=float32)

In [14]:
np.array(t)

array([[1., 1., 1.],
       [2., 2., 2.]], dtype=float32)

In [15]:
tf.square(a)

<tf.Tensor: shape=(3,), dtype=float64, numpy=array([ 4.,  9., 16.])>

In [16]:
np.square(t)

array([[1., 1., 1.],
       [4., 4., 4.]], dtype=float32)

### Conversões de Tipos
- Os tipos têm que ser compatíveis para que as operações possam ser feitas; o TensorFlow não faz conversão automática de tipos.

In [17]:
try:
    tf.constant(2.0) + tf.constant(40., dtype=tf.float64)
except tf.errors.InvalidArgumentError as ex:
    print(ex)

cannot compute AddV2 as input #1(zero-based) was expected to be a float tensor but is a double tensor [Op:AddV2]


In [18]:
t2 = tf.constant(40., dtype=tf.float64)
tf.constant(2.0) + tf.cast(t2, tf.float32)

<tf.Tensor: shape=(), dtype=float32, numpy=42.0>

### Variáveis

In [20]:
v = tf.Variable([[1., 2., 3.], [4., 5., 6.]])
v.assign(2 * v)

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2.,  4.,  6.],
       [ 8., 10., 12.]], dtype=float32)>

In [22]:
v[0, 1].assign(42)

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  6.],
       [ 8., 10., 12.]], dtype=float32)>

In [23]:
v[:, 2].assign([0., 1.])

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[ 2., 42.,  0.],
       [ 8., 10.,  1.]], dtype=float32)>

In [24]:
v.scatter_nd_update(indices=[[0, 0], [1, 2]],
                    updates=[100., 200.])

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[100.,  42.,   0.],
       [  8.,  10., 200.]], dtype=float32)>

In [26]:
sparse_delta = tf.IndexedSlices(values=[[1., 2., 3.], [4., 5., 6.]],
                                indices=[1, 0])
v.scatter_update(sparse_delta)

<tf.Variable 'UnreadVariable' shape=(2, 3) dtype=float32, numpy=
array([[4., 5., 6.],
       [1., 2., 3.]], dtype=float32)>

### Outras Estruturas de Dados
- Há outros tipos de dados: conjuntos, listas de tensores, strings, etc. Consultar o livro e o GitHub se algum dia eu precisar usar.

## Personalizando Modelos

### Função Perda Personalizada

In [6]:
## Huber Loss
def huber_fn(y_true, y_pred):
    """Element-wise Huber loss with a fixed threshold of 1.

    Residuals whose absolute value is below 1 are penalised quadratically
    (``error**2 / 2``); all others are penalised linearly (``|error| - 0.5``).

    Args:
        y_true: Target values.
        y_pred: Predicted values; must be subtractable from ``y_true``.

    Returns:
        A tensor holding one loss value per element of ``y_true - y_pred``.
    """
    # The residual must be a difference. Writing `y_true, y_pred` here would
    # build a tuple, and tf.abs / tf.square would then run on the stacked
    # pair of tensors instead of on the prediction error.
    error = y_true - y_pred
    abs_error = tf.abs(error)
    is_small_error = abs_error < 1
    squared_loss = tf.square(error) / 2
    linear_loss = abs_error - 0.5
    return tf.where(is_small_error, squared_loss, linear_loss)

# Per-sample input shape: everything after the leading (sample) axis of X_train.
input_shape = X_train.shape[1:]

# Two-layer regressor: one hidden SELU layer (LeCun-normal initialisation)
# followed by a single linear output unit.
model = keras.models.Sequential([
    keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal",
                       input_shape=input_shape),
    keras.layers.Dense(1),
])

# Train with the custom huber_fn as the loss; MAE is tracked as a metric.
model.compile(loss=huber_fn, optimizer="nadam", metrics=["mae"])
model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x1c6f3d38b20>

### Salvando e Carregando os Modelos
- É necessário fornecer um dicionário mapeando os nomes aos objetos

In [6]:
## Novo Modelo (Huber Personalizado)
def create_huber(threshold=1.0):
    """Return a Huber loss function bound to ``threshold``.

    The returned callable takes ``(y_true, y_pred)`` and yields one loss value
    per element: ``error**2 / 2`` where ``|error| < threshold`` and
    ``threshold * |error| - threshold**2 / 2`` elsewhere.

    Args:
        threshold: Absolute-error value at which the loss switches from the
            quadratic to the linear branch.

    Returns:
        A function named ``huber_fn`` closing over ``threshold``.
    """
    def huber_fn(y_true, y_pred):
        residual = y_true - y_pred
        magnitude = tf.abs(residual)
        quadratic_branch = tf.square(residual) / 2
        linear_branch = threshold * magnitude - threshold**2 / 2
        use_quadratic = magnitude < threshold
        return tf.where(use_quadratic, quadratic_branch, linear_branch)
    return huber_fn

model.compile(loss=create_huber(2.0), optimizer="nadam", metrics=["mae"])

model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x28729fcc970>

In [6]:
## Salvando e Carregando o Modelo
model.save("my_model_with_a_custom_loss.h5")
model = keras.models.load_model("my_model_with_a_custom_loss.h5",
                                custom_objects={"huber_fn": create_huber(2.0)})

model.fit(X_train_scaled, y_train, epochs=2,
          validation_data=(X_valid_scaled, y_valid))

Epoch 1/2
Epoch 2/2


<tensorflow.python.keras.callbacks.History at 0x1d8a48e2880>

In [7]:
## Classe
class HuberLoss(keras.losses.Loss):
    """Huber loss as a Keras ``Loss`` subclass with a configurable threshold.

    Because ``threshold`` is stored on the instance and reported by
    ``get_config``, it is part of the loss configuration rather than being
    captured in a closure.
    """
    def __init__(self, threshold=1.0, **kwargs):
        """Store ``threshold`` and forward remaining keyword arguments to ``Loss``."""
        self.threshold = threshold
        super().__init__(**kwargs)
    def call(self, y_true, y_pred):
        """Compute the per-element Huber loss.

        Args:
            y_true: Target values.
            y_pred: Predicted values.

        Returns:
            ``error**2 / 2`` where ``|error| < threshold`` and
            ``threshold * |error| - threshold**2 / 2`` elsewhere.
        """
        # The parameter is named y_pred so that the body no longer refers to
        # an undefined (or accidental module-level) `y_pred`.
        error = y_true - y_pred
        abs_error = tf.abs(error)
        is_small_error = abs_error < self.threshold
        squared_loss = tf.square(error) / 2
        linear_loss = self.threshold * abs_error - self.threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    def get_config(self):
        """Return the base ``Loss`` config extended with ``threshold``."""
        base_config = super().get_config()
        return {**base_config, 'threshold': self.threshold}
    
model.compile(loss=HuberLoss(2.), optimizer="nadam", metrics=["mae"])
model.loss.threshold

2.0

### Funções de Ativação, Inicializadores e Regularizadores
- Ao usar classes, implemente `call()` para perdas, camadas (o que inclui as funções de ativação) e modelos, e `__call__()` para regularizadores, inicializadores e restrições.

In [13]:
## Função de ativação: equivalente à keras.activations.softplus()
def my_softplus(z):
    """Softplus activation: ``log(exp(z) + 1)``, applied element-wise."""
    exp_z = tf.exp(z)
    return tf.math.log(exp_z + 1)

In [9]:
## Inicializador dos pesos: equivalente à keras.initializers.glorot_normal()
def my_glorot_initializer(shape, dtype=tf.float32):
    """Draw initial weights from a normal distribution with Glorot scaling.

    The standard deviation is ``sqrt(2 / (shape[0] + shape[1]))``.

    Args:
        shape: Shape of the weight tensor; its first two entries are used as
            the two fan sizes.
        dtype: Data type of the returned tensor.

    Returns:
        A tensor of the requested shape sampled with ``tf.random.normal``.
    """
    # Parenthesise the sum: without it the expression parses as
    # (2 / shape[0]) + shape[1], which yields a far too large stddev.
    stddev = tf.sqrt(2. / (shape[0] + shape[1]))
    return tf.random.normal(shape, stddev=stddev, dtype=dtype)

In [10]:
## Regularizador: equivalente à keras.regularizers.l1(0.01)
def my_l1_regularizer(weights):
    """L1 penalty with factor 0.01: sum of ``|0.01 * w|`` over all weights."""
    scaled_weights = 0.01 * weights
    return tf.reduce_sum(tf.abs(scaled_weights))

In [11]:
## Restrições dos pesos: equivalente à keras.constraints.nonneg()
def my_positive_weights(weights):
    """Weight constraint that replaces every negative entry with zero.

    ``tf.nn.relu(weights)`` could be used instead.
    """
    is_negative = weights < 0.
    zeros = tf.zeros_like(weights)
    return tf.where(is_negative, zeros, weights)

In [14]:
## Tudo junto
layer = keras.layers.Dense(30, activation=my_softplus, kernel_initializer=my_glorot_initializer, 
                           kernel_regularizer=my_l1_regularizer, kernel_constraint=my_positive_weights)

### Métricas Personalizadas
- Métricas são facilmente interpretáveis e não necessariamente diferenciáveis. Já a função perda não necessariamente é facilmente interpretável, mas deve ser diferenciável, pois é a partir dela que calculamos o gradiente.
- Definir uma métrica é igual a definir uma função perda, como ilustra o código abaixo.
- A métrica é calculada para cada lote e, após o final da época, se calcula a média.

In [7]:
model.compile(loss="mse", optimizer="nadam", metrics=[create_huber(2.0)])

#### Streaming Metrics
- Algumas vezes não queremos simplesmente fazer a média, como no caso da precisão, mas gradualmente atualizar o seu valor, lote após lote.

##### Exemplo: Precisão

In [13]:
precision = keras.metrics.Precision()
precision([1, 1, 1, 1, 0], [1, 1, 1, 1, 1])

<tf.Tensor: shape=(), dtype=float32, numpy=0.8>

In [14]:
precision.result()

<tf.Tensor: shape=(), dtype=float32, numpy=0.8>

In [15]:
precision.variables

[<tf.Variable 'true_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>,
 <tf.Variable 'false_positives:0' shape=(1,) dtype=float32, numpy=array([1.], dtype=float32)>]

In [16]:
precision([0, 0, 0], [1, 1, 1])

<tf.Tensor: shape=(), dtype=float32, numpy=0.5>

In [17]:
precision.variables

[<tf.Variable 'true_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>,
 <tf.Variable 'false_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>]

In [18]:
precision.reset_states()
precision.variables

[<tf.Variable 'true_positives:0' shape=(1,) dtype=float32, numpy=array([0.], dtype=float32)>,
 <tf.Variable 'false_positives:0' shape=(1,) dtype=float32, numpy=array([0.], dtype=float32)>]

##### Criando a própria métrica de streaming

In [19]:
class HuberMetric(keras.metrics.Metric):
    """Streaming metric: running mean of the Huber loss over all elements seen.

    Two scalar weights accumulate across batches: ``total`` (sum of the
    per-element Huber values) and ``count`` (number of elements).
    """
    def __init__(self, threshold=1.0, **kwargs):
        """Create the accumulators and the Huber function for ``threshold``."""
        super().__init__(**kwargs)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        # Running sum of Huber values.
        self.total = self.add_weight(name='total', initializer='zeros')
        # Running number of elements.
        self.count = self.add_weight(name='count', initializer='zeros')
    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate one batch. ``sample_weight`` is accepted but not used."""
        batch_values = self.huber_fn(y_true, y_pred)
        batch_size = tf.cast(tf.size(y_true), tf.float32)
        self.total.assign_add(tf.reduce_sum(batch_values))
        self.count.assign_add(batch_size)
    def result(self):
        """Return the accumulated sum divided by the accumulated count."""
        return self.total/self.count
    def get_config(self):
        """Return the base ``Metric`` config extended with ``threshold``."""
        config = super().get_config()
        return dict(config, threshold=self.threshold)

### Camadas Personalizadas

#### Sem Parâmetros: Lambda

In [21]:
exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x))

model = keras.models.Sequential([
    keras.layers.Dense(30, activation="relu", input_shape=input_shape),
    keras.layers.Dense(1),
    exponential_layer
])
model.compile(loss="mse", optimizer="sgd")

#### Com Parâmetros: Subclasse

In [2]:
class MyDense(keras.layers.Layer):
    """Simplified fully connected layer: ``activation(X @ kernel + bias)``."""
    def __init__(self, units, activation=None, **kwargs):
        """Record the number of units and resolve the activation via Keras."""
        super().__init__(**kwargs)
        self.units = units
        self.activation = keras.activations.get(activation)
    def build(self, batch_input_shape):
        """Create the kernel and bias once the input shape is known."""
        n_inputs = batch_input_shape[-1]
        self.kernel = self.add_weight(
            name='kernel',
            shape=[n_inputs, self.units],
            initializer='glorot_normal',
        )
        self.bias = self.add_weight(
            name='bias',
            shape=[self.units],
            initializer='zeros',
        )
        super().build(batch_input_shape)
    def call(self, X):
        """Apply the affine transform followed by the activation."""
        pre_activation = X @ self.kernel + self.bias
        return self.activation(pre_activation)
    def compute_output_shape(self, batch_input_shape):
        """Same as the input shape with the last dimension replaced by ``units``."""
        leading_dims = batch_input_shape.as_list()[:-1]
        return tf.TensorShape(leading_dims + [self.units])
    def get_config(self):
        """Return the base config plus ``units`` and the serialized activation."""
        config = super().get_config()
        return dict(
            config,
            units=self.units,
            activation=keras.activations.serialize(self.activation),
        )
    

In [7]:
model = keras.models.Sequential([
    MyDense(30, activation="relu", input_shape=input_shape),
    MyDense(1)
])

#### Múltiplas Entradas ou Saídas

In [8]:
class MyMultiLayer(keras.layers.Layer):
    """Layer taking a pair of inputs and returning their sum and their product."""
    def call(self, X):
        """Unpack the two inputs and return ``(X1 + X2, X1 * X2)``."""
        first, second = X  # the input arrives as a pair
        summed = first + second
        multiplied = first * second
        return summed, multiplied

    def compute_output_shape(self, batch_input_shape):
        """Return the two input shapes, in order, as a list."""
        shape_first, shape_second = batch_input_shape
        return [shape_first, shape_second]

#### Comportamento Diferente: Treinamento/Predição

In [11]:
class AddGaussianNoise(keras.layers.Layer):
    """Adds zero-mean Gaussian noise during training; identity otherwise."""
    def __init__(self, stddev, **kwargs):
        """Store the standard deviation of the noise."""
        super().__init__(**kwargs)
        self.stddev = stddev
    def call(self, X, training=None):
        """Return ``X`` plus noise when ``training`` is truthy, else ``X`` unchanged."""
        if training:
            return X + tf.random.normal(tf.shape(X), stddev=self.stddev)
        return X
    def compute_output_shape(self, batch_input_shape):
        """The output shape equals the input shape."""
        return batch_input_shape

### Modelos personalizados

In [18]:
class ResidualBlock(keras.layers.Layer):
    """Stack of ELU dense layers whose output is added back to the block input."""
    def __init__(self, n_layers, n_neurons, **kwargs):
        """Create ``n_layers`` dense layers of ``n_neurons`` units each."""
        super().__init__(**kwargs)
        dense_stack = []
        for _ in range(n_layers):
            dense_stack.append(
                keras.layers.Dense(n_neurons, activation='elu',
                                   kernel_initializer='he_normal'))
        self.hidden = dense_stack
    def call(self, inputs):
        """Run the inputs through every hidden layer and add the skip connection."""
        activations = inputs
        for dense in self.hidden:
            activations = dense(activations)
        return inputs + activations

In [19]:
class ResidualRegressor(keras.models.Model):
    """Regressor: dense layer, ``block1`` applied four times, ``block2``, output layer."""
    def __init__(self, output_dim, **kwargs):
        """Create the sub-layers; ``output_dim`` is the size of the output layer."""
        super().__init__(**kwargs)
        self.hidden1 = keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal")
        self.block1 = ResidualBlock(n_layers=2, n_neurons=30)
        self.block2 = ResidualBlock(n_layers=2, n_neurons=30)
        self.out = keras.layers.Dense(output_dim)
    def call(self, inputs):
        """Forward pass through the model."""
        features = self.hidden1(inputs)
        # The same block (shared weights) is applied four times in a row.
        n_repeats = 4
        for _ in range(n_repeats):
            features = self.block1(features)
        features = self.block2(features)
        return self.out(features)

In [24]:
## Treinando o Modelo
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

model = ResidualRegressor(1)
model.compile(loss="mse", optimizer="nadam")
history = model.fit(X_train_scaled, y_train, epochs=5)
score = model.evaluate(X_test_scaled, y_test)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [25]:
## Salvando e Carregando o Modelo
model.save("my_custom_model.ckpt")
model2 = keras.models.load_model("my_custom_model.ckpt")
score = model2.evaluate(X_test_scaled, y_test)
history = model2.fit(X_train_scaled, y_train, epochs=5)
score = model2.evaluate(X_test_scaled, y_test)



INFO:tensorflow:Assets written to: my_custom_model.ckpt\assets


INFO:tensorflow:Assets written to: my_custom_model.ckpt\assets


Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


### Função Perda e Métricas Baseadas nos Parâmetros do Modelo

In [26]:
class ReconstructingRegressor(keras.Model):
    """Regressor with an auxiliary reconstruction loss.

    On top of the main output, an extra dense layer tries to reconstruct the
    model inputs from the last hidden representation. The mean squared
    reconstruction error, scaled by 0.05, is registered through ``add_loss``.
    """
    def __init__(self, output_dim, **kwargs):
        """Create five SELU hidden layers, the output layer and the mean metric."""
        super().__init__(**kwargs)
        self.hidden = [keras.layers.Dense(30, activation="selu", kernel_initializer="lecun_normal")
                       for _ in range(5)]
        self.out = keras.layers.Dense(output_dim)
        # Running mean of the reconstruction error, reported as 'reconstruction_error'.
        self.reconstruction_mean = keras.metrics.Mean(name='reconstruction_error')
    def build(self, batch_input_shape):
        """Create the reconstruction layer, sized to the last input dimension."""
        n_inputs = batch_input_shape[-1]
        self.reconstruct = keras.layers.Dense(n_inputs)
        # NOTE(review): super().build() is not called here -- confirm this is intentional.
    def call(self, inputs, training=None):
        """Compute the main output and register the reconstruction loss.

        The metric is only updated and reported when ``training`` is truthy.
        """
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        reconstruction = self.reconstruct(Z)
        # Mean squared difference between the reconstruction and the inputs.
        recon_loss = tf.reduce_mean(tf.square(reconstruction-inputs))
        # Added with a 0.05 factor on top of whatever loss the model is compiled with.
        self.add_loss(0.05*recon_loss)
        if training:
            result = self.reconstruction_mean(recon_loss)
            self.add_metric(result)
        return self.out(Z)

In [27]:
## Treinando o Modelo
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

model = ReconstructingRegressor(1)
model.compile(loss="mse", optimizer="nadam")
history = model.fit(X_train_scaled, y_train, epochs=2)
y_pred = model.predict(X_test_scaled)

Epoch 1/2
Epoch 2/2


### Computando Gradientes com Autodiff

In [14]:
## Exemplo Básico
def f(w1, w2):
    """Evaluate ``3 * w1**2 + 2 * w1 * w2``."""
    quadratic_term = 3 * w1 ** 2
    cross_term = 2 * w1 * w2
    return quadratic_term + cross_term

w1, w2 = tf.Variable(5.), tf.Variable(3.)
with tf.GradientTape() as tape:
    z = f(w1, w2)

gradients = tape.gradient(z, [w1, w2])
print(gradients)

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>, <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]


In [15]:
## Tornando o Gradiente persistente
with tf.GradientTape(persistent=True) as tape:
    z = f(w1, w2)

dz_dw1 = tape.gradient(z, w1)
dz_dw2 = tape.gradient(z, w2) # works now!
print(dz_dw1, dz_dw2)
del tape

tf.Tensor(36.0, shape=(), dtype=float32) tf.Tensor(10.0, shape=(), dtype=float32)


In [16]:
## Caso seja uma constante, usar tape.watch()
c1, c2 = tf.constant(5.), tf.constant(3.)
with tf.GradientTape() as tape:
    tape.watch(c1)
    tape.watch(c2)
    z = f(c1, c2)

gradients = tape.gradient(z, [c1, c2])
gradients

[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
 <tf.Tensor: shape=(), dtype=float32, numpy=10.0>]

In [17]:
## Caso queira parar a retropropagação
def f(w1, w2):
    """Evaluate ``3 * w1**2 + 2 * w1 * w2`` with the cross term wrapped in ``tf.stop_gradient``."""
    quadratic_term = 3 * w1 ** 2
    frozen_cross_term = tf.stop_gradient(2 * w1 * w2)
    return quadratic_term + frozen_cross_term

with tf.GradientTape() as tape:
    z = f(w1, w2)

print(z) # mesmo resultado
print(tape.gradient(z, [w1, w2]))

tf.Tensor(105.0, shape=(), dtype=float32)
[<tf.Tensor: shape=(), dtype=float32, numpy=30.0>, None]


In [15]:
## Caso queira customizar um gradiente de uma função do TensorFlow
@tf.custom_gradient
def my_better_softplus(z):
    """Softplus with a hand-written gradient.

    The forward value is ``z`` itself where ``z > 30`` and ``log(exp(z) + 1)``
    elsewhere. The gradient function returns ``grad / (1 + 1 / exp(z))``.
    """
    exp = tf.exp(z)
    def softplus_grad(grad):
        return grad / (1 + 1 / exp)
    plain_softplus = tf.math.log(exp + 1.)
    value = tf.where(z > 30., z, plain_softplus)
    return value, softplus_grad

x = tf.Variable([1000.])
with tf.GradientTape() as tape:
    z = my_better_softplus(x)

z, tape.gradient(z, [x])

(<tf.Tensor: shape=(1,), dtype=float32, numpy=array([1000.], dtype=float32)>,
 [<tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.], dtype=float32)>])

### Laços de Treinamento Personalizados

In [4]:
## Modelo
# Reset the Keras session and seed NumPy and TensorFlow before building the model.
keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

# A single L2 regularizer object (factor 0.05) is passed to both layers.
l2_reg = keras.regularizers.l2(0.05)
# No input shape is given here; the model is first called inside the custom loop.
model = keras.models.Sequential([
    keras.layers.Dense(30, activation='elu', kernel_initializer='he_normal', kernel_regularizer=l2_reg),
    keras.layers.Dense(1, kernel_regularizer=l2_reg)
])

In [26]:
## Funções Úteis: Própria Barra de Progressão
def random_batch(X, y, batch_size=32):
    """Pick ``batch_size`` random rows, using the same indices for ``X`` and ``y``.

    Indices are drawn with ``np.random.randint`` from ``[0, len(X))``.

    Returns:
        The pair ``(X[idx], y[idx])``.
    """
    picked_rows = np.random.randint(len(X), size=batch_size)
    batch_inputs = X[picked_rows]
    batch_targets = y[picked_rows]
    return batch_inputs, batch_targets
    
def progress_bar(iteration, total, size=30):
    """Render a text progress bar such as `` 50/100 [=====>.....]``.

    Args:
        iteration: Current position (integer).
        total: Final position; also sets the width ``iteration`` is padded to.
        size: Number of characters between the square brackets.

    Returns:
        The formatted bar. The head character is ``>`` while
        ``iteration < total`` and ``=`` otherwise.
    """
    head = ">" if iteration < total else "="
    filled = (size - 1) * iteration // total
    remaining = size - filled - 1
    bar = "=" * filled + head + "." * remaining
    width = len(str(total))
    return "{0:-{width}d}/{1} [{2}]".format(iteration, total, bar, width=width)

def print_status_bar(iteration, total, loss, metrics=None, size=30):
    """Print the progress bar followed by the current loss and metric values.

    The line starts with a carriage return so that consecutive calls overwrite
    each other; a newline is emitted only once ``iteration`` reaches ``total``.

    Args:
        iteration: Current position.
        total: Final position.
        loss: Object exposing ``name`` and ``result()``, shown first.
        metrics: Optional list of further objects exposing ``name`` and ``result()``.
        size: Width of the progress bar, forwarded to ``progress_bar``.
    """
    tracked = [loss] + (metrics or [])
    summary = " - ".join(["{}: {:.4f}".format(m.name, m.result())
                          for m in tracked])
    end = "" if iteration < total else "\n"
    # Forward `size`; previously it was accepted but silently ignored.
    print("\r{} - {}".format(progress_bar(iteration, total, size), summary), end=end)

In [17]:
## Definições
# Settings for the custom training loop.
n_epochs = 5
batch_size = 32
# Steps per epoch: number of whole batches that fit in the training set.
n_steps = len(X_train) // batch_size
optimizer = keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = keras.losses.mean_squared_error
# Streaming mean of the loss, reset at the end of every epoch by the loop.
mean_loss = keras.metrics.Mean()
metrics = [keras.metrics.MeanAbsoluteError()]

In [28]:
## Laço Personalizado
# Custom training loop: n_epochs epochs of n_steps gradient steps each.
for epoch in range(1, n_epochs+1):
    print('Epoch {}/{}'.format(epoch, n_epochs))
    for step in range(1, n_steps+1):
        X_batch, y_batch = random_batch(X_train_scaled, y_train)
        # Record the forward pass so the loss can be differentiated.
        with tf.GradientTape() as tape:
            y_pred = model(X_batch)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            # Total loss = main loss + the losses collected in model.losses.
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        # Re-apply any variable constraint after the optimizer step.
        for variable in model.variables:
            if variable.constraint is not None:
                variable.assign(variable.constraint(variable))
        # Update the streaming loss and metrics, then redraw the status line.
        mean_loss(loss)
        for metric in metrics:
            metric(y_batch, y_pred)
        print_status_bar(step * batch_size, len(y_train), mean_loss, metrics)
    # Final call with iteration == total, which ends the status line.
    print_status_bar(len(y_train), len(y_train), mean_loss, metrics)
    # Start the next epoch with fresh accumulators.
    for metric in [mean_loss] + metrics:
        metric.reset_states()   

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


## Funções e Grafos no TensorFlow
- O Keras converte automaticamente as funções Python personalizadas (métrica, perda, etc.) em funções do TensorFlow. Deve-se tomar cuidado para passar sempre tensores como argumentos, pois cada valor numérico Python diferente gera um novo grafo.

In [2]:
## Primeira opção
def cube(x):
    """Return ``x`` raised to the third power."""
    exponent = 3
    return pow(x, exponent)

tf_cube = tf.function(cube)
tf_cube

<tensorflow.python.eager.def_function.Function at 0x17e7dd1ba60>

In [5]:
## Segunda opção
@tf.function
def tf_cube(x):
    """Return ``x`` raised to the third power, wrapped as a ``tf.function``."""
    exponent = 3
    return pow(x, exponent)

## Para acessar a função Python:
tf_cube.python_function(2)

8

### Regras a serem respeitadas:
- Evitar o uso de bibliotecas externas, usar equivalentes do TensorFlow, já que o grafo pode apenas conter operações do TensorFlow.
- Criar as variáveis fora da tf.function e, caso queira fazer a variável assumir um novo valor, utilizar o método <span style='font-family:monospace;'>assign()</span>.
- O código fonte da função deve ser acessível para o TensorFlow.
- O TensorFlow só captura loops que iteram sobre um tensor (<span style='font-family:monospace;'>tf.range(x)</span>). Entretanto, pode ser que você queira que ele rode durante o traçado, como no caso em que definimos as camadas da rede.
- Prefira formas vetorizadas a laços.