# Street sign with Pre-Trained WideResNet

With additional shield net, generate example images 

In [1]:
import torch 

print(torch.cuda.is_available())

False


  return torch._C._cuda_getDeviceCount() > 0


In [2]:
from torch.utils.data import Dataset, DataLoader
from os.path import join
import pandas as pd 
from PIL import Image
from torch.optim import SGD
import seaborn as sb 
from gtsrb import GTSRB
from detectors import EnsembleDetector, SemanticDetector, LogicOnlyDetector
import os 
from pytorch_ood.utils import fix_random_seed
import seaborn as sb 
import torch 

# os.environ["CUDA_VISIBLE_DEVICES"] = "MIG-GPU-bb1ccb6e-2bc9-c7a1-b25d-3eef9033e192/0/0" 

sb.set()

device="cuda:0"
root = "../data/"


fix_random_seed()

def seed_worker(worker_id):
    worker_seed = torch.initial_seed() % 2**32
    fix_random_seed()

g = torch.Generator()
g.manual_seed(0)


<torch._C.Generator at 0x7f6e1cdd0810>

In [3]:
import torchvision
from torchvision.transforms import ToTensor, Resize, Compose
import torch 
from torch.utils.data import DataLoader

trans = Compose([ToTensor(), Resize((32, 32))])

train_data = GTSRB(root=root, train=True, transforms=trans)
test_data = GTSRB(root=root, train=False, transforms=trans)

In [4]:
train_loader = DataLoader(train_data, batch_size=32, shuffle=True, num_workers=2, worker_init_fn=seed_worker)
test_loader = DataLoader(test_data, batch_size=32, shuffle=False, num_workers=2, worker_init_fn=seed_worker)

In [5]:
from torch import nn
from torchvision.models.resnet import resnet18
from pytorch_ood.model import WideResNet
from functools import partial

# def override 
def Model(num_classes=None, *args, **kwargs):
    model = WideResNet(*args, num_classes=1000, **kwargs, pretrained="imagenet32")
    model.fc = nn.Linear(model.fc.in_features, num_classes)
    return model


In [6]:
from tqdm.notebook import tqdm 
import numpy as np 

def train_model(att_index, num_classes, n_epochs=20):
    """
    train a model for the given attribute index 
    """
    trans = Compose([ToTensor(), Resize((32, 32))])
    train_data = GTSRB(root=root, train=True, transforms=trans)
    test_data = GTSRB(root=root, train=False, transforms=trans)
    
    train_loader = DataLoader(train_data, batch_size=32, shuffle=True, num_workers=2, worker_init_fn=seed_worker)
    test_loader = DataLoader(test_data, batch_size=32, shuffle=False, num_workers=2, worker_init_fn=seed_worker)
    
    model = Model(num_classes=num_classes).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = SGD(model.parameters(), lr=0.001, momentum=0.9, nesterov=True)

    accs = []

    for epoch in range(n_epochs):
        running_loss = 0.0
        model.train()
        bar = tqdm(train_loader)
        for inputs, y in bar:
            labels = y[:, att_index]
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss = 0.8 * running_loss + 0.2 * loss.item()
            bar.set_postfix({"loss": running_loss})

        correct = 0
        total = 0

        with torch.no_grad():
            model.eval()

            for inputs, y in test_loader:
                labels = y[:, att_index]
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                _, predicted = torch.max(outputs.data, dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        print(f'Accuracy of the network on the test images: {correct / total:.2%}')

    return model 

# Sign Network 

In [7]:
from torch.utils.data import DataLoader
from pytorch_ood.utils import is_known
from tqdm.notebook import tqdm 
from pytorch_ood.dataset.img import TinyImages300k
from pytorch_ood.utils import ToUnknown
from torch.utils.data import random_split

def train_sign_model(n_epochs=20):
    tiny = TinyImages300k(root=root, download=True, transform=trans, target_transform=ToUnknown())
    data_train_out, data_test_out, _ = random_split(tiny, [50000, 10000, 240000], generator=torch.Generator().manual_seed(123))

    train_data_noatt = GTSRB(root=root, train=True, transforms=trans, target_transform=lambda y: y[0])
    test_data_noatt = GTSRB(root=root, train=False, transforms=trans, target_transform=lambda y: y[0])

    new_loader = DataLoader(train_data_noatt + data_train_out, batch_size=32, shuffle=True, num_workers=10, worker_init_fn=seed_worker)
    new_test_loader = DataLoader(test_data_noatt + data_test_out, batch_size=32, shuffle=False, num_workers=10, worker_init_fn=seed_worker)

    model = Model(num_classes=2).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = SGD(model.parameters(), lr=0.001, momentum=0.9, nesterov=True)

    accs = []

    for epoch in range(n_epochs):
        running_loss = 0.0
        model.train()
        
        bar = tqdm(new_loader)
        for inputs, y in bar:
            labels = is_known(y).long()
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss = 0.8 * running_loss + 0.2 * loss.item()
            bar.set_postfix({"loss": running_loss})

        correct = 0
        total = 0

        with torch.no_grad():
            model.eval()

            for inputs, y in new_test_loader:
                labels = is_known(y).long()
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        print(f'Accuracy of the shape network on the test images: {correct / total:.2%}')
        accs.append(correct / total)

    return model 

# OOD Evaluation 

In [8]:
from pytorch_ood.dataset.img import (LSUNCrop, LSUNResize, Textures, TinyImageNetCrop, TinyImageNetResize)
from pytorch_ood.detector import EnergyBased, MaxSoftmax, Mahalanobis, MaxLogit, ViM, Entropy
from pytorch_ood.utils import ToRGB, OODMetrics

def evaluate(label_net, shape_net, color_net, shield_net):
    _ = label_net.eval()
    _ = shape_net.eval()
    _ = color_net.eval()
    _ = shield_net.eval()
    
    results = []

    trans = Compose([Resize(size=(32, 32)), ToRGB(), ToTensor()])
    data_in = GTSRB(root=root, train=False, transforms=trans, target_transform=lambda y: y[0])
    data_in_train = GTSRB(root=root, train=True, transforms=trans, target_transform=lambda y: y[0])
    loader_train = DataLoader(data_in_train, batch_size=1024, shuffle=False, worker_init_fn=seed_worker)
    # dataset_out_test = Textures(root=root, transform=trans, target_transform=ToUnknown(), download=True)
    
    
    detectors = {
        "MSP": MaxSoftmax(label_net),
        "Energy": EnergyBased(label_net),
        "Mahalanobis": Mahalanobis(label_net.features),
        "MaxLogit": MaxLogit(label_net),
        "Entropy": Entropy(label_net),
        "ViM": ViM(label_net.features, w=label_net.fc.weight, b=label_net.fc.bias, d=64),
        "Logic": LogicOnlyDetector(
            label_net, shape_net, 
            color_net, 
            GTSRB(root=root).class_to_shape, 
            GTSRB(root=root).class_to_color
        ),
        "Logic+": LogicOnlyDetector(
            label_net, shape_net, 
            color_net, 
            GTSRB(root=root).class_to_shape, 
            GTSRB(root=root).class_to_color,
            sign_net=shield_net,
        ),
        "LogicOOD+": SemanticDetector(
            label_net, 
            shape_net, 
            color_net, 
            GTSRB(root=root).class_to_shape, 
            GTSRB(root=root).class_to_color, 
            sign_net=shield_net
        ),
        "LogicOOD": SemanticDetector(
                label_net, 
                shape_net, 
                color_net, 
                GTSRB(root=root).class_to_shape, 
                GTSRB(root=root).class_to_color, 
        ),
        "Ensemble": EnsembleDetector(label_net, shape_net, color_net)
    }
    
    for detector_name, detector in detectors.items():
        print(f"Fitting {detector_name}")
        try:
            detector.fit(loader_train, device=device)
        except TypeError:
            detector.fit(loader_train)
        
    datasets = {d.__name__: d for d in (LSUNCrop, LSUNResize, Textures, TinyImageNetCrop, TinyImageNetResize)}
    
    for detector_name, detector in detectors.items():
        for data_name, dataset_c in datasets.items():
            data_out = dataset_c(root=root, transform=trans, target_transform=ToUnknown(), download=True)
            loader = DataLoader(data_in+data_out, batch_size=64, shuffle=False, worker_init_fn=seed_worker)
            
            scores = []
            ys = []
            
            with torch.no_grad():
                for x, y in loader:
                    scores.append(detector(x.to(device)))
                    ys.append(y.to(device))
                    
                scores = torch.cat(scores, dim=0).cpu()
                ys = torch.cat(ys, dim=0).cpu()
            
            metrics = OODMetrics()
            metrics.update(scores, ys)
            r = metrics.compute()
            r.update({
                "Method": detector_name,
                "Dataset": data_name
            })
            print(r)
            results.append(r)
    
    return results 

In [9]:
def evaluate_acc(net, att_idx=0, oe=False):
    _ = net.eval()
    
    if oe:
        target_trans = lambda y: torch.tensor(1)
    else:
         target_trans = lambda y: y[att_idx]

    trans = Compose([Resize(size=(32, 32)), ToRGB(), ToTensor()])
    data_in = GTSRB(root=root, train=False, transforms=trans, target_transform=target_trans)
    loader = DataLoader(data_in, batch_size=64, shuffle=False, worker_init_fn=seed_worker)
            
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = net(inputs)
            predicted = outputs.max(dim=1).indices
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    return correct / total  

def evaluate_accs(label_net, shape_net, color_net, shield_net):
    r = {}
    names = ("Label", "Color", "Shape",)
    
    for n, net in enumerate((label_net, color_net, shape_net)): 
        acc = evaluate_acc(net, n)
        r[names[n]] = acc
    
    acc = evaluate_acc(shield_net, oe=True)
    r["Sign"] = acc
    
    return [r] 

In [10]:
results = []
results_acc = []

n_epochs = 20

for trial in range(10):
    # TODO: add monkeypatching 
    shield_net = train_sign_model(n_epochs=n_epochs)
    shape_net = train_model(att_index=2, num_classes=5, n_epochs=n_epochs)
    color_net = train_model(att_index=1, num_classes=4, n_epochs=n_epochs)
    label_net = train_model(att_index=0, num_classes=43, n_epochs=n_epochs)
    
    res = evaluate(label_net, shape_net, color_net, shield_net)
    res_acc = evaluate_accs(label_net, shape_net, color_net, shield_net)
    
    for r in res:
        r.update({"Seed": trial})
        
    for r in res_acc:
        r.update({"Seed": trial})
    
    results += res
    results_acc += res_acc
    
    del shield_net
    del shape_net
    del color_net
    del label_net
    
    torch.cuda.empty_cache()

RuntimeError: CUDA unknown error - this may be due to an incorrectly set up environment, e.g. changing env variable CUDA_VISIBLE_DEVICES after program start. Setting the available devices to be zero.

In [None]:
import pandas as pd 
result_df = pd.DataFrame(results)
mean = (result_df.groupby(by=["Method", "Seed"]).mean() * 100).groupby("Method").agg(["mean", "sem"])[["AUROC", "AUPR-IN", "AUPR-OUT", "FPR95TPR"]]
mean
print(mean.sort_values(by=("AUROC", "mean")).sort_values(by=("AUROC", "mean")).to_latex(float_format="%.2f"))

In [None]:
from scipy.stats import ttest_ind

sem_auroc = result_df[result_df["Method"] == "Semantic"].groupby(by=["Method", "Seed"]).mean()["AUROC"]
sem_ensemble =  result_df[result_df["Method"] == "Ensemble"].groupby(by=["Method", "Seed"]).mean()["AUROC"]

print(ttest_ind(sem_auroc, sem_ensemble, equal_var=False))

In [None]:
print((pd.DataFrame(results_acc) * 100).agg(["mean", "sem"]).to_latex(float_format="%.2f"))

In [None]:
(result_df.groupby(by=["Method", "Seed"]).mean() * 100).groupby("Method").agg(["mean", "sem"])

In [None]:
pd.DataFrame(results_acc).agg(["mean", "sem"])
# dataset gives np.array([label, color, shape])

# results_acc = evaluate_accs(label_net, shape_net, color_net, shield_net)
# print(results_acc)

# Eval

In [None]:
def ood_label(x):
    if x == True:
        return "Normal"
    else:
        return "Anomaly"

In [None]:
trans = Compose([Resize(size=(32, 32)), ToRGB(), ToTensor()])

In [None]:
from pytorch_ood.utils import OODMetrics, ToRGB, ToUnknown
from pytorch_ood.dataset.img import Textures

In [None]:
from tqdm.notebook import tqdm 
test_in_data = GTSRB(root=root, train=False, transforms=trans, target_transform=lambda y: y[0])

detector = MaxSoftmax(label_net)

sem_detector = SemanticDetector(
    label_net, 
    shape_net, 
    color_net, 
    GTSRB(root=root).class_to_shape, 
    GTSRB(root=root).class_to_color, 
    sign_net=shield_net
)

scores = []
my_scores = []
ys = []
xs = []
ys_hat = []

metrics = OODMetrics()

datasets = (LSUNCrop, LSUNResize, Textures, TinyImageNetCrop, TinyImageNetResize)
datasets = [c(root=root, transform=trans, target_transform=ToUnknown(), download=True) for c in datasets]
loaders = [DataLoader(d, batch_size=128, worker_init_fn=seed_worker) for d in datasets]
loaders.append( DataLoader(test_in_data, batch_size=128, worker_init_fn=seed_worker))

with torch.no_grad():
     for loader in loaders:
        for x, y in tqdm(loader):
            x = x.to(device)
            scores.append(detector(x))
            ys_hat.append(label_net(x).softmax(dim=1).max(dim=1).indices) 
            my_scores.append(sem_detector(x))

            ys.append(y)
            xs.append(x.cpu())


scores = torch.cat(scores, dim=0).cpu()
ys = torch.cat(ys, dim=0).cpu()
ys_hat = torch.cat(ys_hat, dim=0).cpu()
my_scores = torch.cat(my_scores, dim=0).cpu()
xs = torch.cat(xs, dim=0).cpu()


metrics = OODMetrics()
metrics.update(my_scores, ys)
print(metrics.compute())

In [None]:
scores

In [None]:
y

In [None]:
from pytorch_ood.utils import is_unknown


print(xs.shape)
print(my_scores.shape)
print(ys_hat.shape)
print(ys.shape)
print(scores.shape)

print(is_known(ys).sum())
print(is_unknown(ys).sum())


In [None]:
from pandas import DataFrame

df_1 = DataFrame()
df_1["Scores"] = scores.cpu().numpy()
df_1["Labels"] = ys >= 0
df_1["Labels"] = df_1["Labels"].apply(ood_label)
df_1["Method"] = "Implicit"

sb.histplot(data=df_1, x="Scores", hue="Labels", common_norm=False, stat="probability", bins=30)

# Examples 

In [None]:
# custom_params = {"axes.spines.right": True, "axes.spines.top": True, "axes.spines.bottom": True, "axes.spines.left": True}
import matplotlib.pyplot as plt 
import numpy as np 

sb.set_theme(style="white")

sem_detector = SemanticDetector(
    label_net, 
    shape_net, 
    color_net, 
    GTSRB(root=root).class_to_shape, 
    GTSRB(root=root).class_to_color, 
    sign_net=shield_net
)

dataset = GTSRB(root=root)

top_values, top_indxs = (-scores[ys<0]).topk(50)

# top_values, top_indxs = (- scores[ys<0]).topk(50)
imgs = xs[top_indxs]

for n, img in enumerate(imgs):
    img_batch = img.unsqueeze(0).to(device)
    
    print(img_batch.shape)
    with torch.no_grad():
        l = label_net(img_batch)
        s = shape_net(img_batch)
        c = color_net(img_batch)
        o = shield_net(img_batch)
        
    sign_detected = o.max(dim=1).indices.cpu()
        
    my_score = sem_detector(img_batch)[0]
    
    lindex = l.argmax(dim=1).item()
    sindex = s.argmax(dim=1).item()
    cindex = c.argmax(dim=1).item()
    oindex = o.argmax(dim=1).item()
    
    lname = dataset.class_to_name[lindex -1]
    sname = dataset.shape_to_name[sindex]
    cname = dataset.color_to_name[cindex]
    oname = "Sign" if oindex else "NoSign"
    
    plt.xticks([])
    plt.yticks([])
    plt.imshow(np.moveaxis(img.numpy(), 0, -1))
    plt.suptitle(f"'{lname.title()}' ({l.softmax(dim=1).max().item():.2%}) \n {sname.title()} | {cname.title()} | {oname.title()}" , fontsize=19)
    plt.title(f"Consistent: {'Yes' if abs(my_score) > 0.0 else 'No'}", fontsize=19) # 'Yes' if my_score > 0.0 else 'No'
    plt.tight_layout(pad=0.5)
    plt.savefig(f"img/prediction-example-{n}.pdf", bbox_inches="tight")
    plt.show()

In [None]:
sem_detector = SemanticDetector(
    label_net, 
    shape_net, 
    color_net, 
    GTSRB(root=root).class_to_shape, 
    GTSRB(root=root).class_to_color, 
    sign_net=shield_net
)

top_values, top_indxs = (scores[ys>=0]).topk(50)

for n, i in enumerate([i for i in top_indxs]):
    img = xs[ys>=0][i]
    img_batch = img.unsqueeze(0).to(device)
    
    with torch.no_grad():
        l = label_net(img_batch)
        s = shape_net(img_batch)
        c = color_net(img_batch)
        o = shield_net(img_batch)
    
    # my_score = detector(img_batch)[0]
    my_score = my_scores[ys>=0][i]
    # score = scores[ys>=0][i]
    
    lindex = l.argmax(dim=1).item()
    sindex = s.argmax(dim=1).item()
    cindex = c.argmax(dim=1).item()
    oindex = o.argmax(dim=1).item()
        
    lname = dataset.class_to_name[lindex -1]
    sname = dataset.shape_to_name[sindex]
    cname = dataset.color_to_name[cindex]
    oname = "Sign" if oindex == 1 else "NoSign"
        
    plt.imshow(np.moveaxis(img.numpy(), 0, -1))
    plt.suptitle(f"'{lname.title()}' ({l.softmax(dim=1).max().item():.2%}) \n {sname.title()} | {cname.title()} | {oname}", fontsize=19)
    plt.title(f"Consistent: {'Yes' if abs(my_score) > 0.0 else 'No'}", fontsize=19) # 'Yes' if my_score > 0.0 else 'No'
    plt.xticks([])
    plt.yticks([])
    plt.tight_layout(pad=0.5)
    plt.savefig(f"img/prediction-example-in-{n}.pdf", bbox_inches="tight")
    plt.show()

In [None]:
sem_detector = SemanticDetector(
    label_net, 
    shape_net, 
    color_net, 
    GTSRB(root=root).class_to_shape, 
    GTSRB(root=root).class_to_color, 
    sign_net=shield_net
)

# in-distribution where prediction was not correct and was rejected 
index = (ys>=0) & (ys != ys_hat.cpu()) & (my_scores.abs() == 0.0)
if index.sum().item() > 0:
    top_values, top_indxs = (-scores[index]).topk(1)

    for n, i in enumerate([i for i in top_indxs]):
        img = xs[index][i]
        img_batch = img.unsqueeze(0).to(device)

        print(img_batch.shape)
        with torch.no_grad():
            l = label_net(img_batch)
            s = shape_net(img_batch)
            c = color_net(img_batch)
            o = shield_net(img_batch)

        my_score = sem_detector(img_batch)[0]

        lindex = l.argmax(dim=1).item()
        sindex = s.argmax(dim=1).item()
        cindex = c.argmax(dim=1).item()

        lname = dataset.class_to_name[lindex -1]
        sname = dataset.shape_to_name[sindex]
        cname = dataset.color_to_name[cindex]
        oname = "Sign" if oindex == 1 else "NoSign"

        plt.imshow(np.moveaxis(img.numpy(), 0, -1))
        plt.suptitle(f"'{lname.title()}' ({l.softmax(dim=1).max().item():.2%}) \n {sname.title()} | {cname.title()} | {oname.title()}", fontsize=19)
        plt.title(f"Consistent: {'Yes' if abs(my_score) > 0.0 else 'No'}", fontsize=19) # 'Yes' if my_score > 0.0 else 'No'
        plt.tight_layout(pad=0.5)
        plt.xticks([])
        plt.yticks([])
        plt.savefig(f"img/prediction-example-in-error-{n}.pdf", bbox_inches="tight")
        plt.show()

In [None]:

# in-distribution where prediction was not correct and was accepted 
index = (ys>=0) & (ys == ys_hat.cpu()) & (my_scores.abs() >= 0.0)
if index.sum().item() > 0:
    top_values, top_indxs = (-scores[index]).topk(10)

    for n, i in enumerate([i for i in top_indxs]):
        img = xs[index][i]
        img_batch = img.unsqueeze(0).to(device)

        print(img_batch.shape)
        with torch.no_grad():
            l = label_net(img_batch)
            s = shape_net(img_batch)
            c = color_net(img_batch)
            o = shield_net(img_batch)

        my_score = sem_detector(img_batch)[0]

        lindex = l.argmax(dim=1).item()
        sindex = s.argmax(dim=1).item()
        cindex = c.argmax(dim=1).item()

        lname = dataset.class_to_name[lindex]
        sname = dataset.shape_to_name[sindex]
        cname = dataset.color_to_name[cindex]
        oname = "Sign" if oindex == 1 else "NoSign"

        plt.imshow(np.moveaxis(img.numpy(), 0, -1))
        plt.suptitle(f"'{lname.title()}' ({l.softmax(dim=1).max().item():.2%}) \n {sname.title()} | {cname.title()} | {oname.title()}")
        plt.title(f"Consistent: {'Yes' if abs(my_score) > 0.0 else 'No'}") # 'Yes' if my_score > 0.0 else 'No'
        plt.tight_layout(pad=0.5)
        plt.xticks([])
        plt.yticks([])
        plt.savefig(f"img/prediction-example-in-correct-{n}.pdf", bbox_inches="tight")
        plt.show()

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import seaborn as sb 

# ys_hat_anom = ys_hat.clone()
# ys_hat_anom[my_scores > -0.99] = 44 

# fig, ax = plt.subplots(figsize=(20,20))
# m = confusion_matrix(ys[ys >= 0], ys_hat_anom[ys >= 0])

# for i in range(m.shape[0]):
#     m[i,i] = 0
    
# disp = ConfusionMatrixDisplay(m, display_labels=list(dataset.class_to_name.values()) + ["Anomaly"])
# disp.plot(ax=ax, xticks_rotation="vertical", colorbar=False)

# plt.tight_layout(pad=0)

# plt.savefig("img/confusion.pdf")
# plt.savefig("img/confusion.jpg", dpi=300, bbox_inches="tight")

In [None]:
msp = MaxSoftmax(label_net)
semantic_detector = SemanticDetector(
                label_net, 
                shape_net, 
                color_net, 
                GTSRB(root=root).class_to_shape, 
                GTSRB(root=root).class_to_color)

In [None]:
from pytorch_ood.utils import TensorBuffer

data_in = GTSRB(root=root, train=False, transforms=trans, target_transform=lambda y: y[0])
data_out = Textures(root=root, transform=trans, target_transform=ToUnknown(), download=True)

loader = DataLoader(data_in + data_out, shuffle=False, batch_size=128)

buffer = TensorBuffer()

with torch.no_grad():
    for x, y in loader:
        scores_msp = msp(x.to(device))
        buffer.append("msp", scores_msp)

        scores_sem = semantic_detector(x.to(device))
        buffer.append("sem", scores_sem)

        buffer.append("y", y)