<a href="https://colab.research.google.com/github/jalevano/tfm_uoc_datascience/blob/main/01_Mask2Former_Implementacion.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
"""

Evaluación de Modelos Mask2Former para Segmentación de Personas
================================================================

Este módulo implementa la evaluación sistemática del modelo Mask2Former para
segmentación de instancias de personas. Parte del framework de evaluación
comparativa para TFM sobre técnicas de segmentación.

Autor: Jesús L.
Fecha: 2025
Proyecto: TFM - Evaluación Comparativa de Técnicas de Segmentación

"""



In [None]:
import torch
import torch.nn.functional as F
import numpy as np
import cv2
import json
import time
import psutil
import os
from datetime import datetime
from typing import Dict, List, Tuple, Optional, Any, Union
from dataclasses import dataclass
from pathlib import Path

from transformers import Mask2FormerImageProcessor, Mask2FormerForUniversalSegmentation
from PIL import Image
import matplotlib.pyplot as plt
from scipy.spatial.distance import directed_hausdorff
from skimage import measure, morphology

In [None]:
@dataclass
class EvaluationConfig:
   """
   Configuración para la evaluación de modelos de segmentación.

   Attributes:
       +model_name: Identificador del modelo en Hugging Face
       +device: Dispositivo (cuda/cpu)
       +confidence_threshold: Umbral mínimo de confianza para detecciones
       +output_dir: Directorio para almacenar resultados
       +save_masks: Para guardar las máscaras de segmentación
       +enable_panoptic: Para habilitar evaluación panóptica adicional
       +panoptic_overlap_threshold: Umbral para solapamiento en panóptica
   """
   model_name: str = "facebook/mask2former-swin-base-coco-instance"
   device: str = "cuda" if torch.cuda.is_available() else "cpu"
   confidence_threshold: float = 0.5
   output_dir: str = "./results"
   save_masks: bool = True
   enable_panoptic: bool = True
   panoptic_overlap_threshold: float = 0.5

In [None]:
class PhotoContextClassifier:
   """
   Clasificador de contextos fotográficos para análisis.

   Implementa la categorización automática de imágenes según diferentes
   contextos fotográficos relevantes para evaluación de segmentación.
   """

   CONTEXT_CATEGORIES = {
       'retrato_simple': 'Persona individual con fondo uniforme',
       'retrato_complejo': 'Persona individual con fondo detallado',
       'multiples_personas': 'Dos o más personas en la imagen',
       'exterior_natural': 'Entorno natural (parques, paisajes)',
       'exterior_urbano': 'Entorno urbano (calles, edificios)',
       'iluminacion_dificil': 'Contraluz, sombras, poca luz',
       'poses_complejas': 'Posturas no estándar, oclusiones'
   }

   # Mapeo de clases COCO relevantes para contexto fotográfico
   COCO_CONTEXT_CLASSES = {
       'natural': [16, 17, 18, 19],  # bird, cat, dog, horse
       'urban': [2, 3, 5, 6, 7, 8],  # bicycle, car, bus, train, truck, boat
       'indoor': [56, 57, 58, 59, 60, 61, 62]  # chair, couch, bed, table, etc.
   }

   @classmethod
   def get_valid_categories(cls) -> List[str]:
       """Retorna lista de categorías válidas."""
       return list(cls.CONTEXT_CATEGORIES.keys())

   @classmethod
   def validate_category(cls, category: str) -> str:
       """Valida y corrige categoría si es necesario."""
       if category not in cls.CONTEXT_CATEGORIES:
           print(f"Warning: Categoría '{category}' inválida, usando 'retrato_simple'")
           return 'retrato_simple'
       return category

   @classmethod
   def get_category_description(cls, category: str) -> str:
       """Obtiene la descripción de una categoría."""
       return cls.CONTEXT_CATEGORIES.get(category, "Categoría no válida")

   @staticmethod
   def classify_context(image: np.ndarray, num_persons: int,
                       panoptic_info: Optional[Dict] = None) -> Tuple[str, float]:
       """
       Clasifica el contexto fotográfico de una imagen usando información visual
       y panóptica.

       Args:
           image: Imagen en formato numpy array (H, W, C)
           num_persons: Número de personas detectadas
           panoptic_info: Información panóptica opcional para contexto mejorado

       Returns:
           Tuple con (categoría, puntuación de complejidad)
       """
       h, w = image.shape[:2]

       # Análisis básico de complejidad visual
       gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
       edges = cv2.Canny(gray, 50, 150)
       edge_density = np.sum(edges > 0) / (h * w)

       # Análisis de varianza de color
       color_variance = np.var(image.reshape(-1, 3), axis=0).mean()

       # Análisis de contexto usando información panóptica
       context_complexity = 0.0
       context_category = 'retrato_simple'  # Categoría por defecto

       if panoptic_info:
           # Calcular diversidad de clases detectadas
           segments = panoptic_info.get('segments_info', [])
           unique_classes = len(set(seg.get('category_id', 0) for seg in segments))
           context_complexity += min(unique_classes / 10.0, 0.3)

           # Detectar contextos específicos usando las clases COCO
           class_ids = [seg.get('category_id', 0) for seg in segments]

           if any(cls in PhotoContextClassifier.COCO_CONTEXT_CLASSES['urban'] for cls in class_ids):
               context_category = 'exterior_urbano'
               context_complexity += 0.2
           elif any(cls in PhotoContextClassifier.COCO_CONTEXT_CLASSES['natural'] for cls in class_ids):
               context_category = 'exterior_natural'
               context_complexity += 0.15
           elif any(cls in PhotoContextClassifier.COCO_CONTEXT_CLASSES['indoor'] for cls in class_ids):
               # Si hay muchos elementos indoor, mantener como complejo pero no cambiar categoría
               context_complexity += 0.1

       # Complejidad base de la imagen
       base_complexity = (edge_density * 0.6 + min(color_variance / 1000, 1.0) * 0.4)
       total_complexity = min(base_complexity + context_complexity, 1.0)

       # Clasificación jerárquica usando SOLO las claves del diccionario CONTEXT_CATEGORIES
       if num_persons > 1:
           context_category = 'multiples_personas'
           total_complexity = min(total_complexity + 0.2, 1.0)
       elif total_complexity > 0.8:
           # Muy complejo - posiblemente poses difíciles o iluminación complicada
           if edge_density > 0.15:  # Muchos bordes = poses complejas
               context_category = 'poses_complejas'
           else:  # Poca definición = problemas de iluminación
               context_category = 'iluminacion_dificil'
       elif base_complexity > 0.5:
           # Contexto no modificado por panóptica = retrato complejo
           if context_category == 'retrato_simple':
               context_category = 'retrato_complejo'
       # Si base_complexity <= 0.5 y no hay múltiples personas, mantener 'retrato_simple'

       # Validar que la categoría existe en CONTEXT_CATEGORIES
       context_category = PhotoContextClassifier.validate_category(context_category)

       return context_category, total_complexity

In [None]:
class AdvancedSegmentationMetrics:
   """
   Calculadora de métricas avanzadas para evaluación de segmentación.

   Implementa métricas estándar, especializadas para fotografía y panópticas
   para evaluación cuantitativa completa de la calidad de segmentación.
   """

   @staticmethod
   def intersection_over_union(pred_mask: np.ndarray, gt_mask: np.ndarray) -> float:
       """Calcula Intersection over Union (IoU) entre máscaras."""

       if pred_mask.shape != gt_mask.shape:
           raise ValueError(f"Las máscaras deben tener la misma forma: {pred_mask.shape} vs {gt_mask.shape}")

       intersection = np.logical_and(pred_mask, gt_mask).sum()
       union = np.logical_or(pred_mask, gt_mask).sum()

       if union == 0:
           return 1.0 if intersection == 0 else 0.0

       return float(intersection / union)

   @staticmethod
   def dice_coefficient(pred_mask: np.ndarray, gt_mask: np.ndarray) -> float:
       """Calcula coeficiente de Dice entre máscaras."""

       if pred_mask.shape != gt_mask.shape:
           raise ValueError(f"Las máscaras deben tener la misma forma: {pred_mask.shape} vs {gt_mask.shape}")

       intersection = np.logical_and(pred_mask, gt_mask).sum()
       total = pred_mask.sum() + gt_mask.sum()

       if total == 0:
           return 1.0 if intersection == 0 else 0.0

       return float(2 * intersection / total)

   @staticmethod
   def boundary_iou(pred_mask: np.ndarray, gt_mask: np.ndarray,
                   dilation_ratio: float = 0.02) -> float:
       """
       Calcula IoU de los contornos de las máscaras.

       Métrica importante para fotografía ya que evalúa la precisión de los bordes,
       crucial para aplicaciones como cambio de fondo o efectos de retrato.
       """

       def get_boundary(mask: np.ndarray, dilation_ratio: float) -> np.ndarray:
           h, w = mask.shape
           dilation_pixels = max(1, int(dilation_ratio * np.sqrt(h * w)))

           kernel = np.ones((dilation_pixels, dilation_pixels), np.uint8)
           dilated = cv2.dilate(mask.astype(np.uint8), kernel, iterations=1)
           boundary = dilated - mask.astype(np.uint8)

           return boundary.astype(bool)

       try:
           pred_boundary = get_boundary(pred_mask, dilation_ratio)
           gt_boundary = get_boundary(gt_mask, dilation_ratio)

           return AdvancedSegmentationMetrics.intersection_over_union(pred_boundary, gt_boundary)
       except Exception as e:
           print(f"Error calculando Boundary IoU: {e}")
           return 0.0

   @staticmethod
   def hausdorff_distance(pred_mask: np.ndarray, gt_mask: np.ndarray) -> float:
       """
       Calcula distancia de Hausdorff entre contornos de máscaras.

       Métrica robusta para evaluar la similitud de formas, especialmente útil
       para detectar errores en poses complejas o contornos irregulares.
       """

       def get_contour_points(mask: np.ndarray) -> np.ndarray:
           contours, _ = cv2.findContours(
               mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
           )
           if not contours:
               return np.array([[0, 0]])

           # Concatenar todos los puntos de contorno
           points = np.vstack([contour.reshape(-1, 2) for contour in contours])
           return points

       try:
           pred_points = get_contour_points(pred_mask)
           gt_points = get_contour_points(gt_mask)

           # Manejar casos edge donde no hay contornos
           if len(pred_points) <= 1 or len(gt_points) <= 1:
               return float('inf')

           dist_1 = directed_hausdorff(pred_points, gt_points)[0]
           dist_2 = directed_hausdorff(gt_points, pred_points)[0]

           return float(max(dist_1, dist_2))
       except Exception as e:
           print(f"Error calculando Hausdorff distance: {e}")
           return float('inf')

   @staticmethod
   def coherence_metrics(mask: np.ndarray) -> Dict[str, float]:
       """
       Calcula métricas de coherencia espacial de la máscara.

       Evalúa la calidad estructural de la segmentación, importante para
       determinar si la máscara es utilizable en aplicaciones fotográficas.
       """

       if np.sum(mask) == 0:
           return {
               'connected_components': 0.0,
               'hole_ratio': 0.0,
               'compactness': 0.0,
               'solidity': 0.0,
               'extent': 0.0
           }

       try:
           # Componentes conectados
           labeled_mask = measure.label(mask)
           num_components = int(labeled_mask.max())

           # Propiedades geométricas usando contornos
           contours, _ = cv2.findContours(
               mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
           )

           hole_ratio = 0.0
           solidity = 0.0
           compactness = 0.0
           extent = 0.0

           if contours:
               # Calcular convex hull y propiedades
               largest_contour = max(contours, key=cv2.contourArea)
               hull = cv2.convexHull(largest_contour)

               # Crear máscaras para cálculos
               hull_mask = np.zeros_like(mask)
               cv2.fillPoly(hull_mask, [hull], 1)

               convex_area = np.sum(hull_mask)
               actual_area = np.sum(mask)

               if convex_area > 0:
                   hole_ratio = max(0.0, float((convex_area - actual_area) / convex_area))
                   solidity = float(actual_area / convex_area)

               # Compacidad (4π * área / perímetro²)
               perimeter = cv2.arcLength(largest_contour, True)
               if perimeter > 0:
                   compactness = float((4 * np.pi * actual_area) / (perimeter ** 2))

               # Extent (área / área del bounding box)
               x, y, w, h = cv2.boundingRect(largest_contour)
               bbox_area = w * h
               if bbox_area > 0:
                   extent = float(actual_area / bbox_area)

           return {
               'connected_components': float(num_components),
               'hole_ratio': hole_ratio,
               'compactness': compactness,
               'solidity': solidity,
               'extent': extent
           }

       except Exception as e:
           print(f"Error calculando métricas de coherencia: {e}")
           return {
               'connected_components': 0.0,
               'hole_ratio': 0.0,
               'compactness': 0.0,
               'solidity': 0.0,
               'extent': 0.0
           }

   @staticmethod
   def panoptic_quality_metrics(pred_panoptic: Dict, gt_panoptic: Dict,
                              num_classes: int = 133) -> Dict[str, float]:
       """
       Calcula métricas de Panoptic Quality (PQ) para evaluación panóptica.

       PQ = SQ * RQ donde:
       - SQ (Segmentation Quality): Calidad promedio de IoU de segmentos matched
       - RQ (Recognition Quality): Fracción de ground truth segments que fueron
       matched

       Fundamental para evaluar la capacidad del modelo de entender escenas
       completas.
       """

       try:
           pred_segments = {seg['id']: seg for seg in pred_panoptic.get('segments_info', [])}
           gt_segments = {seg['id']: seg for seg in gt_panoptic.get('segments_info', [])}

           pred_mask = pred_panoptic.get('segmentation')
           gt_mask = gt_panoptic.get('segmentation')

           # Manejar diferentes tipos de máscaras
           if hasattr(pred_mask, 'cpu'):
               pred_mask = pred_mask.cpu().numpy()
           if hasattr(gt_mask, 'cpu'):
               gt_mask = gt_mask.cpu().numpy()

           if pred_mask is None or gt_mask is None or pred_mask.size == 0 or gt_mask.size == 0:
               return {'PQ': 0.0, 'SQ': 0.0, 'RQ': 0.0, 'matched_segments': 0, 'total_predicted': 0, 'total_ground_truth': 0}

           # Asegurar que las máscaras sean numpy arrays
           pred_mask = np.array(pred_mask)
           gt_mask = np.array(gt_mask)

           # Calcular intersecciones para cada par de segmentos
           intersections = {}
           pred_areas = {}
           gt_areas = {}

           for pred_id, pred_seg in pred_segments.items():
               pred_areas[pred_id] = (pred_mask == pred_id).sum()

               for gt_id, gt_seg in gt_segments.items():
                   if gt_seg['category_id'] == pred_seg['category_id']:
                       intersection = ((pred_mask == pred_id) & (gt_mask == gt_id)).sum()
                       if intersection > 0:
                           intersections[(pred_id, gt_id)] = intersection
                           if gt_id not in gt_areas:
                               gt_areas[gt_id] = (gt_mask == gt_id).sum()

           # Calcular matches usando threshold de IoU > 0.5
           matches = []
           for (pred_id, gt_id), intersection in intersections.items():
               union = pred_areas[pred_id] + gt_areas[gt_id] - intersection
               iou = intersection / union if union > 0 else 0

               if iou > 0.5:
                   matches.append({
                       'pred_id': pred_id,
                       'gt_id': gt_id,
                       'iou': iou,
                       'intersection': intersection,
                       'union': union
                   })

           # Calcular métricas
           if len(matches) == 0:
               return {
                   'PQ': 0.0, 'SQ': 0.0, 'RQ': 0.0,
                   'matched_segments': 0,
                   'total_predicted': len(pred_segments),
                   'total_ground_truth': len(gt_segments)
               }

           # SQ (Segmentation Quality): IoU promedio de matches
           sq = float(np.mean([match['iou'] for match in matches]))

           # RQ (Recognition Quality): matches / (total_gt + total_pred - matches)
           total_pred = len(pred_segments)
           total_gt = len(gt_segments)
           rq = float(len(matches) / (total_pred + total_gt - len(matches))) if (total_pred + total_gt) > 0 else 0.0

           # PQ = SQ * RQ
           pq = sq * rq

           return {
               'PQ': pq,
               'SQ': sq,
               'RQ': rq,
               'matched_segments': len(matches),
               'total_predicted': total_pred,
               'total_ground_truth': total_gt
           }

       except Exception as e:
           print(f"Error calculando métricas panópticas: {e}")
           return {
               'PQ': 0.0, 'SQ': 0.0, 'RQ': 0.0,
               'matched_segments': 0,
               'total_predicted': 0,
               'total_ground_truth': 0
           }


In [None]:
class Mask2FormerEvaluator:
   """
   Evaluador principal para el modelo Mask2Former con capacidades panópticas.

   Implementa el pipeline completo de evaluación: carga del modelo,
   procesamiento de imágenes, cálculo de métricas y almacenamiento de resultados.

   """

   def __init__(self, config: EvaluationConfig):
       """
       Inicializa el evaluador con la configuración especificada.

       Args:
           config: Configuración de evaluación
       """
       self.config = config
       self.device = torch.device(config.device)

       # Crear directorio de salida
       Path(config.output_dir).mkdir(parents=True, exist_ok=True)

       # Inicializar modelo y procesador
       self._load_model()

       # Inicializar componentes auxiliares
       self.context_classifier = PhotoContextClassifier()
       self.metrics_calculator = AdvancedSegmentationMetrics()

       # Almacenamiento de resultados
       self.results = []

       # Información del modelo cargado
       self._log_model_info()

   def _load_model(self) -> None:

       """Carga el modelo Mask2Former universal y el procesador de imágenes."""

       print(f"Cargando modelo {self.config.model_name} en {self.device}")

       try:
           self.processor = Mask2FormerImageProcessor.from_pretrained(
               self.config.model_name
           )
           self.model = Mask2FormerForUniversalSegmentation.from_pretrained(
               self.config.model_name
           ).to(self.device)

           self.model.eval()
           print("Modelo cargado correctamente")

       except Exception as e:
           print(f"Error cargando modelo: {e}")
           raise

   def _log_model_info(self) -> None:

       """Registra información detallada del modelo cargado para trazabilidad."""

       total_params = sum(p.numel() for p in self.model.parameters())
       trainable_params = sum(p.numel() for p in self.model.parameters() if p.requires_grad)

       print(f"Parámetros totales: {total_params:,}")
       print(f"Parámetros entrenables: {trainable_params:,}")
       print(f"Memoria del modelo: {total_params * 4 / (1024**3):.2f} GB (estimado)")

       # Detectar tipo de entrenamiento del modelo basado en el nombre
       model_type = "instance" if "instance" in self.config.model_name else \
                   "panoptic" if "panoptic" in self.config.model_name else \
                   "semantic" if "semantic" in self.config.model_name else "universal"

       print(f"Tipo de entrenamiento detectado: {model_type}")
       print(f"Evaluación panóptica: {'Habilitada' if self.config.enable_panoptic else 'Deshabilitada'}")

       # Almacenar para uso posterior
       self.model_training_type = model_type

   def _monitor_resources(self) -> Dict[str, float]:
       """
       Monitorea el uso de recursos del sistema para análisis de eficiencia.

       Returns:
           Diccionario con métricas de uso de recursos
       """
       try:
           memory_info = psutil.virtual_memory()

           resources = {
               'cpu_percent': float(psutil.cpu_percent(interval=0.1)),
               'memory_used_mb': float(memory_info.used / (1024 * 1024)),
               'memory_percent': float(memory_info.percent),
               'memory_available_mb': float(memory_info.available / (1024 * 1024))
           }

           # Añadir información de GPU si está disponible
           if torch.cuda.is_available():
               resources['gpu_memory_allocated_mb'] = float(torch.cuda.memory_allocated() / (1024 * 1024))
               resources['gpu_memory_reserved_mb'] = float(torch.cuda.memory_reserved() / (1024 * 1024))
               resources['gpu_memory_cached_mb'] = float(torch.cuda.memory_cached() / (1024 * 1024))

           return resources

       except Exception as e:
           print(f"Error monitoreando recursos: {e}")
           return {'error': str(e)}

   def _process_single_image(self, image_path: str,
                            ground_truth_mask: Optional[np.ndarray] = None,
                            ground_truth_panoptic: Optional[Dict] = None) -> Dict[str, Any]:
       """
       Procesa una imagen individual con análisis completo de instancias y panóptico.

       Args:
           image_path: Ruta a la imagen
           ground_truth_mask: Máscara ground truth para instancias de personas
           ground_truth_panoptic: Información panóptica ground truth

       Returns:
           Diccionario estructurado con todos los resultados del procesamiento
       """

       try:
           # Cargar y validar imagen
           if not os.path.exists(image_path):
               raise FileNotFoundError(f"Imagen no encontrada: {image_path}")

           image = Image.open(image_path).convert("RGB")
           image_np = np.array(image)

           # Validar dimensiones de imagen
           if image_np.shape[0] < 32 or image_np.shape[1] < 32:
               raise ValueError(f"Imagen demasiado pequeña: {image_np.shape}")

           # Monitorear recursos antes del procesamiento
           resources_before = self._monitor_resources()
           start_time = time.time()

           # Procesar imagen con el modelo
           inputs = self.processor(images=image, return_tensors="pt").to(self.device)

           with torch.no_grad():
               outputs = self.model(**inputs)

           processing_time = (time.time() - start_time) * 1000  # en ms
           resources_after = self._monitor_resources()

           # Post-procesamiento para segmentación de instancias
           instance_result = self.processor.post_process_instance_segmentation(
               outputs, target_sizes=[image.size[::-1]]
           )[0]

           # Post-procesamiento panóptico (si está habilitado)
           panoptic_result = None
           if self.config.enable_panoptic:
               try:
                   panoptic_result = self.processor.post_process_panoptic_segmentation(
                       outputs, target_sizes=[image.size[::-1]]
                   )[0]
               except Exception as e:
                   print(f"Warning: Segmentación panóptica falló para {image_path}: {e}")
                   panoptic_result = None

           # Extraer y procesar máscaras de personas (clase 0 en COCO)
           person_masks = []
           confidences = []
           person_scores = []

           for i, (label, score) in enumerate(zip(
               instance_result["labels"],
               instance_result["scores"]
           )):
               if label == 0 and score > self.config.confidence_threshold:  # Persona
                   mask = instance_result["masks"][i].cpu().numpy().squeeze()
                   # Asegurar que la máscara sea booleana
                   mask = mask > 0.5
                   person_masks.append(mask)
                   confidences.append(float(score.item()))
                   person_scores.append(float(score.item()))

           # Combinar todas las máscaras de personas
           if person_masks:
               combined_mask = np.logical_or.reduce(person_masks)
           else:
               combined_mask = np.zeros(image_np.shape[:2], dtype=bool)

           # Clasificar contexto fotográfico con información panóptica
           context, complexity = self.context_classifier.classify_context(
               image_np, len(person_masks), panoptic_result
           )

           # Estructura base de resultados
           result = {
               'experiment_metadata': {
                   'model_name': 'mask2former',
                   'model_version': self.config.model_name,
                   'model_training_type': self.model_training_type,
                   'timestamp': datetime.now().isoformat(),
                   'device': str(self.device),
                   'confidence_threshold': self.config.confidence_threshold,
                   'panoptic_enabled': self.config.enable_panoptic,
                   'framework_version': 'transformers'  # Para trazabilidad
               },
               'image_data': {
                   'filename': os.path.basename(image_path),
                   'full_path': os.path.abspath(image_path),
                   'resolution': list(image.size),
                   'context_category': context,
                   'context_description': self.context_classifier.get_category_description(context),
                   'complexity_score': float(complexity),
                   'file_size_mb': float(os.path.getsize(image_path) / (1024 * 1024))
               },
               'segmentation_results': {
                   'instance_masks_detected': len(person_masks),
                   'mean_confidence': float(np.mean(confidences)) if confidences else 0.0,
                   'max_confidence': float(np.max(confidences)) if confidences else 0.0,
                   'min_confidence': float(np.min(confidences)) if confidences else 0.0,
                   'confidence_std': float(np.std(confidences)) if len(confidences) > 1 else 0.0,
                   'processing_time_ms': float(processing_time),
                   'memory_delta_mb': float(resources_after.get('memory_used_mb', 0) - resources_before.get('memory_used_mb', 0)),
                   'individual_scores': person_scores
               },
               'performance_metrics': {
                   'resources_before': resources_before,
                   'resources_after': resources_after,
                   'gpu_memory_peak_mb': float(torch.cuda.max_memory_allocated() / (1024 * 1024)) if torch.cuda.is_available() else 0.0
               },
               'metrics': {
                   'coherence_metrics': self.metrics_calculator.coherence_metrics(combined_mask)
               }
           }

           # Añadir información panóptica detallada si está disponible
           if panoptic_result:
               segments_info = panoptic_result.get('segments_info', [])
               person_segments = [seg for seg in segments_info if seg.get('category_id') == 0]

               # Análisis de diversidad de clases detectadas
               category_counts = {}
               for seg in segments_info:
                   cat_id = seg.get('category_id', -1)
                   category_counts[cat_id] = category_counts.get(cat_id, 0) + 1

               result['segmentation_results'].update({
                   'panoptic_person_segments': len(person_segments),
                   'total_panoptic_segments': len(segments_info),
                   'panoptic_categories': len(set(seg.get('category_id', -1) for seg in segments_info)),
                   'category_distribution': category_counts,
                   'average_segment_area': float(np.mean([seg.get('area', 0) for seg in segments_info])) if segments_info else 0.0
               })

               # Métricas de calidad panóptica con ground truth
               if ground_truth_panoptic:
                   panoptic_metrics = self.metrics_calculator.panoptic_quality_metrics(
                       panoptic_result, ground_truth_panoptic
                   )
                   result['metrics']['panoptic_metrics'] = panoptic_metrics

           # Calcular métricas con ground truth de instancias
           if ground_truth_mask is not None:
               try:
                   # Validar compatibilidad de máscaras
                   if ground_truth_mask.shape != combined_mask.shape:
                       print(f"Warning: Redimensionando ground truth mask de {ground_truth_mask.shape} a {combined_mask.shape}")
                       ground_truth_mask = cv2.resize(
                           ground_truth_mask.astype(np.uint8),
                           (combined_mask.shape[1], combined_mask.shape[0])
                       ) > 0.5

                   overlap_metrics = {
                       'iou': self.metrics_calculator.intersection_over_union(
                           combined_mask, ground_truth_mask
                       ),
                       'dice_coefficient': self.metrics_calculator.dice_coefficient(
                           combined_mask, ground_truth_mask
                       )
                   }

                   boundary_metrics = {
                       'boundary_iou': self.metrics_calculator.boundary_iou(
                           combined_mask, ground_truth_mask
                       ),
                       'hausdorff_distance': self.metrics_calculator.hausdorff_distance(
                           combined_mask, ground_truth_mask
                       )
                   }

                   result['metrics']['overlap_metrics'] = overlap_metrics
                   result['metrics']['boundary_metrics'] = boundary_metrics
                   result['metrics']['has_ground_truth'] = True

               except Exception as e:
                   print(f"Error calculando métricas con ground truth: {e}")
                   result['metrics']['ground_truth_error'] = str(e)
           else:
               result['metrics']['has_ground_truth'] = False

           # Guardar máscaras si se requiere
           if self.config.save_masks:
               base_filename = os.path.splitext(os.path.basename(image_path))[0]

               # Guardar máscara de instancias
               instance_mask_filename = f"instance_mask_{base_filename}.png"
               instance_mask_path = os.path.join(self.config.output_dir, instance_mask_filename)
               cv2.imwrite(instance_mask_path, (combined_mask * 255).astype(np.uint8))
               result['segmentation_results']['instance_mask_path'] = instance_mask_path

               # Guardar segmentación panóptica si está disponible
               if panoptic_result:
                   try:
                       panoptic_mask = panoptic_result['segmentation']
                       if hasattr(panoptic_mask, 'cpu'):
                           panoptic_mask = panoptic_mask.cpu().numpy()

                       panoptic_mask_filename = f"panoptic_mask_{base_filename}.png"
                       panoptic_mask_path = os.path.join(self.config.output_dir, panoptic_mask_filename)

                       # Convertir a imagen RGB para visualización
                       panoptic_rgb = self._panoptic_to_rgb(panoptic_mask, panoptic_result['segments_info'])
                       cv2.imwrite(panoptic_mask_path, cv2.cvtColor(panoptic_rgb, cv2.COLOR_RGB2BGR))
                       result['segmentation_results']['panoptic_mask_path'] = panoptic_mask_path

                   except Exception as e:
                       print(f"Error guardando máscara panóptica: {e}")

           return result

       except Exception as e:
           # Crear resultado de error estructurado
           error_result = {
               'experiment_metadata': {
                   'model_name': 'mask2former',
                   'model_version': self.config.model_name,
                   'error': str(e),
                   'timestamp': datetime.now().isoformat()
               },
               'image_data': {
                   'filename': os.path.basename(image_path),
                   'full_path': os.path.abspath(image_path) if os.path.exists(image_path) else image_path,
                   'processing_failed': True,
                   'error_type': type(e).__name__
               },
               'segmentation_results': {
                   'instance_masks_detected': 0,
                   'processing_time_ms': 0.0,
                   'error_message': str(e)
               },
               'metrics': {
                   'has_ground_truth': ground_truth_mask is not None,
                   'processing_error': True
               }
           }
           return error_result

   def _panoptic_to_rgb(self, panoptic_mask: np.ndarray, segments_info: List[Dict]) -> np.ndarray:
       """
       Convierte máscara panóptica a imagen RGB para visualización.

       Args:
           panoptic_mask: Máscara panóptica con IDs de segmentos
           segments_info: Información de los segmentos

       Returns:
           Imagen RGB de la segmentación panóptica coloreada por categorías
       """
       h, w = panoptic_mask.shape
       rgb_mask = np.zeros((h, w, 3), dtype=np.uint8)

       # Mapeo de colores para diferentes categorías
       # Colores especiales para categorías importantes
       category_colors = {
           0: [255, 100, 100],   # Persona - Rojo brillante
           1: [100, 255, 100],   # Vehículo - Verde
           2: [100, 100, 255],   # Objeto - Azul
       }

       # Generar colores consistentes para el resto de categorías
       np.random.seed(42)  # Para colores reproducibles
       colors = np.random.randint(50, 255, (200, 3))  # Evitar colores muy oscuros

       for segment in segments_info:
           try:
               segment_id = segment['id']
               category_id = segment.get('category_id', -1)
               mask = panoptic_mask == segment_id

               # Usar color especial si existe, sino color generado
               if category_id in category_colors:
                   rgb_mask[mask] = category_colors[category_id]
               else:
                   color_idx = (category_id + segment_id) % len(colors)
                   rgb_mask[mask] = colors[color_idx]

           except Exception as e:
               print(f"Error coloreando segmento {segment.get('id', 'unknown')}: {e}")
               continue

       return rgb_mask

   def evaluate_dataset(self, image_paths: List[str],
                       ground_truth_masks: Optional[List[np.ndarray]] = None,
                       ground_truth_panoptic: Optional[List[Dict]] = None) -> List[Dict[str, Any]]:
       """
       Evalúa un conjunto de imágenes con análisis completo de instancias y panóptico.

       Args:
           image_paths: Lista de rutas a las imágenes
           ground_truth_masks: Lista opcional de máscaras ground truth para instancias
           ground_truth_panoptic: Lista opcional de datos panópticos ground truth

       Returns:
           Lista con resultados de evaluación para cada imagen
       """
       print(f"Iniciando evaluación de {len(image_paths)} imágenes")
       print(f"Modelo: {self.config.model_name}")
       print(f"Modo panóptico: {'Habilitado' if self.config.enable_panoptic else 'Deshabilitado'}")
       print(f"Dispositivo: {self.device}")
       print(f"Umbral de confianza: {self.config.confidence_threshold}")

       # Validaciones de entrada
       if ground_truth_masks and len(ground_truth_masks) != len(image_paths):
           raise ValueError(f"Número de máscaras GT ({len(ground_truth_masks)}) != número de imágenes ({len(image_paths)})")

       if ground_truth_panoptic and len(ground_truth_panoptic) != len(image_paths):
           raise ValueError(f"Número de datos panópticos GT ({len(ground_truth_panoptic)}) != número de imágenes ({len(image_paths)})")

       # Verificar que las imágenes existen
       missing_images = [path for path in image_paths if not os.path.exists(path)]
       if missing_images:
           print(f"Warning: {len(missing_images)} imágenes no encontradas:")
           for img in missing_images[:5]:  # Mostrar solo las primeras 5
               print(f"  - {img}")
           if len(missing_images) > 5:
               print(f"  ... y {len(missing_images) - 5} más")

       results = []
       successful_count = 0
       failed_count = 0
       total_processing_time = 0.0

       # Limpiar memoria GPU antes de empezar
       if torch.cuda.is_available():
           torch.cuda.empty_cache()
           torch.cuda.reset_peak_memory_stats()

       for i, image_path in enumerate(image_paths):
           print(f"Procesando [{i+1:4d}/{len(image_paths)}]: {os.path.basename(image_path)}")

           gt_mask = ground_truth_masks[i] if ground_truth_masks else None
           gt_panoptic = ground_truth_panoptic[i] if ground_truth_panoptic else None

           try:
               result = self._process_single_image(image_path, gt_mask, gt_panoptic)

               # Verificar si el procesamiento fue exitoso
               if result.get('image_data', {}).get('processing_failed', False):
                   failed_count += 1
                   print(f"Error: {result.get('segmentation_results', {}).get('error_message', 'Unknown error')}")
               else:
                   successful_count += 1
                   processing_time = result.get('segmentation_results', {}).get('processing_time_ms', 0.0)
                   total_processing_time += processing_time
                   masks_detected = result.get('segmentation_results', {}).get('instance_masks_detected', 0)
                   print(f"{masks_detected} personas detectadas ({processing_time:.1f}ms)")

               results.append(result)

           except Exception as e:
               failed_count += 1
               print(f"Excepción no controlada: {e}")

               # Crear resultado de error mínimo
               error_result = {
                   'experiment_metadata': {'model_name': 'mask2former', 'critical_error': str(e)},
                   'image_data': {'filename': os.path.basename(image_path), 'processing_failed': True},
                   'segmentation_results': {'instance_masks_detected': 0, 'error_message': str(e)},
                   'metrics': {'processing_error': True}
               }
               results.append(error_result)

           # Limpiar caché GPU periódicamente y mostrar progreso
           if torch.cuda.is_available() and (i + 1) % 10 == 0:
               torch.cuda.empty_cache()
               gpu_memory = torch.cuda.memory_allocated() / (1024**3)  # GB
               print(f"GPU memoria actual: {gpu_memory:.2f} GB")

           # Mostrar progreso cada 25% del dataset
           progress_milestones = [len(image_paths) * p // 100 for p in [25, 50, 75]]
           if (i + 1) in progress_milestones:
               progress = ((i + 1) / len(image_paths)) * 100
               avg_time = total_processing_time / max(successful_count, 1)
               print(f"Progreso: {progress:.0f}% - Tiempo promedio: {avg_time:.1f}ms - Exitosas: {successful_count}")

       # Resumen final
       self.results.extend(results)
       success_rate = (successful_count / len(image_paths)) * 100 if image_paths else 0
       avg_processing_time = total_processing_time / max(successful_count, 1)

       print(f"\n=== RESUMEN DE EVALUACIÓN ===")
       print(f"Total imágenes: {len(image_paths)}")
       print(f"Procesadas exitosamente: {successful_count} ({success_rate:.1f}%)")
       print(f"Fallos: {failed_count}")
       print(f"Tiempo promedio por imagen: {avg_processing_time:.1f}ms")
       print(f"Tiempo total de procesamiento: {total_processing_time/1000:.1f}s")

       if torch.cuda.is_available():
           peak_gpu_memory = torch.cuda.max_memory_allocated() / (1024**3)
           print(f"Memoria GPU pico: {peak_gpu_memory:.2f} GB")

       return results

   def save_results(self, filename: Optional[str] = None) -> str:
       """
       Guarda los resultados en formato JSON estructurado para análisis posterior.

       Args:
           filename: Nombre del archivo (opcional)

       Returns:
           Ruta del archivo guardado
       """
       if not filename:
           timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
           model_type = self.model_training_type
           filename = f"mask2former_{model_type}_evaluation_{timestamp}.json"

       filepath = os.path.join(self.config.output_dir, filename)

       # Preparar resumen básico de la evaluación
       successful_results = [r for r in self.results if not r.get('image_data', {}).get('processing_failed', False)]
       failed_results = [r for r in self.results if r.get('image_data', {}).get('processing_failed', False)]

       # Preparar datos para serialización JSON
       def json_serializer(obj):
           """Serializa objetos no estándar para JSON."""
           if isinstance(obj, np.ndarray):
               return obj.tolist()
           elif isinstance(obj, np.integer):
               return int(obj)
           elif isinstance(obj, np.floating):
               return float(obj)
           elif isinstance(obj, torch.Tensor):
               return obj.cpu().numpy().tolist()
           elif hasattr(obj, 'isoformat'):  # datetime objects
               return obj.isoformat()
           else:
               return str(obj)

       serializable_results = []
       for result in self.results:
           try:
               serializable_result = json.loads(json.dumps(result, default=json_serializer))
               serializable_results.append(serializable_result)
           except Exception as e:
               print(f"Error serializando resultado: {e}")
               simplified = {
                   'image_data': {'filename': result.get('image_data', {}).get('filename', 'unknown')},
                   'serialization_error': str(e)
               }
               serializable_results.append(simplified)

       # Estructura final del archivo JSON
       output_data = {
           'evaluation_summary': {
               'total_images': len(self.results),
               'successful_images': len(successful_results),
               'failed_images': len(failed_results),
               'success_rate_percent': (len(successful_results) / len(self.results)) * 100 if self.results else 0,
               'model_config': {
                   'model_name': self.config.model_name,
                   'model_training_type': self.model_training_type,
                   'confidence_threshold': self.config.confidence_threshold,
                   'panoptic_enabled': self.config.enable_panoptic,
                   'device': str(self.device)
               },
               'evaluation_date': datetime.now().isoformat(),
               'available_categories': list(PhotoContextClassifier.CONTEXT_CATEGORIES.keys())
           },
           'results': serializable_results
       }

       # Guardar archivo
       try:
           with open(filepath, 'w', encoding='utf-8') as f:
               json.dump(output_data, f, indent=2, ensure_ascii=False)

           print(f"Resultados guardados en: {filepath}")
           print(f"Tamaño del archivo: {os.path.getsize(filepath) / (1024*1024):.2f} MB")
           return filepath

       except Exception as e:
           print(f"Error guardando resultados: {e}")
           raise

In [None]:
# Ejemplo de uso del evaluador completo
if __name__ == "__main__":
   # Configuración de la evaluación
   config = EvaluationConfig(
       model_name="facebook/mask2former-swin-base-coco-instance",
       device="cuda" if torch.cuda.is_available() else "cpu",
       confidence_threshold=0.5,
       output_dir="./mask2former_results",
       save_masks=True,
       enable_panoptic=True  # Habilitar análisis panóptico
   )

   print("=== EVALUADOR MASK2FORMER PARA TFM ===")
   print(f"Modelo: {config.model_name}")
   print(f"Dispositivo: {config.device}")
   print(f"Análisis panóptico: {'✓' if config.enable_panoptic else '✗'}")

   # Inicializar evaluador
   evaluator = Mask2FormerEvaluator(config)

   # Lista de imágenes a evaluar (personalizar según tu dataset)
   image_paths = [
       "path/to/image1.jpg",
       "path/to/image2.jpg",
       "path/to/image3.jpg",
       # ... agregar más imágenes
   ]

   # PASO 1: Crear template para ground truth (opcional)
   print("\n--- Paso 1: Generando template de anotación ---")
   gt_template_path = create_ground_truth_template(image_paths, config.output_dir)
   print(f"Completar anotaciones en: {gt_template_path}")

   # PASO 2: Evaluar dataset sin ground truth (recolección inicial de datos)
   print("\n--- Paso 2: Evaluando dataset (sin ground truth) ---")
   results = evaluator.evaluate_dataset(image_paths)

   # PASO 3: Guardar resultados estructurados
   print("\n--- Paso 3: Guardando resultados ---")
   results_file = evaluator.save_results()

   print(f"\n=== EVALUACIÓN COMPLETADA ===")
   print(f"Resultados guardados en: {results_file}")
   print(f"Imágenes procesadas: {len([r for r in results if not r.get('image_data', {}).get('processing_failed', False)])}")
   print(f"Template GT disponible en: {gt_template_path}")