# üß± LEGO Assembly Error Detection - Training Notebook

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/tanamujaya/lego_assembly_detection/blob/main/LEGO_Assembly_Error_Detection_Training.ipynb)

This notebook trains a YOLO-based computer vision model to detect assembly errors in LEGO models.

**Features:**
- YOLOv8 object detection (nano version optimized for Raspberry Pi)
- K-fold cross-validation support
- Few-shot fine-tuning capabilities
- Automatic evaluation and metrics visualization

**Author:** Tanaka Mujaya  
**Project:** Bachelor's Thesis - HS Rhein-Waal

---
## 1. Setup Environment

First, let's check if we're running on GPU and install the required packages.

In [None]:
# Check GPU availability
!nvidia-smi

In [None]:
# Install required packages
!pip install ultralytics --quiet
!pip install kaggle --quiet
!pip install scikit-learn --quiet

print("‚úÖ Packages installed successfully!")

In [None]:
# Import libraries
import os
import json
import shutil
import zipfile
from pathlib import Path
from datetime import datetime

import numpy as np
import matplotlib.pyplot as plt
from IPython.display import Image, display

from ultralytics import YOLO
from sklearn.model_selection import train_test_split

print("‚úÖ Libraries imported successfully!")

---
## 2. Download Dataset from Kaggle

The dataset is hosted on Kaggle. You'll need to upload your Kaggle API credentials.

In [None]:
# Upload your kaggle.json file
# Go to https://www.kaggle.com/settings -> API -> Create New Token
# This downloads a kaggle.json file

from google.colab import files

print("Please upload your kaggle.json file:")
uploaded = files.upload()

In [None]:
# Setup Kaggle credentials
# Handle both uppercase and lowercase filename
!mkdir -p ~/.kaggle
!mv -f kaggle.json ~/.kaggle/ 2>/dev/null || mv -f Kaggle.json ~/.kaggle/kaggle.json 2>/dev/null || echo "File already moved"
!chmod 600 ~/.kaggle/kaggle.json

print("‚úÖ Kaggle credentials configured!")

In [None]:
# Download the dataset
KAGGLE_DATASET = "tanakamujaya/lego-assembly-detection-dataset"

!kaggle datasets download -d {KAGGLE_DATASET} -p /content/data --unzip

print("‚úÖ Dataset downloaded!")

In [None]:
# Check the dataset structure
!echo "=== Dataset Structure ==="
!find /content/data -type d

---
## 3. Configuration

Set up training parameters and paths.

In [None]:
# ============================================================================
# CONFIGURATION - Modify these settings as needed
# ============================================================================

# Paths
BASE_DIR = Path("/content")
DATA_DIR = BASE_DIR / "data"
MODELS_DIR = BASE_DIR / "models"
RESULTS_DIR = BASE_DIR / "results"

# Create directories
MODELS_DIR.mkdir(exist_ok=True)
RESULTS_DIR.mkdir(exist_ok=True)

# Dataset configuration
DATASET_CONFIG = {
    'image_size': 416,  # Can use 320 for faster training
    'random_seed': 42
}

# Model configuration
MODEL_CONFIG = {
    'variant': 'yolov8n',  # nano version (fastest, smallest)
    'pretrained': True,
    'num_classes': 2,  # correct_assembly, assembly_error
}

# Training configuration
TRAINING_CONFIG = {
    'epochs': 40,
    'batch_size': 16,
    'learning_rate': 0.001,
    'patience': 20,  # Early stopping
    'device': 0,  # GPU (use 'cpu' if no GPU)
    'workers': 2,
}

# Class names
CLASS_NAMES = ['correct_assembly', 'assembly_error']

print("‚úÖ Configuration set!")
print(f"   Model: {MODEL_CONFIG['variant']}")
print(f"   Image size: {DATASET_CONFIG['image_size']}")
print(f"   Epochs: {TRAINING_CONFIG['epochs']}")
print(f"   Batch size: {TRAINING_CONFIG['batch_size']}")

---
## 4. Prepare Dataset

Locate the `combined_dataset` folder and create the YOLO configuration file.

**Note:** The dataset is already pre-split into train/val folders.

In [None]:
def find_combined_dataset(data_dir):
    """
    Find the combined_dataset folder which contains the full training data.
    Handles pre-split datasets (with train/val subfolders).
    """
    data_dir = Path(data_dir)
    
    print("üîç Searching for combined_dataset...")
    
    # Search for combined_dataset anywhere in the directory tree
    for combined_dir in data_dir.rglob('combined_dataset'):
        # Check if it's a pre-split dataset (has train/val subfolders)
        train_dir = combined_dir / 'train'
        val_dir = combined_dir / 'val'
        
        if train_dir.exists() and val_dir.exists():
            print(f"   ‚úÖ Found pre-split combined_dataset!")
            print(f"   Location: {combined_dir}")
            return combined_dir, 'pre-split'
        
        # Check if it has direct images/labels folders
        images_dir = combined_dir / 'images'
        labels_dir = combined_dir / 'labels'
        
        if images_dir.exists() and labels_dir.exists():
            print(f"   ‚úÖ Found combined_dataset!")
            print(f"   Location: {combined_dir}")
            return combined_dir, 'flat'
    
    # Fallback: look for any dataset with train folder
    print("   ‚ö†Ô∏è combined_dataset not found, searching for alternatives...")
    
    for subdir in data_dir.rglob('train'):
        if (subdir / 'images').exists() and (subdir / 'labels').exists():
            parent = subdir.parent
            print(f"   Using: {parent.name}")
            return parent, 'pre-split'
    
    raise FileNotFoundError(f"Could not find a valid dataset in {data_dir}")


def count_images(directory):
    """Count images in a directory."""
    directory = Path(directory)
    if not directory.exists():
        return 0
    image_extensions = {'.jpg', '.jpeg', '.png', '.bmp'}
    return sum(1 for f in directory.iterdir() if f.suffix.lower() in image_extensions)


# Find the dataset
dataset_dir, dataset_type = find_combined_dataset(DATA_DIR)

# Count samples
if dataset_type == 'pre-split':
    train_count = count_images(dataset_dir / 'train' / 'images')
    val_count = count_images(dataset_dir / 'val' / 'images')
    test_count = count_images(dataset_dir / 'test' / 'images') if (dataset_dir / 'test').exists() else 0
    total_count = train_count + val_count + test_count
    
    print(f"\nüìä Dataset Statistics:")
    print(f"   Train: {train_count} images")
    print(f"   Val: {val_count} images")
    if test_count > 0:
        print(f"   Test: {test_count} images")
    print(f"   Total: {total_count} images")
else:
    total_count = count_images(dataset_dir / 'images')
    print(f"\nüìä Total images: {total_count}")

In [None]:
def create_dataset_yaml(dataset_dir, output_path, dataset_type='pre-split'):
    """
    Create a YOLO dataset.yaml configuration file.
    """
    dataset_dir = Path(dataset_dir)
    output_path = Path(output_path)
    
    # Determine paths based on dataset type
    if dataset_type == 'pre-split':
        train_path = 'train/images'
        val_path = 'val/images'
        test_path = 'test/images' if (dataset_dir / 'test').exists() else 'val/images'
    else:
        train_path = 'images'
        val_path = 'images'
        test_path = 'images'
    
    yaml_content = f"""# LEGO Assembly Error Detection Dataset
# Auto-generated by training notebook

path: {dataset_dir}
train: {train_path}
val: {val_path}
test: {test_path}

# Classes
names:
  0: correct_assembly
  1: assembly_error

# Number of classes
nc: 2
"""
    
    with open(output_path, 'w') as f:
        f.write(yaml_content)
    
    print(f"‚úÖ Created dataset.yaml at: {output_path}")
    return output_path


# Create the YAML configuration
yaml_path = BASE_DIR / 'dataset.yaml'
yaml_path = create_dataset_yaml(dataset_dir, yaml_path, dataset_type)

# Display the YAML content
print("\nüìÑ Dataset configuration:")
print("-" * 40)
!cat {yaml_path}

In [None]:
# Preview some training images
import random
from PIL import Image as PILImage

if dataset_type == 'pre-split':
    train_images_dir = dataset_dir / 'train' / 'images'
else:
    train_images_dir = dataset_dir / 'images'

all_images = list(train_images_dir.iterdir())
sample_images = random.sample(all_images, min(4, len(all_images)))

fig, axes = plt.subplots(1, len(sample_images), figsize=(16, 4))
if len(sample_images) == 1:
    axes = [axes]

for ax, img_path in zip(axes, sample_images):
    img = PILImage.open(img_path)
    ax.imshow(img)
    ax.set_title(img_path.name[:20] + '...')
    ax.axis('off')

plt.suptitle('Sample Training Images', fontsize=14)
plt.tight_layout()
plt.show()

---
## 5. Train the Model

Now let's train the YOLOv8 model on our dataset.

In [None]:
# Initialize model
print(f"üöÄ Loading {MODEL_CONFIG['variant']} model...")

model = YOLO(f"{MODEL_CONFIG['variant']}.pt")

print("‚úÖ Model loaded!")

In [None]:
# Train the model
print("üèãÔ∏è Starting training...")
print(f"   This may take a while depending on your GPU.")
print(f"   Epochs: {TRAINING_CONFIG['epochs']}")
print(f"   Batch size: {TRAINING_CONFIG['batch_size']}")
print()

# Training arguments
train_args = {
    'data': str(yaml_path),
    'epochs': TRAINING_CONFIG['epochs'],
    'batch': TRAINING_CONFIG['batch_size'],
    'imgsz': DATASET_CONFIG['image_size'],
    'device': TRAINING_CONFIG['device'],
    'workers': TRAINING_CONFIG['workers'],
    'patience': TRAINING_CONFIG['patience'],
    'save': True,
    'project': str(RESULTS_DIR),
    'name': 'lego_detection',
    'exist_ok': True,
    'pretrained': MODEL_CONFIG['pretrained'],
    'optimizer': 'Adam',
    'lr0': TRAINING_CONFIG['learning_rate'],
    'verbose': True,
    'plots': True,
    # Augmentation
    'hsv_h': 0.015,
    'hsv_s': 0.7,
    'hsv_v': 0.4,
    'degrees': 10,
    'translate': 0.1,
    'scale': 0.5,
    'fliplr': 0.5,
    'mosaic': 1.0,
    'mixup': 0.1,
}

# Start training
results = model.train(**train_args)

print("\n‚úÖ Training complete!")

---
## 6. Evaluate Results

Let's look at the training metrics and evaluate on the validation set.

In [None]:
# Display training results
results_dir = RESULTS_DIR / 'lego_detection'

# Show results plot
results_img = results_dir / 'results.png'
if results_img.exists():
    display(Image(filename=str(results_img), width=800))
else:
    print("Results plot not found")

In [None]:
# Show confusion matrix
confusion_matrix_img = results_dir / 'confusion_matrix.png'
if confusion_matrix_img.exists():
    display(Image(filename=str(confusion_matrix_img), width=500))
else:
    print("Confusion matrix not found")

In [None]:
# Show PR curve
pr_curve_img = results_dir / 'PR_curve.png'
if pr_curve_img.exists():
    display(Image(filename=str(pr_curve_img), width=500))
else:
    print("PR curve not found")

In [None]:
# Evaluate on validation set
print("üìä Evaluating on validation set...")

# Load best model
best_model_path = results_dir / 'weights' / 'best.pt'
best_model = YOLO(str(best_model_path))

# Run validation
val_results = best_model.val(
    data=str(yaml_path),
    split='val',
    verbose=True
)

print("\n" + "="*50)
print("üìà VALIDATION RESULTS")
print("="*50)
print(f"Precision:    {val_results.results_dict['metrics/precision(B)']:.4f}")
print(f"Recall:       {val_results.results_dict['metrics/recall(B)']:.4f}")
print(f"mAP@0.5:      {val_results.results_dict['metrics/mAP50(B)']:.4f}")
print(f"mAP@0.5:0.95: {val_results.results_dict['metrics/mAP50-95(B)']:.4f}")
print("="*50)

---
## 7. Test Inference

Let's run inference on some validation images to see the model in action.

In [None]:
# Run inference on random validation images
if dataset_type == 'pre-split':
    val_images_dir = dataset_dir / 'val' / 'images'
else:
    val_images_dir = dataset_dir / 'images'

val_images = list(val_images_dir.iterdir())
sample_val_images = random.sample(val_images, min(4, len(val_images)))

print("üîç Running inference on sample images...\n")

# Run prediction
predictions = best_model.predict(
    source=sample_val_images,
    save=True,
    project=str(RESULTS_DIR),
    name='test_predictions',
    exist_ok=True,
    conf=0.5
)

# Display predictions
pred_dir = RESULTS_DIR / 'test_predictions'
pred_images = [f for f in pred_dir.iterdir() if f.suffix.lower() in {'.jpg', '.jpeg', '.png'}]

if pred_images:
    fig, axes = plt.subplots(1, len(pred_images), figsize=(16, 4))
    if len(pred_images) == 1:
        axes = [axes]

    for ax, img_path in zip(axes, pred_images):
        img = PILImage.open(img_path)
        ax.imshow(img)
        ax.set_title(img_path.name[:25])
        ax.axis('off')

    plt.suptitle('Model Predictions', fontsize=14)
    plt.tight_layout()
    plt.show()
else:
    print("No prediction images found")

---
## 8. Save and Download Model

Save the trained model and download it for deployment.

In [None]:
# Copy best model to models directory
final_model_path = MODELS_DIR / 'lego_detector_best.pt'
shutil.copy(best_model_path, final_model_path)

print(f"‚úÖ Best model saved to: {final_model_path}")
print(f"   Model size: {final_model_path.stat().st_size / 1024 / 1024:.2f} MB")

In [None]:
# Export to ONNX format (optional - for optimized inference)
print("üì¶ Exporting to ONNX format...")

onnx_path = best_model.export(
    format='onnx',
    imgsz=DATASET_CONFIG['image_size'],
    simplify=True
)

print(f"‚úÖ ONNX model saved to: {onnx_path}")

In [None]:
# Download the model
from google.colab import files

print("üì• Preparing model for download...")

# Create a zip with the model and results
zip_path = '/content/lego_detector_model.zip'
with zipfile.ZipFile(zip_path, 'w') as zipf:
    zipf.write(final_model_path, 'lego_detector_best.pt')
    if Path(onnx_path).exists():
        zipf.write(onnx_path, Path(onnx_path).name)

print(f"\n‚úÖ Model package ready!")
print(f"   Click below to download:")

files.download(zip_path)

---
## 9. Summary

Training complete! Here's what was accomplished:

In [None]:
# Print summary
print("="*60)
print("üéâ TRAINING SUMMARY")
print("="*60)

print(f"\nüìä Dataset:")
if dataset_type == 'pre-split':
    print(f"   Train: {train_count} images")
    print(f"   Val: {val_count} images")
    if test_count > 0:
        print(f"   Test: {test_count} images")
    print(f"   Total: {total_count} images")
else:
    print(f"   Total: {total_count} images")

print(f"\nü§ñ Model:")
print(f"   Architecture: {MODEL_CONFIG['variant']}")
print(f"   Image size: {DATASET_CONFIG['image_size']}")
print(f"   Classes: {CLASS_NAMES}")

print(f"\nüìà Validation Results:")
print(f"   Precision: {val_results.results_dict['metrics/precision(B)']:.4f}")
print(f"   Recall: {val_results.results_dict['metrics/recall(B)']:.4f}")
print(f"   mAP@0.5: {val_results.results_dict['metrics/mAP50(B)']:.4f}")

print(f"\nüìÅ Outputs:")
print(f"   Best model: {final_model_path}")
print(f"   Results: {results_dir}")

print("\n" + "="*60)
print("Next steps:")
print("1. Download the model zip file")
print("2. Deploy to Raspberry Pi 4B")
print("3. Run inference using inference.py")
print("="*60)

---

## üìö Resources

- **GitHub Repository:** [github.com/tanamujaya/lego_assembly_detection](https://github.com/tanamujaya/lego_assembly_detection)
- **Dataset:** [Kaggle - LEGO Assembly Detection Dataset](https://www.kaggle.com/datasets/tanakamujaya/lego-assembly-detection-dataset)
- **YOLOv8 Documentation:** [docs.ultralytics.com](https://docs.ultralytics.com)

---

*Created as part of Bachelor's Thesis at HS Rhein-Waal*