In [1]:
import sys

sys.path.append('..')
import os
import torch
import pandas as pd
import numpy as np
import deep_logic as dl
from deep_logic.models.relu_nn import XReluNN
from deep_logic.models.psi_nn import PsiNetwork
from deep_logic.models.tree import XDecisionTreeClassifier
from deep_logic.models.brl import XBRLClassifier
from deep_logic.models.logistic_regression import XLogisticRegressionClassifier
from deep_logic.utils.base import set_seed, ClassifierNotTrainedError, IncompatibleClassifierError
from deep_logic.utils.metrics import Accuracy
from deep_logic.models.general_nn import XGeneralNN
from deep_logic.utils.datasets import ConceptToTaskDataset
from deep_logic.utils.data import get_splits_train_val_test
from deep_logic.logic.base import test_explanation
from data import MNIST
from experiments.MNIST.concept_extractor_mnist import concept_extractor_mnist

# Output directory for this experiment's result files.
results_dir = 'results/mnist'
# exist_ok=True creates the directory (and any missing parents) when absent and
# does nothing when it already exists, so no separate isdir() check is needed.
os.makedirs(results_dir, exist_ok=True)
# Relative path to the root folder of the MNIST even/odd dataset.
dataset_root = "../data/MNIST_EVEN_ODD/"
dataset_root

Using cache found in C:\Users\gabri/.cache\torch\hub\pytorch_vision_v0.6.0


'../data/MNIST_EVEN_ODD/'

## Define the loss, the metric and the list of methods to compare

In [3]:
# Identifiers of the models compared in this experiment; the training loop
# selects the model to build by matching on these strings.
method_list = [
    'General',
    'Psi',
    'LogisticRegression',
    'BRL',
    'DTree',
    'Relu',
]
# Default classification loss (models may be given their own loss instead).
loss = torch.nn.CrossEntropyLoss()
# Metric object passed to fit/evaluate and to the explanation tests.
metric = Accuracy()

## Loading MNIST data

In [4]:
# Run the concept extractor only when its predictions file is not on disk yet.
predictions_file = os.path.join(dataset_root, f"{MNIST}_predictions.npy")
if os.path.isfile(predictions_file):
    print("Concepts already extracted")
else:
    concept_extractor_mnist(dataset_root)

# Dataset mapping the extracted concepts to the final task labels.
dataset = ConceptToTaskDataset(dataset_root, dataset_name=MNIST, predictions=True)

# Pull out the dataset metadata used by the cells below and report it.
concept_names = dataset.attribute_names
n_features = dataset.n_attributes
class_names = dataset.classes
n_classes = dataset.n_classes
for title, shown in (
    ("Concept names", concept_names),
    ("Number of features", n_features),
    ("Class names", class_names),
    ("Number of classes", n_classes),
):
    print(title, shown)

Concepts already extracted
Concept names ['Zero' 'One' 'Two' 'Three' 'Four' 'Five' 'Six' 'Seven' 'Eight' 'Nine']
Number of features 10
Class names ['Even', 'Odd']
Number of classes 2


## Training

In [None]:
# Training hyper-parameters.
epochs = 200
l_r = 1e-3
lr_scheduler = False
# Explanation-extraction settings.
top_k_explanations = 1
simplify = True
# The whole train/explain pipeline is repeated once per seed.
seeds = list(range(10))
print("Seeds", seeds)
# Everything runs on the CPU, regardless of whether CUDA is available.
# NOTE(review): if GPU training is wanted, select "cuda" when
# torch.cuda.is_available() — first confirm that model loading, evaluation and
# explanation extraction all handle a non-CPU device.
device = torch.device("cpu")
print("Device", device)

# Benchmark every method in `method_list`: for each seed, build the model, load
# it if it was already trained (otherwise fit it), evaluate it on the test
# split, extract one global explanation per class and record the metrics.
# One results CSV per method is written to `results_dir`.
for method in method_list:

    # Per-method accumulators; one entry is appended per seed.
    methods = []
    splits = []
    explanations = []
    model_accuracies = []
    explanation_accuracies = []
    elapsed_times = []
    # NOTE(review): `explanation_fidelities` is never appended to nor stored below.
    explanation_fidelities = []
    explanation_complexities = []

    for seed in seeds:
        set_seed(seed)
        # Model name doubles as a path under results_dir: "<method>_<seed>".
        name = os.path.join(results_dir, f"{method}_{seed}")

        # Splits are requested after seeding.
        # NOTE(review): load=False presumably recomputes the split instead of
        # loading a stored one — confirm against get_splits_train_val_test.
        train_data, val_data, test_data = get_splits_train_val_test(dataset, load=False)
        # Validation/test tensors built by indexing the dataset arrays with the
        # split indices; used below for explanation extraction and testing.
        x_val = torch.tensor(dataset.attributes[val_data.indices])
        y_val = torch.tensor(dataset.targets[val_data.indices])
        x_test = torch.tensor(dataset.attributes[test_data.indices])
        y_test = torch.tensor(dataset.targets[test_data.indices])
        # Prints the full list of training indices (verbose output).
        print(train_data.indices)

        # Build, load-or-train and explain the classifier selected by `method`.
        print(f"Training {name} Classifier...")

        if method == 'BRL':
            model = XBRLClassifier(name=name, n_classes=n_classes, n_features=n_features,
                                   feature_names=concept_names, class_names=dataset.classes, discretize=True)
            try:
                model.load(device)
                print(f"Model {name} already trained")
            except (ClassifierNotTrainedError, IncompatibleClassifierError):
                # NOTE(review): BRL is fitted on val_data only, whereas every
                # other method is fitted on (train_data, val_data) — confirm
                # this is intentional.
                results = model.fit(val_data, metric=metric, save=True, verbose=False)
            accuracy = model.evaluate(test_data, metric=metric)
            print("Test model accuracy", accuracy)
            formulas, times, exp_accuracies, exp_complexities = [], [], [], []
            for i, class_to_explain in enumerate(dataset.classes):
                formula, elapsed_time = model.get_global_explanation(i, concept_names,
                                                                     return_time=True)
                # No separate explanation test here: the explanation accuracy
                # is taken to be the model's own test accuracy.
                exp_accuracy = accuracy
                explanation_complexity = dl.logic.complexity(formula)
                formulas.append(formula), times.append(elapsed_time)
                exp_accuracies.append(exp_accuracy), exp_complexities.append(explanation_complexity)
                print(f"{class_to_explain} <-> {formula}")
                print("Elapsed time", elapsed_time)
                print("Explanation accuracy", exp_accuracy)
                print("Explanation complexity", explanation_complexity)

        elif method == 'DTree':
            model = XDecisionTreeClassifier(name=name, n_classes=n_classes, n_features=n_features)
            try:
                model.load(device)
                print(f"Model {name} already trained")
            except (ClassifierNotTrainedError, IncompatibleClassifierError):
                results = model.fit(train_data, val_data, metric=metric, save=True)
            accuracy = model.evaluate(test_data, metric=metric)
            print("Test model accuracy", accuracy)
            formulas, times, exp_accuracies, exp_complexities = [], [], [], []
            for i, class_to_explain in enumerate(dataset.classes):
                formula, elapsed_time = model.get_global_explanation(i, concept_names,
                                                                     return_time=True)
                # As for BRL, the explanation accuracy is set to the model's
                # test accuracy rather than measured with test_explanation.
                exp_accuracy = accuracy
                explanation_complexity = dl.logic.complexity(formula)
                formulas.append(formula), times.append(elapsed_time)
                exp_accuracies.append(exp_accuracy), exp_complexities.append(explanation_complexity)
                print(f"{class_to_explain} <-> {formula}")
                print("Elapsed time", elapsed_time)
                print("Explanation accuracy", exp_accuracy)
                print("Explanation complexity", explanation_complexity)

        elif method == 'Psi':
            # Psi-network configuration: no hidden layers, its own learning
            # rate (lr_psi) and the CrossEntropyLoss defined earlier as `loss`.
            l1_weight = 1e-2
            print("l1 weight", l1_weight)
            hidden_neurons = []
            fan_in = 5
            lr_psi = 1e-3
            model = PsiNetwork(n_classes, n_features, hidden_neurons, loss,
                               l1_weight, name=name, fan_in=fan_in)
            try:
                model.load(device)
                print(f"Model {name} already trained")
            except (ClassifierNotTrainedError, IncompatibleClassifierError):
                results = model.fit(train_data, val_data, epochs=epochs, l_r=lr_psi, verbose=True,
                                    metric=metric, lr_scheduler=lr_scheduler, device=device, save=True)
            accuracy = model.evaluate(test_data, metric=metric)
            print("Test model accuracy", accuracy)
            formulas, times, exp_accuracies, exp_complexities = [], [], [], []
            for i, class_to_explain in enumerate(dataset.classes):
                formula, elapsed_time = model.get_global_explanation(i, concept_names,
                                                                     simplify=simplify, return_time=True)
                # The extracted formula is scored on the held-out test tensors.
                exp_accuracy, _ = test_explanation(formula, i, x_test, y_test,
                                                   metric=metric, concept_names=concept_names)
                explanation_complexity = dl.logic.complexity(formula)
                formulas.append(formula), times.append(elapsed_time), exp_accuracies.append(exp_accuracy)
                exp_complexities.append(explanation_complexity)
                print(f"{class_to_explain} <-> {formula}")
                print("Elapsed time", elapsed_time)
                print("Explanation accuracy", exp_accuracy)
                print("Explanation complexity", explanation_complexity)

        elif method == 'Relu':
            # ReLU network configuration: built with n_classes=1 and a
            # BCEWithLogitsLoss instead of the shared `loss`.
            l1_weight = 1e-2
            hidden_neurons = [100, 50, 30]
            model = XReluNN(n_classes=1, n_features=n_features, name=name,
                            hidden_neurons=hidden_neurons, loss=torch.nn.BCEWithLogitsLoss(), l1_weight=l1_weight)
            try:
                model.load(device)
                print(f"Model {name} already trained")
            except (ClassifierNotTrainedError, IncompatibleClassifierError):
                results = model.fit(train_data, val_data, epochs=epochs, l_r=l_r, verbose=True,
                                    metric=metric, lr_scheduler=lr_scheduler, device=device, save=True)
            accuracy = model.evaluate(test_data, metric=metric)
            print("Test model accuracy", accuracy)
            formulas, times, exp_accuracies, exp_complexities = [], [], [], []
            for i, class_to_explain in enumerate(dataset.classes):
                # Explanations are extracted from the validation tensors and
                # then scored on the test tensors.
                formula, elapsed_time = model.get_global_explanation(x_val, y_val, i,
                                                                     topk_explanations=top_k_explanations,
                                                                     concept_names=concept_names,
                                                                     simplify=simplify, return_time=True)
                exp_accuracy, _ = test_explanation(formula, i, x_test, y_test,
                                                   metric=metric, concept_names=concept_names)
                explanation_complexity = dl.logic.complexity(formula)
                formulas.append(formula), times.append(elapsed_time), exp_accuracies.append(exp_accuracy)
                exp_complexities.append(explanation_complexity)
                print(f"{class_to_explain} <-> {formula}")
                print("Elapsed time", elapsed_time)
                print("Explanation accuracy", exp_accuracy)
                print("Explanation complexity", explanation_complexity)

        elif method == 'General':
            # General network configuration.
            # NOTE(review): n_classes is hard-coded to 2 here instead of using
            # the `n_classes` read from the dataset — confirm.
            l1_weight = 1e0
            fan_in = None
            hidden_neurons = [100, 50, 30]
            model = XGeneralNN(n_classes=2, n_features=n_features, hidden_neurons=hidden_neurons,
                               loss=torch.nn.BCEWithLogitsLoss(), name=name, l1_weight=l1_weight, fan_in=fan_in)
            try:
                model.load(device)
                print(f"Model {name} already trained")
            except (ClassifierNotTrainedError, IncompatibleClassifierError):
                results = model.fit(train_data, val_data, epochs=epochs, l_r=l_r, metric=metric,
                                    lr_scheduler=lr_scheduler, device=device, save=True, verbose=True)
            accuracy = model.evaluate(test_data, metric=metric)
            print("Test model accuracy", accuracy)
            formulas, times, exp_accuracies, exp_complexities = [], [], [], []
            for i, class_to_explain in enumerate(dataset.classes):
                # Explanations are extracted from the validation tensors and
                # then scored on the test tensors.
                formula, elapsed_time = model.get_global_explanation(x_val, y_val, i, simplify=simplify,
                                                                     topk_explanations=top_k_explanations,
                                                                     concept_names=concept_names, return_time=True)
                exp_accuracy, _ = test_explanation(formula, i, x_test, y_test,
                                                   metric=metric, concept_names=concept_names)
                explanation_complexity = dl.logic.complexity(formula)
                formulas.append(formula), times.append(elapsed_time), exp_accuracies.append(exp_accuracy)
                exp_complexities.append(explanation_complexity)
                print(f"{class_to_explain} <-> {formula}")
                print("Elapsed time", elapsed_time)
                print("Explanation accuracy", exp_accuracy)
                print("Explanation complexity", explanation_complexity)

        elif method == 'LogisticRegression':
            # Logistic regression uses its own learning rate (l_r_lr).
            l_r_lr = 1e-1
            # The seed is set a second time here, right before model creation.
            set_seed(seed)
            model = XLogisticRegressionClassifier(name=name, n_classes=1, n_features=n_features,
                                                  loss=torch.nn.BCEWithLogitsLoss())
            try:
                model.load(device)
                print(f"Model {name} already trained")
            except (ClassifierNotTrainedError, IncompatibleClassifierError):
                results = model.fit(train_data, val_data, epochs=epochs, l_r=l_r_lr, metric=metric,
                                    lr_scheduler=lr_scheduler, device=device, save=True, verbose=True)
            accuracy = model.evaluate(test_data, metric=metric)
            print("Test model accuracy", accuracy)
            # No explanation is extracted for this method: placeholders (empty
            # formula, zero time/accuracy/complexity) keep the bookkeeping
            # below uniform.
            formulas, times, exp_accuracies, exp_complexities = [""], [0], [0], [0]
        else:
            raise NotImplementedError(f"{method} not implemented")

        # Record this seed's outcome. Only the first class's formula is kept;
        # the numeric explanation metrics are averaged over the classes.
        methods.append(method)
        splits.append(seed)
        explanations.append(formulas[0])
        model_accuracies.append(accuracy)
        explanation_accuracies.append(np.mean(exp_accuracies))
        elapsed_times.append(np.mean(times))
        explanation_complexities.append(np.mean(exp_complexities))

    # Consistency is computed once per method over the formulas of all seeds.
    # NOTE(review): for 'LogisticRegression' these are all empty strings —
    # confirm formula_consistency handles that as intended.
    explanation_consistency = dl.logic.formula_consistency(explanations)
    print(f'Consistency of explanations: {explanation_consistency:.4f}')

    # `results` is rebound here: inside the seed loop it held the return value
    # of model.fit, from now on it is the per-method results table.
    # `explanation_consistency` is a single value shared by every row.
    results = pd.DataFrame({
        'method': methods,
        'split': splits,
        'explanation': explanations,
        'model_accuracy': model_accuracies,
        'explanation_accuracy': explanation_accuracies,
        'explanation_complexity': explanation_complexities,
        'explanation_consistency': explanation_consistency,
        'elapsed_time': elapsed_times,
    })
    results.to_csv(os.path.join(results_dir, f'results_{method}.csv'))
    print(results)

## Summary

In [5]:
# Metrics summarised per method (mean and standard error over the seeds).
cols = ['model_accuracy', 'explanation_accuracy', 'explanation_complexity', 'elapsed_time',
        'explanation_consistency']
mean_cols = [f'{c}_mean' for c in cols]
sem_cols = [f'{c}_sem' for c in cols]

# Per-method raw result tables and per-method summary rows.
results_by_method = {}
summaries = {}
for method in method_list:
    # index_col=0: the first CSV column is the row index that to_csv wrote,
    # so it is restored as the index instead of becoming an "Unnamed: 0" column.
    frame = pd.read_csv(os.path.join(results_dir, f"results_{method}.csv"), index_col=0)
    results_by_method[method] = frame
    df_mean = frame[cols].mean()
    df_sem = frame[cols].sem()
    # mean()/sem() return Series, whose labels live in `.index`; assigning to
    # `.columns` on a Series would only create a stray attribute and leave the
    # mean and sem entries sharing the same labels.
    df_mean.index = mean_cols
    df_sem.index = sem_cols
    summaries[method] = pd.concat([df_mean, df_sem])
    summaries[method].name = method

# Stack the per-method tables row-wise: one row per (method, seed) run.
results = pd.concat([results_by_method[method] for method in method_list],
                    axis=0, ignore_index=True)
results.to_csv(os.path.join(results_dir, 'results.csv'))

# One row per method, columns "<metric>_mean" followed by "<metric>_sem".
summary = pd.concat([summaries[method] for method in method_list], axis=1).T
summary.to_csv(os.path.join(results_dir, 'summary.csv'))
print(summary)

                    model_accuracy_mean  explanation_accuracy_mean  \
General                       99.797500                  99.662083   
Psi                           99.810000                  76.030000   
LogisticRegression            99.835833                   0.000000   
BRL                           99.734167                  99.734167   
DTree                         99.995833                  99.995833   
Relu                          99.782500                  95.170000   

                    explanation_complexity_mean  elapsed_time_mean  \
General                                   18.50          11.158586   
Psi                                       19.60           0.134629   
LogisticRegression                         0.00           0.000000   
BRL                                       22.95           0.225282   
DTree                                    639.95           0.001230   
Relu                                      44.05          21.510664   

                  