In [4]:
import numpy as np

import tensorflow as tf
from tensorflow import keras
import tensorflow_probability as tfp
from keras import layers, initializers, optimizers, losses

from tensorflow.keras.callbacks import Callback
from tqdm.keras import TqdmCallback
from tqdm import tqdm

2025-10-13 17:57:03.684211: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1760389023.701024    9340 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1760389023.706085    9340 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1760389023.719552    9340 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1760389023.719579    9340 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1760389023.719582    9340 computation_placer.cc:177] computation placer alr

In order for we to not have to import tensorflow every time we want to test something. Development for the models will be made here and then migrated to the proper .py files.


In [247]:
class FrailtyModelNN(keras.models.Model):

    def __init__(self, parameters, loglikelihood, input_dim = None, seed = None):
        super().__init__()
        self.parameters = {
            "theta": {"link": lambda x : tf.math.exp(x), "link_inv": lambda x : tf.math.log(x), "par_type": "nn", "update_func": None, "shape": None, "initializer": None},
            "tau": {"link": lambda x : tf.math.exp(x), "link_inv": lambda x : tf.math.log(x), "par_type": "nn", "update_func": None, "shape": 5, "initializer": None},
            "alpha": {"link": lambda x : 1/(1+tf.math.exp(-x)), "link_inv": lambda x : tf.math.log(x) - tf.math.log(1-x), "par_type": "independent", "update_func": None, "shape": None, "init": 0.5},
            "gamma": {"link": lambda x : tf.math.exp(x), "link_inv": lambda x : tf.math.log(x), "par_type": "independent", "update_func": None, "shape": None, "init": 1.0},
            "phi1": {"link": lambda x : tf.math.exp(x), "link_inv": lambda x : tf.math.log(x), "par_type": "independent", "update_func": None, "shape": None, "init": 1.0},
            "phi2": {"link": lambda x : tf.identity(x), "link_inv": lambda x : tf.identity(x), "par_type": "independent", "update_func": None, "shape": None, "init": 0.0}
        }
        self.n_acum_step = tf.Variable(0, dtype = tf.int32, trainable = False)

        if(input_dim is not None):
            self.define_structure(input_dim, seed)


    def clone_architecture(self, model, input_dim):
        pass
    
    def define_structure(self, input_dim, seed):

        # Goes through the list of parameters for the model and filter them by their classes:
        # - "nn" will be treated as an output from a given neural network that receives the variables x as input.
        # - "independet" will be treated an an individual tf.Variable, trainable object. It is still trained in tensorflow, but is constant for all subjects
        # - "fixed" will be treated as a non-trainable tf.Variable. Basically just a known constant.
        # - "manual" will be treated as a non-trainable tf.Variable, but its value will be eventually updated manually using user provided functions (useful in cases where closed forms can be obtained)
        # - "dependent" will be treated simply as a deterministic function of other parameters and will be updated after training

        self.nn_pars = []
        self.independent_pars = []
        self.fixed_pars = []
        self.manual_pars = []
        for parameter in self.parameters:
            par = self.parameters[parameter]
            if(par["par_type"] == "nn"):
                self.nn_pars.append( parameter )
            elif(par["par_type"] == "independent"):
                self.independent_pars.append( parameter )
            elif(par["par_type"] == "fixed"):
                self.fixed_pars.append( parameter )
            elif(par["par_type"] == "manual"):
                self.manual_pars.append( parameter )
            else:
                raise Exception("Invalid parameter {} type: {}".format(parameter, par["par_type"]))

        # If at least one parameter is to be modeled as a neural network output, define its architecture here
        if( len(self.nn_pars) > 0 ):
            # Try to implement a function that, given a model, clone its architecture
            # clone_architecture(given_model)

            # The user provided architecture must output an array with the exact same shape as the concatenation of all "nn" parameters.
            # For example, if theta (nn) is a single constant and alpha (nn) is an array with 3 values, the expected output for the neural network
            # component is 4. For that, these parameters must be at most one dimensional arrays. (No matrix, or more complex structured parameters!)
            initializer = initializers.GlorotNormal(seed = seed)
            self.dense1 = keras.layers.Dense(units = 16, activation = "gelu", kernel_initializer = initializer, dtype = tf.float32, name = "dense1")
            self.dense2 = keras.layers.Dense(units = 6, kernel_initializer = initializer, dtype = tf.float32, activation = None, use_bias = False, name = "output")

        # Dictionary with all parameters that are its individual weights
        self.model_variables = {}

        # Include variables that do not depend on the variables x, but are still trained by tensorflow
        for parameter in self.independent_pars:
            par = self.parameters[parameter]

            # If shape is None, set it to ()
            if(par["shape"] is None):
                par["shape"] = ()

            raw_init = par["link_inv"]( par["init"] )

            # Name for the new, transformed parameter
            raw_parameter = "raw_" + parameter
            self.model_variables[raw_parameter] = self.add_weight(
                name = raw_parameter,
                shape = par["shape"],
                initializer = keras.initializers.Constant( raw_init ),
                trainable = True,
                dtype = tf.float32
            )

        # Include variables that are not trained by tensorflow (known, fixed constants or manual trained variables)
        for parameter in np.concatenate([self.fixed_pars, self.manual_pars]):
            par = self.parameters[parameter]
            self.model_variables[parameter] = self.add_weight(
                name = parameter,
                shape = par["shape"],
                initializer = keras.initializers.Constant( par["initializer"] ),
                trainable = False,
                dtype = tf.float32
            )

        # Organize trainable variables information, so each variable can get mapped to an index in the self.trainable_variables and its gradients
        self.vars_to_index = {}
        # Before we build the model, the only variables that appear in here are the ones corresponding to "independent" parameters
        for i, var in enumerate(self.trainable_variables):
            # From the variable path, get its name (raw_<variable>)
            var_name = var.path.split("/")[-1]
            # Save its corresponding index
            self.vars_to_index[var_name] = i

        nn_par_index = 0
        # We must also include in this list the indices for "nn" parameters
        for i, parameter in enumerate(self.nn_pars):
            par = self.parameters[ parameter ]
            if(par["shape"] is None):
                par_shape = 1
            else:
                # The parameter must be at most a 1-dimensional array, whose indices will be saved for future location in the neural network output results
                par_shape = par["shape"]
            
            self.vars_to_index["raw_" + parameter] = tf.constant( np.arange(nn_par_index, nn_par_index+par_shape), dtype = tf.int32 )
            nn_par_index += par_shape


        # Once the entire structure has been defined, force the model to build all the weights properly
        dummy_input = keras.Input(input_dim)
        self(dummy_input)


        # In the future, it might be interesting to allow the user to specify an optimizer for each single parameter in the model.
        # For now, they will specify one for the independent parameters and other for the neural network weights

        # Not that the model is built and all the trainable variables instantiated, we define the gradient variables
        self.gradient_accumulation_independent_pars = [
            tf.Variable(tf.zeros_like(v, dtype = tf.float32), trainable = False) for v in self.trainable_variables[ :len(self.independent_pars) ]
        ]

        # The gradient values for the neural network component always comes right after the weights for the independent parameters
        self.gradient_accumulation_nn = [
            tf.Variable(tf.zeros_like(v, dtype = tf.float32), trainable = False) for v in self.trainable_variables[ len(self.independent_pars): ]
        ]


    def call(self, x_input):
        x = self.dense1(x_input)
        return self.dense2(x)

    def get_variable(self, parameter, nn_output = None):
        """
            Once that all variables have been properly defined and mapped, this method uses their proper link functions to transform from
            the variables 'raw' state into their proper values used in the likelihood.

            If nn_output is passed, we automatically assume that the parameter is an output from the neural network and proceed by taking its
            value differently than if it was an independent parameter.
        """
        # Get the raw name for that parameter
        raw_parameter = "raw_" + parameter
        # Filter the desired parameter from the list
        par = self.parameters[parameter]

        # If nn_output is None, assume the parameter is independent from the data x and get it directly as a transformed weight
        if(nn_output is None):            
            # Get the transformed parameter from its raw version, considering its proper link function
            return par["link"]( self.model_variables[raw_parameter] )
        
        # If nn_output is not None, assume the parameter came as a neural network output and return it from its positions in the output
        return par["link"]( tf.gather(nn_output, self.vars_to_index[raw_parameter], axis = 1) )

    def loss_func(self, variables, nn_output, t, delta):
        """
            This is an example of a correctly defined loss function.

            - eta: Corresponds to the output of the neural network with input x - Corresponds to the 
        """

        theta = self.get_variable("theta", nn_output)
        tau = self.get_variable("theta", nn_output)

        alpha = self.get_variable("alpha")
        gamma = self.get_variable("gamma")
        phi1 = self.get_variable("phi1")
        phi2 = self.get_variable("phi2")

        # log_f0 = tf.math.log(phi1) + (phi1-1)*tf.math.log(y) + phi2 - tf.math.exp(phi2) * tf.math.pow(y, phi1)
        # F0 = 1 - tf.math.exp(-tf.math.pow(y, phi1) * tf.math.exp(phi2))
        # laplace_transform_term = 1 + gamma*theta*F0/(1-alpha)

        # loss_weights = delta*eta + delta*log_f0 + (1-alpha)/(alpha*gamma)*(1 - tf.math.pow(laplace_transform_term, alpha)) + (alpha-1)*delta*tf.math.log(laplace_transform_term)
        # loss_weights_mean = -tf.math.reduce_mean(loss_weights)
        
        # return loss_weights_mean

        return tf.constant( (phi2 - 3)**2 )

    def train_step(self, data):
        """
            Called by each batch in order to evaluate the loglikelihood and accumulate the parameters gradients using training data.
        """
        x, t, delta = data

        self.n_acum_step.assign_add(1)
        with tf.GradientTape() as tape:
            nn_output = self(x, training = True)
            likelihood_loss = self.loss_func(variables = self.model_variables, nn_output = nn_output, t = t, delta = delta)

        # The first weights are always destined to the fixed parameters
        # The neural network related weights comes after those in the self.trainable_variables object
        gradients = tape.gradient(likelihood_loss, self.trainable_variables)

        # If the loss does not depend on a specific parameter, its corresponding gradient will be None
        # To avoid crash problems in that case, we simply replace None with a zero like gradient, so those weights do not get updated
        # It is the user's responsibility to build a loss that depends on all the trainable parameters, but we allow that to happen in this case
        # for generality and to avoid unneccessary crashes when testing new models
        gradients = [
            g if g is not None else tf.zeros_like(v)
            for g, v in zip(gradients, self.trainable_variables)
        ]

        independent_gradients = gradients[ :len(self.independent_pars) ]
        nn_gradients = gradients[ len(self.independent_pars): ]

        for i in range( len(self.gradient_accumulation_independent_pars) ):
            self.gradient_accumulation_independent_pars[i].assign_add( independent_gradients[i] )

        for i in range( len(self.gradient_accumulation_nn) ):
            self.gradient_accumulation_nn[i].assign_add( nn_gradients[i] )

        tf.cond(tf.equal(self.n_acum_step, self.gradient_accumulation_steps), self.apply_accumulated_gradients, lambda: None)

        return {"likelihood_loss": likelihood_loss}

    def apply_accumulated_gradients(self):
        # ----------------------------------- Independent parameters component -----------------------------------
        # Apply the accumulated gradients to the trainable variables
        self.optimizer_independent_pars.apply_gradients( zip(self.gradient_accumulation_independent_pars, self.trainable_variables[ :len(self.independent_pars) ]) )
        # Resets all the cumulated gradients to zero
        for i in range(len(self.gradient_accumulation_nn)):
            self.gradient_accumulation_independent_pars[i].assign(tf.zeros_like(self.trainable_variables[ :len(self.independent_pars) ][i], dtype = tf.float32))
        
        # ----------------------------------- Neural network component -----------------------------------
        self.optimizer_nn.apply_gradients( zip(self.gradient_accumulation_nn, self.trainable_variables[ len(self.independent_pars): ]) )
        # Resets all the cumulated gradients to zero
        for i in range(len(self.gradient_accumulation_nn)):
            self.gradient_accumulation_nn[i].assign(tf.zeros_like(self.trainable_variables[ len(self.independent_pars): ][i], dtype = tf.float32))

        # Reset the gradient accumulation steps counter to zero
        self.n_acum_step.assign(0)

    def compile_model(self, optimizer_independent_pars, optimizer_nn, run_eagerly):
        """
            Defines the configuration for the model, such as batch size, training mode, early stopping.
        """
        # optimizers.Adam(learning_rate = learning_rate, gradient_accumulation_steps = None),
        self.optimizer_independent_pars = optimizer_independent_pars
        self.optimizer_nn = optimizer_nn
        self.compile(
            run_eagerly = run_eagerly
        )

    def train_model(self, x, t, delta,
                    epochs, shuffle = True,
                    optimizer_independent_pars = optimizers.Adam(learning_rate = 0.001),
                    optimizer_nn = optimizers.Adam(learning_rate = 0.001),
                    train_batch_size = None, val_batch_size = None,
                    buffer_size = 4096, gradient_accumulation_steps = None,
                    early_stopping = True, early_stopping_min_delta = 0.0, early_stopping_patience = 10, early_stopping_warmup = 0,
                    run_eagerly = True, verbose = 1):
        """
            This is the function that start the training.
        """
        
        # Pass the input variables to tensorflow default types
        x = tf.cast(x, dtype = tf.float32)
        t = tf.cast(t, dtype = tf.float32)
        delta = tf.cast(delta, dtype = tf.float32)
        
        # If input is a vector, transform it into a column
        if(len(x.shape) == 1):
            x = tf.reshape( x, shape = (len(x), 1) )
        if(len(t.shape) == 1):
            t = tf.reshape( t, shape = (len(t), 1) )
        if(len(delta.shape) == 1):
            delta = tf.reshape( delta, shape = (len(delta), 1) )

        # Salva os dados originais
        self.x = x
        self.t = t
        self.delta = delta

        # If no validation step should be taken, training data is the same as validation data
        self.x_train, self.t_train, self.delta_train = self.x, self.t, self.delta
        self.x_val, self.t_val, self.delta_val = self.x, self.t, self.delta
        
        # Declara os callbacks do modelo
        self.callbacks = [ ]
        
        if(verbose >= 1):
            self.callbacks.append( TqdmCallback(verbose = 0, position = 0, leave = True) )
        
        if(early_stopping):
            # Avoids overfitting and speeds training
            metric = "likelihood_loss"
            es = keras.callbacks.EarlyStopping(monitor = metric,
                                               mode = "min",
                                               start_from_epoch = early_stopping_warmup,
                                               min_delta = early_stopping_min_delta,
                                               patience = early_stopping_patience,
                                               restore_best_weights = True)
            self.callbacks.append(es)

        # If batch_size is unspecified, set it to be the training size. Note that decreasing the batch size to smaller values, such as 500 for example, has previously lead the
        # model to converge too early, leading to a lot of time of investigation. When dealing with neural networks in the statistical models context, we recommend to use a single
        # batch in training. Alternatives in the case that the sample is too big might be to consider a "gradient accumulation" approach.
        self.train_batch_size = train_batch_size
        if(self.train_batch_size is None):
            self.train_batch_size = self.x_train.shape[0]

        self.val_batch_size = val_batch_size
        if(self.val_batch_size is None):
            self.val_batch_size = self.x_val.shape[0]
        
        self.gradient_accumulation_steps = gradient_accumulation_steps
        if(self.gradient_accumulation_steps is None):
            # The number of batches until the actual weights update (we ensure that the weights are updated only once per epoch, even though we might have multiple batches)
            self.gradient_accumulation_steps = int(np.ceil( self.x_train.shape[0] / self.train_batch_size ))

        self.compile_model(optimizer_independent_pars = optimizer_independent_pars, optimizer_nn = optimizer_nn, run_eagerly = run_eagerly)

        # Create the training dataset
        self.buffer_size = buffer_size
        train_dataset = tf.data.Dataset.from_tensor_slices((self.x_train, self.t_train, self.delta_train))
        train_dataset = train_dataset.shuffle(buffer_size = self.buffer_size).batch(self.train_batch_size).prefetch(tf.data.AUTOTUNE)
        
        self.fit(
            train_dataset,
            epochs = epochs,
            verbose = 0,
            callbacks = self.callbacks,
            batch_size = self.train_batch_size,
            shuffle = shuffle
        )
        

In [249]:
model = FrailtyModelNN(None, None, input_dim = (None, 1), seed = 10)

np.random.seed(10)
x = np.random.normal(size = 10)
t = np.random.exponential(size = 10)
delta = np.random.binomial(n = 1, p = 0.5, size = 10)

model.train_model(x, t, delta,
                  epochs = 100, shuffle = True,
                  optimizer_independent_pars = optimizers.Adam(learning_rate = 0.01),
                  optimizer_nn = optimizers.Adam(learning_rate = 0.01),
                  train_batch_size = None, val_batch_size = None,
                  buffer_size = 4096, gradient_accumulation_steps = None,
                  run_eagerly = True, verbose = 1)

0epoch [00:00, ?epoch/s]

In [250]:
model.get_weights()

[0.0,
 0.0,
 0.0,
 1.3184538,
 array([[ 0.05222978, -0.42660013,  0.15580629, -0.03946069,  0.4755298 ,
          0.48080763,  0.4745159 , -0.23191053,  0.25770208,  0.45803365,
          0.13718636,  0.53897285,  0.42468554,  0.10244699,  0.2735144 ,
         -0.3586141 ]], dtype=float32),
 array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       dtype=float32),
 array([[ 4.59125452e-02, -3.75002444e-01,  1.36961371e-01,
         -3.46878804e-02,  4.18014020e-01,  4.22653496e-01],
        [ 4.17122751e-01, -2.03860730e-01,  2.26532787e-01,
          4.02634084e-01,  1.20593548e-01,  4.73783612e-01],
        [ 3.73319447e-01,  9.00559351e-02,  2.40432560e-01,
         -3.15239400e-01,  4.11400080e-01, -4.99688447e-01],
        [-1.54003322e-01,  3.98376286e-01, -6.77022561e-02,
         -6.87515596e-03,  2.22874284e-02,  6.38603926e-01],
        [-2.10074767e-01,  5.15375519e-03, -6.19977772e-01,
          4.71546680e-01,  2.97671575e-02,  3.47635865e-01],
       

In [203]:
tf.math.exp( tf.gather( model.predict( x[:,None] ), [0], axis = 1 ) )

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step


<tf.Tensor: shape=(10, 1), dtype=float32, numpy=
array([[0.4917291 ],
       [0.71194553],
       [1.4696095 ],
       [1.0033522 ],
       [0.7488525 ],
       [1.2633488 ],
       [0.8924959 ],
       [0.9562993 ],
       [0.99828273],
       [1.0688634 ]], dtype=float32)>

In [None]:
model = FrailtyModelNN(None, None)

model.define_structure()

x = np.random.normal(size = 10)
t = np.random.exponential(size = 10)
delta = np.random.binomial(n = 1, p = 0.5, size = 10)

model.train_model(x, t, delta,
                  epochs = 10, shuffle = True,
                  optimizer = optimizers.Adam(learning_rate = 0.001),
                  train_batch_size = None, val_batch_size = None,
                  buffer_size = 4096, gradient_accumulation_steps = None,
                  run_eagerly = True, verbose = 1)

0epoch [00:00, ?epoch/s]

raw_theta: [[-0.00948396]
 [-0.0276411 ]
 [ 0.18126766]
 [ 0.09189229]
 [ 0.17733589]
 [-0.04042894]
 [ 0.00667084]
 [ 0.12130801]
 [ 0.37508252]
 [-0.03350516]]
raw_tau: [[ 0.37486914  0.3583766  -0.00391461  0.11909363  0.413968  ]
 [ 0.05663535  0.01132016  0.04765547  0.05542642  0.06414704]
 [-0.12494157  0.17425172 -0.30946922 -0.29076833 -0.13145213]
 [-0.08244592  0.06923717 -0.15583271 -0.15276313 -0.08781268]
 [-0.12342028  0.1692943  -0.30262083 -0.28478065 -0.12984766]
 [ 0.13562647  0.06976443  0.0701836   0.0966004   0.15561514]
 [-0.00924998  0.00173164 -0.01137376 -0.01205722 -0.01022647]
 [-0.09860086  0.10167401 -0.20598343 -0.19873121 -0.10428607]
 [-0.1791718   0.436904   -0.66195965 -0.58252394 -0.20213403]
 [ 0.2389637   0.18477742  0.05500455  0.11640279  0.27284223]]
alpha: 0.5
gamma: 1.0
phi1: 1.0
phi2: 0.0


ValueError: Attempt to convert a value ({'raw_alpha': <Variable path=frailty_model_nn_7/raw_alpha, shape=(), dtype=float32, value=0.0>, 'raw_gamma': <Variable path=frailty_model_nn_7/raw_gamma, shape=(), dtype=float32, value=0.0>, 'raw_phi1': <Variable path=frailty_model_nn_7/raw_phi1, shape=(), dtype=float32, value=0.0>, 'raw_phi2': <Variable path=frailty_model_nn_7/raw_phi2, shape=(), dtype=float32, value=0.0>}) with an unsupported type (<class 'keras.src.utils.tracking.TrackedDict'>) to a Tensor.

In [87]:
model.trainable_variables[0].path.split("/")[-1]

'raw_alpha'

In [53]:
model.predict(x[:,None])

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step


array([[-1.7739912 ],
       [-0.86790764],
       [-0.7947914 ],
       [ 0.41015804],
       [ 0.97464114],
       [ 0.04176952],
       [ 1.498434  ],
       [-0.72059375],
       [-0.85633576],
       [ 0.35985187]], dtype=float32)