# 🔬 YOLOv11 Needle Defect Analysis & Training

Ce notebook permet d'analyser le dataset restructuré et d'entraîner un modèle YOLOv11 pour la détection de défauts sur des aiguilles.

## Configuration des images
- **Dimensions** : 1024×416 pixels (largeur × hauteur)
- **Format** : YOLO segmentation
- **Classes** : Défauts détectés sur les aiguilles

## Pipeline
1. **Analyse du dataset** - Visualisation et statistiques
2. **Validation des dimensions** - Vérification des images
3. **Entraînement du modèle** - YOLOv11 segmentation
4. **Validation et test** - Évaluation des performances


In [None]:
# Import des bibliothèques nécessaires
import os
import json
import yaml
import cv2
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from collections import Counter, defaultdict
import random
from PIL import Image, ImageDraw
import pandas as pd
from ultralytics import YOLO
import torch

# Plot appearance: matplotlib's default style combined with seaborn's "husl" palette
plt.style.use('default')
sns.set_palette("husl")

# Report the runtime environment so it is visible in the notebook output
print("✅ Bibliothèques importées avec succès")
print(f"   - PyTorch version: {torch.__version__}")
print(f"   - CUDA disponible: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    # The device name is only queried when CUDA is reported as available
    print(f"   - GPU: {torch.cuda.get_device_name()}")


In [None]:
# Project configuration
DATASET_PATH = "Restructured_Dataset"  # Path to the restructured dataset
MODEL_NAME = "yolo11n-seg.pt"          # YOLOv11 segmentation weights (nano variant, chosen for speed)
EPOCHS = 100                           # Number of training epochs
IMAGE_SIZE = (416, 1024)              # Image dimensions as (height, width) - DO NOT CHANGE
BATCH_SIZE = 8                        # Batch size (adjust to fit your GPU)

# Seed the Python, NumPy and PyTorch random generators for reproducibility
# NOTE(review): these calls seed the notebook process only; confirm whether the
# training call also needs the seed passed explicitly.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

# Echo the configuration; IMAGE_SIZE is stored as (height, width) but shown as width × height
print("🔧 Configuration du projet:")
print(f"   - Dataset: {DATASET_PATH}")
print(f"   - Modèle: {MODEL_NAME}")
print(f"   - Époques: {EPOCHS}")
print(f"   - Dimensions: {IMAGE_SIZE[1]}×{IMAGE_SIZE[0]} (largeur × hauteur)")
print(f"   - Batch size: {BATCH_SIZE}")
print(f"   - Graine aléatoire: {RANDOM_SEED}")


In [None]:
# Fonctions de chargement des métadonnées
def load_metadata(dataset_path):
    """Load the dataset configuration and the three metadata JSON files.

    Reads ``yolo_format/data.yaml`` and, from the ``metadata`` folder,
    ``split_assignment.json``, ``needle_summary.json`` and
    ``defect_distribution.json``. All files are decoded as UTF-8 so the
    result does not depend on the platform's default encoding.

    Args:
        dataset_path: Root folder of the restructured dataset (str or Path).

    Returns:
        Tuple ``(data_config, split_assignment, needle_summary,
        defect_distribution)`` holding the parsed content of the four files.

    Raises:
        FileNotFoundError: If one of the four files does not exist.
        KeyError: If ``data.yaml`` has no ``nc`` entry or the split
            assignment lacks a ``train``, ``val`` or ``test`` key (both are
            read for the summary printed at the end).
    """
    print("📋 Loading metadata...")

    dataset_path = Path(dataset_path)
    yolo_path = dataset_path / "yolo_format"
    metadata_path = dataset_path / "metadata"

    def _read_json(file_name):
        # One place for the open/parse logic shared by the three JSON files.
        with open(metadata_path / file_name, 'r', encoding='utf-8') as f:
            return json.load(f)

    # Dataset description consumed by YOLO (class count, names, paths)
    with open(yolo_path / "data.yaml", 'r', encoding='utf-8') as f:
        data_config = yaml.safe_load(f)

    split_assignment = _read_json("split_assignment.json")
    needle_summary = _read_json("needle_summary.json")
    defect_distribution = _read_json("defect_distribution.json")

    print("✅ Metadata loaded successfully")
    print(f"   - Classes: {data_config['nc']}")
    print(f"   - Train/Val/Test: {len(split_assignment['train'])}/{len(split_assignment['val'])}/{len(split_assignment['test'])} needles")

    return data_config, split_assignment, needle_summary, defect_distribution

print("✅ Fonctions de chargement des métadonnées définies")


In [None]:
# Step 1: Load the metadata
# The four results are kept as notebook globals and reused by the analysis cells below.
print("📋 Chargement des métadonnées...")
data_config, split_assignment, needle_summary, defect_distribution = load_metadata(DATASET_PATH)


In [None]:
# Fonctions de validation des dimensions
def validate_image_dimensions(dataset_path, expected_size=(416, 1024)):
    """Check that every image in the train/val/test splits has the expected size.

    Files under ``yolo_format/<split>/images`` are decoded with OpenCV and
    their ``(height, width)`` is compared with *expected_size*. Files that
    OpenCV cannot decode are reported separately instead of being silently
    treated as valid.

    Args:
        dataset_path: Root folder of the restructured dataset (str or Path).
        expected_size: Expected ``(height, width)``; any two-item sequence
            (tuple or list) is accepted.

    Returns:
        True when no decoded image has a mismatching size, False otherwise.
        Undecodable files and an empty dataset produce a warning but do not
        change the return value.
    """
    # Normalise to a tuple: a list would never compare equal to the (h, w) tuple
    # built below, which would flag every single image.
    expected_hw = tuple(expected_size)
    print(f"\n🔍 Validating image dimensions (expected: {expected_hw[1]}x{expected_hw[0]} - width x height)...")

    yolo_path = Path(dataset_path) / "yolo_format"

    incorrect_images = []
    unreadable_images = []
    total_images = 0

    for split in ['train', 'val', 'test']:
        split_path = yolo_path / split / "images"
        if not split_path.exists():
            continue
        # Sorted so the "first 5" report below is stable between runs.
        for img_path in sorted(split_path.glob("*")):
            if not img_path.is_file():
                continue
            try:
                img = cv2.imread(str(img_path))
            except Exception as e:
                print(f"⚠️  Error reading {img_path}: {e}")
                unreadable_images.append(str(img_path))
                continue
            if img is None:
                # cv2.imread signals a decoding failure by returning None
                unreadable_images.append(str(img_path))
                continue

            total_images += 1
            h, w = img.shape[:2]
            if (h, w) != expected_hw:
                incorrect_images.append({
                    'path': str(img_path),
                    'actual': (w, h),
                    'expected': (expected_hw[1], expected_hw[0])  # (width, height) for display
                })

    if incorrect_images:
        print(f"❌ Found {len(incorrect_images)} images with incorrect dimensions:")
        for img_info in incorrect_images[:5]:  # Show first 5
            print(f"   {img_info['path']}: {img_info['actual']} (expected: {img_info['expected']})")
        if len(incorrect_images) > 5:
            print(f"   ... and {len(incorrect_images) - 5} more")
    elif total_images == 0:
        print(f"⚠️  No readable images found under {yolo_path}")
    else:
        print(f"✅ All {total_images} images have correct dimensions!")

    if unreadable_images:
        print(f"⚠️  {len(unreadable_images)} file(s) could not be decoded as images and were not checked:")
        for path in unreadable_images[:5]:
            print(f"   {path}")
        if len(unreadable_images) > 5:
            print(f"   ... and {len(unreadable_images) - 5} more")

    return len(incorrect_images) == 0

print("✅ Fonction de validation des dimensions définie")


In [None]:
# Step 2: Validate the image dimensions
# The call is the last expression of the cell, so its boolean result is shown as the cell output.
print("🔍 Validation des dimensions...")
validate_image_dimensions(DATASET_PATH, IMAGE_SIZE)


In [None]:
# Fonctions d'analyse et de visualisation
def analyze_dataset_distribution(split_assignment, needle_summary):
    """Plot and print how needles, images and defects are spread over the splits.

    Draws a 2x2 figure (needles per split, images per split, main defect per
    split, visible defects per needle) and prints summary totals. Panels with
    no data are left empty instead of raising.

    Args:
        split_assignment: Mapping ``split name -> list of needle ids``.
        needle_summary: Mapping ``needle id -> dict`` providing the keys
            ``image_count``, ``defect_types`` and ``visible_defects_count``.

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    print("\n📊 Analyzing dataset distribution...")

    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('Dataset Distribution Analysis', fontsize=16, fontweight='bold')

    # 1. Split distribution (needles)
    split_counts = {split: len(needles) for split, needles in split_assignment.items()}
    total_needles = sum(split_counts.values())
    if total_needles > 0:  # a pie chart cannot be normalised when every wedge is 0
        axes[0, 0].pie(list(split_counts.values()), labels=list(split_counts.keys()),
                       autopct='%1.1f%%', startangle=90)
    axes[0, 0].set_title('Dataset Split (Needles)')

    # 2. Images per split (needles absent from the summary contribute nothing)
    split_image_counts = {
        split: sum(needle_summary[needle]['image_count'] for needle in needles if needle in needle_summary)
        for split, needles in split_assignment.items()
    }
    total_images = sum(split_image_counts.values())

    axes[0, 1].bar(list(split_image_counts.keys()), list(split_image_counts.values()),
                   color=['#FF9999', '#66B2FF', '#99FF99'])
    axes[0, 1].set_title('Total Images per Split')
    axes[0, 1].set_ylabel('Number of Images')

    # 3. Defect distribution across splits
    defect_split_data = defaultdict(lambda: {'train': 0, 'val': 0, 'test': 0})

    for split, needles in split_assignment.items():
        for needle in needles:
            if needle in needle_summary:
                defect_types = needle_summary[needle]['defect_types']
                if defect_types:
                    main_defect = defect_types[0]  # Use first defect as main
                    defect_split_data[main_defect][split] += 1

    # Convert to DataFrame for easier plotting
    defect_df = pd.DataFrame(defect_split_data).T.fillna(0)
    if not defect_df.empty:  # pandas refuses to plot an empty frame
        defect_df.plot(kind='bar', ax=axes[1, 0], stacked=True)
        axes[1, 0].legend(title='Split')
    axes[1, 0].set_title('Defect Distribution Across Splits')
    axes[1, 0].set_xlabel('Defect Class ID')
    axes[1, 0].set_ylabel('Number of Needles')
    axes[1, 0].tick_params(axis='x', rotation=45)

    # 4. Visible defects per needle distribution
    visible_counts = [summary['visible_defects_count'] for summary in needle_summary.values()]
    if visible_counts:
        # One bin per integer value, centred on it. Using `max(counts)` equal-width
        # bins would merge the two largest values into the last bar.
        # Assumes the counts are integers - TODO confirm against the metadata writer.
        bin_edges = np.arange(int(min(visible_counts)), int(max(visible_counts)) + 2) - 0.5
        axes[1, 1].hist(visible_counts, bins=bin_edges, alpha=0.7, edgecolor='black')
    axes[1, 1].set_title('Distribution of Visible Defects per Needle')
    axes[1, 1].set_xlabel('Number of Visible Defects')
    axes[1, 1].set_ylabel('Number of Needles')

    plt.tight_layout()
    plt.show()

    # Print summary statistics
    average_images = total_images / total_needles if total_needles else 0.0
    print("\n📈 Summary Statistics:")
    print(f"   Total needles: {total_needles}")
    print(f"   Total images: {total_images}")
    print(f"   Images with defects: {sum(visible_counts)}")
    print(f"   Average images per needle: {average_images:.1f}")

print("✅ Fonction d'analyse de distribution définie")


In [None]:
# Step 3: Analyse the dataset distribution
# Uses the split assignment and needle summary loaded in step 1.
print("📊 Analyse de la distribution...")
analyze_dataset_distribution(split_assignment, needle_summary)


In [None]:
# Fonctions de visualisation des échantillons
def parse_yolo_label(label_path):
    """Parse a YOLO segmentation label file into polygon annotations.

    Every non-empty line is expected to look like
    ``class_id x1 y1 x2 y2 x3 y3 ...`` with normalized coordinates. A line
    with fewer than three points, an odd number of coordinates or a
    non-numeric token is skipped with a warning; the other lines of the file
    are still parsed.

    Args:
        label_path: Path of the ``.txt`` label file (str or Path).

    Returns:
        List of ``{'class_id': int, 'polygon': [(x, y), ...]}`` dicts, in
        file order. Empty when the file is missing (an image without any
        object needs no label file), unreadable or has no valid line.
    """
    annotations = []
    try:
        with open(label_path, 'r', encoding='utf-8') as f:
            lines = f.read().splitlines()
    except FileNotFoundError:
        # Background image: no label file simply means no annotation.
        return annotations
    except (OSError, UnicodeDecodeError) as e:
        print(f"⚠️  Error parsing {label_path}: {e}")
        return annotations

    for line_no, line in enumerate(lines, start=1):
        parts = line.split()
        if not parts:
            continue

        coords = parts[1:]
        # A polygon needs at least 3 points, i.e. 6 values, and complete (x, y) pairs.
        if len(coords) < 6 or len(coords) % 2 != 0:
            print(f"⚠️  Skipping malformed line {line_no} in {label_path}: "
                  f"expected a class id followed by at least 3 (x, y) pairs")
            continue

        try:
            class_id = int(parts[0])
            points = [float(value) for value in coords]
        except ValueError as e:
            print(f"⚠️  Skipping malformed line {line_no} in {label_path}: {e}")
            continue

        # Pair even-indexed values (x) with odd-indexed values (y)
        polygon = list(zip(points[0::2], points[1::2]))
        annotations.append({'class_id': class_id, 'polygon': polygon})

    return annotations

def visualize_samples(dataset_path, data_config, num_samples=6, split='train'):
    """Show randomly chosen images of one split with their label polygons drawn.

    Images are taken from ``yolo_format/<split>/images`` and labels from
    ``yolo_format/<split>/labels/<image stem>.txt``. Each polygon is filled
    semi-transparently, outlined and tagged with its class name.

    Args:
        dataset_path: Root folder of the restructured dataset (str or Path).
        data_config: Parsed ``data.yaml``; only its ``names`` entry is read
            (indexed by class id, so a list or an id-keyed dict both work).
        num_samples: Maximum number of images to display.
        split: Name of the split folder to sample from.

    Returns:
        None. The figure is shown with ``plt.show()``; nothing is drawn when
        the split contains no image.
    """
    print(f"\n🖼️  Visualizing {num_samples} samples from {split} set...")

    split_path = Path(dataset_path) / "yolo_format" / split
    # Sorted so that a seeded `random` picks the same files on every machine
    # (glob order is filesystem dependent); directories are ignored.
    image_files = sorted(p for p in (split_path / "images").glob("*") if p.is_file())

    # Select random samples
    if len(image_files) < num_samples:
        num_samples = len(image_files)
        print(f"⚠️  Only {num_samples} images available in {split} set")
    if num_samples <= 0:
        # A grid with zero rows cannot be created, so there is nothing to show.
        return

    selected_files = random.sample(image_files, num_samples)

    # Create subplot grid; squeeze=False always yields a 2-D array of axes
    cols = 3
    rows = (num_samples + cols - 1) // cols
    fig, axes = plt.subplots(rows, cols, figsize=(15, 5 * rows), squeeze=False)

    class_names = data_config['names']
    # At least one colour so the modulo below never divides by zero
    palette = plt.cm.tab10(np.linspace(0, 1, max(1, len(class_names))))

    for idx, img_path in enumerate(selected_files):
        ax = axes[idx // cols, idx % cols]

        # Load image (OpenCV returns None instead of raising on failure)
        image = cv2.imread(str(img_path))
        if image is None:
            print(f"⚠️  Could not read {img_path}")
            ax.set_title(f"{img_path.name}\n(unreadable)", fontsize=10)
            ax.axis('off')
            continue
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        h, w = image.shape[:2]

        # Load corresponding label
        label_path = split_path / "labels" / f"{img_path.stem}.txt"
        annotations = parse_yolo_label(label_path)

        # Draw annotations
        img_with_annotations = image.copy()
        for ann in annotations:
            class_id = ann['class_id']

            # Convert normalized coordinates to pixel coordinates
            pixel_polygon = [(int(x * w), int(y * h)) for x, y in ann['polygon']]
            if len(pixel_polygon) < 3:
                continue

            pts = np.array(pixel_polygon, np.int32).reshape((-1, 1, 2))
            # Plain Python floats in RGB order (the image was converted to RGB
            # above); the colormap's alpha channel is not needed.
            rgba = palette[class_id % len(palette)]
            color = tuple(float(channel) * 255 for channel in rgba[:3])

            # Fill polygon with transparency
            overlay = img_with_annotations.copy()
            cv2.fillPoly(overlay, [pts], color)
            img_with_annotations = cv2.addWeighted(img_with_annotations, 0.7, overlay, 0.3, 0)

            # Draw polygon outline
            cv2.polylines(img_with_annotations, [pts], True, color, 2)

            # Add class label at the mean of the polygon vertices
            center_x = int(np.mean([p[0] for p in pixel_polygon]))
            center_y = int(np.mean([p[1] for p in pixel_polygon]))
            try:
                label_text = str(class_names[class_id])
            except (KeyError, IndexError):
                label_text = f"Class_{class_id}"
            cv2.putText(img_with_annotations, label_text, (center_x - 30, center_y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

        # Display
        ax.imshow(img_with_annotations)
        ax.set_title(f"{img_path.name}\n{len(annotations)} defects", fontsize=10)
        ax.axis('off')

    # Hide empty subplots
    for idx in range(num_samples, rows * cols):
        axes[idx // cols, idx % cols].axis('off')

    plt.tight_layout()
    plt.show()

print("✅ Fonctions de visualisation des échantillons définies")


In [None]:
# Step 4: Visualise samples from the training split
# Shows up to 6 randomly chosen training images with their annotations.
print("🖼️ Visualisation des échantillons d'entraînement...")
visualize_samples(DATASET_PATH, data_config, num_samples=6, split='train')


In [None]:
# Step 5: Visualise samples from the validation split
# Shows up to 4 randomly chosen validation images with their annotations.
print("🖼️ Visualisation des échantillons de validation...")
visualize_samples(DATASET_PATH, data_config, num_samples=4, split='val')


In [None]:
# Fonctions d'entraînement YOLO
def setup_model(model_name):
    """Build a YOLO model from *model_name* and report progress on stdout.

    Args:
        model_name: Value handed unchanged to the ``YOLO`` constructor.

    Returns:
        The newly created ``YOLO`` instance.
    """
    print(f"🤖 Setting up YOLO model: {model_name}")
    yolo_model = YOLO(model_name)
    print(f"✅ Model loaded successfully")
    return yolo_model

def train_model(data_yaml, model, epochs, imgsz, batch_size, patience=50, **kwargs):
    """Train a YOLO model with this notebook's default training settings.

    Args:
        data_yaml: Path to the dataset ``data.yaml`` file.
        model: Object exposing ``train(**settings)`` (the Ultralytics ``YOLO``
            instance in this notebook).
        epochs: Number of training epochs.
        imgsz: Training image size. Either an integer or a sequence such as
            ``(height, width)``. Ultralytics training accepts a single
            integer only, so a sequence is reduced to its largest value;
            this is the value Ultralytics would fall back to itself, after
            emitting a warning.
        batch_size: Batch size, forwarded as ``batch``.
        patience: Forwarded as the ``patience`` training setting.
        **kwargs: Extra training settings; they override the defaults
            assembled here when the same key is given.

    Returns:
        Whatever ``model.train(...)`` returns.
    """
    print(f"🚀 Starting training...")
    print(f"   - Epochs: {epochs}")

    if isinstance(imgsz, (list, tuple)):
        # Resolve the size here so the log shows what is actually trained on.
        train_imgsz = int(max(imgsz))
        print(f"   - Image size: {train_imgsz} (longest side of {tuple(imgsz)})")
    else:
        train_imgsz = imgsz
        print(f"   - Image size: {train_imgsz}")

    print(f"   - Batch size: {batch_size}")
    print(f"   - Patience: {patience}")

    # Training parameters; caller-supplied kwargs come last so they win
    train_args = {
        'data': data_yaml,
        'epochs': epochs,
        'imgsz': train_imgsz,
        'batch': batch_size,
        'patience': patience,
        'save': True,
        'save_period': 10,
        'cache': True,
        'device': 'cuda' if torch.cuda.is_available() else 'cpu',
        'workers': 4,
        'project': 'runs/train',
        'name': 'yolo11_needle_defect',
        **kwargs
    }

    # Start training
    results = model.train(**train_args)

    print(f"✅ Training completed!")
    return results

def validate_model(data_yaml, model):
    """Run validation and print the headline mAP figures.

    Box metrics are always printed. When the result also carries
    segmentation metrics (a ``seg`` attribute, as produced for segmentation
    models), the mask mAP values are printed as well.

    Args:
        data_yaml: Path to the dataset ``data.yaml`` file.
        model: Object exposing ``val(data=...)`` (the trained ``YOLO`` model).

    Returns:
        The object returned by ``model.val(...)``.
    """
    print(f"🔍 Validating model...")

    # Run validation
    val_results = model.val(data=data_yaml)

    print(f"✅ Validation completed!")
    print(f"   - mAP50: {val_results.box.map50:.3f}")
    print(f"   - mAP50-95: {val_results.box.map:.3f}")

    # This notebook trains a segmentation model, so the mask quality matters
    # at least as much as the box quality; detection-only results have no `seg`.
    seg_metrics = getattr(val_results, 'seg', None)
    if seg_metrics is not None:
        print(f"   - Mask mAP50: {seg_metrics.map50:.3f}")
        print(f"   - Mask mAP50-95: {seg_metrics.map:.3f}")

    return val_results

print("✅ Fonctions d'entraînement YOLO définies")


In [None]:
# Step 6: Initialise the YOLO model
# `model` is kept as a notebook global and reused by the training, validation and inference cells.
print("🤖 Initialisation du modèle...")
model = setup_model(MODEL_NAME)


In [None]:
# Step 7: Train the model
print("🚀 Démarrage de l'entraînement...")
data_yaml_path = str(Path(DATASET_PATH) / "yolo_format" / "data.yaml")
# Ultralytics seeds its own run from the `seed` training argument, so the
# notebook-level RANDOM_SEED only takes effect for training when passed here.
results = train_model(data_yaml_path, model, EPOCHS, IMAGE_SIZE, BATCH_SIZE, seed=RANDOM_SEED)


In [None]:
# Step 8: Validate the trained model
# Reuses `data_yaml_path` defined in the training cell.
print("🔍 Validation du modèle...")
val_results = validate_model(data_yaml_path, model)


In [None]:
# Fonctions de test et d'inférence
def test_inference(data_yaml, model, test_images, conf=0.25, save_results=True):
    """Run the model on a few images and print the number of detections.

    Args:
        data_yaml: Unused; kept so existing calls keep working.
        model: Object exposing ``predict(source, conf=..., save=...)``.
        test_images: Iterable of image paths (str or Path).
        conf: Confidence threshold forwarded to ``predict``.
        save_results: Forwarded to ``predict`` as ``save``.

    Returns:
        List with the value returned by ``predict`` for each image, in input
        order.
    """
    print(f"🔍 Testing inference on {len(test_images)} images...")

    results = []
    for img_path in test_images:
        # Run inference
        result = model.predict(img_path, conf=conf, save=save_results)
        results.append(result)

        # Path() lets callers pass plain strings as well as Path objects
        image_name = Path(img_path).name

        # Print results (guard against an empty prediction list)
        if result and result[0].boxes is not None:
            num_detections = len(result[0].boxes)
            print(f"   {image_name}: {num_detections} defects detected")
        else:
            print(f"   {image_name}: No defects detected")

    print(f"✅ Inference testing completed!")
    return results

print("✅ Fonctions de test et d'inférence définies")


In [None]:
# Step 9: Run inference on a few test images
print("🔍 Test d'inférence sur des images de test...")
test_images_path = Path(DATASET_PATH) / "yolo_format" / "test" / "images"
# glob() yields entries in a filesystem-dependent order; sorting makes the
# "first five" selection identical on every run, and directories are skipped.
test_images = sorted(p for p in test_images_path.glob("*") if p.is_file())[:5]

if test_images:
    inference_results = test_inference(data_yaml_path, model, test_images)
else:
    print("⚠️  Aucune image de test trouvée!")
