In [17]:
!pip install torch torchvision
!pip install anomalib
!pip install matplotlib numpy scikit-learn
!pip install lightning
!pip install kornia
!pip install FrEIA
!pip install python-dotenv
!pip install open_clip_torch


MVTec dataset downloaded and extracted.


FileNotFoundError: Path does not exist: /content/mvtec_anomaly_detection/bottle

In [1]:
!pip install -U anomalib



In [35]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from anomalib.data import MVTec
from anomalib.models import Padim
import numpy as np
from sklearn.metrics import roc_auc_score, average_precision_score
from torch.utils.data import Subset
import random

# Set random seed for reproducibility
torch.manual_seed(42)
random.seed(42)

# Custom Compose class to handle both image and mask
class CustomCompose:
    def __init__(self, transforms):
        self.transforms = transforms

    def __call__(self, image, mask):
        for t in self.transforms:
            if isinstance(image, torch.Tensor):
                if t.__class__.__name__ == 'Resize':
                    image = t(image.unsqueeze(0)).squeeze(0)
                else:
                    image = t(image)
            else:
                image = t(image)
        return image, mask

# Define custom transforms
transform = CustomCompose([
    transforms.Resize((64, 64)),  # Reduced image size
])

# Initialize the MVTec dataset
datamodule = MVTec(
    root="./",
    category="bottle",
    image_size=(64, 64),  # Reduced image size
    train_batch_size=64,  # Increased batch size
    eval_batch_size=64,
    num_workers=2,
    transform=transform
)

# Setup the datamodule
datamodule.setup()

# Function to create a subset of the data
def create_subset(dataset, fraction=0.1):
    num_samples = int(len(dataset) * fraction)
    indices = random.sample(range(len(dataset)), num_samples)
    return Subset(dataset, indices)

# Create subsets of the data
train_subset = create_subset(datamodule.train_data, fraction=0.1)
test_subset = create_subset(datamodule.test_data, fraction=0.1)

# Create new dataloaders with the subsets
train_loader = torch.utils.data.DataLoader(
    train_subset, batch_size=64, shuffle=True, num_workers=2
)
test_loader = torch.utils.data.DataLoader(
    test_subset, batch_size=64, shuffle=False, num_workers=2
)

# Initialize the Padim model
model = Padim()

# Define optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Define loss function
criterion = nn.MSELoss()

# Training function
def train_model(model, train_loader, optimizer, criterion, num_epochs=2):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    for epoch in range(num_epochs):
        model.train()
        for batch in train_loader:
            inputs = batch["image"].to(device)
            optimizer.zero_grad()
            outputs = model(inputs)

            # Debugging: print output details
            print(f"Output Type: {type(outputs)}")
            if isinstance(outputs, dict):
                print(f"Output Keys: {outputs.keys()}")
                if "anomaly_map" in outputs:
                    anomaly_map = outputs["anomaly_map"]
                    print(f"Anomaly Map shape: {anomaly_map.shape}")

                    # Create a target tensor of zeros with the same shape as the anomaly map
                    target = torch.zeros_like(anomaly_map, device=device)

                    # Compute the loss
                    loss = criterion(anomaly_map, target)
                    loss.backward()
                    optimizer.step()
                else:
                    print("Error: 'anomaly_map' not found in model outputs.")
                    continue
            elif isinstance(outputs, torch.Tensor):
                print(f"Single tensor output shape: {outputs.shape}")
                anomaly_map = outputs
                target = torch.zeros_like(anomaly_map, device=device)
                loss = criterion(anomaly_map, target)
                loss.backward()
                optimizer.step()
            else:
                print("Unexpected output structure.")
                continue

        print(f"Epoch {epoch + 1}/{num_epochs} completed, Loss: {loss.item()}")

# Evaluation function
def evaluate_model(model, test_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()

    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch in test_loader:
            inputs = batch["image"].to(device)
            labels = batch["label"].to(device)
            outputs = model(inputs)

            # Debugging outputs
            print(f"Batch inputs shape: {inputs.shape}")
            print(f"Labels shape: {labels.shape}")

            if isinstance(outputs, dict) and "anomaly_map" in outputs:
                preds = outputs["anomaly_map"].cpu().numpy()
                print(f"Predictions shape: {preds.shape}")
            elif isinstance(outputs, torch.Tensor):
                preds = outputs.cpu().numpy()
                print(f"Single tensor predictions shape: {preds.shape}")
            else:
                print("Unexpected output structure or 'anomaly_map' not found.")
                continue  # Skip this iteration if outputs are not as expected

            if preds.size == 0 or labels.size == 0:
                print("Warning: Predictions or labels are empty.")
                continue

            all_preds.extend(preds.flatten())
            all_labels.extend(labels.cpu().numpy())

    all_preds = np.array(all_preds)
    all_labels = np.array(all_labels)

    if all_labels.size == 0 or all_preds.size == 0:
        print("Error: The predictions or labels array is empty.")
        return None, None

    roc_auc = roc_auc_score(all_labels, all_preds)
    avg_precision = average_precision_score(all_labels, all_preds)

    return roc_auc, avg_precision

# Train the model
train_model(model, train_loader, optimizer, criterion)

# Evaluate the model
roc_auc, avg_precision = evaluate_model(model, test_loader)

if roc_auc is not None and avg_precision is not None:
    print(f"ROC AUC Score: {roc_auc:.4f}")
    print(f"Average Precision Score: {avg_precision:.4f}")
else:
    print("Evaluation failed due to empty predictions or labels.")

# Save the trained model
torch.save(model.state_dict(), 'padim_model_quick.pth')

# Example inference
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
sample_input = torch.randn(1, 3, 64, 64).to(device)
with torch.no_grad():
    prediction = model(sample_input)
    anomaly_score = prediction["anomaly_map"].max().item()

print(f"Anomaly Score: {anomaly_score:.4f}")


Output Type: <class 'torch.Tensor'>
Single tensor output shape: torch.Size([20, 100, 16, 16])


RuntimeError: element 0 of tensors does not require grad and does not have a grad_fn