In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import argparse
import random
import warnings
import numpy as np
import datasets.mvtec as mvtec
import torch
import torch.optim as optim
from cnn.efficientnet import EfficientNet as effnet
from cnn.resnet import resnet18 as resnet18
from cnn.resnet import wide_resnet50_2 as wide_resnet50_2
from cnn.vgg import vgg19_bn as vgg19_bn
from datasets.mvtec import MVTecDataset
from torch.utils.data import DataLoader
from utils.cfa import *
from utils.cfa_new import get_feature_extractor, CfaModel
from utils.metric import *
from utils.visualizer import *

warnings.filterwarnings("ignore", category=UserWarning)
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")


def parse_args():
    parser = argparse.ArgumentParser("CFA configuration")
    parser.add_argument("--data_path", type=str, default="../../../datasets/MVTec/")
    parser.add_argument("--save_path", type=str, default="./mvtec_result")
    parser.add_argument("--Rd", type=bool, default=False)
    parser.add_argument("--cnn", type=str, choices=["resnet18", "wide_resnet50_2", "efficientnet_b5", "vgg19_bn"], default="wide_resnet50_2")
    parser.add_argument("--resize", type=int, choices=[224, 256], default=224)
    parser.add_argument("--size", type=int, choices=[224, 256], default=224)
    parser.add_argument("--gamma_c", type=int, default=1)
    parser.add_argument("--gamma_d", type=int, default=1)

    parser.add_argument("--class_name", type=str, default="zipper")
    parser.add_argument(
            "--backbone",
            type=str,
            choices=["resnet18", "wide_resnet50_2", "efficientnet_b5", "vgg19_bn"],
            default="wide_resnet50_2",
        )
    return parser.parse_args(args=[])


seed = 1024
random.seed(seed)
torch.manual_seed(seed)
if use_cuda:
    torch.cuda.manual_seed_all(seed)

args = parse_args()
class_names = mvtec.CLASS_NAMES if args.class_name == "all" else [args.class_name]
class_name = args.class_name

train_dataset = MVTecDataset(
    dataset_path=args.data_path,
    class_name=class_name,
    resize=args.resize,
    cropsize=args.size,
    is_train=True,
    wild_ver=args.Rd,
)

test_dataset = MVTecDataset(
    dataset_path=args.data_path,
    class_name=class_name,
    resize=args.resize,
    cropsize=args.size,
    is_train=False,
    wild_ver=args.Rd,
)

train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=4,
    pin_memory=True,
    shuffle=True,
    drop_last=True,
)

test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=4,
    pin_memory=True,
)

In [121]:
import pytest
import torch
from einops import rearrange
from torch import nn
from torch.utils.data import DataLoader
from utils.cfa import Descriptor as OldDescriptor

from anomalib.models.cfa.cnn.resnet import wide_resnet50_2
from anomalib.models.cfa.datasets.mvtec import MVTecDataset
from anomalib.models.cfa.torch_model import CfaModel
from anomalib.models.cfa.torch_model import CoordConv2d as NewCoordConv2d
from anomalib.models.cfa.torch_model import Descriptor as NewDescriptor
from anomalib.models.cfa.torch_model import get_feature_extractor
from anomalib.models.cfa.utils.cfa import DSVDD
from anomalib.models.cfa.utils.coordconv import CoordConv2d as OldCoordConv2d
from anomalib.models.components import GaussianBlur2d

device = torch.device("cuda")
train_dataset = MVTecDataset("/home/sakcay/projects/anomalib/datasets/MVTec/", "zipper")
train_loader = DataLoader(train_dataset, 4)

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Compare the feature extractor
_, (input, _, _) = next(enumerate(train_loader))
input = input.cuda()

old_feature_extractor = wide_resnet50_2(pretrained=True, progress=True).to(device)
old_feature_extractor.eval()

new_feature_extractor = get_feature_extractor("wide_resnet50_2", device=torch.device("cuda"))

old_features = old_feature_extractor(input)
new_features = new_feature_extractor(input)
new_features = [val for val in new_features.values()]

for old_feature, new_feature in zip(old_features, new_features):
    assert torch.allclose(old_feature, new_feature, atol=1e-1), "Old and new features should match."
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Compute the memory bank.
cfa_old = DSVDD(old_feature_extractor, train_loader, "wide_resnet50_2", 1, 1, device).to(device)
cfa_new = CfaModel(train_loader, "wide_resnet50_2", 1, 1, device).to(device)
assert torch.allclose(cfa_old.C, cfa_new.memory_bank, atol=1e-1)
assert cfa_new.memory_bank.requires_grad is False
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Compute Forward-Pass.
old_loss, old_score = cfa_old(old_features)

cfa_new.train()
new_loss = cfa_new(input)

assert (old_loss - new_loss).abs() / 1000 < 1e-1, "Old and new losses should match."
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #


# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Compute Anomaly Map.
heatmaps = None
old_loss, old_score = cfa_old(old_features)
heatmap = old_score.cpu().detach()
heatmap = torch.mean(heatmap, dim=1)
heatmaps = torch.cat((heatmaps, heatmap), dim=0) if heatmaps != None else heatmap

heatmaps = upsample(heatmaps, size=input.size(2), mode="bilinear")
heatmaps = gaussian_smooth(heatmaps, sigma=4)
heatmaps = torch.tensor(heatmaps).cuda().unsqueeze(1)

cfa_new.train()
new_loss = cfa_new(input)

cfa_new.eval()
new_heatmaps = cfa_new(input)
assert torch.allclose(heatmaps, new_heatmaps, atol=1e-1), "Old and new heatmaps should match."

100%|██████████| 60/60 [00:08<00:00,  7.39it/s]
100%|██████████| 60/60 [00:08<00:00,  7.35it/s]


In [110]:
test_imgs = list()
gt_mask_list = list()
gt_list = list()
heatmaps1 = None

cfa_old.eval()
for x, y, mask in test_loader:
    test_imgs.extend(x.cpu().detach().numpy())
    # gt_list.extend(y.cpu().detach().numpy())
    # gt_mask_list.extend(mask.cpu().detach().numpy())

    p = old_feature_extractor(x.to(device))
    _, score = cfa_old(p)
    heatmap1 = score.cpu().detach()
    heatmap1 = torch.mean(heatmap1, dim=1)
    heatmaps1 = torch.cat((heatmaps1, heatmap1), dim=0) if heatmaps1 != None else heatmap1

heatmaps1 = upsample(heatmaps1, size=x.size(2), mode="bilinear")
heatmaps1 = gaussian_smooth(heatmaps1, sigma=4)

gt_mask = np.asarray(gt_mask_list)
heatmaps1 = rescale(heatmaps1)

test_imgs = list()
gt_mask_list = list()
gt_list = list()
heatmaps = None

cfa_old.eval()
i, (x, y, mask) = next(enumerate(test_loader))
test_imgs.extend(x.cpu().detach().numpy())
# gt_list.extend(y.cpu().detach().numpy())
# gt_mask_list.extend(mask.cpu().detach().numpy())

p = old_feature_extractor(x.to(device))
_, score = cfa_old(p)
heatmap = score.cpu().detach()
heatmap = torch.mean(heatmap, dim=1)
heatmaps = torch.cat((heatmaps, heatmap), dim=0) if heatmaps != None else heatmap

heatmaps = upsample(heatmaps, size=x.size(2), mode="bilinear")
heatmaps = gaussian_smooth(heatmaps, sigma=4)

gt_mask = np.asarray(gt_mask_list)
# heatmaps = rescale(heatmaps)

In [None]:
heatmap = score.cpu().detach()
heatmap = torch.mean(heatmap, dim=1)
heatmaps = torch.cat((heatmaps, heatmap), dim=0) if heatmaps != None else heatmap

heatmaps = upsample(heatmaps, size=x.size(2), mode="bilinear")
heatmaps = gaussian_smooth(heatmaps, sigma=4)

gt_mask = np.asarray(gt_mask_list)
# heatmaps = rescale(heatmaps)

In [111]:
cfa_new.eval()
new_heatmap = cfa_new(x.cuda())
new_heatmap.shape

torch.Size([4, 1, 224, 224])

In [120]:
torch.allclose(torch.tensor(heatmaps1).unsqueeze(1).cuda(), new_heatmap, atol=1e-1)

True