# Counterfactual explanations

In [1]:
import trustyai

trustyai.init(
    path=[
         "../dep/org/kie/kogito/explainability-core/1.8.0.Final/*",
#         "../dep/org/kie/kogito/explainability-core/1.8.0.Final/explainability-core-2.0.0-SNAPSHOT.jar",
#         "../dep/org/kie/kogito/explainability-core/1.8.0.Final/explainability-core-1.8.0.Final-tests.jar",
        "../dep/org/slf4j/slf4j-api/1.7.30/slf4j-api-1.7.30.jar",
        "../dep/org/apache/commons/commons-lang3/3.12.0/commons-lang3-3.12.0.jar",
        "../dep/org/optaplanner/optaplanner-core/8.8.0.Final/optaplanner-core-8.8.0.Final.jar",
        "../dep/org/apache/commons/commons-math3/3.6.1/commons-math3-3.6.1.jar",
        "../dep/org/kie/kie-api/7.55.0.Final/kie-api-7.55.0.Final.jar",
        "../dep/io/micrometer/micrometer-core/1.6.6/micrometer-core-1.6.6.jar",
    ]
)

## Simple example

We start by defining our black-box model, typically represented by

$$
f(\mathbf{x}) = \mathbf{y}
$$

where $\mathbf{x}=\{x_1, x_2, \dots,x_m\}$ and $\mathbf{y}=\{y_1, y_2, \dots,y_n\}$.

Our toy model takes an all-numerical input $\mathbf{x}$ and returns a $\mathbf{y}$ of `true` if the sum of the components of $\mathbf{x}$ is within a threshold $\epsilon$ of a point $\mathbf{C}$, and `false` otherwise, that is:

$$
f(\mathbf{x}, \epsilon, \mathbf{C})=\begin{cases}
\text{true},\qquad \text{if}\ \mathbf{C}-\epsilon<\sum_{i=1}^m x_i <\mathbf{C}+\epsilon \\
\text{false},\qquad \text{otherwise}
\end{cases}
$$
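
As a rough illustration of the definition above, the same rule can be written as a plain Python function. This is only a sketch: `sum_threshold` is a hypothetical name, not part of TrustyAI, and it treats $\mathbf{C}$ as a scalar.

```python
from typing import Sequence


def sum_threshold(x: Sequence[float], center: float, epsilon: float) -> bool:
    """Return True when the sum of x lies strictly within epsilon of center."""
    return center - epsilon < sum(x) < center + epsilon


# The sum is 499.5, which is inside the interval (499, 501)
print(sum_threshold([250.0, 249.5], center=500.0, epsilon=1.0))
# The sum is 6.0, which is far outside the interval
print(sum_threshold([1.0, 2.0, 3.0], center=500.0, epsilon=1.0))
```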

This model is provided in the `TestUtils` module. We instantiate it with $\mathbf{C}=500$ and $\epsilon=1.0$.

In [2]:
from trustyai.utils import TestUtils

center = 500.0
epsilon = 1.0

model = TestUtils.getSumThresholdModel(center, epsilon)

Next we need to define a **goal**.
If our model is $f(\mathbf{x'})=\mathbf{y'}$ we are then defining our $\mathbf{y'}$ and the counterfactual result will be the $\mathbf{x'}$ which satisfies $f(\mathbf{x'})=\mathbf{y'}$.
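
To make the idea concrete, the sketch below picks, from a handful of hand-written candidates, the one closest to the original input that still produces the desired output. The helper names, the toy model and the choice of L1 distance are illustrative assumptions; they say nothing about how the TrustyAI explainer searches or measures distance.

```python
from typing import Callable, List, Optional, Sequence


def l1_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Sum of absolute per-feature differences."""
    return sum(abs(ai - bi) for ai, bi in zip(a, b))


def closest_counterfactual(
    model: Callable[[Sequence[float]], bool],
    original: Sequence[float],
    candidates: List[List[float]],
    goal: bool,
) -> Optional[List[float]]:
    """Return the candidate nearest to `original` for which model(candidate) == goal."""
    valid = [c for c in candidates if model(c) == goal]
    return min(valid, key=lambda c: l1_distance(original, c)) if valid else None


def toy_model(x: Sequence[float]) -> bool:
    # Hypothetical stand-in for the black box: True when the sum is within 1.0 of 500
    return 499.0 < sum(x) < 501.0


original = [1.0, 2.0, 3.0, 4.0]
candidates = [
    [1.0, 2.0, 3.0, 4.0],          # unchanged input, does not reach the goal
    [0.0, 0.0, 0.0, 500.0],        # reaches the goal, but moves every feature
    [125.0, 125.0, 125.0, 125.0],  # reaches the goal
    [1.0, 2.0, 3.0, 493.5],        # reaches the goal by changing one feature
]

best = closest_counterfactual(toy_model, original, candidates, goal=True)
print(best)
```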

We will define our goal as `true`, that is, the sum is within the vicinity of the point $\mathbf{C}$. The goal is a list of `Output` objects, each of which takes the following parameters:

- The feature name
- The feature type
- The feature value (wrapped in `Value`)
- A confidence threshold, which we will leave at zero (no threshold)

In [3]:
from trustyai.model import Output, Type, Value

goal = [Output("inside", Type.BOOLEAN, Value(True), 0.0)]

We will now define our initial features, $\mathbf{x}$. Each feature can be instantiated by using `FeatureFactory` and in this case we want to use numerical features, so we'll use `FeatureFactory.newNumericalFeature`.

In [4]:
import random
from trustyai.model import FeatureFactory

features = [FeatureFactory.newNumericalFeature(f"x{i+1}", random.random()*10.0) for i in range(4)]

As we can see below, the sum of the features will not be within $\epsilon$ (1.0) of $\mathbf{C}$ (500.0). As such, the model prediction will be `false`:

In [5]:
feature_sum = 0.0
for f in features:
    value = f.getValue().asNumber()
    print(f"Feature {f.getName()} has value {value}")
    feature_sum += value
print(f"\nFeatures sum is {feature_sum}")

Feature x1 has value 1.4465457485394606
Feature x2 has value 9.904958794943276
Feature x3 has value 8.632408661102822
Feature x4 has value 2.666374576834393

Features sum is 22.65028778141995


The next step is to specify the **constraints** of the features, i.e. which features can be changed and which should be fixed. Since we want all features to be able to change, we specify `False` for all of them:

In [6]:
constraints = [False] * 4
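
The effect of such a list can be pictured as a mask over the features: wherever the entry is `True` the original value is kept, and wherever it is `False` a proposed value may replace it. The helper below is a hypothetical plain-Python illustration of that reading, not TrustyAI code.

```python
from typing import List, Sequence


def apply_constraints(
    original: Sequence[float],
    proposed: Sequence[float],
    constraints: Sequence[bool],
) -> List[float]:
    """Keep the original value where constrained, take the proposed one elsewhere."""
    if not (len(original) == len(proposed) == len(constraints)):
        raise ValueError("all three sequences must have the same length")
    return [o if fixed else p for o, p, fixed in zip(original, proposed, constraints)]


original = [1.0, 2.0, 3.0, 4.0]
proposed = [10.0, 20.0, 30.0, 40.0]

# No feature is fixed, so every proposed value is accepted
print(apply_constraints(original, proposed, [False] * 4))
# The first and last features are fixed, so they keep their original values
print(apply_constraints(original, proposed, [True, False, False, True]))
```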

Finally, we also specify the **bounds** for the counterfactual search. Typically these can be set either using domain-specific knowledge or taken from the data. In this case we simply specify an arbitrary (sensible) range, e.g. all the features can vary between `0` and `1000`.

In [7]:
from trustyai.model.domain import NumericalFeatureDomain

feature_boundaries = [NumericalFeatureDomain.create(0.0, 1000.0)] * 4

In order to use the boundaries in the explainer we need to wrap all of them in a `DataDomain` class:

In [8]:
from trustyai.model import DataDomain

data_domain = DataDomain(feature_boundaries)
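
Conceptually, each boundary is an interval that a proposed value must fall into. The following sketch models this with simple `(lower, upper)` tuples. The names are hypothetical, and treating both limits as inclusive is an assumption of this example rather than a statement about TrustyAI.

```python
from typing import List, Sequence, Tuple


def within_bounds(values: Sequence[float], bounds: Sequence[Tuple[float, float]]) -> bool:
    """True when every value lies inside its (lower, upper) interval (inclusive, by assumption)."""
    if len(values) != len(bounds):
        raise ValueError("values and bounds must have the same length")
    return all(lower <= v <= upper for v, (lower, upper) in zip(values, bounds))


bounds: List[Tuple[float, float]] = [(0.0, 1000.0)] * 4

print(within_bounds([1.5, 486.4, 8.6, 2.7], bounds))   # all values inside [0, 1000]
print(within_bounds([1.5, 1200.0, 8.6, 2.7], bounds))  # second value exceeds the upper limit
```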

We can now instantiate the **explainer** itself.

To do so, we first need to configure the termination criteria. For this example we will specify that the counterfactual search should perform a maximum of 10,000 score calculations before stopping and returning the best result found so far.

In [9]:
from org.optaplanner.core.config.solver.termination import TerminationConfig
from org.kie.kogito.explainability.local.counterfactual import CounterfactualConfigurationFactory
from java.lang import Long

termination_config = TerminationConfig().withScoreCalculationCountLimit(Long.valueOf(10_000))

solver_config = (
        CounterfactualConfigurationFactory.builder()
        .withTerminationConfig(termination_config)
        .build()
    )
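
A termination limit of this kind can be pictured as a budget on how many candidate solutions get scored. The toy hill-climbing loop below stops once that budget is spent and returns the best candidate seen so far. It is a sketch under stated assumptions: `budgeted_search` and `distance_to_target` are made-up names, and the search strategy is deliberately naive and unrelated to the algorithms OptaPlanner actually uses.

```python
import random
from typing import Callable, List, Sequence, Tuple


def budgeted_search(
    score: Callable[[Sequence[float]], float],
    start: Sequence[float],
    step: float,
    max_evaluations: int,
    seed: int = 0,
) -> Tuple[List[float], float, int]:
    """Minimise `score` by random perturbation, using at most `max_evaluations` score calls."""
    if max_evaluations < 1:
        raise ValueError("max_evaluations must be at least 1")
    rng = random.Random(seed)
    best, best_score = list(start), score(start)
    evaluations = 1  # scoring the starting point counts towards the budget
    while evaluations < max_evaluations:
        candidate = list(best)
        i = rng.randrange(len(candidate))
        candidate[i] += rng.uniform(-step, step)  # perturb one feature
        candidate_score = score(candidate)
        evaluations += 1
        if candidate_score < best_score:  # keep only improvements
            best, best_score = candidate, candidate_score
    return best, best_score, evaluations


def distance_to_target(x: Sequence[float]) -> float:
    # Hypothetical score: how far the sum of the features is from 500
    return abs(sum(x) - 500.0)


start = [1.0, 2.0, 3.0, 4.0]
best, best_score, used = budgeted_search(distance_to_target, start, step=5.0, max_evaluations=1000)
print(f"Used {used} score calculations, best score so far: {best_score}")
```

A larger budget gives the search more chances to improve, at the cost of a longer run time.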

We can now instantiate the explainer itself using `CounterfactualExplainer` and our `solver_config` configuration.

In [10]:
from org.kie.kogito.explainability.local.counterfactual import CounterfactualExplainer

explainer = CounterfactualExplainer.builder().withSolverConfig(solver_config).build()

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


We will now express the counterfactual problem as defined above.

- `original` represents our $\mathbf{x}$, which we know gives a prediction of `False`
- `goals` represents our $\mathbf{y'}$, that is, our desired prediction (`True`)
- `domain` represents the boundaries for the counterfactual search

In [11]:
from trustyai.model import PredictionFeatureDomain, PredictionInput, PredictionOutput

original = PredictionInput(features)
goals = PredictionOutput(goal)
domain = PredictionFeatureDomain(data_domain.getFeatureDomains())

We wrap these quantities in a `CounterfactualPrediction` (the UUID is simply to label the search instance):

In [12]:
import uuid
from trustyai.model import CounterfactualPrediction

prediction = CounterfactualPrediction(original, goals, domain, constraints, None, uuid.uuid4())

We now request the counterfactual $\mathbf{x'}$ which is closest to $\mathbf{x}$ and which satisfies $f(\mathbf{x'}, \epsilon, \mathbf{C})=\mathbf{y'}$:

In [13]:
explanation_async = explainer.explainAsync(prediction, model)

The counterfactual explainer API operates in an asynchronous way, so we need to `.get()` the result:

In [14]:
explanation = explanation_async.get()
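
The pattern is the same as with futures in the Python standard library: submitting work returns immediately with a handle, and asking the handle for its result blocks until the work is done. The sketch below uses `concurrent.futures` purely as an analogy. `slow_square` is a made-up stand-in for a long-running search, and the standard-library future exposes `.result()` where the object used above exposes `.get()`.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(n: int) -> int:
    """Pretend to be an expensive computation."""
    time.sleep(0.1)
    return n * n


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_square, 7)  # returns a handle straight away
    result = future.result()                  # blocks until the computation finishes

print(result)
```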

We can now inspect the counterfactual $\mathbf{x'}$ and check that the sum of its features is within $\epsilon$ of $\mathbf{C}$:

In [15]:
feature_sum = 0.0
for entity in explanation.getEntities():
    print(entity)
    feature_sum += entity.getProposedValue()
    
print(f"\nFeature sum is {feature_sum}")

java.lang.DoubleFeature{value=1.547375925562755, intRangeMinimum=0.0, intRangeMaximum=1000.0, id='x1'}
java.lang.DoubleFeature{value=486.3570827969562, intRangeMinimum=0.0, intRangeMaximum=1000.0, id='x2'}
java.lang.DoubleFeature{value=8.632408661102822, intRangeMinimum=0.0, intRangeMaximum=1000.0, id='x3'}
java.lang.DoubleFeature{value=2.666374576834393, intRangeMinimum=0.0, intRangeMaximum=1000.0, id='x4'}

Feature sum is 499.2032419604562
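
Using the values printed above, a few lines of plain Python are enough to check that the proposed input meets the goal and to see which features the search actually changed. The numbers are copied from the outputs of this notebook run; the variable names are ad hoc.

```python
center, epsilon = 500.0, 1.0
names = ["x1", "x2", "x3", "x4"]

# Values copied from the printed outputs of this run
original_values = [1.4465457485394606, 9.904958794943276, 8.632408661102822, 2.666374576834393]
counterfactual_values = [1.547375925562755, 486.3570827969562, 8.632408661102822, 2.666374576834393]

# The goal is met when the sum lies strictly within epsilon of the center
goal_satisfied = center - epsilon < sum(counterfactual_values) < center + epsilon

# A feature counts as changed when its proposed value differs from the original
changed = [
    name
    for name, before, after in zip(names, original_values, counterfactual_values)
    if before != after
]

print(f"Goal satisfied: {goal_satisfied}")
print(f"Changed features: {changed}")
```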


### Constrained features

As we've seen, it is possible to constrain a specific feature $x_i$ by setting the corresponding element of the _constraints_ list to `True`.

In this example, we now want to fix $x_1$ and $x_4$. That is, these features should have the same value in the counterfactual $\mathbf{x'}$ as in the original $\mathbf{x}$.

In [16]:
constraints = [True, False, False, True] # x1, x2, x3 and x4

We simply need to wrap the previous quantities with the new constraints:

In [17]:
prediction = CounterfactualPrediction(original, goals, domain, constraints, None, uuid.uuid4())

And request a new counterfactual explanation:

In [18]:
explanation = explainer.explainAsync(prediction, model).get()

We can see that $x_1$ and $x_4$ have the same values as in the original and that the counterfactual still satisfies the goal.

In [19]:
print(f"Original x1: {features[0].getValue()}")
print(f"Original x4: {features[3].getValue()}\n")

for entity in explanation.getEntities():
    print(entity)

Original x1: 1.4465457485394606
Original x4: 2.666374576834393

java.lang.DoubleFeature{value=1.4465457485394606, intRangeMinimum=1.4465457485394606, intRangeMaximum=1.4465457485394606, id='x1'}
java.lang.DoubleFeature{value=486.3570827969562, intRangeMinimum=0.0, intRangeMaximum=1000.0, id='x2'}
java.lang.DoubleFeature{value=8.538028490590378, intRangeMinimum=0.0, intRangeMaximum=1000.0, id='x3'}
java.lang.DoubleFeature{value=2.666374576834393, intRangeMinimum=2.666374576834393, intRangeMaximum=2.666374576834393, id='x4'}
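
The same kind of check can be made mechanical. The sketch below, again using numbers copied from the outputs of this run and ad hoc variable names, lists any constrained feature whose value moved and confirms that the sum is still within the threshold.

```python
center, epsilon = 500.0, 1.0
names = ["x1", "x2", "x3", "x4"]
fixed = [True, False, False, True]

# Values copied from the printed outputs of this run
original_values = [1.4465457485394606, 9.904958794943276, 8.632408661102822, 2.666374576834393]
constrained_values = [1.4465457485394606, 486.3570827969562, 8.538028490590378, 2.666374576834393]

# A violation is a fixed feature whose proposed value differs from the original
violations = [
    name
    for name, is_fixed, before, after in zip(names, fixed, original_values, constrained_values)
    if is_fixed and before != after
]
goal_satisfied = center - epsilon < sum(constrained_values) < center + epsilon

print(f"Constraint violations: {violations}")
print(f"Goal satisfied: {goal_satisfied}")
```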


## Using Python models



We will now show how to use a custom Python model with TrustyAI counterfactual explanations.

The model will be an XGBoost one trained with the `credit-bias` dataset.

For convenience, the model is pre-trained and serialised with `joblib`, so for this example we simply need to deserialise it.

In [20]:
import joblib

xg_model = joblib.load("model.joblib")
print(xg_model)

XGBClassifier(base_score=0.5, booster='gbtree', colsample_bylevel=1,
              colsample_bynode=1, colsample_bytree=1, gamma=0, gpu_id=-1,
              importance_type='gain', interaction_constraints='',
              learning_rate=0.07, max_delta_step=0, max_depth=8,
              min_child_weight=1, missing=nan, monotone_constraints='()',
              n_estimators=200, n_jobs=12, num_parallel_tree=1, random_state=27,
              reg_alpha=0, reg_lambda=1, scale_pos_weight=0.9861206227457426,
              seed=27, subsample=1, tree_method='exact', validate_parameters=1,
              verbosity=None)


The input features are

- `NewCreditCustomer`, a boolean
- `Amount`, numerical
- `Interest`, numerical
- `LoanDuration`, months (integer)
- `Education`, level (integer)
- `NrOfDependants`, integer
- `EmploymentDurationCurrentEmployer`, years (integer)
- `IncomeFromPrincipalEmployer`, numerical
- `IncomeFromPension`, numerical
- `IncomeFromFamilyAllowance`, numerical
- `IncomeFromSocialWelfare`, numerical
- `IncomeFromLeavePay`, numerical
- `IncomeFromChildSupport`, numerical
- `IncomeOther`, numerical
- `ExistingLiabilities`, integer
- `RefinanceLiabilities`, integer
- `DebtToIncome`, numerical
- `FreeCash`, numerical
- `CreditScoreEeMini`, integer
- `NoOfPreviousLoansBeforeLoan`, integer
- `AmountOfPreviousLoansBeforeLoan`, numerical
- `PreviousRepaymentsBeforeLoan`, numerical
- `PreviousEarlyRepaymentsBefoleLoan`, numerical
- `PreviousEarlyRepaymentsCountBeforeLoan`, integer
- `Council_house`, integer (binary)
- `Homeless`, integer (binary)
- `Joint_ownership`, integer (binary)
- `Joint_tenant`, integer (binary)
- `Living_with_parents`, integer (binary)
- `Mortgage`, integer (binary)
- `Other`, integer
- `Owner`, integer (binary)
- `Owner_with_encumbrance`, integer (binary)
- `Tenant`, integer (binary)
- `Entrepreneur`, integer (binary)
- `Fully`, integer (binary)
- `Partially`, integer (binary)
- `Retiree`, integer (binary)
- `Self_employed`, integer (binary)


In [21]:
x = [[False,2125.0,20.97,60,4.0,0.0,6.0,0.0,301.0,0.0,53.0,0.0,0.0,0.0,8,6,26.29,10.92,1000.0,1.0,500.0,590.95,0.0,0.0,0,0,0,0,0,0,0,1,0,0,0,0,0,1,0]]

In [22]:
import numpy as np

print(xg_model.predict_proba(np.array(x)))
print(f"Paid loan is predicted as: {xg_model.predict(np.array(x))}")

[[0.7770493  0.22295067]]
Paid loan is predicted as: [False]
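
The two outputs are related in the usual way for a binary classifier: the predicted class is the one with the highest probability. A minimal sketch of that relationship follows, with the probabilities copied from the output above and the class order `[False, True]` assumed rather than read from the model.

```python
# Probabilities copied from the predict_proba output above
probabilities = [0.7770493, 0.22295067]
# Assumed class order: first column is "not paid", second is "paid"
classes = [False, True]

# Pick the index of the largest probability and map it to its class label
best_index = max(range(len(probabilities)), key=probabilities.__getitem__)
predicted = classes[best_index]

print(f"Predicted class: {predicted}")
```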


In [23]:
from typing import List
from trustyai.utils import toJList

def predict(inputs: List[PredictionInput]) -> List[PredictionOutput]:
    # Convert the features of the first input into a plain list of numbers
    values = [feature.getValue().asNumber() for feature in inputs.get(0).getFeatures()]
    # Run the XGBoost model on a single-row array
    result = xg_model.predict(np.array([values]))
    # Map the predicted class to a Python boolean
    value = bool(result[0] != 0)
    output = Output("PaidLoan", Type.BOOLEAN, Value(value), 0.0)
    return toJList([PredictionOutput([output])])
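
The function above is essentially an adapter between two calling conventions: a list of feature objects on one side, a numerical array on the other. The same idea is shown below without any TrustyAI or XGBoost dependency. `Feature`, `StubClassifier` and `make_predict` are hypothetical stand-ins, and plain dictionaries play the role of `Output`.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class Feature:
    name: str
    value: float


class StubClassifier:
    """Stand-in for a trained model: predicts 1 when the row sums to more than 10."""

    def predict(self, rows: List[List[float]]) -> List[int]:
        return [1 if sum(row) > 10 else 0 for row in rows]


def make_predict(classifier: StubClassifier, output_name: str) -> Callable:
    """Wrap `classifier` so it accepts lists of Feature objects and returns named outputs."""

    def predict(inputs: List[List[Feature]]) -> List[Dict[str, object]]:
        # Strip the feature objects down to the raw values the classifier expects
        rows = [[feature.value for feature in features] for features in inputs]
        labels = classifier.predict(rows)
        # Wrap each label in a named, boolean-valued output
        return [{"name": output_name, "value": bool(label)} for label in labels]

    return predict


predict_fn = make_predict(StubClassifier(), "PaidLoan")
print(predict_fn([[Feature("a", 4.0), Feature("b", 8.0)]]))
```

Unlike the cell above, which reads only the first input, this sketch maps every input in the batch.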

In [24]:
from trustyai.model import PredictionProvider

model = PredictionProvider(predict)

In [25]:
features = [
    FeatureFactory.newBooleanFeature("NewCreditCustomer", False),
    FeatureFactory.newNumericalFeature("Amount", 2125.0),
    FeatureFactory.newNumericalFeature("Interest", 20.97),
    FeatureFactory.newNumericalFeature("LoanDuration", 60.0),
    FeatureFactory.newNumericalFeature("Education", 4.0),
    FeatureFactory.newNumericalFeature("NrOfDependants", 0.0),
    FeatureFactory.newNumericalFeature("EmploymentDurationCurrentEmployer", 6.0),
    FeatureFactory.newNumericalFeature("IncomeFromPrincipalEmployer", 0.0),
    FeatureFactory.newNumericalFeature("IncomeFromPension", 301.0),
    FeatureFactory.newNumericalFeature("IncomeFromFamilyAllowance", 0.0),
    FeatureFactory.newNumericalFeature("IncomeFromSocialWelfare", 53.0),
    FeatureFactory.newNumericalFeature("IncomeFromLeavePay", 0.0),
    FeatureFactory.newNumericalFeature("IncomeFromChildSupport", 0.0),
    FeatureFactory.newNumericalFeature("IncomeOther", 0.0),
    FeatureFactory.newNumericalFeature("ExistingLiabilities", 8.0),
    FeatureFactory.newNumericalFeature("RefinanceLiabilities", 6.0),
    FeatureFactory.newNumericalFeature("DebtToIncome", 26.29),
    FeatureFactory.newNumericalFeature("FreeCash", 10.92),
    FeatureFactory.newNumericalFeature("CreditScoreEeMini", 1000.0),
    FeatureFactory.newNumericalFeature("NoOfPreviousLoansBeforeLoan", 1.0),
    FeatureFactory.newNumericalFeature("AmountOfPreviousLoansBeforeLoan", 500.0),
    FeatureFactory.newNumericalFeature("PreviousRepaymentsBeforeLoan", 590.95),
    FeatureFactory.newNumericalFeature("PreviousEarlyRepaymentsBefoleLoan", 0.0),
    FeatureFactory.newNumericalFeature("PreviousEarlyRepaymentsCountBeforeLoan", 0.0),
    FeatureFactory.newBooleanFeature("Council_house", False),
    FeatureFactory.newBooleanFeature("Homeless", False),
    FeatureFactory.newBooleanFeature("Joint_ownership", False),
    FeatureFactory.newBooleanFeature("Joint_tenant", False),
    FeatureFactory.newBooleanFeature("Living_with_parents", False),
    FeatureFactory.newBooleanFeature("Mortgage", False),
    FeatureFactory.newBooleanFeature("Other", False),
    FeatureFactory.newBooleanFeature("Owner", False),
    FeatureFactory.newBooleanFeature("Owner_with_encumbrance", True),
    FeatureFactory.newBooleanFeature("Tenant", True),
    FeatureFactory.newBooleanFeature("Entrepreneur", False),
    FeatureFactory.newBooleanFeature("Fully", False),
    FeatureFactory.newBooleanFeature("Partially", False),
    FeatureFactory.newBooleanFeature("Retiree", True),
    FeatureFactory.newBooleanFeature("Self_employed", False),   
]

In [26]:
n_features = len(features)
print(n_features)

39


In [27]:
from trustyai.utils import toJList

model.predictAsync(toJList([PredictionInput(features)])).get()[0].getOutputs()[0].toString()

'Output{value=false, type=boolean, score=0.0, name='PaidLoan'}'

In [28]:
constraints = [False] * n_features

In [29]:
features_boundaries = [NumericalFeatureDomain.create(0.0, 10000.0)] * n_features

In [30]:
termination_config = TerminationConfig().withSecondsSpentLimit(Long.valueOf(20))

solver_config = (
        CounterfactualConfigurationFactory.builder()
        .withTerminationConfig(termination_config)
        .build()
    )

In [31]:
explainer = CounterfactualExplainer.builder().withSolverConfig(solver_config).build()

In [32]:
goal = [Output("PaidLoan", Type.BOOLEAN, Value(True), 0.0)]

In [33]:
original = PredictionInput(features)
goals = PredictionOutput(goal)
domain = PredictionFeatureDomain(features_boundaries)

In [34]:
prediction = CounterfactualPrediction(original, goals, domain, constraints, None, uuid.uuid4())

In [37]:
explanation = explainer.explainAsync(prediction, model).get()

In [38]:
for entity in explanation.getEntities():
    print(entity)

java.lang.BooleanFeature{value=false, id='NewCreditCustomer'}
java.lang.DoubleFeature{value=2125.0, intRangeMinimum=0.0, intRangeMaximum=10000.0, id='Amount'}
java.lang.DoubleFeature{value=20.97, intRangeMinimum=0.0, intRangeMaximum=10000.0, id='Interest'}
java.lang.DoubleFeature{value=60.0, intRangeMinimum=0.0, intRangeMaximum=10000.0, id='LoanDuration'}
java.lang.DoubleFeature{value=4.0, intRangeMinimum=0.0, intRangeMaximum=10000.0, id='Education'}
java.lang.DoubleFeature{value=0.0, intRangeMinimum=0.0, intRangeMaximum=10000.0, id='NrOfDependants'}
java.lang.DoubleFeature{value=6.0, intRangeMinimum=0.0, intRangeMaximum=10000.0, id='EmploymentDurationCurrentEmployer'}
java.lang.DoubleFeature{value=0.0, intRangeMinimum=0.0, intRangeMaximum=10000.0, id='IncomeFromPrincipalEmployer'}
java.lang.DoubleFeature{value=301.0, intRangeMinimum=0.0, intRangeMaximum=10000.0, id='IncomeFromPension'}
java.lang.DoubleFeature{value=0.0, intRangeMinimum=0.0, intRangeMaximum=10000.0, id='IncomeFromFamil

In [42]:
testf = [f.asFeature() for f in explanation.getEntities()]
model.predictAsync(toJList([PredictionInput(testf)])).get()[0].getOutputs()[0].toString()

'Output{value=true, type=boolean, score=0.0, name='PaidLoan'}'