# **Custom Models & Training with TensorFlow**

In [11]:
import tensorflow as tf
from tensorflow import keras
import numpy as np
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Set random seeds for reproducibility
tf.random.set_seed(42)
np.random.seed(42)

# ============================================================================
# 1. CUSTOM LOSS FUNCTIONS
# ============================================================================

def huber_fn(y_true, y_pred):
    """Simple Huber loss function"""
    error = y_true - y_pred
    is_small_error = tf.abs(error) < 1
    squared_loss = tf.square(error) / 2
    linear_loss = tf.abs(error) - 0.5
    return tf.where(is_small_error, squared_loss, linear_loss)

def create_huber(threshold=1.0):
    """Factory function to create Huber loss with custom threshold"""
    def huber_fn(y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < threshold
        squared_loss = tf.square(error) / 2
        linear_loss = threshold * tf.abs(error) - threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)
    return huber_fn

class HuberLoss(keras.losses.Loss):
    """Custom Huber Loss class"""
    def __init__(self, threshold=1.0, **kwargs):
        self.threshold = threshold
        super().__init__(**kwargs)

    def call(self, y_true, y_pred):
        error = y_true - y_pred
        is_small_error = tf.abs(error) < self.threshold
        squared_loss = tf.square(error) / 2
        linear_loss = self.threshold * tf.abs(error) - self.threshold**2 / 2
        return tf.where(is_small_error, squared_loss, linear_loss)

    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

# ============================================================================
# 2. CUSTOM FUNCTIONS (ACTIVATION, INITIALIZER, REGULARIZER, CONSTRAINT)
# ============================================================================

def my_softplus(z):
    """Custom softplus activation"""
    return tf.math.log(tf.exp(z) + 1.0)

def my_glorot_initializer(shape, dtype=tf.float32):
    """Custom Glorot initializer"""
    stddev = tf.sqrt(2. / (shape[0] + shape[1]))
    return tf.random.normal(shape, stddev=stddev, dtype=dtype)

def my_l1_regularizer(weights):
    """Custom L1 regularizer"""
    return tf.reduce_sum(tf.abs(0.01 * weights))

def my_positive_weights(weights):
    """Custom constraint to keep weights positive"""
    return tf.where(weights < 0., tf.zeros_like(weights), weights)

class MyL1Regularizer(keras.regularizers.Regularizer):
    """Custom L1 Regularizer class"""
    def __init__(self, factor):
        self.factor = factor

    def __call__(self, weights):
        return tf.reduce_sum(tf.abs(self.factor * weights))

    def get_config(self):
        return {"factor": self.factor}

# ============================================================================
# 3. CUSTOM METRICS
# ============================================================================

class HuberMetric(keras.metrics.Metric):
    """Custom Huber metric"""
    def __init__(self, threshold=1.0, **kwargs):
        super().__init__(**kwargs)
        self.threshold = threshold
        self.huber_fn = create_huber(threshold)
        # Fixed: Pass shape as tuple, not string
        self.total = self.add_weight(name="total", shape=(), initializer="zeros")
        self.count = self.add_weight(name="count", shape=(), initializer="zeros")

    def update_state(self, y_true, y_pred, sample_weight=None):
        metric = self.huber_fn(y_true, y_pred)
        self.total.assign_add(tf.reduce_sum(metric))
        self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))

    def result(self):
        return self.total / self.count

    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "threshold": self.threshold}

# ============================================================================
# 4. CUSTOM LAYERS
# ============================================================================

class MyDense(keras.layers.Layer):
    """Custom Dense layer"""
    def __init__(self, units, activation=None, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.activation = keras.activations.get(activation)

    def build(self, input_shape):
        self.kernel = self.add_weight(
            name='kernel',
            shape=(input_shape[-1], self.units),
            initializer='glorot_uniform',
            trainable=True
        )
        self.bias = self.add_weight(
            name='bias',
            shape=(self.units,),
            initializer='zeros',
            trainable=True
        )

    def call(self, X):
        return self.activation(X @ self.kernel + self.bias)

    def compute_output_shape(self, batch_input_shape):
        # Handle both tuple and TensorShape inputs
        if hasattr(batch_input_shape, 'as_list'):
            shape_list = batch_input_shape.as_list()
        else:
            shape_list = list(batch_input_shape)
        return tf.TensorShape(shape_list[:-1] + [self.units])

    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "units": self.units,
                "activation": keras.activations.serialize(self.activation)}

class MyGaussianNoise(keras.layers.Layer):
    """Custom Gaussian Noise layer"""
    def __init__(self, stddev, **kwargs):
        super().__init__(**kwargs)
        self.stddev = stddev

    def call(self, X, training=None):
        if training:
            noise = tf.random.normal(tf.shape(X), stddev=self.stddev)
            return X + noise
        else:
            return X

    def compute_output_shape(self, batch_input_shape):
        return batch_input_shape

    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "stddev": self.stddev}

class ResidualBlock(keras.layers.Layer):
    """Custom Residual Block"""
    def __init__(self, n_layers, n_neurons, **kwargs):
        super().__init__(**kwargs)
        self.n_layers = n_layers
        self.n_neurons = n_neurons
        self.hidden = [keras.layers.Dense(n_neurons, activation="elu",
                                        kernel_initializer="he_normal")
                      for _ in range(n_layers)]

    def call(self, inputs):
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        return inputs + Z

    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "n_layers": self.n_layers, "n_neurons": self.n_neurons}

# ============================================================================
# 5. CUSTOM MODELS
# ============================================================================

class ResidualRegressor(keras.Model):
    """Custom model with residual blocks"""
    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.output_dim = output_dim
        self.hidden1 = keras.layers.Dense(30, activation="elu",
                                        kernel_initializer="he_normal")
        self.block1 = ResidualBlock(2, 30)
        self.block2 = ResidualBlock(2, 30)
        self.out = keras.layers.Dense(output_dim)

    def call(self, inputs):
        Z = self.hidden1(inputs)
        for _ in range(1 + 3):
            Z = self.block1(Z)
        Z = self.block2(Z)
        return self.out(Z)

    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "output_dim": self.output_dim}

class ReconstructingRegressor(keras.Model):
    """Custom model with reconstruction loss"""
    def __init__(self, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.output_dim = output_dim
        self.hidden = [keras.layers.Dense(30, activation="selu",
                                        kernel_initializer="lecun_normal")
                      for _ in range(5)]
        self.out = keras.layers.Dense(output_dim)

    def build(self, input_shape):
        n_inputs = input_shape[-1]
        self.reconstruct = keras.layers.Dense(n_inputs)
        super().build(input_shape)

    def call(self, inputs):
        Z = inputs
        for layer in self.hidden:
            Z = layer(Z)
        reconstruction = self.reconstruct(Z)
        recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs))
        self.add_loss(0.05 * recon_loss)
        return self.out(Z)

    def get_config(self):
        base_config = super().get_config()
        return {**base_config, "output_dim": self.output_dim}

# ============================================================================
# 6. CUSTOM GRADIENT FUNCTION
# ============================================================================

@tf.function
def f(w1, w2):
    """Example function for gradient computation"""
    return 3 * w1**2 + 2 * w1 * w2

@tf.custom_gradient
def my_better_softplus(z):
    """Custom softplus with custom gradient"""
    exp = tf.exp(z)
    def my_softplus_gradients(grad):
        return grad / (1 + 1 / exp)
    return tf.math.log(exp + 1), my_softplus_gradients

# ============================================================================
# 7. TRAINING UTILITIES
# ============================================================================

def random_batch(X, y, batch_size=32):
    """Generate random batch"""
    idx = np.random.randint(len(X), size=batch_size)
    return X[idx], y[idx]

def print_status_bar(iteration, total, loss, metrics=None):
    """Print training progress"""
    metrics_str = " - ".join(["{}: {:.4f}".format(m.name, m.result())
                             for m in [loss] + (metrics or [])])
    end = "" if iteration < total else "\n"
    print(f"\r{iteration}/{total} - {metrics_str}", end=end)

# ============================================================================
# 8. MAIN TRAINING PIPELINE
# ============================================================================

def main():
    print("Creating synthetic dataset...")
    # Create sample data
    X, y = make_regression(n_samples=1000, n_features=10, noise=0.1, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Scale the data
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    print(f"Training data shape: {X_train_scaled.shape}")
    print(f"Training labels shape: {y_train.shape}")

    # ========================================================================
    # Example 1: Using functional custom loss
    # ========================================================================
    print("\n" + "="*50)
    print("EXAMPLE 1: Model with custom Huber loss function")
    print("="*50)

    model1 = keras.Sequential([
        keras.layers.Dense(64, activation="relu", input_shape=(X_train_scaled.shape[1],)),
        keras.layers.Dense(32, activation="relu"),
        keras.layers.Dense(1)
    ])

    model1.compile(loss=huber_fn, optimizer="adam", metrics=["mae"])

    history1 = model1.fit(X_train_scaled, y_train,
                         validation_data=(X_test_scaled, y_test),
                         epochs=5, batch_size=32, verbose=1)

    # ========================================================================
    # Example 2: Using custom loss class
    # ========================================================================
    print("\n" + "="*50)
    print("EXAMPLE 2: Model with custom Huber loss class")
    print("="*50)

    model2 = keras.Sequential([
        keras.layers.Dense(64, activation="relu", input_shape=(X_train_scaled.shape[1],)),
        keras.layers.Dense(32, activation="relu"),
        keras.layers.Dense(1)
    ])

    model2.compile(loss=HuberLoss(2.0), optimizer="adam", metrics=[HuberMetric(2.0)])

    history2 = model2.fit(X_train_scaled, y_train,
                         validation_data=(X_test_scaled, y_test),
                         epochs=5, batch_size=32, verbose=1)

    # ========================================================================
    # Example 3: Using custom layers
    # ========================================================================
    print("\n" + "="*50)
    print("EXAMPLE 3: Model with custom layers")
    print("="*50)

    model3 = keras.Sequential([
        MyDense(64, activation="relu"),
        MyGaussianNoise(0.1),
        MyDense(32, activation="relu"),
        MyDense(1)
    ])

    model3.compile(loss="mse", optimizer="adam", metrics=["mae"])
    model3.fit(X_train_scaled, y_train,
               validation_data=(X_test_scaled, y_test),
               epochs=5, batch_size=32, verbose=1)

    # ========================================================================
    # Example 4: Using custom model (ResidualRegressor)
    # ========================================================================
    print("\n" + "="*50)
    print("EXAMPLE 4: Custom ResidualRegressor model")
    print("="*50)

    model4 = ResidualRegressor(output_dim=1)
    model4.compile(loss="mse", optimizer="adam", metrics=["mae"])
    model4.fit(X_train_scaled, y_train,
               validation_data=(X_test_scaled, y_test),
               epochs=5, batch_size=32, verbose=1)

    # ========================================================================
    # Example 5: Custom training loop
    # ========================================================================
    print("\n" + "="*50)
    print("EXAMPLE 5: Custom training loop")
    print("="*50)

    # Create a simple model for custom training
    model5 = keras.Sequential([
        keras.layers.Dense(64, activation="relu", input_shape=(X_train_scaled.shape[1],)),
        keras.layers.Dense(32, activation="relu"),
        keras.layers.Dense(1)
    ])

    # Custom training parameters
    n_epochs = 3
    batch_size = 32
    n_steps = len(X_train_scaled) // batch_size
    optimizer = keras.optimizers.Adam(learning_rate=0.01)
    loss_fn = keras.losses.MeanSquaredError()

    # Metrics
    mean_loss = keras.metrics.Mean()
    mae_metric = keras.metrics.MeanAbsoluteError()

    # Custom training loop
    for epoch in range(1, n_epochs + 1):
        print(f"Epoch {epoch}/{n_epochs}")

        # Reset metrics
        mean_loss.reset_state()
        mae_metric.reset_state()

        for step in range(1, n_steps + 1):
            X_batch, y_batch = random_batch(X_train_scaled, y_train, batch_size)

            with tf.GradientTape() as tape:
                y_pred = model5(X_batch, training=True)
                main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
                # Add regularization losses if any
                loss = tf.add_n([main_loss] + model5.losses)

            # Compute and apply gradients
            gradients = tape.gradient(loss, model5.trainable_variables)
            optimizer.apply_gradients(zip(gradients, model5.trainable_variables))

            # Update metrics
            mean_loss.update_state(loss)
            mae_metric.update_state(y_batch, y_pred)

            # Print progress
            if step % 10 == 0 or step == n_steps:
                print_status_bar(step * batch_size, len(y_train), mean_loss, [mae_metric])

        print()  # New line after each epoch

    # ========================================================================
    # Example 6: Gradient computation example
    # ========================================================================
    print("\n" + "="*50)
    print("EXAMPLE 6: Custom gradient computation")
    print("="*50)

    # Variables for gradient computation
    w1 = tf.Variable(3.0)
    w2 = tf.Variable(4.0)

    with tf.GradientTape(persistent=True) as tape:
        z = f(w1, w2)

    dz_dw1 = tape.gradient(z, w1)
    dz_dw2 = tape.gradient(z, w2)

    print(f"f(w1={w1.numpy()}, w2={w2.numpy()}) = {z.numpy()}")
    print(f"df/dw1 = {dz_dw1.numpy()}")
    print(f"df/dw2 = {dz_dw2.numpy()}")

    # Clean up
    del tape

    print("\n" + "="*50)
    print("All examples completed successfully!")
    print("="*50)

if __name__ == "__main__":
    main()

Creating synthetic dataset...
Training data shape: (800, 10)
Training labels shape: (800,)

EXAMPLE 1: Model with custom Huber loss function
Epoch 1/5


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m25/25[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 17ms/step - loss: 103.1421 - mae: 103.6409 - val_loss: 104.2953 - val_mae: 104.7953
Epoch 2/5
[1m25/25[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7ms/step - loss: 102.7409 - mae: 103.2391 - val_loss: 103.7648 - val_mae: 104.2648
Epoch 3/5
[1m25/25[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7ms/step - loss: 102.1120 - mae: 102.6104 - val_loss: 102.6833 - val_mae: 103.1833
Epoch 4/5
[1m25/25[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 8ms/step - loss: 100.8235 - mae: 101.3217 - val_loss: 100.5299 - val_mae: 101.0299
Epoch 5/5
[1m25/25[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - loss: 98.3268 - mae: 98.8260 - val_loss: 96.7364 - val_mae: 97.2358

EXAMPLE 2: Model with custom Huber loss class
Epoch 1/5
[1m25/25[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 10ms/step - huber_metric_4: 6577.9766 - loss: 205.5618 - val_huber_metric_4: 6435.0059 - val_loss: 208.3313