# Balloon Detection Training with Roboflow Dataset

This notebook trains a YOLO model on the Roboflow balloon detection dataset with a 90/5/5 train/test/validation split.

## Project Context
This is part of an air defense demo system that detects and tracks balloon targets with friend/foe discrimination by color.

## Dataset Split Strategy
- **Training**: 90%
- **Test**: 5% 
- **Validation**: 5%


## 1. Setup and Installation


In [None]:
# Install required packages
!pip install roboflow ultralytics matplotlib seaborn pandas numpy opencv-python pillow

# Import libraries
import os
import json
import shutil
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from sklearn.model_selection import train_test_split
import cv2
from PIL import Image
import torch

# YOLO and Roboflow
from ultralytics import YOLO
from roboflow import Roboflow

# Seed the stdlib and NumPy generators so the shuffle-based split is repeatable
for seed_fn in (random.seed, np.random.seed):
    seed_fn(42)

# Collect the PyTorch / CUDA report first, then print it in one go
cuda_ok = torch.cuda.is_available()
info_lines = [
    f"  • PyTorch version: {torch.__version__}",
    f"  • CUDA available: {cuda_ok}",
]
if cuda_ok:
    gpu_props = torch.cuda.get_device_properties(0)
    info_lines += [
        f"  • CUDA version: {torch.version.cuda}",
        f"  • GPU count: {torch.cuda.device_count()}",
        f"  • Current GPU: {torch.cuda.get_device_name(0)}",
        f"  • GPU memory: {gpu_props.total_memory / 1e9:.1f} GB",
    ]
else:
    info_lines.append("  • Using CPU (no CUDA available)")

print("🔍 System Information:")
print("\n".join(info_lines))

print("\n✅ All packages imported successfully!")


## 2. Download Roboflow Dataset


In [None]:
# Initialize Roboflow and download dataset.
# The API key is a credential, so it is read from the environment instead of
# being committed with the notebook. Any key that was previously committed
# should be rotated in the Roboflow dashboard.
roboflow_api_key = os.environ.get("ROBOFLOW_API_KEY")
if not roboflow_api_key:
    raise RuntimeError(
        "Set the ROBOFLOW_API_KEY environment variable before running this cell."
    )

rf = Roboflow(api_key=roboflow_api_key)
project = rf.workspace("hss-1e8zb").project("balloon-detection-hyqqa")
version = project.version(2)
dataset = version.download("yolov11")

print(f"✅ Dataset downloaded to: {dataset.location}")
print(f"Dataset info: {dataset}")


## 3. Analyze Original Dataset Structure


In [None]:
# Print an indented tree of the downloaded dataset (at most 5 files per folder)
dataset_path = Path(dataset.location)
print(f"Dataset location: {dataset_path}")
print("\nDataset structure:")

root_prefix = str(dataset_path)
for current_dir, _subdirs, filenames in os.walk(dataset_path):
    # Depth is the number of path separators below the dataset root
    depth = current_dir.replace(root_prefix, '').count(os.sep)
    dir_pad = '  ' * depth
    file_pad = dir_pad + '  '

    print(f"{dir_pad}{os.path.basename(current_dir)}/")
    for filename in filenames[:5]:
        print(f"{file_pad}{filename}")

    hidden_count = len(filenames) - 5
    if hidden_count > 0:
        print(f"{file_pad}... and {hidden_count} more files")


In [None]:
# Show the data.yaml that shipped with the download, if there is one
data_yaml_path = dataset_path / "data.yaml"
if not data_yaml_path.exists():
    print("No data.yaml found in dataset")
else:
    original_config = data_yaml_path.read_text()
    print("Original data.yaml:")
    print(original_config)


## 4. Create Custom 90/5/5 Split


In [None]:
# Create custom split directories
custom_dataset_path = Path("data/roboflow_custom")
custom_dataset_path.mkdir(parents=True, exist_ok=True)

# Create split directories.
# Each split folder is rebuilt from scratch: files left behind by an earlier
# run (for example after a different shuffle) would otherwise stay in place
# and could end up in more than one split, leaking data between
# train/val/test.
splits = ['train', 'test', 'val']
for split in splits:
    split_dir = custom_dataset_path / split
    if split_dir.exists():
        shutil.rmtree(split_dir)
    (split_dir / 'images').mkdir(parents=True)
    (split_dir / 'labels').mkdir(parents=True)

print("✅ Created custom dataset directory structure")


In [None]:
# Find all images and labels from the original dataset
def find_dataset_files(dataset_path):
    """Find all image and label files in a YOLO-format dataset.

    Looks in the ``images`` and ``labels`` sub-folders of every split folder
    that exists under ``dataset_path``.

    Args:
        dataset_path: Dataset root (``Path`` or ``str``).

    Returns:
        Tuple ``(images, labels)`` of ``Path`` lists. Each split's files are
        sorted by path so that a later seeded shuffle gives the same result
        regardless of filesystem listing order.
    """
    # 'valid' is the validation folder name used by Roboflow exports;
    # 'val' is kept for datasets that follow the other convention.
    split_names = ('train', 'val', 'valid', 'test')
    # Compared against the lower-cased suffix, so '.JPG' also matches.
    image_suffixes = {'.jpg', '.jpeg', '.png', '.bmp'}

    images = []
    labels = []
    root = Path(dataset_path)

    for split in split_names:
        img_path = root / split / 'images'
        lbl_path = root / split / 'labels'

        if img_path.is_dir():
            images.extend(sorted(
                candidate for candidate in img_path.iterdir()
                if candidate.is_file() and candidate.suffix.lower() in image_suffixes
            ))
        if lbl_path.is_dir():
            labels.extend(sorted(lbl_path.glob('*.txt')))

    return images, labels

all_images, all_labels = find_dataset_files(dataset_path)
print(f"Found {len(all_images)} images and {len(all_labels)} labels")

# Pair every image with the same-named .txt file in the sibling 'labels' folder;
# images without a label file are reported and left out.
image_label_pairs = []
for image_file in all_images:
    label_candidate = image_file.parent.parent / 'labels' / f"{image_file.stem}.txt"
    if not label_candidate.exists():
        print(f"Warning: No label found for {image_file.name}")
        continue
    image_label_pairs.append((image_file, label_candidate))

print(f"Created {len(image_label_pairs)} image-label pairs")


In [None]:
# Split the data: 90% train, 5% test, 5% val
if not image_label_pairs:
    # Fail early with a clear message instead of a ZeroDivisionError in the
    # percentage prints below.
    raise ValueError("No image-label pairs found; cannot create a dataset split.")

# Shuffle a sorted copy with a dedicated seeded generator. Shuffling the shared
# list in place with the global RNG gives a different split every time this
# cell is re-run; this form always yields the same split for the same files.
shuffled_pairs = sorted(image_label_pairs)
random.Random(42).shuffle(shuffled_pairs)

total_samples = len(shuffled_pairs)
train_size = int(0.90 * total_samples)
test_size = int(0.05 * total_samples)
# Validation takes the remainder, so rounding never drops a sample
val_size = total_samples - train_size - test_size

train_pairs = shuffled_pairs[:train_size]
test_pairs = shuffled_pairs[train_size:train_size + test_size]
val_pairs = shuffled_pairs[train_size + test_size:]

print(f"Dataset split:")
print(f"  Training: {len(train_pairs)} samples ({len(train_pairs)/total_samples*100:.1f}%)")
print(f"  Test: {len(test_pairs)} samples ({len(test_pairs)/total_samples*100:.1f}%)")
print(f"  Validation: {len(val_pairs)} samples ({len(val_pairs)/total_samples*100:.1f}%)")
print(f"  Total: {total_samples} samples")


In [None]:
# Copy files to custom split directories
def copy_files(pairs, split_name):
    """Copy each (image, label) pair into ``custom_dataset_path/<split_name>``.

    Images go to the ``images`` sub-folder and labels to ``labels``; file
    names are kept and ``shutil.copy2`` is used for the copy.
    """
    images_dir = custom_dataset_path / split_name / 'images'
    labels_dir = custom_dataset_path / split_name / 'labels'

    for image_src, label_src in pairs:
        shutil.copy2(image_src, images_dir / image_src.name)
        shutil.copy2(label_src, labels_dir / label_src.name)

# Populate the train, test and val folders in that order
for pairs_to_copy, target_split in (
    (train_pairs, 'train'),
    (test_pairs, 'test'),
    (val_pairs, 'val'),
):
    copy_files(pairs_to_copy, target_split)

print("✅ Files copied to custom split directories")


## 5. Create Custom data.yaml Configuration


In [None]:
# Assemble the data.yaml for the 90/5/5 split line by line
dataset_root = custom_dataset_path.absolute()
yaml_lines = [
    "# Custom Roboflow Balloon Detection Dataset - 90/5/5 Split",
    "# Generated for air defense demo system",
    "",
    "# Dataset paths (relative to this file)",
    f"path: {dataset_root}",
    "train: train/images",
    "val: val/images",
    "test: test/images",
    "",
    "# Classes",
    "nc: 1  # number of classes",
    "names: ['balloon']  # class names",
    "",
    "# Dataset info",
    f"total_samples: {total_samples}",
    f"train_samples: {len(train_pairs)}",
    f"val_samples: {len(val_pairs)}",
    f"test_samples: {len(test_pairs)}",
    'split_ratio: "90/5/5"',
]
data_yaml_content = "\n".join(yaml_lines) + "\n"

# Persist it next to the split folders
custom_data_yaml = custom_dataset_path / "data.yaml"
custom_data_yaml.write_text(data_yaml_content)

print("✅ Created custom data.yaml")
print("\nCustom data.yaml content:")
print(data_yaml_content)


## 6. Dataset Analysis and Visualization


In [None]:
# Analyze dataset statistics
def analyze_dataset(split_pairs, split_name):
    """Compute summary statistics for one dataset split.

    Args:
        split_pairs: Sequence of ``(image_path, label_path)`` tuples.
        split_name: Name stored under the ``'split'`` key of the result.

    Returns:
        Dict with the split name, the sample count, the number of non-blank
        label lines (one per annotated object), the average number of objects
        per image and the average image width/height in pixels. All averages
        are 0 for an empty split.
    """
    total_objects = 0
    widths = []
    heights = []

    for img_path, lbl_path in split_pairs:
        # Every non-blank line in a YOLO label file describes one object
        with open(lbl_path, 'r') as f:
            total_objects += sum(1 for line in f if line.strip())

        # Context manager closes the file handle right away; a bare
        # Image.open() keeps it open until garbage collection.
        with Image.open(img_path) as img:
            width, height = img.size
        widths.append(width)
        heights.append(height)

    sample_count = len(split_pairs)

    return {
        'split': split_name,
        'samples': sample_count,
        'total_objects': total_objects,
        'avg_objects_per_image': total_objects / sample_count if sample_count else 0,
        # np.mean([]) would return NaN with a RuntimeWarning; report 0 instead,
        # matching the guard on avg_objects_per_image.
        'avg_width': float(np.mean(widths)) if widths else 0.0,
        'avg_height': float(np.mean(heights)) if heights else 0.0,
    }

# Run the analysis for train, val and test (in that order)
train_stats, val_stats, test_stats = (
    analyze_dataset(pairs, label)
    for pairs, label in (
        (train_pairs, 'train'),
        (val_pairs, 'val'),
        (test_pairs, 'test'),
    )
)

# One row per split
stats_df = pd.DataFrame([train_stats, val_stats, test_stats])
print("Dataset Statistics:")
print(stats_df.to_string(index=False))


In [None]:
# Visualize dataset distribution in a 2x2 grid of plots
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Sample distribution pie chart (wedge labels are the nominal split ratios;
# the autopct text shows the actual shares)
axes[0, 0].pie([len(train_pairs), len(val_pairs), len(test_pairs)],
               labels=['Train (90%)', 'Val (5%)', 'Test (5%)'],
               autopct='%1.1f%%', startangle=90)
axes[0, 0].set_title('Dataset Split Distribution')

# Object count per split
splits = ['Train', 'Val', 'Test']
object_counts = [train_stats['total_objects'], val_stats['total_objects'], test_stats['total_objects']]
axes[0, 1].bar(splits, object_counts, color=['blue', 'orange', 'green'])
axes[0, 1].set_title('Total Objects per Split')
axes[0, 1].set_ylabel('Number of Objects')

# Average objects per image
avg_objects = [train_stats['avg_objects_per_image'], val_stats['avg_objects_per_image'], test_stats['avg_objects_per_image']]
axes[1, 0].bar(splits, avg_objects, color=['blue', 'orange', 'green'])
axes[1, 0].set_title('Average Objects per Image')
axes[1, 0].set_ylabel('Average Objects')

# Image size distribution
all_sizes = []
for split_pairs in [train_pairs, val_pairs, test_pairs]:
    for img_path, _ in split_pairs:
        # Close each image immediately; leaving every file open until garbage
        # collection can exhaust file descriptors on large datasets.
        with Image.open(img_path) as img:
            all_sizes.append(img.size)

widths = [size[0] for size in all_sizes]
heights = [size[1] for size in all_sizes]
axes[1, 1].scatter(widths, heights, alpha=0.6)
axes[1, 1].set_xlabel('Width (pixels)')
axes[1, 1].set_ylabel('Height (pixels)')
axes[1, 1].set_title('Image Size Distribution')

plt.tight_layout()
plt.show()


In [None]:
# Display sample images with annotations
def display_sample_images(split_pairs, split_name, num_samples=4):
    """Display randomly chosen images from a split with their YOLO boxes drawn.

    Args:
        split_pairs: List of ``(image_path, label_path)`` tuples.
        split_name: Name used in the subplot and figure titles.
        num_samples: Maximum number of images to show. The grid has two
            columns and as many rows as needed (2x2 for the default of 4).
    """
    sample_pairs = random.sample(split_pairs, min(num_samples, len(split_pairs)))

    # Size the grid from the requested count; a fixed 2x2 grid raised
    # IndexError for num_samples > 4.
    n_cols = 2
    n_rows = max(1, (num_samples + n_cols - 1) // n_cols)
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(12, 5 * n_rows), squeeze=False)
    axes = axes.flatten()

    for idx, (img_path, lbl_path) in enumerate(sample_pairs):
        # Load image; cv2.imread returns None (no exception) on failure
        img = cv2.imread(str(img_path))
        if img is None:
            axes[idx].set_title(f'{split_name}: {img_path.name} (unreadable)')
            axes[idx].axis('off')
            continue
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        h, w = img.shape[:2]

        # Load annotations
        with open(lbl_path, 'r') as f:
            lines = f.readlines()

        # Draw bounding boxes
        for line in lines:
            parts = line.split()
            # Expect "class x_center y_center width height"; skip anything shorter
            if len(parts) < 5:
                continue
            class_id, x_center, y_center, width, height = map(float, parts[:5])

            # Convert from normalized coordinates to pixel coordinates
            x_center *= w
            y_center *= h
            width *= w
            height *= h

            # Calculate corner coordinates
            x1 = int(x_center - width / 2)
            y1 = int(y_center - height / 2)
            x2 = int(x_center + width / 2)
            y2 = int(y_center + height / 2)

            # Draw rectangle and caption; keep the caption inside the image
            # when the box touches the top edge.
            cv2.rectangle(img, (x1, y1), (x2, y2), (255, 0, 0), 2)
            cv2.putText(img, 'balloon', (x1, max(y1 - 10, 15)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 1)

        axes[idx].imshow(img)
        axes[idx].set_title(f'{split_name}: {img_path.name}')
        axes[idx].axis('off')

    # Hide unused subplots
    for idx in range(len(sample_pairs), len(axes)):
        axes[idx].axis('off')

    plt.suptitle(f'Sample Images from {split_name} Split', fontsize=16)
    plt.tight_layout()
    plt.show()

# Show four annotated samples per split: training, validation, then test
for pairs_to_show, split_title in (
    (train_pairs, 'Training'),
    (val_pairs, 'Validation'),
    (test_pairs, 'Test'),
):
    display_sample_images(pairs_to_show, split_title, 4)


## 7. Model Training Configuration


In [None]:
# Training configuration, assembled from themed groups.
# The groups share no keys, so merging them keeps the listed key order.
cuda_available = torch.cuda.is_available()

core_settings = {
    'model': 'yolo11n.pt',  # YOLOv11 nano for speed
    'data': str(custom_data_yaml),
    'epochs': 100,
    'imgsz': 640,
    'batch': 16,
    'patience': 20,  # Early stopping
    'dropout': 0.1,  # Regularization
    'lr0': 0.01,     # Learning rate
    'momentum': 0.937,
    'weight_decay': 0.0005,
    'warmup_epochs': 3,
    'cos_lr': True,  # Cosine learning rate scheduler
    'save_period': 10,
    'project': 'runs/train',
    'name': 'roboflow_balloon_90_5_5',
}

# GPU/CUDA settings: first GPU and more loader workers when CUDA is present
hardware_settings = {
    'device': 0 if cuda_available else 'cpu',
    'workers': 8 if cuda_available else 4,
}

# Data augmentation
augmentation_settings = {
    'hsv_h': 0.015,
    'hsv_s': 0.7,
    'hsv_v': 0.4,
    'degrees': 0.0,
    'translate': 0.1,
    'scale': 0.5,
    'shear': 0.0,
    'perspective': 0.0,
    'flipud': 0.0,
    'fliplr': 0.5,
    'mosaic': 1.0,
    'mixup': 0.0,
}

# Model settings
model_settings = {
    'save': True,
    'save_txt': True,
    'save_conf': True,
    'save_crop': False,
    'show_labels': True,
    'show_conf': True,
    'visualize': False,
    'augment': True,
    'agnostic_nms': False,
    'retina_masks': False,
    'overlap_mask': True,
    'mask_ratio': 4,
}

training_config = {
    **core_settings,
    **hardware_settings,
    **augmentation_settings,
    **model_settings,
}

print("Training Configuration:")
print("\n".join(f"  {key}: {value}" for key, value in training_config.items()))

print(f"\n🚀 Training will use: {'GPU (CUDA)' if cuda_available else 'CPU'}")
if cuda_available:
    print(f"   • Device: {torch.cuda.get_device_name(0)}")
# The worker count is reported for both GPU and CPU runs
print(f"   • Workers: {training_config['workers']}")


## 8. Model Training


In [None]:
# Load the model
model = YOLO(training_config['model'])
print(f"✅ Loaded model: {training_config['model']}")

# Start training
print("\n🚀 Starting training...")
print(f"Dataset: {training_config['data']}")
print(f"Epochs: {training_config['epochs']}")
print(f"Batch size: {training_config['batch']}")
print(f"Image size: {training_config['imgsz']}")

# Train the model
results = model.train(**training_config)

print("\n✅ Training completed!")
# Report the directory the trainer actually wrote to. With the default
# exist_ok=False, Ultralytics appends a numeric suffix to the run name when
# the folder already exists, so the configured project/name path can point at
# an older run.
trainer = getattr(model, 'trainer', None)
run_dir = getattr(trainer, 'save_dir', None) or (
    Path(training_config['project']) / training_config['name']
)
print(f"Results saved to: {run_dir}/")


## 9. Training Results Analysis


In [None]:
# Load training results.
# Prefer the directory recorded by the trainer from this session: Ultralytics
# increments the run name when the folder already exists, so the configured
# path may belong to an older run. Fall back to the configured path when no
# training has been run in this kernel.
default_run_dir = Path(f"runs/train/{training_config['name']}")
trainer = getattr(globals().get('model'), 'trainer', None)
results_path = Path(getattr(trainer, 'save_dir', None) or default_run_dir)
results_csv = results_path / "results.csv"

if results_csv.exists():
    # Load results
    results_df = pd.read_csv(results_csv)
    # Some Ultralytics versions pad the CSV header names with spaces;
    # stripping them keeps the column lookups below working either way.
    results_df.columns = results_df.columns.str.strip()

    print("Training Results Summary:")
    print(f"Total epochs: {len(results_df)}")
    print(f"Best mAP50: {results_df['metrics/mAP50(B)'].max():.4f}")
    print(f"Best mAP50-95: {results_df['metrics/mAP50-95(B)'].max():.4f}")
    print(f"Final loss: {results_df['train/box_loss'].iloc[-1]:.4f}")

    # Display last few epochs
    print("\nLast 5 epochs:")
    print(results_df[['epoch', 'train/box_loss', 'val/box_loss', 'metrics/mAP50(B)', 'metrics/mAP50-95(B)']].tail())
else:
    print("Results CSV not found. Training may still be in progress.")


In [None]:
# Plot training curves from a table of panel specifications
if not results_csv.exists():
    print("Cannot plot training curves - results CSV not found.")
else:
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    epochs = results_df['epoch']

    # Each panel: (axis, title, y-axis label, [(column, legend label, colour), ...])
    panels = [
        (axes[0, 0], 'Box Loss', 'Loss', [
            ('train/box_loss', 'Train Box Loss', 'blue'),
            ('val/box_loss', 'Val Box Loss', 'red'),
        ]),
        (axes[0, 1], 'mAP Metrics', 'mAP', [
            ('metrics/mAP50(B)', 'mAP@0.5', 'green'),
            ('metrics/mAP50-95(B)', 'mAP@0.5:0.95', 'orange'),
        ]),
        (axes[1, 0], 'Precision & Recall', 'Score', [
            ('metrics/precision(B)', 'Precision', 'purple'),
            ('metrics/recall(B)', 'Recall', 'brown'),
        ]),
    ]
    # The learning-rate panel is drawn only when that column was logged
    if 'lr/pg0' in results_df.columns:
        panels.append((axes[1, 1], 'Learning Rate Schedule', 'Learning Rate', [
            ('lr/pg0', 'Learning Rate', 'red'),
        ]))

    for ax, title, y_label, curves in panels:
        for column, legend_label, colour in curves:
            ax.plot(epochs, results_df[column], label=legend_label, color=colour)
        ax.set_title(title)
        ax.set_xlabel('Epoch')
        ax.set_ylabel(y_label)
        ax.legend()
        ax.grid(True)

    plt.tight_layout()
    plt.show()


## 10. Model Validation and Testing


In [None]:
# Evaluate the best checkpoint of the run on the held-out test split
best_model_path = results_path / "weights" / "best.pt"
if not best_model_path.exists():
    print("Best model not found. Training may not be complete.")
else:
    best_model = YOLO(str(best_model_path))
    print(f"✅ Loaded best model: {best_model_path}")

    # split='test' selects the test images listed in the custom data.yaml
    print("\n🔍 Validating on test set...")
    test_results = best_model.val(data=str(custom_data_yaml), split='test')

    box_metrics = test_results.box
    print(f"Test Results:")
    for metric_label, metric_value in (
        ("mAP@0.5", box_metrics.map50),
        ("mAP@0.5:0.95", box_metrics.map),
        ("Precision", box_metrics.mp),
        ("Recall", box_metrics.mr),
    ):
        print(f"  {metric_label}: {metric_value:.4f}")


## 11. Inference on Sample Images


In [None]:
# Run inference on sample images
if best_model_path.exists():
    # Get sample images from test set (sorted so the same four are picked every run)
    test_images = sorted((custom_dataset_path / 'test' / 'images').glob('*.jpg'))[:4]

    if test_images:
        print(f"Running inference on {len(test_images)} test images...")

        # Run inference
        results = best_model(test_images, save=True, save_txt=True, conf=0.25)

        # Display results
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        axes = axes.flatten()

        for idx, result in enumerate(results):
            if idx < len(axes):
                # result.path is a plain string, so wrap it to get the file name
                image_name = Path(result.path).name

                # result.plot() returns a BGR array; matplotlib expects RGB,
                # so reverse the channel axis before showing it.
                img = result.plot()[..., ::-1]
                axes[idx].imshow(img)
                axes[idx].set_title(f'Prediction: {image_name}')
                axes[idx].axis('off')

                # Print detection info
                if result.boxes is not None and len(result.boxes) > 0:
                    print(f"{image_name}: {len(result.boxes)} detections")
                    for box in result.boxes:
                        conf = box.conf.item()
                        print(f"  Confidence: {conf:.3f}")
                else:
                    print(f"{image_name}: No detections")

        # Hide unused subplots
        for idx in range(len(results), len(axes)):
            axes[idx].axis('off')

        plt.suptitle('Model Predictions on Test Images', fontsize=16)
        plt.tight_layout()
        plt.show()
    else:
        print("No test images found for inference.")
else:
    print("Cannot run inference - best model not found.")


## 12. Model Performance Summary


In [None]:
# Print a consolidated report once both the metrics CSV and best weights exist
if not (results_csv.exists() and best_model_path.exists()):
    print("⚠️  Training results not available. Please check if training completed successfully.")
else:
    banner = "=" * 60
    print(banner)
    print("🎯 BALLOON DETECTION MODEL - PERFORMANCE SUMMARY")
    print(banner)

    total_object_count = (
        train_stats['total_objects'] + val_stats['total_objects'] + test_stats['total_objects']
    )
    print(f"\n📊 Dataset Information:")
    print(f"  • Total samples: {total_samples}")
    print(f"  • Training samples: {len(train_pairs)} (90%)")
    print(f"  • Validation samples: {len(val_pairs)} (5%)")
    print(f"  • Test samples: {len(test_pairs)} (5%)")
    print(f"  • Total objects: {total_object_count}")

    print(f"\n🏋️ Training Configuration:")
    for config_label, config_key in (
        ("Model", 'model'),
        ("Epochs", 'epochs'),
        ("Batch size", 'batch'),
        ("Image size", 'imgsz'),
        ("Learning rate", 'lr0'),
        ("Early stopping patience", 'patience'),
    ):
        print(f"  • {config_label}: {training_config[config_key]}")

    print(f"\n📈 Training Results:")
    print(f"  • Best mAP@0.5: {results_df['metrics/mAP50(B)'].max():.4f}")
    print(f"  • Best mAP@0.5:0.95: {results_df['metrics/mAP50-95(B)'].max():.4f}")
    print(f"  • Final training loss: {results_df['train/box_loss'].iloc[-1]:.4f}")
    print(f"  • Final validation loss: {results_df['val/box_loss'].iloc[-1]:.4f}")

    # Test metrics exist only if the validation cell ran successfully
    if 'test_results' in locals():
        print(f"\n🧪 Test Set Performance:")
        print(f"  • mAP@0.5: {test_results.box.map50:.4f}")
        print(f"  • mAP@0.5:0.95: {test_results.box.map:.4f}")
        print(f"  • Precision: {test_results.box.mp:.4f}")
        print(f"  • Recall: {test_results.box.mr:.4f}")

    print(f"\n💾 Model Files:")
    print(f"  • Best model: {best_model_path}")
    print(f"  • Last model: {results_path / 'weights' / 'last.pt'}")
    print(f"  • Results: {results_path}")

    print(f"\n🎯 Air Defense System Readiness:")
    best_map50 = results_df['metrics/mAP50(B)'].max()
    # Thresholds are checked from highest to lowest; the first one exceeded wins
    readiness_levels = (
        (0.8, "✅ Excellent"),
        (0.6, "✅ Good"),
        (0.4, "⚠️  Moderate"),
    )
    verdict = next(
        (label for threshold, label in readiness_levels if best_map50 > threshold),
        "❌ Poor",
    )
    print(f"  {verdict} detection performance (mAP@0.5: {best_map50:.3f})")

    print("\n" + banner)
    print("Training completed successfully! 🚀")
    print(banner)


## 13. Export Model for Deployment


In [None]:
# Export the best checkpoint to each deployment format
if not best_model_path.exists():
    print("Cannot export model - best model not found.")
else:
    print("📦 Exporting model for deployment...")

    # A failure in one format is reported and does not stop the others
    export_formats = ['onnx', 'torchscript']
    for fmt in export_formats:
        try:
            exported_path = best_model.export(format=fmt)
        except Exception as e:
            print(f"  ❌ Failed to export to {fmt}: {e}")
        else:
            print(f"  ✅ Exported to {fmt}: {exported_path}")

    print("\n🎯 Model ready for air defense system deployment!")
    print("\nNext steps:")
    for next_step in (
        "1. Integrate with tracking system (SORT)",
        "2. Add friend/foe color classification",
        "3. Implement safety gates and no-fire zones",
        "4. Deploy on edge device (RPi5/Jetson)",
    ):
        print(next_step)


---

## 🎯 **Notebook Complete!**

This notebook provides a complete pipeline for training a balloon detection model on the Roboflow dataset with your requested 90/5/5 split. The trained model is ready for integration into your air defense system.

**Key Features:**
- ✅ Roboflow dataset download and processing
- ✅ Custom 90/5/5 train/test/validation split
- ✅ Comprehensive dataset analysis and visualization
- ✅ YOLOv11 training with optimized parameters
- ✅ Training metrics and performance analysis
- ✅ Model validation and inference testing
- ✅ Model export for deployment (ONNX/TorchScript)

**Ready for your air defense demo system! 🚀**
