In [1]:
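!pip install dime-torch

import keras
import matplotlib.pyplot as plt
import numpy as np
import scipy.special
import torch
from dime import DIME  # assumed import path for the dime-torch package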

# Out of Distribution Detection Examples and Strategies

## Detecting covariate shifts with Random Forests

One example from [Deep learning for coders with fastai & pytorch](https://www.amazon.com/Deep-Learning-Coders-fastai-PyTorch/dp/1492045527) uses a random forest (RF) to detect whether covariate shift has produced out-of-distribution (OOD) data.

Strategy:
1. Create your test/train split. Sometimes, especially when forecasting, a good train/test split is NOT a random mix and division; instead, the training set may occur earlier in time, while the test set occurs later.  It is important to check whether covariate shifts occur as things change over time.
2. Use an RF to predict if a row comes from the training set or test set. If the model performs well, you have some detectable shift in your data - in this example, premised on shifts over time.
3. Examine the RF feature importances to explain what is changing over time.

Reasoning:
* RFs are fast, easy to train and are robust against most hyperparameter choices so your results won't be very dependent on selection of a bad model.
* Your data does need to be tabular and RF-friendly (i.e., hopefully no very high cardinality categorical features)
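
The strategy above can be sketched in a few lines. This is a minimal illustration using scikit-learn rather than the book's own code; the synthetic data, the size of the simulated drift, and the variable names (`shift_accuracy`, `most_shifted_feature`) are assumptions made for the example.

```python
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score

rng = np.random.default_rng(0)

# Synthetic stand-in data (assumption): feature 0 drifts between the sets.
X_train = rng.normal(0.0, 1.0, size=(300, 3))
X_test = rng.normal(0.0, 1.0, size=(300, 3))
X_test[:, 0] += 2.0  # simulated covariate shift in feature 0

# Label each row by its origin: 0 = training set, 1 = test set.
X_all = np.vstack([X_train, X_test])
is_test = np.concatenate([np.zeros(len(X_train)), np.ones(len(X_test))])

rf = RandomForestClassifier(n_estimators=100, random_state=0)

# Cross-validated accuracy near 0.5 suggests no detectable shift;
# accuracy well above 0.5 suggests the two sets can be told apart.
shift_accuracy = cross_val_score(rf, X_all, is_test, cv=5).mean()

# Feature importances hint at which features are changing.
rf.fit(X_all, is_test)
most_shifted_feature = int(np.argmax(rf.feature_importances_))

print(shift_accuracy, most_shifted_feature)
```

With real data, the two arrays would be your actual training and test features, and a high accuracy is a cue to inspect the most important features.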

# OOD for Neural Networks

Most neural networks (NNs) are built for "closed world" settings; that is, they are expected to see the same classes at test time as they were trained on.  It is possible to modify the training of the network so that it can detect distribution shifts, and thus predict whether an input is something novel, but this requires re-training models that have already been trained and complicates the implementation.  Here, I discuss three methods to detect OOD samples that do not require re-training (post-hoc methods).

If you have a model and an OOD detector trained, you can use them in tandem like this:

In [None]:
class OpenWorldClassifier:
    def __init__(self, closed_world_model, ood_detector):
        self.model = closed_world_model
        self.ood_detector = ood_detector

    def predict(self, X):
        X = np.asarray(X)

        # 1. Check which samples are 'in-distribution' (boolean mask).
        #    The detectors below expose this through .predict().
        in_distribution = np.asarray(self.ood_detector.predict(X), dtype=bool)

        # 2. Default to 'UNKNOWN'; dtype=object so longer labels are not truncated
        predictions = np.array(['UNKNOWN'] * X.shape[0], dtype=object)

        # 3. If ID, use model (assumed here to return one class label per row)
        if in_distribution.any():
            predictions[in_distribution] = self.model.predict(X[in_distribution])

        return predictions

The three methods differ in which part of the network they inspect:

* Softmax uses the raw output of the model

* Energy-based OOD uses the output before the softmax operation (logits in the last layer)

* DIME uses the inputs to the final dense layer, before that layer operates.

## Softmax Confidence Scores

This was originally proposed as a baseline in a landmark paper by [Hendrycks and Gimpel](https://arxiv.org/abs/1610.02136).  This is based on the observation that "correctly classified examples tend to have greater maximum softmax probabilities than erroneously classified and out-of-distribution examples, allowing for their detection."

Assuming you have a network ending in a dense layer with a softmax activation, then:

For a sample that the network recognizes (in distribution, ID), you might see logits like

```code
logits = [0.1, 0.2, 5.0, 0.1]
probs = softmax(logits) = [0.00727829, 0.00804375, 0.97739967, 0.00727829]
```

There is clearly a single class whose probability is much greater than the others.  Conversely, for a sample the network does not recognize you might see something like this instead.

```code
logits = [0.1, 0.2, 0.3, 0.1]
probs = softmax(logits) = [0.23112977, 0.25543791, 0.28230254, 0.23112977]
```

These are all very similar because the network is not strongly "triggering" on one of them.
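
To check these numbers yourself, here is a small NumPy sketch that applies a softmax to the two logit vectors above and takes the maximum probability of each as the confidence score. The `softmax` helper is written here for illustration and is not part of any model.

```python
import numpy as np

def softmax(logits):
    """Numerically stable softmax along the last axis."""
    z = np.asarray(logits, dtype=float)
    z = z - z.max(axis=-1, keepdims=True)  # shift to avoid overflow in exp
    e = np.exp(z)
    return e / e.sum(axis=-1, keepdims=True)

logits = np.array([
    [0.1, 0.2, 5.0, 0.1],  # sample the network recognizes
    [0.1, 0.2, 0.3, 0.1],  # sample the network does not recognize
])

probs = softmax(logits)
scores = probs.max(axis=1)  # softmax confidence score per sample

print(scores)  # approx [0.977, 0.282]
```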

Note that the value of a probability alone is not very useful, since these are not "real" probabilities, just softmax outputs.  However, when taken as a whole over the entire training set, H&G noticed that the max probability of a class tends to be higher for things the network recognizes. You can then define a lower bound on probability (softmax output) to use as a cutoff enabling IN/OUT predictions.

This is just a general trend H&G noticed, but it turns out it gives pretty reasonable performance so it serves as a good baseline.

In [5]:
class SoftmaxDetector:
    """
    Use softmax confidence score to detect inliers.

    In this implementation I am using alpha as the type I error rate.  The
    cutoff is determined such that only alpha*100 per cent of the training
    data falls below the cutoff and would be expected to be mistakenly
    identified as being OOD.

    Based on https://arxiv.org/pdf/1610.02136.pdf
    """
    def __init__(self, model, alpha=0.05):
        """
        Parameters
        ----------
        model : keras.models.Model
            Pre-trained model.
        alpha : float
            Type I error rate.
        """
        assert 0.0 < alpha < 1.0, 'alpha should be between 0 and 1'
        self.set_params(**{'model': model, 'alpha':alpha})
        return

    def set_params(self, **parameters):
        """Set parameters; for consistency with scikit-learn's estimator API."""
        for parameter, value in parameters.items():
            setattr(self, parameter, value)
        return self

    def get_params(self, deep=True):
        """Get parameters; for consistency with scikit-learn's estimator API."""
        return {
            "model": self.model,
            "alpha": self.alpha
        }

    @staticmethod
    def softmax_score(probs):
        return np.max(np.asarray(probs), axis=1)

    def fit(self,
            X_train,
           ):
        """
        Parameters
        ----------
        X_train : ndarray
            Training data.
        """
        assert isinstance(self.model.layers[-1], keras.layers.Dense), 'model must end with a Dense layer'

        self.score_crit = np.percentile( # Score below this will be outlier
            self.softmax_score(self.model.predict(X_train)),
            self.alpha*100
        )

        return self

    def predict(self, X):
        """Predict if samples belong to the known distribution (inlier = True)."""
        return self.softmax_score(self.model.predict(X)) > self.score_crit

Consider visualizing your results to see how well this works:

~~~code
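# softmax_score is a static method of the SoftmaxDetector class defined above
train_score = SoftmaxDetector.softmax_score(model.predict(X_train))
test_score = SoftmaxDetector.softmax_score(model.predict(X_test))
alternatives_score = SoftmaxDetector.softmax_score(model.predict(X_alternatives))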

fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(6,4))

ax.hist(train_score, bins=25, label='Train')
ax.hist(test_score, bins=25, label='Test')
ax.hist(alternatives_score, bins=25, label='Alternative Set')

ax.set_xlabel('Softmax Confidence Score')
ax.legend(loc='best')
~~~

Here is some pseudocode on how to use this:

```code
model = ... # Load saved model or train from scratch
ood = SoftmaxDetector(model=model, alpha=0.05).fit(X_train)

# Hopefully these are close to 1-alpha
acc_test = np.sum(ood.predict(X_test)) / len(X_test)
acc_train = np.sum(ood.predict(X_train)) / len(X_train)

# Hopefully this is close to 1!
1.0 - np.sum(ood.predict(X_alternatives)) / len(X_alternatives)
```

## Energy-based OOD


[Liu et al.](https://arxiv.org/abs/2010.03759) showed that the softmax confidence scores are not linearly aligned with the probability density of the input, whereas an alternative "energy score" is.  The energy score takes the logits that would normally feed the softmax and instead computes a "free energy" resembling a standard thermodynamic equation.

$E = -T \times {\rm log} \sum_{i=1}^{N_{class}} {\rm exp}( {\rm logit}_i / T)$

They proposed this as an alternative and showed that ID samples tend to have lower energy scores, while OOD samples tend to have higher scores.  You can then define a threshold energy that serves as a cutoff similar to the softmax score cutoff.

There is also a "temperature" hyperparameter that can be adjusted in this method, but it is typically set to 1.
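
As a quick worked example, the sketch below evaluates the energy equation with NumPy for the same two logit vectors used in the softmax section. The `energy` helper here is a standalone illustration written for this example; it uses the usual max-shift trick to keep the log-sum-exp stable.

```python
import numpy as np

def energy(logits, T=1.0):
    """E = -T * log(sum_i exp(logit_i / T)), computed row by row."""
    z = np.asarray(logits, dtype=float) / T
    z_max = z.max(axis=1, keepdims=True)  # shift for numerical stability
    log_sum_exp = z_max[:, 0] + np.log(np.exp(z - z_max).sum(axis=1))
    return -T * log_sum_exp

logits = np.array([
    [0.1, 0.2, 5.0, 0.1],  # confident, ID-like sample
    [0.1, 0.2, 0.3, 0.1],  # flat, OOD-like sample
])

e = energy(logits, T=1.0)
print(e)  # approx [-5.02, -1.56]
```

The confident sample gets the lower (more negative) energy, which matches the trend described above.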

In [None]:
class EnergyBasedDetector:
    """
    Use energy-based score to detect inliers.

    Based on https://proceedings.neurips.cc/paper/2020/file/f5496252609c43eb8a3d147ab9b9c006-Paper.pdf
    """
    def __init__(self, model, alpha=0.05, T=1.0):
        """
        Parameters
        ----------
        model : keras.models.Model
            Pre-trained model.
        alpha : float
            Type I error rate.
        T : float
            Temperature.
        """
        assert 0.0 < alpha < 1.0, 'alpha should be between 0 and 1'
        assert T > 0.0, 'T must be positive'
        self.set_params(**{'model': model, 'alpha':alpha, 'T':T})
        return

    def set_params(self, **parameters):
        """Set parameters; for consistency with scikit-learn's estimator API."""
        for parameter, value in parameters.items():
            setattr(self, parameter, value)
        return self

    def get_params(self, deep=True):
        """Get parameters; for consistency with scikit-learn's estimator API."""
        return {
            "model": self.model,
            "alpha": self.alpha,
            "T": self.T
        }

    @staticmethod
    def energy(logits, T):
        return -T * scipy.special.logsumexp(np.asarray(logits)/T, axis=1)

    def model_predict_(self, X):
        assert isinstance(self.model.layers[-1], keras.layers.Dense), 'model must end with a Dense layer'
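        last_layer_act = self.model.layers[-1].activation
        # To get just the logits. Caveat: if Keras has already built and cached
        # its predict function, this change may be ignored; check that the
        # outputs do not sum to 1 before trusting the energies.
        self.model.layers[-1].activation = None
        logits = self.model.predict(X)
        self.model.layers[-1].activation = last_layer_act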

        return self.energy(logits, self.T)

    def fit(self,
            X_train,
           ):
        """
        Parameters
        ----------
        X_train : ndarray
            Training data.
        """
        energy_train = self.model_predict_(X_train)
        self.e_crit = np.percentile( # Energy above this will be an outlier
            energy_train,
            (1.0-self.alpha)*100
        )
        return self

    def predict(self, X):
        """Predict if samples belong to the known distribution (inlier = True)."""
        return self.model_predict_(X) < self.e_crit

Consider visualizing your results to see how temperature affects the separation.

~~~code
# Load Model
model = ... # Load or train model
model.layers[-1].activation = None # Just return the logits

fig, axes = plt.subplots(nrows=2, ncols=2, figsize=(12,8))
for ax, t in zip(axes.ravel(), [0.01, 0.1, 1.0, 10.0]):
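  # energy is a static method of the EnergyBasedDetector class defined above
  train_score = EnergyBasedDetector.energy(model.predict(X_train), T=t)
  test_score = EnergyBasedDetector.energy(model.predict(X_test), T=t)
  alternatives_score = EnergyBasedDetector.energy(model.predict(X_alternatives), T=t)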

  ax.hist(train_score, bins=25, label='Train')
  ax.hist(test_score, bins=25, label='Test')
  ax.hist(alternatives_score, bins=25, label='Alternative Set')

  ax.set_title('T={}'.format(t))
  ax.set_xlabel('Energy Score')
  ax.legend(loc='best')
~~~

Here is some pseudocode on how to use this:

```code
model = ... # Load saved model or train from scratch
ood = EnergyBasedDetector(
    model=model,
    alpha=0.05,
    T=1.0
).fit(X_train)

# Hopefully these are close to 1-alpha
acc_test = np.sum(ood.predict(X_test)) / len(X_test)
acc_train = np.sum(ood.predict(X_train)) / len(X_train)

# Hopefully this is close to 1!
1.0 - np.sum(ood.predict(X_alternatives)) / len(X_alternatives)
```

## DIME

DIME stands for [distance to modelled embedding](https://arxiv.org/abs/2108.10673).  The idea is to cut the "head" off an NN and use the output of the layer just before the final softmax layer as a "feature space".  This space is modelled as a hyperplane using truncated SVD in an unsupervised fashion.

$\Phi_{i,X} \approx U_k \Sigma_k V_k^T$

If $k$ is equal to the rank of $\Phi_{i,X}$, the output of the NN after layer $i$ given NN input data $X$, then there is no compression.  In practice, $k$ may be selected as a hyperparameter where $k_{\rm max} = {\rm min}(n, p)$ if $\Phi_{i, X}$ has the shape (n,p).  A critical distance from this manifold can then be determined and serve as a simple discriminator between ID and OOD samples.

$d = \lVert \phi - \hat{\phi} \rVert_2 = \sqrt{\sum_j (\phi_j - \hat{\phi}_j)^2}$ is the reconstruction residual distance of the linear approximation:

$\hat{\phi} = {\rm proj}_{V_k} \phi $.

This is similar to certain forms of SIMCA, though recent versions use a linear combination of reconstruction and the "distance within" the hyperplane.
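
To make the geometry concrete, here is a minimal NumPy sketch of the idea: fit a rank-$k$ hyperplane with a truncated SVD, project points onto it, and use the residual norm as the distance. This is not the `dime-torch` implementation; the synthetic embeddings, the choice `k = 2`, and the 95th-percentile cutoff are assumptions made for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)

# Synthetic "embeddings" (assumption): points near a 2-D plane in 5-D space.
basis = rng.normal(size=(2, 5))
phi_train = rng.normal(size=(200, 2)) @ basis + 0.01 * rng.normal(size=(200, 5))

k = 2
# Truncated SVD: the first k right singular vectors span the hyperplane.
_, _, Vt = np.linalg.svd(phi_train, full_matrices=False)
V_k = Vt[:k].T  # shape (p, k)

def residual_distance(phi, V_k):
    """Euclidean norm of phi minus its projection onto span(V_k)."""
    phi = np.atleast_2d(phi)
    phi_hat = phi @ V_k @ V_k.T  # projection onto the hyperplane
    return np.linalg.norm(phi - phi_hat, axis=1)

# Critical distance: 95% of the training points fall below it (alpha = 0.05).
d_train = residual_distance(phi_train, V_k)
d_crit = np.percentile(d_train, 95)

# Points that ignore the plane structure should sit far from the hyperplane.
phi_ood = 3.0 * rng.normal(size=(50, 5))
frac_flagged = np.mean(residual_distance(phi_ood, V_k) > d_crit)

print(d_crit, frac_flagged)
```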


In [None]:
class DIMEDetector:
    """
    Use DIME to determine if a point is an inlier.

    Based on https://arxiv.org/pdf/2108.10673.pdf
    """
    def __init__(self, k, alpha=0.05):
        """
        Parameters
        ----------
        k : int
            Dimensionality of the hyperplane.
        alpha : float
            Type I error rate.
        """
        assert 0.0 < alpha < 1.0, 'alpha should be between 0 and 1'
        self.set_params(**{'k':k, 'alpha':alpha})
        return

    def set_params(self, **parameters):
        """Set parameters; for consistency with scikit-learn's estimator API."""
        for parameter, value in parameters.items():
            setattr(self, parameter, value)
        return self

    def get_params(self, deep=True):
        """Get parameters; for consistency with scikit-learn's estimator API."""
        return {
            "k": self.k,
            "alpha": self.alpha
        }

    def fit(self,
            X_train,
           ):
        """
        Parameters
        ----------
        X_train : ndarray
            Training data - this should be a featurized version of the model training data.
            This will be converted to a torch.tensor() internally.
        """
        # Fit DIME
        self.dime = DIME(
            explained_variance_threshold=self.k,
            n_percentiles=10000
        ).fit(
            torch.tensor(X_train),
            calibrate_against_trainingset=True,
        )

        # Assume P_DIME = 1-alpha
        a = (self.dime._histogram_percentiles.numpy() - (1.0-self.alpha)*100)**2
        idx = np.where(np.min(a) == a)[0][0]
        self.d_crit = self.dime._d_from_histogram[idx+1].numpy()
        return self

    def predict(self, X):
        """Predict if samples belong to the known distribution (inlier = True)."""

        return self.dime.distance_to_hyperplane(
            torch.tensor(X)
        ).numpy() < self.d_crit

Consider visualizing your results to see how the model dimensionality affects the separation.

~~~code
# Load Model
model = ... # Load or train model

# Remove and create new model to output the embeddings
feature_extractor = keras.Sequential(
    model.layers[:-1]
)

X_train_feat = feature_extractor.predict(X_train)
X_test_feat = feature_extractor.predict(X_test)
X_alternatives_feat = feature_extractor.predict(X_alternatives)

fig, axes = plt.subplots(nrows=2, ncols=2, figsize=(12,8))

x_train = torch.tensor(X_train_feat)  
x_test = torch.tensor(X_test_feat)  
x_alternatives = torch.tensor(X_alternatives_feat)

def plot_dime_hist(k, x_train, x_test, x_alternatives, ax=None):
    modelled_embedding = DIME(explained_variance_threshold=k).fit(x_train)

    if ax is None:
        fig = plt.figure()
        ax = fig.gca()
    
    ax.hist(modelled_embedding.distance_to_hyperplane(x_train, return_probabilities=False).numpy(), bins=25, label='Train')
    ax.hist(modelled_embedding.distance_to_hyperplane(x_test, return_probabilities=False).numpy(), bins=25, label='Test')

    ax.hist(modelled_embedding.distance_to_hyperplane(x_alternatives, return_probabilities=False).numpy(), bins=25,
            label='Alternative Set')
    ax.legend(loc='best')
    
    return ax

for k, ax in zip([10, 20, 50, 100], axes.ravel()):
    plot_dime_hist(k, x_train, x_test, x_alternatives, ax=ax)
    ax.set_xlabel('Distance to Hyperplane')
    ax.set_title('k={}'.format(k))
    ax.legend(loc='best')
plt.tight_layout()
~~~

Here is some pseudocode on how to use this:

```code
model = ... # Load saved model or train from scratch

# Remove and create new model to output the embeddings.
# This is assuming the final layer is the dense layer with a softmax activation.
feature_extractor = keras.Sequential(
    model.layers[:-1]
)

# Featurize the data
X_train_feat = feature_extractor.predict(X_train)
X_test_feat = feature_extractor.predict(X_test)
X_alternatives_feat = feature_extractor.predict(X_alternatives)

ood = DIMEDetector(
    k=20,
    alpha=0.05,
).fit(X_train_feat)

# Hopefully these are close to 1-alpha
acc_test = np.sum(ood.predict(X_test_feat)) / len(X_test_feat)
acc_train = np.sum(ood.predict(X_train_feat)) / len(X_train_feat)

# Hopefully this is close to 1!
1.0 - np.sum(ood.predict(X_alternatives_feat)) / len(X_alternatives_feat)
```