In [21]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

from sklearn.metrics import accuracy_score
from torchvision import models
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
from torchvision.transforms import transforms


In [22]:
class ZeroShotDefectClassifier(nn.Module):
    def __init__(self, num_attributes, num_classes):
        super(ZeroShotDefectClassifier, self).__init__()
        self.features = models.resnet50(pretrained=True)
        self.features.fc = nn.Identity()  
        self.fc = nn.Linear(2048, num_classes)

    def forward(self, x):
        x = self.features(x)
        x = self.fc(x)
        return x


In [23]:
# Directory path and the transforms setting
data_dir = 'D:/MvTec/mvtec_anomaly_detection'
class_name = 'bottle'
class_dir = os.path.join(data_dir, class_name)
train_dir = os.path.join(class_dir, 'Train')
test_dir = os.path.join(class_dir, 'Test')
ground_truth_dir = os.path.join(class_dir, 'ground_truth')

transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [24]:
# Dataset and dataloader for training data
train_dataset = ImageFolder(train_dir, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

In [25]:
# Define the model and move it to the appropriate device
num_attributes = 3
num_classes = len(train_dataset.classes)
model = ZeroShotDefectClassifier(num_attributes, num_classes)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)


ZeroShotDefectClassifier(
  (features): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): Bottleneck(
        (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (downsample): Sequ

In [26]:
# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [27]:
# Training loop
num_epochs = 10
total_step = len(train_loader)
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (i + 1) % 10 == 0:
            print(f"Epoch [{epoch + 1}/{num_epochs}], Step [{i + 1}/{total_step}], Loss: {loss.item():.4f}")

print("Training completed.")


Training completed.


In [28]:
# Define the path to save the trained weights
save_path = 'D:/MvTec/mvtec_anomaly_detection/bottle/model_weights.pth'

# Save the trained model's state dictionary
torch.save(model.state_dict(), save_path)

print(f"Trained weights saved at: {save_path}")

Trained weights saved at: D:/MvTec/mvtec_anomaly_detection/bottle/model_weights.pth


In [64]:
correct_predictions = 0
true_positives = 0
predicted_positives = 0

# Iterate over the test dataset
for images, labels in test_loader:
    
    images = images.to(device)
    labels = labels.to(device)

    # Forward pass
    outputs = model(images)
    _, predicted = torch.max(outputs, 1)

    # Calculate accuracy
    correct_predictions += torch.sum(predicted == labels).item()

    # Calculate precision
    true_positives += torch.sum((predicted == labels) & (predicted == 1)).item()
    predicted_positives += torch.sum(predicted == 1).item()

    accuracy = correct_predictions / len(test_dataset)
    if predicted_positives != 0:
        precision = (true_positives) / (predicted_positives)
    else:
        precision = 0.0
    

print("Accuracy: {:.4f}".format(accuracy))
print("Precision: {:.4f}".format(precision))


Accuracy: 0.2410
Precision: 0.0000


In [65]:
#Inference
from PIL import Image

# Load the trained model

model.load_state_dict(torch.load('D:/MvTec/mvtec_anomaly_detection/bottle/model_weights.pth'))
model.eval()

# Set up image transformation
transform = transforms.Compose([transforms.Resize((256, 256)), transforms.ToTensor()])
  

# Load and preprocess the test image
image = Image.open('D:/MvTec/mvtec_anomaly_detection/bottle/test/broken_large/000.png')
image = transform(image)
image = image.unsqueeze(0)  

# Perform inference
with torch.no_grad():
    output = model(image)

# Process the output to detect defects
defect_threshold = 0.5 
defect_probability = output[0].item()  
if defect_probability > defect_threshold:
    print("Defect detected!")
else:
    print("No defect detected.")



No defect detected.


In [49]:
batch_size = 32
test_dir = os.path.join(data_dir, "bottle", "test")
ground_truth_dir = os.path.join(data_dir, "bottle", "ground_truth")
# Load the test dataset
test_dataset = ImageFolder(test_dir, transform=transform)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size, shuffle=False)


In [66]:
# Define the defect labels
defect_labels = ['broken_large', 'broken_small', 'contamination']  

# Initialize variables for counting correct predictions
correct_predictions_by_class = {label: 0 for label in defect_labels}
total_samples_by_class = {label: 0 for label in defect_labels}

# Iterate over the test data and labels
for images, labels in test_loader:
    images = images.to(device)
    labels = labels.to(device)

    # Forward pass
    outputs = model(images)
    _, predicted = torch.max(outputs, 1)

    # Count correct predictions for defect classes
    for defect_class in defect_labels:
        defect_samples = (labels == test_dataset.class_to_idx[defect_class]).sum().item()
        correct_predictions_by_class[defect_class] += ((predicted == labels) & (labels == test_dataset.class_to_idx[defect_class])).sum().item()
        total_samples_by_class[defect_class] += defect_samples

# precision for defect classes
precision_by_class = {label: correct_predictions_by_class[label] / total_samples_by_class[label] for label in defect_labels}

# Overall accuracy and precision
total_samples = sum(total_samples_by_class.values())
correct_predictions = sum(correct_predictions_by_class.values())
accuracy = correct_predictions / total_samples


In [67]:
# Recall
recall_by_class = {}
for label in defect_labels:
    if total_samples_by_class[label] != 0:
        recall_by_class[label] = correct_predictions_by_class[label] / total_samples_by_class[label]
    else:
        recall_by_class[label] = 0.0

# F1 Score
f1_by_class = {}
for label in defect_labels:
    if precision_by_class[label] + recall_by_class[label] != 0:
        f1_by_class[label] = 2 * (precision_by_class[label] * recall_by_class[label]) / (precision_by_class[label] + recall_by_class[label])
    else:
        f1_by_class[label] = 0.0

# Calculate overall metrics
precision = sum(precision_by_class.values()) / len(defect_labels)
recall = sum(recall_by_class.values()) / len(defect_labels)
f1 = sum(f1_by_class.values()) / len(defect_labels)

# Print the metrics
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1)


Accuracy: 0.31746031746031744
Precision: 0.3333333333333333
Recall: 0.3333333333333333
F1 Score: 0.3333333333333333
