In [None]:
import sys
from pathlib import Path
sys.path.append(str(Path.cwd().parent))

In [None]:
import torch
import numpy as np
from matplotlib import pyplot as plt
import pandas as pd
import seaborn as sns
from collections import defaultdict
import statistics
from torch.distributions.categorical import Categorical
from IPython import display
import copy
from torch.utils.data import DataLoader
from tqdm import trange
import torch.nn.functional as F
from tqdm import tqdm
from scipy import spatial
import math
import cv2
from matplotlib import colors
from pathlib import Path
from tqdm import trange
from ipywidgets import interact
from collections import defaultdict
from src.env import ACTION_MAPPER
from src.rl_utils import get_project_folder

In [None]:
# Execute main.py through IPython's %run magic with the experiment configuration
# below. The following cells read `GLOBAL_INFO`, which is not defined anywhere in
# this notebook — presumably main.py leaves it in the interactive namespace
# (TODO confirm against main.py).
%run main.py --load-version 0 --n-rollouts 10 --n-samples 200_000 --epochs 300 --reg-coef 5e-2  \
    --num-envs 10 --n-clf 25 --clf-type "moe" --mode "nmf" --algo "ppo" --env-id PongNoFrameskip-v4 --learning-rate 5e-3 --n-experts 4 \
        --n-concepts 2 --target-layer 3 --batch-size 512 --val-interval 10 --max-patience 5 --ccp-alpha 5e-5 --agent-version 0 \
            --save-start 0

In [None]:
# Pick the classifier wrapper that the rest of the notebook analyses.
#
# Each entry of GLOBAL_INFO["clf"] is indexed as a pair: entry[0] is the model
# wrapper and entry[1] is the value used for ranking (presumably a validation
# score — TODO confirm against main.py).
#
# Every branch now selects the *pair* and the wrapper is unwrapped exactly once
# below. Previously the "dt" branch unwrapped with `[-1][0]` and was then
# indexed with `[0]` a second time, and `clf` was only bound in the fallback
# branch even though later cells read it.
_clf_type = GLOBAL_INFO["args"].clf_type
_candidates = GLOBAL_INFO["clf"]

if _clf_type == "dt":
    # Highest ranking value wins; among equal values prefer the tree with the
    # fewest leaves (hence the negated leaf count in the sort key).
    _best = sorted(_candidates, key=lambda x: (
        x[1], -x[0].clf.get_n_leaves()))[-1]
elif _clf_type == "none":
    # No ranking: take the first entry as-is.
    _best = _candidates[0]
else:
    # Highest ranking value wins (last element of the ascending sort).
    _best = sorted(_candidates, key=lambda x: x[1])[-1]

model = _best[0]
# Bound for every classifier type; None when the wrapper exposes no `.clf`.
clf = getattr(model, "clf", None)

## Expert visualization

In [None]:
# Render every expert's linear weights as heat maps:
#   * one SVG per (expert, concept, action), and
#   * one PDF grid per expert with concepts as rows and actions as columns.
folder = Path().cwd().parent / \
    f"experiment-data/experts/{GLOBAL_INFO['args'].env_id}"
# exist_ok makes re-running the cell a no-op and avoids check-then-create.
folder.mkdir(parents=True, exist_ok=True)

n_concepts = model.reducer.n_components_

# Read the experts through `model.clf` so this cell depends only on `model`,
# not on a separately bound `clf` variable.
expert_layers = model.clf.experts
weight = np.stack([expert_layers[i].weight.numpy(force=True)
                   for i in range(len(expert_layers))], axis=0)
# Resulting layout: (expert, inferred axis, concept, 9, 9). The inferred axis is
# indexed with action ids below.
# NOTE(review): the 9x9 spatial size is hard-coded — presumably the size of the
# concept maps at the chosen target layer; confirm before changing the layer.
weight = np.reshape(weight, (len(expert_layers), -1,
                    n_concepts, 9, 9))

nrows = weight.shape[2]
ncols = weight.shape[1]

# Loop-invariant lookups, hoisted out of the plotting loops.
viz_env_id = GLOBAL_INFO['args'].env_id
action_names = ACTION_MAPPER[viz_env_id]
diverging_cmap = sns.color_palette("coolwarm", as_cmap=True)

# Row (concept) labels for the large grid; only known for two environments.
if "Car" in viz_env_id:
    row_labels = ("Grass", "Road")
elif "Pong" in viz_env_id:
    row_labels = ("Ball and\nBackground", "Ball and\nPaddles")
else:
    row_labels = None

for expert_id in range(weight.shape[0]):
    # squeeze=False always yields a 2-D axes array, so single-row AND
    # single-column grids can both be indexed as ax_large[row, col].
    fig_large, ax_large = plt.subplots(
        nrows=nrows, ncols=ncols, layout="constrained", squeeze=False,
        figsize=(4 * ncols, 4.1 * nrows))

    # Symmetric colour limits around zero, shared by all panels of this expert.
    # NOTE(review): the limit is the SMALLER of max and |min|, so the larger
    # tail saturates; if all weights share one sign the limits invert. Confirm
    # this is intended rather than max(max_, abs(min_)).
    max_ = np.max(weight[expert_id])
    min_ = np.min(weight[expert_id])
    largest = min(max_, abs(min_))
    print(largest)
    max_ = largest
    min_ = -largest

    for row in range(nrows):
        for col in range(ncols):
            weight_instace = weight[expert_id, col, row]
            action_name = action_names[col]

            # Stand-alone panel, saved and closed immediately.
            fig, ax = plt.subplots(layout="constrained", figsize=(4, 4.25))
            sns.heatmap(weight_instace, cbar=False, vmax=max_, vmin=min_,
                        cmap=diverging_cmap, ax=ax, square=True)
            ax.axis("off")
            ax.set_title(f"{action_name.capitalize()}", fontsize=25)
            fig.savefig(
                folder /
                f"expert={expert_id}__concept={row}__action={action_name}_{col}.svg",
                bbox_inches='tight', pad_inches=0)
            plt.close(fig)

            # Same heat map inside the per-expert grid.
            grid_ax = ax_large[row, col]
            sns.heatmap(weight_instace, cbar=False, vmax=max_, vmin=min_,
                        cmap=diverging_cmap, ax=grid_ax, square=True)
            grid_ax.set_xticks([])
            grid_ax.set_yticks([])

            # Action names label the columns (top row only).
            if row == 0:
                grid_ax.set_title(
                    f"{action_name.capitalize()}", fontsize=40)

            # Concept names label the rows (first column only), when known.
            if col == 0 and row_labels is not None:
                label = row_labels[0] if row == 0 else row_labels[1]
                grid_ax.set_ylabel(label, fontsize=40)

    fig_large.savefig(
        folder / f"linear model__env_id={GLOBAL_INFO['args'].env_id}__expert={expert_id}.pdf", bbox_inches='tight', pad_inches=0)

## Create Concept-based Explanations

In [None]:
def _render_frame_84x84(env):
    """Render ``env`` and crop/resize the frame to 84x84 for its environment id.

    Raises:
        NotImplementedError: if ``env.spec.id`` is not one of the four
            environments handled below.
    """
    env_name = env.spec.id
    if env_name == "CarRacing-v2":
        # Resize to 96x96 first, then crop rows 0..83 and columns 6..89.
        return cv2.resize(env.render(), dsize=(96, 96))[:84, 6:90]
    if env_name == "PongNoFrameskip-v4":
        return cv2.resize(env.render()[34:-16], dsize=(84, 84))
    if env_name == "MsPacmanNoFrameskip-v4":
        return cv2.resize(env.render()[:-39], dsize=(84, 84))
    if env_name == "BreakoutNoFrameskip-v4":
        return cv2.resize(env.render()[32:, 8:-8], dsize=(84, 84))
    raise NotImplementedError(
        f"No render cropping defined for environment {env_name!r}")


def evaluate_im(args, fabric, envs, model, num_eps: int, n_steps: int = 2_000):
    """Roll out ``model`` greedily and record per-step data for one sub-env.

    At every step the model's concept masks, expert gating output, logits and
    the greedy (argmax) action are computed for all environments; only the
    entries of the environment at index 0 are stored, together with a rendered
    84x84 frame of that environment.

    Args:
        args: Run configuration; only ``args.seed`` is read here.
        fabric: Object providing ``seed_everything`` and ``device``.
        envs: Vectorised environment exposing ``reset``, ``step`` and ``envs``.
        model: Interpretable model exposing ``obs_to_concepts``,
            ``get_cluster``, ``env_id``, ``agent`` and ``reducer``; it is
            switched to eval mode.
        num_eps: Currently unused; kept so existing callers keep working.
        n_steps: Number of vectorised environment steps to record
            (default 2000, the previously hard-coded value).

    Returns:
        dict mapping ``"expert"``, ``"obs"``, ``"mask"``, ``"logits"``,
        ``"img"`` and ``"action"`` to arrays concatenated along axis 0 (one
        entry per step). ``"return"`` is present only if at least one episode
        reported statistics during the rollout.

    Raises:
        NotImplementedError: if the rendered environment's id has no cropping
            rule.
    """
    env_idx = 0
    model.eval()
    fabric.seed_everything(args.seed)
    result = defaultdict(list)

    # Frames are rendered from the same sub-environment whose data is recorded.
    render_env = envs.envs[env_idx]

    next_obs = torch.tensor(envs.reset(seed=args.seed)[0],
                            device=fabric.device)

    for _ in trange(n_steps):
        with torch.no_grad():
            concept, shape, _ = model.obs_to_concepts(
                model.env_id, next_obs, model.agent, model.reducer)
            mask = np.reshape(concept, shape)

            expert = model.get_cluster(next_obs)
            logits = model(next_obs)
            # Greedy policy: always take the highest-logit action.
            action = torch.argmax(logits, -1)

        # A leading axis of length 1 lets everything be concatenated at the end.
        result["expert"].append(expert[env_idx].numpy(
            force=True)[np.newaxis, ...])
        result["obs"].append(next_obs[env_idx].numpy(
            force=True)[np.newaxis, ...])
        result["mask"].append(mask[env_idx][np.newaxis, ...])
        result["logits"].append(
            logits[env_idx].numpy(force=True)[np.newaxis, ...])

        # Render BEFORE stepping so the frame matches the recorded observation.
        result["img"].append(_render_frame_84x84(render_env)[np.newaxis, ...])

        next_obs, _, _, _, infos = envs.step(action.numpy(force=True))
        next_obs = torch.tensor(next_obs, device=fabric.device)

        result["action"].append(action[env_idx].numpy(force=True)[np.newaxis])

        if "final_info" not in infos:
            continue

        for info in infos["final_info"]:
            # Skip the envs that are not done
            if info is None or "episode" not in info:
                continue
            result["return"].append(info["episode"]["r"])

    result = {key: np.concatenate(value, axis=0)
              for key, value in result.items()}
    return result

In [None]:
# Roll out the selected model and collect per-step observations, rendered
# frames, concept masks, expert outputs, logits and actions.
im_results = evaluate_im(
    args=GLOBAL_INFO["args"],
    fabric=GLOBAL_INFO["fabric"],
    envs=GLOBAL_INFO["envs"],
    model=model,
    num_eps=GLOBAL_INFO["args"].num_envs,
)

In [None]:
# Print up to N_SAMPLES_PER_EXPERT random step indices for every expert that
# was selected (argmax of the recorded expert output) at least once.
N_SAMPLES_PER_EXPERT = 10

np.random.seed(9937)  # fixed seed so the printed indices are reproducible
chosen_expert = np.argmax(im_results["expert"], 1)
step_indices = np.arange(len(chosen_expert))

for ei in np.unique(chosen_expert):
    candidates = step_indices[chosen_expert == ei]
    # Sampling without replacement raises ValueError when more items are
    # requested than exist, so cap the draw at the number of available steps.
    n_draw = min(N_SAMPLES_PER_EXPERT, len(candidates))
    picked = np.random.choice(candidates, n_draw, replace=False)
    print(sorted(picked.tolist()))

In [None]:
# For every recorded step, overlay the per-concept attribution
# (chosen expert's weights for the chosen action * concept mask) on the
# rendered frame and save it as an SVG.
folder = get_project_folder(
) / f"experiment-data/samples/{GLOBAL_INFO['args'].env_id}"
# exist_ok makes re-running the cell a no-op and avoids check-then-create.
folder.mkdir(parents=True, exist_ok=True)

img = im_results["img"]
action = np.argmax(im_results["logits"], -1)
expert = np.argmax(im_results["expert"], -1)
expert_prob = np.max(im_results["expert"], -1)
concepts = im_results["mask"]
# One (n_actions, inferred, 9, 9) weight block per expert.
# NOTE(review): the inferred axis is multiplied element-wise with the concept
# masks below, so it is presumably the concept axis — confirm.
weights = np.array([np.reshape(model.clf.experts[i].weight.numpy(
    force=True), (model.clf.n_actions, -1, 9, 9)) for i in range(len(model.clf.experts))])

# One sequential colormap per concept, built once. Deliberately NOT named
# `colors`, which would shadow the `matplotlib.colors` module imported above.
concept_cmaps = [sns.color_palette(name, as_cmap=True)
                 for name in ("Blues", "Reds", "Purples", "Greens", "Oranges")]

for i in range(len(img)):
    fig, ax = plt.subplots(layout="constrained", figsize=(5, 5))
    masks = weights[expert[i]][action[i]] * concepts[i]
    # Upsample each attribution map to the 84x84 frame size.
    masks = np.array([cv2.resize(m, dsize=(84, 84)) for m in masks])
    masks = np.abs(masks)  # What to do with negative attribution?
    # Joint min-max normalisation across all concepts of this step; the
    # epsilon guards against division by zero for constant maps.
    masks = (masks - masks.min()) / (masks.max() - masks.min() + 1e-8)

    ax.imshow(img[i])
    for k, concept_mask in enumerate(masks):
        # The mask doubles as per-pixel alpha, so weak attributions stay
        # transparent. Cycle the palette if there are more concepts than maps.
        ax.imshow(concept_mask, alpha=concept_mask,
                  cmap=concept_cmaps[k % len(concept_cmaps)], vmin=0, vmax=1)
    ax.set_title(ACTION_MAPPER[GLOBAL_INFO["args"].env_id]
                 [action[i]].capitalize(), fontsize=25)
    ax.axis("off")

    fig.savefig(
        folder / f"fig{i}__expert={expert[i]}__prob={expert_prob[i]}__action={action[i]}.svg", bbox_inches='tight', pad_inches=0)
    plt.close(fig)

In [None]:
# Save one SVG per (step, concept): the rendered frame with that concept's
# min-max normalised mask drawn on top.
folder = get_project_folder() / \
    f"experiment-data/concepts/{GLOBAL_INFO['args'].env_id}"
if not folder.exists():
    folder.mkdir(parents=True)

action = np.argmax(im_results["logits"], -1)
img = im_results["img"]

for i, frame in enumerate(img):
    for cj in range(len(im_results["mask"][0])):
        fig, ax = plt.subplots(layout="constrained")
        ax.imshow(frame)

        # Upsample the concept map to the frame size, then rescale to [0, 1);
        # the epsilon keeps constant maps from dividing by zero.
        resized = cv2.resize(im_results["mask"][i][cj], dsize=(84, 84))
        lowest = resized.min()
        highest = resized.max()
        mask = (resized - lowest) / (highest - lowest + 1e-8)

        # The normalised mask is used both as colour value and as alpha.
        overlay_cmap = sns.color_palette("coolwarm", as_cmap=True)
        ax.imshow(mask, alpha=mask, cmap=overlay_cmap)
        ax.axis("off")

        target = folder / f"fig{i}__concepts={cj}__action={action[i]}.svg"
        fig.savefig(target, bbox_inches='tight', pad_inches=0)
        plt.close(fig)

In [None]:
# Save the raw rendered frame of every recorded step, tagged with the action
# that was taken at that step.
folder = get_project_folder() / \
    f"experiment-data/states/{GLOBAL_INFO['args'].env_id}"
if not folder.exists():
    folder.mkdir(parents=True)

img = im_results["img"]

for i, frame in enumerate(img):
    fig, ax = plt.subplots(layout="constrained")
    ax.imshow(frame)
    ax.axis("off")

    taken_action = im_results['action'][i]
    target = folder / f"fig{i}__action={taken_action}.svg"
    fig.savefig(target, bbox_inches='tight', pad_inches=0)
    plt.close(fig)