https://towardsdatascience.com/predicting-individual-survival-curves-with-keras-abb1f1f051f

# Predicting individual survival curves with Keras

A Deep Learning adaptation of the Kaplan-Meier estimator for customer lifetime value models

TL;DR Survival analysis models are widely used in different areas ranging from medicine to e-commerce. There is increasing attention on how to develop individual survival functions rather than population ones, mainly with the use of Deep Learning frameworks. This post introduces a Deep Learning adaptation of one of the most common non-parametric approaches for population survival analysis, the Kaplan-Meier estimator.

## Introduction

In both research and industry, there is increasing interest in predicting individual survival functions, i.e. the probability that a given individual survives beyond any given time. Most of the existing methodologies for this task are either parametric or semi-parametric, while very few remain strictly non-parametric.

PyTorch implementations of the most popular Deep Learning models can be found in the pycox library, while scikit-survival and XGBoost provide other Machine Learning alternatives for survival regression, such as Random Forests and Gradient Boosting.

We introduce an adaptation of one of the most widely known non-parametric survival analysis methods, the Kaplan-Meier estimator, to predict individual survival functions. We achieve this with a Deep Learning variation of the Multi-Task Logistic Regression (MTLR) and N-MTLR. Our model differs in two ways: censored data is handled through sample weights, and the model’s output at each time period is the product of the previous period’s output and a sigmoid layer.

## Kaplan-Meier estimate

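Let $S(t)$ be the probability of surviving beyond $t$ time units, i.e. the survival function, where $T$ is the time of death:

$$S(t) = P(T > t)$$

By the definition of conditional probability, it can also be written as:

$$S(t) = P(T > t \mid T > t-1)\,P(T > t-1) = q(t)\,S(t-1)$$

where

$$q(t) = P(T > t \mid T > t-1)$$

and its estimator is given by:

$$\hat{S}(t) = \hat{S}(t-1)\left(1 - \frac{d_t}{n_t}\right)$$

with $n_t$ the number of individuals known to have survived up to time $t$ and $d_t$ the number of those individuals who died during time $t$.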

Putting it into words, the KM estimator at time t is the KM estimator at time t-1 multiplied by the proportion of individuals that haven’t died during time t among those who are known to have survived up to time t.
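The recursion described above can be sketched in a few lines of NumPy. This is a minimal illustration, not code from the original article: the function name `kaplan_meier`, its arguments, and the toy data are assumptions made for the example, and time is assumed to be measured in whole periods.

```python
import numpy as np

def kaplan_meier(durations, events, periods):
    """Discrete-time Kaplan-Meier estimate of S(1), ..., S(periods).

    durations[i]: number of periods individual i was observed (assumed input).
    events[i]:    1 if the individual died in their last observed period,
                  0 if they were still alive when observation ended (censored).
    """
    durations = np.asarray(durations)
    events = np.asarray(events)
    survival = []
    s = 1.0  # S(0) = 1: everyone is alive at the start
    for t in range(1, periods + 1):
        # Individuals known to have survived up to period t
        at_risk = np.sum(durations >= t)
        # Individuals among them who died during period t
        died = np.sum((durations == t) & (events == 1))
        if at_risk > 0:
            s *= 1.0 - died / at_risk  # S(t) = S(t-1) * q(t)
        survival.append(s)
    return np.array(survival)

# Toy data: six individuals observed for up to three periods
durations = [1, 2, 2, 3, 3, 3]
events = [1, 1, 0, 1, 0, 0]
curve = kaplan_meier(durations, events, periods=3)
print(curve)
```

Censored individuals still count in the denominator for every period they were observed, which is what lets the estimator use partial information.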

## Deep Learning adaptation

Our approach is quite simple:


* We represent the above recursion with a multi-output feed-forward neural network in which each output is the previous output multiplied by a sigmoid layer that represents the probability q(t).

* Censored data is handled with sample weights: for each output t, the sample weight is 1 if the individual’s starting date was at least t time periods ago, and 0 otherwise.
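To make the first point concrete, the following NumPy sketch mimics what the chained outputs compute, outside of any neural network. The logits are made-up numbers standing in for the pre-activation values of each sigmoid layer; only the cumulative-product structure is the point here.

```python
import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

# Hypothetical per-period logits for 2 individuals and 4 periods
logits = np.array([[2.0, 1.5, 1.0, 0.5],
                   [0.0, -0.5, 3.0, 3.0]])

# Each sigmoid plays the role of the conditional survival probability q(t)
q = sigmoid(logits)

# S(t) = S(t-1) * q(t) with S(0) = 1, i.e. a cumulative product over periods
survival = np.cumprod(q, axis=1)
print(survival)
```

Because every factor lies strictly between 0 and 1, each predicted curve is non-increasing in t by construction, which independent sigmoid outputs would not guarantee.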


What does the code look like?



Of course, the architecture is just a reference. The model’s fit method takes X with shape (n_samples, n_features), plus y and w, each with shape (n_samples, periods), built as explained above.
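One possible way to build y and w is sketched below. The helper `build_targets_and_weights` and its inputs are hypothetical: it assumes each individual is described by how many periods ago they started and by the period in which they died (0 if no death was observed). The last two lines split the matrices into one array per output, matching the `List[np.ndarray]` type hints of the `fit` method in the notebook code.

```python
import numpy as np

def build_targets_and_weights(periods_since_start, death_period, periods):
    """Return y and w, each with shape (n_samples, periods).

    periods_since_start[i]: how many periods ago individual i started (assumed input).
    death_period[i]:        period in which individual i died, or 0 if no death
                            has been observed (assumed encoding).
    """
    age = np.asarray(periods_since_start)[:, None]
    death = np.asarray(death_period)[:, None]
    t = np.arange(1, periods + 1)[None, :]
    # y[i, t] = 1 if individual i was still alive at the end of period t
    y = ((death == 0) | (death > t)).astype("float32")
    # w[i, t] = 1 if individual i started at least t periods ago, else 0,
    # so periods that have not been observed yet do not contribute to the loss
    w = (age >= t).astype("float32")
    return y, w

periods_since_start = [3, 1, 2]
death_period = [2, 0, 0]
y, w = build_targets_and_weights(periods_since_start, death_period, periods=3)

# One array per model output
y_list = [y[:, k] for k in range(3)]
w_list = [w[:, k] for k in range(3)]
```

In this toy example the second individual started one period ago, so only their first output carries weight; their labels for periods 2 and 3 are placeholders that the weights mask out.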

## Summary

There are several methods to fit a survival regression model, each with its benefits and drawbacks. In this article, I proposed a very straightforward method that uses TensorFlow’s flexibility to train a feed-forward Neural Network that produces individual survival curves without relying on strong assumptions. Conceptually, it is an adaptation of one of the most common survival analysis models: the Kaplan-Meier estimator.

In [1]:
from typing import List, Optional, Tuple

import numpy as np
import tensorflow as tf
# Use the backend bundled with TensorFlow so it matches the tf.keras layers below
from tensorflow.keras import backend as K



In [2]:


class TFModel:

    def __init__(self, periods: int = 2):
        # periods: number of time periods, i.e. number of model outputs
        print("Initializing Tensor Flow Model")
        self._periods = periods

    def build_model(
        self,
        input_shape: int,
        hidden_units: List[int],
        dropout: Optional[float] = None,
        activation: Optional[str] = None,
        kernel_regularizer: Optional[str] = None,
        kernel_constraint: bool = False,
        noise: Optional[float] = None,
        normalization: bool = False,
    ):
        # Reset global Keras state so repeated builds start from a clean graph
        tf.keras.backend.clear_session()
        inputs = tf.keras.Input(shape=(input_shape,))
        x = inputs
        for units in hidden_units:
            x = tf.keras.layers.Dense(
                units,
                activation=activation,
                kernel_regularizer=kernel_regularizer,
                kernel_constraint=tf.keras.constraints.UnitNorm() 
                    if kernel_constraint else None,
            )(x)
            x = tf.keras.layers.GaussianNoise(noise)(x) if noise else x
            x = tf.keras.layers.BatchNormalization()(x) if normalization else x
            x = tf.keras.layers.Dropout(dropout)(x) if dropout else x
        # One sigmoid head per period models the conditional survival
        # probability q(t); multiplying it by the previous output gives S(t).
        outputs = []
        for period in range(self._periods):
            o = tf.keras.layers.Dense(
                1,
                activation="sigmoid",
                kernel_regularizer=kernel_regularizer,
                kernel_constraint=tf.keras.constraints.UnitNorm()
                    if kernel_constraint else None,
            )(x)
            if period > 0:
                o = tf.keras.layers.Multiply()([o, outputs[period - 1]])
            outputs.append(o)
        self.model = tf.keras.Model(inputs=inputs, outputs=outputs)
        # summary() prints the table itself and returns None
        self.model.summary()

    def fit(
        self,
        X_train: np.ndarray,
        y_train: List[np.ndarray],
        w_train: List[np.ndarray],
        validation_data: Tuple[np.ndarray, List[np.ndarray], List[np.ndarray]],
        epochs: int = 100,
        batch_size: int = 256,
        patience: int = 10,
    ):
        self.model.compile(optimizer="Adam", loss="binary_crossentropy")
        callback = tf.keras.callbacks.EarlyStopping(
            monitor="val_loss", patience=patience
        )
        self.model.fit(
            X_train,
            y_train,
            sample_weight=w_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=validation_data,
            callbacks=[callback],
        )

In [3]:
mytfmodel = TFModel()

Initializing Tensor Flow Model


In [4]:
mytfmodel.build_model(input_shape=50000, hidden_units=[128, 64])

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 50000)]              0         []                            
                                                                                                  
 dense (Dense)               (None, 128)                  6400128   ['input_1[0][0]']             
                                                                                                  
 dense_1 (Dense)             (None, 64)                   8256      ['dense[0][0]']               
                                                                                                  
 dense_2 (Dense)             (None, 1)                    65        ['dense_1[0][0]']             
                                                                                              

In [5]:
#mytfmodel.fit()