In [1]:
import os
import argparse
import glob
from pathlib import Path
from ultralytics import YOLO
import cv2
import yaml

# Dataset Preparation

In [2]:
def data_path():
    """Write the defect-detection dataset config to ``defect.yaml``.

    The config lists the train/val/test image directories and the three
    defect class names, and is written into the current working directory.

    NOTE(review): the image directories are absolute paths on one specific
    machine; move them to a configurable location before sharing.

    Returns:
        str: ``'defect.yaml'``, the relative path of the file just written.
    """
    dataset_config = {
        "path": "",
        "train": r"C:\Users\himan\Downloads\TE_DefectsBody\images\train",
        "val": r"C:\Users\himan\Downloads\TE_DefectsBody\images\val",
        "test": r"D:\Data centr\IMG_data\UAV_Data\BodyFuselage",

        "names": {
            0: 'Crack',
            1: 'Dent_Scratch',
            2: 'Pitted Surface'
        }
    }

    with open('defect.yaml', 'w') as f:
        # The keyword is ``default_flow_style`` (lower-case "s"); the previous
        # spelling ``default_flow_Style`` made yaml.dump raise TypeError.
        yaml.dump(dataset_config, f, default_flow_style=False)

    print ("dataset created 'defect.yaml'")
    return 'defect.yaml'


# Model Training

In [3]:
def train_yolo_model(dataset_yaml, epochs=100, img_size=640, batch=16):
    """Fine-tune the ``yolov8n.pt`` checkpoint on the given dataset config.

    Args:
        dataset_yaml: Dataset config forwarded to ``train`` as ``data``.
        epochs: Forwarded as ``epochs``.
        img_size: Forwarded as ``imgsz``.
        batch: Forwarded as ``batch``.

    Returns:
        The ``YOLO`` object on which ``train`` was called.
    """
    detector = YOLO("yolov8n.pt")

    # Fixed run settings: run name, patience of 20, checkpoint saving on,
    # and device "0".
    training_options = {
        "data": dataset_yaml,
        "epochs": epochs,
        "imgsz": img_size,
        "batch": batch,
        "name": "drone_defect_detector",
        "patience": 20,
        "save": True,
        "device": "0",
    }
    detector.train(**training_options)

    print("Model training completed")
    return detector

In [4]:
def evaluate_model(yolo_model, test_data=None):
    """Run validation on a model and print its headline detection metrics.

    Args:
        yolo_model: Object exposing ``val()``; the returned metrics must have a
            ``box`` attribute with ``map50``, ``map``, ``mp`` and ``mr``.
        test_data: Optional dataset config forwarded to ``val`` as ``data``.
            When ``None``, ``val()`` is called without arguments, so the model
            falls back to whatever dataset it already knows about.

    Returns:
        The metrics object returned by ``yolo_model.val``.
    """
    # Previously ``test_data`` was overwritten with a hard-coded image path and
    # then never used; only forward it when the caller actually supplied one.
    val_kwargs = {} if test_data is None else {"data": test_data}
    results = yolo_model.val(**val_kwargs)
    box = results.box

    print("Model Evaluation Results:")
    print(f"mAP@0.5: {box.map50:.4f}")
    print(f"mAP@0.5:0.95: {box.map:.4f}")
    # ``box.p`` / ``box.r`` are per-class arrays in Ultralytics and cannot be
    # formatted with ``:.4f``; ``mp`` / ``mr`` are the scalar means.
    print(f"Precision: {box.mp:.4f}")
    print(f"Recall: {box.mr:.4f}")

    return results

In [5]:
def process_image(yolo_model, img_pth, conf_threshold=0.25):
    """Detect defects in one image and draw labelled boxes on a copy of it.

    Args:
        yolo_model: Model exposing ``predict(source=..., conf=...)`` that
            returns a sequence whose first item has ``boxes`` and ``names``.
        img_pth: Path of the image to analyse.
        conf_threshold: Minimum confidence forwarded to ``predict``.

    Returns:
        tuple: ``(output_image, detected_defects)``. ``output_image`` is the
        RGB copy of the input with boxes drawn; ``detected_defects`` is a list
        of dicts with ``'type'``, ``'confidence'`` and ``'bounding_box'``
        (``[x1, y1, x2, y2]``). The list is empty when nothing is detected.

    Raises:
        ValueError: If ``cv2.imread`` cannot read ``img_pth``.
    """
    img = cv2.imread(img_pth)

    if img is None:
        raise ValueError(f"Error:{img_pth}")

    # cv2.imread yields BGR; the annotated output is produced in RGB.
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    # Inference runs on the BGR array as loaded by OpenCV.
    # NOTE(review): Ultralytics documents numpy inputs as BGR — confirm for
    # the installed version.
    result = yolo_model.predict(source=img, conf=conf_threshold)[0]

    output_image = img_rgb.copy()
    detected_defects = []

    for box in result.boxes:
        x1, y1, x2, y2 = map(int, box.xyxy[0])
        class_id = int(box.cls[0])
        confidence = float(box.conf[0])

        class_name = result.names[class_id]
        # One colour per class id: 0, 1, everything else.
        color = (255, 0, 0) if class_id == 0 else (0, 255, 0) if class_id == 1 else (0, 0, 255)
        cv2.rectangle(output_image, (x1, y1), (x2, y2), color, 2)

        label = f"{class_name}: {confidence:.2f}"
        # Keep the caption inside the image when the box touches the top edge.
        cv2.putText(output_image, label, (x1, max(y1 - 10, 10)),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        detected_defects.append({
            'type': class_name,
            'confidence': confidence,
            'bounding_box': [x1, y1, x2, y2]
        })

    # Return after the loop so every box is reported, and so an image without
    # detections still yields a (image, []) pair instead of None.
    return output_image, detected_defects

In [7]:
def convert_annotations_to_yolo(input_folder, output_folder, img_width, img_height):
    """Placeholder converter: creates the output folder and prints status lines.

    No annotation is read or written yet; ``input_folder``, ``img_width`` and
    ``img_height`` are accepted but unused.
    """
    os.makedirs(output_folder, exist_ok=True)
    status_lines = (
        "Converting annotation to YOLO format",
        "Note: Implementation depends on your src",
        f"converted annotation saved to {output_folder}",
    )
    for status in status_lines:
        print(status)

In [None]:
def main():
    """Entry point of the defect-detection pipeline (dataset step only so far).

    Prints the banner and writes the dataset config via ``data_path()``.

    NOTE(review): a later cell defines another ``main`` that shadows this one
    once the whole notebook has run; rename one of them.

    Returns:
        The dataset config path returned by ``data_path()``.
    """
    print ("Drone Defection System")

    # ``dataset`` was an undefined name (NameError); the config path comes
    # from data_path().
    dataset_yaml = data_path()
    return dataset_yaml

In [2]:
# UAV part names; the list index is the class id used by the detector.
# ``UAV_CLASSES`` is the spelling referenced by UAVPartsDetector,
# prepare_dataset and convert_labelstudio_to_yolo, which previously hit a
# NameError because only ``UAV_Classes`` existed.
UAV_CLASSES = ['Tail', 'Left Wing', 'Right Wing', 'VTOL Arm', 'Propellers', 'Body Fuselage', 'Payload Box']

# Alias kept for code that still uses the original spelling (e.g. detect_uav).
UAV_Classes = UAV_CLASSES

In [None]:
def detect_uav(model, image_path, output_path):
    """Run UAV-part detection on one image and print each recognised part.

    Args:
        model: Object exposing ``predict``; it is called with ``conf=0.25``,
            ``save=True``, ``save_txt=True``, ``project=output_path``,
            ``name='inference'`` and ``exist_ok=True``.
        image_path: Path of the image to analyse.
        output_path: Forwarded to ``predict`` as ``project``.

    Raises:
        ValueError: If ``cv2.imread`` cannot read ``image_path``.
    """
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals failure with None; never hand that to predict().
        raise ValueError(f"Could not read image: {image_path}")

    results = model.predict(source=image, conf=0.25, save=True, save_txt=True,
                            project=output_path, name='inference', exist_ok=True)

    for result in results:
        for box in result.boxes:
            cls = int(box.cls[0])
            conf = float(box.conf[0])
            # Ignore class ids that have no entry in UAV_Classes.
            if 0 <= cls < len(UAV_Classes):
                label = UAV_Classes[cls]
                print(f"Detected {label} with confidence {conf:.2f}")

In [None]:
def create_data_yaml(path_to_classes_txt, path_to_data_yaml):
    """Build a dataset YAML from a ``classes.txt`` label map.

    Every non-blank line of ``path_to_classes_txt`` becomes one class name
    (whitespace-stripped, in file order). The YAML written to
    ``path_to_data_yaml`` holds fixed ``/content/custom_data/...`` image
    directories plus ``nc`` and ``names``. If the label map is missing, a
    message is printed and nothing is written.

    Returns:
        None
    """
    # Guard clause: nothing to do without the label map.
    if not os.path.exists(path_to_classes_txt):
        print(f'classes.txt file not found! Please create a classes.txt labelmap and move it to {path_to_classes_txt}')
        return None

    with open(path_to_classes_txt, 'r') as labelmap:
        stripped_lines = [raw_line.strip() for raw_line in labelmap.readlines()]
    class_names = [entry for entry in stripped_lines if len(entry) != 0]

    config = {
        'train': '/content/custom_data/train/images',
        'val': '/content/custom_data/valid/images',
        'test': '/content/custom_data/test/images',
        'nc': len(class_names),
        'names': class_names,
    }

    # sort_keys=False keeps the key order defined above.
    with open(path_to_data_yaml, 'w') as yaml_file:
        yaml.dump(config, yaml_file, sort_keys=False)
    print(f'Created config file at {path_to_data_yaml}')

    return None

# Input label map and output dataset config, both under /content
# (presumably a Google Colab workspace — confirm before running elsewhere).
path_to_classes_txt = '/content/custom_data/classes.txt'
path_to_data_yaml = '/content/data.yaml'

# Generate the dataset config; only prints a message if classes.txt is missing.
create_data_yaml(path_to_classes_txt, path_to_data_yaml)

print('\nFile contents:\n')
# IPython shell escape (not plain Python); `cat` is unavailable on Windows shells.
!cat /content/data.yaml

In [None]:
# Show at most the first ten JPEGs found in /content/runs/detect/predict.
# NOTE(review): `Image` is not imported anywhere in the visible notebook —
# presumably IPython.display.Image; confirm and import it in the first cell.
# NOTE(review): the f-string has no placeholders, so the `f` prefix is redundant.
for image_path in glob.glob(f'/content/runs/detect/predict/*.jpg')[:10]:
  display(Image(filename=image_path, height=400))
  print('\n')

In [None]:
class UAVPartsDetector:
    """YOLO-based detector that finds UAV parts and saves one crop per detection."""

    def __init__(self, model_path=None):
        """Initialize the UAV parts detector.

        Args:
            model_path: Optional weights file. When it is falsy or does not
                exist on disk, the ``yolov8n.pt`` checkpoint is loaded instead.
        """
        if model_path and os.path.exists(model_path):
            self.model = YOLO(model_path)
        else:
            self.model = YOLO('yolov8n.pt')

        # Class id -> part name lookup (list index is the class id).
        self.classes = UAV_CLASSES

    def train(self, data_yaml_path, epochs=100, img_size=640, batch_size=16):
        """Train the YOLO model on the UAV parts dataset.

        Args:
            data_yaml_path: Dataset config forwarded to ``train`` as ``data``.
            epochs: Forwarded as ``epochs``.
            img_size: Forwarded as ``imgsz``.
            batch_size: Forwarded as ``batch``.

        Returns:
            str: Path of the best checkpoint of this run.
        """
        # No explicit ``device``: the previous heuristic forced CPU whenever
        # CUDA_VISIBLE_DEVICES was unset, which is the normal state on a GPU
        # machine. Leaving it unset lets Ultralytics pick the device itself.
        self.model.train(
            data=data_yaml_path,
            epochs=epochs,
            imgsz=img_size,
            batch=batch_size,
            patience=20,
        )

        # Ultralytics auto-increments the run directory (train, train2, ...),
        # so ask the trainer for this run's checkpoint rather than assuming
        # 'runs/detect/train'. The old fixed path is only a fallback.
        trainer = getattr(self.model, 'trainer', None)
        best_path = getattr(trainer, 'best', None)
        if best_path is None:
            best_path = 'runs/detect/train/weights/best.pt'
        best_path = str(best_path)

        if os.path.exists(best_path):
            self.model = YOLO(best_path)
        return best_path

    def predict(self, image_path, conf=0.25):
        """Run prediction on a single image and return the first result."""
        results = self.model.predict(image_path, conf=conf)
        return results[0]

    def detect_and_categorize(self, image_path, output_dir, conf=0.25):
        """Detect UAV parts and save each cropped part to its class folder.

        Args:
            image_path: Image to analyse.
            output_dir: Root folder; one sub-folder per class name (spaces
                replaced by underscores) is created beneath it.
            conf: Confidence threshold forwarded to ``predict``.

        Returns:
            list[dict]: One dict per saved crop with ``'class'``,
            ``'confidence'``, ``'bbox'`` (clamped to the image) and
            ``'saved_path'``.

        Raises:
            ValueError: If ``cv2.imread`` cannot read ``image_path``.
        """
        os.makedirs(output_dir, exist_ok=True)
        for class_name in self.classes:
            os.makedirs(os.path.join(output_dir, class_name.replace(' ', '_')), exist_ok=True)

        # Fail fast on unreadable files instead of crashing later on None.
        img = cv2.imread(image_path)
        if img is None:
            raise ValueError(f"Could not read image: {image_path}")
        img_height, img_width = img.shape[:2]

        results = self.predict(image_path, conf=conf)

        boxes = results.boxes.xyxy.cpu().numpy()
        classes = results.boxes.cls.cpu().numpy().astype(int)
        confs = results.boxes.conf.cpu().numpy()

        # splitext keeps dots inside the name ("a.b.jpg" -> "a.b").
        image_stem = os.path.splitext(os.path.basename(image_path))[0]

        detections = []
        for i, (box, class_id, conf_score) in enumerate(zip(boxes, classes, confs)):
            if not 0 <= class_id < len(self.classes):
                continue

            class_name = self.classes[class_id]
            folder_name = class_name.replace(' ', '_')

            # Clamp to the image: a negative coordinate would otherwise be
            # interpreted as "from the end" by array slicing.
            x1, y1, x2, y2 = map(int, box)
            x1, y1 = max(x1, 0), max(y1, 0)
            x2, y2 = min(x2, img_width), min(y2, img_height)

            # Crop straight from the BGR image that cv2.imwrite expects.
            part_img = img[y1:y2, x1:x2]
            if part_img.size == 0:
                continue

            part_filename = f"{image_stem}_{folder_name}_{i}.jpg"
            save_path = os.path.join(output_dir, folder_name, part_filename)
            cv2.imwrite(save_path, part_img)

            detections.append({
                'class': class_name,
                'confidence': float(conf_score),
                'bbox': [x1, y1, x2, y2],
                'saved_path': save_path
            })

        return detections

    def process_directory(self, input_dir, output_dir, conf=0.25):
        """Process all images found (recursively) in a directory.

        ``output_dir`` itself is skipped while walking, so crops written during
        this call are not picked up as new inputs when the output folder lives
        inside ``input_dir``.

        Returns:
            dict: Image path -> list of detections for that image.
        """
        os.makedirs(output_dir, exist_ok=True)

        supported_formats = ('.jpg', '.jpeg', '.png', '.bmp')
        output_root = os.path.abspath(output_dir)
        all_detections = {}

        for root, dirs, files in os.walk(input_dir):
            # Prune in place so os.walk never descends into the output folder.
            dirs[:] = [
                d for d in dirs
                if os.path.abspath(os.path.join(root, d)) != output_root
            ]
            for file in sorted(files):
                if file.lower().endswith(supported_formats):
                    image_path = os.path.join(root, file)
                    print(f"Processing {image_path}...")

                    detections = self.detect_and_categorize(image_path, output_dir, conf)
                    all_detections[image_path] = detections

        return all_detections

    def visualize_detections(self, image_path, output_path=None, conf=0.25):
        """Return the annotated image for ``image_path``.

        The annotated image is also written to ``output_path`` when one is given.
        """
        results = self.predict(image_path, conf=conf)

        # Plot results
        annotated_img = results.plot()

        if output_path:
            cv2.imwrite(output_path, annotated_img)

        return annotated_img


def prepare_dataset(dataset_dir, output_yaml_path):
    """Create the dataset folder skeleton and write its YAML config.

    Ensures ``images/{train,val}`` and ``labels/{train,val}`` exist under
    ``dataset_dir``, then dumps a config (absolute dataset path, relative
    train/val image folders, class count and class names from
    ``UAV_CLASSES``) to ``output_yaml_path``.

    Returns:
        The ``output_yaml_path`` argument, unchanged.
    """
    for kind in ('images', 'labels'):
        for split in ('train', 'val'):
            os.makedirs(os.path.join(dataset_dir, kind, split), exist_ok=True)

    config = dict(
        path=os.path.abspath(dataset_dir),
        train='images/train',
        val='images/val',
        nc=len(UAV_CLASSES),
        names=UAV_CLASSES,
    )

    with open(output_yaml_path, 'w') as yaml_file:
        yaml.dump(config, yaml_file, default_flow_style=False)

    return output_yaml_path


def _labelstudio_boxes(result):
    """Yield the ``value`` dict of every bounding box in an annotation result.

    Accepts both the list-of-regions layout of a Label Studio JSON export and
    the ``{'bboxes': [...]}`` layout the previous implementation looked for.
    """
    if isinstance(result, dict):
        entries = result.get('bboxes') or []
    else:
        entries = result or []

    for entry in entries:
        value = entry.get('value') if isinstance(entry, dict) else None
        if isinstance(value, dict) and all(k in value for k in ('x', 'y', 'width', 'height')):
            yield value


def convert_labelstudio_to_yolo(label_file, image_dir, output_dir, img_width, img_height,
                                val_fraction=0.2, seed=None):
    """
    Convert Label Studio annotations to YOLO format

    Each annotated image found in ``image_dir`` is copied into
    ``output_dir/images/<split>`` and gets a label file in
    ``output_dir/labels/<split>`` with one ``class x_center y_center width
    height`` line per box, all normalised to ``[0, 1]``. An image without
    usable boxes still gets an (empty) label file.

    Args:
        label_file: Path to Label Studio JSON export
        image_dir: Directory containing the original images
        output_dir: Directory to save YOLO format annotations
        img_width, img_height: Original image dimensions. Unused: the box
            values are read as percentages of the image size.
        val_fraction: Probability of assigning an image to the ``val`` split
            (default 0.2, i.e. the previous 80/20 split).
        seed: Optional seed that makes the train/val assignment reproducible.
    """
    import json
    import random
    import shutil

    for kind in ('images', 'labels'):
        for split_name in ('train', 'val'):
            os.makedirs(os.path.join(output_dir, kind, split_name), exist_ok=True)

    with open(label_file, 'r') as f:
        annotations = json.load(f)

    # Private generator so seeding does not disturb the global random state.
    rng = random.Random(seed)

    def _clamp(fraction):
        return max(0.0, min(1.0, fraction))

    for item in annotations:
        image_name = item['data']['image'].split('/')[-1]
        image_path = os.path.join(image_dir, image_name)

        if not os.path.exists(image_path):
            continue

        split = 'val' if rng.random() < val_fraction else 'train'

        shutil.copy(image_path, os.path.join(output_dir, 'images', split, image_name))

        label_path = os.path.join(output_dir, 'labels', split, f"{os.path.splitext(image_name)[0]}.txt")

        with open(label_path, 'w') as label_out:
            for annotation in item.get('annotations') or []:
                for bbox in _labelstudio_boxes(annotation.get('result')):
                    labels = bbox.get('labels') or bbox.get('rectanglelabels') or []
                    if not labels:
                        continue

                    label = labels[0]
                    try:
                        class_id = UAV_CLASSES.index(label)
                    except ValueError:
                        print(f"Warning: Label {label} not in class list, skipping...")
                        continue

                    # Label Studio gives the top-left corner and the size as
                    # percentages; clamp the box edges to the image first.
                    left = _clamp(bbox['x'] / 100.0)
                    top = _clamp(bbox['y'] / 100.0)
                    right = _clamp((bbox['x'] + bbox['width']) / 100.0)
                    bottom = _clamp((bbox['y'] + bbox['height']) / 100.0)

                    w = right - left
                    h = bottom - top
                    if w <= 0 or h <= 0:
                        continue

                    # YOLO expects the box centre, not the top-left corner.
                    x_center = (left + right) / 2.0
                    y_center = (top + bottom) / 2.0

                    # Write to label file
                    label_out.write(f"{class_id} {x_center:.6f} {y_center:.6f} {w:.6f} {h:.6f}\n")


def process_five_view_images(detector, input_dirs, output_dir):
    """Run the detector over each named view and keep the outputs separate.

    Args:
        detector: Object exposing ``process_directory(input_dir, output_dir)``.
        input_dirs: Mapping of view name -> directory with that view's images.
        output_dir: Root folder; each view gets a sub-folder named after it.
    """
    for view_name in input_dirs:
        source_dir = input_dirs[view_name]
        destination = os.path.join(output_dir, view_name)
        os.makedirs(destination, exist_ok=True)

        print(f"Processing {view_name} view images from {source_dir}...")
        detector.process_directory(source_dir, destination)


def main():
    """Run the UAV parts workflow: prepare dataset, train, process the views.

    Everything lives under ``uav_detection`` relative to the working
    directory. All five view entries currently point at that base directory
    itself (``os.path.join(base_dir, "")``).
    NOTE(review): those look like placeholders — fill in the real per-view
    folders before running.
    """
    print("UAV Parts Detection System")
    print("-----------------------")

    workspace = "uav_detection"
    dataset_root = os.path.join(workspace, "dataset")
    config_path = os.path.join(workspace, "uav_parts.yaml")
    models_dir = os.path.join(workspace, "models")

    # Workspace first, then its sub-folders.
    for folder in (workspace, dataset_root, models_dir):
        os.makedirs(folder, exist_ok=True)

    prepare_dataset(dataset_root, config_path)

    parts_detector = UAVPartsDetector()
    trained_weights = parts_detector.train(config_path, epochs=50)

    view_names = ('left', 'right', 'front', 'rear', 'down')
    view_sources = {view: os.path.join(workspace, "") for view in view_names}

    crops_root = os.path.join(workspace, "detected_parts")
    process_five_view_images(parts_detector, view_sources, crops_root)

    print(f"Model training completed: {trained_weights}")
    print(f"Detected parts saved to: {crops_root}")


In [None]:
# Script-style entry point: calls whichever `main` was defined last.
# NOTE(review): inside a notebook kernel `__name__` is normally "__main__",
# so running this cell presumably starts the whole workflow — confirm.
if __name__ == "__main__":
    main()