# SECTION 1: SETUP & ENVIRONMENT

## 1.2 Install Dependencies

In [1]:
import subprocess
import sys

# Install required packages
packages = [
    'torch',
    'torchvision',
    'pytorch-lightning',
    'anomalib',
    'opencv-python',
    'numpy',
    'pillow',
    'matplotlib',
    'scikit-learn'
]

print("Installing packages...")
for package in packages:
    try:
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-q', package])
        print(f"  âœ“ {package}")
    except:
        print(f"  âš  {package} (may already be installed)")

print("\nâœ“ All packages installed!")

Installing packages...
  âœ“ torch
  âœ“ torchvision
  âœ“ pytorch-lightning
  âœ“ anomalib
  âœ“ opencv-python
  âœ“ numpy
  âœ“ pillow
  âœ“ matplotlib
  âœ“ scikit-learn

âœ“ All packages installed!


## 1.3 Import Libraries & Set Paths

In [2]:
import os
import cv2
import numpy as np
import torch
import torchvision
import json
from pathlib import Path
from PIL import Image
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc, roc_auc_score, confusion_matrix
from torchvision import transforms
import warnings
warnings.filterwarnings('ignore')

# Import Anomalib
from anomalib.models import Patchcore
from anomalib.data import Folder
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger

# Device
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"âœ“ Device: {DEVICE}")
if torch.cuda.is_available():
    print(f"  GPU: {torch.cuda.get_device_name(0)}")

# Set paths
DATASET_LOCATION = Path('.')  # Local path
DATASET_ROOT = DATASET_LOCATION / 'dataset'
TRAIN_GOOD = DATASET_ROOT / "train" / "good"
TEST_GOOD = DATASET_ROOT / "test" / "good"
TEST_DEFECT = DATASET_ROOT / "test" / "defect"
CHECKPOINT_DIR = Path("checkpoints")
CHECKPOINT_DIR.mkdir(exist_ok=True)
EVALUATION_DIR = Path("evaluation_results")
EVALUATION_DIR.mkdir(exist_ok=True)
INFERENCE_INPUT = Path("inference_images")
INFERENCE_INPUT.mkdir(exist_ok=True)
INFERENCE_OUTPUT = Path("inference_results")
INFERENCE_OUTPUT.mkdir(exist_ok=True)

print(f"\nâœ“ Paths configured:")
print(f"  Dataset: {DATASET_ROOT.absolute()}")
print(f"  Checkpoints: {CHECKPOINT_DIR.absolute()}")
print(f"  Evaluation: {EVALUATION_DIR.absolute()}")

âœ“ Device: cuda
  GPU: Tesla T4

âœ“ Paths configured:
  Dataset: /content/dataset
  Checkpoints: /content/checkpoints
  Evaluation: /content/evaluation_results


## 1.4 Upload Dataset (For Colab Users)

In [6]:
import zipfile

if 'google.colab' in str(get_ipython()):
    from google.colab import files
    print("Please upload your dataset zip file (e.g., 'dataset.zip').")
    uploaded = files.upload()

    if uploaded:
        zip_filename = list(uploaded.keys())[0]
        print(f"\nProcessing uploaded file: {zip_filename}")

        # Ensure DATASET_ROOT exists
        DATASET_LOCATION.mkdir(parents=True, exist_ok=True)

        try:
            with zipfile.ZipFile(zip_filename, 'r') as zip_ref:
                zip_ref.extractall(DATASET_LOCATION)
            print(f"âœ“ Successfully extracted '{zip_filename}' to '{DATASET_LOCATION}'")
        except zipfile.BadZipFile:
            print(f"âœ— Error: '{zip_filename}' is not a valid zip file.")
        except Exception as e:
            print(f"âœ— An error occurred during extraction: {e}")
    else:
        print("âš  No file uploaded.")
else:
    print(f"Please place your dataset zip file in the root directory and extract it into '{DATASET_ROOT.name}' for local execution.")
    print("Example: `unzip dataset.zip -d dataset`")


Please upload your dataset zip file (e.g., 'dataset.zip').


Saving dataset.zip to dataset (1).zip

Processing uploaded file: dataset (1).zip
âœ“ Successfully extracted 'dataset (1).zip' to '.'


---
# SECTION 2: DATASET VALIDATION

## 2.1 Validate Dataset Structure

In [7]:
def validate_dataset():
    """
    Validate dataset structure and count images in each folder.
    """
    results = {}

    for name, path in [("train/good", TRAIN_GOOD),
                        ("test/good", TEST_GOOD),
                        ("test/defect", TEST_DEFECT)]:
        if not path.exists():
            print(f"âœ— Missing: {name}")
            results[name] = 0
        else:
            images = list(path.glob("*.png")) + list(path.glob("*.jpg")) + list(path.glob("*.jpeg"))
            results[name] = len(images)
            print(f"âœ“ {name}: {len(images)} images")

    return results

print("Validating dataset structure...\n")
counts = validate_dataset()

print(f"\n{'='*50}")
print(f"Total training (good): {counts['train/good']}")
print(f"Total test (good): {counts['test/good']}")
print(f"Total test (defect): {counts['test/defect']}")
print(f"Total images: {sum(counts.values())}")
print(f"{'='*50}")

Validating dataset structure...

âœ“ train/good: 76 images
âœ“ test/good: 45 images
âœ“ test/defect: 6 images

Total training (good): 76
Total test (good): 45
Total test (defect): 6
Total images: 127


## 2.2 Check Image Validity

In [8]:
def check_image_validity():
    """
    Verify all images can be loaded and check sizes.
    """
    all_sizes = []
    invalid_count = 0

    for folder in [TRAIN_GOOD, TEST_GOOD, TEST_DEFECT]:
        if not folder.exists():
            continue

        images = list(folder.glob("*.png")) + list(folder.glob("*.jpg")) + list(folder.glob("*.jpeg"))

        for img_path in images:
            try:
                img = cv2.imread(str(img_path))
                if img is None:
                    invalid_count += 1
                else:
                    h, w = img.shape[:2]
                    all_sizes.append((w, h))
            except Exception as e:
                invalid_count += 1

    if all_sizes:
        sizes_array = np.array(all_sizes)
        print(f"Image Validity Check:")
        print(f"  Valid images: {len(all_sizes)}")
        print(f"  Invalid images: {invalid_count}")
        print(f"  Width range: {sizes_array[:, 0].min()} - {sizes_array[:, 0].max()}")
        print(f"  Height range: {sizes_array[:, 1].min()} - {sizes_array[:, 1].max()}")
        print(f"  Mean size: {sizes_array.mean(axis=0).astype(int)}")
    else:
        print("âš  No valid images found!")

print("Checking image validity...\n")
check_image_validity()
print("\nâœ“ Dataset validation complete")

Checking image validity...

Image Validity Check:
  Valid images: 127
  Invalid images: 0
  Width range: 143 - 238
  Height range: 132 - 256
  Mean size: [197 190]

âœ“ Dataset validation complete


---
# SECTION 3: TRAINING

## 3.1 Setup Data Module

In [11]:
print("Setting up data module...\n")

# Configure Anomalib Folder datamodule
datamodule = Folder(
    name="dataset_folder", # Added the missing 'name' argument
    root=str(DATASET_ROOT),
    normal_dir="train/good",  # Train on good images only
    abnormal_dir="test/defect",  # Use defects for validation (optional)
    num_workers=0,  # Set to 0 on Windows/Colab; increase on Linux    seed=42
)

print("âœ“ Data module configured")
print(f"  - Image size: 224x224")
print(f"  - Batch size: 32")
print(f"  - Train/val split: 80/20")

Setting up data module...

âœ“ Data module configured
  - Image size: 224x224
  - Batch size: 32
  - Train/val split: 80/20


## 3.2 Initialize & Train Patchcore

In [13]:
print("Initializing Patchcore model...\n")

# Initialize Patchcore
model = Patchcore(
    backbone="wide_resnet50_2",  # Strong backbone
    layers=["layer2", "layer3"],  # Intermediate layers
    num_neighbors=9,
)

print("âœ“ Patchcore initialized")
print(f"  - Backbone: wide_resnet50_2")
print(f"  - Layers: layer2, layer3")
print(f"  - Num neighbors: 9")

# Setup trainer
print("\nSetting up trainer...\n")

logger = TensorBoardLogger(
    save_dir="logs",
    name="patchcore_training",
    version="v1"
)

checkpoint_callback = ModelCheckpoint(
    dirpath=CHECKPOINT_DIR,
    filename="patchcore-{epoch:02d}",
    monitor="val_anomaly_map_auroc",
    mode="max",
    save_last=True,
    verbose=True
)

trainer = Trainer(
    max_epochs=1,  # Patchcore trains in 1 epoch
    accelerator="gpu" if torch.cuda.is_available() else "cpu",
    devices=1,
    logger=logger,
    callbacks=[checkpoint_callback],
    enable_progress_bar=True,
    enable_model_summary=True
)

print("âœ“ Trainer configured")
print(f"  - Max epochs: 1")
print(f"  - Accelerator: {'GPU' if torch.cuda.is_available() else 'CPU'}")

Initializing Patchcore model...



model.safetensors:   0%|          | 0.00/276M [00:00<?, ?B/s]

INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:ðŸ’¡ Tip: For seamless cloud logging and experiment tracking, try installing [litlogger](https://pypi.org/project/litlogger/) to enable LitLogger, which logs metrics and artifacts automatically to the Lightning Experiments platform.


âœ“ Patchcore initialized
  - Backbone: wide_resnet50_2
  - Layers: layer2, layer3
  - Num neighbors: 9

Setting up trainer...

âœ“ Trainer configured
  - Max epochs: 1
  - Accelerator: GPU


## 3.3 Train the Model

In [14]:
print("\n" + "="*60)
print("STARTING TRAINING")
print("="*60 + "\n")

# Train
trainer.fit(model, datamodule=datamodule)

print("\n" + "="*60)
print("âœ“ TRAINING COMPLETE")
print("="*60)

# Save model
model_save_path = CHECKPOINT_DIR / "patchcore_trained.ckpt"
trainer.save_checkpoint(model_save_path)

print(f"\nâœ“ Model saved to: {model_save_path}")
print(f"  Size: {os.path.getsize(model_save_path) / (1024**2):.2f} MB")


STARTING TRAINING



TypeError: `model` must be a `LightningModule` or `torch._dynamo.OptimizedModule`, got `Patchcore`

---
# SECTION 4: EVALUATION

## 4.1 Load Model & Prepare Test Data

In [None]:
print("Loading trained model for evaluation...\n")

# Load model
model = Patchcore.load_from_checkpoint(CHECKPOINT_DIR / "patchcore_trained.ckpt")
model = model.to(DEVICE)
model.eval()

print("âœ“ Model loaded")

# Prepare transform
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                        std=[0.229, 0.224, 0.225])
])

def load_test_images(folder_path):
    images = []
    paths = []
    for ext in ['*.png', '*.jpg', '*.jpeg']:
        for img_path in Path(folder_path).glob(ext):
            try:
                img = Image.open(img_path).convert('RGB')
                img_tensor = test_transform(img)
                images.append(img_tensor)
                paths.append(img_path)
            except:
                pass
    return images, paths

# Load test images
print("Loading test images...")
good_images, good_paths = load_test_images(TEST_GOOD)
defect_images, defect_paths = load_test_images(TEST_DEFECT)

print(f"âœ“ Good images: {len(good_images)}")
print(f"âœ“ Defect images: {len(defect_images)}")

## 4.2 Generate Anomaly Scores

In [None]:
def predict_anomaly_score(model, images, device):
    scores = []
    with torch.no_grad():
        for img in images:
            img_batch = img.unsqueeze(0).to(device)
            output = model.predict(img_batch)

            if isinstance(output, dict):
                score = output.get('anomaly_score', output.get('score', 0.0))
            else:
                score = output.item() if isinstance(output, torch.Tensor) else float(output)

            scores.append(score)
    return np.array(scores)

print("Generating anomaly scores...\n")
good_scores = predict_anomaly_score(model, good_images, DEVICE)
defect_scores = predict_anomaly_score(model, defect_images, DEVICE)

print(f"Good images:")
print(f"  Mean: {good_scores.mean():.4f}, Std: {good_scores.std():.4f}")
print(f"  Min: {good_scores.min():.4f}, Max: {good_scores.max():.4f}")

print(f"\nDefect images:")
print(f"  Mean: {defect_scores.mean():.4f}, Std: {defect_scores.std():.4f}")
print(f"  Min: {defect_scores.min():.4f}, Max: {defect_scores.max():.4f}")

## 4.3 Calculate Metrics & ROC Curve

In [None]:
# Combine labels
y_true = np.concatenate([np.zeros(len(good_scores)), np.ones(len(defect_scores))])
y_scores = np.concatenate([good_scores, defect_scores])

# ROC curve
fpr, tpr, thresholds = roc_curve(y_true, y_scores)
roc_auc = auc(fpr, tpr)

# Optimal threshold (Youden's index)
youden_index = tpr - fpr
optimal_idx = np.argmax(youden_index)
optimal_threshold = thresholds[optimal_idx]

print("\n" + "="*60)
print("EVALUATION METRICS")
print("="*60)
print(f"\nROC-AUC Score: {roc_auc:.4f}")
print(f"Optimal Threshold: {optimal_threshold:.4f}")
print(f"\nAt optimal threshold:")
print(f"  True Positive Rate: {tpr[optimal_idx]:.4f}")
print(f"  False Positive Rate: {fpr[optimal_idx]:.4f}")

# Confusion matrix
y_pred = (y_scores >= optimal_threshold).astype(int)
tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()

sensitivity = tp / (tp + fn) if (tp + fn) > 0 else 0
specificity = tn / (tn + fp) if (tn + fp) > 0 else 0

print(f"\nConfusion Matrix:")
print(f"  True Negatives: {tn}")
print(f"  False Positives: {fp}")
print(f"  False Negatives: {fn}")
print(f"  True Positives: {tp}")
print(f"\nSensitivity (Recall): {sensitivity:.4f}")
print(f"Specificity: {specificity:.4f}")

## 4.4 Visualize Results

In [None]:
# ROC Curve
fig, axes = plt.subplots(1, 2, figsize=(16, 6))

# Plot 1: ROC Curve
axes[0].plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.4f})')
axes[0].plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--', label='Random Classifier')
axes[0].scatter(fpr[optimal_idx], tpr[optimal_idx], color='red', s=100, marker='o', label=f'Optimal = {optimal_threshold:.4f}')
axes[0].set_xlabel('False Positive Rate')
axes[0].set_ylabel('True Positive Rate')
axes[0].set_title('ROC Curve - Patchcore Anomaly Detection')
axes[0].legend(loc="lower right")
axes[0].grid(alpha=0.3)

# Plot 2: Score Distribution
axes[1].hist(good_scores, bins=20, alpha=0.7, label='Good', color='green', edgecolor='black')
axes[1].hist(defect_scores, bins=20, alpha=0.7, label='Defect', color='red', edgecolor='black')
axes[1].axvline(optimal_threshold, color='blue', linestyle='--', linewidth=2, label=f'Threshold = {optimal_threshold:.4f}')
axes[1].set_xlabel('Anomaly Score')
axes[1].set_ylabel('Frequency')
axes[1].set_title('Anomaly Score Distribution')
axes[1].legend()
axes[1].grid(alpha=0.3)

plt.tight_layout()
roc_path = EVALUATION_DIR / "roc_and_distribution.png"
plt.savefig(roc_path, dpi=150)
print(f"âœ“ Evaluation plots saved to: {roc_path}")
plt.show()

## 4.5 Save Evaluation Results

In [None]:
# Save results
results = {
    "model": "Patchcore (wide_resnet50_2)",
    "metrics": {
        "roc_auc": float(roc_auc),
        "optimal_threshold": float(optimal_threshold),
        "sensitivity": float(sensitivity),
        "specificity": float(specificity),
        "true_positives": int(tp),
        "true_negatives": int(tn),
        "false_positives": int(fp),
        "false_negatives": int(fn)
    },
    "score_statistics": {
        "good_mean": float(good_scores.mean()),
        "good_std": float(good_scores.std()),
        "defect_mean": float(defect_scores.mean()),
        "defect_std": float(defect_scores.std())
    },
    "test_set_sizes": {
        "good_images": len(good_scores),
        "defect_images": len(defect_scores)
    }
}

results_path = EVALUATION_DIR / "evaluation_results.json"
with open(results_path, 'w') as f:
    json.dump(results, f, indent=2)

print(f"âœ“ Results saved to: {results_path}")

---
# SECTION 5: INFERENCE

## 5.1 Upload Images for Inference (Colab)

In [None]:
# For Colab users - upload images for inference
if 'google.colab' in str(get_ipython()):
    from google.colab import files
    print("Click 'Choose Files' to upload images for inference")
    uploaded = files.upload()

    # Move uploaded files to inference folder
    for filename in uploaded.keys():
        import shutil
        shutil.move(filename, INFERENCE_INPUT / filename)

    print(f"\nâœ“ {len(uploaded)} images uploaded to {INFERENCE_INPUT}")
else:
    print(f"Place images in: {INFERENCE_INPUT.absolute()}")
    print("Then run the next cells for inference")

## 5.2 Run Inference

In [None]:
# Get inference images
image_extensions = ['*.png', '*.jpg', '*.jpeg', '*.bmp']
image_files = []

for ext in image_extensions:
    image_files.extend(INFERENCE_INPUT.glob(ext))

if not image_files:
    print(f"âš  No images found in {INFERENCE_INPUT}")
    print("Upload images and rerun this cell.")
else:
    print(f"Found {len(image_files)} images for inference\n")

    # Inference function
    def infer_single_image(model, image_path, device, threshold=optimal_threshold):
        try:
            img_pil = Image.open(image_path).convert('RGB')
            img_tensor = test_transform(img_pil).unsqueeze(0).to(device)

            with torch.no_grad():
                output = model.predict(img_tensor)

            if isinstance(output, dict):
                anomaly_score = float(output.get('anomaly_score', output.get('score', 0.0)))
            else:
                anomaly_score = float(output.item()) if isinstance(output, torch.Tensor) else float(output)

            anomaly_score = np.clip(anomaly_score, 0, 1)
            is_anomaly = anomaly_score > threshold
            confidence = abs(anomaly_score - threshold)

            return {
                'image': img_pil,
                'anomaly_score': anomaly_score,
                'is_anomaly': is_anomaly,
                'confidence': confidence,
                'status': 'ANOMALY' if is_anomaly else 'NORMAL'
            }
        except Exception as e:
            print(f"Error processing {image_path}: {e}")
            return None

    # Run inference
    results_inference = []
    for idx, img_path in enumerate(sorted(image_files)):
        print(f"  [{idx+1}/{len(image_files)}] {img_path.name}...")
        result = infer_single_image(model, img_path, DEVICE, optimal_threshold)

        if result:
            result['filename'] = img_path.name
            results_inference.append(result)

    print(f"\nâœ“ Inference complete: {len(results_inference)} images processed")

    # Summary
    normal_count = sum(1 for r in results_inference if not r['is_anomaly'])
    anomaly_count = sum(1 for r in results_inference if r['is_anomaly'])

    print(f"\nðŸ“Š Results:")
    print(f"  Normal: {normal_count}")
    print(f"  Anomalous: {anomaly_count}")

## 5.3 Visualize Inference Results

In [None]:
if results_inference:
    # Visualize
    num_vis = min(12, len(results_inference))
    n_cols = 4
    n_rows = (num_vis + n_cols - 1) // n_cols

    fig, axes = plt.subplots(n_rows, n_cols, figsize=(16, 4*n_rows))
    fig.suptitle('Anomalib Inference Results', fontsize=16, fontweight='bold')

    if num_vis == 1:
        axes = np.array([axes])
    else:
        axes = axes.flatten()

    for idx, result in enumerate(results_inference[:num_vis]):
        ax = axes[idx]
        ax.imshow(result['image'])

        color = 'red' if result['is_anomaly'] else 'green'
        status_text = f"{result['status']}\nScore: {result['anomaly_score']:.4f}\nConf: {result['confidence']:.4f}"

        ax.set_title(status_text, color=color, fontweight='bold', fontsize=11)
        ax.set_xlabel(result['filename'], fontsize=9)
        ax.axis('off')

    for idx in range(num_vis, len(axes)):
        axes[idx].axis('off')

    plt.tight_layout()
    vis_path = INFERENCE_OUTPUT / "inference_visualizations.png"
    plt.savefig(vis_path, dpi=150, bbox_inches='tight')
    print(f"âœ“ Visualizations saved to: {vis_path}")
    plt.show()

    # Save JSON results
    results_json = []
    for result in results_inference:
        results_json.append({
            'filename': result['filename'],
            'anomaly_score': float(result['anomaly_score']),
            'status': result['status'],
            'confidence': float(result['confidence']),
            'threshold': float(optimal_threshold)
        })

    results_json_path = INFERENCE_OUTPUT / "inference_results.json"
    with open(results_json_path, 'w') as f:
        json.dump(results_json, f, indent=2)

    print(f"âœ“ JSON results saved to: {results_json_path}")
else:
    print("No inference results to visualize")

---
# SECTION 6: DOWNLOAD RESULTS (Colab)

## 6.1 Download All Results

In [None]:
if 'google.colab' in str(get_ipython()):
    from google.colab import files
    import shutil

    print("Preparing files for download...\n")

    # Create zip with all results
    shutil.make_archive('anomalib_results', 'zip', '.',
                       base_dir=['checkpoints', 'evaluation_results', 'inference_results', 'logs'])

    print("Downloading results...\n")
    files.download('anomalib_results.zip')

    print("âœ“ Download started!")
    print("\nYour results include:")
    print("  - checkpoints/ (trained model)")
    print("  - evaluation_results/ (metrics & plots)")
    print("  - inference_results/ (inference outputs)")
    print("  - logs/ (training logs)")
else:
    print("Results are saved locally in:")
    print(f"  - {CHECKPOINT_DIR.absolute()}")
    print(f"  - {EVALUATION_DIR.absolute()}")
    print(f"  - {INFERENCE_OUTPUT.absolute()}")

---
# FINAL SUMMARY

In [None]:
print("\n" + "="*70)
print("ðŸŽ‰ ANOMALIB PATCHCORE COMPLETE WORKFLOW - FINISHED")
print("="*70)

print(f"\nðŸ“Š SUMMARY:")
print(f"\n1. DATASET:")
print(f"   Training (good): {counts['train/good']} images")
print(f"   Testing (good): {counts['test/good']} images")
print(f"   Testing (defect): {counts['test/defect']} images")

print(f"\n2. MODEL:")
print(f"   Architecture: Patchcore (wide_resnet50_2)")
print(f"   Backbone: wide_resnet50_2")
print(f"   Layers: layer2, layer3")
print(f"   Mode: Unsupervised (trained on good images only)")

print(f"\n3. EVALUATION METRICS:")
print(f"   ROC-AUC: {roc_auc:.4f}")
print(f"   Optimal Threshold: {optimal_threshold:.4f}")
print(f"   Sensitivity: {sensitivity:.4f}")
print(f"   Specificity: {specificity:.4f}")

print(f"\n4. OUTPUT FILES:")
print(f"   Trained Model: {CHECKPOINT_DIR / 'patchcore_trained.ckpt'}")
print(f"   Evaluation Plots: {EVALUATION_DIR / 'roc_and_distribution.png'}")
print(f"   Metrics JSON: {EVALUATION_DIR / 'evaluation_results.json'}")
print(f"   Inference Results: {INFERENCE_OUTPUT / 'inference_results.json'}")

print(f"\nâœ… All workflows complete!")
print("="*70)