In [1]:
import os 
import shutil
import random


from transformers import AutoModelForObjectDetection, TrainingArguments, Trainer, YolosImageProcessor
from PIL import Image, ImageDraw
from datasets import Dataset
import numpy as np
import gc

import time
import configparser
import torch
import gc

import cv2

import matplotlib.pyplot as plt


  from .autonotebook import tqdm as notebook_tqdm
Matplotlib is building the font cache; this may take a moment.


In [2]:
# config = configparser.ConfigParser()
# config.read("config.ini")
# AUTH_TOKEN = config["auth"]["token"]

# Root for all relative data/model paths: the current working directory at the time this cell runs.
BASE_DIR = os.getcwd()
# Module-level YOLOS image processor instance.
# NOTE(review): no visible cell references this name; the functions further down each build
# their own YolosImageProcessor() as a default argument - confirm whether this is still needed.
IMAGE_PROCESSOR_GLOBAL = YolosImageProcessor()


In [3]:
def clear_directory(directory):
    """
    Remove everything inside a directory while keeping the directory itself.

    Regular files and symbolic links are unlinked, sub-directories are removed
    recursively. A path that does not exist is silently ignored.

    Args:
        directory (str): Path of the directory to empty.
    """
    if not os.path.exists(directory):
        return

    for entry_name in os.listdir(directory):
        entry_path = os.path.join(directory, entry_name)
        # Links are checked together with files so that a link to a directory
        # is unlinked instead of being followed by rmtree.
        is_file_or_link = os.path.isfile(entry_path) or os.path.islink(entry_path)
        if is_file_or_link:
            os.unlink(entry_path)
        elif os.path.isdir(entry_path):
            shutil.rmtree(entry_path)


def split_dataset(images_dir, labels_dir, output_dir, test_ratio=0.2, seed=42):
    """
    Split a dataset (images + YOLO label files) into a training and a test subset.

    The images are shuffled with the given seed, the first ``test_ratio`` share
    goes to ``<output_dir>/test`` and the rest to ``<output_dir>/train``. Each
    subset gets an ``images`` and a ``labels`` folder; existing content of these
    four folders is deleted first. A label file is copied only if a ``.txt`` file
    with the same stem as the image exists in ``labels_dir``.

    Note: there is no validation subset, only train and test.

    Args:
        images_dir (str): Folder containing the images (.jpg, .jpeg, .png).
        labels_dir (str): Folder containing the label files.
        output_dir (str): Folder in which the ``train``/``test`` folders are created.
        test_ratio (float): Share of the images used for testing (0..1). Default: 0.2.
        seed (int): Random seed for reproducibility. Default: 42.
            The global ``random`` module is seeded with it as a side effect.

    Returns:
        None

    Raises:
        ValueError: If ``test_ratio`` is not within [0, 1].
    """
    if not 0 <= test_ratio <= 1:
        raise ValueError(f"test_ratio must be between 0 and 1, got {test_ratio!r}")

    # Target sub-folders
    train_images_dir = os.path.join(output_dir, "train/images")
    train_labels_dir = os.path.join(output_dir, "train/labels")
    test_images_dir = os.path.join(output_dir, "test/images")
    test_labels_dir = os.path.join(output_dir, "test/labels")

    # Make sure every target folder exists and starts out empty
    for subdir in (train_images_dir, train_labels_dir, test_images_dir, test_labels_dir):
        os.makedirs(subdir, exist_ok=True)
        clear_directory(subdir)

    # os.listdir returns entries in arbitrary order; sorting first is what makes
    # the seeded shuffle reproducible across machines and file systems.
    # '.jpeg' is accepted as well, matching convert_yolo_to_custom_format.
    image_files = sorted(
        f for f in os.listdir(images_dir) if f.endswith(('.jpg', '.jpeg', '.png'))
    )

    # Shuffle and split
    random.seed(seed)
    random.shuffle(image_files)

    test_split = int(len(image_files) * test_ratio)

    test_files = image_files[:test_split]
    train_files = image_files[test_split:]

    def copy_files(file_list, dest_images_dir, dest_labels_dir):
        """Copy each image and, if present, its same-named .txt label file."""
        for file in file_list:
            shutil.copy(os.path.join(images_dir, file), os.path.join(dest_images_dir, file))
            label_file = file.rsplit('.', 1)[0] + '.txt'
            label_source = os.path.join(labels_dir, label_file)
            if os.path.exists(label_source):
                shutil.copy(label_source, os.path.join(dest_labels_dir, label_file))

    copy_files(train_files, train_images_dir, train_labels_dir)
    copy_files(test_files, test_images_dir, test_labels_dir)

    print(f"Train/Validation/Test-Split abgeschlossen! Daten in '{output_dir}' gespeichert.")
    print(f"Train: {len(train_files)} | Test: {len(test_files)}")

# Root of the bearing ("Kugellager") YOLO data; the raw image/label dumps live below it and
# the train/test folders are written into it.
# NOTE(review): this cell spells the folder "YOLO_data", the dataset-loading cell below uses
# "YOLO_Data". That only resolves to the same folder on case-insensitive file systems - confirm.
yolo_output_dir = os.path.join(BASE_DIR, "Data", "Kugellager_Data", "YOLO_data")
yolo_images_dir = os.path.join(yolo_output_dir, "yolo_images_dump")
yolo_labels_dir = os.path.join(yolo_output_dir, "yolo_labels_dump")

split_dataset(yolo_images_dir, yolo_labels_dir, yolo_output_dir, test_ratio=0.2, seed=42)


Train/Validation/Test-Split abgeschlossen! Daten in 'c:\Users\anohl\OneDrive\Dokumente\A_Uni_stuff\Albstadt\Semester 2\Computer_vision\Aufgaben\Data\Kugellager_Data\YOLO_data' gespeichert.
Train: 480 | Test: 120


In [4]:
def convert_yolo_to_custom_format(images_dir, labels_dir, categories):
    """
    Convert YOLO annotations into a list of per-image records (CPPE-5-like layout).

    Every ``.jpg``/``.png``/``.jpeg`` file in ``images_dir`` (processed in sorted
    order) yields one record. If a ``.txt`` file with the same stem exists in
    ``labels_dir``, each non-blank line ``<class> <x_center> <y_center> <width> <height>``
    (coordinates relative to the image size) is converted to an absolute
    ``[x_min, y_min, width, height]`` box rounded to one decimal.

    Args:
        images_dir (str): Path to the images directory.
        labels_dir (str): Path to the YOLO labels directory.
        categories (list): List of category descriptions. Currently not used by
            the conversion; the class id from the label file is stored as is.

    Returns:
        list: One dict per image with the keys ``image_id``, ``image`` (an
        in-memory copy of the PIL image), ``width``, ``height`` and ``objects``
        (parallel lists ``id``, ``area``, ``bbox``, ``category``).

    Raises:
        ValueError: If a non-blank label line does not consist of exactly five
            fields or contains non-numeric values.
    """
    dataset = []
    annotation_id = 0  # running id across all images
    image_id = 0

    for image_file in sorted(os.listdir(images_dir)):
        if not image_file.endswith(('.jpg', '.png', '.jpeg')):
            continue

        image_path = os.path.join(images_dir, image_file)

        # Copy the pixel data so the file handle can be closed right away.
        with Image.open(image_path) as img:
            width, height = img.size
            image = img.copy()

        objects = {'id': [], 'area': [], 'bbox': [], 'category': []}

        # Corresponding label file in YOLO format
        label_file = os.path.join(labels_dir, image_file.rsplit('.', 1)[0] + '.txt')
        if os.path.exists(label_file):
            with open(label_file, 'r') as f:
                for line_number, line in enumerate(f, start=1):
                    parts = line.split()
                    if not parts:
                        # Blank (e.g. trailing) lines carry no annotation.
                        continue
                    if len(parts) != 5:
                        raise ValueError(
                            f"{label_file}:{line_number}: expected 5 fields "
                            f"(class x_center y_center width height), got {len(parts)}"
                        )

                    category_id = int(parts[0])
                    x_center, y_center, box_width, box_height = map(float, parts[1:])

                    # Relative centre/size -> absolute top-left corner/size
                    x_min = (x_center - box_width / 2) * width
                    y_min = (y_center - box_height / 2) * height
                    bbox_width = box_width * width
                    bbox_height = box_height * height
                    area = bbox_width * bbox_height

                    objects['id'].append(annotation_id)
                    objects['area'].append(int(area))
                    objects['bbox'].append([
                        round(x_min, 1),
                        round(y_min, 1),
                        round(bbox_width, 1),
                        round(bbox_height, 1)
                    ])
                    objects['category'].append(category_id)

                    annotation_id += 1

        dataset.append({
            'image_id': image_id,
            'image': image,
            'width': width,
            'height': height,
            'objects': objects,
        })
        image_id += 1

    return dataset


categories_kugellager = [
    {"id": 0, "name": "defect"},
]

categories_oberfläche = [
    {"id": 0, "name": "crazing"},
    {"id": 1, "name": "inclusion"},
    {"id": 2, "name": "patches"},
    {"id": 3, "name": "pitted surface"},
    {"id": 4, "name": "rolled in scale"},
    {"id": 5, "name": "scratches"}
]


def _yolo_split_dirs(dataset_folder, split_name):
    """Return (images_dir, labels_dir) of one split below Data/<dataset_folder>/YOLO_Data."""
    # NOTE(review): the split cell above writes to ".../YOLO_data" (lower-case "d");
    # both spellings only name the same folder on case-insensitive file systems - confirm.
    split_root = f"Data/{dataset_folder}/YOLO_Data/{split_name}"
    return (
        os.path.join(BASE_DIR, f"{split_root}/images"),
        os.path.join(BASE_DIR, f"{split_root}/labels"),
    )


def _load_yolo_split(images_dir, labels_dir, categories):
    """Convert one YOLO split to the record format and wrap it in a datasets.Dataset."""
    return Dataset.from_list(convert_yolo_to_custom_format(images_dir, labels_dir, categories))


# --- Bearing ("Kugellager") data: full split and "halb" variant ---
images_dir_kugellager_train, labels_dir_kugellager_train = _yolo_split_dirs("Kugellager_Data", "train")
dataset_kugellager_train = _load_yolo_split(images_dir_kugellager_train, labels_dir_kugellager_train, categories_kugellager)

images_dir_kugellager_test, labels_dir_kugellager_test = _yolo_split_dirs("Kugellager_Data", "test")
dataset_kugellager_test = _load_yolo_split(images_dir_kugellager_test, labels_dir_kugellager_test, categories_kugellager)

images_dir_kugellager_train_halb, labels_dir_kugellager_train_halb = _yolo_split_dirs("Kugellager_Data", "train_halb")
dataset_kugellager_train_halb = _load_yolo_split(images_dir_kugellager_train_halb, labels_dir_kugellager_train_halb, categories_kugellager)

images_dir_kugellager_test_halb, labels_dir_kugellager_test_halb = _yolo_split_dirs("Kugellager_Data", "test_halb")
dataset_kugellager_test_halb = _load_yolo_split(images_dir_kugellager_test_halb, labels_dir_kugellager_test_halb, categories_kugellager)

# --- Surface ("Oberflächen") data: full split and "halb" variant ---
images_dir_oberfläche_train, labels_dir_oberfläche_train = _yolo_split_dirs("Oberflächen_Data", "train")
dataset_oberfläche_train = _load_yolo_split(images_dir_oberfläche_train, labels_dir_oberfläche_train, categories_oberfläche)

images_dir_oberfläche_test, labels_dir_oberfläche_test = _yolo_split_dirs("Oberflächen_Data", "test")
dataset_oberfläche_test = _load_yolo_split(images_dir_oberfläche_test, labels_dir_oberfläche_test, categories_oberfläche)

images_dir_oberfläche_train_halb, labels_dir_oberfläche_train_halb = _yolo_split_dirs("Oberflächen_Data", "train_halb")
dataset_oberfläche_train_halb = _load_yolo_split(images_dir_oberfläche_train_halb, labels_dir_oberfläche_train_halb, categories_oberfläche)

images_dir_oberfläche_test_halb, labels_dir_oberfläche_test_halb = _yolo_split_dirs("Oberflächen_Data", "test_halb")
dataset_oberfläche_test_halb = _load_yolo_split(images_dir_oberfläche_test_halb, labels_dir_oberfläche_test_halb, categories_oberfläche)


In [5]:
# Das hier war lediglich Code um ein memory Problem zu lösen, das beim Training auftrat

# print("CUDA available:", torch.cuda.is_available())
# print("CUDA device name:", torch.cuda.get_device_name(0))
# print("CUDA version:", torch.version.cuda)
# print("cuDNN version:", torch.backends.cudnn.version())
# total_memory = torch.cuda.get_device_properties(0).total_memory

# # Belegter Speicher (in Bytes)
# allocated_memory = torch.cuda.memory_allocated(0)

# # Zwischengespeicherter Speicher
# cached_memory = torch.cuda.memory_reserved(0)

# print(f"Gesamtspeicher: {total_memory / 1e9:.2f} GB")
# print(f"Belegter Speicher: {allocated_memory / 1e9:.2f} GB")
# print(f"Zwischengespeicherter Speicher: {cached_memory / 1e9:.2f} GB")

In [6]:
def formatted_anns(image_id, category, area, bbox):
    """
    Build the annotation dicts for the objects of a single image.

    Args:
        image_id: Id of the image; stored under the key "id" of every annotation.
        category: Sequence of category ids, one per object.
        area: Sequence of box areas, indexed like ``category``.
        bbox: Sequence of boxes, indexed like ``category``; each box is copied into a list.

    Returns:
        list: One dict per object with the keys "id", "category_id", "isCrowd",
        "area" and "bbox".
    """
    return [
        {
            "id": image_id,
            "category_id": category_id,  # the object's own category id
            "isCrowd": 0,
            "area": area[position],
            "bbox": list(bbox[position]),
        }
        for position, category_id in enumerate(category)
    ]

# Turn a batch of dataset records into the inputs the model expects.
def transform_ann(examples, image_processor = YolosImageProcessor()):
    """
    Convert a batch of records into processor outputs (pixel values + labels).

    Args:
        examples: Batch in column layout with the keys "image_id", "image"
            (PIL images) and "objects" (dicts with "area", "bbox", "category").
        image_processor: Processor that is called with the images and their
            annotations. Defaults to a YolosImageProcessor created once at
            definition time.

    Returns:
        Whatever ``image_processor(images=..., annotations=..., return_tensors="pt")``
        returns for the whole batch.
    """
    images, targets = [], []
    for image_id, image, objects in zip(examples["image_id"], examples["image"], examples["objects"]):
        # Keep the channels in RGB order. The previous version reversed them
        # ([:, :, ::-1], i.e. BGR) while get_predictions passes RGB images at
        # inference time, so training and inference saw different channel orders.
        images.append(np.array(image.convert("RGB")))
        targets.append({
            "image_id": image_id,
            "annotations": formatted_anns(
                image_id, objects["category"], objects["area"], objects["bbox"]
            ),
        })

    # The processor is applied to the whole batch at once.
    return image_processor(images=images, annotations=targets, return_tensors="pt")

def collate_fn(batch, image_processor = YolosImageProcessor()):
    """
    Combine individual samples into one training batch.

    Args:
        batch: List of samples, each with the keys "pixel_values" and "labels".
        image_processor: Processor whose ``pad`` method is applied to the list of
            pixel values. Defaults to a YolosImageProcessor created at definition time.

    Returns:
        dict: "pixel_values" (taken from the padded encoding) and "labels"
        (the per-sample labels as a plain list).
    """
    padded = image_processor.pad(
        [sample["pixel_values"] for sample in batch],
        return_tensors="pt",
    )
    # The "pixel_mask" of the padded encoding is intentionally not forwarded
    # (original author's note: only needed for segmentation, not for detection).
    return {
        "pixel_values": padded["pixel_values"],
        "labels": [sample["labels"] for sample in batch],
    }


def _save_loss_plot(train_losses, eval_losses, model_name, plot_path):
    """Plot training loss and (if present) evaluation loss and save the figure to plot_path."""
    plt.figure(figsize=(14, 10))

    plt.plot(train_losses, label='Training Loss', color='blue')

    if len(eval_losses) > 0:
        if len(eval_losses) != len(train_losses):
            # Evaluation is logged less often than training; stretch the evaluation
            # curve over the training-step axis by linear interpolation.
            x_train = np.linspace(0, len(train_losses) - 1, len(train_losses))
            x_eval = np.linspace(0, len(train_losses) - 1, len(eval_losses))
            eval_losses_interpolated = np.interp(x_train, x_eval, eval_losses)

            plt.plot(eval_losses_interpolated, label='Evaluation Loss', color='red')
        else:
            plt.plot(eval_losses, label='Evaluation Loss', color='red')

    plt.title(f'Loss Progression for {model_name}')
    plt.xlabel('Training Steps')
    plt.ylabel('Loss')
    plt.legend()
    plt.tight_layout()
    plt.savefig(plot_path)
    plt.close()


def model_training(categories, model_name, train_data, validation_data, num_epochs=3, image_processor=YolosImageProcessor(), output_name="Kugellager"):
    """
    Fine-tune an object-detection checkpoint, plot the losses and save the model.

    Args:
        categories (list): Dicts with the keys "id" and "name"; used to build the
            id<->label mappings passed to the model.
        model_name (str): Checkpoint name/path given to ``from_pretrained``; also
            used as part of the output path.
        train_data: Training dataset handed to the Trainer.
        validation_data: Evaluation dataset handed to the Trainer.
        num_epochs (int): Number of training epochs. Default: 3.
        image_processor: Passed to the Trainer as ``tokenizer``.
        output_name (str): Sub-folder of ``trained_model`` for this run.

    Returns:
        tuple: ``(evaluation_results, train_losses, eval_losses)`` - the result of
        ``trainer.evaluate()`` and the logged training/evaluation losses.

    Side effects:
        Writes ``loss_plot.png`` and the folder ``final_model`` below
        ``trained_model/<output_name>/<model_name>``.
    """
    # Mapping between ids and labels
    id2label = {category['id']: category['name'] for category in categories}
    label2id = {category['name']: category['id'] for category in categories}

    model = AutoModelForObjectDetection.from_pretrained(
        model_name,
        id2label=id2label,
        label2id=label2id,
        ignore_mismatched_sizes=True,
    )

    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    print(f"Using device: {device}")

    output_dir = f"trained_model/{output_name}/{model_name}"

    # Same arguments on every device; only the batch size differs (2 on CPU, 10 otherwise).
    training_args = TrainingArguments(
        output_dir=output_dir,
        remove_unused_columns=False,
        load_best_model_at_end=False,
        save_strategy="no",
        eval_strategy="epoch",
        per_device_train_batch_size=2 if device == "cpu" else 10,
        push_to_hub=False,
        logging_steps=10,
        num_train_epochs=num_epochs,
    )

    # Move the model to the GPU if one is available.
    model.to(device)
    trainer = Trainer(
        model=model,
        args=training_args,
        data_collator=collate_fn,
        train_dataset=train_data,
        eval_dataset=validation_data,
        # NOTE(review): newer transformers releases may expect this under a different
        # keyword - confirm against the installed version.
        tokenizer=image_processor,
    )

    trainer.train()

    evaluation_results = trainer.evaluate()

    # Extract training and evaluation losses from the trainer's log history
    train_logs = trainer.state.log_history
    train_losses = [log['loss'] for log in train_logs if 'loss' in log]
    eval_losses = [log['eval_loss'] for log in train_logs if 'eval_loss' in log]

    # With save_strategy="no" nothing in this function creates the output folder,
    # so make sure it exists before the plot is written.
    os.makedirs(output_dir, exist_ok=True)
    _save_loss_plot(train_losses, eval_losses, model_name, f"{output_dir}/loss_plot.png")

    save_path = f"{output_dir}/final_model"
    model.save_pretrained(save_path)

    return evaluation_results, train_losses, eval_losses
    

# Wrap every dataset with transform_ann so that it can be fed to the model.
train_data_kugellager, test_data_kugellager = (
    split.with_transform(transform_ann)
    for split in (dataset_kugellager_train, dataset_kugellager_test)
)
train_data_oberfläche, test_data_oberfläche = (
    split.with_transform(transform_ann)
    for split in (dataset_oberfläche_train, dataset_oberfläche_test)
)

# Same wiring for the "halb" variants of both datasets.
train_data_kugellager_halb, test_data_kugellager_halb = (
    split.with_transform(transform_ann)
    for split in (dataset_kugellager_train_halb, dataset_kugellager_test_halb)
)
train_data_oberfläche_halb, test_data_oberfläche_halb = (
    split.with_transform(transform_ann)
    for split in (dataset_oberfläche_train_halb, dataset_oberfläche_test_halb)
)

# Result containers, keyed by model name, one per dataset variant.
best_models_kugellager_dict = dict()
best_models_oberfläche_dict = dict()

best_models_kugellager_dict_halb = dict()
best_models_oberfläche_dict_halb = dict()

# Checkpoint names that were listed here as candidates:
#   "ultralytics/yolov11-m", "jparedesDS/welding-defects-detection",
#   "facebook/detr-resnet-50", "hustvl/yolos-small", "hustvl/yolos-tiny"
model_training_list = ["facebook/detr-resnet-50"]

# for model_name in model_training_list:
#     evaluation_results, train_losses, eval_losses = model_training(
#         categories=categories_kugellager, 
#         model_name=model_name, 
#         train_data=train_data_kugellager, 
#         validation_data=test_data_kugellager, 
#         num_epochs=30,
#         output_name="Kugellager"
#     )
#     best_models_kugellager_dict[model_name] = {
#         'evaluation_results': evaluation_results,
#         'train_losses': train_losses,
#         'eval_losses': eval_losses
#     }


# for model_name in model_training_list:
#     evaluation_results, train_losses, eval_losses = model_training(
#         categories=categories_oberfläche, 
#         model_name=model_name, 
#         train_data=train_data_oberfläche, 
#         validation_data=test_data_oberfläche, 
#         num_epochs=30,
#         output_name="Oberfläche"
#     )
#     best_models_oberfläche_dict[model_name] = {
#         'evaluation_results': evaluation_results,
#         'train_losses': train_losses,
#         'eval_losses': eval_losses
#     }


# for model_name in model_training_list:
#     evaluation_results, train_losses, eval_losses = model_training(
#         categories=categories_kugellager, 
#         model_name=model_name, 
#         train_data=train_data_kugellager_halb, 
#         validation_data=test_data_kugellager_halb, 
#         num_epochs=30,
#         output_name="Kugellager_halb"
#     )
#     best_models_kugellager_dict_halb[model_name] = {
#         'evaluation_results': evaluation_results,
#         'train_losses': train_losses,
#         'eval_losses': eval_losses
#     }


# for model_name in model_training_list:
#     evaluation_results, train_losses, eval_losses = model_training(
#         categories=categories_oberfläche, 
#         model_name=model_name, 
#         train_data=train_data_oberfläche_halb, 
#         validation_data=test_data_oberfläche_halb, 
#         num_epochs=30,
#         output_name="Oberfläche_halb"
#     )
#     best_models_oberfläche_dict_halb[model_name] = {
#         'evaluation_results': evaluation_results,
#         'train_losses': train_losses,
#         'eval_losses': eval_losses
#     }

In [12]:
def plot_box(img, results, output_dir=None):
    """
    Draw detection boxes and labels onto a copy of an image.

    Args:
        img: Path of an image file (str) or a PIL image. The input image itself
            is never modified.
        results: Iterable of detections, each a dict with "box" (keys "xmin",
            "ymin", "xmax", "ymax"), "label" and "score".
        output_dir (str | None): If given, the annotated image is saved there as
            ``detection_result_<random number>.jpg``.

    Returns:
        str | numpy.ndarray: The path of the saved file if ``output_dir`` is set,
        otherwise the annotated image as an RGB array.
    """
    # Always work on an RGB copy: it can be stored as JPEG regardless of the
    # source mode (e.g. RGBA/P PNGs) and gives a 3-channel array for OpenCV.
    if isinstance(img, str):
        with Image.open(img) as opened:  # close the file handle right after loading
            image = opened.convert("RGB")
    else:
        image = img.convert("RGB")
    draw = ImageDraw.Draw(image)

    for obj in results:
        box = obj["box"]
        x = int(box["xmin"])
        y = int(box["ymin"])
        x2 = int(box["xmax"])
        y2 = int(box["ymax"])

        draw.rectangle([(x, y), (x2, y2)], outline="orange", width=2)

        label_text = f"{obj['label']} - {np.round(obj['score'], 3)}"
        # Put the label above the box, but keep it inside the image for boxes at the top edge.
        draw.text((x, max(0, y - 20)), label_text, fill="orange", stroke_width=2, stroke_fill="white")

    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
        output_path = os.path.join(output_dir, f"detection_result_{np.random.randint(10_000_000)}.jpg")
        image.save(output_path)
        return output_path
    return np.array(image)

def load_model_for_inference(model_path, device="cuda" if torch.cuda.is_available() else "cpu"):
    """
    Load a trained detection model and prepare it for inference.

    Args:
        model_path: Path/name passed to ``AutoModelForObjectDetection.from_pretrained``.
        device: Target device; the default ("cuda" if available, else "cpu") is
            determined once, when the function is defined.

    Returns:
        The loaded model, moved to ``device`` and switched to eval mode.
    """
    detector = AutoModelForObjectDetection.from_pretrained(model_path)
    detector.to(device)
    detector.eval()
    return detector

def get_predictions(model, image, image_processor=YolosImageProcessor(), confidence_threshold=0.5):
    """
    Run the detector on one image and return the detections plus timing information.

    Args:
        model: Detection model; its ``device`` and ``config.id2label`` are used.
        image: Path of an image file (str) or a PIL image. Converted to RGB if needed.
        image_processor: Processor used for pre- and post-processing. Defaults to
            a YolosImageProcessor created at definition time.
        confidence_threshold (float): Minimum score of a returned detection.

    Returns:
        tuple: ``(results, timing_info)``. ``results`` is a list of dicts with
        "score", "label" and "box" (keys "xmin", "ymin", "xmax", "ymax");
        ``timing_info`` holds "preprocess_time", "inference_time",
        "postprocess_time" and "total_time" in seconds.
    """
    # perf_counter is a monotonic high-resolution clock meant for measuring intervals.
    start_time = time.perf_counter()

    if isinstance(image, str):
        with Image.open(image) as opened:  # do not keep the file handle open
            image = opened.convert("RGB")
    elif image.mode != "RGB":
        image = image.convert("RGB")

    run_on_cuda = model.device.type == "cuda"

    def _wait_for_gpu():
        # CUDA work is queued asynchronously; without waiting, the measured
        # stage would end before the GPU has actually finished.
        if run_on_cuda:
            torch.cuda.synchronize(model.device)

    # Preprocessing timing
    preprocess_start = time.perf_counter()
    inputs = image_processor(images=image, return_tensors="pt")
    inputs = {k: v.to(model.device) for k, v in inputs.items()}
    _wait_for_gpu()
    preprocess_time = time.perf_counter() - preprocess_start

    # Inference timing
    inference_start = time.perf_counter()
    with torch.no_grad():
        outputs = model(**inputs)
    _wait_for_gpu()
    inference_time = time.perf_counter() - inference_start

    # Post-processing timing
    postprocess_start = time.perf_counter()
    results = []

    # PIL reports (width, height); the processor is given (height, width).
    detections = image_processor.post_process_object_detection(
        outputs,
        target_sizes=[(image.size[1], image.size[0])],
        threshold=confidence_threshold
    )[0]

    for score, label, box in zip(
        detections["scores"].tolist(),
        detections["labels"].tolist(),
        detections["boxes"].tolist()
    ):
        # The threshold was already handed to the post-processing step; the
        # explicit check is kept as a guard.
        if score >= confidence_threshold:
            results.append({
                "score": score,
                "label": model.config.id2label[label],
                "box": {
                    "xmin": box[0],
                    "ymin": box[1],
                    "xmax": box[2],
                    "ymax": box[3]
                }
            })
    postprocess_time = time.perf_counter() - postprocess_start

    # Total time (includes loading/converting the image)
    total_time = time.perf_counter() - start_time

    timing_info = {
        "preprocess_time": preprocess_time,
        "inference_time": inference_time,
        "postprocess_time": postprocess_time,
        "total_time": total_time
    }

    return results, timing_info

def detect_and_visualize(model_path, image_path, output_dir, confidence_threshold=0.5):
    """
    Label a single test image and print how long each processing stage took.

    Loads the model from ``model_path``, runs the detection on ``image_path``,
    draws the detections via ``plot_box`` (which saves the result in
    ``output_dir``) and prints the timings in milliseconds.

    Returns:
        tuple: ``(output_path, timing_info)`` - the value returned by
        ``plot_box`` and the timing dict from ``get_predictions``.
    """
    detector = load_model_for_inference(model_path)
    detections, timing_info = get_predictions(
        detector,
        image_path,
        confidence_threshold=confidence_threshold,
    )
    saved_path = plot_box(image_path, detections, output_dir)

    # Timings are stored in seconds and reported in milliseconds.
    print(f"Processing times:")
    print(f"Preprocessing: {timing_info['preprocess_time']*1000:.1f}ms")
    print(f"Inference: {timing_info['inference_time']*1000:.1f}ms")
    print(f"Postprocessing: {timing_info['postprocess_time']*1000:.1f}ms")
    print(f"Total time: {timing_info['total_time']*1000:.1f}ms")

    return saved_path, timing_info


def load_random_images(image_folder, num_images=10):
    """
    Pick random image files from a folder.

    Files are recognised by their extension (.jpg, .jpeg, .png, case-insensitive).
    The candidates are sorted before sampling so that a seeded ``random`` module
    yields the same selection on every machine.

    Args:
        image_folder (str): Folder to pick the images from.
        num_images (int): Maximum number of images to return. Default: 10.

    Returns:
        list[str]: Paths (``image_folder`` joined with the file name) of at most
        ``num_images`` randomly chosen images.
    """
    # The leading dot matters: without it names such as "notes_jpg" would match too.
    image_extensions = ('.jpg', '.jpeg', '.png')
    image_files = sorted(
        f for f in os.listdir(image_folder) if f.lower().endswith(image_extensions)
    )
    selected_files = random.sample(image_files, min(len(image_files), num_images))
    return [os.path.join(image_folder, f) for f in selected_files]


# model_path_oberfläche_yolo = os.path.join(BASE_DIR, "trained_model", "Oberfläche", "hustvl", "yolos-tiny", "final_model")
# model_path_oberfläche_resnet = os.path.join(BASE_DIR, "trained_model", "Oberfläche", "facebook", "detr-resnet-50", "final_model")
# list_pics_oberfläche = load_random_images(images_dir_oberfläche_test)
# for pic in list_pics_oberfläche:
#     detect_and_visualize(model_path=model_path_oberfläche_yolo, image_path=pic, output_dir="test_labeling_pics_oberfläche_yolo")
#     detect_and_visualize(model_path=model_path_oberfläche_resnet, image_path=pic, output_dir="test_labeling_pics_oberfläche_resnet", confidence_threshold=0.4)


# model_path_kugellager_yolo = os.path.join(BASE_DIR, "trained_model", "Kugellager", "hustlv", "yolos-tiny", "final_model")
# Built relative to BASE_DIR instead of a hard-coded absolute user path, so the notebook
# also runs from other checkouts/machines.
# NOTE(review): the variable is named "kugellager" but the path points at the "Oberfläche"
# yolos-tiny model - confirm which model is intended here.
model_path_kugellager_yolo = os.path.join(BASE_DIR, "trained_model", "Oberfläche", "hustvl", "yolos-tiny", "final_model")
# model_path_kugellager_resnet = os.path.join(BASE_DIR, "trained_model", "Kugellager", "facebook", "detr-resnet-50", "final_model")
# list_pics_kugellager = load_random_images(images_dir_kugellager_test)
# for pic in list_pics_kugellager:
#     detect_and_visualize(model_path=model_path_kugellager_yolo, image_path=pic, output_dir="test_labeling_pics_kugellager_yolo")
#     detect_and_visualize(model_path=model_path_kugellager_resnet, image_path=pic, output_dir="test_labeling_pics_kugellager_resnet", confidence_threshold=0.4)

In [14]:
def run_video_detection(model_path, camera_id=0, confidence_threshold=0.5):
    """
    Run real-time object detection on a camera stream and show it in a window.

    Every frame is passed through ``get_predictions`` and ``plot_box``; the
    current frame rate and the inference time are drawn onto the frame. The
    loop ends when 'q' is pressed, when no frame can be read, or when an error
    occurs (the error message is printed). The camera and all OpenCV windows
    are released in any case.

    Args:
        model_path: Path of the trained model to load.
        camera_id (int): Index passed to ``cv2.VideoCapture``. Default: 0.
        confidence_threshold (float): Minimum score of a drawn detection.

    Raises:
        ValueError: If the camera cannot be opened.
    """
    print("Loading model...")
    model = load_model_for_inference(model_path)
    image_processor = YolosImageProcessor()
    print("Model loaded successfully!")

    print("Opening camera...")
    cap = cv2.VideoCapture(camera_id)
    if not cap.isOpened():
        cap.release()
        raise ValueError(f"Could not open camera with ID {camera_id}")
    print("Camera opened successfully!")

    window_name = 'Object Detection'

    # FPS bookkeeping: frames counted since fps_start_time
    fps_start_time = time.time()
    fps_counter = 0
    fps = 0.0

    try:
        # Created inside the try block so the camera is released even if this fails.
        cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)

        print("Starting video stream. Press 'q' to quit.")

        while True:
            ret, frame = cap.read()
            if not ret:
                print("Failed to grab frame")
                break

            # OpenCV delivers BGR frames; the model pipeline works on RGB PIL images.
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            pil_image = Image.fromarray(rgb_frame)

            predictions, timing_info = get_predictions(model, pil_image,
                                                    image_processor=image_processor,
                                                    confidence_threshold=confidence_threshold)

            # Draw boxes (returns an RGB array) and convert back to BGR for OpenCV
            result_image = plot_box(pil_image, predictions)
            result_frame = cv2.cvtColor(result_image, cv2.COLOR_RGB2BGR)

            # Update the FPS value about once per second. Dividing by the time that
            # actually elapsed keeps the value correct when a frame takes longer
            # than the one-second window.
            fps_counter += 1
            elapsed = time.time() - fps_start_time
            if elapsed > 1:
                fps = fps_counter / elapsed
                fps_counter = 0
                fps_start_time = time.time()

            cv2.putText(result_frame, f"FPS: {fps:.1f}", (10, 30),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
            cv2.putText(result_frame, f"Inference: {timing_info['inference_time']*1000:.1f}ms",
                       (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

            cv2.imshow(window_name, result_frame)

            # Break on 'q' press
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                print("Quitting...")
                break

    except Exception as e:
        print(f"An error occurred: {str(e)}")

    finally:
        print("Cleaning up...")
        cap.release()
        cv2.destroyAllWindows()

# Single-image detection with timing output.
# NOTE(review): this re-defines detect_and_visualize from the earlier cell with the same
# body; whichever cell ran last provides the active definition.
def detect_and_visualize(model_path, image_path, output_dir, confidence_threshold=0.5):
    """
    Detect objects in one image, save the annotated image and print stage timings.

    Returns:
        tuple: ``(output_path, timing_info)`` - the value returned by
        ``plot_box`` and the timing dict from ``get_predictions``.
    """
    detector = load_model_for_inference(model_path)
    detections, timing_info = get_predictions(
        detector,
        image_path,
        confidence_threshold=confidence_threshold,
    )
    saved_path = plot_box(image_path, detections, output_dir)

    # Timings are stored in seconds and reported in milliseconds.
    print(f"Processing times:")
    print(f"Preprocessing: {timing_info['preprocess_time']*1000:.1f}ms")
    print(f"Inference: {timing_info['inference_time']*1000:.1f}ms")
    print(f"Postprocessing: {timing_info['postprocess_time']*1000:.1f}ms")
    print(f"Total time: {timing_info['total_time']*1000:.1f}ms")

    return saved_path, timing_info



# Start the live camera demo (camera 0) with the model path configured above.
# The call returns once 'q' is pressed, a frame cannot be read, or an error occurs.
run_video_detection(model_path_kugellager_yolo, camera_id=0, confidence_threshold=0.5)

Loading model...
Model loaded successfully!
Opening camera...
Camera opened successfully!
Starting video stream. Press 'q' to quit.
Quitting...
Cleaning up...


In [22]:
# Headline + result dict for every dataset variant, in reporting order.
# NOTE(review): these dicts are only filled by the training loops, which are commented
# out in the visible training cell - with empty dicts nothing is printed and the file stays empty.
result_groups = [
    ("Model über alle Daten", best_models_kugellager_dict),
    ("Model über alle Daten", best_models_oberfläche_dict),
    ("Model über hälfte der Daten", best_models_kugellager_dict_halb),
    ("Model über hälfte der Daten", best_models_oberfläche_dict_halb),
]

# Build every report entry once and reuse it for the console and the file.
report_entries = [
    f"{headline}: {model_name}, performte mit folgenden Scores:\n{score}"
    for headline, models in result_groups
    for model_name, score in models.items()
]

for entry in report_entries:
    print(entry)

# Explicit UTF-8: the entries contain umlauts and the platform default encoding varies.
with open("model_performance_results.txt", "w", encoding="utf-8") as f:
    for entry in report_entries:
        f.write(f"{entry}\n\n")

model: facebook/detr-resnet-50, performte mit folgenden Scores:
{'evaluation_results': {'eval_loss': 0.9217938780784607, 'eval_runtime': 6.6457, 'eval_samples_per_second': 18.057, 'eval_steps_per_second': 2.257, 'epoch': 30.0}, 'train_losses': [2.1437, 2.1104, 1.8866, 1.4919, 1.3816, 1.4724, 1.346, 1.3652, 1.29, 1.3952, 1.4707, 1.7389, 1.882, 1.7675, 2.09, 1.5142, 1.3907, 1.4731, 1.7424, 1.5529, 1.4501, 1.6311, 1.3621, 1.3207, 1.325, 1.1566, 1.2321, 1.1528, 1.1561, 1.4008, 1.3711, 1.2979, 1.2548, 1.1528, 1.4543, 1.2188, 1.1515, 1.2485, 1.2591, 1.1641, 1.3861, 1.0755, 1.163, 1.194, 1.115, 0.9685, 1.1241, 1.1063, 1.225, 1.0863, 1.116, 1.1841, 1.0292, 0.939, 1.1251, 1.1066, 1.1021, 0.9601, 0.9873, 1.1667, 1.2175, 1.2583, 1.1623, 1.0874, 1.1932, 1.1639, 1.0901, 1.1069, 1.0272, 1.0402, 1.0037, 1.0132, 0.9903, 1.0413, 0.912, 0.8118, 0.9497, 0.8802, 0.9274, 0.969, 0.9352, 0.9586, 0.9883, 0.9851, 0.9245, 0.8756, 0.971, 0.8926, 1.1114, 0.9714, 0.9641, 0.873, 0.9988, 0.9649, 0.9028, 0.8621, 1.06

In [12]:
# Only touch the CUDA API when a GPU is present: empty PyTorch's CUDA cache and
# wait for outstanding GPU work to finish.
if torch.cuda.is_available():
    torch.cuda.empty_cache()
    torch.cuda.synchronize() 


# Run a garbage collection; as the last expression of the cell, its return value
# is shown as the cell output.
gc.collect()

6896

In [13]:
# from transformers import AutoImageProcessor, pipeline
# from PIL import Image, ImageDraw, ImageFont

# random_images = random.sample(dataset_oberfläche_test["image"], 10)
# for test_image in random_images:
#     test = r"C:\Users\anohl\OneDrive\Dokumente\A_Uni_stuff\Albstadt\Semester 2\Computer_vision\Aufgaben\trained_model\hustvl\yolos-tiny\final_model"
#     out = pipeline("object-detection", model=test, image_processor=YolosImageProcessor())(test_image, threshold=0)

#     # Normalize the prediction thresholds
#     outs = [o["score"] for o in out]
#     filtered_out = []
#     threshold = 0.9
#     for o in out:
#         o["score"] = 1 / max(outs) * o["score"]
#         if o["score"] >= threshold:
#             filtered_out.append(o)
#             print(filtered_out)

#     def plot_box(img, results):
#         try:
#             image = Image.open(img)
#         except:
#             image = img
#         draw = ImageDraw.Draw(image)
#         for i, obj in enumerate(results):
#             box = [round(obj["box"][value], 2) for value in obj["box"].keys()]
#             x, y, x2, y2 = tuple(box)
#             draw.rectangle((x, y, x2, y2), outline="orange", width=2)
#             draw.text((x, y), f"{obj['label']} - {np.round(obj['score'], 3)}", fill="orange", stroke_width=2, stroke_fill="white")
#         image.save(f"./{np.random.randint(10e6)}.jpg")

#     # Visualize Results
#     plot_box(test_image, filtered_out)

In [14]:
# device = "cuda:0" if torch.cuda.is_available() else "cpu"
# print(f"Using device: {device}")

# # Verwendung von yolv8m um eine Blance zwischen performance und Genauigkeit zu haben 
# model = YOLO("yolov8m.pt")

# # Train the model
# path_to_yolo_yaml = os.path.join(BASE_DIR, "yaml_files", "yolo_dataset.yaml")
# train_results = model.train(
#     data=path_to_yolo_yaml,
#     epochs=400,
#     imgsz=150, # Weil das die tatsächliche Größe unserer Bilder darstellt
#     device=device,
#     batch=16,
#     mosaic=1.0,
# )

# # Evaluate model performance on the validation set
# metrics = model.val()
# print(metrics)

# # Export the model to the same directory as the script
# export_path = os.path.join(BASE_DIR, "Models", "yolo_kugellager_modell.pt")
# model_path = model.export()
# shutil.move(src=model_path, dst=export_path)
# print(f"Model exported to: {export_path}")


In [15]:
# metrics = model.val()
# print(f"Precision: {metrics['precision']}")
# print(f"Recall: {metrics['recall']}")
# print(f"mAP@0.5: {metrics['map50']}")
# print(f"mAP@0.5:0.95: {metrics['map']}")