In [None]:
from typing import Any, Dict, Iterable, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf

# Generate data

In [None]:
N_COVAR = 128

In [None]:
def gen_data(
  n: int,
  base_rate=0.50,
  cens_prop=0.20,
  n_covar=N_COVAR,
) -> Dict[str, np.ndarray]:
  """Generate data.
  
  Args:
    n: Sample size.
    base_rate: Base event rate.
    cens_prop: Expected censoring proportion.
    n_covar: Number of covariates.
    n_freq: Number of frequencies.
  
  """

  # Covariates.
  x = np.random.rand(n, n_covar)
  
  # Linear predictor.
  coef = np.random.randn(n_covar)
  eta = np.dot(x, coef)
  eta = (eta - np.mean(eta)) / np.std(eta)

  # Time-to-event.
  event_rate = base_rate * np.exp(eta)
  event_time = np.random.exponential(scale=1/event_rate, size=len(event_rate))

  cens_rate = cens_prop / (1 - cens_prop) * event_rate 
  cens_time = np.random.exponential(scale=1/cens_rate, size=len(cens_rate))

  status = (event_time <= cens_time)
  time = np.where(status, event_time, cens_time)

  # Target matrix.
  y = np.stack((status, time), axis=1)

  # Output.
  return {
    "x": x,
    "risk": event_rate,
    "status": status,
    "time": time,
    "y": y,
  }

In [None]:
def split_data(
  data: Dict[str, np.ndarray],
  train_prop: float = 0.6,
  val_prop: float = 0.2,
) -> Dict[str, np.ndarray]:

  n = len(data["time"])
  test_prop = 1 - (train_prop + val_prop)
  assert test_prop >= 0

  n_train = int(n * train_prop)
  n_val = int(n * val_prop)
  n_test = int(n * test_prop)

  out = {}
  for key in data.keys():
    out[f"train_{key}"] = data[key][:n_train]
    out[f"val_{key}"] = data[key][n_train:(n_train + n_val)]
    out[f"test_{key}"] = data[key][(n_train + n_val):]
  
  return out

## Overall

In [None]:
class PrepData:

  def __init__(
    self,
    n: int,
    base_rate=0.50,
    batch_size=128,
    cens_prop=0.20,
    n_covar=N_COVAR,
    train_prop=0.6,
    val_prop=0.2,
  ) -> None:
    self.data = gen_data(n, base_rate, cens_prop, n_covar)
    self.split_data = split_data(self.data, train_prop, val_prop)
    self.n = n
    self.base_rate = base_rate
    self.cens_prop = cens_prop
    self.n_covar = n_covar
  
  def get_orig_data(self) -> Dict[str, np.ndarray]:
    return self.data
  
  def get_split_data(self) -> Dict[str, np.ndarray]:
    return self.split_data

In [None]:
class DataGenerator:

  def __init__(
    self,
    x: np.ndarray,
    y: np.ndarray,
    batch_size=128,
  ):
    self.n = x.shape[0]
    self.x = x
    self.y = y  
    self.batch_size = batch_size
    self.n_covar = x.shape[1]
    self.steps_per_epoch = self.n // batch_size
   
  def get_batch(
      self, index: np.ndarray) -> Tuple[np.ndarray, Tuple[np.ndarray]]:
      x = self.x[index]
      y = self.y[index]
      return x, y
  
  def generator(self) -> Iterable[Tuple[np.ndarray, Tuple[np.ndarray]]]:
    index = np.arange(self.n)
    for b in range(self.steps_per_epoch):
      start = b * self.batch_size
      idx = index[start:(start + self.batch_size)]
      yield self.get_batch(idx)
  
  def make_dataset(self) -> tf.data.Dataset:
    """Create dataset from generator."""
    ds = tf.data.Dataset.from_generator(
      self.generator,
      output_signature=(
        tf.TensorSpec(shape=(self.batch_size, self.n_covar), dtype=tf.float32),
        tf.TensorSpec(shape=(self.batch_size, 2), dtype=tf.float32)
      )
    )
    return ds
  
  def __call__(self) -> tf.data.Dataset:
    return self.make_dataset()

In [None]:
def prep_datasets(
    split_data: Dict[str, np.ndarray],
    batch_size=128,
  ) -> Dict[str, tf.data.Dataset]:
  sets = ["train", "val", "test"]
  out = {}
  for key in sets:
    x = split_data[f"{key}_x"]
    y = split_data[f"{key}_y"]
    data_fn = DataGenerator(x, y, batch_size)
    ds = data_fn()
    out[f"{key}"] = ds
  return out

# Kaplan-Meier

In [None]:
def kaplan_meier(status: np.ndarray, time: np.ndarray) -> pd.DataFrame:
  """Taublate Kaplan-Meier Curve."""
  
  # Add 0 if not present.
  unique_times = np.sort(np.unique(time))
  if not (0 in unique_times):
    unique_times = np.insert(unique_times, 0, 0)
  n_unique_time = len(unique_times)

  at_risk = []
  events = []
  censorings = []

  for u in unique_times:
    at_risk.append(np.sum(time >= u))

    events.append(np.sum((time == u) & status))
    censorings.append(np.sum((time == u) & ~status))

  out = pd.DataFrame({
    "time": unique_times,
    "n_at_risk": at_risk,
    "n_event": events,
    "n_cens": censorings,
  })
  out["haz"] = out.n_event / out.n_at_risk
  out["surv"] = np.cumprod(1 - out.haz)
  return out  

## Estimator

In [None]:
class KaplanMeier:

  def __init__(self, status: np.ndarray, time: np.ndarray) -> None:
    self.status = status
    self.time = time
    self.km = kaplan_meier(self.status, self.time)
  
  def return_table(self) -> pd.DataFrame:
    return self.km
    
  def __call__(self, x: float) -> float:
    km = self.km
    return km.surv[np.max(np.where(km.time <= x))]

In [None]:
# Fit Kaplan-Meier.
km = KaplanMeier(
  status = np.array([True, False, True, False]),
  time = np.arange(1, 5)
)
km.return_table()

Unnamed: 0,time,n_at_risk,n_event,n_cens,haz,surv
0,0,4,0,0,0.0,1.0
1,1,4,1,0,0.25,0.75
2,2,3,0,1,0.0,0.75
3,3,2,1,0,0.5,0.375
4,4,1,0,1,0.0,0.375


In [None]:
# Evaluate Kaplan-Meier.
km(1.1)

0.75

# C-statistic

* Reference: [On the C-statistics for Evaluating Overall Adequacy of Risk Prediction Procedures with Censored Survival Data](https://www.ncbi.nlm.nih.gov/pmc/articles/PMC3079915/).

In [None]:
def cstat_caculation(km, risk, status, tau, time):  
  n = len(risk)
  upper = 0
  lower = 0

  for i in range(n):
    di, ti, ri = status[i], time[i], risk[i]
    if not di:
      continue

    for j in range(n):
      tj, rj = time[j], risk[j]

      if (ti < tj) & (ti < tau):
        p_cens = np.squeeze(km(ti))
        weight = 1 / (p_cens ** 2)
        upper += weight * (ri > rj)
        lower += weight
  
  return upper, lower

In [None]:
class Cstat:
  """Calculate concordance.
  
  Note that the Kaplan-Meier curve of the censoring distribution may be fit
  using different data from that used to calculate the C-statistic.
  
  """

  def __init__(self, status: np.ndarray, time: np.ndarray, tau=None) -> None:
    self.km = KaplanMeier(~status, time)
    if not tau:
      tau = np.percentile(time, q=95)
    self.tau = tau
  
  def __call__(
    self,
    risk: np.ndarray,
    status: np.ndarray,
    time: np.ndarray,
    tau=None
  ) -> float:

    upper, lower = cstat_caculation(self.km, risk, status, self.tau, time)
    return upper / lower if lower > 0 else 0.5

## Testing

In [None]:
data_fn = PrepData(n=100)
data = data_fn.get_split_data()

In [None]:
cstat = Cstat(data["train_status"], data["train_time"])

In [None]:
cstat(data["train_risk"], data["train_status"], data["train_time"])

0.7507647088771979

In [None]:
cstat(data["val_risk"], data["val_status"], data["val_time"])

0.8100961454298465

## Callback

* See [Examples of Keras callback applications](https://www.tensorflow.org/guide/keras/custom_callback#examples_of_keras_callback_applications).

In [None]:
class CstatCallback(tf.keras.callbacks.Callback):

  def __init__(
    self,
    train_status: np.ndarray, 
    train_time: np.ndarray,
    val_status: np.ndarray,
    val_time: np.ndarray,
    val_x: np.ndarray,
    min_epochs=10,
    patience=10,
  ) -> None:
    super(CstatCallback, self).__init__()
    self.cstat = Cstat(train_status, train_time)
    self.val_x = val_x
    self.val_status = val_status
    self.val_time = val_time
  
    self.min_epochs = min_epochs
    self.patience = patience
    self.best_weights = None
    self.best_cstat = 0
    self.wait = 0

    self.history = {"epoch": [], "cstat": []}
  
  def on_epoch_end(self, epoch, logs=None):
    if epoch >= self.min_epochs:

      # Calculate C-statistic.
      risk = self.model.predict(self.val_x)
      risk = np.squeeze(np.array(risk))
      result = self.cstat(risk, self.val_status, self.val_time)
      print(f"Epoch {epoch}, validation cstat: {result:.4f}")
      self.history["epoch"].append(epoch)
      self.history["cstat"].append(result)

      if result > self.wait:
        self.best_cstat = result
        self.best_weights = self.model.get_weights()
        self.wait = 0
      else:
        self.wait += 1
        if self.wait >= self.patience:
          self.stopped_epoch = epoch
          self.model.stop_training = True
          print("Restoring model weights from the end of the best epoch.")
          self.model.set_weights(self.best_weights)
    
  def get_history(self) -> pd.DataFrame:
    return pd.DataFrame(self.history)

# Proportional hazards loss

In [None]:
class CoxLoss(tf.keras.losses.Loss):

  def __init__(self, **kwargs):
    super().__init__(**kwargs) 
  
  def call(
      self,
      y_true: Tuple[tf.Tensor],
      y_pred: tf.Tensor
  ) -> tf.Tensor:
    """Calculate Cox PH Loss.
    
    Args:
      y_true: (status, time).
      y_pred: risk.

    """
    
    # Note: autograph requires unpacking using indices.
    status = tf.cast(y_true[:, 0], dtype=bool)

    # Absent any events in the batch, there is no loss contribution.
    if not tf.math.reduce_any(status):
      return tf.constant(0.0, dtype=tf.float32)

    time = tf.cast(y_true[:, 1], dtype=tf.float32)
    y_pred = tf.cast(y_pred, tf.float32)
    # assert status.shape == time.shape
    
    # Matrix where `at_risk[i, j] = True` if subject j is at risk
    # at the event time for subject i. 
    at_risk = tf.map_fn(
      lambda x: (time >= x), time, fn_output_signature=bool)
    
    risk_score = tf.squeeze(y_pred)
    risk_score_mat = tf.math.multiply(
        tf.ones((time.shape[0], time.shape[0]), dtype=tf.float32), risk_score)
    
    # Only at-risk subjects contribute to the denominator.
    # Note: logsumexp is implemented manually because tf.reduce_logsumexp
    # gives an error when used with graph execution.
    max_score = tf.reduce_max(risk_score)
    risk_score_mat = tf.subtract(risk_score_mat, max_score)
    risk_sets = tf.ragged.boolean_mask(risk_score_mat, at_risk)
    
    set_exp = tf.math.exp(risk_sets)
    set_sum = tf.reduce_sum(set_exp, axis=1)
    denom = tf.add(tf.math.log(set_sum), max_score)

    # The log-likelihood only increments at event times.
    diff = tf.subtract(risk_score, denom)
    return -1 * tf.reduce_mean(tf.ragged.boolean_mask(diff, status))


## Testing

In [None]:
def logsumexp(x: np.ndarray) -> np.ndarray:
  delta = np.max(x)
  y = x - delta
  return delta + np.log(np.sum(np.exp(y)))

In [None]:
if False:
  time = tf.constant(np.array([1, 2, 3]), dtype=tf.float32)
  status = tf.constant(np.array([True, False, True]), dtype=bool)
  risk_score = tf.constant(np.array([3, 2, 1]), dtype=tf.float32)
  at_risk = tf.map_fn(lambda x: (time >= x), time, dtype=bool)

  # Denominator calculation.
  risk_score_mat = tf.math.multiply(
      tf.ones_like(at_risk, dtype=risk_score.dtype), risk_score)
  denom = tf.reduce_logsumexp(
      tf.ragged.boolean_mask(risk_score_mat, at_risk), axis=1)

  exp_denom = np.array([
      logsumexp([3., 2., 1.]),
      logsumexp([2., 1.]),
      logsumexp([1.])
  ])
  assert np.allclose(denom.numpy(), exp_denom)

  # Overall calculation.
  y_true = (status, time)
  y_pred = risk_score
  loss_fn = CoxLoss()
  obs = loss_fn(y_true, y_pred)
  exp = -1 * np.sum(
      status.numpy() * (risk_score.numpy() - exp_denom)) / np.sum(status.numpy())
  assert np.allclose(obs, exp)

# Modeling

## Model architecture

In [None]:
def linear_model() -> tf.keras.Model:
  input = tf.keras.layers.Input(shape=(N_COVAR,), name="input", dtype=tf.float32)
  output = tf.keras.layers.Dense(1, name="output")(input)
  model = tf.keras.Model(input, output, name="model")
  return model

In [None]:
def dropout_model() -> tf.keras.Model:
  input = tf.keras.layers.Input(shape=(N_COVAR,), name="input")
  h = tf.keras.layers.Dense(
    32,
    activation="relu",
    kernel_regularizer=tf.keras.regularizers.L2(),
    name="dense1"
  )(input)
  h = tf.keras.layers.Dropout(0.50, name="drop")(h)
  h = tf.keras.layers.Dense(
    32,
    activation="relu",
    kernel_regularizer=tf.keras.regularizers.L2(),
    name="dense2"
  )(h)
  output = tf.keras.layers.Dense(1, name="output")(h)
  model = tf.keras.Model(input, output, name="model")
  return model

## Data sets

In [None]:
data_fn = PrepData(n=128 * 10)

## Built-in training

### From tensors

In [None]:
# Model.
model = linear_model()

In [None]:
data = data_fn.get_split_data()
train_x = tf.constant(data["train_x"], dtype=tf.float32)
train_y = tf.constant(data["train_y"], dtype=tf.float32)

In [None]:
cstat_callback = CstatCallback(
  train_status = data["train_status"],
  train_time = data["train_time"],
  val_status = data["val_status"],
  val_time = data["val_time"],
  val_x = data["val_x"],
)

In [None]:
model.compile(
  optimizer=tf.keras.optimizers.Adam(),
  loss=CoxLoss(),
)

In [None]:
history = model.fit(
  x=train_x,
  y=train_y,
  epochs=20,
  callbacks=[cstat_callback],
  verbose=0
)

Epoch 10, validation cstat: 0.6154
Epoch 11, validation cstat: 0.6243
Epoch 12, validation cstat: 0.6317
Epoch 13, validation cstat: 0.6377
Epoch 14, validation cstat: 0.6433
Epoch 15, validation cstat: 0.6483
Epoch 16, validation cstat: 0.6538
Epoch 17, validation cstat: 0.6597
Epoch 18, validation cstat: 0.6634
Epoch 19, validation cstat: 0.6674


In [None]:
cstat_history = cstat_callback.get_history()
cstat_history.head()

Unnamed: 0,epoch,cstat
0,10,0.615362
1,11,0.624278
2,12,0.631735
3,13,0.637717
4,14,0.643315


### From datasets

In [None]:
# Training and validation datasets.
ds = prep_datasets(data_fn.get_split_data())

In [None]:
# Training and validation datasets.
history = model.fit(
  x=ds["train"],
  epochs=2,
  validation_data=ds["val"]
)

Epoch 1/2
Epoch 2/2


In [None]:
# Evaluate model.
model.evaluate(ds["test"])



3.798910140991211