In [None]:
import os
import sys
import warnings
warnings.filterwarnings('ignore')
from pathlib import Path
from typing import Dict, List, Tuple, Optional
from collections import defaultdict
import json
try:
    import numpy as np
except ImportError:
    os.system(f"{sys.executable} -m pip install -q numpy")
    import numpy as np
try:
    from tqdm.auto import tqdm
except ImportError:
    os.system(f"{sys.executable} -m pip install -q tqdm")
    from tqdm.auto import tqdm
try:
    import cv2
except ImportError:
    os.system(f"{sys.executable} -m pip install -q opencv-python")
    import cv2
try:
    from PIL import Image
except ImportError:
    os.system(f"{sys.executable} -m pip install -q Pillow")
    from PIL import Image
try:
    import matplotlib
    import matplotlib.pyplot as plt
except ImportError:
    os.system(f"{sys.executable} -m pip install -q matplotlib")
    import matplotlib
    import matplotlib.pyplot as plt
HAS_PANDAS = False
try:
    import pandas as pd
    HAS_PANDAS = True
except (ImportError, ValueError):
    print(f"Pandas unavailable (optional)")
HAS_SEABORN = False
if HAS_PANDAS:
    try:
        import seaborn as sns
        sns.set_style("whitegrid")
        HAS_SEABORN = True
    except (ImportError, ValueError):
        print(f"Seaborn unavailable (optional)")
if not HAS_SEABORN:
    plt.style.use('default')
    plt.rcParams['figure.facecolor'] = 'white'
    plt.rcParams['axes.grid'] = True
    plt.rcParams['grid.alpha'] = 0.3
try:
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    from torchvision import transforms
    from torch.utils.data import Dataset, DataLoader
except ImportError:
    os.system(f"{sys.executable} -m pip install -q torch torchvision")
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    from torchvision import transforms
    from torch.utils.data import Dataset, DataLoader
# --- Compute device selection -------------------------------------------
# Preference order: Apple MPS, then CUDA, then CPU.
# `torch.backends.mps` is looked up with getattr so that PyTorch builds that
# do not ship the MPS backend fall through to CUDA/CPU instead of raising
# AttributeError.
_mps_backend = getattr(torch.backends, "mps", None)
if _mps_backend is not None and _mps_backend.is_available():
    device = torch.device("mps")
    device_name = "Apple Silicon GPU (MPS)"
elif torch.cuda.is_available():
    device = torch.device("cuda")
    device_name = f"CUDA GPU ({torch.cuda.get_device_name(0)})"
else:
    device = torch.device("cpu")
    device_name = "CPU"
print(f"Compute device: {device_name}")

# --- Paths ---------------------------------------------------------------
# All paths are relative to the notebook's working directory.
MVTEC_ROOT = Path("MVTecAD")
OUTPUT_DIR = Path("outputs")
MODELS_DIR = OUTPUT_DIR / "models"
PSEUDO_LABELS_DIR = OUTPUT_DIR / "pseudo_labels"
RESULTS_DIR = OUTPUT_DIR / "results"
# Create the output tree up front so later cells can write without checks.
for dir_path in [OUTPUT_DIR, MODELS_DIR, PSEUDO_LABELS_DIR, RESULTS_DIR]:
    dir_path.mkdir(parents=True, exist_ok=True)
# Fail fast: nothing below works without the dataset on disk.
if not MVTEC_ROOT.exists():
    raise FileNotFoundError(f"MVTec AD dataset not found at {MVTEC_ROOT}")
else:
    print(f"Dataset found: {MVTEC_ROOT.absolute()}")
if not HAS_PANDAS:
    print("\nPandas/Seaborn unavailable")

In [None]:
def analyze_mvtec_dataset(root_path: Path) -> Dict:
    """Count the PNG images available for every category under ``root_path``.

    Each non-hidden sub-directory of ``root_path`` is treated as a category.

    Args:
        root_path: Directory whose children are category folders.

    Returns:
        Mapping ``category -> {'train_good': int, 'test_defects': {name: int}}``
        where ``train_good`` is the number of ``*.png`` files in
        ``<category>/train/good`` (0 if that folder is missing) and
        ``test_defects`` holds the ``*.png`` count of every sub-directory of
        ``<category>/test`` (empty if that folder is missing).
    """
    def count_pngs(folder: Path) -> int:
        # Only files directly inside `folder` with a .png suffix are counted.
        return sum(1 for _ in folder.glob("*.png"))

    category_names = sorted(
        entry.name
        for entry in root_path.iterdir()
        if entry.is_dir() and not entry.name.startswith('.'))

    summary = {}
    for name in category_names:
        base = root_path / name
        good_dir = base / "train" / "good"
        test_dir = base / "test"

        defect_counts = {}
        if test_dir.exists():
            defect_counts = {
                sub.name: count_pngs(sub)
                for sub in test_dir.iterdir()
                if sub.is_dir()}

        summary[name] = {
            'train_good': count_pngs(good_dir) if good_dir.exists() else 0,
            'test_defects': defect_counts}
    return summary
# Scan the dataset once and print a summary; `dataset_info` stays available
# as a notebook-level variable after this cell.
print("MVTecAD Dataset Analysis:\n")
dataset_info = analyze_mvtec_dataset(MVTEC_ROOT)
# Totals across all categories: training images come from train/good only,
# test images are summed over every test sub-folder (including 'good' if present).
total_train = sum(info['train_good'] for info in dataset_info.values())
total_test = sum(sum(info['test_defects'].values()) for info in dataset_info.values())
print(f"Total Categories: {len(dataset_info)}")
print(f"Total Train Images: {total_train} (good samples)")
print(f"Total Test Images: {total_test} (all types)")
# Fixed-width table: one row per category, sorted by category name.
print("\nPer-Category Breakdown:")
print(f"{'Category':<15} {'Train':<8} {'Test':<8} {'Defect Types'}")
for cat, info in sorted(dataset_info.items()):
    # Number of test sub-folders and the total image count across them.
    n_defects = len(info['test_defects'])
    n_test = sum(info['test_defects'].values())
    print(f"{cat:<15} {info['train_good']:<8} {n_test:<8} {n_defects}")
print("\n")

In [None]:
class MVTecDataset(Dataset):
    """Map-style dataset over one MVTec-AD category and split.

    Directory layout read by this class::

        <root>/<category>/train/good/*.png
        <root>/<category>/<split>/<label>/*.png   (any split other than 'train')

    Each item is a dict with keys ``image``, ``label`` (the folder name),
    ``is_defect`` (``label != 'good'``) and ``path`` (string).
    """

    def __init__(self, root: Path, category: str, split: str = 'train',
                 transform=None, target_size: Tuple[int, int] = (224, 224)):
        """Index the images of ``category``/``split``.

        Args:
            root: Dataset root directory.
            category: Category folder name under ``root``.
            split: ``'train'`` reads only the ``good`` folder; any other value
                reads every sub-folder of ``<root>/<category>/<split>``.
            transform: Optional callable applied to the resized PIL image.
            target_size: Size passed to ``PIL.Image.resize``.
        """
        self.root = root
        self.category = category
        self.split = split
        self.transform = transform
        self.target_size = target_size
        self.samples = []
        self._load_samples()

    def _load_samples(self):
        """Populate ``self.samples`` in a deterministic (sorted) order.

        A missing split or label directory yields no samples instead of
        raising, for both the train and the test branch.
        """
        cat_path = self.root / self.category / self.split
        if self.split == 'train':
            label_dirs = [cat_path / "good"]
        elif cat_path.is_dir():
            label_dirs = sorted(d for d in cat_path.iterdir() if d.is_dir())
        else:
            # Previously this branch crashed in iterdir() when the split
            # folder did not exist; treat it like an empty split instead.
            label_dirs = []

        for label_dir in label_dirs:
            if not label_dir.is_dir():
                continue
            for img_path in sorted(label_dir.glob("*.png")):
                self.samples.append({
                    'path': img_path,
                    'label': label_dir.name,
                    'is_defect': label_dir.name != 'good'})

    def __len__(self):
        """Return the number of indexed images."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load, RGB-convert and resize image ``idx``, then apply ``transform``."""
        sample = self.samples[idx]
        img = Image.open(sample['path']).convert('RGB')
        img = img.resize(self.target_size, Image.BILINEAR)
        if self.transform:
            img = self.transform(img)
        return {
            'image': img,
            'label': sample['label'],
            'is_defect': sample['is_defect'],
            'path': str(sample['path'])}
def get_transforms(size: int = 224):
    """Build the tensor-conversion + per-channel normalisation pipeline.

    Args:
        size: Accepted for interface compatibility; not used by the pipeline
            (no resizing happens here).

    Returns:
        A ``transforms.Compose`` of ``ToTensor`` followed by ``Normalize``.
    """
    # Per-channel statistics; these match the commonly used ImageNet values.
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]
    steps = [
        transforms.ToTensor(),
        transforms.Normalize(mean=channel_mean, std=channel_std),
    ]
    return transforms.Compose(steps)

In [None]:
class DINOFeatureExtractor:
    """Frozen DINO vision transformer that exposes CLS-token attention maps."""

    def __init__(self, model_name: str = 'dino_vits16', device='cpu'):
        """Load ``model_name`` from the ``facebookresearch/dino`` torch.hub repo.

        The model is moved to ``device``, put in eval mode and all parameters
        are frozen. Any loading error propagates to the caller unchanged.
        """
        self.device = device
        self.model_name = model_name
        print(f"\nLoading DINO model: {model_name}")
        self.model = torch.hub.load('facebookresearch/dino:main', model_name,
                                    force_reload=False, verbose=False)
        self.model = self.model.to(device)
        self.model.eval()
        for param in self.model.parameters():
            param.requires_grad = False
        # patch_size may be stored as an int or as a (h, w) pair.
        patch_size_attr = self.model.patch_embed.patch_size
        if isinstance(patch_size_attr, (list, tuple)):
            self.patch_size = patch_size_attr[0]
        else:
            self.patch_size = patch_size_attr
        self.num_heads = self.model.blocks[0].attn.num_heads
        print(f"Model loaded successfully")

    def extract_attention_maps(self, img_tensor: torch.Tensor,
                               layers: Optional[List[int]] = None) -> Dict[str, torch.Tensor]:
        """Run one forward pass and collect CLS-to-patch attention per layer.

        Args:
            img_tensor: Batch of images, unpacked as ``(B, C, H, W)``.
            layers: Indices into ``self.model.blocks`` to capture. Defaults to
                the last three blocks, ``[-1, -2, -3]``.

        Returns:
            Dict with
            ``'attention_maps'``: ``{'layer_<idx>': (B, H//p, W//p) tensor}`` in
            the order given by ``layers``, head-averaged;
            ``'features'``: the model output;
            ``'patch_grid'``: ``(H // patch_size, W // patch_size)``.
        """
        if layers is None:
            layers = [-1, -2, -3]
        B, C, H, W = img_tensor.shape

        # Hooks fire in the model's execution order, which generally differs
        # from the order of `layers`, so each capture is keyed by its own
        # layer index instead of relying on append order.
        captured = {}

        def make_hook(layer_idx):
            def hook_fn(module, inputs, output):
                # The attention module may return (x, attn) or attn alone.
                captured[layer_idx] = output[1] if isinstance(output, tuple) else output
            return hook_fn

        hooks = []
        try:
            for layer_idx in layers:
                attn_module = self.model.blocks[layer_idx].attn
                hooks.append(attn_module.register_forward_hook(make_hook(layer_idx)))
            with torch.no_grad():
                features = self.model(img_tensor.to(self.device))
        finally:
            # Always detach hooks, even if the forward pass raised.
            for hook in hooks:
                hook.remove()

        attention_maps = {}
        h_patches = H // self.patch_size
        w_patches = W // self.patch_size
        for layer_idx in layers:
            attn = captured[layer_idx]
            # Row 0 is the CLS query; columns 1: are the patch tokens.
            cls_attn = attn[:, :, 0, 1:]
            cls_attn_avg = cls_attn.mean(dim=1)  # average over heads
            attn_map = cls_attn_avg.reshape(B, h_patches, w_patches)
            attention_maps[f'layer_{layer_idx}'] = attn_map
        return {
            'attention_maps': attention_maps,
            'features': features,
            'patch_grid': (h_patches, w_patches)}

    def compute_anomaly_score(self, attention_maps: Dict[str, torch.Tensor],
                             strategy: str = 'hierarchical') -> torch.Tensor:
        """Fuse per-layer attention maps into a single score map.

        Args:
            attention_maps: Mapping of layer name to map; iteration order matters
                for the ``'hierarchical'`` strategy.
            strategy: ``'hierarchical'`` weights the first three maps by
                0.5/0.3/0.2 (maps beyond the third are ignored), ``'max'``
                takes the element-wise maximum, anything else the mean.

        Returns:
            The fused score tensor, same shape as one input map.
        """
        maps = list(attention_maps.values())
        if strategy == 'hierarchical':
            weights = [0.5, 0.3, 0.2][:len(maps)]
            score = sum(w * m for w, m in zip(weights, maps))
        elif strategy == 'max':
            score = torch.stack(maps).max(dim=0)[0]
        else:
            score = torch.stack(maps).mean(dim=0)
        return score

In [None]:
class PseudoLabelGenerator:
    """Turn DINO attention maps into bounding-box pseudo labels."""

    def __init__(self, dino_extractor: "DINOFeatureExtractor"):
        """Store the extractor used to compute attention maps."""
        self.dino = dino_extractor

    def generate_pseudo_labels(self, img: np.ndarray,
                               threshold_percentile: float = 95,
                               min_box_size: int = 20,
                               use_multiscale: bool = True) -> List[Dict]:
        """Propose defect boxes for ``img`` from thresholded attention.

        For each scale the fused attention score is thresholded at
        ``threshold_percentile``, the mask is resized to the (scaled) image,
        connected components are extracted, and components narrower or shorter
        than ``min_box_size`` (measured in the scaled frame) are dropped.
        Surviving boxes are mapped back to original-image coordinates and
        de-duplicated with NMS at IoU 0.5.

        Args:
            img: Image array; ``img.shape[:2]`` is read as ``(h, w)`` and the
                array is passed to ``PIL.Image.fromarray``.
            threshold_percentile: Percentile of the score map used as threshold.
            min_box_size: Minimum box width and height in pixels.
            use_multiscale: If True use scales 1.0, 0.75 and 1.25, else only 1.0.

        Returns:
            List of dicts with ``'bbox'`` (``[x, y, w, h]`` in original-image
            pixels), ``'confidence'`` (mean score inside the box) and ``'scale'``.
        """
        h, w = img.shape[:2]
        transform = get_transforms()
        pseudo_labels = []
        scales = [1.0, 0.75, 1.25] if use_multiscale else [1.0]
        for scale in scales:
            if scale != 1.0:
                new_h, new_w = int(h * scale), int(w * scale)
                img_scaled = cv2.resize(img, (new_w, new_h))
            else:
                new_h, new_w = h, w
                img_scaled = img
            img_tensor_scaled = transform(Image.fromarray(img_scaled)).unsqueeze(0)

            result = self.dino.extract_attention_maps(img_tensor_scaled)
            anomaly_score = self.dino.compute_anomaly_score(
                result['attention_maps'],
                strategy='hierarchical')
            score_np = anomaly_score[0].cpu().numpy()
            threshold = np.percentile(score_np, threshold_percentile)
            mask = (score_np > threshold).astype(np.uint8)
            mask_resized = cv2.resize(mask, (new_w, new_h), interpolation=cv2.INTER_NEAREST)
            num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(
                mask_resized, connectivity=8)
            # Label 0 is the background component.
            for i in range(1, num_labels):
                x, y, box_w, box_h, area = stats[i]
                if box_w < min_box_size or box_h < min_box_size:
                    continue
                # Score the box while it is still expressed in the scaled
                # frame: (new_w, new_h) is the frame the mask was resized to.
                # Doing this after the rescale below mixed original-frame
                # coordinates with scaled-frame dimensions.
                confidence = self._region_score(
                    score_np, x, y, box_w, box_h, new_w, new_h)
                if scale != 1.0:
                    x = int(x / scale)
                    y = int(y / scale)
                    box_w = int(box_w / scale)
                    box_h = int(box_h / scale)
                pseudo_labels.append({
                    'bbox': [int(x), int(y), int(box_w), int(box_h)],
                    'confidence': float(confidence),
                    'scale': float(scale)})
        if len(pseudo_labels) > 0:
            pseudo_labels = self._nms(pseudo_labels, iou_threshold=0.5)
        return pseudo_labels

    @staticmethod
    def _region_score(score_np: np.ndarray, x, y, box_w, box_h,
                      frame_w, frame_h) -> float:
        """Mean of ``score_np`` over the cells covered by a pixel-space box.

        ``(x, y, box_w, box_h)`` and ``(frame_w, frame_h)`` must be expressed
        in the same pixel frame; the box is projected onto the score grid and
        clamped so that at least one cell is always covered.
        """
        grid_h, grid_w = score_np.shape[0], score_np.shape[1]
        x_att = int((x / frame_w) * grid_w)
        y_att = int((y / frame_h) * grid_h)
        w_att = max(1, int((box_w / frame_w) * grid_w))
        h_att = max(1, int((box_h / frame_h) * grid_h))
        x_att = max(0, min(x_att, grid_w - 1))
        y_att = max(0, min(y_att, grid_h - 1))
        x_att_end = min(x_att + w_att, grid_w)
        y_att_end = min(y_att + h_att, grid_h)
        return float(score_np[y_att:y_att_end, x_att:x_att_end].mean())

    def _nms(self, boxes: List[Dict], iou_threshold: float = 0.5) -> List[Dict]:
        """Greedy non-maximum suppression on ``'confidence'``.

        Repeatedly keeps the most confident remaining box and discards every
        other box whose IoU with it is ``>= iou_threshold``.
        """
        remaining = sorted(boxes, key=lambda b: b['confidence'], reverse=True)
        keep = []
        while remaining:
            best = remaining[0]
            keep.append(best)
            remaining = [
                box for box in remaining[1:]
                if self._compute_iou(best['bbox'], box['bbox']) < iou_threshold]
        return keep

    def _compute_iou(self, box1: List[int], box2: List[int]) -> float:
        """Intersection-over-union of two ``[x, y, w, h]`` boxes (0 if both are empty)."""
        x1, y1, w1, h1 = box1
        x2, y2, w2, h2 = box2
        xi1 = max(x1, x2)
        yi1 = max(y1, y2)
        xi2 = min(x1 + w1, x2 + w2)
        yi2 = min(y1 + h1, y2 + h2)
        inter_area = max(0, xi2 - xi1) * max(0, yi2 - yi1)
        box1_area = w1 * h1
        box2_area = w2 * h2
        union_area = box1_area + box2_area - inter_area
        return inter_area / union_area if union_area > 0 else 0

In [None]:
class AIDefectReasoner:
    """Rule-based analysis of one detected region: type, severity, trust, novelty.

    Every decision step appends a human-readable line to a reasoning trace.
    The instance keeps a growing list of grey-level histograms
    (``known_defect_embeddings``) used by the novelty check, so results depend
    on the order in which regions are analysed.
    """

    def __init__(self, category: str):
        """Initialise priors and decision thresholds for ``category``."""
        self.category = category
        self.domain_priors = {
            'critical_regions': [],
            'texture_sensitivity': 0.5,
            'object_sensitivity': 0.7}
        self.known_defect_embeddings = []
        self.thresholds = {
            'compactness': 0.6,
            'edge_strength': 0.10,
            'repetition': 0.3,
            'severity_low': 0.3,
            'severity_high': 0.6,
            'trust_high': 0.4,
            'trust_low': 0.2}

    def analyze_defect_region(self,
                             img: np.ndarray,
                             bbox: List[int],
                             attention_map: np.ndarray,
                             layer_activations: Dict[str, torch.Tensor]) -> Dict:
        """Analyse the crop of ``img`` described by ``bbox``.

        Args:
            img: Full image; cropped as ``img[y:y+h, x:x+w]`` and converted
                with ``cv2.COLOR_RGB2GRAY`` downstream.
            bbox: ``[x, y, w, h]`` box in ``img`` pixel coordinates.
            attention_map: 2-D attention/score map for the image.
            layer_activations: Mapping ``'layer_<idx>' -> tensor`` (``.cpu()``
                is called on the values).

        Returns:
            Dict with ``defect_type``, ``severity``, ``confidence_vector``,
            ``trust_level``, ``novelty``, ``reasoning_trace`` and
            ``type_confidence``.
        """
        x, y, w, h = bbox
        region = img[y:y+h, x:x+w]
        reasoning_trace = []
        defect_type, type_confidence = self._reason_about_type(
            region, bbox, attention_map, layer_activations, reasoning_trace)
        severity = self._reason_about_severity(
            region, bbox, attention_map, defect_type, reasoning_trace)
        # The full image shape is forwarded so the size-plausibility check
        # compares the box against the image rather than against itself.
        confidence_vector = self._compute_confidence_vector(
            region, bbox, attention_map, layer_activations, reasoning_trace,
            image_shape=img.shape[:2])
        trust_level = self._reason_about_trust(confidence_vector, reasoning_trace)
        novelty = self._reason_about_novelty(region, reasoning_trace)
        return {
            'defect_type': defect_type,
            'severity': float(severity),
            'confidence_vector': confidence_vector,
            'trust_level': trust_level,
            'novelty': novelty,
            'reasoning_trace': reasoning_trace,
            'type_confidence': float(type_confidence)}

    def _reason_about_type(self,
                          region: np.ndarray,
                          bbox: List[int],
                          attention_map: np.ndarray,
                          layer_activations: Dict[str, torch.Tensor],
                          trace: List[str]) -> Tuple[str, float]:
        """Vote between ``'object'``, ``'texture'`` and ``'unknown'``.

        Evidence: box compactness, Canny edge density, FFT-based repetition,
        aspect ratio and the late/early layer activation ratio.

        Returns:
            ``(defect_type, confidence)``; confidence is the winning score's
            share of the total, or 0.5 for ``'unknown'``.
        """
        _, _, w, h = bbox
        gray = cv2.cvtColor(region, cv2.COLOR_RGB2GRAY)
        # Compactness of the bounding rectangle (not of the defect contour).
        area = w * h
        perimeter = 2 * (w + h)
        compactness = (4 * np.pi * area) / (perimeter ** 2) if perimeter > 0 else 0
        trace.append(f"Compactness: {compactness:.3f}")
        edges = cv2.Canny(gray, 50, 150)
        # NOTE(review): Canny output is 0/255, so this is 255 x the edge-pixel
        # fraction rather than a 0..1 density - confirm the thresholds were
        # tuned for that scale.
        edge_strength = edges.sum() / (edges.shape[0] * edges.shape[1])
        trace.append(f"Edge strength: {edge_strength:.3f}")
        repetition_score = self._compute_repetition(gray)
        trace.append(f"Repetition: {repetition_score:.3f}")
        aspect_ratio = max(w, h) / (min(w, h) + 1e-6)
        is_elongated = aspect_ratio > 2.5
        trace.append(f"Aspect ratio: {aspect_ratio:.2f}")
        # Missing layers contribute 0. NOTE(review): if only the last three
        # layers are supplied, early_layers is 0 and the ratio becomes very
        # large - confirm which layers callers pass.
        early_layers = sum(layer_activations.get(f'layer_{i}', torch.tensor([0.0])).cpu().numpy().mean()
                          for i in range(-11, -8))
        late_layers = sum(layer_activations.get(f'layer_{i}', torch.tensor([0.0])).cpu().numpy().mean()
                         for i in range(-3, 0))
        layer_ratio = late_layers / (early_layers + 1e-6)
        trace.append(f"Layer ratio (late/early): {layer_ratio:.3f}")
        object_score = 0.0
        texture_score = 0.0
        if compactness > self.thresholds['compactness']:
            object_score += 0.3
            trace.append("→ Compact shape favors OBJECT")
        if edge_strength > self.thresholds['edge_strength']:
            object_score += 0.3
            trace.append("→ Strong edges favor OBJECT")
        if layer_ratio > 1.2:
            object_score += 0.25
            trace.append("→ Late layer dominance favors OBJECT")
        if repetition_score > self.thresholds['repetition']:
            texture_score += 0.35
            trace.append("→ High repetition favors TEXTURE")
        if is_elongated:
            texture_score += 0.2
            trace.append("→ Elongated shape favors TEXTURE")
        if layer_ratio < 0.8:
            texture_score += 0.25
            trace.append("→ Early layer dominance favors TEXTURE")
        # A class wins only with more than 0.5 accumulated evidence.
        if object_score > texture_score and object_score > 0.5:
            defect_type = 'object'
            confidence = object_score / (object_score + texture_score)
        elif texture_score > 0.5:
            defect_type = 'texture'
            confidence = texture_score / (object_score + texture_score)
        else:
            defect_type = 'unknown'
            confidence = 0.5
        trace.append(f"DECISION: {defect_type.upper()} (confidence: {confidence:.3f})")
        return defect_type, confidence

    def _compute_repetition(self, gray: np.ndarray) -> float:
        """Share of FFT magnitude outside the central half-window, doubled and capped at 1.

        Returns 0.0 for regions smaller than 10 pixels in either dimension.
        """
        if gray.shape[0] < 10 or gray.shape[1] < 10:
            return 0.0
        f = np.fft.fft2(gray)
        fshift = np.fft.fftshift(f)
        magnitude = np.abs(fshift)
        h, w = magnitude.shape
        # After fftshift the low frequencies sit in the centre of the spectrum.
        center_mask = np.zeros_like(magnitude)
        center_mask[h//4:3*h//4, w//4:3*w//4] = 1
        high_freq_energy = (magnitude * (1 - center_mask)).sum()
        total_energy = magnitude.sum()
        repetition = high_freq_energy / (total_energy + 1e-6)
        return min(repetition * 2, 1.0)

    def _reason_about_severity(self,
                               region: np.ndarray,
                               bbox: List[int],
                               attention_map: np.ndarray,
                               defect_type: str,
                               trace: List[str]) -> float:
        """Weighted mix of area, contrast, edge density and attention, clipped to [0, 1].

        ``'object'`` defects weight area and boundary more; every other type
        weights contrast and attention more.
        """
        _, _, w, h = bbox
        gray = cv2.cvtColor(region, cv2.COLOR_RGB2GRAY)
        # NOTE(review): the reference area comes from attention_map's shape,
        # so this presumably expects the map at image resolution - confirm.
        img_area = attention_map.shape[0] * attention_map.shape[1]
        area_ratio = (w * h) / img_area
        trace.append(f"Area ratio: {area_ratio:.3f}")
        contrast_score = gray.std() / 128.0
        trace.append(f"Contrast: {contrast_score:.3f}")
        edges = cv2.Canny(gray, 50, 150)
        boundary_strength = edges.sum() / (edges.shape[0] * edges.shape[1])
        trace.append(f"Boundary: {boundary_strength:.3f}")
        attention_strength = attention_map.mean()
        trace.append(f"Attention: {attention_strength:.3f}")
        if defect_type == 'object':
            severity = (
                0.4 * area_ratio * 10 +
                0.3 * boundary_strength * 5 +
                0.2 * contrast_score +
                0.1 * attention_strength)
        else:
            severity = (
                0.2 * area_ratio * 10 +
                0.3 * contrast_score +
                0.3 * attention_strength +
                0.2 * boundary_strength * 5)
        severity = np.clip(severity, 0, 1)
        trace.append(f"Severity: {severity:.3f}")
        return severity

    def _compute_confidence_vector(self,
                                   region: np.ndarray,
                                   bbox: List[int],
                                   attention_map: np.ndarray,
                                   layer_activations: Dict[str, torch.Tensor],
                                   trace: List[str],
                                   image_shape: Optional[Tuple[int, int]] = None) -> Dict[str, float]:
        """Compute the four confidence signals used for trust reasoning.

        Args:
            region: Crop of the image covered by ``bbox``.
            bbox: ``[x, y, w, h]`` box.
            attention_map: Attention/score map (entropy is computed over it).
            layer_activations: Mapping of layer name to tensor.
            trace: Reasoning trace to append to.
            image_shape: ``(height, width)`` of the full image, used as the
                reference area for the size-plausibility check. When omitted
                the crop's own shape is used, which reproduces the previous
                behaviour (ratio close to 1, hence 0.5).

        Returns:
            Dict with float entries ``attention``, ``detection``,
            ``stability`` and ``domain``.
        """
        trace.append("\n--- Confidence Vector ---")
        # Normalised entropy; NOTE(review): this presumes attention_map
        # behaves like a probability distribution - confirm with the caller.
        attention_entropy = -np.sum(
            attention_map * np.log(attention_map + 1e-10)
        ) / np.log(attention_map.size)
        attention_confidence = 1.0 - attention_entropy
        trace.append(f"Attention conf: {attention_confidence:.3f}")
        gray = cv2.cvtColor(region, cv2.COLOR_RGB2GRAY)
        edges = cv2.Canny(gray, 50, 150)
        boundary_clarity = edges.sum() / (edges.shape[0] * edges.shape[1])
        detection_confidence = min(boundary_clarity * 10, 1.0)
        trace.append(f"Detection conf: {detection_confidence:.3f}")
        # Low spread of mean activation across layers -> high stability.
        layer_values = [act.cpu().numpy().mean() for act in layer_activations.values()]
        if len(layer_values) > 1:
            layer_std = np.std(layer_values)
            stability_confidence = 1.0 / (1.0 + layer_std)
        else:
            stability_confidence = 0.5
        trace.append(f"Stability conf: {stability_confidence:.3f}")
        reference_shape = image_shape if image_shape is not None else region.shape[:2]
        domain_consistency = self._domain_consistency(bbox, reference_shape)
        trace.append(f"Domain conf: {domain_consistency:.3f}")
        return {
            'attention': float(attention_confidence),
            'detection': float(detection_confidence),
            'stability': float(stability_confidence),
            'domain': float(domain_consistency)}

    @staticmethod
    def _domain_consistency(bbox: List[int], reference_shape: Tuple[int, int]) -> float:
        """Return 1.0 if the box covers between 1% and 50% of the reference area, else 0.5."""
        _, _, w, h = bbox
        reference_area = reference_shape[0] * reference_shape[1]
        if reference_area <= 0:
            return 0.5
        size_reasonable = 0.01 < (w * h) / reference_area < 0.5
        return 1.0 if size_reasonable else 0.5

    def _reason_about_trust(self,
                           confidence_vector: Dict[str, float],
                           trace: List[str]) -> str:
        """Map the confidence vector to ``'high'``, ``'medium'`` or ``'low'`` trust.

        ``'high'`` needs mean above ``trust_high`` and minimum above 0.6;
        ``'low'`` is triggered by mean below ``trust_low`` or minimum below 0.3.
        """
        trace.append("\n--- Trust Reasoning ---")
        avg_confidence = np.mean(list(confidence_vector.values()))
        min_confidence = min(confidence_vector.values())
        trace.append(f"Avg conf: {avg_confidence:.3f}")
        trace.append(f"Min conf: {min_confidence:.3f}")
        if avg_confidence > self.thresholds['trust_high'] and min_confidence > 0.6:
            trust = 'high'
            trace.append("TRUST: HIGH (all signals strong)")
        elif avg_confidence < self.thresholds['trust_low'] or min_confidence < 0.3:
            trust = 'low'
            trace.append("TRUST: LOW (weak signals)")
        else:
            trust = 'medium'
            trace.append("TRUST: MEDIUM (mixed signals)")
        return trust

    def _reason_about_novelty(self, region: np.ndarray, trace: List[str]) -> str:
        """Compare the region's grey histogram with previously stored ones.

        Uses histogram intersection on 32-bin normalised histograms. Returns
        ``'known'`` (> 0.8), ``'variant'`` (> 0.5) or ``'novel'``; the histogram
        is stored for future comparisons unless the result is ``'known'``.
        """
        trace.append("\n--- Novelty Assessment ---")
        gray = cv2.cvtColor(region, cv2.COLOR_RGB2GRAY)
        hist, _ = np.histogram(gray.flatten(), bins=32, range=(0, 256))
        hist = hist.astype(float) / (hist.sum() + 1e-6)
        if len(self.known_defect_embeddings) == 0:
            trace.append("No history - marking NOVEL")
            self.known_defect_embeddings.append(hist)
            return 'novel'
        similarities = [
            np.sum(np.minimum(hist, known))
            for known in self.known_defect_embeddings]
        max_similarity = max(similarities)
        trace.append(f"Max similarity: {max_similarity:.3f}")
        if max_similarity > 0.8:
            novelty = 'known'
            trace.append("NOVELTY: KNOWN")
        elif max_similarity > 0.5:
            novelty = 'variant'
            trace.append("NOVELTY: VARIANT")
            self.known_defect_embeddings.append(hist)
        else:
            novelty = 'novel'
            trace.append("NOVELTY: NOVEL")
            self.known_defect_embeddings.append(hist)
        return novelty

In [None]:
class YOLODefectDetector:
    """Thin wrapper around an Ultralytics YOLOv8 model trained on pseudo labels."""

    def __init__(self, model_size: str = 'n'):
        """Import ``ultralytics`` (pip-installing it on ImportError) and store settings.

        Args:
            model_size: YOLOv8 size suffix used to build the weights name
                ``yolov8<model_size>.pt``.
        """
        try:
            from ultralytics import YOLO
            status = "available"
        except ImportError:
            print("Installing ultralytics (YOLOv8)...")
            os.system(f"{sys.executable} -m pip install -q ultralytics")
            from ultralytics import YOLO
            status = "installed"
        self.YOLO = YOLO
        self.model = None
        self.model_size = model_size
        print(f"YOLOv8 {status}")

    @staticmethod
    def _unique_stem(img_path) -> str:
        """Build a file stem that stays unique across defect folders.

        Source images in different folders share names such as ``000.png``;
        prefixing the parent folder name keeps them from overwriting each
        other once they are copied into one flat directory.
        """
        path = Path(img_path)
        parent_name = path.parent.name
        return f"{parent_name}_{path.stem}" if parent_name else path.stem

    def prepare_yolo_dataset(self, pseudo_labels_dict: Dict,
                            output_dir: Path, category: str):
        """Write images, YOLO label files and ``data.yaml`` under ``output_dir``.

        The first 80% of ``pseudo_labels_dict`` (in its iteration order) goes
        to the train split, the remainder to the validation split. Every box
        is written as class 0 with centre/size normalised by the image size.

        Args:
            pseudo_labels_dict: Mapping ``image path -> list of label dicts``,
                each with a ``'bbox'`` of ``[x, y, w, h]`` in pixels.
            output_dir: Destination root for ``images/``, ``labels/`` and
                ``data.yaml``.
            category: Not used by this method.

        Returns:
            Path of the written ``data.yaml``.

        Raises:
            OSError: If an image listed in ``pseudo_labels_dict`` cannot be read.
        """
        img_train_dir = output_dir / "images" / "train"
        img_val_dir = output_dir / "images" / "val"
        lbl_train_dir = output_dir / "labels" / "train"
        lbl_val_dir = output_dir / "labels" / "val"
        for d in [img_train_dir, img_val_dir, lbl_train_dir, lbl_val_dir]:
            d.mkdir(parents=True, exist_ok=True)
        items = list(pseudo_labels_dict.items())
        split_idx = int(len(items) * 0.8)
        train_items = items[:split_idx]
        val_items = items[split_idx:]

        def write_yolo_annotations(items, img_dir, lbl_dir):
            for img_path, labels in items:
                img = cv2.imread(str(img_path))
                if img is None:
                    # cv2.imread signals failure by returning None.
                    raise OSError(f"Could not read image: {img_path}")
                h, w = img.shape[:2]
                stem = self._unique_stem(img_path)
                dst_img = img_dir / f"{stem}{Path(img_path).suffix}"
                cv2.imwrite(str(dst_img), img)
                # Label file shares the image's stem, as YOLO expects.
                dst_lbl = lbl_dir / f"{stem}.txt"
                with open(dst_lbl, 'w') as f:
                    for lbl in labels:
                        x, y, box_w, box_h = lbl['bbox']
                        x_center = (x + box_w / 2) / w
                        y_center = (y + box_h / 2) / h
                        norm_w = box_w / w
                        norm_h = box_h / h
                        f.write(f"0 {x_center:.6f} {y_center:.6f} {norm_w:.6f} {norm_h:.6f}\n")

        write_yolo_annotations(train_items, img_train_dir, lbl_train_dir)
        write_yolo_annotations(val_items, img_val_dir, lbl_val_dir)
        data_yaml = output_dir / "data.yaml"
        with open(data_yaml, 'w') as f:
            f.write(f"path: {output_dir.absolute()}\n")
            f.write(f"train: images/train\n")
            f.write(f"val: images/val\n")
            f.write(f"nc: 1\n")
            f.write(f"names: ['defect']\n")
        print(f"YOLO dataset: {len(train_items)} train, {len(val_items)} val")
        return data_yaml

    def train(self, data_yaml: Path, epochs: int = 50, imgsz: int = 224):
        """Train ``yolov8<model_size>.pt`` on ``data_yaml``.

        Uses the notebook-level ``device`` to choose the training device and
        writes runs under the notebook-level ``MODELS_DIR``.

        Returns:
            The object returned by ``model.train``.
        """
        print(f"\nTraining YOLOv8{self.model_size}...")
        self.model = self.YOLO(f'yolov8{self.model_size}.pt')
        results = self.model.train(
            data=str(data_yaml),
            epochs=epochs,
            imgsz=imgsz,
            batch=16,
            device='mps' if device.type == 'mps' else ('cuda' if device.type == 'cuda' else 'cpu'),
            patience=10,
            save=True,
            project=str(MODELS_DIR),
            name='yolo_defect_detector',
            exist_ok=True,
            verbose=False)
        print(f"Training completed")
        return results

    def detect(self, img: np.ndarray, conf_threshold: float = 0.01) -> List[Dict]:
        """Run the trained model on ``img``.

        If nothing is found at ``conf_threshold`` the image is re-run once at
        a confidence of 0.001.

        Returns:
            List of dicts with ``'bbox'`` (``[x, y, w, h]`` ints),
            ``'confidence'`` and ``'class'``.

        Raises:
            ValueError: If ``train`` has not been called yet.
        """
        if self.model is None:
            raise ValueError("Model not trained yet")
        results = self.model(img, conf=conf_threshold, verbose=False)[0]
        detections = []
        if len(results.boxes) == 0:
            # Second, more permissive pass before giving up.
            results = self.model(img, conf=0.001, verbose=False)[0]
        for box in results.boxes:
            x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
            conf = float(box.conf[0])
            cls = int(box.cls[0])
            detections.append({
                'bbox': [int(x1), int(y1), int(x2-x1), int(y2-y1)],
                'confidence': conf,
                'class': cls})
        return detections

In [None]:
class ClosedLoopRefinement:
    """Nudges pseudo-label parameters based on how the detector responded.

    State carried between calls: ``threshold_percentile`` (starts at 90,
    kept within 90..99), ``min_box_size`` (starts at 10, capped at 50) and a
    ``refinement_history`` list with one record per call.
    """

    def __init__(self, dino_extractor: DINOFeatureExtractor,
                 pseudo_label_gen: PseudoLabelGenerator):
        self.dino = dino_extractor
        self.pseudo_gen = pseudo_label_gen
        self.threshold_percentile = 90
        self.min_box_size = 10
        self.refinement_history = []

    def refine_from_detector_feedback(self,
                                     img: np.ndarray,
                                     yolo_detections: List[Dict],
                                     ground_truth_label: str) -> Dict:
        """Regenerate pseudo labels for ``img`` and adapt the parameters.

        Rules, applied after generating pseudo labels with the current
        parameters:

        * defective image, detector found more boxes than the pseudo labels:
          lower the percentile by 1 (never below 90);
        * defective image, detector found nothing while pseudo labels exist:
          raise the percentile by 1 (never above 99);
        * good image with any detection: grow ``min_box_size`` by 2 (max 50).

        Returns:
            Dict with ``'adjusted_params'`` (values after the update) and
            ``'agreement_stats'`` (box counts from both sources).
        """
        generated = self.pseudo_gen.generate_pseudo_labels(
            img,
            threshold_percentile=self.threshold_percentile,
            min_box_size=self.min_box_size)
        n_pseudo, n_yolo = len(generated), len(yolo_detections)

        if ground_truth_label == 'good':
            # Any detection on a good sample is treated as a false positive.
            if n_yolo > 0:
                self.min_box_size = min(50, self.min_box_size + 2)
        elif n_yolo > n_pseudo:
            self.threshold_percentile = max(90, self.threshold_percentile - 1)
        elif n_yolo == 0 and n_pseudo > 0:
            self.threshold_percentile = min(99, self.threshold_percentile + 1)

        history_entry = {
            'threshold_percentile': self.threshold_percentile,
            'min_box_size': self.min_box_size,
            'n_pseudo': n_pseudo,
            'n_yolo': n_yolo,
            'ground_truth': ground_truth_label}
        self.refinement_history.append(history_entry)

        adjusted = {
            'threshold_percentile': self.threshold_percentile,
            'min_box_size': self.min_box_size}
        agreement = {
            'n_pseudo_labels': n_pseudo,
            'n_yolo_detections': n_yolo}
        return {'adjusted_params': adjusted, 'agreement_stats': agreement}

In [None]:
class InspectionDecisionEngine:
    """Turns detections plus per-defect analyses into an inspection decision.

    Possible decisions: ``AUTO_ACCEPT``, ``AUTO_REJECT``, ``HUMAN_REVIEW`` and
    ``FLAG_NEW_DEFECT``.
    """

    # Boxes with an area at or above this value are discarded as noise.
    # NOTE(review): the constant looks like 80% of a 1024x1024 image - confirm
    # it matches the resolution of the images actually inspected.
    MAX_BOX_AREA = 1024 * 1024 * 0.8
    # When more than MAX_DETECTIONS boxes survive, only the TOP_K most
    # confident ones are kept.
    MAX_DETECTIONS = 50
    TOP_K_DETECTIONS = 20

    def __init__(self,
                 accept_threshold: float = 0.4,
                 reject_threshold: float = 0.3):
        """Store thresholds.

        Args:
            accept_threshold: Stored on the instance; not read by
                ``make_decision``.
            reject_threshold: Minimum average confidence required for
                ``AUTO_REJECT``.
        """
        self.accept_threshold = accept_threshold
        self.reject_threshold = reject_threshold

    def make_decision(self,
                     detections: List[Dict],
                     defect_analyses: List[Dict],
                     category: str) -> Dict:
        """Decide what to do with an inspected item.

        Detections are paired with their analyses, oversized boxes are
        dropped, an excessive number of boxes is trimmed to the most
        confident ones, and the decision is computed from the remaining
        pairs only.

        Args:
            detections: Dicts with ``'bbox'`` (``[x, y, w, h]``) and
                ``'confidence'``.
            defect_analyses: One analysis dict per detection, in the same
                order, with keys ``severity``, ``confidence_vector``,
                ``trust_level``, ``novelty``, ``defect_type``,
                ``type_confidence`` and ``reasoning_trace``.
            category: Not used by this method.

        Returns:
            Dict with ``decision``, ``defect_present``, ``defects``,
            ``confidence``, ``recommended_action``, ``reasoning`` and
            ``reasoning_traces``.
        """
        if len(detections) == 0:
            return {
                'decision': 'AUTO_ACCEPT',
                'defect_present': False,
                'defects': [],
                'confidence': 1.0,
                'recommended_action': 'Accept product for shipment',
                'reasoning': 'No defects detected',
                'reasoning_traces': []}

        # Keep (detection, analysis) pairs together so filtering and sorting
        # can never desynchronise the two lists.
        pairs = [
            (det, analysis)
            for det, analysis in zip(detections, defect_analyses)
            if det['bbox'][2] * det['bbox'][3] < self.MAX_BOX_AREA]
        if len(pairs) > self.MAX_DETECTIONS:
            pairs = sorted(
                pairs,
                key=lambda pair: pair[0]['confidence'],
                reverse=True)[:self.TOP_K_DETECTIONS]

        if len(pairs) == 0:
            return {
                'decision': 'AUTO_ACCEPT',
                'defect_present': False,
                'defects': [],
                'confidence': 1.0,
                'recommended_action': 'Accept - no significant defects',
                'reasoning': 'All detections filtered as noise',
                'reasoning_traces': []}

        defect_details = []
        max_severity = 0.0
        avg_confidence = 0.0
        has_novel = False
        low_trust_count = 0
        all_traces = []
        # Aggregate over the filtered pairs only; using the unfiltered inputs
        # here would make the noise filter above ineffective.
        for det, analysis in pairs:
            severity = analysis['severity']
            conf_vector = analysis['confidence_vector']
            overall_confidence = np.mean(list(conf_vector.values()))
            if analysis['trust_level'] == 'low':
                low_trust_count += 1
            if analysis['novelty'] == 'novel':
                has_novel = True
            defect_details.append({
                'bbox': det['bbox'],
                'type': analysis['defect_type'],
                'severity': severity,
                'confidence': float(overall_confidence),
                'confidence_vector': conf_vector,
                'trust_level': analysis['trust_level'],
                'novelty': analysis['novelty'],
                'type_confidence': analysis['type_confidence']})
            max_severity = max(max_severity, severity)
            avg_confidence += overall_confidence
            all_traces.extend(analysis['reasoning_trace'])
        n_considered = len(pairs)
        avg_confidence /= n_considered

        # Rule order matters: novelty first, then low trust, then severity.
        reasoning_parts = []
        if has_novel:
            decision = 'FLAG_NEW_DEFECT'
            action = 'Flag for engineering - novel defect pattern'
            reasoning_parts.append('Novel defect detected')
        elif low_trust_count / n_considered > 0.8:
            decision = 'HUMAN_REVIEW'
            action = 'Flag for human inspection - low confidence'
            reasoning_parts.append(f'Low trust: {low_trust_count}/{n_considered}')
        elif max_severity > 0.5 and avg_confidence > self.reject_threshold:
            decision = 'AUTO_REJECT'
            action = 'Reject product - critical defect'
            reasoning_parts.append(f'Severity: {max_severity:.2f}, Conf: {avg_confidence:.2f}')
        elif max_severity < 0.3 and avg_confidence < 0.3:
            decision = 'AUTO_ACCEPT'
            action = 'Accept - minor defects within tolerance'
            reasoning_parts.append(f'Low severity: {max_severity:.2f}')
        else:
            decision = 'HUMAN_REVIEW'
            action = 'Flag for human inspection - uncertain'
            reasoning_parts.append(f'Borderline: sev={max_severity:.2f}, conf={avg_confidence:.2f}')
        return {
            'decision': decision,
            'defect_present': True,
            'defects': defect_details,
            'confidence': float(avg_confidence),
            'recommended_action': action,
            'reasoning': ' | '.join(reasoning_parts),
            'reasoning_traces': all_traces}

In [None]:
def visualize_attention_maps(img: np.ndarray,
                            attention_maps: Dict[str, torch.Tensor],
                            save_path: Optional[Path] = None):
    """Show the original image next to one attention overlay per layer.

    Args:
        img: Image array handed to ``imshow``. Its first two dimensions are
            used as (height, width) when resizing every attention map.
        attention_maps: Mapping of layer name -> tensor. Only element ``[0]``
            of each tensor is drawn (presumably the first batch item —
            TODO confirm against the extractor that produces these maps).
            May be empty, in which case only the original image is drawn.
        save_path: When given, the figure is written there at 150 dpi and
            closed; otherwise it is displayed with ``plt.show()``.
    """
    n_layers = len(attention_maps)
    # squeeze=False always returns a 2-D array of axes. Without it a single
    # panel (empty attention_maps) comes back as a bare Axes object and the
    # indexing below raises a TypeError.
    fig, axes_grid = plt.subplots(
        1, n_layers + 1, figsize=(4 * (n_layers + 1), 4), squeeze=False)
    axes = axes_grid[0]

    axes[0].imshow(img)
    axes[0].set_title('Original')
    axes[0].axis('off')

    for idx, (layer_name, attn_map) in enumerate(attention_maps.items()):
        # detach() is a no-op for tensors without grad, but prevents
        # ``.numpy()`` from raising on tensors that still track gradients.
        attn_np = attn_map[0].detach().cpu().numpy()
        # cv2.resize expects (width, height), hence the swapped shape indices.
        attn_resized = cv2.resize(attn_np, (img.shape[1], img.shape[0]))
        panel = axes[idx + 1]
        panel.imshow(img)
        panel.imshow(attn_resized, alpha=0.6, cmap='jet')
        panel.set_title(layer_name)
        panel.axis('off')

    fig.tight_layout()
    if save_path:
        fig.savefig(save_path, dpi=150, bbox_inches='tight')
        # Close this specific figure so repeated calls do not accumulate
        # open figures in memory.
        plt.close(fig)
    else:
        plt.show()
def visualize_detection_result(img: np.ndarray,
                               detections: List[Dict],
                               decision: Dict,
                               save_path: Optional[Path] = None):
    """Draw the decided defects and the final decision banner over an image.

    Args:
        img: Image array displayed as the background.
        detections: Accepted for interface compatibility; the boxes that are
            drawn come from ``decision['defects']``, not from this list.
        decision: Dict providing ``'defects'`` (each entry with ``'bbox'``,
            ``'severity'``, ``'type'`` and ``'confidence'``), ``'decision'``
            and ``'recommended_action'``.
        save_path: When given, the figure is saved there at 150 dpi and
            closed; otherwise it is shown.

    Raises:
        KeyError: If ``decision['decision']`` is not one of the four known
            decision names.
    """
    banner_colours = {
        'AUTO_ACCEPT': 'green',
        'AUTO_REJECT': 'red',
        'HUMAN_REVIEW': 'orange',
        'FLAG_NEW_DEFECT': 'purple'
    }
    dark_box = dict(boxstyle='round', facecolor='black', alpha=0.7)

    fig, ax = plt.subplots(1, 1, figsize=(10, 10))
    ax.imshow(img)

    for defect in decision['defects']:
        left, top, box_w, box_h = defect['bbox']
        sev = defect['severity']
        # Severity bands: > 0.7 red, > 0.4 orange, everything else yellow.
        outline = 'red' if sev > 0.7 else ('orange' if sev > 0.4 else 'yellow')
        ax.add_patch(plt.Rectangle((left, top), box_w, box_h, linewidth=3,
                                   edgecolor=outline, facecolor='none'))
        caption = f"{defect['type']}: {defect['confidence']:.2f}"
        # Caption sits just above the box's top-left corner.
        ax.text(left, top - 5, caption, color=outline, fontsize=10,
                bbox=dict(dark_box))

    verdict = decision['decision']
    verdict_colour = banner_colours[verdict]

    # Decision banner along the top edge, in axes-relative coordinates.
    ax.text(0.5, 0.98, f"DECISION: {verdict}",
            transform=ax.transAxes, fontsize=14, weight='bold',
            color='white', ha='center', va='top',
            bbox=dict(boxstyle='round', facecolor=verdict_colour, alpha=0.8))
    # Recommended action along the bottom edge.
    ax.text(0.5, 0.02, decision['recommended_action'],
            transform=ax.transAxes, fontsize=10,
            color='white', ha='center', va='bottom',
            bbox=dict(dark_box))
    ax.axis('off')
    plt.tight_layout()

    if not save_path:
        plt.show()
        return
    plt.savefig(save_path, dpi=150, bbox_inches='tight')
    plt.close()

In [None]:
class DefectDiscoveryPipeline:
    """Orchestrates pseudo-labelling, detector training, inference and evaluation.

    The constructor builds every component used by the later phases (DINO
    feature extractor, pseudo-label generator, reasoner, YOLO detector,
    closed-loop refinement and decision engine) for a single category.
    ``generate_training_data`` also stores ``held_out_defects`` and
    ``held_out_goods`` on the instance; ``evaluate_on_test_set`` uses them
    when present so evaluation does not reuse pseudo-labelled images.
    """

    def __init__(self, category: str):
        """Create all pipeline components for ``category``, printing progress."""
        self.category = category
        print("\n")
        print(f"Initializing Defect Discovery Pipeline: {category}")
        print("\n")
        print("1. Loading DINO Vision Transformer...")
        self.dino = DINOFeatureExtractor(model_name='dino_vits16', device=device)
        print("2. Initializing pseudo-label generator...")
        self.pseudo_gen = PseudoLabelGenerator(self.dino)
        print("3. Setting up AI reasoner...")
        self.reasoner = AIDefectReasoner(category=category)
        print("4. Preparing YOLO detector...")
        self.yolo = YOLODefectDetector(model_size='n')
        print("5. Initializing closed-loop refinement...")
        self.refinement = ClosedLoopRefinement(self.dino, self.pseudo_gen)
        print("6. Creating inspection decision engine...")
        self.decision_engine = InspectionDecisionEngine()
        print("\nPipeline initialized\n")

    def generate_training_data(self, n_samples: int = 100):
        """Generate pseudo-labels for a training subset of the test split.

        Defect and good samples are shuffled with a fixed seed and split
        70/30. The first 70% of defects (capped at ``n_samples``) are
        pseudo-labelled; the remaining 30% of defects and of goods are kept
        on the instance as held-out evaluation samples.

        Args:
            n_samples: Maximum number of defect images to pseudo-label.

        Returns:
            Dict mapping ``str(sample['path'])`` to the labels produced by the
            pseudo-label generator, containing only images with at least one
            label. The same dict is written as JSON under ``PSEUDO_LABELS_DIR``.
        """
        print("\n")
        print("Phase 1: Pseudo-Label Generation -")
        print("\n")
        test_dataset = MVTecDataset(
            MVTEC_ROOT,
            self.category,
            split='test',
            transform=None,
            target_size=(224, 224))
        all_defect_samples = [s for s in test_dataset.samples if s['is_defect']]
        all_good_samples = [s for s in test_dataset.samples if not s['is_defect']]
        print(f"Dataset: {len(all_defect_samples)} defects, {len(all_good_samples)} good")
        import random
        # Global seeding is kept on purpose: later code may rely on it.
        random.seed(42)
        random.shuffle(all_defect_samples)
        random.shuffle(all_good_samples)
        defect_split = int(len(all_defect_samples) * 0.7)
        good_split = int(len(all_good_samples) * 0.7)
        train_defects = all_defect_samples[:defect_split]
        eval_defects = all_defect_samples[defect_split:]
        # Only the held-out 30% of good samples is used; the first 70% is not
        # needed because pseudo-labels are generated from defect images only.
        eval_goods = all_good_samples[good_split:]
        self.held_out_defects = eval_defects
        self.held_out_goods = eval_goods
        print(f"Split: {len(train_defects)} train, {len(eval_defects)} held-out defects")
        if len(train_defects) > n_samples:
            train_defects = train_defects[:n_samples]
        print(f"Generating pseudo-labels for {len(train_defects)} images...")
        pseudo_labels_dict = {}
        for sample in tqdm(train_defects):
            # Context manager releases the file handle as soon as pixels are read.
            with Image.open(sample['path']) as pil_img:
                img = np.array(pil_img.convert('RGB'))
            labels = self.pseudo_gen.generate_pseudo_labels(
                img,
                threshold_percentile=90,
                min_box_size=20,
                use_multiscale=True)
            if len(labels) > 0:
                pseudo_labels_dict[str(sample['path'])] = labels
        print(f"\nGenerated pseudo-labels for {len(pseudo_labels_dict)} images")
        pseudo_label_file = PSEUDO_LABELS_DIR / f"{self.category}_pseudo_labels.json"
        with open(pseudo_label_file, 'w') as f:
            json.dump(pseudo_labels_dict, f, indent=2)
        return pseudo_labels_dict

    def train_detector(self, pseudo_labels_dict: Dict, epochs: int = 50):
        """Train the YOLO detector on pseudo-labels and load the best weights.

        Args:
            pseudo_labels_dict: Output of ``generate_training_data``.
            epochs: Number of training epochs passed to the detector.

        Returns:
            Whatever ``self.yolo.train`` returns.
        """
        print("\n")
        print("Phase 2: YOLO Detector Training -")
        print("\n")
        yolo_data_dir = PSEUDO_LABELS_DIR / f"{self.category}_yolo_data"
        data_yaml = self.yolo.prepare_yolo_dataset(
            pseudo_labels_dict,
            yolo_data_dir,
            self.category)
        results = self.yolo.train(data_yaml, epochs=epochs, imgsz=224)
        # Repeated runs leave several "yolo_defect_detector<N>" directories
        # behind. Taking the first one that exists would load weights from the
        # oldest run, so pick the most recently written best.pt instead.
        candidates = [
            p for p in MODELS_DIR.glob("yolo_defect_detector*/weights/best.pt")
            if p.is_file()]
        best_weights = max(candidates, key=lambda p: p.stat().st_mtime, default=None)
        if best_weights:
            print(f"\nLoading trained weights: {best_weights}")
            self.yolo.model = self.yolo.YOLO(str(best_weights))
            print(f"Model loaded successfully\n")
        else:
            print(f"\nCould not find best.pt\n")
        return results

    def run_inference(self, img_path: str, visualize: bool = True, debug: bool = False) -> Dict:
        """Run detection, attention analysis and decision making on one image.

        Args:
            img_path: Path of the image to inspect.
            visualize: When True, save an annotated result image under
                ``RESULTS_DIR``.
            debug: When True, print detection diagnostics (and suppress the
                "Saved" message).

        Returns:
            The decision dict produced by the decision engine.
        """
        with Image.open(img_path) as pil_img:
            img = np.array(pil_img.convert('RGB'))
        if debug:
            print(f"\nDEBUG - Processing: {Path(img_path).name}")
            print(f"  Image shape: {img.shape}")
        # Very low threshold: filtering is left to the decision engine.
        detections = self.yolo.detect(img, conf_threshold=0.001)
        if debug:
            print(f"  Detections (conf=0.001): {len(detections)}")
            if len(detections) > 0:
                for i, det in enumerate(detections[:3]):
                    print(f"    Box {i}: conf={det['confidence']:.4f}, bbox={det['bbox']}")
        if len(detections) == 0 and debug:
            # Diagnostic only: query the raw model at an even lower threshold
            # to see whether it produces any boxes at all.
            raw_results = self.yolo.model(img, conf=0.0001, verbose=False)[0]
            print(f"  Raw YOLO boxes (conf=0.0001): {len(raw_results.boxes)}")
            if len(raw_results.boxes) > 0:
                for i, box in enumerate(raw_results.boxes[:3]):
                    print(f"    Raw box {i}: conf={float(box.conf[0]):.6f}")
        transform = get_transforms()
        img_tensor = transform(Image.fromarray(img)).unsqueeze(0)
        dino_result = self.dino.extract_attention_maps(img_tensor)
        attention_map = self.dino.compute_anomaly_score(
            dino_result['attention_maps'],
            strategy='hierarchical'
        )[0].cpu().numpy()
        # cv2.resize takes (width, height); bring the map to image resolution.
        attention_resized = cv2.resize(attention_map, (img.shape[1], img.shape[0]))
        defect_analyses = []
        for det in detections:
            analysis = self.reasoner.analyze_defect_region(
                img,
                det['bbox'],
                attention_resized,
                dino_result['attention_maps'])
            defect_analyses.append(analysis)
        decision = self.decision_engine.make_decision(
            detections, defect_analyses, self.category)
        if visualize:
            save_path = RESULTS_DIR / f"{Path(img_path).stem}_result.png"
            visualize_detection_result(img, detections, decision, save_path)
            if not debug:
                print(f"Saved: {save_path}")
        return decision

    def evaluate_on_test_set(self, n_samples: int = 50):
        """Evaluate automated accept/reject decisions against ground truth.

        Uses the held-out samples stored by ``generate_training_data`` when
        available, otherwise the first ``n_samples`` of the test split.
        HUMAN_REVIEW and FLAG_NEW_DEFECT count as non-automated. Every other
        outcome is bucketed by the decision itself: AUTO_REJECT is a reject,
        anything else is an accept. This matters because the decision engine
        can return AUTO_ACCEPT while still reporting ``defect_present=True``
        (minor defects within tolerance); such a part is accepted, so on a
        defective ground truth it is a false accept.

        Args:
            n_samples: Maximum number of samples to evaluate.

        Returns:
            Dict with the counters ``correct_accept``, ``correct_reject``,
            ``false_accept``, ``false_reject``, ``human_review`` and a
            ``decisions`` list with one record per evaluated sample. A summary
            (without the per-sample list) is written as JSON to ``RESULTS_DIR``.
        """
        print("\n")
        print("Phase 3: Test Set Evaluation -")
        print("\n")
        if not hasattr(self, 'held_out_defects'):
            print("WARNING: No held-out samples\n")
            test_dataset = MVTecDataset(MVTEC_ROOT, self.category, split='test', transform=None)
            test_samples = test_dataset.samples[:n_samples]
        else:
            held_out = self.held_out_defects + self.held_out_goods
            import random
            # Dedicated seeded generator: the evaluated subset is the same on
            # every run and the global random state is left untouched.
            random.Random(42).shuffle(held_out)
            test_samples = held_out[:min(n_samples, len(held_out))]
            print(f"Using {len(test_samples)} held-out samples")
            print(f"  {sum(1 for s in test_samples if s['is_defect'])} defects, {sum(1 for s in test_samples if not s['is_defect'])} good\n")
        results = {
            'correct_accept': 0,
            'correct_reject': 0,
            'false_accept': 0,
            'false_reject': 0,
            'human_review': 0,
            'decisions': []}
        print(f"Evaluating on {len(test_samples)} images...")
        for sample in tqdm(test_samples):
            decision = self.run_inference(str(sample['path']), visualize=False, debug=False)
            is_defect_gt = sample['is_defect']
            decision_type = decision['decision']
            if decision_type in ['HUMAN_REVIEW', 'FLAG_NEW_DEFECT']:
                results['human_review'] += 1
            else:
                # Judge the action taken, not whether any box was reported.
                rejected = decision_type == 'AUTO_REJECT'
                if rejected and is_defect_gt:
                    results['correct_reject'] += 1
                elif rejected:
                    results['false_reject'] += 1
                elif is_defect_gt:
                    results['false_accept'] += 1
                else:
                    results['correct_accept'] += 1
            results['decisions'].append({
                'path': str(sample['path']),
                'ground_truth': sample['label'],
                'is_defect_gt': is_defect_gt,
                'decision': decision_type})
        total = len(test_samples)
        automated = total - results['human_review']
        total_defects = sum(1 for s in test_samples if s['is_defect'])
        automation_rate = automated / total if total > 0 else 0.0
        if automated > 0:
            auto_correct = results['correct_accept'] + results['correct_reject']
            automated_accuracy = auto_correct / automated
        else:
            automated_accuracy = 0.0
        overall_accuracy = (results['correct_accept'] + results['correct_reject']) / total if total > 0 else 0.0
        false_accept_rate = results['false_accept'] / total_defects if total_defects > 0 else 0.0
        # Guarded like the other rates so an empty sample list cannot raise
        # ZeroDivisionError while printing the summary.
        human_review_rate = results['human_review'] / total if total > 0 else 0.0
        print("\n")
        print(f"Evaluation Results:")
        print(f"Total: {total} ({total_defects} defects, {total - total_defects} good)")
        print(f"")
        print(f"Automation: {automation_rate:.1%} ({automated}/{total})")
        print(f"Automated Accuracy: {automated_accuracy:.1%} (of {automated} automated)")
        print(f"Overall Accuracy: {overall_accuracy:.1%}")
        print(f"")
        print(f"Correct Accept: {results['correct_accept']}")
        print(f"Correct Reject: {results['correct_reject']}")
        print(f"False Accept: {results['false_accept']} ", end="")
        # NOTE(review): this label switches to "HIGH RISK" at 0.2 while the
        # rate label further down uses 0.5 - confirm which threshold is intended.
        if false_accept_rate == 0:
            print("SAFE")
        elif false_accept_rate < 0.2:
            print("Risk")
        else:
            print("HIGH RISK")
        print(f"False Reject: {results['false_reject']}")
        print(f"Human Review: {results['human_review']} ({human_review_rate:.1%})")
        print(f"")
        print(f"False Accept Rate: {false_accept_rate:.1%} ", end="")
        if false_accept_rate == 0:
            print("(Safe)")
        elif false_accept_rate < 0.5:
            print("(Moderate Risk)")
        else:
            print("(High Risk)")
        print("\n")
        results_file = RESULTS_DIR / f"{self.category}_evaluation.json"
        serializable = {
            'category': self.category,
            'total': total,
            'total_defects': total_defects,
            'automation_rate': float(automation_rate),
            'automated_accuracy': float(automated_accuracy),
            'overall_accuracy': float(overall_accuracy),
            'false_accept_rate': float(false_accept_rate),
            'correct_accept': results['correct_accept'],
            'correct_reject': results['correct_reject'],
            'false_accept': results['false_accept'],
            'false_reject': results['false_reject'],
            'human_review': results['human_review']}
        with open(results_file, 'w') as f:
            json.dump(serializable, f, indent=2)
        return results

In [None]:
def main():
    """Run the whole pipeline for one category and print a summary.

    Steps: build the pipeline, generate pseudo-labels (stopping early when
    fewer than 10 images received labels), train the detector, evaluate on
    the test set, then run visualised inference on a few sample images.
    """
    category = 'grid'  # Change this for each category
    n_pseudo_label_samples = 100
    training_epochs = 50
    n_eval_samples = 30

    config_lines = (
        f"\nConfiguration:",
        f" Category: {category}",
        f" Pseudo-label samples: {n_pseudo_label_samples}",
        f" Training epochs: {training_epochs}",
        f" Evaluation samples: {n_eval_samples}",
    )
    for line in config_lines:
        print(line)

    pipeline = DefectDiscoveryPipeline(category=category)
    pseudo_labels = pipeline.generate_training_data(n_samples=n_pseudo_label_samples)
    # Training on a handful of pseudo-labelled images is not attempted.
    if len(pseudo_labels) < 10:
        print("\nToo few pseudo-labels")
        return

    pipeline.train_detector(pseudo_labels, epochs=training_epochs)
    pipeline.evaluate_on_test_set(n_samples=n_eval_samples)

    print("\n")
    print(f"Phase 4: Visualization Examples -")
    print("\n")

    # Up to three defect and two good images, taken from the held-out set
    # when the pipeline has one and from the raw test split otherwise.
    if not hasattr(pipeline, 'held_out_defects'):
        test_dataset = MVTecDataset(MVTEC_ROOT, category, split='test')
        defective = [s for s in test_dataset.samples if s['is_defect']]
        clean = [s for s in test_dataset.samples if not s['is_defect']]
        viz_samples = defective[:3] + clean[:2]
    else:
        viz_samples = pipeline.held_out_defects[:3] + pipeline.held_out_goods[:2]

    print("Running inference on samples...")
    for sample in viz_samples:
        decision = pipeline.run_inference(str(sample['path']), visualize=True, debug=True)
        if sample['is_defect']:
            gt = "DEFECT"
        else:
            gt = "GOOD"
        print(f" └─ GT={gt}, Decision={decision['decision']}\n")

    closing_lines = (
        "\n",
        f"Pipeline Completed.",
        f"\nOutputs:",
        f" Models: {MODELS_DIR}",
        f" Pseudo-labels: {PSEUDO_LABELS_DIR}",
        f" Results: {RESULTS_DIR}",
        "\n",
    )
    for line in closing_lines:
        print(line)
# Entry point: run the full pipeline only when this code is executed directly
# (its module name is "__main__"), not when it is imported from elsewhere.
if __name__ == "__main__":
    main()