# <font color="#418FDE" size="6.5" uppercase>**Customization**</font>

>Last update: 20260127.
    
By the end of this lecture, you will be able to:
- Implement custom Keras layers by subclassing tf.keras.layers.Layer with build and call methods. 
- Define custom loss and metric functions or classes compatible with Keras training workflows. 
- Integrate custom components into models and verify they save and load correctly. 


## **1. Building Custom Layers**

### **1.1. Subclassing Keras Layers**

<img src="https://cdn.jsdelivr.net/gh/mhrafiei/contents@main/LFF/Master TensorFlow 2.20.0/Module_10/Lecture_A/image_01_01.jpg?v=1769558974" width="250">



>* Custom layers capture problem-specific tensor operations
>* They add structure, parameters, and reusable configurations

>* Custom layers have a clear, repeatable lifecycle
>* They initialize, manage weights, and transform inputs predictably

>* Subclassing enables complex, custom layer behaviors and logic
>* Keeps compatibility with Keras training, saving, deployment



In [None]:
#@title Python Code - Subclassing Keras Layers

# This script shows a simple custom layer.
# It focuses on subclassing tf.keras.layers.Layer.
# Run cells to see shapes and behavior.

# !pip install tensorflow==2.20.0

# Import required TensorFlow modules.
import tensorflow as tf

# Set deterministic random seeds for reproducibility.
tf.random.set_seed(7)

# Print TensorFlow version in one short line.
print("TensorFlow version:", tf.__version__)

# Define a custom scaling layer using subclassing.
class ScalingLayer(tf.keras.layers.Layer):

    # Initialize layer and store configuration values.
    def __init__(self, scale_init=1.0, **kwargs):
        super().__init__(**kwargs)
        self.scale_init = float(scale_init)

    # Create trainable weights based on input shape.
    def build(self, input_shape):
        if len(input_shape) < 2:
            raise ValueError("Input rank must be at least two.")
        last_dim = int(input_shape[-1])
        if last_dim <= 0:
            raise ValueError("Last dimension must be positive.")
        self.scale = self.add_weight(
            name="scale", shape=(last_dim,), initializer=tf.keras.initializers.Constant(
                self.scale_init
            ), trainable=True,
        )
        super().build(input_shape)

    # Define the forward computation for the layer.
    def call(self, inputs):
        inputs = tf.convert_to_tensor(inputs)
        if inputs.shape.rank is None:
            raise ValueError("Inputs must have known rank.")
        return inputs * self.scale

    # Enable serialization of configuration values.
    def get_config(self):
        config = super().get_config()
        config.update({"scale_init": self.scale_init})
        return config

# Create a tiny model that uses the custom layer.
inputs = tf.keras.Input(shape=(3,), name="features")

# Apply the custom scaling layer to the inputs.
x = ScalingLayer(scale_init=0.5, name="scaling_layer")(inputs)

# Add a small dense layer for demonstration.
outputs = tf.keras.layers.Dense(1, activation="linear")(x)

# Build the Keras model object.
model = tf.keras.Model(inputs=inputs, outputs=outputs, name="demo_model")

# Compile the model with simple settings.
model.compile(optimizer="sgd", loss="mse", metrics=["mae"])

# Create a tiny deterministic dataset for training.
x_train = tf.constant([[1.0, 2.0, 3.0], [0.5, 0.0, -1.0]], dtype=tf.float32)

y_train = tf.constant([[1.0], [0.0]], dtype=tf.float32)

# Train briefly with silent output to avoid logs.
model.fit(x_train, y_train, epochs=5, batch_size=1, verbose=0)

# Show model prediction before saving and loading.
original_pred = model.predict(x_train, verbose=0)

# Save the model including the custom layer.
model_path = "scaling_layer_demo.keras"

# Use the native Keras saving format.
model.save(model_path)

# Load the model with custom_objects mapping.
loaded_model = tf.keras.models.load_model(
    model_path, custom_objects={"ScalingLayer": ScalingLayer}
)

# Compute predictions with the loaded model.
loaded_pred = loaded_model.predict(x_train, verbose=0)

# Print shapes and a few prediction values.
print("Input shape:", x_train.shape)

# Show the learned scale weights from original model.
print("Original scale weights:", model.get_layer("scaling_layer").get_weights())

# Show the learned scale weights from loaded model.
print("Loaded scale weights:", loaded_model.get_layer("scaling_layer").get_weights())

# Compare first prediction from original and loaded models.
print("Original first prediction:", float(original_pred[0, 0]))

# Final line prints loaded first prediction for confirmation.
print("Loaded first prediction:", float(loaded_pred[0, 0]))
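
The lifecycle described in the notes for this section (construct, build on the first call, then transform inputs) can be observed directly. The following is a minimal sketch, assuming a hypothetical `AddBias` layer that is not part of the lecture code; it only inspects the standard `built` and `weights` attributes before and after the first call.

```python
import tensorflow as tf

# Hypothetical layer, introduced only to observe the lifecycle.
class AddBias(tf.keras.layers.Layer):
    def build(self, input_shape):
        # Called automatically on the first call, once the input shape is known.
        self.bias = self.add_weight(
            name="bias", shape=(input_shape[-1],), initializer="zeros"
        )

    def call(self, inputs):
        return inputs + self.bias

layer = AddBias()

# Right after construction no weights exist yet.
built_before = layer.built
weights_before = len(layer.weights)

# The first call triggers build, then call.
outputs = layer(tf.ones((2, 3)))

built_after = layer.built
weights_after = len(layer.weights)

print("Built before first call:", built_before, "weights:", weights_before)
print("Built after first call:", built_after, "weights:", weights_after)
print("Output shape:", outputs.shape)
```

Because the bias is created in `build`, its shape follows whatever feature size the layer sees first.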




### **1.2. Managing Trainable Weights**

<img src="https://cdn.jsdelivr.net/gh/mhrafiei/contents@main/LFF/Master TensorFlow 2.20.0/Module_10/Lecture_A/image_01_02.jpg?v=1769559073" width="250">



>* Custom layers must clearly declare trainable weights
>* Clear weights enable training, saving, and reuse

>* Choose which layer parameters are learnable
>* Separate trainable, frozen parts to control training

>* Choose weight shapes, initializations, and constraints carefully
>* Good choices improve training stability, speed, and robustness



In [None]:
#@title Python Code - Managing Trainable Weights

# This script shows custom layer weights management.
# It focuses on trainable and nontrainable TensorFlow weights.
# Run cells sequentially to follow the simple example.

# !pip install tensorflow==2.20.0

# Import TensorFlow and NumPy with short aliases.
import tensorflow as tf
import numpy as np

# Print TensorFlow version in one concise line.
print("TensorFlow version:", tf.__version__)

# Set deterministic seeds for reproducible behavior.
tf.random.set_seed(7)
np.random.seed(7)

# Define a custom scaling layer with explicit weights.
class ScalingLayer(tf.keras.layers.Layer):
    # Initialize layer and choose trainable behavior.
    def __init__(self, trainable_scale=True, **kwargs):
        super().__init__(**kwargs)
        self._trainable_scale = bool(trainable_scale)

    # Create weights once input shape is known.
    def build(self, input_shape):
        feature_dim = int(input_shape[-1])
        if feature_dim <= 0:
            raise ValueError("Feature dimension must be positive.")
        self.scale = self.add_weight(
            name="scale", shape=(feature_dim,), initializer="ones",
            trainable=self._trainable_scale,
        )
        self.bias = self.add_weight(
            name="bias", shape=(feature_dim,), initializer="zeros",
            trainable=True,
        )
        self.running_mean = self.add_weight(
            name="running_mean", shape=(feature_dim,), initializer="zeros",
            trainable=False,
        )
        super().build(input_shape)

    # Define the forward computation for the layer.
    def call(self, inputs, training=False):
        inputs = tf.convert_to_tensor(inputs)
        if training:
            batch_mean = tf.reduce_mean(inputs, axis=0)
            self.running_mean.assign(batch_mean)
        return inputs * self.scale + self.bias

# Create a small model using the custom layer.
inputs = tf.keras.Input(shape=(3,))
outputs = ScalingLayer(trainable_scale=True)(inputs)
model = tf.keras.Model(inputs=inputs, outputs=outputs)

# Show how many trainable weights the model has.
print("Trainable weights count:", len(model.trainable_weights))

# Show how many nontrainable weights the model has.
print("Nontrainable weights count:", len(model.non_trainable_weights))

# Build a tiny dataset of simple numeric pairs.
x_data = np.array([[1.0, 2.0, 3.0]], dtype=np.float32)
y_data = np.array([[2.0, 4.0, 6.0]], dtype=np.float32)

# Compile the model with a basic optimizer and loss.
model.compile(optimizer="sgd", loss="mse")

# Train briefly with silent output to update weights.
model.fit(x_data, y_data, epochs=20, verbose=0)

# Inspect the learned scale and bias values after training.
scale_value = model.layers[1].scale.numpy()
bias_value = model.layers[1].bias.numpy()

# Print a short summary of learned parameters.
print("Learned scale:", np.round(scale_value, 3))
print("Learned bias:", np.round(bias_value, 3))

# running_mean is non-trainable: training updates it through assign(), not through gradients.
print("Running mean:", np.round(model.layers[1].running_mean.numpy(), 3))
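
Separating trainable from frozen parts can also be done after a layer is built, by toggling its `trainable` attribute. The sketch below uses a built-in `Dense` layer as a stand-in for any layer with weights (the layer and shapes are illustrative assumptions, not part of the lecture code) and counts the weights on each side before and after freezing.

```python
import tensorflow as tf

# A built-in Dense layer stands in for any layer with a kernel and a bias.
layer = tf.keras.layers.Dense(2)
layer.build((None, 3))

trainable_before = len(layer.trainable_weights)
frozen_before = len(layer.non_trainable_weights)

# Freezing the layer moves all of its weights to the non-trainable list.
layer.trainable = False

trainable_after = len(layer.trainable_weights)
frozen_after = len(layer.non_trainable_weights)

print("Before freezing:", trainable_before, "trainable,", frozen_before, "frozen")
print("After freezing:", trainable_after, "trainable,", frozen_after, "frozen")
```

A model that was already compiled generally needs to be compiled again before a change to `trainable` affects training.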




### **1.3. Using build vs init**

<img src="https://cdn.jsdelivr.net/gh/mhrafiei/contents@main/LFF/Master TensorFlow 2.20.0/Module_10/Lecture_A/image_01_03.jpg?v=1769559163" width="250">



>* Constructor sets hyperparameters independent of input shape
>* Build creates input-shaped weights when data arrives

>* Build delays weight creation until the input shape is known
>* Constructor stores fixed choices; build creates variables

>* Avoid creating input-shaped variables in constructors
>* Create weights in build to ensure flexibility



In [None]:
#@title Python Code - Using build vs init

# This script explains build versus init clearly.
# It shows a custom dense layer example.
# It compares behavior and prints useful shapes.

# !pip install tensorflow==2.20.0

# Import required modules from TensorFlow.
import tensorflow as tf

# Set a deterministic random seed for reproducibility.
tf.random.set_seed(7)

# Print TensorFlow version in one short line.
print("TensorFlow version:", tf.__version__)

# Define a custom dense layer using build correctly.
class MyDenseBuilt(tf.keras.layers.Layer):
    # Store configuration only inside the constructor.
    def __init__(self, units, use_bias=True, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.use_bias = use_bias

    # Create weights using the input shape inside build.
    def build(self, input_shape):
        last_dim = int(input_shape[-1])
        self.w = self.add_weight(
            name="kernel", shape=(last_dim, self.units)
        )
        if self.use_bias:
            self.b = self.add_weight(
                name="bias", shape=(self.units,)
            )
        else:
            self.b = None
        super().build(input_shape)

    # Define the forward computation inside call.
    def call(self, inputs):
        outputs = tf.matmul(inputs, self.w)
        if self.b is not None:
            outputs = outputs + self.b
        return outputs

# Define a layer that incorrectly creates weights in __init__.
class MyDenseInit(tf.keras.layers.Layer):
    # Try to guess input dimension inside constructor.
    def __init__(self, units, guessed_input_dim, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.guessed_input_dim = guessed_input_dim
        self.w = self.add_weight(
            name="kernel_init", shape=(guessed_input_dim, units)
        )

    # Keep build empty because weights already exist.
    def build(self, input_shape):
        super().build(input_shape)

    # Forward pass uses the guessed kernel shape.
    def call(self, inputs):
        return tf.matmul(inputs, self.w)

# Create two small input tensors with different feature sizes.
inputs_4 = tf.ones(shape=(2, 4), dtype=tf.float32)

# Another input has three features instead of four.
inputs_3 = tf.ones(shape=(2, 3), dtype=tf.float32)

# Instantiate the correctly built layer with five units.
layer_built = MyDenseBuilt(units=5, use_bias=True)

# Call the layer on inputs_4 to trigger build.
outputs_built_4 = layer_built(inputs_4)

# Call the same, already built layer on inputs_3; its kernel is fixed at (4, 5).
try:
    outputs_built_3 = layer_built(inputs_3)
    error_message_built = None
except Exception as exc:
    outputs_built_3 = None
    error_message_built = str(exc)

# Instantiate the incorrect layer guessing four features.
layer_init = MyDenseInit(units=5, guessed_input_dim=4)

# Call the incorrect layer on matching shape inputs.
outputs_init_4 = layer_init(inputs_4)

# Try calling incorrect layer on mismatched shape safely.
try:
    outputs_init_3 = layer_init(inputs_3)
    error_message = None
except Exception as exc:
    outputs_init_3 = None
    error_message = str(exc)

# Print shapes created by the correctly built layer.
print("MyDenseBuilt kernel shape:", layer_built.w.shape)

# Show the output shape for the input the layer was built on.
print("Output shape with inputs_4:", outputs_built_4.shape)

# A built layer does not adapt: the mismatched call fails, so this prints None.
print("Output shape with inputs_3:", getattr(outputs_built_3, "shape", None))

# Print the error message or a fallback string for built layer.
print("Error when using inputs_3 with MyDenseBuilt:")
print(error_message_built if error_message_built else "No error raised.")

# Print kernel shape for the incorrect initialization layer.
print("MyDenseInit kernel shape:", layer_init.w.shape)

# Show output shape when guessed dimension matches input.
print("Output shape init with inputs_4:", outputs_init_4.shape)

# Print whether calling with wrong shape produced an error.
print("Error when using inputs_3 with MyDenseInit:")

# Print the error message or a fallback string.
print(error_message if error_message else "No error raised.")
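
The flexibility that `build` provides applies to each new instance, not to a layer that has already been built. As a minimal sketch of that point, the example below uses the built-in `Dense` layer (chosen here only for brevity) and creates one fresh instance per input width, so each one builds a kernel that matches its own input.

```python
import tensorflow as tf

inputs_4 = tf.ones((2, 4))
inputs_3 = tf.ones((2, 3))

# One fresh instance per input width: each builds its own kernel.
dense_for_4 = tf.keras.layers.Dense(5)
dense_for_3 = tf.keras.layers.Dense(5)

out_4 = dense_for_4(inputs_4)
out_3 = dense_for_3(inputs_3)

kernel_shape_4 = tuple(dense_for_4.kernel.shape)
kernel_shape_3 = tuple(dense_for_3.kernel.shape)

print("Kernel built for 4 features:", kernel_shape_4)
print("Kernel built for 3 features:", kernel_shape_3)
print("Output shapes:", out_4.shape, out_3.shape)
```

The same class definition serves both input sizes because no input dimension was hard-coded in the constructor.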



## **2. Designing Custom Losses**

### **2.1. Function Based Losses**

<img src="https://cdn.jsdelivr.net/gh/mhrafiei/contents@main/LFF/Master TensorFlow 2.20.0/Module_10/Lecture_A/image_02_01.jpg?v=1769559294" width="250">



>* Loss functions map predictions and targets to error
>* They encode which mistakes matter for the task

>* Loss functions take batches, stay differentiable, and reduce to a scalar
>* Shape loss to emphasize domain-specific risk priorities

>* Custom losses encode constraints and domain rules
>* They integrate smoothly into existing training workflows



In [None]:
#@title Python Code - Function Based Losses

# This script shows custom function based losses.
# It uses TensorFlow Keras with small dummy data.
# Focus on clarity, not model performance.

# !pip install tensorflow==2.20.0

# Import required libraries for TensorFlow usage.
import os
import random
import numpy as np
import tensorflow as tf

# Set seeds for reproducible behavior in training.
seed_value = 42
random.seed(seed_value)
np.random.seed(seed_value)
tf.random.set_seed(seed_value)

# Print TensorFlow version in one short line.
print("TensorFlow version:", tf.__version__)

# Select device preference based on GPU availability.
physical_gpus = tf.config.list_physical_devices("GPU")
if physical_gpus:
    device_name = "GPU"
else:
    device_name = "CPU"

# Print which device type will likely be used.
print("Using device type:", device_name)

# Create small synthetic regression style dataset.
num_samples = 64
x_data = np.linspace(-1.0, 1.0, num_samples).reshape(-1, 1)
noise = 0.1 * np.random.randn(num_samples, 1)

# Generate targets with simple linear relationship.
y_true = 2.0 * x_data + 0.5 + noise

# Validate shapes before building the model.
assert x_data.shape == (num_samples, 1)
assert y_true.shape == (num_samples, 1)

# Define a simple baseline mean squared error loss.
def mse_loss(y_true_batch, y_pred_batch):
    squared_error = tf.square(y_true_batch - y_pred_batch)
    return tf.reduce_mean(squared_error)

# Define custom loss penalizing underestimates more strongly.
def asymmetric_underestimate_loss(y_true_batch, y_pred_batch):
    diff = y_pred_batch - y_true_batch
    under_mask = tf.cast(diff < 0.0, tf.float32)
    over_mask = 1.0 - under_mask

    # Apply larger weight when model underestimates targets.
    under_weight = 3.0
    over_weight = 1.0
    weighted_error = (
        under_weight * under_mask * tf.square(diff) +
        over_weight * over_mask * tf.square(diff)
    )

    # Return mean loss value over the batch.
    return tf.reduce_mean(weighted_error)

# Build a tiny sequential regression model.
model = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(1,)),
    tf.keras.layers.Dense(8, activation="relu"),
    tf.keras.layers.Dense(1)
])

# Compile model with custom asymmetric loss function.
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.05),
    loss=asymmetric_underestimate_loss,
    metrics=[mse_loss]
)

# Train briefly with silent verbose setting.
history = model.fit(
    x_data,
    y_true,
    epochs=40,
    batch_size=16,
    verbose=0
)

# Evaluate model on same small dataset silently.
loss_value, mse_value = model.evaluate(
    x_data,
    y_true,
    verbose=0
)

# Make a few predictions for inspection.
x_test = np.array([[-0.5], [0.0], [0.5]], dtype=np.float32)
y_pred = model.predict(x_test, verbose=0)

# Print concise summary of training results.
print("Custom asymmetric loss on training data:", float(loss_value))
print("Baseline MSE metric on training data:", float(mse_value))

# Print example predictions versus expected linear targets.
print("Test inputs:", x_test.reshape(-1))
print("Model predictions:", y_pred.reshape(-1))
print("Ideal targets (approx):", (2.0 * x_test + 0.5).reshape(-1))

# Save and reload model to confirm loss compatibility.
save_path = "custom_loss_model.keras"
model.save(save_path, include_optimizer=False)
reloaded_model = tf.keras.models.load_model(
    save_path,
    custom_objects={"asymmetric_underestimate_loss": asymmetric_underestimate_loss,
                    "mse_loss": mse_loss}
)

# Evaluate reloaded model to verify consistent behavior.
reloaded_loss, reloaded_mse = reloaded_model.evaluate(
    x_data,
    y_true,
    verbose=0
)
print("Reloaded model loss and mse:", float(reloaded_loss), float(reloaded_mse))
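
To see how the asymmetric weighting changes the numbers, it helps to work through two samples by hand. The sketch below is a compact re-statement of the same idea using `tf.where` (the function name `asymmetric_loss` and the toy values are assumptions for illustration): one prediction is 1.0 too low and one is 1.0 too high.

```python
import tensorflow as tf

def asymmetric_loss(y_true, y_pred, under_weight=3.0, over_weight=1.0):
    diff = y_pred - y_true
    # Pick the larger weight wherever the prediction falls below the target.
    weights = tf.where(diff < 0.0, under_weight, over_weight)
    return tf.reduce_mean(weights * tf.square(diff))

y_true = tf.constant([[1.0], [1.0]])
y_pred = tf.constant([[0.0], [2.0]])  # one underestimate, one overestimate

loss_value = float(asymmetric_loss(y_true, y_pred))
mse_value = float(tf.reduce_mean(tf.square(y_pred - y_true)))

print("Asymmetric loss:", loss_value)  # (3 * 1 + 1 * 1) / 2 = 2.0
print("Plain MSE:", mse_value)         # (1 + 1) / 2 = 1.0
```

Both errors have the same magnitude, yet the underestimate contributes three times as much to the loss.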



### **2.2. Loss classes with call**

<img src="https://cdn.jsdelivr.net/gh/mhrafiei/contents@main/LFF/Master TensorFlow 2.20.0/Module_10/Lecture_A/image_02_02.jpg?v=1769559342" width="250">



>* Loss classes add configuration, state, and helpers
>* call computes loss using tunable, reusable behavior

>* Class-based losses serialize cleanly with model configs
>* They preserve behavior across environments, deployments, and audits

>* Class losses can adapt using tracked statistics
>* They keep training loops simple while adding sophistication



In [None]:
#@title Python Code - Loss classes with call

# This script shows a custom loss class.
# It focuses on the call method usage.
# It keeps training tiny and output short.

# !pip install tensorflow==2.20.0

# Import required standard libraries.
import os
import random
import numpy as np

# Import TensorFlow and Keras components.
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Set deterministic random seeds.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Print TensorFlow version briefly.
print("TensorFlow version:", tf.__version__)

# Define a custom loss class with call.
class ScaledMAELoss(keras.losses.Loss):
    # Initialize with a scale hyperparameter.
    def __init__(self, scale=1.0, name="scaled_mae", reduction="sum_over_batch_size"):
        super().__init__(name=name, reduction=reduction)
        # Keep a plain float so the value can be serialized in get_config.
        self.scale = float(scale)

    # Implement the core loss computation.
    def call(self, y_true, y_pred):
        y_true = tf.convert_to_tensor(y_true, dtype=tf.float32)
        y_pred = tf.convert_to_tensor(y_pred, dtype=tf.float32)
        tf.debugging.assert_shapes([(y_true, (None, 1)), (y_pred, (None, 1))])
        # Return one value per sample; the configured reduction averages them.
        mae = tf.reduce_mean(tf.abs(y_true - y_pred), axis=-1)
        return self.scale * mae

    # Store the scale so a reloaded model rebuilds the same loss.
    def get_config(self):
        config = super().get_config()
        config.update({"scale": self.scale})
        return config

# Create a tiny synthetic regression dataset.
num_samples = 64
x_values = np.linspace(-1.0, 1.0, num_samples).astype("float32")
noise = 0.05 * np.random.randn(num_samples).astype("float32")

# Compute targets with a simple linear rule.
y_values = (2.0 * x_values + 0.5 + noise).reshape(-1, 1)

# Reshape features for Keras dense input.
x_values = x_values.reshape(-1, 1)

# Build a simple sequential regression model.
model = keras.Sequential([
    layers.Input(shape=(1,)),
    layers.Dense(8, activation="relu"),
    layers.Dense(1)
])

# Instantiate the custom loss with a chosen scale.
custom_loss = ScaledMAELoss(scale=2.0)

# Compile the model using the custom loss.
model.compile(optimizer="adam", loss=custom_loss, metrics=["mae"])

# Train briefly with silent logging.
history = model.fit(x_values, y_values, epochs=10, batch_size=16, verbose=0)

# Evaluate the trained model once.
loss_value, mae_value = model.evaluate(x_values, y_values, verbose=0)

# Show the final loss and metric values.
print("Final scaled loss:", float(loss_value))
print("Final MAE metric:", float(mae_value))

# Save the model including the custom loss configuration.
model_path = "custom_loss_model.keras"
model.save(model_path, include_optimizer=False)

# Load the model with custom_objects mapping.
loaded_model = keras.models.load_model(
    model_path,
    custom_objects={"ScaledMAELoss": ScaledMAELoss}
)

# Evaluate the loaded model to verify behavior.
loaded_loss, loaded_mae = loaded_model.evaluate(
    x_values,
    y_values,
    verbose=0
)

# Print a short confirmation of consistency.
print("Loaded model loss:", float(loaded_loss))
print("Loaded model MAE:", float(loaded_mae))
print("Loss class scale attribute:", float(custom_loss.scale))
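
A class-based loss only serializes its hyperparameters if `get_config` reports them. The following sketch checks this with a config round trip, independent of model saving; the class name `ScaledMAE` and the toy tensors are assumptions for illustration.

```python
import tensorflow as tf

class ScaledMAE(tf.keras.losses.Loss):
    def __init__(self, scale=1.0, name="scaled_mae", **kwargs):
        super().__init__(name=name, **kwargs)
        self.scale = float(scale)

    def call(self, y_true, y_pred):
        # One loss value per sample; the base class applies the reduction.
        return self.scale * tf.reduce_mean(tf.abs(y_true - y_pred), axis=-1)

    def get_config(self):
        # Add the hyperparameter to the base config (name, reduction).
        config = super().get_config()
        config.update({"scale": self.scale})
        return config

loss = ScaledMAE(scale=2.0)
restored = ScaledMAE.from_config(loss.get_config())

y_true = tf.constant([[1.0], [3.0]])
y_pred = tf.constant([[2.0], [1.0]])
original_value = float(loss(y_true, y_pred))
restored_value = float(restored(y_true, y_pred))

print("Restored scale:", restored.scale)
print("Original vs restored loss:", original_value, restored_value)
```

Without the `scale` entry in the config, the restored object would silently fall back to the default scale of 1.0.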



### **2.3. Loss Reduction and Masking**

<img src="https://cdn.jsdelivr.net/gh/mhrafiei/contents@main/LFF/Master TensorFlow 2.20.0/Module_10/Lecture_A/image_02_03.jpg?v=1769559417" width="250">



>* Loss reduction aggregates per-example errors into one
>* Choose averaging, summing, or weighting to match priorities

>* Masking selects which elements affect loss aggregation
>* It ignores padding or missing data, focusing learning

>* Align loss with masks and workflow expectations
>* Careful design improves performance, stability, and reuse
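
The effect of reduction and masking is easiest to see on a small padded batch. The sketch below is an illustration under assumed toy values, with a hypothetical helper `masked_mse`: it computes per-example losses (no reduction), their sum and mean, and a masked mean that divides only by the number of valid positions.

```python
import tensorflow as tf

def masked_mse(y_true, y_pred, mask):
    mask = tf.cast(mask, y_pred.dtype)
    squared = tf.square(y_true - y_pred) * mask
    # Divide by the number of valid positions, not by the tensor size.
    return tf.math.divide_no_nan(tf.reduce_sum(squared), tf.reduce_sum(mask))

# Two sequences padded with zeros to length 4; mask marks the real entries.
y_true = tf.constant([[1.0, 2.0, 0.0, 0.0], [3.0, 0.0, 0.0, 0.0]])
y_pred = tf.constant([[1.0, 4.0, 7.0, 7.0], [1.0, 7.0, 7.0, 7.0]])
mask = tf.constant([[1, 1, 0, 0], [1, 0, 0, 0]])

# No reduction: one loss value per example.
per_example = tf.reduce_mean(tf.square(y_true - y_pred), axis=-1)
naive_sum = float(tf.reduce_sum(per_example))
naive_mean = float(tf.reduce_mean(per_example))
masked_mean = float(masked_mse(y_true, y_pred, mask))

print("Per-example losses:", per_example.numpy())
print("Sum reduction:", naive_sum)
print("Mean reduction (padding included):", naive_mean)
print("Masked mean (padding ignored):", masked_mean)
```

The unmasked mean is dominated by errors on padded positions, while the masked mean reflects only the three real targets.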



## **3. Managing Custom Metrics**

### **3.1. Metric State Variables**

<img src="https://cdn.jsdelivr.net/gh/mhrafiei/contents@main/LFF/Master TensorFlow 2.20.0/Module_10/Lecture_A/image_03_01.jpg?v=1769559589" width="250">



>* Metrics use state variables to accumulate information
>* Stored sums and counts produce final averaged values

>* Metric state variables integrate with TensorFlow systems
>* They persist across saving, loading, and distributed training

>* State variables control metric behavior across workflows
>* They enable continuous tracking, export, and reliable restoration



In [None]:
#@title Python Code - Metric State Variables

# This script shows metric state variables.
# It uses a tiny model and custom metric.
# Focus is saving and loading metric state.

# !pip install tensorflow==2.20.0

# Import required libraries safely.
import os
import numpy as np
import tensorflow as tf

# Set deterministic seeds for reproducibility.
pseed = 123
np.random.seed(pseed)
tf.random.set_seed(pseed)

# Print TensorFlow version briefly.
print("TensorFlow version:", tf.__version__)

# Define a simple custom metric class.
class RunningMeanAbsoluteError(tf.keras.metrics.Metric):
    # Initialize metric state variables.
    def __init__(self, name="running_mae", **kwargs):
        super().__init__(name=name, **kwargs)
        self.total_abs_error = self.add_weight(
            name="total_abs_error", initializer="zeros"
        )
        self.count = self.add_weight(
            name="count", initializer="zeros"
        )

    # Update state variables with new batch values.
    def update_state(self, y_true, y_pred, sample_weight=None):
        y_true = tf.cast(y_true, tf.float32)
        y_pred = tf.cast(y_pred, tf.float32)
        tf.debugging.assert_shapes(
            [(y_true, (None, 1)), (y_pred, (None, 1))]
        )
        abs_error = tf.abs(y_true - y_pred)
        batch_sum = tf.reduce_sum(abs_error)
        batch_count = tf.cast(tf.size(abs_error), tf.float32)
        self.total_abs_error.assign_add(batch_sum)
        self.count.assign_add(batch_count)

    # Compute final metric value from state.
    def result(self):
        return tf.math.divide_no_nan(self.total_abs_error, self.count)

    # Reset state variables between epochs (Keras calls reset_state).
    def reset_state(self):
        self.total_abs_error.assign(0.0)
        self.count.assign(0.0)

# Create a tiny synthetic regression dataset.
num_samples = 64
x_data = np.linspace(-1.0, 1.0, num_samples).reshape(-1, 1)
noise = 0.1 * np.random.randn(num_samples, 1)
y_data = 2.0 * x_data + 0.5 + noise

# Build a minimal Keras Sequential model.
model = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(1,)),
    tf.keras.layers.Dense(4, activation="relu"),
    tf.keras.layers.Dense(1)
])

# Compile model with custom metric instance.
model.compile(
    optimizer="adam",
    loss="mse",
    metrics=[RunningMeanAbsoluteError()]
)

# Train briefly to update metric state variables.
history = model.fit(
    x_data,
    y_data,
    epochs=3,
    batch_size=16,
    verbose=0
)

# Show metric result after training.
metric_name = "running_mae"
final_metric = history.history[metric_name][-1]
print("Final training", metric_name, "before save:", final_metric)

# Save model including custom metric configuration.
save_path = "custom_metric_model.keras"
model.save(save_path)

# Load model with custom_objects mapping.
loaded_model = tf.keras.models.load_model(
    save_path,
    custom_objects={"RunningMeanAbsoluteError": RunningMeanAbsoluteError}
)

# Evaluate loaded model to confirm metric still works.
loaded_results = loaded_model.evaluate(
    x_data,
    y_data,
    batch_size=16,
    verbose=0,
    return_dict=True
)

# Print metric value after loading model.
print("Metric from loaded model:", loaded_results[metric_name])

# Look for the metric instance; some Keras versions wrap compiled metrics, so this lookup may find nothing.
loaded_metric = None
for m in loaded_model.metrics:
    if isinstance(m, RunningMeanAbsoluteError):
        loaded_metric = m
        break

# Safely print metric state variable values.
if loaded_metric is not None:
    print("Loaded total_abs_error:", float(loaded_metric.total_abs_error))
    print("Loaded count:", float(loaded_metric.count))
else:
    print("Custom metric instance not found in loaded model.")
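
Storing a sum and a count, rather than averaging batch averages, matters as soon as batches differ in size. The following sketch uses the built-in `tf.keras.metrics.Mean` with assumed toy batches to compare the two approaches.

```python
import tensorflow as tf

# Two batches of unequal size.
batches = [[1.0, 2.0, 3.0], [10.0]]

running = tf.keras.metrics.Mean()
batch_means = []
for batch in batches:
    # The metric accumulates a running total and a running count.
    running.update_state(tf.constant(batch))
    batch_means.append(sum(batch) / len(batch))

state_based = float(running.result())                # 16 / 4
mean_of_means = sum(batch_means) / len(batch_means)  # (2 + 10) / 2

print("Mean from accumulated state:", state_based)
print("Mean of batch means:", mean_of_means)
```

The state-based result weights every sample equally, whereas the mean of batch means gives the single-sample batch as much influence as the three-sample one.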




### **3.2. Updating And Reading Metrics**

<img src="https://cdn.jsdelivr.net/gh/mhrafiei/contents@main/LFF/Master TensorFlow 2.20.0/Module_10/Lecture_A/image_03_02.jpg?v=1769559682" width="250">



>* Metric update combines new batch with state
>* Reading converts state into stable, logged value

>* Metrics update each batch and log performance
>* Domain metrics use cumulative stats for reporting

>* Metric state must persist across saves and restarts
>* Deterministic updates ensure consistent, trustworthy reported values



In [None]:
#@title Python Code - Updating And Reading Metrics

# This script shows custom metric behavior.
# It focuses on updating and reading metrics.
# It also demonstrates saving and loading models.

# !pip install tensorflow==2.20.0

# Import required standard libraries.
import os
import random
import numpy as np

# Import TensorFlow and Keras components.
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Set deterministic seeds for reproducibility.
seed_value = 42
random.seed(seed_value)
np.random.seed(seed_value)
tf.random.set_seed(seed_value)

# Print TensorFlow version in one short line.
print("TensorFlow version:", tf.__version__)

# Define a simple custom metric class.
class RunningMeanAbsoluteError(tf.keras.metrics.Metric):

    # Initialize metric with two state variables.
    def __init__(self, name="running_mae", **kwargs):
        super().__init__(name=name, **kwargs)
        self.total_abs_error = self.add_weight(
            name="total_abs_error", initializer="zeros"
        )
        self.total_samples = self.add_weight(
            name="total_samples", initializer="zeros"
        )

    # Update state using predictions and true targets.
    def update_state(self, y_true, y_pred, sample_weight=None):
        y_true = tf.cast(y_true, tf.float32)
        y_pred = tf.cast(y_pred, tf.float32)
        tf.debugging.assert_shapes([(y_true, (None, 1)), (y_pred, (None, 1))])
        abs_error = tf.abs(y_true - y_pred)
        batch_error = tf.reduce_sum(abs_error)
        batch_size = tf.cast(tf.size(y_true), tf.float32)
        self.total_abs_error.assign_add(batch_error)
        self.total_samples.assign_add(batch_size)
        return self.result()

    # Read metric value from internal state.
    def result(self):
        return tf.math.divide_no_nan(self.total_abs_error, self.total_samples)

    # Reset state between epochs or evaluations (Keras calls reset_state).
    def reset_state(self):
        self.total_abs_error.assign(0.0)
        self.total_samples.assign(0.0)

# Create a tiny synthetic regression dataset.
num_samples = 128
x_data = np.linspace(-1.0, 1.0, num_samples).astype("float32")
y_data = (2.0 * x_data + 0.3).reshape(-1, 1).astype("float32")
x_data = x_data.reshape(-1, 1)

# Build a very small Keras model.
inputs = keras.Input(shape=(1,))
outputs = layers.Dense(1, activation="linear")(inputs)
model = keras.Model(inputs=inputs, outputs=outputs)

# Instantiate the custom metric for training.
running_mae = RunningMeanAbsoluteError()

# Compile model with custom metric included.
model.compile(
    optimizer="sgd",
    loss="mse",
    metrics=[running_mae],
)

# Train briefly with silent logs to avoid spam.
history = model.fit(
    x_data,
    y_data,
    epochs=3,
    batch_size=16,
    verbose=0,
)

# Read metric value after training completes.
final_metric_value = running_mae.result().numpy()
print("Running MAE after training:", float(final_metric_value))

# Show internal state variables for clarity.
print("Total absolute error state:", float(running_mae.total_abs_error.numpy()))
print("Total samples state:", float(running_mae.total_samples.numpy()))

# Save the model including custom metric configuration.
save_path = "custom_metric_model.keras"
model.save(save_path)

# Load the model with custom_objects mapping.
loaded_model = keras.models.load_model(
    save_path,
    custom_objects={"RunningMeanAbsoluteError": RunningMeanAbsoluteError},
)

# Evaluate loaded model to update its metric state.
results = loaded_model.evaluate(
    x_data,
    y_data,
    batch_size=16,
    verbose=0,
)

# Read metric value from loaded model evaluation.
loaded_metric_value = float(results[1])
print("Running MAE after loading:", loaded_metric_value)

# Manually update a fresh metric on a small batch of model predictions.
small_x = x_data[:4]
small_y = y_data[:4]
small_pred = model.predict(small_x, verbose=0)
manual_metric = RunningMeanAbsoluteError()
manual_metric.update_state(small_y, small_pred)
print("Manual metric value on small batch:", float(manual_metric.result().numpy()))



### **3.3. Resetting Metric State**

<img src="https://cdn.jsdelivr.net/gh/mhrafiei/contents@main/LFF/Master TensorFlow 2.20.0/Module_10/Lecture_A/image_03_03.jpg?v=1769559806" width="250">



>* Metrics accumulate batch information over time
>* Reset state to avoid mixed, misleading results

>* Reset metrics between epochs and workflow phases
>* Clean states keep comparisons fair and reliable

>* Reset metrics after loading models for reuse
>* Prevents stale state, keeps evaluations clean, trustworthy



In [None]:
#@title Python Code - Resetting Metric State

# This script shows resetting custom metric state.
# It uses a tiny model and synthetic data.
# Focus on clear metric behavior across phases.

# !pip install tensorflow==2.20.0

# Import required libraries safely.
import os
import random
import numpy as np
import tensorflow as tf

# Set deterministic seeds for reproducibility.
seed_value = 42
random.seed(seed_value)
np.random.seed(seed_value)
tf.random.set_seed(seed_value)

# Print TensorFlow version briefly.
print("TensorFlow version:", tf.__version__)

# Define a simple custom accuracy-like metric.
class SimpleAccuracy(tf.keras.metrics.Metric):
    def __init__(self, name="simple_accuracy", **kwargs):
        super().__init__(name=name, **kwargs)
        self.total_examples = self.add_weight(
            name="total_examples", initializer="zeros", dtype=tf.float32
        )
        self.correct_examples = self.add_weight(
            name="correct_examples", initializer="zeros", dtype=tf.float32
        )

    def update_state(self, y_true, y_pred, sample_weight=None):
        y_true = tf.cast(tf.reshape(y_true, [-1]), tf.int32)
        y_pred_labels = tf.argmax(y_pred, axis=-1, output_type=tf.int32)
        matches = tf.cast(tf.equal(y_true, y_pred_labels), tf.float32)
        batch_total = tf.cast(tf.size(y_true), tf.float32)
        self.total_examples.assign_add(batch_total)
        self.correct_examples.assign_add(tf.reduce_sum(matches))

    def result(self):
        return tf.math.divide_no_nan(self.correct_examples, self.total_examples)

    def reset_state(self):
        self.total_examples.assign(0.0)
        self.correct_examples.assign(0.0)

    def get_config(self):
        base_config = super().get_config()
        return {**base_config}

# Create tiny synthetic classification data.
num_classes = 3
num_features = 4
num_train = 64
num_val = 32

# Generate random features and integer labels.
x_train = np.random.randn(num_train, num_features).astype("float32")
y_train = np.random.randint(num_classes, size=(num_train,)).astype("int32")
x_val = np.random.randn(num_val, num_features).astype("float32")
y_val = np.random.randint(num_classes, size=(num_val,)).astype("int32")

# Validate shapes before building the model.
assert x_train.shape[1] == num_features
assert x_val.shape[1] == num_features

# Build a tiny dense model using Keras.
inputs = tf.keras.Input(shape=(num_features,))
hidden = tf.keras.layers.Dense(8, activation="relu")(inputs)
outputs = tf.keras.layers.Dense(num_classes, activation="softmax")(hidden)
model = tf.keras.Model(inputs=inputs, outputs=outputs)

# Instantiate the custom metric for training.
train_metric = SimpleAccuracy(name="train_simple_accuracy")

# Compile the model with custom metric included.
model.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=[train_metric],
)

# Train briefly with silent logs to avoid spam.
history = model.fit(
    x_train,
    y_train,
    epochs=2,
    batch_size=16,
    verbose=0,
)

# Show metric value after training phase.
train_metric_value = float(train_metric.result().numpy())
print("Metric after training phase:", round(train_metric_value, 4))

# Manually reset metric state before validation.
train_metric.reset_state()
print("Metric after manual reset:", float(train_metric.result().numpy()))

# Evaluate on validation data; evaluate() resets metric state before it starts.
val_results = model.evaluate(
    x_val,
    y_val,
    batch_size=16,
    verbose=0,
    return_dict=True,
)

# Print validation metric value clearly.
print("Validation metric from evaluate:", round(val_results["train_simple_accuracy"], 4))

# Save the model including custom metric configuration.
save_path = "custom_metric_model.keras"
model.save(save_path, include_optimizer=False)

# Load the model with custom_objects mapping.
loaded_model = tf.keras.models.load_model(
    save_path,
    custom_objects={"SimpleAccuracy": SimpleAccuracy},
)

# Evaluate loaded model to show fresh metric state.
loaded_results = loaded_model.evaluate(
    x_val,
    y_val,
    batch_size=16,
    verbose=0,
    return_dict=True,
)

# Print metric from loaded model evaluation.
print(
    "Loaded model validation metric:",
    round(loaded_results["train_simple_accuracy"], 4),
)
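
What a missing reset does to a reported value can be shown with two phases of made-up values. This sketch uses the built-in `tf.keras.metrics.Mean` (the phase values are assumptions chosen to make the arithmetic obvious) and compares the result with and without `reset_state` between phases.

```python
import tensorflow as tf

metric = tf.keras.metrics.Mean()

# Phase 1: for example, values from the last training epoch.
metric.update_state(tf.constant([0.0, 0.0]))

# Phase 2 without a reset: old and new values are mixed.
metric.update_state(tf.constant([1.0, 1.0]))
mixed = float(metric.result())

# Phase 2 after a reset: only the new values count.
metric.reset_state()
after_reset = float(metric.result())
metric.update_state(tf.constant([1.0, 1.0]))
clean = float(metric.result())

print("Phase 2 value without reset:", mixed)
print("Value right after reset:", after_reset)
print("Phase 2 value with reset:", clean)
```

The mixed value describes neither phase, which is why `fit` and `evaluate` reset metrics at phase boundaries and why manual loops should do the same.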



# <font color="#418FDE" size="6.5" uppercase>**Customization**</font>


In this lecture, you learned to:
- Implement custom Keras layers by subclassing tf.keras.layers.Layer with build and call methods. 
- Define custom loss and metric functions or classes compatible with Keras training workflows. 
- Integrate custom components into models and verify they save and load correctly. 

<font color='yellow'>Congratulations on completing this course!</font>